Implement workflow progress panel (#6092)

Adds a workflow progress panel component underneath the
`actionbar-container`.

I suggest starting a review at the extraneous changes that were needed.
Including but not limited to:

- `get createTime()` in queueStore
- `promptIdToWorkflowId`, `initializingPromptIds`, and
`nodeProgressStatesByPrompt` in executionStore
- `create_time` handling in v2ToV1Adapter
- `pointer-events-auto` on ComfyActionbar.vue

The rest of the changes should be contained under
`QueueProgressOverlay.vue`, and has less of a blast radius in case
something goes wrong.

---------

Co-authored-by: pythongosssss <125205205+pythongosssss@users.noreply.github.com>
Co-authored-by: GitHub Action <action@github.com>
Co-authored-by: github-actions <github-actions@github.com>
Co-authored-by: Jin Yi <jin12cc@gmail.com>
Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: Alexander Brown <drjkl@comfy.org>
Co-authored-by: Johnpaul Chiwetelu <49923152+Myestery@users.noreply.github.com>
Co-authored-by: Christian Byrne <cbyrne@comfy.org>
Co-authored-by: Comfy Org PR Bot <snomiao+comfy-pr@gmail.com>
Co-authored-by: christian-byrne <72887196+christian-byrne@users.noreply.github.com>
This commit is contained in:
Benjamin Lu
2025-11-18 22:43:49 -08:00
committed by GitHub
parent 92968f3f9b
commit e42715086e
76 changed files with 7117 additions and 39 deletions

View File

@@ -17,9 +17,12 @@ import type {
ExecutedWsMessage,
ExecutionCachedWsMessage,
ExecutionErrorWsMessage,
ExecutionInterruptedWsMessage,
ExecutionStartWsMessage,
ExecutionSuccessWsMessage,
NodeError,
NodeProgressState,
NotificationWsMessage,
ProgressStateWsMessage,
ProgressTextWsMessage,
ProgressWsMessage
@@ -110,6 +113,16 @@ export const useExecutionStore = defineStore('execution', () => {
const lastExecutionError = ref<ExecutionErrorWsMessage | null>(null)
// This is the progress of all nodes in the currently executing workflow
const nodeProgressStates = ref<Record<string, NodeProgressState>>({})
const nodeProgressStatesByPrompt = ref<
Record<string, Record<string, NodeProgressState>>
>({})
/**
* Map of prompt_id to workflow ID for quick lookup across the app.
*/
const promptIdToWorkflowId = ref<Map<string, string>>(new Map())
const initializingPromptIds = ref<Set<string>>(new Set())
const mergeExecutionProgressStates = (
currentState: NodeProgressState | undefined,
@@ -241,6 +254,7 @@ export const useExecutionStore = defineStore('execution', () => {
})
function bindExecutionEvents() {
api.addEventListener('notification', handleNotification)
api.addEventListener('execution_start', handleExecutionStart)
api.addEventListener('execution_cached', handleExecutionCached)
api.addEventListener('execution_interrupted', handleExecutionInterrupted)
@@ -255,6 +269,7 @@ export const useExecutionStore = defineStore('execution', () => {
}
function unbindExecutionEvents() {
api.removeEventListener('notification', handleNotification)
api.removeEventListener('execution_start', handleExecutionStart)
api.removeEventListener('execution_cached', handleExecutionCached)
api.removeEventListener('execution_interrupted', handleExecutionInterrupted)
@@ -272,6 +287,7 @@ export const useExecutionStore = defineStore('execution', () => {
lastExecutionError.value = null
activePromptId.value = e.detail.prompt_id
queuedPrompts.value[activePromptId.value] ??= { nodes: {} }
clearInitializationByPromptId(activePromptId.value)
}
function handleExecutionCached(e: CustomEvent<ExecutionCachedWsMessage>) {
@@ -281,8 +297,13 @@ export const useExecutionStore = defineStore('execution', () => {
}
}
function handleExecutionInterrupted() {
resetExecutionState()
function handleExecutionInterrupted(
e: CustomEvent<ExecutionInterruptedWsMessage>
) {
const pid = e.detail.prompt_id
if (activePromptId.value)
clearInitializationByPromptId(activePromptId.value)
resetExecutionState(pid)
}
function handleExecuted(e: CustomEvent<ExecutedWsMessage>) {
@@ -290,13 +311,14 @@ export const useExecutionStore = defineStore('execution', () => {
activePrompt.value.nodes[e.detail.node] = true
}
function handleExecutionSuccess() {
function handleExecutionSuccess(e: CustomEvent<ExecutionSuccessWsMessage>) {
if (isCloud && activePromptId.value) {
useTelemetry()?.trackExecutionSuccess({
jobId: activePromptId.value
})
}
resetExecutionState()
const pid = e.detail.prompt_id
resetExecutionState(pid)
}
function handleExecuting(e: CustomEvent<NodeId | null>): void {
@@ -315,12 +337,13 @@ export const useExecutionStore = defineStore('execution', () => {
}
function handleProgressState(e: CustomEvent<ProgressStateWsMessage>) {
const { nodes } = e.detail
const { nodes, prompt_id: pid } = e.detail
// Revoke previews for nodes that are starting to execute
const previousForPrompt = nodeProgressStatesByPrompt.value[pid] || {}
for (const nodeId in nodes) {
const nodeState = nodes[nodeId]
if (nodeState.state === 'running' && !nodeProgressStates.value[nodeId]) {
if (nodeState.state === 'running' && !previousForPrompt[nodeId]) {
// This node just started executing, revoke its previews
// Note that we're doing the *actual* node id instead of the display node id
// here intentionally. That way, we don't clear the preview every time a new node
@@ -331,6 +354,10 @@ export const useExecutionStore = defineStore('execution', () => {
}
// Update the progress states for all nodes
nodeProgressStatesByPrompt.value = {
...nodeProgressStatesByPrompt.value,
[pid]: nodes
}
nodeProgressStates.value = nodes
// If we have progress for the currently executing node, update it for backwards compatibility
@@ -368,14 +395,55 @@ export const useExecutionStore = defineStore('execution', () => {
error: e.detail.exception_message
})
}
resetExecutionState()
const pid = e.detail?.prompt_id
// Clear initialization for errored prompt if present
if (e.detail?.prompt_id) clearInitializationByPromptId(e.detail.prompt_id)
resetExecutionState(pid)
}
/**
* Notification handler used for frontend/cloud initialization tracking.
* Marks a prompt as initializing when cloud notifies it is waiting for a machine.
*/
function handleNotification(e: CustomEvent<NotificationWsMessage>) {
const payload = e.detail
const text = payload?.value || ''
const id = payload?.id ? payload.id : ''
if (!id) return
// Until cloud implements a proper message
if (text.includes('Waiting for a machine')) {
const next = new Set(initializingPromptIds.value)
next.add(id)
initializingPromptIds.value = next
}
}
function clearInitializationByPromptId(promptId: string | null) {
if (!promptId) return
if (!initializingPromptIds.value.has(promptId)) return
const next = new Set(initializingPromptIds.value)
next.delete(promptId)
initializingPromptIds.value = next
}
function isPromptInitializing(
promptId: string | number | undefined
): boolean {
if (!promptId) return false
return initializingPromptIds.value.has(String(promptId))
}
/**
* Reset execution-related state after a run completes or is stopped.
*/
function resetExecutionState() {
function resetExecutionState(pid?: string | null) {
nodeProgressStates.value = {}
const promptId = pid ?? activePromptId.value ?? null
if (promptId) {
const map = { ...nodeProgressStatesByPrompt.value }
delete map[promptId]
nodeProgressStatesByPrompt.value = map
}
if (activePromptId.value) {
delete queuedPrompts.value[activePromptId.value]
}
@@ -421,6 +489,21 @@ export const useExecutionStore = defineStore('execution', () => {
...queuedPrompt.nodes
}
queuedPrompt.workflow = workflow
const wid = workflow?.activeState?.id ?? workflow?.initialState?.id
if (wid) {
promptIdToWorkflowId.value.set(String(id), String(wid))
}
}
/**
* Register or update a mapping from prompt_id to workflow ID.
*/
function registerPromptWorkflowIdMapping(
promptId: string,
workflowId: string
) {
if (!promptId || !workflowId) return
promptIdToWorkflowId.value.set(String(promptId), String(workflowId))
}
/**
@@ -435,6 +518,22 @@ export const useExecutionStore = defineStore('execution', () => {
return executionId
}
const runningPromptIds = computed<string[]>(() => {
const result: string[] = []
for (const [pid, nodes] of Object.entries(
nodeProgressStatesByPrompt.value
)) {
if (Object.values(nodes).some((n) => n.state === 'running')) {
result.push(pid)
}
}
return result
})
const runningWorkflowCount = computed<number>(
() => runningPromptIds.value.length
)
return {
isIdle,
clientId,
@@ -453,14 +552,21 @@ export const useExecutionStore = defineStore('execution', () => {
executingNodeProgress,
nodeProgressStates,
nodeLocationProgressStates,
nodeProgressStatesByPrompt,
runningPromptIds,
runningWorkflowCount,
initializingPromptIds,
isPromptInitializing,
bindExecutionEvents,
unbindExecutionEvents,
storePrompt,
registerPromptWorkflowIdMapping,
uniqueExecutingNodeIdStrings,
// Raw executing progress data for backward compatibility in ComfyApp.
_executingNodeProgress,
// NodeLocatorId conversion helpers
executionIdToNodeLocatorId,
nodeLocatorIdToExecutionId
nodeLocatorIdToExecutionId,
promptIdToWorkflowId
}
})