[feat] Add Jobs API infrastructure (PR 1 of 3)

Adds Jobs API types, fetchers, and new API methods without breaking existing code.
This is the foundation for migrating from legacy /history and /queue endpoints
to the unified /jobs endpoint.

New files:
- src/platform/remote/comfyui/jobs/types/jobTypes.ts - Zod schemas for Jobs API
- src/platform/remote/comfyui/jobs/fetchers/fetchJobs.ts - Fetchers for /jobs endpoint
- src/platform/remote/comfyui/jobs/index.ts - Barrel exports
- tests-ui/tests/platform/remote/comfyui/jobs/fetchers/fetchJobs.test.ts

API additions (non-breaking):
- api.getQueueFromJobsApi() - Queue from /jobs endpoint
- api.getHistoryFromJobsApi() - History from /jobs endpoint
- api.getJobDetail() - Full job details including workflow and outputs

Part of Jobs API migration. See docs/JOBS_API_MIGRATION_PLAN.md for details.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Richard Yu
2025-12-04 14:45:01 -08:00
parent 431fe33ea3
commit 0a619ec060
4 changed files with 547 additions and 0 deletions

View File

@@ -0,0 +1,160 @@
/**
* @fileoverview Jobs API Fetchers
* @module platform/remote/comfyui/jobs/fetchers/fetchJobs
*
* Unified jobs API fetcher for history, queue, and job details.
* All distributions use the /jobs endpoint.
*/
import type { ComfyWorkflowJSON } from '@/platform/workflow/validation/schemas/workflowSchema'
import type { PromptId } from '@/schemas/apiSchema'
import type {
JobDetail,
JobListItem,
JobStatus,
RawJobListItem
} from '../types/jobTypes'
import { zJobDetail, zJobsListResponse } from '../types/jobTypes'
// ============================================================================
// Job List Fetchers
// ============================================================================
/** Result of one page fetch from the /jobs endpoint. */
interface FetchJobsRawResult {
  /** Raw jobs for the requested page (empty on any failure). */
  jobs: RawJobListItem[]
  /** Total matching jobs as reported by the server's pagination block. */
  total: number
  /** Offset the page was requested at. */
  offset: number
}
/**
 * Fetches one page of raw jobs from the /jobs endpoint.
 *
 * Never throws: network errors, non-2xx responses, and schema mismatches
 * are logged and collapsed into an empty result.
 *
 * @param fetchApi - Request function that resolves relative API URLs
 * @param statuses - Job statuses to include in the query
 * @param maxItems - Page size limit
 * @param offset - Pagination offset
 * @internal
 */
async function fetchJobsRaw(
  fetchApi: (url: string) => Promise<Response>,
  statuses: JobStatus[],
  maxItems: number = 200,
  offset: number = 0
): Promise<FetchJobsRawResult> {
  const statusParam = statuses.join(',')
  const url = `/jobs?status=${statusParam}&limit=${maxItems}&offset=${offset}`
  // Echo the requested offset even on failure so paginating callers see a
  // consistent value (previously the error paths hard-coded offset: 0).
  const emptyResult: FetchJobsRawResult = { jobs: [], total: 0, offset }
  try {
    const res = await fetchApi(url)
    if (!res.ok) {
      console.error(`[Jobs API] Failed to fetch jobs: ${res.status}`)
      return emptyResult
    }
    const data = zJobsListResponse.parse(await res.json())
    return { jobs: data.jobs, total: data.pagination.total, offset }
  } catch (error) {
    // Covers network failures and zod parse errors on malformed payloads.
    console.error('[Jobs API] Error fetching jobs:', error)
    return emptyResult
  }
}
// Large base added to queue priorities so running/pending jobs always sort
// above history entries, whose synthetic priorities are bounded by the total
// history count (see fetchHistory / fetchQueue below).
const QUEUE_PRIORITY_BASE = 1_000_000
/**
 * Attaches a priority to every job in the list.
 *
 * A server-provided priority always wins; otherwise a synthetic one is
 * derived from the list position, counting down from basePriority so that
 * earlier entries rank higher.
 */
