Compare commits

...

6 Commits

Author SHA1 Message Date
Glary-Bot
1d14f1313c test: drop unnecessary Set cast and cover initializing clear on execution_success 2026-05-13 19:58:01 +00:00
Glary-Bot
6a303494d4 fix: preserve backwards-compat 'executing' payload + plug initializing leak
Revert the 'executing' WS event payload back to the legacy
NodeId-only shape so extensions consuming api.addEventListener('executing')
keep working. The extension ecosystem (40+ custom node repos per
AGENTS.md) accesses e.detail directly as a string, and the marginal
workflow gating we added on handleExecuting did not justify the
breaking change.

Drop the corresponding gate in handleExecuting and the groupNode
forwarding-wrapper changes that depended on the object payload. The
gating on progress, progress_state, progress_text, execution_start,
execution_success, execution_interrupted, execution_error, execution_cached,
and executed remains intact (those events carry prompt_id and workflow_id
natively in their WS payload).

Additional fixes from review:
- handleExecutionSuccess now calls clearInitializationByJobId(jobId)
  before the workflow gate so a non-active workflow job's initializing
  flag still clears, matching handleExecutionInterrupted and
  handleExecutionError.
- Extract canResolveWorkflowOwnership helper and reuse it in
  handleProgressText to remove the duplicated resolution check.
2026-05-13 19:58:01 +00:00
Glary-Bot
11ef8c9337 test: tighten active-workflow gating coverage
Address two coverage nitpicks from review:

- The 'executing clears _executingNodeProgress' test in the
  WebSocket-event-handlers suite did not reset mockActiveWorkflow.current
  in beforeEach and did not pass workflow_id, so it relied on the
  hoisted mock leaking from earlier suites and only ever exercised the
  unresolvable-ownership fallback. Reset the mock per-test, set an
  active workflow explicitly with workflow_id: 'wf-active', and add a
  separate test for the legacy unresolvable-ownership fallback.

- The 'execution_error from a non-active workflow' test only proved the
  active job's state was untouched. It now also asserts the errored
  job's initializing flag is cleared (the per-job bookkeeping that runs
  unconditionally) and that executionErrorStore.lastExecutionError stays
  null (proving the global gate held).
2026-05-13 19:58:01 +00:00
Glary-Bot
60828f2286 fix: gate execution_cached, executed, execution_error and revoke transitions
Three handlers and one revocation rule were missed in the first gating
pass and could still let a non-active workflow's events bleed into the
active workflow's UI:

- handleExecutionCached and handleExecuted mutated activeJob.value.nodes
  unconditionally, skewing executionProgress / nodesExecuted on the
  visible workflow when a background workflow emitted those events.
- handleExecutionError, handleServiceLevelError and handleCloudValidationError
  called resetExecutionState and wrote executionErrorStore.* unconditionally,
  meaning a background error wiped activeJobId and node progress for the
  visible workflow. Initialization clearing for the errored job still
  runs in every case.
- handleProgressState revoked previews only when a node was first seen
  (!previousForJob[nodeId]). Once progress_state begins emitting pending
  entries that node is already 'seen', so the pending->running
  transition never revoked. Switch to checking previous state.

groupNode forwarder: reinstate the legacy string detail path in the
'executing' id-extractor so callers still dispatching the pre-change
string payload keep bubbling execution up to the group node.

4 new unit tests cover: execution_cached gated, executed gated,
execution_error gated, and pending->running preview revocation.
2026-05-13 19:58:01 +00:00
Glary-Bot
3c3b0872c3 feat: gate execution lifecycle handlers and executing event by active workflow
Extend the active-workflow gate to lifecycle events that previously
operated globally:

- handleExecutionStart: only adopts activeJobId / clears shared UI state
  when the starting job belongs to the active workflow. Per-job
  bookkeeping (queuedJobs, jobIdToSessionWorkflowPath, initialization
  clearing) still runs for every job.
- handleExecutionSuccess / handleExecutionInterrupted: only call
  resetExecutionState when the terminating job belongs to the active
  workflow. Initialization clearing for the terminated job still runs.
- handleExecuting: now receives the full ExecutingWsMessage from the
  api dispatcher (instead of just NodeId) and gates _executingNodeProgress
  clearing on workflow ownership.

To support handleExecuting gating, drop the ApiToEventType override that
narrowed the executing event to NodeId, forward the full payload through
api.dispatchCustomEvent, and update the groupNode forwarding wrapper to
extract display_node/node from the new object detail and synthesise a
matching ExecutingWsMessage when re-dispatching.

