mirror of
https://github.com/Comfy-Org/ComfyUI_frontend.git
synced 2026-04-20 14:30:41 +00:00
feat: support parallel job execution in executionStore
- handleExecutionStart: don't clobber activeJobId when another job is running, only switch if idle or if the new job matches the current canvas workflow - handleProgressState: only update nodeProgressStates for the active job, always update per-job state - resetExecutionState: only clear/promote when the active job finishes, promote next running job if available
This commit is contained in:
@@ -215,9 +215,26 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
|
||||
function handleExecutionStart(e: CustomEvent<ExecutionStartWsMessage>) {
|
||||
executionErrorStore.clearAllErrors()
|
||||
activeJobId.value = e.detail.prompt_id
|
||||
queuedJobs.value[activeJobId.value] ??= { nodes: {} }
|
||||
clearInitializationByJobId(activeJobId.value)
|
||||
const jobId = e.detail.prompt_id
|
||||
queuedJobs.value[jobId] ??= { nodes: {} }
|
||||
clearInitializationByJobId(jobId)
|
||||
|
||||
// Only set activeJobId if idle or if this job's workflow matches the current canvas
|
||||
if (!activeJobId.value) {
|
||||
activeJobId.value = jobId
|
||||
} else {
|
||||
const newJobWorkflowId = jobIdToWorkflowId.value.get(jobId)
|
||||
const currentWorkflow = workflowStore.activeWorkflow
|
||||
if (
|
||||
newJobWorkflowId &&
|
||||
currentWorkflow &&
|
||||
String(
|
||||
currentWorkflow.activeState?.id ?? currentWorkflow.initialState?.id
|
||||
) === newJobWorkflowId
|
||||
) {
|
||||
activeJobId.value = jobId
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function handleExecutionCached(e: CustomEvent<ExecutionCachedWsMessage>) {
|
||||
@@ -282,12 +299,16 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
}
|
||||
}
|
||||
|
||||
// Update the progress states for all nodes
|
||||
// Update per-job progress (always)
|
||||
nodeProgressStatesByJob.value = {
|
||||
...nodeProgressStatesByJob.value,
|
||||
[jobId]: nodes
|
||||
}
|
||||
nodeProgressStates.value = nodes
|
||||
|
||||
// Only update the "active" progress states if this is the active job
|
||||
if (jobId === activeJobId.value) {
|
||||
nodeProgressStates.value = nodes
|
||||
}
|
||||
|
||||
// If we have progress for the currently executing node, update it for backwards compatibility
|
||||
if (executingNodeId.value && nodes[executingNodeId.value]) {
|
||||
@@ -424,20 +445,30 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
* Reset execution-related state after a run completes or is stopped.
|
||||
*/
|
||||
function resetExecutionState(jobIdParam?: string | null) {
|
||||
nodeProgressStates.value = {}
|
||||
const jobId = jobIdParam ?? activeJobId.value ?? null
|
||||
if (jobId) {
|
||||
const map = { ...nodeProgressStatesByJob.value }
|
||||
delete map[jobId]
|
||||
nodeProgressStatesByJob.value = map
|
||||
useJobPreviewStore().clearPreview(jobId)
|
||||
delete queuedJobs.value[jobId]
|
||||
}
|
||||
if (activeJobId.value) {
|
||||
delete queuedJobs.value[activeJobId.value]
|
||||
|
||||
// Only clear active state if this was the active job
|
||||
if (jobId && jobId === activeJobId.value) {
|
||||
// Promote next running job if any
|
||||
const nextRunning = runningJobIds.value.find((id) => id !== jobId)
|
||||
if (nextRunning) {
|
||||
activeJobId.value = nextRunning
|
||||
nodeProgressStates.value =
|
||||
nodeProgressStatesByJob.value[nextRunning] ?? {}
|
||||
} else {
|
||||
activeJobId.value = null
|
||||
nodeProgressStates.value = {}
|
||||
_executingNodeProgress.value = null
|
||||
}
|
||||
executionErrorStore.clearPromptError()
|
||||
}
|
||||
activeJobId.value = null
|
||||
_executingNodeProgress.value = null
|
||||
executionErrorStore.clearPromptError()
|
||||
}
|
||||
|
||||
function getNodeIdIfExecuting(nodeId: string | number) {
|
||||
|
||||
Reference in New Issue
Block a user