function assignPriority(
  jobs: RawJobListItem[],
  basePriority: number
): JobListItem[] {
  const prioritized: JobListItem[] = []
  jobs.forEach((job, position) => {
    // Nullish-coalesce keeps an explicit server priority of 0 intact.
    const priority = job.priority ?? basePriority - position
    prioritized.push({ ...job, priority })
  })
  return prioritized
}
/**
 * Fetches history (completed jobs).
 *
 * Completed jobs receive synthetic priorities counting down from the
 * remaining total, keeping every history entry strictly below queue jobs
 * (which start at QUEUE_PRIORITY_BASE).
 */
export async function fetchHistory(
  fetchApi: (url: string) => Promise<Response>,
  maxItems: number = 200,
  offset: number = 0
): Promise<JobListItem[]> {
  const page = await fetchJobsRaw(fetchApi, ['completed'], maxItems, offset)
  // Base shrinks with offset so later pages continue the same descending
  // priority sequence started by page one.
  return assignPriority(page.jobs, page.total - offset)
}
/**
* Fetches queue (in_progress + pending jobs)
* Pending jobs get highest priority, then running jobs.
*/
export async function fetchQueue(
fetchApi: (url: string) => Promise<Response>
): Promise<{ Running: JobListItem[]; Pending: JobListItem[] }> {
const { jobs } = await fetchJobsRaw(
fetchApi,
['in_progress', 'pending'],
200,
0
)
const running = jobs.filter((j) => j.status === 'in_progress')
const pending = jobs.filter((j) => j.status === 'pending')
// Pending gets highest priority, then running
// Both are above any history job due to QUEUE_PRIORITY_BASE
return {
Running: assignPriority(running, QUEUE_PRIORITY_BASE + running.length),
Pending: assignPriority(
pending,
QUEUE_PRIORITY_BASE + running.length + pending.length
)
}
}
// ============================================================================
// Job Detail Fetcher
// ============================================================================
/**
 * Fetches full job details from /jobs/{job_id}.
 *
 * Returns undefined on any failure. A 404 is logged as a warning ("not
 * found"); other HTTP failures are logged as errors so real server
 * problems are not mislabeled as missing jobs.
 *
 * @param fetchApi - Request function that resolves relative API URLs
 * @param promptId - Prompt/job identifier to look up
 */
export async function fetchJobDetail(
  fetchApi: (url: string) => Promise<Response>,
  promptId: PromptId
): Promise<JobDetail | undefined> {
  try {
    // Encode the id in case it contains URL-reserved characters.
    const res = await fetchApi(`/jobs/${encodeURIComponent(promptId)}`)
    if (!res.ok) {
      if (res.status === 404) {
        console.warn(`Job not found for prompt ${promptId}`)
      } else {
        console.error(
          `Failed to fetch job detail for prompt ${promptId}: HTTP ${res.status}`
        )
      }
      return undefined
    }
    return zJobDetail.parse(await res.json())
  } catch (error) {
    // Network failure or schema mismatch on the response body.
    console.error(`Failed to fetch job detail for prompt ${promptId}:`, error)
    return undefined
  }
}
/**
 * Extracts the embedded workflow from a job detail response.
 *
 * The workflow JSON is nested at workflow.extra_data.extra_pnginfo.workflow;
 * returns undefined when the job or any level of the nesting is absent.
 */
export function extractWorkflow(
  job: JobDetail | undefined
): ComfyWorkflowJSON | undefined {
  if (job?.workflow == null) return undefined
  // Structural cast only — the payload is validated downstream by
  // loadGraphData -> validateComfyWorkflow.
  const raw = job.workflow as {
    extra_data?: { extra_pnginfo?: { workflow?: unknown } }
  }
  const nested = raw.extra_data?.extra_pnginfo?.workflow
  return nested as ComfyWorkflowJSON | undefined
}

View File

@@ -0,0 +1,13 @@
/**
 * @fileoverview Jobs API module
 * @module platform/remote/comfyui/jobs
 *
 * Unified jobs API for history, queue, and job details.
 * Barrel file: re-exports the public fetcher surface so consumers import
 * from the module root instead of reaching into internal paths.
 */
