Compare commits

...

6 Commits

Author SHA1 Message Date
jaeone94
f661280562 fix: harden asset hash verification abort handling 2026-05-04 22:39:34 +09:00
jaeone94
1fc6b1ace8 refactor: extract asset hash verification 2026-05-04 22:16:27 +09:00
jaeone94
0caf1a01d1 refactor: name asset filtering steps 2026-05-04 21:51:31 +09:00
jaeone94
8c760bc13c refactor: clarify asset response schema reuse 2026-05-04 21:19:08 +09:00
jaeone94
8f7650542d refactor: tighten asset response contract tests 2026-05-04 21:00:38 +09:00
jaeone94
5f43461623 refactor: align asset pagination schema 2026-05-04 17:42:37 +09:00
10 changed files with 500 additions and 247 deletions

View File

@@ -1,3 +1,4 @@
import { zListAssetsResponse } from '@comfyorg/ingest-types/zod'
import { z } from 'zod'
// Zod schemas for asset API validation matching ComfyUI Assets REST API spec
@@ -20,11 +21,11 @@ const zAsset = z.object({
user_metadata: z.record(z.unknown()).optional() // API allows arbitrary key-value pairs
})
const zAssetResponse = z.object({
assets: z.array(zAsset).optional(),
total: z.number().optional(),
has_more: z.boolean().optional()
})
const zAssetResponse = zListAssetsResponse
.pick({ total: true, has_more: true })
.extend({
assets: z.array(zAsset)
})
const zModelFolder = z.object({
name: z.string(),

View File

@@ -64,6 +64,16 @@ function buildResponse(
} as unknown as Response
}
function buildAssetListResponse(
assets: AssetItem[],
{
hasMore = false,
total = assets.length
}: { hasMore?: boolean; total?: number } = {}
): Response {
return buildResponse({ assets, total, has_more: hasMore })
}
function validAsset(overrides: Partial<AssetItem> = {}): AssetItem {
return {
id: 'asset-1',
@@ -218,7 +228,7 @@ describe(assetService.uploadAssetFromUrl, () => {
const staleAssets = [validAsset({ id: 'stale-input', tags: ['input'] })]
const consoleSpy = vi.spyOn(console, 'error').mockImplementation(() => {})
fetchApiMock
.mockResolvedValueOnce(buildResponse({ assets: staleAssets }))
.mockResolvedValueOnce(buildAssetListResponse(staleAssets))
.mockResolvedValueOnce(buildResponse({ id: 'missing-name' }))
await assetService.getInputAssetsIncludingPublic()
@@ -240,7 +250,7 @@ describe(assetService.uploadAssetFromUrl, () => {
const staleAssets = [validAsset({ id: 'stale-input', tags: ['input'] })]
const consoleSpy = vi.spyOn(console, 'error').mockImplementation(() => {})
fetchApiMock
.mockResolvedValueOnce(buildResponse({ assets: staleAssets }))
.mockResolvedValueOnce(buildAssetListResponse(staleAssets))
.mockResolvedValueOnce(
buildResponse(validAsset({ id: 'uploaded-input', tags: ['input'] }))
)
@@ -301,7 +311,7 @@ describe(assetService.uploadAssetFromBase64, () => {
.spyOn(globalThis, 'fetch')
.mockResolvedValueOnce(new Response('hello'))
fetchApiMock
.mockResolvedValueOnce(buildResponse({ assets: staleAssets }))
.mockResolvedValueOnce(buildAssetListResponse(staleAssets))
.mockResolvedValueOnce(buildResponse({ id: 'missing-name' }))
await assetService.getInputAssetsIncludingPublic()
@@ -327,7 +337,7 @@ describe(assetService.uploadAssetFromBase64, () => {
.spyOn(globalThis, 'fetch')
.mockResolvedValueOnce(new Response('hello'))
fetchApiMock
.mockResolvedValueOnce(buildResponse({ assets: staleAssets }))
.mockResolvedValueOnce(buildAssetListResponse(staleAssets))
.mockResolvedValueOnce(
buildResponse({
...validAsset({ id: 'uploaded-input', tags: ['input'] }),
@@ -423,15 +433,13 @@ describe(assetService.getAssetModelFolders, () => {
it('filters out missing-tagged assets and blacklisted directories, returning alphabetical unique folders without include_public', async () => {
fetchApiMock.mockResolvedValueOnce(
buildResponse({
assets: [
validAsset({ id: 'a', tags: ['models', 'loras'] }),
validAsset({ id: 'b', tags: ['models', 'checkpoints'] }),
validAsset({ id: 'c', tags: ['models', 'configs'] }),
validAsset({ id: 'd', tags: ['models', 'missing', 'controlnet'] }),
validAsset({ id: 'e', tags: ['models', 'loras'] })
]
})
buildAssetListResponse([
validAsset({ id: 'a', tags: ['models', 'loras'] }),
validAsset({ id: 'b', tags: ['models', 'checkpoints'] }),
validAsset({ id: 'c', tags: ['models', 'configs'] }),
validAsset({ id: 'd', tags: ['models', 'missing', 'controlnet'] }),
validAsset({ id: 'e', tags: ['models', 'loras'] })
])
)
const folders = await assetService.getAssetModelFolders()
@@ -492,12 +500,10 @@ describe(assetService.getAssetsByTag, () => {
it('forwards include_public=true by default and excludes missing-tagged assets', async () => {
fetchApiMock.mockResolvedValueOnce(
buildResponse({
assets: [
validAsset({ id: 'visible', tags: ['input'] }),
validAsset({ id: 'hidden', tags: ['input', 'missing'] })
]
})
buildAssetListResponse([
validAsset({ id: 'visible', tags: ['input'] }),
validAsset({ id: 'hidden', tags: ['input', 'missing'] })
])
)
const assets = await assetService.getAssetsByTag('input')
@@ -518,17 +524,16 @@ describe(assetService.getAllAssetsByTag, () => {
it('paginates tagged asset requests with include_public=true', async () => {
fetchApiMock
.mockResolvedValueOnce(
buildResponse({
assets: [
buildAssetListResponse(
[
validAsset({ id: 'a', tags: ['input'] }),
validAsset({ id: 'b', tags: ['input'] })
]
})
],
{ hasMore: true }
)
)
.mockResolvedValueOnce(
buildResponse({
assets: [validAsset({ id: 'c', tags: ['input'] })]
})
buildAssetListResponse([validAsset({ id: 'c', tags: ['input'] })])
)
const assets = await assetService.getAllAssetsByTag('input', true, {
@@ -553,17 +558,18 @@ describe(assetService.getAllAssetsByTag, () => {
it('paginates from raw response size before filtering missing-tagged assets', async () => {
fetchApiMock
.mockResolvedValueOnce(
buildResponse({
assets: [
buildAssetListResponse(
[
validAsset({ id: 'visible', tags: ['input'] }),
validAsset({ id: 'hidden', tags: ['input', MISSING_TAG] })
]
})
],
{ hasMore: true }
)
)
.mockResolvedValueOnce(
buildResponse({
assets: [validAsset({ id: 'later-public', tags: ['input'] })]
})
buildAssetListResponse([
validAsset({ id: 'later-public', tags: ['input'] })
])
)
const assets = await assetService.getAllAssetsByTag('input', true, {
@@ -584,19 +590,18 @@ describe(assetService.getAllAssetsByTag, () => {
it('honors has_more when walking tagged asset pages', async () => {
fetchApiMock
.mockResolvedValueOnce(
buildResponse({
assets: [
buildAssetListResponse(
[
validAsset({ id: 'first', tags: ['input'] }),
validAsset({ id: 'second', tags: ['input'] })
],
has_more: true
})
{ hasMore: true }
)
)
.mockResolvedValueOnce(
buildResponse({
assets: [validAsset({ id: 'later-public', tags: ['input'] })],
has_more: false
})
buildAssetListResponse([
validAsset({ id: 'later-public', tags: ['input'] })
])
)
const assets = await assetService.getAllAssetsByTag('input', true, {
@@ -614,12 +619,41 @@ describe(assetService.getAllAssetsByTag, () => {
expect(secondParams.get('offset')).toBe('2')
})
it.each([
{
name: 'missing has_more',
body: {
assets: [validAsset({ id: 'a', tags: ['input'] })],
total: 1
}
},
{
name: 'missing total',
body: {
assets: [validAsset({ id: 'a', tags: ['input'] })],
has_more: false
}
},
{
name: 'non-boolean has_more',
body: {
assets: [validAsset({ id: 'a', tags: ['input'] })],
total: 1,
has_more: 'false'
}
}
])('rejects asset responses with $name', async ({ body }) => {
fetchApiMock.mockResolvedValueOnce(buildResponse(body))
await expect(
assetService.getAllAssetsByTag('input', true, { limit: 2 })
).rejects.toThrow(/Invalid asset response/)
})
it('passes abort signals through paginated requests', async () => {
const controller = new AbortController()
fetchApiMock.mockResolvedValueOnce(
buildResponse({
assets: [validAsset({ id: 'a', tags: ['input'] })]
})
buildAssetListResponse([validAsset({ id: 'a', tags: ['input'] })])
)
await assetService.getAllAssetsByTag('input', true, {
@@ -636,12 +670,13 @@ describe(assetService.getAllAssetsByTag, () => {
const controller = new AbortController()
fetchApiMock.mockImplementationOnce(async () => {
controller.abort()
return buildResponse({
assets: [
return buildAssetListResponse(
[
validAsset({ id: 'a', tags: ['input'] }),
validAsset({ id: 'b', tags: ['input'] })
]
})
],
{ hasMore: true }
)
})
await expect(
@@ -666,7 +701,7 @@ describe(assetService.getInputAssetsIncludingPublic, () => {
validAsset({ id: 'user-input', tags: ['input'] }),
validAsset({ id: 'public-input', tags: ['input'], is_immutable: true })
]
fetchApiMock.mockResolvedValueOnce(buildResponse({ assets }))
fetchApiMock.mockResolvedValueOnce(buildAssetListResponse(assets))
const first = await assetService.getInputAssetsIncludingPublic()
const second = await assetService.getInputAssetsIncludingPublic()
@@ -685,8 +720,8 @@ describe(assetService.getInputAssetsIncludingPublic, () => {
const staleAssets = [validAsset({ id: 'stale-input', tags: ['input'] })]
const freshAssets = [validAsset({ id: 'fresh-input', tags: ['input'] })]
fetchApiMock
.mockResolvedValueOnce(buildResponse({ assets: staleAssets }))
.mockResolvedValueOnce(buildResponse({ assets: freshAssets }))
.mockResolvedValueOnce(buildAssetListResponse(staleAssets))
.mockResolvedValueOnce(buildAssetListResponse(freshAssets))
await assetService.getInputAssetsIncludingPublic()
assetService.invalidateInputAssetsIncludingPublic()
@@ -720,7 +755,7 @@ describe(assetService.getInputAssetsIncludingPublic, () => {
await expect(first).rejects.toMatchObject({ name: 'AbortError' })
expect(serviceSignal).toBeUndefined()
resolveResponse(buildResponse({ assets }))
resolveResponse(buildAssetListResponse(assets))
await expect(second).resolves.toEqual(assets)
expect(fetchApiMock).toHaveBeenCalledOnce()
@@ -750,7 +785,7 @@ describe(assetService.getInputAssetsIncludingPublic, () => {
await expect(first).rejects.toMatchObject({ name: 'AbortError' })
await expect(second).rejects.toMatchObject({ name: 'AbortError' })
resolveResponse(buildResponse({ assets }))
resolveResponse(buildAssetListResponse(assets))
await Promise.resolve()
await expect(assetService.getInputAssetsIncludingPublic()).resolves.toEqual(
@@ -770,12 +805,12 @@ describe(assetService.getInputAssetsIncludingPublic, () => {
resolveResponse = resolve
})
)
.mockResolvedValueOnce(buildResponse({ assets: freshAssets }))
.mockResolvedValueOnce(buildAssetListResponse(freshAssets))
const inFlight = assetService.getInputAssetsIncludingPublic()
assetService.invalidateInputAssetsIncludingPublic()
resolveResponse(buildResponse({ assets }))
resolveResponse(buildAssetListResponse(assets))
await expect(inFlight).resolves.toEqual(assets)
await expect(assetService.getInputAssetsIncludingPublic()).resolves.toEqual(
@@ -788,9 +823,9 @@ describe(assetService.getInputAssetsIncludingPublic, () => {
const staleAssets = [validAsset({ id: 'stale-input', tags: ['input'] })]
const freshAssets = [validAsset({ id: 'fresh-input', tags: ['input'] })]
fetchApiMock
.mockResolvedValueOnce(buildResponse({ assets: staleAssets }))
.mockResolvedValueOnce(buildAssetListResponse(staleAssets))
.mockResolvedValueOnce(buildResponse(null))
.mockResolvedValueOnce(buildResponse({ assets: freshAssets }))
.mockResolvedValueOnce(buildAssetListResponse(freshAssets))
await assetService.getInputAssetsIncludingPublic()
await assetService.deleteAsset('stale-input')
@@ -809,9 +844,9 @@ describe(assetService.getInputAssetsIncludingPublic, () => {
const uploadedAsset = validAsset({ id: 'uploaded-input', tags: ['input'] })
const freshAssets = [uploadedAsset]
fetchApiMock
.mockResolvedValueOnce(buildResponse({ assets: staleAssets }))
.mockResolvedValueOnce(buildAssetListResponse(staleAssets))
.mockResolvedValueOnce(buildResponse(uploadedAsset))
.mockResolvedValueOnce(buildResponse({ assets: freshAssets }))
.mockResolvedValueOnce(buildAssetListResponse(freshAssets))
await assetService.getInputAssetsIncludingPublic()
await assetService.uploadAssetAsync({
@@ -827,7 +862,7 @@ describe(assetService.getInputAssetsIncludingPublic, () => {
it('does not invalidate cached input assets for pending async input uploads', async () => {
const staleAssets = [validAsset({ id: 'stale-input', tags: ['input'] })]
fetchApiMock
.mockResolvedValueOnce(buildResponse({ assets: staleAssets }))
.mockResolvedValueOnce(buildAssetListResponse(staleAssets))
.mockResolvedValueOnce(
buildResponse(
{ task_id: 'task-1', status: 'running' },
@@ -849,7 +884,7 @@ describe(assetService.getInputAssetsIncludingPublic, () => {
it('does not invalidate cached input assets for non-input uploads', async () => {
const staleAssets = [validAsset({ id: 'stale-input', tags: ['input'] })]
fetchApiMock
.mockResolvedValueOnce(buildResponse({ assets: staleAssets }))
.mockResolvedValueOnce(buildAssetListResponse(staleAssets))
.mockResolvedValueOnce(buildResponse(validAsset({ tags: ['models'] })))
await assetService.getInputAssetsIncludingPublic()

View File

@@ -337,15 +337,13 @@ function createAssetService() {
// Blacklist directories we don't want to show
const blacklistedDirectories = new Set(['configs'])
// Extract directory names from assets that actually exist, exclude missing assets
const discoveredFolders = new Set<string>(
data?.assets
?.filter((asset) => !asset.tags.includes(MISSING_TAG))
?.flatMap((asset) => asset.tags)
?.filter(
(tag) => tag !== MODELS_TAG && !blacklistedDirectories.has(tag)
) ?? []
const existingAssets = data.assets.filter(
(asset) => !asset.tags.includes(MISSING_TAG)
)
const folderTags = existingAssets
.flatMap((asset) => asset.tags)
.filter((tag) => tag !== MODELS_TAG && !blacklistedDirectories.has(tag))
const discoveredFolders = new Set<string>(folderTags)
// Return only discovered folders in alphabetical order
const sortedFolders = Array.from(discoveredFolders).toSorted()
@@ -363,17 +361,15 @@ function createAssetService() {
`models for ${folder}`
)
return (
data?.assets
?.filter(
(asset) =>
!asset.tags.includes(MISSING_TAG) && asset.tags.includes(folder)
)
?.map((asset) => ({
name: asset.name,
pathIndex: 0
})) ?? []
const modelsInFolder = data.assets.filter(
(asset) =>
!asset.tags.includes(MISSING_TAG) && asset.tags.includes(folder)
)
return modelsInFolder.map((asset) => ({
name: asset.name,
pathIndex: 0
}))
}
/**
@@ -449,11 +445,9 @@ function createAssetService() {
)
// Return full AssetItem[] objects (don't strip like getAssetModels does)
return (
data?.assets?.filter(
(asset) =>
!asset.tags.includes(MISSING_TAG) && asset.tags.includes(category)
) ?? []
return data.assets.filter(
(asset) =>
!asset.tags.includes(MISSING_TAG) && asset.tags.includes(category)
)
}
@@ -473,11 +467,8 @@ function createAssetService() {
}
const data = await res.json()
// Validate the single asset response against our schema
const result = assetResponseSchema.safeParse({ assets: [data] })
if (result.success && result.data.assets?.[0]) {
return result.data.assets[0]
}
const result = assetItemSchema.safeParse(data)
if (result.success) return result.data
const error = result.error
? fromZodError(result.error)
@@ -508,13 +499,12 @@ function createAssetService() {
`assets for tag ${tag}`
)
return (
data?.assets?.filter((asset) => !asset.tags.includes(MISSING_TAG)) ?? []
)
return data.assets.filter((asset) => !asset.tags.includes(MISSING_TAG))
}
/**
* Gets every asset for a tag by walking paginated asset API responses.
* Pagination follows the required server-provided `has_more` flag.
*
* @param tag - The tag to filter by (e.g., 'models', 'input')
* @param includePublic - Whether to include public assets (default: true)
@@ -545,13 +535,10 @@ function createAssetService() {
},
`assets for tag ${tag}`
)
const batch = data.assets ?? []
const batch = data.assets
assets.push(...batch.filter((asset) => !asset.tags.includes(MISSING_TAG)))
const noMoreFromServer = data.has_more === false
const inferredLastPage =
data.has_more === undefined && batch.length < pageSize
if (batch.length === 0 || noMoreFromServer || inferredLastPage) {
if (batch.length === 0 || !data.has_more) {
return assets
}

View File

@@ -0,0 +1,211 @@
import { describe, expect, it, vi } from 'vitest'
import type { AssetHashStatus } from '@/platform/assets/services/assetService'
import { verifyCandidatesByAssetHash } from './assetHashVerification'
interface Candidate {
id: string
hash: string | null
}
interface Deferred<T> {
promise: Promise<T>
resolve: (value: T) => void
reject: (reason?: unknown) => void
}
function candidate(id: string, hash: string | null): Candidate {
return { id, hash }
}
function createDeferred<T>(): Deferred<T> {
let resolve!: (value: T) => void
let reject!: (reason?: unknown) => void
const promise = new Promise<T>((resolvePromise, rejectPromise) => {
resolve = resolvePromise
reject = rejectPromise
})
return { promise, resolve, reject }
}
describe(verifyCandidatesByAssetHash, () => {
it('deduplicates hash checks and groups existing and missing candidates', async () => {
const existingHash = 'blake3:existing'
const missingHash = 'blake3:missing'
const candidates = [
candidate('a', existingHash),
candidate('b', existingHash),
candidate('c', missingHash)
]
const checkAssetHash = vi.fn(async (hash: string) =>
hash === existingHash ? ('exists' as const) : ('missing' as const)
)
const result = await verifyCandidatesByAssetHash({
candidates,
getAssetHash: (candidate) => candidate.hash,
checkAssetHash
})
expect(result.aborted).toBe(false)
expect(result.existing.map((candidate) => candidate.id)).toEqual(['a', 'b'])
expect(result.missing.map((candidate) => candidate.id)).toEqual(['c'])
expect(result.fallback).toEqual([])
expect(checkAssetHash).toHaveBeenCalledTimes(2)
})
it('routes candidates without hashes and invalid hashes to fallback', async () => {
const invalidHash = 'blake3:invalid'
const candidates = [candidate('a', null), candidate('b', invalidHash)]
const checkAssetHash = vi.fn(async () => 'invalid' as const)
const result = await verifyCandidatesByAssetHash({
candidates,
getAssetHash: (candidate) => candidate.hash,
checkAssetHash
})
expect(result.existing).toEqual([])
expect(result.missing).toEqual([])
expect(result.fallback.map((candidate) => candidate.id)).toEqual(['a', 'b'])
expect(checkAssetHash).toHaveBeenCalledOnce()
})
it('routes non-abort verification failures to fallback', async () => {
const candidates = [candidate('a', 'blake3:network-failure')]
const error = new Error('network failed')
const onError = vi.fn()
const result = await verifyCandidatesByAssetHash({
candidates,
getAssetHash: (candidate) => candidate.hash,
checkAssetHash: async () => {
throw error
},
onError
})
expect(result.fallback).toEqual(candidates)
expect(result.aborted).toBe(false)
expect(onError).toHaveBeenCalledWith(error)
})
it('returns aborted without resolving candidates when the signal is aborted', async () => {
const controller = new AbortController()
controller.abort()
const candidates = [candidate('a', 'blake3:aborted')]
const checkAssetHash = vi.fn(async () => 'exists' as const)
const result = await verifyCandidatesByAssetHash({
candidates,
getAssetHash: (candidate) => candidate.hash,
checkAssetHash,
signal: controller.signal
})
expect(result).toEqual({
existing: [],
missing: [],
fallback: [],
aborted: true
})
expect(checkAssetHash).not.toHaveBeenCalled()
})
it('silences abort errors from hash verification', async () => {
const candidates = [candidate('a', 'blake3:aborted')]
const onError = vi.fn()
const result = await verifyCandidatesByAssetHash({
candidates,
getAssetHash: (candidate) => candidate.hash,
checkAssetHash: async () => {
throw new DOMException('aborted', 'AbortError')
},
onError
})
expect(result.aborted).toBe(true)
expect(result.existing).toEqual([])
expect(result.missing).toEqual([])
expect(result.fallback).toEqual([])
expect(onError).not.toHaveBeenCalled()
})
it('caps concurrent hash checks', async () => {
let activeChecks = 0
let maxActiveChecks = 0
let completedChecks = 0
const inFlight: Array<Deferred<AssetHashStatus>> = []
const candidates = Array.from({ length: 6 }, (_, index) =>
candidate(String(index), `blake3:${index}`)
)
const verificationPromise = verifyCandidatesByAssetHash({
candidates,
getAssetHash: (candidate) => candidate.hash,
maxConcurrent: 2,
checkAssetHash: async () => {
activeChecks++
maxActiveChecks = Math.max(maxActiveChecks, activeChecks)
const deferred = createDeferred<AssetHashStatus>()
inFlight.push(deferred)
try {
return await deferred.promise
} finally {
activeChecks--
completedChecks++
}
}
})
while (completedChecks < candidates.length) {
const currentChecks = inFlight.splice(0)
expect(currentChecks.length).toBeLessThanOrEqual(2)
for (const check of currentChecks) {
check.resolve('missing')
}
await Promise.resolve()
}
await verificationPromise
expect(maxActiveChecks).toBeLessThanOrEqual(2)
})
it('does not report fallback failures after another worker aborts', async () => {
const abortHash = 'blake3:abort'
const errorHash = 'blake3:error'
const requests = new Map<string, Deferred<AssetHashStatus>>()
const onError = vi.fn()
const verificationPromise = verifyCandidatesByAssetHash({
candidates: [candidate('a', abortHash), candidate('b', errorHash)],
getAssetHash: (candidate) => candidate.hash,
maxConcurrent: 2,
checkAssetHash: (hash) => {
const deferred = createDeferred<AssetHashStatus>()
requests.set(hash, deferred)
return deferred.promise
},
onError
})
expect(requests.size).toBe(2)
requests.get(abortHash)?.reject(new DOMException('aborted', 'AbortError'))
await Promise.resolve()
requests.get(errorHash)?.reject(new Error('network failed'))
const result = await verificationPromise
expect(result).toEqual({
existing: [],
missing: [],
fallback: [],
aborted: true
})
expect(onError).not.toHaveBeenCalled()
})
})

View File

@@ -0,0 +1,119 @@
import type { AssetHashStatus } from '@/platform/assets/services/assetService'
import { isAbortError } from '@/utils/typeGuardUtil'
/**
* Low-level hash checker. Cancellation should reject with the same
* DOMException AbortError shape as `fetch`.
*/
export type AssetHashVerifier = (
assetHash: string,
signal?: AbortSignal
) => Promise<AssetHashStatus>
interface AssetHashVerificationResult<T> {
existing: T[]
missing: T[]
fallback: T[]
aborted: boolean
}
interface VerifyCandidatesByAssetHashOptions<T> {
candidates: readonly T[]
getAssetHash: (candidate: T) => string | null
checkAssetHash: AssetHashVerifier
signal?: AbortSignal
maxConcurrent?: number
onError?: (error: unknown) => void
}
const DEFAULT_MAX_CONCURRENT_HASH_CHECKS = 12
/**
* Deduplicates asset hash checks and partitions candidates for scanner policy.
* `fallback` candidates should continue through the caller's legacy lookup path.
*/
export async function verifyCandidatesByAssetHash<T>({
candidates,
getAssetHash,
checkAssetHash,
signal,
maxConcurrent = DEFAULT_MAX_CONCURRENT_HASH_CHECKS,
onError
}: VerifyCandidatesByAssetHashOptions<T>): Promise<
AssetHashVerificationResult<T>
> {
const result: AssetHashVerificationResult<T> = {
existing: [],
missing: [],
fallback: [],
aborted: false
}
if (signal?.aborted) return { ...result, aborted: true }
const candidatesByHash = new Map<string, T[]>()
for (const candidate of candidates) {
const assetHash = getAssetHash(candidate)
if (!assetHash) {
result.fallback.push(candidate)
continue
}
const hashCandidates = candidatesByHash.get(assetHash)
if (hashCandidates) hashCandidates.push(candidate)
else candidatesByHash.set(assetHash, [candidate])
}
const entries = [...candidatesByHash.entries()]
let nextIndex = 0
const requestedWorkerCount = Math.max(1, Math.floor(maxConcurrent))
const workerCount = Math.min(entries.length, requestedWorkerCount)
function hasAborted(): boolean {
return result.aborted || signal?.aborted === true
}
async function verifyNextHash(): Promise<void> {
while (!hasAborted() && nextIndex < entries.length) {
const entry = entries[nextIndex++]
if (!entry) return
const [assetHash, hashCandidates] = entry
if (hasAborted()) {
result.aborted = true
return
}
let status: AssetHashStatus
try {
status = await checkAssetHash(assetHash, signal)
} catch (error) {
if (hasAborted() || isAbortError(error)) {
result.aborted = true
return
}
onError?.(error)
result.fallback.push(...hashCandidates)
continue
}
if (hasAborted()) {
result.aborted = true
return
}
if (status === 'exists') {
result.existing.push(...hashCandidates)
} else if (status === 'missing') {
result.missing.push(...hashCandidates)
} else {
result.fallback.push(...hashCandidates)
}
}
}
await Promise.all(Array.from({ length: workerCount }, verifyNextHash))
return result
}

View File

@@ -403,8 +403,7 @@ describe('verifyCloudMediaCandidates', () => {
})
it('silences aborts while loading legacy fallback input assets', async () => {
const abortError = new Error('aborted')
abortError.name = 'AbortError'
const abortError = new DOMException('aborted', 'AbortError')
const controller = new AbortController()
const candidates = [
makeCandidate('1', 'photo.png', { isMissing: undefined })
@@ -427,8 +426,7 @@ describe('verifyCloudMediaCandidates', () => {
})
it('silences aborts from the default legacy fallback input asset store path', async () => {
const abortError = new Error('aborted')
abortError.name = 'AbortError'
const abortError = new DOMException('aborted', 'AbortError')
const controller = new AbortController()
const candidates = [
makeCandidate('1', 'photo.png', { isMissing: undefined })

View File

@@ -19,11 +19,13 @@ import {
import { LGraphEventMode } from '@/lib/litegraph/src/types/globalEnums'
import { resolveComboValues } from '@/utils/litegraphUtil'
import type { AssetItem } from '@/platform/assets/schemas/assetSchema'
import type { AssetHashStatus } from '@/platform/assets/services/assetService'
import {
assetService,
isBlake3AssetHash
} from '@/platform/assets/services/assetService'
import { verifyCandidatesByAssetHash } from '@/platform/assets/utils/assetHashVerification'
import type { AssetHashVerifier } from '@/platform/assets/utils/assetHashVerification'
import { isAbortError } from '@/utils/typeGuardUtil'
/** Map of node types to their media widget name and media type. */
const MEDIA_NODE_WIDGETS: Record<
@@ -112,70 +114,8 @@ export function scanNodeMediaCandidates(
return candidates
}
type AssetHashVerifier = (
assetHash: string,
signal?: AbortSignal
) => Promise<AssetHashStatus>
type InputAssetFetcher = (signal?: AbortSignal) => Promise<AssetItem[]>
function groupCandidatesForHashLookup(candidates: MissingMediaCandidate[]): {
candidatesByHash: Map<string, MissingMediaCandidate[]>
legacyCandidates: MissingMediaCandidate[]
} {
const candidatesByHash = new Map<string, MissingMediaCandidate[]>()
const legacyCandidates: MissingMediaCandidate[] = []
for (const candidate of candidates) {
if (!isBlake3AssetHash(candidate.name)) {
legacyCandidates.push(candidate)
continue
}
const hashCandidates = candidatesByHash.get(candidate.name)
if (hashCandidates) hashCandidates.push(candidate)
else candidatesByHash.set(candidate.name, [candidate])
}
return { candidatesByHash, legacyCandidates }
}
async function verifyCandidatesByHash(
candidatesByHash: Map<string, MissingMediaCandidate[]>,
legacyCandidates: MissingMediaCandidate[],
signal: AbortSignal | undefined,
checkAssetHash: AssetHashVerifier
): Promise<void> {
await Promise.all(
Array.from(candidatesByHash, async ([assetHash, hashCandidates]) => {
if (signal?.aborted) return
let status: AssetHashStatus
try {
status = await checkAssetHash(assetHash, signal)
if (signal?.aborted) return
} catch (err) {
if (signal?.aborted || isAbortError(err)) return
console.warn(
'[Missing Media Pipeline] Failed to verify asset hash:',
err
)
legacyCandidates.push(...hashCandidates)
return
}
if (status === 'invalid') {
legacyCandidates.push(...hashCandidates)
return
}
for (const candidate of hashCandidates) {
candidate.isMissing = status === 'missing'
}
})
)
}
/**
* Verify cloud media candidates by probing the asset hash endpoint first.
* Invalid hash values fall back to the legacy input asset list check.
@@ -191,16 +131,26 @@ export async function verifyCloudMediaCandidates(
const pending = candidates.filter((c) => c.isMissing === undefined)
if (pending.length === 0) return
const { candidatesByHash, legacyCandidates } =
groupCandidatesForHashLookup(pending)
await verifyCandidatesByHash(
candidatesByHash,
legacyCandidates,
const verification = await verifyCandidatesByAssetHash({
candidates: pending,
getAssetHash: (candidate) =>
isBlake3AssetHash(candidate.name) ? candidate.name : null,
signal,
checkAssetHash
)
checkAssetHash,
onError: (err) => {
console.warn('[Missing Media Pipeline] Failed to verify asset hash:', err)
}
})
if (verification.aborted) return
if (signal?.aborted || legacyCandidates.length === 0) return
for (const candidate of verification.existing) {
candidate.isMissing = false
}
for (const candidate of verification.missing) {
candidate.isMissing = true
}
if (signal?.aborted || verification.fallback.length === 0) return
let inputAssets: AssetItem[]
try {
@@ -216,7 +166,7 @@ export async function verifyCloudMediaCandidates(
inputAssets.map((a) => a.asset_hash).filter((h): h is string => !!h)
)
for (const candidate of legacyCandidates) {
for (const candidate of verification.fallback) {
candidate.isMissing = !assetHashes.has(candidate.name)
}
}
@@ -227,15 +177,6 @@ async function fetchMissingInputAssets(
return await assetService.getInputAssetsIncludingPublic(signal)
}
function isAbortError(err: unknown): boolean {
return (
typeof err === 'object' &&
err !== null &&
'name' in err &&
err.name === 'AbortError'
)
}
/** Group confirmed-missing candidates by file name into view models. */
export function groupCandidatesByName(
candidates: MissingMediaCandidate[]

View File

@@ -1557,8 +1557,7 @@ describe('verifyAssetSupportedCandidates', () => {
it('should not warn or fall back when hash verification is aborted', async () => {
const warn = vi.spyOn(console, 'warn').mockImplementation(() => {})
const abortError = new Error('aborted')
abortError.name = 'AbortError'
const abortError = new DOMException('aborted', 'AbortError')
const hash =
'4444444444444444444444444444444444444444444444444444444444444444'
const candidates = [

View File

@@ -24,11 +24,12 @@ import {
} from '@/utils/graphTraversalUtil'
import { LGraphEventMode } from '@/lib/litegraph/src/types/globalEnums'
import { resolveComboValues } from '@/utils/litegraphUtil'
import type { AssetHashStatus } from '@/platform/assets/services/assetService'
import {
assetService,
toBlake3AssetHash
} from '@/platform/assets/services/assetService'
import { verifyCandidatesByAssetHash } from '@/platform/assets/utils/assetHashVerification'
import type { AssetHashVerifier } from '@/platform/assets/utils/assetHashVerification'
export type MissingModelWorkflowData = FlattenableWorkflowGraph & {
models?: ModelFile[]
@@ -450,11 +451,6 @@ interface AssetVerifier {
getAssets: (nodeType: string) => AssetItem[] | undefined
}
type AssetHashVerifier = (
assetHash: string,
signal?: AbortSignal
) => Promise<AssetHashStatus>
export async function verifyAssetSupportedCandidates(
candidates: MissingModelCandidate[],
signal?: AbortSignal,
@@ -468,58 +464,36 @@ export async function verifyAssetSupportedCandidates(
)
if (pendingCandidates.length === 0) return
const pendingNodeTypes = new Set<string>()
const candidatesByHash = new Map<string, MissingModelCandidate[]>()
for (const candidate of pendingCandidates) {
const assetHash = getBlake3AssetHash(candidate)
if (!assetHash) {
pendingNodeTypes.add(candidate.nodeType)
continue
const verification = await verifyCandidatesByAssetHash({
candidates: pendingCandidates,
getAssetHash: getBlake3AssetHash,
signal,
checkAssetHash,
onError: (err) => {
console.warn('[Missing Model Pipeline] Failed to verify asset hash:', err)
}
})
if (verification.aborted) return
const hashCandidates = candidatesByHash.get(assetHash)
if (hashCandidates) hashCandidates.push(candidate)
else candidatesByHash.set(assetHash, [candidate])
for (const candidate of verification.existing) {
candidate.isMissing = false
}
await Promise.all(
Array.from(candidatesByHash, async ([assetHash, hashCandidates]) => {
if (signal?.aborted) return
try {
const status = await checkAssetHash(assetHash, signal)
if (signal?.aborted) return
if (status === 'exists') {
for (const candidate of hashCandidates) {
candidate.isMissing = false
}
return
}
} catch (err) {
if (signal?.aborted || isAbortError(err)) return
console.warn(
'[Missing Model Pipeline] Failed to verify asset hash:',
err
)
}
for (const candidate of hashCandidates) {
pendingNodeTypes.add(candidate.nodeType)
}
})
const nodeTypesNeedingAssetRefresh = new Set(
[...verification.missing, ...verification.fallback].map(
(candidate) => candidate.nodeType
)
)
if (signal?.aborted) return
if (pendingNodeTypes.size === 0) return
if (nodeTypesNeedingAssetRefresh.size === 0) return
const store =
assetsStore ?? (await import('@/stores/assetsStore')).useAssetsStore()
const failedNodeTypes = new Set<string>()
await Promise.allSettled(
[...pendingNodeTypes].map(async (nodeType) => {
[...nodeTypesNeedingAssetRefresh].map(async (nodeType) => {
if (signal?.aborted) return
try {
await store.updateModelsForNodeType(nodeType)
@@ -549,15 +523,6 @@ function getBlake3AssetHash(candidate: MissingModelCandidate): string | null {
return toBlake3AssetHash(candidate.hash)
}
function isAbortError(err: unknown): boolean {
return (
typeof err === 'object' &&
err !== null &&
'name' in err &&
err.name === 'AbortError'
)
}
function normalizePath(path: string): string {
return path.replace(/\\/g, '/')
}

View File

@@ -6,6 +6,7 @@ import { t } from '@/i18n'
import { useCanvasStore } from '@/renderer/core/canvas/canvasStore'
import { app } from '@/scripts/app'
import { useToastStore } from '@/platform/updates/common/toastStore'
import { isAbortError } from '@/utils/typeGuardUtil'
import type { MissingModelCandidate } from '@/platform/missingModel/types'
import type { AssetMetadata } from '@/platform/assets/schemas/assetSchema'
import type { LGraphNode } from '@/lib/litegraph/src/litegraph'
@@ -273,10 +274,6 @@ export const useMissingModelStore = defineStore('missingModel', () => {
fileSizes.value = {}
}
function isAbortError(error: unknown) {
return error instanceof Error && error.name === 'AbortError'
}
async function refreshMissingModels() {
if (isRefreshingMissingModels.value) return