From 32011c403bb79b7eab67ced208114fc5f87e0be7 Mon Sep 17 00:00:00 2001
From: Luke Mino-Altherr
Date: Thu, 5 Feb 2026 21:32:01 -0800
Subject: [PATCH] refactor(bulk_ingest): improve variable naming and add typed dicts

- Rename shorthand variables to explicit names (sp -> spec, aid -> asset_id, etc.)
- Move imports to top of file
- Add TypedDict definitions for AssetRow, CacheStateRow, AssetInfoRow,
  AssetInfoRowInternal, TagRow, and MetadataRow
- Replace bare dict types with typed alternatives

Amp-Thread-ID: https://ampcode.com/threads/T-019c316d-13f7-77f8-b92b-ea7276c3e09c
Co-authored-by: Amp
---
 app/assets/services/bulk_ingest.py | 253 +++++++++++++++++++----------
 1 file changed, 163 insertions(+), 90 deletions(-)

diff --git a/app/assets/services/bulk_ingest.py b/app/assets/services/bulk_ingest.py
index ad7a12eb2..87b492aea 100644
--- a/app/assets/services/bulk_ingest.py
+++ b/app/assets/services/bulk_ingest.py
@@ -3,10 +3,24 @@ from __future__ import annotations
 import os
 import uuid
 from dataclasses import dataclass
+from datetime import datetime
 from typing import TYPE_CHECKING, Any, TypedDict
 
 from sqlalchemy.orm import Session
 
+from app.assets.database.queries import (
+    bulk_insert_asset_infos_ignore_conflicts,
+    bulk_insert_assets,
+    bulk_insert_cache_states_ignore_conflicts,
+    bulk_insert_tags_and_meta,
+    delete_assets_by_ids,
+    get_asset_info_ids_by_ids,
+    get_cache_states_by_paths_and_asset_ids,
+    get_unreferenced_unhashed_asset_ids,
+    mark_cache_states_missing_outside_prefixes,
+)
+from app.assets.helpers import get_utc_now
+
 if TYPE_CHECKING:
     from app.assets.services.metadata_extract import ExtractedMetadata
 
@@ -22,18 +36,75 @@ class SeedAssetSpec(TypedDict):
     fname: str
     metadata: ExtractedMetadata | None
 
-from app.assets.database.queries import (
-    bulk_insert_asset_infos_ignore_conflicts,
-    bulk_insert_assets,
-    bulk_insert_cache_states_ignore_conflicts,
-    bulk_insert_tags_and_meta,
-    delete_assets_by_ids,
-    get_asset_info_ids_by_ids,
-    get_cache_states_by_paths_and_asset_ids,
-    get_unreferenced_unhashed_asset_ids,
-    mark_cache_states_missing_outside_prefixes,
-)
-from app.assets.helpers import get_utc_now
+
+class AssetRow(TypedDict):
+    """Row data for inserting an Asset."""
+
+    id: str
+    hash: str | None
+    size_bytes: int
+    mime_type: str | None
+    created_at: datetime
+
+
+class CacheStateRow(TypedDict):
+    """Row data for inserting a CacheState."""
+
+    asset_id: str
+    file_path: str
+    mtime_ns: int
+
+
+class AssetInfoRow(TypedDict):
+    """Row data for inserting an AssetInfo."""
+
+    id: str
+    owner_id: str
+    name: str
+    asset_id: str
+    preview_id: str | None
+    user_metadata: dict[str, Any] | None
+    created_at: datetime
+    updated_at: datetime
+    last_access_time: datetime
+
+
+class AssetInfoRowInternal(TypedDict):
+    """Internal row data for AssetInfo with extra tracking fields."""
+
+    id: str
+    owner_id: str
+    name: str
+    asset_id: str
+    preview_id: str | None
+    user_metadata: dict[str, Any] | None
+    created_at: datetime
+    updated_at: datetime
+    last_access_time: datetime
+    _tags: list[str]
+    _filename: str
+    _extracted_metadata: ExtractedMetadata | None
+
+
+class TagRow(TypedDict):
+    """Row data for inserting a Tag."""
+
+    asset_info_id: str
+    tag_name: str
+    origin: str
+    added_at: datetime
+
+
+class MetadataRow(TypedDict):
+    """Row data for inserting asset metadata."""
+
+    asset_info_id: str
+    key: str
+    ordinal: int
+    val_str: str | None
+    val_num: float | None
+    val_bool: bool | None
+    val_json: dict[str, Any] | None
 
 
 @dataclass
@@ -74,139 +145,141 @@ def batch_insert_seed_assets(
     if not specs:
         return BulkInsertResult(inserted_infos=0, won_states=0, lost_states=0)
 
-    now = get_utc_now()
-    asset_rows: list[dict] = []
-    state_rows: list[dict] = []
-    path_to_asset: dict[str, str] = {}
-    asset_to_info: dict[str, dict] = {}
-    path_list: list[str] = []
+    current_time = get_utc_now()
+    asset_rows: list[AssetRow] = []
+    cache_state_rows: list[CacheStateRow] = []
+    path_to_asset_id: dict[str, str] = {}
+    asset_id_to_info: dict[str, AssetInfoRowInternal] = {}
+    absolute_path_list: list[str] = []
 
-    for sp in specs:
-        ap = os.path.abspath(sp["abs_path"])
-        aid = str(uuid.uuid4())
-        iid = str(uuid.uuid4())
-        path_list.append(ap)
-        path_to_asset[ap] = aid
+    for spec in specs:
+        absolute_path = os.path.abspath(spec["abs_path"])
+        asset_id = str(uuid.uuid4())
+        asset_info_id = str(uuid.uuid4())
+        absolute_path_list.append(absolute_path)
+        path_to_asset_id[absolute_path] = asset_id
 
         asset_rows.append(
             {
-                "id": aid,
+                "id": asset_id,
                 "hash": None,
-                "size_bytes": sp["size_bytes"],
+                "size_bytes": spec["size_bytes"],
                 "mime_type": None,
-                "created_at": now,
+                "created_at": current_time,
             }
         )
-        state_rows.append(
+        cache_state_rows.append(
             {
-                "asset_id": aid,
-                "file_path": ap,
-                "mtime_ns": sp["mtime_ns"],
+                "asset_id": asset_id,
+                "file_path": absolute_path,
+                "mtime_ns": spec["mtime_ns"],
             }
         )
 
         # Build user_metadata from extracted metadata or fallback to filename
-        extracted = sp.get("metadata")
-        if extracted:
-            user_metadata: dict[str, Any] | None = extracted.to_user_metadata()
-        elif sp["fname"]:
-            user_metadata = {"filename": sp["fname"]}
+        extracted_metadata = spec.get("metadata")
+        if extracted_metadata:
+            user_metadata: dict[str, Any] | None = extracted_metadata.to_user_metadata()
+        elif spec["fname"]:
+            user_metadata = {"filename": spec["fname"]}
         else:
             user_metadata = None
 
-        asset_to_info[aid] = {
-            "id": iid,
+        asset_id_to_info[asset_id] = {
+            "id": asset_info_id,
             "owner_id": owner_id,
-            "name": sp["info_name"],
-            "asset_id": aid,
+            "name": spec["info_name"],
+            "asset_id": asset_id,
             "preview_id": None,
             "user_metadata": user_metadata,
-            "created_at": now,
-            "updated_at": now,
-            "last_access_time": now,
-            "_tags": sp["tags"],
-            "_filename": sp["fname"],
-            "_extracted_metadata": extracted,
+            "created_at": current_time,
+            "updated_at": current_time,
+            "last_access_time": current_time,
+            "_tags": spec["tags"],
+            "_filename": spec["fname"],
+            "_extracted_metadata": extracted_metadata,
         }
 
     bulk_insert_assets(session, asset_rows)
-    bulk_insert_cache_states_ignore_conflicts(session, state_rows)
-    winners_by_path = get_cache_states_by_paths_and_asset_ids(session, path_to_asset)
+    bulk_insert_cache_states_ignore_conflicts(session, cache_state_rows)
+    winning_paths = get_cache_states_by_paths_and_asset_ids(session, path_to_asset_id)
 
-    all_paths_set = set(path_list)
-    losers_by_path = all_paths_set - winners_by_path
-    lost_assets = [path_to_asset[p] for p in losers_by_path]
+    all_paths_set = set(absolute_path_list)
+    losing_paths = all_paths_set - winning_paths
+    lost_asset_ids = [path_to_asset_id[path] for path in losing_paths]
 
-    if lost_assets:
-        delete_assets_by_ids(session, lost_assets)
+    if lost_asset_ids:
+        delete_assets_by_ids(session, lost_asset_ids)
 
-    if not winners_by_path:
+    if not winning_paths:
         return BulkInsertResult(
             inserted_infos=0,
             won_states=0,
-            lost_states=len(losers_by_path),
+            lost_states=len(losing_paths),
         )
 
-    winner_info_rows = [asset_to_info[path_to_asset[p]] for p in winners_by_path]
-    db_info_rows = [
-        {
-            "id": row["id"],
-            "owner_id": row["owner_id"],
-            "name": row["name"],
-            "asset_id": row["asset_id"],
-            "preview_id": row["preview_id"],
-            "user_metadata": row["user_metadata"],
-            "created_at": row["created_at"],
-            "updated_at": row["updated_at"],
-            "last_access_time": row["last_access_time"],
-        }
-        for row in winner_info_rows
+    winner_info_rows = [
+        asset_id_to_info[path_to_asset_id[path]] for path in winning_paths
     ]
-    bulk_insert_asset_infos_ignore_conflicts(session, db_info_rows)
+    database_info_rows: list[AssetInfoRow] = [
+        {
+            "id": info_row["id"],
+            "owner_id": info_row["owner_id"],
+            "name": info_row["name"],
+            "asset_id": info_row["asset_id"],
+            "preview_id": info_row["preview_id"],
+            "user_metadata": info_row["user_metadata"],
+            "created_at": info_row["created_at"],
+            "updated_at": info_row["updated_at"],
+            "last_access_time": info_row["last_access_time"],
+        }
+        for info_row in winner_info_rows
+    ]
+    bulk_insert_asset_infos_ignore_conflicts(session, database_info_rows)
 
-    all_info_ids = [row["id"] for row in winner_info_rows]
+    all_info_ids = [info_row["id"] for info_row in winner_info_rows]
     inserted_info_ids = get_asset_info_ids_by_ids(session, all_info_ids)
 
-    tag_rows: list[dict] = []
-    meta_rows: list[dict] = []
+    tag_rows: list[TagRow] = []
+    metadata_rows: list[MetadataRow] = []
     if inserted_info_ids:
-        for row in winner_info_rows:
-            iid = row["id"]
-            if iid not in inserted_info_ids:
+        for info_row in winner_info_rows:
+            info_id = info_row["id"]
+            if info_id not in inserted_info_ids:
                 continue
-            for t in row["_tags"]:
+            for tag in info_row["_tags"]:
                 tag_rows.append(
                     {
-                        "asset_info_id": iid,
-                        "tag_name": t,
+                        "asset_info_id": info_id,
+                        "tag_name": tag,
                         "origin": "automatic",
-                        "added_at": now,
+                        "added_at": current_time,
                     }
                 )
             # Use extracted metadata for meta rows if available
-            extracted = row.get("_extracted_metadata")
-            if extracted:
-                meta_rows.extend(extracted.to_meta_rows(iid))
-            elif row["_filename"]:
+            extracted_metadata = info_row.get("_extracted_metadata")
+            if extracted_metadata:
+                metadata_rows.extend(extracted_metadata.to_meta_rows(info_id))
+            elif info_row["_filename"]:
                 # Fallback: just store filename
-                meta_rows.append(
+                metadata_rows.append(
                     {
-                        "asset_info_id": iid,
+                        "asset_info_id": info_id,
                         "key": "filename",
                         "ordinal": 0,
-                        "val_str": row["_filename"],
+                        "val_str": info_row["_filename"],
                         "val_num": None,
                         "val_bool": None,
                         "val_json": None,
                     }
                 )
-    bulk_insert_tags_and_meta(session, tag_rows=tag_rows, meta_rows=meta_rows)
+    bulk_insert_tags_and_meta(session, tag_rows=tag_rows, meta_rows=metadata_rows)
 
     return BulkInsertResult(
         inserted_infos=len(inserted_info_ids),
-        won_states=len(winners_by_path),
-        lost_states=len(losers_by_path),
+        won_states=len(winning_paths),
+        lost_states=len(losing_paths),
     )