From 0fb943c0ef2331f1fe5390048de6ce5ea4e6060e Mon Sep 17 00:00:00 2001 From: elpresidank Date: Tue, 2 Jun 2026 06:08:49 -0500 Subject: [PATCH] Isolate concurrent Effect consumers --- ts/EFFECT_NATIVE_REWRITE_AUDIT.md | 39 ++++++++++-- .../src/__tests__/messaging-runtime.test.ts | 63 +++++++++++++++++++ ts/packages/base/src/messaging/runtime.ts | 34 ++++++---- 3 files changed, 120 insertions(+), 16 deletions(-) diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 0d489caa..6fd07d87 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -13,7 +13,7 @@ Verified source roots: - Installed Effect beta used by this workspace: `ts/node_modules/effect` Current signal counts from `ts/packages` after the 2026-06-02 consumer -rate-limit retry slice: +concurrency ownership slice: | Signal | Count | | --- | ---: | @@ -116,6 +116,11 @@ Notes: now retry with `Schedule.spaced` until success or a tagged rate-limit timeout. The `new Error` count dropped by one because a touched consumer test fixture no longer uses a normal `Error`. +- The consumer concurrency ownership slice changed the Effect-native consumer + runtime so `concurrency > 1` allocates one backend consumer per worker instead + of sharing a single `BackendConsumer.receive()` handle. `stop` is now + idempotent through `Ref`, so explicit stop and scoped finalizers do not close + workers twice. - The gateway streaming callback slice added Effect-returning dispatcher streaming methods, switched the RPC stream server off nested `Effect.runPromiseWith(context)` queue offers, and replaced the client @@ -1238,6 +1243,26 @@ Notes: - `cd ts && bun run build` - `cd ts && bun run test` +### 2026-06-02: Consumer Concurrency Ownership Slice + +- Status: migrated and root-verified. +- Completed: + - `makeEffectConsumerFromPubSub` now creates one backend consumer per + concurrency worker rather than sharing a single backend consumer across + parallel receive loops. + - Consumer runtime `stop` is idempotent via `Ref.getAndSet`, so explicit + `consumer.stop` and scope finalization do not double-close worker handles. + - Added Effect-native runtime coverage proving `concurrency: 3` creates and + closes three independent backend consumers exactly once. +- Verification: + - `cd ts && bun run check:tsgo` + - `cd ts/packages/base && bunx --bun vitest run src/__tests__/messaging-runtime.test.ts` + - `bun run --cwd ts/packages/base build` + - `bun run --cwd ts/packages/base test` + - `cd ts && bun run check` + - `cd ts && bun run build` + - `cd ts && bun run test` + ## Subagent Findings To Preserve - MCP/workbench: @@ -1273,9 +1298,10 @@ Notes: behavior now stay typed. Remaining NATS work is scoped backend/layer construction and stream/consumer state ownership. - Consumer rate-limit retry timeout behavior is now wired in both legacy and - Effect-native consumer paths. Remaining consumer runtime work should focus - on per-worker backend consumer ownership and request/response pending - shutdown semantics. + Effect-native consumer paths. Effect-native consumer concurrency now owns + one backend consumer per worker. Remaining consumer runtime work should + focus on request/response pending shutdown semantics and the legacy + consumer facade's blocking compatibility shape. - Existing constructor shims preserve callable-plus-newable public exports; removing them needs a public API split or real class redesign. - Typed string registries in `Flow` now have Schema-backed parameter specs @@ -1350,7 +1376,10 @@ Notes: create-on-failure behavior. Future backend slices should move connection/stream state into scoped Effect services. - Treat rate-limit retry timeout semantics as complete; next consumer slices - should focus on concurrency ownership and shutdown, not retry policy. + should focus on shutdown, not retry policy. + - Treat Effect-native per-worker consumer ownership as complete; do not flag + `makeEffectConsumerFromPubSub` concurrency for shared backend receive + handles. - Tests: - Fake backend ack/nak/backoff/stop tests, NATS close finalizer tests, and config-push stream tests. diff --git a/ts/packages/base/src/__tests__/messaging-runtime.test.ts b/ts/packages/base/src/__tests__/messaging-runtime.test.ts index 21bba204..c280bee4 100644 --- a/ts/packages/base/src/__tests__/messaging-runtime.test.ts +++ b/ts/packages/base/src/__tests__/messaging-runtime.test.ts @@ -115,6 +115,41 @@ class RuntimeBackend implements PubSubBackend { } } +class ConsumerHandle { + closeCount = 0; +} + +class ConcurrentConsumerBackend implements PubSubBackend { + readonly consumerOptions: Array = []; + readonly consumers: Array = []; + + async createProducer(_options: CreateProducerOptions): Promise> { + return { + send: async () => {}, + flush: async () => {}, + close: async () => {}, + }; + } + + async createConsumer(options: CreateConsumerOptions): Promise> { + const handle = new ConsumerHandle(); + this.consumerOptions.push(options); + this.consumers.push(handle); + + return { + receive: async () => null, + acknowledge: async () => {}, + negativeAcknowledge: async () => {}, + unsubscribe: async () => {}, + close: async () => { + handle.closeCount += 1; + }, + }; + } + + async close(): Promise {} +} + const flowContext: FlowContext = { id: "processor", name: "default", @@ -179,6 +214,34 @@ describe("Effect-native messaging runtime", () => { }), ); + it.effect( + "creates and closes one backend consumer per concurrency worker", + Effect.fnUntraced(function* () { + const backend = new ConcurrentConsumerBackend(); + + yield* Effect.scoped( + Effect.gen(function* () { + const consumer = yield* runEffectConsumerScoped( + { + topic: "tg.test.consumer", + subscription: "sub", + concurrency: 3, + receiveTimeoutMs: 1, + errorBackoffMs: 1, + handler: () => Effect.void, + }, + flowContext, + ); + yield* consumer.stop; + yield* consumer.stop; + }).pipe(Effect.provide(PubSub.layer(backend))), + ); + + expect(backend.consumerOptions).toHaveLength(3); + expect(backend.consumers.map((consumer) => consumer.closeCount)).toEqual([1, 1, 1]); + }), + ); + it.effect( "retries rate-limited Effect handlers until success within the timeout", Effect.fnUntraced(function* () { diff --git a/ts/packages/base/src/messaging/runtime.ts b/ts/packages/base/src/messaging/runtime.ts index f5772bcb..8a514fba 100644 --- a/ts/packages/base/src/messaging/runtime.ts +++ b/ts/packages/base/src/messaging/runtime.ts @@ -3,7 +3,7 @@ */ import { randomUUID } from "node:crypto"; -import { Context, Duration, Effect, Fiber, Layer, Queue, Result, Schedule, Scope, Stream } from "effect"; +import { Context, Duration, Effect, Fiber, Layer, Queue, Ref, Result, Schedule, Scope, Stream } from "effect"; import * as O from "effect/Option"; import * as S from "effect/Schema"; import type { @@ -346,20 +346,32 @@ export const makeEffectConsumerFromPubSub = Effect.fn("makeEffectConsumerFromPub ...(options.initialPosition === undefined ? {} : { initialPosition: options.initialPosition }), ...(options.schema === undefined ? {} : { schema: options.schema }), }; - const backend = yield* pubsub.createConsumer(createOptions); const concurrency = Math.max(1, options.concurrency ?? 1); const workerIndexes = Array.from({ length: concurrency }, (_value, index) => index); - const fibers = yield* Effect.forEach(workerIndexes, () => - consumerLoop(backend, options, flow, { - ...config, - rateLimitRetryMs: options.rateLimitRetryMs ?? config.rateLimitRetryMs, - rateLimitTimeoutMs: options.rateLimitTimeoutMs ?? config.rateLimitTimeoutMs, - }).pipe(Effect.forkChild), + const workerConfig = { + ...config, + rateLimitRetryMs: options.rateLimitRetryMs ?? config.rateLimitRetryMs, + rateLimitTimeoutMs: options.rateLimitTimeoutMs ?? config.rateLimitTimeoutMs, + }; + const workers = yield* Effect.forEach(workerIndexes, () => + Effect.gen(function* () { + const backend = yield* pubsub.createConsumer(createOptions); + const fiber = yield* consumerLoop(backend, options, flow, workerConfig).pipe(Effect.forkChild); + return { backend, fiber }; + }), ); + const stopped = yield* Ref.make(false); const stop = Effect.fn(`Consumer.stop:${options.topic}`)(function* () { - yield* Effect.forEach(fibers, Fiber.interrupt, { discard: true }); - yield* closeConsumerBackend(backend, options.topic, options.subscription); + const alreadyStopped = yield* Ref.getAndSet(stopped, true); + if (alreadyStopped) return; + + yield* Effect.forEach(workers, (worker) => Fiber.interrupt(worker.fiber), { discard: true }); + yield* Effect.forEach( + workers, + (worker) => closeConsumerBackend(worker.backend, options.topic, options.subscription), + { discard: true }, + ); }); yield* Effect.addFinalizer(() => @@ -375,7 +387,7 @@ export const makeEffectConsumerFromPubSub = Effect.fn("makeEffectConsumerFromPub ); return { - fibers, + fibers: workers.map((worker) => worker.fiber), stop: stop(), } satisfies EffectConsumer; });