Compare commits

...

3 Commits

Author SHA1 Message Date
Benjamin Lu
ea691eaea3 fix: clear stale active job after reconnect 2026-03-13 11:54:47 -07:00
Benjamin Lu
422db2baa2 Merge origin/main into bl/fix-stale-progress-reconnect 2026-03-13 11:46:04 -07:00
Benjamin Lu
9866118df5 fix: reconcile stale progress after reconnect 2026-03-13 11:38:46 -07:00
4 changed files with 222 additions and 15 deletions

View File

@@ -430,6 +430,108 @@ describe('useExecutionStore - reconcileInitializingJobs', () => {
})
})
describe('useExecutionStore - clearTrackedJob', () => {
let store: ReturnType<typeof useExecutionStore>
beforeEach(() => {
vi.clearAllMocks()
setActivePinia(createTestingPinia({ stubActions: false }))
store = useExecutionStore()
})
it('removes only the stale job state', () => {
const staleNodeState = {
value: 4,
max: 10,
state: 'running' as const,
node_id: 'node-1',
prompt_id: 'job-1'
}
const activeNodeState = {
value: 2,
max: 5,
state: 'running' as const,
node_id: 'node-2',
prompt_id: 'job-2'
}
store.activeJobId = 'job-1'
store.queuedJobs = {
'job-1': { nodes: {} },
'job-2': { nodes: {} }
}
store.nodeProgressStates = {
'node-1': staleNodeState,
'node-2': activeNodeState
}
store.nodeProgressStatesByJob = {
'job-1': { 'node-1': staleNodeState },
'job-2': { 'node-2': activeNodeState }
}
store.initializingJobIds = new Set(['job-1', 'job-2'])
store._executingNodeProgress = {
value: 4,
max: 10,
prompt_id: 'job-1',
node: 'node-1'
}
store.clearTrackedJob('job-1')
expect(store.activeJobId).toBeNull()
expect(store.queuedJobs).toEqual({
'job-2': { nodes: {} }
})
expect(store.nodeProgressStates).toEqual({
'node-2': activeNodeState
})
expect(store.nodeProgressStatesByJob).toEqual({
'job-2': { 'node-2': activeNodeState }
})
expect(store.initializingJobIds).toEqual(new Set(['job-2']))
expect(store._executingNodeProgress).toBeNull()
})
it('clears a stale active job when a different prompt completes', () => {
const terminalJobState = {
value: 3,
max: 10,
state: 'running' as const,
node_id: 'node-2',
prompt_id: 'job-2'
}
store.activeJobId = 'job-1'
store.queuedJobs = {
'job-1': { nodes: {} },
'job-2': { nodes: {} }
}
store.nodeProgressStates = {
'node-2': terminalJobState
}
store.nodeProgressStatesByJob = {
'job-1': {},
'job-2': { 'node-2': terminalJobState }
}
store.initializingJobIds = new Set(['job-1', 'job-2'])
store.bindExecutionEvents()
const handler = apiEventHandlers.get('execution_success')
if (!handler) throw new Error('execution_success handler not bound')
handler(
new CustomEvent('execution_success', {
detail: { prompt_id: 'job-2' }
})
)
expect(store.activeJobId).toBeNull()
expect(store.queuedJobs).toEqual({})
expect(store.nodeProgressStates).toEqual({})
expect(store.nodeProgressStatesByJob).toEqual({})
expect(store.initializingJobIds).toEqual(new Set())
})
})
describe('useExecutionErrorStore - Node Error Lookups', () => {
let store: ReturnType<typeof useExecutionErrorStore>

View File

@@ -488,25 +488,86 @@ export const useExecutionStore = defineStore('execution', () => {
return initializingJobIds.value.has(String(jobId))
}
function removeTrackedJobState(
jobId: string,
options?: {
clearAllNodeProgressStates?: boolean
clearPromptError?: boolean
}
) {
const normalizedJobId = String(jobId)
const wasActiveJob = activeJobId.value === normalizedJobId
clearInitializationByJobId(normalizedJobId)
const progressStatesByJob = { ...nodeProgressStatesByJob.value }
delete progressStatesByJob[normalizedJobId]
nodeProgressStatesByJob.value = progressStatesByJob
nodeProgressStates.value = options?.clearAllNodeProgressStates
? {}
: Object.fromEntries(
Object.entries(nodeProgressStates.value).filter(
([_, state]) => state.prompt_id !== normalizedJobId
)
)
delete queuedJobs.value[normalizedJobId]
useJobPreviewStore().clearPreview(normalizedJobId)
if (
options?.clearAllNodeProgressStates ||
_executingNodeProgress.value?.prompt_id === normalizedJobId
) {
_executingNodeProgress.value = null
}
if (wasActiveJob) {
activeJobId.value = null
if (options?.clearPromptError) {
executionErrorStore.clearPromptError()
}
}
}
/**
* Reset execution-related state after a run completes or is stopped.
*/
function resetExecutionState(jobIdParam?: string | null) {
executionIdToLocatorCache.clear()
nodeProgressStates.value = {}
const jobId = jobIdParam ?? activeJobId.value ?? null
if (jobId) {
const map = { ...nodeProgressStatesByJob.value }
delete map[jobId]
nodeProgressStatesByJob.value = map
useJobPreviewStore().clearPreview(jobId)
if (!jobId) {
nodeProgressStates.value = {}
activeJobId.value = null
_executingNodeProgress.value = null
executionErrorStore.clearPromptError()
return
}
if (activeJobId.value) {
delete queuedJobs.value[activeJobId.value]
const normalizedJobId = String(jobId)
const staleActiveJobId =
activeJobId.value && activeJobId.value !== normalizedJobId
? activeJobId.value
: null
removeTrackedJobState(normalizedJobId, {
clearAllNodeProgressStates: true,
clearPromptError: true
})
if (staleActiveJobId) {
removeTrackedJobState(staleActiveJobId, {
clearPromptError: true
})
}
activeJobId.value = null
_executingNodeProgress.value = null
executionErrorStore.clearPromptError()
}
/**
* Clear tracked execution state for a single job.
* Used to recover from missed terminal websocket events after reconnects.
*/
function clearTrackedJob(jobId: string) {
removeTrackedJobState(jobId)
}
function getNodeIdIfExecuting(nodeId: string | number) {
@@ -641,6 +702,7 @@ export const useExecutionStore = defineStore('execution', () => {
clearInitializationByJobId,
clearInitializationByJobIds,
reconcileInitializingJobs,
clearTrackedJob,
bindExecutionEvents,
unbindExecutionEvents,
storeJob,

View File

@@ -5,6 +5,7 @@ import { beforeEach, describe, expect, it, vi } from 'vitest'
import type { JobListItem } from '@/platform/remote/comfyui/jobs/jobTypes'
import type { TaskOutput } from '@/schemas/apiSchema'
import { api } from '@/scripts/api'
import { useExecutionStore } from '@/stores/executionStore'
import { TaskItemImpl, useQueueStore } from '@/stores/queueStore'
// Fixture factory for JobListItem
@@ -564,6 +565,36 @@ describe('useQueueStore', () => {
// Should preserve array identity when history is unchanged
expect(store.historyTasks).toBe(initialHistoryTasks)
})
it('clears stale execution tracking when the active job appears in history', async () => {
const executionStore = useExecutionStore()
const clearTrackedJobSpy = vi.spyOn(executionStore, 'clearTrackedJob')
executionStore.activeJobId = 'job-1'
mockGetQueue.mockResolvedValue({ Running: [], Pending: [] })
mockGetHistory.mockResolvedValue([createHistoryJob(10, 'job-1')])
await store.update()
expect(clearTrackedJobSpy).toHaveBeenCalledWith('job-1')
expect(executionStore.activeJobId).toBeNull()
})
it('does not clear execution tracking when the active job is missing from history', async () => {
const executionStore = useExecutionStore()
const clearTrackedJobSpy = vi.spyOn(executionStore, 'clearTrackedJob')
executionStore.activeJobId = 'job-1'
mockGetQueue.mockResolvedValue({ Running: [], Pending: [] })
mockGetHistory.mockResolvedValue([])
await store.update()
expect(clearTrackedJobSpy).not.toHaveBeenCalled()
expect(executionStore.activeJobId).toBe('job-1')
})
})
describe('update() - maxHistoryItems limit', () => {

View File

@@ -540,15 +540,16 @@ export const useQueueStore = defineStore('queue', () => {
}
})
const activeJobIds = new Set([
...queue.Running.map((job) => String(job.id)),
...queue.Pending.map((job) => String(job.id))
])
// Only reconcile when the queue fetch returned data. api.getQueue()
// returns empty Running/Pending on transient errors, which would
// incorrectly clear all initializing prompts.
const queueHasData = queue.Running.length > 0 || queue.Pending.length > 0
if (queueHasData) {
const activeJobIds = new Set([
...queue.Running.map((j) => j.id),
...queue.Pending.map((j) => j.id)
])
executionStore.reconcileInitializingJobs(activeJobIds)
}
@@ -580,6 +581,17 @@ export const useQueueStore = defineStore('queue', () => {
if (!isHistoryUnchanged) {
historyTasks.value = nextHistoryTasks
}
const activeJobId = executionStore.activeJobId
const missedTerminalExecution =
activeJobId !== null &&
!activeJobIds.has(activeJobId) &&
nextHistoryTasks.some((task) => task.jobId === activeJobId)
if (missedTerminalExecution) {
executionStore.clearTrackedJob(activeJobId)
}
hasFetchedHistorySnapshot.value = true
} finally {
// Only clear loading if this is the latest request.