feat(assets): register output files as assets after prompt execution

Add ingest_existing_file() to services/ingest.py as a public one-call
wrapper for registering on-disk files (stat, BLAKE3 hash, MIME detection,
path-based tag derivation).

After each prompt execution in the main loop, iterate
history_result['outputs'] and register files with type 'output' as
assets. Registration runs while the asset seeder is paused, and is
skipped entirely when asset_seeder.is_disabled(). The prompt_id is
stored in user_metadata for provenance tracking.

Amp-Thread-ID: https://ampcode.com/threads/T-019cc013-1444-73c8-81d6-07cae6e5e38d
Co-authored-by: Amp <amp@ampcode.com>

refactor(assets): two-phase ingest — stub insert then hash

ingest_existing_file() now inserts a stub record (hash=NULL) first for
instant UX visibility, then computes the BLAKE3 hash and runs the full
ingest pipeline. No compute_hash flag exposed — both phases always run.

Amp-Thread-ID: https://ampcode.com/threads/T-019cc013-1444-73c8-81d6-07cae6e5e38d
Co-authored-by: Amp <amp@ampcode.com>

refactor(assets): defer hashing to background seeder enrich phase

ingest_existing_file() now only inserts a stub record (hash=NULL) for
instant UX visibility. After registering outputs, triggers
asset_seeder.start_enrich() to compute hashes in the background.
This avoids blocking the prompt worker thread on hash computation.

Co-authored-by: Amp <amp@ampcode.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019ccb2f-8bfd-74b3-9005-3a4ace919554
This commit is contained in:
Luke Mino-Altherr
2026-03-06 14:51:22 -08:00
parent 29b24cb517
commit 2f3d18ae71
4 changed files with 117 additions and 0 deletions

View File

@@ -92,6 +92,7 @@ class _AssetSeeder:
self._prune_first: bool = False
self._progress_callback: ProgressCallback | None = None
self._disabled: bool = False
self._pending_enrich: dict | None = None
def disable(self) -> None:
"""Disable the asset seeder, preventing any scans from starting."""
@@ -196,6 +197,34 @@ class _AssetSeeder:
compute_hashes=compute_hashes,
)
def enqueue_enrich(
    self,
    roots: tuple[RootType, ...] = ("models", "input", "output"),
    compute_hashes: bool = False,
) -> bool:
    """Start an enrichment scan now, or queue it for after the current scan.

    If the seeder is idle, the scan starts immediately. Otherwise the
    request is stored in ``_pending_enrich`` and replayed by the
    scan-completion handler when the running scan finishes.

    Args:
        roots: Tuple of root types to scan.
        compute_hashes: If True, compute blake3 hashes during the scan.

    Returns:
        True if the scan started immediately, False if it was queued.
    """
    if self.start_enrich(roots=roots, compute_hashes=compute_hashes):
        return True
    # Queue the request for the scan-completion handler to pick up.
    with self._lock:
        self._pending_enrich = {
            "roots": roots,
            "compute_hashes": compute_hashes,
        }
    # Close the race where the running scan finished (and read a
    # still-None _pending_enrich) between the failed start above and
    # the queueing: retry once now that the request is recorded. If
    # this start succeeds, clear the queued request so the completion
    # handler cannot launch a duplicate scan. At most one of the retry
    # and the completion handler can win, since start_enrich refuses
    # to start while a scan is running.
    if self.start_enrich(roots=roots, compute_hashes=compute_hashes):
        with self._lock:
            self._pending_enrich = None
        return True
    logging.info("Enrich scan queued (roots=%s)", roots)
    return False
def cancel(self) -> bool:
"""Request cancellation of the current scan.
@@ -597,6 +626,13 @@ class _AssetSeeder:
self._last_progress = self._progress
self._state = State.IDLE
self._progress = None
pending = self._pending_enrich
self._pending_enrich = None
if pending is not None:
self.start_enrich(
roots=pending["roots"],
compute_hashes=pending["compute_hashes"],
)
def _run_fast_phase(self, roots: tuple[RootType, ...]) -> tuple[int, int, int]:
"""Run phase 1: fast scan to create stub records.

View File

