Compare commits

...

1 Commits

Author SHA1 Message Date
huang47
9d4cc41dfc refactor: tidy executionStore job-state handling 2026-06-30 22:38:05 -07:00
2 changed files with 434 additions and 22 deletions

View File

@@ -127,9 +127,10 @@ vi.mock('@/scripts/api', () => ({
}
}))
const revokePreviewsByExecutionId = vi.hoisted(() => vi.fn())
vi.mock('@/stores/nodeOutputStore', () => ({
useNodeOutputStore: () => ({
revokePreviewsByExecutionId: vi.fn()
revokePreviewsByExecutionId
})
}))
@@ -423,6 +424,124 @@ describe('useExecutionStore - nodeLocationProgressStates caching', () => {
'running'
)
})
it('keeps an existing error state when later progress maps to the same locator', () => {
store.nodeProgressStates = {
node1: {
display_node_id: '123',
state: 'error',
value: 0,
max: 100,
prompt_id: 'test',
node_id: 'node1'
},
node2: {
display_node_id: '123:456',
state: 'running',
value: 50,
max: 100,
prompt_id: 'test',
node_id: 'node2'
}
}
expect(
store.nodeLocationProgressStates[createNodeLocatorId(null, toNodeId(123))]
.state
).toBe('error')
})
it('ignores finished progress when current state is already running', () => {
store.nodeProgressStates = {
node1: {
display_node_id: '123',
state: 'running',
value: 5,
max: 10,
prompt_id: 'test',
node_id: 'node1'
},
node2: {
display_node_id: '123',
state: 'finished',
value: 10,
max: 10,
prompt_id: 'test',
node_id: 'node2'
}
}
expect(
store.nodeLocationProgressStates[createNodeLocatorId(null, toNodeId(123))]
).toMatchObject({ state: 'running', value: 5 })
})
it('keeps later running progress from moving a locator backwards', () => {
store.nodeProgressStates = {
node1: {
display_node_id: '123',
state: 'running',
value: 6,
max: 10,
prompt_id: 'test',
node_id: 'node1'
},
node2: {
display_node_id: '123',
state: 'running',
value: 8,
max: 10,
prompt_id: 'test',
node_id: 'node2'
}
}
expect(
store.nodeLocationProgressStates[createNodeLocatorId(null, toNodeId(123))]
).toMatchObject({ state: 'running', value: 6, max: 10 })
})
it('merges zero-max running progress without dividing by zero', () => {
store.nodeProgressStates = {
node1: {
display_node_id: '123',
state: 'pending',
value: 0,
max: 0,
prompt_id: 'test',
node_id: 'node1'
},
node2: {
display_node_id: '123',
state: 'running',
value: 0,
max: 0,
prompt_id: 'test',
node_id: 'node2'
}
}
expect(
store.nodeLocationProgressStates[createNodeLocatorId(null, toNodeId(123))]
).toMatchObject({ state: 'running', value: 0, max: 0 })
})
it('skips nested progress when the execution id cannot be resolved', () => {
vi.mocked(app.rootGraph.getNodeById).mockReturnValue(null)
store.nodeProgressStates = {
node1: {
display_node_id: '404:1',
state: 'running',
value: 5,
max: 10,
prompt_id: 'test',
node_id: 'node1'
}
}
expect(store.nodeLocationProgressStates).toHaveProperty('404')
expect(store.nodeLocationProgressStates).not.toHaveProperty('404:1')
})
})
describe('useExecutionStore - nodeProgressStatesByJob eviction', () => {
@@ -551,6 +670,31 @@ describe('useExecutionStore - reconcileInitializingJobs', () => {
expect(store.initializingJobIds).toEqual(new Set())
})
it('clears initialization ids directly', () => {
store.initializingJobIds = new Set(['job-1'])
store.clearInitializationByJobId(null)
store.clearInitializationByJobId('missing')
store.clearInitializationByJobId('job-1')
expect(store.initializingJobIds).toEqual(new Set())
})
it('checks initializing jobs by stringified id', () => {
store.initializingJobIds = new Set(['7'])
expect(store.isJobInitializing(undefined)).toBe(false)
expect(store.isJobInitializing(7)).toBe(true)
})
it('does not rewrite initializing state when no requested ids are tracked', () => {
store.initializingJobIds = new Set(['job-1'])
store.clearInitializationByJobIds(['missing'])
expect(store.initializingJobIds).toEqual(new Set(['job-1']))
})
})
describe('useExecutionStore - workflowStatus', () => {
@@ -675,6 +819,16 @@ describe('useExecutionStore - workflowStatus', () => {
expect(store.getWorkflowStatus(workflowA)).toBe('completed')
})
it('leaves workflowStatus unchanged when open workflows are unchanged', async () => {
callStoreJob('job-a', workflowA)
fireExecutionSuccess('job-a')
mockOpenWorkflows.value = [workflowA, workflowB]
await nextTick()
expect(store.getWorkflowStatus(workflowA)).toBe('completed')
})
it('sets failed on execution_error', () => {
callStoreJob('job-1', workflowA)
fireExecutionStart('job-1')
@@ -691,6 +845,14 @@ describe('useExecutionStore - workflowStatus', () => {
expect(store.getWorkflowStatus(workflowA)).toBeUndefined()
})
it('handles interrupt for a queued workflow with no active job', () => {
callStoreJob('job-1', workflowA)
fireExecutionInterrupted('job-1')
expect(store.getWorkflowStatus(workflowA)).toBeUndefined()
})
it('evicts the oldest pending status once the buffer cap is exceeded', () => {
// Each start with no matching storeJob buffers a 'running' status. One
// past the cap evicts the oldest so the buffer can't grow unbounded.
@@ -900,6 +1062,35 @@ describe('useExecutionStore - progress_text startup guard', () => {
expect(mockShowTextPreview).toHaveBeenCalledWith(mockNode, 'warming up')
})
it('should ignore progress_text for another active prompt', async () => {
const mockNode = createMockLGraphNode({ id: 1 })
const { useCanvasStore } =
await import('@/renderer/core/canvas/canvasStore')
useCanvasStore().canvas = {
graph: { getNodeById: vi.fn(() => mockNode) }
} as unknown as LGraphCanvas
store.activeJobId = 'job-1'
fireProgressText({
nodeId: toNodeId('1'),
text: 'warming up',
prompt_id: 'job-2'
})
expect(mockShowTextPreview).not.toHaveBeenCalled()
})
it('should ignore progress_text without text or node id', () => {
fireProgressText({ nodeId: toNodeId('1'), text: '' })
fireProgressText({
nodeId: '' as ReturnType<typeof toNodeId>,
text: 'warming up'
})
expect(mockShowTextPreview).not.toHaveBeenCalled()
})
it('should ignore nested progress_text when the execution ID cannot be mapped', async () => {
const { useCanvasStore } =
await import('@/renderer/core/canvas/canvasStore')
@@ -915,6 +1106,19 @@ describe('useExecutionStore - progress_text startup guard', () => {
expect(mockExecutionIdToCurrentId).toHaveBeenCalledWith('1:2')
expect(mockShowTextPreview).not.toHaveBeenCalled()
})
it('should ignore progress_text when the current node id cannot be parsed', async () => {
const { useCanvasStore } =
await import('@/renderer/core/canvas/canvasStore')
useCanvasStore().canvas = {
graph: { getNodeById: vi.fn() }
} as unknown as LGraphCanvas
mockExecutionIdToCurrentId.mockReturnValue({})
fireProgressText({ nodeId: toNodeId('1:2'), text: 'warming up' })
expect(mockShowTextPreview).not.toHaveBeenCalled()
})
})
describe('useExecutionErrorStore - Node Error Lookups', () => {
@@ -1375,6 +1579,21 @@ describe('useExecutionStore - WebSocket event handlers', () => {
expect(store.initializingJobIds.has('job-1')).toBe(false)
expect(store.initializingJobIds.has('job-2')).toBe(true)
})
it('captures a queued workflow path when the start event wins the race', () => {
store.queuedJobs = {
'job-1': {
nodes: {},
workflow: createQueuedWorkflow('/workflows/race.json')
}
}
fire('execution_start', { prompt_id: 'job-1', timestamp: 0 })
expect(store.jobIdToSessionWorkflowPath.get('job-1')).toBe(
'/workflows/race.json'
)
})
})
describe('execution_cached', () => {
@@ -1562,9 +1781,35 @@ describe('useExecutionStore - WebSocket event handlers', () => {
is_app_mode: true
})
})
it('uses current mode when shared queued job has no queued mode snapshot', () => {
mockAppModeState.mode.value = 'app'
mockAppModeState.isAppMode.value = true
store.queuedJobs = {
'job-1': {
nodes: {},
shareId: 'share-1'
}
}
fire('execution_success', { prompt_id: 'job-1', timestamp: 0 })
expect(mockTrackSharedWorkflowRun).toHaveBeenCalledWith({
job_id: 'job-1',
share_id: 'share-1',
view_mode: 'app',
is_app_mode: true
})
})
})
describe('executing', () => {
it('is a no-op when there is no active job', () => {
fire('executing', null)
expect(store.activeJobId).toBeNull()
})
it('clears _executingNodeProgress and activeJobId when detail is null', () => {
fire('execution_start', { prompt_id: 'job-1', timestamp: 0 })
store._executingNodeProgress = {
@@ -1590,7 +1835,32 @@ describe('useExecutionStore - WebSocket event handlers', () => {
})
})
describe('progress_state', () => {
it('does not revoke previews when the node execution id is invalid', () => {
fire('progress_state', {
prompt_id: 'job-1',
nodes: {
'': {
value: 1,
max: 2,
state: 'running',
node_id: '',
display_node_id: '',
prompt_id: 'job-1'
}
}
})
expect(store.nodeProgressStates).toHaveProperty('')
expect(revokePreviewsByExecutionId).not.toHaveBeenCalled()
})
})
describe('progress', () => {
it('reports null executing node progress before progress events arrive', () => {
expect(store.executingNodeProgress).toBeNull()
})
it('sets _executingNodeProgress from the event payload', () => {
const payload = { value: 3, max: 10, prompt_id: 'job-1', node: 'n1' }
@@ -1610,6 +1880,23 @@ describe('useExecutionStore - WebSocket event handlers', () => {
expect(store.clientId).toBe('test-client')
expect(removeSpy).toHaveBeenCalledWith('status', expect.any(Function))
})
it('keeps listening when status arrives before clientId is available', async () => {
const apiModule = await import('@/scripts/api')
const removeSpy = vi.mocked(apiModule.api.removeEventListener)
apiModule.api.clientId = ''
fire('status', { exec_info: { queue_remaining: 0 } })
expect(store.clientId).toBeNull()
expect(removeSpy).not.toHaveBeenCalledWith('status', expect.any(Function))
apiModule.api.clientId = 'test-client'
fire('status', { exec_info: { queue_remaining: 0 } })
expect(store.clientId).toBe('test-client')
expect(removeSpy).toHaveBeenCalledWith('status', expect.any(Function))
})
})
describe('execution_error', () => {
@@ -1631,6 +1918,39 @@ describe('useExecutionStore - WebSocket event handlers', () => {
})
})
it('uses the message directly for service-level errors without a type', () => {
const errorStore = useExecutionErrorStore()
fire('execution_error', {
prompt_id: 'job-1',
node_id: null,
exception_message: 'Job failed before node execution',
traceback: []
})
expect(errorStore.lastPromptError).toMatchObject({
type: 'error',
message: 'Job failed before node execution',
details: ''
})
})
it('uses an empty prompt message for service-level errors without backend copy', () => {
const errorStore = useExecutionErrorStore()
fire('execution_error', {
prompt_id: 'job-1',
node_id: null,
traceback: []
})
expect(errorStore.lastPromptError).toMatchObject({
type: 'error',
message: '',
details: ''
})
})
it('routes a runtime error (with node_id) to lastExecutionError', () => {
const errorStore = useExecutionErrorStore()
@@ -1744,6 +2064,12 @@ describe('useExecutionStore - WebSocket event handlers', () => {
expect(store.initializingJobIds.has('job-9')).toBe(false)
})
it('ignores notifications without text', () => {
fire('notification', { id: 'job-9' })
expect(store.initializingJobIds.has('job-9')).toBe(false)
})
})
describe('unbindExecutionEvents', () => {
@@ -1813,6 +2139,45 @@ describe('useExecutionStore - storeJob and workflow path tracking', () => {
)
})
it('storeJob works without workflow metadata', () => {
const workflow = {} as Parameters<typeof store.storeJob>[0]['workflow']
const missingWorkflow = undefined as unknown as Parameters<
typeof store.storeJob
>[0]['workflow']
store.storeJob({
nodes: ['a'],
id: 'job-1',
promptOutput: {
a: createPromptNode('Node A', 'NodeA')
},
workflow
})
expect(store.queuedJobs['job-1']?.nodes).toEqual({ a: false })
expect(store.jobIdToWorkflowId.has('job-1')).toBe(false)
expect(store.jobIdToSessionWorkflowPath.has('job-1')).toBe(false)
store.storeJob({
nodes: ['b'],
id: 'job-2',
promptOutput: {
b: createPromptNode('Node B', 'NodeB')
},
workflow: missingWorkflow
})
expect(store.queuedJobs['job-2']?.nodes).toEqual({ b: false })
expect(store.queuedJobs['job-2']?.workflow).toBeUndefined()
})
it('reports zero execution progress for an active job with no nodes', () => {
store.activeJobId = 'job-1'
store.queuedJobs = { 'job-1': { nodes: {} } }
expect(store.executionProgress).toBe(0)
})
it('registerJobWorkflowIdMapping ignores empty inputs', () => {
store.registerJobWorkflowIdMapping('job-1', 'wf-1')
store.registerJobWorkflowIdMapping('', 'wf-2')
@@ -1829,4 +2194,58 @@ describe('useExecutionStore - storeJob and workflow path tracking', () => {
expect(store.jobIdToSessionWorkflowPath.get('job-1')).toBe('/b.json')
})
it('evicts the oldest workflow paths when the session map exceeds capacity', () => {
for (let i = 0; i < 4001; i++) {
store.ensureSessionWorkflowPath(`job-${i}`, `/workflow-${i}.json`)
}
expect(store.jobIdToSessionWorkflowPath.size).toBe(4000)
expect(store.jobIdToSessionWorkflowPath.has('job-0')).toBe(false)
expect(store.jobIdToSessionWorkflowPath.get('job-4000')).toBe(
'/workflow-4000.json'
)
})
it('reports whether the active workflow is running', () => {
mockActiveWorkflow.value = { path: '/workflows/foo.json' }
store.activeJobId = 'job-1'
store.ensureSessionWorkflowPath('job-1', '/workflows/foo.json')
expect(store.isActiveWorkflowRunning).toBe(true)
store.ensureSessionWorkflowPath('job-1', '/workflows/bar.json')
expect(store.isActiveWorkflowRunning).toBe(false)
mockActiveWorkflow.value = {}
expect(store.isActiveWorkflowRunning).toBe(false)
})
it('counts running jobs from progress state', () => {
store.nodeProgressStatesByJob = {
'job-1': {
a: {
value: 1,
max: 10,
state: 'running',
node_id: 'a',
display_node_id: 'a',
prompt_id: 'job-1'
}
},
'job-2': {
b: {
value: 10,
max: 10,
state: 'finished',
node_id: 'b',
display_node_id: 'b',
prompt_id: 'job-2'
}
}
}
expect(store.runningJobIds).toEqual(['job-1'])
expect(store.runningWorkflowCount).toBe(1)
})
})

View File

@@ -153,9 +153,9 @@ export const useExecutionStore = defineStore('execution', () => {
pendingWorkflowStatusByJobId.delete(jobId)
pendingWorkflowStatusByJobId.set(jobId, status)
while (pendingWorkflowStatusByJobId.size > MAX_PROGRESS_JOBS) {
const oldest = pendingWorkflowStatusByJobId.keys().next().value
if (oldest === undefined) break
pendingWorkflowStatusByJobId.delete(oldest)
pendingWorkflowStatusByJobId.delete(
pendingWorkflowStatusByJobId.keys().next().value as string
)
}
}
@@ -314,8 +314,8 @@ export const useExecutionStore = defineStore('execution', () => {
: null
)
const activeJob = computed<QueuedJob | undefined>(
() => queuedJobs.value[activeJobId.value ?? '']
const activeJob = computed<QueuedJob | undefined>(() =>
activeJobId.value ? queuedJobs.value[activeJobId.value] : undefined
)
const totalNodesToExecute = computed<number>(() => {
@@ -440,9 +440,7 @@ export const useExecutionStore = defineStore('execution', () => {
// Update the executing nodes list
if (e.detail == null) {
if (activeJobId.value) {
delete queuedJobs.value[activeJobId.value]
}
delete queuedJobs.value[activeJobId.value as JobId]
activeJobId.value = null
}
}
@@ -593,7 +591,7 @@ export const useExecutionStore = defineStore('execution', () => {
function handleCloudValidationError(
detail: ExecutionErrorWsMessage
): boolean {
const result = classifyCloudValidationError(detail.exception_message)
const result = classifyCloudValidationError(detail.exception_message ?? '')
if (!result) return false
clearInitializationByJobId(detail.prompt_id)
@@ -669,17 +667,14 @@ export const useExecutionStore = defineStore('execution', () => {
/**
* Reset execution-related state after a run completes or is stopped.
*/
function resetExecutionState(jobIdParam?: JobId | null) {
function resetExecutionState(jobId: JobId) {
executionIdToLocatorCache.clear()
nodeProgressStates.value = {}
const jobId = jobIdParam ?? activeJobId.value ?? null
if (jobId) {
const map = { ...nodeProgressStatesByJob.value }
delete map[jobId]
nodeProgressStatesByJob.value = map
useJobPreviewStore().clearPreview(jobId)
jobIdToWorkflow.delete(jobId)
}
const map = { ...nodeProgressStatesByJob.value }
delete map[jobId]
nodeProgressStatesByJob.value = map
useJobPreviewStore().clearPreview(jobId)
jobIdToWorkflow.delete(jobId)
if (activeJobId.value) {
delete queuedJobs.value[activeJobId.value]
}
@@ -771,9 +766,7 @@ export const useExecutionStore = defineStore('execution', () => {
const next = new Map(jobIdToSessionWorkflowPath.value)
next.set(jobId, path)
while (next.size > MAX_SESSION_PATH_ENTRIES) {
const oldest = next.keys().next().value
if (oldest !== undefined) next.delete(oldest)
else break
next.delete(next.keys().next().value as JobId)
}
jobIdToSessionWorkflowPath.value = next
}