mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-07 07:55:13 +02:00
fix: sanitize no_proxy for managed embeddings (#153)
This commit is contained in:
parent
af0567c57e
commit
8bc60e8e56
13 changed files with 235 additions and 180 deletions
|
|
@ -150,6 +150,8 @@ describe('ensureManagedLocalEmbeddingsDaemon', () => {
|
|||
}),
|
||||
).resolves.toEqual({
|
||||
baseUrl: 'http://127.0.0.1:61234',
|
||||
stdoutLog: '/work/proj/.ktx/runtime/daemon.stdout.log',
|
||||
stderrLog: '/work/proj/.ktx/runtime/daemon.stderr.log',
|
||||
env: {
|
||||
[MANAGED_SENTENCE_TRANSFORMERS_BASE_URL_ENV]: 'http://127.0.0.1:61234',
|
||||
},
|
||||
|
|
|
|||
|
|
@ -14,6 +14,8 @@ import { startManagedPythonDaemon, type ManagedPythonDaemonStartResult } from '.
|
|||
|
||||
export interface ManagedLocalEmbeddingsDaemon {
|
||||
baseUrl: string;
|
||||
stdoutLog: string;
|
||||
stderrLog: string;
|
||||
env: Record<typeof MANAGED_SENTENCE_TRANSFORMERS_BASE_URL_ENV, string>;
|
||||
}
|
||||
|
||||
|
|
@ -91,6 +93,8 @@ export async function ensureManagedLocalEmbeddingsDaemon(
|
|||
|
||||
return {
|
||||
baseUrl: daemon.baseUrl,
|
||||
stdoutLog: daemon.state.stdoutLog,
|
||||
stderrLog: daemon.state.stderrLog,
|
||||
env: {
|
||||
[MANAGED_SENTENCE_TRANSFORMERS_BASE_URL_ENV]: daemon.baseUrl,
|
||||
},
|
||||
|
|
|
|||
|
|
@ -11,6 +11,8 @@ import {
|
|||
type KtxMcpDaemonState,
|
||||
} from './managed-mcp-daemon.js';
|
||||
|
||||
type KtxMcpDaemonStartOptions = Parameters<typeof startKtxMcpDaemon>[0];
|
||||
|
||||
function child(pid = 4242): KtxMcpDaemonChild {
|
||||
return { pid, unref: vi.fn() };
|
||||
}
|
||||
|
|
@ -40,6 +42,7 @@ describe('managed MCP daemon lifecycle', () => {
|
|||
});
|
||||
|
||||
afterEach(async () => {
|
||||
vi.unstubAllEnvs();
|
||||
await rm(tempDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
|
|
@ -94,6 +97,33 @@ describe('managed MCP daemon lifecycle', () => {
|
|||
);
|
||||
});
|
||||
|
||||
it('sanitizes IPv6 CIDR entries from child NO_PROXY env', async () => {
|
||||
vi.stubEnv('NO_PROXY', 'localhost,fd07:b51a:cc66:f0::/64');
|
||||
vi.stubEnv('no_proxy', '::1,fd00::/8,*.orb.local');
|
||||
const spawnDaemon = vi.fn<NonNullable<KtxMcpDaemonStartOptions['spawnDaemon']>>(() => child(5555));
|
||||
|
||||
await startKtxMcpDaemon({
|
||||
projectDir,
|
||||
cliVersion: '0.0.0-test',
|
||||
host: '127.0.0.1',
|
||||
port: 7879,
|
||||
allowedHosts: [],
|
||||
allowedOrigins: [],
|
||||
binPath: '/repo/packages/cli/dist/bin.js',
|
||||
spawnDaemon,
|
||||
processAlive: vi.fn(() => false),
|
||||
portAvailable: vi.fn(async () => true),
|
||||
now: () => new Date('2026-05-14T00:00:00.000Z'),
|
||||
});
|
||||
|
||||
const env = spawnDaemon.mock.calls[0]?.[2].env;
|
||||
if (!env) {
|
||||
throw new Error('Expected MCP daemon spawn env');
|
||||
}
|
||||
expect(env.NO_PROXY).toBe('localhost,::1,*.orb.local');
|
||||
expect(env.no_proxy).toBe(env.NO_PROXY);
|
||||
});
|
||||
|
||||
it('returns already-running without spawning when the daemon is alive at the same host/port', async () => {
|
||||
await mkdir(join(projectDir, '.ktx'), { recursive: true });
|
||||
await writeFile(join(projectDir, '.ktx/mcp.json'), `${JSON.stringify(state(projectDir), null, 2)}\n`);
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import { createServer } from 'node:net';
|
|||
import { dirname, join } from 'node:path';
|
||||
import { setTimeout as delay } from 'node:timers/promises';
|
||||
import { z } from 'zod';
|
||||
import { sanitizeChildProxyEnv } from './proxy-env.js';
|
||||
|
||||
export interface KtxMcpDaemonState {
|
||||
schemaVersion: 1;
|
||||
|
|
@ -166,11 +167,11 @@ export async function startKtxMcpDaemon(options: {
|
|||
const child = (options.spawnDaemon ?? defaultSpawnDaemon)(process.execPath, args, {
|
||||
detached: true,
|
||||
stdio: ['ignore', log.fd, log.fd],
|
||||
env: {
|
||||
env: sanitizeChildProxyEnv({
|
||||
...process.env,
|
||||
KTX_CLI_VERSION: options.cliVersion,
|
||||
...(options.token ? { KTX_MCP_TOKEN: options.token } : {}),
|
||||
},
|
||||
}),
|
||||
});
|
||||
if (!child.pid) {
|
||||
throw new Error('Failed to start KTX MCP daemon: child process pid was not available.');
|
||||
|
|
|
|||
|
|
@ -133,6 +133,7 @@ describe('managed Python daemon lifecycle', () => {
|
|||
});
|
||||
|
||||
afterEach(async () => {
|
||||
vi.unstubAllEnvs();
|
||||
await rm(tempDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
|
|
@ -188,6 +189,27 @@ describe('managed Python daemon lifecycle', () => {
|
|||
});
|
||||
});
|
||||
|
||||
it('sanitizes IPv6 CIDR entries from child NO_PROXY env', async () => {
|
||||
vi.stubEnv('NO_PROXY', 'localhost,fd07:b51a:cc66:f0::/64,127.0.0.0/8');
|
||||
vi.stubEnv('no_proxy', '::1,fd00::/8,*.orb.local');
|
||||
const spawnDaemon = makeSpawn(5555);
|
||||
|
||||
await startManagedPythonDaemon({
|
||||
...daemonOptionsBase(tempDir),
|
||||
features: ['local-embeddings'],
|
||||
installRuntime: vi.fn(async () => installResult(tempDir, ['core', 'local-embeddings'])),
|
||||
spawnDaemon,
|
||||
fetch: makeFetch(),
|
||||
allocatePort: vi.fn(async () => 61234),
|
||||
now: () => new Date('2026-05-11T00:00:00.000Z'),
|
||||
pollIntervalMs: 1,
|
||||
});
|
||||
|
||||
const env = vi.mocked(spawnDaemon).mock.calls[0]?.[2].env;
|
||||
expect(env?.NO_PROXY).toBe('localhost,127.0.0.0/8,::1,*.orb.local');
|
||||
expect(env?.no_proxy).toBe(env?.NO_PROXY);
|
||||
});
|
||||
|
||||
it('makes a final health probe before reporting startup failure', async () => {
|
||||
const spawnDaemon = makeSpawn(5556);
|
||||
const installRuntime = vi.fn(async () => installResult(tempDir));
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ import {
|
|||
type ManagedPythonRuntimeInstallOptions,
|
||||
type ManagedPythonRuntimeInstallResult,
|
||||
} from './managed-python-runtime.js';
|
||||
import { sanitizeChildProxyEnv } from './proxy-env.js';
|
||||
|
||||
export interface ManagedPythonDaemonState {
|
||||
schemaVersion: 1;
|
||||
|
|
@ -696,10 +697,10 @@ export async function startManagedPythonDaemon(
|
|||
{
|
||||
detached: true,
|
||||
stdio: ['ignore', stdout.fd, stderr.fd],
|
||||
env: {
|
||||
env: sanitizeChildProxyEnv({
|
||||
...process.env,
|
||||
KTX_DAEMON_VERSION: options.cliVersion,
|
||||
},
|
||||
}),
|
||||
},
|
||||
);
|
||||
child.unref();
|
||||
|
|
|
|||
21
packages/cli/src/proxy-env.test.ts
Normal file
21
packages/cli/src/proxy-env.test.ts
Normal file
|
|
@ -0,0 +1,21 @@
|
|||
import { describe, expect, it } from 'vitest';
|
||||
import { sanitizeChildProxyEnv } from './proxy-env.js';
|
||||
|
||||
describe('sanitizeChildProxyEnv', () => {
|
||||
it('drops IPv6 CIDR no-proxy entries and normalizes both env keys', () => {
|
||||
const env = sanitizeChildProxyEnv({
|
||||
NO_PROXY: 'localhost,127.0.0.1,127.0.0.0/8,fd07:b51a:cc66:f0::/64,*.orb.local',
|
||||
no_proxy: '::1,0.250.250.0/24,fd00::/8,*.orb.internal',
|
||||
});
|
||||
|
||||
expect(env.NO_PROXY).toBe('localhost,127.0.0.1,127.0.0.0/8,*.orb.local,::1,0.250.250.0/24,*.orb.internal');
|
||||
expect(env.no_proxy).toBe(env.NO_PROXY);
|
||||
});
|
||||
|
||||
it('preserves the input object and leaves missing proxy env unset', () => {
|
||||
const input = { PATH: '/usr/bin' };
|
||||
|
||||
expect(sanitizeChildProxyEnv(input)).toEqual({ PATH: '/usr/bin' });
|
||||
expect(input).toEqual({ PATH: '/usr/bin' });
|
||||
});
|
||||
});
|
||||
27
packages/cli/src/proxy-env.ts
Normal file
27
packages/cli/src/proxy-env.ts
Normal file
|
|
@ -0,0 +1,27 @@
|
|||
const NO_PROXY_KEYS = ['NO_PROXY', 'no_proxy'] as const;
|
||||
|
||||
function isIpv6CidrNoProxyEntry(entry: string): boolean {
|
||||
return entry.includes('/') && entry.includes(':');
|
||||
}
|
||||
|
||||
function cleanedNoProxyValue(env: NodeJS.ProcessEnv): string | undefined {
|
||||
const entries = NO_PROXY_KEYS.flatMap((key) => (env[key] ?? '').split(','))
|
||||
.map((entry) => entry.trim())
|
||||
.filter((entry) => entry.length > 0 && !isIpv6CidrNoProxyEntry(entry));
|
||||
|
||||
if (!NO_PROXY_KEYS.some((key) => env[key] !== undefined)) {
|
||||
return undefined;
|
||||
}
|
||||
return [...new Set(entries)].join(',');
|
||||
}
|
||||
|
||||
export function sanitizeChildProxyEnv(env: NodeJS.ProcessEnv): NodeJS.ProcessEnv {
|
||||
const sanitized = { ...env };
|
||||
const noProxy = cleanedNoProxyValue(env);
|
||||
if (noProxy === undefined) {
|
||||
return sanitized;
|
||||
}
|
||||
sanitized.NO_PROXY = noProxy;
|
||||
sanitized.no_proxy = noProxy;
|
||||
return sanitized;
|
||||
}
|
||||
|
|
@ -46,9 +46,14 @@ function makePromptAdapter(options: {
|
|||
};
|
||||
}
|
||||
|
||||
function managedDaemon(baseUrl = 'http://127.0.0.1:61234') {
|
||||
function managedDaemon(
|
||||
baseUrl = 'http://127.0.0.1:61234',
|
||||
logs: { stdoutLog?: string; stderrLog?: string } = {},
|
||||
) {
|
||||
return {
|
||||
baseUrl,
|
||||
stdoutLog: logs.stdoutLog ?? '/tmp/ktx-daemon.stdout.log',
|
||||
stderrLog: logs.stderrLog ?? '/tmp/ktx-daemon.stderr.log',
|
||||
env: {
|
||||
KTX_MANAGED_SENTENCE_TRANSFORMERS_BASE_URL: baseUrl,
|
||||
},
|
||||
|
|
@ -330,6 +335,65 @@ describe('setup embeddings step', () => {
|
|||
expect(io.stderr()).not.toContain('skip for now');
|
||||
});
|
||||
|
||||
it('prints the recent daemon stderr tail when local embedding health check fails', async () => {
|
||||
const io = makeIo();
|
||||
const stderrLog = join(tempDir, '.ktx', 'runtime', 'daemon.stderr.log');
|
||||
await mkdir(join(tempDir, '.ktx', 'runtime'), { recursive: true });
|
||||
await writeFile(
|
||||
stderrLog,
|
||||
Array.from({ length: 45 }, (_value, index) => `daemon traceback line ${index + 1}`).join('\n'),
|
||||
);
|
||||
|
||||
const result = await runKtxSetupEmbeddingsStep(
|
||||
{
|
||||
projectDir: tempDir,
|
||||
inputMode: 'disabled',
|
||||
cliVersion: '0.2.0',
|
||||
runtimeInstallPolicy: 'auto',
|
||||
skipEmbeddings: false,
|
||||
},
|
||||
io.io,
|
||||
{
|
||||
env: {},
|
||||
ensureLocalEmbeddings: vi.fn(async () => managedDaemon('http://127.0.0.1:61234', { stderrLog })),
|
||||
healthCheck: vi.fn(async () => ({ ok: false as const, message: 'HTTP 500' })),
|
||||
},
|
||||
);
|
||||
|
||||
expect(result.status).toBe('failed');
|
||||
expect(io.stderr()).toContain('Recent local embeddings daemon stderr:');
|
||||
expect(io.stderr()).toContain('daemon traceback line 6');
|
||||
expect(io.stderr()).toContain('daemon traceback line 45');
|
||||
expect(io.stderr()).not.toContain('daemon traceback line 5');
|
||||
});
|
||||
|
||||
it('does not print daemon stderr diagnostics when the log is unavailable or empty', async () => {
|
||||
const io = makeIo();
|
||||
|
||||
const result = await runKtxSetupEmbeddingsStep(
|
||||
{
|
||||
projectDir: tempDir,
|
||||
inputMode: 'disabled',
|
||||
cliVersion: '0.2.0',
|
||||
runtimeInstallPolicy: 'auto',
|
||||
skipEmbeddings: false,
|
||||
},
|
||||
io.io,
|
||||
{
|
||||
env: {},
|
||||
ensureLocalEmbeddings: vi.fn(async () =>
|
||||
managedDaemon('http://127.0.0.1:61234', {
|
||||
stderrLog: join(tempDir, '.ktx', 'runtime', 'missing.stderr.log'),
|
||||
}),
|
||||
),
|
||||
healthCheck: vi.fn(async () => ({ ok: false as const, message: 'HTTP 500' })),
|
||||
},
|
||||
);
|
||||
|
||||
expect(result.status).toBe('failed');
|
||||
expect(io.stderr()).not.toContain('Recent local embeddings daemon stderr:');
|
||||
});
|
||||
|
||||
it('uses fixed OpenAI defaults and only asks for credentials when OpenAI is selected', async () => {
|
||||
const io = makeIo();
|
||||
const healthCheck = vi.fn(async () => ({ ok: true as const }));
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
import { writeFile } from 'node:fs/promises';
|
||||
import { readFile, writeFile } from 'node:fs/promises';
|
||||
import { resolveKtxConfigReference } from '@ktx/context/core';
|
||||
import {
|
||||
type KtxProjectConfig,
|
||||
|
|
@ -59,6 +59,7 @@ export interface KtxSetupEmbeddingsDeps {
|
|||
healthCheck?: (config: KtxEmbeddingConfig) => Promise<KtxEmbeddingHealthCheckResult>;
|
||||
ensureLocalEmbeddings?: (options: {
|
||||
cliVersion: string;
|
||||
projectDir: string;
|
||||
installPolicy: KtxManagedPythonInstallPolicy;
|
||||
io: KtxCliIo;
|
||||
}) => Promise<ManagedLocalEmbeddingsDaemon>;
|
||||
|
|
@ -85,6 +86,7 @@ const EMBEDDING_OPTION_PROMPT_CONTEXT =
|
|||
'KTX uses embeddings for semantic search over semantic-layer sources, wiki context, schema metadata, ' +
|
||||
'and relationship evidence.';
|
||||
const LOCAL_EMBEDDING_HEALTH_TIMEOUT_MS = 120_000;
|
||||
const LOCAL_EMBEDDING_STDERR_TAIL_LINES = 40;
|
||||
|
||||
function createPromptAdapter(): KtxSetupEmbeddingsPromptAdapter {
|
||||
return createKtxSetupPromptAdapter({ selectCancelValue: 'back' });
|
||||
|
|
@ -286,14 +288,33 @@ async function chooseEmbeddingBackend(
|
|||
return 'back';
|
||||
}
|
||||
|
||||
function localEmbeddingSetupMessage(message: string): string {
|
||||
return [
|
||||
async function readLocalEmbeddingDaemonStderrTail(stderrLog: string | undefined): Promise<string[]> {
|
||||
if (!stderrLog) {
|
||||
return [];
|
||||
}
|
||||
try {
|
||||
const lines = (await readFile(stderrLog, 'utf8'))
|
||||
.split(/\r?\n/)
|
||||
.map((line) => line.trimEnd())
|
||||
.filter((line) => line.trim().length > 0);
|
||||
return lines.slice(-LOCAL_EMBEDDING_STDERR_TAIL_LINES);
|
||||
} catch {
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
function localEmbeddingSetupMessage(message: string, stderrTail: string[] = []): string {
|
||||
const lines = [
|
||||
`Local embedding health check failed: ${message}`,
|
||||
'Local embeddings use the KTX-managed Python runtime.',
|
||||
'Prepare the runtime with: ktx dev runtime start --feature local-embeddings',
|
||||
'Use --yes with setup to install and start the runtime without prompting.',
|
||||
'The first run may download Python packages and the all-MiniLM-L6-v2 model.',
|
||||
].join('\n');
|
||||
];
|
||||
if (stderrTail.length > 0) {
|
||||
lines.push('Recent local embeddings daemon stderr:', ...stderrTail);
|
||||
}
|
||||
return lines.join('\n');
|
||||
}
|
||||
|
||||
async function promptAfterLocalEmbeddingFailure(
|
||||
|
|
@ -447,9 +468,13 @@ export async function runKtxSetupEmbeddingsStep(
|
|||
}
|
||||
|
||||
progress.fail('Embedding test failed');
|
||||
const stderrTail =
|
||||
selectedBackend === 'sentence-transformers'
|
||||
? await readLocalEmbeddingDaemonStderrTail(managedLocalEmbeddings?.stderrLog)
|
||||
: [];
|
||||
io.stderr.write(
|
||||
selectedBackend === 'sentence-transformers'
|
||||
? `${localEmbeddingSetupMessage(health.message)}\n`
|
||||
? `${localEmbeddingSetupMessage(health.message, stderrTail)}\n`
|
||||
: `Embedding health check failed: ${health.message}\n`,
|
||||
);
|
||||
if (args.inputMode === 'disabled') {
|
||||
|
|
|
|||
|
|
@ -101,6 +101,8 @@ describe('runKtxSetupRuntimeStep', () => {
|
|||
const io = makeIo();
|
||||
const ensureLocalEmbeddings = vi.fn(async () => ({
|
||||
baseUrl: 'http://127.0.0.1:61234',
|
||||
stdoutLog: join(tempDir, '.ktx', 'runtime', 'daemon.stdout.log'),
|
||||
stderrLog: join(tempDir, '.ktx', 'runtime', 'daemon.stderr.log'),
|
||||
env: { KTX_MANAGED_SENTENCE_TRANSFORMERS_BASE_URL: 'http://127.0.0.1:61234' },
|
||||
}));
|
||||
const config: KtxProjectConfig = {
|
||||
|
|
|
|||
|
|
@ -111,12 +111,12 @@ describe('createKtxEmbeddingProvider', () => {
|
|||
);
|
||||
});
|
||||
|
||||
it('falls back to one-shot ktx-daemon inference when the local HTTP daemon is unavailable', async () => {
|
||||
const fetch = vi.fn().mockRejectedValue(new TypeError('fetch failed'));
|
||||
const runSentenceTransformersJson = vi
|
||||
it('reports local HTTP daemon failures without a ktx-daemon spawn fallback cascade', async () => {
|
||||
const fetch = vi
|
||||
.fn()
|
||||
.mockResolvedValueOnce({ embedding: [0.1, 0.2] })
|
||||
.mockResolvedValueOnce({ embeddings: [[0.3, 0.4], [0.5, 0.6]] });
|
||||
.mockResolvedValue(
|
||||
new Response('Embedding compute failed: httpx.InvalidURL: Invalid port', { status: 500 }),
|
||||
);
|
||||
|
||||
const provider = createKtxEmbeddingProvider(
|
||||
{
|
||||
|
|
@ -125,19 +125,13 @@ describe('createKtxEmbeddingProvider', () => {
|
|||
dimensions: 2,
|
||||
sentenceTransformers: { baseURL: 'http://127.0.0.1:8765', pathPrefix: '' },
|
||||
},
|
||||
{ fetch, runSentenceTransformersJson },
|
||||
{ fetch },
|
||||
);
|
||||
|
||||
await expect(provider.embedMany(['hello', 'world'])).resolves.toEqual([
|
||||
[0.3, 0.4],
|
||||
[0.5, 0.6],
|
||||
]);
|
||||
await expect(provider.embed('hello')).rejects.toThrow(
|
||||
'Embedding provider sentence-transformers request failed with HTTP 500: Embedding compute failed: httpx.InvalidURL: Invalid port',
|
||||
);
|
||||
await expect(provider.embed('hello')).rejects.not.toThrow('ktx-daemon fallback failed');
|
||||
expect(fetch).toHaveBeenCalledTimes(1);
|
||||
expect(runSentenceTransformersJson).toHaveBeenNthCalledWith(1, 'embedding-compute', {
|
||||
text: '__ktx_embedding_probe__',
|
||||
});
|
||||
expect(runSentenceTransformersJson).toHaveBeenNthCalledWith(2, 'embedding-compute-bulk', {
|
||||
texts: ['hello', 'world'],
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1,15 +1,7 @@
|
|||
import { spawn } from 'node:child_process';
|
||||
import { join } from 'node:path';
|
||||
import OpenAI from 'openai';
|
||||
import type { KtxEmbeddingConfig, KtxEmbeddingProvider } from './types.js';
|
||||
|
||||
type FetchFn = typeof fetch;
|
||||
type SentenceTransformersCommand = 'embedding-compute' | 'embedding-compute-bulk';
|
||||
type SentenceTransformersJsonRunner = (
|
||||
subcommand: SentenceTransformersCommand,
|
||||
payload: Record<string, unknown>,
|
||||
) => Promise<Record<string, unknown>>;
|
||||
type SentenceTransformersProcessCommand = { command: string; args: string[] };
|
||||
|
||||
export interface KtxEmbeddingProviderDeps {
|
||||
createOpenAIClient?: (options: { apiKey?: string; baseURL?: string }) => {
|
||||
|
|
@ -23,14 +15,10 @@ export interface KtxEmbeddingProviderDeps {
|
|||
};
|
||||
};
|
||||
fetch?: FetchFn;
|
||||
runSentenceTransformersJson?: SentenceTransformersJsonRunner;
|
||||
sentenceTransformersCommand?: string;
|
||||
sentenceTransformersArgs?: string[];
|
||||
sentenceTransformersCwd?: string;
|
||||
sentenceTransformersEnv?: NodeJS.ProcessEnv;
|
||||
}
|
||||
|
||||
const DEFAULT_BATCH_SIZE = 100;
|
||||
const HTTP_ERROR_BODY_MAX_LENGTH = 2_000;
|
||||
|
||||
function assertNonEmptyText(text: string): void {
|
||||
if (!text.trim()) {
|
||||
|
|
@ -69,110 +57,12 @@ function joinUrl(baseURL: string, pathPrefix: string, path: string): string {
|
|||
return prefix ? `${base}/${prefix}/${suffix}` : `${base}/${suffix}`;
|
||||
}
|
||||
|
||||
function errorText(error: unknown): string {
|
||||
if (error instanceof Error) {
|
||||
return error.cause
|
||||
? `${error.name}: ${error.message}; cause: ${errorText(error.cause)}`
|
||||
: `${error.name}: ${error.message}`;
|
||||
function boundedHttpBody(text: string): string {
|
||||
const normalized = text.trim();
|
||||
if (normalized.length <= HTTP_ERROR_BODY_MAX_LENGTH) {
|
||||
return normalized;
|
||||
}
|
||||
return String(error);
|
||||
}
|
||||
|
||||
function parseJsonObject(raw: string, subcommand: string): Record<string, unknown> {
|
||||
const parsed = JSON.parse(raw) as unknown;
|
||||
if (!parsed || typeof parsed !== 'object' || Array.isArray(parsed)) {
|
||||
throw new Error(`ktx-daemon ${subcommand} returned non-object JSON`);
|
||||
}
|
||||
return parsed as Record<string, unknown>;
|
||||
}
|
||||
|
||||
function isCommandNotFound(error: unknown): boolean {
|
||||
return (
|
||||
error instanceof Error &&
|
||||
('code' in error || 'errno' in error) &&
|
||||
((error as { code?: unknown }).code === 'ENOENT' || (error as { errno?: unknown }).errno === 'ENOENT')
|
||||
);
|
||||
}
|
||||
|
||||
function defaultSentenceTransformersProcessCommands(): SentenceTransformersProcessCommand[] {
|
||||
const venvBin =
|
||||
process.platform === 'win32' ? join('.venv', 'Scripts', 'ktx-daemon.exe') : join('.venv', 'bin', 'ktx-daemon');
|
||||
const repoVenvBin =
|
||||
process.platform === 'win32'
|
||||
? join('ktx', '.venv', 'Scripts', 'ktx-daemon.exe')
|
||||
: join('ktx', '.venv', 'bin', 'ktx-daemon');
|
||||
return [
|
||||
{ command: 'ktx-daemon', args: [] },
|
||||
{ command: venvBin, args: [] },
|
||||
{ command: repoVenvBin, args: [] },
|
||||
];
|
||||
}
|
||||
|
||||
function runSentenceTransformersProcessCommand(
|
||||
options: SentenceTransformersProcessCommand & {
|
||||
cwd?: string;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
},
|
||||
): SentenceTransformersJsonRunner {
|
||||
return async (
|
||||
subcommand: SentenceTransformersCommand,
|
||||
payload: Record<string, unknown>,
|
||||
): Promise<Record<string, unknown>> =>
|
||||
new Promise((resolve, reject) => {
|
||||
const child = spawn(options.command, [...options.args, subcommand], {
|
||||
cwd: options.cwd,
|
||||
env: { ...process.env, ...options.env },
|
||||
stdio: ['pipe', 'pipe', 'pipe'],
|
||||
});
|
||||
const stdout: Buffer[] = [];
|
||||
const stderr: Buffer[] = [];
|
||||
|
||||
child.stdout.on('data', (chunk: Buffer) => stdout.push(chunk));
|
||||
child.stderr.on('data', (chunk: Buffer) => stderr.push(chunk));
|
||||
child.on('error', reject);
|
||||
child.on('close', (code) => {
|
||||
const stdoutText = Buffer.concat(stdout).toString('utf8').trim();
|
||||
const stderrText = Buffer.concat(stderr).toString('utf8').trim();
|
||||
if (code !== 0) {
|
||||
reject(new Error(`ktx-daemon ${subcommand} failed: ${stderrText || `exit code ${code}`}`));
|
||||
return;
|
||||
}
|
||||
try {
|
||||
resolve(parseJsonObject(stdoutText, subcommand));
|
||||
} catch (error) {
|
||||
reject(error);
|
||||
}
|
||||
});
|
||||
child.stdin.end(`${JSON.stringify(payload)}\n`);
|
||||
});
|
||||
}
|
||||
|
||||
function runSentenceTransformersProcessJson(options: {
|
||||
commands: SentenceTransformersProcessCommand[];
|
||||
cwd?: string;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
}): SentenceTransformersJsonRunner {
|
||||
return async (
|
||||
subcommand: SentenceTransformersCommand,
|
||||
payload: Record<string, unknown>,
|
||||
): Promise<Record<string, unknown>> => {
|
||||
const errors: string[] = [];
|
||||
for (const command of options.commands) {
|
||||
try {
|
||||
return await runSentenceTransformersProcessCommand({
|
||||
...command,
|
||||
cwd: options.cwd,
|
||||
env: options.env,
|
||||
})(subcommand, payload);
|
||||
} catch (error) {
|
||||
errors.push(`${command.command}: ${errorText(error)}`);
|
||||
if (!isCommandNotFound(error)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
throw new Error(`ktx-daemon ${subcommand} failed: ${errors.join('; ')}`);
|
||||
};
|
||||
return `${normalized.slice(0, HTTP_ERROR_BODY_MAX_LENGTH)}...`;
|
||||
}
|
||||
|
||||
class OpenAIEmbeddingProvider implements KtxEmbeddingProvider {
|
||||
|
|
@ -228,9 +118,7 @@ class SentenceTransformersEmbeddingProvider implements KtxEmbeddingProvider {
|
|||
private readonly fetch: FetchFn;
|
||||
private readonly baseURL: string;
|
||||
private readonly pathPrefix: string;
|
||||
private readonly runJson: SentenceTransformersJsonRunner;
|
||||
private readonly startupProbe: Promise<void>;
|
||||
private useProcessRunner = false;
|
||||
|
||||
constructor(config: KtxEmbeddingConfig, deps: KtxEmbeddingProviderDeps) {
|
||||
if (!config.sentenceTransformers?.baseURL) {
|
||||
|
|
@ -241,15 +129,6 @@ class SentenceTransformersEmbeddingProvider implements KtxEmbeddingProvider {
|
|||
this.fetch = deps.fetch ?? fetch;
|
||||
this.baseURL = config.sentenceTransformers.baseURL;
|
||||
this.pathPrefix = config.sentenceTransformers.pathPrefix ?? '/api';
|
||||
this.runJson =
|
||||
deps.runSentenceTransformersJson ??
|
||||
runSentenceTransformersProcessJson({
|
||||
commands: deps.sentenceTransformersCommand
|
||||
? [{ command: deps.sentenceTransformersCommand, args: deps.sentenceTransformersArgs ?? [] }]
|
||||
: defaultSentenceTransformersProcessCommands(),
|
||||
cwd: deps.sentenceTransformersCwd,
|
||||
env: deps.sentenceTransformersEnv,
|
||||
});
|
||||
this.startupProbe = this.requestSingle('__ktx_embedding_probe__').then((embedding) => {
|
||||
assertVectorDimensions(embedding, this.dimensions, 'sentence-transformers');
|
||||
});
|
||||
|
|
@ -264,7 +143,7 @@ class SentenceTransformersEmbeddingProvider implements KtxEmbeddingProvider {
|
|||
async embedMany(texts: string[]): Promise<number[][]> {
|
||||
assertBatchSize(texts, this.maxBatchSize);
|
||||
await this.startupProbe;
|
||||
const response = await this.requestJson('embedding-compute-bulk', '/embeddings/compute-bulk', { texts });
|
||||
const response = await this.requestJson('/embeddings/compute-bulk', { texts });
|
||||
if (
|
||||
!response ||
|
||||
typeof response !== 'object' ||
|
||||
|
|
@ -285,37 +164,15 @@ class SentenceTransformersEmbeddingProvider implements KtxEmbeddingProvider {
|
|||
}
|
||||
|
||||
private async requestSingle(text: string): Promise<number[]> {
|
||||
const response = await this.requestJson('embedding-compute', '/embeddings/compute', { text });
|
||||
const response = await this.requestJson('/embeddings/compute', { text });
|
||||
if (!response || typeof response !== 'object' || !('embedding' in response) || !Array.isArray(response.embedding)) {
|
||||
throw new Error('Embedding provider sentence-transformers returned malformed single response');
|
||||
}
|
||||
return response.embedding;
|
||||
}
|
||||
|
||||
private async requestJson(
|
||||
command: SentenceTransformersCommand,
|
||||
path: string,
|
||||
body: Record<string, unknown>,
|
||||
): Promise<Record<string, unknown>> {
|
||||
if (this.useProcessRunner) {
|
||||
return this.runJson(command, body);
|
||||
}
|
||||
|
||||
try {
|
||||
return await this.postJson(path, body);
|
||||
} catch (httpError) {
|
||||
try {
|
||||
const response = await this.runJson(command, body);
|
||||
this.useProcessRunner = true;
|
||||
return response;
|
||||
} catch (processError) {
|
||||
throw new Error(
|
||||
`Embedding provider sentence-transformers local HTTP request failed (${errorText(
|
||||
httpError,
|
||||
)}) and ktx-daemon fallback failed (${errorText(processError)})`,
|
||||
);
|
||||
}
|
||||
}
|
||||
private async requestJson(path: string, body: Record<string, unknown>): Promise<Record<string, unknown>> {
|
||||
return await this.postJson(path, body);
|
||||
}
|
||||
|
||||
private async postJson(path: string, body: Record<string, unknown>): Promise<Record<string, unknown>> {
|
||||
|
|
@ -325,7 +182,12 @@ class SentenceTransformersEmbeddingProvider implements KtxEmbeddingProvider {
|
|||
body: JSON.stringify(body),
|
||||
});
|
||||
if (!response.ok) {
|
||||
throw new Error(`Embedding provider sentence-transformers request failed with HTTP ${response.status}`);
|
||||
const bodyText = boundedHttpBody(await response.text());
|
||||
throw new Error(
|
||||
`Embedding provider sentence-transformers request failed with HTTP ${response.status}${
|
||||
bodyText ? `: ${bodyText}` : ''
|
||||
}`,
|
||||
);
|
||||
}
|
||||
const parsed = (await response.json()) as unknown;
|
||||
if (!parsed || typeof parsed !== 'object' || Array.isArray(parsed)) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue