diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 0b8909dd..4dc8c889 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -12,8 +12,8 @@ Verified source roots: - Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4` - Installed Effect beta used by this workspace: `ts/node_modules/effect` -Current signal counts from `ts/packages` after the 2026-06-02 Ollama -embeddings effectful layer slice: +Current signal counts from `ts/packages` after the 2026-06-02 text completion +provider stream helper slice: | Signal | Count | | --- | ---: | @@ -27,7 +27,7 @@ embeddings effectful layer slice: | `toPromiseRequestor` | 0 | | `makeAsyncProcessor` | 19 | | `receive(` | 17 | -| `while (` | 9 | +| `while (` | 3 | | `new Error` | 8 | | `new Promise` | 10 | | `JSON.parse` | 4 | @@ -123,6 +123,12 @@ Notes: match by making `OllamaEmbeddingsLive` effectful and mapping config/load failures to `EmbeddingsError`. The `JSON.stringify` count increased by one because the new layer test uses a JSON response fixture. +- The text completion provider stream helper slice removed all provider-local + `Stream.unfold` pull loops, dropped the `while (` count from 9 to 3, and + removed the Mistral `content as string` assertion. The only remaining + text-completion `iterator.next` match is the `toAsyncGenerator` + compatibility adapter that exposes Effect streams through the public + `AsyncGenerator` provider contract. - `Record` and `throwLibrarianServiceError` are now clean in `ts/packages`. @@ -892,6 +898,39 @@ Notes: - `cd ts && bun run test` - `git diff --check` +### 2026-06-02: Text Completion Provider Stream Helper Slice + +- Status: migrated and root-verified. +- Completed: + - `ts/packages/flow/src/model/text-completion/common.ts` now exposes + `streamTextCompletionChunks`, an Effect-native helper built on + `Stream.fromAsyncIterable`, `Ref`, `Stream.filterMap`/`Result`, and a final + token chunk append. + - OpenAI, Azure OpenAI, OpenAI-compatible, Mistral, Ollama, and Claude + streaming providers now share the helper instead of each hand-rolling + `Stream.unfold` plus `iterator.next` loops. + - Mistral non-streaming and streaming content normalization now uses + `effect/Predicate` and `Option` narrowing through `textFromContent`, removing + the prior `content as string` assertion. + - The helper uses the installed Effect beta's `Option.fromNullishOr` and + Result-shaped `Stream.filterMap` API, verified by `check:tsgo`. + - `ts/packages/flow/src/__tests__/text-completion-common.test.ts` covers + token accumulation/final chunk emission and non-string content narrowing. +- Remaining: + - Full Effect AI provider swaps still need parity tests first; current OpenAI + and Azure behavior is Chat Completions based, no installed + Azure/Mistral/Ollama Effect AI provider exists, and Anthropic needs explicit + text/token/streaming/rate-limit parity coverage before replacement. +- Verification: + - `bunx --bun vitest run src/__tests__/text-completion-common.test.ts` + - `cd ts && bun run check:tsgo` + - `bun run --cwd ts/packages/flow build` + - `bun run --cwd ts/packages/flow test` + - `cd ts && bun run check` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: @@ -959,8 +998,9 @@ Notes: store/query modules. Qdrant still has no close/disconnect surface in the installed client, so do not reopen it as an `acquireRelease` close slice without new SDK evidence. - - The next safe provider cleanup is shared text-completion stream iteration - and assertion removal, not a direct SDK swap. + - Shared text-completion stream iteration and the Mistral content assertion are + complete. The remaining provider-layer item is parity-backed Effect AI + adapter work, not a direct SDK swap. ## Ranked Findings @@ -973,10 +1013,6 @@ Notes: `effect/unstable/ai/LanguageModel`, `effect/unstable/ai/EmbeddingModel`, Effect AI OpenAI/Anthropic provider layers. - Rewrite shape: - - Consolidate duplicated text-completion stream iterator plumbing and error - conversion in a shared helper. - - Remove remaining provider assertions such as the Mistral content cast using - guards or Schema-backed normalization. - Add an Effect AI adapter layer beside the current `LlmProvider` contract before flipping any public provider interface. - Use Effect AI provider layers only where parity is proven. diff --git a/ts/packages/flow/src/__tests__/text-completion-common.test.ts b/ts/packages/flow/src/__tests__/text-completion-common.test.ts index 71e484ac..967e77aa 100644 --- a/ts/packages/flow/src/__tests__/text-completion-common.test.ts +++ b/ts/packages/flow/src/__tests__/text-completion-common.test.ts @@ -1,6 +1,14 @@ import { describe, expect, it } from "@effect/vitest"; import type { LlmChunk } from "@trustgraph/base"; -import { providerRuntimeError, providerStatusError, toAsyncGenerator } from "../model/text-completion/common.js"; +import { Effect, Stream } from "effect"; +import { + llmStreamPart, + providerRuntimeError, + providerStatusError, + streamTextCompletionChunks, + textFromContent, + toAsyncGenerator, +} from "../model/text-completion/common.js"; const emptyChunkIterator = (): AsyncIterable => ({ [Symbol.asyncIterator]: () => ({ @@ -33,4 +41,47 @@ describe("text completion common helpers", () => { message: "provider failed", }); }); + + it.effect( + "builds streaming chunks from async iterables with final token totals", + Effect.fnUntraced(function* () { + const chunks = yield* Stream.runCollect( + streamTextCompletionChunks( + Stream.toAsyncIterable(Stream.fromArray([ + { text: "", inToken: 3 }, + { text: "hello" }, + { outToken: 5 }, + ])), + { + model: "unit-model", + mapError: (error) => providerRuntimeError("test-provider", error), + extract: (chunk) => llmStreamPart(chunk), + }, + ), + ); + + expect(Array.from(chunks)).toEqual([ + { + text: "hello", + inToken: null, + outToken: null, + model: "unit-model", + isFinal: false, + }, + { + text: "", + inToken: 3, + outToken: 5, + model: "unit-model", + isFinal: true, + }, + ]); + }), + ); + + it("narrows provider content payloads without type assertions", () => { + expect(textFromContent("direct")).toBe("direct"); + expect(textFromContent([{ text: "a" }, { text: "b" }, { wrong: "skip" }])).toBe("ab"); + expect(textFromContent([{ text: 1 }])).toBe(""); + }); }); diff --git a/ts/packages/flow/src/model/text-completion/azure-openai.ts b/ts/packages/flow/src/model/text-completion/azure-openai.ts index 0414c401..143ae67c 100644 --- a/ts/packages/flow/src/model/text-completion/azure-openai.ts +++ b/ts/packages/flow/src/model/text-completion/azure-openai.ts @@ -23,9 +23,11 @@ import { } from "@trustgraph/base"; import { Effect, Layer, ManagedRuntime, Stream } from "effect"; import { + llmStreamPart, optionalStringConfig, providerStatusError, requiredString, + streamTextCompletionChunks, toAsyncGenerator, type TextCompletionRuntimeError, } from "./common.ts"; @@ -156,53 +158,18 @@ export function makeAzureOpenAIProvider(config: AzureOpenAIProcessorConfig): Llm catch: mapAzureOpenAIError, }), ).pipe( - Stream.flatMap((openAIStream) => { - const iterator = openAIStream[Symbol.asyncIterator](); - let totalInputTokens = 0; - let totalOutputTokens = 0; - - return Stream.unfold<"pulling" | "done", LlmChunk, TextCompletionRuntimeError, never>( - "pulling", - (state) => { - if (state === "done") return Effect.as(Effect.void, undefined); - - return Effect.gen(function* () { - while (true) { - const next = yield* Effect.tryPromise({ - try: () => iterator.next(), - catch: mapAzureOpenAIError, - }); - - if (next.done === true) { - return [{ - text: "", - inToken: totalInputTokens, - outToken: totalOutputTokens, - model: modelName, - isFinal: true, - }, "done"] as const; - } - - const chunk = next.value; - const content = chunk.choices[0]?.delta?.content; - if (chunk.usage !== null && chunk.usage !== undefined) { - totalInputTokens = chunk.usage.prompt_tokens; - totalOutputTokens = chunk.usage.completion_tokens; - } - if (content !== null && content !== undefined && content.length > 0) { - return [{ - text: content, - inToken: null, - outToken: null, - model: modelName, - isFinal: false, - }, "pulling"] as const; - } - } - }); - }, - ); - }), + Stream.flatMap((openAIStream) => + streamTextCompletionChunks(openAIStream, { + model: modelName, + mapError: mapAzureOpenAIError, + extract: (chunk) => + llmStreamPart({ + text: chunk.choices[0]?.delta?.content, + inToken: chunk.usage?.prompt_tokens, + outToken: chunk.usage?.completion_tokens, + }), + }) + ), ); return toAsyncGenerator(Stream.toAsyncIterable(stream), mapAzureOpenAIError); diff --git a/ts/packages/flow/src/model/text-completion/claude.ts b/ts/packages/flow/src/model/text-completion/claude.ts index 83f10409..b4f7ba75 100644 --- a/ts/packages/flow/src/model/text-completion/claude.ts +++ b/ts/packages/flow/src/model/text-completion/claude.ts @@ -19,9 +19,11 @@ import { } from "@trustgraph/base"; import { Effect, Layer, ManagedRuntime, Stream } from "effect"; import { + llmStreamPart, optionalStringConfig, providerStatusError, requiredString, + streamTextCompletionChunks, toAsyncGenerator, type TextCompletionRuntimeError, } from "./common.ts"; @@ -136,53 +138,25 @@ export function makeClaudeProvider(config: ClaudeProcessorConfig): LlmProvider { catch: mapClaudeError, }), ).pipe( - Stream.flatMap((anthropicStream) => { - const iterator = anthropicStream[Symbol.asyncIterator](); - - return Stream.unfold<"pulling" | "done", LlmChunk, TextCompletionRuntimeError, never>( - "pulling", - (state) => { - if (state === "done") return Effect.as(Effect.void, undefined); - - return Effect.gen(function* () { - while (true) { - const next = yield* Effect.tryPromise({ - try: () => iterator.next(), - catch: mapClaudeError, - }); - - if (next.done === true) { - const finalMessage = yield* Effect.tryPromise({ - try: () => anthropicStream.finalMessage(), - catch: mapClaudeError, - }); - return [{ - text: "", - inToken: finalMessage.usage.input_tokens, - outToken: finalMessage.usage.output_tokens, - model: modelName, - isFinal: true, - }, "done"] as const; - } - - const event = next.value; - if ( - event.type === "content_block_delta" && - event.delta.type === "text_delta" - ) { - return [{ - text: event.delta.text, - inToken: null, - outToken: null, - model: modelName, - isFinal: false, - }, "pulling"] as const; - } - } - }); - }, - ); - }), + Stream.flatMap((anthropicStream) => + streamTextCompletionChunks(anthropicStream, { + model: modelName, + mapError: mapClaudeError, + extract: (event) => + event.type === "content_block_delta" && event.delta.type === "text_delta" + ? llmStreamPart({ text: event.delta.text }) + : llmStreamPart({}), + finalTokens: Effect.tryPromise({ + try: () => anthropicStream.finalMessage(), + catch: mapClaudeError, + }).pipe( + Effect.map((finalMessage) => ({ + inToken: finalMessage.usage.input_tokens, + outToken: finalMessage.usage.output_tokens, + })), + ), + }) + ), ); return toAsyncGenerator(Stream.toAsyncIterable(stream), mapClaudeError); diff --git a/ts/packages/flow/src/model/text-completion/common.ts b/ts/packages/flow/src/model/text-completion/common.ts index 9cfd1774..84f7f44d 100644 --- a/ts/packages/flow/src/model/text-completion/common.ts +++ b/ts/packages/flow/src/model/text-completion/common.ts @@ -3,7 +3,7 @@ import { errorMessage, type LlmChunk, } from "@trustgraph/base"; -import { Config, Effect } from "effect"; +import { Config, Effect, Ref, Result, Stream } from "effect"; import * as O from "effect/Option"; import * as Predicate from "effect/Predicate"; import * as S from "effect/Schema"; @@ -29,6 +29,112 @@ export type TextCompletionRuntimeError = | TextCompletionProviderError | TooManyRequestsError; +type StreamingTokenTotals = { + readonly inToken: number; + readonly outToken: number; +}; + +type LlmStreamPart = { + readonly text: O.Option; + readonly inToken: O.Option; + readonly outToken: O.Option; +}; + +const initialTokenTotals = { + inToken: 0, + outToken: 0, +} satisfies StreamingTokenTotals; + +const updateTokenTotals = ( + current: StreamingTokenTotals, + part: LlmStreamPart, +): StreamingTokenTotals => ({ + inToken: O.getOrElse(part.inToken, () => current.inToken), + outToken: O.getOrElse(part.outToken, () => current.outToken), +}); + +const finalChunk = (model: string, totals: StreamingTokenTotals): LlmChunk => ({ + text: "", + inToken: totals.inToken, + outToken: totals.outToken, + model, + isFinal: true, +}); + +const textChunk = (model: string, text: string): LlmChunk => ({ + text, + inToken: null, + outToken: null, + model, + isFinal: false, +}); + +const contentPartText = (part: unknown): O.Option => + Predicate.isObject(part) && + Predicate.hasProperty(part, "text") && + Predicate.isString(part.text) + ? O.some(part.text) + : O.none(); + +export const textFromContent = (content: unknown): string => { + if (Predicate.isString(content)) { + return content; + } + + return Array.isArray(content) + ? content.flatMap((part) => O.toArray(contentPartText(part))).join("") + : ""; +}; + +export const llmStreamPart = (part: { + readonly text?: string | null | undefined; + readonly inToken?: number | null | undefined; + readonly outToken?: number | null | undefined; +}): LlmStreamPart => ({ + text: O.fromNullishOr(part.text), + inToken: O.fromNullishOr(part.inToken), + outToken: O.fromNullishOr(part.outToken), +}); + +export const streamTextCompletionChunks = ( + iterable: AsyncIterable, + options: { + readonly model: string; + readonly mapError: (error: unknown) => TextCompletionRuntimeError; + readonly extract: (chunk: A) => LlmStreamPart; + readonly finalTokens?: Effect.Effect; + }, +): Stream.Stream => + Stream.unwrap(Effect.gen(function* () { + const totals = yield* Ref.make(initialTokenTotals); + + const chunks = Stream.fromAsyncIterable(iterable, options.mapError).pipe( + Stream.mapEffect((chunk) => + Effect.gen(function* () { + const part = options.extract(chunk); + yield* Ref.update(totals, (current) => updateTokenTotals(current, part)); + return O.map( + O.filter(part.text, (text) => text.length > 0), + (text) => textChunk(options.model, text), + ); + }) + ), + Stream.filterMap((chunk) => + O.match(chunk, { + onNone: () => Result.fail(undefined), + onSome: Result.succeed, + }) + ), + ); + + const tokenTotals = options.finalTokens ?? Ref.get(totals); + return chunks.pipe( + Stream.concat(Stream.fromEffect(tokenTotals.pipe( + Effect.map((tokens) => finalChunk(options.model, tokens)), + ))), + ); + })); + export const optionalStringConfig = Effect.fn("TextCompletion.optionalStringConfig")(function*( provider: string, name: string, diff --git a/ts/packages/flow/src/model/text-completion/mistral.ts b/ts/packages/flow/src/model/text-completion/mistral.ts index 8ae43154..383f1478 100644 --- a/ts/packages/flow/src/model/text-completion/mistral.ts +++ b/ts/packages/flow/src/model/text-completion/mistral.ts @@ -21,9 +21,12 @@ import { } from "@trustgraph/base"; import { Effect, Layer, ManagedRuntime, Stream } from "effect"; import { + llmStreamPart, optionalStringConfig, providerStatusError, requiredString, + streamTextCompletionChunks, + textFromContent, toAsyncGenerator, type TextCompletionRuntimeError, } from "./common.ts"; @@ -101,7 +104,7 @@ export function makeMistralProvider(config: MistralProcessorConfig): LlmProvider catch: mapMistralError, }).pipe( Effect.map((resp): LlmResult => ({ - text: (resp.choices?.[0]?.message?.content as string) ?? "", + text: textFromContent(resp.choices?.[0]?.message?.content), inToken: resp.usage?.promptTokens ?? 0, outToken: resp.usage?.completionTokens ?? 0, model: modelName, @@ -134,54 +137,18 @@ export function makeMistralProvider(config: MistralProcessorConfig): LlmProvider catch: mapMistralError, }), ).pipe( - Stream.flatMap((mistralStream) => { - const iterator = mistralStream[Symbol.asyncIterator](); - let totalInputTokens = 0; - let totalOutputTokens = 0; - - return Stream.unfold<"pulling" | "done", LlmChunk, TextCompletionRuntimeError, never>( - "pulling", - (state) => { - if (state === "done") return Effect.as(Effect.void, undefined); - - return Effect.gen(function* () { - while (true) { - const next = yield* Effect.tryPromise({ - try: () => iterator.next(), - catch: mapMistralError, - }); - - if (next.done === true) { - return [{ - text: "", - inToken: totalInputTokens, - outToken: totalOutputTokens, - model: modelName, - isFinal: true, - }, "done"] as const; - } - - const chunk = next.value; - const delta = chunk.data?.choices?.[0]?.delta; - const content = delta?.content; - if (chunk.data?.usage !== undefined) { - totalInputTokens = chunk.data.usage.promptTokens ?? 0; - totalOutputTokens = chunk.data.usage.completionTokens ?? 0; - } - if (typeof content === "string" && content.length > 0) { - return [{ - text: content, - inToken: null, - outToken: null, - model: modelName, - isFinal: false, - }, "pulling"] as const; - } - } - }); - }, - ); - }), + Stream.flatMap((mistralStream) => + streamTextCompletionChunks(mistralStream, { + model: modelName, + mapError: mapMistralError, + extract: (chunk) => + llmStreamPart({ + text: textFromContent(chunk.data?.choices?.[0]?.delta?.content), + inToken: chunk.data?.usage?.promptTokens, + outToken: chunk.data?.usage?.completionTokens, + }), + }) + ), ); return toAsyncGenerator(Stream.toAsyncIterable(stream), mapMistralError); diff --git a/ts/packages/flow/src/model/text-completion/ollama.ts b/ts/packages/flow/src/model/text-completion/ollama.ts index 61b87693..8bf055ff 100644 --- a/ts/packages/flow/src/model/text-completion/ollama.ts +++ b/ts/packages/flow/src/model/text-completion/ollama.ts @@ -21,8 +21,10 @@ import { } from "@trustgraph/base"; import { Effect, Layer, ManagedRuntime, Stream } from "effect"; import { + llmStreamPart, optionalStringConfig, providerRuntimeError, + streamTextCompletionChunks, toAsyncGenerator, type TextCompletionRuntimeError, } from "./common.ts"; @@ -112,55 +114,18 @@ export function makeOllamaProvider(config: OllamaProcessorConfig): LlmProvider { catch: mapOllamaError, }), ).pipe( - Stream.flatMap((ollamaStream) => { - const iterator = ollamaStream[Symbol.asyncIterator](); - let totalInputTokens = 0; - let totalOutputTokens = 0; - - return Stream.unfold<"pulling" | "done", LlmChunk, TextCompletionRuntimeError, never>( - "pulling", - (state) => { - if (state === "done") return Effect.as(Effect.void, undefined); - - return Effect.gen(function* () { - while (true) { - const next = yield* Effect.tryPromise({ - try: () => iterator.next(), - catch: mapOllamaError, - }); - - if (next.done === true) { - return [{ - text: "", - inToken: totalInputTokens, - outToken: totalOutputTokens, - model: modelName, - isFinal: true, - }, "done"] as const; - } - - const chunk = next.value; - if (chunk.prompt_eval_count !== undefined) { - totalInputTokens = chunk.prompt_eval_count; - } - if (chunk.eval_count !== undefined) { - totalOutputTokens = chunk.eval_count; - } - - if (chunk.response.length > 0) { - return [{ - text: chunk.response, - inToken: null, - outToken: null, - model: modelName, - isFinal: false, - }, "pulling"] as const; - } - } - }); - }, - ); - }), + Stream.flatMap((ollamaStream) => + streamTextCompletionChunks(ollamaStream, { + model: modelName, + mapError: mapOllamaError, + extract: (chunk) => + llmStreamPart({ + text: chunk.response, + inToken: chunk.prompt_eval_count, + outToken: chunk.eval_count, + }), + }) + ), ); return toAsyncGenerator(Stream.toAsyncIterable(stream), mapOllamaError); diff --git a/ts/packages/flow/src/model/text-completion/openai-compatible.ts b/ts/packages/flow/src/model/text-completion/openai-compatible.ts index 754a5081..61e27365 100644 --- a/ts/packages/flow/src/model/text-completion/openai-compatible.ts +++ b/ts/packages/flow/src/model/text-completion/openai-compatible.ts @@ -24,9 +24,11 @@ import { } from "@trustgraph/base"; import { Effect, Layer, ManagedRuntime, Stream } from "effect"; import { + llmStreamPart, optionalStringConfig, providerStatusError, requiredString, + streamTextCompletionChunks, toAsyncGenerator, type TextCompletionRuntimeError, } from "./common.ts"; @@ -147,53 +149,18 @@ export function makeOpenAICompatibleProvider( catch: mapOpenAICompatibleError, }), ).pipe( - Stream.flatMap((openAIStream) => { - const iterator = openAIStream[Symbol.asyncIterator](); - let totalInputTokens = 0; - let totalOutputTokens = 0; - - return Stream.unfold<"pulling" | "done", LlmChunk, TextCompletionRuntimeError, never>( - "pulling", - (state) => { - if (state === "done") return Effect.as(Effect.void, undefined); - - return Effect.gen(function* () { - while (true) { - const next = yield* Effect.tryPromise({ - try: () => iterator.next(), - catch: mapOpenAICompatibleError, - }); - - if (next.done === true) { - return [{ - text: "", - inToken: totalInputTokens, - outToken: totalOutputTokens, - model: modelName, - isFinal: true, - }, "done"] as const; - } - - const chunk = next.value; - const content = chunk.choices[0]?.delta?.content; - if (chunk.usage !== null && chunk.usage !== undefined) { - totalInputTokens = chunk.usage.prompt_tokens; - totalOutputTokens = chunk.usage.completion_tokens; - } - if (content !== null && content !== undefined && content.length > 0) { - return [{ - text: content, - inToken: null, - outToken: null, - model: modelName, - isFinal: false, - }, "pulling"] as const; - } - } - }); - }, - ); - }), + Stream.flatMap((openAIStream) => + streamTextCompletionChunks(openAIStream, { + model: modelName, + mapError: mapOpenAICompatibleError, + extract: (chunk) => + llmStreamPart({ + text: chunk.choices[0]?.delta?.content, + inToken: chunk.usage?.prompt_tokens, + outToken: chunk.usage?.completion_tokens, + }), + }) + ), ); return toAsyncGenerator(Stream.toAsyncIterable(stream), mapOpenAICompatibleError); diff --git a/ts/packages/flow/src/model/text-completion/openai.ts b/ts/packages/flow/src/model/text-completion/openai.ts index fd123e9e..c93dbaa5 100644 --- a/ts/packages/flow/src/model/text-completion/openai.ts +++ b/ts/packages/flow/src/model/text-completion/openai.ts @@ -19,9 +19,11 @@ import { } from "@trustgraph/base"; import { Effect, Layer, ManagedRuntime, Stream } from "effect"; import { + llmStreamPart, optionalStringConfig, providerStatusError, requiredString, + streamTextCompletionChunks, toAsyncGenerator, type TextCompletionRuntimeError, } from "./common.ts"; @@ -138,53 +140,18 @@ export function makeOpenAIProvider(config: OpenAIProcessorConfig): LlmProvider { catch: mapOpenAIError, }), ).pipe( - Stream.flatMap((openAIStream) => { - const iterator = openAIStream[Symbol.asyncIterator](); - let totalInputTokens = 0; - let totalOutputTokens = 0; - - return Stream.unfold<"pulling" | "done", LlmChunk, TextCompletionRuntimeError, never>( - "pulling", - (state) => { - if (state === "done") return Effect.as(Effect.void, undefined); - - return Effect.gen(function* () { - while (true) { - const next = yield* Effect.tryPromise({ - try: () => iterator.next(), - catch: mapOpenAIError, - }); - - if (next.done === true) { - return [{ - text: "", - inToken: totalInputTokens, - outToken: totalOutputTokens, - model: modelName, - isFinal: true, - }, "done"] as const; - } - - const chunk = next.value; - const content = chunk.choices[0]?.delta?.content; - if (chunk.usage !== null && chunk.usage !== undefined) { - totalInputTokens = chunk.usage.prompt_tokens; - totalOutputTokens = chunk.usage.completion_tokens; - } - if (content !== null && content !== undefined && content.length > 0) { - return [{ - text: content, - inToken: null, - outToken: null, - model: modelName, - isFinal: false, - }, "pulling"] as const; - } - } - }); - }, - ); - }), + Stream.flatMap((openAIStream) => + streamTextCompletionChunks(openAIStream, { + model: modelName, + mapError: mapOpenAIError, + extract: (chunk) => + llmStreamPart({ + text: chunk.choices[0]?.delta?.content, + inToken: chunk.usage?.prompt_tokens, + outToken: chunk.usage?.completion_tokens, + }), + }) + ), ); return toAsyncGenerator(Stream.toAsyncIterable(stream), mapOpenAIError);