Validate RPC protocol frames with Schema

This commit is contained in:
elpresidank 2026-06-04 07:04:15 -05:00
parent 67b5e0dd5b
commit 5a945af345
3 changed files with 241 additions and 19 deletions

View file

@ -1124,7 +1124,8 @@ Notes:
- Remaining: - Remaining:
- `ts/packages/flow/src/gateway/rpc-protocol.ts` remains a Fastify socket - `ts/packages/flow/src/gateway/rpc-protocol.ts` remains a Fastify socket
compatibility bridge, not a direct replacement target for Effect RPC compatibility bridge, not a direct replacement target for Effect RPC
server layers yet. server layers yet. Local parser validation and collection cleanup inside
that bridge remain valid migration targets.
- Verification: - Verification:
- `bun run --cwd ts/packages/flow build` - `bun run --cwd ts/packages/flow build`
- `bun run --cwd ts/packages/client build` - `bun run --cwd ts/packages/client build`
@ -2073,6 +2074,31 @@ Notes:
- `cd ts && bun run lint` - `cd ts && bun run lint`
- `git diff --check` - `git diff --check`
### 2026-06-04: Gateway RPC Protocol Decode Slice
- Status: migrated and package-verified.
- Completed:
- `ts/packages/flow/src/gateway/rpc-protocol.ts` no longer casts
`parser.decode(data)` to `ReadonlyArray<RpcMessage.FromClientEncoded>`.
- The socket protocol now validates parser output through a local Schema for
client-to-server `Request`, `Ack`, `Interrupt`, `Ping`, and `Eof`
envelopes before calling the server protocol write hook.
- Invalid client frames and parser failures now map to a tagged
`RpcProtocolDecodeError` and are written back as encoded RPC defects.
- The existing server-response defect encoding no longer uses a non-null
assertion on `parser.encode`.
- `ts/packages/flow/src/__tests__/gateway-rpc-protocol.test.ts` covers valid
request header prepending, control-frame forwarding, and rejection of
server response envelopes received from a client socket.
- Verification:
- `cd ts/packages/flow && bunx --bun vitest run src/__tests__/gateway-rpc-protocol.test.ts`
- `cd ts/packages/flow && bunx --bun vitest run`
- `cd ts && bun run check:tsgo`
- `cd ts && bun run build`
- `cd ts && bun run test`
- `cd ts && bun run lint`
- `git diff --check`
## Subagent Findings To Preserve ## Subagent Findings To Preserve
- MCP/workbench: - MCP/workbench:
@ -2160,9 +2186,11 @@ Notes:
injected pubsub backends are not closed by the manager, one-shot producers injected pubsub backends are not closed by the manager, one-shot producers
are acquire/use/release bracketed, and serialization failures are typed are acquire/use/release bracketed, and serialization failures are typed
Effect errors. Effect errors.
- Do not make `gateway/rpc-protocol.ts` the next cleanup target: it is a - `gateway/rpc-protocol.ts` remains a Fastify socket compatibility bridge
Fastify socket compatibility bridge while the public Effect RPC server while the public Effect RPC server layers require SocketServer or Effect
layers require SocketServer or Effect HTTP routing. HTTP routing. The parser decode assertion is complete through Schema
validation; future local cleanups should focus on its connection
collections rather than replacing the whole bridge.
- WebSocket adapter host fallbacks now use `Result.try` and tagged adapter - WebSocket adapter host fallbacks now use `Result.try` and tagged adapter
errors while preserving sync exports. errors while preserving sync exports.
- RAG/providers/storage: - RAG/providers/storage:
@ -2470,9 +2498,10 @@ Do not flag these as rewrite blockers without additional proof:
`start()` now initializes scoped Effect consumers and returns after startup, `start()` now initializes scoped Effect consumers and returns after startup,
while `stop()` closes the native consumer scope. while `stop()` closes the native consumer scope.
- `ts/packages/flow/src/gateway/rpc-protocol.ts` is a Fastify socket - `ts/packages/flow/src/gateway/rpc-protocol.ts` is a Fastify socket
compatibility bridge. Do not flag its internal connection maps/sets as a compatibility bridge. Do not treat it as a wholesale server-layer replacement
standalone replacement target until the gateway is ready to move onto Effect target until the gateway is ready to move onto Effect SocketServer or Effect
SocketServer or Effect HTTP routing. HTTP routing; local parser validation and Effect collection cleanup inside
that bridge are still valid.
## Acceptance For Final Loop Completion ## Acceptance For Final Loop Completion

View file

