diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 28371edf..89fd4302 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -12,15 +12,15 @@ Verified source roots: - Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4` - Installed Effect beta used by this workspace: `ts/node_modules/effect` -Current signal counts from `ts/packages` after the 2026-06-02 client streaming -callback Effect boundary slice: +Current signal counts from `ts/packages` after the 2026-06-02 text completion +provider effectful layer slice: | Signal | Count | | --- | ---: | | `Effect.runPromise` | 172 | | `Effect.runPromiseWith` | 0 | | `Effect.cached` | 0 | -| `Layer.succeed` | 18 | +| `Layer.succeed` | 12 | | `Map<` | 88 | | `WebSocket` | 74 | | `new Map` | 60 | @@ -151,6 +151,12 @@ Notes: centralizing legacy callback request failures in `runLegacyStreamingRequest`. The public callback facades still return/ignore Promises where required, but failure mapping now uses `Effect.tryPromise` and `Effect.catch`. +- The text completion provider effectful layer slice dropped six + `Layer.succeed` matches by moving OpenAI, OpenAI-compatible, Azure OpenAI, + Claude, Mistral, and Ollama processor layers onto + `makeTextCompletionLayer(makeXProviderEffect(config))`. SDK construction and + config lookup now live in Effect; sync `makeXProvider` exports remain + compatibility facades. - `Record` and `throwLibrarianServiceError` are now clean in `ts/packages`. @@ -1065,6 +1071,33 @@ Notes: - `cd ts && bun run test` - `git diff --check` +### 2026-06-02: Text Completion Provider Effectful Layer Slice + +- Status: migrated and root-verified. +- Completed: + - Added shared `makeTextCompletionLayer` for constructing `Llm` from an + effectful `LlmProvider`. + - Added `makeOpenAIProviderEffect`, + `makeOpenAICompatibleProviderEffect`, `makeAzureOpenAIProviderEffect`, + `makeClaudeProviderEffect`, `makeMistralProviderEffect`, and + `makeOllamaProviderEffect`. + - Processor `program.layer` definitions now use `Layer.effect` via the + shared helper instead of constructing providers inside `Layer.succeed`. + - Provider object assembly is split into pure `makeXProviderFromClient` + helpers so Promise-returning provider methods remain external + compatibility facades and do not trigger `effect(runEffectInsideEffect)`. + - Added tests for explicit provider config, shared `Llm` layer provisioning, + and tagged missing-config errors. +- Verification: + - `bunx --bun vitest run src/__tests__/text-completion-providers.test.ts src/__tests__/text-completion-common.test.ts` + - `bun run --cwd ts/packages/flow build` + - `bun run --cwd ts/packages/flow test` + - `cd ts && bun run check:tsgo` + - `cd ts && bun run check` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: @@ -1124,7 +1157,9 @@ Notes: Azure currently use Chat Completions while `@effect/ai-openai` is Responses API oriented, and no installed Azure/Mistral/Ollama Effect AI provider is available. Anthropic is the closest direct provider swap, but must preserve - text, token counts, streaming final usage, and rate-limit mapping. + text, token counts, streaming final usage, and rate-limit mapping. The + local provider layer-construction cleanup is complete; remaining provider + work is adapter/parity work, not `Layer.succeed` cleanup. - FalkorDB scoped lifecycle is complete for triples query/store. Use the fakeable client/graph factory pattern from that slice for future storage client tests. @@ -1138,13 +1173,53 @@ Notes: ## Ranked Findings -### P2: Provider Layer And Effect AI Cleanup +### P0: Broker Backend Effect-Native Runtime + +- TrustGraph evidence: + - `ts/packages/base/src/backend/types.ts` + - `ts/packages/base/src/backend/nats.ts` + - `ts/packages/base/src/messaging/runtime.ts` + - `ts/packages/base/src/processor/flow-processor.ts` +- Effect primitives: + - `Layer`, `Scope`, `Stream`, `Schedule`, `Queue`, + `Effect.acquireRelease`, and `Effect.tryPromise`. +- Rewrite shape: + - Introduce an Effect-native broker service/layer with scoped NATS + acquisition and stream/schedule-based consumer loops. + - Keep `PubSubBackend` as the compatibility adapter boundary; Effect native + `PubSub` remains in-process only. +- Tests: + - Fake backend ack/nak/backoff/stop tests, NATS close finalizer tests, and + config-push stream tests. + +### P1: Gateway Dispatcher Ownership And Serialization + +- TrustGraph evidence: + - `ts/packages/flow/src/gateway/dispatch/manager.ts` + - `ts/packages/flow/src/gateway/dispatch/serialize.ts` + - `ts/packages/flow/src/gateway/server.ts` +- Effect primitives: + - `Layer`, `Scope`, `Effect.acquireUseRelease`, `Effect.try`, `Result.try`, + and typed dispatch errors. +- Rewrite shape: + - Track whether the dispatcher owns `PubSubBackend` so injected backends are + not closed. + - Use `Effect.acquireUseRelease` for one-shot gateway producers so producer + close runs even when send fails. + - Replace throwing gateway serialization helpers with Effect/Result-returning + helpers mapped to typed dispatch or wire errors. + - Longer term, move `createGateway` to a scoped `createGatewayEffect` while + keeping Fastify route `Effect.runPromise` calls as host boundaries. +- Tests: + - Injected pubsub is not closed, one-shot producer closes on send failure, + and malformed gateway payloads return typed dispatch errors. + +### P2: Effect AI Provider Adapter Cleanup - TrustGraph evidence: - `ts/packages/flow/src/model/text-completion/*.ts` - Effect primitives: - - `Config`, `ConfigProvider`, `Metric`, `Logger`, - `effect/unstable/ai/LanguageModel`, `effect/unstable/ai/EmbeddingModel`, + - `effect/unstable/ai/LanguageModel`, `effect/unstable/ai/EmbeddingModel`, Effect AI OpenAI/Anthropic provider layers. - Rewrite shape: - Add an Effect AI adapter layer beside the current `LlmProvider` contract @@ -1190,8 +1265,10 @@ Notes: ## Recommended PR Order -1. Provider layer and Effect AI cleanup. -2. MCP parity/deletion decision and workbench platform polish. +1. Gateway dispatcher ownership and serialization. +2. Broker backend Effect-native runtime. +3. Effect AI provider adapter cleanup. +4. MCP parity/deletion decision and workbench platform polish. ## No-Op Rules diff --git a/ts/packages/flow/src/__tests__/text-completion-providers.test.ts b/ts/packages/flow/src/__tests__/text-completion-providers.test.ts new file mode 100644 index 00000000..58b33af1 --- /dev/null +++ b/ts/packages/flow/src/__tests__/text-completion-providers.test.ts @@ -0,0 +1,86 @@ +import { describe, expect, it } from "@effect/vitest"; +import { Llm } from "@trustgraph/base"; +import { ConfigProvider, Effect } from "effect"; +import { makeAzureOpenAIProviderEffect } from "../model/text-completion/azure-openai.js"; +import { makeClaudeProviderEffect } from "../model/text-completion/claude.js"; +import { makeTextCompletionLayer } from "../model/text-completion/common.js"; +import { makeMistralProviderEffect } from "../model/text-completion/mistral.js"; +import { makeOllamaProviderEffect } from "../model/text-completion/ollama.js"; +import { makeOpenAICompatibleProviderEffect } from "../model/text-completion/openai-compatible.js"; +import { makeOpenAIProviderEffect } from "../model/text-completion/openai.js"; + +const emptyConfig = ConfigProvider.layer( + ConfigProvider.fromEnv({ env: {} }), +); + +describe("text completion provider construction", () => { + it.effect( + "constructs providers from explicit config through Effect", + Effect.fnUntraced(function* () { + const providers = yield* Effect.all([ + makeOpenAIProviderEffect({ id: "openai", apiKey: "test-key" }), + makeOpenAICompatibleProviderEffect({ + id: "openai-compatible", + baseUrl: "http://localhost:1234/v1", + }), + makeAzureOpenAIProviderEffect({ + id: "azure-openai", + apiKey: "test-key", + endpoint: "https://example.openai.azure.com", + }), + makeClaudeProviderEffect({ id: "claude", apiKey: "test-key" }), + makeMistralProviderEffect({ id: "mistral", apiKey: "test-key" }), + makeOllamaProviderEffect({ + id: "ollama", + ollamaUrl: "http://localhost:11434", + }), + ]).pipe( + Effect.provide(emptyConfig), + ); + + expect(providers.map((provider) => provider.supportsStreaming())).toEqual([ + true, + true, + true, + true, + true, + true, + ]); + }), + ); + + it.effect( + "provides Llm through the shared Effect layer helper", + Effect.fnUntraced(function* () { + const llm = yield* Effect.gen(function* () { + return yield* Llm; + }).pipe( + Effect.provide( + makeTextCompletionLayer(makeOllamaProviderEffect({ + id: "ollama", + ollamaUrl: "http://localhost:11434", + })), + ), + Effect.provide(emptyConfig), + ); + + expect(llm.supportsStreaming()).toBe(true); + }), + ); + + it.effect( + "fails missing required config as a tagged config error", + Effect.fnUntraced(function* () { + const error = yield* makeOpenAIProviderEffect({ id: "openai" }).pipe( + Effect.flip, + Effect.provide(emptyConfig), + ); + + expect(error).toMatchObject({ + _tag: "TextCompletionConfigError", + provider: "OpenAI", + key: "OPENAI_TOKEN", + }); + }), + ); +}); diff --git a/ts/packages/flow/src/model/text-completion/azure-openai.ts b/ts/packages/flow/src/model/text-completion/azure-openai.ts index 143ae67c..2428f804 100644 --- a/ts/packages/flow/src/model/text-completion/azure-openai.ts +++ b/ts/packages/flow/src/model/text-completion/azure-openai.ts @@ -11,11 +11,10 @@ import { AzureOpenAI } from "openai"; import { NodeRuntime } from "@effect/platform-node"; import { - Llm, makeLlmService, makeFlowProcessorProgram, - makeLlmServiceShape, makeLlmSpecs, + type Llm, type LlmProvider, type ProcessorConfig, type LlmResult, @@ -24,11 +23,13 @@ import { import { Effect, Layer, ManagedRuntime, Stream } from "effect"; import { llmStreamPart, + makeTextCompletionLayer, optionalStringConfig, providerStatusError, requiredString, streamTextCompletionChunks, toAsyncGenerator, + type TextCompletionConfigError, type TextCompletionRuntimeError, } from "./common.ts"; @@ -79,24 +80,21 @@ const loadAzureOpenAIConfig = Effect.fn("loadAzureOpenAIConfig")(function* ( apiKey, endpoint, apiVersion, - }; + } satisfies ResolvedAzureOpenAIConfig; }); const mapAzureOpenAIError = (error: unknown): TextCompletionRuntimeError => providerStatusError("AzureOpenAI", error); -export function makeAzureOpenAIProvider(config: AzureOpenAIProcessorConfig): LlmProvider { +const makeAzureOpenAIProviderFromClient = ( + resolved: ResolvedAzureOpenAIConfig, + client: AzureOpenAI, +): LlmProvider => { const { defaultModel, defaultTemperature, maxOutput, - apiKey, - endpoint, - apiVersion, - } = Effect.runSync(loadAzureOpenAIConfig(config)) satisfies ResolvedAzureOpenAIConfig; - const client = new AzureOpenAI({ apiKey, apiVersion, endpoint }); - - Effect.runSync(Effect.log("[AzureOpenAI] LLM service initialized")); + } = resolved; return { generateContent: ( @@ -174,9 +172,31 @@ export function makeAzureOpenAIProvider(config: AzureOpenAIProcessorConfig): Llm return toAsyncGenerator(Stream.toAsyncIterable(stream), mapAzureOpenAIError); }, - }; + } satisfies LlmProvider; +}; + +export function makeAzureOpenAIProvider(config: AzureOpenAIProcessorConfig): LlmProvider { + return Effect.runSync(makeAzureOpenAIProviderEffect(config)); } +export const makeAzureOpenAIProviderEffect = Effect.fn("makeAzureOpenAIProvider")(function*( + config: AzureOpenAIProcessorConfig, +) { + const resolved = yield* loadAzureOpenAIConfig(config); + const client = yield* Effect.try({ + try: () => + new AzureOpenAI({ + apiKey: resolved.apiKey, + apiVersion: resolved.apiVersion, + endpoint: resolved.endpoint, + }), + catch: mapAzureOpenAIError, + }); + + yield* Effect.log("[AzureOpenAI] LLM service initialized"); + return makeAzureOpenAIProviderFromClient(resolved, client); +}); + export type AzureOpenAIProcessor = ReturnType; export function makeAzureOpenAIProcessor( @@ -187,14 +207,14 @@ export function makeAzureOpenAIProcessor( export const AzureOpenAIProcessor = makeAzureOpenAIProcessor; -export const program = makeFlowProcessorProgram({ +export const program = makeFlowProcessorProgram< + AzureOpenAIProcessorConfig, + TextCompletionConfigError | TextCompletionRuntimeError, + Llm +>({ id: "text-completion", specs: () => makeLlmSpecs(), - layer: (config) => - Layer.succeed( - Llm, - Llm.of(makeLlmServiceShape(makeAzureOpenAIProvider(config))), - ), + layer: (config) => makeTextCompletionLayer(makeAzureOpenAIProviderEffect(config)), }); const azureOpenAITextCompletionRuntime = ManagedRuntime.make(Layer.empty); diff --git a/ts/packages/flow/src/model/text-completion/claude.ts b/ts/packages/flow/src/model/text-completion/claude.ts index b4f7ba75..e7f8e1a6 100644 --- a/ts/packages/flow/src/model/text-completion/claude.ts +++ b/ts/packages/flow/src/model/text-completion/claude.ts @@ -7,11 +7,10 @@ import Anthropic from "@anthropic-ai/sdk"; import { NodeRuntime } from "@effect/platform-node"; import { - Llm, makeLlmService, makeFlowProcessorProgram, - makeLlmServiceShape, makeLlmSpecs, + type Llm, type LlmProvider, type ProcessorConfig, type LlmResult, @@ -20,11 +19,13 @@ import { import { Effect, Layer, ManagedRuntime, Stream } from "effect"; import { llmStreamPart, + makeTextCompletionLayer, optionalStringConfig, providerStatusError, requiredString, streamTextCompletionChunks, toAsyncGenerator, + type TextCompletionConfigError, type TextCompletionRuntimeError, } from "./common.ts"; @@ -61,17 +62,15 @@ const loadClaudeConfig = Effect.fn("loadClaudeConfig")(function*(config: ClaudeP const mapClaudeError = (error: unknown): TextCompletionRuntimeError => providerStatusError("Claude", error); -export function makeClaudeProvider(config: ClaudeProcessorConfig): LlmProvider { +const makeClaudeProviderFromClient = ( + resolved: ResolvedClaudeConfig, + client: Anthropic, +): LlmProvider => { const { defaultModel, defaultTemperature, maxOutput, - apiKey, - } = Effect.runSync(loadClaudeConfig(config)) satisfies ResolvedClaudeConfig; - - const client = new Anthropic({ apiKey }); - - Effect.runSync(Effect.log("[Claude] LLM service initialized")); + } = resolved; return { generateContent: ( @@ -161,9 +160,26 @@ export function makeClaudeProvider(config: ClaudeProcessorConfig): LlmProvider { return toAsyncGenerator(Stream.toAsyncIterable(stream), mapClaudeError); }, - }; + } satisfies LlmProvider; +}; + +export function makeClaudeProvider(config: ClaudeProcessorConfig): LlmProvider { + return Effect.runSync(makeClaudeProviderEffect(config)); } +export const makeClaudeProviderEffect = Effect.fn("makeClaudeProvider")(function*( + config: ClaudeProcessorConfig, +) { + const resolved = yield* loadClaudeConfig(config); + const client = yield* Effect.try({ + try: () => new Anthropic({ apiKey: resolved.apiKey }), + catch: mapClaudeError, + }); + + yield* Effect.log("[Claude] LLM service initialized"); + return makeClaudeProviderFromClient(resolved, client); +}); + export type ClaudeProcessor = ReturnType; export function makeClaudeProcessor( @@ -174,14 +190,14 @@ export function makeClaudeProcessor( export const ClaudeProcessor = makeClaudeProcessor; -export const program = makeFlowProcessorProgram({ +export const program = makeFlowProcessorProgram< + ClaudeProcessorConfig, + TextCompletionConfigError | TextCompletionRuntimeError, + Llm +>({ id: "text-completion", specs: () => makeLlmSpecs(), - layer: (config) => - Layer.succeed( - Llm, - Llm.of(makeLlmServiceShape(makeClaudeProvider(config))), - ), + layer: (config) => makeTextCompletionLayer(makeClaudeProviderEffect(config)), }); const claudeTextCompletionRuntime = ManagedRuntime.make(Layer.empty); diff --git a/ts/packages/flow/src/model/text-completion/common.ts b/ts/packages/flow/src/model/text-completion/common.ts index 84f7f44d..7b52cd37 100644 --- a/ts/packages/flow/src/model/text-completion/common.ts +++ b/ts/packages/flow/src/model/text-completion/common.ts @@ -1,9 +1,12 @@ import { + Llm, TooManyRequestsError, errorMessage, + makeLlmServiceShape, type LlmChunk, + type LlmProvider, } from "@trustgraph/base"; -import { Config, Effect, Ref, Result, Stream } from "effect"; +import { Config, Effect, Layer, Ref, Result, Stream } from "effect"; import * as O from "effect/Option"; import * as Predicate from "effect/Predicate"; import * as S from "effect/Schema"; @@ -29,6 +32,17 @@ export type TextCompletionRuntimeError = | TextCompletionProviderError | TooManyRequestsError; +export const makeTextCompletionLayer = ( + provider: Effect.Effect, +): Layer.Layer => + Layer.effect(Llm)( + provider.pipe( + Effect.map((resolvedProvider) => + Llm.of(makeLlmServiceShape(resolvedProvider)) + ), + ), + ); + type StreamingTokenTotals = { readonly inToken: number; readonly outToken: number; diff --git a/ts/packages/flow/src/model/text-completion/mistral.ts b/ts/packages/flow/src/model/text-completion/mistral.ts index 383f1478..2f2bb3ec 100644 --- a/ts/packages/flow/src/model/text-completion/mistral.ts +++ b/ts/packages/flow/src/model/text-completion/mistral.ts @@ -9,11 +9,10 @@ import { Mistral } from "@mistralai/mistralai"; import { NodeRuntime } from "@effect/platform-node"; import { - Llm, makeLlmService, makeFlowProcessorProgram, - makeLlmServiceShape, makeLlmSpecs, + type Llm, type LlmProvider, type ProcessorConfig, type LlmResult, @@ -22,12 +21,14 @@ import { import { Effect, Layer, ManagedRuntime, Stream } from "effect"; import { llmStreamPart, + makeTextCompletionLayer, optionalStringConfig, providerStatusError, requiredString, streamTextCompletionChunks, textFromContent, toAsyncGenerator, + type TextCompletionConfigError, type TextCompletionRuntimeError, } from "./common.ts"; @@ -67,17 +68,15 @@ const loadMistralConfig = Effect.fn("loadMistralConfig")(function*(config: Mistr const mapMistralError = (error: unknown): TextCompletionRuntimeError => providerStatusError("Mistral", error); -export function makeMistralProvider(config: MistralProcessorConfig): LlmProvider { +const makeMistralProviderFromClient = ( + resolved: ResolvedMistralConfig, + client: Mistral, +): LlmProvider => { const { defaultModel, defaultTemperature, maxOutput, - apiKey, - } = Effect.runSync(loadMistralConfig(config)) satisfies ResolvedMistralConfig; - - const client = new Mistral({ apiKey }); - - Effect.runSync(Effect.log("[Mistral] LLM service initialized")); + } = resolved; return { generateContent: ( @@ -153,9 +152,26 @@ export function makeMistralProvider(config: MistralProcessorConfig): LlmProvider return toAsyncGenerator(Stream.toAsyncIterable(stream), mapMistralError); }, - }; + } satisfies LlmProvider; +}; + +export function makeMistralProvider(config: MistralProcessorConfig): LlmProvider { + return Effect.runSync(makeMistralProviderEffect(config)); } +export const makeMistralProviderEffect = Effect.fn("makeMistralProvider")(function*( + config: MistralProcessorConfig, +) { + const resolved = yield* loadMistralConfig(config); + const client = yield* Effect.try({ + try: () => new Mistral({ apiKey: resolved.apiKey }), + catch: mapMistralError, + }); + + yield* Effect.log("[Mistral] LLM service initialized"); + return makeMistralProviderFromClient(resolved, client); +}); + export type MistralProcessor = ReturnType; export function makeMistralProcessor( @@ -166,14 +182,14 @@ export function makeMistralProcessor( export const MistralProcessor = makeMistralProcessor; -export const program = makeFlowProcessorProgram({ +export const program = makeFlowProcessorProgram< + MistralProcessorConfig, + TextCompletionConfigError | TextCompletionRuntimeError, + Llm +>({ id: "text-completion", specs: () => makeLlmSpecs(), - layer: (config) => - Layer.succeed( - Llm, - Llm.of(makeLlmServiceShape(makeMistralProvider(config))), - ), + layer: (config) => makeTextCompletionLayer(makeMistralProviderEffect(config)), }); const mistralTextCompletionRuntime = ManagedRuntime.make(Layer.empty); diff --git a/ts/packages/flow/src/model/text-completion/ollama.ts b/ts/packages/flow/src/model/text-completion/ollama.ts index 8bf055ff..051ac9fd 100644 --- a/ts/packages/flow/src/model/text-completion/ollama.ts +++ b/ts/packages/flow/src/model/text-completion/ollama.ts @@ -9,11 +9,10 @@ import { Ollama } from "ollama"; import { NodeRuntime } from "@effect/platform-node"; import { - Llm, makeLlmService, makeFlowProcessorProgram, - makeLlmServiceShape, makeLlmSpecs, + type Llm, type LlmProvider, type ProcessorConfig, type LlmResult, @@ -22,10 +21,12 @@ import { import { Effect, Layer, ManagedRuntime, Stream } from "effect"; import { llmStreamPart, + makeTextCompletionLayer, optionalStringConfig, providerRuntimeError, streamTextCompletionChunks, toAsyncGenerator, + type TextCompletionConfigError, type TextCompletionRuntimeError, } from "./common.ts"; @@ -55,14 +56,11 @@ const loadOllamaConfig = Effect.fn("loadOllamaConfig")(function*(config: OllamaP const mapOllamaError = (error: unknown): TextCompletionRuntimeError => providerRuntimeError("Ollama", error); -export function makeOllamaProvider(config: OllamaProcessorConfig): LlmProvider { - const { defaultModel, host } = Effect.runSync(loadOllamaConfig(config)) satisfies ResolvedOllamaConfig; - - const client = new Ollama({ host }); - - Effect.runSync(Effect.log( - `[Ollama] LLM service initialized (host=${host}, model=${defaultModel})`, - )); +const makeOllamaProviderFromClient = ( + resolved: ResolvedOllamaConfig, + client: Ollama, +): LlmProvider => { + const { defaultModel } = resolved; return { generateContent: ( @@ -130,9 +128,28 @@ export function makeOllamaProvider(config: OllamaProcessorConfig): LlmProvider { return toAsyncGenerator(Stream.toAsyncIterable(stream), mapOllamaError); }, - }; + } satisfies LlmProvider; +}; + +export function makeOllamaProvider(config: OllamaProcessorConfig): LlmProvider { + return Effect.runSync(makeOllamaProviderEffect(config)); } +export const makeOllamaProviderEffect = Effect.fn("makeOllamaProvider")(function*( + config: OllamaProcessorConfig, +) { + const resolved = yield* loadOllamaConfig(config); + const client = yield* Effect.try({ + try: () => new Ollama({ host: resolved.host }), + catch: mapOllamaError, + }); + + yield* Effect.log( + `[Ollama] LLM service initialized (host=${resolved.host}, model=${resolved.defaultModel})`, + ); + return makeOllamaProviderFromClient(resolved, client); +}); + export type OllamaProcessor = ReturnType; export function makeOllamaProcessor( @@ -143,14 +160,14 @@ export function makeOllamaProcessor( export const OllamaProcessor = makeOllamaProcessor; -export const program = makeFlowProcessorProgram({ +export const program = makeFlowProcessorProgram< + OllamaProcessorConfig, + TextCompletionConfigError | TextCompletionRuntimeError, + Llm +>({ id: "text-completion", specs: () => makeLlmSpecs(), - layer: (config) => - Layer.succeed( - Llm, - Llm.of(makeLlmServiceShape(makeOllamaProvider(config))), - ), + layer: (config) => makeTextCompletionLayer(makeOllamaProviderEffect(config)), }); const ollamaTextCompletionRuntime = ManagedRuntime.make(Layer.empty); diff --git a/ts/packages/flow/src/model/text-completion/openai-compatible.ts b/ts/packages/flow/src/model/text-completion/openai-compatible.ts index 61e27365..07ef2d7a 100644 --- a/ts/packages/flow/src/model/text-completion/openai-compatible.ts +++ b/ts/packages/flow/src/model/text-completion/openai-compatible.ts @@ -12,11 +12,10 @@ import OpenAI from "openai"; import { NodeRuntime } from "@effect/platform-node"; import { - Llm, makeLlmService, makeFlowProcessorProgram, - makeLlmServiceShape, makeLlmSpecs, + type Llm, type LlmProvider, type ProcessorConfig, type LlmResult, @@ -25,11 +24,13 @@ import { import { Effect, Layer, ManagedRuntime, Stream } from "effect"; import { llmStreamPart, + makeTextCompletionLayer, optionalStringConfig, providerStatusError, requiredString, streamTextCompletionChunks, toAsyncGenerator, + type TextCompletionConfigError, type TextCompletionRuntimeError, } from "./common.ts"; @@ -75,20 +76,15 @@ const loadOpenAICompatibleConfig = Effect.fn("loadOpenAICompatibleConfig")(funct const mapOpenAICompatibleError = (error: unknown): TextCompletionRuntimeError => providerStatusError("OpenAI-Compatible", error); -export function makeOpenAICompatibleProvider( - config: OpenAICompatibleProcessorConfig, -): LlmProvider { +const makeOpenAICompatibleProviderFromClient = ( + resolved: ResolvedOpenAICompatibleConfig, + client: OpenAI, +): LlmProvider => { const { defaultModel, defaultTemperature, maxOutput, - apiKey, - baseURL, - } = Effect.runSync(loadOpenAICompatibleConfig(config)) satisfies ResolvedOpenAICompatibleConfig; - - const client = new OpenAI({ baseURL, apiKey }); - - Effect.runSync(Effect.log("[OpenAI-Compatible] LLM service initialized")); + } = resolved; return { generateContent: ( @@ -165,9 +161,28 @@ export function makeOpenAICompatibleProvider( return toAsyncGenerator(Stream.toAsyncIterable(stream), mapOpenAICompatibleError); }, - }; + } satisfies LlmProvider; +}; + +export function makeOpenAICompatibleProvider( + config: OpenAICompatibleProcessorConfig, +): LlmProvider { + return Effect.runSync(makeOpenAICompatibleProviderEffect(config)); } +export const makeOpenAICompatibleProviderEffect = Effect.fn("makeOpenAICompatibleProvider")(function*( + config: OpenAICompatibleProcessorConfig, +) { + const resolved = yield* loadOpenAICompatibleConfig(config); + const client = yield* Effect.try({ + try: () => new OpenAI({ baseURL: resolved.baseURL, apiKey: resolved.apiKey }), + catch: mapOpenAICompatibleError, + }); + + yield* Effect.log("[OpenAI-Compatible] LLM service initialized"); + return makeOpenAICompatibleProviderFromClient(resolved, client); +}); + export type OpenAICompatibleProcessor = ReturnType; export function makeOpenAICompatibleProcessor( @@ -178,14 +193,14 @@ export function makeOpenAICompatibleProcessor( export const OpenAICompatibleProcessor = makeOpenAICompatibleProcessor; -export const program = makeFlowProcessorProgram({ +export const program = makeFlowProcessorProgram< + OpenAICompatibleProcessorConfig, + TextCompletionConfigError | TextCompletionRuntimeError, + Llm +>({ id: "text-completion", specs: () => makeLlmSpecs(), - layer: (config) => - Layer.succeed( - Llm, - Llm.of(makeLlmServiceShape(makeOpenAICompatibleProvider(config))), - ), + layer: (config) => makeTextCompletionLayer(makeOpenAICompatibleProviderEffect(config)), }); const openAICompatibleTextCompletionRuntime = ManagedRuntime.make(Layer.empty); diff --git a/ts/packages/flow/src/model/text-completion/openai.ts b/ts/packages/flow/src/model/text-completion/openai.ts index c93dbaa5..b36cc2d7 100644 --- a/ts/packages/flow/src/model/text-completion/openai.ts +++ b/ts/packages/flow/src/model/text-completion/openai.ts @@ -7,11 +7,10 @@ import OpenAI from "openai"; import { NodeRuntime } from "@effect/platform-node"; import { - Llm, makeLlmService, makeFlowProcessorProgram, - makeLlmServiceShape, makeLlmSpecs, + type Llm, type LlmProvider, type ProcessorConfig, type LlmResult, @@ -20,11 +19,13 @@ import { import { Effect, Layer, ManagedRuntime, Stream } from "effect"; import { llmStreamPart, + makeTextCompletionLayer, optionalStringConfig, providerStatusError, requiredString, streamTextCompletionChunks, toAsyncGenerator, + type TextCompletionConfigError, type TextCompletionRuntimeError, } from "./common.ts"; @@ -64,21 +65,15 @@ const loadOpenAIConfig = Effect.fn("loadOpenAIConfig")(function*(config: OpenAIP const mapOpenAIError = (error: unknown): TextCompletionRuntimeError => providerStatusError("OpenAI", error); -export function makeOpenAIProvider(config: OpenAIProcessorConfig): LlmProvider { +const makeOpenAIProviderFromClient = ( + resolved: ResolvedOpenAIConfig, + client: OpenAI, +): LlmProvider => { const { defaultModel, defaultTemperature, maxOutput, - apiKey, - baseURL, - } = Effect.runSync(loadOpenAIConfig(config)) satisfies ResolvedOpenAIConfig; - - const client = new OpenAI({ - apiKey, - baseURL, - }); - - Effect.runSync(Effect.log("[OpenAI] LLM service initialized")); + } = resolved; return { generateContent: ( @@ -156,9 +151,30 @@ export function makeOpenAIProvider(config: OpenAIProcessorConfig): LlmProvider { return toAsyncGenerator(Stream.toAsyncIterable(stream), mapOpenAIError); }, - }; + } satisfies LlmProvider; +}; + +export function makeOpenAIProvider(config: OpenAIProcessorConfig): LlmProvider { + return Effect.runSync(makeOpenAIProviderEffect(config)); } +export const makeOpenAIProviderEffect = Effect.fn("makeOpenAIProvider")(function*( + config: OpenAIProcessorConfig, +) { + const resolved = yield* loadOpenAIConfig(config); + const client = yield* Effect.try({ + try: () => + new OpenAI({ + apiKey: resolved.apiKey, + baseURL: resolved.baseURL, + }), + catch: mapOpenAIError, + }); + + yield* Effect.log("[OpenAI] LLM service initialized"); + return makeOpenAIProviderFromClient(resolved, client); +}); + export type OpenAIProcessor = ReturnType; export function makeOpenAIProcessor( @@ -169,14 +185,14 @@ export function makeOpenAIProcessor( export const OpenAIProcessor = makeOpenAIProcessor; -export const program = makeFlowProcessorProgram({ +export const program = makeFlowProcessorProgram< + OpenAIProcessorConfig, + TextCompletionConfigError | TextCompletionRuntimeError, + Llm +>({ id: "text-completion", specs: () => makeLlmSpecs(), - layer: (config) => - Layer.succeed( - Llm, - Llm.of(makeLlmServiceShape(makeOpenAIProvider(config))), - ), + layer: (config) => makeTextCompletionLayer(makeOpenAIProviderEffect(config)), }); const openAITextCompletionRuntime = ManagedRuntime.make(Layer.empty);