Migrate strict Effect runtime surfaces

This commit is contained in:
elpresidank 2026-06-02 00:22:04 -05:00
parent f6878d4dd7
commit b4ee2b691f
35 changed files with 1717 additions and 1410 deletions

View file

@ -115,11 +115,10 @@ const makeNativeRecordingProcessor = (
events.push(`pubsub:${pubsub.backend.constructor.name}`);
}),
});
const stopEffect = processor.stopEffect;
processor.stopEffect = () => {
processor.onShutdown(() => {
events.push("native-stop");
return stopEffect();
};
return Promise.resolve();
});
return processor;
};

View file

@ -19,6 +19,8 @@ import {
AckPolicy,
DeliverPolicy,
} from "nats";
import { Effect } from "effect";
import * as Predicate from "effect/Predicate";
import * as S from "effect/Schema";
import type {
@ -29,6 +31,7 @@ import type {
CreateConsumerOptions,
Message,
} from "./types.js";
import { pubSubError } from "../errors.js";
const sc = StringCodec();
@ -57,36 +60,61 @@ function makeNatsMessage<T>(msg: JsMsg, decoded: T): NatsMessage<T> {
};
}
const hasJsMsg = Predicate.hasProperty("_jsMsg");
function isAckableJsMsg(value: unknown): value is Pick<JsMsg, "ack" | "nak"> {
if (!Predicate.isObject(value)) return false;
if (!Predicate.hasProperty(value, "ack")) return false;
if (!Predicate.hasProperty(value, "nak")) return false;
return typeof value.ack === "function" && typeof value.nak === "function";
}
function isNatsMessage<T>(message: Message<T>): message is NatsMessage<T> {
return hasJsMsg(message) && isAckableJsMsg(message._jsMsg);
}
function makeNatsProducer<T>(
js: JetStreamClient,
subject: string,
schema?: S.Top,
schema?: S.Codec<T, unknown>,
): BackendProducer<T> {
return {
send: async (message, properties) => {
const encoded = schema !== undefined
? S.encodeUnknownSync(schema as S.Codec<unknown, unknown>)(message)
: message;
const data = sc.encode(JSON.stringify(encoded));
const opts: Record<string, unknown> = {};
send: (message, properties) =>
Effect.runPromise(
Effect.gen(function* () {
const encoded = schema !== undefined
? yield* S.encodeUnknownEffect(schema)(message).pipe(
Effect.mapError((error) => pubSubError(`encode:${subject}`, error)),
)
: message;
const json = yield* S.encodeUnknownEffect(S.UnknownFromJsonString)(encoded).pipe(
Effect.mapError((error) => pubSubError(`encode-json:${subject}`, error)),
);
const data = sc.encode(json);
const opts: Record<string, unknown> = {};
if (properties !== undefined && Object.keys(properties).length > 0) {
const { headers } = await import("nats");
const hdrs = headers();
for (const [key, val] of Object.entries(properties)) {
hdrs.append(key, val);
}
opts.headers = hdrs;
}
if (properties !== undefined && Object.keys(properties).length > 0) {
const { headers } = yield* Effect.tryPromise({
try: () => import("nats"),
catch: (error) => pubSubError("import:nats-headers", error),
});
const hdrs = headers();
for (const [key, val] of Object.entries(properties)) {
hdrs.append(key, val);
}
opts.headers = hdrs;
}
await js.publish(subject, data, opts);
},
flush: async () => {
// NATS publishes are flushed on the connection level.
},
close: async () => {
// No per-producer cleanup needed for NATS.
},
yield* Effect.tryPromise({
try: () => js.publish(subject, data, opts),
catch: (error) => pubSubError(`publish:${subject}`, error),
});
}),
),
// NATS publishes are flushed on the connection level.
flush: () => Promise.resolve(),
// No per-producer cleanup needed for NATS.
close: () => Promise.resolve(),
};
}
@ -101,60 +129,109 @@ function makeNatsConsumer<T>(
subscription: string,
initialPosition: "latest" | "earliest",
streamName: string,
schema?: S.Top,
schema?: S.Codec<T, unknown>,
): InitializableBackendConsumer<T> {
let consumer: NatsJsConsumer | null = null;
return {
init: async () => {
// Stream is already ensured by makeNatsBackend(). Create or bind to a durable consumer.
try {
consumer = await js.consumers.get(streamName, subscription);
} catch {
const deliverPolicy =
initialPosition === "earliest"
? DeliverPolicy.All
: DeliverPolicy.New;
init: () =>
Effect.runPromise(
Effect.gen(function* () {
const existing = yield* Effect.tryPromise({
try: () => js.consumers.get(streamName, subscription),
catch: (error) => pubSubError(`get-consumer:${streamName}:${subscription}`, error),
}).pipe(
Effect.catch(() =>
Effect.gen(function* () {
const deliverPolicy =
initialPosition === "earliest"
? DeliverPolicy.All
: DeliverPolicy.New;
await jsm.consumers.add(streamName, {
durable_name: subscription,
ack_policy: AckPolicy.Explicit,
deliver_policy: deliverPolicy,
filter_subject: subject,
});
yield* Effect.tryPromise({
try: () =>
jsm.consumers.add(streamName, {
durable_name: subscription,
ack_policy: AckPolicy.Explicit,
deliver_policy: deliverPolicy,
filter_subject: subject,
}),
catch: (error) => pubSubError(`add-consumer:${streamName}:${subscription}`, error),
});
consumer = await js.consumers.get(streamName, subscription);
}
},
receive: async (timeoutMs = 2000) => {
if (consumer === null) throw new Error("Consumer not initialized");
return yield* Effect.tryPromise({
try: () => js.consumers.get(streamName, subscription),
catch: (error) => pubSubError(`get-consumer:${streamName}:${subscription}`, error),
});
}),
),
);
consumer = existing;
}),
),
receive: (timeoutMs = 2000) =>
Effect.runPromise(
Effect.gen(function* () {
const current = consumer;
if (current === null) {
return yield* pubSubError("receive", "Consumer not initialized");
}
// Pull a single message with a timeout using the pull-based API.
// consumer.next() returns a JsMsg or null when the timeout expires.
const msg = await consumer.next({ expires: timeoutMs });
if (msg === null) return null;
// Pull a single message with a timeout using the pull-based API.
// consumer.next() returns a JsMsg or null when the timeout expires.
const msg = yield* Effect.tryPromise({
try: () => current.next({ expires: timeoutMs }),
catch: (error) => pubSubError(`receive:${subject}`, error),
});
if (msg === null) return null;
const parsed = JSON.parse(sc.decode(msg.data));
const decoded = schema !== undefined
? S.decodeUnknownSync(schema as S.Codec<unknown, unknown>)(parsed) as T
: parsed as T;
return makeNatsMessage(msg, decoded);
},
acknowledge: async (message) => {
const natsMsg = message as NatsMessage<T>;
natsMsg._jsMsg.ack();
},
negativeAcknowledge: async (message) => {
const natsMsg = message as NatsMessage<T>;
natsMsg._jsMsg.nak();
},
unsubscribe: async () => {
const parsed = yield* S.decodeUnknownEffect(S.UnknownFromJsonString)(sc.decode(msg.data)).pipe(
Effect.mapError((error) => pubSubError(`decode-json:${subject}`, error)),
);
const decoded = schema !== undefined
? yield* S.decodeUnknownEffect(schema)(parsed).pipe(
Effect.mapError((error) => pubSubError(`decode-schema:${subject}`, error)),
)
: yield* S.decodeUnknownEffect(S.Any)(parsed).pipe(
Effect.mapError((error) => pubSubError(`decode-any:${subject}`, error)),
);
return makeNatsMessage(msg, decoded);
}),
),
acknowledge: (message) =>
Effect.runPromise(
Effect.gen(function* () {
if (!isNatsMessage(message)) {
return yield* pubSubError(`acknowledge:${subject}`, "Message was not produced by NATS backend");
}
yield* Effect.sync(() => {
message._jsMsg.ack();
});
}),
),
negativeAcknowledge: (message) =>
Effect.runPromise(
Effect.gen(function* () {
if (!isNatsMessage(message)) {
return yield* pubSubError(
`negative-acknowledge:${subject}`,
"Message was not produced by NATS backend",
);
}
yield* Effect.sync(() => {
message._jsMsg.nak();
});
}),
),
unsubscribe: () => {
// The pull-based consumer does not have a persistent subscription to drain.
// Clearing the reference is sufficient; the durable consumer persists server-side.
consumer = null;
return Promise.resolve();
},
close: async () => {
close: () => {
consumer = null;
return Promise.resolve();
},
};
}
@ -165,19 +242,26 @@ export function makeNatsBackend(url = "nats://localhost:4222"): PubSubBackend {
let jsm: JetStreamManager | null = null;
const initializedStreams = new Set<string>();
const ensureConnected = async (): Promise<void> => {
const ensureConnected = Effect.fn("NatsBackend.ensureConnected")(function* () {
if (connection === null) {
connection = await connect({ servers: url });
js = connection.jetstream();
jsm = await connection.jetstreamManager();
const conn = yield* Effect.tryPromise({
try: () => connect({ servers: url }),
catch: (error) => pubSubError("connect", error),
});
connection = conn;
js = conn.jetstream();
jsm = yield* Effect.tryPromise({
try: () => conn.jetstreamManager(),
catch: (error) => pubSubError("jetstream-manager", error),
});
}
};
});
/**
* Ensure the stream for a given subject exists with a wildcard filter.
* E.g. subject "tg.flow.config-request" stream "tg_flow" with subjects ["tg.flow.>"]
*/
const ensureStream = async (subject: string): Promise<string> => {
const ensureStream = Effect.fn("NatsBackend.ensureStream")(function* (subject: string) {
const parts = subject.split(".");
const streamName = parts.slice(0, 2).join("_");
@ -186,53 +270,78 @@ export function makeNatsBackend(url = "nats://localhost:4222"): PubSubBackend {
const wildcardSubject = `${parts.slice(0, 2).join(".")}.>`;
const manager = jsm;
if (manager === null) throw new Error("NATS backend not connected");
if (manager === null) return yield* pubSubError("ensure-stream", "NATS backend not connected");
try {
await manager.streams.info(streamName);
} catch {
await manager.streams.add({
name: streamName,
subjects: [wildcardSubject],
});
}
yield* Effect.tryPromise({
try: () => manager.streams.info(streamName),
catch: (error) => pubSubError(`stream-info:${streamName}`, error),
}).pipe(
Effect.catch(() =>
Effect.tryPromise({
try: () =>
manager.streams.add({
name: streamName,
subjects: [wildcardSubject],
}),
catch: (error) => pubSubError(`stream-add:${streamName}`, error),
}),
),
);
initializedStreams.add(streamName);
return streamName;
};
});
return {
createProducer: async <T>(options: CreateProducerOptions) => {
await ensureConnected();
await ensureStream(options.topic);
const client = js;
if (client === null) throw new Error("NATS backend not connected");
return makeNatsProducer<T>(client, options.topic, options.schema);
},
createConsumer: async <T>(options: CreateConsumerOptions) => {
await ensureConnected();
const streamName = await ensureStream(options.topic);
const client = js;
const manager = jsm;
if (client === null || manager === null) throw new Error("NATS backend not connected");
const consumer = makeNatsConsumer<T>(
client,
manager,
options.topic,
options.subscription,
options.initialPosition ?? "latest",
streamName,
options.schema,
);
await consumer.init();
return consumer;
},
close: async () => {
if (connection !== null) {
await connection.drain();
connection = null;
js = null;
jsm = null;
}
},
createProducer: <T>(options: CreateProducerOptions<T>) =>
Effect.runPromise(
Effect.gen(function* () {
yield* ensureConnected();
yield* ensureStream(options.topic);
const client = js;
if (client === null) return yield* pubSubError("create-producer", "NATS backend not connected");
return makeNatsProducer<T>(client, options.topic, options.schema);
}),
),
createConsumer: <T>(options: CreateConsumerOptions<T>) =>
Effect.runPromise(
Effect.gen(function* () {
yield* ensureConnected();
const streamName = yield* ensureStream(options.topic);
const client = js;
const manager = jsm;
if (client === null || manager === null) {
return yield* pubSubError("create-consumer", "NATS backend not connected");
}
const consumer = makeNatsConsumer<T>(
client,
manager,
options.topic,
options.subscription,
options.initialPosition ?? "latest",
streamName,
options.schema,
);
yield* Effect.tryPromise({
try: () => consumer.init(),
catch: (error) => pubSubError(`init-consumer:${options.topic}`, error),
});
return consumer;
}),
),
close: () =>
Effect.runPromise(
Effect.gen(function* () {
const conn = connection;
if (conn !== null) {
yield* Effect.tryPromise({
try: () => conn.drain(),
catch: (error) => pubSubError("close", error),
});
connection = null;
js = null;
jsm = null;
}
}),
),
};
}

View file

@ -20,10 +20,10 @@ import { pubSubError } from "../errors.js";
export interface PubSubService {
readonly backend: PubSubBackend;
readonly createProducer: <T>(
options: CreateProducerOptions,
options: CreateProducerOptions<T>,
) => Effect.Effect<BackendProducer<T>, ReturnType<typeof pubSubError>>;
readonly createConsumer: <T>(
options: CreateConsumerOptions,
options: CreateConsumerOptions<T>,
) => Effect.Effect<BackendConsumer<T>, ReturnType<typeof pubSubError>>;
readonly close: Effect.Effect<void, ReturnType<typeof pubSubError>>;
}
@ -41,12 +41,12 @@ export class PubSub extends Context.Service<PubSub, PubSubService>()("@trustgrap
export function makePubSubService(backend: PubSubBackend): PubSubService {
return {
backend,
createProducer: <T>(options: CreateProducerOptions) =>
createProducer: <T>(options: CreateProducerOptions<T>) =>
Effect.tryPromise({
try: () => backend.createProducer<T>(options),
catch: (error) => pubSubError(`createProducer:${options.topic}`, error),
}),
createConsumer: <T>(options: CreateConsumerOptions) =>
createConsumer: <T>(options: CreateConsumerOptions<T>) =>
Effect.tryPromise({
try: () => backend.createConsumer<T>(options),
catch: (error) => pubSubError(`createConsumer:${options.topic}`, error),

View file

@ -29,21 +29,21 @@ export interface BackendConsumer<T = unknown> {
export type ConsumerType = "shared" | "exclusive" | "failover";
export type InitialPosition = "latest" | "earliest";
export interface CreateProducerOptions {
export interface CreateProducerOptions<T = unknown> {
topic: string;
schema?: S.Top;
schema?: S.Codec<T, unknown>;
}
export interface CreateConsumerOptions {
export interface CreateConsumerOptions<T = unknown> {
topic: string;
subscription: string;
initialPosition?: InitialPosition;
consumerType?: ConsumerType;
schema?: S.Top;
schema?: S.Codec<T, unknown>;
}
export interface PubSubBackend {
createProducer<T>(options: CreateProducerOptions): Promise<BackendProducer<T>>;
createConsumer<T>(options: CreateConsumerOptions): Promise<BackendConsumer<T>>;
createProducer<T>(options: CreateProducerOptions<T>): Promise<BackendProducer<T>>;
createConsumer<T>(options: CreateConsumerOptions<T>): Promise<BackendConsumer<T>>;
close(): Promise<void>;
}

View file

@ -168,11 +168,11 @@ export type MessagingRuntimeError =
| FlowResourceNotFoundError;
export function tooManyRequestsError(message = "Rate limit exceeded"): TooManyRequestsError {
return new TooManyRequestsError({ message });
return TooManyRequestsError.make({ message });
}
export function llmError(message: string, errorType = "llm-error"): LlmError {
return new LlmError({ message, errorType });
return LlmError.make({ message, errorType });
}
export function embeddingsError(
@ -180,7 +180,7 @@ export function embeddingsError(
error: unknown,
provider?: string,
): EmbeddingsError {
return new EmbeddingsError({
return EmbeddingsError.make({
operation,
message: errorMessage(error),
...(provider === undefined ? {} : { provider }),
@ -188,11 +188,11 @@ export function embeddingsError(
}
export function parseError(message: string): ParseError {
return new ParseError({ message });
return ParseError.make({ message });
}
export function pubSubError(operation: string, error: unknown): PubSubError {
return new PubSubError({ operation, message: errorMessage(error) });
return PubSubError.make({ operation, message: errorMessage(error) });
}
export function processorLifecycleError(
@ -200,7 +200,7 @@ export function processorLifecycleError(
operation: string,
error: unknown,
): ProcessorLifecycleError {
return new ProcessorLifecycleError({
return ProcessorLifecycleError.make({
processorId,
operation,
message: errorMessage(error),
@ -212,7 +212,7 @@ export function messagingLifecycleError(
operation: string,
error: unknown,
): MessagingLifecycleError {
return new MessagingLifecycleError({
return MessagingLifecycleError.make({
resource,
operation,
message: errorMessage(error),
@ -224,7 +224,7 @@ export function messagingDeliveryError(
operation: string,
error: unknown,
): MessagingDeliveryError {
return new MessagingDeliveryError({
return MessagingDeliveryError.make({
topic,
operation,
message: errorMessage(error),
@ -236,7 +236,7 @@ export function messagingDecodeError(
error: unknown,
topic?: string,
): MessagingDecodeError {
return new MessagingDecodeError({
return MessagingDecodeError.make({
operation,
message: errorMessage(error),
...(topic === undefined ? {} : { topic }),
@ -247,7 +247,7 @@ export function messagingTimeoutError(
operation: string,
timeoutMs: number,
): MessagingTimeoutError {
return new MessagingTimeoutError({
return MessagingTimeoutError.make({
operation,
timeoutMs,
message: `${operation} timed out after ${timeoutMs}ms`,
@ -259,7 +259,7 @@ export function messagingHandlerError(
subscription: string,
error: unknown,
): MessagingHandlerError {
return new MessagingHandlerError({
return MessagingHandlerError.make({
topic,
subscription,
message: errorMessage(error),
@ -271,7 +271,7 @@ export function flowRuntimeError(
operation: string,
error: unknown,
): FlowRuntimeError {
return new FlowRuntimeError({
return FlowRuntimeError.make({
flowName,
operation,
message: errorMessage(error),
@ -283,7 +283,7 @@ export function flowResourceNotFoundError(
resourceType: FlowResourceNotFoundError["resourceType"],
resourceName: string,
): FlowResourceNotFoundError {
return new FlowResourceNotFoundError({
return FlowResourceNotFoundError.make({
flowName,
resourceType,
resourceName,

View file

@ -4,9 +4,16 @@
* Python reference: trustgraph-base/trustgraph/base/consumer.py
*/
import type { PubSubBackend, BackendConsumer, Message } from "../backend/types.js";
import type { BackendConsumer, Message, PubSubBackend } from "../backend/types.js";
import type { Flow } from "../processor/flow.js";
import { TooManyRequestsError } from "../errors.js";
import {
MessagingHandlerError,
TooManyRequestsError,
messagingDeliveryError,
messagingHandlerError,
messagingLifecycleError,
} from "../errors.js";
import { Duration, Effect } from "effect";
import * as S from "effect/Schema";
export type MessageHandler<T> = (
@ -44,83 +51,140 @@ export interface Consumer<T> {
export function makeConsumer<T>(options: ConsumerOptions<T>): Consumer<T> {
let backend: BackendConsumer<T> | null = null;
let running = false;
let abortController = new AbortController();
const isTooManyRequestsError = S.is(TooManyRequestsError);
const concurrency = options.concurrency ?? 1;
const rateLimitRetryMs = options.rateLimitRetryMs ?? 10_000;
const handleWithRetry = async (msg: Message<T>, flow: FlowContext): Promise<void> => {
try {
await options.handler(msg.value(), msg.properties(), flow);
} catch (err) {
if (S.is(TooManyRequestsError)(err)) {
console.warn(`[Consumer] Rate limited, retrying in ${rateLimitRetryMs}ms`);
await sleep(rateLimitRetryMs);
await options.handler(msg.value(), msg.properties(), flow);
} else {
throw err;
}
const runHandler = (
message: T,
properties: Record<string, string>,
flow: FlowContext,
): Effect.Effect<void, TooManyRequestsError | MessagingHandlerError> =>
Effect.tryPromise({
try: () => options.handler(message, properties, flow),
catch: (error) =>
isTooManyRequestsError(error)
? error
: messagingHandlerError(options.topic, options.subscription, error),
});
const handleWithRetry = Effect.fn("Consumer.handleWithRetry")(function* (
message: Message<T>,
flow: FlowContext,
) {
const callHandler = runHandler(message.value(), message.properties(), flow);
yield* callHandler.pipe(
Effect.catchTag("TooManyRequestsError", () =>
Effect.logWarning("[Consumer] Rate limited, retrying", {
topic: options.topic,
subscription: options.subscription,
retryMs: rateLimitRetryMs,
}).pipe(
Effect.flatMap(() => Effect.sleep(Duration.millis(rateLimitRetryMs))),
Effect.flatMap(() => callHandler),
),
),
);
});
const consumeOnce = Effect.fn("Consumer.consumeOnce")(function* (flow: FlowContext) {
const currentBackend = backend;
if (currentBackend === null) {
return yield* messagingLifecycleError(
`${options.topic}:${options.subscription}`,
"receive",
"Consumer backend not started",
);
}
};
const consumeLoop = async (flow: FlowContext): Promise<void> => {
while (running) {
let msg: Message<T> | null = null;
try {
const currentBackend = backend;
if (currentBackend === null) throw new Error("Consumer backend not started");
const message = yield* Effect.tryPromise({
try: () => currentBackend.receive(2000),
catch: (error) => messagingDeliveryError(options.topic, "receive", error),
});
if (message === null) return;
msg = await currentBackend.receive(2000);
if (msg === null) continue;
yield* handleWithRetry(message, flow).pipe(
Effect.flatMap(() =>
Effect.tryPromise({
try: () => currentBackend.acknowledge(message),
catch: (error) => messagingDeliveryError(options.topic, "acknowledge", error),
}),
),
Effect.catch((error) =>
Effect.tryPromise({
try: () => currentBackend.negativeAcknowledge(message),
catch: (nakError) => messagingDeliveryError(options.topic, "negative-acknowledge", nakError),
}).pipe(
Effect.catch((nakError) =>
Effect.logError("[Consumer] Failed to negative-acknowledge message", {
error: nakError.message,
topic: nakError.topic,
}),
),
Effect.flatMap(() => Effect.fail(error)),
),
),
);
});
await handleWithRetry(msg, flow);
await currentBackend.acknowledge(msg);
} catch (err) {
if (!running) break;
console.error("[Consumer] Error in consume loop:", err);
if (msg !== null) {
try {
const currentBackend = backend;
if (currentBackend !== null) {
await currentBackend.negativeAcknowledge(msg);
}
} catch (nakErr) {
console.error("[Consumer] Failed to nak message:", nakErr);
}
}
await sleep(1000);
}
}
};
const consumeLoop = Effect.fn("Consumer.consumeLoop")(function* (flow: FlowContext) {
yield* Effect.whileLoop({
while: () => running,
body: () =>
consumeOnce(flow).pipe(
Effect.catch((error) => {
if (!running) return Effect.void;
return Effect.logError("[Consumer] Error in consume loop", {
error: error.message,
topic: options.topic,
subscription: options.subscription,
}).pipe(
Effect.flatMap(() => Effect.sleep(Duration.millis(1000))),
);
}),
),
step: () => undefined,
});
});
return {
start: async (flow) => {
backend = await options.pubsub.createConsumer<T>({
topic: options.topic,
subscription: options.subscription,
initialPosition: options.initialPosition ?? "latest",
});
start: (flow) =>
Effect.runPromise(
Effect.gen(function* () {
backend = yield* Effect.tryPromise({
try: () =>
options.pubsub.createConsumer<T>({
topic: options.topic,
subscription: options.subscription,
initialPosition: options.initialPosition ?? "latest",
}),
catch: (error) =>
messagingLifecycleError(`${options.topic}:${options.subscription}`, "create-consumer", error),
});
running = true;
running = true;
// Spawn concurrent consumer tasks.
const tasks = Array.from({ length: concurrency }, () =>
consumeLoop(flow),
);
// Run all concurrently: first rejection stops all.
await Promise.all(tasks);
},
stop: async () => {
running = false;
abortController.abort();
abortController = new AbortController();
if (backend !== null) {
await backend.close();
backend = null;
}
},
const workerIndexes = Array.from({ length: concurrency }, (_value, index) => index);
yield* Effect.forEach(workerIndexes, () => consumeLoop(flow), {
concurrency: "unbounded",
discard: true,
});
}),
),
stop: () =>
Effect.runPromise(
Effect.gen(function* () {
running = false;
const currentBackend = backend;
backend = null;
if (currentBackend !== null) {
yield* Effect.tryPromise({
try: () => currentBackend.close(),
catch: (error) =>
messagingLifecycleError(`${options.topic}:${options.subscription}`, "close-consumer", error),
});
}
}),
),
};
}
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

View file

@ -8,6 +8,7 @@ import type { PubSubBackend } from "../backend/types.js";
import type { ProducerMetrics } from "../metrics/prometheus.js";
import { Effect } from "effect";
import { makeEffectProducerHandle, type EffectProducer } from "./runtime.js";
import { messagingLifecycleError } from "../errors.js";
export interface Producer<T> {
readonly start: () => Promise<void>;
@ -23,28 +24,38 @@ export function makeProducer<T>(
let effectProducer: EffectProducer<T> | null = null;
return {
start: async () => {
const backend = await pubsub.createProducer<T>({ topic });
effectProducer = makeEffectProducerHandle(backend, {
topic,
...(metrics === undefined ? {} : { metrics }),
});
},
send: async (id, message) => {
if (effectProducer === null) throw new Error("Producer not started");
await Effect.runPromise(effectProducer.send(id, message));
},
stop: async () => {
if (effectProducer !== null) {
const producer = effectProducer;
await Effect.runPromise(
producer.flush.pipe(
Effect.flatMap(() => producer.close),
),
);
effectProducer = null;
}
},
start: () =>
Effect.runPromise(
Effect.gen(function* () {
const backend = yield* Effect.tryPromise({
try: () => pubsub.createProducer<T>({ topic }),
catch: (error) => messagingLifecycleError(topic, "create-producer", error),
});
effectProducer = makeEffectProducerHandle(backend, {
topic,
...(metrics === undefined ? {} : { metrics }),
});
}),
),
send: (id, message) =>
effectProducer === null
? Effect.runPromise(Effect.fail(messagingLifecycleError(topic, "send", "Producer not started")))
: Effect.runPromise(effectProducer.send(id, message)),
stop: () =>
Effect.runPromise(
Effect.gen(function* () {
if (effectProducer !== null) {
const producer = effectProducer;
yield* producer.flush.pipe(
Effect.flatMap(() => producer.close),
Effect.ensuring(
Effect.sync(() => {
effectProducer = null;
}),
),
);
}
}),
),
};
}

View file

@ -44,37 +44,42 @@ export function makeRequestResponse<TReq, TRes>(
let runtime: RequestResponseRuntime<TReq, TRes> | null = null;
return {
start: async () => {
if (runtime !== null) return;
start: () =>
runtime !== null
? Promise.resolve()
: Effect.runPromise(
Effect.gen(function* () {
const scope = yield* Scope.make();
const startRuntime = Effect.gen(function* () {
const config = yield* loadMessagingRuntimeConfig();
const requestor = yield* makeEffectRequestResponseFromPubSub<TReq, TRes>(
PubSub.fromBackend(options.pubsub),
config,
{
requestTopic: options.requestTopic,
responseTopic: options.responseTopic,
subscription: options.subscription,
},
).pipe(Scope.provide(scope));
const scope = await Effect.runPromise(Scope.make());
runtime = { scope, requestor };
});
try {
const config = await Effect.runPromise(loadMessagingRuntimeConfig());
const requestor = await Effect.runPromise(
makeEffectRequestResponseFromPubSub<TReq, TRes>(
PubSub.fromBackend(options.pubsub),
config,
{
requestTopic: options.requestTopic,
responseTopic: options.responseTopic,
subscription: options.subscription,
},
).pipe(Scope.provide(scope)),
);
runtime = { scope, requestor };
} catch (error) {
await Effect.runPromise(Scope.close(scope, Exit.fail(error))).catch(() => undefined);
throw error;
}
},
stop: async () => {
yield* startRuntime.pipe(
Effect.catch((error) =>
Scope.close(scope, Exit.fail(error)).pipe(
Effect.flatMap(() => Effect.fail(error)),
),
),
);
}),
),
stop: () => {
const current = runtime;
runtime = null;
if (current === null) return;
await Effect.runPromise(Scope.close(current.scope, Exit.void));
return current === null
? Promise.resolve()
: Effect.runPromise(Scope.close(current.scope, Exit.void));
},
/**
* Send a request and wait for responses.
@ -85,20 +90,24 @@ export function makeRequestResponse<TReq, TRes>(
* Return `true` to indicate the final response has been received.
* If omitted, returns the first response.
*/
request: async (request, requestOptions) => {
request: (request, requestOptions) => {
const current = runtime;
if (current === null) {
throw messagingLifecycleError(
`${options.requestTopic}:${options.responseTopic}`,
"request",
"RequestResponse not started",
return Effect.runPromise(
Effect.fail(
messagingLifecycleError(
`${options.requestTopic}:${options.responseTopic}`,
"request",
"RequestResponse not started",
),
),
);
}
const timeoutMs = requestOptions?.timeoutMs ?? 300_000;
const recipient = requestOptions?.recipient;
return await Effect.runPromise(
return Effect.runPromise(
current.requestor.request(request, {
timeoutMs,
...(recipient === undefined

View file

@ -44,9 +44,9 @@ export type EffectMessageHandler<T, E = never, R = never> = (
flow: FlowContext<R>,
) => Effect.Effect<void, E, R>;
export interface EffectProducerOptions {
export interface EffectProducerOptions<T = unknown> {
readonly topic: string;
readonly schema?: S.Top;
readonly schema?: S.Codec<T, unknown>;
readonly metrics?: ProducerMetrics;
}
@ -62,7 +62,7 @@ export interface EffectConsumerOptions<T, E = never, R = never> {
readonly handler: EffectMessageHandler<T, E, R>;
readonly concurrency?: number;
readonly initialPosition?: "latest" | "earliest";
readonly schema?: S.Top;
readonly schema?: S.Codec<T, unknown>;
readonly receiveTimeoutMs?: number;
readonly errorBackoffMs?: number;
readonly rateLimitRetryMs?: number;
@ -73,12 +73,12 @@ export interface EffectConsumer {
readonly fibers: ReadonlyArray<Fiber.Fiber<void, never>>;
}
export interface EffectRequestResponseOptions {
export interface EffectRequestResponseOptions<TReq = unknown, TRes = unknown> {
readonly requestTopic: string;
readonly responseTopic: string;
readonly subscription: string;
readonly requestSchema?: S.Top;
readonly responseSchema?: S.Top;
readonly requestSchema?: S.Codec<TReq, unknown>;
readonly responseSchema?: S.Codec<TRes, unknown>;
}
export interface EffectRequestOptions<TRes, E = never, R = never> {
@ -96,7 +96,7 @@ export interface EffectRequestResponse<TReq, TRes> {
export interface ProducerFactoryService {
readonly make: <T>(
options: EffectProducerOptions,
options: EffectProducerOptions<T>,
) => Effect.Effect<EffectProducer<T>, PubSubError, Scope.Scope>;
}
@ -109,7 +109,7 @@ export interface ConsumerFactoryService {
export interface RequestResponseFactoryService {
readonly make: <TReq, TRes>(
options: EffectRequestResponseOptions,
options: EffectRequestResponseOptions<TReq, TRes>,
) => Effect.Effect<EffectRequestResponse<TReq, TRes>, PubSubError, Scope.Scope>;
}
@ -138,7 +138,7 @@ export class FlowRuntime extends Context.Service<FlowRuntime, FlowRuntimeService
export function makeEffectProducerHandle<T>(
backend: BackendProducer<T>,
options: EffectProducerOptions,
options: EffectProducerOptions<T>,
): EffectProducer<T> {
return {
send: Effect.fn(`Producer.send:${options.topic}`)((id: string, message: T) =>
@ -168,9 +168,9 @@ export function makeEffectProducerHandle<T>(
export const makeEffectProducerFromPubSub = Effect.fn("makeEffectProducerFromPubSub")(function* <T>(
pubsub: PubSubService,
options: EffectProducerOptions,
options: EffectProducerOptions<T>,
) {
const createOptions: CreateProducerOptions = options.schema === undefined
const createOptions: CreateProducerOptions<T> = options.schema === undefined
? { topic: options.topic }
: { topic: options.topic, schema: options.schema };
const backend = yield* pubsub.createProducer<T>(createOptions);
@ -326,7 +326,7 @@ export const makeEffectConsumerFromPubSub = Effect.fn("makeEffectConsumerFromPub
options: EffectConsumerOptions<T, E, R>,
flow: FlowContext<R>,
) {
const createOptions: CreateConsumerOptions = {
const createOptions: CreateConsumerOptions<T> = {
topic: options.topic,
subscription: options.subscription,
...(options.initialPosition === undefined ? {} : { initialPosition: options.initialPosition }),
@ -422,9 +422,9 @@ export const makeEffectRequestResponseFromPubSub = Effect.fn("makeEffectRequestR
>(
pubsub: PubSubService,
config: MessagingRuntimeConfig,
options: EffectRequestResponseOptions,
options: EffectRequestResponseOptions<TReq, TRes>,
) {
const producerOptions: CreateProducerOptions = options.requestSchema === undefined
const producerOptions: CreateProducerOptions<TReq> = options.requestSchema === undefined
? { topic: options.requestTopic }
: { topic: options.requestTopic, schema: options.requestSchema };
const producerBackend = yield* pubsub.createProducer<TReq>(producerOptions);
@ -432,7 +432,7 @@ export const makeEffectRequestResponseFromPubSub = Effect.fn("makeEffectRequestR
topic: options.requestTopic,
...(options.requestSchema === undefined ? {} : { schema: options.requestSchema }),
});
const createOptions: CreateConsumerOptions = {
const createOptions: CreateConsumerOptions<TRes> = {
topic: options.responseTopic,
subscription: options.subscription,
...(options.responseSchema === undefined ? {} : { schema: options.responseSchema }),
@ -502,7 +502,7 @@ export const makeEffectRequestResponseFromPubSub = Effect.fn("makeEffectRequestR
export function makeProducerFactoryService(pubsub: PubSubService): ProducerFactoryService {
return {
make: Effect.fn("ProducerFactory.make")(<T>(options: EffectProducerOptions) =>
make: Effect.fn("ProducerFactory.make")(<T>(options: EffectProducerOptions<T>) =>
makeEffectProducerFromPubSub<T>(pubsub, options),
),
};
@ -526,13 +526,11 @@ export function makeRequestResponseFactoryService(
pubsub: PubSubService,
config: MessagingRuntimeConfig,
): RequestResponseFactoryService {
const make = Effect.fn("RequestResponseFactory.make")(function* <TReq, TRes>(
options: EffectRequestResponseOptions,
) {
return yield* makeEffectRequestResponseFromPubSub<TReq, TRes>(pubsub, config, options);
}) as RequestResponseFactoryService["make"];
return { make };
return {
make: Effect.fn("RequestResponseFactory.make")(<TReq, TRes>(
options: EffectRequestResponseOptions<TReq, TRes>,
) => makeEffectRequestResponseFromPubSub<TReq, TRes>(pubsub, config, options)),
};
}
export const ProducerFactoryLive = Layer.effect(
@ -589,7 +587,7 @@ export const MessagingRuntimeLive = Layer.mergeAll(
);
export const runEffectProducerScoped = Effect.fn("runEffectProducerScoped")(function* <T>(
options: EffectProducerOptions,
options: EffectProducerOptions<T>,
) {
const pubsub = yield* PubSub;
return yield* makeEffectProducerFromPubSub<T>(pubsub, options);
@ -605,7 +603,7 @@ export const runEffectConsumerScoped = Effect.fn("runEffectConsumerScoped")(func
});
export const runEffectRequestResponseScoped = Effect.fn("runEffectRequestResponseScoped")(function* <TReq, TRes>(
options: EffectRequestResponseOptions,
options: EffectRequestResponseOptions<TReq, TRes>,
) {
const pubsub = yield* PubSub;
const config = yield* loadMessagingRuntimeConfig();

View file

@ -5,6 +5,8 @@
*/
import type { PubSubBackend, BackendConsumer } from "../backend/types.js";
import { Duration, Effect, Fiber } from "effect";
import { messagingDeliveryError, messagingLifecycleError, messagingTimeoutError } from "../errors.js";
type Resolver<T> = {
queue: AsyncQueue<T>;
@ -32,28 +34,33 @@ export function makeAsyncQueue<T>(): AsyncQueue<T> {
buffer.push(item);
}
},
pop: async (timeoutMs) => {
pop: (timeoutMs) => {
const buffered = buffer.shift();
if (buffered !== undefined) return buffered;
return new Promise<T>((resolve, reject) => {
let timer: ReturnType<typeof setTimeout> | undefined;
if (buffered !== undefined) return Promise.resolve(buffered);
const take = Effect.callback<T>((resume) => {
const waiter = (value: T) => {
if (timer !== undefined) clearTimeout(timer);
resolve(value);
resume(Effect.succeed(value));
};
waiters.push(waiter);
if (timeoutMs !== undefined) {
timer = setTimeout(() => {
const idx = waiters.indexOf(waiter);
if (idx !== -1) waiters.splice(idx, 1);
reject(new Error(`Queue.pop timed out after ${timeoutMs}ms`));
}, timeoutMs);
}
return Effect.sync(() => {
const idx = waiters.indexOf(waiter);
if (idx !== -1) waiters.splice(idx, 1);
});
});
return Effect.runPromise(
timeoutMs === undefined
? take
: take.pipe(
Effect.timeout(Duration.millis(timeoutMs)),
Effect.catchTag("TimeoutError", () =>
Effect.fail(messagingTimeoutError("queue.pop", timeoutMs)),
),
),
);
},
get length() {
return buffer.length;
@ -77,76 +84,113 @@ export function makeSubscriber<T>(
): Subscriber<T> {
let backend: BackendConsumer<T> | null = null;
let running = false;
let fiber: Fiber.Fiber<void, never> | null = null;
// ID-specific subscriptions (request/response correlation)
const idSubscribers = new Map<string, Resolver<T>>();
// Wildcard subscribers (receive all messages)
const allSubscribers = new Map<string, Resolver<T>>();
const dispatchLoop = async (): Promise<void> => {
const dispatchLoop = Effect.fn("Subscriber.dispatchLoop")(function* () {
let consecutiveErrors = 0;
while (running) {
try {
const currentBackend = backend;
if (currentBackend === null) throw new Error("Subscriber backend not started");
const dispatchOnce = Effect.fn("Subscriber.dispatchOnce")(function* () {
const currentBackend = backend;
if (currentBackend === null) {
return yield* messagingLifecycleError(
`${topic}:${subscription}`,
"dispatch",
"Subscriber backend not started",
);
}
const msg = await currentBackend.receive(2000);
if (msg === null) continue;
const msg = yield* Effect.tryPromise({
try: () => currentBackend.receive(2000),
catch: (error) => messagingDeliveryError(topic, "receive", error),
});
if (msg === null) return;
consecutiveErrors = 0;
consecutiveErrors = 0;
const props = msg.properties();
const id = props.id;
const value = msg.value();
const props = msg.properties();
const id = props.id;
const value = msg.value();
// Route to ID-specific subscriber
if (id !== undefined && id.length > 0) {
const sub = idSubscribers.get(id);
if (sub !== undefined) {
// Route to ID-specific subscriber
if (id !== undefined && id.length > 0) {
const sub = idSubscribers.get(id);
if (sub !== undefined) {
sub.queue.push(value);
}
}
// Broadcast to all-subscribers
for (const sub of allSubscribers.values()) {
sub.queue.push(value);
}
}
// Broadcast to all-subscribers
for (const sub of allSubscribers.values()) {
sub.queue.push(value);
}
yield* Effect.tryPromise({
try: () => currentBackend.acknowledge(msg),
catch: (error) => messagingDeliveryError(topic, "acknowledge", error),
});
});
await currentBackend.acknowledge(msg);
} catch (err) {
if (!running) break;
consecutiveErrors++;
if (consecutiveErrors <= 3) {
console.error("[Subscriber] Error:", err);
} else if (consecutiveErrors === 4) {
console.error("[Subscriber] Suppressing further errors (will retry with backoff)");
}
// Exponential backoff: 1s, 2s, 4s, max 10s
const delay = Math.min(1000 * Math.pow(2, consecutiveErrors - 1), 10_000);
await new Promise((r) => setTimeout(r, delay));
}
}
};
yield* Effect.whileLoop({
while: () => running,
body: () =>
dispatchOnce().pipe(
Effect.catch((error) => {
if (!running) return Effect.void;
consecutiveErrors++;
const logEffect = consecutiveErrors <= 3
? Effect.logError("[Subscriber] Error", { error })
: consecutiveErrors === 4
? Effect.logError("[Subscriber] Suppressing further errors (will retry with backoff)", { error })
: Effect.void;
const delay = Math.min(1000 * 2 ** (consecutiveErrors - 1), 10_000);
return logEffect.pipe(Effect.flatMap(() => Effect.sleep(Duration.millis(delay))));
}),
),
step: () => undefined,
});
});
return {
start: async () => {
backend = await pubsub.createConsumer<T>({
topic,
subscription,
});
running = true;
// Start the dispatch loop (fire and forget; runs until stop).
dispatchLoop().catch((err) => {
if (running === true) console.error("[Subscriber] dispatch loop error:", err);
});
},
stop: async () => {
running = false;
if (backend !== null) {
await backend.close();
backend = null;
}
},
start: () =>
Effect.runPromise(
Effect.gen(function* () {
backend = yield* Effect.tryPromise({
try: () =>
pubsub.createConsumer<T>({
topic,
subscription,
}),
catch: (error) =>
messagingLifecycleError(`${topic}:${subscription}`, "create-consumer", error),
});
running = true;
fiber = yield* dispatchLoop().pipe(Effect.forkDetach);
}),
),
stop: () =>
Effect.runPromise(
Effect.gen(function* () {
running = false;
const activeFiber = fiber;
fiber = null;
if (activeFiber !== null) {
yield* Fiber.interrupt(activeFiber);
}
const currentBackend = backend;
if (currentBackend !== null) {
backend = null;
yield* Effect.tryPromise({
try: () => currentBackend.close(),
catch: (error) =>
messagingLifecycleError(`${topic}:${subscription}`, "close-consumer", error),
});
}
}),
),
subscribe: (id) => {
const queue = makeAsyncQueue<T>();
idSubscribers.set(id, { queue });

View file

@ -8,7 +8,7 @@
import type { PubSubBackend } from "../backend/types.js";
import { makeNatsBackend } from "../backend/nats.js";
import { Effect } from "effect";
import { Context, Effect } from "effect";
import { processorLifecycleError, type ProcessorLifecycleError } from "../errors.js";
import { loadProcessorRuntimeConfig } from "../runtime/config.js";
@ -36,10 +36,10 @@ declare const processorRunRequirementsType: unique symbol;
export interface ProcessorRuntime<RunError = ProcessorLifecycleError, RunRequirements = never> {
readonly [processorRunErrorType]?: RunError;
readonly [processorRunRequirementsType]?: RunRequirements;
readonly start: () => Promise<void>;
readonly start: (context: Context.Context<RunRequirements>) => Promise<void>;
readonly stop: () => Promise<void>;
startEffect(): unknown;
stopEffect(): unknown;
startEffect: Effect.Effect<void, RunError | ProcessorLifecycleError, RunRequirements>;
stopEffect: Effect.Effect<void, ProcessorLifecycleError>;
}
export interface AsyncProcessorRuntime<
@ -53,8 +53,8 @@ export interface AsyncProcessorRuntime<
readonly isRunning: () => boolean;
readonly registerConfigHandler: (handler: ConfigHandler) => void;
readonly onShutdown: (callback: () => Promise<void>) => void;
readonly run: () => Promise<void>;
runEffect(): unknown;
readonly run: (context: Context.Context<RunRequirements>) => Promise<void>;
runEffect: Effect.Effect<void, RunError | ProcessorLifecycleError, RunRequirements>;
}
export interface AsyncProcessorRuntimeOptions<
@ -94,8 +94,16 @@ export function makeAsyncProcessor<
}
const shutdown = () => {
console.log(`[${config.id}] Shutting down...`);
void processor.stop().then(() => process.exit(0));
void Effect.runPromise(
Effect.log(`[${config.id}] Shutting down...`).pipe(
Effect.flatMap(() =>
Effect.tryPromise({
try: () => processor.stop(),
catch: (error) => processorLifecycleError(config.id, "signal-shutdown", error),
}),
),
),
).then(() => process.exit(0), () => process.exit(1));
};
const handlers: RegisteredSignalHandler[] = [
{ signal: "SIGINT", handler: shutdown },
@ -125,29 +133,19 @@ export function makeAsyncProcessor<
registerConfigHandler: (handler) => {
configHandlers.push(handler);
},
start: async () => {
await Effect.runPromise(
processor.startEffect() as Effect.Effect<void, RunError | ProcessorLifecycleError>,
);
},
stop: async () => {
await Effect.runPromise(
processor.stopEffect() as Effect.Effect<void, ProcessorLifecycleError>,
);
},
start: (context) => Effect.runPromiseWith(context)(processor.startEffect),
stop: () => Effect.runPromise(processor.stopEffect),
onShutdown: (callback) => {
shutdownCallbacks.push(callback);
},
startEffect() {
get startEffect() {
const startProcessor = Effect.fn("trustgraph.processor.start")(function* () {
yield* Effect.sync(() => {
running = true;
registerProcessSignalHandlers();
});
yield* (
processor.runEffect() as Effect.Effect<void, RunError, RunRequirements>
);
yield* processor.runEffect;
});
return startProcessor().pipe(
Effect.withSpan("trustgraph.processor.start", {
@ -157,7 +155,7 @@ export function makeAsyncProcessor<
}),
);
},
stopEffect() {
get stopEffect() {
const stopProcessor = Effect.fn("trustgraph.processor.stop")(function* () {
yield* Effect.sync(() => {
running = false;
@ -180,18 +178,15 @@ export function makeAsyncProcessor<
});
return stopProcessor();
},
run: () =>
Effect.runPromise(
processor.runEffect() as unknown as Effect.Effect<void, RunError>,
),
runEffect: () => {
run: (context) => Effect.runPromiseWith(context)(processor.runEffect),
get runEffect() {
if (options.runEffect !== undefined) {
return options.runEffect(processor);
}
return Effect.tryPromise({
try: () => options.run?.(processor) ?? Promise.resolve(),
catch: (error) => processorLifecycleError(config.id, "start", error),
}) as unknown as Effect.Effect<void, RunError, RunRequirements>;
});
},
};
@ -208,13 +203,21 @@ export const AsyncProcessor = Object.assign(
return makeAsyncProcessor(config);
},
{
async launch<T extends ProcessorRuntime<unknown, unknown>>(
launch<T extends ProcessorRuntime<unknown, never>>(
this: new (config: ProcessorConfig) => T,
id: string,
): Promise<void> {
const config = await Effect.runPromise(loadProcessorRuntimeConfig(id));
const processor = new this(config);
await processor.start();
const ProcessorCtor = this;
return Effect.runPromise(
Effect.gen(function* () {
const config = yield* loadProcessorRuntimeConfig(id);
const processor = new ProcessorCtor(config);
yield* Effect.tryPromise({
try: () => processor.start(Context.empty()),
catch: (error) => processorLifecycleError(id, "launch", error),
});
}),
);
},
},
) as unknown as {
@ -224,7 +227,7 @@ export const AsyncProcessor = Object.assign(
<RunError = ProcessorLifecycleError, RunRequirements = never>(
config: ProcessorConfig,
): AsyncProcessor<RunError, RunRequirements>;
launch<T extends ProcessorRuntime<unknown, unknown>>(
launch<T extends ProcessorRuntime<unknown, never>>(
this: new (config: ProcessorConfig) => T,
id: string,
): Promise<void>;

View file

@ -38,6 +38,7 @@ import {
import { makePubSubService, PubSub } from "../backend/pubsub.js";
import { loadMessagingRuntimeConfig } from "../runtime/messaging-config.js";
import { Duration, Effect, Exit, Scope } from "effect";
import * as Predicate from "effect/Predicate";
import * as S from "effect/Schema";
interface ConfigPush {
@ -88,9 +89,7 @@ export interface FlowProcessorRuntime<FlowRequirements = never>
readonly configHandlers: ConfigHandler[];
readonly isRunning: () => boolean;
readonly registerConfigHandler: (handler: ConfigHandler) => void;
readonly registerSpecification: <Requirements extends FlowRequirements>(
spec: Spec<Requirements>,
) => void;
readonly registerSpecification: (spec: Spec<FlowRequirements>) => void;
readonly specifications: ReadonlyArray<Spec<FlowRequirements>>;
}
@ -106,6 +105,20 @@ const ConfigPushSchema = S.Struct({
config: S.Record(S.String, S.Unknown),
});
const isStringRecord = (value: unknown): value is Record<string, unknown> =>
Predicate.isObject(value) && !Array.isArray(value);
const isTopicsRecord = (value: unknown): value is Record<string, string> =>
isStringRecord(value) && Object.values(value).every((item) => typeof item === "string");
const isFlowDefinition = (value: unknown): value is FlowDefinition => {
if (!isStringRecord(value)) return false;
const topics = value.topics;
const parameters = value.parameters;
return (topics === undefined || isTopicsRecord(topics)) &&
(parameters === undefined || isStringRecord(parameters));
};
export function runFlowProcessorDefinitionScoped<
FlowRequirements = never,
ConfigHandlerError = never,
@ -202,11 +215,15 @@ export function runFlowProcessorDefinitionScoped<
FlowRuntime | ProducerFactory | ConsumerFactory | RequestResponseFactory | FlowRequirements
> =>
Effect.gen(function* () {
const flowDefs = config.flows as Record<string, FlowDefinition> | undefined;
const flowDefs = config.flows;
if (flowDefs === undefined) {
yield* Effect.log(`[${options.id}] No flows in config push, skipping`);
return;
}
if (!isStringRecord(flowDefs)) {
yield* Effect.logWarning(`[${options.id}] Skipping config push: flows is not an object`);
return;
}
const flowsJson = yield* S.encodeUnknownEffect(S.UnknownFromJsonString)(flowDefs).pipe(
Effect.catch((error) => Effect.succeed(String(error))),
@ -226,7 +243,7 @@ export function runFlowProcessorDefinitionScoped<
}
for (const [name, defn] of Object.entries(flowDefs)) {
if (typeof defn !== "object" || defn === null) {
if (!isFlowDefinition(defn)) {
yield* Effect.logWarning(`[${options.id}] Skipping flow "${name}": definition is not an object`);
continue;
}
@ -353,8 +370,8 @@ export function makeFlowProcessor<FlowRequirements = never>(
},
});
const startEffect = (): FlowProcessorStartEffect<FlowRequirements> => {
const effect = base.startEffect() as FlowProcessorStartEffect<FlowRequirements>;
const makeStartEffect = (): FlowProcessorStartEffect<FlowRequirements> => {
const effect = base.startEffect;
return options.provide?.(effect) ?? effect;
};
@ -362,24 +379,29 @@ export function makeFlowProcessor<FlowRequirements = never>(
...base,
specifications,
registerSpecification: (spec) => {
specifications.push(spec as Spec<FlowRequirements>);
specifications.push(spec);
},
startEffect,
start: async () => {
const pubsub = makePubSubService(base.pubsub);
const messagingConfig = await Effect.runPromise(loadMessagingRuntimeConfig());
const start = startEffect().pipe(
Effect.provideService(PubSub, pubsub),
Effect.provideService(ProducerFactory, ProducerFactory.of(makeProducerFactoryService(pubsub))),
Effect.provideService(ConsumerFactory, ConsumerFactory.of(makeConsumerFactoryService(pubsub, messagingConfig))),
Effect.provideService(
RequestResponseFactory,
RequestResponseFactory.of(makeRequestResponseFactoryService(pubsub, messagingConfig)),
),
Effect.provideService(FlowRuntime, FlowRuntime.of({ run: runFlowRuntimeScoped })),
) as Effect.Effect<void, PubSubError | FlowRuntimeError | ProcessorLifecycleError>;
await Effect.runPromise(Effect.scoped(start));
get startEffect() {
return makeStartEffect();
},
start: (context) =>
Effect.runPromiseWith(context)(
Effect.gen(function* () {
const pubsub = makePubSubService(base.pubsub);
const messagingConfig = yield* loadMessagingRuntimeConfig();
const start = processor.startEffect.pipe(
Effect.provideService(PubSub, pubsub),
Effect.provideService(ProducerFactory, ProducerFactory.of(makeProducerFactoryService(pubsub))),
Effect.provideService(ConsumerFactory, ConsumerFactory.of(makeConsumerFactoryService(pubsub, messagingConfig))),
Effect.provideService(
RequestResponseFactory,
RequestResponseFactory.of(makeRequestResponseFactoryService(pubsub, messagingConfig)),
),
Effect.provideService(FlowRuntime, FlowRuntime.of({ run: runFlowRuntimeScoped })),
);
yield* Effect.scoped(start);
}),
),
};
return processor;

View file

@ -4,7 +4,7 @@
* Python reference: trustgraph-base/trustgraph/base/flow.py
*/
import { Effect, Exit, Scope } from "effect";
import { Context, Effect, Exit, Scope } from "effect";
import type { PubSubBackend } from "../backend/types.js";
import { makePubSubService } from "../backend/pubsub.js";
import {
@ -64,19 +64,20 @@ export function makeFlow<Requirements = never>(
definition: FlowDefinition,
specifications: ReadonlyArray<Spec<Requirements>>,
) {
const producers = new Map<string, EffectProducer<unknown>>();
const producers = new Map<string, EffectProducer<never>>();
const consumers = new Map<string, EffectConsumer>();
const requestors = new Map<string, EffectRequestResponse<unknown, unknown>>();
const requestors = new Map<string, EffectRequestResponse<never, unknown>>();
const parameters = new Map<string, unknown>();
let compatibilityScope: Scope.Closeable | null = null;
const ensureCompatibilityScope = async (): Promise<Scope.Closeable> => {
const ensureCompatibilityScopeEffect = Effect.fn("Flow.ensureCompatibilityScope")(function* () {
if (compatibilityScope !== null) {
return compatibilityScope;
}
compatibilityScope = await Effect.runPromise(Scope.make());
return compatibilityScope;
};
const scope = yield* Scope.make();
compatibilityScope = scope;
return scope;
});
const toEffectRequestOptions = <TRes>(
options: FlowRequestOptions<TRes> | undefined,
@ -105,41 +106,58 @@ export function makeFlow<Requirements = never>(
}
});
},
async start(): Promise<void> {
if (compatibilityScope !== null) {
await flow.stop();
}
await flow.runInCompatibilityScope(
flow.startEffect() as Effect.Effect<void, PubSubError, SpecRuntimeRequirements>,
pubsub,
start(context: Context.Context<Requirements>): Promise<void> {
return Effect.runPromise(
Effect.gen(function* () {
if (compatibilityScope !== null) {
yield* flow.stopEffect();
}
yield* flow.runInCompatibilityScopeEffect(flow.startEffect(), pubsub, context);
}),
);
},
async stop(): Promise<void> {
const scope = compatibilityScope;
compatibilityScope = null;
if (scope !== null) {
await Effect.runPromise(Scope.close(scope, Exit.void));
}
flow.clearResources();
stop(): Promise<void> {
return Effect.runPromise(flow.stopEffect());
},
async runInCompatibilityScope<A, E>(
effect: Effect.Effect<A, E, SpecRuntimeRequirements>,
stopEffect(): Effect.Effect<void> {
return Effect.gen(function* () {
const scope = compatibilityScope;
compatibilityScope = null;
if (scope !== null) {
yield* Scope.close(scope, Exit.void);
}
flow.clearResources();
});
},
runInCompatibilityScopeEffect<A, E>(
effect: Effect.Effect<A, E, SpecRuntimeRequirements | Requirements>,
runtimePubsub: PubSubBackend,
): Promise<A> {
const scope = await ensureCompatibilityScope();
const pubsubService = makePubSubService(runtimePubsub);
const messagingConfig = await Effect.runPromise(loadMessagingRuntimeConfig());
return await Effect.runPromise(
effect.pipe(
Effect.provideService(ProducerFactory, ProducerFactory.of(makeProducerFactoryService(pubsubService))),
Effect.provideService(ConsumerFactory, ConsumerFactory.of(makeConsumerFactoryService(pubsubService, messagingConfig))),
Effect.provideService(
RequestResponseFactory,
RequestResponseFactory.of(makeRequestResponseFactoryService(pubsubService, messagingConfig)),
context: Context.Context<Requirements>,
) {
return Effect.gen(function* () {
const scope = yield* ensureCompatibilityScopeEffect();
const pubsubService = makePubSubService(runtimePubsub);
const messagingConfig = yield* loadMessagingRuntimeConfig();
return yield* Effect.provide(
effect.pipe(
Effect.provideService(ProducerFactory, ProducerFactory.of(makeProducerFactoryService(pubsubService))),
Effect.provideService(ConsumerFactory, ConsumerFactory.of(makeConsumerFactoryService(pubsubService, messagingConfig))),
Effect.provideService(
RequestResponseFactory,
RequestResponseFactory.of(makeRequestResponseFactoryService(pubsubService, messagingConfig)),
),
Scope.provide(scope),
),
Scope.provide(scope),
),
);
context,
);
});
},
runInCompatibilityScope<A, E>(
effect: Effect.Effect<A, E, SpecRuntimeRequirements | Requirements>,
runtimePubsub: PubSubBackend,
context: Context.Context<Requirements>,
): Promise<A> {
return Effect.runPromise(flow.runInCompatibilityScopeEffect(effect, runtimePubsub, context));
},
clearResources(): void {
producers.clear();
@ -147,13 +165,13 @@ export function makeFlow<Requirements = never>(
requestors.clear();
parameters.clear();
},
registerProducer(registerName: string, producer: EffectProducer<unknown>): void {
registerProducer<T>(registerName: string, producer: EffectProducer<T>): void {
producers.set(registerName, producer);
},
registerConsumer(registerName: string, consumer: EffectConsumer): void {
consumers.set(registerName, consumer);
},
registerRequestor(registerName: string, rr: EffectRequestResponse<unknown, unknown>): void {
registerRequestor<TReq, TRes>(registerName: string, rr: EffectRequestResponse<TReq, TRes>): void {
requestors.set(registerName, rr);
},
setParameter(parameterName: string, value: unknown): void {

View file

@ -5,7 +5,7 @@
* executable path while the processor internals remain Promise-based.
*/
import { Config as EffectConfig, Effect, Layer, Scope } from "effect";
import { Config as EffectConfig, Effect, Layer } from "effect";
import {
processorLifecycleError,
type FlowRuntimeError,
@ -37,18 +37,16 @@ import type {
import { runFlowProcessorDefinitionScoped } from "./flow-processor.js";
import type { Spec } from "../spec/types.js";
type ProcessorRunError<Processor> = Processor extends ProcessorRuntime<infer Error, unknown> ? Error : never;
type ProcessorRunRequirements<Processor> = Processor extends ProcessorRuntime<unknown, infer Requirements> ? Requirements : never;
export interface ProcessorProgramOptions<
Config extends ProcessorConfig,
Error,
Requirements,
Processor extends ProcessorRuntime<unknown, unknown>,
LoadError,
LoadRequirements,
RunError,
RunRequirements,
> {
readonly id: string;
readonly make: (config: Config) => Processor;
readonly loadConfig?: Effect.Effect<Config, Error, Requirements>;
readonly make: (config: Config) => ProcessorRuntime<RunError, RunRequirements>;
readonly loadConfig?: Effect.Effect<Config, LoadError, LoadRequirements>;
}
export interface FlowProcessorProgramOptions<
@ -68,18 +66,14 @@ export interface FlowProcessorProgramOptions<
) => Layer.Layer<FlowRequirements, Error, LayerRequirements>;
}
export function runProcessorScoped<
export const runProcessorScoped = Effect.fn("runProcessorScoped")(function* <
Config extends ProcessorConfig,
Processor extends ProcessorRuntime<unknown, unknown>,
RunError,
RunRequirements,
>(
config: Config,
make: (config: Config) => Processor,
): Effect.Effect<
void,
ProcessorRunError<Processor> | ProcessorLifecycleError,
PubSub | Scope.Scope | ProcessorRunRequirements<Processor>
> {
return Effect.gen(function* () {
make: (config: Config) => ProcessorRuntime<RunError, RunRequirements>,
) {
const pubsub = yield* PubSub;
const runtimeConfig = {
...config,
@ -103,23 +97,17 @@ export function runProcessorScoped<
),
);
yield* (
processor.startEffect() as Effect.Effect<
void,
ProcessorRunError<Processor> | ProcessorLifecycleError,
ProcessorRunRequirements<Processor>
>
);
});
}
yield* processor.startEffect;
});
export function makeProcessorProgram<
Config extends ProcessorConfig,
Error = never,
Requirements = never,
Processor extends ProcessorRuntime<unknown, unknown> = ProcessorRuntime,
LoadError = never,
LoadRequirements = never,
RunError = ProcessorLifecycleError,
RunRequirements = never,
>(
options: ProcessorProgramOptions<Config, Error, Requirements, Processor>,
options: ProcessorProgramOptions<Config, LoadError, LoadRequirements, RunError, RunRequirements>,
) {
return Effect.scoped(
Effect.gen(function* () {
@ -147,7 +135,7 @@ export function makeProcessorProgram<
),
),
);
const processorEffect = runProcessorScoped<Config, Processor>(
const processorEffect = runProcessorScoped<Config, RunError, RunRequirements>(
runtimeConfig,
options.make,
);
@ -173,12 +161,37 @@ export function makeFlowProcessorProgram<
FlowRequirements = never,
LayerRequirements = never,
>(
options: FlowProcessorProgramOptions<Config, Error, FlowRequirements, LayerRequirements>,
options: FlowProcessorProgramOptions<Config, Error, FlowRequirements, LayerRequirements> & {
readonly layer: (config: Config) => Layer.Layer<FlowRequirements, Error, LayerRequirements>;
},
): Effect.Effect<
void,
never,
Error | EffectConfig.ConfigError | PubSubError | FlowRuntimeError,
LayerRequirements
> {
>;
export function makeFlowProcessorProgram<
Config extends ProcessorConfig,
Error = never,
FlowRequirements = never,
>(
options: FlowProcessorProgramOptions<Config, Error, FlowRequirements, FlowRequirements> & {
readonly layer?: undefined;
},
): Effect.Effect<
never,
Error | EffectConfig.ConfigError | PubSubError | FlowRuntimeError,
FlowRequirements
>;
export function makeFlowProcessorProgram<
Config extends ProcessorConfig,
Error = never,
FlowRequirements = never,
LayerRequirements = never,
>(
options: FlowProcessorProgramOptions<Config, Error, FlowRequirements, LayerRequirements>,
) {
return Effect.scoped(
Effect.gen(function* () {
const config = yield* (
@ -226,14 +239,16 @@ export function makeFlowProcessorProgram<
),
Layer.succeed(FlowRuntime, FlowRuntime.of({ run: runFlowRuntimeScoped })),
);
const dependencyLayer = options.layer?.(runtimeConfig) ??
(Layer.empty as unknown as Layer.Layer<FlowRequirements, Error, LayerRequirements>);
const providedProcessorLayer = processorLayer.pipe(
Layer.provide(dependencyLayer),
Layer.provide(runtimeLayer),
);
if (options.layer !== undefined) {
return yield* Layer.launch(
processorLayer.pipe(
Layer.provide(options.layer(runtimeConfig)),
Layer.provide(runtimeLayer),
),
);
}
return yield* Layer.launch(providedProcessorLayer);
return yield* Layer.launch(processorLayer.pipe(Layer.provide(runtimeLayer)));
}),
);
}

View file

@ -4,7 +4,7 @@
* Python reference: trustgraph-base/trustgraph/base/llm_service.py
*/
import { Context, Effect } from "effect";
import { Context, Effect, Stream } from "effect";
import * as S from "effect/Schema";
import {
errorMessage,
@ -69,7 +69,7 @@ export class Llm extends Context.Service<Llm, LlmServiceShape>()(
) {}
const llmServiceError = (operation: string, cause: unknown) =>
new LlmServiceError({
LlmServiceError.make({
operation,
message: errorMessage(cause),
});
@ -135,22 +135,19 @@ const sendStreamingResponse = Effect.fn("LlmService.sendStreamingResponse")(func
) => Effect.Effect<void, MessagingDeliveryError>;
},
) {
const context = yield* Effect.context<never>();
yield* Effect.tryPromise({
try: async () => {
for await (const chunk of llm.generateContentStream(
msg.system,
msg.prompt,
msg.model,
msg.temperature,
)) {
await Effect.runPromiseWith(context)(
responseProducer.send(requestId, chunkToResponse(chunk)),
);
}
},
catch: (cause) => llmServiceError("generate-content-stream", cause),
});
yield* Stream.fromAsyncIterable(
llm.generateContentStream(
msg.system,
msg.prompt,
msg.model,
msg.temperature,
),
(cause) => llmServiceError("generate-content-stream", cause),
).pipe(
Stream.runForEach((chunk) =>
responseProducer.send(requestId, chunkToResponse(chunk)),
),
);
});
const onLlmRequest = Effect.fn("LlmService.onRequest")(function* (
@ -168,16 +165,22 @@ const onLlmRequest = Effect.fn("LlmService.onRequest")(function* (
if (msg.streaming === true && llm.supportsStreaming()) {
yield* sendStreamingResponse(llm, requestId, msg, responseProducer).pipe(
Effect.catch((error) =>
Effect.logError("[LlmService] Error processing streaming request", {
error: error.message,
operation: error.operation,
}).pipe(
Effect.flatMap(() =>
responseProducer.send(requestId, llmErrorResponse(error)),
Effect.catchTags({
LlmServiceError: (error) =>
Effect.logError("[LlmService] Error processing streaming request", {
error: error.message,
operation: error.operation,
}).pipe(
Effect.flatMap(() =>
responseProducer.send(requestId, llmErrorResponse(error)),
),
),
),
),
MessagingDeliveryError: (error) =>
Effect.logError("[LlmService] Error sending streaming response", {
error: error.message,
operation: error.operation,
}),
}),
);
return;
}

View file

@ -62,14 +62,8 @@ export function makeConsumerSpec<T, E = never, R = never>(
return {
name,
addEffect,
add: async (flow, pubsub, definition) => {
const effect = addEffect(flow as Flow<R>, definition) as Effect.Effect<
void,
PubSubError,
SpecRuntimeRequirements
>;
await flow.runInCompatibilityScope(effect, pubsub);
},
add: (flow, pubsub, definition, context) =>
flow.runInCompatibilityScope(addEffect(flow, definition), pubsub, context),
};
}

View file

@ -20,8 +20,6 @@ export function makeParameterSpec(name: string): ParameterSpec {
return {
name,
addEffect,
add: async (flow, _pubsub, definition) => {
await Effect.runPromise(addEffect(flow, definition));
},
add: (flow, _pubsub, definition) => Effect.runPromise(addEffect(flow, definition)),
};
}

View file

@ -9,7 +9,6 @@ import type { Spec } from "./types.js";
import type { Flow, FlowDefinition } from "../processor/flow.js";
import {
ProducerFactory,
type EffectProducer,
} from "../messaging/runtime.js";
declare const ProducerSpecType: unique symbol;
@ -26,14 +25,13 @@ export function makeProducerSpec<T>(name: string): ProducerSpec<T> {
const topic = definition.topics?.[name] ?? name;
const factory = yield* ProducerFactory;
const producer = yield* factory.make<T>({ topic });
flow.registerProducer(name, producer as EffectProducer<unknown>);
flow.registerProducer(name, producer);
});
return {
name,
addEffect,
add: async (flow, pubsub, definition) => {
await flow.runInCompatibilityScope(addEffect(flow, definition), pubsub);
},
add: (flow, pubsub, definition, context) =>
flow.runInCompatibilityScope(addEffect(flow, definition), pubsub, context),
};
}

View file

@ -12,7 +12,6 @@ import type { Spec } from "./types.js";
import type { Flow, FlowDefinition } from "../processor/flow.js";
import {
RequestResponseFactory,
type EffectRequestResponse,
} from "../messaging/runtime.js";
declare const RequestResponseSpecType: unique symbol;
@ -41,14 +40,13 @@ export function makeRequestResponseSpec<TReq, TRes>(
responseTopic,
subscription: `${flow.processorId}-${flow.name}-${name}`,
});
flow.registerRequestor(name, requestor as EffectRequestResponse<unknown, unknown>);
flow.registerRequestor(name, requestor);
});
return {
name,
addEffect,
add: async (flow, pubsub, definition) => {
await flow.runInCompatibilityScope(addEffect(flow, definition), pubsub);
},
add: (flow, pubsub, definition, context) =>
flow.runInCompatibilityScope(addEffect(flow, definition), pubsub, context),
};
}

View file

@ -4,7 +4,7 @@
* Python reference: trustgraph-base/trustgraph/base/spec.py and siblings
*/
import type { Effect, Scope } from "effect";
import type { Context, Effect, Scope } from "effect";
import type { PubSubBackend } from "../backend/types.js";
import type {
ConsumerFactory,
@ -28,5 +28,10 @@ export interface Spec<Requirements = never> {
flow: Flow<Requirements>,
definition: FlowDefinition,
): Effect.Effect<void, SpecRuntimeError, SpecRuntimeRequirements | Requirements>;
add(flow: Flow, pubsub: PubSubBackend, definition: FlowDefinition): Promise<void>;
add(
flow: Flow<Requirements>,
pubsub: PubSubBackend,
definition: FlowDefinition,
context: Context.Context<Requirements>,
): Promise<void>;
}