diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 44057f30..6021f8ab 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -12,14 +12,14 @@ Verified source roots: - Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4` - Installed Effect beta used by this workspace: `ts/node_modules/effect` -Current signal counts from `ts/packages` after the 2026-06-02 Librarian -ref-backed state slice: +Current signal counts from `ts/packages` after the 2026-06-02 Client RPC +managed runtime slice: | Signal | Count | | --- | ---: | -| `Effect.runPromise` | 208 | +| `Effect.runPromise` | 203 | | `Map<` | 88 | -| `WebSocket` | 49 | +| `WebSocket` | 43 | | `new Map` | 62 | | `toPromiseRequestor` | 0 | | `makeAsyncProcessor` | 19 | @@ -45,6 +45,9 @@ Notes: - The `Map<` and `new Map` counts increased in this snapshot because the Librarian slice introduced explicit ref-backed state types and clone helpers while removing the service object's direct mutable maps/handles. +- The `Effect.runPromise` and `WebSocket` counts dropped in this snapshot + because `EffectRpcClient` now owns its RPC/socket layer with + `ManagedRuntime` and uses Effect's WebSocket constructor layer. - `Record` and `throwLibrarianServiceError` are now clean in `ts/packages`. @@ -348,6 +351,32 @@ Notes: - `cd ts && bun run test` - `git diff --check` +### 2026-06-02: Client RPC Managed Runtime Slice + +- Status: migrated and root-verified. +- Completed: + - `ts/packages/client/src/socket/effect-rpc-client.ts` now builds one + `ManagedRuntime` from the RPC client layer instead of manually creating a + `Scope`, building the layer, and calling `Effect.runPromise` for every + operation. + - RPC dispatch and stream dispatch continue to expose the existing + Promise-returning `EffectRpcClient` facade, but they run through the managed + runtime and close with `runtime.dispose()`. + - The Effect RPC socket path now consumes `Socket.layerWebSocketConstructorGlobal` + instead of a duplicate local WebSocket constructor layer. + - Dispatch payload construction now uses `DispatchPayload.make(...)` so + schema classes are not instantiated with `new`. + - Client socket logging and timestamp creation now use Effect `Logger` and + `Clock` instead of direct console and `Date.now()` calls in the touched + surface. +- Verification: + - `bun run --cwd ts/packages/client build` + - `cd ts && bun run check` + - `bun run --cwd ts/packages/client test` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: @@ -371,11 +400,14 @@ Notes: use type assertions; they need a typed factory/registry redesign rather than more assertions. - Gateway/client: + - `EffectRpcClient` now owns its socket/RPC layer with `ManagedRuntime`. + Remaining client cleanup should focus on `trustgraph-socket.ts` + higher-level normal `Error` throws/JSON parsing and the public synchronous + `websocket-adapter.ts` compatibility helpers. - Knowledge streams still duplicate legacy end-of-stream handling. - - Effect RPC client remains Promise-first internally in places and should be - turned into a managed runtime or scoped layer. - WebSocket adapter shims still contain host-boundary `try`/`catch` and - normal `Error` construction. + normal `Error` construction, but their sync exports are public API and + should be migrated in a separate compatibility-preserving slice. - RAG/providers/storage: - RAG and agent requestor bridges are complete: `toPromiseRequestor` has no remaining `ts/packages` matches. @@ -399,10 +431,13 @@ Notes: - `effect/unstable/rpc/RpcSerialization.layerNdjson` or `layerNdJsonRpc`. - `ManagedRuntime` for compatibility facades when a Promise API must remain. - Rewrite shape: - - Treat `EffectRpcClient` as an internal managed runtime or scoped layer. + - `EffectRpcClient` is now an internal managed runtime with Promise + compatibility facades. - Expose Promise-returning methods through a thin adapter. - - Replace normal client `Error` constructors with tagged errors before they - cross into shared Effect code. + - Finish replacing remaining normal client `Error` constructors with tagged + errors before they cross into shared Effect code. + - Preserve public sync exports in `websocket-adapter.ts` while moving host + failure capture toward typed Effect helpers. - Tests: - `cd ts && bun run --cwd packages/client test` diff --git a/ts/packages/client/src/socket/effect-rpc-client.ts b/ts/packages/client/src/socket/effect-rpc-client.ts index 4a3ba88d..70e724af 100644 --- a/ts/packages/client/src/socket/effect-rpc-client.ts +++ b/ts/packages/client/src/socket/effect-rpc-client.ts @@ -1,4 +1,4 @@ -import { Context, Data, Effect, Exit, Layer, Scope, Stream } from "effect"; +import { Context, Data, Effect, Layer, ManagedRuntime, Stream } from "effect"; import type * as RpcGroup from "effect/unstable/rpc/RpcGroup"; import * as RpcClient from "effect/unstable/rpc/RpcClient"; import type { RpcClientError } from "effect/unstable/rpc/RpcClientError"; @@ -83,14 +83,14 @@ export function makeEffectRpcClient( } }; - const makeClient = (): Effect.Effect => { + const makeClientLayer = (): Layer.Layer => { const socketLayer = Layer.effect( Socket.Socket, Socket.makeWebSocket(url, { closeCodeIsError: (code) => code !== 1000, openTimeout: "10 seconds", }), - ).pipe(Layer.provide(webSocketConstructorLayer)); + ).pipe(Layer.provide(Socket.layerWebSocketConstructorGlobal)); const hooksLayer = Layer.succeed( RpcClient.ConnectionHooks, @@ -124,16 +124,11 @@ export function makeEffectRpcClient( RpcClient.make(TrustGraphRpcs), ).pipe(Layer.provide(protocolLayer)); - return Effect.map( - Layer.build(clientLayer), - (context) => Context.get(context, TrustGraphRpcClientService), - ); + return clientLayer; }; - const scopePromise = Effect.runPromise(Scope.make()); - const clientPromise = scopePromise.then((scope) => - Effect.runPromise(makeClient().pipe(Scope.provide(scope))), - ); + const runtime = ManagedRuntime.make(makeClientLayer()); + const clientPromise = runtime.runPromise(TrustGraphRpcClientService); clientPromise.catch((cause) => { setState({ status: "failed", @@ -149,41 +144,40 @@ export function makeEffectRpcClient( listeners.delete(listener); }; }, - dispatch: async (input, options = {}) => { - const client = await clientPromise; - return await Effect.runPromise( - withDispatchRequestPolicy(client.Dispatch(new DispatchPayload(input)), options), - ); - }, - dispatchStream: async (input, receiver, options = {}) => { - const client = await clientPromise; + dispatch: (input, options = {}) => + clientPromise.then((client) => + runtime.runPromise( + withDispatchRequestPolicy(client.Dispatch(DispatchPayload.make(input)), options), + ) + ), + dispatchStream: (input, receiver, options = {}) => { let last: DispatchStreamChunk | undefined; - await Effect.runPromise( - withDispatchRequestPolicy( - client.DispatchStream(new DispatchPayload(input)).pipe( - Stream.runForEach((chunk) => - Effect.suspend(() => { - last = chunk; - if (receiver(chunk)) return Effect.fail(new StopStreaming()); - return Effect.void; - }), - ), - Effect.catchIf( - (cause): cause is StopStreaming => cause instanceof StopStreaming, - () => Effect.void, + return clientPromise.then((client) => + runtime.runPromise( + withDispatchRequestPolicy( + client.DispatchStream(DispatchPayload.make(input)).pipe( + Stream.runForEach((chunk) => + Effect.suspend(() => { + last = chunk; + if (receiver(chunk)) return Effect.fail(new StopStreaming()); + return Effect.void; + }), + ), + Effect.catchIf( + (cause): cause is StopStreaming => cause instanceof StopStreaming, + () => Effect.void, + ), ), + options, ), - options, - ), - ); - return last; + ) + ).then(() => last); }, - close: async () => { - if (closed) return; + close: () => { + if (closed) return Promise.resolve(); closed = true; setState({ status: "closed" }); - const scope = await scopePromise; - await Effect.runPromise(Scope.close(scope, Exit.void)); + return runtime.dispose(); }, }; } @@ -201,7 +195,7 @@ export function withDispatchRequestPolicy( duration: timeoutMs, orElse: () => Effect.fail( - new DispatchError({ + DispatchError.make({ message: `Request timed out after ${timeoutMs}ms`, }), ), @@ -213,25 +207,6 @@ export function withDispatchRequestPolicy( class StopStreaming extends Data.TaggedError("StopStreaming")<{}> {} -const webSocketConstructorLayer: Layer.Layer = Layer.effect( - Socket.WebSocketConstructor, - Effect.promise(async () => { - if (typeof globalThis !== "undefined" && "WebSocket" in globalThis) { - return (url, protocols) => new globalThis.WebSocket(url, protocols); - } - - try { - const mod = await import("ws"); - const WS = mod.WebSocket; - return (url, protocols) => new WS(url, protocols) as unknown as globalThis.WebSocket; - } catch (cause) { - throw new DispatchError({ - message: `WebSocket is not available: ${errorMessage(cause)}`, - }); - } - }), -); - function errorMessage(cause: unknown): string { if (cause instanceof Error) return cause.message; if (typeof cause === "string") return cause; diff --git a/ts/packages/client/src/socket/trustgraph-socket.ts b/ts/packages/client/src/socket/trustgraph-socket.ts index e7fb6ea1..c68adbcb 100644 --- a/ts/packages/client/src/socket/trustgraph-socket.ts +++ b/ts/packages/client/src/socket/trustgraph-socket.ts @@ -8,6 +8,7 @@ import { makeEffectRpcClient, } from "./effect-rpc-client.js"; import { getDefaultSocketUrl, getRandomValues } from "./websocket-adapter.js"; +import { Clock, Effect } from "effect"; // Import all message types for different services import type { @@ -200,6 +201,17 @@ function parseConfigJson(value: unknown): unknown { } } +const currentEpochSeconds = (): number => + Math.floor(Effect.runSync(Clock.currentTimeMillis) / 1000); + +const logClientInfo = (message: string): void => { + Effect.runFork(Effect.log(message)); +}; + +const logClientError = (message: string, error: unknown): void => { + Effect.runFork(Effect.logError(message, { error: toErrorMessage(error, message) })); +}; + /** * Socket interface defining all available operations for the TrustGraph API * This provides a unified interface for various AI/ML and knowledge graph @@ -386,7 +398,7 @@ export function makeBaseApi( */ close() { rpc.close().catch((err) => { - console.error("[socket close error]", err); + logClientError("[socket close error]", err); }); }, @@ -418,9 +430,7 @@ export function makeBaseApi( ) { return rpc .dispatch(dispatchInput(service, request, flow), dispatchOptions(timeout, retries)) - .then((obj) => { - return obj as ResponseType; - }); + .then((obj) => obj as ResponseType); }, /** @@ -438,14 +448,10 @@ export function makeBaseApi( return rpc .dispatchStream( dispatchInput(service, request, flow), - (chunk) => { - return receiver({ response: chunk.response, complete: chunk.complete }); - }, + (chunk) => receiver({ response: chunk.response, complete: chunk.complete }), dispatchOptions(timeout, retries), ) - .then((obj) => { - return obj as ResponseType; - }); + .then((obj) => obj as ResponseType); }, /** @@ -523,7 +529,7 @@ export function makeBaseApi( try { listener(state); } catch (error) { - console.error("Error in connection state listener:", error); + logClientError("Error in connection state listener", error); } }); }; @@ -574,11 +580,8 @@ export function makeBaseApi( notifyStateChange(); }); - console.log( - "SOCKET: opening socket...", - isNonEmptyString(token) ? "with auth" : "without auth", - "user:", - user, + logClientInfo( + `SOCKET: opening socket... ${isNonEmptyString(token) ? "with auth" : "without auth"} user: ${user}`, ); return api; @@ -684,7 +687,7 @@ export function makeLibrarianApi(api: BaseApi) { metadata?: Triple[], ) { const documentMetadata: DocumentMetadata = { - time: Math.floor(Date.now() / 1000), // Unix timestamp + time: currentEpochSeconds(), kind: mimeType, title, comments, @@ -756,7 +759,7 @@ export function makeLibrarianApi(api: BaseApi) { id: id, "document-id": doc_id, documentId: doc_id, - time: Math.floor(Date.now() / 1000), + time: currentEpochSeconds(), flow: flow, user: this.api.user, collection: withDefault(collection, "default"), @@ -1416,7 +1419,7 @@ export function makeFlowApi(api: BaseApi, flowId: string) { break; case "action": // Actions are typically not streamed incrementally, just logged - console.log("Agent action:", content); + logClientInfo(`Agent action: ${content}`); break; } @@ -2202,12 +2205,12 @@ export function makeConfigApi(api: BaseApi) { }, 60000, ) - .then((r) => { - return asConfigValues(r).map((item) => ({ + .then((r) => + asConfigValues(r).map((item) => ({ key: item.key, value: parseConfigJson(item.value), - })); - }) + })) + ) .then((r) => // Transform to more usable format r.map((x: unknown) => { @@ -2514,6 +2517,4 @@ export const createTrustGraphSocket = ( user: string, token?: string, socketUrl?: string, -): BaseApi => { - return new BaseApi(user, token, socketUrl); -}; +): BaseApi => new BaseApi(user, token, socketUrl);