Share text completion stream helpers

This commit is contained in:
elpresidank 2026-06-02 04:33:48 -05:00
parent 9d3f745fb0
commit 32fc7ea32d
9 changed files with 297 additions and 297 deletions

View file

@ -1,6 +1,14 @@
import { describe, expect, it } from "@effect/vitest";
import type { LlmChunk } from "@trustgraph/base";
import { providerRuntimeError, providerStatusError, toAsyncGenerator } from "../model/text-completion/common.js";
import { Effect, Stream } from "effect";
import {
llmStreamPart,
providerRuntimeError,
providerStatusError,
streamTextCompletionChunks,
textFromContent,
toAsyncGenerator,
} from "../model/text-completion/common.js";
const emptyChunkIterator = (): AsyncIterable<LlmChunk> => ({
[Symbol.asyncIterator]: () => ({
@ -33,4 +41,47 @@ describe("text completion common helpers", () => {
message: "provider failed",
});
});
// Pushes three provider-shaped parts through streamTextCompletionChunks and checks
// that only the non-empty text is emitted, followed by one final chunk that
// carries both token counts seen along the way.
it.effect(
  "builds streaming chunks from async iterables with final token totals",
  Effect.fnUntraced(function* () {
    const providerParts = Stream.toAsyncIterable(
      Stream.fromArray([
        { text: "", inToken: 3 },
        { text: "hello" },
        { outToken: 5 },
      ]),
    );

    const emitted = yield* Stream.runCollect(
      streamTextCompletionChunks(providerParts, {
        model: "unit-model",
        mapError: (error) => providerRuntimeError("test-provider", error),
        extract: (chunk) => llmStreamPart(chunk),
      }),
    );

    const expected = [
      {
        text: "hello",
        inToken: null,
        outToken: null,
        model: "unit-model",
        isFinal: false,
      },
      {
        text: "",
        inToken: 3,
        outToken: 5,
        model: "unit-model",
        isFinal: true,
      },
    ];
    expect(Array.from(emitted)).toEqual(expected);
  }),
);
// Table of [content payload, expected extracted text]: a plain string passes
// through, array parts with a string `text` are concatenated, anything else
// contributes nothing.
it("narrows provider content payloads without type assertions", () => {
  const cases: ReadonlyArray<readonly [unknown, string]> = [
    ["direct", "direct"],
    [[{ text: "a" }, { text: "b" }, { wrong: "skip" }], "ab"],
    [[{ text: 1 }], ""],
  ];
  for (const [content, expected] of cases) {
    expect(textFromContent(content)).toBe(expected);
  }
});
});

View file

@ -23,9 +23,11 @@ import {
} from "@trustgraph/base";
import { Effect, Layer, ManagedRuntime, Stream } from "effect";
import {
llmStreamPart,
optionalStringConfig,
providerStatusError,
requiredString,
streamTextCompletionChunks,
toAsyncGenerator,
type TextCompletionRuntimeError,
} from "./common.ts";
@ -156,53 +158,18 @@ export function makeAzureOpenAIProvider(config: AzureOpenAIProcessorConfig): Llm
catch: mapAzureOpenAIError,
}),
).pipe(
Stream.flatMap((openAIStream) => {
const iterator = openAIStream[Symbol.asyncIterator]();
let totalInputTokens = 0;
let totalOutputTokens = 0;
return Stream.unfold<"pulling" | "done", LlmChunk, TextCompletionRuntimeError, never>(
"pulling",
(state) => {
if (state === "done") return Effect.as(Effect.void, undefined);
return Effect.gen(function* () {
while (true) {
const next = yield* Effect.tryPromise({
try: () => iterator.next(),
catch: mapAzureOpenAIError,
});
if (next.done === true) {
return [{
text: "",
inToken: totalInputTokens,
outToken: totalOutputTokens,
model: modelName,
isFinal: true,
}, "done"] as const;
}
const chunk = next.value;
const content = chunk.choices[0]?.delta?.content;
if (chunk.usage !== null && chunk.usage !== undefined) {
totalInputTokens = chunk.usage.prompt_tokens;
totalOutputTokens = chunk.usage.completion_tokens;
}
if (content !== null && content !== undefined && content.length > 0) {
return [{
text: content,
inToken: null,
outToken: null,
model: modelName,
isFinal: false,
}, "pulling"] as const;
}
}
});
},
);
}),
Stream.flatMap((openAIStream) =>
streamTextCompletionChunks(openAIStream, {
model: modelName,
mapError: mapAzureOpenAIError,
extract: (chunk) =>
llmStreamPart({
text: chunk.choices[0]?.delta?.content,
inToken: chunk.usage?.prompt_tokens,
outToken: chunk.usage?.completion_tokens,
}),
})
),
);
return toAsyncGenerator(Stream.toAsyncIterable(stream), mapAzureOpenAIError);

