mirror of
https://github.com/comfyanonymous/ComfyUI.git
synced 2026-02-18 22:20:03 +00:00
feat: non-destructive asset pruning with is_missing flag
- Add is_missing column to AssetCacheState for soft-delete - Replace hard-delete pruning with mark_cache_states_missing_outside_prefixes - Auto-restore missing cache states when files are re-scanned - Filter out missing cache states from queries by default - Rename functions for clarity: - mark_cache_states_missing_outside_prefixes (was delete_cache_states_outside_prefixes) - get_unreferenced_unhashed_asset_ids (was get_orphaned_seed_asset_ids) - mark_assets_missing_outside_prefixes (was prune_orphaned_assets) - mark_missing_outside_prefixes_safely (was prune_orphans_safely) - Add restore_cache_states_by_paths for explicit restoration - Add cleanup_unreferenced_assets for explicit hard-delete when needed - Update API endpoint /api/assets/prune to use new soft-delete behavior This preserves user metadata (tags, etc.) when base directories change, allowing assets to be restored when the original paths become available again. Amp-Thread-ID: https://ampcode.com/threads/T-019c3114-bf28-73a9-a4d2-85b208fd5462 Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
37
alembic_db/versions/0002_add_is_missing_to_cache_state.py
Normal file
37
alembic_db/versions/0002_add_is_missing_to_cache_state.py
Normal file
@@ -0,0 +1,37 @@
|
||||
"""
|
||||
Add is_missing column to asset_cache_state for non-destructive soft-delete
|
||||
|
||||
Revision ID: 0002_add_is_missing
|
||||
Revises: 0001_assets
|
||||
Create Date: 2025-02-05 00:00:00
|
||||
"""
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
revision = "0002_add_is_missing"
|
||||
down_revision = "0001_assets"
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
op.add_column(
|
||||
"asset_cache_state",
|
||||
sa.Column(
|
||||
"is_missing",
|
||||
sa.Boolean(),
|
||||
nullable=False,
|
||||
server_default=sa.text("false"),
|
||||
),
|
||||
)
|
||||
op.create_index(
|
||||
"ix_asset_cache_state_is_missing",
|
||||
"asset_cache_state",
|
||||
["is_missing"],
|
||||
)
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
op.drop_index("ix_asset_cache_state_is_missing", table_name="asset_cache_state")
|
||||
op.drop_column("asset_cache_state", "is_missing")
|
||||
@@ -698,20 +698,21 @@ async def cancel_seed(request: web.Request) -> web.Response:
|
||||
|
||||
|
||||
@ROUTES.post("/api/assets/prune")
|
||||
async def prune_orphans(request: web.Request) -> web.Response:
|
||||
"""Prune orphaned assets that no longer exist on the filesystem.
|
||||
async def mark_missing_assets(request: web.Request) -> web.Response:
|
||||
"""Mark assets as missing when their cache states point to files outside all known root prefixes.
|
||||
|
||||
This removes assets whose cache states point to files outside all known
|
||||
root prefixes (models, input, output).
|
||||
This is a non-destructive soft-delete operation. Assets and their metadata
|
||||
are preserved, but cache states are flagged as missing. They can be restored
|
||||
if the file reappears in a future scan.
|
||||
|
||||
Returns:
|
||||
200 OK with count of pruned assets
|
||||
200 OK with count of marked assets
|
||||
409 Conflict if a scan is currently running
|
||||
"""
|
||||
pruned = asset_seeder.prune_orphans()
|
||||
if pruned == 0 and asset_seeder.get_status().state.value != "IDLE":
|
||||
marked = asset_seeder.mark_missing_outside_prefixes()
|
||||
if marked == 0 and asset_seeder.get_status().state.value != "IDLE":
|
||||
return web.json_response(
|
||||
{"status": "scan_running", "pruned": 0},
|
||||
{"status": "scan_running", "marked": 0},
|
||||
status=409,
|
||||
)
|
||||
return web.json_response({"status": "completed", "pruned": pruned}, status=200)
|
||||
return web.json_response({"status": "completed", "marked": marked}, status=200)
|
||||
|
||||
@@ -83,12 +83,14 @@ class AssetCacheState(Base):
|
||||
file_path: Mapped[str] = mapped_column(Text, nullable=False)
|
||||
mtime_ns: Mapped[int | None] = mapped_column(BigInteger, nullable=True)
|
||||
needs_verify: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
|
||||
is_missing: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
|
||||
|
||||
asset: Mapped[Asset] = relationship(back_populates="cache_states")
|
||||
|
||||
__table_args__ = (
|
||||
Index("ix_asset_cache_state_file_path", "file_path"),
|
||||
Index("ix_asset_cache_state_asset_id", "asset_id"),
|
||||
Index("ix_asset_cache_state_is_missing", "is_missing"),
|
||||
CheckConstraint(
|
||||
"(mtime_ns IS NULL) OR (mtime_ns >= 0)", name="ck_acs_mtime_nonneg"
|
||||
),
|
||||
|
||||
@@ -28,12 +28,13 @@ from app.assets.database.queries.cache_state import (
|
||||
bulk_set_needs_verify,
|
||||
delete_assets_by_ids,
|
||||
delete_cache_states_by_ids,
|
||||
delete_cache_states_outside_prefixes,
|
||||
delete_orphaned_seed_asset,
|
||||
get_cache_states_by_paths_and_asset_ids,
|
||||
get_cache_states_for_prefixes,
|
||||
get_orphaned_seed_asset_ids,
|
||||
get_unreferenced_unhashed_asset_ids,
|
||||
list_cache_states_by_asset_id,
|
||||
mark_cache_states_missing_outside_prefixes,
|
||||
restore_cache_states_by_paths,
|
||||
upsert_cache_state,
|
||||
)
|
||||
from app.assets.database.queries.tags import (
|
||||
@@ -68,7 +69,6 @@ __all__ = [
|
||||
"delete_asset_info_by_id",
|
||||
"delete_assets_by_ids",
|
||||
"delete_cache_states_by_ids",
|
||||
"delete_cache_states_outside_prefixes",
|
||||
"delete_orphaned_seed_asset",
|
||||
"ensure_tags_exist",
|
||||
"fetch_asset_info_and_asset",
|
||||
@@ -80,13 +80,15 @@ __all__ = [
|
||||
"get_cache_states_by_paths_and_asset_ids",
|
||||
"get_cache_states_for_prefixes",
|
||||
"get_or_create_asset_info",
|
||||
"get_orphaned_seed_asset_ids",
|
||||
"get_unreferenced_unhashed_asset_ids",
|
||||
"insert_asset_info",
|
||||
"list_asset_infos_page",
|
||||
"list_cache_states_by_asset_id",
|
||||
"list_tags_with_usage",
|
||||
"mark_cache_states_missing_outside_prefixes",
|
||||
"remove_missing_tag_for_asset_id",
|
||||
"remove_tags_from_asset_info",
|
||||
"restore_cache_states_by_paths",
|
||||
"set_asset_info_metadata",
|
||||
"set_asset_info_preview",
|
||||
"set_asset_info_tags",
|
||||
|
||||
@@ -49,11 +49,15 @@ def upsert_cache_state(
|
||||
file_path: str,
|
||||
mtime_ns: int,
|
||||
) -> tuple[bool, bool]:
|
||||
"""Upsert a cache state by file_path. Returns (created, updated)."""
|
||||
"""Upsert a cache state by file_path. Returns (created, updated).
|
||||
|
||||
Also restores cache states that were previously marked as missing.
|
||||
"""
|
||||
vals = {
|
||||
"asset_id": asset_id,
|
||||
"file_path": file_path,
|
||||
"mtime_ns": int(mtime_ns),
|
||||
"is_missing": False,
|
||||
}
|
||||
ins = (
|
||||
sqlite.insert(AssetCacheState)
|
||||
@@ -74,26 +78,30 @@ def upsert_cache_state(
|
||||
AssetCacheState.asset_id != asset_id,
|
||||
AssetCacheState.mtime_ns.is_(None),
|
||||
AssetCacheState.mtime_ns != int(mtime_ns),
|
||||
AssetCacheState.is_missing == True, # noqa: E712
|
||||
)
|
||||
)
|
||||
.values(asset_id=asset_id, mtime_ns=int(mtime_ns))
|
||||
.values(asset_id=asset_id, mtime_ns=int(mtime_ns), is_missing=False)
|
||||
)
|
||||
res2 = session.execute(upd)
|
||||
updated = int(res2.rowcount or 0) > 0
|
||||
return False, updated
|
||||
|
||||
|
||||
def delete_cache_states_outside_prefixes(
|
||||
def mark_cache_states_missing_outside_prefixes(
|
||||
session: Session, valid_prefixes: list[str]
|
||||
) -> int:
|
||||
"""Delete cache states with file_path not matching any of the valid prefixes.
|
||||
"""Mark cache states as missing when file_path doesn't match any valid prefix.
|
||||
|
||||
This is a non-destructive soft-delete that preserves user metadata.
|
||||
Cache states can be restored if the file reappears in a future scan.
|
||||
|
||||
Args:
|
||||
session: Database session
|
||||
valid_prefixes: List of absolute directory prefixes that are valid
|
||||
|
||||
Returns:
|
||||
Number of cache states deleted
|
||||
Number of cache states marked as missing
|
||||
"""
|
||||
if not valid_prefixes:
|
||||
return 0
|
||||
@@ -104,22 +112,59 @@ def delete_cache_states_outside_prefixes(
|
||||
return AssetCacheState.file_path.like(escaped + "%", escape=esc)
|
||||
|
||||
matches_valid_prefix = sa.or_(*[make_prefix_condition(p) for p in valid_prefixes])
|
||||
result = session.execute(sa.delete(AssetCacheState).where(~matches_valid_prefix))
|
||||
result = session.execute(
|
||||
sa.update(AssetCacheState)
|
||||
.where(~matches_valid_prefix)
|
||||
.where(AssetCacheState.is_missing == False) # noqa: E712
|
||||
.values(is_missing=True)
|
||||
)
|
||||
return result.rowcount
|
||||
|
||||
|
||||
def get_orphaned_seed_asset_ids(session: Session) -> list[str]:
|
||||
"""Get IDs of seed assets (hash is None) with no remaining cache states.
|
||||
def restore_cache_states_by_paths(session: Session, file_paths: list[str]) -> int:
|
||||
"""Restore cache states that were previously marked as missing.
|
||||
|
||||
Called when a file path is re-scanned and found to exist.
|
||||
|
||||
Args:
|
||||
session: Database session
|
||||
file_paths: List of file paths that exist and should be restored
|
||||
|
||||
Returns:
|
||||
List of asset IDs that are orphaned
|
||||
Number of cache states restored
|
||||
"""
|
||||
orphan_subq = (
|
||||
sa.select(Asset.id)
|
||||
.outerjoin(AssetCacheState, AssetCacheState.asset_id == Asset.id)
|
||||
.where(Asset.hash.is_(None), AssetCacheState.id.is_(None))
|
||||
if not file_paths:
|
||||
return 0
|
||||
|
||||
result = session.execute(
|
||||
sa.update(AssetCacheState)
|
||||
.where(AssetCacheState.file_path.in_(file_paths))
|
||||
.where(AssetCacheState.is_missing == True) # noqa: E712
|
||||
.values(is_missing=False)
|
||||
)
|
||||
return [row[0] for row in session.execute(orphan_subq).all()]
|
||||
return result.rowcount
|
||||
|
||||
|
||||
def get_unreferenced_unhashed_asset_ids(session: Session) -> list[str]:
|
||||
"""Get IDs of unhashed assets (hash=None) with no active cache states.
|
||||
|
||||
An asset is considered unreferenced if it has no cache states,
|
||||
or all its cache states are marked as missing.
|
||||
|
||||
Returns:
|
||||
List of asset IDs that are unreferenced
|
||||
"""
|
||||
active_cache_state_exists = (
|
||||
sa.select(sa.literal(1))
|
||||
.where(AssetCacheState.asset_id == Asset.id)
|
||||
.where(AssetCacheState.is_missing == False) # noqa: E712
|
||||
.correlate(Asset)
|
||||
.exists()
|
||||
)
|
||||
unreferenced_subq = sa.select(Asset.id).where(
|
||||
Asset.hash.is_(None), ~active_cache_state_exists
|
||||
)
|
||||
return [row[0] for row in session.execute(unreferenced_subq).all()]
|
||||
|
||||
|
||||
def delete_assets_by_ids(session: Session, asset_ids: list[str]) -> int:
|
||||
@@ -142,12 +187,15 @@ def delete_assets_by_ids(session: Session, asset_ids: list[str]) -> int:
|
||||
def get_cache_states_for_prefixes(
|
||||
session: Session,
|
||||
prefixes: list[str],
|
||||
*,
|
||||
include_missing: bool = False,
|
||||
) -> list[CacheStateRow]:
|
||||
"""Get all cache states with paths matching any of the given prefixes.
|
||||
|
||||
Args:
|
||||
session: Database session
|
||||
prefixes: List of absolute directory prefixes to match
|
||||
include_missing: If False (default), exclude cache states marked as missing
|
||||
|
||||
Returns:
|
||||
List of cache state rows with joined asset data, ordered by asset_id, state_id
|
||||
@@ -163,7 +211,7 @@ def get_cache_states_for_prefixes(
|
||||
escaped, esc = escape_sql_like_string(base)
|
||||
conds.append(AssetCacheState.file_path.like(escaped + "%", escape=esc))
|
||||
|
||||
rows = session.execute(
|
||||
query = (
|
||||
sa.select(
|
||||
AssetCacheState.id,
|
||||
AssetCacheState.file_path,
|
||||
@@ -175,7 +223,13 @@ def get_cache_states_for_prefixes(
|
||||
)
|
||||
.join(Asset, Asset.id == AssetCacheState.asset_id)
|
||||
.where(sa.or_(*conds))
|
||||
.order_by(AssetCacheState.asset_id.asc(), AssetCacheState.id.asc())
|
||||
)
|
||||
|
||||
if not include_missing:
|
||||
query = query.where(AssetCacheState.is_missing == False) # noqa: E712
|
||||
|
||||
rows = session.execute(
|
||||
query.order_by(AssetCacheState.asset_id.asc(), AssetCacheState.id.asc())
|
||||
).all()
|
||||
|
||||
return [
|
||||
@@ -240,13 +294,15 @@ def bulk_insert_cache_states_ignore_conflicts(
|
||||
"""Bulk insert cache state rows with ON CONFLICT DO NOTHING on file_path.
|
||||
|
||||
Each dict should have: asset_id, file_path, mtime_ns
|
||||
The is_missing field is automatically set to False for new inserts.
|
||||
"""
|
||||
if not rows:
|
||||
return
|
||||
enriched_rows = [{**row, "is_missing": False} for row in rows]
|
||||
ins = sqlite.insert(AssetCacheState).on_conflict_do_nothing(
|
||||
index_elements=[AssetCacheState.file_path]
|
||||
)
|
||||
for chunk in iter_chunks(rows, calculate_rows_per_statement(3)):
|
||||
for chunk in iter_chunks(enriched_rows, calculate_rows_per_statement(4)):
|
||||
session.execute(ins, chunk)
|
||||
|
||||
|
||||
|
||||
@@ -17,7 +17,7 @@ from app.assets.database.queries import (
|
||||
from app.assets.services.bulk_ingest import (
|
||||
SeedAssetSpec,
|
||||
batch_insert_seed_assets,
|
||||
prune_orphaned_assets,
|
||||
mark_assets_missing_outside_prefixes,
|
||||
)
|
||||
from app.assets.services.file_utils import (
|
||||
get_mtime_ns,
|
||||
@@ -221,18 +221,18 @@ def sync_root_safely(root: RootType) -> set[str]:
|
||||
return set()
|
||||
|
||||
|
||||
def prune_orphans_safely(prefixes: list[str]) -> int:
|
||||
"""Prune orphaned assets outside the given prefixes.
|
||||
def mark_missing_outside_prefixes_safely(prefixes: list[str]) -> int:
|
||||
"""Mark cache states as missing when outside the given prefixes.
|
||||
|
||||
Returns count pruned or 0 on failure.
|
||||
This is a non-destructive soft-delete. Returns count marked or 0 on failure.
|
||||
"""
|
||||
try:
|
||||
with create_session() as sess:
|
||||
count = prune_orphaned_assets(sess, prefixes)
|
||||
count = mark_assets_missing_outside_prefixes(sess, prefixes)
|
||||
sess.commit()
|
||||
return count
|
||||
except Exception as e:
|
||||
logging.exception("orphan pruning failed: %s", e)
|
||||
logging.exception("marking missing assets failed: %s", e)
|
||||
return 0
|
||||
|
||||
|
||||
@@ -319,7 +319,7 @@ def insert_asset_specs(specs: list[SeedAssetSpec], tag_pool: set[str]) -> int:
|
||||
def seed_assets(roots: tuple[RootType, ...], enable_logging: bool = False) -> None:
|
||||
"""Scan the given roots and seed the assets into the database.
|
||||
|
||||
Note: This function does not prune orphaned assets. Call prune_orphaned_assets
|
||||
Note: This function does not mark missing assets. Call mark_missing_outside_prefixes_safely
|
||||
separately if cleanup is needed.
|
||||
"""
|
||||
if not dependencies_available():
|
||||
|
||||
@@ -15,7 +15,7 @@ from app.assets.scanner import (
|
||||
get_all_known_prefixes,
|
||||
get_prefixes_for_root,
|
||||
insert_asset_specs,
|
||||
prune_orphans_safely,
|
||||
mark_missing_outside_prefixes_safely,
|
||||
sync_root_safely,
|
||||
)
|
||||
from app.database.db import dependencies_available
|
||||
@@ -174,35 +174,41 @@ class AssetSeeder:
|
||||
with self._lock:
|
||||
self._thread = None
|
||||
|
||||
def prune_orphans(self) -> int:
|
||||
"""Prune orphaned assets that are outside all known root prefixes.
|
||||
def mark_missing_outside_prefixes(self) -> int:
|
||||
"""Mark cache states as missing when outside all known root prefixes.
|
||||
|
||||
This is a non-destructive soft-delete operation. Assets and their
|
||||
metadata are preserved, but cache states are flagged as missing.
|
||||
They can be restored if the file reappears in a future scan.
|
||||
|
||||
This operation is decoupled from scanning to prevent partial scans
|
||||
from accidentally deleting assets belonging to other roots.
|
||||
from accidentally marking assets belonging to other roots.
|
||||
|
||||
Should be called explicitly when cleanup is desired, typically after
|
||||
a full scan of all roots or during maintenance.
|
||||
|
||||
Returns:
|
||||
Number of orphaned assets pruned, or 0 if dependencies unavailable
|
||||
or a scan is currently running
|
||||
Number of cache states marked as missing, or 0 if dependencies
|
||||
unavailable or a scan is currently running
|
||||
"""
|
||||
with self._lock:
|
||||
if self._state != State.IDLE:
|
||||
logging.warning("Cannot prune orphans while scan is running")
|
||||
logging.warning(
|
||||
"Cannot mark missing assets while scan is running"
|
||||
)
|
||||
return 0
|
||||
|
||||
if not dependencies_available():
|
||||
logging.warning(
|
||||
"Database dependencies not available, skipping orphan pruning"
|
||||
"Database dependencies not available, skipping mark missing"
|
||||
)
|
||||
return 0
|
||||
|
||||
all_prefixes = get_all_known_prefixes()
|
||||
pruned = prune_orphans_safely(all_prefixes)
|
||||
if pruned > 0:
|
||||
logging.info("Pruned %d orphaned assets", pruned)
|
||||
return pruned
|
||||
marked = mark_missing_outside_prefixes_safely(all_prefixes)
|
||||
if marked > 0:
|
||||
logging.info("Marked %d cache states as missing", marked)
|
||||
return marked
|
||||
|
||||
def _is_cancelled(self) -> bool:
|
||||
"""Check if cancellation has been requested."""
|
||||
@@ -290,9 +296,9 @@ class AssetSeeder:
|
||||
|
||||
if self._prune_first:
|
||||
all_prefixes = get_all_known_prefixes()
|
||||
pruned = prune_orphans_safely(all_prefixes)
|
||||
if pruned > 0:
|
||||
logging.info("Pruned %d orphaned assets before scan", pruned)
|
||||
marked = mark_missing_outside_prefixes_safely(all_prefixes)
|
||||
if marked > 0:
|
||||
logging.info("Marked %d cache states as missing before scan", marked)
|
||||
|
||||
if self._is_cancelled():
|
||||
logging.info("Asset scan cancelled after pruning phase")
|
||||
|
||||
@@ -11,7 +11,8 @@ from app.assets.services.asset_management import (
|
||||
from app.assets.services.bulk_ingest import (
|
||||
BulkInsertResult,
|
||||
batch_insert_seed_assets,
|
||||
prune_orphaned_assets,
|
||||
cleanup_unreferenced_assets,
|
||||
mark_assets_missing_outside_prefixes,
|
||||
)
|
||||
from app.assets.services.file_utils import (
|
||||
get_mtime_ns,
|
||||
@@ -77,7 +78,8 @@ __all__ = [
|
||||
"list_assets_page",
|
||||
"list_files_recursively",
|
||||
"list_tags",
|
||||
"prune_orphaned_assets",
|
||||
"cleanup_unreferenced_assets",
|
||||
"mark_assets_missing_outside_prefixes",
|
||||
"remove_tags",
|
||||
"resolve_asset_for_download",
|
||||
"set_asset_preview",
|
||||
|
||||
@@ -28,10 +28,10 @@ from app.assets.database.queries import (
|
||||
bulk_insert_cache_states_ignore_conflicts,
|
||||
bulk_insert_tags_and_meta,
|
||||
delete_assets_by_ids,
|
||||
delete_cache_states_outside_prefixes,
|
||||
get_asset_info_ids_by_ids,
|
||||
get_cache_states_by_paths_and_asset_ids,
|
||||
get_orphaned_seed_asset_ids,
|
||||
get_unreferenced_unhashed_asset_ids,
|
||||
mark_cache_states_missing_outside_prefixes,
|
||||
)
|
||||
from app.assets.helpers import get_utc_now
|
||||
|
||||
@@ -210,16 +210,37 @@ def batch_insert_seed_assets(
|
||||
)
|
||||
|
||||
|
||||
def prune_orphaned_assets(session: Session, valid_prefixes: list[str]) -> int:
|
||||
"""Prune cache states outside valid prefixes, then delete orphaned seed assets.
|
||||
def mark_assets_missing_outside_prefixes(
|
||||
session: Session, valid_prefixes: list[str]
|
||||
) -> int:
|
||||
"""Mark cache states as missing when outside valid prefixes.
|
||||
|
||||
This is a non-destructive operation that soft-deletes cache states
|
||||
by setting is_missing=True. User metadata is preserved and assets
|
||||
can be restored if the file reappears in a future scan.
|
||||
|
||||
Note: This does NOT delete
|
||||
unreferenced unhashed assets. Those are preserved so user metadata
|
||||
remains intact even when base directories change.
|
||||
|
||||
Args:
|
||||
session: Database session
|
||||
valid_prefixes: List of absolute directory prefixes that are valid
|
||||
|
||||
Returns:
|
||||
Number of orphaned assets deleted
|
||||
Number of cache states marked as missing
|
||||
"""
|
||||
delete_cache_states_outside_prefixes(session, valid_prefixes)
|
||||
orphan_ids = get_orphaned_seed_asset_ids(session)
|
||||
return delete_assets_by_ids(session, orphan_ids)
|
||||
return mark_cache_states_missing_outside_prefixes(session, valid_prefixes)
|
||||
|
||||
|
||||
def cleanup_unreferenced_assets(session: Session) -> int:
|
||||
"""Hard-delete unhashed assets with no active cache states.
|
||||
|
||||
This is a destructive operation intended for explicit cleanup.
|
||||
Only deletes assets where hash=None and all cache states are missing.
|
||||
|
||||
Returns:
|
||||
Number of assets deleted
|
||||
"""
|
||||
unreferenced_ids = get_unreferenced_unhashed_asset_ids(session)
|
||||
return delete_assets_by_ids(session, unreferenced_ids)
|
||||
|
||||
@@ -6,8 +6,7 @@ from app.assets.database.models import Asset, AssetCacheState, AssetInfo
|
||||
from app.assets.database.queries import (
|
||||
list_cache_states_by_asset_id,
|
||||
upsert_cache_state,
|
||||
delete_cache_states_outside_prefixes,
|
||||
get_orphaned_seed_asset_ids,
|
||||
get_unreferenced_unhashed_asset_ids,
|
||||
delete_assets_by_ids,
|
||||
get_cache_states_for_prefixes,
|
||||
bulk_set_needs_verify,
|
||||
@@ -15,6 +14,8 @@ from app.assets.database.queries import (
|
||||
delete_orphaned_seed_asset,
|
||||
bulk_insert_cache_states_ignore_conflicts,
|
||||
get_cache_states_by_paths_and_asset_ids,
|
||||
mark_cache_states_missing_outside_prefixes,
|
||||
restore_cache_states_by_paths,
|
||||
)
|
||||
from app.assets.helpers import select_best_live_path, get_utc_now
|
||||
|
||||
@@ -168,9 +169,51 @@ class TestUpsertCacheState:
|
||||
state = session.query(AssetCacheState).filter_by(file_path=file_path).one()
|
||||
assert state.mtime_ns == final_mtime
|
||||
|
||||
def test_upsert_restores_missing_state(self, session: Session):
|
||||
"""Upserting a cache state that was marked missing should restore it."""
|
||||
asset = _make_asset(session, "hash1")
|
||||
file_path = "/restored/file.bin"
|
||||
|
||||
class TestDeleteCacheStatesOutsidePrefixes:
|
||||
def test_deletes_states_outside_prefixes(self, session: Session, tmp_path):
|
||||
state = _make_cache_state(session, asset, file_path, mtime_ns=100)
|
||||
state.is_missing = True
|
||||
session.commit()
|
||||
|
||||
created, updated = upsert_cache_state(
|
||||
session, asset_id=asset.id, file_path=file_path, mtime_ns=100
|
||||
)
|
||||
session.commit()
|
||||
|
||||
assert created is False
|
||||
assert updated is True
|
||||
restored_state = session.query(AssetCacheState).filter_by(file_path=file_path).one()
|
||||
assert restored_state.is_missing is False
|
||||
|
||||
|
||||
class TestRestoreCacheStatesByPaths:
|
||||
def test_restores_missing_states(self, session: Session):
|
||||
asset = _make_asset(session, "hash1")
|
||||
missing_path = "/missing/file.bin"
|
||||
active_path = "/active/file.bin"
|
||||
|
||||
missing_state = _make_cache_state(session, asset, missing_path)
|
||||
missing_state.is_missing = True
|
||||
_make_cache_state(session, asset, active_path)
|
||||
session.commit()
|
||||
|
||||
restored = restore_cache_states_by_paths(session, [missing_path])
|
||||
session.commit()
|
||||
|
||||
assert restored == 1
|
||||
state = session.query(AssetCacheState).filter_by(file_path=missing_path).one()
|
||||
assert state.is_missing is False
|
||||
|
||||
def test_empty_list_restores_nothing(self, session: Session):
|
||||
restored = restore_cache_states_by_paths(session, [])
|
||||
assert restored == 0
|
||||
|
||||
|
||||
class TestMarkCacheStatesMissingOutsidePrefixes:
|
||||
def test_marks_states_missing_outside_prefixes(self, session: Session, tmp_path):
|
||||
asset = _make_asset(session, "hash1")
|
||||
valid_dir = tmp_path / "valid"
|
||||
valid_dir.mkdir()
|
||||
@@ -184,39 +227,48 @@ class TestDeleteCacheStatesOutsidePrefixes:
|
||||
_make_cache_state(session, asset, invalid_path)
|
||||
session.commit()
|
||||
|
||||
deleted = delete_cache_states_outside_prefixes(session, [str(valid_dir)])
|
||||
marked = mark_cache_states_missing_outside_prefixes(session, [str(valid_dir)])
|
||||
session.commit()
|
||||
|
||||
assert deleted == 1
|
||||
remaining = session.query(AssetCacheState).all()
|
||||
assert len(remaining) == 1
|
||||
assert remaining[0].file_path == valid_path
|
||||
assert marked == 1
|
||||
all_states = session.query(AssetCacheState).all()
|
||||
assert len(all_states) == 2
|
||||
|
||||
def test_empty_prefixes_deletes_nothing(self, session: Session):
|
||||
valid_state = next(s for s in all_states if s.file_path == valid_path)
|
||||
invalid_state = next(s for s in all_states if s.file_path == invalid_path)
|
||||
assert valid_state.is_missing is False
|
||||
assert invalid_state.is_missing is True
|
||||
|
||||
def test_empty_prefixes_marks_nothing(self, session: Session):
|
||||
asset = _make_asset(session, "hash1")
|
||||
_make_cache_state(session, asset, "/some/path.bin")
|
||||
session.commit()
|
||||
|
||||
deleted = delete_cache_states_outside_prefixes(session, [])
|
||||
marked = mark_cache_states_missing_outside_prefixes(session, [])
|
||||
|
||||
assert deleted == 0
|
||||
assert marked == 0
|
||||
|
||||
|
||||
class TestGetOrphanedSeedAssetIds:
|
||||
def test_returns_orphaned_seed_assets(self, session: Session):
|
||||
# Seed asset (hash=None) with no cache states
|
||||
orphan = _make_asset(session, hash_val=None)
|
||||
# Seed asset with cache state (not orphaned)
|
||||
with_state = _make_asset(session, hash_val=None)
|
||||
_make_cache_state(session, with_state, "/has/state.bin")
|
||||
class TestGetUnreferencedUnhashedAssetIds:
|
||||
def test_returns_unreferenced_unhashed_assets(self, session: Session):
|
||||
# Unhashed asset (hash=None) with no cache states
|
||||
no_states = _make_asset(session, hash_val=None)
|
||||
# Unhashed asset with active cache state (not unreferenced)
|
||||
with_active_state = _make_asset(session, hash_val=None)
|
||||
_make_cache_state(session, with_active_state, "/has/state.bin")
|
||||
# Unhashed asset with only missing cache state (should be unreferenced)
|
||||
with_missing_state = _make_asset(session, hash_val=None)
|
||||
missing_state = _make_cache_state(session, with_missing_state, "/missing/state.bin")
|
||||
missing_state.is_missing = True
|
||||
# Regular asset (hash not None) - should not be returned
|
||||
_make_asset(session, hash_val="blake3:regular")
|
||||
session.commit()
|
||||
|
||||
orphaned = get_orphaned_seed_asset_ids(session)
|
||||
unreferenced = get_unreferenced_unhashed_asset_ids(session)
|
||||
|
||||
assert orphan.id in orphaned
|
||||
assert with_state.id not in orphaned
|
||||
assert no_states.id in unreferenced
|
||||
assert with_missing_state.id in unreferenced
|
||||
assert with_active_state.id not in unreferenced
|
||||
|
||||
|
||||
class TestDeleteAssetsByIds:
|
||||
|
||||
@@ -349,10 +349,10 @@ class TestSeederThreadSafety:
|
||||
)
|
||||
|
||||
|
||||
class TestSeederPruneOrphans:
|
||||
"""Test prune_orphans behavior."""
|
||||
class TestSeederMarkMissing:
|
||||
"""Test mark_missing_outside_prefixes behavior."""
|
||||
|
||||
def test_prune_orphans_when_idle(self, fresh_seeder: AssetSeeder):
|
||||
def test_mark_missing_when_idle(self, fresh_seeder: AssetSeeder):
|
||||
with (
|
||||
patch("app.assets.seeder.dependencies_available", return_value=True),
|
||||
patch(
|
||||
@@ -360,14 +360,14 @@ class TestSeederPruneOrphans:
|
||||
return_value=["/models", "/input", "/output"],
|
||||
),
|
||||
patch(
|
||||
"app.assets.seeder.prune_orphans_safely", return_value=5
|
||||
) as mock_prune,
|
||||
"app.assets.seeder.mark_missing_outside_prefixes_safely", return_value=5
|
||||
) as mock_mark,
|
||||
):
|
||||
result = fresh_seeder.prune_orphans()
|
||||
result = fresh_seeder.mark_missing_outside_prefixes()
|
||||
assert result == 5
|
||||
mock_prune.assert_called_once_with(["/models", "/input", "/output"])
|
||||
mock_mark.assert_called_once_with(["/models", "/input", "/output"])
|
||||
|
||||
def test_prune_orphans_returns_zero_when_running(
|
||||
def test_mark_missing_returns_zero_when_running(
|
||||
self, fresh_seeder: AssetSeeder, mock_dependencies
|
||||
):
|
||||
barrier = threading.Event()
|
||||
@@ -382,35 +382,35 @@ class TestSeederPruneOrphans:
|
||||
fresh_seeder.start(roots=("models",))
|
||||
time.sleep(0.05)
|
||||
|
||||
result = fresh_seeder.prune_orphans()
|
||||
result = fresh_seeder.mark_missing_outside_prefixes()
|
||||
assert result == 0
|
||||
|
||||
barrier.set()
|
||||
|
||||
def test_prune_orphans_returns_zero_when_dependencies_unavailable(
|
||||
def test_mark_missing_returns_zero_when_dependencies_unavailable(
|
||||
self, fresh_seeder: AssetSeeder
|
||||
):
|
||||
with patch("app.assets.seeder.dependencies_available", return_value=False):
|
||||
result = fresh_seeder.prune_orphans()
|
||||
result = fresh_seeder.mark_missing_outside_prefixes()
|
||||
assert result == 0
|
||||
|
||||
def test_prune_first_flag_triggers_pruning_before_scan(
|
||||
def test_prune_first_flag_triggers_mark_missing_before_scan(
|
||||
self, fresh_seeder: AssetSeeder
|
||||
):
|
||||
prune_call_order = []
|
||||
call_order = []
|
||||
|
||||
def track_prune(prefixes):
|
||||
prune_call_order.append("prune")
|
||||
def track_mark(prefixes):
|
||||
call_order.append("mark_missing")
|
||||
return 3
|
||||
|
||||
def track_sync(root):
|
||||
prune_call_order.append(f"sync_{root}")
|
||||
call_order.append(f"sync_{root}")
|
||||
return set()
|
||||
|
||||
with (
|
||||
patch("app.assets.seeder.dependencies_available", return_value=True),
|
||||
patch("app.assets.seeder.get_all_known_prefixes", return_value=["/models"]),
|
||||
patch("app.assets.seeder.prune_orphans_safely", side_effect=track_prune),
|
||||
patch("app.assets.seeder.mark_missing_outside_prefixes_safely", side_effect=track_mark),
|
||||
patch("app.assets.seeder.sync_root_safely", side_effect=track_sync),
|
||||
patch("app.assets.seeder.collect_paths_for_roots", return_value=[]),
|
||||
patch("app.assets.seeder.build_asset_specs", return_value=([], set(), 0)),
|
||||
@@ -419,5 +419,5 @@ class TestSeederPruneOrphans:
|
||||
fresh_seeder.start(roots=("models",), prune_first=True)
|
||||
fresh_seeder.wait(timeout=5.0)
|
||||
|
||||
assert prune_call_order[0] == "prune"
|
||||
assert "sync_models" in prune_call_order
|
||||
assert call_order[0] == "mark_missing"
|
||||
assert "sync_models" in call_order
|
||||
|
||||
Reference in New Issue
Block a user