mirror of
https://github.com/Comfy-Org/ComfyUI_frontend.git
synced 2026-02-02 14:27:40 +00:00
WIP
initial-report.md (new file, 43 lines)

@@ -0,0 +1,43 @@
# Parallel Workflow Execution - Initial Findings

## Cloud PR 1743 behavior

- The PR enables parallel execution by making the per-user concurrency limit
  dynamic: `DispatchConfig.MaxConcurrentJobsPerUser` is added and read at runtime.
- The dispatcher passes the runtime value into the state machine via
  `TriggerEventGloballyWithArgs`, so changes apply without a restart.
- The state machine uses the runtime limit to move jobs from `queued_limited` to
  `queued_waiting`, which is the gate for concurrent execution.
- Environment defaults set local/ephemeral/dev to 5 and staging/prod to 1.

Key references:

- `common/dynamicconfig/types.go`
- `services/dispatcher/server/services/job_dispatch/dispatcher.go`
- `common/jobstate/state.go`
- `common/dynamicconfig/defaults/ephemeral.json`
- `infrastructure/dynamicconfig/prod/config.json`
- `infrastructure/dynamicconfig/staging/config.json`
- `infrastructure/dynamicconfig/dev/config.json`
## Frontend audit notes (current behavior)

- Queue fetching already supports multiple running/pending jobs via `/jobs`,
  mapping them to the `Running`/`Pending` lists.
  - `src/platform/remote/comfyui/jobs/fetchJobs.ts`
  - `src/stores/queueStore.ts`
- The concurrency indicator is based on `runningWorkflowCount`, which counts
  prompts with running nodes and should therefore reflect parallel execution.
  - `src/stores/executionStore.ts`
  - `src/components/queue/QueueProgressOverlay.vue`
- The progress overlay is single-prompt focused:
  - `executionStore.activePromptId` tracks one prompt.
  - `nodeProgressStates` is overwritten on every `progress_state` event, so the
    progress bar and current node are effectively "last prompt wins."
  - `src/stores/executionStore.ts`
  - `src/components/queue/QueueProgressOverlay.vue`
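The "last prompt wins" behavior above can be reproduced with a framework-free sketch (simplified shapes; the real store holds this map in a Vue ref and tracks more fields per node):

```typescript
// Simplified node progress entry; the real NodeProgressState has more fields.
type NodeProgressState = { value: number; max: number; prompt_id: string }

// One shared map, as in the current executionStore.
let nodeProgressStates: Record<string, NodeProgressState> = {}

// Each progress_state event for ANY prompt replaces the whole map,
// so interleaved prompts clobber each other.
function handleProgressState(nodes: Record<string, NodeProgressState>) {
  nodeProgressStates = nodes // last prompt wins
}

// Two concurrent prompts report progress; only p2's snapshot survives.
handleProgressState({ '1': { value: 3, max: 10, prompt_id: 'p1' } })
handleProgressState({ '2': { value: 1, max: 4, prompt_id: 'p2' } })
```

After the second event, nothing about `p1` remains in the store, which is exactly why the overlay can only render one prompt's progress.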
## Implication

If the frontend is expected to show per-job progress for multiple concurrent
workflows, the execution store and progress overlay will need to evolve beyond
the single-active-prompt assumption.
plan.md (new file, 107 lines)

@@ -0,0 +1,107 @@
# Parallel Execution Implementation Plan

## Goals

- Support multiple simultaneous workflow executions across:
  - separate browser tabs (multiple clients), and
  - in-app workflow tabs (multiple workflows).
- Show correct progress, outputs, and errors per workflow/job.
- Avoid regressions in single-run behavior and the existing UI.

## Non-goals (initial scope)

- Redesigning the queue UI beyond enabling per-job progress indicators.
- New backend features beyond prompt context for WS events (see dependencies).
## Dependencies / coordination (verified vs `../cloud-3` + `../backend-3`)

- Prompt-scoped data is already available for core execution events:
  - `execution_start`, `execution_success`, `execution_error`,
    `execution_interrupted`, `executed`, `progress`, `progress_state`, and
    `executing` include `prompt_id` in `../backend-3` and are forwarded
    unchanged through `../cloud-3`.
  - The frontend currently *drops* `prompt_id` for `executing`; this can be
    fixed without backend changes.
- `progress_text` is a binary message containing only `nodeId` + text in
  `../backend-3` (`send_progress_text`); `../cloud-3` forwards it as raw binary.
  - Per-prompt progress text is **not possible** without backend changes.
  - If we want it, we need to extend ComfyUI/backend to include `prompt_id` in
    the binary payload or add a parallel JSON event.
- `b_preview_with_metadata` includes `prompt_id` and is forwarded as binary;
  usable as-is for prompt/workflow scoping.
- Decide whether the UI should filter by `client_id` (current tab only) or show
  all user jobs; document and apply consistently.
## Phase 1: Data model and execution state

1. Refactor the execution store to be prompt-centric:
   - Replace the single `activePromptId` value with:
     - `promptStates: Record<promptId, PromptExecutionState>`.
     - `runningPromptIds` derived from prompt states.
   - `promptIdToWorkflowId` remains the primary routing map.
   - Provide selectors:
     - `getPromptProgress(promptId)`
     - `getPromptNodeProgressStates(promptId)`
     - `getWorkflowRunningPromptIds(workflowId)`
     - `getActivePromptForWorkflow(workflowId)` (if needed)
   - Redefine `isIdle` as `runningPromptIds.length === 0`.
   - Keep backward-compatible getters, but scope them to:
     - the active workflow tab, or
     - the most recently started prompt.

2. Update event handlers to write prompt-scoped state:
   - `execution_start` creates the prompt state.
   - `progress_state` merges into `promptStates[promptId].nodeProgressStates`.
   - `execution_success`/`error`/`interrupted` remove prompt state only for
     that id.
   - Preserve initialization tracking per prompt.
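The Phase 1 model above can be sketched framework-free (no Vue/Pinia reactivity; the real store wraps these in refs/computeds, and the shapes are simplified):

```typescript
type NodeProgressState = { value: number; max: number }

interface PromptExecutionState {
  nodeProgressStates: Record<string, NodeProgressState>
  isRunning: boolean
}

// promptStates replaces the single activePromptId value.
const promptStates: Record<string, PromptExecutionState> = {}
// promptIdToWorkflowId remains the primary routing map.
const promptIdToWorkflowId = new Map<string, string>()

// Derived: every prompt currently running, regardless of workflow.
function runningPromptIds(): string[] {
  return Object.entries(promptStates)
    .filter(([, state]) => state.isRunning)
    .map(([promptId]) => promptId)
}

// Selector: running prompts routed to one workflow tab.
function getWorkflowRunningPromptIds(workflowId: string): string[] {
  return runningPromptIds().filter(
    (promptId) => promptIdToWorkflowId.get(promptId) === workflowId
  )
}

// isIdle redefined as "no running prompts" instead of "no active prompt id".
const isIdle = () => runningPromptIds().length === 0
```

Backward-compatible getters then become lookups into `promptStates` keyed by whichever prompt the active workflow tab maps to.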
## Phase 2: Prompt-aware node progress and canvas updates

1. Scope graph/node progress to the active workflow tab:
   - Derive `nodeLocationProgressStates` from the prompt(s) mapped to the
     active workflow only.
   - Ensure `GraphCanvas.vue` applies progress for the active workflow only.
2. Update vue-node execution composables to use workflow-scoped progress.
3. Update `groupNode` progress rendering to use prompt-scoped state.
## Phase 3: Outputs and previews isolation

1. Introduce per-workflow output maps in `imagePreviewStore`:
   - `outputsByWorkflowId[workflowId][nodeLocatorId]`.
   - `previewsByWorkflowId[workflowId][nodeLocatorId]`.
2. When switching the active workflow tab:
   - Swap `app.nodeOutputs` and `app.nodePreviewImages` to the selected map.
3. Update the `executed` and `b_preview_with_metadata` handlers to:
   - Use the `prompt_id -> workflowId` mapping to store outputs/previews in
     the correct workflow bucket.
4. Update `ChangeTracker` and history loading paths to restore per-workflow
   outputs without overwriting other workflows.
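The bucket routing in steps 1 and 3 can be sketched as follows (`storeExecutedOutput` is a hypothetical helper name, and the output shape is simplified):

```typescript
type NodeOutputs = Record<string, unknown>

// Per-workflow buckets instead of one global app.nodeOutputs map.
const outputsByWorkflowId: Record<string, Record<string, NodeOutputs>> = {}
const promptIdToWorkflowId = new Map<string, string>()

function storeExecutedOutput(
  promptId: string,
  nodeLocatorId: string,
  output: NodeOutputs
): void {
  const workflowId = promptIdToWorkflowId.get(promptId)
  // Unknown prompt: drop rather than pollute another workflow's bucket.
  if (!workflowId) return
  const bucket = (outputsByWorkflowId[workflowId] ??= {})
  bucket[nodeLocatorId] = output
}
```

Switching the active workflow tab would then just point `app.nodeOutputs` at `outputsByWorkflowId[workflowId]` without copying.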
## Phase 4: UI updates for multi-run visibility

1. Queue overlay:
   - `useQueueProgress` computes per-job progress for all running tasks.
   - `useJobList` attaches progress to each running item, not just one.
   - `QueueOverlayActive` shows aggregated or multi-job state.
2. Actionbar interrupt:
   - Route interrupts to the prompt(s) of the active workflow tab.
   - Optionally add a "stop all running" action.
3. Browser tab title + favicon:
   - Use aggregate progress (e.g., the max or average of running jobs) or show
     the count of running jobs with a generic progress indicator.
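The aggregation options in step 3 can be sketched as pure functions (the title format below is illustrative, not the current UI string):

```typescript
type JobProgress = { value: number; max: number }

// Average of each running job's fractional progress; 0 when idle.
function averageProgress(jobs: JobProgress[]): number {
  if (jobs.length === 0) return 0
  const total = jobs.reduce(
    (sum, job) => sum + (job.max > 0 ? job.value / job.max : 0),
    0
  )
  return total / jobs.length
}

// Aggregate title: percentage plus running-job count.
function titleFor(jobs: JobProgress[]): string {
  if (jobs.length === 0) return 'ComfyUI'
  const pct = Math.round(averageProgress(jobs) * 100)
  return `[${pct}%] ${jobs.length} running - ComfyUI`
}
```

Swapping `averageProgress` for a `Math.max`-based variant changes only the reducer, so the policy choice stays localized.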
## Phase 5: Browser-tab concurrency policy

1. Decide on and implement filtering:
   - Option A: show only jobs for the current `client_id` in the UI.
   - Option B: show all jobs for the user, but mark which client started them.
2. Apply consistent filtering in:
   - the queue overlay,
   - the completion summary,
   - the progress favicon/title.
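The two options can be sketched side by side (the `Job` shape and the `isLocal` flag name are illustrative):

```typescript
interface Job {
  promptId: string
  clientId: string
}

// Option A: only this tab's jobs are shown.
const filterToClient = (jobs: Job[], clientId: string): Job[] =>
  jobs.filter((job) => job.clientId === clientId)

// Option B: all of the user's jobs are shown, tagged with their origin
// relative to this tab so the UI can badge remote ones.
const markOrigin = (jobs: Job[], clientId: string) =>
  jobs.map((job) => ({ ...job, isLocal: job.clientId === clientId }))
```

Whichever option is chosen, routing it through one shared helper keeps the overlay, completion summary, and favicon consistent.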
## Phase 6: Tests and validation

- Unit tests:
  - `executionStore` selectors and lifecycle per prompt.
  - `useQueueProgress` and `useJobList` showing per-job progress.
  - Update existing stories/tests that assume a single `activePromptId`.
- Manual validation checklist:
  - Two workflows running in two in-app tabs: progress and outputs stay
    isolated.
  - Two browser tabs running distinct workflows: no cross-talk in the UI.
  - Interrupt from the actionbar affects only the intended prompt(s).
## Phase 7: Rollout and cleanup

- Remove deprecated single-prompt fields after the migration is stable.
- Update documentation/comments where prompt-scoped behavior is required.
- Coordinate backend/WS schema changes with `../backend-3` and `../cloud-3`.
@@ -34,7 +34,7 @@ function resetStores() {
   queue.runningTasks = []
   queue.historyTasks = []

-  exec.nodeProgressStatesByPrompt = {}
+  exec.setNodeProgressStatesByPrompt({})
 }

 function makePendingTask(
@@ -146,7 +146,7 @@ export const Queued: Story = {
     // Queued at (in metadata on prompt[4])

     // One running workflow
-    exec.nodeProgressStatesByPrompt = {
+    exec.setNodeProgressStatesByPrompt({
       p1: {
         '1': {
           value: 1,
@@ -156,7 +156,7 @@ export const Queued: Story = {
           prompt_id: 'p1'
         }
       }
-    } as any
+    } as any)

     return { args: { ...args, jobId } }
   },
@@ -196,7 +196,7 @@ export const QueuedParallel: Story = {
     ]

     // Two parallel workflows running
-    exec.nodeProgressStatesByPrompt = {
+    exec.setNodeProgressStatesByPrompt({
       p1: {
         '1': {
           value: 1,
@@ -215,7 +215,7 @@ export const QueuedParallel: Story = {
           prompt_id: 'p2'
         }
       }
-    } as any
+    } as any)

     return { args: { ...args, jobId } }
   },
@@ -246,7 +246,7 @@ export const Running: Story = {
       makeHistoryTask('hist-r3', 252, 60, true)
     ]

-    exec.nodeProgressStatesByPrompt = {
+    exec.setNodeProgressStatesByPrompt({
       p1: {
         '1': {
           value: 5,
@@ -256,7 +256,7 @@ export const Running: Story = {
           prompt_id: 'p1'
         }
       }
-    } as any
+    } as any)

     return { args: { ...args, jobId } }
   },
@@ -291,7 +291,7 @@ export const QueuedZeroAheadSingleRunning: Story = {

     queue.runningTasks = [makeRunningTaskWithStart('running-1', 505, 20)]

-    exec.nodeProgressStatesByPrompt = {
+    exec.setNodeProgressStatesByPrompt({
       p1: {
         '1': {
           value: 1,
@@ -301,7 +301,7 @@ export const QueuedZeroAheadSingleRunning: Story = {
           prompt_id: 'p1'
         }
       }
-    } as any
+    } as any)

     return { args: { ...args, jobId } }
   },
@@ -339,7 +339,7 @@ export const QueuedZeroAheadMultiRunning: Story = {
       makeRunningTaskWithStart('running-b', 507, 10)
     ]

-    exec.nodeProgressStatesByPrompt = {
+    exec.setNodeProgressStatesByPrompt({
       p1: {
         '1': {
           value: 2,
@@ -358,7 +358,7 @@ export const QueuedZeroAheadMultiRunning: Story = {
           prompt_id: 'p2'
         }
       }
-    } as any
+    } as any)

     return { args: { ...args, jobId } }
   },

@@ -187,12 +187,22 @@ type AsCustomEvents<T> = {
   readonly [K in keyof T]: CustomEvent<T[K]>
 }

+type ExecutingEventDetail =
+  | ExecutingWsMessage
+  | {
+      node: NodeId | null
+      display_node?: NodeId
+      prompt_id?: string | null
+    }
+  | NodeId
+  | null
+
 /** Handles differing event and API signatures. */
 type ApiToEventType<T = ApiCalls> = {
   [K in keyof T]: K extends 'status'
     ? StatusWsMessageStatus
     : K extends 'executing'
-      ? NodeId
+      ? ExecutingEventDetail
       : T[K]
 }
@@ -646,10 +656,7 @@ export class ComfyApi extends EventTarget {
         this.dispatchCustomEvent('status', msg.data.status ?? null)
         break
       case 'executing':
-        this.dispatchCustomEvent(
-          'executing',
-          msg.data.display_node || msg.data.node
-        )
+        this.dispatchCustomEvent('executing', msg.data)
         break
       case 'execution_start':
       case 'execution_error':
@@ -15,6 +15,7 @@ import type {
 import { useCanvasStore } from '@/renderer/core/canvas/canvasStore'
 import type {
   ExecutedWsMessage,
+  ExecutingWsMessage,
   ExecutionCachedWsMessage,
   ExecutionErrorWsMessage,
   ExecutionInterruptedWsMessage,
@@ -46,6 +47,30 @@ interface QueuedPrompt {
   workflow?: ComfyWorkflow
 }

+interface PromptExecutionState extends QueuedPrompt {
+  nodeProgressStates: Record<string, NodeProgressState>
+  executingNodeProgress: ProgressWsMessage | null
+  isRunning: boolean
+  startedAt?: number
+  lastUpdatedAt: number
+}
+
+type PromptProgress = {
+  totalNodes: number
+  executedNodes: number
+  progress: number
+}
+
+type ExecutingEventDetail =
+  | ExecutingWsMessage
+  | {
+      node: NodeId | null
+      display_node?: NodeId
+      prompt_id?: string | null
+    }
+  | NodeId
+  | null
+
 const subgraphNodeIdToSubgraph = (id: string, graph: LGraph | Subgraph) => {
   const node = graph.getNodeById(id)
   if (node?.isSubgraphNode()) return node.subgraph
@@ -108,15 +133,10 @@ export const useExecutionStore = defineStore('execution', () => {
   const canvasStore = useCanvasStore()

   const clientId = ref<string | null>(null)
-  const activePromptId = ref<string | null>(null)
-  const queuedPrompts = ref<Record<NodeId, QueuedPrompt>>({})
+  const promptStates = ref<Record<string, PromptExecutionState>>({})
+  const lastStartedPromptId = ref<string | null>(null)
   const lastNodeErrors = ref<Record<NodeId, NodeError> | null>(null)
   const lastExecutionError = ref<ExecutionErrorWsMessage | null>(null)
-  // This is the progress of all nodes in the currently executing workflow
-  const nodeProgressStates = ref<Record<string, NodeProgressState>>({})
-  const nodeProgressStatesByPrompt = ref<
-    Record<string, Record<string, NodeProgressState>>
-  >({})

   /**
    * Map of prompt_id to workflow ID for quick lookup across the app.
@@ -154,6 +174,209 @@ export const useExecutionStore = defineStore('execution', () => {
     return mergedState
   }

+  const createPromptState = (startedAt?: number): PromptExecutionState => ({
+    nodes: {},
+    nodeProgressStates: {},
+    executingNodeProgress: null,
+    isRunning: false,
+    startedAt,
+    lastUpdatedAt: startedAt ?? Date.now()
+  })
+
+  const getPromptState = (
+    promptId: string | number | null | undefined
+  ): PromptExecutionState | undefined => {
+    if (!promptId) return undefined
+    return promptStates.value[String(promptId)]
+  }
+
+  const ensurePromptState = (
+    promptId: string,
+    startedAt?: number
+  ): PromptExecutionState => {
+    const existing = promptStates.value[promptId]
+    if (existing) {
+      if (startedAt && !existing.startedAt) {
+        existing.startedAt = startedAt
+      }
+      return existing
+    }
+
+    const nextState = createPromptState(startedAt)
+    promptStates.value = {
+      ...promptStates.value,
+      [promptId]: nextState
+    }
+    return nextState
+  }
+
+  const touchPromptState = (
+    promptState: PromptExecutionState,
+    timestamp?: number
+  ) => {
+    promptState.lastUpdatedAt = timestamp ?? Date.now()
+  }
+
+  const isPromptRunning = (promptState: PromptExecutionState) =>
+    promptState.isRunning ||
+    Object.values(promptState.nodeProgressStates).some(
+      (node) => node.state === 'running'
+    )
+
+  const getPromptSortTimestamp = (promptState: PromptExecutionState) =>
+    promptState.startedAt ?? promptState.lastUpdatedAt ?? 0
+
+  const getMostRecentPromptId = (promptIds: string[]): string | null => {
+    let bestPromptId: string | null = null
+    let bestTimestamp = -1
+    for (const promptId of promptIds) {
+      const state = promptStates.value[promptId]
+      if (!state) continue
+      const timestamp = getPromptSortTimestamp(state)
+      if (timestamp >= bestTimestamp) {
+        bestTimestamp = timestamp
+        bestPromptId = promptId
+      }
+    }
+    return bestPromptId
+  }
+
+  const activeWorkflowId = computed(() => {
+    const activeWorkflow = workflowStore.activeWorkflow
+    return (
+      activeWorkflow?.activeState?.id ??
+      activeWorkflow?.initialState?.id ??
+      null
+    )
+  })
+
+  const applyNodeProgressStatesByPrompt = (
+    next: Record<string, Record<string, NodeProgressState>>
+  ) => {
+    const nextStates: Record<string, PromptExecutionState> = {}
+    for (const [promptId, nodes] of Object.entries(next)) {
+      const current = promptStates.value[promptId]
+      const hasRunningNode = Object.values(nodes).some(
+        (node) => node.state === 'running'
+      )
+      nextStates[promptId] = {
+        nodes: current?.nodes ?? {},
+        workflow: current?.workflow,
+        nodeProgressStates: nodes,
+        executingNodeProgress: current?.executingNodeProgress ?? null,
+        isRunning: (current?.isRunning ?? false) || hasRunningNode,
+        startedAt: current?.startedAt,
+        lastUpdatedAt: Date.now()
+      }
+    }
+    promptStates.value = nextStates
+  }
+
+  const nodeProgressStatesByPrompt = computed<
+    Record<string, Record<string, NodeProgressState>>
+  >({
+    get: () => {
+      const result: Record<string, Record<string, NodeProgressState>> = {}
+      for (const [promptId, state] of Object.entries(promptStates.value)) {
+        result[promptId] = state.nodeProgressStates
+      }
+      return result
+    },
+    set: (next) => applyNodeProgressStatesByPrompt(next)
+  })
+
+  const setNodeProgressStatesByPrompt = (
+    next: Record<string, Record<string, NodeProgressState>>
+  ) => {
+    applyNodeProgressStatesByPrompt(next)
+  }
+
+  const getPromptNodeProgressStates = (
+    promptId: string | number | null | undefined
+  ): Record<string, NodeProgressState> =>
+    getPromptState(promptId)?.nodeProgressStates ?? {}
+
+  const getPromptProgress = (
+    promptId: string | number | null | undefined
+  ): PromptProgress | null => {
+    const promptState = getPromptState(promptId)
+    if (!promptState) return null
+    const totalNodes = Object.values(promptState.nodes).length
+    const executedNodes = Object.values(promptState.nodes).filter(
+      Boolean
+    ).length
+    return {
+      totalNodes,
+      executedNodes,
+      progress: totalNodes > 0 ? executedNodes / totalNodes : 0
+    }
+  }
+
+  const getWorkflowRunningPromptIds = (workflowId: string | null) => {
+    if (!workflowId) return []
+    return Object.entries(promptStates.value)
+      .filter(([promptId, state]) => {
+        return (
+          promptIdToWorkflowId.value.get(promptId) === workflowId &&
+          isPromptRunning(state)
+        )
+      })
+      .map(([promptId]) => promptId)
+  }
+
+  const getActivePromptForWorkflow = (workflowId: string | null) => {
+    const runningPrompts = getWorkflowRunningPromptIds(workflowId)
+    if (runningPrompts.length === 0) return null
+    return getMostRecentPromptId(runningPrompts)
+  }
+
+  const runningPromptIds = computed<string[]>(() => {
+    return Object.entries(promptStates.value)
+      .filter(([_, state]) => isPromptRunning(state))
+      .map(([promptId]) => promptId)
+  })
+
+  const runningWorkflowCount = computed<number>(
+    () => runningPromptIds.value.length
+  )
+
+  const activePromptId = computed<string | null>(() => {
+    if (runningPromptIds.value.length === 0) return null
+
+    const activeWorkflowPrompt = getActivePromptForWorkflow(
+      activeWorkflowId.value
+    )
+    if (activeWorkflowPrompt) return activeWorkflowPrompt
+
+    if (
+      lastStartedPromptId.value &&
+      runningPromptIds.value.includes(lastStartedPromptId.value)
+    ) {
+      return lastStartedPromptId.value
+    }
+
+    return getMostRecentPromptId(runningPromptIds.value)
+  })
+
+  const activePrompt = computed<PromptExecutionState | undefined>(() =>
+    getPromptState(activePromptId.value)
+  )
+
+  const nodeProgressStates = computed<Record<string, NodeProgressState>>(() =>
+    getPromptNodeProgressStates(activePromptId.value)
+  )
+
+  const queuedPrompts = computed<Record<NodeId, QueuedPrompt>>(() => {
+    const result: Record<NodeId, QueuedPrompt> = {}
+    for (const [promptId, state] of Object.entries(promptStates.value)) {
+      result[promptId] = {
+        nodes: state.nodes,
+        workflow: state.workflow
+      }
+    }
+    return result
+  })
+
   const nodeLocationProgressStates = computed<
     Record<NodeLocatorId, NodeProgressState>
   >(() => {
@@ -211,34 +434,27 @@ export const useExecutionStore = defineStore('execution', () => {
   })

   // This is the progress of the currently executing node (for backward compatibility)
-  const _executingNodeProgress = ref<ProgressWsMessage | null>(null)
+  const _executingNodeProgress = computed<ProgressWsMessage | null>(() => {
+    return getPromptState(activePromptId.value)?.executingNodeProgress ?? null
+  })
   const executingNodeProgress = computed(() =>
     _executingNodeProgress.value
       ? _executingNodeProgress.value.value / _executingNodeProgress.value.max
       : null
   )

-  const activePrompt = computed<QueuedPrompt | undefined>(
-    () => queuedPrompts.value[activePromptId.value ?? '']
-  )
-
   const totalNodesToExecute = computed<number>(() => {
-    if (!activePrompt.value) return 0
-    return Object.values(activePrompt.value.nodes).length
+    return getPromptProgress(activePromptId.value)?.totalNodes ?? 0
   })

-  const isIdle = computed<boolean>(() => !activePromptId.value)
+  const isIdle = computed<boolean>(() => runningPromptIds.value.length === 0)

   const nodesExecuted = computed<number>(() => {
-    if (!activePrompt.value) return 0
-    return Object.values(activePrompt.value.nodes).filter(Boolean).length
+    return getPromptProgress(activePromptId.value)?.executedNodes ?? 0
   })

   const executionProgress = computed<number>(() => {
-    if (!activePrompt.value) return 0
-    const total = totalNodesToExecute.value
-    const done = nodesExecuted.value
-    return total > 0 ? done / total : 0
+    return getPromptProgress(activePromptId.value)?.progress ?? 0
   })

   const lastExecutionErrorNodeLocatorId = computed(() => {
@@ -286,62 +502,76 @@ export const useExecutionStore = defineStore('execution', () => {

   function handleExecutionStart(e: CustomEvent<ExecutionStartWsMessage>) {
     lastExecutionError.value = null
-    activePromptId.value = e.detail.prompt_id
-    queuedPrompts.value[activePromptId.value] ??= { nodes: {} }
-    clearInitializationByPromptId(activePromptId.value)
+    const promptId = e.detail.prompt_id
+    const promptState = ensurePromptState(promptId, e.detail.timestamp)
+    promptState.isRunning = true
+    promptState.executingNodeProgress = null
+    touchPromptState(promptState, e.detail.timestamp)
+    lastStartedPromptId.value = promptId
+    clearInitializationByPromptId(promptId)
   }

   function handleExecutionCached(e: CustomEvent<ExecutionCachedWsMessage>) {
-    if (!activePrompt.value) return
+    const promptState = ensurePromptState(e.detail.prompt_id)
     for (const n of e.detail.nodes) {
-      activePrompt.value.nodes[n] = true
+      promptState.nodes[n] = true
     }
+    touchPromptState(promptState)
   }

   function handleExecutionInterrupted(
     e: CustomEvent<ExecutionInterruptedWsMessage>
   ) {
     const pid = e.detail.prompt_id
-    if (activePromptId.value)
-      clearInitializationByPromptId(activePromptId.value)
+    clearInitializationByPromptId(pid)
     resetExecutionState(pid)
   }

   function handleExecuted(e: CustomEvent<ExecutedWsMessage>) {
-    if (!activePrompt.value) return
-    activePrompt.value.nodes[e.detail.node] = true
+    const promptState = ensurePromptState(e.detail.prompt_id)
+    promptState.nodes[e.detail.node] = true
+    touchPromptState(promptState)
   }

   function handleExecutionSuccess(e: CustomEvent<ExecutionSuccessWsMessage>) {
-    if (isCloud && activePromptId.value) {
+    if (isCloud) {
       useTelemetry()?.trackExecutionSuccess({
-        jobId: activePromptId.value
+        jobId: e.detail.prompt_id
       })
     }
     const pid = e.detail.prompt_id
     resetExecutionState(pid)
   }

-  function handleExecuting(e: CustomEvent<NodeId | null>): void {
-    // Clear the current node progress when a new node starts executing
-    _executingNodeProgress.value = null
+  function handleExecuting(e: CustomEvent<ExecutingEventDetail>): void {
+    const detail = e.detail
+    const promptId =
+      detail && typeof detail === 'object'
+        ? detail.prompt_id
+        : (activePromptId.value ?? lastStartedPromptId.value)
+    if (!promptId) return

-    if (!activePrompt.value) return
+    const nodeId =
+      detail && typeof detail === 'object'
+        ? (detail.display_node ?? detail.node ?? null)
+        : detail

-    // Update the executing nodes list
-    if (typeof e.detail !== 'string') {
-      if (activePromptId.value) {
-        delete queuedPrompts.value[activePromptId.value]
-      }
-      activePromptId.value = null
+    const promptState = ensurePromptState(String(promptId))
+    promptState.executingNodeProgress = null
+    const hasNode = nodeId !== null && nodeId !== undefined
+    promptState.isRunning = promptState.isRunning || hasNode
+    if (!hasNode) {
+      promptState.isRunning = false
     }
+    touchPromptState(promptState)
   }

   function handleProgressState(e: CustomEvent<ProgressStateWsMessage>) {
     const { nodes, prompt_id: pid } = e.detail
+    const promptState = ensurePromptState(pid)

     // Revoke previews for nodes that are starting to execute
-    const previousForPrompt = nodeProgressStatesByPrompt.value[pid] || {}
+    const previousForPrompt = promptState.nodeProgressStates
     for (const nodeId in nodes) {
       const nodeState = nodes[nodeId]
       if (nodeState.state === 'running' && !previousForPrompt[nodeId]) {
@@ -355,16 +585,18 @@ export const useExecutionStore = defineStore('execution', () => {
     }

     // Update the progress states for all nodes
-    nodeProgressStatesByPrompt.value = {
-      ...nodeProgressStatesByPrompt.value,
-      [pid]: nodes
-    }
-    nodeProgressStates.value = nodes
+    promptState.nodeProgressStates = nodes
+    if (Object.values(nodes).some((node) => node.state === 'running')) {
+      promptState.isRunning = true
+    }
+    touchPromptState(promptState)

     // If we have progress for the currently executing node, update it for backwards compatibility
-    if (executingNodeId.value && nodes[executingNodeId.value]) {
-      const nodeState = nodes[executingNodeId.value]
-      _executingNodeProgress.value = {
+    const runningNodeId = Object.entries(nodes).find(
+      ([, nodeState]) => nodeState.state === 'running'
+    )?.[0]
+    if (runningNodeId) {
+      const nodeState = nodes[runningNodeId]
+      promptState.executingNodeProgress = {
         value: nodeState.value,
         max: nodeState.max,
         prompt_id: nodeState.prompt_id,
@@ -374,7 +606,10 @@ export const useExecutionStore = defineStore('execution', () => {
   }

   function handleProgress(e: CustomEvent<ProgressWsMessage>) {
-    _executingNodeProgress.value = e.detail
+    const promptState = ensurePromptState(e.detail.prompt_id)
+    promptState.executingNodeProgress = e.detail
+    promptState.isRunning = true
+    touchPromptState(promptState)
   }

   function handleStatus() {
@@ -438,18 +673,14 @@ export const useExecutionStore = defineStore('execution', () => {
    * Reset execution-related state after a run completes or is stopped.
    */
   function resetExecutionState(pid?: string | null) {
-    nodeProgressStates.value = {}
     const promptId = pid ?? activePromptId.value ?? null
-    if (promptId) {
-      const map = { ...nodeProgressStatesByPrompt.value }
-      delete map[promptId]
-      nodeProgressStatesByPrompt.value = map
-    }
-    if (activePromptId.value) {
-      delete queuedPrompts.value[activePromptId.value]
-    }
-    activePromptId.value = null
-    _executingNodeProgress.value = null
+    if (!promptId) return
+    const nextStates = { ...promptStates.value }
+    delete nextStates[promptId]
+    promptStates.value = nextStates
+    if (lastStartedPromptId.value === promptId) {
+      lastStartedPromptId.value = null
+    }
   }

   function getNodeIdIfExecuting(nodeId: string | number) {
@@ -480,16 +711,16 @@ export const useExecutionStore = defineStore('execution', () => {
     id: string
     workflow: ComfyWorkflow
   }) {
-    queuedPrompts.value[id] ??= { nodes: {} }
-    const queuedPrompt = queuedPrompts.value[id]
-    queuedPrompt.nodes = {
+    const promptState = ensurePromptState(id)
+    promptState.nodes = {
       ...nodes.reduce((p: Record<string, boolean>, n) => {
         p[n] = false
         return p
       }, {}),
-      ...queuedPrompt.nodes
+      ...promptState.nodes
     }
-    queuedPrompt.workflow = workflow
+    promptState.workflow = workflow
+    touchPromptState(promptState)
     const wid = workflow?.activeState?.id ?? workflow?.initialState?.id
     if (wid) {
       promptIdToWorkflowId.value.set(String(id), String(wid))
@@ -519,22 +750,6 @@ export const useExecutionStore = defineStore('execution', () => {
     return executionId
   }

-  const runningPromptIds = computed<string[]>(() => {
-    const result: string[] = []
-    for (const [pid, nodes] of Object.entries(
-      nodeProgressStatesByPrompt.value
-    )) {
-      if (Object.values(nodes).some((n) => n.state === 'running')) {
-        result.push(pid)
-      }
-    }
-    return result
-  })
-
-  const runningWorkflowCount = computed<number>(
-    () => runningPromptIds.value.length
-  )
-
   /** Map of node errors indexed by locator ID. */
   const nodeErrorsByLocatorId = computed<Record<NodeLocatorId, NodeError>>(
     () => {
@@ -633,6 +848,7 @@ export const useExecutionStore = defineStore('execution', () => {
     isIdle,
     clientId,
     activePromptId,
+    promptStates,
     queuedPrompts,
     lastNodeErrors,
     lastExecutionError,
@@ -648,6 +864,11 @@ export const useExecutionStore = defineStore('execution', () => {
     nodeProgressStates,
     nodeLocationProgressStates,
     nodeProgressStatesByPrompt,
+    setNodeProgressStatesByPrompt,
+    getPromptProgress,
+    getPromptNodeProgressStates,
+    getWorkflowRunningPromptIds,
+    getActivePromptForWorkflow,
     runningPromptIds,
     runningWorkflowCount,
     initializingPromptIds,