mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-07-01 09:29:38 +02:00
Use Match for client callback dispatch
This commit is contained in:
parent
664aef44a7
commit
3378b79fbc
3 changed files with 119 additions and 29 deletions
|
|
@ -420,6 +420,22 @@ Notes:
|
||||||
- `bun run --cwd ts/packages/client test -- src/__tests__/rpc-timeout.test.ts`
|
- `bun run --cwd ts/packages/client test -- src/__tests__/rpc-timeout.test.ts`
|
||||||
- `cd ts && bun run check:tsgo`
|
- `cd ts && bun run check:tsgo`
|
||||||
|
|
||||||
|
### 2026-06-04: Client Callback Match Slice
|
||||||
|
|
||||||
|
- Status: migrated and package-verified.
|
||||||
|
- Completed:
|
||||||
|
- `ts/packages/client/src/socket/trustgraph-socket.ts` now maps RPC
|
||||||
|
connection status with `effect/Match` instead of a native `switch`.
|
||||||
|
- Flow agent streaming chunk callbacks now use `effect/Match` for
|
||||||
|
`thought`, `observation`, `answer`, `final-answer`, and `action`, while
|
||||||
|
preserving ignored behavior for unknown chunk types with `Match.orElse`.
|
||||||
|
- Client RPC tests now drive agent stream chunks through the fake RPC stream
|
||||||
|
to prove callback dispatch, ignored fallback behavior, completion signals,
|
||||||
|
and metadata forwarding.
|
||||||
|
- Verification:
|
||||||
|
- `bun run --cwd ts/packages/client test -- src/__tests__/rpc-timeout.test.ts`
|
||||||
|
- `cd ts && bun run check:tsgo`
|
||||||
|
|
||||||
### 2026-06-04: Gateway Term Service HashSet Slice
|
### 2026-06-04: Gateway Term Service HashSet Slice
|
||||||
|
|
||||||
- Status: migrated and package-verified.
|
- Status: migrated and package-verified.
|
||||||
|
|
|
||||||
|
|
@ -173,4 +173,86 @@ describe("Effect RPC request policy", () => {
|
||||||
},
|
},
|
||||||
]);
|
]);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("dispatches agent stream chunk types through the Match-backed callback mapper", async () => {
|
||||||
|
const dispatchStream = vi.fn((_input: DispatchInput, receiver: (chunk: DispatchStreamChunk) => boolean) => {
|
||||||
|
const ignoredComplete = receiver(DispatchStreamChunk.make({
|
||||||
|
response: { chunk_type: "ignored", content: "skip" },
|
||||||
|
complete: false,
|
||||||
|
}));
|
||||||
|
const thoughtComplete = receiver(DispatchStreamChunk.make({
|
||||||
|
response: { chunk_type: "thought", content: "plan", end_of_message: true },
|
||||||
|
complete: false,
|
||||||
|
}));
|
||||||
|
const observationComplete = receiver(DispatchStreamChunk.make({
|
||||||
|
response: { chunk_type: "observation", content: "facts", end_of_message: true },
|
||||||
|
complete: false,
|
||||||
|
}));
|
||||||
|
const actionComplete = receiver(DispatchStreamChunk.make({
|
||||||
|
response: { chunk_type: "action", content: "lookup" },
|
||||||
|
complete: false,
|
||||||
|
}));
|
||||||
|
const answerComplete = receiver(DispatchStreamChunk.make({
|
||||||
|
response: {
|
||||||
|
chunk_type: "final-answer",
|
||||||
|
content: "done",
|
||||||
|
end_of_message: true,
|
||||||
|
end_of_dialog: true,
|
||||||
|
in_token: 3,
|
||||||
|
out_token: 5,
|
||||||
|
model: "agent-model",
|
||||||
|
},
|
||||||
|
complete: true,
|
||||||
|
}));
|
||||||
|
|
||||||
|
expect(ignoredComplete).toBe(false);
|
||||||
|
expect(thoughtComplete).toBe(false);
|
||||||
|
expect(observationComplete).toBe(false);
|
||||||
|
expect(actionComplete).toBe(false);
|
||||||
|
expect(answerComplete).toBe(true);
|
||||||
|
|
||||||
|
return Promise.resolve(
|
||||||
|
DispatchStreamChunk.make({
|
||||||
|
response: { response: "done" },
|
||||||
|
complete: true,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
const api = makeBaseApiWithRpc("alice", undefined, "ws://example.test/rpc", {
|
||||||
|
dispatch: vi.fn(() => Promise.resolve({ ok: true })),
|
||||||
|
dispatchStream,
|
||||||
|
close: vi.fn(() => Promise.resolve()),
|
||||||
|
subscribe: vi.fn(() => () => {}),
|
||||||
|
});
|
||||||
|
const think = vi.fn();
|
||||||
|
const observe = vi.fn();
|
||||||
|
const answer = vi.fn();
|
||||||
|
const onError = vi.fn();
|
||||||
|
|
||||||
|
await api.flow("flow-a").agent("hello", think, observe, answer, onError);
|
||||||
|
|
||||||
|
expect(dispatchStream).toHaveBeenCalledWith(
|
||||||
|
{
|
||||||
|
scope: "flow",
|
||||||
|
service: "agent",
|
||||||
|
flow: "flow-a",
|
||||||
|
request: {
|
||||||
|
question: "hello",
|
||||||
|
user: "alice",
|
||||||
|
collection: "default",
|
||||||
|
streaming: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expect.any(Function),
|
||||||
|
{ timeoutMs: 120000, retries: 2 },
|
||||||
|
);
|
||||||
|
expect(think).toHaveBeenCalledWith("plan", true, undefined);
|
||||||
|
expect(observe).toHaveBeenCalledWith("facts", true, undefined);
|
||||||
|
expect(answer).toHaveBeenCalledWith(
|
||||||
|
"done",
|
||||||
|
true,
|
||||||
|
{ in_token: 3, out_token: 5, model: "agent-model" },
|
||||||
|
);
|
||||||
|
expect(onError).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,7 @@ import {
|
||||||
makeEffectRpcClient,
|
makeEffectRpcClient,
|
||||||
} from "./effect-rpc-client.js";
|
} from "./effect-rpc-client.js";
|
||||||
import { getDefaultSocketUrl, getRandomValues } from "./websocket-adapter.js";
|
import { getDefaultSocketUrl, getRandomValues } from "./websocket-adapter.js";
|
||||||
import { Clock, Effect, Fiber, Option, Result, Schema as S, Stream, SubscriptionRef } from "effect";
|
import { Clock, Effect, Fiber, Match, Option, Result, Schema as S, Stream, SubscriptionRef } from "effect";
|
||||||
import * as Predicate from "effect/Predicate";
|
import * as Predicate from "effect/Predicate";
|
||||||
|
|
||||||
// Import all message types for different services
|
// Import all message types for different services
|
||||||
|
|
@ -688,18 +688,18 @@ export function makeBaseApi(
|
||||||
Effect.runSync(SubscriptionRef.set(connectionStateRef, getConnectionState()));
|
Effect.runSync(SubscriptionRef.set(connectionStateRef, getConnectionState()));
|
||||||
};
|
};
|
||||||
|
|
||||||
const connectionStatusFromRpc = (hasApiKey: boolean): ConnectionState["status"] => {
|
const connectionStatusFromRpc = (hasApiKey: boolean): ConnectionState["status"] =>
|
||||||
switch (rpcState.status) {
|
Match.value(rpcState.status).pipe(
|
||||||
case "connected":
|
Match.when("connected", (): ConnectionState["status"] =>
|
||||||
return hasApiKey ? "authenticated" : "unauthenticated";
|
hasApiKey ? "authenticated" : "unauthenticated"
|
||||||
case "failed":
|
),
|
||||||
return "failed";
|
Match.when("failed", (): ConnectionState["status"] => "failed"),
|
||||||
case "closed":
|
Match.when("closed", (): ConnectionState["status"] => "failed"),
|
||||||
return "failed";
|
Match.when("connecting", (): ConnectionState["status"] =>
|
||||||
case "connecting":
|
lastError === undefined ? "connecting" : "reconnecting"
|
||||||
return lastError === undefined ? "connecting" : "reconnecting";
|
),
|
||||||
}
|
Match.exhaustive,
|
||||||
};
|
);
|
||||||
|
|
||||||
const dispatchInput = <RequestType extends object>(
|
const dispatchInput = <RequestType extends object>(
|
||||||
service: string,
|
service: string,
|
||||||
|
|
@ -1571,22 +1571,14 @@ export function makeFlowApi(api: BaseApi, flowId: string) {
|
||||||
// Extract metadata from final message
|
// Extract metadata from final message
|
||||||
const metadata = dialogComplete ? streamingMetadataFrom(resp) : undefined;
|
const metadata = dialogComplete ? streamingMetadataFrom(resp) : undefined;
|
||||||
|
|
||||||
switch (chunkType) {
|
Match.value(chunkType).pipe(
|
||||||
case "thought":
|
Match.when("thought", () => think(content, messageComplete, metadata)),
|
||||||
think(content, messageComplete, metadata);
|
Match.when("observation", () => observe(content, messageComplete, metadata)),
|
||||||
break;
|
Match.when("answer", () => answer(content, messageComplete, metadata)),
|
||||||
case "observation":
|
Match.when("final-answer", () => answer(content, messageComplete, metadata)),
|
||||||
observe(content, messageComplete, metadata);
|
Match.when("action", () => logClientInfo(`Agent action: ${content}`)),
|
||||||
break;
|
Match.orElse(() => undefined),
|
||||||
case "answer":
|
);
|
||||||
case "final-answer":
|
|
||||||
answer(content, messageComplete, metadata);
|
|
||||||
break;
|
|
||||||
case "action":
|
|
||||||
// Actions are typically not streamed incrementally, just logged
|
|
||||||
logClientInfo(`Agent action: ${content}`);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
return dialogComplete; // End when backend signals complete
|
return dialogComplete; // End when backend signals complete
|
||||||
};
|
};
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue