From d1b59364412b70ebec1f9b0d7180a5b518ac63bc Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov Date: Wed, 13 May 2026 19:32:49 +0200 Subject: [PATCH] feat(cli): add text ingest command (#72) --- packages/cli/src/cli-program.ts | 4 + packages/cli/src/cli-runtime.ts | 2 + packages/cli/src/commands/ingest-commands.ts | 31 +- packages/cli/src/context-build-view.test.ts | 24 ++ packages/cli/src/context-build-view.ts | 48 ++- packages/cli/src/index.test.ts | 59 ++++ packages/cli/src/text-ingest.test.ts | 339 ++++++++++++++++++ packages/cli/src/text-ingest.ts | 354 +++++++++++++++++++ 8 files changed, 850 insertions(+), 11 deletions(-) create mode 100644 packages/cli/src/text-ingest.test.ts create mode 100644 packages/cli/src/text-ingest.ts diff --git a/packages/cli/src/cli-program.ts b/packages/cli/src/cli-program.ts index efe2e5bb..c1935495 100644 --- a/packages/cli/src/cli-program.ts +++ b/packages/cli/src/cli-program.ts @@ -316,6 +316,10 @@ export function buildKtxProgram(options: BuildKtxProgramOptions): Command { registerIngestCommands(program, context, { runIngestWithProgress: async (ingestArgs, ingestIo, ingestDeps, defaultRunIngest) => await (ingestDeps.ingest ?? defaultRunIngest)(ingestArgs, ingestIo), + runTextIngest: async (textIngestArgs, ingestIo, ingestDeps) => { + const { runKtxTextIngest } = await import('./text-ingest.js'); + return await (ingestDeps.textIngest ?? 
runKtxTextIngest)(textIngestArgs, ingestIo); + }, }); registerScanCommands(program, context); registerWikiCommands(program, context); diff --git a/packages/cli/src/cli-runtime.ts b/packages/cli/src/cli-runtime.ts index 5e2430cf..2712558f 100644 --- a/packages/cli/src/cli-runtime.ts +++ b/packages/cli/src/cli-runtime.ts @@ -9,6 +9,7 @@ import type { KtxScanArgs } from './scan.js'; import type { KtxSetupArgs } from './setup.js'; import type { KtxSlArgs } from './sl.js'; import { profileMark, profileSpan } from './startup-profile.js'; +import type { KtxTextIngestArgs } from './text-ingest.js'; profileMark('module:cli-runtime'); @@ -30,6 +31,7 @@ export interface KtxCliDeps { connection?: (args: KtxConnectionArgs, io: KtxCliIo) => Promise; doctor?: (args: KtxDoctorArgs, io: KtxCliIo) => Promise; ingest?: (args: KtxIngestArgs, io: KtxCliIo) => Promise; + textIngest?: (args: KtxTextIngestArgs, io: KtxCliIo) => Promise; runtime?: (args: KtxRuntimeArgs, io: KtxCliIo) => Promise; scan?: (args: KtxScanArgs, io: KtxCliIo) => Promise; knowledge?: (args: KtxKnowledgeArgs, io: KtxCliIo) => Promise; diff --git a/packages/cli/src/commands/ingest-commands.ts b/packages/cli/src/commands/ingest-commands.ts index 5ad357e1..952b6aa0 100644 --- a/packages/cli/src/commands/ingest-commands.ts +++ b/packages/cli/src/commands/ingest-commands.ts @@ -1,10 +1,11 @@ import { resolve } from 'node:path'; import { type Command, Option } from '@commander-js/extra-typings'; -import { type KtxCliCommandContext, type OutputModeOptions, resolveCommandProjectDir } from '../cli-program.js'; +import { collectOption, type KtxCliCommandContext, type OutputModeOptions, resolveCommandProjectDir } from '../cli-program.js'; import type { KtxCliDeps, KtxCliIo } from '../index.js'; import type { KtxIngestArgs, KtxIngestOutputMode } from '../ingest.js'; import { runtimeInstallPolicyFromFlags } from '../managed-python-command.js'; import { profileMark } from '../startup-profile.js'; +import type { KtxTextIngestArgs 
} from '../text-ingest.js'; profileMark('module:commands/ingest-commands'); @@ -15,6 +16,7 @@ interface IngestCommandOptions { deps: KtxCliDeps, defaultRunIngest: (args: KtxIngestArgs, io: KtxCliIo) => Promise, ) => Promise; + runTextIngest: (args: KtxTextIngestArgs, io: KtxCliIo, deps: KtxCliDeps) => Promise; } function outputMode(options: OutputModeOptions): KtxIngestOutputMode { @@ -101,6 +103,33 @@ export function registerIngestCommands( ); }); + ingest + .command('text') + .description('Ingest free-form text artifacts into KTX memory') + .argument('[files...]', 'Files to ingest; use - to read one item from stdin') + .option('--text ', 'Text content to ingest; repeat for a batch', collectOption, []) + .option('--connection-id ', 'Optional KTX connection id for semantic-layer capture') + .option('--user-id ', 'Memory user id for capture attribution', 'local-cli') + .option('--json', 'Print JSON output') + .option('--fail-fast', 'Stop after the first failed text item', false) + .action(async (files: string[], options, command) => { + context.setExitCode( + await commandOptions.runTextIngest( + { + projectDir: resolveCommandProjectDir(command), + texts: options.text, + files, + ...(options.connectionId ? 
{ connectionId: options.connectionId } : {}), + userId: options.userId, + json: options.json === true, + failFast: options.failFast === true, + }, + context.io, + context.deps, + ), + ); + }); + ingest .command('status') .description('Print status for the latest or selected stored local ingest run or report file') diff --git a/packages/cli/src/context-build-view.test.ts b/packages/cli/src/context-build-view.test.ts index c8dc5130..db172484 100644 --- a/packages/cli/src/context-build-view.test.ts +++ b/packages/cli/src/context-build-view.test.ts @@ -158,6 +158,30 @@ describe('renderContextBuildView', () => { expect(output).toContain('dbt-main'); }); + it('supports text ingest labels while preserving the shared compact progress view', () => { + const state = initViewState([ + { connectionId: 'text-1', driver: 'text', operation: 'source-ingest', debugCommand: '', steps: ['memory-update'] }, + { connectionId: 'schema.md', driver: 'text', operation: 'source-ingest', debugCommand: '', steps: ['memory-update'] }, + ]); + state.contextSources[0].status = 'running'; + state.contextSources[0].detailLine = 'capturing...'; + + const output = renderContextBuildView(state, { + styled: false, + title: 'Ingesting text memory', + contextGroupLabel: 'Texts', + sourceIngestRunningText: 'capturing...', + completedItemName: { singular: 'text', plural: 'texts' }, + }); + + expect(output).toContain('Ingesting text memory'); + expect(output).toContain('Texts:'); + expect(output).toContain('text-1'); + expect(output).toContain('schema.md'); + expect(output).toContain('capturing...'); + expect(output).not.toContain('Context sources:'); + }); + it('renders header with total elapsed time when set', () => { const state = initViewState([ { connectionId: 'warehouse', driver: 'postgres', operation: 'scan', debugCommand: '', steps: ['scan'] }, diff --git a/packages/cli/src/context-build-view.ts b/packages/cli/src/context-build-view.ts index 38f3d674..e1f43ead 100644 --- 
a/packages/cli/src/context-build-view.ts +++ b/packages/cli/src/context-build-view.ts @@ -65,6 +65,24 @@ export interface ContextBuildSourceProgressUpdate { summaryText?: string; } +interface CompletedItemName { + singular: string; + plural: string; +} + +interface ContextBuildRenderOptions { + styled?: boolean; + showHint?: boolean; + hintText?: string; + projectDir?: string; + title?: string; + primaryGroupLabel?: string; + contextGroupLabel?: string; + scanRunningText?: string; + sourceIngestRunningText?: string; + completedItemName?: CompletedItemName; +} + export interface ContextBuildDeps { executeTarget?: typeof executePublicIngestTarget; now?: () => number; @@ -148,7 +166,7 @@ function staleProgressText(target: ContextBuildTargetState, styled: boolean): st return styled ? dim(text) : text; } -function targetDetail(target: ContextBuildTargetState, styled: boolean): string { +function targetDetail(target: ContextBuildTargetState, styled: boolean, options: ContextBuildRenderOptions): string { if (target.status === 'done') { const parts: string[] = []; if (target.summaryText) parts.push(target.summaryText); @@ -162,7 +180,9 @@ function targetDetail(target: ContextBuildTargetState, styled: boolean): string if (target.status === 'running') { const percent = extractPercent(target.detailLine); const progressText = target.detailLine?.replace(/^\[\d+%\]\s*/, '') - ?? (target.target.operation === 'scan' ? 'scanning...' : 'ingesting...'); + ?? (target.target.operation === 'scan' + ? (options.scanRunningText ?? 'scanning...') + : (options.sourceIngestRunningText ?? 'ingesting...')); const elapsed = target.elapsedMs > 0 ? 
`(${formatDuration(target.elapsedMs)})` : null; const parts: string[] = []; if (percent !== null) { @@ -182,8 +202,14 @@ function columnWidth(state: ContextBuildViewState): number { return Math.max(12, ...all.map((t) => t.target.connectionId.length)) + 2; } -function renderTargetLine(target: ContextBuildTargetState, frame: number, styled: boolean, width: number): string { - return ` ${statusIcon(target.status, frame, styled)} ${target.target.connectionId.padEnd(width)} ${targetDetail(target, styled)}`; +function renderTargetLine( + target: ContextBuildTargetState, + frame: number, + styled: boolean, + width: number, + options: ContextBuildRenderOptions, +): string { + return ` ${statusIcon(target.status, frame, styled)} ${target.target.connectionId.padEnd(width)} ${targetDetail(target, styled, options)}`; } function renderTargetGroup( @@ -192,9 +218,10 @@ function renderTargetGroup( frame: number, styled: boolean, width: number, + options: ContextBuildRenderOptions, ): string[] { if (targets.length === 0) return []; - return ['', ` ${label}:`, ...targets.map((t) => renderTargetLine(t, frame, styled, width))]; + return ['', ` ${label}:`, ...targets.map((t) => renderTargetLine(t, frame, styled, width, options))]; } function resumeCommand(projectDir?: string): string { @@ -203,7 +230,7 @@ function resumeCommand(projectDir?: string): string { export function renderContextBuildView( state: ContextBuildViewState, - options: { styled?: boolean; showHint?: boolean; hintText?: string; projectDir?: string } = {}, + options: ContextBuildRenderOptions = {}, ): string { const styled = options.styled ?? true; const width = columnWidth(state); @@ -213,7 +240,7 @@ export function renderContextBuildView( const hasActive = allTargets.some((t) => t.status === 'running' || t.status === 'queued'); const allDone = totalCount > 0 && !hasActive; - const headerParts = ['Building KTX context']; + const headerParts = [options.title ?? 
'Building KTX context']; if (totalCount > 0) { const progressParts: string[] = [`${doneCount}/${totalCount}`]; if (state.totalElapsedMs > 0) progressParts.push(formatDuration(state.totalElapsedMs)); @@ -229,13 +256,14 @@ export function renderContextBuildView( header, separator, ...(options.projectDir ? [` Project: ${options.projectDir}`] : []), - ...renderTargetGroup('Primary sources', state.primarySources, state.frame, styled, width), - ...renderTargetGroup('Context sources', state.contextSources, state.frame, styled, width), + ...renderTargetGroup(options.primaryGroupLabel ?? 'Primary sources', state.primarySources, state.frame, styled, width, options), + ...renderTargetGroup(options.contextGroupLabel ?? 'Context sources', state.contextSources, state.frame, styled, width, options), '', ]; if (allDone && state.totalElapsedMs > 0) { - const sourcesLabel = totalCount === 1 ? '1 source' : `${totalCount} sources`; + const itemName = options.completedItemName ?? { singular: 'source', plural: 'sources' }; + const sourcesLabel = totalCount === 1 ? `1 ${itemName.singular}` : `${totalCount} ${itemName.plural}`; const summary = ` Done in ${formatDuration(state.totalElapsedMs)} · ${sourcesLabel} processed`; lines.push(styled ? 
green(summary) : summary); lines.push(''); diff --git a/packages/cli/src/index.test.ts b/packages/cli/src/index.test.ts index 83f11639..cd635d78 100644 --- a/packages/cli/src/index.test.ts +++ b/packages/cli/src/index.test.ts @@ -734,14 +734,73 @@ describe('runKtxCli', () => { expect(testIo.stdout()).toContain('Usage: ktx ingest [options] [command]'); expect(testIo.stdout()).toContain('Run or inspect local ingest memory-flow output'); expect(testIo.stdout()).toContain('run'); + expect(testIo.stdout()).toContain('text'); expect(testIo.stdout()).toContain('status'); expect(testIo.stdout()).toContain('watch'); expect(testIo.stdout()).toContain('replay'); + expect(testIo.stdout()).not.toContain('--manifest'); expect(testIo.stdout()).not.toContain('--all'); expect(testIo.stderr()).toBe(''); expect(ingest).not.toHaveBeenCalled(); }); + it('routes text memory ingest through Commander without exposing chat ids', async () => { + const textIngest = vi.fn(async () => 0); + const testIo = makeIo(); + + await expect( + runKtxCli( + [ + '--project-dir', + tempDir, + 'ingest', + 'text', + '--text', + 'Revenue means gross receipts.', + '--text', + 'Orders are completed purchases.', + '--connection-id', + 'warehouse', + '--user-id', + 'agent', + '--json', + '--fail-fast', + ], + testIo.io, + { textIngest }, + ), + ).resolves.toBe(0); + + expect(textIngest).toHaveBeenCalledWith( + { + projectDir: tempDir, + texts: ['Revenue means gross receipts.', 'Orders are completed purchases.'], + files: [], + connectionId: 'warehouse', + userId: 'agent', + json: true, + failFast: true, + }, + testIo.io, + ); + expect(testIo.stderr()).toBe(''); + }); + + it('documents text ingest inputs without a manifest option', async () => { + const textIngest = vi.fn(async () => 0); + const testIo = makeIo(); + + await expect(runKtxCli(['ingest', 'text', '--help'], testIo.io, { textIngest })).resolves.toBe(0); + + expect(testIo.stdout()).toContain('Usage: ktx ingest text [options] [files...]'); + 
expect(testIo.stdout()).toContain('--text '); + expect(testIo.stdout()).toContain('--connection-id '); + expect(testIo.stdout()).toContain('--user-id '); + expect(testIo.stdout()).toContain('--fail-fast'); + expect(testIo.stdout()).not.toContain('--manifest'); + expect(textIngest).not.toHaveBeenCalled(); + }); + it('routes ingest run at the top level and rejects removed dev ingest', async () => { const runIo = makeIo(); const devRunIo = makeIo(); diff --git a/packages/cli/src/text-ingest.test.ts b/packages/cli/src/text-ingest.test.ts new file mode 100644 index 00000000..55dbe9e3 --- /dev/null +++ b/packages/cli/src/text-ingest.test.ts @@ -0,0 +1,339 @@ +import { describe, expect, it, vi } from 'vitest'; +import type { MemoryCaptureStatus } from '@ktx/context/memory'; +import type { KtxLocalProject } from '@ktx/context/project'; +import { runKtxTextIngest, type TextMemoryCapturePort } from './text-ingest.js'; + +function makeIo(options: { isTTY?: boolean } = {}) { + let stdout = ''; + let stderr = ''; + return { + io: { + stdout: { + isTTY: options.isTTY, + write: (chunk: string) => { + stdout += chunk; + }, + }, + stderr: { + write: (chunk: string) => { + stderr += chunk; + }, + }, + }, + stdout: () => stdout, + stderr: () => stderr, + }; +} + +function fakeCapture( + options: { + failRunIds?: Set; + missingStatusRunIds?: Set; + events?: string[]; + } = {}, +): TextMemoryCapturePort { + let next = 1; + return { + capture: vi.fn(async () => { + const runId = `run-${next++}`; + options.events?.push(`capture:${runId}`); + return { runId }; + }), + waitForRun: vi.fn(async (runId: string) => { + options.events?.push(`wait:${runId}`); + }), + status: vi.fn(async (runId: string) => { + options.events?.push(`status:${runId}`); + if (options.missingStatusRunIds?.has(runId)) { + return null; + } + if (options.failRunIds?.has(runId)) { + return { + runId, + status: 'error', + stage: 'capturing', + done: true, + captured: { wiki: [], sl: [], xrefs: [] }, + error: `${runId} 
failed`, + commitHash: null, + skillsLoaded: [], + signalDetected: false, + } satisfies MemoryCaptureStatus; + } + return { + runId, + status: 'done', + stage: 'capturing', + done: true, + captured: { wiki: [`wiki-${runId}`], sl: [`sl-${runId}`], xrefs: [] }, + error: null, + commitHash: `commit-${runId}`, + skillsLoaded: ['wiki_capture', 'sl'], + signalDetected: true, + } satisfies MemoryCaptureStatus; + }), + }; +} + +function fakeProject(projectDir = '/tmp/project'): KtxLocalProject { + return { projectDir } as KtxLocalProject; +} + +describe('runKtxTextIngest', () => { + it('captures repeated inline text sequentially with generated internal chat ids', async () => { + const io = makeIo(); + const events: string[] = []; + const capture = fakeCapture({ events }); + const createMemoryCapture = vi.fn(() => capture); + + await expect( + runKtxTextIngest( + { + projectDir: '/tmp/project', + texts: ['Revenue means gross receipts.', 'Orders are completed purchases.'], + files: [], + userId: 'local-cli', + json: true, + failFast: false, + }, + io.io, + { + loadProject: vi.fn(async () => fakeProject()), + createMemoryCapture, + now: () => 1_700_000_000_000, + }, + ), + ).resolves.toBe(0); + + expect(createMemoryCapture).toHaveBeenCalledWith({ projectDir: '/tmp/project' }); + expect(capture.capture).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + userId: 'local-cli', + chatId: 'cli-text-ingest-1700000000000-1', + userMessage: 'Ingest external text artifact "Revenue means gross receipts." into KTX memory.', + assistantMessage: 'Revenue means gross receipts.', + sourceType: 'external_ingest', + }), + ); + expect(capture.capture).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + chatId: 'cli-text-ingest-1700000000000-2', + userMessage: 'Ingest external text artifact "Orders are completed purchases." 
into KTX memory.', + assistantMessage: 'Orders are completed purchases.', + }), + ); + expect(capture.capture).not.toHaveBeenCalledWith(expect.objectContaining({ connectionId: expect.anything() })); + expect(events).toEqual(['capture:run-1', 'wait:run-1', 'status:run-1', 'capture:run-2', 'wait:run-2', 'status:run-2']); + expect(JSON.parse(io.stdout())).toMatchObject({ + status: 'done', + results: [ + { + label: '"Revenue means gross receipts."', + runId: 'run-1', + status: 'done', + captured: { wiki: ['wiki-run-1'], sl: ['sl-run-1'] }, + }, + { + label: '"Orders are completed purchases."', + runId: 'run-2', + status: 'done', + captured: { wiki: ['wiki-run-2'], sl: ['sl-run-2'] }, + }, + ], + }); + }); + + it('loads files and stdin as batch items and passes a global connection id', async () => { + const io = makeIo(); + const capture = fakeCapture(); + + await expect( + runKtxTextIngest( + { + projectDir: '/tmp/project', + texts: [], + files: ['/tmp/docs/revenue.md', '-'], + connectionId: 'warehouse', + userId: 'agent', + json: false, + failFast: false, + }, + io.io, + { + loadProject: vi.fn(async () => fakeProject()), + createMemoryCapture: vi.fn(() => capture), + readFile: vi.fn(async (path) => `file:${path}`), + readStdin: vi.fn(async () => 'stdin content'), + now: () => 10, + }, + ), + ).resolves.toBe(0); + + expect(capture.capture).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + connectionId: 'warehouse', + userId: 'agent', + userMessage: 'Ingest external text artifact "revenue.md" into KTX memory.', + assistantMessage: 'file:/tmp/docs/revenue.md', + }), + ); + expect(capture.capture).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + connectionId: 'warehouse', + userMessage: 'Ingest external text artifact "stdin" into KTX memory.', + assistantMessage: 'stdin content', + }), + ); + expect(io.stdout()).toContain('Ingesting text memory'); + expect(io.stdout()).toContain('Texts:'); + expect(io.stdout()).toContain('revenue.md'); + 
expect(io.stdout()).toContain('stdin'); + }); + + it('uses bounded inline text previews as labels in plain output and capture metadata', async () => { + const io = makeIo(); + const capture = fakeCapture(); + const longText = `This inline note is intentionally long ${'x'.repeat(120)}`; + + await expect( + runKtxTextIngest( + { + projectDir: '/tmp/project', + texts: ['remember to call me Andrey', ' first line\n\tsecond line ', longText], + files: [], + userId: 'local-cli', + json: false, + failFast: false, + }, + io.io, + { + loadProject: vi.fn(async () => fakeProject()), + createMemoryCapture: vi.fn(() => capture), + now: () => 10, + }, + ), + ).resolves.toBe(0); + + const output = io.stdout(); + expect(output).toContain('"remember to call me Andrey"'); + expect(output).toContain('"first line second line"'); + expect(output).toContain('"This inline note is intentionally long xxxxxxxx..."'); + expect(output).not.toContain('text-1'); + expect(output).not.toContain(longText); + + expect(capture.capture).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + userMessage: 'Ingest external text artifact "remember to call me Andrey" into KTX memory.', + }), + ); + expect(capture.capture).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + userMessage: 'Ingest external text artifact "first line second line" into KTX memory.', + }), + ); + expect(capture.capture).toHaveBeenNthCalledWith( + 3, + expect.objectContaining({ + userMessage: 'Ingest external text artifact "This inline note is intentionally long xxxxxxxx..." 
into KTX memory.', + }), + ); + }); + + it('continues after an item failure by default and stops when failFast is set', async () => { + const continueIo = makeIo(); + const continueCapture = fakeCapture({ failRunIds: new Set(['run-1']) }); + + await expect( + runKtxTextIngest( + { + projectDir: '/tmp/project', + texts: ['bad', 'good'], + files: [], + userId: 'local-cli', + json: true, + failFast: false, + }, + continueIo.io, + { + loadProject: vi.fn(async () => fakeProject()), + createMemoryCapture: vi.fn(() => continueCapture), + }, + ), + ).resolves.toBe(1); + + expect(continueCapture.capture).toHaveBeenCalledTimes(2); + expect(JSON.parse(continueIo.stdout())).toMatchObject({ + status: 'failed', + results: [ + { label: '"bad"', status: 'error', error: 'run-1 failed' }, + { label: '"good"', status: 'done' }, + ], + }); + + const failFastIo = makeIo(); + const failFastCapture = fakeCapture({ failRunIds: new Set(['run-1']) }); + + await expect( + runKtxTextIngest( + { + projectDir: '/tmp/project', + texts: ['bad', 'skipped'], + files: [], + userId: 'local-cli', + json: true, + failFast: true, + }, + failFastIo.io, + { + loadProject: vi.fn(async () => fakeProject()), + createMemoryCapture: vi.fn(() => failFastCapture), + }, + ), + ).resolves.toBe(1); + + expect(failFastCapture.capture).toHaveBeenCalledTimes(1); + expect(JSON.parse(failFastIo.stdout()).results).toHaveLength(1); + }); + + it('rejects empty batches and empty text items', async () => { + const noInputIo = makeIo(); + await expect( + runKtxTextIngest( + { + projectDir: '/tmp/project', + texts: [], + files: [], + userId: 'local-cli', + json: false, + failFast: false, + }, + noInputIo.io, + { loadProject: vi.fn(), createMemoryCapture: vi.fn() }, + ), + ).resolves.toBe(1); + expect(noInputIo.stderr()).toContain('Provide at least one text item'); + + const emptyIo = makeIo(); + await expect( + runKtxTextIngest( + { + projectDir: '/tmp/project', + texts: [' '], + files: [], + userId: 'local-cli', + json: 
false, + failFast: false, + }, + emptyIo.io, + { loadProject: vi.fn(), createMemoryCapture: vi.fn() }, + ), + ).resolves.toBe(1); + expect(emptyIo.stderr()).toContain('Text item "text-1" is empty'); + }); +}); diff --git a/packages/cli/src/text-ingest.ts b/packages/cli/src/text-ingest.ts new file mode 100644 index 00000000..d48ee24b --- /dev/null +++ b/packages/cli/src/text-ingest.ts @@ -0,0 +1,354 @@ +import { readFile as fsReadFile } from 'node:fs/promises'; +import { basename, resolve } from 'node:path'; +import { createLocalProjectMemoryCapture, type MemoryAgentInput, type MemoryCaptureStatus } from '@ktx/context/memory'; +import { loadKtxProject, type KtxLocalProject } from '@ktx/context/project'; +import type { KtxCliIo } from './cli-runtime.js'; +import { createRepainter, initViewState, renderContextBuildView, type ContextBuildTargetState } from './context-build-view.js'; +import { formatDuration } from './demo-metrics.js'; +import type { KtxPublicIngestPlanTarget } from './public-ingest.js'; + +export interface KtxTextIngestArgs { + projectDir: string; + texts: string[]; + files: string[]; + connectionId?: string; + userId: string; + json: boolean; + failFast: boolean; +} + +export interface TextMemoryCapturePort { + capture(input: MemoryAgentInput): Promise<{ runId: string }>; + waitForRun(runId: string): Promise; + status(runId: string): Promise; +} + +interface TextIngestItem { + label: string; + content: string; +} + +interface TextIngestResult { + label: string; + runId: string | null; + status: 'done' | 'error'; + captured: MemoryCaptureStatus['captured']; + commitHash: string | null; + error: string | null; +} + +export interface KtxTextIngestDeps { + loadProject?: (options: { projectDir: string }) => Promise; + createMemoryCapture?: (project: KtxLocalProject) => TextMemoryCapturePort; + readFile?: (path: string) => Promise; + readStdin?: () => Promise; + now?: () => number; +} + +const INLINE_TEXT_LABEL_MAX_LENGTH = 50; +const ANSI_ESCAPE_PATTERN = 
/\x1B\[[0-?]*[ -/]*[@-~]/g; + +function defaultCreateMemoryCapture(project: KtxLocalProject): TextMemoryCapturePort { + return createLocalProjectMemoryCapture(project); +} + +async function defaultReadStdin(): Promise { + const chunks: string[] = []; + process.stdin.setEncoding('utf-8'); + for await (const chunk of process.stdin) { + chunks.push(String(chunk)); + } + return chunks.join(''); +} + +async function defaultReadFile(path: string): Promise { + return await fsReadFile(path, 'utf-8'); +} + +function emptyCaptured(): MemoryCaptureStatus['captured'] { + return { wiki: [], sl: [], xrefs: [] }; +} + +function normalizedTextPreview(content: string): string { + return content + .replace(ANSI_ESCAPE_PATTERN, '') + .replace(/[\u0000-\u001f\u007f-\u009f]/g, ' ') + .replace(/\s+/g, ' ') + .trim(); +} + +function truncateLabel(label: string, maxLength = INLINE_TEXT_LABEL_MAX_LENGTH): string { + const chars = Array.from(label); + if (chars.length <= maxLength) { + return label; + } + return `${chars.slice(0, maxLength - 3).join('').trimEnd()}...`; +} + +function quoteInlineTextLabel(label: string): string { + return JSON.stringify(label); +} + +function makeUniqueLabel(label: string, usedLabels: Set): string { + if (!usedLabels.has(label)) { + return label; + } + + for (let index = 2; ; index++) { + const suffix = ` (${index})`; + const candidate = `${truncateLabel(label, INLINE_TEXT_LABEL_MAX_LENGTH - suffix.length)}${suffix}`; + if (!usedLabels.has(candidate)) { + return candidate; + } + } +} + +function textLabel(content: string, index: number, usedLabels: Set): string { + const preview = normalizedTextPreview(content); + const baseLabel = preview.length > 0 ? quoteInlineTextLabel(truncateLabel(preview)) : `text-${index + 1}`; + return makeUniqueLabel(baseLabel, usedLabels); +} + +function artifactReference(label: string): string { + return label.startsWith('"') ? 
label : `"${label}"`; +} + +function stdinLabel(items: TextIngestItem[]): string { + if (!items.some((item) => item.label === 'stdin')) { + return 'stdin'; + } + return `stdin-${items.filter((item) => item.label.startsWith('stdin')).length + 1}`; +} + +async function loadItems(args: KtxTextIngestArgs, deps: KtxTextIngestDeps): Promise { + const items: TextIngestItem[] = []; + const usedTextLabels = new Set(); + args.texts.forEach((content, index) => { + const label = textLabel(content, index, usedTextLabels); + usedTextLabels.add(label); + items.push({ label, content }); + }); + + const readFile = deps.readFile ?? defaultReadFile; + const readStdin = deps.readStdin ?? defaultReadStdin; + for (const file of args.files) { + if (file === '-') { + items.push({ label: stdinLabel(items), content: await readStdin() }); + } else { + const path = resolve(file); + items.push({ label: basename(path), content: await readFile(path) }); + } + } + + return items; +} + +function validateItems(items: TextIngestItem[], io: KtxCliIo): boolean { + if (items.length === 0) { + io.stderr.write('Provide at least one text item with --text, a file path, or - for stdin.\n'); + return false; + } + + for (const item of items) { + if (item.content.trim().length === 0) { + io.stderr.write(`Text item "${item.label}" is empty.\n`); + return false; + } + } + return true; +} + +function makeTarget(label: string): KtxPublicIngestPlanTarget { + return { + connectionId: label, + driver: 'text', + operation: 'source-ingest', + debugCommand: '', + steps: ['memory-update'], + }; +} + +function allTargets(state: ReturnType): ContextBuildTargetState[] { + return [...state.primarySources, ...state.contextSources]; +} + +function renderTextIngestView(state: ReturnType, styled: boolean): string { + return renderContextBuildView(state, { + styled, + title: 'Ingesting text memory', + contextGroupLabel: 'Texts', + sourceIngestRunningText: 'capturing...', + completedItemName: { singular: 'text', plural: 'texts' }, 
+ }); +} + +function summarizeCaptured(captured: MemoryCaptureStatus['captured']): string { + const parts = [ + `wiki=${captured.wiki.length}`, + `sl=${captured.sl.length}`, + `xrefs=${captured.xrefs.length}`, + ]; + return parts.join(', '); +} + +function resultFromStatus(label: string, status: MemoryCaptureStatus): TextIngestResult { + return { + label, + runId: status.runId, + status: status.status === 'done' ? 'done' : 'error', + captured: status.captured, + commitHash: status.commitHash, + error: status.error, + }; +} + +function errorResult(label: string, runId: string | null, error: unknown): TextIngestResult { + return { + label, + runId, + status: 'error', + captured: emptyCaptured(), + commitHash: null, + error: error instanceof Error ? error.message : String(error), + }; +} + +function writeJsonResult(args: KtxTextIngestArgs, results: TextIngestResult[], io: KtxCliIo): void { + io.stdout.write( + `${JSON.stringify( + { + status: results.some((result) => result.status === 'error') ? 'failed' : 'done', + projectDir: args.projectDir, + connectionId: args.connectionId ?? null, + results, + }, + null, + 2, + )}\n`, + ); +} + +function writePlainFailures(results: TextIngestResult[], io: KtxCliIo): void { + const failures = results.filter((result) => result.status === 'error'); + if (failures.length === 0) { + return; + } + + io.stdout.write('\nFailed text items:\n'); + for (const result of failures) { + io.stdout.write(` ${result.label}: ${result.error ?? 'failed'}\n`); + } +} + +export async function runKtxTextIngest( + args: KtxTextIngestArgs, + io: KtxCliIo, + deps: KtxTextIngestDeps = {}, +): Promise { + const items = await loadItems(args, deps); + if (!validateItems(items, io)) { + return 1; + } + + const project = await (deps.loadProject ?? loadKtxProject)({ projectDir: args.projectDir }); + const memoryCapture = (deps.createMemoryCapture ?? defaultCreateMemoryCapture)(project); + const now = deps.now ?? 
(() => Date.now()); + const batchId = now(); + const state = initViewState(items.map((item) => makeTarget(item.label))); + const targets = allTargets(state); + const isTTY = io.stdout.isTTY === true && args.json !== true; + const repainter = isTTY ? createRepainter(io) : null; + const results: TextIngestResult[] = []; + + state.startedAt = now(); + const paint = () => repainter?.paint(renderTextIngestView(state, true)); + paint(); + + let spinnerInterval: ReturnType | null = null; + if (repainter) { + spinnerInterval = setInterval(() => { + const current = now(); + state.frame++; + state.totalElapsedMs = state.startedAt === null ? 0 : current - state.startedAt; + for (const target of targets) { + if (target.status === 'running' && target.startedAt !== null) { + target.elapsedMs = current - target.startedAt; + } + } + paint(); + }, 140); + } + + try { + for (let index = 0; index < items.length; index++) { + const item = items[index]!; + const target = targets[index]!; + target.status = 'running'; + target.startedAt = now(); + target.detailLine = 'capturing...'; + target.progressUpdatedAtMs = target.startedAt; + paint(); + + let runId: string | null = null; + let result: TextIngestResult; + try { + const captureInput: MemoryAgentInput = { + userId: args.userId, + chatId: `cli-text-ingest-${batchId}-${index + 1}`, + userMessage: `Ingest external text artifact ${artifactReference(item.label)} into KTX memory.`, + assistantMessage: item.content.trim(), + ...(args.connectionId ? 
{ connectionId: args.connectionId } : {}), + sourceType: 'external_ingest', + }; + const capture = await memoryCapture.capture(captureInput); + runId = capture.runId; + await memoryCapture.waitForRun(runId); + const status = await memoryCapture.status(runId); + if (!status) { + throw new Error(`Memory capture run "${runId}" was not found.`); + } + result = resultFromStatus(item.label, status); + } catch (error) { + result = errorResult(item.label, runId, error); + } + + results.push(result); + target.elapsedMs = now() - (target.startedAt ?? now()); + target.detailLine = null; + target.status = result.status === 'done' ? 'done' : 'failed'; + target.summaryText = result.status === 'done' ? summarizeCaptured(result.captured) : null; + target.failureText = result.status === 'error' ? result.error : null; + paint(); + + if (result.status === 'error' && args.failFast) { + break; + } + } + } finally { + if (spinnerInterval) { + clearInterval(spinnerInterval); + } + } + + if (state.startedAt !== null) { + state.totalElapsedMs = now() - state.startedAt; + } + + if (args.json) { + writeJsonResult(args, results, io); + } else if (repainter) { + repainter.paint(renderTextIngestView(state, true)); + writePlainFailures(results, io); + } else { + io.stdout.write(renderTextIngestView(state, false)); + writePlainFailures(results, io); + } + + if (!args.json && results.length > 0) { + const duration = state.totalElapsedMs > 0 ? ` in ${formatDuration(state.totalElapsedMs)}` : ''; + const outcome = results.some((result) => result.status === 'error') ? 'finished with failures' : 'finished'; + io.stdout.write(`Text memory ingest ${outcome}${duration}.\n`); + } + + return results.some((result) => result.status === 'error') ? 1 : 0; +}