From 3fee0962c6bb7caa3b2629efab405cf2c152438f Mon Sep 17 00:00:00 2001 From: Luke Mino-Altherr Date: Wed, 11 Feb 2026 16:04:12 -0800 Subject: [PATCH] feat: implement two-phase scanning architecture (fast + enrich) Phase 1 (FAST): Creates stub records with filesystem metadata only - path, size, mtime - no file content reading - Populates asset database quickly on startup Phase 2 (ENRICH): Extracts metadata and computes hashes - Safetensors header parsing, MIME types - Optional blake3 hash computation - Updates existing stub records Changes: - Add ScanPhase enum (FAST, ENRICH, FULL) - Add enrichment_level column to AssetCacheState (0=stub, 1=metadata, 2=hashed) - Add build_stub_specs() for fast scanning without metadata extraction - Add get_unenriched_cache_states(), enrich_asset(), enrich_assets_batch() - Add start_fast(), start_enrich() convenience methods to AssetSeeder - Update start() to accept phase parameter (defaults to FULL) - Split _run_scan() into _run_fast_phase() and _run_enrich_phase() - Add migration 0003_add_enrichment_level.py - Update tests for new architecture Amp-Thread-ID: https://ampcode.com/threads/T-019c4eef-1568-778f-aede-38254728f848 Co-authored-by: Amp --- .../versions/0003_add_enrichment_level.py | 44 +++ app/assets/database/models.py | 6 + app/assets/database/queries/__init__.py | 8 + app/assets/database/queries/cache_state.py | 100 ++++++ app/assets/scanner.py | 182 +++++++++++ app/assets/seeder.py | 297 ++++++++++++++---- tests-unit/seeder_test/test_seeder.py | 112 ++++++- 7 files changed, 675 insertions(+), 74 deletions(-) create mode 100644 alembic_db/versions/0003_add_enrichment_level.py diff --git a/alembic_db/versions/0003_add_enrichment_level.py b/alembic_db/versions/0003_add_enrichment_level.py new file mode 100644 index 000000000..c9a0ee212 --- /dev/null +++ b/alembic_db/versions/0003_add_enrichment_level.py @@ -0,0 +1,44 @@ +""" +Add enrichment_level column to asset_cache_state for phased scanning + +Level 0: Stub record (path, size, mtime only) +Level 1: Metadata extracted (safetensors header, mime type) +Level 2: Hash computed (blake3) + +Revision ID: 0003_add_enrichment_level +Revises: 0002_add_is_missing +Create Date: 2025-02-10 00:00:00 +""" + +from alembic import op +import sqlalchemy as sa + +revision = "0003_add_enrichment_level" +down_revision = "0002_add_is_missing" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.add_column( + "asset_cache_state", + sa.Column( + "enrichment_level", + sa.Integer(), + nullable=False, + server_default=sa.text("0"), + ), + ) + op.create_index( + "ix_asset_cache_state_enrichment_level", + "asset_cache_state", + ["enrichment_level"], + ) + # Treat existing records as fully enriched (level 1 = metadata done) + # since they were created with the old scanner that extracted metadata + op.execute("UPDATE asset_cache_state SET enrichment_level = 1") + + +def downgrade() -> None: + op.drop_index("ix_asset_cache_state_enrichment_level", table_name="asset_cache_state") + op.drop_column("asset_cache_state", "enrichment_level") diff --git a/app/assets/database/models.py b/app/assets/database/models.py index 3338ce789..58547d1be 100644 --- a/app/assets/database/models.py +++ b/app/assets/database/models.py @@ -84,6 +84,7 @@ class AssetCacheState(Base): mtime_ns: Mapped[int | None] = mapped_column(BigInteger, nullable=True) needs_verify: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False) is_missing: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False) + enrichment_level: Mapped[int] = mapped_column(Integer, nullable=False, default=0) asset: Mapped[Asset] = relationship(back_populates="cache_states") @@ -91,9 +92,14 @@ class AssetCacheState(Base): Index("ix_asset_cache_state_file_path", "file_path"), Index("ix_asset_cache_state_asset_id", "asset_id"), Index("ix_asset_cache_state_is_missing", "is_missing"), + Index("ix_asset_cache_state_enrichment_level", "enrichment_level"), CheckConstraint( "(mtime_ns IS NULL) OR (mtime_ns >= 0)", name="ck_acs_mtime_nonneg" ), + CheckConstraint( + "enrichment_level >= 0 AND enrichment_level <= 2", + name="ck_acs_enrichment_level_range", + ), UniqueConstraint("file_path", name="uq_asset_cache_state_file_path"), ) diff --git a/app/assets/database/queries/__init__.py b/app/assets/database/queries/__init__.py index fa365fad0..b59ffe29b 100644 --- a/app/assets/database/queries/__init__.py +++ b/app/assets/database/queries/__init__.py @@ -25,7 +25,9 @@ from app.assets.database.queries.asset_info import ( ) from app.assets.database.queries.cache_state import ( CacheStateRow, + UnenrichedAssetRow, bulk_insert_cache_states_ignore_conflicts, + bulk_update_enrichment_level, bulk_update_is_missing, bulk_update_needs_verify, delete_assets_by_ids, @@ -33,10 +35,12 @@ from app.assets.database.queries.cache_state import ( delete_orphaned_seed_asset, get_cache_states_by_paths_and_asset_ids, get_cache_states_for_prefixes, + get_unenriched_cache_states, get_unreferenced_unhashed_asset_ids, list_cache_states_by_asset_id, mark_cache_states_missing_outside_prefixes, restore_cache_states_by_paths, + update_enrichment_level, upsert_cache_state, ) from app.assets.database.queries.tags import ( @@ -59,6 +63,7 @@ __all__ = [ "CacheStateRow", "RemoveTagsDict", "SetTagsDict", + "UnenrichedAssetRow", "add_missing_tag_for_asset_id", "add_tags_to_asset_info", "asset_exists_by_hash", @@ -67,6 +72,7 @@ __all__ = [ "bulk_insert_assets", "bulk_insert_cache_states_ignore_conflicts", "bulk_insert_tags_and_meta", + "bulk_update_enrichment_level", "bulk_update_is_missing", "bulk_update_needs_verify", "delete_asset_info_by_id", @@ -84,6 +90,7 @@ __all__ = [ "get_cache_states_by_paths_and_asset_ids", "get_cache_states_for_prefixes", "get_or_create_asset_info", + "get_unenriched_cache_states", "get_unreferenced_unhashed_asset_ids", "insert_asset_info", "list_asset_infos_page", @@ -100,6 +107,7 @@ __all__ = [ "update_asset_info_name", "update_asset_info_timestamps", "update_asset_info_updated_at", + "update_enrichment_level", "upsert_asset", "upsert_cache_state", ] diff --git a/app/assets/database/queries/cache_state.py b/app/assets/database/queries/cache_state.py index 4e6ea4ccc..c5c475160 100644 --- a/app/assets/database/queries/cache_state.py +++ b/app/assets/database/queries/cache_state.py @@ -302,6 +302,106 @@ def delete_orphaned_seed_asset(session: Session, asset_id: str) -> bool: return False +class UnenrichedAssetRow(NamedTuple): + """Row for assets needing enrichment.""" + + cache_state_id: int + asset_id: str + asset_info_id: str + file_path: str + enrichment_level: int + + +def get_unenriched_cache_states( + session: Session, + prefixes: list[str], + max_level: int = 0, + limit: int = 1000, +) -> list[UnenrichedAssetRow]: + """Get cache states that need enrichment (enrichment_level <= max_level). + + Args: + session: Database session + prefixes: List of absolute directory prefixes to scan + max_level: Maximum enrichment level to include (0=stubs, 1=metadata done) + limit: Maximum number of rows to return + + Returns: + List of unenriched asset rows with file paths + """ + if not prefixes: + return [] + + conds = [] + for p in prefixes: + base = os.path.abspath(p) + if not base.endswith(os.sep): + base += os.sep + escaped, esc = escape_sql_like_string(base) + conds.append(AssetCacheState.file_path.like(escaped + "%", escape=esc)) + + query = ( + sa.select( + AssetCacheState.id, + AssetCacheState.asset_id, + AssetInfo.id, + AssetCacheState.file_path, + AssetCacheState.enrichment_level, + ) + .join(Asset, Asset.id == AssetCacheState.asset_id) + .join(AssetInfo, AssetInfo.asset_id == Asset.id) + .where(sa.or_(*conds)) + .where(AssetCacheState.is_missing == False) # noqa: E712 + .where(AssetCacheState.enrichment_level <= max_level) + .order_by(AssetCacheState.id.asc()) + .limit(limit) + ) + + rows = session.execute(query).all() + return [ + UnenrichedAssetRow( + cache_state_id=row[0], + asset_id=row[1], + asset_info_id=row[2], + file_path=row[3], + enrichment_level=row[4], + ) + for row in rows + ] + + +def update_enrichment_level( + session: Session, + cache_state_id: int, + level: int, +) -> None: + """Update the enrichment level for a cache state.""" + session.execute( + sa.update(AssetCacheState) + .where(AssetCacheState.id == cache_state_id) + .values(enrichment_level=level) + ) + + +def bulk_update_enrichment_level( + session: Session, + cache_state_ids: list[int], + level: int, +) -> int: + """Update enrichment level for multiple cache states. + + Returns: Number of rows updated + """ + if not cache_state_ids: + return 0 + result = session.execute( + sa.update(AssetCacheState) + .where(AssetCacheState.id.in_(cache_state_ids)) + .values(enrichment_level=level) + ) + return result.rowcount + + def bulk_insert_cache_states_ignore_conflicts( session: Session, rows: list[dict], diff --git a/app/assets/scanner.py b/app/assets/scanner.py index b622b78f7..2d2f3b165 100644 --- a/app/assets/scanner.py +++ b/app/assets/scanner.py @@ -7,13 +7,16 @@ from typing import Literal, TypedDict import folder_paths from app.assets.database.queries import ( add_missing_tag_for_asset_id, + bulk_update_enrichment_level, bulk_update_is_missing, bulk_update_needs_verify, delete_cache_states_by_ids, delete_orphaned_seed_asset, ensure_tags_exist, get_cache_states_for_prefixes, + get_unenriched_cache_states, remove_missing_tag_for_asset_id, + set_asset_info_metadata, ) from app.assets.services.bulk_ingest import ( SeedAssetSpec, @@ -341,6 +344,59 @@ def build_asset_specs( return specs, tag_pool, skipped +def build_stub_specs( + paths: list[str], + existing_paths: set[str], +) -> tuple[list[SeedAssetSpec], set[str], int]: + """Build minimal stub specs for fast phase scanning. + + Only collects filesystem metadata (stat), no file content reading. + This is the fastest possible scan to populate the asset database. + + Args: + paths: List of file paths to process + existing_paths: Set of paths that already exist in the database + + Returns: + Tuple of (specs, tag_pool, skipped_count) + """ + specs: list[SeedAssetSpec] = [] + tag_pool: set[str] = set() + skipped = 0 + + for p in paths: + abs_p = os.path.abspath(p) + if abs_p in existing_paths: + skipped += 1 + continue + try: + stat_p = os.stat(abs_p, follow_symlinks=False) + except OSError: + continue + if not stat_p.st_size: + continue + + name, tags = get_name_and_tags_from_asset_path(abs_p) + rel_fname = compute_relative_filename(abs_p) + + specs.append( + { + "abs_path": abs_p, + "size_bytes": stat_p.st_size, + "mtime_ns": get_mtime_ns(stat_p), + "info_name": name, + "tags": tags, + "fname": rel_fname, + "metadata": None, + "hash": None, + "mime_type": None, + } + ) + tag_pool.update(tags) + + return specs, tag_pool, skipped + + def insert_asset_specs(specs: list[SeedAssetSpec], tag_pool: set[str]) -> int: """Insert asset specs into database, returning count of created infos.""" if not specs: @@ -394,3 +450,129 @@ def seed_assets( skipped_existing, len(paths), ) + + +# Enrichment level constants +ENRICHMENT_STUB = 0 # Fast scan: path, size, mtime only +ENRICHMENT_METADATA = 1 # Metadata extracted (safetensors header, mime type) +ENRICHMENT_HASHED = 2 # Hash computed (blake3) + + +def get_unenriched_assets_for_roots( + roots: tuple[RootType, ...], + max_level: int = ENRICHMENT_STUB, + limit: int = 1000, +) -> list: + """Get assets that need enrichment for the given roots. + + Args: + roots: Tuple of root types to scan + max_level: Maximum enrichment level to include + limit: Maximum number of rows to return + + Returns: + List of UnenrichedAssetRow + """ + prefixes: list[str] = [] + for root in roots: + prefixes.extend(get_prefixes_for_root(root)) + + if not prefixes: + return [] + + with create_session() as sess: + return get_unenriched_cache_states(sess, prefixes, max_level=max_level, limit=limit) + + +def enrich_asset( + file_path: str, + cache_state_id: int, + asset_info_id: str, + extract_metadata: bool = True, + compute_hash: bool = False, +) -> int: + """Enrich a single asset with metadata and/or hash. + + Args: + file_path: Absolute path to the file + cache_state_id: ID of the cache state to update + asset_info_id: ID of the asset info to update + extract_metadata: If True, extract safetensors header and mime type + compute_hash: If True, compute blake3 hash + + Returns: + New enrichment level achieved + """ + new_level = ENRICHMENT_STUB + + try: + stat_p = os.stat(file_path, follow_symlinks=True) + except OSError: + return new_level + + rel_fname = compute_relative_filename(file_path) + + with create_session() as sess: + if extract_metadata: + metadata = extract_file_metadata( + file_path, + stat_result=stat_p, + enable_safetensors=True, + relative_filename=rel_fname, + ) + if metadata: + user_metadata = metadata.to_user_metadata() + set_asset_info_metadata(sess, asset_info_id, user_metadata) + new_level = ENRICHMENT_METADATA + + if compute_hash: + try: + digest = compute_blake3_hash(file_path) + # TODO: Update asset.hash field + # For now just mark the enrichment level + new_level = ENRICHMENT_HASHED + except Exception as e: + logging.warning("Failed to hash %s: %s", file_path, e) + + bulk_update_enrichment_level(sess, [cache_state_id], new_level) + sess.commit() + + return new_level + + +def enrich_assets_batch( + rows: list, + extract_metadata: bool = True, + compute_hash: bool = False, +) -> tuple[int, int]: + """Enrich a batch of assets. + + Args: + rows: List of UnenrichedAssetRow from get_unenriched_assets_for_roots + extract_metadata: If True, extract metadata for each asset + compute_hash: If True, compute hash for each asset + + Returns: + Tuple of (enriched_count, failed_count) + """ + enriched = 0 + failed = 0 + + for row in rows: + try: + new_level = enrich_asset( + file_path=row.file_path, + cache_state_id=row.cache_state_id, + asset_info_id=row.asset_info_id, + extract_metadata=extract_metadata, + compute_hash=compute_hash, + ) + if new_level > row.enrichment_level: + enriched += 1 + else: + failed += 1 + except Exception as e: + logging.warning("Failed to enrich %s: %s", row.file_path, e) + failed += 1 + + return enriched, failed diff --git a/app/assets/seeder.py b/app/assets/seeder.py index 19e175c69..73a663f11 100644 --- a/app/assets/seeder.py +++ b/app/assets/seeder.py @@ -9,11 +9,15 @@ from enum import Enum from typing import Callable from app.assets.scanner import ( + ENRICHMENT_METADATA, + ENRICHMENT_STUB, RootType, - build_asset_specs, + build_stub_specs, collect_paths_for_roots, + enrich_assets_batch, get_all_known_prefixes, get_prefixes_for_root, + get_unenriched_assets_for_roots, insert_asset_specs, mark_missing_outside_prefixes_safely, sync_root_safely, @@ -29,6 +33,14 @@ class State(Enum): CANCELLING = "CANCELLING" +class ScanPhase(Enum): + """Scan phase options.""" + + FAST = "fast" # Phase 1: filesystem only (stubs) + ENRICH = "enrich" # Phase 2: metadata + hash + FULL = "full" # Both phases sequentially + + @dataclass class Progress: """Progress information for a scan operation.""" @@ -79,12 +91,14 @@ class AssetSeeder: self._thread: threading.Thread | None = None self._cancel_event = threading.Event() self._roots: tuple[RootType, ...] = () + self._phase: ScanPhase = ScanPhase.FULL self._compute_hashes: bool = False self._progress_callback: ProgressCallback | None = None def start( self, roots: tuple[RootType, ...] = ("models", "input", "output"), + phase: ScanPhase = ScanPhase.FULL, progress_callback: ProgressCallback | None = None, prune_first: bool = False, compute_hashes: bool = False, @@ -93,6 +107,7 @@ class AssetSeeder: Args: roots: Tuple of root types to scan (models, input, output) + phase: Scan phase to run (FAST, ENRICH, or FULL for both) progress_callback: Optional callback called with progress updates prune_first: If True, prune orphaned assets before scanning compute_hashes: If True, compute blake3 hashes for each file (slow for large files) @@ -107,6 +122,7 @@ class AssetSeeder: self._progress = Progress() self._errors = [] self._roots = roots + self._phase = phase self._prune_first = prune_first self._compute_hashes = compute_hashes self._progress_callback = progress_callback @@ -119,6 +135,54 @@ class AssetSeeder: self._thread.start() return True + def start_fast( + self, + roots: tuple[RootType, ...] = ("models", "input", "output"), + progress_callback: ProgressCallback | None = None, + prune_first: bool = False, + ) -> bool: + """Start a fast scan (phase 1 only) - creates stub records. + + Args: + roots: Tuple of root types to scan + progress_callback: Optional callback for progress updates + prune_first: If True, prune orphaned assets before scanning + + Returns: + True if scan was started, False if already running + """ + return self.start( + roots=roots, + phase=ScanPhase.FAST, + progress_callback=progress_callback, + prune_first=prune_first, + compute_hashes=False, + ) + + def start_enrich( + self, + roots: tuple[RootType, ...] = ("models", "input", "output"), + progress_callback: ProgressCallback | None = None, + compute_hashes: bool = False, + ) -> bool: + """Start an enrichment scan (phase 2 only) - extracts metadata and hashes. + + Args: + roots: Tuple of root types to scan + progress_callback: Optional callback for progress updates + compute_hashes: If True, compute blake3 hashes + + Returns: + True if scan was started, False if already running + """ + return self.start( + roots=roots, + phase=ScanPhase.ENRICH, + progress_callback=progress_callback, + prune_first=False, + compute_hashes=compute_hashes, + ) + def cancel(self) -> bool: """Request cancellation of the current scan. @@ -291,8 +355,10 @@ class AssetSeeder: """Main scan loop running in background thread.""" t_start = time.perf_counter() roots = self._roots + phase = self._phase cancelled = False total_created = 0 + total_enriched = 0 skipped_existing = 0 total_paths = 0 @@ -318,95 +384,58 @@ class AssetSeeder: self._log_scan_config(roots) - existing_paths: set[str] = set() - for r in roots: + # Phase 1: Fast scan (stub records) + if phase in (ScanPhase.FAST, ScanPhase.FULL): + total_created, skipped_existing, total_paths = self._run_fast_phase(roots) + if self._is_cancelled(): - logging.info("Asset scan cancelled during sync phase") - cancelled = True - return - existing_paths.update(sync_root_safely(r)) - - if self._is_cancelled(): - logging.info("Asset scan cancelled after sync phase") - cancelled = True - return - - paths = collect_paths_for_roots(roots) - total_paths = len(paths) - self._update_progress(total=total_paths) - - self._emit_event( - "assets.seed.started", - {"roots": list(roots), "total": total_paths}, - ) - - specs, tag_pool, skipped_existing = build_asset_specs( - paths, existing_paths, compute_hashes=self._compute_hashes - ) - self._update_progress(skipped=skipped_existing) - - if self._is_cancelled(): - logging.info("Asset scan cancelled after building specs") - cancelled = True - return - - batch_size = 500 - last_progress_time = time.perf_counter() - progress_interval = 1.0 - - for i in range(0, len(specs), batch_size): - if self._is_cancelled(): - logging.info( - "Asset scan cancelled after %d/%d files (created=%d)", - i, - len(specs), - total_created, - ) cancelled = True return - batch = specs[i : i + batch_size] - batch_tags = {t for spec in batch for t in spec["tags"]} - try: - created = insert_asset_specs(batch, batch_tags) - total_created += created - except Exception as e: - self._add_error(f"Batch insert failed at offset {i}: {e}") - logging.exception("Batch insert failed at offset %d", i) + self._emit_event( + "assets.seed.fast_complete", + { + "roots": list(roots), + "created": total_created, + "skipped": skipped_existing, + "total": total_paths, + }, + ) - scanned = i + len(batch) - now = time.perf_counter() - self._update_progress(scanned=scanned, created=total_created) + # Phase 2: Enrichment scan (metadata + hashes) + if phase in (ScanPhase.ENRICH, ScanPhase.FULL): + if self._is_cancelled(): + cancelled = True + return - if now - last_progress_time >= progress_interval: - self._emit_event( - "assets.seed.progress", - { - "scanned": scanned, - "total": len(specs), - "created": total_created, - }, - ) - last_progress_time = now + total_enriched = self._run_enrich_phase(roots) - self._update_progress(scanned=len(specs), created=total_created) + self._emit_event( + "assets.seed.enrich_complete", + { + "roots": list(roots), + "enriched": total_enriched, + }, + ) elapsed = time.perf_counter() - t_start logging.info( - "Asset scan(roots=%s) completed in %.3fs (created=%d, skipped=%d, total=%d)", + "Asset scan(roots=%s, phase=%s) completed in %.3fs (created=%d, enriched=%d, skipped=%d)", roots, + phase.value, elapsed, total_created, + total_enriched, skipped_existing, - len(paths), ) self._emit_event( "assets.seed.completed", { - "scanned": len(specs), + "phase": phase.value, "total": total_paths, "created": total_created, + "enriched": total_enriched, "skipped": skipped_existing, "elapsed": round(elapsed, 3), }, @@ -429,5 +458,135 @@ class AssetSeeder: with self._lock: self._state = State.IDLE + def _run_fast_phase(self, roots: tuple[RootType, ...]) -> tuple[int, int, int]: + """Run phase 1: fast scan to create stub records. + + Returns: + Tuple of (total_created, skipped_existing, total_paths) + """ + total_created = 0 + skipped_existing = 0 + + existing_paths: set[str] = set() + for r in roots: + if self._is_cancelled(): + return total_created, skipped_existing, 0 + existing_paths.update(sync_root_safely(r)) + + if self._is_cancelled(): + return total_created, skipped_existing, 0 + + paths = collect_paths_for_roots(roots) + total_paths = len(paths) + self._update_progress(total=total_paths) + + self._emit_event( + "assets.seed.started", + {"roots": list(roots), "total": total_paths, "phase": "fast"}, + ) + + # Use stub specs (no metadata extraction, no hashing) + specs, tag_pool, skipped_existing = build_stub_specs(paths, existing_paths) + self._update_progress(skipped=skipped_existing) + + if self._is_cancelled(): + return total_created, skipped_existing, total_paths + + batch_size = 500 + last_progress_time = time.perf_counter() + progress_interval = 1.0 + + for i in range(0, len(specs), batch_size): + if self._is_cancelled(): + logging.info( + "Fast scan cancelled after %d/%d files (created=%d)", + i, + len(specs), + total_created, + ) + return total_created, skipped_existing, total_paths + + batch = specs[i : i + batch_size] + batch_tags = {t for spec in batch for t in spec["tags"]} + try: + created = insert_asset_specs(batch, batch_tags) + total_created += created + except Exception as e: + self._add_error(f"Batch insert failed at offset {i}: {e}") + logging.exception("Batch insert failed at offset %d", i) + + scanned = i + len(batch) + now = time.perf_counter() + self._update_progress(scanned=scanned, created=total_created) + + if now - last_progress_time >= progress_interval: + self._emit_event( + "assets.seed.progress", + { + "phase": "fast", + "scanned": scanned, + "total": len(specs), + "created": total_created, + }, + ) + last_progress_time = now + + self._update_progress(scanned=len(specs), created=total_created) + return total_created, skipped_existing, total_paths + + def _run_enrich_phase(self, roots: tuple[RootType, ...]) -> int: + """Run phase 2: enrich existing records with metadata and hashes. + + Returns: + Total number of assets enriched + """ + total_enriched = 0 + batch_size = 100 + last_progress_time = time.perf_counter() + progress_interval = 1.0 + + # Get the target enrichment level based on compute_hashes + target_max_level = ENRICHMENT_STUB if not self._compute_hashes else ENRICHMENT_METADATA + + self._emit_event( + "assets.seed.started", + {"roots": list(roots), "phase": "enrich"}, + ) + + while True: + if self._is_cancelled(): + logging.info("Enrich scan cancelled after %d assets", total_enriched) + break + + # Fetch next batch of unenriched assets + unenriched = get_unenriched_assets_for_roots( + roots, + max_level=target_max_level, + limit=batch_size, + ) + + if not unenriched: + break + + enriched, failed = enrich_assets_batch( + unenriched, + extract_metadata=True, + compute_hash=self._compute_hashes, + ) + total_enriched += enriched + + now = time.perf_counter() + if now - last_progress_time >= progress_interval: + self._emit_event( + "assets.seed.progress", + { + "phase": "enrich", + "enriched": total_enriched, + }, + ) + last_progress_time = now + + return total_enriched + asset_seeder = AssetSeeder() diff --git a/tests-unit/seeder_test/test_seeder.py b/tests-unit/seeder_test/test_seeder.py index 9acb3f797..121896119 100644 --- a/tests-unit/seeder_test/test_seeder.py +++ b/tests-unit/seeder_test/test_seeder.py @@ -6,7 +6,7 @@ from unittest.mock import patch import pytest -from app.assets.seeder import AssetSeeder, Progress, State +from app.assets.seeder import AssetSeeder, Progress, ScanPhase, State @pytest.fixture @@ -26,8 +26,10 @@ def mock_dependencies(): patch("app.assets.seeder.dependencies_available", return_value=True), patch("app.assets.seeder.sync_root_safely", return_value=set()), patch("app.assets.seeder.collect_paths_for_roots", return_value=[]), - patch("app.assets.seeder.build_asset_specs", return_value=([], set(), 0)), + patch("app.assets.seeder.build_stub_specs", return_value=([], set(), 0)), patch("app.assets.seeder.insert_asset_specs", return_value=0), + patch("app.assets.seeder.get_unenriched_assets_for_roots", return_value=[]), + patch("app.assets.seeder.enrich_assets_batch", return_value=(0, 0)), ): yield @@ -202,6 +204,9 @@ class TestSeederCancellation: "info_name": f"file{i}", "tags": [], "fname": f"file{i}", + "metadata": None, + "hash": None, + "mime_type": None, } for i, p in enumerate(paths) ] @@ -211,9 +216,11 @@ class TestSeederCancellation: patch("app.assets.seeder.sync_root_safely", return_value=set()), patch("app.assets.seeder.collect_paths_for_roots", return_value=paths), patch( - "app.assets.seeder.build_asset_specs", return_value=(specs, set(), 0) + "app.assets.seeder.build_stub_specs", return_value=(specs, set(), 0) ), patch("app.assets.seeder.insert_asset_specs", side_effect=slow_insert), + patch("app.assets.seeder.get_unenriched_assets_for_roots", return_value=[]), + patch("app.assets.seeder.enrich_assets_batch", return_value=(0, 0)), ): fresh_seeder.start(roots=("models",)) time.sleep(0.1) @@ -237,7 +244,7 @@ class TestSeederErrorHandling: return_value=["/path/file.safetensors"], ), patch( - "app.assets.seeder.build_asset_specs", + "app.assets.seeder.build_stub_specs", return_value=( [ { @@ -247,6 +254,9 @@ class TestSeederErrorHandling: "info_name": "file", "tags": [], "fname": "file", + "metadata": None, + "hash": None, + "mime_type": None, } ], set(), @@ -257,6 +267,8 @@ class TestSeederErrorHandling: "app.assets.seeder.insert_asset_specs", side_effect=Exception("DB connection failed"), ), + patch("app.assets.seeder.get_unenriched_assets_for_roots", return_value=[]), + patch("app.assets.seeder.enrich_assets_batch", return_value=(0, 0)), ): fresh_seeder.start(roots=("models",)) fresh_seeder.wait(timeout=5.0) @@ -413,11 +425,101 @@ class TestSeederMarkMissing: patch("app.assets.seeder.mark_missing_outside_prefixes_safely", side_effect=track_mark), patch("app.assets.seeder.sync_root_safely", side_effect=track_sync), patch("app.assets.seeder.collect_paths_for_roots", return_value=[]), - patch("app.assets.seeder.build_asset_specs", return_value=([], set(), 0)), + patch("app.assets.seeder.build_stub_specs", return_value=([], set(), 0)), patch("app.assets.seeder.insert_asset_specs", return_value=0), + patch("app.assets.seeder.get_unenriched_assets_for_roots", return_value=[]), + patch("app.assets.seeder.enrich_assets_batch", return_value=(0, 0)), ): fresh_seeder.start(roots=("models",), prune_first=True) fresh_seeder.wait(timeout=5.0) assert call_order[0] == "mark_missing" assert "sync_models" in call_order + + +class TestSeederPhases: + """Test phased scanning behavior.""" + + def test_start_fast_only_runs_fast_phase(self, fresh_seeder: AssetSeeder): + """Verify start_fast only runs the fast phase.""" + fast_called = [] + enrich_called = [] + + def track_fast(*args, **kwargs): + fast_called.append(True) + return ([], set(), 0) + + def track_enrich(*args, **kwargs): + enrich_called.append(True) + return [] + + with ( + patch("app.assets.seeder.dependencies_available", return_value=True), + patch("app.assets.seeder.sync_root_safely", return_value=set()), + patch("app.assets.seeder.collect_paths_for_roots", return_value=[]), + patch("app.assets.seeder.build_stub_specs", side_effect=track_fast), + patch("app.assets.seeder.insert_asset_specs", return_value=0), + patch("app.assets.seeder.get_unenriched_assets_for_roots", side_effect=track_enrich), + patch("app.assets.seeder.enrich_assets_batch", return_value=(0, 0)), + ): + fresh_seeder.start_fast(roots=("models",)) + fresh_seeder.wait(timeout=5.0) + + assert len(fast_called) == 1 + assert len(enrich_called) == 0 + + def test_start_enrich_only_runs_enrich_phase(self, fresh_seeder: AssetSeeder): + """Verify start_enrich only runs the enrich phase.""" + fast_called = [] + enrich_called = [] + + def track_fast(*args, **kwargs): + fast_called.append(True) + return ([], set(), 0) + + def track_enrich(*args, **kwargs): + enrich_called.append(True) + return [] + + with ( + patch("app.assets.seeder.dependencies_available", return_value=True), + patch("app.assets.seeder.sync_root_safely", return_value=set()), + patch("app.assets.seeder.collect_paths_for_roots", return_value=[]), + patch("app.assets.seeder.build_stub_specs", side_effect=track_fast), + patch("app.assets.seeder.insert_asset_specs", return_value=0), + patch("app.assets.seeder.get_unenriched_assets_for_roots", side_effect=track_enrich), + patch("app.assets.seeder.enrich_assets_batch", return_value=(0, 0)), + ): + fresh_seeder.start_enrich(roots=("models",)) + fresh_seeder.wait(timeout=5.0) + + assert len(fast_called) == 0 + assert len(enrich_called) == 1 + + def test_full_scan_runs_both_phases(self, fresh_seeder: AssetSeeder): + """Verify full scan runs both fast and enrich phases.""" + fast_called = [] + enrich_called = [] + + def track_fast(*args, **kwargs): + fast_called.append(True) + return ([], set(), 0) + + def track_enrich(*args, **kwargs): + enrich_called.append(True) + return [] + + with ( + patch("app.assets.seeder.dependencies_available", return_value=True), + patch("app.assets.seeder.sync_root_safely", return_value=set()), + patch("app.assets.seeder.collect_paths_for_roots", return_value=[]), + patch("app.assets.seeder.build_stub_specs", side_effect=track_fast), + patch("app.assets.seeder.insert_asset_specs", return_value=0), + patch("app.assets.seeder.get_unenriched_assets_for_roots", side_effect=track_enrich), + patch("app.assets.seeder.enrich_assets_batch", return_value=(0, 0)), + ): + fresh_seeder.start(roots=("models",), phase=ScanPhase.FULL) + fresh_seeder.wait(timeout=5.0) + + assert len(fast_called) == 1 + assert len(enrich_called) == 1