Compare commits

...

10 Commits

Author SHA1 Message Date
Matt Miller
c0f08b18f0 fix(agent): harden draft resync against conflict/close races
Address cursor-review panel findings on the self-heal resync path:

- runResync now clears a matching pendingConflict when it restores a
  newer snapshot, so a later resolveConflict can't re-apply stale
  content and roll the version watermark backward.
- runResync captures whether the workflow was tracked before the fetch;
  if forgetWorkflow ran during the in-flight fetch it no longer applies
  to (or resurrects tracking for) the closed tab.
- forgetWorkflow clears any pendingConflict targeting the closed
  workflow.
- parseToolCall / parseTokenUsage reject non-finite duration/token
  values via isFiniteNumber, matching the version-field hardening.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-07-01 00:53:51 -07:00
Matt Miller
e00363d8d3 fix(agent): reject non-finite versions and array records in wire decode
typeof === 'number' accepts NaN/Infinity, which would silently wedge the
CAS version compare (NaN !== NaN); typeof === 'object' accepts arrays,
letting an array pass as a WorkflowGraph. Tighten isRecord to exclude
arrays and gate version/base_version through a finite-number predicate.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-07-01 00:33:03 -07:00
Matt Miller
5b9d4765a9 feat: self-heal agent draft via snapshot resync and version watermark
Reconnect/cold-start now refetches the authoritative draft snapshot
(GET /api/agent/draft -> {content, version}) and seeds the reconciler,
so the panel restores the current draft without waiting for a new patch.

draftReconciler treats version as a monotonic watermark: a draft_patch is
adopted only when it advances past local, so a dropped/duplicated/
out-of-order Redis Pub/Sub patch can never regress local state. A patch
whose base_version is ahead of local signals missed patches (gap) and
triggers a snapshot refetch; an optional draft_version tip catches a
trailing lost patch. Transport stays best-effort; the client provides
reliability. Full-document V0 path only (CRDT out of scope).
2026-06-27 03:44:47 -07:00
Matt Miller
33da58e1c0 feat: decode canonical agent wire envelope in parseAgentEvent
parseAgentEvent now reads the backend envelope { type, data: { ...snake_case } },
mapping thread_id/message_id/workflow_id/base_version/tool_call_id/tool_name/
duration_ms/error_code to the existing camelCase event fields and usage{input,
output} to tokenUsage, while keeping strict validation (malformed -> null).

Adds serializeAgentTurnRequest to emit the snake_case /api/agent/* request body
({content, selection, attachments, target, base_version}). base_version and
workflow_id now flow from the wire through useAgentDraftSync into the reconciler.
2026-06-27 02:53:33 -07:00
GitHub Action
3e91a60cf9 [automated] Apply ESLint and Oxfmt fixes 2026-06-26 00:59:21 +00:00
Connor Byrne
06cbb44a57 feat(agent): CRDT room layer + Immer session, wired to layout store
Adds the long-term CRDT peer path alongside the V0 server-draft path, both
as TDD-as-code (ADR-0011). Not wired into the chat UI yet.

CRDT (src/platform/agent/crdt):
- agentRoom: Y.Doc-backed room, real state-vector reconciliation + presence
- roomSync: transport-agnostic two-phase Yjs sync (y-protocols/sync style)
- agentRoomManager: room lifecycle (join/leave/pin) kept alive across tabs
- roomDocBinding: generic bidirectional Y.Doc binding with echo guards

Wiring (renderer layer):
- bindRoomToLayoutStore: binds a room to the real layoutStore singleton via
  its getYDoc/applyUpdate surface; test proves an agent edit bumps the store

Session (src/platform/agent/session):
- agentSessionStore: Immer reducer for local chat state (deltas, tool calls)

Keeps platform/ free of renderer/ imports per the layer architecture. Adds
immer dep; ignores the agent wire-contract module in knip pending UI wiring.
2026-06-25 17:55:09 -07:00
GitHub Action
5237112114 [automated] Apply ESLint and Oxfmt fixes 2026-06-26 00:22:24 +00:00
Connor Byrne
289ea0562c feat(agent): prototype draft sync + protocol for in-app agent (TDD-as-code)
Prototype supporting ADR-0011. No UI wiring yet.

- agentProtocol.ts: typed FE<->agent contract + parseAgentEvent() decoder
  for untrusted WS payloads
- draftReconciler.ts: pure version-CAS reconcile (apply/conflict/stale)
  per ADR-0004/0005
- useAgentDraftSync.ts: composable tracking per-workflow base versions and
  driving the three merge-dialog outcomes via injected canvas ports
- unit tests for all three (17 tests)
2026-06-25 17:18:47 -07:00
GitHub Action
ce4a53c498 [automated] Apply ESLint and Oxfmt fixes 2026-06-26 00:07:56 +00:00
Connor Byrne
9ed93698a0 docs: add ADR-0011 for in-app agent graph-state integration
Records the V0 frontend graph-state and sync design for the server-side
in-app agent: server workflow_draft as authority, full-document draft_patch
over the existing Redis->WS bridge, version-CAS + merge dialog for conflicts,
and a room-per-graph framing that bridges to the CRDT end-state (ADR-0003,
issue #4661). Design-record only; no functional code.
2026-06-25 16:45:03 -07:00
24 changed files with 2137 additions and 4 deletions

View File

@@ -0,0 +1,209 @@
# 11. In-App Agent Graph-State Integration
Date: 2026-06-25
## Status
Proposed
## Context
The In-App Agent (V0, target ~2026-07-13) adds a server-side ComfyUI agent the user chats with
from a side panel. A hard product requirement is that the agent can **read the user's live
workflow** and **write workflow changes back onto the canvas**. This ADR records the frontend
graph-state and synchronization design for that capability. It is intentionally written as
"TDD-as-code": a design record, ahead of the implementation PRs.
This decision builds directly on:
- [ADR-0001 (Merge LiteGraph)](0001-merge-litegraph-into-frontend.md) and
[ADR-0003 (Centralized Layout Management with CRDT)](0003-crdt-based-layout-system.md), which
established a Yjs CRDT store as the single source of truth for spatial state and a command/
observer model for mutations.
- The long-term architectural direction RFC ([issue #4661]), which makes a CRDT-mediated state
layer the foundation for multiplayer — and frames the agent as **just another client** of a
per-graph room.
### Forces at play
1. **A second writer.** Until now the only writer to a workflow is the local user. The agent
introduces a second, remote writer to the _same_ graph. We need conflict handling, not
last-write-wins.
2. **The backend is server-authoritative.** The cloud backend (`Comfy-Org/cloud` PR #4432)
introduces a mutable server-side **`workflow_draft`** (full save-format JSON + integer
`version`), commits edits with **compare-and-swap on `version`**, and pushes results to the
browser as a **full-document replace** over an existing **Redis-PubSub → WebSocket** bridge
(`channel:ws:{workspaceId}:u:{userId}`). Inbound chat turns go through ingest `/api/agent/*`.
3. **The frontend is moving toward decentralized CRDT.** Per ADR-0003 / #4661 the end-state is
per-graph rooms with mutation relay and CRDT merge — _not_ server-authoritative full-document
replace. The V0 backend model and the FE end-state are different shapes.
4. **Timeline.** V0 ships in ~3 weeks. The CRDT migration of the _data-model_ class of state
(node existence, widget values) is still in progress; only the _layout_ class is fully on the
Yjs store today. A true per-mutation CRDT sync for the agent is not ready for V0.
5. **No throwaway work.** Whatever we ship for V0 must be a strict subset of the #4661 end-state.
### What V0 is NOT
- The agent does **not** draw, animate, or incrementally lay out nodes on the canvas.
- The agent does **not** submit/run the workflow — the user clicks the existing Run button.
- The agent is **not** aware of viewport state (zoom/pan/cursor) — that is FE-only ephemeral
state and is never synced.
## Decision
For V0 the frontend treats the **server `workflow_draft` as the authority**, integrates agent
writes as **full-document replaces guarded by `version`**, and frames the whole interaction as a
**room-per-graph** model so it is forward-compatible with the CRDT end-state.
### Graph-state model
| State class | Source of truth (V0) | Synced to agent? |
| ---------------------------------------------------------- | ------------------------------------------------------ | ---------------- |
| Save-format **data model** (nodes, links, widgets, groups) | server `workflow_draft.content` | read + write |
| **Layout** (positions/sizes/reroutes) | within `content`; mirrors Yjs `layoutStore` (ADR-0003) | within content |
| **Selection** (selected node ids) | browser, sent per turn | per-turn input |
| **Viewport** (zoom/pan/cursor) | browser only | never |
- Each `workflow_draft` is a **room**. V0 has up to two writers: the human (via autosave-to-draft)
and the agent. The browser keeps a draft's tab **alive in memory** while connected so agent
pushes apply even when the tab is unfocused (lazy apply on refocus) — the CRDT room behavior,
minus true merge.
- The browser **autosaves canvas edits into the draft** so the server copy reflects unsaved work
before a turn. Agent awareness then reduces to "read the draft + read the selection ids".
### Synchronization & conflict handling
The agent→browser push is a `draft_patch { workflow_id, content, version, base_version }`.
```
agent commits draft (CAS version N → N+1)
→ draft_patch { content, version: N+1, base_version: N } over Redis → WS
browser:
tab.version == base_version → apply full replace, adopt new version (happy path)
tab.version != base_version → MERGE DIALOG: [Accept agent's] [Keep mine] [Open in new tab]
```
- **Apply** = load the full save-format graph into the target tab (a destructive variant of the
existing `loadGraphData` path) and adopt `version` as the tab's new base.
- **Conflict** (user edited the graph during the agent's turn) surfaces a dialog rather than
silently clobbering. We explicitly reject **graph-locking** as the primary mechanism: a
lost/duplicated backend message could leave the graph _permanently_ locked. A presentational
"agent editing…" hint MAY be driven by the optional backend edit-turn lease, but it is never on
the correctness path.
- The agent can also target a **new tab** (`target: "new_tab"`) for unrelated requests — a
non-destructive load, no conflict possible.
### Awareness & run
- A chat turn carries `{ content, selection?: NodeId[], attachments?, target }`. `selection` is
the set of node ids from the canvas (the panel's `@`-tag chips). The agent reads the draft data
model server-side; the browser does not scrape the live graph.
- The agent never runs the workflow in V0; after a write it tells the user the graph is loaded and
to click Run. Submit is gated off for the in-app client.
### Migration path to the CRDT end-state (#4661)
This is the load-bearing reason the V0 shape is acceptable. The transition is a payload swap, not
a rewrite:
1. **V0:** full-document `draft_patch`; convergence via `version` CAS + merge dialog.
2. **When the data-model class finishes migrating to the Yjs store:** `draft_patch` gains a
**mutation-list** variant applied via `layoutStore.applyOperation(op)` tagged
`LayoutSource.External` with a dedicated agent actor id (the store already tracks
source/actor). Full replace remains the fallback for large rewrites / new tabs.
3. **Multiplayer:** server relays mutations; both client and server write + reconcile via CRDT
merge, retiring the merge dialog for fine-grained edits. The room model, actor/source tracking,
and the Redis channel are unchanged.
### Prototype implementation (this PR)
This PR ships TDD-as-code for **both** layers so the V0 shape and the end-state are exercised, not
just described. Nothing is wired into the chat UI yet.
**Layer A — V0 server-draft path** (`src/platform/agent/common/`)
- `agentProtocol.ts` — typed FE⇄agent wire contract + `parseAgentEvent()` decoder for untrusted
WS payloads.
- `draftReconciler.ts` — pure `version`-CAS decision (`apply` / `conflict` / `stale`).
- `useAgentDraftSync.ts` — composable tracking per-workflow base `version` and the three merge
outcomes via injected canvas ports.
**Layer B — CRDT peer path (the #4661 migration target)** (`src/platform/agent/crdt/`)
- `agentRoom.ts` — a `Y.Doc`-backed room whose top-level types (`nodes`/`links`/`reroutes`) mirror
the layout store; real `encodeStateVector`/`diffSince`/`applyRemoteUpdate` + presence.
- `roomSync.ts` — transport-agnostic two-phase Yjs sync (state-vector handshake + live updates),
modelled on `y-protocols/sync`; runs over the existing Redis→WS bridge.
- `agentRoomManager.ts` — room lifecycle: a tab switch is a `join`/`leave`; rooms stay alive in
memory while referenced or pinned (agent editing a backgrounded workflow).
- `roomDocBinding.ts` — generic, layer-safe bidirectional `Y.Doc` binding with origin-tag echo
guards.
**Wiring (renderer layer)**`src/renderer/core/layout/agent/bindRoomToLayoutStore.ts` binds a
room to the **real** `layoutStore` singleton via its existing (`"future feature"`)
`getYDoc()`/`applyUpdate` surface. A test drives an agent room edit into the live store and asserts
the version bump — concrete proof of the "few lines of code" claim. Layout binds today; data-model
mutations bind the same way once that class finishes migrating to the store.
**Local chat state**`src/platform/agent/session/agentSessionStore.ts` is an **Immer** reducer
for streaming deltas / tool-call lifecycle. Chat is single-client, so it is deliberately _not_ a
CRDT: Yjs owns graph state, Immer owns local UI state, and the two never mix.
A layering constraint surfaced during this work: `platform/` may not import `renderer/`
(base → platform → workbench → renderer). Hence the pure CRDT core lives in `platform/` and only
the thin singleton wiring lives in `renderer/`.
### Alternatives considered
- **Build true CRDT (Yjs) agent sync in V0** — rejected: data-model CRDT migration incomplete;
misses the timeline; high risk.
- **Always open a new tab for agent output** — rejected: simplest, but fails "update what I'm
looking at" and causes tab sprawl.
- **Graph-locking / "agent mode" that blocks user edits** — rejected as primary mechanism:
permanent-lock dead-end risk on message loss.
- **Browser scrapes the live graph per turn** — rejected: invites client/server drift;
autosave-to-draft keeps the canonical server copy current instead.
## Consequences
### Positive
- Hits the V0 timeline by reusing the backend draft + the existing Redis→WS transport; no new
realtime infrastructure on the frontend.
- A clean, documented bridge to the #4661 CRDT end-state: room-per-graph, agent-as-client, and the
ADR-0003 source/actor model all carry forward unchanged.
- Conflicts never silently destroy user work; the dialog appears only in the genuine
concurrent-edit case.
- Awareness is minimal and robust: read the draft + the selection ids.
### Negative
- Full-document replace is coarse-grained: a concurrent user edit during an agent turn collides
and must go through the merge dialog rather than merging automatically.
- Introduces a frontend-owned **base-`version` lifecycle** (obtain on draft open, bump on apply
and on autosave). If this drifts, the merge dialog can mis-fire — this is the main correctness
risk to get right.
- The agent→browser event schema becomes a **cross-repo contract** (Go ⇄ TS); it must be
versioned and drift-guarded.
- A temporary semantic gap with V0 product copy ("Agent generation does not impact the graph")
that must be reconciled now that write-to-graph is in scope.
## Notes
### Open questions
1. **Tab closed mid-edit.** If the user closes the draft's tab while the agent is editing, do
pending changes invalidate (and report back to the agent) or persist server-side and reopen on
a new tab? Affects the room lifecycle.
2. **Base-`version` lifecycle.** Exact points where the tab obtains/bumps its base `version` so
the merge dialog cannot mis-fire or desync.
3. **Event schema home & versioning.** Where the shared `draft_patch` / `agent_message_delta` /
`agent_tool_call` / `agent_message_done` contract lives and how drift is caught.
### References
- [ADR-0001](0001-merge-litegraph-into-frontend.md), [ADR-0003](0003-crdt-based-layout-system.md)
- RFC: Long-Term Architectural Direction for ComfyUI_frontend (issue #4661)
- Backend slice: `Comfy-Org/cloud` PR #4432 (`workflow_draft`, ingest `/api/agent/*`, Redis PubSub)
- Existing FE entry points: `src/renderer/core/layout/store/layoutStore.ts`,
`src/renderer/core/layout/operations/layoutMutations.ts`, `src/scripts/app.ts` (`loadGraphData`)

View File

@@ -20,6 +20,7 @@ An Architecture Decision Record captures an important architectural decision mad
| [0008](0008-entity-component-system.md) | Entity Component System | Proposed | 2026-03-23 |
| [0009](0009-subgraph-promoted-widgets-use-linked-inputs.md) | Subgraph Promoted Widgets Use Linked Inputs | Proposed | 2026-05-05 |
| [0010](0010-remove-nx-orchestration.md) | Remove Nx Orchestration | Accepted | 2026-05-19 |
| [0011](0011-in-app-agent-graph-state-integration.md) | In-App Agent Graph-State Integration | Proposed | 2026-06-25 |
## Creating a New ADR

View File

@@ -57,6 +57,8 @@ const config: KnipConfig = {
// Marketing media tooling — adopted by pages in a follow-up PR
'apps/website/src/components/common/SiteVideo.vue',
'apps/website/src/utils/marketingImage.ts',
// In-app agent wire contract (ADR-0011) — public types pending UI integration
'src/platform/agent/common/agentProtocol.ts',
// Agent review check config, not part of the build
'.agents/checks/eslint.strict.config.js',
// Devtools extensions, included dynamically

View File

@@ -111,6 +111,7 @@
"firebase": "catalog:",
"fuse.js": "^7.0.0",
"glob": "catalog:",
"immer": "catalog:",
"jsonata": "catalog:",
"loglevel": "^1.9.2",
"marked": "^15.0.11",

20
pnpm-lock.yaml generated
View File

@@ -258,6 +258,9 @@ catalogs:
husky:
specifier: ^9.1.7
version: 9.1.7
immer:
specifier: ^11.1.8
version: 11.1.8
jiti:
specifier: 2.6.1
version: 2.6.1
@@ -579,6 +582,9 @@ importers:
glob:
specifier: 'catalog:'
version: 13.0.6
immer:
specifier: 'catalog:'
version: 11.1.8
jsonata:
specifier: 'catalog:'
version: 2.1.0
@@ -654,7 +660,7 @@ importers:
version: 4.5.0(eslint@10.4.0(jiti@2.6.1))(jsonc-eslint-parser@2.4.0)(vue-eslint-parser@10.4.0(eslint@10.4.0(jiti@2.6.1)))(yaml-eslint-parser@1.3.0)
'@lobehub/i18n-cli':
specifier: 'catalog:'
version: 1.26.1(@types/react@19.1.9)(typescript@5.9.3)(use-sync-external-store@1.6.0(react@19.2.4))(ws@8.21.0)(zod@3.25.76)
version: 1.26.1(@types/react@19.1.9)(immer@11.1.8)(typescript@5.9.3)(use-sync-external-store@1.6.0(react@19.2.4))(ws@8.21.0)(zod@3.25.76)
'@pinia/testing':
specifier: 'catalog:'
version: 1.0.3(pinia@3.0.4(typescript@5.9.3)(vue@3.5.34(typescript@5.9.3)))
@@ -6042,6 +6048,9 @@ packages:
immediate@3.0.6:
resolution: {integrity: sha512-XXOFtyqDjNDAQxVfYxuF7g9Il/IbWmmlQg2MYKOH8ExIT1qg6xc4zyS3HaEEATgs1btfzxq15ciUiY7gjSXRGQ==}
immer@11.1.8:
resolution: {integrity: sha512-/tbkHMW7y10Lx6i1crLjD4/OhNkRG+Fo7byZHtah0547nIeXYcpIXaUh0IAQY6gO5459qpGGYapcEOHtFXkIuA==}
import-fresh@3.3.1:
resolution: {integrity: sha512-TR3KfrTZTYLPB6jUjfx6MF9WcWrHL9su5TObK4ZkYgBdWKPOFoSoQIdEuTuR82pmtxH2spWG9h6etwfr1pLBqQ==}
engines: {node: '>=6'}
@@ -10521,7 +10530,7 @@ snapshots:
- react-devtools-core
- utf-8-validate
'@lobehub/i18n-cli@1.26.1(@types/react@19.1.9)(typescript@5.9.3)(use-sync-external-store@1.6.0(react@19.2.4))(ws@8.21.0)(zod@3.25.76)':
'@lobehub/i18n-cli@1.26.1(@types/react@19.1.9)(immer@11.1.8)(typescript@5.9.3)(use-sync-external-store@1.6.0(react@19.2.4))(ws@8.21.0)(zod@3.25.76)':
dependencies:
'@lobehub/cli-ui': 1.13.0(@types/react@19.1.9)
'@yutengjing/eld': 0.0.2
@@ -10552,7 +10561,7 @@ snapshots:
unified: 11.0.5
unist-util-visit: 5.1.0
update-notifier: 7.3.1
zustand: 5.0.11(@types/react@19.1.9)(react@19.2.4)(use-sync-external-store@1.6.0(react@19.2.4))
zustand: 5.0.11(@types/react@19.1.9)(immer@11.1.8)(react@19.2.4)(use-sync-external-store@1.6.0(react@19.2.4))
transitivePeerDependencies:
- '@types/react'
- bufferutil
@@ -14361,6 +14370,8 @@ snapshots:
immediate@3.0.6: {}
immer@11.1.8: {}
import-fresh@3.3.1:
dependencies:
parent-module: 1.0.1
@@ -17949,9 +17960,10 @@ snapshots:
zod@4.3.6: {}
zustand@5.0.11(@types/react@19.1.9)(react@19.2.4)(use-sync-external-store@1.6.0(react@19.2.4)):
zustand@5.0.11(@types/react@19.1.9)(immer@11.1.8)(react@19.2.4)(use-sync-external-store@1.6.0(react@19.2.4)):
optionalDependencies:
'@types/react': 19.1.9
immer: 11.1.8
react: 19.2.4
use-sync-external-store: 1.6.0(react@19.2.4)

View File

@@ -95,6 +95,7 @@ catalog:
gsap: ^3.14.2
happy-dom: ^20.8.9
husky: ^9.1.7
immer: ^11.1.8
jiti: 2.6.1
jsdom: ^27.4.0
jsonata: ^2.1.0

View File

@@ -0,0 +1,273 @@
import { describe, expect, it } from 'vitest'
import type { AgentTurnRequest } from './agentProtocol'
import {
parseAgentEvent,
parseDraftSnapshot,
serializeAgentTurnRequest
} from './agentProtocol'
const data = { thread_id: 't1', message_id: 'm1' }
describe('parseAgentEvent', () => {
it('decodes a draft_patch from the snake_case envelope', () => {
const event = parseAgentEvent({
type: 'draft_patch',
data: {
...data,
workflow_id: 'wf1',
content: { nodes: [] },
version: 8,
base_version: 7
}
})
expect(event).toEqual({
type: 'draft_patch',
threadId: 't1',
messageId: 'm1',
workflowId: 'wf1',
content: { nodes: [] },
version: 8,
baseVersion: 7
})
})
it('decodes a message delta', () => {
const event = parseAgentEvent({
type: 'agent_message_delta',
data: { ...data, delta: 'hi' }
})
expect(event).toEqual({
type: 'agent_message_delta',
threadId: 't1',
messageId: 'm1',
delta: 'hi'
})
})
it('decodes a tool call and drops absent optional fields', () => {
const event = parseAgentEvent({
type: 'agent_tool_call',
data: {
...data,
tool_call_id: 'tc1',
tool_name: 'workflow set-slot',
status: 'success'
}
})
expect(event).toEqual({
type: 'agent_tool_call',
threadId: 't1',
messageId: 'm1',
toolCallId: 'tc1',
toolName: 'workflow set-slot',
status: 'success'
})
})
it('maps tool call duration and error code from snake_case', () => {
const event = parseAgentEvent({
type: 'agent_tool_call',
data: {
...data,
tool_call_id: 'tc1',
tool_name: 'run',
status: 'error',
duration_ms: 1200,
error_code: 'OOM'
}
})
expect(event).toMatchObject({ durationMs: 1200, errorCode: 'OOM' })
})
it('omits a non-finite tool call duration', () => {
for (const durationMs of [Number.NaN, Number.POSITIVE_INFINITY]) {
const event = parseAgentEvent({
type: 'agent_tool_call',
data: {
...data,
tool_call_id: 'tc1',
tool_name: 'run',
status: 'success',
duration_ms: durationMs
}
})
expect(event).not.toHaveProperty('durationMs')
}
})
it('maps message done usage to tokenUsage', () => {
const event = parseAgentEvent({
type: 'agent_message_done',
data: { ...data, usage: { input: 12, output: 34 } }
})
expect(event).toEqual({
type: 'agent_message_done',
threadId: 't1',
messageId: 'm1',
tokenUsage: { input: 12, output: 34 }
})
})
it('omits tokenUsage when a usage field is non-finite', () => {
const event = parseAgentEvent({
type: 'agent_message_done',
data: { ...data, usage: { input: 12, output: Number.NaN } }
})
expect(event).not.toHaveProperty('tokenUsage')
})
it('omits tokenUsage when usage is absent', () => {
const event = parseAgentEvent({
type: 'agent_message_done',
data: { ...data }
})
expect(event).toEqual({
type: 'agent_message_done',
threadId: 't1',
messageId: 'm1'
})
})
it('rejects an unknown event type', () => {
expect(parseAgentEvent({ type: 'nope', data })).toBeNull()
})
it('rejects a payload with no envelope body', () => {
expect(
parseAgentEvent({ type: 'agent_message_delta', delta: 'hi' })
).toBeNull()
})
it('rejects a payload missing the base identifiers', () => {
expect(
parseAgentEvent({
type: 'agent_message_delta',
data: { delta: 'hi' }
})
).toBeNull()
})
it('rejects a draft_patch with a malformed version', () => {
const event = parseAgentEvent({
type: 'draft_patch',
data: {
...data,
workflow_id: 'wf1',
content: {},
version: '8',
base_version: 7
}
})
expect(event).toBeNull()
})
it('rejects a draft_patch missing base_version', () => {
const event = parseAgentEvent({
type: 'draft_patch',
data: { ...data, workflow_id: 'wf1', content: {}, version: 8 }
})
expect(event).toBeNull()
})
it('rejects a draft_patch with a non-finite version', () => {
for (const version of [Number.NaN, Number.POSITIVE_INFINITY]) {
const event = parseAgentEvent({
type: 'draft_patch',
data: {
...data,
workflow_id: 'wf1',
content: {},
version,
base_version: 7
}
})
expect(event).toBeNull()
}
})
it('rejects a draft_patch whose content is an array', () => {
const event = parseAgentEvent({
type: 'draft_patch',
data: {
...data,
workflow_id: 'wf1',
content: [],
version: 8,
base_version: 7
}
})
expect(event).toBeNull()
})
it('rejects a tool call with an invalid status', () => {
const event = parseAgentEvent({
type: 'agent_tool_call',
data: {
...data,
tool_call_id: 'tc1',
tool_name: 'run',
status: 'pending'
}
})
expect(event).toBeNull()
})
})
describe('serializeAgentTurnRequest', () => {
it('serializes baseVersion to the snake_case body', () => {
const request: AgentTurnRequest = {
content: 'add a sampler',
selection: ['1', '2'],
attachments: ['asset-1'],
target: 'active',
baseVersion: 7
}
expect(serializeAgentTurnRequest(request)).toEqual({
content: 'add a sampler',
selection: ['1', '2'],
attachments: ['asset-1'],
target: 'active',
base_version: 7
})
})
it('omits absent optional fields', () => {
expect(serializeAgentTurnRequest({ content: 'hi' })).toEqual({
content: 'hi'
})
})
it('keeps base_version 0 rather than dropping it', () => {
expect(
serializeAgentTurnRequest({ content: 'hi', baseVersion: 0 })
).toEqual({ content: 'hi', base_version: 0 })
})
})
describe('parseDraftSnapshot', () => {
it('decodes a well-formed snapshot body', () => {
const snapshot = parseDraftSnapshot({
content: { nodes: ['ksampler'] },
version: 12
})
expect(snapshot).toEqual({ content: { nodes: ['ksampler'] }, version: 12 })
})
it('accepts version 0', () => {
expect(parseDraftSnapshot({ content: {}, version: 0 })).toEqual({
content: {},
version: 0
})
})
it('rejects a malformed or partial body', () => {
expect(parseDraftSnapshot(null)).toBeNull()
expect(parseDraftSnapshot({ content: { nodes: [] } })).toBeNull()
expect(parseDraftSnapshot({ version: 3 })).toBeNull()
expect(parseDraftSnapshot({ content: 'nope', version: 3 })).toBeNull()
expect(parseDraftSnapshot({ content: [], version: 3 })).toBeNull()
expect(parseDraftSnapshot({ content: {}, version: '3' })).toBeNull()
expect(parseDraftSnapshot({ content: {}, version: Number.NaN })).toBeNull()
})
})

View File

@@ -0,0 +1,248 @@
/**
* In-App Agent protocol (prototype — ADR-0011).
*
* The cross-repo contract between the frontend (TS) and the server-side agent
* (Go, `Comfy-Org/cloud`). Inbound requests go to ingest `/api/agent/*`;
* outbound events arrive over the existing Redis-PubSub -> WebSocket bridge on
* `channel:ws:{workspaceId}:u:{userId}`.
*
* This is the single TS definition of that contract; it should be kept in sync
* with the Go side (open question: where the schema lives + how drift is caught).
*/
export type WorkflowId = string
export type ThreadId = string
export type MessageId = string
export type NodeId = string
/** Full save-format graph. Opaque here; validated by the workflow schema layer. */
export type WorkflowGraph = Record<string, unknown>
// ---------------------------------------------------------------------------
// Inbound: browser -> agent
// ---------------------------------------------------------------------------
/** Where an agent write lands (ADR-0001). */
export type AgentWriteTarget = 'active' | 'new_tab'
export interface AgentTurnRequest {
content: string
/** Selected node ids — the awareness input (ADR-0003). */
selection?: NodeId[]
/** Uploaded asset ids referenced by the turn. */
attachments?: string[]
target?: AgentWriteTarget
/** The tab's current draft version when `target === 'active'` (ADR-0005). */
baseVersion?: number
}
/** Wire body the backend `/api/agent/*` endpoints accept (snake_case). */
export interface AgentTurnRequestBody {
content: string
selection?: NodeId[]
attachments?: string[]
target?: AgentWriteTarget
base_version?: number
}
/** Serialize a turn request to the snake_case body the backend expects. */
export function serializeAgentTurnRequest(
request: AgentTurnRequest
): AgentTurnRequestBody {
return {
content: request.content,
...(request.selection ? { selection: request.selection } : {}),
...(request.attachments ? { attachments: request.attachments } : {}),
...(request.target ? { target: request.target } : {}),
...(request.baseVersion !== undefined
? { base_version: request.baseVersion }
: {})
}
}
// ---------------------------------------------------------------------------
// Outbound: agent -> browser
// ---------------------------------------------------------------------------
export type AgentToolCallStatus = 'running' | 'success' | 'error'
interface AgentEventBase {
threadId: ThreadId
messageId: MessageId
}
export interface AgentMessageDeltaEvent extends AgentEventBase {
type: 'agent_message_delta'
delta: string
}
export interface AgentToolCallEvent extends AgentEventBase {
type: 'agent_tool_call'
toolCallId: string
toolName: string
status: AgentToolCallStatus
durationMs?: number
errorCode?: string
}
/** A graph write: full-document replace guarded by `version` (ADR-0004). */
export interface DraftPatchEvent extends AgentEventBase {
type: 'draft_patch'
workflowId: WorkflowId
content: WorkflowGraph
/** The new authoritative version after the agent's CAS commit. */
version: number
/** The version the agent started from; compared against the tab (ADR-0005). */
baseVersion: number
}
export interface AgentMessageDoneEvent extends AgentEventBase {
type: 'agent_message_done'
tokenUsage?: { input: number; output: number }
}
export type AgentEvent =
| AgentMessageDeltaEvent
| AgentToolCallEvent
| DraftPatchEvent
| AgentMessageDoneEvent
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === 'object' && value !== null && !Array.isArray(value)
}
/** A finite number — rejects `NaN`/`Infinity`, which would wedge CAS version compares. */
function isFiniteNumber(value: unknown): value is number {
return typeof value === 'number' && Number.isFinite(value)
}
/** The base identifiers, read from the snake_case envelope body. */
function parseBase(data: Record<string, unknown>): AgentEventBase | null {
return typeof data.thread_id === 'string' &&
typeof data.message_id === 'string'
? { threadId: data.thread_id, messageId: data.message_id }
: null
}
function isToolCallStatus(value: unknown): value is AgentToolCallStatus {
return value === 'running' || value === 'success' || value === 'error'
}
function parseToolCall(
data: Record<string, unknown>,
base: AgentEventBase
): AgentToolCallEvent | null {
if (
typeof data.tool_call_id !== 'string' ||
typeof data.tool_name !== 'string' ||
!isToolCallStatus(data.status)
) {
return null
}
return {
type: 'agent_tool_call',
...base,
toolCallId: data.tool_call_id,
toolName: data.tool_name,
status: data.status,
...(isFiniteNumber(data.duration_ms)
? { durationMs: data.duration_ms }
: {}),
...(typeof data.error_code === 'string'
? { errorCode: data.error_code }
: {})
}
}
function parseDraftPatch(
data: Record<string, unknown>,
base: AgentEventBase
): DraftPatchEvent | null {
if (
typeof data.workflow_id !== 'string' ||
!isRecord(data.content) ||
!isFiniteNumber(data.version) ||
!isFiniteNumber(data.base_version)
) {
return null
}
return {
type: 'draft_patch',
...base,
workflowId: data.workflow_id,
content: data.content,
version: data.version,
baseVersion: data.base_version
}
}
function parseTokenUsage(
usage: unknown
): { input: number; output: number } | undefined {
return isRecord(usage) &&
isFiniteNumber(usage.input) &&
isFiniteNumber(usage.output)
? { input: usage.input, output: usage.output }
: undefined
}
/**
* Decode an untrusted WebSocket payload into a typed `AgentEvent`, or `null` if
* it is not a well-formed agent event. The wire format is the canonical backend
* envelope `{ type, data: { …snake_case… } }`; the event body lives under
* `data`. Keeps the transport boundary type-safe.
*/
export function parseAgentEvent(raw: unknown): AgentEvent | null {
if (!isRecord(raw) || !isRecord(raw.data)) return null
const { data } = raw
const base = parseBase(data)
if (!base) return null
switch (raw.type) {
case 'agent_message_delta':
return typeof data.delta === 'string'
? { type: 'agent_message_delta', ...base, delta: data.delta }
: null
case 'agent_tool_call':
return parseToolCall(data, base)
case 'draft_patch':
return parseDraftPatch(data, base)
case 'agent_message_done': {
const tokenUsage = parseTokenUsage(data.usage)
return {
type: 'agent_message_done',
...base,
...(tokenUsage ? { tokenUsage } : {})
}
}
default:
return null
}
}
// ---------------------------------------------------------------------------
// Draft snapshot: GET /api/agent/draft?workflow_id=... -> { content, version }
// ---------------------------------------------------------------------------
/**
* The authoritative server draft for a workflow (ADR-0011). Fetched on WS
* (re)connect and whenever a gap is suspected, to seed/reconcile the tab's base
* `version` without waiting for the agent to emit a new `draft_patch`.
*/
export interface DraftSnapshot {
content: WorkflowGraph
version: number
}
/** Decode an untrusted `GET /api/agent/draft` body, or `null` if malformed. */
export function parseDraftSnapshot(raw: unknown): DraftSnapshot | null {
if (
!isRecord(raw) ||
!isRecord(raw.content) ||
!isFiniteNumber(raw.version)
) {
return null
}
return { content: raw.content, version: raw.version }
}

