mirror of
https://github.com/Comfy-Org/ComfyUI_frontend.git
synced 2026-03-13 09:00:16 +00:00
fix: cap nodeProgressStatesByJob to prevent unbounded memory growth (#9249)
This has semi-significant performance impact if you use the same workflow for more than a day. I left Comfy running with a mouse jiggler and this reduced my instance to a crawl after about an hour. `nodeProgressStatesByJob` accumulated an entry for every job that ever executed during a session. In long-running sessions this grew without bound, since entries were only removed for the active job on `resetExecutionState`. Add `MAX_PROGRESS_JOBS` (1000) and `evictOldProgressJobs()`, called after each `handleProgressState` update. When the map exceeds the limit, the oldest entries (by ES2015+ insertion order) are pruned — keeping only the most recent 1000. This mirrors the pattern used by assetsStore's `MAX_HISTORY_ITEMS`. Also adds tests for: - nodeLocationProgressStates computed reactivity (recomputes on wholesale replacement, produces new references) - Eviction behavior (retains below limit, evicts oldest above limit, preserves most recent, no-op when updating existing job) - API event handler wiring via captured apiEventHandlers map Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> ┆Issue is synchronized with this [Notion page](https://www.notion.so/PR-9249-fix-cap-nodeProgressStatesByJob-to-prevent-unbounded-memory-growth-3136d73d365081e49b36d8ade0d4dd6e) by [Unito](https://www.unito.io) --------- Co-authored-by: GitHub Action <action@github.com>
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
import { setActivePinia } from 'pinia'
|
||||
import { beforeEach, describe, expect, it, vi } from 'vitest'
|
||||
import { app } from '@/scripts/app'
|
||||
import { useExecutionStore } from '@/stores/executionStore'
|
||||
import { MAX_PROGRESS_JOBS, useExecutionStore } from '@/stores/executionStore'
|
||||
import { useExecutionErrorStore } from '@/stores/executionErrorStore'
|
||||
import { executionIdToNodeLocatorId } from '@/utils/graphTraversalUtil'
|
||||
|
||||
@@ -11,6 +11,7 @@ const mockNodeIdToNodeLocatorId = vi.fn()
|
||||
const mockNodeLocatorIdToNodeExecutionId = vi.fn()
|
||||
|
||||
import type * as WorkflowStoreModule from '@/platform/workflow/management/stores/workflowStore'
|
||||
import type { NodeProgressState } from '@/schemas/apiSchema'
|
||||
import { createMockLGraphNode } from '@/utils/__tests__/litegraphTestUtils'
|
||||
import { createTestingPinia } from '@pinia/testing'
|
||||
|
||||
@@ -40,6 +41,37 @@ vi.mock('@/composables/node/useNodeProgressText', () => ({
|
||||
})
|
||||
}))
|
||||
|
||||
/**
|
||||
* Captures event handlers registered via api.addEventListener so tests
|
||||
* can invoke them directly (e.g. to simulate WebSocket progress events).
|
||||
*/
|
||||
type EventHandler = (...args: unknown[]) => void
|
||||
const apiEventHandlers = new Map<string, EventHandler>()
|
||||
vi.mock('@/scripts/api', () => ({
|
||||
api: {
|
||||
addEventListener: vi.fn((event: string, handler: EventHandler) => {
|
||||
apiEventHandlers.set(event, handler)
|
||||
}),
|
||||
removeEventListener: vi.fn((event: string) => {
|
||||
apiEventHandlers.delete(event)
|
||||
}),
|
||||
clientId: 'test-client',
|
||||
apiURL: vi.fn((path: string) => `/api${path}`)
|
||||
}
|
||||
}))
|
||||
|
||||
vi.mock('@/stores/imagePreviewStore', () => ({
|
||||
useNodeOutputStore: () => ({
|
||||
revokePreviewsByExecutionId: vi.fn()
|
||||
})
|
||||
}))
|
||||
|
||||
vi.mock('@/stores/jobPreviewStore', () => ({
|
||||
useJobPreviewStore: () => ({
|
||||
clearPreview: vi.fn()
|
||||
})
|
||||
}))
|
||||
|
||||
// Mock the app import with proper implementation
|
||||
vi.mock('@/scripts/app', () => ({
|
||||
app: {
|
||||
@@ -270,6 +302,92 @@ describe('useExecutionStore - nodeLocationProgressStates caching', () => {
|
||||
})
|
||||
})
|
||||
|
||||
describe('useExecutionStore - nodeProgressStatesByJob eviction', () => {
|
||||
let store: ReturnType<typeof useExecutionStore>
|
||||
|
||||
function makeProgressNodes(
|
||||
nodeId: string,
|
||||
jobId: string
|
||||
): Record<string, NodeProgressState> {
|
||||
return {
|
||||
[nodeId]: {
|
||||
value: 5,
|
||||
max: 10,
|
||||
state: 'running',
|
||||
node_id: nodeId,
|
||||
prompt_id: jobId,
|
||||
display_node_id: nodeId
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function fireProgressState(
|
||||
jobId: string,
|
||||
nodes: Record<string, NodeProgressState>
|
||||
) {
|
||||
const handler = apiEventHandlers.get('progress_state')
|
||||
if (!handler) throw new Error('progress_state handler not bound')
|
||||
handler(
|
||||
new CustomEvent('progress_state', { detail: { nodes, prompt_id: jobId } })
|
||||
)
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
apiEventHandlers.clear()
|
||||
setActivePinia(createTestingPinia({ stubActions: false }))
|
||||
store = useExecutionStore()
|
||||
store.bindExecutionEvents()
|
||||
})
|
||||
|
||||
it('should retain entries below the limit', () => {
|
||||
for (let i = 0; i < 5; i++) {
|
||||
fireProgressState(`job-${i}`, makeProgressNodes(`${i}`, `job-${i}`))
|
||||
}
|
||||
|
||||
expect(Object.keys(store.nodeProgressStatesByJob)).toHaveLength(5)
|
||||
})
|
||||
|
||||
it('should evict oldest entries when exceeding MAX_PROGRESS_JOBS', () => {
|
||||
for (let i = 0; i < MAX_PROGRESS_JOBS + 10; i++) {
|
||||
fireProgressState(`job-${i}`, makeProgressNodes(`${i}`, `job-${i}`))
|
||||
}
|
||||
|
||||
const keys = Object.keys(store.nodeProgressStatesByJob)
|
||||
expect(keys).toHaveLength(MAX_PROGRESS_JOBS)
|
||||
// Oldest jobs (0-9) should be evicted; newest should remain
|
||||
expect(keys).not.toContain('job-0')
|
||||
expect(keys).not.toContain('job-9')
|
||||
expect(keys).toContain(`job-${MAX_PROGRESS_JOBS + 9}`)
|
||||
expect(keys).toContain(`job-${MAX_PROGRESS_JOBS}`)
|
||||
})
|
||||
|
||||
it('should keep the most recently added job after eviction', () => {
|
||||
for (let i = 0; i < MAX_PROGRESS_JOBS + 1; i++) {
|
||||
fireProgressState(`job-${i}`, makeProgressNodes(`${i}`, `job-${i}`))
|
||||
}
|
||||
|
||||
const lastJobId = `job-${MAX_PROGRESS_JOBS}`
|
||||
expect(store.nodeProgressStatesByJob).toHaveProperty(lastJobId)
|
||||
})
|
||||
|
||||
it('should not evict when updating an existing job', () => {
|
||||
for (let i = 0; i < MAX_PROGRESS_JOBS; i++) {
|
||||
fireProgressState(`job-${i}`, makeProgressNodes(`${i}`, `job-${i}`))
|
||||
}
|
||||
expect(Object.keys(store.nodeProgressStatesByJob)).toHaveLength(
|
||||
MAX_PROGRESS_JOBS
|
||||
)
|
||||
|
||||
// Update an existing job — should not trigger eviction
|
||||
fireProgressState('job-0', makeProgressNodes('0', 'job-0'))
|
||||
expect(Object.keys(store.nodeProgressStatesByJob)).toHaveLength(
|
||||
MAX_PROGRESS_JOBS
|
||||
)
|
||||
expect(store.nodeProgressStatesByJob).toHaveProperty('job-0')
|
||||
})
|
||||
})
|
||||
|
||||
describe('useExecutionStore - reconcileInitializingJobs', () => {
|
||||
let store: ReturnType<typeof useExecutionStore>
|
||||
|
||||
|
||||
@@ -46,6 +46,13 @@ interface QueuedJob {
|
||||
workflow?: ComfyWorkflow
|
||||
}
|
||||
|
||||
/**
|
||||
* Maximum number of job entries retained in {@link nodeProgressStatesByJob}.
|
||||
* When exceeded, the oldest entries (by insertion order) are evicted to
|
||||
* prevent unbounded memory growth in long-running sessions.
|
||||
*/
|
||||
export const MAX_PROGRESS_JOBS = 1000
|
||||
|
||||
export const useExecutionStore = defineStore('execution', () => {
|
||||
const workflowStore = useWorkflowStore()
|
||||
const canvasStore = useCanvasStore()
|
||||
@@ -297,6 +304,34 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Evicts the oldest entries from {@link nodeProgressStatesByJob} when the
|
||||
* map exceeds {@link MAX_PROGRESS_JOBS}, preventing unbounded memory
|
||||
* growth in long-running sessions.
|
||||
*
|
||||
* Relies on ES2015+ object key insertion order: the first keys returned
|
||||
* by `Object.keys` are the oldest entries.
|
||||
*
|
||||
* @example
|
||||
* ```ts
|
||||
* // Given 105 entries, evicts the 5 oldest:
|
||||
* evictOldProgressJobs()
|
||||
* Object.keys(nodeProgressStatesByJob.value).length // => 100
|
||||
* ```
|
||||
*/
|
||||
function evictOldProgressJobs() {
|
||||
const current = nodeProgressStatesByJob.value
|
||||
const keys = Object.keys(current)
|
||||
if (keys.length <= MAX_PROGRESS_JOBS) return
|
||||
|
||||
const pruned: Record<string, Record<string, NodeProgressState>> = {}
|
||||
const keysToKeep = keys.slice(keys.length - MAX_PROGRESS_JOBS)
|
||||
for (const key of keysToKeep) {
|
||||
pruned[key] = current[key]
|
||||
}
|
||||
nodeProgressStatesByJob.value = pruned
|
||||
}
|
||||
|
||||
function handleProgressState(e: CustomEvent<ProgressStateWsMessage>) {
|
||||
const { nodes, prompt_id: jobId } = e.detail
|
||||
|
||||
@@ -319,6 +354,7 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
...nodeProgressStatesByJob.value,
|
||||
[jobId]: nodes
|
||||
}
|
||||
evictOldProgressJobs()
|
||||
nodeProgressStates.value = nodes
|
||||
|
||||
// If we have progress for the currently executing node, update it for backwards compatibility
|
||||
|
||||
Reference in New Issue
Block a user