import { Cause, Context, Effect, Fiber, Layer, ManagedRuntime, Stream, SubscriptionRef } from "effect";
import type * as RpcGroup from "effect/unstable/rpc/RpcGroup";
import * as RpcClient from "effect/unstable/rpc/RpcClient";
import type { RpcClientError } from "effect/unstable/rpc/RpcClientError";
import * as RpcSerialization from "effect/unstable/rpc/RpcSerialization";
import * as Socket from "effect/unstable/socket/Socket";
import {
  DispatchPayload,
  DispatchError,
  TrustGraphRpcs,
  type DispatchStreamChunk,
} from "../rpc/contract.js";

// NOTE(review): the inline generic arguments in this module had been lost
// (`Record;`, `Promise;`, `Effect.Effect`, undeclared `Args`/`A`). They are
// reconstructed below from what the implementation actually produces; where
// the original may have been narrower, the safe supertype is used — confirm
// against the intended public typings.

/** Typed RPC client for the TrustGraph RPC group. */
type TrustGraphRpcClient = RpcClient.RpcClient<
  RpcGroup.Rpcs<typeof TrustGraphRpcs>,
  RpcClientError
>;

/** Context service tag under which the constructed RPC client is provided. */
class TrustGraphRpcClientService extends Context.Service<
  TrustGraphRpcClientService,
  TrustGraphRpcClient
>()("@trustgraph/client/socket/effect-rpc-client/TrustGraphRpcClientService") {}

/** Connection lifecycle states reported to subscribers. */
export type RpcConnectionStatus = "connecting" | "connected" | "failed" | "closed";

/** Snapshot of the connection state delivered to `subscribe` listeners. */
export interface RpcConnectionState {
  status: RpcConnectionStatus;
  /** Human-readable description of the most recent failure, when there is one. */
  lastError?: string;
}

/** Plain-object form of a dispatch request; converted with `DispatchPayload.make`. */
export interface DispatchInput {
  scope: "global" | "flow";
  service: string;
  flow?: string;
  request: Record<string, unknown>;
}

/** Per-call request policy overrides. */
export interface DispatchOptions {
  /** Timeout applied to each attempt, in milliseconds. Defaults to 10 000. */
  readonly timeoutMs?: number;
  /**
   * Total number of attempts (not additional retries): the request is retried
   * `retries - 1` times. Values below 1 are clamped to a single attempt.
   * Defaults to 3.
   */
  readonly retries?: number;
}

const DEFAULT_REQUEST_TIMEOUT_MS = 10_000;
const DEFAULT_REQUEST_ATTEMPTS = 3;

/** A factory that can be invoked either as `f(...)` or as `new f(...)`. */
type NewableFactory<Args extends unknown[], A> = {
  new (...args: Args): A;
  (...args: Args): A;
  readonly prototype: A;
};

/**
 * Wraps a plain factory function so it can also be called with `new`.
 *
 * The wrapper is a classic `function`, so it is constructible; because it
 * returns the factory's result, `new Wrapper(...)` yields that result whenever
 * the factory returns an object.
 */
function newableFactory<Args extends unknown[], A>(
  factory: (...args: Args) => A,
): NewableFactory<Args, A> {
  function Constructor(...args: Args): A {
    return factory(...args);
  }
  // A function declaration cannot be typed as both callable and newable
  // without a cast.
  return Constructor as unknown as NewableFactory<Args, A>;
}

/** Promise-based facade over the Effect RPC client. */
export interface EffectRpcClient {
  /**
   * Registers a connection-state listener. The listener is invoked immediately
   * with the current state and then on every change. Returns an unsubscribe
   * function.
   */
  readonly subscribe: (listener: (state: RpcConnectionState) => void) => () => void;
  /** Sends a single request and resolves with the RPC result. */
  readonly dispatch: (
    input: DispatchInput,
    options?: DispatchOptions,
  ) => Promise<unknown>;
  /**
   * Sends a streaming request, passing every chunk to `receiver`. Consumption
   * continues while `receiver` returns `false`. Resolves with the last chunk
   * handed to `receiver`, or `undefined` when none was received.
   */
  readonly dispatchStream: (
    input: DispatchInput,
    receiver: (chunk: DispatchStreamChunk) => boolean,
    options?: DispatchOptions,
  ) => Promise<DispatchStreamChunk | undefined>;
  /** Marks the client closed and disposes the underlying runtime. Idempotent. */
  readonly close: () => Promise<void>;
}

