fix: sanitize no_proxy for managed embeddings

This commit is contained in:
Andrey Avtomonov 2026-05-19 18:17:00 +02:00
parent 7cec0041eb
commit 630287f7ac
13 changed files with 235 additions and 180 deletions

View file

@ -111,12 +111,12 @@ describe('createKtxEmbeddingProvider', () => {
);
});
it('falls back to one-shot ktx-daemon inference when the local HTTP daemon is unavailable', async () => {
const fetch = vi.fn().mockRejectedValue(new TypeError('fetch failed'));
const runSentenceTransformersJson = vi
it('reports local HTTP daemon failures without a ktx-daemon spawn fallback cascade', async () => {
const fetch = vi
.fn()
.mockResolvedValueOnce({ embedding: [0.1, 0.2] })
.mockResolvedValueOnce({ embeddings: [[0.3, 0.4], [0.5, 0.6]] });
.mockResolvedValue(
new Response('Embedding compute failed: httpx.InvalidURL: Invalid port', { status: 500 }),
);
const provider = createKtxEmbeddingProvider(
{
@ -125,19 +125,13 @@ describe('createKtxEmbeddingProvider', () => {
dimensions: 2,
sentenceTransformers: { baseURL: 'http://127.0.0.1:8765', pathPrefix: '' },
},
{ fetch, runSentenceTransformersJson },
{ fetch },
);
await expect(provider.embedMany(['hello', 'world'])).resolves.toEqual([
[0.3, 0.4],
[0.5, 0.6],
]);
await expect(provider.embed('hello')).rejects.toThrow(
'Embedding provider sentence-transformers request failed with HTTP 500: Embedding compute failed: httpx.InvalidURL: Invalid port',
);
await expect(provider.embed('hello')).rejects.not.toThrow('ktx-daemon fallback failed');
expect(fetch).toHaveBeenCalledTimes(1);
expect(runSentenceTransformersJson).toHaveBeenNthCalledWith(1, 'embedding-compute', {
text: '__ktx_embedding_probe__',
});
expect(runSentenceTransformersJson).toHaveBeenNthCalledWith(2, 'embedding-compute-bulk', {
texts: ['hello', 'world'],
});
});
});

View file

@ -1,15 +1,7 @@
import { spawn } from 'node:child_process';
import { join } from 'node:path';
import OpenAI from 'openai';
import type { KtxEmbeddingConfig, KtxEmbeddingProvider } from './types.js';
type FetchFn = typeof fetch;
type SentenceTransformersCommand = 'embedding-compute' | 'embedding-compute-bulk';
type SentenceTransformersJsonRunner = (
subcommand: SentenceTransformersCommand,
payload: Record<string, unknown>,
) => Promise<Record<string, unknown>>;
type SentenceTransformersProcessCommand = { command: string; args: string[] };
export interface KtxEmbeddingProviderDeps {
createOpenAIClient?: (options: { apiKey?: string; baseURL?: string }) => {
@ -23,14 +15,10 @@ export interface KtxEmbeddingProviderDeps {
};
};
fetch?: FetchFn;
runSentenceTransformersJson?: SentenceTransformersJsonRunner;
sentenceTransformersCommand?: string;
sentenceTransformersArgs?: string[];
sentenceTransformersCwd?: string;
sentenceTransformersEnv?: NodeJS.ProcessEnv;
}
const DEFAULT_BATCH_SIZE = 100;
const HTTP_ERROR_BODY_MAX_LENGTH = 2_000;
function assertNonEmptyText(text: string): void {
if (!text.trim()) {
@ -69,110 +57,12 @@ function joinUrl(baseURL: string, pathPrefix: string, path: string): string {
return prefix ? `${base}/${prefix}/${suffix}` : `${base}/${suffix}`;
}
function errorText(error: unknown): string {
if (error instanceof Error) {
return error.cause
? `${error.name}: ${error.message}; cause: ${errorText(error.cause)}`
: `${error.name}: ${error.message}`;
function boundedHttpBody(text: string): string {
const normalized = text.trim();
if (normalized.length <= HTTP_ERROR_BODY_MAX_LENGTH) {
return normalized;
}
return String(error);
}
function parseJsonObject(raw: string, subcommand: string): Record<string, unknown> {
const parsed = JSON.parse(raw) as unknown;
if (!parsed || typeof parsed !== 'object' || Array.isArray(parsed)) {
throw new Error(`ktx-daemon ${subcommand} returned non-object JSON`);
}
return parsed as Record<string, unknown>;
}
function isCommandNotFound(error: unknown): boolean {
return (
error instanceof Error &&
('code' in error || 'errno' in error) &&
((error as { code?: unknown }).code === 'ENOENT' || (error as { errno?: unknown }).errno === 'ENOENT')
);
}
function defaultSentenceTransformersProcessCommands(): SentenceTransformersProcessCommand[] {
const venvBin =
process.platform === 'win32' ? join('.venv', 'Scripts', 'ktx-daemon.exe') : join('.venv', 'bin', 'ktx-daemon');
const repoVenvBin =
process.platform === 'win32'
? join('ktx', '.venv', 'Scripts', 'ktx-daemon.exe')
: join('ktx', '.venv', 'bin', 'ktx-daemon');
return [
{ command: 'ktx-daemon', args: [] },
{ command: venvBin, args: [] },
{ command: repoVenvBin, args: [] },
];
}
function runSentenceTransformersProcessCommand(
options: SentenceTransformersProcessCommand & {
cwd?: string;
env?: NodeJS.ProcessEnv;
},
): SentenceTransformersJsonRunner {
return async (
subcommand: SentenceTransformersCommand,
payload: Record<string, unknown>,
): Promise<Record<string, unknown>> =>
new Promise((resolve, reject) => {
const child = spawn(options.command, [...options.args, subcommand], {
cwd: options.cwd,
env: { ...process.env, ...options.env },
stdio: ['pipe', 'pipe', 'pipe'],
});
const stdout: Buffer[] = [];
const stderr: Buffer[] = [];
child.stdout.on('data', (chunk: Buffer) => stdout.push(chunk));
child.stderr.on('data', (chunk: Buffer) => stderr.push(chunk));
child.on('error', reject);
child.on('close', (code) => {
const stdoutText = Buffer.concat(stdout).toString('utf8').trim();
const stderrText = Buffer.concat(stderr).toString('utf8').trim();
if (code !== 0) {
reject(new Error(`ktx-daemon ${subcommand} failed: ${stderrText || `exit code ${code}`}`));
return;
}
try {
resolve(parseJsonObject(stdoutText, subcommand));
} catch (error) {
reject(error);
}
});
child.stdin.end(`${JSON.stringify(payload)}\n`);
});
}
function runSentenceTransformersProcessJson(options: {
commands: SentenceTransformersProcessCommand[];
cwd?: string;
env?: NodeJS.ProcessEnv;
}): SentenceTransformersJsonRunner {
return async (
subcommand: SentenceTransformersCommand,
payload: Record<string, unknown>,
): Promise<Record<string, unknown>> => {
const errors: string[] = [];
for (const command of options.commands) {
try {
return await runSentenceTransformersProcessCommand({
...command,
cwd: options.cwd,
env: options.env,
})(subcommand, payload);
} catch (error) {
errors.push(`${command.command}: ${errorText(error)}`);
if (!isCommandNotFound(error)) {
break;
}
}
}
throw new Error(`ktx-daemon ${subcommand} failed: ${errors.join('; ')}`);
};
return `${normalized.slice(0, HTTP_ERROR_BODY_MAX_LENGTH)}...`;
}
class OpenAIEmbeddingProvider implements KtxEmbeddingProvider {
@ -228,9 +118,7 @@ class SentenceTransformersEmbeddingProvider implements KtxEmbeddingProvider {
private readonly fetch: FetchFn;
private readonly baseURL: string;
private readonly pathPrefix: string;
private readonly runJson: SentenceTransformersJsonRunner;
private readonly startupProbe: Promise<void>;
private useProcessRunner = false;
constructor(config: KtxEmbeddingConfig, deps: KtxEmbeddingProviderDeps) {
if (!config.sentenceTransformers?.baseURL) {
@ -241,15 +129,6 @@ class SentenceTransformersEmbeddingProvider implements KtxEmbeddingProvider {
this.fetch = deps.fetch ?? fetch;
this.baseURL = config.sentenceTransformers.baseURL;
this.pathPrefix = config.sentenceTransformers.pathPrefix ?? '/api';
this.runJson =
deps.runSentenceTransformersJson ??
runSentenceTransformersProcessJson({
commands: deps.sentenceTransformersCommand
? [{ command: deps.sentenceTransformersCommand, args: deps.sentenceTransformersArgs ?? [] }]
: defaultSentenceTransformersProcessCommands(),
cwd: deps.sentenceTransformersCwd,
env: deps.sentenceTransformersEnv,
});
this.startupProbe = this.requestSingle('__ktx_embedding_probe__').then((embedding) => {
assertVectorDimensions(embedding, this.dimensions, 'sentence-transformers');
});
@ -264,7 +143,7 @@ class SentenceTransformersEmbeddingProvider implements KtxEmbeddingProvider {
async embedMany(texts: string[]): Promise<number[][]> {
assertBatchSize(texts, this.maxBatchSize);
await this.startupProbe;
const response = await this.requestJson('embedding-compute-bulk', '/embeddings/compute-bulk', { texts });
const response = await this.requestJson('/embeddings/compute-bulk', { texts });
if (
!response ||
typeof response !== 'object' ||
@ -285,37 +164,15 @@ class SentenceTransformersEmbeddingProvider implements KtxEmbeddingProvider {
}
private async requestSingle(text: string): Promise<number[]> {
const response = await this.requestJson('embedding-compute', '/embeddings/compute', { text });
const response = await this.requestJson('/embeddings/compute', { text });
if (!response || typeof response !== 'object' || !('embedding' in response) || !Array.isArray(response.embedding)) {
throw new Error('Embedding provider sentence-transformers returned malformed single response');
}
return response.embedding;
}
private async requestJson(
command: SentenceTransformersCommand,
path: string,
body: Record<string, unknown>,
): Promise<Record<string, unknown>> {
if (this.useProcessRunner) {
return this.runJson(command, body);
}
try {
return await this.postJson(path, body);
} catch (httpError) {
try {
const response = await this.runJson(command, body);
this.useProcessRunner = true;
return response;
} catch (processError) {
throw new Error(
`Embedding provider sentence-transformers local HTTP request failed (${errorText(
httpError,
)}) and ktx-daemon fallback failed (${errorText(processError)})`,
);
}
}
private async requestJson(path: string, body: Record<string, unknown>): Promise<Record<string, unknown>> {
return await this.postJson(path, body);
}
private async postJson(path: string, body: Record<string, unknown>): Promise<Record<string, unknown>> {
@ -325,7 +182,12 @@ class SentenceTransformersEmbeddingProvider implements KtxEmbeddingProvider {
body: JSON.stringify(body),
});
if (!response.ok) {
throw new Error(`Embedding provider sentence-transformers request failed with HTTP ${response.status}`);
const bodyText = boundedHttpBody(await response.text());
throw new Error(
`Embedding provider sentence-transformers request failed with HTTP ${response.status}${
bodyText ? `: ${bodyText}` : ''
}`,
);
}
const parsed = (await response.json()) as unknown;
if (!parsed || typeof parsed !== 'object' || Array.isArray(parsed)) {