View file

@ -19,9 +19,11 @@ import {
} from "@trustgraph/base";
import { Effect, Layer, ManagedRuntime, Stream } from "effect";
import {
llmStreamPart,
optionalStringConfig,
providerStatusError,
requiredString,
streamTextCompletionChunks,
toAsyncGenerator,
type TextCompletionRuntimeError,
} from "./common.ts";
@ -136,53 +138,25 @@ export function makeClaudeProvider(config: ClaudeProcessorConfig): LlmProvider {
catch: mapClaudeError,
}),
).pipe(
Stream.flatMap((anthropicStream) => {
const iterator = anthropicStream[Symbol.asyncIterator]();
return Stream.unfold<"pulling" | "done", LlmChunk, TextCompletionRuntimeError, never>(
"pulling",
(state) => {
if (state === "done") return Effect.as(Effect.void, undefined);
return Effect.gen(function* () {
while (true) {
const next = yield* Effect.tryPromise({
try: () => iterator.next(),
catch: mapClaudeError,
});
if (next.done === true) {
const finalMessage = yield* Effect.tryPromise({
try: () => anthropicStream.finalMessage(),
catch: mapClaudeError,
});
return [{
text: "",
inToken: finalMessage.usage.input_tokens,
outToken: finalMessage.usage.output_tokens,
model: modelName,
isFinal: true,
}, "done"] as const;
}
const event = next.value;
if (
event.type === "content_block_delta" &&
event.delta.type === "text_delta"
) {
return [{
text: event.delta.text,
inToken: null,
outToken: null,
model: modelName,
isFinal: false,
}, "pulling"] as const;
}
}
});
},
);
}),
Stream.flatMap((anthropicStream) =>
streamTextCompletionChunks(anthropicStream, {
model: modelName,
mapError: mapClaudeError,
extract: (event) =>
event.type === "content_block_delta" && event.delta.type === "text_delta"
? llmStreamPart({ text: event.delta.text })
: llmStreamPart({}),
finalTokens: Effect.tryPromise({
try: () => anthropicStream.finalMessage(),
catch: mapClaudeError,
}).pipe(
Effect.map((finalMessage) => ({
inToken: finalMessage.usage.input_tokens,
outToken: finalMessage.usage.output_tokens,
})),
),
})
),
);
return toAsyncGenerator(Stream.toAsyncIterable(stream), mapClaudeError);

View file

@ -3,7 +3,7 @@ import {
errorMessage,
type LlmChunk,
} from "@trustgraph/base";
import { Config, Effect } from "effect";
import { Config, Effect, Ref, Result, Stream } from "effect";
import * as O from "effect/Option";
import * as Predicate from "effect/Predicate";
import * as S from "effect/Schema";
@ -29,6 +29,112 @@ export type TextCompletionRuntimeError =
| TextCompletionProviderError
| TooManyRequestsError;
/**
 * Input/output token counts for one streamed completion.
 *
 * Used both as the running state while a stream is consumed and as the
 * payload written onto the final chunk.
 */