@ -0,0 +1,148 @@
import { Effect, Queue } from "effect";
import * as O from "effect/Option";
import * as RpcMessage from "effect/unstable/rpc/RpcMessage";
import * as RpcSerialization from "effect/unstable/rpc/RpcSerialization";
import * as Socket from "effect/unstable/socket/Socket";
import { describe, expect, it } from "vitest";
import { makeSocketRpcProtocol } from "../gateway/rpc-protocol.js";
interface ReceivedMessage {
readonly clientId: number;
readonly message: RpcMessage.FromClientEncoded;
}
interface ProtocolRunResult {
readonly messages: ReadonlyArray<ReceivedMessage>;
readonly writes: ReadonlyArray<string | Uint8Array | CloseEvent>;
}
const jsonFrame = (value: unknown): string => `${JSON.stringify(value)}\n`;
const optionToArray = <A>(value: O.Option<A>): Array<A> =>
O.match(value, {
onNone: () => [],
onSome: (item) => [item],
});
const runProtocolFrames = (
frames: ReadonlyArray<string | Uint8Array>,
headers?: ReadonlyArray<[string, string]>,
): Promise<ProtocolRunResult> =>
Effect.runPromise(
Effect.scoped(
Effect.gen(function* () {
const received = yield* Queue.unbounded<ReceivedMessage>();
const writes: Array<string | Uint8Array | CloseEvent> = [];
const { onSocket, protocol } = yield* makeSocketRpcProtocol;
yield* protocol.run((clientId, message) =>
Queue.offer(received, { clientId, message }).pipe(Effect.asVoid)
).pipe(Effect.forkScoped);
yield* Effect.yieldNow;
const socket = Socket.make({
writer: Effect.succeed((chunk) =>
Effect.sync(() => {
writes.push(chunk);
})
),
runRaw: (handler) =>
Effect.forEach(frames, (frame) =>
Effect.suspend(() => {
const result = handler(frame);
return result === undefined ? Effect.void : result;
}), { discard: true }),
});
yield* onSocket(socket, headers);
yield* Effect.yieldNow;
const first = yield* Queue.poll(received);
const second = yield* Queue.poll(received);
const third = yield* Queue.poll(received);
return {
messages: [
...optionToArray(first),
...optionToArray(second),
...optionToArray(third),
],
writes,
};
}).pipe(
Effect.provideService(RpcSerialization.RpcSerialization, RpcSerialization.ndjson),
),
),
);
describe("gateway RPC socket protocol", () => {
it("validates client request frames and prepends websocket headers", async () => {
const result = await runProtocolFrames([
jsonFrame({
_tag: "Request",
id: "1",
tag: "Dispatch",
payload: {
scope: "global",
service: "config",
request: {},
},
headers: [["rpc", "client"]],
}),
], [["socket", "header"]]);
expect(result.writes).toEqual([]);
expect(result.messages).toHaveLength(1);
const received = result.messages[0];
expect(received).toBeDefined();
if (received === undefined) return;
expect(received.clientId).toBe(0);
expect(received.message._tag).toBe("Request");
if (received.message._tag !== "Request") return;
expect(received.message.id).toBe("1");
expect(received.message.tag).toBe("Dispatch");
expect(received.message.headers).toEqual([
["socket", "header"],
["rpc", "client"],
]);
});
it("validates client control frames without mutating them", async () => {
const result = await runProtocolFrames([
jsonFrame({
_tag: "Ping",
}),
jsonFrame({
_tag: "Ack",
requestId: "1",
}),
]);
expect(result.writes).toEqual([]);
expect(result.messages.map(({ message }) => message._tag)).toEqual(["Ping", "Ack"]);
expect(result.messages[1]?.message).toEqual({
_tag: "Ack",
requestId: "1",
});
});
it("rejects server response envelopes received from the socket", async () => {
const result = await runProtocolFrames([
jsonFrame({
_tag: "Exit",
requestId: "1",
exit: {
_tag: "Success",
value: { ok: true },
},
}),
]);
expect(result.messages).toEqual([]);
expect(result.writes).toHaveLength(1);
expect(String(result.writes[0])).toContain("\"_tag\":\"Defect\"");
});
});

View file

