Isolate concurrent Effect consumers

This commit is contained in:
elpresidank 2026-06-02 06:08:49 -05:00
parent eaa7921314
commit 0fb943c0ef
3 changed files with 120 additions and 16 deletions

View file

@ -115,6 +115,41 @@ class RuntimeBackend implements PubSubBackend {
}
}
class ConsumerHandle {
closeCount = 0;
}
class ConcurrentConsumerBackend implements PubSubBackend {
readonly consumerOptions: Array<CreateConsumerOptions> = [];
readonly consumers: Array<ConsumerHandle> = [];
async createProducer<T>(_options: CreateProducerOptions<T>): Promise<BackendProducer<T>> {
return {
send: async () => {},
flush: async () => {},
close: async () => {},
};
}
async createConsumer<T>(options: CreateConsumerOptions<T>): Promise<BackendConsumer<T>> {
const handle = new ConsumerHandle();
this.consumerOptions.push(options);
this.consumers.push(handle);
return {
receive: async () => null,
acknowledge: async () => {},
negativeAcknowledge: async () => {},
unsubscribe: async () => {},
close: async () => {
handle.closeCount += 1;
},
};
}
async close(): Promise<void> {}
}
const flowContext: FlowContext = {
id: "processor",
name: "default",
@ -179,6 +214,34 @@ describe("Effect-native messaging runtime", () => {
}),
);
it.effect(
"creates and closes one backend consumer per concurrency worker",
Effect.fnUntraced(function* () {
const backend = new ConcurrentConsumerBackend();
yield* Effect.scoped(
Effect.gen(function* () {
const consumer = yield* runEffectConsumerScoped<string>(
{
topic: "tg.test.consumer",
subscription: "sub",
concurrency: 3,
receiveTimeoutMs: 1,
errorBackoffMs: 1,
handler: () => Effect.void,
},
flowContext,
);
yield* consumer.stop;
yield* consumer.stop;
}).pipe(Effect.provide(PubSub.layer(backend))),
);
expect(backend.consumerOptions).toHaveLength(3);
expect(backend.consumers.map((consumer) => consumer.closeCount)).toEqual([1, 1, 1]);
}),
);
it.effect(
"retries rate-limited Effect handlers until success within the timeout",
Effect.fnUntraced(function* () {

View file

@ -3,7 +3,7 @@
*/
import { randomUUID } from "node:crypto";
import { Context, Duration, Effect, Fiber, Layer, Queue, Result, Schedule, Scope, Stream } from "effect";
import { Context, Duration, Effect, Fiber, Layer, Queue, Ref, Result, Schedule, Scope, Stream } from "effect";
import * as O from "effect/Option";
import * as S from "effect/Schema";
import type {
@ -346,20 +346,32 @@ export const makeEffectConsumerFromPubSub = Effect.fn("makeEffectConsumerFromPub
...(options.initialPosition === undefined ? {} : { initialPosition: options.initialPosition }),
...(options.schema === undefined ? {} : { schema: options.schema }),
};
const backend = yield* pubsub.createConsumer<T>(createOptions);
const concurrency = Math.max(1, options.concurrency ?? 1);
const workerIndexes = Array.from({ length: concurrency }, (_value, index) => index);
const fibers = yield* Effect.forEach(workerIndexes, () =>
consumerLoop(backend, options, flow, {
...config,
rateLimitRetryMs: options.rateLimitRetryMs ?? config.rateLimitRetryMs,
rateLimitTimeoutMs: options.rateLimitTimeoutMs ?? config.rateLimitTimeoutMs,
}).pipe(Effect.forkChild),
const workerConfig = {
...config,
rateLimitRetryMs: options.rateLimitRetryMs ?? config.rateLimitRetryMs,
rateLimitTimeoutMs: options.rateLimitTimeoutMs ?? config.rateLimitTimeoutMs,
};
const workers = yield* Effect.forEach(workerIndexes, () =>
Effect.gen(function* () {
const backend = yield* pubsub.createConsumer<T>(createOptions);
const fiber = yield* consumerLoop(backend, options, flow, workerConfig).pipe(Effect.forkChild);
return { backend, fiber };
}),
);
const stopped = yield* Ref.make(false);
const stop = Effect.fn(`Consumer.stop:${options.topic}`)(function* () {
yield* Effect.forEach(fibers, Fiber.interrupt, { discard: true });
yield* closeConsumerBackend(backend, options.topic, options.subscription);
const alreadyStopped = yield* Ref.getAndSet(stopped, true);
if (alreadyStopped) return;
yield* Effect.forEach(workers, (worker) => Fiber.interrupt(worker.fiber), { discard: true });
yield* Effect.forEach(
workers,
(worker) => closeConsumerBackend(worker.backend, options.topic, options.subscription),
{ discard: true },
);
});
yield* Effect.addFinalizer(() =>
@ -375,7 +387,7 @@ export const makeEffectConsumerFromPubSub = Effect.fn("makeEffectConsumerFromPub
);
return {
fibers,
fibers: workers.map((worker) => worker.fiber),
stop: stop(),
} satisfies EffectConsumer;
});