[feat] Move partial execution to the backend and make work with subgraphs (#4624)

This commit is contained in:
Christian Byrne
2025-07-31 13:28:52 -07:00
committed by GitHub
parent aabea4b78d
commit b125e0aa3a
8 changed files with 663 additions and 59 deletions

View File

@@ -29,7 +29,11 @@ import { useBottomPanelStore } from '@/stores/workspace/bottomPanelStore'
import { useColorPaletteStore } from '@/stores/workspace/colorPaletteStore'
import { useSearchBoxStore } from '@/stores/workspace/searchBoxStore'
import { useWorkspaceStore } from '@/stores/workspaceStore'
import { getAllNonIoNodesInSubgraph } from '@/utils/graphTraversalUtil'
import {
getAllNonIoNodesInSubgraph,
getExecutionIdsForSelectedNodes
} from '@/utils/graphTraversalUtil'
import { filterOutputNodes } from '@/utils/nodeFilterUtil'
const moveSelectedNodesVersionAdded = '1.22.2'
@@ -363,10 +367,10 @@ export function useCoreCommands(): ComfyCommand[] {
versionAdded: '1.19.6',
function: async () => {
const batchCount = useQueueSettingsStore().batchCount
const queueNodeIds = getSelectedNodes()
.filter((node) => node.constructor.nodeData?.output_node)
.map((node) => node.id)
if (queueNodeIds.length === 0) {
const selectedNodes = getSelectedNodes()
const selectedOutputNodes = filterOutputNodes(selectedNodes)
if (selectedOutputNodes.length === 0) {
toastStore.add({
severity: 'error',
summary: t('toastMessages.nothingToQueue'),
@@ -375,7 +379,11 @@ export function useCoreCommands(): ComfyCommand[] {
})
return
}
await app.queuePrompt(0, batchCount, queueNodeIds)
// Get execution IDs for all selected output nodes and their descendants
const executionIds =
getExecutionIdsForSelectedNodes(selectedOutputNodes)
await app.queuePrompt(0, batchCount, executionIds)
}
},
{

View File

@@ -35,11 +35,13 @@ import type {
NodeId
} from '@/schemas/comfyWorkflowSchema'
import type { ComfyNodeDef } from '@/schemas/nodeDefSchema'
import type { NodeExecutionId } from '@/types/nodeIdentification'
import { WorkflowTemplates } from '@/types/workflowTemplateTypes'
interface QueuePromptRequestBody {
client_id: string
prompt: ComfyApiWorkflow
partial_execution_targets?: NodeExecutionId[]
extra_data: {
extra_pnginfo: {
workflow: ComfyWorkflowJSON
@@ -80,6 +82,18 @@ interface QueuePromptRequestBody {
number?: number
}
/**
* Options for queuePrompt method
*/
interface QueuePromptOptions {
/**
* Optional list of node execution IDs to execute (partial execution).
* Each ID represents a node's position in nested subgraphs.
* Format: Colon-separated path of node IDs (e.g., "123:456:789")
*/
partialExecutionTargets?: NodeExecutionId[]
}
/** Dictionary of Frontend-generated API calls */
interface FrontendApiCalls {
graphChanged: ComfyWorkflowJSON
@@ -610,18 +624,23 @@ export class ComfyApi extends EventTarget {
/**
* Queues a prompt to be executed
* @param {number} number The index at which to queue the prompt, passing -1 will insert the prompt at the front of the queue
* @param {object} prompt The prompt data to queue
* @param {object} data The prompt data to queue
* @param {QueuePromptOptions} options Optional execution options
* @throws {PromptExecutionError} If the prompt fails to execute
*/
async queuePrompt(
number: number,
data: { output: ComfyApiWorkflow; workflow: ComfyWorkflowJSON }
data: { output: ComfyApiWorkflow; workflow: ComfyWorkflowJSON },
options?: QueuePromptOptions
): Promise<PromptResponse> {
const { output: prompt, workflow } = data
const body: QueuePromptRequestBody = {
client_id: this.clientId ?? '', // TODO: Unify clientId access
prompt,
...(options?.partialExecutionTargets && {
partial_execution_targets: options.partialExecutionTargets
}),
extra_data: {
auth_token_comfy_org: this.authToken,
api_key_comfy_org: this.apiKey,

View File

@@ -59,6 +59,7 @@ import { useColorPaletteStore } from '@/stores/workspace/colorPaletteStore'
import { useWorkspaceStore } from '@/stores/workspaceStore'
import type { ComfyExtension, MissingNodeType } from '@/types/comfy'
import { ExtensionManager } from '@/types/extensionTypes'
import type { NodeExecutionId } from '@/types/nodeIdentification'
import { ColorAdjustOptions, adjustColor } from '@/utils/colorUtil'
import { graphToPrompt } from '@/utils/executionUtil'
import {
@@ -127,7 +128,7 @@ export class ComfyApp {
#queueItems: {
number: number
batchCount: number
queueNodeIds?: NodeId[]
queueNodeIds?: NodeExecutionId[]
}[] = []
/**
* If the queue is currently being processed
@@ -1239,20 +1240,16 @@ export class ComfyApp {
})
}
async graphToPrompt(
graph = this.graph,
options: { queueNodeIds?: NodeId[] } = {}
) {
async graphToPrompt(graph = this.graph) {
return graphToPrompt(graph, {
sortNodes: useSettingStore().get('Comfy.Workflow.SortNodeIdOnSave'),
queueNodeIds: options.queueNodeIds
sortNodes: useSettingStore().get('Comfy.Workflow.SortNodeIdOnSave')
})
}
async queuePrompt(
number: number,
batchCount: number = 1,
queueNodeIds?: NodeId[]
queueNodeIds?: NodeExecutionId[]
): Promise<boolean> {
this.#queueItems.push({ number, batchCount, queueNodeIds })
@@ -1281,11 +1278,13 @@ export class ComfyApp {
executeWidgetsCallback(subgraph.nodes, 'beforeQueued')
}
const p = await this.graphToPrompt(this.graph, { queueNodeIds })
const p = await this.graphToPrompt(this.graph)
try {
api.authToken = comfyOrgAuthToken
api.apiKey = comfyOrgApiKey ?? undefined
const res = await api.queuePrompt(number, p)
const res = await api.queuePrompt(number, p, {
partialExecutionTargets: queueNodeIds
})
delete api.authToken
delete api.apiKey
executionStore.lastNodeErrors = res.node_errors ?? null

View File

@@ -1,8 +1,7 @@
import type {
ExecutableLGraphNode,
ExecutionId,
LGraph,
NodeId
LGraph
} from '@comfyorg/litegraph'
import {
ExecutableNodeDTO,
@@ -18,31 +17,6 @@ import type {
import { ExecutableGroupNodeDTO, isGroupNode } from './executableGroupNodeDto'
import { compressWidgetInputSlots } from './litegraphUtil'
/**
* Recursively target node's parent nodes to the new output.
* @param nodeId The node id to add.
* @param oldOutput The old output.
* @param newOutput The new output.
* @returns The new output.
*/
function recursiveAddNodes(
nodeId: NodeId,
oldOutput: ComfyApiWorkflow,
newOutput: ComfyApiWorkflow
) {
const currentId = String(nodeId)
const currentNode = oldOutput[currentId]!
if (newOutput[currentId] == null) {
newOutput[currentId] = currentNode
for (const inputValue of Object.values(currentNode.inputs || [])) {
if (Array.isArray(inputValue)) {
recursiveAddNodes(inputValue[0], oldOutput, newOutput)
}
}
}
return newOutput
}
/**
* Converts the current graph workflow for sending to the API.
* @note Node widgets are updated before serialization to prepare queueing.
@@ -50,14 +24,13 @@ function recursiveAddNodes(
* @param graph The graph to convert.
* @param options The options for the conversion.
* - `sortNodes`: Whether to sort the nodes by execution order.
* - `queueNodeIds`: The output nodes to execute. Execute all output nodes if not provided.
* @returns The workflow and node links
*/
export const graphToPrompt = async (
graph: LGraph,
options: { sortNodes?: boolean; queueNodeIds?: NodeId[] } = {}
options: { sortNodes?: boolean } = {}
): Promise<{ workflow: ComfyWorkflowJSON; output: ComfyApiWorkflow }> => {
const { sortNodes = false, queueNodeIds } = options
const { sortNodes = false } = options
for (const node of graph.computeExecutionOrder(false)) {
const innerNodes = node.getInnerNodes
@@ -104,7 +77,7 @@ export const graphToPrompt = async (
nodeDtoMap.set(dto.id, dto)
}
let output: ComfyApiWorkflow = {}
const output: ComfyApiWorkflow = {}
// Process nodes in order of execution
for (const node of nodeDtoMap.values()) {
// Don't serialize muted nodes
@@ -180,14 +153,5 @@ export const graphToPrompt = async (
}
}
// Partial execution
if (queueNodeIds?.length) {
const newOutput = {}
for (const queueNodeId of queueNodeIds) {
recursiveAddNodes(queueNodeId, output, newOutput)
}
output = newOutput
}
return { workflow: workflow as ComfyWorkflowJSON, output }
}

View File

@@ -1,6 +1,6 @@
import type { LGraph, LGraphNode, Subgraph } from '@comfyorg/litegraph'
import type { NodeLocatorId } from '@/types/nodeIdentification'
import type { NodeExecutionId, NodeLocatorId } from '@/types/nodeIdentification'
import { parseNodeLocatorId } from '@/types/nodeIdentification'
import { isSubgraphIoNode } from './typeGuardUtil'
@@ -351,3 +351,106 @@ export function mapSubgraphNodes<T>(
export function getAllNonIoNodesInSubgraph(subgraph: Subgraph): LGraphNode[] {
return subgraph.nodes.filter((node) => !isSubgraphIoNode(node))
}
/**
* Performs depth-first traversal of nodes and their subgraphs.
* Generic visitor pattern that can be used for various node processing tasks.
*
* @param nodes - Starting nodes for traversal
* @param visitor - Function called for each node with its context
* @param expandSubgraphs - Whether to traverse into subgraph nodes (default: true)
*/
export function traverseNodesDepthFirst<T>(
nodes: LGraphNode[],
visitor: (node: LGraphNode, context: T) => T,
initialContext: T,
expandSubgraphs: boolean = true
): void {
type StackItem = { node: LGraphNode; context: T }
const stack: StackItem[] = []
// Initialize stack with starting nodes
for (const node of nodes) {
stack.push({ node, context: initialContext })
}
// Process stack iteratively (DFS)
while (stack.length > 0) {
const { node, context } = stack.pop()!
// Visit node and get updated context for children
const childContext = visitor(node, context)
// If it's a subgraph and we should expand, add children to stack
if (expandSubgraphs && node.isSubgraphNode?.() && node.subgraph) {
// Process children in reverse order to maintain left-to-right DFS processing
// when popping from stack (LIFO). Iterate backwards to avoid array reversal.
const children = node.subgraph.nodes
for (let i = children.length - 1; i >= 0; i--) {
stack.push({ node: children[i], context: childContext })
}
}
}
}
/**
* Collects nodes with custom data during depth-first traversal.
* Generic collector that can gather any type of data per node.
*
* @param nodes - Starting nodes for traversal
* @param collector - Function that returns data to collect for each node
* @param contextBuilder - Function that builds context for child nodes
* @param expandSubgraphs - Whether to traverse into subgraph nodes
* @returns Array of collected data
*/
export function collectFromNodes<T, C>(
nodes: LGraphNode[],
collector: (node: LGraphNode, context: C) => T | null,
contextBuilder: (node: LGraphNode, parentContext: C) => C,
initialContext: C,
expandSubgraphs: boolean = true
): T[] {
const results: T[] = []
traverseNodesDepthFirst(
nodes,
(node, context) => {
const data = collector(node, context)
if (data !== null) {
results.push(data)
}
return contextBuilder(node, context)
},
initialContext,
expandSubgraphs
)
return results
}
/**
* Collects execution IDs for selected nodes and all their descendants.
* Uses the generic DFS traversal with optimized string building.
*
* @param selectedNodes - The selected nodes to process
* @returns Array of execution IDs for selected nodes and all nodes within selected subgraphs
*/
export function getExecutionIdsForSelectedNodes(
selectedNodes: LGraphNode[]
): NodeExecutionId[] {
return collectFromNodes(
selectedNodes,
// Collector: build execution ID for each node
(node, parentExecutionId: string): NodeExecutionId => {
const nodeId = String(node.id)
return parentExecutionId ? `${parentExecutionId}:${nodeId}` : nodeId
},
// Context builder: pass execution ID to children
(node, parentExecutionId: string) => {
const nodeId = String(node.id)
return parentExecutionId ? `${parentExecutionId}:${nodeId}` : nodeId
},
'', // Initial context: empty parent execution ID
true // Expand subgraphs
)
}

View File

@@ -0,0 +1,21 @@
import type { LGraphNode } from '@comfyorg/litegraph'
/**
* Checks if a node is an output node.
* Output nodes are nodes that have the output_node flag set in their nodeData.
*
* @param node - The node to check
* @returns True if the node is an output node, false otherwise
*/
export const isOutputNode = (node: LGraphNode) =>
node.constructor.nodeData?.output_node
/**
* Filters nodes to find only output nodes.
* Output nodes are nodes that have the output_node flag set in their nodeData.
*
* @param nodes - Array of nodes to filter
* @returns Array of output nodes only
*/
export const filterOutputNodes = (nodes: LGraphNode[]): LGraphNode[] =>
nodes.filter(isOutputNode)