type StreamingTokenTotals = {
readonly inToken: number;
readonly outToken: number;
};
/**
 * Provider-neutral view of a single streamed item.
 *
 * Every field is optional because an individual item may carry text, token
 * counts, both, or neither; `O.none()` means "this item did not report it".
 */
type LlmStreamPart = {
readonly text: O.Option<string>;
readonly inToken: O.Option<number>;
readonly outToken: O.Option<number>;
};
/** Starting totals for a stream that has not reported any token counts yet. */
const initialTokenTotals = { inToken: 0, outToken: 0 } satisfies StreamingTokenTotals;
/**
 * Folds one stream part into the running token totals.
 *
 * A count present on `part` replaces the stored one; a count that is absent
 * leaves the stored value unchanged. Counts are overwritten, not summed.
 *
 * @param current - totals accumulated so far
 * @param part - the stream part that may carry newer counts
 * @returns a new totals object; `current` is not mutated
 */
const updateTokenTotals = (
  current: StreamingTokenTotals,
  part: LlmStreamPart,
): StreamingTokenTotals => {
  const inToken = O.isSome(part.inToken) ? part.inToken.value : current.inToken;
  const outToken = O.isSome(part.outToken) ? part.outToken.value : current.outToken;
  return { inToken, outToken };
};
/**
 * Builds the terminating chunk of a streamed completion.
 *
 * @param model - model name to stamp on the chunk
 * @param totals - token counts to report
 * @returns a chunk with empty text, the given counts and `isFinal: true`
 */
const finalChunk = (model: string, totals: StreamingTokenTotals): LlmChunk => {
  const { inToken, outToken } = totals;
  return { text: "", inToken, outToken, model, isFinal: true };
};
/**
 * Builds a non-final chunk that carries a piece of generated text.
 *
 * @param model - model name to stamp on the chunk
 * @param text - the text delta to emit
 * @returns a chunk with `isFinal: false` and both token counts set to `null`
 */
const textChunk = (model: string, text: string): LlmChunk => {
  return { text, inToken: null, outToken: null, model, isFinal: false };
};
/**
 * Extracts the `text` of one content part, if it has a string one.
 *
 * @param part - an arbitrary value taken from a provider content array
 * @returns `O.some(text)` when `part` is an object whose `text` property is a
 *          string, otherwise `O.none()`
 */
const contentPartText = (part: unknown): O.Option<string> => {
  if (!Predicate.isObject(part) || !Predicate.hasProperty(part, "text")) {
    return O.none();
  }
  return Predicate.isString(part.text) ? O.some(part.text) : O.none();
};
/**
 * Normalises a provider "content" payload to plain text.
 *
 * - a string is returned as-is;
 * - an array has the string `text` of each part concatenated in order, and
 *   parts without a string `text` are skipped;
 * - anything else yields the empty string.
 *
 * @param content - the untyped content value received from a provider
 * @returns the extracted text, possibly empty
 */
export const textFromContent = (content: unknown): string => {
  if (Predicate.isString(content)) {
    return content;
  }
  if (!Array.isArray(content)) {
    return "";
  }
  let joined = "";
  for (const part of content) {
    const text = contentPartText(part);
    if (O.isSome(text)) {
      joined += text.value;
    }
  }
  return joined;
};
/**
 * Lifts a loosely-typed provider record into an `LlmStreamPart`.
 *
 * Each field that is missing, `null` or `undefined` becomes `O.none()`; any
 * other value is wrapped in `O.some`.
 *
 * @param part - the optional text and token counts read from a provider item
 * @returns the same information with every field expressed as an `Option`
 */
