Compare commits

..

6 Commits

Author SHA1 Message Date
Glary-Bot
1d14f1313c test: drop unnecessary Set cast and cover initializing clear on execution_success 2026-05-13 19:58:01 +00:00
Glary-Bot
6a303494d4 fix: preserve backwards-compat 'executing' payload + plug initializing leak
Revert the 'executing' WS event payload back to the legacy
NodeId-only shape so extensions consuming api.addEventListener('executing')
keep working. The extension ecosystem (40+ custom node repos per
AGENTS.md) accesses e.detail directly as a string, and the marginal
workflow gating we added on handleExecuting did not justify the
breaking change.

Drop the corresponding gate in handleExecuting and the groupNode
forwarding-wrapper changes that depended on the object payload. The
gating on progress, progress_state, progress_text, execution_start,
execution_success, execution_interrupted, execution_error, execution_cached,
and executed remains intact (those events carry prompt_id and workflow_id
natively in their WS payload).

Additional fixes from review:
- handleExecutionSuccess now calls clearInitializationByJobId(jobId)
  before the workflow gate so a non-active workflow job's initializing
  flag still clears, matching handleExecutionInterrupted and
  handleExecutionError.
- Extract canResolveWorkflowOwnership helper and reuse it in
  handleProgressText to remove the duplicated resolution check.
2026-05-13 19:58:01 +00:00
Glary-Bot
11ef8c9337 test: tighten active-workflow gating coverage
Address two coverage nitpicks from review:

- The 'executing clears _executingNodeProgress' test in the
  WebSocket-event-handlers suite did not reset mockActiveWorkflow.current
  in beforeEach and did not pass workflow_id, so it relied on the
  hoisted mock leaking from earlier suites and only ever exercised the
  unresolvable-ownership fallback. Reset the mock per-test, set an
  active workflow explicitly with workflow_id: 'wf-active', and add a
  separate test for the legacy unresolvable-ownership fallback.

- The 'execution_error from a non-active workflow' test only proved the
  active job's state was untouched. It now also asserts the errored
  job's initializing flag is cleared (the per-job bookkeeping that runs
  unconditionally) and that executionErrorStore.lastExecutionError stays
  null (proving the global gate held).
2026-05-13 19:58:01 +00:00
Glary-Bot
60828f2286 fix: gate execution_cached, executed, execution_error and revoke transitions
Three handlers and one revocation rule were missed in the first gating
pass and could still let a non-active workflow's events bleed into the
active workflow's UI:

- handleExecutionCached and handleExecuted mutated activeJob.value.nodes
  unconditionally, skewing executionProgress / nodesExecuted on the
  visible workflow when a background workflow emitted those events.
- handleExecutionError, handleServiceLevelError and handleCloudValidationError
  called resetExecutionState and wrote executionErrorStore.* unconditionally,
  meaning a background error wiped activeJobId and node progress for the
  visible workflow. Initialization clearing for the errored job still
  runs in every case.
- handleProgressState revoked previews only when a node was first seen
  (!previousForJob[nodeId]). Once progress_state begins emitting pending
  entries that node is already 'seen', so the pending->running
  transition never revoked. Switch to checking previous state.

groupNode forwarder: reinstate the legacy string detail path in the
'executing' id-extractor so callers still dispatching the pre-change
string payload keep bubbling execution up to the group node.

4 new unit tests cover: execution_cached gated, executed gated,
execution_error gated, and pending->running preview revocation.
2026-05-13 19:58:01 +00:00
Glary-Bot
3c3b0872c3 feat: gate execution lifecycle handlers and executing event by active workflow
Extend the active-workflow gate to lifecycle events that previously
operated globally:

- handleExecutionStart: only adopts activeJobId / clears shared UI state
  when the starting job belongs to the active workflow. Per-job
  bookkeeping (queuedJobs, jobIdToSessionWorkflowPath, initialization
  clearing) still runs for every job.
- handleExecutionSuccess / handleExecutionInterrupted: only call
  resetExecutionState when the terminating job belongs to the active
  workflow. Initialization clearing for the terminated job still runs.
- handleExecuting: now receives the full ExecutingWsMessage from the
  api dispatcher (instead of just NodeId) and gates _executingNodeProgress
  clearing on workflow ownership.

To support handleExecuting gating, drop the ApiToEventType override that
narrowed the executing event to NodeId, forward the full payload through
api.dispatchCustomEvent, and update the groupNode forwarding wrapper to
extract display_node/node from the new object detail and synthesise a
matching ExecutingWsMessage when re-dispatching.

5 new tests cover the cross-workflow cases: execution_start from a
non-active workflow does not steal activeJobId, execution_success and
execution_interrupted from a non-active workflow do not clear the
active job's state, executing from a non-active workflow does not
clear _executingNodeProgress, and execution_start from the active
workflow still adopts activeJobId.

Eventual-consistency cleanup of non-active terminal jobs (which used to
be handled by the dropped polling/eviction code) will be implemented in
a follow-up PR using a reactive watcher pattern over a derived
finishedJobs set.
2026-05-13 19:58:01 +00:00
Glary-Bot
7b2b974968 feat: scope progress events and UI state by active workflow
When multiple workflow tabs are open and a job initiated from one tab
sends progress messages, those messages can leak into the active tab's
canvas because the global nodeProgressStates mirror, _executingNodeProgress,
and progress_text preview state are written unconditionally.

Add an optional workflow_id field to the WS schema for execution-related
messages (progress, progress_state, executing, executed, progress_text,
execution_start/success/cached/interrupted/error, NodeProgressState),
and gate handleProgressState, handleProgress, and handleProgressText on
whether the incoming message belongs to the currently active workflow.

Resolution order for ownership:
1. workflow_id carried on the WS message (when backend supports it).
2. jobIdToWorkflowId mapping populated when the job was queued from this tab.
3. jobIdToSessionWorkflowPath mapping (path-based fallback).

When ownership is unresolvable (e.g. job queued in a different browser
session), the message is treated as belonging to the active workflow to
preserve current single-tab behaviour. Per-job state
(nodeProgressStatesByJob) is always written regardless of ownership so
the source of truth covers every workflow's jobs.

handleProgressText prefers the workflow gate over the legacy activeJobId
guard when ownership can be resolved, since activeJobId is global and may
point at a different workflow's job — falling through to the legacy
guard only when ownership is genuinely unresolvable.

12 new unit tests cover workflow_id match/mismatch, both fallback
resolution paths, default-to-legacy when unresolvable, no-active-workflow
short-circuit, per-job state always updating, preview revocation gating,
_executingNodeProgress gating, and progress_text gating.

Eventual-consistency fallback for dropped terminal WebSocket messages
will be addressed in a follow-up PR using a reactive watcher pattern
rather than queue polling, per design discussion.
2026-05-13 19:58:01 +00:00
20 changed files with 668 additions and 945 deletions

View File

