Compare commits

...

2 Commits

Author SHA1 Message Date
jaeone94
cea30e2b69 fix: harden asset hash verification abort handling 2026-05-07 21:01:45 +09:00
jaeone94
dfcca34880 refactor: extract asset hash verification 2026-05-07 21:01:45 +09:00
7 changed files with 375 additions and 145 deletions

View File

@@ -0,0 +1,211 @@
import { describe, expect, it, vi } from 'vitest'
import type { AssetHashStatus } from '@/platform/assets/services/assetService'
import { verifyCandidatesByAssetHash } from './assetHashVerification'
interface Candidate {
id: string
hash: string | null
}
interface Deferred<T> {
promise: Promise<T>
resolve: (value: T) => void
reject: (reason?: unknown) => void
}
function candidate(id: string, hash: string | null): Candidate {
return { id, hash }
}
function createDeferred<T>(): Deferred<T> {
let resolve!: (value: T) => void
let reject!: (reason?: unknown) => void
const promise = new Promise<T>((resolvePromise, rejectPromise) => {
resolve = resolvePromise
reject = rejectPromise
})
return { promise, resolve, reject }
}
describe(verifyCandidatesByAssetHash, () => {
it('deduplicates hash checks and groups existing and missing candidates', async () => {
const existingHash = 'blake3:existing'
const missingHash = 'blake3:missing'
const candidates = [
candidate('a', existingHash),
candidate('b', existingHash),
candidate('c', missingHash)
]
const checkAssetHash = vi.fn(async (hash: string) =>
hash === existingHash ? ('exists' as const) : ('missing' as const)
)
const result = await verifyCandidatesByAssetHash({
candidates,
getAssetHash: (candidate) => candidate.hash,
checkAssetHash
})
expect(result.aborted).toBe(false)
expect(result.existing.map((candidate) => candidate.id)).toEqual(['a', 'b'])
expect(result.missing.map((candidate) => candidate.id)).toEqual(['c'])
expect(result.fallback).toEqual([])
expect(checkAssetHash).toHaveBeenCalledTimes(2)
})
it('routes candidates without hashes and invalid hashes to fallback', async () => {
const invalidHash = 'blake3:invalid'
const candidates = [candidate('a', null), candidate('b', invalidHash)]
const checkAssetHash = vi.fn(async () => 'invalid' as const)
const result = await verifyCandidatesByAssetHash({
candidates,
getAssetHash: (candidate) => candidate.hash,
checkAssetHash
})
expect(result.existing).toEqual([])
expect(result.missing).toEqual([])
expect(result.fallback.map((candidate) => candidate.id)).toEqual(['a', 'b'])
expect(checkAssetHash).toHaveBeenCalledOnce()
})
it('routes non-abort verification failures to fallback', async () => {
const candidates = [candidate('a', 'blake3:network-failure')]
const error = new Error('network failed')
const onError = vi.fn()
const result = await verifyCandidatesByAssetHash({
candidates,
getAssetHash: (candidate) => candidate.hash,
checkAssetHash: async () => {
throw error
},
onError
})
expect(result.fallback).toEqual(candidates)
expect(result.aborted).toBe(false)
expect(onError).toHaveBeenCalledWith(error)
})
it('returns aborted without resolving candidates when the signal is aborted', async () => {
const controller = new AbortController()
controller.abort()
const candidates = [candidate('a', 'blake3:aborted')]
const checkAssetHash = vi.fn(async () => 'exists' as const)
const result = await verifyCandidatesByAssetHash({
candidates,
getAssetHash: (candidate) => candidate.hash,
checkAssetHash,
signal: controller.signal
})
expect(result).toEqual({
existing: [],
missing: [],
fallback: [],
aborted: true
})
expect(checkAssetHash).not.toHaveBeenCalled()
})
it('silences abort errors from hash verification', async () => {
const candidates = [candidate('a', 'blake3:aborted')]
const onError = vi.fn()
const result = await verifyCandidatesByAssetHash({
candidates,
getAssetHash: (candidate) => candidate.hash,
checkAssetHash: async () => {
throw new DOMException('aborted', 'AbortError')
},
onError
})
expect(result.aborted).toBe(true)
expect(result.existing).toEqual([])
expect(result.missing).toEqual([])
expect(result.fallback).toEqual([])
expect(onError).not.toHaveBeenCalled()
})
it('caps concurrent hash checks', async () => {
let activeChecks = 0
let maxActiveChecks = 0
let completedChecks = 0
const inFlight: Array<Deferred<AssetHashStatus>> = []
const candidates = Array.from({ length: 6 }, (_, index) =>
candidate(String(index), `blake3:${index}`)
)
const verificationPromise = verifyCandidatesByAssetHash({
candidates,
getAssetHash: (candidate) => candidate.hash,
maxConcurrent: 2,
checkAssetHash: async () => {
activeChecks++
maxActiveChecks = Math.max(maxActiveChecks, activeChecks)
const deferred = createDeferred<AssetHashStatus>()
inFlight.push(deferred)
try {
return await deferred.promise
} finally {
activeChecks--
completedChecks++
}
}
})
while (completedChecks < candidates.length) {
const currentChecks = inFlight.splice(0)
expect(currentChecks.length).toBeLessThanOrEqual(2)
for (const check of currentChecks) {
check.resolve('missing')
}
await Promise.resolve()
}
await verificationPromise
expect(maxActiveChecks).toBeLessThanOrEqual(2)
})
it('does not report fallback failures after another worker aborts', async () => {
const abortHash = 'blake3:abort'
const errorHash = 'blake3:error'
const requests = new Map<string, Deferred<AssetHashStatus>>()
const onError = vi.fn()
const verificationPromise = verifyCandidatesByAssetHash({
candidates: [candidate('a', abortHash), candidate('b', errorHash)],
getAssetHash: (candidate) => candidate.hash,
maxConcurrent: 2,
checkAssetHash: (hash) => {
const deferred = createDeferred<AssetHashStatus>()
requests.set(hash, deferred)
return deferred.promise
},
onError
})
expect(requests.size).toBe(2)
requests.get(abortHash)?.reject(new DOMException('aborted', 'AbortError'))
await Promise.resolve()
requests.get(errorHash)?.reject(new Error('network failed'))
const result = await verificationPromise
expect(result).toEqual({
existing: [],
missing: [],
fallback: [],
aborted: true
})
expect(onError).not.toHaveBeenCalled()
})
})

View File

@@ -0,0 +1,119 @@
import type { AssetHashStatus } from '@/platform/assets/services/assetService'
import { isAbortError } from '@/utils/typeGuardUtil'
/**
* Low-level hash checker. Cancellation should reject with the same
* DOMException AbortError shape as `fetch`.
*/
export type AssetHashVerifier = (
assetHash: string,
signal?: AbortSignal
) => Promise<AssetHashStatus>
interface AssetHashVerificationResult<T> {
existing: T[]
missing: T[]
fallback: T[]
aborted: boolean
}
interface VerifyCandidatesByAssetHashOptions<T> {
candidates: readonly T[]
getAssetHash: (candidate: T) => string | null
checkAssetHash: AssetHashVerifier
signal?: AbortSignal
maxConcurrent?: number
onError?: (error: unknown) => void
}
const DEFAULT_MAX_CONCURRENT_HASH_CHECKS = 12
/**
* Deduplicates asset hash checks and partitions candidates for scanner policy.
* `fallback` candidates should continue through the caller's legacy lookup path.
*/
export async function verifyCandidatesByAssetHash<T>({
candidates,
getAssetHash,
checkAssetHash,
signal,
maxConcurrent = DEFAULT_MAX_CONCURRENT_HASH_CHECKS,
onError
}: VerifyCandidatesByAssetHashOptions<T>): Promise<
AssetHashVerificationResult<T>
> {
const result: AssetHashVerificationResult<T> = {
existing: [],
missing: [],
fallback: [],
aborted: false
}
if (signal?.aborted) return { ...result, aborted: true }
const candidatesByHash = new Map<string, T[]>()
for (const candidate of candidates) {
const assetHash = getAssetHash(candidate)
if (!assetHash) {
result.fallback.push(candidate)
continue
}
const hashCandidates = candidatesByHash.get(assetHash)
if (hashCandidates) hashCandidates.push(candidate)
else candidatesByHash.set(assetHash, [candidate])
}
const entries = [...candidatesByHash.entries()]
let nextIndex = 0
const requestedWorkerCount = Math.max(1, Math.floor(maxConcurrent))
const workerCount = Math.min(entries.length, requestedWorkerCount)
function hasAborted(): boolean {
return result.aborted || signal?.aborted === true
}
async function verifyNextHash(): Promise<void> {
while (!hasAborted() && nextIndex < entries.length) {
const entry = entries[nextIndex++]
if (!entry) return
const [assetHash, hashCandidates] = entry
if (hasAborted()) {
result.aborted = true
return
}
let status: AssetHashStatus
try {
status = await checkAssetHash(assetHash, signal)
} catch (error) {
if (hasAborted() || isAbortError(error)) {
result.aborted = true
return
}
onError?.(error)
result.fallback.push(...hashCandidates)
continue
}
if (hasAborted()) {
result.aborted = true
return
}
if (status === 'exists') {
result.existing.push(...hashCandidates)
} else if (status === 'missing') {
result.missing.push(...hashCandidates)
} else {
result.fallback.push(...hashCandidates)
}
}
}
await Promise.all(Array.from({ length: workerCount }, verifyNextHash))
return result
}

