mirror of
https://github.com/Comfy-Org/ComfyUI_frontend.git
synced 2026-04-19 22:09:37 +00:00
fix: App mode - handle socket/response race when tracking jobs (#10244)
## Summary Improves the handling of the job tracking where the websocket vs http response are processed in a non-deterministic order ## Changes - **What**: Update onJobStart watch and guard to watch both the activeJobId and the path mapping to ensure both trigger a start, but do not double start ┆Issue is synchronized with this [Notion page](https://www.notion.so/PR-10244-fix-App-mode-handle-socket-response-race-when-tracking-jobs-3276d73d365081f2862bd6cdcce3aac7) by [Unito](https://www.unito.io)
This commit is contained in:
@@ -1312,4 +1312,96 @@ describe('linearOutputStore', () => {
|
||||
).toHaveLength(2)
|
||||
})
|
||||
})
|
||||
|
||||
describe('deferred path mapping (WebSocket/HTTP race)', () => {
|
||||
it('starts tracking when path mapping arrives after activeJobId', async () => {
|
||||
const { nextTick } = await import('vue')
|
||||
const store = useLinearOutputStore()
|
||||
|
||||
// activeJobId set before path mapping exists (WebSocket race)
|
||||
activeJobIdRef.value = 'job-1'
|
||||
await nextTick()
|
||||
|
||||
// No skeleton yet — path mapping is missing
|
||||
expect(store.inProgressItems).toHaveLength(0)
|
||||
|
||||
// Path mapping arrives (HTTP response from queuePrompt)
|
||||
setJobWorkflowPath('job-1', 'workflows/test-workflow.json')
|
||||
await nextTick()
|
||||
|
||||
// Now onJobStart should have fired
|
||||
expect(store.inProgressItems).toHaveLength(1)
|
||||
expect(store.inProgressItems[0].state).toBe('skeleton')
|
||||
expect(store.inProgressItems[0].jobId).toBe('job-1')
|
||||
})
|
||||
|
||||
it('processes executed events after deferred start', async () => {
|
||||
const { nextTick } = await import('vue')
|
||||
const store = useLinearOutputStore()
|
||||
|
||||
activeJobIdRef.value = 'job-1'
|
||||
await nextTick()
|
||||
|
||||
setJobWorkflowPath('job-1', 'workflows/test-workflow.json')
|
||||
await nextTick()
|
||||
|
||||
// Executed event arrives — should create an image item
|
||||
store.onNodeExecuted('job-1', makeExecutedDetail('job-1'))
|
||||
|
||||
const imageItems = store.inProgressItems.filter(
|
||||
(i) => i.state === 'image'
|
||||
)
|
||||
expect(imageItems).toHaveLength(1)
|
||||
})
|
||||
|
||||
it('does not double-start if path mapping is already available', async () => {
|
||||
const { nextTick } = await import('vue')
|
||||
const store = useLinearOutputStore()
|
||||
|
||||
// Path mapping set before activeJobId (normal case, no race)
|
||||
setJobWorkflowPath('job-1', 'workflows/test-workflow.json')
|
||||
activeJobIdRef.value = 'job-1'
|
||||
await nextTick()
|
||||
|
||||
expect(store.inProgressItems).toHaveLength(1)
|
||||
|
||||
// Trigger path mapping update again — should not create a second skeleton
|
||||
setJobWorkflowPath('job-1', 'workflows/test-workflow.json')
|
||||
await nextTick()
|
||||
|
||||
expect(store.inProgressItems).toHaveLength(1)
|
||||
})
|
||||
|
||||
it('ignores deferred mapping if activeJobId changed', async () => {
|
||||
const { nextTick } = await import('vue')
|
||||
const store = useLinearOutputStore()
|
||||
|
||||
activeJobIdRef.value = 'job-1'
|
||||
await nextTick()
|
||||
|
||||
// Job changes before path mapping arrives
|
||||
activeJobIdRef.value = null
|
||||
await nextTick()
|
||||
|
||||
setJobWorkflowPath('job-1', 'workflows/test-workflow.json')
|
||||
await nextTick()
|
||||
|
||||
// Should not have started job-1
|
||||
expect(store.inProgressItems).toHaveLength(0)
|
||||
})
|
||||
|
||||
it('ignores deferred mapping for a different workflow', async () => {
|
||||
const { nextTick } = await import('vue')
|
||||
const store = useLinearOutputStore()
|
||||
|
||||
activeJobIdRef.value = 'job-1'
|
||||
await nextTick()
|
||||
|
||||
// Path maps to a different workflow than the active one
|
||||
setJobWorkflowPath('job-1', 'workflows/other-workflow.json')
|
||||
await nextTick()
|
||||
|
||||
expect(store.inProgressItems).toHaveLength(0)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@@ -262,17 +262,27 @@ export const useLinearOutputStore = defineStore('linearOutput', () => {
|
||||
onNodeExecuted(jobId, detail)
|
||||
}
|
||||
|
||||
// Watch both activeJobId and the path mapping together. The path mapping
|
||||
// may arrive after activeJobId due to a race between WebSocket
|
||||
// (execution_start) and the HTTP response (queuePrompt > storeJob).
|
||||
// Watching both ensures onJobStart fires once the mapping is available.
|
||||
watch(
|
||||
() => executionStore.activeJobId,
|
||||
(jobId, oldJobId) => {
|
||||
[
|
||||
() => executionStore.activeJobId,
|
||||
() => executionStore.jobIdToSessionWorkflowPath
|
||||
],
|
||||
([jobId], [oldJobId]) => {
|
||||
if (!isAppMode.value) return
|
||||
if (oldJobId && oldJobId !== jobId) {
|
||||
onJobComplete(oldJobId)
|
||||
}
|
||||
// Start tracking only if the job belongs to this workflow.
|
||||
// Jobs from other workflows are picked up by reconcileOnEnter
|
||||
// when the user switches to that workflow's tab.
|
||||
if (jobId && isJobForActiveWorkflow(jobId)) {
|
||||
// Guard with trackedJobId to avoid double-starting when the
|
||||
// path mapping arrives after activeJobId was already set.
|
||||
if (
|
||||
jobId &&
|
||||
trackedJobId.value !== jobId &&
|
||||
isJobForActiveWorkflow(jobId)
|
||||
) {
|
||||
onJobStart(jobId)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user