diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index ff155fec..1d9f398d 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -1124,7 +1124,8 @@ Notes: - Remaining: - `ts/packages/flow/src/gateway/rpc-protocol.ts` remains a Fastify socket compatibility bridge, not a direct replacement target for Effect RPC - server layers yet. + server layers yet. Local parser validation and collection cleanup inside + that bridge remain valid migration targets. - Verification: - `bun run --cwd ts/packages/flow build` - `bun run --cwd ts/packages/client build` @@ -2073,6 +2074,31 @@ Notes: - `cd ts && bun run lint` - `git diff --check` +### 2026-06-04: Gateway RPC Protocol Decode Slice + +- Status: migrated and package-verified. +- Completed: + - `ts/packages/flow/src/gateway/rpc-protocol.ts` no longer casts + `parser.decode(data)` to `ReadonlyArray`. + - The socket protocol now validates parser output through a local Schema for + client-to-server `Request`, `Ack`, `Interrupt`, `Ping`, and `Eof` + envelopes before calling the server protocol write hook. + - Invalid client frames and parser failures now map to a tagged + `RpcProtocolDecodeError` and are written back as encoded RPC defects. + - The existing server-response defect encoding no longer uses a non-null + assertion on `parser.encode`. + - `ts/packages/flow/src/__tests__/gateway-rpc-protocol.test.ts` covers valid + request header prepending, control-frame forwarding, and rejection of + server response envelopes received from a client socket. +- Verification: + - `cd ts/packages/flow && bunx --bun vitest run src/__tests__/gateway-rpc-protocol.test.ts` + - `cd ts/packages/flow && bunx --bun vitest run` + - `cd ts && bun run check:tsgo` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `cd ts && bun run lint` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: @@ -2160,9 +2186,11 @@ Notes: injected pubsub backends are not closed by the manager, one-shot producers are acquire/use/release bracketed, and serialization failures are typed Effect errors. - - Do not make `gateway/rpc-protocol.ts` the next cleanup target: it is a - Fastify socket compatibility bridge while the public Effect RPC server - layers require SocketServer or Effect HTTP routing. + - `gateway/rpc-protocol.ts` remains a Fastify socket compatibility bridge + while the public Effect RPC server layers require SocketServer or Effect + HTTP routing. The parser decode assertion is complete through Schema + validation; future local cleanups should focus on its connection + collections rather than replacing the whole bridge. - WebSocket adapter host fallbacks now use `Result.try` and tagged adapter errors while preserving sync exports. - RAG/providers/storage: @@ -2470,9 +2498,10 @@ Do not flag these as rewrite blockers without additional proof: `start()` now initializes scoped Effect consumers and returns after startup, while `stop()` closes the native consumer scope. - `ts/packages/flow/src/gateway/rpc-protocol.ts` is a Fastify socket - compatibility bridge. Do not flag its internal connection maps/sets as a - standalone replacement target until the gateway is ready to move onto Effect - SocketServer or Effect HTTP routing. + compatibility bridge. Do not treat it as a wholesale server-layer replacement + target until the gateway is ready to move onto Effect SocketServer or Effect + HTTP routing; local parser validation and Effect collection cleanup inside + that bridge are still valid. ## Acceptance For Final Loop Completion diff --git a/ts/packages/flow/src/__tests__/gateway-rpc-protocol.test.ts b/ts/packages/flow/src/__tests__/gateway-rpc-protocol.test.ts new file mode 100644 index 00000000..4081c321 --- /dev/null +++ b/ts/packages/flow/src/__tests__/gateway-rpc-protocol.test.ts @@ -0,0 +1,148 @@ +import { Effect, Queue } from "effect"; +import * as O from "effect/Option"; +import * as RpcMessage from "effect/unstable/rpc/RpcMessage"; +import * as RpcSerialization from "effect/unstable/rpc/RpcSerialization"; +import * as Socket from "effect/unstable/socket/Socket"; +import { describe, expect, it } from "vitest"; +import { makeSocketRpcProtocol } from "../gateway/rpc-protocol.js"; + +interface ReceivedMessage { + readonly clientId: number; + readonly message: RpcMessage.FromClientEncoded; +} + +interface ProtocolRunResult { + readonly messages: ReadonlyArray; + readonly writes: ReadonlyArray; +} + +const jsonFrame = (value: unknown): string => `${JSON.stringify(value)}\n`; + +const optionToArray = (value: O.Option): Array => + O.match(value, { + onNone: () => [], + onSome: (item) => [item], + }); + +const runProtocolFrames = ( + frames: ReadonlyArray, + headers?: ReadonlyArray<[string, string]>, +): Promise => + Effect.runPromise( + Effect.scoped( + Effect.gen(function* () { + const received = yield* Queue.unbounded(); + const writes: Array = []; + const { onSocket, protocol } = yield* makeSocketRpcProtocol; + + yield* protocol.run((clientId, message) => + Queue.offer(received, { clientId, message }).pipe(Effect.asVoid) + ).pipe(Effect.forkScoped); + yield* Effect.yieldNow; + + const socket = Socket.make({ + writer: Effect.succeed((chunk) => + Effect.sync(() => { + writes.push(chunk); + }) + ), + runRaw: (handler) => + Effect.forEach(frames, (frame) => + Effect.suspend(() => { + const result = handler(frame); + return result === undefined ? Effect.void : result; + }), { discard: true }), + }); + + yield* onSocket(socket, headers); + yield* Effect.yieldNow; + + const first = yield* Queue.poll(received); + const second = yield* Queue.poll(received); + const third = yield* Queue.poll(received); + + return { + messages: [ + ...optionToArray(first), + ...optionToArray(second), + ...optionToArray(third), + ], + writes, + }; + }).pipe( + Effect.provideService(RpcSerialization.RpcSerialization, RpcSerialization.ndjson), + ), + ), + ); + +describe("gateway RPC socket protocol", () => { + it("validates client request frames and prepends websocket headers", async () => { + const result = await runProtocolFrames([ + jsonFrame({ + _tag: "Request", + id: "1", + tag: "Dispatch", + payload: { + scope: "global", + service: "config", + request: {}, + }, + headers: [["rpc", "client"]], + }), + ], [["socket", "header"]]); + + expect(result.writes).toEqual([]); + expect(result.messages).toHaveLength(1); + + const received = result.messages[0]; + expect(received).toBeDefined(); + if (received === undefined) return; + + expect(received.clientId).toBe(0); + expect(received.message._tag).toBe("Request"); + if (received.message._tag !== "Request") return; + + expect(received.message.id).toBe("1"); + expect(received.message.tag).toBe("Dispatch"); + expect(received.message.headers).toEqual([ + ["socket", "header"], + ["rpc", "client"], + ]); + }); + + it("validates client control frames without mutating them", async () => { + const result = await runProtocolFrames([ + jsonFrame({ + _tag: "Ping", + }), + jsonFrame({ + _tag: "Ack", + requestId: "1", + }), + ]); + + expect(result.writes).toEqual([]); + expect(result.messages.map(({ message }) => message._tag)).toEqual(["Ping", "Ack"]); + expect(result.messages[1]?.message).toEqual({ + _tag: "Ack", + requestId: "1", + }); + }); + + it("rejects server response envelopes received from the socket", async () => { + const result = await runProtocolFrames([ + jsonFrame({ + _tag: "Exit", + requestId: "1", + exit: { + _tag: "Success", + value: { ok: true }, + }, + }), + ]); + + expect(result.messages).toEqual([]); + expect(result.writes).toHaveLength(1); + expect(String(result.writes[0])).toContain("\"_tag\":\"Defect\""); + }); +}); diff --git a/ts/packages/flow/src/gateway/rpc-protocol.ts b/ts/packages/flow/src/gateway/rpc-protocol.ts index c037ea86..54240291 100644 --- a/ts/packages/flow/src/gateway/rpc-protocol.ts +++ b/ts/packages/flow/src/gateway/rpc-protocol.ts @@ -1,9 +1,41 @@ import { Effect, Queue, Scope } from "effect"; +import * as S from "effect/Schema"; import * as RpcMessage from "effect/unstable/rpc/RpcMessage"; import * as RpcSerialization from "effect/unstable/rpc/RpcSerialization"; import * as RpcServer from "effect/unstable/rpc/RpcServer"; import * as Socket from "effect/unstable/socket/Socket"; +export class RpcProtocolDecodeError extends S.TaggedErrorClass()( + "RpcProtocolDecodeError", + { + message: S.String, + cause: S.Unknown, + }, +) {} + +const HeaderSchema = S.mutable(S.Tuple([S.String, S.String])); +const FromClientEncodedSchema = S.Union([ + S.TaggedStruct("Request", { + id: S.String, + tag: S.String, + payload: S.Unknown, + headers: S.Array(HeaderSchema), + traceId: S.optionalKey(S.String), + spanId: S.optionalKey(S.String), + sampled: S.optionalKey(S.Boolean), + }), + S.TaggedStruct("Ack", { + requestId: S.String, + }), + S.TaggedStruct("Interrupt", { + requestId: S.String, + }), + S.TaggedStruct("Ping", {}), + S.TaggedStruct("Eof", {}), +]).pipe(S.toTaggedUnion("_tag")); +const FromClientEncodedMessagesSchema = S.Array(FromClientEncodedSchema); +const decodeFromClientMessages = S.decodeUnknownEffect(FromClientEncodedMessagesSchema); + export const makeSocketRpcProtocol = Effect.gen(function* () { const serialization = yield* RpcSerialization.RpcSerialization; const disconnects = yield* Queue.make(); @@ -34,18 +66,17 @@ export const makeSocketRpcProtocol = Effect.gen(function* () { }); const writeRaw = yield* socket.writer; - const encodeDefect = (cause: unknown) => - Effect.sync(() => parser.encode(RpcMessage.ResponseDefectEncoded(cause))!); + const writeDefect = (cause: unknown) => + Effect.sync(() => parser.encode(RpcMessage.ResponseDefectEncoded(cause))).pipe( + Effect.flatMap((encoded) => encoded === undefined ? Effect.void : writeRaw(encoded)), + ); const write = (response: RpcMessage.FromServerEncoded) => Effect.sync(() => parser.encode(response)).pipe( Effect.flatMap((encoded) => encoded === undefined ? Effect.void : Effect.orDie(writeRaw(encoded)), ), Effect.catchDefect((cause: unknown) => - encodeDefect(cause).pipe( - Effect.flatMap((encoded) => Effect.orDie(writeRaw(encoded))), - Effect.orDie, - ), + writeDefect(cause).pipe(Effect.orDie), ), ); @@ -53,7 +84,24 @@ export const makeSocketRpcProtocol = Effect.gen(function* () { clientIds.add(clientId); yield* socket.runRaw((data) => - Effect.sync(() => parser.decode(data) as ReadonlyArray).pipe( + Effect.try({ + try: () => parser.decode(data), + catch: (cause) => + RpcProtocolDecodeError.make({ + message: "Failed to decode RPC socket frame", + cause, + }), + }).pipe( + Effect.flatMap((raw) => + decodeFromClientMessages(raw).pipe( + Effect.mapError((cause) => + RpcProtocolDecodeError.make({ + message: "RPC socket frame did not contain valid client messages", + cause, + }) + ), + ) + ), Effect.flatMap((decoded) => Effect.forEach(decoded, (message) => { if (message._tag === "Request" && headers !== undefined) { @@ -65,11 +113,8 @@ export const makeSocketRpcProtocol = Effect.gen(function* () { return writeRequest(clientId, message); }, { discard: true }), ), - Effect.catchDefect((cause: unknown) => - encodeDefect(cause).pipe( - Effect.flatMap((encoded) => writeRaw(encoded)), - ), - ), + Effect.catch((cause) => writeDefect(cause)), + Effect.catchDefect((cause: unknown) => writeDefect(cause)), ) ).pipe( Effect.catchReason("SocketError", "SocketCloseError", () => Effect.void),