Compare commits

...

1 Commits

Author SHA1 Message Date
Hunter Senft-Grupp
a7da951b90 feat: support parallel job execution in executionStore
- handleExecutionStart: don't clobber activeJobId when another job is running,
  only switch if idle or if the new job matches the current canvas workflow
- handleProgressState: only update nodeProgressStates for the active job,
  always update per-job state
- resetExecutionState: only clear/promote when the active job finishes,
  promote next running job if available
2026-02-22 02:47:06 -05:00

View File

@@ -215,9 +215,26 @@ export const useExecutionStore = defineStore('execution', () => {
function handleExecutionStart(e: CustomEvent<ExecutionStartWsMessage>) {
executionErrorStore.clearAllErrors()
activeJobId.value = e.detail.prompt_id
queuedJobs.value[activeJobId.value] ??= { nodes: {} }
clearInitializationByJobId(activeJobId.value)
const jobId = e.detail.prompt_id
queuedJobs.value[jobId] ??= { nodes: {} }
clearInitializationByJobId(jobId)
// Only set activeJobId if idle or if this job's workflow matches the current canvas
if (!activeJobId.value) {
activeJobId.value = jobId
} else {
const newJobWorkflowId = jobIdToWorkflowId.value.get(jobId)
const currentWorkflow = workflowStore.activeWorkflow
if (
newJobWorkflowId &&
currentWorkflow &&
String(
currentWorkflow.activeState?.id ?? currentWorkflow.initialState?.id
) === newJobWorkflowId
) {
activeJobId.value = jobId
}
}
}
function handleExecutionCached(e: CustomEvent<ExecutionCachedWsMessage>) {
@@ -282,12 +299,16 @@ export const useExecutionStore = defineStore('execution', () => {
}
}
// Update the progress states for all nodes
// Update per-job progress (always)
nodeProgressStatesByJob.value = {
...nodeProgressStatesByJob.value,
[jobId]: nodes
}
nodeProgressStates.value = nodes
// Only update the "active" progress states if this is the active job
if (jobId === activeJobId.value) {
nodeProgressStates.value = nodes
}
// If we have progress for the currently executing node, update it for backwards compatibility
if (executingNodeId.value && nodes[executingNodeId.value]) {
@@ -424,20 +445,30 @@ export const useExecutionStore = defineStore('execution', () => {
* Reset execution-related state after a run completes or is stopped.
*/
function resetExecutionState(jobIdParam?: string | null) {
nodeProgressStates.value = {}
const jobId = jobIdParam ?? activeJobId.value ?? null
if (jobId) {
const map = { ...nodeProgressStatesByJob.value }
delete map[jobId]
nodeProgressStatesByJob.value = map
useJobPreviewStore().clearPreview(jobId)
delete queuedJobs.value[jobId]
}
if (activeJobId.value) {
delete queuedJobs.value[activeJobId.value]
// Only clear active state if this was the active job
if (jobId && jobId === activeJobId.value) {
// Promote next running job if any
const nextRunning = runningJobIds.value.find((id) => id !== jobId)
if (nextRunning) {
activeJobId.value = nextRunning
nodeProgressStates.value =
nodeProgressStatesByJob.value[nextRunning] ?? {}
} else {
activeJobId.value = null
nodeProgressStates.value = {}
_executingNodeProgress.value = null
}
executionErrorStore.clearPromptError()
}
activeJobId.value = null
_executingNodeProgress.value = null
executionErrorStore.clearPromptError()
}
function getNodeIdIfExecuting(nodeId: string | number) {