From 64fb23e7d03005d798db30c2d8ed9d397b383ba2 Mon Sep 17 00:00:00 2001 From: elpresidank Date: Mon, 1 Jun 2026 22:17:50 -0500 Subject: [PATCH] Make gateway dispatcher requestors Effect-scoped --- ts/EFFECT_NATIVE_REWRITE_AUDIT.md | 56 ++++- ts/EFFECT_NATIVE_REWRITE_PLAYBOOK.md | 6 +- .../src/__tests__/gateway-dispatcher.test.ts | 210 ++++++++++++++++++ .../flow/src/gateway/dispatch/manager.ts | 185 ++++++++++----- ts/packages/flow/src/gateway/index.ts | 1 + ts/packages/flow/src/gateway/server.ts | 3 +- 6 files changed, 395 insertions(+), 66 deletions(-) create mode 100644 ts/packages/flow/src/__tests__/gateway-dispatcher.test.ts diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index ad94d822..d8197aa1 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -70,6 +70,46 @@ Signal counts from `ts/packages`: - Normal `Error` construction in library internals is migration evidence. Prefer existing `S.TaggedErrorClass` errors from `ts/packages/base/src/errors.ts`, adding new tagged errors when needed. + - `try`/`catch` blocks are also migration evidence. Prefer `Effect.try`, + `Effect.tryPromise`, or `Result.try` unless the block is a host/tool + boundary or test-only helper. + +### 2026-06-02: Gateway Dispatcher Requestor Cache + +- Status: migrated and package-verified. +- Completed: + - `ts/packages/flow/src/gateway/dispatch/manager.ts:121` centralizes + streaming completion detection as `dispatcherManagerIsCompleteResponse`. + - `ts/packages/flow/src/gateway/dispatch/manager.ts:137` stores requestors + as `EffectRequestResponse` handles, not `Promise` values. + - `ts/packages/flow/src/gateway/dispatch/manager.ts:152` starts the manager + through an Effect program, `:157` creates a `SynchronizedRef` cache, and + `:164` uses `Effect.onError` for scope cleanup instead of a `try`/`catch` + block. + - `ts/packages/flow/src/gateway/dispatch/manager.ts:200` uses + `SynchronizedRef.modifyEffect` so lazy requestor creation and caching are + serialized under the manager scope. + - `ts/packages/flow/src/gateway/dispatch/manager.ts:267` and `:312` keep + Fastify/RPC as Promise boundaries via `Effect.runPromise`; streaming + responder failures are mapped with `MessagingDeliveryError` at `:290` and + `:340`. + - `ts/packages/flow/src/gateway/server.ts:25` accepts an optional injected + `PubSubBackend` for tests without changing production NATS defaults. + - `ts/packages/flow/src/__tests__/gateway-dispatcher.test.ts:150` verifies + scoped requestor reuse and shutdown, `:172` verifies streaming through the + centralized completion predicate, and `:192` table-tests all final markers. +- Verification: + - `bun run --cwd ts/packages/flow test` + - `bun run --cwd ts/packages/flow build` + - `bun run --cwd ts check:tsgo` +- Remaining gateway evidence: + - `ts/packages/flow/src/gateway/rpc-server.ts` still has Promise callbacks + around Effect RPC queues. + - `ts/packages/flow/src/gateway/server.ts` still has Fastify route + `try`/`catch` blocks. These are boundary code, but should still be audited + for `Effect.tryPromise` wrapping where it improves consistency. + - `ts/packages/client/src/socket/trustgraph-socket.ts` still duplicates some + streaming final-marker detection on the client side. ## Ranked Findings @@ -173,6 +213,9 @@ Signal counts from `ts/packages`: - Blockers: - The gateway is an integration boundary. Preserve current HTTP and WebSocket wire behavior during the first rewrite. + - First dispatcher-cache slice is complete. Follow-up gateway work should + target RPC server Promise callbacks and client-side streaming completion + duplication, not recreate the requestor cache migration. ### P1: Remove RAG And Agent `toPromiseRequestor` Bridges @@ -316,11 +359,11 @@ Signal counts from `ts/packages`: ## Recommended PR Order -1. Gateway dispatcher requestor-cache and streaming-completion migration. -2. Config service scoped state migration. -3. RAG and agent requestor bridge removal. -4. Base consumer facade and subscriber export cleanup. -5. Client compatibility facade tightening. +1. Config service scoped state migration. +2. RAG and agent requestor bridge removal. +3. Base consumer facade and subscriber export cleanup. +4. Client compatibility facade tightening. +5. Gateway RPC callback and client streaming completion cleanup. 6. Storage/provider managed resource cleanup. 7. MCP canonicalization and Workbench polish. @@ -331,6 +374,9 @@ Do not flag these as rewrite blockers without additional proof: - Promise-returning CLI actions and Fastify route handlers at external boundaries. This does not exempt normal `Error` construction inside shared library code. +- `try`/`catch` blocks at host/tool boundaries only when the catch maps into a + typed error or wire error. Internal exception capture should use `Effect.try`, + `Effect.tryPromise`, or `Result.try`. - `S.Class`, `S.TaggedErrorClass`, `Context.Service`, `Rpc.make`, and `HttpApi.make` when they are required or idiomatic for the Effect API. - Plain `Map` usage for local pure transformations, such as graph utility diff --git a/ts/EFFECT_NATIVE_REWRITE_PLAYBOOK.md b/ts/EFFECT_NATIVE_REWRITE_PLAYBOOK.md index 26c0d50f..96431b93 100644 --- a/ts/EFFECT_NATIVE_REWRITE_PLAYBOOK.md +++ b/ts/EFFECT_NATIVE_REWRITE_PLAYBOOK.md @@ -52,6 +52,7 @@ primitive exists. | Workbench async state | `Atom`, `AtomRpc`, `AtomHttpApi`, `AsyncResult`, `AtomRegistry`, `Reactivity` | `effect/unstable/reactivity`, `@effect/atom-react` | `ts/node_modules/effect/src/unstable/reactivity/*.ts`, `ts/packages/workbench/node_modules/@effect/atom-react/src/*.ts` | | Metrics and logs | `Metric`, `Logger`, `Effect.log*` | `effect`, `@effect/opentelemetry` | `packages/effect/src/Metric.ts`, `packages/effect/src/Logger.ts` | | Normal internal errors | `S.TaggedErrorClass` and existing TrustGraph tagged errors | `effect/Schema`, `@trustgraph/base/errors` | `packages/effect/src/Schema.ts`, `ts/packages/base/src/errors.ts` | +| Imperative exception capture | `Effect.try`, `Effect.tryPromise`, or `Result.try` | `effect`, `effect/Result` | `packages/effect/src/Effect.ts`, `packages/effect/src/Result.ts` | Known concrete exports useful to scouts: @@ -73,6 +74,9 @@ Known concrete exports useful to scouts: - `S.TaggedErrorClass` for internal/library errors. Treat `new Error` inside library internals as migration evidence unless it is a host/tool boundary, test-only helper, or externally mandated error shape. +- `Effect.try`, `Effect.tryPromise`, and `Result.try` for exception capture. + Treat `try`/`catch` blocks as migration evidence unless they are host/tool + boundaries or test-only helpers. ## Scout Workflow @@ -83,7 +87,7 @@ Known concrete exports useful to scouts: 3. Run quick signal scans: ```sh - rg -n "new Error|new Promise|setTimeout|while \\(|receive\\(|Effect\\.runPromise|toPromiseRequestor|makeAsyncProcessor|process\\.env|JSON\\.parse|JSON\\.stringify|localStorage|new Map|WebSocket" ts/packages --glob '*.ts' --glob '*.tsx' + rg -n "try \\{|new Error|new Promise|setTimeout|while \\(|receive\\(|Effect\\.runPromise|toPromiseRequestor|makeAsyncProcessor|process\\.env|JSON\\.parse|JSON\\.stringify|localStorage|new Map|WebSocket" ts/packages --glob '*.ts' --glob '*.tsx' ``` 4. Split scouts by lane. If the thread cannot spawn every scout in parallel, diff --git a/ts/packages/flow/src/__tests__/gateway-dispatcher.test.ts b/ts/packages/flow/src/__tests__/gateway-dispatcher.test.ts new file mode 100644 index 00000000..e6fd77b2 --- /dev/null +++ b/ts/packages/flow/src/__tests__/gateway-dispatcher.test.ts @@ -0,0 +1,210 @@ +import { describe, expect, it } from "vitest"; +import { + dispatcherManagerIsCompleteResponse, + makeDispatcherManager, +} from "../gateway/dispatch/manager.js"; +import type { + BackendConsumer, + BackendProducer, + CreateConsumerOptions, + CreateProducerOptions, + Message, + PubSubBackend, +} from "@trustgraph/base"; + +function createMessage(value: T, properties: Record = {}): Message { + return { + value: () => value, + properties: () => properties, + }; +} + +class TopicConsumer implements BackendConsumer { + readonly acknowledged: Array> = []; + readonly nacked: Array> = []; + closeCount = 0; + private readonly messages: Array> = []; + private readonly waiters: Array<(message: Message | null) => void> = []; + private closed = false; + + push(message: Message): void { + const waiter = this.waiters.shift(); + if (waiter !== undefined) { + waiter(message); + return; + } + + this.messages.push(message); + } + + async receive(): Promise | null> { + const message = this.messages.shift(); + if (message !== undefined || this.closed) return message ?? null; + + return await new Promise((resolve) => { + this.waiters.push(resolve); + }); + } + + async acknowledge(message: Message): Promise { + this.acknowledged.push(message); + } + + async negativeAcknowledge(message: Message): Promise { + this.nacked.push(message); + } + + async unsubscribe(): Promise {} + + async close(): Promise { + this.closed = true; + for (const waiter of this.waiters.splice(0)) { + waiter(null); + } + this.closeCount += 1; + } +} + +class RecordingProducer implements BackendProducer { + readonly sent: Array<{ readonly message: T; readonly properties?: Record }> = []; + closeCount = 0; + flushCount = 0; + + constructor( + private readonly topic: string, + private readonly onSend: (topic: string, message: T, properties?: Record) => void, + ) {} + + async send(message: T, properties?: Record): Promise { + this.sent.push(properties === undefined ? { message } : { message, properties }); + this.onSend(this.topic, message, properties); + } + + async flush(): Promise { + this.flushCount += 1; + } + + async close(): Promise { + this.closeCount += 1; + } +} + +class DispatchBackend implements PubSubBackend { + closeCount = 0; + readonly producerOptions: CreateProducerOptions[] = []; + readonly consumerOptions: CreateConsumerOptions[] = []; + readonly producersByTopic = new Map>(); + readonly consumersByTopic = new Map>(); + + async createProducer(options: CreateProducerOptions): Promise> { + this.producerOptions.push(options); + let producer = this.producersByTopic.get(options.topic); + if (producer === undefined) { + producer = new RecordingProducer(options.topic, (topic, message, properties) => { + this.handleSend(topic, message, properties); + }); + this.producersByTopic.set(options.topic, producer); + } + return producer as BackendProducer; + } + + async createConsumer(options: CreateConsumerOptions): Promise> { + this.consumerOptions.push(options); + let consumer = this.consumersByTopic.get(options.topic); + if (consumer === undefined) { + consumer = new TopicConsumer(); + this.consumersByTopic.set(options.topic, consumer); + } + return consumer as BackendConsumer; + } + + async close(): Promise { + this.closeCount += 1; + } + + private handleSend(topic: string, message: unknown, properties?: Record): void { + const id = properties?.id ?? ""; + if (topic === "tg.flow.config-request") { + this.push("tg.flow.config-response", { ok: true, echo: message }, id); + return; + } + + if (topic === "tg.flow.knowledge-request") { + this.push("tg.flow.knowledge-response", { chunk: 1 }, id); + this.push("tg.flow.knowledge-response", { chunk: 2, endOfStream: true }, id); + return; + } + + if (topic === "tg.flow.prompt-request") { + this.push("tg.flow.prompt-response", { prompt: message }, id); + } + } + + private push(topic: string, response: unknown, id: string): void { + const consumer = this.consumersByTopic.get(topic); + consumer?.push(createMessage(response, { id })); + } +} + +describe("gateway dispatcher manager", () => { + it("caches Effect requestors as scoped handles", async () => { + const backend = new DispatchBackend(); + const manager = makeDispatcherManager({ + port: 0, + metricsPort: 0, + pubsub: backend, + }); + + await manager.start(); + const first = await manager.dispatchGlobalService("config", { operation: "get" }); + const second = await manager.dispatchGlobalService("config", { operation: "list" }); + await manager.stop(); + + expect(first).toEqual({ ok: true, echo: { operation: "get" } }); + expect(second).toEqual({ ok: true, echo: { operation: "list" } }); + expect(backend.producerOptions.filter((options) => options.topic === "tg.flow.config-request")).toHaveLength(1); + expect(backend.consumerOptions.filter((options) => options.topic === "tg.flow.config-response")).toHaveLength(1); + expect(backend.producersByTopic.get("tg.flow.config-request")?.closeCount).toBe(1); + expect(backend.consumersByTopic.get("tg.flow.config-response")?.closeCount).toBe(1); + expect(backend.closeCount).toBe(1); + }); + + it("streams responses until the centralized completion predicate is true", async () => { + const backend = new DispatchBackend(); + const manager = makeDispatcherManager({ + port: 0, + metricsPort: 0, + pubsub: backend, + }); + const chunks: Array<{ readonly response: unknown; readonly complete: boolean }> = []; + + await manager.dispatchGlobalServiceStreaming("knowledge", { query: "hello" }, async (response, complete) => { + chunks.push({ response, complete }); + }); + await manager.stop(); + + expect(chunks).toEqual([ + { response: { chunk: 1 }, complete: false }, + { response: { chunk: 2, endOfStream: true }, complete: true }, + ]); + }); + + it.each([ + [{ complete: true }], + [{ endOfStream: true }], + [{ endOfSession: true }], + [{ end_of_stream: true }], + [{ end_of_session: true }], + [{ end_of_dialog: true }], + [{ eos: true }], + [{ error: { message: "failed" } }], + ["plain"], + [null], + ])("treats %j as a complete streaming response", (response) => { + expect(dispatcherManagerIsCompleteResponse(response)).toBe(true); + }); + + it("does not mark ordinary response objects complete", () => { + expect(dispatcherManagerIsCompleteResponse({ chunk: 1 })).toBe(false); + }); +}); diff --git a/ts/packages/flow/src/gateway/dispatch/manager.ts b/ts/packages/flow/src/gateway/dispatch/manager.ts index fb612b5e..fcbc81b5 100644 --- a/ts/packages/flow/src/gateway/dispatch/manager.ts +++ b/ts/packages/flow/src/gateway/dispatch/manager.ts @@ -8,7 +8,18 @@ * Python reference: trustgraph-flow/trustgraph/gateway/dispatch/manager.py */ -import { makeNatsBackend, makeRequestResponse, type PubSubBackend, type RequestResponse } from "@trustgraph/base"; +import { Effect, Exit, Scope, SynchronizedRef } from "effect"; +import { + loadMessagingRuntimeConfig, + makeNatsBackend, + makePubSubService, + makeRequestResponseFactoryService, + messagingDeliveryError, + messagingLifecycleError, + type EffectRequestResponse, + type PubSubBackend, + type RequestResponseFactoryService, +} from "@trustgraph/base"; import type { GatewayConfig } from "../server.js"; import { translateRequest, translateResponse } from "./serialize.js"; @@ -107,44 +118,106 @@ export const dispatcherManagerGlobalServiceNames = (): readonly string[] => [ export const dispatcherManagerIsStreamingService = (kind: string): boolean => STREAMING_SERVICES.has(kind); +export const dispatcherManagerIsCompleteResponse = (response: unknown): boolean => { + if (typeof response !== "object" || response === null) return true; + const res = response as Record; + return ( + res.complete === true || + res.endOfStream === true || + res.endOfSession === true || + res.end_of_stream === true || + res.end_of_session === true || + res.end_of_dialog === true || + res.eos === true || + // error responses are always final + (res.error !== undefined && res.error !== null) + ); +}; + +type RequestorMap = Map>; + +interface DispatcherRuntime { + readonly scope: Scope.Closeable; + readonly requestors: SynchronizedRef.SynchronizedRef; + readonly factory: RequestResponseFactoryService; +} + export function makeDispatcherManager(config: GatewayConfig): DispatcherManager { - const pubsub: PubSubBackend = makeNatsBackend(config.natsUrl ?? "nats://localhost:4222"); - const requestors = new Map>>(); + const pubsub: PubSubBackend = config.pubsub ?? makeNatsBackend(config.natsUrl ?? "nats://localhost:4222"); + let runtime: DispatcherRuntime | null = null; const start = async (): Promise => { - // Requestors are created on demand when first accessed + if (runtime !== null) return; + + runtime = await Effect.runPromise( + Effect.gen(function* () { + const scope = yield* Scope.make(); + return yield* Effect.gen(function* () { + const messagingConfig = yield* loadMessagingRuntimeConfig(); + const requestors = yield* SynchronizedRef.make(new Map()); + return { + scope, + requestors, + factory: makeRequestResponseFactoryService(makePubSubService(pubsub), messagingConfig), + } satisfies DispatcherRuntime; + }).pipe( + Effect.onError((cause) => Scope.close(scope, Exit.failCause(cause))), + ); + }), + ); }; const stop = async (): Promise => { - for (const pending of requestors.values()) { - const rr = await pending; - await rr.stop(); + const current = runtime; + runtime = null; + + if (current !== null) { + await Effect.runPromise(Scope.close(current.scope, Exit.void)); } + await pubsub.close(); }; // ---------- Internal helpers ---------- - const getRequestor = ( + const ensureRuntime = async (): Promise => { + if (runtime === null) { + await start(); + } + if (runtime === null) { + throw messagingLifecycleError("gateway-dispatcher", "start", "Dispatcher manager failed to start"); + } + return runtime; + }; + + const getRequestor = async ( requestTopic: string, responseTopic: string, key: string, - ): Promise> => { - let pending = requestors.get(key); - if (pending === undefined) { - pending = (async () => { - const rr = makeRequestResponse({ - pubsub, + ): Promise> => { + const current = await ensureRuntime(); + + return await Effect.runPromise( + SynchronizedRef.modifyEffect(current.requestors, (requestors) => { + const cached = requestors.get(key); + if (cached !== undefined) { + return Effect.succeed([cached, requestors] as const); + } + + return current.factory.make({ requestTopic, responseTopic, subscription: `gateway-${key}`, - }); - await rr.start(); - return rr; - })(); - requestors.set(key, pending); - } - return pending; + }).pipe( + Scope.provide(current.scope), + Effect.map((requestor) => { + const next = new Map(requestors); + next.set(key, requestor); + return [requestor, next] as const; + }), + ); + }), + ); }; const resolveGlobalTopics = ( @@ -181,26 +254,6 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager }; }; - /** - * Determine whether a response is the final one in a streaming sequence. - * Checks for various end-of-stream markers used by different services. - */ - const isComplete = (response: unknown): boolean => { - if (typeof response !== "object" || response === null) return true; - const res = response as Record; - return ( - res.complete === true || - res.endOfStream === true || - res.endOfSession === true || - res.end_of_stream === true || - res.end_of_session === true || - res.end_of_dialog === true || - res.eos === true || - // error responses are always final - (res.error !== undefined && res.error !== null) - ); - }; - // ---------- Global service dispatch ---------- const dispatchGlobalService = async ( @@ -211,7 +264,7 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager const rr = await getRequestor(requestTopic, responseTopic, `global:${kind}`); const translated = translateRequest(kind, request); - const response = await rr.request(translated); + const response = await Effect.runPromise(rr.request(translated)); return translateResponse(kind, response); }; @@ -224,14 +277,21 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager const rr = await getRequestor(requestTopic, responseTopic, `global:${kind}`); const translated = translateRequest(kind, request); - await rr.request(translated, { - recipient: async (response) => { - const translatedRes = translateResponse(kind, response); - const complete = isComplete(translatedRes); - await responder(translatedRes, complete); - return complete; - }, - }); + await Effect.runPromise( + rr.request(translated, { + recipient: (response) => { + const translatedRes = translateResponse(kind, response); + const complete = dispatcherManagerIsCompleteResponse(translatedRes); + return Effect.tryPromise({ + try: async () => { + await responder(translatedRes, complete); + return complete; + }, + catch: (error) => messagingDeliveryError(responseTopic, "stream-responder", error), + }); + }, + }), + ); }; // ---------- Flow-scoped service dispatch ---------- @@ -249,7 +309,7 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager ); const translated = translateRequest(kind, request); - const response = await rr.request(translated); + const response = await Effect.runPromise(rr.request(translated)); return translateResponse(kind, response); }; @@ -267,14 +327,21 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager ); const translated = translateRequest(kind, request); - await rr.request(translated, { - recipient: async (response) => { - const translatedRes = translateResponse(kind, response); - const complete = isComplete(translatedRes); - await responder(translatedRes, complete); - return complete; - }, - }); + await Effect.runPromise( + rr.request(translated, { + recipient: (response) => { + const translatedRes = translateResponse(kind, response); + const complete = dispatcherManagerIsCompleteResponse(translatedRes); + return Effect.tryPromise({ + try: async () => { + await responder(translatedRes, complete); + return complete; + }, + catch: (error) => messagingDeliveryError(responseTopic, "stream-responder", error), + }); + }, + }), + ); }; // ---------- Fire-and-forget publish ---------- diff --git a/ts/packages/flow/src/gateway/index.ts b/ts/packages/flow/src/gateway/index.ts index b6fc4b44..cfb30767 100644 --- a/ts/packages/flow/src/gateway/index.ts +++ b/ts/packages/flow/src/gateway/index.ts @@ -2,6 +2,7 @@ export { createGateway, run, type GatewayConfig } from "./server.js"; export { dispatcherManagerFlowServiceNames, dispatcherManagerGlobalServiceNames, + dispatcherManagerIsCompleteResponse, dispatcherManagerIsStreamingService, makeDispatcherManager, type DispatcherManager, diff --git a/ts/packages/flow/src/gateway/server.ts b/ts/packages/flow/src/gateway/server.ts index ac6c3ca9..6d8d7b5f 100644 --- a/ts/packages/flow/src/gateway/server.ts +++ b/ts/packages/flow/src/gateway/server.ts @@ -13,7 +13,7 @@ import { Config, Effect, Exit, Scope } from "effect"; import * as O from "effect/Option"; import * as RpcSerialization from "effect/unstable/rpc/RpcSerialization"; import * as EffectSocket from "effect/unstable/socket/Socket"; -import { optionalStringConfig, registry, toTgError } from "@trustgraph/base"; +import { optionalStringConfig, registry, toTgError, type PubSubBackend } from "@trustgraph/base"; import { makeDispatcherManager } from "./dispatch/manager.js"; import { makeGatewayRpcServer } from "./rpc-server.js"; @@ -22,6 +22,7 @@ export interface GatewayConfig { metricsPort: number; secret?: string; natsUrl?: string; + pubsub?: PubSubBackend; } export async function createGateway(config: GatewayConfig) {