refactor(assets): extract services layer from manager and helpers

- Create services/ package: asset_management, bulk_ingest, file_utils, hashing, ingest, metadata_extract, path_utils, schemas, tagging
- Move business logic out of helpers.py into service modules
- Remove manager.py and hashing.py (absorbed into services)
- Add blake3 to requirements.txt
- Add comprehensive service-layer tests

Co-authored-by: Amp <amp@ampcode.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019c9209-37af-757a-b6e4-af59b4267362
This commit is contained in:
Luke Mino-Altherr
2026-02-24 11:59:03 -08:00
parent 21a9ab7f48
commit 698c9abe54
23 changed files with 3159 additions and 878 deletions

View File

@@ -0,0 +1 @@
# Service layer tests

View File

@@ -0,0 +1,54 @@
import tempfile
from pathlib import Path
from unittest.mock import patch
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from app.assets.database.models import Base
@pytest.fixture(autouse=True)
def autoclean_unit_test_assets():
"""Override parent autouse fixture - service unit tests don't need server cleanup."""
yield
@pytest.fixture
def db_engine():
"""In-memory SQLite engine for fast unit tests."""
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
return engine
@pytest.fixture
def session(db_engine):
"""Session fixture for tests that need direct DB access."""
with Session(db_engine) as sess:
yield sess
@pytest.fixture
def mock_create_session(db_engine):
"""Patch create_session to use our in-memory database."""
from contextlib import contextmanager
from sqlalchemy.orm import Session as SASession
@contextmanager
def _create_session():
with SASession(db_engine) as sess:
yield sess
with patch("app.assets.services.ingest.create_session", _create_session), \
patch("app.assets.services.asset_management.create_session", _create_session), \
patch("app.assets.services.tagging.create_session", _create_session):
yield _create_session
@pytest.fixture
def temp_dir():
"""Temporary directory for file operations."""
with tempfile.TemporaryDirectory() as tmpdir:
yield Path(tmpdir)

View File