@@ -17,7 +17,6 @@
"test:visual:update": "playwright test --project visual --update-snapshots",
"ashby:refresh-snapshot": "tsx ./scripts/refresh-ashby-snapshot.ts",
"cloud-nodes:refresh-snapshot": "tsx ./scripts/refresh-cloud-nodes-snapshot.ts",
"feature-flags:refresh-snapshot": "tsx ./scripts/refresh-feature-flags-snapshot.ts",
"generate:models": "tsx ./scripts/generate-models.ts"
},
"dependencies": {

View File

@@ -1,29 +0,0 @@
import { renameSync, writeFileSync } from 'node:fs'
import { fileURLToPath } from 'node:url'
import { fetchFeatureFlagsForBuild } from '../src/utils/featureFlags'
const snapshotPath = fileURLToPath(
new URL('../src/data/feature-flags.snapshot.json', import.meta.url)
)
const tempPath = `${snapshotPath}.tmp`
const outcome = await fetchFeatureFlagsForBuild()
if (outcome.status !== 'fresh') {
const reason = 'reason' in outcome ? outcome.reason : '(none)'
console.error(
`Snapshot refresh aborted. Outcome: ${outcome.status}; reason: ${reason}`
)
process.exit(1)
}
writeFileSync(
tempPath,
JSON.stringify(outcome.snapshot, null, 2) + '\n',
'utf8'
)
renameSync(tempPath, snapshotPath)
process.stdout.write(
`Wrote feature flags snapshot to ${snapshotPath}: cloudFreeTier=${outcome.snapshot.flags.cloudFreeTier}\n`
)

View File

@@ -1,3 +1 @@
import snapshot from '../data/feature-flags.snapshot.json' with { type: 'json' }
export const SHOW_FREE_TIER = snapshot.flags.cloudFreeTier
export const SHOW_FREE_TIER = false

View File

@@ -1,6 +0,0 @@
{
"fetchedAt": "2026-05-13T20:30:41.221Z",
"flags": {
"cloudFreeTier": false
}
}

View File

@@ -1,6 +0,0 @@
export interface FeatureFlagsSnapshot {
fetchedAt: string
flags: {
cloudFreeTier: boolean
}
}

View File

@@ -1,112 +0,0 @@
import { mkdtempSync, readFileSync, rmSync, writeFileSync } from 'node:fs'
import { tmpdir } from 'node:os'
import { join } from 'node:path'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import type { FetchOutcome } from './featureFlags'
import type { FeatureFlagsSnapshot } from '../data/feature-flags'
import {
reportFeatureFlagsOutcome,
resetFeatureFlagsReporterForTests
} from './featureFlags.ci'
function baseSnapshot(cloudFreeTier = false): FeatureFlagsSnapshot {
return {
fetchedAt: new Date().toISOString(),
flags: { cloudFreeTier }
}
}
function freshOutcome(cloudFreeTier = false): FetchOutcome {
return { status: 'fresh', snapshot: baseSnapshot(cloudFreeTier) }
}
describe('reportFeatureFlagsOutcome', () => {
let writeSpy: ReturnType<typeof vi.spyOn>
let summaryDir: string
let summaryPath: string
const originalSummary = process.env.GITHUB_STEP_SUMMARY
beforeEach(() => {
resetFeatureFlagsReporterForTests()
writeSpy = vi.spyOn(process.stdout, 'write').mockImplementation(() => true)
summaryDir = mkdtempSync(join(tmpdir(), 'feature-flags-summary-'))
summaryPath = join(summaryDir, 'summary.md')
writeFileSync(summaryPath, '')
process.env.GITHUB_STEP_SUMMARY = summaryPath
})
afterEach(() => {
writeSpy.mockRestore()
rmSync(summaryDir, { recursive: true, force: true })
if (originalSummary === undefined) delete process.env.GITHUB_STEP_SUMMARY
else process.env.GITHUB_STEP_SUMMARY = originalSummary
})
it('emits nothing on a fresh outcome', () => {
reportFeatureFlagsOutcome(freshOutcome())
expect(writeSpy).not.toHaveBeenCalled()
expect(readFileSync(summaryPath, 'utf8')).toContain('Fresh')
})
it('records cloudFreeTier value in the step summary', () => {
reportFeatureFlagsOutcome(freshOutcome(true))
expect(readFileSync(summaryPath, 'utf8')).toContain(
'| **cloudFreeTier** | true |'
)
})
it('emits exactly one annotation across repeated calls', () => {
reportFeatureFlagsOutcome({
status: 'stale',
reason: 'HTTP 500 Server Error',
snapshot: baseSnapshot()
})
reportFeatureFlagsOutcome({
status: 'stale',
reason: 'HTTP 500 Server Error',
snapshot: baseSnapshot()
})
expect(writeSpy).toHaveBeenCalledTimes(1)
})
it('emits ::error for schema-mismatch stale outcomes', () => {
reportFeatureFlagsOutcome({
status: 'stale',
reason:
'schema validation failed: new_free_tier_subscriptions: Expected boolean',
snapshot: baseSnapshot()
})
const annotation = writeSpy.mock.calls[0]![0] as string
expect(annotation).toContain('::error title=Feature flags schema mismatch')
})
it('emits ::warning for transient API unavailability', () => {
reportFeatureFlagsOutcome({
status: 'stale',
reason: 'HTTP 503 Service Unavailable',
snapshot: baseSnapshot()
})
const annotation = writeSpy.mock.calls[0]![0] as string
expect(annotation).toContain(
'::warning title=Feature flags API unavailable'
)
})
it('emits ::error for a failed outcome', () => {
reportFeatureFlagsOutcome({
status: 'failed',
reason: 'HTTP 500 Server Error'
})
const annotation = writeSpy.mock.calls[0]![0] as string
expect(annotation).toContain('::error title=Feature flags fetch failed')
expect(readFileSync(summaryPath, 'utf8')).toContain('Failed')
})
it('does not throw when GITHUB_STEP_SUMMARY is not set', () => {
delete process.env.GITHUB_STEP_SUMMARY
expect(() => reportFeatureFlagsOutcome(freshOutcome())).not.toThrow()
})
})

View File

@@ -1,92 +0,0 @@
import { appendFileSync } from 'node:fs'
import type { FetchOutcome } from './featureFlags'
let hasReported = false
export function resetFeatureFlagsReporterForTests(): void {
hasReported = false
}
export function reportFeatureFlagsOutcome(outcome: FetchOutcome): void {
if (hasReported) return
hasReported = true
const lines = buildAnnotations(outcome)
for (const line of lines) {
process.stdout.write(`${line}\n`)
}
const summaryPath = process.env.GITHUB_STEP_SUMMARY
if (summaryPath) {
try {
appendFileSync(summaryPath, buildStepSummary(outcome))
} catch (error) {
const message = error instanceof Error ? error.message : String(error)
process.stderr.write(
`feature-flags reporter: failed to write GITHUB_STEP_SUMMARY: ${message}\n`
)
}
}
}
function buildAnnotations(outcome: FetchOutcome): string[] {
if (outcome.status === 'fresh') return []
if (outcome.status === 'stale') {
return [staleAnnotation(outcome.reason)]
}
return [
`::error title=Feature flags fetch failed and no snapshot is available::Cannot build site without feature flags.%0A%0AReason: ${escapeAnnotation(outcome.reason)}%0A%0AAction items:%0A 1. Run \`pnpm --filter @comfyorg/website feature-flags:refresh-snapshot\` locally.%0A 2. Commit apps/website/src/data/feature-flags.snapshot.json.%0A 3. Push and re-run CI.`
]
}
function staleAnnotation(reason: string): string {
const escaped = escapeAnnotation(reason)
if (reason.startsWith('schema')) {
return `::error title=Feature flags schema mismatch::${escaped}. The /features API contract has likely changed. Build continues with the snapshot, but future updates will fail until the schema is fixed.%0A%0AAction items:%0A 1. Inspect the response at https://api.comfy.org/features.%0A 2. Update apps/website/src/utils/featureFlags.schema.ts to match the new shape.`
}
if (reason.startsWith('HTTP 401') || reason.startsWith('HTTP 403')) {
return `::error title=Feature flags authentication failed::${escaped}. The /features endpoint should be public; check the backend. Build continues with the last-known-good snapshot.`
}
return `::warning title=Feature flags API unavailable::${escaped}. Using last-known-good snapshot.%0A%0AAction items:%0A 1. Check the status of https://api.comfy.org/features.%0A 2. Re-run this workflow once the API is healthy.`
}
function escapeAnnotation(value: string): string {
return value.replace(/%/g, '%25').replace(/\r/g, '%0D').replace(/\n/g, '%0A')
}
function buildStepSummary(outcome: FetchOutcome): string {
const header = '## 🚩 Feature Flags (/features)\n'
const rows: Array<[string, string]> = []
if (outcome.status === 'fresh') {
rows.push(['Status', '✅ Fresh (fetched from /features)'])
rows.push(['cloudFreeTier', String(outcome.snapshot.flags.cloudFreeTier)])
} else if (outcome.status === 'stale') {
rows.push(['Status', '⚠️ Stale (using snapshot — /features fetch failed)'])
rows.push(['cloudFreeTier', String(outcome.snapshot.flags.cloudFreeTier)])
rows.push(['Reason', outcome.reason])
rows.push(['Snapshot age', describeSnapshotAge(outcome.snapshot.fetchedAt)])
} else {
rows.push(['Status', '❌ Failed (no snapshot available)'])
rows.push(['Reason', outcome.reason])
}
const table =
'| | |\n|---|---|\n' +
rows.map(([k, v]) => `| **${k}** | ${v} |`).join('\n') +
'\n'
return `${header}${table}\n`
}
function describeSnapshotAge(fetchedAt: string): string {
const fetched = new Date(fetchedAt).getTime()
if (Number.isNaN(fetched)) return 'unknown'
const days = Math.floor((Date.now() - fetched) / 86_400_000)
if (days <= 0) return 'today'
if (days === 1) return '1 day'
return `${days} days`
}

View File

@@ -1,11 +0,0 @@
import { z } from 'zod'
export const FeaturesResponseSchema = z
.object({
new_free_tier_subscriptions: z.boolean().optional(),
free_tier_credits: z.number().optional(),
partner_node_conversion_rate: z.number().optional()
})
.passthrough()
export type FeaturesResponse = z.infer<typeof FeaturesResponseSchema>

View File

@@ -1,190 +0,0 @@
import { mkdtempSync, rmSync, writeFileSync } from 'node:fs'
import { tmpdir } from 'node:os'
import { join } from 'node:path'
import { pathToFileURL } from 'node:url'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import type { FeatureFlagsSnapshot } from '../data/feature-flags'
import {
fetchFeatureFlagsForBuild,
resetFeatureFlagsFetcherForTests
} from './featureFlags'
const BASE_URL = 'https://api.test'
const tempSnapshotDirs: string[] = []
function response(body: unknown, init: Partial<ResponseInit> = {}): Response {
const base: ResponseInit = {
status: 200,
headers: { 'content-type': 'application/json' }
}
return new Response(JSON.stringify(body), { ...base, ...init })
}
function makeSnapshot(cloudFreeTier: boolean): FeatureFlagsSnapshot {
return {
fetchedAt: '2026-04-01T00:00:00.000Z',
flags: { cloudFreeTier }
}
}
function withSnapshotDir(snapshot: FeatureFlagsSnapshot | null): URL {
const dir = mkdtempSync(join(tmpdir(), 'feature-flags-test-'))
tempSnapshotDirs.push(dir)
const file = join(dir, 'feature-flags.snapshot.json')
if (snapshot) writeFileSync(file, JSON.stringify(snapshot))
return pathToFileURL(file)
}
describe('fetchFeatureFlagsForBuild', () => {
beforeEach(() => {
resetFeatureFlagsFetcherForTests()
})
afterEach(() => {
for (const dir of tempSnapshotDirs.splice(0)) {
rmSync(dir, { recursive: true, force: true })
}
vi.restoreAllMocks()
})
it('returns fresh with cloudFreeTier=true when /features sets the flag', async () => {
const fetchImpl = vi.fn(async () =>
response({ new_free_tier_subscriptions: true })
)
const outcome = await fetchFeatureFlagsForBuild({
baseUrl: BASE_URL,
fetchImpl: fetchImpl as unknown as typeof fetch
})
expect(outcome.status).toBe('fresh')
if (outcome.status !== 'fresh') return
expect(outcome.snapshot.flags.cloudFreeTier).toBe(true)
expect(fetchImpl).toHaveBeenCalledWith(
`${BASE_URL}/features`,
expect.objectContaining({ method: 'GET' })
)
})
it('defaults cloudFreeTier to false when the flag is absent from /features', async () => {
const fetchImpl = vi.fn(async () =>
response({ partner_node_conversion_rate: 0.05 })
)
const outcome = await fetchFeatureFlagsForBuild({
baseUrl: BASE_URL,
fetchImpl: fetchImpl as unknown as typeof fetch
})
expect(outcome.status).toBe('fresh')
if (outcome.status !== 'fresh') return
expect(outcome.snapshot.flags.cloudFreeTier).toBe(false)
})
it('returns fresh with cloudFreeTier=false when explicitly disabled', async () => {
const fetchImpl = vi.fn(async () =>
response({ new_free_tier_subscriptions: false })
)
const outcome = await fetchFeatureFlagsForBuild({
baseUrl: BASE_URL,
fetchImpl: fetchImpl as unknown as typeof fetch
})
expect(outcome.status).toBe('fresh')
if (outcome.status !== 'fresh') return
expect(outcome.snapshot.flags.cloudFreeTier).toBe(false)
})
it('returns stale with snapshot when the API returns 401', async () => {
const snapshotUrl = withSnapshotDir(makeSnapshot(true))
const fetchImpl = vi.fn(async () => response({}, { status: 401 }))
const outcome = await fetchFeatureFlagsForBuild({
baseUrl: BASE_URL,
snapshotUrl,
fetchImpl: fetchImpl as unknown as typeof fetch
})
expect(outcome.status).toBe('stale')
if (outcome.status !== 'stale') return
expect(outcome.reason).toMatch(/^HTTP 401/)
expect(outcome.snapshot.flags.cloudFreeTier).toBe(true)
expect(fetchImpl).toHaveBeenCalledTimes(1)
})
it('retries 5xx up to the configured limit then falls back to snapshot', async () => {
const snapshotUrl = withSnapshotDir(makeSnapshot(false))
const fetchImpl = vi.fn(async () => response({}, { status: 503 }))
const sleep = vi.fn(async () => undefined)
const outcome = await fetchFeatureFlagsForBuild({
baseUrl: BASE_URL,
snapshotUrl,
retryDelaysMs: [1, 1, 1],
sleep,
fetchImpl: fetchImpl as unknown as typeof fetch
})
expect(outcome.status).toBe('stale')
expect(fetchImpl).toHaveBeenCalledTimes(4)
expect(sleep).toHaveBeenCalledTimes(3)
})
it('falls back to snapshot on schema validation failure', async () => {
const snapshotUrl = withSnapshotDir(makeSnapshot(false))
const fetchImpl = vi.fn(async () =>
response({ new_free_tier_subscriptions: 'yes' })
)
const outcome = await fetchFeatureFlagsForBuild({
baseUrl: BASE_URL,
snapshotUrl,
fetchImpl: fetchImpl as unknown as typeof fetch
})
expect(outcome.status).toBe('stale')
if (outcome.status !== 'stale') return
expect(outcome.reason).toMatch(/^schema validation/)
})
it('falls back to the bundled snapshot when fetch fails and the override is missing', async () => {
const snapshotUrl = withSnapshotDir(null)
const fetchImpl = vi.fn(async () => response({}, { status: 500 }))
const sleep = vi.fn(async () => undefined)
const outcome = await fetchFeatureFlagsForBuild({
baseUrl: BASE_URL,
snapshotUrl,
retryDelaysMs: [1, 1, 1],
sleep,
fetchImpl: fetchImpl as unknown as typeof fetch
})
expect(outcome.status).toBe('stale')
if (outcome.status !== 'stale') return
expect(outcome.snapshot.flags.cloudFreeTier).toBe(false)
})
it('memoizes within a single process', async () => {
const fetchImpl = vi.fn(async () =>
response({ new_free_tier_subscriptions: true })
)
const opts = {
baseUrl: BASE_URL,
fetchImpl: fetchImpl as unknown as typeof fetch
}
const [a, b] = await Promise.all([
fetchFeatureFlagsForBuild(opts),
fetchFeatureFlagsForBuild(opts)
])
expect(a).toBe(b)
expect(fetchImpl).toHaveBeenCalledTimes(1)
})
it('never writes to the snapshot file on success', async () => {
const snapshot = makeSnapshot(true)
const snapshotUrl = withSnapshotDir(snapshot)
const fs = await import('node:fs')
const initial = fs.readFileSync(snapshotUrl).toString()
const fetchImpl = vi.fn(async () =>
response({ new_free_tier_subscriptions: false })
)
await fetchFeatureFlagsForBuild({
baseUrl: BASE_URL,
snapshotUrl,
fetchImpl: fetchImpl as unknown as typeof fetch
})
const after = fs.readFileSync(snapshotUrl).toString()
expect(after).toBe(initial)
})
})

View File

@@ -1,189 +0,0 @@
import { readFile } from 'node:fs/promises'
import type { FeaturesResponse } from './featureFlags.schema'
import type { FeatureFlagsSnapshot } from '../data/feature-flags'
import { FeaturesResponseSchema } from './featureFlags.schema'
import bundledSnapshot from '../data/feature-flags.snapshot.json' with { type: 'json' }
const DEFAULT_BASE_URL = 'https://api.comfy.org'
const DEFAULT_TIMEOUT_MS = 10_000
const RETRY_DELAYS_MS = [1_000, 2_000, 4_000]
export type FetchOutcome =
| { status: 'fresh'; snapshot: FeatureFlagsSnapshot }
| { status: 'stale'; snapshot: FeatureFlagsSnapshot; reason: string }
| { status: 'failed'; reason: string }
interface FetchFeatureFlagsOptions {
baseUrl?: string
timeoutMs?: number
retryDelaysMs?: readonly number[]
fetchImpl?: typeof fetch
snapshotUrl?: URL
sleep?: (ms: number) => Promise<void>
}
let inflight: Promise<FetchOutcome> | undefined
export function resetFeatureFlagsFetcherForTests(): void {
inflight = undefined
}
export function fetchFeatureFlagsForBuild(
options: FetchFeatureFlagsOptions = {}
): Promise<FetchOutcome> {
inflight ??= doFetchFeatureFlagsForBuild(options)
return inflight
}
async function doFetchFeatureFlagsForBuild(
options: FetchFeatureFlagsOptions
): Promise<FetchOutcome> {
const result = await tryFetchAndParse(options)
if (result.kind === 'ok') {
return {
status: 'fresh',
snapshot: {
fetchedAt: new Date().toISOString(),
flags: deriveFlags(result.features)
}
}
}
return fallback(result.reason, options.snapshotUrl)
}
async function fallback(
reason: string,
snapshotUrl: URL | undefined
): Promise<FetchOutcome> {
const snapshot = await readSnapshot(snapshotUrl)
if (snapshot) return { status: 'stale', snapshot, reason }
return { status: 'failed', reason }
}
interface FetchOk {
kind: 'ok'
features: FeaturesResponse
}
interface FetchErr {
kind: 'err'
reason: string
}
async function tryFetchAndParse(
options: FetchFeatureFlagsOptions
): Promise<FetchOk | FetchErr> {
const baseUrl = options.baseUrl ?? DEFAULT_BASE_URL
const timeoutMs = options.timeoutMs ?? DEFAULT_TIMEOUT_MS
const retryDelaysMs = options.retryDelaysMs ?? RETRY_DELAYS_MS
const fetchImpl = options.fetchImpl ?? fetch
const sleep = options.sleep ?? defaultSleep
const url = `${baseUrl.replace(/\/+$/, '')}/features`
let lastReason = 'unknown error'
for (let attempt = 0; attempt <= retryDelaysMs.length; attempt++) {
if (attempt > 0) await sleep(retryDelaysMs[attempt - 1])
const response = await callOnce(fetchImpl, url, timeoutMs)
if (response.kind === 'err') {
lastReason = response.reason
if (!response.retryable) return response
continue
}
const parsed = FeaturesResponseSchema.safeParse(response.body)
if (!parsed.success) {
return {
kind: 'err',
reason: `schema validation failed: ${parsed.error.issues
.map((i) => `${i.path.join('.') || '<root>'}: ${i.message}`)
.join('; ')}`
}
}
return { kind: 'ok', features: parsed.data }
}
return { kind: 'err', reason: lastReason }
}
type CallResponse =
| { kind: 'ok'; body: unknown }
| { kind: 'err'; reason: string; retryable: boolean }
async function callOnce(
fetchImpl: typeof fetch,
url: string,
timeoutMs: number
): Promise<CallResponse> {
const controller = new AbortController()
const timer = setTimeout(() => controller.abort(), timeoutMs)
try {
const res = await fetchImpl(url, {
method: 'GET',
headers: { Accept: 'application/json' },
signal: controller.signal
})
if (res.ok) {
return { kind: 'ok', body: await res.json() }
}
const retryable =
res.status === 429 || (res.status >= 500 && res.status < 600)
return {
kind: 'err',
reason: `HTTP ${res.status} ${res.statusText || ''}`.trim(),
retryable
}
} catch (error) {
const reason =
error instanceof Error
? `network error: ${error.message}`
: 'network error'
return { kind: 'err', reason, retryable: true }
} finally {
clearTimeout(timer)
}
}
function deriveFlags(
features: FeaturesResponse
): FeatureFlagsSnapshot['flags'] {
return {
cloudFreeTier: features.new_free_tier_subscriptions ?? false
}
}
async function readSnapshot(
snapshotUrl: URL | undefined
): Promise<FeatureFlagsSnapshot | null> {
if (snapshotUrl) {
try {
const text = await readFile(snapshotUrl, 'utf8')
const parsed: unknown = JSON.parse(text)
if (isFeatureFlagsSnapshot(parsed)) return parsed
} catch {
// Fall through to the bundled snapshot if the override is unreadable.
}
}
return isFeatureFlagsSnapshot(bundledSnapshot) ? bundledSnapshot : null
}
function isFeatureFlagsSnapshot(value: unknown): value is FeatureFlagsSnapshot {
if (value === null || typeof value !== 'object') return false
const candidate = value as { fetchedAt?: unknown; flags?: unknown }
if (typeof candidate.fetchedAt !== 'string') return false
if (candidate.flags === null || typeof candidate.flags !== 'object') {
return false
}
const flags = candidate.flags as { cloudFreeTier?: unknown }
return typeof flags.cloudFreeTier === 'boolean'
}
function defaultSleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms))
}

