From 935ded616ccc2fa8f6dbd07c91185f9ff0439db2 Mon Sep 17 00:00:00 2001 From: elpresidank Date: Thu, 4 Jun 2026 07:07:20 -0500 Subject: [PATCH] Use Effect collections for RPC protocol clients --- ts/EFFECT_NATIVE_REWRITE_AUDIT.md | 31 ++++++++++++++++--- .../__tests__/gateway-rpc-protocol.test.ts | 19 ++++++++++++ ts/packages/flow/src/gateway/rpc-protocol.ts | 27 +++++++++------- 3 files changed, 61 insertions(+), 16 deletions(-) diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 1d9f398d..9b2139f7 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -2099,6 +2099,29 @@ Notes: - `cd ts && bun run lint` - `git diff --check` +### 2026-06-04: Gateway RPC Protocol Mutable Collections Slice + +- Status: migrated and package-verified. +- Completed: + - `ts/packages/flow/src/gateway/rpc-protocol.ts` now tracks active socket + clients with `MutableHashMap` and `MutableHashSet` instead of native + `Map` / `Set` closure state. + - The protocol still returns a native `ReadonlySet` snapshot at the + `RpcServer.Protocol.clientIds` API boundary because that is the Effect RPC + protocol contract. + - `ts/packages/flow/src/__tests__/gateway-rpc-protocol.test.ts` now covers + server response sends through the registered client and the public + `clientIds` snapshot. + - The focused scan for native map/set state in `gateway/rpc-protocol.ts` is + clean. +- Verification: + - `cd ts/packages/flow && bunx --bun vitest run src/__tests__/gateway-rpc-protocol.test.ts` + - `cd ts && bun run check:tsgo` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `cd ts && bun run lint` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: @@ -2260,10 +2283,10 @@ Notes: - Remaining real helper-normalization targets from the fresh sweep are retrieval/document-rag, retrieval/graph-rag, embeddings/ollama, base processor flow helpers, and one workbench atom helper. - - Remaining real long-lived native collection targets include - `gateway/rpc-protocol.ts`, base processor registries, Librarian service / - collection manager state, prompt template cache, and a workbench module - cache. Local traversal sets and test fakes remain no-op boundaries. + - Remaining real long-lived native collection targets include base processor + registries, Librarian service / collection manager state, prompt template + cache, and a workbench module cache. Local traversal sets and test fakes + remain no-op boundaries. ## Ranked Findings diff --git a/ts/packages/flow/src/__tests__/gateway-rpc-protocol.test.ts b/ts/packages/flow/src/__tests__/gateway-rpc-protocol.test.ts index 4081c321..f7525b70 100644 --- a/ts/packages/flow/src/__tests__/gateway-rpc-protocol.test.ts +++ b/ts/packages/flow/src/__tests__/gateway-rpc-protocol.test.ts @@ -14,6 +14,7 @@ interface ReceivedMessage { interface ProtocolRunResult { readonly messages: ReadonlyArray; readonly writes: ReadonlyArray; + readonly clientIds: ReadonlyArray; } const jsonFrame = (value: unknown): string => `${JSON.stringify(value)}\n`; @@ -27,6 +28,7 @@ const optionToArray = (value: O.Option): Array => const runProtocolFrames = ( frames: ReadonlyArray, headers?: ReadonlyArray<[string, string]>, + sendResponse?: RpcMessage.FromServerEncoded, ): Promise => Effect.runPromise( Effect.scoped( @@ -56,6 +58,10 @@ const runProtocolFrames = ( yield* onSocket(socket, headers); yield* Effect.yieldNow; + const clientIds = yield* protocol.clientIds; + if (sendResponse !== undefined) { + yield* protocol.send(0, sendResponse); + } const first = yield* Queue.poll(received); const second = yield* Queue.poll(received); @@ -68,6 +74,7 @@ const runProtocolFrames = ( ...optionToArray(third), ], writes, + clientIds: Array.from(clientIds), }; }).pipe( Effect.provideService(RpcSerialization.RpcSerialization, RpcSerialization.ndjson), @@ -145,4 +152,16 @@ describe("gateway RPC socket protocol", () => { expect(result.writes).toHaveLength(1); expect(String(result.writes[0])).toContain("\"_tag\":\"Defect\""); }); + + it("sends server responses through the registered client", async () => { + const result = await runProtocolFrames( + [], + undefined, + RpcMessage.ResponseDefectEncoded("server-boom"), + ); + + expect(result.clientIds).toEqual([0]); + expect(result.writes).toHaveLength(1); + expect(String(result.writes[0])).toContain("server-boom"); + }); }); diff --git a/ts/packages/flow/src/gateway/rpc-protocol.ts b/ts/packages/flow/src/gateway/rpc-protocol.ts index 54240291..f2a70c45 100644 --- a/ts/packages/flow/src/gateway/rpc-protocol.ts +++ b/ts/packages/flow/src/gateway/rpc-protocol.ts @@ -1,4 +1,7 @@ import { Effect, Queue, Scope } from "effect"; +import * as MutableHashMap from "effect/MutableHashMap"; +import * as MutableHashSet from "effect/MutableHashSet"; +import * as O from "effect/Option"; import * as S from "effect/Schema"; import * as RpcMessage from "effect/unstable/rpc/RpcMessage"; import * as RpcSerialization from "effect/unstable/rpc/RpcSerialization"; @@ -41,10 +44,10 @@ export const makeSocketRpcProtocol = Effect.gen(function* () { const disconnects = yield* Queue.make(); let nextClientId = 0; - const clients = new Map Effect.Effect; }>(); - const clientIds = new Set(); + const clientIds = MutableHashSet.empty(); let writeRequest!: ( clientId: number, @@ -60,8 +63,8 @@ export const makeSocketRpcProtocol = Effect.gen(function* () { const clientId = nextClientId++; yield* Scope.addFinalizerExit(scope, () => { - clients.delete(clientId); - clientIds.delete(clientId); + MutableHashMap.remove(clients, clientId); + MutableHashSet.remove(clientIds, clientId); return Queue.offer(disconnects, clientId); }); @@ -80,8 +83,8 @@ export const makeSocketRpcProtocol = Effect.gen(function* () { ), ); - clients.set(clientId, { write }); - clientIds.add(clientId); + MutableHashMap.set(clients, clientId, { write }); + MutableHashSet.add(clientIds, clientId); yield* socket.runRaw((data) => Effect.try({ @@ -126,13 +129,13 @@ export const makeSocketRpcProtocol = Effect.gen(function* () { writeRequest = writeRequest_; return Effect.succeed({ disconnects, - send: (clientId, response) => { - const client = clients.get(clientId); - if (client === undefined) return Effect.void; - return Effect.orDie(client.write(response)); - }, + send: (clientId, response) => + O.match(MutableHashMap.get(clients, clientId), { + onNone: () => Effect.void, + onSome: (client) => Effect.orDie(client.write(response)), + }), end: () => Effect.void, - clientIds: Effect.sync(() => clientIds), + clientIds: Effect.sync(() => new Set(clientIds)), initialMessage: Effect.succeedNone, supportsAck: true, supportsTransferables: false,