mirror of
https://github.com/Comfy-Org/ComfyUI_frontend.git
synced 2026-05-21 21:09:00 +00:00
Compare commits
2 Commits
worktree-s
...
refactor/s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
abb892b459 | ||
|
|
c89da7ed35 |
@@ -185,6 +185,12 @@ export const MISSING_TAG = 'missing'
|
||||
/** Result of a HEAD lookup against an exact asset hash. */
|
||||
export type AssetHashStatus = 'exists' | 'missing' | 'invalid'
|
||||
|
||||
/** Verifier signature shared by callers that probe the hash endpoint. */
|
||||
export type AssetHashVerifier = (
|
||||
assetHash: string,
|
||||
signal?: AbortSignal
|
||||
) => Promise<AssetHashStatus>
|
||||
|
||||
const BLAKE3_ASSET_HASH_PATTERN = /^blake3:[0-9a-f]{64}$/i
|
||||
const BLAKE3_HEX_PATTERN = /^[0-9a-f]{64}$/i
|
||||
const uploadedAssetResponseSchema = assetItemSchema.extend({
|
||||
@@ -202,24 +208,16 @@ export function toBlake3AssetHash(hash: string | undefined): string | null {
|
||||
return `blake3:${hash}`
|
||||
}
|
||||
|
||||
function createAbortError(): DOMException {
|
||||
return new DOMException('Aborted', 'AbortError')
|
||||
}
|
||||
|
||||
function throwIfAborted(signal?: AbortSignal): void {
|
||||
if (signal?.aborted) throw createAbortError()
|
||||
}
|
||||
|
||||
async function withCallerAbort<T>(
|
||||
promise: Promise<T>,
|
||||
signal?: AbortSignal
|
||||
): Promise<T> {
|
||||
throwIfAborted(signal)
|
||||
signal?.throwIfAborted()
|
||||
if (!signal) return await promise
|
||||
|
||||
let removeAbortListener = () => {}
|
||||
const abortPromise = new Promise<never>((_, reject) => {
|
||||
const onAbort = () => reject(createAbortError())
|
||||
const onAbort = () => reject(signal.reason)
|
||||
signal.addEventListener('abort', onAbort, { once: true })
|
||||
removeAbortListener = () => signal.removeEventListener('abort', onAbort)
|
||||
})
|
||||
@@ -533,7 +531,7 @@ function createAssetService() {
|
||||
let offset = 0
|
||||
|
||||
while (true) {
|
||||
if (signal?.aborted) throw createAbortError()
|
||||
signal?.throwIfAborted()
|
||||
|
||||
const data = await handleAssetRequest(
|
||||
{
|
||||
@@ -589,7 +587,7 @@ function createAssetService() {
|
||||
async function getInputAssetsIncludingPublic(
|
||||
signal?: AbortSignal
|
||||
): Promise<AssetItem[]> {
|
||||
throwIfAborted(signal)
|
||||
signal?.throwIfAborted()
|
||||
if (inputAssetsIncludingPublic) return inputAssetsIncludingPublic
|
||||
|
||||
const request =
|
||||
|
||||
47
src/platform/assets/utils/verifyCandidatesByAssetHash.ts
Normal file
47
src/platform/assets/utils/verifyCandidatesByAssetHash.ts
Normal file
@@ -0,0 +1,47 @@
|
||||
import type {
|
||||
AssetHashStatus,
|
||||
AssetHashVerifier
|
||||
} from '@/platform/assets/services/assetService'
|
||||
import { isAbortError } from '@/utils/typeGuardUtil'
|
||||
|
||||
/**
|
||||
* Returns true when the status fully resolves the group; false leaves the group
|
||||
* for the caller's fallback path.
|
||||
*/
|
||||
type ApplyAssetHashStatus<T> = (status: AssetHashStatus, group: T[]) => boolean
|
||||
|
||||
/**
|
||||
* Verifies grouped candidates against the asset hash endpoint in parallel.
|
||||
*
|
||||
* For each `[hash, group]` entry, calls `checkAssetHash(hash)` and hands the
|
||||
* resulting status to `applyStatus`. Groups whose status is unresolved or
|
||||
* whose request fails (non-abort) are returned so the caller can run a
|
||||
* fallback path. Aborts and abort-propagated rejections are silent.
|
||||
*/
|
||||
export async function verifyCandidatesByAssetHash<T>(
|
||||
candidatesByHash: Record<string, T[]>,
|
||||
signal: AbortSignal | undefined,
|
||||
checkAssetHash: AssetHashVerifier,
|
||||
applyStatus: ApplyAssetHashStatus<T>,
|
||||
logTag: string
|
||||
): Promise<T[][]> {
|
||||
const unresolved: T[][] = []
|
||||
|
||||
await Promise.all(
|
||||
Object.entries(candidatesByHash).map(async ([assetHash, group]) => {
|
||||
if (signal?.aborted) return
|
||||
|
||||
try {
|
||||
const status = await checkAssetHash(assetHash, signal)
|
||||
if (signal?.aborted) return
|
||||
if (!applyStatus(status, group)) unresolved.push(group)
|
||||
} catch (err) {
|
||||
if (signal?.aborted || isAbortError(err)) return
|
||||
console.warn(`${logTag} Failed to verify asset hash:`, err)
|
||||
unresolved.push(group)
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
return unresolved
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
import { groupBy } from 'es-toolkit'
|
||||
import { groupBy, partition } from 'es-toolkit'
|
||||
import type { NodeId } from '@/platform/workflow/validation/schemas/workflowSchema'
|
||||
import type {
|
||||
MissingMediaCandidate,
|
||||
@@ -19,11 +19,13 @@ import {
|
||||
import { LGraphEventMode } from '@/lib/litegraph/src/types/globalEnums'
|
||||
import { resolveComboValues } from '@/utils/litegraphUtil'
|
||||
import type { AssetItem } from '@/platform/assets/schemas/assetSchema'
|
||||
import type { AssetHashStatus } from '@/platform/assets/services/assetService'
|
||||
import type { AssetHashVerifier } from '@/platform/assets/services/assetService'
|
||||
import {
|
||||
assetService,
|
||||
isBlake3AssetHash
|
||||
} from '@/platform/assets/services/assetService'
|
||||
import { verifyCandidatesByAssetHash } from '@/platform/assets/utils/verifyCandidatesByAssetHash'
|
||||
import { isAbortError } from '@/utils/typeGuardUtil'
|
||||
|
||||
/** Map of node types to their media widget name and media type. */
|
||||
const MEDIA_NODE_WIDGETS: Record<
|
||||
@@ -112,70 +114,8 @@ export function scanNodeMediaCandidates(
|
||||
return candidates
|
||||
}
|
||||
|
||||
type AssetHashVerifier = (
|
||||
assetHash: string,
|
||||
signal?: AbortSignal
|
||||
) => Promise<AssetHashStatus>
|
||||
|
||||
type InputAssetFetcher = (signal?: AbortSignal) => Promise<AssetItem[]>
|
||||
|
||||
function groupCandidatesForHashLookup(candidates: MissingMediaCandidate[]): {
|
||||
candidatesByHash: Map<string, MissingMediaCandidate[]>
|
||||
legacyCandidates: MissingMediaCandidate[]
|
||||
} {
|
||||
const candidatesByHash = new Map<string, MissingMediaCandidate[]>()
|
||||
const legacyCandidates: MissingMediaCandidate[] = []
|
||||
|
||||
for (const candidate of candidates) {
|
||||
if (!isBlake3AssetHash(candidate.name)) {
|
||||
legacyCandidates.push(candidate)
|
||||
continue
|
||||
}
|
||||
|
||||
const hashCandidates = candidatesByHash.get(candidate.name)
|
||||
if (hashCandidates) hashCandidates.push(candidate)
|
||||
else candidatesByHash.set(candidate.name, [candidate])
|
||||
}
|
||||
|
||||
return { candidatesByHash, legacyCandidates }
|
||||
}
|
||||
|
||||
async function verifyCandidatesByHash(
|
||||
candidatesByHash: Map<string, MissingMediaCandidate[]>,
|
||||
legacyCandidates: MissingMediaCandidate[],
|
||||
signal: AbortSignal | undefined,
|
||||
checkAssetHash: AssetHashVerifier
|
||||
): Promise<void> {
|
||||
await Promise.all(
|
||||
Array.from(candidatesByHash, async ([assetHash, hashCandidates]) => {
|
||||
if (signal?.aborted) return
|
||||
|
||||
let status: AssetHashStatus
|
||||
try {
|
||||
status = await checkAssetHash(assetHash, signal)
|
||||
if (signal?.aborted) return
|
||||
} catch (err) {
|
||||
if (signal?.aborted || isAbortError(err)) return
|
||||
console.warn(
|
||||
'[Missing Media Pipeline] Failed to verify asset hash:',
|
||||
err
|
||||
)
|
||||
legacyCandidates.push(...hashCandidates)
|
||||
return
|
||||
}
|
||||
|
||||
if (status === 'invalid') {
|
||||
legacyCandidates.push(...hashCandidates)
|
||||
return
|
||||
}
|
||||
|
||||
for (const candidate of hashCandidates) {
|
||||
candidate.isMissing = status === 'missing'
|
||||
}
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify cloud media candidates by probing the asset hash endpoint first.
|
||||
* Invalid hash values fall back to the legacy input asset list check.
|
||||
@@ -191,15 +131,24 @@ export async function verifyCloudMediaCandidates(
|
||||
const pending = candidates.filter((c) => c.isMissing === undefined)
|
||||
if (pending.length === 0) return
|
||||
|
||||
const { candidatesByHash, legacyCandidates } =
|
||||
groupCandidatesForHashLookup(pending)
|
||||
await verifyCandidatesByHash(
|
||||
const [hashable, nonHashCandidates] = partition(pending, (c) =>
|
||||
isBlake3AssetHash(c.name)
|
||||
)
|
||||
const candidatesByHash = groupBy(hashable, (c) => c.name)
|
||||
|
||||
const unresolvedHashGroups = await verifyCandidatesByAssetHash(
|
||||
candidatesByHash,
|
||||
legacyCandidates,
|
||||
signal,
|
||||
checkAssetHash
|
||||
checkAssetHash,
|
||||
(status, group) => {
|
||||
if (status === 'invalid') return false
|
||||
for (const c of group) c.isMissing = status === 'missing'
|
||||
return true
|
||||
},
|
||||
'[Missing Media Pipeline]'
|
||||
)
|
||||
|
||||
const legacyCandidates = nonHashCandidates.concat(...unresolvedHashGroups)
|
||||
if (signal?.aborted || legacyCandidates.length === 0) return
|
||||
|
||||
let inputAssets: AssetItem[]
|
||||
@@ -227,15 +176,6 @@ async function fetchMissingInputAssets(
|
||||
return await assetService.getInputAssetsIncludingPublic(signal)
|
||||
}
|
||||
|
||||
function isAbortError(err: unknown): boolean {
|
||||
return (
|
||||
typeof err === 'object' &&
|
||||
err !== null &&
|
||||
'name' in err &&
|
||||
err.name === 'AbortError'
|
||||
)
|
||||
}
|
||||
|
||||
/** Group confirmed-missing candidates by file name into view models. */
|
||||
export function groupCandidatesByName(
|
||||
candidates: MissingMediaCandidate[]
|
||||
|
||||
@@ -24,11 +24,13 @@ import {
|
||||
} from '@/utils/graphTraversalUtil'
|
||||
import { LGraphEventMode } from '@/lib/litegraph/src/types/globalEnums'
|
||||
import { resolveComboValues } from '@/utils/litegraphUtil'
|
||||
import type { AssetHashStatus } from '@/platform/assets/services/assetService'
|
||||
import type { AssetHashVerifier } from '@/platform/assets/services/assetService'
|
||||
import {
|
||||
assetService,
|
||||
toBlake3AssetHash
|
||||
} from '@/platform/assets/services/assetService'
|
||||
import { verifyCandidatesByAssetHash } from '@/platform/assets/utils/verifyCandidatesByAssetHash'
|
||||
import { groupBy, partition } from 'es-toolkit'
|
||||
|
||||
export type MissingModelWorkflowData = FlattenableWorkflowGraph & {
|
||||
models?: ModelFile[]
|
||||
@@ -450,11 +452,6 @@ interface AssetVerifier {
|
||||
getAssets: (nodeType: string) => AssetItem[] | undefined
|
||||
}
|
||||
|
||||
type AssetHashVerifier = (
|
||||
assetHash: string,
|
||||
signal?: AbortSignal
|
||||
) => Promise<AssetHashStatus>
|
||||
|
||||
export async function verifyAssetSupportedCandidates(
|
||||
candidates: MissingModelCandidate[],
|
||||
signal?: AbortSignal,
|
||||
@@ -468,48 +465,27 @@ export async function verifyAssetSupportedCandidates(
|
||||
)
|
||||
if (pendingCandidates.length === 0) return
|
||||
|
||||
const pendingNodeTypes = new Set<string>()
|
||||
const candidatesByHash = new Map<string, MissingModelCandidate[]>()
|
||||
|
||||
for (const candidate of pendingCandidates) {
|
||||
const assetHash = getBlake3AssetHash(candidate)
|
||||
if (!assetHash) {
|
||||
pendingNodeTypes.add(candidate.nodeType)
|
||||
continue
|
||||
}
|
||||
|
||||
const hashCandidates = candidatesByHash.get(assetHash)
|
||||
if (hashCandidates) hashCandidates.push(candidate)
|
||||
else candidatesByHash.set(assetHash, [candidate])
|
||||
}
|
||||
|
||||
await Promise.all(
|
||||
Array.from(candidatesByHash, async ([assetHash, hashCandidates]) => {
|
||||
if (signal?.aborted) return
|
||||
|
||||
try {
|
||||
const status = await checkAssetHash(assetHash, signal)
|
||||
if (signal?.aborted) return
|
||||
|
||||
if (status === 'exists') {
|
||||
for (const candidate of hashCandidates) {
|
||||
candidate.isMissing = false
|
||||
}
|
||||
return
|
||||
}
|
||||
} catch (err) {
|
||||
if (signal?.aborted || isAbortError(err)) return
|
||||
console.warn(
|
||||
'[Missing Model Pipeline] Failed to verify asset hash:',
|
||||
err
|
||||
)
|
||||
}
|
||||
|
||||
for (const candidate of hashCandidates) {
|
||||
pendingNodeTypes.add(candidate.nodeType)
|
||||
}
|
||||
})
|
||||
const [hashable, nonHashCandidates] = partition(
|
||||
pendingCandidates,
|
||||
(c) => getBlake3AssetHash(c) !== null
|
||||
)
|
||||
const pendingNodeTypes = new Set(nonHashCandidates.map((c) => c.nodeType))
|
||||
const candidatesByHash = groupBy(hashable, (c) => getBlake3AssetHash(c)!)
|
||||
|
||||
const unresolvedHashGroups = await verifyCandidatesByAssetHash(
|
||||
candidatesByHash,
|
||||
signal,
|
||||
checkAssetHash,
|
||||
(status, group) => {
|
||||
if (status !== 'exists') return false
|
||||
for (const c of group) c.isMissing = false
|
||||
return true
|
||||
},
|
||||
'[Missing Model Pipeline]'
|
||||
)
|
||||
for (const group of unresolvedHashGroups) {
|
||||
for (const c of group) pendingNodeTypes.add(c.nodeType)
|
||||
}
|
||||
|
||||
if (signal?.aborted) return
|
||||
if (pendingNodeTypes.size === 0) return
|
||||
@@ -549,15 +525,6 @@ function getBlake3AssetHash(candidate: MissingModelCandidate): string | null {
|
||||
return toBlake3AssetHash(candidate.hash)
|
||||
}
|
||||
|
||||
function isAbortError(err: unknown): boolean {
|
||||
return (
|
||||
typeof err === 'object' &&
|
||||
err !== null &&
|
||||
'name' in err &&
|
||||
err.name === 'AbortError'
|
||||
)
|
||||
}
|
||||
|
||||
function normalizePath(path: string): string {
|
||||
return path.replace(/\\/g, '/')
|
||||
}
|
||||
|
||||
@@ -8,12 +8,14 @@ import type { ResultItemType } from '@/schemas/apiSchema'
|
||||
|
||||
/**
|
||||
* Check if an error is an AbortError triggered by `AbortController#abort`
|
||||
* when cancelling a request.
|
||||
* when cancelling a request. Real AbortControllers reject with a
|
||||
* `DOMException` (which extends `Error`), and some libraries throw plain
|
||||
* `Error`s with `name === 'AbortError'`. Either is treated as an abort.
|
||||
*/
|
||||
export const isAbortError = (
|
||||
err: unknown
|
||||
): err is DOMException & { name: 'AbortError' } =>
|
||||
err instanceof DOMException && err.name === 'AbortError'
|
||||
): err is Error & { name: 'AbortError' } =>
|
||||
err instanceof Error && err.name === 'AbortError'
|
||||
|
||||
export const isSubgraph = (
|
||||
item: LGraph | Subgraph | undefined | null
|
||||
|
||||
Reference in New Issue
Block a user