feat: implement two-phase scanning architecture (fast + enrich)

Phase 1 (FAST): Creates stub records with filesystem metadata only
- path, size, mtime - no file content reading
- Populates asset database quickly on startup

Phase 2 (ENRICH): Extracts metadata and computes hashes
- Safetensors header parsing, MIME types
- Optional blake3 hash computation
- Updates existing stub records

Changes:
- Add ScanPhase enum (FAST, ENRICH, FULL)
- Add enrichment_level column to AssetCacheState (0=stub, 1=metadata, 2=hashed)
- Add build_stub_specs() for fast scanning without metadata extraction
- Add get_unenriched_cache_states(), enrich_asset(), enrich_assets_batch()
- Add start_fast(), start_enrich() convenience methods to AssetSeeder
- Update start() to accept phase parameter (defaults to FULL)
- Split _run_scan() into _run_fast_phase() and _run_enrich_phase()
- Add migration 0003_add_enrichment_level.py
- Update tests for new architecture

Amp-Thread-ID: https://ampcode.com/threads/T-019c4eef-1568-778f-aede-38254728f848
Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
Luke Mino-Altherr
2026-02-11 16:04:12 -08:00
parent bd17ee3dc9
commit c7368205e3
7 changed files with 675 additions and 74 deletions

View File

@@ -25,7 +25,9 @@ from app.assets.database.queries.asset_info import (
)
from app.assets.database.queries.cache_state import (
CacheStateRow,
UnenrichedAssetRow,
bulk_insert_cache_states_ignore_conflicts,
bulk_update_enrichment_level,
bulk_update_is_missing,
bulk_update_needs_verify,
delete_assets_by_ids,
@@ -33,10 +35,12 @@ from app.assets.database.queries.cache_state import (
delete_orphaned_seed_asset,
get_cache_states_by_paths_and_asset_ids,
get_cache_states_for_prefixes,
get_unenriched_cache_states,
get_unreferenced_unhashed_asset_ids,
list_cache_states_by_asset_id,
mark_cache_states_missing_outside_prefixes,
restore_cache_states_by_paths,
update_enrichment_level,
upsert_cache_state,
)
from app.assets.database.queries.tags import (
@@ -59,6 +63,7 @@ __all__ = [
"CacheStateRow",
"RemoveTagsDict",
"SetTagsDict",
"UnenrichedAssetRow",
"add_missing_tag_for_asset_id",
"add_tags_to_asset_info",
"asset_exists_by_hash",
@@ -67,6 +72,7 @@ __all__ = [
"bulk_insert_assets",
"bulk_insert_cache_states_ignore_conflicts",
"bulk_insert_tags_and_meta",
"bulk_update_enrichment_level",
"bulk_update_is_missing",
"bulk_update_needs_verify",
"delete_asset_info_by_id",
@@ -84,6 +90,7 @@ __all__ = [
"get_cache_states_by_paths_and_asset_ids",
"get_cache_states_for_prefixes",
"get_or_create_asset_info",
"get_unenriched_cache_states",
"get_unreferenced_unhashed_asset_ids",
"insert_asset_info",
"list_asset_infos_page",
@@ -100,6 +107,7 @@ __all__ = [
"update_asset_info_name",
"update_asset_info_timestamps",
"update_asset_info_updated_at",
"update_enrichment_level",
"upsert_asset",
"upsert_cache_state",
]

View File

@@ -302,6 +302,106 @@ def delete_orphaned_seed_asset(session: Session, asset_id: str) -> bool:
return False
class UnenrichedAssetRow(NamedTuple):
"""Row for assets needing enrichment."""
cache_state_id: int
asset_id: str
asset_info_id: str
file_path: str
enrichment_level: int
def get_unenriched_cache_states(
session: Session,
prefixes: list[str],
max_level: int = 0,
limit: int = 1000,
) -> list[UnenrichedAssetRow]:
"""Get cache states that need enrichment (enrichment_level <= max_level).
Args:
session: Database session
prefixes: List of absolute directory prefixes to scan
max_level: Maximum enrichment level to include (0=stubs, 1=metadata done)
limit: Maximum number of rows to return
Returns:
List of unenriched asset rows with file paths
"""
if not prefixes:
return []
conds = []
for p in prefixes:
base = os.path.abspath(p)
if not base.endswith(os.sep):
base += os.sep
escaped, esc = escape_sql_like_string(base)
conds.append(AssetCacheState.file_path.like(escaped + "%", escape=esc))
query = (
sa.select(
AssetCacheState.id,
AssetCacheState.asset_id,
AssetInfo.id,
AssetCacheState.file_path,
AssetCacheState.enrichment_level,
)
.join(Asset, Asset.id == AssetCacheState.asset_id)
.join(AssetInfo, AssetInfo.asset_id == Asset.id)
.where(sa.or_(*conds))
.where(AssetCacheState.is_missing == False) # noqa: E712
.where(AssetCacheState.enrichment_level <= max_level)
.order_by(AssetCacheState.id.asc())
.limit(limit)
)
rows = session.execute(query).all()
return [
UnenrichedAssetRow(
cache_state_id=row[0],
asset_id=row[1],
asset_info_id=row[2],
file_path=row[3],
enrichment_level=row[4],
)
for row in rows
]
def update_enrichment_level(
session: Session,
cache_state_id: int,
level: int,
) -> None:
"""Update the enrichment level for a cache state."""
session.execute(
sa.update(AssetCacheState)
.where(AssetCacheState.id == cache_state_id)
.values(enrichment_level=level)
)
def bulk_update_enrichment_level(
session: Session,
cache_state_ids: list[int],
level: int,
) -> int:
"""Update enrichment level for multiple cache states.
Returns: Number of rows updated
"""
if not cache_state_ids:
return 0
result = session.execute(
sa.update(AssetCacheState)
.where(AssetCacheState.id.in_(cache_state_ids))
.values(enrichment_level=level)
)
return result.rowcount
def bulk_insert_cache_states_ignore_conflicts(
session: Session,
rows: list[dict],