Honor consumer rate limit timeouts

This commit is contained in:
elpresidank 2026-06-02 06:03:36 -05:00
parent 46ae1dca82
commit eaa7921314
6 changed files with 254 additions and 32 deletions

View file

@ -12,8 +12,8 @@ Verified source roots:
- Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4`
- Installed Effect beta used by this workspace: `ts/node_modules/effect`
Current signal counts from `ts/packages` after the 2026-06-02 NATS selective
404 slice:
Current signal counts from `ts/packages` after the 2026-06-02 consumer
rate-limit retry slice:
| Signal | Count |
| --- | ---: |
@ -28,7 +28,7 @@ Current signal counts from `ts/packages` after the 2026-06-02 NATS selective
| `makeAsyncProcessor` | 19 |
| `receive(` | 18 |
| `while (` | 2 |
| `new Error` | 8 |
| `new Error` | 7 |
| `new Promise` | 10 |
| `JSON.parse` | 4 |
| `localStorage` | 9 |
@ -110,6 +110,12 @@ Notes:
`Effect.catchIf` recovery only for NATS JetStream missing-resource errors.
Non-missing lookup failures now stay on the typed failure path without
attempting to create streams or durable consumers.
- The consumer rate-limit retry slice wired the previously unused
`rateLimitTimeoutMs` option and `TG_RATE_LIMIT_TIMEOUT_MS` config into both
legacy and Effect-native consumers. Repeated `TooManyRequestsError` failures
now retry with `Schedule.spaced` until success or a tagged rate-limit timeout.
The `new Error` count dropped by one because a touched consumer test fixture
no longer uses a normal `Error`.
- The gateway streaming callback slice added Effect-returning dispatcher
streaming methods, switched the RPC stream server off nested
`Effect.runPromiseWith(context)` queue offers, and replaced the client
@ -1208,6 +1214,30 @@ Notes:
- `cd ts && bun run build`
- `cd ts && bun run test`
### 2026-06-02: Consumer Rate-Limit Retry Slice
- Status: migrated and root-verified.
- Completed:
- Added `rateLimitTimeoutMs` to the Effect-native messaging runtime config,
backed by `TG_RATE_LIMIT_TIMEOUT_MS` and the Python-compatible default of
`7_200_000ms`.
- Reworked legacy `makeConsumer` retry handling to use `Schedule.spaced`,
retry repeated `TooManyRequestsError`s, and fail with a tagged
`MessagingTimeoutError` when the rate-limit timeout elapses.
- Reworked `makeEffectConsumerFromPubSub` handler retry handling with the
same schedule/timeout behavior while keeping handler failures in typed
Effect error channels.
- Added legacy and Effect-native tests for repeated rate-limit retry until
success and negative acknowledgement after retry timeout.
- Verification:
- `cd ts && bun run check:tsgo`
- `cd ts/packages/base && bunx --bun vitest run src/__tests__/consumer.test.ts src/__tests__/messaging-runtime.test.ts`
- `bun run --cwd ts/packages/base build`
- `bun run --cwd ts/packages/base test`
- `cd ts && bun run check`
- `cd ts && bun run build`
- `cd ts && bun run test`
## Subagent Findings To Preserve
- MCP/workbench:
@ -1242,6 +1272,10 @@ Notes:
- NATS header construction, ack/nak operations, and lookup create-on-missing
behavior now stay typed. Remaining NATS work is scoped backend/layer
construction and stream/consumer state ownership.
- Consumer rate-limit retry timeout behavior is now wired in both legacy and
Effect-native consumer paths. Remaining consumer runtime work should focus
on per-worker backend consumer ownership and request/response pending
shutdown semantics.
- Existing constructor shims preserve callable-plus-newable public exports;
removing them needs a public API split or real class redesign.
- Typed string registries in `Flow` now have Schema-backed parameter specs
@ -1315,6 +1349,8 @@ Notes:
- Keep NATS SDK boundary failures typed and avoid catch-all
create-on-failure behavior. Future backend slices should move
connection/stream state into scoped Effect services.
- Treat rate-limit retry timeout semantics as complete; next consumer slices
should focus on concurrency ownership and shutdown, not retry policy.
- Tests:
- Fake backend ack/nak/backoff/stop tests, NATS close finalizer tests, and
config-push stream tests.

View file

@ -96,6 +96,7 @@ describe("Consumer", () => {
handler: vi.fn(),
concurrency: 4,
rateLimitRetryMs: 5_000,
rateLimitTimeoutMs: 10_000,
});
expect(consumer).toMatchObject({
@ -165,7 +166,7 @@ describe("Consumer", () => {
// ── Messages are negatively acknowledged on handler error ──────
it("negatively acknowledges messages when the handler throws", async () => {
const handler = vi.fn().mockRejectedValue(new Error("handler boom"));
const handler = vi.fn().mockRejectedValue("handler boom");
const msg = createMockMessage("bad-payload");
let callCount = 0;
@ -243,6 +244,76 @@ describe("Consumer", () => {
warnSpy.mockRestore();
});
it("retries repeated TooManyRequestsError until success within the timeout", async () => {
let handlerCalls = 0;
const handler = vi.fn().mockImplementation(async () => {
handlerCalls++;
if (handlerCalls <= 2) {
throw tooManyRequestsError("rate limited");
}
});
const msg = createMockMessage("rate-limited-payload");
let receiveCount = 0;
backendConsumer.receive.mockImplementation(async () => {
receiveCount++;
if (receiveCount === 1) return msg;
await consumer.stop();
return null;
});
const consumer = makeConsumer({
pubsub,
topic: "t",
subscription: "s",
handler,
rateLimitRetryMs: 500,
rateLimitTimeoutMs: 2_000,
});
const startPromise = consumer.start(flowCtx);
await vi.advanceTimersByTimeAsync(1_100);
await startPromise;
expect(handler).toHaveBeenCalledTimes(3);
expect(backendConsumer.acknowledge).toHaveBeenCalledWith(msg);
expect(backendConsumer.negativeAcknowledge).not.toHaveBeenCalled();
});
it("negatively acknowledges when rate-limit retry timeout elapses", async () => {
const handler = vi.fn().mockImplementation(async () => {
throw tooManyRequestsError("rate limited");
});
const msg = createMockMessage("rate-limited-payload");
let receiveCount = 0;
backendConsumer.receive.mockImplementation(async () => {
receiveCount++;
if (receiveCount === 1) return msg;
return null;
});
const consumer = makeConsumer({
pubsub,
topic: "t",
subscription: "s",
handler,
rateLimitRetryMs: 500,
rateLimitTimeoutMs: 1_000,
});
const startPromise = consumer.start(flowCtx);
await vi.advanceTimersByTimeAsync(1_100);
await consumer.stop();
await vi.advanceTimersByTimeAsync(1_100);
await startPromise;
expect(handler).toHaveBeenCalledTimes(2);
expect(backendConsumer.negativeAcknowledge).toHaveBeenCalledWith(msg);
expect(backendConsumer.acknowledge).not.toHaveBeenCalled();
});
// ── stop() closes the backend ──────────────────────────────────
it("stop() sets running=false and closes the backend", async () => {
// Make receive block forever (returns null) until stopped

View file

@ -10,6 +10,7 @@ import {
runEffectConsumerScoped,
runEffectProducerScoped,
runFlowScoped,
tooManyRequestsError,
type BackendConsumer,
type BackendProducer,
type CreateConsumerOptions,
@ -178,6 +179,85 @@ describe("Effect-native messaging runtime", () => {
}),
);
it.effect(
"retries rate-limited Effect handlers until success within the timeout",
Effect.fnUntraced(function* () {
const message = createMessage("payload", { id: "request-1" });
const consumer = new ScriptedConsumer<string>([message]);
const backend = new RuntimeBackend(consumer as BackendConsumer<unknown>);
let attempts = 0;
yield* Effect.scoped(
Effect.gen(function* () {
yield* runEffectConsumerScoped<string>(
{
topic: "tg.test.consumer",
subscription: "sub",
receiveTimeoutMs: 1,
errorBackoffMs: 1,
rateLimitRetryMs: 10,
rateLimitTimeoutMs: 100,
handler: () =>
Effect.sync(() => {
attempts += 1;
return attempts;
}).pipe(
Effect.flatMap((attempt) =>
attempt <= 2
? Effect.fail(tooManyRequestsError("rate limited"))
: Effect.void
),
),
},
flowContext,
);
yield* TestClock.adjust(Duration.millis(35));
}).pipe(Effect.provide(PubSub.layer(backend))),
);
expect(attempts).toBe(3);
expect(consumer.acknowledged).toEqual([message]);
expect(consumer.nacked).toEqual([]);
}),
);
it.effect(
"negatively acknowledges rate-limited Effect handlers after retry timeout",
Effect.fnUntraced(function* () {
const message = createMessage("payload", { id: "request-1" });
const consumer = new ScriptedConsumer<string>([message]);
const backend = new RuntimeBackend(consumer as BackendConsumer<unknown>);
let attempts = 0;
yield* Effect.scoped(
Effect.gen(function* () {
yield* runEffectConsumerScoped<string>(
{
topic: "tg.test.consumer",
subscription: "sub",
receiveTimeoutMs: 1,
errorBackoffMs: 1,
rateLimitRetryMs: 10,
rateLimitTimeoutMs: 25,
handler: () =>
Effect.sync(() => {
attempts += 1;
}).pipe(
Effect.flatMap(() => Effect.fail(tooManyRequestsError("rate limited"))),
),
},
flowContext,
);
yield* TestClock.adjust(Duration.millis(40));
}).pipe(Effect.provide(PubSub.layer(backend))),
);
expect(attempts).toBeGreaterThanOrEqual(2);
expect(consumer.acknowledged).toEqual([]);
expect(consumer.nacked).toEqual([message]);
}),
);
it.effect(
"routes request-response replies through an Effect queue",
Effect.fnUntraced(function* () {

View file

@ -12,8 +12,9 @@ import {
messagingDeliveryError,
messagingHandlerError,
messagingLifecycleError,
messagingTimeoutError,
} from "../errors.js";
import { Duration, Effect } from "effect";
import { Duration, Effect, Schedule } from "effect";
import * as S from "effect/Schema";
export type MessageHandler<T> = (
@ -54,6 +55,7 @@ export function makeConsumer<T>(options: ConsumerOptions<T>): Consumer<T> {
const isTooManyRequestsError = S.is(TooManyRequestsError);
const concurrency = options.concurrency ?? 1;
const rateLimitRetryMs = options.rateLimitRetryMs ?? 10_000;
const rateLimitTimeoutMs = options.rateLimitTimeoutMs ?? 7_200_000;
const runHandler = (
message: T,
@ -74,15 +76,27 @@ export function makeConsumer<T>(options: ConsumerOptions<T>): Consumer<T> {
) {
const callHandler = runHandler(message.value(), message.properties(), flow);
yield* callHandler.pipe(
Effect.catchTag("TooManyRequestsError", () =>
Effect.logWarning("[Consumer] Rate limited, retrying", {
topic: options.topic,
subscription: options.subscription,
retryMs: rateLimitRetryMs,
}).pipe(
Effect.flatMap(() => Effect.sleep(Duration.millis(rateLimitRetryMs))),
Effect.flatMap(() => callHandler),
),
Effect.tapError((error) =>
isTooManyRequestsError(error)
? Effect.logWarning("[Consumer] Rate limited, retrying", {
topic: options.topic,
subscription: options.subscription,
retryMs: rateLimitRetryMs,
})
: Effect.void,
),
Effect.retry({
schedule: Schedule.spaced(Duration.millis(rateLimitRetryMs)),
while: isTooManyRequestsError,
}),
Effect.timeoutOrElse({
duration: Duration.millis(rateLimitTimeoutMs),
orElse: () => Effect.fail(messagingTimeoutError("rate-limit", rateLimitTimeoutMs)),
}),
Effect.mapError((error) =>
isTooManyRequestsError(error)
? messagingHandlerError(options.topic, options.subscription, error)
: error,
),
);
});

View file

@ -3,7 +3,7 @@
*/
import { randomUUID } from "node:crypto";
import { Context, Duration, Effect, Fiber, Layer, Queue, Result, Scope, Stream } from "effect";
import { Context, Duration, Effect, Fiber, Layer, Queue, Result, Schedule, Scope, Stream } from "effect";
import * as O from "effect/Option";
import * as S from "effect/Schema";
import type {
@ -23,6 +23,7 @@ import {
TooManyRequestsError,
type FlowRuntimeError,
type MessagingDeliveryError,
type MessagingHandlerError,
type MessagingLifecycleError,
type MessagingTimeoutError,
type PubSubError,
@ -66,6 +67,7 @@ export interface EffectConsumerOptions<T, E = never, R = never> {
readonly receiveTimeoutMs?: number;
readonly errorBackoffMs?: number;
readonly rateLimitRetryMs?: number;
readonly rateLimitTimeoutMs?: number;
}
export interface EffectConsumer {
@ -236,28 +238,40 @@ const handleMessageWithRetry = Effect.fn("handleMessageWithRetry")(function* <T,
message: Message<T>,
config: MessagingRuntimeConfig,
) {
const runHandler = Effect.fn(`Consumer.handler:${options.topic}`)(() =>
const rateLimitRetryMs = options.rateLimitRetryMs ?? config.rateLimitRetryMs;
const rateLimitTimeoutMs = options.rateLimitTimeoutMs ?? config.rateLimitTimeoutMs;
const runHandler = (): Effect.Effect<void, TooManyRequestsError | MessagingHandlerError, R> =>
options.handler(message.value(), message.properties(), flow).pipe(
Effect.mapError((error) => messagingHandlerError(options.topic, options.subscription, error)),
),
);
Effect.mapError((error): TooManyRequestsError | MessagingHandlerError =>
isTooManyRequestsError(error)
? error
: messagingHandlerError(options.topic, options.subscription, error),
),
);
return yield* options.handler(message.value(), message.properties(), flow).pipe(
Effect.catch((error) => {
if (isTooManyRequestsError(error)) {
return Effect.gen(function* () {
yield* Effect.logWarning("[Consumer] Rate limited, retrying", {
return yield* runHandler().pipe(
Effect.tapError((error) =>
isTooManyRequestsError(error)
? Effect.logWarning("[Consumer] Rate limited, retrying", {
topic: options.topic,
subscription: options.subscription,
retryMs: config.rateLimitRetryMs,
});
yield* Effect.sleep(Duration.millis(config.rateLimitRetryMs));
yield* runHandler();
});
}
return Effect.fail(messagingHandlerError(options.topic, options.subscription, error));
retryMs: rateLimitRetryMs,
})
: Effect.void,
),
Effect.retry({
schedule: Schedule.spaced(Duration.millis(rateLimitRetryMs)),
while: isTooManyRequestsError,
}),
Effect.timeoutOrElse({
duration: Duration.millis(rateLimitTimeoutMs),
orElse: () => Effect.fail(messagingTimeoutError("rate-limit", rateLimitTimeoutMs)),
}),
Effect.mapError((error) =>
isTooManyRequestsError(error)
? messagingHandlerError(options.topic, options.subscription, error)
: error,
),
);
});
@ -339,6 +353,7 @@ export const makeEffectConsumerFromPubSub = Effect.fn("makeEffectConsumerFromPub
consumerLoop(backend, options, flow, {
...config,
rateLimitRetryMs: options.rateLimitRetryMs ?? config.rateLimitRetryMs,
rateLimitTimeoutMs: options.rateLimitTimeoutMs ?? config.rateLimitTimeoutMs,
}).pipe(Effect.forkChild),
);

View file

@ -8,6 +8,7 @@ export interface MessagingRuntimeConfig {
readonly consumerReceiveTimeoutMs: number;
readonly consumerErrorBackoffMs: number;
readonly rateLimitRetryMs: number;
readonly rateLimitTimeoutMs: number;
readonly requestTimeoutMs: number;
}
@ -15,6 +16,7 @@ export const defaultMessagingRuntimeConfig: MessagingRuntimeConfig = {
consumerReceiveTimeoutMs: 2_000,
consumerErrorBackoffMs: 1_000,
rateLimitRetryMs: 10_000,
rateLimitTimeoutMs: 7_200_000,
requestTimeoutMs: 300_000,
};
@ -28,6 +30,9 @@ export const loadMessagingRuntimeConfig = Effect.fn("loadMessagingRuntimeConfig"
const rateLimitRetryMs = yield* Config.number("TG_RATE_LIMIT_RETRY_MS").pipe(
Config.withDefault(defaultMessagingRuntimeConfig.rateLimitRetryMs),
);
const rateLimitTimeoutMs = yield* Config.number("TG_RATE_LIMIT_TIMEOUT_MS").pipe(
Config.withDefault(defaultMessagingRuntimeConfig.rateLimitTimeoutMs),
);
const requestTimeoutMs = yield* Config.number("TG_REQUEST_TIMEOUT_MS").pipe(
Config.withDefault(defaultMessagingRuntimeConfig.requestTimeoutMs),
);
@ -36,6 +41,7 @@ export const loadMessagingRuntimeConfig = Effect.fn("loadMessagingRuntimeConfig"
consumerReceiveTimeoutMs,
consumerErrorBackoffMs,
rateLimitRetryMs,
rateLimitTimeoutMs,
requestTimeoutMs,
} satisfies MessagingRuntimeConfig;
});