diff --git a/src/stores/executionStore.ts b/src/stores/executionStore.ts index 3f31a6ca6e..c4d7745cb6 100644 --- a/src/stores/executionStore.ts +++ b/src/stores/executionStore.ts @@ -33,7 +33,7 @@ import { useExecutionErrorStore } from '@/stores/executionErrorStore' import type { NodeLocatorId } from '@/types/nodeIdentification' import { classifyCloudValidationError } from '@/utils/executionErrorUtil' import { executionIdToNodeLocatorId } from '@/utils/graphTraversalUtil' -import { createRafBatch } from '@/utils/rafBatch' +import { createRafCoalescer } from '@/utils/rafBatch' interface QueuedJob { /** @@ -244,10 +244,7 @@ export const useExecutionStore = defineStore('execution', () => { api.removeEventListener('execution_error', handleExecutionError) api.removeEventListener('progress_text', handleProgressText) - progressBatch.cancel() - _pendingProgress = null - progressStateBatch.cancel() - _pendingProgressState = null + cancelPendingProgressUpdates() } function handleExecutionStart(e: CustomEvent) { @@ -298,8 +295,7 @@ export const useExecutionStore = defineStore('execution', () => { function handleExecuting(e: CustomEvent): void { // Cancel any pending progress RAF before clearing state to prevent // stale data from being written back on the next frame. - progressBatch.cancel() - _pendingProgress = null + progressCoalescer.cancel() // Clear the current node progress when a new node starts executing _executingNodeProgress.value = null @@ -343,17 +339,11 @@ export const useExecutionStore = defineStore('execution', () => { nodeProgressStatesByJob.value = pruned } - let _pendingProgressState: ProgressStateWsMessage | null = null - const progressStateBatch = createRafBatch(() => { - if (_pendingProgressState) { - _applyProgressState(_pendingProgressState) - _pendingProgressState = null - } - }) + const progressStateCoalescer = + createRafCoalescer(_applyProgressState) function handleProgressState(e: CustomEvent) { - _pendingProgressState = e.detail - progressStateBatch.schedule() + progressStateCoalescer.push(e.detail) } function _applyProgressState(detail: ProgressStateWsMessage) { @@ -393,17 +383,17 @@ export const useExecutionStore = defineStore('execution', () => { } } - let _pendingProgress: ProgressWsMessage | null = null - const progressBatch = createRafBatch(() => { - if (_pendingProgress) { - _executingNodeProgress.value = _pendingProgress - _pendingProgress = null - } + const progressCoalescer = createRafCoalescer((detail) => { + _executingNodeProgress.value = detail }) function handleProgress(e: CustomEvent) { - _pendingProgress = e.detail - progressBatch.schedule() + progressCoalescer.push(e.detail) + } + + function cancelPendingProgressUpdates() { + progressCoalescer.cancel() + progressStateCoalescer.cancel() } function handleStatus() { @@ -525,12 +515,7 @@ export const useExecutionStore = defineStore('execution', () => { * Reset execution-related state after a run completes or is stopped. */ function resetExecutionState(jobIdParam?: string | null) { - // Cancel pending RAFs before clearing state to prevent stale data - // from being written back on the next frame. - progressBatch.cancel() - _pendingProgress = null - progressStateBatch.cancel() - _pendingProgressState = null + cancelPendingProgressUpdates() executionIdToLocatorCache.clear() nodeProgressStates.value = {} diff --git a/src/utils/rafBatch.test.ts b/src/utils/rafBatch.test.ts new file mode 100644 index 0000000000..125fece2d5 --- /dev/null +++ b/src/utils/rafBatch.test.ts @@ -0,0 +1,85 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' + +import { createRafCoalescer } from '@/utils/rafBatch' + +describe('createRafCoalescer', () => { + beforeEach(() => { + vi.useFakeTimers() + }) + + afterEach(() => { + vi.useRealTimers() + }) + + it('applies the latest pushed value on the next frame', () => { + const apply = vi.fn() + const coalescer = createRafCoalescer(apply) + + coalescer.push(1) + coalescer.push(2) + coalescer.push(3) + + expect(apply).not.toHaveBeenCalled() + + vi.advanceTimersByTime(16) + + expect(apply).toHaveBeenCalledOnce() + expect(apply).toHaveBeenCalledWith(3) + }) + + it('does not apply after cancel', () => { + const apply = vi.fn() + const coalescer = createRafCoalescer(apply) + + coalescer.push(42) + coalescer.cancel() + + vi.advanceTimersByTime(16) + + expect(apply).not.toHaveBeenCalled() + }) + + it('applies immediately on flush', () => { + const apply = vi.fn() + const coalescer = createRafCoalescer(apply) + + coalescer.push(99) + coalescer.flush() + + expect(apply).toHaveBeenCalledOnce() + expect(apply).toHaveBeenCalledWith(99) + }) + + it('does nothing on flush when no value is pending', () => { + const apply = vi.fn() + const coalescer = createRafCoalescer(apply) + + coalescer.flush() + + expect(apply).not.toHaveBeenCalled() + }) + + it('does not double-apply after flush', () => { + const apply = vi.fn() + const coalescer = createRafCoalescer(apply) + + coalescer.push(1) + coalescer.flush() + + vi.advanceTimersByTime(16) + + expect(apply).toHaveBeenCalledOnce() + }) + + it('reports scheduled state correctly', () => { + const coalescer = createRafCoalescer(vi.fn()) + + expect(coalescer.isScheduled()).toBe(false) + + coalescer.push(1) + expect(coalescer.isScheduled()).toBe(true) + + vi.advanceTimersByTime(16) + expect(coalescer.isScheduled()).toBe(false) + }) +}) diff --git a/src/utils/rafBatch.ts b/src/utils/rafBatch.ts index a8756ef245..6095addec1 100644 --- a/src/utils/rafBatch.ts +++ b/src/utils/rafBatch.ts @@ -27,3 +27,40 @@ export function createRafBatch(run: () => void) { return { schedule, cancel, flush, isScheduled } } + +/** + * Last-write-wins RAF coalescer. Buffers the latest value and applies it + * on the next animation frame, coalescing multiple pushes into a single + * reactive update. + */ +export function createRafCoalescer(apply: (value: T) => void) { + let hasPending = false + let pendingValue: T | undefined + + const batch = createRafBatch(() => { + if (!hasPending) return + const value = pendingValue as T + hasPending = false + pendingValue = undefined + apply(value) + }) + + const push = (value: T) => { + pendingValue = value + hasPending = true + batch.schedule() + } + + const cancel = () => { + hasPending = false + pendingValue = undefined + batch.cancel() + } + + const flush = () => { + if (!hasPending) return + batch.flush() + } + + return { push, cancel, flush, isScheduled: batch.isScheduled } +}