mirror of
https://github.com/Comfy-Org/ComfyUI_frontend.git
synced 2026-05-22 21:38:52 +00:00
Compare commits
8 Commits
glary/fix-
...
glary/fix-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9eb6046323 | ||
|
|
4d9011b756 | ||
|
|
452ef585bd | ||
|
|
7f48b3324a | ||
|
|
07f4569458 | ||
|
|
34b556c7e4 | ||
|
|
82f1f91e3c | ||
|
|
cd73b6f987 |
@@ -19,26 +19,15 @@ reviews:
|
||||
- name: End-to-end regression coverage for fixes
|
||||
mode: error
|
||||
instructions: |
|
||||
Use only PR metadata already available in the review context:
|
||||
- the PR title
|
||||
- commit subjects in this PR
|
||||
- The files changed in this PR relative to the PR base (equivalent to `base...head`)
|
||||
- the PR description.
|
||||
Do not rely on shell commands.
|
||||
Do not inspect reverse diffs, files changed only on the base branch, or files outside this PR.
|
||||
If the changed-file list or commit subjects are unavailable, mark the check inconclusive instead of guessing.
|
||||
Use only PR metadata already available in the review context: the PR title, commit subjects in this PR, the files changed in this PR relative to the PR base (equivalent to `base...head`), and the PR description.
|
||||
Do not rely on shell commands. Do not inspect reverse diffs, files changed only on the base branch, or files outside this PR. If the changed-file list or commit subjects are unavailable, mark the check inconclusive instead of guessing.
|
||||
|
||||
Fail if all of the following are true:
|
||||
1. The PR title and/or any commit subject in the PR uses bug-fix language such as `fix`, `fixed`, `fixes`, `fixing`, `bugfix`, or `hotfix`.
|
||||
2. The PR changes files under `src/` or `packages/` related to the main frontend application but the PR does not change at least one file under `browser_tests/`.
|
||||
3. The PR description lacks a concrete explanation of why an end-to-end regression test was not added.
|
||||
|
||||
Do not fail if the changes are exclusively in `apps/website`, just documentation changes, or changes related to CI processes.
|
||||
The goal is to make sure that fixes include End-to-End regression tests. Do not insist on tests when the PR is not fixing a bug.
|
||||
|
||||
Pass otherwise.
|
||||
When failing, mention which bug-fix signal you found and ask the author to either add or update a Playwright regression test under `browser_tests/` or add a concrete explanation in the PR description of why an end-to-end regression test is not practical.
|
||||
Pass if at least one of the following is true:
|
||||
1. Neither the PR title nor any commit subject in the PR uses bug-fix language such as `fix`, `fixed`, `fixes`, `fixing`, `bugfix`, or `hotfix`.
|
||||
2. The PR changes at least one file under `browser_tests/`.
|
||||
3. The PR description includes a concrete, non-placeholder explanation of why an end-to-end regression test was not added.
|
||||
|
||||
Fail otherwise. When failing, mention which bug-fix signal you found and ask the author to either add or update a Playwright regression test under `browser_tests/` or add a concrete explanation in the PR description of why an end-to-end regression test is not practical.
|
||||
- name: ADR compliance for entity/litegraph changes
|
||||
mode: warning
|
||||
instructions: |
|
||||
|
||||
@@ -1,104 +0,0 @@
|
||||
import { expect } from '@playwright/test'
|
||||
|
||||
import type { ComfyPage } from '@e2e/fixtures/ComfyPage'
|
||||
import { comfyPageFixture as test } from '@e2e/fixtures/ComfyPage'
|
||||
|
||||
async function getNodeOutputImageCount(
|
||||
comfyPage: ComfyPage,
|
||||
nodeId: string
|
||||
): Promise<number> {
|
||||
return await comfyPage.page.evaluate(
|
||||
(id) => window.app!.nodeOutputs?.[id]?.images?.length ?? 0,
|
||||
nodeId
|
||||
)
|
||||
}
|
||||
|
||||
async function seedNodeOutput(
|
||||
comfyPage: ComfyPage,
|
||||
nodeId: string
|
||||
): Promise<void> {
|
||||
await comfyPage.page.evaluate((id) => {
|
||||
window.app!.nodeOutputs[id] = {
|
||||
images: [
|
||||
{ filename: 'seeded-preview.png', subfolder: '', type: 'output' }
|
||||
]
|
||||
}
|
||||
}, nodeId)
|
||||
}
|
||||
|
||||
test.describe('Node output cleanup on removal', { tag: '@workflow' }, () => {
|
||||
test('Deleting a node clears its outputs from the store', async ({
|
||||
comfyPage
|
||||
}) => {
|
||||
test.info().annotations.push({
|
||||
type: 'regression',
|
||||
description:
|
||||
'Pressing delete left previews behind because nodeOutputStore did not listen to onNodeRemoved'
|
||||
})
|
||||
|
||||
const node = (await comfyPage.nodeOps.getFirstNodeRef())!
|
||||
expect(node).toBeTruthy()
|
||||
const nodeId = String(node.id)
|
||||
|
||||
await seedNodeOutput(comfyPage, nodeId)
|
||||
await expect.poll(() => getNodeOutputImageCount(comfyPage, nodeId)).toBe(1)
|
||||
|
||||
await node.click('title')
|
||||
await comfyPage.page.keyboard.press('Delete')
|
||||
|
||||
await expect.poll(() => getNodeOutputImageCount(comfyPage, nodeId)).toBe(0)
|
||||
})
|
||||
|
||||
test('Undoing a node addition clears outputs produced for the removed node', async ({
|
||||
comfyPage
|
||||
}) => {
|
||||
test.info().annotations.push({
|
||||
type: 'regression',
|
||||
description:
|
||||
'Undo removed the node but left its preview rendered because the removal lifecycle did not invalidate nodeOutputStore'
|
||||
})
|
||||
|
||||
const initialNodeCount = await comfyPage.nodeOps.getGraphNodesCount()
|
||||
|
||||
await comfyPage.canvasOps.clickEmptySpace()
|
||||
await comfyPage.page.keyboard.press('Control+a')
|
||||
|
||||
const addedNodeId = await comfyPage.page.evaluate(() => {
|
||||
const litegraph = window.LiteGraph
|
||||
if (!litegraph) throw new Error('LiteGraph is not available on window')
|
||||
const graph = window.app!.graph
|
||||
const registered = litegraph.registered_node_types
|
||||
const typeName = registered['LoadImage']
|
||||
? 'LoadImage'
|
||||
: registered['PreviewImage']
|
||||
? 'PreviewImage'
|
||||
: undefined
|
||||
if (!typeName) {
|
||||
throw new Error('No suitable node type registered for the test')
|
||||
}
|
||||
const node = litegraph.createNode(typeName)
|
||||
if (!node) throw new Error('Failed to create test node')
|
||||
graph.add(node)
|
||||
return String(node.id)
|
||||
})
|
||||
|
||||
await expect
|
||||
.poll(() => comfyPage.nodeOps.getGraphNodesCount())
|
||||
.toBe(initialNodeCount + 1)
|
||||
|
||||
await seedNodeOutput(comfyPage, addedNodeId)
|
||||
await expect
|
||||
.poll(() => getNodeOutputImageCount(comfyPage, addedNodeId))
|
||||
.toBe(1)
|
||||
|
||||
await comfyPage.canvasOps.clickEmptySpace()
|
||||
await comfyPage.keyboard.undo()
|
||||
|
||||
await expect
|
||||
.poll(() => comfyPage.nodeOps.getGraphNodesCount())
|
||||
.toBe(initialNodeCount)
|
||||
await expect
|
||||
.poll(() => getNodeOutputImageCount(comfyPage, addedNodeId))
|
||||
.toBe(0)
|
||||
})
|
||||
})
|
||||
@@ -1,7 +1,7 @@
|
||||
import {
|
||||
comfyPageFixture as test,
|
||||
comfyExpect as expect
|
||||
} from '../fixtures/ComfyPage'
|
||||
} from '@e2e/fixtures/ComfyPage'
|
||||
|
||||
test.describe('Preview as Text node', () => {
|
||||
test('does not include preview widget values in the API prompt', async ({
|
||||
|
||||
@@ -143,7 +143,6 @@ import WorkflowTabs from '@/components/topbar/WorkflowTabs.vue'
|
||||
import { useChainCallback } from '@/composables/functional/useChainCallback'
|
||||
import { installErrorClearingHooks } from '@/composables/graph/useErrorClearingHooks'
|
||||
import type { VueNodeData } from '@/composables/graph/useGraphNodeManager'
|
||||
import { installNodeOutputClearingHooks } from '@/composables/graph/useNodeOutputClearingHooks'
|
||||
import { useVueNodeLifecycle } from '@/composables/graph/useVueNodeLifecycle'
|
||||
import { useNodeBadge } from '@/composables/node/useNodeBadge'
|
||||
import { useCanvasDrop } from '@/composables/useCanvasDrop'
|
||||
@@ -250,16 +249,11 @@ const vueNodeLifecycle = useVueNodeLifecycle()
|
||||
|
||||
// Error-clearing hooks run regardless of rendering mode (Vue or legacy canvas).
|
||||
let cleanupErrorHooks: (() => void) | null = null
|
||||
let cleanupNodeOutputHooks: (() => void) | null = null
|
||||
watch(
|
||||
() => canvasStore.currentGraph,
|
||||
(graph) => {
|
||||
cleanupErrorHooks?.()
|
||||
cleanupErrorHooks = graph ? installErrorClearingHooks(graph) : null
|
||||
cleanupNodeOutputHooks?.()
|
||||
cleanupNodeOutputHooks = graph
|
||||
? installNodeOutputClearingHooks(graph)
|
||||
: null
|
||||
}
|
||||
)
|
||||
|
||||
@@ -544,9 +538,6 @@ onMounted(async () => {
|
||||
// Install error-clearing hooks on the initial graph
|
||||
if (comfyApp.canvas?.graph) {
|
||||
cleanupErrorHooks = installErrorClearingHooks(comfyApp.canvas.graph)
|
||||
cleanupNodeOutputHooks = installNodeOutputClearingHooks(
|
||||
comfyApp.canvas.graph
|
||||
)
|
||||
}
|
||||
|
||||
vueNodeLifecycle.setupEmptyGraphListener()
|
||||
@@ -606,8 +597,6 @@ onMounted(async () => {
|
||||
onUnmounted(() => {
|
||||
cleanupErrorHooks?.()
|
||||
cleanupErrorHooks = null
|
||||
cleanupNodeOutputHooks?.()
|
||||
cleanupNodeOutputHooks = null
|
||||
vueNodeLifecycle.cleanup()
|
||||
})
|
||||
function forwardPanEvent(e: PointerEvent) {
|
||||
|
||||
@@ -1,208 +0,0 @@
|
||||
import { createTestingPinia } from '@pinia/testing'
|
||||
import { setActivePinia } from 'pinia'
|
||||
import { beforeEach, describe, expect, it, vi } from 'vitest'
|
||||
|
||||
import { installNodeOutputClearingHooks } from '@/composables/graph/useNodeOutputClearingHooks'
|
||||
import { LGraph, LGraphNode } from '@/lib/litegraph/src/litegraph'
|
||||
import {
|
||||
createTestSubgraph,
|
||||
createTestSubgraphNode
|
||||
} from '@/lib/litegraph/src/subgraph/__fixtures__/subgraphHelpers'
|
||||
import { app } from '@/scripts/app'
|
||||
import { ChangeTracker } from '@/scripts/changeTracker'
|
||||
import { useNodeOutputStore } from '@/stores/nodeOutputStore'
|
||||
import { useWorkflowStore } from '@/platform/workflow/management/stores/workflowStore'
|
||||
|
||||
function seedOutputForLocator(locatorId: string) {
|
||||
app.nodeOutputs[locatorId] = {
|
||||
images: [{ filename: 'preview.png', type: 'output', subfolder: '' }]
|
||||
}
|
||||
}
|
||||
|
||||
describe('installNodeOutputClearingHooks', () => {
|
||||
beforeEach(() => {
|
||||
setActivePinia(createTestingPinia({ stubActions: false }))
|
||||
app.nodeOutputs = {}
|
||||
app.nodePreviewImages = {}
|
||||
})
|
||||
|
||||
it('removes outputs for a root-level node when it is removed from the graph', () => {
|
||||
const graph = new LGraph()
|
||||
vi.spyOn(app, 'rootGraph', 'get').mockReturnValue(graph)
|
||||
|
||||
const node = new LGraphNode('LoadImage')
|
||||
graph.add(node)
|
||||
|
||||
const locatorId = String(node.id)
|
||||
seedOutputForLocator(locatorId)
|
||||
expect(app.nodeOutputs[locatorId]).toBeDefined()
|
||||
|
||||
installNodeOutputClearingHooks(graph)
|
||||
graph.remove(node)
|
||||
|
||||
expect(app.nodeOutputs[locatorId]).toBeUndefined()
|
||||
expect(useNodeOutputStore().nodeOutputs[locatorId]).toBeUndefined()
|
||||
})
|
||||
|
||||
it('removes outputs for a subgraph interior node using subgraphUuid:nodeId locator', () => {
|
||||
const subgraph = createTestSubgraph()
|
||||
const interiorNode = new LGraphNode('LoadImage')
|
||||
subgraph.add(interiorNode)
|
||||
|
||||
const subgraphNode = createTestSubgraphNode(subgraph, { id: 65 })
|
||||
const rootGraph = subgraphNode.graph as LGraph
|
||||
rootGraph.add(subgraphNode)
|
||||
vi.spyOn(app, 'rootGraph', 'get').mockReturnValue(rootGraph)
|
||||
|
||||
const interiorLocator = `${subgraph.id}:${interiorNode.id}`
|
||||
seedOutputForLocator(interiorLocator)
|
||||
expect(app.nodeOutputs[interiorLocator]).toBeDefined()
|
||||
|
||||
installNodeOutputClearingHooks(subgraph)
|
||||
subgraph.remove(interiorNode)
|
||||
|
||||
expect(app.nodeOutputs[interiorLocator]).toBeUndefined()
|
||||
})
|
||||
|
||||
it('does not affect outputs for other nodes that remain in the graph', () => {
|
||||
const graph = new LGraph()
|
||||
vi.spyOn(app, 'rootGraph', 'get').mockReturnValue(graph)
|
||||
|
||||
const removed = new LGraphNode('LoadImage')
|
||||
const kept = new LGraphNode('LoadImage')
|
||||
graph.add(removed)
|
||||
graph.add(kept)
|
||||
|
||||
const removedLocator = String(removed.id)
|
||||
const keptLocator = String(kept.id)
|
||||
seedOutputForLocator(removedLocator)
|
||||
seedOutputForLocator(keptLocator)
|
||||
|
||||
installNodeOutputClearingHooks(graph)
|
||||
graph.remove(removed)
|
||||
|
||||
expect(app.nodeOutputs[removedLocator]).toBeUndefined()
|
||||
expect(app.nodeOutputs[keptLocator]).toBeDefined()
|
||||
})
|
||||
|
||||
it('chains with existing onNodeRemoved callbacks', () => {
|
||||
const graph = new LGraph()
|
||||
vi.spyOn(app, 'rootGraph', 'get').mockReturnValue(graph)
|
||||
|
||||
let calledWith: LGraphNode | undefined
|
||||
graph.onNodeRemoved = (node) => {
|
||||
calledWith = node
|
||||
}
|
||||
|
||||
const node = new LGraphNode('LoadImage')
|
||||
graph.add(node)
|
||||
seedOutputForLocator(String(node.id))
|
||||
|
||||
installNodeOutputClearingHooks(graph)
|
||||
graph.remove(node)
|
||||
|
||||
expect(calledWith).toBe(node)
|
||||
expect(app.nodeOutputs[String(node.id)]).toBeUndefined()
|
||||
})
|
||||
|
||||
it('restores original onNodeRemoved when cleanup is called', () => {
|
||||
const graph = new LGraph()
|
||||
vi.spyOn(app, 'rootGraph', 'get').mockReturnValue(graph)
|
||||
|
||||
const original = () => undefined
|
||||
graph.onNodeRemoved = original
|
||||
|
||||
const cleanup = installNodeOutputClearingHooks(graph)
|
||||
expect(graph.onNodeRemoved).not.toBe(original)
|
||||
|
||||
cleanup()
|
||||
expect(graph.onNodeRemoved).toBe(original)
|
||||
})
|
||||
|
||||
it('clears interior node outputs when a subgraph container is removed from the root graph', () => {
|
||||
const subgraph = createTestSubgraph()
|
||||
const interiorNode = new LGraphNode('LoadImage')
|
||||
subgraph.add(interiorNode)
|
||||
|
||||
const subgraphNode = createTestSubgraphNode(subgraph, { id: 65 })
|
||||
const rootGraph = subgraphNode.graph as LGraph
|
||||
rootGraph.add(subgraphNode)
|
||||
vi.spyOn(app, 'rootGraph', 'get').mockReturnValue(rootGraph)
|
||||
|
||||
const subgraphNodeLocator = String(subgraphNode.id)
|
||||
const interiorLocator = `${subgraph.id}:${interiorNode.id}`
|
||||
seedOutputForLocator(subgraphNodeLocator)
|
||||
seedOutputForLocator(interiorLocator)
|
||||
|
||||
installNodeOutputClearingHooks(rootGraph)
|
||||
rootGraph.remove(subgraphNode)
|
||||
|
||||
expect(app.nodeOutputs[subgraphNodeLocator]).toBeUndefined()
|
||||
expect(app.nodeOutputs[interiorLocator]).toBeUndefined()
|
||||
})
|
||||
|
||||
it('also prunes the active workflow change tracker output cache so undo cannot resurrect the entry', () => {
|
||||
const graph = new LGraph()
|
||||
vi.spyOn(app, 'rootGraph', 'get').mockReturnValue(graph)
|
||||
|
||||
const node = new LGraphNode('LoadImage')
|
||||
graph.add(node)
|
||||
const locator = String(node.id)
|
||||
seedOutputForLocator(locator)
|
||||
|
||||
const trackerCache: Record<string, unknown> = {
|
||||
[locator]: { images: [{ filename: 'preview.png' }] }
|
||||
}
|
||||
vi.spyOn(useWorkflowStore(), 'activeWorkflow', 'get').mockReturnValue({
|
||||
changeTracker: { nodeOutputs: trackerCache }
|
||||
} as never)
|
||||
|
||||
installNodeOutputClearingHooks(graph)
|
||||
graph.remove(node)
|
||||
|
||||
expect(app.nodeOutputs[locator]).toBeUndefined()
|
||||
expect(trackerCache[locator]).toBeUndefined()
|
||||
})
|
||||
|
||||
it('preserves the tracker cache during workflow tab switch teardown', () => {
|
||||
const graph = new LGraph()
|
||||
vi.spyOn(app, 'rootGraph', 'get').mockReturnValue(graph)
|
||||
|
||||
const node = new LGraphNode('LoadImage')
|
||||
graph.add(node)
|
||||
const locator = String(node.id)
|
||||
seedOutputForLocator(locator)
|
||||
|
||||
const trackerCache: Record<string, unknown> = {
|
||||
[locator]: { images: [{ filename: 'preview.png' }] }
|
||||
}
|
||||
vi.spyOn(useWorkflowStore(), 'activeWorkflow', 'get').mockReturnValue({
|
||||
changeTracker: { nodeOutputs: trackerCache, _restoringState: false }
|
||||
} as never)
|
||||
|
||||
installNodeOutputClearingHooks(graph)
|
||||
ChangeTracker.isLoadingGraph = true
|
||||
try {
|
||||
graph.remove(node)
|
||||
} finally {
|
||||
ChangeTracker.isLoadingGraph = false
|
||||
}
|
||||
|
||||
expect(trackerCache[locator]).toBeDefined()
|
||||
})
|
||||
|
||||
it('does not throw when the removal hook fires for an already-cleared node', () => {
|
||||
const graph = new LGraph()
|
||||
vi.spyOn(app, 'rootGraph', 'get').mockReturnValue(graph)
|
||||
|
||||
const node = new LGraphNode('LoadImage')
|
||||
graph.add(node)
|
||||
const locator = String(node.id)
|
||||
seedOutputForLocator(locator)
|
||||
|
||||
installNodeOutputClearingHooks(graph)
|
||||
graph.remove(node)
|
||||
expect(() => graph.onNodeRemoved?.(node)).not.toThrow()
|
||||
expect(app.nodeOutputs[locator]).toBeUndefined()
|
||||
})
|
||||
})
|
||||
@@ -1,68 +0,0 @@
|
||||
import type { LGraph, LGraphNode } from '@/lib/litegraph/src/litegraph'
|
||||
import type { Subgraph } from '@/lib/litegraph/src/subgraph/Subgraph'
|
||||
import type { SubgraphNode } from '@/lib/litegraph/src/subgraph/SubgraphNode'
|
||||
import { useWorkflowStore } from '@/platform/workflow/management/stores/workflowStore'
|
||||
import { app } from '@/scripts/app'
|
||||
import { ChangeTracker } from '@/scripts/changeTracker'
|
||||
import { useNodeOutputStore } from '@/stores/nodeOutputStore'
|
||||
import { getExecutionIdForNodeInGraph } from '@/utils/graphTraversalUtil'
|
||||
import { isSubgraph } from '@/utils/typeGuardUtil'
|
||||
|
||||
function isTabSwitchTeardown(): boolean {
|
||||
const tracker = useWorkflowStore().activeWorkflow?.changeTracker
|
||||
return ChangeTracker.isLoadingGraph && !tracker?._restoringState
|
||||
}
|
||||
|
||||
function dropTrackerCacheEntry(execId: string) {
|
||||
if (isTabSwitchTeardown()) return
|
||||
const tracked = useWorkflowStore().activeWorkflow?.changeTracker?.nodeOutputs
|
||||
if (tracked) delete tracked[execId]
|
||||
}
|
||||
|
||||
function clearInteriorOutputs(
|
||||
subgraphNode: SubgraphNode,
|
||||
execIdPrefix: string
|
||||
) {
|
||||
const subgraph: Subgraph | undefined = subgraphNode.subgraph
|
||||
if (!subgraph) return
|
||||
|
||||
const store = useNodeOutputStore()
|
||||
for (const interior of subgraph.nodes) {
|
||||
store.removeOutputsByLocatorId(`${subgraph.id}:${interior.id}`)
|
||||
const interiorExecId = `${execIdPrefix}:${interior.id}`
|
||||
dropTrackerCacheEntry(interiorExecId)
|
||||
if (interior.isSubgraphNode()) {
|
||||
clearInteriorOutputs(interior, interiorExecId)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export function installNodeOutputClearingHooks(graph: LGraph): () => void {
|
||||
const originalOnNodeRemoved = graph.onNodeRemoved
|
||||
|
||||
graph.onNodeRemoved = function (node: LGraphNode) {
|
||||
try {
|
||||
const store = useNodeOutputStore()
|
||||
const { nodeIdToNodeLocatorId } = useWorkflowStore()
|
||||
const locatorId = isSubgraph(graph)
|
||||
? nodeIdToNodeLocatorId(node.id, graph)
|
||||
: String(node.id)
|
||||
store.removeOutputsByLocatorId(locatorId)
|
||||
|
||||
const execId = app.rootGraph
|
||||
? getExecutionIdForNodeInGraph(app.rootGraph, graph, node.id)
|
||||
: String(node.id)
|
||||
dropTrackerCacheEntry(execId)
|
||||
|
||||
if (node.isSubgraphNode()) {
|
||||
clearInteriorOutputs(node, execId)
|
||||
}
|
||||
} finally {
|
||||
originalOnNodeRemoved?.call(this, node)
|
||||
}
|
||||
}
|
||||
|
||||
return () => {
|
||||
graph.onNodeRemoved = originalOnNodeRemoved || undefined
|
||||
}
|
||||
}
|
||||
@@ -63,13 +63,13 @@ describe('useQueuePolling', () => {
|
||||
expect(store.update).toHaveBeenCalledOnce()
|
||||
})
|
||||
|
||||
it('does not poll when activeJobsCount > 1', async () => {
|
||||
it('polls when activeJobsCount > 1', async () => {
|
||||
mountUseQueuePolling()
|
||||
|
||||
store.activeJobsCount = 2
|
||||
await vi.advanceTimersByTimeAsync(16_000)
|
||||
await vi.advanceTimersByTimeAsync(8_000)
|
||||
|
||||
expect(store.update).not.toHaveBeenCalled()
|
||||
expect(store.update).toHaveBeenCalledOnce()
|
||||
})
|
||||
|
||||
it('stops polling when activeJobsCount drops to 0', async () => {
|
||||
|
||||
@@ -13,7 +13,7 @@ export function useQueuePolling() {
|
||||
|
||||
const { start, stop } = useTimeoutFn(
|
||||
() => {
|
||||
if (queueStore.activeJobsCount !== 1 || queueStore.isLoading) return
|
||||
if (queueStore.activeJobsCount < 1 || queueStore.isLoading) return
|
||||
delay.value = Math.min(delay.value * BACKOFF_MULTIPLIER, MAX_INTERVAL_MS)
|
||||
void queueStore.update()
|
||||
},
|
||||
@@ -22,7 +22,7 @@ export function useQueuePolling() {
|
||||
)
|
||||
|
||||
function scheduleNextPoll() {
|
||||
if (queueStore.activeJobsCount === 1 && !queueStore.isLoading) start()
|
||||
if (queueStore.activeJobsCount >= 1 && !queueStore.isLoading) start()
|
||||
else stop()
|
||||
}
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ import { LinkReleaseTriggerAction } from '@/types/searchBoxTypes'
|
||||
const zNodeType = z.string()
|
||||
const zJobId = z.string()
|
||||
export type JobId = z.infer<typeof zJobId>
|
||||
const zWorkflowId = z.string()
|
||||
export const resultItemType = z.enum(['input', 'output', 'temp'])
|
||||
export type ResultItemType = z.infer<typeof resultItemType>
|
||||
|
||||
@@ -56,6 +57,7 @@ const zProgressWsMessage = z.object({
|
||||
value: z.number().int(),
|
||||
max: z.number().int(),
|
||||
prompt_id: zJobId,
|
||||
workflow_id: zWorkflowId.optional(),
|
||||
node: zNodeId
|
||||
})
|
||||
|
||||
@@ -65,6 +67,7 @@ const zNodeProgressState = z.object({
|
||||
state: z.enum(['pending', 'running', 'finished', 'error']),
|
||||
node_id: zNodeId,
|
||||
prompt_id: zJobId,
|
||||
workflow_id: zWorkflowId.optional(),
|
||||
display_node_id: zNodeId.optional(),
|
||||
parent_node_id: zNodeId.optional(),
|
||||
real_node_id: zNodeId.optional()
|
||||
@@ -72,13 +75,15 @@ const zNodeProgressState = z.object({
|
||||
|
||||
const zProgressStateWsMessage = z.object({
|
||||
prompt_id: zJobId,
|
||||
workflow_id: zWorkflowId.optional(),
|
||||
nodes: z.record(zNodeId, zNodeProgressState)
|
||||
})
|
||||
|
||||
const zExecutingWsMessage = z.object({
|
||||
node: zNodeId,
|
||||
display_node: zNodeId,
|
||||
prompt_id: zJobId
|
||||
prompt_id: zJobId,
|
||||
workflow_id: zWorkflowId.optional()
|
||||
})
|
||||
|
||||
const zExecutedWsMessage = zExecutingWsMessage.extend({
|
||||
@@ -88,6 +93,7 @@ const zExecutedWsMessage = zExecutingWsMessage.extend({
|
||||
|
||||
const zExecutionWsMessageBase = z.object({
|
||||
prompt_id: zJobId,
|
||||
workflow_id: zWorkflowId.optional(),
|
||||
timestamp: z.number().int()
|
||||
})
|
||||
|
||||
@@ -115,7 +121,8 @@ const zExecutionErrorWsMessage = zExecutionWsMessageBase.extend({
|
||||
const zProgressTextWsMessage = z.object({
|
||||
nodeId: zNodeId,
|
||||
text: z.string(),
|
||||
prompt_id: z.string().optional()
|
||||
prompt_id: z.string().optional(),
|
||||
workflow_id: zWorkflowId.optional()
|
||||
})
|
||||
|
||||
const zNotificationWsMessage = z.object({
|
||||
|
||||
@@ -26,6 +26,9 @@ vi.mock('@/platform/settings/settingStore', () => ({
|
||||
useSettingStore: () => mockSettingStore
|
||||
}))
|
||||
|
||||
//@ts-expect-error Define global for the test
|
||||
global.__COMFYUI_FRONTEND_VERSION__ = '1.24.0'
|
||||
|
||||
import { useNewUserService } from '@/services/useNewUserService'
|
||||
|
||||
describe('useNewUserService', () => {
|
||||
@@ -117,73 +120,6 @@ describe('useNewUserService', () => {
|
||||
expect(service.isNewUser()).toBe(false)
|
||||
})
|
||||
|
||||
it('should identify existing user when V1 draft store keys exist', async () => {
|
||||
mockSettingStore.settingValues = {}
|
||||
mockSettingStore.get.mockReturnValue(undefined)
|
||||
mockLocalStorage.getItem.mockImplementation((key: string) => {
|
||||
if (key === 'Comfy.Workflow.Drafts') return '{}'
|
||||
return null
|
||||
})
|
||||
|
||||
await service.initializeIfNewUser()
|
||||
|
||||
expect(service.isNewUser()).toBe(false)
|
||||
})
|
||||
|
||||
it('should identify existing user when V1 draft order key exists', async () => {
|
||||
mockSettingStore.settingValues = {}
|
||||
mockSettingStore.get.mockReturnValue(undefined)
|
||||
mockLocalStorage.getItem.mockImplementation((key: string) => {
|
||||
if (key === 'Comfy.Workflow.DraftOrder') return '[]'
|
||||
return null
|
||||
})
|
||||
|
||||
await service.initializeIfNewUser()
|
||||
|
||||
expect(service.isNewUser()).toBe(false)
|
||||
})
|
||||
|
||||
it('should identify existing user when V2 draft index has entries', async () => {
|
||||
mockSettingStore.settingValues = {}
|
||||
mockSettingStore.get.mockReturnValue(undefined)
|
||||
mockLocalStorage.getItem.mockImplementation((key: string) => {
|
||||
if (key === 'Comfy.Workflow.DraftIndex.v2:personal')
|
||||
return '{"v":2,"updatedAt":1,"order":["abc"],"entries":{"abc":{"path":"workflows/Untitled.json","name":"Untitled","isTemporary":true,"updatedAt":1}}}'
|
||||
return null
|
||||
})
|
||||
|
||||
await service.initializeIfNewUser()
|
||||
|
||||
expect(service.isNewUser()).toBe(false)
|
||||
})
|
||||
|
||||
it('should identify new user when V2 draft index exists but is empty', async () => {
|
||||
mockSettingStore.settingValues = {}
|
||||
mockSettingStore.get.mockReturnValue(undefined)
|
||||
mockLocalStorage.getItem.mockImplementation((key: string) => {
|
||||
if (key === 'Comfy.Workflow.DraftIndex.v2:personal')
|
||||
return '{"v":2,"updatedAt":1,"order":[],"entries":{}}'
|
||||
return null
|
||||
})
|
||||
|
||||
await service.initializeIfNewUser()
|
||||
|
||||
expect(service.isNewUser()).toBe(true)
|
||||
})
|
||||
|
||||
it('should identify new user when V2 draft index is malformed', async () => {
|
||||
mockSettingStore.settingValues = {}
|
||||
mockSettingStore.get.mockReturnValue(undefined)
|
||||
mockLocalStorage.getItem.mockImplementation((key: string) => {
|
||||
if (key === 'Comfy.Workflow.DraftIndex.v2:personal') return 'not json'
|
||||
return null
|
||||
})
|
||||
|
||||
await service.initializeIfNewUser()
|
||||
|
||||
expect(service.isNewUser()).toBe(true)
|
||||
})
|
||||
|
||||
it('should identify new user when tutorial is explicitly false', async () => {
|
||||
mockSettingStore.settingValues = { 'Comfy.TutorialCompleted': false }
|
||||
mockSettingStore.get.mockImplementation((key: string) => {
|
||||
|
||||
@@ -2,24 +2,6 @@ import { ref, shallowRef } from 'vue'
|
||||
import { createSharedComposable } from '@vueuse/core'
|
||||
import { useSettingStore } from '@/platform/settings/settingStore'
|
||||
|
||||
function hasV2DraftHistory(raw: string | null): boolean {
|
||||
if (!raw) return false
|
||||
try {
|
||||
const parsed = JSON.parse(raw) as {
|
||||
order?: unknown
|
||||
entries?: unknown
|
||||
}
|
||||
const orderLength = Array.isArray(parsed.order) ? parsed.order.length : 0
|
||||
const entriesCount =
|
||||
parsed.entries && typeof parsed.entries === 'object'
|
||||
? Object.keys(parsed.entries as Record<string, unknown>).length
|
||||
: 0
|
||||
return orderLength > 0 || entriesCount > 0
|
||||
} catch {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
function _useNewUserService() {
|
||||
const settingStore = useSettingStore()
|
||||
const pendingCallbacks = shallowRef<Array<() => Promise<void>>>([])
|
||||
@@ -36,32 +18,12 @@ function _useNewUserService() {
|
||||
const isNewUserSettings =
|
||||
Object.keys(settingStore.settingValues).length === 0 ||
|
||||
!settingStore.get('Comfy.TutorialCompleted')
|
||||
|
||||
// Legacy keys (pre-V1 and V1 persistence)
|
||||
const hasNoLegacyWorkflow =
|
||||
!localStorage.getItem('workflow') &&
|
||||
!localStorage.getItem('Comfy.PreviousWorkflow')
|
||||
|
||||
// V1 draft store keys
|
||||
const hasNoV1Drafts =
|
||||
!localStorage.getItem('Comfy.Workflow.Drafts') &&
|
||||
!localStorage.getItem('Comfy.Workflow.DraftOrder')
|
||||
|
||||
// V2 draft index key (scoped to personal workspace; cloud workspace id
|
||||
// comes from sessionStorage which may not be set yet at this point).
|
||||
// Check for actual draft history rather than key existence: an empty
|
||||
// index is written by `migrateV1toV2()` for genuine new users during
|
||||
// startup, so key presence alone is not evidence of prior usage.
|
||||
const hasNoV2DraftIndex = !hasV2DraftHistory(
|
||||
localStorage.getItem('Comfy.Workflow.DraftIndex.v2:personal')
|
||||
const hasNoWorkflow = !localStorage.getItem('workflow')
|
||||
const hasNoPreviousWorkflow = !localStorage.getItem(
|
||||
'Comfy.PreviousWorkflow'
|
||||
)
|
||||
|
||||
return (
|
||||
isNewUserSettings &&
|
||||
hasNoLegacyWorkflow &&
|
||||
hasNoV1Drafts &&
|
||||
hasNoV2DraftIndex
|
||||
)
|
||||
return isNewUserSettings && hasNoWorkflow && hasNoPreviousWorkflow
|
||||
}
|
||||
|
||||
async function registerInitCallback(callback: () => Promise<void>) {
|
||||
|
||||
@@ -11,12 +11,22 @@ const {
|
||||
mockNodeExecutionIdToNodeLocatorId,
|
||||
mockNodeIdToNodeLocatorId,
|
||||
mockNodeLocatorIdToNodeExecutionId,
|
||||
mockShowTextPreview
|
||||
mockShowTextPreview,
|
||||
mockActiveWorkflow,
|
||||
mockRevokePreviewsByExecutionId
|
||||
} = vi.hoisted(() => ({
|
||||
mockNodeExecutionIdToNodeLocatorId: vi.fn(),
|
||||
mockNodeIdToNodeLocatorId: vi.fn(),
|
||||
mockNodeLocatorIdToNodeExecutionId: vi.fn(),
|
||||
mockShowTextPreview: vi.fn()
|
||||
mockShowTextPreview: vi.fn(),
|
||||
mockActiveWorkflow: {
|
||||
current: null as null | {
|
||||
activeState?: { id?: string }
|
||||
initialState?: { id?: string }
|
||||
path?: string
|
||||
}
|
||||
},
|
||||
mockRevokePreviewsByExecutionId: vi.fn()
|
||||
}))
|
||||
|
||||
import type * as WorkflowStoreModule from '@/platform/workflow/management/stores/workflowStore'
|
||||
@@ -35,7 +45,10 @@ vi.mock('@/platform/workflow/management/stores/workflowStore', async () => {
|
||||
useWorkflowStore: vi.fn(() => ({
|
||||
nodeExecutionIdToNodeLocatorId: mockNodeExecutionIdToNodeLocatorId,
|
||||
nodeIdToNodeLocatorId: mockNodeIdToNodeLocatorId,
|
||||
nodeLocatorIdToNodeExecutionId: mockNodeLocatorIdToNodeExecutionId
|
||||
nodeLocatorIdToNodeExecutionId: mockNodeLocatorIdToNodeExecutionId,
|
||||
get activeWorkflow() {
|
||||
return mockActiveWorkflow.current
|
||||
}
|
||||
}))
|
||||
}
|
||||
})
|
||||
@@ -70,9 +83,9 @@ vi.mock('@/scripts/api', () => ({
|
||||
}
|
||||
}))
|
||||
|
||||
vi.mock('@/stores/imagePreviewStore', () => ({
|
||||
vi.mock('@/stores/nodeOutputStore', () => ({
|
||||
useNodeOutputStore: () => ({
|
||||
revokePreviewsByExecutionId: vi.fn()
|
||||
revokePreviewsByExecutionId: mockRevokePreviewsByExecutionId
|
||||
})
|
||||
}))
|
||||
|
||||
@@ -440,6 +453,599 @@ describe('useExecutionStore - reconcileInitializingJobs', () => {
|
||||
})
|
||||
})
|
||||
|
||||
describe('useExecutionStore - active workflow gating of progress mirror', () => {
|
||||
let store: ReturnType<typeof useExecutionStore>
|
||||
|
||||
function makeProgressNodes(
|
||||
nodeId: string,
|
||||
jobId: string
|
||||
): Record<string, NodeProgressState> {
|
||||
return {
|
||||
[nodeId]: {
|
||||
value: 5,
|
||||
max: 10,
|
||||
state: 'running',
|
||||
node_id: nodeId,
|
||||
prompt_id: jobId,
|
||||
display_node_id: nodeId
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function fireProgressState(
|
||||
jobId: string,
|
||||
nodes: Record<string, NodeProgressState>,
|
||||
workflowId?: string
|
||||
) {
|
||||
const handler = apiEventHandlers.get('progress_state')
|
||||
if (!handler) throw new Error('progress_state handler not bound')
|
||||
handler(
|
||||
new CustomEvent('progress_state', {
|
||||
detail: { nodes, prompt_id: jobId, workflow_id: workflowId }
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
apiEventHandlers.clear()
|
||||
mockActiveWorkflow.current = null
|
||||
setActivePinia(createTestingPinia({ stubActions: false }))
|
||||
store = useExecutionStore()
|
||||
store.bindExecutionEvents()
|
||||
})
|
||||
|
||||
it('updates per-job progress regardless of active workflow', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
|
||||
fireProgressState(
|
||||
'job-other',
|
||||
makeProgressNodes('1', 'job-other'),
|
||||
'wf-other'
|
||||
)
|
||||
|
||||
expect(store.nodeProgressStatesByJob).toHaveProperty('job-other')
|
||||
})
|
||||
|
||||
it('skips preview revocation for non-active workflow messages', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
mockRevokePreviewsByExecutionId.mockClear()
|
||||
|
||||
fireProgressState(
|
||||
'job-other',
|
||||
makeProgressNodes('1', 'job-other'),
|
||||
'wf-other'
|
||||
)
|
||||
|
||||
expect(mockRevokePreviewsByExecutionId).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('revokes previews for active workflow messages', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
mockRevokePreviewsByExecutionId.mockClear()
|
||||
|
||||
fireProgressState('job-1', makeProgressNodes('1', 'job-1'), 'wf-active')
|
||||
|
||||
expect(mockRevokePreviewsByExecutionId).toHaveBeenCalledWith('1')
|
||||
})
|
||||
|
||||
it('skips global mirror when message workflow_id mismatches active workflow', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
|
||||
fireProgressState(
|
||||
'job-other',
|
||||
makeProgressNodes('1', 'job-other'),
|
||||
'wf-other'
|
||||
)
|
||||
|
||||
expect(store.nodeProgressStates).toEqual({})
|
||||
})
|
||||
|
||||
it('updates global mirror when message workflow_id matches active workflow', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
|
||||
fireProgressState('job-1', makeProgressNodes('1', 'job-1'), 'wf-active')
|
||||
|
||||
expect(store.nodeProgressStates).toEqual(makeProgressNodes('1', 'job-1'))
|
||||
})
|
||||
|
||||
it('falls back to jobIdToWorkflowId mapping when message has no workflow_id', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
store.registerJobWorkflowIdMapping('job-other', 'wf-other')
|
||||
|
||||
fireProgressState('job-other', makeProgressNodes('1', 'job-other'))
|
||||
|
||||
expect(store.nodeProgressStates).toEqual({})
|
||||
})
|
||||
|
||||
it('falls back to session path mapping when message has no workflow_id and no id mapping', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
store.ensureSessionWorkflowPath('job-other', '/wf-other.json')
|
||||
|
||||
fireProgressState('job-other', makeProgressNodes('1', 'job-other'))
|
||||
|
||||
expect(store.nodeProgressStates).toEqual({})
|
||||
})
|
||||
|
||||
it('updates mirror when no resolution is available (preserves single-tab behaviour)', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
|
||||
fireProgressState('job-unknown', makeProgressNodes('1', 'job-unknown'))
|
||||
|
||||
expect(store.nodeProgressStates).toEqual(
|
||||
makeProgressNodes('1', 'job-unknown')
|
||||
)
|
||||
})
|
||||
|
||||
it('updates mirror when there is no active workflow', () => {
|
||||
mockActiveWorkflow.current = null
|
||||
|
||||
fireProgressState('job-1', makeProgressNodes('1', 'job-1'), 'wf-1')
|
||||
|
||||
expect(store.nodeProgressStates).toEqual(makeProgressNodes('1', 'job-1'))
|
||||
})
|
||||
|
||||
it('skips _executingNodeProgress on workflow_id mismatch', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
|
||||
const handler = apiEventHandlers.get('progress')
|
||||
if (!handler) throw new Error('progress handler not bound')
|
||||
handler(
|
||||
new CustomEvent('progress', {
|
||||
detail: {
|
||||
value: 5,
|
||||
max: 10,
|
||||
prompt_id: 'job-other',
|
||||
node: '1',
|
||||
workflow_id: 'wf-other'
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
expect(store._executingNodeProgress).toBeNull()
|
||||
})
|
||||
|
||||
it('updates _executingNodeProgress on workflow_id match', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
|
||||
const handler = apiEventHandlers.get('progress')
|
||||
if (!handler) throw new Error('progress handler not bound')
|
||||
handler(
|
||||
new CustomEvent('progress', {
|
||||
detail: {
|
||||
value: 7,
|
||||
max: 10,
|
||||
prompt_id: 'job-1',
|
||||
node: '1',
|
||||
workflow_id: 'wf-active'
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
expect(store._executingNodeProgress).toEqual({
|
||||
value: 7,
|
||||
max: 10,
|
||||
prompt_id: 'job-1',
|
||||
node: '1',
|
||||
workflow_id: 'wf-active'
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe('useExecutionStore - reconcileMirrorForActiveWorkflow', () => {
|
||||
let store: ReturnType<typeof useExecutionStore>
|
||||
|
||||
function makeProgressNodes(
|
||||
nodeId: string,
|
||||
jobId: string
|
||||
): Record<string, NodeProgressState> {
|
||||
return {
|
||||
[nodeId]: {
|
||||
value: 5,
|
||||
max: 10,
|
||||
state: 'running',
|
||||
node_id: nodeId,
|
||||
prompt_id: jobId,
|
||||
display_node_id: nodeId
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function fireProgressState(
|
||||
jobId: string,
|
||||
nodes: Record<string, NodeProgressState>,
|
||||
workflowId?: string
|
||||
) {
|
||||
const handler = apiEventHandlers.get('progress_state')
|
||||
if (!handler) throw new Error('progress_state handler not bound')
|
||||
handler(
|
||||
new CustomEvent('progress_state', {
|
||||
detail: { nodes, prompt_id: jobId, workflow_id: workflowId }
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
apiEventHandlers.clear()
|
||||
mockActiveWorkflow.current = null
|
||||
setActivePinia(createTestingPinia({ stubActions: false }))
|
||||
store = useExecutionStore()
|
||||
store.bindExecutionEvents()
|
||||
})
|
||||
|
||||
it('rebuilds the mirror from the active workflow job on tab switch', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-a' },
|
||||
path: '/wf-a.json'
|
||||
}
|
||||
fireProgressState('job-a', makeProgressNodes('1', 'job-a'), 'wf-a')
|
||||
store.registerJobWorkflowIdMapping('job-a', 'wf-a')
|
||||
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-b' },
|
||||
path: '/wf-b.json'
|
||||
}
|
||||
fireProgressState('job-b', makeProgressNodes('2', 'job-b'), 'wf-b')
|
||||
store.registerJobWorkflowIdMapping('job-b', 'wf-b')
|
||||
|
||||
expect(store.nodeProgressStates).toEqual(makeProgressNodes('2', 'job-b'))
|
||||
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-a' },
|
||||
path: '/wf-a.json'
|
||||
}
|
||||
store.reconcileMirrorForActiveWorkflow()
|
||||
|
||||
expect(store.nodeProgressStates).toEqual(makeProgressNodes('1', 'job-a'))
|
||||
})
|
||||
|
||||
it('clears the mirror when the active workflow has no matching job', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-a' },
|
||||
path: '/wf-a.json'
|
||||
}
|
||||
fireProgressState('job-a', makeProgressNodes('1', 'job-a'), 'wf-a')
|
||||
store.registerJobWorkflowIdMapping('job-a', 'wf-a')
|
||||
|
||||
expect(store.nodeProgressStates).toEqual(makeProgressNodes('1', 'job-a'))
|
||||
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-empty' },
|
||||
path: '/wf-empty.json'
|
||||
}
|
||||
store.reconcileMirrorForActiveWorkflow()
|
||||
|
||||
expect(store.nodeProgressStates).toEqual({})
|
||||
})
|
||||
|
||||
it('clears _executingNodeProgress that belonged to a different job', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-a' },
|
||||
path: '/wf-a.json'
|
||||
}
|
||||
fireProgressState('job-a', makeProgressNodes('1', 'job-a'), 'wf-a')
|
||||
store.registerJobWorkflowIdMapping('job-a', 'wf-a')
|
||||
|
||||
const progressHandler = apiEventHandlers.get('progress')
|
||||
if (!progressHandler) throw new Error('progress handler not bound')
|
||||
progressHandler(
|
||||
new CustomEvent('progress', {
|
||||
detail: {
|
||||
value: 5,
|
||||
max: 10,
|
||||
prompt_id: 'job-a',
|
||||
node: '1',
|
||||
workflow_id: 'wf-a'
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-empty' },
|
||||
path: '/wf-empty.json'
|
||||
}
|
||||
store.reconcileMirrorForActiveWorkflow()
|
||||
|
||||
expect(store._executingNodeProgress).toBeNull()
|
||||
})
|
||||
|
||||
it('falls back to session path mapping when workflow id is not registered', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-a' },
|
||||
path: '/wf-a.json'
|
||||
}
|
||||
fireProgressState('job-a', makeProgressNodes('1', 'job-a'), 'wf-a')
|
||||
store.ensureSessionWorkflowPath('job-a', '/wf-a.json')
|
||||
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-b' },
|
||||
path: '/wf-b.json'
|
||||
}
|
||||
fireProgressState('job-b', makeProgressNodes('2', 'job-b'), 'wf-b')
|
||||
store.ensureSessionWorkflowPath('job-b', '/wf-b.json')
|
||||
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-a' },
|
||||
path: '/wf-a.json'
|
||||
}
|
||||
store.reconcileMirrorForActiveWorkflow()
|
||||
|
||||
expect(store.nodeProgressStates).toEqual(makeProgressNodes('1', 'job-a'))
|
||||
})
|
||||
})
|
||||
|
||||
describe('useExecutionStore - reconcileTerminalJobs', () => {
|
||||
let store: ReturnType<typeof useExecutionStore>
|
||||
|
||||
function makeProgressNodes(
|
||||
nodeId: string,
|
||||
jobId: string
|
||||
): Record<string, NodeProgressState> {
|
||||
return {
|
||||
[nodeId]: {
|
||||
value: 5,
|
||||
max: 10,
|
||||
state: 'running',
|
||||
node_id: nodeId,
|
||||
prompt_id: jobId,
|
||||
display_node_id: nodeId
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function fireProgressState(
|
||||
jobId: string,
|
||||
nodes: Record<string, NodeProgressState>
|
||||
) {
|
||||
const handler = apiEventHandlers.get('progress_state')
|
||||
if (!handler) throw new Error('progress_state handler not bound')
|
||||
handler(
|
||||
new CustomEvent('progress_state', { detail: { nodes, prompt_id: jobId } })
|
||||
)
|
||||
}
|
||||
|
||||
function fireExecutionStart(jobId: string) {
|
||||
const handler = apiEventHandlers.get('execution_start')
|
||||
if (!handler) throw new Error('execution_start handler not bound')
|
||||
handler(
|
||||
new CustomEvent('execution_start', { detail: { prompt_id: jobId } })
|
||||
)
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
apiEventHandlers.clear()
|
||||
setActivePinia(createTestingPinia({ stubActions: false }))
|
||||
store = useExecutionStore()
|
||||
store.bindExecutionEvents()
|
||||
})
|
||||
|
||||
it('evicts a non-active terminal job without disturbing the active job', () => {
|
||||
fireExecutionStart('job-old')
|
||||
fireProgressState('job-old', makeProgressNodes('1', 'job-old'))
|
||||
|
||||
fireExecutionStart('job-new')
|
||||
fireProgressState('job-new', makeProgressNodes('2', 'job-new'))
|
||||
|
||||
expect(store.activeJobId).toBe('job-new')
|
||||
expect(store.nodeProgressStatesByJob).toHaveProperty('job-old')
|
||||
expect(store.nodeProgressStatesByJob).toHaveProperty('job-new')
|
||||
|
||||
store.reconcileTerminalJobs(new Set(['job-new']), new Set(['job-old']))
|
||||
|
||||
expect(store.nodeProgressStatesByJob).not.toHaveProperty('job-old')
|
||||
expect(store.nodeProgressStatesByJob).toHaveProperty('job-new')
|
||||
expect(store.activeJobId).toBe('job-new')
|
||||
expect(store.nodeProgressStates).toEqual(makeProgressNodes('2', 'job-new'))
|
||||
})
|
||||
|
||||
it('evicts an active terminal job and clears global mirror', () => {
|
||||
fireExecutionStart('job-1')
|
||||
fireProgressState('job-1', makeProgressNodes('1', 'job-1'))
|
||||
|
||||
expect(store.activeJobId).toBe('job-1')
|
||||
expect(Object.keys(store.nodeProgressStates)).toHaveLength(1)
|
||||
|
||||
store.reconcileTerminalJobs(new Set(), new Set(['job-1']))
|
||||
|
||||
expect(store.nodeProgressStatesByJob).not.toHaveProperty('job-1')
|
||||
expect(store.activeJobId).toBeNull()
|
||||
expect(store.nodeProgressStates).toEqual({})
|
||||
})
|
||||
|
||||
it('clears stale global mirror when its owner job becomes terminal', () => {
|
||||
fireExecutionStart('job-old')
|
||||
fireProgressState('job-old', makeProgressNodes('1', 'job-old'))
|
||||
|
||||
fireExecutionStart('job-new')
|
||||
expect(store.activeJobId).toBe('job-new')
|
||||
expect(store.nodeProgressStates['1']?.prompt_id).toBe('job-old')
|
||||
|
||||
store.reconcileTerminalJobs(new Set(['job-new']), new Set(['job-old']))
|
||||
|
||||
expect(store.nodeProgressStates).toEqual({})
|
||||
expect(store.activeJobId).toBe('job-new')
|
||||
expect(store.nodeProgressStatesByJob).not.toHaveProperty('job-old')
|
||||
})
|
||||
|
||||
it('skips jobs that are still active even if also in terminal set', () => {
|
||||
fireExecutionStart('job-1')
|
||||
fireProgressState('job-1', makeProgressNodes('1', 'job-1'))
|
||||
|
||||
store.reconcileTerminalJobs(new Set(['job-1']), new Set(['job-1']))
|
||||
|
||||
expect(store.nodeProgressStatesByJob).toHaveProperty('job-1')
|
||||
expect(store.activeJobId).toBe('job-1')
|
||||
})
|
||||
|
||||
it('skips jobs absent from the terminal set', () => {
|
||||
fireExecutionStart('job-1')
|
||||
fireProgressState('job-1', makeProgressNodes('1', 'job-1'))
|
||||
|
||||
store.reconcileTerminalJobs(new Set(), new Set())
|
||||
|
||||
expect(store.nodeProgressStatesByJob).toHaveProperty('job-1')
|
||||
expect(store.activeJobId).toBe('job-1')
|
||||
})
|
||||
|
||||
it('is idempotent for an already-cleared job', () => {
|
||||
store.reconcileTerminalJobs(new Set(), new Set(['job-1']))
|
||||
store.reconcileTerminalJobs(new Set(), new Set(['job-1']))
|
||||
|
||||
expect(store.nodeProgressStatesByJob).not.toHaveProperty('job-1')
|
||||
expect(store.activeJobId).toBeNull()
|
||||
})
|
||||
|
||||
it('evicts initializing-only jobs that landed in history without progress events', () => {
|
||||
store.initializingJobIds = new Set(['job-init'])
|
||||
|
||||
store.reconcileTerminalJobs(new Set(), new Set(['job-init']))
|
||||
|
||||
expect(store.initializingJobIds.has('job-init')).toBe(false)
|
||||
})
|
||||
})
|
||||
|
||||
describe('useExecutionStore - terminal WS handlers do not clobber active job', () => {
|
||||
let store: ReturnType<typeof useExecutionStore>
|
||||
|
||||
function makeProgressNodes(
|
||||
nodeId: string,
|
||||
jobId: string
|
||||
): Record<string, NodeProgressState> {
|
||||
return {
|
||||
[nodeId]: {
|
||||
value: 5,
|
||||
max: 10,
|
||||
state: 'running',
|
||||
node_id: nodeId,
|
||||
prompt_id: jobId,
|
||||
display_node_id: nodeId
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function fireProgressState(
|
||||
jobId: string,
|
||||
nodes: Record<string, NodeProgressState>,
|
||||
workflowId?: string
|
||||
) {
|
||||
const handler = apiEventHandlers.get('progress_state')
|
||||
if (!handler) throw new Error('progress_state handler not bound')
|
||||
handler(
|
||||
new CustomEvent('progress_state', {
|
||||
detail: { nodes, prompt_id: jobId, workflow_id: workflowId }
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
function fireExecutionStart(jobId: string) {
|
||||
const handler = apiEventHandlers.get('execution_start')
|
||||
if (!handler) throw new Error('execution_start handler not bound')
|
||||
handler(
|
||||
new CustomEvent('execution_start', { detail: { prompt_id: jobId } })
|
||||
)
|
||||
}
|
||||
|
||||
function fireExecutionSuccess(jobId: string) {
|
||||
const handler = apiEventHandlers.get('execution_success')
|
||||
if (!handler) throw new Error('execution_success handler not bound')
|
||||
handler(
|
||||
new CustomEvent('execution_success', { detail: { prompt_id: jobId } })
|
||||
)
|
||||
}
|
||||
|
||||
function fireExecutionInterrupted(jobId: string) {
|
||||
const handler = apiEventHandlers.get('execution_interrupted')
|
||||
if (!handler) throw new Error('execution_interrupted handler not bound')
|
||||
handler(
|
||||
new CustomEvent('execution_interrupted', {
|
||||
detail: { prompt_id: jobId, node_id: '1', node_type: 'X', executed: [] }
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
apiEventHandlers.clear()
|
||||
mockActiveWorkflow.current = null
|
||||
setActivePinia(createTestingPinia({ stubActions: false }))
|
||||
store = useExecutionStore()
|
||||
store.bindExecutionEvents()
|
||||
})
|
||||
|
||||
it('execution_success for a non-active job does not clobber the active job mirror', () => {
|
||||
fireExecutionStart('job-old')
|
||||
fireProgressState('job-old', makeProgressNodes('1', 'job-old'))
|
||||
|
||||
fireExecutionStart('job-new')
|
||||
fireProgressState('job-new', makeProgressNodes('2', 'job-new'))
|
||||
|
||||
expect(store.activeJobId).toBe('job-new')
|
||||
|
||||
fireExecutionSuccess('job-old')
|
||||
|
||||
expect(store.activeJobId).toBe('job-new')
|
||||
expect(store.nodeProgressStates).toEqual(makeProgressNodes('2', 'job-new'))
|
||||
expect(store.nodeProgressStatesByJob).not.toHaveProperty('job-old')
|
||||
expect(store.nodeProgressStatesByJob).toHaveProperty('job-new')
|
||||
})
|
||||
|
||||
it('execution_interrupted for a non-active job does not clobber the active job', () => {
|
||||
fireExecutionStart('job-old')
|
||||
fireProgressState('job-old', makeProgressNodes('1', 'job-old'))
|
||||
|
||||
fireExecutionStart('job-new')
|
||||
fireProgressState('job-new', makeProgressNodes('2', 'job-new'))
|
||||
|
||||
fireExecutionInterrupted('job-old')
|
||||
|
||||
expect(store.activeJobId).toBe('job-new')
|
||||
expect(store.nodeProgressStates).toEqual(makeProgressNodes('2', 'job-new'))
|
||||
expect(store.nodeProgressStatesByJob).not.toHaveProperty('job-old')
|
||||
})
|
||||
|
||||
it('execution_success for the active job clears the global mirror and activeJobId', () => {
|
||||
fireExecutionStart('job-1')
|
||||
fireProgressState('job-1', makeProgressNodes('1', 'job-1'))
|
||||
|
||||
fireExecutionSuccess('job-1')
|
||||
|
||||
expect(store.activeJobId).toBeNull()
|
||||
expect(store.nodeProgressStates).toEqual({})
|
||||
})
|
||||
})
|
||||
|
||||
describe('useExecutionStore - clearActiveJobIfStale', () => {
|
||||
let store: ReturnType<typeof useExecutionStore>
|
||||
|
||||
@@ -498,6 +1104,7 @@ describe('useExecutionStore - progress_text startup guard', () => {
|
||||
nodeId: string
|
||||
text: string
|
||||
prompt_id?: string
|
||||
workflow_id?: string
|
||||
}) {
|
||||
const handler = apiEventHandlers.get('progress_text')
|
||||
if (!handler) throw new Error('progress_text handler not bound')
|
||||
@@ -507,6 +1114,7 @@ describe('useExecutionStore - progress_text startup guard', () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
apiEventHandlers.clear()
|
||||
mockActiveWorkflow.current = null
|
||||
setActivePinia(createTestingPinia({ stubActions: false }))
|
||||
store = useExecutionStore()
|
||||
store.bindExecutionEvents()
|
||||
@@ -539,6 +1147,50 @@ describe('useExecutionStore - progress_text startup guard', () => {
|
||||
|
||||
expect(mockShowTextPreview).toHaveBeenCalledWith(mockNode, 'warming up')
|
||||
})
|
||||
|
||||
it('should skip progress_text whose workflow_id mismatches active workflow', async () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
const mockNode = createMockLGraphNode({ id: 1 })
|
||||
const { useCanvasStore } =
|
||||
await import('@/renderer/core/canvas/canvasStore')
|
||||
useCanvasStore().canvas = {
|
||||
graph: { getNodeById: vi.fn(() => mockNode) }
|
||||
} as unknown as LGraphCanvas
|
||||
|
||||
fireProgressText({
|
||||
nodeId: '1',
|
||||
text: 'warming up',
|
||||
prompt_id: 'job-other',
|
||||
workflow_id: 'wf-other'
|
||||
})
|
||||
|
||||
expect(mockShowTextPreview).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('should call showTextPreview when workflow_id matches active workflow', async () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
const mockNode = createMockLGraphNode({ id: 1 })
|
||||
const { useCanvasStore } =
|
||||
await import('@/renderer/core/canvas/canvasStore')
|
||||
useCanvasStore().canvas = {
|
||||
graph: { getNodeById: vi.fn(() => mockNode) }
|
||||
} as unknown as LGraphCanvas
|
||||
|
||||
fireProgressText({
|
||||
nodeId: '1',
|
||||
text: 'warming up',
|
||||
prompt_id: 'job-1',
|
||||
workflow_id: 'wf-active'
|
||||
})
|
||||
|
||||
expect(mockShowTextPreview).toHaveBeenCalledWith(mockNode, 'warming up')
|
||||
})
|
||||
})
|
||||
|
||||
describe('useExecutionErrorStore - Node Error Lookups', () => {
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { defineStore } from 'pinia'
|
||||
import { computed, ref, shallowRef } from 'vue'
|
||||
import { computed, ref, shallowRef, watch } from 'vue'
|
||||
|
||||
import { useNodeProgressText } from '@/composables/node/useNodeProgressText'
|
||||
import { isCloud } from '@/platform/distribution/types'
|
||||
@@ -273,6 +273,10 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
) {
|
||||
const jobId = e.detail.prompt_id
|
||||
if (activeJobId.value) clearInitializationByJobId(activeJobId.value)
|
||||
if (jobId !== activeJobId.value) {
|
||||
evictTerminalJob(jobId)
|
||||
return
|
||||
}
|
||||
resetExecutionState(jobId)
|
||||
}
|
||||
|
||||
@@ -288,6 +292,10 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
})
|
||||
}
|
||||
const jobId = e.detail.prompt_id
|
||||
if (jobId !== activeJobId.value) {
|
||||
evictTerminalJob(jobId)
|
||||
return
|
||||
}
|
||||
resetExecutionState(jobId)
|
||||
}
|
||||
|
||||
@@ -335,43 +343,146 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
}
|
||||
|
||||
function handleProgressState(e: CustomEvent<ProgressStateWsMessage>) {
|
||||
const { nodes, prompt_id: jobId } = e.detail
|
||||
const { nodes, prompt_id: jobId, workflow_id: messageWorkflowId } = e.detail
|
||||
const isActiveWorkflowMessage = messageMatchesActiveWorkflow(
|
||||
jobId,
|
||||
messageWorkflowId
|
||||
)
|
||||
|
||||
// Revoke previews for nodes that are starting to execute
|
||||
const previousForJob = nodeProgressStatesByJob.value[jobId] || {}
|
||||
for (const nodeId in nodes) {
|
||||
const nodeState = nodes[nodeId]
|
||||
if (nodeState.state === 'running' && !previousForJob[nodeId]) {
|
||||
// This node just started executing, revoke its previews
|
||||
// Note that we're doing the *actual* node id instead of the display node id
|
||||
// here intentionally. That way, we don't clear the preview every time a new node
|
||||
// within an expanded graph starts executing.
|
||||
const { revokePreviewsByExecutionId } = useNodeOutputStore()
|
||||
revokePreviewsByExecutionId(nodeId)
|
||||
if (isActiveWorkflowMessage) {
|
||||
const { revokePreviewsByExecutionId } = useNodeOutputStore()
|
||||
for (const nodeId in nodes) {
|
||||
const nodeState = nodes[nodeId]
|
||||
if (nodeState.state === 'running' && !previousForJob[nodeId]) {
|
||||
revokePreviewsByExecutionId(nodeId)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update the progress states for all nodes
|
||||
nodeProgressStatesByJob.value = {
|
||||
...nodeProgressStatesByJob.value,
|
||||
[jobId]: nodes
|
||||
}
|
||||
evictOldProgressJobs()
|
||||
nodeProgressStates.value = nodes
|
||||
|
||||
// If we have progress for the currently executing node, update it for backwards compatibility
|
||||
if (executingNodeId.value && nodes[executingNodeId.value]) {
|
||||
const nodeState = nodes[executingNodeId.value]
|
||||
_executingNodeProgress.value = {
|
||||
value: nodeState.value,
|
||||
max: nodeState.max,
|
||||
prompt_id: nodeState.prompt_id,
|
||||
node: nodeState.display_node_id || nodeState.node_id
|
||||
if (isActiveWorkflowMessage) {
|
||||
nodeProgressStates.value = nodes
|
||||
|
||||
if (executingNodeId.value && nodes[executingNodeId.value]) {
|
||||
const nodeState = nodes[executingNodeId.value]
|
||||
_executingNodeProgress.value = {
|
||||
value: nodeState.value,
|
||||
max: nodeState.max,
|
||||
prompt_id: nodeState.prompt_id,
|
||||
node: nodeState.display_node_id || nodeState.node_id
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines whether a WebSocket execution message belongs to the
|
||||
* currently active workflow tab. Used to gate writes to the global
|
||||
* "current execution" mirror so a job initiated from another open
|
||||
* workflow cannot leak its progress into the active one.
|
||||
*
|
||||
* Resolution order:
|
||||
* 1. `workflow_id` carried on the WS message (when backend supports it).
|
||||
* 2. {@link jobIdToWorkflowId} mapping populated when the job was queued
|
||||
* from this tab.
|
||||
* 3. {@link jobIdToSessionWorkflowPath} mapping (path-based fallback).
|
||||
*
|
||||
* When the workflow cannot be resolved at all (e.g. job queued in a
|
||||
* different browser session), the message is treated as belonging to
|
||||
* the active workflow to preserve current behaviour for the existing
|
||||
* single-tab common case.
|
||||
*/
|
||||
function messageMatchesActiveWorkflow(
|
||||
jobId: JobId,
|
||||
messageWorkflowId: string | undefined
|
||||
): boolean {
|
||||
const activeWorkflow = workflowStore.activeWorkflow
|
||||
if (!activeWorkflow) return true
|
||||
|
||||
const activeId =
|
||||
activeWorkflow.activeState?.id ?? activeWorkflow.initialState?.id ?? null
|
||||
|
||||
if (messageWorkflowId && activeId) {
|
||||
return messageWorkflowId === activeId
|
||||
}
|
||||
|
||||
const mappedId = jobIdToWorkflowId.value.get(jobId)
|
||||
if (mappedId && activeId) return mappedId === activeId
|
||||
|
||||
const mappedPath = jobIdToSessionWorkflowPath.value.get(jobId)
|
||||
if (mappedPath && activeWorkflow.path) {
|
||||
return mappedPath === activeWorkflow.path
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
/**
|
||||
* Rebuilds the global progress mirror to match the currently active
|
||||
* workflow tab. Called when the user switches tabs so stale progress
|
||||
* from the previously active workflow does not bleed into the new one.
|
||||
*
|
||||
* Picks the most recent job whose mapping resolves to the active
|
||||
* workflow and replays its `nodeProgressStatesByJob` entry into the
|
||||
* mirror; clears the mirror entirely when no such job exists.
|
||||
*/
|
||||
function reconcileMirrorForActiveWorkflow() {
|
||||
const activeWorkflow = workflowStore.activeWorkflow
|
||||
if (!activeWorkflow) return
|
||||
|
||||
const activeId =
|
||||
activeWorkflow.activeState?.id ?? activeWorkflow.initialState?.id ?? null
|
||||
const activePath = activeWorkflow.path ?? null
|
||||
|
||||
const jobIds = Object.keys(nodeProgressStatesByJob.value)
|
||||
let matchedJobId: JobId | null = null
|
||||
for (let i = jobIds.length - 1; i >= 0; i--) {
|
||||
const jobId = jobIds[i]
|
||||
const mappedId = jobIdToWorkflowId.value.get(jobId)
|
||||
const mappedPath = jobIdToSessionWorkflowPath.value.get(jobId)
|
||||
const idMatch = activeId !== null && mappedId === activeId
|
||||
const pathMatch = activePath !== null && mappedPath === activePath
|
||||
if (idMatch || pathMatch) {
|
||||
matchedJobId = jobId
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if (matchedJobId) {
|
||||
const nodes = nodeProgressStatesByJob.value[matchedJobId] ?? {}
|
||||
nodeProgressStates.value = nodes
|
||||
executionIdToLocatorCache.clear()
|
||||
if (
|
||||
_executingNodeProgress.value &&
|
||||
_executingNodeProgress.value.prompt_id !== matchedJobId
|
||||
) {
|
||||
_executingNodeProgress.value = null
|
||||
}
|
||||
} else {
|
||||
if (Object.keys(nodeProgressStates.value).length > 0) {
|
||||
nodeProgressStates.value = {}
|
||||
executionIdToLocatorCache.clear()
|
||||
}
|
||||
_executingNodeProgress.value = null
|
||||
}
|
||||
}
|
||||
|
||||
watch(
|
||||
() => workflowStore.activeWorkflow,
|
||||
() => {
|
||||
reconcileMirrorForActiveWorkflow()
|
||||
}
|
||||
)
|
||||
|
||||
function handleProgress(e: CustomEvent<ProgressWsMessage>) {
|
||||
const { prompt_id: jobId, workflow_id: messageWorkflowId } = e.detail
|
||||
if (!messageMatchesActiveWorkflow(jobId, messageWorkflowId)) return
|
||||
_executingNodeProgress.value = e.detail
|
||||
}
|
||||
|
||||
@@ -384,6 +495,20 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Routes a terminal cleanup to the correct primitive: `evictTerminalJob`
|
||||
* for non-active jobs (safe for any jobId, never clobbers another running
|
||||
* job's mirror) and `resetExecutionState` for the active job (clears the
|
||||
* global mirror that the active job owns).
|
||||
*/
|
||||
function terminateJob(jobId: JobId) {
|
||||
if (jobId !== activeJobId.value) {
|
||||
evictTerminalJob(jobId)
|
||||
return
|
||||
}
|
||||
resetExecutionState(jobId)
|
||||
}
|
||||
|
||||
function handleExecutionError(e: CustomEvent<ExecutionErrorWsMessage>) {
|
||||
if (isCloud) {
|
||||
useTelemetry()?.trackExecutionError({
|
||||
@@ -404,7 +529,7 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
// OSS path / Cloud fallback (real runtime errors)
|
||||
executionErrorStore.lastExecutionError = e.detail
|
||||
clearInitializationByJobId(e.detail.prompt_id)
|
||||
resetExecutionState(e.detail.prompt_id)
|
||||
terminateJob(e.detail.prompt_id)
|
||||
}
|
||||
|
||||
function handleServiceLevelError(detail: ExecutionErrorWsMessage): boolean {
|
||||
@@ -413,7 +538,7 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
return false
|
||||
|
||||
clearInitializationByJobId(detail.prompt_id)
|
||||
resetExecutionState(detail.prompt_id)
|
||||
terminateJob(detail.prompt_id)
|
||||
executionErrorStore.lastPromptError = {
|
||||
type: detail.exception_type ?? 'error',
|
||||
message: detail.exception_type
|
||||
@@ -431,7 +556,7 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
if (!result) return false
|
||||
|
||||
clearInitializationByJobId(detail.prompt_id)
|
||||
resetExecutionState(detail.prompt_id)
|
||||
terminateJob(detail.prompt_id)
|
||||
|
||||
if (result.kind === 'nodeErrors') {
|
||||
executionErrorStore.lastNodeErrors = result.nodeErrors
|
||||
@@ -485,6 +610,103 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
clearInitializationByJobIds(orphaned)
|
||||
}
|
||||
|
||||
/**
|
||||
* Safely evict per-job execution artifacts for a job that has reached a
|
||||
* terminal state, without disturbing state belonging to a different
|
||||
* currently-running job.
|
||||
*
|
||||
* Unlike {@link resetExecutionState}, this is safe to call for any jobId,
|
||||
* including jobs that are not the {@link activeJobId}. It is the polling
|
||||
* fallback for the case where a WebSocket terminal message
|
||||
* (`execution_success` / `execution_error` / `execution_interrupted`) is
|
||||
* dropped and per-job UI state would otherwise remain stuck.
|
||||
*
|
||||
* Behaviour:
|
||||
* - Always removes the job's per-job entries
|
||||
* ({@link nodeProgressStatesByJob}, {@link queuedJobs}, preview).
|
||||
* - Clears the global "current execution" mirror
|
||||
* ({@link nodeProgressStates}, {@link _executingNodeProgress},
|
||||
* {@link activeJobId}) only when those still belong to the evicted job.
|
||||
* - Idempotent: calling for an already-cleared job is a no-op.
|
||||
*/
|
||||
function evictTerminalJob(jobId: JobId) {
|
||||
if (!jobId) return
|
||||
|
||||
const hadProgress = jobId in nodeProgressStatesByJob.value
|
||||
if (hadProgress) {
|
||||
const map = { ...nodeProgressStatesByJob.value }
|
||||
delete map[jobId]
|
||||
nodeProgressStatesByJob.value = map
|
||||
}
|
||||
|
||||
if (jobId in queuedJobs.value) {
|
||||
const next = { ...queuedJobs.value }
|
||||
delete next[jobId]
|
||||
queuedJobs.value = next
|
||||
}
|
||||
|
||||
useJobPreviewStore().clearPreview(jobId)
|
||||
clearInitializationByJobId(jobId)
|
||||
|
||||
const isActive = activeJobId.value === jobId
|
||||
const mirrorBelongsToEvicted = mirrorOwnerJobId() === jobId
|
||||
|
||||
if (isActive || mirrorBelongsToEvicted) {
|
||||
nodeProgressStates.value = {}
|
||||
executionIdToLocatorCache.clear()
|
||||
}
|
||||
|
||||
if (
|
||||
_executingNodeProgress.value &&
|
||||
_executingNodeProgress.value.prompt_id === jobId
|
||||
) {
|
||||
_executingNodeProgress.value = null
|
||||
}
|
||||
|
||||
if (isActive) {
|
||||
activeJobId.value = null
|
||||
executionErrorStore.clearPromptError()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the prompt_id that the global {@link nodeProgressStates} mirror
|
||||
* currently belongs to, or `null` when the mirror is empty.
|
||||
*
|
||||
* The mirror is replaced wholesale on every `progress_state` message, so
|
||||
* all entries within it always share a single prompt_id; reading the
|
||||
* first entry is sufficient.
|
||||
*/
|
||||
function mirrorOwnerJobId(): JobId | null {
|
||||
const first = Object.values(nodeProgressStates.value)[0]
|
||||
return first?.prompt_id ?? null
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconcile per-job progress state against the authoritative job sets from
|
||||
* the backend (running/pending vs. terminal). Used by the queue polling
|
||||
* path to recover from dropped WebSocket terminal messages.
|
||||
*
|
||||
* @param activeJobIds Jobs currently in Running or Pending on the backend.
|
||||
* @param terminalJobIds Jobs in History (completed/failed/cancelled).
|
||||
*/
|
||||
function reconcileTerminalJobs(
|
||||
activeJobIds: Set<JobId>,
|
||||
terminalJobIds: Set<JobId>
|
||||
) {
|
||||
const tracked = new Set<JobId>([
|
||||
...Object.keys(nodeProgressStatesByJob.value),
|
||||
...initializingJobIds.value
|
||||
])
|
||||
if (activeJobId.value) tracked.add(activeJobId.value)
|
||||
|
||||
for (const jobId of tracked) {
|
||||
if (activeJobIds.has(jobId)) continue
|
||||
if (!terminalJobIds.has(jobId)) continue
|
||||
evictTerminalJob(jobId)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Clears the active job if the server's queue snapshot doesn't list it.
|
||||
* Used after WS reconnect to recover from stale state when a job finished
|
||||
@@ -529,14 +751,27 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
}
|
||||
|
||||
function handleProgressText(e: CustomEvent<ProgressTextWsMessage>) {
|
||||
const { nodeId, text, prompt_id } = e.detail
|
||||
const { nodeId, text, prompt_id, workflow_id } = e.detail
|
||||
if (!text || !nodeId) return
|
||||
|
||||
// Filter: only accept progress for the active prompt
|
||||
if (prompt_id && activeJobId.value && prompt_id !== activeJobId.value)
|
||||
return
|
||||
// Prefer the workflow-ownership gate when we have any signal that lets
|
||||
// us resolve it (workflow_id on the message, or a registered mapping).
|
||||
// Only fall back to the legacy active-prompt guard when ownership is
|
||||
// unresolvable, otherwise activeJobId pointing at a different workflow's
|
||||
// job would incorrectly drop messages for the visible workflow.
|
||||
if (prompt_id) {
|
||||
const canResolveWorkflow =
|
||||
Boolean(workflow_id) ||
|
||||
jobIdToWorkflowId.value.has(prompt_id) ||
|
||||
jobIdToSessionWorkflowPath.value.has(prompt_id)
|
||||
|
||||
if (canResolveWorkflow) {
|
||||
if (!messageMatchesActiveWorkflow(prompt_id, workflow_id)) return
|
||||
} else if (activeJobId.value && prompt_id !== activeJobId.value) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Handle execution node IDs for subgraphs
|
||||
const currentId = getNodeIdIfExecuting(nodeId)
|
||||
if (!currentId) return
|
||||
const node = canvasStore.canvas?.graph?.getNodeById(currentId)
|
||||
@@ -653,6 +888,8 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
clearInitializationByJobId,
|
||||
clearInitializationByJobIds,
|
||||
reconcileInitializingJobs,
|
||||
reconcileTerminalJobs,
|
||||
reconcileMirrorForActiveWorkflow,
|
||||
clearActiveJobIfStale,
|
||||
bindExecutionEvents,
|
||||
unbindExecutionEvents,
|
||||
|
||||
@@ -499,7 +499,6 @@ export const useNodeOutputStore = defineStore('nodeOutput', () => {
|
||||
revokeSubgraphPreviews,
|
||||
removeNodeOutputs,
|
||||
removeNodeOutputsForNode,
|
||||
removeOutputsByLocatorId,
|
||||
snapshotOutputs,
|
||||
restoreOutputs,
|
||||
resetAllOutputsAndPreviews,
|
||||
|
||||
@@ -677,6 +677,34 @@ describe('useQueueStore', () => {
|
||||
// Should preserve array identity when history is unchanged
|
||||
expect(store.historyTasks).toBe(initialHistoryTasks)
|
||||
})
|
||||
|
||||
it('should reconcile terminal jobs when queue is empty but history is not', async () => {
|
||||
const executionStore = useExecutionStore()
|
||||
const reconcileSpy = vi.spyOn(executionStore, 'reconcileTerminalJobs')
|
||||
const finishedJob = createHistoryJob(10, 'finished-job')
|
||||
|
||||
mockGetQueue.mockResolvedValue({ Running: [], Pending: [] })
|
||||
mockGetHistory.mockResolvedValue([finishedJob])
|
||||
|
||||
await store.update()
|
||||
|
||||
expect(reconcileSpy).toHaveBeenCalledTimes(1)
|
||||
const [activeIds, terminalIds] = reconcileSpy.mock.calls[0]
|
||||
expect(activeIds.size).toBe(0)
|
||||
expect(terminalIds.has('finished-job')).toBe(true)
|
||||
})
|
||||
|
||||
it('should not reconcile terminal jobs when history is empty', async () => {
|
||||
const executionStore = useExecutionStore()
|
||||
const reconcileSpy = vi.spyOn(executionStore, 'reconcileTerminalJobs')
|
||||
|
||||
mockGetQueue.mockResolvedValue({ Running: [], Pending: [] })
|
||||
mockGetHistory.mockResolvedValue([])
|
||||
|
||||
await store.update()
|
||||
|
||||
expect(reconcileSpy).not.toHaveBeenCalled()
|
||||
})
|
||||
})
|
||||
|
||||
describe('update() - maxHistoryItems limit', () => {
|
||||
|
||||
@@ -530,6 +530,9 @@ export const useQueueStore = defineStore('queue', () => {
|
||||
api.getHistory(maxHistoryItems.value)
|
||||
])
|
||||
|
||||
// Track activeJobIds for reconciliation across both blocks
|
||||
let activeJobIds: Set<string> | null = null
|
||||
|
||||
if (queueResult.status === 'fulfilled') {
|
||||
const queue = queueResult.value
|
||||
// API returns pre-sorted data (sort_by=create_time&order=desc)
|
||||
@@ -546,10 +549,11 @@ export const useQueueStore = defineStore('queue', () => {
|
||||
}
|
||||
})
|
||||
|
||||
const activeJobIds = new Set([
|
||||
activeJobIds = new Set([
|
||||
...queue.Running.map((j) => j.id),
|
||||
...queue.Pending.map((j) => j.id)
|
||||
])
|
||||
|
||||
executionStore.reconcileInitializingJobs(activeJobIds)
|
||||
} else {
|
||||
console.error('Failed to fetch queue:', queueResult.reason)
|
||||
@@ -558,6 +562,16 @@ export const useQueueStore = defineStore('queue', () => {
|
||||
if (historyResult.status === 'fulfilled') {
|
||||
const history = historyResult.value
|
||||
const currentHistory = toValue(historyTasks)
|
||||
const executionStore = useExecutionStore()
|
||||
|
||||
// Reconcile terminal jobs whenever history is non-empty. The last
|
||||
// active job finishing legitimately produces empty Running/Pending,
|
||||
// and terminal eviction is the only path that clears stuck node
|
||||
// progress when WebSocket terminal messages are dropped.
|
||||
if (history.length > 0 && activeJobIds) {
|
||||
const terminalJobIds = new Set(history.map((j) => j.id))
|
||||
executionStore.reconcileTerminalJobs(activeJobIds, terminalJobIds)
|
||||
}
|
||||
|
||||
// Sort by create_time descending and limit to maxItems
|
||||
const sortedHistory = [...history]
|
||||
|
||||
Reference in New Issue
Block a user