From 89ef3dbbbff2d36d565a9ad54e430cbf34c114bf Mon Sep 17 00:00:00 2001 From: elpresidank Date: Tue, 2 Jun 2026 05:14:58 -0500 Subject: [PATCH] Harden gateway dispatcher effects --- ts/EFFECT_NATIVE_REWRITE_AUDIT.md | 67 ++++++++++------- .../src/__tests__/gateway-dispatcher.test.ts | 49 +++++++++++- .../flow/src/gateway/dispatch/manager.ts | 75 +++++++++++-------- .../flow/src/gateway/dispatch/serialize.ts | 35 ++++++++- 4 files changed, 164 insertions(+), 62 deletions(-) diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 89fd4302..84a493fb 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -12,8 +12,8 @@ Verified source roots: - Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4` - Installed Effect beta used by this workspace: `ts/node_modules/effect` -Current signal counts from `ts/packages` after the 2026-06-02 text completion -provider effectful layer slice: +Current signal counts from `ts/packages` after the 2026-06-02 gateway dispatcher +ownership and serialization slice: | Signal | Count | | --- | ---: | @@ -157,6 +157,11 @@ Notes: `makeTextCompletionLayer(makeXProviderEffect(config))`. SDK construction and config lookup now live in Effect; sync `makeXProvider` exports remain compatibility facades. +- The gateway dispatcher ownership and serialization slice did not change broad + signal counts. It stopped closing injected pubsub backends, brackets + one-shot publish producers with `Effect.acquireUseRelease`, and routes + gateway request/response translation through `Effect.try` wrappers returning + tagged `DispatchSerializationError` failures. - `Record` and `throwLibrarianServiceError` are now clean in `ts/packages`. @@ -1098,6 +1103,31 @@ Notes: - `cd ts && bun run test` - `git diff --check` +### 2026-06-02: Gateway Dispatcher Ownership And Serialization Slice + +- Status: migrated and root-verified. +- Completed: + - `makeDispatcherManager` now tracks whether it owns the pubsub backend and + no longer closes injected `PubSubBackend` instances on `stop()`. + - `publishToTopic` now uses `Effect.acquireUseRelease` so the one-shot + producer is closed even when `send` fails. + - Gateway dispatch paths now call `translateRequestEffect` and + `translateResponseEffect`, which wrap serialization with `Effect.try` and + return tagged `DispatchSerializationError` failures. + - Streaming dispatch recipients are named `Effect.fn` callbacks, satisfying + strict Effect diagnostics while preserving responder behavior. + - Tests cover injected backend ownership, typed serialization failure before + requestor startup, and producer close on send failure. +- Verification: + - `bunx --bun vitest run src/__tests__/gateway-dispatcher.test.ts` + - `bun run --cwd ts/packages/flow build` + - `bun run --cwd ts/packages/flow test` + - `cd ts && bun run check:tsgo` + - `cd ts && bun run check` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: @@ -1140,6 +1170,10 @@ Notes: callbacks instead of nested `Effect.runPromiseWith`, and client streaming facade callbacks now decode the legacy envelope through Schema before applying service-specific public callback semantics. + - Gateway dispatcher ownership and serialization cleanup is complete: + injected pubsub backends are not closed by the manager, one-shot producers + are acquire/use/release bracketed, and serialization failures are typed + Effect errors. - Do not make `gateway/rpc-protocol.ts` the next cleanup target: it is a Fastify socket compatibility bridge while the public Effect RPC server layers require SocketServer or Effect HTTP routing. @@ -1192,28 +1226,6 @@ Notes: - Fake backend ack/nak/backoff/stop tests, NATS close finalizer tests, and config-push stream tests. -### P1: Gateway Dispatcher Ownership And Serialization - -- TrustGraph evidence: - - `ts/packages/flow/src/gateway/dispatch/manager.ts` - - `ts/packages/flow/src/gateway/dispatch/serialize.ts` - - `ts/packages/flow/src/gateway/server.ts` -- Effect primitives: - - `Layer`, `Scope`, `Effect.acquireUseRelease`, `Effect.try`, `Result.try`, - and typed dispatch errors. -- Rewrite shape: - - Track whether the dispatcher owns `PubSubBackend` so injected backends are - not closed. - - Use `Effect.acquireUseRelease` for one-shot gateway producers so producer - close runs even when send fails. - - Replace throwing gateway serialization helpers with Effect/Result-returning - helpers mapped to typed dispatch or wire errors. - - Longer term, move `createGateway` to a scoped `createGatewayEffect` while - keeping Fastify route `Effect.runPromise` calls as host boundaries. -- Tests: - - Injected pubsub is not closed, one-shot producer closes on send failure, - and malformed gateway payloads return typed dispatch errors. - ### P2: Effect AI Provider Adapter Cleanup - TrustGraph evidence: @@ -1265,10 +1277,9 @@ Notes: ## Recommended PR Order -1. Gateway dispatcher ownership and serialization. -2. Broker backend Effect-native runtime. -3. Effect AI provider adapter cleanup. -4. MCP parity/deletion decision and workbench platform polish. +1. Broker backend Effect-native runtime. +2. Effect AI provider adapter cleanup. +3. MCP parity/deletion decision and workbench platform polish. ## No-Op Rules diff --git a/ts/packages/flow/src/__tests__/gateway-dispatcher.test.ts b/ts/packages/flow/src/__tests__/gateway-dispatcher.test.ts index 4f0b597d..e12d1c80 100644 --- a/ts/packages/flow/src/__tests__/gateway-dispatcher.test.ts +++ b/ts/packages/flow/src/__tests__/gateway-dispatcher.test.ts @@ -96,6 +96,7 @@ class DispatchBackend implements PubSubBackend { readonly consumerOptions: CreateConsumerOptions[] = []; readonly producersByTopic = new Map>(); readonly consumersByTopic = new Map>(); + readonly failSendTopics = new Set(); async createProducer(options: CreateProducerOptions): Promise> { this.producerOptions.push(options); @@ -124,6 +125,10 @@ class DispatchBackend implements PubSubBackend { } private handleSend(topic: string, message: unknown, properties?: Record): void { + if (this.failSendTopics.has(topic)) { + throw "send failed"; + } + const id = properties?.id ?? ""; if (topic === "tg.flow.config-request") { this.push("tg.flow.config-response", { ok: true, echo: message }, id); @@ -167,7 +172,49 @@ describe("gateway dispatcher manager", () => { expect(backend.consumerOptions.filter((options) => options.topic === "tg.flow.config-response")).toHaveLength(1); expect(backend.producersByTopic.get("tg.flow.config-request")?.closeCount).toBe(1); expect(backend.consumersByTopic.get("tg.flow.config-response")?.closeCount).toBe(1); - expect(backend.closeCount).toBe(1); + expect(backend.closeCount).toBe(0); + }); + + it("does not start requestors when request serialization fails", async () => { + const backend = new DispatchBackend(); + const manager = makeDispatcherManager({ + port: 0, + metricsPort: 0, + pubsub: backend, + }); + + await expect( + manager.dispatchGlobalService("knowledge", { term: { t: "t" } }), + ).rejects.toMatchObject({ + _tag: "DispatchSerializationError", + operation: "client-term-to-internal", + }); + await manager.stop(); + + expect(backend.producerOptions).toHaveLength(0); + expect(backend.consumerOptions).toHaveLength(0); + expect(backend.closeCount).toBe(0); + }); + + it("closes one-shot publish producers when send fails", async () => { + const backend = new DispatchBackend(); + backend.failSendTopics.add("tg.flow.ingest"); + const manager = makeDispatcherManager({ + port: 0, + metricsPort: 0, + pubsub: backend, + }); + + await expect( + manager.publishToTopic("tg.flow.ingest", { text: "hello" }, "msg-1"), + ).rejects.toMatchObject({ + _tag: "MessagingDeliveryError", + operation: "send", + }); + await manager.stop(); + + expect(backend.producersByTopic.get("tg.flow.ingest")?.closeCount).toBe(1); + expect(backend.closeCount).toBe(0); }); it("streams responses until the centralized completion predicate is true", async () => { diff --git a/ts/packages/flow/src/gateway/dispatch/manager.ts b/ts/packages/flow/src/gateway/dispatch/manager.ts index 5dda5d89..e31339b0 100644 --- a/ts/packages/flow/src/gateway/dispatch/manager.ts +++ b/ts/packages/flow/src/gateway/dispatch/manager.ts @@ -25,7 +25,11 @@ import { type RequestResponseFactoryService, } from "@trustgraph/base"; import type { GatewayConfig } from "../server.js"; -import { translateRequest, translateResponse } from "./serialize.js"; +import { + translateRequestEffect, + translateResponseEffect, + type DispatchSerializationError, +} from "./serialize.js"; export type Responder = (response: unknown, complete: boolean) => Promise; export type EffectResponder = ( @@ -37,6 +41,7 @@ export type DispatcherStreamError = | MessagingLifecycleError | MessagingDeliveryError | MessagingTimeoutError + | DispatchSerializationError | E; // ---------- Service registry ---------- @@ -169,6 +174,7 @@ interface DispatcherRuntime { export function makeDispatcherManager(config: GatewayConfig): DispatcherManager { const pubsub: PubSubBackend = config.pubsub ?? makeNatsBackend(config.natsUrl ?? "nats://localhost:4222"); + const ownsPubSub = config.pubsub === undefined; let runtime: DispatcherRuntime | null = null; const startEffect = Effect.fn("DispatcherManager.start")(function* () { @@ -207,10 +213,12 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager yield* Scope.close(current.scope, Exit.void); } - yield* Effect.tryPromise({ - try: () => pubsub.close(), - catch: (cause) => messagingLifecycleError("gateway-dispatcher", "close-pubsub", cause), - }); + if (ownsPubSub) { + yield* Effect.tryPromise({ + try: () => pubsub.close(), + catch: (cause) => messagingLifecycleError("gateway-dispatcher", "close-pubsub", cause), + }); + } }); const stop = (): Promise => Effect.runPromise(stopEffect()); @@ -306,11 +314,11 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager request: Record, ) { const { requestTopic, responseTopic } = resolveGlobalTopics(kind); + const translated = yield* translateRequestEffect(kind, request); const rr = yield* getRequestorEffect(requestTopic, responseTopic, `global:${kind}`); - const translated = translateRequest(kind, request); const response = yield* rr.request(translated); - return translateResponse(kind, response); + return yield* translateResponseEffect(kind, response); }); const dispatchGlobalServiceStreamingEffect = Effect.fn("DispatcherManager.dispatchGlobalServiceStreaming")(function* < @@ -322,15 +330,15 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager responder: EffectResponder, ) { const { requestTopic, responseTopic } = resolveGlobalTopics(kind); + const translated = yield* translateRequestEffect(kind, request); const rr = yield* getRequestorEffect(requestTopic, responseTopic, `global:${kind}`); - const translated = translateRequest(kind, request); yield* rr.request(translated, { - recipient: (response) => { - const translatedRes = translateResponse(kind, response); + recipient: Effect.fn("DispatcherManager.dispatchGlobalServiceStreaming.recipient")(function* (response) { + const translatedRes = yield* translateResponseEffect(kind, response); const complete = dispatcherManagerIsCompleteResponse(translatedRes); - return responder(translatedRes, complete).pipe(Effect.as(complete)); - }, + return yield* responder(translatedRes, complete).pipe(Effect.as(complete)); + }), }); }); @@ -367,15 +375,15 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager request: Record, ) { const { requestTopic, responseTopic } = resolveFlowTopics(kind); + const translated = yield* translateRequestEffect(kind, request); const rr = yield* getRequestorEffect( requestTopic, responseTopic, `flow:${flow}:${kind}`, ); - const translated = translateRequest(kind, request); const response = yield* rr.request(translated); - return translateResponse(kind, response); + return yield* translateResponseEffect(kind, response); }); const dispatchFlowServiceStreamingEffect = Effect.fn("DispatcherManager.dispatchFlowServiceStreaming")(function* < @@ -388,19 +396,19 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager responder: EffectResponder, ) { const { requestTopic, responseTopic } = resolveFlowTopics(kind); + const translated = yield* translateRequestEffect(kind, request); const rr = yield* getRequestorEffect( requestTopic, responseTopic, `flow:${flow}:${kind}`, ); - const translated = translateRequest(kind, request); yield* rr.request(translated, { - recipient: (response) => { - const translatedRes = translateResponse(kind, response); + recipient: Effect.fn("DispatcherManager.dispatchFlowServiceStreaming.recipient")(function* (response) { + const translatedRes = yield* translateResponseEffect(kind, response); const complete = dispatcherManagerIsCompleteResponse(translatedRes); - return responder(translatedRes, complete).pipe(Effect.as(complete)); - }, + return yield* responder(translatedRes, complete).pipe(Effect.as(complete)); + }), }); }); @@ -431,24 +439,27 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager */ const publishToTopic = (topic: string, message: unknown, id?: string): Promise => Effect.runPromise( - Effect.gen(function* () { - const producer = yield* Effect.tryPromise({ + Effect.acquireUseRelease( + Effect.tryPromise({ try: () => pubsub.createProducer({ topic }), catch: (cause) => messagingDeliveryError(topic, "create-producer", cause), - }); - const timestamp = yield* Clock.currentTimeMillis; - const suffix = yield* Random.nextIntBetween(0, 36 ** 6, { halfOpen: true }); - const messageId = id ?? `pub-${timestamp}-${suffix.toString(36).padStart(6, "0")}`; + }), + (producer) => + Effect.gen(function* () { + const timestamp = yield* Clock.currentTimeMillis; + const suffix = yield* Random.nextIntBetween(0, 36 ** 6, { halfOpen: true }); + const messageId = id ?? `pub-${timestamp}-${suffix.toString(36).padStart(6, "0")}`; - yield* Effect.tryPromise({ - try: () => producer.send(message, { id: messageId }), - catch: (cause) => messagingDeliveryError(topic, "send", cause), - }); - yield* Effect.tryPromise({ + yield* Effect.tryPromise({ + try: () => producer.send(message, { id: messageId }), + catch: (cause) => messagingDeliveryError(topic, "send", cause), + }); + }), + (producer) => Effect.tryPromise({ try: () => producer.close(), catch: (cause) => messagingDeliveryError(topic, "close-producer", cause), - }); - }), + }), + ), ); return { diff --git a/ts/packages/flow/src/gateway/dispatch/serialize.ts b/ts/packages/flow/src/gateway/dispatch/serialize.ts index f502b7e3..1523dcf4 100644 --- a/ts/packages/flow/src/gateway/dispatch/serialize.ts +++ b/ts/packages/flow/src/gateway/dispatch/serialize.ts @@ -18,7 +18,8 @@ * Python reference: trustgraph-base/trustgraph/messaging/translators/primitives.py */ -import type { Term, Triple } from "@trustgraph/base"; +import { errorMessage, type Term, type Triple } from "@trustgraph/base"; +import { Effect } from "effect"; import * as S from "effect/Schema"; // ---------- Client wire format type definitions ---------- @@ -55,6 +56,8 @@ export class DispatchSerializationError extends S.TaggedErrorClass => + Effect.try({ + try: () => translateRequest(service, body), + catch: (cause) => + isDispatchSerializationError(cause) + ? cause + : DispatchSerializationError.make({ + operation: `translate-request:${service}`, + message: errorMessage(cause), + }), + }); + /** * Translate an internal response body to client wire format. * @@ -293,3 +311,18 @@ export function translateResponse(service: string, response: unknown): unknown { } return response; } + +export const translateResponseEffect = ( + service: string, + response: unknown, +): Effect.Effect => + Effect.try({ + try: () => translateResponse(service, response), + catch: (cause) => + isDispatchSerializationError(cause) + ? cause + : DispatchSerializationError.make({ + operation: `translate-response:${service}`, + message: errorMessage(cause), + }), + });