/**
 * High-level consumer with concurrency, retry, and rate-limit handling.
 *
 * Python reference: trustgraph-base/trustgraph/base/consumer.py
 */

import type { PubSubBackend, BackendConsumer, Message } from "../backend/types.js";
import type { Flow } from "../processor/flow.js";
import { TooManyRequestsError } from "../errors.js";

/**
 * Callback invoked once per received message.
 *
 * Throwing {@link TooManyRequestsError} asks the consumer to retry the same
 * message after a pause; any other error is logged by the consume loop and
 * the message is left unacknowledged.
 */
export type MessageHandler<T = unknown> = (
  message: T,
  properties: Record<string, string>,
  flow: FlowContext,
) => Promise<void>;

/** Flow information handed to every {@link MessageHandler} invocation. */
export interface FlowContext {
  id: string;
  name: string;
  /** Reference to the owning Flow instance, giving handlers access to producers and parameters. */
  flow: Flow;
}

/** Construction options for {@link Consumer}. */
export interface ConsumerOptions<T = unknown> {
  /** Backend used to create the underlying consumer. */
  pubsub: PubSubBackend;
  topic: string;
  subscription: string;
  /** Invoked for every message received on the subscription. */
  handler: MessageHandler<T>;
  /** Number of receive loops run in parallel. Defaults to 1. */
  concurrency?: number;
  /** Passed through to the backend. Defaults to "latest". */
  initialPosition?: "latest" | "earliest";
  /** Pause between rate-limited attempts, in milliseconds. Defaults to 10 000. */
  rateLimitRetryMs?: number;
  /**
   * Total time budget, in milliseconds, for retrying one rate-limited message.
   * When omitted, a rate-limited message is retried exactly once.
   */
  rateLimitTimeoutMs?: number;
}

/**
 * Receives messages from a pub/sub subscription and dispatches them to a
 * handler, running `concurrency` receive loops in parallel.
 */
export class Consumer<T = unknown> {
  private backend: BackendConsumer<T> | null = null;
  private running = false;
  private abortController = new AbortController();
  private readonly concurrency: number;
  private readonly rateLimitRetryMs: number;
  private readonly rateLimitTimeoutMs: number | undefined;

  constructor(private readonly options: ConsumerOptions<T>) {
    this.concurrency = options.concurrency ?? 1;
    this.rateLimitRetryMs = options.rateLimitRetryMs ?? 10_000;
    this.rateLimitTimeoutMs = options.rateLimitTimeoutMs;
  }

  /**
   * Creates the backend consumer and runs the receive loops.
   *
   * The returned promise settles only when every loop has exited, which in
   * practice means after {@link Consumer.stop} has been called.
   *
   * @param flow - Context forwarded unchanged to the handler.
   */
  async start(flow: FlowContext): Promise<void> {
    // A controller aborted by an earlier stop() stays aborted forever, so each
    // run needs its own; otherwise every pause after a restart would be skipped.
    this.abortController = new AbortController();

    this.backend = await this.options.pubsub.createConsumer<T>({
      topic: this.options.topic,
      subscription: this.options.subscription,
      initialPosition: this.options.initialPosition ?? "latest",
    });
    this.running = true;

    // Spawn concurrent consumer tasks
    const tasks = Array.from({ length: this.concurrency }, () => this.consumeLoop(flow));

    // Each loop logs and swallows its own errors, so this waits for all of
    // them to wind down rather than failing fast.
    await Promise.all(tasks);
  }

  /**
   * Signals the receive loops to exit, cancels any pending pause, and closes
   * the backend consumer. Calling it again after the backend has been
   * released is a no-op apart from re-signalling.
   */
  async stop(): Promise<void> {
    this.running = false;
    this.abortController.abort();

    // Detach before awaiting so a second stop() cannot close the same backend twice.
    const backend = this.backend;
    this.backend = null;
    if (backend) {
      await backend.close();
    }
  }

  /**
   * One receive loop: receive, handle, acknowledge, until stopped.
   *
   * A message whose handler fails is not acknowledged here.
   * NOTE(review): redelivery is presumably left to the backend; confirm
   * whether an explicit negative acknowledgement is needed.
   */
  private async consumeLoop(flow: FlowContext): Promise<void> {
    // Hold a local reference: stop() nulls the field while loops are in flight.
    const backend = this.backend;
    if (!backend) return;
    const { signal } = this.abortController;

    while (this.running) {
      try {
        const msg = await backend.receive(2000);
        if (!msg) continue;

        await this.handleWithRetry(msg, flow);
        await backend.acknowledge(msg);
      } catch (err) {
        // Errors raised while shutting down (e.g. from a closed backend) are expected.
        if (!this.running) break;
        console.error("[Consumer] Error in consume loop:", err);
        // Brief pause before retry; cut short by stop().
        await sleep(1000, signal);
      }
    }
  }

  /**
   * Runs the handler for one message, retrying while it reports rate limiting.
   *
   * With `rateLimitTimeoutMs` set, retries continue every `rateLimitRetryMs`
   * until the next attempt would start past the deadline. Without it, the
   * message is retried once.
   *
   * @throws The handler's error when it is not a rate-limit error, when the
   *   retry budget is exhausted, or when the consumer is stopped mid-retry.
   */
  private async handleWithRetry(msg: Message<T>, flow: FlowContext): Promise<void> {
    const deadline =
      this.rateLimitTimeoutMs === undefined ? undefined : Date.now() + this.rateLimitTimeoutMs;
    const { signal } = this.abortController;

    for (let attempt = 1; ; attempt++) {
      try {
        await this.options.handler(msg.value(), msg.properties(), flow);
        return;
      } catch (err) {
        if (!(err instanceof TooManyRequestsError)) throw err;

        const exhausted =
          deadline === undefined
            ? attempt > 1
            : Date.now() + this.rateLimitRetryMs > deadline;
        if (exhausted) throw err;

        console.warn(`[Consumer] Rate limited, retrying in ${this.rateLimitRetryMs}ms`);
        await sleep(this.rateLimitRetryMs, signal);

        // The pause ends early on stop(); do not invoke the handler again.
        if (!this.running) throw err;
      }
    }
  }
}

/**
 * Resolves after `ms` milliseconds, or as soon as `signal` is aborted.
 * Never rejects.
 */
function sleep(ms: number, signal?: AbortSignal): Promise<void> {
  return new Promise((resolve) => {
    if (signal?.aborted) {
      resolve();
      return;
    }

    const onAbort = (): void => {
      clearTimeout(timer);
      resolve();
    };
    const timer = setTimeout(() => {
      // Drop the listener so long-lived signals do not accumulate handlers.
      signal?.removeEventListener("abort", onAbort);
      resolve();
    }, ms);

    signal?.addEventListener("abort", onAbort, { once: true });
  });
}