export {
  extractWorkflow,
  fetchHistory,
  fetchJobDetail,
  fetchQueue
} from './fetchers/fetchJobs'

View File

@@ -0,0 +1,114 @@
/**
* @fileoverview Jobs API types - Backend job API format
* @module platform/remote/comfyui/jobs/types/jobTypes
*
* These types represent the jobs API format returned by the backend.
* Jobs API provides a memory-optimized alternative to history API.
*/
import { z } from 'zod'
import { resultItemType, zTaskOutput } from '@/schemas/apiSchema'
// ============================================================================
// Zod Schemas
// ============================================================================
// Lifecycle states a job can report; mirrors the backend's status field.
const zJobStatus = z.enum([
  'pending',
  'in_progress',
  'completed',
  'failed',
  'cancelled'
])
// Lightweight output reference used for list-view previews.
const zPreviewOutput = z
  .object({
    filename: z.string(),
    subfolder: z.string(),
    type: resultItemType
  })
  .passthrough() // Allow extra fields like nodeId, mediaType
/**
 * Execution error details for error jobs.
 * Contains the same structure as ExecutionErrorWsMessage from WebSocket.
 */
const zExecutionError = z
  .object({
    prompt_id: z.string().optional(),
    timestamp: z.number().optional(),
    node_id: z.string(),
    node_type: z.string(),
    executed: z.array(z.string()).optional(),
    exception_message: z.string(),
    exception_type: z.string(),
    traceback: z.array(z.string()),
    // Shapes are backend-defined; deliberately left unvalidated.
    current_inputs: z.unknown(),
    current_outputs: z.unknown()
  })
  .passthrough()
/**
 * Raw job from API - uses passthrough to allow extra fields.
 * `priority` is optional here; the fetchers synthesize one when absent
 * (see JobListItem, where priority is always set).
 */
const zRawJobListItem = z
  .object({
    id: z.string(),
    status: zJobStatus,
    // NOTE(review): timestamp units (s vs ms) are backend-defined — confirm.
    create_time: z.number(),
    execution_start_time: z.number().nullable().optional(),
    execution_end_time: z.number().nullable().optional(),
    preview_output: zPreviewOutput.nullable().optional(),
    outputs_count: z.number().optional(),
    execution_error: zExecutionError.nullable().optional(),
    workflow_id: z.string().nullable().optional(),
    priority: z.number().optional()
  })
  .passthrough()
/**
 * Job detail - returned by GET /api/jobs/{job_id} (detail endpoint)
 * Includes full workflow and outputs for re-execution and downloads
 */
export const zJobDetail = zRawJobListItem
  .extend({
    // Raw workflow payload; validated downstream, not here.
    workflow: z.unknown().optional(),
    outputs: zTaskOutput.optional(),
    update_time: z.number().optional(),
    execution_status: z.unknown().optional(),
    execution_meta: z.unknown().optional()
  })
  .passthrough()
/**
 * Pagination info from API
 */
const zPaginationInfo = z
  .object({
    offset: z.number(),
    limit: z.number(),
    total: z.number(),
    has_more: z.boolean()
  })
  .passthrough()
/**
 * Jobs list response structure returned by GET /jobs.
 */
export const zJobsListResponse = z
  .object({
    jobs: z.array(zRawJobListItem),
    pagination: zPaginationInfo
  })
  .passthrough()
// ============================================================================
// TypeScript Types (derived from Zod schemas)
// ============================================================================
export type JobStatus = z.infer<typeof zJobStatus>
/** Job as delivered by the API; priority may be absent. */
export type RawJobListItem = z.infer<typeof zRawJobListItem>
/** Job list item with priority always set (server-provided or synthetic) */
export type JobListItem = RawJobListItem & { priority: number }
/** Full job record from the detail endpoint (workflow + outputs included). */
export type JobDetail = z.infer<typeof zJobDetail>

View File

