From 6db4f4e3f1d93663df5598ffa300ad7c739869ab Mon Sep 17 00:00:00 2001 From: Jedrzej Kosinski Date: Thu, 15 Jan 2026 23:41:19 -0800 Subject: [PATCH] Finished @ROUTES.post("/api/assets") --- app/assets/api/routes.py | 186 +++++++++++++++++++++++++++ app/assets/api/schemas_in.py | 114 ++++++++++++++++- app/assets/database/queries.py | 226 ++++++++++++++++++++++++++++++++- app/assets/helpers.py | 35 ++++- app/assets/manager.py | 133 ++++++++++++++++++- requirements.txt | 1 + 6 files changed, 691 insertions(+), 4 deletions(-) diff --git a/app/assets/api/routes.py b/app/assets/api/routes.py index dddf8f790..a162aaa24 100644 --- a/app/assets/api/routes.py +++ b/app/assets/api/routes.py @@ -1,6 +1,8 @@ import logging import uuid import urllib.parse +import os +import contextlib from aiohttp import web from pydantic import ValidationError @@ -10,6 +12,8 @@ from app import user_manager from app.assets.api import schemas_in from app.assets.helpers import get_query_dict +import folder_paths + ROUTES = web.RouteTableDef() USER_MANAGER: user_manager.UserManager | None = None @@ -164,3 +168,185 @@ async def create_asset_from_hash(request: web.Request) -> web.Response: return _error_response(404, "ASSET_NOT_FOUND", f"Asset content {body.hash} does not exist") return web.json_response(result.model_dump(mode="json"), status=201) + +@ROUTES.post("/api/assets") +async def upload_asset(request: web.Request) -> web.Response: + """Multipart/form-data endpoint for Asset uploads.""" + + if not (request.content_type or "").lower().startswith("multipart/"): + return _error_response(415, "UNSUPPORTED_MEDIA_TYPE", "Use multipart/form-data for uploads.") + + reader = await request.multipart() + + file_present = False + file_client_name: str | None = None + tags_raw: list[str] = [] + provided_name: str | None = None + user_metadata_raw: str | None = None + provided_hash: str | None = None + provided_hash_exists: bool | None = None + + file_written = 0 + tmp_path: str | None = None + while True: + field = await reader.next() + if field is None: + break + + fname = getattr(field, "name", "") or "" + + if fname == "hash": + try: + s = ((await field.text()) or "").strip().lower() + except Exception: + return _error_response(400, "INVALID_HASH", "hash must be like 'blake3:'") + + if s: + if ":" not in s: + return _error_response(400, "INVALID_HASH", "hash must be like 'blake3:'") + algo, digest = s.split(":", 1) + if algo != "blake3" or not digest or any(c for c in digest if c not in "0123456789abcdef"): + return _error_response(400, "INVALID_HASH", "hash must be like 'blake3:'") + provided_hash = f"{algo}:{digest}" + try: + provided_hash_exists = manager.asset_exists(asset_hash=provided_hash) + except Exception: + provided_hash_exists = None # do not fail the whole request here + + elif fname == "file": + file_present = True + file_client_name = (field.filename or "").strip() + + if provided_hash and provided_hash_exists is True: + # If client supplied a hash that we know exists, drain but do not write to disk + try: + while True: + chunk = await field.read_chunk(8 * 1024 * 1024) + if not chunk: + break + file_written += len(chunk) + except Exception: + return _error_response(500, "UPLOAD_IO_ERROR", "Failed to receive uploaded file.") + continue # Do not create temp file; we will create AssetInfo from the existing content + + # Otherwise, store to temp for hashing/ingest + uploads_root = os.path.join(folder_paths.get_temp_directory(), "uploads") + unique_dir = os.path.join(uploads_root, uuid.uuid4().hex) + os.makedirs(unique_dir, exist_ok=True) + tmp_path = os.path.join(unique_dir, ".upload.part") + + try: + with open(tmp_path, "wb") as f: + while True: + chunk = await field.read_chunk(8 * 1024 * 1024) + if not chunk: + break + f.write(chunk) + file_written += len(chunk) + except Exception: + try: + if os.path.exists(tmp_path or ""): + os.remove(tmp_path) + finally: + return _error_response(500, "UPLOAD_IO_ERROR", "Failed to receive and store uploaded file.") + elif fname == "tags": + tags_raw.append((await field.text()) or "") + elif fname == "name": + provided_name = (await field.text()) or None + elif fname == "user_metadata": + user_metadata_raw = (await field.text()) or None + + # If client did not send file, and we are not doing a from-hash fast path -> error + if not file_present and not (provided_hash and provided_hash_exists): + return _error_response(400, "MISSING_FILE", "Form must include a 'file' part or a known 'hash'.") + + if file_present and file_written == 0 and not (provided_hash and provided_hash_exists): + # Empty upload is only acceptable if we are fast-pathing from existing hash + try: + if tmp_path and os.path.exists(tmp_path): + os.remove(tmp_path) + finally: + return _error_response(400, "EMPTY_UPLOAD", "Uploaded file is empty.") + + try: + spec = schemas_in.UploadAssetSpec.model_validate({ + "tags": tags_raw, + "name": provided_name, + "user_metadata": user_metadata_raw, + "hash": provided_hash, + }) + except ValidationError as ve: + try: + if tmp_path and os.path.exists(tmp_path): + os.remove(tmp_path) + finally: + return _validation_error_response("INVALID_BODY", ve) + + # Validate models category against configured folders (consistent with previous behavior) + if spec.tags and spec.tags[0] == "models": + if len(spec.tags) < 2 or spec.tags[1] not in folder_paths.folder_names_and_paths: + if tmp_path and os.path.exists(tmp_path): + os.remove(tmp_path) + return _error_response( + 400, "INVALID_BODY", f"unknown models category '{spec.tags[1] if len(spec.tags) >= 2 else ''}'" + ) + + owner_id = USER_MANAGER.get_request_user_id(request) + + # Fast path: if a valid provided hash exists, create AssetInfo without writing anything + if spec.hash and provided_hash_exists is True: + try: + result = manager.create_asset_from_hash( + hash_str=spec.hash, + name=spec.name or (spec.hash.split(":", 1)[1]), + tags=spec.tags, + user_metadata=spec.user_metadata or {}, + owner_id=owner_id, + ) + except Exception: + logging.exception("create_asset_from_hash failed for hash=%s, owner_id=%s", spec.hash, owner_id) + return _error_response(500, "INTERNAL", "Unexpected server error.") + + if result is None: + return _error_response(404, "ASSET_NOT_FOUND", f"Asset content {spec.hash} does not exist") + + # Drain temp if we accidentally saved (e.g., hash field came after file) + if tmp_path and os.path.exists(tmp_path): + with contextlib.suppress(Exception): + os.remove(tmp_path) + + status = 200 if (not result.created_new) else 201 + return web.json_response(result.model_dump(mode="json"), status=status) + + # Otherwise, we must have a temp file path to ingest + if not tmp_path or not os.path.exists(tmp_path): + # The only case we reach here without a temp file is: client sent a hash that does not exist and no file + return _error_response(404, "ASSET_NOT_FOUND", "Provided hash not found and no file uploaded.") + + try: + created = manager.upload_asset_from_temp_path( + spec, + temp_path=tmp_path, + client_filename=file_client_name, + owner_id=owner_id, + expected_asset_hash=spec.hash, + ) + status = 201 if created.created_new else 200 + return web.json_response(created.model_dump(mode="json"), status=status) + except ValueError as e: + if tmp_path and os.path.exists(tmp_path): + os.remove(tmp_path) + msg = str(e) + if "HASH_MISMATCH" in msg or msg.strip().upper() == "HASH_MISMATCH": + return _error_response( + 400, + "HASH_MISMATCH", + "Uploaded file hash does not match provided hash.", + ) + return _error_response(400, "BAD_REQUEST", "Invalid inputs.") + except Exception: + if tmp_path and os.path.exists(tmp_path): + os.remove(tmp_path) + logging.exception("upload_asset_from_temp_path failed for tmp_path=%s, owner_id=%s", tmp_path, owner_id) + return _error_response(500, "INTERNAL", "Unexpected server error.") + diff --git a/app/assets/api/schemas_in.py b/app/assets/api/schemas_in.py index 0f16a0f0e..718416543 100644 --- a/app/assets/api/schemas_in.py +++ b/app/assets/api/schemas_in.py @@ -8,6 +8,7 @@ from pydantic import ( Field, conint, field_validator, + model_validator, ) @@ -97,7 +98,6 @@ class CreateFromHashBody(BaseModel): return [] - class TagsListQuery(BaseModel): model_config = ConfigDict(extra="ignore", str_strip_whitespace=True) @@ -116,6 +116,118 @@ class TagsListQuery(BaseModel): return v.lower() or None +class UploadAssetSpec(BaseModel): + """Upload Asset operation. + - tags: ordered; first is root ('models'|'input'|'output'); + if root == 'models', second must be a valid category from folder_paths.folder_names_and_paths + - name: display name + - user_metadata: arbitrary JSON object (optional) + - hash: optional canonical 'blake3:' provided by the client for validation / fast-path + + Files created via this endpoint are stored on disk using the **content hash** as the filename stem + and the original extension is preserved when available. + """ + model_config = ConfigDict(extra="ignore", str_strip_whitespace=True) + + tags: list[str] = Field(..., min_length=1) + name: str | None = Field(default=None, max_length=512, description="Display Name") + user_metadata: dict[str, Any] = Field(default_factory=dict) + hash: str | None = Field(default=None) + + @field_validator("hash", mode="before") + @classmethod + def _parse_hash(cls, v): + if v is None: + return None + s = str(v).strip().lower() + if not s: + return None + if ":" not in s: + raise ValueError("hash must be 'blake3:'") + algo, digest = s.split(":", 1) + if algo != "blake3": + raise ValueError("only canonical 'blake3:' is accepted here") + if not digest or any(c for c in digest if c not in "0123456789abcdef"): + raise ValueError("hash digest must be lowercase hex") + return f"{algo}:{digest}" + + @field_validator("tags", mode="before") + @classmethod + def _parse_tags(cls, v): + """ + Accepts a list of strings (possibly multiple form fields), + where each string can be: + - JSON array (e.g., '["models","loras","foo"]') + - comma-separated ('models, loras, foo') + - single token ('models') + Returns a normalized, deduplicated, ordered list. + """ + items: list[str] = [] + if v is None: + return [] + if isinstance(v, str): + v = [v] + + if isinstance(v, list): + for item in v: + if item is None: + continue + s = str(item).strip() + if not s: + continue + if s.startswith("["): + try: + arr = json.loads(s) + if isinstance(arr, list): + items.extend(str(x) for x in arr) + continue + except Exception: + pass # fallback to CSV parse below + items.extend([p for p in s.split(",") if p.strip()]) + else: + return [] + + # normalize + dedupe + norm = [] + seen = set() + for t in items: + tnorm = str(t).strip().lower() + if tnorm and tnorm not in seen: + seen.add(tnorm) + norm.append(tnorm) + return norm + + @field_validator("user_metadata", mode="before") + @classmethod + def _parse_metadata_json(cls, v): + if v is None or isinstance(v, dict): + return v or {} + if isinstance(v, str): + s = v.strip() + if not s: + return {} + try: + parsed = json.loads(s) + except Exception as e: + raise ValueError(f"user_metadata must be JSON: {e}") from e + if not isinstance(parsed, dict): + raise ValueError("user_metadata must be a JSON object") + return parsed + return {} + + @model_validator(mode="after") + def _validate_order(self): + if not self.tags: + raise ValueError("tags must be provided and non-empty") + root = self.tags[0] + if root not in {"models", "input", "output"}: + raise ValueError("first tag must be one of: models, input, output") + if root == "models": + if len(self.tags) < 2: + raise ValueError("models uploads require a category tag as the second tag") + return self + + class SetPreviewBody(BaseModel): """Set or clear the preview for an AssetInfo. Provide an Asset.id or null.""" preview_id: str | None = None diff --git a/app/assets/database/queries.py b/app/assets/database/queries.py index c9aac4c68..a446fe914 100644 --- a/app/assets/database/queries.py +++ b/app/assets/database/queries.py @@ -1,8 +1,9 @@ import os +import logging import sqlalchemy as sa from collections import defaultdict from datetime import datetime -from typing import Iterable +from typing import Iterable, Any from sqlalchemy import select, delete, exists, func from sqlalchemy.dialects import sqlite from sqlalchemy.exc import IntegrityError @@ -441,6 +442,216 @@ def replace_asset_info_metadata_projection( session.flush() +def ingest_fs_asset( + session: Session, + *, + asset_hash: str, + abs_path: str, + size_bytes: int, + mtime_ns: int, + mime_type: str | None = None, + info_name: str | None = None, + owner_id: str = "", + preview_id: str | None = None, + user_metadata: dict | None = None, + tags: Sequence[str] = (), + tag_origin: str = "manual", + require_existing_tags: bool = False, +) -> dict: + """ + Idempotently upsert: + - Asset by content hash (create if missing) + - AssetCacheState(file_path) pointing to asset_id + - Optionally AssetInfo + tag links and metadata projection + Returns flags and ids. + """ + locator = os.path.abspath(abs_path) + now = utcnow() + + if preview_id: + if not session.get(Asset, preview_id): + preview_id = None + + out: dict[str, Any] = { + "asset_created": False, + "asset_updated": False, + "state_created": False, + "state_updated": False, + "asset_info_id": None, + } + + # 1) Asset by hash + asset = ( + session.execute(select(Asset).where(Asset.hash == asset_hash).limit(1)) + ).scalars().first() + if not asset: + vals = { + "hash": asset_hash, + "size_bytes": int(size_bytes), + "mime_type": mime_type, + "created_at": now, + } + res = session.execute( + sqlite.insert(Asset) + .values(**vals) + .on_conflict_do_nothing(index_elements=[Asset.hash]) + ) + if int(res.rowcount or 0) > 0: + out["asset_created"] = True + asset = ( + session.execute( + select(Asset).where(Asset.hash == asset_hash).limit(1) + ) + ).scalars().first() + if not asset: + raise RuntimeError("Asset row not found after upsert.") + else: + changed = False + if asset.size_bytes != int(size_bytes) and int(size_bytes) > 0: + asset.size_bytes = int(size_bytes) + changed = True + if mime_type and asset.mime_type != mime_type: + asset.mime_type = mime_type + changed = True + if changed: + out["asset_updated"] = True + + # 2) AssetCacheState upsert by file_path (unique) + vals = { + "asset_id": asset.id, + "file_path": locator, + "mtime_ns": int(mtime_ns), + } + ins = ( + sqlite.insert(AssetCacheState) + .values(**vals) + .on_conflict_do_nothing(index_elements=[AssetCacheState.file_path]) + ) + + res = session.execute(ins) + if int(res.rowcount or 0) > 0: + out["state_created"] = True + else: + upd = ( + sa.update(AssetCacheState) + .where(AssetCacheState.file_path == locator) + .where( + sa.or_( + AssetCacheState.asset_id != asset.id, + AssetCacheState.mtime_ns.is_(None), + AssetCacheState.mtime_ns != int(mtime_ns), + ) + ) + .values(asset_id=asset.id, mtime_ns=int(mtime_ns)) + ) + res2 = session.execute(upd) + if int(res2.rowcount or 0) > 0: + out["state_updated"] = True + + # 3) Optional AssetInfo + tags + metadata + if info_name: + try: + with session.begin_nested(): + info = AssetInfo( + owner_id=owner_id, + name=info_name, + asset_id=asset.id, + preview_id=preview_id, + created_at=now, + updated_at=now, + last_access_time=now, + ) + session.add(info) + session.flush() + out["asset_info_id"] = info.id + except IntegrityError: + pass + + existing_info = ( + session.execute( + select(AssetInfo) + .where( + AssetInfo.asset_id == asset.id, + AssetInfo.name == info_name, + (AssetInfo.owner_id == owner_id), + ) + .limit(1) + ) + ).unique().scalar_one_or_none() + if not existing_info: + raise RuntimeError("Failed to update or insert AssetInfo.") + + if preview_id and existing_info.preview_id != preview_id: + existing_info.preview_id = preview_id + + existing_info.updated_at = now + if existing_info.last_access_time < now: + existing_info.last_access_time = now + session.flush() + out["asset_info_id"] = existing_info.id + + norm = [t.strip().lower() for t in (tags or []) if (t or "").strip()] + if norm and out["asset_info_id"] is not None: + if not require_existing_tags: + ensure_tags_exist(session, norm, tag_type="user") + + existing_tag_names = set( + name for (name,) in (session.execute(select(Tag.name).where(Tag.name.in_(norm)))).all() + ) + missing = [t for t in norm if t not in existing_tag_names] + if missing and require_existing_tags: + raise ValueError(f"Unknown tags: {missing}") + + existing_links = set( + tag_name + for (tag_name,) in ( + session.execute( + select(AssetInfoTag.tag_name).where(AssetInfoTag.asset_info_id == out["asset_info_id"]) + ) + ).all() + ) + to_add = [t for t in norm if t in existing_tag_names and t not in existing_links] + if to_add: + session.add_all( + [ + AssetInfoTag( + asset_info_id=out["asset_info_id"], + tag_name=t, + origin=tag_origin, + added_at=now, + ) + for t in to_add + ] + ) + session.flush() + + # metadata["filename"] hack + if out["asset_info_id"] is not None: + primary_path = pick_best_live_path(list_cache_states_by_asset_id(session, asset_id=asset.id)) + computed_filename = compute_relative_filename(primary_path) if primary_path else None + + current_meta = existing_info.user_metadata or {} + new_meta = dict(current_meta) + if user_metadata is not None: + for k, v in user_metadata.items(): + new_meta[k] = v + if computed_filename: + new_meta["filename"] = computed_filename + + if new_meta != current_meta: + replace_asset_info_metadata_projection( + session, + asset_info_id=out["asset_info_id"], + user_metadata=new_meta, + ) + + try: + remove_missing_tag_for_asset_id(session, asset_id=asset.id) + except Exception: + logging.exception("Failed to clear 'missing' tag for asset %s", asset.id) + return out + + def list_tags_with_usage( session: Session, prefix: str | None = None, @@ -521,3 +732,16 @@ def get_asset_tags(session: Session, *, asset_info_id: str) -> list[str]: ) ).all() ] + + +def remove_missing_tag_for_asset_id( + session: Session, + *, + asset_id: str, +) -> None: + session.execute( + sa.delete(AssetInfoTag).where( + AssetInfoTag.asset_info_id.in_(sa.select(AssetInfo.id).where(AssetInfo.asset_id == asset_id)), + AssetInfoTag.tag_name == "missing", + ) + ) diff --git a/app/assets/helpers.py b/app/assets/helpers.py index b9b260921..5030b123a 100644 --- a/app/assets/helpers.py +++ b/app/assets/helpers.py @@ -88,6 +88,40 @@ def get_comfy_models_folders() -> list[tuple[str, list[str]]]: targets.append((name, paths)) return targets +def resolve_destination_from_tags(tags: list[str]) -> tuple[str, list[str]]: + """Validates and maps tags -> (base_dir, subdirs_for_fs)""" + root = tags[0] + if root == "models": + if len(tags) < 2: + raise ValueError("at least two tags required for model asset") + try: + bases = folder_paths.folder_names_and_paths[tags[1]][0] + except KeyError: + raise ValueError(f"unknown model category '{tags[1]}'") + if not bases: + raise ValueError(f"no base path configured for category '{tags[1]}'") + base_dir = os.path.abspath(bases[0]) + raw_subdirs = tags[2:] + else: + base_dir = os.path.abspath( + folder_paths.get_input_directory() if root == "input" else folder_paths.get_output_directory() + ) + raw_subdirs = tags[1:] + for i in raw_subdirs: + if i in (".", ".."): + raise ValueError("invalid path component in tags") + + return base_dir, raw_subdirs if raw_subdirs else [] + +def ensure_within_base(candidate: str, base: str) -> None: + cand_abs = os.path.abspath(candidate) + base_abs = os.path.abspath(base) + try: + if os.path.commonpath([cand_abs, base_abs]) != base_abs: + raise ValueError("destination escapes base directory") + except Exception: + raise ValueError("invalid destination path") + def compute_relative_filename(file_path: str) -> str | None: """ Return the model's path relative to the last well-known folder (the model category), @@ -114,7 +148,6 @@ def compute_relative_filename(file_path: str) -> str | None: return "/".join(inside) return "/".join(parts) # input/output: keep all parts - def get_relative_to_root_category_path_of_asset(file_path: str) -> tuple[Literal["input", "output", "models"], str]: """Given an absolute or relative file path, determine which root category the path belongs to: - 'input' if the file resides under `folder_paths.get_input_directory()` diff --git a/app/assets/manager.py b/app/assets/manager.py index 999961cef..7e77408e4 100644 --- a/app/assets/manager.py +++ b/app/assets/manager.py @@ -1,9 +1,10 @@ import os import mimetypes +import contextlib from typing import Sequence from app.database.db import create_session -from app.assets.api import schemas_out +from app.assets.api import schemas_out, schemas_in from app.assets.database.queries import ( asset_exists_by_hash, get_asset_by_hash, @@ -16,7 +17,11 @@ from app.assets.database.queries import ( list_tags_with_usage, get_asset_tags, pick_best_live_path, + ingest_fs_asset, ) +from app.assets.helpers import resolve_destination_from_tags, ensure_within_base + +import app.assets.hashing as hashing def _safe_sort_field(requested: str | None) -> str: @@ -28,6 +33,11 @@ def _safe_sort_field(requested: str | None) -> str: return "created_at" +def _get_size_mtime_ns(path: str) -> tuple[int, int]: + st = os.stat(path, follow_symlinks=True) + return st.st_size, getattr(st, "st_mtime_ns", int(st.st_mtime * 1_000_000_000)) + + def _safe_filename(name: str | None, fallback: str) -> str: n = os.path.basename((name or "").strip() or fallback) if n: @@ -42,6 +52,7 @@ def asset_exists(*, asset_hash: str) -> bool: with create_session() as session: return asset_exists_by_hash(session, asset_hash=asset_hash) + def list_assets( *, include_tags: Sequence[str] | None = None, @@ -147,6 +158,126 @@ def resolve_asset_content_for_download( return abs_path, ctype, download_name +def upload_asset_from_temp_path( + spec: schemas_in.UploadAssetSpec, + *, + temp_path: str, + client_filename: str | None = None, + owner_id: str = "", + expected_asset_hash: str | None = None, +) -> schemas_out.AssetCreated: + try: + digest = hashing.blake3_hash(temp_path) + except Exception as e: + raise RuntimeError(f"failed to hash uploaded file: {e}") + asset_hash = "blake3:" + digest + + if expected_asset_hash and asset_hash != expected_asset_hash.strip().lower(): + raise ValueError("HASH_MISMATCH") + + with create_session() as session: + existing = get_asset_by_hash(session, asset_hash=asset_hash) + if existing is not None: + with contextlib.suppress(Exception): + if temp_path and os.path.exists(temp_path): + os.remove(temp_path) + + display_name = _safe_filename(spec.name or (client_filename or ""), fallback=digest) + info = create_asset_info_for_existing_asset( + session, + asset_hash=asset_hash, + name=display_name, + user_metadata=spec.user_metadata or {}, + tags=spec.tags or [], + tag_origin="manual", + owner_id=owner_id, + ) + tag_names = get_asset_tags(session, asset_info_id=info.id) + session.commit() + + return schemas_out.AssetCreated( + id=info.id, + name=info.name, + asset_hash=existing.hash, + size=int(existing.size_bytes) if existing.size_bytes is not None else None, + mime_type=existing.mime_type, + tags=tag_names, + user_metadata=info.user_metadata or {}, + preview_id=info.preview_id, + created_at=info.created_at, + last_access_time=info.last_access_time, + created_new=False, + ) + + base_dir, subdirs = resolve_destination_from_tags(spec.tags) + dest_dir = os.path.join(base_dir, *subdirs) if subdirs else base_dir + os.makedirs(dest_dir, exist_ok=True) + + src_for_ext = (client_filename or spec.name or "").strip() + _ext = os.path.splitext(os.path.basename(src_for_ext))[1] if src_for_ext else "" + ext = _ext if 0 < len(_ext) <= 16 else "" + hashed_basename = f"{digest}{ext}" + dest_abs = os.path.abspath(os.path.join(dest_dir, hashed_basename)) + ensure_within_base(dest_abs, base_dir) + + content_type = ( + mimetypes.guess_type(os.path.basename(src_for_ext), strict=False)[0] + or mimetypes.guess_type(hashed_basename, strict=False)[0] + or "application/octet-stream" + ) + + try: + os.replace(temp_path, dest_abs) + except Exception as e: + raise RuntimeError(f"failed to move uploaded file into place: {e}") + + try: + size_bytes, mtime_ns = _get_size_mtime_ns(dest_abs) + except OSError as e: + raise RuntimeError(f"failed to stat destination file: {e}") + + with create_session() as session: + result = ingest_fs_asset( + session, + asset_hash=asset_hash, + abs_path=dest_abs, + size_bytes=size_bytes, + mtime_ns=mtime_ns, + mime_type=content_type, + info_name=_safe_filename(spec.name or (client_filename or ""), fallback=digest), + owner_id=owner_id, + preview_id=None, + user_metadata=spec.user_metadata or {}, + tags=spec.tags, + tag_origin="manual", + require_existing_tags=False, + ) + info_id = result["asset_info_id"] + if not info_id: + raise RuntimeError("failed to create asset metadata") + + pair = fetch_asset_info_and_asset(session, asset_info_id=info_id, owner_id=owner_id) + if not pair: + raise RuntimeError("inconsistent DB state after ingest") + info, asset = pair + tag_names = get_asset_tags(session, asset_info_id=info.id) + session.commit() + + return schemas_out.AssetCreated( + id=info.id, + name=info.name, + asset_hash=asset.hash, + size=int(asset.size_bytes), + mime_type=asset.mime_type, + tags=tag_names, + user_metadata=info.user_metadata or {}, + preview_id=info.preview_id, + created_at=info.created_at, + last_access_time=info.last_access_time, + created_new=result["asset_created"], + ) + + def create_asset_from_hash( *, hash_str: str, diff --git a/requirements.txt b/requirements.txt index 996701550..2c0d59cc6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -22,6 +22,7 @@ alembic SQLAlchemy av>=14.2.0 comfy-kitchen>=0.2.6 +blake3 #non essential dependencies: kornia>=0.7.1