@@ -23,6 +23,7 @@ from app.assets.services.ingest import (
DependencyMissingError,
HashMismatchError,
create_from_hash,
ingest_existing_file,
upload_from_temp_path,
)
from app.assets.database.queries import (
@@ -72,6 +73,7 @@ __all__ = [
"delete_asset_reference",
"get_asset_by_hash",
"get_asset_detail",
"ingest_existing_file",
"get_mtime_ns",
"get_size_and_mtime_ns",
"list_assets_page",

View File

@@ -23,9 +23,11 @@ from app.assets.database.queries import (
validate_tags_exist,
)
from app.assets.helpers import normalize_tags
from app.assets.services.bulk_ingest import batch_insert_seed_assets
from app.assets.services.file_utils import get_size_and_mtime_ns
from app.assets.services.path_utils import (
compute_relative_filename,
get_name_and_tags_from_asset_path,
resolve_destination_from_tags,
validate_path_within_base,
)
@@ -128,6 +130,39 @@ def _ingest_file_from_path(
)
def ingest_existing_file(
    abs_path: str,
    user_metadata: UserMetadata = None,
    extra_tags: Sequence[str] = (),
    owner_id: str = "",
) -> None:
    """Register an existing on-disk file as an asset stub.

    Only a stub record (hash=NULL) is inserted, so the asset becomes
    visible immediately. The caller is responsible for triggering
    background enrichment (hash computation, metadata extraction)
    via the asset seeder.
    """
    # NOTE(review): user_metadata is accepted but not written into the
    # stub spec below — confirm whether it should be persisted here or
    # is intentionally deferred to the enrich phase.
    size_bytes, mtime_ns = get_size_and_mtime_ns(abs_path)
    guessed_mime, _encoding = mimetypes.guess_type(abs_path, strict=False)
    display_name, derived_tags = get_name_and_tags_from_asset_path(abs_path)

    # Merge path-derived tags with caller-supplied ones, dropping
    # duplicates while preserving first-seen order.
    merged_tags: list[str] = []
    for tag in [*derived_tags, *extra_tags]:
        if tag not in merged_tags:
            merged_tags.append(tag)

    stub_spec = {
        "abs_path": abs_path,
        "size_bytes": size_bytes,
        "mtime_ns": mtime_ns,
        "info_name": display_name,
        "tags": merged_tags,
        "fname": os.path.basename(abs_path),
        "metadata": None,
        "hash": None,  # filled in later by the background enrich phase
        "mime_type": guessed_mime,
    }
    with create_session() as session:
        batch_insert_seed_assets(session, [stub_spec], owner_id=owner_id)
        session.commit()
def _register_existing_asset(
asset_hash: str,
name: str,

44
main.py
View File

@@ -8,6 +8,7 @@ import time
from comfy.cli_args import args, enables_dynamic_vram
from app.logger import setup_logger
from app.assets.seeder import asset_seeder
from app.assets.services import ingest_existing_file
import itertools
import utils.extra_config
from utils.mime_types import init_mime_types
@@ -229,6 +230,44 @@ def cuda_malloc_warning():
logging.warning("\nWARNING: this card most likely does not support cuda-malloc, if you get \"CUDA error\" please run ComfyUI with: --disable-cuda-malloc\n")
def _register_execution_outputs(history_result: dict, prompt_id: str) -> int:
    """Register output files from a completed execution as assets.

    Walks history_result["outputs"] (node_id -> {key -> [items]}) and
    registers each dict item with type "output" that exists on disk,
    recording prompt_id in user_metadata for provenance.

    Args:
        history_result: Execution history entry containing an "outputs" map.
        prompt_id: Identifier of the prompt that produced these outputs.

    Returns:
        Number of files successfully registered.
    """
    outputs = history_result.get("outputs", {})
    if not outputs:
        return 0
    # Loop-invariant: resolve the output directory once, not per item.
    base_dir = folder_paths.get_directory_by_type("output")
    registered = 0
    # Keys (node ids / output keys) are unused — iterate values only.
    for node_output in outputs.values():
        for items in node_output.values():
            if not isinstance(items, list):
                continue
            for item in items:
                if not isinstance(item, dict):
                    continue
                if item.get("type") != "output":
                    continue
                filename = item.get("filename")
                if not filename:
                    continue
                subfolder = item.get("subfolder", "")
                abs_path = os.path.join(base_dir, subfolder, filename)
                if not os.path.isfile(abs_path):
                    continue
                try:
                    ingest_existing_file(
                        abs_path,
                        user_metadata={"prompt_id": prompt_id},
                    )
                    registered += 1
                except Exception:
                    # Best-effort: one bad file must not abort the
                    # registration of the remaining outputs.
                    logging.exception("Failed to register output: %s", abs_path)
    return registered
def prompt_worker(q, server_instance):
current_time: float = 0.0
cache_type = execution.CacheType.CLASSIC
@@ -263,6 +302,7 @@ def prompt_worker(q, server_instance):
asset_seeder.pause()
e.execute(item[2], prompt_id, extra_data, item[4])
need_gc = True
remove_sensitive = lambda prompt: prompt[:5] + prompt[6:]
@@ -306,6 +346,10 @@ def prompt_worker(q, server_instance):
last_gc_collect = current_time
need_gc = False
hook_breaker_ac10a0.restore_functions()
if not asset_seeder.is_disabled():
if _register_execution_outputs(e.history_result, prompt_id) > 0:
asset_seeder.enqueue_enrich(roots=("output",), compute_hashes=True)
asset_seeder.resume()