Remove native classes from TS runtime

This commit is contained in:
elpresidank 2026-06-01 20:26:47 -05:00
parent 952daf325d
commit dca2786828
79 changed files with 7622 additions and 6703 deletions

View file

@ -1,5 +1,5 @@
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
import { Consumer, type ConsumerOptions, type FlowContext } from "../messaging/consumer.js";
import { makeConsumer, type ConsumerOptions, type FlowContext } from "../messaging/consumer.js";
import type {
PubSubBackend,
BackendConsumer,
@ -75,20 +75,21 @@ describe("Consumer", () => {
// ── Constructor ──────────────────────────────────────────────────
it("stores options and applies defaults", () => {
const handler = vi.fn();
const consumer = new Consumer({
const consumer = makeConsumer({
pubsub,
topic: "my-topic",
subscription: "my-sub",
handler,
});
// Access private fields via any-cast to verify defaults
expect((consumer as any).concurrency).toBe(1);
expect((consumer as any).rateLimitRetryMs).toBe(10_000);
expect(consumer).toMatchObject({
start: expect.any(Function),
stop: expect.any(Function),
});
});
it("accepts custom concurrency and rateLimitRetryMs", () => {
const consumer = new Consumer({
const consumer = makeConsumer({
pubsub,
topic: "t",
subscription: "s",
@ -97,8 +98,10 @@ describe("Consumer", () => {
rateLimitRetryMs: 5_000,
});
expect((consumer as any).concurrency).toBe(4);
expect((consumer as any).rateLimitRetryMs).toBe(5_000);
expect(consumer).toMatchObject({
start: expect.any(Function),
stop: expect.any(Function),
});
});
// ── start() creates consumer and calls handler ─────────────────
@ -116,7 +119,7 @@ describe("Consumer", () => {
return null;
});
const consumer = new Consumer({
const consumer = makeConsumer({
pubsub,
topic: "topic-a",
subscription: "sub-a",
@ -147,7 +150,7 @@ describe("Consumer", () => {
return null;
});
const consumer = new Consumer({
const consumer = makeConsumer({
pubsub,
topic: "t",
subscription: "s",
@ -174,7 +177,7 @@ describe("Consumer", () => {
return null;
});
const consumer = new Consumer({
const consumer = makeConsumer({
pubsub,
topic: "t",
subscription: "s",
@ -217,7 +220,7 @@ describe("Consumer", () => {
return null;
});
const consumer = new Consumer({
const consumer = makeConsumer({
pubsub,
topic: "t",
subscription: "s",
@ -249,7 +252,7 @@ describe("Consumer", () => {
return null;
});
const consumer = new Consumer({
const consumer = makeConsumer({
pubsub,
topic: "t",
subscription: "s",
@ -268,6 +271,6 @@ describe("Consumer", () => {
await startPromise;
expect(backendConsumer.close).toHaveBeenCalled();
expect((consumer as any).running).toBe(false);
await expect(consumer.stop()).resolves.toBeUndefined();
});
});

View file

@ -3,7 +3,7 @@ import { ConfigProvider, Effect, Fiber } from "effect";
import {
FlowProcessor,
MessagingRuntimeLive,
ProducerSpec,
makeProducerSpec,
PubSub,
runFlowProcessorDefinitionScoped,
runProcessorScoped,
@ -146,7 +146,7 @@ class TestFlowProcessor extends FlowProcessor {
private readonly events: Array<string>,
) {
super(config);
this.registerSpecification(new ProducerSpec<string>("output"));
this.registerSpecification(makeProducerSpec<string>("output"));
this.registerConfigHandler(async (_config, version) => {
this.events.push(`handler:${version}`);
});
@ -225,7 +225,7 @@ describe("Effect-native FlowProcessor runtime", () => {
const fiber = yield* runFlowProcessorDefinitionScoped({
id: "functional-flow-processor-test",
pubsub: backend,
specifications: [new ProducerSpec<string>("output")],
specifications: [makeProducerSpec<string>("output")],
configHandlers: [
(_config, version) => Effect.sync(() => {
events.push(`handler:${version}`);

View file

@ -2,13 +2,14 @@ import { describe, expect, it } from "@effect/vitest";
import { ConfigProvider, Duration, Effect, Fiber } from "effect";
import * as TestClock from "effect/testing/TestClock";
import {
ConsumerSpec,
makeConsumerSpec,
makeConsumerSpecFromPromise,
Flow,
MessagingRuntimeLive,
ParameterSpec,
ProducerSpec,
makeParameterSpec,
makeProducerSpec,
PubSub,
RequestResponseSpec,
makeRequestResponseSpec,
type BackendConsumer,
type BackendProducer,
type CreateConsumerOptions,
@ -156,7 +157,7 @@ describe("Effect-native flow specifications", () => {
"processor",
backend,
{ topics: { output: "actual-output" } },
[new ProducerSpec<string>("output")],
[makeProducerSpec<string>("output")],
);
yield* Effect.scoped(
@ -179,7 +180,7 @@ describe("Effect-native flow specifications", () => {
);
it.effect(
"runs Promise handlers through the explicit ConsumerSpec compatibility helper",
"runs Promise handlers through the explicit makeConsumerSpec compatibility helper",
Effect.fnUntraced(function* () {
const message = createMessage("payload", { id: "request-1" });
const consumer = new ScriptedConsumer<string>([message]);
@ -191,7 +192,7 @@ describe("Effect-native flow specifications", () => {
backend,
{},
[
ConsumerSpec.fromPromise<string>(
makeConsumerSpecFromPromise<string>(
"input",
async (value, properties, flowContext: FlowContext) => {
handled.push(`${flowContext.name}:${properties.id}:${value}`);
@ -237,7 +238,7 @@ describe("Effect-native flow specifications", () => {
response: "actual-response",
},
},
[new RequestResponseSpec<string, string>("rr", "request", "response")],
[makeRequestResponseSpec<string, string>("rr", "request", "response")],
);
const response = yield* Effect.scoped(
@ -270,7 +271,7 @@ describe("Effect-native flow specifications", () => {
"processor",
backend,
{ parameters: { present: 42 } },
[new ParameterSpec("present")],
[makeParameterSpec("present")],
);
const errors = yield* Effect.scoped(

View file

@ -6,7 +6,7 @@ import {
defaultMessagingRuntimeConfig,
makeEffectRequestResponseFromPubSub,
MessagingRuntimeLive,
ProducerSpec,
makeProducerSpec,
runEffectConsumerScoped,
runEffectProducerScoped,
runFlowScoped,
@ -260,7 +260,7 @@ describe("Effect-native messaging runtime", () => {
"processor",
backend,
{},
[new ProducerSpec<string>("flow-output")],
[makeProducerSpec<string>("flow-output")],
);
yield* Effect.scoped(

View file

@ -1,8 +1,8 @@
import { describe, expect, it } from "@effect/vitest";
import { Effect } from "effect";
import {
AsyncProcessor,
PubSub,
makeAsyncProcessor,
runProcessorScoped,
type BackendConsumer,
type BackendProducer,
@ -79,53 +79,49 @@ class FailingProducerBackend extends FakePubSubBackend {
}
}
class RecordingProcessor extends AsyncProcessor {
constructor(
config: ProcessorConfig,
private readonly events: Array<string>,
) {
super(config);
}
const makeRecordingProcessor = (
config: ProcessorConfig,
events: Array<string>,
) => {
const processor = makeAsyncProcessor(config, {
run: async (runtime) => {
events.push(`run:${runtime.config.manageProcessSignals === false ? "effect-signals" : "class-signals"}`);
},
});
const stop = processor.stop;
processor.stop = async () => {
events.push("stop");
await stop();
};
return processor;
};
protected async run(): Promise<void> {
this.events.push(`run:${this.config.manageProcessSignals === false ? "effect-signals" : "class-signals"}`);
}
const makeFailingProcessor = (config: ProcessorConfig) =>
makeAsyncProcessor(config, {
run: async () => {
throw new Error("processor failed");
},
});
override async stop(): Promise<void> {
this.events.push("stop");
await super.stop();
}
}
class FailingProcessor extends AsyncProcessor {
protected async run(): Promise<void> {
throw new Error("processor failed");
}
}
class NativeRecordingProcessor extends AsyncProcessor<never, PubSub> {
constructor(
config: ProcessorConfig,
private readonly events: Array<string>,
) {
super(config);
}
protected override runEffect() {
const events = this.events;
const config = this.config;
return Effect.gen(function* () {
const pubsub = yield* PubSub;
events.push(`native:${config.manageProcessSignals === false ? "effect-signals" : "class-signals"}`);
events.push(`pubsub:${pubsub.backend.constructor.name}`);
});
}
override stopEffect() {
this.events.push("native-stop");
return super.stopEffect();
}
}
const makeNativeRecordingProcessor = (
config: ProcessorConfig,
events: Array<string>,
) => {
const processor = makeAsyncProcessor<never, PubSub>(config, {
runEffect: (runtime) =>
Effect.gen(function* () {
const pubsub = yield* PubSub;
events.push(`native:${runtime.config.manageProcessSignals === false ? "effect-signals" : "class-signals"}`);
events.push(`pubsub:${pubsub.backend.constructor.name}`);
}),
});
const stopEffect = processor.stopEffect;
processor.stopEffect = () => {
events.push("native-stop");
return stopEffect();
};
return processor;
};
describe("Effect runtime services", () => {
it.effect(
@ -180,7 +176,7 @@ describe("Effect runtime services", () => {
metricsPort: 8000,
manageProcessSignals: true,
},
(config) => new RecordingProcessor(config, events),
(config) => makeRecordingProcessor(config, events),
).pipe(Effect.provide(PubSub.layer(backend))),
);
@ -203,7 +199,7 @@ describe("Effect runtime services", () => {
metricsPort: 8000,
manageProcessSignals: true,
},
(config) => new NativeRecordingProcessor(config, events),
(config) => makeNativeRecordingProcessor(config, events),
).pipe(Effect.provide(PubSub.layer(backend))),
);
@ -224,7 +220,7 @@ describe("Effect runtime services", () => {
metricsPort: 8000,
manageProcessSignals: true,
},
(config) => new FailingProcessor(config),
makeFailingProcessor,
).pipe(
Effect.provide(PubSub.layer(backend)),
Effect.flip,

View file

@ -9,7 +9,7 @@ export type {
InitialPosition,
} from "./types.js";
export { NatsBackend } from "./nats.js";
export { makeNatsBackend } from "./nats.js";
export {
PubSub,
NatsPubSubLive,

View file

@ -32,239 +32,207 @@ import type {
const sc = StringCodec();
class NatsMessage<T> implements Message<T> {
interface NatsMessage<T> extends Message<T> {
/** Exposed so acknowledge/negativeAcknowledge can access the raw JsMsg */
readonly _jsMsg: JsMsg;
private readonly decoded: T;
}
constructor(msg: JsMsg, decoded: T) {
this._jsMsg = msg;
this.decoded = decoded;
}
value(): T {
return this.decoded;
}
properties(): Record<string, string> {
const headers = this._jsMsg.headers;
const props: Record<string, string> = {};
if (headers !== undefined) {
for (const [key, values] of headers) {
const value = values[0];
if (value !== undefined) {
props[key] = value;
function makeNatsMessage<T>(msg: JsMsg, decoded: T): NatsMessage<T> {
return {
_jsMsg: msg,
value: () => decoded,
properties: () => {
const headers = msg.headers;
const props: Record<string, string> = {};
if (headers !== undefined) {
for (const [key, values] of headers) {
const value = values[0];
if (value !== undefined) {
props[key] = value;
}
}
}
}
return props;
}
return props;
},
};
}
class NatsProducer<T> implements BackendProducer<T> {
private readonly js: JetStreamClient;
private readonly subject: string;
private readonly schema: S.Top | undefined;
function makeNatsProducer<T>(
js: JetStreamClient,
subject: string,
schema?: S.Top,
): BackendProducer<T> {
return {
send: async (message, properties) => {
const encoded = schema !== undefined
? S.encodeUnknownSync(schema as S.Codec<unknown, unknown>)(message)
: message;
const data = sc.encode(JSON.stringify(encoded));
const opts: Record<string, unknown> = {};
constructor(js: JetStreamClient, subject: string, schema?: S.Top) {
this.js = js;
this.subject = subject;
this.schema = schema;
}
async send(message: T, properties?: Record<string, string>): Promise<void> {
const encoded = this.schema !== undefined
? S.encodeUnknownSync(this.schema as S.Codec<unknown, unknown>)(message)
: message;
const data = sc.encode(JSON.stringify(encoded));
const opts: Record<string, unknown> = {};
if (properties !== undefined && Object.keys(properties).length > 0) {
const { headers } = await import("nats");
const hdrs = headers();
for (const [key, val] of Object.entries(properties)) {
hdrs.append(key, val);
if (properties !== undefined && Object.keys(properties).length > 0) {
const { headers } = await import("nats");
const hdrs = headers();
for (const [key, val] of Object.entries(properties)) {
hdrs.append(key, val);
}
opts.headers = hdrs;
}
opts.headers = hdrs;
}
await this.js.publish(this.subject, data, opts);
}
async flush(): Promise<void> {
// NATS publishes are flushed on the connection level
}
async close(): Promise<void> {
// No per-producer cleanup needed for NATS
}
await js.publish(subject, data, opts);
},
flush: async () => {
// NATS publishes are flushed on the connection level.
},
close: async () => {
// No per-producer cleanup needed for NATS.
},
};
}
class NatsConsumer<T> implements BackendConsumer<T> {
private consumer: NatsJsConsumer | null = null;
private readonly js: JetStreamClient;
private readonly jsm: JetStreamManager;
private readonly subject: string;
private readonly subscription: string;
private readonly initialPosition: "latest" | "earliest";
private readonly streamName: string;
private readonly schema: S.Top | undefined;
constructor(
js: JetStreamClient,
jsm: JetStreamManager,
subject: string,
subscription: string,
initialPosition: "latest" | "earliest",
streamName: string,
schema?: S.Top,
) {
this.js = js;
this.jsm = jsm;
this.subject = subject;
this.subscription = subscription;
this.initialPosition = initialPosition;
this.streamName = streamName;
this.schema = schema;
}
async init(): Promise<void> {
// Stream is already ensured by NatsBackend.ensureStream().
// Create or bind to durable consumer.
try {
this.consumer = await this.js.consumers.get(this.streamName, this.subscription);
} catch {
const deliverPolicy =
this.initialPosition === "earliest"
? DeliverPolicy.All
: DeliverPolicy.New;
await this.jsm.consumers.add(this.streamName, {
durable_name: this.subscription,
ack_policy: AckPolicy.Explicit,
deliver_policy: deliverPolicy,
filter_subject: this.subject,
});
this.consumer = await this.js.consumers.get(this.streamName, this.subscription);
}
}
async receive(timeoutMs = 2000): Promise<Message<T> | null> {
if (this.consumer === null) throw new Error("Consumer not initialized");
// Pull a single message with a timeout using the pull-based API.
// consumer.next() returns a JsMsg or null when the timeout expires.
const msg = await this.consumer.next({ expires: timeoutMs });
if (msg === null) return null;
const parsed = JSON.parse(sc.decode(msg.data));
const decoded = this.schema !== undefined
? S.decodeUnknownSync(this.schema as S.Codec<unknown, unknown>)(parsed) as T
: parsed as T;
return new NatsMessage(msg, decoded);
}
async acknowledge(message: Message<T>): Promise<void> {
const natsMsg = message as NatsMessage<T>;
natsMsg._jsMsg.ack();
}
async negativeAcknowledge(message: Message<T>): Promise<void> {
const natsMsg = message as NatsMessage<T>;
natsMsg._jsMsg.nak();
}
async unsubscribe(): Promise<void> {
// The pull-based consumer does not have a persistent subscription to drain.
// Clearing the reference is sufficient; the durable consumer persists server-side.
this.consumer = null;
}
async close(): Promise<void> {
this.consumer = null;
}
interface InitializableBackendConsumer<T> extends BackendConsumer<T> {
readonly init: () => Promise<void>;
}
export class NatsBackend implements PubSubBackend {
private connection: NatsConnection | null = null;
private js: JetStreamClient | null = null;
private jsm: JetStreamManager | null = null;
private initializedStreams = new Set<string>();
private readonly url: string;
function makeNatsConsumer<T>(
js: JetStreamClient,
jsm: JetStreamManager,
subject: string,
subscription: string,
initialPosition: "latest" | "earliest",
streamName: string,
schema?: S.Top,
): InitializableBackendConsumer<T> {
let consumer: NatsJsConsumer | null = null;
constructor(url = "nats://localhost:4222") {
this.url = url;
}
return {
init: async () => {
// Stream is already ensured by makeNatsBackend(). Create or bind to a durable consumer.
try {
consumer = await js.consumers.get(streamName, subscription);
} catch {
const deliverPolicy =
initialPosition === "earliest"
? DeliverPolicy.All
: DeliverPolicy.New;
private async ensureConnected(): Promise<void> {
if (this.connection === null) {
this.connection = await connect({ servers: this.url });
this.js = this.connection.jetstream();
this.jsm = await this.connection.jetstreamManager();
await jsm.consumers.add(streamName, {
durable_name: subscription,
ack_policy: AckPolicy.Explicit,
deliver_policy: deliverPolicy,
filter_subject: subject,
});
consumer = await js.consumers.get(streamName, subscription);
}
},
receive: async (timeoutMs = 2000) => {
if (consumer === null) throw new Error("Consumer not initialized");
// Pull a single message with a timeout using the pull-based API.
// consumer.next() returns a JsMsg or null when the timeout expires.
const msg = await consumer.next({ expires: timeoutMs });
if (msg === null) return null;
const parsed = JSON.parse(sc.decode(msg.data));
const decoded = schema !== undefined
? S.decodeUnknownSync(schema as S.Codec<unknown, unknown>)(parsed) as T
: parsed as T;
return makeNatsMessage(msg, decoded);
},
acknowledge: async (message) => {
const natsMsg = message as NatsMessage<T>;
natsMsg._jsMsg.ack();
},
negativeAcknowledge: async (message) => {
const natsMsg = message as NatsMessage<T>;
natsMsg._jsMsg.nak();
},
unsubscribe: async () => {
// The pull-based consumer does not have a persistent subscription to drain.
// Clearing the reference is sufficient; the durable consumer persists server-side.
consumer = null;
},
close: async () => {
consumer = null;
},
};
}
export function makeNatsBackend(url = "nats://localhost:4222"): PubSubBackend {
let connection: NatsConnection | null = null;
let js: JetStreamClient | null = null;
let jsm: JetStreamManager | null = null;
const initializedStreams = new Set<string>();
const ensureConnected = async (): Promise<void> => {
if (connection === null) {
connection = await connect({ servers: url });
js = connection.jetstream();
jsm = await connection.jetstreamManager();
}
}
};
/**
* Ensure the stream for a given subject exists with a wildcard filter.
* E.g. subject "tg.flow.config-request" stream "tg_flow" with subjects ["tg.flow.>"]
*/
private async ensureStream(subject: string): Promise<string> {
const ensureStream = async (subject: string): Promise<string> => {
const parts = subject.split(".");
const streamName = parts.slice(0, 2).join("_");
if (this.initializedStreams.has(streamName)) return streamName;
if (initializedStreams.has(streamName)) return streamName;
const wildcardSubject = `${parts.slice(0, 2).join(".")}.>`;
const jsm = this.jsm;
if (jsm === null) throw new Error("NATS backend not connected");
const manager = jsm;
if (manager === null) throw new Error("NATS backend not connected");
try {
await jsm.streams.info(streamName);
await manager.streams.info(streamName);
} catch {
await jsm.streams.add({
await manager.streams.add({
name: streamName,
subjects: [wildcardSubject],
});
}
this.initializedStreams.add(streamName);
initializedStreams.add(streamName);
return streamName;
}
};
async createProducer<T>(options: CreateProducerOptions): Promise<BackendProducer<T>> {
await this.ensureConnected();
await this.ensureStream(options.topic);
const js = this.js;
if (js === null) throw new Error("NATS backend not connected");
return new NatsProducer<T>(js, options.topic, options.schema);
}
async createConsumer<T>(options: CreateConsumerOptions): Promise<BackendConsumer<T>> {
await this.ensureConnected();
const streamName = await this.ensureStream(options.topic);
const js = this.js;
const jsm = this.jsm;
if (js === null || jsm === null) throw new Error("NATS backend not connected");
const consumer = new NatsConsumer<T>(
js,
jsm,
options.topic,
options.subscription,
options.initialPosition ?? "latest",
streamName,
options.schema,
);
await consumer.init();
return consumer;
}
async close(): Promise<void> {
if (this.connection !== null) {
await this.connection.drain();
this.connection = null;
this.js = null;
this.jsm = null;
}
}
return {
createProducer: async <T>(options: CreateProducerOptions) => {
await ensureConnected();
await ensureStream(options.topic);
const client = js;
if (client === null) throw new Error("NATS backend not connected");
return makeNatsProducer<T>(client, options.topic, options.schema);
},
createConsumer: async <T>(options: CreateConsumerOptions) => {
await ensureConnected();
const streamName = await ensureStream(options.topic);
const client = js;
const manager = jsm;
if (client === null || manager === null) throw new Error("NATS backend not connected");
const consumer = makeNatsConsumer<T>(
client,
manager,
options.topic,
options.subscription,
options.initialPosition ?? "latest",
streamName,
options.schema,
);
await consumer.init();
return consumer;
},
close: async () => {
if (connection !== null) {
await connection.drain();
connection = null;
js = null;
jsm = null;
}
},
};
}

View file

@ -14,7 +14,7 @@ import type {
CreateProducerOptions,
PubSubBackend,
} from "./types.js";
import { NatsBackend } from "./nats.js";
import { makeNatsBackend } from "./nats.js";
import { pubSubError } from "../errors.js";
export interface PubSubService {
@ -78,14 +78,14 @@ export function pubSubLayer(backend: PubSubBackend): Layer.Layer<PubSub> {
}
export function makeNatsPubSubLayer(url = "nats://localhost:4222"): Layer.Layer<PubSub> {
return pubSubLayer(new NatsBackend(url));
return pubSubLayer(makeNatsBackend(url));
}
export const NatsPubSubLive = Layer.effect(PubSub)(
Effect.gen(function* () {
const natsUrl = O.getOrUndefined(yield* Config.string("NATS_URL").pipe(Config.option));
const pulsarHost = O.getOrUndefined(yield* Config.string("PULSAR_HOST").pipe(Config.option));
const service = makePubSubService(new NatsBackend(natsUrl ?? pulsarHost ?? "nats://localhost:4222"));
const service = makePubSubService(makeNatsBackend(natsUrl ?? pulsarHost ?? "nats://localhost:4222"));
yield* Effect.addFinalizer(() =>
service.close.pipe(
Effect.catch((error) =>

View file

@ -5,7 +5,7 @@
*/
import * as S from "effect/Schema";
import type { TgError } from "./schema/primitives.js";
import type { TgError } from "./schema/index.ts";
export class TooManyRequestsError extends S.TaggedErrorClass<TooManyRequestsError>()(
"TooManyRequestsError",

View file

@ -33,67 +33,55 @@ export interface ConsumerOptions<T> {
rateLimitTimeoutMs?: number;
}
export class Consumer<T> {
private backend: BackendConsumer<T> | null = null;
private running = false;
private abortController = new AbortController();
private readonly options: ConsumerOptions<T>;
declare const ConsumerMessageType: unique symbol;
private readonly concurrency: number;
private readonly rateLimitRetryMs: number;
export interface Consumer<T> {
readonly [ConsumerMessageType]?: (_: T) => T;
readonly start: (flow: FlowContext) => Promise<void>;
readonly stop: () => Promise<void>;
}
constructor(options: ConsumerOptions<T>) {
this.options = options;
this.concurrency = options.concurrency ?? 1;
this.rateLimitRetryMs = options.rateLimitRetryMs ?? 10_000;
}
export function makeConsumer<T>(options: ConsumerOptions<T>): Consumer<T> {
let backend: BackendConsumer<T> | null = null;
let running = false;
let abortController = new AbortController();
const concurrency = options.concurrency ?? 1;
const rateLimitRetryMs = options.rateLimitRetryMs ?? 10_000;
async start(flow: FlowContext): Promise<void> {
this.backend = await this.options.pubsub.createConsumer<T>({
topic: this.options.topic,
subscription: this.options.subscription,
initialPosition: this.options.initialPosition ?? "latest",
});
this.running = true;
// Spawn concurrent consumer tasks
const tasks = Array.from({ length: this.concurrency }, () =>
this.consumeLoop(flow),
);
// Run all concurrently — first rejection stops all
await Promise.all(tasks);
}
async stop(): Promise<void> {
this.running = false;
this.abortController.abort();
if (this.backend !== null) {
await this.backend.close();
this.backend = null;
const handleWithRetry = async (msg: Message<T>, flow: FlowContext): Promise<void> => {
try {
await options.handler(msg.value(), msg.properties(), flow);
} catch (err) {
if (S.is(TooManyRequestsError)(err)) {
console.warn(`[Consumer] Rate limited, retrying in ${rateLimitRetryMs}ms`);
await sleep(rateLimitRetryMs);
await options.handler(msg.value(), msg.properties(), flow);
} else {
throw err;
}
}
}
};
private async consumeLoop(flow: FlowContext): Promise<void> {
while (this.running) {
const consumeLoop = async (flow: FlowContext): Promise<void> => {
while (running) {
let msg: Message<T> | null = null;
try {
const backend = this.backend;
if (backend === null) throw new Error("Consumer backend not started");
const currentBackend = backend;
if (currentBackend === null) throw new Error("Consumer backend not started");
msg = await backend.receive(2000);
msg = await currentBackend.receive(2000);
if (msg === null) continue;
await this.handleWithRetry(msg, flow);
await backend.acknowledge(msg);
await handleWithRetry(msg, flow);
await currentBackend.acknowledge(msg);
} catch (err) {
if (!this.running) break;
if (!running) break;
console.error("[Consumer] Error in consume loop:", err);
if (msg !== null) {
try {
const backend = this.backend;
if (backend !== null) {
await backend.negativeAcknowledge(msg);
const currentBackend = backend;
if (currentBackend !== null) {
await currentBackend.negativeAcknowledge(msg);
}
} catch (nakErr) {
console.error("[Consumer] Failed to nak message:", nakErr);
@ -102,21 +90,35 @@ export class Consumer<T> {
await sleep(1000);
}
}
}
};
private async handleWithRetry(msg: Message<T>, flow: FlowContext): Promise<void> {
try {
await this.options.handler(msg.value(), msg.properties(), flow);
} catch (err) {
if (S.is(TooManyRequestsError)(err)) {
console.warn(`[Consumer] Rate limited, retrying in ${this.rateLimitRetryMs}ms`);
await sleep(this.rateLimitRetryMs);
await this.options.handler(msg.value(), msg.properties(), flow);
} else {
throw err;
return {
start: async (flow) => {
backend = await options.pubsub.createConsumer<T>({
topic: options.topic,
subscription: options.subscription,
initialPosition: options.initialPosition ?? "latest",
});
running = true;
// Spawn concurrent consumer tasks.
const tasks = Array.from({ length: concurrency }, () =>
consumeLoop(flow),
);
// Run all concurrently: first rejection stops all.
await Promise.all(tasks);
},
stop: async () => {
running = false;
abortController.abort();
abortController = new AbortController();
if (backend !== null) {
await backend.close();
backend = null;
}
}
}
},
};
}
function sleep(ms: number): Promise<void> {

View file

@ -1,7 +1,7 @@
export { Producer } from "./producer.js";
export { Consumer, type MessageHandler, type FlowContext, type ConsumerOptions } from "./consumer.js";
export { Subscriber, AsyncQueue } from "./subscriber.js";
export { RequestResponse, type RequestResponseOptions } from "./request-response.js";
export { makeProducer, type Producer } from "./producer.js";
export { makeConsumer, type Consumer, type MessageHandler, type FlowContext, type ConsumerOptions } from "./consumer.js";
export { makeAsyncQueue, makeSubscriber, type Subscriber, type AsyncQueue } from "./subscriber.js";
export { makeRequestResponse, type RequestResponse, type RequestResponseOptions } from "./request-response.js";
export {
ConsumerFactory,
ConsumerFactoryLive,

View file

@ -4,47 +4,47 @@
* Python reference: trustgraph-base/trustgraph/base/producer.py
*/
import type { PubSubBackend, BackendProducer } from "../backend/types.js";
import type { PubSubBackend } from "../backend/types.js";
import type { ProducerMetrics } from "../metrics/prometheus.js";
import { Effect } from "effect";
import { makeEffectProducerHandle, type EffectProducer } from "./runtime.js";
export class Producer<T> {
private backend: BackendProducer<T> | null = null;
private effectProducer: EffectProducer<T> | null = null;
private readonly pubsub: PubSubBackend;
private readonly topic: string;
private readonly metrics: ProducerMetrics | undefined;
constructor(pubsub: PubSubBackend, topic: string, metrics?: ProducerMetrics) {
this.pubsub = pubsub;
this.topic = topic;
this.metrics = metrics;
}
async start(): Promise<void> {
this.backend = await this.pubsub.createProducer<T>({ topic: this.topic });
this.effectProducer = makeEffectProducerHandle(this.backend, {
topic: this.topic,
...(this.metrics === undefined ? {} : { metrics: this.metrics }),
});
}
async send(id: string, message: T): Promise<void> {
if (this.effectProducer === null) throw new Error("Producer not started");
await Effect.runPromise(this.effectProducer.send(id, message));
}
async stop(): Promise<void> {
if (this.effectProducer !== null) {
await Effect.runPromise(
this.effectProducer.flush.pipe(
Effect.flatMap(() => this.effectProducer === null ? Effect.void : this.effectProducer.close),
),
);
this.effectProducer = null;
this.backend = null;
}
}
export interface Producer<T> {
readonly start: () => Promise<void>;
readonly send: (id: string, message: T) => Promise<void>;
readonly stop: () => Promise<void>;
}
export function makeProducer<T>(
pubsub: PubSubBackend,
topic: string,
metrics?: ProducerMetrics,
): Producer<T> {
let effectProducer: EffectProducer<T> | null = null;
return {
start: async () => {
const backend = await pubsub.createProducer<T>({ topic });
effectProducer = makeEffectProducerHandle(backend, {
topic,
...(metrics === undefined ? {} : { metrics }),
});
},
send: async (id, message) => {
if (effectProducer === null) throw new Error("Producer not started");
await Effect.runPromise(effectProducer.send(id, message));
},
stop: async () => {
if (effectProducer !== null) {
const producer = effectProducer;
await Effect.runPromise(
producer.flush.pipe(
Effect.flatMap(() => producer.close),
),
);
effectProducer = null;
}
},
};
}

View file

@ -8,8 +8,8 @@
*/
import { randomUUID } from "node:crypto";
import { Producer } from "./producer.js";
import { Subscriber } from "./subscriber.js";
import { makeProducer, type Producer } from "./producer.js";
import { makeSubscriber, type Subscriber } from "./subscriber.js";
import type { PubSubBackend } from "../backend/types.js";
export interface RequestResponseOptions {
@ -19,73 +19,76 @@ export interface RequestResponseOptions {
subscription: string;
}
export class RequestResponse<TReq, TRes> {
private producer: Producer<TReq>;
private subscriber: Subscriber<TRes>;
constructor(options: RequestResponseOptions) {
this.producer = new Producer<TReq>(options.pubsub, options.requestTopic);
this.subscriber = new Subscriber<TRes>(
options.pubsub,
options.responseTopic,
options.subscription,
);
}
async start(): Promise<void> {
await this.producer.start();
await this.subscriber.start();
}
async stop(): Promise<void> {
await this.producer.stop();
await this.subscriber.stop();
}
/**
* Send a request and wait for responses.
*
* @param request - The request payload
* @param options.timeoutMs - Total timeout in milliseconds (default: 300s)
* @param options.recipient - Optional callback for streaming responses.
* Return `true` to indicate the final response has been received.
* If omitted, returns the first response.
*/
async request(
export interface RequestResponse<TReq, TRes> {
readonly start: () => Promise<void>;
readonly stop: () => Promise<void>;
readonly request: (
request: TReq,
options?: {
timeoutMs?: number;
recipient?: (response: TRes) => Promise<boolean>;
},
): Promise<TRes> {
const id = randomUUID();
const timeoutMs = options?.timeoutMs ?? 300_000;
const recipient = options?.recipient;
const queue = this.subscriber.subscribe(id);
try {
await this.producer.send(id, request);
const deadline = Date.now() + timeoutMs;
while (true) {
const remaining = deadline - Date.now();
if (remaining <= 0) {
throw new Error(`Request timed out after ${timeoutMs}ms`);
}
const response = await queue.pop(remaining);
if (recipient !== undefined) {
const isFinal = await recipient(response);
if (isFinal) return response;
} else {
return response;
}
}
} finally {
this.subscriber.unsubscribe(id);
}
}
) => Promise<TRes>;
}
export function makeRequestResponse<TReq, TRes>(
options: RequestResponseOptions,
): RequestResponse<TReq, TRes> {
const producer: Producer<TReq> = makeProducer<TReq>(options.pubsub, options.requestTopic);
const subscriber: Subscriber<TRes> = makeSubscriber<TRes>(
options.pubsub,
options.responseTopic,
options.subscription,
);
return {
start: async () => {
await producer.start();
await subscriber.start();
},
stop: async () => {
await producer.stop();
await subscriber.stop();
},
/**
* Send a request and wait for responses.
*
* @param request - The request payload
* @param options.timeoutMs - Total timeout in milliseconds (default: 300s)
* @param options.recipient - Optional callback for streaming responses.
* Return `true` to indicate the final response has been received.
* If omitted, returns the first response.
*/
request: async (request, requestOptions) => {
const id = randomUUID();
const timeoutMs = requestOptions?.timeoutMs ?? 300_000;
const recipient = requestOptions?.recipient;
const queue = subscriber.subscribe(id);
try {
await producer.send(id, request);
const deadline = Date.now() + timeoutMs;
while (true) {
const remaining = deadline - Date.now();
if (remaining <= 0) {
throw new Error(`Request timed out after ${timeoutMs}ms`);
}
const response = await queue.pop(remaining);
if (recipient !== undefined) {
const isFinal = await recipient(response);
if (isFinal) return response;
} else {
return response;
}
}
} finally {
subscriber.unsubscribe(id);
}
},
};
}

View file

@ -13,114 +13,84 @@ type Resolver<T> = {
/**
* Simple async queue for inter-task communication (replaces asyncio.Queue).
*/
export class AsyncQueue<T> {
private buffer: T[] = [];
private waiters: Array<(value: T) => void> = [];
push(item: T): void {
const waiter = this.waiters.shift();
if (waiter !== undefined) {
waiter(item);
} else {
this.buffer.push(item);
}
}
async pop(timeoutMs?: number): Promise<T> {
const buffered = this.buffer.shift();
if (buffered !== undefined) return buffered;
return new Promise<T>((resolve, reject) => {
let timer: ReturnType<typeof setTimeout> | undefined;
const waiter = (value: T) => {
if (timer !== undefined) clearTimeout(timer);
resolve(value);
};
this.waiters.push(waiter);
if (timeoutMs !== undefined) {
timer = setTimeout(() => {
const idx = this.waiters.indexOf(waiter);
if (idx !== -1) this.waiters.splice(idx, 1);
reject(new Error(`Queue.pop timed out after ${timeoutMs}ms`));
}, timeoutMs);
}
});
}
get length(): number {
return this.buffer.length;
}
export interface AsyncQueue<T> {
readonly push: (item: T) => void;
readonly pop: (timeoutMs?: number) => Promise<T>;
readonly length: number;
}
export class Subscriber<T> {
private backend: BackendConsumer<T> | null = null;
private running = false;
private readonly pubsub: PubSubBackend;
private readonly topic: string;
private readonly subscription: string;
export function makeAsyncQueue<T>(): AsyncQueue<T> {
const buffer: T[] = [];
const waiters: Array<(value: T) => void> = [];
return {
push: (item) => {
const waiter = waiters.shift();
if (waiter !== undefined) {
waiter(item);
} else {
buffer.push(item);
}
},
pop: async (timeoutMs) => {
const buffered = buffer.shift();
if (buffered !== undefined) return buffered;
return new Promise<T>((resolve, reject) => {
let timer: ReturnType<typeof setTimeout> | undefined;
const waiter = (value: T) => {
if (timer !== undefined) clearTimeout(timer);
resolve(value);
};
waiters.push(waiter);
if (timeoutMs !== undefined) {
timer = setTimeout(() => {
const idx = waiters.indexOf(waiter);
if (idx !== -1) waiters.splice(idx, 1);
reject(new Error(`Queue.pop timed out after ${timeoutMs}ms`));
}, timeoutMs);
}
});
},
get length() {
return buffer.length;
},
};
}
export interface Subscriber<T> {
readonly start: () => Promise<void>;
readonly stop: () => Promise<void>;
readonly subscribe: (id: string) => AsyncQueue<T>;
readonly subscribeAll: (id: string) => AsyncQueue<T>;
readonly unsubscribe: (id: string) => void;
readonly unsubscribeAll: (id: string) => void;
}
export function makeSubscriber<T>(
pubsub: PubSubBackend,
topic: string,
subscription: string,
): Subscriber<T> {
let backend: BackendConsumer<T> | null = null;
let running = false;
// ID-specific subscriptions (request/response correlation)
private idSubscribers = new Map<string, Resolver<T>>();
const idSubscribers = new Map<string, Resolver<T>>();
// Wildcard subscribers (receive all messages)
private allSubscribers = new Map<string, Resolver<T>>();
const allSubscribers = new Map<string, Resolver<T>>();
constructor(pubsub: PubSubBackend, topic: string, subscription: string) {
this.pubsub = pubsub;
this.topic = topic;
this.subscription = subscription;
}
async start(): Promise<void> {
this.backend = await this.pubsub.createConsumer<T>({
topic: this.topic,
subscription: this.subscription,
});
this.running = true;
// Start the dispatch loop (fire and forget — runs until stop)
this.dispatchLoop().catch((err) => {
if (this.running === true) console.error("[Subscriber] dispatch loop error:", err);
});
}
async stop(): Promise<void> {
this.running = false;
if (this.backend !== null) {
await this.backend.close();
this.backend = null;
}
}
subscribe(id: string): AsyncQueue<T> {
const queue = new AsyncQueue<T>();
this.idSubscribers.set(id, { queue });
return queue;
}
subscribeAll(id: string): AsyncQueue<T> {
const queue = new AsyncQueue<T>();
this.allSubscribers.set(id, { queue });
return queue;
}
unsubscribe(id: string): void {
this.idSubscribers.delete(id);
}
unsubscribeAll(id: string): void {
this.allSubscribers.delete(id);
}
private async dispatchLoop(): Promise<void> {
const dispatchLoop = async (): Promise<void> => {
let consecutiveErrors = 0;
while (this.running) {
while (running) {
try {
const backend = this.backend;
if (backend === null) throw new Error("Subscriber backend not started");
const currentBackend = backend;
if (currentBackend === null) throw new Error("Subscriber backend not started");
const msg = await backend.receive(2000);
const msg = await currentBackend.receive(2000);
if (msg === null) continue;
consecutiveErrors = 0;
@ -131,20 +101,20 @@ export class Subscriber<T> {
// Route to ID-specific subscriber
if (id !== undefined && id.length > 0) {
const sub = this.idSubscribers.get(id);
const sub = idSubscribers.get(id);
if (sub !== undefined) {
sub.queue.push(value);
}
}
// Broadcast to all-subscribers
for (const sub of this.allSubscribers.values()) {
for (const sub of allSubscribers.values()) {
sub.queue.push(value);
}
await backend.acknowledge(msg);
await currentBackend.acknowledge(msg);
} catch (err) {
if (!this.running) break;
if (!running) break;
consecutiveErrors++;
if (consecutiveErrors <= 3) {
console.error("[Subscriber] Error:", err);
@ -156,5 +126,42 @@ export class Subscriber<T> {
await new Promise((r) => setTimeout(r, delay));
}
}
}
};
return {
start: async () => {
backend = await pubsub.createConsumer<T>({
topic,
subscription,
});
running = true;
// Start the dispatch loop (fire and forget; runs until stop).
dispatchLoop().catch((err) => {
if (running === true) console.error("[Subscriber] dispatch loop error:", err);
});
},
stop: async () => {
running = false;
if (backend !== null) {
await backend.close();
backend = null;
}
},
subscribe: (id) => {
const queue = makeAsyncQueue<T>();
idSubscribers.set(id, { queue });
return queue;
},
subscribeAll: (id) => {
const queue = makeAsyncQueue<T>();
allSubscribers.set(id, { queue });
return queue;
},
unsubscribe: (id) => {
idSubscribers.delete(id);
},
unsubscribeAll: (id) => {
allSubscribers.delete(id);
},
};
}

View file

@ -1 +1,7 @@
export { ConsumerMetrics, ProducerMetrics, registry } from "./prometheus.js";
export {
makeConsumerMetrics,
makeProducerMetrics,
registry,
type ConsumerMetrics,
type ProducerMetrics,
} from "./prometheus.js";

View file

@ -9,64 +9,64 @@ import { Counter, Histogram, Registry, collectDefaultMetrics } from "prom-client
export const registry = new Registry();
collectDefaultMetrics({ register: registry });
export class ConsumerMetrics {
private requestHistogram: Histogram;
private processingCounter: Counter;
private rateLimitCounter: Counter;
private readonly labels: { processor: string; flow: string; name: string };
export interface ConsumerMetrics {
readonly recordTime: (seconds: number) => void;
readonly process: (status: "success" | "error") => void;
readonly rateLimit: () => void;
}
constructor(processor: string, flow: string, name: string) {
this.labels = { processor, flow, name };
this.requestHistogram = new Histogram({
export function makeConsumerMetrics(
processor: string,
flow: string,
name: string,
): ConsumerMetrics {
const labels = { processor, flow, name };
const requestHistogram = new Histogram({
name: "tg_consumer_request_duration_seconds",
help: "Consumer request processing time",
labelNames: ["processor", "flow", "name"],
registers: [registry],
});
});
this.processingCounter = new Counter({
const processingCounter = new Counter({
name: "tg_consumer_processing_total",
help: "Consumer processing outcomes",
labelNames: ["processor", "flow", "name", "status"],
registers: [registry],
});
});
this.rateLimitCounter = new Counter({
const rateLimitCounter = new Counter({
name: "tg_consumer_rate_limit_total",
help: "Consumer rate limit events",
labelNames: ["processor", "flow", "name"],
registers: [registry],
});
}
});
recordTime(seconds: number): void {
this.requestHistogram.observe(this.labels, seconds);
}
process(status: "success" | "error"): void {
this.processingCounter.inc({ ...this.labels, status });
}
rateLimit(): void {
this.rateLimitCounter.inc(this.labels);
}
return {
recordTime: (seconds) => requestHistogram.observe(labels, seconds),
process: (status) => processingCounter.inc({ ...labels, status }),
rateLimit: () => rateLimitCounter.inc(labels),
};
}
export class ProducerMetrics {
private counter: Counter;
private readonly labels: { processor: string; flow: string; name: string };
export interface ProducerMetrics {
readonly inc: () => void;
}
constructor(processor: string, flow: string, name: string) {
this.labels = { processor, flow, name };
this.counter = new Counter({
export function makeProducerMetrics(
processor: string,
flow: string,
name: string,
): ProducerMetrics {
const labels = { processor, flow, name };
const counter = new Counter({
name: "tg_producer_items_total",
help: "Producer items sent",
labelNames: ["processor", "flow", "name"],
registers: [registry],
});
}
});
inc(): void {
this.counter.inc(this.labels);
}
return {
inc: () => counter.inc(labels),
};
}

View file

@ -7,7 +7,7 @@
*/
import type { PubSubBackend } from "../backend/types.js";
import { NatsBackend } from "../backend/nats.js";
import { makeNatsBackend } from "../backend/nats.js";
import { Effect } from "effect";
import { processorLifecycleError, type ProcessorLifecycleError } from "../errors.js";
import { loadProcessorRuntimeConfig } from "../runtime/config.js";
@ -30,105 +30,72 @@ export type EffectConfigHandler<E = never, R = never> = (
version: number,
) => Effect.Effect<void, E, R>;
declare const processorRunErrorType: unique symbol;
declare const processorRunRequirementsType: unique symbol;
export interface ProcessorRuntime<RunError = ProcessorLifecycleError, RunRequirements = never> {
readonly [processorRunErrorType]?: RunError;
readonly [processorRunRequirementsType]?: RunRequirements;
readonly start: () => Promise<void>;
readonly stop: () => Promise<void>;
startEffect(): unknown;
stopEffect(): unknown;
}
export interface AsyncProcessorRuntime<
RunError = ProcessorLifecycleError,
RunRequirements = never,
> extends ProcessorRuntime<RunError, RunRequirements> {
readonly config: ProcessorConfig;
readonly pubsub: PubSubBackend;
readonly configHandlers: ConfigHandler[];
readonly running: boolean;
readonly isRunning: () => boolean;
readonly registerConfigHandler: (handler: ConfigHandler) => void;
readonly onShutdown: (callback: () => Promise<void>) => void;
readonly run: () => Promise<void>;
runEffect(): unknown;
}
export interface AsyncProcessorRuntimeOptions<
RunError = ProcessorLifecycleError,
RunRequirements = never,
> {
readonly run?: (
processor: AsyncProcessorRuntime<RunError, RunRequirements>,
) => Promise<void>;
readonly runEffect?: (
processor: AsyncProcessorRuntime<RunError, RunRequirements>,
) => Effect.Effect<void, RunError, RunRequirements>;
}
interface RegisteredSignalHandler {
readonly signal: NodeJS.Signals;
readonly handler: () => void;
}
export abstract class AsyncProcessor<RunError = ProcessorLifecycleError, RunRequirements = never> {
protected pubsub: PubSubBackend;
protected running = false;
protected configHandlers: ConfigHandler[] = [];
private shutdownCallbacks: Array<() => Promise<void>> = [];
private signalHandlers: RegisteredSignalHandler[] = [];
private readonly ownsPubSub: boolean;
protected readonly config: ProcessorConfig;
export function makeAsyncProcessor<
RunError = ProcessorLifecycleError,
RunRequirements = never,
>(
config: ProcessorConfig,
options: AsyncProcessorRuntimeOptions<RunError, RunRequirements> = {},
): AsyncProcessorRuntime<RunError, RunRequirements> {
const pubsub = config.pubsub ?? makeNatsBackend(config.pubsubUrl ?? "nats://localhost:4222");
const ownsPubSub = config.pubsub === undefined;
const configHandlers: ConfigHandler[] = [];
const shutdownCallbacks: Array<() => Promise<void>> = [];
let running = false;
let signalHandlers: RegisteredSignalHandler[] = [];
constructor(config: ProcessorConfig) {
this.config = config;
this.pubsub = config.pubsub ?? new NatsBackend(config.pubsubUrl ?? "nats://localhost:4222");
this.ownsPubSub = config.pubsub === undefined;
}
registerConfigHandler(handler: ConfigHandler): void {
this.configHandlers.push(handler);
}
async start(): Promise<void> {
await Effect.runPromise(
this.startEffect() as Effect.Effect<void, RunError | ProcessorLifecycleError>,
);
}
async stop(): Promise<void> {
await Effect.runPromise(this.stopEffect());
}
protected onShutdown(callback: () => Promise<void>): void {
this.shutdownCallbacks.push(callback);
}
startEffect(): Effect.Effect<void, RunError | ProcessorLifecycleError, RunRequirements> {
const processor = this;
return Effect.gen(function* () {
yield* Effect.sync(() => {
processor.running = true;
processor.registerProcessSignalHandlers();
});
yield* processor.runEffect();
}).pipe(
Effect.withSpan("trustgraph.processor.start", {
attributes: {
"trustgraph.processor.id": processor.config.id,
},
}),
);
}
stopEffect(): Effect.Effect<void, ProcessorLifecycleError> {
const processor = this;
return Effect.gen(function* () {
yield* Effect.sync(() => {
processor.running = false;
processor.unregisterProcessSignalHandlers();
});
for (const cb of processor.shutdownCallbacks) {
yield* Effect.tryPromise({
try: () => cb(),
catch: (error) => processorLifecycleError(processor.config.id, "shutdown-callback", error),
});
}
if (processor.ownsPubSub) {
yield* Effect.tryPromise({
try: () => processor.pubsub.close(),
catch: (error) => processorLifecycleError(processor.config.id, "close-pubsub", error),
});
}
});
}
protected run(): Promise<void> {
return Effect.runPromise(this.runEffect() as unknown as Effect.Effect<void, RunError>);
}
protected runEffect(): Effect.Effect<void, RunError, RunRequirements> {
return Effect.tryPromise({
try: () => this.run(),
catch: (error) => processorLifecycleError(this.config.id, "start", error),
}) as unknown as Effect.Effect<void, RunError, RunRequirements>;
}
private registerProcessSignalHandlers(): void {
if (this.config.manageProcessSignals === false || this.signalHandlers.length > 0) {
const registerProcessSignalHandlers = (): void => {
if (config.manageProcessSignals === false || signalHandlers.length > 0) {
return;
}
const shutdown = () => {
console.log(`[${this.config.id}] Shutting down...`);
void this.stop().then(() => process.exit(0));
console.log(`[${config.id}] Shutting down...`);
void processor.stop().then(() => process.exit(0));
};
const handlers: RegisteredSignalHandler[] = [
{ signal: "SIGINT", handler: shutdown },
@ -137,26 +104,128 @@ export abstract class AsyncProcessor<RunError = ProcessorLifecycleError, RunRequ
for (const { signal, handler } of handlers) {
process.once(signal, handler);
}
this.signalHandlers = handlers;
}
signalHandlers = handlers;
};
private unregisterProcessSignalHandlers(): void {
for (const { signal, handler } of this.signalHandlers) {
const unregisterProcessSignalHandlers = (): void => {
for (const { signal, handler } of signalHandlers) {
process.off(signal, handler);
}
this.signalHandlers = [];
}
signalHandlers = [];
};
/**
* Static launch helper parses env/args and starts the processor.
* Subclasses call: `MyProcessor.launch("my-service")`
*/
static async launch<T extends AsyncProcessor<unknown, unknown>>(
const processor: AsyncProcessorRuntime<RunError, RunRequirements> = {
config,
pubsub,
configHandlers,
get running() {
return running;
},
isRunning: () => running,
registerConfigHandler: (handler) => {
configHandlers.push(handler);
},
start: async () => {
await Effect.runPromise(
processor.startEffect() as Effect.Effect<void, RunError | ProcessorLifecycleError>,
);
},
stop: async () => {
await Effect.runPromise(
processor.stopEffect() as Effect.Effect<void, ProcessorLifecycleError>,
);
},
onShutdown: (callback) => {
shutdownCallbacks.push(callback);
},
startEffect() {
const startProcessor = Effect.fn("trustgraph.processor.start")(function* () {
yield* Effect.sync(() => {
running = true;
registerProcessSignalHandlers();
});
yield* (
processor.runEffect() as Effect.Effect<void, RunError, RunRequirements>
);
});
return startProcessor().pipe(
Effect.withSpan("trustgraph.processor.start", {
attributes: {
"trustgraph.processor.id": config.id,
},
}),
);
},
stopEffect() {
const stopProcessor = Effect.fn("trustgraph.processor.stop")(function* () {
yield* Effect.sync(() => {
running = false;
unregisterProcessSignalHandlers();
});
for (const cb of shutdownCallbacks) {
yield* Effect.tryPromise({
try: () => cb(),
catch: (error) => processorLifecycleError(config.id, "shutdown-callback", error),
});
}
if (ownsPubSub) {
yield* Effect.tryPromise({
try: () => pubsub.close(),
catch: (error) => processorLifecycleError(config.id, "close-pubsub", error),
});
}
});
return stopProcessor();
},
run: () =>
Effect.runPromise(
processor.runEffect() as unknown as Effect.Effect<void, RunError>,
),
runEffect: () => {
if (options.runEffect !== undefined) {
return options.runEffect(processor);
}
return Effect.tryPromise({
try: () => options.run?.(processor) ?? Promise.resolve(),
catch: (error) => processorLifecycleError(config.id, "start", error),
}) as unknown as Effect.Effect<void, RunError, RunRequirements>;
},
};
return processor;
}
export type AsyncProcessor<
RunError = ProcessorLifecycleError,
RunRequirements = never,
> = AsyncProcessorRuntime<RunError, RunRequirements>;
export const AsyncProcessor = Object.assign(
function AsyncProcessor(config: ProcessorConfig) {
return makeAsyncProcessor(config);
},
{
async launch<T extends ProcessorRuntime<unknown, unknown>>(
this: new (config: ProcessorConfig) => T,
id: string,
): Promise<void> {
const config = await Effect.runPromise(loadProcessorRuntimeConfig(id));
const processor = new this(config);
await processor.start();
},
},
) as unknown as {
new <RunError = ProcessorLifecycleError, RunRequirements = never>(
config: ProcessorConfig,
): AsyncProcessor<RunError, RunRequirements>;
<RunError = ProcessorLifecycleError, RunRequirements = never>(
config: ProcessorConfig,
): AsyncProcessor<RunError, RunRequirements>;
launch<T extends ProcessorRuntime<unknown, unknown>>(
this: new (config: ProcessorConfig) => T,
id: string,
): Promise<void> {
const config = await Effect.runPromise(loadProcessorRuntimeConfig(id));
const processor = new this(config);
await processor.start();
}
}
): Promise<void>;
};

View file

@ -8,8 +8,11 @@
*/
import {
AsyncProcessor,
makeAsyncProcessor,
type AsyncProcessorRuntime,
type ConfigHandler,
type EffectConfigHandler,
type ProcessorRuntime,
type ProcessorConfig,
} from "./async-processor.js";
import type { Spec } from "../spec/types.js";
@ -60,6 +63,44 @@ export interface FlowProcessorRuntimeOptions<
readonly isRunning?: () => boolean;
}
type FlowProcessorRuntimeRequirements<FlowRequirements> =
| PubSub
| FlowRuntime
| ProducerFactory
| ConsumerFactory
| RequestResponseFactory
| Scope.Scope
| FlowRequirements;
export type FlowProcessorStartEffect<FlowRequirements> = Effect.Effect<
void,
PubSubError | FlowRuntimeError | ProcessorLifecycleError,
FlowProcessorRuntimeRequirements<FlowRequirements>
>;
export interface FlowProcessorRuntime<FlowRequirements = never>
extends ProcessorRuntime<
PubSubError | FlowRuntimeError | ProcessorLifecycleError,
FlowProcessorRuntimeRequirements<FlowRequirements>
> {
readonly config: ProcessorConfig;
readonly pubsub: PubSubBackend;
readonly configHandlers: ConfigHandler[];
readonly isRunning: () => boolean;
readonly registerConfigHandler: (handler: ConfigHandler) => void;
readonly registerSpecification: <Requirements extends FlowRequirements>(
spec: Spec<Requirements>,
) => void;
readonly specifications: ReadonlyArray<Spec<FlowRequirements>>;
}
export interface MakeFlowProcessorOptions<FlowRequirements = never> {
readonly specifications?: ReadonlyArray<Spec<FlowRequirements>>;
readonly provide?: (
effect: FlowProcessorStartEffect<FlowRequirements>,
) => FlowProcessorStartEffect<FlowRequirements>;
}
const ConfigPushSchema = S.Struct({
version: S.Number,
config: S.Record(S.String, S.Unknown),
@ -281,78 +322,76 @@ export function runFlowProcessorDefinitionScoped<
});
}
export abstract class FlowProcessor<FlowRequirements = never> extends AsyncProcessor<
PubSubError | FlowRuntimeError | ProcessorLifecycleError,
| PubSub
| FlowRuntime
| ProducerFactory
| ConsumerFactory
| RequestResponseFactory
| Scope.Scope
| FlowRequirements
> {
private specifications: Array<Spec<FlowRequirements>> = [];
protected constructor(config: ProcessorConfig) {
super(config);
}
registerSpecification<Requirements extends FlowRequirements>(
spec: Spec<Requirements>,
): void {
this.specifications.push(spec as Spec<FlowRequirements>);
}
override async start(): Promise<void> {
const pubsub = makePubSubService(this.pubsub);
const messagingConfig = await Effect.runPromise(loadMessagingRuntimeConfig());
const start = this.startEffect().pipe(
Effect.provideService(PubSub, pubsub),
Effect.provideService(ProducerFactory, ProducerFactory.of(makeProducerFactoryService(pubsub))),
Effect.provideService(ConsumerFactory, ConsumerFactory.of(makeConsumerFactoryService(pubsub, messagingConfig))),
Effect.provideService(
RequestResponseFactory,
RequestResponseFactory.of(makeRequestResponseFactoryService(pubsub, messagingConfig)),
),
Effect.provideService(FlowRuntime, FlowRuntime.of({ run: runFlowRuntimeScoped })),
) as Effect.Effect<void, PubSubError | FlowRuntimeError | ProcessorLifecycleError>;
await Effect.runPromise(
Effect.scoped(
start,
),
);
}
protected override runEffect(): Effect.Effect<
void,
export function makeFlowProcessor<FlowRequirements = never>(
config: ProcessorConfig,
options: MakeFlowProcessorOptions<FlowRequirements> = {},
): FlowProcessorRuntime<FlowRequirements> {
const specifications: Array<Spec<FlowRequirements>> = [
...(options.specifications ?? []),
];
let processor: FlowProcessorRuntime<FlowRequirements>;
const base: AsyncProcessorRuntime<
PubSubError | FlowRuntimeError | ProcessorLifecycleError,
| PubSub
| FlowRuntime
| ProducerFactory
| ConsumerFactory
| RequestResponseFactory
| Scope.Scope
| FlowRequirements
> {
const processor = this;
const configHandlers = processor.configHandlers.map(
(handler): EffectConfigHandler<PubSubError> =>
(config, version) =>
Effect.tryPromise({
try: () => handler(config, version),
catch: (error) => pubSubError("config-handler", error),
}),
);
return runFlowProcessorDefinitionScoped({
id: processor.config.id,
pubsub: processor.pubsub,
specifications: processor.specifications,
configHandlers,
isRunning: () => processor.running,
});
}
FlowProcessorRuntimeRequirements<FlowRequirements>
> = makeAsyncProcessor(config, {
runEffect: (runtime) => {
const configHandlers = runtime.configHandlers.map(
(handler): EffectConfigHandler<PubSubError> =>
(pushedConfig, version) =>
Effect.tryPromise({
try: () => handler(pushedConfig, version),
catch: (error) => pubSubError("config-handler", error),
}),
);
return runFlowProcessorDefinitionScoped({
id: runtime.config.id,
pubsub: runtime.pubsub,
specifications,
configHandlers,
isRunning: runtime.isRunning,
});
},
});
override stopEffect(): Effect.Effect<void, ProcessorLifecycleError> {
return super.stopEffect();
}
const startEffect = (): FlowProcessorStartEffect<FlowRequirements> => {
const effect = base.startEffect() as FlowProcessorStartEffect<FlowRequirements>;
return options.provide?.(effect) ?? effect;
};
processor = {
...base,
specifications,
registerSpecification: (spec) => {
specifications.push(spec as Spec<FlowRequirements>);
},
startEffect,
start: async () => {
const pubsub = makePubSubService(base.pubsub);
const messagingConfig = await Effect.runPromise(loadMessagingRuntimeConfig());
const start = startEffect().pipe(
Effect.provideService(PubSub, pubsub),
Effect.provideService(ProducerFactory, ProducerFactory.of(makeProducerFactoryService(pubsub))),
Effect.provideService(ConsumerFactory, ConsumerFactory.of(makeConsumerFactoryService(pubsub, messagingConfig))),
Effect.provideService(
RequestResponseFactory,
RequestResponseFactory.of(makeRequestResponseFactoryService(pubsub, messagingConfig)),
),
Effect.provideService(FlowRuntime, FlowRuntime.of({ run: runFlowRuntimeScoped })),
) as Effect.Effect<void, PubSubError | FlowRuntimeError | ProcessorLifecycleError>;
await Effect.runPromise(Effect.scoped(start));
},
};
return processor;
}
export type FlowProcessor<FlowRequirements = never> = FlowProcessorRuntime<FlowRequirements>;
export const FlowProcessor = makeFlowProcessor as unknown as {
new <FlowRequirements = never>(
config: ProcessorConfig,
): FlowProcessor<FlowRequirements>;
<FlowRequirements = never>(
config: ProcessorConfig,
): FlowProcessor<FlowRequirements>;
};

View file

@ -57,183 +57,30 @@ export interface FlowRequestor<TReq, TRes> {
readonly stop: () => Promise<void>;
}
export class Flow<Requirements = never> {
private producers = new Map<string, EffectProducer<unknown>>();
private consumers = new Map<string, EffectConsumer>();
private requestors = new Map<string, EffectRequestResponse<unknown, unknown>>();
private parameters = new Map<string, unknown>();
private compatibilityScope: Scope.Closeable | null = null;
public readonly name: string;
public readonly processorId: string;
private readonly pubsub: PubSubBackend;
private readonly definition: FlowDefinition;
private readonly specifications: ReadonlyArray<Spec<Requirements>>;
export function makeFlow<Requirements = never>(
name: string,
processorId: string,
pubsub: PubSubBackend,
definition: FlowDefinition,
specifications: ReadonlyArray<Spec<Requirements>>,
) {
const producers = new Map<string, EffectProducer<unknown>>();
const consumers = new Map<string, EffectConsumer>();
const requestors = new Map<string, EffectRequestResponse<unknown, unknown>>();
const parameters = new Map<string, unknown>();
let compatibilityScope: Scope.Closeable | null = null;
constructor(
name: string,
processorId: string,
pubsub: PubSubBackend,
definition: FlowDefinition,
specifications: ReadonlyArray<Spec<Requirements>>,
) {
this.name = name;
this.processorId = processorId;
this.pubsub = pubsub;
this.definition = definition;
this.specifications = specifications;
}
startEffect(): Effect.Effect<void, PubSubError, SpecRuntimeRequirements | Requirements> {
const flow = this;
return Effect.gen(function* () {
for (const spec of flow.specifications) {
yield* spec.addEffect(flow, flow.definition);
}
});
}
async start(): Promise<void> {
if (this.compatibilityScope !== null) {
await this.stop();
const ensureCompatibilityScope = async (): Promise<Scope.Closeable> => {
if (compatibilityScope !== null) {
return compatibilityScope;
}
await this.runInCompatibilityScope(
this.startEffect() as Effect.Effect<void, PubSubError, SpecRuntimeRequirements>,
this.pubsub,
);
}
compatibilityScope = await Effect.runPromise(Scope.make());
return compatibilityScope;
};
async stop(): Promise<void> {
const scope = this.compatibilityScope;
this.compatibilityScope = null;
if (scope !== null) {
await Effect.runPromise(Scope.close(scope, Exit.void));
}
this.clearResources();
}
async runInCompatibilityScope<A, E>(
effect: Effect.Effect<A, E, SpecRuntimeRequirements>,
pubsub: PubSubBackend,
): Promise<A> {
const scope = await this.ensureCompatibilityScope();
const pubsubService = makePubSubService(pubsub);
const messagingConfig = await Effect.runPromise(loadMessagingRuntimeConfig());
return await Effect.runPromise(
effect.pipe(
Effect.provideService(ProducerFactory, ProducerFactory.of(makeProducerFactoryService(pubsubService))),
Effect.provideService(ConsumerFactory, ConsumerFactory.of(makeConsumerFactoryService(pubsubService, messagingConfig))),
Effect.provideService(
RequestResponseFactory,
RequestResponseFactory.of(makeRequestResponseFactoryService(pubsubService, messagingConfig)),
),
Scope.provide(scope),
),
);
}
clearResources(): void {
this.producers.clear();
this.consumers.clear();
this.requestors.clear();
this.parameters.clear();
}
registerProducer(name: string, producer: EffectProducer<unknown>): void {
this.producers.set(name, producer);
}
registerConsumer(name: string, consumer: EffectConsumer): void {
this.consumers.set(name, consumer);
}
registerRequestor(name: string, rr: EffectRequestResponse<unknown, unknown>): void {
this.requestors.set(name, rr);
}
setParameter(name: string, value: unknown): void {
this.parameters.set(name, value);
}
producerEffect<T>(name: string): Effect.Effect<EffectProducer<T>, FlowResourceNotFoundError> {
const p = this.producers.get(name);
return p === undefined
? Effect.fail(flowResourceNotFoundError(this.name, "producer", name))
: Effect.succeed(p as EffectProducer<T>);
}
consumerEffect(name: string): Effect.Effect<EffectConsumer, FlowResourceNotFoundError> {
const c = this.consumers.get(name);
return c === undefined
? Effect.fail(flowResourceNotFoundError(this.name, "consumer", name))
: Effect.succeed(c);
}
requestorEffect<TReq, TRes>(
name: string,
): Effect.Effect<EffectRequestResponse<TReq, TRes>, FlowResourceNotFoundError> {
const rr = this.requestors.get(name);
return rr === undefined
? Effect.fail(flowResourceNotFoundError(this.name, "requestor", name))
: Effect.succeed(rr as EffectRequestResponse<TReq, TRes>);
}
parameterEffect<T>(name: string): Effect.Effect<T, FlowResourceNotFoundError> {
const v = this.parameters.get(name);
return v === undefined
? Effect.fail(flowResourceNotFoundError(this.name, "parameter", name))
: Effect.succeed(v as T);
}
producer<T>(name: string): FlowProducer<T> {
const p = this.producers.get(name);
if (p === undefined) throw flowResourceNotFoundError(this.name, "producer", name);
return {
send: (id, message) => Effect.runPromise((p as EffectProducer<T>).send(id, message)),
flush: () => Effect.runPromise(p.flush),
stop: () => Effect.runPromise(p.flush.pipe(Effect.flatMap(() => p.close))),
};
}
consumer(name: string): FlowConsumer {
const c = this.consumers.get(name);
if (c === undefined) throw flowResourceNotFoundError(this.name, "consumer", name);
return {
stop: () => Effect.runPromise(c.stop),
};
}
requestor<TReq, TRes>(name: string): FlowRequestor<TReq, TRes> {
const rr = this.requestors.get(name);
if (rr === undefined) throw flowResourceNotFoundError(this.name, "requestor", name);
return {
request: (request, options) =>
Effect.runPromise(
(rr as EffectRequestResponse<TReq, TRes>).request(
request,
this.toEffectRequestOptions(options),
),
),
stop: () => Effect.runPromise(rr.stop),
};
}
parameter<T>(name: string): T {
const v = this.parameters.get(name);
if (v === undefined) throw flowResourceNotFoundError(this.name, "parameter", name);
return v as T;
}
private async ensureCompatibilityScope(): Promise<Scope.Closeable> {
if (this.compatibilityScope !== null) {
return this.compatibilityScope;
}
this.compatibilityScope = await Effect.runPromise(Scope.make());
return this.compatibilityScope;
}
private toEffectRequestOptions<TRes>(
const toEffectRequestOptions = <TRes>(
options: FlowRequestOptions<TRes> | undefined,
): EffectRequestOptions<TRes> | undefined {
): EffectRequestOptions<TRes> | undefined => {
if (options === undefined) {
return undefined;
}
@ -246,5 +93,153 @@ export class Flow<Requirements = never> {
recipient: (response: TRes) => Effect.promise(() => recipient(response)),
}),
};
}
};
const flow = {
name,
processorId,
startEffect(): Effect.Effect<void, PubSubError, SpecRuntimeRequirements | Requirements> {
return Effect.gen(function* () {
for (const spec of specifications) {
yield* spec.addEffect(flow, definition);
}
});
},
async start(): Promise<void> {
if (compatibilityScope !== null) {
await flow.stop();
}
await flow.runInCompatibilityScope(
flow.startEffect() as Effect.Effect<void, PubSubError, SpecRuntimeRequirements>,
pubsub,
);
},
async stop(): Promise<void> {
const scope = compatibilityScope;
compatibilityScope = null;
if (scope !== null) {
await Effect.runPromise(Scope.close(scope, Exit.void));
}
flow.clearResources();
},
async runInCompatibilityScope<A, E>(
effect: Effect.Effect<A, E, SpecRuntimeRequirements>,
runtimePubsub: PubSubBackend,
): Promise<A> {
const scope = await ensureCompatibilityScope();
const pubsubService = makePubSubService(runtimePubsub);
const messagingConfig = await Effect.runPromise(loadMessagingRuntimeConfig());
return await Effect.runPromise(
effect.pipe(
Effect.provideService(ProducerFactory, ProducerFactory.of(makeProducerFactoryService(pubsubService))),
Effect.provideService(ConsumerFactory, ConsumerFactory.of(makeConsumerFactoryService(pubsubService, messagingConfig))),
Effect.provideService(
RequestResponseFactory,
RequestResponseFactory.of(makeRequestResponseFactoryService(pubsubService, messagingConfig)),
),
Scope.provide(scope),
),
);
},
clearResources(): void {
producers.clear();
consumers.clear();
requestors.clear();
parameters.clear();
},
registerProducer(registerName: string, producer: EffectProducer<unknown>): void {
producers.set(registerName, producer);
},
registerConsumer(registerName: string, consumer: EffectConsumer): void {
consumers.set(registerName, consumer);
},
registerRequestor(registerName: string, rr: EffectRequestResponse<unknown, unknown>): void {
requestors.set(registerName, rr);
},
setParameter(parameterName: string, value: unknown): void {
parameters.set(parameterName, value);
},
producerEffect<T>(producerName: string): Effect.Effect<EffectProducer<T>, FlowResourceNotFoundError> {
const p = producers.get(producerName);
return p === undefined
? Effect.fail(flowResourceNotFoundError(name, "producer", producerName))
: Effect.succeed(p as EffectProducer<T>);
},
consumerEffect(consumerName: string): Effect.Effect<EffectConsumer, FlowResourceNotFoundError> {
const c = consumers.get(consumerName);
return c === undefined
? Effect.fail(flowResourceNotFoundError(name, "consumer", consumerName))
: Effect.succeed(c);
},
requestorEffect<TReq, TRes>(
requestorName: string,
): Effect.Effect<EffectRequestResponse<TReq, TRes>, FlowResourceNotFoundError> {
const rr = requestors.get(requestorName);
return rr === undefined
? Effect.fail(flowResourceNotFoundError(name, "requestor", requestorName))
: Effect.succeed(rr as EffectRequestResponse<TReq, TRes>);
},
parameterEffect<T>(parameterName: string): Effect.Effect<T, FlowResourceNotFoundError> {
const v = parameters.get(parameterName);
return v === undefined
? Effect.fail(flowResourceNotFoundError(name, "parameter", parameterName))
: Effect.succeed(v as T);
},
producer<T>(producerName: string): FlowProducer<T> {
const p = producers.get(producerName);
if (p === undefined) throw flowResourceNotFoundError(name, "producer", producerName);
return {
send: (id, message) => Effect.runPromise((p as EffectProducer<T>).send(id, message)),
flush: () => Effect.runPromise(p.flush),
stop: () => Effect.runPromise(p.flush.pipe(Effect.flatMap(() => p.close))),
};
},
consumer(consumerName: string): FlowConsumer {
const c = consumers.get(consumerName);
if (c === undefined) throw flowResourceNotFoundError(name, "consumer", consumerName);
return {
stop: () => Effect.runPromise(c.stop),
};
},
requestor<TReq, TRes>(requestorName: string): FlowRequestor<TReq, TRes> {
const rr = requestors.get(requestorName);
if (rr === undefined) throw flowResourceNotFoundError(name, "requestor", requestorName);
return {
request: (request, options) =>
Effect.runPromise(
(rr as EffectRequestResponse<TReq, TRes>).request(
request,
toEffectRequestOptions(options),
),
),
stop: () => Effect.runPromise(rr.stop),
};
},
parameter<T>(parameterName: string): T {
const v = parameters.get(parameterName);
if (v === undefined) throw flowResourceNotFoundError(name, "parameter", parameterName);
return v as T;
},
};
return flow;
}
export type Flow<Requirements = never> = ReturnType<typeof makeFlow<Requirements>>;
export const Flow = makeFlow as unknown as {
new <Requirements = never>(
name: string,
processorId: string,
pubsub: PubSubBackend,
definition: FlowDefinition,
specifications: ReadonlyArray<Spec<Requirements>>,
): Flow<Requirements>;
<Requirements = never>(
name: string,
processorId: string,
pubsub: PubSubBackend,
definition: FlowDefinition,
specifications: ReadonlyArray<Spec<Requirements>>,
): Flow<Requirements>;
};

View file

@ -1,13 +1,21 @@
export {
AsyncProcessor,
makeAsyncProcessor,
type ConfigHandler,
type EffectConfigHandler,
type AsyncProcessorRuntime,
type AsyncProcessorRuntimeOptions,
type ProcessorConfig,
type ProcessorRuntime,
} from "./async-processor.js";
export {
FlowProcessor,
makeFlowProcessor,
runFlowProcessorDefinitionScoped,
type FlowProcessorRuntime,
type FlowProcessorRuntimeOptions,
type FlowProcessorStartEffect,
type MakeFlowProcessorOptions,
} from "./flow-processor.js";
export {
Flow,

View file

@ -12,7 +12,7 @@ import {
type ProcessorLifecycleError,
type PubSubError,
} from "../errors.js";
import { NatsBackend } from "../backend/nats.js";
import { makeNatsBackend } from "../backend/nats.js";
import { makePubSubService, PubSub } from "../backend/pubsub.js";
import {
ConsumerFactory,
@ -30,21 +30,21 @@ import {
} from "../runtime/config.js";
import { loadMessagingRuntimeConfig } from "../runtime/messaging-config.js";
import type {
AsyncProcessor,
EffectConfigHandler,
ProcessorConfig,
ProcessorRuntime,
} from "./async-processor.js";
import { runFlowProcessorDefinitionScoped } from "./flow-processor.js";
import type { Spec } from "../spec/types.js";
type ProcessorRunError<Processor> = Processor extends AsyncProcessor<infer Error, unknown> ? Error : never;
type ProcessorRunRequirements<Processor> = Processor extends AsyncProcessor<unknown, infer Requirements> ? Requirements : never;
type ProcessorRunError<Processor> = Processor extends ProcessorRuntime<infer Error, unknown> ? Error : never;
type ProcessorRunRequirements<Processor> = Processor extends ProcessorRuntime<unknown, infer Requirements> ? Requirements : never;
export interface ProcessorProgramOptions<
Config extends ProcessorConfig,
Error,
Requirements,
Processor extends AsyncProcessor<unknown, unknown>,
Processor extends ProcessorRuntime<unknown, unknown>,
> {
readonly id: string;
readonly make: (config: Config) => Processor;
@ -70,7 +70,7 @@ export interface FlowProcessorProgramOptions<
export function runProcessorScoped<
Config extends ProcessorConfig,
Processor extends AsyncProcessor<unknown, unknown>,
Processor extends ProcessorRuntime<unknown, unknown>,
>(
config: Config,
make: (config: Config) => Processor,
@ -103,11 +103,13 @@ export function runProcessorScoped<
),
);
const typedProcessor = processor as unknown as AsyncProcessor<
ProcessorRunError<Processor>,
ProcessorRunRequirements<Processor>
>;
yield* typedProcessor.startEffect();
yield* (
processor.startEffect() as Effect.Effect<
void,
ProcessorRunError<Processor> | ProcessorLifecycleError,
ProcessorRunRequirements<Processor>
>
);
});
}
@ -115,7 +117,7 @@ export function makeProcessorProgram<
Config extends ProcessorConfig,
Error = never,
Requirements = never,
Processor extends AsyncProcessor<unknown, unknown> = AsyncProcessor,
Processor extends ProcessorRuntime<unknown, unknown> = ProcessorRuntime,
>(
options: ProcessorProgramOptions<Config, Error, Requirements, Processor>,
) {
@ -133,7 +135,7 @@ export function makeProcessorProgram<
manageProcessSignals: false,
} as Config;
const pubsub = makePubSubService(new NatsBackend(runtimeConfig.pubsubUrl ?? "nats://localhost:4222"));
const pubsub = makePubSubService(makeNatsBackend(runtimeConfig.pubsubUrl ?? "nats://localhost:4222"));
const messagingConfig = yield* loadMessagingRuntimeConfig();
yield* Effect.addFinalizer(() =>
pubsub.close.pipe(
@ -191,7 +193,7 @@ export function makeFlowProcessorProgram<
manageProcessSignals: false,
} as Config;
const pubsub = makePubSubService(new NatsBackend(runtimeConfig.pubsubUrl ?? "nats://localhost:4222"));
const pubsub = makePubSubService(makeNatsBackend(runtimeConfig.pubsubUrl ?? "nats://localhost:4222"));
const messagingConfig = yield* loadMessagingRuntimeConfig();
yield* Effect.addFinalizer(() =>
pubsub.close.pipe(

View file

@ -5,7 +5,7 @@
*/
import * as S from "effect/Schema";
import { Term, TgError, Triple } from "./primitives.js";
import {Term, TgError, Triple} from "./primitives.js";
const UnknownRecord = S.Record(S.String, S.Unknown);
const MutableArray = <A extends S.Top>(schema: A) => schema.pipe(S.Array, S.mutable);
@ -98,13 +98,14 @@ export const AgentRequest = S.Struct({
export type AgentRequest = typeof AgentRequest.Type;
export const AgentResponse = S.Struct({
chunk_type: S.optionalKey(S.Union([
S.Literal("thought"),
S.Literal("observation"),
S.Literal("answer"),
S.Literal("error"),
S.Literal("explain"),
])),
chunk_type: S.optionalKey(S.Literals(
[
"thought",
"observation",
"answer",
"error",
"explain",
])),
content: S.optionalKey(S.String),
end_of_message: S.optionalKey(S.Boolean),
end_of_dialog: S.optionalKey(S.Boolean),

View file

@ -12,12 +12,12 @@ import {
type MessagingDeliveryError,
} from "../errors.js";
import type { FlowContext } from "../messaging/consumer.js";
import { FlowProcessor } from "../processor/flow-processor.js";
import type { ProcessorConfig } from "../processor/async-processor.js";
import { makeFlowProcessor } from "../processor/index.ts";
import type { FlowProcessorRuntime, ProcessorConfig } from "../processor/index.ts";
import type { EmbeddingsRequest, EmbeddingsResponse } from "../schema/messages.js";
import { ConsumerSpec } from "../spec/consumer-spec.js";
import { ParameterSpec } from "../spec/parameter-spec.js";
import { ProducerSpec } from "../spec/producer-spec.js";
import { makeConsumerSpec } from "../spec/index.ts";
import { makeParameterSpec } from "../spec/index.ts";
import { makeProducerSpec } from "../spec/index.ts";
import type { Spec } from "../spec/types.js";
export interface EmbeddingsServiceShape {
@ -66,20 +66,31 @@ const onEmbeddingsRequest = Effect.fn("EmbeddingsService.onRequest")(function* (
});
export const makeEmbeddingsSpecs = (): ReadonlyArray<Spec<Embeddings>> => [
new ConsumerSpec<EmbeddingsRequest, FlowResourceNotFoundError | MessagingDeliveryError, Embeddings>(
makeConsumerSpec<EmbeddingsRequest, FlowResourceNotFoundError | MessagingDeliveryError, Embeddings>(
"embeddings-request",
onEmbeddingsRequest,
),
new ProducerSpec<EmbeddingsResponse>("embeddings-response"),
new ParameterSpec("model"),
makeProducerSpec<EmbeddingsResponse>("embeddings-response"),
makeParameterSpec("model"),
];
export class EmbeddingsService extends FlowProcessor<Embeddings> {
constructor(config: ProcessorConfig) {
super(config);
export type EmbeddingsService = FlowProcessorRuntime<Embeddings>;
for (const spec of makeEmbeddingsSpecs()) {
this.registerSpecification(spec);
}
}
export function makeEmbeddingsService(
config: ProcessorConfig,
embeddings?: EmbeddingsServiceShape,
): EmbeddingsService {
return makeFlowProcessor(config, {
specifications: makeEmbeddingsSpecs(),
...(embeddings === undefined
? {}
: {
provide: (effect) =>
effect.pipe(
Effect.provideService(Embeddings, Embeddings.of(embeddings)),
),
}),
});
}
export const EmbeddingsService = makeEmbeddingsService;

View file

@ -2,6 +2,7 @@ export {
Llm,
LlmService,
LlmServiceError,
makeLlmService,
makeLlmServiceShape,
makeLlmSpecs,
type LlmProvider,
@ -10,6 +11,7 @@ export {
export {
Embeddings,
EmbeddingsService,
makeEmbeddingsService,
makeEmbeddingsSpecs,
type EmbeddingsServiceShape,
} from "./embeddings-service.js";

View file

@ -12,16 +12,16 @@ import {
type MessagingDeliveryError,
} from "../errors.js";
import type { FlowContext } from "../messaging/consumer.js";
import { FlowProcessor } from "../processor/flow-processor.js";
import type { ProcessorConfig } from "../processor/async-processor.js";
import { makeFlowProcessor } from "../processor/index.ts";
import type { FlowProcessorRuntime, ProcessorConfig } from "../processor/index.ts";
import type {
TextCompletionRequest,
TextCompletionResponse,
} from "../schema/messages.js";
import type { LlmChunk, LlmResult } from "../schema/primitives.js";
import { ConsumerSpec } from "../spec/consumer-spec.js";
import { ParameterSpec } from "../spec/parameter-spec.js";
import { ProducerSpec } from "../spec/producer-spec.js";
import type { LlmChunk, LlmResult } from "../schema/index.ts";
import { makeConsumerSpec } from "../spec/index.ts";
import { makeParameterSpec } from "../spec/index.ts";
import { makeProducerSpec } from "../spec/index.ts";
import type { Spec } from "../spec/types.js";
export class LlmServiceError extends S.TaggedErrorClass<LlmServiceError>()(
@ -203,45 +203,29 @@ const onLlmRequest = Effect.fn("LlmService.onRequest")(function* (
});
export const makeLlmSpecs = (): ReadonlyArray<Spec<Llm>> => [
new ConsumerSpec<TextCompletionRequest, LlmHandlerError, Llm>(
makeConsumerSpec<TextCompletionRequest, LlmHandlerError, Llm>(
"text-completion-request",
onLlmRequest,
),
new ProducerSpec<TextCompletionResponse>("text-completion-response"),
new ParameterSpec("model"),
new ParameterSpec("temperature"),
makeProducerSpec<TextCompletionResponse>("text-completion-response"),
makeParameterSpec("model"),
makeParameterSpec("temperature"),
];
export abstract class LlmService extends FlowProcessor<Llm> implements LlmProvider {
protected constructor(config: ProcessorConfig) {
super(config);
export type LlmService = FlowProcessorRuntime<Llm> & LlmProvider;
for (const spec of makeLlmSpecs()) {
this.registerSpecification(spec);
}
}
override startEffect() {
return super.startEffect().pipe(
Effect.provideService(Llm, Llm.of(makeLlmServiceShape(this))),
);
}
abstract generateContent(
system: string,
prompt: string,
model?: string,
temperature?: number,
): Promise<LlmResult>;
abstract generateContentStream(
system: string,
prompt: string,
model?: string,
temperature?: number,
): AsyncGenerator<LlmChunk>;
supportsStreaming(): boolean {
return false;
}
export function makeLlmService(
config: ProcessorConfig,
provider: LlmProvider,
): LlmService {
const service = makeFlowProcessor(config, {
specifications: makeLlmSpecs(),
provide: (effect) =>
effect.pipe(
Effect.provideService(Llm, Llm.of(makeLlmServiceShape(provider))),
),
});
return Object.assign(service, provider);
}
export const LlmService = makeLlmService;

View file

@ -8,7 +8,6 @@ import { Effect } from "effect";
import * as S from "effect/Schema";
import type { Spec } from "./types.js";
import type { SpecRuntimeRequirements } from "./types.js";
import type { PubSubBackend } from "../backend/types.js";
import type { Flow, FlowDefinition } from "../processor/flow.js";
import { type MessageHandler } from "../messaging/consumer.js";
import {
@ -24,64 +23,71 @@ import {
const isTooManyRequestsError = S.is(TooManyRequestsError);
export class ConsumerSpec<T, E = never, R = never> implements Spec<R> {
public readonly name: string;
private readonly handler: EffectMessageHandler<T, E, R>;
private readonly concurrency: number;
declare const ConsumerSpecType: unique symbol;
constructor(
name: string,
handler: EffectMessageHandler<T, E, R>,
concurrency = 1,
export interface ConsumerSpec<T, E = never, R = never> extends Spec<R> {
readonly [ConsumerSpecType]?: {
readonly message: T;
readonly error: E;
};
readonly addEffect: (
flow: Flow<R>,
definition: FlowDefinition,
) => Effect.Effect<void, PubSubError, SpecRuntimeRequirements | R>;
}
export function makeConsumerSpec<T, E = never, R = never>(
name: string,
handler: EffectMessageHandler<T, E, R>,
concurrency = 1,
): ConsumerSpec<T, E, R> {
const addEffect = Effect.fn("ConsumerSpec.addEffect")(function* (
flow: Flow<R>,
definition: FlowDefinition,
) {
this.name = name;
this.handler = handler;
this.concurrency = concurrency;
}
static fromPromise<T>(
name: string,
handler: MessageHandler<T>,
concurrency = 1,
): ConsumerSpec<T, TooManyRequestsError | MessagingHandlerError> {
return new ConsumerSpec<T, TooManyRequestsError | MessagingHandlerError>(
name,
(message, properties, flow) =>
Effect.tryPromise({
try: () => handler(message, properties, flow),
catch: (error) =>
isTooManyRequestsError(error)
? error
: messagingHandlerError(name, `${flow.id}-${flow.name}-${name}`, error),
}),
concurrency,
);
}
addEffect(flow: Flow<R>, definition: FlowDefinition) {
const spec = this;
return Effect.gen(function* () {
const topic = definition.topics?.[spec.name] ?? spec.name;
const topic = definition.topics?.[name] ?? name;
const factory = yield* ConsumerFactory;
const consumer = yield* factory.run<T, E, R>(
{
topic,
subscription: `${flow.processorId}-${flow.name}-${spec.name}`,
handler: spec.handler,
concurrency: spec.concurrency,
subscription: `${flow.processorId}-${flow.name}-${name}`,
handler,
concurrency,
},
{ id: flow.processorId, name: flow.name, flow },
);
flow.registerConsumer(spec.name, consumer);
});
}
flow.registerConsumer(name, consumer);
});
async add(flow: Flow, pubsub: PubSubBackend, definition: FlowDefinition): Promise<void> {
const effect = this.addEffect(flow, definition) as Effect.Effect<
void,
PubSubError,
SpecRuntimeRequirements
>;
await flow.runInCompatibilityScope(effect, pubsub);
}
return {
name,
addEffect,
add: async (flow, pubsub, definition) => {
const effect = addEffect(flow as Flow<R>, definition) as Effect.Effect<
void,
PubSubError,
SpecRuntimeRequirements
>;
await flow.runInCompatibilityScope(effect, pubsub);
},
};
}
export function makeConsumerSpecFromPromise<T>(
name: string,
handler: MessageHandler<T>,
concurrency = 1,
): ConsumerSpec<T, TooManyRequestsError | MessagingHandlerError> {
return makeConsumerSpec<T, TooManyRequestsError | MessagingHandlerError>(
name,
(message, properties, flow) =>
Effect.tryPromise({
try: () => handler(message, properties, flow),
catch: (error) =>
isTooManyRequestsError(error)
? error
: messagingHandlerError(name, `${flow.id}-${flow.name}-${name}`, error),
}),
concurrency,
);
}

View file

@ -1,5 +1,5 @@
export type { Spec, SpecRuntimeError, SpecRuntimeRequirements } from "./types.js";
export { ConsumerSpec } from "./consumer-spec.js";
export { ProducerSpec } from "./producer-spec.js";
export { ParameterSpec } from "./parameter-spec.js";
export { RequestResponseSpec } from "./request-response-spec.js";
export { makeConsumerSpec, makeConsumerSpecFromPromise, type ConsumerSpec } from "./consumer-spec.js";
export { makeProducerSpec, type ProducerSpec } from "./producer-spec.js";
export { makeParameterSpec, type ParameterSpec } from "./parameter-spec.js";
export { makeRequestResponseSpec, type RequestResponseSpec } from "./request-response-spec.js";

View file

@ -6,25 +6,22 @@
import { Effect } from "effect";
import type { Spec } from "./types.js";
import type { PubSubBackend } from "../backend/types.js";
import type { Flow, FlowDefinition } from "../processor/flow.js";
export class ParameterSpec implements Spec {
public readonly name: string;
export interface ParameterSpec extends Spec {}
constructor(name: string) {
this.name = name;
}
addEffect(flow: Flow, definition: FlowDefinition) {
const spec = this;
return Effect.sync(() => {
const value = definition.parameters?.[spec.name];
flow.setParameter(spec.name, value);
export function makeParameterSpec(name: string): ParameterSpec {
const addEffect = (flow: Flow, definition: FlowDefinition) =>
Effect.sync(() => {
const value = definition.parameters?.[name];
flow.setParameter(name, value);
});
}
async add(flow: Flow, _pubsub: PubSubBackend, definition: FlowDefinition): Promise<void> {
await Effect.runPromise(this.addEffect(flow, definition));
}
return {
name,
addEffect,
add: async (flow, _pubsub, definition) => {
await Effect.runPromise(addEffect(flow, definition));
},
};
}

View file

@ -6,31 +6,34 @@
import { Effect } from "effect";
import type { Spec } from "./types.js";
import type { PubSubBackend } from "../backend/types.js";
import type { Flow, FlowDefinition } from "../processor/flow.js";
import {
ProducerFactory,
type EffectProducer,
} from "../messaging/runtime.js";
export class ProducerSpec<T> implements Spec {
public readonly name: string;
declare const ProducerSpecType: unique symbol;
constructor(name: string) {
this.name = name;
}
export interface ProducerSpec<T> extends Spec {
readonly [ProducerSpecType]?: (_: T) => T;
}
addEffect(flow: Flow, definition: FlowDefinition) {
const spec = this;
return Effect.gen(function* () {
const topic = definition.topics?.[spec.name] ?? spec.name;
export function makeProducerSpec<T>(name: string): ProducerSpec<T> {
const addEffect = Effect.fn("ProducerSpec.addEffect")(function* (
flow: Flow,
definition: FlowDefinition,
) {
const topic = definition.topics?.[name] ?? name;
const factory = yield* ProducerFactory;
const producer = yield* factory.make<T>({ topic });
flow.registerProducer(spec.name, producer as EffectProducer<unknown>);
});
}
flow.registerProducer(name, producer as EffectProducer<unknown>);
});
async add(flow: Flow, pubsub: PubSubBackend, definition: FlowDefinition): Promise<void> {
await flow.runInCompatibilityScope(this.addEffect(flow, definition), pubsub);
}
return {
name,
addEffect,
add: async (flow, pubsub, definition) => {
await flow.runInCompatibilityScope(addEffect(flow, definition), pubsub);
},
};
}

View file

@ -9,44 +9,46 @@
import { Effect } from "effect";
import type { Spec } from "./types.js";
import type { PubSubBackend } from "../backend/types.js";
import type { Flow, FlowDefinition } from "../processor/flow.js";
import {
RequestResponseFactory,
type EffectRequestResponse,
} from "../messaging/runtime.js";
export class RequestResponseSpec<TReq, TRes> implements Spec {
public readonly name: string;
private readonly requestTopicName: string;
private readonly responseTopicName: string;
declare const RequestResponseSpecType: unique symbol;
constructor(
name: string,
requestTopicName: string,
responseTopicName: string,
export interface RequestResponseSpec<TReq, TRes> extends Spec {
readonly [RequestResponseSpecType]?: {
readonly request: TReq;
readonly response: TRes;
};
}
export function makeRequestResponseSpec<TReq, TRes>(
name: string,
requestTopicName: string,
responseTopicName: string,
): RequestResponseSpec<TReq, TRes> {
const addEffect = Effect.fn("RequestResponseSpec.addEffect")(function* (
flow: Flow,
definition: FlowDefinition,
) {
this.name = name;
this.requestTopicName = requestTopicName;
this.responseTopicName = responseTopicName;
}
addEffect(flow: Flow, definition: FlowDefinition) {
const spec = this;
return Effect.gen(function* () {
const requestTopic = definition.topics?.[spec.requestTopicName] ?? spec.requestTopicName;
const responseTopic = definition.topics?.[spec.responseTopicName] ?? spec.responseTopicName;
const requestTopic = definition.topics?.[requestTopicName] ?? requestTopicName;
const responseTopic = definition.topics?.[responseTopicName] ?? responseTopicName;
const factory = yield* RequestResponseFactory;
const requestor = yield* factory.make<TReq, TRes>({
requestTopic,
responseTopic,
subscription: `${flow.processorId}-${flow.name}-${spec.name}`,
subscription: `${flow.processorId}-${flow.name}-${name}`,
});
flow.registerRequestor(spec.name, requestor as EffectRequestResponse<unknown, unknown>);
});
}
flow.registerRequestor(name, requestor as EffectRequestResponse<unknown, unknown>);
});
async add(flow: Flow, pubsub: PubSubBackend, definition: FlowDefinition): Promise<void> {
await flow.runInCompatibilityScope(this.addEffect(flow, definition), pubsub);
}
return {
name,
addEffect,
add: async (flow, pubsub, definition) => {
await flow.runInCompatibilityScope(addEffect(flow, definition), pubsub);
},
};
}