refactor: extract createRafCoalescer utility for last-write-wins RAF batching

Replace manual _pending* variables + createRafBatch + null-check
callbacks with a typed createRafCoalescer<T> that encapsulates the
coalescing pattern. Extract cancelPendingProgressUpdates() helper
to deduplicate the 3 cancel sites in executionStore.
This commit is contained in:
bymyself
2026-03-25 12:42:58 -07:00
parent d4b2264091
commit 49d2cbd1fa
3 changed files with 137 additions and 30 deletions

View File

@@ -33,7 +33,7 @@ import { useExecutionErrorStore } from '@/stores/executionErrorStore'
import type { NodeLocatorId } from '@/types/nodeIdentification'
import { classifyCloudValidationError } from '@/utils/executionErrorUtil'
import { executionIdToNodeLocatorId } from '@/utils/graphTraversalUtil'
import { createRafBatch } from '@/utils/rafBatch'
import { createRafCoalescer } from '@/utils/rafBatch'
interface QueuedJob {
/**
@@ -244,10 +244,7 @@ export const useExecutionStore = defineStore('execution', () => {
api.removeEventListener('execution_error', handleExecutionError)
api.removeEventListener('progress_text', handleProgressText)
progressBatch.cancel()
_pendingProgress = null
progressStateBatch.cancel()
_pendingProgressState = null
cancelPendingProgressUpdates()
}
function handleExecutionStart(e: CustomEvent<ExecutionStartWsMessage>) {
@@ -298,8 +295,7 @@ export const useExecutionStore = defineStore('execution', () => {
function handleExecuting(e: CustomEvent<NodeId | null>): void {
// Cancel any pending progress RAF before clearing state to prevent
// stale data from being written back on the next frame.
progressBatch.cancel()
_pendingProgress = null
progressCoalescer.cancel()
// Clear the current node progress when a new node starts executing
_executingNodeProgress.value = null
@@ -343,17 +339,11 @@ export const useExecutionStore = defineStore('execution', () => {
nodeProgressStatesByJob.value = pruned
}
let _pendingProgressState: ProgressStateWsMessage | null = null
const progressStateBatch = createRafBatch(() => {
if (_pendingProgressState) {
_applyProgressState(_pendingProgressState)
_pendingProgressState = null
}
})
const progressStateCoalescer =
createRafCoalescer<ProgressStateWsMessage>(_applyProgressState)
function handleProgressState(e: CustomEvent<ProgressStateWsMessage>) {
_pendingProgressState = e.detail
progressStateBatch.schedule()
progressStateCoalescer.push(e.detail)
}
function _applyProgressState(detail: ProgressStateWsMessage) {
@@ -393,17 +383,17 @@ export const useExecutionStore = defineStore('execution', () => {
}
}
let _pendingProgress: ProgressWsMessage | null = null
const progressBatch = createRafBatch(() => {
if (_pendingProgress) {
_executingNodeProgress.value = _pendingProgress
_pendingProgress = null
}
const progressCoalescer = createRafCoalescer<ProgressWsMessage>((detail) => {
_executingNodeProgress.value = detail
})
function handleProgress(e: CustomEvent<ProgressWsMessage>) {
_pendingProgress = e.detail
progressBatch.schedule()
progressCoalescer.push(e.detail)
}
function cancelPendingProgressUpdates() {
progressCoalescer.cancel()
progressStateCoalescer.cancel()
}
function handleStatus() {
@@ -525,12 +515,7 @@ export const useExecutionStore = defineStore('execution', () => {
* Reset execution-related state after a run completes or is stopped.
*/
function resetExecutionState(jobIdParam?: string | null) {
// Cancel pending RAFs before clearing state to prevent stale data
// from being written back on the next frame.
progressBatch.cancel()
_pendingProgress = null
progressStateBatch.cancel()
_pendingProgressState = null
cancelPendingProgressUpdates()
executionIdToLocatorCache.clear()
nodeProgressStates.value = {}

View File

@@ -0,0 +1,85 @@
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { createRafCoalescer } from '@/utils/rafBatch'
describe('createRafCoalescer', () => {
beforeEach(() => {
vi.useFakeTimers()
})
afterEach(() => {
vi.useRealTimers()
})
it('applies the latest pushed value on the next frame', () => {
const apply = vi.fn()
const coalescer = createRafCoalescer<number>(apply)
coalescer.push(1)
coalescer.push(2)
coalescer.push(3)
expect(apply).not.toHaveBeenCalled()
vi.advanceTimersByTime(16)
expect(apply).toHaveBeenCalledOnce()
expect(apply).toHaveBeenCalledWith(3)
})
it('does not apply after cancel', () => {
const apply = vi.fn()
const coalescer = createRafCoalescer<number>(apply)
coalescer.push(42)
coalescer.cancel()
vi.advanceTimersByTime(16)
expect(apply).not.toHaveBeenCalled()
})
it('applies immediately on flush', () => {
const apply = vi.fn()
const coalescer = createRafCoalescer<number>(apply)
coalescer.push(99)
coalescer.flush()
expect(apply).toHaveBeenCalledOnce()
expect(apply).toHaveBeenCalledWith(99)
})
it('does nothing on flush when no value is pending', () => {
const apply = vi.fn()
const coalescer = createRafCoalescer<number>(apply)
coalescer.flush()
expect(apply).not.toHaveBeenCalled()
})
it('does not double-apply after flush', () => {
const apply = vi.fn()
const coalescer = createRafCoalescer<number>(apply)
coalescer.push(1)
coalescer.flush()
vi.advanceTimersByTime(16)
expect(apply).toHaveBeenCalledOnce()
})
it('reports scheduled state correctly', () => {
const coalescer = createRafCoalescer<number>(vi.fn())
expect(coalescer.isScheduled()).toBe(false)
coalescer.push(1)
expect(coalescer.isScheduled()).toBe(true)
vi.advanceTimersByTime(16)
expect(coalescer.isScheduled()).toBe(false)
})
})

View File

@@ -27,3 +27,40 @@ export function createRafBatch(run: () => void) {
return { schedule, cancel, flush, isScheduled }
}
/**
* Last-write-wins RAF coalescer. Buffers the latest value and applies it
* on the next animation frame, coalescing multiple pushes into a single
* reactive update.
*/
export function createRafCoalescer<T>(apply: (value: T) => void) {
let hasPending = false
let pendingValue: T | undefined
const batch = createRafBatch(() => {
if (!hasPending) return
const value = pendingValue as T
hasPending = false
pendingValue = undefined
apply(value)
})
const push = (value: T) => {
pendingValue = value
hasPending = true
batch.schedule()
}
const cancel = () => {
hasPending = false
pendingValue = undefined
batch.cancel()
}
const flush = () => {
if (!hasPending) return
batch.flush()
}
return { push, cancel, flush, isScheduled: batch.isScheduled }
}