Compare commits

...

4 Commits

Author SHA1 Message Date
Glary-Bot
a67e3dbce9 test: tighten active-workflow gating coverage
Address two coverage nitpicks from review:

- The 'executing clears _executingNodeProgress' test in the
  WebSocket-event-handlers suite did not reset mockActiveWorkflow.current
  in beforeEach and did not pass workflow_id, so it relied on the
  hoisted mock leaking from earlier suites and only ever exercised the
  unresolvable-ownership fallback. Reset the mock per-test, set an
  active workflow explicitly with workflow_id: 'wf-active', and add a
  separate test for the legacy unresolvable-ownership fallback.

- The 'execution_error from a non-active workflow' test only proved the
  active job's state was untouched. It now also asserts the errored
  job's initializing flag is cleared (the per-job bookkeeping that runs
  unconditionally) and that executionErrorStore.lastExecutionError stays
  null (proving the global gate held).
2026-05-05 07:09:25 +00:00
Glary-Bot
7e75921fb5 fix: gate execution_cached, executed, execution_error and revoke transitions
Three handlers and one revocation rule were missed in the first gating
pass and could still let a non-active workflow's events bleed into the
active workflow's UI:

- handleExecutionCached and handleExecuted mutated activeJob.value.nodes
  unconditionally, skewing executionProgress / nodesExecuted on the
  visible workflow when a background workflow emitted those events.
- handleExecutionError, handleServiceLevelError and handleCloudValidationError
  called resetExecutionState and wrote executionErrorStore.* unconditionally,
  meaning a background error wiped activeJobId and node progress for the
  visible workflow. Initialization clearing for the errored job still
  runs in every case.
- handleProgressState revoked previews only when a node was first seen
  (!previousForJob[nodeId]). Once progress_state begins emitting pending
  entries that node is already 'seen', so the pending->running
  transition never revoked. Switch to checking previous state.

groupNode forwarder: reinstate the legacy string detail path in the
'executing' id-extractor so callers still dispatching the pre-change
string payload keep bubbling execution up to the group node.

4 new unit tests cover: execution_cached gated, executed gated,
execution_error gated, and pending->running preview revocation.
2026-05-05 07:01:04 +00:00
Glary-Bot
48b953be31 feat: gate execution lifecycle handlers and executing event by active workflow
Extend the active-workflow gate to lifecycle events that previously
operated globally:

- handleExecutionStart: only adopts activeJobId / clears shared UI state
  when the starting job belongs to the active workflow. Per-job
  bookkeeping (queuedJobs, jobIdToSessionWorkflowPath, initialization
  clearing) still runs for every job.
- handleExecutionSuccess / handleExecutionInterrupted: only call
  resetExecutionState when the terminating job belongs to the active
  workflow. Initialization clearing for the terminated job still runs.
- handleExecuting: now receives the full ExecutingWsMessage from the
  api dispatcher (instead of just NodeId) and gates _executingNodeProgress
  clearing on workflow ownership.

To support handleExecuting gating, drop the ApiToEventType override that
narrowed the executing event to NodeId, forward the full payload through
api.dispatchCustomEvent, and update the groupNode forwarding wrapper to
extract display_node/node from the new object detail and synthesise a
matching ExecutingWsMessage when re-dispatching.

5 new tests cover the cross-workflow cases: execution_start from a
non-active workflow does not steal activeJobId, execution_success and
execution_interrupted from a non-active workflow do not clear the
active job's state, executing from a non-active workflow does not
clear _executingNodeProgress, and execution_start from the active
workflow still adopts activeJobId.

Eventual-consistency cleanup of non-active terminal jobs (which used to
be handled by the dropped polling/eviction code) will be implemented in
a follow-up PR using a reactive watcher pattern over a derived
finishedJobs set.
2026-05-05 06:49:51 +00:00
Glary-Bot
72877c8c1a feat: scope progress events and UI state by active workflow
When multiple workflow tabs are open and a job initiated from one tab
sends progress messages, those messages can leak into the active tab's
canvas because the global nodeProgressStates mirror, _executingNodeProgress,
and progress_text preview state are written unconditionally.

Add an optional workflow_id field to the WS schema for execution-related
messages (progress, progress_state, executing, executed, progress_text,
execution_start/success/cached/interrupted/error, NodeProgressState),
and gate handleProgressState, handleProgress, and handleProgressText on
whether the incoming message belongs to the currently active workflow.

Resolution order for ownership:
1. workflow_id carried on the WS message (when backend supports it).
2. jobIdToWorkflowId mapping populated when the job was queued from this tab.
3. jobIdToSessionWorkflowPath mapping (path-based fallback).

