Handle gateway RPC socket causes in Effect

This commit is contained in:
elpresidank 2026-06-02 04:42:32 -05:00
parent b922426b56
commit f7f29c4df9
2 changed files with 40 additions and 13 deletions

View file

@ -134,6 +134,10 @@ Notes:
`Stream.filterMapEffect`, `Result`, and `Stream.runHead`, dropping the
remaining `while (` count from 3 to 2. The two remaining production `while`
hits are synchronous parsing/CLI traversal loops, not async polling loops.
- The gateway RPC WebSocket cause-handling slice removed the Promise `.catch`
around the socket program by sandboxing the Effect and handling the resulting
`Cause` in the Effect pipeline before the Fastify fire-and-forget
`runPromise` boundary.
- `Record<string, any>` and `throwLibrarianServiceError` are now clean in
`ts/packages`.
@ -962,6 +966,25 @@ Notes:
- `cd ts && bun run test`
- `git diff --check`
### 2026-06-02: Gateway RPC WebSocket Cause Handling Slice
- Status: migrated and root-verified.
- Completed:
- `ts/packages/flow/src/gateway/server.ts` now handles RPC WebSocket program
defects and interruptions inside the Effect pipeline with `Effect.sandbox`,
`Effect.catch`, and `Cause.pretty`.
- The previous Promise `.catch(...)` around `Effect.runPromise(...)`, plus the
nested `Effect.runPromise` used only for logging and socket close, is removed.
- The outer `Effect.runPromise` remains as the Fastify WebSocket host boundary.
- Verification:
- `cd ts && bun run check:tsgo`
- `bun run --cwd ts/packages/flow build`
- `bun run --cwd ts/packages/flow test`
- `cd ts && bun run check`
- `cd ts && bun run build`
- `cd ts && bun run test`
- `git diff --check`
## Subagent Findings To Preserve
- MCP/workbench:

View file

@ -10,7 +10,7 @@
import Fastify, { type FastifyReply } from "fastify";
import websocketPlugin from "@fastify/websocket";
import { NodeRuntime } from "@effect/platform-node";
import { Clock, Config, Effect, Exit, Layer, ManagedRuntime, Random, Scope } from "effect";
import { Cause, Clock, Config, Effect, Exit, Layer, ManagedRuntime, Random, Scope } from "effect";
import * as O from "effect/Option";
import * as RpcSerialization from "effect/unstable/rpc/RpcSerialization";
import * as EffectSocket from "effect/unstable/socket/Socket";
@ -208,19 +208,23 @@ export function createGateway(config: GatewayConfig) {
}),
);
void Effect.runPromise(program.pipe(Scope.provide(rpcScope))).catch((error) => {
void Effect.runPromise(
Effect.logError("[Gateway] RPC WebSocket error", { error: toTgError(error).message }).pipe(
Effect.flatMap(() =>
Effect.sync(() => {
if (socket.readyState === 1) {
socket.close(1011, "Internal server error");
}
}),
),
void Effect.runPromise(
program.pipe(
Scope.provide(rpcScope),
Effect.sandbox,
Effect.catch((cause) =>
Effect.logError("[Gateway] RPC WebSocket error", { error: Cause.pretty(cause) }).pipe(
Effect.flatMap(() =>
Effect.sync(() => {
if (socket.readyState === 1) {
socket.close(1011, "Internal server error");
}
}),
),
)
),
);
});
),
);
});
// Metrics endpoint — returns Prometheus metrics from prom-client