refactor: encapsulate error extraction in TaskItemImpl getters (#7650)

## Summary
- Add `errorMessage` and `executionError` getters to `TaskItemImpl` that
extract error info from status messages
- Update `useJobErrorReporting` composable to use these getters instead
of standalone function
- Remove the standalone `extractExecutionError` function

This encapsulates error extraction within `TaskItemImpl`, preparing for
the Jobs API migration where the underlying data format will change but
the getter interface will remain stable.

## Test plan
- [x] All existing tests pass
- [x] New tests added for `TaskItemImpl.errorMessage` and
`TaskItemImpl.executionError` getters
- [x] TypeScript, lint, and knip checks pass

🤖 Generated with [Claude Code](https://claude.com/claude-code)

┆Issue is synchronized with this [Notion
page](https://www.notion.so/PR-7650-refactor-encapsulate-error-extraction-in-TaskItemImpl-getters-2ce6d73d365081caae33dcc7e1e07720)
by [Unito](https://www.unito.io)

---------

Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: Christian Byrne <cbyrne@comfy.org>
This commit is contained in:
ric-yu
2026-01-15 20:11:22 -08:00
committed by GitHub
parent 089295606a
commit c0a649ef43
46 changed files with 1650 additions and 3067 deletions

View File

@@ -2,34 +2,27 @@ import _ from 'es-toolkit/compat'
import { defineStore } from 'pinia'
import { computed, ref, shallowRef, toRaw, toValue } from 'vue'
import { isCloud } from '@/platform/distribution/types'
import { reconcileHistory } from '@/platform/remote/comfyui/history/reconciliation'
import { useSettingStore } from '@/platform/settings/settingStore'
import { getWorkflowFromHistory } from '@/platform/workflow/cloud'
import { extractWorkflow } from '@/platform/remote/comfyui/jobs/fetchJobs'
import type {
ComfyWorkflowJSON,
NodeId
} from '@/platform/workflow/validation/schemas/workflowSchema'
APITaskType,
JobListItem,
TaskType
} from '@/platform/remote/comfyui/jobs/jobTypes'
import type { NodeId } from '@/platform/workflow/validation/schemas/workflowSchema'
import type {
HistoryTaskItem,
ResultItem,
StatusWsMessageStatus,
TaskItem,
TaskOutput,
TaskPrompt,
TaskStatus,
TaskType
TaskOutput
} from '@/schemas/apiSchema'
import { api } from '@/scripts/api'
import type { ComfyApp } from '@/scripts/app'
import { useExtensionService } from '@/services/extensionService'
import { getJobDetail } from '@/services/jobOutputCache'
import { useNodeOutputStore } from '@/stores/imagePreviewStore'
import { useExecutionStore } from '@/stores/executionStore'
import { useSettingStore } from '@/platform/settings/settingStore'
import { getMediaTypeFromFilename } from '@/utils/formatUtil'
// Task type used in the API.
type APITaskType = 'queue' | 'history'
enum TaskItemDisplayStatus {
Running = 'Running',
Pending = 'Pending',
@@ -212,32 +205,44 @@ export class ResultItemImpl {
get supportsPreview(): boolean {
return this.isImage || this.isVideo || this.isAudio || this.is3D
}
static filterPreviewable(
outputs: readonly ResultItemImpl[]
): ResultItemImpl[] {
return outputs.filter((o) => o.supportsPreview)
}
static findByUrl(items: readonly ResultItemImpl[], url?: string): number {
if (!url) return 0
const idx = items.findIndex((o) => o.url === url)
return idx >= 0 ? idx : 0
}
}
export class TaskItemImpl {
readonly taskType: TaskType
readonly prompt: TaskPrompt
readonly status?: TaskStatus
readonly job: JobListItem
readonly outputs: TaskOutput
readonly flatOutputs: ReadonlyArray<ResultItemImpl>
constructor(
taskType: TaskType,
prompt: TaskPrompt,
status?: TaskStatus,
job: JobListItem,
outputs?: TaskOutput,
flatOutputs?: ReadonlyArray<ResultItemImpl>
) {
this.taskType = taskType
this.prompt = prompt
this.status = status
this.job = job
// If no outputs provided but job has preview_output, create synthetic outputs
// using the real nodeId and mediaType from the backend response
const effectiveOutputs =
outputs ??
(job.preview_output
? {
[job.preview_output.nodeId]: {
[job.preview_output.mediaType]: [job.preview_output]
}
}
: {})
// Remove animated outputs from the outputs object
// outputs.animated is an array of boolean values that indicates if the images
// array in the result are animated or not.
// The queueStore does not use this information.
// It is part of the legacy API response. We should redesign the backend API.
// https://github.com/Comfy-Org/ComfyUI_frontend/issues/2739
this.outputs = _.mapValues(outputs ?? {}, (nodeOutputs) =>
this.outputs = _.mapValues(effectiveOutputs, (nodeOutputs) =>
_.omit(nodeOutputs, 'animated')
)
this.flatOutputs = flatOutputs ?? this.calculateFlatOutputs()
@@ -261,15 +266,31 @@ export class TaskItemImpl {
)
}
/** All outputs that support preview (images, videos, audio, 3D) */
get previewableOutputs(): readonly ResultItemImpl[] {
return ResultItemImpl.filterPreviewable(this.flatOutputs)
}
get previewOutput(): ResultItemImpl | undefined {
const previewable = this.previewableOutputs
// Prefer saved media files over the temp previews
return (
this.flatOutputs.find(
// Prefer saved media files over the temp previews
(output) => output.type === 'output' && output.supportsPreview
) ?? this.flatOutputs.find((output) => output.supportsPreview)
previewable.find((output) => output.type === 'output') ?? previewable[0]
)
}
// Derive taskType from job status
get taskType(): TaskType {
switch (this.job.status) {
case 'in_progress':
return 'Running'
case 'pending':
return 'Pending'
default:
return 'History'
}
}
get apiTaskType(): APITaskType {
switch (this.taskType) {
case 'Running':
@@ -285,61 +306,42 @@ export class TaskItemImpl {
}
get queueIndex() {
return this.prompt[0]
return this.job.priority
}
get promptId() {
return this.prompt[1]
return this.job.id
}
get promptInputs() {
return this.prompt[2]
get outputsCount(): number | undefined {
return this.job.outputs_count ?? undefined
}
get extraData() {
return this.prompt[3]
get status() {
return this.job.status
}
get outputsToExecute() {
return this.prompt[4]
get errorMessage(): string | undefined {
return this.job.execution_error?.exception_message ?? undefined
}
get extraPngInfo() {
return this.extraData.extra_pnginfo
get executionError() {
return this.job.execution_error ?? undefined
}
get clientId() {
return this.extraData.client_id
get workflowId(): string | undefined {
return this.job.workflow_id ?? undefined
}
get workflow(): ComfyWorkflowJSON | undefined {
return this.extraPngInfo?.workflow
get createTime(): number {
return this.job.create_time
}
get messages() {
return this.status?.messages || []
}
/**
* Server-provided creation time in milliseconds, when available.
*
* Sources:
* - Queue: 5th tuple element may be a metadata object with { create_time }.
* - History (Cloud V2): Adapter injects create_time into prompt[3].extra_data.
*/
get createTime(): number | undefined {
const extra = (this.extraData as any) || {}
const fromExtra =
typeof extra.create_time === 'number' ? extra.create_time : undefined
if (typeof fromExtra === 'number') return fromExtra
return undefined
}
get interrupted() {
return _.some(
this.messages,
(message) => message[0] === 'execution_interrupted'
get interrupted(): boolean {
return (
this.job.status === 'failed' &&
this.job.execution_error?.exception_type ===
'InterruptProcessingException'
)
}
@@ -352,42 +354,26 @@ export class TaskItemImpl {
}
get displayStatus(): TaskItemDisplayStatus {
switch (this.taskType) {
case 'Running':
switch (this.job.status) {
case 'in_progress':
return TaskItemDisplayStatus.Running
case 'Pending':
case 'pending':
return TaskItemDisplayStatus.Pending
case 'History':
if (this.interrupted) return TaskItemDisplayStatus.Cancelled
switch (this.status!.status_str) {
case 'success':
return TaskItemDisplayStatus.Completed
case 'error':
return TaskItemDisplayStatus.Failed
}
case 'completed':
return TaskItemDisplayStatus.Completed
case 'failed':
return TaskItemDisplayStatus.Failed
case 'cancelled':
return TaskItemDisplayStatus.Cancelled
}
}
get executionStartTimestamp() {
const message = this.messages.find(
(message) => message[0] === 'execution_start'
)
return message ? message[1].timestamp : undefined
return this.job.execution_start_time ?? undefined
}
get executionEndTimestamp() {
const messages = this.messages.filter((message) =>
[
'execution_success',
'execution_interrupted',
'execution_error'
].includes(message[0])
)
if (!messages.length) {
return undefined
}
return _.max(messages.map((message) => message[1].timestamp))
return this.job.execution_end_time ?? undefined
}
get executionTime() {
@@ -403,28 +389,48 @@ export class TaskItemImpl {
: undefined
}
public async loadWorkflow(app: ComfyApp) {
let workflowData = this.workflow
/**
* Loads full outputs for tasks that only have preview data
* Returns a new TaskItemImpl with full outputs and execution status
*/
public async loadFullOutputs(): Promise<TaskItemImpl> {
// Only load for history tasks (caller checks outputsCount > 1)
if (!this.isHistory) {
return this
}
const jobDetail = await getJobDetail(this.promptId)
if (isCloud && !workflowData && this.isHistory) {
workflowData = await getWorkflowFromHistory(
(url) => app.api.fetchApi(url),
this.promptId
)
if (!jobDetail?.outputs) {
return this
}
// Create new TaskItemImpl with full outputs
return new TaskItemImpl(this.job, jobDetail.outputs)
}
public async loadWorkflow(app: ComfyApp) {
if (!this.isHistory) {
return
}
// Single fetch for both workflow and outputs (with caching)
const jobDetail = await getJobDetail(this.promptId)
const workflowData = await extractWorkflow(jobDetail)
if (!workflowData) {
return
}
await app.loadGraphData(toRaw(workflowData))
if (!this.outputs) {
// Use full outputs from job detail, or fall back to existing outputs
const outputsToLoad = jobDetail?.outputs ?? this.outputs
if (!outputsToLoad) {
return
}
const nodeOutputsStore = useNodeOutputStore()
const rawOutputs = toRaw(this.outputs)
const rawOutputs = toRaw(outputsToLoad)
for (const nodeExecutionId in rawOutputs) {
nodeOutputsStore.setNodeOutputsByExecutionId(
nodeExecutionId,
@@ -445,15 +451,10 @@ export class TaskItemImpl {
return this.flatOutputs.map(
(output: ResultItemImpl, i: number) =>
new TaskItemImpl(
this.taskType,
[
this.queueIndex,
`${this.promptId}-${i}`,
this.promptInputs,
this.extraData,
this.outputsToExecute
],
this.status,
{
...this.job,
id: `${this.promptId}-${i}`
},
{
[output.nodeId]: {
[output.mediaType]: [output]
@@ -463,32 +464,8 @@ export class TaskItemImpl {
)
)
}
public toTaskItem(): TaskItem {
const item: HistoryTaskItem = {
taskType: 'History',
prompt: this.prompt,
status: this.status!,
outputs: this.outputs
}
return item
}
}
const sortNewestFirst = (a: TaskItemImpl, b: TaskItemImpl) =>
b.queueIndex - a.queueIndex
const toTaskItemImpls = (tasks: TaskItem[]): TaskItemImpl[] =>
tasks.map(
(task) =>
new TaskItemImpl(
task.taskType,
task.prompt,
'status' in task ? task.status : undefined,
'outputs' in task ? task.outputs : undefined
)
)
export const useQueueStore = defineStore('queue', () => {
// Use shallowRef because TaskItemImpl instances are immutable and arrays are
// replaced entirely (not mutated), so deep reactivity would waste performance
@@ -525,8 +502,9 @@ export const useQueueStore = defineStore('queue', () => {
api.getHistory(maxHistoryItems.value)
])
runningTasks.value = toTaskItemImpls(queue.Running).sort(sortNewestFirst)
pendingTasks.value = toTaskItemImpls(queue.Pending).sort(sortNewestFirst)
// API returns pre-sorted data (sort_by=create_time&order=desc)
runningTasks.value = queue.Running.map((job) => new TaskItemImpl(job))
pendingTasks.value = queue.Pending.map((job) => new TaskItemImpl(job))
const currentHistory = toValue(historyTasks)
@@ -534,7 +512,7 @@ export const useQueueStore = defineStore('queue', () => {
const executionStore = useExecutionStore()
appearedTasks.forEach((task) => {
const promptIdString = String(task.promptId)
const workflowId = task.workflow?.id
const workflowId = task.workflowId
if (workflowId && promptIdString) {
executionStore.registerPromptWorkflowIdMapping(
promptIdString,
@@ -543,22 +521,26 @@ export const useQueueStore = defineStore('queue', () => {
}
})
const items = reconcileHistory(
history.History,
currentHistory.map((impl) => impl.toTaskItem()),
toValue(maxHistoryItems),
toValue(lastHistoryQueueIndex)
)
// Sort by create_time descending and limit to maxItems
const sortedHistory = [...history]
.sort((a, b) => b.create_time - a.create_time)
.slice(0, toValue(maxHistoryItems))
// Reuse existing TaskItemImpl instances or create new
// Must recreate if outputs_count changed (e.g., API started returning it)
const existingByPromptId = new Map(
currentHistory.map((impl) => [impl.promptId, impl])
)
historyTasks.value = items.map(
(item) =>
existingByPromptId.get(item.prompt[1]) ?? toTaskItemImpls([item])[0]
)
historyTasks.value = sortedHistory.map((job) => {
const existing = existingByPromptId.get(job.id)
if (!existing) return new TaskItemImpl(job)
// Recreate if outputs_count changed to ensure lazy loading works
if (existing.outputsCount !== (job.outputs_count ?? undefined)) {
return new TaskItemImpl(job)
}
return existing
})
} finally {
isLoading.value = false
}