mirror of
https://github.com/Comfy-Org/ComfyUI_frontend.git
synced 2026-07-03 05:38:26 +00:00
Compare commits
10 Commits
codex/cove
...
matt/be-18
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c0f08b18f0 | ||
|
|
e00363d8d3 | ||
|
|
5b9d4765a9 | ||
|
|
33da58e1c0 | ||
|
|
3e91a60cf9 | ||
|
|
06cbb44a57 | ||
|
|
5237112114 | ||
|
|
289ea0562c | ||
|
|
ce4a53c498 | ||
|
|
9ed93698a0 |
209
docs/adr/0011-in-app-agent-graph-state-integration.md
Normal file
209
docs/adr/0011-in-app-agent-graph-state-integration.md
Normal file
@@ -0,0 +1,209 @@
|
||||
# 11. In-App Agent Graph-State Integration
|
||||
|
||||
Date: 2026-06-25
|
||||
|
||||
## Status
|
||||
|
||||
Proposed
|
||||
|
||||
## Context
|
||||
|
||||
The In-App Agent (V0, target ~2026-07-13) adds a server-side ComfyUI agent the user chats with
|
||||
from a side panel. A hard product requirement is that the agent can **read the user's live
|
||||
workflow** and **write workflow changes back onto the canvas**. This ADR records the frontend
|
||||
graph-state and synchronization design for that capability. It is intentionally written as
|
||||
"TDD-as-code": a design record, ahead of the implementation PRs.
|
||||
|
||||
This decision builds directly on:
|
||||
|
||||
- [ADR-0001 (Merge LiteGraph)](0001-merge-litegraph-into-frontend.md) and
|
||||
[ADR-0003 (Centralized Layout Management with CRDT)](0003-crdt-based-layout-system.md), which
|
||||
established a Yjs CRDT store as the single source of truth for spatial state and a command/
|
||||
observer model for mutations.
|
||||
- The long-term architectural direction RFC ([issue #4661]), which makes a CRDT-mediated state
|
||||
layer the foundation for multiplayer — and frames the agent as **just another client** of a
|
||||
per-graph room.
|
||||
|
||||
### Forces at play
|
||||
|
||||
1. **A second writer.** Until now the only writer to a workflow is the local user. The agent
|
||||
introduces a second, remote writer to the _same_ graph. We need conflict handling, not
|
||||
last-write-wins.
|
||||
2. **The backend is server-authoritative.** The cloud backend (`Comfy-Org/cloud` PR #4432)
|
||||
introduces a mutable server-side **`workflow_draft`** (full save-format JSON + integer
|
||||
`version`), commits edits with **compare-and-swap on `version`**, and pushes results to the
|
||||
browser as a **full-document replace** over an existing **Redis-PubSub → WebSocket** bridge
|
||||
(`channel:ws:{workspaceId}:u:{userId}`). Inbound chat turns go through ingest `/api/agent/*`.
|
||||
3. **The frontend is moving toward decentralized CRDT.** Per ADR-0003 / #4661 the end-state is
|
||||
per-graph rooms with mutation relay and CRDT merge — _not_ server-authoritative full-document
|
||||
replace. The V0 backend model and the FE end-state are different shapes.
|
||||
4. **Timeline.** V0 ships in ~3 weeks. The CRDT migration of the _data-model_ class of state
|
||||
(node existence, widget values) is still in progress; only the _layout_ class is fully on the
|
||||
Yjs store today. A true per-mutation CRDT sync for the agent is not ready for V0.
|
||||
5. **No throwaway work.** Whatever we ship for V0 must be a strict subset of the #4661 end-state.
|
||||
|
||||
### What V0 is NOT
|
||||
|
||||
- The agent does **not** draw, animate, or incrementally lay out nodes on the canvas.
|
||||
- The agent does **not** submit/run the workflow — the user clicks the existing Run button.
|
||||
- The agent is **not** aware of viewport state (zoom/pan/cursor) — that is FE-only ephemeral
|
||||
state and is never synced.
|
||||
|
||||
## Decision
|
||||
|
||||
For V0 the frontend treats the **server `workflow_draft` as the authority**, integrates agent
|
||||
writes as **full-document replaces guarded by `version`**, and frames the whole interaction as a
|
||||
**room-per-graph** model so it is forward-compatible with the CRDT end-state.
|
||||
|
||||
### Graph-state model
|
||||
|
||||
| State class | Source of truth (V0) | Synced to agent? |
|
||||
| ---------------------------------------------------------- | ------------------------------------------------------ | ---------------- |
|
||||
| Save-format **data model** (nodes, links, widgets, groups) | server `workflow_draft.content` | read + write |
|
||||
| **Layout** (positions/sizes/reroutes) | within `content`; mirrors Yjs `layoutStore` (ADR-0003) | within content |
|
||||
| **Selection** (selected node ids) | browser, sent per turn | per-turn input |
|
||||
| **Viewport** (zoom/pan/cursor) | browser only | never |
|
||||
|
||||
- Each `workflow_draft` is a **room**. V0 has up to two writers: the human (via autosave-to-draft)
|
||||
and the agent. The browser keeps a draft's tab **alive in memory** while connected so agent
|
||||
pushes apply even when the tab is unfocused (lazy apply on refocus) — the CRDT room behavior,
|
||||
minus true merge.
|
||||
- The browser **autosaves canvas edits into the draft** so the server copy reflects unsaved work
|
||||
before a turn. Agent awareness then reduces to "read the draft + read the selection ids".
|
||||
|
||||
### Synchronization & conflict handling
|
||||
|
||||
The agent→browser push is a `draft_patch { workflow_id, content, version, base_version }`.
|
||||
|
||||
```
|
||||
agent commits draft (CAS version N → N+1)
|
||||
→ draft_patch { content, version: N+1, base_version: N } over Redis → WS
|
||||
browser:
|
||||
tab.version == base_version → apply full replace, adopt new version (happy path)
|
||||
tab.version != base_version → MERGE DIALOG: [Accept agent's] [Keep mine] [Open in new tab]
|
||||
```
|
||||
|
||||
- **Apply** = load the full save-format graph into the target tab (a destructive variant of the
|
||||
existing `loadGraphData` path) and adopt `version` as the tab's new base.
|
||||
- **Conflict** (user edited the graph during the agent's turn) surfaces a dialog rather than
|
||||
silently clobbering. We explicitly reject **graph-locking** as the primary mechanism: a
|
||||
lost/duplicated backend message could leave the graph _permanently_ locked. A presentational
|
||||
"agent editing…" hint MAY be driven by the optional backend edit-turn lease, but it is never on
|
||||
the correctness path.
|
||||
- The agent can also target a **new tab** (`target: "new_tab"`) for unrelated requests — a
|
||||
non-destructive load, no conflict possible.
|
||||
|
||||
### Awareness & run
|
||||
|
||||
- A chat turn carries `{ content, selection?: NodeId[], attachments?, target }`. `selection` is
|
||||
the set of node ids from the canvas (the panel's `@`-tag chips). The agent reads the draft data
|
||||
model server-side; the browser does not scrape the live graph.
|
||||
- The agent never runs the workflow in V0; after a write it tells the user the graph is loaded and
|
||||
to click Run. Submit is gated off for the in-app client.
|
||||
|
||||
### Migration path to the CRDT end-state (#4661)
|
||||
|
||||
This is the load-bearing reason the V0 shape is acceptable. The transition is a payload swap, not
|
||||
a rewrite:
|
||||
|
||||
1. **V0:** full-document `draft_patch`; convergence via `version` CAS + merge dialog.
|
||||
2. **When the data-model class finishes migrating to the Yjs store:** `draft_patch` gains a
|
||||
**mutation-list** variant applied via `layoutStore.applyOperation(op)` tagged
|
||||
`LayoutSource.External` with a dedicated agent actor id (the store already tracks
|
||||
source/actor). Full replace remains the fallback for large rewrites / new tabs.
|
||||
3. **Multiplayer:** server relays mutations; both client and server write + reconcile via CRDT
|
||||
merge, retiring the merge dialog for fine-grained edits. The room model, actor/source tracking,
|
||||
and the Redis channel are unchanged.
|
||||
|
||||
### Prototype implementation (this PR)
|
||||
|
||||
This PR ships TDD-as-code for **both** layers so the V0 shape and the end-state are exercised, not
|
||||
just described. Nothing is wired into the chat UI yet.
|
||||
|
||||
**Layer A — V0 server-draft path** (`src/platform/agent/common/`)
|
||||
|
||||
- `agentProtocol.ts` — typed FE⇄agent wire contract + `parseAgentEvent()` decoder for untrusted
|
||||
WS payloads.
|
||||
- `draftReconciler.ts` — pure `version`-CAS decision (`apply` / `conflict` / `stale`).
|
||||
- `useAgentDraftSync.ts` — composable tracking per-workflow base `version` and the three merge
|
||||
outcomes via injected canvas ports.
|
||||
|
||||
**Layer B — CRDT peer path (the #4661 migration target)** (`src/platform/agent/crdt/`)
|
||||
|
||||
- `agentRoom.ts` — a `Y.Doc`-backed room whose top-level types (`nodes`/`links`/`reroutes`) mirror
|
||||
the layout store; real `encodeStateVector`/`diffSince`/`applyRemoteUpdate` + presence.
|
||||
- `roomSync.ts` — transport-agnostic two-phase Yjs sync (state-vector handshake + live updates),
|
||||
modelled on `y-protocols/sync`; runs over the existing Redis→WS bridge.
|
||||
- `agentRoomManager.ts` — room lifecycle: a tab switch is a `join`/`leave`; rooms stay alive in
|
||||
memory while referenced or pinned (agent editing a backgrounded workflow).
|
||||
- `roomDocBinding.ts` — generic, layer-safe bidirectional `Y.Doc` binding with origin-tag echo
|
||||
guards.
|
||||
|
||||
**Wiring (renderer layer)** — `src/renderer/core/layout/agent/bindRoomToLayoutStore.ts` binds a
|
||||
room to the **real** `layoutStore` singleton via its existing (`"future feature"`)
|
||||
`getYDoc()`/`applyUpdate` surface. A test drives an agent room edit into the live store and asserts
|
||||
the version bump — concrete proof of the "few lines of code" claim. Layout binds today; data-model
|
||||
mutations bind the same way once that class finishes migrating to the store.
|
||||
|
||||
**Local chat state** — `src/platform/agent/session/agentSessionStore.ts` is an **Immer** reducer
|
||||
for streaming deltas / tool-call lifecycle. Chat is single-client, so it is deliberately _not_ a
|
||||
CRDT: Yjs owns graph state, Immer owns local UI state, and the two never mix.
|
||||
|
||||
A layering constraint surfaced during this work: `platform/` may not import `renderer/`
|
||||
(base → platform → workbench → renderer). Hence the pure CRDT core lives in `platform/` and only
|
||||
the thin singleton wiring lives in `renderer/`.
|
||||
|
||||
### Alternatives considered
|
||||
|
||||
- **Build true CRDT (Yjs) agent sync in V0** — rejected: data-model CRDT migration incomplete;
|
||||
misses the timeline; high risk.
|
||||
- **Always open a new tab for agent output** — rejected: simplest, but fails "update what I'm
|
||||
looking at" and causes tab sprawl.
|
||||
- **Graph-locking / "agent mode" that blocks user edits** — rejected as primary mechanism:
|
||||
permanent-lock dead-end risk on message loss.
|
||||
- **Browser scrapes the live graph per turn** — rejected: invites client/server drift;
|
||||
autosave-to-draft keeps the canonical server copy current instead.
|
||||
|
||||
## Consequences
|
||||
|
||||
### Positive
|
||||
|
||||
- Hits the V0 timeline by reusing the backend draft + the existing Redis→WS transport; no new
|
||||
realtime infrastructure on the frontend.
|
||||
- A clean, documented bridge to the #4661 CRDT end-state: room-per-graph, agent-as-client, and the
|
||||
ADR-0003 source/actor model all carry forward unchanged.
|
||||
- Conflicts never silently destroy user work; the dialog appears only in the genuine
|
||||
concurrent-edit case.
|
||||
- Awareness is minimal and robust: read the draft + the selection ids.
|
||||
|
||||
### Negative
|
||||
|
||||
- Full-document replace is coarse-grained: a concurrent user edit during an agent turn collides
|
||||
and must go through the merge dialog rather than merging automatically.
|
||||
- Introduces a frontend-owned **base-`version` lifecycle** (obtain on draft open, bump on apply
|
||||
and on autosave). If this drifts, the merge dialog can mis-fire — this is the main correctness
|
||||
risk to get right.
|
||||
- The agent→browser event schema becomes a **cross-repo contract** (Go ⇄ TS); it must be
|
||||
versioned and drift-guarded.
|
||||
- A temporary semantic gap with V0 product copy ("Agent generation does not impact the graph")
|
||||
that must be reconciled now that write-to-graph is in scope.
|
||||
|
||||
## Notes
|
||||
|
||||
### Open questions
|
||||
|
||||
1. **Tab closed mid-edit.** If the user closes the draft's tab while the agent is editing, do
|
||||
pending changes invalidate (and report back to the agent) or persist server-side and reopen on
|
||||
a new tab? Affects the room lifecycle.
|
||||
2. **Base-`version` lifecycle.** Exact points where the tab obtains/bumps its base `version` so
|
||||
the merge dialog cannot mis-fire or desync.
|
||||
3. **Event schema home & versioning.** Where the shared `draft_patch` / `agent_message_delta` /
|
||||
`agent_tool_call` / `agent_message_done` contract lives and how drift is caught.
|
||||
|
||||
### References
|
||||
|
||||
- [ADR-0001](0001-merge-litegraph-into-frontend.md), [ADR-0003](0003-crdt-based-layout-system.md)
|
||||
- RFC: Long-Term Architectural Direction for ComfyUI_frontend (issue #4661)
|
||||
- Backend slice: `Comfy-Org/cloud` PR #4432 (`workflow_draft`, ingest `/api/agent/*`, Redis PubSub)
|
||||
- Existing FE entry points: `src/renderer/core/layout/store/layoutStore.ts`,
|
||||
`src/renderer/core/layout/operations/layoutMutations.ts`, `src/scripts/app.ts` (`loadGraphData`)
|
||||
@@ -20,6 +20,7 @@ An Architecture Decision Record captures an important architectural decision mad
|
||||
| [0008](0008-entity-component-system.md) | Entity Component System | Proposed | 2026-03-23 |
|
||||
| [0009](0009-subgraph-promoted-widgets-use-linked-inputs.md) | Subgraph Promoted Widgets Use Linked Inputs | Proposed | 2026-05-05 |
|
||||
| [0010](0010-remove-nx-orchestration.md) | Remove Nx Orchestration | Accepted | 2026-05-19 |
|
||||
| [0011](0011-in-app-agent-graph-state-integration.md) | In-App Agent Graph-State Integration | Proposed | 2026-06-25 |
|
||||
|
||||
## Creating a New ADR
|
||||
|
||||
|
||||
@@ -57,6 +57,8 @@ const config: KnipConfig = {
|
||||
// Marketing media tooling — adopted by pages in a follow-up PR
|
||||
'apps/website/src/components/common/SiteVideo.vue',
|
||||
'apps/website/src/utils/marketingImage.ts',
|
||||
// In-app agent wire contract (ADR-0011) — public types pending UI integration
|
||||
'src/platform/agent/common/agentProtocol.ts',
|
||||
// Agent review check config, not part of the build
|
||||
'.agents/checks/eslint.strict.config.js',
|
||||
// Devtools extensions, included dynamically
|
||||
|
||||
@@ -111,6 +111,7 @@
|
||||
"firebase": "catalog:",
|
||||
"fuse.js": "^7.0.0",
|
||||
"glob": "catalog:",
|
||||
"immer": "catalog:",
|
||||
"jsonata": "catalog:",
|
||||
"loglevel": "^1.9.2",
|
||||
"marked": "^15.0.11",
|
||||
|
||||
20
pnpm-lock.yaml
generated
20
pnpm-lock.yaml
generated
@@ -258,6 +258,9 @@ catalogs:
|
||||
husky:
|
||||
specifier: ^9.1.7
|
||||
version: 9.1.7
|
||||
immer:
|
||||
specifier: ^11.1.8
|
||||
version: 11.1.8
|
||||
jiti:
|
||||
specifier: 2.6.1
|
||||
version: 2.6.1
|
||||
@@ -579,6 +582,9 @@ importers:
|
||||
glob:
|
||||
specifier: 'catalog:'
|
||||
version: 13.0.6
|
||||
immer:
|
||||
specifier: 'catalog:'
|
||||
version: 11.1.8
|
||||
jsonata:
|
||||
specifier: 'catalog:'
|
||||
version: 2.1.0
|
||||
@@ -654,7 +660,7 @@ importers:
|
||||
version: 4.5.0(eslint@10.4.0(jiti@2.6.1))(jsonc-eslint-parser@2.4.0)(vue-eslint-parser@10.4.0(eslint@10.4.0(jiti@2.6.1)))(yaml-eslint-parser@1.3.0)
|
||||
'@lobehub/i18n-cli':
|
||||
specifier: 'catalog:'
|
||||
version: 1.26.1(@types/react@19.1.9)(typescript@5.9.3)(use-sync-external-store@1.6.0(react@19.2.4))(ws@8.21.0)(zod@3.25.76)
|
||||
version: 1.26.1(@types/react@19.1.9)(immer@11.1.8)(typescript@5.9.3)(use-sync-external-store@1.6.0(react@19.2.4))(ws@8.21.0)(zod@3.25.76)
|
||||
'@pinia/testing':
|
||||
specifier: 'catalog:'
|
||||
version: 1.0.3(pinia@3.0.4(typescript@5.9.3)(vue@3.5.34(typescript@5.9.3)))
|
||||
@@ -6042,6 +6048,9 @@ packages:
|
||||
immediate@3.0.6:
|
||||
resolution: {integrity: sha512-XXOFtyqDjNDAQxVfYxuF7g9Il/IbWmmlQg2MYKOH8ExIT1qg6xc4zyS3HaEEATgs1btfzxq15ciUiY7gjSXRGQ==}
|
||||
|
||||
immer@11.1.8:
|
||||
resolution: {integrity: sha512-/tbkHMW7y10Lx6i1crLjD4/OhNkRG+Fo7byZHtah0547nIeXYcpIXaUh0IAQY6gO5459qpGGYapcEOHtFXkIuA==}
|
||||
|
||||
import-fresh@3.3.1:
|
||||
resolution: {integrity: sha512-TR3KfrTZTYLPB6jUjfx6MF9WcWrHL9su5TObK4ZkYgBdWKPOFoSoQIdEuTuR82pmtxH2spWG9h6etwfr1pLBqQ==}
|
||||
engines: {node: '>=6'}
|
||||
@@ -10521,7 +10530,7 @@ snapshots:
|
||||
- react-devtools-core
|
||||
- utf-8-validate
|
||||
|
||||
'@lobehub/i18n-cli@1.26.1(@types/react@19.1.9)(typescript@5.9.3)(use-sync-external-store@1.6.0(react@19.2.4))(ws@8.21.0)(zod@3.25.76)':
|
||||
'@lobehub/i18n-cli@1.26.1(@types/react@19.1.9)(immer@11.1.8)(typescript@5.9.3)(use-sync-external-store@1.6.0(react@19.2.4))(ws@8.21.0)(zod@3.25.76)':
|
||||
dependencies:
|
||||
'@lobehub/cli-ui': 1.13.0(@types/react@19.1.9)
|
||||
'@yutengjing/eld': 0.0.2
|
||||
@@ -10552,7 +10561,7 @@ snapshots:
|
||||
unified: 11.0.5
|
||||
unist-util-visit: 5.1.0
|
||||
update-notifier: 7.3.1
|
||||
zustand: 5.0.11(@types/react@19.1.9)(react@19.2.4)(use-sync-external-store@1.6.0(react@19.2.4))
|
||||
zustand: 5.0.11(@types/react@19.1.9)(immer@11.1.8)(react@19.2.4)(use-sync-external-store@1.6.0(react@19.2.4))
|
||||
transitivePeerDependencies:
|
||||
- '@types/react'
|
||||
- bufferutil
|
||||
@@ -14361,6 +14370,8 @@ snapshots:
|
||||
|
||||
immediate@3.0.6: {}
|
||||
|
||||
immer@11.1.8: {}
|
||||
|
||||
import-fresh@3.3.1:
|
||||
dependencies:
|
||||
parent-module: 1.0.1
|
||||
@@ -17949,9 +17960,10 @@ snapshots:
|
||||
|
||||
zod@4.3.6: {}
|
||||
|
||||
zustand@5.0.11(@types/react@19.1.9)(react@19.2.4)(use-sync-external-store@1.6.0(react@19.2.4)):
|
||||
zustand@5.0.11(@types/react@19.1.9)(immer@11.1.8)(react@19.2.4)(use-sync-external-store@1.6.0(react@19.2.4)):
|
||||
optionalDependencies:
|
||||
'@types/react': 19.1.9
|
||||
immer: 11.1.8
|
||||
react: 19.2.4
|
||||
use-sync-external-store: 1.6.0(react@19.2.4)
|
||||
|
||||
|
||||
@@ -95,6 +95,7 @@ catalog:
|
||||
gsap: ^3.14.2
|
||||
happy-dom: ^20.8.9
|
||||
husky: ^9.1.7
|
||||
immer: ^11.1.8
|
||||
jiti: 2.6.1
|
||||
jsdom: ^27.4.0
|
||||
jsonata: ^2.1.0
|
||||
|
||||
273
src/platform/agent/common/agentProtocol.test.ts
Normal file
273
src/platform/agent/common/agentProtocol.test.ts
Normal file
@@ -0,0 +1,273 @@
|
||||
import { describe, expect, it } from 'vitest'
|
||||
|
||||
import type { AgentTurnRequest } from './agentProtocol'
|
||||
import {
|
||||
parseAgentEvent,
|
||||
parseDraftSnapshot,
|
||||
serializeAgentTurnRequest
|
||||
} from './agentProtocol'
|
||||
|
||||
const data = { thread_id: 't1', message_id: 'm1' }
|
||||
|
||||
describe('parseAgentEvent', () => {
|
||||
it('decodes a draft_patch from the snake_case envelope', () => {
|
||||
const event = parseAgentEvent({
|
||||
type: 'draft_patch',
|
||||
data: {
|
||||
...data,
|
||||
workflow_id: 'wf1',
|
||||
content: { nodes: [] },
|
||||
version: 8,
|
||||
base_version: 7
|
||||
}
|
||||
})
|
||||
expect(event).toEqual({
|
||||
type: 'draft_patch',
|
||||
threadId: 't1',
|
||||
messageId: 'm1',
|
||||
workflowId: 'wf1',
|
||||
content: { nodes: [] },
|
||||
version: 8,
|
||||
baseVersion: 7
|
||||
})
|
||||
})
|
||||
|
||||
it('decodes a message delta', () => {
|
||||
const event = parseAgentEvent({
|
||||
type: 'agent_message_delta',
|
||||
data: { ...data, delta: 'hi' }
|
||||
})
|
||||
expect(event).toEqual({
|
||||
type: 'agent_message_delta',
|
||||
threadId: 't1',
|
||||
messageId: 'm1',
|
||||
delta: 'hi'
|
||||
})
|
||||
})
|
||||
|
||||
it('decodes a tool call and drops absent optional fields', () => {
|
||||
const event = parseAgentEvent({
|
||||
type: 'agent_tool_call',
|
||||
data: {
|
||||
...data,
|
||||
tool_call_id: 'tc1',
|
||||
tool_name: 'workflow set-slot',
|
||||
status: 'success'
|
||||
}
|
||||
})
|
||||
expect(event).toEqual({
|
||||
type: 'agent_tool_call',
|
||||
threadId: 't1',
|
||||
messageId: 'm1',
|
||||
toolCallId: 'tc1',
|
||||
toolName: 'workflow set-slot',
|
||||
status: 'success'
|
||||
})
|
||||
})
|
||||
|
||||
it('maps tool call duration and error code from snake_case', () => {
|
||||
const event = parseAgentEvent({
|
||||
type: 'agent_tool_call',
|
||||
data: {
|
||||
...data,
|
||||
tool_call_id: 'tc1',
|
||||
tool_name: 'run',
|
||||
status: 'error',
|
||||
duration_ms: 1200,
|
||||
error_code: 'OOM'
|
||||
}
|
||||
})
|
||||
expect(event).toMatchObject({ durationMs: 1200, errorCode: 'OOM' })
|
||||
})
|
||||
|
||||
it('omits a non-finite tool call duration', () => {
|
||||
for (const durationMs of [Number.NaN, Number.POSITIVE_INFINITY]) {
|
||||
const event = parseAgentEvent({
|
||||
type: 'agent_tool_call',
|
||||
data: {
|
||||
...data,
|
||||
tool_call_id: 'tc1',
|
||||
tool_name: 'run',
|
||||
status: 'success',
|
||||
duration_ms: durationMs
|
||||
}
|
||||
})
|
||||
expect(event).not.toHaveProperty('durationMs')
|
||||
}
|
||||
})
|
||||
|
||||
it('maps message done usage to tokenUsage', () => {
|
||||
const event = parseAgentEvent({
|
||||
type: 'agent_message_done',
|
||||
data: { ...data, usage: { input: 12, output: 34 } }
|
||||
})
|
||||
expect(event).toEqual({
|
||||
type: 'agent_message_done',
|
||||
threadId: 't1',
|
||||
messageId: 'm1',
|
||||
tokenUsage: { input: 12, output: 34 }
|
||||
})
|
||||
})
|
||||
|
||||
it('omits tokenUsage when a usage field is non-finite', () => {
|
||||
const event = parseAgentEvent({
|
||||
type: 'agent_message_done',
|
||||
data: { ...data, usage: { input: 12, output: Number.NaN } }
|
||||
})
|
||||
expect(event).not.toHaveProperty('tokenUsage')
|
||||
})
|
||||
|
||||
it('omits tokenUsage when usage is absent', () => {
|
||||
const event = parseAgentEvent({
|
||||
type: 'agent_message_done',
|
||||
data: { ...data }
|
||||
})
|
||||
expect(event).toEqual({
|
||||
type: 'agent_message_done',
|
||||
threadId: 't1',
|
||||
messageId: 'm1'
|
||||
})
|
||||
})
|
||||
|
||||
it('rejects an unknown event type', () => {
|
||||
expect(parseAgentEvent({ type: 'nope', data })).toBeNull()
|
||||
})
|
||||
|
||||
it('rejects a payload with no envelope body', () => {
|
||||
expect(
|
||||
parseAgentEvent({ type: 'agent_message_delta', delta: 'hi' })
|
||||
).toBeNull()
|
||||
})
|
||||
|
||||
it('rejects a payload missing the base identifiers', () => {
|
||||
expect(
|
||||
parseAgentEvent({
|
||||
type: 'agent_message_delta',
|
||||
data: { delta: 'hi' }
|
||||
})
|
||||
).toBeNull()
|
||||
})
|
||||
|
||||
it('rejects a draft_patch with a malformed version', () => {
|
||||
const event = parseAgentEvent({
|
||||
type: 'draft_patch',
|
||||
data: {
|
||||
...data,
|
||||
workflow_id: 'wf1',
|
||||
content: {},
|
||||
version: '8',
|
||||
base_version: 7
|
||||
}
|
||||
})
|
||||
expect(event).toBeNull()
|
||||
})
|
||||
|
||||
it('rejects a draft_patch missing base_version', () => {
|
||||
const event = parseAgentEvent({
|
||||
type: 'draft_patch',
|
||||
data: { ...data, workflow_id: 'wf1', content: {}, version: 8 }
|
||||
})
|
||||
expect(event).toBeNull()
|
||||
})
|
||||
|
||||
it('rejects a draft_patch with a non-finite version', () => {
|
||||
for (const version of [Number.NaN, Number.POSITIVE_INFINITY]) {
|
||||
const event = parseAgentEvent({
|
||||
type: 'draft_patch',
|
||||
data: {
|
||||
...data,
|
||||
workflow_id: 'wf1',
|
||||
content: {},
|
||||
version,
|
||||
base_version: 7
|
||||
}
|
||||
})
|
||||
expect(event).toBeNull()
|
||||
}
|
||||
})
|
||||
|
||||
it('rejects a draft_patch whose content is an array', () => {
|
||||
const event = parseAgentEvent({
|
||||
type: 'draft_patch',
|
||||
data: {
|
||||
...data,
|
||||
workflow_id: 'wf1',
|
||||
content: [],
|
||||
version: 8,
|
||||
base_version: 7
|
||||
}
|
||||
})
|
||||
expect(event).toBeNull()
|
||||
})
|
||||
|
||||
it('rejects a tool call with an invalid status', () => {
|
||||
const event = parseAgentEvent({
|
||||
type: 'agent_tool_call',
|
||||
data: {
|
||||
...data,
|
||||
tool_call_id: 'tc1',
|
||||
tool_name: 'run',
|
||||
status: 'pending'
|
||||
}
|
||||
})
|
||||
expect(event).toBeNull()
|
||||
})
|
||||
})
|
||||
|
||||
describe('serializeAgentTurnRequest', () => {
|
||||
it('serializes baseVersion to the snake_case body', () => {
|
||||
const request: AgentTurnRequest = {
|
||||
content: 'add a sampler',
|
||||
selection: ['1', '2'],
|
||||
attachments: ['asset-1'],
|
||||
target: 'active',
|
||||
baseVersion: 7
|
||||
}
|
||||
expect(serializeAgentTurnRequest(request)).toEqual({
|
||||
content: 'add a sampler',
|
||||
selection: ['1', '2'],
|
||||
attachments: ['asset-1'],
|
||||
target: 'active',
|
||||
base_version: 7
|
||||
})
|
||||
})
|
||||
|
||||
it('omits absent optional fields', () => {
|
||||
expect(serializeAgentTurnRequest({ content: 'hi' })).toEqual({
|
||||
content: 'hi'
|
||||
})
|
||||
})
|
||||
|
||||
it('keeps base_version 0 rather than dropping it', () => {
|
||||
expect(
|
||||
serializeAgentTurnRequest({ content: 'hi', baseVersion: 0 })
|
||||
).toEqual({ content: 'hi', base_version: 0 })
|
||||
})
|
||||
})
|
||||
|
||||
describe('parseDraftSnapshot', () => {
|
||||
it('decodes a well-formed snapshot body', () => {
|
||||
const snapshot = parseDraftSnapshot({
|
||||
content: { nodes: ['ksampler'] },
|
||||
version: 12
|
||||
})
|
||||
expect(snapshot).toEqual({ content: { nodes: ['ksampler'] }, version: 12 })
|
||||
})
|
||||
|
||||
it('accepts version 0', () => {
|
||||
expect(parseDraftSnapshot({ content: {}, version: 0 })).toEqual({
|
||||
content: {},
|
||||
version: 0
|
||||
})
|
||||
})
|
||||
|
||||
it('rejects a malformed or partial body', () => {
|
||||
expect(parseDraftSnapshot(null)).toBeNull()
|
||||
expect(parseDraftSnapshot({ content: { nodes: [] } })).toBeNull()
|
||||
expect(parseDraftSnapshot({ version: 3 })).toBeNull()
|
||||
expect(parseDraftSnapshot({ content: 'nope', version: 3 })).toBeNull()
|
||||
expect(parseDraftSnapshot({ content: [], version: 3 })).toBeNull()
|
||||
expect(parseDraftSnapshot({ content: {}, version: '3' })).toBeNull()
|
||||
expect(parseDraftSnapshot({ content: {}, version: Number.NaN })).toBeNull()
|
||||
})
|
||||
})
|
||||
248
src/platform/agent/common/agentProtocol.ts
Normal file
248
src/platform/agent/common/agentProtocol.ts
Normal file
@@ -0,0 +1,248 @@
|
||||
/**
|
||||
* In-App Agent protocol (prototype — ADR-0011).
|
||||
*
|
||||
* The cross-repo contract between the frontend (TS) and the server-side agent
|
||||
* (Go, `Comfy-Org/cloud`). Inbound requests go to ingest `/api/agent/*`;
|
||||
* outbound events arrive over the existing Redis-PubSub -> WebSocket bridge on
|
||||
* `channel:ws:{workspaceId}:u:{userId}`.
|
||||
*
|
||||
* This is the single TS definition of that contract; it should be kept in sync
|
||||
* with the Go side (open question: where the schema lives + how drift is caught).
|
||||
*/
|
||||
|
||||
export type WorkflowId = string
|
||||
export type ThreadId = string
|
||||
export type MessageId = string
|
||||
export type NodeId = string
|
||||
|
||||
/** Full save-format graph. Opaque here; validated by the workflow schema layer. */
|
||||
export type WorkflowGraph = Record<string, unknown>
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Inbound: browser -> agent
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/** Where an agent write lands (ADR-0001). */
|
||||
export type AgentWriteTarget = 'active' | 'new_tab'
|
||||
|
||||
export interface AgentTurnRequest {
|
||||
content: string
|
||||
/** Selected node ids — the awareness input (ADR-0003). */
|
||||
selection?: NodeId[]
|
||||
/** Uploaded asset ids referenced by the turn. */
|
||||
attachments?: string[]
|
||||
target?: AgentWriteTarget
|
||||
/** The tab's current draft version when `target === 'active'` (ADR-0005). */
|
||||
baseVersion?: number
|
||||
}
|
||||
|
||||
/** Wire body the backend `/api/agent/*` endpoints accept (snake_case). */
|
||||
export interface AgentTurnRequestBody {
|
||||
content: string
|
||||
selection?: NodeId[]
|
||||
attachments?: string[]
|
||||
target?: AgentWriteTarget
|
||||
base_version?: number
|
||||
}
|
||||
|
||||
/** Serialize a turn request to the snake_case body the backend expects. */
|
||||
export function serializeAgentTurnRequest(
|
||||
request: AgentTurnRequest
|
||||
): AgentTurnRequestBody {
|
||||
return {
|
||||
content: request.content,
|
||||
...(request.selection ? { selection: request.selection } : {}),
|
||||
...(request.attachments ? { attachments: request.attachments } : {}),
|
||||
...(request.target ? { target: request.target } : {}),
|
||||
...(request.baseVersion !== undefined
|
||||
? { base_version: request.baseVersion }
|
||||
: {})
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Outbound: agent -> browser
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export type AgentToolCallStatus = 'running' | 'success' | 'error'
|
||||
|
||||
interface AgentEventBase {
|
||||
threadId: ThreadId
|
||||
messageId: MessageId
|
||||
}
|
||||
|
||||
export interface AgentMessageDeltaEvent extends AgentEventBase {
|
||||
type: 'agent_message_delta'
|
||||
delta: string
|
||||
}
|
||||
|
||||
export interface AgentToolCallEvent extends AgentEventBase {
|
||||
type: 'agent_tool_call'
|
||||
toolCallId: string
|
||||
toolName: string
|
||||
status: AgentToolCallStatus
|
||||
durationMs?: number
|
||||
errorCode?: string
|
||||
}
|
||||
|
||||
/** A graph write: full-document replace guarded by `version` (ADR-0004). */
|
||||
export interface DraftPatchEvent extends AgentEventBase {
|
||||
type: 'draft_patch'
|
||||
workflowId: WorkflowId
|
||||
content: WorkflowGraph
|
||||
/** The new authoritative version after the agent's CAS commit. */
|
||||
version: number
|
||||
/** The version the agent started from; compared against the tab (ADR-0005). */
|
||||
baseVersion: number
|
||||
}
|
||||
|
||||
export interface AgentMessageDoneEvent extends AgentEventBase {
|
||||
type: 'agent_message_done'
|
||||
tokenUsage?: { input: number; output: number }
|
||||
}
|
||||
|
||||
export type AgentEvent =
|
||||
| AgentMessageDeltaEvent
|
||||
| AgentToolCallEvent
|
||||
| DraftPatchEvent
|
||||
| AgentMessageDoneEvent
|
||||
|
||||
function isRecord(value: unknown): value is Record<string, unknown> {
|
||||
return typeof value === 'object' && value !== null && !Array.isArray(value)
|
||||
}
|
||||
|
||||
/** A finite number — rejects `NaN`/`Infinity`, which would wedge CAS version compares. */
|
||||
function isFiniteNumber(value: unknown): value is number {
|
||||
return typeof value === 'number' && Number.isFinite(value)
|
||||
}
|
||||
|
||||
/** The base identifiers, read from the snake_case envelope body. */
|
||||
function parseBase(data: Record<string, unknown>): AgentEventBase | null {
|
||||
return typeof data.thread_id === 'string' &&
|
||||
typeof data.message_id === 'string'
|
||||
? { threadId: data.thread_id, messageId: data.message_id }
|
||||
: null
|
||||
}
|
||||
|
||||
function isToolCallStatus(value: unknown): value is AgentToolCallStatus {
|
||||
return value === 'running' || value === 'success' || value === 'error'
|
||||
}
|
||||
|
||||
function parseToolCall(
|
||||
data: Record<string, unknown>,
|
||||
base: AgentEventBase
|
||||
): AgentToolCallEvent | null {
|
||||
if (
|
||||
typeof data.tool_call_id !== 'string' ||
|
||||
typeof data.tool_name !== 'string' ||
|
||||
!isToolCallStatus(data.status)
|
||||
) {
|
||||
return null
|
||||
}
|
||||
return {
|
||||
type: 'agent_tool_call',
|
||||
...base,
|
||||
toolCallId: data.tool_call_id,
|
||||
toolName: data.tool_name,
|
||||
status: data.status,
|
||||
...(isFiniteNumber(data.duration_ms)
|
||||
? { durationMs: data.duration_ms }
|
||||
: {}),
|
||||
...(typeof data.error_code === 'string'
|
||||
? { errorCode: data.error_code }
|
||||
: {})
|
||||
}
|
||||
}
|
||||
|
||||
function parseDraftPatch(
|
||||
data: Record<string, unknown>,
|
||||
base: AgentEventBase
|
||||
): DraftPatchEvent | null {
|
||||
if (
|
||||
typeof data.workflow_id !== 'string' ||
|
||||
!isRecord(data.content) ||
|
||||
!isFiniteNumber(data.version) ||
|
||||
!isFiniteNumber(data.base_version)
|
||||
) {
|
||||
return null
|
||||
}
|
||||
return {
|
||||
type: 'draft_patch',
|
||||
...base,
|
||||
workflowId: data.workflow_id,
|
||||
content: data.content,
|
||||
version: data.version,
|
||||
baseVersion: data.base_version
|
||||
}
|
||||
}
|
||||
|
||||
function parseTokenUsage(
|
||||
usage: unknown
|
||||
): { input: number; output: number } | undefined {
|
||||
return isRecord(usage) &&
|
||||
isFiniteNumber(usage.input) &&
|
||||
isFiniteNumber(usage.output)
|
||||
? { input: usage.input, output: usage.output }
|
||||
: undefined
|
||||
}
|
||||
|
||||
/**
|
||||
* Decode an untrusted WebSocket payload into a typed `AgentEvent`, or `null` if
|
||||
* it is not a well-formed agent event. The wire format is the canonical backend
|
||||
* envelope `{ type, data: { …snake_case… } }`; the event body lives under
|
||||
* `data`. Keeps the transport boundary type-safe.
|
||||
*/
|
||||
export function parseAgentEvent(raw: unknown): AgentEvent | null {
|
||||
if (!isRecord(raw) || !isRecord(raw.data)) return null
|
||||
|
||||
const { data } = raw
|
||||
const base = parseBase(data)
|
||||
if (!base) return null
|
||||
|
||||
switch (raw.type) {
|
||||
case 'agent_message_delta':
|
||||
return typeof data.delta === 'string'
|
||||
? { type: 'agent_message_delta', ...base, delta: data.delta }
|
||||
: null
|
||||
case 'agent_tool_call':
|
||||
return parseToolCall(data, base)
|
||||
case 'draft_patch':
|
||||
return parseDraftPatch(data, base)
|
||||
case 'agent_message_done': {
|
||||
const tokenUsage = parseTokenUsage(data.usage)
|
||||
return {
|
||||
type: 'agent_message_done',
|
||||
...base,
|
||||
...(tokenUsage ? { tokenUsage } : {})
|
||||
}
|
||||
}
|
||||
default:
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Draft snapshot: GET /api/agent/draft?workflow_id=... -> { content, version }
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* The authoritative server draft for a workflow (ADR-0011). Fetched on WS
|
||||
* (re)connect and whenever a gap is suspected, to seed/reconcile the tab's base
|
||||
* `version` without waiting for the agent to emit a new `draft_patch`.
|
||||
*/
|
||||
export interface DraftSnapshot {
|
||||
content: WorkflowGraph
|
||||
version: number
|
||||
}
|
||||
|
||||
/** Decode an untrusted `GET /api/agent/draft` body, or `null` if malformed. */
|
||||
export function parseDraftSnapshot(raw: unknown): DraftSnapshot | null {
|
||||
if (
|
||||
!isRecord(raw) ||
|
||||
!isRecord(raw.content) ||
|
||||
!isFiniteNumber(raw.version)
|
||||
) {
|
||||
return null
|
||||
}
|
||||
return { content: raw.content, version: raw.version }
|
||||
}
|
||||
49
src/platform/agent/common/draftReconciler.test.ts
Normal file
49
src/platform/agent/common/draftReconciler.test.ts
Normal file
@@ -0,0 +1,49 @@
|
||||
import { describe, expect, it } from 'vitest'
|
||||
|
||||
import type { DraftPatchEvent } from './agentProtocol'
|
||||
import { reconcileDraftPatch } from './draftReconciler'
|
||||
|
||||
function patch(overrides: Partial<DraftPatchEvent> = {}): DraftPatchEvent {
|
||||
return {
|
||||
type: 'draft_patch',
|
||||
threadId: 't1',
|
||||
messageId: 'm1',
|
||||
workflowId: 'wf1',
|
||||
content: { nodes: [] },
|
||||
version: 8,
|
||||
baseVersion: 7,
|
||||
...overrides
|
||||
}
|
||||
}
|
||||
|
||||
describe('reconcileDraftPatch', () => {
|
||||
it('applies when the patch is based on the current tab version', () => {
|
||||
const result = reconcileDraftPatch(patch({ baseVersion: 7, version: 8 }), 7)
|
||||
expect(result).toEqual({ kind: 'apply', version: 8 })
|
||||
})
|
||||
|
||||
it('flags a conflict when a concurrent edit advanced the tab', () => {
|
||||
// Agent started from v7, but the user pushed the tab to v8 mid-turn.
|
||||
const result = reconcileDraftPatch(patch({ baseVersion: 7, version: 9 }), 8)
|
||||
expect(result).toEqual({ kind: 'conflict' })
|
||||
})
|
||||
|
||||
it('ignores a stale patch the tab already supersedes', () => {
|
||||
const result = reconcileDraftPatch(patch({ baseVersion: 7, version: 8 }), 8)
|
||||
expect(result).toEqual({ kind: 'stale' })
|
||||
})
|
||||
|
||||
it('ignores an older duplicate patch', () => {
|
||||
const result = reconcileDraftPatch(
|
||||
patch({ baseVersion: 5, version: 6 }),
|
||||
10
|
||||
)
|
||||
expect(result).toEqual({ kind: 'stale' })
|
||||
})
|
||||
|
||||
it('reports a gap when the agent advanced past the tab (missed patches)', () => {
|
||||
// Tab holds v5; this patch is based on v7, so v6/v7 were dropped in transit.
|
||||
const result = reconcileDraftPatch(patch({ baseVersion: 7, version: 8 }), 5)
|
||||
expect(result).toEqual({ kind: 'gap' })
|
||||
})
|
||||
})
|
||||
40
src/platform/agent/common/draftReconciler.ts
Normal file
40
src/platform/agent/common/draftReconciler.ts
Normal file
@@ -0,0 +1,40 @@
|
||||
/**
|
||||
* Draft reconciliation (prototype — ADR-0004 / ADR-0005 / ADR-0011).
|
||||
*
|
||||
* Pure decision logic for an incoming `draft_patch`, given the version the tab
|
||||
* currently holds. This is the load-bearing correctness piece: it decides
|
||||
* whether a full-document replace applies cleanly, must surface the merge
|
||||
* dialog, is a stale/duplicate to ignore, or reveals a gap that needs an
|
||||
* authoritative refetch.
|
||||
*
|
||||
* `version` is a monotonic watermark: a patch is adopted only when it advances
|
||||
* past what the tab already holds, so a dropped / duplicated / out-of-order
|
||||
* Redis Pub/Sub `draft_patch` (the transport is at-most-once, BE-1886) can never
|
||||
* regress local state.
|
||||
*/
|
||||
import type { DraftPatchEvent } from './agentProtocol'
|
||||
|
||||
export type ReconcileResult =
|
||||
/** Patch is based on the tab's current version — apply and adopt `version`. */
|
||||
| { kind: 'apply'; version: number }
|
||||
/** A concurrent user edit advanced the tab — surface the merge dialog. */
|
||||
| { kind: 'conflict' }
|
||||
/** Patch is superseded by what the tab already has — ignore (idempotency). */
|
||||
| { kind: 'stale' }
|
||||
/** The agent advanced past the tab — patches were missed; refetch snapshot. */
|
||||
| { kind: 'gap' }
|
||||
|
||||
/** User's choice in the merge dialog (ADR-0005). */
|
||||
export type ConflictResolution = 'accept-agent' | 'keep-mine' | 'new-tab'
|
||||
|
||||
export function reconcileDraftPatch(
|
||||
patch: DraftPatchEvent,
|
||||
currentVersion: number
|
||||
): ReconcileResult {
|
||||
if (patch.version <= currentVersion) return { kind: 'stale' }
|
||||
if (patch.baseVersion === currentVersion) {
|
||||
return { kind: 'apply', version: patch.version }
|
||||
}
|
||||
if (patch.baseVersion > currentVersion) return { kind: 'gap' }
|
||||
return { kind: 'conflict' }
|
||||
}
|
||||
374
src/platform/agent/common/useAgentDraftSync.test.ts
Normal file
374
src/platform/agent/common/useAgentDraftSync.test.ts
Normal file
@@ -0,0 +1,374 @@
|
||||
import { describe, expect, it, vi } from 'vitest'
|
||||
|
||||
import type { DraftPatchEvent } from './agentProtocol'
|
||||
import { parseAgentEvent } from './agentProtocol'
|
||||
import type { AgentDraftPorts } from './useAgentDraftSync'
|
||||
import { useAgentDraftSync } from './useAgentDraftSync'
|
||||
|
||||
const SNAPSHOT = { content: { nodes: ['from-snapshot'] }, version: 12 }
|
||||
|
||||
function makePorts(): AgentDraftPorts {
|
||||
return {
|
||||
applyToTab: vi.fn(),
|
||||
openInNewTab: vi.fn(),
|
||||
discardAgentResult: vi.fn(),
|
||||
fetchSnapshot: vi.fn().mockResolvedValue(SNAPSHOT)
|
||||
}
|
||||
}
|
||||
|
||||
function patch(overrides: Partial<DraftPatchEvent> = {}): DraftPatchEvent {
|
||||
return {
|
||||
type: 'draft_patch',
|
||||
threadId: 't1',
|
||||
messageId: 'm1',
|
||||
workflowId: 'wf1',
|
||||
content: { nodes: ['ksampler'] },
|
||||
version: 8,
|
||||
baseVersion: 7,
|
||||
...overrides
|
||||
}
|
||||
}
|
||||
|
||||
describe('useAgentDraftSync', () => {
|
||||
it('applies a patch to the active tab and adopts the new version', () => {
|
||||
const ports = makePorts()
|
||||
const sync = useAgentDraftSync(ports)
|
||||
sync.registerWorkflow('wf1', 7)
|
||||
|
||||
const outcome = sync.handlePatch(patch({ baseVersion: 7, version: 8 }))
|
||||
|
||||
expect(outcome).toBe('applied')
|
||||
expect(ports.applyToTab).toHaveBeenCalledWith(
|
||||
'wf1',
|
||||
{ nodes: ['ksampler'] },
|
||||
8
|
||||
)
|
||||
expect(sync.baseVersions.value.get('wf1')).toBe(8)
|
||||
expect(sync.pendingConflict.value).toBeNull()
|
||||
})
|
||||
|
||||
it('surfaces a conflict when the user edited the graph mid-turn', () => {
|
||||
const ports = makePorts()
|
||||
const sync = useAgentDraftSync(ports)
|
||||
sync.registerWorkflow('wf1', 7)
|
||||
sync.setVersion('wf1', 8) // local autosave advanced the tab
|
||||
|
||||
const outcome = sync.handlePatch(patch({ baseVersion: 7, version: 9 }))
|
||||
|
||||
expect(outcome).toBe('conflict')
|
||||
expect(ports.applyToTab).not.toHaveBeenCalled()
|
||||
expect(sync.pendingConflict.value).toMatchObject({
|
||||
workflowId: 'wf1',
|
||||
version: 9
|
||||
})
|
||||
})
|
||||
|
||||
it('ignores a stale patch', () => {
|
||||
const ports = makePorts()
|
||||
const sync = useAgentDraftSync(ports)
|
||||
sync.registerWorkflow('wf1', 8)
|
||||
|
||||
const outcome = sync.handlePatch(patch({ baseVersion: 7, version: 8 }))
|
||||
|
||||
expect(outcome).toBe('ignored')
|
||||
expect(ports.applyToTab).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('opens a new tab when the workflow has no open tab', () => {
|
||||
const ports = makePorts()
|
||||
const sync = useAgentDraftSync(ports)
|
||||
|
||||
const outcome = sync.handlePatch(patch({ workflowId: 'wf-new' }))
|
||||
|
||||
expect(outcome).toBe('opened-new-tab')
|
||||
expect(ports.openInNewTab).toHaveBeenCalledWith(
|
||||
'wf-new',
|
||||
{ nodes: ['ksampler'] },
|
||||
8
|
||||
)
|
||||
})
|
||||
|
||||
describe('resolveConflict', () => {
|
||||
function setupConflict() {
|
||||
const ports = makePorts()
|
||||
const sync = useAgentDraftSync(ports)
|
||||
sync.registerWorkflow('wf1', 7)
|
||||
sync.setVersion('wf1', 8)
|
||||
sync.handlePatch(patch({ baseVersion: 7, version: 9 }))
|
||||
return { ports, sync }
|
||||
}
|
||||
|
||||
it('accept-agent applies the agent version and adopts it', () => {
|
||||
const { ports, sync } = setupConflict()
|
||||
sync.resolveConflict('accept-agent')
|
||||
|
||||
expect(ports.applyToTab).toHaveBeenCalledWith(
|
||||
'wf1',
|
||||
{ nodes: ['ksampler'] },
|
||||
9
|
||||
)
|
||||
expect(sync.baseVersions.value.get('wf1')).toBe(9)
|
||||
expect(sync.pendingConflict.value).toBeNull()
|
||||
})
|
||||
|
||||
it('keep-mine discards the agent result and keeps the tab version', () => {
|
||||
const { ports, sync } = setupConflict()
|
||||
sync.resolveConflict('keep-mine')
|
||||
|
||||
expect(ports.discardAgentResult).toHaveBeenCalledWith('wf1')
|
||||
expect(ports.applyToTab).not.toHaveBeenCalled()
|
||||
expect(sync.baseVersions.value.get('wf1')).toBe(8)
|
||||
expect(sync.pendingConflict.value).toBeNull()
|
||||
})
|
||||
|
||||
it('new-tab opens the agent result without touching the active tab', () => {
|
||||
const { ports, sync } = setupConflict()
|
||||
sync.resolveConflict('new-tab')
|
||||
|
||||
expect(ports.openInNewTab).toHaveBeenCalledWith(
|
||||
'wf1',
|
||||
{ nodes: ['ksampler'] },
|
||||
9
|
||||
)
|
||||
expect(ports.applyToTab).not.toHaveBeenCalled()
|
||||
expect(sync.pendingConflict.value).toBeNull()
|
||||
})
|
||||
})
|
||||
|
||||
describe('forgetWorkflow', () => {
|
||||
it('clears a pending conflict targeting the closed workflow', () => {
|
||||
const ports = makePorts()
|
||||
const sync = useAgentDraftSync(ports)
|
||||
sync.registerWorkflow('wf1', 7)
|
||||
sync.setVersion('wf1', 8)
|
||||
sync.handlePatch(patch({ baseVersion: 7, version: 9 }))
|
||||
expect(sync.pendingConflict.value).not.toBeNull()
|
||||
|
||||
sync.forgetWorkflow('wf1')
|
||||
|
||||
expect(sync.pendingConflict.value).toBeNull()
|
||||
})
|
||||
|
||||
it('leaves a pending conflict for a different workflow intact', () => {
|
||||
const ports = makePorts()
|
||||
const sync = useAgentDraftSync(ports)
|
||||
sync.registerWorkflow('wf1', 7)
|
||||
sync.setVersion('wf1', 8)
|
||||
sync.handlePatch(patch({ baseVersion: 7, version: 9 }))
|
||||
|
||||
sync.forgetWorkflow('wf2')
|
||||
|
||||
expect(sync.pendingConflict.value).toMatchObject({ workflowId: 'wf1' })
|
||||
})
|
||||
})
|
||||
|
||||
describe('wire envelope -> reconciler', () => {
|
||||
function wirePatch(version: number, baseVersion: number) {
|
||||
const event = parseAgentEvent({
|
||||
type: 'draft_patch',
|
||||
data: {
|
||||
thread_id: 't1',
|
||||
message_id: 'm1',
|
||||
workflow_id: 'wf1',
|
||||
content: { nodes: ['ksampler'] },
|
||||
version,
|
||||
base_version: baseVersion
|
||||
}
|
||||
})
|
||||
if (event?.type !== 'draft_patch') {
|
||||
throw new Error('expected a draft_patch event')
|
||||
}
|
||||
return event
|
||||
}
|
||||
|
||||
it('applies when base_version matches the tab version', () => {
|
||||
const ports = makePorts()
|
||||
const sync = useAgentDraftSync(ports)
|
||||
sync.registerWorkflow('wf1', 7)
|
||||
|
||||
const outcome = sync.handlePatch(wirePatch(8, 7))
|
||||
|
||||
expect(outcome).toBe('applied')
|
||||
expect(ports.applyToTab).toHaveBeenCalledWith(
|
||||
'wf1',
|
||||
{ nodes: ['ksampler'] },
|
||||
8
|
||||
)
|
||||
})
|
||||
|
||||
it('conflicts when base_version differs from the tab version', () => {
|
||||
const ports = makePorts()
|
||||
const sync = useAgentDraftSync(ports)
|
||||
sync.registerWorkflow('wf1', 7)
|
||||
sync.setVersion('wf1', 8)
|
||||
|
||||
const outcome = sync.handlePatch(wirePatch(9, 7))
|
||||
|
||||
expect(outcome).toBe('conflict')
|
||||
expect(ports.applyToTab).not.toHaveBeenCalled()
|
||||
expect(sync.pendingConflict.value).toMatchObject({
|
||||
workflowId: 'wf1',
|
||||
baseVersion: 7,
|
||||
version: 9
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe('resync (reconnect / cold start)', () => {
|
||||
it('seeds the tab from the snapshot when none is registered yet', async () => {
|
||||
const ports = makePorts()
|
||||
const sync = useAgentDraftSync(ports)
|
||||
|
||||
const outcome = await sync.resync('wf1')
|
||||
|
||||
expect(outcome).toBe('restored')
|
||||
expect(ports.fetchSnapshot).toHaveBeenCalledWith('wf1')
|
||||
expect(ports.applyToTab).toHaveBeenCalledWith(
|
||||
'wf1',
|
||||
SNAPSHOT.content,
|
||||
SNAPSHOT.version
|
||||
)
|
||||
expect(sync.baseVersions.value.get('wf1')).toBe(SNAPSHOT.version)
|
||||
})
|
||||
|
||||
it('restores a newer snapshot over a stale local version', async () => {
|
||||
const ports = makePorts()
|
||||
const sync = useAgentDraftSync(ports)
|
||||
sync.registerWorkflow('wf1', 5)
|
||||
|
||||
const outcome = await sync.resync('wf1')
|
||||
|
||||
expect(outcome).toBe('restored')
|
||||
expect(sync.baseVersions.value.get('wf1')).toBe(SNAPSHOT.version)
|
||||
})
|
||||
|
||||
it('leaves the tab untouched when the snapshot is not newer (watermark)', async () => {
|
||||
const ports = makePorts()
|
||||
vi.mocked(ports.fetchSnapshot).mockResolvedValue({
|
||||
content: { nodes: ['old'] },
|
||||
version: 8
|
||||
})
|
||||
const sync = useAgentDraftSync(ports)
|
||||
sync.registerWorkflow('wf1', 8)
|
||||
|
||||
const outcome = await sync.resync('wf1')
|
||||
|
||||
expect(outcome).toBe('up-to-date')
|
||||
expect(ports.applyToTab).not.toHaveBeenCalled()
|
||||
expect(sync.baseVersions.value.get('wf1')).toBe(8)
|
||||
})
|
||||
|
||||
it('shares one in-flight request across concurrent resyncs', async () => {
|
||||
const ports = makePorts()
|
||||
const sync = useAgentDraftSync(ports)
|
||||
|
||||
const [a, b] = await Promise.all([sync.resync('wf1'), sync.resync('wf1')])
|
||||
|
||||
expect(a).toBe('restored')
|
||||
expect(b).toBe('restored')
|
||||
expect(ports.fetchSnapshot).toHaveBeenCalledTimes(1)
|
||||
})
|
||||
|
||||
it('clears an open merge dialog when it restores a newer snapshot', async () => {
|
||||
const ports = makePorts()
|
||||
const sync = useAgentDraftSync(ports)
|
||||
sync.registerWorkflow('wf1', 5)
|
||||
sync.setVersion('wf1', 6)
|
||||
sync.handlePatch(patch({ baseVersion: 4, version: 7 }))
|
||||
expect(sync.pendingConflict.value).not.toBeNull()
|
||||
|
||||
const outcome = await sync.resync('wf1')
|
||||
|
||||
expect(outcome).toBe('restored')
|
||||
expect(sync.pendingConflict.value).toBeNull()
|
||||
})
|
||||
|
||||
it('does not resurrect tracking for a tab closed mid-fetch', async () => {
|
||||
const ports = makePorts()
|
||||
let resolveFetch!: (snapshot: typeof SNAPSHOT) => void
|
||||
vi.mocked(ports.fetchSnapshot).mockReturnValue(
|
||||
new Promise((resolve) => {
|
||||
resolveFetch = resolve
|
||||
})
|
||||
)
|
||||
const sync = useAgentDraftSync(ports)
|
||||
sync.registerWorkflow('wf1', 5)
|
||||
|
||||
const pending = sync.resync('wf1')
|
||||
sync.forgetWorkflow('wf1')
|
||||
resolveFetch(SNAPSHOT)
|
||||
|
||||
expect(await pending).toBe('up-to-date')
|
||||
expect(ports.applyToTab).not.toHaveBeenCalled()
|
||||
expect(sync.baseVersions.value.has('wf1')).toBe(false)
|
||||
})
|
||||
})
|
||||
|
||||
describe('gap detection', () => {
|
||||
it('refetches the snapshot when the agent advanced past the tab', async () => {
|
||||
const ports = makePorts()
|
||||
const sync = useAgentDraftSync(ports)
|
||||
sync.registerWorkflow('wf1', 5)
|
||||
|
||||
const outcome = sync.handlePatch(patch({ baseVersion: 7, version: 8 }))
|
||||
|
||||
expect(outcome).toBe('gap')
|
||||
expect(ports.applyToTab).not.toHaveBeenCalled()
|
||||
await sync.pendingResync('wf1')
|
||||
expect(ports.fetchSnapshot).toHaveBeenCalledWith('wf1')
|
||||
expect(ports.applyToTab).toHaveBeenCalledWith(
|
||||
'wf1',
|
||||
SNAPSHOT.content,
|
||||
SNAPSHOT.version
|
||||
)
|
||||
expect(sync.baseVersions.value.get('wf1')).toBe(SNAPSHOT.version)
|
||||
expect(sync.pendingConflict.value).toBeNull()
|
||||
})
|
||||
})
|
||||
|
||||
describe('handleVersionTip (trailing lost patch)', () => {
|
||||
it('resyncs when the tip outruns the local watermark', async () => {
|
||||
const ports = makePorts()
|
||||
const sync = useAgentDraftSync(ports)
|
||||
sync.registerWorkflow('wf1', 5)
|
||||
|
||||
const outcome = sync.handleVersionTip('wf1', 9)
|
||||
|
||||
expect(outcome).toBe('resyncing')
|
||||
await sync.pendingResync('wf1')
|
||||
expect(ports.fetchSnapshot).toHaveBeenCalledWith('wf1')
|
||||
expect(sync.baseVersions.value.get('wf1')).toBe(SNAPSHOT.version)
|
||||
})
|
||||
|
||||
it('does nothing when the tip is not newer', () => {
|
||||
const ports = makePorts()
|
||||
const sync = useAgentDraftSync(ports)
|
||||
sync.registerWorkflow('wf1', 9)
|
||||
|
||||
expect(sync.handleVersionTip('wf1', 9)).toBe('up-to-date')
|
||||
expect(ports.fetchSnapshot).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('ignores a tip for a workflow with no open tab', () => {
|
||||
const ports = makePorts()
|
||||
const sync = useAgentDraftSync(ports)
|
||||
|
||||
expect(sync.handleVersionTip('wf-unknown', 9)).toBe('ignored')
|
||||
expect(ports.fetchSnapshot).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('preserves local state when a self-heal fetch fails', async () => {
|
||||
const ports = makePorts()
|
||||
const boom = new Error('network down')
|
||||
vi.mocked(ports.fetchSnapshot).mockRejectedValue(boom)
|
||||
const sync = useAgentDraftSync(ports)
|
||||
sync.registerWorkflow('wf1', 5)
|
||||
|
||||
expect(sync.handleVersionTip('wf1', 9)).toBe('resyncing')
|
||||
await expect(sync.pendingResync('wf1')).rejects.toBe(boom)
|
||||
|
||||
expect(ports.applyToTab).not.toHaveBeenCalled()
|
||||
expect(sync.baseVersions.value.get('wf1')).toBe(5)
|
||||
})
|
||||
})
|
||||
})
|
||||
230
src/platform/agent/common/useAgentDraftSync.ts
Normal file
230
src/platform/agent/common/useAgentDraftSync.ts
Normal file
@@ -0,0 +1,230 @@
|
||||
/**
|
||||
* Agent draft sync (prototype — ADR-0011).
|
||||
*
|
||||
* Orchestrates the frontend side of agent graph writes: tracks the base
|
||||
* `version` per open workflow (the version lifecycle), reconciles incoming
|
||||
* `draft_patch` events, drives the three merge-dialog outcomes, and self-heals
|
||||
* across dropped Redis Pub/Sub patches and WS reconnects by refetching the
|
||||
* authoritative snapshot (BE-1886). The transport stays best-effort; this client
|
||||
* provides reliability — `version` is the monotonic watermark.
|
||||
*
|
||||
* The canvas-facing effects are injected as `ports` so this is decoupled from
|
||||
* litegraph / the workflow store and is unit-testable. In the real app the
|
||||
* ports map to: `applyToTab` -> a destructive variant of `app.loadGraphData`;
|
||||
* `openInNewTab` -> the existing non-destructive load; `discardAgentResult` ->
|
||||
* a no-op (keep the user's canvas as-is); `fetchSnapshot` -> a `GET
|
||||
* /api/agent/draft?workflow_id=...` decoded with `parseDraftSnapshot`.
|
||||
*/
|
||||
import { readonly, ref } from 'vue'
|
||||
|
||||
import type {
|
||||
DraftPatchEvent,
|
||||
DraftSnapshot,
|
||||
WorkflowGraph,
|
||||
WorkflowId
|
||||
} from './agentProtocol'
|
||||
import type { ConflictResolution } from './draftReconciler'
|
||||
import { reconcileDraftPatch } from './draftReconciler'
|
||||
|
||||
export interface AgentDraftPorts {
|
||||
applyToTab(
|
||||
workflowId: WorkflowId,
|
||||
content: WorkflowGraph,
|
||||
version: number
|
||||
): void
|
||||
openInNewTab(
|
||||
workflowId: WorkflowId,
|
||||
content: WorkflowGraph,
|
||||
version: number
|
||||
): void
|
||||
discardAgentResult(workflowId: WorkflowId): void
|
||||
fetchSnapshot(workflowId: WorkflowId): Promise<DraftSnapshot>
|
||||
}
|
||||
|
||||
export interface PendingConflict {
|
||||
workflowId: WorkflowId
|
||||
content: WorkflowGraph
|
||||
version: number
|
||||
baseVersion: number
|
||||
}
|
||||
|
||||
export type PatchOutcome =
|
||||
| 'applied'
|
||||
| 'conflict'
|
||||
| 'ignored'
|
||||
| 'opened-new-tab'
|
||||
| 'gap'
|
||||
|
||||
/** `restored` = a newer snapshot replaced the tab; `up-to-date` = already current. */
|
||||
export type ResyncOutcome = 'restored' | 'up-to-date'
|
||||
|
||||
export type VersionTipOutcome = 'resyncing' | 'up-to-date' | 'ignored'
|
||||
|
||||
export function useAgentDraftSync(ports: AgentDraftPorts) {
|
||||
const baseVersions = ref(new Map<WorkflowId, number>())
|
||||
const pendingConflict = ref<PendingConflict | null>(null)
|
||||
const inFlightResyncs = new Map<WorkflowId, Promise<ResyncOutcome>>()
|
||||
|
||||
/** Call when a draft tab opens, adopting its known version. */
|
||||
function registerWorkflow(workflowId: WorkflowId, version: number): void {
|
||||
baseVersions.value.set(workflowId, version)
|
||||
}
|
||||
|
||||
function clearConflictFor(workflowId: WorkflowId): void {
|
||||
if (pendingConflict.value?.workflowId === workflowId) {
|
||||
pendingConflict.value = null
|
||||
}
|
||||
}
|
||||
|
||||
function forgetWorkflow(workflowId: WorkflowId): void {
|
||||
baseVersions.value.delete(workflowId)
|
||||
clearConflictFor(workflowId)
|
||||
}
|
||||
|
||||
/** Call after a local autosave returns a new server version. */
|
||||
function setVersion(workflowId: WorkflowId, version: number): void {
|
||||
baseVersions.value.set(workflowId, version)
|
||||
}
|
||||
|
||||
function handlePatch(patch: DraftPatchEvent): PatchOutcome {
|
||||
const current = baseVersions.value.get(patch.workflowId)
|
||||
|
||||
// Unknown workflow = no open tab for it (a new-tab write, or a tab the user
|
||||
// closed mid-edit — see ADR-0011 open question). Route to a new tab.
|
||||
if (current === undefined) {
|
||||
ports.openInNewTab(patch.workflowId, patch.content, patch.version)
|
||||
baseVersions.value.set(patch.workflowId, patch.version)
|
||||
return 'opened-new-tab'
|
||||
}
|
||||
|
||||
const result = reconcileDraftPatch(patch, current)
|
||||
switch (result.kind) {
|
||||
case 'apply':
|
||||
ports.applyToTab(patch.workflowId, patch.content, result.version)
|
||||
baseVersions.value.set(patch.workflowId, result.version)
|
||||
return 'applied'
|
||||
case 'conflict':
|
||||
pendingConflict.value = {
|
||||
workflowId: patch.workflowId,
|
||||
content: patch.content,
|
||||
version: patch.version,
|
||||
baseVersion: patch.baseVersion
|
||||
}
|
||||
return 'conflict'
|
||||
case 'gap':
|
||||
scheduleResync(patch.workflowId)
|
||||
return 'gap'
|
||||
case 'stale':
|
||||
return 'ignored'
|
||||
}
|
||||
}
|
||||
|
||||
async function runResync(workflowId: WorkflowId): Promise<ResyncOutcome> {
|
||||
const wasTracked = baseVersions.value.has(workflowId)
|
||||
const snapshot = await ports.fetchSnapshot(workflowId)
|
||||
const current = baseVersions.value.get(workflowId)
|
||||
|
||||
// The tab was closed mid-fetch (`forgetWorkflow`). Don't resurrect tracking
|
||||
// or apply to a tab that no longer exists.
|
||||
if (wasTracked && current === undefined) return 'up-to-date'
|
||||
if (current !== undefined && snapshot.version <= current) {
|
||||
return 'up-to-date'
|
||||
}
|
||||
ports.applyToTab(workflowId, snapshot.content, snapshot.version)
|
||||
baseVersions.value.set(workflowId, snapshot.version)
|
||||
// The authoritative snapshot supersedes any open merge dialog for this tab;
|
||||
// resolving it later would re-apply now-stale content and roll the version
|
||||
// back.
|
||||
clearConflictFor(workflowId)
|
||||
return 'restored'
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch the authoritative snapshot and reconcile it against the watermark.
|
||||
* Call on WS (re)connect to restore the draft without waiting for a patch.
|
||||
* Concurrent calls for the same workflow share one in-flight request.
|
||||
*/
|
||||
function resync(workflowId: WorkflowId): Promise<ResyncOutcome> {
|
||||
const existing = inFlightResyncs.get(workflowId)
|
||||
if (existing) return existing
|
||||
const run = runResync(workflowId).finally(() => {
|
||||
inFlightResyncs.delete(workflowId)
|
||||
})
|
||||
inFlightResyncs.set(workflowId, run)
|
||||
return run
|
||||
}
|
||||
|
||||
/** In-flight resync for a workflow, if any (lets callers await self-heal). */
|
||||
function pendingResync(
|
||||
workflowId: WorkflowId
|
||||
): Promise<ResyncOutcome> | undefined {
|
||||
return inFlightResyncs.get(workflowId)
|
||||
}
|
||||
|
||||
/**
|
||||
* Fire-and-forget self-heal. A failed refetch is best-effort: it leaves local
|
||||
* state intact and is retried by the next patch / reconnect / version tip.
|
||||
*/
|
||||
function scheduleResync(workflowId: WorkflowId): void {
|
||||
void resync(workflowId).catch((error: unknown) => {
|
||||
console.error('[agent] draft resync failed', workflowId, error)
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Consume an optional `draft_version` tip (a content-less version heartbeat).
|
||||
* Catches a trailing lost patch: if the server tip outruns our watermark for an
|
||||
* open workflow, refetch the snapshot.
|
||||
*/
|
||||
function handleVersionTip(
|
||||
workflowId: WorkflowId,
|
||||
version: number
|
||||
): VersionTipOutcome {
|
||||
const current = baseVersions.value.get(workflowId)
|
||||
if (current === undefined) return 'ignored'
|
||||
if (version <= current) return 'up-to-date'
|
||||
scheduleResync(workflowId)
|
||||
return 'resyncing'
|
||||
}
|
||||
|
||||
function resolveConflict(decision: ConflictResolution): void {
|
||||
const conflict = pendingConflict.value
|
||||
if (!conflict) return
|
||||
|
||||
switch (decision) {
|
||||
case 'accept-agent':
|
||||
ports.applyToTab(
|
||||
conflict.workflowId,
|
||||
conflict.content,
|
||||
conflict.version
|
||||
)
|
||||
baseVersions.value.set(conflict.workflowId, conflict.version)
|
||||
break
|
||||
case 'keep-mine':
|
||||
ports.discardAgentResult(conflict.workflowId)
|
||||
break
|
||||
case 'new-tab':
|
||||
ports.openInNewTab(
|
||||
conflict.workflowId,
|
||||
conflict.content,
|
||||
conflict.version
|
||||
)
|
||||
break
|
||||
}
|
||||
|
||||
pendingConflict.value = null
|
||||
}
|
||||
|
||||
return {
|
||||
baseVersions: readonly(baseVersions),
|
||||
pendingConflict: readonly(pendingConflict),
|
||||
registerWorkflow,
|
||||
forgetWorkflow,
|
||||
setVersion,
|
||||
handlePatch,
|
||||
resolveConflict,
|
||||
resync,
|
||||
pendingResync,
|
||||
handleVersionTip
|
||||
}
|
||||
}
|
||||
48
src/platform/agent/crdt/agentRoom.test.ts
Normal file
48
src/platform/agent/crdt/agentRoom.test.ts
Normal file
@@ -0,0 +1,48 @@
|
||||
import { describe, expect, it } from 'vitest'
|
||||
|
||||
import { AgentRoom } from './agentRoom'
|
||||
|
||||
describe('AgentRoom', () => {
|
||||
it('reconciles two peers via state-vector diff without conflict', () => {
|
||||
const user = new AgentRoom('wf-1')
|
||||
const agent = new AgentRoom('wf-1')
|
||||
|
||||
user.nodes.set('a', { title: 'Load Checkpoint' })
|
||||
agent.nodes.set('b', { title: 'KSampler' })
|
||||
|
||||
agent.applyRemoteUpdate(user.diffSince(agent.encodeStateVector()))
|
||||
user.applyRemoteUpdate(agent.diffSince(user.encodeStateVector()))
|
||||
|
||||
expect([...user.nodes.keys()].sort()).toEqual(['a', 'b'])
|
||||
expect([...agent.nodes.keys()].sort()).toEqual(['a', 'b'])
|
||||
})
|
||||
|
||||
it('merges concurrent edits to the same map deterministically', () => {
|
||||
const user = new AgentRoom('wf-1')
|
||||
const agent = new AgentRoom('wf-1')
|
||||
agent.applyRemoteUpdate(user.encodeState())
|
||||
|
||||
user.nodes.set('shared', { x: 1 })
|
||||
agent.nodes.set('shared', { x: 2 })
|
||||
|
||||
user.applyRemoteUpdate(agent.diffSince(user.encodeStateVector()))
|
||||
agent.applyRemoteUpdate(user.diffSince(agent.encodeStateVector()))
|
||||
|
||||
expect(user.nodes.get('shared')).toEqual(agent.nodes.get('shared'))
|
||||
})
|
||||
|
||||
it('reports an agent participant as editing via presence', () => {
|
||||
const room = new AgentRoom('wf-1')
|
||||
expect(room.isAgentEditing()).toBe(false)
|
||||
|
||||
room.setPresence({
|
||||
actor: 'agent-1',
|
||||
kind: 'agent',
|
||||
status: 'editing',
|
||||
focus: ['a'],
|
||||
updatedAt: 0
|
||||
})
|
||||
|
||||
expect(room.isAgentEditing()).toBe(true)
|
||||
})
|
||||
})
|
||||
98
src/platform/agent/crdt/agentRoom.ts
Normal file
98
src/platform/agent/crdt/agentRoom.ts
Normal file
@@ -0,0 +1,98 @@
|
||||
/**
|
||||
* Agent CRDT room (prototype — ADR-0011, building on ADR-0003).
|
||||
*
|
||||
* One collaborative workflow = one room: a thin wrapper over a Yjs `Y.Doc` that
|
||||
* the browser and the server-side agent share. The agent is the first
|
||||
* multiplayer peer. Top-level types (`nodes`/`links`/`reroutes`) mirror the live
|
||||
* layout store so a room can drive the real canvas via binary updates
|
||||
* (see `layoutStoreBinding.ts`). Conflict resolution is Yjs's job — no version
|
||||
* CAS at this layer.
|
||||
*/
|
||||
import * as Y from 'yjs'
|
||||
|
||||
export type WorkflowId = string
|
||||
export type ActorId = string
|
||||
export type UpdateOrigin = unknown
|
||||
|
||||
export interface RoomPresence {
|
||||
actor: ActorId
|
||||
kind: 'user' | 'agent'
|
||||
status: 'idle' | 'editing'
|
||||
focus: string[]
|
||||
updatedAt: number
|
||||
}
|
||||
|
||||
const PRESENCE_KEY = '__presence'
|
||||
|
||||
export class AgentRoom {
|
||||
readonly workflowId: WorkflowId
|
||||
readonly doc: Y.Doc
|
||||
readonly nodes: Y.Map<unknown>
|
||||
readonly links: Y.Map<unknown>
|
||||
readonly reroutes: Y.Map<unknown>
|
||||
|
||||
private readonly presence: Y.Map<RoomPresence>
|
||||
|
||||
constructor(workflowId: WorkflowId, doc: Y.Doc = new Y.Doc()) {
|
||||
this.workflowId = workflowId
|
||||
this.doc = doc
|
||||
this.nodes = doc.getMap('nodes')
|
||||
this.links = doc.getMap('links')
|
||||
this.reroutes = doc.getMap('reroutes')
|
||||
this.presence = doc.getMap(PRESENCE_KEY)
|
||||
}
|
||||
|
||||
/** State vector describing what this peer already has (sync step 1). */
|
||||
encodeStateVector(): Uint8Array {
|
||||
return Y.encodeStateVector(this.doc)
|
||||
}
|
||||
|
||||
/** Minimal update a peer needs given its state vector (sync step 2). */
|
||||
diffSince(remoteStateVector: Uint8Array): Uint8Array {
|
||||
return Y.encodeStateAsUpdate(this.doc, remoteStateVector)
|
||||
}
|
||||
|
||||
/** Full state as a single update, used to seed a fresh peer. */
|
||||
encodeState(): Uint8Array {
|
||||
return Y.encodeStateAsUpdate(this.doc)
|
||||
}
|
||||
|
||||
applyRemoteUpdate(update: Uint8Array, origin?: UpdateOrigin): void {
|
||||
Y.applyUpdate(this.doc, update, origin)
|
||||
}
|
||||
|
||||
onUpdate(cb: (update: Uint8Array, origin: UpdateOrigin) => void): () => void {
|
||||
const handler = (update: Uint8Array, origin: UpdateOrigin) =>
|
||||
cb(update, origin)
|
||||
this.doc.on('update', handler)
|
||||
return () => this.doc.off('update', handler)
|
||||
}
|
||||
|
||||
setPresence(p: RoomPresence): void {
|
||||
this.presence.set(p.actor, p)
|
||||
}
|
||||
|
||||
clearPresence(actor: ActorId): void {
|
||||
this.presence.delete(actor)
|
||||
}
|
||||
|
||||
getPresence(): RoomPresence[] {
|
||||
return [...this.presence.values()]
|
||||
}
|
||||
|
||||
isAgentEditing(): boolean {
|
||||
return this.getPresence().some(
|
||||
(p) => p.kind === 'agent' && p.status === 'editing'
|
||||
)
|
||||
}
|
||||
|
||||
onPresenceChange(cb: (presence: RoomPresence[]) => void): () => void {
|
||||
const handler = () => cb(this.getPresence())
|
||||
this.presence.observe(handler)
|
||||
return () => this.presence.unobserve(handler)
|
||||
}
|
||||
|
||||
destroy(): void {
|
||||
this.doc.destroy()
|
||||
}
|
||||
}
|
||||
31
src/platform/agent/crdt/agentRoomManager.test.ts
Normal file
31
src/platform/agent/crdt/agentRoomManager.test.ts
Normal file
@@ -0,0 +1,31 @@
|
||||
import { describe, expect, it } from 'vitest'
|
||||
|
||||
import { AgentRoomManager } from './agentRoomManager'
|
||||
|
||||
describe('AgentRoomManager', () => {
|
||||
it('keeps one room alive across overlapping tab references', () => {
|
||||
const manager = new AgentRoomManager()
|
||||
const first = manager.join('wf-1')
|
||||
const second = manager.join('wf-1')
|
||||
|
||||
expect(second).toBe(first)
|
||||
|
||||
manager.leave('wf-1')
|
||||
expect(manager.has('wf-1')).toBe(true)
|
||||
|
||||
manager.leave('wf-1')
|
||||
expect(manager.has('wf-1')).toBe(false)
|
||||
})
|
||||
|
||||
it('keeps a pinned room alive with zero open tabs and reaps on unpin', () => {
|
||||
const manager = new AgentRoomManager()
|
||||
manager.join('wf-1')
|
||||
manager.pin('wf-1')
|
||||
|
||||
manager.leave('wf-1')
|
||||
expect(manager.has('wf-1')).toBe(true)
|
||||
|
||||
manager.unpin('wf-1')
|
||||
expect(manager.has('wf-1')).toBe(false)
|
||||
})
|
||||
})
|
||||
71
src/platform/agent/crdt/agentRoomManager.ts
Normal file
71
src/platform/agent/crdt/agentRoomManager.ts
Normal file
@@ -0,0 +1,71 @@
|
||||
/**
|
||||
* Agent room manager (prototype — ADR-0011).
|
||||
*
|
||||
* Owns room lifecycle. A tab switch is a `join`/`leave`; a room stays alive in
|
||||
* memory while any tab references it, so the agent can keep applying edits to a
|
||||
* backgrounded workflow and the user sees them lazily on return (the scenario
|
||||
* from the design meeting). `pin` keeps a room alive with zero open tabs while
|
||||
* the agent is mid-edit; the room is torn down only when unreferenced and
|
||||
* unpinned.
|
||||
*/
|
||||
import { AgentRoom } from './agentRoom'
|
||||
import type { WorkflowId } from './agentRoom'
|
||||
|
||||
interface RoomEntry {
|
||||
room: AgentRoom
|
||||
refs: number
|
||||
pinned: boolean
|
||||
}
|
||||
|
||||
export class AgentRoomManager {
|
||||
private readonly entries = new Map<WorkflowId, RoomEntry>()
|
||||
|
||||
join(workflowId: WorkflowId): AgentRoom {
|
||||
const existing = this.entries.get(workflowId)
|
||||
if (existing) {
|
||||
existing.refs += 1
|
||||
return existing.room
|
||||
}
|
||||
const room = new AgentRoom(workflowId)
|
||||
this.entries.set(workflowId, { room, refs: 1, pinned: false })
|
||||
return room
|
||||
}
|
||||
|
||||
leave(workflowId: WorkflowId): void {
|
||||
const entry = this.entries.get(workflowId)
|
||||
if (!entry) return
|
||||
entry.refs = Math.max(0, entry.refs - 1)
|
||||
this.reapIfIdle(workflowId, entry)
|
||||
}
|
||||
|
||||
pin(workflowId: WorkflowId): void {
|
||||
const entry = this.entries.get(workflowId)
|
||||
if (entry) entry.pinned = true
|
||||
}
|
||||
|
||||
unpin(workflowId: WorkflowId): void {
|
||||
const entry = this.entries.get(workflowId)
|
||||
if (!entry) return
|
||||
entry.pinned = false
|
||||
this.reapIfIdle(workflowId, entry)
|
||||
}
|
||||
|
||||
get(workflowId: WorkflowId): AgentRoom | undefined {
|
||||
return this.entries.get(workflowId)?.room
|
||||
}
|
||||
|
||||
has(workflowId: WorkflowId): boolean {
|
||||
return this.entries.has(workflowId)
|
||||
}
|
||||
|
||||
get size(): number {
|
||||
return this.entries.size
|
||||
}
|
||||
|
||||
private reapIfIdle(workflowId: WorkflowId, entry: RoomEntry): void {
|
||||
if (entry.refs === 0 && !entry.pinned) {
|
||||
entry.room.destroy()
|
||||
this.entries.delete(workflowId)
|
||||
}
|
||||
}
|
||||
}
|
||||
33
src/platform/agent/crdt/roomDocBinding.test.ts
Normal file
33
src/platform/agent/crdt/roomDocBinding.test.ts
Normal file
@@ -0,0 +1,33 @@
|
||||
import { describe, expect, it } from 'vitest'
|
||||
import * as Y from 'yjs'
|
||||
|
||||
import { AgentRoom } from './agentRoom'
|
||||
import { bindRoomToDoc } from './roomDocBinding'
|
||||
|
||||
describe('bindRoomToDoc', () => {
|
||||
it('seeds the room from the doc and pushes agent edits back', () => {
|
||||
const doc = new Y.Doc()
|
||||
doc.getMap('nodes').set('existing', { title: 'Existing' })
|
||||
|
||||
const room = new AgentRoom('wf-1')
|
||||
const unbind = bindRoomToDoc(room, doc)
|
||||
|
||||
expect(room.nodes.get('existing')).toEqual({ title: 'Existing' })
|
||||
|
||||
room.nodes.set('agent', { title: 'Agent Node' })
|
||||
expect(doc.getMap('nodes').get('agent')).toEqual({ title: 'Agent Node' })
|
||||
|
||||
unbind()
|
||||
room.nodes.set('after-unbind', {})
|
||||
expect(doc.getMap('nodes').has('after-unbind')).toBe(false)
|
||||
})
|
||||
|
||||
it('reflects doc edits into the room while bound', () => {
|
||||
const doc = new Y.Doc()
|
||||
const room = new AgentRoom('wf-1')
|
||||
bindRoomToDoc(room, doc)
|
||||
|
||||
doc.getMap('nodes').set('from-canvas', { title: 'Canvas' })
|
||||
expect(room.nodes.get('from-canvas')).toEqual({ title: 'Canvas' })
|
||||
})
|
||||
})
|
||||
35
src/platform/agent/crdt/roomDocBinding.ts
Normal file
35
src/platform/agent/crdt/roomDocBinding.ts
Normal file
@@ -0,0 +1,35 @@
|
||||
/**
|
||||
* Binds an agent room to a Yjs document (prototype — ADR-0011).
|
||||
*
|
||||
* Generic, layer-safe core of the canvas binding: it works on any `Y.Doc`, so
|
||||
* it stays in the platform layer with no renderer dependency. The renderer-layer
|
||||
* wrapper (`renderer/core/layout/agent/bindRoomToLayoutStore.ts`) supplies the
|
||||
* live layout store's doc. Bidirectional Yjs sync with origin tags prevents echo
|
||||
* loops; Yjs reconciles concurrent edits.
|
||||
*/
|
||||
import * as Y from 'yjs'
|
||||
|
||||
import type { AgentRoom } from './agentRoom'
|
||||
|
||||
const AGENT_ORIGIN = Symbol('agent-room->doc')
|
||||
const DOC_ORIGIN = Symbol('doc->agent-room')
|
||||
|
||||
export function bindRoomToDoc(room: AgentRoom, doc: Y.Doc): () => void {
|
||||
Y.applyUpdate(room.doc, Y.encodeStateAsUpdate(doc), DOC_ORIGIN)
|
||||
Y.applyUpdate(doc, Y.encodeStateAsUpdate(room.doc), AGENT_ORIGIN)
|
||||
|
||||
const onRoom = (update: Uint8Array, origin: unknown) => {
|
||||
if (origin !== DOC_ORIGIN) Y.applyUpdate(doc, update, AGENT_ORIGIN)
|
||||
}
|
||||
const onDoc = (update: Uint8Array, origin: unknown) => {
|
||||
if (origin !== AGENT_ORIGIN) Y.applyUpdate(room.doc, update, DOC_ORIGIN)
|
||||
}
|
||||
|
||||
room.doc.on('update', onRoom)
|
||||
doc.on('update', onDoc)
|
||||
|
||||
return () => {
|
||||
room.doc.off('update', onRoom)
|
||||
doc.off('update', onDoc)
|
||||
}
|
||||
}
|
||||
59
src/platform/agent/crdt/roomSync.test.ts
Normal file
59
src/platform/agent/crdt/roomSync.test.ts
Normal file
@@ -0,0 +1,59 @@
|
||||
import { describe, expect, it } from 'vitest'
|
||||
|
||||
import { AgentRoom } from './agentRoom'
|
||||
import type { RoomSyncMessage, RoomTransport } from './roomSync'
|
||||
import { syncRoom } from './roomSync'
|
||||
|
||||
class LoopbackTransport implements RoomTransport {
|
||||
private listeners = new Set<(m: RoomSyncMessage) => void>()
|
||||
peer: LoopbackTransport | null = null
|
||||
|
||||
send(message: RoomSyncMessage): void {
|
||||
this.peer?.listeners.forEach((cb) => cb(message))
|
||||
}
|
||||
|
||||
onMessage(cb: (m: RoomSyncMessage) => void): () => void {
|
||||
this.listeners.add(cb)
|
||||
return () => this.listeners.delete(cb)
|
||||
}
|
||||
}
|
||||
|
||||
function connect() {
|
||||
const a = new LoopbackTransport()
|
||||
const b = new LoopbackTransport()
|
||||
a.peer = b
|
||||
b.peer = a
|
||||
return [a, b] as const
|
||||
}
|
||||
|
||||
describe('syncRoom', () => {
|
||||
it('converges existing state on connect via the sync handshake', () => {
|
||||
const [t1, t2] = connect()
|
||||
const user = new AgentRoom('wf-1')
|
||||
const agent = new AgentRoom('wf-1')
|
||||
user.nodes.set('a', { title: 'Load Checkpoint' })
|
||||
|
||||
syncRoom(user, t1)
|
||||
syncRoom(agent, t2)
|
||||
|
||||
expect(agent.nodes.get('a')).toEqual({ title: 'Load Checkpoint' })
|
||||
})
|
||||
|
||||
it('propagates live edits and ignores other workflows', () => {
|
||||
const [t1, t2] = connect()
|
||||
const user = new AgentRoom('wf-1')
|
||||
const agent = new AgentRoom('wf-1')
|
||||
syncRoom(user, t1)
|
||||
syncRoom(agent, t2)
|
||||
|
||||
agent.nodes.set('b', { title: 'KSampler' })
|
||||
expect(user.nodes.get('b')).toEqual({ title: 'KSampler' })
|
||||
|
||||
t2.send({
|
||||
type: 'update',
|
||||
workflowId: 'other',
|
||||
update: agent.encodeState()
|
||||
})
|
||||
expect(user.nodes.has('b')).toBe(true)
|
||||
})
|
||||
})
|
||||
63
src/platform/agent/crdt/roomSync.ts
Normal file
63
src/platform/agent/crdt/roomSync.ts
Normal file
@@ -0,0 +1,63 @@
|
||||
/**
|
||||
* Room sync protocol (prototype — ADR-0011).
|
||||
*
|
||||
* Transport-agnostic two-phase Yjs sync, modelled on `y-protocols/sync`:
|
||||
* step 1: a peer announces its state vector
|
||||
* step 2: the other peer replies with the diff that vector is missing
|
||||
* update: incremental updates are broadcast as they happen
|
||||
*
|
||||
* The transport is injected so this works over the existing Redis→WebSocket
|
||||
* bridge in V0 and a dedicated relay later. Updates applied from the transport
|
||||
* carry the `REMOTE_ORIGIN` tag so they are not echoed back.
|
||||
*/
|
||||
import type { AgentRoom } from './agentRoom'
|
||||
|
||||
export type RoomSyncMessage =
|
||||
| { type: 'sync-step-1'; workflowId: string; stateVector: Uint8Array }
|
||||
| { type: 'sync-step-2'; workflowId: string; update: Uint8Array }
|
||||
| { type: 'update'; workflowId: string; update: Uint8Array }
|
||||
|
||||
export interface RoomTransport {
|
||||
send(message: RoomSyncMessage): void
|
||||
onMessage(cb: (message: RoomSyncMessage) => void): () => void
|
||||
}
|
||||
|
||||
const REMOTE_ORIGIN = Symbol('agent-room-remote')
|
||||
|
||||
export function syncRoom(
|
||||
room: AgentRoom,
|
||||
transport: RoomTransport
|
||||
): () => void {
|
||||
const offUpdate = room.onUpdate((update, origin) => {
|
||||
if (origin === REMOTE_ORIGIN) return
|
||||
transport.send({ type: 'update', workflowId: room.workflowId, update })
|
||||
})
|
||||
|
||||
const offMessage = transport.onMessage((message) => {
|
||||
if (message.workflowId !== room.workflowId) return
|
||||
switch (message.type) {
|
||||
case 'sync-step-1':
|
||||
transport.send({
|
||||
type: 'sync-step-2',
|
||||
workflowId: room.workflowId,
|
||||
update: room.diffSince(message.stateVector)
|
||||
})
|
||||
return
|
||||
case 'sync-step-2':
|
||||
case 'update':
|
||||
room.applyRemoteUpdate(message.update, REMOTE_ORIGIN)
|
||||
return
|
||||
}
|
||||
})
|
||||
|
||||
transport.send({
|
||||
type: 'sync-step-1',
|
||||
workflowId: room.workflowId,
|
||||
stateVector: room.encodeStateVector()
|
||||
})
|
||||
|
||||
return () => {
|
||||
offUpdate()
|
||||
offMessage()
|
||||
}
|
||||
}
|
||||
102
src/platform/agent/session/agentSessionStore.test.ts
Normal file
102
src/platform/agent/session/agentSessionStore.test.ts
Normal file
@@ -0,0 +1,102 @@
|
||||
import { describe, expect, it } from 'vitest'
|
||||
|
||||
import type { AgentEvent } from '../common/agentProtocol'
|
||||
import { createSessionState, sessionReducer } from './agentSessionStore'
|
||||
import type { SessionState } from './agentSessionStore'
|
||||
|
||||
function reduce(state: SessionState, events: AgentEvent[]): SessionState {
|
||||
return events.reduce(
|
||||
(acc, event) => sessionReducer(acc, { type: 'agent-event', event }),
|
||||
state
|
||||
)
|
||||
}
|
||||
|
||||
describe('sessionReducer', () => {
|
||||
it('accumulates streaming deltas into one agent message', () => {
|
||||
const start = sessionReducer(createSessionState('t-1'), {
|
||||
type: 'user-send',
|
||||
id: 'u-1',
|
||||
content: 'hello'
|
||||
})
|
||||
|
||||
const next = reduce(start, [
|
||||
{
|
||||
type: 'agent_message_delta',
|
||||
threadId: 't-1',
|
||||
messageId: 'm-1',
|
||||
delta: 'Hi '
|
||||
},
|
||||
{
|
||||
type: 'agent_message_delta',
|
||||
threadId: 't-1',
|
||||
messageId: 'm-1',
|
||||
delta: 'there'
|
||||
}
|
||||
])
|
||||
|
||||
expect(next.messages).toHaveLength(2)
|
||||
expect(next.messages[1]).toMatchObject({
|
||||
role: 'agent',
|
||||
content: 'Hi there',
|
||||
streaming: true
|
||||
})
|
||||
expect(next.status).toBe('streaming')
|
||||
})
|
||||
|
||||
it('does not mutate the previous state (structural sharing)', () => {
|
||||
const before = createSessionState('t-1')
|
||||
const after = sessionReducer(before, {
|
||||
type: 'user-send',
|
||||
id: 'u-1',
|
||||
content: 'hello'
|
||||
})
|
||||
|
||||
expect(before.messages).toHaveLength(0)
|
||||
expect(after.messages).toHaveLength(1)
|
||||
expect(after).not.toBe(before)
|
||||
})
|
||||
|
||||
it('tracks tool-call lifecycle and surfaces errors', () => {
|
||||
const next = reduce(createSessionState('t-1'), [
|
||||
{
|
||||
type: 'agent_tool_call',
|
||||
threadId: 't-1',
|
||||
messageId: 'm-1',
|
||||
toolCallId: 'tc-1',
|
||||
toolName: 'load_graph',
|
||||
status: 'running'
|
||||
},
|
||||
{
|
||||
type: 'agent_tool_call',
|
||||
threadId: 't-1',
|
||||
messageId: 'm-1',
|
||||
toolCallId: 'tc-1',
|
||||
toolName: 'load_graph',
|
||||
status: 'error',
|
||||
errorCode: 'BAD_GRAPH'
|
||||
}
|
||||
])
|
||||
|
||||
expect(next.messages[0].toolCalls).toHaveLength(1)
|
||||
expect(next.messages[0].toolCalls[0]).toMatchObject({
|
||||
status: 'error',
|
||||
errorCode: 'BAD_GRAPH'
|
||||
})
|
||||
expect(next.status).toBe('error')
|
||||
})
|
||||
|
||||
it('ends streaming on done', () => {
|
||||
const next = reduce(createSessionState('t-1'), [
|
||||
{
|
||||
type: 'agent_message_delta',
|
||||
threadId: 't-1',
|
||||
messageId: 'm-1',
|
||||
delta: 'x'
|
||||
},
|
||||
{ type: 'agent_message_done', threadId: 't-1', messageId: 'm-1' }
|
||||
])
|
||||
|
||||
expect(next.messages[0].streaming).toBe(false)
|
||||
expect(next.status).toBe('idle')
|
||||
})
|
||||
})
|
||||
110
src/platform/agent/session/agentSessionStore.ts
Normal file
110
src/platform/agent/session/agentSessionStore.ts
Normal file
@@ -0,0 +1,110 @@
|
||||
/**
|
||||
* Agent chat session state (prototype — ADR-0011).
|
||||
*
|
||||
* Immer-backed reducer for the *local, single-client* chat surface: streaming
|
||||
* message deltas, tool-call lifecycle, run status. This is deliberately not a
|
||||
* CRDT — chat is owned by one browser tab, so structural-sharing immutable
|
||||
* updates (Immer) are the right tool. Graph state lives in the Yjs room layer
|
||||
* (`../crdt`); the two never mix.
|
||||
*/
|
||||
import { produce } from 'immer'
|
||||
|
||||
import type { AgentEvent, MessageId, ThreadId } from '../common/agentProtocol'
|
||||
|
||||
interface ToolCallView {
|
||||
id: string
|
||||
name: string
|
||||
status: 'running' | 'success' | 'error'
|
||||
durationMs?: number
|
||||
errorCode?: string
|
||||
}
|
||||
|
||||
interface ChatMessage {
|
||||
id: MessageId
|
||||
role: 'user' | 'agent'
|
||||
content: string
|
||||
streaming: boolean
|
||||
toolCalls: ToolCallView[]
|
||||
}
|
||||
|
||||
export interface SessionState {
|
||||
threadId: ThreadId
|
||||
messages: ChatMessage[]
|
||||
status: 'idle' | 'streaming' | 'error'
|
||||
}
|
||||
|
||||
export type SessionAction =
|
||||
| { type: 'user-send'; id: MessageId; content: string }
|
||||
| { type: 'agent-event'; event: AgentEvent }
|
||||
|
||||
export function createSessionState(threadId: ThreadId): SessionState {
|
||||
return { threadId, messages: [], status: 'idle' }
|
||||
}
|
||||
|
||||
function ensureAgentMessage(state: SessionState, id: MessageId): ChatMessage {
|
||||
const existing = state.messages.find((m) => m.id === id)
|
||||
if (existing) return existing
|
||||
const created: ChatMessage = {
|
||||
id,
|
||||
role: 'agent',
|
||||
content: '',
|
||||
streaming: true,
|
||||
toolCalls: []
|
||||
}
|
||||
state.messages.push(created)
|
||||
return created
|
||||
}
|
||||
|
||||
function applyAgentEvent(state: SessionState, event: AgentEvent): void {
|
||||
switch (event.type) {
|
||||
case 'agent_message_delta': {
|
||||
const message = ensureAgentMessage(state, event.messageId)
|
||||
message.content += event.delta
|
||||
message.streaming = true
|
||||
state.status = 'streaming'
|
||||
return
|
||||
}
|
||||
case 'agent_tool_call': {
|
||||
const message = ensureAgentMessage(state, event.messageId)
|
||||
const existing = message.toolCalls.find((t) => t.id === event.toolCallId)
|
||||
const view: ToolCallView = {
|
||||
id: event.toolCallId,
|
||||
name: event.toolName,
|
||||
status: event.status,
|
||||
durationMs: event.durationMs,
|
||||
errorCode: event.errorCode
|
||||
}
|
||||
if (existing) Object.assign(existing, view)
|
||||
else message.toolCalls.push(view)
|
||||
if (event.status === 'error') state.status = 'error'
|
||||
return
|
||||
}
|
||||
case 'agent_message_done': {
|
||||
const message = ensureAgentMessage(state, event.messageId)
|
||||
message.streaming = false
|
||||
if (state.status !== 'error') state.status = 'idle'
|
||||
return
|
||||
}
|
||||
case 'draft_patch':
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
export function sessionReducer(
|
||||
state: SessionState,
|
||||
action: SessionAction
|
||||
): SessionState {
|
||||
return produce(state, (draft) => {
|
||||
if (action.type === 'user-send') {
|
||||
draft.messages.push({
|
||||
id: action.id,
|
||||
role: 'user',
|
||||
content: action.content,
|
||||
streaming: false,
|
||||
toolCalls: []
|
||||
})
|
||||
return
|
||||
}
|
||||
applyAgentEvent(draft, action.event)
|
||||
})
|
||||
}
|
||||
26
src/renderer/core/layout/agent/bindRoomToLayoutStore.test.ts
Normal file
26
src/renderer/core/layout/agent/bindRoomToLayoutStore.test.ts
Normal file
@@ -0,0 +1,26 @@
|
||||
import { describe, expect, it } from 'vitest'
|
||||
|
||||
import { AgentRoom } from '@/platform/agent/crdt/agentRoom'
|
||||
|
||||
import { layoutStore } from '@/renderer/core/layout/store/layoutStore'
|
||||
|
||||
import { bindRoomToLayoutStore } from './bindRoomToLayoutStore'
|
||||
|
||||
describe('bindRoomToLayoutStore', () => {
|
||||
it('propagates an agent room edit into the real layout store', () => {
|
||||
const room = new AgentRoom('wf-1')
|
||||
const storeNodes = layoutStore.getYDoc().getMap('nodes')
|
||||
const probeId = `agent-probe-${Math.random().toString(36).slice(2)}`
|
||||
const versionBefore = layoutStore.getVersion().value
|
||||
|
||||
const unbind = bindRoomToLayoutStore(room)
|
||||
room.nodes.set(probeId, { title: 'Agent Node' })
|
||||
|
||||
expect(storeNodes.get(probeId)).toEqual({ title: 'Agent Node' })
|
||||
expect(layoutStore.getVersion().value).toBeGreaterThan(versionBefore)
|
||||
|
||||
storeNodes.delete(probeId)
|
||||
unbind()
|
||||
room.destroy()
|
||||
})
|
||||
})
|
||||
17
src/renderer/core/layout/agent/bindRoomToLayoutStore.ts
Normal file
17
src/renderer/core/layout/agent/bindRoomToLayoutStore.ts
Normal file
@@ -0,0 +1,17 @@
|
||||
/**
|
||||
* Wires an agent room to the live layout store (prototype — ADR-0011).
|
||||
*
|
||||
* The renderer-layer entry point: it owns the dependency on the layout store
|
||||
* singleton (platform code may not import renderer) and delegates the actual
|
||||
* sync to the generic, layer-safe `bindRoomToDoc`. This is the "few lines"
|
||||
* the design meeting referenced — the store already exposes the Yjs peer
|
||||
* surface (`getYDoc`, marked "future feature") and the agent is that peer.
|
||||
*/
|
||||
import type { AgentRoom } from '@/platform/agent/crdt/agentRoom'
|
||||
import { bindRoomToDoc } from '@/platform/agent/crdt/roomDocBinding'
|
||||
|
||||
import { layoutStore } from '@/renderer/core/layout/store/layoutStore'
|
||||
|
||||
export function bindRoomToLayoutStore(room: AgentRoom): () => void {
|
||||
return bindRoomToDoc(room, layoutStore.getYDoc())
|
||||
}
|
||||
Reference in New Issue
Block a user