refactor(assets): extract scanner logic into service modules

- Create file_utils.py with shared file utilities:
  - get_mtime_ns() - extract mtime in nanoseconds from stat
  - get_size_and_mtime_ns() - get both size and mtime
  - verify_file_unchanged() - check file matches DB mtime/size
  - list_files_recursively() - recursive directory listing

- Create bulk_ingest.py for bulk operations:
  - BulkInsertResult dataclass
  - batch_insert_seed_assets() - batch insert with conflict handling
  - prune_orphaned_assets() - clean up orphaned assets

- Update scanner.py to use new service modules instead of
  calling database queries directly

- Update ingest.py to use shared get_size_and_mtime_ns()

- Export new functions from services/__init__.py

Amp-Thread-ID: https://ampcode.com/threads/T-019c2ae7-f701-716a-a0dd-1feb988732fb
Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
Luke Mino-Altherr
2026-02-04 15:17:31 -08:00
parent 8c4eb9a659
commit bc92ae4a0d
6 changed files with 331 additions and 278 deletions

View File

@@ -2,31 +2,27 @@ import contextlib
import logging
import os
import time
import uuid
from typing import Literal
from sqlalchemy.orm import Session
import folder_paths
from app.assets.database.queries import (
add_missing_tag_for_asset_id,
bulk_insert_asset_infos_ignore_conflicts,
bulk_insert_assets,
bulk_insert_cache_states_ignore_conflicts,
bulk_insert_tags_and_meta,
bulk_set_needs_verify,
delete_assets_by_ids,
delete_cache_states_by_ids,
delete_cache_states_outside_prefixes,
delete_orphaned_seed_asset,
ensure_tags_exist,
get_asset_info_ids_by_ids,
get_cache_states_by_paths_and_asset_ids,
get_cache_states_for_prefixes,
get_orphaned_seed_asset_ids,
remove_missing_tag_for_asset_id,
)
from app.assets.helpers import get_utc_now
from app.assets.services.bulk_ingest import (
batch_insert_seed_assets,
prune_orphaned_assets,
)
from app.assets.services.file_utils import (
get_mtime_ns,
list_files_recursively,
verify_file_unchanged,
)
from app.assets.services.path_utils import (
compute_relative_filename,
get_comfy_models_folders,
@@ -37,37 +33,6 @@ from app.database.db import create_session, dependencies_available
RootType = Literal["models", "input", "output"]
def verify_asset_file_unchanged(
mtime_db: int | None,
size_db: int | None,
stat_result: os.stat_result,
) -> bool:
if mtime_db is None:
return False
actual_mtime_ns = getattr(
stat_result, "st_mtime_ns", int(stat_result.st_mtime * 1_000_000_000)
)
if int(mtime_db) != int(actual_mtime_ns):
return False
sz = int(size_db or 0)
if sz > 0:
return int(stat_result.st_size) == sz
return True
def list_files_recursively(base_dir: str) -> list[str]:
    """Return absolute paths of every regular file under *base_dir*.

    Walks top-down without following symlinked directories. A missing or
    non-directory *base_dir* yields an empty list.
    """
    root = os.path.abspath(base_dir)
    if not os.path.isdir(root):
        return []
    return [
        os.path.abspath(os.path.join(dirpath, filename))
        for dirpath, _dirs, filenames in os.walk(
            root, topdown=True, followlinks=False
        )
        for filename in filenames
    ]
def get_prefixes_for_root(root: RootType) -> list[str]:
if root == "models":
bases: list[str] = []
@@ -102,170 +67,6 @@ def collect_models_files() -> list[str]:
return out
def _batch_insert_assets_from_paths(
    session: Session,
    specs: list[dict],
    owner_id: str = "",
) -> dict:
    """Seed assets from filesystem specs in batch.

    Each spec is a dict with keys:
    - abs_path: str
    - size_bytes: int
    - mtime_ns: int
    - info_name: str
    - tags: list[str]
    - fname: Optional[str]

    This function orchestrates:
    1. Insert seed Assets (hash=NULL)
    2. Claim cache states with ON CONFLICT DO NOTHING
    3. Query to find winners (paths where our asset_id was inserted)
    4. Delete Assets for losers (path already claimed by another asset)
    5. Insert AssetInfo for winners
    6. Insert tags and metadata for successfully inserted AssetInfos

    Returns:
        dict with keys: inserted_infos, won_states, lost_states
    """
    # Nothing to do: report zero counts without touching the session.
    if not specs:
        return {"inserted_infos": 0, "won_states": 0, "lost_states": 0}
    # Single timestamp so every row seeded in this batch shares created/updated times.
    now = get_utc_now()
    asset_rows: list[dict] = []
    state_rows: list[dict] = []
    path_to_asset: dict[str, str] = {}
    asset_to_info: dict[str, dict] = {}
    path_list: list[str] = []
    # Build all row payloads up front; IDs are generated client-side so the
    # asset/state/info rows can reference each other before any insert runs.
    for sp in specs:
        ap = os.path.abspath(sp["abs_path"])
        aid = str(uuid.uuid4())
        iid = str(uuid.uuid4())
        path_list.append(ap)
        path_to_asset[ap] = aid
        # Seed Asset row: hash is NULL until the file is actually hashed.
        asset_rows.append(
            {
                "id": aid,
                "hash": None,
                "size_bytes": sp["size_bytes"],
                "mime_type": None,
                "created_at": now,
            }
        )
        # Cache-state row claiming this absolute path for the new asset.
        state_rows.append(
            {
                "asset_id": aid,
                "file_path": ap,
                "mtime_ns": sp["mtime_ns"],
            }
        )
        # AssetInfo payload; "_tags"/"_filename" are private carriers consumed
        # below and stripped before the DB insert.
        asset_to_info[aid] = {
            "id": iid,
            "owner_id": owner_id,
            "name": sp["info_name"],
            "asset_id": aid,
            "preview_id": None,
            "user_metadata": {"filename": sp["fname"]} if sp["fname"] else None,
            "created_at": now,
            "updated_at": now,
            "last_access_time": now,
            "_tags": sp["tags"],
            "_filename": sp["fname"],
        }
    # Step 1: insert the seed Assets.
    bulk_insert_assets(session, asset_rows)
    # Step 2: try to claim each path; conflicts (path already owned) are ignored.
    bulk_insert_cache_states_ignore_conflicts(session, state_rows)
    # Step 3: winners are the paths whose cache state carries our asset_id
    # (used as a set below — presumably a set of path strings; see queries module).
    winners_by_path = get_cache_states_by_paths_and_asset_ids(session, path_to_asset)
    all_paths_set = set(path_list)
    losers_by_path = all_paths_set - winners_by_path
    lost_assets = [path_to_asset[p] for p in losers_by_path]
    # Step 4: drop the Assets whose path claim lost to a pre-existing state.
    if lost_assets:
        delete_assets_by_ids(session, lost_assets)
    if not winners_by_path:
        return {
            "inserted_infos": 0,
            "won_states": 0,
            "lost_states": len(losers_by_path),
        }
    winner_info_rows = [asset_to_info[path_to_asset[p]] for p in winners_by_path]
    # Strip the private "_tags"/"_filename" keys before handing rows to the DB.
    db_info_rows = [
        {
            "id": row["id"],
            "owner_id": row["owner_id"],
            "name": row["name"],
            "asset_id": row["asset_id"],
            "preview_id": row["preview_id"],
            "user_metadata": row["user_metadata"],
            "created_at": row["created_at"],
            "updated_at": row["updated_at"],
            "last_access_time": row["last_access_time"],
        }
        for row in winner_info_rows
    ]
    # Step 5: insert AssetInfo rows for winners; conflicts are ignored, so we
    # query back which IDs actually landed.
    bulk_insert_asset_infos_ignore_conflicts(session, db_info_rows)
    all_info_ids = [row["id"] for row in winner_info_rows]
    inserted_info_ids = get_asset_info_ids_by_ids(session, all_info_ids)
    tag_rows: list[dict] = []
    meta_rows: list[dict] = []
    # Step 6: build tag + filename-metadata rows only for infos that were
    # actually inserted (skipped conflicts get no tags/meta).
    if inserted_info_ids:
        for row in winner_info_rows:
            iid = row["id"]
            if iid not in inserted_info_ids:
                continue
            for t in row["_tags"]:
                tag_rows.append(
                    {
                        "asset_info_id": iid,
                        "tag_name": t,
                        "origin": "automatic",
                        "added_at": now,
                    }
                )
            if row["_filename"]:
                meta_rows.append(
                    {
                        "asset_info_id": iid,
                        "key": "filename",
                        "ordinal": 0,
                        "val_str": row["_filename"],
                        "val_num": None,
                        "val_bool": None,
                        "val_json": None,
                    }
                )
    bulk_insert_tags_and_meta(session, tag_rows=tag_rows, meta_rows=meta_rows)
    return {
        "inserted_infos": len(inserted_info_ids),
        "won_states": len(winners_by_path),
        "lost_states": len(losers_by_path),
    }
def prune_orphaned_assets(session, valid_prefixes: list[str]) -> int:
    """Prune cache states outside valid prefixes, then delete orphaned seed assets.

    Args:
        session: Database session
        valid_prefixes: List of absolute directory prefixes that are valid

    Returns:
        Number of orphaned assets deleted
    """
    # Dropping out-of-prefix cache states first may leave seed assets with no
    # remaining state; those are the orphans removed below.
    delete_cache_states_outside_prefixes(session, valid_prefixes)
    orphaned_ids = get_orphaned_seed_asset_ids(session)
    return delete_assets_by_ids(session, orphaned_ids)
def sync_cache_states_with_filesystem(
session,
root: RootType,
@@ -305,7 +106,7 @@ def sync_cache_states_with_filesystem(
fast_ok = False
try:
exists = True
fast_ok = verify_asset_file_unchanged(
fast_ok = verify_file_unchanged(
mtime_db=row.mtime_ns,
size_db=acc["size_db"],
stat_result=os.stat(row.file_path, follow_symlinks=True),
@@ -447,9 +248,7 @@ def _build_asset_specs(
{
"abs_path": abs_p,
"size_bytes": stat_p.st_size,
"mtime_ns": getattr(
stat_p, "st_mtime_ns", int(stat_p.st_mtime * 1_000_000_000)
),
"mtime_ns": get_mtime_ns(stat_p),
"info_name": name,
"tags": tags,
"fname": compute_relative_filename(abs_p),
@@ -467,9 +266,9 @@ def _insert_asset_specs(specs: list[dict], tag_pool: set[str]) -> int:
with create_session() as sess:
if tag_pool:
ensure_tags_exist(sess, tag_pool, tag_type="user")
result = _batch_insert_assets_from_paths(sess, specs=specs, owner_id="")
result = batch_insert_seed_assets(sess, specs=specs, owner_id="")
sess.commit()
return result["inserted_infos"]
return result.inserted_infos
def seed_assets(roots: tuple[RootType, ...], enable_logging: bool = False) -> None: