From 1f6b3d29a256d64a190a36b73353dc3e193fe9bf Mon Sep 17 00:00:00 2001
From: Luke Mino-Altherr
Date: Wed, 4 Mar 2026 15:22:42 -0800
Subject: [PATCH] Add checkpoint/interrupt support to BLAKE3 hashing

- Add HashCheckpoint dataclass for saving/resuming interrupted hash computations
- compute_blake3_hash now accepts interrupt_check and checkpoint parameters
- Returns (digest, None) on completion or (None, checkpoint) on interruption
- Update ingest.py caller to handle new tuple return type

Amp-Thread-ID: https://ampcode.com/threads/T-019cbb0b-8563-7199-b628-33e3c4fe9f41
Co-authored-by: Amp
---
 app/assets/services/hashing.py | 68 +++++++++++++++++++++++++++++++---
 app/assets/services/ingest.py  |  2 +-
 2 files changed, 64 insertions(+), 6 deletions(-)

diff --git a/app/assets/services/hashing.py b/app/assets/services/hashing.py
index 27bae32fe..fe1e3a502 100644
--- a/app/assets/services/hashing.py
+++ b/app/assets/services/hashing.py
@@ -1,24 +1,80 @@
 import io
 import os
-from typing import IO
+from dataclasses import dataclass
+from typing import IO, Any, Callable
 
 from blake3 import blake3
 
 DEFAULT_CHUNK = 8 * 1024 * 1024
 
+InterruptCheck = Callable[[], bool]
+
+
+@dataclass
+class HashCheckpoint:
+    """Saved state for resuming an interrupted hash computation."""
+
+    bytes_processed: int
+    hasher: Any  # blake3 hasher instance
+
 
 def compute_blake3_hash(
     fp: str | IO[bytes],
     chunk_size: int = DEFAULT_CHUNK,
-) -> str:
+    interrupt_check: InterruptCheck | None = None,
+    checkpoint: HashCheckpoint | None = None,
+) -> tuple[str | None, HashCheckpoint | None]:
+    """Compute BLAKE3 hash of a file, with optional checkpoint support.
+
+    Args:
+        fp: File path or file-like object
+        chunk_size: Size of chunks to read at a time
+        interrupt_check: Optional callable that may block (e.g. while paused)
+            and returns True if the operation should be cancelled. Checked
+            between chunk reads.
+        checkpoint: Optional checkpoint to resume from (file paths only)
+
+    Returns:
+        Tuple of (hex_digest, None) on completion, or
+        (None, checkpoint) on interruption (file paths only), or
+        (None, None) on interruption of a file object
+    """
     if hasattr(fp, "read"):
-        return _hash_file_obj(fp, chunk_size)
+        digest = _hash_file_obj(fp, chunk_size, interrupt_check)
+        return digest, None
 
     with open(os.fspath(fp), "rb") as f:
-        return _hash_file_obj(f, chunk_size)
+        if checkpoint is not None:
+            f.seek(checkpoint.bytes_processed)
+            h = checkpoint.hasher
+            bytes_processed = checkpoint.bytes_processed
+        else:
+            h = blake3()
+            bytes_processed = 0
+
+        if chunk_size <= 0:
+            chunk_size = DEFAULT_CHUNK
+
+        while True:
+            if interrupt_check is not None and interrupt_check():
+                return None, HashCheckpoint(
+                    bytes_processed=bytes_processed,
+                    hasher=h,
+                )
+            chunk = f.read(chunk_size)
+            if not chunk:
+                break
+            h.update(chunk)
+            bytes_processed += len(chunk)
+
+        return h.hexdigest(), None
 
 
-def _hash_file_obj(file_obj: IO, chunk_size: int = DEFAULT_CHUNK) -> str:
+def _hash_file_obj(
+    file_obj: IO,
+    chunk_size: int = DEFAULT_CHUNK,
+    interrupt_check: InterruptCheck | None = None,
+) -> str | None:
     if chunk_size <= 0:
         chunk_size = DEFAULT_CHUNK
 
@@ -37,6 +93,8 @@ def _hash_file_obj(file_obj: IO, chunk_size: int = DEFAULT_CHUNK) -> str:
     try:
         h = blake3()
         while True:
+            if interrupt_check is not None and interrupt_check():
+                return None
             chunk = file_obj.read(chunk_size)
             if not chunk:
                 break
diff --git a/app/assets/services/ingest.py b/app/assets/services/ingest.py
index 3adaf4350..44d7aef36 100644
--- a/app/assets/services/ingest.py
+++ b/app/assets/services/ingest.py
@@ -244,7 +244,7 @@ def upload_from_temp_path(
     expected_hash: str | None = None,
 ) -> UploadResult:
     try:
-        digest = hashing.compute_blake3_hash(temp_path)
+        digest, _ = hashing.compute_blake3_hash(temp_path)
     except ImportError as e:
         raise DependencyMissingError(str(e))
     except Exception as e: