refactor(assets): database layer — split queries into modules and merge migrations

- Split monolithic queries.py into modular query modules (asset, asset_reference, common, tags)
- Absorb bulk_ops.py and tags.py into query modules
- Merge migrations 0002-0005 into single migration (0002_merge_to_asset_references)
- Update models.py (merge AssetInfo/AssetCacheState into AssetReference)
- Enable SQLite foreign key enforcement
- Add comprehensive query-layer tests

Co-authored-by: Amp <amp@ampcode.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019c917d-82b5-7448-a04f-9cd59c69d0a2
This commit is contained in:
Luke Mino-Altherr
2026-02-24 11:58:54 -08:00
parent afc00f0055
commit 7db34b428d
19 changed files with 3792 additions and 1336 deletions

View File

@@ -0,0 +1,28 @@
"""Helper functions for assets integration tests."""
import time
import requests
def trigger_sync_seed_assets(session: requests.Session, base_url: str) -> None:
"""Force a synchronous sync/seed pass by calling the seed endpoint with wait=true.
Retries on 409 (already running) until the previous scan finishes.
"""
deadline = time.monotonic() + 60
while True:
r = session.post(
base_url + "/api/assets/seed?wait=true",
json={"roots": ["models", "input", "output"]},
timeout=60,
)
if r.status_code != 409:
assert r.status_code == 200, f"seed endpoint returned {r.status_code}: {r.text}"
return
if time.monotonic() > deadline:
raise TimeoutError("seed endpoint stuck in 409 (already running)")
time.sleep(0.25)
def get_asset_filename(asset_hash: str, extension: str) -> str:
return asset_hash.removeprefix("blake3:") + extension

View File

@@ -0,0 +1,20 @@
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from app.assets.database.models import Base
@pytest.fixture
def session():
"""In-memory SQLite session for fast unit tests."""
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
with Session(engine) as sess:
yield sess
@pytest.fixture(autouse=True)
def autoclean_unit_test_assets():
"""Override parent autouse fixture - query tests don't need server cleanup."""
yield

View File

@@ -0,0 +1,144 @@
import uuid
import pytest
from sqlalchemy.orm import Session
from app.assets.helpers import get_utc_now
from app.assets.database.models import Asset
from app.assets.database.queries import (
asset_exists_by_hash,
get_asset_by_hash,
upsert_asset,
bulk_insert_assets,
)
class TestAssetExistsByHash:
@pytest.mark.parametrize(
"setup_hash,query_hash,expected",
[
(None, "nonexistent", False), # No asset exists
("blake3:abc123", "blake3:abc123", True), # Asset exists with matching hash
(None, "", False), # Null hash in DB doesn't match empty string
],
ids=["nonexistent", "existing", "null_hash_no_match"],
)
def test_exists_by_hash(self, session: Session, setup_hash, query_hash, expected):
if setup_hash is not None or query_hash == "":
asset = Asset(hash=setup_hash, size_bytes=100)
session.add(asset)
session.commit()
assert asset_exists_by_hash(session, asset_hash=query_hash) is expected
class TestGetAssetByHash:
@pytest.mark.parametrize(
"setup_hash,query_hash,should_find",
[
(None, "nonexistent", False),
("blake3:def456", "blake3:def456", True),
],
ids=["nonexistent", "existing"],
)
def test_get_by_hash(self, session: Session, setup_hash, query_hash, should_find):
if setup_hash is not None:
asset = Asset(hash=setup_hash, size_bytes=200, mime_type="image/png")
session.add(asset)
session.commit()
result = get_asset_by_hash(session, asset_hash=query_hash)
if should_find:
assert result is not None
assert result.size_bytes == 200
assert result.mime_type == "image/png"
else:
assert result is None
class TestUpsertAsset:
@pytest.mark.parametrize(
"first_size,first_mime,second_size,second_mime,expect_created,expect_updated,final_size,final_mime",
[
# New asset creation
(None, None, 1024, "application/octet-stream", True, False, 1024, "application/octet-stream"),
# Existing asset, same values - no update
(500, "text/plain", 500, "text/plain", False, False, 500, "text/plain"),
# Existing asset with size 0, update with new values
(0, None, 2048, "image/png", False, True, 2048, "image/png"),
# Existing asset, second call with size 0 - no update
(1000, None, 0, None, False, False, 1000, None),
],
ids=["new_asset", "existing_no_change", "update_from_zero", "zero_size_no_update"],
)
def test_upsert_scenarios(
self,
session: Session,
first_size,
first_mime,
second_size,
second_mime,
expect_created,
expect_updated,
final_size,
final_mime,
):
asset_hash = f"blake3:test_{first_size}_{second_size}"
# First upsert (if first_size is not None, we're testing the second call)
if first_size is not None:
upsert_asset(
session,
asset_hash=asset_hash,
size_bytes=first_size,
mime_type=first_mime,
)
session.commit()
# The upsert call we're testing
asset, created, updated = upsert_asset(
session,
asset_hash=asset_hash,
size_bytes=second_size,
mime_type=second_mime,
)
session.commit()
assert created is expect_created
assert updated is expect_updated
assert asset.size_bytes == final_size
assert asset.mime_type == final_mime
class TestBulkInsertAssets:
def test_inserts_multiple_assets(self, session: Session):
now = get_utc_now()
rows = [
{"id": str(uuid.uuid4()), "hash": "blake3:bulk1", "size_bytes": 100, "mime_type": "text/plain", "created_at": now},
{"id": str(uuid.uuid4()), "hash": "blake3:bulk2", "size_bytes": 200, "mime_type": "image/png", "created_at": now},
{"id": str(uuid.uuid4()), "hash": "blake3:bulk3", "size_bytes": 300, "mime_type": None, "created_at": now},
]
bulk_insert_assets(session, rows)
session.commit()
assets = session.query(Asset).all()
assert len(assets) == 3
hashes = {a.hash for a in assets}
assert hashes == {"blake3:bulk1", "blake3:bulk2", "blake3:bulk3"}
def test_empty_list_is_noop(self, session: Session):
bulk_insert_assets(session, [])
session.commit()
assert session.query(Asset).count() == 0
def test_handles_large_batch(self, session: Session):
"""Test chunking logic with more rows than MAX_BIND_PARAMS allows."""
now = get_utc_now()
rows = [
{"id": str(uuid.uuid4()), "hash": f"blake3:large{i}", "size_bytes": i, "mime_type": None, "created_at": now}
for i in range(200)
]
bulk_insert_assets(session, rows)
session.commit()
assert session.query(Asset).count() == 200

View File

