mirror of
https://github.com/Comfy-Org/ComfyUI_frontend.git
synced 2026-04-20 06:20:11 +00:00
[backport cloud/1.42] fix: replace stale-request guard with single-flight coalescing in queueStore.update() (#10217)
Backport of #10203 to `cloud/1.42` Automatically created by backport workflow. ┆Issue is synchronized with this [Notion page](https://www.notion.so/PR-10217-backport-cloud-1-42-fix-replace-stale-request-guard-with-single-flight-coalescing-in--3266d73d365081729194e5f9151b92ac) by [Unito](https://www.unito.io) Co-authored-by: Christian Byrne <cbyrne@comfy.org>
This commit is contained in:
@@ -350,6 +350,119 @@ describe('useQueueStore', () => {
|
||||
})
|
||||
})
|
||||
|
||||
describe('update() - single-flight coalescing', () => {
|
||||
it('should coalesce concurrent calls into one re-fetch', async () => {
|
||||
let resolveQueue!: QueueResolver
|
||||
mockGetQueue.mockImplementation(
|
||||
() =>
|
||||
new Promise<QueueResponse>((resolve) => {
|
||||
resolveQueue = resolve
|
||||
})
|
||||
)
|
||||
mockGetHistory.mockResolvedValue([])
|
||||
|
||||
// First call starts the in-flight request
|
||||
const first = store.update()
|
||||
expect(mockGetQueue).toHaveBeenCalledTimes(1)
|
||||
|
||||
// These calls arrive while the first is in flight — they should coalesce
|
||||
void store.update()
|
||||
void store.update()
|
||||
void store.update()
|
||||
|
||||
// No additional HTTP requests fired
|
||||
expect(mockGetQueue).toHaveBeenCalledTimes(1)
|
||||
|
||||
// Resolve the in-flight request
|
||||
resolveQueue({ Running: [], Pending: [] })
|
||||
await first
|
||||
|
||||
// A single re-fetch should fire because dirty was set
|
||||
expect(mockGetQueue).toHaveBeenCalledTimes(2)
|
||||
})
|
||||
|
||||
it('should apply every response (no starvation)', async () => {
|
||||
const firstRunning = createRunningJob(1, 'run-1')
|
||||
const secondRunning = createRunningJob(2, 'run-2')
|
||||
|
||||
let resolveQueue!: QueueResolver
|
||||
mockGetQueue.mockImplementation(
|
||||
() =>
|
||||
new Promise<QueueResponse>((resolve) => {
|
||||
resolveQueue = resolve
|
||||
})
|
||||
)
|
||||
mockGetHistory.mockResolvedValue([])
|
||||
|
||||
// First call
|
||||
const first = store.update()
|
||||
|
||||
// Second call coalesces
|
||||
void store.update()
|
||||
|
||||
// Resolve first — data should be applied (not discarded)
|
||||
resolveQueue({ Running: [firstRunning], Pending: [] })
|
||||
await first
|
||||
|
||||
expect(store.runningTasks).toHaveLength(1)
|
||||
expect(store.runningTasks[0].jobId).toBe('run-1')
|
||||
|
||||
// The coalesced re-fetch fires; resolve it with new data
|
||||
resolveQueue({ Running: [secondRunning], Pending: [] })
|
||||
// Wait for the re-fetch to complete
|
||||
await new Promise((r) => setTimeout(r, 0))
|
||||
|
||||
expect(store.runningTasks).toHaveLength(1)
|
||||
expect(store.runningTasks[0].jobId).toBe('run-2')
|
||||
})
|
||||
|
||||
it('should not fire duplicate requests when no calls arrive during flight', async () => {
|
||||
mockGetQueue.mockResolvedValue({ Running: [], Pending: [] })
|
||||
mockGetHistory.mockResolvedValue([])
|
||||
|
||||
await store.update()
|
||||
|
||||
expect(mockGetQueue).toHaveBeenCalledTimes(1)
|
||||
expect(mockGetHistory).toHaveBeenCalledTimes(1)
|
||||
})
|
||||
|
||||
it('should clear loading state after coalesced re-fetch completes', async () => {
|
||||
let resolveQueue!: QueueResolver
|
||||
mockGetQueue.mockImplementation(
|
||||
() =>
|
||||
new Promise<QueueResponse>((resolve) => {
|
||||
resolveQueue = resolve
|
||||
})
|
||||
)
|
||||
mockGetHistory.mockResolvedValue([])
|
||||
|
||||
const first = store.update()
|
||||
void store.update() // coalesce
|
||||
|
||||
resolveQueue({ Running: [], Pending: [] })
|
||||
await first
|
||||
|
||||
// isLoading should be true again for the re-fetch
|
||||
expect(store.isLoading).toBe(true)
|
||||
|
||||
resolveQueue({ Running: [], Pending: [] })
|
||||
await new Promise((r) => setTimeout(r, 0))
|
||||
|
||||
expect(store.isLoading).toBe(false)
|
||||
})
|
||||
|
||||
it('should allow new requests after coalesced re-fetch completes', async () => {
|
||||
mockGetQueue.mockResolvedValue({ Running: [], Pending: [] })
|
||||
mockGetHistory.mockResolvedValue([])
|
||||
|
||||
await store.update()
|
||||
expect(mockGetQueue).toHaveBeenCalledTimes(1)
|
||||
|
||||
await store.update()
|
||||
expect(mockGetQueue).toHaveBeenCalledTimes(2)
|
||||
})
|
||||
})
|
||||
|
||||
describe('update() - sorting', () => {
|
||||
it('should sort tasks by job.priority descending', async () => {
|
||||
const job1 = createHistoryJob(1, 'hist-1')
|
||||
@@ -826,101 +939,94 @@ describe('useQueueStore', () => {
|
||||
})
|
||||
})
|
||||
|
||||
describe('update deduplication', () => {
|
||||
it('should discard stale responses when newer request completes first', async () => {
|
||||
let resolveFirst: QueueResolver
|
||||
let resolveSecond: QueueResolver
|
||||
|
||||
const firstQueuePromise = new Promise<QueueResponse>((resolve) => {
|
||||
resolveFirst = resolve
|
||||
})
|
||||
const secondQueuePromise = new Promise<QueueResponse>((resolve) => {
|
||||
resolveSecond = resolve
|
||||
})
|
||||
describe('update deduplication (coalescing)', () => {
|
||||
it('should coalesce concurrent calls — second call does not fire its own request', async () => {
|
||||
let resolveQueue!: QueueResolver
|
||||
|
||||
mockGetQueue.mockImplementation(
|
||||
() =>
|
||||
new Promise<QueueResponse>((resolve) => {
|
||||
resolveQueue = resolve
|
||||
})
|
||||
)
|
||||
mockGetHistory.mockResolvedValue([])
|
||||
|
||||
mockGetQueue
|
||||
.mockReturnValueOnce(firstQueuePromise)
|
||||
.mockReturnValueOnce(secondQueuePromise)
|
||||
|
||||
const firstUpdate = store.update()
|
||||
const secondUpdate = store.update()
|
||||
|
||||
resolveSecond!({ Running: [], Pending: [createPendingJob(2, 'new-job')] })
|
||||
// Only one HTTP request should have been made
|
||||
expect(mockGetQueue).toHaveBeenCalledTimes(1)
|
||||
|
||||
// Second call returns immediately (coalesced)
|
||||
await secondUpdate
|
||||
|
||||
expect(store.pendingTasks).toHaveLength(1)
|
||||
expect(store.pendingTasks[0].jobId).toBe('new-job')
|
||||
|
||||
resolveFirst!({
|
||||
Running: [],
|
||||
Pending: [createPendingJob(1, 'stale-job')]
|
||||
})
|
||||
// Resolve the in-flight request
|
||||
resolveQueue({ Running: [], Pending: [createPendingJob(2, 'new-job')] })
|
||||
await firstUpdate
|
||||
|
||||
expect(store.pendingTasks).toHaveLength(1)
|
||||
expect(store.pendingTasks[0].jobId).toBe('new-job')
|
||||
|
||||
// A re-fetch fires because dirty was set
|
||||
expect(mockGetQueue).toHaveBeenCalledTimes(2)
|
||||
})
|
||||
|
||||
it('should set isLoading to false only for the latest request', async () => {
|
||||
let resolveFirst: QueueResolver
|
||||
let resolveSecond: QueueResolver
|
||||
|
||||
const firstQueuePromise = new Promise<QueueResponse>((resolve) => {
|
||||
resolveFirst = resolve
|
||||
})
|
||||
const secondQueuePromise = new Promise<QueueResponse>((resolve) => {
|
||||
resolveSecond = resolve
|
||||
})
|
||||
it('should clear isLoading after in-flight request completes', async () => {
|
||||
let resolveQueue!: QueueResolver
|
||||
|
||||
mockGetQueue.mockImplementation(
|
||||
() =>
|
||||
new Promise<QueueResponse>((resolve) => {
|
||||
resolveQueue = resolve
|
||||
})
|
||||
)
|
||||
mockGetHistory.mockResolvedValue([])
|
||||
|
||||
mockGetQueue
|
||||
.mockReturnValueOnce(firstQueuePromise)
|
||||
.mockReturnValueOnce(secondQueuePromise)
|
||||
|
||||
const firstUpdate = store.update()
|
||||
expect(store.isLoading).toBe(true)
|
||||
|
||||
const secondUpdate = store.update()
|
||||
expect(store.isLoading).toBe(true)
|
||||
// Second call coalesces and returns immediately
|
||||
void store.update()
|
||||
|
||||
resolveSecond!({ Running: [], Pending: [] })
|
||||
await secondUpdate
|
||||
|
||||
expect(store.isLoading).toBe(false)
|
||||
|
||||
resolveFirst!({ Running: [], Pending: [] })
|
||||
resolveQueue({ Running: [], Pending: [] })
|
||||
await firstUpdate
|
||||
|
||||
// isLoading is true again because re-fetch was triggered
|
||||
expect(store.isLoading).toBe(true)
|
||||
|
||||
// Resolve the re-fetch
|
||||
resolveQueue({ Running: [], Pending: [] })
|
||||
await new Promise((r) => setTimeout(r, 0))
|
||||
|
||||
expect(store.isLoading).toBe(false)
|
||||
})
|
||||
|
||||
it('should handle stale request failure without affecting latest state', async () => {
|
||||
let resolveSecond: QueueResolver
|
||||
it('should handle in-flight failure and still trigger coalesced re-fetch', async () => {
|
||||
let callCount = 0
|
||||
let resolveSecond!: QueueResolver
|
||||
|
||||
const secondQueuePromise = new Promise<QueueResponse>((resolve) => {
|
||||
resolveSecond = resolve
|
||||
mockGetQueue.mockImplementation(() => {
|
||||
callCount++
|
||||
if (callCount === 1) {
|
||||
return Promise.reject(new Error('network error'))
|
||||
}
|
||||
return new Promise<QueueResponse>((resolve) => {
|
||||
resolveSecond = resolve
|
||||
})
|
||||
})
|
||||
|
||||
mockGetHistory.mockResolvedValue([])
|
||||
|
||||
mockGetQueue
|
||||
.mockRejectedValueOnce(new Error('stale network error'))
|
||||
.mockReturnValueOnce(secondQueuePromise)
|
||||
|
||||
const firstUpdate = store.update()
|
||||
const secondUpdate = store.update()
|
||||
void store.update() // coalesces, sets dirty
|
||||
|
||||
resolveSecond!({ Running: [], Pending: [createPendingJob(2, 'new-job')] })
|
||||
await secondUpdate
|
||||
// First call rejects — but dirty flag triggers re-fetch
|
||||
await expect(firstUpdate).rejects.toThrow('network error')
|
||||
|
||||
expect(store.pendingTasks).toHaveLength(1)
|
||||
expect(store.pendingTasks[0].jobId).toBe('new-job')
|
||||
expect(store.isLoading).toBe(false)
|
||||
// Re-fetch was triggered
|
||||
expect(mockGetQueue).toHaveBeenCalledTimes(2)
|
||||
|
||||
await expect(firstUpdate).rejects.toThrow('stale network error')
|
||||
resolveSecond({ Running: [], Pending: [createPendingJob(2, 'new-job')] })
|
||||
await new Promise((r) => setTimeout(r, 0))
|
||||
|
||||
expect(store.pendingTasks).toHaveLength(1)
|
||||
expect(store.pendingTasks[0].jobId).toBe('new-job')
|
||||
|
||||
@@ -488,8 +488,13 @@ export const useQueueStore = defineStore('queue', () => {
|
||||
const maxHistoryItems = ref(64)
|
||||
const isLoading = ref(false)
|
||||
|
||||
// Scoped per-store instance; incremented to dedupe concurrent update() calls
|
||||
let updateRequestId = 0
|
||||
// Single-flight coalescing: at most one fetch in flight at a time.
|
||||
// If update() is called while a fetch is running, the call is coalesced
|
||||
// and a single re-fetch fires after the current one completes.
|
||||
// This prevents both request spam and UI starvation (where a rapid stream
|
||||
// of calls causes every response to be discarded by a stale-request guard).
|
||||
let inFlight = false
|
||||
let dirty = false
|
||||
|
||||
const tasks = computed<TaskItemImpl[]>(
|
||||
() =>
|
||||
@@ -514,7 +519,13 @@ export const useQueueStore = defineStore('queue', () => {
|
||||
)
|
||||
|
||||
const update = async () => {
|
||||
const requestId = ++updateRequestId
|
||||
if (inFlight) {
|
||||
dirty = true
|
||||
return
|
||||
}
|
||||
|
||||
inFlight = true
|
||||
dirty = false
|
||||
isLoading.value = true
|
||||
try {
|
||||
const [queue, history] = await Promise.all([
|
||||
@@ -522,8 +533,6 @@ export const useQueueStore = defineStore('queue', () => {
|
||||
api.getHistory(maxHistoryItems.value)
|
||||
])
|
||||
|
||||
if (requestId !== updateRequestId) return
|
||||
|
||||
// API returns pre-sorted data (sort_by=create_time&order=desc)
|
||||
runningTasks.value = queue.Running.map((job) => new TaskItemImpl(job))
|
||||
pendingTasks.value = queue.Pending.map((job) => new TaskItemImpl(job))
|
||||
@@ -582,11 +591,10 @@ export const useQueueStore = defineStore('queue', () => {
|
||||
}
|
||||
hasFetchedHistorySnapshot.value = true
|
||||
} finally {
|
||||
// Only clear loading if this is the latest request.
|
||||
// A stale request completing (success or error) should not touch loading state
|
||||
// since a newer request is responsible for it.
|
||||
if (requestId === updateRequestId) {
|
||||
isLoading.value = false
|
||||
isLoading.value = false
|
||||
inFlight = false
|
||||
if (dirty) {
|
||||
void update()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user