mirror of
https://github.com/Comfy-Org/ComfyUI_frontend.git
synced 2026-04-19 22:09:37 +00:00
[backport cloud/1.41] fix: cap nodeProgressStatesByJob to prevent unbounded memory growth (#9249) (#10054)
Backport of #9249 to cloud/1.41 Co-authored-by: John Haugeland <stonecypher@gmail.com> Co-authored-by: GitHub Action <action@github.com>
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
import { setActivePinia } from 'pinia'
|
||||
import { beforeEach, describe, expect, it, vi } from 'vitest'
|
||||
import { app } from '@/scripts/app'
|
||||
import { useExecutionStore } from '@/stores/executionStore'
|
||||
import { MAX_PROGRESS_JOBS, useExecutionStore } from '@/stores/executionStore'
|
||||
import { useExecutionErrorStore } from '@/stores/executionErrorStore'
|
||||
import { executionIdToNodeLocatorId } from '@/utils/graphTraversalUtil'
|
||||
|
||||
@@ -11,6 +11,7 @@ const mockNodeIdToNodeLocatorId = vi.fn()
|
||||
const mockNodeLocatorIdToNodeExecutionId = vi.fn()
|
||||
|
||||
import type * as WorkflowStoreModule from '@/platform/workflow/management/stores/workflowStore'
|
||||
import type { NodeProgressState } from '@/schemas/apiSchema'
|
||||
import { createMockLGraphNode } from '@/utils/__tests__/litegraphTestUtils'
|
||||
import { createTestingPinia } from '@pinia/testing'
|
||||
|
||||
@@ -40,6 +41,37 @@ vi.mock('@/composables/node/useNodeProgressText', () => ({
|
||||
})
|
||||
}))
|
||||
|
||||
/**
|
||||
* Captures event handlers registered via api.addEventListener so tests
|
||||
* can invoke them directly (e.g. to simulate WebSocket progress events).
|
||||
*/
|
||||
type EventHandler = (...args: unknown[]) => void
|
||||
const apiEventHandlers = new Map<string, EventHandler>()
|
||||
vi.mock('@/scripts/api', () => ({
|
||||
api: {
|
||||
addEventListener: vi.fn((event: string, handler: EventHandler) => {
|
||||
apiEventHandlers.set(event, handler)
|
||||
}),
|
||||
removeEventListener: vi.fn((event: string) => {
|
||||
apiEventHandlers.delete(event)
|
||||
}),
|
||||
clientId: 'test-client',
|
||||
apiURL: vi.fn((path: string) => `/api${path}`)
|
||||
}
|
||||
}))
|
||||
|
||||
vi.mock('@/stores/imagePreviewStore', () => ({
|
||||
useNodeOutputStore: () => ({
|
||||
revokePreviewsByExecutionId: vi.fn()
|
||||
})
|
||||
}))
|
||||
|
||||
vi.mock('@/stores/jobPreviewStore', () => ({
|
||||
useJobPreviewStore: () => ({
|
||||
clearPreview: vi.fn()
|
||||
})
|
||||
}))
|
||||
|
||||
// Mock the app import with proper implementation
|
||||
vi.mock('@/scripts/app', () => ({
|
||||
app: {
|
||||
@@ -136,6 +168,226 @@ describe('useExecutionStore - NodeLocatorId conversions', () => {
|
||||
})
|
||||
})
|
||||
|
||||
describe('useExecutionStore - nodeLocationProgressStates caching', () => {
|
||||
let store: ReturnType<typeof useExecutionStore>
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
mockNodeExecutionIdToNodeLocatorId.mockReset()
|
||||
mockNodeIdToNodeLocatorId.mockReset()
|
||||
mockNodeLocatorIdToNodeExecutionId.mockReset()
|
||||
|
||||
setActivePinia(createTestingPinia({ stubActions: false }))
|
||||
store = useExecutionStore()
|
||||
})
|
||||
|
||||
it('should resolve execution IDs to locator IDs for subgraph nodes', () => {
|
||||
const mockSubgraph = {
|
||||
id: 'a1b2c3d4-e5f6-7890-abcd-ef1234567890',
|
||||
nodes: []
|
||||
}
|
||||
const mockNode = createMockLGraphNode({
|
||||
id: 123,
|
||||
isSubgraphNode: () => true,
|
||||
subgraph: mockSubgraph
|
||||
})
|
||||
vi.mocked(app.rootGraph.getNodeById).mockReturnValue(mockNode)
|
||||
|
||||
store.nodeProgressStates = {
|
||||
node1: {
|
||||
display_node_id: '123:456',
|
||||
state: 'running',
|
||||
value: 50,
|
||||
max: 100,
|
||||
prompt_id: 'test',
|
||||
node_id: 'node1'
|
||||
}
|
||||
}
|
||||
|
||||
const result = store.nodeLocationProgressStates
|
||||
|
||||
expect(result['123']).toBeDefined()
|
||||
expect(result['a1b2c3d4-e5f6-7890-abcd-ef1234567890:456']).toBeDefined()
|
||||
})
|
||||
|
||||
it('should not re-traverse graph for same execution IDs across progress updates', () => {
|
||||
const mockSubgraph = {
|
||||
id: 'a1b2c3d4-e5f6-7890-abcd-ef1234567890',
|
||||
nodes: []
|
||||
}
|
||||
const mockNode = createMockLGraphNode({
|
||||
id: 123,
|
||||
isSubgraphNode: () => true,
|
||||
subgraph: mockSubgraph
|
||||
})
|
||||
vi.mocked(app.rootGraph.getNodeById).mockReturnValue(mockNode)
|
||||
|
||||
store.nodeProgressStates = {
|
||||
node1: {
|
||||
display_node_id: '123:456',
|
||||
state: 'running',
|
||||
value: 50,
|
||||
max: 100,
|
||||
prompt_id: 'test',
|
||||
node_id: 'node1'
|
||||
}
|
||||
}
|
||||
|
||||
// First evaluation triggers graph traversal
|
||||
expect(store.nodeLocationProgressStates['123']).toBeDefined()
|
||||
const callCountAfterFirst = vi.mocked(app.rootGraph.getNodeById).mock.calls
|
||||
.length
|
||||
|
||||
// Second update with same execution IDs but different progress
|
||||
store.nodeProgressStates = {
|
||||
node1: {
|
||||
display_node_id: '123:456',
|
||||
state: 'running',
|
||||
value: 75,
|
||||
max: 100,
|
||||
prompt_id: 'test',
|
||||
node_id: 'node1'
|
||||
}
|
||||
}
|
||||
|
||||
expect(store.nodeLocationProgressStates['123']).toBeDefined()
|
||||
|
||||
// getNodeById should NOT be called again for the same execution ID
|
||||
expect(vi.mocked(app.rootGraph.getNodeById).mock.calls.length).toBe(
|
||||
callCountAfterFirst
|
||||
)
|
||||
})
|
||||
|
||||
it('should correctly resolve multiple sibling nodes in the same subgraph', () => {
|
||||
const mockSubgraph = {
|
||||
id: 'a1b2c3d4-e5f6-7890-abcd-ef1234567890',
|
||||
nodes: []
|
||||
}
|
||||
const mockNode = createMockLGraphNode({
|
||||
id: 123,
|
||||
isSubgraphNode: () => true,
|
||||
subgraph: mockSubgraph
|
||||
})
|
||||
vi.mocked(app.rootGraph.getNodeById).mockReturnValue(mockNode)
|
||||
|
||||
// Two sibling nodes in the same subgraph
|
||||
store.nodeProgressStates = {
|
||||
node1: {
|
||||
display_node_id: '123:456',
|
||||
state: 'running',
|
||||
value: 50,
|
||||
max: 100,
|
||||
prompt_id: 'test',
|
||||
node_id: 'node1'
|
||||
},
|
||||
node2: {
|
||||
display_node_id: '123:789',
|
||||
state: 'running',
|
||||
value: 30,
|
||||
max: 100,
|
||||
prompt_id: 'test',
|
||||
node_id: 'node2'
|
||||
}
|
||||
}
|
||||
|
||||
const result = store.nodeLocationProgressStates
|
||||
|
||||
// Both sibling nodes should be resolved with the correct subgraph UUID
|
||||
expect(result['a1b2c3d4-e5f6-7890-abcd-ef1234567890:456']).toBeDefined()
|
||||
expect(result['a1b2c3d4-e5f6-7890-abcd-ef1234567890:789']).toBeDefined()
|
||||
|
||||
// The shared parent "123" should also have a merged state
|
||||
expect(result['123']).toBeDefined()
|
||||
expect(result['123'].state).toBe('running')
|
||||
})
|
||||
})
|
||||
|
||||
describe('useExecutionStore - nodeProgressStatesByJob eviction', () => {
|
||||
let store: ReturnType<typeof useExecutionStore>
|
||||
|
||||
function makeProgressNodes(
|
||||
nodeId: string,
|
||||
jobId: string
|
||||
): Record<string, NodeProgressState> {
|
||||
return {
|
||||
[nodeId]: {
|
||||
value: 5,
|
||||
max: 10,
|
||||
state: 'running',
|
||||
node_id: nodeId,
|
||||
prompt_id: jobId,
|
||||
display_node_id: nodeId
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function fireProgressState(
|
||||
jobId: string,
|
||||
nodes: Record<string, NodeProgressState>
|
||||
) {
|
||||
const handler = apiEventHandlers.get('progress_state')
|
||||
if (!handler) throw new Error('progress_state handler not bound')
|
||||
handler(
|
||||
new CustomEvent('progress_state', { detail: { nodes, prompt_id: jobId } })
|
||||
)
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
apiEventHandlers.clear()
|
||||
setActivePinia(createTestingPinia({ stubActions: false }))
|
||||
store = useExecutionStore()
|
||||
store.bindExecutionEvents()
|
||||
})
|
||||
|
||||
it('should retain entries below the limit', () => {
|
||||
for (let i = 0; i < 5; i++) {
|
||||
fireProgressState(`job-${i}`, makeProgressNodes(`${i}`, `job-${i}`))
|
||||
}
|
||||
|
||||
expect(Object.keys(store.nodeProgressStatesByJob)).toHaveLength(5)
|
||||
})
|
||||
|
||||
it('should evict oldest entries when exceeding MAX_PROGRESS_JOBS', () => {
|
||||
for (let i = 0; i < MAX_PROGRESS_JOBS + 10; i++) {
|
||||
fireProgressState(`job-${i}`, makeProgressNodes(`${i}`, `job-${i}`))
|
||||
}
|
||||
|
||||
const keys = Object.keys(store.nodeProgressStatesByJob)
|
||||
expect(keys).toHaveLength(MAX_PROGRESS_JOBS)
|
||||
// Oldest jobs (0-9) should be evicted; newest should remain
|
||||
expect(keys).not.toContain('job-0')
|
||||
expect(keys).not.toContain('job-9')
|
||||
expect(keys).toContain(`job-${MAX_PROGRESS_JOBS + 9}`)
|
||||
expect(keys).toContain(`job-${MAX_PROGRESS_JOBS}`)
|
||||
})
|
||||
|
||||
it('should keep the most recently added job after eviction', () => {
|
||||
for (let i = 0; i < MAX_PROGRESS_JOBS + 1; i++) {
|
||||
fireProgressState(`job-${i}`, makeProgressNodes(`${i}`, `job-${i}`))
|
||||
}
|
||||
|
||||
const lastJobId = `job-${MAX_PROGRESS_JOBS}`
|
||||
expect(store.nodeProgressStatesByJob).toHaveProperty(lastJobId)
|
||||
})
|
||||
|
||||
it('should not evict when updating an existing job', () => {
|
||||
for (let i = 0; i < MAX_PROGRESS_JOBS; i++) {
|
||||
fireProgressState(`job-${i}`, makeProgressNodes(`${i}`, `job-${i}`))
|
||||
}
|
||||
expect(Object.keys(store.nodeProgressStatesByJob)).toHaveLength(
|
||||
MAX_PROGRESS_JOBS
|
||||
)
|
||||
|
||||
// Update an existing job — should not trigger eviction
|
||||
fireProgressState('job-0', makeProgressNodes('0', 'job-0'))
|
||||
expect(Object.keys(store.nodeProgressStatesByJob)).toHaveLength(
|
||||
MAX_PROGRESS_JOBS
|
||||
)
|
||||
expect(store.nodeProgressStatesByJob).toHaveProperty('job-0')
|
||||
})
|
||||
})
|
||||
|
||||
describe('useExecutionStore - reconcileInitializingJobs', () => {
|
||||
let store: ReturnType<typeof useExecutionStore>
|
||||
|
||||
|
||||
@@ -46,6 +46,13 @@ interface QueuedJob {
|
||||
workflow?: ComfyWorkflow
|
||||
}
|
||||
|
||||
/**
|
||||
* Maximum number of job entries retained in {@link nodeProgressStatesByJob}.
|
||||
* When exceeded, the oldest entries (by insertion order) are evicted to
|
||||
* prevent unbounded memory growth in long-running sessions.
|
||||
*/
|
||||
export const MAX_PROGRESS_JOBS = 1000
|
||||
|
||||
export const useExecutionStore = defineStore('execution', () => {
|
||||
const workflowStore = useWorkflowStore()
|
||||
const canvasStore = useCanvasStore()
|
||||
@@ -278,6 +285,34 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Evicts the oldest entries from {@link nodeProgressStatesByJob} when the
|
||||
* map exceeds {@link MAX_PROGRESS_JOBS}, preventing unbounded memory
|
||||
* growth in long-running sessions.
|
||||
*
|
||||
* Relies on ES2015+ object key insertion order: the first keys returned
|
||||
* by `Object.keys` are the oldest entries.
|
||||
*
|
||||
* @example
|
||||
* ```ts
|
||||
* // Given 105 entries, evicts the 5 oldest:
|
||||
* evictOldProgressJobs()
|
||||
* Object.keys(nodeProgressStatesByJob.value).length // => 100
|
||||
* ```
|
||||
*/
|
||||
function evictOldProgressJobs() {
|
||||
const current = nodeProgressStatesByJob.value
|
||||
const keys = Object.keys(current)
|
||||
if (keys.length <= MAX_PROGRESS_JOBS) return
|
||||
|
||||
const pruned: Record<string, Record<string, NodeProgressState>> = {}
|
||||
const keysToKeep = keys.slice(keys.length - MAX_PROGRESS_JOBS)
|
||||
for (const key of keysToKeep) {
|
||||
pruned[key] = current[key]
|
||||
}
|
||||
nodeProgressStatesByJob.value = pruned
|
||||
}
|
||||
|
||||
function handleProgressState(e: CustomEvent<ProgressStateWsMessage>) {
|
||||
const { nodes, prompt_id: jobId } = e.detail
|
||||
|
||||
@@ -300,6 +335,7 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
...nodeProgressStatesByJob.value,
|
||||
[jobId]: nodes
|
||||
}
|
||||
evictOldProgressJobs()
|
||||
nodeProgressStates.value = nodes
|
||||
|
||||
// If we have progress for the currently executing node, update it for backwards compatibility
|
||||
|
||||
Reference in New Issue
Block a user