From 15b8771cc2957d72f6ee7ba216d2eae8bc99afb7 Mon Sep 17 00:00:00 2001 From: pythongosssss <125205205+pythongosssss@users.noreply.github.com> Date: Mon, 11 May 2026 10:28:23 +0100 Subject: [PATCH] fix: clear active job on reconnect if no longer in queue (#12067) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary When a socket disconnects messages can be missed and lead to a stale UI state, this updates the state on reconnect and clears the active job if it is no longer running ## Changes - **What**: - add call to update queue on reconnect - clear active job if job not in queue response - tests ┆Issue is synchronized with this [Notion page](https://www.notion.so/PR-12067-fix-clear-active-job-on-reconnect-if-no-longer-in-queue-3596d73d365081f79d42d73966420c50) by [Unito](https://www.unito.io) --- .../fixtures/helpers/ExecutionHelper.ts | 16 +- .../tests/wsReconnectStaleJob.spec.ts | 211 +++++++++++++++++ .../useReconnectQueueRefresh.test.ts | 88 +++++++ src/composables/useReconnectQueueRefresh.ts | 25 ++ src/scripts/api.ts | 3 +- src/stores/executionStore.test.ts | 51 +++++ src/stores/executionStore.ts | 11 + src/stores/queueStore.test.ts | 94 +++++++- src/stores/queueStore.ts | 98 ++++---- src/views/GraphView.test.ts | 214 ++++++++++++++++++ src/views/GraphView.vue | 9 +- 11 files changed, 765 insertions(+), 55 deletions(-) create mode 100644 browser_tests/tests/wsReconnectStaleJob.spec.ts create mode 100644 src/composables/useReconnectQueueRefresh.test.ts create mode 100644 src/composables/useReconnectQueueRefresh.ts create mode 100644 src/views/GraphView.test.ts diff --git a/browser_tests/fixtures/helpers/ExecutionHelper.ts b/browser_tests/fixtures/helpers/ExecutionHelper.ts index 3483a13667..e13f8c0db0 100644 --- a/browser_tests/fixtures/helpers/ExecutionHelper.ts +++ b/browser_tests/fixtures/helpers/ExecutionHelper.ts @@ -1,6 +1,10 @@ import type { WebSocketRoute } from '@playwright/test' -import type { NodeError, PromptResponse } from '@/schemas/apiSchema' +import type { + NodeError, + NodeProgressState, + PromptResponse +} from '@/schemas/apiSchema' import type { RawJobListItem } from '@/platform/remote/comfyui/jobs/jobTypes' import type { ComfyPage } from '@e2e/fixtures/ComfyPage' import { createMockJob } from '@e2e/fixtures/helpers/AssetsHelper' @@ -230,6 +234,16 @@ export class ExecutionHelper { ) } + /** Send `progress_state` WS event with per-node execution state. */ + progressState(jobId: string, nodes: Record): void { + this.requireWs().send( + JSON.stringify({ + type: 'progress_state', + data: { prompt_id: jobId, nodes } + }) + ) + } + /** * Complete a job by adding it to mock history, sending execution_success, * and triggering a history refresh via a status event. diff --git a/browser_tests/tests/wsReconnectStaleJob.spec.ts b/browser_tests/tests/wsReconnectStaleJob.spec.ts new file mode 100644 index 0000000000..d67d404c7d --- /dev/null +++ b/browser_tests/tests/wsReconnectStaleJob.spec.ts @@ -0,0 +1,211 @@ +import type { WebSocketRoute } from '@playwright/test' +import { mergeTests } from '@playwright/test' +import type { z } from 'zod' + +import { + comfyExpect as expect, + comfyPageFixture +} from '@e2e/fixtures/ComfyPage' +import type { ComfyPage } from '@e2e/fixtures/ComfyPage' +import { ExecutionHelper } from '@e2e/fixtures/helpers/ExecutionHelper' +import { webSocketFixture } from '@e2e/fixtures/ws' +import type { + RawJobListItem, + zJobsListResponse +} from '@/platform/remote/comfyui/jobs/jobTypes' + +type JobsListResponse = z.infer + +const test = mergeTests(comfyPageFixture, webSocketFixture) + +const KSAMPLER_NODE = '3' +const EXECUTING_CLASS = /outline-node-stroke-executing/ + +const QUEUE_ROUTE = /\/api\/jobs\?[^/]*status=in_progress,pending/ +const HISTORY_ROUTE = /\/api\/jobs\?[^/]*status=completed/ + +function jobsResponse(jobs: RawJobListItem[]): JobsListResponse { + return { + jobs, + pagination: { offset: 0, limit: 200, total: jobs.length, has_more: false } + } +} + +async function mockJobsRoute( + comfyPage: ComfyPage, + pattern: RegExp, + body: string, + status: number = 200 +): Promise<() => number> { + let count = 0 + await comfyPage.page.route(pattern, async (route) => { + count += 1 + await route.fulfill({ + status, + contentType: 'application/json', + body + }) + }) + return () => count +} + +const emptyJobsBody = JSON.stringify(jobsResponse([])) + +type Scenario = { + name: string + /** Built per-test so it can incorporate the runtime-assigned jobId. */ + queueBody: (jobId: string) => string + /** Whether the active job state should still be reflected after reconnect. */ + expectsActiveAfter: boolean +} + +const scenarios: Scenario[] = [ + { + name: 'clears stale active job when queue is empty after reconnect', + queueBody: () => emptyJobsBody, + expectsActiveAfter: false + }, + { + name: 'preserves active job when the job is still in the queue', + queueBody: (jobId) => + JSON.stringify( + jobsResponse([ + { id: jobId, status: 'in_progress', create_time: Date.now() } + ]) + ), + expectsActiveAfter: true + } +] + +/** + * Stub the queue/history endpoints per `scenario`, close the WS, and wait + * for the auto-reconnect to issue a fresh queue fetch. + */ +async function triggerReconnect( + comfyPage: ComfyPage, + ws: WebSocketRoute, + scenario: Scenario, + jobId: string +): Promise { + await mockJobsRoute(comfyPage, HISTORY_ROUTE, emptyJobsBody) + const queueFetches = await mockJobsRoute( + comfyPage, + QUEUE_ROUTE, + scenario.queueBody(jobId) + ) + const fetchesBeforeClose = queueFetches() + await ws.close() + await expect.poll(queueFetches).toBeGreaterThan(fetchesBeforeClose) +} + +test.describe('WebSocket reconnect with stale job', { tag: '@ui' }, () => { + test.describe('app mode skeleton', () => { + test.beforeEach(async ({ comfyPage }) => { + await comfyPage.appMode.enterAppModeWithInputs([[KSAMPLER_NODE, 'seed']]) + await expect(comfyPage.appMode.linearWidgets).toBeVisible() + }) + + for (const scenario of scenarios) { + test(scenario.name, async ({ comfyPage, getWebSocket }) => { + const ws = await getWebSocket() + const exec = new ExecutionHelper(comfyPage, ws) + + const jobId = await exec.run() + exec.executionStart(jobId) + + // Skeleton visibility is the deterministic sync point: it appears + // once both `storeJob` (HTTP) and `executionStart` (WS) have been + // processed, regardless of arrival order. + const firstSkeleton = comfyPage.appMode.outputHistory.skeletons.first() + await expect(firstSkeleton).toBeVisible() + + await triggerReconnect(comfyPage, ws, scenario, jobId) + + if (scenario.expectsActiveAfter) { + await expect(firstSkeleton).toBeVisible() + } else { + await expect(comfyPage.appMode.outputHistory.skeletons).toHaveCount(0) + } + }) + } + + test('preserves active job when the queue endpoint fails on reconnect', async ({ + comfyPage, + getWebSocket + }) => { + const ws = await getWebSocket() + const exec = new ExecutionHelper(comfyPage, ws) + + const jobId = await exec.run() + exec.executionStart(jobId) + + const firstSkeleton = comfyPage.appMode.outputHistory.skeletons.first() + await expect(firstSkeleton).toBeVisible() + + await mockJobsRoute(comfyPage, HISTORY_ROUTE, emptyJobsBody) + + // Prime queueStore.runningTasks with the active job — a WS status + // event drives GraphView.onStatus -> queueStore.update(). + const primer = await mockJobsRoute( + comfyPage, + QUEUE_ROUTE, + JSON.stringify( + jobsResponse([ + { id: jobId, status: 'in_progress', create_time: Date.now() } + ]) + ) + ) + exec.status(1) + await expect.poll(primer).toBeGreaterThanOrEqual(1) + + // Swap to a failing handler so the reconnect-driven fetch 500s. + // The fix should preserve runningTasks from the priming call rather + // than overwriting it with empty/error state. + await comfyPage.page.unroute(QUEUE_ROUTE) + const failed = await mockJobsRoute(comfyPage, QUEUE_ROUTE, '{}', 500) + + const before = failed() + await ws.close() + await expect.poll(failed).toBeGreaterThan(before) + + await expect(firstSkeleton).toBeVisible() + }) + }) + + test.describe('vue node executing class', { tag: '@vue-nodes' }, () => { + for (const scenario of scenarios) { + test(scenario.name, async ({ comfyPage, getWebSocket }) => { + const ws = await getWebSocket() + const exec = new ExecutionHelper(comfyPage, ws) + + // The executing outline lives on the outer `[data-node-id]` + // container, not the inner wrapper. + const ksamplerNode = comfyPage.vueNodes.getNodeLocator(KSAMPLER_NODE) + await expect(ksamplerNode).toBeVisible() + + const jobId = await exec.run() + exec.executionStart(jobId) + exec.progressState(jobId, { + [KSAMPLER_NODE]: { + value: 0, + max: 1, + state: 'running', + node_id: KSAMPLER_NODE, + display_node_id: KSAMPLER_NODE, + prompt_id: jobId + } + }) + + await expect(ksamplerNode).toHaveClass(EXECUTING_CLASS) + + await triggerReconnect(comfyPage, ws, scenario, jobId) + + if (scenario.expectsActiveAfter) { + await expect(ksamplerNode).toHaveClass(EXECUTING_CLASS) + } else { + await expect(ksamplerNode).not.toHaveClass(EXECUTING_CLASS) + } + }) + } + }) +}) diff --git a/src/composables/useReconnectQueueRefresh.test.ts b/src/composables/useReconnectQueueRefresh.test.ts new file mode 100644 index 0000000000..dc21ff7b47 --- /dev/null +++ b/src/composables/useReconnectQueueRefresh.test.ts @@ -0,0 +1,88 @@ +import { createTestingPinia } from '@pinia/testing' +import { setActivePinia } from 'pinia' +import { beforeEach, describe, expect, it, vi } from 'vitest' + +import { useReconnectQueueRefresh } from '@/composables/useReconnectQueueRefresh' +import type { JobListItem } from '@/platform/remote/comfyui/jobs/jobTypes' +import { api } from '@/scripts/api' +import { useExecutionStore } from '@/stores/executionStore' + +function makeJob(id: string, status: JobListItem['status']): JobListItem { + return { + id, + status, + create_time: 0, + update_time: 0, + last_state_update: 0, + priority: 0 + } +} + +vi.mock('@/scripts/api', () => ({ + api: { + getQueue: vi.fn(), + getHistory: vi.fn(), + addEventListener: vi.fn(), + removeEventListener: vi.fn(), + apiURL: vi.fn((p: string) => `/api${p}`) + } +})) + +describe('useReconnectQueueRefresh', () => { + beforeEach(() => { + setActivePinia(createTestingPinia({ stubActions: false })) + vi.restoreAllMocks() + vi.mocked(api.getQueue).mockResolvedValue({ Running: [], Pending: [] }) + vi.mocked(api.getHistory).mockResolvedValue([]) + }) + + it('forwards running+pending job ids to clearActiveJobIfStale', async () => { + vi.mocked(api.getQueue).mockResolvedValue({ + Running: [makeJob('run-1', 'in_progress')], + Pending: [makeJob('pend-1', 'pending'), makeJob('pend-2', 'pending')] + }) + const executionStore = useExecutionStore() + const clearSpy = vi + .spyOn(executionStore, 'clearActiveJobIfStale') + .mockImplementation(() => {}) + + const refresh = useReconnectQueueRefresh() + await refresh() + + expect(clearSpy).toHaveBeenCalledTimes(1) + expect(clearSpy).toHaveBeenCalledWith( + new Set(['run-1', 'pend-1', 'pend-2']) + ) + }) + + it('passes an empty set when the queue is genuinely empty', async () => { + const executionStore = useExecutionStore() + const clearSpy = vi + .spyOn(executionStore, 'clearActiveJobIfStale') + .mockImplementation(() => {}) + + const refresh = useReconnectQueueRefresh() + await refresh() + + expect(clearSpy).toHaveBeenCalledWith(new Set()) + }) + + it('reuses the prior queue snapshot when the fetch fails, so a still-running job is not falsely cleared', async () => { + vi.mocked(api.getQueue) + .mockResolvedValueOnce({ + Running: [makeJob('run-1', 'in_progress')], + Pending: [] + }) + .mockRejectedValueOnce(new Error('network down')) + const executionStore = useExecutionStore() + const clearSpy = vi + .spyOn(executionStore, 'clearActiveJobIfStale') + .mockImplementation(() => {}) + + const refresh = useReconnectQueueRefresh() + await refresh() // primes the store with run-1 + await refresh() // network failure here — store must not go empty + + expect(clearSpy).toHaveBeenLastCalledWith(new Set(['run-1'])) + }) +}) diff --git a/src/composables/useReconnectQueueRefresh.ts b/src/composables/useReconnectQueueRefresh.ts new file mode 100644 index 0000000000..267ade9850 --- /dev/null +++ b/src/composables/useReconnectQueueRefresh.ts @@ -0,0 +1,25 @@ +import { useExecutionStore } from '@/stores/executionStore' +import { useQueueStore } from '@/stores/queueStore' + +/** + * After a WebSocket reconnect, refresh the queue from the server and clear + * any active job that finished during the disconnect window. Returns the + * handler so the caller can wire it to the `reconnected` api event. + * + * `update()` preserves the previous queue snapshot when the fetch fails, so + * if the network is still flaky we reconcile against the last known good + * state rather than an empty (and falsely "stale") set. + */ +export function useReconnectQueueRefresh() { + const queueStore = useQueueStore() + const executionStore = useExecutionStore() + + return async function refreshOnReconnect() { + await queueStore.update() + const activeJobIds = new Set([ + ...queueStore.runningTasks.map((t) => t.jobId), + ...queueStore.pendingTasks.map((t) => t.jobId) + ]) + executionStore.clearActiveJobIfStale(activeJobIds) + } +} diff --git a/src/scripts/api.ts b/src/scripts/api.ts index 23f074b99c..1acc69717d 100644 --- a/src/scripts/api.ts +++ b/src/scripts/api.ts @@ -1005,13 +1005,14 @@ export class ComfyApi extends EventTarget { * Gets the current state of the queue * @returns The currently running and queued items */ - async getQueue(): Promise<{ + async getQueue(options?: { throwOnError?: boolean }): Promise<{ Running: JobListItem[] Pending: JobListItem[] }> { try { return await fetchQueue(this.fetchApi.bind(this)) } catch (error) { + if (options?.throwOnError) throw error console.error('Failed to fetch queue:', error) return { Running: [], Pending: [] } } diff --git a/src/stores/executionStore.test.ts b/src/stores/executionStore.test.ts index dc143d844f..392883f7a7 100644 --- a/src/stores/executionStore.test.ts +++ b/src/stores/executionStore.test.ts @@ -440,6 +440,57 @@ describe('useExecutionStore - reconcileInitializingJobs', () => { }) }) +describe('useExecutionStore - clearActiveJobIfStale', () => { + let store: ReturnType + + beforeEach(() => { + vi.clearAllMocks() + setActivePinia(createTestingPinia({ stubActions: false })) + store = useExecutionStore() + }) + + it('clears the active job and progress state when not in the active set', () => { + store.activeJobId = 'job-1' + store.queuedJobs = { 'job-1': { nodes: { 'node-1': false } } } + store.nodeProgressStates = { + 'node-1': { + value: 5, + max: 10, + state: 'running', + node_id: 'node-1', + display_node_id: 'node-1', + prompt_id: 'job-1' + } + } + + store.clearActiveJobIfStale(new Set(['job-2'])) + + expect(store.activeJobId).toBeNull() + expect(store.queuedJobs['job-1']).toBeUndefined() + expect(store.nodeProgressStates).toEqual({}) + }) + + it('preserves the active job when present in the active set', () => { + store.activeJobId = 'job-1' + store.queuedJobs = { 'job-1': { nodes: {} } } + + store.clearActiveJobIfStale(new Set(['job-1', 'job-2'])) + + expect(store.activeJobId).toBe('job-1') + expect(store.queuedJobs['job-1']).toBeDefined() + }) + + it('is a no-op when there is no active job', () => { + store.activeJobId = null + store.queuedJobs = { other: { nodes: {} } } + + store.clearActiveJobIfStale(new Set()) + + expect(store.activeJobId).toBeNull() + expect(store.queuedJobs['other']).toBeDefined() + }) +}) + describe('useExecutionStore - progress_text startup guard', () => { let store: ReturnType diff --git a/src/stores/executionStore.ts b/src/stores/executionStore.ts index 1df4702858..d438dfd220 100644 --- a/src/stores/executionStore.ts +++ b/src/stores/executionStore.ts @@ -485,6 +485,16 @@ export const useExecutionStore = defineStore('execution', () => { clearInitializationByJobIds(orphaned) } + /** + * Clears the active job if the server's queue snapshot doesn't list it. + * Used after WS reconnect to recover from stale state when a job finished + * during the disconnect window. + */ + function clearActiveJobIfStale(activeJobIds: Set) { + const id = activeJobId.value + if (id && !activeJobIds.has(id)) resetExecutionState(id) + } + function isJobInitializing(jobId: JobId | number | undefined): boolean { if (!jobId) return false return initializingJobIds.value.has(String(jobId)) @@ -643,6 +653,7 @@ export const useExecutionStore = defineStore('execution', () => { clearInitializationByJobId, clearInitializationByJobIds, reconcileInitializingJobs, + clearActiveJobIfStale, bindExecutionEvents, unbindExecutionEvents, storeJob, diff --git a/src/stores/queueStore.test.ts b/src/stores/queueStore.test.ts index 9b5a62d2d6..2f5a72611a 100644 --- a/src/stores/queueStore.test.ts +++ b/src/stores/queueStore.test.ts @@ -5,6 +5,7 @@ import { beforeEach, describe, expect, it, vi } from 'vitest' import type { JobListItem } from '@/platform/remote/comfyui/jobs/jobTypes' import type { TaskOutput } from '@/schemas/apiSchema' import { api } from '@/scripts/api' +import { useExecutionStore } from '@/stores/executionStore' import { TaskItemImpl, useQueueStore } from '@/stores/queueStore' // Fixture factory for JobListItem @@ -340,11 +341,11 @@ describe('useQueueStore', () => { expect(store.isLoading).toBe(false) }) - it('should clear loading state even if API fails', async () => { + it('should clear loading state even if the queue fetch fails', async () => { mockGetQueue.mockRejectedValue(new Error('API error')) mockGetHistory.mockResolvedValue([]) - await expect(store.update()).rejects.toThrow('API error') + await store.update() expect(store.isLoading).toBe(false) }) }) @@ -1018,10 +1019,9 @@ describe('useQueueStore', () => { const firstUpdate = store.update() void store.update() // coalesces, sets dirty - // First call rejects — but dirty flag triggers re-fetch - await expect(firstUpdate).rejects.toThrow('network error') - - // Re-fetch was triggered + // First call resolves (allSettled absorbs the failure) but the dirty + // flag still triggers a re-fetch when the in-flight request finishes. + await firstUpdate expect(mockGetQueue).toHaveBeenCalledTimes(2) resolveSecond({ Running: [], Pending: [createPendingJob(2, 'new-job')] }) @@ -1032,4 +1032,86 @@ describe('useQueueStore', () => { expect(store.isLoading).toBe(false) }) }) + + describe('update() partial failures', () => { + it('reconciles when the queue fetch succeeds, even with an empty snapshot', async () => { + mockGetQueue.mockResolvedValue({ Running: [], Pending: [] }) + mockGetHistory.mockResolvedValue([]) + const executionStore = useExecutionStore() + const reconcileSpy = vi.spyOn(executionStore, 'reconcileInitializingJobs') + + await store.update() + + expect(reconcileSpy).toHaveBeenCalledWith(new Set()) + }) + + it('preserves prior queue state and skips reconcile when the queue fetch fails', async () => { + mockGetQueue + .mockResolvedValueOnce({ + Running: [createRunningJob(0, 'run-1')], + Pending: [] + }) + .mockRejectedValueOnce(new Error('network down')) + mockGetHistory.mockResolvedValue([]) + const executionStore = useExecutionStore() + const reconcileSpy = vi.spyOn(executionStore, 'reconcileInitializingJobs') + + await store.update() + await store.update() + + // First update reconciles with run-1; second update's queue fetch + // rejects, so reconcile must not be called again. + expect(reconcileSpy).toHaveBeenCalledTimes(1) + expect(reconcileSpy).toHaveBeenLastCalledWith(new Set(['run-1'])) + expect(store.runningTasks).toHaveLength(1) + expect(store.runningTasks[0].jobId).toBe('run-1') + }) + + it('still updates history when only the queue fetch fails', async () => { + mockGetQueue.mockRejectedValue(new Error('queue down')) + mockGetHistory.mockResolvedValue([createHistoryJob(0, 'hist-1')]) + + await store.update() + + expect(store.historyTasks).toHaveLength(1) + expect(store.historyTasks[0].jobId).toBe('hist-1') + }) + + it('still updates queue when only the history fetch fails', async () => { + mockGetQueue.mockResolvedValue({ + Running: [createRunningJob(0, 'run-1')], + Pending: [] + }) + mockGetHistory.mockRejectedValue(new Error('history down')) + + await store.update() + + expect(store.runningTasks).toHaveLength(1) + expect(store.runningTasks[0].jobId).toBe('run-1') + }) + + it('preserves prior state and skips reconcile when both fetches fail', async () => { + mockGetQueue + .mockResolvedValueOnce({ + Running: [createRunningJob(0, 'run-1')], + Pending: [] + }) + .mockRejectedValueOnce(new Error('queue down')) + mockGetHistory + .mockResolvedValueOnce([createHistoryJob(0, 'hist-1')]) + .mockRejectedValueOnce(new Error('history down')) + const executionStore = useExecutionStore() + const reconcileSpy = vi.spyOn(executionStore, 'reconcileInitializingJobs') + + await store.update() + await store.update() + + expect(reconcileSpy).toHaveBeenCalledTimes(1) + expect(store.runningTasks).toHaveLength(1) + expect(store.runningTasks[0].jobId).toBe('run-1') + expect(store.historyTasks).toHaveLength(1) + expect(store.historyTasks[0].jobId).toBe('hist-1') + expect(store.isLoading).toBe(false) + }) + }) }) diff --git a/src/stores/queueStore.ts b/src/stores/queueStore.ts index f0f660c1db..d1a909fe09 100644 --- a/src/stores/queueStore.ts +++ b/src/stores/queueStore.ts @@ -525,68 +525,74 @@ export const useQueueStore = defineStore('queue', () => { dirty = false isLoading.value = true try { - const [queue, history] = await Promise.all([ - api.getQueue(), + const [queueResult, historyResult] = await Promise.allSettled([ + api.getQueue({ throwOnError: true }), api.getHistory(maxHistoryItems.value) ]) - // API returns pre-sorted data (sort_by=create_time&order=desc) - runningTasks.value = queue.Running.map((job) => new TaskItemImpl(job)) - pendingTasks.value = queue.Pending.map((job) => new TaskItemImpl(job)) + if (queueResult.status === 'fulfilled') { + const queue = queueResult.value + // API returns pre-sorted data (sort_by=create_time&order=desc) + runningTasks.value = queue.Running.map((job) => new TaskItemImpl(job)) + pendingTasks.value = queue.Pending.map((job) => new TaskItemImpl(job)) - const currentHistory = toValue(historyTasks) + const appearedTasks = [...pendingTasks.value, ...runningTasks.value] + const executionStore = useExecutionStore() + appearedTasks.forEach((task) => { + const jobIdString = String(task.jobId) + const workflowId = task.workflowId + if (workflowId && jobIdString) { + executionStore.registerJobWorkflowIdMapping(jobIdString, workflowId) + } + }) - const appearedTasks = [...pendingTasks.value, ...runningTasks.value] - const executionStore = useExecutionStore() - appearedTasks.forEach((task) => { - const jobIdString = String(task.jobId) - const workflowId = task.workflowId - if (workflowId && jobIdString) { - executionStore.registerJobWorkflowIdMapping(jobIdString, workflowId) - } - }) - - // Only reconcile when the queue fetch returned data. api.getQueue() - // returns empty Running/Pending on transient errors, which would - // incorrectly clear all initializing prompts. - const queueHasData = queue.Running.length > 0 || queue.Pending.length > 0 - if (queueHasData) { const activeJobIds = new Set([ ...queue.Running.map((j) => j.id), ...queue.Pending.map((j) => j.id) ]) executionStore.reconcileInitializingJobs(activeJobIds) + } else { + console.error('Failed to fetch queue:', queueResult.reason) } - // Sort by create_time descending and limit to maxItems - const sortedHistory = [...history] - .sort((a, b) => b.create_time - a.create_time) - .slice(0, toValue(maxHistoryItems)) + if (historyResult.status === 'fulfilled') { + const history = historyResult.value + const currentHistory = toValue(historyTasks) - // Reuse existing TaskItemImpl instances or create new - // Must recreate if outputs_count changed (e.g., API started returning it) - const existingByJobId = new Map( - currentHistory.map((impl) => [impl.jobId, impl]) - ) + // Sort by create_time descending and limit to maxItems + const sortedHistory = [...history] + .sort((a, b) => b.create_time - a.create_time) + .slice(0, toValue(maxHistoryItems)) - const nextHistoryTasks = sortedHistory.map((job) => { - const existing = existingByJobId.get(job.id) - if (!existing) return new TaskItemImpl(job) - // Recreate if outputs_count changed to ensure lazy loading works - if (existing.outputsCount !== (job.outputs_count ?? undefined)) { - return new TaskItemImpl(job) + // Reuse existing TaskItemImpl instances or create new + // Must recreate if outputs_count changed (e.g., API started returning it) + const existingByJobId = new Map( + currentHistory.map((impl) => [impl.jobId, impl]) + ) + + const nextHistoryTasks = sortedHistory.map((job) => { + const existing = existingByJobId.get(job.id) + if (!existing) return new TaskItemImpl(job) + // Recreate if outputs_count changed to ensure lazy loading works + if (existing.outputsCount !== (job.outputs_count ?? undefined)) { + return new TaskItemImpl(job) + } + return existing + }) + + const isHistoryUnchanged = + nextHistoryTasks.length === currentHistory.length && + nextHistoryTasks.every( + (task, index) => task === currentHistory[index] + ) + + if (!isHistoryUnchanged) { + historyTasks.value = nextHistoryTasks } - return existing - }) - - const isHistoryUnchanged = - nextHistoryTasks.length === currentHistory.length && - nextHistoryTasks.every((task, index) => task === currentHistory[index]) - - if (!isHistoryUnchanged) { - historyTasks.value = nextHistoryTasks + hasFetchedHistorySnapshot.value = true + } else { + console.error('Failed to fetch history:', historyResult.reason) } - hasFetchedHistorySnapshot.value = true } finally { isLoading.value = false inFlight = false diff --git a/src/views/GraphView.test.ts b/src/views/GraphView.test.ts new file mode 100644 index 0000000000..a1fc5242c2 --- /dev/null +++ b/src/views/GraphView.test.ts @@ -0,0 +1,214 @@ +import { createTestingPinia } from '@pinia/testing' +import { render } from '@testing-library/vue' +import { setActivePinia } from 'pinia' +import { beforeEach, describe, expect, it, vi } from 'vitest' +import { ref } from 'vue' + +import type * as VueUseCore from '@vueuse/core' +import { useReconnectQueueRefresh } from '@/composables/useReconnectQueueRefresh' +import { useReconnectingNotification } from '@/composables/useReconnectingNotification' +import type * as DistTypes from '@/platform/distribution/types' +import type * as I18nModule from '@/i18n' + +const apiMock = vi.hoisted(() => new EventTarget()) + +vi.mock('@/scripts/api', () => ({ api: apiMock })) + +vi.mock('@/scripts/app', () => ({ + app: { + rootGraph: { getNodeById: vi.fn(), nodes: [] }, + ui: { + menuContainer: { style: { setProperty: vi.fn() } }, + restoreMenuPosition: vi.fn() + } + } +})) + +vi.mock('@/composables/useReconnectQueueRefresh', () => { + const refreshOnReconnect = vi.fn(async () => {}) + return { useReconnectQueueRefresh: () => refreshOnReconnect } +}) + +vi.mock('@/composables/useReconnectingNotification', () => { + const onReconnected = vi.fn() + const onReconnecting = vi.fn() + return { + useReconnectingNotification: () => ({ onReconnected, onReconnecting }) + } +}) + +vi.mock('@vueuse/core', async (importOriginal) => { + const actual = await importOriginal() + return { ...actual, useIntervalFn: vi.fn(() => ({ pause: vi.fn() })) } +}) + +vi.mock('@/base/common/async', () => ({ runWhenGlobalIdle: vi.fn() })) +vi.mock('@/composables/useBrowserTabTitle', () => ({ + useBrowserTabTitle: vi.fn() +})) +vi.mock('@/composables/useCoreCommands', () => ({ useCoreCommands: () => [] })) +vi.mock('@/platform/remote/comfyui/useQueuePolling', () => ({ + useQueuePolling: vi.fn() +})) +vi.mock('@/composables/useErrorHandling', () => ({ + useErrorHandling: () => ({ + wrapWithErrorHandling: (f: unknown) => f, + wrapWithErrorHandlingAsync: (f: unknown) => f + }) +})) +vi.mock('@/composables/useProgressFavicon', () => ({ + useProgressFavicon: vi.fn() +})) +vi.mock('@/i18n', async (importOriginal) => { + const actual = await importOriginal() + return { ...actual, loadLocale: vi.fn().mockResolvedValue(undefined) } +}) +vi.mock('@/platform/distribution/types', async (importOriginal) => { + const actual = await importOriginal() + return { ...actual, isCloud: false, isDesktop: false } +}) +vi.mock('@/platform/settings/settingStore', () => ({ + useSettingStore: () => ({ get: vi.fn(() => undefined), set: vi.fn() }) +})) +vi.mock('@/platform/telemetry', () => ({ useTelemetry: () => undefined })) +vi.mock('@/platform/updates/common/useFrontendVersionMismatchWarning', () => ({ + useFrontendVersionMismatchWarning: vi.fn() +})) +vi.mock('@/platform/updates/common/versionCompatibilityStore', () => ({ + useVersionCompatibilityStore: () => ({ + initialize: vi.fn().mockResolvedValue(undefined) + }) +})) +vi.mock('@/renderer/core/canvas/canvasStore', async () => { + const { defineStore } = await import('pinia') + return { + useCanvasStore: defineStore('canvas-test-stub', () => ({ + linearMode: ref(false) + })) + } +}) +vi.mock('@/services/autoQueueService', () => ({ + setupAutoQueueHandler: vi.fn() +})) +vi.mock('@/platform/keybindings/keybindingService', () => ({ + useKeybindingService: () => ({ + registerCoreKeybindings: vi.fn(), + keybindHandler: vi.fn() + }) +})) +vi.mock('@/composables/useAppMode', () => ({ + useAppMode: () => ({ isBuilderMode: ref(false) }) +})) +vi.mock('@/stores/assetsStore', () => ({ + useAssetsStore: () => ({ updateHistory: vi.fn() }) +})) +vi.mock('@/stores/commandStore', () => ({ + useCommandStore: () => ({ registerCommands: vi.fn() }) +})) +vi.mock('@/stores/executionStore', () => ({ + useExecutionStore: () => ({ + bindExecutionEvents: vi.fn(), + unbindExecutionEvents: vi.fn(), + activeJobId: null, + clearActiveJobIfStale: vi.fn() + }) +})) +vi.mock('@/stores/authStore', () => ({ + useAuthStore: () => ({ isAuthenticated: false }) +})) +vi.mock('@/stores/menuItemStore', () => ({ + useMenuItemStore: () => ({ registerCoreMenuCommands: vi.fn() }) +})) +vi.mock('@/stores/modelStore', () => ({ useModelStore: () => ({}) })) +vi.mock('@/stores/nodeDefStore', () => ({ + useNodeDefStore: () => ({}), + useNodeFrequencyStore: () => ({}) +})) +vi.mock('@/stores/queueStore', () => ({ + useQueueStore: () => ({ + update: vi.fn(), + runningTasks: [], + pendingTasks: [], + tasks: [], + maxHistoryItems: 64 + }), + useQueuePendingTaskCountStore: () => ({ update: vi.fn() }) +})) +vi.mock('@/stores/serverConfigStore', () => ({ + useServerConfigStore: () => ({}) +})) +vi.mock('@/stores/workspace/bottomPanelStore', () => ({ + useBottomPanelStore: () => ({ + registerCoreBottomPanelTabs: vi.fn().mockResolvedValue(undefined) + }) +})) +vi.mock('@/stores/workspace/colorPaletteStore', () => ({ + useColorPaletteStore: () => ({ + completedActivePalette: { light_theme: true, colors: { comfy_base: {} } } + }) +})) +vi.mock('@/stores/workspace/sidebarTabStore', () => ({ + useSidebarTabStore: () => ({ + registerCoreSidebarTabs: vi.fn(), + activeSidebarTabId: null + }) +})) +vi.mock('@/utils/envUtil', () => ({ + electronAPI: () => ({ + changeTheme: vi.fn(), + Events: { incrementUserProperty: vi.fn(), trackEvent: vi.fn() } + }) +})) + +// Module-mock heavy child components so we don't pay their import cost. +const stubModule = { default: { template: '
' } } +vi.mock('@/components/graph/GraphCanvas.vue', () => stubModule) +vi.mock('@/views/LinearView.vue', () => stubModule) +vi.mock('@/components/builder/BuilderToolbar.vue', () => stubModule) +vi.mock('@/components/builder/BuilderMenu.vue', () => stubModule) +vi.mock('@/components/builder/BuilderFooterToolbar.vue', () => stubModule) +vi.mock( + '@/workbench/extensions/manager/components/ManagerProgressToast.vue', + () => stubModule +) +vi.mock( + '@/platform/cloud/notification/components/DesktopCloudNotificationController.vue', + () => stubModule +) +vi.mock( + '@/platform/assets/components/ModelImportProgressDialog.vue', + () => stubModule +) +vi.mock( + '@/platform/assets/components/AssetExportProgressDialog.vue', + () => stubModule +) +vi.mock( + '@/platform/workspace/components/toasts/InviteAcceptedToast.vue', + () => stubModule +) +vi.mock('@/components/toast/GlobalToast.vue', () => stubModule) +vi.mock('@/components/toast/RerouteMigrationToast.vue', () => stubModule) +vi.mock('@/components/MenuHamburger.vue', () => stubModule) +vi.mock('@/components/dialog/UnloadWindowConfirmDialog.vue', () => stubModule) + +describe('GraphView - reconnect wiring', () => { + beforeEach(() => { + vi.restoreAllMocks() + setActivePinia(createTestingPinia({ stubActions: false })) + }) + + it('wires the reconnected event to the toast and queue refresh', async () => { + const GraphView = (await import('./GraphView.vue')).default + render(GraphView) + + apiMock.dispatchEvent(new Event('reconnected')) + + const { onReconnected } = useReconnectingNotification() + const refreshOnReconnect = useReconnectQueueRefresh() + await vi.waitFor(() => { + expect(onReconnected).toHaveBeenCalledTimes(1) + expect(refreshOnReconnect).toHaveBeenCalledTimes(1) + }) + }) +}) diff --git a/src/views/GraphView.vue b/src/views/GraphView.vue index f9f42823c7..2d70873794 100644 --- a/src/views/GraphView.vue +++ b/src/views/GraphView.vue @@ -56,6 +56,7 @@ import { useBrowserTabTitle } from '@/composables/useBrowserTabTitle' import { useCoreCommands } from '@/composables/useCoreCommands' import { useQueuePolling } from '@/platform/remote/comfyui/useQueuePolling' import { useErrorHandling } from '@/composables/useErrorHandling' +import { useReconnectQueueRefresh } from '@/composables/useReconnectQueueRefresh' import { useReconnectingNotification } from '@/composables/useReconnectingNotification' import { useProgressFavicon } from '@/composables/useProgressFavicon' import { SERVER_CONFIG_ITEMS } from '@/constants/serverConfig' @@ -248,11 +249,17 @@ const onExecutionSuccess = async () => { } const { onReconnecting, onReconnected } = useReconnectingNotification() +const refreshOnReconnect = useReconnectQueueRefresh() + +const handleReconnected = async () => { + onReconnected() + await refreshOnReconnect() +} useEventListener(api, 'status', onStatus) useEventListener(api, 'execution_success', onExecutionSuccess) useEventListener(api, 'reconnecting', onReconnecting) -useEventListener(api, 'reconnected', onReconnected) +useEventListener(api, 'reconnected', handleReconnected) onMounted(() => { executionStore.bindExecutionEvents()