refactor(assets): defer hashing to background seeder enrich phase

ingest_existing_file() now only inserts a stub record (hash=NULL) for
instant UX visibility. After registering outputs, prompt_worker() triggers
asset_seeder.start_enrich() to compute hashes in the background.
This avoids blocking the prompt worker thread on hash computation.

Amp-Thread-ID: https://ampcode.com/threads/T-019cc013-1444-73c8-81d6-07cae6e5e38d
Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
Luke Mino-Altherr
2026-03-06 15:44:17 -08:00
parent 5ac207e846
commit 4c1d87ed11
2 changed files with 7 additions and 25 deletions

View File

@@ -134,21 +134,19 @@ def ingest_existing_file(
abs_path: str,
user_metadata: UserMetadata = None,
extra_tags: Sequence[str] = (),
tag_origin: str = "automatic",
owner_id: str = "",
) -> IngestResult:
"""Register an existing on-disk file as an asset.
) -> None:
"""Register an existing on-disk file as an asset stub.
Uses a two-phase approach: first inserts a stub record (hash=NULL) to
unblock UX immediately, then computes the BLAKE3 hash and updates the
asset with the full ingest pipeline (dedup, metadata, tags).
Inserts a stub record (hash=NULL) for immediate UX visibility.
The caller is responsible for triggering background enrichment
(hash computation, metadata extraction) via the asset seeder.
"""
size_bytes, mtime_ns = get_size_and_mtime_ns(abs_path)
mime_type = mimetypes.guess_type(abs_path, strict=False)[0]
name, path_tags = get_name_and_tags_from_asset_path(abs_path)
tags = list(dict.fromkeys(path_tags + list(extra_tags)))
# Phase 1: fast stub insert (hash=NULL) to make the asset visible immediately
spec = {
"abs_path": abs_path,
"size_bytes": size_bytes,
@@ -164,23 +162,6 @@ def ingest_existing_file(
batch_insert_seed_assets(session, [spec], owner_id=owner_id)
session.commit()
# Phase 2: compute hash and run full ingest (dedup, metadata, tags)
digest, _ = hashing.compute_blake3_hash(abs_path)
asset_hash = "blake3:" + digest
return _ingest_file_from_path(
abs_path=abs_path,
asset_hash=asset_hash,
size_bytes=size_bytes,
mtime_ns=mtime_ns,
mime_type=mime_type,
info_name=name,
user_metadata=user_metadata,
tags=tags,
tag_origin=tag_origin,
owner_id=owner_id,
)
def _register_existing_asset(
asset_hash: str,

View File

@@ -304,7 +304,8 @@ def prompt_worker(q, server_instance):
try:
e.execute(item[2], prompt_id, extra_data, item[4])
if not asset_seeder.is_disabled():
_register_execution_outputs(e.history_result, prompt_id)
if _register_execution_outputs(e.history_result, prompt_id) > 0:
asset_seeder.start_enrich(roots=("output",), compute_hashes=True)
finally:
if was_paused:
asset_seeder.resume()