From a0d2575273f2b16f90b26350f3ad4096c37b4a86 Mon Sep 17 00:00:00 2001 From: elpresidank Date: Mon, 1 Jun 2026 22:11:03 -0500 Subject: [PATCH] Migrate request-response facade to Effect runtime --- ts/EFFECT_NATIVE_REWRITE_AUDIT.md | 53 ++++- ts/EFFECT_NATIVE_REWRITE_PLAYBOOK.md | 6 +- .../src/__tests__/request-response.test.ts | 182 ++++++++++++++++++ .../base/src/messaging/request-response.ts | 101 ++++++---- ts/packages/base/src/messaging/runtime.ts | 11 +- 5 files changed, 306 insertions(+), 47 deletions(-) create mode 100644 ts/packages/base/src/__tests__/request-response.test.ts diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index a9b9d2ff..ad94d822 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -33,6 +33,44 @@ Signal counts from `ts/packages`: | `while (` | 10 | | `localStorage` | 8 | +## Loop Passes + +### 2026-06-02: Base Request/Response Facade + +- Status: migrated and verified. +- Completed: + - `ts/packages/base/src/messaging/request-response.ts:50` now creates an + explicit `Scope.Closeable` and `:55` builds the existing + `EffectRequestResponse` runtime. + - `ts/packages/base/src/messaging/request-response.ts:91` rejects + not-started calls with `MessagingLifecycleError`, and `:108` maps + recipient callback failures into `MessagingDeliveryError`. It no longer + constructs normal `Error` values. + - `ts/packages/base/src/messaging/runtime.ts:427` now lets + request/response own its producer directly, `:442` runs the response + dispatcher with `Effect.forkScoped`, and `:445` makes shutdown idempotent. + - `ts/packages/base/src/__tests__/request-response.test.ts:115` covers the + Promise facade over the Effect runtime, `:143` asserts tagged timeout + errors, and `:164` asserts tagged lifecycle errors. +- Verification: + - `bun run --cwd ts/packages/base test` + - `bun run --cwd ts/packages/base build` + - `bun run --cwd ts check:tsgo` + - `bun run --cwd ts build` + - `bun run --cwd ts test` +- Remaining base evidence: + - `makeSubscriber(` has no current `ts/packages` call sites after this slice, + but `ts/packages/base/src/messaging/index.ts` still exports + `makeAsyncQueue`, `makeSubscriber`, and related types. + - `ts/packages/base/src/messaging/consumer.ts` still has a Promise polling + loop and a normal `Error` constructor. + - `ts/packages/base/src/messaging/producer.ts` still throws a normal + not-started `Error`. +- Decision: + - Normal `Error` construction in library internals is migration evidence. + Prefer existing `S.TaggedErrorClass` errors from + `ts/packages/base/src/errors.ts`, adding new tagged errors when needed. + ## Ranked Findings ### P0: Collapse Base Messaging Promise Facades @@ -56,7 +94,8 @@ Signal counts from `ts/packages`: `Scope.ts`, `Layer.ts`, `Schedule.ts`, `Ref.ts`. - Rewrite shape: - Make the Effect runtime factories the canonical internal surface. - - Keep Promise adapters only at external compatibility boundaries. + - Keep Promise adapters only at external compatibility boundaries. Rejected + values at those boundaries should still be tagged TrustGraph errors. - Replace polling sleep loops with scheduled scoped consumers where possible. - Replace resolver maps with `Queue`, `Deferred`, or `PubSub`-backed routing. - Tests: @@ -66,6 +105,9 @@ Signal counts from `ts/packages`: - Blockers: - Public package exports may still expect Promise-shaped producer, consumer, and request/response handles. Inventory callers before changing exports. + - First slice completed request/response facade migration. Next base follow-up + is either an Effect-backed consumer facade or a public export decision for + `subscriber.ts`. ### P0: Convert Stateful Flow Services To Scoped Effect Services @@ -274,10 +316,10 @@ Signal counts from `ts/packages`: ## Recommended PR Order -1. Base messaging/runtime convergence design and tests. -2. Gateway dispatcher internal Effect conversion. +1. Gateway dispatcher requestor-cache and streaming-completion migration. +2. Config service scoped state migration. 3. RAG and agent requestor bridge removal. -4. One stateful Flow service conversion, starting with config or cores. +4. Base consumer facade and subscriber export cleanup. 5. Client compatibility facade tightening. 6. Storage/provider managed resource cleanup. 7. MCP canonicalization and Workbench polish. @@ -287,7 +329,8 @@ Signal counts from `ts/packages`: Do not flag these as rewrite blockers without additional proof: - Promise-returning CLI actions and Fastify route handlers at external - boundaries. + boundaries. This does not exempt normal `Error` construction inside shared + library code. - `S.Class`, `S.TaggedErrorClass`, `Context.Service`, `Rpc.make`, and `HttpApi.make` when they are required or idiomatic for the Effect API. - Plain `Map` usage for local pure transformations, such as graph utility diff --git a/ts/EFFECT_NATIVE_REWRITE_PLAYBOOK.md b/ts/EFFECT_NATIVE_REWRITE_PLAYBOOK.md index 0d07f42f..26c0d50f 100644 --- a/ts/EFFECT_NATIVE_REWRITE_PLAYBOOK.md +++ b/ts/EFFECT_NATIVE_REWRITE_PLAYBOOK.md @@ -51,6 +51,7 @@ primitive exists. | AI tools, MCP, and model calls | `Tool`, `Toolkit`, `McpServer`, `McpSchema`, `LanguageModel`, provider layers | `effect/unstable/ai`, provider packages such as `@effect/ai-openai` | `ts/node_modules/effect/src/unstable/ai/*.ts`, `packages/ai/ai/src/*.ts` | | Workbench async state | `Atom`, `AtomRpc`, `AtomHttpApi`, `AsyncResult`, `AtomRegistry`, `Reactivity` | `effect/unstable/reactivity`, `@effect/atom-react` | `ts/node_modules/effect/src/unstable/reactivity/*.ts`, `ts/packages/workbench/node_modules/@effect/atom-react/src/*.ts` | | Metrics and logs | `Metric`, `Logger`, `Effect.log*` | `effect`, `@effect/opentelemetry` | `packages/effect/src/Metric.ts`, `packages/effect/src/Logger.ts` | +| Normal internal errors | `S.TaggedErrorClass` and existing TrustGraph tagged errors | `effect/Schema`, `@trustgraph/base/errors` | `packages/effect/src/Schema.ts`, `ts/packages/base/src/errors.ts` | Known concrete exports useful to scouts: @@ -69,6 +70,9 @@ Known concrete exports useful to scouts: `Reactivity.query`, `Reactivity.stream`, `Reactivity.mutation`. - `Tool.make`, `Toolkit.make`, `McpServer.registerToolkit`, `LanguageModel.generateText`, `LanguageModel.streamText`. +- `S.TaggedErrorClass` for internal/library errors. Treat `new Error` inside + library internals as migration evidence unless it is a host/tool boundary, + test-only helper, or externally mandated error shape. ## Scout Workflow @@ -79,7 +83,7 @@ Known concrete exports useful to scouts: 3. Run quick signal scans: ```sh - rg -n "new Promise|setTimeout|while \\(|receive\\(|Effect\\.runPromise|toPromiseRequestor|makeAsyncProcessor|process\\.env|JSON\\.parse|JSON\\.stringify|localStorage|new Map|WebSocket" ts/packages --glob '*.ts' --glob '*.tsx' + rg -n "new Error|new Promise|setTimeout|while \\(|receive\\(|Effect\\.runPromise|toPromiseRequestor|makeAsyncProcessor|process\\.env|JSON\\.parse|JSON\\.stringify|localStorage|new Map|WebSocket" ts/packages --glob '*.ts' --glob '*.tsx' ``` 4. Split scouts by lane. If the thread cannot spawn every scout in parallel, diff --git a/ts/packages/base/src/__tests__/request-response.test.ts b/ts/packages/base/src/__tests__/request-response.test.ts new file mode 100644 index 00000000..71bdecff --- /dev/null +++ b/ts/packages/base/src/__tests__/request-response.test.ts @@ -0,0 +1,182 @@ +import { describe, expect, it } from "vitest"; +import { + makeRequestResponse, + type BackendConsumer, + type BackendProducer, + type CreateConsumerOptions, + type CreateProducerOptions, + type Message, + type PubSubBackend, +} from "../index.js"; + +function createMessage(value: T, properties: Record = {}): Message { + return { + value: () => value, + properties: () => properties, + }; +} + +class RecordingProducer implements BackendProducer { + readonly sent: Array<{ readonly message: T; readonly properties?: Record }> = []; + closeCount = 0; + flushCount = 0; + + constructor(private readonly onSend?: (message: T, properties?: Record) => void) {} + + async send(message: T, properties?: Record): Promise { + this.sent.push(properties === undefined ? { message } : { message, properties }); + this.onSend?.(message, properties); + } + + async flush(): Promise { + this.flushCount += 1; + } + + async close(): Promise { + this.closeCount += 1; + } +} + +class WaitingConsumer implements BackendConsumer { + readonly acknowledged: Array> = []; + readonly nacked: Array> = []; + closeCount = 0; + private readonly messages: Array> = []; + private readonly waiters: Array<(message: Message | null) => void> = []; + private closed = false; + + push(message: Message): void { + const waiter = this.waiters.shift(); + if (waiter !== undefined) { + waiter(message); + return; + } + + this.messages.push(message); + } + + async receive(): Promise | null> { + const message = this.messages.shift(); + if (message !== undefined || this.closed) return message ?? null; + + return await new Promise((resolve) => { + this.waiters.push(resolve); + }); + } + + async acknowledge(message: Message): Promise { + this.acknowledged.push(message); + } + + async negativeAcknowledge(message: Message): Promise { + this.nacked.push(message); + } + + async unsubscribe(): Promise {} + + async close(): Promise { + this.closed = true; + for (const waiter of this.waiters.splice(0)) { + waiter(null); + } + this.closeCount += 1; + } +} + +class RuntimeBackend implements PubSubBackend { + closeCount = 0; + producerOptions: CreateProducerOptions | null = null; + consumerOptions: CreateConsumerOptions | null = null; + readonly producer: RecordingProducer; + + constructor( + private readonly consumer: BackendConsumer, + onSend?: (message: unknown, properties?: Record) => void, + ) { + this.producer = new RecordingProducer(onSend); + } + + async createProducer(options: CreateProducerOptions): Promise> { + this.producerOptions = options; + return this.producer as BackendProducer; + } + + async createConsumer(options: CreateConsumerOptions): Promise> { + this.consumerOptions = options; + return this.consumer as BackendConsumer; + } + + async close(): Promise { + this.closeCount += 1; + } +} + +describe("RequestResponse compatibility facade", () => { + it("routes requests through the Effect-native request-response runtime", async () => { + const consumer = new WaitingConsumer(); + const backend = new RuntimeBackend( + consumer as BackendConsumer, + (_message, properties) => { + consumer.push(createMessage("response", { id: properties?.id ?? "" })); + }, + ); + const requestor = makeRequestResponse({ + pubsub: backend, + requestTopic: "request-topic", + responseTopic: "response-topic", + subscription: "sub", + }); + + await requestor.start(); + const response = await requestor.request("request", { timeoutMs: 250 }); + await requestor.stop(); + + expect(response).toBe("response"); + expect(backend.producerOptions).toEqual({ topic: "request-topic" }); + expect(backend.consumerOptions).toEqual({ topic: "response-topic", subscription: "sub" }); + expect(backend.producer.sent[0]?.message).toBe("request"); + expect(consumer.acknowledged.length).toBe(1); + expect(backend.producer.closeCount).toBe(1); + expect(consumer.closeCount).toBe(1); + }); + + it("rejects with a tagged timeout error instead of a normal Error", async () => { + const consumer = new WaitingConsumer(); + const backend = new RuntimeBackend(consumer as BackendConsumer); + const requestor = makeRequestResponse({ + pubsub: backend, + requestTopic: "request-topic", + responseTopic: "response-topic", + subscription: "sub", + }); + + await requestor.start(); + const error = await requestor.request("request", { timeoutMs: 5 }).catch((caught: unknown) => caught); + await requestor.stop(); + + expect(error).toMatchObject({ + _tag: "MessagingTimeoutError", + operation: "request-response", + timeoutMs: 5, + }); + }); + + it("rejects with a tagged lifecycle error when requested before start", async () => { + const consumer = new WaitingConsumer(); + const backend = new RuntimeBackend(consumer as BackendConsumer); + const requestor = makeRequestResponse({ + pubsub: backend, + requestTopic: "request-topic", + responseTopic: "response-topic", + subscription: "sub", + }); + + const error = await requestor.request("request").catch((caught: unknown) => caught); + + expect(error).toMatchObject({ + _tag: "MessagingLifecycleError", + operation: "request", + resource: "request-topic:response-topic", + }); + }); +}); diff --git a/ts/packages/base/src/messaging/request-response.ts b/ts/packages/base/src/messaging/request-response.ts index 6f227f31..92906683 100644 --- a/ts/packages/base/src/messaging/request-response.ts +++ b/ts/packages/base/src/messaging/request-response.ts @@ -7,10 +7,12 @@ * Python reference: trustgraph-base/trustgraph/base/request_response_spec.py */ -import { randomUUID } from "node:crypto"; -import { makeProducer, type Producer } from "./producer.js"; -import { makeSubscriber, type Subscriber } from "./subscriber.js"; +import { Effect, Exit, Scope } from "effect"; import type { PubSubBackend } from "../backend/types.js"; +import { PubSub } from "../backend/pubsub.js"; +import { messagingDeliveryError, messagingLifecycleError } from "../errors.js"; +import { loadMessagingRuntimeConfig } from "../runtime/messaging-config.js"; +import { makeEffectRequestResponseFromPubSub, type EffectRequestResponse } from "./runtime.js"; export interface RequestResponseOptions { pubsub: PubSubBackend; @@ -31,24 +33,48 @@ export interface RequestResponse { ) => Promise; } +interface RequestResponseRuntime { + readonly scope: Scope.Closeable; + readonly requestor: EffectRequestResponse; +} + export function makeRequestResponse( options: RequestResponseOptions, ): RequestResponse { - const producer: Producer = makeProducer(options.pubsub, options.requestTopic); - const subscriber: Subscriber = makeSubscriber( - options.pubsub, - options.responseTopic, - options.subscription, - ); + let runtime: RequestResponseRuntime | null = null; return { start: async () => { - await producer.start(); - await subscriber.start(); + if (runtime !== null) return; + + const scope = await Effect.runPromise(Scope.make()); + + try { + const config = await Effect.runPromise(loadMessagingRuntimeConfig()); + const requestor = await Effect.runPromise( + makeEffectRequestResponseFromPubSub( + PubSub.fromBackend(options.pubsub), + config, + { + requestTopic: options.requestTopic, + responseTopic: options.responseTopic, + subscription: options.subscription, + }, + ).pipe(Scope.provide(scope)), + ); + + runtime = { scope, requestor }; + } catch (error) { + await Effect.runPromise(Scope.close(scope, Exit.fail(error))).catch(() => undefined); + throw error; + } }, stop: async () => { - await producer.stop(); - await subscriber.stop(); + const current = runtime; + runtime = null; + if (current === null) return; + + await Effect.runPromise(Scope.close(current.scope, Exit.void)); }, /** * Send a request and wait for responses. @@ -60,35 +86,32 @@ export function makeRequestResponse( * If omitted, returns the first response. */ request: async (request, requestOptions) => { - const id = randomUUID(); + const current = runtime; + if (current === null) { + throw messagingLifecycleError( + `${options.requestTopic}:${options.responseTopic}`, + "request", + "RequestResponse not started", + ); + } + const timeoutMs = requestOptions?.timeoutMs ?? 300_000; const recipient = requestOptions?.recipient; - const queue = subscriber.subscribe(id); - - try { - await producer.send(id, request); - - const deadline = Date.now() + timeoutMs; - - while (true) { - const remaining = deadline - Date.now(); - if (remaining <= 0) { - throw new Error(`Request timed out after ${timeoutMs}ms`); - } - - const response = await queue.pop(remaining); - - if (recipient !== undefined) { - const isFinal = await recipient(response); - if (isFinal) return response; - } else { - return response; - } - } - } finally { - subscriber.unsubscribe(id); - } + return await Effect.runPromise( + current.requestor.request(request, { + timeoutMs, + ...(recipient === undefined + ? {} + : { + recipient: (response) => + Effect.tryPromise({ + try: () => recipient(response), + catch: (error) => messagingDeliveryError(options.responseTopic, "recipient", error), + }), + }), + }), + ); }, }; } diff --git a/ts/packages/base/src/messaging/runtime.ts b/ts/packages/base/src/messaging/runtime.ts index 741aaa26..5c85378f 100644 --- a/ts/packages/base/src/messaging/runtime.ts +++ b/ts/packages/base/src/messaging/runtime.ts @@ -424,7 +424,11 @@ export const makeEffectRequestResponseFromPubSub = Effect.fn("makeEffectRequestR config: MessagingRuntimeConfig, options: EffectRequestResponseOptions, ) { - const producer = yield* makeEffectProducerFromPubSub(pubsub, { + const producerOptions: CreateProducerOptions = options.requestSchema === undefined + ? { topic: options.requestTopic } + : { topic: options.requestTopic, schema: options.requestSchema }; + const producerBackend = yield* pubsub.createProducer(producerOptions); + const producer = makeEffectProducerHandle(producerBackend, { topic: options.requestTopic, ...(options.requestSchema === undefined ? {} : { schema: options.requestSchema }), }); @@ -435,9 +439,12 @@ export const makeEffectRequestResponseFromPubSub = Effect.fn("makeEffectRequestR }; const backend = yield* pubsub.createConsumer(createOptions); const subscribers = new Map>(); - const fiber = yield* dispatchResponseLoop(backend, options.responseTopic, subscribers, config).pipe(Effect.forkChild); + const fiber = yield* dispatchResponseLoop(backend, options.responseTopic, subscribers, config).pipe(Effect.forkScoped); + let stopped = false; const stop = Effect.fn(`RequestResponse.stop:${options.requestTopic}`)(function* () { + if (stopped) return; + stopped = true; yield* Fiber.interrupt(fiber); yield* producer.close; yield* closeConsumerBackend(backend, options.responseTopic, options.subscription);