5 new tests cover the cross-workflow cases: execution_start from a
non-active workflow does not steal activeJobId, execution_success and
execution_interrupted from a non-active workflow do not clear the
active job's state, executing from a non-active workflow does not
clear _executingNodeProgress, and execution_start from the active
workflow still adopts activeJobId.

Eventual-consistency cleanup of non-active terminal jobs (which used to
be handled by the dropped polling/eviction code) will be implemented in
a follow-up PR using a reactive watcher pattern over a derived
finishedJobs set.
2026-05-13 19:58:01 +00:00
Glary-Bot
7b2b974968 feat: scope progress events and UI state by active workflow
When multiple workflow tabs are open and a job initiated from one tab
sends progress messages, those messages can leak into the active tab's
canvas because the global nodeProgressStates mirror, _executingNodeProgress,
and progress_text preview state are written unconditionally.

Add an optional workflow_id field to the WS schema for execution-related
messages (progress, progress_state, executing, executed, progress_text,
execution_start/success/cached/interrupted/error, NodeProgressState),
and gate handleProgressState, handleProgress, and handleProgressText on
whether the incoming message belongs to the currently active workflow.

Resolution order for ownership:
1. workflow_id carried on the WS message (when backend supports it).
2. jobIdToWorkflowId mapping populated when the job was queued from this tab.
3. jobIdToSessionWorkflowPath mapping (path-based fallback).

When ownership is unresolvable (e.g. job queued in a different browser
session), the message is treated as belonging to the active workflow to
preserve current single-tab behaviour. Per-job state
(nodeProgressStatesByJob) is always written regardless of ownership so
the source of truth covers every workflow's jobs.

handleProgressText prefers the workflow gate over the legacy activeJobId
guard when ownership can be resolved, since activeJobId is global and may
point at a different workflow's job — falling through to the legacy
guard only when ownership is genuinely unresolvable.

12 new unit tests cover workflow_id match/mismatch, both fallback
resolution paths, default-to-legacy when unresolvable, no-active-workflow
short-circuit, per-job state always updating, preview revocation gating,
_executingNodeProgress gating, and progress_text gating.

Eventual-consistency fallback for dropped terminal WebSocket messages
will be addressed in a follow-up PR using a reactive watcher pattern
rather than queue polling, per design discussion.
2026-05-13 19:58:01 +00:00
3 changed files with 658 additions and 53 deletions

View File

