fix: dedupe queueStore.update() to prevent race conditions (#8523)

## Summary

Prevents stale API responses from overwriting fresh queue state during
rapid consecutive `update()` calls.

## Problem

When `queueStore.update()` is called multiple times in quick succession
(e.g., during websocket reconnection), responses could resolve
out-of-order. A stale response resolving after a fresh one would
overwrite the current state with outdated data.

## Solution

Added request ID tracking that:
1. Increments a counter on each `update()` call
2. Guards all state mutations with a staleness check
3. Only sets `isLoading=false` for the latest request

This pattern matches the existing approach in `jobOutputCache.ts`.

## Changes

- `src/stores/queueStore.ts`: Added `updateRequestId` counter and
staleness guards
- `src/stores/queueStore.test.ts`: Added tests for deduplication
behavior

## Testing

- All 49 existing tests pass
- Added 3 new tests for race condition handling:
  - Stale responses are discarded
  - isLoading state is correctly managed
  - Stale request failures don't affect latest state

## Fixes COM-12784

┆Issue is synchronized with this [Notion
page](https://www.notion.so/PR-8523-fix-dedupe-queueStore-update-to-prevent-race-conditions-2fa6d73d3650810daa0dc63af359e1ea)
by [Unito](https://www.unito.io)

---------

Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
Christian Byrne
2026-01-31 22:01:14 -08:00
committed by GitHub
parent 69129414d0
commit 544ef5bb70
2 changed files with 117 additions and 1 deletions

View File

@@ -49,6 +49,9 @@ const createTaskOutput = (
}
})
type QueueResponse = { Running: JobListItem[]; Pending: JobListItem[] }
type QueueResolver = (value: QueueResponse) => void
// Mock API
vi.mock('@/scripts/api', () => ({
api: {
@@ -802,4 +805,106 @@ describe('useQueueStore', () => {
expect(mockGetHistory).toHaveBeenCalled()
})
})
describe('update deduplication', () => {
it('should discard stale responses when newer request completes first', async () => {
let resolveFirst: QueueResolver
let resolveSecond: QueueResolver
const firstQueuePromise = new Promise<QueueResponse>((resolve) => {
resolveFirst = resolve
})
const secondQueuePromise = new Promise<QueueResponse>((resolve) => {
resolveSecond = resolve
})
mockGetHistory.mockResolvedValue([])
mockGetQueue
.mockReturnValueOnce(firstQueuePromise)
.mockReturnValueOnce(secondQueuePromise)
const firstUpdate = store.update()
const secondUpdate = store.update()
resolveSecond!({ Running: [], Pending: [createPendingJob(2, 'new-job')] })
await secondUpdate
expect(store.pendingTasks).toHaveLength(1)
expect(store.pendingTasks[0].promptId).toBe('new-job')
resolveFirst!({
Running: [],
Pending: [createPendingJob(1, 'stale-job')]
})
await firstUpdate
expect(store.pendingTasks).toHaveLength(1)
expect(store.pendingTasks[0].promptId).toBe('new-job')
})
it('should set isLoading to false only for the latest request', async () => {
let resolveFirst: QueueResolver
let resolveSecond: QueueResolver
const firstQueuePromise = new Promise<QueueResponse>((resolve) => {
resolveFirst = resolve
})
const secondQueuePromise = new Promise<QueueResponse>((resolve) => {
resolveSecond = resolve
})
mockGetHistory.mockResolvedValue([])
mockGetQueue
.mockReturnValueOnce(firstQueuePromise)
.mockReturnValueOnce(secondQueuePromise)
const firstUpdate = store.update()
expect(store.isLoading).toBe(true)
const secondUpdate = store.update()
expect(store.isLoading).toBe(true)
resolveSecond!({ Running: [], Pending: [] })
await secondUpdate
expect(store.isLoading).toBe(false)
resolveFirst!({ Running: [], Pending: [] })
await firstUpdate
expect(store.isLoading).toBe(false)
})
it('should handle stale request failure without affecting latest state', async () => {
let resolveSecond: QueueResolver
const secondQueuePromise = new Promise<QueueResponse>((resolve) => {
resolveSecond = resolve
})
mockGetHistory.mockResolvedValue([])
mockGetQueue
.mockRejectedValueOnce(new Error('stale network error'))
.mockReturnValueOnce(secondQueuePromise)
const firstUpdate = store.update()
const secondUpdate = store.update()
resolveSecond!({ Running: [], Pending: [createPendingJob(2, 'new-job')] })
await secondUpdate
expect(store.pendingTasks).toHaveLength(1)
expect(store.pendingTasks[0].promptId).toBe('new-job')
expect(store.isLoading).toBe(false)
await expect(firstUpdate).rejects.toThrow('stale network error')
expect(store.pendingTasks).toHaveLength(1)
expect(store.pendingTasks[0].promptId).toBe('new-job')
expect(store.isLoading).toBe(false)
})
})
})

View File

@@ -475,6 +475,9 @@ export const useQueueStore = defineStore('queue', () => {
const maxHistoryItems = ref(64)
const isLoading = ref(false)
// Scoped per-store instance; incremented to dedupe concurrent update() calls
let updateRequestId = 0
const tasks = computed<TaskItemImpl[]>(
() =>
[
@@ -498,6 +501,7 @@ export const useQueueStore = defineStore('queue', () => {
)
const update = async () => {
const requestId = ++updateRequestId
isLoading.value = true
try {
const [queue, history] = await Promise.all([
@@ -505,6 +509,8 @@ export const useQueueStore = defineStore('queue', () => {
api.getHistory(maxHistoryItems.value)
])
if (requestId !== updateRequestId) return
// API returns pre-sorted data (sort_by=create_time&order=desc)
runningTasks.value = queue.Running.map((job) => new TaskItemImpl(job))
pendingTasks.value = queue.Pending.map((job) => new TaskItemImpl(job))
@@ -545,7 +551,12 @@ export const useQueueStore = defineStore('queue', () => {
return existing
})
} finally {
isLoading.value = false
// Only clear loading if this is the latest request.
// A stale request completing (success or error) should not touch loading state
// since a newer request is responsible for it.
if (requestId === updateRequestId) {
isLoading.value = false
}
}
}