diff --git a/initial-report.md b/initial-report.md new file mode 100644 index 000000000..572bc14d4 --- /dev/null +++ b/initial-report.md @@ -0,0 +1,43 @@ +# Parallel Workflow Execution - Initial Findings + +## Cloud PR 1743 behavior + +- The PR enables parallel execution by making the per-user concurrency limit + dynamic: `DispatchConfig.MaxConcurrentJobsPerUser` is added and read at runtime. +- Dispatcher passes the runtime value into the state machine via + `TriggerEventGloballyWithArgs`, so changes apply without restart. +- State machine uses the runtime limit to move jobs from `queued_limited` to + `queued_waiting`, which is the gate for concurrent execution. +- Environment defaults set local/ephemeral/dev to 5 and staging/prod to 1. + +Key references: +- `common/dynamicconfig/types.go` +- `services/dispatcher/server/services/job_dispatch/dispatcher.go` +- `common/jobstate/state.go` +- `common/dynamicconfig/defaults/ephemeral.json` +- `infrastructure/dynamicconfig/prod/config.json` +- `infrastructure/dynamicconfig/staging/config.json` +- `infrastructure/dynamicconfig/dev/config.json` + +## Frontend audit notes (current behavior) + +- Queue fetching already supports multiple running/pending jobs via `/jobs`, + mapping to `Running`/`Pending` lists. + - `src/platform/remote/comfyui/jobs/fetchJobs.ts` + - `src/stores/queueStore.ts` +- The concurrency indicator is based on `runningWorkflowCount`, which counts + prompts with running nodes and should reflect parallel execution. + - `src/stores/executionStore.ts` + - `src/components/queue/QueueProgressOverlay.vue` +- The progress overlay is single-prompt focused: + - `executionStore.activePromptId` tracks one prompt. + - `nodeProgressStates` is overwritten per `progress_state`, so the progress + bar and current node are effectively “last prompt wins.” + - `src/stores/executionStore.ts` + - `src/components/queue/QueueProgressOverlay.vue` + +## Implication + +If the frontend is expected to show per-job progress for multiple concurrent +workflows, the execution store and progress overlay will need to evolve beyond +the single-active-prompt assumption. diff --git a/plan.md b/plan.md new file mode 100644 index 000000000..ba4b43b81 --- /dev/null +++ b/plan.md @@ -0,0 +1,107 @@ +# Parallel Execution Implementation Plan + +## Goals +- Support multiple simultaneous workflow executions across: + - Separate browser tabs (multiple clients). + - In-app workflow tabs (multiple workflows). +- Show correct progress, outputs, and errors per workflow/job. +- Avoid regressions for single-run behavior and existing UI. + +## Non-goals (initial scope) +- Redesign the queue UI beyond enabling per-job progress indicators. +- New backend features beyond prompt context for WS events (see dependencies). + +## Dependencies / coordination (verified vs `../cloud-3` + `../backend-3`) +- Prompt-scoped data is already available for core execution events: + - `execution_start`, `execution_success`, `execution_error`, `execution_interrupted`, + `executed`, `progress`, `progress_state`, and `executing` include `prompt_id` in + `../backend-3` and are forwarded unchanged through `../cloud-3`. + - Frontend currently *drops* `prompt_id` for `executing`; this can be fixed + without backend changes. +- `progress_text` is a binary message containing only `nodeId` + text in + `../backend-3` (`send_progress_text`); `../cloud-3` forwards it as raw binary. + - Per-prompt progress text is **not possible** without backend changes. + - If we want it, we need to extend ComfyUI/backend to include `prompt_id` in + the binary payload or add a parallel JSON event. +- `b_preview_with_metadata` includes `prompt_id` and is forwarded as binary; usable + as-is for prompt/workflow scoping. +- Decide whether UI should filter by `client_id` (current tab only) or show all + user jobs; document and apply consistently. + +## Phase 1: Data model and execution state +1. Refactor execution store to be prompt-centric: + - Replace `activePromptId` single value with: + - `promptStates: Record`. + - `runningPromptIds` derived from prompt states. + - `promptIdToWorkflowId` remains the primary routing map. + - Provide selectors: + - `getPromptProgress(promptId)` + - `getPromptNodeProgressStates(promptId)` + - `getWorkflowRunningPromptIds(workflowId)` + - `getActivePromptForWorkflow(workflowId)` (if needed) + - Redefine `isIdle` as `runningPromptIds.length === 0`. + - Keep backward-compat getters but scope to: + - active workflow tab, or + - "most recently started prompt". + +2. Update event handlers to write prompt-scoped state: + - `execution_start` creates prompt state. + - `progress_state` merges into `promptStates[promptId].nodeProgressStates`. + - `execution_success/error/interrupted` remove prompt state only for that id. + - Preserve initialization tracking per prompt. + +## Phase 2: Prompt-aware node progress and canvas updates +1. Update graph/node progress to be scoped to the active workflow tab: + - Derive `nodeLocationProgressStates` from the prompt(s) mapped to the + active workflow only. + - Ensure `GraphCanvas.vue` applies progress for active workflow only. +2. Update vue-node execution composables to use workflow-scoped progress. +3. Update `groupNode` progress rendering to use prompt-scoped state. + +## Phase 3: Outputs and previews isolation +1. Introduce per-workflow output maps in `imagePreviewStore`: + - `outputsByWorkflowId[workflowId][nodeLocatorId]`. + - `previewsByWorkflowId[workflowId][nodeLocatorId]`. +2. When switching active workflow tab: + - Swap `app.nodeOutputs` and `app.nodePreviewImages` to the selected map. +3. Update `executed` and `b_preview_with_metadata` handlers to: + - Use `prompt_id -> workflowId` mapping to store outputs/previews in the + correct workflow bucket. +4. Update `ChangeTracker` and history loading paths to restore per-workflow + outputs without overwriting other workflows. + +## Phase 4: UI updates for multi-run visibility +1. Queue overlay: + - `useQueueProgress` to compute per-job progress for all running tasks. + - `useJobList` to attach progress to each running item, not just one. + - `QueueOverlayActive` to show aggregated or multi-job state. +2. Actionbar interrupt: + - Route interrupt to prompt(s) for the active workflow tab. + - Optionally add a "stop all running" action. +3. Browser tab title + favicon: + - Use aggregate progress (e.g., max or average of running jobs) or show + count of running jobs with a generic progress indicator. + +## Phase 5: Browser-tab concurrency policy +1. Decide and implement filtering: + - Option A: show only jobs for the current `client_id` in the UI. + - Option B: show all jobs for the user, but mark which client started them. +2. Apply consistent filtering in: + - queue overlay, + - completion summary, + - progress favicon/title. + +## Phase 6: Tests and validation +- Unit tests: + - `executionStore` selectors and lifecycle per prompt. + - `useQueueProgress` and `useJobList` showing per-job progress. +- Update existing stories/tests that assume single `activePromptId`. +- Manual validation checklist: + - Two workflows running in two in-app tabs: progress and outputs isolated. + - Two browser tabs running distinct workflows: no cross-talk in UI. + - Interrupt from actionbar affects intended prompt(s). + +## Phase 7: Rollout and cleanup +- Remove deprecated single-prompt fields after migration is stable. +- Update documentation/comments where prompt-scoped behavior is required. +- Coordinate backend/WS schema changes with `../backend-3` and `../cloud-3`. diff --git a/src/components/queue/job/JobDetailsPopover.stories.ts b/src/components/queue/job/JobDetailsPopover.stories.ts index 2343a8fa2..c7d928427 100644 --- a/src/components/queue/job/JobDetailsPopover.stories.ts +++ b/src/components/queue/job/JobDetailsPopover.stories.ts @@ -34,7 +34,7 @@ function resetStores() { queue.runningTasks = [] queue.historyTasks = [] - exec.nodeProgressStatesByPrompt = {} + exec.setNodeProgressStatesByPrompt({}) } function makePendingTask( @@ -146,7 +146,7 @@ export const Queued: Story = { // Queued at (in metadata on prompt[4]) // One running workflow - exec.nodeProgressStatesByPrompt = { + exec.setNodeProgressStatesByPrompt({ p1: { '1': { value: 1, @@ -156,7 +156,7 @@ export const Queued: Story = { prompt_id: 'p1' } } - } as any + } as any) return { args: { ...args, jobId } } }, @@ -196,7 +196,7 @@ export const QueuedParallel: Story = { ] // Two parallel workflows running - exec.nodeProgressStatesByPrompt = { + exec.setNodeProgressStatesByPrompt({ p1: { '1': { value: 1, @@ -215,7 +215,7 @@ export const QueuedParallel: Story = { prompt_id: 'p2' } } - } as any + } as any) return { args: { ...args, jobId } } }, @@ -246,7 +246,7 @@ export const Running: Story = { makeHistoryTask('hist-r3', 252, 60, true) ] - exec.nodeProgressStatesByPrompt = { + exec.setNodeProgressStatesByPrompt({ p1: { '1': { value: 5, @@ -256,7 +256,7 @@ export const Running: Story = { prompt_id: 'p1' } } - } as any + } as any) return { args: { ...args, jobId } } }, @@ -291,7 +291,7 @@ export const QueuedZeroAheadSingleRunning: Story = { queue.runningTasks = [makeRunningTaskWithStart('running-1', 505, 20)] - exec.nodeProgressStatesByPrompt = { + exec.setNodeProgressStatesByPrompt({ p1: { '1': { value: 1, @@ -301,7 +301,7 @@ export const QueuedZeroAheadSingleRunning: Story = { prompt_id: 'p1' } } - } as any + } as any) return { args: { ...args, jobId } } }, @@ -339,7 +339,7 @@ export const QueuedZeroAheadMultiRunning: Story = { makeRunningTaskWithStart('running-b', 507, 10) ] - exec.nodeProgressStatesByPrompt = { + exec.setNodeProgressStatesByPrompt({ p1: { '1': { value: 2, @@ -358,7 +358,7 @@ export const QueuedZeroAheadMultiRunning: Story = { prompt_id: 'p2' } } - } as any + } as any) return { args: { ...args, jobId } } }, diff --git a/src/scripts/api.ts b/src/scripts/api.ts index 268aef8a2..657fab74a 100644 --- a/src/scripts/api.ts +++ b/src/scripts/api.ts @@ -187,12 +187,22 @@ type AsCustomEvents = { readonly [K in keyof T]: CustomEvent } +type ExecutingEventDetail = + | ExecutingWsMessage + | { + node: NodeId | null + display_node?: NodeId + prompt_id?: string | null + } + | NodeId + | null + /** Handles differing event and API signatures. */ type ApiToEventType = { [K in keyof T]: K extends 'status' ? StatusWsMessageStatus : K extends 'executing' - ? NodeId + ? ExecutingEventDetail : T[K] } @@ -646,10 +656,7 @@ export class ComfyApi extends EventTarget { this.dispatchCustomEvent('status', msg.data.status ?? null) break case 'executing': - this.dispatchCustomEvent( - 'executing', - msg.data.display_node || msg.data.node - ) + this.dispatchCustomEvent('executing', msg.data) break case 'execution_start': case 'execution_error': diff --git a/src/stores/executionStore.ts b/src/stores/executionStore.ts index b9f89c29a..1d211bdab 100644 --- a/src/stores/executionStore.ts +++ b/src/stores/executionStore.ts @@ -15,6 +15,7 @@ import type { import { useCanvasStore } from '@/renderer/core/canvas/canvasStore' import type { ExecutedWsMessage, + ExecutingWsMessage, ExecutionCachedWsMessage, ExecutionErrorWsMessage, ExecutionInterruptedWsMessage, @@ -46,6 +47,30 @@ interface QueuedPrompt { workflow?: ComfyWorkflow } +interface PromptExecutionState extends QueuedPrompt { + nodeProgressStates: Record + executingNodeProgress: ProgressWsMessage | null + isRunning: boolean + startedAt?: number + lastUpdatedAt: number +} + +type PromptProgress = { + totalNodes: number + executedNodes: number + progress: number +} + +type ExecutingEventDetail = + | ExecutingWsMessage + | { + node: NodeId | null + display_node?: NodeId + prompt_id?: string | null + } + | NodeId + | null + const subgraphNodeIdToSubgraph = (id: string, graph: LGraph | Subgraph) => { const node = graph.getNodeById(id) if (node?.isSubgraphNode()) return node.subgraph @@ -108,15 +133,10 @@ export const useExecutionStore = defineStore('execution', () => { const canvasStore = useCanvasStore() const clientId = ref(null) - const activePromptId = ref(null) - const queuedPrompts = ref>({}) + const promptStates = ref>({}) + const lastStartedPromptId = ref(null) const lastNodeErrors = ref | null>(null) const lastExecutionError = ref(null) - // This is the progress of all nodes in the currently executing workflow - const nodeProgressStates = ref>({}) - const nodeProgressStatesByPrompt = ref< - Record> - >({}) /** * Map of prompt_id to workflow ID for quick lookup across the app. @@ -154,6 +174,209 @@ export const useExecutionStore = defineStore('execution', () => { return mergedState } + const createPromptState = (startedAt?: number): PromptExecutionState => ({ + nodes: {}, + nodeProgressStates: {}, + executingNodeProgress: null, + isRunning: false, + startedAt, + lastUpdatedAt: startedAt ?? Date.now() + }) + + const getPromptState = ( + promptId: string | number | null | undefined + ): PromptExecutionState | undefined => { + if (!promptId) return undefined + return promptStates.value[String(promptId)] + } + + const ensurePromptState = ( + promptId: string, + startedAt?: number + ): PromptExecutionState => { + const existing = promptStates.value[promptId] + if (existing) { + if (startedAt && !existing.startedAt) { + existing.startedAt = startedAt + } + return existing + } + + const nextState = createPromptState(startedAt) + promptStates.value = { + ...promptStates.value, + [promptId]: nextState + } + return nextState + } + + const touchPromptState = ( + promptState: PromptExecutionState, + timestamp?: number + ) => { + promptState.lastUpdatedAt = timestamp ?? Date.now() + } + + const isPromptRunning = (promptState: PromptExecutionState) => + promptState.isRunning || + Object.values(promptState.nodeProgressStates).some( + (node) => node.state === 'running' + ) + + const getPromptSortTimestamp = (promptState: PromptExecutionState) => + promptState.startedAt ?? promptState.lastUpdatedAt ?? 0 + + const getMostRecentPromptId = (promptIds: string[]): string | null => { + let bestPromptId: string | null = null + let bestTimestamp = -1 + for (const promptId of promptIds) { + const state = promptStates.value[promptId] + if (!state) continue + const timestamp = getPromptSortTimestamp(state) + if (timestamp >= bestTimestamp) { + bestTimestamp = timestamp + bestPromptId = promptId + } + } + return bestPromptId + } + + const activeWorkflowId = computed(() => { + const activeWorkflow = workflowStore.activeWorkflow + return ( + activeWorkflow?.activeState?.id ?? + activeWorkflow?.initialState?.id ?? + null + ) + }) + + const applyNodeProgressStatesByPrompt = ( + next: Record> + ) => { + const nextStates: Record = {} + for (const [promptId, nodes] of Object.entries(next)) { + const current = promptStates.value[promptId] + const hasRunningNode = Object.values(nodes).some( + (node) => node.state === 'running' + ) + nextStates[promptId] = { + nodes: current?.nodes ?? {}, + workflow: current?.workflow, + nodeProgressStates: nodes, + executingNodeProgress: current?.executingNodeProgress ?? null, + isRunning: (current?.isRunning ?? false) || hasRunningNode, + startedAt: current?.startedAt, + lastUpdatedAt: Date.now() + } + } + promptStates.value = nextStates + } + + const nodeProgressStatesByPrompt = computed< + Record> + >({ + get: () => { + const result: Record> = {} + for (const [promptId, state] of Object.entries(promptStates.value)) { + result[promptId] = state.nodeProgressStates + } + return result + }, + set: (next) => applyNodeProgressStatesByPrompt(next) + }) + + const setNodeProgressStatesByPrompt = ( + next: Record> + ) => { + applyNodeProgressStatesByPrompt(next) + } + + const getPromptNodeProgressStates = ( + promptId: string | number | null | undefined + ): Record => + getPromptState(promptId)?.nodeProgressStates ?? {} + + const getPromptProgress = ( + promptId: string | number | null | undefined + ): PromptProgress | null => { + const promptState = getPromptState(promptId) + if (!promptState) return null + const totalNodes = Object.values(promptState.nodes).length + const executedNodes = Object.values(promptState.nodes).filter( + Boolean + ).length + return { + totalNodes, + executedNodes, + progress: totalNodes > 0 ? executedNodes / totalNodes : 0 + } + } + + const getWorkflowRunningPromptIds = (workflowId: string | null) => { + if (!workflowId) return [] + return Object.entries(promptStates.value) + .filter(([promptId, state]) => { + return ( + promptIdToWorkflowId.value.get(promptId) === workflowId && + isPromptRunning(state) + ) + }) + .map(([promptId]) => promptId) + } + + const getActivePromptForWorkflow = (workflowId: string | null) => { + const runningPrompts = getWorkflowRunningPromptIds(workflowId) + if (runningPrompts.length === 0) return null + return getMostRecentPromptId(runningPrompts) + } + + const runningPromptIds = computed(() => { + return Object.entries(promptStates.value) + .filter(([_, state]) => isPromptRunning(state)) + .map(([promptId]) => promptId) + }) + + const runningWorkflowCount = computed( + () => runningPromptIds.value.length + ) + + const activePromptId = computed(() => { + if (runningPromptIds.value.length === 0) return null + + const activeWorkflowPrompt = getActivePromptForWorkflow( + activeWorkflowId.value + ) + if (activeWorkflowPrompt) return activeWorkflowPrompt + + if ( + lastStartedPromptId.value && + runningPromptIds.value.includes(lastStartedPromptId.value) + ) { + return lastStartedPromptId.value + } + + return getMostRecentPromptId(runningPromptIds.value) + }) + + const activePrompt = computed(() => + getPromptState(activePromptId.value) + ) + + const nodeProgressStates = computed>(() => + getPromptNodeProgressStates(activePromptId.value) + ) + + const queuedPrompts = computed>(() => { + const result: Record = {} + for (const [promptId, state] of Object.entries(promptStates.value)) { + result[promptId] = { + nodes: state.nodes, + workflow: state.workflow + } + } + return result + }) + const nodeLocationProgressStates = computed< Record >(() => { @@ -211,34 +434,27 @@ export const useExecutionStore = defineStore('execution', () => { }) // This is the progress of the currently executing node (for backward compatibility) - const _executingNodeProgress = ref(null) + const _executingNodeProgress = computed(() => { + return getPromptState(activePromptId.value)?.executingNodeProgress ?? null + }) const executingNodeProgress = computed(() => _executingNodeProgress.value ? _executingNodeProgress.value.value / _executingNodeProgress.value.max : null ) - const activePrompt = computed( - () => queuedPrompts.value[activePromptId.value ?? ''] - ) - const totalNodesToExecute = computed(() => { - if (!activePrompt.value) return 0 - return Object.values(activePrompt.value.nodes).length + return getPromptProgress(activePromptId.value)?.totalNodes ?? 0 }) - const isIdle = computed(() => !activePromptId.value) + const isIdle = computed(() => runningPromptIds.value.length === 0) const nodesExecuted = computed(() => { - if (!activePrompt.value) return 0 - return Object.values(activePrompt.value.nodes).filter(Boolean).length + return getPromptProgress(activePromptId.value)?.executedNodes ?? 0 }) const executionProgress = computed(() => { - if (!activePrompt.value) return 0 - const total = totalNodesToExecute.value - const done = nodesExecuted.value - return total > 0 ? done / total : 0 + return getPromptProgress(activePromptId.value)?.progress ?? 0 }) const lastExecutionErrorNodeLocatorId = computed(() => { @@ -286,62 +502,76 @@ export const useExecutionStore = defineStore('execution', () => { function handleExecutionStart(e: CustomEvent) { lastExecutionError.value = null - activePromptId.value = e.detail.prompt_id - queuedPrompts.value[activePromptId.value] ??= { nodes: {} } - clearInitializationByPromptId(activePromptId.value) + const promptId = e.detail.prompt_id + const promptState = ensurePromptState(promptId, e.detail.timestamp) + promptState.isRunning = true + promptState.executingNodeProgress = null + touchPromptState(promptState, e.detail.timestamp) + lastStartedPromptId.value = promptId + clearInitializationByPromptId(promptId) } function handleExecutionCached(e: CustomEvent) { - if (!activePrompt.value) return + const promptState = ensurePromptState(e.detail.prompt_id) for (const n of e.detail.nodes) { - activePrompt.value.nodes[n] = true + promptState.nodes[n] = true } + touchPromptState(promptState) } function handleExecutionInterrupted( e: CustomEvent ) { const pid = e.detail.prompt_id - if (activePromptId.value) - clearInitializationByPromptId(activePromptId.value) + clearInitializationByPromptId(pid) resetExecutionState(pid) } function handleExecuted(e: CustomEvent) { - if (!activePrompt.value) return - activePrompt.value.nodes[e.detail.node] = true + const promptState = ensurePromptState(e.detail.prompt_id) + promptState.nodes[e.detail.node] = true + touchPromptState(promptState) } function handleExecutionSuccess(e: CustomEvent) { - if (isCloud && activePromptId.value) { + if (isCloud) { useTelemetry()?.trackExecutionSuccess({ - jobId: activePromptId.value + jobId: e.detail.prompt_id }) } const pid = e.detail.prompt_id resetExecutionState(pid) } - function handleExecuting(e: CustomEvent): void { - // Clear the current node progress when a new node starts executing - _executingNodeProgress.value = null + function handleExecuting(e: CustomEvent): void { + const detail = e.detail + const promptId = + detail && typeof detail === 'object' + ? detail.prompt_id + : (activePromptId.value ?? lastStartedPromptId.value) + if (!promptId) return - if (!activePrompt.value) return + const nodeId = + detail && typeof detail === 'object' + ? (detail.display_node ?? detail.node ?? null) + : detail - // Update the executing nodes list - if (typeof e.detail !== 'string') { - if (activePromptId.value) { - delete queuedPrompts.value[activePromptId.value] - } - activePromptId.value = null + const promptState = ensurePromptState(String(promptId)) + promptState.executingNodeProgress = null + const hasNode = nodeId !== null && nodeId !== undefined + promptState.isRunning = promptState.isRunning || hasNode + if (!hasNode) { + promptState.isRunning = false } + touchPromptState(promptState) } function handleProgressState(e: CustomEvent) { const { nodes, prompt_id: pid } = e.detail + const promptState = ensurePromptState(pid) // Revoke previews for nodes that are starting to execute - const previousForPrompt = nodeProgressStatesByPrompt.value[pid] || {} + const previousForPrompt = promptState.nodeProgressStates for (const nodeId in nodes) { const nodeState = nodes[nodeId] if (nodeState.state === 'running' && !previousForPrompt[nodeId]) { @@ -355,16 +585,18 @@ export const useExecutionStore = defineStore('execution', () => { } // Update the progress states for all nodes - nodeProgressStatesByPrompt.value = { - ...nodeProgressStatesByPrompt.value, - [pid]: nodes + promptState.nodeProgressStates = nodes + if (Object.values(nodes).some((node) => node.state === 'running')) { + promptState.isRunning = true } - nodeProgressStates.value = nodes + touchPromptState(promptState) - // If we have progress for the currently executing node, update it for backwards compatibility - if (executingNodeId.value && nodes[executingNodeId.value]) { - const nodeState = nodes[executingNodeId.value] - _executingNodeProgress.value = { + const runningNodeId = Object.entries(nodes).find( + ([, nodeState]) => nodeState.state === 'running' + )?.[0] + if (runningNodeId) { + const nodeState = nodes[runningNodeId] + promptState.executingNodeProgress = { value: nodeState.value, max: nodeState.max, prompt_id: nodeState.prompt_id, @@ -374,7 +606,10 @@ export const useExecutionStore = defineStore('execution', () => { } function handleProgress(e: CustomEvent) { - _executingNodeProgress.value = e.detail + const promptState = ensurePromptState(e.detail.prompt_id) + promptState.executingNodeProgress = e.detail + promptState.isRunning = true + touchPromptState(promptState) } function handleStatus() { @@ -438,18 +673,14 @@ export const useExecutionStore = defineStore('execution', () => { * Reset execution-related state after a run completes or is stopped. */ function resetExecutionState(pid?: string | null) { - nodeProgressStates.value = {} const promptId = pid ?? activePromptId.value ?? null - if (promptId) { - const map = { ...nodeProgressStatesByPrompt.value } - delete map[promptId] - nodeProgressStatesByPrompt.value = map + if (!promptId) return + const nextStates = { ...promptStates.value } + delete nextStates[promptId] + promptStates.value = nextStates + if (lastStartedPromptId.value === promptId) { + lastStartedPromptId.value = null } - if (activePromptId.value) { - delete queuedPrompts.value[activePromptId.value] - } - activePromptId.value = null - _executingNodeProgress.value = null } function getNodeIdIfExecuting(nodeId: string | number) { @@ -480,16 +711,16 @@ export const useExecutionStore = defineStore('execution', () => { id: string workflow: ComfyWorkflow }) { - queuedPrompts.value[id] ??= { nodes: {} } - const queuedPrompt = queuedPrompts.value[id] - queuedPrompt.nodes = { + const promptState = ensurePromptState(id) + promptState.nodes = { ...nodes.reduce((p: Record, n) => { p[n] = false return p }, {}), - ...queuedPrompt.nodes + ...promptState.nodes } - queuedPrompt.workflow = workflow + promptState.workflow = workflow + touchPromptState(promptState) const wid = workflow?.activeState?.id ?? workflow?.initialState?.id if (wid) { promptIdToWorkflowId.value.set(String(id), String(wid)) @@ -519,22 +750,6 @@ export const useExecutionStore = defineStore('execution', () => { return executionId } - const runningPromptIds = computed(() => { - const result: string[] = [] - for (const [pid, nodes] of Object.entries( - nodeProgressStatesByPrompt.value - )) { - if (Object.values(nodes).some((n) => n.state === 'running')) { - result.push(pid) - } - } - return result - }) - - const runningWorkflowCount = computed( - () => runningPromptIds.value.length - ) - /** Map of node errors indexed by locator ID. */ const nodeErrorsByLocatorId = computed>( () => { @@ -633,6 +848,7 @@ export const useExecutionStore = defineStore('execution', () => { isIdle, clientId, activePromptId, + promptStates, queuedPrompts, lastNodeErrors, lastExecutionError, @@ -648,6 +864,11 @@ export const useExecutionStore = defineStore('execution', () => { nodeProgressStates, nodeLocationProgressStates, nodeProgressStatesByPrompt, + setNodeProgressStatesByPrompt, + getPromptProgress, + getPromptNodeProgressStates, + getWorkflowRunningPromptIds, + getActivePromptForWorkflow, runningPromptIds, runningWorkflowCount, initializingPromptIds,