mirror of
https://github.com/Comfy-Org/ComfyUI_frontend.git
synced 2026-05-05 21:54:50 +00:00
Compare commits
4 Commits
v1.44.17
...
glary/scop
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a67e3dbce9 | ||
|
|
7e75921fb5 | ||
|
|
48b953be31 | ||
|
|
72877c8c1a |
@@ -17,6 +17,7 @@ import {
|
||||
type ComfyNode,
|
||||
type ComfyWorkflowJSON
|
||||
} from '@/platform/workflow/validation/schemas/workflowSchema'
|
||||
import type { ExecutingWsMessage } from '@/schemas/apiSchema'
|
||||
import type { ComfyNodeDef, InputSpec } from '@/schemas/nodeDefSchema'
|
||||
import { useDialogService } from '@/services/dialogService'
|
||||
import { useExecutionStore } from '@/stores/executionStore'
|
||||
@@ -1446,7 +1447,11 @@ export class GroupNodeHandler {
|
||||
).runningInternalNodeId = innerNodeIndex
|
||||
api.dispatchCustomEvent(
|
||||
type as 'executing',
|
||||
getEvent(detail, `${this.node.id}`, this.node) as string
|
||||
getEvent(
|
||||
detail,
|
||||
`${this.node.id}`,
|
||||
this.node
|
||||
) as unknown as ExecutingWsMessage
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -1459,8 +1464,11 @@ export class GroupNodeHandler {
|
||||
|
||||
const executing = handleEvent(
|
||||
'executing',
|
||||
(d) => (typeof d === 'string' ? d : undefined),
|
||||
(_d, id) => id
|
||||
(d) => (typeof d === 'string' ? d : (d?.display_node ?? d?.node)),
|
||||
(d, id) =>
|
||||
typeof d === 'object'
|
||||
? { ...d, node: id, display_node: id }
|
||||
: { prompt_id: '', node: id, display_node: id }
|
||||
)
|
||||
|
||||
const executed = handleEvent(
|
||||
|
||||
@@ -10,6 +10,7 @@ import { LinkReleaseTriggerAction } from '@/types/searchBoxTypes'
|
||||
const zNodeType = z.string()
|
||||
const zJobId = z.string()
|
||||
export type JobId = z.infer<typeof zJobId>
|
||||
const zWorkflowId = z.string()
|
||||
export const resultItemType = z.enum(['input', 'output', 'temp'])
|
||||
export type ResultItemType = z.infer<typeof resultItemType>
|
||||
|
||||
@@ -56,6 +57,7 @@ const zProgressWsMessage = z.object({
|
||||
value: z.number().int(),
|
||||
max: z.number().int(),
|
||||
prompt_id: zJobId,
|
||||
workflow_id: zWorkflowId.optional(),
|
||||
node: zNodeId
|
||||
})
|
||||
|
||||
@@ -65,6 +67,7 @@ const zNodeProgressState = z.object({
|
||||
state: z.enum(['pending', 'running', 'finished', 'error']),
|
||||
node_id: zNodeId,
|
||||
prompt_id: zJobId,
|
||||
workflow_id: zWorkflowId.optional(),
|
||||
display_node_id: zNodeId.optional(),
|
||||
parent_node_id: zNodeId.optional(),
|
||||
real_node_id: zNodeId.optional()
|
||||
@@ -72,13 +75,15 @@ const zNodeProgressState = z.object({
|
||||
|
||||
const zProgressStateWsMessage = z.object({
|
||||
prompt_id: zJobId,
|
||||
workflow_id: zWorkflowId.optional(),
|
||||
nodes: z.record(zNodeId, zNodeProgressState)
|
||||
})
|
||||
|
||||
const zExecutingWsMessage = z.object({
|
||||
node: zNodeId,
|
||||
display_node: zNodeId,
|
||||
prompt_id: zJobId
|
||||
prompt_id: zJobId,
|
||||
workflow_id: zWorkflowId.optional()
|
||||
})
|
||||
|
||||
const zExecutedWsMessage = zExecutingWsMessage.extend({
|
||||
@@ -88,6 +93,7 @@ const zExecutedWsMessage = zExecutingWsMessage.extend({
|
||||
|
||||
const zExecutionWsMessageBase = z.object({
|
||||
prompt_id: zJobId,
|
||||
workflow_id: zWorkflowId.optional(),
|
||||
timestamp: z.number().int()
|
||||
})
|
||||
|
||||
@@ -115,7 +121,8 @@ const zExecutionErrorWsMessage = zExecutionWsMessageBase.extend({
|
||||
const zProgressTextWsMessage = z.object({
|
||||
nodeId: zNodeId,
|
||||
text: z.string(),
|
||||
prompt_id: z.string().optional()
|
||||
prompt_id: z.string().optional(),
|
||||
workflow_id: zWorkflowId.optional()
|
||||
})
|
||||
|
||||
const zNotificationWsMessage = z.object({
|
||||
|
||||
@@ -23,8 +23,7 @@ import type {
|
||||
} from '@/platform/workflow/templates/types/template'
|
||||
import type {
|
||||
ComfyApiWorkflow,
|
||||
ComfyWorkflowJSON,
|
||||
NodeId
|
||||
ComfyWorkflowJSON
|
||||
} from '@/platform/workflow/validation/schemas/workflowSchema'
|
||||
import type {
|
||||
AssetDownloadWsMessage,
|
||||
@@ -213,11 +212,7 @@ type AsCustomEvents<T> = {
|
||||
|
||||
/** Handles differing event and API signatures. */
|
||||
type ApiToEventType<T = ApiCalls> = {
|
||||
[K in keyof T]: K extends 'status'
|
||||
? StatusWsMessageStatus
|
||||
: K extends 'executing'
|
||||
? NodeId
|
||||
: T[K]
|
||||
[K in keyof T]: K extends 'status' ? StatusWsMessageStatus : T[K]
|
||||
}
|
||||
|
||||
/** Dictionary of types used in the detail for a custom event */
|
||||
@@ -728,10 +723,7 @@ export class ComfyApi extends EventTarget {
|
||||
this.dispatchCustomEvent('status', msg.data.status ?? null)
|
||||
break
|
||||
case 'executing':
|
||||
this.dispatchCustomEvent(
|
||||
'executing',
|
||||
msg.data.display_node || msg.data.node
|
||||
)
|
||||
this.dispatchCustomEvent('executing', msg.data)
|
||||
break
|
||||
case 'execution_start':
|
||||
case 'execution_error':
|
||||
|
||||
@@ -11,12 +11,22 @@ const {
|
||||
mockNodeExecutionIdToNodeLocatorId,
|
||||
mockNodeIdToNodeLocatorId,
|
||||
mockNodeLocatorIdToNodeExecutionId,
|
||||
mockShowTextPreview
|
||||
mockShowTextPreview,
|
||||
mockActiveWorkflow,
|
||||
mockRevokePreviewsByExecutionId
|
||||
} = vi.hoisted(() => ({
|
||||
mockNodeExecutionIdToNodeLocatorId: vi.fn(),
|
||||
mockNodeIdToNodeLocatorId: vi.fn(),
|
||||
mockNodeLocatorIdToNodeExecutionId: vi.fn(),
|
||||
mockShowTextPreview: vi.fn()
|
||||
mockShowTextPreview: vi.fn(),
|
||||
mockActiveWorkflow: {
|
||||
current: null as null | {
|
||||
activeState?: { id?: string }
|
||||
initialState?: { id?: string }
|
||||
path?: string
|
||||
}
|
||||
},
|
||||
mockRevokePreviewsByExecutionId: vi.fn()
|
||||
}))
|
||||
|
||||
import type * as WorkflowStoreModule from '@/platform/workflow/management/stores/workflowStore'
|
||||
@@ -35,7 +45,10 @@ vi.mock('@/platform/workflow/management/stores/workflowStore', async () => {
|
||||
useWorkflowStore: vi.fn(() => ({
|
||||
nodeExecutionIdToNodeLocatorId: mockNodeExecutionIdToNodeLocatorId,
|
||||
nodeIdToNodeLocatorId: mockNodeIdToNodeLocatorId,
|
||||
nodeLocatorIdToNodeExecutionId: mockNodeLocatorIdToNodeExecutionId
|
||||
nodeLocatorIdToNodeExecutionId: mockNodeLocatorIdToNodeExecutionId,
|
||||
get activeWorkflow() {
|
||||
return mockActiveWorkflow.current
|
||||
}
|
||||
}))
|
||||
}
|
||||
})
|
||||
@@ -70,9 +83,9 @@ vi.mock('@/scripts/api', () => ({
|
||||
}
|
||||
}))
|
||||
|
||||
vi.mock('@/stores/imagePreviewStore', () => ({
|
||||
vi.mock('@/stores/nodeOutputStore', () => ({
|
||||
useNodeOutputStore: () => ({
|
||||
revokePreviewsByExecutionId: vi.fn()
|
||||
revokePreviewsByExecutionId: mockRevokePreviewsByExecutionId
|
||||
})
|
||||
}))
|
||||
|
||||
@@ -440,6 +453,469 @@ describe('useExecutionStore - reconcileInitializingJobs', () => {
|
||||
})
|
||||
})
|
||||
|
||||
describe('useExecutionStore - active workflow gating', () => {
|
||||
let store: ReturnType<typeof useExecutionStore>
|
||||
|
||||
function makeProgressNodes(
|
||||
nodeId: string,
|
||||
jobId: string
|
||||
): Record<string, NodeProgressState> {
|
||||
return {
|
||||
[nodeId]: {
|
||||
value: 5,
|
||||
max: 10,
|
||||
state: 'running',
|
||||
node_id: nodeId,
|
||||
prompt_id: jobId,
|
||||
display_node_id: nodeId
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function fireProgressState(
|
||||
jobId: string,
|
||||
nodes: Record<string, NodeProgressState>,
|
||||
workflowId?: string
|
||||
) {
|
||||
const handler = apiEventHandlers.get('progress_state')
|
||||
if (!handler) throw new Error('progress_state handler not bound')
|
||||
handler(
|
||||
new CustomEvent('progress_state', {
|
||||
detail: { nodes, prompt_id: jobId, workflow_id: workflowId }
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
function fireProgress(
|
||||
jobId: string,
|
||||
nodeId: string,
|
||||
workflowId?: string,
|
||||
value = 5,
|
||||
max = 10
|
||||
) {
|
||||
const handler = apiEventHandlers.get('progress')
|
||||
if (!handler) throw new Error('progress handler not bound')
|
||||
handler(
|
||||
new CustomEvent('progress', {
|
||||
detail: {
|
||||
value,
|
||||
max,
|
||||
prompt_id: jobId,
|
||||
node: nodeId,
|
||||
workflow_id: workflowId
|
||||
}
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
apiEventHandlers.clear()
|
||||
mockActiveWorkflow.current = null
|
||||
setActivePinia(createTestingPinia({ stubActions: false }))
|
||||
store = useExecutionStore()
|
||||
store.bindExecutionEvents()
|
||||
})
|
||||
|
||||
it('always updates per-job progress regardless of active workflow', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
|
||||
fireProgressState(
|
||||
'job-other',
|
||||
makeProgressNodes('1', 'job-other'),
|
||||
'wf-other'
|
||||
)
|
||||
|
||||
expect(store.nodeProgressStatesByJob).toHaveProperty('job-other')
|
||||
})
|
||||
|
||||
it('skips global mirror when message workflow_id mismatches active workflow', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
|
||||
fireProgressState(
|
||||
'job-other',
|
||||
makeProgressNodes('1', 'job-other'),
|
||||
'wf-other'
|
||||
)
|
||||
|
||||
expect(store.nodeProgressStates).toEqual({})
|
||||
})
|
||||
|
||||
it('updates global mirror when message workflow_id matches active workflow', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
|
||||
fireProgressState('job-1', makeProgressNodes('1', 'job-1'), 'wf-active')
|
||||
|
||||
expect(store.nodeProgressStates).toEqual(makeProgressNodes('1', 'job-1'))
|
||||
})
|
||||
|
||||
it('falls back to jobIdToWorkflowId mapping when workflow_id missing', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
store.registerJobWorkflowIdMapping('job-other', 'wf-other')
|
||||
|
||||
fireProgressState('job-other', makeProgressNodes('1', 'job-other'))
|
||||
|
||||
expect(store.nodeProgressStates).toEqual({})
|
||||
})
|
||||
|
||||
it('falls back to session path mapping when no id mapping is registered', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
store.ensureSessionWorkflowPath('job-other', '/wf-other.json')
|
||||
|
||||
fireProgressState('job-other', makeProgressNodes('1', 'job-other'))
|
||||
|
||||
expect(store.nodeProgressStates).toEqual({})
|
||||
})
|
||||
|
||||
it('preserves single-tab behaviour when ownership is unresolvable', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
|
||||
fireProgressState('job-unknown', makeProgressNodes('1', 'job-unknown'))
|
||||
|
||||
expect(store.nodeProgressStates).toEqual(
|
||||
makeProgressNodes('1', 'job-unknown')
|
||||
)
|
||||
})
|
||||
|
||||
it('updates mirror when there is no active workflow', () => {
|
||||
mockActiveWorkflow.current = null
|
||||
|
||||
fireProgressState('job-1', makeProgressNodes('1', 'job-1'), 'wf-1')
|
||||
|
||||
expect(store.nodeProgressStates).toEqual(makeProgressNodes('1', 'job-1'))
|
||||
})
|
||||
|
||||
it('skips preview revocation for non-active workflow messages', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
mockRevokePreviewsByExecutionId.mockClear()
|
||||
|
||||
fireProgressState(
|
||||
'job-other',
|
||||
makeProgressNodes('1', 'job-other'),
|
||||
'wf-other'
|
||||
)
|
||||
|
||||
expect(mockRevokePreviewsByExecutionId).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('revokes previews for active workflow messages', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
mockRevokePreviewsByExecutionId.mockClear()
|
||||
|
||||
fireProgressState('job-1', makeProgressNodes('1', 'job-1'), 'wf-active')
|
||||
|
||||
expect(mockRevokePreviewsByExecutionId).toHaveBeenCalledWith('1')
|
||||
})
|
||||
|
||||
it('skips _executingNodeProgress on workflow_id mismatch', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
|
||||
fireProgress('job-other', '1', 'wf-other')
|
||||
|
||||
expect(store._executingNodeProgress).toBeNull()
|
||||
})
|
||||
|
||||
it('updates _executingNodeProgress on workflow_id match', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
|
||||
fireProgress('job-1', '1', 'wf-active', 7, 10)
|
||||
|
||||
expect(store._executingNodeProgress).toEqual({
|
||||
value: 7,
|
||||
max: 10,
|
||||
prompt_id: 'job-1',
|
||||
node: '1',
|
||||
workflow_id: 'wf-active'
|
||||
})
|
||||
})
|
||||
|
||||
it('execution_start from a non-active workflow does not steal activeJobId', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
const handler = apiEventHandlers.get('execution_start')
|
||||
if (!handler) throw new Error('execution_start handler not bound')
|
||||
handler(
|
||||
new CustomEvent('execution_start', {
|
||||
detail: {
|
||||
prompt_id: 'job-other',
|
||||
timestamp: 0,
|
||||
workflow_id: 'wf-other'
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
expect(store.activeJobId).toBeNull()
|
||||
})
|
||||
|
||||
it('execution_start from active workflow adopts activeJobId', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
const handler = apiEventHandlers.get('execution_start')
|
||||
if (!handler) throw new Error('execution_start handler not bound')
|
||||
handler(
|
||||
new CustomEvent('execution_start', {
|
||||
detail: {
|
||||
prompt_id: 'job-1',
|
||||
timestamp: 0,
|
||||
workflow_id: 'wf-active'
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
expect(store.activeJobId).toBe('job-1')
|
||||
})
|
||||
|
||||
it('execution_success from a non-active workflow does not clear activeJobId', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
const startHandler = apiEventHandlers.get('execution_start')
|
||||
if (!startHandler) throw new Error('execution_start handler not bound')
|
||||
startHandler(
|
||||
new CustomEvent('execution_start', {
|
||||
detail: {
|
||||
prompt_id: 'job-1',
|
||||
timestamp: 0,
|
||||
workflow_id: 'wf-active'
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
const successHandler = apiEventHandlers.get('execution_success')
|
||||
if (!successHandler) throw new Error('execution_success handler not bound')
|
||||
successHandler(
|
||||
new CustomEvent('execution_success', {
|
||||
detail: {
|
||||
prompt_id: 'job-other',
|
||||
timestamp: 0,
|
||||
workflow_id: 'wf-other'
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
expect(store.activeJobId).toBe('job-1')
|
||||
})
|
||||
|
||||
it('execution_interrupted from a non-active workflow does not clear activeJobId', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
const startHandler = apiEventHandlers.get('execution_start')
|
||||
if (!startHandler) throw new Error('execution_start handler not bound')
|
||||
startHandler(
|
||||
new CustomEvent('execution_start', {
|
||||
detail: {
|
||||
prompt_id: 'job-1',
|
||||
timestamp: 0,
|
||||
workflow_id: 'wf-active'
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
const intHandler = apiEventHandlers.get('execution_interrupted')
|
||||
if (!intHandler) throw new Error('execution_interrupted handler not bound')
|
||||
intHandler(
|
||||
new CustomEvent('execution_interrupted', {
|
||||
detail: {
|
||||
prompt_id: 'job-other',
|
||||
timestamp: 0,
|
||||
node_id: '1',
|
||||
node_type: 'X',
|
||||
executed: [],
|
||||
workflow_id: 'wf-other'
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
expect(store.activeJobId).toBe('job-1')
|
||||
})
|
||||
|
||||
it('executing from a non-active workflow does not clear _executingNodeProgress', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
fireProgress('job-1', '1', 'wf-active', 5, 10)
|
||||
expect(store._executingNodeProgress).not.toBeNull()
|
||||
|
||||
const handler = apiEventHandlers.get('executing')
|
||||
if (!handler) throw new Error('executing handler not bound')
|
||||
handler(
|
||||
new CustomEvent('executing', {
|
||||
detail: {
|
||||
prompt_id: 'job-other',
|
||||
node: '2',
|
||||
display_node: '2',
|
||||
workflow_id: 'wf-other'
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
expect(store._executingNodeProgress).not.toBeNull()
|
||||
})
|
||||
|
||||
it('execution_cached from a non-active workflow does not mark active job nodes', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
const startHandler = apiEventHandlers.get('execution_start')
|
||||
if (!startHandler) throw new Error('execution_start handler not bound')
|
||||
startHandler(
|
||||
new CustomEvent('execution_start', {
|
||||
detail: { prompt_id: 'job-1', timestamp: 0, workflow_id: 'wf-active' }
|
||||
})
|
||||
)
|
||||
|
||||
const cachedHandler = apiEventHandlers.get('execution_cached')
|
||||
if (!cachedHandler) throw new Error('execution_cached handler not bound')
|
||||
cachedHandler(
|
||||
new CustomEvent('execution_cached', {
|
||||
detail: {
|
||||
prompt_id: 'job-other',
|
||||
timestamp: 0,
|
||||
workflow_id: 'wf-other',
|
||||
nodes: ['n1', 'n2']
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
expect(store.activeJob?.nodes).toEqual({})
|
||||
})
|
||||
|
||||
it('executed from a non-active workflow does not mark active job nodes', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
const startHandler = apiEventHandlers.get('execution_start')
|
||||
if (!startHandler) throw new Error('execution_start handler not bound')
|
||||
startHandler(
|
||||
new CustomEvent('execution_start', {
|
||||
detail: { prompt_id: 'job-1', timestamp: 0, workflow_id: 'wf-active' }
|
||||
})
|
||||
)
|
||||
|
||||
const executedHandler = apiEventHandlers.get('executed')
|
||||
if (!executedHandler) throw new Error('executed handler not bound')
|
||||
executedHandler(
|
||||
new CustomEvent('executed', {
|
||||
detail: {
|
||||
prompt_id: 'job-other',
|
||||
node: 'n1',
|
||||
display_node: 'n1',
|
||||
workflow_id: 'wf-other',
|
||||
output: {}
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
expect(store.activeJob?.nodes['n1']).toBeUndefined()
|
||||
})
|
||||
|
||||
it('execution_error from a non-active workflow does not clear active job state but still clears the errored job initializing flag', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
const startHandler = apiEventHandlers.get('execution_start')
|
||||
if (!startHandler) throw new Error('execution_start handler not bound')
|
||||
startHandler(
|
||||
new CustomEvent('execution_start', {
|
||||
detail: { prompt_id: 'job-1', timestamp: 0, workflow_id: 'wf-active' }
|
||||
})
|
||||
)
|
||||
|
||||
store.initializingJobIds = new Set(['job-other'])
|
||||
|
||||
const errorHandler = apiEventHandlers.get('execution_error')
|
||||
if (!errorHandler) throw new Error('execution_error handler not bound')
|
||||
errorHandler(
|
||||
new CustomEvent('execution_error', {
|
||||
detail: {
|
||||
prompt_id: 'job-other',
|
||||
timestamp: 0,
|
||||
workflow_id: 'wf-other',
|
||||
node_id: 'n1',
|
||||
node_type: 'X',
|
||||
executed: [],
|
||||
exception_message: 'oops',
|
||||
exception_type: 'RuntimeError',
|
||||
traceback: [],
|
||||
current_inputs: {},
|
||||
current_outputs: {}
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
expect(store.activeJobId).toBe('job-1')
|
||||
expect(store.initializingJobIds.has('job-other')).toBe(false)
|
||||
expect(useExecutionErrorStore().lastExecutionError).toBeNull()
|
||||
})
|
||||
|
||||
it('revokes preview when node transitions pending -> running', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
const pendingNodes: Record<string, NodeProgressState> = {
|
||||
n1: {
|
||||
value: 0,
|
||||
max: 10,
|
||||
state: 'pending',
|
||||
node_id: 'n1',
|
||||
prompt_id: 'job-1',
|
||||
display_node_id: 'n1'
|
||||
}
|
||||
}
|
||||
fireProgressState('job-1', pendingNodes, 'wf-active')
|
||||
mockRevokePreviewsByExecutionId.mockClear()
|
||||
|
||||
const runningNodes: Record<string, NodeProgressState> = {
|
||||
n1: { ...pendingNodes.n1, state: 'running', value: 1 }
|
||||
}
|
||||
fireProgressState('job-1', runningNodes, 'wf-active')
|
||||
|
||||
expect(mockRevokePreviewsByExecutionId).toHaveBeenCalledWith('n1')
|
||||
})
|
||||
})
|
||||
|
||||
describe('useExecutionStore - progress_text startup guard', () => {
|
||||
let store: ReturnType<typeof useExecutionStore>
|
||||
|
||||
@@ -447,6 +923,7 @@ describe('useExecutionStore - progress_text startup guard', () => {
|
||||
nodeId: string
|
||||
text: string
|
||||
prompt_id?: string
|
||||
workflow_id?: string
|
||||
}) {
|
||||
const handler = apiEventHandlers.get('progress_text')
|
||||
if (!handler) throw new Error('progress_text handler not bound')
|
||||
@@ -456,6 +933,7 @@ describe('useExecutionStore - progress_text startup guard', () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
apiEventHandlers.clear()
|
||||
mockActiveWorkflow.current = null
|
||||
setActivePinia(createTestingPinia({ stubActions: false }))
|
||||
store = useExecutionStore()
|
||||
store.bindExecutionEvents()
|
||||
@@ -488,6 +966,50 @@ describe('useExecutionStore - progress_text startup guard', () => {
|
||||
|
||||
expect(mockShowTextPreview).toHaveBeenCalledWith(mockNode, 'warming up')
|
||||
})
|
||||
|
||||
it('skips progress_text whose workflow_id mismatches active workflow', async () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
const mockNode = createMockLGraphNode({ id: 1 })
|
||||
const { useCanvasStore } =
|
||||
await import('@/renderer/core/canvas/canvasStore')
|
||||
useCanvasStore().canvas = {
|
||||
graph: { getNodeById: vi.fn(() => mockNode) }
|
||||
} as unknown as LGraphCanvas
|
||||
|
||||
fireProgressText({
|
||||
nodeId: '1',
|
||||
text: 'warming up',
|
||||
prompt_id: 'job-other',
|
||||
workflow_id: 'wf-other'
|
||||
})
|
||||
|
||||
expect(mockShowTextPreview).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('forwards progress_text whose workflow_id matches active workflow', async () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
const mockNode = createMockLGraphNode({ id: 1 })
|
||||
const { useCanvasStore } =
|
||||
await import('@/renderer/core/canvas/canvasStore')
|
||||
useCanvasStore().canvas = {
|
||||
graph: { getNodeById: vi.fn(() => mockNode) }
|
||||
} as unknown as LGraphCanvas
|
||||
|
||||
fireProgressText({
|
||||
nodeId: '1',
|
||||
text: 'warming up',
|
||||
prompt_id: 'job-1',
|
||||
workflow_id: 'wf-active'
|
||||
})
|
||||
|
||||
expect(mockShowTextPreview).toHaveBeenCalledWith(mockNode, 'warming up')
|
||||
})
|
||||
})
|
||||
|
||||
describe('useExecutionErrorStore - Node Error Lookups', () => {
|
||||
@@ -767,6 +1289,7 @@ describe('useExecutionStore - WebSocket event handlers', () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
apiEventHandlers.clear()
|
||||
mockActiveWorkflow.current = null
|
||||
setActivePinia(createTestingPinia({ stubActions: false }))
|
||||
store = useExecutionStore()
|
||||
store.bindExecutionEvents()
|
||||
@@ -878,7 +1401,35 @@ describe('useExecutionStore - WebSocket event handlers', () => {
|
||||
})
|
||||
|
||||
describe('executing', () => {
|
||||
it('clears _executingNodeProgress and activeJobId when detail is null', () => {
|
||||
it('clears _executingNodeProgress when workflow_id matches the active workflow', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
fire('execution_start', {
|
||||
prompt_id: 'job-1',
|
||||
timestamp: 0,
|
||||
workflow_id: 'wf-active'
|
||||
})
|
||||
store._executingNodeProgress = {
|
||||
value: 1,
|
||||
max: 2,
|
||||
prompt_id: 'job-1',
|
||||
node: '1'
|
||||
}
|
||||
|
||||
fire('executing', {
|
||||
prompt_id: 'job-1',
|
||||
node: '1',
|
||||
display_node: '1',
|
||||
workflow_id: 'wf-active'
|
||||
})
|
||||
|
||||
expect(store._executingNodeProgress).toBeNull()
|
||||
})
|
||||
|
||||
it('clears _executingNodeProgress when ownership is unresolvable (legacy fallback)', () => {
|
||||
mockActiveWorkflow.current = null
|
||||
fire('execution_start', { prompt_id: 'job-1', timestamp: 0 })
|
||||
store._executingNodeProgress = {
|
||||
value: 1,
|
||||
@@ -887,10 +1438,13 @@ describe('useExecutionStore - WebSocket event handlers', () => {
|
||||
node: '1'
|
||||
}
|
||||
|
||||
fire('executing', null)
|
||||
fire('executing', {
|
||||
prompt_id: 'job-1',
|
||||
node: '1',
|
||||
display_node: '1'
|
||||
})
|
||||
|
||||
expect(store._executingNodeProgress).toBeNull()
|
||||
expect(store.activeJobId).toBeNull()
|
||||
})
|
||||
})
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ import type {
|
||||
import { useCanvasStore } from '@/renderer/core/canvas/canvasStore'
|
||||
import type {
|
||||
ExecutedWsMessage,
|
||||
ExecutingWsMessage,
|
||||
ExecutionCachedWsMessage,
|
||||
ExecutionErrorWsMessage,
|
||||
ExecutionInterruptedWsMessage,
|
||||
@@ -247,22 +248,32 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
}
|
||||
|
||||
function handleExecutionStart(e: CustomEvent<ExecutionStartWsMessage>) {
|
||||
executionIdToLocatorCache.clear()
|
||||
executionErrorStore.clearAllErrors()
|
||||
activeJobId.value = e.detail.prompt_id
|
||||
queuedJobs.value[activeJobId.value] ??= { nodes: {} }
|
||||
clearInitializationByJobId(activeJobId.value)
|
||||
const jobId = e.detail.prompt_id
|
||||
queuedJobs.value[jobId] ??= { nodes: {} }
|
||||
clearInitializationByJobId(jobId)
|
||||
|
||||
// Ensure path mapping exists — execution_start can arrive via WebSocket
|
||||
// before the HTTP response from queuePrompt triggers storeJob.
|
||||
if (!jobIdToSessionWorkflowPath.value.has(activeJobId.value)) {
|
||||
const path = queuedJobs.value[activeJobId.value]?.workflow?.path
|
||||
if (path) ensureSessionWorkflowPath(activeJobId.value, path)
|
||||
if (!jobIdToSessionWorkflowPath.value.has(jobId)) {
|
||||
const path = queuedJobs.value[jobId]?.workflow?.path
|
||||
if (path) ensureSessionWorkflowPath(jobId, path)
|
||||
}
|
||||
|
||||
// Only adopt as the global active job and clear shared UI state when the
|
||||
// starting job belongs to the active workflow. Otherwise a job started
|
||||
// from another tab would steal activeJobId and clobber the active tab's
|
||||
// execution UI.
|
||||
if (!messageMatchesActiveWorkflow(jobId, e.detail.workflow_id)) return
|
||||
|
||||
executionIdToLocatorCache.clear()
|
||||
executionErrorStore.clearAllErrors()
|
||||
activeJobId.value = jobId
|
||||
}
|
||||
|
||||
function handleExecutionCached(e: CustomEvent<ExecutionCachedWsMessage>) {
|
||||
if (!activeJob.value) return
|
||||
if (!messageMatchesActiveWorkflow(e.detail.prompt_id, e.detail.workflow_id))
|
||||
return
|
||||
for (const n of e.detail.nodes) {
|
||||
activeJob.value.nodes[n] = true
|
||||
}
|
||||
@@ -272,12 +283,15 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
e: CustomEvent<ExecutionInterruptedWsMessage>
|
||||
) {
|
||||
const jobId = e.detail.prompt_id
|
||||
if (activeJobId.value) clearInitializationByJobId(activeJobId.value)
|
||||
clearInitializationByJobId(jobId)
|
||||
if (!messageMatchesActiveWorkflow(jobId, e.detail.workflow_id)) return
|
||||
resetExecutionState(jobId)
|
||||
}
|
||||
|
||||
function handleExecuted(e: CustomEvent<ExecutedWsMessage>) {
|
||||
if (!activeJob.value) return
|
||||
if (!messageMatchesActiveWorkflow(e.detail.prompt_id, e.detail.workflow_id))
|
||||
return
|
||||
activeJob.value.nodes[e.detail.node] = true
|
||||
}
|
||||
|
||||
@@ -288,22 +302,14 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
})
|
||||
}
|
||||
const jobId = e.detail.prompt_id
|
||||
if (!messageMatchesActiveWorkflow(jobId, e.detail.workflow_id)) return
|
||||
resetExecutionState(jobId)
|
||||
}
|
||||
|
||||
function handleExecuting(e: CustomEvent<NodeId | null>): void {
|
||||
// Clear the current node progress when a new node starts executing
|
||||
function handleExecuting(e: CustomEvent<ExecutingWsMessage>): void {
|
||||
const { prompt_id: jobId, workflow_id: messageWorkflowId } = e.detail
|
||||
if (!messageMatchesActiveWorkflow(jobId, messageWorkflowId)) return
|
||||
_executingNodeProgress.value = null
|
||||
|
||||
if (!activeJob.value) return
|
||||
|
||||
// Update the executing nodes list
|
||||
if (typeof e.detail !== 'string') {
|
||||
if (activeJobId.value) {
|
||||
delete queuedJobs.value[activeJobId.value]
|
||||
}
|
||||
activeJobId.value = null
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -335,43 +341,92 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
}
|
||||
|
||||
function handleProgressState(e: CustomEvent<ProgressStateWsMessage>) {
|
||||
const { nodes, prompt_id: jobId } = e.detail
|
||||
const { nodes, prompt_id: jobId, workflow_id: messageWorkflowId } = e.detail
|
||||
const isActiveWorkflowMessage = messageMatchesActiveWorkflow(
|
||||
jobId,
|
||||
messageWorkflowId
|
||||
)
|
||||
|
||||
// Revoke previews for nodes that are starting to execute
|
||||
const previousForJob = nodeProgressStatesByJob.value[jobId] || {}
|
||||
for (const nodeId in nodes) {
|
||||
const nodeState = nodes[nodeId]
|
||||
if (nodeState.state === 'running' && !previousForJob[nodeId]) {
|
||||
// This node just started executing, revoke its previews
|
||||
// Note that we're doing the *actual* node id instead of the display node id
|
||||
// here intentionally. That way, we don't clear the preview every time a new node
|
||||
// within an expanded graph starts executing.
|
||||
const { revokePreviewsByExecutionId } = useNodeOutputStore()
|
||||
revokePreviewsByExecutionId(nodeId)
|
||||
if (isActiveWorkflowMessage) {
|
||||
const { revokePreviewsByExecutionId } = useNodeOutputStore()
|
||||
for (const nodeId in nodes) {
|
||||
const nodeState = nodes[nodeId]
|
||||
if (
|
||||
nodeState.state === 'running' &&
|
||||
previousForJob[nodeId]?.state !== 'running'
|
||||
) {
|
||||
revokePreviewsByExecutionId(nodeId)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update the progress states for all nodes
|
||||
nodeProgressStatesByJob.value = {
|
||||
...nodeProgressStatesByJob.value,
|
||||
[jobId]: nodes
|
||||
}
|
||||
evictOldProgressJobs()
|
||||
nodeProgressStates.value = nodes
|
||||
|
||||
// If we have progress for the currently executing node, update it for backwards compatibility
|
||||
if (executingNodeId.value && nodes[executingNodeId.value]) {
|
||||
const nodeState = nodes[executingNodeId.value]
|
||||
_executingNodeProgress.value = {
|
||||
value: nodeState.value,
|
||||
max: nodeState.max,
|
||||
prompt_id: nodeState.prompt_id,
|
||||
node: nodeState.display_node_id || nodeState.node_id
|
||||
if (isActiveWorkflowMessage) {
|
||||
nodeProgressStates.value = nodes
|
||||
|
||||
if (executingNodeId.value && nodes[executingNodeId.value]) {
|
||||
const nodeState = nodes[executingNodeId.value]
|
||||
_executingNodeProgress.value = {
|
||||
value: nodeState.value,
|
||||
max: nodeState.max,
|
||||
prompt_id: nodeState.prompt_id,
|
||||
node: nodeState.display_node_id || nodeState.node_id
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines whether a WebSocket execution message belongs to the
|
||||
* currently active workflow tab. Used to gate writes to the global
|
||||
* "current execution" mirror so a job initiated from another open
|
||||
* workflow cannot leak its progress into the active one.
|
||||
*
|
||||
* Resolution order:
|
||||
* 1. `workflow_id` carried on the WS message (when backend supports it).
|
||||
* 2. {@link jobIdToWorkflowId} mapping populated when the job was queued
|
||||
* from this tab.
|
||||
* 3. {@link jobIdToSessionWorkflowPath} mapping (path-based fallback).
|
||||
*
|
||||
* When the workflow cannot be resolved at all (e.g. job queued in a
|
||||
* different browser session), the message is treated as belonging to
|
||||
* the active workflow to preserve current behaviour for the existing
|
||||
* single-tab common case.
|
||||
*/
|
||||
function messageMatchesActiveWorkflow(
|
||||
jobId: JobId,
|
||||
messageWorkflowId: string | undefined
|
||||
): boolean {
|
||||
const activeWorkflow = workflowStore.activeWorkflow
|
||||
if (!activeWorkflow) return true
|
||||
|
||||
const activeId =
|
||||
activeWorkflow.activeState?.id ?? activeWorkflow.initialState?.id ?? null
|
||||
|
||||
if (messageWorkflowId && activeId) {
|
||||
return messageWorkflowId === activeId
|
||||
}
|
||||
|
||||
const mappedId = jobIdToWorkflowId.value.get(jobId)
|
||||
if (mappedId && activeId) return mappedId === activeId
|
||||
|
||||
const mappedPath = jobIdToSessionWorkflowPath.value.get(jobId)
|
||||
if (mappedPath && activeWorkflow.path) {
|
||||
return mappedPath === activeWorkflow.path
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
function handleProgress(e: CustomEvent<ProgressWsMessage>) {
|
||||
const { prompt_id: jobId, workflow_id: messageWorkflowId } = e.detail
|
||||
if (!messageMatchesActiveWorkflow(jobId, messageWorkflowId)) return
|
||||
_executingNodeProgress.value = e.detail
|
||||
}
|
||||
|
||||
@@ -393,17 +448,16 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
error: e.detail.exception_message
|
||||
})
|
||||
|
||||
// Cloud wraps validation errors (400) in exception_message as embedded JSON.
|
||||
if (handleCloudValidationError(e.detail)) return
|
||||
}
|
||||
|
||||
// Service-level errors (e.g. "Job has stagnated") have no associated node.
|
||||
// Route them as job errors
|
||||
if (handleServiceLevelError(e.detail)) return
|
||||
|
||||
// OSS path / Cloud fallback (real runtime errors)
|
||||
executionErrorStore.lastExecutionError = e.detail
|
||||
clearInitializationByJobId(e.detail.prompt_id)
|
||||
if (!messageMatchesActiveWorkflow(e.detail.prompt_id, e.detail.workflow_id))
|
||||
return
|
||||
|
||||
executionErrorStore.lastExecutionError = e.detail
|
||||
resetExecutionState(e.detail.prompt_id)
|
||||
}
|
||||
|
||||
@@ -413,6 +467,9 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
return false
|
||||
|
||||
clearInitializationByJobId(detail.prompt_id)
|
||||
if (!messageMatchesActiveWorkflow(detail.prompt_id, detail.workflow_id))
|
||||
return true
|
||||
|
||||
resetExecutionState(detail.prompt_id)
|
||||
executionErrorStore.lastPromptError = {
|
||||
type: detail.exception_type ?? 'error',
|
||||
@@ -431,6 +488,9 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
if (!result) return false
|
||||
|
||||
clearInitializationByJobId(detail.prompt_id)
|
||||
if (!messageMatchesActiveWorkflow(detail.prompt_id, detail.workflow_id))
|
||||
return true
|
||||
|
||||
resetExecutionState(detail.prompt_id)
|
||||
|
||||
if (result.kind === 'nodeErrors') {
|
||||
@@ -519,14 +579,27 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
}
|
||||
|
||||
function handleProgressText(e: CustomEvent<ProgressTextWsMessage>) {
|
||||
const { nodeId, text, prompt_id } = e.detail
|
||||
const { nodeId, text, prompt_id, workflow_id } = e.detail
|
||||
if (!text || !nodeId) return
|
||||
|
||||
// Filter: only accept progress for the active prompt
|
||||
if (prompt_id && activeJobId.value && prompt_id !== activeJobId.value)
|
||||
return
|
||||
// Prefer the workflow-ownership gate when ownership can be resolved
|
||||
// (workflow_id on the message, or a registered mapping). Only fall back
|
||||
// to the legacy active-prompt guard when ownership is unresolvable;
|
||||
// otherwise activeJobId pointing at a different workflow's job would
|
||||
// incorrectly drop messages for the visible workflow.
|
||||
if (prompt_id) {
|
||||
const canResolveWorkflow =
|
||||
Boolean(workflow_id) ||
|
||||
jobIdToWorkflowId.value.has(prompt_id) ||
|
||||
jobIdToSessionWorkflowPath.value.has(prompt_id)
|
||||
|
||||
if (canResolveWorkflow) {
|
||||
if (!messageMatchesActiveWorkflow(prompt_id, workflow_id)) return
|
||||
} else if (activeJobId.value && prompt_id !== activeJobId.value) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Handle execution node IDs for subgraphs
|
||||
const currentId = getNodeIdIfExecuting(nodeId)
|
||||
if (!currentId) return
|
||||
const node = canvasStore.canvas?.graph?.getNodeById(currentId)
|
||||
|
||||
Reference in New Issue
Block a user