From 36f629b3411e5c9bdf58677ab4d8d0a8d3239359 Mon Sep 17 00:00:00 2001 From: elpresidank Date: Tue, 2 Jun 2026 04:54:33 -0500 Subject: [PATCH] Wrap client streaming callbacks in Effect --- ts/EFFECT_NATIVE_REWRITE_AUDIT.md | 50 +++++++++-- .../client/src/socket/trustgraph-socket.ts | 83 ++++++++++++------- 2 files changed, 96 insertions(+), 37 deletions(-) diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index ccc4ced6..28371edf 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -12,8 +12,8 @@ Verified source roots: - Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4` - Installed Effect beta used by this workspace: `ts/node_modules/effect` -Current signal counts from `ts/packages` after the 2026-06-02 client socket -close Effect boundary slice: +Current signal counts from `ts/packages` after the 2026-06-02 client streaming +callback Effect boundary slice: | Signal | Count | | --- | ---: | @@ -21,8 +21,8 @@ close Effect boundary slice: | `Effect.runPromiseWith` | 0 | | `Effect.cached` | 0 | | `Layer.succeed` | 18 | -| `Map<` | 82 | -| `WebSocket` | 62 | +| `Map<` | 88 | +| `WebSocket` | 74 | | `new Map` | 60 | | `toPromiseRequestor` | 0 | | `makeAsyncProcessor` | 19 | @@ -31,7 +31,7 @@ close Effect boundary slice: | `new Error` | 8 | | `new Promise` | 10 | | `JSON.parse` | 4 | -| `localStorage` | 8 | +| `localStorage` | 9 | | `JSON.stringify` | 7 | | `setTimeout` | 4 | | `process.env` | 3 | @@ -146,6 +146,11 @@ Notes: from `BaseApi.close()`. The void public facade now runs `rpc.close()` through `Effect.tryPromise` and logs the tagged socket close error through `Effect.catch`. +- The client streaming callback Effect boundary slice removed the remaining + production Promise `.catch` matches from `trustgraph-socket.ts` by + centralizing legacy callback request failures in `runLegacyStreamingRequest`. + The public callback facades still return/ignore Promises where required, but + failure mapping now uses `Effect.tryPromise` and `Effect.catch`. - `Record` and `throwLibrarianServiceError` are now clean in `ts/packages`. @@ -1023,8 +1028,34 @@ Notes: facade. - Close failures are mapped to the existing tagged `TrustGraphSocketError` shape and logged through `Effect.catch` instead of a Promise `.catch`. - - The remaining client socket Promise `.catch` matches are streaming callback - compatibility bridges that route failures to legacy `onError` callbacks. + - The remaining client socket Promise `.catch` matches were streaming + callback compatibility bridges and are now handled by the follow-up slice. +- Verification: + - `cd ts && bun run check:tsgo` + - `bun run --cwd ts/packages/client build` + - `bun run --cwd ts/packages/client test` + - `cd ts && bun run check` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `git diff --check` + +### 2026-06-02: Client Streaming Callback Effect Boundary Slice + +- Status: migrated and root-verified. +- Completed: + - `ts/packages/client/src/socket/trustgraph-socket.ts` now routes the + legacy `agent`, `graphRagStreaming`, and `documentRagStreaming` callback + request failures through `runLegacyStreamingRequest`. + - `runLegacyStreamingRequest` uses `Effect.tryPromise` to map failures into + tagged `TrustGraphSocketError` values, then uses `Effect.catch` to invoke + the public legacy error callback. + - Production `trustgraph-socket.ts` no longer has Promise `.catch` matches; + remaining matches in that file are `Effect.catch` only. + - Rechecked the PubSub replacement question against Effect v4 source: + Effect's native `PubSub` is an in-process async hub over Effect queues. + TrustGraph's `PubSubBackend` remains the broker adapter boundary for + NATS/Pulsar-style topics, subscriptions, acknowledgement, schema codecs, + and backend lifecycle. - Verification: - `cd ts && bun run check:tsgo` - `bun run --cwd ts/packages/client build` @@ -1191,7 +1222,10 @@ Do not flag these as rewrite blockers without additional proof: - TrustGraph `PubSubBackend` / backend `PubSub` service is a broker adapter boundary for NATS/Pulsar-style topics, acknowledgement, schema codecs, and backend lifecycle. Effect's native `PubSub` can replace in-process fanout - helpers, but not the distributed broker abstraction by itself. + helpers, but not the distributed broker abstraction by itself. This was + rechecked against + `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4/packages/effect/src/PubSub.ts`, + whose exported API is local publish/subscribe over Effect queues. - `ts/packages/flow/src/gateway/rpc-protocol.ts` is a Fastify socket compatibility bridge. Do not flag its internal connection maps/sets as a standalone replacement target until the gateway is ready to move onto Effect diff --git a/ts/packages/client/src/socket/trustgraph-socket.ts b/ts/packages/client/src/socket/trustgraph-socket.ts index f046f0a4..d3ed72d6 100644 --- a/ts/packages/client/src/socket/trustgraph-socket.ts +++ b/ts/packages/client/src/socket/trustgraph-socket.ts @@ -233,6 +233,25 @@ const logClientError = (message: string, error: unknown): void => { Effect.runFork(Effect.logError(message, { error: toErrorMessage(error, message) })); }; +const runLegacyStreamingRequest = ( + operation: string, + label: string, + request: () => Promise, + onError: (message: string) => void, +): Promise => + Effect.runPromise( + Effect.tryPromise({ + try: request, + catch: (error) => socketError(operation, toErrorMessage(error, "Unknown error")), + }).pipe( + Effect.catch((error) => + Effect.sync(() => { + onError(`${label} request failed: ${error.message}`); + }) + ), + ), + ); + const StreamingEnvelopeSchema = S.Struct({ response: S.optionalKey(S.Unknown), complete: S.optionalKey(S.Boolean), @@ -1555,8 +1574,10 @@ export function makeFlowApi(api: BaseApi, flowId: string) { return dialogComplete; // End when backend signals complete }; - return this.api - .makeRequestMulti( + return runLegacyStreamingRequest( + "agent-stream", + "Agent", + () => this.api.makeRequestMulti( "agent", { question: question, @@ -1568,11 +1589,9 @@ export function makeFlowApi(api: BaseApi, flowId: string) { 120000, 2, this.flowId, - ) - .catch((err) => { - const errorMessage = toErrorMessage(err, "Unknown error"); - error(`Agent request failed: ${errorMessage}`); - }); + ), + error, + ); }, @@ -1670,17 +1689,20 @@ export function makeFlowApi(api: BaseApi, flowId: string) { request["max-path-length"] = options.pathLength; } - this.api.makeRequestMulti( - "graph-rag", - request, - recv, - 60000, - undefined, - this.flowId, - ).catch((err) => { - const errorMessage = toErrorMessage(err, "Unknown error"); - onError(`Graph RAG request failed: ${errorMessage}`); - }); + void runLegacyStreamingRequest( + "graph-rag-stream", + "Graph RAG", + () => + this.api.makeRequestMulti( + "graph-rag", + request, + recv, + 60000, + undefined, + this.flowId, + ), + onError, + ); }, @@ -1755,17 +1777,20 @@ export function makeFlowApi(api: BaseApi, flowId: string) { request["doc-limit"] = docLimit; } - this.api.makeRequestMulti( - "document-rag", - request, - recv, - 60000, - undefined, - this.flowId, - ).catch((err) => { - const errorMessage = toErrorMessage(err, "Unknown error"); - onError(`Document RAG request failed: ${errorMessage}`); - }); + void runLegacyStreamingRequest( + "document-rag-stream", + "Document RAG", + () => + this.api.makeRequestMulti( + "document-rag", + request, + recv, + 60000, + undefined, + this.flowId, + ), + onError, + ); },