Use Duration for messaging runtime config

This commit is contained in:
elpresidank 2026-06-02 09:16:33 -05:00
parent 09d34fb4d4
commit 71edff47ed
5 changed files with 164 additions and 62 deletions

View file

@ -52,6 +52,12 @@ import {
const isTooManyRequestsError = S.is(TooManyRequestsError);
const durationFromMsOption = (
value: number | undefined,
fallback: Duration.Duration,
): Duration.Duration =>
value === undefined ? fallback : Duration.millis(value);
export type EffectMessageHandler<T, E = never, R = never> = (
message: T,
properties: Record<string, string>,
@ -254,8 +260,10 @@ const handleMessageWithRetry = Effect.fn("handleMessageWithRetry")(function* <T,
message: Message<T>,
config: MessagingRuntimeConfig,
) {
const rateLimitRetryMs = options.rateLimitRetryMs ?? config.rateLimitRetryMs;
const rateLimitTimeoutMs = options.rateLimitTimeoutMs ?? config.rateLimitTimeoutMs;
const rateLimitRetry = durationFromMsOption(options.rateLimitRetryMs, config.rateLimitRetry);
const rateLimitTimeout = durationFromMsOption(options.rateLimitTimeoutMs, config.rateLimitTimeout);
const rateLimitRetryMs = Duration.toMillis(rateLimitRetry);
const rateLimitTimeoutMs = Duration.toMillis(rateLimitTimeout);
const runHandler = (): Effect.Effect<void, TooManyRequestsError | MessagingHandlerError, R> =>
options.handler(message.value(), message.properties(), flow).pipe(
Effect.mapError((error): TooManyRequestsError | MessagingHandlerError =>
@ -276,11 +284,11 @@ const handleMessageWithRetry = Effect.fn("handleMessageWithRetry")(function* <T,
: Effect.void,
),
Effect.retry({
schedule: Schedule.spaced(Duration.millis(rateLimitRetryMs)),
schedule: Schedule.spaced(rateLimitRetry),
while: isTooManyRequestsError,
}),
Effect.timeoutOrElse({
duration: Duration.millis(rateLimitTimeoutMs),
duration: rateLimitTimeout,
orElse: () => Effect.fail(messagingTimeoutError("rate-limit", rateLimitTimeoutMs)),
}),
Effect.mapError((error) =>
@ -325,14 +333,18 @@ const consumerLoop = <T, E, R>(
options: EffectConsumerOptions<T, E, R>,
flow: FlowContext<R>,
config: MessagingRuntimeConfig,
): Effect.Effect<void, never, R> =>
Effect.whileLoop({
): Effect.Effect<void, never, R> => {
const receiveTimeout = durationFromMsOption(options.receiveTimeoutMs, config.consumerReceiveTimeout);
const receiveTimeoutMs = Duration.toMillis(receiveTimeout);
const errorBackoff = durationFromMsOption(options.errorBackoffMs, config.consumerErrorBackoff);
return Effect.whileLoop({
while: () => true,
body: () =>
receiveMessage(backend, options.topic, options.receiveTimeoutMs ?? config.consumerReceiveTimeoutMs).pipe(
receiveMessage(backend, options.topic, receiveTimeoutMs).pipe(
Effect.flatMap((message) =>
message === null
? Effect.sleep(Duration.millis(options.receiveTimeoutMs ?? config.consumerReceiveTimeoutMs))
? Effect.sleep(receiveTimeout)
: processConsumerMessage(backend, options, flow, message, config),
),
Effect.catch((error) =>
@ -342,13 +354,14 @@ const consumerLoop = <T, E, R>(
subscription: options.subscription,
}).pipe(
Effect.flatMap(() =>
Effect.sleep(Duration.millis(options.errorBackoffMs ?? config.consumerErrorBackoffMs)),
Effect.sleep(errorBackoff),
),
),
),
),
step: () => undefined,
});
};
export const makeEffectConsumerFromPubSub = Effect.fn("makeEffectConsumerFromPubSub")(function* <T, E, R>(
pubsub: PubSubService,
@ -364,15 +377,10 @@ export const makeEffectConsumerFromPubSub = Effect.fn("makeEffectConsumerFromPub
};
const concurrency = Math.max(1, options.concurrency ?? 1);
const workerIndexes = Array.from({ length: concurrency }, (_value, index) => index);
const workerConfig = {
...config,
rateLimitRetryMs: options.rateLimitRetryMs ?? config.rateLimitRetryMs,
rateLimitTimeoutMs: options.rateLimitTimeoutMs ?? config.rateLimitTimeoutMs,
};
const workers = yield* Effect.forEach(workerIndexes, () =>
Effect.gen(function* () {
const backend = yield* pubsub.createConsumer<T>(createOptions);
const fiber = yield* consumerLoop(backend, options, flow, workerConfig).pipe(Effect.forkScoped);
const fiber = yield* consumerLoop(backend, options, flow, config).pipe(Effect.forkScoped);
return { backend, fiber };
}),
);
@ -417,10 +425,10 @@ const dispatchResponseLoop = <T>(
Effect.whileLoop({
while: () => true,
body: () =>
receiveMessage(backend, responseTopic, config.consumerReceiveTimeoutMs).pipe(
receiveMessage(backend, responseTopic, Duration.toMillis(config.consumerReceiveTimeout)).pipe(
Effect.flatMap((message) => {
if (message === null) {
return Effect.sleep(Duration.millis(config.consumerReceiveTimeoutMs));
return Effect.sleep(config.consumerReceiveTimeout);
}
const id = message.properties().id;
@ -438,7 +446,7 @@ const dispatchResponseLoop = <T>(
Effect.logError("[RequestResponse] Response dispatch failed", {
error: error.message,
topic: responseTopic,
}).pipe(Effect.flatMap(() => Effect.sleep(Duration.millis(config.consumerErrorBackoffMs)))),
}).pipe(Effect.flatMap(() => Effect.sleep(config.consumerErrorBackoff))),
),
),
step: () => undefined,
@ -532,7 +540,8 @@ export const makeEffectRequestResponseFromPubSub = Effect.fn("makeEffectRequestR
requestOptions?: EffectRequestOptions<TRes, E, R>,
) => {
const id = randomUUID();
const timeoutMs = requestOptions?.timeoutMs ?? config.requestTimeoutMs;
const timeout = durationFromMsOption(requestOptions?.timeoutMs, config.requestTimeout);
const timeoutMs = Duration.toMillis(timeout);
return Effect.scoped(
Effect.gen(function* () {
@ -540,7 +549,7 @@ export const makeEffectRequestResponseFromPubSub = Effect.fn("makeEffectRequestR
yield* producer.send(id, request);
const result = yield* waitForResponse(subscription, id, requestOptions).pipe(
Effect.raceFirst(Deferred.await(stoppedSignal)),
Effect.timeoutOption(Duration.millis(timeoutMs)),
Effect.timeoutOption(timeout),
);
return yield* O.match(result, {
onNone: () => Effect.fail(messagingTimeoutError("request-response", timeoutMs)),