@ -1,9 +1,41 @@
import { Effect, Queue, Scope } from "effect"; import { Effect, Queue, Scope } from "effect";
import * as S from "effect/Schema";
import * as RpcMessage from "effect/unstable/rpc/RpcMessage"; import * as RpcMessage from "effect/unstable/rpc/RpcMessage";
import * as RpcSerialization from "effect/unstable/rpc/RpcSerialization"; import * as RpcSerialization from "effect/unstable/rpc/RpcSerialization";
import * as RpcServer from "effect/unstable/rpc/RpcServer"; import * as RpcServer from "effect/unstable/rpc/RpcServer";
import * as Socket from "effect/unstable/socket/Socket"; import * as Socket from "effect/unstable/socket/Socket";
export class RpcProtocolDecodeError extends S.TaggedErrorClass<RpcProtocolDecodeError>()(
"RpcProtocolDecodeError",
{
message: S.String,
cause: S.Unknown,
},
) {}
const HeaderSchema = S.mutable(S.Tuple([S.String, S.String]));
const FromClientEncodedSchema = S.Union([
S.TaggedStruct("Request", {
id: S.String,
tag: S.String,
payload: S.Unknown,
headers: S.Array(HeaderSchema),
traceId: S.optionalKey(S.String),
spanId: S.optionalKey(S.String),
sampled: S.optionalKey(S.Boolean),
}),
S.TaggedStruct("Ack", {
requestId: S.String,
}),
S.TaggedStruct("Interrupt", {
requestId: S.String,
}),
S.TaggedStruct("Ping", {}),
S.TaggedStruct("Eof", {}),
]).pipe(S.toTaggedUnion("_tag"));
const FromClientEncodedMessagesSchema = S.Array(FromClientEncodedSchema);
const decodeFromClientMessages = S.decodeUnknownEffect(FromClientEncodedMessagesSchema);
export const makeSocketRpcProtocol = Effect.gen(function* () { export const makeSocketRpcProtocol = Effect.gen(function* () {
const serialization = yield* RpcSerialization.RpcSerialization; const serialization = yield* RpcSerialization.RpcSerialization;
const disconnects = yield* Queue.make<number>(); const disconnects = yield* Queue.make<number>();
@ -34,18 +66,17 @@ export const makeSocketRpcProtocol = Effect.gen(function* () {
}); });
const writeRaw = yield* socket.writer; const writeRaw = yield* socket.writer;
const encodeDefect = (cause: unknown) => const writeDefect = (cause: unknown) =>
Effect.sync(() => parser.encode(RpcMessage.ResponseDefectEncoded(cause))!); Effect.sync(() => parser.encode(RpcMessage.ResponseDefectEncoded(cause))).pipe(
Effect.flatMap((encoded) => encoded === undefined ? Effect.void : writeRaw(encoded)),
);
const write = (response: RpcMessage.FromServerEncoded) => const write = (response: RpcMessage.FromServerEncoded) =>
Effect.sync(() => parser.encode(response)).pipe( Effect.sync(() => parser.encode(response)).pipe(
Effect.flatMap((encoded) => Effect.flatMap((encoded) =>
encoded === undefined ? Effect.void : Effect.orDie(writeRaw(encoded)), encoded === undefined ? Effect.void : Effect.orDie(writeRaw(encoded)),
), ),
Effect.catchDefect((cause: unknown) => Effect.catchDefect((cause: unknown) =>
encodeDefect(cause).pipe( writeDefect(cause).pipe(Effect.orDie),
Effect.flatMap((encoded) => Effect.orDie(writeRaw(encoded))),
Effect.orDie,
),
), ),
); );
@ -53,7 +84,24 @@ export const makeSocketRpcProtocol = Effect.gen(function* () {
clientIds.add(clientId); clientIds.add(clientId);
yield* socket.runRaw((data) => yield* socket.runRaw((data) =>
Effect.sync(() => parser.decode(data) as ReadonlyArray<RpcMessage.FromClientEncoded>).pipe( Effect.try({
try: () => parser.decode(data),
catch: (cause) =>
RpcProtocolDecodeError.make({
message: "Failed to decode RPC socket frame",
cause,
}),
}).pipe(
Effect.flatMap((raw) =>
decodeFromClientMessages(raw).pipe(
Effect.mapError((cause) =>
RpcProtocolDecodeError.make({
message: "RPC socket frame did not contain valid client messages",
cause,
})
),
)
),
Effect.flatMap((decoded) => Effect.flatMap((decoded) =>
Effect.forEach(decoded, (message) => { Effect.forEach(decoded, (message) => {
if (message._tag === "Request" && headers !== undefined) { if (message._tag === "Request" && headers !== undefined) {
@ -65,11 +113,8 @@ export const makeSocketRpcProtocol = Effect.gen(function* () {
return writeRequest(clientId, message); return writeRequest(clientId, message);
}, { discard: true }), }, { discard: true }),
), ),
Effect.catchDefect((cause: unknown) => Effect.catch((cause) => writeDefect(cause)),
encodeDefect(cause).pipe( Effect.catchDefect((cause: unknown) => writeDefect(cause)),
Effect.flatMap((encoded) => writeRaw(encoded)),
),
),
) )
).pipe( ).pipe(
Effect.catchReason("SocketError", "SocketCloseError", () => Effect.void), Effect.catchReason("SocketError", "SocketCloseError", () => Effect.void),