mirror of
https://github.com/Comfy-Org/ComfyUI_frontend.git
synced 2026-05-06 06:01:58 +00:00
Compare commits
7 Commits
batch-disp
...
glary/fix-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4d9011b756 | ||
|
|
452ef585bd | ||
|
|
7f48b3324a | ||
|
|
07f4569458 | ||
|
|
34b556c7e4 | ||
|
|
82f1f91e3c | ||
|
|
cd73b6f987 |
@@ -63,13 +63,13 @@ describe('useQueuePolling', () => {
|
||||
expect(store.update).toHaveBeenCalledOnce()
|
||||
})
|
||||
|
||||
it('does not poll when activeJobsCount > 1', async () => {
|
||||
it('polls when activeJobsCount > 1', async () => {
|
||||
mountUseQueuePolling()
|
||||
|
||||
store.activeJobsCount = 2
|
||||
await vi.advanceTimersByTimeAsync(16_000)
|
||||
await vi.advanceTimersByTimeAsync(8_000)
|
||||
|
||||
expect(store.update).not.toHaveBeenCalled()
|
||||
expect(store.update).toHaveBeenCalledOnce()
|
||||
})
|
||||
|
||||
it('stops polling when activeJobsCount drops to 0', async () => {
|
||||
|
||||
@@ -13,7 +13,7 @@ export function useQueuePolling() {
|
||||
|
||||
const { start, stop } = useTimeoutFn(
|
||||
() => {
|
||||
if (queueStore.activeJobsCount !== 1 || queueStore.isLoading) return
|
||||
if (queueStore.activeJobsCount < 1 || queueStore.isLoading) return
|
||||
delay.value = Math.min(delay.value * BACKOFF_MULTIPLIER, MAX_INTERVAL_MS)
|
||||
void queueStore.update()
|
||||
},
|
||||
@@ -22,7 +22,7 @@ export function useQueuePolling() {
|
||||
)
|
||||
|
||||
function scheduleNextPoll() {
|
||||
if (queueStore.activeJobsCount === 1 && !queueStore.isLoading) start()
|
||||
if (queueStore.activeJobsCount >= 1 && !queueStore.isLoading) start()
|
||||
else stop()
|
||||
}
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ import { LinkReleaseTriggerAction } from '@/types/searchBoxTypes'
|
||||
const zNodeType = z.string()
|
||||
const zJobId = z.string()
|
||||
export type JobId = z.infer<typeof zJobId>
|
||||
const zWorkflowId = z.string()
|
||||
export const resultItemType = z.enum(['input', 'output', 'temp'])
|
||||
export type ResultItemType = z.infer<typeof resultItemType>
|
||||
|
||||
@@ -56,6 +57,7 @@ const zProgressWsMessage = z.object({
|
||||
value: z.number().int(),
|
||||
max: z.number().int(),
|
||||
prompt_id: zJobId,
|
||||
workflow_id: zWorkflowId.optional(),
|
||||
node: zNodeId
|
||||
})
|
||||
|
||||
@@ -65,6 +67,7 @@ const zNodeProgressState = z.object({
|
||||
state: z.enum(['pending', 'running', 'finished', 'error']),
|
||||
node_id: zNodeId,
|
||||
prompt_id: zJobId,
|
||||
workflow_id: zWorkflowId.optional(),
|
||||
display_node_id: zNodeId.optional(),
|
||||
parent_node_id: zNodeId.optional(),
|
||||
real_node_id: zNodeId.optional()
|
||||
@@ -72,13 +75,15 @@ const zNodeProgressState = z.object({
|
||||
|
||||
const zProgressStateWsMessage = z.object({
|
||||
prompt_id: zJobId,
|
||||
workflow_id: zWorkflowId.optional(),
|
||||
nodes: z.record(zNodeId, zNodeProgressState)
|
||||
})
|
||||
|
||||
const zExecutingWsMessage = z.object({
|
||||
node: zNodeId,
|
||||
display_node: zNodeId,
|
||||
prompt_id: zJobId
|
||||
prompt_id: zJobId,
|
||||
workflow_id: zWorkflowId.optional()
|
||||
})
|
||||
|
||||
const zExecutedWsMessage = zExecutingWsMessage.extend({
|
||||
@@ -88,6 +93,7 @@ const zExecutedWsMessage = zExecutingWsMessage.extend({
|
||||
|
||||
const zExecutionWsMessageBase = z.object({
|
||||
prompt_id: zJobId,
|
||||
workflow_id: zWorkflowId.optional(),
|
||||
timestamp: z.number().int()
|
||||
})
|
||||
|
||||
@@ -115,7 +121,8 @@ const zExecutionErrorWsMessage = zExecutionWsMessageBase.extend({
|
||||
const zProgressTextWsMessage = z.object({
|
||||
nodeId: zNodeId,
|
||||
text: z.string(),
|
||||
prompt_id: z.string().optional()
|
||||
prompt_id: z.string().optional(),
|
||||
workflow_id: zWorkflowId.optional()
|
||||
})
|
||||
|
||||
const zNotificationWsMessage = z.object({
|
||||
|
||||
@@ -11,12 +11,22 @@ const {
|
||||
mockNodeExecutionIdToNodeLocatorId,
|
||||
mockNodeIdToNodeLocatorId,
|
||||
mockNodeLocatorIdToNodeExecutionId,
|
||||
mockShowTextPreview
|
||||
mockShowTextPreview,
|
||||
mockActiveWorkflow,
|
||||
mockRevokePreviewsByExecutionId
|
||||
} = vi.hoisted(() => ({
|
||||
mockNodeExecutionIdToNodeLocatorId: vi.fn(),
|
||||
mockNodeIdToNodeLocatorId: vi.fn(),
|
||||
mockNodeLocatorIdToNodeExecutionId: vi.fn(),
|
||||
mockShowTextPreview: vi.fn()
|
||||
mockShowTextPreview: vi.fn(),
|
||||
mockActiveWorkflow: {
|
||||
current: null as null | {
|
||||
activeState?: { id?: string }
|
||||
initialState?: { id?: string }
|
||||
path?: string
|
||||
}
|
||||
},
|
||||
mockRevokePreviewsByExecutionId: vi.fn()
|
||||
}))
|
||||
|
||||
import type * as WorkflowStoreModule from '@/platform/workflow/management/stores/workflowStore'
|
||||
@@ -35,7 +45,10 @@ vi.mock('@/platform/workflow/management/stores/workflowStore', async () => {
|
||||
useWorkflowStore: vi.fn(() => ({
|
||||
nodeExecutionIdToNodeLocatorId: mockNodeExecutionIdToNodeLocatorId,
|
||||
nodeIdToNodeLocatorId: mockNodeIdToNodeLocatorId,
|
||||
nodeLocatorIdToNodeExecutionId: mockNodeLocatorIdToNodeExecutionId
|
||||
nodeLocatorIdToNodeExecutionId: mockNodeLocatorIdToNodeExecutionId,
|
||||
get activeWorkflow() {
|
||||
return mockActiveWorkflow.current
|
||||
}
|
||||
}))
|
||||
}
|
||||
})
|
||||
@@ -70,9 +83,9 @@ vi.mock('@/scripts/api', () => ({
|
||||
}
|
||||
}))
|
||||
|
||||
vi.mock('@/stores/imagePreviewStore', () => ({
|
||||
vi.mock('@/stores/nodeOutputStore', () => ({
|
||||
useNodeOutputStore: () => ({
|
||||
revokePreviewsByExecutionId: vi.fn()
|
||||
revokePreviewsByExecutionId: mockRevokePreviewsByExecutionId
|
||||
})
|
||||
}))
|
||||
|
||||
@@ -440,6 +453,599 @@ describe('useExecutionStore - reconcileInitializingJobs', () => {
|
||||
})
|
||||
})
|
||||
|
||||
describe('useExecutionStore - active workflow gating of progress mirror', () => {
|
||||
let store: ReturnType<typeof useExecutionStore>
|
||||
|
||||
function makeProgressNodes(
|
||||
nodeId: string,
|
||||
jobId: string
|
||||
): Record<string, NodeProgressState> {
|
||||
return {
|
||||
[nodeId]: {
|
||||
value: 5,
|
||||
max: 10,
|
||||
state: 'running',
|
||||
node_id: nodeId,
|
||||
prompt_id: jobId,
|
||||
display_node_id: nodeId
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function fireProgressState(
|
||||
jobId: string,
|
||||
nodes: Record<string, NodeProgressState>,
|
||||
workflowId?: string
|
||||
) {
|
||||
const handler = apiEventHandlers.get('progress_state')
|
||||
if (!handler) throw new Error('progress_state handler not bound')
|
||||
handler(
|
||||
new CustomEvent('progress_state', {
|
||||
detail: { nodes, prompt_id: jobId, workflow_id: workflowId }
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
apiEventHandlers.clear()
|
||||
mockActiveWorkflow.current = null
|
||||
setActivePinia(createTestingPinia({ stubActions: false }))
|
||||
store = useExecutionStore()
|
||||
store.bindExecutionEvents()
|
||||
})
|
||||
|
||||
it('updates per-job progress regardless of active workflow', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
|
||||
fireProgressState(
|
||||
'job-other',
|
||||
makeProgressNodes('1', 'job-other'),
|
||||
'wf-other'
|
||||
)
|
||||
|
||||
expect(store.nodeProgressStatesByJob).toHaveProperty('job-other')
|
||||
})
|
||||
|
||||
it('skips preview revocation for non-active workflow messages', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
mockRevokePreviewsByExecutionId.mockClear()
|
||||
|
||||
fireProgressState(
|
||||
'job-other',
|
||||
makeProgressNodes('1', 'job-other'),
|
||||
'wf-other'
|
||||
)
|
||||
|
||||
expect(mockRevokePreviewsByExecutionId).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('revokes previews for active workflow messages', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
mockRevokePreviewsByExecutionId.mockClear()
|
||||
|
||||
fireProgressState('job-1', makeProgressNodes('1', 'job-1'), 'wf-active')
|
||||
|
||||
expect(mockRevokePreviewsByExecutionId).toHaveBeenCalledWith('1')
|
||||
})
|
||||
|
||||
it('skips global mirror when message workflow_id mismatches active workflow', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
|
||||
fireProgressState(
|
||||
'job-other',
|
||||
makeProgressNodes('1', 'job-other'),
|
||||
'wf-other'
|
||||
)
|
||||
|
||||
expect(store.nodeProgressStates).toEqual({})
|
||||
})
|
||||
|
||||
it('updates global mirror when message workflow_id matches active workflow', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
|
||||
fireProgressState('job-1', makeProgressNodes('1', 'job-1'), 'wf-active')
|
||||
|
||||
expect(store.nodeProgressStates).toEqual(makeProgressNodes('1', 'job-1'))
|
||||
})
|
||||
|
||||
it('falls back to jobIdToWorkflowId mapping when message has no workflow_id', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
store.registerJobWorkflowIdMapping('job-other', 'wf-other')
|
||||
|
||||
fireProgressState('job-other', makeProgressNodes('1', 'job-other'))
|
||||
|
||||
expect(store.nodeProgressStates).toEqual({})
|
||||
})
|
||||
|
||||
it('falls back to session path mapping when message has no workflow_id and no id mapping', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
store.ensureSessionWorkflowPath('job-other', '/wf-other.json')
|
||||
|
||||
fireProgressState('job-other', makeProgressNodes('1', 'job-other'))
|
||||
|
||||
expect(store.nodeProgressStates).toEqual({})
|
||||
})
|
||||
|
||||
it('updates mirror when no resolution is available (preserves single-tab behaviour)', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
|
||||
fireProgressState('job-unknown', makeProgressNodes('1', 'job-unknown'))
|
||||
|
||||
expect(store.nodeProgressStates).toEqual(
|
||||
makeProgressNodes('1', 'job-unknown')
|
||||
)
|
||||
})
|
||||
|
||||
it('updates mirror when there is no active workflow', () => {
|
||||
mockActiveWorkflow.current = null
|
||||
|
||||
fireProgressState('job-1', makeProgressNodes('1', 'job-1'), 'wf-1')
|
||||
|
||||
expect(store.nodeProgressStates).toEqual(makeProgressNodes('1', 'job-1'))
|
||||
})
|
||||
|
||||
it('skips _executingNodeProgress on workflow_id mismatch', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
|
||||
const handler = apiEventHandlers.get('progress')
|
||||
if (!handler) throw new Error('progress handler not bound')
|
||||
handler(
|
||||
new CustomEvent('progress', {
|
||||
detail: {
|
||||
value: 5,
|
||||
max: 10,
|
||||
prompt_id: 'job-other',
|
||||
node: '1',
|
||||
workflow_id: 'wf-other'
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
expect(store._executingNodeProgress).toBeNull()
|
||||
})
|
||||
|
||||
it('updates _executingNodeProgress on workflow_id match', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
|
||||
const handler = apiEventHandlers.get('progress')
|
||||
if (!handler) throw new Error('progress handler not bound')
|
||||
handler(
|
||||
new CustomEvent('progress', {
|
||||
detail: {
|
||||
value: 7,
|
||||
max: 10,
|
||||
prompt_id: 'job-1',
|
||||
node: '1',
|
||||
workflow_id: 'wf-active'
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
expect(store._executingNodeProgress).toEqual({
|
||||
value: 7,
|
||||
max: 10,
|
||||
prompt_id: 'job-1',
|
||||
node: '1',
|
||||
workflow_id: 'wf-active'
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe('useExecutionStore - reconcileMirrorForActiveWorkflow', () => {
|
||||
let store: ReturnType<typeof useExecutionStore>
|
||||
|
||||
function makeProgressNodes(
|
||||
nodeId: string,
|
||||
jobId: string
|
||||
): Record<string, NodeProgressState> {
|
||||
return {
|
||||
[nodeId]: {
|
||||
value: 5,
|
||||
max: 10,
|
||||
state: 'running',
|
||||
node_id: nodeId,
|
||||
prompt_id: jobId,
|
||||
display_node_id: nodeId
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function fireProgressState(
|
||||
jobId: string,
|
||||
nodes: Record<string, NodeProgressState>,
|
||||
workflowId?: string
|
||||
) {
|
||||
const handler = apiEventHandlers.get('progress_state')
|
||||
if (!handler) throw new Error('progress_state handler not bound')
|
||||
handler(
|
||||
new CustomEvent('progress_state', {
|
||||
detail: { nodes, prompt_id: jobId, workflow_id: workflowId }
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
apiEventHandlers.clear()
|
||||
mockActiveWorkflow.current = null
|
||||
setActivePinia(createTestingPinia({ stubActions: false }))
|
||||
store = useExecutionStore()
|
||||
store.bindExecutionEvents()
|
||||
})
|
||||
|
||||
it('rebuilds the mirror from the active workflow job on tab switch', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-a' },
|
||||
path: '/wf-a.json'
|
||||
}
|
||||
fireProgressState('job-a', makeProgressNodes('1', 'job-a'), 'wf-a')
|
||||
store.registerJobWorkflowIdMapping('job-a', 'wf-a')
|
||||
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-b' },
|
||||
path: '/wf-b.json'
|
||||
}
|
||||
fireProgressState('job-b', makeProgressNodes('2', 'job-b'), 'wf-b')
|
||||
store.registerJobWorkflowIdMapping('job-b', 'wf-b')
|
||||
|
||||
expect(store.nodeProgressStates).toEqual(makeProgressNodes('2', 'job-b'))
|
||||
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-a' },
|
||||
path: '/wf-a.json'
|
||||
}
|
||||
store.reconcileMirrorForActiveWorkflow()
|
||||
|
||||
expect(store.nodeProgressStates).toEqual(makeProgressNodes('1', 'job-a'))
|
||||
})
|
||||
|
||||
it('clears the mirror when the active workflow has no matching job', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-a' },
|
||||
path: '/wf-a.json'
|
||||
}
|
||||
fireProgressState('job-a', makeProgressNodes('1', 'job-a'), 'wf-a')
|
||||
store.registerJobWorkflowIdMapping('job-a', 'wf-a')
|
||||
|
||||
expect(store.nodeProgressStates).toEqual(makeProgressNodes('1', 'job-a'))
|
||||
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-empty' },
|
||||
path: '/wf-empty.json'
|
||||
}
|
||||
store.reconcileMirrorForActiveWorkflow()
|
||||
|
||||
expect(store.nodeProgressStates).toEqual({})
|
||||
})
|
||||
|
||||
it('clears _executingNodeProgress that belonged to a different job', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-a' },
|
||||
path: '/wf-a.json'
|
||||
}
|
||||
fireProgressState('job-a', makeProgressNodes('1', 'job-a'), 'wf-a')
|
||||
store.registerJobWorkflowIdMapping('job-a', 'wf-a')
|
||||
|
||||
const progressHandler = apiEventHandlers.get('progress')
|
||||
if (!progressHandler) throw new Error('progress handler not bound')
|
||||
progressHandler(
|
||||
new CustomEvent('progress', {
|
||||
detail: {
|
||||
value: 5,
|
||||
max: 10,
|
||||
prompt_id: 'job-a',
|
||||
node: '1',
|
||||
workflow_id: 'wf-a'
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-empty' },
|
||||
path: '/wf-empty.json'
|
||||
}
|
||||
store.reconcileMirrorForActiveWorkflow()
|
||||
|
||||
expect(store._executingNodeProgress).toBeNull()
|
||||
})
|
||||
|
||||
it('falls back to session path mapping when workflow id is not registered', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-a' },
|
||||
path: '/wf-a.json'
|
||||
}
|
||||
fireProgressState('job-a', makeProgressNodes('1', 'job-a'), 'wf-a')
|
||||
store.ensureSessionWorkflowPath('job-a', '/wf-a.json')
|
||||
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-b' },
|
||||
path: '/wf-b.json'
|
||||
}
|
||||
fireProgressState('job-b', makeProgressNodes('2', 'job-b'), 'wf-b')
|
||||
store.ensureSessionWorkflowPath('job-b', '/wf-b.json')
|
||||
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-a' },
|
||||
path: '/wf-a.json'
|
||||
}
|
||||
store.reconcileMirrorForActiveWorkflow()
|
||||
|
||||
expect(store.nodeProgressStates).toEqual(makeProgressNodes('1', 'job-a'))
|
||||
})
|
||||
})
|
||||
|
||||
describe('useExecutionStore - reconcileTerminalJobs', () => {
|
||||
let store: ReturnType<typeof useExecutionStore>
|
||||
|
||||
function makeProgressNodes(
|
||||
nodeId: string,
|
||||
jobId: string
|
||||
): Record<string, NodeProgressState> {
|
||||
return {
|
||||
[nodeId]: {
|
||||
value: 5,
|
||||
max: 10,
|
||||
state: 'running',
|
||||
node_id: nodeId,
|
||||
prompt_id: jobId,
|
||||
display_node_id: nodeId
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function fireProgressState(
|
||||
jobId: string,
|
||||
nodes: Record<string, NodeProgressState>
|
||||
) {
|
||||
const handler = apiEventHandlers.get('progress_state')
|
||||
if (!handler) throw new Error('progress_state handler not bound')
|
||||
handler(
|
||||
new CustomEvent('progress_state', { detail: { nodes, prompt_id: jobId } })
|
||||
)
|
||||
}
|
||||
|
||||
function fireExecutionStart(jobId: string) {
|
||||
const handler = apiEventHandlers.get('execution_start')
|
||||
if (!handler) throw new Error('execution_start handler not bound')
|
||||
handler(
|
||||
new CustomEvent('execution_start', { detail: { prompt_id: jobId } })
|
||||
)
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
apiEventHandlers.clear()
|
||||
setActivePinia(createTestingPinia({ stubActions: false }))
|
||||
store = useExecutionStore()
|
||||
store.bindExecutionEvents()
|
||||
})
|
||||
|
||||
it('evicts a non-active terminal job without disturbing the active job', () => {
|
||||
fireExecutionStart('job-old')
|
||||
fireProgressState('job-old', makeProgressNodes('1', 'job-old'))
|
||||
|
||||
fireExecutionStart('job-new')
|
||||
fireProgressState('job-new', makeProgressNodes('2', 'job-new'))
|
||||
|
||||
expect(store.activeJobId).toBe('job-new')
|
||||
expect(store.nodeProgressStatesByJob).toHaveProperty('job-old')
|
||||
expect(store.nodeProgressStatesByJob).toHaveProperty('job-new')
|
||||
|
||||
store.reconcileTerminalJobs(new Set(['job-new']), new Set(['job-old']))
|
||||
|
||||
expect(store.nodeProgressStatesByJob).not.toHaveProperty('job-old')
|
||||
expect(store.nodeProgressStatesByJob).toHaveProperty('job-new')
|
||||
expect(store.activeJobId).toBe('job-new')
|
||||
expect(store.nodeProgressStates).toEqual(makeProgressNodes('2', 'job-new'))
|
||||
})
|
||||
|
||||
it('evicts an active terminal job and clears global mirror', () => {
|
||||
fireExecutionStart('job-1')
|
||||
fireProgressState('job-1', makeProgressNodes('1', 'job-1'))
|
||||
|
||||
expect(store.activeJobId).toBe('job-1')
|
||||
expect(Object.keys(store.nodeProgressStates)).toHaveLength(1)
|
||||
|
||||
store.reconcileTerminalJobs(new Set(), new Set(['job-1']))
|
||||
|
||||
expect(store.nodeProgressStatesByJob).not.toHaveProperty('job-1')
|
||||
expect(store.activeJobId).toBeNull()
|
||||
expect(store.nodeProgressStates).toEqual({})
|
||||
})
|
||||
|
||||
it('clears stale global mirror when its owner job becomes terminal', () => {
|
||||
fireExecutionStart('job-old')
|
||||
fireProgressState('job-old', makeProgressNodes('1', 'job-old'))
|
||||
|
||||
fireExecutionStart('job-new')
|
||||
expect(store.activeJobId).toBe('job-new')
|
||||
expect(store.nodeProgressStates['1']?.prompt_id).toBe('job-old')
|
||||
|
||||
store.reconcileTerminalJobs(new Set(['job-new']), new Set(['job-old']))
|
||||
|
||||
expect(store.nodeProgressStates).toEqual({})
|
||||
expect(store.activeJobId).toBe('job-new')
|
||||
expect(store.nodeProgressStatesByJob).not.toHaveProperty('job-old')
|
||||
})
|
||||
|
||||
it('skips jobs that are still active even if also in terminal set', () => {
|
||||
fireExecutionStart('job-1')
|
||||
fireProgressState('job-1', makeProgressNodes('1', 'job-1'))
|
||||
|
||||
store.reconcileTerminalJobs(new Set(['job-1']), new Set(['job-1']))
|
||||
|
||||
expect(store.nodeProgressStatesByJob).toHaveProperty('job-1')
|
||||
expect(store.activeJobId).toBe('job-1')
|
||||
})
|
||||
|
||||
it('skips jobs absent from the terminal set', () => {
|
||||
fireExecutionStart('job-1')
|
||||
fireProgressState('job-1', makeProgressNodes('1', 'job-1'))
|
||||
|
||||
store.reconcileTerminalJobs(new Set(), new Set())
|
||||
|
||||
expect(store.nodeProgressStatesByJob).toHaveProperty('job-1')
|
||||
expect(store.activeJobId).toBe('job-1')
|
||||
})
|
||||
|
||||
it('is idempotent for an already-cleared job', () => {
|
||||
store.reconcileTerminalJobs(new Set(), new Set(['job-1']))
|
||||
store.reconcileTerminalJobs(new Set(), new Set(['job-1']))
|
||||
|
||||
expect(store.nodeProgressStatesByJob).not.toHaveProperty('job-1')
|
||||
expect(store.activeJobId).toBeNull()
|
||||
})
|
||||
|
||||
it('evicts initializing-only jobs that landed in history without progress events', () => {
|
||||
store.initializingJobIds = new Set(['job-init'])
|
||||
|
||||
store.reconcileTerminalJobs(new Set(), new Set(['job-init']))
|
||||
|
||||
expect(store.initializingJobIds.has('job-init')).toBe(false)
|
||||
})
|
||||
})
|
||||
|
||||
describe('useExecutionStore - terminal WS handlers do not clobber active job', () => {
|
||||
let store: ReturnType<typeof useExecutionStore>
|
||||
|
||||
function makeProgressNodes(
|
||||
nodeId: string,
|
||||
jobId: string
|
||||
): Record<string, NodeProgressState> {
|
||||
return {
|
||||
[nodeId]: {
|
||||
value: 5,
|
||||
max: 10,
|
||||
state: 'running',
|
||||
node_id: nodeId,
|
||||
prompt_id: jobId,
|
||||
display_node_id: nodeId
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function fireProgressState(
|
||||
jobId: string,
|
||||
nodes: Record<string, NodeProgressState>,
|
||||
workflowId?: string
|
||||
) {
|
||||
const handler = apiEventHandlers.get('progress_state')
|
||||
if (!handler) throw new Error('progress_state handler not bound')
|
||||
handler(
|
||||
new CustomEvent('progress_state', {
|
||||
detail: { nodes, prompt_id: jobId, workflow_id: workflowId }
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
function fireExecutionStart(jobId: string) {
|
||||
const handler = apiEventHandlers.get('execution_start')
|
||||
if (!handler) throw new Error('execution_start handler not bound')
|
||||
handler(
|
||||
new CustomEvent('execution_start', { detail: { prompt_id: jobId } })
|
||||
)
|
||||
}
|
||||
|
||||
function fireExecutionSuccess(jobId: string) {
|
||||
const handler = apiEventHandlers.get('execution_success')
|
||||
if (!handler) throw new Error('execution_success handler not bound')
|
||||
handler(
|
||||
new CustomEvent('execution_success', { detail: { prompt_id: jobId } })
|
||||
)
|
||||
}
|
||||
|
||||
function fireExecutionInterrupted(jobId: string) {
|
||||
const handler = apiEventHandlers.get('execution_interrupted')
|
||||
if (!handler) throw new Error('execution_interrupted handler not bound')
|
||||
handler(
|
||||
new CustomEvent('execution_interrupted', {
|
||||
detail: { prompt_id: jobId, node_id: '1', node_type: 'X', executed: [] }
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
apiEventHandlers.clear()
|
||||
mockActiveWorkflow.current = null
|
||||
setActivePinia(createTestingPinia({ stubActions: false }))
|
||||
store = useExecutionStore()
|
||||
store.bindExecutionEvents()
|
||||
})
|
||||
|
||||
it('execution_success for a non-active job does not clobber the active job mirror', () => {
|
||||
fireExecutionStart('job-old')
|
||||
fireProgressState('job-old', makeProgressNodes('1', 'job-old'))
|
||||
|
||||
fireExecutionStart('job-new')
|
||||
fireProgressState('job-new', makeProgressNodes('2', 'job-new'))
|
||||
|
||||
expect(store.activeJobId).toBe('job-new')
|
||||
|
||||
fireExecutionSuccess('job-old')
|
||||
|
||||
expect(store.activeJobId).toBe('job-new')
|
||||
expect(store.nodeProgressStates).toEqual(makeProgressNodes('2', 'job-new'))
|
||||
expect(store.nodeProgressStatesByJob).not.toHaveProperty('job-old')
|
||||
expect(store.nodeProgressStatesByJob).toHaveProperty('job-new')
|
||||
})
|
||||
|
||||
it('execution_interrupted for a non-active job does not clobber the active job', () => {
|
||||
fireExecutionStart('job-old')
|
||||
fireProgressState('job-old', makeProgressNodes('1', 'job-old'))
|
||||
|
||||
fireExecutionStart('job-new')
|
||||
fireProgressState('job-new', makeProgressNodes('2', 'job-new'))
|
||||
|
||||
fireExecutionInterrupted('job-old')
|
||||
|
||||
expect(store.activeJobId).toBe('job-new')
|
||||
expect(store.nodeProgressStates).toEqual(makeProgressNodes('2', 'job-new'))
|
||||
expect(store.nodeProgressStatesByJob).not.toHaveProperty('job-old')
|
||||
})
|
||||
|
||||
it('execution_success for the active job clears the global mirror and activeJobId', () => {
|
||||
fireExecutionStart('job-1')
|
||||
fireProgressState('job-1', makeProgressNodes('1', 'job-1'))
|
||||
|
||||
fireExecutionSuccess('job-1')
|
||||
|
||||
expect(store.activeJobId).toBeNull()
|
||||
expect(store.nodeProgressStates).toEqual({})
|
||||
})
|
||||
})
|
||||
|
||||
describe('useExecutionStore - progress_text startup guard', () => {
|
||||
let store: ReturnType<typeof useExecutionStore>
|
||||
|
||||
@@ -447,6 +1053,7 @@ describe('useExecutionStore - progress_text startup guard', () => {
|
||||
nodeId: string
|
||||
text: string
|
||||
prompt_id?: string
|
||||
workflow_id?: string
|
||||
}) {
|
||||
const handler = apiEventHandlers.get('progress_text')
|
||||
if (!handler) throw new Error('progress_text handler not bound')
|
||||
@@ -456,6 +1063,7 @@ describe('useExecutionStore - progress_text startup guard', () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
apiEventHandlers.clear()
|
||||
mockActiveWorkflow.current = null
|
||||
setActivePinia(createTestingPinia({ stubActions: false }))
|
||||
store = useExecutionStore()
|
||||
store.bindExecutionEvents()
|
||||
@@ -488,6 +1096,50 @@ describe('useExecutionStore - progress_text startup guard', () => {
|
||||
|
||||
expect(mockShowTextPreview).toHaveBeenCalledWith(mockNode, 'warming up')
|
||||
})
|
||||
|
||||
it('should skip progress_text whose workflow_id mismatches active workflow', async () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
const mockNode = createMockLGraphNode({ id: 1 })
|
||||
const { useCanvasStore } =
|
||||
await import('@/renderer/core/canvas/canvasStore')
|
||||
useCanvasStore().canvas = {
|
||||
graph: { getNodeById: vi.fn(() => mockNode) }
|
||||
} as unknown as LGraphCanvas
|
||||
|
||||
fireProgressText({
|
||||
nodeId: '1',
|
||||
text: 'warming up',
|
||||
prompt_id: 'job-other',
|
||||
workflow_id: 'wf-other'
|
||||
})
|
||||
|
||||
expect(mockShowTextPreview).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('should call showTextPreview when workflow_id matches active workflow', async () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
const mockNode = createMockLGraphNode({ id: 1 })
|
||||
const { useCanvasStore } =
|
||||
await import('@/renderer/core/canvas/canvasStore')
|
||||
useCanvasStore().canvas = {
|
||||
graph: { getNodeById: vi.fn(() => mockNode) }
|
||||
} as unknown as LGraphCanvas
|
||||
|
||||
fireProgressText({
|
||||
nodeId: '1',
|
||||
text: 'warming up',
|
||||
prompt_id: 'job-1',
|
||||
workflow_id: 'wf-active'
|
||||
})
|
||||
|
||||
expect(mockShowTextPreview).toHaveBeenCalledWith(mockNode, 'warming up')
|
||||
})
|
||||
})
|
||||
|
||||
describe('useExecutionErrorStore - Node Error Lookups', () => {
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { defineStore } from 'pinia'
|
||||
import { computed, ref, shallowRef } from 'vue'
|
||||
import { computed, ref, shallowRef, watch } from 'vue'
|
||||
|
||||
import { useNodeProgressText } from '@/composables/node/useNodeProgressText'
|
||||
import { isCloud } from '@/platform/distribution/types'
|
||||
@@ -273,6 +273,10 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
) {
|
||||
const jobId = e.detail.prompt_id
|
||||
if (activeJobId.value) clearInitializationByJobId(activeJobId.value)
|
||||
if (jobId !== activeJobId.value) {
|
||||
evictTerminalJob(jobId)
|
||||
return
|
||||
}
|
||||
resetExecutionState(jobId)
|
||||
}
|
||||
|
||||
@@ -288,6 +292,10 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
})
|
||||
}
|
||||
const jobId = e.detail.prompt_id
|
||||
if (jobId !== activeJobId.value) {
|
||||
evictTerminalJob(jobId)
|
||||
return
|
||||
}
|
||||
resetExecutionState(jobId)
|
||||
}
|
||||
|
||||
@@ -335,43 +343,146 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
}
|
||||
|
||||
function handleProgressState(e: CustomEvent<ProgressStateWsMessage>) {
|
||||
const { nodes, prompt_id: jobId } = e.detail
|
||||
const { nodes, prompt_id: jobId, workflow_id: messageWorkflowId } = e.detail
|
||||
const isActiveWorkflowMessage = messageMatchesActiveWorkflow(
|
||||
jobId,
|
||||
messageWorkflowId
|
||||
)
|
||||
|
||||
// Revoke previews for nodes that are starting to execute
|
||||
const previousForJob = nodeProgressStatesByJob.value[jobId] || {}
|
||||
for (const nodeId in nodes) {
|
||||
const nodeState = nodes[nodeId]
|
||||
if (nodeState.state === 'running' && !previousForJob[nodeId]) {
|
||||
// This node just started executing, revoke its previews
|
||||
// Note that we're doing the *actual* node id instead of the display node id
|
||||
// here intentionally. That way, we don't clear the preview every time a new node
|
||||
// within an expanded graph starts executing.
|
||||
const { revokePreviewsByExecutionId } = useNodeOutputStore()
|
||||
revokePreviewsByExecutionId(nodeId)
|
||||
if (isActiveWorkflowMessage) {
|
||||
const { revokePreviewsByExecutionId } = useNodeOutputStore()
|
||||
for (const nodeId in nodes) {
|
||||
const nodeState = nodes[nodeId]
|
||||
if (nodeState.state === 'running' && !previousForJob[nodeId]) {
|
||||
revokePreviewsByExecutionId(nodeId)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update the progress states for all nodes
|
||||
nodeProgressStatesByJob.value = {
|
||||
...nodeProgressStatesByJob.value,
|
||||
[jobId]: nodes
|
||||
}
|
||||
evictOldProgressJobs()
|
||||
nodeProgressStates.value = nodes
|
||||
|
||||
// If we have progress for the currently executing node, update it for backwards compatibility
|
||||
if (executingNodeId.value && nodes[executingNodeId.value]) {
|
||||
const nodeState = nodes[executingNodeId.value]
|
||||
_executingNodeProgress.value = {
|
||||
value: nodeState.value,
|
||||
max: nodeState.max,
|
||||
prompt_id: nodeState.prompt_id,
|
||||
node: nodeState.display_node_id || nodeState.node_id
|
||||
if (isActiveWorkflowMessage) {
|
||||
nodeProgressStates.value = nodes
|
||||
|
||||
if (executingNodeId.value && nodes[executingNodeId.value]) {
|
||||
const nodeState = nodes[executingNodeId.value]
|
||||
_executingNodeProgress.value = {
|
||||
value: nodeState.value,
|
||||
max: nodeState.max,
|
||||
prompt_id: nodeState.prompt_id,
|
||||
node: nodeState.display_node_id || nodeState.node_id
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines whether a WebSocket execution message belongs to the
|
||||
* currently active workflow tab. Used to gate writes to the global
|
||||
* "current execution" mirror so a job initiated from another open
|
||||
* workflow cannot leak its progress into the active one.
|
||||
*
|
||||
* Resolution order:
|
||||
* 1. `workflow_id` carried on the WS message (when backend supports it).
|
||||
* 2. {@link jobIdToWorkflowId} mapping populated when the job was queued
|
||||
* from this tab.
|
||||
* 3. {@link jobIdToSessionWorkflowPath} mapping (path-based fallback).
|
||||
*
|
||||
* When the workflow cannot be resolved at all (e.g. job queued in a
|
||||
* different browser session), the message is treated as belonging to
|
||||
* the active workflow to preserve current behaviour for the existing
|
||||
* single-tab common case.
|
||||
*/
|
||||
function messageMatchesActiveWorkflow(
|
||||
jobId: JobId,
|
||||
messageWorkflowId: string | undefined
|
||||
): boolean {
|
||||
const activeWorkflow = workflowStore.activeWorkflow
|
||||
if (!activeWorkflow) return true
|
||||
|
||||
const activeId =
|
||||
activeWorkflow.activeState?.id ?? activeWorkflow.initialState?.id ?? null
|
||||
|
||||
if (messageWorkflowId && activeId) {
|
||||
return messageWorkflowId === activeId
|
||||
}
|
||||
|
||||
const mappedId = jobIdToWorkflowId.value.get(jobId)
|
||||
if (mappedId && activeId) return mappedId === activeId
|
||||
|
||||
const mappedPath = jobIdToSessionWorkflowPath.value.get(jobId)
|
||||
if (mappedPath && activeWorkflow.path) {
|
||||
return mappedPath === activeWorkflow.path
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
/**
|
||||
* Rebuilds the global progress mirror to match the currently active
|
||||
* workflow tab. Called when the user switches tabs so stale progress
|
||||
* from the previously active workflow does not bleed into the new one.
|
||||
*
|
||||
* Picks the most recent job whose mapping resolves to the active
|
||||
* workflow and replays its `nodeProgressStatesByJob` entry into the
|
||||
* mirror; clears the mirror entirely when no such job exists.
|
||||
*/
|
||||
function reconcileMirrorForActiveWorkflow() {
|
||||
const activeWorkflow = workflowStore.activeWorkflow
|
||||
if (!activeWorkflow) return
|
||||
|
||||
const activeId =
|
||||
activeWorkflow.activeState?.id ?? activeWorkflow.initialState?.id ?? null
|
||||
const activePath = activeWorkflow.path ?? null
|
||||
|
||||
const jobIds = Object.keys(nodeProgressStatesByJob.value)
|
||||
let matchedJobId: JobId | null = null
|
||||
for (let i = jobIds.length - 1; i >= 0; i--) {
|
||||
const jobId = jobIds[i]
|
||||
const mappedId = jobIdToWorkflowId.value.get(jobId)
|
||||
const mappedPath = jobIdToSessionWorkflowPath.value.get(jobId)
|
||||
const idMatch = activeId !== null && mappedId === activeId
|
||||
const pathMatch = activePath !== null && mappedPath === activePath
|
||||
if (idMatch || pathMatch) {
|
||||
matchedJobId = jobId
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if (matchedJobId) {
|
||||
const nodes = nodeProgressStatesByJob.value[matchedJobId] ?? {}
|
||||
nodeProgressStates.value = nodes
|
||||
executionIdToLocatorCache.clear()
|
||||
if (
|
||||
_executingNodeProgress.value &&
|
||||
_executingNodeProgress.value.prompt_id !== matchedJobId
|
||||
) {
|
||||
_executingNodeProgress.value = null
|
||||
}
|
||||
} else {
|
||||
if (Object.keys(nodeProgressStates.value).length > 0) {
|
||||
nodeProgressStates.value = {}
|
||||
executionIdToLocatorCache.clear()
|
||||
}
|
||||
_executingNodeProgress.value = null
|
||||
}
|
||||
}
|
||||
|
||||
watch(
|
||||
() => workflowStore.activeWorkflow,
|
||||
() => {
|
||||
reconcileMirrorForActiveWorkflow()
|
||||
}
|
||||
)
|
||||
|
||||
function handleProgress(e: CustomEvent<ProgressWsMessage>) {
|
||||
const { prompt_id: jobId, workflow_id: messageWorkflowId } = e.detail
|
||||
if (!messageMatchesActiveWorkflow(jobId, messageWorkflowId)) return
|
||||
_executingNodeProgress.value = e.detail
|
||||
}
|
||||
|
||||
@@ -384,6 +495,20 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Routes a terminal cleanup to the correct primitive: `evictTerminalJob`
|
||||
* for non-active jobs (safe for any jobId, never clobbers another running
|
||||
* job's mirror) and `resetExecutionState` for the active job (clears the
|
||||
* global mirror that the active job owns).
|
||||
*/
|
||||
function terminateJob(jobId: JobId) {
|
||||
if (jobId !== activeJobId.value) {
|
||||
evictTerminalJob(jobId)
|
||||
return
|
||||
}
|
||||
resetExecutionState(jobId)
|
||||
}
|
||||
|
||||
function handleExecutionError(e: CustomEvent<ExecutionErrorWsMessage>) {
|
||||
if (isCloud) {
|
||||
useTelemetry()?.trackExecutionError({
|
||||
@@ -404,7 +529,7 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
// OSS path / Cloud fallback (real runtime errors)
|
||||
executionErrorStore.lastExecutionError = e.detail
|
||||
clearInitializationByJobId(e.detail.prompt_id)
|
||||
resetExecutionState(e.detail.prompt_id)
|
||||
terminateJob(e.detail.prompt_id)
|
||||
}
|
||||
|
||||
function handleServiceLevelError(detail: ExecutionErrorWsMessage): boolean {
|
||||
@@ -413,7 +538,7 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
return false
|
||||
|
||||
clearInitializationByJobId(detail.prompt_id)
|
||||
resetExecutionState(detail.prompt_id)
|
||||
terminateJob(detail.prompt_id)
|
||||
executionErrorStore.lastPromptError = {
|
||||
type: detail.exception_type ?? 'error',
|
||||
message: detail.exception_type
|
||||
@@ -431,7 +556,7 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
if (!result) return false
|
||||
|
||||
clearInitializationByJobId(detail.prompt_id)
|
||||
resetExecutionState(detail.prompt_id)
|
||||
terminateJob(detail.prompt_id)
|
||||
|
||||
if (result.kind === 'nodeErrors') {
|
||||
executionErrorStore.lastNodeErrors = result.nodeErrors
|
||||
@@ -485,6 +610,103 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
clearInitializationByJobIds(orphaned)
|
||||
}
|
||||
|
||||
/**
|
||||
* Safely evict per-job execution artifacts for a job that has reached a
|
||||
* terminal state, without disturbing state belonging to a different
|
||||
* currently-running job.
|
||||
*
|
||||
* Unlike {@link resetExecutionState}, this is safe to call for any jobId,
|
||||
* including jobs that are not the {@link activeJobId}. It is the polling
|
||||
* fallback for the case where a WebSocket terminal message
|
||||
* (`execution_success` / `execution_error` / `execution_interrupted`) is
|
||||
* dropped and per-job UI state would otherwise remain stuck.
|
||||
*
|
||||
* Behaviour:
|
||||
* - Always removes the job's per-job entries
|
||||
* ({@link nodeProgressStatesByJob}, {@link queuedJobs}, preview).
|
||||
* - Clears the global "current execution" mirror
|
||||
* ({@link nodeProgressStates}, {@link _executingNodeProgress},
|
||||
* {@link activeJobId}) only when those still belong to the evicted job.
|
||||
* - Idempotent: calling for an already-cleared job is a no-op.
|
||||
*/
|
||||
function evictTerminalJob(jobId: JobId) {
|
||||
if (!jobId) return
|
||||
|
||||
const hadProgress = jobId in nodeProgressStatesByJob.value
|
||||
if (hadProgress) {
|
||||
const map = { ...nodeProgressStatesByJob.value }
|
||||
delete map[jobId]
|
||||
nodeProgressStatesByJob.value = map
|
||||
}
|
||||
|
||||
if (jobId in queuedJobs.value) {
|
||||
const next = { ...queuedJobs.value }
|
||||
delete next[jobId]
|
||||
queuedJobs.value = next
|
||||
}
|
||||
|
||||
useJobPreviewStore().clearPreview(jobId)
|
||||
clearInitializationByJobId(jobId)
|
||||
|
||||
const isActive = activeJobId.value === jobId
|
||||
const mirrorBelongsToEvicted = mirrorOwnerJobId() === jobId
|
||||
|
||||
if (isActive || mirrorBelongsToEvicted) {
|
||||
nodeProgressStates.value = {}
|
||||
executionIdToLocatorCache.clear()
|
||||
}
|
||||
|
||||
if (
|
||||
_executingNodeProgress.value &&
|
||||
_executingNodeProgress.value.prompt_id === jobId
|
||||
) {
|
||||
_executingNodeProgress.value = null
|
||||
}
|
||||
|
||||
if (isActive) {
|
||||
activeJobId.value = null
|
||||
executionErrorStore.clearPromptError()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the prompt_id that the global {@link nodeProgressStates} mirror
|
||||
* currently belongs to, or `null` when the mirror is empty.
|
||||
*
|
||||
* The mirror is replaced wholesale on every `progress_state` message, so
|
||||
* all entries within it always share a single prompt_id; reading the
|
||||
* first entry is sufficient.
|
||||
*/
|
||||
function mirrorOwnerJobId(): JobId | null {
|
||||
const first = Object.values(nodeProgressStates.value)[0]
|
||||
return first?.prompt_id ?? null
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconcile per-job progress state against the authoritative job sets from
|
||||
* the backend (running/pending vs. terminal). Used by the queue polling
|
||||
* path to recover from dropped WebSocket terminal messages.
|
||||
*
|
||||
* @param activeJobIds Jobs currently in Running or Pending on the backend.
|
||||
* @param terminalJobIds Jobs in History (completed/failed/cancelled).
|
||||
*/
|
||||
function reconcileTerminalJobs(
|
||||
activeJobIds: Set<JobId>,
|
||||
terminalJobIds: Set<JobId>
|
||||
) {
|
||||
const tracked = new Set<JobId>([
|
||||
...Object.keys(nodeProgressStatesByJob.value),
|
||||
...initializingJobIds.value
|
||||
])
|
||||
if (activeJobId.value) tracked.add(activeJobId.value)
|
||||
|
||||
for (const jobId of tracked) {
|
||||
if (activeJobIds.has(jobId)) continue
|
||||
if (!terminalJobIds.has(jobId)) continue
|
||||
evictTerminalJob(jobId)
|
||||
}
|
||||
}
|
||||
|
||||
function isJobInitializing(jobId: JobId | number | undefined): boolean {
|
||||
if (!jobId) return false
|
||||
return initializingJobIds.value.has(String(jobId))
|
||||
@@ -519,14 +741,27 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
}
|
||||
|
||||
function handleProgressText(e: CustomEvent<ProgressTextWsMessage>) {
|
||||
const { nodeId, text, prompt_id } = e.detail
|
||||
const { nodeId, text, prompt_id, workflow_id } = e.detail
|
||||
if (!text || !nodeId) return
|
||||
|
||||
// Filter: only accept progress for the active prompt
|
||||
if (prompt_id && activeJobId.value && prompt_id !== activeJobId.value)
|
||||
return
|
||||
// Prefer the workflow-ownership gate when we have any signal that lets
|
||||
// us resolve it (workflow_id on the message, or a registered mapping).
|
||||
// Only fall back to the legacy active-prompt guard when ownership is
|
||||
// unresolvable, otherwise activeJobId pointing at a different workflow's
|
||||
// job would incorrectly drop messages for the visible workflow.
|
||||
if (prompt_id) {
|
||||
const canResolveWorkflow =
|
||||
Boolean(workflow_id) ||
|
||||
jobIdToWorkflowId.value.has(prompt_id) ||
|
||||
jobIdToSessionWorkflowPath.value.has(prompt_id)
|
||||
|
||||
if (canResolveWorkflow) {
|
||||
if (!messageMatchesActiveWorkflow(prompt_id, workflow_id)) return
|
||||
} else if (activeJobId.value && prompt_id !== activeJobId.value) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Handle execution node IDs for subgraphs
|
||||
const currentId = getNodeIdIfExecuting(nodeId)
|
||||
if (!currentId) return
|
||||
const node = canvasStore.canvas?.graph?.getNodeById(currentId)
|
||||
@@ -643,6 +878,8 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
clearInitializationByJobId,
|
||||
clearInitializationByJobIds,
|
||||
reconcileInitializingJobs,
|
||||
reconcileTerminalJobs,
|
||||
reconcileMirrorForActiveWorkflow,
|
||||
bindExecutionEvents,
|
||||
unbindExecutionEvents,
|
||||
storeJob,
|
||||
|
||||
@@ -5,6 +5,7 @@ import { beforeEach, describe, expect, it, vi } from 'vitest'
|
||||
import type { JobListItem } from '@/platform/remote/comfyui/jobs/jobTypes'
|
||||
import type { TaskOutput } from '@/schemas/apiSchema'
|
||||
import { api } from '@/scripts/api'
|
||||
import { useExecutionStore } from '@/stores/executionStore'
|
||||
import { TaskItemImpl, useQueueStore } from '@/stores/queueStore'
|
||||
|
||||
// Fixture factory for JobListItem
|
||||
@@ -676,6 +677,34 @@ describe('useQueueStore', () => {
|
||||
// Should preserve array identity when history is unchanged
|
||||
expect(store.historyTasks).toBe(initialHistoryTasks)
|
||||
})
|
||||
|
||||
it('should reconcile terminal jobs when queue is empty but history is not', async () => {
|
||||
const executionStore = useExecutionStore()
|
||||
const reconcileSpy = vi.spyOn(executionStore, 'reconcileTerminalJobs')
|
||||
const finishedJob = createHistoryJob(10, 'finished-job')
|
||||
|
||||
mockGetQueue.mockResolvedValue({ Running: [], Pending: [] })
|
||||
mockGetHistory.mockResolvedValue([finishedJob])
|
||||
|
||||
await store.update()
|
||||
|
||||
expect(reconcileSpy).toHaveBeenCalledTimes(1)
|
||||
const [activeIds, terminalIds] = reconcileSpy.mock.calls[0]
|
||||
expect(activeIds.size).toBe(0)
|
||||
expect(terminalIds.has('finished-job')).toBe(true)
|
||||
})
|
||||
|
||||
it('should not reconcile terminal jobs when history is empty', async () => {
|
||||
const executionStore = useExecutionStore()
|
||||
const reconcileSpy = vi.spyOn(executionStore, 'reconcileTerminalJobs')
|
||||
|
||||
mockGetQueue.mockResolvedValue({ Running: [], Pending: [] })
|
||||
mockGetHistory.mockResolvedValue([])
|
||||
|
||||
await store.update()
|
||||
|
||||
expect(reconcileSpy).not.toHaveBeenCalled()
|
||||
})
|
||||
})
|
||||
|
||||
describe('update() - maxHistoryItems limit', () => {
|
||||
|
||||
@@ -546,18 +546,28 @@ export const useQueueStore = defineStore('queue', () => {
|
||||
}
|
||||
})
|
||||
|
||||
// Only reconcile when the queue fetch returned data. api.getQueue()
|
||||
// returns empty Running/Pending on transient errors, which would
|
||||
// incorrectly clear all initializing prompts.
|
||||
const activeJobIds = new Set([
|
||||
...queue.Running.map((j) => j.id),
|
||||
...queue.Pending.map((j) => j.id)
|
||||
])
|
||||
|
||||
// Only reconcile initializing jobs when the queue fetch returned data.
|
||||
// api.getQueue() returns empty Running/Pending on transient errors,
|
||||
// which would incorrectly clear all initializing prompts.
|
||||
const queueHasData = queue.Running.length > 0 || queue.Pending.length > 0
|
||||
if (queueHasData) {
|
||||
const activeJobIds = new Set([
|
||||
...queue.Running.map((j) => j.id),
|
||||
...queue.Pending.map((j) => j.id)
|
||||
])
|
||||
executionStore.reconcileInitializingJobs(activeJobIds)
|
||||
}
|
||||
|
||||
// Reconcile terminal jobs whenever history is non-empty. The last
|
||||
// active job finishing legitimately produces empty Running/Pending,
|
||||
// and terminal eviction is the only path that clears stuck node
|
||||
// progress when WebSocket terminal messages are dropped.
|
||||
if (history.length > 0) {
|
||||
const terminalJobIds = new Set(history.map((j) => j.id))
|
||||
executionStore.reconcileTerminalJobs(activeJobIds, terminalJobIds)
|
||||
}
|
||||
|
||||
// Sort by create_time descending and limit to maxItems
|
||||
const sortedHistory = [...history]
|
||||
.sort((a, b) => b.create_time - a.create_time)
|
||||
|
||||
Reference in New Issue
Block a user