feat: add focusedJobId for concurrent job execution support

This commit is contained in:
bymyself
2026-03-12 04:29:55 -07:00
parent a00e4b6421
commit 55a73bcd0b
6 changed files with 428 additions and 23 deletions

View File

@@ -0,0 +1,51 @@
import { computed } from 'vue'
import { remoteConfig } from '@/platform/remoteConfig/remoteConfig'
import { useSettingStore } from '@/platform/settings/settingStore'
export function useConcurrentExecution() {
const settingStore = useSettingStore()
const isFeatureEnabled = computed(
() => remoteConfig.value.concurrent_execution_enabled === true
)
const isUserEnabled = computed(
() => settingStore.get('Comfy.Cloud.ConcurrentExecution') === true
)
const isConcurrentExecutionEnabled = computed(
() => isFeatureEnabled.value && isUserEnabled.value
)
const maxConcurrentJobs = computed(
() => remoteConfig.value.max_concurrent_jobs ?? 1
)
const hasSeenOnboarding = computed(
() =>
settingStore.get('Comfy.Cloud.ConcurrentExecution.OnboardingSeen') ===
true
)
async function setUserEnabled(enabled: boolean) {
await settingStore.set('Comfy.Cloud.ConcurrentExecution', enabled)
}
async function markOnboardingSeen() {
await settingStore.set(
'Comfy.Cloud.ConcurrentExecution.OnboardingSeen',
true
)
}
return {
isFeatureEnabled,
isUserEnabled,
isConcurrentExecutionEnabled,
maxConcurrentJobs,
hasSeenOnboarding,
setUserEnabled,
markOnboardingSeen
}
}

View File

@@ -55,4 +55,6 @@ export type RemoteConfig = {
comfyhub_upload_enabled?: boolean
comfyhub_profile_gate_enabled?: boolean
sentry_dsn?: string
max_concurrent_jobs?: number
concurrent_execution_enabled?: boolean
}

View File

@@ -1268,5 +1268,21 @@ export const CORE_SETTINGS: SettingParams[] = [
type: 'boolean',
defaultValue: false,
versionAdded: '1.42.0'
},
{
id: 'Comfy.Cloud.ConcurrentExecution',
name: 'Run jobs in parallel',
tooltip:
'When enabled, multiple workflow runs execute concurrently instead of queuing sequentially.',
type: isCloud ? 'boolean' : 'hidden',
defaultValue: true,
versionAdded: '1.42.0'
},
{
id: 'Comfy.Cloud.ConcurrentExecution.OnboardingSeen',
name: 'Concurrent execution onboarding dialog seen',
type: 'hidden',
defaultValue: false,
versionAdded: '1.42.0'
}
]

View File

@@ -466,7 +466,9 @@ const zSettings = z.object({
'Comfy.RightSidePanel.IsOpen': z.boolean(),
'Comfy.RightSidePanel.ShowErrorsTab': z.boolean(),
'Comfy.Node.AlwaysShowAdvancedWidgets': z.boolean(),
'LiteGraph.Group.SelectChildrenOnClick': z.boolean()
'LiteGraph.Group.SelectChildrenOnClick': z.boolean(),
'Comfy.Cloud.ConcurrentExecution': z.boolean(),
'Comfy.Cloud.ConcurrentExecution.OnboardingSeen': z.boolean()
})
export type EmbeddingsResponse = z.infer<typeof zEmbeddingsResponse>

View File

