mirror of
https://github.com/Comfy-Org/ComfyUI_frontend.git
synced 2026-05-11 16:30:57 +00:00
fix: clear active job on reconnect if no longer in queue (#12067)
## Summary When a socket disconnects messages can be missed and lead to a stale UI state, this updates the state on reconnect and clears the active job if it is no longer running ## Changes - **What**: - add call to update queue on reconnect - clear active job if job not in queue response - tests ┆Issue is synchronized with this [Notion page](https://www.notion.so/PR-12067-fix-clear-active-job-on-reconnect-if-no-longer-in-queue-3596d73d365081f79d42d73966420c50) by [Unito](https://www.unito.io)
This commit is contained in:
@@ -1,6 +1,10 @@
|
||||
import type { WebSocketRoute } from '@playwright/test'
|
||||
|
||||
import type { NodeError, PromptResponse } from '@/schemas/apiSchema'
|
||||
import type {
|
||||
NodeError,
|
||||
NodeProgressState,
|
||||
PromptResponse
|
||||
} from '@/schemas/apiSchema'
|
||||
import type { RawJobListItem } from '@/platform/remote/comfyui/jobs/jobTypes'
|
||||
import type { ComfyPage } from '@e2e/fixtures/ComfyPage'
|
||||
import { createMockJob } from '@e2e/fixtures/helpers/AssetsHelper'
|
||||
@@ -230,6 +234,16 @@ export class ExecutionHelper {
|
||||
)
|
||||
}
|
||||
|
||||
/** Send `progress_state` WS event with per-node execution state. */
|
||||
progressState(jobId: string, nodes: Record<string, NodeProgressState>): void {
|
||||
this.requireWs().send(
|
||||
JSON.stringify({
|
||||
type: 'progress_state',
|
||||
data: { prompt_id: jobId, nodes }
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Complete a job by adding it to mock history, sending execution_success,
|
||||
* and triggering a history refresh via a status event.
|
||||
|
||||
211
browser_tests/tests/wsReconnectStaleJob.spec.ts
Normal file
211
browser_tests/tests/wsReconnectStaleJob.spec.ts
Normal file
@@ -0,0 +1,211 @@
|
||||
import type { WebSocketRoute } from '@playwright/test'
|
||||
import { mergeTests } from '@playwright/test'
|
||||
import type { z } from 'zod'
|
||||
|
||||
import {
|
||||
comfyExpect as expect,
|
||||
comfyPageFixture
|
||||
} from '@e2e/fixtures/ComfyPage'
|
||||
import type { ComfyPage } from '@e2e/fixtures/ComfyPage'
|
||||
import { ExecutionHelper } from '@e2e/fixtures/helpers/ExecutionHelper'
|
||||
import { webSocketFixture } from '@e2e/fixtures/ws'
|
||||
import type {
|
||||
RawJobListItem,
|
||||
zJobsListResponse
|
||||
} from '@/platform/remote/comfyui/jobs/jobTypes'
|
||||
|
||||
type JobsListResponse = z.infer<typeof zJobsListResponse>
|
||||
|
||||
const test = mergeTests(comfyPageFixture, webSocketFixture)
|
||||
|
||||
const KSAMPLER_NODE = '3'
|
||||
const EXECUTING_CLASS = /outline-node-stroke-executing/
|
||||
|
||||
const QUEUE_ROUTE = /\/api\/jobs\?[^/]*status=in_progress,pending/
|
||||
const HISTORY_ROUTE = /\/api\/jobs\?[^/]*status=completed/
|
||||
|
||||
function jobsResponse(jobs: RawJobListItem[]): JobsListResponse {
|
||||
return {
|
||||
jobs,
|
||||
pagination: { offset: 0, limit: 200, total: jobs.length, has_more: false }
|
||||
}
|
||||
}
|
||||
|
||||
async function mockJobsRoute(
|
||||
comfyPage: ComfyPage,
|
||||
pattern: RegExp,
|
||||
body: string,
|
||||
status: number = 200
|
||||
): Promise<() => number> {
|
||||
let count = 0
|
||||
await comfyPage.page.route(pattern, async (route) => {
|
||||
count += 1
|
||||
await route.fulfill({
|
||||
status,
|
||||
contentType: 'application/json',
|
||||
body
|
||||
})
|
||||
})
|
||||
return () => count
|
||||
}
|
||||
|
||||
const emptyJobsBody = JSON.stringify(jobsResponse([]))
|
||||
|
||||
type Scenario = {
|
||||
name: string
|
||||
/** Built per-test so it can incorporate the runtime-assigned jobId. */
|
||||
queueBody: (jobId: string) => string
|
||||
/** Whether the active job state should still be reflected after reconnect. */
|
||||
expectsActiveAfter: boolean
|
||||
}
|
||||
|
||||
const scenarios: Scenario[] = [
|
||||
{
|
||||
name: 'clears stale active job when queue is empty after reconnect',
|
||||
queueBody: () => emptyJobsBody,
|
||||
expectsActiveAfter: false
|
||||
},
|
||||
{
|
||||
name: 'preserves active job when the job is still in the queue',
|
||||
queueBody: (jobId) =>
|
||||
JSON.stringify(
|
||||
jobsResponse([
|
||||
{ id: jobId, status: 'in_progress', create_time: Date.now() }
|
||||
])
|
||||
),
|
||||
expectsActiveAfter: true
|
||||
}
|
||||
]
|
||||
|
||||
/**
|
||||
* Stub the queue/history endpoints per `scenario`, close the WS, and wait
|
||||
* for the auto-reconnect to issue a fresh queue fetch.
|
||||
*/
|
||||
async function triggerReconnect(
|
||||
comfyPage: ComfyPage,
|
||||
ws: WebSocketRoute,
|
||||
scenario: Scenario,
|
||||
jobId: string
|
||||
): Promise<void> {
|
||||
await mockJobsRoute(comfyPage, HISTORY_ROUTE, emptyJobsBody)
|
||||
const queueFetches = await mockJobsRoute(
|
||||
comfyPage,
|
||||
QUEUE_ROUTE,
|
||||
scenario.queueBody(jobId)
|
||||
)
|
||||
const fetchesBeforeClose = queueFetches()
|
||||
await ws.close()
|
||||
await expect.poll(queueFetches).toBeGreaterThan(fetchesBeforeClose)
|
||||
}
|
||||
|
||||
test.describe('WebSocket reconnect with stale job', { tag: '@ui' }, () => {
|
||||
test.describe('app mode skeleton', () => {
|
||||
test.beforeEach(async ({ comfyPage }) => {
|
||||
await comfyPage.appMode.enterAppModeWithInputs([[KSAMPLER_NODE, 'seed']])
|
||||
await expect(comfyPage.appMode.linearWidgets).toBeVisible()
|
||||
})
|
||||
|
||||
for (const scenario of scenarios) {
|
||||
test(scenario.name, async ({ comfyPage, getWebSocket }) => {
|
||||
const ws = await getWebSocket()
|
||||
const exec = new ExecutionHelper(comfyPage, ws)
|
||||
|
||||
const jobId = await exec.run()
|
||||
exec.executionStart(jobId)
|
||||
|
||||
// Skeleton visibility is the deterministic sync point: it appears
|
||||
// once both `storeJob` (HTTP) and `executionStart` (WS) have been
|
||||
// processed, regardless of arrival order.
|
||||
const firstSkeleton = comfyPage.appMode.outputHistory.skeletons.first()
|
||||
await expect(firstSkeleton).toBeVisible()
|
||||
|
||||
await triggerReconnect(comfyPage, ws, scenario, jobId)
|
||||
|
||||
if (scenario.expectsActiveAfter) {
|
||||
await expect(firstSkeleton).toBeVisible()
|
||||
} else {
|
||||
await expect(comfyPage.appMode.outputHistory.skeletons).toHaveCount(0)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
test('preserves active job when the queue endpoint fails on reconnect', async ({
|
||||
comfyPage,
|
||||
getWebSocket
|
||||
}) => {
|
||||
const ws = await getWebSocket()
|
||||
const exec = new ExecutionHelper(comfyPage, ws)
|
||||
|
||||
const jobId = await exec.run()
|
||||
exec.executionStart(jobId)
|
||||
|
||||
const firstSkeleton = comfyPage.appMode.outputHistory.skeletons.first()
|
||||
await expect(firstSkeleton).toBeVisible()
|
||||
|
||||
await mockJobsRoute(comfyPage, HISTORY_ROUTE, emptyJobsBody)
|
||||
|
||||
// Prime queueStore.runningTasks with the active job — a WS status
|
||||
// event drives GraphView.onStatus -> queueStore.update().
|
||||
const primer = await mockJobsRoute(
|
||||
comfyPage,
|
||||
QUEUE_ROUTE,
|
||||
JSON.stringify(
|
||||
jobsResponse([
|
||||
{ id: jobId, status: 'in_progress', create_time: Date.now() }
|
||||
])
|
||||
)
|
||||
)
|
||||
exec.status(1)
|
||||
await expect.poll(primer).toBeGreaterThanOrEqual(1)
|
||||
|
||||
// Swap to a failing handler so the reconnect-driven fetch 500s.
|
||||
// The fix should preserve runningTasks from the priming call rather
|
||||
// than overwriting it with empty/error state.
|
||||
await comfyPage.page.unroute(QUEUE_ROUTE)
|
||||
const failed = await mockJobsRoute(comfyPage, QUEUE_ROUTE, '{}', 500)
|
||||
|
||||
const before = failed()
|
||||
await ws.close()
|
||||
await expect.poll(failed).toBeGreaterThan(before)
|
||||
|
||||
await expect(firstSkeleton).toBeVisible()
|
||||
})
|
||||
})
|
||||
|
||||
test.describe('vue node executing class', { tag: '@vue-nodes' }, () => {
|
||||
for (const scenario of scenarios) {
|
||||
test(scenario.name, async ({ comfyPage, getWebSocket }) => {
|
||||
const ws = await getWebSocket()
|
||||
const exec = new ExecutionHelper(comfyPage, ws)
|
||||
|
||||
// The executing outline lives on the outer `[data-node-id]`
|
||||
// container, not the inner wrapper.
|
||||
const ksamplerNode = comfyPage.vueNodes.getNodeLocator(KSAMPLER_NODE)
|
||||
await expect(ksamplerNode).toBeVisible()
|
||||
|
||||
const jobId = await exec.run()
|
||||
exec.executionStart(jobId)
|
||||
exec.progressState(jobId, {
|
||||
[KSAMPLER_NODE]: {
|
||||
value: 0,
|
||||
max: 1,
|
||||
state: 'running',
|
||||
node_id: KSAMPLER_NODE,
|
||||
display_node_id: KSAMPLER_NODE,
|
||||
prompt_id: jobId
|
||||
}
|
||||
})
|
||||
|
||||
await expect(ksamplerNode).toHaveClass(EXECUTING_CLASS)
|
||||
|
||||
await triggerReconnect(comfyPage, ws, scenario, jobId)
|
||||
|
||||
if (scenario.expectsActiveAfter) {
|
||||
await expect(ksamplerNode).toHaveClass(EXECUTING_CLASS)
|
||||
} else {
|
||||
await expect(ksamplerNode).not.toHaveClass(EXECUTING_CLASS)
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
})
|
||||
88
src/composables/useReconnectQueueRefresh.test.ts
Normal file
88
src/composables/useReconnectQueueRefresh.test.ts
Normal file
@@ -0,0 +1,88 @@
|
||||
import { createTestingPinia } from '@pinia/testing'
|
||||
import { setActivePinia } from 'pinia'
|
||||
import { beforeEach, describe, expect, it, vi } from 'vitest'
|
||||
|
||||
import { useReconnectQueueRefresh } from '@/composables/useReconnectQueueRefresh'
|
||||
import type { JobListItem } from '@/platform/remote/comfyui/jobs/jobTypes'
|
||||
import { api } from '@/scripts/api'
|
||||
import { useExecutionStore } from '@/stores/executionStore'
|
||||
|
||||
function makeJob(id: string, status: JobListItem['status']): JobListItem {
|
||||
return {
|
||||
id,
|
||||
status,
|
||||
create_time: 0,
|
||||
update_time: 0,
|
||||
last_state_update: 0,
|
||||
priority: 0
|
||||
}
|
||||
}
|
||||
|
||||
vi.mock('@/scripts/api', () => ({
|
||||
api: {
|
||||
getQueue: vi.fn(),
|
||||
getHistory: vi.fn(),
|
||||
addEventListener: vi.fn(),
|
||||
removeEventListener: vi.fn(),
|
||||
apiURL: vi.fn((p: string) => `/api${p}`)
|
||||
}
|
||||
}))
|
||||
|
||||
describe('useReconnectQueueRefresh', () => {
|
||||
beforeEach(() => {
|
||||
setActivePinia(createTestingPinia({ stubActions: false }))
|
||||
vi.restoreAllMocks()
|
||||
vi.mocked(api.getQueue).mockResolvedValue({ Running: [], Pending: [] })
|
||||
vi.mocked(api.getHistory).mockResolvedValue([])
|
||||
})
|
||||
|
||||
it('forwards running+pending job ids to clearActiveJobIfStale', async () => {
|
||||
vi.mocked(api.getQueue).mockResolvedValue({
|
||||
Running: [makeJob('run-1', 'in_progress')],
|
||||
Pending: [makeJob('pend-1', 'pending'), makeJob('pend-2', 'pending')]
|
||||
})
|
||||
const executionStore = useExecutionStore()
|
||||
const clearSpy = vi
|
||||
.spyOn(executionStore, 'clearActiveJobIfStale')
|
||||
.mockImplementation(() => {})
|
||||
|
||||
const refresh = useReconnectQueueRefresh()
|
||||
await refresh()
|
||||
|
||||
expect(clearSpy).toHaveBeenCalledTimes(1)
|
||||
expect(clearSpy).toHaveBeenCalledWith(
|
||||
new Set(['run-1', 'pend-1', 'pend-2'])
|
||||
)
|
||||
})
|
||||
|
||||
it('passes an empty set when the queue is genuinely empty', async () => {
|
||||
const executionStore = useExecutionStore()
|
||||
const clearSpy = vi
|
||||
.spyOn(executionStore, 'clearActiveJobIfStale')
|
||||
.mockImplementation(() => {})
|
||||
|
||||
const refresh = useReconnectQueueRefresh()
|
||||
await refresh()
|
||||
|
||||
expect(clearSpy).toHaveBeenCalledWith(new Set())
|
||||
})
|
||||
|
||||
it('reuses the prior queue snapshot when the fetch fails, so a still-running job is not falsely cleared', async () => {
|
||||
vi.mocked(api.getQueue)
|
||||
.mockResolvedValueOnce({
|
||||
Running: [makeJob('run-1', 'in_progress')],
|
||||
Pending: []
|
||||
})
|
||||
.mockRejectedValueOnce(new Error('network down'))
|
||||
const executionStore = useExecutionStore()
|
||||
const clearSpy = vi
|
||||
.spyOn(executionStore, 'clearActiveJobIfStale')
|
||||
.mockImplementation(() => {})
|
||||
|
||||
const refresh = useReconnectQueueRefresh()
|
||||
await refresh() // primes the store with run-1
|
||||
await refresh() // network failure here — store must not go empty
|
||||
|
||||
expect(clearSpy).toHaveBeenLastCalledWith(new Set(['run-1']))
|
||||
})
|
||||
})
|
||||
25
src/composables/useReconnectQueueRefresh.ts
Normal file
25
src/composables/useReconnectQueueRefresh.ts
Normal file
@@ -0,0 +1,25 @@
|
||||
import { useExecutionStore } from '@/stores/executionStore'
|
||||
import { useQueueStore } from '@/stores/queueStore'
|
||||
|
||||
/**
|
||||
* After a WebSocket reconnect, refresh the queue from the server and clear
|
||||
* any active job that finished during the disconnect window. Returns the
|
||||
* handler so the caller can wire it to the `reconnected` api event.
|
||||
*
|
||||
* `update()` preserves the previous queue snapshot when the fetch fails, so
|
||||
* if the network is still flaky we reconcile against the last known good
|
||||
* state rather than an empty (and falsely "stale") set.
|
||||
*/
|
||||
export function useReconnectQueueRefresh() {
|
||||
const queueStore = useQueueStore()
|
||||
const executionStore = useExecutionStore()
|
||||
|
||||
return async function refreshOnReconnect() {
|
||||
await queueStore.update()
|
||||
const activeJobIds = new Set([
|
||||
...queueStore.runningTasks.map((t) => t.jobId),
|
||||
...queueStore.pendingTasks.map((t) => t.jobId)
|
||||
])
|
||||
executionStore.clearActiveJobIfStale(activeJobIds)
|
||||
}
|
||||
}
|
||||
@@ -1005,13 +1005,14 @@ export class ComfyApi extends EventTarget {
|
||||
* Gets the current state of the queue
|
||||
* @returns The currently running and queued items
|
||||
*/
|
||||
async getQueue(): Promise<{
|
||||
async getQueue(options?: { throwOnError?: boolean }): Promise<{
|
||||
Running: JobListItem[]
|
||||
Pending: JobListItem[]
|
||||
}> {
|
||||
try {
|
||||
return await fetchQueue(this.fetchApi.bind(this))
|
||||
} catch (error) {
|
||||
if (options?.throwOnError) throw error
|
||||
console.error('Failed to fetch queue:', error)
|
||||
return { Running: [], Pending: [] }
|
||||
}
|
||||
|
||||
@@ -440,6 +440,57 @@ describe('useExecutionStore - reconcileInitializingJobs', () => {
|
||||
})
|
||||
})
|
||||
|
||||
describe('useExecutionStore - clearActiveJobIfStale', () => {
|
||||
let store: ReturnType<typeof useExecutionStore>
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
setActivePinia(createTestingPinia({ stubActions: false }))
|
||||
store = useExecutionStore()
|
||||
})
|
||||
|
||||
it('clears the active job and progress state when not in the active set', () => {
|
||||
store.activeJobId = 'job-1'
|
||||
store.queuedJobs = { 'job-1': { nodes: { 'node-1': false } } }
|
||||
store.nodeProgressStates = {
|
||||
'node-1': {
|
||||
value: 5,
|
||||
max: 10,
|
||||
state: 'running',
|
||||
node_id: 'node-1',
|
||||
display_node_id: 'node-1',
|
||||
prompt_id: 'job-1'
|
||||
}
|
||||
}
|
||||
|
||||
store.clearActiveJobIfStale(new Set(['job-2']))
|
||||
|
||||
expect(store.activeJobId).toBeNull()
|
||||
expect(store.queuedJobs['job-1']).toBeUndefined()
|
||||
expect(store.nodeProgressStates).toEqual({})
|
||||
})
|
||||
|
||||
it('preserves the active job when present in the active set', () => {
|
||||
store.activeJobId = 'job-1'
|
||||
store.queuedJobs = { 'job-1': { nodes: {} } }
|
||||
|
||||
store.clearActiveJobIfStale(new Set(['job-1', 'job-2']))
|
||||
|
||||
expect(store.activeJobId).toBe('job-1')
|
||||
expect(store.queuedJobs['job-1']).toBeDefined()
|
||||
})
|
||||
|
||||
it('is a no-op when there is no active job', () => {
|
||||
store.activeJobId = null
|
||||
store.queuedJobs = { other: { nodes: {} } }
|
||||
|
||||
store.clearActiveJobIfStale(new Set())
|
||||
|
||||
expect(store.activeJobId).toBeNull()
|
||||
expect(store.queuedJobs['other']).toBeDefined()
|
||||
})
|
||||
})
|
||||
|
||||
describe('useExecutionStore - progress_text startup guard', () => {
|
||||
let store: ReturnType<typeof useExecutionStore>
|
||||
|
||||
|
||||
@@ -485,6 +485,16 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
clearInitializationByJobIds(orphaned)
|
||||
}
|
||||
|
||||
/**
|
||||
* Clears the active job if the server's queue snapshot doesn't list it.
|
||||
* Used after WS reconnect to recover from stale state when a job finished
|
||||
* during the disconnect window.
|
||||
*/
|
||||
function clearActiveJobIfStale(activeJobIds: Set<JobId>) {
|
||||
const id = activeJobId.value
|
||||
if (id && !activeJobIds.has(id)) resetExecutionState(id)
|
||||
}
|
||||
|
||||
function isJobInitializing(jobId: JobId | number | undefined): boolean {
|
||||
if (!jobId) return false
|
||||
return initializingJobIds.value.has(String(jobId))
|
||||
@@ -643,6 +653,7 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
clearInitializationByJobId,
|
||||
clearInitializationByJobIds,
|
||||
reconcileInitializingJobs,
|
||||
clearActiveJobIfStale,
|
||||
bindExecutionEvents,
|
||||
unbindExecutionEvents,
|
||||
storeJob,
|
||||
|
||||
@@ -5,6 +5,7 @@ import { beforeEach, describe, expect, it, vi } from 'vitest'
|
||||
import type { JobListItem } from '@/platform/remote/comfyui/jobs/jobTypes'
|
||||
import type { TaskOutput } from '@/schemas/apiSchema'
|
||||
import { api } from '@/scripts/api'
|
||||
import { useExecutionStore } from '@/stores/executionStore'
|
||||
import { TaskItemImpl, useQueueStore } from '@/stores/queueStore'
|
||||
|
||||
// Fixture factory for JobListItem
|
||||
@@ -340,11 +341,11 @@ describe('useQueueStore', () => {
|
||||
expect(store.isLoading).toBe(false)
|
||||
})
|
||||
|
||||
it('should clear loading state even if API fails', async () => {
|
||||
it('should clear loading state even if the queue fetch fails', async () => {
|
||||
mockGetQueue.mockRejectedValue(new Error('API error'))
|
||||
mockGetHistory.mockResolvedValue([])
|
||||
|
||||
await expect(store.update()).rejects.toThrow('API error')
|
||||
await store.update()
|
||||
expect(store.isLoading).toBe(false)
|
||||
})
|
||||
})
|
||||
@@ -1018,10 +1019,9 @@ describe('useQueueStore', () => {
|
||||
const firstUpdate = store.update()
|
||||
void store.update() // coalesces, sets dirty
|
||||
|
||||
// First call rejects — but dirty flag triggers re-fetch
|
||||
await expect(firstUpdate).rejects.toThrow('network error')
|
||||
|
||||
// Re-fetch was triggered
|
||||
// First call resolves (allSettled absorbs the failure) but the dirty
|
||||
// flag still triggers a re-fetch when the in-flight request finishes.
|
||||
await firstUpdate
|
||||
expect(mockGetQueue).toHaveBeenCalledTimes(2)
|
||||
|
||||
resolveSecond({ Running: [], Pending: [createPendingJob(2, 'new-job')] })
|
||||
@@ -1032,4 +1032,86 @@ describe('useQueueStore', () => {
|
||||
expect(store.isLoading).toBe(false)
|
||||
})
|
||||
})
|
||||
|
||||
describe('update() partial failures', () => {
|
||||
it('reconciles when the queue fetch succeeds, even with an empty snapshot', async () => {
|
||||
mockGetQueue.mockResolvedValue({ Running: [], Pending: [] })
|
||||
mockGetHistory.mockResolvedValue([])
|
||||
const executionStore = useExecutionStore()
|
||||
const reconcileSpy = vi.spyOn(executionStore, 'reconcileInitializingJobs')
|
||||
|
||||
await store.update()
|
||||
|
||||
expect(reconcileSpy).toHaveBeenCalledWith(new Set())
|
||||
})
|
||||
|
||||
it('preserves prior queue state and skips reconcile when the queue fetch fails', async () => {
|
||||
mockGetQueue
|
||||
.mockResolvedValueOnce({
|
||||
Running: [createRunningJob(0, 'run-1')],
|
||||
Pending: []
|
||||
})
|
||||
.mockRejectedValueOnce(new Error('network down'))
|
||||
mockGetHistory.mockResolvedValue([])
|
||||
const executionStore = useExecutionStore()
|
||||
const reconcileSpy = vi.spyOn(executionStore, 'reconcileInitializingJobs')
|
||||
|
||||
await store.update()
|
||||
await store.update()
|
||||
|
||||
// First update reconciles with run-1; second update's queue fetch
|
||||
// rejects, so reconcile must not be called again.
|
||||
expect(reconcileSpy).toHaveBeenCalledTimes(1)
|
||||
expect(reconcileSpy).toHaveBeenLastCalledWith(new Set(['run-1']))
|
||||
expect(store.runningTasks).toHaveLength(1)
|
||||
expect(store.runningTasks[0].jobId).toBe('run-1')
|
||||
})
|
||||
|
||||
it('still updates history when only the queue fetch fails', async () => {
|
||||
mockGetQueue.mockRejectedValue(new Error('queue down'))
|
||||
mockGetHistory.mockResolvedValue([createHistoryJob(0, 'hist-1')])
|
||||
|
||||
await store.update()
|
||||
|
||||
expect(store.historyTasks).toHaveLength(1)
|
||||
expect(store.historyTasks[0].jobId).toBe('hist-1')
|
||||
})
|
||||
|
||||
it('still updates queue when only the history fetch fails', async () => {
|
||||
mockGetQueue.mockResolvedValue({
|
||||
Running: [createRunningJob(0, 'run-1')],
|
||||
Pending: []
|
||||
})
|
||||
mockGetHistory.mockRejectedValue(new Error('history down'))
|
||||
|
||||
await store.update()
|
||||
|
||||
expect(store.runningTasks).toHaveLength(1)
|
||||
expect(store.runningTasks[0].jobId).toBe('run-1')
|
||||
})
|
||||
|
||||
it('preserves prior state and skips reconcile when both fetches fail', async () => {
|
||||
mockGetQueue
|
||||
.mockResolvedValueOnce({
|
||||
Running: [createRunningJob(0, 'run-1')],
|
||||
Pending: []
|
||||
})
|
||||
.mockRejectedValueOnce(new Error('queue down'))
|
||||
mockGetHistory
|
||||
.mockResolvedValueOnce([createHistoryJob(0, 'hist-1')])
|
||||
.mockRejectedValueOnce(new Error('history down'))
|
||||
const executionStore = useExecutionStore()
|
||||
const reconcileSpy = vi.spyOn(executionStore, 'reconcileInitializingJobs')
|
||||
|
||||
await store.update()
|
||||
await store.update()
|
||||
|
||||
expect(reconcileSpy).toHaveBeenCalledTimes(1)
|
||||
expect(store.runningTasks).toHaveLength(1)
|
||||
expect(store.runningTasks[0].jobId).toBe('run-1')
|
||||
expect(store.historyTasks).toHaveLength(1)
|
||||
expect(store.historyTasks[0].jobId).toBe('hist-1')
|
||||
expect(store.isLoading).toBe(false)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@@ -525,68 +525,74 @@ export const useQueueStore = defineStore('queue', () => {
|
||||
dirty = false
|
||||
isLoading.value = true
|
||||
try {
|
||||
const [queue, history] = await Promise.all([
|
||||
api.getQueue(),
|
||||
const [queueResult, historyResult] = await Promise.allSettled([
|
||||
api.getQueue({ throwOnError: true }),
|
||||
api.getHistory(maxHistoryItems.value)
|
||||
])
|
||||
|
||||
// API returns pre-sorted data (sort_by=create_time&order=desc)
|
||||
runningTasks.value = queue.Running.map((job) => new TaskItemImpl(job))
|
||||
pendingTasks.value = queue.Pending.map((job) => new TaskItemImpl(job))
|
||||
if (queueResult.status === 'fulfilled') {
|
||||
const queue = queueResult.value
|
||||
// API returns pre-sorted data (sort_by=create_time&order=desc)
|
||||
runningTasks.value = queue.Running.map((job) => new TaskItemImpl(job))
|
||||
pendingTasks.value = queue.Pending.map((job) => new TaskItemImpl(job))
|
||||
|
||||
const currentHistory = toValue(historyTasks)
|
||||
const appearedTasks = [...pendingTasks.value, ...runningTasks.value]
|
||||
const executionStore = useExecutionStore()
|
||||
appearedTasks.forEach((task) => {
|
||||
const jobIdString = String(task.jobId)
|
||||
const workflowId = task.workflowId
|
||||
if (workflowId && jobIdString) {
|
||||
executionStore.registerJobWorkflowIdMapping(jobIdString, workflowId)
|
||||
}
|
||||
})
|
||||
|
||||
const appearedTasks = [...pendingTasks.value, ...runningTasks.value]
|
||||
const executionStore = useExecutionStore()
|
||||
appearedTasks.forEach((task) => {
|
||||
const jobIdString = String(task.jobId)
|
||||
const workflowId = task.workflowId
|
||||
if (workflowId && jobIdString) {
|
||||
executionStore.registerJobWorkflowIdMapping(jobIdString, workflowId)
|
||||
}
|
||||
})
|
||||
|
||||
// Only reconcile when the queue fetch returned data. api.getQueue()
|
||||
// returns empty Running/Pending on transient errors, which would
|
||||
// incorrectly clear all initializing prompts.
|
||||
const queueHasData = queue.Running.length > 0 || queue.Pending.length > 0
|
||||
if (queueHasData) {
|
||||
const activeJobIds = new Set([
|
||||
...queue.Running.map((j) => j.id),
|
||||
...queue.Pending.map((j) => j.id)
|
||||
])
|
||||
executionStore.reconcileInitializingJobs(activeJobIds)
|
||||
} else {
|
||||
console.error('Failed to fetch queue:', queueResult.reason)
|
||||
}
|
||||
|
||||
// Sort by create_time descending and limit to maxItems
|
||||
const sortedHistory = [...history]
|
||||
.sort((a, b) => b.create_time - a.create_time)
|
||||
.slice(0, toValue(maxHistoryItems))
|
||||
if (historyResult.status === 'fulfilled') {
|
||||
const history = historyResult.value
|
||||
const currentHistory = toValue(historyTasks)
|
||||
|
||||
// Reuse existing TaskItemImpl instances or create new
|
||||
// Must recreate if outputs_count changed (e.g., API started returning it)
|
||||
const existingByJobId = new Map(
|
||||
currentHistory.map((impl) => [impl.jobId, impl])
|
||||
)
|
||||
// Sort by create_time descending and limit to maxItems
|
||||
const sortedHistory = [...history]
|
||||
.sort((a, b) => b.create_time - a.create_time)
|
||||
.slice(0, toValue(maxHistoryItems))
|
||||
|
||||
const nextHistoryTasks = sortedHistory.map((job) => {
|
||||
const existing = existingByJobId.get(job.id)
|
||||
if (!existing) return new TaskItemImpl(job)
|
||||
// Recreate if outputs_count changed to ensure lazy loading works
|
||||
if (existing.outputsCount !== (job.outputs_count ?? undefined)) {
|
||||
return new TaskItemImpl(job)
|
||||
// Reuse existing TaskItemImpl instances or create new
|
||||
// Must recreate if outputs_count changed (e.g., API started returning it)
|
||||
const existingByJobId = new Map(
|
||||
currentHistory.map((impl) => [impl.jobId, impl])
|
||||
)
|
||||
|
||||
const nextHistoryTasks = sortedHistory.map((job) => {
|
||||
const existing = existingByJobId.get(job.id)
|
||||
if (!existing) return new TaskItemImpl(job)
|
||||
// Recreate if outputs_count changed to ensure lazy loading works
|
||||
if (existing.outputsCount !== (job.outputs_count ?? undefined)) {
|
||||
return new TaskItemImpl(job)
|
||||
}
|
||||
return existing
|
||||
})
|
||||
|
||||
const isHistoryUnchanged =
|
||||
nextHistoryTasks.length === currentHistory.length &&
|
||||
nextHistoryTasks.every(
|
||||
(task, index) => task === currentHistory[index]
|
||||
)
|
||||
|
||||
if (!isHistoryUnchanged) {
|
||||
historyTasks.value = nextHistoryTasks
|
||||
}
|
||||
return existing
|
||||
})
|
||||
|
||||
const isHistoryUnchanged =
|
||||
nextHistoryTasks.length === currentHistory.length &&
|
||||
nextHistoryTasks.every((task, index) => task === currentHistory[index])
|
||||
|
||||
if (!isHistoryUnchanged) {
|
||||
historyTasks.value = nextHistoryTasks
|
||||
hasFetchedHistorySnapshot.value = true
|
||||
} else {
|
||||
console.error('Failed to fetch history:', historyResult.reason)
|
||||
}
|
||||
hasFetchedHistorySnapshot.value = true
|
||||
} finally {
|
||||
isLoading.value = false
|
||||
inFlight = false
|
||||
|
||||
214
src/views/GraphView.test.ts
Normal file
214
src/views/GraphView.test.ts
Normal file
@@ -0,0 +1,214 @@
|
||||
import { createTestingPinia } from '@pinia/testing'
|
||||
import { render } from '@testing-library/vue'
|
||||
import { setActivePinia } from 'pinia'
|
||||
import { beforeEach, describe, expect, it, vi } from 'vitest'
|
||||
import { ref } from 'vue'
|
||||
|
||||
import type * as VueUseCore from '@vueuse/core'
|
||||
import { useReconnectQueueRefresh } from '@/composables/useReconnectQueueRefresh'
|
||||
import { useReconnectingNotification } from '@/composables/useReconnectingNotification'
|
||||
import type * as DistTypes from '@/platform/distribution/types'
|
||||
import type * as I18nModule from '@/i18n'
|
||||
|
||||
const apiMock = vi.hoisted(() => new EventTarget())
|
||||
|
||||
vi.mock('@/scripts/api', () => ({ api: apiMock }))
|
||||
|
||||
vi.mock('@/scripts/app', () => ({
|
||||
app: {
|
||||
rootGraph: { getNodeById: vi.fn(), nodes: [] },
|
||||
ui: {
|
||||
menuContainer: { style: { setProperty: vi.fn() } },
|
||||
restoreMenuPosition: vi.fn()
|
||||
}
|
||||
}
|
||||
}))
|
||||
|
||||
vi.mock('@/composables/useReconnectQueueRefresh', () => {
|
||||
const refreshOnReconnect = vi.fn(async () => {})
|
||||
return { useReconnectQueueRefresh: () => refreshOnReconnect }
|
||||
})
|
||||
|
||||
vi.mock('@/composables/useReconnectingNotification', () => {
|
||||
const onReconnected = vi.fn()
|
||||
const onReconnecting = vi.fn()
|
||||
return {
|
||||
useReconnectingNotification: () => ({ onReconnected, onReconnecting })
|
||||
}
|
||||
})
|
||||
|
||||
vi.mock('@vueuse/core', async (importOriginal) => {
|
||||
const actual = await importOriginal<typeof VueUseCore>()
|
||||
return { ...actual, useIntervalFn: vi.fn(() => ({ pause: vi.fn() })) }
|
||||
})
|
||||
|
||||
vi.mock('@/base/common/async', () => ({ runWhenGlobalIdle: vi.fn() }))
|
||||
vi.mock('@/composables/useBrowserTabTitle', () => ({
|
||||
useBrowserTabTitle: vi.fn()
|
||||
}))
|
||||
vi.mock('@/composables/useCoreCommands', () => ({ useCoreCommands: () => [] }))
|
||||
vi.mock('@/platform/remote/comfyui/useQueuePolling', () => ({
|
||||
useQueuePolling: vi.fn()
|
||||
}))
|
||||
vi.mock('@/composables/useErrorHandling', () => ({
|
||||
useErrorHandling: () => ({
|
||||
wrapWithErrorHandling: (f: unknown) => f,
|
||||
wrapWithErrorHandlingAsync: (f: unknown) => f
|
||||
})
|
||||
}))
|
||||
vi.mock('@/composables/useProgressFavicon', () => ({
|
||||
useProgressFavicon: vi.fn()
|
||||
}))
|
||||
vi.mock('@/i18n', async (importOriginal) => {
|
||||
const actual = await importOriginal<typeof I18nModule>()
|
||||
return { ...actual, loadLocale: vi.fn().mockResolvedValue(undefined) }
|
||||
})
|
||||
vi.mock('@/platform/distribution/types', async (importOriginal) => {
|
||||
const actual = await importOriginal<typeof DistTypes>()
|
||||
return { ...actual, isCloud: false, isDesktop: false }
|
||||
})
|
||||
vi.mock('@/platform/settings/settingStore', () => ({
|
||||
useSettingStore: () => ({ get: vi.fn(() => undefined), set: vi.fn() })
|
||||
}))
|
||||
vi.mock('@/platform/telemetry', () => ({ useTelemetry: () => undefined }))
|
||||
vi.mock('@/platform/updates/common/useFrontendVersionMismatchWarning', () => ({
|
||||
useFrontendVersionMismatchWarning: vi.fn()
|
||||
}))
|
||||
vi.mock('@/platform/updates/common/versionCompatibilityStore', () => ({
|
||||
useVersionCompatibilityStore: () => ({
|
||||
initialize: vi.fn().mockResolvedValue(undefined)
|
||||
})
|
||||
}))
|
||||
vi.mock('@/renderer/core/canvas/canvasStore', async () => {
|
||||
const { defineStore } = await import('pinia')
|
||||
return {
|
||||
useCanvasStore: defineStore('canvas-test-stub', () => ({
|
||||
linearMode: ref(false)
|
||||
}))
|
||||
}
|
||||
})
|
||||
vi.mock('@/services/autoQueueService', () => ({
|
||||
setupAutoQueueHandler: vi.fn()
|
||||
}))
|
||||
vi.mock('@/platform/keybindings/keybindingService', () => ({
|
||||
useKeybindingService: () => ({
|
||||
registerCoreKeybindings: vi.fn(),
|
||||
keybindHandler: vi.fn()
|
||||
})
|
||||
}))
|
||||
vi.mock('@/composables/useAppMode', () => ({
|
||||
useAppMode: () => ({ isBuilderMode: ref(false) })
|
||||
}))
|
||||
vi.mock('@/stores/assetsStore', () => ({
|
||||
useAssetsStore: () => ({ updateHistory: vi.fn() })
|
||||
}))
|
||||
vi.mock('@/stores/commandStore', () => ({
|
||||
useCommandStore: () => ({ registerCommands: vi.fn() })
|
||||
}))
|
||||
vi.mock('@/stores/executionStore', () => ({
|
||||
useExecutionStore: () => ({
|
||||
bindExecutionEvents: vi.fn(),
|
||||
unbindExecutionEvents: vi.fn(),
|
||||
activeJobId: null,
|
||||
clearActiveJobIfStale: vi.fn()
|
||||
})
|
||||
}))
|
||||
vi.mock('@/stores/authStore', () => ({
|
||||
useAuthStore: () => ({ isAuthenticated: false })
|
||||
}))
|
||||
vi.mock('@/stores/menuItemStore', () => ({
|
||||
useMenuItemStore: () => ({ registerCoreMenuCommands: vi.fn() })
|
||||
}))
|
||||
vi.mock('@/stores/modelStore', () => ({ useModelStore: () => ({}) }))
|
||||
vi.mock('@/stores/nodeDefStore', () => ({
|
||||
useNodeDefStore: () => ({}),
|
||||
useNodeFrequencyStore: () => ({})
|
||||
}))
|
||||
vi.mock('@/stores/queueStore', () => ({
|
||||
useQueueStore: () => ({
|
||||
update: vi.fn(),
|
||||
runningTasks: [],
|
||||
pendingTasks: [],
|
||||
tasks: [],
|
||||
maxHistoryItems: 64
|
||||
}),
|
||||
useQueuePendingTaskCountStore: () => ({ update: vi.fn() })
|
||||
}))
|
||||
vi.mock('@/stores/serverConfigStore', () => ({
|
||||
useServerConfigStore: () => ({})
|
||||
}))
|
||||
vi.mock('@/stores/workspace/bottomPanelStore', () => ({
|
||||
useBottomPanelStore: () => ({
|
||||
registerCoreBottomPanelTabs: vi.fn().mockResolvedValue(undefined)
|
||||
})
|
||||
}))
|
||||
vi.mock('@/stores/workspace/colorPaletteStore', () => ({
|
||||
useColorPaletteStore: () => ({
|
||||
completedActivePalette: { light_theme: true, colors: { comfy_base: {} } }
|
||||
})
|
||||
}))
|
||||
vi.mock('@/stores/workspace/sidebarTabStore', () => ({
|
||||
useSidebarTabStore: () => ({
|
||||
registerCoreSidebarTabs: vi.fn(),
|
||||
activeSidebarTabId: null
|
||||
})
|
||||
}))
|
||||
vi.mock('@/utils/envUtil', () => ({
|
||||
electronAPI: () => ({
|
||||
changeTheme: vi.fn(),
|
||||
Events: { incrementUserProperty: vi.fn(), trackEvent: vi.fn() }
|
||||
})
|
||||
}))
|
||||
|
||||
// Module-mock heavy child components so we don't pay their import cost.
|
||||
const stubModule = { default: { template: '<div />' } }
|
||||
vi.mock('@/components/graph/GraphCanvas.vue', () => stubModule)
|
||||
vi.mock('@/views/LinearView.vue', () => stubModule)
|
||||
vi.mock('@/components/builder/BuilderToolbar.vue', () => stubModule)
|
||||
vi.mock('@/components/builder/BuilderMenu.vue', () => stubModule)
|
||||
vi.mock('@/components/builder/BuilderFooterToolbar.vue', () => stubModule)
|
||||
vi.mock(
|
||||
'@/workbench/extensions/manager/components/ManagerProgressToast.vue',
|
||||
() => stubModule
|
||||
)
|
||||
vi.mock(
|
||||
'@/platform/cloud/notification/components/DesktopCloudNotificationController.vue',
|
||||
() => stubModule
|
||||
)
|
||||
vi.mock(
|
||||
'@/platform/assets/components/ModelImportProgressDialog.vue',
|
||||
() => stubModule
|
||||
)
|
||||
vi.mock(
|
||||
'@/platform/assets/components/AssetExportProgressDialog.vue',
|
||||
() => stubModule
|
||||
)
|
||||
vi.mock(
|
||||
'@/platform/workspace/components/toasts/InviteAcceptedToast.vue',
|
||||
() => stubModule
|
||||
)
|
||||
vi.mock('@/components/toast/GlobalToast.vue', () => stubModule)
|
||||
vi.mock('@/components/toast/RerouteMigrationToast.vue', () => stubModule)
|
||||
vi.mock('@/components/MenuHamburger.vue', () => stubModule)
|
||||
vi.mock('@/components/dialog/UnloadWindowConfirmDialog.vue', () => stubModule)
|
||||
|
||||
describe('GraphView - reconnect wiring', () => {
|
||||
beforeEach(() => {
|
||||
vi.restoreAllMocks()
|
||||
setActivePinia(createTestingPinia({ stubActions: false }))
|
||||
})
|
||||
|
||||
it('wires the reconnected event to the toast and queue refresh', async () => {
|
||||
const GraphView = (await import('./GraphView.vue')).default
|
||||
render(GraphView)
|
||||
|
||||
apiMock.dispatchEvent(new Event('reconnected'))
|
||||
|
||||
const { onReconnected } = useReconnectingNotification()
|
||||
const refreshOnReconnect = useReconnectQueueRefresh()
|
||||
await vi.waitFor(() => {
|
||||
expect(onReconnected).toHaveBeenCalledTimes(1)
|
||||
expect(refreshOnReconnect).toHaveBeenCalledTimes(1)
|
||||
})
|
||||
})
|
||||
})
|
||||
@@ -56,6 +56,7 @@ import { useBrowserTabTitle } from '@/composables/useBrowserTabTitle'
|
||||
import { useCoreCommands } from '@/composables/useCoreCommands'
|
||||
import { useQueuePolling } from '@/platform/remote/comfyui/useQueuePolling'
|
||||
import { useErrorHandling } from '@/composables/useErrorHandling'
|
||||
import { useReconnectQueueRefresh } from '@/composables/useReconnectQueueRefresh'
|
||||
import { useReconnectingNotification } from '@/composables/useReconnectingNotification'
|
||||
import { useProgressFavicon } from '@/composables/useProgressFavicon'
|
||||
import { SERVER_CONFIG_ITEMS } from '@/constants/serverConfig'
|
||||
@@ -248,11 +249,17 @@ const onExecutionSuccess = async () => {
|
||||
}
|
||||
|
||||
const { onReconnecting, onReconnected } = useReconnectingNotification()
|
||||
const refreshOnReconnect = useReconnectQueueRefresh()
|
||||
|
||||
const handleReconnected = async () => {
|
||||
onReconnected()
|
||||
await refreshOnReconnect()
|
||||
}
|
||||
|
||||
useEventListener(api, 'status', onStatus)
|
||||
useEventListener(api, 'execution_success', onExecutionSuccess)
|
||||
useEventListener(api, 'reconnecting', onReconnecting)
|
||||
useEventListener(api, 'reconnected', onReconnected)
|
||||
useEventListener(api, 'reconnected', handleReconnected)
|
||||
|
||||
onMounted(() => {
|
||||
executionStore.bindExecutionEvents()
|
||||
|
||||
Reference in New Issue
Block a user