Manage client RPC runtime with Effect

This commit is contained in:
elpresidank 2026-06-02 02:09:45 -05:00
parent 710656be26
commit 74ba05703a
3 changed files with 107 additions and 96 deletions

View file

@ -12,14 +12,14 @@ Verified source roots:
- Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4` - Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4`
- Installed Effect beta used by this workspace: `ts/node_modules/effect` - Installed Effect beta used by this workspace: `ts/node_modules/effect`
Current signal counts from `ts/packages` after the 2026-06-02 Librarian Current signal counts from `ts/packages` after the 2026-06-02 Client RPC
ref-backed state slice: managed runtime slice:
| Signal | Count | | Signal | Count |
| --- | ---: | | --- | ---: |
| `Effect.runPromise` | 208 | | `Effect.runPromise` | 203 |
| `Map<` | 88 | | `Map<` | 88 |
| `WebSocket` | 49 | | `WebSocket` | 43 |
| `new Map` | 62 | | `new Map` | 62 |
| `toPromiseRequestor` | 0 | | `toPromiseRequestor` | 0 |
| `makeAsyncProcessor` | 19 | | `makeAsyncProcessor` | 19 |
@ -45,6 +45,9 @@ Notes:
- The `Map<` and `new Map` counts increased in this snapshot because the - The `Map<` and `new Map` counts increased in this snapshot because the
Librarian slice introduced explicit ref-backed state types and clone helpers Librarian slice introduced explicit ref-backed state types and clone helpers
while removing the service object's direct mutable maps/handles. while removing the service object's direct mutable maps/handles.
- The `Effect.runPromise` and `WebSocket` counts dropped in this snapshot
because `EffectRpcClient` now owns its RPC/socket layer with
`ManagedRuntime` and uses Effect's WebSocket constructor layer.
- `Record<string, any>` and `throwLibrarianServiceError` are now clean in - `Record<string, any>` and `throwLibrarianServiceError` are now clean in
`ts/packages`. `ts/packages`.
@ -348,6 +351,32 @@ Notes:
- `cd ts && bun run test` - `cd ts && bun run test`
- `git diff --check` - `git diff --check`
### 2026-06-02: Client RPC Managed Runtime Slice
- Status: migrated and root-verified.
- Completed:
- `ts/packages/client/src/socket/effect-rpc-client.ts` now builds one
`ManagedRuntime` from the RPC client layer instead of manually creating a
`Scope`, building the layer, and calling `Effect.runPromise` for every
operation.
- RPC dispatch and stream dispatch continue to expose the existing
Promise-returning `EffectRpcClient` facade, but they run through the managed
runtime and close with `runtime.dispose()`.
- The Effect RPC socket path now consumes `Socket.layerWebSocketConstructorGlobal`
instead of a duplicate local WebSocket constructor layer.
- Dispatch payload construction now uses `DispatchPayload.make(...)` so
schema classes are not instantiated with `new`.
- Client socket logging and timestamp creation now use Effect `Logger` and
`Clock` instead of direct console and `Date.now()` calls in the touched
surface.
- Verification:
- `bun run --cwd ts/packages/client build`
- `cd ts && bun run check`
- `bun run --cwd ts/packages/client test`
- `cd ts && bun run build`
- `cd ts && bun run test`
- `git diff --check`
## Subagent Findings To Preserve ## Subagent Findings To Preserve
- MCP/workbench: - MCP/workbench:
@ -371,11 +400,14 @@ Notes:
use type assertions; they need a typed factory/registry redesign rather use type assertions; they need a typed factory/registry redesign rather
than more assertions. than more assertions.
- Gateway/client: - Gateway/client:
- `EffectRpcClient` now owns its socket/RPC layer with `ManagedRuntime`.
Remaining client cleanup should focus on `trustgraph-socket.ts`
higher-level normal `Error` throws/JSON parsing and the public synchronous
`websocket-adapter.ts` compatibility helpers.
- Knowledge streams still duplicate legacy end-of-stream handling. - Knowledge streams still duplicate legacy end-of-stream handling.
- Effect RPC client remains Promise-first internally in places and should be
turned into a managed runtime or scoped layer.
- WebSocket adapter shims still contain host-boundary `try`/`catch` and - WebSocket adapter shims still contain host-boundary `try`/`catch` and
normal `Error` construction. normal `Error` construction, but their sync exports are public API and
should be migrated in a separate compatibility-preserving slice.
- RAG/providers/storage: - RAG/providers/storage:
- RAG and agent requestor bridges are complete: `toPromiseRequestor` has no - RAG and agent requestor bridges are complete: `toPromiseRequestor` has no
remaining `ts/packages` matches. remaining `ts/packages` matches.
@ -399,10 +431,13 @@ Notes:
- `effect/unstable/rpc/RpcSerialization.layerNdjson` or `layerNdJsonRpc`. - `effect/unstable/rpc/RpcSerialization.layerNdjson` or `layerNdJsonRpc`.
- `ManagedRuntime` for compatibility facades when a Promise API must remain. - `ManagedRuntime` for compatibility facades when a Promise API must remain.
- Rewrite shape: - Rewrite shape:
- Treat `EffectRpcClient` as an internal managed runtime or scoped layer. - `EffectRpcClient` is now an internal managed runtime with Promise
compatibility facades.
- Expose Promise-returning methods through a thin adapter. - Expose Promise-returning methods through a thin adapter.
- Replace normal client `Error` constructors with tagged errors before they - Finish replacing remaining normal client `Error` constructors with tagged
cross into shared Effect code. errors before they cross into shared Effect code.
- Preserve public sync exports in `websocket-adapter.ts` while moving host
failure capture toward typed Effect helpers.
- Tests: - Tests:
- `cd ts && bun run --cwd packages/client test` - `cd ts && bun run --cwd packages/client test`