export const llmStreamPart = (part: {
  readonly text?: string | null | undefined;
  readonly inToken?: number | null | undefined;
  readonly outToken?: number | null | undefined;
}): LlmStreamPart => {
  const { text, inToken, outToken } = part;
  return {
    text: O.fromNullishOr(text),
    inToken: O.fromNullishOr(inToken),
    outToken: O.fromNullishOr(outToken),
  };
};
/**
 * Turns a provider's async iterable into a stream of `LlmChunk`s.
 *
 * Every item is passed through `options.extract`. Items with non-empty text
 * become non-final text chunks; items with no text (or empty text) emit
 * nothing but still update the running token totals. Once the iterable is
 * exhausted, exactly one final chunk carrying the token totals is appended.
 *
 * @param iterable - the provider's streaming response
 * @param options.model - model name stamped on every emitted chunk
 * @param options.mapError - passed to `Stream.fromAsyncIterable` to convert
 *        iteration failures into a `TextCompletionRuntimeError`
 * @param options.extract - reads text / token counts out of one provider item
 * @param options.finalTokens - optional effect that supplies the totals for the
 *        final chunk; when given it is used instead of the totals accumulated
 *        from `extract`
 * @returns a stream of text chunks followed by one final chunk
 */
export const streamTextCompletionChunks = <A>(
iterable: AsyncIterable<A>,
options: {
readonly model: string;
readonly mapError: (error: unknown) => TextCompletionRuntimeError;
readonly extract: (chunk: A) => LlmStreamPart;
readonly finalTokens?: Effect.Effect<StreamingTokenTotals, TextCompletionRuntimeError>;
},
): Stream.Stream<LlmChunk, TextCompletionRuntimeError> =>
Stream.unwrap(Effect.gen(function* () {
// Running totals live in a Ref created when this effect runs.
const totals = yield* Ref.make(initialTokenTotals);
const chunks = Stream.fromAsyncIterable(iterable, options.mapError).pipe(
Stream.mapEffect((chunk) =>
Effect.gen(function* () {
const part = options.extract(chunk);
// Record any token counts first, even when this item carries no text.
yield* Ref.update(totals, (current) => updateTokenTotals(current, part));
// Only non-empty text produces a chunk; everything else maps to None.
return O.map(
O.filter(part.text, (text) => text.length > 0),
(text) => textChunk(options.model, text),
);
})
),
// Drop the None results so that only real text chunks flow downstream.
Stream.filterMap((chunk) =>
O.match(chunk, {
onNone: () => Result.fail(undefined),
onSome: Result.succeed,
})
),
);
// A provider-supplied totals effect takes precedence over the accumulated Ref.
const tokenTotals = options.finalTokens ?? Ref.get(totals);
// The totals effect sits in a stream concatenated after `chunks`, so the final
// chunk is produced after the text chunks.
return chunks.pipe(
Stream.concat(Stream.fromEffect(tokenTotals.pipe(
Effect.map((tokens) => finalChunk(options.model, tokens)),
))),
);
}));
export const optionalStringConfig = Effect.fn("TextCompletion.optionalStringConfig")(function*(
provider: string,
name: string,

View file

@ -21,9 +21,12 @@ import {
} from "@trustgraph/base";
import { Effect, Layer, ManagedRuntime, Stream } from "effect";
import {
llmStreamPart,
optionalStringConfig,
providerStatusError,
requiredString,
streamTextCompletionChunks,
textFromContent,
toAsyncGenerator,
type TextCompletionRuntimeError,
} from "./common.ts";
@ -101,7 +104,7 @@ export function makeMistralProvider(config: MistralProcessorConfig): LlmProvider
catch: mapMistralError,
}).pipe(
Effect.map((resp): LlmResult => ({
text: (resp.choices?.[0]?.message?.content as string) ?? "",
text: textFromContent(resp.choices?.[0]?.message?.content),
inToken: resp.usage?.promptTokens ?? 0,
outToken: resp.usage?.completionTokens ?? 0,
model: modelName,
@ -134,54 +137,18 @@ export function makeMistralProvider(config: MistralProcessorConfig): LlmProvider
catch: mapMistralError,
}),
).pipe(
Stream.flatMap((mistralStream) => {
const iterator = mistralStream[Symbol.asyncIterator]();
let totalInputTokens = 0;
let totalOutputTokens = 0;
return Stream.unfold<"pulling" | "done", LlmChunk, TextCompletionRuntimeError, never>(
"pulling",
(state) => {
if (state === "done") return Effect.as(Effect.void, undefined);
return Effect.gen(function* () {
while (true) {
const next = yield* Effect.tryPromise({
try: () => iterator.next(),
catch: mapMistralError,
});
if (next.done === true) {
return [{
text: "",
inToken: totalInputTokens,
outToken: totalOutputTokens,
model: modelName,
isFinal: true,
}, "done"] as const;
}
const chunk = next.value;
const delta = chunk.data?.choices?.[0]?.delta;
const content = delta?.content;
if (chunk.data?.usage !== undefined) {
totalInputTokens = chunk.data.usage.promptTokens ?? 0;
totalOutputTokens = chunk.data.usage.completionTokens ?? 0;
}
if (typeof content === "string" && content.length > 0) {
return [{
text: content,
inToken: null,
outToken: null,
model: modelName,
isFinal: false,
}, "pulling"] as const;
}
}
});
},
);
}),
Stream.flatMap((mistralStream) =>
streamTextCompletionChunks(mistralStream, {
model: modelName,
mapError: mapMistralError,
extract: (chunk) =>
llmStreamPart({
text: textFromContent(chunk.data?.choices?.[0]?.delta?.content),
inToken: chunk.data?.usage?.promptTokens,
outToken: chunk.data?.usage?.completionTokens,
}),
})
),
);
return toAsyncGenerator(Stream.toAsyncIterable(stream), mapMistralError);

View file

@ -21,8 +21,10 @@ import {
} from "@trustgraph/base";
import { Effect, Layer, ManagedRuntime, Stream } from "effect";
import {
llmStreamPart,
optionalStringConfig,
providerRuntimeError,
streamTextCompletionChunks,
toAsyncGenerator,
type TextCompletionRuntimeError,
} from "./common.ts";
@ -112,55 +114,18 @@ export function makeOllamaProvider(config: OllamaProcessorConfig): LlmProvider {
catch: mapOllamaError,
}),
).pipe(
Stream.flatMap((ollamaStream) => {
const iterator = ollamaStream[Symbol.asyncIterator]();
let totalInputTokens = 0;
let totalOutputTokens = 0;
return Stream.unfold<"pulling" | "done", LlmChunk, TextCompletionRuntimeError, never>(
"pulling",
(state) => {
if (state === "done") return Effect.as(Effect.void, undefined);
return Effect.gen(function* () {
while (true) {
const next = yield* Effect.tryPromise({
try: () => iterator.next(),
catch: mapOllamaError,
});
if (next.done === true) {
return [{
text: "",
inToken: totalInputTokens,
outToken: totalOutputTokens,
model: modelName,
isFinal: true,
}, "done"] as const;
}
const chunk = next.value;
if (chunk.prompt_eval_count !== undefined) {
totalInputTokens = chunk.prompt_eval_count;
}
if (chunk.eval_count !== undefined) {
totalOutputTokens = chunk.eval_count;
}
if (chunk.response.length > 0) {
return [{
text: chunk.response,
inToken: null,
outToken: null,
model: modelName,
isFinal: false,
}, "pulling"] as const;
}
}
});
},
);
}),
Stream.flatMap((ollamaStream) =>
streamTextCompletionChunks(ollamaStream, {
model: modelName,
mapError: mapOllamaError,
extract: (chunk) =>
llmStreamPart({
text: chunk.response,
inToken: chunk.prompt_eval_count,
outToken: chunk.eval_count,
}),
})
),
);
return toAsyncGenerator(Stream.toAsyncIterable(stream), mapOllamaError);