View File

@@ -1,90 +0,0 @@
{
"id": "06e5b524-5a40-40b9-b561-199dfab18cf0",
"revision": 0,
"last_node_id": 12,
"last_link_id": 10,
"nodes": [
{
"id": 10,
"type": "KSampler",
"pos": [230, 110],
"size": [270, 317.5666809082031],
"flags": {},
"order": 1,
"mode": 0,
"inputs": [
{
"name": "model",
"type": "MODEL",
"link": null
},
{
"name": "positive",
"type": "CONDITIONING",
"link": null
},
{
"name": "negative",
"type": "CONDITIONING",
"link": null
},
{
"name": "latent_image",
"type": "LATENT",
"link": null
},
{
"name": "denoise",
"type": "FLOAT",
"widget": {
"name": "denoise"
},
"link": 10
}
],
"outputs": [
{
"name": "LATENT",
"type": "LATENT",
"links": null
}
],
"properties": {
"Node name for S&R": "KSampler"
},
"widgets_values": [0, "randomize", 20, 8, "euler", "simple", 1]
},
{
"id": 11,
"type": "PrimitiveFloat",
"pos": [-80.55032348632812, 375.2260443115233],
"size": [270, 80.23332977294922],
"flags": {},
"order": 0,
"mode": 0,
"inputs": [],
"outputs": [
{
"name": "FLOAT",
"type": "FLOAT",
"links": [10]
}
],
"properties": {
"Node name for S&R": "PrimitiveFloat"
},
"widgets_values": [0]
}
],
"links": [[10, 11, 0, 10, 4, "FLOAT"]],
"groups": [],
"config": {},
"extra": {
"ds": {
"scale": 0.8264462809917354,
"offset": [1335.8909766107738, 692.7345403667316]
},
"frontendVersion": "1.45.4"
},
"version": 0.4
}

