fix: replace stale-request guard with single-flight coalescing in queueStore.update() (#10203)

## Summary

`queueStore.update()` previously used a request-ID guard that discarded
responses whenever a newer call had been initiated. During concurrent
job execution, rapid WebSocket events (`status`, `execution_success`,
etc.) created a sustained stream of `update()` calls where:

1. **Every response got discarded** — each returning response saw a
higher `updateRequestId`, so data was never applied to the store until
the event burst subsided
2. **All HTTP requests still fired** — the guard didn't suppress
outgoing calls, just discarded stale responses after the fact

This caused the UI to freeze showing stale job states (e.g. completed
jobs still showing as "Running") during a burst, then snap to the final
state all at once when the burst subsided.

## Changes

Replace the request-ID guard with a single-flight coalescing pattern:

- **At most one fetch in flight** at a time — no request spam
- **Every response is applied** — no UI starvation
- **Dirty flag** triggers a single re-fetch if any calls arrived during
the flight, ensuring eventual consistency

## Testing

Added 5 unit tests covering:
- Concurrent calls coalesce into a single re-fetch
- No response starvation (every response applied)
- No duplicate requests when no concurrent calls
- Loading state transitions through coalesced re-fetches
- Normal sequential behavior preserved

┆Issue is synchronized with this [Notion
page](https://www.notion.so/PR-10203-fix-replace-stale-request-guard-with-single-flight-coalescing-in-queueStore-update-3266d73d365081088656e4e55ca4dbd3)
by [Unito](https://www.unito.io)
This commit is contained in:
Christian Byrne
2026-03-17 14:26:59 -07:00
committed by GitHub
parent 2e5e04efd5
commit a00e4b6421
2 changed files with 185 additions and 71 deletions

View File

@@ -350,6 +350,119 @@ describe('useQueueStore', () => {
})
})
describe('update() - single-flight coalescing', () => {
  // Resolver for the most recently created manually-controlled queue
  // promise. Reassigned each time the mocked api.getQueue is invoked.
  let releaseQueue!: QueueResolver

  /**
   * Install a getQueue mock whose returned promise stays pending until
   * the test resolves it explicitly via `releaseQueue`.
   */
  const deferGetQueue = () => {
    mockGetQueue.mockImplementation(
      () =>
        new Promise<QueueResponse>((resolve) => {
          releaseQueue = resolve
        })
    )
  }

  it('should coalesce concurrent calls into one re-fetch', async () => {
    deferGetQueue()
    mockGetHistory.mockResolvedValue([])

    // Kick off the in-flight request.
    const inFlight = store.update()
    expect(mockGetQueue).toHaveBeenCalledTimes(1)

    // Calls arriving mid-flight must coalesce rather than fetch again.
    void store.update()
    void store.update()
    void store.update()
    expect(mockGetQueue).toHaveBeenCalledTimes(1)

    // Completing the flight triggers exactly one follow-up fetch.
    releaseQueue({ Running: [], Pending: [] })
    await inFlight
    expect(mockGetQueue).toHaveBeenCalledTimes(2)
  })

  it('should apply every response (no starvation)', async () => {
    const jobA = createRunningJob(1, 'run-1')
    const jobB = createRunningJob(2, 'run-2')
    deferGetQueue()
    mockGetHistory.mockResolvedValue([])

    const inFlight = store.update()
    void store.update() // coalesces into the in-flight request

    // The first response must land in the store, not be discarded.
    releaseQueue({ Running: [jobA], Pending: [] })
    await inFlight
    expect(store.runningTasks).toHaveLength(1)
    expect(store.runningTasks[0].jobId).toBe('run-1')

    // The dirty-triggered re-fetch delivers the newer data.
    releaseQueue({ Running: [jobB], Pending: [] })
    await new Promise((r) => setTimeout(r, 0))
    expect(store.runningTasks).toHaveLength(1)
    expect(store.runningTasks[0].jobId).toBe('run-2')
  })

  it('should not fire duplicate requests when no calls arrive during flight', async () => {
    mockGetQueue.mockResolvedValue({ Running: [], Pending: [] })
    mockGetHistory.mockResolvedValue([])

    await store.update()

    // A lone call with no concurrent traffic fetches exactly once.
    expect(mockGetQueue).toHaveBeenCalledTimes(1)
    expect(mockGetHistory).toHaveBeenCalledTimes(1)
  })

  it('should clear loading state after coalesced re-fetch completes', async () => {
    deferGetQueue()
    mockGetHistory.mockResolvedValue([])

    const inFlight = store.update()
    void store.update() // sets the dirty flag

    releaseQueue({ Running: [], Pending: [] })
    await inFlight
    // The dirty flag scheduled a re-fetch, so loading is active again.
    expect(store.isLoading).toBe(true)

    releaseQueue({ Running: [], Pending: [] })
    await new Promise((r) => setTimeout(r, 0))
    expect(store.isLoading).toBe(false)
  })

  it('should allow new requests after coalesced re-fetch completes', async () => {
    mockGetQueue.mockResolvedValue({ Running: [], Pending: [] })
    mockGetHistory.mockResolvedValue([])

    // Sequential updates each fetch normally once the store is idle.
    await store.update()
    expect(mockGetQueue).toHaveBeenCalledTimes(1)

    await store.update()
    expect(mockGetQueue).toHaveBeenCalledTimes(2)
  })
})
describe('update() - sorting', () => {
it('should sort tasks by job.priority descending', async () => {
const job1 = createHistoryJob(1, 'hist-1')
@@ -826,101 +939,94 @@ describe('useQueueStore', () => {
})
})
describe('update deduplication', () => {
it('should discard stale responses when newer request completes first', async () => {
let resolveFirst: QueueResolver
let resolveSecond: QueueResolver
const firstQueuePromise = new Promise<QueueResponse>((resolve) => {
resolveFirst = resolve
})
const secondQueuePromise = new Promise<QueueResponse>((resolve) => {
resolveSecond = resolve
})
describe('update deduplication (coalescing)', () => {
it('should coalesce concurrent calls — second call does not fire its own request', async () => {
let resolveQueue!: QueueResolver
mockGetQueue.mockImplementation(
() =>
new Promise<QueueResponse>((resolve) => {
resolveQueue = resolve
})
)
mockGetHistory.mockResolvedValue([])
mockGetQueue
.mockReturnValueOnce(firstQueuePromise)
.mockReturnValueOnce(secondQueuePromise)
const firstUpdate = store.update()
const secondUpdate = store.update()
resolveSecond!({ Running: [], Pending: [createPendingJob(2, 'new-job')] })
// Only one HTTP request should have been made
expect(mockGetQueue).toHaveBeenCalledTimes(1)
// Second call returns immediately (coalesced)
await secondUpdate
expect(store.pendingTasks).toHaveLength(1)
expect(store.pendingTasks[0].jobId).toBe('new-job')
resolveFirst!({
Running: [],
Pending: [createPendingJob(1, 'stale-job')]
})
// Resolve the in-flight request
resolveQueue({ Running: [], Pending: [createPendingJob(2, 'new-job')] })
await firstUpdate
expect(store.pendingTasks).toHaveLength(1)
expect(store.pendingTasks[0].jobId).toBe('new-job')
// A re-fetch fires because dirty was set
expect(mockGetQueue).toHaveBeenCalledTimes(2)
})
it('should set isLoading to false only for the latest request', async () => {
let resolveFirst: QueueResolver
let resolveSecond: QueueResolver
const firstQueuePromise = new Promise<QueueResponse>((resolve) => {
resolveFirst = resolve
})
const secondQueuePromise = new Promise<QueueResponse>((resolve) => {
resolveSecond = resolve
})
it('should clear isLoading after in-flight request completes', async () => {
let resolveQueue!: QueueResolver
mockGetQueue.mockImplementation(
() =>
new Promise<QueueResponse>((resolve) => {
resolveQueue = resolve
})
)
mockGetHistory.mockResolvedValue([])
mockGetQueue
.mockReturnValueOnce(firstQueuePromise)
.mockReturnValueOnce(secondQueuePromise)
const firstUpdate = store.update()
expect(store.isLoading).toBe(true)
const secondUpdate = store.update()
expect(store.isLoading).toBe(true)
// Second call coalesces and returns immediately
void store.update()
resolveSecond!({ Running: [], Pending: [] })
await secondUpdate
expect(store.isLoading).toBe(false)
resolveFirst!({ Running: [], Pending: [] })
resolveQueue({ Running: [], Pending: [] })
await firstUpdate
// isLoading is true again because re-fetch was triggered
expect(store.isLoading).toBe(true)
// Resolve the re-fetch
resolveQueue({ Running: [], Pending: [] })
await new Promise((r) => setTimeout(r, 0))
expect(store.isLoading).toBe(false)
})
it('should handle stale request failure without affecting latest state', async () => {
let resolveSecond: QueueResolver
it('should handle in-flight failure and still trigger coalesced re-fetch', async () => {
let callCount = 0
let resolveSecond!: QueueResolver
const secondQueuePromise = new Promise<QueueResponse>((resolve) => {
resolveSecond = resolve
mockGetQueue.mockImplementation(() => {
callCount++
if (callCount === 1) {
return Promise.reject(new Error('network error'))
}
return new Promise<QueueResponse>((resolve) => {
resolveSecond = resolve
})
})
mockGetHistory.mockResolvedValue([])
mockGetQueue
.mockRejectedValueOnce(new Error('stale network error'))
.mockReturnValueOnce(secondQueuePromise)
const firstUpdate = store.update()
const secondUpdate = store.update()
void store.update() // coalesces, sets dirty
resolveSecond!({ Running: [], Pending: [createPendingJob(2, 'new-job')] })
await secondUpdate
// First call rejects — but dirty flag triggers re-fetch
await expect(firstUpdate).rejects.toThrow('network error')
expect(store.pendingTasks).toHaveLength(1)
expect(store.pendingTasks[0].jobId).toBe('new-job')
expect(store.isLoading).toBe(false)
// Re-fetch was triggered
expect(mockGetQueue).toHaveBeenCalledTimes(2)
await expect(firstUpdate).rejects.toThrow('stale network error')
resolveSecond({ Running: [], Pending: [createPendingJob(2, 'new-job')] })
await new Promise((r) => setTimeout(r, 0))
expect(store.pendingTasks).toHaveLength(1)
expect(store.pendingTasks[0].jobId).toBe('new-job')

View File

@@ -488,8 +488,13 @@ export const useQueueStore = defineStore('queue', () => {
const maxHistoryItems = ref(64)
const isLoading = ref(false)
// Scoped per-store instance; incremented to dedupe concurrent update() calls
let updateRequestId = 0
// Single-flight coalescing: at most one fetch in flight at a time.
// If update() is called while a fetch is running, the call is coalesced
// and a single re-fetch fires after the current one completes.
// This prevents both request spam and UI starvation (where a rapid stream
// of calls causes every response to be discarded by a stale-request guard).
let inFlight = false
let dirty = false
const tasks = computed<TaskItemImpl[]>(
() =>
@@ -514,7 +519,13 @@ export const useQueueStore = defineStore('queue', () => {
)
const update = async () => {
const requestId = ++updateRequestId
if (inFlight) {
dirty = true
return
}
inFlight = true
dirty = false
isLoading.value = true
try {
const [queue, history] = await Promise.all([
@@ -522,8 +533,6 @@ export const useQueueStore = defineStore('queue', () => {
api.getHistory(maxHistoryItems.value)
])
if (requestId !== updateRequestId) return
// API returns pre-sorted data (sort_by=create_time&order=desc)
runningTasks.value = queue.Running.map((job) => new TaskItemImpl(job))
pendingTasks.value = queue.Pending.map((job) => new TaskItemImpl(job))
@@ -582,11 +591,10 @@ export const useQueueStore = defineStore('queue', () => {
}
hasFetchedHistorySnapshot.value = true
} finally {
// Only clear loading if this is the latest request.
// A stale request completing (success or error) should not touch loading state
// since a newer request is responsible for it.
if (requestId === updateRequestId) {
isLoading.value = false
isLoading.value = false
inFlight = false
if (dirty) {
void update()
}
}
}