@@ -0,0 +1,517 @@
import time
import uuid
import pytest
from sqlalchemy.orm import Session
from app.assets.database.models import Asset, AssetReference, AssetReferenceMeta
from app.assets.database.queries import (
reference_exists_for_asset_id,
get_reference_by_id,
insert_reference,
get_or_create_reference,
update_reference_timestamps,
list_references_page,
fetch_reference_asset_and_tags,
fetch_reference_and_asset,
update_reference_access_time,
set_reference_metadata,
delete_reference_by_id,
set_reference_preview,
bulk_insert_references_ignore_conflicts,
get_reference_ids_by_ids,
ensure_tags_exist,
add_tags_to_reference,
)
from app.assets.helpers import get_utc_now
def _make_asset(session: Session, hash_val: str | None = None, size: int = 1024) -> Asset:
asset = Asset(hash=hash_val, size_bytes=size, mime_type="application/octet-stream")
session.add(asset)
session.flush()
return asset
def _make_reference(
session: Session,
asset: Asset,
name: str = "test",
owner_id: str = "",
) -> AssetReference:
now = get_utc_now()
ref = AssetReference(
owner_id=owner_id,
name=name,
asset_id=asset.id,
created_at=now,
updated_at=now,
last_access_time=now,
)
session.add(ref)
session.flush()
return ref
class TestReferenceExistsForAssetId:
def test_returns_false_when_no_reference(self, session: Session):
asset = _make_asset(session, "hash1")
assert reference_exists_for_asset_id(session, asset_id=asset.id) is False
def test_returns_true_when_reference_exists(self, session: Session):
asset = _make_asset(session, "hash1")
_make_reference(session, asset)
assert reference_exists_for_asset_id(session, asset_id=asset.id) is True
class TestGetReferenceById:
def test_returns_none_for_nonexistent(self, session: Session):
assert get_reference_by_id(session, reference_id="nonexistent") is None
def test_returns_reference(self, session: Session):
asset = _make_asset(session, "hash1")
ref = _make_reference(session, asset, name="myfile.txt")
result = get_reference_by_id(session, reference_id=ref.id)
assert result is not None
assert result.name == "myfile.txt"
class TestListReferencesPage:
def test_empty_db(self, session: Session):
refs, tag_map, total = list_references_page(session)
assert refs == []
assert tag_map == {}
assert total == 0
def test_returns_references_with_tags(self, session: Session):
asset = _make_asset(session, "hash1")
ref = _make_reference(session, asset, name="test.bin")
ensure_tags_exist(session, ["alpha", "beta"])
add_tags_to_reference(session, reference_id=ref.id, tags=["alpha", "beta"])
session.commit()
refs, tag_map, total = list_references_page(session)
assert len(refs) == 1
assert refs[0].id == ref.id
assert set(tag_map[ref.id]) == {"alpha", "beta"}
assert total == 1
def test_name_contains_filter(self, session: Session):
asset = _make_asset(session, "hash1")
_make_reference(session, asset, name="model_v1.safetensors")
_make_reference(session, asset, name="config.json")
session.commit()
refs, _, total = list_references_page(session, name_contains="model")
assert total == 1
assert refs[0].name == "model_v1.safetensors"
def test_owner_visibility(self, session: Session):
asset = _make_asset(session, "hash1")
_make_reference(session, asset, name="public", owner_id="")
_make_reference(session, asset, name="private", owner_id="user1")
session.commit()
# Empty owner sees only public
refs, _, total = list_references_page(session, owner_id="")
assert total == 1
assert refs[0].name == "public"
# Owner sees both
refs, _, total = list_references_page(session, owner_id="user1")
assert total == 2
def test_include_tags_filter(self, session: Session):
asset = _make_asset(session, "hash1")
ref1 = _make_reference(session, asset, name="tagged")
_make_reference(session, asset, name="untagged")
ensure_tags_exist(session, ["wanted"])
add_tags_to_reference(session, reference_id=ref1.id, tags=["wanted"])
session.commit()
refs, _, total = list_references_page(session, include_tags=["wanted"])
assert total == 1
assert refs[0].name == "tagged"
def test_exclude_tags_filter(self, session: Session):
asset = _make_asset(session, "hash1")
_make_reference(session, asset, name="keep")
ref_exclude = _make_reference(session, asset, name="exclude")
ensure_tags_exist(session, ["bad"])
add_tags_to_reference(session, reference_id=ref_exclude.id, tags=["bad"])
session.commit()
refs, _, total = list_references_page(session, exclude_tags=["bad"])
assert total == 1
assert refs[0].name == "keep"
def test_sorting(self, session: Session):
asset = _make_asset(session, "hash1", size=100)
asset2 = _make_asset(session, "hash2", size=500)
_make_reference(session, asset, name="small")
_make_reference(session, asset2, name="large")
session.commit()
refs, _, _ = list_references_page(session, sort="size", order="desc")
assert refs[0].name == "large"
refs, _, _ = list_references_page(session, sort="name", order="asc")
assert refs[0].name == "large"
class TestFetchReferenceAssetAndTags:
def test_returns_none_for_nonexistent(self, session: Session):
result = fetch_reference_asset_and_tags(session, "nonexistent")
assert result is None
def test_returns_tuple(self, session: Session):
asset = _make_asset(session, "hash1")
ref = _make_reference(session, asset, name="test.bin")
ensure_tags_exist(session, ["tag1"])
add_tags_to_reference(session, reference_id=ref.id, tags=["tag1"])
session.commit()
result = fetch_reference_asset_and_tags(session, ref.id)
assert result is not None
ret_ref, ret_asset, ret_tags = result
assert ret_ref.id == ref.id
assert ret_asset.id == asset.id
assert ret_tags == ["tag1"]
class TestFetchReferenceAndAsset:
def test_returns_none_for_nonexistent(self, session: Session):
result = fetch_reference_and_asset(session, reference_id="nonexistent")
assert result is None
def test_returns_tuple(self, session: Session):
asset = _make_asset(session, "hash1")
ref = _make_reference(session, asset)
session.commit()
result = fetch_reference_and_asset(session, reference_id=ref.id)
assert result is not None
ret_ref, ret_asset = result
assert ret_ref.id == ref.id
assert ret_asset.id == asset.id
class TestUpdateReferenceAccessTime:
def test_updates_last_access_time(self, session: Session):
asset = _make_asset(session, "hash1")
ref = _make_reference(session, asset)
original_time = ref.last_access_time
session.commit()
import time
time.sleep(0.01)
update_reference_access_time(session, reference_id=ref.id)
session.commit()
session.refresh(ref)
assert ref.last_access_time > original_time
class TestDeleteReferenceById:
def test_deletes_existing(self, session: Session):
asset = _make_asset(session, "hash1")
ref = _make_reference(session, asset)
session.commit()
result = delete_reference_by_id(session, reference_id=ref.id, owner_id="")
assert result is True
assert get_reference_by_id(session, reference_id=ref.id) is None
def test_returns_false_for_nonexistent(self, session: Session):
result = delete_reference_by_id(session, reference_id="nonexistent", owner_id="")
assert result is False
def test_respects_owner_visibility(self, session: Session):
asset = _make_asset(session, "hash1")
ref = _make_reference(session, asset, owner_id="user1")
session.commit()
result = delete_reference_by_id(session, reference_id=ref.id, owner_id="user2")
assert result is False
assert get_reference_by_id(session, reference_id=ref.id) is not None
class TestSetReferencePreview:
def test_sets_preview(self, session: Session):
asset = _make_asset(session, "hash1")
preview_asset = _make_asset(session, "preview_hash")
ref = _make_reference(session, asset)
session.commit()
set_reference_preview(session, reference_id=ref.id, preview_asset_id=preview_asset.id)
session.commit()
session.refresh(ref)
assert ref.preview_id == preview_asset.id
def test_clears_preview(self, session: Session):
asset = _make_asset(session, "hash1")
preview_asset = _make_asset(session, "preview_hash")
ref = _make_reference(session, asset)
ref.preview_id = preview_asset.id
session.commit()
set_reference_preview(session, reference_id=ref.id, preview_asset_id=None)
session.commit()
session.refresh(ref)
assert ref.preview_id is None
def test_raises_for_nonexistent_reference(self, session: Session):
with pytest.raises(ValueError, match="not found"):
set_reference_preview(session, reference_id="nonexistent", preview_asset_id=None)
def test_raises_for_nonexistent_preview(self, session: Session):
asset = _make_asset(session, "hash1")
ref = _make_reference(session, asset)
session.commit()
with pytest.raises(ValueError, match="Preview Asset"):
set_reference_preview(session, reference_id=ref.id, preview_asset_id="nonexistent")
class TestInsertReference:
def test_creates_new_reference(self, session: Session):
asset = _make_asset(session, "hash1")
ref = insert_reference(
session, asset_id=asset.id, owner_id="user1", name="test.bin"
)
session.commit()
assert ref is not None
assert ref.name == "test.bin"
assert ref.owner_id == "user1"
def test_allows_duplicate_names(self, session: Session):
asset = _make_asset(session, "hash1")
ref1 = insert_reference(session, asset_id=asset.id, owner_id="user1", name="dup.bin")
session.commit()
# Duplicate names are now allowed
ref2 = insert_reference(
session, asset_id=asset.id, owner_id="user1", name="dup.bin"
)
session.commit()
assert ref1 is not None
assert ref2 is not None
assert ref1.id != ref2.id
class TestGetOrCreateReference:
def test_creates_new_reference(self, session: Session):
asset = _make_asset(session, "hash1")
ref, created = get_or_create_reference(
session, asset_id=asset.id, owner_id="user1", name="new.bin"
)
session.commit()
assert created is True
assert ref.name == "new.bin"
def test_always_creates_new_reference(self, session: Session):
asset = _make_asset(session, "hash1")
ref1, created1 = get_or_create_reference(
session, asset_id=asset.id, owner_id="user1", name="existing.bin"
)
session.commit()
# Duplicate names are allowed, so always creates new
ref2, created2 = get_or_create_reference(
session, asset_id=asset.id, owner_id="user1", name="existing.bin"
)
session.commit()
assert created1 is True
assert created2 is True
assert ref1.id != ref2.id
class TestUpdateReferenceTimestamps:
def test_updates_timestamps(self, session: Session):
asset = _make_asset(session, "hash1")
ref = _make_reference(session, asset)
original_updated_at = ref.updated_at
session.commit()
time.sleep(0.01)
update_reference_timestamps(session, ref)
session.commit()
session.refresh(ref)
assert ref.updated_at > original_updated_at
def test_updates_preview_id(self, session: Session):
asset = _make_asset(session, "hash1")
preview_asset = _make_asset(session, "preview_hash")
ref = _make_reference(session, asset)
session.commit()
update_reference_timestamps(session, ref, preview_id=preview_asset.id)
session.commit()
session.refresh(ref)
assert ref.preview_id == preview_asset.id
class TestSetReferenceMetadata:
def test_sets_metadata(self, session: Session):
asset = _make_asset(session, "hash1")
ref = _make_reference(session, asset)
session.commit()
set_reference_metadata(
session, reference_id=ref.id, user_metadata={"key": "value"}
)
session.commit()
session.refresh(ref)
assert ref.user_metadata == {"key": "value"}
# Check metadata table
meta = session.query(AssetReferenceMeta).filter_by(asset_reference_id=ref.id).all()
assert len(meta) == 1
assert meta[0].key == "key"
assert meta[0].val_str == "value"
def test_replaces_existing_metadata(self, session: Session):
asset = _make_asset(session, "hash1")
ref = _make_reference(session, asset)
session.commit()
set_reference_metadata(
session, reference_id=ref.id, user_metadata={"old": "data"}
)
session.commit()
set_reference_metadata(
session, reference_id=ref.id, user_metadata={"new": "data"}
)
session.commit()
meta = session.query(AssetReferenceMeta).filter_by(asset_reference_id=ref.id).all()
assert len(meta) == 1
assert meta[0].key == "new"
def test_clears_metadata_with_empty_dict(self, session: Session):
asset = _make_asset(session, "hash1")
ref = _make_reference(session, asset)
session.commit()
set_reference_metadata(
session, reference_id=ref.id, user_metadata={"key": "value"}
)
session.commit()
set_reference_metadata(
session, reference_id=ref.id, user_metadata={}
)
session.commit()
session.refresh(ref)
assert ref.user_metadata == {}
meta = session.query(AssetReferenceMeta).filter_by(asset_reference_id=ref.id).all()
assert len(meta) == 0
def test_raises_for_nonexistent(self, session: Session):
with pytest.raises(ValueError, match="not found"):
set_reference_metadata(
session, reference_id="nonexistent", user_metadata={"key": "value"}
)
class TestBulkInsertReferencesIgnoreConflicts:
def test_inserts_multiple_references(self, session: Session):
asset = _make_asset(session, "hash1")
now = get_utc_now()
rows = [
{
"id": str(uuid.uuid4()),
"owner_id": "",
"name": "bulk1.bin",
"asset_id": asset.id,
"preview_id": None,
"user_metadata": {},
"created_at": now,
"updated_at": now,
"last_access_time": now,
},
{
"id": str(uuid.uuid4()),
"owner_id": "",
"name": "bulk2.bin",
"asset_id": asset.id,
"preview_id": None,
"user_metadata": {},
"created_at": now,
"updated_at": now,
"last_access_time": now,
},
]
bulk_insert_references_ignore_conflicts(session, rows)
session.commit()
refs = session.query(AssetReference).all()
assert len(refs) == 2
def test_allows_duplicate_names(self, session: Session):
asset = _make_asset(session, "hash1")
_make_reference(session, asset, name="existing.bin", owner_id="")
session.commit()
now = get_utc_now()
rows = [
{
"id": str(uuid.uuid4()),
"owner_id": "",
"name": "existing.bin",
"asset_id": asset.id,
"preview_id": None,
"user_metadata": {},
"created_at": now,
"updated_at": now,
"last_access_time": now,
},
{
"id": str(uuid.uuid4()),
"owner_id": "",
"name": "new.bin",
"asset_id": asset.id,
"preview_id": None,
"user_metadata": {},
"created_at": now,
"updated_at": now,
"last_access_time": now,
},
]
bulk_insert_references_ignore_conflicts(session, rows)
session.commit()
# Duplicate names allowed, so all 3 rows exist
refs = session.query(AssetReference).all()
assert len(refs) == 3
def test_empty_list_is_noop(self, session: Session):
bulk_insert_references_ignore_conflicts(session, [])
assert session.query(AssetReference).count() == 0
class TestGetReferenceIdsByIds:
def test_returns_existing_ids(self, session: Session):
asset = _make_asset(session, "hash1")
ref1 = _make_reference(session, asset, name="a.bin")
ref2 = _make_reference(session, asset, name="b.bin")
session.commit()
found = get_reference_ids_by_ids(session, [ref1.id, ref2.id, "nonexistent"])
assert found == {ref1.id, ref2.id}
def test_empty_list_returns_empty(self, session: Session):
found = get_reference_ids_by_ids(session, [])
assert found == set()