@@ -0,0 +1,264 @@
"""Tests for asset_management services."""
import pytest
from sqlalchemy.orm import Session
from app.assets.database.models import Asset, AssetReference
from app.assets.database.queries import ensure_tags_exist, add_tags_to_reference
from app.assets.helpers import get_utc_now
from app.assets.services import (
get_asset_detail,
update_asset_metadata,
delete_asset_reference,
set_asset_preview,
)
def _make_asset(session: Session, hash_val: str = "blake3:test", size: int = 1024) -> Asset:
asset = Asset(hash=hash_val, size_bytes=size, mime_type="application/octet-stream")
session.add(asset)
session.flush()
return asset
def _make_reference(
session: Session,
asset: Asset,
name: str = "test",
owner_id: str = "",
) -> AssetReference:
now = get_utc_now()
ref = AssetReference(
owner_id=owner_id,
name=name,
asset_id=asset.id,
created_at=now,
updated_at=now,
last_access_time=now,
)
session.add(ref)
session.flush()
return ref
class TestGetAssetDetail:
def test_returns_none_for_nonexistent(self, mock_create_session):
result = get_asset_detail(reference_id="nonexistent")
assert result is None
def test_returns_asset_with_tags(self, mock_create_session, session: Session):
asset = _make_asset(session)
ref = _make_reference(session, asset, name="test.bin")
ensure_tags_exist(session, ["alpha", "beta"])
add_tags_to_reference(session, reference_id=ref.id, tags=["alpha", "beta"])
session.commit()
result = get_asset_detail(reference_id=ref.id)
assert result is not None
assert result.ref.id == ref.id
assert result.asset.hash == asset.hash
assert set(result.tags) == {"alpha", "beta"}
def test_respects_owner_visibility(self, mock_create_session, session: Session):
asset = _make_asset(session)
ref = _make_reference(session, asset, owner_id="user1")
session.commit()
# Wrong owner cannot see
result = get_asset_detail(reference_id=ref.id, owner_id="user2")
assert result is None
# Correct owner can see
result = get_asset_detail(reference_id=ref.id, owner_id="user1")
assert result is not None
class TestUpdateAssetMetadata:
def test_updates_name(self, mock_create_session, session: Session):
asset = _make_asset(session)
ref = _make_reference(session, asset, name="old_name.bin")
ref_id = ref.id
session.commit()
update_asset_metadata(
reference_id=ref_id,
name="new_name.bin",
)
# Verify by re-fetching from DB
session.expire_all()
updated_ref = session.get(AssetReference, ref_id)
assert updated_ref.name == "new_name.bin"
def test_updates_tags(self, mock_create_session, session: Session):
asset = _make_asset(session)
ref = _make_reference(session, asset)
ensure_tags_exist(session, ["old"])
add_tags_to_reference(session, reference_id=ref.id, tags=["old"])
session.commit()
result = update_asset_metadata(
reference_id=ref.id,
tags=["new1", "new2"],
)
assert set(result.tags) == {"new1", "new2"}
assert "old" not in result.tags
def test_updates_user_metadata(self, mock_create_session, session: Session):
asset = _make_asset(session)
ref = _make_reference(session, asset)
ref_id = ref.id
session.commit()
update_asset_metadata(
reference_id=ref_id,
user_metadata={"key": "value", "num": 42},
)
# Verify by re-fetching from DB
session.expire_all()
updated_ref = session.get(AssetReference, ref_id)
assert updated_ref.user_metadata["key"] == "value"
assert updated_ref.user_metadata["num"] == 42
def test_raises_for_nonexistent(self, mock_create_session):
with pytest.raises(ValueError, match="not found"):
update_asset_metadata(reference_id="nonexistent", name="fail")
def test_raises_for_wrong_owner(self, mock_create_session, session: Session):
asset = _make_asset(session)
ref = _make_reference(session, asset, owner_id="user1")
session.commit()
with pytest.raises(PermissionError, match="not owner"):
update_asset_metadata(
reference_id=ref.id,
name="new",
owner_id="user2",
)
class TestDeleteAssetReference:
def test_deletes_reference(self, mock_create_session, session: Session):
asset = _make_asset(session)
ref = _make_reference(session, asset)
ref_id = ref.id
session.commit()
result = delete_asset_reference(
reference_id=ref_id,
owner_id="",
delete_content_if_orphan=False,
)
assert result is True
assert session.get(AssetReference, ref_id) is None
def test_returns_false_for_nonexistent(self, mock_create_session):
result = delete_asset_reference(
reference_id="nonexistent",
owner_id="",
)
assert result is False
def test_returns_false_for_wrong_owner(self, mock_create_session, session: Session):
asset = _make_asset(session)
ref = _make_reference(session, asset, owner_id="user1")
ref_id = ref.id
session.commit()
result = delete_asset_reference(
reference_id=ref_id,
owner_id="user2",
)
assert result is False
assert session.get(AssetReference, ref_id) is not None
def test_keeps_asset_if_other_references_exist(self, mock_create_session, session: Session):
asset = _make_asset(session)
ref1 = _make_reference(session, asset, name="ref1")
_make_reference(session, asset, name="ref2") # Second ref keeps asset alive
asset_id = asset.id
session.commit()
delete_asset_reference(
reference_id=ref1.id,
owner_id="",
delete_content_if_orphan=True,
)
# Asset should still exist
assert session.get(Asset, asset_id) is not None
def test_deletes_orphaned_asset(self, mock_create_session, session: Session):
asset = _make_asset(session)
ref = _make_reference(session, asset)
asset_id = asset.id
ref_id = ref.id
session.commit()
delete_asset_reference(
reference_id=ref_id,
owner_id="",
delete_content_if_orphan=True,
)
# Both ref and asset should be gone
assert session.get(AssetReference, ref_id) is None
assert session.get(Asset, asset_id) is None
class TestSetAssetPreview:
def test_sets_preview(self, mock_create_session, session: Session):
asset = _make_asset(session, hash_val="blake3:main")
preview_asset = _make_asset(session, hash_val="blake3:preview")
ref = _make_reference(session, asset)
ref_id = ref.id
preview_id = preview_asset.id
session.commit()
set_asset_preview(
reference_id=ref_id,
preview_asset_id=preview_id,
)
# Verify by re-fetching from DB
session.expire_all()
updated_ref = session.get(AssetReference, ref_id)
assert updated_ref.preview_id == preview_id
def test_clears_preview(self, mock_create_session, session: Session):
asset = _make_asset(session)
preview_asset = _make_asset(session, hash_val="blake3:preview")
ref = _make_reference(session, asset)
ref.preview_id = preview_asset.id
ref_id = ref.id
session.commit()
set_asset_preview(
reference_id=ref_id,
preview_asset_id=None,
)
# Verify by re-fetching from DB
session.expire_all()
updated_ref = session.get(AssetReference, ref_id)
assert updated_ref.preview_id is None
def test_raises_for_nonexistent_ref(self, mock_create_session):
with pytest.raises(ValueError, match="not found"):
set_asset_preview(reference_id="nonexistent")
def test_raises_for_wrong_owner(self, mock_create_session, session: Session):
asset = _make_asset(session)
ref = _make_reference(session, asset, owner_id="user1")
session.commit()
with pytest.raises(PermissionError, match="not owner"):
set_asset_preview(
reference_id=ref.id,
preview_asset_id=None,
owner_id="user2",
)

