Use Match for streaming agent parser

This commit is contained in:
elpresidank 2026-06-04 06:03:14 -05:00
parent a0d98a573b
commit dfc79bb050
3 changed files with 103 additions and 19 deletions

View file

@ -514,6 +514,23 @@ Notes:
- `bun run --cwd ts/packages/cli test -- src/__tests__/library.test.ts`
- `cd ts && bun run check:tsgo`
### 2026-06-04: Streaming ReAct Parser Match Slice
- Status: migrated and package-verified.
- Completed:
- `ts/packages/flow/src/agent/react/parser.ts` now uses `effect/Match` for
ReAct state emission instead of a native `switch`.
- Marker states now use an inferred `MarkerState` type instead of per-entry
`as ReActState` assertions.
- Flush now processes an unterminated final buffer line through marker
detection, fixing `Final Answer:` chunks that arrive without a trailing
newline.
- Parser tests cover split markers, pre-marker thought fallback,
continuations, action input, and final-answer routing.
- Verification:
- `bun run --cwd ts/packages/flow test -- src/__tests__/agent-parser.test.ts`
- `cd ts && bun run check:tsgo`
### 2026-06-02: RAG And Agent Requestor Bridge Slice
- Status: migrated, root-verified, committed, and pushed.

View file

@ -0,0 +1,56 @@
import { describe, expect, it } from "vitest";
import { makeStreamingReActParser } from "../agent/react/parser.js";
describe("StreamingReActParser", () => {
it("routes split marker content through the Match-backed emitter", () => {
const thought: string[] = [];
const action: string[] = [];
const actionInput: string[] = [];
const finalAnswer: string[] = [];
const parser = makeStreamingReActParser(
(text) => thought.push(text),
(text) => action.push(text),
(text) => actionInput.push(text),
(text) => finalAnswer.push(text),
);
parser.feed("Tho");
expect(thought).toEqual([]);
parser.feed("ught: plan\nAction: Search\nAction Input: {\"query\":\"alpha\"}\n");
parser.feed("Final Answer: done");
parser.flush();
expect(thought).toEqual(["plan"]);
expect(action).toEqual(["Search"]);
expect(actionInput).toEqual(["{\"query\":\"alpha\"}"]);
expect(finalAnswer).toEqual(["done"]);
});
it("treats pre-marker content as thought and routes continuations", () => {
const events: Array<readonly [string, string]> = [];
const parser = makeStreamingReActParser(
(text) => events.push(["thought", text]),
(text) => events.push(["action", text]),
(text) => events.push(["action_input", text]),
(text) => events.push(["final_answer", text]),
);
parser.feed("opening thought\n");
parser.feed("continued thought\n");
parser.feed("Action: Lookup\n");
parser.feed("more lookup words\n");
parser.feed("Action Input: first\n");
parser.feed("second\n");
parser.flush();
expect(events).toEqual([
["thought", "opening thought"],
["thought", "continued thought"],
["action", "Lookup"],
["action", "more lookup words"],
["action_input", "first"],
["action_input", "second"],
]);
});
});

View file

@ -10,13 +10,19 @@
* Handles markers split across chunks by buffering lines.
*/
import { Match } from "effect";
import type { ReActState } from "./types.js";
const MARKERS = [
{ prefix: "Thought:", state: "thought" as ReActState },
{ prefix: "Action Input:", state: "action_input" as ReActState },
{ prefix: "Action:", state: "action" as ReActState },
{ prefix: "Final Answer:", state: "final_answer" as ReActState },
type MarkerState = Exclude<ReActState, "initial" | "complete">;
const MARKERS: ReadonlyArray<{
readonly prefix: string;
readonly state: MarkerState;
}> = [
{ prefix: "Thought:", state: "thought" },
{ prefix: "Action Input:", state: "action_input" },
{ prefix: "Action:", state: "action" },
{ prefix: "Final Answer:", state: "final_answer" },
];
// Longest marker prefix for partial-match detection
@ -39,27 +45,27 @@ export function makeStreamingReActParser(
const emitContent = (content: string): void => {
if (content.length === 0) return;
switch (state) {
case "thought":
Match.value(state).pipe(
Match.when("thought", () => {
onThought(content);
break;
case "action":
}),
Match.when("action", () => {
onAction(content);
break;
case "action_input":
}),
Match.when("action_input", () => {
onActionInput(content);
break;
case "final_answer":
}),
Match.when("final_answer", () => {
onFinalAnswer(content);
break;
case "initial":
}),
Match.when("initial", () => {
// Content before any marker -- treat as thought
state = "thought";
onThought(content);
break;
case "complete":
break;
}
}),
Match.when("complete", () => undefined),
Match.exhaustive,
);
};
const processLine = (line: string): void => {
@ -86,6 +92,11 @@ export function makeStreamingReActParser(
while (true) {
const newlineIdx = buffer.indexOf("\n");
if (newlineIdx === -1) {
if (isFinal && buffer.length > 0) {
const line = buffer;
buffer = "";
processLine(line);
}
// No complete line yet.
// If not final, check for partial marker match at the end and wait.
if (!isFinal) {