View File

@@ -0,0 +1,49 @@
import { describe, expect, it } from 'vitest'
import type { DraftPatchEvent } from './agentProtocol'
import { reconcileDraftPatch } from './draftReconciler'
function patch(overrides: Partial<DraftPatchEvent> = {}): DraftPatchEvent {
return {
type: 'draft_patch',
threadId: 't1',
messageId: 'm1',
workflowId: 'wf1',
content: { nodes: [] },
version: 8,
baseVersion: 7,
...overrides
}
}
describe('reconcileDraftPatch', () => {
it('applies when the patch is based on the current tab version', () => {
const result = reconcileDraftPatch(patch({ baseVersion: 7, version: 8 }), 7)
expect(result).toEqual({ kind: 'apply', version: 8 })
})
it('flags a conflict when a concurrent edit advanced the tab', () => {
// Agent started from v7, but the user pushed the tab to v8 mid-turn.
const result = reconcileDraftPatch(patch({ baseVersion: 7, version: 9 }), 8)
expect(result).toEqual({ kind: 'conflict' })
})
it('ignores a stale patch the tab already supersedes', () => {
const result = reconcileDraftPatch(patch({ baseVersion: 7, version: 8 }), 8)
expect(result).toEqual({ kind: 'stale' })
})
it('ignores an older duplicate patch', () => {
const result = reconcileDraftPatch(
patch({ baseVersion: 5, version: 6 }),
10
)
expect(result).toEqual({ kind: 'stale' })
})
it('reports a gap when the agent advanced past the tab (missed patches)', () => {
// Tab holds v5; this patch is based on v7, so v6/v7 were dropped in transit.
const result = reconcileDraftPatch(patch({ baseVersion: 7, version: 8 }), 5)
expect(result).toEqual({ kind: 'gap' })
})
})

