mirror of
https://github.com/Comfy-Org/ComfyUI_frontend.git
synced 2026-03-04 12:40:00 +00:00
feat: Migrate to Jobs API (PR 2 of 3) (#7170)
## Summary
Migrate frontend from legacy `/history`, `/history_v2`, and `/queue`
endpoints to the unified `/jobs` API with memory optimization and lazy
loading.
**This is PR 2 of 3** - Core migration, depends on PR 1.
## Changes
- **What**:
- Replace `api.getQueue()` and `api.getHistory()` implementations to use
Jobs API fetchers
- Implement lazy loading for workflow and full outputs via `/jobs/{id}`
endpoint in `useJobMenu`
- Add `TaskItemImpl` class wrapping `JobListItem` for queue store
compatibility
- Rename `reconcileHistory` to `reconcileJobs` for clarity
- Use `execution_start_time` and `execution_end_time` from API for
execution timing
- Use `workflowId` from job instead of nested `workflow.id`
- Update `useJobMenu` to fetch job details on demand (`openJobWorkflow`,
`exportJobWorkflow`)
- **Breaking**: Requires backend Jobs API support (ComfyUI with `/jobs`
endpoint)
## Review Focus
1. **Lazy loading in `useJobMenu`**: `openJobWorkflow` and
`exportJobWorkflow` now fetch from API on demand instead of accessing
`taskRef.workflow`
2. **`TaskItemImpl` wrapper**: Adapts `JobListItem` to existing queue
store interface
3. **Error reporting**: Uses `execution_error` field from API for rich
error dialogs
4. **Memory optimization**: Only fetches full job details when needed
## Files Changed
- `src/scripts/api.ts` - Updated `getQueue()` and `getHistory()` to use
Jobs API
- `src/stores/queueStore.ts` - Added `TaskItemImpl`, updated to use
`JobListItem`
- `src/composables/useJobMenu.ts` - Lazy loading for workflow access
- `src/composables/useJobList.ts` - Updated types
- Various test files updated
## Dependencies
- **Depends on**: PR 1 (Jobs API Infrastructure) - #7169
## Next PR
- **PR 3**: Remove legacy history code and unused types
┆Issue is synchronized with this [Notion
page](https://www.notion.so/PR-7170-feat-Migrate-to-Jobs-API-PR-2-of-3-2bf6d73d3650811b94f4fbe69944bba6)
by [Unito](https://www.unito.io)
---------
Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: Christian Byrne <cbyrne@comfy.org>
This commit is contained in:
@@ -2,36 +2,27 @@ import _ from 'es-toolkit/compat'
|
||||
import { defineStore } from 'pinia'
|
||||
import { computed, ref, shallowRef, toRaw, toValue } from 'vue'
|
||||
|
||||
import { isCloud } from '@/platform/distribution/types'
|
||||
import { reconcileHistory } from '@/platform/remote/comfyui/history/reconciliation'
|
||||
import { useSettingStore } from '@/platform/settings/settingStore'
|
||||
import { getWorkflowFromHistory } from '@/platform/workflow/cloud'
|
||||
import { extractWorkflow } from '@/platform/remote/comfyui/jobs/fetchJobs'
|
||||
import type {
|
||||
ComfyWorkflowJSON,
|
||||
NodeId
|
||||
} from '@/platform/workflow/validation/schemas/workflowSchema'
|
||||
import { zExecutionErrorWsMessage } from '@/schemas/apiSchema'
|
||||
APITaskType,
|
||||
JobListItem,
|
||||
TaskType
|
||||
} from '@/platform/remote/comfyui/jobs/jobTypes'
|
||||
import type { NodeId } from '@/platform/workflow/validation/schemas/workflowSchema'
|
||||
import type {
|
||||
ExecutionErrorWsMessage,
|
||||
HistoryTaskItem,
|
||||
ResultItem,
|
||||
StatusWsMessageStatus,
|
||||
TaskItem,
|
||||
TaskOutput,
|
||||
TaskPrompt,
|
||||
TaskStatus,
|
||||
TaskType
|
||||
TaskOutput
|
||||
} from '@/schemas/apiSchema'
|
||||
import { api } from '@/scripts/api'
|
||||
import type { ComfyApp } from '@/scripts/app'
|
||||
import { useExtensionService } from '@/services/extensionService'
|
||||
import { getJobDetail } from '@/services/jobOutputCache'
|
||||
import { useNodeOutputStore } from '@/stores/imagePreviewStore'
|
||||
import { useExecutionStore } from '@/stores/executionStore'
|
||||
import { useSettingStore } from '@/platform/settings/settingStore'
|
||||
import { getMediaTypeFromFilename } from '@/utils/formatUtil'
|
||||
|
||||
// Task type used in the API.
|
||||
type APITaskType = 'queue' | 'history'
|
||||
|
||||
enum TaskItemDisplayStatus {
|
||||
Running = 'Running',
|
||||
Pending = 'Pending',
|
||||
@@ -214,32 +205,44 @@ export class ResultItemImpl {
|
||||
get supportsPreview(): boolean {
|
||||
return this.isImage || this.isVideo || this.isAudio || this.is3D
|
||||
}
|
||||
|
||||
static filterPreviewable(
|
||||
outputs: readonly ResultItemImpl[]
|
||||
): ResultItemImpl[] {
|
||||
return outputs.filter((o) => o.supportsPreview)
|
||||
}
|
||||
|
||||
static findByUrl(items: readonly ResultItemImpl[], url?: string): number {
|
||||
if (!url) return 0
|
||||
const idx = items.findIndex((o) => o.url === url)
|
||||
return idx >= 0 ? idx : 0
|
||||
}
|
||||
}
|
||||
|
||||
export class TaskItemImpl {
|
||||
readonly taskType: TaskType
|
||||
readonly prompt: TaskPrompt
|
||||
readonly status?: TaskStatus
|
||||
readonly job: JobListItem
|
||||
readonly outputs: TaskOutput
|
||||
readonly flatOutputs: ReadonlyArray<ResultItemImpl>
|
||||
|
||||
constructor(
|
||||
taskType: TaskType,
|
||||
prompt: TaskPrompt,
|
||||
status?: TaskStatus,
|
||||
job: JobListItem,
|
||||
outputs?: TaskOutput,
|
||||
flatOutputs?: ReadonlyArray<ResultItemImpl>
|
||||
) {
|
||||
this.taskType = taskType
|
||||
this.prompt = prompt
|
||||
this.status = status
|
||||
this.job = job
|
||||
// If no outputs provided but job has preview_output, create synthetic outputs
|
||||
// using the real nodeId and mediaType from the backend response
|
||||
const effectiveOutputs =
|
||||
outputs ??
|
||||
(job.preview_output
|
||||
? {
|
||||
[job.preview_output.nodeId]: {
|
||||
[job.preview_output.mediaType]: [job.preview_output]
|
||||
}
|
||||
}
|
||||
: {})
|
||||
// Remove animated outputs from the outputs object
|
||||
// outputs.animated is an array of boolean values that indicates if the images
|
||||
// array in the result are animated or not.
|
||||
// The queueStore does not use this information.
|
||||
// It is part of the legacy API response. We should redesign the backend API.
|
||||
// https://github.com/Comfy-Org/ComfyUI_frontend/issues/2739
|
||||
this.outputs = _.mapValues(outputs ?? {}, (nodeOutputs) =>
|
||||
this.outputs = _.mapValues(effectiveOutputs, (nodeOutputs) =>
|
||||
_.omit(nodeOutputs, 'animated')
|
||||
)
|
||||
this.flatOutputs = flatOutputs ?? this.calculateFlatOutputs()
|
||||
@@ -263,15 +266,31 @@ export class TaskItemImpl {
|
||||
)
|
||||
}
|
||||
|
||||
/** All outputs that support preview (images, videos, audio, 3D) */
|
||||
get previewableOutputs(): readonly ResultItemImpl[] {
|
||||
return ResultItemImpl.filterPreviewable(this.flatOutputs)
|
||||
}
|
||||
|
||||
get previewOutput(): ResultItemImpl | undefined {
|
||||
const previewable = this.previewableOutputs
|
||||
// Prefer saved media files over the temp previews
|
||||
return (
|
||||
this.flatOutputs.find(
|
||||
// Prefer saved media files over the temp previews
|
||||
(output) => output.type === 'output' && output.supportsPreview
|
||||
) ?? this.flatOutputs.find((output) => output.supportsPreview)
|
||||
previewable.find((output) => output.type === 'output') ?? previewable[0]
|
||||
)
|
||||
}
|
||||
|
||||
// Derive taskType from job status
|
||||
get taskType(): TaskType {
|
||||
switch (this.job.status) {
|
||||
case 'in_progress':
|
||||
return 'Running'
|
||||
case 'pending':
|
||||
return 'Pending'
|
||||
default:
|
||||
return 'History'
|
||||
}
|
||||
}
|
||||
|
||||
get apiTaskType(): APITaskType {
|
||||
switch (this.taskType) {
|
||||
case 'Running':
|
||||
@@ -287,85 +306,42 @@ export class TaskItemImpl {
|
||||
}
|
||||
|
||||
get queueIndex() {
|
||||
return this.prompt[0]
|
||||
return this.job.priority
|
||||
}
|
||||
|
||||
get promptId() {
|
||||
return this.prompt[1]
|
||||
return this.job.id
|
||||
}
|
||||
|
||||
get promptInputs() {
|
||||
return this.prompt[2]
|
||||
get outputsCount(): number | undefined {
|
||||
return this.job.outputs_count ?? undefined
|
||||
}
|
||||
|
||||
get extraData() {
|
||||
return this.prompt[3]
|
||||
get status() {
|
||||
return this.job.status
|
||||
}
|
||||
|
||||
get outputsToExecute() {
|
||||
return this.prompt[4]
|
||||
get errorMessage(): string | undefined {
|
||||
return this.job.execution_error?.exception_message ?? undefined
|
||||
}
|
||||
|
||||
get extraPngInfo() {
|
||||
return this.extraData.extra_pnginfo
|
||||
get executionError() {
|
||||
return this.job.execution_error ?? undefined
|
||||
}
|
||||
|
||||
get clientId() {
|
||||
return this.extraData.client_id
|
||||
get workflowId(): string | undefined {
|
||||
return this.job.workflow_id ?? undefined
|
||||
}
|
||||
|
||||
get workflow(): ComfyWorkflowJSON | undefined {
|
||||
return this.extraPngInfo?.workflow
|
||||
get createTime(): number {
|
||||
return this.job.create_time
|
||||
}
|
||||
|
||||
get messages() {
|
||||
return this.status?.messages || []
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts the full execution error from status messages.
|
||||
* Returns the ExecutionErrorWsMessage for detailed error dialogs.
|
||||
* Uses Zod validation to ensure type safety.
|
||||
*/
|
||||
get executionError(): ExecutionErrorWsMessage | undefined {
|
||||
const messages = this.status?.messages
|
||||
if (!Array.isArray(messages) || !messages.length) return undefined
|
||||
for (const entry of messages) {
|
||||
if (entry[0] === 'execution_error') {
|
||||
const parsed = zExecutionErrorWsMessage.safeParse(entry[1])
|
||||
if (!parsed.success) {
|
||||
console.warn(
|
||||
'[TaskItemImpl.executionError] Validation failed:',
|
||||
parsed.error
|
||||
)
|
||||
return undefined
|
||||
}
|
||||
return parsed.data
|
||||
}
|
||||
}
|
||||
return undefined
|
||||
}
|
||||
|
||||
/**
|
||||
* Server-provided creation time in milliseconds, when available.
|
||||
*
|
||||
* Sources:
|
||||
* - Queue: 5th tuple element may be a metadata object with { create_time }.
|
||||
* - History (Cloud V2): Adapter injects create_time into prompt[3].extra_data.
|
||||
*/
|
||||
get createTime(): number | undefined {
|
||||
const extra = (this.extraData as any) || {}
|
||||
const fromExtra =
|
||||
typeof extra.create_time === 'number' ? extra.create_time : undefined
|
||||
if (typeof fromExtra === 'number') return fromExtra
|
||||
|
||||
return undefined
|
||||
}
|
||||
|
||||
get interrupted() {
|
||||
return _.some(
|
||||
this.messages,
|
||||
(message) => message[0] === 'execution_interrupted'
|
||||
get interrupted(): boolean {
|
||||
return (
|
||||
this.job.status === 'failed' &&
|
||||
this.job.execution_error?.exception_type ===
|
||||
'InterruptProcessingException'
|
||||
)
|
||||
}
|
||||
|
||||
@@ -378,42 +354,26 @@ export class TaskItemImpl {
|
||||
}
|
||||
|
||||
get displayStatus(): TaskItemDisplayStatus {
|
||||
switch (this.taskType) {
|
||||
case 'Running':
|
||||
switch (this.job.status) {
|
||||
case 'in_progress':
|
||||
return TaskItemDisplayStatus.Running
|
||||
case 'Pending':
|
||||
case 'pending':
|
||||
return TaskItemDisplayStatus.Pending
|
||||
case 'History':
|
||||
if (this.interrupted) return TaskItemDisplayStatus.Cancelled
|
||||
|
||||
switch (this.status!.status_str) {
|
||||
case 'success':
|
||||
return TaskItemDisplayStatus.Completed
|
||||
case 'error':
|
||||
return TaskItemDisplayStatus.Failed
|
||||
}
|
||||
case 'completed':
|
||||
return TaskItemDisplayStatus.Completed
|
||||
case 'failed':
|
||||
return TaskItemDisplayStatus.Failed
|
||||
case 'cancelled':
|
||||
return TaskItemDisplayStatus.Cancelled
|
||||
}
|
||||
}
|
||||
|
||||
get executionStartTimestamp() {
|
||||
const message = this.messages.find(
|
||||
(message) => message[0] === 'execution_start'
|
||||
)
|
||||
return message ? message[1].timestamp : undefined
|
||||
return this.job.execution_start_time ?? undefined
|
||||
}
|
||||
|
||||
get executionEndTimestamp() {
|
||||
const messages = this.messages.filter((message) =>
|
||||
[
|
||||
'execution_success',
|
||||
'execution_interrupted',
|
||||
'execution_error'
|
||||
].includes(message[0])
|
||||
)
|
||||
if (!messages.length) {
|
||||
return undefined
|
||||
}
|
||||
return _.max(messages.map((message) => message[1].timestamp))
|
||||
return this.job.execution_end_time ?? undefined
|
||||
}
|
||||
|
||||
get executionTime() {
|
||||
@@ -429,28 +389,48 @@ export class TaskItemImpl {
|
||||
: undefined
|
||||
}
|
||||
|
||||
public async loadWorkflow(app: ComfyApp) {
|
||||
let workflowData = this.workflow
|
||||
/**
|
||||
* Loads full outputs for tasks that only have preview data
|
||||
* Returns a new TaskItemImpl with full outputs and execution status
|
||||
*/
|
||||
public async loadFullOutputs(): Promise<TaskItemImpl> {
|
||||
// Only load for history tasks (caller checks outputsCount > 1)
|
||||
if (!this.isHistory) {
|
||||
return this
|
||||
}
|
||||
const jobDetail = await getJobDetail(this.promptId)
|
||||
|
||||
if (isCloud && !workflowData && this.isHistory) {
|
||||
workflowData = await getWorkflowFromHistory(
|
||||
(url) => app.api.fetchApi(url),
|
||||
this.promptId
|
||||
)
|
||||
if (!jobDetail?.outputs) {
|
||||
return this
|
||||
}
|
||||
|
||||
// Create new TaskItemImpl with full outputs
|
||||
return new TaskItemImpl(this.job, jobDetail.outputs)
|
||||
}
|
||||
|
||||
public async loadWorkflow(app: ComfyApp) {
|
||||
if (!this.isHistory) {
|
||||
return
|
||||
}
|
||||
|
||||
// Single fetch for both workflow and outputs (with caching)
|
||||
const jobDetail = await getJobDetail(this.promptId)
|
||||
|
||||
const workflowData = await extractWorkflow(jobDetail)
|
||||
if (!workflowData) {
|
||||
return
|
||||
}
|
||||
|
||||
await app.loadGraphData(toRaw(workflowData))
|
||||
|
||||
if (!this.outputs) {
|
||||
// Use full outputs from job detail, or fall back to existing outputs
|
||||
const outputsToLoad = jobDetail?.outputs ?? this.outputs
|
||||
if (!outputsToLoad) {
|
||||
return
|
||||
}
|
||||
|
||||
const nodeOutputsStore = useNodeOutputStore()
|
||||
const rawOutputs = toRaw(this.outputs)
|
||||
const rawOutputs = toRaw(outputsToLoad)
|
||||
for (const nodeExecutionId in rawOutputs) {
|
||||
nodeOutputsStore.setNodeOutputsByExecutionId(
|
||||
nodeExecutionId,
|
||||
@@ -471,15 +451,10 @@ export class TaskItemImpl {
|
||||
return this.flatOutputs.map(
|
||||
(output: ResultItemImpl, i: number) =>
|
||||
new TaskItemImpl(
|
||||
this.taskType,
|
||||
[
|
||||
this.queueIndex,
|
||||
`${this.promptId}-${i}`,
|
||||
this.promptInputs,
|
||||
this.extraData,
|
||||
this.outputsToExecute
|
||||
],
|
||||
this.status,
|
||||
{
|
||||
...this.job,
|
||||
id: `${this.promptId}-${i}`
|
||||
},
|
||||
{
|
||||
[output.nodeId]: {
|
||||
[output.mediaType]: [output]
|
||||
@@ -489,32 +464,8 @@ export class TaskItemImpl {
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
public toTaskItem(): TaskItem {
|
||||
const item: HistoryTaskItem = {
|
||||
taskType: 'History',
|
||||
prompt: this.prompt,
|
||||
status: this.status!,
|
||||
outputs: this.outputs
|
||||
}
|
||||
return item
|
||||
}
|
||||
}
|
||||
|
||||
const sortNewestFirst = (a: TaskItemImpl, b: TaskItemImpl) =>
|
||||
b.queueIndex - a.queueIndex
|
||||
|
||||
const toTaskItemImpls = (tasks: TaskItem[]): TaskItemImpl[] =>
|
||||
tasks.map(
|
||||
(task) =>
|
||||
new TaskItemImpl(
|
||||
task.taskType,
|
||||
task.prompt,
|
||||
'status' in task ? task.status : undefined,
|
||||
'outputs' in task ? task.outputs : undefined
|
||||
)
|
||||
)
|
||||
|
||||
export const useQueueStore = defineStore('queue', () => {
|
||||
// Use shallowRef because TaskItemImpl instances are immutable and arrays are
|
||||
// replaced entirely (not mutated), so deep reactivity would waste performance
|
||||
@@ -551,8 +502,9 @@ export const useQueueStore = defineStore('queue', () => {
|
||||
api.getHistory(maxHistoryItems.value)
|
||||
])
|
||||
|
||||
runningTasks.value = toTaskItemImpls(queue.Running).sort(sortNewestFirst)
|
||||
pendingTasks.value = toTaskItemImpls(queue.Pending).sort(sortNewestFirst)
|
||||
// API returns pre-sorted data (sort_by=create_time&order=desc)
|
||||
runningTasks.value = queue.Running.map((job) => new TaskItemImpl(job))
|
||||
pendingTasks.value = queue.Pending.map((job) => new TaskItemImpl(job))
|
||||
|
||||
const currentHistory = toValue(historyTasks)
|
||||
|
||||
@@ -560,7 +512,7 @@ export const useQueueStore = defineStore('queue', () => {
|
||||
const executionStore = useExecutionStore()
|
||||
appearedTasks.forEach((task) => {
|
||||
const promptIdString = String(task.promptId)
|
||||
const workflowId = task.workflow?.id
|
||||
const workflowId = task.workflowId
|
||||
if (workflowId && promptIdString) {
|
||||
executionStore.registerPromptWorkflowIdMapping(
|
||||
promptIdString,
|
||||
@@ -569,22 +521,26 @@ export const useQueueStore = defineStore('queue', () => {
|
||||
}
|
||||
})
|
||||
|
||||
const items = reconcileHistory(
|
||||
history.History,
|
||||
currentHistory.map((impl) => impl.toTaskItem()),
|
||||
toValue(maxHistoryItems),
|
||||
toValue(lastHistoryQueueIndex)
|
||||
)
|
||||
// Sort by create_time descending and limit to maxItems
|
||||
const sortedHistory = [...history]
|
||||
.sort((a, b) => b.create_time - a.create_time)
|
||||
.slice(0, toValue(maxHistoryItems))
|
||||
|
||||
// Reuse existing TaskItemImpl instances or create new
|
||||
// Must recreate if outputs_count changed (e.g., API started returning it)
|
||||
const existingByPromptId = new Map(
|
||||
currentHistory.map((impl) => [impl.promptId, impl])
|
||||
)
|
||||
|
||||
historyTasks.value = items.map(
|
||||
(item) =>
|
||||
existingByPromptId.get(item.prompt[1]) ?? toTaskItemImpls([item])[0]
|
||||
)
|
||||
historyTasks.value = sortedHistory.map((job) => {
|
||||
const existing = existingByPromptId.get(job.id)
|
||||
if (!existing) return new TaskItemImpl(job)
|
||||
// Recreate if outputs_count changed to ensure lazy loading works
|
||||
if (existing.outputsCount !== (job.outputs_count ?? undefined)) {
|
||||
return new TaskItemImpl(job)
|
||||
}
|
||||
return existing
|
||||
})
|
||||
} finally {
|
||||
isLoading.value = false
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user