Use Effect primitives for AI and response fanout

This commit is contained in:
elpresidank 2026-06-02 08:26:50 -05:00
parent 8f47456a4b
commit 24a2447cc3
5 changed files with 392 additions and 59 deletions

View file

@ -322,7 +322,7 @@ describe("Effect-native messaging runtime", () => {
);
it.effect(
"routes request-response replies through an Effect queue",
"routes request-response replies through Effect PubSub",
Effect.fnUntraced(function* () {
const responseConsumer = new ScriptedConsumer<string>();
const backend = new RuntimeBackend(

View file

@ -3,7 +3,20 @@
*/
import { randomUUID } from "node:crypto";
import { Context, Deferred, Duration, Effect, Fiber, Layer, Queue, Ref, Result, Schedule, Scope, Stream } from "effect";
import {
Context,
Deferred,
Duration,
Effect,
Fiber,
Layer,
PubSub as EffectPubSub,
Ref,
Result,
Schedule,
Scope,
Stream,
} from "effect";
import * as O from "effect/Option";
import * as S from "effect/Schema";
import type {
@ -121,6 +134,11 @@ export interface FlowRuntimeService {
) => Effect.Effect<void, FlowRuntimeError, SpecRuntimeRequirements | Requirements>;
}
interface ResponseEnvelope<T> {
readonly id: string;
readonly value: T;
}
export class ProducerFactory extends Context.Service<ProducerFactory, ProducerFactoryService>()(
"@trustgraph/base/messaging/runtime/ProducerFactory",
) {}
@ -395,7 +413,7 @@ export const makeEffectConsumerFromPubSub = Effect.fn("makeEffectConsumerFromPub
const dispatchResponseLoop = <T>(
backend: BackendConsumer<T>,
responseTopic: string,
subscribers: Map<string, Queue.Queue<T>>,
responses: EffectPubSub.PubSub<ResponseEnvelope<T>>,
config: MessagingRuntimeConfig,
): Effect.Effect<void> =>
Effect.whileLoop({
@ -408,10 +426,12 @@ const dispatchResponseLoop = <T>(
}
const id = message.properties().id;
const queue = id === undefined ? undefined : subscribers.get(id);
return Effect.gen(function* () {
if (queue !== undefined) {
yield* Queue.offer(queue, message.value());
if (id !== undefined) {
yield* EffectPubSub.publish(responses, {
id,
value: message.value(),
});
}
yield* acknowledgeMessage(backend, message, responseTopic);
});
@ -427,19 +447,24 @@ const dispatchResponseLoop = <T>(
});
const waitForResponse = Effect.fn("waitForResponse")(function* <TRes, E, R>(
queue: Queue.Queue<TRes>,
subscription: EffectPubSub.Subscription<ResponseEnvelope<TRes>>,
id: string,
options: EffectRequestOptions<TRes, E, R> | undefined,
) {
const response = yield* Stream.fromQueue(queue).pipe(
const response = yield* Stream.fromSubscription(subscription).pipe(
Stream.filterMapEffect((candidate) => {
if (options?.recipient === undefined) {
return Effect.succeed(Result.succeed(candidate));
if (candidate.id !== id) {
return Effect.succeed(Result.fail(undefined));
}
return options.recipient(candidate).pipe(
if (options?.recipient === undefined) {
return Effect.succeed(Result.succeed(candidate.value));
}
return options.recipient(candidate.value).pipe(
Effect.map((complete) =>
complete
? Result.succeed(candidate)
? Result.succeed(candidate.value)
: Result.fail(undefined)
),
);
@ -475,9 +500,9 @@ export const makeEffectRequestResponseFromPubSub = Effect.fn("makeEffectRequestR
...(options.responseSchema === undefined ? {} : { schema: options.responseSchema }),
};
const backend = yield* pubsub.createConsumer<TRes>(createOptions);
const subscribers = new Map<string, Queue.Queue<TRes>>();
const responses = yield* EffectPubSub.unbounded<ResponseEnvelope<TRes>>();
const stoppedSignal = yield* Deferred.make<never, MessagingLifecycleError>();
const fiber = yield* dispatchResponseLoop(backend, options.responseTopic, subscribers, config).pipe(Effect.forkScoped);
const fiber = yield* dispatchResponseLoop(backend, options.responseTopic, responses, config).pipe(Effect.forkScoped);
let stopped = false;
const stop = Effect.fn(`RequestResponse.stop:${options.requestTopic}`)(function* () {
@ -487,6 +512,7 @@ export const makeEffectRequestResponseFromPubSub = Effect.fn("makeEffectRequestR
stoppedSignal,
messagingLifecycleError(`${options.requestTopic}:${options.responseTopic}`, "stop", "RequestResponse stopped"),
).pipe(Effect.ignore);
yield* EffectPubSub.shutdown(responses).pipe(Effect.ignore);
yield* Fiber.interrupt(fiber);
yield* producer.close;
yield* closeConsumerBackend(backend, options.responseTopic, options.subscription);
@ -510,33 +536,19 @@ export const makeEffectRequestResponseFromPubSub = Effect.fn("makeEffectRequestR
const id = randomUUID();
const timeoutMs = requestOptions?.timeoutMs ?? config.requestTimeoutMs;
return Effect.acquireUseRelease(
Queue.unbounded<TRes>().pipe(
Effect.tap((queue) =>
Effect.sync(() => {
subscribers.set(id, queue);
}),
),
),
(queue) =>
Effect.gen(function* () {
yield* producer.send(id, request);
const result = yield* waitForResponse(queue, requestOptions).pipe(
Effect.raceFirst(Deferred.await(stoppedSignal)),
Effect.timeoutOption(Duration.millis(timeoutMs)),
);
return yield* O.match(result, {
onNone: () => Effect.fail(messagingTimeoutError("request-response", timeoutMs)),
onSome: Effect.succeed,
});
}),
(queue) =>
Effect.sync(() => {
subscribers.delete(id);
}).pipe(
Effect.flatMap(() => Queue.shutdown(queue)),
Effect.ignore,
),
return Effect.scoped(
Effect.gen(function* () {
const subscription = yield* EffectPubSub.subscribe(responses);
yield* producer.send(id, request);
const result = yield* waitForResponse(subscription, id, requestOptions).pipe(
Effect.raceFirst(Deferred.await(stoppedSignal)),
Effect.timeoutOption(Duration.millis(timeoutMs)),
);
return yield* O.match(result, {
onNone: () => Effect.fail(messagingTimeoutError("request-response", timeoutMs)),
onSome: Effect.succeed,
});
}),
);
},
stop: stop(),

View file

@ -1,8 +1,10 @@
import { describe, expect, it } from "@effect/vitest";
import type { LlmChunk } from "@trustgraph/base";
import { Effect, Stream } from "effect";
import { Effect, Layer, ManagedRuntime, Stream } from "effect";
import { AiError, LanguageModel } from "effect/unstable/ai";
import {
llmStreamPart,
makeLanguageModelProvider,
providerRuntimeError,
providerStatusError,
streamTextCompletionChunks,
@ -10,6 +12,36 @@ import {
toAsyncGenerator,
} from "../model/text-completion/common.js";
const languageModelRuntime = ManagedRuntime.make(Layer.empty);
const usage = (inputTokens: number, outputTokens: number) => ({
inputTokens: {
uncached: undefined,
total: inputTokens,
cacheRead: undefined,
cacheWrite: undefined,
},
outputTokens: {
total: outputTokens,
text: undefined,
reasoning: undefined,
},
});
const finishPart = (inputTokens: number, outputTokens: number) => ({
type: "finish",
reason: "stop",
usage: usage(inputTokens, outputTokens),
response: undefined,
});
const aiError = (reason: AiError.AiErrorReason) =>
new AiError.AiError({
module: "FakeLanguageModel",
method: "generateText",
reason,
});
const emptyChunkIterator = (): AsyncIterable<LlmChunk> => ({
[Symbol.asyncIterator]: () => ({
next: () => Promise.resolve({ done: true, value: undefined }),
@ -84,4 +116,107 @@ describe("text completion common helpers", () => {
expect(textFromContent([{ text: "a" }, { text: "b" }, { wrong: "skip" }])).toBe("ab");
expect(textFromContent([{ text: 1 }])).toBe("");
});
it("adapts Effect LanguageModel generateText responses to LlmProvider results", async () => {
const provider = makeLanguageModelProvider({
provider: "FakeLanguageModel",
defaultModel: "fake-model",
defaultTemperature: 0.1,
runtime: languageModelRuntime,
makeLanguageModel: ({ model, temperature }) =>
LanguageModel.make({
generateText: () =>
Effect.succeed([
{ type: "text", text: `model=${model};temperature=${temperature}` },
finishPart(11, 7),
]),
streamText: () => Stream.empty,
}),
});
await expect(provider.generateContent("system", "prompt", "override-model", 0.4)).resolves.toEqual({
text: "model=override-model;temperature=0.4",
inToken: 11,
outToken: 7,
model: "override-model",
});
});
it("adapts Effect LanguageModel stream parts to TrustGraph chunks", async () => {
const provider = makeLanguageModelProvider({
provider: "FakeLanguageModel",
defaultModel: "fake-stream-model",
defaultTemperature: 0,
runtime: languageModelRuntime,
makeLanguageModel: () =>
LanguageModel.make({
generateText: () =>
Effect.succeed([
{ type: "text", text: "unused" },
finishPart(1, 1),
]),
streamText: () =>
Stream.fromArray([
{ type: "text-delta", id: "part-1", delta: "hel" },
{ type: "text-delta", id: "part-1", delta: "lo" },
finishPart(13, 8),
]),
}),
});
const chunks: Array<LlmChunk> = [];
for await (const chunk of provider.generateContentStream("system", "prompt")) {
chunks.push(chunk);
}
expect(chunks).toEqual([
{
text: "hel",
inToken: null,
outToken: null,
model: "fake-stream-model",
isFinal: false,
},
{
text: "lo",
inToken: null,
outToken: null,
model: "fake-stream-model",
isFinal: false,
},
{
text: "",
inToken: 13,
outToken: 8,
model: "fake-stream-model",
isFinal: true,
},
]);
});
it("maps Effect AI rate and quota failures to TrustGraph retry errors", async () => {
const reasons = [
new AiError.RateLimitError({}),
new AiError.QuotaExhaustedError({}),
];
for (const reason of reasons) {
const provider = makeLanguageModelProvider({
provider: "FakeLanguageModel",
defaultModel: "fake-model",
defaultTemperature: 0,
runtime: languageModelRuntime,
makeLanguageModel: () =>
LanguageModel.make({
generateText: () => Effect.fail(aiError(reason)),
streamText: () => Stream.fail(aiError(reason)),
}),
});
await expect(provider.generateContent("system", "prompt")).rejects.toMatchObject({
_tag: "TooManyRequestsError",
message: "Rate limit exceeded",
});
}
});
});

View file

@ -4,12 +4,14 @@ import {
errorMessage,
makeLlmServiceShape,
type LlmChunk,
type LlmResult,
type LlmProvider,
} from "@trustgraph/base";
import { Config, Effect, Layer, Ref, Result, Stream } from "effect";
import { Config, Effect, Layer, ManagedRuntime, Ref, Result, Stream } from "effect";
import * as O from "effect/Option";
import * as Predicate from "effect/Predicate";
import * as S from "effect/Schema";
import { AiError, LanguageModel, Prompt, Response } from "effect/unstable/ai";
export class TextCompletionConfigError extends S.TaggedErrorClass<TextCompletionConfigError>()(
"TextCompletionConfigError",
@ -32,6 +34,21 @@ export type TextCompletionRuntimeError =
| TextCompletionProviderError
| TooManyRequestsError;
export interface LanguageModelProviderRequest {
readonly model: string;
readonly temperature: number;
}
export interface LanguageModelProviderOptions<Requirements> {
readonly provider: string;
readonly defaultModel: string;
readonly defaultTemperature: number;
readonly runtime: ManagedRuntime.ManagedRuntime<Requirements, TextCompletionRuntimeError>;
readonly makeLanguageModel: (
request: LanguageModelProviderRequest,
) => Effect.Effect<LanguageModel.Service, TextCompletionRuntimeError, Requirements>;
}
export const makeTextCompletionLayer = <E, R>(
provider: Effect.Effect<LlmProvider, E, R>,
): Layer.Layer<Llm, E, R> =>
@ -83,6 +100,33 @@ const textChunk = (model: string, text: string): LlmChunk => ({
isFinal: false,
});
const effectAiProviderError = (
provider: string,
error: unknown,
): TextCompletionRuntimeError => {
if (
AiError.isAiError(error) &&
(error.reason._tag === "RateLimitError" || error.reason._tag === "QuotaExhaustedError")
) {
return TooManyRequestsError.make({ message: "Rate limit exceeded" });
}
return providerRuntimeError(provider, error);
};
const usageInputTokens = (usage: Response.Usage): number =>
usage.inputTokens.total ?? 0;
const usageOutputTokens = (usage: Response.Usage): number =>
usage.outputTokens.total ?? 0;
const languageModelPrompt = (
system: string,
prompt: string,
): Prompt.RawInput => [
{ role: "system", content: system },
{ role: "user", content: [{ type: "text", text: prompt }] },
];
const contentPartText = (part: unknown): O.Option<string> =>
Predicate.isObject(part) &&
Predicate.hasProperty(part, "text") &&
@ -200,6 +244,105 @@ export const providerStatusError = (
: providerRuntimeError(provider, error);
};
const languageModelResult = (
response: LanguageModel.GenerateTextResponse<{}>,
model: string,
): LlmResult => ({
text: response.text,
inToken: usageInputTokens(response.usage),
outToken: usageOutputTokens(response.usage),
model,
});
const languageModelStreamChunk = (
provider: string,
model: string,
part: Response.StreamPart<{}>,
): Effect.Effect<Result.Result<LlmChunk, undefined>, TextCompletionRuntimeError> => {
switch (part.type) {
case "text-delta":
return Effect.succeed(
part.delta.length > 0
? Result.succeed(textChunk(model, part.delta))
: Result.fail(undefined),
);
case "finish":
return Effect.succeed(
Result.succeed(
finalChunk(model, {
inToken: usageInputTokens(part.usage),
outToken: usageOutputTokens(part.usage),
}),
),
);
case "error":
return Effect.fail(effectAiProviderError(provider, part.error));
default:
return Effect.succeed(Result.fail(undefined));
}
};
const runLanguageModelStream = <RuntimeRequirements, StreamRequirements extends RuntimeRequirements>(
runtime: ManagedRuntime.ManagedRuntime<RuntimeRequirements, TextCompletionRuntimeError>,
stream: Stream.Stream<LlmChunk, TextCompletionRuntimeError, StreamRequirements>,
): AsyncIterable<LlmChunk> => ({
[Symbol.asyncIterator]: () => {
const iterator = runtime.context().then((context) =>
Stream.toAsyncIterableWith(stream, context)[Symbol.asyncIterator]()
);
return {
next: () => iterator.then((current) => current.next()),
};
},
});
export const makeLanguageModelProvider = <Requirements>(
options: LanguageModelProviderOptions<Requirements>,
): LlmProvider => ({
generateContent: (system, prompt, model, temperature) => {
const modelName = model ?? options.defaultModel;
const temp = temperature ?? options.defaultTemperature;
return options.runtime.runPromise(
Effect.gen(function* () {
const languageModel = yield* options.makeLanguageModel({
model: modelName,
temperature: temp,
});
const response = yield* languageModel.generateText({
prompt: languageModelPrompt(system, prompt),
}).pipe(
Effect.mapError((error) => effectAiProviderError(options.provider, error)),
);
return languageModelResult(response, modelName);
}),
);
},
supportsStreaming: () => true,
generateContentStream: (system, prompt, model, temperature) => {
const modelName = model ?? options.defaultModel;
const temp = temperature ?? options.defaultTemperature;
const stream = Stream.unwrap(
Effect.gen(function* () {
const languageModel = yield* options.makeLanguageModel({
model: modelName,
temperature: temp,
});
return languageModel.streamText({
prompt: languageModelPrompt(system, prompt),
}).pipe(
Stream.mapError((error) => effectAiProviderError(options.provider, error)),
Stream.filterMapEffect((part) =>
languageModelStreamChunk(options.provider, modelName, part)
),
);
}),
);
return toAsyncGenerator(runLanguageModelStream(options.runtime, stream), (error) =>
effectAiProviderError(options.provider, error)
);
},
});
export const toAsyncGenerator = (
iterable: AsyncIterable<LlmChunk>,
mapError: (error: unknown) => TextCompletionRuntimeError,