mirror of
https://github.com/Comfy-Org/ComfyUI_frontend.git
synced 2026-05-22 21:38:52 +00:00
Compare commits
2 Commits
save-previ
...
fix/cached
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
536703cd6e | ||
|
|
b7b7eb55aa |
@@ -192,6 +192,16 @@ export class ExecutionHelper {
|
||||
)
|
||||
}
|
||||
|
||||
/** Send `execution_cached` WS event with the list of cached node IDs. */
|
||||
executionCached(jobId: string, nodes: string[]): void {
|
||||
this.ws.send(
|
||||
JSON.stringify({
|
||||
type: 'execution_cached',
|
||||
data: { prompt_id: jobId, timestamp: Date.now(), nodes }
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
/** Send `execution_success` WS event. */
|
||||
executionSuccess(jobId: string): void {
|
||||
this.requireWs().send(
|
||||
@@ -260,6 +270,48 @@ export class ExecutionHelper {
|
||||
this.status(0)
|
||||
}
|
||||
|
||||
/**
|
||||
* Complete a fully-cached job: mock the job detail endpoint with full
|
||||
* outputs, send execution_cached + execution_success, and trigger a
|
||||
* history refresh. The store will fetch job detail to populate outputs.
|
||||
*/
|
||||
async completeWithCachedHistory(
|
||||
jobId: string,
|
||||
cachedNodeIds: string[],
|
||||
nodeId: string,
|
||||
filename: string
|
||||
): Promise<void> {
|
||||
this.completedJobs.push(
|
||||
createMockJob({
|
||||
id: jobId,
|
||||
preview_output: {
|
||||
filename,
|
||||
subfolder: '',
|
||||
type: 'output',
|
||||
nodeId,
|
||||
mediaType: 'images'
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
await this.assets.mockOutputHistory(this.completedJobs)
|
||||
await this.assets.mockJobDetail(jobId, {
|
||||
id: jobId,
|
||||
status: 'completed',
|
||||
create_time: Date.now() / 1000,
|
||||
priority: 0,
|
||||
outputs: {
|
||||
[nodeId]: {
|
||||
images: [{ filename, subfolder: '', type: 'output' as const }]
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
this.executionCached(jobId, cachedNodeIds)
|
||||
this.executionSuccess(jobId)
|
||||
this.status(0)
|
||||
}
|
||||
|
||||
/** Send `status` WS event to update queue count. */
|
||||
status(queueRemaining: number): void {
|
||||
this.requireWs().send(
|
||||
|
||||
@@ -339,6 +339,32 @@ test.describe('Output History', { tag: '@ui' }, () => {
|
||||
).toBeInViewport()
|
||||
})
|
||||
|
||||
test('Cached execution shows image output (not permanent skeleton)', async ({
|
||||
comfyPage,
|
||||
getWebSocket
|
||||
}) => {
|
||||
const ws = await getWebSocket()
|
||||
const { exec, jobId } = await startExecution(comfyPage, ws)
|
||||
|
||||
await expect(
|
||||
comfyPage.appMode.outputHistory.skeletons.first()
|
||||
).toBeVisible()
|
||||
|
||||
// All nodes return cached results — no `executed` events fire
|
||||
await exec.completeWithCachedHistory(
|
||||
jobId,
|
||||
ALL_NODE_IDS,
|
||||
SAVE_IMAGE_NODE,
|
||||
'cached_output.png'
|
||||
)
|
||||
|
||||
// Skeleton must resolve to an image, not remain stuck
|
||||
await expect(comfyPage.appMode.outputHistory.skeletons).toHaveCount(0)
|
||||
await expect(
|
||||
comfyPage.appMode.outputHistory.imageOutputs.first()
|
||||
).toBeVisible()
|
||||
})
|
||||
|
||||
test('Execution error cleans up in-progress items', async ({
|
||||
comfyPage,
|
||||
getWebSocket
|
||||
|
||||
@@ -13,8 +13,9 @@ const activeWorkflowPathRef = ref<string>('workflows/test-workflow.json')
|
||||
const jobIdToWorkflowPathRef = ref(new Map<string, string>())
|
||||
const selectedOutputsRef = ref<string[]>([])
|
||||
|
||||
const { apiTarget } = vi.hoisted(() => ({
|
||||
apiTarget: new EventTarget()
|
||||
const { apiTarget, getJobDetailMock } = vi.hoisted(() => ({
|
||||
apiTarget: new EventTarget(),
|
||||
getJobDetailMock: vi.fn()
|
||||
}))
|
||||
|
||||
vi.mock('@/composables/useAppMode', () => ({
|
||||
@@ -64,6 +65,10 @@ vi.mock('@/scripts/api', () => ({
|
||||
})
|
||||
}))
|
||||
|
||||
vi.mock('@/services/jobOutputCache', () => ({
|
||||
getJobDetail: (...args: unknown[]) => getJobDetailMock(...args)
|
||||
}))
|
||||
|
||||
vi.mock('@/renderer/extensions/linearMode/flattenNodeOutput', () => ({
|
||||
flattenNodeOutput: ([nodeId, output]: [
|
||||
string | number,
|
||||
@@ -111,6 +116,7 @@ describe('linearOutputStore', () => {
|
||||
activeWorkflowPathRef.value = 'workflows/test-workflow.json'
|
||||
jobIdToWorkflowPathRef.value = new Map()
|
||||
selectedOutputsRef.value = []
|
||||
getJobDetailMock.mockReset()
|
||||
})
|
||||
|
||||
it('creates a skeleton item when a job starts', () => {
|
||||
@@ -1313,6 +1319,115 @@ describe('linearOutputStore', () => {
|
||||
})
|
||||
})
|
||||
|
||||
describe('cached execution', () => {
|
||||
function dispatchCachedEvent(promptId: string, nodes: string[]) {
|
||||
apiTarget.dispatchEvent(
|
||||
new CustomEvent('execution_cached', {
|
||||
detail: { prompt_id: promptId, timestamp: Date.now(), nodes }
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
it('keeps skeleton visible while fetching cached job outputs', async () => {
|
||||
const store = useLinearOutputStore()
|
||||
setJobWorkflowPath('job-1', 'workflows/test-workflow.json')
|
||||
let resolveJobDetail!: (v: unknown) => void
|
||||
getJobDetailMock.mockReturnValue(
|
||||
new Promise((resolve) => {
|
||||
resolveJobDetail = resolve
|
||||
})
|
||||
)
|
||||
|
||||
store.onJobStart('job-1')
|
||||
dispatchCachedEvent('job-1', ['1'])
|
||||
store.onJobComplete('job-1')
|
||||
|
||||
// Skeleton should still be present while awaiting job detail
|
||||
expect(store.inProgressItems).toHaveLength(1)
|
||||
expect(store.inProgressItems[0].state).toBe('skeleton')
|
||||
|
||||
// Resolve to unblock async work
|
||||
resolveJobDetail(null)
|
||||
})
|
||||
|
||||
it('replaces skeleton with image items once cached job detail resolves', async () => {
|
||||
const { nextTick } = await import('vue')
|
||||
const store = useLinearOutputStore()
|
||||
setJobWorkflowPath('job-1', 'workflows/test-workflow.json')
|
||||
let resolveJobDetail!: (v: unknown) => void
|
||||
getJobDetailMock.mockReturnValue(
|
||||
new Promise((resolve) => {
|
||||
resolveJobDetail = resolve
|
||||
})
|
||||
)
|
||||
|
||||
store.onJobStart('job-1')
|
||||
dispatchCachedEvent('job-1', ['1'])
|
||||
store.onJobComplete('job-1')
|
||||
|
||||
resolveJobDetail({
|
||||
id: 'job-1',
|
||||
status: 'completed',
|
||||
create_time: Date.now(),
|
||||
outputs: {
|
||||
'1': {
|
||||
images: [{ filename: 'cached.png', subfolder: '', type: 'output' }]
|
||||
}
|
||||
}
|
||||
})
|
||||
await nextTick()
|
||||
|
||||
const imageItems = store.inProgressItems.filter(
|
||||
(i) => i.state === 'image'
|
||||
)
|
||||
expect(imageItems).toHaveLength(1)
|
||||
expect(imageItems[0].output).toBeDefined()
|
||||
expect(store.pendingResolve.has('job-1')).toBe(true)
|
||||
})
|
||||
|
||||
it('removes skeleton when cached job detail has no previewable outputs', async () => {
|
||||
const { nextTick } = await import('vue')
|
||||
const store = useLinearOutputStore()
|
||||
setJobWorkflowPath('job-1', 'workflows/test-workflow.json')
|
||||
getJobDetailMock.mockResolvedValue({
|
||||
id: 'job-1',
|
||||
status: 'completed',
|
||||
create_time: Date.now(),
|
||||
outputs: {}
|
||||
})
|
||||
|
||||
store.onJobStart('job-1')
|
||||
dispatchCachedEvent('job-1', ['1'])
|
||||
store.onJobComplete('job-1')
|
||||
await nextTick()
|
||||
|
||||
expect(store.inProgressItems).toHaveLength(0)
|
||||
})
|
||||
|
||||
it('removes skeleton when cached job detail fetch fails', async () => {
|
||||
const { nextTick } = await import('vue')
|
||||
const store = useLinearOutputStore()
|
||||
setJobWorkflowPath('job-1', 'workflows/test-workflow.json')
|
||||
getJobDetailMock.mockResolvedValue(undefined)
|
||||
|
||||
store.onJobStart('job-1')
|
||||
dispatchCachedEvent('job-1', ['1'])
|
||||
store.onJobComplete('job-1')
|
||||
await nextTick()
|
||||
|
||||
expect(store.inProgressItems).toHaveLength(0)
|
||||
})
|
||||
|
||||
it('does not defer removal for jobs without cached nodes', () => {
|
||||
const store = useLinearOutputStore()
|
||||
store.onJobStart('job-1')
|
||||
store.onJobComplete('job-1')
|
||||
|
||||
expect(store.inProgressItems).toHaveLength(0)
|
||||
expect(getJobDetailMock).not.toHaveBeenCalled()
|
||||
})
|
||||
})
|
||||
|
||||
describe('deferred path mapping (WebSocket/HTTP race)', () => {
|
||||
it('starts tracking when path mapping arrives after activeJobId', async () => {
|
||||
const { nextTick } = await import('vue')
|
||||
|
||||
@@ -6,8 +6,13 @@ import { useWorkflowStore } from '@/platform/workflow/management/stores/workflow
|
||||
import { flattenNodeOutput } from '@/renderer/extensions/linearMode/flattenNodeOutput'
|
||||
import type { InProgressItem } from '@/renderer/extensions/linearMode/linearModeTypes'
|
||||
import type { ResultItemImpl } from '@/stores/queueStore'
|
||||
import type { ExecutedWsMessage, JobId } from '@/schemas/apiSchema'
|
||||
import type {
|
||||
ExecutedWsMessage,
|
||||
ExecutionCachedWsMessage,
|
||||
JobId
|
||||
} from '@/schemas/apiSchema'
|
||||
import { api } from '@/scripts/api'
|
||||
import { getJobDetail } from '@/services/jobOutputCache'
|
||||
import { useAppModeStore } from '@/stores/appModeStore'
|
||||
import { useExecutionStore } from '@/stores/executionStore'
|
||||
import { useJobPreviewStore } from '@/stores/jobPreviewStore'
|
||||
@@ -26,6 +31,7 @@ export const useLinearOutputStore = defineStore('linearOutput', () => {
|
||||
const trackedJobId = ref<JobId | null>(null)
|
||||
const pendingResolve = ref(new Set<JobId>())
|
||||
const executedNodeIds = new Set<string>()
|
||||
const cachedJobIds = new Set<JobId>()
|
||||
|
||||
const activeWorkflowInProgressItems = computed(() => {
|
||||
const path = workflowStore.activeWorkflow?.path
|
||||
@@ -166,6 +172,51 @@ export const useLinearOutputStore = defineStore('linearOutput', () => {
|
||||
inProgressItems.value = [...newItems, ...inProgressItems.value]
|
||||
}
|
||||
|
||||
async function populateCachedOutputs(jobId: JobId) {
|
||||
const jobDetail = await getJobDetail(jobId)
|
||||
if (!jobDetail?.outputs) {
|
||||
removeJobItems(jobId)
|
||||
return
|
||||
}
|
||||
|
||||
const outputNodeIds = appModeStore.selectedOutputs
|
||||
const allOutputs: ResultItemImpl[] = []
|
||||
for (const [nodeId, output] of Object.entries(jobDetail.outputs)) {
|
||||
if (
|
||||
outputNodeIds.length > 0 &&
|
||||
!outputNodeIds.some((id) => String(id) === nodeId)
|
||||
)
|
||||
continue
|
||||
allOutputs.push(...flattenNodeOutput([nodeId, output]))
|
||||
}
|
||||
|
||||
if (allOutputs.length === 0) {
|
||||
removeJobItems(jobId)
|
||||
return
|
||||
}
|
||||
|
||||
const skeletonItem = inProgressItems.value.find(
|
||||
(i) => i.jobId === jobId && i.state === 'skeleton'
|
||||
)
|
||||
const newItems: InProgressItem[] = allOutputs.map((o, idx) => ({
|
||||
id: skeletonItem && idx === 0 ? skeletonItem.id : makeItemId(jobId),
|
||||
jobId,
|
||||
state: 'image' as const,
|
||||
output: o
|
||||
}))
|
||||
|
||||
if (skeletonItem) {
|
||||
const idx = inProgressItems.value.indexOf(skeletonItem)
|
||||
const arr = [...inProgressItems.value]
|
||||
arr.splice(idx, 1, ...newItems)
|
||||
inProgressItems.value = arr
|
||||
} else {
|
||||
inProgressItems.value = [...newItems, ...inProgressItems.value]
|
||||
}
|
||||
autoSelect(`slot:${newItems[0].id}`, jobId)
|
||||
pendingResolve.value = new Set([...pendingResolve.value, jobId])
|
||||
}
|
||||
|
||||
function onJobComplete(jobId: JobId) {
|
||||
// On any job complete, remove all pending resolve items.
|
||||
if (pendingResolve.value.size > 0) {
|
||||
@@ -194,6 +245,11 @@ export const useLinearOutputStore = defineStore('linearOutput', () => {
|
||||
(i) => i.jobId !== jobId || i.state === 'image'
|
||||
)
|
||||
pendingResolve.value = new Set([...pendingResolve.value, jobId])
|
||||
} else if (cachedJobIds.has(jobId)) {
|
||||
// No executed nodes produced images, but some nodes were cached.
|
||||
// Keep the skeleton visible while we fetch outputs from job history.
|
||||
cachedJobIds.delete(jobId)
|
||||
void populateCachedOutputs(jobId)
|
||||
} else {
|
||||
removeJobItems(jobId)
|
||||
}
|
||||
@@ -262,6 +318,14 @@ export const useLinearOutputStore = defineStore('linearOutput', () => {
|
||||
onNodeExecuted(jobId, detail)
|
||||
}
|
||||
|
||||
function handleExecutionCached({
|
||||
detail
|
||||
}: CustomEvent<ExecutionCachedWsMessage>) {
|
||||
if (detail.nodes.length > 0) {
|
||||
cachedJobIds.add(detail.prompt_id)
|
||||
}
|
||||
}
|
||||
|
||||
// Watch both activeJobId and the path mapping together. The path mapping
|
||||
// may arrive after activeJobId due to a race between WebSocket
|
||||
// (execution_start) and the HTTP response (queuePrompt > storeJob).
|
||||
@@ -356,9 +420,11 @@ export const useLinearOutputStore = defineStore('linearOutput', () => {
|
||||
(active, wasActive) => {
|
||||
if (active) {
|
||||
api.addEventListener('executed', handleExecuted)
|
||||
api.addEventListener('execution_cached', handleExecutionCached)
|
||||
reconcileOnEnter()
|
||||
} else if (wasActive) {
|
||||
api.removeEventListener('executed', handleExecuted)
|
||||
api.removeEventListener('execution_cached', handleExecutionCached)
|
||||
cleanupOnLeave()
|
||||
}
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user