mirror of
https://github.com/Comfy-Org/ComfyUI_frontend.git
synced 2026-05-24 14:45:36 +00:00
Compare commits
6 Commits
glary/hide
...
glary/scop
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1d14f1313c | ||
|
|
6a303494d4 | ||
|
|
11ef8c9337 | ||
|
|
60828f2286 | ||
|
|
3c3b0872c3 | ||
|
|
7b2b974968 |
@@ -17,7 +17,6 @@
|
||||
"test:visual:update": "playwright test --project visual --update-snapshots",
|
||||
"ashby:refresh-snapshot": "tsx ./scripts/refresh-ashby-snapshot.ts",
|
||||
"cloud-nodes:refresh-snapshot": "tsx ./scripts/refresh-cloud-nodes-snapshot.ts",
|
||||
"feature-flags:refresh-snapshot": "tsx ./scripts/refresh-feature-flags-snapshot.ts",
|
||||
"generate:models": "tsx ./scripts/generate-models.ts"
|
||||
},
|
||||
"dependencies": {
|
||||
|
||||
@@ -1,29 +0,0 @@
|
||||
import { renameSync, writeFileSync } from 'node:fs'
|
||||
import { fileURLToPath } from 'node:url'
|
||||
|
||||
import { fetchFeatureFlagsForBuild } from '../src/utils/featureFlags'
|
||||
|
||||
const snapshotPath = fileURLToPath(
|
||||
new URL('../src/data/feature-flags.snapshot.json', import.meta.url)
|
||||
)
|
||||
const tempPath = `${snapshotPath}.tmp`
|
||||
|
||||
const outcome = await fetchFeatureFlagsForBuild()
|
||||
|
||||
if (outcome.status !== 'fresh') {
|
||||
const reason = 'reason' in outcome ? outcome.reason : '(none)'
|
||||
console.error(
|
||||
`Snapshot refresh aborted. Outcome: ${outcome.status}; reason: ${reason}`
|
||||
)
|
||||
process.exit(1)
|
||||
}
|
||||
|
||||
writeFileSync(
|
||||
tempPath,
|
||||
JSON.stringify(outcome.snapshot, null, 2) + '\n',
|
||||
'utf8'
|
||||
)
|
||||
renameSync(tempPath, snapshotPath)
|
||||
process.stdout.write(
|
||||
`Wrote feature flags snapshot to ${snapshotPath}: cloudFreeTier=${outcome.snapshot.flags.cloudFreeTier}\n`
|
||||
)
|
||||
@@ -1,3 +1 @@
|
||||
import snapshot from '../data/feature-flags.snapshot.json' with { type: 'json' }
|
||||
|
||||
export const SHOW_FREE_TIER = snapshot.flags.cloudFreeTier
|
||||
export const SHOW_FREE_TIER = false
|
||||
|
||||
@@ -1,6 +0,0 @@
|
||||
{
|
||||
"fetchedAt": "2026-05-13T20:30:41.221Z",
|
||||
"flags": {
|
||||
"cloudFreeTier": false
|
||||
}
|
||||
}
|
||||
@@ -1,6 +0,0 @@
|
||||
export interface FeatureFlagsSnapshot {
|
||||
fetchedAt: string
|
||||
flags: {
|
||||
cloudFreeTier: boolean
|
||||
}
|
||||
}
|
||||
@@ -1,112 +0,0 @@
|
||||
import { mkdtempSync, readFileSync, rmSync, writeFileSync } from 'node:fs'
|
||||
import { tmpdir } from 'node:os'
|
||||
import { join } from 'node:path'
|
||||
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
|
||||
|
||||
import type { FetchOutcome } from './featureFlags'
|
||||
import type { FeatureFlagsSnapshot } from '../data/feature-flags'
|
||||
|
||||
import {
|
||||
reportFeatureFlagsOutcome,
|
||||
resetFeatureFlagsReporterForTests
|
||||
} from './featureFlags.ci'
|
||||
|
||||
function baseSnapshot(cloudFreeTier = false): FeatureFlagsSnapshot {
|
||||
return {
|
||||
fetchedAt: new Date().toISOString(),
|
||||
flags: { cloudFreeTier }
|
||||
}
|
||||
}
|
||||
|
||||
function freshOutcome(cloudFreeTier = false): FetchOutcome {
|
||||
return { status: 'fresh', snapshot: baseSnapshot(cloudFreeTier) }
|
||||
}
|
||||
|
||||
describe('reportFeatureFlagsOutcome', () => {
|
||||
let writeSpy: ReturnType<typeof vi.spyOn>
|
||||
let summaryDir: string
|
||||
let summaryPath: string
|
||||
const originalSummary = process.env.GITHUB_STEP_SUMMARY
|
||||
|
||||
beforeEach(() => {
|
||||
resetFeatureFlagsReporterForTests()
|
||||
writeSpy = vi.spyOn(process.stdout, 'write').mockImplementation(() => true)
|
||||
summaryDir = mkdtempSync(join(tmpdir(), 'feature-flags-summary-'))
|
||||
summaryPath = join(summaryDir, 'summary.md')
|
||||
writeFileSync(summaryPath, '')
|
||||
process.env.GITHUB_STEP_SUMMARY = summaryPath
|
||||
})
|
||||
|
||||
afterEach(() => {
|
||||
writeSpy.mockRestore()
|
||||
rmSync(summaryDir, { recursive: true, force: true })
|
||||
if (originalSummary === undefined) delete process.env.GITHUB_STEP_SUMMARY
|
||||
else process.env.GITHUB_STEP_SUMMARY = originalSummary
|
||||
})
|
||||
|
||||
it('emits nothing on a fresh outcome', () => {
|
||||
reportFeatureFlagsOutcome(freshOutcome())
|
||||
expect(writeSpy).not.toHaveBeenCalled()
|
||||
expect(readFileSync(summaryPath, 'utf8')).toContain('Fresh')
|
||||
})
|
||||
|
||||
it('records cloudFreeTier value in the step summary', () => {
|
||||
reportFeatureFlagsOutcome(freshOutcome(true))
|
||||
expect(readFileSync(summaryPath, 'utf8')).toContain(
|
||||
'| **cloudFreeTier** | true |'
|
||||
)
|
||||
})
|
||||
|
||||
it('emits exactly one annotation across repeated calls', () => {
|
||||
reportFeatureFlagsOutcome({
|
||||
status: 'stale',
|
||||
reason: 'HTTP 500 Server Error',
|
||||
snapshot: baseSnapshot()
|
||||
})
|
||||
reportFeatureFlagsOutcome({
|
||||
status: 'stale',
|
||||
reason: 'HTTP 500 Server Error',
|
||||
snapshot: baseSnapshot()
|
||||
})
|
||||
expect(writeSpy).toHaveBeenCalledTimes(1)
|
||||
})
|
||||
|
||||
it('emits ::error for schema-mismatch stale outcomes', () => {
|
||||
reportFeatureFlagsOutcome({
|
||||
status: 'stale',
|
||||
reason:
|
||||
'schema validation failed: new_free_tier_subscriptions: Expected boolean',
|
||||
snapshot: baseSnapshot()
|
||||
})
|
||||
const annotation = writeSpy.mock.calls[0]![0] as string
|
||||
expect(annotation).toContain('::error title=Feature flags schema mismatch')
|
||||
})
|
||||
|
||||
it('emits ::warning for transient API unavailability', () => {
|
||||
reportFeatureFlagsOutcome({
|
||||
status: 'stale',
|
||||
reason: 'HTTP 503 Service Unavailable',
|
||||
snapshot: baseSnapshot()
|
||||
})
|
||||
const annotation = writeSpy.mock.calls[0]![0] as string
|
||||
expect(annotation).toContain(
|
||||
'::warning title=Feature flags API unavailable'
|
||||
)
|
||||
})
|
||||
|
||||
it('emits ::error for a failed outcome', () => {
|
||||
reportFeatureFlagsOutcome({
|
||||
status: 'failed',
|
||||
reason: 'HTTP 500 Server Error'
|
||||
})
|
||||
const annotation = writeSpy.mock.calls[0]![0] as string
|
||||
expect(annotation).toContain('::error title=Feature flags fetch failed')
|
||||
expect(readFileSync(summaryPath, 'utf8')).toContain('Failed')
|
||||
})
|
||||
|
||||
it('does not throw when GITHUB_STEP_SUMMARY is not set', () => {
|
||||
delete process.env.GITHUB_STEP_SUMMARY
|
||||
expect(() => reportFeatureFlagsOutcome(freshOutcome())).not.toThrow()
|
||||
})
|
||||
})
|
||||
@@ -1,92 +0,0 @@
|
||||
import { appendFileSync } from 'node:fs'
|
||||
|
||||
import type { FetchOutcome } from './featureFlags'
|
||||
|
||||
let hasReported = false
|
||||
|
||||
export function resetFeatureFlagsReporterForTests(): void {
|
||||
hasReported = false
|
||||
}
|
||||
|
||||
export function reportFeatureFlagsOutcome(outcome: FetchOutcome): void {
|
||||
if (hasReported) return
|
||||
hasReported = true
|
||||
|
||||
const lines = buildAnnotations(outcome)
|
||||
for (const line of lines) {
|
||||
process.stdout.write(`${line}\n`)
|
||||
}
|
||||
|
||||
const summaryPath = process.env.GITHUB_STEP_SUMMARY
|
||||
if (summaryPath) {
|
||||
try {
|
||||
appendFileSync(summaryPath, buildStepSummary(outcome))
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : String(error)
|
||||
process.stderr.write(
|
||||
`feature-flags reporter: failed to write GITHUB_STEP_SUMMARY: ${message}\n`
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function buildAnnotations(outcome: FetchOutcome): string[] {
|
||||
if (outcome.status === 'fresh') return []
|
||||
|
||||
if (outcome.status === 'stale') {
|
||||
return [staleAnnotation(outcome.reason)]
|
||||
}
|
||||
|
||||
return [
|
||||
`::error title=Feature flags fetch failed and no snapshot is available::Cannot build site without feature flags.%0A%0AReason: ${escapeAnnotation(outcome.reason)}%0A%0AAction items:%0A 1. Run \`pnpm --filter @comfyorg/website feature-flags:refresh-snapshot\` locally.%0A 2. Commit apps/website/src/data/feature-flags.snapshot.json.%0A 3. Push and re-run CI.`
|
||||
]
|
||||
}
|
||||
|
||||
function staleAnnotation(reason: string): string {
|
||||
const escaped = escapeAnnotation(reason)
|
||||
if (reason.startsWith('schema')) {
|
||||
return `::error title=Feature flags schema mismatch::${escaped}. The /features API contract has likely changed. Build continues with the snapshot, but future updates will fail until the schema is fixed.%0A%0AAction items:%0A 1. Inspect the response at https://api.comfy.org/features.%0A 2. Update apps/website/src/utils/featureFlags.schema.ts to match the new shape.`
|
||||
}
|
||||
if (reason.startsWith('HTTP 401') || reason.startsWith('HTTP 403')) {
|
||||
return `::error title=Feature flags authentication failed::${escaped}. The /features endpoint should be public; check the backend. Build continues with the last-known-good snapshot.`
|
||||
}
|
||||
return `::warning title=Feature flags API unavailable::${escaped}. Using last-known-good snapshot.%0A%0AAction items:%0A 1. Check the status of https://api.comfy.org/features.%0A 2. Re-run this workflow once the API is healthy.`
|
||||
}
|
||||
|
||||
function escapeAnnotation(value: string): string {
|
||||
return value.replace(/%/g, '%25').replace(/\r/g, '%0D').replace(/\n/g, '%0A')
|
||||
}
|
||||
|
||||
function buildStepSummary(outcome: FetchOutcome): string {
|
||||
const header = '## 🚩 Feature Flags (/features)\n'
|
||||
const rows: Array<[string, string]> = []
|
||||
|
||||
if (outcome.status === 'fresh') {
|
||||
rows.push(['Status', '✅ Fresh (fetched from /features)'])
|
||||
rows.push(['cloudFreeTier', String(outcome.snapshot.flags.cloudFreeTier)])
|
||||
} else if (outcome.status === 'stale') {
|
||||
rows.push(['Status', '⚠️ Stale (using snapshot — /features fetch failed)'])
|
||||
rows.push(['cloudFreeTier', String(outcome.snapshot.flags.cloudFreeTier)])
|
||||
rows.push(['Reason', outcome.reason])
|
||||
rows.push(['Snapshot age', describeSnapshotAge(outcome.snapshot.fetchedAt)])
|
||||
} else {
|
||||
rows.push(['Status', '❌ Failed (no snapshot available)'])
|
||||
rows.push(['Reason', outcome.reason])
|
||||
}
|
||||
|
||||
const table =
|
||||
'| | |\n|---|---|\n' +
|
||||
rows.map(([k, v]) => `| **${k}** | ${v} |`).join('\n') +
|
||||
'\n'
|
||||
|
||||
return `${header}${table}\n`
|
||||
}
|
||||
|
||||
function describeSnapshotAge(fetchedAt: string): string {
|
||||
const fetched = new Date(fetchedAt).getTime()
|
||||
if (Number.isNaN(fetched)) return 'unknown'
|
||||
const days = Math.floor((Date.now() - fetched) / 86_400_000)
|
||||
if (days <= 0) return 'today'
|
||||
if (days === 1) return '1 day'
|
||||
return `${days} days`
|
||||
}
|
||||
@@ -1,11 +0,0 @@
|
||||
import { z } from 'zod'
|
||||
|
||||
export const FeaturesResponseSchema = z
|
||||
.object({
|
||||
new_free_tier_subscriptions: z.boolean().optional(),
|
||||
free_tier_credits: z.number().optional(),
|
||||
partner_node_conversion_rate: z.number().optional()
|
||||
})
|
||||
.passthrough()
|
||||
|
||||
export type FeaturesResponse = z.infer<typeof FeaturesResponseSchema>
|
||||
@@ -1,190 +0,0 @@
|
||||
import { mkdtempSync, rmSync, writeFileSync } from 'node:fs'
|
||||
import { tmpdir } from 'node:os'
|
||||
import { join } from 'node:path'
|
||||
import { pathToFileURL } from 'node:url'
|
||||
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
|
||||
|
||||
import type { FeatureFlagsSnapshot } from '../data/feature-flags'
|
||||
|
||||
import {
|
||||
fetchFeatureFlagsForBuild,
|
||||
resetFeatureFlagsFetcherForTests
|
||||
} from './featureFlags'
|
||||
|
||||
const BASE_URL = 'https://api.test'
|
||||
const tempSnapshotDirs: string[] = []
|
||||
|
||||
function response(body: unknown, init: Partial<ResponseInit> = {}): Response {
|
||||
const base: ResponseInit = {
|
||||
status: 200,
|
||||
headers: { 'content-type': 'application/json' }
|
||||
}
|
||||
return new Response(JSON.stringify(body), { ...base, ...init })
|
||||
}
|
||||
|
||||
function makeSnapshot(cloudFreeTier: boolean): FeatureFlagsSnapshot {
|
||||
return {
|
||||
fetchedAt: '2026-04-01T00:00:00.000Z',
|
||||
flags: { cloudFreeTier }
|
||||
}
|
||||
}
|
||||
|
||||
function withSnapshotDir(snapshot: FeatureFlagsSnapshot | null): URL {
|
||||
const dir = mkdtempSync(join(tmpdir(), 'feature-flags-test-'))
|
||||
tempSnapshotDirs.push(dir)
|
||||
const file = join(dir, 'feature-flags.snapshot.json')
|
||||
if (snapshot) writeFileSync(file, JSON.stringify(snapshot))
|
||||
return pathToFileURL(file)
|
||||
}
|
||||
|
||||
describe('fetchFeatureFlagsForBuild', () => {
|
||||
beforeEach(() => {
|
||||
resetFeatureFlagsFetcherForTests()
|
||||
})
|
||||
|
||||
afterEach(() => {
|
||||
for (const dir of tempSnapshotDirs.splice(0)) {
|
||||
rmSync(dir, { recursive: true, force: true })
|
||||
}
|
||||
vi.restoreAllMocks()
|
||||
})
|
||||
|
||||
it('returns fresh with cloudFreeTier=true when /features sets the flag', async () => {
|
||||
const fetchImpl = vi.fn(async () =>
|
||||
response({ new_free_tier_subscriptions: true })
|
||||
)
|
||||
const outcome = await fetchFeatureFlagsForBuild({
|
||||
baseUrl: BASE_URL,
|
||||
fetchImpl: fetchImpl as unknown as typeof fetch
|
||||
})
|
||||
expect(outcome.status).toBe('fresh')
|
||||
if (outcome.status !== 'fresh') return
|
||||
expect(outcome.snapshot.flags.cloudFreeTier).toBe(true)
|
||||
expect(fetchImpl).toHaveBeenCalledWith(
|
||||
`${BASE_URL}/features`,
|
||||
expect.objectContaining({ method: 'GET' })
|
||||
)
|
||||
})
|
||||
|
||||
it('defaults cloudFreeTier to false when the flag is absent from /features', async () => {
|
||||
const fetchImpl = vi.fn(async () =>
|
||||
response({ partner_node_conversion_rate: 0.05 })
|
||||
)
|
||||
const outcome = await fetchFeatureFlagsForBuild({
|
||||
baseUrl: BASE_URL,
|
||||
fetchImpl: fetchImpl as unknown as typeof fetch
|
||||
})
|
||||
expect(outcome.status).toBe('fresh')
|
||||
if (outcome.status !== 'fresh') return
|
||||
expect(outcome.snapshot.flags.cloudFreeTier).toBe(false)
|
||||
})
|
||||
|
||||
it('returns fresh with cloudFreeTier=false when explicitly disabled', async () => {
|
||||
const fetchImpl = vi.fn(async () =>
|
||||
response({ new_free_tier_subscriptions: false })
|
||||
)
|
||||
const outcome = await fetchFeatureFlagsForBuild({
|
||||
baseUrl: BASE_URL,
|
||||
fetchImpl: fetchImpl as unknown as typeof fetch
|
||||
})
|
||||
expect(outcome.status).toBe('fresh')
|
||||
if (outcome.status !== 'fresh') return
|
||||
expect(outcome.snapshot.flags.cloudFreeTier).toBe(false)
|
||||
})
|
||||
|
||||
it('returns stale with snapshot when the API returns 401', async () => {
|
||||
const snapshotUrl = withSnapshotDir(makeSnapshot(true))
|
||||
const fetchImpl = vi.fn(async () => response({}, { status: 401 }))
|
||||
const outcome = await fetchFeatureFlagsForBuild({
|
||||
baseUrl: BASE_URL,
|
||||
snapshotUrl,
|
||||
fetchImpl: fetchImpl as unknown as typeof fetch
|
||||
})
|
||||
expect(outcome.status).toBe('stale')
|
||||
if (outcome.status !== 'stale') return
|
||||
expect(outcome.reason).toMatch(/^HTTP 401/)
|
||||
expect(outcome.snapshot.flags.cloudFreeTier).toBe(true)
|
||||
expect(fetchImpl).toHaveBeenCalledTimes(1)
|
||||
})
|
||||
|
||||
it('retries 5xx up to the configured limit then falls back to snapshot', async () => {
|
||||
const snapshotUrl = withSnapshotDir(makeSnapshot(false))
|
||||
const fetchImpl = vi.fn(async () => response({}, { status: 503 }))
|
||||
const sleep = vi.fn(async () => undefined)
|
||||
const outcome = await fetchFeatureFlagsForBuild({
|
||||
baseUrl: BASE_URL,
|
||||
snapshotUrl,
|
||||
retryDelaysMs: [1, 1, 1],
|
||||
sleep,
|
||||
fetchImpl: fetchImpl as unknown as typeof fetch
|
||||
})
|
||||
expect(outcome.status).toBe('stale')
|
||||
expect(fetchImpl).toHaveBeenCalledTimes(4)
|
||||
expect(sleep).toHaveBeenCalledTimes(3)
|
||||
})
|
||||
|
||||
it('falls back to snapshot on schema validation failure', async () => {
|
||||
const snapshotUrl = withSnapshotDir(makeSnapshot(false))
|
||||
const fetchImpl = vi.fn(async () =>
|
||||
response({ new_free_tier_subscriptions: 'yes' })
|
||||
)
|
||||
const outcome = await fetchFeatureFlagsForBuild({
|
||||
baseUrl: BASE_URL,
|
||||
snapshotUrl,
|
||||
fetchImpl: fetchImpl as unknown as typeof fetch
|
||||
})
|
||||
expect(outcome.status).toBe('stale')
|
||||
if (outcome.status !== 'stale') return
|
||||
expect(outcome.reason).toMatch(/^schema validation/)
|
||||
})
|
||||
|
||||
it('falls back to the bundled snapshot when fetch fails and the override is missing', async () => {
|
||||
const snapshotUrl = withSnapshotDir(null)
|
||||
const fetchImpl = vi.fn(async () => response({}, { status: 500 }))
|
||||
const sleep = vi.fn(async () => undefined)
|
||||
const outcome = await fetchFeatureFlagsForBuild({
|
||||
baseUrl: BASE_URL,
|
||||
snapshotUrl,
|
||||
retryDelaysMs: [1, 1, 1],
|
||||
sleep,
|
||||
fetchImpl: fetchImpl as unknown as typeof fetch
|
||||
})
|
||||
expect(outcome.status).toBe('stale')
|
||||
if (outcome.status !== 'stale') return
|
||||
expect(outcome.snapshot.flags.cloudFreeTier).toBe(false)
|
||||
})
|
||||
|
||||
it('memoizes within a single process', async () => {
|
||||
const fetchImpl = vi.fn(async () =>
|
||||
response({ new_free_tier_subscriptions: true })
|
||||
)
|
||||
const opts = {
|
||||
baseUrl: BASE_URL,
|
||||
fetchImpl: fetchImpl as unknown as typeof fetch
|
||||
}
|
||||
const [a, b] = await Promise.all([
|
||||
fetchFeatureFlagsForBuild(opts),
|
||||
fetchFeatureFlagsForBuild(opts)
|
||||
])
|
||||
expect(a).toBe(b)
|
||||
expect(fetchImpl).toHaveBeenCalledTimes(1)
|
||||
})
|
||||
|
||||
it('never writes to the snapshot file on success', async () => {
|
||||
const snapshot = makeSnapshot(true)
|
||||
const snapshotUrl = withSnapshotDir(snapshot)
|
||||
const fs = await import('node:fs')
|
||||
const initial = fs.readFileSync(snapshotUrl).toString()
|
||||
const fetchImpl = vi.fn(async () =>
|
||||
response({ new_free_tier_subscriptions: false })
|
||||
)
|
||||
await fetchFeatureFlagsForBuild({
|
||||
baseUrl: BASE_URL,
|
||||
snapshotUrl,
|
||||
fetchImpl: fetchImpl as unknown as typeof fetch
|
||||
})
|
||||
const after = fs.readFileSync(snapshotUrl).toString()
|
||||
expect(after).toBe(initial)
|
||||
})
|
||||
})
|
||||
@@ -1,189 +0,0 @@
|
||||
import { readFile } from 'node:fs/promises'
|
||||
|
||||
import type { FeaturesResponse } from './featureFlags.schema'
|
||||
import type { FeatureFlagsSnapshot } from '../data/feature-flags'
|
||||
|
||||
import { FeaturesResponseSchema } from './featureFlags.schema'
|
||||
|
||||
import bundledSnapshot from '../data/feature-flags.snapshot.json' with { type: 'json' }
|
||||
|
||||
const DEFAULT_BASE_URL = 'https://api.comfy.org'
|
||||
const DEFAULT_TIMEOUT_MS = 10_000
|
||||
const RETRY_DELAYS_MS = [1_000, 2_000, 4_000]
|
||||
|
||||
export type FetchOutcome =
|
||||
| { status: 'fresh'; snapshot: FeatureFlagsSnapshot }
|
||||
| { status: 'stale'; snapshot: FeatureFlagsSnapshot; reason: string }
|
||||
| { status: 'failed'; reason: string }
|
||||
|
||||
interface FetchFeatureFlagsOptions {
|
||||
baseUrl?: string
|
||||
timeoutMs?: number
|
||||
retryDelaysMs?: readonly number[]
|
||||
fetchImpl?: typeof fetch
|
||||
snapshotUrl?: URL
|
||||
sleep?: (ms: number) => Promise<void>
|
||||
}
|
||||
|
||||
let inflight: Promise<FetchOutcome> | undefined
|
||||
|
||||
export function resetFeatureFlagsFetcherForTests(): void {
|
||||
inflight = undefined
|
||||
}
|
||||
|
||||
export function fetchFeatureFlagsForBuild(
|
||||
options: FetchFeatureFlagsOptions = {}
|
||||
): Promise<FetchOutcome> {
|
||||
inflight ??= doFetchFeatureFlagsForBuild(options)
|
||||
return inflight
|
||||
}
|
||||
|
||||
async function doFetchFeatureFlagsForBuild(
|
||||
options: FetchFeatureFlagsOptions
|
||||
): Promise<FetchOutcome> {
|
||||
const result = await tryFetchAndParse(options)
|
||||
if (result.kind === 'ok') {
|
||||
return {
|
||||
status: 'fresh',
|
||||
snapshot: {
|
||||
fetchedAt: new Date().toISOString(),
|
||||
flags: deriveFlags(result.features)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return fallback(result.reason, options.snapshotUrl)
|
||||
}
|
||||
|
||||
async function fallback(
|
||||
reason: string,
|
||||
snapshotUrl: URL | undefined
|
||||
): Promise<FetchOutcome> {
|
||||
const snapshot = await readSnapshot(snapshotUrl)
|
||||
if (snapshot) return { status: 'stale', snapshot, reason }
|
||||
return { status: 'failed', reason }
|
||||
}
|
||||
|
||||
interface FetchOk {
|
||||
kind: 'ok'
|
||||
features: FeaturesResponse
|
||||
}
|
||||
|
||||
interface FetchErr {
|
||||
kind: 'err'
|
||||
reason: string
|
||||
}
|
||||
|
||||
async function tryFetchAndParse(
|
||||
options: FetchFeatureFlagsOptions
|
||||
): Promise<FetchOk | FetchErr> {
|
||||
const baseUrl = options.baseUrl ?? DEFAULT_BASE_URL
|
||||
const timeoutMs = options.timeoutMs ?? DEFAULT_TIMEOUT_MS
|
||||
const retryDelaysMs = options.retryDelaysMs ?? RETRY_DELAYS_MS
|
||||
const fetchImpl = options.fetchImpl ?? fetch
|
||||
const sleep = options.sleep ?? defaultSleep
|
||||
|
||||
const url = `${baseUrl.replace(/\/+$/, '')}/features`
|
||||
|
||||
let lastReason = 'unknown error'
|
||||
for (let attempt = 0; attempt <= retryDelaysMs.length; attempt++) {
|
||||
if (attempt > 0) await sleep(retryDelaysMs[attempt - 1])
|
||||
|
||||
const response = await callOnce(fetchImpl, url, timeoutMs)
|
||||
if (response.kind === 'err') {
|
||||
lastReason = response.reason
|
||||
if (!response.retryable) return response
|
||||
continue
|
||||
}
|
||||
|
||||
const parsed = FeaturesResponseSchema.safeParse(response.body)
|
||||
if (!parsed.success) {
|
||||
return {
|
||||
kind: 'err',
|
||||
reason: `schema validation failed: ${parsed.error.issues
|
||||
.map((i) => `${i.path.join('.') || '<root>'}: ${i.message}`)
|
||||
.join('; ')}`
|
||||
}
|
||||
}
|
||||
|
||||
return { kind: 'ok', features: parsed.data }
|
||||
}
|
||||
|
||||
return { kind: 'err', reason: lastReason }
|
||||
}
|
||||
|
||||
type CallResponse =
|
||||
| { kind: 'ok'; body: unknown }
|
||||
| { kind: 'err'; reason: string; retryable: boolean }
|
||||
|
||||
async function callOnce(
|
||||
fetchImpl: typeof fetch,
|
||||
url: string,
|
||||
timeoutMs: number
|
||||
): Promise<CallResponse> {
|
||||
const controller = new AbortController()
|
||||
const timer = setTimeout(() => controller.abort(), timeoutMs)
|
||||
try {
|
||||
const res = await fetchImpl(url, {
|
||||
method: 'GET',
|
||||
headers: { Accept: 'application/json' },
|
||||
signal: controller.signal
|
||||
})
|
||||
if (res.ok) {
|
||||
return { kind: 'ok', body: await res.json() }
|
||||
}
|
||||
const retryable =
|
||||
res.status === 429 || (res.status >= 500 && res.status < 600)
|
||||
return {
|
||||
kind: 'err',
|
||||
reason: `HTTP ${res.status} ${res.statusText || ''}`.trim(),
|
||||
retryable
|
||||
}
|
||||
} catch (error) {
|
||||
const reason =
|
||||
error instanceof Error
|
||||
? `network error: ${error.message}`
|
||||
: 'network error'
|
||||
return { kind: 'err', reason, retryable: true }
|
||||
} finally {
|
||||
clearTimeout(timer)
|
||||
}
|
||||
}
|
||||
|
||||
function deriveFlags(
|
||||
features: FeaturesResponse
|
||||
): FeatureFlagsSnapshot['flags'] {
|
||||
return {
|
||||
cloudFreeTier: features.new_free_tier_subscriptions ?? false
|
||||
}
|
||||
}
|
||||
|
||||
async function readSnapshot(
|
||||
snapshotUrl: URL | undefined
|
||||
): Promise<FeatureFlagsSnapshot | null> {
|
||||
if (snapshotUrl) {
|
||||
try {
|
||||
const text = await readFile(snapshotUrl, 'utf8')
|
||||
const parsed: unknown = JSON.parse(text)
|
||||
if (isFeatureFlagsSnapshot(parsed)) return parsed
|
||||
} catch {
|
||||
// Fall through to the bundled snapshot if the override is unreadable.
|
||||
}
|
||||
}
|
||||
return isFeatureFlagsSnapshot(bundledSnapshot) ? bundledSnapshot : null
|
||||
}
|
||||
|
||||
function isFeatureFlagsSnapshot(value: unknown): value is FeatureFlagsSnapshot {
|
||||
if (value === null || typeof value !== 'object') return false
|
||||
const candidate = value as { fetchedAt?: unknown; flags?: unknown }
|
||||
if (typeof candidate.fetchedAt !== 'string') return false
|
||||
if (candidate.flags === null || typeof candidate.flags !== 'object') {
|
||||
return false
|
||||
}
|
||||
const flags = candidate.flags as { cloudFreeTier?: unknown }
|
||||
return typeof flags.cloudFreeTier === 'boolean'
|
||||
}
|
||||
|
||||
function defaultSleep(ms: number): Promise<void> {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms))
|
||||
}
|
||||
@@ -1,90 +0,0 @@
|
||||
{
|
||||
"id": "06e5b524-5a40-40b9-b561-199dfab18cf0",
|
||||
"revision": 0,
|
||||
"last_node_id": 12,
|
||||
"last_link_id": 10,
|
||||
"nodes": [
|
||||
{
|
||||
"id": 10,
|
||||
"type": "KSampler",
|
||||
"pos": [230, 110],
|
||||
"size": [270, 317.5666809082031],
|
||||
"flags": {},
|
||||
"order": 1,
|
||||
"mode": 0,
|
||||
"inputs": [
|
||||
{
|
||||
"name": "model",
|
||||
"type": "MODEL",
|
||||
"link": null
|
||||
},
|
||||
{
|
||||
"name": "positive",
|
||||
"type": "CONDITIONING",
|
||||
"link": null
|
||||
},
|
||||
{
|
||||
"name": "negative",
|
||||
"type": "CONDITIONING",
|
||||
"link": null
|
||||
},
|
||||
{
|
||||
"name": "latent_image",
|
||||
"type": "LATENT",
|
||||
"link": null
|
||||
},
|
||||
{
|
||||
"name": "denoise",
|
||||
"type": "FLOAT",
|
||||
"widget": {
|
||||
"name": "denoise"
|
||||
},
|
||||
"link": 10
|
||||
}
|
||||
],
|
||||
"outputs": [
|
||||
{
|
||||
"name": "LATENT",
|
||||
"type": "LATENT",
|
||||
"links": null
|
||||
}
|
||||
],
|
||||
"properties": {
|
||||
"Node name for S&R": "KSampler"
|
||||
},
|
||||
"widgets_values": [0, "randomize", 20, 8, "euler", "simple", 1]
|
||||
},
|
||||
{
|
||||
"id": 11,
|
||||
"type": "PrimitiveFloat",
|
||||
"pos": [-80.55032348632812, 375.2260443115233],
|
||||
"size": [270, 80.23332977294922],
|
||||
"flags": {},
|
||||
"order": 0,
|
||||
"mode": 0,
|
||||
"inputs": [],
|
||||
"outputs": [
|
||||
{
|
||||
"name": "FLOAT",
|
||||
"type": "FLOAT",
|
||||
"links": [10]
|
||||
}
|
||||
],
|
||||
"properties": {
|
||||
"Node name for S&R": "PrimitiveFloat"
|
||||
},
|
||||
"widgets_values": [0]
|
||||
}
|
||||
],
|
||||
"links": [[10, 11, 0, 10, 4, "FLOAT"]],
|
||||
"groups": [],
|
||||
"config": {},
|
||||
"extra": {
|
||||
"ds": {
|
||||
"scale": 0.8264462809917354,
|
||||
"offset": [1335.8909766107738, 692.7345403667316]
|
||||
},
|
||||
"frontendVersion": "1.45.4"
|
||||
},
|
||||
"version": 0.4
|
||||
}
|
||||
@@ -1133,108 +1133,3 @@ test.describe(
|
||||
})
|
||||
}
|
||||
)
|
||||
|
||||
test.describe('Vue Node Widget Link Position', { tag: '@vue-nodes' }, () => {
|
||||
test('should keep widget-input link aligned after persisted-workflow reload', async ({
|
||||
comfyPage
|
||||
}) => {
|
||||
test.setTimeout(30000)
|
||||
|
||||
await comfyPage.workflow.loadWorkflow(
|
||||
'vueNodes/ksampler-denoise-widget-link'
|
||||
)
|
||||
await comfyPage.vueNodes.waitForNodes(2)
|
||||
await comfyPage.workflow.waitForDraftPersisted()
|
||||
await comfyPage.workflow.reloadAndWaitForApp()
|
||||
await comfyPage.vueNodes.waitForNodes(2)
|
||||
|
||||
const ksampler = await comfyPage.page.evaluate(() => {
|
||||
const node = window.app!.graph.nodes.find((n) => n.type === 'KSampler')
|
||||
if (!node) return null
|
||||
const findIndex = (name: string) =>
|
||||
node.inputs.findIndex(
|
||||
(input) => input.name === name || input.widget?.name === name
|
||||
)
|
||||
return {
|
||||
id: node.id,
|
||||
denoiseIndex: findIndex('denoise'),
|
||||
schedulerIndex: findIndex('scheduler')
|
||||
}
|
||||
})
|
||||
if (!ksampler) {
|
||||
throw new Error('KSampler should be present in fixture')
|
||||
}
|
||||
expect(
|
||||
ksampler.denoiseIndex,
|
||||
'denoise input slot not found'
|
||||
).toBeGreaterThanOrEqual(0)
|
||||
expect(
|
||||
ksampler.schedulerIndex,
|
||||
'scheduler input slot not found'
|
||||
).toBeGreaterThanOrEqual(0)
|
||||
|
||||
const denoiseSlot = slotLocator(
|
||||
comfyPage.page,
|
||||
ksampler.id,
|
||||
ksampler.denoiseIndex,
|
||||
true
|
||||
)
|
||||
const schedulerSlot = slotLocator(
|
||||
comfyPage.page,
|
||||
ksampler.id,
|
||||
ksampler.schedulerIndex,
|
||||
true
|
||||
)
|
||||
await expectVisibleAll(denoiseSlot, schedulerSlot)
|
||||
|
||||
await expect
|
||||
.poll(() =>
|
||||
getInputLinkDetails(comfyPage.page, ksampler.id, ksampler.denoiseIndex)
|
||||
)
|
||||
.toMatchObject({
|
||||
targetId: ksampler.id,
|
||||
targetSlot: ksampler.denoiseIndex
|
||||
})
|
||||
|
||||
// If the regression returns, getInputPos stays stale relative to the
|
||||
// grown slot DOM and the endpoint drifts toward scheduler. Re-read
|
||||
// positions each retry so layout settle doesn't cause flakes.
|
||||
await expect(async () => {
|
||||
const linkEnd = await comfyPage.page.evaluate(
|
||||
([nodeId, targetSlotIndex]) => {
|
||||
const node = window.app!.graph.getNodeById(nodeId)
|
||||
if (!node) return null
|
||||
const slotPos = node.getInputPos(targetSlotIndex)
|
||||
const [cx, cy] = window.app!.canvas.ds.convertOffsetToCanvas([
|
||||
slotPos[0],
|
||||
slotPos[1]
|
||||
])
|
||||
const rect = window.app!.canvas.canvas.getBoundingClientRect()
|
||||
return { x: cx + rect.left, y: cy + rect.top }
|
||||
},
|
||||
[ksampler.id, ksampler.denoiseIndex] as const
|
||||
)
|
||||
expect(linkEnd, 'link endpoint should resolve').not.toBeNull()
|
||||
|
||||
const denoiseCenter = await getCenter(denoiseSlot)
|
||||
const schedulerCenter = await getCenter(schedulerSlot)
|
||||
const distToDenoise = Math.hypot(
|
||||
linkEnd!.x - denoiseCenter.x,
|
||||
linkEnd!.y - denoiseCenter.y
|
||||
)
|
||||
const rowGap = Math.hypot(
|
||||
denoiseCenter.x - schedulerCenter.x,
|
||||
denoiseCenter.y - schedulerCenter.y
|
||||
)
|
||||
|
||||
// Bound at rowGap / 4 - half the inter-slot midpoint, so any drift
|
||||
// toward scheduler fails well before reaching it.
|
||||
expect(
|
||||
distToDenoise,
|
||||
`Link endpoint (${linkEnd!.x.toFixed(1)}, ${linkEnd!.y.toFixed(1)}) is ` +
|
||||
`${distToDenoise.toFixed(1)}px from denoise — should be within ` +
|
||||
`${(rowGap / 4).toFixed(1)}px (quarter of inter-slot gap ${rowGap.toFixed(1)}px)`
|
||||
).toBeLessThan(rowGap / 4)
|
||||
}).toPass({ timeout: 5000 })
|
||||
})
|
||||
})
|
||||
|
||||
@@ -90,7 +90,6 @@ import { useCanvasInteractions } from '@/renderer/core/canvas/useCanvasInteracti
|
||||
import AppInput from '@/renderer/extensions/linearMode/AppInput.vue'
|
||||
import { useNodeZIndex } from '@/renderer/extensions/vueNodes/composables/useNodeZIndex'
|
||||
import { useProcessedWidgets } from '@/renderer/extensions/vueNodes/composables/useProcessedWidgets'
|
||||
import { useVueElementTracking } from '@/renderer/extensions/vueNodes/composables/useVueNodeResizeTracking'
|
||||
import { cn } from '@comfyorg/tailwind-utils'
|
||||
|
||||
import InputSlot from './InputSlot.vue'
|
||||
@@ -135,9 +134,4 @@ const {
|
||||
processedWidgets,
|
||||
showAdvanced
|
||||
} = useProcessedWidgets(() => nodeData)
|
||||
|
||||
// Tracks widget-row growth that the node-level RO can't see
|
||||
if (nodeData?.id != null) {
|
||||
useVueElementTracking(String(nodeData.id), 'widgets-grid')
|
||||
}
|
||||
</script>
|
||||
|
||||
@@ -29,10 +29,7 @@ const raf = createRafBatch(() => {
|
||||
flushScheduledSlotLayoutSync()
|
||||
})
|
||||
|
||||
export function scheduleSlotLayoutSync(nodeId: string) {
|
||||
// Drop signals for unregistered nodes (e.g. preview nodes with synthetic
|
||||
// ids from LGraphNodePreview) - they'd otherwise pump setDirty per RAF.
|
||||
if (!useNodeSlotRegistryStore().getNode(nodeId)) return
|
||||
function scheduleSlotLayoutSync(nodeId: string) {
|
||||
pendingNodes.add(nodeId)
|
||||
raf.schedule()
|
||||
}
|
||||
|
||||
@@ -43,8 +43,7 @@ const testState = vi.hoisted(() => ({
|
||||
nodeLayouts: new Map<NodeId, NodeLayout>(),
|
||||
batchUpdateNodeBounds: vi.fn(),
|
||||
setSource: vi.fn(),
|
||||
syncNodeSlotLayoutsFromDOM: vi.fn(),
|
||||
scheduleSlotLayoutSync: vi.fn()
|
||||
syncNodeSlotLayoutsFromDOM: vi.fn()
|
||||
}))
|
||||
|
||||
vi.mock('@vueuse/core', () => ({
|
||||
@@ -74,7 +73,6 @@ vi.mock('@/renderer/core/layout/store/layoutStore', () => ({
|
||||
}))
|
||||
|
||||
vi.mock('./useSlotElementTracking', () => ({
|
||||
scheduleSlotLayoutSync: testState.scheduleSlotLayoutSync,
|
||||
syncNodeSlotLayoutsFromDOM: testState.syncNodeSlotLayoutsFromDOM
|
||||
}))
|
||||
|
||||
@@ -161,7 +159,6 @@ describe('useVueNodeResizeTracking', () => {
|
||||
testState.batchUpdateNodeBounds.mockReset()
|
||||
testState.setSource.mockReset()
|
||||
testState.syncNodeSlotLayoutsFromDOM.mockReset()
|
||||
testState.scheduleSlotLayoutSync.mockReset()
|
||||
resizeObserverState.observe.mockReset()
|
||||
resizeObserverState.unobserve.mockReset()
|
||||
resizeObserverState.disconnect.mockReset()
|
||||
@@ -320,25 +317,4 @@ describe('useVueNodeResizeTracking', () => {
|
||||
expect(testState.setSource).toHaveBeenCalledWith(LayoutSource.DOM)
|
||||
expect(testState.batchUpdateNodeBounds).toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('widgets-grid resize schedules a slot resync without writing node bounds', () => {
|
||||
const parentNodeId: NodeId = 'parent-node'
|
||||
const element = document.createElement('div')
|
||||
element.dataset.widgetsGridNodeId = parentNodeId
|
||||
const boxSizes = [{ inlineSize: 200, blockSize: 80 }]
|
||||
const entry = {
|
||||
target: element,
|
||||
borderBoxSize: boxSizes,
|
||||
contentBoxSize: boxSizes,
|
||||
devicePixelContentBoxSize: boxSizes,
|
||||
contentRect: new DOMRect(0, 0, 200, 80)
|
||||
} satisfies ResizeEntryLike
|
||||
|
||||
resizeObserverState.callback?.([entry], createObserverMock())
|
||||
|
||||
expect(testState.scheduleSlotLayoutSync).toHaveBeenCalledWith(parentNodeId)
|
||||
expect(testState.batchUpdateNodeBounds).not.toHaveBeenCalled()
|
||||
expect(testState.setSource).not.toHaveBeenCalled()
|
||||
expect(testState.syncNodeSlotLayoutsFromDOM).not.toHaveBeenCalled()
|
||||
})
|
||||
})
|
||||
|
||||
@@ -24,10 +24,7 @@ import {
|
||||
} from '@/renderer/core/layout/utils/geometry'
|
||||
import { removeNodeTitleHeight } from '@/renderer/core/layout/utils/nodeSizeUtil'
|
||||
|
||||
import {
|
||||
scheduleSlotLayoutSync,
|
||||
syncNodeSlotLayoutsFromDOM
|
||||
} from './useSlotElementTracking'
|
||||
import { syncNodeSlotLayoutsFromDOM } from './useSlotElementTracking'
|
||||
|
||||
/**
|
||||
* Generic update item for element bounds tracking
|
||||
@@ -50,14 +47,14 @@ interface CachedNodeMeasurement {
|
||||
interface ElementTrackingConfig {
|
||||
/** Data attribute name (e.g., 'nodeId') */
|
||||
dataAttribute: string
|
||||
/** Handler for processing bounds updates. Omit for signal-only entries. */
|
||||
updateHandler?: (updates: ElementBoundsUpdate[]) => void
|
||||
/** Handler for processing bounds updates */
|
||||
updateHandler: (updates: ElementBoundsUpdate[]) => void
|
||||
}
|
||||
|
||||
/**
|
||||
* Registry of tracking configurations by element type
|
||||
*/
|
||||
const trackingConfigs = new Map<string, ElementTrackingConfig>([
|
||||
const trackingConfigs: Map<string, ElementTrackingConfig> = new Map([
|
||||
[
|
||||
'node',
|
||||
{
|
||||
@@ -70,10 +67,7 @@ const trackingConfigs = new Map<string, ElementTrackingConfig>([
|
||||
layoutStore.batchUpdateNodeBounds(nodeUpdates)
|
||||
}
|
||||
}
|
||||
],
|
||||
// Signal-only: outer node stays at its persisted min-h floor during
|
||||
// widget hydration, so the inner grid's RO is the only slot-drift signal.
|
||||
['widgets-grid', { dataAttribute: 'widgetsGridNodeId' }]
|
||||
]
|
||||
])
|
||||
|
||||
// Elements whose ResizeObserver fired while the tab was hidden
|
||||
@@ -127,14 +121,6 @@ const resizeObserver = new ResizeObserver((entries) => {
|
||||
if (!(entry.target instanceof HTMLElement)) continue
|
||||
const element = entry.target
|
||||
|
||||
// Signal-only widgets-grid resize - route the parent node through the
|
||||
// slot-layout pipeline and skip bounds processing entirely.
|
||||
const widgetsGridParentNodeId = element.dataset.widgetsGridNodeId
|
||||
if (widgetsGridParentNodeId) {
|
||||
scheduleSlotLayoutSync(widgetsGridParentNodeId as NodeId)
|
||||
continue
|
||||
}
|
||||
|
||||
// Find which type this element belongs to
|
||||
let elementType: string | undefined
|
||||
let elementId: string | undefined
|
||||
@@ -252,7 +238,7 @@ const resizeObserver = new ResizeObserver((entries) => {
|
||||
// Flush per-type
|
||||
for (const [type, updates] of updatesByType) {
|
||||
const config = trackingConfigs.get(type)
|
||||
if (config?.updateHandler && updates.length) config.updateHandler(updates)
|
||||
if (config && updates.length) config.updateHandler(updates)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ import { LinkReleaseTriggerAction } from '@/types/searchBoxTypes'
|
||||
const zNodeType = z.string()
|
||||
const zJobId = z.string()
|
||||
export type JobId = z.infer<typeof zJobId>
|
||||
const zWorkflowId = z.string()
|
||||
export const resultItemType = z.enum(['input', 'output', 'temp'])
|
||||
export type ResultItemType = z.infer<typeof resultItemType>
|
||||
|
||||
@@ -56,6 +57,7 @@ const zProgressWsMessage = z.object({
|
||||
value: z.number().int(),
|
||||
max: z.number().int(),
|
||||
prompt_id: zJobId,
|
||||
workflow_id: zWorkflowId.optional(),
|
||||
node: zNodeId
|
||||
})
|
||||
|
||||
@@ -65,6 +67,7 @@ const zNodeProgressState = z.object({
|
||||
state: z.enum(['pending', 'running', 'finished', 'error']),
|
||||
node_id: zNodeId,
|
||||
prompt_id: zJobId,
|
||||
workflow_id: zWorkflowId.optional(),
|
||||
display_node_id: zNodeId.optional(),
|
||||
parent_node_id: zNodeId.optional(),
|
||||
real_node_id: zNodeId.optional()
|
||||
@@ -72,13 +75,15 @@ const zNodeProgressState = z.object({
|
||||
|
||||
const zProgressStateWsMessage = z.object({
|
||||
prompt_id: zJobId,
|
||||
workflow_id: zWorkflowId.optional(),
|
||||
nodes: z.record(zNodeId, zNodeProgressState)
|
||||
})
|
||||
|
||||
const zExecutingWsMessage = z.object({
|
||||
node: zNodeId,
|
||||
display_node: zNodeId,
|
||||
prompt_id: zJobId
|
||||
prompt_id: zJobId,
|
||||
workflow_id: zWorkflowId.optional()
|
||||
})
|
||||
|
||||
const zExecutedWsMessage = zExecutingWsMessage.extend({
|
||||
@@ -88,6 +93,7 @@ const zExecutedWsMessage = zExecutingWsMessage.extend({
|
||||
|
||||
const zExecutionWsMessageBase = z.object({
|
||||
prompt_id: zJobId,
|
||||
workflow_id: zWorkflowId.optional(),
|
||||
timestamp: z.number().int()
|
||||
})
|
||||
|
||||
@@ -115,7 +121,8 @@ const zExecutionErrorWsMessage = zExecutionWsMessageBase.extend({
|
||||
const zProgressTextWsMessage = z.object({
|
||||
nodeId: zNodeId,
|
||||
text: z.string(),
|
||||
prompt_id: z.string().optional()
|
||||
prompt_id: z.string().optional(),
|
||||
workflow_id: zWorkflowId.optional()
|
||||
})
|
||||
|
||||
const zNotificationWsMessage = z.object({
|
||||
|
||||
@@ -11,12 +11,22 @@ const {
|
||||
mockNodeExecutionIdToNodeLocatorId,
|
||||
mockNodeIdToNodeLocatorId,
|
||||
mockNodeLocatorIdToNodeExecutionId,
|
||||
mockShowTextPreview
|
||||
mockShowTextPreview,
|
||||
mockActiveWorkflow,
|
||||
mockRevokePreviewsByExecutionId
|
||||
} = vi.hoisted(() => ({
|
||||
mockNodeExecutionIdToNodeLocatorId: vi.fn(),
|
||||
mockNodeIdToNodeLocatorId: vi.fn(),
|
||||
mockNodeLocatorIdToNodeExecutionId: vi.fn(),
|
||||
mockShowTextPreview: vi.fn()
|
||||
mockShowTextPreview: vi.fn(),
|
||||
mockActiveWorkflow: {
|
||||
current: null as null | {
|
||||
activeState?: { id?: string }
|
||||
initialState?: { id?: string }
|
||||
path?: string
|
||||
}
|
||||
},
|
||||
mockRevokePreviewsByExecutionId: vi.fn()
|
||||
}))
|
||||
|
||||
import type * as WorkflowStoreModule from '@/platform/workflow/management/stores/workflowStore'
|
||||
@@ -35,7 +45,10 @@ vi.mock('@/platform/workflow/management/stores/workflowStore', async () => {
|
||||
useWorkflowStore: vi.fn(() => ({
|
||||
nodeExecutionIdToNodeLocatorId: mockNodeExecutionIdToNodeLocatorId,
|
||||
nodeIdToNodeLocatorId: mockNodeIdToNodeLocatorId,
|
||||
nodeLocatorIdToNodeExecutionId: mockNodeLocatorIdToNodeExecutionId
|
||||
nodeLocatorIdToNodeExecutionId: mockNodeLocatorIdToNodeExecutionId,
|
||||
get activeWorkflow() {
|
||||
return mockActiveWorkflow.current
|
||||
}
|
||||
}))
|
||||
}
|
||||
})
|
||||
@@ -70,9 +83,9 @@ vi.mock('@/scripts/api', () => ({
|
||||
}
|
||||
}))
|
||||
|
||||
vi.mock('@/stores/imagePreviewStore', () => ({
|
||||
vi.mock('@/stores/nodeOutputStore', () => ({
|
||||
useNodeOutputStore: () => ({
|
||||
revokePreviewsByExecutionId: vi.fn()
|
||||
revokePreviewsByExecutionId: mockRevokePreviewsByExecutionId
|
||||
})
|
||||
}))
|
||||
|
||||
@@ -491,6 +504,445 @@ describe('useExecutionStore - clearActiveJobIfStale', () => {
|
||||
})
|
||||
})
|
||||
|
||||
describe('useExecutionStore - active workflow gating', () => {
|
||||
let store: ReturnType<typeof useExecutionStore>
|
||||
|
||||
function makeProgressNodes(
|
||||
nodeId: string,
|
||||
jobId: string
|
||||
): Record<string, NodeProgressState> {
|
||||
return {
|
||||
[nodeId]: {
|
||||
value: 5,
|
||||
max: 10,
|
||||
state: 'running',
|
||||
node_id: nodeId,
|
||||
prompt_id: jobId,
|
||||
display_node_id: nodeId
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function fireProgressState(
|
||||
jobId: string,
|
||||
nodes: Record<string, NodeProgressState>,
|
||||
workflowId?: string
|
||||
) {
|
||||
const handler = apiEventHandlers.get('progress_state')
|
||||
if (!handler) throw new Error('progress_state handler not bound')
|
||||
handler(
|
||||
new CustomEvent('progress_state', {
|
||||
detail: { nodes, prompt_id: jobId, workflow_id: workflowId }
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
function fireProgress(
|
||||
jobId: string,
|
||||
nodeId: string,
|
||||
workflowId?: string,
|
||||
value = 5,
|
||||
max = 10
|
||||
) {
|
||||
const handler = apiEventHandlers.get('progress')
|
||||
if (!handler) throw new Error('progress handler not bound')
|
||||
handler(
|
||||
new CustomEvent('progress', {
|
||||
detail: {
|
||||
value,
|
||||
max,
|
||||
prompt_id: jobId,
|
||||
node: nodeId,
|
||||
workflow_id: workflowId
|
||||
}
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
apiEventHandlers.clear()
|
||||
mockActiveWorkflow.current = null
|
||||
setActivePinia(createTestingPinia({ stubActions: false }))
|
||||
store = useExecutionStore()
|
||||
store.bindExecutionEvents()
|
||||
})
|
||||
|
||||
it('always updates per-job progress regardless of active workflow', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
|
||||
fireProgressState(
|
||||
'job-other',
|
||||
makeProgressNodes('1', 'job-other'),
|
||||
'wf-other'
|
||||
)
|
||||
|
||||
expect(store.nodeProgressStatesByJob).toHaveProperty('job-other')
|
||||
})
|
||||
|
||||
it('skips global mirror when message workflow_id mismatches active workflow', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
|
||||
fireProgressState(
|
||||
'job-other',
|
||||
makeProgressNodes('1', 'job-other'),
|
||||
'wf-other'
|
||||
)
|
||||
|
||||
expect(store.nodeProgressStates).toEqual({})
|
||||
})
|
||||
|
||||
it('updates global mirror when message workflow_id matches active workflow', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
|
||||
fireProgressState('job-1', makeProgressNodes('1', 'job-1'), 'wf-active')
|
||||
|
||||
expect(store.nodeProgressStates).toEqual(makeProgressNodes('1', 'job-1'))
|
||||
})
|
||||
|
||||
it('falls back to jobIdToWorkflowId mapping when workflow_id missing', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
store.registerJobWorkflowIdMapping('job-other', 'wf-other')
|
||||
|
||||
fireProgressState('job-other', makeProgressNodes('1', 'job-other'))
|
||||
|
||||
expect(store.nodeProgressStates).toEqual({})
|
||||
})
|
||||
|
||||
it('falls back to session path mapping when no id mapping is registered', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
store.ensureSessionWorkflowPath('job-other', '/wf-other.json')
|
||||
|
||||
fireProgressState('job-other', makeProgressNodes('1', 'job-other'))
|
||||
|
||||
expect(store.nodeProgressStates).toEqual({})
|
||||
})
|
||||
|
||||
it('preserves single-tab behaviour when ownership is unresolvable', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
|
||||
fireProgressState('job-unknown', makeProgressNodes('1', 'job-unknown'))
|
||||
|
||||
expect(store.nodeProgressStates).toEqual(
|
||||
makeProgressNodes('1', 'job-unknown')
|
||||
)
|
||||
})
|
||||
|
||||
it('updates mirror when there is no active workflow', () => {
|
||||
mockActiveWorkflow.current = null
|
||||
|
||||
fireProgressState('job-1', makeProgressNodes('1', 'job-1'), 'wf-1')
|
||||
|
||||
expect(store.nodeProgressStates).toEqual(makeProgressNodes('1', 'job-1'))
|
||||
})
|
||||
|
||||
it('skips preview revocation for non-active workflow messages', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
mockRevokePreviewsByExecutionId.mockClear()
|
||||
|
||||
fireProgressState(
|
||||
'job-other',
|
||||
makeProgressNodes('1', 'job-other'),
|
||||
'wf-other'
|
||||
)
|
||||
|
||||
expect(mockRevokePreviewsByExecutionId).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('revokes previews for active workflow messages', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
mockRevokePreviewsByExecutionId.mockClear()
|
||||
|
||||
fireProgressState('job-1', makeProgressNodes('1', 'job-1'), 'wf-active')
|
||||
|
||||
expect(mockRevokePreviewsByExecutionId).toHaveBeenCalledWith('1')
|
||||
})
|
||||
|
||||
it('skips _executingNodeProgress on workflow_id mismatch', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
|
||||
fireProgress('job-other', '1', 'wf-other')
|
||||
|
||||
expect(store._executingNodeProgress).toBeNull()
|
||||
})
|
||||
|
||||
it('updates _executingNodeProgress on workflow_id match', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
|
||||
fireProgress('job-1', '1', 'wf-active', 7, 10)
|
||||
|
||||
expect(store._executingNodeProgress).toEqual({
|
||||
value: 7,
|
||||
max: 10,
|
||||
prompt_id: 'job-1',
|
||||
node: '1',
|
||||
workflow_id: 'wf-active'
|
||||
})
|
||||
})
|
||||
|
||||
it('execution_start from a non-active workflow does not steal activeJobId', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
const handler = apiEventHandlers.get('execution_start')
|
||||
if (!handler) throw new Error('execution_start handler not bound')
|
||||
handler(
|
||||
new CustomEvent('execution_start', {
|
||||
detail: {
|
||||
prompt_id: 'job-other',
|
||||
timestamp: 0,
|
||||
workflow_id: 'wf-other'
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
expect(store.activeJobId).toBeNull()
|
||||
})
|
||||
|
||||
it('execution_start from active workflow adopts activeJobId', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
const handler = apiEventHandlers.get('execution_start')
|
||||
if (!handler) throw new Error('execution_start handler not bound')
|
||||
handler(
|
||||
new CustomEvent('execution_start', {
|
||||
detail: {
|
||||
prompt_id: 'job-1',
|
||||
timestamp: 0,
|
||||
workflow_id: 'wf-active'
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
expect(store.activeJobId).toBe('job-1')
|
||||
})
|
||||
|
||||
it('execution_success from a non-active workflow does not clear activeJobId', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
const startHandler = apiEventHandlers.get('execution_start')
|
||||
if (!startHandler) throw new Error('execution_start handler not bound')
|
||||
startHandler(
|
||||
new CustomEvent('execution_start', {
|
||||
detail: {
|
||||
prompt_id: 'job-1',
|
||||
timestamp: 0,
|
||||
workflow_id: 'wf-active'
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
const successHandler = apiEventHandlers.get('execution_success')
|
||||
if (!successHandler) throw new Error('execution_success handler not bound')
|
||||
successHandler(
|
||||
new CustomEvent('execution_success', {
|
||||
detail: {
|
||||
prompt_id: 'job-other',
|
||||
timestamp: 0,
|
||||
workflow_id: 'wf-other'
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
expect(store.activeJobId).toBe('job-1')
|
||||
})
|
||||
|
||||
it('execution_interrupted from a non-active workflow does not clear activeJobId', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
const startHandler = apiEventHandlers.get('execution_start')
|
||||
if (!startHandler) throw new Error('execution_start handler not bound')
|
||||
startHandler(
|
||||
new CustomEvent('execution_start', {
|
||||
detail: {
|
||||
prompt_id: 'job-1',
|
||||
timestamp: 0,
|
||||
workflow_id: 'wf-active'
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
const intHandler = apiEventHandlers.get('execution_interrupted')
|
||||
if (!intHandler) throw new Error('execution_interrupted handler not bound')
|
||||
intHandler(
|
||||
new CustomEvent('execution_interrupted', {
|
||||
detail: {
|
||||
prompt_id: 'job-other',
|
||||
timestamp: 0,
|
||||
node_id: '1',
|
||||
node_type: 'X',
|
||||
executed: [],
|
||||
workflow_id: 'wf-other'
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
expect(store.activeJobId).toBe('job-1')
|
||||
})
|
||||
|
||||
it('execution_cached from a non-active workflow does not mark active job nodes', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
const startHandler = apiEventHandlers.get('execution_start')
|
||||
if (!startHandler) throw new Error('execution_start handler not bound')
|
||||
startHandler(
|
||||
new CustomEvent('execution_start', {
|
||||
detail: { prompt_id: 'job-1', timestamp: 0, workflow_id: 'wf-active' }
|
||||
})
|
||||
)
|
||||
|
||||
const cachedHandler = apiEventHandlers.get('execution_cached')
|
||||
if (!cachedHandler) throw new Error('execution_cached handler not bound')
|
||||
cachedHandler(
|
||||
new CustomEvent('execution_cached', {
|
||||
detail: {
|
||||
prompt_id: 'job-other',
|
||||
timestamp: 0,
|
||||
workflow_id: 'wf-other',
|
||||
nodes: ['n1', 'n2']
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
expect(store.activeJob?.nodes).toEqual({})
|
||||
})
|
||||
|
||||
it('executed from a non-active workflow does not mark active job nodes', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
const startHandler = apiEventHandlers.get('execution_start')
|
||||
if (!startHandler) throw new Error('execution_start handler not bound')
|
||||
startHandler(
|
||||
new CustomEvent('execution_start', {
|
||||
detail: { prompt_id: 'job-1', timestamp: 0, workflow_id: 'wf-active' }
|
||||
})
|
||||
)
|
||||
|
||||
const executedHandler = apiEventHandlers.get('executed')
|
||||
if (!executedHandler) throw new Error('executed handler not bound')
|
||||
executedHandler(
|
||||
new CustomEvent('executed', {
|
||||
detail: {
|
||||
prompt_id: 'job-other',
|
||||
node: 'n1',
|
||||
display_node: 'n1',
|
||||
workflow_id: 'wf-other',
|
||||
output: {}
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
expect(store.activeJob?.nodes['n1']).toBeUndefined()
|
||||
})
|
||||
|
||||
it('execution_error from a non-active workflow does not clear active job state but still clears the errored job initializing flag', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
const startHandler = apiEventHandlers.get('execution_start')
|
||||
if (!startHandler) throw new Error('execution_start handler not bound')
|
||||
startHandler(
|
||||
new CustomEvent('execution_start', {
|
||||
detail: { prompt_id: 'job-1', timestamp: 0, workflow_id: 'wf-active' }
|
||||
})
|
||||
)
|
||||
|
||||
store.initializingJobIds = new Set(['job-other'])
|
||||
|
||||
const errorHandler = apiEventHandlers.get('execution_error')
|
||||
if (!errorHandler) throw new Error('execution_error handler not bound')
|
||||
errorHandler(
|
||||
new CustomEvent('execution_error', {
|
||||
detail: {
|
||||
prompt_id: 'job-other',
|
||||
timestamp: 0,
|
||||
workflow_id: 'wf-other',
|
||||
node_id: 'n1',
|
||||
node_type: 'X',
|
||||
executed: [],
|
||||
exception_message: 'oops',
|
||||
exception_type: 'RuntimeError',
|
||||
traceback: [],
|
||||
current_inputs: {},
|
||||
current_outputs: {}
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
expect(store.activeJobId).toBe('job-1')
|
||||
expect(store.initializingJobIds.has('job-other')).toBe(false)
|
||||
expect(useExecutionErrorStore().lastExecutionError).toBeNull()
|
||||
})
|
||||
|
||||
it('revokes preview when node transitions pending -> running', () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
const pendingNodes: Record<string, NodeProgressState> = {
|
||||
n1: {
|
||||
value: 0,
|
||||
max: 10,
|
||||
state: 'pending',
|
||||
node_id: 'n1',
|
||||
prompt_id: 'job-1',
|
||||
display_node_id: 'n1'
|
||||
}
|
||||
}
|
||||
fireProgressState('job-1', pendingNodes, 'wf-active')
|
||||
mockRevokePreviewsByExecutionId.mockClear()
|
||||
|
||||
const runningNodes: Record<string, NodeProgressState> = {
|
||||
n1: { ...pendingNodes.n1, state: 'running', value: 1 }
|
||||
}
|
||||
fireProgressState('job-1', runningNodes, 'wf-active')
|
||||
|
||||
expect(mockRevokePreviewsByExecutionId).toHaveBeenCalledWith('n1')
|
||||
})
|
||||
})
|
||||
|
||||
describe('useExecutionStore - progress_text startup guard', () => {
|
||||
let store: ReturnType<typeof useExecutionStore>
|
||||
|
||||
@@ -498,6 +950,7 @@ describe('useExecutionStore - progress_text startup guard', () => {
|
||||
nodeId: string
|
||||
text: string
|
||||
prompt_id?: string
|
||||
workflow_id?: string
|
||||
}) {
|
||||
const handler = apiEventHandlers.get('progress_text')
|
||||
if (!handler) throw new Error('progress_text handler not bound')
|
||||
@@ -507,6 +960,7 @@ describe('useExecutionStore - progress_text startup guard', () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
apiEventHandlers.clear()
|
||||
mockActiveWorkflow.current = null
|
||||
setActivePinia(createTestingPinia({ stubActions: false }))
|
||||
store = useExecutionStore()
|
||||
store.bindExecutionEvents()
|
||||
@@ -539,6 +993,50 @@ describe('useExecutionStore - progress_text startup guard', () => {
|
||||
|
||||
expect(mockShowTextPreview).toHaveBeenCalledWith(mockNode, 'warming up')
|
||||
})
|
||||
|
||||
it('skips progress_text whose workflow_id mismatches active workflow', async () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
const mockNode = createMockLGraphNode({ id: 1 })
|
||||
const { useCanvasStore } =
|
||||
await import('@/renderer/core/canvas/canvasStore')
|
||||
useCanvasStore().canvas = {
|
||||
graph: { getNodeById: vi.fn(() => mockNode) }
|
||||
} as unknown as LGraphCanvas
|
||||
|
||||
fireProgressText({
|
||||
nodeId: '1',
|
||||
text: 'warming up',
|
||||
prompt_id: 'job-other',
|
||||
workflow_id: 'wf-other'
|
||||
})
|
||||
|
||||
expect(mockShowTextPreview).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('forwards progress_text whose workflow_id matches active workflow', async () => {
|
||||
mockActiveWorkflow.current = {
|
||||
activeState: { id: 'wf-active' },
|
||||
path: '/wf-active.json'
|
||||
}
|
||||
const mockNode = createMockLGraphNode({ id: 1 })
|
||||
const { useCanvasStore } =
|
||||
await import('@/renderer/core/canvas/canvasStore')
|
||||
useCanvasStore().canvas = {
|
||||
graph: { getNodeById: vi.fn(() => mockNode) }
|
||||
} as unknown as LGraphCanvas
|
||||
|
||||
fireProgressText({
|
||||
nodeId: '1',
|
||||
text: 'warming up',
|
||||
prompt_id: 'job-1',
|
||||
workflow_id: 'wf-active'
|
||||
})
|
||||
|
||||
expect(mockShowTextPreview).toHaveBeenCalledWith(mockNode, 'warming up')
|
||||
})
|
||||
})
|
||||
|
||||
describe('useExecutionErrorStore - Node Error Lookups', () => {
|
||||
@@ -818,6 +1316,7 @@ describe('useExecutionStore - WebSocket event handlers', () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
apiEventHandlers.clear()
|
||||
mockActiveWorkflow.current = null
|
||||
setActivePinia(createTestingPinia({ stubActions: false }))
|
||||
store = useExecutionStore()
|
||||
store.bindExecutionEvents()
|
||||
@@ -832,10 +1331,7 @@ describe('useExecutionStore - WebSocket event handlers', () => {
|
||||
})
|
||||
|
||||
it('clears initializing state for the starting job', () => {
|
||||
store.initializingJobIds = new Set([
|
||||
'job-1',
|
||||
'job-2'
|
||||
]) as unknown as Set<string>
|
||||
store.initializingJobIds = new Set(['job-1', 'job-2'])
|
||||
fire('execution_start', { prompt_id: 'job-1', timestamp: 0 })
|
||||
|
||||
expect(store.initializingJobIds.has('job-1')).toBe(false)
|
||||
@@ -926,6 +1422,16 @@ describe('useExecutionStore - WebSocket event handlers', () => {
|
||||
expect(store.activeJobId).toBeNull()
|
||||
expect(store.queuedJobs['job-1']).toBeUndefined()
|
||||
})
|
||||
|
||||
it('clears initializing state for the completed job', () => {
|
||||
store.initializingJobIds = new Set(['job-1', 'job-2'])
|
||||
fire('execution_start', { prompt_id: 'job-1', timestamp: 0 })
|
||||
|
||||
fire('execution_success', { prompt_id: 'job-1', timestamp: 0 })
|
||||
|
||||
expect(store.initializingJobIds.has('job-1')).toBe(false)
|
||||
expect(store.initializingJobIds.has('job-2')).toBe(true)
|
||||
})
|
||||
})
|
||||
|
||||
describe('executing', () => {
|
||||
|
||||
@@ -247,22 +247,32 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
}
|
||||
|
||||
function handleExecutionStart(e: CustomEvent<ExecutionStartWsMessage>) {
|
||||
executionIdToLocatorCache.clear()
|
||||
executionErrorStore.clearAllErrors()
|
||||
activeJobId.value = e.detail.prompt_id
|
||||
queuedJobs.value[activeJobId.value] ??= { nodes: {} }
|
||||
clearInitializationByJobId(activeJobId.value)
|
||||
const jobId = e.detail.prompt_id
|
||||
queuedJobs.value[jobId] ??= { nodes: {} }
|
||||
clearInitializationByJobId(jobId)
|
||||
|
||||
// Ensure path mapping exists — execution_start can arrive via WebSocket
|
||||
// before the HTTP response from queuePrompt triggers storeJob.
|
||||
if (!jobIdToSessionWorkflowPath.value.has(activeJobId.value)) {
|
||||
const path = queuedJobs.value[activeJobId.value]?.workflow?.path
|
||||
if (path) ensureSessionWorkflowPath(activeJobId.value, path)
|
||||
if (!jobIdToSessionWorkflowPath.value.has(jobId)) {
|
||||
const path = queuedJobs.value[jobId]?.workflow?.path
|
||||
if (path) ensureSessionWorkflowPath(jobId, path)
|
||||
}
|
||||
|
||||
// Only adopt as the global active job and clear shared UI state when the
|
||||
// starting job belongs to the active workflow. Otherwise a job started
|
||||
// from another tab would steal activeJobId and clobber the active tab's
|
||||
// execution UI.
|
||||
if (!messageMatchesActiveWorkflow(jobId, e.detail.workflow_id)) return
|
||||
|
||||
executionIdToLocatorCache.clear()
|
||||
executionErrorStore.clearAllErrors()
|
||||
activeJobId.value = jobId
|
||||
}
|
||||
|
||||
function handleExecutionCached(e: CustomEvent<ExecutionCachedWsMessage>) {
|
||||
if (!activeJob.value) return
|
||||
if (!messageMatchesActiveWorkflow(e.detail.prompt_id, e.detail.workflow_id))
|
||||
return
|
||||
for (const n of e.detail.nodes) {
|
||||
activeJob.value.nodes[n] = true
|
||||
}
|
||||
@@ -272,12 +282,15 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
e: CustomEvent<ExecutionInterruptedWsMessage>
|
||||
) {
|
||||
const jobId = e.detail.prompt_id
|
||||
if (activeJobId.value) clearInitializationByJobId(activeJobId.value)
|
||||
clearInitializationByJobId(jobId)
|
||||
if (!messageMatchesActiveWorkflow(jobId, e.detail.workflow_id)) return
|
||||
resetExecutionState(jobId)
|
||||
}
|
||||
|
||||
function handleExecuted(e: CustomEvent<ExecutedWsMessage>) {
|
||||
if (!activeJob.value) return
|
||||
if (!messageMatchesActiveWorkflow(e.detail.prompt_id, e.detail.workflow_id))
|
||||
return
|
||||
activeJob.value.nodes[e.detail.node] = true
|
||||
}
|
||||
|
||||
@@ -288,16 +301,16 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
})
|
||||
}
|
||||
const jobId = e.detail.prompt_id
|
||||
clearInitializationByJobId(jobId)
|
||||
if (!messageMatchesActiveWorkflow(jobId, e.detail.workflow_id)) return
|
||||
resetExecutionState(jobId)
|
||||
}
|
||||
|
||||
function handleExecuting(e: CustomEvent<NodeId | null>): void {
|
||||
// Clear the current node progress when a new node starts executing
|
||||
_executingNodeProgress.value = null
|
||||
|
||||
if (!activeJob.value) return
|
||||
|
||||
// Update the executing nodes list
|
||||
if (typeof e.detail !== 'string') {
|
||||
if (activeJobId.value) {
|
||||
delete queuedJobs.value[activeJobId.value]
|
||||
@@ -335,43 +348,110 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
}
|
||||
|
||||
function handleProgressState(e: CustomEvent<ProgressStateWsMessage>) {
|
||||
const { nodes, prompt_id: jobId } = e.detail
|
||||
const { nodes, prompt_id: jobId, workflow_id: messageWorkflowId } = e.detail
|
||||
const isActiveWorkflowMessage = messageMatchesActiveWorkflow(
|
||||
jobId,
|
||||
messageWorkflowId
|
||||
)
|
||||
|
||||
// Revoke previews for nodes that are starting to execute
|
||||
const previousForJob = nodeProgressStatesByJob.value[jobId] || {}
|
||||
for (const nodeId in nodes) {
|
||||
const nodeState = nodes[nodeId]
|
||||
if (nodeState.state === 'running' && !previousForJob[nodeId]) {
|
||||
// This node just started executing, revoke its previews
|
||||
// Note that we're doing the *actual* node id instead of the display node id
|
||||
// here intentionally. That way, we don't clear the preview every time a new node
|
||||
// within an expanded graph starts executing.
|
||||
const { revokePreviewsByExecutionId } = useNodeOutputStore()
|
||||
revokePreviewsByExecutionId(nodeId)
|
||||
if (isActiveWorkflowMessage) {
|
||||
const { revokePreviewsByExecutionId } = useNodeOutputStore()
|
||||
for (const nodeId in nodes) {
|
||||
const nodeState = nodes[nodeId]
|
||||
if (
|
||||
nodeState.state === 'running' &&
|
||||
previousForJob[nodeId]?.state !== 'running'
|
||||
) {
|
||||
revokePreviewsByExecutionId(nodeId)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update the progress states for all nodes
|
||||
nodeProgressStatesByJob.value = {
|
||||
...nodeProgressStatesByJob.value,
|
||||
[jobId]: nodes
|
||||
}
|
||||
evictOldProgressJobs()
|
||||
nodeProgressStates.value = nodes
|
||||
|
||||
// If we have progress for the currently executing node, update it for backwards compatibility
|
||||
if (executingNodeId.value && nodes[executingNodeId.value]) {
|
||||
const nodeState = nodes[executingNodeId.value]
|
||||
_executingNodeProgress.value = {
|
||||
value: nodeState.value,
|
||||
max: nodeState.max,
|
||||
prompt_id: nodeState.prompt_id,
|
||||
node: nodeState.display_node_id || nodeState.node_id
|
||||
if (isActiveWorkflowMessage) {
|
||||
nodeProgressStates.value = nodes
|
||||
|
||||
if (executingNodeId.value && nodes[executingNodeId.value]) {
|
||||
const nodeState = nodes[executingNodeId.value]
|
||||
_executingNodeProgress.value = {
|
||||
value: nodeState.value,
|
||||
max: nodeState.max,
|
||||
prompt_id: nodeState.prompt_id,
|
||||
node: nodeState.display_node_id || nodeState.node_id
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines whether a WebSocket execution message belongs to the
|
||||
* currently active workflow tab. Used to gate writes to the global
|
||||
* "current execution" mirror so a job initiated from another open
|
||||
* workflow cannot leak its progress into the active one.
|
||||
*
|
||||
* Resolution order:
|
||||
* 1. `workflow_id` carried on the WS message (when backend supports it).
|
||||
* 2. {@link jobIdToWorkflowId} mapping populated when the job was queued
|
||||
* from this tab.
|
||||
* 3. {@link jobIdToSessionWorkflowPath} mapping (path-based fallback).
|
||||
*
|
||||
* When the workflow cannot be resolved at all (e.g. job queued in a
|
||||
* different browser session), the message is treated as belonging to
|
||||
* the active workflow to preserve current behaviour for the existing
|
||||
* single-tab common case.
|
||||
*/
|
||||
function messageMatchesActiveWorkflow(
|
||||
jobId: JobId,
|
||||
messageWorkflowId: string | undefined
|
||||
): boolean {
|
||||
const activeWorkflow = workflowStore.activeWorkflow
|
||||
if (!activeWorkflow) return true
|
||||
|
||||
const activeId =
|
||||
activeWorkflow.activeState?.id ?? activeWorkflow.initialState?.id ?? null
|
||||
|
||||
if (messageWorkflowId && activeId) {
|
||||
return messageWorkflowId === activeId
|
||||
}
|
||||
|
||||
const mappedId = jobIdToWorkflowId.value.get(jobId)
|
||||
if (mappedId && activeId) return mappedId === activeId
|
||||
|
||||
const mappedPath = jobIdToSessionWorkflowPath.value.get(jobId)
|
||||
if (mappedPath && activeWorkflow.path) {
|
||||
return mappedPath === activeWorkflow.path
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true when workflow ownership for {@link jobId} can be resolved
|
||||
* — either by an explicit `workflow_id` on the incoming message or by a
|
||||
* mapping registered when the job was queued. When this returns false
|
||||
* the caller should fall back to whatever legacy guard applied before
|
||||
* workflow gating was introduced.
|
||||
*/
|
||||
function canResolveWorkflowOwnership(
|
||||
jobId: JobId,
|
||||
messageWorkflowId: string | undefined
|
||||
): boolean {
|
||||
return (
|
||||
Boolean(messageWorkflowId) ||
|
||||
jobIdToWorkflowId.value.has(jobId) ||
|
||||
jobIdToSessionWorkflowPath.value.has(jobId)
|
||||
)
|
||||
}
|
||||
|
||||
function handleProgress(e: CustomEvent<ProgressWsMessage>) {
|
||||
const { prompt_id: jobId, workflow_id: messageWorkflowId } = e.detail
|
||||
if (!messageMatchesActiveWorkflow(jobId, messageWorkflowId)) return
|
||||
_executingNodeProgress.value = e.detail
|
||||
}
|
||||
|
||||
@@ -393,17 +473,16 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
error: e.detail.exception_message
|
||||
})
|
||||
|
||||
// Cloud wraps validation errors (400) in exception_message as embedded JSON.
|
||||
if (handleCloudValidationError(e.detail)) return
|
||||
}
|
||||
|
||||
// Service-level errors (e.g. "Job has stagnated") have no associated node.
|
||||
// Route them as job errors
|
||||
if (handleServiceLevelError(e.detail)) return
|
||||
|
||||
// OSS path / Cloud fallback (real runtime errors)
|
||||
executionErrorStore.lastExecutionError = e.detail
|
||||
clearInitializationByJobId(e.detail.prompt_id)
|
||||
if (!messageMatchesActiveWorkflow(e.detail.prompt_id, e.detail.workflow_id))
|
||||
return
|
||||
|
||||
executionErrorStore.lastExecutionError = e.detail
|
||||
resetExecutionState(e.detail.prompt_id)
|
||||
}
|
||||
|
||||
@@ -413,6 +492,9 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
return false
|
||||
|
||||
clearInitializationByJobId(detail.prompt_id)
|
||||
if (!messageMatchesActiveWorkflow(detail.prompt_id, detail.workflow_id))
|
||||
return true
|
||||
|
||||
resetExecutionState(detail.prompt_id)
|
||||
executionErrorStore.lastPromptError = {
|
||||
type: detail.exception_type ?? 'error',
|
||||
@@ -431,6 +513,9 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
if (!result) return false
|
||||
|
||||
clearInitializationByJobId(detail.prompt_id)
|
||||
if (!messageMatchesActiveWorkflow(detail.prompt_id, detail.workflow_id))
|
||||
return true
|
||||
|
||||
resetExecutionState(detail.prompt_id)
|
||||
|
||||
if (result.kind === 'nodeErrors') {
|
||||
@@ -529,14 +614,21 @@ export const useExecutionStore = defineStore('execution', () => {
|
||||
}
|
||||
|
||||
function handleProgressText(e: CustomEvent<ProgressTextWsMessage>) {
|
||||
const { nodeId, text, prompt_id } = e.detail
|
||||
const { nodeId, text, prompt_id, workflow_id } = e.detail
|
||||
if (!text || !nodeId) return
|
||||
|
||||
// Filter: only accept progress for the active prompt
|
||||
if (prompt_id && activeJobId.value && prompt_id !== activeJobId.value)
|
||||
return
|
||||
// Prefer the workflow-ownership gate when ownership can be resolved.
|
||||
// Only fall back to the legacy active-prompt guard when ownership is
|
||||
// unresolvable; otherwise activeJobId pointing at a different workflow's
|
||||
// job would incorrectly drop messages for the visible workflow.
|
||||
if (prompt_id) {
|
||||
if (canResolveWorkflowOwnership(prompt_id, workflow_id)) {
|
||||
if (!messageMatchesActiveWorkflow(prompt_id, workflow_id)) return
|
||||
} else if (activeJobId.value && prompt_id !== activeJobId.value) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Handle execution node IDs for subgraphs
|
||||
const currentId = getNodeIdIfExecuting(nodeId)
|
||||
if (!currentId) return
|
||||
const node = canvasStore.canvas?.graph?.getNodeById(currentId)
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import { createTestingPinia } from '@pinia/testing'
|
||||
import ProgressSpinner from 'primevue/progressspinner'
|
||||
import { beforeEach, describe, expect, it, vi } from 'vitest'
|
||||
import { ref } from 'vue'
|
||||
|
||||
import { render, screen } from '@testing-library/vue'
|
||||
|
||||
@@ -57,8 +56,7 @@ vi.mock('@vueuse/core', () => ({
|
||||
createSharedComposable: vi.fn((fn) => {
|
||||
let cached: ReturnType<typeof fn>
|
||||
return (...args: Parameters<typeof fn>) => (cached ??= fn(...args))
|
||||
}),
|
||||
useDocumentVisibility: vi.fn(() => ref<'visible' | 'hidden'>('visible'))
|
||||
})
|
||||
}))
|
||||
|
||||
vi.mock('@/config', () => ({
|
||||
|
||||
Reference in New Issue
Block a user