Add a recursive thread-loading method that loads threads with all their messages and images from the database. Add support for image captions in the database schema and the Image model. Introduce new analyzer tools for thread management: load thread names from the database, group threads by similarity, and merge thread groups. Update the main menu to include a new Analyzer submenu and options to process threads without downloading images.
347 lines
14 KiB
Python
Executable File
347 lines
14 KiB
Python
Executable File
"""
Test script for Database class.

This script tests the database functionality with parsed thread data.
"""
|
|
|
|
import sys
|
|
import os
|
|
|
|
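# Add Program directory to path
# NOTE(review): the imports below use the package name 'Programm' while this
# path entry uses 'Program' — confirm which spelling matches the directory.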
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'Program'))
|
|
|
|
from Programm.Parsers.Dvach.Parser import Parser
|
|
from Programm.Parsers.Dvach.parse_thread import parse_thread
|
|
from Programm.Database import Database
|
|
|
|
|
|
def test_database():
    """Parse a saved thread, store it in a SQLite database and report results.

    The script reads ``samples/thread.html``, parses it with ``parse_thread``,
    downloads the thread's images through ``Parser.download_all_images`` and
    saves everything with ``Database.save_thread_recursive`` into
    ``test_forum.db``. It then prints row counts for the ``threads``,
    ``messages`` and ``images`` tables, shows up to three sample image rows,
    and saves the same thread a second time to confirm nothing is stored
    twice.

    Returns:
        None. Progress and results are printed to stdout. The function
        returns early when the HTML sample is missing or cannot be parsed.
    """
    print("=" * 80)
    print("TESTING DATABASE")
    print("=" * 80)

    # Create a test database file
    db_file = "test_forum.db"

    # Read HTML file
    html_file = "samples/thread.html"
    if not os.path.exists(html_file):
        print(f"ERROR: File {html_file} not found!")
        return

    print(f"\n1. Reading HTML file: {html_file}")
    with open(html_file, 'r', encoding='utf-8') as f:
        html_content = f.read()

    print(f" File size: {len(html_content)} bytes")

    # Parse thread
    print("\n2. Parsing thread...")
    thread = parse_thread(html_content)

    if not thread:
        print(" ERROR: Failed to parse thread!")
        return

    print(" ✓ Thread parsed successfully")
    print(f" - Thread ID: {thread.id}")
    print(f" - Title: {thread.title}")
    print(f" - Messages: {len(thread.messages)}")

    # Count total images
    total_images = sum(len(msg.images) for msg in thread.messages)
    print(f" - Total images: {total_images}")

    # Download all images
    print("\n3. Downloading all images...")
    parser = Parser()
    downloaded_count = parser.download_all_images(thread)

    print("\n4. Database operations:")
    print(f" - Total images found: {total_images}")
    print(f" - Successfully downloaded: {downloaded_count}")

    # Save to database
    print(f"\n5. Saving to database: {db_file}")
    with Database(db_file) as db:
        saved_count = db.save_thread_recursive(thread)

    print("\n6. Save summary:")
    print(f" - Objects saved: {saved_count}")
    # One thread + all of its messages + all of their images.
    expected_objects = total_images + len(thread.messages) + 1
    print(f" - Objects skipped: {expected_objects - saved_count}")

    # Verify database contents
    print("\n7. Verifying database contents:")
    with Database(db_file) as db:
        cursor = db.connection.cursor()

        # Check threads
        cursor.execute('SELECT COUNT(*) FROM threads')
        thread_count = cursor.fetchone()[0]
        print(f" - Threads in database: {thread_count}")

        # Check messages
        cursor.execute('SELECT COUNT(*) FROM messages')
        message_count = cursor.fetchone()[0]
        print(f" - Messages in database: {message_count}")

        # Check images
        cursor.execute('SELECT COUNT(*) FROM images')
        image_count = cursor.fetchone()[0]
        print(f" - Images in database: {image_count}")

        # Check if images have data
        cursor.execute('SELECT COUNT(*) FROM images WHERE data IS NOT NULL')
        images_with_data = cursor.fetchone()[0]
        print(f" - Images with data: {images_with_data}")

        # Show sample image data
        cursor.execute(
            'SELECT name, size, length(data) as data_size FROM images LIMIT 3'
        )
        print("\n Sample images:")
        for name, size, data_size in cursor.fetchall():
            print(f" - {name}: {size}KB, {data_size} bytes of data")

    # Test saving again (should skip existing data)
    print("\n8. Testing duplicate save (should skip existing data)...")
    with Database(db_file) as db:
        saved_count = db.save_thread_recursive(thread)
        print(f" - Objects saved on second run: {saved_count}")
        print(" - Expected: 0 (all should be skipped)")
        # State the verdict explicitly instead of leaving the comparison
        # to whoever reads the output.
        if saved_count == 0:
            print(" ✓ Nothing was saved again")
        else:
            print(" ✗ Duplicate save stored objects that should have been skipped!")

    print("\n" + "=" * 80)
    print("TEST COMPLETED")
    print("=" * 80)
    print(f"\nDatabase file: {db_file}")
    # Derive the hint from db_file so it cannot drift from the real path.
    print(f"You can inspect the database with: sqlite3 {db_file}")
|
|
|
|
|
|
def _total_images(thread):
    """Return the number of images attached to all messages of *thread*."""
    return sum(len(msg.images) for msg in thread.messages)


def _report_row_counts(db_file):
    """Print the row counts of the threads, messages and images tables."""
    count_queries = (
        ("Threads", 'SELECT COUNT(*) FROM threads'),
        ("Messages", 'SELECT COUNT(*) FROM messages'),
        ("Images", 'SELECT COUNT(*) FROM images'),
    )
    with Database(db_file) as db:
        cursor = db.connection.cursor()
        for label, query in count_queries:
            cursor.execute(query)
            print(f" - {label} in database: {cursor.fetchone()[0]}")


def _check(passed, ok_message, fail_message):
    """Print a ✓/✗ line for one verification and return whether it passed."""
    print(f" ✓ {ok_message}" if passed else f" ✗ {fail_message}")
    return bool(passed)


def test_load_thread_recursive():
    """Round-trip a thread through two databases and verify nothing is lost.

    The script parses ``samples/thread.html``, downloads its images, saves
    the thread into ``samples/test_source.db``, loads it back with
    ``Database.load_thread_recursive``, compares the loaded copy with the
    parsed original, saves the loaded copy into ``samples/test_dest.db`` and
    finally loads it from there to compare once more.

    Returns:
        None. Progress and results are printed to stdout. The function
        returns early (without printing "TEST COMPLETED") when the sample is
        missing, parsing or loading fails, or any comparison does not match.
    """
    print("\n" + "=" * 80)
    print("TESTING LOAD_THREAD_RECURSIVE")
    print("=" * 80)

    # Create source and destination database files
    source_db = "samples/test_source.db"
    dest_db = "samples/test_dest.db"

    # Read HTML file
    html_file = "samples/thread.html"
    if not os.path.exists(html_file):
        print(f"ERROR: File {html_file} not found!")
        return

    print(f"\n1. Reading HTML file: {html_file}")
    with open(html_file, 'r', encoding='utf-8') as f:
        html_content = f.read()

    print(f" File size: {len(html_content)} bytes")

    # Parse thread
    print("\n2. Parsing thread...")
    thread = parse_thread(html_content)

    if not thread:
        print(" ERROR: Failed to parse thread!")
        return

    print(" ✓ Thread parsed successfully")
    print(f" - Thread ID: {thread.id}")
    print(f" - Title: {thread.title}")
    print(f" - Messages: {len(thread.messages)}")

    # Download all images (the returned count is not needed here)
    print("\n3. Downloading all images...")
    Parser().download_all_images(thread)

    print(f"\n4. Saving to source database: {source_db}")
    with Database(source_db) as db:
        saved_count = db.save_thread_recursive(thread)
        print(f" - Objects saved: {saved_count}")

    # Verify source database
    print("\n5. Verifying source database contents:")
    _report_row_counts(source_db)

    # Load thread from source database
    print(f"\n6. Loading thread from source database: {source_db}")
    with Database(source_db) as db:
        loaded_thread = db.load_thread_recursive(thread.id)

    if not loaded_thread:
        print(" ✗ Failed to load thread from source database!")
        return

    print(" ✓ Thread loaded successfully")
    print(f" - Thread ID: {loaded_thread.id}")
    print(f" - Title: {loaded_thread.title}")
    print(f" - Messages: {len(loaded_thread.messages)}")

    # Count images on both sides; the original count is the reference value
    # for every later comparison.
    original_image_count = _total_images(thread)
    loaded_image_count = _total_images(loaded_thread)
    print(f" - Total images: {loaded_image_count}")

    # Verify data integrity
    print("\n7. Verifying data integrity:")
    if not _check(thread.id == loaded_thread.id,
                  "Thread ID matches", "Thread ID mismatch!"):
        return
    if not _check(thread.title == loaded_thread.title,
                  "Thread title matches", "Thread title mismatch!"):
        return
    if not _check(len(thread.messages) == len(loaded_thread.messages),
                  "Message count matches", "Message count mismatch!"):
        return
    # Compare original against loaded, not the loaded count with itself.
    if not _check(original_image_count == loaded_image_count,
                  "Image count matches", "Image count mismatch!"):
        return

    # Verify first message (message counts already match, so index 0 exists
    # on both sides whenever the original has messages)
    if thread.messages:
        original_first = thread.messages[0]
        loaded_first = loaded_thread.messages[0]

        if not _check(original_first.id == loaded_first.id,
                      "First message ID matches",
                      "First message ID mismatch!"):
            return
        if not _check(original_first.author == loaded_first.author,
                      "First message author matches",
                      "First message author mismatch!"):
            return
        if not _check(original_first.text_content == loaded_first.text_content,
                      "First message content matches",
                      "First message content mismatch!"):
            return

    # Verify images
    message_pairs = zip(thread.messages, loaded_thread.messages)
    for i, (orig_msg, loaded_msg) in enumerate(message_pairs):
        # Always compare lengths so that "images on one side only" is
        # reported instead of being skipped silently.
        if len(orig_msg.images) != len(loaded_msg.images):
            print(f" ✗ Message {i} has mismatched image count!")
            return
        if not orig_msg.images:
            continue
        print(f" ✓ Message {i} has matching image count")

        image_pairs = zip(orig_msg.images, loaded_msg.images)
        for j, (orig_img, loaded_img) in enumerate(image_pairs):
            if not _check(orig_img.name == loaded_img.name,
                          f"Image {j} in message {i} name matches",
                          f"Image {j} in message {i} name mismatch!"):
                return
            if not _check(orig_img.size == loaded_img.size,
                          f"Image {j} in message {i} size matches",
                          f"Image {j} in message {i} size mismatch!"):
                return

    # Save loaded thread to destination database
    print(f"\n8. Saving loaded thread to destination database: {dest_db}")
    with Database(dest_db) as db:
        dest_saved_count = db.save_thread_recursive(loaded_thread)
        print(f" - Objects saved: {dest_saved_count}")

    # Verify destination database
    print("\n9. Verifying destination database contents:")
    _report_row_counts(dest_db)

    # Load from destination database to verify round-trip
    print("\n10. Loading from destination database to verify round-trip:")
    with Database(dest_db) as db:
        dest_loaded_thread = db.load_thread_recursive(thread.id)

    if not dest_loaded_thread:
        print(" ✗ Failed to load thread from destination database!")
        return

    print(" ✓ Thread loaded from destination database")
    print(f" - Thread ID: {dest_loaded_thread.id}")
    print(f" - Title: {dest_loaded_thread.title}")
    print(f" - Messages: {len(dest_loaded_thread.messages)}")

    # Final verification
    print("\n11. Final verification:")
    if not _check(thread.id == dest_loaded_thread.id,
                  "Original and final thread IDs match",
                  "Thread ID mismatch!"):
        return
    if not _check(thread.title == dest_loaded_thread.title,
                  "Original and final thread titles match",
                  "Thread title mismatch!"):
        return
    if not _check(len(thread.messages) == len(dest_loaded_thread.messages),
                  "Original and final message counts match",
                  "Message count mismatch!"):
        return
    if not _check(original_image_count == _total_images(dest_loaded_thread),
                  "Original and final image counts match",
                  "Image count mismatch!"):
        return

    print("\n ✓ All verifications passed!")

    print("\n" + "=" * 80)
    print("TEST COMPLETED")
    print("=" * 80)
    print(f"\nSource database: {source_db}")
    print(f"Destination database: {dest_db}")
    # Use the real paths: both files live under samples/, not in the cwd.
    print(
        "You can inspect the databases with: "
        f"sqlite3 {source_db} and sqlite3 {dest_db}"
    )
|
|
|
|
|
|
if __name__ == "__main__":
    # Only the load/save round-trip test runs by default; uncomment the
    # next line to also run the basic save/verify test.
    # test_database()
    test_load_thread_recursive()