View file

@ -24,9 +24,11 @@ import {
} from "@trustgraph/base";
import { Effect, Layer, ManagedRuntime, Stream } from "effect";
import {
llmStreamPart,
optionalStringConfig,
providerStatusError,
requiredString,
streamTextCompletionChunks,
toAsyncGenerator,
type TextCompletionRuntimeError,
} from "./common.ts";
@ -147,53 +149,18 @@ export function makeOpenAICompatibleProvider(
catch: mapOpenAICompatibleError,
}),
).pipe(
Stream.flatMap((openAIStream) => {
const iterator = openAIStream[Symbol.asyncIterator]();
let totalInputTokens = 0;
let totalOutputTokens = 0;
return Stream.unfold<"pulling" | "done", LlmChunk, TextCompletionRuntimeError, never>(
"pulling",
(state) => {
if (state === "done") return Effect.as(Effect.void, undefined);
return Effect.gen(function* () {
while (true) {
const next = yield* Effect.tryPromise({
try: () => iterator.next(),
catch: mapOpenAICompatibleError,
});
if (next.done === true) {
return [{
text: "",
inToken: totalInputTokens,
outToken: totalOutputTokens,
model: modelName,
isFinal: true,
}, "done"] as const;
}
const chunk = next.value;
const content = chunk.choices[0]?.delta?.content;
if (chunk.usage !== null && chunk.usage !== undefined) {
totalInputTokens = chunk.usage.prompt_tokens;
totalOutputTokens = chunk.usage.completion_tokens;
}
if (content !== null && content !== undefined && content.length > 0) {
return [{
text: content,
inToken: null,
outToken: null,
model: modelName,
isFinal: false,
}, "pulling"] as const;
}
}
});
},
);
}),
Stream.flatMap((openAIStream) =>
streamTextCompletionChunks(openAIStream, {
model: modelName,
mapError: mapOpenAICompatibleError,
extract: (chunk) =>
llmStreamPart({
text: chunk.choices[0]?.delta?.content,
inToken: chunk.usage?.prompt_tokens,
outToken: chunk.usage?.completion_tokens,
}),
})
),
);
return toAsyncGenerator(Stream.toAsyncIterable(stream), mapOpenAICompatibleError);

