Wrap client streaming callbacks in Effect

This commit is contained in:
elpresidank 2026-06-02 04:54:33 -05:00
parent d93b0adda8
commit 36f629b341
2 changed files with 96 additions and 37 deletions

View file

@ -12,8 +12,8 @@ Verified source roots:
- Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4`
- Installed Effect beta used by this workspace: `ts/node_modules/effect`
Current signal counts from `ts/packages` after the 2026-06-02 client socket
close Effect boundary slice:
Current signal counts from `ts/packages` after the 2026-06-02 client streaming
callback Effect boundary slice:
| Signal | Count |
| --- | ---: |
@ -21,8 +21,8 @@ close Effect boundary slice:
| `Effect.runPromiseWith` | 0 |
| `Effect.cached` | 0 |
| `Layer.succeed` | 18 |
| `Map<` | 82 |
| `WebSocket` | 62 |
| `Map<` | 88 |
| `WebSocket` | 74 |
| `new Map` | 60 |
| `toPromiseRequestor` | 0 |
| `makeAsyncProcessor` | 19 |
@ -31,7 +31,7 @@ close Effect boundary slice:
| `new Error` | 8 |
| `new Promise` | 10 |
| `JSON.parse` | 4 |
| `localStorage` | 8 |
| `localStorage` | 9 |
| `JSON.stringify` | 7 |
| `setTimeout` | 4 |
| `process.env` | 3 |
@ -146,6 +146,11 @@ Notes:
from `BaseApi.close()`. The void public facade now runs `rpc.close()` through
`Effect.tryPromise` and logs the tagged socket close error through
`Effect.catch`.
- The client streaming callback Effect boundary slice removed the remaining
production Promise `.catch` matches from `trustgraph-socket.ts` by
centralizing legacy callback request failures in `runLegacyStreamingRequest`.
The public callback facades still return/ignore Promises where required, but
failure mapping now uses `Effect.tryPromise` and `Effect.catch`.
- `Record<string, any>` and `throwLibrarianServiceError` are now clean in
`ts/packages`.
@ -1023,8 +1028,34 @@ Notes:
facade.
- Close failures are mapped to the existing tagged `TrustGraphSocketError`
shape and logged through `Effect.catch` instead of a Promise `.catch`.
- The remaining client socket Promise `.catch` matches are streaming callback
compatibility bridges that route failures to legacy `onError` callbacks.
- The remaining client socket Promise `.catch` matches were streaming
callback compatibility bridges and are now handled by the follow-up slice.
- Verification:
- `cd ts && bun run check:tsgo`
- `bun run --cwd ts/packages/client build`
- `bun run --cwd ts/packages/client test`
- `cd ts && bun run check`
- `cd ts && bun run build`
- `cd ts && bun run test`
- `git diff --check`
### 2026-06-02: Client Streaming Callback Effect Boundary Slice
- Status: migrated and root-verified.
- Completed:
- `ts/packages/client/src/socket/trustgraph-socket.ts` now routes the
legacy `agent`, `graphRagStreaming`, and `documentRagStreaming` callback
request failures through `runLegacyStreamingRequest`.
- `runLegacyStreamingRequest` uses `Effect.tryPromise` to map failures into
tagged `TrustGraphSocketError` values, then uses `Effect.catch` to invoke
the public legacy error callback.
- Production `trustgraph-socket.ts` no longer has Promise `.catch` matches;
remaining matches in that file are `Effect.catch` only.
- Rechecked the PubSub replacement question against Effect v4 source:
Effect's native `PubSub` is an in-process async hub over Effect queues.
TrustGraph's `PubSubBackend` remains the broker adapter boundary for
NATS/Pulsar-style topics, subscriptions, acknowledgement, schema codecs,
and backend lifecycle.
- Verification:
- `cd ts && bun run check:tsgo`
- `bun run --cwd ts/packages/client build`
@ -1191,7 +1222,10 @@ Do not flag these as rewrite blockers without additional proof:
- TrustGraph `PubSubBackend` / backend `PubSub` service is a broker adapter
boundary for NATS/Pulsar-style topics, acknowledgement, schema codecs, and
backend lifecycle. Effect's native `PubSub` can replace in-process fanout
helpers, but not the distributed broker abstraction by itself.
helpers, but not the distributed broker abstraction by itself. This was
rechecked against
`/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4/packages/effect/src/PubSub.ts`,
whose exported API is local publish/subscribe over Effect queues.
- `ts/packages/flow/src/gateway/rpc-protocol.ts` is a Fastify socket
compatibility bridge. Do not flag its internal connection maps/sets as a
standalone replacement target until the gateway is ready to move onto Effect

View file

@ -233,6 +233,25 @@ const logClientError = (message: string, error: unknown): void => {
Effect.runFork(Effect.logError(message, { error: toErrorMessage(error, message) }));
};
const runLegacyStreamingRequest = (
operation: string,
label: string,
request: () => Promise<unknown>,
onError: (message: string) => void,
): Promise<unknown | void> =>
Effect.runPromise(
Effect.tryPromise({
try: request,
catch: (error) => socketError(operation, toErrorMessage(error, "Unknown error")),
}).pipe(
Effect.catch((error) =>
Effect.sync(() => {
onError(`${label} request failed: ${error.message}`);
})
),
),
);
const StreamingEnvelopeSchema = S.Struct({
response: S.optionalKey(S.Unknown),
complete: S.optionalKey(S.Boolean),
@ -1555,8 +1574,10 @@ export function makeFlowApi(api: BaseApi, flowId: string) {
return dialogComplete; // End when backend signals complete
};
return this.api
.makeRequestMulti<AgentRequest, AgentResponse>(
return runLegacyStreamingRequest(
"agent-stream",
"Agent",
() => this.api.makeRequestMulti<AgentRequest, AgentResponse>(
"agent",
{
question: question,
@ -1568,11 +1589,9 @@ export function makeFlowApi(api: BaseApi, flowId: string) {
120000,
2,
this.flowId,
)
.catch((err) => {
const errorMessage = toErrorMessage(err, "Unknown error");
error(`Agent request failed: ${errorMessage}`);
});
),
error,
);
},
@ -1670,17 +1689,20 @@ export function makeFlowApi(api: BaseApi, flowId: string) {
request["max-path-length"] = options.pathLength;
}
this.api.makeRequestMulti<GraphRagRequest, GraphRagResponse>(
"graph-rag",
request,
recv,
60000,
undefined,
this.flowId,
).catch((err) => {
const errorMessage = toErrorMessage(err, "Unknown error");
onError(`Graph RAG request failed: ${errorMessage}`);
});
void runLegacyStreamingRequest(
"graph-rag-stream",
"Graph RAG",
() =>
this.api.makeRequestMulti<GraphRagRequest, GraphRagResponse>(
"graph-rag",
request,
recv,
60000,
undefined,
this.flowId,
),
onError,
);
},
@ -1755,17 +1777,20 @@ export function makeFlowApi(api: BaseApi, flowId: string) {
request["doc-limit"] = docLimit;
}
this.api.makeRequestMulti<DocumentRagRequest, DocumentRagResponse>(
"document-rag",
request,
recv,
60000,
undefined,
this.flowId,
).catch((err) => {
const errorMessage = toErrorMessage(err, "Unknown error");
onError(`Document RAG request failed: ${errorMessage}`);
});
void runLegacyStreamingRequest(
"document-rag-stream",
"Document RAG",
() =>
this.api.makeRequestMulti<DocumentRagRequest, DocumentRagResponse>(
"document-rag",
request,
recv,
60000,
undefined,
this.flowId,
),
onError,
);
},