View File

@@ -0,0 +1,499 @@
"""Tests for cache_state (AssetReference file path) query functions."""
import pytest
from sqlalchemy.orm import Session
from app.assets.database.models import Asset, AssetReference
from app.assets.database.queries import (
list_references_by_asset_id,
upsert_reference,
get_unreferenced_unhashed_asset_ids,
delete_assets_by_ids,
get_references_for_prefixes,
bulk_update_needs_verify,
delete_references_by_ids,
delete_orphaned_seed_asset,
bulk_insert_references_ignore_conflicts,
get_references_by_paths_and_asset_ids,
mark_references_missing_outside_prefixes,
restore_references_by_paths,
)
from app.assets.helpers import select_best_live_path, get_utc_now
def _make_asset(session: Session, hash_val: str | None = None, size: int = 1024) -> Asset:
asset = Asset(hash=hash_val, size_bytes=size)
session.add(asset)
session.flush()
return asset
def _make_reference(
session: Session,
asset: Asset,
file_path: str,
name: str = "test",
mtime_ns: int | None = None,
needs_verify: bool = False,
) -> AssetReference:
now = get_utc_now()
ref = AssetReference(
asset_id=asset.id,
file_path=file_path,
name=name,
mtime_ns=mtime_ns,
needs_verify=needs_verify,
created_at=now,
updated_at=now,
last_access_time=now,
)
session.add(ref)
session.flush()
return ref
class TestListReferencesByAssetId:
def test_returns_empty_for_no_references(self, session: Session):
asset = _make_asset(session, "hash1")
refs = list_references_by_asset_id(session, asset_id=asset.id)
assert list(refs) == []
def test_returns_references_for_asset(self, session: Session):
asset = _make_asset(session, "hash1")
_make_reference(session, asset, "/path/a.bin", name="a")
_make_reference(session, asset, "/path/b.bin", name="b")
session.commit()
refs = list_references_by_asset_id(session, asset_id=asset.id)
paths = [r.file_path for r in refs]
assert set(paths) == {"/path/a.bin", "/path/b.bin"}
def test_does_not_return_other_assets_references(self, session: Session):
asset1 = _make_asset(session, "hash1")
asset2 = _make_asset(session, "hash2")
_make_reference(session, asset1, "/path/asset1.bin", name="a1")
_make_reference(session, asset2, "/path/asset2.bin", name="a2")
session.commit()
refs = list_references_by_asset_id(session, asset_id=asset1.id)
paths = [r.file_path for r in refs]
assert paths == ["/path/asset1.bin"]
class TestSelectBestLivePath:
def test_returns_empty_for_empty_list(self):
result = select_best_live_path([])
assert result == ""
def test_returns_empty_when_no_files_exist(self, session: Session):
asset = _make_asset(session, "hash1")
ref = _make_reference(session, asset, "/nonexistent/path.bin")
session.commit()
result = select_best_live_path([ref])
assert result == ""
def test_prefers_verified_path(self, session: Session, tmp_path):
"""needs_verify=False should be preferred."""
asset = _make_asset(session, "hash1")
verified_file = tmp_path / "verified.bin"
verified_file.write_bytes(b"data")
unverified_file = tmp_path / "unverified.bin"
unverified_file.write_bytes(b"data")
ref_verified = _make_reference(
session, asset, str(verified_file), name="verified", needs_verify=False
)
ref_unverified = _make_reference(
session, asset, str(unverified_file), name="unverified", needs_verify=True
)
session.commit()
refs = [ref_unverified, ref_verified]
result = select_best_live_path(refs)
assert result == str(verified_file)
def test_falls_back_to_existing_unverified(self, session: Session, tmp_path):
"""If all references need verification, return first existing path."""
asset = _make_asset(session, "hash1")
existing_file = tmp_path / "exists.bin"
existing_file.write_bytes(b"data")
ref = _make_reference(session, asset, str(existing_file), needs_verify=True)
session.commit()
result = select_best_live_path([ref])
assert result == str(existing_file)
class TestSelectBestLivePathWithMocking:
def test_handles_missing_file_path_attr(self):
"""Gracefully handle references with None file_path."""
class MockRef:
file_path = None
needs_verify = False
result = select_best_live_path([MockRef()])
assert result == ""
class TestUpsertReference:
@pytest.mark.parametrize(
"initial_mtime,second_mtime,expect_created,expect_updated,final_mtime",
[
# New reference creation
(None, 12345, True, False, 12345),
# Existing reference, same mtime - no update
(100, 100, False, False, 100),
# Existing reference, different mtime - update
(100, 200, False, True, 200),
],
ids=["new_reference", "existing_no_change", "existing_update_mtime"],
)
def test_upsert_scenarios(
self, session: Session, initial_mtime, second_mtime, expect_created, expect_updated, final_mtime
):
asset = _make_asset(session, "hash1")
file_path = f"/path_{initial_mtime}_{second_mtime}.bin"
name = f"file_{initial_mtime}_{second_mtime}"
# Create initial reference if needed
if initial_mtime is not None:
upsert_reference(session, asset_id=asset.id, file_path=file_path, name=name, mtime_ns=initial_mtime)
session.commit()
# The upsert call we're testing
created, updated = upsert_reference(
session, asset_id=asset.id, file_path=file_path, name=name, mtime_ns=second_mtime
)
session.commit()
assert created is expect_created
assert updated is expect_updated
ref = session.query(AssetReference).filter_by(file_path=file_path).one()
assert ref.mtime_ns == final_mtime
def test_upsert_restores_missing_reference(self, session: Session):
"""Upserting a reference that was marked missing should restore it."""
asset = _make_asset(session, "hash1")
file_path = "/restored/file.bin"
ref = _make_reference(session, asset, file_path, mtime_ns=100)
ref.is_missing = True
session.commit()
created, updated = upsert_reference(
session, asset_id=asset.id, file_path=file_path, name="restored", mtime_ns=100
)
session.commit()
assert created is False
assert updated is True
restored_ref = session.query(AssetReference).filter_by(file_path=file_path).one()
assert restored_ref.is_missing is False
class TestRestoreReferencesByPaths:
def test_restores_missing_references(self, session: Session):
asset = _make_asset(session, "hash1")
missing_path = "/missing/file.bin"
active_path = "/active/file.bin"
missing_ref = _make_reference(session, asset, missing_path, name="missing")
missing_ref.is_missing = True
_make_reference(session, asset, active_path, name="active")
session.commit()
restored = restore_references_by_paths(session, [missing_path])
session.commit()
assert restored == 1
ref = session.query(AssetReference).filter_by(file_path=missing_path).one()
assert ref.is_missing is False
def test_empty_list_restores_nothing(self, session: Session):
restored = restore_references_by_paths(session, [])
assert restored == 0
class TestMarkReferencesMissingOutsidePrefixes:
def test_marks_references_missing_outside_prefixes(self, session: Session, tmp_path):
asset = _make_asset(session, "hash1")
valid_dir = tmp_path / "valid"
valid_dir.mkdir()
invalid_dir = tmp_path / "invalid"
invalid_dir.mkdir()
valid_path = str(valid_dir / "file.bin")
invalid_path = str(invalid_dir / "file.bin")
_make_reference(session, asset, valid_path, name="valid")
_make_reference(session, asset, invalid_path, name="invalid")
session.commit()
marked = mark_references_missing_outside_prefixes(session, [str(valid_dir)])
session.commit()
assert marked == 1
all_refs = session.query(AssetReference).all()
assert len(all_refs) == 2
valid_ref = next(r for r in all_refs if r.file_path == valid_path)
invalid_ref = next(r for r in all_refs if r.file_path == invalid_path)
assert valid_ref.is_missing is False
assert invalid_ref.is_missing is True
def test_empty_prefixes_marks_nothing(self, session: Session):
asset = _make_asset(session, "hash1")
_make_reference(session, asset, "/some/path.bin")
session.commit()
marked = mark_references_missing_outside_prefixes(session, [])
assert marked == 0
class TestGetUnreferencedUnhashedAssetIds:
def test_returns_unreferenced_unhashed_assets(self, session: Session):
# Unhashed asset (hash=None) with no references (no file_path)
no_refs = _make_asset(session, hash_val=None)
# Unhashed asset with active reference (not unreferenced)
with_active_ref = _make_asset(session, hash_val=None)
_make_reference(session, with_active_ref, "/has/ref.bin", name="has_ref")
# Unhashed asset with only missing reference (should be unreferenced)
with_missing_ref = _make_asset(session, hash_val=None)
missing_ref = _make_reference(session, with_missing_ref, "/missing/ref.bin", name="missing_ref")
missing_ref.is_missing = True
# Regular asset (hash not None) - should not be returned
_make_asset(session, hash_val="blake3:regular")
session.commit()
unreferenced = get_unreferenced_unhashed_asset_ids(session)
assert no_refs.id in unreferenced
assert with_missing_ref.id in unreferenced
assert with_active_ref.id not in unreferenced
class TestDeleteAssetsByIds:
def test_deletes_assets_and_references(self, session: Session):
asset = _make_asset(session, "hash1")
_make_reference(session, asset, "/test/path.bin", name="test")
session.commit()
deleted = delete_assets_by_ids(session, [asset.id])
session.commit()
assert deleted == 1
assert session.query(Asset).count() == 0
assert session.query(AssetReference).count() == 0
def test_empty_list_deletes_nothing(self, session: Session):
_make_asset(session, "hash1")
session.commit()
deleted = delete_assets_by_ids(session, [])
assert deleted == 0
assert session.query(Asset).count() == 1
class TestGetReferencesForPrefixes:
def test_returns_references_matching_prefix(self, session: Session, tmp_path):
asset = _make_asset(session, "hash1")
dir1 = tmp_path / "dir1"
dir1.mkdir()
dir2 = tmp_path / "dir2"
dir2.mkdir()
path1 = str(dir1 / "file.bin")
path2 = str(dir2 / "file.bin")
_make_reference(session, asset, path1, name="file1", mtime_ns=100)
_make_reference(session, asset, path2, name="file2", mtime_ns=200)
session.commit()
rows = get_references_for_prefixes(session, [str(dir1)])
assert len(rows) == 1
assert rows[0].file_path == path1
def test_empty_prefixes_returns_empty(self, session: Session):
asset = _make_asset(session, "hash1")
_make_reference(session, asset, "/some/path.bin")
session.commit()
rows = get_references_for_prefixes(session, [])
assert rows == []
class TestBulkSetNeedsVerify:
def test_sets_needs_verify_flag(self, session: Session):
asset = _make_asset(session, "hash1")
ref1 = _make_reference(session, asset, "/path1.bin", needs_verify=False)
ref2 = _make_reference(session, asset, "/path2.bin", needs_verify=False)
session.commit()
updated = bulk_update_needs_verify(session, [ref1.id, ref2.id], True)
session.commit()
assert updated == 2
session.refresh(ref1)
session.refresh(ref2)
assert ref1.needs_verify is True
assert ref2.needs_verify is True
def test_empty_list_updates_nothing(self, session: Session):
updated = bulk_update_needs_verify(session, [], True)
assert updated == 0
class TestDeleteReferencesByIds:
def test_deletes_references_by_id(self, session: Session):
asset = _make_asset(session, "hash1")
ref1 = _make_reference(session, asset, "/path1.bin")
_make_reference(session, asset, "/path2.bin")
session.commit()
deleted = delete_references_by_ids(session, [ref1.id])
session.commit()
assert deleted == 1
assert session.query(AssetReference).count() == 1
def test_empty_list_deletes_nothing(self, session: Session):
deleted = delete_references_by_ids(session, [])
assert deleted == 0
class TestDeleteOrphanedSeedAsset:
@pytest.mark.parametrize(
"create_asset,expected_deleted,expected_count",
[
(True, True, 0), # Existing asset gets deleted
(False, False, 0), # Nonexistent returns False
],
ids=["deletes_existing", "nonexistent_returns_false"],
)
def test_delete_orphaned_seed_asset(
self, session: Session, create_asset, expected_deleted, expected_count
):
asset_id = "nonexistent-id"
if create_asset:
asset = _make_asset(session, hash_val=None)
asset_id = asset.id
_make_reference(session, asset, "/test/path.bin", name="test")
session.commit()
deleted = delete_orphaned_seed_asset(session, asset_id)
if create_asset:
session.commit()
assert deleted is expected_deleted
assert session.query(Asset).count() == expected_count
class TestBulkInsertReferencesIgnoreConflicts:
def test_inserts_multiple_references(self, session: Session):
asset = _make_asset(session, "hash1")
now = get_utc_now()
rows = [
{
"asset_id": asset.id,
"file_path": "/bulk1.bin",
"name": "bulk1",
"mtime_ns": 100,
"created_at": now,
"updated_at": now,
"last_access_time": now,
},
{
"asset_id": asset.id,
"file_path": "/bulk2.bin",
"name": "bulk2",
"mtime_ns": 200,
"created_at": now,
"updated_at": now,
"last_access_time": now,
},
]
bulk_insert_references_ignore_conflicts(session, rows)
session.commit()
assert session.query(AssetReference).count() == 2
def test_ignores_conflicts(self, session: Session):
asset = _make_asset(session, "hash1")
_make_reference(session, asset, "/existing.bin", mtime_ns=100)
session.commit()
now = get_utc_now()
rows = [
{
"asset_id": asset.id,
"file_path": "/existing.bin",
"name": "existing",
"mtime_ns": 999,
"created_at": now,
"updated_at": now,
"last_access_time": now,
},
{
"asset_id": asset.id,
"file_path": "/new.bin",
"name": "new",
"mtime_ns": 200,
"created_at": now,
"updated_at": now,
"last_access_time": now,
},
]
bulk_insert_references_ignore_conflicts(session, rows)
session.commit()
assert session.query(AssetReference).count() == 2
existing = session.query(AssetReference).filter_by(file_path="/existing.bin").one()
assert existing.mtime_ns == 100 # Original value preserved
def test_empty_list_is_noop(self, session: Session):
bulk_insert_references_ignore_conflicts(session, [])
assert session.query(AssetReference).count() == 0
class TestGetReferencesByPathsAndAssetIds:
def test_returns_matching_paths(self, session: Session):
asset1 = _make_asset(session, "hash1")
asset2 = _make_asset(session, "hash2")
_make_reference(session, asset1, "/path1.bin")
_make_reference(session, asset2, "/path2.bin")
session.commit()
path_to_asset = {
"/path1.bin": asset1.id,
"/path2.bin": asset2.id,
}
winners = get_references_by_paths_and_asset_ids(session, path_to_asset)
assert winners == {"/path1.bin", "/path2.bin"}
def test_excludes_non_matching_asset_ids(self, session: Session):
asset1 = _make_asset(session, "hash1")
asset2 = _make_asset(session, "hash2")
_make_reference(session, asset1, "/path1.bin")
session.commit()
# Path exists but with different asset_id
path_to_asset = {"/path1.bin": asset2.id}
winners = get_references_by_paths_and_asset_ids(session, path_to_asset)
assert winners == set()
def test_empty_dict_returns_empty(self, session: Session):
winners = get_references_by_paths_and_asset_ids(session, {})
assert winners == set()

View File

@@ -0,0 +1,184 @@
"""Tests for metadata filtering logic in asset_reference queries."""
import pytest
from sqlalchemy.orm import Session
from app.assets.database.models import Asset, AssetReference, AssetReferenceMeta
from app.assets.database.queries import list_references_page
from app.assets.database.queries.asset_reference import convert_metadata_to_rows
from app.assets.helpers import get_utc_now
def _make_asset(session: Session, hash_val: str) -> Asset:
asset = Asset(hash=hash_val, size_bytes=1024)
session.add(asset)
session.flush()
return asset
def _make_reference(
session: Session,
asset: Asset,
name: str,
metadata: dict | None = None,
) -> AssetReference:
now = get_utc_now()
ref = AssetReference(
owner_id="",
name=name,
asset_id=asset.id,
user_metadata=metadata,
created_at=now,
updated_at=now,
last_access_time=now,
)
session.add(ref)
session.flush()
if metadata:
for key, val in metadata.items():
for row in convert_metadata_to_rows(key, val):
meta_row = AssetReferenceMeta(
asset_reference_id=ref.id,
key=row["key"],
ordinal=row.get("ordinal", 0),
val_str=row.get("val_str"),
val_num=row.get("val_num"),
val_bool=row.get("val_bool"),
val_json=row.get("val_json"),
)
session.add(meta_row)
session.flush()
return ref
class TestMetadataFilterByType:
"""Table-driven tests for metadata filtering by different value types."""
@pytest.mark.parametrize(
"match_meta,nomatch_meta,filter_key,filter_val",
[
# String matching
({"category": "models"}, {"category": "images"}, "category", "models"),
# Integer matching
({"epoch": 5}, {"epoch": 10}, "epoch", 5),
# Float matching
({"score": 0.95}, {"score": 0.5}, "score", 0.95),
# Boolean True matching
({"enabled": True}, {"enabled": False}, "enabled", True),
# Boolean False matching
({"enabled": False}, {"enabled": True}, "enabled", False),
],
ids=["string", "int", "float", "bool_true", "bool_false"],
)
def test_filter_matches_correct_value(
self, session: Session, match_meta, nomatch_meta, filter_key, filter_val
):
asset = _make_asset(session, "hash1")
_make_reference(session, asset, "match", match_meta)
_make_reference(session, asset, "nomatch", nomatch_meta)
session.commit()
refs, _, total = list_references_page(
session, metadata_filter={filter_key: filter_val}
)
assert total == 1
assert refs[0].name == "match"
@pytest.mark.parametrize(
"stored_meta,filter_key,filter_val",
[
# String no match
({"category": "models"}, "category", "other"),
# Int no match
({"epoch": 5}, "epoch", 99),
# Float no match
({"score": 0.5}, "score", 0.99),
],
ids=["string_no_match", "int_no_match", "float_no_match"],
)
def test_filter_returns_empty_when_no_match(
self, session: Session, stored_meta, filter_key, filter_val
):
asset = _make_asset(session, "hash1")
_make_reference(session, asset, "item", stored_meta)
session.commit()
refs, _, total = list_references_page(
session, metadata_filter={filter_key: filter_val}
)
assert total == 0
class TestMetadataFilterNull:
"""Tests for null/missing key filtering."""
@pytest.mark.parametrize(
"match_name,match_meta,nomatch_name,nomatch_meta,filter_key",
[
# Null matches missing key
("missing_key", {}, "has_key", {"optional": "value"}, "optional"),
# Null matches explicit null
("explicit_null", {"nullable": None}, "has_value", {"nullable": "present"}, "nullable"),
],
ids=["missing_key", "explicit_null"],
)
def test_null_filter_matches(
self, session: Session, match_name, match_meta, nomatch_name, nomatch_meta, filter_key
):
asset = _make_asset(session, "hash1")
_make_reference(session, asset, match_name, match_meta)
_make_reference(session, asset, nomatch_name, nomatch_meta)
session.commit()
refs, _, total = list_references_page(session, metadata_filter={filter_key: None})
assert total == 1
assert refs[0].name == match_name
class TestMetadataFilterList:
"""Tests for list-based (OR) filtering."""
def test_filter_by_list_matches_any(self, session: Session):
"""List values should match ANY of the values (OR)."""
asset = _make_asset(session, "hash1")
_make_reference(session, asset, "cat_a", {"category": "a"})
_make_reference(session, asset, "cat_b", {"category": "b"})
_make_reference(session, asset, "cat_c", {"category": "c"})
session.commit()
refs, _, total = list_references_page(session, metadata_filter={"category": ["a", "b"]})
assert total == 2
names = {r.name for r in refs}
assert names == {"cat_a", "cat_b"}
class TestMetadataFilterMultipleKeys:
"""Tests for multiple filter keys (AND semantics)."""
def test_multiple_keys_must_all_match(self, session: Session):
"""Multiple keys should ALL match (AND)."""
asset = _make_asset(session, "hash1")
_make_reference(session, asset, "match", {"type": "model", "version": 2})
_make_reference(session, asset, "wrong_type", {"type": "config", "version": 2})
_make_reference(session, asset, "wrong_version", {"type": "model", "version": 1})
session.commit()
refs, _, total = list_references_page(
session, metadata_filter={"type": "model", "version": 2}
)
assert total == 1
assert refs[0].name == "match"
class TestMetadataFilterEmptyDict:
"""Tests for empty filter behavior."""
def test_empty_filter_returns_all(self, session: Session):
asset = _make_asset(session, "hash1")
_make_reference(session, asset, "a", {"key": "val"})
_make_reference(session, asset, "b", {})
session.commit()
refs, _, total = list_references_page(session, metadata_filter={})
assert total == 2