View file

@ -19,9 +19,11 @@ import {
} from "@trustgraph/base";
import { Effect, Layer, ManagedRuntime, Stream } from "effect";
import {
llmStreamPart,
optionalStringConfig,
providerStatusError,
requiredString,
streamTextCompletionChunks,
toAsyncGenerator,
type TextCompletionRuntimeError,
} from "./common.ts";
@ -138,53 +140,18 @@ export function makeOpenAIProvider(config: OpenAIProcessorConfig): LlmProvider {
catch: mapOpenAIError,
}),
).pipe(
Stream.flatMap((openAIStream) => {
const iterator = openAIStream[Symbol.asyncIterator]();
let totalInputTokens = 0;
let totalOutputTokens = 0;
return Stream.unfold<"pulling" | "done", LlmChunk, TextCompletionRuntimeError, never>(
"pulling",
(state) => {
if (state === "done") return Effect.as(Effect.void, undefined);
return Effect.gen(function* () {
while (true) {
const next = yield* Effect.tryPromise({
try: () => iterator.next(),
catch: mapOpenAIError,
});
if (next.done === true) {
return [{
text: "",
inToken: totalInputTokens,
outToken: totalOutputTokens,
model: modelName,
isFinal: true,
}, "done"] as const;
}
const chunk = next.value;
const content = chunk.choices[0]?.delta?.content;
if (chunk.usage !== null && chunk.usage !== undefined) {
totalInputTokens = chunk.usage.prompt_tokens;
totalOutputTokens = chunk.usage.completion_tokens;
}
if (content !== null && content !== undefined && content.length > 0) {
return [{
text: content,
inToken: null,
outToken: null,
model: modelName,
isFinal: false,
}, "pulling"] as const;
}
}
});
},
);
}),
Stream.flatMap((openAIStream) =>
streamTextCompletionChunks(openAIStream, {
model: modelName,
mapError: mapOpenAIError,
extract: (chunk) =>
llmStreamPart({
text: chunk.choices[0]?.delta?.content,
inToken: chunk.usage?.prompt_tokens,
outToken: chunk.usage?.completion_tokens,
}),
})
),
);
return toAsyncGenerator(Stream.toAsyncIterable(stream), mapOpenAIError);