View File

@@ -0,0 +1,137 @@
"""Tests for bulk ingest services."""
from pathlib import Path
from sqlalchemy.orm import Session
from app.assets.database.models import Asset, AssetReference
from app.assets.services.bulk_ingest import SeedAssetSpec, batch_insert_seed_assets
class TestBatchInsertSeedAssets:
def test_populates_mime_type_for_model_files(self, session: Session, temp_dir: Path):
"""Verify mime_type is stored in the Asset table for model files."""
file_path = temp_dir / "model.safetensors"
file_path.write_bytes(b"fake safetensors content")
specs: list[SeedAssetSpec] = [
{
"abs_path": str(file_path),
"size_bytes": 24,
"mtime_ns": 1234567890000000000,
"info_name": "Test Model",
"tags": ["models"],
"fname": "model.safetensors",
"metadata": None,
"hash": None,
"mime_type": "application/safetensors",
}
]
result = batch_insert_seed_assets(session, specs=specs, owner_id="")
assert result.inserted_refs == 1
# Verify Asset has mime_type populated
assets = session.query(Asset).all()
assert len(assets) == 1
assert assets[0].mime_type == "application/safetensors"
def test_mime_type_none_when_not_provided(self, session: Session, temp_dir: Path):
"""Verify mime_type is None when not provided in spec."""
file_path = temp_dir / "unknown.bin"
file_path.write_bytes(b"binary data")
specs: list[SeedAssetSpec] = [
{
"abs_path": str(file_path),
"size_bytes": 11,
"mtime_ns": 1234567890000000000,
"info_name": "Unknown File",
"tags": [],
"fname": "unknown.bin",
"metadata": None,
"hash": None,
"mime_type": None,
}
]
result = batch_insert_seed_assets(session, specs=specs, owner_id="")
assert result.inserted_refs == 1
assets = session.query(Asset).all()
assert len(assets) == 1
assert assets[0].mime_type is None
def test_various_model_mime_types(self, session: Session, temp_dir: Path):
"""Verify various model file types get correct mime_type."""
test_cases = [
("model.safetensors", "application/safetensors"),
("model.pt", "application/pytorch"),
("model.ckpt", "application/pickle"),
("model.gguf", "application/gguf"),
]
specs: list[SeedAssetSpec] = []
for filename, mime_type in test_cases:
file_path = temp_dir / filename
file_path.write_bytes(b"content")
specs.append(
{
"abs_path": str(file_path),
"size_bytes": 7,
"mtime_ns": 1234567890000000000,
"info_name": filename,
"tags": [],
"fname": filename,
"metadata": None,
"hash": None,
"mime_type": mime_type,
}
)
result = batch_insert_seed_assets(session, specs=specs, owner_id="")
assert result.inserted_refs == len(test_cases)
for filename, expected_mime in test_cases:
ref = session.query(AssetReference).filter_by(name=filename).first()
assert ref is not None
asset = session.query(Asset).filter_by(id=ref.asset_id).first()
assert asset.mime_type == expected_mime, f"Expected {expected_mime} for {filename}, got {asset.mime_type}"
class TestMetadataExtraction:
def test_extracts_mime_type_for_model_files(self, temp_dir: Path):
"""Verify metadata extraction returns correct mime_type for model files."""
from app.assets.services.metadata_extract import extract_file_metadata
file_path = temp_dir / "model.safetensors"
file_path.write_bytes(b"fake safetensors content")
meta = extract_file_metadata(str(file_path))
assert meta.content_type == "application/safetensors"
def test_mime_type_for_various_model_formats(self, temp_dir: Path):
"""Verify various model file types get correct mime_type from metadata."""
from app.assets.services.metadata_extract import extract_file_metadata
test_cases = [
("model.safetensors", "application/safetensors"),
("model.sft", "application/safetensors"),
("model.pt", "application/pytorch"),
("model.pth", "application/pytorch"),
("model.ckpt", "application/pickle"),
("model.pkl", "application/pickle"),
("model.gguf", "application/gguf"),
]
for filename, expected_mime in test_cases:
file_path = temp_dir / filename
file_path.write_bytes(b"content")
meta = extract_file_metadata(str(file_path))
assert meta.content_type == expected_mime, f"Expected {expected_mime} for {filename}, got {meta.content_type}"