@@ -10,6 +10,7 @@ import { LinkReleaseTriggerAction } from '@/types/searchBoxTypes'
const zNodeType = z.string()
const zJobId = z.string()
export type JobId = z.infer<typeof zJobId>
const zWorkflowId = z.string()
export const resultItemType = z.enum(['input', 'output', 'temp'])
export type ResultItemType = z.infer<typeof resultItemType>
@@ -56,6 +57,7 @@ const zProgressWsMessage = z.object({
value: z.number().int(),
max: z.number().int(),
prompt_id: zJobId,
workflow_id: zWorkflowId.optional(),
node: zNodeId
})
@@ -65,6 +67,7 @@ const zNodeProgressState = z.object({
state: z.enum(['pending', 'running', 'finished', 'error']),
node_id: zNodeId,
prompt_id: zJobId,
workflow_id: zWorkflowId.optional(),
display_node_id: zNodeId.optional(),
parent_node_id: zNodeId.optional(),
real_node_id: zNodeId.optional()
@@ -72,13 +75,15 @@ const zNodeProgressState = z.object({
const zProgressStateWsMessage = z.object({
prompt_id: zJobId,
workflow_id: zWorkflowId.optional(),
nodes: z.record(zNodeId, zNodeProgressState)
})
const zExecutingWsMessage = z.object({
node: zNodeId,
display_node: zNodeId,
prompt_id: zJobId
prompt_id: zJobId,
workflow_id: zWorkflowId.optional()
})
const zExecutedWsMessage = zExecutingWsMessage.extend({
@@ -88,6 +93,7 @@ const zExecutedWsMessage = zExecutingWsMessage.extend({
const zExecutionWsMessageBase = z.object({
prompt_id: zJobId,
workflow_id: zWorkflowId.optional(),
timestamp: z.number().int()
})
@@ -115,7 +121,8 @@ const zExecutionErrorWsMessage = zExecutionWsMessageBase.extend({
const zProgressTextWsMessage = z.object({
nodeId: zNodeId,
text: z.string(),
prompt_id: z.string().optional()
prompt_id: z.string().optional(),
workflow_id: zWorkflowId.optional()
})
const zNotificationWsMessage = z.object({

View File

@@ -11,12 +11,22 @@ const {
mockNodeExecutionIdToNodeLocatorId,
mockNodeIdToNodeLocatorId,
mockNodeLocatorIdToNodeExecutionId,
mockShowTextPreview
mockShowTextPreview,
mockActiveWorkflow,
mockRevokePreviewsByExecutionId
} = vi.hoisted(() => ({
mockNodeExecutionIdToNodeLocatorId: vi.fn(),
mockNodeIdToNodeLocatorId: vi.fn(),
mockNodeLocatorIdToNodeExecutionId: vi.fn(),
mockShowTextPreview: vi.fn()
mockShowTextPreview: vi.fn(),
mockActiveWorkflow: {
current: null as null | {
activeState?: { id?: string }
initialState?: { id?: string }
path?: string
}
},
mockRevokePreviewsByExecutionId: vi.fn()
}))
import type * as WorkflowStoreModule from '@/platform/workflow/management/stores/workflowStore'
@@ -35,7 +45,10 @@ vi.mock('@/platform/workflow/management/stores/workflowStore', async () => {
useWorkflowStore: vi.fn(() => ({
nodeExecutionIdToNodeLocatorId: mockNodeExecutionIdToNodeLocatorId,
nodeIdToNodeLocatorId: mockNodeIdToNodeLocatorId,
nodeLocatorIdToNodeExecutionId: mockNodeLocatorIdToNodeExecutionId
nodeLocatorIdToNodeExecutionId: mockNodeLocatorIdToNodeExecutionId,
get activeWorkflow() {
return mockActiveWorkflow.current
}
}))
}
})
@@ -70,9 +83,9 @@ vi.mock('@/scripts/api', () => ({
}
}))
vi.mock('@/stores/imagePreviewStore', () => ({
vi.mock('@/stores/nodeOutputStore', () => ({
useNodeOutputStore: () => ({
revokePreviewsByExecutionId: vi.fn()
revokePreviewsByExecutionId: mockRevokePreviewsByExecutionId
})
}))
@@ -491,6 +504,445 @@ describe('useExecutionStore - clearActiveJobIfStale', () => {
})
})
describe('useExecutionStore - active workflow gating', () => {
let store: ReturnType<typeof useExecutionStore>
function makeProgressNodes(
nodeId: string,
jobId: string
): Record<string, NodeProgressState> {
return {
[nodeId]: {
value: 5,
max: 10,
state: 'running',
node_id: nodeId,
prompt_id: jobId,
display_node_id: nodeId
}
}
}
function fireProgressState(
jobId: string,
nodes: Record<string, NodeProgressState>,
workflowId?: string
) {
const handler = apiEventHandlers.get('progress_state')
if (!handler) throw new Error('progress_state handler not bound')
handler(
new CustomEvent('progress_state', {
detail: { nodes, prompt_id: jobId, workflow_id: workflowId }
})
)
}
function fireProgress(
jobId: string,
nodeId: string,
workflowId?: string,
value = 5,
max = 10
) {
const handler = apiEventHandlers.get('progress')
if (!handler) throw new Error('progress handler not bound')
handler(
new CustomEvent('progress', {
detail: {
value,
max,
prompt_id: jobId,
node: nodeId,
workflow_id: workflowId
}
})
)
}
beforeEach(() => {
vi.clearAllMocks()
apiEventHandlers.clear()
mockActiveWorkflow.current = null
setActivePinia(createTestingPinia({ stubActions: false }))
store = useExecutionStore()
store.bindExecutionEvents()
})
it('always updates per-job progress regardless of active workflow', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
fireProgressState(
'job-other',
makeProgressNodes('1', 'job-other'),
'wf-other'
)
expect(store.nodeProgressStatesByJob).toHaveProperty('job-other')
})
it('skips global mirror when message workflow_id mismatches active workflow', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
fireProgressState(
'job-other',
makeProgressNodes('1', 'job-other'),
'wf-other'
)
expect(store.nodeProgressStates).toEqual({})
})
it('updates global mirror when message workflow_id matches active workflow', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
fireProgressState('job-1', makeProgressNodes('1', 'job-1'), 'wf-active')
expect(store.nodeProgressStates).toEqual(makeProgressNodes('1', 'job-1'))
})
it('falls back to jobIdToWorkflowId mapping when workflow_id missing', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
store.registerJobWorkflowIdMapping('job-other', 'wf-other')
fireProgressState('job-other', makeProgressNodes('1', 'job-other'))
expect(store.nodeProgressStates).toEqual({})
})
it('falls back to session path mapping when no id mapping is registered', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
store.ensureSessionWorkflowPath('job-other', '/wf-other.json')
fireProgressState('job-other', makeProgressNodes('1', 'job-other'))
expect(store.nodeProgressStates).toEqual({})
})
it('preserves single-tab behaviour when ownership is unresolvable', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
fireProgressState('job-unknown', makeProgressNodes('1', 'job-unknown'))
expect(store.nodeProgressStates).toEqual(
makeProgressNodes('1', 'job-unknown')
)
})
it('updates mirror when there is no active workflow', () => {
mockActiveWorkflow.current = null
fireProgressState('job-1', makeProgressNodes('1', 'job-1'), 'wf-1')
expect(store.nodeProgressStates).toEqual(makeProgressNodes('1', 'job-1'))
})
it('skips preview revocation for non-active workflow messages', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
mockRevokePreviewsByExecutionId.mockClear()
fireProgressState(
'job-other',
makeProgressNodes('1', 'job-other'),
'wf-other'
)
expect(mockRevokePreviewsByExecutionId).not.toHaveBeenCalled()
})
it('revokes previews for active workflow messages', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
mockRevokePreviewsByExecutionId.mockClear()
fireProgressState('job-1', makeProgressNodes('1', 'job-1'), 'wf-active')
expect(mockRevokePreviewsByExecutionId).toHaveBeenCalledWith('1')
})
it('skips _executingNodeProgress on workflow_id mismatch', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
fireProgress('job-other', '1', 'wf-other')
expect(store._executingNodeProgress).toBeNull()
})
it('updates _executingNodeProgress on workflow_id match', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
fireProgress('job-1', '1', 'wf-active', 7, 10)
expect(store._executingNodeProgress).toEqual({
value: 7,
max: 10,
prompt_id: 'job-1',
node: '1',
workflow_id: 'wf-active'
})
})
it('execution_start from a non-active workflow does not steal activeJobId', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const handler = apiEventHandlers.get('execution_start')
if (!handler) throw new Error('execution_start handler not bound')
handler(
new CustomEvent('execution_start', {
detail: {
prompt_id: 'job-other',
timestamp: 0,
workflow_id: 'wf-other'
}
})
)
expect(store.activeJobId).toBeNull()
})
it('execution_start from active workflow adopts activeJobId', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const handler = apiEventHandlers.get('execution_start')
if (!handler) throw new Error('execution_start handler not bound')
handler(
new CustomEvent('execution_start', {
detail: {
prompt_id: 'job-1',
timestamp: 0,
workflow_id: 'wf-active'
}
})
)
expect(store.activeJobId).toBe('job-1')
})
it('execution_success from a non-active workflow does not clear activeJobId', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const startHandler = apiEventHandlers.get('execution_start')
if (!startHandler) throw new Error('execution_start handler not bound')
startHandler(
new CustomEvent('execution_start', {
detail: {
prompt_id: 'job-1',
timestamp: 0,
workflow_id: 'wf-active'
}
})
)
const successHandler = apiEventHandlers.get('execution_success')
if (!successHandler) throw new Error('execution_success handler not bound')
successHandler(
new CustomEvent('execution_success', {
detail: {
prompt_id: 'job-other',
timestamp: 0,
workflow_id: 'wf-other'
}
})
)
expect(store.activeJobId).toBe('job-1')
})
it('execution_interrupted from a non-active workflow does not clear activeJobId', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const startHandler = apiEventHandlers.get('execution_start')
if (!startHandler) throw new Error('execution_start handler not bound')
startHandler(
new CustomEvent('execution_start', {
detail: {
prompt_id: 'job-1',
timestamp: 0,
workflow_id: 'wf-active'
}
})
)
const intHandler = apiEventHandlers.get('execution_interrupted')
if (!intHandler) throw new Error('execution_interrupted handler not bound')
intHandler(
new CustomEvent('execution_interrupted', {
detail: {
prompt_id: 'job-other',
timestamp: 0,
node_id: '1',
node_type: 'X',
executed: [],
workflow_id: 'wf-other'
}
})
)
expect(store.activeJobId).toBe('job-1')
})
it('execution_cached from a non-active workflow does not mark active job nodes', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const startHandler = apiEventHandlers.get('execution_start')
if (!startHandler) throw new Error('execution_start handler not bound')
startHandler(
new CustomEvent('execution_start', {
detail: { prompt_id: 'job-1', timestamp: 0, workflow_id: 'wf-active' }
})
)
const cachedHandler = apiEventHandlers.get('execution_cached')
if (!cachedHandler) throw new Error('execution_cached handler not bound')
cachedHandler(
new CustomEvent('execution_cached', {
detail: {
prompt_id: 'job-other',
timestamp: 0,
workflow_id: 'wf-other',
nodes: ['n1', 'n2']
}
})
)
expect(store.activeJob?.nodes).toEqual({})
})
it('executed from a non-active workflow does not mark active job nodes', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const startHandler = apiEventHandlers.get('execution_start')
if (!startHandler) throw new Error('execution_start handler not bound')
startHandler(
new CustomEvent('execution_start', {
detail: { prompt_id: 'job-1', timestamp: 0, workflow_id: 'wf-active' }
})
)
const executedHandler = apiEventHandlers.get('executed')
if (!executedHandler) throw new Error('executed handler not bound')
executedHandler(
new CustomEvent('executed', {
detail: {
prompt_id: 'job-other',
node: 'n1',
display_node: 'n1',
workflow_id: 'wf-other',
output: {}
}
})
)
expect(store.activeJob?.nodes['n1']).toBeUndefined()
})
it('execution_error from a non-active workflow does not clear active job state but still clears the errored job initializing flag', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const startHandler = apiEventHandlers.get('execution_start')
if (!startHandler) throw new Error('execution_start handler not bound')
startHandler(
new CustomEvent('execution_start', {
detail: { prompt_id: 'job-1', timestamp: 0, workflow_id: 'wf-active' }
})
)
store.initializingJobIds = new Set(['job-other'])
const errorHandler = apiEventHandlers.get('execution_error')
if (!errorHandler) throw new Error('execution_error handler not bound')
errorHandler(
new CustomEvent('execution_error', {
detail: {
prompt_id: 'job-other',
timestamp: 0,
workflow_id: 'wf-other',
node_id: 'n1',
node_type: 'X',
executed: [],
exception_message: 'oops',
exception_type: 'RuntimeError',
traceback: [],
current_inputs: {},
current_outputs: {}
}
})
)
expect(store.activeJobId).toBe('job-1')
expect(store.initializingJobIds.has('job-other')).toBe(false)
expect(useExecutionErrorStore().lastExecutionError).toBeNull()
})
it('revokes preview when node transitions pending -> running', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const pendingNodes: Record<string, NodeProgressState> = {
n1: {
value: 0,
max: 10,
state: 'pending',
node_id: 'n1',
prompt_id: 'job-1',
display_node_id: 'n1'
}
}
fireProgressState('job-1', pendingNodes, 'wf-active')
mockRevokePreviewsByExecutionId.mockClear()
const runningNodes: Record<string, NodeProgressState> = {
n1: { ...pendingNodes.n1, state: 'running', value: 1 }
}
fireProgressState('job-1', runningNodes, 'wf-active')
expect(mockRevokePreviewsByExecutionId).toHaveBeenCalledWith('n1')
})
})
describe('useExecutionStore - progress_text startup guard', () => {
let store: ReturnType<typeof useExecutionStore>
@@ -498,6 +950,7 @@ describe('useExecutionStore - progress_text startup guard', () => {
nodeId: string
text: string
prompt_id?: string
workflow_id?: string
}) {
const handler = apiEventHandlers.get('progress_text')
if (!handler) throw new Error('progress_text handler not bound')
@@ -507,6 +960,7 @@ describe('useExecutionStore - progress_text startup guard', () => {
beforeEach(() => {
vi.clearAllMocks()
apiEventHandlers.clear()
mockActiveWorkflow.current = null
setActivePinia(createTestingPinia({ stubActions: false }))
store = useExecutionStore()
store.bindExecutionEvents()
@@ -539,6 +993,50 @@ describe('useExecutionStore - progress_text startup guard', () => {
expect(mockShowTextPreview).toHaveBeenCalledWith(mockNode, 'warming up')
})
it('skips progress_text whose workflow_id mismatches active workflow', async () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const mockNode = createMockLGraphNode({ id: 1 })
const { useCanvasStore } =
await import('@/renderer/core/canvas/canvasStore')
useCanvasStore().canvas = {
graph: { getNodeById: vi.fn(() => mockNode) }
} as unknown as LGraphCanvas
fireProgressText({
nodeId: '1',
text: 'warming up',
prompt_id: 'job-other',
workflow_id: 'wf-other'
})
expect(mockShowTextPreview).not.toHaveBeenCalled()
})
it('forwards progress_text whose workflow_id matches active workflow', async () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const mockNode = createMockLGraphNode({ id: 1 })
const { useCanvasStore } =
await import('@/renderer/core/canvas/canvasStore')
useCanvasStore().canvas = {
graph: { getNodeById: vi.fn(() => mockNode) }
} as unknown as LGraphCanvas
fireProgressText({
nodeId: '1',
text: 'warming up',
prompt_id: 'job-1',
workflow_id: 'wf-active'
})
expect(mockShowTextPreview).toHaveBeenCalledWith(mockNode, 'warming up')
})
})
describe('useExecutionErrorStore - Node Error Lookups', () => {
@@ -818,6 +1316,7 @@ describe('useExecutionStore - WebSocket event handlers', () => {
beforeEach(() => {
vi.clearAllMocks()
apiEventHandlers.clear()
mockActiveWorkflow.current = null
setActivePinia(createTestingPinia({ stubActions: false }))
store = useExecutionStore()
store.bindExecutionEvents()
@@ -832,10 +1331,7 @@ describe('useExecutionStore - WebSocket event handlers', () => {
})
it('clears initializing state for the starting job', () => {
store.initializingJobIds = new Set([
'job-1',
'job-2'
]) as unknown as Set<string>
store.initializingJobIds = new Set(['job-1', 'job-2'])
fire('execution_start', { prompt_id: 'job-1', timestamp: 0 })
expect(store.initializingJobIds.has('job-1')).toBe(false)
@@ -926,6 +1422,16 @@ describe('useExecutionStore - WebSocket event handlers', () => {
expect(store.activeJobId).toBeNull()
expect(store.queuedJobs['job-1']).toBeUndefined()
})
it('clears initializing state for the completed job', () => {
store.initializingJobIds = new Set(['job-1', 'job-2'])
fire('execution_start', { prompt_id: 'job-1', timestamp: 0 })
fire('execution_success', { prompt_id: 'job-1', timestamp: 0 })
expect(store.initializingJobIds.has('job-1')).toBe(false)
expect(store.initializingJobIds.has('job-2')).toBe(true)
})
})
describe('executing', () => {

View File

@@ -247,22 +247,32 @@ export const useExecutionStore = defineStore('execution', () => {
}
function handleExecutionStart(e: CustomEvent<ExecutionStartWsMessage>) {
executionIdToLocatorCache.clear()
executionErrorStore.clearAllErrors()
activeJobId.value = e.detail.prompt_id
queuedJobs.value[activeJobId.value] ??= { nodes: {} }
clearInitializationByJobId(activeJobId.value)
const jobId = e.detail.prompt_id
queuedJobs.value[jobId] ??= { nodes: {} }
clearInitializationByJobId(jobId)
// Ensure path mapping exists — execution_start can arrive via WebSocket
// before the HTTP response from queuePrompt triggers storeJob.
if (!jobIdToSessionWorkflowPath.value.has(activeJobId.value)) {
const path = queuedJobs.value[activeJobId.value]?.workflow?.path
if (path) ensureSessionWorkflowPath(activeJobId.value, path)
if (!jobIdToSessionWorkflowPath.value.has(jobId)) {
const path = queuedJobs.value[jobId]?.workflow?.path
if (path) ensureSessionWorkflowPath(jobId, path)
}
// Only adopt as the global active job and clear shared UI state when the
// starting job belongs to the active workflow. Otherwise a job started
// from another tab would steal activeJobId and clobber the active tab's
// execution UI.
if (!messageMatchesActiveWorkflow(jobId, e.detail.workflow_id)) return
executionIdToLocatorCache.clear()
executionErrorStore.clearAllErrors()
activeJobId.value = jobId
}
function handleExecutionCached(e: CustomEvent<ExecutionCachedWsMessage>) {
if (!activeJob.value) return
if (!messageMatchesActiveWorkflow(e.detail.prompt_id, e.detail.workflow_id))
return
for (const n of e.detail.nodes) {
activeJob.value.nodes[n] = true
}
@@ -272,12 +282,15 @@ export const useExecutionStore = defineStore('execution', () => {
e: CustomEvent<ExecutionInterruptedWsMessage>
) {
const jobId = e.detail.prompt_id
if (activeJobId.value) clearInitializationByJobId(activeJobId.value)
clearInitializationByJobId(jobId)
if (!messageMatchesActiveWorkflow(jobId, e.detail.workflow_id)) return
resetExecutionState(jobId)
}
function handleExecuted(e: CustomEvent<ExecutedWsMessage>) {
if (!activeJob.value) return
if (!messageMatchesActiveWorkflow(e.detail.prompt_id, e.detail.workflow_id))
return
activeJob.value.nodes[e.detail.node] = true
}
@@ -288,16 +301,16 @@ export const useExecutionStore = defineStore('execution', () => {
})
}
const jobId = e.detail.prompt_id
clearInitializationByJobId(jobId)
if (!messageMatchesActiveWorkflow(jobId, e.detail.workflow_id)) return
resetExecutionState(jobId)
}
function handleExecuting(e: CustomEvent<NodeId | null>): void {
// Clear the current node progress when a new node starts executing
_executingNodeProgress.value = null
if (!activeJob.value) return
// Update the executing nodes list
if (typeof e.detail !== 'string') {
if (activeJobId.value) {
delete queuedJobs.value[activeJobId.value]
@@ -335,43 +348,110 @@ export const useExecutionStore = defineStore('execution', () => {
}
function handleProgressState(e: CustomEvent<ProgressStateWsMessage>) {
const { nodes, prompt_id: jobId } = e.detail
const { nodes, prompt_id: jobId, workflow_id: messageWorkflowId } = e.detail
const isActiveWorkflowMessage = messageMatchesActiveWorkflow(
jobId,
messageWorkflowId
)
// Revoke previews for nodes that are starting to execute
const previousForJob = nodeProgressStatesByJob.value[jobId] || {}
for (const nodeId in nodes) {
const nodeState = nodes[nodeId]
if (nodeState.state === 'running' && !previousForJob[nodeId]) {
// This node just started executing, revoke its previews
// Note that we're doing the *actual* node id instead of the display node id
// here intentionally. That way, we don't clear the preview every time a new node
// within an expanded graph starts executing.
const { revokePreviewsByExecutionId } = useNodeOutputStore()
revokePreviewsByExecutionId(nodeId)
if (isActiveWorkflowMessage) {
const { revokePreviewsByExecutionId } = useNodeOutputStore()
for (const nodeId in nodes) {
const nodeState = nodes[nodeId]
if (
nodeState.state === 'running' &&
previousForJob[nodeId]?.state !== 'running'
) {
revokePreviewsByExecutionId(nodeId)
}
}
}
// Update the progress states for all nodes
nodeProgressStatesByJob.value = {
...nodeProgressStatesByJob.value,
[jobId]: nodes
}
evictOldProgressJobs()
nodeProgressStates.value = nodes
// If we have progress for the currently executing node, update it for backwards compatibility
if (executingNodeId.value && nodes[executingNodeId.value]) {
const nodeState = nodes[executingNodeId.value]
_executingNodeProgress.value = {
value: nodeState.value,
max: nodeState.max,
prompt_id: nodeState.prompt_id,
node: nodeState.display_node_id || nodeState.node_id
if (isActiveWorkflowMessage) {
nodeProgressStates.value = nodes
if (executingNodeId.value && nodes[executingNodeId.value]) {
const nodeState = nodes[executingNodeId.value]
_executingNodeProgress.value = {
value: nodeState.value,
max: nodeState.max,
prompt_id: nodeState.prompt_id,
node: nodeState.display_node_id || nodeState.node_id
}
}
}
}
/**
* Determines whether a WebSocket execution message belongs to the
* currently active workflow tab. Used to gate writes to the global
* "current execution" mirror so a job initiated from another open
* workflow cannot leak its progress into the active one.
*
* Resolution order:
* 1. `workflow_id` carried on the WS message (when backend supports it).
* 2. {@link jobIdToWorkflowId} mapping populated when the job was queued
* from this tab.
* 3. {@link jobIdToSessionWorkflowPath} mapping (path-based fallback).
*
* When the workflow cannot be resolved at all (e.g. job queued in a
* different browser session), the message is treated as belonging to
* the active workflow to preserve current behaviour for the existing
* single-tab common case.
*/
function messageMatchesActiveWorkflow(
jobId: JobId,
messageWorkflowId: string | undefined
): boolean {
const activeWorkflow = workflowStore.activeWorkflow
if (!activeWorkflow) return true
const activeId =
activeWorkflow.activeState?.id ?? activeWorkflow.initialState?.id ?? null
if (messageWorkflowId && activeId) {
return messageWorkflowId === activeId
}
const mappedId = jobIdToWorkflowId.value.get(jobId)
if (mappedId && activeId) return mappedId === activeId
const mappedPath = jobIdToSessionWorkflowPath.value.get(jobId)
if (mappedPath && activeWorkflow.path) {
return mappedPath === activeWorkflow.path
}
return true
}
/**
* Returns true when workflow ownership for {@link jobId} can be resolved
* — either by an explicit `workflow_id` on the incoming message or by a
* mapping registered when the job was queued. When this returns false
* the caller should fall back to whatever legacy guard applied before
* workflow gating was introduced.
*/
function canResolveWorkflowOwnership(
jobId: JobId,
messageWorkflowId: string | undefined
): boolean {
return (
Boolean(messageWorkflowId) ||
jobIdToWorkflowId.value.has(jobId) ||
jobIdToSessionWorkflowPath.value.has(jobId)
)
}
function handleProgress(e: CustomEvent<ProgressWsMessage>) {
const { prompt_id: jobId, workflow_id: messageWorkflowId } = e.detail
if (!messageMatchesActiveWorkflow(jobId, messageWorkflowId)) return
_executingNodeProgress.value = e.detail
}
@@ -393,17 +473,16 @@ export const useExecutionStore = defineStore('execution', () => {
error: e.detail.exception_message
})
// Cloud wraps validation errors (400) in exception_message as embedded JSON.
if (handleCloudValidationError(e.detail)) return
}
// Service-level errors (e.g. "Job has stagnated") have no associated node.
// Route them as job errors
if (handleServiceLevelError(e.detail)) return
// OSS path / Cloud fallback (real runtime errors)
executionErrorStore.lastExecutionError = e.detail
clearInitializationByJobId(e.detail.prompt_id)
if (!messageMatchesActiveWorkflow(e.detail.prompt_id, e.detail.workflow_id))
return
executionErrorStore.lastExecutionError = e.detail
resetExecutionState(e.detail.prompt_id)
}
@@ -413,6 +492,9 @@ export const useExecutionStore = defineStore('execution', () => {
return false
clearInitializationByJobId(detail.prompt_id)
if (!messageMatchesActiveWorkflow(detail.prompt_id, detail.workflow_id))
return true
resetExecutionState(detail.prompt_id)
executionErrorStore.lastPromptError = {
type: detail.exception_type ?? 'error',
@@ -431,6 +513,9 @@ export const useExecutionStore = defineStore('execution', () => {
if (!result) return false
clearInitializationByJobId(detail.prompt_id)
if (!messageMatchesActiveWorkflow(detail.prompt_id, detail.workflow_id))
return true
resetExecutionState(detail.prompt_id)
if (result.kind === 'nodeErrors') {
@@ -529,14 +614,21 @@ export const useExecutionStore = defineStore('execution', () => {
}
function handleProgressText(e: CustomEvent<ProgressTextWsMessage>) {
const { nodeId, text, prompt_id } = e.detail
const { nodeId, text, prompt_id, workflow_id } = e.detail
if (!text || !nodeId) return
// Filter: only accept progress for the active prompt
if (prompt_id && activeJobId.value && prompt_id !== activeJobId.value)
return
// Prefer the workflow-ownership gate when ownership can be resolved.
// Only fall back to the legacy active-prompt guard when ownership is
// unresolvable; otherwise activeJobId pointing at a different workflow's
// job would incorrectly drop messages for the visible workflow.
if (prompt_id) {
if (canResolveWorkflowOwnership(prompt_id, workflow_id)) {
if (!messageMatchesActiveWorkflow(prompt_id, workflow_id)) return
} else if (activeJobId.value && prompt_id !== activeJobId.value) {
return
}
}
// Handle execution node IDs for subgraphs
const currentId = getNodeIdIfExecuting(nodeId)
if (!currentId) return
const node = canvasStore.canvas?.graph?.getNodeById(currentId)