refactor: move bulk_ops to queries and scanner service

- Delete bulk_ops.py, moving logic to appropriate layers
- Add bulk insert query functions:
  - queries/asset.bulk_insert_assets
  - queries/cache_state.bulk_insert_cache_states_ignore_conflicts
  - queries/cache_state.get_cache_states_by_paths_and_asset_ids
  - queries/asset_info.bulk_insert_asset_infos_ignore_conflicts
  - queries/asset_info.get_asset_info_ids_by_ids
  - queries/tags.bulk_insert_tags_and_meta
- Move seed_from_paths_batch orchestration to scanner._seed_from_paths_batch

Amp-Thread-ID: https://ampcode.com/threads/T-019c24fd-157d-776a-ad24-4f19cf5d3afe
Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
Luke Mino-Altherr
2026-02-03 11:50:39 -08:00
parent 48bfd29fb6
commit ef97ea8880
7 changed files with 343 additions and 205 deletions

View File

@@ -5,6 +5,7 @@ from app.assets.database.queries.asset import (
asset_exists_by_hash,
get_asset_by_hash,
upsert_asset,
bulk_insert_assets,
)
from app.assets.database.queries.asset_info import (
@@ -20,6 +21,8 @@ from app.assets.database.queries.asset_info import (
replace_asset_info_metadata_projection,
delete_asset_info_by_id,
set_asset_info_preview,
bulk_insert_asset_infos_ignore_conflicts,
get_asset_info_ids_by_ids,
)
from app.assets.database.queries.cache_state import (
@@ -33,6 +36,8 @@ from app.assets.database.queries.cache_state import (
bulk_set_needs_verify,
delete_cache_states_by_ids,
delete_orphaned_seed_asset,
bulk_insert_cache_states_ignore_conflicts,
get_cache_states_by_paths_and_asset_ids,
)
from app.assets.database.queries.tags import (
@@ -44,6 +49,7 @@ from app.assets.database.queries.tags import (
add_missing_tag_for_asset_id,
remove_missing_tag_for_asset_id,
list_tags_with_usage,
bulk_insert_tags_and_meta,
)
__all__ = [
@@ -51,6 +57,7 @@ __all__ = [
"asset_exists_by_hash",
"get_asset_by_hash",
"upsert_asset",
"bulk_insert_assets",
# asset_info.py
"asset_info_exists_for_asset_id",
"get_asset_info_by_id",
@@ -64,6 +71,8 @@ __all__ = [
"replace_asset_info_metadata_projection",
"delete_asset_info_by_id",
"set_asset_info_preview",
"bulk_insert_asset_infos_ignore_conflicts",
"get_asset_info_ids_by_ids",
# cache_state.py
"CacheStateRow",
"list_cache_states_by_asset_id",
@@ -75,6 +84,8 @@ __all__ = [
"bulk_set_needs_verify",
"delete_cache_states_by_ids",
"delete_orphaned_seed_asset",
"bulk_insert_cache_states_ignore_conflicts",
"get_cache_states_by_paths_and_asset_ids",
# tags.py
"ensure_tags_exist",
"get_asset_tags",
@@ -84,4 +95,5 @@ __all__ = [
"add_missing_tag_for_asset_id",
"remove_missing_tag_for_asset_id",
"list_tags_with_usage",
"bulk_insert_tags_and_meta",
]

View File

@@ -1,3 +1,5 @@
from typing import Iterable
import sqlalchemy as sa
from sqlalchemy import select
from sqlalchemy.dialects import sqlite
@@ -5,6 +7,17 @@ from sqlalchemy.orm import Session
from app.assets.database.models import Asset
MAX_BIND_PARAMS = 800
def _rows_per_stmt(cols: int) -> int:
return max(1, MAX_BIND_PARAMS // max(1, cols))
def _iter_chunks(seq, n: int):
for i in range(0, len(seq), n):
yield seq[i : i + n]
def asset_exists_by_hash(
session: Session,
@@ -68,3 +81,15 @@ def upsert_asset(
updated = True
return asset, created, updated
def bulk_insert_assets(
    session: Session,
    rows: list[dict],
) -> None:
    """Insert Asset rows in chunks sized to stay under the bind-parameter cap.

    Each dict should have: id, hash, size_bytes, mime_type, created_at.
    """
    if not rows:
        return
    stmt = sqlite.insert(Asset)
    batch_size = _rows_per_stmt(5)  # 5 bound columns per row
    for batch in _iter_chunks(rows, batch_size):
        session.execute(stmt, batch)

View File

@@ -11,6 +11,7 @@ from typing import Sequence
import sqlalchemy as sa
from sqlalchemy import select, delete, exists
from sqlalchemy.dialects import sqlite
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, contains_eager, noload
@@ -19,6 +20,17 @@ from app.assets.database.models import (
)
from app.assets.helpers import escape_like_prefix, normalize_tags, project_kv, utcnow
MAX_BIND_PARAMS = 800
def _rows_per_stmt(cols: int) -> int:
return max(1, MAX_BIND_PARAMS // max(1, cols))
def _iter_chunks(seq, n: int):
for i in range(0, len(seq), n):
yield seq[i : i + n]
def _visible_owner_clause(owner_id: str) -> sa.sql.ClauseElement:
"""Build owner visibility predicate for reads. Owner-less rows are visible to everyone."""
@@ -410,3 +422,38 @@ def set_asset_info_preview(
info.updated_at = utcnow()
session.flush()
def bulk_insert_asset_infos_ignore_conflicts(
    session: Session,
    rows: list[dict],
) -> None:
    """Chunked INSERT ... ON CONFLICT DO NOTHING for AssetInfo rows.

    Each dict should have: id, owner_id, name, asset_id, preview_id,
    user_metadata, created_at, updated_at, last_access_time.
    Rows colliding on (asset_id, owner_id, name) are skipped silently.
    """
    if not rows:
        return
    stmt = sqlite.insert(AssetInfo).on_conflict_do_nothing(
        index_elements=[AssetInfo.asset_id, AssetInfo.owner_id, AssetInfo.name]
    )
    batch_size = _rows_per_stmt(9)  # 9 bound columns per row
    for batch in _iter_chunks(rows, batch_size):
        session.execute(stmt, batch)
def get_asset_info_ids_by_ids(
    session: Session,
    info_ids: list[str],
) -> set[str]:
    """Return the subset of *info_ids* that exist as AssetInfo primary keys."""
    existing: set[str] = set()
    # Chunk the IN list so a single statement never exceeds the bind-param cap;
    # an empty input simply loops zero times and returns the empty set.
    for batch in _iter_chunks(info_ids, MAX_BIND_PARAMS):
        stmt = select(AssetInfo.id).where(AssetInfo.id.in_(batch))
        existing.update(session.execute(stmt).scalars().all())
    return existing

View File

@@ -9,6 +9,8 @@ from sqlalchemy.orm import Session
from app.assets.database.models import Asset, AssetCacheState, AssetInfo
from app.assets.helpers import escape_like_prefix
MAX_BIND_PARAMS = 800
__all__ = [
"CacheStateRow",
"list_cache_states_by_asset_id",
@@ -20,9 +22,20 @@ __all__ = [
"bulk_set_needs_verify",
"delete_cache_states_by_ids",
"delete_orphaned_seed_asset",
"bulk_insert_cache_states_ignore_conflicts",
"get_cache_states_by_paths_and_asset_ids",
]
def _rows_per_stmt(cols: int) -> int:
    """Rows per statement so total bind params stay within MAX_BIND_PARAMS."""
    effective_cols = cols if cols >= 1 else 1
    budget = MAX_BIND_PARAMS // effective_cols
    return budget if budget >= 1 else 1


def _iter_chunks(seq, n: int):
    """Yield *seq* in successive slices holding at most *n* items each."""
    index = 0
    while index < len(seq):
        yield seq[index : index + n]
        index += n
class CacheStateRow(NamedTuple):
"""Row from cache state query with joined asset data."""
@@ -233,3 +246,50 @@ def delete_orphaned_seed_asset(session: Session, asset_id: str) -> bool:
session.delete(asset)
return True
return False
def bulk_insert_cache_states_ignore_conflicts(
    session: Session,
    rows: list[dict],
) -> None:
    """Chunked INSERT ... ON CONFLICT(file_path) DO NOTHING for cache states.

    Each dict should have: asset_id, file_path, mtime_ns.
    An existing row for the same file_path is left untouched.
    """
    if not rows:
        return
    stmt = sqlite.insert(AssetCacheState).on_conflict_do_nothing(
        index_elements=[AssetCacheState.file_path]
    )
    batch_size = _rows_per_stmt(3)  # 3 bound columns per row
    for batch in _iter_chunks(rows, batch_size):
        session.execute(stmt, batch)
def get_cache_states_by_paths_and_asset_ids(
    session: Session,
    path_to_asset: dict[str, str],
) -> set[str]:
    """Query cache states to find paths where our asset_id won the insert.

    Args:
        path_to_asset: Mapping of file_path -> asset_id we tried to insert

    Returns:
        Set of file_paths whose stored asset_id equals the one we tried
        to insert for that exact path.
    """
    if not path_to_asset:
        return set()
    paths = list(path_to_asset.keys())
    winners: set[str] = set()
    for chunk in _iter_chunks(paths, MAX_BIND_PARAMS):
        # Fetch (file_path, asset_id) pairs and compare per-path in Python.
        # The previous single `asset_id.in_(chunk's asset_ids)` filter matched
        # ANY asset_id appearing anywhere in the chunk, so a path whose row
        # held a different (but also in-chunk) asset_id was falsely counted.
        result = session.execute(
            select(AssetCacheState.file_path, AssetCacheState.asset_id).where(
                AssetCacheState.file_path.in_(chunk)
            )
        )
        winners.update(
            file_path
            for file_path, asset_id in result
            if path_to_asset[file_path] == asset_id
        )
    return winners

View File

@@ -6,9 +6,23 @@ from sqlalchemy.dialects import sqlite
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from app.assets.database.models import AssetInfo, AssetInfoTag, Tag
from app.assets.database.models import AssetInfo, AssetInfoMeta, AssetInfoTag, Tag
from app.assets.helpers import escape_like_prefix, normalize_tags, utcnow
MAX_BIND_PARAMS = 800
def _rows_per_stmt(cols: int) -> int:
return max(1, MAX_BIND_PARAMS // max(1, cols))
def _chunk_rows(rows: list[dict], cols_per_row: int) -> Iterable[list[dict]]:
if not rows:
return []
rows_per_stmt = max(1, MAX_BIND_PARAMS // max(1, cols_per_row))
for i in range(0, len(rows), rows_per_stmt):
yield rows[i : i + rows_per_stmt]
def _visible_owner_clause(owner_id: str) -> sa.sql.ClauseElement:
"""Build owner visibility predicate for reads. Owner-less rows are visible to everyone."""
@@ -273,3 +287,30 @@ def list_tags_with_usage(
rows_norm = [(name, ttype, int(count or 0)) for (name, ttype, count) in rows]
return rows_norm, int(total or 0)
def bulk_insert_tags_and_meta(
    session: Session,
    tag_rows: list[dict],
    meta_rows: list[dict],
) -> None:
    """Chunked ON CONFLICT DO NOTHING inserts into asset_info_tags and asset_info_meta.

    Args:
        session: Database session
        tag_rows: List of dicts with keys: asset_info_id, tag_name, origin, added_at
        meta_rows: List of dicts with keys: asset_info_id, key, ordinal, val_str, val_num, val_bool, val_json
    """
    if tag_rows:
        tag_stmt = sqlite.insert(AssetInfoTag).on_conflict_do_nothing(
            index_elements=[AssetInfoTag.asset_info_id, AssetInfoTag.tag_name]
        )
        for batch in _chunk_rows(tag_rows, cols_per_row=4):
            session.execute(tag_stmt, batch)
    if meta_rows:
        meta_stmt = sqlite.insert(AssetInfoMeta).on_conflict_do_nothing(
            index_elements=[
                AssetInfoMeta.asset_info_id,
                AssetInfoMeta.key,
                AssetInfoMeta.ordinal,
            ]
        )
        for batch in _chunk_rows(meta_rows, cols_per_row=7):
            session.execute(meta_stmt, batch)