refactor: flatten nested try blocks and if statements in assets package

Extract helper functions to eliminate nested try-except blocks in scanner.py
and remove duplicated type-checking logic in asset_info.py. Simplify nested
conditionals in asset_management.py for clearer control flow.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Luke Mino-Altherr
2026-02-03 14:28:16 -08:00
parent 7eccb65d9c
commit fc9b6a593d
3 changed files with 71 additions and 71 deletions

View File

@@ -79,24 +79,22 @@ def update_asset_metadata(
# Compute filename from best live path
computed_filename = _compute_filename_for_asset(session, info.asset_id)
# Determine if metadata needs updating
new_meta: dict | None = None
if user_metadata is not None:
new_meta = dict(user_metadata)
elif computed_filename:
current_meta = info.user_metadata or {}
if current_meta.get("filename") != computed_filename:
new_meta = dict(current_meta)
if new_meta is not None:
if computed_filename:
new_meta["filename"] = computed_filename
set_asset_info_metadata(
session, asset_info_id=asset_info_id, user_metadata=new_meta
)
touched = True
else:
if computed_filename:
current_meta = info.user_metadata or {}
if current_meta.get("filename") != computed_filename:
new_meta = dict(current_meta)
new_meta["filename"] = computed_filename
set_asset_info_metadata(
session, asset_info_id=asset_info_id, user_metadata=new_meta
)
touched = True
if tags is not None:
set_asset_info_tags(

View File

@@ -367,6 +367,41 @@ def sync_cache_states_with_filesystem(
return survivors if collect_existing_paths else None
def _sync_root_safely(root: RootType) -> set[str]:
"""Sync a single root's cache states with the filesystem.
Returns survivors (existing paths) or empty set on failure.
"""
try:
with create_session() as sess:
survivors = sync_cache_states_with_filesystem(
sess,
root,
collect_existing_paths=True,
update_missing_tags=True,
)
sess.commit()
return survivors or set()
except Exception as e:
logging.exception("fast DB scan failed for %s: %s", root, e)
return set()
def _prune_orphans_safely(prefixes: list[str]) -> int:
"""Prune orphaned assets outside the given prefixes.
Returns count pruned or 0 on failure.
"""
try:
with create_session() as sess:
count = prune_orphaned_assets(sess, prefixes)
sess.commit()
return count
except Exception as e:
logging.exception("orphan pruning failed: %s", e)
return 0
def seed_assets(roots: tuple[RootType, ...], enable_logging: bool = False) -> None:
"""Scan the given roots and seed the assets into the database."""
if not dependencies_available():
@@ -383,29 +418,12 @@ def seed_assets(roots: tuple[RootType, ...], enable_logging: bool = False) -> No
try:
existing_paths: set[str] = set()
for r in roots:
try:
with create_session() as sess:
survivors = sync_cache_states_with_filesystem(
sess,
r,
collect_existing_paths=True,
update_missing_tags=True,
)
sess.commit()
if survivors:
existing_paths.update(survivors)
except Exception as e:
logging.exception("fast DB scan failed for %s: %s", r, e)
existing_paths.update(_sync_root_safely(r))
try:
with create_session() as sess:
all_prefixes = [
os.path.abspath(p) for r in roots for p in get_prefixes_for_root(r)
]
orphans_pruned = prune_orphaned_assets(sess, all_prefixes)
sess.commit()
except Exception as e:
logging.exception("orphan pruning failed: %s", e)
all_prefixes = [
os.path.abspath(p) for r in roots for p in get_prefixes_for_root(r)
]
orphans_pruned = _prune_orphans_safely(all_prefixes)
if "models" in roots:
paths.extend(collect_models_files())