refactor: extract multipart upload parsing from routes

- Add app/assets/api/upload.py with parse_multipart_upload() for HTTP parsing
- Add ParsedUpload dataclass to schemas_in.py
- Add domain exceptions (AssetValidationError, AssetNotFoundError, HashMismatchError)
- Add manager.process_upload() with domain exceptions (no HTTP status codes)
- Routes map domain exceptions to HTTP responses
- Slim down upload_asset route to ~20 lines (was ~150)

Amp-Thread-ID: https://ampcode.com/threads/T-019c2519-abe1-738a-ad2e-29ece17c0e42
Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
Luke Mino-Altherr
2026-02-03 12:12:25 -08:00
parent 69b3fef327
commit 2780b6cd57
4 changed files with 309 additions and 170 deletions

View File

@@ -12,9 +12,19 @@ import mimetypes
import contextlib
from typing import Sequence
from pydantic import ValidationError
import folder_paths
import app.assets.services.hashing as hashing
from app.database.db import create_session
from app.assets.api import schemas_out, schemas_in
from app.assets.api.schemas_in import (
AssetNotFoundError,
AssetValidationError,
HashMismatchError,
ParsedUpload,
)
from app.assets.api.upload import _cleanup_temp
from app.assets.database.queries import (
asset_exists_by_hash,
fetch_asset_info_and_asset,
@@ -292,6 +302,81 @@ def upload_asset_from_temp_path(
)
def process_upload(
parsed: ParsedUpload,
owner_id: str = "",
) -> schemas_out.AssetCreated:
"""
Process a parsed multipart upload.
Args:
parsed: The parsed upload data from parse_multipart_upload
owner_id: The owner ID for the asset
Returns:
AssetCreated response (check created_new to determine if new asset was created)
Raises:
UploadError: On validation or processing errors
"""
try:
spec = schemas_in.UploadAssetSpec.model_validate({
"tags": parsed.tags_raw,
"name": parsed.provided_name,
"user_metadata": parsed.user_metadata_raw,
"hash": parsed.provided_hash,
})
except ValidationError as ve:
_cleanup_temp(parsed.tmp_path)
raise AssetValidationError("INVALID_BODY", f"Validation failed: {ve.json()}")
# Validate models category against configured folders
if spec.tags and spec.tags[0] == "models":
if len(spec.tags) < 2 or spec.tags[1] not in folder_paths.folder_names_and_paths:
_cleanup_temp(parsed.tmp_path)
category = spec.tags[1] if len(spec.tags) >= 2 else ""
raise AssetValidationError("INVALID_BODY", f"unknown models category '{category}'")
# Fast path: if a valid provided hash exists, create AssetInfo without writing anything
if spec.hash and parsed.provided_hash_exists is True:
result = create_asset_from_hash(
hash_str=spec.hash,
name=spec.name or (spec.hash.split(":", 1)[1]),
tags=spec.tags,
user_metadata=spec.user_metadata or {},
owner_id=owner_id,
)
if result is None:
raise AssetNotFoundError(f"Asset content {spec.hash} does not exist")
# Drain temp if we accidentally saved (e.g., hash field came after file)
_cleanup_temp(parsed.tmp_path)
return result
# Otherwise, we must have a temp file path to ingest
if not parsed.tmp_path or not os.path.exists(parsed.tmp_path):
raise AssetNotFoundError("Provided hash not found and no file uploaded.")
try:
return upload_asset_from_temp_path(
spec,
temp_path=parsed.tmp_path,
client_filename=parsed.file_client_name,
owner_id=owner_id,
expected_asset_hash=spec.hash,
)
except ValueError as e:
_cleanup_temp(parsed.tmp_path)
msg = str(e)
if "HASH_MISMATCH" in msg or msg.strip().upper() == "HASH_MISMATCH":
raise HashMismatchError("Uploaded file hash does not match provided hash.")
raise AssetValidationError("BAD_REQUEST", "Invalid inputs.")
except Exception:
_cleanup_temp(parsed.tmp_path)
raise
def update_asset(
asset_info_id: str,
name: str | None = None,
@@ -332,7 +417,7 @@ def set_asset_preview(
owner_id=owner_id,
)
info = result["info"]
asset = result["asset"]
asset = result["asset"]T
tags = result["tags"]
return schemas_out.AssetDetail(