From 544ef5bb706316a05d6ab8c4263487ccc7c664c3 Mon Sep 17 00:00:00 2001 From: Christian Byrne Date: Sat, 31 Jan 2026 22:01:14 -0800 Subject: [PATCH] fix: dedupe queueStore.update() to prevent race conditions (#8523) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary Prevents stale API responses from overwriting fresh queue state during rapid consecutive `update()` calls. ## Problem When `queueStore.update()` is called multiple times in quick succession (e.g., during websocket reconnection), responses could resolve out-of-order. A stale response resolving after a fresh one would overwrite the current state with outdated data. ## Solution Added request ID tracking that: 1. Increments a counter on each `update()` call 2. Guards all state mutations with a staleness check 3. Only sets `isLoading=false` for the latest request This pattern matches the existing approach in `jobOutputCache.ts`. ## Changes - `src/stores/queueStore.ts`: Added `updateRequestId` counter and staleness guards - `src/stores/queueStore.test.ts`: Added tests for deduplication behavior ## Testing - All 49 existing tests pass - Added 3 new tests for race condition handling: - Stale responses are discarded - isLoading state is correctly managed - Stale request failures don't affect latest state ## Fixes COM-12784 ┆Issue is synchronized with this [Notion page](https://www.notion.so/PR-8523-fix-dedupe-queueStore-update-to-prevent-race-conditions-2fa6d73d3650810daa0dc63af359e1ea) by [Unito](https://www.unito.io) --------- Co-authored-by: Amp --- src/stores/queueStore.test.ts | 105 ++++++++++++++++++++++++++++++++++ src/stores/queueStore.ts | 13 ++++- 2 files changed, 117 insertions(+), 1 deletion(-) diff --git a/src/stores/queueStore.test.ts b/src/stores/queueStore.test.ts index a183d3c54d..d2f314590a 100644 --- a/src/stores/queueStore.test.ts +++ b/src/stores/queueStore.test.ts @@ -49,6 +49,9 @@ const createTaskOutput = ( } }) +type QueueResponse = { Running: JobListItem[]; Pending: JobListItem[] } +type QueueResolver = (value: QueueResponse) => void + // Mock API vi.mock('@/scripts/api', () => ({ api: { @@ -802,4 +805,106 @@ describe('useQueueStore', () => { expect(mockGetHistory).toHaveBeenCalled() }) }) + + describe('update deduplication', () => { + it('should discard stale responses when newer request completes first', async () => { + let resolveFirst: QueueResolver + let resolveSecond: QueueResolver + + const firstQueuePromise = new Promise((resolve) => { + resolveFirst = resolve + }) + const secondQueuePromise = new Promise((resolve) => { + resolveSecond = resolve + }) + + mockGetHistory.mockResolvedValue([]) + + mockGetQueue + .mockReturnValueOnce(firstQueuePromise) + .mockReturnValueOnce(secondQueuePromise) + + const firstUpdate = store.update() + const secondUpdate = store.update() + + resolveSecond!({ Running: [], Pending: [createPendingJob(2, 'new-job')] }) + await secondUpdate + + expect(store.pendingTasks).toHaveLength(1) + expect(store.pendingTasks[0].promptId).toBe('new-job') + + resolveFirst!({ + Running: [], + Pending: [createPendingJob(1, 'stale-job')] + }) + await firstUpdate + + expect(store.pendingTasks).toHaveLength(1) + expect(store.pendingTasks[0].promptId).toBe('new-job') + }) + + it('should set isLoading to false only for the latest request', async () => { + let resolveFirst: QueueResolver + let resolveSecond: QueueResolver + + const firstQueuePromise = new Promise((resolve) => { + resolveFirst = resolve + }) + const secondQueuePromise = new Promise((resolve) => { + resolveSecond = resolve + }) + + mockGetHistory.mockResolvedValue([]) + + mockGetQueue + .mockReturnValueOnce(firstQueuePromise) + .mockReturnValueOnce(secondQueuePromise) + + const firstUpdate = store.update() + expect(store.isLoading).toBe(true) + + const secondUpdate = store.update() + expect(store.isLoading).toBe(true) + + resolveSecond!({ Running: [], Pending: [] }) + await secondUpdate + + expect(store.isLoading).toBe(false) + + resolveFirst!({ Running: [], Pending: [] }) + await firstUpdate + + expect(store.isLoading).toBe(false) + }) + + it('should handle stale request failure without affecting latest state', async () => { + let resolveSecond: QueueResolver + + const secondQueuePromise = new Promise((resolve) => { + resolveSecond = resolve + }) + + mockGetHistory.mockResolvedValue([]) + + mockGetQueue + .mockRejectedValueOnce(new Error('stale network error')) + .mockReturnValueOnce(secondQueuePromise) + + const firstUpdate = store.update() + const secondUpdate = store.update() + + resolveSecond!({ Running: [], Pending: [createPendingJob(2, 'new-job')] }) + await secondUpdate + + expect(store.pendingTasks).toHaveLength(1) + expect(store.pendingTasks[0].promptId).toBe('new-job') + expect(store.isLoading).toBe(false) + + await expect(firstUpdate).rejects.toThrow('stale network error') + + expect(store.pendingTasks).toHaveLength(1) + expect(store.pendingTasks[0].promptId).toBe('new-job') + expect(store.isLoading).toBe(false) + }) + }) }) diff --git a/src/stores/queueStore.ts b/src/stores/queueStore.ts index c302a1076e..2b9becf18f 100644 --- a/src/stores/queueStore.ts +++ b/src/stores/queueStore.ts @@ -475,6 +475,9 @@ export const useQueueStore = defineStore('queue', () => { const maxHistoryItems = ref(64) const isLoading = ref(false) + // Scoped per-store instance; incremented to dedupe concurrent update() calls + let updateRequestId = 0 + const tasks = computed( () => [ @@ -498,6 +501,7 @@ export const useQueueStore = defineStore('queue', () => { ) const update = async () => { + const requestId = ++updateRequestId isLoading.value = true try { const [queue, history] = await Promise.all([ @@ -505,6 +509,8 @@ export const useQueueStore = defineStore('queue', () => { api.getHistory(maxHistoryItems.value) ]) + if (requestId !== updateRequestId) return + // API returns pre-sorted data (sort_by=create_time&order=desc) runningTasks.value = queue.Running.map((job) => new TaskItemImpl(job)) pendingTasks.value = queue.Pending.map((job) => new TaskItemImpl(job)) @@ -545,7 +551,12 @@ export const useQueueStore = defineStore('queue', () => { return existing }) } finally { - isLoading.value = false + // Only clear loading if this is the latest request. + // A stale request completing (success or error) should not touch loading state + // since a newer request is responsible for it. + if (requestId === updateRequestId) { + isLoading.value = false + } } }