mirror of
https://github.com/comfyanonymous/ComfyUI.git
synced 2026-04-28 02:11:31 +00:00
fix: address code review feedback
- Fix missing import for compute_filename_for_reference in ingest.py
- Apply code review fixes across routes, queries, scanner, seeder, hashing, ingest, path_utils, main, and server
- Update and add tests for sync references and seeder

Amp-Thread-ID: https://ampcode.com/threads/T-019cb61a-ed54-738c-a05f-9b5242e513f3
Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
@@ -12,7 +12,7 @@ from app.assets.scanner import (
|
||||
ENRICHMENT_METADATA,
|
||||
ENRICHMENT_STUB,
|
||||
RootType,
|
||||
build_stub_specs,
|
||||
build_asset_specs,
|
||||
collect_paths_for_roots,
|
||||
enrich_assets_batch,
|
||||
get_all_known_prefixes,
|
||||
@@ -68,35 +68,23 @@ class ScanStatus:
|
||||
ProgressCallback = Callable[[Progress], None]
|
||||
|
||||
|
||||
class AssetSeeder:
|
||||
"""Singleton class managing background asset scanning.
|
||||
class _AssetSeeder:
|
||||
"""Background asset scanning manager.
|
||||
|
||||
Thread-safe singleton that spawns ephemeral daemon threads for scanning.
|
||||
Spawns ephemeral daemon threads for scanning.
|
||||
Each scan creates a new thread that exits when complete.
|
||||
Use the module-level ``asset_seeder`` instance.
|
||||
"""
|
||||
|
||||
_instance: "AssetSeeder | None" = None
|
||||
_instance_lock = threading.Lock()
|
||||
|
||||
def __new__(cls) -> "AssetSeeder":
|
||||
with cls._instance_lock:
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__new__(cls)
|
||||
cls._instance._initialized = False
|
||||
return cls._instance
|
||||
|
||||
def __init__(self) -> None:
|
||||
if self._initialized:
|
||||
return
|
||||
self._initialized = True
|
||||
self._lock = threading.Lock()
|
||||
self._state = State.IDLE
|
||||
self._progress: Progress | None = None
|
||||
self._errors: list[str] = []
|
||||
self._thread: threading.Thread | None = None
|
||||
self._cancel_event = threading.Event()
|
||||
self._pause_event = threading.Event()
|
||||
self._pause_event.set() # Start unpaused (set = running, clear = paused)
|
||||
self._run_gate = threading.Event()
|
||||
self._run_gate.set() # Start unpaused (set = running, clear = paused)
|
||||
self._roots: tuple[RootType, ...] = ()
|
||||
self._phase: ScanPhase = ScanPhase.FULL
|
||||
self._compute_hashes: bool = False
|
||||
@@ -154,10 +142,10 @@ class AssetSeeder:
|
||||
self._compute_hashes = compute_hashes
|
||||
self._progress_callback = progress_callback
|
||||
self._cancel_event.clear()
|
||||
self._pause_event.set() # Ensure unpaused when starting
|
||||
self._run_gate.set() # Ensure unpaused when starting
|
||||
self._thread = threading.Thread(
|
||||
target=self._run_scan,
|
||||
name="AssetSeeder",
|
||||
name="_AssetSeeder",
|
||||
daemon=True,
|
||||
)
|
||||
self._thread.start()
|
||||
@@ -223,7 +211,7 @@ class AssetSeeder:
|
||||
logging.info("Asset seeder cancelling (was %s)", self._state.value)
|
||||
self._state = State.CANCELLING
|
||||
self._cancel_event.set()
|
||||
self._pause_event.set() # Unblock if paused so thread can exit
|
||||
self._run_gate.set() # Unblock if paused so thread can exit
|
||||
return True
|
||||
|
||||
def stop(self) -> bool:
|
||||
@@ -247,7 +235,7 @@ class AssetSeeder:
|
||||
return False
|
||||
logging.info("Asset seeder pausing")
|
||||
self._state = State.PAUSED
|
||||
self._pause_event.clear()
|
||||
self._run_gate.clear()
|
||||
return True
|
||||
|
||||
def resume(self) -> bool:
|
||||
@@ -263,7 +251,7 @@ class AssetSeeder:
|
||||
return False
|
||||
logging.info("Asset seeder resuming")
|
||||
self._state = State.RUNNING
|
||||
self._pause_event.set()
|
||||
self._run_gate.set()
|
||||
self._emit_event("assets.seed.resumed", {})
|
||||
return True
|
||||
|
||||
@@ -356,10 +344,10 @@ class AssetSeeder:
|
||||
self._thread = None
|
||||
|
||||
def mark_missing_outside_prefixes(self) -> int:
|
||||
"""Mark cache states as missing when outside all known root prefixes.
|
||||
"""Mark references as missing when outside all known root prefixes.
|
||||
|
||||
This is a non-destructive soft-delete operation. Assets and their
|
||||
metadata are preserved, but cache states are flagged as missing.
|
||||
metadata are preserved, but references are flagged as missing.
|
||||
They can be restored if the file reappears in a future scan.
|
||||
|
||||
This operation is decoupled from scanning to prevent partial scans
|
||||
@@ -369,7 +357,7 @@ class AssetSeeder:
|
||||
a full scan of all roots or during maintenance.
|
||||
|
||||
Returns:
|
||||
Number of cache states marked as missing
|
||||
Number of references marked as missing
|
||||
|
||||
Raises:
|
||||
ScanInProgressError: If a scan is currently running
|
||||
@@ -389,7 +377,7 @@ class AssetSeeder:
|
||||
all_prefixes = get_all_known_prefixes()
|
||||
marked = mark_missing_outside_prefixes_safely(all_prefixes)
|
||||
if marked > 0:
|
||||
logging.info("Marked %d cache states as missing", marked)
|
||||
logging.info("Marked %d references as missing", marked)
|
||||
return marked
|
||||
finally:
|
||||
with self._lock:
|
||||
@@ -409,9 +397,9 @@ class AssetSeeder:
|
||||
Returns:
|
||||
True if scan should stop, False to continue
|
||||
"""
|
||||
if not self._pause_event.is_set():
|
||||
if not self._run_gate.is_set():
|
||||
self._emit_event("assets.seed.paused", {})
|
||||
self._pause_event.wait() # Blocks if paused
|
||||
self._run_gate.wait() # Blocks if paused
|
||||
return self._is_cancelled()
|
||||
|
||||
def _emit_event(self, event_type: str, data: dict) -> None:
|
||||
@@ -539,7 +527,11 @@ class AssetSeeder:
|
||||
cancelled = True
|
||||
return
|
||||
|
||||
total_enriched = self._run_enrich_phase(roots)
|
||||
enrich_cancelled, total_enriched = self._run_enrich_phase(roots)
|
||||
|
||||
if enrich_cancelled:
|
||||
cancelled = True
|
||||
return
|
||||
|
||||
self._emit_event(
|
||||
"assets.seed.enrich_complete",
|
||||
@@ -613,7 +605,9 @@ class AssetSeeder:
|
||||
)
|
||||
|
||||
# Use stub specs (no metadata extraction, no hashing)
|
||||
specs, tag_pool, skipped_existing = build_stub_specs(paths, existing_paths)
|
||||
specs, tag_pool, skipped_existing = build_asset_specs(
|
||||
paths, existing_paths, enable_metadata_extraction=False, compute_hashes=False,
|
||||
)
|
||||
self._update_progress(skipped=skipped_existing)
|
||||
|
||||
if self._check_pause_and_cancel():
|
||||
@@ -661,11 +655,11 @@ class AssetSeeder:
|
||||
self._update_progress(scanned=len(specs), created=total_created)
|
||||
return total_created, skipped_existing, total_paths
|
||||
|
||||
def _run_enrich_phase(self, roots: tuple[RootType, ...]) -> int:
|
||||
def _run_enrich_phase(self, roots: tuple[RootType, ...]) -> tuple[bool, int]:
|
||||
"""Run phase 2: enrich existing records with metadata and hashes.
|
||||
|
||||
Returns:
|
||||
Total number of assets enriched
|
||||
Tuple of (cancelled, total_enriched)
|
||||
"""
|
||||
total_enriched = 0
|
||||
batch_size = 100
|
||||
@@ -690,7 +684,7 @@ class AssetSeeder:
|
||||
while True:
|
||||
if self._check_pause_and_cancel():
|
||||
logging.info("Enrich scan cancelled after %d assets", total_enriched)
|
||||
break
|
||||
return True, total_enriched
|
||||
|
||||
# Fetch next batch of unenriched assets
|
||||
unenriched = get_unenriched_assets_for_roots(
|
||||
@@ -737,7 +731,7 @@ class AssetSeeder:
|
||||
)
|
||||
last_progress_time = now
|
||||
|
||||
return total_enriched
|
||||
return False, total_enriched
|
||||
|
||||
|
||||
asset_seeder = AssetSeeder()
|
||||
asset_seeder = _AssetSeeder()
|
||||
|
||||
Reference in New Issue
Block a user