From cb86a1c94e7bf9732df2affb3e2867ac9e2d3753 Mon Sep 17 00:00:00 2001 From: Comfy Org PR Bot Date: Tue, 3 Feb 2026 10:39:41 +0900 Subject: [PATCH] [backport cloud/1.38] fix: dedupe queueStore.update() to prevent race conditions (#8558) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Backport of #8523 to `cloud/1.38` Automatically created by backport workflow. ┆Issue is synchronized with this [Notion page](https://www.notion.so/PR-8558-backport-cloud-1-38-fix-dedupe-queueStore-update-to-prevent-race-conditions-2fc6d73d3650812494dec497cc6b01fa) by [Unito](https://www.unito.io) Co-authored-by: Christian Byrne Co-authored-by: Amp --- src/stores/queueStore.test.ts | 105 ++++++++++++++++++++++++++++++++++ src/stores/queueStore.ts | 13 ++++- 2 files changed, 117 insertions(+), 1 deletion(-) diff --git a/src/stores/queueStore.test.ts b/src/stores/queueStore.test.ts index 063658b2f9..2d14c3263a 100644 --- a/src/stores/queueStore.test.ts +++ b/src/stores/queueStore.test.ts @@ -44,6 +44,9 @@ const createTaskOutput = ( } }) +type QueueResponse = { Running: JobListItem[]; Pending: JobListItem[] } +type QueueResolver = (value: QueueResponse) => void + // Mock API vi.mock('@/scripts/api', () => ({ api: { @@ -797,4 +800,106 @@ describe('useQueueStore', () => { expect(mockGetHistory).toHaveBeenCalled() }) }) + + describe('update deduplication', () => { + it('should discard stale responses when newer request completes first', async () => { + let resolveFirst: QueueResolver + let resolveSecond: QueueResolver + + const firstQueuePromise = new Promise((resolve) => { + resolveFirst = resolve + }) + const secondQueuePromise = new Promise((resolve) => { + resolveSecond = resolve + }) + + mockGetHistory.mockResolvedValue([]) + + mockGetQueue + .mockReturnValueOnce(firstQueuePromise) + .mockReturnValueOnce(secondQueuePromise) + + const firstUpdate = store.update() + const secondUpdate = store.update() + + resolveSecond!({ Running: [], Pending: [createPendingJob(2, 'new-job')] }) + await secondUpdate + + expect(store.pendingTasks).toHaveLength(1) + expect(store.pendingTasks[0].promptId).toBe('new-job') + + resolveFirst!({ + Running: [], + Pending: [createPendingJob(1, 'stale-job')] + }) + await firstUpdate + + expect(store.pendingTasks).toHaveLength(1) + expect(store.pendingTasks[0].promptId).toBe('new-job') + }) + + it('should set isLoading to false only for the latest request', async () => { + let resolveFirst: QueueResolver + let resolveSecond: QueueResolver + + const firstQueuePromise = new Promise((resolve) => { + resolveFirst = resolve + }) + const secondQueuePromise = new Promise((resolve) => { + resolveSecond = resolve + }) + + mockGetHistory.mockResolvedValue([]) + + mockGetQueue + .mockReturnValueOnce(firstQueuePromise) + .mockReturnValueOnce(secondQueuePromise) + + const firstUpdate = store.update() + expect(store.isLoading).toBe(true) + + const secondUpdate = store.update() + expect(store.isLoading).toBe(true) + + resolveSecond!({ Running: [], Pending: [] }) + await secondUpdate + + expect(store.isLoading).toBe(false) + + resolveFirst!({ Running: [], Pending: [] }) + await firstUpdate + + expect(store.isLoading).toBe(false) + }) + + it('should handle stale request failure without affecting latest state', async () => { + let resolveSecond: QueueResolver + + const secondQueuePromise = new Promise((resolve) => { + resolveSecond = resolve + }) + + mockGetHistory.mockResolvedValue([]) + + mockGetQueue + .mockRejectedValueOnce(new Error('stale network error')) + .mockReturnValueOnce(secondQueuePromise) + + const firstUpdate = store.update() + const secondUpdate = store.update() + + resolveSecond!({ Running: [], Pending: [createPendingJob(2, 'new-job')] }) + await secondUpdate + + expect(store.pendingTasks).toHaveLength(1) + expect(store.pendingTasks[0].promptId).toBe('new-job') + expect(store.isLoading).toBe(false) + + await expect(firstUpdate).rejects.toThrow('stale network error') + + expect(store.pendingTasks).toHaveLength(1) + expect(store.pendingTasks[0].promptId).toBe('new-job') + expect(store.isLoading).toBe(false) + }) + }) }) diff --git a/src/stores/queueStore.ts b/src/stores/queueStore.ts index c302a1076e..2b9becf18f 100644 --- a/src/stores/queueStore.ts +++ b/src/stores/queueStore.ts @@ -475,6 +475,9 @@ export const useQueueStore = defineStore('queue', () => { const maxHistoryItems = ref(64) const isLoading = ref(false) + // Scoped per-store instance; incremented to dedupe concurrent update() calls + let updateRequestId = 0 + const tasks = computed( () => [ @@ -498,6 +501,7 @@ export const useQueueStore = defineStore('queue', () => { ) const update = async () => { + const requestId = ++updateRequestId isLoading.value = true try { const [queue, history] = await Promise.all([ @@ -505,6 +509,8 @@ export const useQueueStore = defineStore('queue', () => { api.getHistory(maxHistoryItems.value) ]) + if (requestId !== updateRequestId) return + // API returns pre-sorted data (sort_by=create_time&order=desc) runningTasks.value = queue.Running.map((job) => new TaskItemImpl(job)) pendingTasks.value = queue.Pending.map((job) => new TaskItemImpl(job)) @@ -545,7 +551,12 @@ export const useQueueStore = defineStore('queue', () => { return existing }) } finally { - isLoading.value = false + // Only clear loading if this is the latest request. + // A stale request completing (success or error) should not touch loading state + // since a newer request is responsible for it. + if (requestId === updateRequestId) { + isLoading.value = false + } } }