diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 067c2c2e..4800686b 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -420,6 +420,22 @@ Notes: - `bun run --cwd ts/packages/client test -- src/__tests__/rpc-timeout.test.ts` - `cd ts && bun run check:tsgo` +### 2026-06-04: Client Callback Match Slice + +- Status: migrated and package-verified. +- Completed: + - `ts/packages/client/src/socket/trustgraph-socket.ts` now maps RPC + connection status with `effect/Match` instead of a native `switch`. + - Flow agent streaming chunk callbacks now use `effect/Match` for + `thought`, `observation`, `answer`, `final-answer`, and `action`, while + preserving ignored behavior for unknown chunk types with `Match.orElse`. + - Client RPC tests now drive agent stream chunks through the fake RPC stream + to prove callback dispatch, ignored fallback behavior, completion signals, + and metadata forwarding. +- Verification: + - `bun run --cwd ts/packages/client test -- src/__tests__/rpc-timeout.test.ts` + - `cd ts && bun run check:tsgo` + ### 2026-06-04: Gateway Term Service HashSet Slice - Status: migrated and package-verified. diff --git a/ts/packages/client/src/__tests__/rpc-timeout.test.ts b/ts/packages/client/src/__tests__/rpc-timeout.test.ts index 045de42a..fcf160eb 100644 --- a/ts/packages/client/src/__tests__/rpc-timeout.test.ts +++ b/ts/packages/client/src/__tests__/rpc-timeout.test.ts @@ -173,4 +173,86 @@ describe("Effect RPC request policy", () => { }, ]); }); + + it("dispatches agent stream chunk types through the Match-backed callback mapper", async () => { + const dispatchStream = vi.fn((_input: DispatchInput, receiver: (chunk: DispatchStreamChunk) => boolean) => { + const ignoredComplete = receiver(DispatchStreamChunk.make({ + response: { chunk_type: "ignored", content: "skip" }, + complete: false, + })); + const thoughtComplete = receiver(DispatchStreamChunk.make({ + response: { chunk_type: "thought", content: "plan", end_of_message: true }, + complete: false, + })); + const observationComplete = receiver(DispatchStreamChunk.make({ + response: { chunk_type: "observation", content: "facts", end_of_message: true }, + complete: false, + })); + const actionComplete = receiver(DispatchStreamChunk.make({ + response: { chunk_type: "action", content: "lookup" }, + complete: false, + })); + const answerComplete = receiver(DispatchStreamChunk.make({ + response: { + chunk_type: "final-answer", + content: "done", + end_of_message: true, + end_of_dialog: true, + in_token: 3, + out_token: 5, + model: "agent-model", + }, + complete: true, + })); + + expect(ignoredComplete).toBe(false); + expect(thoughtComplete).toBe(false); + expect(observationComplete).toBe(false); + expect(actionComplete).toBe(false); + expect(answerComplete).toBe(true); + + return Promise.resolve( + DispatchStreamChunk.make({ + response: { response: "done" }, + complete: true, + }), + ); + }); + const api = makeBaseApiWithRpc("alice", undefined, "ws://example.test/rpc", { + dispatch: vi.fn(() => Promise.resolve({ ok: true })), + dispatchStream, + close: vi.fn(() => Promise.resolve()), + subscribe: vi.fn(() => () => {}), + }); + const think = vi.fn(); + const observe = vi.fn(); + const answer = vi.fn(); + const onError = vi.fn(); + + await api.flow("flow-a").agent("hello", think, observe, answer, onError); + + expect(dispatchStream).toHaveBeenCalledWith( + { + scope: "flow", + service: "agent", + flow: "flow-a", + request: { + question: "hello", + user: "alice", + collection: "default", + streaming: true, + }, + }, + expect.any(Function), + { timeoutMs: 120000, retries: 2 }, + ); + expect(think).toHaveBeenCalledWith("plan", true, undefined); + expect(observe).toHaveBeenCalledWith("facts", true, undefined); + expect(answer).toHaveBeenCalledWith( + "done", + true, + { in_token: 3, out_token: 5, model: "agent-model" }, + ); + expect(onError).not.toHaveBeenCalled(); + }); }); diff --git a/ts/packages/client/src/socket/trustgraph-socket.ts b/ts/packages/client/src/socket/trustgraph-socket.ts index 47318471..e8a8a5b4 100644 --- a/ts/packages/client/src/socket/trustgraph-socket.ts +++ b/ts/packages/client/src/socket/trustgraph-socket.ts @@ -8,7 +8,7 @@ import { makeEffectRpcClient, } from "./effect-rpc-client.js"; import { getDefaultSocketUrl, getRandomValues } from "./websocket-adapter.js"; -import { Clock, Effect, Fiber, Option, Result, Schema as S, Stream, SubscriptionRef } from "effect"; +import { Clock, Effect, Fiber, Match, Option, Result, Schema as S, Stream, SubscriptionRef } from "effect"; import * as Predicate from "effect/Predicate"; // Import all message types for different services @@ -688,18 +688,18 @@ export function makeBaseApi( Effect.runSync(SubscriptionRef.set(connectionStateRef, getConnectionState())); }; - const connectionStatusFromRpc = (hasApiKey: boolean): ConnectionState["status"] => { - switch (rpcState.status) { - case "connected": - return hasApiKey ? "authenticated" : "unauthenticated"; - case "failed": - return "failed"; - case "closed": - return "failed"; - case "connecting": - return lastError === undefined ? "connecting" : "reconnecting"; - } - }; + const connectionStatusFromRpc = (hasApiKey: boolean): ConnectionState["status"] => + Match.value(rpcState.status).pipe( + Match.when("connected", (): ConnectionState["status"] => + hasApiKey ? "authenticated" : "unauthenticated" + ), + Match.when("failed", (): ConnectionState["status"] => "failed"), + Match.when("closed", (): ConnectionState["status"] => "failed"), + Match.when("connecting", (): ConnectionState["status"] => + lastError === undefined ? "connecting" : "reconnecting" + ), + Match.exhaustive, + ); const dispatchInput = ( service: string, @@ -1571,22 +1571,14 @@ export function makeFlowApi(api: BaseApi, flowId: string) { // Extract metadata from final message const metadata = dialogComplete ? streamingMetadataFrom(resp) : undefined; - switch (chunkType) { - case "thought": - think(content, messageComplete, metadata); - break; - case "observation": - observe(content, messageComplete, metadata); - break; - case "answer": - case "final-answer": - answer(content, messageComplete, metadata); - break; - case "action": - // Actions are typically not streamed incrementally, just logged - logClientInfo(`Agent action: ${content}`); - break; - } + Match.value(chunkType).pipe( + Match.when("thought", () => think(content, messageComplete, metadata)), + Match.when("observation", () => observe(content, messageComplete, metadata)), + Match.when("answer", () => answer(content, messageComplete, metadata)), + Match.when("final-answer", () => answer(content, messageComplete, metadata)), + Match.when("action", () => logClientInfo(`Agent action: ${content}`)), + Match.orElse(() => undefined), + ); return dialogComplete; // End when backend signals complete };