When ownership is unresolvable (e.g. job queued in a different browser
session), the message is treated as belonging to the active workflow to
preserve current single-tab behaviour. Per-job state
(nodeProgressStatesByJob) is always written regardless of ownership so
the source of truth covers every workflow's jobs.

handleProgressText prefers the workflow gate over the legacy activeJobId
guard when ownership can be resolved, since activeJobId is global and may
point at a different workflow's job — falling through to the legacy
guard only when ownership is genuinely unresolvable.

12 new unit tests cover workflow_id match/mismatch, both fallback
resolution paths, default-to-legacy when unresolvable, no-active-workflow
short-circuit, per-job state always updating, preview revocation gating,
_executingNodeProgress gating, and progress_text gating.

Eventual-consistency fallback for dropped terminal WebSocket messages
will be addressed in a follow-up PR using a reactive watcher pattern
rather than queue polling, per design discussion.
2026-05-04 22:43:17 +00:00
5 changed files with 710 additions and 76 deletions

View File

@@ -17,6 +17,7 @@ import {
type ComfyNode,
type ComfyWorkflowJSON
} from '@/platform/workflow/validation/schemas/workflowSchema'
import type { ExecutingWsMessage } from '@/schemas/apiSchema'
import type { ComfyNodeDef, InputSpec } from '@/schemas/nodeDefSchema'
import { useDialogService } from '@/services/dialogService'
import { useExecutionStore } from '@/stores/executionStore'
@@ -1446,7 +1447,11 @@ export class GroupNodeHandler {
).runningInternalNodeId = innerNodeIndex
api.dispatchCustomEvent(
type as 'executing',
getEvent(detail, `${this.node.id}`, this.node) as string
getEvent(
detail,
`${this.node.id}`,
this.node
) as unknown as ExecutingWsMessage
)
}
}
@@ -1459,8 +1464,11 @@ export class GroupNodeHandler {
const executing = handleEvent(
'executing',
(d) => (typeof d === 'string' ? d : undefined),
(_d, id) => id
(d) => (typeof d === 'string' ? d : (d?.display_node ?? d?.node)),
(d, id) =>
typeof d === 'object'
? { ...d, node: id, display_node: id }
: { prompt_id: '', node: id, display_node: id }
)
const executed = handleEvent(

View File

@@ -10,6 +10,7 @@ import { LinkReleaseTriggerAction } from '@/types/searchBoxTypes'
const zNodeType = z.string()
const zJobId = z.string()
export type JobId = z.infer<typeof zJobId>
const zWorkflowId = z.string()
export const resultItemType = z.enum(['input', 'output', 'temp'])
export type ResultItemType = z.infer<typeof resultItemType>
@@ -56,6 +57,7 @@ const zProgressWsMessage = z.object({
value: z.number().int(),
max: z.number().int(),
prompt_id: zJobId,
workflow_id: zWorkflowId.optional(),
node: zNodeId
})
@@ -65,6 +67,7 @@ const zNodeProgressState = z.object({
state: z.enum(['pending', 'running', 'finished', 'error']),
node_id: zNodeId,
prompt_id: zJobId,
workflow_id: zWorkflowId.optional(),
display_node_id: zNodeId.optional(),
parent_node_id: zNodeId.optional(),
real_node_id: zNodeId.optional()
@@ -72,13 +75,15 @@ const zNodeProgressState = z.object({
const zProgressStateWsMessage = z.object({
prompt_id: zJobId,
workflow_id: zWorkflowId.optional(),
nodes: z.record(zNodeId, zNodeProgressState)
})
const zExecutingWsMessage = z.object({
node: zNodeId,
display_node: zNodeId,
prompt_id: zJobId
prompt_id: zJobId,
workflow_id: zWorkflowId.optional()
})
const zExecutedWsMessage = zExecutingWsMessage.extend({
@@ -88,6 +93,7 @@ const zExecutedWsMessage = zExecutingWsMessage.extend({
const zExecutionWsMessageBase = z.object({
prompt_id: zJobId,
workflow_id: zWorkflowId.optional(),
timestamp: z.number().int()
})
@@ -115,7 +121,8 @@ const zExecutionErrorWsMessage = zExecutionWsMessageBase.extend({
const zProgressTextWsMessage = z.object({
nodeId: zNodeId,
text: z.string(),
prompt_id: z.string().optional()
prompt_id: z.string().optional(),
workflow_id: zWorkflowId.optional()
})
const zNotificationWsMessage = z.object({

View File

@@ -23,8 +23,7 @@ import type {
} from '@/platform/workflow/templates/types/template'
import type {
ComfyApiWorkflow,
ComfyWorkflowJSON,
NodeId
ComfyWorkflowJSON
} from '@/platform/workflow/validation/schemas/workflowSchema'
import type {
AssetDownloadWsMessage,
@@ -213,11 +212,7 @@ type AsCustomEvents<T> = {
/** Handles differing event and API signatures. */
type ApiToEventType<T = ApiCalls> = {
[K in keyof T]: K extends 'status'
? StatusWsMessageStatus
: K extends 'executing'
? NodeId
: T[K]
[K in keyof T]: K extends 'status' ? StatusWsMessageStatus : T[K]
}
/** Dictionary of types used in the detail for a custom event */
@@ -728,10 +723,7 @@ export class ComfyApi extends EventTarget {
this.dispatchCustomEvent('status', msg.data.status ?? null)
break
case 'executing':
this.dispatchCustomEvent(
'executing',
msg.data.display_node || msg.data.node
)
this.dispatchCustomEvent('executing', msg.data)
break
case 'execution_start':
case 'execution_error':

View File

@@ -11,12 +11,22 @@ const {
mockNodeExecutionIdToNodeLocatorId,
mockNodeIdToNodeLocatorId,
mockNodeLocatorIdToNodeExecutionId,
mockShowTextPreview
mockShowTextPreview,
mockActiveWorkflow,
mockRevokePreviewsByExecutionId
} = vi.hoisted(() => ({
mockNodeExecutionIdToNodeLocatorId: vi.fn(),
mockNodeIdToNodeLocatorId: vi.fn(),
mockNodeLocatorIdToNodeExecutionId: vi.fn(),
mockShowTextPreview: vi.fn()
mockShowTextPreview: vi.fn(),
mockActiveWorkflow: {
current: null as null | {
activeState?: { id?: string }
initialState?: { id?: string }
path?: string
}
},
mockRevokePreviewsByExecutionId: vi.fn()
}))
import type * as WorkflowStoreModule from '@/platform/workflow/management/stores/workflowStore'
@@ -35,7 +45,10 @@ vi.mock('@/platform/workflow/management/stores/workflowStore', async () => {
useWorkflowStore: vi.fn(() => ({
nodeExecutionIdToNodeLocatorId: mockNodeExecutionIdToNodeLocatorId,
nodeIdToNodeLocatorId: mockNodeIdToNodeLocatorId,
nodeLocatorIdToNodeExecutionId: mockNodeLocatorIdToNodeExecutionId
nodeLocatorIdToNodeExecutionId: mockNodeLocatorIdToNodeExecutionId,
get activeWorkflow() {
return mockActiveWorkflow.current
}
}))
}
})
@@ -70,9 +83,9 @@ vi.mock('@/scripts/api', () => ({
}
}))
vi.mock('@/stores/imagePreviewStore', () => ({
vi.mock('@/stores/nodeOutputStore', () => ({
useNodeOutputStore: () => ({
revokePreviewsByExecutionId: vi.fn()
revokePreviewsByExecutionId: mockRevokePreviewsByExecutionId
})
}))
@@ -440,6 +453,469 @@ describe('useExecutionStore - reconcileInitializingJobs', () => {
})
})
describe('useExecutionStore - active workflow gating', () => {
let store: ReturnType<typeof useExecutionStore>
function makeProgressNodes(
nodeId: string,
jobId: string
): Record<string, NodeProgressState> {
return {
[nodeId]: {
value: 5,
max: 10,
state: 'running',
node_id: nodeId,
prompt_id: jobId,
display_node_id: nodeId
}
}
}
function fireProgressState(
jobId: string,
nodes: Record<string, NodeProgressState>,
workflowId?: string
) {
const handler = apiEventHandlers.get('progress_state')
if (!handler) throw new Error('progress_state handler not bound')
handler(
new CustomEvent('progress_state', {
detail: { nodes, prompt_id: jobId, workflow_id: workflowId }
})
)
}
function fireProgress(
jobId: string,
nodeId: string,
workflowId?: string,
value = 5,
max = 10
) {
const handler = apiEventHandlers.get('progress')
if (!handler) throw new Error('progress handler not bound')
handler(
new CustomEvent('progress', {
detail: {
value,
max,
prompt_id: jobId,
node: nodeId,
workflow_id: workflowId
}
})
)
}
beforeEach(() => {
vi.clearAllMocks()
apiEventHandlers.clear()
mockActiveWorkflow.current = null
setActivePinia(createTestingPinia({ stubActions: false }))
store = useExecutionStore()
store.bindExecutionEvents()
})
it('always updates per-job progress regardless of active workflow', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
fireProgressState(
'job-other',
makeProgressNodes('1', 'job-other'),
'wf-other'
)
expect(store.nodeProgressStatesByJob).toHaveProperty('job-other')
})
it('skips global mirror when message workflow_id mismatches active workflow', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
fireProgressState(
'job-other',
makeProgressNodes('1', 'job-other'),
'wf-other'
)
expect(store.nodeProgressStates).toEqual({})
})
it('updates global mirror when message workflow_id matches active workflow', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
fireProgressState('job-1', makeProgressNodes('1', 'job-1'), 'wf-active')
expect(store.nodeProgressStates).toEqual(makeProgressNodes('1', 'job-1'))
})
it('falls back to jobIdToWorkflowId mapping when workflow_id missing', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
store.registerJobWorkflowIdMapping('job-other', 'wf-other')
fireProgressState('job-other', makeProgressNodes('1', 'job-other'))
expect(store.nodeProgressStates).toEqual({})
})
it('falls back to session path mapping when no id mapping is registered', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
store.ensureSessionWorkflowPath('job-other', '/wf-other.json')
fireProgressState('job-other', makeProgressNodes('1', 'job-other'))
expect(store.nodeProgressStates).toEqual({})
})
it('preserves single-tab behaviour when ownership is unresolvable', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
fireProgressState('job-unknown', makeProgressNodes('1', 'job-unknown'))
expect(store.nodeProgressStates).toEqual(
makeProgressNodes('1', 'job-unknown')
)
})
it('updates mirror when there is no active workflow', () => {
mockActiveWorkflow.current = null
fireProgressState('job-1', makeProgressNodes('1', 'job-1'), 'wf-1')
expect(store.nodeProgressStates).toEqual(makeProgressNodes('1', 'job-1'))
})
it('skips preview revocation for non-active workflow messages', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
mockRevokePreviewsByExecutionId.mockClear()
fireProgressState(
'job-other',
makeProgressNodes('1', 'job-other'),
'wf-other'
)
expect(mockRevokePreviewsByExecutionId).not.toHaveBeenCalled()
})
it('revokes previews for active workflow messages', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
mockRevokePreviewsByExecutionId.mockClear()
fireProgressState('job-1', makeProgressNodes('1', 'job-1'), 'wf-active')
expect(mockRevokePreviewsByExecutionId).toHaveBeenCalledWith('1')
})
it('skips _executingNodeProgress on workflow_id mismatch', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
fireProgress('job-other', '1', 'wf-other')
expect(store._executingNodeProgress).toBeNull()
})
it('updates _executingNodeProgress on workflow_id match', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
fireProgress('job-1', '1', 'wf-active', 7, 10)
expect(store._executingNodeProgress).toEqual({
value: 7,
max: 10,
prompt_id: 'job-1',
node: '1',
workflow_id: 'wf-active'
})
})
it('execution_start from a non-active workflow does not steal activeJobId', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const handler = apiEventHandlers.get('execution_start')
if (!handler) throw new Error('execution_start handler not bound')
handler(
new CustomEvent('execution_start', {
detail: {
prompt_id: 'job-other',
timestamp: 0,
workflow_id: 'wf-other'
}
})
)
expect(store.activeJobId).toBeNull()
})
it('execution_start from active workflow adopts activeJobId', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const handler = apiEventHandlers.get('execution_start')
if (!handler) throw new Error('execution_start handler not bound')
handler(
new CustomEvent('execution_start', {
detail: {
prompt_id: 'job-1',
timestamp: 0,
workflow_id: 'wf-active'
}
})
)
expect(store.activeJobId).toBe('job-1')
})
it('execution_success from a non-active workflow does not clear activeJobId', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const startHandler = apiEventHandlers.get('execution_start')
if (!startHandler) throw new Error('execution_start handler not bound')
startHandler(
new CustomEvent('execution_start', {
detail: {
prompt_id: 'job-1',
timestamp: 0,
workflow_id: 'wf-active'
}
})
)
const successHandler = apiEventHandlers.get('execution_success')
if (!successHandler) throw new Error('execution_success handler not bound')
successHandler(
new CustomEvent('execution_success', {
detail: {
prompt_id: 'job-other',
timestamp: 0,
workflow_id: 'wf-other'
}
})
)
expect(store.activeJobId).toBe('job-1')
})
it('execution_interrupted from a non-active workflow does not clear activeJobId', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const startHandler = apiEventHandlers.get('execution_start')
if (!startHandler) throw new Error('execution_start handler not bound')
startHandler(
new CustomEvent('execution_start', {
detail: {
prompt_id: 'job-1',
timestamp: 0,
workflow_id: 'wf-active'
}
})
)
const intHandler = apiEventHandlers.get('execution_interrupted')
if (!intHandler) throw new Error('execution_interrupted handler not bound')
intHandler(
new CustomEvent('execution_interrupted', {
detail: {
prompt_id: 'job-other',
timestamp: 0,
node_id: '1',
node_type: 'X',
executed: [],
workflow_id: 'wf-other'
}
})
)
expect(store.activeJobId).toBe('job-1')
})
it('executing from a non-active workflow does not clear _executingNodeProgress', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
fireProgress('job-1', '1', 'wf-active', 5, 10)
expect(store._executingNodeProgress).not.toBeNull()
const handler = apiEventHandlers.get('executing')
if (!handler) throw new Error('executing handler not bound')
handler(
new CustomEvent('executing', {
detail: {
prompt_id: 'job-other',
node: '2',
display_node: '2',
workflow_id: 'wf-other'
}
})
)
expect(store._executingNodeProgress).not.toBeNull()
})
it('execution_cached from a non-active workflow does not mark active job nodes', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const startHandler = apiEventHandlers.get('execution_start')
if (!startHandler) throw new Error('execution_start handler not bound')
startHandler(
new CustomEvent('execution_start', {
detail: { prompt_id: 'job-1', timestamp: 0, workflow_id: 'wf-active' }
})
)
const cachedHandler = apiEventHandlers.get('execution_cached')
if (!cachedHandler) throw new Error('execution_cached handler not bound')
cachedHandler(
new CustomEvent('execution_cached', {
detail: {
prompt_id: 'job-other',
timestamp: 0,
workflow_id: 'wf-other',
nodes: ['n1', 'n2']
}
})
)
expect(store.activeJob?.nodes).toEqual({})
})
it('executed from a non-active workflow does not mark active job nodes', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const startHandler = apiEventHandlers.get('execution_start')
if (!startHandler) throw new Error('execution_start handler not bound')
startHandler(
new CustomEvent('execution_start', {
detail: { prompt_id: 'job-1', timestamp: 0, workflow_id: 'wf-active' }
})
)
const executedHandler = apiEventHandlers.get('executed')
if (!executedHandler) throw new Error('executed handler not bound')
executedHandler(
new CustomEvent('executed', {
detail: {
prompt_id: 'job-other',
node: 'n1',
display_node: 'n1',
workflow_id: 'wf-other',
output: {}
}
})
)
expect(store.activeJob?.nodes['n1']).toBeUndefined()
})
it('execution_error from a non-active workflow does not clear active job state but still clears the errored job initializing flag', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const startHandler = apiEventHandlers.get('execution_start')
if (!startHandler) throw new Error('execution_start handler not bound')
startHandler(
new CustomEvent('execution_start', {
detail: { prompt_id: 'job-1', timestamp: 0, workflow_id: 'wf-active' }
})
)
store.initializingJobIds = new Set(['job-other'])
const errorHandler = apiEventHandlers.get('execution_error')
if (!errorHandler) throw new Error('execution_error handler not bound')
errorHandler(
new CustomEvent('execution_error', {
detail: {
prompt_id: 'job-other',
timestamp: 0,
workflow_id: 'wf-other',
node_id: 'n1',
node_type: 'X',
executed: [],
exception_message: 'oops',
exception_type: 'RuntimeError',
traceback: [],
current_inputs: {},
current_outputs: {}
}
})
)
expect(store.activeJobId).toBe('job-1')
expect(store.initializingJobIds.has('job-other')).toBe(false)
expect(useExecutionErrorStore().lastExecutionError).toBeNull()
})
it('revokes preview when node transitions pending -> running', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const pendingNodes: Record<string, NodeProgressState> = {
n1: {
value: 0,
max: 10,
state: 'pending',
node_id: 'n1',
prompt_id: 'job-1',
display_node_id: 'n1'
}
}
fireProgressState('job-1', pendingNodes, 'wf-active')
mockRevokePreviewsByExecutionId.mockClear()
const runningNodes: Record<string, NodeProgressState> = {
n1: { ...pendingNodes.n1, state: 'running', value: 1 }
}
fireProgressState('job-1', runningNodes, 'wf-active')
expect(mockRevokePreviewsByExecutionId).toHaveBeenCalledWith('n1')
})
})
describe('useExecutionStore - progress_text startup guard', () => {
let store: ReturnType<typeof useExecutionStore>
@@ -447,6 +923,7 @@ describe('useExecutionStore - progress_text startup guard', () => {
nodeId: string
text: string
prompt_id?: string
workflow_id?: string
}) {
const handler = apiEventHandlers.get('progress_text')
if (!handler) throw new Error('progress_text handler not bound')
@@ -456,6 +933,7 @@ describe('useExecutionStore - progress_text startup guard', () => {
beforeEach(() => {
vi.clearAllMocks()
apiEventHandlers.clear()
mockActiveWorkflow.current = null
setActivePinia(createTestingPinia({ stubActions: false }))
store = useExecutionStore()
store.bindExecutionEvents()
@@ -488,6 +966,50 @@ describe('useExecutionStore - progress_text startup guard', () => {
expect(mockShowTextPreview).toHaveBeenCalledWith(mockNode, 'warming up')
})
it('skips progress_text whose workflow_id mismatches active workflow', async () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const mockNode = createMockLGraphNode({ id: 1 })
const { useCanvasStore } =
await import('@/renderer/core/canvas/canvasStore')
useCanvasStore().canvas = {
graph: { getNodeById: vi.fn(() => mockNode) }
} as unknown as LGraphCanvas
fireProgressText({
nodeId: '1',
text: 'warming up',
prompt_id: 'job-other',
workflow_id: 'wf-other'
})
expect(mockShowTextPreview).not.toHaveBeenCalled()
})
it('forwards progress_text whose workflow_id matches active workflow', async () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const mockNode = createMockLGraphNode({ id: 1 })
const { useCanvasStore } =
await import('@/renderer/core/canvas/canvasStore')
useCanvasStore().canvas = {
graph: { getNodeById: vi.fn(() => mockNode) }
} as unknown as LGraphCanvas
fireProgressText({
nodeId: '1',
text: 'warming up',
prompt_id: 'job-1',
workflow_id: 'wf-active'
})
expect(mockShowTextPreview).toHaveBeenCalledWith(mockNode, 'warming up')
})
})
describe('useExecutionErrorStore - Node Error Lookups', () => {
@@ -767,6 +1289,7 @@ describe('useExecutionStore - WebSocket event handlers', () => {
beforeEach(() => {
vi.clearAllMocks()
apiEventHandlers.clear()
mockActiveWorkflow.current = null
setActivePinia(createTestingPinia({ stubActions: false }))
store = useExecutionStore()
store.bindExecutionEvents()
@@ -878,7 +1401,35 @@ describe('useExecutionStore - WebSocket event handlers', () => {
})
describe('executing', () => {
it('clears _executingNodeProgress and activeJobId when detail is null', () => {
it('clears _executingNodeProgress when workflow_id matches the active workflow', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
fire('execution_start', {
prompt_id: 'job-1',
timestamp: 0,
workflow_id: 'wf-active'
})
store._executingNodeProgress = {
value: 1,
max: 2,
prompt_id: 'job-1',
node: '1'
}
fire('executing', {
prompt_id: 'job-1',
node: '1',
display_node: '1',
workflow_id: 'wf-active'
})
expect(store._executingNodeProgress).toBeNull()
})
it('clears _executingNodeProgress when ownership is unresolvable (legacy fallback)', () => {
mockActiveWorkflow.current = null
fire('execution_start', { prompt_id: 'job-1', timestamp: 0 })
store._executingNodeProgress = {
value: 1,
@@ -887,10 +1438,13 @@ describe('useExecutionStore - WebSocket event handlers', () => {
node: '1'
}
fire('executing', null)
fire('executing', {
prompt_id: 'job-1',
node: '1',
display_node: '1'
})
expect(store._executingNodeProgress).toBeNull()
expect(store.activeJobId).toBeNull()
})
})

View File

@@ -15,6 +15,7 @@ import type {
import { useCanvasStore } from '@/renderer/core/canvas/canvasStore'
import type {
ExecutedWsMessage,
ExecutingWsMessage,
ExecutionCachedWsMessage,
ExecutionErrorWsMessage,
ExecutionInterruptedWsMessage,
@@ -247,22 +248,32 @@ export const useExecutionStore = defineStore('execution', () => {
}
function handleExecutionStart(e: CustomEvent<ExecutionStartWsMessage>) {
executionIdToLocatorCache.clear()
executionErrorStore.clearAllErrors()
activeJobId.value = e.detail.prompt_id
queuedJobs.value[activeJobId.value] ??= { nodes: {} }
clearInitializationByJobId(activeJobId.value)
const jobId = e.detail.prompt_id
queuedJobs.value[jobId] ??= { nodes: {} }
clearInitializationByJobId(jobId)
// Ensure path mapping exists — execution_start can arrive via WebSocket
// before the HTTP response from queuePrompt triggers storeJob.
if (!jobIdToSessionWorkflowPath.value.has(activeJobId.value)) {
const path = queuedJobs.value[activeJobId.value]?.workflow?.path
if (path) ensureSessionWorkflowPath(activeJobId.value, path)
if (!jobIdToSessionWorkflowPath.value.has(jobId)) {
const path = queuedJobs.value[jobId]?.workflow?.path
if (path) ensureSessionWorkflowPath(jobId, path)
}
// Only adopt as the global active job and clear shared UI state when the
// starting job belongs to the active workflow. Otherwise a job started
// from another tab would steal activeJobId and clobber the active tab's
// execution UI.
if (!messageMatchesActiveWorkflow(jobId, e.detail.workflow_id)) return
executionIdToLocatorCache.clear()
executionErrorStore.clearAllErrors()
activeJobId.value = jobId
}
function handleExecutionCached(e: CustomEvent<ExecutionCachedWsMessage>) {
if (!activeJob.value) return
if (!messageMatchesActiveWorkflow(e.detail.prompt_id, e.detail.workflow_id))
return
for (const n of e.detail.nodes) {
activeJob.value.nodes[n] = true
}
@@ -272,12 +283,15 @@ export const useExecutionStore = defineStore('execution', () => {
e: CustomEvent<ExecutionInterruptedWsMessage>
) {
const jobId = e.detail.prompt_id
if (activeJobId.value) clearInitializationByJobId(activeJobId.value)
clearInitializationByJobId(jobId)
if (!messageMatchesActiveWorkflow(jobId, e.detail.workflow_id)) return
resetExecutionState(jobId)
}
function handleExecuted(e: CustomEvent<ExecutedWsMessage>) {
if (!activeJob.value) return
if (!messageMatchesActiveWorkflow(e.detail.prompt_id, e.detail.workflow_id))
return
activeJob.value.nodes[e.detail.node] = true
}
@@ -288,22 +302,14 @@ export const useExecutionStore = defineStore('execution', () => {
})
}
const jobId = e.detail.prompt_id
if (!messageMatchesActiveWorkflow(jobId, e.detail.workflow_id)) return
resetExecutionState(jobId)
}
function handleExecuting(e: CustomEvent<NodeId | null>): void {
// Clear the current node progress when a new node starts executing
function handleExecuting(e: CustomEvent<ExecutingWsMessage>): void {
const { prompt_id: jobId, workflow_id: messageWorkflowId } = e.detail
if (!messageMatchesActiveWorkflow(jobId, messageWorkflowId)) return
_executingNodeProgress.value = null
if (!activeJob.value) return
// Update the executing nodes list
if (typeof e.detail !== 'string') {
if (activeJobId.value) {
delete queuedJobs.value[activeJobId.value]
}
activeJobId.value = null
}
}
/**
@@ -335,43 +341,92 @@ export const useExecutionStore = defineStore('execution', () => {
}
function handleProgressState(e: CustomEvent<ProgressStateWsMessage>) {
const { nodes, prompt_id: jobId } = e.detail
const { nodes, prompt_id: jobId, workflow_id: messageWorkflowId } = e.detail
const isActiveWorkflowMessage = messageMatchesActiveWorkflow(
jobId,
messageWorkflowId
)
// Revoke previews for nodes that are starting to execute
const previousForJob = nodeProgressStatesByJob.value[jobId] || {}
for (const nodeId in nodes) {
const nodeState = nodes[nodeId]
if (nodeState.state === 'running' && !previousForJob[nodeId]) {
// This node just started executing, revoke its previews
// Note that we're doing the *actual* node id instead of the display node id
// here intentionally. That way, we don't clear the preview every time a new node
// within an expanded graph starts executing.
const { revokePreviewsByExecutionId } = useNodeOutputStore()
revokePreviewsByExecutionId(nodeId)
if (isActiveWorkflowMessage) {
const { revokePreviewsByExecutionId } = useNodeOutputStore()
for (const nodeId in nodes) {
const nodeState = nodes[nodeId]
if (
nodeState.state === 'running' &&
previousForJob[nodeId]?.state !== 'running'
) {
revokePreviewsByExecutionId(nodeId)
}
}
}
// Update the progress states for all nodes
nodeProgressStatesByJob.value = {
...nodeProgressStatesByJob.value,
[jobId]: nodes
}
evictOldProgressJobs()
nodeProgressStates.value = nodes
// If we have progress for the currently executing node, update it for backwards compatibility
if (executingNodeId.value && nodes[executingNodeId.value]) {
const nodeState = nodes[executingNodeId.value]
_executingNodeProgress.value = {
value: nodeState.value,
max: nodeState.max,
prompt_id: nodeState.prompt_id,
node: nodeState.display_node_id || nodeState.node_id
if (isActiveWorkflowMessage) {
nodeProgressStates.value = nodes
if (executingNodeId.value && nodes[executingNodeId.value]) {
const nodeState = nodes[executingNodeId.value]
_executingNodeProgress.value = {
value: nodeState.value,
max: nodeState.max,
prompt_id: nodeState.prompt_id,
node: nodeState.display_node_id || nodeState.node_id
}
}
}
}
/**
* Determines whether a WebSocket execution message belongs to the
* currently active workflow tab. Used to gate writes to the global
* "current execution" mirror so a job initiated from another open
* workflow cannot leak its progress into the active one.
*
* Resolution order:
* 1. `workflow_id` carried on the WS message (when backend supports it).
* 2. {@link jobIdToWorkflowId} mapping populated when the job was queued
* from this tab.
* 3. {@link jobIdToSessionWorkflowPath} mapping (path-based fallback).
*
* When the workflow cannot be resolved at all (e.g. job queued in a
* different browser session), the message is treated as belonging to
* the active workflow to preserve current behaviour for the existing
* single-tab common case.
*/
function messageMatchesActiveWorkflow(
jobId: JobId,
messageWorkflowId: string | undefined
): boolean {
const activeWorkflow = workflowStore.activeWorkflow
if (!activeWorkflow) return true
const activeId =
activeWorkflow.activeState?.id ?? activeWorkflow.initialState?.id ?? null
if (messageWorkflowId && activeId) {
return messageWorkflowId === activeId
}
const mappedId = jobIdToWorkflowId.value.get(jobId)
if (mappedId && activeId) return mappedId === activeId
const mappedPath = jobIdToSessionWorkflowPath.value.get(jobId)
if (mappedPath && activeWorkflow.path) {
return mappedPath === activeWorkflow.path
}
return true
}
function handleProgress(e: CustomEvent<ProgressWsMessage>) {
const { prompt_id: jobId, workflow_id: messageWorkflowId } = e.detail
if (!messageMatchesActiveWorkflow(jobId, messageWorkflowId)) return
_executingNodeProgress.value = e.detail
}
@@ -393,17 +448,16 @@ export const useExecutionStore = defineStore('execution', () => {
error: e.detail.exception_message
})
// Cloud wraps validation errors (400) in exception_message as embedded JSON.
if (handleCloudValidationError(e.detail)) return
}
// Service-level errors (e.g. "Job has stagnated") have no associated node.
// Route them as job errors
if (handleServiceLevelError(e.detail)) return
// OSS path / Cloud fallback (real runtime errors)
executionErrorStore.lastExecutionError = e.detail
clearInitializationByJobId(e.detail.prompt_id)
if (!messageMatchesActiveWorkflow(e.detail.prompt_id, e.detail.workflow_id))
return
executionErrorStore.lastExecutionError = e.detail
resetExecutionState(e.detail.prompt_id)
}
@@ -413,6 +467,9 @@ export const useExecutionStore = defineStore('execution', () => {
return false
clearInitializationByJobId(detail.prompt_id)
if (!messageMatchesActiveWorkflow(detail.prompt_id, detail.workflow_id))
return true
resetExecutionState(detail.prompt_id)
executionErrorStore.lastPromptError = {
type: detail.exception_type ?? 'error',
@@ -431,6 +488,9 @@ export const useExecutionStore = defineStore('execution', () => {
if (!result) return false
clearInitializationByJobId(detail.prompt_id)
if (!messageMatchesActiveWorkflow(detail.prompt_id, detail.workflow_id))
return true
resetExecutionState(detail.prompt_id)
if (result.kind === 'nodeErrors') {
@@ -519,14 +579,27 @@ export const useExecutionStore = defineStore('execution', () => {
}
function handleProgressText(e: CustomEvent<ProgressTextWsMessage>) {
const { nodeId, text, prompt_id } = e.detail
const { nodeId, text, prompt_id, workflow_id } = e.detail
if (!text || !nodeId) return
// Filter: only accept progress for the active prompt
if (prompt_id && activeJobId.value && prompt_id !== activeJobId.value)
return
// Prefer the workflow-ownership gate when ownership can be resolved
// (workflow_id on the message, or a registered mapping). Only fall back
// to the legacy active-prompt guard when ownership is unresolvable;
// otherwise activeJobId pointing at a different workflow's job would
// incorrectly drop messages for the visible workflow.
if (prompt_id) {
const canResolveWorkflow =
Boolean(workflow_id) ||
jobIdToWorkflowId.value.has(prompt_id) ||
jobIdToSessionWorkflowPath.value.has(prompt_id)
if (canResolveWorkflow) {
if (!messageMatchesActiveWorkflow(prompt_id, workflow_id)) return
} else if (activeJobId.value && prompt_id !== activeJobId.value) {
return
}
}
// Handle execution node IDs for subgraphs
const currentId = getNodeIdIfExecuting(nodeId)
if (!currentId) return
const node = canvasStore.canvas?.graph?.getNodeById(currentId)