diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 63ee2dc8..04898aed 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -514,6 +514,23 @@ Notes: - `bun run --cwd ts/packages/cli test -- src/__tests__/library.test.ts` - `cd ts && bun run check:tsgo` +### 2026-06-04: Streaming ReAct Parser Match Slice + +- Status: migrated and package-verified. +- Completed: + - `ts/packages/flow/src/agent/react/parser.ts` now uses `effect/Match` for + ReAct state emission instead of a native `switch`. + - Marker states now use an inferred `MarkerState` type instead of per-entry + `as ReActState` assertions. + - Flush now processes an unterminated final buffer line through marker + detection, fixing `Final Answer:` chunks that arrive without a trailing + newline. + - Parser tests cover split markers, pre-marker thought fallback, + continuations, action input, and final-answer routing. +- Verification: + - `bun run --cwd ts/packages/flow test -- src/__tests__/agent-parser.test.ts` + - `cd ts && bun run check:tsgo` + ### 2026-06-02: RAG And Agent Requestor Bridge Slice - Status: migrated, root-verified, committed, and pushed. diff --git a/ts/packages/flow/src/__tests__/agent-parser.test.ts b/ts/packages/flow/src/__tests__/agent-parser.test.ts new file mode 100644 index 00000000..801a15d9 --- /dev/null +++ b/ts/packages/flow/src/__tests__/agent-parser.test.ts @@ -0,0 +1,56 @@ +import { describe, expect, it } from "vitest"; +import { makeStreamingReActParser } from "../agent/react/parser.js"; + +describe("StreamingReActParser", () => { + it("routes split marker content through the Match-backed emitter", () => { + const thought: string[] = []; + const action: string[] = []; + const actionInput: string[] = []; + const finalAnswer: string[] = []; + const parser = makeStreamingReActParser( + (text) => thought.push(text), + (text) => action.push(text), + (text) => actionInput.push(text), + (text) => finalAnswer.push(text), + ); + + parser.feed("Tho"); + expect(thought).toEqual([]); + + parser.feed("ught: plan\nAction: Search\nAction Input: {\"query\":\"alpha\"}\n"); + parser.feed("Final Answer: done"); + parser.flush(); + + expect(thought).toEqual(["plan"]); + expect(action).toEqual(["Search"]); + expect(actionInput).toEqual(["{\"query\":\"alpha\"}"]); + expect(finalAnswer).toEqual(["done"]); + }); + + it("treats pre-marker content as thought and routes continuations", () => { + const events: Array = []; + const parser = makeStreamingReActParser( + (text) => events.push(["thought", text]), + (text) => events.push(["action", text]), + (text) => events.push(["action_input", text]), + (text) => events.push(["final_answer", text]), + ); + + parser.feed("opening thought\n"); + parser.feed("continued thought\n"); + parser.feed("Action: Lookup\n"); + parser.feed("more lookup words\n"); + parser.feed("Action Input: first\n"); + parser.feed("second\n"); + parser.flush(); + + expect(events).toEqual([ + ["thought", "opening thought"], + ["thought", "continued thought"], + ["action", "Lookup"], + ["action", "more lookup words"], + ["action_input", "first"], + ["action_input", "second"], + ]); + }); +}); diff --git a/ts/packages/flow/src/agent/react/parser.ts b/ts/packages/flow/src/agent/react/parser.ts index 01f4b9ab..b1879d2b 100644 --- a/ts/packages/flow/src/agent/react/parser.ts +++ b/ts/packages/flow/src/agent/react/parser.ts @@ -10,13 +10,19 @@ * Handles markers split across chunks by buffering lines. */ +import { Match } from "effect"; import type { ReActState } from "./types.js"; -const MARKERS = [ - { prefix: "Thought:", state: "thought" as ReActState }, - { prefix: "Action Input:", state: "action_input" as ReActState }, - { prefix: "Action:", state: "action" as ReActState }, - { prefix: "Final Answer:", state: "final_answer" as ReActState }, +type MarkerState = Exclude; + +const MARKERS: ReadonlyArray<{ + readonly prefix: string; + readonly state: MarkerState; +}> = [ + { prefix: "Thought:", state: "thought" }, + { prefix: "Action Input:", state: "action_input" }, + { prefix: "Action:", state: "action" }, + { prefix: "Final Answer:", state: "final_answer" }, ]; // Longest marker prefix for partial-match detection @@ -39,27 +45,27 @@ export function makeStreamingReActParser( const emitContent = (content: string): void => { if (content.length === 0) return; - switch (state) { - case "thought": + Match.value(state).pipe( + Match.when("thought", () => { onThought(content); - break; - case "action": + }), + Match.when("action", () => { onAction(content); - break; - case "action_input": + }), + Match.when("action_input", () => { onActionInput(content); - break; - case "final_answer": + }), + Match.when("final_answer", () => { onFinalAnswer(content); - break; - case "initial": + }), + Match.when("initial", () => { // Content before any marker -- treat as thought state = "thought"; onThought(content); - break; - case "complete": - break; - } + }), + Match.when("complete", () => undefined), + Match.exhaustive, + ); }; const processLine = (line: string): void => { @@ -86,6 +92,11 @@ export function makeStreamingReActParser( while (true) { const newlineIdx = buffer.indexOf("\n"); if (newlineIdx === -1) { + if (isFinal && buffer.length > 0) { + const line = buffer; + buffer = ""; + processLine(line); + } // No complete line yet. // If not final, check for partial marker match at the end and wait. if (!isFinal) {