Compare commits

...

2 Commits

Author SHA1 Message Date
DrJKL
abb892b459 knip fix 2026-05-04 13:21:43 -07:00
DrJKL
c89da7ed35 refactor: dedupe abort + hash-grouping in missing-asset scans
Follow-up cleanup to PR #11873. Removes three patterns that
  were
  duplicated across the missing-media and missing-model scans,
  and
  replaces two private helpers in assetService with the
  standard
  AbortSignal#throwIfAborted API.

  - Extract the parallel hash-verification flow into a single
    verifyCandidatesByAssetHash helper (called out as a
  follow-up
    in PR #11873).
  - Replace the imperative Map + legacy-list builder in both
  scans
    with es-toolkit partition + groupBy.
  - Drop local AssetHashVerifier type duplicates; export the
    canonical type from assetService.
  - Drop local isAbortError duplicates in both scans in favour
  of
    the shared @/utils/typeGuardUtil isAbortError, widened to
    instanceof Error so it matches the contract the local
  copies
    were enforcing.
  - Replace createAbortError + throwIfAborted private helpers
  with
    signal?.throwIfAborted() and signal.reason.

  Net -43 lines of production code, no behaviour change. All
  177
  tests across missingMedia, missingModel, assetService,
  releaseService, customerEvents, and comfyRegistry pass.
2026-05-04 13:11:13 -07:00
5 changed files with 103 additions and 149 deletions

View File

@@ -185,6 +185,12 @@ export const MISSING_TAG = 'missing'
/** Result of a HEAD lookup against an exact asset hash. */
export type AssetHashStatus = 'exists' | 'missing' | 'invalid'
/** Verifier signature shared by callers that probe the hash endpoint. */
export type AssetHashVerifier = (
assetHash: string,
signal?: AbortSignal
) => Promise<AssetHashStatus>
const BLAKE3_ASSET_HASH_PATTERN = /^blake3:[0-9a-f]{64}$/i
const BLAKE3_HEX_PATTERN = /^[0-9a-f]{64}$/i
const uploadedAssetResponseSchema = assetItemSchema.extend({
@@ -202,24 +208,16 @@ export function toBlake3AssetHash(hash: string | undefined): string | null {
return `blake3:${hash}`
}
function createAbortError(): DOMException {
return new DOMException('Aborted', 'AbortError')
}
function throwIfAborted(signal?: AbortSignal): void {
if (signal?.aborted) throw createAbortError()
}
async function withCallerAbort<T>(
promise: Promise<T>,
signal?: AbortSignal
): Promise<T> {
throwIfAborted(signal)
signal?.throwIfAborted()
if (!signal) return await promise
let removeAbortListener = () => {}
const abortPromise = new Promise<never>((_, reject) => {
const onAbort = () => reject(createAbortError())
const onAbort = () => reject(signal.reason)
signal.addEventListener('abort', onAbort, { once: true })
removeAbortListener = () => signal.removeEventListener('abort', onAbort)
})
@@ -533,7 +531,7 @@ function createAssetService() {
let offset = 0
while (true) {
if (signal?.aborted) throw createAbortError()
signal?.throwIfAborted()
const data = await handleAssetRequest(
{
@@ -589,7 +587,7 @@ function createAssetService() {
async function getInputAssetsIncludingPublic(
signal?: AbortSignal
): Promise<AssetItem[]> {
throwIfAborted(signal)
signal?.throwIfAborted()
if (inputAssetsIncludingPublic) return inputAssetsIncludingPublic
const request =

View File

@@ -0,0 +1,47 @@
import type {
AssetHashStatus,
AssetHashVerifier
} from '@/platform/assets/services/assetService'
import { isAbortError } from '@/utils/typeGuardUtil'
/**
* Returns true when the status fully resolves the group; false leaves the group
* for the caller's fallback path.
*/
type ApplyAssetHashStatus<T> = (status: AssetHashStatus, group: T[]) => boolean
/**
* Verifies grouped candidates against the asset hash endpoint in parallel.
*
* For each `[hash, group]` entry, calls `checkAssetHash(hash)` and hands the
* resulting status to `applyStatus`. Groups whose status is unresolved or
* whose request fails (non-abort) are returned so the caller can run a
* fallback path. Aborts and abort-propagated rejections are silent.
*/
export async function verifyCandidatesByAssetHash<T>(
candidatesByHash: Record<string, T[]>,
signal: AbortSignal | undefined,
checkAssetHash: AssetHashVerifier,
applyStatus: ApplyAssetHashStatus<T>,
logTag: string
): Promise<T[][]> {
const unresolved: T[][] = []
await Promise.all(
Object.entries(candidatesByHash).map(async ([assetHash, group]) => {
if (signal?.aborted) return
try {
const status = await checkAssetHash(assetHash, signal)
if (signal?.aborted) return
if (!applyStatus(status, group)) unresolved.push(group)
} catch (err) {
if (signal?.aborted || isAbortError(err)) return
console.warn(`${logTag} Failed to verify asset hash:`, err)
unresolved.push(group)
}
})
)
return unresolved
}

View File

@@ -1,4 +1,4 @@
import { groupBy } from 'es-toolkit'
import { groupBy, partition } from 'es-toolkit'
import type { NodeId } from '@/platform/workflow/validation/schemas/workflowSchema'
import type {
MissingMediaCandidate,
@@ -19,11 +19,13 @@ import {
import { LGraphEventMode } from '@/lib/litegraph/src/types/globalEnums'
import { resolveComboValues } from '@/utils/litegraphUtil'
import type { AssetItem } from '@/platform/assets/schemas/assetSchema'
import type { AssetHashStatus } from '@/platform/assets/services/assetService'
import type { AssetHashVerifier } from '@/platform/assets/services/assetService'
import {
assetService,
isBlake3AssetHash
} from '@/platform/assets/services/assetService'
import { verifyCandidatesByAssetHash } from '@/platform/assets/utils/verifyCandidatesByAssetHash'
import { isAbortError } from '@/utils/typeGuardUtil'
/** Map of node types to their media widget name and media type. */
const MEDIA_NODE_WIDGETS: Record<
@@ -112,70 +114,8 @@ export function scanNodeMediaCandidates(
return candidates
}
type AssetHashVerifier = (
assetHash: string,
signal?: AbortSignal
) => Promise<AssetHashStatus>
type InputAssetFetcher = (signal?: AbortSignal) => Promise<AssetItem[]>
function groupCandidatesForHashLookup(candidates: MissingMediaCandidate[]): {
candidatesByHash: Map<string, MissingMediaCandidate[]>
legacyCandidates: MissingMediaCandidate[]
} {
const candidatesByHash = new Map<string, MissingMediaCandidate[]>()
const legacyCandidates: MissingMediaCandidate[] = []
for (const candidate of candidates) {
if (!isBlake3AssetHash(candidate.name)) {
legacyCandidates.push(candidate)
continue
}
const hashCandidates = candidatesByHash.get(candidate.name)
if (hashCandidates) hashCandidates.push(candidate)
else candidatesByHash.set(candidate.name, [candidate])
}
return { candidatesByHash, legacyCandidates }
}
async function verifyCandidatesByHash(
candidatesByHash: Map<string, MissingMediaCandidate[]>,
legacyCandidates: MissingMediaCandidate[],
signal: AbortSignal | undefined,
checkAssetHash: AssetHashVerifier
): Promise<void> {
await Promise.all(
Array.from(candidatesByHash, async ([assetHash, hashCandidates]) => {
if (signal?.aborted) return
let status: AssetHashStatus
try {
status = await checkAssetHash(assetHash, signal)
if (signal?.aborted) return
} catch (err) {
if (signal?.aborted || isAbortError(err)) return
console.warn(
'[Missing Media Pipeline] Failed to verify asset hash:',
err
)
legacyCandidates.push(...hashCandidates)
return
}
if (status === 'invalid') {
legacyCandidates.push(...hashCandidates)
return
}
for (const candidate of hashCandidates) {
candidate.isMissing = status === 'missing'
}
})
)
}
/**
* Verify cloud media candidates by probing the asset hash endpoint first.
* Invalid hash values fall back to the legacy input asset list check.
@@ -191,15 +131,24 @@ export async function verifyCloudMediaCandidates(
const pending = candidates.filter((c) => c.isMissing === undefined)
if (pending.length === 0) return
const { candidatesByHash, legacyCandidates } =
groupCandidatesForHashLookup(pending)
await verifyCandidatesByHash(
const [hashable, nonHashCandidates] = partition(pending, (c) =>
isBlake3AssetHash(c.name)
)
const candidatesByHash = groupBy(hashable, (c) => c.name)
const unresolvedHashGroups = await verifyCandidatesByAssetHash(
candidatesByHash,
legacyCandidates,
signal,
checkAssetHash
checkAssetHash,
(status, group) => {
if (status === 'invalid') return false
for (const c of group) c.isMissing = status === 'missing'
return true
},
'[Missing Media Pipeline]'
)
const legacyCandidates = nonHashCandidates.concat(...unresolvedHashGroups)
if (signal?.aborted || legacyCandidates.length === 0) return
let inputAssets: AssetItem[]
@@ -227,15 +176,6 @@ async function fetchMissingInputAssets(
return await assetService.getInputAssetsIncludingPublic(signal)
}
function isAbortError(err: unknown): boolean {
return (
typeof err === 'object' &&
err !== null &&
'name' in err &&
err.name === 'AbortError'
)
}
/** Group confirmed-missing candidates by file name into view models. */
export function groupCandidatesByName(
candidates: MissingMediaCandidate[]

View File

@@ -24,11 +24,13 @@ import {
} from '@/utils/graphTraversalUtil'
import { LGraphEventMode } from '@/lib/litegraph/src/types/globalEnums'
import { resolveComboValues } from '@/utils/litegraphUtil'
import type { AssetHashStatus } from '@/platform/assets/services/assetService'
import type { AssetHashVerifier } from '@/platform/assets/services/assetService'
import {
assetService,
toBlake3AssetHash
} from '@/platform/assets/services/assetService'
import { verifyCandidatesByAssetHash } from '@/platform/assets/utils/verifyCandidatesByAssetHash'
import { groupBy, partition } from 'es-toolkit'
export type MissingModelWorkflowData = FlattenableWorkflowGraph & {
models?: ModelFile[]
@@ -450,11 +452,6 @@ interface AssetVerifier {
getAssets: (nodeType: string) => AssetItem[] | undefined
}
type AssetHashVerifier = (
assetHash: string,
signal?: AbortSignal
) => Promise<AssetHashStatus>
export async function verifyAssetSupportedCandidates(
candidates: MissingModelCandidate[],
signal?: AbortSignal,
@@ -468,48 +465,27 @@ export async function verifyAssetSupportedCandidates(
)
if (pendingCandidates.length === 0) return
const pendingNodeTypes = new Set<string>()
const candidatesByHash = new Map<string, MissingModelCandidate[]>()
for (const candidate of pendingCandidates) {
const assetHash = getBlake3AssetHash(candidate)
if (!assetHash) {
pendingNodeTypes.add(candidate.nodeType)
continue
}
const hashCandidates = candidatesByHash.get(assetHash)
if (hashCandidates) hashCandidates.push(candidate)
else candidatesByHash.set(assetHash, [candidate])
}
await Promise.all(
Array.from(candidatesByHash, async ([assetHash, hashCandidates]) => {
if (signal?.aborted) return
try {
const status = await checkAssetHash(assetHash, signal)
if (signal?.aborted) return
if (status === 'exists') {
for (const candidate of hashCandidates) {
candidate.isMissing = false
}
return
}
} catch (err) {
if (signal?.aborted || isAbortError(err)) return
console.warn(
'[Missing Model Pipeline] Failed to verify asset hash:',
err
)
}
for (const candidate of hashCandidates) {
pendingNodeTypes.add(candidate.nodeType)
}
})
const [hashable, nonHashCandidates] = partition(
pendingCandidates,
(c) => getBlake3AssetHash(c) !== null
)
const pendingNodeTypes = new Set(nonHashCandidates.map((c) => c.nodeType))
const candidatesByHash = groupBy(hashable, (c) => getBlake3AssetHash(c)!)
const unresolvedHashGroups = await verifyCandidatesByAssetHash(
candidatesByHash,
signal,
checkAssetHash,
(status, group) => {
if (status !== 'exists') return false
for (const c of group) c.isMissing = false
return true
},
'[Missing Model Pipeline]'
)
for (const group of unresolvedHashGroups) {
for (const c of group) pendingNodeTypes.add(c.nodeType)
}
if (signal?.aborted) return
if (pendingNodeTypes.size === 0) return
@@ -549,15 +525,6 @@ function getBlake3AssetHash(candidate: MissingModelCandidate): string | null {
return toBlake3AssetHash(candidate.hash)
}
function isAbortError(err: unknown): boolean {
return (
typeof err === 'object' &&
err !== null &&
'name' in err &&
err.name === 'AbortError'
)
}
function normalizePath(path: string): string {
return path.replace(/\\/g, '/')
}

View File

@@ -8,12 +8,14 @@ import type { ResultItemType } from '@/schemas/apiSchema'
/**
* Check if an error is an AbortError triggered by `AbortController#abort`
* when cancelling a request.
* when cancelling a request. Real AbortControllers reject with a
* `DOMException` (which extends `Error`), and some libraries throw plain
* `Error`s with `name === 'AbortError'`. Either is treated as an abort.
*/
export const isAbortError = (
err: unknown
): err is DOMException & { name: 'AbortError' } =>
err instanceof DOMException && err.name === 'AbortError'
): err is Error & { name: 'AbortError' } =>
err instanceof Error && err.name === 'AbortError'
export const isSubgraph = (
item: LGraph | Subgraph | undefined | null