feat(cli): add text ingest command (#72)

This commit is contained in:
Andrey Avtomonov 2026-05-13 19:32:49 +02:00 committed by GitHub
parent be77c3c0bb
commit d1b5936441
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 850 additions and 11 deletions

View file

@ -316,6 +316,10 @@ export function buildKtxProgram(options: BuildKtxProgramOptions): Command {
registerIngestCommands(program, context, {
runIngestWithProgress: async (ingestArgs, ingestIo, ingestDeps, defaultRunIngest) =>
await (ingestDeps.ingest ?? defaultRunIngest)(ingestArgs, ingestIo),
runTextIngest: async (textIngestArgs, ingestIo, ingestDeps) => {
const { runKtxTextIngest } = await import('./text-ingest.js');
return await (ingestDeps.textIngest ?? runKtxTextIngest)(textIngestArgs, ingestIo);
},
});
registerScanCommands(program, context);
registerWikiCommands(program, context);

View file

@ -9,6 +9,7 @@ import type { KtxScanArgs } from './scan.js';
import type { KtxSetupArgs } from './setup.js';
import type { KtxSlArgs } from './sl.js';
import { profileMark, profileSpan } from './startup-profile.js';
import type { KtxTextIngestArgs } from './text-ingest.js';
profileMark('module:cli-runtime');
@ -30,6 +31,7 @@ export interface KtxCliDeps {
connection?: (args: KtxConnectionArgs, io: KtxCliIo) => Promise<number>;
doctor?: (args: KtxDoctorArgs, io: KtxCliIo) => Promise<number>;
ingest?: (args: KtxIngestArgs, io: KtxCliIo) => Promise<number>;
textIngest?: (args: KtxTextIngestArgs, io: KtxCliIo) => Promise<number>;
runtime?: (args: KtxRuntimeArgs, io: KtxCliIo) => Promise<number>;
scan?: (args: KtxScanArgs, io: KtxCliIo) => Promise<number>;
knowledge?: (args: KtxKnowledgeArgs, io: KtxCliIo) => Promise<number>;

View file

@ -1,10 +1,11 @@
import { resolve } from 'node:path';
import { type Command, Option } from '@commander-js/extra-typings';
import { type KtxCliCommandContext, type OutputModeOptions, resolveCommandProjectDir } from '../cli-program.js';
import { collectOption, type KtxCliCommandContext, type OutputModeOptions, resolveCommandProjectDir } from '../cli-program.js';
import type { KtxCliDeps, KtxCliIo } from '../index.js';
import type { KtxIngestArgs, KtxIngestOutputMode } from '../ingest.js';
import { runtimeInstallPolicyFromFlags } from '../managed-python-command.js';
import { profileMark } from '../startup-profile.js';
import type { KtxTextIngestArgs } from '../text-ingest.js';
profileMark('module:commands/ingest-commands');
@ -15,6 +16,7 @@ interface IngestCommandOptions {
deps: KtxCliDeps,
defaultRunIngest: (args: KtxIngestArgs, io: KtxCliIo) => Promise<number>,
) => Promise<number>;
runTextIngest: (args: KtxTextIngestArgs, io: KtxCliIo, deps: KtxCliDeps) => Promise<number>;
}
function outputMode(options: OutputModeOptions): KtxIngestOutputMode {
@ -101,6 +103,33 @@ export function registerIngestCommands(
);
});
ingest
.command('text')
.description('Ingest free-form text artifacts into KTX memory')
.argument('[files...]', 'Files to ingest; use - to read one item from stdin')
.option('--text <content>', 'Text content to ingest; repeat for a batch', collectOption, [])
.option('--connection-id <connectionId>', 'Optional KTX connection id for semantic-layer capture')
.option('--user-id <id>', 'Memory user id for capture attribution', 'local-cli')
.option('--json', 'Print JSON output')
.option('--fail-fast', 'Stop after the first failed text item', false)
.action(async (files: string[], options, command) => {
context.setExitCode(
await commandOptions.runTextIngest(
{
projectDir: resolveCommandProjectDir(command),
texts: options.text,
files,
...(options.connectionId ? { connectionId: options.connectionId } : {}),
userId: options.userId,
json: options.json === true,
failFast: options.failFast === true,
},
context.io,
context.deps,
),
);
});
ingest
.command('status')
.description('Print status for the latest or selected stored local ingest run or report file')

View file

@ -158,6 +158,30 @@ describe('renderContextBuildView', () => {
expect(output).toContain('dbt-main');
});
it('supports text ingest labels while preserving the shared compact progress view', () => {
const state = initViewState([
{ connectionId: 'text-1', driver: 'text', operation: 'source-ingest', debugCommand: '', steps: ['memory-update'] },
{ connectionId: 'schema.md', driver: 'text', operation: 'source-ingest', debugCommand: '', steps: ['memory-update'] },
]);
state.contextSources[0].status = 'running';
state.contextSources[0].detailLine = 'capturing...';
const output = renderContextBuildView(state, {
styled: false,
title: 'Ingesting text memory',
contextGroupLabel: 'Texts',
sourceIngestRunningText: 'capturing...',
completedItemName: { singular: 'text', plural: 'texts' },
});
expect(output).toContain('Ingesting text memory');
expect(output).toContain('Texts:');
expect(output).toContain('text-1');
expect(output).toContain('schema.md');
expect(output).toContain('capturing...');
expect(output).not.toContain('Context sources:');
});
it('renders header with total elapsed time when set', () => {
const state = initViewState([
{ connectionId: 'warehouse', driver: 'postgres', operation: 'scan', debugCommand: '', steps: ['scan'] },

View file

@ -65,6 +65,24 @@ export interface ContextBuildSourceProgressUpdate {
summaryText?: string;
}
interface CompletedItemName {
singular: string;
plural: string;
}
interface ContextBuildRenderOptions {
styled?: boolean;
showHint?: boolean;
hintText?: string;
projectDir?: string;
title?: string;
primaryGroupLabel?: string;
contextGroupLabel?: string;
scanRunningText?: string;
sourceIngestRunningText?: string;
completedItemName?: CompletedItemName;
}
export interface ContextBuildDeps {
executeTarget?: typeof executePublicIngestTarget;
now?: () => number;
@ -148,7 +166,7 @@ function staleProgressText(target: ContextBuildTargetState, styled: boolean): st
return styled ? dim(text) : text;
}
function targetDetail(target: ContextBuildTargetState, styled: boolean): string {
function targetDetail(target: ContextBuildTargetState, styled: boolean, options: ContextBuildRenderOptions): string {
if (target.status === 'done') {
const parts: string[] = [];
if (target.summaryText) parts.push(target.summaryText);
@ -162,7 +180,9 @@ function targetDetail(target: ContextBuildTargetState, styled: boolean): string
if (target.status === 'running') {
const percent = extractPercent(target.detailLine);
const progressText = target.detailLine?.replace(/^\[\d+%\]\s*/, '')
?? (target.target.operation === 'scan' ? 'scanning...' : 'ingesting...');
?? (target.target.operation === 'scan'
? (options.scanRunningText ?? 'scanning...')
: (options.sourceIngestRunningText ?? 'ingesting...'));
const elapsed = target.elapsedMs > 0 ? `(${formatDuration(target.elapsedMs)})` : null;
const parts: string[] = [];
if (percent !== null) {
@ -182,8 +202,14 @@ function columnWidth(state: ContextBuildViewState): number {
return Math.max(12, ...all.map((t) => t.target.connectionId.length)) + 2;
}
function renderTargetLine(target: ContextBuildTargetState, frame: number, styled: boolean, width: number): string {
return ` ${statusIcon(target.status, frame, styled)} ${target.target.connectionId.padEnd(width)} ${targetDetail(target, styled)}`;
function renderTargetLine(
target: ContextBuildTargetState,
frame: number,
styled: boolean,
width: number,
options: ContextBuildRenderOptions,
): string {
return ` ${statusIcon(target.status, frame, styled)} ${target.target.connectionId.padEnd(width)} ${targetDetail(target, styled, options)}`;
}
function renderTargetGroup(
@ -192,9 +218,10 @@ function renderTargetGroup(
frame: number,
styled: boolean,
width: number,
options: ContextBuildRenderOptions,
): string[] {
if (targets.length === 0) return [];
return ['', ` ${label}:`, ...targets.map((t) => renderTargetLine(t, frame, styled, width))];
return ['', ` ${label}:`, ...targets.map((t) => renderTargetLine(t, frame, styled, width, options))];
}
function resumeCommand(projectDir?: string): string {
@ -203,7 +230,7 @@ function resumeCommand(projectDir?: string): string {
export function renderContextBuildView(
state: ContextBuildViewState,
options: { styled?: boolean; showHint?: boolean; hintText?: string; projectDir?: string } = {},
options: ContextBuildRenderOptions = {},
): string {
const styled = options.styled ?? true;
const width = columnWidth(state);
@ -213,7 +240,7 @@ export function renderContextBuildView(
const hasActive = allTargets.some((t) => t.status === 'running' || t.status === 'queued');
const allDone = totalCount > 0 && !hasActive;
const headerParts = ['Building KTX context'];
const headerParts = [options.title ?? 'Building KTX context'];
if (totalCount > 0) {
const progressParts: string[] = [`${doneCount}/${totalCount}`];
if (state.totalElapsedMs > 0) progressParts.push(formatDuration(state.totalElapsedMs));
@ -229,13 +256,14 @@ export function renderContextBuildView(
header,
separator,
...(options.projectDir ? [` Project: ${options.projectDir}`] : []),
...renderTargetGroup('Primary sources', state.primarySources, state.frame, styled, width),
...renderTargetGroup('Context sources', state.contextSources, state.frame, styled, width),
...renderTargetGroup(options.primaryGroupLabel ?? 'Primary sources', state.primarySources, state.frame, styled, width, options),
...renderTargetGroup(options.contextGroupLabel ?? 'Context sources', state.contextSources, state.frame, styled, width, options),
'',
];
if (allDone && state.totalElapsedMs > 0) {
const sourcesLabel = totalCount === 1 ? '1 source' : `${totalCount} sources`;
const itemName = options.completedItemName ?? { singular: 'source', plural: 'sources' };
const sourcesLabel = totalCount === 1 ? `1 ${itemName.singular}` : `${totalCount} ${itemName.plural}`;
const summary = ` Done in ${formatDuration(state.totalElapsedMs)} · ${sourcesLabel} processed`;
lines.push(styled ? green(summary) : summary);
lines.push('');

View file

@ -734,14 +734,73 @@ describe('runKtxCli', () => {
expect(testIo.stdout()).toContain('Usage: ktx ingest [options] [command]');
expect(testIo.stdout()).toContain('Run or inspect local ingest memory-flow output');
expect(testIo.stdout()).toContain('run');
expect(testIo.stdout()).toContain('text');
expect(testIo.stdout()).toContain('status');
expect(testIo.stdout()).toContain('watch');
expect(testIo.stdout()).toContain('replay');
expect(testIo.stdout()).not.toContain('--manifest');
expect(testIo.stdout()).not.toContain('--all');
expect(testIo.stderr()).toBe('');
expect(ingest).not.toHaveBeenCalled();
});
it('routes text memory ingest through Commander without exposing chat ids', async () => {
const textIngest = vi.fn(async () => 0);
const testIo = makeIo();
await expect(
runKtxCli(
[
'--project-dir',
tempDir,
'ingest',
'text',
'--text',
'Revenue means gross receipts.',
'--text',
'Orders are completed purchases.',
'--connection-id',
'warehouse',
'--user-id',
'agent',
'--json',
'--fail-fast',
],
testIo.io,
{ textIngest },
),
).resolves.toBe(0);
expect(textIngest).toHaveBeenCalledWith(
{
projectDir: tempDir,
texts: ['Revenue means gross receipts.', 'Orders are completed purchases.'],
files: [],
connectionId: 'warehouse',
userId: 'agent',
json: true,
failFast: true,
},
testIo.io,
);
expect(testIo.stderr()).toBe('');
});
it('documents text ingest inputs without a manifest option', async () => {
const textIngest = vi.fn(async () => 0);
const testIo = makeIo();
await expect(runKtxCli(['ingest', 'text', '--help'], testIo.io, { textIngest })).resolves.toBe(0);
expect(testIo.stdout()).toContain('Usage: ktx ingest text [options] [files...]');
expect(testIo.stdout()).toContain('--text <content>');
expect(testIo.stdout()).toContain('--connection-id <connectionId>');
expect(testIo.stdout()).toContain('--user-id <id>');
expect(testIo.stdout()).toContain('--fail-fast');
expect(testIo.stdout()).not.toContain('--manifest');
expect(textIngest).not.toHaveBeenCalled();
});
it('routes ingest run at the top level and rejects removed dev ingest', async () => {
const runIo = makeIo();
const devRunIo = makeIo();

View file

@ -0,0 +1,339 @@
import { describe, expect, it, vi } from 'vitest';
import type { MemoryCaptureStatus } from '@ktx/context/memory';
import type { KtxLocalProject } from '@ktx/context/project';
import { runKtxTextIngest, type TextMemoryCapturePort } from './text-ingest.js';
function makeIo(options: { isTTY?: boolean } = {}) {
let stdout = '';
let stderr = '';
return {
io: {
stdout: {
isTTY: options.isTTY,
write: (chunk: string) => {
stdout += chunk;
},
},
stderr: {
write: (chunk: string) => {
stderr += chunk;
},
},
},
stdout: () => stdout,
stderr: () => stderr,
};
}
function fakeCapture(
options: {
failRunIds?: Set<string>;
missingStatusRunIds?: Set<string>;
events?: string[];
} = {},
): TextMemoryCapturePort {
let next = 1;
return {
capture: vi.fn(async () => {
const runId = `run-${next++}`;
options.events?.push(`capture:${runId}`);
return { runId };
}),
waitForRun: vi.fn(async (runId: string) => {
options.events?.push(`wait:${runId}`);
}),
status: vi.fn(async (runId: string) => {
options.events?.push(`status:${runId}`);
if (options.missingStatusRunIds?.has(runId)) {
return null;
}
if (options.failRunIds?.has(runId)) {
return {
runId,
status: 'error',
stage: 'capturing',
done: true,
captured: { wiki: [], sl: [], xrefs: [] },
error: `${runId} failed`,
commitHash: null,
skillsLoaded: [],
signalDetected: false,
} satisfies MemoryCaptureStatus;
}
return {
runId,
status: 'done',
stage: 'capturing',
done: true,
captured: { wiki: [`wiki-${runId}`], sl: [`sl-${runId}`], xrefs: [] },
error: null,
commitHash: `commit-${runId}`,
skillsLoaded: ['wiki_capture', 'sl'],
signalDetected: true,
} satisfies MemoryCaptureStatus;
}),
};
}
function fakeProject(projectDir = '/tmp/project'): KtxLocalProject {
return { projectDir } as KtxLocalProject;
}
describe('runKtxTextIngest', () => {
it('captures repeated inline text sequentially with generated internal chat ids', async () => {
const io = makeIo();
const events: string[] = [];
const capture = fakeCapture({ events });
const createMemoryCapture = vi.fn(() => capture);
await expect(
runKtxTextIngest(
{
projectDir: '/tmp/project',
texts: ['Revenue means gross receipts.', 'Orders are completed purchases.'],
files: [],
userId: 'local-cli',
json: true,
failFast: false,
},
io.io,
{
loadProject: vi.fn(async () => fakeProject()),
createMemoryCapture,
now: () => 1_700_000_000_000,
},
),
).resolves.toBe(0);
expect(createMemoryCapture).toHaveBeenCalledWith({ projectDir: '/tmp/project' });
expect(capture.capture).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
userId: 'local-cli',
chatId: 'cli-text-ingest-1700000000000-1',
userMessage: 'Ingest external text artifact "Revenue means gross receipts." into KTX memory.',
assistantMessage: 'Revenue means gross receipts.',
sourceType: 'external_ingest',
}),
);
expect(capture.capture).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
chatId: 'cli-text-ingest-1700000000000-2',
userMessage: 'Ingest external text artifact "Orders are completed purchases." into KTX memory.',
assistantMessage: 'Orders are completed purchases.',
}),
);
expect(capture.capture).not.toHaveBeenCalledWith(expect.objectContaining({ connectionId: expect.anything() }));
expect(events).toEqual(['capture:run-1', 'wait:run-1', 'status:run-1', 'capture:run-2', 'wait:run-2', 'status:run-2']);
expect(JSON.parse(io.stdout())).toMatchObject({
status: 'done',
results: [
{
label: '"Revenue means gross receipts."',
runId: 'run-1',
status: 'done',
captured: { wiki: ['wiki-run-1'], sl: ['sl-run-1'] },
},
{
label: '"Orders are completed purchases."',
runId: 'run-2',
status: 'done',
captured: { wiki: ['wiki-run-2'], sl: ['sl-run-2'] },
},
],
});
});
it('loads files and stdin as batch items and passes a global connection id', async () => {
const io = makeIo();
const capture = fakeCapture();
await expect(
runKtxTextIngest(
{
projectDir: '/tmp/project',
texts: [],
files: ['/tmp/docs/revenue.md', '-'],
connectionId: 'warehouse',
userId: 'agent',
json: false,
failFast: false,
},
io.io,
{
loadProject: vi.fn(async () => fakeProject()),
createMemoryCapture: vi.fn(() => capture),
readFile: vi.fn(async (path) => `file:${path}`),
readStdin: vi.fn(async () => 'stdin content'),
now: () => 10,
},
),
).resolves.toBe(0);
expect(capture.capture).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
connectionId: 'warehouse',
userId: 'agent',
userMessage: 'Ingest external text artifact "revenue.md" into KTX memory.',
assistantMessage: 'file:/tmp/docs/revenue.md',
}),
);
expect(capture.capture).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
connectionId: 'warehouse',
userMessage: 'Ingest external text artifact "stdin" into KTX memory.',
assistantMessage: 'stdin content',
}),
);
expect(io.stdout()).toContain('Ingesting text memory');
expect(io.stdout()).toContain('Texts:');
expect(io.stdout()).toContain('revenue.md');
expect(io.stdout()).toContain('stdin');
});
it('uses bounded inline text previews as labels in plain output and capture metadata', async () => {
const io = makeIo();
const capture = fakeCapture();
const longText = `This inline note is intentionally long ${'x'.repeat(120)}`;
await expect(
runKtxTextIngest(
{
projectDir: '/tmp/project',
texts: ['remember to call me Andrey', ' first line\n\tsecond line ', longText],
files: [],
userId: 'local-cli',
json: false,
failFast: false,
},
io.io,
{
loadProject: vi.fn(async () => fakeProject()),
createMemoryCapture: vi.fn(() => capture),
now: () => 10,
},
),
).resolves.toBe(0);
const output = io.stdout();
expect(output).toContain('"remember to call me Andrey"');
expect(output).toContain('"first line second line"');
expect(output).toContain('"This inline note is intentionally long xxxxxxxx..."');
expect(output).not.toContain('text-1');
expect(output).not.toContain(longText);
expect(capture.capture).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
userMessage: 'Ingest external text artifact "remember to call me Andrey" into KTX memory.',
}),
);
expect(capture.capture).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
userMessage: 'Ingest external text artifact "first line second line" into KTX memory.',
}),
);
expect(capture.capture).toHaveBeenNthCalledWith(
3,
expect.objectContaining({
userMessage: 'Ingest external text artifact "This inline note is intentionally long xxxxxxxx..." into KTX memory.',
}),
);
});
it('continues after an item failure by default and stops when failFast is set', async () => {
const continueIo = makeIo();
const continueCapture = fakeCapture({ failRunIds: new Set(['run-1']) });
await expect(
runKtxTextIngest(
{
projectDir: '/tmp/project',
texts: ['bad', 'good'],
files: [],
userId: 'local-cli',
json: true,
failFast: false,
},
continueIo.io,
{
loadProject: vi.fn(async () => fakeProject()),
createMemoryCapture: vi.fn(() => continueCapture),
},
),
).resolves.toBe(1);
expect(continueCapture.capture).toHaveBeenCalledTimes(2);
expect(JSON.parse(continueIo.stdout())).toMatchObject({
status: 'failed',
results: [
{ label: '"bad"', status: 'error', error: 'run-1 failed' },
{ label: '"good"', status: 'done' },
],
});
const failFastIo = makeIo();
const failFastCapture = fakeCapture({ failRunIds: new Set(['run-1']) });
await expect(
runKtxTextIngest(
{
projectDir: '/tmp/project',
texts: ['bad', 'skipped'],
files: [],
userId: 'local-cli',
json: true,
failFast: true,
},
failFastIo.io,
{
loadProject: vi.fn(async () => fakeProject()),
createMemoryCapture: vi.fn(() => failFastCapture),
},
),
).resolves.toBe(1);
expect(failFastCapture.capture).toHaveBeenCalledTimes(1);
expect(JSON.parse(failFastIo.stdout()).results).toHaveLength(1);
});
it('rejects empty batches and empty text items', async () => {
const noInputIo = makeIo();
await expect(
runKtxTextIngest(
{
projectDir: '/tmp/project',
texts: [],
files: [],
userId: 'local-cli',
json: false,
failFast: false,
},
noInputIo.io,
{ loadProject: vi.fn(), createMemoryCapture: vi.fn() },
),
).resolves.toBe(1);
expect(noInputIo.stderr()).toContain('Provide at least one text item');
const emptyIo = makeIo();
await expect(
runKtxTextIngest(
{
projectDir: '/tmp/project',
texts: [' '],
files: [],
userId: 'local-cli',
json: false,
failFast: false,
},
emptyIo.io,
{ loadProject: vi.fn(), createMemoryCapture: vi.fn() },
),
).resolves.toBe(1);
expect(emptyIo.stderr()).toContain('Text item "text-1" is empty');
});
});

View file

@ -0,0 +1,354 @@
import { readFile as fsReadFile } from 'node:fs/promises';
import { basename, resolve } from 'node:path';
import { createLocalProjectMemoryCapture, type MemoryAgentInput, type MemoryCaptureStatus } from '@ktx/context/memory';
import { loadKtxProject, type KtxLocalProject } from '@ktx/context/project';
import type { KtxCliIo } from './cli-runtime.js';
import { createRepainter, initViewState, renderContextBuildView, type ContextBuildTargetState } from './context-build-view.js';
import { formatDuration } from './demo-metrics.js';
import type { KtxPublicIngestPlanTarget } from './public-ingest.js';
export interface KtxTextIngestArgs {
projectDir: string;
texts: string[];
files: string[];
connectionId?: string;
userId: string;
json: boolean;
failFast: boolean;
}
export interface TextMemoryCapturePort {
capture(input: MemoryAgentInput): Promise<{ runId: string }>;
waitForRun(runId: string): Promise<void>;
status(runId: string): Promise<MemoryCaptureStatus | null>;
}
interface TextIngestItem {
label: string;
content: string;
}
interface TextIngestResult {
label: string;
runId: string | null;
status: 'done' | 'error';
captured: MemoryCaptureStatus['captured'];
commitHash: string | null;
error: string | null;
}
export interface KtxTextIngestDeps {
loadProject?: (options: { projectDir: string }) => Promise<KtxLocalProject>;
createMemoryCapture?: (project: KtxLocalProject) => TextMemoryCapturePort;
readFile?: (path: string) => Promise<string>;
readStdin?: () => Promise<string>;
now?: () => number;
}
const INLINE_TEXT_LABEL_MAX_LENGTH = 50;
const ANSI_ESCAPE_PATTERN = /\x1B\[[0-?]*[ -/]*[@-~]/g;
function defaultCreateMemoryCapture(project: KtxLocalProject): TextMemoryCapturePort {
return createLocalProjectMemoryCapture(project);
}
async function defaultReadStdin(): Promise<string> {
const chunks: string[] = [];
process.stdin.setEncoding('utf-8');
for await (const chunk of process.stdin) {
chunks.push(String(chunk));
}
return chunks.join('');
}
async function defaultReadFile(path: string): Promise<string> {
return await fsReadFile(path, 'utf-8');
}
function emptyCaptured(): MemoryCaptureStatus['captured'] {
return { wiki: [], sl: [], xrefs: [] };
}
function normalizedTextPreview(content: string): string {
return content
.replace(ANSI_ESCAPE_PATTERN, '')
.replace(/[\u0000-\u001f\u007f-\u009f]/g, ' ')
.replace(/\s+/g, ' ')
.trim();
}
function truncateLabel(label: string, maxLength = INLINE_TEXT_LABEL_MAX_LENGTH): string {
const chars = Array.from(label);
if (chars.length <= maxLength) {
return label;
}
return `${chars.slice(0, maxLength - 3).join('').trimEnd()}...`;
}
function quoteInlineTextLabel(label: string): string {
return JSON.stringify(label);
}
function makeUniqueLabel(label: string, usedLabels: Set<string>): string {
if (!usedLabels.has(label)) {
return label;
}
for (let index = 2; ; index++) {
const suffix = ` (${index})`;
const candidate = `${truncateLabel(label, INLINE_TEXT_LABEL_MAX_LENGTH - suffix.length)}${suffix}`;
if (!usedLabels.has(candidate)) {
return candidate;
}
}
}
function textLabel(content: string, index: number, usedLabels: Set<string>): string {
const preview = normalizedTextPreview(content);
const baseLabel = preview.length > 0 ? quoteInlineTextLabel(truncateLabel(preview)) : `text-${index + 1}`;
return makeUniqueLabel(baseLabel, usedLabels);
}
function artifactReference(label: string): string {
return label.startsWith('"') ? label : `"${label}"`;
}
function stdinLabel(items: TextIngestItem[]): string {
if (!items.some((item) => item.label === 'stdin')) {
return 'stdin';
}
return `stdin-${items.filter((item) => item.label.startsWith('stdin')).length + 1}`;
}
async function loadItems(args: KtxTextIngestArgs, deps: KtxTextIngestDeps): Promise<TextIngestItem[]> {
const items: TextIngestItem[] = [];
const usedTextLabels = new Set<string>();
args.texts.forEach((content, index) => {
const label = textLabel(content, index, usedTextLabels);
usedTextLabels.add(label);
items.push({ label, content });
});
const readFile = deps.readFile ?? defaultReadFile;
const readStdin = deps.readStdin ?? defaultReadStdin;
for (const file of args.files) {
if (file === '-') {
items.push({ label: stdinLabel(items), content: await readStdin() });
} else {
const path = resolve(file);
items.push({ label: basename(path), content: await readFile(path) });
}
}
return items;
}
function validateItems(items: TextIngestItem[], io: KtxCliIo): boolean {
if (items.length === 0) {
io.stderr.write('Provide at least one text item with --text, a file path, or - for stdin.\n');
return false;
}
for (const item of items) {
if (item.content.trim().length === 0) {
io.stderr.write(`Text item "${item.label}" is empty.\n`);
return false;
}
}
return true;
}
function makeTarget(label: string): KtxPublicIngestPlanTarget {
return {
connectionId: label,
driver: 'text',
operation: 'source-ingest',
debugCommand: '',
steps: ['memory-update'],
};
}
function allTargets(state: ReturnType<typeof initViewState>): ContextBuildTargetState[] {
return [...state.primarySources, ...state.contextSources];
}
function renderTextIngestView(state: ReturnType<typeof initViewState>, styled: boolean): string {
return renderContextBuildView(state, {
styled,
title: 'Ingesting text memory',
contextGroupLabel: 'Texts',
sourceIngestRunningText: 'capturing...',
completedItemName: { singular: 'text', plural: 'texts' },
});
}
function summarizeCaptured(captured: MemoryCaptureStatus['captured']): string {
const parts = [
`wiki=${captured.wiki.length}`,
`sl=${captured.sl.length}`,
`xrefs=${captured.xrefs.length}`,
];
return parts.join(', ');
}
function resultFromStatus(label: string, status: MemoryCaptureStatus): TextIngestResult {
return {
label,
runId: status.runId,
status: status.status === 'done' ? 'done' : 'error',
captured: status.captured,
commitHash: status.commitHash,
error: status.error,
};
}
function errorResult(label: string, runId: string | null, error: unknown): TextIngestResult {
return {
label,
runId,
status: 'error',
captured: emptyCaptured(),
commitHash: null,
error: error instanceof Error ? error.message : String(error),
};
}
function writeJsonResult(args: KtxTextIngestArgs, results: TextIngestResult[], io: KtxCliIo): void {
io.stdout.write(
`${JSON.stringify(
{
status: results.some((result) => result.status === 'error') ? 'failed' : 'done',
projectDir: args.projectDir,
connectionId: args.connectionId ?? null,
results,
},
null,
2,
)}\n`,
);
}
function writePlainFailures(results: TextIngestResult[], io: KtxCliIo): void {
const failures = results.filter((result) => result.status === 'error');
if (failures.length === 0) {
return;
}
io.stdout.write('\nFailed text items:\n');
for (const result of failures) {
io.stdout.write(` ${result.label}: ${result.error ?? 'failed'}\n`);
}
}
export async function runKtxTextIngest(
args: KtxTextIngestArgs,
io: KtxCliIo,
deps: KtxTextIngestDeps = {},
): Promise<number> {
const items = await loadItems(args, deps);
if (!validateItems(items, io)) {
return 1;
}
const project = await (deps.loadProject ?? loadKtxProject)({ projectDir: args.projectDir });
const memoryCapture = (deps.createMemoryCapture ?? defaultCreateMemoryCapture)(project);
const now = deps.now ?? (() => Date.now());
const batchId = now();
const state = initViewState(items.map((item) => makeTarget(item.label)));
const targets = allTargets(state);
const isTTY = io.stdout.isTTY === true && args.json !== true;
const repainter = isTTY ? createRepainter(io) : null;
const results: TextIngestResult[] = [];
state.startedAt = now();
const paint = () => repainter?.paint(renderTextIngestView(state, true));
paint();
let spinnerInterval: ReturnType<typeof setInterval> | null = null;
if (repainter) {
spinnerInterval = setInterval(() => {
const current = now();
state.frame++;
state.totalElapsedMs = state.startedAt === null ? 0 : current - state.startedAt;
for (const target of targets) {
if (target.status === 'running' && target.startedAt !== null) {
target.elapsedMs = current - target.startedAt;
}
}
paint();
}, 140);
}
try {
for (let index = 0; index < items.length; index++) {
const item = items[index]!;
const target = targets[index]!;
target.status = 'running';
target.startedAt = now();
target.detailLine = 'capturing...';
target.progressUpdatedAtMs = target.startedAt;
paint();
let runId: string | null = null;
let result: TextIngestResult;
try {
const captureInput: MemoryAgentInput = {
userId: args.userId,
chatId: `cli-text-ingest-${batchId}-${index + 1}`,
userMessage: `Ingest external text artifact ${artifactReference(item.label)} into KTX memory.`,
assistantMessage: item.content.trim(),
...(args.connectionId ? { connectionId: args.connectionId } : {}),
sourceType: 'external_ingest',
};
const capture = await memoryCapture.capture(captureInput);
runId = capture.runId;
await memoryCapture.waitForRun(runId);
const status = await memoryCapture.status(runId);
if (!status) {
throw new Error(`Memory capture run "${runId}" was not found.`);
}
result = resultFromStatus(item.label, status);
} catch (error) {
result = errorResult(item.label, runId, error);
}
results.push(result);
target.elapsedMs = now() - (target.startedAt ?? now());
target.detailLine = null;
target.status = result.status === 'done' ? 'done' : 'failed';
target.summaryText = result.status === 'done' ? summarizeCaptured(result.captured) : null;
target.failureText = result.status === 'error' ? result.error : null;
paint();
if (result.status === 'error' && args.failFast) {
break;
}
}
} finally {
if (spinnerInterval) {
clearInterval(spinnerInterval);
}
}
if (state.startedAt !== null) {
state.totalElapsedMs = now() - state.startedAt;
}
if (args.json) {
writeJsonResult(args, results, io);
} else if (repainter) {
repainter.paint(renderTextIngestView(state, true));
writePlainFailures(results, io);
} else {
io.stdout.write(renderTextIngestView(state, false));
writePlainFailures(results, io);
}
if (!args.json && results.length > 0) {
const duration = state.totalElapsedMs > 0 ? ` in ${formatDuration(state.totalElapsedMs)}` : '';
const outcome = results.some((result) => result.status === 'error') ? 'finished with failures' : 'finished';
io.stdout.write(`Text memory ingest ${outcome}${duration}.\n`);
}
return results.some((result) => result.status === 'error') ? 1 : 0;
}