View file

@ -1,4 +1,4 @@
import { Context, Data, Effect, Exit, Layer, Scope, Stream } from "effect"; import { Context, Data, Effect, Layer, ManagedRuntime, Stream } from "effect";
import type * as RpcGroup from "effect/unstable/rpc/RpcGroup"; import type * as RpcGroup from "effect/unstable/rpc/RpcGroup";
import * as RpcClient from "effect/unstable/rpc/RpcClient"; import * as RpcClient from "effect/unstable/rpc/RpcClient";
import type { RpcClientError } from "effect/unstable/rpc/RpcClientError"; import type { RpcClientError } from "effect/unstable/rpc/RpcClientError";
@ -83,14 +83,14 @@ export function makeEffectRpcClient(
} }
}; };
const makeClient = (): Effect.Effect<TrustGraphRpcClient, never, Scope.Scope> => { const makeClientLayer = (): Layer.Layer<TrustGraphRpcClientService> => {
const socketLayer = Layer.effect( const socketLayer = Layer.effect(
Socket.Socket, Socket.Socket,
Socket.makeWebSocket(url, { Socket.makeWebSocket(url, {
closeCodeIsError: (code) => code !== 1000, closeCodeIsError: (code) => code !== 1000,
openTimeout: "10 seconds", openTimeout: "10 seconds",
}), }),
).pipe(Layer.provide(webSocketConstructorLayer)); ).pipe(Layer.provide(Socket.layerWebSocketConstructorGlobal));
const hooksLayer = Layer.succeed( const hooksLayer = Layer.succeed(
RpcClient.ConnectionHooks, RpcClient.ConnectionHooks,
@ -124,16 +124,11 @@ export function makeEffectRpcClient(
RpcClient.make(TrustGraphRpcs), RpcClient.make(TrustGraphRpcs),
).pipe(Layer.provide(protocolLayer)); ).pipe(Layer.provide(protocolLayer));
return Effect.map( return clientLayer;
Layer.build(clientLayer),
(context) => Context.get(context, TrustGraphRpcClientService),
);
}; };
const scopePromise = Effect.runPromise(Scope.make()); const runtime = ManagedRuntime.make(makeClientLayer());
const clientPromise = scopePromise.then((scope) => const clientPromise = runtime.runPromise(TrustGraphRpcClientService);
Effect.runPromise(makeClient().pipe(Scope.provide(scope))),
);
clientPromise.catch((cause) => { clientPromise.catch((cause) => {
setState({ setState({
status: "failed", status: "failed",
@ -149,41 +144,40 @@ export function makeEffectRpcClient(
listeners.delete(listener); listeners.delete(listener);
}; };
}, },
dispatch: async (input, options = {}) => { dispatch: (input, options = {}) =>
const client = await clientPromise; clientPromise.then((client) =>
return await Effect.runPromise( runtime.runPromise(
withDispatchRequestPolicy(client.Dispatch(new DispatchPayload(input)), options), withDispatchRequestPolicy(client.Dispatch(DispatchPayload.make(input)), options),
); )
}, ),
dispatchStream: async (input, receiver, options = {}) => { dispatchStream: (input, receiver, options = {}) => {
const client = await clientPromise;
let last: DispatchStreamChunk | undefined; let last: DispatchStreamChunk | undefined;
await Effect.runPromise( return clientPromise.then((client) =>
withDispatchRequestPolicy( runtime.runPromise(
client.DispatchStream(new DispatchPayload(input)).pipe( withDispatchRequestPolicy(
Stream.runForEach((chunk) => client.DispatchStream(DispatchPayload.make(input)).pipe(
Effect.suspend(() => { Stream.runForEach((chunk) =>
last = chunk; Effect.suspend(() => {
if (receiver(chunk)) return Effect.fail(new StopStreaming()); last = chunk;
return Effect.void; if (receiver(chunk)) return Effect.fail(new StopStreaming());
}), return Effect.void;
), }),
Effect.catchIf( ),
(cause): cause is StopStreaming => cause instanceof StopStreaming, Effect.catchIf(
() => Effect.void, (cause): cause is StopStreaming => cause instanceof StopStreaming,
() => Effect.void,
),
), ),
options,
), ),
options, )
), ).then(() => last);
);
return last;
}, },
close: async () => { close: () => {
if (closed) return; if (closed) return Promise.resolve();
closed = true; closed = true;
setState({ status: "closed" }); setState({ status: "closed" });
const scope = await scopePromise; return runtime.dispose();
await Effect.runPromise(Scope.close(scope, Exit.void));
}, },
}; };
} }
@ -201,7 +195,7 @@ export function withDispatchRequestPolicy<A, E, R>(
duration: timeoutMs, duration: timeoutMs,
orElse: () => orElse: () =>
Effect.fail( Effect.fail(
new DispatchError({ DispatchError.make({
message: `Request timed out after ${timeoutMs}ms`, message: `Request timed out after ${timeoutMs}ms`,
}), }),
), ),
@ -213,25 +207,6 @@ export function withDispatchRequestPolicy<A, E, R>(
class StopStreaming extends Data.TaggedError("StopStreaming")<{}> {} class StopStreaming extends Data.TaggedError("StopStreaming")<{}> {}
const webSocketConstructorLayer: Layer.Layer<Socket.WebSocketConstructor> = Layer.effect(
Socket.WebSocketConstructor,
Effect.promise(async () => {
if (typeof globalThis !== "undefined" && "WebSocket" in globalThis) {
return (url, protocols) => new globalThis.WebSocket(url, protocols);
}
try {
const mod = await import("ws");
const WS = mod.WebSocket;
return (url, protocols) => new WS(url, protocols) as unknown as globalThis.WebSocket;
} catch (cause) {
throw new DispatchError({
message: `WebSocket is not available: ${errorMessage(cause)}`,
});
}
}),
);
function errorMessage(cause: unknown): string { function errorMessage(cause: unknown): string {
if (cause instanceof Error) return cause.message; if (cause instanceof Error) return cause.message;
if (typeof cause === "string") return cause; if (typeof cause === "string") return cause;

View file

@ -8,6 +8,7 @@ import {
makeEffectRpcClient, makeEffectRpcClient,
} from "./effect-rpc-client.js"; } from "./effect-rpc-client.js";
import { getDefaultSocketUrl, getRandomValues } from "./websocket-adapter.js"; import { getDefaultSocketUrl, getRandomValues } from "./websocket-adapter.js";
import { Clock, Effect } from "effect";
// Import all message types for different services // Import all message types for different services
import type { import type {
@ -200,6 +201,17 @@ function parseConfigJson(value: unknown): unknown {
} }
} }
const currentEpochSeconds = (): number =>
Math.floor(Effect.runSync(Clock.currentTimeMillis) / 1000);
const logClientInfo = (message: string): void => {
Effect.runFork(Effect.log(message));
};
const logClientError = (message: string, error: unknown): void => {
Effect.runFork(Effect.logError(message, { error: toErrorMessage(error, message) }));
};
/** /**
* Socket interface defining all available operations for the TrustGraph API * Socket interface defining all available operations for the TrustGraph API
* This provides a unified interface for various AI/ML and knowledge graph * This provides a unified interface for various AI/ML and knowledge graph
@ -386,7 +398,7 @@ export function makeBaseApi(
*/ */
close() { close() {
rpc.close().catch((err) => { rpc.close().catch((err) => {
console.error("[socket close error]", err); logClientError("[socket close error]", err);
}); });
}, },
@ -418,9 +430,7 @@ export function makeBaseApi(
) { ) {
return rpc return rpc
.dispatch(dispatchInput(service, request, flow), dispatchOptions(timeout, retries)) .dispatch(dispatchInput(service, request, flow), dispatchOptions(timeout, retries))
.then((obj) => { .then((obj) => obj as ResponseType);
return obj as ResponseType;
});
}, },
/** /**
@ -438,14 +448,10 @@ export function makeBaseApi(
return rpc return rpc
.dispatchStream( .dispatchStream(
dispatchInput(service, request, flow), dispatchInput(service, request, flow),
(chunk) => { (chunk) => receiver({ response: chunk.response, complete: chunk.complete }),
return receiver({ response: chunk.response, complete: chunk.complete });
},
dispatchOptions(timeout, retries), dispatchOptions(timeout, retries),
) )
.then((obj) => { .then((obj) => obj as ResponseType);
return obj as ResponseType;
});
}, },
/** /**
@ -523,7 +529,7 @@ export function makeBaseApi(
try { try {
listener(state); listener(state);
} catch (error) { } catch (error) {
console.error("Error in connection state listener:", error); logClientError("Error in connection state listener", error);
} }
}); });
}; };
@ -574,11 +580,8 @@ export function makeBaseApi(
notifyStateChange(); notifyStateChange();
}); });
console.log( logClientInfo(
"SOCKET: opening socket...", `SOCKET: opening socket... ${isNonEmptyString(token) ? "with auth" : "without auth"} user: ${user}`,
isNonEmptyString(token) ? "with auth" : "without auth",
"user:",
user,
); );
return api; return api;
@ -684,7 +687,7 @@ export function makeLibrarianApi(api: BaseApi) {
metadata?: Triple[], metadata?: Triple[],
) { ) {
const documentMetadata: DocumentMetadata = { const documentMetadata: DocumentMetadata = {
time: Math.floor(Date.now() / 1000), // Unix timestamp time: currentEpochSeconds(),
kind: mimeType, kind: mimeType,
title, title,
comments, comments,
@ -756,7 +759,7 @@ export function makeLibrarianApi(api: BaseApi) {
id: id, id: id,
"document-id": doc_id, "document-id": doc_id,
documentId: doc_id, documentId: doc_id,
time: Math.floor(Date.now() / 1000), time: currentEpochSeconds(),
flow: flow, flow: flow,
user: this.api.user, user: this.api.user,
collection: withDefault(collection, "default"), collection: withDefault(collection, "default"),
@ -1416,7 +1419,7 @@ export function makeFlowApi(api: BaseApi, flowId: string) {
break; break;
case "action": case "action":
// Actions are typically not streamed incrementally, just logged // Actions are typically not streamed incrementally, just logged
console.log("Agent action:", content); logClientInfo(`Agent action: ${content}`);
break; break;
} }
@ -2202,12 +2205,12 @@ export function makeConfigApi(api: BaseApi) {
}, },
60000, 60000,
) )
.then((r) => { .then((r) =>
return asConfigValues(r).map((item) => ({ asConfigValues(r).map((item) => ({
key: item.key, key: item.key,
value: parseConfigJson(item.value), value: parseConfigJson(item.value),
})); }))
}) )
.then((r) => .then((r) =>
// Transform to more usable format // Transform to more usable format
r.map((x: unknown) => { r.map((x: unknown) => {
@ -2514,6 +2517,4 @@ export const createTrustGraphSocket = (
user: string, user: string,
token?: string, token?: string,
socketUrl?: string, socketUrl?: string,
): BaseApi => { ): BaseApi => new BaseApi(user, token, socketUrl);
return new BaseApi(user, token, socketUrl);
};