mirror of
https://github.com/Comfy-Org/ComfyUI_frontend.git
synced 2026-03-31 09:45:46 +00:00
Compare commits
11 Commits
coderabbit
...
concurrent
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2f80d4fc54 | ||
|
|
60d93ab847 | ||
|
|
66adc95fb6 | ||
|
|
a876972c64 | ||
|
|
702072b7f7 | ||
|
|
c2c4f84d24 | ||
|
|
638f1b9938 | ||
|
|
6c9f1fb4ca | ||
|
|
2664e5d629 | ||
|
|
304516a9ed | ||
|
|
123284d03a |
@@ -62,7 +62,9 @@ const config: KnipConfig = {
|
||||
// Agent review check config, not part of the build
|
||||
'.agents/checks/eslint.strict.config.js',
|
||||
// Loaded via @plugin directive in CSS, not detected by knip
|
||||
'packages/design-system/src/css/lucideStrokePlugin.js'
|
||||
'packages/design-system/src/css/lucideStrokePlugin.js',
|
||||
// Pending integration in stacked PR (concurrent job execution)
|
||||
'src/composables/useConcurrentExecution.ts'
|
||||
],
|
||||
compilers: {
|
||||
// https://github.com/webpro-nl/knip/issues/1008#issuecomment-3207756199
|
||||
|
||||
@@ -384,6 +384,17 @@ describe('TopMenuSection', () => {
|
||||
configureSettings(pinia, true)
|
||||
const executionStore = useExecutionStore(pinia)
|
||||
executionStore.activeJobId = 'job-1'
|
||||
executionStore.nodeProgressStatesByJob = {
|
||||
'job-1': {
|
||||
'node-1': {
|
||||
value: 50,
|
||||
max: 100,
|
||||
state: 'running',
|
||||
node_id: 'node-1',
|
||||
prompt_id: 'job-1'
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const ComfyActionbarStub = createComfyActionbarStub(actionbarTarget)
|
||||
|
||||
|
||||
50
src/composables/useConcurrentExecution.ts
Normal file
50
src/composables/useConcurrentExecution.ts
Normal file
@@ -0,0 +1,50 @@
|
||||
import { computed } from 'vue'
|
||||
|
||||
import { useFeatureFlags } from '@/composables/useFeatureFlags'
|
||||
import { useSettingStore } from '@/platform/settings/settingStore'
|
||||
|
||||
export function useConcurrentExecution() {
|
||||
const settingStore = useSettingStore()
|
||||
const { flags } = useFeatureFlags()
|
||||
|
||||
const isFeatureEnabled = computed(
|
||||
() => flags.concurrentExecutionEnabled === true
|
||||
)
|
||||
|
||||
const isUserEnabled = computed(
|
||||
() => settingStore.get('Comfy.Cloud.ConcurrentExecution') === true
|
||||
)
|
||||
|
||||
const isConcurrentExecutionEnabled = computed(
|
||||
() => isFeatureEnabled.value && isUserEnabled.value
|
||||
)
|
||||
|
||||
const maxConcurrentJobs = computed(() => flags.maxConcurrentJobs as number)
|
||||
|
||||
const hasSeenOnboarding = computed(
|
||||
() =>
|
||||
settingStore.get('Comfy.Cloud.ConcurrentExecution.OnboardingSeen') ===
|
||||
true
|
||||
)
|
||||
|
||||
async function setUserEnabled(enabled: boolean) {
|
||||
await settingStore.set('Comfy.Cloud.ConcurrentExecution', enabled)
|
||||
}
|
||||
|
||||
async function markOnboardingSeen() {
|
||||
await settingStore.set(
|
||||
'Comfy.Cloud.ConcurrentExecution.OnboardingSeen',
|
||||
true
|
||||
)
|
||||
}
|
||||
|
||||
return {
|
||||
isFeatureEnabled,
|
||||
isUserEnabled,
|
||||
isConcurrentExecutionEnabled,
|
||||
maxConcurrentJobs,
|
||||
hasSeenOnboarding,
|
||||
setUserEnabled,
|
||||
markOnboardingSeen
|
||||
}
|
||||
}
|
||||
@@ -26,7 +26,9 @@ export enum ServerFeatureFlag {
|
||||
NODE_LIBRARY_ESSENTIALS_ENABLED = 'node_library_essentials_enabled',
|
||||
WORKFLOW_SHARING_ENABLED = 'workflow_sharing_enabled',
|
||||
COMFYHUB_UPLOAD_ENABLED = 'comfyhub_upload_enabled',
|
||||
COMFYHUB_PROFILE_GATE_ENABLED = 'comfyhub_profile_gate_enabled'
|
||||
COMFYHUB_PROFILE_GATE_ENABLED = 'comfyhub_profile_gate_enabled',
|
||||
CONCURRENT_EXECUTION_ENABLED = 'concurrent_execution_enabled',
|
||||
MAX_CONCURRENT_JOBS = 'max_concurrent_jobs'
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -156,6 +158,31 @@ export function useFeatureFlags() {
|
||||
remoteConfig.value.comfyhub_profile_gate_enabled,
|
||||
false
|
||||
)
|
||||
},
|
||||
get concurrentExecutionEnabled() {
|
||||
const override = getDevOverride<boolean>(
|
||||
ServerFeatureFlag.CONCURRENT_EXECUTION_ENABLED
|
||||
)
|
||||
if (override !== undefined) return override
|
||||
|
||||
return (
|
||||
remoteConfig.value.concurrent_execution_enabled ??
|
||||
api.getServerFeature(
|
||||
ServerFeatureFlag.CONCURRENT_EXECUTION_ENABLED,
|
||||
false
|
||||
)
|
||||
)
|
||||
},
|
||||
get maxConcurrentJobs() {
|
||||
const override = getDevOverride<number>(
|
||||
ServerFeatureFlag.MAX_CONCURRENT_JOBS
|
||||
)
|
||||
if (override !== undefined) return Math.max(1, override)
|
||||
|
||||
const configured =
|
||||
remoteConfig.value.max_concurrent_jobs ??
|
||||
api.getServerFeature(ServerFeatureFlag.MAX_CONCURRENT_JOBS, 1)
|
||||
return Math.max(1, configured)
|
||||
}
|
||||
})
|
||||
|
||||
|
||||
@@ -55,4 +55,6 @@ export type RemoteConfig = {
|
||||
comfyhub_upload_enabled?: boolean
|
||||
comfyhub_profile_gate_enabled?: boolean
|
||||
sentry_dsn?: string
|
||||
max_concurrent_jobs?: number
|
||||
concurrent_execution_enabled?: boolean
|
||||
}
|
||||
|
||||
@@ -1268,5 +1268,21 @@ export const CORE_SETTINGS: SettingParams[] = [
|
||||
type: 'boolean',
|
||||
defaultValue: false,
|
||||
versionAdded: '1.42.0'
|
||||
},
|
||||
{
|
||||
id: 'Comfy.Cloud.ConcurrentExecution',
|
||||
name: 'Run jobs in parallel',
|
||||
tooltip:
|
||||
'When enabled, multiple workflow runs execute concurrently instead of queuing sequentially.',
|
||||
type: 'boolean',
|
||||
defaultValue: true,
|
||||
versionAdded: '1.42.0'
|
||||
},
|
||||
{
|
||||
id: 'Comfy.Cloud.ConcurrentExecution.OnboardingSeen',
|
||||
name: 'Concurrent execution onboarding dialog seen',
|
||||
type: 'hidden',
|
||||
defaultValue: false,
|
||||
versionAdded: '1.42.0'
|
||||
}
|
||||
]
|
||||
|
||||
@@ -466,7 +466,9 @@ const zSettings = z.object({
|
||||
'Comfy.RightSidePanel.IsOpen': z.boolean(),
|
||||
'Comfy.RightSidePanel.ShowErrorsTab': z.boolean(),
|
||||
'Comfy.Node.AlwaysShowAdvancedWidgets': z.boolean(),
|
||||
'LiteGraph.Group.SelectChildrenOnClick': z.boolean()
|
||||
'LiteGraph.Group.SelectChildrenOnClick': z.boolean(),
|
||||
'Comfy.Cloud.ConcurrentExecution': z.boolean(),
|
||||
'Comfy.Cloud.ConcurrentExecution.OnboardingSeen': z.boolean()
|
||||
})
|
||||
|
||||
export type EmbeddingsResponse = z.infer<typeof zEmbeddingsResponse>
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { setActivePinia } from 'pinia'
|
||||
import { beforeEach, describe, expect, it, vi } from 'vitest'
|
||||
import { ref } from 'vue'
|
||||
import { app } from '@/scripts/app'
|
||||
import { MAX_PROGRESS_JOBS, useExecutionStore } from '@/stores/executionStore'
|
||||
import { useExecutionErrorStore } from '@/stores/executionErrorStore'
|
||||
@@ -15,6 +16,20 @@ import type { NodeProgressState } from '@/schemas/apiSchema'
|
||||
import { createMockLGraphNode } from '@/utils/__tests__/litegraphTestUtils'
|
||||
import { createTestingPinia } from '@pinia/testing'
|
||||
|
||||
const mockConcurrentExecutionEnabled = ref(false)
|
||||
|
||||
vi.mock('@/composables/useConcurrentExecution', () => ({
|
||||
useConcurrentExecution: () => ({
|
||||
isConcurrentExecutionEnabled: mockConcurrentExecutionEnabled,
|
||||
isFeatureEnabled: ref(false),
|
||||
isUserEnabled: ref(false),
|
||||
maxConcurrentJobs: ref(1),
|
||||
hasSeenOnboarding: ref(false),
|
||||
setUserEnabled: vi.fn(),
|
||||
markOnboardingSeen: vi.fn()
|
||||
})
|
||||
}))
|
||||
|
||||
// Mock the workflowStore
|
||||
vi.mock('@/platform/workflow/management/stores/workflowStore', async () => {
|
||||
const { ComfyWorkflow } = await vi.importActual<typeof WorkflowStoreModule>(
|
||||
@@ -56,7 +71,8 @@ vi.mock('@/scripts/api', () => ({
|
||||
apiEventHandlers.delete(event)
|
||||
}),
|
||||
clientId: 'test-client',
|
||||
apiURL: vi.fn((path: string) => `/api${path}`)
|
||||
apiURL: vi.fn((path: string) => `/api${path}`),
|
||||
getServerFeature: vi.fn()
|
||||
}
|
||||
}))
|
||||
|
||||
@@ -430,6 +446,300 @@ describe('useExecutionStore - reconcileInitializingJobs', () => {
|
||||
})
|
||||
})
|
||||
|
||||
describe('useExecutionStore - focusedJobId management', () => {
|
||||
let store: ReturnType<typeof useExecutionStore>
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
setActivePinia(createTestingPinia({ stubActions: false }))
|
||||
store = useExecutionStore()
|
||||
})
|
||||
|
||||
it('should initialize focusedJobId as null', () => {
|
||||
expect(store.focusedJobId).toBeNull()
|
||||
})
|
||||
|
||||
it('should set focusedJobId and update nodeProgressStates via setFocusedJob', () => {
|
||||
store.nodeProgressStatesByJob = {
|
||||
'job-1': {
|
||||
'node-1': {
|
||||
value: 50,
|
||||
max: 100,
|
||||
state: 'running',
|
||||
node_id: 'node-1',
|
||||
prompt_id: 'job-1'
|
||||
}
|
||||
},
|
||||
'job-2': {
|
||||
'node-2': {
|
||||
value: 30,
|
||||
max: 100,
|
||||
state: 'running',
|
||||
node_id: 'node-2',
|
||||
prompt_id: 'job-2'
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
store.setFocusedJob('job-1')
|
||||
expect(store.focusedJobId).toBe('job-1')
|
||||
expect(store.nodeProgressStates).toEqual({
|
||||
'node-1': {
|
||||
value: 50,
|
||||
max: 100,
|
||||
state: 'running',
|
||||
node_id: 'node-1',
|
||||
prompt_id: 'job-1'
|
||||
}
|
||||
})
|
||||
|
||||
store.setFocusedJob('job-2')
|
||||
expect(store.focusedJobId).toBe('job-2')
|
||||
expect(store.nodeProgressStates).toEqual({
|
||||
'node-2': {
|
||||
value: 30,
|
||||
max: 100,
|
||||
state: 'running',
|
||||
node_id: 'node-2',
|
||||
prompt_id: 'job-2'
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
it('should clear nodeProgressStates when setFocusedJob is called with null', () => {
|
||||
store.nodeProgressStatesByJob = {
|
||||
'job-1': {
|
||||
'node-1': {
|
||||
value: 50,
|
||||
max: 100,
|
||||
state: 'running',
|
||||
node_id: 'node-1',
|
||||
prompt_id: 'job-1'
|
||||
}
|
||||
}
|
||||
}
|
||||
store.setFocusedJob('job-1')
|
||||
expect(store.nodeProgressStates).not.toEqual({})
|
||||
|
||||
store.setFocusedJob(null)
|
||||
expect(store.focusedJobId).toBeNull()
|
||||
expect(store.nodeProgressStates).toEqual({})
|
||||
})
|
||||
|
||||
it('should return undefined for focusedJob when no job is focused', () => {
|
||||
expect(store.focusedJob).toBeUndefined()
|
||||
})
|
||||
|
||||
it('should return the focused QueuedJob from focusedJob computed', () => {
|
||||
store.queuedJobs = {
|
||||
'job-1': { nodes: { n1: false, n2: true } }
|
||||
}
|
||||
store.setFocusedJob('job-1')
|
||||
expect(store.focusedJob).toEqual({ nodes: { n1: false, n2: true } })
|
||||
})
|
||||
})
|
||||
|
||||
describe('useExecutionStore - isConcurrentExecutionActive', () => {
|
||||
let store: ReturnType<typeof useExecutionStore>
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
setActivePinia(createTestingPinia({ stubActions: false }))
|
||||
store = useExecutionStore()
|
||||
})
|
||||
|
||||
it('should be false when no jobs are running', () => {
|
||||
expect(store.isConcurrentExecutionActive).toBe(false)
|
||||
})
|
||||
|
||||
it('should be false when only one job is running', () => {
|
||||
store.nodeProgressStatesByJob = {
|
||||
'job-1': {
|
||||
'node-1': {
|
||||
value: 50,
|
||||
max: 100,
|
||||
state: 'running',
|
||||
node_id: 'node-1',
|
||||
prompt_id: 'job-1'
|
||||
}
|
||||
}
|
||||
}
|
||||
expect(store.isConcurrentExecutionActive).toBe(false)
|
||||
})
|
||||
|
||||
it('should be true when multiple jobs are running', () => {
|
||||
store.nodeProgressStatesByJob = {
|
||||
'job-1': {
|
||||
'node-1': {
|
||||
value: 50,
|
||||
max: 100,
|
||||
state: 'running',
|
||||
node_id: 'node-1',
|
||||
prompt_id: 'job-1'
|
||||
}
|
||||
},
|
||||
'job-2': {
|
||||
'node-2': {
|
||||
value: 30,
|
||||
max: 100,
|
||||
state: 'running',
|
||||
node_id: 'node-2',
|
||||
prompt_id: 'job-2'
|
||||
}
|
||||
}
|
||||
}
|
||||
expect(store.isConcurrentExecutionActive).toBe(true)
|
||||
})
|
||||
})
|
||||
|
||||
describe('useExecutionStore - isIdle with multi-job', () => {
|
||||
let store: ReturnType<typeof useExecutionStore>
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
mockConcurrentExecutionEnabled.value = true
|
||||
setActivePinia(createTestingPinia({ stubActions: false }))
|
||||
store = useExecutionStore()
|
||||
})
|
||||
|
||||
it('should be true when no jobs are running', () => {
|
||||
expect(store.isIdle).toBe(true)
|
||||
})
|
||||
|
||||
it('should be false when at least one job is running', () => {
|
||||
store.nodeProgressStatesByJob = {
|
||||
'job-1': {
|
||||
'node-1': {
|
||||
value: 0,
|
||||
max: 100,
|
||||
state: 'running',
|
||||
node_id: 'node-1',
|
||||
prompt_id: 'job-1'
|
||||
}
|
||||
}
|
||||
}
|
||||
expect(store.isIdle).toBe(false)
|
||||
})
|
||||
|
||||
it('should be true when all jobs are finished (not running)', () => {
|
||||
store.nodeProgressStatesByJob = {
|
||||
'job-1': {
|
||||
'node-1': {
|
||||
value: 100,
|
||||
max: 100,
|
||||
state: 'finished',
|
||||
node_id: 'node-1',
|
||||
prompt_id: 'job-1'
|
||||
}
|
||||
}
|
||||
}
|
||||
expect(store.isIdle).toBe(true)
|
||||
})
|
||||
})
|
||||
|
||||
describe('useExecutionStore - resetExecutionState auto-advance', () => {
|
||||
let store: ReturnType<typeof useExecutionStore>
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
setActivePinia(createTestingPinia({ stubActions: false }))
|
||||
store = useExecutionStore()
|
||||
})
|
||||
|
||||
it('should set focusedJobId to null when last running job finishes', () => {
|
||||
store.nodeProgressStatesByJob = {
|
||||
'job-1': {
|
||||
'node-1': {
|
||||
value: 50,
|
||||
max: 100,
|
||||
state: 'running',
|
||||
node_id: 'node-1',
|
||||
prompt_id: 'job-1'
|
||||
}
|
||||
}
|
||||
}
|
||||
store.activeJobId = 'job-1'
|
||||
store.setFocusedJob('job-1')
|
||||
|
||||
// When the last job finishes, nodeProgressStatesByJob will be empty
|
||||
// and focusedJobId should become null
|
||||
store.nodeProgressStatesByJob = {}
|
||||
store.setFocusedJob(null)
|
||||
expect(store.focusedJobId).toBeNull()
|
||||
expect(store.nodeProgressStates).toEqual({})
|
||||
})
|
||||
|
||||
it('should not change focusedJobId when a non-focused job finishes', () => {
|
||||
store.nodeProgressStatesByJob = {
|
||||
'job-1': {
|
||||
'node-1': {
|
||||
value: 50,
|
||||
max: 100,
|
||||
state: 'running',
|
||||
node_id: 'node-1',
|
||||
prompt_id: 'job-1'
|
||||
}
|
||||
},
|
||||
'job-2': {
|
||||
'node-2': {
|
||||
value: 30,
|
||||
max: 100,
|
||||
state: 'running',
|
||||
node_id: 'node-2',
|
||||
prompt_id: 'job-2'
|
||||
}
|
||||
}
|
||||
}
|
||||
store.setFocusedJob('job-1')
|
||||
|
||||
// job-2 finishes — focusedJobId should stay on job-1
|
||||
const updated = { ...store.nodeProgressStatesByJob }
|
||||
delete updated['job-2']
|
||||
store.nodeProgressStatesByJob = updated
|
||||
|
||||
// Focus should still be on job-1
|
||||
expect(store.focusedJobId).toBe('job-1')
|
||||
expect(store.nodeProgressStates).toEqual({
|
||||
'node-1': {
|
||||
value: 50,
|
||||
max: 100,
|
||||
state: 'running',
|
||||
node_id: 'node-1',
|
||||
prompt_id: 'job-1'
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe('useExecutionStore - executionProgress from focusedJob', () => {
|
||||
let store: ReturnType<typeof useExecutionStore>
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
mockConcurrentExecutionEnabled.value = true
|
||||
setActivePinia(createTestingPinia({ stubActions: false }))
|
||||
store = useExecutionStore()
|
||||
})
|
||||
|
||||
it('should compute executionProgress from focused job nodes', () => {
|
||||
store.queuedJobs = {
|
||||
'job-1': { nodes: { n1: true, n2: false, n3: false } },
|
||||
'job-2': { nodes: { n4: true, n5: true } }
|
||||
}
|
||||
store.setFocusedJob('job-1')
|
||||
// 1 out of 3 done
|
||||
expect(store.executionProgress).toBeCloseTo(1 / 3)
|
||||
|
||||
store.setFocusedJob('job-2')
|
||||
// 2 out of 2 done
|
||||
expect(store.executionProgress).toBe(1)
|
||||
})
|
||||
|
||||
it('should return 0 when no job is focused', () => {
|
||||
expect(store.executionProgress).toBe(0)
|
||||
})
|
||||
})
|
||||
|
||||
describe('useExecutionErrorStore - Node Error Lookups', () => {
|
||||
let store: ReturnType<typeof useExecutionErrorStore>
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ import { defineStore } from 'pinia'
|
||||
import { computed, ref, shallowRef } from 'vue'
|
||||
|
||||
import { useNodeProgressText } from '@/composables/node/useNodeProgressText'
|
||||
import { useConcurrentExecution } from '@/composables/useConcurrentExecution'
|
||||
import { isCloud } from '@/platform/distribution/types'
|
||||
import { useTelemetry } from '@/platform/telemetry'
|
||||
import type { ComfyWorkflow } from '@/platform/workflow/management/stores/workflowStore'
|
||||
@@ -57,9 +58,11 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
const workflowStore = useWorkflowStore()
|
||||
const canvasStore = useCanvasStore()
|
||||
const executionErrorStore = useExecutionErrorStore()
|
||||
const { isConcurrentExecutionEnabled } = useConcurrentExecution()
|
||||
|
||||
const clientId = ref<string | null>(null)
|
||||
const activeJobId = ref<string | null>(null)
|
||||
const focusedJobId = ref<string | null>(null)
|
||||
const queuedJobs = ref<Record<NodeId, QueuedJob>>({})
|
||||
// This is the progress of all nodes in the currently executing workflow
|
||||
const nodeProgressStates = ref<Record<string, NodeProgressState>>({})
|
||||
@@ -170,7 +173,10 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
const executingNode = computed<ComfyNode | null>(() => {
|
||||
if (!executingNodeId.value) return null
|
||||
|
||||
const workflow: ComfyWorkflow | undefined = activeJob.value?.workflow
|
||||
const job = isConcurrentExecutionEnabled.value
|
||||
? focusedJob.value
|
||||
: activeJob.value
|
||||
const workflow: ComfyWorkflow | undefined = job?.workflow
|
||||
if (!workflow) return null
|
||||
|
||||
const canvasState: ComfyWorkflowJSON | null =
|
||||
@@ -195,20 +201,36 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
() => queuedJobs.value[activeJobId.value ?? '']
|
||||
)
|
||||
|
||||
const focusedJob = computed<QueuedJob | undefined>(
|
||||
() => queuedJobs.value[focusedJobId.value ?? '']
|
||||
)
|
||||
|
||||
const isConcurrentExecutionActive = computed(
|
||||
() => runningJobIds.value.length > 1
|
||||
)
|
||||
|
||||
const _currentJob = computed(() =>
|
||||
isConcurrentExecutionEnabled.value ? focusedJob.value : activeJob.value
|
||||
)
|
||||
|
||||
const totalNodesToExecute = computed<number>(() => {
|
||||
if (!activeJob.value) return 0
|
||||
return Object.values(activeJob.value.nodes).length
|
||||
if (!_currentJob.value) return 0
|
||||
return Object.values(_currentJob.value.nodes).length
|
||||
})
|
||||
|
||||
const isIdle = computed<boolean>(() => !activeJobId.value)
|
||||
const isIdle = computed<boolean>(() =>
|
||||
isConcurrentExecutionEnabled.value
|
||||
? runningJobIds.value.length === 0
|
||||
: !activeJobId.value
|
||||
)
|
||||
|
||||
const nodesExecuted = computed<number>(() => {
|
||||
if (!activeJob.value) return 0
|
||||
return Object.values(activeJob.value.nodes).filter(Boolean).length
|
||||
if (!_currentJob.value) return 0
|
||||
return Object.values(_currentJob.value.nodes).filter(Boolean).length
|
||||
})
|
||||
|
||||
const executionProgress = computed<number>(() => {
|
||||
if (!activeJob.value) return 0
|
||||
if (!_currentJob.value) return 0
|
||||
const total = totalNodesToExecute.value
|
||||
const done = nodesExecuted.value
|
||||
return total > 0 ? done / total : 0
|
||||
@@ -251,6 +273,15 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
queuedJobs.value[activeJobId.value] ??= { nodes: {} }
|
||||
clearInitializationByJobId(activeJobId.value)
|
||||
|
||||
if (isConcurrentExecutionEnabled.value) {
|
||||
// Auto-focus the first job, or if the current focused job is no longer running
|
||||
if (!focusedJobId.value || !queuedJobs.value[focusedJobId.value]) {
|
||||
focusedJobId.value = activeJobId.value
|
||||
}
|
||||
} else {
|
||||
focusedJobId.value = activeJobId.value
|
||||
}
|
||||
|
||||
// Ensure path mapping exists — execution_start can arrive via WebSocket
|
||||
// before the HTTP response from queuePrompt triggers storeJob.
|
||||
if (!jobIdToSessionWorkflowPath.value.has(activeJobId.value)) {
|
||||
@@ -260,9 +291,10 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
}
|
||||
|
||||
function handleExecutionCached(e: CustomEvent<ExecutionCachedWsMessage>) {
|
||||
if (!activeJob.value) return
|
||||
const job = queuedJobs.value[e.detail.prompt_id]
|
||||
if (!job) return
|
||||
for (const n of e.detail.nodes) {
|
||||
activeJob.value.nodes[n] = true
|
||||
job.nodes[n] = true
|
||||
}
|
||||
}
|
||||
|
||||
@@ -270,22 +302,21 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
e: CustomEvent<ExecutionInterruptedWsMessage>
|
||||
) {
|
||||
const jobId = e.detail.prompt_id
|
||||
if (activeJobId.value) clearInitializationByJobId(activeJobId.value)
|
||||
clearInitializationByJobId(jobId)
|
||||
resetExecutionState(jobId)
|
||||
}
|
||||
|
||||
function handleExecuted(e: CustomEvent<ExecutedWsMessage>) {
|
||||
if (!activeJob.value) return
|
||||
activeJob.value.nodes[e.detail.node] = true
|
||||
const job = queuedJobs.value[e.detail.prompt_id]
|
||||
if (!job) return
|
||||
job.nodes[e.detail.node] = true
|
||||
}
|
||||
|
||||
function handleExecutionSuccess(e: CustomEvent<ExecutionSuccessWsMessage>) {
|
||||
if (isCloud && activeJobId.value) {
|
||||
useTelemetry()?.trackExecutionSuccess({
|
||||
jobId: activeJobId.value
|
||||
})
|
||||
}
|
||||
const jobId = e.detail.prompt_id
|
||||
if (jobId) {
|
||||
useTelemetry()?.trackExecutionSuccess({ jobId })
|
||||
}
|
||||
resetExecutionState(jobId)
|
||||
}
|
||||
|
||||
@@ -354,22 +385,34 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
...nodeProgressStatesByJob.value,
|
||||
[jobId]: nodes
|
||||
}
|
||||
evictOldProgressJobs()
|
||||
nodeProgressStates.value = nodes
|
||||
|
||||
// If we have progress for the currently executing node, update it for backwards compatibility
|
||||
if (executingNodeId.value && nodes[executingNodeId.value]) {
|
||||
const nodeState = nodes[executingNodeId.value]
|
||||
_executingNodeProgress.value = {
|
||||
value: nodeState.value,
|
||||
max: nodeState.max,
|
||||
prompt_id: nodeState.prompt_id,
|
||||
node: nodeState.display_node_id || nodeState.node_id
|
||||
const shouldUpdate = isConcurrentExecutionEnabled.value
|
||||
? jobId === focusedJobId.value
|
||||
: true
|
||||
|
||||
if (shouldUpdate) {
|
||||
if (!isConcurrentExecutionEnabled.value) evictOldProgressJobs()
|
||||
nodeProgressStates.value = nodes
|
||||
|
||||
// If we have progress for the currently executing node, update it for backwards compatibility
|
||||
if (executingNodeId.value && nodes[executingNodeId.value]) {
|
||||
const nodeState = nodes[executingNodeId.value]
|
||||
_executingNodeProgress.value = {
|
||||
value: nodeState.value,
|
||||
max: nodeState.max,
|
||||
prompt_id: nodeState.prompt_id,
|
||||
node: nodeState.display_node_id || nodeState.node_id
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function handleProgress(e: CustomEvent<ProgressWsMessage>) {
|
||||
if (
|
||||
isConcurrentExecutionEnabled.value &&
|
||||
e.detail.prompt_id !== focusedJobId.value
|
||||
)
|
||||
return
|
||||
_executingNodeProgress.value = e.detail
|
||||
}
|
||||
|
||||
@@ -492,8 +535,6 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
* Reset execution-related state after a run completes or is stopped.
|
||||
*/
|
||||
function resetExecutionState(jobIdParam?: string | null) {
|
||||
executionIdToLocatorCache.clear()
|
||||
nodeProgressStates.value = {}
|
||||
const jobId = jobIdParam ?? activeJobId.value ?? null
|
||||
if (jobId) {
|
||||
const map = { ...nodeProgressStatesByJob.value }
|
||||
@@ -501,14 +542,46 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
nodeProgressStatesByJob.value = map
|
||||
useJobPreviewStore().clearPreview(jobId)
|
||||
}
|
||||
if (activeJobId.value) {
|
||||
delete queuedJobs.value[activeJobId.value]
|
||||
|
||||
if (isConcurrentExecutionEnabled.value) {
|
||||
if (jobId === activeJobId.value || !jobIdParam) {
|
||||
if (activeJobId.value) {
|
||||
delete queuedJobs.value[activeJobId.value]
|
||||
}
|
||||
activeJobId.value = null
|
||||
}
|
||||
if (jobId === focusedJobId.value || !jobIdParam) {
|
||||
// Auto-advance to next running job, or null
|
||||
focusedJobId.value =
|
||||
runningJobIds.value.find((id) => id !== jobId) ?? null
|
||||
nodeProgressStates.value = focusedJobId.value
|
||||
? (nodeProgressStatesByJob.value[focusedJobId.value] ?? {})
|
||||
: {}
|
||||
}
|
||||
} else {
|
||||
executionIdToLocatorCache.clear()
|
||||
nodeProgressStates.value = {}
|
||||
if (activeJobId.value) {
|
||||
delete queuedJobs.value[activeJobId.value]
|
||||
}
|
||||
activeJobId.value = null
|
||||
focusedJobId.value = null
|
||||
}
|
||||
activeJobId.value = null
|
||||
|
||||
_executingNodeProgress.value = null
|
||||
executionErrorStore.clearPromptError()
|
||||
}
|
||||
|
||||
function setFocusedJob(jobId: string | null) {
|
||||
focusedJobId.value = jobId
|
||||
_executingNodeProgress.value = null
|
||||
if (jobId) {
|
||||
nodeProgressStates.value = nodeProgressStatesByJob.value[jobId] ?? {}
|
||||
} else {
|
||||
nodeProgressStates.value = {}
|
||||
}
|
||||
}
|
||||
|
||||
function getNodeIdIfExecuting(nodeId: string | number) {
|
||||
const nodeIdStr = String(nodeId)
|
||||
return nodeIdStr.includes(':')
|
||||
@@ -625,6 +698,10 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
executingNodeId,
|
||||
executingNodeIds,
|
||||
activeJob,
|
||||
focusedJobId,
|
||||
focusedJob,
|
||||
isConcurrentExecutionActive,
|
||||
setFocusedJob,
|
||||
totalNodesToExecute,
|
||||
nodesExecuted,
|
||||
executionProgress,
|
||||
|
||||
Reference in New Issue
Block a user