@@ -0,0 +1,260 @@
import { describe, expect, it, vi } from 'vitest'
import {
extractWorkflow,
fetchHistory,
fetchJobDetail,
fetchQueue
} from '@/platform/remote/comfyui/jobs'
// Builds a minimal job fixture; overrides let individual tests add or
// replace fields (e.g. a server-provided priority).
function createMockJob(
  id: string,
  status: 'pending' | 'in_progress' | 'completed' = 'completed',
  overrides: Record<string, unknown> = {}
) {
  const baseJob = {
    id,
    status,
    create_time: Date.now(),
    execution_start_time: null,
    execution_end_time: null,
    preview_output: null,
    outputs_count: 0
  }
  return { ...baseJob, ...overrides }
}
// Wraps job fixtures in the /jobs list-response envelope; total defaults to
// the number of jobs supplied.
function createMockResponse(
  jobs: ReturnType<typeof createMockJob>[],
  total: number = jobs.length
) {
  const pagination = {
    offset: 0,
    limit: 200,
    total,
    has_more: false
  }
  return { jobs, pagination }
}
// Unit tests for the /jobs fetchers. fetchApi is replaced with vi.fn()
// mocks, so each case pins the exact URL built by a fetcher and the shape
// of the transformed result — no real network involved.
describe('fetchJobs', () => {
  describe('fetchHistory', () => {
    // Verifies the query string (status/limit/offset defaults) and that job
    // ids pass through in order.
    it('fetches completed jobs', async () => {
      const mockFetch = vi.fn().mockResolvedValue({
        ok: true,
        json: () =>
          Promise.resolve(
            createMockResponse([
              createMockJob('job1', 'completed'),
              createMockJob('job2', 'completed')
            ])
          )
      })
      const result = await fetchHistory(mockFetch)
      expect(mockFetch).toHaveBeenCalledWith(
        '/jobs?status=completed&limit=200&offset=0'
      )
      expect(result).toHaveLength(2)
      expect(result[0].id).toBe('job1')
      expect(result[1].id).toBe('job2')
    })
    // Jobs without a server priority get synthetic ones descending from total.
    it('assigns synthetic priorities', async () => {
      const mockFetch = vi.fn().mockResolvedValue({
        ok: true,
        json: () =>
          Promise.resolve(
            createMockResponse(
              [
                createMockJob('job1', 'completed'),
                createMockJob('job2', 'completed'),
                createMockJob('job3', 'completed')
              ],
              3
            )
          )
      })
      const result = await fetchHistory(mockFetch)
      // Priority should be assigned from total down
      expect(result[0].priority).toBe(3) // total - 0 - 0
      expect(result[1].priority).toBe(2) // total - 0 - 1
      expect(result[2].priority).toBe(1) // total - 0 - 2
    })
    // A backend-supplied priority must never be overwritten by a synthetic one.
    it('preserves server-provided priority', async () => {
      const mockFetch = vi.fn().mockResolvedValue({
        ok: true,
        json: () =>
          Promise.resolve(
            createMockResponse([
              createMockJob('job1', 'completed', { priority: 999 })
            ])
          )
      })
      const result = await fetchHistory(mockFetch)
      expect(result[0].priority).toBe(999)
    })
    // Network failures are swallowed and surfaced as an empty history.
    it('returns empty array on error', async () => {
      const mockFetch = vi.fn().mockRejectedValue(new Error('Network error'))
      const result = await fetchHistory(mockFetch)
      expect(result).toEqual([])
    })
    // HTTP-level failures (non-2xx) likewise yield an empty history.
    it('returns empty array on non-ok response', async () => {
      const mockFetch = vi.fn().mockResolvedValue({
        ok: false,
        status: 500
      })
      const result = await fetchHistory(mockFetch)
      expect(result).toEqual([])
    })
  })
  describe('fetchQueue', () => {
    // One request covers both statuses; results are split into Running/Pending.
    it('fetches running and pending jobs', async () => {
      const mockFetch = vi.fn().mockResolvedValue({
        ok: true,
        json: () =>
          Promise.resolve(
            createMockResponse([
              createMockJob('running1', 'in_progress'),
              createMockJob('pending1', 'pending'),
              createMockJob('pending2', 'pending')
            ])
          )
      })
      const result = await fetchQueue(mockFetch)
      expect(mockFetch).toHaveBeenCalledWith(
        '/jobs?status=in_progress,pending&limit=200&offset=0'
      )
      expect(result.Running).toHaveLength(1)
      expect(result.Pending).toHaveLength(2)
      expect(result.Running[0].id).toBe('running1')
      expect(result.Pending[0].id).toBe('pending1')
    })
    // Queue tiers must outrank history, and pending must outrank running.
    it('assigns queue priorities above history', async () => {
      const mockFetch = vi.fn().mockResolvedValue({
        ok: true,
        json: () =>
          Promise.resolve(
            createMockResponse([
              createMockJob('running1', 'in_progress'),
              createMockJob('pending1', 'pending')
            ])
          )
      })
      const result = await fetchQueue(mockFetch)
      // Queue priorities should be above 1_000_000 (QUEUE_PRIORITY_BASE)
      expect(result.Running[0].priority).toBeGreaterThan(1_000_000)
      expect(result.Pending[0].priority).toBeGreaterThan(1_000_000)
      // Pending should have higher priority than running
      expect(result.Pending[0].priority).toBeGreaterThan(
        result.Running[0].priority
      )
    })
    // Failures collapse to an empty queue rather than throwing.
    it('returns empty arrays on error', async () => {
      const mockFetch = vi.fn().mockRejectedValue(new Error('Network error'))
      const result = await fetchQueue(mockFetch)
      expect(result).toEqual({ Running: [], Pending: [] })
    })
  })
  describe('fetchJobDetail', () => {
    // Happy path: detail endpoint URL plus workflow/outputs on the result.
    it('fetches job detail by id', async () => {
      const jobDetail = {
        ...createMockJob('job1', 'completed'),
        workflow: { extra_data: { extra_pnginfo: { workflow: {} } } },
        outputs: {
          '1': {
            images: [{ filename: 'test.png', subfolder: '', type: 'output' }]
          }
        }
      }
      const mockFetch = vi.fn().mockResolvedValue({
        ok: true,
        json: () => Promise.resolve(jobDetail)
      })
      const result = await fetchJobDetail(mockFetch, 'job1')
      expect(mockFetch).toHaveBeenCalledWith('/jobs/job1')
      expect(result?.id).toBe('job1')
      expect(result?.outputs).toBeDefined()
    })
    // Missing jobs (404) resolve to undefined instead of throwing.
    it('returns undefined for non-ok response', async () => {
      const mockFetch = vi.fn().mockResolvedValue({
        ok: false,
        status: 404
      })
      const result = await fetchJobDetail(mockFetch, 'nonexistent')
      expect(result).toBeUndefined()
    })
    // Network failures also resolve to undefined.
    it('returns undefined on error', async () => {
      const mockFetch = vi.fn().mockRejectedValue(new Error('Network error'))
      const result = await fetchJobDetail(mockFetch, 'job1')
      expect(result).toBeUndefined()
    })
  })
  describe('extractWorkflow', () => {
    // Happy path: workflow nested at workflow.extra_data.extra_pnginfo.workflow.
    it('extracts workflow from nested structure', () => {
      const jobDetail = {
        ...createMockJob('job1', 'completed'),
        workflow: {
          extra_data: {
            extra_pnginfo: {
              workflow: { nodes: [], links: [] }
            }
          }
        }
      }
      const workflow = extractWorkflow(jobDetail)
      expect(workflow).toEqual({ nodes: [], links: [] })
    })
    // Jobs without an embedded workflow yield undefined.
    it('returns undefined if workflow not present', () => {
      const jobDetail = createMockJob('job1', 'completed')
      const workflow = extractWorkflow(jobDetail)
      expect(workflow).toBeUndefined()
    })
    // Guard against missing input entirely.
    it('returns undefined for undefined input', () => {
      const workflow = extractWorkflow(undefined)
      expect(workflow).toBeUndefined()
    })
  })
})