refactor: gate all concurrent execution changes behind feature flag, remove isCloud guards

- executionStore: all focusedJobId behavior, isIdle definition, progress
  filtering, and resetExecutionState now check isConcurrentExecutionEnabled
  and fall back to original single-job behavior when disabled
- useFeatureFlags: remove isCloud guard from concurrentExecutionEnabled so
  local/OSS can set their own feature flags
- coreSettings: remove isCloud guard on setting type visibility
This commit is contained in:
bymyself
2026-03-15 04:20:48 -07:00
parent a876972c64
commit 66adc95fb6
3 changed files with 59 additions and 27 deletions

View File

@@ -165,9 +165,6 @@ export function useFeatureFlags() {
)
if (override !== undefined) return override
if (!isCloud) return false
if (!isAuthenticatedConfigLoaded.value) return false
return (
remoteConfig.value.concurrent_execution_enabled ??
api.getServerFeature(

View File

@@ -1274,7 +1274,7 @@ export const CORE_SETTINGS: SettingParams[] = [
name: 'Run jobs in parallel',
tooltip:
'When enabled, multiple workflow runs execute concurrently instead of queuing sequentially.',
type: isCloud ? 'boolean' : 'hidden',
type: 'boolean',
defaultValue: true,
versionAdded: '1.42.0'
},

View File

@@ -2,7 +2,7 @@ import { defineStore } from 'pinia'
import { computed, ref, shallowRef } from 'vue'
import { useNodeProgressText } from '@/composables/node/useNodeProgressText'
import { isCloud } from '@/platform/distribution/types'
import { useConcurrentExecution } from '@/composables/useConcurrentExecution'
import { useTelemetry } from '@/platform/telemetry'
import type { ComfyWorkflow } from '@/platform/workflow/management/stores/workflowStore'
import { useWorkflowStore } from '@/platform/workflow/management/stores/workflowStore'
@@ -57,6 +57,7 @@ export const useExecutionStore = defineStore('execution', () => {
const workflowStore = useWorkflowStore()
const canvasStore = useCanvasStore()
const executionErrorStore = useExecutionErrorStore()
const { isConcurrentExecutionEnabled } = useConcurrentExecution()
const clientId = ref<string | null>(null)
const activeJobId = ref<string | null>(null)
@@ -171,7 +172,10 @@ export const useExecutionStore = defineStore('execution', () => {
const executingNode = computed<ComfyNode | null>(() => {
if (!executingNodeId.value) return null
const workflow: ComfyWorkflow | undefined = focusedJob.value?.workflow
const job = isConcurrentExecutionEnabled.value
? focusedJob.value
: activeJob.value
const workflow: ComfyWorkflow | undefined = job?.workflow
if (!workflow) return null
const canvasState: ComfyWorkflowJSON | null =
@@ -204,20 +208,28 @@ export const useExecutionStore = defineStore('execution', () => {
() => runningJobIds.value.length > 1
)
const _currentJob = computed(() =>
isConcurrentExecutionEnabled.value ? focusedJob.value : activeJob.value
)
const totalNodesToExecute = computed<number>(() => {
if (!focusedJob.value) return 0
return Object.values(focusedJob.value.nodes).length
if (!_currentJob.value) return 0
return Object.values(_currentJob.value.nodes).length
})
const isIdle = computed<boolean>(() => runningJobIds.value.length === 0)
const isIdle = computed<boolean>(() =>
isConcurrentExecutionEnabled.value
? runningJobIds.value.length === 0
: !activeJobId.value
)
const nodesExecuted = computed<number>(() => {
if (!focusedJob.value) return 0
return Object.values(focusedJob.value.nodes).filter(Boolean).length
if (!_currentJob.value) return 0
return Object.values(_currentJob.value.nodes).filter(Boolean).length
})
const executionProgress = computed<number>(() => {
if (!focusedJob.value) return 0
if (!_currentJob.value) return 0
const total = totalNodesToExecute.value
const done = nodesExecuted.value
return total > 0 ? done / total : 0
@@ -260,8 +272,12 @@ export const useExecutionStore = defineStore('execution', () => {
queuedJobs.value[activeJobId.value] ??= { nodes: {} }
clearInitializationByJobId(activeJobId.value)
// Auto-focus the first job, or if the current focused job is no longer running
if (!focusedJobId.value || !queuedJobs.value[focusedJobId.value]) {
if (isConcurrentExecutionEnabled.value) {
// Auto-focus the first job, or if the current focused job is no longer running
if (!focusedJobId.value || !queuedJobs.value[focusedJobId.value]) {
focusedJobId.value = activeJobId.value
}
} else {
focusedJobId.value = activeJobId.value
}
@@ -297,7 +313,7 @@ export const useExecutionStore = defineStore('execution', () => {
function handleExecutionSuccess(e: CustomEvent<ExecutionSuccessWsMessage>) {
const jobId = e.detail.prompt_id
if (isCloud && jobId) {
if (jobId) {
useTelemetry()?.trackExecutionSuccess({ jobId })
}
resetExecutionState(jobId)
@@ -369,8 +385,12 @@ export const useExecutionStore = defineStore('execution', () => {
[jobId]: nodes
}
const shouldUpdate = isConcurrentExecutionEnabled.value
? jobId === focusedJobId.value
: true
if (jobId === focusedJobId.value) {
if (shouldUpdate) {
if (!isConcurrentExecutionEnabled.value) evictOldProgressJobs()
nodeProgressStates.value = nodes
// If we have progress for the currently executing node, update it for backwards compatibility
@@ -387,7 +407,11 @@ export const useExecutionStore = defineStore('execution', () => {
}
function handleProgress(e: CustomEvent<ProgressWsMessage>) {
if (e.detail.prompt_id !== focusedJobId.value) return
if (
isConcurrentExecutionEnabled.value &&
e.detail.prompt_id !== focusedJobId.value
)
return
_executingNodeProgress.value = e.detail
}
@@ -510,7 +534,6 @@ export const useExecutionStore = defineStore('execution', () => {
* Reset execution-related state after a run completes or is stopped.
*/
function resetExecutionState(jobIdParam?: string | null) {
const jobId = jobIdParam ?? activeJobId.value ?? null
if (jobId) {
const map = { ...nodeProgressStatesByJob.value }
@@ -518,20 +541,32 @@ export const useExecutionStore = defineStore('execution', () => {
nodeProgressStatesByJob.value = map
useJobPreviewStore().clearPreview(jobId)
}
if (jobId === activeJobId.value || !jobIdParam) {
if (isConcurrentExecutionEnabled.value) {
if (jobId === activeJobId.value || !jobIdParam) {
if (activeJobId.value) {
delete queuedJobs.value[activeJobId.value]
}
activeJobId.value = null
}
if (jobId === focusedJobId.value || !jobIdParam) {
// Auto-advance to next running job, or null
focusedJobId.value =
runningJobIds.value.find((id) => id !== jobId) ?? null
nodeProgressStates.value = focusedJobId.value
? (nodeProgressStatesByJob.value[focusedJobId.value] ?? {})
: {}
}
} else {
executionIdToLocatorCache.clear()
nodeProgressStates.value = {}
if (activeJobId.value) {
delete queuedJobs.value[activeJobId.value]
}
activeJobId.value = null
focusedJobId.value = null
}
if (jobId === focusedJobId.value || !jobIdParam) {
// Auto-advance to next running job, or null
focusedJobId.value =
runningJobIds.value.find((id) => id !== jobId) ?? null
nodeProgressStates.value = focusedJobId.value
? (nodeProgressStatesByJob.value[focusedJobId.value] ?? {})
: {}
}
_executingNodeProgress.value = null
executionErrorStore.clearPromptError()
}