View File

@@ -0,0 +1,40 @@
/**
* Draft reconciliation (prototype — ADR-0004 / ADR-0005 / ADR-0011).
*
* Pure decision logic for an incoming `draft_patch`, given the version the tab
* currently holds. This is the load-bearing correctness piece: it decides
* whether a full-document replace applies cleanly, must surface the merge
* dialog, is a stale/duplicate to ignore, or reveals a gap that needs an
* authoritative refetch.
*
* `version` is a monotonic watermark: a patch is adopted only when it advances
* past what the tab already holds, so a dropped / duplicated / out-of-order
* Redis Pub/Sub `draft_patch` (the transport is at-most-once, BE-1886) can never
* regress local state.
*/
import type { DraftPatchEvent } from './agentProtocol'
export type ReconcileResult =
/** Patch is based on the tab's current version — apply and adopt `version`. */
| { kind: 'apply'; version: number }
/** A concurrent user edit advanced the tab — surface the merge dialog. */
| { kind: 'conflict' }
/** Patch is superseded by what the tab already has — ignore (idempotency). */
| { kind: 'stale' }
/** The agent advanced past the tab — patches were missed; refetch snapshot. */
| { kind: 'gap' }
/** User's choice in the merge dialog (ADR-0005). */
export type ConflictResolution = 'accept-agent' | 'keep-mine' | 'new-tab'
export function reconcileDraftPatch(
patch: DraftPatchEvent,
currentVersion: number
): ReconcileResult {
if (patch.version <= currentVersion) return { kind: 'stale' }
if (patch.baseVersion === currentVersion) {
return { kind: 'apply', version: patch.version }
}
if (patch.baseVersion > currentVersion) return { kind: 'gap' }
return { kind: 'conflict' }
}

View File

@@ -0,0 +1,374 @@
import { describe, expect, it, vi } from 'vitest'
import type { DraftPatchEvent } from './agentProtocol'
import { parseAgentEvent } from './agentProtocol'
import type { AgentDraftPorts } from './useAgentDraftSync'
import { useAgentDraftSync } from './useAgentDraftSync'
const SNAPSHOT = { content: { nodes: ['from-snapshot'] }, version: 12 }
function makePorts(): AgentDraftPorts {
return {
applyToTab: vi.fn(),
openInNewTab: vi.fn(),
discardAgentResult: vi.fn(),
fetchSnapshot: vi.fn().mockResolvedValue(SNAPSHOT)
}
}
function patch(overrides: Partial<DraftPatchEvent> = {}): DraftPatchEvent {
return {
type: 'draft_patch',
threadId: 't1',
messageId: 'm1',
workflowId: 'wf1',
content: { nodes: ['ksampler'] },
version: 8,
baseVersion: 7,
...overrides
}
}
describe('useAgentDraftSync', () => {
it('applies a patch to the active tab and adopts the new version', () => {
const ports = makePorts()
const sync = useAgentDraftSync(ports)
sync.registerWorkflow('wf1', 7)
const outcome = sync.handlePatch(patch({ baseVersion: 7, version: 8 }))
expect(outcome).toBe('applied')
expect(ports.applyToTab).toHaveBeenCalledWith(
'wf1',
{ nodes: ['ksampler'] },
8
)
expect(sync.baseVersions.value.get('wf1')).toBe(8)
expect(sync.pendingConflict.value).toBeNull()
})
it('surfaces a conflict when the user edited the graph mid-turn', () => {
const ports = makePorts()
const sync = useAgentDraftSync(ports)
sync.registerWorkflow('wf1', 7)
sync.setVersion('wf1', 8) // local autosave advanced the tab
const outcome = sync.handlePatch(patch({ baseVersion: 7, version: 9 }))
expect(outcome).toBe('conflict')
expect(ports.applyToTab).not.toHaveBeenCalled()
expect(sync.pendingConflict.value).toMatchObject({
workflowId: 'wf1',
version: 9
})
})
it('ignores a stale patch', () => {
const ports = makePorts()
const sync = useAgentDraftSync(ports)
sync.registerWorkflow('wf1', 8)
const outcome = sync.handlePatch(patch({ baseVersion: 7, version: 8 }))
expect(outcome).toBe('ignored')
expect(ports.applyToTab).not.toHaveBeenCalled()
})
it('opens a new tab when the workflow has no open tab', () => {
const ports = makePorts()
const sync = useAgentDraftSync(ports)
const outcome = sync.handlePatch(patch({ workflowId: 'wf-new' }))
expect(outcome).toBe('opened-new-tab')
expect(ports.openInNewTab).toHaveBeenCalledWith(
'wf-new',
{ nodes: ['ksampler'] },
8
)
})
describe('resolveConflict', () => {
function setupConflict() {
const ports = makePorts()
const sync = useAgentDraftSync(ports)
sync.registerWorkflow('wf1', 7)
sync.setVersion('wf1', 8)
sync.handlePatch(patch({ baseVersion: 7, version: 9 }))
return { ports, sync }
}
it('accept-agent applies the agent version and adopts it', () => {
const { ports, sync } = setupConflict()
sync.resolveConflict('accept-agent')
expect(ports.applyToTab).toHaveBeenCalledWith(
'wf1',
{ nodes: ['ksampler'] },
9
)
expect(sync.baseVersions.value.get('wf1')).toBe(9)
expect(sync.pendingConflict.value).toBeNull()
})
it('keep-mine discards the agent result and keeps the tab version', () => {
const { ports, sync } = setupConflict()
sync.resolveConflict('keep-mine')
expect(ports.discardAgentResult).toHaveBeenCalledWith('wf1')
expect(ports.applyToTab).not.toHaveBeenCalled()
expect(sync.baseVersions.value.get('wf1')).toBe(8)
expect(sync.pendingConflict.value).toBeNull()
})
it('new-tab opens the agent result without touching the active tab', () => {
const { ports, sync } = setupConflict()
sync.resolveConflict('new-tab')
expect(ports.openInNewTab).toHaveBeenCalledWith(
'wf1',
{ nodes: ['ksampler'] },
9
)
expect(ports.applyToTab).not.toHaveBeenCalled()
expect(sync.pendingConflict.value).toBeNull()
})
})
describe('forgetWorkflow', () => {
it('clears a pending conflict targeting the closed workflow', () => {
const ports = makePorts()
const sync = useAgentDraftSync(ports)
sync.registerWorkflow('wf1', 7)
sync.setVersion('wf1', 8)
sync.handlePatch(patch({ baseVersion: 7, version: 9 }))
expect(sync.pendingConflict.value).not.toBeNull()
sync.forgetWorkflow('wf1')
expect(sync.pendingConflict.value).toBeNull()
})
it('leaves a pending conflict for a different workflow intact', () => {
const ports = makePorts()
const sync = useAgentDraftSync(ports)
sync.registerWorkflow('wf1', 7)
sync.setVersion('wf1', 8)
sync.handlePatch(patch({ baseVersion: 7, version: 9 }))
sync.forgetWorkflow('wf2')
expect(sync.pendingConflict.value).toMatchObject({ workflowId: 'wf1' })
})
})
describe('wire envelope -> reconciler', () => {
function wirePatch(version: number, baseVersion: number) {
const event = parseAgentEvent({
type: 'draft_patch',
data: {
thread_id: 't1',
message_id: 'm1',
workflow_id: 'wf1',
content: { nodes: ['ksampler'] },
version,
base_version: baseVersion
}
})
if (event?.type !== 'draft_patch') {
throw new Error('expected a draft_patch event')
}
return event
}
it('applies when base_version matches the tab version', () => {
const ports = makePorts()
const sync = useAgentDraftSync(ports)
sync.registerWorkflow('wf1', 7)
const outcome = sync.handlePatch(wirePatch(8, 7))
expect(outcome).toBe('applied')
expect(ports.applyToTab).toHaveBeenCalledWith(
'wf1',
{ nodes: ['ksampler'] },
8
)
})
it('conflicts when base_version differs from the tab version', () => {
const ports = makePorts()
const sync = useAgentDraftSync(ports)
sync.registerWorkflow('wf1', 7)
sync.setVersion('wf1', 8)
const outcome = sync.handlePatch(wirePatch(9, 7))
expect(outcome).toBe('conflict')
expect(ports.applyToTab).not.toHaveBeenCalled()
expect(sync.pendingConflict.value).toMatchObject({
workflowId: 'wf1',
baseVersion: 7,
version: 9
})
})
})
describe('resync (reconnect / cold start)', () => {
it('seeds the tab from the snapshot when none is registered yet', async () => {
const ports = makePorts()
const sync = useAgentDraftSync(ports)
const outcome = await sync.resync('wf1')
expect(outcome).toBe('restored')
expect(ports.fetchSnapshot).toHaveBeenCalledWith('wf1')
expect(ports.applyToTab).toHaveBeenCalledWith(
'wf1',
SNAPSHOT.content,
SNAPSHOT.version
)
expect(sync.baseVersions.value.get('wf1')).toBe(SNAPSHOT.version)
})
it('restores a newer snapshot over a stale local version', async () => {
const ports = makePorts()
const sync = useAgentDraftSync(ports)
sync.registerWorkflow('wf1', 5)
const outcome = await sync.resync('wf1')
expect(outcome).toBe('restored')
expect(sync.baseVersions.value.get('wf1')).toBe(SNAPSHOT.version)
})
it('leaves the tab untouched when the snapshot is not newer (watermark)', async () => {
const ports = makePorts()
vi.mocked(ports.fetchSnapshot).mockResolvedValue({
content: { nodes: ['old'] },
version: 8
})
const sync = useAgentDraftSync(ports)
sync.registerWorkflow('wf1', 8)
const outcome = await sync.resync('wf1')
expect(outcome).toBe('up-to-date')
expect(ports.applyToTab).not.toHaveBeenCalled()
expect(sync.baseVersions.value.get('wf1')).toBe(8)
})
it('shares one in-flight request across concurrent resyncs', async () => {
const ports = makePorts()
const sync = useAgentDraftSync(ports)
const [a, b] = await Promise.all([sync.resync('wf1'), sync.resync('wf1')])
expect(a).toBe('restored')
expect(b).toBe('restored')
expect(ports.fetchSnapshot).toHaveBeenCalledTimes(1)
})
it('clears an open merge dialog when it restores a newer snapshot', async () => {
const ports = makePorts()
const sync = useAgentDraftSync(ports)
sync.registerWorkflow('wf1', 5)
sync.setVersion('wf1', 6)
sync.handlePatch(patch({ baseVersion: 4, version: 7 }))
expect(sync.pendingConflict.value).not.toBeNull()
const outcome = await sync.resync('wf1')
expect(outcome).toBe('restored')
expect(sync.pendingConflict.value).toBeNull()
})
it('does not resurrect tracking for a tab closed mid-fetch', async () => {
const ports = makePorts()
let resolveFetch!: (snapshot: typeof SNAPSHOT) => void
vi.mocked(ports.fetchSnapshot).mockReturnValue(
new Promise((resolve) => {
resolveFetch = resolve
})
)
const sync = useAgentDraftSync(ports)
sync.registerWorkflow('wf1', 5)
const pending = sync.resync('wf1')
sync.forgetWorkflow('wf1')
resolveFetch(SNAPSHOT)
expect(await pending).toBe('up-to-date')
expect(ports.applyToTab).not.toHaveBeenCalled()
expect(sync.baseVersions.value.has('wf1')).toBe(false)
})
})
describe('gap detection', () => {
it('refetches the snapshot when the agent advanced past the tab', async () => {
const ports = makePorts()
const sync = useAgentDraftSync(ports)
sync.registerWorkflow('wf1', 5)
const outcome = sync.handlePatch(patch({ baseVersion: 7, version: 8 }))
expect(outcome).toBe('gap')
expect(ports.applyToTab).not.toHaveBeenCalled()
await sync.pendingResync('wf1')
expect(ports.fetchSnapshot).toHaveBeenCalledWith('wf1')
expect(ports.applyToTab).toHaveBeenCalledWith(
'wf1',
SNAPSHOT.content,
SNAPSHOT.version
)
expect(sync.baseVersions.value.get('wf1')).toBe(SNAPSHOT.version)
expect(sync.pendingConflict.value).toBeNull()
})
})
describe('handleVersionTip (trailing lost patch)', () => {
it('resyncs when the tip outruns the local watermark', async () => {
const ports = makePorts()
const sync = useAgentDraftSync(ports)
sync.registerWorkflow('wf1', 5)
const outcome = sync.handleVersionTip('wf1', 9)
expect(outcome).toBe('resyncing')
await sync.pendingResync('wf1')
expect(ports.fetchSnapshot).toHaveBeenCalledWith('wf1')
expect(sync.baseVersions.value.get('wf1')).toBe(SNAPSHOT.version)
})
it('does nothing when the tip is not newer', () => {
const ports = makePorts()
const sync = useAgentDraftSync(ports)
sync.registerWorkflow('wf1', 9)
expect(sync.handleVersionTip('wf1', 9)).toBe('up-to-date')
expect(ports.fetchSnapshot).not.toHaveBeenCalled()
})
it('ignores a tip for a workflow with no open tab', () => {
const ports = makePorts()
const sync = useAgentDraftSync(ports)
expect(sync.handleVersionTip('wf-unknown', 9)).toBe('ignored')
expect(ports.fetchSnapshot).not.toHaveBeenCalled()
})
it('preserves local state when a self-heal fetch fails', async () => {
const ports = makePorts()
const boom = new Error('network down')
vi.mocked(ports.fetchSnapshot).mockRejectedValue(boom)
const sync = useAgentDraftSync(ports)
sync.registerWorkflow('wf1', 5)
expect(sync.handleVersionTip('wf1', 9)).toBe('resyncing')
await expect(sync.pendingResync('wf1')).rejects.toBe(boom)
expect(ports.applyToTab).not.toHaveBeenCalled()
expect(sync.baseVersions.value.get('wf1')).toBe(5)
})
})
})

View File

@@ -0,0 +1,230 @@
/**
* Agent draft sync (prototype — ADR-0011).
*
* Orchestrates the frontend side of agent graph writes: tracks the base
* `version` per open workflow (the version lifecycle), reconciles incoming
* `draft_patch` events, drives the three merge-dialog outcomes, and self-heals
* across dropped Redis Pub/Sub patches and WS reconnects by refetching the
* authoritative snapshot (BE-1886). The transport stays best-effort; this client
* provides reliability — `version` is the monotonic watermark.
*
* The canvas-facing effects are injected as `ports` so this is decoupled from
* litegraph / the workflow store and is unit-testable. In the real app the
* ports map to: `applyToTab` -> a destructive variant of `app.loadGraphData`;
* `openInNewTab` -> the existing non-destructive load; `discardAgentResult` ->
* a no-op (keep the user's canvas as-is); `fetchSnapshot` -> a `GET
* /api/agent/draft?workflow_id=...` decoded with `parseDraftSnapshot`.
*/
import { readonly, ref } from 'vue'
import type {
DraftPatchEvent,
DraftSnapshot,
WorkflowGraph,
WorkflowId
} from './agentProtocol'
import type { ConflictResolution } from './draftReconciler'
import { reconcileDraftPatch } from './draftReconciler'
export interface AgentDraftPorts {
applyToTab(
workflowId: WorkflowId,
content: WorkflowGraph,
version: number
): void
openInNewTab(
workflowId: WorkflowId,
content: WorkflowGraph,
version: number
): void
discardAgentResult(workflowId: WorkflowId): void
fetchSnapshot(workflowId: WorkflowId): Promise<DraftSnapshot>
}
export interface PendingConflict {
workflowId: WorkflowId
content: WorkflowGraph
version: number
baseVersion: number
}
export type PatchOutcome =
| 'applied'
| 'conflict'
| 'ignored'
| 'opened-new-tab'
| 'gap'
/** `restored` = a newer snapshot replaced the tab; `up-to-date` = already current. */
export type ResyncOutcome = 'restored' | 'up-to-date'
export type VersionTipOutcome = 'resyncing' | 'up-to-date' | 'ignored'
export function useAgentDraftSync(ports: AgentDraftPorts) {
const baseVersions = ref(new Map<WorkflowId, number>())
const pendingConflict = ref<PendingConflict | null>(null)
const inFlightResyncs = new Map<WorkflowId, Promise<ResyncOutcome>>()
/** Call when a draft tab opens, adopting its known version. */
function registerWorkflow(workflowId: WorkflowId, version: number): void {
baseVersions.value.set(workflowId, version)
}
function clearConflictFor(workflowId: WorkflowId): void {
if (pendingConflict.value?.workflowId === workflowId) {
pendingConflict.value = null
}
}
function forgetWorkflow(workflowId: WorkflowId): void {
baseVersions.value.delete(workflowId)
clearConflictFor(workflowId)
}
/** Call after a local autosave returns a new server version. */
function setVersion(workflowId: WorkflowId, version: number): void {
baseVersions.value.set(workflowId, version)
}
function handlePatch(patch: DraftPatchEvent): PatchOutcome {
const current = baseVersions.value.get(patch.workflowId)
// Unknown workflow = no open tab for it (a new-tab write, or a tab the user
// closed mid-edit — see ADR-0011 open question). Route to a new tab.
if (current === undefined) {
ports.openInNewTab(patch.workflowId, patch.content, patch.version)
baseVersions.value.set(patch.workflowId, patch.version)
return 'opened-new-tab'
}
const result = reconcileDraftPatch(patch, current)
switch (result.kind) {
case 'apply':
ports.applyToTab(patch.workflowId, patch.content, result.version)
baseVersions.value.set(patch.workflowId, result.version)
return 'applied'
case 'conflict':
pendingConflict.value = {
workflowId: patch.workflowId,
content: patch.content,
version: patch.version,
baseVersion: patch.baseVersion
}
return 'conflict'
case 'gap':
scheduleResync(patch.workflowId)
return 'gap'
case 'stale':
return 'ignored'
}
}
async function runResync(workflowId: WorkflowId): Promise<ResyncOutcome> {
const wasTracked = baseVersions.value.has(workflowId)
const snapshot = await ports.fetchSnapshot(workflowId)
const current = baseVersions.value.get(workflowId)
// The tab was closed mid-fetch (`forgetWorkflow`). Don't resurrect tracking
// or apply to a tab that no longer exists.
if (wasTracked && current === undefined) return 'up-to-date'
if (current !== undefined && snapshot.version <= current) {
return 'up-to-date'
}
ports.applyToTab(workflowId, snapshot.content, snapshot.version)
baseVersions.value.set(workflowId, snapshot.version)
// The authoritative snapshot supersedes any open merge dialog for this tab;
// resolving it later would re-apply now-stale content and roll the version
// back.
clearConflictFor(workflowId)
return 'restored'
}
/**
* Fetch the authoritative snapshot and reconcile it against the watermark.
* Call on WS (re)connect to restore the draft without waiting for a patch.
* Concurrent calls for the same workflow share one in-flight request.
*/
function resync(workflowId: WorkflowId): Promise<ResyncOutcome> {
const existing = inFlightResyncs.get(workflowId)
if (existing) return existing
const run = runResync(workflowId).finally(() => {
inFlightResyncs.delete(workflowId)
})
inFlightResyncs.set(workflowId, run)
return run
}
/** In-flight resync for a workflow, if any (lets callers await self-heal). */
function pendingResync(
workflowId: WorkflowId
): Promise<ResyncOutcome> | undefined {
return inFlightResyncs.get(workflowId)
}
/**
* Fire-and-forget self-heal. A failed refetch is best-effort: it leaves local
* state intact and is retried by the next patch / reconnect / version tip.
*/
function scheduleResync(workflowId: WorkflowId): void {
void resync(workflowId).catch((error: unknown) => {
console.error('[agent] draft resync failed', workflowId, error)
})
}
/**
* Consume an optional `draft_version` tip (a content-less version heartbeat).
* Catches a trailing lost patch: if the server tip outruns our watermark for an
* open workflow, refetch the snapshot.
*/
function handleVersionTip(
workflowId: WorkflowId,
version: number
): VersionTipOutcome {
const current = baseVersions.value.get(workflowId)
if (current === undefined) return 'ignored'
if (version <= current) return 'up-to-date'
scheduleResync(workflowId)
return 'resyncing'
}
function resolveConflict(decision: ConflictResolution): void {
const conflict = pendingConflict.value
if (!conflict) return
switch (decision) {
case 'accept-agent':
ports.applyToTab(
conflict.workflowId,
conflict.content,
conflict.version
)
baseVersions.value.set(conflict.workflowId, conflict.version)
break
case 'keep-mine':
ports.discardAgentResult(conflict.workflowId)
break
case 'new-tab':
ports.openInNewTab(
conflict.workflowId,
conflict.content,
conflict.version
)
break
}
pendingConflict.value = null
}
return {
baseVersions: readonly(baseVersions),
pendingConflict: readonly(pendingConflict),
registerWorkflow,
forgetWorkflow,
setVersion,
handlePatch,
resolveConflict,
resync,
pendingResync,
handleVersionTip
}
}

View File

@@ -0,0 +1,48 @@
import { describe, expect, it } from 'vitest'
import { AgentRoom } from './agentRoom'
describe('AgentRoom', () => {
it('reconciles two peers via state-vector diff without conflict', () => {
const user = new AgentRoom('wf-1')
const agent = new AgentRoom('wf-1')
user.nodes.set('a', { title: 'Load Checkpoint' })
agent.nodes.set('b', { title: 'KSampler' })
agent.applyRemoteUpdate(user.diffSince(agent.encodeStateVector()))
user.applyRemoteUpdate(agent.diffSince(user.encodeStateVector()))
expect([...user.nodes.keys()].sort()).toEqual(['a', 'b'])
expect([...agent.nodes.keys()].sort()).toEqual(['a', 'b'])
})
it('merges concurrent edits to the same map deterministically', () => {
const user = new AgentRoom('wf-1')
const agent = new AgentRoom('wf-1')
agent.applyRemoteUpdate(user.encodeState())
user.nodes.set('shared', { x: 1 })
agent.nodes.set('shared', { x: 2 })
user.applyRemoteUpdate(agent.diffSince(user.encodeStateVector()))
agent.applyRemoteUpdate(user.diffSince(agent.encodeStateVector()))
expect(user.nodes.get('shared')).toEqual(agent.nodes.get('shared'))
})
it('reports an agent participant as editing via presence', () => {
const room = new AgentRoom('wf-1')
expect(room.isAgentEditing()).toBe(false)
room.setPresence({
actor: 'agent-1',
kind: 'agent',
status: 'editing',
focus: ['a'],
updatedAt: 0
})
expect(room.isAgentEditing()).toBe(true)
})
})

View File

@@ -0,0 +1,98 @@
/**
* Agent CRDT room (prototype — ADR-0011, building on ADR-0003).
*
* One collaborative workflow = one room: a thin wrapper over a Yjs `Y.Doc` that
* the browser and the server-side agent share. The agent is the first
* multiplayer peer. Top-level types (`nodes`/`links`/`reroutes`) mirror the live
* layout store so a room can drive the real canvas via binary updates
* (see `layoutStoreBinding.ts`). Conflict resolution is Yjs's job — no version
* CAS at this layer.
*/
import * as Y from 'yjs'
export type WorkflowId = string
export type ActorId = string
export type UpdateOrigin = unknown
export interface RoomPresence {
actor: ActorId
kind: 'user' | 'agent'
status: 'idle' | 'editing'
focus: string[]
updatedAt: number
}
const PRESENCE_KEY = '__presence'
export class AgentRoom {
readonly workflowId: WorkflowId
readonly doc: Y.Doc
readonly nodes: Y.Map<unknown>
readonly links: Y.Map<unknown>
readonly reroutes: Y.Map<unknown>
private readonly presence: Y.Map<RoomPresence>
constructor(workflowId: WorkflowId, doc: Y.Doc = new Y.Doc()) {
this.workflowId = workflowId
this.doc = doc
this.nodes = doc.getMap('nodes')
this.links = doc.getMap('links')
this.reroutes = doc.getMap('reroutes')
this.presence = doc.getMap(PRESENCE_KEY)
}
/** State vector describing what this peer already has (sync step 1). */
encodeStateVector(): Uint8Array {
return Y.encodeStateVector(this.doc)
}
/** Minimal update a peer needs given its state vector (sync step 2). */
diffSince(remoteStateVector: Uint8Array): Uint8Array {
return Y.encodeStateAsUpdate(this.doc, remoteStateVector)
}
/** Full state as a single update, used to seed a fresh peer. */
encodeState(): Uint8Array {
return Y.encodeStateAsUpdate(this.doc)
}
applyRemoteUpdate(update: Uint8Array, origin?: UpdateOrigin): void {
Y.applyUpdate(this.doc, update, origin)
}
onUpdate(cb: (update: Uint8Array, origin: UpdateOrigin) => void): () => void {
const handler = (update: Uint8Array, origin: UpdateOrigin) =>
cb(update, origin)
this.doc.on('update', handler)
return () => this.doc.off('update', handler)
}
setPresence(p: RoomPresence): void {
this.presence.set(p.actor, p)
}
clearPresence(actor: ActorId): void {
this.presence.delete(actor)
}
getPresence(): RoomPresence[] {
return [...this.presence.values()]
}
isAgentEditing(): boolean {
return this.getPresence().some(
(p) => p.kind === 'agent' && p.status === 'editing'
)
}
onPresenceChange(cb: (presence: RoomPresence[]) => void): () => void {
const handler = () => cb(this.getPresence())
this.presence.observe(handler)
return () => this.presence.unobserve(handler)
}
destroy(): void {
this.doc.destroy()
}
}

View File

@@ -0,0 +1,31 @@
import { describe, expect, it } from 'vitest'
import { AgentRoomManager } from './agentRoomManager'
describe('AgentRoomManager', () => {
it('keeps one room alive across overlapping tab references', () => {
const manager = new AgentRoomManager()
const first = manager.join('wf-1')
const second = manager.join('wf-1')
expect(second).toBe(first)
manager.leave('wf-1')
expect(manager.has('wf-1')).toBe(true)
manager.leave('wf-1')
expect(manager.has('wf-1')).toBe(false)
})
it('keeps a pinned room alive with zero open tabs and reaps on unpin', () => {
const manager = new AgentRoomManager()
manager.join('wf-1')
manager.pin('wf-1')
manager.leave('wf-1')
expect(manager.has('wf-1')).toBe(true)
manager.unpin('wf-1')
expect(manager.has('wf-1')).toBe(false)
})
})

View File

@@ -0,0 +1,71 @@
/**
* Agent room manager (prototype — ADR-0011).
*
* Owns room lifecycle. A tab switch is a `join`/`leave`; a room stays alive in
* memory while any tab references it, so the agent can keep applying edits to a
* backgrounded workflow and the user sees them lazily on return (the scenario
* from the design meeting). `pin` keeps a room alive with zero open tabs while
* the agent is mid-edit; the room is torn down only when unreferenced and
* unpinned.
*/
import { AgentRoom } from './agentRoom'
import type { WorkflowId } from './agentRoom'
interface RoomEntry {
room: AgentRoom
refs: number
pinned: boolean
}
export class AgentRoomManager {
private readonly entries = new Map<WorkflowId, RoomEntry>()
join(workflowId: WorkflowId): AgentRoom {
const existing = this.entries.get(workflowId)
if (existing) {
existing.refs += 1
return existing.room
}
const room = new AgentRoom(workflowId)
this.entries.set(workflowId, { room, refs: 1, pinned: false })
return room
}
leave(workflowId: WorkflowId): void {
const entry = this.entries.get(workflowId)
if (!entry) return
entry.refs = Math.max(0, entry.refs - 1)
this.reapIfIdle(workflowId, entry)
}
pin(workflowId: WorkflowId): void {
const entry = this.entries.get(workflowId)
if (entry) entry.pinned = true
}
unpin(workflowId: WorkflowId): void {
const entry = this.entries.get(workflowId)
if (!entry) return
entry.pinned = false
this.reapIfIdle(workflowId, entry)
}
get(workflowId: WorkflowId): AgentRoom | undefined {
return this.entries.get(workflowId)?.room
}
has(workflowId: WorkflowId): boolean {
return this.entries.has(workflowId)
}
get size(): number {
return this.entries.size
}
private reapIfIdle(workflowId: WorkflowId, entry: RoomEntry): void {
if (entry.refs === 0 && !entry.pinned) {
entry.room.destroy()
this.entries.delete(workflowId)
}
}
}

View File

@@ -0,0 +1,33 @@
import { describe, expect, it } from 'vitest'
import * as Y from 'yjs'
import { AgentRoom } from './agentRoom'
import { bindRoomToDoc } from './roomDocBinding'
describe('bindRoomToDoc', () => {
it('seeds the room from the doc and pushes agent edits back', () => {
const doc = new Y.Doc()
doc.getMap('nodes').set('existing', { title: 'Existing' })
const room = new AgentRoom('wf-1')
const unbind = bindRoomToDoc(room, doc)
expect(room.nodes.get('existing')).toEqual({ title: 'Existing' })
room.nodes.set('agent', { title: 'Agent Node' })
expect(doc.getMap('nodes').get('agent')).toEqual({ title: 'Agent Node' })
unbind()
room.nodes.set('after-unbind', {})
expect(doc.getMap('nodes').has('after-unbind')).toBe(false)
})
it('reflects doc edits into the room while bound', () => {
const doc = new Y.Doc()
const room = new AgentRoom('wf-1')
bindRoomToDoc(room, doc)
doc.getMap('nodes').set('from-canvas', { title: 'Canvas' })
expect(room.nodes.get('from-canvas')).toEqual({ title: 'Canvas' })
})
})

View File

@@ -0,0 +1,35 @@
/**
* Binds an agent room to a Yjs document (prototype — ADR-0011).
*
* Generic, layer-safe core of the canvas binding: it works on any `Y.Doc`, so
* it stays in the platform layer with no renderer dependency. The renderer-layer
* wrapper (`renderer/core/layout/agent/bindRoomToLayoutStore.ts`) supplies the
* live layout store's doc. Bidirectional Yjs sync with origin tags prevents echo
* loops; Yjs reconciles concurrent edits.
*/
import * as Y from 'yjs'
import type { AgentRoom } from './agentRoom'
const AGENT_ORIGIN = Symbol('agent-room->doc')
const DOC_ORIGIN = Symbol('doc->agent-room')
export function bindRoomToDoc(room: AgentRoom, doc: Y.Doc): () => void {
Y.applyUpdate(room.doc, Y.encodeStateAsUpdate(doc), DOC_ORIGIN)
Y.applyUpdate(doc, Y.encodeStateAsUpdate(room.doc), AGENT_ORIGIN)
const onRoom = (update: Uint8Array, origin: unknown) => {
if (origin !== DOC_ORIGIN) Y.applyUpdate(doc, update, AGENT_ORIGIN)
}
const onDoc = (update: Uint8Array, origin: unknown) => {
if (origin !== AGENT_ORIGIN) Y.applyUpdate(room.doc, update, DOC_ORIGIN)
}
room.doc.on('update', onRoom)
doc.on('update', onDoc)
return () => {
room.doc.off('update', onRoom)
doc.off('update', onDoc)
}
}

View File

@@ -0,0 +1,59 @@
import { describe, expect, it } from 'vitest'
import { AgentRoom } from './agentRoom'
import type { RoomSyncMessage, RoomTransport } from './roomSync'
import { syncRoom } from './roomSync'
class LoopbackTransport implements RoomTransport {
private listeners = new Set<(m: RoomSyncMessage) => void>()
peer: LoopbackTransport | null = null
send(message: RoomSyncMessage): void {
this.peer?.listeners.forEach((cb) => cb(message))
}
onMessage(cb: (m: RoomSyncMessage) => void): () => void {
this.listeners.add(cb)
return () => this.listeners.delete(cb)
}
}
function connect() {
const a = new LoopbackTransport()
const b = new LoopbackTransport()
a.peer = b
b.peer = a
return [a, b] as const
}
describe('syncRoom', () => {
it('converges existing state on connect via the sync handshake', () => {
const [t1, t2] = connect()
const user = new AgentRoom('wf-1')
const agent = new AgentRoom('wf-1')
user.nodes.set('a', { title: 'Load Checkpoint' })
syncRoom(user, t1)
syncRoom(agent, t2)
expect(agent.nodes.get('a')).toEqual({ title: 'Load Checkpoint' })
})
it('propagates live edits and ignores other workflows', () => {
const [t1, t2] = connect()
const user = new AgentRoom('wf-1')
const agent = new AgentRoom('wf-1')
syncRoom(user, t1)
syncRoom(agent, t2)
agent.nodes.set('b', { title: 'KSampler' })
expect(user.nodes.get('b')).toEqual({ title: 'KSampler' })
t2.send({
type: 'update',
workflowId: 'other',
update: agent.encodeState()
})
expect(user.nodes.has('b')).toBe(true)
})
})

View File

@@ -0,0 +1,63 @@
/**
* Room sync protocol (prototype — ADR-0011).
*
* Transport-agnostic two-phase Yjs sync, modelled on `y-protocols/sync`:
* step 1: a peer announces its state vector
* step 2: the other peer replies with the diff that vector is missing
* update: incremental updates are broadcast as they happen
*
* The transport is injected so this works over the existing Redis→WebSocket
* bridge in V0 and a dedicated relay later. Updates applied from the transport
* carry the `REMOTE_ORIGIN` tag so they are not echoed back.
*/
import type { AgentRoom } from './agentRoom'
export type RoomSyncMessage =
| { type: 'sync-step-1'; workflowId: string; stateVector: Uint8Array }
| { type: 'sync-step-2'; workflowId: string; update: Uint8Array }
| { type: 'update'; workflowId: string; update: Uint8Array }
export interface RoomTransport {
send(message: RoomSyncMessage): void
onMessage(cb: (message: RoomSyncMessage) => void): () => void
}
const REMOTE_ORIGIN = Symbol('agent-room-remote')
export function syncRoom(
room: AgentRoom,
transport: RoomTransport
): () => void {
const offUpdate = room.onUpdate((update, origin) => {
if (origin === REMOTE_ORIGIN) return
transport.send({ type: 'update', workflowId: room.workflowId, update })
})
const offMessage = transport.onMessage((message) => {
if (message.workflowId !== room.workflowId) return
switch (message.type) {
case 'sync-step-1':
transport.send({
type: 'sync-step-2',
workflowId: room.workflowId,
update: room.diffSince(message.stateVector)
})
return
case 'sync-step-2':
case 'update':
room.applyRemoteUpdate(message.update, REMOTE_ORIGIN)
return
}
})
transport.send({
type: 'sync-step-1',
workflowId: room.workflowId,
stateVector: room.encodeStateVector()
})
return () => {
offUpdate()
offMessage()
}
}

View File

@@ -0,0 +1,102 @@
import { describe, expect, it } from 'vitest'
import type { AgentEvent } from '../common/agentProtocol'
import { createSessionState, sessionReducer } from './agentSessionStore'
import type { SessionState } from './agentSessionStore'
function reduce(state: SessionState, events: AgentEvent[]): SessionState {
return events.reduce(
(acc, event) => sessionReducer(acc, { type: 'agent-event', event }),
state
)
}
describe('sessionReducer', () => {
it('accumulates streaming deltas into one agent message', () => {
const start = sessionReducer(createSessionState('t-1'), {
type: 'user-send',
id: 'u-1',
content: 'hello'
})
const next = reduce(start, [
{
type: 'agent_message_delta',
threadId: 't-1',
messageId: 'm-1',
delta: 'Hi '
},
{
type: 'agent_message_delta',
threadId: 't-1',
messageId: 'm-1',
delta: 'there'
}
])
expect(next.messages).toHaveLength(2)
expect(next.messages[1]).toMatchObject({
role: 'agent',
content: 'Hi there',
streaming: true
})
expect(next.status).toBe('streaming')
})
it('does not mutate the previous state (structural sharing)', () => {
const before = createSessionState('t-1')
const after = sessionReducer(before, {
type: 'user-send',
id: 'u-1',
content: 'hello'
})
expect(before.messages).toHaveLength(0)
expect(after.messages).toHaveLength(1)
expect(after).not.toBe(before)
})
it('tracks tool-call lifecycle and surfaces errors', () => {
const next = reduce(createSessionState('t-1'), [
{
type: 'agent_tool_call',
threadId: 't-1',
messageId: 'm-1',
toolCallId: 'tc-1',
toolName: 'load_graph',
status: 'running'
},
{
type: 'agent_tool_call',
threadId: 't-1',
messageId: 'm-1',
toolCallId: 'tc-1',
toolName: 'load_graph',
status: 'error',
errorCode: 'BAD_GRAPH'
}
])
expect(next.messages[0].toolCalls).toHaveLength(1)
expect(next.messages[0].toolCalls[0]).toMatchObject({
status: 'error',
errorCode: 'BAD_GRAPH'
})
expect(next.status).toBe('error')
})
it('ends streaming on done', () => {
const next = reduce(createSessionState('t-1'), [
{
type: 'agent_message_delta',
threadId: 't-1',
messageId: 'm-1',
delta: 'x'
},
{ type: 'agent_message_done', threadId: 't-1', messageId: 'm-1' }
])
expect(next.messages[0].streaming).toBe(false)
expect(next.status).toBe('idle')
})
})

View File

@@ -0,0 +1,110 @@
/**
* Agent chat session state (prototype — ADR-0011).
*
* Immer-backed reducer for the *local, single-client* chat surface: streaming
* message deltas, tool-call lifecycle, run status. This is deliberately not a
* CRDT — chat is owned by one browser tab, so structural-sharing immutable
* updates (Immer) are the right tool. Graph state lives in the Yjs room layer
* (`../crdt`); the two never mix.
*/
import { produce } from 'immer'
import type { AgentEvent, MessageId, ThreadId } from '../common/agentProtocol'
interface ToolCallView {
id: string
name: string
status: 'running' | 'success' | 'error'
durationMs?: number
errorCode?: string
}
interface ChatMessage {
id: MessageId
role: 'user' | 'agent'
content: string
streaming: boolean
toolCalls: ToolCallView[]
}
export interface SessionState {
threadId: ThreadId
messages: ChatMessage[]
status: 'idle' | 'streaming' | 'error'
}
export type SessionAction =
| { type: 'user-send'; id: MessageId; content: string }
| { type: 'agent-event'; event: AgentEvent }
export function createSessionState(threadId: ThreadId): SessionState {
return { threadId, messages: [], status: 'idle' }
}
function ensureAgentMessage(state: SessionState, id: MessageId): ChatMessage {
const existing = state.messages.find((m) => m.id === id)
if (existing) return existing
const created: ChatMessage = {
id,
role: 'agent',
content: '',
streaming: true,
toolCalls: []
}
state.messages.push(created)
return created
}
function applyAgentEvent(state: SessionState, event: AgentEvent): void {
switch (event.type) {
case 'agent_message_delta': {
const message = ensureAgentMessage(state, event.messageId)
message.content += event.delta
message.streaming = true
state.status = 'streaming'
return
}
case 'agent_tool_call': {
const message = ensureAgentMessage(state, event.messageId)
const existing = message.toolCalls.find((t) => t.id === event.toolCallId)
const view: ToolCallView = {
id: event.toolCallId,
name: event.toolName,
status: event.status,
durationMs: event.durationMs,
errorCode: event.errorCode
}
if (existing) Object.assign(existing, view)
else message.toolCalls.push(view)
if (event.status === 'error') state.status = 'error'
return
}
case 'agent_message_done': {
const message = ensureAgentMessage(state, event.messageId)
message.streaming = false
if (state.status !== 'error') state.status = 'idle'
return
}
case 'draft_patch':
return
}
}
export function sessionReducer(
state: SessionState,
action: SessionAction
): SessionState {
return produce(state, (draft) => {
if (action.type === 'user-send') {
draft.messages.push({
id: action.id,
role: 'user',
content: action.content,
streaming: false,
toolCalls: []
})
return
}
applyAgentEvent(draft, action.event)
})
}

View File

@@ -0,0 +1,26 @@
import { describe, expect, it } from 'vitest'
import { AgentRoom } from '@/platform/agent/crdt/agentRoom'
import { layoutStore } from '@/renderer/core/layout/store/layoutStore'
import { bindRoomToLayoutStore } from './bindRoomToLayoutStore'
describe('bindRoomToLayoutStore', () => {
it('propagates an agent room edit into the real layout store', () => {
const room = new AgentRoom('wf-1')
const storeNodes = layoutStore.getYDoc().getMap('nodes')
const probeId = `agent-probe-${Math.random().toString(36).slice(2)}`
const versionBefore = layoutStore.getVersion().value
const unbind = bindRoomToLayoutStore(room)
room.nodes.set(probeId, { title: 'Agent Node' })
expect(storeNodes.get(probeId)).toEqual({ title: 'Agent Node' })
expect(layoutStore.getVersion().value).toBeGreaterThan(versionBefore)
storeNodes.delete(probeId)
unbind()
room.destroy()
})
})

View File

@@ -0,0 +1,17 @@
/**
* Wires an agent room to the live layout store (prototype — ADR-0011).
*
* The renderer-layer entry point: it owns the dependency on the layout store
* singleton (platform code may not import renderer) and delegates the actual
* sync to the generic, layer-safe `bindRoomToDoc`. This is the "few lines"
* the design meeting referenced — the store already exposes the Yjs peer
* surface (`getYDoc`, marked "future feature") and the agent is that peer.
*/
import type { AgentRoom } from '@/platform/agent/crdt/agentRoom'
import { bindRoomToDoc } from '@/platform/agent/crdt/roomDocBinding'
import { layoutStore } from '@/renderer/core/layout/store/layoutStore'
export function bindRoomToLayoutStore(room: AgentRoom): () => void {
return bindRoomToDoc(room, layoutStore.getYDoc())
}