mirror of
https://github.com/Comfy-Org/ComfyUI_frontend.git
synced 2026-04-28 18:22:40 +00:00
[backport cloud/1.38] fix: dedupe queueStore.update() to prevent race conditions (#8558)
Backport of #8523 to `cloud/1.38` Automatically created by backport workflow. ┆Issue is synchronized with this [Notion page](https://www.notion.so/PR-8558-backport-cloud-1-38-fix-dedupe-queueStore-update-to-prevent-race-conditions-2fc6d73d3650812494dec497cc6b01fa) by [Unito](https://www.unito.io) Co-authored-by: Christian Byrne <cbyrne@comfy.org> Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
@@ -44,6 +44,9 @@ const createTaskOutput = (
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
type QueueResponse = { Running: JobListItem[]; Pending: JobListItem[] }
|
||||||
|
type QueueResolver = (value: QueueResponse) => void
|
||||||
|
|
||||||
// Mock API
|
// Mock API
|
||||||
vi.mock('@/scripts/api', () => ({
|
vi.mock('@/scripts/api', () => ({
|
||||||
api: {
|
api: {
|
||||||
@@ -797,4 +800,106 @@ describe('useQueueStore', () => {
|
|||||||
expect(mockGetHistory).toHaveBeenCalled()
|
expect(mockGetHistory).toHaveBeenCalled()
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
describe('update deduplication', () => {
|
||||||
|
it('should discard stale responses when newer request completes first', async () => {
|
||||||
|
let resolveFirst: QueueResolver
|
||||||
|
let resolveSecond: QueueResolver
|
||||||
|
|
||||||
|
const firstQueuePromise = new Promise<QueueResponse>((resolve) => {
|
||||||
|
resolveFirst = resolve
|
||||||
|
})
|
||||||
|
const secondQueuePromise = new Promise<QueueResponse>((resolve) => {
|
||||||
|
resolveSecond = resolve
|
||||||
|
})
|
||||||
|
|
||||||
|
mockGetHistory.mockResolvedValue([])
|
||||||
|
|
||||||
|
mockGetQueue
|
||||||
|
.mockReturnValueOnce(firstQueuePromise)
|
||||||
|
.mockReturnValueOnce(secondQueuePromise)
|
||||||
|
|
||||||
|
const firstUpdate = store.update()
|
||||||
|
const secondUpdate = store.update()
|
||||||
|
|
||||||
|
resolveSecond!({ Running: [], Pending: [createPendingJob(2, 'new-job')] })
|
||||||
|
await secondUpdate
|
||||||
|
|
||||||
|
expect(store.pendingTasks).toHaveLength(1)
|
||||||
|
expect(store.pendingTasks[0].promptId).toBe('new-job')
|
||||||
|
|
||||||
|
resolveFirst!({
|
||||||
|
Running: [],
|
||||||
|
Pending: [createPendingJob(1, 'stale-job')]
|
||||||
|
})
|
||||||
|
await firstUpdate
|
||||||
|
|
||||||
|
expect(store.pendingTasks).toHaveLength(1)
|
||||||
|
expect(store.pendingTasks[0].promptId).toBe('new-job')
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should set isLoading to false only for the latest request', async () => {
|
||||||
|
let resolveFirst: QueueResolver
|
||||||
|
let resolveSecond: QueueResolver
|
||||||
|
|
||||||
|
const firstQueuePromise = new Promise<QueueResponse>((resolve) => {
|
||||||
|
resolveFirst = resolve
|
||||||
|
})
|
||||||
|
const secondQueuePromise = new Promise<QueueResponse>((resolve) => {
|
||||||
|
resolveSecond = resolve
|
||||||
|
})
|
||||||
|
|
||||||
|
mockGetHistory.mockResolvedValue([])
|
||||||
|
|
||||||
|
mockGetQueue
|
||||||
|
.mockReturnValueOnce(firstQueuePromise)
|
||||||
|
.mockReturnValueOnce(secondQueuePromise)
|
||||||
|
|
||||||
|
const firstUpdate = store.update()
|
||||||
|
expect(store.isLoading).toBe(true)
|
||||||
|
|
||||||
|
const secondUpdate = store.update()
|
||||||
|
expect(store.isLoading).toBe(true)
|
||||||
|
|
||||||
|
resolveSecond!({ Running: [], Pending: [] })
|
||||||
|
await secondUpdate
|
||||||
|
|
||||||
|
expect(store.isLoading).toBe(false)
|
||||||
|
|
||||||
|
resolveFirst!({ Running: [], Pending: [] })
|
||||||
|
await firstUpdate
|
||||||
|
|
||||||
|
expect(store.isLoading).toBe(false)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should handle stale request failure without affecting latest state', async () => {
|
||||||
|
let resolveSecond: QueueResolver
|
||||||
|
|
||||||
|
const secondQueuePromise = new Promise<QueueResponse>((resolve) => {
|
||||||
|
resolveSecond = resolve
|
||||||
|
})
|
||||||
|
|
||||||
|
mockGetHistory.mockResolvedValue([])
|
||||||
|
|
||||||
|
mockGetQueue
|
||||||
|
.mockRejectedValueOnce(new Error('stale network error'))
|
||||||
|
.mockReturnValueOnce(secondQueuePromise)
|
||||||
|
|
||||||
|
const firstUpdate = store.update()
|
||||||
|
const secondUpdate = store.update()
|
||||||
|
|
||||||
|
resolveSecond!({ Running: [], Pending: [createPendingJob(2, 'new-job')] })
|
||||||
|
await secondUpdate
|
||||||
|
|
||||||
|
expect(store.pendingTasks).toHaveLength(1)
|
||||||
|
expect(store.pendingTasks[0].promptId).toBe('new-job')
|
||||||
|
expect(store.isLoading).toBe(false)
|
||||||
|
|
||||||
|
await expect(firstUpdate).rejects.toThrow('stale network error')
|
||||||
|
|
||||||
|
expect(store.pendingTasks).toHaveLength(1)
|
||||||
|
expect(store.pendingTasks[0].promptId).toBe('new-job')
|
||||||
|
expect(store.isLoading).toBe(false)
|
||||||
|
})
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -475,6 +475,9 @@ export const useQueueStore = defineStore('queue', () => {
|
|||||||
const maxHistoryItems = ref(64)
|
const maxHistoryItems = ref(64)
|
||||||
const isLoading = ref(false)
|
const isLoading = ref(false)
|
||||||
|
|
||||||
|
// Scoped per-store instance; incremented to dedupe concurrent update() calls
|
||||||
|
let updateRequestId = 0
|
||||||
|
|
||||||
const tasks = computed<TaskItemImpl[]>(
|
const tasks = computed<TaskItemImpl[]>(
|
||||||
() =>
|
() =>
|
||||||
[
|
[
|
||||||
@@ -498,6 +501,7 @@ export const useQueueStore = defineStore('queue', () => {
|
|||||||
)
|
)
|
||||||
|
|
||||||
const update = async () => {
|
const update = async () => {
|
||||||
|
const requestId = ++updateRequestId
|
||||||
isLoading.value = true
|
isLoading.value = true
|
||||||
try {
|
try {
|
||||||
const [queue, history] = await Promise.all([
|
const [queue, history] = await Promise.all([
|
||||||
@@ -505,6 +509,8 @@ export const useQueueStore = defineStore('queue', () => {
|
|||||||
api.getHistory(maxHistoryItems.value)
|
api.getHistory(maxHistoryItems.value)
|
||||||
])
|
])
|
||||||
|
|
||||||
|
if (requestId !== updateRequestId) return
|
||||||
|
|
||||||
// API returns pre-sorted data (sort_by=create_time&order=desc)
|
// API returns pre-sorted data (sort_by=create_time&order=desc)
|
||||||
runningTasks.value = queue.Running.map((job) => new TaskItemImpl(job))
|
runningTasks.value = queue.Running.map((job) => new TaskItemImpl(job))
|
||||||
pendingTasks.value = queue.Pending.map((job) => new TaskItemImpl(job))
|
pendingTasks.value = queue.Pending.map((job) => new TaskItemImpl(job))
|
||||||
@@ -545,7 +551,12 @@ export const useQueueStore = defineStore('queue', () => {
|
|||||||
return existing
|
return existing
|
||||||
})
|
})
|
||||||
} finally {
|
} finally {
|
||||||
isLoading.value = false
|
// Only clear loading if this is the latest request.
|
||||||
|
// A stale request completing (success or error) should not touch loading state
|
||||||
|
// since a newer request is responsible for it.
|
||||||
|
if (requestId === updateRequestId) {
|
||||||
|
isLoading.value = false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user