From 2f3d18ae7187c2c7e82300508ed5f0089b9939e0 Mon Sep 17 00:00:00 2001 From: Luke Mino-Altherr Date: Fri, 6 Mar 2026 14:51:22 -0800 Subject: [PATCH] feat(assets): register output files as assets after prompt execution MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add ingest_existing_file() to services/ingest.py as a public one-call wrapper for registering on-disk files (stat, BLAKE3 hash, MIME detection, path-based tag derivation). After each prompt execution in the main loop, iterate history_result['outputs'] and register files with type 'output' as assets. Runs while the asset seeder is paused, gated behind asset_seeder.is_disabled(). Stores prompt_id in user_metadata for provenance tracking. Amp-Thread-ID: https://ampcode.com/threads/T-019cc013-1444-73c8-81d6-07cae6e5e38d Co-authored-by: Amp refactor(assets): two-phase ingest — stub insert then hash ingest_existing_file() now inserts a stub record (hash=NULL) first for instant UX visibility, then computes the BLAKE3 hash and runs the full ingest pipeline. No compute_hash flag exposed — both phases always run. Amp-Thread-ID: https://ampcode.com/threads/T-019cc013-1444-73c8-81d6-07cae6e5e38d Co-authored-by: Amp refactor(assets): defer hashing to background seeder enrich phase ingest_existing_file() now only inserts a stub record (hash=NULL) for instant UX visibility. After registering outputs, triggers asset_seeder.enqueue_enrich() to compute hashes in the background (starting immediately if the seeder is idle, otherwise queuing until the current scan finishes). This avoids blocking the prompt worker thread on hash computation. 
Co-authored-by: Amp Amp-Thread-ID: https://ampcode.com/threads/T-019ccb2f-8bfd-74b3-9005-3a4ace919554 --- app/assets/seeder.py | 36 +++++++++++++++++++++++++++ app/assets/services/__init__.py | 2 ++ app/assets/services/ingest.py | 35 ++++++++++++++++++++++++++ main.py | 44 +++++++++++++++++++++++++++++++++ 4 files changed, 117 insertions(+) diff --git a/app/assets/seeder.py b/app/assets/seeder.py index 029448464..7471e1adc 100644 --- a/app/assets/seeder.py +++ b/app/assets/seeder.py @@ -92,6 +92,7 @@ class _AssetSeeder: self._prune_first: bool = False self._progress_callback: ProgressCallback | None = None self._disabled: bool = False + self._pending_enrich: dict | None = None def disable(self) -> None: """Disable the asset seeder, preventing any scans from starting.""" @@ -196,6 +197,34 @@ class _AssetSeeder: compute_hashes=compute_hashes, ) + def enqueue_enrich( + self, + roots: tuple[RootType, ...] = ("models", "input", "output"), + compute_hashes: bool = False, + ) -> bool: + """Start an enrichment scan now, or queue it for after the current scan. + + If the seeder is idle, starts immediately. Otherwise, the enrich + request is stored and will run automatically when the current scan + finishes. + + Args: + roots: Tuple of root types to scan + compute_hashes: If True, compute blake3 hashes + + Returns: + True if started immediately, False if queued for later + """ + if self.start_enrich(roots=roots, compute_hashes=compute_hashes): + return True + with self._lock: + self._pending_enrich = { + "roots": roots, + "compute_hashes": compute_hashes, + } + logging.info("Enrich scan queued (roots=%s)", roots) + return False + def cancel(self) -> bool: """Request cancellation of the current scan. 
@@ -597,6 +626,13 @@ class _AssetSeeder: self._last_progress = self._progress self._state = State.IDLE self._progress = None + pending = self._pending_enrich + self._pending_enrich = None + if pending is not None: + self.start_enrich( + roots=pending["roots"], + compute_hashes=pending["compute_hashes"], + ) def _run_fast_phase(self, roots: tuple[RootType, ...]) -> tuple[int, int, int]: """Run phase 1: fast scan to create stub records. diff --git a/app/assets/services/__init__.py b/app/assets/services/__init__.py index 11fcb4122..77fc8d182 100644 --- a/app/assets/services/__init__.py +++ b/app/assets/services/__init__.py @@ -23,6 +23,7 @@ from app.assets.services.ingest import ( DependencyMissingError, HashMismatchError, create_from_hash, + ingest_existing_file, upload_from_temp_path, ) from app.assets.database.queries import ( @@ -72,6 +73,7 @@ __all__ = [ "delete_asset_reference", "get_asset_by_hash", "get_asset_detail", + "ingest_existing_file", "get_mtime_ns", "get_size_and_mtime_ns", "list_assets_page", diff --git a/app/assets/services/ingest.py b/app/assets/services/ingest.py index 44d7aef36..0e2fdd2b1 100644 --- a/app/assets/services/ingest.py +++ b/app/assets/services/ingest.py @@ -23,9 +23,11 @@ from app.assets.database.queries import ( validate_tags_exist, ) from app.assets.helpers import normalize_tags +from app.assets.services.bulk_ingest import batch_insert_seed_assets from app.assets.services.file_utils import get_size_and_mtime_ns from app.assets.services.path_utils import ( compute_relative_filename, + get_name_and_tags_from_asset_path, resolve_destination_from_tags, validate_path_within_base, ) @@ -128,6 +130,39 @@ def _ingest_file_from_path( ) +def ingest_existing_file( + abs_path: str, + user_metadata: UserMetadata = None, + extra_tags: Sequence[str] = (), + owner_id: str = "", +) -> None: + """Register an existing on-disk file as an asset stub. + + Inserts a stub record (hash=NULL) for immediate UX visibility. 
+ The caller is responsible for triggering background enrichment + (hash computation, metadata extraction) via the asset seeder. + """ + size_bytes, mtime_ns = get_size_and_mtime_ns(abs_path) + mime_type = mimetypes.guess_type(abs_path, strict=False)[0] + name, path_tags = get_name_and_tags_from_asset_path(abs_path) + tags = list(dict.fromkeys(path_tags + list(extra_tags))) + + spec = { + "abs_path": abs_path, + "size_bytes": size_bytes, + "mtime_ns": mtime_ns, + "info_name": name, + "tags": tags, + "fname": os.path.basename(abs_path), + "metadata": None, + "hash": None, + "mime_type": mime_type, + } + with create_session() as session: + batch_insert_seed_assets(session, [spec], owner_id=owner_id) + session.commit() + + def _register_existing_asset( asset_hash: str, name: str, diff --git a/main.py b/main.py index a8fc1a28d..8d682b1ba 100644 --- a/main.py +++ b/main.py @@ -8,6 +8,7 @@ import time from comfy.cli_args import args, enables_dynamic_vram from app.logger import setup_logger from app.assets.seeder import asset_seeder +from app.assets.services import ingest_existing_file import itertools import utils.extra_config from utils.mime_types import init_mime_types @@ -229,6 +230,44 @@ def cuda_malloc_warning(): logging.warning("\nWARNING: this card most likely does not support cuda-malloc, if you get \"CUDA error\" please run ComfyUI with: --disable-cuda-malloc\n") +def _register_execution_outputs(history_result: dict, prompt_id: str) -> int: + """Register output files from a completed execution as assets.""" + outputs = history_result.get("outputs", {}) + if not outputs: + return 0 + + registered = 0 + for node_id, node_output in outputs.items(): + for key, items in node_output.items(): + if not isinstance(items, list): + continue + for item in items: + if not isinstance(item, dict): + continue + if item.get("type") != "output": + continue + filename = item.get("filename") + subfolder = item.get("subfolder", "") + if not filename: + continue + + base_dir = 
folder_paths.get_directory_by_type("output") + abs_path = os.path.join(base_dir, subfolder, filename) + if not os.path.isfile(abs_path): + continue + + try: + ingest_existing_file( + abs_path, + user_metadata={"prompt_id": prompt_id}, + ) + registered += 1 + except Exception: + logging.exception("Failed to register output: %s", abs_path) + + return registered + + def prompt_worker(q, server_instance): current_time: float = 0.0 cache_type = execution.CacheType.CLASSIC @@ -263,6 +302,7 @@ def prompt_worker(q, server_instance): asset_seeder.pause() e.execute(item[2], prompt_id, extra_data, item[4]) + need_gc = True remove_sensitive = lambda prompt: prompt[:5] + prompt[6:] @@ -306,6 +346,10 @@ def prompt_worker(q, server_instance): last_gc_collect = current_time need_gc = False hook_breaker_ac10a0.restore_functions() + + if not asset_seeder.is_disabled(): + if _register_execution_outputs(e.history_result, prompt_id) > 0: + asset_seeder.enqueue_enrich(roots=("output",), compute_hashes=True) asset_seeder.resume()