Use Effect collections for RPC protocol clients

This commit is contained in:
elpresidank 2026-06-04 07:07:20 -05:00
parent 5a945af345
commit 935ded616c
3 changed files with 61 additions and 16 deletions

View file

@ -14,6 +14,7 @@ interface ReceivedMessage {
interface ProtocolRunResult {
readonly messages: ReadonlyArray<ReceivedMessage>;
readonly writes: ReadonlyArray<string | Uint8Array | CloseEvent>;
readonly clientIds: ReadonlyArray<number>;
}
const jsonFrame = (value: unknown): string => `${JSON.stringify(value)}\n`;
@ -27,6 +28,7 @@ const optionToArray = <A>(value: O.Option<A>): Array<A> =>
const runProtocolFrames = (
frames: ReadonlyArray<string | Uint8Array>,
headers?: ReadonlyArray<[string, string]>,
sendResponse?: RpcMessage.FromServerEncoded,
): Promise<ProtocolRunResult> =>
Effect.runPromise(
Effect.scoped(
@ -56,6 +58,10 @@ const runProtocolFrames = (
yield* onSocket(socket, headers);
yield* Effect.yieldNow;
const clientIds = yield* protocol.clientIds;
if (sendResponse !== undefined) {
yield* protocol.send(0, sendResponse);
}
const first = yield* Queue.poll(received);
const second = yield* Queue.poll(received);
@ -68,6 +74,7 @@ const runProtocolFrames = (
...optionToArray(third),
],
writes,
clientIds: Array.from(clientIds),
};
}).pipe(
Effect.provideService(RpcSerialization.RpcSerialization, RpcSerialization.ndjson),
@ -145,4 +152,16 @@ describe("gateway RPC socket protocol", () => {
expect(result.writes).toHaveLength(1);
expect(String(result.writes[0])).toContain("\"_tag\":\"Defect\"");
});
it("sends server responses through the registered client", async () => {
const result = await runProtocolFrames(
[],
undefined,
RpcMessage.ResponseDefectEncoded("server-boom"),
);
expect(result.clientIds).toEqual([0]);
expect(result.writes).toHaveLength(1);
expect(String(result.writes[0])).toContain("server-boom");
});
});

View file

@ -1,4 +1,7 @@
import { Effect, Queue, Scope } from "effect";
import * as MutableHashMap from "effect/MutableHashMap";
import * as MutableHashSet from "effect/MutableHashSet";
import * as O from "effect/Option";
import * as S from "effect/Schema";
import * as RpcMessage from "effect/unstable/rpc/RpcMessage";
import * as RpcSerialization from "effect/unstable/rpc/RpcSerialization";
@ -41,10 +44,10 @@ export const makeSocketRpcProtocol = Effect.gen(function* () {
const disconnects = yield* Queue.make<number>();
let nextClientId = 0;
const clients = new Map<number, {
const clients = MutableHashMap.empty<number, {
readonly write: (response: RpcMessage.FromServerEncoded) => Effect.Effect<void>;
}>();
const clientIds = new Set<number>();
const clientIds = MutableHashSet.empty<number>();
let writeRequest!: (
clientId: number,
@ -60,8 +63,8 @@ export const makeSocketRpcProtocol = Effect.gen(function* () {
const clientId = nextClientId++;
yield* Scope.addFinalizerExit(scope, () => {
clients.delete(clientId);
clientIds.delete(clientId);
MutableHashMap.remove(clients, clientId);
MutableHashSet.remove(clientIds, clientId);
return Queue.offer(disconnects, clientId);
});
@ -80,8 +83,8 @@ export const makeSocketRpcProtocol = Effect.gen(function* () {
),
);
clients.set(clientId, { write });
clientIds.add(clientId);
MutableHashMap.set(clients, clientId, { write });
MutableHashSet.add(clientIds, clientId);
yield* socket.runRaw((data) =>
Effect.try({
@ -126,13 +129,13 @@ export const makeSocketRpcProtocol = Effect.gen(function* () {
writeRequest = writeRequest_;
return Effect.succeed({
disconnects,
send: (clientId, response) => {
const client = clients.get(clientId);
if (client === undefined) return Effect.void;
return Effect.orDie(client.write(response));
},
send: (clientId, response) =>
O.match(MutableHashMap.get(clients, clientId), {
onNone: () => Effect.void,
onSome: (client) => Effect.orDie(client.write(response)),
}),
end: () => Effect.void,
clientIds: Effect.sync(() => clientIds),
clientIds: Effect.sync(() => new Set(clientIds)),
initialMessage: Effect.succeedNone,
supportsAck: true,
supportsTransferables: false,