Files
forum-scrapper/test_database.py
Bacruru Sakaguchi 1ddfe375e5 feat(core): add recursive loading and analyzer tools
Add recursive thread loading method to load threads with all messages and images from the database.

Add support for image captions in the database schema and Image model.

Introduce new analyzer tools for thread management:
- Load thread names from database
- Group threads by similarity
- Merge thread groups

Update main menu to include new Analyzer submenu and options to process threads without downloading images.
2026-02-18 18:19:33 +00:00

347 lines
14 KiB
Python
Executable File

"""
Test script for Database class.
This script tests the database functionality with parsed thread data.
"""
import sys
import os
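# Add the Program directory to the import path.
# NOTE(review): the imports below use the package name 'Programm' while this
# path entry is 'Program' - confirm which spelling matches the directory on disk.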
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'Program'))
from Programm.Parsers.Dvach.Parser import Parser
from Programm.Parsers.Dvach.parse_thread import parse_thread
from Programm.Database import Database
def test_database():
    """Exercise saving a parsed thread to the database and print a report.

    Reads ``samples/thread.html``, parses it with ``parse_thread``, downloads
    the images through ``Parser.download_all_images``, saves the thread to
    ``test_forum.db`` with ``Database.save_thread_recursive``, prints the row
    counts of the resulting tables and finally saves the same thread a second
    time to show how many objects are written on a repeated save.

    All results are reported with ``print``; the function returns ``None``.
    It returns early (after printing an error) when the sample HTML file is
    missing or when parsing yields a falsy result.
    """
    print("=" * 80)
    print("TESTING DATABASE")
    print("=" * 80)

    db_file = "test_forum.db"
    html_file = "samples/thread.html"

    if not os.path.exists(html_file):
        print(f"ERROR: File {html_file} not found!")
        return

    print(f"\n1. Reading HTML file: {html_file}")
    with open(html_file, 'r', encoding='utf-8') as f:
        html_content = f.read()
    print(f" File size: {len(html_content)} bytes")

    print("\n2. Parsing thread...")
    thread = parse_thread(html_content)
    if not thread:
        print(" ERROR: Failed to parse thread!")
        return
    print(" ✓ Thread parsed successfully")
    print(f" - Thread ID: {thread.id}")
    print(f" - Title: {thread.title}")
    print(f" - Messages: {len(thread.messages)}")
    total_images = sum(len(msg.images) for msg in thread.messages)
    print(f" - Total images: {total_images}")

    print("\n3. Downloading all images...")
    downloaded_count = Parser().download_all_images(thread)
    print("\n4. Database operations:")
    print(f" - Total images found: {total_images}")
    print(f" - Successfully downloaded: {downloaded_count}")

    print(f"\n5. Saving to database: {db_file}")
    with Database(db_file) as db:
        saved_count = db.save_thread_recursive(thread)
    print("\n6. Save summary:")
    print(f" - Objects saved: {saved_count}")
    # One thread row + one row per message + one row per image.
    expected_objects = total_images + len(thread.messages) + 1
    print(f" - Objects skipped: {expected_objects - saved_count}")

    print("\n7. Verifying database contents:")
    count_queries = (
        ("Threads in database", 'SELECT COUNT(*) FROM threads'),
        ("Messages in database", 'SELECT COUNT(*) FROM messages'),
        ("Images in database", 'SELECT COUNT(*) FROM images'),
        ("Images with data", 'SELECT COUNT(*) FROM images WHERE data IS NOT NULL'),
    )
    with Database(db_file) as db:
        cursor = db.connection.cursor()
        for label, query in count_queries:
            cursor.execute(query)
            print(f" - {label}: {cursor.fetchone()[0]}")

        cursor.execute('SELECT name, size, length(data) as data_size FROM images LIMIT 3')
        print("\n Sample images:")
        for name, size, data_size in cursor.fetchall():
            print(f" - {name}: {size}KB, {data_size} bytes of data")

    # Saving the very same thread again shows how duplicates are handled.
    print("\n8. Testing duplicate save (should skip existing data)...")
    with Database(db_file) as db:
        resaved_count = db.save_thread_recursive(thread)
    print(f" - Objects saved on second run: {resaved_count}")
    print(" - Expected: 0 (all should be skipped)")

    print("\n" + "=" * 80)
    print("TEST COMPLETED")
    print("=" * 80)
    print(f"\nDatabase file: {db_file}")
    # Derive the hint from db_file so it cannot drift from the real file name.
    print(f"You can inspect the database with: sqlite3 {db_file}")
def _report_check(passed: bool, ok_message: str, fail_message: str) -> bool:
    """Print the success or failure line for one check and return ``passed``."""
    print(ok_message if passed else fail_message)
    return passed


def _count_images(thread) -> int:
    """Return the total number of images attached to all messages of ``thread``."""
    return sum(len(msg.images) for msg in thread.messages)


def _print_row_counts(db_path: str) -> None:
    """Print the row counts of the threads, messages and images tables."""
    with Database(db_path) as db:
        cursor = db.connection.cursor()
        # Table names are fixed constants, never user input, so interpolating
        # them into the statement is safe.
        for label, table in (("Threads", "threads"),
                             ("Messages", "messages"),
                             ("Images", "images")):
            cursor.execute(f'SELECT COUNT(*) FROM {table}')
            print(f" - {label} in database: {cursor.fetchone()[0]}")


def _images_match(original, loaded) -> bool:
    """Compare the images of two threads message by message, printing results.

    Returns ``False`` at the first mismatch, ``True`` when everything agrees.
    """
    for i, (orig_msg, loaded_msg) in enumerate(zip(original.messages, loaded.messages)):
        # Skip only when *neither* side has images; if just one side has
        # them, the count comparison below must flag the loss.
        if not (orig_msg.images or loaded_msg.images):
            continue
        if not _report_check(
            len(orig_msg.images) == len(loaded_msg.images),
            f" ✓ Message {i} has matching image count",
            f" ✗ Message {i} has mismatched image count!",
        ):
            return False
        for j, (orig_img, loaded_img) in enumerate(zip(orig_msg.images, loaded_msg.images)):
            if not _report_check(
                orig_img.name == loaded_img.name,
                f" ✓ Image {j} in message {i} name matches",
                f" ✗ Image {j} in message {i} name mismatch!",
            ):
                return False
            if not _report_check(
                orig_img.size == loaded_img.size,
                f" ✓ Image {j} in message {i} size matches",
                f" ✗ Image {j} in message {i} size mismatch!",
            ):
                return False
    return True


def test_load_thread_recursive():
    """Round-trip a parsed thread through two databases and print a report.

    Steps: parse ``samples/thread.html``, download its images, save the thread
    to ``samples/test_source.db``, load it back with
    ``Database.load_thread_recursive``, compare the loaded thread with the
    parsed one, save the loaded thread to ``samples/test_dest.db``, load it
    again and compare it with the parsed thread once more.

    All results are reported with ``print``; the function returns ``None``.
    It returns early after the first failed check, when the sample HTML file
    is missing, when parsing fails, or when a thread cannot be loaded.
    """
    print("\n" + "=" * 80)
    print("TESTING LOAD_THREAD_RECURSIVE")
    print("=" * 80)

    source_db = "samples/test_source.db"
    dest_db = "samples/test_dest.db"
    html_file = "samples/thread.html"

    if not os.path.exists(html_file):
        print(f"ERROR: File {html_file} not found!")
        return

    print(f"\n1. Reading HTML file: {html_file}")
    with open(html_file, 'r', encoding='utf-8') as f:
        html_content = f.read()
    print(f" File size: {len(html_content)} bytes")

    print("\n2. Parsing thread...")
    thread = parse_thread(html_content)
    if not thread:
        print(" ERROR: Failed to parse thread!")
        return
    print(" ✓ Thread parsed successfully")
    print(f" - Thread ID: {thread.id}")
    print(f" - Title: {thread.title}")
    print(f" - Messages: {len(thread.messages)}")

    print("\n3. Downloading all images...")
    Parser().download_all_images(thread)

    print(f"\n4. Saving to source database: {source_db}")
    with Database(source_db) as db:
        saved_count = db.save_thread_recursive(thread)
    print(f" - Objects saved: {saved_count}")

    print("\n5. Verifying source database contents:")
    _print_row_counts(source_db)

    print(f"\n6. Loading thread from source database: {source_db}")
    with Database(source_db) as db:
        loaded_thread = db.load_thread_recursive(thread.id)
    if not loaded_thread:
        print(" ✗ Failed to load thread from source database!")
        return
    print(" ✓ Thread loaded successfully")
    print(f" - Thread ID: {loaded_thread.id}")
    print(f" - Title: {loaded_thread.title}")
    print(f" - Messages: {len(loaded_thread.messages)}")
    loaded_image_total = _count_images(loaded_thread)
    print(f" - Total images: {loaded_image_total}")

    print("\n7. Verifying data integrity:")
    summary_checks = (
        (thread.id == loaded_thread.id,
         " ✓ Thread ID matches", " ✗ Thread ID mismatch!"),
        (thread.title == loaded_thread.title,
         " ✓ Thread title matches", " ✗ Thread title mismatch!"),
        (len(thread.messages) == len(loaded_thread.messages),
         " ✓ Message count matches", " ✗ Message count mismatch!"),
        # Compare against the ORIGINAL thread; comparing the loaded total with
        # itself could never detect lost images.
        (_count_images(thread) == loaded_image_total,
         " ✓ Image count matches", " ✗ Image count mismatch!"),
    )
    # all() stops at the first failing check, so nothing is printed after it.
    if not all(_report_check(*check) for check in summary_checks):
        return

    if thread.messages:
        # Message counts matched above, so index 0 exists on both sides.
        original_first = thread.messages[0]
        loaded_first = loaded_thread.messages[0]
        first_message_checks = (
            (original_first.id == loaded_first.id,
             " ✓ First message ID matches", " ✗ First message ID mismatch!"),
            (original_first.author == loaded_first.author,
             " ✓ First message author matches", " ✗ First message author mismatch!"),
            (original_first.text_content == loaded_first.text_content,
             " ✓ First message content matches", " ✗ First message content mismatch!"),
        )
        if not all(_report_check(*check) for check in first_message_checks):
            return

    if not _images_match(thread, loaded_thread):
        return

    print(f"\n8. Saving loaded thread to destination database: {dest_db}")
    with Database(dest_db) as db:
        dest_saved_count = db.save_thread_recursive(loaded_thread)
    print(f" - Objects saved: {dest_saved_count}")

    print("\n9. Verifying destination database contents:")
    _print_row_counts(dest_db)

    print("\n10. Loading from destination database to verify round-trip:")
    with Database(dest_db) as db:
        dest_loaded_thread = db.load_thread_recursive(thread.id)
    if not dest_loaded_thread:
        print(" ✗ Failed to load thread from destination database!")
        return
    print(" ✓ Thread loaded from destination database")
    print(f" - Thread ID: {dest_loaded_thread.id}")
    print(f" - Title: {dest_loaded_thread.title}")
    print(f" - Messages: {len(dest_loaded_thread.messages)}")

    print("\n11. Final verification:")
    final_checks = (
        (thread.id == dest_loaded_thread.id,
         " ✓ Original and final thread IDs match", " ✗ Thread ID mismatch!"),
        (thread.title == dest_loaded_thread.title,
         " ✓ Original and final thread titles match", " ✗ Thread title mismatch!"),
        (len(thread.messages) == len(dest_loaded_thread.messages),
         " ✓ Original and final message counts match", " ✗ Message count mismatch!"),
        (_count_images(thread) == _count_images(dest_loaded_thread),
         " ✓ Original and final image counts match", " ✗ Image count mismatch!"),
    )
    if not all(_report_check(*check) for check in final_checks):
        return
    print("\n ✓ All verifications passed!")

    print("\n" + "=" * 80)
    print("TEST COMPLETED")
    print("=" * 80)
    print(f"\nSource database: {source_db}")
    print(f"Destination database: {dest_db}")
    # Use the real paths: the databases live under samples/, not the cwd.
    print(f"You can inspect the databases with: sqlite3 {source_db} and sqlite3 {dest_db}")
# Script entry point: executed only when this file is run directly.
if __name__ == "__main__":
    # Only the save/load round-trip test runs by default. The basic save test
    # is currently disabled; call test_database() here to run it as well.
    test_load_thread_recursive()