mirror of
https://github.com/comfyanonymous/ComfyUI.git
synced 2026-04-26 09:29:07 +00:00
Refactor asset database: separate business logic from queries
Architecture changes: - API Routes -> manager.py (thin adapter) -> services/ (business logic) -> queries/ (atomic DB ops) - Services own session lifecycle via create_session() - Queries accept Session as parameter, do single-table atomic operations New app/assets/services/ layer: - __init__.py - exports all service functions - ingest.py - ingest_file_from_path(), register_existing_asset() - asset_management.py - get_asset_detail(), update_asset_metadata(), delete_asset_reference(), set_asset_preview() - tagging.py - apply_tags(), remove_tags(), list_tags() Removed from queries/asset_info.py: - ingest_fs_asset (moved to services/ingest.py as ingest_file_from_path) - update_asset_info_full (moved to services/asset_management.py as update_asset_metadata) - create_asset_info_for_existing_asset (moved to services/ingest.py as register_existing_asset) Updated manager.py: - Now a thin adapter that transforms API schemas to/from service calls - Delegates all business logic to services layer - No longer imports sqlalchemy.orm.Session or models directly Test updates: - Fixed test_cache_state.py import of pick_best_live_path (moved to helpers.py) - Added comprehensive service layer tests (41 new tests) - All 112 query + service tests pass Amp-Thread-ID: https://ampcode.com/threads/T-019c24e2-7ae4-707f-ad19-c775ed8b82b5 Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
34
app/assets/services/__init__.py
Normal file
34
app/assets/services/__init__.py
Normal file
@@ -0,0 +1,34 @@
|
||||
# Asset services layer
|
||||
# Business logic that orchestrates database queries and filesystem operations
|
||||
# Services own session lifecycle via create_session()
|
||||
|
||||
from app.assets.services.ingest import (
|
||||
ingest_file_from_path,
|
||||
register_existing_asset,
|
||||
)
|
||||
from app.assets.services.asset_management import (
|
||||
get_asset_detail,
|
||||
update_asset_metadata,
|
||||
delete_asset_reference,
|
||||
set_asset_preview,
|
||||
)
|
||||
from app.assets.services.tagging import (
|
||||
apply_tags,
|
||||
remove_tags,
|
||||
list_tags,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
# ingest.py
|
||||
"ingest_file_from_path",
|
||||
"register_existing_asset",
|
||||
# asset_management.py
|
||||
"get_asset_detail",
|
||||
"update_asset_metadata",
|
||||
"delete_asset_reference",
|
||||
"set_asset_preview",
|
||||
# tagging.py
|
||||
"apply_tags",
|
||||
"remove_tags",
|
||||
"list_tags",
|
||||
]
|
||||
229
app/assets/services/asset_management.py
Normal file
229
app/assets/services/asset_management.py
Normal file
@@ -0,0 +1,229 @@
|
||||
"""
|
||||
Asset management services - CRUD operations on assets.
|
||||
|
||||
Business logic for:
|
||||
- get_asset_detail: Fetch full asset details with tags
|
||||
- update_asset_metadata: Update name, tags, and/or metadata
|
||||
- delete_asset_reference: Delete AssetInfo and optionally orphaned content
|
||||
- set_asset_preview: Set or clear preview on an asset
|
||||
"""
|
||||
import contextlib
|
||||
import os
|
||||
from typing import Sequence
|
||||
|
||||
from app.database.db import create_session
|
||||
from app.assets.helpers import (
|
||||
compute_relative_filename,
|
||||
pick_best_live_path,
|
||||
utcnow,
|
||||
)
|
||||
from app.assets.database.queries import (
|
||||
asset_info_exists_for_asset_id,
|
||||
delete_asset_info_by_id,
|
||||
fetch_asset_info_and_asset,
|
||||
fetch_asset_info_asset_and_tags,
|
||||
get_asset_info_by_id,
|
||||
list_cache_states_by_asset_id,
|
||||
replace_asset_info_metadata_projection,
|
||||
set_asset_info_preview,
|
||||
set_asset_info_tags,
|
||||
)
|
||||
|
||||
|
||||
def get_asset_detail(
|
||||
*,
|
||||
asset_info_id: str,
|
||||
owner_id: str = "",
|
||||
) -> dict | None:
|
||||
"""
|
||||
Fetch full asset details including tags.
|
||||
Returns dict with info, asset, and tags, or None if not found.
|
||||
"""
|
||||
with create_session() as session:
|
||||
result = fetch_asset_info_asset_and_tags(
|
||||
session,
|
||||
asset_info_id=asset_info_id,
|
||||
owner_id=owner_id,
|
||||
)
|
||||
if not result:
|
||||
return None
|
||||
|
||||
info, asset, tags = result
|
||||
return {
|
||||
"info": info,
|
||||
"asset": asset,
|
||||
"tags": tags,
|
||||
}
|
||||
|
||||
|
||||
def update_asset_metadata(
|
||||
*,
|
||||
asset_info_id: str,
|
||||
name: str | None = None,
|
||||
tags: Sequence[str] | None = None,
|
||||
user_metadata: dict | None = None,
|
||||
tag_origin: str = "manual",
|
||||
owner_id: str = "",
|
||||
) -> dict:
|
||||
"""
|
||||
Update name, tags, and/or metadata on an AssetInfo.
|
||||
Returns updated info dict with tags.
|
||||
"""
|
||||
with create_session() as session:
|
||||
info = get_asset_info_by_id(session, asset_info_id=asset_info_id)
|
||||
if not info:
|
||||
raise ValueError(f"AssetInfo {asset_info_id} not found")
|
||||
if info.owner_id and info.owner_id != owner_id:
|
||||
raise PermissionError("not owner")
|
||||
|
||||
touched = False
|
||||
if name is not None and name != info.name:
|
||||
info.name = name
|
||||
touched = True
|
||||
|
||||
# Compute filename from best live path
|
||||
computed_filename = _compute_filename_for_asset(session, info.asset_id)
|
||||
|
||||
if user_metadata is not None:
|
||||
new_meta = dict(user_metadata)
|
||||
if computed_filename:
|
||||
new_meta["filename"] = computed_filename
|
||||
replace_asset_info_metadata_projection(
|
||||
session, asset_info_id=asset_info_id, user_metadata=new_meta
|
||||
)
|
||||
touched = True
|
||||
else:
|
||||
if computed_filename:
|
||||
current_meta = info.user_metadata or {}
|
||||
if current_meta.get("filename") != computed_filename:
|
||||
new_meta = dict(current_meta)
|
||||
new_meta["filename"] = computed_filename
|
||||
replace_asset_info_metadata_projection(
|
||||
session, asset_info_id=asset_info_id, user_metadata=new_meta
|
||||
)
|
||||
touched = True
|
||||
|
||||
if tags is not None:
|
||||
set_asset_info_tags(
|
||||
session,
|
||||
asset_info_id=asset_info_id,
|
||||
tags=tags,
|
||||
origin=tag_origin,
|
||||
)
|
||||
touched = True
|
||||
|
||||
if touched and user_metadata is None:
|
||||
info.updated_at = utcnow()
|
||||
session.flush()
|
||||
|
||||
# Fetch updated info with tags
|
||||
result = fetch_asset_info_asset_and_tags(
|
||||
session,
|
||||
asset_info_id=asset_info_id,
|
||||
owner_id=owner_id,
|
||||
)
|
||||
session.commit()
|
||||
|
||||
if not result:
|
||||
raise RuntimeError("State changed during update")
|
||||
|
||||
info, asset, tag_list = result
|
||||
return {
|
||||
"info": info,
|
||||
"asset": asset,
|
||||
"tags": tag_list,
|
||||
}
|
||||
|
||||
|
||||
def delete_asset_reference(
|
||||
*,
|
||||
asset_info_id: str,
|
||||
owner_id: str,
|
||||
delete_content_if_orphan: bool = True,
|
||||
) -> bool:
|
||||
"""
|
||||
Delete an AssetInfo reference.
|
||||
If delete_content_if_orphan is True and no other AssetInfos reference the asset,
|
||||
also delete the Asset and its cached files.
|
||||
"""
|
||||
with create_session() as session:
|
||||
info_row = get_asset_info_by_id(session, asset_info_id=asset_info_id)
|
||||
asset_id = info_row.asset_id if info_row else None
|
||||
|
||||
deleted = delete_asset_info_by_id(session, asset_info_id=asset_info_id, owner_id=owner_id)
|
||||
if not deleted:
|
||||
session.commit()
|
||||
return False
|
||||
|
||||
if not delete_content_if_orphan or not asset_id:
|
||||
session.commit()
|
||||
return True
|
||||
|
||||
still_exists = asset_info_exists_for_asset_id(session, asset_id=asset_id)
|
||||
if still_exists:
|
||||
session.commit()
|
||||
return True
|
||||
|
||||
# Orphaned asset - delete it and its files
|
||||
states = list_cache_states_by_asset_id(session, asset_id=asset_id)
|
||||
file_paths = [s.file_path for s in (states or []) if getattr(s, "file_path", None)]
|
||||
|
||||
from app.assets.database.models import Asset
|
||||
asset_row = session.get(Asset, asset_id)
|
||||
if asset_row is not None:
|
||||
session.delete(asset_row)
|
||||
|
||||
session.commit()
|
||||
|
||||
# Delete files after commit
|
||||
for p in file_paths:
|
||||
with contextlib.suppress(Exception):
|
||||
if p and os.path.isfile(p):
|
||||
os.remove(p)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def set_asset_preview(
|
||||
*,
|
||||
asset_info_id: str,
|
||||
preview_asset_id: str | None = None,
|
||||
owner_id: str = "",
|
||||
) -> dict:
|
||||
"""
|
||||
Set or clear preview_id on an AssetInfo.
|
||||
Returns updated asset detail dict.
|
||||
"""
|
||||
with create_session() as session:
|
||||
info_row = get_asset_info_by_id(session, asset_info_id=asset_info_id)
|
||||
if not info_row:
|
||||
raise ValueError(f"AssetInfo {asset_info_id} not found")
|
||||
if info_row.owner_id and info_row.owner_id != owner_id:
|
||||
raise PermissionError("not owner")
|
||||
|
||||
set_asset_info_preview(
|
||||
session,
|
||||
asset_info_id=asset_info_id,
|
||||
preview_asset_id=preview_asset_id,
|
||||
)
|
||||
|
||||
result = fetch_asset_info_asset_and_tags(
|
||||
session, asset_info_id=asset_info_id, owner_id=owner_id
|
||||
)
|
||||
if not result:
|
||||
raise RuntimeError("State changed during preview update")
|
||||
|
||||
info, asset, tags = result
|
||||
session.commit()
|
||||
|
||||
return {
|
||||
"info": info,
|
||||
"asset": asset,
|
||||
"tags": tags,
|
||||
}
|
||||
|
||||
|
||||
def _compute_filename_for_asset(session, asset_id: str) -> str | None:
|
||||
"""Compute the relative filename for an asset from its cache states."""
|
||||
primary_path = pick_best_live_path(list_cache_states_by_asset_id(session, asset_id=asset_id))
|
||||
return compute_relative_filename(primary_path) if primary_path else None
|
||||
255
app/assets/services/ingest.py
Normal file
255
app/assets/services/ingest.py
Normal file
@@ -0,0 +1,255 @@
|
||||
"""
|
||||
Ingest services - handles ingesting files into the asset database.
|
||||
|
||||
Business logic for:
|
||||
- ingest_file_from_path: Ingest a file from filesystem path (upsert asset, cache state, info)
|
||||
- register_existing_asset: Create AssetInfo for an asset that already exists by hash
|
||||
"""
|
||||
import logging
|
||||
import os
|
||||
from typing import Sequence
|
||||
|
||||
from app.database.db import create_session
|
||||
from app.assets.helpers import (
|
||||
compute_relative_filename,
|
||||
normalize_tags,
|
||||
pick_best_live_path,
|
||||
utcnow,
|
||||
)
|
||||
from app.assets.database.queries import (
|
||||
get_asset_by_hash,
|
||||
get_or_create_asset_info,
|
||||
list_cache_states_by_asset_id,
|
||||
remove_missing_tag_for_asset_id,
|
||||
replace_asset_info_metadata_projection,
|
||||
set_asset_info_tags,
|
||||
update_asset_info_timestamps,
|
||||
upsert_asset,
|
||||
upsert_cache_state,
|
||||
add_tags_to_asset_info,
|
||||
ensure_tags_exist,
|
||||
get_asset_tags,
|
||||
)
|
||||
|
||||
|
||||
def ingest_file_from_path(
|
||||
*,
|
||||
abs_path: str,
|
||||
asset_hash: str,
|
||||
size_bytes: int,
|
||||
mtime_ns: int,
|
||||
mime_type: str | None = None,
|
||||
info_name: str | None = None,
|
||||
owner_id: str = "",
|
||||
preview_id: str | None = None,
|
||||
user_metadata: dict | None = None,
|
||||
tags: Sequence[str] = (),
|
||||
tag_origin: str = "manual",
|
||||
require_existing_tags: bool = False,
|
||||
) -> dict:
|
||||
"""
|
||||
Idempotently upsert:
|
||||
- Asset by content hash (create if missing)
|
||||
- AssetCacheState(file_path) pointing to asset_id
|
||||
- Optionally AssetInfo + tag links and metadata projection
|
||||
Returns flags and ids.
|
||||
"""
|
||||
locator = os.path.abspath(abs_path)
|
||||
|
||||
out: dict = {
|
||||
"asset_created": False,
|
||||
"asset_updated": False,
|
||||
"state_created": False,
|
||||
"state_updated": False,
|
||||
"asset_info_id": None,
|
||||
}
|
||||
|
||||
with create_session() as session:
|
||||
# Validate preview_id if provided
|
||||
if preview_id:
|
||||
from app.assets.database.models import Asset
|
||||
if not session.get(Asset, preview_id):
|
||||
preview_id = None
|
||||
|
||||
# 1. Upsert Asset
|
||||
asset, created, updated = upsert_asset(
|
||||
session,
|
||||
asset_hash=asset_hash,
|
||||
size_bytes=size_bytes,
|
||||
mime_type=mime_type,
|
||||
)
|
||||
out["asset_created"] = created
|
||||
out["asset_updated"] = updated
|
||||
|
||||
# 2. Upsert CacheState
|
||||
state_created, state_updated = upsert_cache_state(
|
||||
session,
|
||||
asset_id=asset.id,
|
||||
file_path=locator,
|
||||
mtime_ns=mtime_ns,
|
||||
)
|
||||
out["state_created"] = state_created
|
||||
out["state_updated"] = state_updated
|
||||
|
||||
# 3. Optionally create/update AssetInfo
|
||||
if info_name:
|
||||
info, info_created = get_or_create_asset_info(
|
||||
session,
|
||||
asset_id=asset.id,
|
||||
owner_id=owner_id,
|
||||
name=info_name,
|
||||
preview_id=preview_id,
|
||||
)
|
||||
if info_created:
|
||||
out["asset_info_id"] = info.id
|
||||
else:
|
||||
update_asset_info_timestamps(session, asset_info=info, preview_id=preview_id)
|
||||
out["asset_info_id"] = info.id
|
||||
|
||||
# 4. Handle tags
|
||||
norm = normalize_tags(list(tags))
|
||||
if norm and out["asset_info_id"]:
|
||||
if require_existing_tags:
|
||||
_validate_tags_exist(session, norm)
|
||||
add_tags_to_asset_info(
|
||||
session,
|
||||
asset_info_id=out["asset_info_id"],
|
||||
tags=norm,
|
||||
origin=tag_origin,
|
||||
create_if_missing=not require_existing_tags,
|
||||
)
|
||||
|
||||
# 5. Update metadata with computed filename
|
||||
if out["asset_info_id"]:
|
||||
_update_metadata_with_filename(
|
||||
session,
|
||||
asset_info_id=out["asset_info_id"],
|
||||
asset_id=asset.id,
|
||||
info=info,
|
||||
user_metadata=user_metadata,
|
||||
)
|
||||
|
||||
# 6. Remove missing tag
|
||||
try:
|
||||
remove_missing_tag_for_asset_id(session, asset_id=asset.id)
|
||||
except Exception:
|
||||
logging.exception("Failed to clear 'missing' tag for asset %s", asset.id)
|
||||
|
||||
session.commit()
|
||||
|
||||
return out
|
||||
|
||||
|
||||
def register_existing_asset(
|
||||
*,
|
||||
asset_hash: str,
|
||||
name: str,
|
||||
user_metadata: dict | None = None,
|
||||
tags: list[str] | None = None,
|
||||
tag_origin: str = "manual",
|
||||
owner_id: str = "",
|
||||
) -> dict:
|
||||
"""
|
||||
Create or return existing AssetInfo for an asset that already exists by hash.
|
||||
|
||||
Returns dict with asset and info details, or raises ValueError if hash not found.
|
||||
"""
|
||||
with create_session() as session:
|
||||
asset = get_asset_by_hash(session, asset_hash=asset_hash)
|
||||
if not asset:
|
||||
raise ValueError(f"No asset with hash {asset_hash}")
|
||||
|
||||
info, info_created = get_or_create_asset_info(
|
||||
session,
|
||||
asset_id=asset.id,
|
||||
owner_id=owner_id,
|
||||
name=name,
|
||||
preview_id=None,
|
||||
)
|
||||
|
||||
if not info_created:
|
||||
# Return existing info
|
||||
tag_names = get_asset_tags(session, asset_info_id=info.id)
|
||||
session.commit()
|
||||
return {
|
||||
"info": info,
|
||||
"asset": asset,
|
||||
"tags": tag_names,
|
||||
"created": False,
|
||||
}
|
||||
|
||||
# New info - apply metadata and tags
|
||||
new_meta = dict(user_metadata or {})
|
||||
computed_filename = _compute_filename_for_asset(session, asset.id)
|
||||
if computed_filename:
|
||||
new_meta["filename"] = computed_filename
|
||||
|
||||
if new_meta:
|
||||
replace_asset_info_metadata_projection(
|
||||
session,
|
||||
asset_info_id=info.id,
|
||||
user_metadata=new_meta,
|
||||
)
|
||||
|
||||
if tags is not None:
|
||||
set_asset_info_tags(
|
||||
session,
|
||||
asset_info_id=info.id,
|
||||
tags=tags,
|
||||
origin=tag_origin,
|
||||
)
|
||||
|
||||
tag_names = get_asset_tags(session, asset_info_id=info.id)
|
||||
session.commit()
|
||||
|
||||
return {
|
||||
"info": info,
|
||||
"asset": asset,
|
||||
"tags": tag_names,
|
||||
"created": True,
|
||||
}
|
||||
|
||||
|
||||
def _validate_tags_exist(session, tags: list[str]) -> None:
|
||||
"""Raise ValueError if any tags don't exist."""
|
||||
from sqlalchemy import select
|
||||
from app.assets.database.models import Tag
|
||||
existing_tag_names = set(
|
||||
name for (name,) in session.execute(select(Tag.name).where(Tag.name.in_(tags))).all()
|
||||
)
|
||||
missing = [t for t in tags if t not in existing_tag_names]
|
||||
if missing:
|
||||
raise ValueError(f"Unknown tags: {missing}")
|
||||
|
||||
|
||||
def _compute_filename_for_asset(session, asset_id: str) -> str | None:
|
||||
"""Compute the relative filename for an asset from its cache states."""
|
||||
primary_path = pick_best_live_path(list_cache_states_by_asset_id(session, asset_id=asset_id))
|
||||
return compute_relative_filename(primary_path) if primary_path else None
|
||||
|
||||
|
||||
def _update_metadata_with_filename(
|
||||
session,
|
||||
*,
|
||||
asset_info_id: str,
|
||||
asset_id: str,
|
||||
info,
|
||||
user_metadata: dict | None,
|
||||
) -> None:
|
||||
"""Update metadata projection with computed filename."""
|
||||
computed_filename = _compute_filename_for_asset(session, asset_id)
|
||||
|
||||
current_meta = info.user_metadata or {}
|
||||
new_meta = dict(current_meta)
|
||||
if user_metadata:
|
||||
for k, v in user_metadata.items():
|
||||
new_meta[k] = v
|
||||
if computed_filename:
|
||||
new_meta["filename"] = computed_filename
|
||||
|
||||
if new_meta != current_meta:
|
||||
replace_asset_info_metadata_projection(
|
||||
session,
|
||||
asset_info_id=asset_info_id,
|
||||
user_metadata=new_meta,
|
||||
)
|
||||
102
app/assets/services/tagging.py
Normal file
102
app/assets/services/tagging.py
Normal file
@@ -0,0 +1,102 @@
|
||||
"""
|
||||
Tagging services - manage tags on assets.
|
||||
|
||||
Business logic for:
|
||||
- apply_tags: Add tags to an asset
|
||||
- remove_tags: Remove tags from an asset
|
||||
- list_tags: List tags with usage counts
|
||||
"""
|
||||
from app.database.db import create_session
|
||||
from app.assets.database.queries import (
|
||||
add_tags_to_asset_info,
|
||||
get_asset_info_by_id,
|
||||
list_tags_with_usage,
|
||||
remove_tags_from_asset_info,
|
||||
)
|
||||
|
||||
|
||||
def apply_tags(
|
||||
*,
|
||||
asset_info_id: str,
|
||||
tags: list[str],
|
||||
origin: str = "manual",
|
||||
owner_id: str = "",
|
||||
) -> dict:
|
||||
"""
|
||||
Add tags to an asset.
|
||||
Returns dict with added, already_present, and total_tags lists.
|
||||
"""
|
||||
with create_session() as session:
|
||||
info_row = get_asset_info_by_id(session, asset_info_id=asset_info_id)
|
||||
if not info_row:
|
||||
raise ValueError(f"AssetInfo {asset_info_id} not found")
|
||||
if info_row.owner_id and info_row.owner_id != owner_id:
|
||||
raise PermissionError("not owner")
|
||||
|
||||
data = add_tags_to_asset_info(
|
||||
session,
|
||||
asset_info_id=asset_info_id,
|
||||
tags=tags,
|
||||
origin=origin,
|
||||
create_if_missing=True,
|
||||
asset_info_row=info_row,
|
||||
)
|
||||
session.commit()
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def remove_tags(
|
||||
*,
|
||||
asset_info_id: str,
|
||||
tags: list[str],
|
||||
owner_id: str = "",
|
||||
) -> dict:
|
||||
"""
|
||||
Remove tags from an asset.
|
||||
Returns dict with removed, not_present, and total_tags lists.
|
||||
"""
|
||||
with create_session() as session:
|
||||
info_row = get_asset_info_by_id(session, asset_info_id=asset_info_id)
|
||||
if not info_row:
|
||||
raise ValueError(f"AssetInfo {asset_info_id} not found")
|
||||
if info_row.owner_id and info_row.owner_id != owner_id:
|
||||
raise PermissionError("not owner")
|
||||
|
||||
data = remove_tags_from_asset_info(
|
||||
session,
|
||||
asset_info_id=asset_info_id,
|
||||
tags=tags,
|
||||
)
|
||||
session.commit()
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def list_tags(
|
||||
prefix: str | None = None,
|
||||
limit: int = 100,
|
||||
offset: int = 0,
|
||||
order: str = "count_desc",
|
||||
include_zero: bool = True,
|
||||
owner_id: str = "",
|
||||
) -> tuple[list[tuple[str, str, int]], int]:
|
||||
"""
|
||||
List tags with usage counts.
|
||||
Returns (rows, total) where rows are (name, tag_type, count) tuples.
|
||||
"""
|
||||
limit = max(1, min(1000, limit))
|
||||
offset = max(0, offset)
|
||||
|
||||
with create_session() as session:
|
||||
rows, total = list_tags_with_usage(
|
||||
session,
|
||||
prefix=prefix,
|
||||
limit=limit,
|
||||
offset=offset,
|
||||
include_zero=include_zero,
|
||||
order=order,
|
||||
owner_id=owner_id,
|
||||
)
|
||||
|
||||
return rows, total
|
||||
Reference in New Issue
Block a user