@@ -430,6 +430,298 @@ describe('useExecutionStore - reconcileInitializingJobs', () => {
})
})
describe('useExecutionStore - focusedJobId management', () => {
let store: ReturnType<typeof useExecutionStore>
beforeEach(() => {
vi.clearAllMocks()
setActivePinia(createTestingPinia({ stubActions: false }))
store = useExecutionStore()
})
it('should initialize focusedJobId as null', () => {
expect(store.focusedJobId).toBeNull()
})
it('should set focusedJobId and update nodeProgressStates via setFocusedJob', () => {
store.nodeProgressStatesByJob = {
'job-1': {
'node-1': {
value: 50,
max: 100,
state: 'running',
node_id: 'node-1',
prompt_id: 'job-1'
}
},
'job-2': {
'node-2': {
value: 30,
max: 100,
state: 'running',
node_id: 'node-2',
prompt_id: 'job-2'
}
}
}
store.setFocusedJob('job-1')
expect(store.focusedJobId).toBe('job-1')
expect(store.nodeProgressStates).toEqual({
'node-1': {
value: 50,
max: 100,
state: 'running',
node_id: 'node-1',
prompt_id: 'job-1'
}
})
store.setFocusedJob('job-2')
expect(store.focusedJobId).toBe('job-2')
expect(store.nodeProgressStates).toEqual({
'node-2': {
value: 30,
max: 100,
state: 'running',
node_id: 'node-2',
prompt_id: 'job-2'
}
})
})
it('should clear nodeProgressStates when setFocusedJob is called with null', () => {
store.nodeProgressStatesByJob = {
'job-1': {
'node-1': {
value: 50,
max: 100,
state: 'running',
node_id: 'node-1',
prompt_id: 'job-1'
}
}
}
store.setFocusedJob('job-1')
expect(store.nodeProgressStates).not.toEqual({})
store.setFocusedJob(null)
expect(store.focusedJobId).toBeNull()
expect(store.nodeProgressStates).toEqual({})
})
it('should return undefined for focusedJob when no job is focused', () => {
expect(store.focusedJob).toBeUndefined()
})
it('should return the focused QueuedJob from focusedJob computed', () => {
store.queuedJobs = {
'job-1': { nodes: { n1: false, n2: true } }
}
store.setFocusedJob('job-1')
expect(store.focusedJob).toEqual({ nodes: { n1: false, n2: true } })
})
})
describe('useExecutionStore - isConcurrentExecutionActive', () => {
let store: ReturnType<typeof useExecutionStore>
beforeEach(() => {
vi.clearAllMocks()
setActivePinia(createTestingPinia({ stubActions: false }))
store = useExecutionStore()
})
it('should be false when no jobs are running', () => {
expect(store.isConcurrentExecutionActive).toBe(false)
})
it('should be false when only one job is running', () => {
store.nodeProgressStatesByJob = {
'job-1': {
'node-1': {
value: 50,
max: 100,
state: 'running',
node_id: 'node-1',
prompt_id: 'job-1'
}
}
}
expect(store.isConcurrentExecutionActive).toBe(false)
})
it('should be true when multiple jobs are running', () => {
store.nodeProgressStatesByJob = {
'job-1': {
'node-1': {
value: 50,
max: 100,
state: 'running',
node_id: 'node-1',
prompt_id: 'job-1'
}
},
'job-2': {
'node-2': {
value: 30,
max: 100,
state: 'running',
node_id: 'node-2',
prompt_id: 'job-2'
}
}
}
expect(store.isConcurrentExecutionActive).toBe(true)
})
})
describe('useExecutionStore - isIdle with multi-job', () => {
let store: ReturnType<typeof useExecutionStore>
beforeEach(() => {
vi.clearAllMocks()
setActivePinia(createTestingPinia({ stubActions: false }))
store = useExecutionStore()
})
it('should be true when no jobs are running', () => {
expect(store.isIdle).toBe(true)
})
it('should be false when at least one job is running', () => {
store.nodeProgressStatesByJob = {
'job-1': {
'node-1': {
value: 0,
max: 100,
state: 'running',
node_id: 'node-1',
prompt_id: 'job-1'
}
}
}
expect(store.isIdle).toBe(false)
})
it('should be true when all jobs are finished (not running)', () => {
store.nodeProgressStatesByJob = {
'job-1': {
'node-1': {
value: 100,
max: 100,
state: 'finished',
node_id: 'node-1',
prompt_id: 'job-1'
}
}
}
expect(store.isIdle).toBe(true)
})
})
describe('useExecutionStore - resetExecutionState auto-advance', () => {
let store: ReturnType<typeof useExecutionStore>
beforeEach(() => {
vi.clearAllMocks()
setActivePinia(createTestingPinia({ stubActions: false }))
store = useExecutionStore()
})
it('should set focusedJobId to null when last running job finishes', () => {
store.nodeProgressStatesByJob = {
'job-1': {
'node-1': {
value: 50,
max: 100,
state: 'running',
node_id: 'node-1',
prompt_id: 'job-1'
}
}
}
store.activeJobId = 'job-1'
store.setFocusedJob('job-1')
// When the last job finishes, nodeProgressStatesByJob will be empty
// and focusedJobId should become null
store.nodeProgressStatesByJob = {}
store.setFocusedJob(null)
expect(store.focusedJobId).toBeNull()
expect(store.nodeProgressStates).toEqual({})
})
it('should not change focusedJobId when a non-focused job finishes', () => {
store.nodeProgressStatesByJob = {
'job-1': {
'node-1': {
value: 50,
max: 100,
state: 'running',
node_id: 'node-1',
prompt_id: 'job-1'
}
},
'job-2': {
'node-2': {
value: 30,
max: 100,
state: 'running',
node_id: 'node-2',
prompt_id: 'job-2'
}
}
}
store.setFocusedJob('job-1')
// job-2 finishes — focusedJobId should stay on job-1
const updated = { ...store.nodeProgressStatesByJob }
delete updated['job-2']
store.nodeProgressStatesByJob = updated
// Focus should still be on job-1
expect(store.focusedJobId).toBe('job-1')
expect(store.nodeProgressStates).toEqual({
'node-1': {
value: 50,
max: 100,
state: 'running',
node_id: 'node-1',
prompt_id: 'job-1'
}
})
})
})
describe('useExecutionStore - executionProgress from focusedJob', () => {
let store: ReturnType<typeof useExecutionStore>
beforeEach(() => {
vi.clearAllMocks()
setActivePinia(createTestingPinia({ stubActions: false }))
store = useExecutionStore()
})
it('should compute executionProgress from focused job nodes', () => {
store.queuedJobs = {
'job-1': { nodes: { n1: true, n2: false, n3: false } },
'job-2': { nodes: { n4: true, n5: true } }
}
store.setFocusedJob('job-1')
// 1 out of 3 done
expect(store.executionProgress).toBeCloseTo(1 / 3)
store.setFocusedJob('job-2')
// 2 out of 2 done
expect(store.executionProgress).toBe(1)
})
it('should return 0 when no job is focused', () => {
expect(store.executionProgress).toBe(0)
})
})
describe('useExecutionErrorStore - Node Error Lookups', () => {
let store: ReturnType<typeof useExecutionErrorStore>

View File

@@ -60,6 +60,7 @@ export const useExecutionStore = defineStore('execution', () => {
const clientId = ref<string | null>(null)
const activeJobId = ref<string | null>(null)
const focusedJobId = ref<string | null>(null)
const queuedJobs = ref<Record<NodeId, QueuedJob>>({})
// This is the progress of all nodes in the currently executing workflow
const nodeProgressStates = ref<Record<string, NodeProgressState>>({})
@@ -170,7 +171,7 @@ export const useExecutionStore = defineStore('execution', () => {
const executingNode = computed<ComfyNode | null>(() => {
if (!executingNodeId.value) return null
const workflow: ComfyWorkflow | undefined = activeJob.value?.workflow
const workflow: ComfyWorkflow | undefined = focusedJob.value?.workflow
if (!workflow) return null
const canvasState: ComfyWorkflowJSON | null =
@@ -195,20 +196,28 @@ export const useExecutionStore = defineStore('execution', () => {
() => queuedJobs.value[activeJobId.value ?? '']
)
const focusedJob = computed<QueuedJob | undefined>(
() => queuedJobs.value[focusedJobId.value ?? '']
)
const isConcurrentExecutionActive = computed(
() => runningJobIds.value.length > 1
)
const totalNodesToExecute = computed<number>(() => {
if (!activeJob.value) return 0
return Object.values(activeJob.value.nodes).length
if (!focusedJob.value) return 0
return Object.values(focusedJob.value.nodes).length
})
const isIdle = computed<boolean>(() => !activeJobId.value)
const isIdle = computed<boolean>(() => runningJobIds.value.length === 0)
const nodesExecuted = computed<number>(() => {
if (!activeJob.value) return 0
return Object.values(activeJob.value.nodes).filter(Boolean).length
if (!focusedJob.value) return 0
return Object.values(focusedJob.value.nodes).filter(Boolean).length
})
const executionProgress = computed<number>(() => {
if (!activeJob.value) return 0
if (!focusedJob.value) return 0
const total = totalNodesToExecute.value
const done = nodesExecuted.value
return total > 0 ? done / total : 0
@@ -251,6 +260,14 @@ export const useExecutionStore = defineStore('execution', () => {
queuedJobs.value[activeJobId.value] ??= { nodes: {} }
clearInitializationByJobId(activeJobId.value)
// Auto-focus the first job, or if the current focused job is no longer running
if (
!focusedJobId.value ||
!nodeProgressStatesByJob.value[focusedJobId.value]
) {
focusedJobId.value = activeJobId.value
}
// Ensure path mapping exists — execution_start can arrive via WebSocket
// before the HTTP response from queuePrompt triggers storeJob.
if (!jobIdToSessionWorkflowPath.value.has(activeJobId.value)) {
@@ -354,17 +371,20 @@ export const useExecutionStore = defineStore('execution', () => {
...nodeProgressStatesByJob.value,
[jobId]: nodes
}
evictOldProgressJobs()
nodeProgressStates.value = nodes
// If we have progress for the currently executing node, update it for backwards compatibility
if (executingNodeId.value && nodes[executingNodeId.value]) {
const nodeState = nodes[executingNodeId.value]
_executingNodeProgress.value = {
value: nodeState.value,
max: nodeState.max,
prompt_id: nodeState.prompt_id,
node: nodeState.display_node_id || nodeState.node_id
if (jobId === focusedJobId.value) {
nodeProgressStates.value = nodes
// If we have progress for the currently executing node, update it for backwards compatibility
if (executingNodeId.value && nodes[executingNodeId.value]) {
const nodeState = nodes[executingNodeId.value]
_executingNodeProgress.value = {
value: nodeState.value,
max: nodeState.max,
prompt_id: nodeState.prompt_id,
node: nodeState.display_node_id || nodeState.node_id
}
}
}
}
@@ -492,8 +512,7 @@ export const useExecutionStore = defineStore('execution', () => {
* Reset execution-related state after a run completes or is stopped.
*/
function resetExecutionState(jobIdParam?: string | null) {
executionIdToLocatorCache.clear()
nodeProgressStates.value = {}
const jobId = jobIdParam ?? activeJobId.value ?? null
if (jobId) {
const map = { ...nodeProgressStatesByJob.value }
@@ -501,14 +520,33 @@ export const useExecutionStore = defineStore('execution', () => {
nodeProgressStatesByJob.value = map
useJobPreviewStore().clearPreview(jobId)
}
if (activeJobId.value) {
delete queuedJobs.value[activeJobId.value]
if (jobId === activeJobId.value || !jobIdParam) {
if (activeJobId.value) {
delete queuedJobs.value[activeJobId.value]
}
activeJobId.value = null
}
if (jobId === focusedJobId.value || !jobIdParam) {
// Auto-advance to next running job, or null
focusedJobId.value =
runningJobIds.value.find((id) => id !== jobId) ?? null
nodeProgressStates.value = focusedJobId.value
? (nodeProgressStatesByJob.value[focusedJobId.value] ?? {})
: {}
}
activeJobId.value = null
_executingNodeProgress.value = null
executionErrorStore.clearPromptError()
}
function setFocusedJob(jobId: string | null) {
focusedJobId.value = jobId
if (jobId) {
nodeProgressStates.value = nodeProgressStatesByJob.value[jobId] ?? {}
} else {
nodeProgressStates.value = {}
}
}
function getNodeIdIfExecuting(nodeId: string | number) {
const nodeIdStr = String(nodeId)
return nodeIdStr.includes(':')
@@ -625,6 +663,10 @@ export const useExecutionStore = defineStore('execution', () => {
executingNodeId,
executingNodeIds,
activeJob,
focusedJobId,
focusedJob,
isConcurrentExecutionActive,
setFocusedJob,
totalNodesToExecute,
nodesExecuted,
executionProgress,