View File

@@ -0,0 +1,252 @@
"""Tests for asset enrichment (mime_type and hash population)."""
from pathlib import Path
from unittest.mock import patch
from sqlalchemy.orm import Session
from app.assets.database.models import Asset, AssetReference
from app.assets.scanner import (
ENRICHMENT_HASHED,
ENRICHMENT_METADATA,
ENRICHMENT_STUB,
enrich_asset,
)
def _create_stub_asset(
session: Session,
file_path: str,
asset_id: str = "test-asset-id",
reference_id: str = "test-ref-id",
name: str | None = None,
) -> tuple[Asset, AssetReference]:
"""Create a stub asset with reference for testing enrichment."""
asset = Asset(
id=asset_id,
hash=None,
size_bytes=100,
mime_type=None,
)
session.add(asset)
session.flush()
ref = AssetReference(
id=reference_id,
asset_id=asset_id,
name=name or f"test-asset-{asset_id}",
owner_id="system",
file_path=file_path,
mtime_ns=1234567890000000000,
enrichment_level=ENRICHMENT_STUB,
)
session.add(ref)
session.flush()
return asset, ref
class TestEnrichAsset:
def test_extracts_mime_type_and_updates_asset(
self, db_engine, temp_dir: Path, session: Session
):
"""Verify mime_type is written to the Asset table during enrichment."""
file_path = temp_dir / "model.safetensors"
file_path.write_bytes(b"\x00" * 100)
asset, ref = _create_stub_asset(
session, str(file_path), "asset-1", "ref-1"
)
session.commit()
with patch("app.assets.scanner.create_session") as mock_cs:
from contextlib import contextmanager
@contextmanager
def _create_session():
with Session(db_engine) as sess:
yield sess
mock_cs.side_effect = _create_session
new_level = enrich_asset(
file_path=str(file_path),
reference_id=ref.id,
asset_id=asset.id,
extract_metadata=True,
compute_hash=False,
)
assert new_level == ENRICHMENT_METADATA
session.expire_all()
updated_asset = session.get(Asset, "asset-1")
assert updated_asset is not None
assert updated_asset.mime_type == "application/safetensors"
def test_computes_hash_and_updates_asset(
self, db_engine, temp_dir: Path, session: Session
):
"""Verify hash is written to the Asset table during enrichment."""
file_path = temp_dir / "data.bin"
file_path.write_bytes(b"test content for hashing")
asset, ref = _create_stub_asset(
session, str(file_path), "asset-2", "ref-2"
)
session.commit()
with patch("app.assets.scanner.create_session") as mock_cs:
from contextlib import contextmanager
@contextmanager
def _create_session():
with Session(db_engine) as sess:
yield sess
mock_cs.side_effect = _create_session
new_level = enrich_asset(
file_path=str(file_path),
reference_id=ref.id,
asset_id=asset.id,
extract_metadata=True,
compute_hash=True,
)
assert new_level == ENRICHMENT_HASHED
session.expire_all()
updated_asset = session.get(Asset, "asset-2")
assert updated_asset is not None
assert updated_asset.hash is not None
assert updated_asset.hash.startswith("blake3:")
def test_enrichment_updates_both_mime_and_hash(
self, db_engine, temp_dir: Path, session: Session
):
"""Verify both mime_type and hash are set when full enrichment runs."""
file_path = temp_dir / "model.safetensors"
file_path.write_bytes(b"\x00" * 50)
asset, ref = _create_stub_asset(
session, str(file_path), "asset-3", "ref-3"
)
session.commit()
with patch("app.assets.scanner.create_session") as mock_cs:
from contextlib import contextmanager
@contextmanager
def _create_session():
with Session(db_engine) as sess:
yield sess
mock_cs.side_effect = _create_session
enrich_asset(
file_path=str(file_path),
reference_id=ref.id,
asset_id=asset.id,
extract_metadata=True,
compute_hash=True,
)
session.expire_all()
updated_asset = session.get(Asset, "asset-3")
assert updated_asset is not None
assert updated_asset.mime_type == "application/safetensors"
assert updated_asset.hash is not None
assert updated_asset.hash.startswith("blake3:")
def test_missing_file_returns_stub_level(
self, db_engine, temp_dir: Path, session: Session
):
"""Verify missing files don't cause errors and return STUB level."""
file_path = temp_dir / "nonexistent.bin"
asset, ref = _create_stub_asset(
session, str(file_path), "asset-4", "ref-4"
)
session.commit()
with patch("app.assets.scanner.create_session") as mock_cs:
from contextlib import contextmanager
@contextmanager
def _create_session():
with Session(db_engine) as sess:
yield sess
mock_cs.side_effect = _create_session
new_level = enrich_asset(
file_path=str(file_path),
reference_id=ref.id,
asset_id=asset.id,
extract_metadata=True,
compute_hash=True,
)
assert new_level == ENRICHMENT_STUB
session.expire_all()
updated_asset = session.get(Asset, "asset-4")
assert updated_asset.mime_type is None
assert updated_asset.hash is None
def test_duplicate_hash_merges_into_existing_asset(
self, db_engine, temp_dir: Path, session: Session
):
"""Verify duplicate files merge into existing asset instead of failing."""
file_path_1 = temp_dir / "file1.bin"
file_path_2 = temp_dir / "file2.bin"
content = b"identical content"
file_path_1.write_bytes(content)
file_path_2.write_bytes(content)
asset1, ref1 = _create_stub_asset(
session, str(file_path_1), "asset-dup-1", "ref-dup-1"
)
asset2, ref2 = _create_stub_asset(
session, str(file_path_2), "asset-dup-2", "ref-dup-2"
)
session.commit()
with patch("app.assets.scanner.create_session") as mock_cs:
from contextlib import contextmanager
@contextmanager
def _create_session():
with Session(db_engine) as sess:
yield sess
mock_cs.side_effect = _create_session
enrich_asset(
file_path=str(file_path_1),
reference_id=ref1.id,
asset_id=asset1.id,
extract_metadata=True,
compute_hash=True,
)
enrich_asset(
file_path=str(file_path_2),
reference_id=ref2.id,
asset_id=asset2.id,
extract_metadata=True,
compute_hash=True,
)
session.expire_all()
updated_asset1 = session.get(Asset, "asset-dup-1")
assert updated_asset1 is not None
assert updated_asset1.hash is not None
updated_asset2 = session.get(Asset, "asset-dup-2")
assert updated_asset2 is None
updated_ref2 = session.get(AssetReference, "ref-dup-2")
assert updated_ref2 is not None
assert updated_ref2.asset_id == "asset-dup-1"

View File

@@ -0,0 +1,229 @@
"""Tests for ingest services."""
from pathlib import Path
import pytest
from sqlalchemy.orm import Session
from app.assets.database.models import Asset, AssetReference, Tag
from app.assets.database.queries import get_reference_tags
from app.assets.services.ingest import _ingest_file_from_path, _register_existing_asset
class TestIngestFileFromPath:
def test_creates_asset_and_reference(self, mock_create_session, temp_dir: Path, session: Session):
file_path = temp_dir / "test_file.bin"
file_path.write_bytes(b"test content")
result = _ingest_file_from_path(
abs_path=str(file_path),
asset_hash="blake3:abc123",
size_bytes=12,
mtime_ns=1234567890000000000,
mime_type="application/octet-stream",
)
assert result.asset_created is True
assert result.ref_created is True
assert result.reference_id is not None
# Verify DB state
assets = session.query(Asset).all()
assert len(assets) == 1
assert assets[0].hash == "blake3:abc123"
refs = session.query(AssetReference).all()
assert len(refs) == 1
assert refs[0].file_path == str(file_path)
def test_creates_reference_when_name_provided(self, mock_create_session, temp_dir: Path, session: Session):
file_path = temp_dir / "model.safetensors"
file_path.write_bytes(b"model data")
result = _ingest_file_from_path(
abs_path=str(file_path),
asset_hash="blake3:def456",
size_bytes=10,
mtime_ns=1234567890000000000,
mime_type="application/octet-stream",
info_name="My Model",
owner_id="user1",
)
assert result.asset_created is True
assert result.reference_id is not None
ref = session.query(AssetReference).first()
assert ref is not None
assert ref.name == "My Model"
assert ref.owner_id == "user1"
def test_creates_tags_when_provided(self, mock_create_session, temp_dir: Path, session: Session):
file_path = temp_dir / "tagged.bin"
file_path.write_bytes(b"data")
result = _ingest_file_from_path(
abs_path=str(file_path),
asset_hash="blake3:ghi789",
size_bytes=4,
mtime_ns=1234567890000000000,
info_name="Tagged Asset",
tags=["models", "checkpoints"],
)
assert result.reference_id is not None
# Verify tags were created and linked
tags = session.query(Tag).all()
tag_names = {t.name for t in tags}
assert "models" in tag_names
assert "checkpoints" in tag_names
ref_tags = get_reference_tags(session, reference_id=result.reference_id)
assert set(ref_tags) == {"models", "checkpoints"}
def test_idempotent_upsert(self, mock_create_session, temp_dir: Path, session: Session):
file_path = temp_dir / "dup.bin"
file_path.write_bytes(b"content")
# First ingest
r1 = _ingest_file_from_path(
abs_path=str(file_path),
asset_hash="blake3:repeat",
size_bytes=7,
mtime_ns=1234567890000000000,
)
assert r1.asset_created is True
# Second ingest with same hash - should update, not create
r2 = _ingest_file_from_path(
abs_path=str(file_path),
asset_hash="blake3:repeat",
size_bytes=7,
mtime_ns=1234567890000000001, # different mtime
)
assert r2.asset_created is False
assert r2.ref_created is False
assert r2.ref_updated is True
# Still only one asset
assets = session.query(Asset).all()
assert len(assets) == 1
def test_validates_preview_id(self, mock_create_session, temp_dir: Path, session: Session):
file_path = temp_dir / "with_preview.bin"
file_path.write_bytes(b"data")
# Create a preview asset first
preview_asset = Asset(hash="blake3:preview", size_bytes=100)
session.add(preview_asset)
session.commit()
preview_id = preview_asset.id
result = _ingest_file_from_path(
abs_path=str(file_path),
asset_hash="blake3:main",
size_bytes=4,
mtime_ns=1234567890000000000,
info_name="With Preview",
preview_id=preview_id,
)
assert result.reference_id is not None
ref = session.query(AssetReference).filter_by(id=result.reference_id).first()
assert ref.preview_id == preview_id
def test_invalid_preview_id_is_cleared(self, mock_create_session, temp_dir: Path, session: Session):
file_path = temp_dir / "bad_preview.bin"
file_path.write_bytes(b"data")
result = _ingest_file_from_path(
abs_path=str(file_path),
asset_hash="blake3:badpreview",
size_bytes=4,
mtime_ns=1234567890000000000,
info_name="Bad Preview",
preview_id="nonexistent-uuid",
)
assert result.reference_id is not None
ref = session.query(AssetReference).filter_by(id=result.reference_id).first()
assert ref.preview_id is None
class TestRegisterExistingAsset:
def test_creates_reference_for_existing_asset(self, mock_create_session, session: Session):
# Create existing asset
asset = Asset(hash="blake3:existing", size_bytes=1024, mime_type="image/png")
session.add(asset)
session.commit()
result = _register_existing_asset(
asset_hash="blake3:existing",
name="Registered Asset",
user_metadata={"key": "value"},
tags=["models"],
)
assert result.created is True
assert "models" in result.tags
# Verify by re-fetching from DB
session.expire_all()
refs = session.query(AssetReference).filter_by(name="Registered Asset").all()
assert len(refs) == 1
def test_creates_new_reference_even_with_same_name(self, mock_create_session, session: Session):
# Create asset and reference
asset = Asset(hash="blake3:withref", size_bytes=512)
session.add(asset)
session.flush()
from app.assets.helpers import get_utc_now
ref = AssetReference(
owner_id="",
name="Existing Ref",
asset_id=asset.id,
created_at=get_utc_now(),
updated_at=get_utc_now(),
last_access_time=get_utc_now(),
)
session.add(ref)
session.flush()
ref_id = ref.id
session.commit()
result = _register_existing_asset(
asset_hash="blake3:withref",
name="Existing Ref",
owner_id="",
)
# Multiple files with same name are allowed
assert result.created is True
# Verify two AssetReferences exist for this name
session.expire_all()
refs = session.query(AssetReference).filter_by(name="Existing Ref").all()
assert len(refs) == 2
assert ref_id in [r.id for r in refs]
def test_raises_for_nonexistent_hash(self, mock_create_session):
with pytest.raises(ValueError, match="No asset with hash"):
_register_existing_asset(
asset_hash="blake3:doesnotexist",
name="Fail",
)
def test_applies_tags_to_new_reference(self, mock_create_session, session: Session):
asset = Asset(hash="blake3:tagged", size_bytes=256)
session.add(asset)
session.commit()
result = _register_existing_asset(
asset_hash="blake3:tagged",
name="Tagged Ref",
tags=["alpha", "beta"],
)
assert result.created is True
assert set(result.tags) == {"alpha", "beta"}

