From 7519a556df5ba1f618375e8528d47a65ead6a962 Mon Sep 17 00:00:00 2001 From: Luke Mino-Altherr Date: Fri, 6 Feb 2026 19:22:56 -0800 Subject: [PATCH] Add optional blake3 hashing during asset scanning - Make blake3 import lazy in hashing.py (only imported when needed) - Add compute_hashes parameter to AssetSeeder.start(), build_asset_specs(), and seed_assets() - Fix missing tag clearing: include is_missing states in sync when update_missing_tags=True - Clear is_missing flag on cache states when files are restored with matching mtime/size - Fix validation error serialization in routes.py (use json.loads(ve.json())) Amp-Thread-ID: https://ampcode.com/threads/T-019c3614-56d4-74a8-a717-19922d6dbbee Co-authored-by: Amp --- app/assets/api/routes.py | 25 ++++------- app/assets/api/schemas_in.py | 6 ++- app/assets/api/upload.py | 10 +++-- app/assets/database/queries/asset.py | 2 +- app/assets/database/queries/common.py | 4 +- app/assets/scanner.py | 56 +++++++++++++++++++++---- app/assets/seeder.py | 35 ++++++++++------ app/assets/services/asset_management.py | 4 +- app/assets/services/bulk_ingest.py | 3 +- app/assets/services/file_utils.py | 7 ++-- app/assets/services/hashing.py | 19 +++++++-- 11 files changed, 119 insertions(+), 52 deletions(-) diff --git a/app/assets/api/routes.py b/app/assets/api/routes.py index 74364d445..389f4280b 100644 --- a/app/assets/api/routes.py +++ b/app/assets/api/routes.py @@ -79,7 +79,9 @@ def _build_error_response( def _build_validation_error_response(code: str, ve: ValidationError) -> web.Response: - return _build_error_response(400, code, "Validation failed.", {"errors": ve.json()}) + import json + errors = json.loads(ve.json()) + return _build_error_response(400, code, "Validation failed.", {"errors": errors}) def _validate_sort_field(requested: str | None) -> str: @@ -123,11 +125,8 @@ async def list_assets_route(request: web.Request) -> web.Response: return _build_validation_error_response("INVALID_QUERY", ve) sort = _validate_sort_field(q.sort) - order = ( - "desc" - if (q.order or "desc").lower() not in {"asc", "desc"} - else q.order.lower() - ) + order_candidate = (q.order or "desc").lower() + order = order_candidate if order_candidate in {"asc", "desc"} else "desc" result = list_assets_page( owner_id=USER_MANAGER.get_request_user_id(request), @@ -233,7 +232,7 @@ async def download_asset_content(request: web.Request) -> web.Response: ) quoted = (filename or "").replace("\r", "").replace("\n", "").replace('"', "'") - cd = f"{disposition}; filename=\"{quoted}\"; filename*=UTF-8''{urllib.parse.quote(filename)}" + cd = f"{disposition}; filename=\"{quoted}\"; filename*=UTF-8''{urllib.parse.quote(quoted)}" file_size = os.path.getsize(abs_path) logging.info( @@ -490,15 +489,9 @@ async def get_tags(request: web.Request) -> web.Response: try: query = schemas_in.TagsListQuery.model_validate(query_map) except ValidationError as e: - return web.json_response( - { - "error": { - "code": "INVALID_QUERY", - "message": "Invalid query parameters", - "details": e.errors(), - } - }, - status=400, + import json + return _build_error_response( + 400, "INVALID_QUERY", "Invalid query parameters", {"errors": json.loads(e.json())} ) rows, total = list_tags( diff --git a/app/assets/api/schemas_in.py b/app/assets/api/schemas_in.py index 081918757..ad649463f 100644 --- a/app/assets/api/schemas_in.py +++ b/app/assets/api/schemas_in.py @@ -28,6 +28,7 @@ class AssetValidationError(Exception): def __init__(self, code: str, message: str): super().__init__(message) self.code = code + 
self.message = message class AssetNotFoundError(Exception): @@ -35,12 +36,15 @@ class AssetNotFoundError(Exception): def __init__(self, message: str): super().__init__(message) + self.message = message class HashMismatchError(Exception): """Uploaded file hash does not match provided hash.""" - pass + def __init__(self, message: str): + super().__init__(message) + self.message = message class DependencyMissingError(Exception): diff --git a/app/assets/api/upload.py b/app/assets/api/upload.py index 7f90cec78..ac0919374 100644 --- a/app/assets/api/upload.py +++ b/app/assets/api/upload.py @@ -1,3 +1,4 @@ +import logging import os import uuid from typing import Callable @@ -83,7 +84,10 @@ async def parse_multipart_upload( provided_hash = normalize_and_validate_hash(s) try: provided_hash_exists = check_hash_exists(provided_hash) - except Exception: + except Exception as e: + logging.warning( + "check_hash_exists failed for hash=%s: %s", provided_hash, e + ) provided_hash_exists = None # do not fail the whole request here elif fname == "file": @@ -162,5 +166,5 @@ def delete_temp_file_if_exists(tmp_path: str | None) -> None: try: if os.path.exists(tmp_path): os.remove(tmp_path) - except Exception: - pass + except OSError as e: + logging.debug("Failed to delete temp file %s: %s", tmp_path, e) diff --git a/app/assets/database/queries/asset.py b/app/assets/database/queries/asset.py index 6913fb501..cba937b1e 100644 --- a/app/assets/database/queries/asset.py +++ b/app/assets/database/queries/asset.py @@ -85,6 +85,6 @@ def bulk_insert_assets( """Bulk insert Asset rows. Each dict should have: id, hash, size_bytes, mime_type, created_at.""" if not rows: return - ins = sqlite.insert(Asset) + ins = sqlite.insert(Asset).on_conflict_do_nothing(index_elements=[Asset.hash]) for chunk in iter_chunks(rows, calculate_rows_per_statement(5)): session.execute(ins, chunk) diff --git a/app/assets/database/queries/common.py b/app/assets/database/queries/common.py index 4086cea56..bb3b480fa 100644 --- a/app/assets/database/queries/common.py +++ b/app/assets/database/queries/common.py @@ -23,8 +23,8 @@ def iter_chunks(seq, n: int): def iter_row_chunks(rows: list[dict], cols_per_row: int) -> Iterable[list[dict]]: """Yield chunks of rows sized to fit within bind param limits.""" if not rows: - return [] - rows_per_stmt = max(1, MAX_BIND_PARAMS // max(1, cols_per_row)) + return + rows_per_stmt = calculate_rows_per_statement(cols_per_row) for i in range(0, len(rows), rows_per_stmt): yield rows[i : i + rows_per_stmt] diff --git a/app/assets/scanner.py b/app/assets/scanner.py index 201967286..2a56c6216 100644 --- a/app/assets/scanner.py +++ b/app/assets/scanner.py @@ -25,6 +25,7 @@ from app.assets.services.file_utils import ( list_files_recursively, verify_file_unchanged, ) +from app.assets.services.hashing import compute_blake3_hash from app.assets.services.metadata_extract import extract_file_metadata from app.assets.services.path_utils import ( compute_relative_filename, @@ -84,7 +85,7 @@ def collect_models_files() -> list[str]: allowed = False for b in bases: base_abs = os.path.abspath(b) - with contextlib.suppress(Exception): + with contextlib.suppress(ValueError): if os.path.commonpath([abs_path, base_abs]) == base_abs: allowed = True break @@ -120,7 +121,9 @@ def sync_cache_states_with_filesystem( if not prefixes: return set() if collect_existing_paths else None - rows = get_cache_states_for_prefixes(session, prefixes) + rows = get_cache_states_for_prefixes( + session, prefixes, include_missing=update_missing_tags + ) 
by_asset: dict[str, _AssetAccumulator] = {} for row in rows: @@ -139,8 +142,12 @@ def sync_cache_states_with_filesystem( ) except FileNotFoundError: exists = False - except OSError: + except PermissionError: + exists = True + logging.debug("Permission denied accessing %s", row.file_path) + except OSError as e: exists = False + logging.debug("OSError checking %s: %s", row.file_path, e) acc["states"].append( { @@ -156,6 +163,7 @@ def sync_cache_states_with_filesystem( to_clear_verify: list[int] = [] stale_state_ids: list[int] = [] to_mark_missing: list[int] = [] + to_clear_missing: list[int] = [] survivors: set[str] = set() for aid, acc in by_asset.items(): @@ -168,8 +176,10 @@ def sync_cache_states_with_filesystem( if not s["exists"]: to_mark_missing.append(s["sid"]) continue - if s["fast_ok"] and s["needs_verify"]: - to_clear_verify.append(s["sid"]) + if s["fast_ok"]: + to_clear_missing.append(s["sid"]) + if s["needs_verify"]: + to_clear_verify.append(s["sid"]) if not s["fast_ok"] and not s["needs_verify"]: to_set_verify.append(s["sid"]) @@ -187,11 +197,15 @@ def sync_cache_states_with_filesystem( if not s["exists"]: stale_state_ids.append(s["sid"]) if update_missing_tags: - with contextlib.suppress(Exception): + try: remove_missing_tag_for_asset_id(session, asset_id=aid) + except Exception as e: + logging.warning("Failed to remove missing tag for asset %s: %s", aid, e) elif update_missing_tags: - with contextlib.suppress(Exception): + try: add_missing_tag_for_asset_id(session, asset_id=aid, origin="automatic") + except Exception as e: + logging.warning("Failed to add missing tag for asset %s: %s", aid, e) for s in states: if s["exists"]: @@ -201,6 +215,7 @@ def sync_cache_states_with_filesystem( stale_set = set(stale_state_ids) to_mark_missing = [sid for sid in to_mark_missing if sid not in stale_set] bulk_update_is_missing(session, to_mark_missing, value=True) + bulk_update_is_missing(session, to_clear_missing, value=False) bulk_update_needs_verify(session, to_set_verify, value=True) bulk_update_needs_verify(session, to_clear_verify, value=False) @@ -258,6 +273,7 @@ def build_asset_specs( paths: list[str], existing_paths: set[str], enable_metadata_extraction: bool = True, + compute_hashes: bool = False, ) -> tuple[list[SeedAssetSpec], set[str], int]: """Build asset specs from paths, returning (specs, tag_pool, skipped_count). 
@@ -265,6 +281,7 @@ def build_asset_specs( paths: List of file paths to process existing_paths: Set of paths that already exist in the database enable_metadata_extraction: If True, extract tier 1 & 2 metadata from files + compute_hashes: If True, compute blake3 hashes for each file (slow for large files) """ specs: list[SeedAssetSpec] = [] tag_pool: set[str] = set() @@ -294,6 +311,15 @@ def build_asset_specs( relative_filename=rel_fname, ) + # Compute hash if requested + asset_hash: str | None = None + if compute_hashes: + try: + digest = compute_blake3_hash(abs_p) + asset_hash = "blake3:" + digest + except Exception as e: + logging.warning("Failed to hash %s: %s", abs_p, e) + specs.append( { "abs_path": abs_p, @@ -303,6 +329,7 @@ def build_asset_specs( "tags": tags, "fname": rel_fname, "metadata": metadata, + "hash": asset_hash, } ) tag_pool.update(tags) @@ -322,9 +349,18 @@ def insert_asset_specs(specs: list[SeedAssetSpec], tag_pool: set[str]) -> int: return result.inserted_infos -def seed_assets(roots: tuple[RootType, ...], enable_logging: bool = False) -> None: +def seed_assets( + roots: tuple[RootType, ...], + enable_logging: bool = False, + compute_hashes: bool = False, +) -> None: """Scan the given roots and seed the assets into the database. + Args: + roots: Tuple of root types to scan (models, input, output) + enable_logging: If True, log progress and completion messages + compute_hashes: If True, compute blake3 hashes for each file (slow for large files) + Note: This function does not mark missing assets. Call mark_missing_outside_prefixes_safely separately if cleanup is needed. """ @@ -340,7 +376,9 @@ def seed_assets(roots: tuple[RootType, ...], enable_logging: bool = False) -> No existing_paths.update(sync_root_safely(r)) paths = collect_paths_for_roots(roots) - specs, tag_pool, skipped_existing = build_asset_specs(paths, existing_paths) + specs, tag_pool, skipped_existing = build_asset_specs( + paths, existing_paths, compute_hashes=compute_hashes + ) created = insert_asset_specs(specs, tag_pool) if enable_logging: diff --git a/app/assets/seeder.py b/app/assets/seeder.py index bb9934acd..a6594615f 100644 --- a/app/assets/seeder.py +++ b/app/assets/seeder.py @@ -82,6 +82,7 @@ class AssetSeeder: self._thread: threading.Thread | None = None self._cancel_event = threading.Event() self._roots: tuple[RootType, ...] = () + self._compute_hashes: bool = False self._progress_callback: ProgressCallback | None = None def start( @@ -89,6 +90,7 @@ class AssetSeeder: roots: tuple[RootType, ...] = ("models", "input", "output"), progress_callback: ProgressCallback | None = None, prune_first: bool = False, + compute_hashes: bool = False, ) -> bool: """Start a background scan for the given roots. 
@@ -96,6 +98,7 @@ class AssetSeeder: roots: Tuple of root types to scan (models, input, output) progress_callback: Optional callback called with progress updates prune_first: If True, prune orphaned assets before scanning + compute_hashes: If True, compute blake3 hashes for each file (slow for large files) Returns: True if scan was started, False if already running @@ -108,6 +111,7 @@ class AssetSeeder: self._errors = [] self._roots = roots self._prune_first = prune_first + self._compute_hashes = compute_hashes self._progress_callback = progress_callback self._cancel_event.clear() self._thread = threading.Thread( @@ -237,6 +241,9 @@ class AssetSeeder: skipped: int | None = None, ) -> None: """Update progress counters (thread-safe).""" + callback: ProgressCallback | None = None + progress: Progress | None = None + with self._lock: if self._progress is None: return @@ -249,17 +256,19 @@ class AssetSeeder: if skipped is not None: self._progress.skipped = skipped if self._progress_callback: - try: - self._progress_callback( - Progress( - scanned=self._progress.scanned, - total=self._progress.total, - created=self._progress.created, - skipped=self._progress.skipped, - ) - ) - except Exception: - pass + callback = self._progress_callback + progress = Progress( + scanned=self._progress.scanned, + total=self._progress.total, + created=self._progress.created, + skipped=self._progress.skipped, + ) + + if callback and progress: + try: + callback(progress) + except Exception: + pass def _add_error(self, message: str) -> None: """Add an error message (thread-safe).""" @@ -334,7 +343,9 @@ class AssetSeeder: {"roots": list(roots), "total": total_paths}, ) - specs, tag_pool, skipped_existing = build_asset_specs(paths, existing_paths) + specs, tag_pool, skipped_existing = build_asset_specs( + paths, existing_paths, compute_hashes=self._compute_hashes + ) self._update_progress(skipped=skipped_existing) if self._is_cancelled(): diff --git a/app/assets/services/asset_management.py b/app/assets/services/asset_management.py index 3925bb0b8..e65f49df0 100644 --- a/app/assets/services/asset_management.py +++ b/app/assets/services/asset_management.py @@ -272,7 +272,9 @@ def resolve_asset_for_download( states = list_cache_states_by_asset_id(session, asset_id=asset.id) abs_path = select_best_live_path(states) if not abs_path: - raise FileNotFoundError + raise FileNotFoundError( + f"No live path for AssetInfo {asset_info_id} (asset id={asset.id}, name={info.name})" + ) update_asset_info_access_time(session, asset_info_id=asset_info_id) session.commit() diff --git a/app/assets/services/bulk_ingest.py b/app/assets/services/bulk_ingest.py index 88f48c724..ba7872488 100644 --- a/app/assets/services/bulk_ingest.py +++ b/app/assets/services/bulk_ingest.py @@ -36,6 +36,7 @@ class SeedAssetSpec(TypedDict): tags: list[str] fname: str metadata: ExtractedMetadata | None + hash: str | None class AssetRow(TypedDict): @@ -163,7 +164,7 @@ def batch_insert_seed_assets( asset_rows.append( { "id": asset_id, - "hash": None, + "hash": spec.get("hash"), "size_bytes": spec["size_bytes"], "mime_type": None, "created_at": current_time, diff --git a/app/assets/services/file_utils.py b/app/assets/services/file_utils.py index 97bb2ec73..f54ea3a34 100644 --- a/app/assets/services/file_utils.py +++ b/app/assets/services/file_utils.py @@ -23,15 +23,16 @@ def verify_file_unchanged( Returns True if the file's mtime and size match the database values. Returns False if mtime_db is None or values don't match. 
+
+    size_db=None means don't check size; 0 is a valid recorded size.
     """
     if mtime_db is None:
         return False
     actual_mtime_ns = get_mtime_ns(stat_result)
     if int(mtime_db) != int(actual_mtime_ns):
         return False
-    sz = int(size_db or 0)
-    if sz > 0:
-        return int(stat_result.st_size) == sz
+    if size_db is not None:
+        return int(stat_result.st_size) == int(size_db)
     return True
 
 
diff --git a/app/assets/services/hashing.py b/app/assets/services/hashing.py
index 38aeae4d7..c77f2f916 100644
--- a/app/assets/services/hashing.py
+++ b/app/assets/services/hashing.py
@@ -2,10 +2,23 @@ import asyncio
 import os
 from typing import IO
 
-from blake3 import blake3
-
 DEFAULT_CHUNK = 8 * 1024 * 1024
 
+_blake3 = None
+
+
+def _get_blake3():
+    global _blake3
+    if _blake3 is None:
+        try:
+            from blake3 import blake3 as _b3
+            _blake3 = _b3
+        except ImportError as e:
+            raise ImportError(
+                "blake3 is required for asset hashing. Install with: pip install blake3"
+            ) from e
+    return _blake3
+
 
 def compute_blake3_hash(
     fp: str | IO[bytes],
@@ -42,7 +55,7 @@ def _hash_file_obj(file_obj: IO, chunk_size: int = DEFAULT_CHUNK) -> str:
     if orig_pos != 0:
         file_obj.seek(0)
 
-    h = blake3()
+    h = _get_blake3()()
     while True:
         chunk = file_obj.read(chunk_size)
        if not chunk:
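
A minimal usage sketch for the new compute_hashes flag and the lazy blake3
import, based only on the signatures introduced in this patch. The bare
AssetSeeder() construction and the example file path are assumptions, not
taken from the codebase.

    # Hypothetical usage of the APIs touched by this patch.
    from app.assets.scanner import seed_assets
    from app.assets.seeder import AssetSeeder
    from app.assets.services.hashing import compute_blake3_hash

    # Synchronous scan: hashing each discovered file adds I/O cost,
    # so it is opt-in (slow for large files, per the docstrings).
    seed_assets(roots=("models",), enable_logging=True, compute_hashes=True)

    # Background scan: the flag is stored on the seeder and forwarded
    # to build_asset_specs() on the worker thread.
    seeder = AssetSeeder()  # assumed: no required constructor arguments
    seeder.start(roots=("models", "input"), compute_hashes=True)

    # Direct hashing, as scanner.build_asset_specs() does internally;
    # raises ImportError with install guidance if blake3 is missing.
    digest = compute_blake3_hash("models/example.safetensors")  # illustrative path
    print("blake3:" + digest)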