refactor: move bulk_ops to queries and scanner service

- Delete bulk_ops.py, moving logic to appropriate layers
- Add bulk insert query functions:
  - queries/asset.bulk_insert_assets
  - queries/cache_state.bulk_insert_cache_states_ignore_conflicts
  - queries/cache_state.get_cache_states_by_paths_and_asset_ids
  - queries/asset_info.bulk_insert_asset_infos_ignore_conflicts
  - queries/asset_info.get_asset_info_ids_by_ids
  - queries/tags.bulk_insert_tags_and_meta
- Move seed_from_paths_batch orchestration to scanner._seed_from_paths_batch

Amp-Thread-ID: https://ampcode.com/threads/T-019c24fd-157d-776a-ad24-4f19cf5d3afe
Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
Luke Mino-Altherr
2026-02-03 11:50:39 -08:00
parent 5b4726e01f
commit bf6e85bc01
7 changed files with 343 additions and 205 deletions

View File

@@ -1,3 +1,5 @@
from typing import Iterable
import sqlalchemy as sa
from sqlalchemy import select
from sqlalchemy.dialects import sqlite
@@ -5,6 +7,17 @@ from sqlalchemy.orm import Session
from app.assets.database.models import Asset
MAX_BIND_PARAMS = 800
def _rows_per_stmt(cols: int) -> int:
return max(1, MAX_BIND_PARAMS // max(1, cols))
def _iter_chunks(seq, n: int):
for i in range(0, len(seq), n):
yield seq[i : i + n]
def asset_exists_by_hash(
session: Session,
@@ -68,3 +81,15 @@ def upsert_asset(
updated = True
return asset, created, updated
def bulk_insert_assets(
    session: Session,
    rows: list[dict],
) -> None:
    """Bulk insert Asset rows, chunked to stay under the bind-parameter limit.

    Each dict should have: id, hash, size_bytes, mime_type, created_at.

    Args:
        session: Active SQLAlchemy session; caller owns commit/rollback.
        rows: Mapping per row of column name -> value. No-op when empty.
    """
    if not rows:
        return
    # Derive the per-row bind-parameter count from the data itself instead of
    # hard-coding 5, so chunk sizing stays correct if columns are ever
    # added or removed. Assumes all rows share the same key set, which
    # executemany-style execution requires anyway.
    cols = len(rows[0])
    ins = sqlite.insert(Asset)
    for chunk in _iter_chunks(rows, _rows_per_stmt(cols)):
        session.execute(ins, chunk)