From f7f29c4df999ca633b43e0fe0960eae41d11afa1 Mon Sep 17 00:00:00 2001 From: elpresidank Date: Tue, 2 Jun 2026 04:42:32 -0500 Subject: [PATCH] Handle gateway RPC socket causes in Effect --- ts/EFFECT_NATIVE_REWRITE_AUDIT.md | 23 ++++++++++++++++++++ ts/packages/flow/src/gateway/server.ts | 30 +++++++++++++++----------- 2 files changed, 40 insertions(+), 13 deletions(-) diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index d261ac5f..979af51f 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -134,6 +134,10 @@ Notes: `Stream.filterMapEffect`, `Result`, and `Stream.runHead`, dropping the remaining `while (` count from 3 to 2. The two remaining production `while` hits are synchronous parsing/CLI traversal loops, not async polling loops. +- The gateway RPC WebSocket cause-handling slice removed the Promise `.catch` + around the socket program by sandboxing the Effect and handling the resulting + `Cause` in the Effect pipeline before the Fastify fire-and-forget + `runPromise` boundary. - `Record` and `throwLibrarianServiceError` are now clean in `ts/packages`. @@ -962,6 +966,25 @@ Notes: - `cd ts && bun run test` - `git diff --check` +### 2026-06-02: Gateway RPC WebSocket Cause Handling Slice + +- Status: migrated and root-verified. +- Completed: + - `ts/packages/flow/src/gateway/server.ts` now handles RPC WebSocket program + defects and interruptions inside the Effect pipeline with `Effect.sandbox`, + `Effect.catch`, and `Cause.pretty`. + - The previous Promise `.catch(...)` around `Effect.runPromise(...)`, plus the + nested `Effect.runPromise` used only for logging and socket close, is removed. + - The outer `Effect.runPromise` remains as the Fastify WebSocket host boundary. +- Verification: + - `cd ts && bun run check:tsgo` + - `bun run --cwd ts/packages/flow build` + - `bun run --cwd ts/packages/flow test` + - `cd ts && bun run check` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: diff --git a/ts/packages/flow/src/gateway/server.ts b/ts/packages/flow/src/gateway/server.ts index 7c17a89a..4e405250 100644 --- a/ts/packages/flow/src/gateway/server.ts +++ b/ts/packages/flow/src/gateway/server.ts @@ -10,7 +10,7 @@ import Fastify, { type FastifyReply } from "fastify"; import websocketPlugin from "@fastify/websocket"; import { NodeRuntime } from "@effect/platform-node"; -import { Clock, Config, Effect, Exit, Layer, ManagedRuntime, Random, Scope } from "effect"; +import { Cause, Clock, Config, Effect, Exit, Layer, ManagedRuntime, Random, Scope } from "effect"; import * as O from "effect/Option"; import * as RpcSerialization from "effect/unstable/rpc/RpcSerialization"; import * as EffectSocket from "effect/unstable/socket/Socket"; @@ -208,19 +208,23 @@ export function createGateway(config: GatewayConfig) { }), ); - void Effect.runPromise(program.pipe(Scope.provide(rpcScope))).catch((error) => { - void Effect.runPromise( - Effect.logError("[Gateway] RPC WebSocket error", { error: toTgError(error).message }).pipe( - Effect.flatMap(() => - Effect.sync(() => { - if (socket.readyState === 1) { - socket.close(1011, "Internal server error"); - } - }), - ), + void Effect.runPromise( + program.pipe( + Scope.provide(rpcScope), + Effect.sandbox, + Effect.catch((cause) => + Effect.logError("[Gateway] RPC WebSocket error", { error: Cause.pretty(cause) }).pipe( + Effect.flatMap(() => + Effect.sync(() => { + if (socket.readyState === 1) { + socket.close(1011, "Internal server error"); + } + }), + ), + ) ), - ); - }); + ), + ); }); // Metrics endpoint — returns Prometheus metrics from prom-client