/** * Effect-native messaging factories and scoped runtime helpers. */ import { randomUUID } from "node:crypto"; import { Context, Deferred, Duration, Effect, Fiber, Layer, Queue, Ref, Result, Schedule, Scope, Stream } from "effect"; import * as O from "effect/Option"; import * as S from "effect/Schema"; import type { BackendConsumer, BackendProducer, CreateConsumerOptions, CreateProducerOptions, Message, } from "../backend/types.js"; import { PubSub, type PubSubService } from "../backend/pubsub.js"; import { flowRuntimeError, messagingDeliveryError, messagingHandlerError, messagingLifecycleError, messagingTimeoutError, TooManyRequestsError, type FlowRuntimeError, type MessagingDeliveryError, type MessagingHandlerError, type MessagingLifecycleError, type MessagingTimeoutError, type PubSubError, } from "../errors.js"; import type { ProducerMetrics } from "../metrics/prometheus.js"; import type { FlowContext } from "./consumer.js"; import type { Flow } from "../processor/flow.js"; import type { SpecRuntimeRequirements } from "../spec/types.js"; import { loadMessagingRuntimeConfig, type MessagingRuntimeConfig, } from "../runtime/messaging-config.js"; const isTooManyRequestsError = S.is(TooManyRequestsError); export type EffectMessageHandler = ( message: T, properties: Record, flow: FlowContext, ) => Effect.Effect; export interface EffectProducerOptions { readonly topic: string; readonly schema?: S.Codec; readonly metrics?: ProducerMetrics; } export interface EffectProducer { readonly send: (id: string, message: T) => Effect.Effect; readonly flush: Effect.Effect; readonly close: Effect.Effect; } export interface EffectConsumerOptions { readonly topic: string; readonly subscription: string; readonly handler: EffectMessageHandler; readonly concurrency?: number; readonly initialPosition?: "latest" | "earliest"; readonly schema?: S.Codec; readonly receiveTimeoutMs?: number; readonly errorBackoffMs?: number; readonly rateLimitRetryMs?: number; readonly rateLimitTimeoutMs?: number; } export interface EffectConsumer { readonly stop: Effect.Effect; readonly fibers: ReadonlyArray>; } export interface EffectRequestResponseOptions { readonly requestTopic: string; readonly responseTopic: string; readonly subscription: string; readonly requestSchema?: S.Codec; readonly responseSchema?: S.Codec; } export interface EffectRequestOptions { readonly timeoutMs?: number; readonly recipient?: (response: TRes) => Effect.Effect; } export interface EffectRequestResponse { readonly request: ( request: TReq, options?: EffectRequestOptions, ) => Effect.Effect; readonly stop: Effect.Effect; } export interface ProducerFactoryService { readonly make: ( options: EffectProducerOptions, ) => Effect.Effect, PubSubError, Scope.Scope>; } export interface ConsumerFactoryService { readonly run: ( options: EffectConsumerOptions, flow: FlowContext, ) => Effect.Effect; } export interface RequestResponseFactoryService { readonly make: ( options: EffectRequestResponseOptions, ) => Effect.Effect, PubSubError, Scope.Scope>; } export interface FlowRuntimeService { readonly run: ( flow: Flow, ) => Effect.Effect; } export class ProducerFactory extends Context.Service()( "@trustgraph/base/messaging/runtime/ProducerFactory", ) {} export class ConsumerFactory extends Context.Service()( "@trustgraph/base/messaging/runtime/ConsumerFactory", ) {} export class RequestResponseFactory extends Context.Service< RequestResponseFactory, RequestResponseFactoryService >()("@trustgraph/base/messaging/runtime/RequestResponseFactory") {} export class FlowRuntime extends Context.Service()( "@trustgraph/base/messaging/runtime/FlowRuntime", ) {} export function makeEffectProducerHandle( backend: BackendProducer, options: EffectProducerOptions, ): EffectProducer { return { send: Effect.fn(`Producer.send:${options.topic}`)((id: string, message: T) => Effect.tryPromise({ try: () => backend.send(message, { id }), catch: (error) => messagingDeliveryError(options.topic, "send", error), }).pipe( Effect.tap(() => options.metrics === undefined ? Effect.void : Effect.sync(() => { options.metrics?.inc(); }), ), ), ), flush: Effect.tryPromise({ try: () => backend.flush(), catch: (error) => messagingDeliveryError(options.topic, "flush", error), }), close: Effect.tryPromise({ try: () => backend.close(), catch: (error) => messagingDeliveryError(options.topic, "close", error), }), }; } export const makeEffectProducerFromPubSub = Effect.fn("makeEffectProducerFromPubSub")(function* ( pubsub: PubSubService, options: EffectProducerOptions, ) { const createOptions: CreateProducerOptions = options.schema === undefined ? { topic: options.topic } : { topic: options.topic, schema: options.schema }; const backend = yield* pubsub.createProducer(createOptions); const producer = makeEffectProducerHandle(backend, options); yield* Effect.addFinalizer(() => producer.close.pipe( Effect.catch((error) => Effect.logError("[Producer] Failed to close producer", { error: error.message, topic: error.topic, }), ), ), ); return producer; }); const closeConsumerBackend = ( backend: BackendConsumer, topic: string, subscription: string, ) => Effect.tryPromise({ try: () => backend.close(), catch: (error) => messagingLifecycleError(`${topic}:${subscription}`, "close-consumer", error), }); const acknowledgeMessage = ( backend: BackendConsumer, message: Message, topic: string, ) => Effect.tryPromise({ try: () => backend.acknowledge(message), catch: (error) => messagingDeliveryError(topic, "acknowledge", error), }); const negativeAcknowledgeMessage = ( backend: BackendConsumer, message: Message, topic: string, ) => Effect.tryPromise({ try: () => backend.negativeAcknowledge(message), catch: (error) => messagingDeliveryError(topic, "negative-acknowledge", error), }); const receiveMessage = ( backend: BackendConsumer, topic: string, timeoutMs: number, ) => Effect.tryPromise({ try: () => backend.receive(timeoutMs), catch: (error) => messagingDeliveryError(topic, "receive", error), }); const handleMessageWithRetry = Effect.fn("handleMessageWithRetry")(function* ( options: EffectConsumerOptions, flow: FlowContext, message: Message, config: MessagingRuntimeConfig, ) { const rateLimitRetryMs = options.rateLimitRetryMs ?? config.rateLimitRetryMs; const rateLimitTimeoutMs = options.rateLimitTimeoutMs ?? config.rateLimitTimeoutMs; const runHandler = (): Effect.Effect => options.handler(message.value(), message.properties(), flow).pipe( Effect.mapError((error): TooManyRequestsError | MessagingHandlerError => isTooManyRequestsError(error) ? error : messagingHandlerError(options.topic, options.subscription, error), ), ); return yield* runHandler().pipe( Effect.tapError((error) => isTooManyRequestsError(error) ? Effect.logWarning("[Consumer] Rate limited, retrying", { topic: options.topic, subscription: options.subscription, retryMs: rateLimitRetryMs, }) : Effect.void, ), Effect.retry({ schedule: Schedule.spaced(Duration.millis(rateLimitRetryMs)), while: isTooManyRequestsError, }), Effect.timeoutOrElse({ duration: Duration.millis(rateLimitTimeoutMs), orElse: () => Effect.fail(messagingTimeoutError("rate-limit", rateLimitTimeoutMs)), }), Effect.mapError((error) => isTooManyRequestsError(error) ? messagingHandlerError(options.topic, options.subscription, error) : error, ), ); }); const processConsumerMessage = Effect.fn("processConsumerMessage")(function* ( backend: BackendConsumer, options: EffectConsumerOptions, flow: FlowContext, message: Message, config: MessagingRuntimeConfig, ) { yield* handleMessageWithRetry(options, flow, message, config).pipe( Effect.flatMap(() => acknowledgeMessage(backend, message, options.topic)), Effect.catch((error) => negativeAcknowledgeMessage(backend, message, options.topic).pipe( Effect.catch((nakError) => Effect.logError("[Consumer] Failed to negative-acknowledge message", { error: nakError.message, topic: nakError.topic, }), ), Effect.flatMap(() => Effect.logError("[Consumer] Message handling failed", { error: error.message, topic: options.topic, subscription: options.subscription, }), ), ), ), ); }); const consumerLoop = ( backend: BackendConsumer, options: EffectConsumerOptions, flow: FlowContext, config: MessagingRuntimeConfig, ): Effect.Effect => Effect.whileLoop({ while: () => true, body: () => receiveMessage(backend, options.topic, options.receiveTimeoutMs ?? config.consumerReceiveTimeoutMs).pipe( Effect.flatMap((message) => message === null ? Effect.sleep(Duration.millis(options.receiveTimeoutMs ?? config.consumerReceiveTimeoutMs)) : processConsumerMessage(backend, options, flow, message, config), ), Effect.catch((error) => Effect.logError("[Consumer] Receive loop failed", { error: error.message, topic: options.topic, subscription: options.subscription, }).pipe( Effect.flatMap(() => Effect.sleep(Duration.millis(options.errorBackoffMs ?? config.consumerErrorBackoffMs)), ), ), ), ), step: () => undefined, }); export const makeEffectConsumerFromPubSub = Effect.fn("makeEffectConsumerFromPubSub")(function* ( pubsub: PubSubService, config: MessagingRuntimeConfig, options: EffectConsumerOptions, flow: FlowContext, ) { const createOptions: CreateConsumerOptions = { topic: options.topic, subscription: options.subscription, ...(options.initialPosition === undefined ? {} : { initialPosition: options.initialPosition }), ...(options.schema === undefined ? {} : { schema: options.schema }), }; const concurrency = Math.max(1, options.concurrency ?? 1); const workerIndexes = Array.from({ length: concurrency }, (_value, index) => index); const workerConfig = { ...config, rateLimitRetryMs: options.rateLimitRetryMs ?? config.rateLimitRetryMs, rateLimitTimeoutMs: options.rateLimitTimeoutMs ?? config.rateLimitTimeoutMs, }; const workers = yield* Effect.forEach(workerIndexes, () => Effect.gen(function* () { const backend = yield* pubsub.createConsumer(createOptions); const fiber = yield* consumerLoop(backend, options, flow, workerConfig).pipe(Effect.forkScoped); return { backend, fiber }; }), ); const stopped = yield* Ref.make(false); const stop = Effect.fn(`Consumer.stop:${options.topic}`)(function* () { const alreadyStopped = yield* Ref.getAndSet(stopped, true); if (alreadyStopped) return; yield* Effect.forEach(workers, (worker) => Fiber.interrupt(worker.fiber), { discard: true }); yield* Effect.forEach( workers, (worker) => closeConsumerBackend(worker.backend, options.topic, options.subscription), { discard: true }, ); }); yield* Effect.addFinalizer(() => stop().pipe( Effect.catch((error) => Effect.logError("[Consumer] Failed to stop consumer", { error: error.message, resource: error.resource, operation: error.operation, }), ), ), ); return { fibers: workers.map((worker) => worker.fiber), stop: stop(), } satisfies EffectConsumer; }); const dispatchResponseLoop = ( backend: BackendConsumer, responseTopic: string, subscribers: Map>, config: MessagingRuntimeConfig, ): Effect.Effect => Effect.whileLoop({ while: () => true, body: () => receiveMessage(backend, responseTopic, config.consumerReceiveTimeoutMs).pipe( Effect.flatMap((message) => { if (message === null) { return Effect.sleep(Duration.millis(config.consumerReceiveTimeoutMs)); } const id = message.properties().id; const queue = id === undefined ? undefined : subscribers.get(id); return Effect.gen(function* () { if (queue !== undefined) { yield* Queue.offer(queue, message.value()); } yield* acknowledgeMessage(backend, message, responseTopic); }); }), Effect.catch((error) => Effect.logError("[RequestResponse] Response dispatch failed", { error: error.message, topic: responseTopic, }).pipe(Effect.flatMap(() => Effect.sleep(Duration.millis(config.consumerErrorBackoffMs)))), ), ), step: () => undefined, }); const waitForResponse = Effect.fn("waitForResponse")(function* ( queue: Queue.Queue, options: EffectRequestOptions | undefined, ) { const response = yield* Stream.fromQueue(queue).pipe( Stream.filterMapEffect((candidate) => { if (options?.recipient === undefined) { return Effect.succeed(Result.succeed(candidate)); } return options.recipient(candidate).pipe( Effect.map((complete) => complete ? Result.succeed(candidate) : Result.fail(undefined) ), ); }), Stream.runHead, ); return yield* O.match(response, { onNone: () => Effect.never, onSome: Effect.succeed, }); }); export const makeEffectRequestResponseFromPubSub = Effect.fn("makeEffectRequestResponseFromPubSub")(function* < TReq, TRes, >( pubsub: PubSubService, config: MessagingRuntimeConfig, options: EffectRequestResponseOptions, ) { const producerOptions: CreateProducerOptions = options.requestSchema === undefined ? { topic: options.requestTopic } : { topic: options.requestTopic, schema: options.requestSchema }; const producerBackend = yield* pubsub.createProducer(producerOptions); const producer = makeEffectProducerHandle(producerBackend, { topic: options.requestTopic, ...(options.requestSchema === undefined ? {} : { schema: options.requestSchema }), }); const createOptions: CreateConsumerOptions = { topic: options.responseTopic, subscription: options.subscription, ...(options.responseSchema === undefined ? {} : { schema: options.responseSchema }), }; const backend = yield* pubsub.createConsumer(createOptions); const subscribers = new Map>(); const stoppedSignal = yield* Deferred.make(); const fiber = yield* dispatchResponseLoop(backend, options.responseTopic, subscribers, config).pipe(Effect.forkScoped); let stopped = false; const stop = Effect.fn(`RequestResponse.stop:${options.requestTopic}`)(function* () { if (stopped) return; stopped = true; yield* Deferred.fail( stoppedSignal, messagingLifecycleError(`${options.requestTopic}:${options.responseTopic}`, "stop", "RequestResponse stopped"), ).pipe(Effect.ignore); yield* Fiber.interrupt(fiber); yield* producer.close; yield* closeConsumerBackend(backend, options.responseTopic, options.subscription); }); yield* Effect.addFinalizer(() => stop().pipe( Effect.catch((error) => Effect.logError("[RequestResponse] Failed to stop runtime", { error: error.message, }), ), ), ); return { request: ( request: TReq, requestOptions?: EffectRequestOptions, ) => { const id = randomUUID(); const timeoutMs = requestOptions?.timeoutMs ?? config.requestTimeoutMs; return Effect.acquireUseRelease( Queue.unbounded().pipe( Effect.tap((queue) => Effect.sync(() => { subscribers.set(id, queue); }), ), ), (queue) => Effect.gen(function* () { yield* producer.send(id, request); const result = yield* waitForResponse(queue, requestOptions).pipe( Effect.raceFirst(Deferred.await(stoppedSignal)), Effect.timeoutOption(Duration.millis(timeoutMs)), ); return yield* O.match(result, { onNone: () => Effect.fail(messagingTimeoutError("request-response", timeoutMs)), onSome: Effect.succeed, }); }), (queue) => Effect.sync(() => { subscribers.delete(id); }).pipe( Effect.flatMap(() => Queue.shutdown(queue)), Effect.ignore, ), ); }, stop: stop(), } satisfies EffectRequestResponse; }); export function makeProducerFactoryService(pubsub: PubSubService): ProducerFactoryService { return { make: Effect.fn("ProducerFactory.make")((options: EffectProducerOptions) => makeEffectProducerFromPubSub(pubsub, options), ), }; } export function makeConsumerFactoryService( pubsub: PubSubService, config: MessagingRuntimeConfig, ): ConsumerFactoryService { return { run: Effect.fn("ConsumerFactory.run")(( options: EffectConsumerOptions, flow: FlowContext, ) => makeEffectConsumerFromPubSub(pubsub, config, options, flow), ), }; } export function makeRequestResponseFactoryService( pubsub: PubSubService, config: MessagingRuntimeConfig, ): RequestResponseFactoryService { return { make: Effect.fn("RequestResponseFactory.make")(( options: EffectRequestResponseOptions, ) => makeEffectRequestResponseFromPubSub(pubsub, config, options)), }; } export const ProducerFactoryLive = Layer.effect( ProducerFactory, Effect.gen(function* () { const pubsub = yield* PubSub; return ProducerFactory.of(makeProducerFactoryService(pubsub)); }), ); export const ConsumerFactoryLive = Layer.effect( ConsumerFactory, Effect.gen(function* () { const pubsub = yield* PubSub; const config = yield* loadMessagingRuntimeConfig(); return ConsumerFactory.of(makeConsumerFactoryService(pubsub, config)); }), ); export const RequestResponseFactoryLive = Layer.effect( RequestResponseFactory, Effect.gen(function* () { const pubsub = yield* PubSub; const config = yield* loadMessagingRuntimeConfig(); return RequestResponseFactory.of(makeRequestResponseFactoryService(pubsub, config)); }), ); export const runFlowRuntimeScoped = Effect.fn("FlowRuntime.run")(function* ( flow: Flow, ) { yield* flow.startEffect().pipe( Effect.mapError((error) => flowRuntimeError(flow.name, "start", error)), ); yield* Effect.addFinalizer(() => Effect.sync(() => { flow.clearResources(); }), ); }); export const FlowRuntimeLive = Layer.succeed( FlowRuntime, FlowRuntime.of({ run: runFlowRuntimeScoped, }), ); export const MessagingRuntimeLive = Layer.mergeAll( ProducerFactoryLive, ConsumerFactoryLive, RequestResponseFactoryLive, FlowRuntimeLive, ); export const runEffectProducerScoped = Effect.fn("runEffectProducerScoped")(function* ( options: EffectProducerOptions, ) { const pubsub = yield* PubSub; return yield* makeEffectProducerFromPubSub(pubsub, options); }); export const runEffectConsumerScoped = Effect.fn("runEffectConsumerScoped")(function* ( options: EffectConsumerOptions, flow: FlowContext, ) { const pubsub = yield* PubSub; const config = yield* loadMessagingRuntimeConfig(); return yield* makeEffectConsumerFromPubSub(pubsub, config, options, flow); }); export const runEffectRequestResponseScoped = Effect.fn("runEffectRequestResponseScoped")(function* ( options: EffectRequestResponseOptions, ) { const pubsub = yield* PubSub; const config = yield* loadMessagingRuntimeConfig(); return yield* makeEffectRequestResponseFromPubSub(pubsub, config, options); }); export const runFlowScoped = Effect.fn("runFlowScoped")(function* ( flow: Flow, ) { yield* runFlowRuntimeScoped(flow); });