/**
 * Creates an RPC client that talks NDJSON over a WebSocket to `url`.
 *
 * @param url - WebSocket endpoint to connect to.
 * @param onConnect - Optional callback run after the state becomes `"connected"`.
 * @param onDisconnect - Optional callback run on every disconnect, including
 *   those that happen after `close()`.
 * @returns The promise-based client facade.
 */
export function makeEffectRpcClient(
  url: string,
  onConnect?: () => void,
  onDisconnect?: () => void,
): EffectRpcClient {
  const initialState: RpcConnectionState = { status: "connecting" };
  const stateRef = Effect.runSync(SubscriptionRef.make(initialState));
  let closed = false;

  const setState = (nextState: RpcConnectionState) => SubscriptionRef.set(stateRef, nextState);

  // Return type is left to inference: the layer's error/requirement types come
  // from the library layers composed below.
  const makeClientLayer = () => {
    const socketLayer = Layer.effect(
      Socket.Socket,
      Socket.makeWebSocket(url, {
        // Any close code other than 1000 is reported as an error.
        closeCodeIsError: (code) => code !== 1000,
        openTimeout: "10 seconds",
      }),
    ).pipe(Layer.provide(Socket.layerWebSocketConstructorGlobal));

    const hooksLayer = Layer.succeed(
      RpcClient.ConnectionHooks,
      RpcClient.ConnectionHooks.of({
        onConnect: Effect.gen(function* () {
          yield* setState({ status: "connected" });
          onConnect?.();
        }),
        onDisconnect: Effect.gen(function* () {
          // After close() the state must stay "closed", so only an unexpected
          // disconnect moves it back to "connecting".
          if (!closed) {
            yield* setState({
              status: "connecting",
              lastError: "Disconnected from gateway",
            });
          }
          onDisconnect?.();
        }),
      }),
    );

    const protocolLayer = RpcClient.layerProtocolSocket({
      retryTransientErrors: true,
    }).pipe(
      Layer.provide(socketLayer),
      Layer.provide(RpcSerialization.layerNdjson),
      Layer.provide(hooksLayer),
    );

    return Layer.effect(
      TrustGraphRpcClientService,
      RpcClient.make(TrustGraphRpcs),
    ).pipe(Layer.provide(protocolLayer));
  };

  const runtime = ManagedRuntime.make(makeClientLayer());

  const clientPromise = runtime.runPromise(
    TrustGraphRpcClientService.pipe(
      Effect.tapCause((cause) =>
        setState({
          status: "failed",
          lastError: Cause.pretty(cause),
        }),
      ),
    ),
  );

  // Without a handler, a setup failure (or disposing the runtime before the
  // client resolved) would surface as an unhandled rejection when no dispatch
  // call has chained onto the promise yet. The handler also records the failure
  // for subscribers when it was raised outside the effect wrapped by `tapCause`
  // above. Dispatch calls still observe the rejection through their own chains.
  void clientPromise.catch((error: unknown) => {
    if (closed || SubscriptionRef.getUnsafe(stateRef).status === "failed") {
      return;
    }
    Effect.runSync(
      setState({
        status: "failed",
        lastError: error instanceof Error ? error.message : String(error),
      }),
    );
  });

  return {
    subscribe: (listener) => {
      let latest = SubscriptionRef.getUnsafe(stateRef);
      listener(latest);

      // The first emission of the changes stream may be the very state object
      // that was just delivered; skip it so the listener is not called twice.
      let replaySeen = false;
      const fiber = Effect.runFork(
        SubscriptionRef.changes(stateRef).pipe(
          Stream.runForEach((nextState) =>
            Effect.sync(() => {
              if (!replaySeen) {
                replaySeen = true;
                if (nextState === latest) return;
              }
              latest = nextState;
              listener(nextState);
            }),
          ),
        ),
      );

      return () => {
        Effect.runFork(Fiber.interrupt(fiber));
      };
    },

    dispatch: (input, options = {}) =>
      clientPromise.then((client) =>
        runtime.runPromise(
          withDispatchRequestPolicy(client.Dispatch(DispatchPayload.make(input)), options),
        ),
      ),

    dispatchStream: (input, receiver, options = {}) => {
      let last: DispatchStreamChunk | undefined;
      // NOTE(review): the retry policy re-runs the whole stream, so `receiver`
      // can be handed chunks from an earlier attempt again — confirm callers
      // tolerate that.
      return clientPromise
        .then((client) =>
          runtime.runPromise(
            withDispatchRequestPolicy(
              client.DispatchStream(DispatchPayload.make(input)).pipe(
                Stream.runForEachWhile((chunk) =>
                  Effect.suspend(() => {
                    last = chunk;
                    // `receiver` returns true to stop; the loop continues on false.
                    return Effect.succeed(!receiver(chunk));
                  }),
                ),
              ),
              options,
            ),
          ),
        )
        .then(() => last);
    },

    close: () => {
      if (closed) return Promise.resolve();
      closed = true;
      Effect.runSync(setState({ status: "closed" }));
      return runtime.dispose();
    },
  };
}