View File

@@ -0,0 +1,366 @@
import pytest
from sqlalchemy.orm import Session
from app.assets.database.models import Asset, AssetReference, AssetReferenceTag, AssetReferenceMeta, Tag
from app.assets.database.queries import (
ensure_tags_exist,
get_reference_tags,
set_reference_tags,
add_tags_to_reference,
remove_tags_from_reference,
add_missing_tag_for_asset_id,
remove_missing_tag_for_asset_id,
list_tags_with_usage,
bulk_insert_tags_and_meta,
)
from app.assets.helpers import get_utc_now
def _make_asset(session: Session, hash_val: str | None = None) -> Asset:
asset = Asset(hash=hash_val, size_bytes=1024)
session.add(asset)
session.flush()
return asset
def _make_reference(session: Session, asset: Asset, name: str = "test", owner_id: str = "") -> AssetReference:
now = get_utc_now()
ref = AssetReference(
owner_id=owner_id,
name=name,
asset_id=asset.id,
created_at=now,
updated_at=now,
last_access_time=now,
)
session.add(ref)
session.flush()
return ref
class TestEnsureTagsExist:
def test_creates_new_tags(self, session: Session):
ensure_tags_exist(session, ["alpha", "beta"], tag_type="user")
session.commit()
tags = session.query(Tag).all()
assert {t.name for t in tags} == {"alpha", "beta"}
def test_is_idempotent(self, session: Session):
ensure_tags_exist(session, ["alpha"], tag_type="user")
ensure_tags_exist(session, ["alpha"], tag_type="user")
session.commit()
assert session.query(Tag).count() == 1
def test_normalizes_tags(self, session: Session):
ensure_tags_exist(session, [" ALPHA ", "Beta", "alpha"])
session.commit()
tags = session.query(Tag).all()
assert {t.name for t in tags} == {"alpha", "beta"}
def test_empty_list_is_noop(self, session: Session):
ensure_tags_exist(session, [])
session.commit()
assert session.query(Tag).count() == 0
def test_tag_type_is_set(self, session: Session):
ensure_tags_exist(session, ["system-tag"], tag_type="system")
session.commit()
tag = session.query(Tag).filter_by(name="system-tag").one()
assert tag.tag_type == "system"
class TestGetReferenceTags:
def test_returns_empty_for_no_tags(self, session: Session):
asset = _make_asset(session, "hash1")
ref = _make_reference(session, asset)
tags = get_reference_tags(session, reference_id=ref.id)
assert tags == []
def test_returns_tags_for_reference(self, session: Session):
asset = _make_asset(session, "hash1")
ref = _make_reference(session, asset)
ensure_tags_exist(session, ["tag1", "tag2"])
session.add_all([
AssetReferenceTag(asset_reference_id=ref.id, tag_name="tag1", origin="manual", added_at=get_utc_now()),
AssetReferenceTag(asset_reference_id=ref.id, tag_name="tag2", origin="manual", added_at=get_utc_now()),
])
session.flush()
tags = get_reference_tags(session, reference_id=ref.id)
assert set(tags) == {"tag1", "tag2"}
class TestSetReferenceTags:
def test_adds_new_tags(self, session: Session):
asset = _make_asset(session, "hash1")
ref = _make_reference(session, asset)
result = set_reference_tags(session, reference_id=ref.id, tags=["a", "b"])
session.commit()
assert set(result["added"]) == {"a", "b"}
assert result["removed"] == []
assert set(result["total"]) == {"a", "b"}
def test_removes_old_tags(self, session: Session):
asset = _make_asset(session, "hash1")
ref = _make_reference(session, asset)
set_reference_tags(session, reference_id=ref.id, tags=["a", "b", "c"])
result = set_reference_tags(session, reference_id=ref.id, tags=["a"])
session.commit()
assert result["added"] == []
assert set(result["removed"]) == {"b", "c"}
assert result["total"] == ["a"]
def test_replaces_tags(self, session: Session):
asset = _make_asset(session, "hash1")
ref = _make_reference(session, asset)
set_reference_tags(session, reference_id=ref.id, tags=["a", "b"])
result = set_reference_tags(session, reference_id=ref.id, tags=["b", "c"])
session.commit()
assert result["added"] == ["c"]
assert result["removed"] == ["a"]
assert set(result["total"]) == {"b", "c"}
class TestAddTagsToReference:
def test_adds_tags(self, session: Session):
asset = _make_asset(session, "hash1")
ref = _make_reference(session, asset)
result = add_tags_to_reference(session, reference_id=ref.id, tags=["x", "y"])
session.commit()
assert set(result["added"]) == {"x", "y"}
assert result["already_present"] == []
def test_reports_already_present(self, session: Session):
asset = _make_asset(session, "hash1")
ref = _make_reference(session, asset)
add_tags_to_reference(session, reference_id=ref.id, tags=["x"])
result = add_tags_to_reference(session, reference_id=ref.id, tags=["x", "y"])
session.commit()
assert result["added"] == ["y"]
assert result["already_present"] == ["x"]
def test_raises_for_missing_reference(self, session: Session):
with pytest.raises(ValueError, match="not found"):
add_tags_to_reference(session, reference_id="nonexistent", tags=["x"])
class TestRemoveTagsFromReference:
def test_removes_tags(self, session: Session):
asset = _make_asset(session, "hash1")
ref = _make_reference(session, asset)
add_tags_to_reference(session, reference_id=ref.id, tags=["a", "b", "c"])
result = remove_tags_from_reference(session, reference_id=ref.id, tags=["a", "b"])
session.commit()
assert set(result["removed"]) == {"a", "b"}
assert result["not_present"] == []
assert result["total_tags"] == ["c"]
def test_reports_not_present(self, session: Session):
asset = _make_asset(session, "hash1")
ref = _make_reference(session, asset)
add_tags_to_reference(session, reference_id=ref.id, tags=["a"])
result = remove_tags_from_reference(session, reference_id=ref.id, tags=["a", "x"])
session.commit()
assert result["removed"] == ["a"]
assert result["not_present"] == ["x"]
def test_raises_for_missing_reference(self, session: Session):
with pytest.raises(ValueError, match="not found"):
remove_tags_from_reference(session, reference_id="nonexistent", tags=["x"])
class TestMissingTagFunctions:
def test_add_missing_tag_for_asset_id(self, session: Session):
asset = _make_asset(session, "hash1")
ref = _make_reference(session, asset)
ensure_tags_exist(session, ["missing"], tag_type="system")
add_missing_tag_for_asset_id(session, asset_id=asset.id)
session.commit()
tags = get_reference_tags(session, reference_id=ref.id)
assert "missing" in tags
def test_add_missing_tag_is_idempotent(self, session: Session):
asset = _make_asset(session, "hash1")
ref = _make_reference(session, asset)
ensure_tags_exist(session, ["missing"], tag_type="system")
add_missing_tag_for_asset_id(session, asset_id=asset.id)
add_missing_tag_for_asset_id(session, asset_id=asset.id)
session.commit()
links = session.query(AssetReferenceTag).filter_by(asset_reference_id=ref.id, tag_name="missing").all()
assert len(links) == 1
def test_remove_missing_tag_for_asset_id(self, session: Session):
asset = _make_asset(session, "hash1")
ref = _make_reference(session, asset)
ensure_tags_exist(session, ["missing"], tag_type="system")
add_missing_tag_for_asset_id(session, asset_id=asset.id)
remove_missing_tag_for_asset_id(session, asset_id=asset.id)
session.commit()
tags = get_reference_tags(session, reference_id=ref.id)
assert "missing" not in tags
class TestListTagsWithUsage:
def test_returns_tags_with_counts(self, session: Session):
ensure_tags_exist(session, ["used", "unused"])
asset = _make_asset(session, "hash1")
ref = _make_reference(session, asset)
add_tags_to_reference(session, reference_id=ref.id, tags=["used"])
session.commit()
rows, total = list_tags_with_usage(session)
tag_dict = {name: count for name, _, count in rows}
assert tag_dict["used"] == 1
assert tag_dict["unused"] == 0
assert total == 2
def test_exclude_zero_counts(self, session: Session):
ensure_tags_exist(session, ["used", "unused"])
asset = _make_asset(session, "hash1")
ref = _make_reference(session, asset)
add_tags_to_reference(session, reference_id=ref.id, tags=["used"])
session.commit()
rows, total = list_tags_with_usage(session, include_zero=False)
tag_names = {name for name, _, _ in rows}
assert "used" in tag_names
assert "unused" not in tag_names
def test_prefix_filter(self, session: Session):
ensure_tags_exist(session, ["alpha", "beta", "alphabet"])
session.commit()
rows, total = list_tags_with_usage(session, prefix="alph")
tag_names = {name for name, _, _ in rows}
assert tag_names == {"alpha", "alphabet"}
def test_order_by_name(self, session: Session):
ensure_tags_exist(session, ["zebra", "alpha", "middle"])
session.commit()
rows, _ = list_tags_with_usage(session, order="name_asc")
names = [name for name, _, _ in rows]
assert names == ["alpha", "middle", "zebra"]
def test_owner_visibility(self, session: Session):
ensure_tags_exist(session, ["shared-tag", "owner-tag"])
asset = _make_asset(session, "hash1")
shared_ref = _make_reference(session, asset, name="shared", owner_id="")
owner_ref = _make_reference(session, asset, name="owned", owner_id="user1")
add_tags_to_reference(session, reference_id=shared_ref.id, tags=["shared-tag"])
add_tags_to_reference(session, reference_id=owner_ref.id, tags=["owner-tag"])
session.commit()
# Empty owner sees only shared
rows, _ = list_tags_with_usage(session, owner_id="", include_zero=False)
tag_dict = {name: count for name, _, count in rows}
assert tag_dict.get("shared-tag", 0) == 1
assert tag_dict.get("owner-tag", 0) == 0
# User1 sees both
rows, _ = list_tags_with_usage(session, owner_id="user1", include_zero=False)
tag_dict = {name: count for name, _, count in rows}
assert tag_dict.get("shared-tag", 0) == 1
assert tag_dict.get("owner-tag", 0) == 1
class TestBulkInsertTagsAndMeta:
def test_inserts_tags(self, session: Session):
asset = _make_asset(session, "hash1")
ref = _make_reference(session, asset)
ensure_tags_exist(session, ["bulk-tag1", "bulk-tag2"])
session.commit()
now = get_utc_now()
tag_rows = [
{"asset_reference_id": ref.id, "tag_name": "bulk-tag1", "origin": "manual", "added_at": now},
{"asset_reference_id": ref.id, "tag_name": "bulk-tag2", "origin": "manual", "added_at": now},
]
bulk_insert_tags_and_meta(session, tag_rows=tag_rows, meta_rows=[])
session.commit()
tags = get_reference_tags(session, reference_id=ref.id)
assert set(tags) == {"bulk-tag1", "bulk-tag2"}
def test_inserts_meta(self, session: Session):
asset = _make_asset(session, "hash1")
ref = _make_reference(session, asset)
session.commit()
meta_rows = [
{
"asset_reference_id": ref.id,
"key": "meta-key",
"ordinal": 0,
"val_str": "meta-value",
"val_num": None,
"val_bool": None,
"val_json": None,
},
]
bulk_insert_tags_and_meta(session, tag_rows=[], meta_rows=meta_rows)
session.commit()
meta = session.query(AssetReferenceMeta).filter_by(asset_reference_id=ref.id).all()
assert len(meta) == 1
assert meta[0].key == "meta-key"
assert meta[0].val_str == "meta-value"
def test_ignores_conflicts(self, session: Session):
asset = _make_asset(session, "hash1")
ref = _make_reference(session, asset)
ensure_tags_exist(session, ["existing-tag"])
add_tags_to_reference(session, reference_id=ref.id, tags=["existing-tag"])
session.commit()
now = get_utc_now()
tag_rows = [
{"asset_reference_id": ref.id, "tag_name": "existing-tag", "origin": "duplicate", "added_at": now},
]
bulk_insert_tags_and_meta(session, tag_rows=tag_rows, meta_rows=[])
session.commit()
# Should still have only one tag link
links = session.query(AssetReferenceTag).filter_by(asset_reference_id=ref.id, tag_name="existing-tag").all()
assert len(links) == 1
# Origin should be original, not overwritten
assert links[0].origin == "manual"
def test_empty_lists_is_noop(self, session: Session):
bulk_insert_tags_and_meta(session, tag_rows=[], meta_rows=[])
assert session.query(AssetReferenceTag).count() == 0
assert session.query(AssetReferenceMeta).count() == 0