Compare commits

...

7 Commits

Author SHA1 Message Date
Glary-Bot
4d9011b756 chore: hoist useNodeOutputStore call and remove dead test mock
- Hoist useNodeOutputStore() destructure outside the per-node loop in
  handleProgressState so the store lookup happens once per message
  instead of once per running node.
- Remove the dead vi.mock for @/stores/imagePreviewStore (executionStore
  imports useNodeOutputStore from @/stores/nodeOutputStore; the second
  mock is the live one).
2026-05-03 19:54:49 +00:00
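The hoist described in this commit message can be sketched as follows. This is an illustration only: `useNodeOutputStore` is stood in by a plain factory so the example is self-contained, and the handler name is assumed from the message, not the store's actual code.

```typescript
// Sketch only: the real useNodeOutputStore is a Pinia store; a factory
// stands in for it here so the example runs standalone.
type OutputStore = { revokePreviewsByExecutionId: (nodeId: string) => void }

function handleRunningNodes(
  runningNodeIds: string[],
  useNodeOutputStore: () => OutputStore
): void {
  // Hoisted: one store lookup per message...
  const { revokePreviewsByExecutionId } = useNodeOutputStore()
  for (const nodeId of runningNodeIds) {
    // ...instead of one lookup per running node inside the loop.
    revokePreviewsByExecutionId(nodeId)
  }
}
```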
Glary-Bot
452ef585bd fix: route terminal handlers through evictTerminalJob for non-active jobs
The WS terminal handlers (execution_success, execution_error,
execution_interrupted, plus the cloud validation and service-level
error paths) all called resetExecutionState(prompt_id) directly. That
function unconditionally clears the global mirror, _executingNodeProgress,
activeJobId, and queuedJobs[activeJobId.value], which means a terminal
event for a non-active job (e.g. job A finishing after job B becomes
active) wipes the active job's UI state.

Add a small terminateJob(jobId) helper that routes to evictTerminalJob
for non-active jobs (already designed to be safe for any jobId) and to
resetExecutionState only when jobId === activeJobId. Apply at every WS
terminal call site.

Also reorder the gating in handleProgressText: prefer the active-workflow
gate when ownership can be resolved (workflow_id on the message or a
registered mapping), and only fall back to the legacy activeJobId guard
when ownership is unresolvable. The previous order would drop legitimate
text previews for the visible workflow when activeJobId pointed at a
different workflow's job.
2026-05-03 18:50:49 +00:00
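A hedged sketch of the `terminateJob(jobId)` routing this commit describes; the dependency shapes are assumed for illustration, not the store's actual internals.

```typescript
// Illustrative shapes only; the real functions live inside executionStore.
type JobId = string
interface TerminalDeps {
  activeJobId: () => JobId | null
  evictTerminalJob: (jobId: JobId) => void // already safe for any jobId
  resetExecutionState: (jobId: JobId) => void // clears global/active state
}

function terminateJob(jobId: JobId, deps: TerminalDeps): void {
  if (jobId !== deps.activeJobId()) {
    // A terminal event for a non-active job must not wipe the active
    // job's UI state, so route it through the targeted eviction path.
    deps.evictTerminalJob(jobId)
    return
  }
  deps.resetExecutionState(jobId)
}
```

Applied at every WS terminal call site, this keeps job A's completion from clearing job B's mirror.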
Glary-Bot
7f48b3324a fix: gate preview revocation by active workflow
handleProgressState was calling revokePreviewsByExecutionId before the
active-workflow gate, so a progress_state from a non-active workflow
could still revoke previews on the active canvas when nodeIds collided.
Hoist the active-workflow check above the revocation loop so previews
are only touched when the message belongs to the active tab.
2026-05-03 06:43:41 +00:00
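The reordering in this commit reduces to putting the gate above the loop; a minimal sketch with illustrative parameter names (the real handler computes the gate and the starting-node set inline):

```typescript
// Sketch of the corrected order: the active-workflow check guards the
// revocation loop instead of running after it.
function revokeStartingNodePreviews(
  isActiveWorkflowMessage: boolean,
  startingNodeIds: string[],
  revokePreviewsByExecutionId: (nodeId: string) => void
): void {
  // Previously the loop ran before this gate, so colliding nodeIds from a
  // non-active workflow could revoke previews on the active canvas.
  if (!isActiveWorkflowMessage) return
  for (const nodeId of startingNodeIds) {
    revokePreviewsByExecutionId(nodeId)
  }
}
```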
Glary-Bot
07f4569458 fix: reconcile progress mirror on active workflow change
The incoming-event gate prevents new progress messages from leaking into
the wrong workflow, but if workflow A had already populated the mirror
and the user switches to B, the stale A progress would still render
until another event arrived. Add a watcher on workflowStore.activeWorkflow
that rebuilds the mirror from nodeProgressStatesByJob using the existing
workflow-id / session-path mappings, picking the most recently inserted
job for the new active workflow, or clears the mirror entirely when no
job belongs to it. Also clears _executingNodeProgress when its job no
longer matches the active workflow.
2026-05-03 06:32:34 +00:00
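The core of the watcher's reconciliation can be expressed as a pure function (shapes assumed from the commit message): pick the most recently inserted job whose mapping resolves to the new active workflow, or signal that the mirror should be cleared.

```typescript
// Pure-function sketch of the watcher's job selection; the real store also
// consults the session-path mapping as a fallback.
function pickJobForWorkflow(
  jobIdsInInsertionOrder: string[],
  jobIdToWorkflowId: Map<string, string>,
  activeWorkflowId: string
): string | null {
  // Walk backwards so the most recently inserted matching job wins.
  for (let i = jobIdsInInsertionOrder.length - 1; i >= 0; i--) {
    const jobId = jobIdsInInsertionOrder[i]
    if (jobIdToWorkflowId.get(jobId) === activeWorkflowId) return jobId
  }
  // No job belongs to this workflow: caller clears the mirror entirely.
  return null
}
```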
Glary-Bot
34b556c7e4 fix: gate progress_text and reconcile initializing-only jobs
- handleProgressText: gate by active workflow using workflow_id from
  the message, matching the gating already on progress and
  progress_state. Prevents text previews from a different tab leaking
  onto the active workflow when nodeIds overlap.
- reconcileTerminalJobs: union initializingJobIds into the tracked set
  so a cloud job that lands in history while still in the 'Waiting for
  a machine' state gets evicted via evictTerminalJob, clearing the
  stuck initializing indicator.
- Add tests for both: workflow_id mismatch/match on progress_text, and
  initializing-only job eviction.
2026-05-03 06:21:54 +00:00
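The `reconcileTerminalJobs` part of this commit is a simple set union; a sketch under assumed names:

```typescript
// Sketch: the tracked set is the union of jobs with progress state and
// jobs still marked as initializing ('Waiting for a machine').
function trackedJobIds(
  progressJobIds: Iterable<string>,
  initializingJobIds: Iterable<string>
): Set<string> {
  // A cloud job that lands in history before emitting any progress only
  // appears in initializingJobIds; without the union it would never be
  // handed to evictTerminalJob and its indicator would stay stuck.
  return new Set([...progressJobIds, ...initializingJobIds])
}
```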
Glary-Bot
82f1f91e3c fix: address review feedback - eviction and progress gating
- queueStore.update(): always reconcile terminal jobs when history is
  non-empty, not just when queue has data. Previously, the common case
  of the last active job finishing (queue empty, job in history) was
  skipped, leaving in place the very stuck-progress bug we are trying to fix.
- handleProgress: gate _executingNodeProgress by the same active-
  workflow check as handleProgressState, so per-node current progress
  no longer leaks across workflows.
- Add tests for both fixes.
2026-05-03 06:07:23 +00:00
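The `queueStore.update()` fix reduces to widening one predicate; a sketch with assumed field names:

```typescript
// Sketch of the corrected trigger: reconcile on any non-empty history
// snapshot, even when the running/pending queue itself is empty.
function shouldReconcileTerminalJobs(snapshot: {
  queue: unknown[]
  history: unknown[]
}): boolean {
  // Old behavior also required a non-empty queue, which skipped the common
  // case of the last active job finishing (queue empty, job in history).
  return snapshot.history.length > 0
}
```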
Glary-Bot
cd73b6f987 fix: recover stuck node progress and scope mirror to active workflow
When WebSocket terminal messages (execution_success / execution_error /
execution_interrupted) are dropped, node-level progress UI gets stuck on
the affected node even though the job has reached a terminal state on
the backend. Concurrently, progress messages from a job initiated in
another open workflow can leak into the active one because the global
nodeProgressStates mirror is overwritten unconditionally on every
progress_state event.

Changes:

- Add reconcileTerminalJobs(active, terminal) to executionStore. Used
  by polling to detect jobs that moved Running -> History via the
  backend snapshot and clear their per-job state without disturbing a
  different running job (evictTerminalJob clears the global mirror only
  when the evicted job actually owns it, identified via prompt_id on
  NodeProgressState).
- Wire reconcileTerminalJobs into queueStore.update() alongside the
  existing reconcileInitializingJobs call, so the polling fallback now
  recovers stuck progress without requiring backend retries or
  WebSocket sequence numbers.
- Broaden useQueuePolling activation from activeJobsCount === 1 to
  activeJobsCount > 0 so the recovery path also covers the case where
  one job's terminal message is dropped while another job is still
  pending or running.
- Gate writes to the global nodeProgressStates mirror in
  handleProgressState by active workflow. Resolution order:
  workflow_id on the WS message, jobIdToWorkflowId mapping,
  jobIdToSessionWorkflowPath fallback, defaulting to the legacy
  behaviour when nothing is resolvable.
- Add optional workflow_id field to all execution-related WS message
  schemas so the gating starts using the backend-provided value as
  soon as it ships.

Tests:

- Active-workflow gating: per-job state always updates; mirror only
  updates on workflow_id match (or via mapping fallbacks); preserves
  legacy behaviour when no resolution is available.
- reconcileTerminalJobs: evicts non-active terminal jobs without
  disturbing the active job, evicts the active job and clears the
  mirror, clears a stale mirror still owned by a job that just became
  terminal, skips still-active jobs, skips jobs absent from terminal,
  idempotent.
2026-05-02 02:09:01 +00:00
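A hedged sketch of the `reconcileTerminalJobs(active, terminal)` loop this commit introduces; the real store method additionally clears the global mirror when the evicted job owns it (identified via prompt_id on NodeProgressState), which is omitted here.

```typescript
// Illustrative core of the reconciliation: evict tracked jobs that the
// backend snapshot says have moved Running -> History.
function reconcileTerminalJobs(
  trackedJobIds: Set<string>,
  activeJobIds: Set<string>,
  terminalJobIds: Set<string>,
  evictTerminalJob: (jobId: string) => void
): void {
  for (const jobId of trackedJobIds) {
    if (activeJobIds.has(jobId)) continue // still running: leave untouched
    if (!terminalJobIds.has(jobId)) continue // not in history yet: wait
    evictTerminalJob(jobId)
  }
}
```

Checking the active set first is what makes "skips jobs that are still active even if also in terminal set" hold in the tests below.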
7 changed files with 984 additions and 49 deletions

View File

@@ -63,13 +63,13 @@ describe('useQueuePolling', () => {
expect(store.update).toHaveBeenCalledOnce()
})
-  it('does not poll when activeJobsCount > 1', async () => {
+  it('polls when activeJobsCount > 1', async () => {
     mountUseQueuePolling()
     store.activeJobsCount = 2
-    await vi.advanceTimersByTimeAsync(16_000)
+    await vi.advanceTimersByTimeAsync(8_000)
-    expect(store.update).not.toHaveBeenCalled()
+    expect(store.update).toHaveBeenCalledOnce()
})
it('stops polling when activeJobsCount drops to 0', async () => {

View File

@@ -13,7 +13,7 @@ export function useQueuePolling() {
const { start, stop } = useTimeoutFn(
() => {
-      if (queueStore.activeJobsCount !== 1 || queueStore.isLoading) return
+      if (queueStore.activeJobsCount < 1 || queueStore.isLoading) return
delay.value = Math.min(delay.value * BACKOFF_MULTIPLIER, MAX_INTERVAL_MS)
void queueStore.update()
},
@@ -22,7 +22,7 @@ export function useQueuePolling() {
)
function scheduleNextPoll() {
-    if (queueStore.activeJobsCount === 1 && !queueStore.isLoading) start()
+    if (queueStore.activeJobsCount >= 1 && !queueStore.isLoading) start()
else stop()
}
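The gating change in this file reduces to one predicate; extracted here as a standalone function for clarity (the function name is illustrative, the condition is taken from the diff):

```typescript
// Predicate form of the widened polling gate.
function shouldPoll(activeJobsCount: number, isLoading: boolean): boolean {
  // Was: activeJobsCount === 1 (exactly one job). Any active job now keeps
  // the polling fallback alive, covering a dropped terminal message for
  // one job while another is still pending or running.
  return activeJobsCount >= 1 && !isLoading
}
```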

View File

@@ -10,6 +10,7 @@ import { LinkReleaseTriggerAction } from '@/types/searchBoxTypes'
const zNodeType = z.string()
const zJobId = z.string()
export type JobId = z.infer<typeof zJobId>
const zWorkflowId = z.string()
export const resultItemType = z.enum(['input', 'output', 'temp'])
export type ResultItemType = z.infer<typeof resultItemType>
@@ -56,6 +57,7 @@ const zProgressWsMessage = z.object({
value: z.number().int(),
max: z.number().int(),
prompt_id: zJobId,
workflow_id: zWorkflowId.optional(),
node: zNodeId
})
@@ -65,6 +67,7 @@ const zNodeProgressState = z.object({
state: z.enum(['pending', 'running', 'finished', 'error']),
node_id: zNodeId,
prompt_id: zJobId,
workflow_id: zWorkflowId.optional(),
display_node_id: zNodeId.optional(),
parent_node_id: zNodeId.optional(),
real_node_id: zNodeId.optional()
@@ -72,13 +75,15 @@ const zNodeProgressState = z.object({
const zProgressStateWsMessage = z.object({
prompt_id: zJobId,
workflow_id: zWorkflowId.optional(),
nodes: z.record(zNodeId, zNodeProgressState)
})
const zExecutingWsMessage = z.object({
node: zNodeId,
display_node: zNodeId,
-  prompt_id: zJobId
+  prompt_id: zJobId,
+  workflow_id: zWorkflowId.optional()
})
const zExecutedWsMessage = zExecutingWsMessage.extend({
@@ -88,6 +93,7 @@ const zExecutedWsMessage = zExecutingWsMessage.extend({
const zExecutionWsMessageBase = z.object({
prompt_id: zJobId,
workflow_id: zWorkflowId.optional(),
timestamp: z.number().int()
})
@@ -115,7 +121,8 @@ const zExecutionErrorWsMessage = zExecutionWsMessageBase.extend({
const zProgressTextWsMessage = z.object({
nodeId: zNodeId,
text: z.string(),
-  prompt_id: z.string().optional()
+  prompt_id: z.string().optional(),
+  workflow_id: zWorkflowId.optional()
})
const zNotificationWsMessage = z.object({

View File

@@ -11,12 +11,22 @@ const {
mockNodeExecutionIdToNodeLocatorId,
mockNodeIdToNodeLocatorId,
mockNodeLocatorIdToNodeExecutionId,
-  mockShowTextPreview
+  mockShowTextPreview,
+  mockActiveWorkflow,
+  mockRevokePreviewsByExecutionId
} = vi.hoisted(() => ({
mockNodeExecutionIdToNodeLocatorId: vi.fn(),
mockNodeIdToNodeLocatorId: vi.fn(),
mockNodeLocatorIdToNodeExecutionId: vi.fn(),
-  mockShowTextPreview: vi.fn()
+  mockShowTextPreview: vi.fn(),
+  mockActiveWorkflow: {
+    current: null as null | {
+      activeState?: { id?: string }
+      initialState?: { id?: string }
+      path?: string
+    }
+  },
+  mockRevokePreviewsByExecutionId: vi.fn()
}))
import type * as WorkflowStoreModule from '@/platform/workflow/management/stores/workflowStore'
@@ -35,7 +45,10 @@ vi.mock('@/platform/workflow/management/stores/workflowStore', async () => {
useWorkflowStore: vi.fn(() => ({
nodeExecutionIdToNodeLocatorId: mockNodeExecutionIdToNodeLocatorId,
nodeIdToNodeLocatorId: mockNodeIdToNodeLocatorId,
-      nodeLocatorIdToNodeExecutionId: mockNodeLocatorIdToNodeExecutionId
+      nodeLocatorIdToNodeExecutionId: mockNodeLocatorIdToNodeExecutionId,
+      get activeWorkflow() {
+        return mockActiveWorkflow.current
+      }
}))
}
})
@@ -70,9 +83,9 @@ vi.mock('@/scripts/api', () => ({
}
}))
-vi.mock('@/stores/imagePreviewStore', () => ({
+vi.mock('@/stores/nodeOutputStore', () => ({
   useNodeOutputStore: () => ({
-    revokePreviewsByExecutionId: vi.fn()
+    revokePreviewsByExecutionId: mockRevokePreviewsByExecutionId
})
}))
@@ -440,6 +453,599 @@ describe('useExecutionStore - reconcileInitializingJobs', () => {
})
})
describe('useExecutionStore - active workflow gating of progress mirror', () => {
let store: ReturnType<typeof useExecutionStore>
function makeProgressNodes(
nodeId: string,
jobId: string
): Record<string, NodeProgressState> {
return {
[nodeId]: {
value: 5,
max: 10,
state: 'running',
node_id: nodeId,
prompt_id: jobId,
display_node_id: nodeId
}
}
}
function fireProgressState(
jobId: string,
nodes: Record<string, NodeProgressState>,
workflowId?: string
) {
const handler = apiEventHandlers.get('progress_state')
if (!handler) throw new Error('progress_state handler not bound')
handler(
new CustomEvent('progress_state', {
detail: { nodes, prompt_id: jobId, workflow_id: workflowId }
})
)
}
beforeEach(() => {
vi.clearAllMocks()
apiEventHandlers.clear()
mockActiveWorkflow.current = null
setActivePinia(createTestingPinia({ stubActions: false }))
store = useExecutionStore()
store.bindExecutionEvents()
})
it('updates per-job progress regardless of active workflow', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
fireProgressState(
'job-other',
makeProgressNodes('1', 'job-other'),
'wf-other'
)
expect(store.nodeProgressStatesByJob).toHaveProperty('job-other')
})
it('skips preview revocation for non-active workflow messages', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
mockRevokePreviewsByExecutionId.mockClear()
fireProgressState(
'job-other',
makeProgressNodes('1', 'job-other'),
'wf-other'
)
expect(mockRevokePreviewsByExecutionId).not.toHaveBeenCalled()
})
it('revokes previews for active workflow messages', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
mockRevokePreviewsByExecutionId.mockClear()
fireProgressState('job-1', makeProgressNodes('1', 'job-1'), 'wf-active')
expect(mockRevokePreviewsByExecutionId).toHaveBeenCalledWith('1')
})
it('skips global mirror when message workflow_id mismatches active workflow', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
fireProgressState(
'job-other',
makeProgressNodes('1', 'job-other'),
'wf-other'
)
expect(store.nodeProgressStates).toEqual({})
})
it('updates global mirror when message workflow_id matches active workflow', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
fireProgressState('job-1', makeProgressNodes('1', 'job-1'), 'wf-active')
expect(store.nodeProgressStates).toEqual(makeProgressNodes('1', 'job-1'))
})
it('falls back to jobIdToWorkflowId mapping when message has no workflow_id', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
store.registerJobWorkflowIdMapping('job-other', 'wf-other')
fireProgressState('job-other', makeProgressNodes('1', 'job-other'))
expect(store.nodeProgressStates).toEqual({})
})
it('falls back to session path mapping when message has no workflow_id and no id mapping', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
store.ensureSessionWorkflowPath('job-other', '/wf-other.json')
fireProgressState('job-other', makeProgressNodes('1', 'job-other'))
expect(store.nodeProgressStates).toEqual({})
})
it('updates mirror when no resolution is available (preserves single-tab behaviour)', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
fireProgressState('job-unknown', makeProgressNodes('1', 'job-unknown'))
expect(store.nodeProgressStates).toEqual(
makeProgressNodes('1', 'job-unknown')
)
})
it('updates mirror when there is no active workflow', () => {
mockActiveWorkflow.current = null
fireProgressState('job-1', makeProgressNodes('1', 'job-1'), 'wf-1')
expect(store.nodeProgressStates).toEqual(makeProgressNodes('1', 'job-1'))
})
it('skips _executingNodeProgress on workflow_id mismatch', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const handler = apiEventHandlers.get('progress')
if (!handler) throw new Error('progress handler not bound')
handler(
new CustomEvent('progress', {
detail: {
value: 5,
max: 10,
prompt_id: 'job-other',
node: '1',
workflow_id: 'wf-other'
}
})
)
expect(store._executingNodeProgress).toBeNull()
})
it('updates _executingNodeProgress on workflow_id match', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const handler = apiEventHandlers.get('progress')
if (!handler) throw new Error('progress handler not bound')
handler(
new CustomEvent('progress', {
detail: {
value: 7,
max: 10,
prompt_id: 'job-1',
node: '1',
workflow_id: 'wf-active'
}
})
)
expect(store._executingNodeProgress).toEqual({
value: 7,
max: 10,
prompt_id: 'job-1',
node: '1',
workflow_id: 'wf-active'
})
})
})
describe('useExecutionStore - reconcileMirrorForActiveWorkflow', () => {
let store: ReturnType<typeof useExecutionStore>
function makeProgressNodes(
nodeId: string,
jobId: string
): Record<string, NodeProgressState> {
return {
[nodeId]: {
value: 5,
max: 10,
state: 'running',
node_id: nodeId,
prompt_id: jobId,
display_node_id: nodeId
}
}
}
function fireProgressState(
jobId: string,
nodes: Record<string, NodeProgressState>,
workflowId?: string
) {
const handler = apiEventHandlers.get('progress_state')
if (!handler) throw new Error('progress_state handler not bound')
handler(
new CustomEvent('progress_state', {
detail: { nodes, prompt_id: jobId, workflow_id: workflowId }
})
)
}
beforeEach(() => {
vi.clearAllMocks()
apiEventHandlers.clear()
mockActiveWorkflow.current = null
setActivePinia(createTestingPinia({ stubActions: false }))
store = useExecutionStore()
store.bindExecutionEvents()
})
it('rebuilds the mirror from the active workflow job on tab switch', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-a' },
path: '/wf-a.json'
}
fireProgressState('job-a', makeProgressNodes('1', 'job-a'), 'wf-a')
store.registerJobWorkflowIdMapping('job-a', 'wf-a')
mockActiveWorkflow.current = {
activeState: { id: 'wf-b' },
path: '/wf-b.json'
}
fireProgressState('job-b', makeProgressNodes('2', 'job-b'), 'wf-b')
store.registerJobWorkflowIdMapping('job-b', 'wf-b')
expect(store.nodeProgressStates).toEqual(makeProgressNodes('2', 'job-b'))
mockActiveWorkflow.current = {
activeState: { id: 'wf-a' },
path: '/wf-a.json'
}
store.reconcileMirrorForActiveWorkflow()
expect(store.nodeProgressStates).toEqual(makeProgressNodes('1', 'job-a'))
})
it('clears the mirror when the active workflow has no matching job', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-a' },
path: '/wf-a.json'
}
fireProgressState('job-a', makeProgressNodes('1', 'job-a'), 'wf-a')
store.registerJobWorkflowIdMapping('job-a', 'wf-a')
expect(store.nodeProgressStates).toEqual(makeProgressNodes('1', 'job-a'))
mockActiveWorkflow.current = {
activeState: { id: 'wf-empty' },
path: '/wf-empty.json'
}
store.reconcileMirrorForActiveWorkflow()
expect(store.nodeProgressStates).toEqual({})
})
it('clears _executingNodeProgress that belonged to a different job', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-a' },
path: '/wf-a.json'
}
fireProgressState('job-a', makeProgressNodes('1', 'job-a'), 'wf-a')
store.registerJobWorkflowIdMapping('job-a', 'wf-a')
const progressHandler = apiEventHandlers.get('progress')
if (!progressHandler) throw new Error('progress handler not bound')
progressHandler(
new CustomEvent('progress', {
detail: {
value: 5,
max: 10,
prompt_id: 'job-a',
node: '1',
workflow_id: 'wf-a'
}
})
)
mockActiveWorkflow.current = {
activeState: { id: 'wf-empty' },
path: '/wf-empty.json'
}
store.reconcileMirrorForActiveWorkflow()
expect(store._executingNodeProgress).toBeNull()
})
it('falls back to session path mapping when workflow id is not registered', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-a' },
path: '/wf-a.json'
}
fireProgressState('job-a', makeProgressNodes('1', 'job-a'), 'wf-a')
store.ensureSessionWorkflowPath('job-a', '/wf-a.json')
mockActiveWorkflow.current = {
activeState: { id: 'wf-b' },
path: '/wf-b.json'
}
fireProgressState('job-b', makeProgressNodes('2', 'job-b'), 'wf-b')
store.ensureSessionWorkflowPath('job-b', '/wf-b.json')
mockActiveWorkflow.current = {
activeState: { id: 'wf-a' },
path: '/wf-a.json'
}
store.reconcileMirrorForActiveWorkflow()
expect(store.nodeProgressStates).toEqual(makeProgressNodes('1', 'job-a'))
})
})
describe('useExecutionStore - reconcileTerminalJobs', () => {
let store: ReturnType<typeof useExecutionStore>
function makeProgressNodes(
nodeId: string,
jobId: string
): Record<string, NodeProgressState> {
return {
[nodeId]: {
value: 5,
max: 10,
state: 'running',
node_id: nodeId,
prompt_id: jobId,
display_node_id: nodeId
}
}
}
function fireProgressState(
jobId: string,
nodes: Record<string, NodeProgressState>
) {
const handler = apiEventHandlers.get('progress_state')
if (!handler) throw new Error('progress_state handler not bound')
handler(
new CustomEvent('progress_state', { detail: { nodes, prompt_id: jobId } })
)
}
function fireExecutionStart(jobId: string) {
const handler = apiEventHandlers.get('execution_start')
if (!handler) throw new Error('execution_start handler not bound')
handler(
new CustomEvent('execution_start', { detail: { prompt_id: jobId } })
)
}
beforeEach(() => {
vi.clearAllMocks()
apiEventHandlers.clear()
setActivePinia(createTestingPinia({ stubActions: false }))
store = useExecutionStore()
store.bindExecutionEvents()
})
it('evicts a non-active terminal job without disturbing the active job', () => {
fireExecutionStart('job-old')
fireProgressState('job-old', makeProgressNodes('1', 'job-old'))
fireExecutionStart('job-new')
fireProgressState('job-new', makeProgressNodes('2', 'job-new'))
expect(store.activeJobId).toBe('job-new')
expect(store.nodeProgressStatesByJob).toHaveProperty('job-old')
expect(store.nodeProgressStatesByJob).toHaveProperty('job-new')
store.reconcileTerminalJobs(new Set(['job-new']), new Set(['job-old']))
expect(store.nodeProgressStatesByJob).not.toHaveProperty('job-old')
expect(store.nodeProgressStatesByJob).toHaveProperty('job-new')
expect(store.activeJobId).toBe('job-new')
expect(store.nodeProgressStates).toEqual(makeProgressNodes('2', 'job-new'))
})
it('evicts an active terminal job and clears global mirror', () => {
fireExecutionStart('job-1')
fireProgressState('job-1', makeProgressNodes('1', 'job-1'))
expect(store.activeJobId).toBe('job-1')
expect(Object.keys(store.nodeProgressStates)).toHaveLength(1)
store.reconcileTerminalJobs(new Set(), new Set(['job-1']))
expect(store.nodeProgressStatesByJob).not.toHaveProperty('job-1')
expect(store.activeJobId).toBeNull()
expect(store.nodeProgressStates).toEqual({})
})
it('clears stale global mirror when its owner job becomes terminal', () => {
fireExecutionStart('job-old')
fireProgressState('job-old', makeProgressNodes('1', 'job-old'))
fireExecutionStart('job-new')
expect(store.activeJobId).toBe('job-new')
expect(store.nodeProgressStates['1']?.prompt_id).toBe('job-old')
store.reconcileTerminalJobs(new Set(['job-new']), new Set(['job-old']))
expect(store.nodeProgressStates).toEqual({})
expect(store.activeJobId).toBe('job-new')
expect(store.nodeProgressStatesByJob).not.toHaveProperty('job-old')
})
it('skips jobs that are still active even if also in terminal set', () => {
fireExecutionStart('job-1')
fireProgressState('job-1', makeProgressNodes('1', 'job-1'))
store.reconcileTerminalJobs(new Set(['job-1']), new Set(['job-1']))
expect(store.nodeProgressStatesByJob).toHaveProperty('job-1')
expect(store.activeJobId).toBe('job-1')
})
it('skips jobs absent from the terminal set', () => {
fireExecutionStart('job-1')
fireProgressState('job-1', makeProgressNodes('1', 'job-1'))
store.reconcileTerminalJobs(new Set(), new Set())
expect(store.nodeProgressStatesByJob).toHaveProperty('job-1')
expect(store.activeJobId).toBe('job-1')
})
it('is idempotent for an already-cleared job', () => {
store.reconcileTerminalJobs(new Set(), new Set(['job-1']))
store.reconcileTerminalJobs(new Set(), new Set(['job-1']))
expect(store.nodeProgressStatesByJob).not.toHaveProperty('job-1')
expect(store.activeJobId).toBeNull()
})
it('evicts initializing-only jobs that landed in history without progress events', () => {
store.initializingJobIds = new Set(['job-init'])
store.reconcileTerminalJobs(new Set(), new Set(['job-init']))
expect(store.initializingJobIds.has('job-init')).toBe(false)
})
})
describe('useExecutionStore - terminal WS handlers do not clobber active job', () => {
let store: ReturnType<typeof useExecutionStore>
function makeProgressNodes(
nodeId: string,
jobId: string
): Record<string, NodeProgressState> {
return {
[nodeId]: {
value: 5,
max: 10,
state: 'running',
node_id: nodeId,
prompt_id: jobId,
display_node_id: nodeId
}
}
}
function fireProgressState(
jobId: string,
nodes: Record<string, NodeProgressState>,
workflowId?: string
) {
const handler = apiEventHandlers.get('progress_state')
if (!handler) throw new Error('progress_state handler not bound')
handler(
new CustomEvent('progress_state', {
detail: { nodes, prompt_id: jobId, workflow_id: workflowId }
})
)
}
function fireExecutionStart(jobId: string) {
const handler = apiEventHandlers.get('execution_start')
if (!handler) throw new Error('execution_start handler not bound')
handler(
new CustomEvent('execution_start', { detail: { prompt_id: jobId } })
)
}
function fireExecutionSuccess(jobId: string) {
const handler = apiEventHandlers.get('execution_success')
if (!handler) throw new Error('execution_success handler not bound')
handler(
new CustomEvent('execution_success', { detail: { prompt_id: jobId } })
)
}
function fireExecutionInterrupted(jobId: string) {
const handler = apiEventHandlers.get('execution_interrupted')
if (!handler) throw new Error('execution_interrupted handler not bound')
handler(
new CustomEvent('execution_interrupted', {
detail: { prompt_id: jobId, node_id: '1', node_type: 'X', executed: [] }
})
)
}
beforeEach(() => {
vi.clearAllMocks()
apiEventHandlers.clear()
mockActiveWorkflow.current = null
setActivePinia(createTestingPinia({ stubActions: false }))
store = useExecutionStore()
store.bindExecutionEvents()
})
it('execution_success for a non-active job does not clobber the active job mirror', () => {
fireExecutionStart('job-old')
fireProgressState('job-old', makeProgressNodes('1', 'job-old'))
fireExecutionStart('job-new')
fireProgressState('job-new', makeProgressNodes('2', 'job-new'))
expect(store.activeJobId).toBe('job-new')
fireExecutionSuccess('job-old')
expect(store.activeJobId).toBe('job-new')
expect(store.nodeProgressStates).toEqual(makeProgressNodes('2', 'job-new'))
expect(store.nodeProgressStatesByJob).not.toHaveProperty('job-old')
expect(store.nodeProgressStatesByJob).toHaveProperty('job-new')
})
it('execution_interrupted for a non-active job does not clobber the active job', () => {
fireExecutionStart('job-old')
fireProgressState('job-old', makeProgressNodes('1', 'job-old'))
fireExecutionStart('job-new')
fireProgressState('job-new', makeProgressNodes('2', 'job-new'))
fireExecutionInterrupted('job-old')
expect(store.activeJobId).toBe('job-new')
expect(store.nodeProgressStates).toEqual(makeProgressNodes('2', 'job-new'))
expect(store.nodeProgressStatesByJob).not.toHaveProperty('job-old')
})
it('execution_success for the active job clears the global mirror and activeJobId', () => {
fireExecutionStart('job-1')
fireProgressState('job-1', makeProgressNodes('1', 'job-1'))
fireExecutionSuccess('job-1')
expect(store.activeJobId).toBeNull()
expect(store.nodeProgressStates).toEqual({})
})
})
describe('useExecutionStore - progress_text startup guard', () => {
let store: ReturnType<typeof useExecutionStore>
@@ -447,6 +1053,7 @@ describe('useExecutionStore - progress_text startup guard', () => {
nodeId: string
text: string
prompt_id?: string
workflow_id?: string
}) {
const handler = apiEventHandlers.get('progress_text')
if (!handler) throw new Error('progress_text handler not bound')
@@ -456,6 +1063,7 @@ describe('useExecutionStore - progress_text startup guard', () => {
beforeEach(() => {
vi.clearAllMocks()
apiEventHandlers.clear()
mockActiveWorkflow.current = null
setActivePinia(createTestingPinia({ stubActions: false }))
store = useExecutionStore()
store.bindExecutionEvents()
@@ -488,6 +1096,50 @@ describe('useExecutionStore - progress_text startup guard', () => {
expect(mockShowTextPreview).toHaveBeenCalledWith(mockNode, 'warming up')
})
it('should skip progress_text whose workflow_id mismatches active workflow', async () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const mockNode = createMockLGraphNode({ id: 1 })
const { useCanvasStore } =
await import('@/renderer/core/canvas/canvasStore')
useCanvasStore().canvas = {
graph: { getNodeById: vi.fn(() => mockNode) }
} as unknown as LGraphCanvas
fireProgressText({
nodeId: '1',
text: 'warming up',
prompt_id: 'job-other',
workflow_id: 'wf-other'
})
expect(mockShowTextPreview).not.toHaveBeenCalled()
})
it('should call showTextPreview when workflow_id matches active workflow', async () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const mockNode = createMockLGraphNode({ id: 1 })
const { useCanvasStore } =
await import('@/renderer/core/canvas/canvasStore')
useCanvasStore().canvas = {
graph: { getNodeById: vi.fn(() => mockNode) }
} as unknown as LGraphCanvas
fireProgressText({
nodeId: '1',
text: 'warming up',
prompt_id: 'job-1',
workflow_id: 'wf-active'
})
expect(mockShowTextPreview).toHaveBeenCalledWith(mockNode, 'warming up')
})
})
describe('useExecutionErrorStore - Node Error Lookups', () => {

View File

@@ -1,5 +1,5 @@
import { defineStore } from 'pinia'
-import { computed, ref, shallowRef } from 'vue'
+import { computed, ref, shallowRef, watch } from 'vue'
import { useNodeProgressText } from '@/composables/node/useNodeProgressText'
import { isCloud } from '@/platform/distribution/types'
@@ -273,6 +273,10 @@ export const useExecutionStore = defineStore('execution', () => {
) {
const jobId = e.detail.prompt_id
if (activeJobId.value) clearInitializationByJobId(activeJobId.value)
if (jobId !== activeJobId.value) {
evictTerminalJob(jobId)
return
}
resetExecutionState(jobId)
}
@@ -288,6 +292,10 @@ export const useExecutionStore = defineStore('execution', () => {
})
}
const jobId = e.detail.prompt_id
if (jobId !== activeJobId.value) {
evictTerminalJob(jobId)
return
}
resetExecutionState(jobId)
}
@@ -335,43 +343,146 @@ export const useExecutionStore = defineStore('execution', () => {
}
function handleProgressState(e: CustomEvent<ProgressStateWsMessage>) {
-    const { nodes, prompt_id: jobId } = e.detail
+    const { nodes, prompt_id: jobId, workflow_id: messageWorkflowId } = e.detail
const isActiveWorkflowMessage = messageMatchesActiveWorkflow(
jobId,
messageWorkflowId
)
// Revoke previews for nodes that are starting to execute
const previousForJob = nodeProgressStatesByJob.value[jobId] || {}
for (const nodeId in nodes) {
const nodeState = nodes[nodeId]
if (nodeState.state === 'running' && !previousForJob[nodeId]) {
// This node just started executing, revoke its previews
// Note that we're doing the *actual* node id instead of the display node id
// here intentionally. That way, we don't clear the preview every time a new node
// within an expanded graph starts executing.
const { revokePreviewsByExecutionId } = useNodeOutputStore()
revokePreviewsByExecutionId(nodeId)
if (isActiveWorkflowMessage) {
const { revokePreviewsByExecutionId } = useNodeOutputStore()
for (const nodeId in nodes) {
const nodeState = nodes[nodeId]
if (nodeState.state === 'running' && !previousForJob[nodeId]) {
revokePreviewsByExecutionId(nodeId)
}
}
}
// Update the progress states for all nodes
nodeProgressStatesByJob.value = {
...nodeProgressStatesByJob.value,
[jobId]: nodes
}
evictOldProgressJobs()
nodeProgressStates.value = nodes
// If we have progress for the currently executing node, update it for backwards compatibility
if (executingNodeId.value && nodes[executingNodeId.value]) {
const nodeState = nodes[executingNodeId.value]
_executingNodeProgress.value = {
value: nodeState.value,
max: nodeState.max,
prompt_id: nodeState.prompt_id,
node: nodeState.display_node_id || nodeState.node_id
if (isActiveWorkflowMessage) {
nodeProgressStates.value = nodes
if (executingNodeId.value && nodes[executingNodeId.value]) {
const nodeState = nodes[executingNodeId.value]
_executingNodeProgress.value = {
value: nodeState.value,
max: nodeState.max,
prompt_id: nodeState.prompt_id,
node: nodeState.display_node_id || nodeState.node_id
}
}
}
}
/**
* Determines whether a WebSocket execution message belongs to the
* currently active workflow tab. Used to gate writes to the global
* "current execution" mirror so a job initiated from another open
* workflow cannot leak its progress into the active one.
*
* Resolution order:
* 1. `workflow_id` carried on the WS message (when backend supports it).
* 2. {@link jobIdToWorkflowId} mapping populated when the job was queued
* from this tab.
* 3. {@link jobIdToSessionWorkflowPath} mapping (path-based fallback).
*
* When the workflow cannot be resolved at all (e.g. job queued in a
* different browser session), the message is treated as belonging to
* the active workflow to preserve current behaviour for the existing
* single-tab common case.
*/
function messageMatchesActiveWorkflow(
jobId: JobId,
messageWorkflowId: string | undefined
): boolean {
const activeWorkflow = workflowStore.activeWorkflow
if (!activeWorkflow) return true
const activeId =
activeWorkflow.activeState?.id ?? activeWorkflow.initialState?.id ?? null
if (messageWorkflowId && activeId) {
return messageWorkflowId === activeId
}
const mappedId = jobIdToWorkflowId.value.get(jobId)
if (mappedId && activeId) return mappedId === activeId
const mappedPath = jobIdToSessionWorkflowPath.value.get(jobId)
if (mappedPath && activeWorkflow.path) {
return mappedPath === activeWorkflow.path
}
return true
}
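The resolution order above can be sketched as a standalone predicate. All names and data shapes here (`resolveOwnership`, `jobToWorkflow`, `jobToPath`) are illustrative stand-ins for the store's refs, not the real API:

```typescript
// Hypothetical standalone sketch of messageMatchesActiveWorkflow's
// resolution order; plain Maps stand in for the store's reactive mappings.
function resolveOwnership(
  messageWorkflowId: string | undefined,
  jobId: string,
  activeId: string | null,
  jobToWorkflow: Map<string, string>,
  jobToPath: Map<string, string>,
  activePath: string | null
): boolean {
  // 1. Trust the workflow_id carried on the WS message when both sides are known.
  if (messageWorkflowId && activeId) return messageWorkflowId === activeId
  // 2. Fall back to the job -> workflow mapping recorded at queue time.
  const mappedId = jobToWorkflow.get(jobId)
  if (mappedId && activeId) return mappedId === activeId
  // 3. Path-based fallback for workflows without a resolvable id.
  const mappedPath = jobToPath.get(jobId)
  if (mappedPath && activePath) return mappedPath === activePath
  // Unresolvable: treat as active to preserve the single-tab behaviour.
  return true
}
```

Note that each step only fires when both the message-side and active-side values are present, which is what lets the final `return true` preserve legacy behaviour for jobs queued from another session.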
/**
* Rebuilds the global progress mirror to match the currently active
* workflow tab. Called when the user switches tabs so stale progress
* from the previously active workflow does not bleed into the new one.
*
* Picks the most recent job whose mapping resolves to the active
* workflow and replays its `nodeProgressStatesByJob` entry into the
* mirror; clears the mirror entirely when no such job exists.
*/
function reconcileMirrorForActiveWorkflow() {
const activeWorkflow = workflowStore.activeWorkflow
if (!activeWorkflow) return
const activeId =
activeWorkflow.activeState?.id ?? activeWorkflow.initialState?.id ?? null
const activePath = activeWorkflow.path ?? null
const jobIds = Object.keys(nodeProgressStatesByJob.value)
let matchedJobId: JobId | null = null
for (let i = jobIds.length - 1; i >= 0; i--) {
const jobId = jobIds[i]
const mappedId = jobIdToWorkflowId.value.get(jobId)
const mappedPath = jobIdToSessionWorkflowPath.value.get(jobId)
const idMatch = activeId !== null && mappedId === activeId
const pathMatch = activePath !== null && mappedPath === activePath
if (idMatch || pathMatch) {
matchedJobId = jobId
break
}
}
if (matchedJobId) {
const nodes = nodeProgressStatesByJob.value[matchedJobId] ?? {}
nodeProgressStates.value = nodes
executionIdToLocatorCache.clear()
if (
_executingNodeProgress.value &&
_executingNodeProgress.value.prompt_id !== matchedJobId
) {
_executingNodeProgress.value = null
}
} else {
if (Object.keys(nodeProgressStates.value).length > 0) {
nodeProgressStates.value = {}
executionIdToLocatorCache.clear()
}
_executingNodeProgress.value = null
}
}
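The "most recent matching job" selection above can be sketched in isolation. `latestJobForWorkflow` is a hypothetical helper; in the store, insertion order of `nodeProgressStatesByJob` keys plays the role of recency:

```typescript
// Hypothetical sketch: walk job ids from newest to oldest and return the
// first one whose recorded mapping matches the active workflow.
function latestJobForWorkflow(
  jobIdsInInsertionOrder: string[],
  jobToWorkflow: Map<string, string>,
  activeWorkflowId: string
): string | null {
  for (let i = jobIdsInInsertionOrder.length - 1; i >= 0; i--) {
    const jobId = jobIdsInInsertionOrder[i]
    if (jobToWorkflow.get(jobId) === activeWorkflowId) return jobId
  }
  return null
}
```

Returning `null` corresponds to the branch that clears the mirror entirely when no tracked job belongs to the newly activated workflow.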
watch(
() => workflowStore.activeWorkflow,
() => {
reconcileMirrorForActiveWorkflow()
}
)
function handleProgress(e: CustomEvent<ProgressWsMessage>) {
const { prompt_id: jobId, workflow_id: messageWorkflowId } = e.detail
if (!messageMatchesActiveWorkflow(jobId, messageWorkflowId)) return
_executingNodeProgress.value = e.detail
}
@@ -384,6 +495,20 @@ export const useExecutionStore = defineStore('execution', () => {
}
}
/**
* Routes a terminal cleanup to the correct primitive: `evictTerminalJob`
* for non-active jobs (safe for any jobId, never clobbers another running
* job's mirror) and `resetExecutionState` for the active job (clears the
* global mirror that the active job owns).
*/
function terminateJob(jobId: JobId) {
if (jobId !== activeJobId.value) {
evictTerminalJob(jobId)
return
}
resetExecutionState(jobId)
}
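The routing decision is small enough to state as a pure function. This is a sketch with hypothetical callbacks standing in for the two cleanup primitives, not the store's actual signature:

```typescript
// Hypothetical sketch of terminateJob's routing: non-active jobs go through
// the safe eviction path; only the active job may trigger a full reset.
function routeTerminalCleanup(
  jobId: string,
  activeJobId: string | null,
  evict: (id: string) => void,
  reset: (id: string) => void
): 'evicted' | 'reset' {
  if (jobId !== activeJobId) {
    evict(jobId) // safe for any jobId; never clobbers the active job's mirror
    return 'evicted'
  }
  reset(jobId) // the active job owns the global mirror, so a full reset is correct
  return 'reset'
}
```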
function handleExecutionError(e: CustomEvent<ExecutionErrorWsMessage>) {
if (isCloud) {
useTelemetry()?.trackExecutionError({
@@ -404,7 +529,7 @@ export const useExecutionStore = defineStore('execution', () => {
// OSS path / Cloud fallback (real runtime errors)
executionErrorStore.lastExecutionError = e.detail
clearInitializationByJobId(e.detail.prompt_id)
terminateJob(e.detail.prompt_id)
}
function handleServiceLevelError(detail: ExecutionErrorWsMessage): boolean {
@@ -413,7 +538,7 @@ export const useExecutionStore = defineStore('execution', () => {
return false
clearInitializationByJobId(detail.prompt_id)
terminateJob(detail.prompt_id)
executionErrorStore.lastPromptError = {
type: detail.exception_type ?? 'error',
message: detail.exception_type
@@ -431,7 +556,7 @@ export const useExecutionStore = defineStore('execution', () => {
if (!result) return false
clearInitializationByJobId(detail.prompt_id)
terminateJob(detail.prompt_id)
if (result.kind === 'nodeErrors') {
executionErrorStore.lastNodeErrors = result.nodeErrors
@@ -485,6 +610,103 @@ export const useExecutionStore = defineStore('execution', () => {
clearInitializationByJobIds(orphaned)
}
/**
* Safely evict per-job execution artifacts for a job that has reached a
* terminal state, without disturbing state belonging to a different
* currently-running job.
*
* Unlike {@link resetExecutionState}, this is safe to call for any jobId,
* including jobs that are not the {@link activeJobId}. It is the polling
* fallback for the case where a WebSocket terminal message
* (`execution_success` / `execution_error` / `execution_interrupted`) is
* dropped and per-job UI state would otherwise remain stuck.
*
* Behaviour:
* - Always removes the job's per-job entries
* ({@link nodeProgressStatesByJob}, {@link queuedJobs}, preview).
* - Clears the global "current execution" mirror
* ({@link nodeProgressStates}, {@link _executingNodeProgress},
* {@link activeJobId}) only when those still belong to the evicted job.
* - Idempotent: calling for an already-cleared job is a no-op.
*/
function evictTerminalJob(jobId: JobId) {
if (!jobId) return
const hadProgress = jobId in nodeProgressStatesByJob.value
if (hadProgress) {
const map = { ...nodeProgressStatesByJob.value }
delete map[jobId]
nodeProgressStatesByJob.value = map
}
if (jobId in queuedJobs.value) {
const next = { ...queuedJobs.value }
delete next[jobId]
queuedJobs.value = next
}
useJobPreviewStore().clearPreview(jobId)
clearInitializationByJobId(jobId)
const isActive = activeJobId.value === jobId
const mirrorBelongsToEvicted = mirrorOwnerJobId() === jobId
if (isActive || mirrorBelongsToEvicted) {
nodeProgressStates.value = {}
executionIdToLocatorCache.clear()
}
if (
_executingNodeProgress.value &&
_executingNodeProgress.value.prompt_id === jobId
) {
_executingNodeProgress.value = null
}
if (isActive) {
activeJobId.value = null
executionErrorStore.clearPromptError()
}
}
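The copy-and-delete pattern used throughout `evictTerminalJob` can be sketched on a simplified store shape. `ProgressStore` and `evictJob` are hypothetical; the real store holds per-node progress maps rather than numbers:

```typescript
// Hypothetical sketch of immutable, idempotent per-job eviction.
type JobId = string
interface ProgressStore {
  byJob: Record<JobId, number>
  activeJobId: JobId | null
}

function evictJob(store: ProgressStore, jobId: JobId): ProgressStore {
  // Copy-and-delete leaves the old object intact so reactive consumers see
  // a new reference; re-running on an already-evicted job yields an
  // equivalent store, making the operation idempotent.
  const byJob = { ...store.byJob }
  delete byJob[jobId]
  return {
    byJob,
    // Only clear the active pointer when it belonged to the evicted job.
    activeJobId: store.activeJobId === jobId ? null : store.activeJobId
  }
}
```

Replacing the whole record (rather than mutating in place) is what lets Vue's reactivity pick up the removal of a single job key.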
/**
* Returns the prompt_id that the global {@link nodeProgressStates} mirror
* currently belongs to, or `null` when the mirror is empty.
*
* The mirror is replaced wholesale on every `progress_state` message, so
* all entries within it always share a single prompt_id; reading the
* first entry is sufficient.
*/
function mirrorOwnerJobId(): JobId | null {
const first = Object.values(nodeProgressStates.value)[0]
return first?.prompt_id ?? null
}
/**
* Reconcile per-job progress state against the authoritative job sets from
* the backend (running/pending vs. terminal). Used by the queue polling
* path to recover from dropped WebSocket terminal messages.
*
* @param activeJobIds Jobs currently in Running or Pending on the backend.
* @param terminalJobIds Jobs in History (completed/failed/cancelled).
*/
function reconcileTerminalJobs(
activeJobIds: Set<JobId>,
terminalJobIds: Set<JobId>
) {
const tracked = new Set<JobId>([
...Object.keys(nodeProgressStatesByJob.value),
...initializingJobIds.value
])
if (activeJobId.value) tracked.add(activeJobId.value)
for (const jobId of tracked) {
if (activeJobIds.has(jobId)) continue
if (!terminalJobIds.has(jobId)) continue
evictTerminalJob(jobId)
}
}
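The eviction predicate in the loop above is worth isolating: a tracked job is evicted only when the backend explicitly reports it terminal, never when it merely disappears from the active sets. `jobsToEvict` is a hypothetical helper mirroring that loop:

```typescript
// Hypothetical sketch of reconcileTerminalJobs' predicate: evict tracked
// jobs that are absent from Running/Pending AND present in History.
function jobsToEvict(
  tracked: Set<string>,
  active: Set<string>,
  terminal: Set<string>
): string[] {
  return [...tracked].filter((id) => !active.has(id) && terminal.has(id))
}
```

Requiring membership in `terminal` (not just absence from `active`) is the guard against transient fetch errors that return empty queue data.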
function isJobInitializing(jobId: JobId | number | undefined): boolean {
if (!jobId) return false
return initializingJobIds.value.has(String(jobId))
@@ -519,14 +741,27 @@ export const useExecutionStore = defineStore('execution', () => {
}
function handleProgressText(e: CustomEvent<ProgressTextWsMessage>) {
const { nodeId, text, prompt_id, workflow_id } = e.detail
if (!text || !nodeId) return
// Prefer the workflow-ownership gate when we have any signal that lets
// us resolve it (workflow_id on the message, or a registered mapping).
// Only fall back to the legacy active-prompt guard when ownership is
// unresolvable, otherwise activeJobId pointing at a different workflow's
// job would incorrectly drop messages for the visible workflow.
if (prompt_id) {
const canResolveWorkflow =
Boolean(workflow_id) ||
jobIdToWorkflowId.value.has(prompt_id) ||
jobIdToSessionWorkflowPath.value.has(prompt_id)
if (canResolveWorkflow) {
if (!messageMatchesActiveWorkflow(prompt_id, workflow_id)) return
} else if (activeJobId.value && prompt_id !== activeJobId.value) {
return
}
}
// Handle execution node IDs for subgraphs
const currentId = getNodeIdIfExecuting(nodeId)
if (!currentId) return
const node = canvasStore.canvas?.graph?.getNodeById(currentId)
@@ -643,6 +878,8 @@ export const useExecutionStore = defineStore('execution', () => {
clearInitializationByJobId,
clearInitializationByJobIds,
reconcileInitializingJobs,
reconcileTerminalJobs,
reconcileMirrorForActiveWorkflow,
bindExecutionEvents,
unbindExecutionEvents,
storeJob,


@@ -5,6 +5,7 @@ import { beforeEach, describe, expect, it, vi } from 'vitest'
import type { JobListItem } from '@/platform/remote/comfyui/jobs/jobTypes'
import type { TaskOutput } from '@/schemas/apiSchema'
import { api } from '@/scripts/api'
import { useExecutionStore } from '@/stores/executionStore'
import { TaskItemImpl, useQueueStore } from '@/stores/queueStore'
// Fixture factory for JobListItem
@@ -676,6 +677,34 @@ describe('useQueueStore', () => {
// Should preserve array identity when history is unchanged
expect(store.historyTasks).toBe(initialHistoryTasks)
})
it('should reconcile terminal jobs when queue is empty but history is not', async () => {
const executionStore = useExecutionStore()
const reconcileSpy = vi.spyOn(executionStore, 'reconcileTerminalJobs')
const finishedJob = createHistoryJob(10, 'finished-job')
mockGetQueue.mockResolvedValue({ Running: [], Pending: [] })
mockGetHistory.mockResolvedValue([finishedJob])
await store.update()
expect(reconcileSpy).toHaveBeenCalledTimes(1)
const [activeIds, terminalIds] = reconcileSpy.mock.calls[0]
expect(activeIds.size).toBe(0)
expect(terminalIds.has('finished-job')).toBe(true)
})
it('should not reconcile terminal jobs when history is empty', async () => {
const executionStore = useExecutionStore()
const reconcileSpy = vi.spyOn(executionStore, 'reconcileTerminalJobs')
mockGetQueue.mockResolvedValue({ Running: [], Pending: [] })
mockGetHistory.mockResolvedValue([])
await store.update()
expect(reconcileSpy).not.toHaveBeenCalled()
})
})
describe('update() - maxHistoryItems limit', () => {


@@ -546,18 +546,28 @@ export const useQueueStore = defineStore('queue', () => {
}
})
const activeJobIds = new Set([
...queue.Running.map((j) => j.id),
...queue.Pending.map((j) => j.id)
])
// Only reconcile initializing jobs when the queue fetch returned data.
// api.getQueue() returns empty Running/Pending on transient errors,
// which would incorrectly clear all initializing prompts.
const queueHasData = queue.Running.length > 0 || queue.Pending.length > 0
if (queueHasData) {
executionStore.reconcileInitializingJobs(activeJobIds)
}
// Reconcile terminal jobs whenever history is non-empty. The last
// active job finishing legitimately produces empty Running/Pending,
// and terminal eviction is the only path that clears stuck node
// progress when WebSocket terminal messages are dropped.
if (history.length > 0) {
const terminalJobIds = new Set(history.map((j) => j.id))
executionStore.reconcileTerminalJobs(activeJobIds, terminalJobIds)
}
// Sort by create_time descending and limit to maxItems
const sortedHistory = [...history]
.sort((a, b) => b.create_time - a.create_time)