View File

@@ -403,8 +403,7 @@ describe('verifyCloudMediaCandidates', () => {
})
it('silences aborts while loading legacy fallback input assets', async () => {
const abortError = new Error('aborted')
abortError.name = 'AbortError'
const abortError = new DOMException('aborted', 'AbortError')
const controller = new AbortController()
const candidates = [
makeCandidate('1', 'photo.png', { isMissing: undefined })
@@ -427,8 +426,7 @@ describe('verifyCloudMediaCandidates', () => {
})
it('silences aborts from the default legacy fallback input asset store path', async () => {
const abortError = new Error('aborted')
abortError.name = 'AbortError'
const abortError = new DOMException('aborted', 'AbortError')
const controller = new AbortController()
const candidates = [
makeCandidate('1', 'photo.png', { isMissing: undefined })

View File

@@ -19,11 +19,13 @@ import {
import { LGraphEventMode } from '@/lib/litegraph/src/types/globalEnums'
import { resolveComboValues } from '@/utils/litegraphUtil'
import type { AssetItem } from '@/platform/assets/schemas/assetSchema'
import type { AssetHashStatus } from '@/platform/assets/services/assetService'
import {
assetService,
isBlake3AssetHash
} from '@/platform/assets/services/assetService'
import { verifyCandidatesByAssetHash } from '@/platform/assets/utils/assetHashVerification'
import type { AssetHashVerifier } from '@/platform/assets/utils/assetHashVerification'
import { isAbortError } from '@/utils/typeGuardUtil'
/** Map of node types to their media widget name and media type. */
const MEDIA_NODE_WIDGETS: Record<
@@ -112,70 +114,8 @@ export function scanNodeMediaCandidates(
return candidates
}
type AssetHashVerifier = (
assetHash: string,
signal?: AbortSignal
) => Promise<AssetHashStatus>
type InputAssetFetcher = (signal?: AbortSignal) => Promise<AssetItem[]>
function groupCandidatesForHashLookup(candidates: MissingMediaCandidate[]): {
candidatesByHash: Map<string, MissingMediaCandidate[]>
legacyCandidates: MissingMediaCandidate[]
} {
const candidatesByHash = new Map<string, MissingMediaCandidate[]>()
const legacyCandidates: MissingMediaCandidate[] = []
for (const candidate of candidates) {
if (!isBlake3AssetHash(candidate.name)) {
legacyCandidates.push(candidate)
continue
}
const hashCandidates = candidatesByHash.get(candidate.name)
if (hashCandidates) hashCandidates.push(candidate)
else candidatesByHash.set(candidate.name, [candidate])
}
return { candidatesByHash, legacyCandidates }
}
async function verifyCandidatesByHash(
candidatesByHash: Map<string, MissingMediaCandidate[]>,
legacyCandidates: MissingMediaCandidate[],
signal: AbortSignal | undefined,
checkAssetHash: AssetHashVerifier
): Promise<void> {
await Promise.all(
Array.from(candidatesByHash, async ([assetHash, hashCandidates]) => {
if (signal?.aborted) return
let status: AssetHashStatus
try {
status = await checkAssetHash(assetHash, signal)
if (signal?.aborted) return
} catch (err) {
if (signal?.aborted || isAbortError(err)) return
console.warn(
'[Missing Media Pipeline] Failed to verify asset hash:',
err
)
legacyCandidates.push(...hashCandidates)
return
}
if (status === 'invalid') {
legacyCandidates.push(...hashCandidates)
return
}
for (const candidate of hashCandidates) {
candidate.isMissing = status === 'missing'
}
})
)
}
/**
* Verify cloud media candidates by probing the asset hash endpoint first.
* Invalid hash values fall back to the legacy input asset list check.
@@ -191,16 +131,26 @@ export async function verifyCloudMediaCandidates(
const pending = candidates.filter((c) => c.isMissing === undefined)
if (pending.length === 0) return
const { candidatesByHash, legacyCandidates } =
groupCandidatesForHashLookup(pending)
await verifyCandidatesByHash(
candidatesByHash,
legacyCandidates,
const verification = await verifyCandidatesByAssetHash({
candidates: pending,
getAssetHash: (candidate) =>
isBlake3AssetHash(candidate.name) ? candidate.name : null,
signal,
checkAssetHash
)
checkAssetHash,
onError: (err) => {
console.warn('[Missing Media Pipeline] Failed to verify asset hash:', err)
}
})
if (verification.aborted) return
if (signal?.aborted || legacyCandidates.length === 0) return
for (const candidate of verification.existing) {
candidate.isMissing = false
}
for (const candidate of verification.missing) {
candidate.isMissing = true
}
if (signal?.aborted || verification.fallback.length === 0) return
let inputAssets: AssetItem[]
try {
@@ -216,7 +166,7 @@ export async function verifyCloudMediaCandidates(
inputAssets.map((a) => a.asset_hash).filter((h): h is string => !!h)
)
for (const candidate of legacyCandidates) {
for (const candidate of verification.fallback) {
candidate.isMissing = !assetHashes.has(candidate.name)
}
}
@@ -227,15 +177,6 @@ async function fetchMissingInputAssets(
return await assetService.getInputAssetsIncludingPublic(signal)
}
function isAbortError(err: unknown): boolean {
return (
typeof err === 'object' &&
err !== null &&
'name' in err &&
err.name === 'AbortError'
)
}
/** Group confirmed-missing candidates by file name into view models. */
export function groupCandidatesByName(
candidates: MissingMediaCandidate[]

View File

@@ -1557,8 +1557,7 @@ describe('verifyAssetSupportedCandidates', () => {
it('should not warn or fall back when hash verification is aborted', async () => {
const warn = vi.spyOn(console, 'warn').mockImplementation(() => {})
const abortError = new Error('aborted')
abortError.name = 'AbortError'
const abortError = new DOMException('aborted', 'AbortError')
const hash =
'4444444444444444444444444444444444444444444444444444444444444444'
const candidates = [

View File

@@ -24,11 +24,12 @@ import {
} from '@/utils/graphTraversalUtil'
import { LGraphEventMode } from '@/lib/litegraph/src/types/globalEnums'
import { resolveComboValues } from '@/utils/litegraphUtil'
import type { AssetHashStatus } from '@/platform/assets/services/assetService'
import {
assetService,
toBlake3AssetHash
} from '@/platform/assets/services/assetService'
import { verifyCandidatesByAssetHash } from '@/platform/assets/utils/assetHashVerification'
import type { AssetHashVerifier } from '@/platform/assets/utils/assetHashVerification'
export type MissingModelWorkflowData = FlattenableWorkflowGraph & {
models?: ModelFile[]
@@ -450,11 +451,6 @@ interface AssetVerifier {
getAssets: (nodeType: string) => AssetItem[] | undefined
}
type AssetHashVerifier = (
assetHash: string,
signal?: AbortSignal
) => Promise<AssetHashStatus>
export async function verifyAssetSupportedCandidates(
candidates: MissingModelCandidate[],
signal?: AbortSignal,
@@ -468,58 +464,36 @@ export async function verifyAssetSupportedCandidates(
)
if (pendingCandidates.length === 0) return
const pendingNodeTypes = new Set<string>()
const candidatesByHash = new Map<string, MissingModelCandidate[]>()
for (const candidate of pendingCandidates) {
const assetHash = getBlake3AssetHash(candidate)
if (!assetHash) {
pendingNodeTypes.add(candidate.nodeType)
continue
const verification = await verifyCandidatesByAssetHash({
candidates: pendingCandidates,
getAssetHash: getBlake3AssetHash,
signal,
checkAssetHash,
onError: (err) => {
console.warn('[Missing Model Pipeline] Failed to verify asset hash:', err)
}
})
if (verification.aborted) return
const hashCandidates = candidatesByHash.get(assetHash)
if (hashCandidates) hashCandidates.push(candidate)
else candidatesByHash.set(assetHash, [candidate])
for (const candidate of verification.existing) {
candidate.isMissing = false
}
await Promise.all(
Array.from(candidatesByHash, async ([assetHash, hashCandidates]) => {
if (signal?.aborted) return
try {
const status = await checkAssetHash(assetHash, signal)
if (signal?.aborted) return
if (status === 'exists') {
for (const candidate of hashCandidates) {
candidate.isMissing = false
}
return
}
} catch (err) {
if (signal?.aborted || isAbortError(err)) return
console.warn(
'[Missing Model Pipeline] Failed to verify asset hash:',
err
)
}
for (const candidate of hashCandidates) {
pendingNodeTypes.add(candidate.nodeType)
}
})
const nodeTypesNeedingAssetRefresh = new Set(
[...verification.missing, ...verification.fallback].map(
(candidate) => candidate.nodeType
)
)
if (signal?.aborted) return
if (pendingNodeTypes.size === 0) return
if (nodeTypesNeedingAssetRefresh.size === 0) return
const store =
assetsStore ?? (await import('@/stores/assetsStore')).useAssetsStore()
const failedNodeTypes = new Set<string>()
await Promise.allSettled(
[...pendingNodeTypes].map(async (nodeType) => {
[...nodeTypesNeedingAssetRefresh].map(async (nodeType) => {
if (signal?.aborted) return
try {
await store.updateModelsForNodeType(nodeType)
@@ -549,15 +523,6 @@ function getBlake3AssetHash(candidate: MissingModelCandidate): string | null {
return toBlake3AssetHash(candidate.hash)
}
function isAbortError(err: unknown): boolean {
return (
typeof err === 'object' &&
err !== null &&
'name' in err &&
err.name === 'AbortError'
)
}
function normalizePath(path: string): string {
return path.replace(/\\/g, '/')
}

View File

@@ -6,6 +6,7 @@ import { t } from '@/i18n'
import { useCanvasStore } from '@/renderer/core/canvas/canvasStore'
import { app } from '@/scripts/app'
import { useToastStore } from '@/platform/updates/common/toastStore'
import { isAbortError } from '@/utils/typeGuardUtil'
import type { MissingModelCandidate } from '@/platform/missingModel/types'
import type { AssetMetadata } from '@/platform/assets/schemas/assetSchema'
import type { LGraphNode } from '@/lib/litegraph/src/litegraph'
@@ -273,10 +274,6 @@ export const useMissingModelStore = defineStore('missingModel', () => {
fileSizes.value = {}
}
function isAbortError(error: unknown) {
return error instanceof Error && error.name === 'AbortError'
}
async function refreshMissingModels() {
if (isRefreshingMissingModels.value) return