feat: non-destructive asset pruning with is_missing flag

- Add is_missing column to AssetCacheState for soft-delete
- Replace hard-delete pruning with mark_cache_states_missing_outside_prefixes
- Auto-restore missing cache states when files are re-scanned
- Filter out missing cache states from queries by default
- Rename functions for clarity:
  - mark_cache_states_missing_outside_prefixes (was delete_cache_states_outside_prefixes)
  - get_unreferenced_unhashed_asset_ids (was get_orphaned_seed_asset_ids)
  - mark_assets_missing_outside_prefixes (was prune_orphaned_assets)
  - mark_missing_outside_prefixes_safely (was prune_orphans_safely)
- Add restore_cache_states_by_paths for explicit restoration
- Add cleanup_unreferenced_assets for explicit hard-delete when needed
- Update API endpoint /api/assets/prune to use new soft-delete behavior

This preserves user metadata (tags, etc.) when base directories change,
allowing assets to be restored when the original paths become available again.

Amp-Thread-ID: https://ampcode.com/threads/T-019c3114-bf28-73a9-a4d2-85b208fd5462
Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
Luke Mino-Altherr
2026-02-05 21:21:46 -08:00
parent 6443cf016e
commit f13b7aeba2
11 changed files with 282 additions and 103 deletions

View File

@@ -698,20 +698,21 @@ async def cancel_seed(request: web.Request) -> web.Response:
@ROUTES.post("/api/assets/prune")
async def prune_orphans(request: web.Request) -> web.Response:
"""Prune orphaned assets that no longer exist on the filesystem.
async def mark_missing_assets(request: web.Request) -> web.Response:
"""Mark assets as missing when their cache states point to files outside all known root prefixes.
This removes assets whose cache states point to files outside all known
root prefixes (models, input, output).
This is a non-destructive soft-delete operation. Assets and their metadata
are preserved, but cache states are flagged as missing. They can be restored
if the file reappears in a future scan.
Returns:
200 OK with count of pruned assets
200 OK with count of marked assets
409 Conflict if a scan is currently running
"""
pruned = asset_seeder.prune_orphans()
if pruned == 0 and asset_seeder.get_status().state.value != "IDLE":
marked = asset_seeder.mark_missing_outside_prefixes()
if marked == 0 and asset_seeder.get_status().state.value != "IDLE":
return web.json_response(
{"status": "scan_running", "pruned": 0},
{"status": "scan_running", "marked": 0},
status=409,
)
return web.json_response({"status": "completed", "pruned": pruned}, status=200)
return web.json_response({"status": "completed", "marked": marked}, status=200)

View File

@@ -83,12 +83,14 @@ class AssetCacheState(Base):
file_path: Mapped[str] = mapped_column(Text, nullable=False)
mtime_ns: Mapped[int | None] = mapped_column(BigInteger, nullable=True)
needs_verify: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
is_missing: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
asset: Mapped[Asset] = relationship(back_populates="cache_states")
__table_args__ = (
Index("ix_asset_cache_state_file_path", "file_path"),
Index("ix_asset_cache_state_asset_id", "asset_id"),
Index("ix_asset_cache_state_is_missing", "is_missing"),
CheckConstraint(
"(mtime_ns IS NULL) OR (mtime_ns >= 0)", name="ck_acs_mtime_nonneg"
),

View File

@@ -28,12 +28,13 @@ from app.assets.database.queries.cache_state import (
bulk_set_needs_verify,
delete_assets_by_ids,
delete_cache_states_by_ids,
delete_cache_states_outside_prefixes,
delete_orphaned_seed_asset,
get_cache_states_by_paths_and_asset_ids,
get_cache_states_for_prefixes,
get_orphaned_seed_asset_ids,
get_unreferenced_unhashed_asset_ids,
list_cache_states_by_asset_id,
mark_cache_states_missing_outside_prefixes,
restore_cache_states_by_paths,
upsert_cache_state,
)
from app.assets.database.queries.tags import (
@@ -68,7 +69,6 @@ __all__ = [
"delete_asset_info_by_id",
"delete_assets_by_ids",
"delete_cache_states_by_ids",
"delete_cache_states_outside_prefixes",
"delete_orphaned_seed_asset",
"ensure_tags_exist",
"fetch_asset_info_and_asset",
@@ -80,13 +80,15 @@ __all__ = [
"get_cache_states_by_paths_and_asset_ids",
"get_cache_states_for_prefixes",
"get_or_create_asset_info",
"get_orphaned_seed_asset_ids",
"get_unreferenced_unhashed_asset_ids",
"insert_asset_info",
"list_asset_infos_page",
"list_cache_states_by_asset_id",
"list_tags_with_usage",
"mark_cache_states_missing_outside_prefixes",
"remove_missing_tag_for_asset_id",
"remove_tags_from_asset_info",
"restore_cache_states_by_paths",
"set_asset_info_metadata",
"set_asset_info_preview",
"set_asset_info_tags",

View File

@@ -49,11 +49,15 @@ def upsert_cache_state(
file_path: str,
mtime_ns: int,
) -> tuple[bool, bool]:
"""Upsert a cache state by file_path. Returns (created, updated)."""
"""Upsert a cache state by file_path. Returns (created, updated).
Also restores cache states that were previously marked as missing.
"""
vals = {
"asset_id": asset_id,
"file_path": file_path,
"mtime_ns": int(mtime_ns),
"is_missing": False,
}
ins = (
sqlite.insert(AssetCacheState)
@@ -74,26 +78,30 @@ def upsert_cache_state(
AssetCacheState.asset_id != asset_id,
AssetCacheState.mtime_ns.is_(None),
AssetCacheState.mtime_ns != int(mtime_ns),
AssetCacheState.is_missing == True, # noqa: E712
)
)
.values(asset_id=asset_id, mtime_ns=int(mtime_ns))
.values(asset_id=asset_id, mtime_ns=int(mtime_ns), is_missing=False)
)
res2 = session.execute(upd)
updated = int(res2.rowcount or 0) > 0
return False, updated
def delete_cache_states_outside_prefixes(
def mark_cache_states_missing_outside_prefixes(
session: Session, valid_prefixes: list[str]
) -> int:
"""Delete cache states with file_path not matching any of the valid prefixes.
"""Mark cache states as missing when file_path doesn't match any valid prefix.
This is a non-destructive soft-delete that preserves user metadata.
Cache states can be restored if the file reappears in a future scan.
Args:
session: Database session
valid_prefixes: List of absolute directory prefixes that are valid
Returns:
Number of cache states deleted
Number of cache states marked as missing
"""
if not valid_prefixes:
return 0
@@ -104,22 +112,59 @@ def delete_cache_states_outside_prefixes(
return AssetCacheState.file_path.like(escaped + "%", escape=esc)
matches_valid_prefix = sa.or_(*[make_prefix_condition(p) for p in valid_prefixes])
result = session.execute(sa.delete(AssetCacheState).where(~matches_valid_prefix))
result = session.execute(
sa.update(AssetCacheState)
.where(~matches_valid_prefix)
.where(AssetCacheState.is_missing == False) # noqa: E712
.values(is_missing=True)
)
return result.rowcount
def get_orphaned_seed_asset_ids(session: Session) -> list[str]:
"""Get IDs of seed assets (hash is None) with no remaining cache states.
def restore_cache_states_by_paths(session: Session, file_paths: list[str]) -> int:
"""Restore cache states that were previously marked as missing.
Called when a file path is re-scanned and found to exist.
Args:
session: Database session
file_paths: List of file paths that exist and should be restored
Returns:
List of asset IDs that are orphaned
Number of cache states restored
"""
orphan_subq = (
sa.select(Asset.id)
.outerjoin(AssetCacheState, AssetCacheState.asset_id == Asset.id)
.where(Asset.hash.is_(None), AssetCacheState.id.is_(None))
if not file_paths:
return 0
result = session.execute(
sa.update(AssetCacheState)
.where(AssetCacheState.file_path.in_(file_paths))
.where(AssetCacheState.is_missing == True) # noqa: E712
.values(is_missing=False)
)
return [row[0] for row in session.execute(orphan_subq).all()]
return result.rowcount
def get_unreferenced_unhashed_asset_ids(session: Session) -> list[str]:
"""Get IDs of unhashed assets (hash=None) with no active cache states.
An asset is considered unreferenced if it has no cache states,
or all its cache states are marked as missing.
Returns:
List of asset IDs that are unreferenced
"""
active_cache_state_exists = (
sa.select(sa.literal(1))
.where(AssetCacheState.asset_id == Asset.id)
.where(AssetCacheState.is_missing == False) # noqa: E712
.correlate(Asset)
.exists()
)
unreferenced_subq = sa.select(Asset.id).where(
Asset.hash.is_(None), ~active_cache_state_exists
)
return [row[0] for row in session.execute(unreferenced_subq).all()]
def delete_assets_by_ids(session: Session, asset_ids: list[str]) -> int:
@@ -142,12 +187,15 @@ def delete_assets_by_ids(session: Session, asset_ids: list[str]) -> int:
def get_cache_states_for_prefixes(
session: Session,
prefixes: list[str],
*,
include_missing: bool = False,
) -> list[CacheStateRow]:
"""Get all cache states with paths matching any of the given prefixes.
Args:
session: Database session
prefixes: List of absolute directory prefixes to match
include_missing: If False (default), exclude cache states marked as missing
Returns:
List of cache state rows with joined asset data, ordered by asset_id, state_id
@@ -163,7 +211,7 @@ def get_cache_states_for_prefixes(
escaped, esc = escape_sql_like_string(base)
conds.append(AssetCacheState.file_path.like(escaped + "%", escape=esc))
rows = session.execute(
query = (
sa.select(
AssetCacheState.id,
AssetCacheState.file_path,
@@ -175,7 +223,13 @@ def get_cache_states_for_prefixes(
)
.join(Asset, Asset.id == AssetCacheState.asset_id)
.where(sa.or_(*conds))
.order_by(AssetCacheState.asset_id.asc(), AssetCacheState.id.asc())
)
if not include_missing:
query = query.where(AssetCacheState.is_missing == False) # noqa: E712
rows = session.execute(
query.order_by(AssetCacheState.asset_id.asc(), AssetCacheState.id.asc())
).all()
return [
@@ -240,13 +294,15 @@ def bulk_insert_cache_states_ignore_conflicts(
"""Bulk insert cache state rows with ON CONFLICT DO NOTHING on file_path.
Each dict should have: asset_id, file_path, mtime_ns
The is_missing field is automatically set to False for new inserts.
"""
if not rows:
return
enriched_rows = [{**row, "is_missing": False} for row in rows]
ins = sqlite.insert(AssetCacheState).on_conflict_do_nothing(
index_elements=[AssetCacheState.file_path]
)
for chunk in iter_chunks(rows, calculate_rows_per_statement(3)):
for chunk in iter_chunks(enriched_rows, calculate_rows_per_statement(4)):
session.execute(ins, chunk)

View File

@@ -17,7 +17,7 @@ from app.assets.database.queries import (
from app.assets.services.bulk_ingest import (
SeedAssetSpec,
batch_insert_seed_assets,
prune_orphaned_assets,
mark_assets_missing_outside_prefixes,
)
from app.assets.services.file_utils import (
get_mtime_ns,
@@ -221,18 +221,18 @@ def sync_root_safely(root: RootType) -> set[str]:
return set()
def prune_orphans_safely(prefixes: list[str]) -> int:
"""Prune orphaned assets outside the given prefixes.
def mark_missing_outside_prefixes_safely(prefixes: list[str]) -> int:
"""Mark cache states as missing when outside the given prefixes.
Returns count pruned or 0 on failure.
This is a non-destructive soft-delete. Returns count marked or 0 on failure.
"""
try:
with create_session() as sess:
count = prune_orphaned_assets(sess, prefixes)
count = mark_assets_missing_outside_prefixes(sess, prefixes)
sess.commit()
return count
except Exception as e:
logging.exception("orphan pruning failed: %s", e)
logging.exception("marking missing assets failed: %s", e)
return 0
@@ -319,7 +319,7 @@ def insert_asset_specs(specs: list[SeedAssetSpec], tag_pool: set[str]) -> int:
def seed_assets(roots: tuple[RootType, ...], enable_logging: bool = False) -> None:
"""Scan the given roots and seed the assets into the database.
Note: This function does not prune orphaned assets. Call prune_orphaned_assets
Note: This function does not mark missing assets. Call mark_missing_outside_prefixes_safely
separately if cleanup is needed.
"""
if not dependencies_available():

View File

@@ -15,7 +15,7 @@ from app.assets.scanner import (
get_all_known_prefixes,
get_prefixes_for_root,
insert_asset_specs,
prune_orphans_safely,
mark_missing_outside_prefixes_safely,
sync_root_safely,
)
from app.database.db import dependencies_available
@@ -174,35 +174,41 @@ class AssetSeeder:
with self._lock:
self._thread = None
def prune_orphans(self) -> int:
"""Prune orphaned assets that are outside all known root prefixes.
def mark_missing_outside_prefixes(self) -> int:
"""Mark cache states as missing when outside all known root prefixes.
This is a non-destructive soft-delete operation. Assets and their
metadata are preserved, but cache states are flagged as missing.
They can be restored if the file reappears in a future scan.
This operation is decoupled from scanning to prevent partial scans
from accidentally deleting assets belonging to other roots.
from accidentally marking assets belonging to other roots.
Should be called explicitly when cleanup is desired, typically after
a full scan of all roots or during maintenance.
Returns:
Number of orphaned assets pruned, or 0 if dependencies unavailable
or a scan is currently running
Number of cache states marked as missing, or 0 if dependencies
unavailable or a scan is currently running
"""
with self._lock:
if self._state != State.IDLE:
logging.warning("Cannot prune orphans while scan is running")
logging.warning(
"Cannot mark missing assets while scan is running"
)
return 0
if not dependencies_available():
logging.warning(
"Database dependencies not available, skipping orphan pruning"
"Database dependencies not available, skipping mark missing"
)
return 0
all_prefixes = get_all_known_prefixes()
pruned = prune_orphans_safely(all_prefixes)
if pruned > 0:
logging.info("Pruned %d orphaned assets", pruned)
return pruned
marked = mark_missing_outside_prefixes_safely(all_prefixes)
if marked > 0:
logging.info("Marked %d cache states as missing", marked)
return marked
def _is_cancelled(self) -> bool:
"""Check if cancellation has been requested."""
@@ -290,9 +296,9 @@ class AssetSeeder:
if self._prune_first:
all_prefixes = get_all_known_prefixes()
pruned = prune_orphans_safely(all_prefixes)
if pruned > 0:
logging.info("Pruned %d orphaned assets before scan", pruned)
marked = mark_missing_outside_prefixes_safely(all_prefixes)
if marked > 0:
logging.info("Marked %d cache states as missing before scan", marked)
if self._is_cancelled():
logging.info("Asset scan cancelled after pruning phase")

View File

@@ -11,7 +11,8 @@ from app.assets.services.asset_management import (
from app.assets.services.bulk_ingest import (
BulkInsertResult,
batch_insert_seed_assets,
prune_orphaned_assets,
cleanup_unreferenced_assets,
mark_assets_missing_outside_prefixes,
)
from app.assets.services.file_utils import (
get_mtime_ns,
@@ -77,7 +78,8 @@ __all__ = [
"list_assets_page",
"list_files_recursively",
"list_tags",
"prune_orphaned_assets",
"cleanup_unreferenced_assets",
"mark_assets_missing_outside_prefixes",
"remove_tags",
"resolve_asset_for_download",
"set_asset_preview",

View File

@@ -28,10 +28,10 @@ from app.assets.database.queries import (
bulk_insert_cache_states_ignore_conflicts,
bulk_insert_tags_and_meta,
delete_assets_by_ids,
delete_cache_states_outside_prefixes,
get_asset_info_ids_by_ids,
get_cache_states_by_paths_and_asset_ids,
get_orphaned_seed_asset_ids,
get_unreferenced_unhashed_asset_ids,
mark_cache_states_missing_outside_prefixes,
)
from app.assets.helpers import get_utc_now
@@ -210,16 +210,37 @@ def batch_insert_seed_assets(
)
def prune_orphaned_assets(session: Session, valid_prefixes: list[str]) -> int:
"""Prune cache states outside valid prefixes, then delete orphaned seed assets.
def mark_assets_missing_outside_prefixes(
session: Session, valid_prefixes: list[str]
) -> int:
"""Mark cache states as missing when outside valid prefixes.
This is a non-destructive operation that soft-deletes cache states
by setting is_missing=True. User metadata is preserved and assets
can be restored if the file reappears in a future scan.
Note: This does NOT delete
unreferenced unhashed assets. Those are preserved so user metadata
remains intact even when base directories change.
Args:
session: Database session
valid_prefixes: List of absolute directory prefixes that are valid
Returns:
Number of orphaned assets deleted
Number of cache states marked as missing
"""
delete_cache_states_outside_prefixes(session, valid_prefixes)
orphan_ids = get_orphaned_seed_asset_ids(session)
return delete_assets_by_ids(session, orphan_ids)
return mark_cache_states_missing_outside_prefixes(session, valid_prefixes)
def cleanup_unreferenced_assets(session: Session) -> int:
"""Hard-delete unhashed assets with no active cache states.
This is a destructive operation intended for explicit cleanup.
Only deletes assets where hash=None and all cache states are missing.
Returns:
Number of assets deleted
"""
unreferenced_ids = get_unreferenced_unhashed_asset_ids(session)
return delete_assets_by_ids(session, unreferenced_ids)