Compare commits

...

1 Commits

Author SHA1 Message Date
bymyself
ebb62a1d57 fix: RAF-batch WebSocket progress events
Defer reactive state updates for progress and progress_state events to requestAnimationFrame callbacks. Multiple progress events within the same frame are coalesced — only the latest value is applied. Pending RAFs are cancelled on unbindExecutionEvents and execution completion.
2026-05-03 23:15:33 -07:00
4 changed files with 477 additions and 7 deletions

View File

@@ -1,6 +1,7 @@
import { setActivePinia } from 'pinia'
import { beforeEach, describe, expect, it, vi } from 'vitest'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { app } from '@/scripts/app'
import { api } from '@/scripts/api'
import { MAX_PROGRESS_JOBS, useExecutionStore } from '@/stores/executionStore'
import { useExecutionErrorStore } from '@/stores/executionErrorStore'
import { useMissingNodesErrorStore } from '@/platform/nodeReplacement/missingNodesErrorStore'
@@ -340,9 +341,12 @@ describe('useExecutionStore - nodeProgressStatesByJob eviction', () => {
handler(
new CustomEvent('progress_state', { detail: { nodes, prompt_id: jobId } })
)
// Flush the RAF so the batched update is applied immediately
vi.advanceTimersByTime(16)
}
beforeEach(() => {
vi.useFakeTimers()
vi.clearAllMocks()
apiEventHandlers.clear()
setActivePinia(createTestingPinia({ stubActions: false }))
@@ -350,6 +354,10 @@ describe('useExecutionStore - nodeProgressStatesByJob eviction', () => {
store.bindExecutionEvents()
})
afterEach(() => {
vi.useRealTimers()
})
it('should retain entries below the limit', () => {
for (let i = 0; i < 5; i++) {
fireProgressState(`job-${i}`, makeProgressNodes(`${i}`, `job-${i}`))
@@ -755,6 +763,312 @@ describe('useMissingNodesErrorStore - setMissingNodeTypes', () => {
})
})
describe('useExecutionStore - RAF batching', () => {
let store: ReturnType<typeof useExecutionStore>
function getRegisteredHandler(eventName: string) {
const calls = vi.mocked(api.addEventListener).mock.calls
const call = calls.find(([name]) => name === eventName)
return call?.[1] as (e: CustomEvent) => void
}
beforeEach(() => {
vi.useFakeTimers()
vi.clearAllMocks()
setActivePinia(createTestingPinia({ stubActions: false }))
store = useExecutionStore()
store.bindExecutionEvents()
})
afterEach(() => {
vi.useRealTimers()
})
describe('handleProgress', () => {
function makeProgressEvent(value: number, max: number): CustomEvent {
return new CustomEvent('progress', {
detail: { value, max, prompt_id: 'job-1', node: '1' }
})
}
it('batches multiple progress events into one reactive update per frame', () => {
const handler = getRegisteredHandler('progress')
handler(makeProgressEvent(1, 10))
handler(makeProgressEvent(5, 10))
handler(makeProgressEvent(9, 10))
expect(store._executingNodeProgress).toBeNull()
vi.advanceTimersByTime(16)
expect(store._executingNodeProgress).toEqual({
value: 9,
max: 10,
prompt_id: 'job-1',
node: '1'
})
})
it('does not update reactive state before RAF fires', () => {
const handler = getRegisteredHandler('progress')
handler(makeProgressEvent(3, 10))
expect(store._executingNodeProgress).toBeNull()
})
it('allows a new batch after the previous RAF fires', () => {
const handler = getRegisteredHandler('progress')
handler(makeProgressEvent(1, 10))
vi.advanceTimersByTime(16)
expect(store._executingNodeProgress).toEqual(
expect.objectContaining({ value: 1 })
)
handler(makeProgressEvent(7, 10))
vi.advanceTimersByTime(16)
expect(store._executingNodeProgress).toEqual(
expect.objectContaining({ value: 7 })
)
})
})
describe('handleProgressState', () => {
function makeProgressStateEvent(
nodeId: string,
state: string,
value = 0,
max = 10
): CustomEvent {
return new CustomEvent('progress_state', {
detail: {
prompt_id: 'job-1',
nodes: {
[nodeId]: {
value,
max,
state,
node_id: nodeId,
prompt_id: 'job-1',
display_node_id: nodeId
}
}
}
})
}
it('batches multiple progress_state events into one reactive update per frame', () => {
const handler = getRegisteredHandler('progress_state')
handler(makeProgressStateEvent('1', 'running', 1))
handler(makeProgressStateEvent('1', 'running', 5))
handler(makeProgressStateEvent('1', 'running', 9))
expect(Object.keys(store.nodeProgressStates)).toHaveLength(0)
vi.advanceTimersByTime(16)
expect(store.nodeProgressStates['1']).toEqual(
expect.objectContaining({ value: 9, state: 'running' })
)
})
it('does not update reactive state before RAF fires', () => {
const handler = getRegisteredHandler('progress_state')
handler(makeProgressStateEvent('1', 'running'))
expect(Object.keys(store.nodeProgressStates)).toHaveLength(0)
})
})
describe('pending RAF is discarded when execution completes', () => {
it('discards pending progress RAF on execution_success', () => {
const progressHandler = getRegisteredHandler('progress')
const startHandler = getRegisteredHandler('execution_start')
const successHandler = getRegisteredHandler('execution_success')
startHandler(
new CustomEvent('execution_start', {
detail: { prompt_id: 'job-1', timestamp: 0 }
})
)
progressHandler(
new CustomEvent('progress', {
detail: { value: 5, max: 10, prompt_id: 'job-1', node: '1' }
})
)
successHandler(
new CustomEvent('execution_success', {
detail: { prompt_id: 'job-1', timestamp: 0 }
})
)
vi.advanceTimersByTime(16)
expect(store._executingNodeProgress).toBeNull()
})
it('discards pending progress_state RAF on execution_success', () => {
const progressStateHandler = getRegisteredHandler('progress_state')
const startHandler = getRegisteredHandler('execution_start')
const successHandler = getRegisteredHandler('execution_success')
startHandler(
new CustomEvent('execution_start', {
detail: { prompt_id: 'job-1', timestamp: 0 }
})
)
progressStateHandler(
new CustomEvent('progress_state', {
detail: {
prompt_id: 'job-1',
nodes: {
'1': {
value: 5,
max: 10,
state: 'running',
node_id: '1',
prompt_id: 'job-1',
display_node_id: '1'
}
}
}
})
)
successHandler(
new CustomEvent('execution_success', {
detail: { prompt_id: 'job-1', timestamp: 0 }
})
)
vi.advanceTimersByTime(16)
expect(Object.keys(store.nodeProgressStates)).toHaveLength(0)
})
it('discards pending progress RAF on execution_error', () => {
const progressHandler = getRegisteredHandler('progress')
const startHandler = getRegisteredHandler('execution_start')
const errorHandler = getRegisteredHandler('execution_error')
startHandler(
new CustomEvent('execution_start', {
detail: { prompt_id: 'job-1', timestamp: 0 }
})
)
progressHandler(
new CustomEvent('progress', {
detail: { value: 5, max: 10, prompt_id: 'job-1', node: '1' }
})
)
errorHandler(
new CustomEvent('execution_error', {
detail: {
prompt_id: 'job-1',
node_id: '1',
node_type: 'TestNode',
exception_message: 'error',
exception_type: 'RuntimeError',
traceback: []
}
})
)
vi.advanceTimersByTime(16)
expect(store._executingNodeProgress).toBeNull()
})
it('discards pending progress RAF on execution_interrupted', () => {
const progressHandler = getRegisteredHandler('progress')
const startHandler = getRegisteredHandler('execution_start')
const interruptedHandler = getRegisteredHandler('execution_interrupted')
startHandler(
new CustomEvent('execution_start', {
detail: { prompt_id: 'job-1', timestamp: 0 }
})
)
progressHandler(
new CustomEvent('progress', {
detail: { value: 5, max: 10, prompt_id: 'job-1', node: '1' }
})
)
interruptedHandler(
new CustomEvent('execution_interrupted', {
detail: {
prompt_id: 'job-1',
node_id: '1',
node_type: 'TestNode',
executed: []
}
})
)
vi.advanceTimersByTime(16)
expect(store._executingNodeProgress).toBeNull()
})
})
describe('unbindExecutionEvents cancels pending RAFs', () => {
it('cancels pending progress RAF on unbind', () => {
const handler = getRegisteredHandler('progress')
handler(
new CustomEvent('progress', {
detail: { value: 5, max: 10, prompt_id: 'job-1', node: '1' }
})
)
store.unbindExecutionEvents()
vi.advanceTimersByTime(16)
expect(store._executingNodeProgress).toBeNull()
})
it('cancels pending progress_state RAF on unbind', () => {
const handler = getRegisteredHandler('progress_state')
handler(
new CustomEvent('progress_state', {
detail: {
prompt_id: 'job-1',
nodes: {
'1': {
value: 0,
max: 10,
state: 'running',
node_id: '1',
prompt_id: 'job-1',
display_node_id: '1'
}
}
}
})
)
store.unbindExecutionEvents()
vi.advanceTimersByTime(16)
expect(Object.keys(store.nodeProgressStates)).toHaveLength(0)
})
})
})
describe('useExecutionStore - WebSocket event handlers', () => {
let store: ReturnType<typeof useExecutionStore>
@@ -895,12 +1209,21 @@ describe('useExecutionStore - WebSocket event handlers', () => {
})
describe('progress', () => {
it('sets _executingNodeProgress from the event payload', () => {
const payload = { value: 3, max: 10, prompt_id: 'job-1', node: 'n1' }
it('sets _executingNodeProgress from the event payload (RAF-batched)', () => {
vi.useFakeTimers()
try {
const payload = { value: 3, max: 10, prompt_id: 'job-1', node: 'n1' }
fire('progress', payload)
fire('progress', payload)
// RAF-batched: not applied synchronously
expect(store._executingNodeProgress).toBeNull()
expect(store._executingNodeProgress).toEqual(payload)
vi.advanceTimersByTime(16)
expect(store._executingNodeProgress).toEqual(payload)
} finally {
vi.useRealTimers()
}
})
})

View File

@@ -35,6 +35,7 @@ import { useExecutionErrorStore } from '@/stores/executionErrorStore'
import type { NodeLocatorId } from '@/types/nodeIdentification'
import { classifyCloudValidationError } from '@/utils/executionErrorUtil'
import { executionIdToNodeLocatorId } from '@/utils/graphTraversalUtil'
import { createRafCoalescer } from '@/utils/rafBatch'
interface QueuedJob {
/**
@@ -244,6 +245,8 @@ export const useExecutionStore = defineStore('execution', () => {
api.removeEventListener('status', handleStatus)
api.removeEventListener('execution_error', handleExecutionError)
api.removeEventListener('progress_text', handleProgressText)
cancelPendingProgressUpdates()
}
function handleExecutionStart(e: CustomEvent<ExecutionStartWsMessage>) {
@@ -292,6 +295,10 @@ export const useExecutionStore = defineStore('execution', () => {
}
function handleExecuting(e: CustomEvent<NodeId | null>): void {
// Cancel any pending progress RAF before clearing state to prevent
// stale data from being written back on the next frame.
progressCoalescer.cancel()
// Clear the current node progress when a new node starts executing
_executingNodeProgress.value = null
@@ -334,8 +341,15 @@ export const useExecutionStore = defineStore('execution', () => {
nodeProgressStatesByJob.value = pruned
}
const progressStateCoalescer =
createRafCoalescer<ProgressStateWsMessage>(_applyProgressState)
function handleProgressState(e: CustomEvent<ProgressStateWsMessage>) {
const { nodes, prompt_id: jobId } = e.detail
progressStateCoalescer.push(e.detail)
}
function _applyProgressState(detail: ProgressStateWsMessage) {
const { nodes, prompt_id: jobId } = detail
// Revoke previews for nodes that are starting to execute
const previousForJob = nodeProgressStatesByJob.value[jobId] || {}
@@ -371,8 +385,17 @@ export const useExecutionStore = defineStore('execution', () => {
}
}
const progressCoalescer = createRafCoalescer<ProgressWsMessage>((detail) => {
_executingNodeProgress.value = detail
})
function handleProgress(e: CustomEvent<ProgressWsMessage>) {
_executingNodeProgress.value = e.detail
progressCoalescer.push(e.detail)
}
function cancelPendingProgressUpdates() {
progressCoalescer.cancel()
progressStateCoalescer.cancel()
}
function handleStatus() {
@@ -494,6 +517,8 @@ export const useExecutionStore = defineStore('execution', () => {
* Reset execution-related state after a run completes or is stopped.
*/
function resetExecutionState(jobIdParam?: JobId | null) {
cancelPendingProgressUpdates()
executionIdToLocatorCache.clear()
nodeProgressStates.value = {}
const jobId = jobIdParam ?? activeJobId.value ?? null

View File

@@ -0,0 +1,85 @@
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { createRafCoalescer } from '@/utils/rafBatch'
describe('createRafCoalescer', () => {
beforeEach(() => {
vi.useFakeTimers()
})
afterEach(() => {
vi.useRealTimers()
})
it('applies the latest pushed value on the next frame', () => {
const apply = vi.fn()
const coalescer = createRafCoalescer<number>(apply)
coalescer.push(1)
coalescer.push(2)
coalescer.push(3)
expect(apply).not.toHaveBeenCalled()
vi.advanceTimersByTime(16)
expect(apply).toHaveBeenCalledOnce()
expect(apply).toHaveBeenCalledWith(3)
})
it('does not apply after cancel', () => {
const apply = vi.fn()
const coalescer = createRafCoalescer<number>(apply)
coalescer.push(42)
coalescer.cancel()
vi.advanceTimersByTime(16)
expect(apply).not.toHaveBeenCalled()
})
it('applies immediately on flush', () => {
const apply = vi.fn()
const coalescer = createRafCoalescer<number>(apply)
coalescer.push(99)
coalescer.flush()
expect(apply).toHaveBeenCalledOnce()
expect(apply).toHaveBeenCalledWith(99)
})
it('does nothing on flush when no value is pending', () => {
const apply = vi.fn()
const coalescer = createRafCoalescer<number>(apply)
coalescer.flush()
expect(apply).not.toHaveBeenCalled()
})
it('does not double-apply after flush', () => {
const apply = vi.fn()
const coalescer = createRafCoalescer<number>(apply)
coalescer.push(1)
coalescer.flush()
vi.advanceTimersByTime(16)
expect(apply).toHaveBeenCalledOnce()
})
it('reports scheduled state correctly', () => {
const coalescer = createRafCoalescer<number>(vi.fn())
expect(coalescer.isScheduled()).toBe(false)
coalescer.push(1)
expect(coalescer.isScheduled()).toBe(true)
vi.advanceTimersByTime(16)
expect(coalescer.isScheduled()).toBe(false)
})
})

View File

@@ -27,3 +27,40 @@ export function createRafBatch(run: () => void) {
return { schedule, cancel, flush, isScheduled }
}
/**
* Last-write-wins RAF coalescer. Buffers the latest value and applies it
* on the next animation frame, coalescing multiple pushes into a single
* reactive update.
*/
export function createRafCoalescer<T>(apply: (value: T) => void) {
let hasPending = false
let pendingValue: T | undefined
const batch = createRafBatch(() => {
if (!hasPending) return
const value = pendingValue as T
hasPending = false
pendingValue = undefined
apply(value)
})
const push = (value: T) => {
pendingValue = value
hasPending = true
batch.schedule()
}
const cancel = () => {
hasPending = false
pendingValue = undefined
batch.cancel()
}
const flush = () => {
if (!hasPending) return
batch.flush()
}
return { push, cancel, flush, isScheduled: batch.isScheduled }
}