mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-07-01 09:29:38 +02:00
Delegate legacy consumers to Effect runtime
This commit is contained in:
parent
1218e827d4
commit
5c4948cc2e
7 changed files with 184 additions and 256 deletions
|
|
@ -12,12 +12,12 @@ Verified source roots:
|
|||
- Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4`
|
||||
- Installed Effect beta used by this workspace: `ts/node_modules/effect`
|
||||
|
||||
Current signal counts from `ts/packages` after the 2026-06-02
|
||||
request-response stop signal slice:
|
||||
Current signal counts from `ts/packages` after the 2026-06-02 legacy consumer
|
||||
facade slice:
|
||||
|
||||
| Signal | Count |
|
||||
| --- | ---: |
|
||||
| `Effect.runPromise` | 172 |
|
||||
| `Effect.runPromise` | 170 |
|
||||
| `Effect.runPromiseWith` | 0 |
|
||||
| `Effect.cached` | 0 |
|
||||
| `Layer.succeed` | 12 |
|
||||
|
|
@ -26,14 +26,14 @@ request-response stop signal slice:
|
|||
| `new Map` | 60 |
|
||||
| `toPromiseRequestor` | 0 |
|
||||
| `makeAsyncProcessor` | 19 |
|
||||
| `receive(` | 18 |
|
||||
| `receive(` | 17 |
|
||||
| `while (` | 2 |
|
||||
| `new Error` | 7 |
|
||||
| `new Promise` | 10 |
|
||||
| `new Promise` | 9 |
|
||||
| `JSON.parse` | 4 |
|
||||
| `localStorage` | 9 |
|
||||
| `JSON.stringify` | 8 |
|
||||
| `setTimeout` | 4 |
|
||||
| `setTimeout` | 3 |
|
||||
| `process.env` | 3 |
|
||||
|
||||
Notes:
|
||||
|
|
@ -125,6 +125,17 @@ Notes:
|
|||
`makeEffectRequestResponseFromPubSub`. Pending requests now race response
|
||||
waiting against runtime stop and fail promptly with a tagged
|
||||
`MessagingLifecycleError` instead of waiting for timeout.
|
||||
- The legacy consumer facade slice moved `makeConsumer` onto
|
||||
`makeEffectConsumerFromPubSub` with a `ManagedRuntime` Promise boundary and a
|
||||
closeable `Scope`. Consumer workers now use `Effect.forkScoped` so their
|
||||
lifetime is owned by the caller scope rather than the parent fiber. The
|
||||
`Effect.runPromise`, `receive(`, `new Promise`, and `setTimeout` counts
|
||||
dropped because the old blocking facade loop and its test timer shim were
|
||||
removed.
|
||||
- A focused broker-backend scout found no remaining P0 broker runtime rewrite
|
||||
after the producer, NATS, consumer concurrency, rate-limit, and
|
||||
request-response stop slices. `PubSubBackend` remains an intentional
|
||||
Promise-returning adapter boundary wrapped by `PubSub`/Effect services.
|
||||
- The gateway streaming callback slice added Effect-returning dispatcher
|
||||
streaming methods, switched the RPC stream server off nested
|
||||
`Effect.runPromiseWith(context)` queue offers, and replaced the client
|
||||
|
|
@ -1293,6 +1304,34 @@ Notes:
|
|||
- `cd ts && bun run build`
|
||||
- `cd ts && bun run test`
|
||||
|
||||
### 2026-06-02: Legacy Consumer Facade Slice
|
||||
|
||||
- Status: migrated and root-verified.
|
||||
- Completed:
|
||||
- `makeConsumer` is now a Promise compatibility facade over
|
||||
`makeEffectConsumerFromPubSub` instead of owning a separate mutable
|
||||
backend, `running` flag, retry loop, and direct `BackendConsumer.receive`.
|
||||
- The facade uses a module `ManagedRuntime`, `Scope.make`, `Scope.provide`,
|
||||
and `Scope.close` to keep public `start()`/`stop()` Promises at the
|
||||
boundary while the actual consumer lifetime stays scoped.
|
||||
- Legacy Promise handlers are adapted with `Effect.tryPromise` and preserve
|
||||
`TooManyRequestsError` as a typed retry signal.
|
||||
- `makeEffectConsumerFromPubSub` now forks workers with `Effect.forkScoped`,
|
||||
so a caller-owned scope keeps workers alive until stop/finalization.
|
||||
- `consumer.test.ts` no longer encodes `start()` as the blocking consume-loop
|
||||
join; it waits for observable handler/ack/nak effects and then stops the
|
||||
scoped consumer.
|
||||
- Verification:
|
||||
- `cd ts/packages/base && bunx --bun vitest run src/__tests__/consumer.test.ts`
|
||||
- `cd ts/packages/base && bunx --bun vitest run src/__tests__/messaging-runtime.test.ts src/__tests__/flow-spec-runtime.test.ts`
|
||||
- `cd ts && bun run check:tsgo`
|
||||
- `bun run --cwd ts/packages/base build`
|
||||
- `bun run --cwd ts/packages/base test`
|
||||
- `cd ts && bun run check`
|
||||
- `cd ts && bun run build`
|
||||
- `cd ts && bun run test`
|
||||
- `git diff --check`
|
||||
|
||||
## Subagent Findings To Preserve
|
||||
|
||||
- MCP/workbench:
|
||||
|
|
@ -1320,19 +1359,18 @@ Notes:
|
|||
- The legacy `messaging/subscriber.ts` async queue/fanout implementation is
|
||||
removed. Use native `effect/PubSub` for future in-process fanout, while
|
||||
keeping `PubSubBackend` for broker-backed messaging.
|
||||
- The legacy producer facade now delegates to the scoped Effect producer
|
||||
runtime. Remaining broker P0 work should focus on native backend/NATS
|
||||
runtime shape and consumer polling, not replacing `PubSubBackend` with
|
||||
`effect/PubSub`.
|
||||
- The legacy producer and consumer facades now delegate to scoped Effect
|
||||
runtime factories. Public `start()`/`send()`/`stop()` Promises remain
|
||||
compatibility boundaries.
|
||||
- NATS header construction, ack/nak operations, and lookup create-on-missing
|
||||
behavior now stay typed. Remaining NATS work is scoped backend/layer
|
||||
construction and stream/consumer state ownership.
|
||||
behavior now stay typed. `PubSubBackend` remains an intentional broker
|
||||
adapter contract rather than a direct `effect/PubSub` replacement target.
|
||||
- Consumer rate-limit retry timeout behavior is now wired in both legacy and
|
||||
Effect-native consumer paths. Effect-native consumer concurrency now owns
|
||||
one backend consumer per worker, and request-response pending shutdown now
|
||||
fails through a tagged lifecycle error. Remaining consumer runtime work
|
||||
should focus on the legacy consumer facade's blocking compatibility shape
|
||||
and scoped backend/layer construction.
|
||||
fails through a tagged lifecycle error. The legacy consumer facade blocking
|
||||
shape is complete; do not reopen it unless a public API compatibility
|
||||
issue appears.
|
||||
- Existing constructor shims preserve callable-plus-newable public exports;
|
||||
removing them needs a public API split or real class redesign.
|
||||
- Typed string registries in `Flow` now have Schema-backed parameter specs
|
||||
|
|
@ -1385,19 +1423,29 @@ Notes:
|
|||
|
||||
## Ranked Findings
|
||||
|
||||
### P0: Broker Backend Effect-Native Runtime
|
||||
### No-op: Broker Backend Adapter Boundary
|
||||
|
||||
- Status:
|
||||
- Closed as a P0 rewrite item after the 2026-06-02 broker scout and the
|
||||
legacy consumer facade slice.
|
||||
- TrustGraph evidence:
|
||||
- `ts/packages/base/src/backend/types.ts`
|
||||
- `ts/packages/base/src/backend/nats.ts`
|
||||
- `ts/packages/base/src/backend/pubsub.ts`
|
||||
- `ts/packages/base/src/messaging/runtime.ts`
|
||||
- `ts/packages/base/src/processor/flow-processor.ts`
|
||||
- Effect primitives:
|
||||
- `Layer`, `Scope`, `Stream`, `Schedule`, `Queue`,
|
||||
`Effect.acquireRelease`, and `Effect.tryPromise`.
|
||||
- Rewrite shape:
|
||||
- Introduce an Effect-native broker service/layer with scoped NATS
|
||||
acquisition and stream/schedule-based consumer loops.
|
||||
- Evidence:
|
||||
- `BackendProducer`, `BackendConsumer`, and `PubSubBackend` are the external
|
||||
Promise broker adapter contract. `backend/pubsub.ts` wraps that contract in
|
||||
Effect through `Context.Service`, `Layer`, `Effect.tryPromise`, and scoped
|
||||
finalizers.
|
||||
- NATS boundary failures, selective stream/consumer lookup recovery,
|
||||
producer sends, ack/nak, and close paths are typed with Effect wrappers.
|
||||
- Producer, consumer, and request-response runtime ownership now live in
|
||||
scoped Effect factories.
|
||||
- Rule:
|
||||
- Keep `PubSubBackend` as the compatibility adapter boundary; Effect native
|
||||
`PubSub` remains in-process only.
|
||||
- Treat the producer Promise facade as a completed compatibility wrapper;
|
||||
|
|
@ -1414,9 +1462,8 @@ Notes:
|
|||
handles.
|
||||
- Treat request-response pending shutdown semantics as complete; do not flag
|
||||
`waitForResponse` timeout behavior for stopped runtimes.
|
||||
- Tests:
|
||||
- Fake backend ack/nak/backoff/stop tests, NATS close finalizer tests, and
|
||||
config-push stream tests.
|
||||
- Treat the legacy consumer facade as a completed compatibility wrapper over
|
||||
`makeEffectConsumerFromPubSub`; do not flag blocking `start()` semantics.
|
||||
|
||||
### P2: Effect AI Provider Adapter Cleanup
|
||||
|
||||
|
|
@ -1469,9 +1516,8 @@ Notes:
|
|||
|
||||
## Recommended PR Order
|
||||
|
||||
1. Broker backend Effect-native runtime.
|
||||
2. Effect AI provider adapter cleanup.
|
||||
3. MCP parity/deletion decision and workbench platform polish.
|
||||
1. Effect AI provider adapter cleanup.
|
||||
2. MCP parity/deletion decision and workbench platform polish.
|
||||
|
||||
## No-Op Rules
|
||||
|
||||
|
|
@ -1510,6 +1556,9 @@ Do not flag these as rewrite blockers without additional proof:
|
|||
`makeEffectRequestResponseFromPubSub`: pending calls race response waiting
|
||||
against a `Deferred` stop signal and fail with tagged
|
||||
`MessagingLifecycleError`.
|
||||
- Legacy `makeConsumer` facade blocking-loop ownership is complete:
|
||||
`start()` now initializes scoped Effect consumers and returns after startup,
|
||||
while `stop()` closes the native consumer scope.
|
||||
- `ts/packages/flow/src/gateway/rpc-protocol.ts` is a Fastify socket
|
||||
compatibility bridge. Do not flag its internal connection maps/sets as a
|
||||
standalone replacement target until the gateway is ready to move onto Effect
|
||||
|
|
|
|||
|
|
@ -56,6 +56,16 @@ function createFlowContext(): FlowContext {
|
|||
};
|
||||
}
|
||||
|
||||
async function advanceUntil(
|
||||
predicate: () => boolean,
|
||||
totalMs = 1_000,
|
||||
stepMs = 10,
|
||||
): Promise<void> {
|
||||
for (let elapsed = 0; elapsed < totalMs && !predicate(); elapsed += stepMs) {
|
||||
await vi.advanceTimersByTimeAsync(stepMs);
|
||||
}
|
||||
}
|
||||
|
||||
describe("Consumer", () => {
|
||||
let backendConsumer: ReturnType<typeof createMockBackendConsumer>;
|
||||
let pubsub: PubSubBackend;
|
||||
|
|
@ -106,20 +116,10 @@ describe("Consumer", () => {
|
|||
});
|
||||
|
||||
// ── start() creates consumer and calls handler ─────────────────
|
||||
it("creates a backend consumer and invokes handler for received messages", async () => {
|
||||
it("starts a scoped consumer and invokes handler for received messages", async () => {
|
||||
const handler = vi.fn().mockResolvedValue(undefined);
|
||||
const msg = createMockMessage({ data: "hello" }, { id: "1" });
|
||||
|
||||
// First call returns a message, second call triggers stop
|
||||
let callCount = 0;
|
||||
backendConsumer.receive.mockImplementation(async () => {
|
||||
callCount++;
|
||||
if (callCount === 1) return msg;
|
||||
// Stop the consumer on second receive
|
||||
await consumer.stop();
|
||||
return null;
|
||||
});
|
||||
|
||||
const consumer = makeConsumer({
|
||||
pubsub,
|
||||
topic: "topic-a",
|
||||
|
|
@ -127,8 +127,11 @@ describe("Consumer", () => {
|
|||
handler,
|
||||
});
|
||||
|
||||
// start() blocks until the consume loop ends, so we don't need to await separately
|
||||
backendConsumer.receive.mockResolvedValueOnce(msg).mockResolvedValue(null);
|
||||
|
||||
await consumer.start(flowCtx);
|
||||
await advanceUntil(() => handler.mock.calls.length > 0);
|
||||
await consumer.stop();
|
||||
|
||||
expect(pubsub.createConsumer).toHaveBeenCalledWith({
|
||||
topic: "topic-a",
|
||||
|
|
@ -143,14 +146,6 @@ describe("Consumer", () => {
|
|||
const handler = vi.fn().mockResolvedValue(undefined);
|
||||
const msg = createMockMessage("payload");
|
||||
|
||||
let callCount = 0;
|
||||
backendConsumer.receive.mockImplementation(async () => {
|
||||
callCount++;
|
||||
if (callCount === 1) return msg;
|
||||
await consumer.stop();
|
||||
return null;
|
||||
});
|
||||
|
||||
const consumer = makeConsumer({
|
||||
pubsub,
|
||||
topic: "t",
|
||||
|
|
@ -158,7 +153,11 @@ describe("Consumer", () => {
|
|||
handler,
|
||||
});
|
||||
|
||||
backendConsumer.receive.mockResolvedValueOnce(msg).mockResolvedValue(null);
|
||||
|
||||
await consumer.start(flowCtx);
|
||||
await advanceUntil(() => backendConsumer.acknowledge.mock.calls.length > 0);
|
||||
await consumer.stop();
|
||||
|
||||
expect(backendConsumer.acknowledge).toHaveBeenCalledWith(msg);
|
||||
expect(backendConsumer.negativeAcknowledge).not.toHaveBeenCalled();
|
||||
|
|
@ -169,15 +168,6 @@ describe("Consumer", () => {
|
|||
const handler = vi.fn().mockRejectedValue("handler boom");
|
||||
const msg = createMockMessage("bad-payload");
|
||||
|
||||
let callCount = 0;
|
||||
backendConsumer.receive.mockImplementation(async () => {
|
||||
callCount++;
|
||||
if (callCount === 1) return msg;
|
||||
// Stop on second call (after the 1s sleep from error handling)
|
||||
await consumer.stop();
|
||||
return null;
|
||||
});
|
||||
|
||||
const consumer = makeConsumer({
|
||||
pubsub,
|
||||
topic: "t",
|
||||
|
|
@ -185,19 +175,14 @@ describe("Consumer", () => {
|
|||
handler,
|
||||
});
|
||||
|
||||
// Suppress console.error noise
|
||||
const errorSpy = vi.spyOn(console, "error").mockImplementation(() => {});
|
||||
backendConsumer.receive.mockResolvedValueOnce(msg).mockResolvedValue(null);
|
||||
|
||||
// start() will block; the error path sleeps 1s, so we need to advance timers
|
||||
const startPromise = consumer.start(flowCtx);
|
||||
// Advance past the 1s sleep in the error handler
|
||||
await vi.advanceTimersByTimeAsync(1500);
|
||||
await startPromise;
|
||||
await consumer.start(flowCtx);
|
||||
await advanceUntil(() => backendConsumer.negativeAcknowledge.mock.calls.length > 0);
|
||||
await consumer.stop();
|
||||
|
||||
expect(backendConsumer.negativeAcknowledge).toHaveBeenCalledWith(msg);
|
||||
expect(backendConsumer.acknowledge).not.toHaveBeenCalled();
|
||||
|
||||
errorSpy.mockRestore();
|
||||
});
|
||||
|
||||
// ── TooManyRequestsError triggers retry ────────────────────────
|
||||
|
|
@ -213,14 +198,6 @@ describe("Consumer", () => {
|
|||
|
||||
const msg = createMockMessage("rate-limited-payload");
|
||||
|
||||
let receiveCount = 0;
|
||||
backendConsumer.receive.mockImplementation(async () => {
|
||||
receiveCount++;
|
||||
if (receiveCount === 1) return msg;
|
||||
await consumer.stop();
|
||||
return null;
|
||||
});
|
||||
|
||||
const consumer = makeConsumer({
|
||||
pubsub,
|
||||
topic: "t",
|
||||
|
|
@ -229,12 +206,14 @@ describe("Consumer", () => {
|
|||
rateLimitRetryMs: 500,
|
||||
});
|
||||
|
||||
backendConsumer.receive.mockResolvedValueOnce(msg).mockResolvedValue(null);
|
||||
|
||||
const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {});
|
||||
|
||||
const startPromise = consumer.start(flowCtx);
|
||||
// Advance past the rate-limit retry delay (500ms)
|
||||
await consumer.start(flowCtx);
|
||||
await vi.advanceTimersByTimeAsync(600);
|
||||
await startPromise;
|
||||
await advanceUntil(() => handler.mock.calls.length >= 2);
|
||||
await consumer.stop();
|
||||
|
||||
// Handler called twice: first throws TooManyRequestsError, second succeeds
|
||||
expect(handler).toHaveBeenCalledTimes(2);
|
||||
|
|
@ -255,14 +234,6 @@ describe("Consumer", () => {
|
|||
|
||||
const msg = createMockMessage("rate-limited-payload");
|
||||
|
||||
let receiveCount = 0;
|
||||
backendConsumer.receive.mockImplementation(async () => {
|
||||
receiveCount++;
|
||||
if (receiveCount === 1) return msg;
|
||||
await consumer.stop();
|
||||
return null;
|
||||
});
|
||||
|
||||
const consumer = makeConsumer({
|
||||
pubsub,
|
||||
topic: "t",
|
||||
|
|
@ -272,9 +243,12 @@ describe("Consumer", () => {
|
|||
rateLimitTimeoutMs: 2_000,
|
||||
});
|
||||
|
||||
const startPromise = consumer.start(flowCtx);
|
||||
backendConsumer.receive.mockResolvedValueOnce(msg).mockResolvedValue(null);
|
||||
|
||||
await consumer.start(flowCtx);
|
||||
await vi.advanceTimersByTimeAsync(1_100);
|
||||
await startPromise;
|
||||
await advanceUntil(() => backendConsumer.acknowledge.mock.calls.length > 0);
|
||||
await consumer.stop();
|
||||
|
||||
expect(handler).toHaveBeenCalledTimes(3);
|
||||
expect(backendConsumer.acknowledge).toHaveBeenCalledWith(msg);
|
||||
|
|
@ -287,13 +261,6 @@ describe("Consumer", () => {
|
|||
});
|
||||
const msg = createMockMessage("rate-limited-payload");
|
||||
|
||||
let receiveCount = 0;
|
||||
backendConsumer.receive.mockImplementation(async () => {
|
||||
receiveCount++;
|
||||
if (receiveCount === 1) return msg;
|
||||
return null;
|
||||
});
|
||||
|
||||
const consumer = makeConsumer({
|
||||
pubsub,
|
||||
topic: "t",
|
||||
|
|
@ -303,11 +270,12 @@ describe("Consumer", () => {
|
|||
rateLimitTimeoutMs: 1_000,
|
||||
});
|
||||
|
||||
const startPromise = consumer.start(flowCtx);
|
||||
backendConsumer.receive.mockResolvedValueOnce(msg).mockResolvedValue(null);
|
||||
|
||||
await consumer.start(flowCtx);
|
||||
await vi.advanceTimersByTimeAsync(1_100);
|
||||
await advanceUntil(() => backendConsumer.negativeAcknowledge.mock.calls.length > 0);
|
||||
await consumer.stop();
|
||||
await vi.advanceTimersByTimeAsync(1_100);
|
||||
await startPromise;
|
||||
|
||||
expect(handler).toHaveBeenCalledTimes(2);
|
||||
expect(backendConsumer.negativeAcknowledge).toHaveBeenCalledWith(msg);
|
||||
|
|
@ -316,12 +284,7 @@ describe("Consumer", () => {
|
|||
|
||||
// ── stop() closes the backend ──────────────────────────────────
|
||||
it("stop() sets running=false and closes the backend", async () => {
|
||||
// Make receive block forever (returns null) until stopped
|
||||
backendConsumer.receive.mockImplementation(async () => {
|
||||
// Yield control so stop() can run
|
||||
await new Promise((r) => setTimeout(r, 100));
|
||||
return null;
|
||||
});
|
||||
backendConsumer.receive.mockResolvedValue(null);
|
||||
|
||||
const consumer = makeConsumer({
|
||||
pubsub,
|
||||
|
|
@ -330,17 +293,9 @@ describe("Consumer", () => {
|
|||
handler: vi.fn(),
|
||||
});
|
||||
|
||||
const startPromise = consumer.start(flowCtx);
|
||||
|
||||
// Advance timers to let the consume loop iterate once
|
||||
await vi.advanceTimersByTimeAsync(200);
|
||||
|
||||
await consumer.start(flowCtx);
|
||||
await consumer.stop();
|
||||
|
||||
// Advance timers further so the loop can exit
|
||||
await vi.advanceTimersByTimeAsync(200);
|
||||
await startPromise;
|
||||
|
||||
expect(backendConsumer.close).toHaveBeenCalled();
|
||||
await expect(consumer.stop()).resolves.toBeUndefined();
|
||||
});
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ import {
|
|||
DeliverPolicy,
|
||||
} from "nats";
|
||||
import { Effect } from "effect";
|
||||
import * as Predicate from "effect/Predicate";
|
||||
import * as P from "effect/Predicate";
|
||||
import * as S from "effect/Schema";
|
||||
|
||||
import type {
|
||||
|
|
@ -64,7 +64,7 @@ function makeNatsMessage<T>(msg: JsMsg, decoded: T): NatsMessage<T> {
|
|||
};
|
||||
}
|
||||
|
||||
const hasJsMsg = Predicate.hasProperty("_jsMsg");
|
||||
const hasJsMsg = P.hasProperty("_jsMsg");
|
||||
|
||||
class NatsLookupError extends S.TaggedErrorClass<NatsLookupError>()(
|
||||
"NatsLookupError",
|
||||
|
|
@ -79,9 +79,9 @@ function natsLookupError(operation: string, cause: unknown): NatsLookupError {
|
|||
}
|
||||
|
||||
function isAckableJsMsg(value: unknown): value is Pick<JsMsg, "ack" | "nak"> {
|
||||
if (!Predicate.isObject(value)) return false;
|
||||
if (!Predicate.hasProperty(value, "ack")) return false;
|
||||
if (!Predicate.hasProperty(value, "nak")) return false;
|
||||
if (!P.isObject(value)) return false;
|
||||
if (!P.hasProperty(value, "ack")) return false;
|
||||
if (!P.hasProperty(value, "nak")) return false;
|
||||
return typeof value.ack === "function" && typeof value.nak === "function";
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -4,18 +4,20 @@
|
|||
* Python reference: trustgraph-base/trustgraph/base/consumer.py
|
||||
*/
|
||||
|
||||
import type { BackendConsumer, Message, PubSubBackend } from "../backend/types.js";
|
||||
import type { PubSubBackend } from "../backend/types.js";
|
||||
import { PubSub } from "../backend/pubsub.js";
|
||||
import type { Flow } from "../processor/flow.js";
|
||||
import {
|
||||
MessagingHandlerError,
|
||||
TooManyRequestsError,
|
||||
messagingDeliveryError,
|
||||
messagingHandlerError,
|
||||
messagingLifecycleError,
|
||||
messagingTimeoutError,
|
||||
} from "../errors.js";
|
||||
import { Duration, Effect, Schedule } from "effect";
|
||||
import { Effect, Exit, Layer, ManagedRuntime, Scope } from "effect";
|
||||
import * as P from "effect/Predicate";
|
||||
import * as S from "effect/Schema";
|
||||
import { loadMessagingRuntimeConfig } from "../runtime/messaging-config.js";
|
||||
import { makeEffectConsumerFromPubSub, type EffectConsumer } from "./runtime.js";
|
||||
|
||||
export type MessageHandler<T> = (
|
||||
message: T,
|
||||
|
|
@ -49,13 +51,16 @@ export interface Consumer<T> {
|
|||
readonly stop: () => Promise<void>;
|
||||
}
|
||||
|
||||
interface ConsumerRuntime {
|
||||
readonly scope: Scope.Closeable;
|
||||
readonly consumer: EffectConsumer;
|
||||
}
|
||||
|
||||
const consumerRuntime = ManagedRuntime.make(Layer.empty);
|
||||
|
||||
export function makeConsumer<T>(options: ConsumerOptions<T>): Consumer<T> {
|
||||
let backend: BackendConsumer<T> | null = null;
|
||||
let running = false;
|
||||
let runtime: ConsumerRuntime | null = null;
|
||||
const isTooManyRequestsError = S.is(TooManyRequestsError);
|
||||
const concurrency = options.concurrency ?? 1;
|
||||
const rateLimitRetryMs = options.rateLimitRetryMs ?? 10_000;
|
||||
const rateLimitTimeoutMs = options.rateLimitTimeoutMs ?? 7_200_000;
|
||||
|
||||
const runHandler = (
|
||||
message: T,
|
||||
|
|
@ -70,135 +75,54 @@ export function makeConsumer<T>(options: ConsumerOptions<T>): Consumer<T> {
|
|||
: messagingHandlerError(options.topic, options.subscription, error),
|
||||
});
|
||||
|
||||
const handleWithRetry = Effect.fn("Consumer.handleWithRetry")(function* (
|
||||
message: Message<T>,
|
||||
flow: FlowContext,
|
||||
) {
|
||||
const callHandler = runHandler(message.value(), message.properties(), flow);
|
||||
yield* callHandler.pipe(
|
||||
Effect.tapError((error) =>
|
||||
isTooManyRequestsError(error)
|
||||
? Effect.logWarning("[Consumer] Rate limited, retrying", {
|
||||
topic: options.topic,
|
||||
subscription: options.subscription,
|
||||
retryMs: rateLimitRetryMs,
|
||||
})
|
||||
: Effect.void,
|
||||
),
|
||||
Effect.retry({
|
||||
schedule: Schedule.spaced(Duration.millis(rateLimitRetryMs)),
|
||||
while: isTooManyRequestsError,
|
||||
}),
|
||||
Effect.timeoutOrElse({
|
||||
duration: Duration.millis(rateLimitTimeoutMs),
|
||||
orElse: () => Effect.fail(messagingTimeoutError("rate-limit", rateLimitTimeoutMs)),
|
||||
}),
|
||||
Effect.mapError((error) =>
|
||||
isTooManyRequestsError(error)
|
||||
? messagingHandlerError(options.topic, options.subscription, error)
|
||||
: error,
|
||||
),
|
||||
);
|
||||
});
|
||||
|
||||
const consumeOnce = Effect.fn("Consumer.consumeOnce")(function* (flow: FlowContext) {
|
||||
const currentBackend = backend;
|
||||
if (currentBackend === null) {
|
||||
return yield* messagingLifecycleError(
|
||||
`${options.topic}:${options.subscription}`,
|
||||
"receive",
|
||||
"Consumer backend not started",
|
||||
);
|
||||
}
|
||||
|
||||
const message = yield* Effect.tryPromise({
|
||||
try: () => currentBackend.receive(2000),
|
||||
catch: (error) => messagingDeliveryError(options.topic, "receive", error),
|
||||
});
|
||||
if (message === null) return;
|
||||
|
||||
yield* handleWithRetry(message, flow).pipe(
|
||||
Effect.flatMap(() =>
|
||||
Effect.tryPromise({
|
||||
try: () => currentBackend.acknowledge(message),
|
||||
catch: (error) => messagingDeliveryError(options.topic, "acknowledge", error),
|
||||
}),
|
||||
),
|
||||
Effect.catch((error) =>
|
||||
Effect.tryPromise({
|
||||
try: () => currentBackend.negativeAcknowledge(message),
|
||||
catch: (nakError) => messagingDeliveryError(options.topic, "negative-acknowledge", nakError),
|
||||
}).pipe(
|
||||
Effect.catch((nakError) =>
|
||||
Effect.logError("[Consumer] Failed to negative-acknowledge message", {
|
||||
error: nakError.message,
|
||||
topic: nakError.topic,
|
||||
}),
|
||||
),
|
||||
Effect.flatMap(() => Effect.fail(error)),
|
||||
),
|
||||
),
|
||||
);
|
||||
});
|
||||
|
||||
const consumeLoop = Effect.fn("Consumer.consumeLoop")(function* (flow: FlowContext) {
|
||||
yield* Effect.whileLoop({
|
||||
while: () => running,
|
||||
body: () =>
|
||||
consumeOnce(flow).pipe(
|
||||
Effect.catch((error) => {
|
||||
if (!running) return Effect.void;
|
||||
return Effect.logError("[Consumer] Error in consume loop", {
|
||||
error: error.message,
|
||||
topic: options.topic,
|
||||
subscription: options.subscription,
|
||||
}).pipe(
|
||||
Effect.flatMap(() => Effect.sleep(Duration.millis(1000))),
|
||||
);
|
||||
}),
|
||||
),
|
||||
step: () => undefined,
|
||||
});
|
||||
});
|
||||
|
||||
return {
|
||||
start: (flow) =>
|
||||
Effect.runPromise(
|
||||
Effect.gen(function* () {
|
||||
backend = yield* Effect.tryPromise({
|
||||
try: () =>
|
||||
options.pubsub.createConsumer<T>({
|
||||
topic: options.topic,
|
||||
subscription: options.subscription,
|
||||
initialPosition: options.initialPosition ?? "latest",
|
||||
}),
|
||||
catch: (error) =>
|
||||
messagingLifecycleError(`${options.topic}:${options.subscription}`, "create-consumer", error),
|
||||
});
|
||||
P.isNotNull(runtime)
|
||||
? Promise.resolve()
|
||||
: consumerRuntime.runPromise(
|
||||
Effect.gen(function* () {
|
||||
const scope = yield* Scope.make();
|
||||
const startConsumer = Effect.gen(function* () {
|
||||
const config = yield* loadMessagingRuntimeConfig();
|
||||
const consumer = yield* makeEffectConsumerFromPubSub<T, TooManyRequestsError | MessagingHandlerError, never>(
|
||||
PubSub.fromBackend(options.pubsub),
|
||||
config,
|
||||
{
|
||||
topic: options.topic,
|
||||
subscription: options.subscription,
|
||||
handler: runHandler,
|
||||
...(options.concurrency === undefined ? {} : { concurrency: options.concurrency }),
|
||||
initialPosition: options.initialPosition ?? "latest",
|
||||
...(options.rateLimitRetryMs === undefined ? {} : { rateLimitRetryMs: options.rateLimitRetryMs }),
|
||||
...(options.rateLimitTimeoutMs === undefined
|
||||
? {}
|
||||
: { rateLimitTimeoutMs: options.rateLimitTimeoutMs }),
|
||||
},
|
||||
flow,
|
||||
).pipe(
|
||||
Scope.provide(scope),
|
||||
Effect.mapError((error) =>
|
||||
messagingLifecycleError(`${options.topic}:${options.subscription}`, "create-consumer", error)
|
||||
),
|
||||
);
|
||||
runtime = { scope, consumer };
|
||||
});
|
||||
|
||||
running = true;
|
||||
|
||||
const workerIndexes = Array.from({ length: concurrency }, (_value, index) => index);
|
||||
yield* Effect.forEach(workerIndexes, () => consumeLoop(flow), {
|
||||
concurrency: "unbounded",
|
||||
discard: true,
|
||||
});
|
||||
}),
|
||||
),
|
||||
stop: () =>
|
||||
Effect.runPromise(
|
||||
Effect.gen(function* () {
|
||||
running = false;
|
||||
const currentBackend = backend;
|
||||
backend = null;
|
||||
if (currentBackend !== null) {
|
||||
yield* Effect.tryPromise({
|
||||
try: () => currentBackend.close(),
|
||||
catch: (error) =>
|
||||
messagingLifecycleError(`${options.topic}:${options.subscription}`, "close-consumer", error),
|
||||
});
|
||||
}
|
||||
}),
|
||||
),
|
||||
yield* startConsumer.pipe(
|
||||
Effect.onError((cause) => Scope.close(scope, Exit.failCause(cause))),
|
||||
);
|
||||
}),
|
||||
),
|
||||
stop: () => {
|
||||
const current = runtime;
|
||||
runtime = null;
|
||||
return current === null
|
||||
? Promise.resolve()
|
||||
: consumerRuntime.runPromise(
|
||||
current.consumer.stop.pipe(
|
||||
Effect.ensuring(Scope.close(current.scope, Exit.void)),
|
||||
),
|
||||
);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@
|
|||
*/
|
||||
|
||||
import type { PubSubBackend } from "../backend/types.js";
|
||||
import type { ProducerMetrics } from "../metrics/prometheus.js";
|
||||
import type { ProducerMetrics } from "../metrics/index.ts";
|
||||
import { Effect, Exit, Scope } from "effect";
|
||||
import { PubSub } from "../backend/pubsub.js";
|
||||
import { makeEffectProducerFromPubSub, type EffectProducer } from "./runtime.js";
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ import { Effect, Exit, Scope } from "effect";
|
|||
import type { PubSubBackend } from "../backend/types.js";
|
||||
import { PubSub } from "../backend/pubsub.js";
|
||||
import { messagingDeliveryError, messagingLifecycleError } from "../errors.js";
|
||||
import { loadMessagingRuntimeConfig } from "../runtime/messaging-config.js";
|
||||
import { loadMessagingRuntimeConfig } from "../runtime/index.ts";
|
||||
import { makeEffectRequestResponseFromPubSub, type EffectRequestResponse } from "./runtime.js";
|
||||
|
||||
export interface RequestResponseOptions {
|
||||
|
|
@ -85,8 +85,8 @@ export function makeRequestResponse<TReq, TRes>(
|
|||
* Send a request and wait for responses.
|
||||
*
|
||||
* @param request - The request payload
|
||||
* @param options.timeoutMs - Total timeout in milliseconds (default: 300s)
|
||||
* @param options.recipient - Optional callback for streaming responses.
|
||||
* @param requestOptions.timeoutMs - Total timeout in milliseconds (default: 300s)
|
||||
* @param requestOptions.recipient - Optional callback for streaming responses.
|
||||
* Return `true` to indicate the final response has been received.
|
||||
* If omitted, returns the first response.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -356,7 +356,7 @@ export const makeEffectConsumerFromPubSub = Effect.fn("makeEffectConsumerFromPub
|
|||
const workers = yield* Effect.forEach(workerIndexes, () =>
|
||||
Effect.gen(function* () {
|
||||
const backend = yield* pubsub.createConsumer<T>(createOptions);
|
||||
const fiber = yield* consumerLoop(backend, options, flow, workerConfig).pipe(Effect.forkChild);
|
||||
const fiber = yield* consumerLoop(backend, options, flow, workerConfig).pipe(Effect.forkScoped);
|
||||
return { backend, fiber };
|
||||
}),
|
||||
);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue