From eaa792131482748f7241f811b6fa8d47bf37f21e Mon Sep 17 00:00:00 2001 From: elpresidank Date: Tue, 2 Jun 2026 06:03:36 -0500 Subject: [PATCH] Honor consumer rate limit timeouts --- ts/EFFECT_NATIVE_REWRITE_AUDIT.md | 42 +++++++++- .../base/src/__tests__/consumer.test.ts | 73 ++++++++++++++++- .../src/__tests__/messaging-runtime.test.ts | 80 +++++++++++++++++++ ts/packages/base/src/messaging/consumer.ts | 34 +++++--- ts/packages/base/src/messaging/runtime.ts | 51 +++++++----- .../base/src/runtime/messaging-config.ts | 6 ++ 6 files changed, 254 insertions(+), 32 deletions(-) diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index b8b082a7..0d489caa 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -12,8 +12,8 @@ Verified source roots: - Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4` - Installed Effect beta used by this workspace: `ts/node_modules/effect` -Current signal counts from `ts/packages` after the 2026-06-02 NATS selective -404 slice: +Current signal counts from `ts/packages` after the 2026-06-02 consumer +rate-limit retry slice: | Signal | Count | | --- | ---: | @@ -28,7 +28,7 @@ Current signal counts from `ts/packages` after the 2026-06-02 NATS selective | `makeAsyncProcessor` | 19 | | `receive(` | 18 | | `while (` | 2 | -| `new Error` | 8 | +| `new Error` | 7 | | `new Promise` | 10 | | `JSON.parse` | 4 | | `localStorage` | 9 | @@ -110,6 +110,12 @@ Notes: `Effect.catchIf` recovery only for NATS JetStream missing-resource errors. Non-missing lookup failures now stay on the typed failure path without attempting to create streams or durable consumers. +- The consumer rate-limit retry slice wired the previously unused + `rateLimitTimeoutMs` option and `TG_RATE_LIMIT_TIMEOUT_MS` config into both + legacy and Effect-native consumers. Repeated `TooManyRequestsError` failures + now retry with `Schedule.spaced` until success or a tagged rate-limit timeout. + The `new Error` count dropped by one because a touched consumer test fixture + no longer uses a normal `Error`. - The gateway streaming callback slice added Effect-returning dispatcher streaming methods, switched the RPC stream server off nested `Effect.runPromiseWith(context)` queue offers, and replaced the client @@ -1208,6 +1214,30 @@ Notes: - `cd ts && bun run build` - `cd ts && bun run test` +### 2026-06-02: Consumer Rate-Limit Retry Slice + +- Status: migrated and root-verified. +- Completed: + - Added `rateLimitTimeoutMs` to the Effect-native messaging runtime config, + backed by `TG_RATE_LIMIT_TIMEOUT_MS` and the Python-compatible default of + `7_200_000ms`. + - Reworked legacy `makeConsumer` retry handling to use `Schedule.spaced`, + retry repeated `TooManyRequestsError`s, and fail with a tagged + `MessagingTimeoutError` when the rate-limit timeout elapses. + - Reworked `makeEffectConsumerFromPubSub` handler retry handling with the + same schedule/timeout behavior while keeping handler failures in typed + Effect error channels. + - Added legacy and Effect-native tests for repeated rate-limit retry until + success and negative acknowledgement after retry timeout. +- Verification: + - `cd ts && bun run check:tsgo` + - `cd ts/packages/base && bunx --bun vitest run src/__tests__/consumer.test.ts src/__tests__/messaging-runtime.test.ts` + - `bun run --cwd ts/packages/base build` + - `bun run --cwd ts/packages/base test` + - `cd ts && bun run check` + - `cd ts && bun run build` + - `cd ts && bun run test` + ## Subagent Findings To Preserve - MCP/workbench: @@ -1242,6 +1272,10 @@ Notes: - NATS header construction, ack/nak operations, and lookup create-on-missing behavior now stay typed. Remaining NATS work is scoped backend/layer construction and stream/consumer state ownership. + - Consumer rate-limit retry timeout behavior is now wired in both legacy and + Effect-native consumer paths. Remaining consumer runtime work should focus + on per-worker backend consumer ownership and request/response pending + shutdown semantics. - Existing constructor shims preserve callable-plus-newable public exports; removing them needs a public API split or real class redesign. - Typed string registries in `Flow` now have Schema-backed parameter specs @@ -1315,6 +1349,8 @@ Notes: - Keep NATS SDK boundary failures typed and avoid catch-all create-on-failure behavior. Future backend slices should move connection/stream state into scoped Effect services. + - Treat rate-limit retry timeout semantics as complete; next consumer slices + should focus on concurrency ownership and shutdown, not retry policy. - Tests: - Fake backend ack/nak/backoff/stop tests, NATS close finalizer tests, and config-push stream tests. diff --git a/ts/packages/base/src/__tests__/consumer.test.ts b/ts/packages/base/src/__tests__/consumer.test.ts index e8270d41..f552a9a5 100644 --- a/ts/packages/base/src/__tests__/consumer.test.ts +++ b/ts/packages/base/src/__tests__/consumer.test.ts @@ -96,6 +96,7 @@ describe("Consumer", () => { handler: vi.fn(), concurrency: 4, rateLimitRetryMs: 5_000, + rateLimitTimeoutMs: 10_000, }); expect(consumer).toMatchObject({ @@ -165,7 +166,7 @@ describe("Consumer", () => { // ── Messages are negatively acknowledged on handler error ────── it("negatively acknowledges messages when the handler throws", async () => { - const handler = vi.fn().mockRejectedValue(new Error("handler boom")); + const handler = vi.fn().mockRejectedValue("handler boom"); const msg = createMockMessage("bad-payload"); let callCount = 0; @@ -243,6 +244,76 @@ describe("Consumer", () => { warnSpy.mockRestore(); }); + it("retries repeated TooManyRequestsError until success within the timeout", async () => { + let handlerCalls = 0; + const handler = vi.fn().mockImplementation(async () => { + handlerCalls++; + if (handlerCalls <= 2) { + throw tooManyRequestsError("rate limited"); + } + }); + + const msg = createMockMessage("rate-limited-payload"); + + let receiveCount = 0; + backendConsumer.receive.mockImplementation(async () => { + receiveCount++; + if (receiveCount === 1) return msg; + await consumer.stop(); + return null; + }); + + const consumer = makeConsumer({ + pubsub, + topic: "t", + subscription: "s", + handler, + rateLimitRetryMs: 500, + rateLimitTimeoutMs: 2_000, + }); + + const startPromise = consumer.start(flowCtx); + await vi.advanceTimersByTimeAsync(1_100); + await startPromise; + + expect(handler).toHaveBeenCalledTimes(3); + expect(backendConsumer.acknowledge).toHaveBeenCalledWith(msg); + expect(backendConsumer.negativeAcknowledge).not.toHaveBeenCalled(); + }); + + it("negatively acknowledges when rate-limit retry timeout elapses", async () => { + const handler = vi.fn().mockImplementation(async () => { + throw tooManyRequestsError("rate limited"); + }); + const msg = createMockMessage("rate-limited-payload"); + + let receiveCount = 0; + backendConsumer.receive.mockImplementation(async () => { + receiveCount++; + if (receiveCount === 1) return msg; + return null; + }); + + const consumer = makeConsumer({ + pubsub, + topic: "t", + subscription: "s", + handler, + rateLimitRetryMs: 500, + rateLimitTimeoutMs: 1_000, + }); + + const startPromise = consumer.start(flowCtx); + await vi.advanceTimersByTimeAsync(1_100); + await consumer.stop(); + await vi.advanceTimersByTimeAsync(1_100); + await startPromise; + + expect(handler).toHaveBeenCalledTimes(2); + expect(backendConsumer.negativeAcknowledge).toHaveBeenCalledWith(msg); + expect(backendConsumer.acknowledge).not.toHaveBeenCalled(); + }); + // ── stop() closes the backend ────────────────────────────────── it("stop() sets running=false and closes the backend", async () => { // Make receive block forever (returns null) until stopped diff --git a/ts/packages/base/src/__tests__/messaging-runtime.test.ts b/ts/packages/base/src/__tests__/messaging-runtime.test.ts index 2868e8f3..21bba204 100644 --- a/ts/packages/base/src/__tests__/messaging-runtime.test.ts +++ b/ts/packages/base/src/__tests__/messaging-runtime.test.ts @@ -10,6 +10,7 @@ import { runEffectConsumerScoped, runEffectProducerScoped, runFlowScoped, + tooManyRequestsError, type BackendConsumer, type BackendProducer, type CreateConsumerOptions, @@ -178,6 +179,85 @@ describe("Effect-native messaging runtime", () => { }), ); + it.effect( + "retries rate-limited Effect handlers until success within the timeout", + Effect.fnUntraced(function* () { + const message = createMessage("payload", { id: "request-1" }); + const consumer = new ScriptedConsumer([message]); + const backend = new RuntimeBackend(consumer as BackendConsumer); + let attempts = 0; + + yield* Effect.scoped( + Effect.gen(function* () { + yield* runEffectConsumerScoped( + { + topic: "tg.test.consumer", + subscription: "sub", + receiveTimeoutMs: 1, + errorBackoffMs: 1, + rateLimitRetryMs: 10, + rateLimitTimeoutMs: 100, + handler: () => + Effect.sync(() => { + attempts += 1; + return attempts; + }).pipe( + Effect.flatMap((attempt) => + attempt <= 2 + ? Effect.fail(tooManyRequestsError("rate limited")) + : Effect.void + ), + ), + }, + flowContext, + ); + yield* TestClock.adjust(Duration.millis(35)); + }).pipe(Effect.provide(PubSub.layer(backend))), + ); + + expect(attempts).toBe(3); + expect(consumer.acknowledged).toEqual([message]); + expect(consumer.nacked).toEqual([]); + }), + ); + + it.effect( + "negatively acknowledges rate-limited Effect handlers after retry timeout", + Effect.fnUntraced(function* () { + const message = createMessage("payload", { id: "request-1" }); + const consumer = new ScriptedConsumer([message]); + const backend = new RuntimeBackend(consumer as BackendConsumer); + let attempts = 0; + + yield* Effect.scoped( + Effect.gen(function* () { + yield* runEffectConsumerScoped( + { + topic: "tg.test.consumer", + subscription: "sub", + receiveTimeoutMs: 1, + errorBackoffMs: 1, + rateLimitRetryMs: 10, + rateLimitTimeoutMs: 25, + handler: () => + Effect.sync(() => { + attempts += 1; + }).pipe( + Effect.flatMap(() => Effect.fail(tooManyRequestsError("rate limited"))), + ), + }, + flowContext, + ); + yield* TestClock.adjust(Duration.millis(40)); + }).pipe(Effect.provide(PubSub.layer(backend))), + ); + + expect(attempts).toBeGreaterThanOrEqual(2); + expect(consumer.acknowledged).toEqual([]); + expect(consumer.nacked).toEqual([message]); + }), + ); + it.effect( "routes request-response replies through an Effect queue", Effect.fnUntraced(function* () { diff --git a/ts/packages/base/src/messaging/consumer.ts b/ts/packages/base/src/messaging/consumer.ts index 365fd00f..2dac3f37 100644 --- a/ts/packages/base/src/messaging/consumer.ts +++ b/ts/packages/base/src/messaging/consumer.ts @@ -12,8 +12,9 @@ import { messagingDeliveryError, messagingHandlerError, messagingLifecycleError, + messagingTimeoutError, } from "../errors.js"; -import { Duration, Effect } from "effect"; +import { Duration, Effect, Schedule } from "effect"; import * as S from "effect/Schema"; export type MessageHandler = ( @@ -54,6 +55,7 @@ export function makeConsumer(options: ConsumerOptions): Consumer { const isTooManyRequestsError = S.is(TooManyRequestsError); const concurrency = options.concurrency ?? 1; const rateLimitRetryMs = options.rateLimitRetryMs ?? 10_000; + const rateLimitTimeoutMs = options.rateLimitTimeoutMs ?? 7_200_000; const runHandler = ( message: T, @@ -74,15 +76,27 @@ export function makeConsumer(options: ConsumerOptions): Consumer { ) { const callHandler = runHandler(message.value(), message.properties(), flow); yield* callHandler.pipe( - Effect.catchTag("TooManyRequestsError", () => - Effect.logWarning("[Consumer] Rate limited, retrying", { - topic: options.topic, - subscription: options.subscription, - retryMs: rateLimitRetryMs, - }).pipe( - Effect.flatMap(() => Effect.sleep(Duration.millis(rateLimitRetryMs))), - Effect.flatMap(() => callHandler), - ), + Effect.tapError((error) => + isTooManyRequestsError(error) + ? Effect.logWarning("[Consumer] Rate limited, retrying", { + topic: options.topic, + subscription: options.subscription, + retryMs: rateLimitRetryMs, + }) + : Effect.void, + ), + Effect.retry({ + schedule: Schedule.spaced(Duration.millis(rateLimitRetryMs)), + while: isTooManyRequestsError, + }), + Effect.timeoutOrElse({ + duration: Duration.millis(rateLimitTimeoutMs), + orElse: () => Effect.fail(messagingTimeoutError("rate-limit", rateLimitTimeoutMs)), + }), + Effect.mapError((error) => + isTooManyRequestsError(error) + ? messagingHandlerError(options.topic, options.subscription, error) + : error, ), ); }); diff --git a/ts/packages/base/src/messaging/runtime.ts b/ts/packages/base/src/messaging/runtime.ts index 5ffbee5d..f5772bcb 100644 --- a/ts/packages/base/src/messaging/runtime.ts +++ b/ts/packages/base/src/messaging/runtime.ts @@ -3,7 +3,7 @@ */ import { randomUUID } from "node:crypto"; -import { Context, Duration, Effect, Fiber, Layer, Queue, Result, Scope, Stream } from "effect"; +import { Context, Duration, Effect, Fiber, Layer, Queue, Result, Schedule, Scope, Stream } from "effect"; import * as O from "effect/Option"; import * as S from "effect/Schema"; import type { @@ -23,6 +23,7 @@ import { TooManyRequestsError, type FlowRuntimeError, type MessagingDeliveryError, + type MessagingHandlerError, type MessagingLifecycleError, type MessagingTimeoutError, type PubSubError, @@ -66,6 +67,7 @@ export interface EffectConsumerOptions { readonly receiveTimeoutMs?: number; readonly errorBackoffMs?: number; readonly rateLimitRetryMs?: number; + readonly rateLimitTimeoutMs?: number; } export interface EffectConsumer { @@ -236,28 +238,40 @@ const handleMessageWithRetry = Effect.fn("handleMessageWithRetry")(function* , config: MessagingRuntimeConfig, ) { - const runHandler = Effect.fn(`Consumer.handler:${options.topic}`)(() => + const rateLimitRetryMs = options.rateLimitRetryMs ?? config.rateLimitRetryMs; + const rateLimitTimeoutMs = options.rateLimitTimeoutMs ?? config.rateLimitTimeoutMs; + const runHandler = (): Effect.Effect => options.handler(message.value(), message.properties(), flow).pipe( - Effect.mapError((error) => messagingHandlerError(options.topic, options.subscription, error)), - ), - ); + Effect.mapError((error): TooManyRequestsError | MessagingHandlerError => + isTooManyRequestsError(error) + ? error + : messagingHandlerError(options.topic, options.subscription, error), + ), + ); - return yield* options.handler(message.value(), message.properties(), flow).pipe( - Effect.catch((error) => { - if (isTooManyRequestsError(error)) { - return Effect.gen(function* () { - yield* Effect.logWarning("[Consumer] Rate limited, retrying", { + return yield* runHandler().pipe( + Effect.tapError((error) => + isTooManyRequestsError(error) + ? Effect.logWarning("[Consumer] Rate limited, retrying", { topic: options.topic, subscription: options.subscription, - retryMs: config.rateLimitRetryMs, - }); - yield* Effect.sleep(Duration.millis(config.rateLimitRetryMs)); - yield* runHandler(); - }); - } - - return Effect.fail(messagingHandlerError(options.topic, options.subscription, error)); + retryMs: rateLimitRetryMs, + }) + : Effect.void, + ), + Effect.retry({ + schedule: Schedule.spaced(Duration.millis(rateLimitRetryMs)), + while: isTooManyRequestsError, }), + Effect.timeoutOrElse({ + duration: Duration.millis(rateLimitTimeoutMs), + orElse: () => Effect.fail(messagingTimeoutError("rate-limit", rateLimitTimeoutMs)), + }), + Effect.mapError((error) => + isTooManyRequestsError(error) + ? messagingHandlerError(options.topic, options.subscription, error) + : error, + ), ); }); @@ -339,6 +353,7 @@ export const makeEffectConsumerFromPubSub = Effect.fn("makeEffectConsumerFromPub consumerLoop(backend, options, flow, { ...config, rateLimitRetryMs: options.rateLimitRetryMs ?? config.rateLimitRetryMs, + rateLimitTimeoutMs: options.rateLimitTimeoutMs ?? config.rateLimitTimeoutMs, }).pipe(Effect.forkChild), ); diff --git a/ts/packages/base/src/runtime/messaging-config.ts b/ts/packages/base/src/runtime/messaging-config.ts index 556a4c7f..bb45334d 100644 --- a/ts/packages/base/src/runtime/messaging-config.ts +++ b/ts/packages/base/src/runtime/messaging-config.ts @@ -8,6 +8,7 @@ export interface MessagingRuntimeConfig { readonly consumerReceiveTimeoutMs: number; readonly consumerErrorBackoffMs: number; readonly rateLimitRetryMs: number; + readonly rateLimitTimeoutMs: number; readonly requestTimeoutMs: number; } @@ -15,6 +16,7 @@ export const defaultMessagingRuntimeConfig: MessagingRuntimeConfig = { consumerReceiveTimeoutMs: 2_000, consumerErrorBackoffMs: 1_000, rateLimitRetryMs: 10_000, + rateLimitTimeoutMs: 7_200_000, requestTimeoutMs: 300_000, }; @@ -28,6 +30,9 @@ export const loadMessagingRuntimeConfig = Effect.fn("loadMessagingRuntimeConfig" const rateLimitRetryMs = yield* Config.number("TG_RATE_LIMIT_RETRY_MS").pipe( Config.withDefault(defaultMessagingRuntimeConfig.rateLimitRetryMs), ); + const rateLimitTimeoutMs = yield* Config.number("TG_RATE_LIMIT_TIMEOUT_MS").pipe( + Config.withDefault(defaultMessagingRuntimeConfig.rateLimitTimeoutMs), + ); const requestTimeoutMs = yield* Config.number("TG_REQUEST_TIMEOUT_MS").pipe( Config.withDefault(defaultMessagingRuntimeConfig.requestTimeoutMs), ); @@ -36,6 +41,7 @@ export const loadMessagingRuntimeConfig = Effect.fn("loadMessagingRuntimeConfig" consumerReceiveTimeoutMs, consumerErrorBackoffMs, rateLimitRetryMs, + rateLimitTimeoutMs, requestTimeoutMs, } satisfies MessagingRuntimeConfig; });