diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 2e7be673..41419c04 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -12,8 +12,8 @@ Verified source roots: - Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4` - Installed Effect beta used by this workspace: `ts/node_modules/effect` -Current signal counts from `ts/packages` after the 2026-06-02 workbench theme -storage slice: +Current signal counts from `ts/packages` after the 2026-06-02 Effect AI +adapter and native request/response PubSub slices: | Signal | Count | | --- | ---: | @@ -21,9 +21,9 @@ storage slice: | `Effect.runPromiseWith` | 0 | | `Effect.cached` | 0 | | `Layer.succeed` | 12 | -| `Map<` | 38 | +| `Map<` | 37 | | `WebSocket` | 72 | -| `new Map` | 60 | +| `new Map` | 59 | | `toPromiseRequestor` | 0 | | `makeAsyncProcessor` | 19 | | `receive(` | 17 | @@ -31,7 +31,7 @@ storage slice: | `new Error` | 7 | | `new Promise` | 9 | | `JSON.parse` | 4 | -| `localStorage` | 9 | +| `localStorage` | 11 | | `JSON.stringify` | 8 | | `setTimeout` | 3 | | `process.env` | 3 | @@ -138,6 +138,18 @@ Notes: `BrowserKeyValueStore.layerLocalStorage`; the first-paint host script reads that JSON-encoded key before React mounts and falls back to `tg-theme` only for legacy installs. +- The Effect AI `LanguageModel` adapter slice added a reusable + `makeLanguageModelProvider` bridge in text-completion common code. It maps + `generateText` responses to `LlmResult`, maps streaming `text-delta` and + final `finish.usage` parts to TrustGraph chunks, and converts Effect AI rate + and quota failures into `TooManyRequestsError`. No concrete provider has + been flipped yet. +- The native request/response PubSub slice removed the local + `Map` response subscriber fanout in + `makeEffectRequestResponseFromPubSub`. Response dispatch now publishes + `{ id, value }` envelopes through native `effect/PubSub`, and each request + uses a scoped `PubSub.Subscription` plus `Stream.fromSubscription` to wait + for its matching response. - A focused broker-backend scout found no remaining P0 broker runtime rewrite after the producer, NATS, consumer concurrency, rate-limit, and request-response stop slices. `PubSubBackend` remains an intentional @@ -1358,6 +1370,38 @@ Notes: - `cd ts && bun run test` - `cd ts && bun run lint` +### 2026-06-02: Effect AI LanguageModel Adapter Slice + +- Status: migrated and package-verified. +- Completed: + - Added `makeLanguageModelProvider`, a bridge from + `effect/unstable/ai/LanguageModel` into the existing TrustGraph + `LlmProvider` contract. + - Covered non-streaming text/token mapping, streaming text/final-token + mapping, and Effect AI rate/quota failure mapping with fake + `LanguageModel` tests. + - Kept concrete provider swaps deferred until provider-specific parity is + proven. +- Verification: + - `cd ts && bun run check:tsgo` + - `cd ts/packages/flow && bunx --bun vitest run src/__tests__/text-completion-common.test.ts src/__tests__/text-completion-providers.test.ts` + +### 2026-06-02: Native Request/Response PubSub Fanout Slice + +- Status: migrated and package-verified. +- Completed: + - Replaced the request/response runtime's hand-managed + `Map` response fanout with native `effect/PubSub`. + - Each request subscribes before sending, consumes through + `Stream.fromSubscription`, filters by response id, and releases the + subscription at scope exit. + - Kept `PubSubBackend` as the broker boundary because Effect native PubSub is + in-process only and does not provide NATS topics, ack/nack, durable + subscriptions, schema codecs, or backend lifecycle. +- Verification: + - `cd ts && bun run check:tsgo` + - `cd ts/packages/base && bunx --bun vitest run src/__tests__/messaging-runtime.test.ts src/__tests__/request-response.test.ts src/__tests__/flow-spec-runtime.test.ts` + ## Subagent Findings To Preserve - MCP/workbench: @@ -1442,9 +1486,9 @@ Notes: text, token counts, streaming final usage, and rate-limit mapping. The local provider layer-construction cleanup is complete; remaining provider work is adapter/parity work, not `Layer.succeed` cleanup. - - The next provider PR should add a small `effect/unstable/ai/LanguageModel` - to TrustGraph `LlmProvider` adapter and prove it with fake - `LanguageModel` parts before migrating Claude. Direct OpenAI, Azure, and + - The `effect/unstable/ai/LanguageModel` to TrustGraph `LlmProvider` adapter + baseline is complete. The next provider PR should migrate Claude through + that adapter with provider-specific parity tests. Direct OpenAI, Azure, and OpenAI-compatible swaps are no-ops until Responses-vs-Chat-Completions parity is proven. - FalkorDB scoped lifecycle is complete for triples query/store. Use the @@ -1499,6 +1543,9 @@ Notes: handles. - Treat request-response pending shutdown semantics as complete; do not flag `waitForResponse` timeout behavior for stopped runtimes. + - Treat request-response in-process fanout as complete: response routing now + uses native `effect/PubSub` subscriptions instead of a hand-managed + subscriber map. - Treat the legacy consumer facade as a completed compatibility wrapper over `makeEffectConsumerFromPubSub`; do not flag blocking `start()` semantics. @@ -1510,13 +1557,9 @@ Notes: - `effect/unstable/ai/LanguageModel`, `effect/unstable/ai/EmbeddingModel`, Effect AI OpenAI/Anthropic provider layers. - Rewrite shape: - - Add an Effect AI `LanguageModel` to `LlmProvider` adapter beside the - current `LlmProvider` contract before flipping any public provider - interface. - - Prove `LlmResult`, streaming `text-delta` plus final `finish.usage`, - `AiError.RateLimitError` mapping, and missing-token config behavior with - fake Effect `LanguageModel` tests. - - Claude is the first plausible provider migration after the adapter. + - Adapter baseline is complete: `makeLanguageModelProvider` bridges + `LanguageModel` into `LlmProvider`. + - Claude is the first plausible provider migration through the adapter. - Do not directly swap OpenAI, Azure, or OpenAI-compatible providers yet: current TrustGraph code uses Chat Completions/local-server semantics while `@effect/ai-openai` is Responses API backed. @@ -1560,7 +1603,7 @@ Notes: ## Recommended PR Order -1. Effect AI `LanguageModel` to `LlmProvider` adapter, then Claude parity. +1. Claude provider parity through the Effect AI `LanguageModel` adapter. 2. MCP Effect stdio parity and canonicalization. ## No-Op Rules diff --git a/ts/packages/base/src/__tests__/messaging-runtime.test.ts b/ts/packages/base/src/__tests__/messaging-runtime.test.ts index f65ab0c1..d9f4c3b8 100644 --- a/ts/packages/base/src/__tests__/messaging-runtime.test.ts +++ b/ts/packages/base/src/__tests__/messaging-runtime.test.ts @@ -322,7 +322,7 @@ describe("Effect-native messaging runtime", () => { ); it.effect( - "routes request-response replies through an Effect queue", + "routes request-response replies through Effect PubSub", Effect.fnUntraced(function* () { const responseConsumer = new ScriptedConsumer(); const backend = new RuntimeBackend( diff --git a/ts/packages/base/src/messaging/runtime.ts b/ts/packages/base/src/messaging/runtime.ts index 8cf71959..b1ad12f7 100644 --- a/ts/packages/base/src/messaging/runtime.ts +++ b/ts/packages/base/src/messaging/runtime.ts @@ -3,7 +3,20 @@ */ import { randomUUID } from "node:crypto"; -import { Context, Deferred, Duration, Effect, Fiber, Layer, Queue, Ref, Result, Schedule, Scope, Stream } from "effect"; +import { + Context, + Deferred, + Duration, + Effect, + Fiber, + Layer, + PubSub as EffectPubSub, + Ref, + Result, + Schedule, + Scope, + Stream, +} from "effect"; import * as O from "effect/Option"; import * as S from "effect/Schema"; import type { @@ -121,6 +134,11 @@ export interface FlowRuntimeService { ) => Effect.Effect; } +interface ResponseEnvelope { + readonly id: string; + readonly value: T; +} + export class ProducerFactory extends Context.Service()( "@trustgraph/base/messaging/runtime/ProducerFactory", ) {} @@ -395,7 +413,7 @@ export const makeEffectConsumerFromPubSub = Effect.fn("makeEffectConsumerFromPub const dispatchResponseLoop = ( backend: BackendConsumer, responseTopic: string, - subscribers: Map>, + responses: EffectPubSub.PubSub>, config: MessagingRuntimeConfig, ): Effect.Effect => Effect.whileLoop({ @@ -408,10 +426,12 @@ const dispatchResponseLoop = ( } const id = message.properties().id; - const queue = id === undefined ? undefined : subscribers.get(id); return Effect.gen(function* () { - if (queue !== undefined) { - yield* Queue.offer(queue, message.value()); + if (id !== undefined) { + yield* EffectPubSub.publish(responses, { + id, + value: message.value(), + }); } yield* acknowledgeMessage(backend, message, responseTopic); }); @@ -427,19 +447,24 @@ const dispatchResponseLoop = ( }); const waitForResponse = Effect.fn("waitForResponse")(function* ( - queue: Queue.Queue, + subscription: EffectPubSub.Subscription>, + id: string, options: EffectRequestOptions | undefined, ) { - const response = yield* Stream.fromQueue(queue).pipe( + const response = yield* Stream.fromSubscription(subscription).pipe( Stream.filterMapEffect((candidate) => { - if (options?.recipient === undefined) { - return Effect.succeed(Result.succeed(candidate)); + if (candidate.id !== id) { + return Effect.succeed(Result.fail(undefined)); } - return options.recipient(candidate).pipe( + if (options?.recipient === undefined) { + return Effect.succeed(Result.succeed(candidate.value)); + } + + return options.recipient(candidate.value).pipe( Effect.map((complete) => complete - ? Result.succeed(candidate) + ? Result.succeed(candidate.value) : Result.fail(undefined) ), ); @@ -475,9 +500,9 @@ export const makeEffectRequestResponseFromPubSub = Effect.fn("makeEffectRequestR ...(options.responseSchema === undefined ? {} : { schema: options.responseSchema }), }; const backend = yield* pubsub.createConsumer(createOptions); - const subscribers = new Map>(); + const responses = yield* EffectPubSub.unbounded>(); const stoppedSignal = yield* Deferred.make(); - const fiber = yield* dispatchResponseLoop(backend, options.responseTopic, subscribers, config).pipe(Effect.forkScoped); + const fiber = yield* dispatchResponseLoop(backend, options.responseTopic, responses, config).pipe(Effect.forkScoped); let stopped = false; const stop = Effect.fn(`RequestResponse.stop:${options.requestTopic}`)(function* () { @@ -487,6 +512,7 @@ export const makeEffectRequestResponseFromPubSub = Effect.fn("makeEffectRequestR stoppedSignal, messagingLifecycleError(`${options.requestTopic}:${options.responseTopic}`, "stop", "RequestResponse stopped"), ).pipe(Effect.ignore); + yield* EffectPubSub.shutdown(responses).pipe(Effect.ignore); yield* Fiber.interrupt(fiber); yield* producer.close; yield* closeConsumerBackend(backend, options.responseTopic, options.subscription); @@ -510,33 +536,19 @@ export const makeEffectRequestResponseFromPubSub = Effect.fn("makeEffectRequestR const id = randomUUID(); const timeoutMs = requestOptions?.timeoutMs ?? config.requestTimeoutMs; - return Effect.acquireUseRelease( - Queue.unbounded().pipe( - Effect.tap((queue) => - Effect.sync(() => { - subscribers.set(id, queue); - }), - ), - ), - (queue) => - Effect.gen(function* () { - yield* producer.send(id, request); - const result = yield* waitForResponse(queue, requestOptions).pipe( - Effect.raceFirst(Deferred.await(stoppedSignal)), - Effect.timeoutOption(Duration.millis(timeoutMs)), - ); - return yield* O.match(result, { - onNone: () => Effect.fail(messagingTimeoutError("request-response", timeoutMs)), - onSome: Effect.succeed, - }); - }), - (queue) => - Effect.sync(() => { - subscribers.delete(id); - }).pipe( - Effect.flatMap(() => Queue.shutdown(queue)), - Effect.ignore, - ), + return Effect.scoped( + Effect.gen(function* () { + const subscription = yield* EffectPubSub.subscribe(responses); + yield* producer.send(id, request); + const result = yield* waitForResponse(subscription, id, requestOptions).pipe( + Effect.raceFirst(Deferred.await(stoppedSignal)), + Effect.timeoutOption(Duration.millis(timeoutMs)), + ); + return yield* O.match(result, { + onNone: () => Effect.fail(messagingTimeoutError("request-response", timeoutMs)), + onSome: Effect.succeed, + }); + }), ); }, stop: stop(), diff --git a/ts/packages/flow/src/__tests__/text-completion-common.test.ts b/ts/packages/flow/src/__tests__/text-completion-common.test.ts index 967e77aa..8cc41e10 100644 --- a/ts/packages/flow/src/__tests__/text-completion-common.test.ts +++ b/ts/packages/flow/src/__tests__/text-completion-common.test.ts @@ -1,8 +1,10 @@ import { describe, expect, it } from "@effect/vitest"; import type { LlmChunk } from "@trustgraph/base"; -import { Effect, Stream } from "effect"; +import { Effect, Layer, ManagedRuntime, Stream } from "effect"; +import { AiError, LanguageModel } from "effect/unstable/ai"; import { llmStreamPart, + makeLanguageModelProvider, providerRuntimeError, providerStatusError, streamTextCompletionChunks, @@ -10,6 +12,36 @@ import { toAsyncGenerator, } from "../model/text-completion/common.js"; +const languageModelRuntime = ManagedRuntime.make(Layer.empty); + +const usage = (inputTokens: number, outputTokens: number) => ({ + inputTokens: { + uncached: undefined, + total: inputTokens, + cacheRead: undefined, + cacheWrite: undefined, + }, + outputTokens: { + total: outputTokens, + text: undefined, + reasoning: undefined, + }, +}); + +const finishPart = (inputTokens: number, outputTokens: number) => ({ + type: "finish", + reason: "stop", + usage: usage(inputTokens, outputTokens), + response: undefined, +}); + +const aiError = (reason: AiError.AiErrorReason) => + new AiError.AiError({ + module: "FakeLanguageModel", + method: "generateText", + reason, + }); + const emptyChunkIterator = (): AsyncIterable => ({ [Symbol.asyncIterator]: () => ({ next: () => Promise.resolve({ done: true, value: undefined }), @@ -84,4 +116,107 @@ describe("text completion common helpers", () => { expect(textFromContent([{ text: "a" }, { text: "b" }, { wrong: "skip" }])).toBe("ab"); expect(textFromContent([{ text: 1 }])).toBe(""); }); + + it("adapts Effect LanguageModel generateText responses to LlmProvider results", async () => { + const provider = makeLanguageModelProvider({ + provider: "FakeLanguageModel", + defaultModel: "fake-model", + defaultTemperature: 0.1, + runtime: languageModelRuntime, + makeLanguageModel: ({ model, temperature }) => + LanguageModel.make({ + generateText: () => + Effect.succeed([ + { type: "text", text: `model=${model};temperature=${temperature}` }, + finishPart(11, 7), + ]), + streamText: () => Stream.empty, + }), + }); + + await expect(provider.generateContent("system", "prompt", "override-model", 0.4)).resolves.toEqual({ + text: "model=override-model;temperature=0.4", + inToken: 11, + outToken: 7, + model: "override-model", + }); + }); + + it("adapts Effect LanguageModel stream parts to TrustGraph chunks", async () => { + const provider = makeLanguageModelProvider({ + provider: "FakeLanguageModel", + defaultModel: "fake-stream-model", + defaultTemperature: 0, + runtime: languageModelRuntime, + makeLanguageModel: () => + LanguageModel.make({ + generateText: () => + Effect.succeed([ + { type: "text", text: "unused" }, + finishPart(1, 1), + ]), + streamText: () => + Stream.fromArray([ + { type: "text-delta", id: "part-1", delta: "hel" }, + { type: "text-delta", id: "part-1", delta: "lo" }, + finishPart(13, 8), + ]), + }), + }); + + const chunks: Array = []; + for await (const chunk of provider.generateContentStream("system", "prompt")) { + chunks.push(chunk); + } + + expect(chunks).toEqual([ + { + text: "hel", + inToken: null, + outToken: null, + model: "fake-stream-model", + isFinal: false, + }, + { + text: "lo", + inToken: null, + outToken: null, + model: "fake-stream-model", + isFinal: false, + }, + { + text: "", + inToken: 13, + outToken: 8, + model: "fake-stream-model", + isFinal: true, + }, + ]); + }); + + it("maps Effect AI rate and quota failures to TrustGraph retry errors", async () => { + const reasons = [ + new AiError.RateLimitError({}), + new AiError.QuotaExhaustedError({}), + ]; + + for (const reason of reasons) { + const provider = makeLanguageModelProvider({ + provider: "FakeLanguageModel", + defaultModel: "fake-model", + defaultTemperature: 0, + runtime: languageModelRuntime, + makeLanguageModel: () => + LanguageModel.make({ + generateText: () => Effect.fail(aiError(reason)), + streamText: () => Stream.fail(aiError(reason)), + }), + }); + + await expect(provider.generateContent("system", "prompt")).rejects.toMatchObject({ + _tag: "TooManyRequestsError", + message: "Rate limit exceeded", + }); + } + }); }); diff --git a/ts/packages/flow/src/model/text-completion/common.ts b/ts/packages/flow/src/model/text-completion/common.ts index 7b52cd37..88d0dc44 100644 --- a/ts/packages/flow/src/model/text-completion/common.ts +++ b/ts/packages/flow/src/model/text-completion/common.ts @@ -4,12 +4,14 @@ import { errorMessage, makeLlmServiceShape, type LlmChunk, + type LlmResult, type LlmProvider, } from "@trustgraph/base"; -import { Config, Effect, Layer, Ref, Result, Stream } from "effect"; +import { Config, Effect, Layer, ManagedRuntime, Ref, Result, Stream } from "effect"; import * as O from "effect/Option"; import * as Predicate from "effect/Predicate"; import * as S from "effect/Schema"; +import { AiError, LanguageModel, Prompt, Response } from "effect/unstable/ai"; export class TextCompletionConfigError extends S.TaggedErrorClass()( "TextCompletionConfigError", @@ -32,6 +34,21 @@ export type TextCompletionRuntimeError = | TextCompletionProviderError | TooManyRequestsError; +export interface LanguageModelProviderRequest { + readonly model: string; + readonly temperature: number; +} + +export interface LanguageModelProviderOptions { + readonly provider: string; + readonly defaultModel: string; + readonly defaultTemperature: number; + readonly runtime: ManagedRuntime.ManagedRuntime; + readonly makeLanguageModel: ( + request: LanguageModelProviderRequest, + ) => Effect.Effect; +} + export const makeTextCompletionLayer = ( provider: Effect.Effect, ): Layer.Layer => @@ -83,6 +100,33 @@ const textChunk = (model: string, text: string): LlmChunk => ({ isFinal: false, }); +const effectAiProviderError = ( + provider: string, + error: unknown, +): TextCompletionRuntimeError => { + if ( + AiError.isAiError(error) && + (error.reason._tag === "RateLimitError" || error.reason._tag === "QuotaExhaustedError") + ) { + return TooManyRequestsError.make({ message: "Rate limit exceeded" }); + } + return providerRuntimeError(provider, error); +}; + +const usageInputTokens = (usage: Response.Usage): number => + usage.inputTokens.total ?? 0; + +const usageOutputTokens = (usage: Response.Usage): number => + usage.outputTokens.total ?? 0; + +const languageModelPrompt = ( + system: string, + prompt: string, +): Prompt.RawInput => [ + { role: "system", content: system }, + { role: "user", content: [{ type: "text", text: prompt }] }, +]; + const contentPartText = (part: unknown): O.Option => Predicate.isObject(part) && Predicate.hasProperty(part, "text") && @@ -200,6 +244,105 @@ export const providerStatusError = ( : providerRuntimeError(provider, error); }; +const languageModelResult = ( + response: LanguageModel.GenerateTextResponse<{}>, + model: string, +): LlmResult => ({ + text: response.text, + inToken: usageInputTokens(response.usage), + outToken: usageOutputTokens(response.usage), + model, +}); + +const languageModelStreamChunk = ( + provider: string, + model: string, + part: Response.StreamPart<{}>, +): Effect.Effect, TextCompletionRuntimeError> => { + switch (part.type) { + case "text-delta": + return Effect.succeed( + part.delta.length > 0 + ? Result.succeed(textChunk(model, part.delta)) + : Result.fail(undefined), + ); + case "finish": + return Effect.succeed( + Result.succeed( + finalChunk(model, { + inToken: usageInputTokens(part.usage), + outToken: usageOutputTokens(part.usage), + }), + ), + ); + case "error": + return Effect.fail(effectAiProviderError(provider, part.error)); + default: + return Effect.succeed(Result.fail(undefined)); + } +}; + +const runLanguageModelStream = ( + runtime: ManagedRuntime.ManagedRuntime, + stream: Stream.Stream, +): AsyncIterable => ({ + [Symbol.asyncIterator]: () => { + const iterator = runtime.context().then((context) => + Stream.toAsyncIterableWith(stream, context)[Symbol.asyncIterator]() + ); + return { + next: () => iterator.then((current) => current.next()), + }; + }, +}); + +export const makeLanguageModelProvider = ( + options: LanguageModelProviderOptions, +): LlmProvider => ({ + generateContent: (system, prompt, model, temperature) => { + const modelName = model ?? options.defaultModel; + const temp = temperature ?? options.defaultTemperature; + return options.runtime.runPromise( + Effect.gen(function* () { + const languageModel = yield* options.makeLanguageModel({ + model: modelName, + temperature: temp, + }); + const response = yield* languageModel.generateText({ + prompt: languageModelPrompt(system, prompt), + }).pipe( + Effect.mapError((error) => effectAiProviderError(options.provider, error)), + ); + return languageModelResult(response, modelName); + }), + ); + }, + supportsStreaming: () => true, + generateContentStream: (system, prompt, model, temperature) => { + const modelName = model ?? options.defaultModel; + const temp = temperature ?? options.defaultTemperature; + const stream = Stream.unwrap( + Effect.gen(function* () { + const languageModel = yield* options.makeLanguageModel({ + model: modelName, + temperature: temp, + }); + return languageModel.streamText({ + prompt: languageModelPrompt(system, prompt), + }).pipe( + Stream.mapError((error) => effectAiProviderError(options.provider, error)), + Stream.filterMapEffect((part) => + languageModelStreamChunk(options.provider, modelName, part) + ), + ); + }), + ); + return toAsyncGenerator(runLanguageModelStream(options.runtime, stream), (error) => + effectAiProviderError(options.provider, error) + ); + }, +}); + export const toAsyncGenerator = ( iterable: AsyncIterable, mapError: (error: unknown) => TextCompletionRuntimeError,