mirror of
https://github.com/comfyanonymous/ComfyUI.git
synced 2026-03-08 06:39:56 +00:00
refactor(assets): consolidate duplicated query utilities and remove unused code
- Extract shared helpers to database/queries/common.py: - MAX_BIND_PARAMS, calculate_rows_per_statement, iter_chunks, iter_row_chunks - build_visible_owner_clause - Remove duplicate _compute_filename_for_asset, consolidate in path_utils.py - Remove unused get_asset_info_with_tags (duplicated get_asset_detail) - Remove redundant __all__ from cache_state.py - Make internal helpers private (_check_is_scalar) Amp-Thread-ID: https://ampcode.com/threads/T-019c2ad9-9432-7451-94a8-79287dbbb19e Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
@@ -27,7 +27,9 @@ from app.database.models import Base, to_dict
|
||||
class Asset(Base):
|
||||
__tablename__ = "assets"
|
||||
|
||||
id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
|
||||
id: Mapped[str] = mapped_column(
|
||||
String(36), primary_key=True, default=lambda: str(uuid.uuid4())
|
||||
)
|
||||
hash: Mapped[str | None] = mapped_column(String(256), nullable=True)
|
||||
size_bytes: Mapped[int] = mapped_column(BigInteger, nullable=False, default=0)
|
||||
mime_type: Mapped[str | None] = mapped_column(String(255))
|
||||
@@ -75,7 +77,9 @@ class AssetCacheState(Base):
|
||||
__tablename__ = "asset_cache_state"
|
||||
|
||||
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
|
||||
asset_id: Mapped[str] = mapped_column(String(36), ForeignKey("assets.id", ondelete="CASCADE"), nullable=False)
|
||||
asset_id: Mapped[str] = mapped_column(
|
||||
String(36), ForeignKey("assets.id", ondelete="CASCADE"), nullable=False
|
||||
)
|
||||
file_path: Mapped[str] = mapped_column(Text, nullable=False)
|
||||
mtime_ns: Mapped[int | None] = mapped_column(BigInteger, nullable=True)
|
||||
needs_verify: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
|
||||
@@ -85,7 +89,9 @@ class AssetCacheState(Base):
|
||||
__table_args__ = (
|
||||
Index("ix_asset_cache_state_file_path", "file_path"),
|
||||
Index("ix_asset_cache_state_asset_id", "asset_id"),
|
||||
CheckConstraint("(mtime_ns IS NULL) OR (mtime_ns >= 0)", name="ck_acs_mtime_nonneg"),
|
||||
CheckConstraint(
|
||||
"(mtime_ns IS NULL) OR (mtime_ns >= 0)", name="ck_acs_mtime_nonneg"
|
||||
),
|
||||
UniqueConstraint("file_path", name="uq_asset_cache_state_file_path"),
|
||||
)
|
||||
|
||||
@@ -99,15 +105,29 @@ class AssetCacheState(Base):
|
||||
class AssetInfo(Base):
|
||||
__tablename__ = "assets_info"
|
||||
|
||||
id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
|
||||
id: Mapped[str] = mapped_column(
|
||||
String(36), primary_key=True, default=lambda: str(uuid.uuid4())
|
||||
)
|
||||
owner_id: Mapped[str] = mapped_column(String(128), nullable=False, default="")
|
||||
name: Mapped[str] = mapped_column(String(512), nullable=False)
|
||||
asset_id: Mapped[str] = mapped_column(String(36), ForeignKey("assets.id", ondelete="RESTRICT"), nullable=False)
|
||||
preview_id: Mapped[str | None] = mapped_column(String(36), ForeignKey("assets.id", ondelete="SET NULL"))
|
||||
user_metadata: Mapped[dict[str, Any] | None] = mapped_column(JSON(none_as_null=True))
|
||||
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=False), nullable=False, default=get_utc_now)
|
||||
updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=False), nullable=False, default=get_utc_now)
|
||||
last_access_time: Mapped[datetime] = mapped_column(DateTime(timezone=False), nullable=False, default=get_utc_now)
|
||||
asset_id: Mapped[str] = mapped_column(
|
||||
String(36), ForeignKey("assets.id", ondelete="RESTRICT"), nullable=False
|
||||
)
|
||||
preview_id: Mapped[str | None] = mapped_column(
|
||||
String(36), ForeignKey("assets.id", ondelete="SET NULL")
|
||||
)
|
||||
user_metadata: Mapped[dict[str, Any] | None] = mapped_column(
|
||||
JSON(none_as_null=True)
|
||||
)
|
||||
created_at: Mapped[datetime] = mapped_column(
|
||||
DateTime(timezone=False), nullable=False, default=get_utc_now
|
||||
)
|
||||
updated_at: Mapped[datetime] = mapped_column(
|
||||
DateTime(timezone=False), nullable=False, default=get_utc_now
|
||||
)
|
||||
last_access_time: Mapped[datetime] = mapped_column(
|
||||
DateTime(timezone=False), nullable=False, default=get_utc_now
|
||||
)
|
||||
|
||||
asset: Mapped[Asset] = relationship(
|
||||
"Asset",
|
||||
@@ -143,7 +163,9 @@ class AssetInfo(Base):
|
||||
)
|
||||
|
||||
__table_args__ = (
|
||||
UniqueConstraint("asset_id", "owner_id", "name", name="uq_assets_info_asset_owner_name"),
|
||||
UniqueConstraint(
|
||||
"asset_id", "owner_id", "name", name="uq_assets_info_asset_owner_name"
|
||||
),
|
||||
Index("ix_assets_info_owner_name", "owner_id", "name"),
|
||||
Index("ix_assets_info_owner_id", "owner_id"),
|
||||
Index("ix_assets_info_asset_id", "asset_id"),
|
||||
@@ -225,9 +247,7 @@ class Tag(Base):
|
||||
overlaps="asset_info_links,tag_links,tags,asset_info",
|
||||
)
|
||||
|
||||
__table_args__ = (
|
||||
Index("ix_tags_tag_type", "tag_type"),
|
||||
)
|
||||
__table_args__ = (Index("ix_tags_tag_type", "tag_type"),)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"<Tag {self.name}>"
|
||||
|
||||
@@ -4,17 +4,7 @@ from sqlalchemy.dialects import sqlite
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.assets.database.models import Asset
|
||||
|
||||
MAX_BIND_PARAMS = 800
|
||||
|
||||
|
||||
def _calculate_rows_per_statement(cols: int) -> int:
|
||||
return max(1, MAX_BIND_PARAMS // max(1, cols))
|
||||
|
||||
|
||||
def _iter_chunks(seq, n: int):
|
||||
for i in range(0, len(seq), n):
|
||||
yield seq[i : i + n]
|
||||
from app.assets.database.queries.common import calculate_rows_per_statement, iter_chunks
|
||||
|
||||
|
||||
def asset_exists_by_hash(
|
||||
@@ -26,7 +16,10 @@ def asset_exists_by_hash(
|
||||
"""
|
||||
row = (
|
||||
session.execute(
|
||||
select(sa.literal(True)).select_from(Asset).where(Asset.hash == asset_hash).limit(1)
|
||||
select(sa.literal(True))
|
||||
.select_from(Asset)
|
||||
.where(Asset.hash == asset_hash)
|
||||
.limit(1)
|
||||
)
|
||||
).first()
|
||||
return row is not None
|
||||
@@ -37,8 +30,10 @@ def get_asset_by_hash(
|
||||
asset_hash: str,
|
||||
) -> Asset | None:
|
||||
return (
|
||||
session.execute(select(Asset).where(Asset.hash == asset_hash).limit(1))
|
||||
).scalars().first()
|
||||
(session.execute(select(Asset).where(Asset.hash == asset_hash).limit(1)))
|
||||
.scalars()
|
||||
.first()
|
||||
)
|
||||
|
||||
|
||||
def upsert_asset(
|
||||
@@ -60,9 +55,11 @@ def upsert_asset(
|
||||
res = session.execute(ins)
|
||||
created = int(res.rowcount or 0) > 0
|
||||
|
||||
asset = session.execute(
|
||||
select(Asset).where(Asset.hash == asset_hash).limit(1)
|
||||
).scalars().first()
|
||||
asset = (
|
||||
session.execute(select(Asset).where(Asset.hash == asset_hash).limit(1))
|
||||
.scalars()
|
||||
.first()
|
||||
)
|
||||
if not asset:
|
||||
raise RuntimeError("Asset row not found after upsert.")
|
||||
|
||||
@@ -89,5 +86,5 @@ def bulk_insert_assets(
|
||||
if not rows:
|
||||
return
|
||||
ins = sqlite.insert(Asset)
|
||||
for chunk in _iter_chunks(rows, _calculate_rows_per_statement(5)):
|
||||
for chunk in iter_chunks(rows, calculate_rows_per_statement(5)):
|
||||
session.execute(ins, chunk)
|
||||
|
||||
@@ -16,10 +16,16 @@ from app.assets.database.models import (
|
||||
AssetInfoTag,
|
||||
Tag,
|
||||
)
|
||||
from app.assets.database.queries.common import (
|
||||
MAX_BIND_PARAMS,
|
||||
build_visible_owner_clause,
|
||||
calculate_rows_per_statement,
|
||||
iter_chunks,
|
||||
)
|
||||
from app.assets.helpers import escape_sql_like_string, get_utc_now, normalize_tags
|
||||
|
||||
|
||||
def check_is_scalar(v):
|
||||
def _check_is_scalar(v):
|
||||
if v is None:
|
||||
return True
|
||||
if isinstance(v, bool):
|
||||
@@ -33,8 +39,12 @@ def _scalar_to_row(key: str, ordinal: int, value) -> dict:
|
||||
"""Convert a scalar value to a typed projection row."""
|
||||
if value is None:
|
||||
return {
|
||||
"key": key, "ordinal": ordinal,
|
||||
"val_str": None, "val_num": None, "val_bool": None, "val_json": None
|
||||
"key": key,
|
||||
"ordinal": ordinal,
|
||||
"val_str": None,
|
||||
"val_num": None,
|
||||
"val_bool": None,
|
||||
"val_json": None,
|
||||
}
|
||||
if isinstance(value, bool):
|
||||
return {"key": key, "ordinal": ordinal, "val_bool": bool(value)}
|
||||
@@ -55,35 +65,16 @@ def convert_metadata_to_rows(key: str, value) -> list[dict]:
|
||||
if value is None:
|
||||
return [_scalar_to_row(key, 0, None)]
|
||||
|
||||
if check_is_scalar(value):
|
||||
if _check_is_scalar(value):
|
||||
return [_scalar_to_row(key, 0, value)]
|
||||
|
||||
if isinstance(value, list):
|
||||
if all(check_is_scalar(x) for x in value):
|
||||
if all(_check_is_scalar(x) for x in value):
|
||||
return [_scalar_to_row(key, i, x) for i, x in enumerate(value)]
|
||||
return [{"key": key, "ordinal": i, "val_json": x} for i, x in enumerate(value)]
|
||||
|
||||
return [{"key": key, "ordinal": 0, "val_json": value}]
|
||||
|
||||
MAX_BIND_PARAMS = 800
|
||||
|
||||
|
||||
def _calculate_rows_per_statement(cols: int) -> int:
|
||||
return max(1, MAX_BIND_PARAMS // max(1, cols))
|
||||
|
||||
|
||||
def _iter_chunks(seq, n: int):
|
||||
for i in range(0, len(seq), n):
|
||||
yield seq[i : i + n]
|
||||
|
||||
|
||||
def _build_visible_owner_clause(owner_id: str) -> sa.sql.ClauseElement:
|
||||
"""Build owner visibility predicate for reads. Owner-less rows are visible to everyone."""
|
||||
owner_id = (owner_id or "").strip()
|
||||
if owner_id == "":
|
||||
return AssetInfo.owner_id == ""
|
||||
return AssetInfo.owner_id.in_(["", owner_id])
|
||||
|
||||
|
||||
def _apply_tag_filters(
|
||||
stmt: sa.sql.Select,
|
||||
@@ -229,15 +220,19 @@ def get_or_create_asset_info(
|
||||
if info:
|
||||
return info, True
|
||||
|
||||
existing = session.execute(
|
||||
select(AssetInfo)
|
||||
.where(
|
||||
AssetInfo.asset_id == asset_id,
|
||||
AssetInfo.name == name,
|
||||
AssetInfo.owner_id == owner_id,
|
||||
existing = (
|
||||
session.execute(
|
||||
select(AssetInfo)
|
||||
.where(
|
||||
AssetInfo.asset_id == asset_id,
|
||||
AssetInfo.name == name,
|
||||
AssetInfo.owner_id == owner_id,
|
||||
)
|
||||
.limit(1)
|
||||
)
|
||||
.limit(1)
|
||||
).unique().scalar_one_or_none()
|
||||
.unique()
|
||||
.scalar_one_or_none()
|
||||
)
|
||||
if not existing:
|
||||
raise RuntimeError("Failed to find AssetInfo after insert conflict.")
|
||||
return existing, False
|
||||
@@ -274,7 +269,7 @@ def list_asset_infos_page(
|
||||
select(AssetInfo)
|
||||
.join(Asset, Asset.id == AssetInfo.asset_id)
|
||||
.options(contains_eager(AssetInfo.asset), noload(AssetInfo.tags))
|
||||
.where(_build_visible_owner_clause(owner_id))
|
||||
.where(build_visible_owner_clause(owner_id))
|
||||
)
|
||||
|
||||
if name_contains:
|
||||
@@ -302,7 +297,7 @@ def list_asset_infos_page(
|
||||
select(sa.func.count())
|
||||
.select_from(AssetInfo)
|
||||
.join(Asset, Asset.id == AssetInfo.asset_id)
|
||||
.where(_build_visible_owner_clause(owner_id))
|
||||
.where(build_visible_owner_clause(owner_id))
|
||||
)
|
||||
if name_contains:
|
||||
escaped, esc = escape_sql_like_string(name_contains)
|
||||
@@ -341,7 +336,7 @@ def fetch_asset_info_asset_and_tags(
|
||||
.join(Tag, Tag.name == AssetInfoTag.tag_name, isouter=True)
|
||||
.where(
|
||||
AssetInfo.id == asset_info_id,
|
||||
_build_visible_owner_clause(owner_id),
|
||||
build_visible_owner_clause(owner_id),
|
||||
)
|
||||
.options(noload(AssetInfo.tags))
|
||||
.order_by(Tag.name.asc())
|
||||
@@ -371,7 +366,7 @@ def fetch_asset_info_and_asset(
|
||||
.join(Asset, Asset.id == AssetInfo.asset_id)
|
||||
.where(
|
||||
AssetInfo.id == asset_info_id,
|
||||
_build_visible_owner_clause(owner_id),
|
||||
build_visible_owner_clause(owner_id),
|
||||
)
|
||||
.limit(1)
|
||||
.options(noload(AssetInfo.tags))
|
||||
@@ -393,7 +388,9 @@ def update_asset_info_access_time(
|
||||
stmt = sa.update(AssetInfo).where(AssetInfo.id == asset_info_id)
|
||||
if only_if_newer:
|
||||
stmt = stmt.where(
|
||||
sa.or_(AssetInfo.last_access_time.is_(None), AssetInfo.last_access_time < ts)
|
||||
sa.or_(
|
||||
AssetInfo.last_access_time.is_(None), AssetInfo.last_access_time < ts
|
||||
)
|
||||
)
|
||||
session.execute(stmt.values(last_access_time=ts))
|
||||
|
||||
@@ -420,9 +417,7 @@ def update_asset_info_updated_at(
|
||||
"""Update the updated_at timestamp of an AssetInfo."""
|
||||
ts = ts or get_utc_now()
|
||||
session.execute(
|
||||
sa.update(AssetInfo)
|
||||
.where(AssetInfo.id == asset_info_id)
|
||||
.values(updated_at=ts)
|
||||
sa.update(AssetInfo).where(AssetInfo.id == asset_info_id).values(updated_at=ts)
|
||||
)
|
||||
|
||||
|
||||
@@ -439,7 +434,9 @@ def set_asset_info_metadata(
|
||||
info.updated_at = get_utc_now()
|
||||
session.flush()
|
||||
|
||||
session.execute(delete(AssetInfoMeta).where(AssetInfoMeta.asset_info_id == asset_info_id))
|
||||
session.execute(
|
||||
delete(AssetInfoMeta).where(AssetInfoMeta.asset_info_id == asset_info_id)
|
||||
)
|
||||
session.flush()
|
||||
|
||||
if not user_metadata:
|
||||
@@ -471,7 +468,7 @@ def delete_asset_info_by_id(
|
||||
) -> bool:
|
||||
stmt = sa.delete(AssetInfo).where(
|
||||
AssetInfo.id == asset_info_id,
|
||||
_build_visible_owner_clause(owner_id),
|
||||
build_visible_owner_clause(owner_id),
|
||||
)
|
||||
return int((session.execute(stmt)).rowcount or 0) > 0
|
||||
|
||||
@@ -511,7 +508,7 @@ def bulk_insert_asset_infos_ignore_conflicts(
|
||||
ins = sqlite.insert(AssetInfo).on_conflict_do_nothing(
|
||||
index_elements=[AssetInfo.asset_id, AssetInfo.owner_id, AssetInfo.name]
|
||||
)
|
||||
for chunk in _iter_chunks(rows, _calculate_rows_per_statement(9)):
|
||||
for chunk in iter_chunks(rows, calculate_rows_per_statement(9)):
|
||||
session.execute(ins, chunk)
|
||||
|
||||
|
||||
@@ -524,9 +521,7 @@ def get_asset_info_ids_by_ids(
|
||||
return set()
|
||||
|
||||
found: set[str] = set()
|
||||
for chunk in _iter_chunks(info_ids, MAX_BIND_PARAMS):
|
||||
result = session.execute(
|
||||
select(AssetInfo.id).where(AssetInfo.id.in_(chunk))
|
||||
)
|
||||
for chunk in iter_chunks(info_ids, MAX_BIND_PARAMS):
|
||||
result = session.execute(select(AssetInfo.id).where(AssetInfo.id.in_(chunk)))
|
||||
found.update(result.scalars().all())
|
||||
return found
|
||||
|
||||
@@ -7,34 +7,13 @@ from sqlalchemy.dialects import sqlite
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.assets.database.models import Asset, AssetCacheState, AssetInfo
|
||||
from app.assets.database.queries.common import (
|
||||
MAX_BIND_PARAMS,
|
||||
calculate_rows_per_statement,
|
||||
iter_chunks,
|
||||
)
|
||||
from app.assets.helpers import escape_sql_like_string
|
||||
|
||||
MAX_BIND_PARAMS = 800
|
||||
|
||||
__all__ = [
|
||||
"CacheStateRow",
|
||||
"list_cache_states_by_asset_id",
|
||||
"upsert_cache_state",
|
||||
"delete_cache_states_outside_prefixes",
|
||||
"get_orphaned_seed_asset_ids",
|
||||
"delete_assets_by_ids",
|
||||
"get_cache_states_for_prefixes",
|
||||
"bulk_set_needs_verify",
|
||||
"delete_cache_states_by_ids",
|
||||
"delete_orphaned_seed_asset",
|
||||
"bulk_insert_cache_states_ignore_conflicts",
|
||||
"get_cache_states_by_paths_and_asset_ids",
|
||||
]
|
||||
|
||||
|
||||
def _calculate_rows_per_statement(cols: int) -> int:
|
||||
return max(1, MAX_BIND_PARAMS // max(1, cols))
|
||||
|
||||
|
||||
def _iter_chunks(seq, n: int):
|
||||
for i in range(0, len(seq), n):
|
||||
yield seq[i : i + n]
|
||||
|
||||
|
||||
class CacheStateRow(NamedTuple):
|
||||
"""Row from cache state query with joined asset data."""
|
||||
@@ -52,12 +31,16 @@ def list_cache_states_by_asset_id(
|
||||
session: Session, *, asset_id: str
|
||||
) -> Sequence[AssetCacheState]:
|
||||
return (
|
||||
session.execute(
|
||||
select(AssetCacheState)
|
||||
.where(AssetCacheState.asset_id == asset_id)
|
||||
.order_by(AssetCacheState.id.asc())
|
||||
(
|
||||
session.execute(
|
||||
select(AssetCacheState)
|
||||
.where(AssetCacheState.asset_id == asset_id)
|
||||
.order_by(AssetCacheState.id.asc())
|
||||
)
|
||||
)
|
||||
).scalars().all()
|
||||
.scalars()
|
||||
.all()
|
||||
)
|
||||
|
||||
|
||||
def upsert_cache_state(
|
||||
@@ -100,7 +83,9 @@ def upsert_cache_state(
|
||||
return False, updated
|
||||
|
||||
|
||||
def delete_cache_states_outside_prefixes(session: Session, valid_prefixes: list[str]) -> int:
|
||||
def delete_cache_states_outside_prefixes(
|
||||
session: Session, valid_prefixes: list[str]
|
||||
) -> int:
|
||||
"""Delete cache states with file_path not matching any of the valid prefixes.
|
||||
|
||||
Args:
|
||||
@@ -261,7 +246,7 @@ def bulk_insert_cache_states_ignore_conflicts(
|
||||
ins = sqlite.insert(AssetCacheState).on_conflict_do_nothing(
|
||||
index_elements=[AssetCacheState.file_path]
|
||||
)
|
||||
for chunk in _iter_chunks(rows, _calculate_rows_per_statement(3)):
|
||||
for chunk in iter_chunks(rows, calculate_rows_per_statement(3)):
|
||||
session.execute(ins, chunk)
|
||||
|
||||
|
||||
@@ -283,7 +268,7 @@ def get_cache_states_by_paths_and_asset_ids(
|
||||
paths = list(path_to_asset.keys())
|
||||
winners: set[str] = set()
|
||||
|
||||
for chunk in _iter_chunks(paths, MAX_BIND_PARAMS):
|
||||
for chunk in iter_chunks(paths, MAX_BIND_PARAMS):
|
||||
result = session.execute(
|
||||
select(AssetCacheState.file_path).where(
|
||||
AssetCacheState.file_path.in_(chunk),
|
||||
|
||||
37
app/assets/database/queries/common.py
Normal file
37
app/assets/database/queries/common.py
Normal file
@@ -0,0 +1,37 @@
|
||||
"""Shared utilities for database query modules."""
|
||||
|
||||
from typing import Iterable
|
||||
|
||||
import sqlalchemy as sa
|
||||
|
||||
from app.assets.database.models import AssetInfo
|
||||
|
||||
MAX_BIND_PARAMS = 800
|
||||
|
||||
|
||||
def calculate_rows_per_statement(cols: int) -> int:
|
||||
"""Calculate how many rows can fit in one statement given column count."""
|
||||
return max(1, MAX_BIND_PARAMS // max(1, cols))
|
||||
|
||||
|
||||
def iter_chunks(seq, n: int):
|
||||
"""Yield successive n-sized chunks from seq."""
|
||||
for i in range(0, len(seq), n):
|
||||
yield seq[i : i + n]
|
||||
|
||||
|
||||
def iter_row_chunks(rows: list[dict], cols_per_row: int) -> Iterable[list[dict]]:
|
||||
"""Yield chunks of rows sized to fit within bind param limits."""
|
||||
if not rows:
|
||||
return []
|
||||
rows_per_stmt = max(1, MAX_BIND_PARAMS // max(1, cols_per_row))
|
||||
for i in range(0, len(rows), rows_per_stmt):
|
||||
yield rows[i : i + rows_per_stmt]
|
||||
|
||||
|
||||
def build_visible_owner_clause(owner_id: str) -> sa.sql.ClauseElement:
|
||||
"""Build owner visibility predicate for reads. Owner-less rows are visible to everyone."""
|
||||
owner_id = (owner_id or "").strip()
|
||||
if owner_id == "":
|
||||
return AssetInfo.owner_id == ""
|
||||
return AssetInfo.owner_id.in_(["", owner_id])
|
||||
@@ -7,6 +7,10 @@ from sqlalchemy.exc import IntegrityError
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.assets.database.models import AssetInfo, AssetInfoMeta, AssetInfoTag, Tag
|
||||
from app.assets.database.queries.common import (
|
||||
build_visible_owner_clause,
|
||||
iter_row_chunks,
|
||||
)
|
||||
from app.assets.helpers import escape_sql_like_string, get_utc_now, normalize_tags
|
||||
|
||||
|
||||
@@ -27,30 +31,10 @@ class SetTagsDict(TypedDict):
|
||||
removed: list[str]
|
||||
total: list[str]
|
||||
|
||||
MAX_BIND_PARAMS = 800
|
||||
|
||||
|
||||
def _calculate_rows_per_statement(cols: int) -> int:
|
||||
return max(1, MAX_BIND_PARAMS // max(1, cols))
|
||||
|
||||
|
||||
def _iter_row_chunks(rows: list[dict], cols_per_row: int) -> Iterable[list[dict]]:
|
||||
if not rows:
|
||||
return []
|
||||
rows_per_stmt = max(1, MAX_BIND_PARAMS // max(1, cols_per_row))
|
||||
for i in range(0, len(rows), rows_per_stmt):
|
||||
yield rows[i : i + rows_per_stmt]
|
||||
|
||||
|
||||
def _build_visible_owner_clause(owner_id: str) -> sa.sql.ClauseElement:
|
||||
"""Build owner visibility predicate for reads. Owner-less rows are visible to everyone."""
|
||||
owner_id = (owner_id or "").strip()
|
||||
if owner_id == "":
|
||||
return AssetInfo.owner_id == ""
|
||||
return AssetInfo.owner_id.in_(["", owner_id])
|
||||
|
||||
|
||||
def ensure_tags_exist(session: Session, names: Iterable[str], tag_type: str = "user") -> None:
|
||||
def ensure_tags_exist(
|
||||
session: Session, names: Iterable[str], tag_type: str = "user"
|
||||
) -> None:
|
||||
wanted = normalize_tags(list(names))
|
||||
if not wanted:
|
||||
return
|
||||
@@ -65,9 +49,12 @@ def ensure_tags_exist(session: Session, names: Iterable[str], tag_type: str = "u
|
||||
|
||||
def get_asset_tags(session: Session, asset_info_id: str) -> list[str]:
|
||||
return [
|
||||
tag_name for (tag_name,) in (
|
||||
tag_name
|
||||
for (tag_name,) in (
|
||||
session.execute(
|
||||
select(AssetInfoTag.tag_name).where(AssetInfoTag.asset_info_id == asset_info_id)
|
||||
select(AssetInfoTag.tag_name).where(
|
||||
AssetInfoTag.asset_info_id == asset_info_id
|
||||
)
|
||||
)
|
||||
).all()
|
||||
]
|
||||
@@ -82,8 +69,13 @@ def set_asset_info_tags(
|
||||
desired = normalize_tags(tags)
|
||||
|
||||
current = set(
|
||||
tag_name for (tag_name,) in (
|
||||
session.execute(select(AssetInfoTag.tag_name).where(AssetInfoTag.asset_info_id == asset_info_id))
|
||||
tag_name
|
||||
for (tag_name,) in (
|
||||
session.execute(
|
||||
select(AssetInfoTag.tag_name).where(
|
||||
AssetInfoTag.asset_info_id == asset_info_id
|
||||
)
|
||||
)
|
||||
).all()
|
||||
)
|
||||
|
||||
@@ -92,16 +84,25 @@ def set_asset_info_tags(
|
||||
|
||||
if to_add:
|
||||
ensure_tags_exist(session, to_add, tag_type="user")
|
||||
session.add_all([
|
||||
AssetInfoTag(asset_info_id=asset_info_id, tag_name=t, origin=origin, added_at=get_utc_now())
|
||||
for t in to_add
|
||||
])
|
||||
session.add_all(
|
||||
[
|
||||
AssetInfoTag(
|
||||
asset_info_id=asset_info_id,
|
||||
tag_name=t,
|
||||
origin=origin,
|
||||
added_at=get_utc_now(),
|
||||
)
|
||||
for t in to_add
|
||||
]
|
||||
)
|
||||
session.flush()
|
||||
|
||||
if to_remove:
|
||||
session.execute(
|
||||
delete(AssetInfoTag)
|
||||
.where(AssetInfoTag.asset_info_id == asset_info_id, AssetInfoTag.tag_name.in_(to_remove))
|
||||
delete(AssetInfoTag).where(
|
||||
AssetInfoTag.asset_info_id == asset_info_id,
|
||||
AssetInfoTag.tag_name.in_(to_remove),
|
||||
)
|
||||
)
|
||||
session.flush()
|
||||
|
||||
@@ -133,7 +134,9 @@ def add_tags_to_asset_info(
|
||||
tag_name
|
||||
for (tag_name,) in (
|
||||
session.execute(
|
||||
sa.select(AssetInfoTag.tag_name).where(AssetInfoTag.asset_info_id == asset_info_id)
|
||||
sa.select(AssetInfoTag.tag_name).where(
|
||||
AssetInfoTag.asset_info_id == asset_info_id
|
||||
)
|
||||
)
|
||||
).all()
|
||||
}
|
||||
@@ -185,7 +188,9 @@ def remove_tags_from_asset_info(
|
||||
tag_name
|
||||
for (tag_name,) in (
|
||||
session.execute(
|
||||
sa.select(AssetInfoTag.tag_name).where(AssetInfoTag.asset_info_id == asset_info_id)
|
||||
sa.select(AssetInfoTag.tag_name).where(
|
||||
AssetInfoTag.asset_info_id == asset_info_id
|
||||
)
|
||||
)
|
||||
).all()
|
||||
}
|
||||
@@ -195,8 +200,7 @@ def remove_tags_from_asset_info(
|
||||
|
||||
if to_remove:
|
||||
session.execute(
|
||||
delete(AssetInfoTag)
|
||||
.where(
|
||||
delete(AssetInfoTag).where(
|
||||
AssetInfoTag.asset_info_id == asset_info_id,
|
||||
AssetInfoTag.tag_name.in_(to_remove),
|
||||
)
|
||||
@@ -222,7 +226,10 @@ def add_missing_tag_for_asset_id(
|
||||
.where(AssetInfo.asset_id == asset_id)
|
||||
.where(
|
||||
sa.not_(
|
||||
sa.exists().where((AssetInfoTag.asset_info_id == AssetInfo.id) & (AssetInfoTag.tag_name == "missing"))
|
||||
sa.exists().where(
|
||||
(AssetInfoTag.asset_info_id == AssetInfo.id)
|
||||
& (AssetInfoTag.tag_name == "missing")
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
@@ -232,7 +239,9 @@ def add_missing_tag_for_asset_id(
|
||||
["asset_info_id", "tag_name", "origin", "added_at"],
|
||||
select_rows,
|
||||
)
|
||||
.on_conflict_do_nothing(index_elements=[AssetInfoTag.asset_info_id, AssetInfoTag.tag_name])
|
||||
.on_conflict_do_nothing(
|
||||
index_elements=[AssetInfoTag.asset_info_id, AssetInfoTag.tag_name]
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@@ -242,7 +251,9 @@ def remove_missing_tag_for_asset_id(
|
||||
) -> None:
|
||||
session.execute(
|
||||
sa.delete(AssetInfoTag).where(
|
||||
AssetInfoTag.asset_info_id.in_(sa.select(AssetInfo.id).where(AssetInfo.asset_id == asset_id)),
|
||||
AssetInfoTag.asset_info_id.in_(
|
||||
sa.select(AssetInfo.id).where(AssetInfo.asset_id == asset_id)
|
||||
),
|
||||
AssetInfoTag.tag_name == "missing",
|
||||
)
|
||||
)
|
||||
@@ -264,7 +275,7 @@ def list_tags_with_usage(
|
||||
)
|
||||
.select_from(AssetInfoTag)
|
||||
.join(AssetInfo, AssetInfo.id == AssetInfoTag.asset_info_id)
|
||||
.where(_build_visible_owner_clause(owner_id))
|
||||
.where(build_visible_owner_clause(owner_id))
|
||||
.group_by(AssetInfoTag.tag_name)
|
||||
.subquery()
|
||||
)
|
||||
@@ -323,12 +334,16 @@ def bulk_insert_tags_and_meta(
|
||||
ins_tags = sqlite.insert(AssetInfoTag).on_conflict_do_nothing(
|
||||
index_elements=[AssetInfoTag.asset_info_id, AssetInfoTag.tag_name]
|
||||
)
|
||||
for chunk in _iter_row_chunks(tag_rows, cols_per_row=4):
|
||||
for chunk in iter_row_chunks(tag_rows, cols_per_row=4):
|
||||
session.execute(ins_tags, chunk)
|
||||
|
||||
if meta_rows:
|
||||
ins_meta = sqlite.insert(AssetInfoMeta).on_conflict_do_nothing(
|
||||
index_elements=[AssetInfoMeta.asset_info_id, AssetInfoMeta.key, AssetInfoMeta.ordinal]
|
||||
index_elements=[
|
||||
AssetInfoMeta.asset_info_id,
|
||||
AssetInfoMeta.key,
|
||||
AssetInfoMeta.ordinal,
|
||||
]
|
||||
)
|
||||
for chunk in _iter_row_chunks(meta_rows, cols_per_row=7):
|
||||
for chunk in iter_row_chunks(meta_rows, cols_per_row=7):
|
||||
session.execute(ins_meta, chunk)
|
||||
|
||||
Reference in New Issue
Block a user