View File

@@ -1133,108 +1133,3 @@ test.describe(
})
}
)
test.describe('Vue Node Widget Link Position', { tag: '@vue-nodes' }, () => {
test('should keep widget-input link aligned after persisted-workflow reload', async ({
comfyPage
}) => {
test.setTimeout(30000)
await comfyPage.workflow.loadWorkflow(
'vueNodes/ksampler-denoise-widget-link'
)
await comfyPage.vueNodes.waitForNodes(2)
await comfyPage.workflow.waitForDraftPersisted()
await comfyPage.workflow.reloadAndWaitForApp()
await comfyPage.vueNodes.waitForNodes(2)
const ksampler = await comfyPage.page.evaluate(() => {
const node = window.app!.graph.nodes.find((n) => n.type === 'KSampler')
if (!node) return null
const findIndex = (name: string) =>
node.inputs.findIndex(
(input) => input.name === name || input.widget?.name === name
)
return {
id: node.id,
denoiseIndex: findIndex('denoise'),
schedulerIndex: findIndex('scheduler')
}
})
if (!ksampler) {
throw new Error('KSampler should be present in fixture')
}
expect(
ksampler.denoiseIndex,
'denoise input slot not found'
).toBeGreaterThanOrEqual(0)
expect(
ksampler.schedulerIndex,
'scheduler input slot not found'
).toBeGreaterThanOrEqual(0)
const denoiseSlot = slotLocator(
comfyPage.page,
ksampler.id,
ksampler.denoiseIndex,
true
)
const schedulerSlot = slotLocator(
comfyPage.page,
ksampler.id,
ksampler.schedulerIndex,
true
)
await expectVisibleAll(denoiseSlot, schedulerSlot)
await expect
.poll(() =>
getInputLinkDetails(comfyPage.page, ksampler.id, ksampler.denoiseIndex)
)
.toMatchObject({
targetId: ksampler.id,
targetSlot: ksampler.denoiseIndex
})
// If the regression returns, getInputPos stays stale relative to the
// grown slot DOM and the endpoint drifts toward scheduler. Re-read
// positions each retry so layout settle doesn't cause flakes.
await expect(async () => {
const linkEnd = await comfyPage.page.evaluate(
([nodeId, targetSlotIndex]) => {
const node = window.app!.graph.getNodeById(nodeId)
if (!node) return null
const slotPos = node.getInputPos(targetSlotIndex)
const [cx, cy] = window.app!.canvas.ds.convertOffsetToCanvas([
slotPos[0],
slotPos[1]
])
const rect = window.app!.canvas.canvas.getBoundingClientRect()
return { x: cx + rect.left, y: cy + rect.top }
},
[ksampler.id, ksampler.denoiseIndex] as const
)
expect(linkEnd, 'link endpoint should resolve').not.toBeNull()
const denoiseCenter = await getCenter(denoiseSlot)
const schedulerCenter = await getCenter(schedulerSlot)
const distToDenoise = Math.hypot(
linkEnd!.x - denoiseCenter.x,
linkEnd!.y - denoiseCenter.y
)
const rowGap = Math.hypot(
denoiseCenter.x - schedulerCenter.x,
denoiseCenter.y - schedulerCenter.y
)
// Bound at rowGap / 4 - half the inter-slot midpoint, so any drift
// toward scheduler fails well before reaching it.
expect(
distToDenoise,
`Link endpoint (${linkEnd!.x.toFixed(1)}, ${linkEnd!.y.toFixed(1)}) is ` +
`${distToDenoise.toFixed(1)}px from denoise — should be within ` +
`${(rowGap / 4).toFixed(1)}px (quarter of inter-slot gap ${rowGap.toFixed(1)}px)`
).toBeLessThan(rowGap / 4)
}).toPass({ timeout: 5000 })
})
})