View File

@@ -0,0 +1,197 @@
"""Tests for tagging services."""
import pytest
from sqlalchemy.orm import Session
from app.assets.database.models import Asset, AssetReference
from app.assets.database.queries import ensure_tags_exist, add_tags_to_reference
from app.assets.helpers import get_utc_now
from app.assets.services import apply_tags, remove_tags, list_tags
def _make_asset(session: Session, hash_val: str = "blake3:test") -> Asset:
asset = Asset(hash=hash_val, size_bytes=1024)
session.add(asset)
session.flush()
return asset
def _make_reference(
session: Session,
asset: Asset,
name: str = "test",
owner_id: str = "",
) -> AssetReference:
now = get_utc_now()
ref = AssetReference(
owner_id=owner_id,
name=name,
asset_id=asset.id,
created_at=now,
updated_at=now,
last_access_time=now,
)
session.add(ref)
session.flush()
return ref
class TestApplyTags:
def test_adds_new_tags(self, mock_create_session, session: Session):
asset = _make_asset(session)
ref = _make_reference(session, asset)
session.commit()
result = apply_tags(
reference_id=ref.id,
tags=["alpha", "beta"],
)
assert set(result.added) == {"alpha", "beta"}
assert result.already_present == []
assert set(result.total_tags) == {"alpha", "beta"}
def test_reports_already_present(self, mock_create_session, session: Session):
asset = _make_asset(session)
ref = _make_reference(session, asset)
ensure_tags_exist(session, ["existing"])
add_tags_to_reference(session, reference_id=ref.id, tags=["existing"])
session.commit()
result = apply_tags(
reference_id=ref.id,
tags=["existing", "new"],
)
assert result.added == ["new"]
assert result.already_present == ["existing"]
def test_raises_for_nonexistent_ref(self, mock_create_session):
with pytest.raises(ValueError, match="not found"):
apply_tags(reference_id="nonexistent", tags=["x"])
def test_raises_for_wrong_owner(self, mock_create_session, session: Session):
asset = _make_asset(session)
ref = _make_reference(session, asset, owner_id="user1")
session.commit()
with pytest.raises(PermissionError, match="not owner"):
apply_tags(
reference_id=ref.id,
tags=["new"],
owner_id="user2",
)
class TestRemoveTags:
def test_removes_tags(self, mock_create_session, session: Session):
asset = _make_asset(session)
ref = _make_reference(session, asset)
ensure_tags_exist(session, ["a", "b", "c"])
add_tags_to_reference(session, reference_id=ref.id, tags=["a", "b", "c"])
session.commit()
result = remove_tags(
reference_id=ref.id,
tags=["a", "b"],
)
assert set(result.removed) == {"a", "b"}
assert result.not_present == []
assert result.total_tags == ["c"]
def test_reports_not_present(self, mock_create_session, session: Session):
asset = _make_asset(session)
ref = _make_reference(session, asset)
ensure_tags_exist(session, ["present"])
add_tags_to_reference(session, reference_id=ref.id, tags=["present"])
session.commit()
result = remove_tags(
reference_id=ref.id,
tags=["present", "absent"],
)
assert result.removed == ["present"]
assert result.not_present == ["absent"]
def test_raises_for_nonexistent_ref(self, mock_create_session):
with pytest.raises(ValueError, match="not found"):
remove_tags(reference_id="nonexistent", tags=["x"])
def test_raises_for_wrong_owner(self, mock_create_session, session: Session):
asset = _make_asset(session)
ref = _make_reference(session, asset, owner_id="user1")
session.commit()
with pytest.raises(PermissionError, match="not owner"):
remove_tags(
reference_id=ref.id,
tags=["x"],
owner_id="user2",
)
class TestListTags:
def test_returns_tags_with_counts(self, mock_create_session, session: Session):
ensure_tags_exist(session, ["used", "unused"])
asset = _make_asset(session)
ref = _make_reference(session, asset)
add_tags_to_reference(session, reference_id=ref.id, tags=["used"])
session.commit()
rows, total = list_tags()
tag_dict = {name: count for name, _, count in rows}
assert tag_dict["used"] == 1
assert tag_dict["unused"] == 0
assert total == 2
def test_excludes_zero_counts(self, mock_create_session, session: Session):
ensure_tags_exist(session, ["used", "unused"])
asset = _make_asset(session)
ref = _make_reference(session, asset)
add_tags_to_reference(session, reference_id=ref.id, tags=["used"])
session.commit()
rows, total = list_tags(include_zero=False)
tag_names = {name for name, _, _ in rows}
assert "used" in tag_names
assert "unused" not in tag_names
def test_prefix_filter(self, mock_create_session, session: Session):
ensure_tags_exist(session, ["alpha", "beta", "alphabet"])
session.commit()
rows, _ = list_tags(prefix="alph")
tag_names = {name for name, _, _ in rows}
assert tag_names == {"alpha", "alphabet"}
def test_order_by_name(self, mock_create_session, session: Session):
ensure_tags_exist(session, ["zebra", "alpha", "middle"])
session.commit()
rows, _ = list_tags(order="name_asc")
names = [name for name, _, _ in rows]
assert names == ["alpha", "middle", "zebra"]
def test_pagination(self, mock_create_session, session: Session):
ensure_tags_exist(session, ["a", "b", "c", "d", "e"])
session.commit()
rows, total = list_tags(limit=2, offset=1, order="name_asc")
assert total == 5
assert len(rows) == 2
names = [name for name, _, _ in rows]
assert names == ["b", "c"]
def test_clamps_limit(self, mock_create_session, session: Session):
ensure_tags_exist(session, ["a"])
session.commit()
# Service should clamp limit to max 1000
rows, _ = list_tags(limit=2000)
assert len(rows) <= 1000

