From 0862250dabf47073bb5c085a786c78ab138adc31 Mon Sep 17 00:00:00 2001 From: elpresidank Date: Thu, 4 Jun 2026 05:30:31 -0500 Subject: [PATCH] Use SubscriptionRef for client connection state --- ts/EFFECT_NATIVE_REWRITE_AUDIT.md | 21 ++++++ .../client/src/__tests__/rpc-timeout.test.ts | 48 ++++++++++++- .../client/src/socket/effect-rpc-client.ts | 52 ++++++++------ .../client/src/socket/trustgraph-socket.ts | 71 ++++++++++++------- 4 files changed, 141 insertions(+), 51 deletions(-) diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index d67f4751..c721ba46 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -402,6 +402,24 @@ Notes: - `bun run --cwd ts/packages/flow test -- src/__tests__/flow-manager-service.test.ts` - `cd ts && bun run check:tsgo` +### 2026-06-04: Client Connection State SubscriptionRef Slice + +- Status: migrated and package-verified. +- Completed: + - `ts/packages/client/src/socket/effect-rpc-client.ts` now owns RPC + connection state in `effect/SubscriptionRef` instead of a mutable state + variable plus manual listener `Set`. + - `ts/packages/client/src/socket/trustgraph-socket.ts` now bridges UI + connection-state listeners through `SubscriptionRef.changes` instead of a + hand-rolled listener array. + - Both public `subscribe` APIs preserve synchronous immediate replay and + unsubscribe compatibility while using Effect fibers for later updates. + - Client tests now drive a fake RPC state source to prove immediate replay, + connection updates, and unsubscribe behavior. +- Verification: + - `bun run --cwd ts/packages/client test -- src/__tests__/rpc-timeout.test.ts` + - `cd ts && bun run check:tsgo` + ### 2026-06-02: RAG And Agent Requestor Bridge Slice - Status: migrated, root-verified, committed, and pushed. @@ -1851,6 +1869,9 @@ Notes: - ConfigService and KnowledgeCore operation dispatch now use `effect/Match` with `Match.exhaustive`; FlowManager and Librarian operation dispatch now use `effect/Match` with runtime-preserving `Match.orElse` fallbacks. + - Client RPC/BaseApi connection-state fanout now uses + `effect/SubscriptionRef`; remaining gateway/client P1 work is broader API + design, not listener bookkeeping. - Long-lived `Map` / `Set` state in ref-backed services can move toward Effect collections later; local pure traversal maps/sets remain no-ops. diff --git a/ts/packages/client/src/__tests__/rpc-timeout.test.ts b/ts/packages/client/src/__tests__/rpc-timeout.test.ts index 236971e1..045de42a 100644 --- a/ts/packages/client/src/__tests__/rpc-timeout.test.ts +++ b/ts/packages/client/src/__tests__/rpc-timeout.test.ts @@ -1,8 +1,8 @@ import { Effect } from "effect"; import { describe, expect, it, vi } from "vitest"; import { DispatchError, DispatchStreamChunk } from "../rpc/contract"; -import { type DispatchInput, withDispatchRequestPolicy } from "../socket/effect-rpc-client"; -import { makeBaseApiWithRpc } from "../socket/trustgraph-socket"; +import { type DispatchInput, type RpcConnectionState, withDispatchRequestPolicy } from "../socket/effect-rpc-client"; +import { type ConnectionState, makeBaseApiWithRpc } from "../socket/trustgraph-socket"; const input: DispatchInput = { scope: "global", @@ -11,6 +11,50 @@ const input: DispatchInput = { }; describe("Effect RPC request policy", () => { + it("replays and updates connection state through the SubscriptionRef-backed bridge", async () => { + let rpcListener: ((state: RpcConnectionState) => void) | undefined; + const api = makeBaseApiWithRpc("alice", undefined, "ws://example.test/rpc", { + dispatch: vi.fn(() => Promise.resolve({ ok: true })), + dispatchStream: vi.fn(() => Promise.resolve(undefined)), + close: vi.fn(() => Promise.resolve()), + subscribe: vi.fn((listener) => { + rpcListener = listener; + listener({ status: "connecting" }); + return () => undefined; + }), + }); + const observed: ConnectionState[] = []; + + const unsubscribe = api.onConnectionStateChange((state) => { + observed.push(state); + }); + + expect(observed).toEqual([{ status: "connecting", hasApiKey: false }]); + const listener = rpcListener; + expect(listener).toBeDefined(); + if (listener !== undefined) { + listener({ status: "connected" }); + } + await Effect.runPromise(Effect.yieldNow); + + expect(observed).toEqual([ + { status: "connecting", hasApiKey: false }, + { status: "unauthenticated", hasApiKey: false }, + ]); + + unsubscribe(); + await Effect.runPromise(Effect.yieldNow); + if (listener !== undefined) { + listener({ status: "failed", lastError: "boom" }); + } + await Effect.runPromise(Effect.yieldNow); + + expect(observed).toEqual([ + { status: "connecting", hasApiKey: false }, + { status: "unauthenticated", hasApiKey: false }, + ]); + }); + it("threads BaseApi timeout and retry options into dispatch calls", async () => { const dispatch = vi.fn(() => Promise.resolve({ ok: true })); const api = makeBaseApiWithRpc("alice", undefined, "ws://example.test/rpc", { diff --git a/ts/packages/client/src/socket/effect-rpc-client.ts b/ts/packages/client/src/socket/effect-rpc-client.ts index b7ec7bd9..6876cfcd 100644 --- a/ts/packages/client/src/socket/effect-rpc-client.ts +++ b/ts/packages/client/src/socket/effect-rpc-client.ts @@ -1,4 +1,4 @@ -import { Cause, Context, Effect, Layer, ManagedRuntime, Stream } from "effect"; +import { Cause, Context, Effect, Fiber, Layer, ManagedRuntime, Stream, SubscriptionRef } from "effect"; import type * as RpcGroup from "effect/unstable/rpc/RpcGroup"; import * as RpcClient from "effect/unstable/rpc/RpcClient"; import type { RpcClientError } from "effect/unstable/rpc/RpcClientError"; @@ -72,16 +72,11 @@ export function makeEffectRpcClient( onConnect?: () => void, onDisconnect?: () => void, ): EffectRpcClient { - const listeners = new Set<(state: RpcConnectionState) => void>(); - let state: RpcConnectionState = { status: "connecting" }; + const stateRef = Effect.runSync(SubscriptionRef.make({ status: "connecting" })); let closed = false; - const setState = (nextState: RpcConnectionState): void => { - state = nextState; - for (const listener of listeners) { - listener(nextState); - } - }; + const setState = (nextState: RpcConnectionState) => + SubscriptionRef.set(stateRef, nextState); const makeClientLayer = (): Layer.Layer => { const socketLayer = Layer.effect( @@ -95,13 +90,13 @@ export function makeEffectRpcClient( const hooksLayer = Layer.succeed( RpcClient.ConnectionHooks, RpcClient.ConnectionHooks.of({ - onConnect: Effect.sync(() => { - setState({ status: "connected" }); + onConnect: Effect.gen(function* () { + yield* setState({ status: "connected" }); onConnect?.(); }), - onDisconnect: Effect.sync(() => { + onDisconnect: Effect.gen(function* () { if (!closed) { - setState({ + yield* setState({ status: "connecting", lastError: "Disconnected from gateway", }); @@ -131,11 +126,9 @@ export function makeEffectRpcClient( const clientPromise = runtime.runPromise( TrustGraphRpcClientService.pipe( Effect.tapCause((cause) => - Effect.sync(() => { - setState({ - status: "failed", - lastError: Cause.pretty(cause), - }); + setState({ + status: "failed", + lastError: Cause.pretty(cause), }) ), ), @@ -143,10 +136,25 @@ export function makeEffectRpcClient( return { subscribe: (listener) => { - listeners.add(listener); - listener(state); + let latest = SubscriptionRef.getUnsafe(stateRef); + listener(latest); + let replaySeen = false; + const fiber = Effect.runFork( + SubscriptionRef.changes(stateRef).pipe( + Stream.runForEach((nextState) => + Effect.sync(() => { + if (!replaySeen) { + replaySeen = true; + if (nextState === latest) return; + } + latest = nextState; + listener(nextState); + }) + ), + ), + ); return () => { - listeners.delete(listener); + Effect.runFork(Fiber.interrupt(fiber)); }; }, dispatch: (input, options = {}) => @@ -176,7 +184,7 @@ export function makeEffectRpcClient( close: () => { if (closed) return Promise.resolve(); closed = true; - setState({ status: "closed" }); + Effect.runSync(setState({ status: "closed" })); return runtime.dispose(); }, }; diff --git a/ts/packages/client/src/socket/trustgraph-socket.ts b/ts/packages/client/src/socket/trustgraph-socket.ts index d3ed72d6..47318471 100644 --- a/ts/packages/client/src/socket/trustgraph-socket.ts +++ b/ts/packages/client/src/socket/trustgraph-socket.ts @@ -8,7 +8,7 @@ import { makeEffectRpcClient, } from "./effect-rpc-client.js"; import { getDefaultSocketUrl, getRandomValues } from "./websocket-adapter.js"; -import { Clock, Effect, Option, Result, Schema as S } from "effect"; +import { Clock, Effect, Fiber, Option, Result, Schema as S, Stream, SubscriptionRef } from "effect"; import * as Predicate from "effect/Predicate"; // Import all message types for different services @@ -491,7 +491,12 @@ export function makeBaseApi( rpcFactory: (url: string) => EffectRpcClient = makeEffectRpcClient, ) { let rpc: EffectRpcClient; - const connectionStateListeners: ((state: ConnectionState) => void)[] = []; + const connectionStateRef = Effect.runSync( + SubscriptionRef.make({ + status: "connecting", + hasApiKey: isNonEmptyString(token), + }), + ); let lastError: string | undefined = undefined; let rpcState: RpcConnectionState = { status: "connecting" }; @@ -506,16 +511,27 @@ export function makeBaseApi( * Subscribe to connection state changes for UI updates */ onConnectionStateChange(listener: (state: ConnectionState) => void) { - connectionStateListeners.push(listener); - // Immediately send current state - listener(getConnectionState()); + let latest = SubscriptionRef.getUnsafe(connectionStateRef); + listener(latest); + let replaySeen = false; + const fiber = Effect.runFork( + SubscriptionRef.changes(connectionStateRef).pipe( + Stream.runForEach((state) => + Effect.sync(() => { + if (!replaySeen) { + replaySeen = true; + if (state === latest) return; + } + latest = state; + notifyConnectionStateListener(listener, state); + }) + ), + ), + ); // Return unsubscribe function return () => { - const index = connectionStateListeners.indexOf(listener); - if (index > -1) { - connectionStateListeners.splice(index, 1); - } + Effect.runFork(Fiber.interrupt(fiber)); }; }, @@ -651,24 +667,25 @@ export function makeBaseApi( return state; }; - /** - * Notify all listeners of connection state changes - */ - const notifyStateChange = () => { - const state = getConnectionState(); - connectionStateListeners.forEach((listener) => { - const result = Result.try({ - try: () => listener(state), - catch: (error) => - socketError( - "connection-state-listener", - toErrorMessage(error, "Error in connection state listener"), - ), - }); - if (Result.isFailure(result)) { - logClientError("Error in connection state listener", result.failure); - } + const notifyConnectionStateListener = ( + listener: (state: ConnectionState) => void, + state: ConnectionState, + ): void => { + const result = Result.try({ + try: () => listener(state), + catch: (error) => + socketError( + "connection-state-listener", + toErrorMessage(error, "Error in connection state listener"), + ), }); + if (Result.isFailure(result)) { + logClientError("Error in connection state listener", result.failure); + } + }; + + const publishConnectionState = () => { + Effect.runSync(SubscriptionRef.set(connectionStateRef, getConnectionState())); }; const connectionStatusFromRpc = (hasApiKey: boolean): ConnectionState["status"] => { @@ -714,7 +731,7 @@ export function makeBaseApi( rpc.subscribe((state) => { rpcState = state; lastError = state.lastError; - notifyStateChange(); + publishConnectionState(); }); logClientInfo(