/** `makeEffectRpcClient` exposed so it can be called with or without `new`. */
export const EffectRpcClient = newableFactory(makeEffectRpcClient);

/**
 * Error produced when an attempt times out. Derived from the constructor that
 * builds it so it does not depend on how the contract module declares the type.
 */
type DispatchTimeoutError = ReturnType<typeof DispatchError.make>;

/**
 * Applies the request policy to `effect`: a per-attempt timeout that fails with
 * a `DispatchError`, followed by retries of the timed effect.
 *
 * @param effect - The request to guard.
 * @param options - Timeout and attempt overrides; invalid values fall back to
 *   the defaults.
 * @returns The guarded effect. Because the retry wraps the timed effect, every
 *   attempt gets its own timeout.
 */
export function withDispatchRequestPolicy<A, E, R>(
  effect: Effect.Effect<A, E, R>,
  options: DispatchOptions,
): Effect.Effect<A, E | DispatchTimeoutError, R> {
  const timeoutMs = normalizeTimeoutMs(options.timeoutMs);
  // `retries` counts total attempts, so the first attempt is not a retry.
  const retryTimes = normalizeAttempts(options.retries) - 1;

  const timed = effect.pipe(
    Effect.timeoutOrElse({
      duration: timeoutMs,
      orElse: () =>
        Effect.fail(
          DispatchError.make({
            message: `Request timed out after ${timeoutMs}ms`,
          }),
        ),
    }),
  );

  return retryTimes > 0 ? timed.pipe(Effect.retry({ times: retryTimes })) : timed;
}

/**
 * Returns a usable timeout in whole milliseconds, falling back to the default
 * when the value is missing, non-finite, zero or negative.
 */
function normalizeTimeoutMs(timeoutMs: number | undefined): number {
  if (timeoutMs === undefined || !Number.isFinite(timeoutMs) || timeoutMs <= 0) {
    return DEFAULT_REQUEST_TIMEOUT_MS;
  }
  return Math.floor(timeoutMs);
}

/**
 * Returns the total attempt count (at least 1), falling back to the default
 * when the value is missing or non-finite.
 */
function normalizeAttempts(retries: number | undefined): number {
  if (retries === undefined || !Number.isFinite(retries)) {
    return DEFAULT_REQUEST_ATTEMPTS;
  }
  return Math.max(1, Math.floor(retries));
}