View File

@@ -0,0 +1,55 @@
from app.assets.services.file_utils import is_visible, list_files_recursively
class TestIsVisible:
def test_visible_file(self):
assert is_visible("file.txt") is True
def test_hidden_file(self):
assert is_visible(".hidden") is False
def test_hidden_directory(self):
assert is_visible(".git") is False
def test_visible_directory(self):
assert is_visible("src") is True
def test_dotdot_is_hidden(self):
assert is_visible("..") is False
def test_dot_is_hidden(self):
assert is_visible(".") is False
class TestListFilesRecursively:
def test_skips_hidden_files(self, tmp_path):
(tmp_path / "visible.txt").write_text("a")
(tmp_path / ".hidden").write_text("b")
result = list_files_recursively(str(tmp_path))
assert len(result) == 1
assert result[0].endswith("visible.txt")
def test_skips_hidden_directories(self, tmp_path):
hidden_dir = tmp_path / ".hidden_dir"
hidden_dir.mkdir()
(hidden_dir / "file.txt").write_text("a")
visible_dir = tmp_path / "visible_dir"
visible_dir.mkdir()
(visible_dir / "file.txt").write_text("b")
result = list_files_recursively(str(tmp_path))
assert len(result) == 1
assert "visible_dir" in result[0]
assert ".hidden_dir" not in result[0]
def test_empty_directory(self, tmp_path):
result = list_files_recursively(str(tmp_path))
assert result == []
def test_nonexistent_directory(self, tmp_path):
result = list_files_recursively(str(tmp_path / "nonexistent"))
assert result == []

View File

@@ -2,4 +2,3 @@ pytest>=7.8.0
pytest-aiohttp
pytest-asyncio
websocket-client
blake3