View File

@@ -90,7 +90,6 @@ import { useCanvasInteractions } from '@/renderer/core/canvas/useCanvasInteracti
import AppInput from '@/renderer/extensions/linearMode/AppInput.vue'
import { useNodeZIndex } from '@/renderer/extensions/vueNodes/composables/useNodeZIndex'
import { useProcessedWidgets } from '@/renderer/extensions/vueNodes/composables/useProcessedWidgets'
import { useVueElementTracking } from '@/renderer/extensions/vueNodes/composables/useVueNodeResizeTracking'
import { cn } from '@comfyorg/tailwind-utils'
import InputSlot from './InputSlot.vue'
@@ -135,9 +134,4 @@ const {
processedWidgets,
showAdvanced
} = useProcessedWidgets(() => nodeData)
// Tracks widget-row growth that the node-level RO can't see
if (nodeData?.id != null) {
useVueElementTracking(String(nodeData.id), 'widgets-grid')
}
</script>

View File

@@ -29,10 +29,7 @@ const raf = createRafBatch(() => {
flushScheduledSlotLayoutSync()
})
export function scheduleSlotLayoutSync(nodeId: string) {
// Drop signals for unregistered nodes (e.g. preview nodes with synthetic
// ids from LGraphNodePreview) - they'd otherwise pump setDirty per RAF.
if (!useNodeSlotRegistryStore().getNode(nodeId)) return
function scheduleSlotLayoutSync(nodeId: string) {
pendingNodes.add(nodeId)
raf.schedule()
}

View File

@@ -43,8 +43,7 @@ const testState = vi.hoisted(() => ({
nodeLayouts: new Map<NodeId, NodeLayout>(),
batchUpdateNodeBounds: vi.fn(),
setSource: vi.fn(),
syncNodeSlotLayoutsFromDOM: vi.fn(),
scheduleSlotLayoutSync: vi.fn()
syncNodeSlotLayoutsFromDOM: vi.fn()
}))
vi.mock('@vueuse/core', () => ({
@@ -74,7 +73,6 @@ vi.mock('@/renderer/core/layout/store/layoutStore', () => ({
}))
vi.mock('./useSlotElementTracking', () => ({
scheduleSlotLayoutSync: testState.scheduleSlotLayoutSync,
syncNodeSlotLayoutsFromDOM: testState.syncNodeSlotLayoutsFromDOM
}))
@@ -161,7 +159,6 @@ describe('useVueNodeResizeTracking', () => {
testState.batchUpdateNodeBounds.mockReset()
testState.setSource.mockReset()
testState.syncNodeSlotLayoutsFromDOM.mockReset()
testState.scheduleSlotLayoutSync.mockReset()
resizeObserverState.observe.mockReset()
resizeObserverState.unobserve.mockReset()
resizeObserverState.disconnect.mockReset()
@@ -320,25 +317,4 @@ describe('useVueNodeResizeTracking', () => {
expect(testState.setSource).toHaveBeenCalledWith(LayoutSource.DOM)
expect(testState.batchUpdateNodeBounds).toHaveBeenCalled()
})
it('widgets-grid resize schedules a slot resync without writing node bounds', () => {
const parentNodeId: NodeId = 'parent-node'
const element = document.createElement('div')
element.dataset.widgetsGridNodeId = parentNodeId
const boxSizes = [{ inlineSize: 200, blockSize: 80 }]
const entry = {
target: element,
borderBoxSize: boxSizes,
contentBoxSize: boxSizes,
devicePixelContentBoxSize: boxSizes,
contentRect: new DOMRect(0, 0, 200, 80)
} satisfies ResizeEntryLike
resizeObserverState.callback?.([entry], createObserverMock())
expect(testState.scheduleSlotLayoutSync).toHaveBeenCalledWith(parentNodeId)
expect(testState.batchUpdateNodeBounds).not.toHaveBeenCalled()
expect(testState.setSource).not.toHaveBeenCalled()
expect(testState.syncNodeSlotLayoutsFromDOM).not.toHaveBeenCalled()
})
})

View File

@@ -24,10 +24,7 @@ import {
} from '@/renderer/core/layout/utils/geometry'
import { removeNodeTitleHeight } from '@/renderer/core/layout/utils/nodeSizeUtil'
import {
scheduleSlotLayoutSync,
syncNodeSlotLayoutsFromDOM
} from './useSlotElementTracking'
import { syncNodeSlotLayoutsFromDOM } from './useSlotElementTracking'
/**
* Generic update item for element bounds tracking
@@ -50,14 +47,14 @@ interface CachedNodeMeasurement {
interface ElementTrackingConfig {
/** Data attribute name (e.g., 'nodeId') */
dataAttribute: string
/** Handler for processing bounds updates. Omit for signal-only entries. */
updateHandler?: (updates: ElementBoundsUpdate[]) => void
/** Handler for processing bounds updates */
updateHandler: (updates: ElementBoundsUpdate[]) => void
}
/**
* Registry of tracking configurations by element type
*/
const trackingConfigs = new Map<string, ElementTrackingConfig>([
const trackingConfigs: Map<string, ElementTrackingConfig> = new Map([
[
'node',
{
@@ -70,10 +67,7 @@ const trackingConfigs = new Map<string, ElementTrackingConfig>([
layoutStore.batchUpdateNodeBounds(nodeUpdates)
}
}
],
// Signal-only: outer node stays at its persisted min-h floor during
// widget hydration, so the inner grid's RO is the only slot-drift signal.
['widgets-grid', { dataAttribute: 'widgetsGridNodeId' }]
]
])
// Elements whose ResizeObserver fired while the tab was hidden
@@ -127,14 +121,6 @@ const resizeObserver = new ResizeObserver((entries) => {
if (!(entry.target instanceof HTMLElement)) continue
const element = entry.target
// Signal-only widgets-grid resize - route the parent node through the
// slot-layout pipeline and skip bounds processing entirely.
const widgetsGridParentNodeId = element.dataset.widgetsGridNodeId
if (widgetsGridParentNodeId) {
scheduleSlotLayoutSync(widgetsGridParentNodeId as NodeId)
continue
}
// Find which type this element belongs to
let elementType: string | undefined
let elementId: string | undefined
@@ -252,7 +238,7 @@ const resizeObserver = new ResizeObserver((entries) => {
// Flush per-type
for (const [type, updates] of updatesByType) {
const config = trackingConfigs.get(type)
if (config?.updateHandler && updates.length) config.updateHandler(updates)
if (config && updates.length) config.updateHandler(updates)
}
}

View File

@@ -10,6 +10,7 @@ import { LinkReleaseTriggerAction } from '@/types/searchBoxTypes'
const zNodeType = z.string()
const zJobId = z.string()
export type JobId = z.infer<typeof zJobId>
const zWorkflowId = z.string()
export const resultItemType = z.enum(['input', 'output', 'temp'])
export type ResultItemType = z.infer<typeof resultItemType>
@@ -56,6 +57,7 @@ const zProgressWsMessage = z.object({
value: z.number().int(),
max: z.number().int(),
prompt_id: zJobId,
workflow_id: zWorkflowId.optional(),
node: zNodeId
})
@@ -65,6 +67,7 @@ const zNodeProgressState = z.object({
state: z.enum(['pending', 'running', 'finished', 'error']),
node_id: zNodeId,
prompt_id: zJobId,
workflow_id: zWorkflowId.optional(),
display_node_id: zNodeId.optional(),
parent_node_id: zNodeId.optional(),
real_node_id: zNodeId.optional()
@@ -72,13 +75,15 @@ const zNodeProgressState = z.object({
const zProgressStateWsMessage = z.object({
prompt_id: zJobId,
workflow_id: zWorkflowId.optional(),
nodes: z.record(zNodeId, zNodeProgressState)
})
const zExecutingWsMessage = z.object({
node: zNodeId,
display_node: zNodeId,
prompt_id: zJobId
prompt_id: zJobId,
workflow_id: zWorkflowId.optional()
})
const zExecutedWsMessage = zExecutingWsMessage.extend({
@@ -88,6 +93,7 @@ const zExecutedWsMessage = zExecutingWsMessage.extend({
const zExecutionWsMessageBase = z.object({
prompt_id: zJobId,
workflow_id: zWorkflowId.optional(),
timestamp: z.number().int()
})
@@ -115,7 +121,8 @@ const zExecutionErrorWsMessage = zExecutionWsMessageBase.extend({
const zProgressTextWsMessage = z.object({
nodeId: zNodeId,
text: z.string(),
prompt_id: z.string().optional()
prompt_id: z.string().optional(),
workflow_id: zWorkflowId.optional()
})
const zNotificationWsMessage = z.object({

View File

@@ -11,12 +11,22 @@ const {
mockNodeExecutionIdToNodeLocatorId,
mockNodeIdToNodeLocatorId,
mockNodeLocatorIdToNodeExecutionId,
mockShowTextPreview
mockShowTextPreview,
mockActiveWorkflow,
mockRevokePreviewsByExecutionId
} = vi.hoisted(() => ({
mockNodeExecutionIdToNodeLocatorId: vi.fn(),
mockNodeIdToNodeLocatorId: vi.fn(),
mockNodeLocatorIdToNodeExecutionId: vi.fn(),
mockShowTextPreview: vi.fn()
mockShowTextPreview: vi.fn(),
mockActiveWorkflow: {
current: null as null | {
activeState?: { id?: string }
initialState?: { id?: string }
path?: string
}
},
mockRevokePreviewsByExecutionId: vi.fn()
}))
import type * as WorkflowStoreModule from '@/platform/workflow/management/stores/workflowStore'
@@ -35,7 +45,10 @@ vi.mock('@/platform/workflow/management/stores/workflowStore', async () => {
useWorkflowStore: vi.fn(() => ({
nodeExecutionIdToNodeLocatorId: mockNodeExecutionIdToNodeLocatorId,
nodeIdToNodeLocatorId: mockNodeIdToNodeLocatorId,
nodeLocatorIdToNodeExecutionId: mockNodeLocatorIdToNodeExecutionId
nodeLocatorIdToNodeExecutionId: mockNodeLocatorIdToNodeExecutionId,
get activeWorkflow() {
return mockActiveWorkflow.current
}
}))
}
})
@@ -70,9 +83,9 @@ vi.mock('@/scripts/api', () => ({
}
}))
vi.mock('@/stores/imagePreviewStore', () => ({
vi.mock('@/stores/nodeOutputStore', () => ({
useNodeOutputStore: () => ({
revokePreviewsByExecutionId: vi.fn()
revokePreviewsByExecutionId: mockRevokePreviewsByExecutionId
})
}))
@@ -491,6 +504,445 @@ describe('useExecutionStore - clearActiveJobIfStale', () => {
})
})
describe('useExecutionStore - active workflow gating', () => {
let store: ReturnType<typeof useExecutionStore>
function makeProgressNodes(
nodeId: string,
jobId: string
): Record<string, NodeProgressState> {
return {
[nodeId]: {
value: 5,
max: 10,
state: 'running',
node_id: nodeId,
prompt_id: jobId,
display_node_id: nodeId
}
}
}
function fireProgressState(
jobId: string,
nodes: Record<string, NodeProgressState>,
workflowId?: string
) {
const handler = apiEventHandlers.get('progress_state')
if (!handler) throw new Error('progress_state handler not bound')
handler(
new CustomEvent('progress_state', {
detail: { nodes, prompt_id: jobId, workflow_id: workflowId }
})
)
}
function fireProgress(
jobId: string,
nodeId: string,
workflowId?: string,
value = 5,
max = 10
) {
const handler = apiEventHandlers.get('progress')
if (!handler) throw new Error('progress handler not bound')
handler(
new CustomEvent('progress', {
detail: {
value,
max,
prompt_id: jobId,
node: nodeId,
workflow_id: workflowId
}
})
)
}
beforeEach(() => {
vi.clearAllMocks()
apiEventHandlers.clear()
mockActiveWorkflow.current = null
setActivePinia(createTestingPinia({ stubActions: false }))
store = useExecutionStore()
store.bindExecutionEvents()
})
it('always updates per-job progress regardless of active workflow', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
fireProgressState(
'job-other',
makeProgressNodes('1', 'job-other'),
'wf-other'
)
expect(store.nodeProgressStatesByJob).toHaveProperty('job-other')
})
it('skips global mirror when message workflow_id mismatches active workflow', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
fireProgressState(
'job-other',
makeProgressNodes('1', 'job-other'),
'wf-other'
)
expect(store.nodeProgressStates).toEqual({})
})
it('updates global mirror when message workflow_id matches active workflow', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
fireProgressState('job-1', makeProgressNodes('1', 'job-1'), 'wf-active')
expect(store.nodeProgressStates).toEqual(makeProgressNodes('1', 'job-1'))
})
it('falls back to jobIdToWorkflowId mapping when workflow_id missing', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
store.registerJobWorkflowIdMapping('job-other', 'wf-other')
fireProgressState('job-other', makeProgressNodes('1', 'job-other'))
expect(store.nodeProgressStates).toEqual({})
})
it('falls back to session path mapping when no id mapping is registered', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
store.ensureSessionWorkflowPath('job-other', '/wf-other.json')
fireProgressState('job-other', makeProgressNodes('1', 'job-other'))
expect(store.nodeProgressStates).toEqual({})
})
it('preserves single-tab behaviour when ownership is unresolvable', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
fireProgressState('job-unknown', makeProgressNodes('1', 'job-unknown'))
expect(store.nodeProgressStates).toEqual(
makeProgressNodes('1', 'job-unknown')
)
})
it('updates mirror when there is no active workflow', () => {
mockActiveWorkflow.current = null
fireProgressState('job-1', makeProgressNodes('1', 'job-1'), 'wf-1')
expect(store.nodeProgressStates).toEqual(makeProgressNodes('1', 'job-1'))
})
it('skips preview revocation for non-active workflow messages', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
mockRevokePreviewsByExecutionId.mockClear()
fireProgressState(
'job-other',
makeProgressNodes('1', 'job-other'),
'wf-other'
)
expect(mockRevokePreviewsByExecutionId).not.toHaveBeenCalled()
})
it('revokes previews for active workflow messages', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
mockRevokePreviewsByExecutionId.mockClear()
fireProgressState('job-1', makeProgressNodes('1', 'job-1'), 'wf-active')
expect(mockRevokePreviewsByExecutionId).toHaveBeenCalledWith('1')
})
it('skips _executingNodeProgress on workflow_id mismatch', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
fireProgress('job-other', '1', 'wf-other')
expect(store._executingNodeProgress).toBeNull()
})
it('updates _executingNodeProgress on workflow_id match', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
fireProgress('job-1', '1', 'wf-active', 7, 10)
expect(store._executingNodeProgress).toEqual({
value: 7,
max: 10,
prompt_id: 'job-1',
node: '1',
workflow_id: 'wf-active'
})
})
it('execution_start from a non-active workflow does not steal activeJobId', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const handler = apiEventHandlers.get('execution_start')
if (!handler) throw new Error('execution_start handler not bound')
handler(
new CustomEvent('execution_start', {
detail: {
prompt_id: 'job-other',
timestamp: 0,
workflow_id: 'wf-other'
}
})
)
expect(store.activeJobId).toBeNull()
})
it('execution_start from active workflow adopts activeJobId', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const handler = apiEventHandlers.get('execution_start')
if (!handler) throw new Error('execution_start handler not bound')
handler(
new CustomEvent('execution_start', {
detail: {
prompt_id: 'job-1',
timestamp: 0,
workflow_id: 'wf-active'
}
})
)
expect(store.activeJobId).toBe('job-1')
})
it('execution_success from a non-active workflow does not clear activeJobId', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const startHandler = apiEventHandlers.get('execution_start')
if (!startHandler) throw new Error('execution_start handler not bound')
startHandler(
new CustomEvent('execution_start', {
detail: {
prompt_id: 'job-1',
timestamp: 0,
workflow_id: 'wf-active'
}
})
)
const successHandler = apiEventHandlers.get('execution_success')
if (!successHandler) throw new Error('execution_success handler not bound')
successHandler(
new CustomEvent('execution_success', {
detail: {
prompt_id: 'job-other',
timestamp: 0,
workflow_id: 'wf-other'
}
})
)
expect(store.activeJobId).toBe('job-1')
})
it('execution_interrupted from a non-active workflow does not clear activeJobId', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const startHandler = apiEventHandlers.get('execution_start')
if (!startHandler) throw new Error('execution_start handler not bound')
startHandler(
new CustomEvent('execution_start', {
detail: {
prompt_id: 'job-1',
timestamp: 0,
workflow_id: 'wf-active'
}
})
)
const intHandler = apiEventHandlers.get('execution_interrupted')
if (!intHandler) throw new Error('execution_interrupted handler not bound')
intHandler(
new CustomEvent('execution_interrupted', {
detail: {
prompt_id: 'job-other',
timestamp: 0,
node_id: '1',
node_type: 'X',
executed: [],
workflow_id: 'wf-other'
}
})
)
expect(store.activeJobId).toBe('job-1')
})
it('execution_cached from a non-active workflow does not mark active job nodes', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const startHandler = apiEventHandlers.get('execution_start')
if (!startHandler) throw new Error('execution_start handler not bound')
startHandler(
new CustomEvent('execution_start', {
detail: { prompt_id: 'job-1', timestamp: 0, workflow_id: 'wf-active' }
})
)
const cachedHandler = apiEventHandlers.get('execution_cached')
if (!cachedHandler) throw new Error('execution_cached handler not bound')
cachedHandler(
new CustomEvent('execution_cached', {
detail: {
prompt_id: 'job-other',
timestamp: 0,
workflow_id: 'wf-other',
nodes: ['n1', 'n2']
}
})
)
expect(store.activeJob?.nodes).toEqual({})
})
it('executed from a non-active workflow does not mark active job nodes', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const startHandler = apiEventHandlers.get('execution_start')
if (!startHandler) throw new Error('execution_start handler not bound')
startHandler(
new CustomEvent('execution_start', {
detail: { prompt_id: 'job-1', timestamp: 0, workflow_id: 'wf-active' }
})
)
const executedHandler = apiEventHandlers.get('executed')
if (!executedHandler) throw new Error('executed handler not bound')
executedHandler(
new CustomEvent('executed', {
detail: {
prompt_id: 'job-other',
node: 'n1',
display_node: 'n1',
workflow_id: 'wf-other',
output: {}
}
})
)
expect(store.activeJob?.nodes['n1']).toBeUndefined()
})
it('execution_error from a non-active workflow does not clear active job state but still clears the errored job initializing flag', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const startHandler = apiEventHandlers.get('execution_start')
if (!startHandler) throw new Error('execution_start handler not bound')
startHandler(
new CustomEvent('execution_start', {
detail: { prompt_id: 'job-1', timestamp: 0, workflow_id: 'wf-active' }
})
)
store.initializingJobIds = new Set(['job-other'])
const errorHandler = apiEventHandlers.get('execution_error')
if (!errorHandler) throw new Error('execution_error handler not bound')
errorHandler(
new CustomEvent('execution_error', {
detail: {
prompt_id: 'job-other',
timestamp: 0,
workflow_id: 'wf-other',
node_id: 'n1',
node_type: 'X',
executed: [],
exception_message: 'oops',
exception_type: 'RuntimeError',
traceback: [],
current_inputs: {},
current_outputs: {}
}
})
)
expect(store.activeJobId).toBe('job-1')
expect(store.initializingJobIds.has('job-other')).toBe(false)
expect(useExecutionErrorStore().lastExecutionError).toBeNull()
})
it('revokes preview when node transitions pending -> running', () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const pendingNodes: Record<string, NodeProgressState> = {
n1: {
value: 0,
max: 10,
state: 'pending',
node_id: 'n1',
prompt_id: 'job-1',
display_node_id: 'n1'
}
}
fireProgressState('job-1', pendingNodes, 'wf-active')
mockRevokePreviewsByExecutionId.mockClear()
const runningNodes: Record<string, NodeProgressState> = {
n1: { ...pendingNodes.n1, state: 'running', value: 1 }
}
fireProgressState('job-1', runningNodes, 'wf-active')
expect(mockRevokePreviewsByExecutionId).toHaveBeenCalledWith('n1')
})
})
describe('useExecutionStore - progress_text startup guard', () => {
let store: ReturnType<typeof useExecutionStore>
@@ -498,6 +950,7 @@ describe('useExecutionStore - progress_text startup guard', () => {
nodeId: string
text: string
prompt_id?: string
workflow_id?: string
}) {
const handler = apiEventHandlers.get('progress_text')
if (!handler) throw new Error('progress_text handler not bound')
@@ -507,6 +960,7 @@ describe('useExecutionStore - progress_text startup guard', () => {
beforeEach(() => {
vi.clearAllMocks()
apiEventHandlers.clear()
mockActiveWorkflow.current = null
setActivePinia(createTestingPinia({ stubActions: false }))
store = useExecutionStore()
store.bindExecutionEvents()
@@ -539,6 +993,50 @@ describe('useExecutionStore - progress_text startup guard', () => {
expect(mockShowTextPreview).toHaveBeenCalledWith(mockNode, 'warming up')
})
it('skips progress_text whose workflow_id mismatches active workflow', async () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const mockNode = createMockLGraphNode({ id: 1 })
const { useCanvasStore } =
await import('@/renderer/core/canvas/canvasStore')
useCanvasStore().canvas = {
graph: { getNodeById: vi.fn(() => mockNode) }
} as unknown as LGraphCanvas
fireProgressText({
nodeId: '1',
text: 'warming up',
prompt_id: 'job-other',
workflow_id: 'wf-other'
})
expect(mockShowTextPreview).not.toHaveBeenCalled()
})
it('forwards progress_text whose workflow_id matches active workflow', async () => {
mockActiveWorkflow.current = {
activeState: { id: 'wf-active' },
path: '/wf-active.json'
}
const mockNode = createMockLGraphNode({ id: 1 })
const { useCanvasStore } =
await import('@/renderer/core/canvas/canvasStore')
useCanvasStore().canvas = {
graph: { getNodeById: vi.fn(() => mockNode) }
} as unknown as LGraphCanvas
fireProgressText({
nodeId: '1',
text: 'warming up',
prompt_id: 'job-1',
workflow_id: 'wf-active'
})
expect(mockShowTextPreview).toHaveBeenCalledWith(mockNode, 'warming up')
})
})
describe('useExecutionErrorStore - Node Error Lookups', () => {
@@ -818,6 +1316,7 @@ describe('useExecutionStore - WebSocket event handlers', () => {
beforeEach(() => {
vi.clearAllMocks()
apiEventHandlers.clear()
mockActiveWorkflow.current = null
setActivePinia(createTestingPinia({ stubActions: false }))
store = useExecutionStore()
store.bindExecutionEvents()
@@ -832,10 +1331,7 @@ describe('useExecutionStore - WebSocket event handlers', () => {
})
it('clears initializing state for the starting job', () => {
store.initializingJobIds = new Set([
'job-1',
'job-2'
]) as unknown as Set<string>
store.initializingJobIds = new Set(['job-1', 'job-2'])
fire('execution_start', { prompt_id: 'job-1', timestamp: 0 })
expect(store.initializingJobIds.has('job-1')).toBe(false)
@@ -926,6 +1422,16 @@ describe('useExecutionStore - WebSocket event handlers', () => {
expect(store.activeJobId).toBeNull()
expect(store.queuedJobs['job-1']).toBeUndefined()
})
it('clears initializing state for the completed job', () => {
store.initializingJobIds = new Set(['job-1', 'job-2'])
fire('execution_start', { prompt_id: 'job-1', timestamp: 0 })
fire('execution_success', { prompt_id: 'job-1', timestamp: 0 })
expect(store.initializingJobIds.has('job-1')).toBe(false)
expect(store.initializingJobIds.has('job-2')).toBe(true)
})
})
describe('executing', () => {

View File

@@ -247,22 +247,32 @@ export const useExecutionStore = defineStore('execution', () => {
}
function handleExecutionStart(e: CustomEvent<ExecutionStartWsMessage>) {
executionIdToLocatorCache.clear()
executionErrorStore.clearAllErrors()
activeJobId.value = e.detail.prompt_id
queuedJobs.value[activeJobId.value] ??= { nodes: {} }
clearInitializationByJobId(activeJobId.value)
const jobId = e.detail.prompt_id
queuedJobs.value[jobId] ??= { nodes: {} }
clearInitializationByJobId(jobId)
// Ensure path mapping exists — execution_start can arrive via WebSocket
// before the HTTP response from queuePrompt triggers storeJob.
if (!jobIdToSessionWorkflowPath.value.has(activeJobId.value)) {
const path = queuedJobs.value[activeJobId.value]?.workflow?.path
if (path) ensureSessionWorkflowPath(activeJobId.value, path)
if (!jobIdToSessionWorkflowPath.value.has(jobId)) {
const path = queuedJobs.value[jobId]?.workflow?.path
if (path) ensureSessionWorkflowPath(jobId, path)
}
// Only adopt as the global active job and clear shared UI state when the
// starting job belongs to the active workflow. Otherwise a job started
// from another tab would steal activeJobId and clobber the active tab's
// execution UI.
if (!messageMatchesActiveWorkflow(jobId, e.detail.workflow_id)) return
executionIdToLocatorCache.clear()
executionErrorStore.clearAllErrors()
activeJobId.value = jobId
}
function handleExecutionCached(e: CustomEvent<ExecutionCachedWsMessage>) {
if (!activeJob.value) return
if (!messageMatchesActiveWorkflow(e.detail.prompt_id, e.detail.workflow_id))
return
for (const n of e.detail.nodes) {
activeJob.value.nodes[n] = true
}
@@ -272,12 +282,15 @@ export const useExecutionStore = defineStore('execution', () => {
e: CustomEvent<ExecutionInterruptedWsMessage>
) {
const jobId = e.detail.prompt_id
if (activeJobId.value) clearInitializationByJobId(activeJobId.value)
clearInitializationByJobId(jobId)
if (!messageMatchesActiveWorkflow(jobId, e.detail.workflow_id)) return
resetExecutionState(jobId)
}
function handleExecuted(e: CustomEvent<ExecutedWsMessage>) {
if (!activeJob.value) return
if (!messageMatchesActiveWorkflow(e.detail.prompt_id, e.detail.workflow_id))
return
activeJob.value.nodes[e.detail.node] = true
}
@@ -288,16 +301,16 @@ export const useExecutionStore = defineStore('execution', () => {
})
}
const jobId = e.detail.prompt_id
clearInitializationByJobId(jobId)
if (!messageMatchesActiveWorkflow(jobId, e.detail.workflow_id)) return
resetExecutionState(jobId)
}
function handleExecuting(e: CustomEvent<NodeId | null>): void {
// Clear the current node progress when a new node starts executing
_executingNodeProgress.value = null
if (!activeJob.value) return
// Update the executing nodes list
if (typeof e.detail !== 'string') {
if (activeJobId.value) {
delete queuedJobs.value[activeJobId.value]
@@ -335,43 +348,110 @@ export const useExecutionStore = defineStore('execution', () => {
}
function handleProgressState(e: CustomEvent<ProgressStateWsMessage>) {
const { nodes, prompt_id: jobId } = e.detail
const { nodes, prompt_id: jobId, workflow_id: messageWorkflowId } = e.detail
const isActiveWorkflowMessage = messageMatchesActiveWorkflow(
jobId,
messageWorkflowId
)
// Revoke previews for nodes that are starting to execute
const previousForJob = nodeProgressStatesByJob.value[jobId] || {}
for (const nodeId in nodes) {
const nodeState = nodes[nodeId]
if (nodeState.state === 'running' && !previousForJob[nodeId]) {
// This node just started executing, revoke its previews
// Note that we're doing the *actual* node id instead of the display node id
// here intentionally. That way, we don't clear the preview every time a new node
// within an expanded graph starts executing.
const { revokePreviewsByExecutionId } = useNodeOutputStore()
revokePreviewsByExecutionId(nodeId)
if (isActiveWorkflowMessage) {
const { revokePreviewsByExecutionId } = useNodeOutputStore()
for (const nodeId in nodes) {
const nodeState = nodes[nodeId]
if (
nodeState.state === 'running' &&
previousForJob[nodeId]?.state !== 'running'
) {
revokePreviewsByExecutionId(nodeId)
}
}
}
// Update the progress states for all nodes
nodeProgressStatesByJob.value = {
...nodeProgressStatesByJob.value,
[jobId]: nodes
}
evictOldProgressJobs()
nodeProgressStates.value = nodes
// If we have progress for the currently executing node, update it for backwards compatibility
if (executingNodeId.value && nodes[executingNodeId.value]) {
const nodeState = nodes[executingNodeId.value]
_executingNodeProgress.value = {
value: nodeState.value,
max: nodeState.max,
prompt_id: nodeState.prompt_id,
node: nodeState.display_node_id || nodeState.node_id
if (isActiveWorkflowMessage) {
nodeProgressStates.value = nodes
if (executingNodeId.value && nodes[executingNodeId.value]) {
const nodeState = nodes[executingNodeId.value]
_executingNodeProgress.value = {
value: nodeState.value,
max: nodeState.max,
prompt_id: nodeState.prompt_id,
node: nodeState.display_node_id || nodeState.node_id
}
}
}
}
/**
* Determines whether a WebSocket execution message belongs to the
* currently active workflow tab. Used to gate writes to the global
* "current execution" mirror so a job initiated from another open
* workflow cannot leak its progress into the active one.
*
* Resolution order:
* 1. `workflow_id` carried on the WS message (when backend supports it).
* 2. {@link jobIdToWorkflowId} mapping populated when the job was queued
* from this tab.
* 3. {@link jobIdToSessionWorkflowPath} mapping (path-based fallback).
*
* When the workflow cannot be resolved at all (e.g. job queued in a
* different browser session), the message is treated as belonging to
* the active workflow to preserve current behaviour for the existing
* single-tab common case.
*/
function messageMatchesActiveWorkflow(
jobId: JobId,
messageWorkflowId: string | undefined
): boolean {
const activeWorkflow = workflowStore.activeWorkflow
if (!activeWorkflow) return true
const activeId =
activeWorkflow.activeState?.id ?? activeWorkflow.initialState?.id ?? null
if (messageWorkflowId && activeId) {
return messageWorkflowId === activeId
}
const mappedId = jobIdToWorkflowId.value.get(jobId)
if (mappedId && activeId) return mappedId === activeId
const mappedPath = jobIdToSessionWorkflowPath.value.get(jobId)
if (mappedPath && activeWorkflow.path) {
return mappedPath === activeWorkflow.path
}
return true
}
/**
* Returns true when workflow ownership for {@link jobId} can be resolved
* — either by an explicit `workflow_id` on the incoming message or by a
* mapping registered when the job was queued. When this returns false
* the caller should fall back to whatever legacy guard applied before
* workflow gating was introduced.
*/
function canResolveWorkflowOwnership(
jobId: JobId,
messageWorkflowId: string | undefined
): boolean {
return (
Boolean(messageWorkflowId) ||
jobIdToWorkflowId.value.has(jobId) ||
jobIdToSessionWorkflowPath.value.has(jobId)
)
}
function handleProgress(e: CustomEvent<ProgressWsMessage>) {
const { prompt_id: jobId, workflow_id: messageWorkflowId } = e.detail
if (!messageMatchesActiveWorkflow(jobId, messageWorkflowId)) return
_executingNodeProgress.value = e.detail
}
@@ -393,17 +473,16 @@ export const useExecutionStore = defineStore('execution', () => {
error: e.detail.exception_message
})
// Cloud wraps validation errors (400) in exception_message as embedded JSON.
if (handleCloudValidationError(e.detail)) return
}
// Service-level errors (e.g. "Job has stagnated") have no associated node.
// Route them as job errors
if (handleServiceLevelError(e.detail)) return
// OSS path / Cloud fallback (real runtime errors)
executionErrorStore.lastExecutionError = e.detail
clearInitializationByJobId(e.detail.prompt_id)
if (!messageMatchesActiveWorkflow(e.detail.prompt_id, e.detail.workflow_id))
return
executionErrorStore.lastExecutionError = e.detail
resetExecutionState(e.detail.prompt_id)
}
@@ -413,6 +492,9 @@ export const useExecutionStore = defineStore('execution', () => {
return false
clearInitializationByJobId(detail.prompt_id)
if (!messageMatchesActiveWorkflow(detail.prompt_id, detail.workflow_id))
return true
resetExecutionState(detail.prompt_id)
executionErrorStore.lastPromptError = {
type: detail.exception_type ?? 'error',
@@ -431,6 +513,9 @@ export const useExecutionStore = defineStore('execution', () => {
if (!result) return false
clearInitializationByJobId(detail.prompt_id)
if (!messageMatchesActiveWorkflow(detail.prompt_id, detail.workflow_id))
return true
resetExecutionState(detail.prompt_id)
if (result.kind === 'nodeErrors') {
@@ -529,14 +614,21 @@ export const useExecutionStore = defineStore('execution', () => {
}
function handleProgressText(e: CustomEvent<ProgressTextWsMessage>) {
const { nodeId, text, prompt_id } = e.detail
const { nodeId, text, prompt_id, workflow_id } = e.detail
if (!text || !nodeId) return
// Filter: only accept progress for the active prompt
if (prompt_id && activeJobId.value && prompt_id !== activeJobId.value)
return
// Prefer the workflow-ownership gate when ownership can be resolved.
// Only fall back to the legacy active-prompt guard when ownership is
// unresolvable; otherwise activeJobId pointing at a different workflow's
// job would incorrectly drop messages for the visible workflow.
if (prompt_id) {
if (canResolveWorkflowOwnership(prompt_id, workflow_id)) {
if (!messageMatchesActiveWorkflow(prompt_id, workflow_id)) return
} else if (activeJobId.value && prompt_id !== activeJobId.value) {
return
}
}
// Handle execution node IDs for subgraphs
const currentId = getNodeIdIfExecuting(nodeId)
if (!currentId) return
const node = canvasStore.canvas?.graph?.getNodeById(currentId)

View File

@@ -1,7 +1,6 @@
import { createTestingPinia } from '@pinia/testing'
import ProgressSpinner from 'primevue/progressspinner'
import { beforeEach, describe, expect, it, vi } from 'vitest'
import { ref } from 'vue'
import { render, screen } from '@testing-library/vue'
@@ -57,8 +56,7 @@ vi.mock('@vueuse/core', () => ({
createSharedComposable: vi.fn((fn) => {
let cached: ReturnType<typeof fn>
return (...args: Parameters<typeof fn>) => (cached ??= fn(...args))
}),
useDocumentVisibility: vi.fn(() => ref<'visible' | 'hidden'>('visible'))
})
}))
vi.mock('@/config', () => ({