From 5c4948cc2e8c3c90521e7d6e53292322d82281e0 Mon Sep 17 00:00:00 2001 From: elpresidank Date: Tue, 2 Jun 2026 07:44:16 -0500 Subject: [PATCH] Delegate legacy consumers to Effect runtime --- ts/EFFECT_NATIVE_REWRITE_AUDIT.md | 101 ++++++--- .../base/src/__tests__/consumer.test.ts | 123 ++++------- ts/packages/base/src/backend/nats.ts | 10 +- ts/packages/base/src/messaging/consumer.ts | 196 ++++++------------ ts/packages/base/src/messaging/producer.ts | 2 +- .../base/src/messaging/request-response.ts | 6 +- ts/packages/base/src/messaging/runtime.ts | 2 +- 7 files changed, 184 insertions(+), 256 deletions(-) diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 54070d46..ba1f34e2 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -12,12 +12,12 @@ Verified source roots: - Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4` - Installed Effect beta used by this workspace: `ts/node_modules/effect` -Current signal counts from `ts/packages` after the 2026-06-02 -request-response stop signal slice: +Current signal counts from `ts/packages` after the 2026-06-02 legacy consumer +facade slice: | Signal | Count | | --- | ---: | -| `Effect.runPromise` | 172 | +| `Effect.runPromise` | 170 | | `Effect.runPromiseWith` | 0 | | `Effect.cached` | 0 | | `Layer.succeed` | 12 | @@ -26,14 +26,14 @@ request-response stop signal slice: | `new Map` | 60 | | `toPromiseRequestor` | 0 | | `makeAsyncProcessor` | 19 | -| `receive(` | 18 | +| `receive(` | 17 | | `while (` | 2 | | `new Error` | 7 | -| `new Promise` | 10 | +| `new Promise` | 9 | | `JSON.parse` | 4 | | `localStorage` | 9 | | `JSON.stringify` | 8 | -| `setTimeout` | 4 | +| `setTimeout` | 3 | | `process.env` | 3 | Notes: @@ -125,6 +125,17 @@ Notes: `makeEffectRequestResponseFromPubSub`. Pending requests now race response waiting against runtime stop and fail promptly with a tagged `MessagingLifecycleError` instead of waiting for timeout. +- The legacy consumer facade slice moved `makeConsumer` onto + `makeEffectConsumerFromPubSub` with a `ManagedRuntime` Promise boundary and a + closeable `Scope`. Consumer workers now use `Effect.forkScoped` so their + lifetime is owned by the caller scope rather than the parent fiber. The + `Effect.runPromise`, `receive(`, `new Promise`, and `setTimeout` counts + dropped because the old blocking facade loop and its test timer shim were + removed. +- A focused broker-backend scout found no remaining P0 broker runtime rewrite + after the producer, NATS, consumer concurrency, rate-limit, and + request-response stop slices. `PubSubBackend` remains an intentional + Promise-returning adapter boundary wrapped by `PubSub`/Effect services. - The gateway streaming callback slice added Effect-returning dispatcher streaming methods, switched the RPC stream server off nested `Effect.runPromiseWith(context)` queue offers, and replaced the client @@ -1293,6 +1304,34 @@ Notes: - `cd ts && bun run build` - `cd ts && bun run test` +### 2026-06-02: Legacy Consumer Facade Slice + +- Status: migrated and root-verified. +- Completed: + - `makeConsumer` is now a Promise compatibility facade over + `makeEffectConsumerFromPubSub` instead of owning a separate mutable + backend, `running` flag, retry loop, and direct `BackendConsumer.receive`. + - The facade uses a module `ManagedRuntime`, `Scope.make`, `Scope.provide`, + and `Scope.close` to keep public `start()`/`stop()` Promises at the + boundary while the actual consumer lifetime stays scoped. + - Legacy Promise handlers are adapted with `Effect.tryPromise` and preserve + `TooManyRequestsError` as a typed retry signal. + - `makeEffectConsumerFromPubSub` now forks workers with `Effect.forkScoped`, + so a caller-owned scope keeps workers alive until stop/finalization. + - `consumer.test.ts` no longer encodes `start()` as the blocking consume-loop + join; it waits for observable handler/ack/nak effects and then stops the + scoped consumer. +- Verification: + - `cd ts/packages/base && bunx --bun vitest run src/__tests__/consumer.test.ts` + - `cd ts/packages/base && bunx --bun vitest run src/__tests__/messaging-runtime.test.ts src/__tests__/flow-spec-runtime.test.ts` + - `cd ts && bun run check:tsgo` + - `bun run --cwd ts/packages/base build` + - `bun run --cwd ts/packages/base test` + - `cd ts && bun run check` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: @@ -1320,19 +1359,18 @@ Notes: - The legacy `messaging/subscriber.ts` async queue/fanout implementation is removed. Use native `effect/PubSub` for future in-process fanout, while keeping `PubSubBackend` for broker-backed messaging. - - The legacy producer facade now delegates to the scoped Effect producer - runtime. Remaining broker P0 work should focus on native backend/NATS - runtime shape and consumer polling, not replacing `PubSubBackend` with - `effect/PubSub`. + - The legacy producer and consumer facades now delegate to scoped Effect + runtime factories. Public `start()`/`send()`/`stop()` Promises remain + compatibility boundaries. - NATS header construction, ack/nak operations, and lookup create-on-missing - behavior now stay typed. Remaining NATS work is scoped backend/layer - construction and stream/consumer state ownership. + behavior now stay typed. `PubSubBackend` remains an intentional broker + adapter contract rather than a direct `effect/PubSub` replacement target. - Consumer rate-limit retry timeout behavior is now wired in both legacy and Effect-native consumer paths. Effect-native consumer concurrency now owns one backend consumer per worker, and request-response pending shutdown now - fails through a tagged lifecycle error. Remaining consumer runtime work - should focus on the legacy consumer facade's blocking compatibility shape - and scoped backend/layer construction. + fails through a tagged lifecycle error. The legacy consumer facade blocking + shape is complete; do not reopen it unless a public API compatibility + issue appears. - Existing constructor shims preserve callable-plus-newable public exports; removing them needs a public API split or real class redesign. - Typed string registries in `Flow` now have Schema-backed parameter specs @@ -1385,19 +1423,29 @@ Notes: ## Ranked Findings -### P0: Broker Backend Effect-Native Runtime +### No-op: Broker Backend Adapter Boundary +- Status: + - Closed as a P0 rewrite item after the 2026-06-02 broker scout and the + legacy consumer facade slice. - TrustGraph evidence: - `ts/packages/base/src/backend/types.ts` - `ts/packages/base/src/backend/nats.ts` + - `ts/packages/base/src/backend/pubsub.ts` - `ts/packages/base/src/messaging/runtime.ts` - - `ts/packages/base/src/processor/flow-processor.ts` - Effect primitives: - `Layer`, `Scope`, `Stream`, `Schedule`, `Queue`, `Effect.acquireRelease`, and `Effect.tryPromise`. -- Rewrite shape: - - Introduce an Effect-native broker service/layer with scoped NATS - acquisition and stream/schedule-based consumer loops. +- Evidence: + - `BackendProducer`, `BackendConsumer`, and `PubSubBackend` are the external + Promise broker adapter contract. `backend/pubsub.ts` wraps that contract in + Effect through `Context.Service`, `Layer`, `Effect.tryPromise`, and scoped + finalizers. + - NATS boundary failures, selective stream/consumer lookup recovery, + producer sends, ack/nak, and close paths are typed with Effect wrappers. + - Producer, consumer, and request-response runtime ownership now live in + scoped Effect factories. +- Rule: - Keep `PubSubBackend` as the compatibility adapter boundary; Effect native `PubSub` remains in-process only. - Treat the producer Promise facade as a completed compatibility wrapper; @@ -1414,9 +1462,8 @@ Notes: handles. - Treat request-response pending shutdown semantics as complete; do not flag `waitForResponse` timeout behavior for stopped runtimes. -- Tests: - - Fake backend ack/nak/backoff/stop tests, NATS close finalizer tests, and - config-push stream tests. + - Treat the legacy consumer facade as a completed compatibility wrapper over + `makeEffectConsumerFromPubSub`; do not flag blocking `start()` semantics. ### P2: Effect AI Provider Adapter Cleanup @@ -1469,9 +1516,8 @@ Notes: ## Recommended PR Order -1. Broker backend Effect-native runtime. -2. Effect AI provider adapter cleanup. -3. MCP parity/deletion decision and workbench platform polish. +1. Effect AI provider adapter cleanup. +2. MCP parity/deletion decision and workbench platform polish. ## No-Op Rules @@ -1510,6 +1556,9 @@ Do not flag these as rewrite blockers without additional proof: `makeEffectRequestResponseFromPubSub`: pending calls race response waiting against a `Deferred` stop signal and fail with tagged `MessagingLifecycleError`. +- Legacy `makeConsumer` facade blocking-loop ownership is complete: + `start()` now initializes scoped Effect consumers and returns after startup, + while `stop()` closes the native consumer scope. - `ts/packages/flow/src/gateway/rpc-protocol.ts` is a Fastify socket compatibility bridge. Do not flag its internal connection maps/sets as a standalone replacement target until the gateway is ready to move onto Effect diff --git a/ts/packages/base/src/__tests__/consumer.test.ts b/ts/packages/base/src/__tests__/consumer.test.ts index f552a9a5..3e69c5e9 100644 --- a/ts/packages/base/src/__tests__/consumer.test.ts +++ b/ts/packages/base/src/__tests__/consumer.test.ts @@ -56,6 +56,16 @@ function createFlowContext(): FlowContext { }; } +async function advanceUntil( + predicate: () => boolean, + totalMs = 1_000, + stepMs = 10, +): Promise { + for (let elapsed = 0; elapsed < totalMs && !predicate(); elapsed += stepMs) { + await vi.advanceTimersByTimeAsync(stepMs); + } +} + describe("Consumer", () => { let backendConsumer: ReturnType; let pubsub: PubSubBackend; @@ -106,20 +116,10 @@ describe("Consumer", () => { }); // ── start() creates consumer and calls handler ───────────────── - it("creates a backend consumer and invokes handler for received messages", async () => { + it("starts a scoped consumer and invokes handler for received messages", async () => { const handler = vi.fn().mockResolvedValue(undefined); const msg = createMockMessage({ data: "hello" }, { id: "1" }); - // First call returns a message, second call triggers stop - let callCount = 0; - backendConsumer.receive.mockImplementation(async () => { - callCount++; - if (callCount === 1) return msg; - // Stop the consumer on second receive - await consumer.stop(); - return null; - }); - const consumer = makeConsumer({ pubsub, topic: "topic-a", @@ -127,8 +127,11 @@ describe("Consumer", () => { handler, }); - // start() blocks until the consume loop ends, so we don't need to await separately + backendConsumer.receive.mockResolvedValueOnce(msg).mockResolvedValue(null); + await consumer.start(flowCtx); + await advanceUntil(() => handler.mock.calls.length > 0); + await consumer.stop(); expect(pubsub.createConsumer).toHaveBeenCalledWith({ topic: "topic-a", @@ -143,14 +146,6 @@ describe("Consumer", () => { const handler = vi.fn().mockResolvedValue(undefined); const msg = createMockMessage("payload"); - let callCount = 0; - backendConsumer.receive.mockImplementation(async () => { - callCount++; - if (callCount === 1) return msg; - await consumer.stop(); - return null; - }); - const consumer = makeConsumer({ pubsub, topic: "t", @@ -158,7 +153,11 @@ describe("Consumer", () => { handler, }); + backendConsumer.receive.mockResolvedValueOnce(msg).mockResolvedValue(null); + await consumer.start(flowCtx); + await advanceUntil(() => backendConsumer.acknowledge.mock.calls.length > 0); + await consumer.stop(); expect(backendConsumer.acknowledge).toHaveBeenCalledWith(msg); expect(backendConsumer.negativeAcknowledge).not.toHaveBeenCalled(); @@ -169,15 +168,6 @@ describe("Consumer", () => { const handler = vi.fn().mockRejectedValue("handler boom"); const msg = createMockMessage("bad-payload"); - let callCount = 0; - backendConsumer.receive.mockImplementation(async () => { - callCount++; - if (callCount === 1) return msg; - // Stop on second call (after the 1s sleep from error handling) - await consumer.stop(); - return null; - }); - const consumer = makeConsumer({ pubsub, topic: "t", @@ -185,19 +175,14 @@ describe("Consumer", () => { handler, }); - // Suppress console.error noise - const errorSpy = vi.spyOn(console, "error").mockImplementation(() => {}); + backendConsumer.receive.mockResolvedValueOnce(msg).mockResolvedValue(null); - // start() will block; the error path sleeps 1s, so we need to advance timers - const startPromise = consumer.start(flowCtx); - // Advance past the 1s sleep in the error handler - await vi.advanceTimersByTimeAsync(1500); - await startPromise; + await consumer.start(flowCtx); + await advanceUntil(() => backendConsumer.negativeAcknowledge.mock.calls.length > 0); + await consumer.stop(); expect(backendConsumer.negativeAcknowledge).toHaveBeenCalledWith(msg); expect(backendConsumer.acknowledge).not.toHaveBeenCalled(); - - errorSpy.mockRestore(); }); // ── TooManyRequestsError triggers retry ──────────────────────── @@ -213,14 +198,6 @@ describe("Consumer", () => { const msg = createMockMessage("rate-limited-payload"); - let receiveCount = 0; - backendConsumer.receive.mockImplementation(async () => { - receiveCount++; - if (receiveCount === 1) return msg; - await consumer.stop(); - return null; - }); - const consumer = makeConsumer({ pubsub, topic: "t", @@ -229,12 +206,14 @@ describe("Consumer", () => { rateLimitRetryMs: 500, }); + backendConsumer.receive.mockResolvedValueOnce(msg).mockResolvedValue(null); + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); - const startPromise = consumer.start(flowCtx); - // Advance past the rate-limit retry delay (500ms) + await consumer.start(flowCtx); await vi.advanceTimersByTimeAsync(600); - await startPromise; + await advanceUntil(() => handler.mock.calls.length >= 2); + await consumer.stop(); // Handler called twice: first throws TooManyRequestsError, second succeeds expect(handler).toHaveBeenCalledTimes(2); @@ -255,14 +234,6 @@ describe("Consumer", () => { const msg = createMockMessage("rate-limited-payload"); - let receiveCount = 0; - backendConsumer.receive.mockImplementation(async () => { - receiveCount++; - if (receiveCount === 1) return msg; - await consumer.stop(); - return null; - }); - const consumer = makeConsumer({ pubsub, topic: "t", @@ -272,9 +243,12 @@ describe("Consumer", () => { rateLimitTimeoutMs: 2_000, }); - const startPromise = consumer.start(flowCtx); + backendConsumer.receive.mockResolvedValueOnce(msg).mockResolvedValue(null); + + await consumer.start(flowCtx); await vi.advanceTimersByTimeAsync(1_100); - await startPromise; + await advanceUntil(() => backendConsumer.acknowledge.mock.calls.length > 0); + await consumer.stop(); expect(handler).toHaveBeenCalledTimes(3); expect(backendConsumer.acknowledge).toHaveBeenCalledWith(msg); @@ -287,13 +261,6 @@ describe("Consumer", () => { }); const msg = createMockMessage("rate-limited-payload"); - let receiveCount = 0; - backendConsumer.receive.mockImplementation(async () => { - receiveCount++; - if (receiveCount === 1) return msg; - return null; - }); - const consumer = makeConsumer({ pubsub, topic: "t", @@ -303,11 +270,12 @@ describe("Consumer", () => { rateLimitTimeoutMs: 1_000, }); - const startPromise = consumer.start(flowCtx); + backendConsumer.receive.mockResolvedValueOnce(msg).mockResolvedValue(null); + + await consumer.start(flowCtx); await vi.advanceTimersByTimeAsync(1_100); + await advanceUntil(() => backendConsumer.negativeAcknowledge.mock.calls.length > 0); await consumer.stop(); - await vi.advanceTimersByTimeAsync(1_100); - await startPromise; expect(handler).toHaveBeenCalledTimes(2); expect(backendConsumer.negativeAcknowledge).toHaveBeenCalledWith(msg); @@ -316,12 +284,7 @@ describe("Consumer", () => { // ── stop() closes the backend ────────────────────────────────── it("stop() sets running=false and closes the backend", async () => { - // Make receive block forever (returns null) until stopped - backendConsumer.receive.mockImplementation(async () => { - // Yield control so stop() can run - await new Promise((r) => setTimeout(r, 100)); - return null; - }); + backendConsumer.receive.mockResolvedValue(null); const consumer = makeConsumer({ pubsub, @@ -330,17 +293,9 @@ describe("Consumer", () => { handler: vi.fn(), }); - const startPromise = consumer.start(flowCtx); - - // Advance timers to let the consume loop iterate once - await vi.advanceTimersByTimeAsync(200); - + await consumer.start(flowCtx); await consumer.stop(); - // Advance timers further so the loop can exit - await vi.advanceTimersByTimeAsync(200); - await startPromise; - expect(backendConsumer.close).toHaveBeenCalled(); await expect(consumer.stop()).resolves.toBeUndefined(); }); diff --git a/ts/packages/base/src/backend/nats.ts b/ts/packages/base/src/backend/nats.ts index 857928fc..ffc8dee6 100644 --- a/ts/packages/base/src/backend/nats.ts +++ b/ts/packages/base/src/backend/nats.ts @@ -24,7 +24,7 @@ import { DeliverPolicy, } from "nats"; import { Effect } from "effect"; -import * as Predicate from "effect/Predicate"; +import * as P from "effect/Predicate"; import * as S from "effect/Schema"; import type { @@ -64,7 +64,7 @@ function makeNatsMessage(msg: JsMsg, decoded: T): NatsMessage { }; } -const hasJsMsg = Predicate.hasProperty("_jsMsg"); +const hasJsMsg = P.hasProperty("_jsMsg"); class NatsLookupError extends S.TaggedErrorClass()( "NatsLookupError", @@ -79,9 +79,9 @@ function natsLookupError(operation: string, cause: unknown): NatsLookupError { } function isAckableJsMsg(value: unknown): value is Pick { - if (!Predicate.isObject(value)) return false; - if (!Predicate.hasProperty(value, "ack")) return false; - if (!Predicate.hasProperty(value, "nak")) return false; + if (!P.isObject(value)) return false; + if (!P.hasProperty(value, "ack")) return false; + if (!P.hasProperty(value, "nak")) return false; return typeof value.ack === "function" && typeof value.nak === "function"; } diff --git a/ts/packages/base/src/messaging/consumer.ts b/ts/packages/base/src/messaging/consumer.ts index 2dac3f37..a105b314 100644 --- a/ts/packages/base/src/messaging/consumer.ts +++ b/ts/packages/base/src/messaging/consumer.ts @@ -4,18 +4,20 @@ * Python reference: trustgraph-base/trustgraph/base/consumer.py */ -import type { BackendConsumer, Message, PubSubBackend } from "../backend/types.js"; +import type { PubSubBackend } from "../backend/types.js"; +import { PubSub } from "../backend/pubsub.js"; import type { Flow } from "../processor/flow.js"; import { MessagingHandlerError, TooManyRequestsError, - messagingDeliveryError, messagingHandlerError, messagingLifecycleError, - messagingTimeoutError, } from "../errors.js"; -import { Duration, Effect, Schedule } from "effect"; +import { Effect, Exit, Layer, ManagedRuntime, Scope } from "effect"; +import * as P from "effect/Predicate"; import * as S from "effect/Schema"; +import { loadMessagingRuntimeConfig } from "../runtime/messaging-config.js"; +import { makeEffectConsumerFromPubSub, type EffectConsumer } from "./runtime.js"; export type MessageHandler = ( message: T, @@ -49,13 +51,16 @@ export interface Consumer { readonly stop: () => Promise; } +interface ConsumerRuntime { + readonly scope: Scope.Closeable; + readonly consumer: EffectConsumer; +} + +const consumerRuntime = ManagedRuntime.make(Layer.empty); + export function makeConsumer(options: ConsumerOptions): Consumer { - let backend: BackendConsumer | null = null; - let running = false; + let runtime: ConsumerRuntime | null = null; const isTooManyRequestsError = S.is(TooManyRequestsError); - const concurrency = options.concurrency ?? 1; - const rateLimitRetryMs = options.rateLimitRetryMs ?? 10_000; - const rateLimitTimeoutMs = options.rateLimitTimeoutMs ?? 7_200_000; const runHandler = ( message: T, @@ -70,135 +75,54 @@ export function makeConsumer(options: ConsumerOptions): Consumer { : messagingHandlerError(options.topic, options.subscription, error), }); - const handleWithRetry = Effect.fn("Consumer.handleWithRetry")(function* ( - message: Message, - flow: FlowContext, - ) { - const callHandler = runHandler(message.value(), message.properties(), flow); - yield* callHandler.pipe( - Effect.tapError((error) => - isTooManyRequestsError(error) - ? Effect.logWarning("[Consumer] Rate limited, retrying", { - topic: options.topic, - subscription: options.subscription, - retryMs: rateLimitRetryMs, - }) - : Effect.void, - ), - Effect.retry({ - schedule: Schedule.spaced(Duration.millis(rateLimitRetryMs)), - while: isTooManyRequestsError, - }), - Effect.timeoutOrElse({ - duration: Duration.millis(rateLimitTimeoutMs), - orElse: () => Effect.fail(messagingTimeoutError("rate-limit", rateLimitTimeoutMs)), - }), - Effect.mapError((error) => - isTooManyRequestsError(error) - ? messagingHandlerError(options.topic, options.subscription, error) - : error, - ), - ); - }); - - const consumeOnce = Effect.fn("Consumer.consumeOnce")(function* (flow: FlowContext) { - const currentBackend = backend; - if (currentBackend === null) { - return yield* messagingLifecycleError( - `${options.topic}:${options.subscription}`, - "receive", - "Consumer backend not started", - ); - } - - const message = yield* Effect.tryPromise({ - try: () => currentBackend.receive(2000), - catch: (error) => messagingDeliveryError(options.topic, "receive", error), - }); - if (message === null) return; - - yield* handleWithRetry(message, flow).pipe( - Effect.flatMap(() => - Effect.tryPromise({ - try: () => currentBackend.acknowledge(message), - catch: (error) => messagingDeliveryError(options.topic, "acknowledge", error), - }), - ), - Effect.catch((error) => - Effect.tryPromise({ - try: () => currentBackend.negativeAcknowledge(message), - catch: (nakError) => messagingDeliveryError(options.topic, "negative-acknowledge", nakError), - }).pipe( - Effect.catch((nakError) => - Effect.logError("[Consumer] Failed to negative-acknowledge message", { - error: nakError.message, - topic: nakError.topic, - }), - ), - Effect.flatMap(() => Effect.fail(error)), - ), - ), - ); - }); - - const consumeLoop = Effect.fn("Consumer.consumeLoop")(function* (flow: FlowContext) { - yield* Effect.whileLoop({ - while: () => running, - body: () => - consumeOnce(flow).pipe( - Effect.catch((error) => { - if (!running) return Effect.void; - return Effect.logError("[Consumer] Error in consume loop", { - error: error.message, - topic: options.topic, - subscription: options.subscription, - }).pipe( - Effect.flatMap(() => Effect.sleep(Duration.millis(1000))), - ); - }), - ), - step: () => undefined, - }); - }); - return { start: (flow) => - Effect.runPromise( - Effect.gen(function* () { - backend = yield* Effect.tryPromise({ - try: () => - options.pubsub.createConsumer({ - topic: options.topic, - subscription: options.subscription, - initialPosition: options.initialPosition ?? "latest", - }), - catch: (error) => - messagingLifecycleError(`${options.topic}:${options.subscription}`, "create-consumer", error), - }); + P.isNotNull(runtime) + ? Promise.resolve() + : consumerRuntime.runPromise( + Effect.gen(function* () { + const scope = yield* Scope.make(); + const startConsumer = Effect.gen(function* () { + const config = yield* loadMessagingRuntimeConfig(); + const consumer = yield* makeEffectConsumerFromPubSub( + PubSub.fromBackend(options.pubsub), + config, + { + topic: options.topic, + subscription: options.subscription, + handler: runHandler, + ...(options.concurrency === undefined ? {} : { concurrency: options.concurrency }), + initialPosition: options.initialPosition ?? "latest", + ...(options.rateLimitRetryMs === undefined ? {} : { rateLimitRetryMs: options.rateLimitRetryMs }), + ...(options.rateLimitTimeoutMs === undefined + ? {} + : { rateLimitTimeoutMs: options.rateLimitTimeoutMs }), + }, + flow, + ).pipe( + Scope.provide(scope), + Effect.mapError((error) => + messagingLifecycleError(`${options.topic}:${options.subscription}`, "create-consumer", error) + ), + ); + runtime = { scope, consumer }; + }); - running = true; - - const workerIndexes = Array.from({ length: concurrency }, (_value, index) => index); - yield* Effect.forEach(workerIndexes, () => consumeLoop(flow), { - concurrency: "unbounded", - discard: true, - }); - }), - ), - stop: () => - Effect.runPromise( - Effect.gen(function* () { - running = false; - const currentBackend = backend; - backend = null; - if (currentBackend !== null) { - yield* Effect.tryPromise({ - try: () => currentBackend.close(), - catch: (error) => - messagingLifecycleError(`${options.topic}:${options.subscription}`, "close-consumer", error), - }); - } - }), - ), + yield* startConsumer.pipe( + Effect.onError((cause) => Scope.close(scope, Exit.failCause(cause))), + ); + }), + ), + stop: () => { + const current = runtime; + runtime = null; + return current === null + ? Promise.resolve() + : consumerRuntime.runPromise( + current.consumer.stop.pipe( + Effect.ensuring(Scope.close(current.scope, Exit.void)), + ), + ); + }, }; } diff --git a/ts/packages/base/src/messaging/producer.ts b/ts/packages/base/src/messaging/producer.ts index 8e355924..3824d71a 100644 --- a/ts/packages/base/src/messaging/producer.ts +++ b/ts/packages/base/src/messaging/producer.ts @@ -5,7 +5,7 @@ */ import type { PubSubBackend } from "../backend/types.js"; -import type { ProducerMetrics } from "../metrics/prometheus.js"; +import type { ProducerMetrics } from "../metrics/index.ts"; import { Effect, Exit, Scope } from "effect"; import { PubSub } from "../backend/pubsub.js"; import { makeEffectProducerFromPubSub, type EffectProducer } from "./runtime.js"; diff --git a/ts/packages/base/src/messaging/request-response.ts b/ts/packages/base/src/messaging/request-response.ts index 0c263716..fdc38720 100644 --- a/ts/packages/base/src/messaging/request-response.ts +++ b/ts/packages/base/src/messaging/request-response.ts @@ -11,7 +11,7 @@ import { Effect, Exit, Scope } from "effect"; import type { PubSubBackend } from "../backend/types.js"; import { PubSub } from "../backend/pubsub.js"; import { messagingDeliveryError, messagingLifecycleError } from "../errors.js"; -import { loadMessagingRuntimeConfig } from "../runtime/messaging-config.js"; +import { loadMessagingRuntimeConfig } from "../runtime/index.ts"; import { makeEffectRequestResponseFromPubSub, type EffectRequestResponse } from "./runtime.js"; export interface RequestResponseOptions { @@ -85,8 +85,8 @@ export function makeRequestResponse( * Send a request and wait for responses. * * @param request - The request payload - * @param options.timeoutMs - Total timeout in milliseconds (default: 300s) - * @param options.recipient - Optional callback for streaming responses. + * @param requestOptions.timeoutMs - Total timeout in milliseconds (default: 300s) + * @param requestOptions.recipient - Optional callback for streaming responses. * Return `true` to indicate the final response has been received. * If omitted, returns the first response. */ diff --git a/ts/packages/base/src/messaging/runtime.ts b/ts/packages/base/src/messaging/runtime.ts index f1e5e10f..8cf71959 100644 --- a/ts/packages/base/src/messaging/runtime.ts +++ b/ts/packages/base/src/messaging/runtime.ts @@ -356,7 +356,7 @@ export const makeEffectConsumerFromPubSub = Effect.fn("makeEffectConsumerFromPub const workers = yield* Effect.forEach(workerIndexes, () => Effect.gen(function* () { const backend = yield* pubsub.createConsumer(createOptions); - const fiber = yield* consumerLoop(backend, options, flow, workerConfig).pipe(Effect.forkChild); + const fiber = yield* consumerLoop(backend, options, flow, workerConfig).pipe(Effect.forkScoped); return { backend, fiber }; }), );