feat: Add Jobs API infrastructure (PR 1 of 3) (#7169)

## Summary

Add Jobs API infrastructure in preparation for migrating from legacy
`/history`, `/history_v2`, and `/queue` endpoints to the unified `/jobs`
API.

**This is PR 1 of 3** - Additive changes only, no breaking changes.

## Changes

- **What**:
- Add Zod schemas for runtime validation of Jobs API responses
(`JobListItem`, `JobDetail`)
- Add `fetchQueue`, `fetchHistory`, `fetchJobDetail` fetchers for
`/jobs` endpoint
- Add `extractWorkflow` utility for extracting workflow from nested job
detail response
- Add synthetic priority assignment for queue ordering (pending >
running > history)
  - Add comprehensive tests for all new fetchers

- **Non-breaking**: All changes are additive - existing code continues
to work

## Review Focus

1. **Zod schema flexibility**: Using `.passthrough()` to allow extra API
fields - ensures forward compatibility but less strict validation
2. **Priority computation**: Synthetic priority ensures display order:
pending (queued) → running → completed (history)
3. **Test coverage**: Verify tests adequately cover edge cases

## Files Added

- `src/platform/remote/comfyui/jobs/` - New Jobs API module
  - `types/jobTypes.ts` - Zod schemas and TypeScript types
  - `fetchers/fetchJobs.ts` - API fetchers with validation
  - `index.ts` - Barrel exports
-
`tests-ui/tests/platform/remote/comfyui/jobs/fetchers/fetchJobs.test.ts`
- Tests

## Next PRs

- **PR 2**: Migrate `getQueue()` and `getHistory()` to use Jobs API
- **PR 3**: Remove legacy history code and unused types

┆Issue is synchronized with this [Notion
page](https://www.notion.so/PR-7169-feat-Add-Jobs-API-infrastructure-PR-1-of-3-2bf6d73d3650812eae4ac0555a86969c)
by [Unito](https://www.unito.io)

---------

Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
ric-yu
2025-12-11 14:17:36 -08:00
committed by GitHub
parent bf8d9de1c1
commit c83f3ff1a7
3 changed files with 544 additions and 0 deletions

View File

@@ -0,0 +1,146 @@
/**
* @fileoverview Jobs API Fetchers
* @module platform/remote/comfyui/jobs/fetchJobs
*
* Unified jobs API fetcher for history, queue, and job details.
* All distributions use the /jobs endpoint.
*/
import type { PromptId } from '@/schemas/apiSchema'
import type {
JobDetail,
JobListItem,
JobStatus,
RawJobListItem
} from './jobTypes'
import { zJobDetail, zJobsListResponse, zWorkflowContainer } from './jobTypes'
interface FetchJobsRawResult {
jobs: RawJobListItem[]
total: number
offset: number
}
/**
* Fetches raw jobs from /jobs endpoint
* @internal
*/
async function fetchJobsRaw(
fetchApi: (url: string) => Promise<Response>,
statuses: JobStatus[],
maxItems: number = 200,
offset: number = 0
): Promise<FetchJobsRawResult> {
const statusParam = statuses.join(',')
const url = `/jobs?status=${statusParam}&limit=${maxItems}&offset=${offset}`
try {
const res = await fetchApi(url)
if (!res.ok) {
console.error(`[Jobs API] Failed to fetch jobs: ${res.status}`)
return { jobs: [], total: 0, offset: 0 }
}
const data = zJobsListResponse.parse(await res.json())
return { jobs: data.jobs, total: data.pagination.total, offset }
} catch (error) {
console.error('[Jobs API] Error fetching jobs:', error)
return { jobs: [], total: 0, offset: 0 }
}
}
// Large offset to ensure running/pending jobs sort above history
const QUEUE_PRIORITY_BASE = 1_000_000
/**
* Assigns synthetic priority to jobs.
* Only assigns if job doesn't already have a server-provided priority.
*/
function assignPriority(
jobs: RawJobListItem[],
basePriority: number
): JobListItem[] {
return jobs.map((job, index) => ({
...job,
priority: job.priority ?? basePriority - index
}))
}
/**
* Fetches history (completed jobs)
* Assigns synthetic priority starting from total (lower than queue jobs).
*/
export async function fetchHistory(
fetchApi: (url: string) => Promise<Response>,
maxItems: number = 200,
offset: number = 0
): Promise<JobListItem[]> {
const { jobs, total } = await fetchJobsRaw(
fetchApi,
['completed'],
maxItems,
offset
)
// History gets priority based on total count (lower than queue)
return assignPriority(jobs, total - offset)
}
/**
* Fetches queue (in_progress + pending jobs)
* Pending jobs get highest priority, then running jobs.
*/
export async function fetchQueue(
fetchApi: (url: string) => Promise<Response>
): Promise<{ Running: JobListItem[]; Pending: JobListItem[] }> {
const { jobs } = await fetchJobsRaw(
fetchApi,
['in_progress', 'pending'],
200,
0
)
const running = jobs.filter((j) => j.status === 'in_progress')
const pending = jobs.filter((j) => j.status === 'pending')
// Pending gets highest priority, then running
// Both are above any history job due to QUEUE_PRIORITY_BASE
return {
Running: assignPriority(running, QUEUE_PRIORITY_BASE + running.length),
Pending: assignPriority(
pending,
QUEUE_PRIORITY_BASE + running.length + pending.length
)
}
}
/**
* Fetches full job details from /jobs/{job_id}
*/
export async function fetchJobDetail(
fetchApi: (url: string) => Promise<Response>,
promptId: PromptId
): Promise<JobDetail | undefined> {
try {
const res = await fetchApi(`/jobs/${encodeURIComponent(promptId)}`)
if (!res.ok) {
console.warn(`Job not found for prompt ${promptId}`)
return undefined
}
return zJobDetail.parse(await res.json())
} catch (error) {
console.error(`Failed to fetch job detail for prompt ${promptId}:`, error)
return undefined
}
}
/**
* Extracts workflow from job detail response.
* The workflow is nested at: workflow.extra_data.extra_pnginfo.workflow
* Full workflow validation happens downstream via validateComfyWorkflow.
*/
export function extractWorkflow(job: JobDetail | undefined): unknown {
const parsed = zWorkflowContainer.safeParse(job?.workflow)
if (!parsed.success) return undefined
return parsed.data.extra_data?.extra_pnginfo?.workflow
}

View File

@@ -0,0 +1,107 @@
/**
* @fileoverview Jobs API types - Backend job API format
* @module platform/remote/comfyui/jobs/jobTypes
*
* These types represent the jobs API format returned by the backend.
* Jobs API provides a memory-optimized alternative to history API.
*/
import { z } from 'zod'
import { resultItemType, zTaskOutput } from '@/schemas/apiSchema'
const zJobStatus = z.enum([
'pending',
'in_progress',
'completed',
'failed',
'cancelled'
])
const zPreviewOutput = z.object({
filename: z.string(),
subfolder: z.string(),
type: resultItemType
})
/**
* Execution error details for error jobs.
* Contains the same structure as ExecutionErrorWsMessage from WebSocket.
*/
const zExecutionError = z
.object({
prompt_id: z.string().optional(),
timestamp: z.number().optional(),
node_id: z.string(),
node_type: z.string(),
executed: z.array(z.string()).optional(),
exception_message: z.string(),
exception_type: z.string(),
traceback: z.array(z.string()),
current_inputs: z.unknown(),
current_outputs: z.unknown()
})
.passthrough()
/**
* Raw job from API - uses passthrough to allow extra fields
*/
const zRawJobListItem = z
.object({
id: z.string(),
status: zJobStatus,
create_time: z.number(),
execution_start_time: z.number().nullable().optional(),
execution_end_time: z.number().nullable().optional(),
preview_output: zPreviewOutput.nullable().optional(),
outputs_count: z.number().nullable().optional(),
execution_error: zExecutionError.nullable().optional(),
workflow_id: z.string().nullable().optional(),
priority: z.number().optional()
})
.passthrough()
/**
* Job detail - returned by GET /api/jobs/{job_id} (detail endpoint)
* Includes full workflow and outputs for re-execution and downloads
*/
export const zJobDetail = zRawJobListItem
.extend({
workflow: z.unknown().optional(),
outputs: zTaskOutput.optional(),
update_time: z.number().optional(),
execution_status: z.unknown().optional(),
execution_meta: z.unknown().optional()
})
.passthrough()
const zPaginationInfo = z.object({
offset: z.number(),
limit: z.number(),
total: z.number(),
has_more: z.boolean()
})
export const zJobsListResponse = z.object({
jobs: z.array(zRawJobListItem),
pagination: zPaginationInfo
})
/** Schema for workflow container structure in job detail responses */
export const zWorkflowContainer = z.object({
extra_data: z
.object({
extra_pnginfo: z
.object({
workflow: z.unknown()
})
.optional()
})
.optional()
})
export type JobStatus = z.infer<typeof zJobStatus>
export type RawJobListItem = z.infer<typeof zRawJobListItem>
/** Job list item with priority always set (server-provided or synthetic) */
export type JobListItem = RawJobListItem & { priority: number }
export type JobDetail = z.infer<typeof zJobDetail>

View File

@@ -0,0 +1,291 @@
import { describe, expect, it, vi } from 'vitest'
import {
extractWorkflow,
fetchHistory,
fetchJobDetail,
fetchQueue
} from '@/platform/remote/comfyui/jobs/fetchJobs'
import type {
RawJobListItem,
zJobsListResponse
} from '@/platform/remote/comfyui/jobs/jobTypes'
import type { z } from 'zod'
type JobsListResponse = z.infer<typeof zJobsListResponse>
function createMockJob(
id: string,
status: 'pending' | 'in_progress' | 'completed' = 'completed',
overrides: Partial<RawJobListItem> = {}
): RawJobListItem {
return {
id,
status,
create_time: Date.now(),
execution_start_time: null,
execution_end_time: null,
preview_output: null,
outputs_count: 0,
...overrides
}
}
function createMockResponse(
jobs: RawJobListItem[],
total: number = jobs.length
): JobsListResponse {
return {
jobs,
pagination: {
offset: 0,
limit: 200,
total,
has_more: false
}
}
}
describe('fetchJobs', () => {
describe('fetchHistory', () => {
it('fetches completed jobs', async () => {
const mockFetch = vi.fn().mockResolvedValue({
ok: true,
json: () =>
Promise.resolve(
createMockResponse([
createMockJob('job1', 'completed'),
createMockJob('job2', 'completed')
])
)
})
const result = await fetchHistory(mockFetch)
expect(mockFetch).toHaveBeenCalledWith(
'/jobs?status=completed&limit=200&offset=0'
)
expect(result).toHaveLength(2)
expect(result[0].id).toBe('job1')
expect(result[1].id).toBe('job2')
})
it('assigns synthetic priorities', async () => {
const mockFetch = vi.fn().mockResolvedValue({
ok: true,
json: () =>
Promise.resolve(
createMockResponse(
[
createMockJob('job1', 'completed'),
createMockJob('job2', 'completed'),
createMockJob('job3', 'completed')
],
3
)
)
})
const result = await fetchHistory(mockFetch)
// Priority should be assigned from total down
expect(result[0].priority).toBe(3) // total - 0 - 0
expect(result[1].priority).toBe(2) // total - 0 - 1
expect(result[2].priority).toBe(1) // total - 0 - 2
})
it('calculates priority correctly with non-zero offset', async () => {
const mockFetch = vi.fn().mockResolvedValue({
ok: true,
json: () =>
Promise.resolve(
createMockResponse(
[
createMockJob('job4', 'completed'),
createMockJob('job5', 'completed')
],
10 // total of 10 jobs
)
)
})
// Fetch page 2 (offset=5)
const result = await fetchHistory(mockFetch, 200, 5)
expect(mockFetch).toHaveBeenCalledWith(
'/jobs?status=completed&limit=200&offset=5'
)
// Priority base is total - offset = 10 - 5 = 5
expect(result[0].priority).toBe(5) // (total - offset) - 0
expect(result[1].priority).toBe(4) // (total - offset) - 1
})
it('preserves server-provided priority', async () => {
const mockFetch = vi.fn().mockResolvedValue({
ok: true,
json: () =>
Promise.resolve(
createMockResponse([
createMockJob('job1', 'completed', { priority: 999 })
])
)
})
const result = await fetchHistory(mockFetch)
expect(result[0].priority).toBe(999)
})
it('returns empty array on error', async () => {
const mockFetch = vi.fn().mockRejectedValue(new Error('Network error'))
const result = await fetchHistory(mockFetch)
expect(result).toEqual([])
})
it('returns empty array on non-ok response', async () => {
const mockFetch = vi.fn().mockResolvedValue({
ok: false,
status: 500
})
const result = await fetchHistory(mockFetch)
expect(result).toEqual([])
})
})
describe('fetchQueue', () => {
it('fetches running and pending jobs', async () => {
const mockFetch = vi.fn().mockResolvedValue({
ok: true,
json: () =>
Promise.resolve(
createMockResponse([
createMockJob('running1', 'in_progress'),
createMockJob('pending1', 'pending'),
createMockJob('pending2', 'pending')
])
)
})
const result = await fetchQueue(mockFetch)
expect(mockFetch).toHaveBeenCalledWith(
'/jobs?status=in_progress,pending&limit=200&offset=0'
)
expect(result.Running).toHaveLength(1)
expect(result.Pending).toHaveLength(2)
expect(result.Running[0].id).toBe('running1')
expect(result.Pending[0].id).toBe('pending1')
})
it('assigns queue priorities above history', async () => {
const mockFetch = vi.fn().mockResolvedValue({
ok: true,
json: () =>
Promise.resolve(
createMockResponse([
createMockJob('running1', 'in_progress'),
createMockJob('pending1', 'pending')
])
)
})
const result = await fetchQueue(mockFetch)
// Queue priorities should be above 1_000_000 (QUEUE_PRIORITY_BASE)
expect(result.Running[0].priority).toBeGreaterThan(1_000_000)
expect(result.Pending[0].priority).toBeGreaterThan(1_000_000)
// Pending should have higher priority than running
expect(result.Pending[0].priority).toBeGreaterThan(
result.Running[0].priority
)
})
it('returns empty arrays on error', async () => {
const mockFetch = vi.fn().mockRejectedValue(new Error('Network error'))
const result = await fetchQueue(mockFetch)
expect(result).toEqual({ Running: [], Pending: [] })
})
})
describe('fetchJobDetail', () => {
it('fetches job detail by id', async () => {
const jobDetail = {
...createMockJob('job1', 'completed'),
workflow: { extra_data: { extra_pnginfo: { workflow: {} } } },
outputs: {
'1': {
images: [{ filename: 'test.png', subfolder: '', type: 'output' }]
}
}
}
const mockFetch = vi.fn().mockResolvedValue({
ok: true,
json: () => Promise.resolve(jobDetail)
})
const result = await fetchJobDetail(mockFetch, 'job1')
expect(mockFetch).toHaveBeenCalledWith('/jobs/job1')
expect(result?.id).toBe('job1')
expect(result?.outputs).toBeDefined()
})
it('returns undefined for non-ok response', async () => {
const mockFetch = vi.fn().mockResolvedValue({
ok: false,
status: 404
})
const result = await fetchJobDetail(mockFetch, 'nonexistent')
expect(result).toBeUndefined()
})
it('returns undefined on error', async () => {
const mockFetch = vi.fn().mockRejectedValue(new Error('Network error'))
const result = await fetchJobDetail(mockFetch, 'job1')
expect(result).toBeUndefined()
})
})
describe('extractWorkflow', () => {
it('extracts workflow from nested structure', () => {
const jobDetail = {
...createMockJob('job1', 'completed'),
workflow: {
extra_data: {
extra_pnginfo: {
workflow: { nodes: [], links: [] }
}
}
}
}
const workflow = extractWorkflow(jobDetail)
expect(workflow).toEqual({ nodes: [], links: [] })
})
it('returns undefined if workflow not present', () => {
const jobDetail = createMockJob('job1', 'completed')
const workflow = extractWorkflow(jobDetail)
expect(workflow).toBeUndefined()
})
it('returns undefined for undefined input', () => {
const workflow = extractWorkflow(undefined)
expect(workflow).toBeUndefined()
})
})
})