diff --git a/AGENTS.md b/AGENTS.md index e8062dcb..2e5a684a 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -156,6 +156,19 @@ pnpm run test 2>&1 | tee /tmp/ktx-test-output.log - Do not manually edit generated or built output under `dist/`; edit source and rebuild. +### CLI Standards + +- Use Commander for CLI command trees, arguments, options, help text, custom + parsers, and async action dispatch. Prefer `@commander-js/extra-typings` for + typed command definitions, use `InvalidArgumentError` for parse failures, and + call `parseAsync` when actions await asynchronous work. +- Use `@clack/prompts` for interactive flows. Always handle cancellation with + `isCancel` plus `cancel`, stop active spinners before exiting, and keep prompts + grouped or factored so multi-step setup flows share cancellation behavior. +- Keep command behavior scriptable: prefer flags and config over prompts when + values are supplied, and reserve prompts for interactive missing input or + explicit setup flows. + ### Zod Naming Convention ```typescript diff --git a/packages/cli/src/ingest.test.ts b/packages/cli/src/ingest.test.ts index bf27be15..b547d185 100644 --- a/packages/cli/src/ingest.test.ts +++ b/packages/cli/src/ingest.test.ts @@ -1076,17 +1076,17 @@ describe('runKtxIngest', () => { ), ).resolves.toBe(0); - const stderr = io.stderr(); - expect(stderr).toContain('[5%] Fetching source files for warehouse/historic-sql'); - expect(stderr).toContain('[15%] Fetched 3 source files from historic-sql'); - expect(stderr).toContain('[45%] Planned 1 work unit'); - expect(stderr).toContain('[80%] Processed 1/1 work units'); - expect(stderr).toContain('[100%] Ingest completed'); - expect(io.stdout()).toContain('Report: report-live-1'); - expect(io.stdout()).not.toContain('[5%]'); + const stdout = io.stdout(); + expect(stdout).toContain('[5%] Fetching source files for warehouse/historic-sql'); + expect(stdout).toContain('[15%] Fetched 3 source files from historic-sql'); + expect(stdout).toContain('[45%] Planned 1 work unit'); + expect(stdout).toContain('[80%] Processed 1/1 work units'); + expect(stdout).toContain('[100%] Ingest completed'); + expect(stdout).toContain('Report: report-live-1'); + expect(io.stderr()).toBe(''); }); - it('writes plain TTY ingest progress to stderr and final report to stdout', async () => { + it('writes plain TTY ingest progress and final report to stdout', async () => { const projectDir = join(tempDir, 'project'); await writeWarehouseConfig(projectDir); const sourceDir = join(tempDir, 'source'); @@ -1113,9 +1113,9 @@ describe('runKtxIngest', () => { ), ).resolves.toBe(0); - expect(io.stderr()).toContain('[5%] Fetching source files for warehouse/fake'); + expect(io.stdout()).toContain('[5%] Fetching source files for warehouse/fake'); expect(io.stdout()).toContain('Report: report-live-1'); - expect(io.stdout()).not.toContain('[5%]'); + expect(io.stderr()).toBe(''); }); it('prints plain WorkUnit step progress during long-running local ingest', async () => { @@ -1202,12 +1202,106 @@ describe('runKtxIngest', () => { ), ).resolves.toBe(0); - const stderr = io.stderr(); - expect(stderr).toContain('[45%] Planned 2 work units'); - expect(stderr).toContain('[55%] Processing 1/2 work units: historic-sql-table-public-orders'); - expect(stderr).toContain('[58%] Processing 1/2 work units: historic-sql-table-public-orders step 7/40'); - expect(stderr).toContain('[68%] Processed 1/2 work units'); - expect(io.stdout()).not.toContain('[45%]'); + const stdout = io.stdout(); + expect(stdout).toContain('[45%] Planned 2 work units'); + expect(stdout).toContain('[55%] Processing 1/2 work units: historic-sql-table-public-orders'); + expect(stdout).toContain( + '\r[58%] Processing work units: 0/2 complete, 1 active; latest historic-sql-table-public-orders step 7/40\u001b[K', + ); + expect(stdout).toContain('[68%] Processed 1/2 work units'); + }); + + it('renders concurrent WorkUnit step progress as transient aggregate status', async () => { + const projectDir = join(tempDir, 'historic-sql-concurrent-progress-project'); + await mkdir(projectDir, { recursive: true }); + await writeFile( + join(projectDir, 'ktx.yaml'), + [ + 'project: historic-sql-concurrent-progress-project', + 'connections:', + ' warehouse:', + ' driver: postgres', + ' url: env:WAREHOUSE_DATABASE_URL', + ' historicSql:', + ' enabled: true', + ' dialect: postgres', + ' minExecutions: 2', + 'ingest:', + ' adapters:', + ' - historic-sql', + '', + ].join('\n'), + 'utf-8', + ); + const createdAdapters: SourceAdapter[] = [ + { source: 'historic-sql', skillNames: [], detect: async () => true, chunk: async () => ({ workUnits: [] }) }, + ]; + const workUnitKeys = [ + 'historic-sql-table-public-orders', + 'historic-sql-table-public-customers', + 'historic-sql-table-public-line-items', + 'historic-sql-table-public-payments', + 'historic-sql-table-public-products', + 'historic-sql-table-public-suppliers', + ]; + const runLocal = vi.fn(async (input: RunLocalIngestOptions) => { + input.memoryFlow?.update({ + plannedWorkUnits: workUnitKeys.map((unitKey) => ({ + unitKey, + rawFiles: [`tables/${unitKey}.json`], + peerFileCount: 0, + dependencyCount: 0, + })), + }); + input.memoryFlow?.emit({ + type: 'chunks_planned', + chunkCount: workUnitKeys.length, + workUnitCount: workUnitKeys.length, + evictionCount: 0, + }); + for (const unitKey of workUnitKeys) { + input.memoryFlow?.emit({ + type: 'work_unit_started', + unitKey, + skills: ['historic_sql_table_digest'], + stepBudget: 40, + }); + } + for (const unitKey of workUnitKeys) { + input.memoryFlow?.emit({ type: 'work_unit_step', unitKey, stepIndex: 1, stepBudget: 40 }); + } + input.memoryFlow?.finish('done'); + return completedLocalBundleRun(input, input.jobId ?? 'historic-concurrent-progress-job'); + }); + const io = makeIo({ isTTY: true }); + + await expect( + runKtxIngest( + { + command: 'run', + projectDir, + connectionId: 'warehouse', + adapter: 'historic-sql', + outputMode: 'plain', + }, + io.io, + { + env: interactiveEnv(), + createAdapters: vi.fn(() => createdAdapters as never), + runLocalIngest: runLocal, + jobIdFactory: () => 'historic-concurrent-progress-job', + }, + ), + ).resolves.toBe(0); + + const stdout = io.stdout(); + expect(stdout).toContain( + '\r[56%] Processing work units: 0/6 complete, 6 active; latest historic-sql-table-public-suppliers step 1/40\u001b[K', + ); + expect(stdout).not.toContain( + '\n[56%] Processing 6/6 work units: historic-sql-table-public-suppliers step 1/40\n', + ); + expect(stdout).toContain('\n[100%] Ingest completed\n'); }); it('passes local Looker pull-config options and agent runner into scheduled ingest for Looker scheduled ingest', async () => { diff --git a/packages/cli/src/ingest.ts b/packages/cli/src/ingest.ts index 36b45a6a..571bc1ef 100644 --- a/packages/cli/src/ingest.ts +++ b/packages/cli/src/ingest.ts @@ -177,6 +177,19 @@ function completedWorkUnitCountThrough(snapshot: MemoryFlowReplayInput, eventInd return workUnitEventsThrough(snapshot, eventIndex).filter((event) => event.type === 'work_unit_finished').length; } +function activeWorkUnitCountThrough(snapshot: MemoryFlowReplayInput, eventIndex: number): number { + const active = new Set(); + for (const event of workUnitEventsThrough(snapshot, eventIndex)) { + if (event.type === 'work_unit_started') { + active.add(event.unitKey); + } + if (event.type === 'work_unit_finished') { + active.delete(event.unitKey); + } + } + return active.size; +} + function plannedWorkUnitCountThrough(snapshot: MemoryFlowReplayInput, eventIndex: number): number { if (snapshot.plannedWorkUnits.length > 0) { return snapshot.plannedWorkUnits.length; @@ -200,7 +213,7 @@ function plainIngestEventProgress( event: MemoryFlowEvent, snapshot: MemoryFlowReplayInput, eventIndex: number, -): { percent: number; message: string } | null { +): { percent: number; message: string; transient?: boolean } | null { switch (event.type) { case 'source_acquired': return { @@ -230,13 +243,14 @@ function plainIngestEventProgress( case 'work_unit_step': { const total = plannedWorkUnitCountThrough(snapshot, eventIndex); const completed = completedWorkUnitCountThrough(snapshot, eventIndex); - const ordinal = workUnitOrdinalThrough(snapshot, eventIndex, event.unitKey); + const active = activeWorkUnitCountThrough(snapshot, eventIndex); const stepFraction = event.stepBudget > 0 ? Math.min(1, event.stepIndex / event.stepBudget) : 0; const percent = total > 0 ? 55 + Math.ceil(((completed + stepFraction) / total) * 25) : 55; - const progress = total > 0 ? `${ordinal}/${total} work units: ` : ''; + const latest = `${event.unitKey} step ${event.stepIndex}/${event.stepBudget}`; return { percent, - message: `Processing ${progress}${event.unitKey} step ${event.stepIndex}/${event.stepBudget}`, + message: `Processing work units: ${completed}/${total} complete, ${active} active; latest ${latest}`, + transient: true, }; } case 'work_unit_finished': { @@ -282,15 +296,31 @@ function shouldWritePlainIngestProgress( function createPlainIngestProgressRenderer( args: Extract, io: KtxIngestIo, -): { start(): void; update(snapshot: MemoryFlowReplayInput): void } { +): { start(): void; update(snapshot: MemoryFlowReplayInput): void; flush(): void } { let printedEvents = 0; let lastPercent = 0; let printedCompletion = false; + let hasPendingTransient = false; - const write = (percent: number, message: string) => { + const flush = () => { + if (!hasPendingTransient) { + return; + } + io.stdout.write('\n'); + hasPendingTransient = false; + }; + + const write = (percent: number, message: string, options?: { transient?: boolean }) => { const nextPercent = Math.max(lastPercent, Math.max(0, Math.min(100, percent))); lastPercent = nextPercent; - io.stderr.write(`[${nextPercent}%] ${message}\n`); + const line = `[${nextPercent}%] ${message}`; + if (options?.transient === true) { + io.stdout.write(`\r${line}\u001b[K`); + hasPendingTransient = true; + return; + } + flush(); + io.stdout.write(`${line}\n`); }; return { @@ -306,7 +336,7 @@ function createPlainIngestProgressRenderer( } const progress = plainIngestEventProgress(event, snapshot, eventIndex); if (progress) { - write(progress.percent, progress.message); + write(progress.percent, progress.message, progress.transient === true ? { transient: true } : undefined); } } if (!printedCompletion && snapshot.status !== 'running') { @@ -314,6 +344,7 @@ function createPlainIngestProgressRenderer( write(100, snapshot.status === 'done' ? 'Ingest completed' : 'Ingest failed'); } }, + flush, }; } @@ -567,6 +598,7 @@ export async function runKtxIngest( io.stdout.write(formatMemoryFlowFinalSummary(latestMemoryFlowSnapshot)); return reportStatus(result.report) === 'done' ? 0 : 1; } + plainProgress?.flush(); await writeReportRecord(result.report, runOutputMode, io, { interactive: (args.inputMode ?? 'auto') === 'auto', renderStoredMemoryFlow: deps.renderStoredMemoryFlow, @@ -574,6 +606,7 @@ export async function runKtxIngest( }); return reportStatus(result.report) === 'done' ? 0 : 1; } finally { + plainProgress?.flush(); liveTui?.close(); } } diff --git a/packages/cli/src/scan.test.ts b/packages/cli/src/scan.test.ts index 525ae53d..152c8b3a 100644 --- a/packages/cli/src/scan.test.ts +++ b/packages/cli/src/scan.test.ts @@ -573,6 +573,32 @@ describe('runKtxScan', () => { expect(io.stdout()).toContain('\n[90%] Building embeddings 1/4 batches\n'); }); + it('scales nested progress phases by the parent phase weight', async () => { + const io = makeIo({ isTTY: true }); + const previousCi = process.env.CI; + delete process.env.CI; + + try { + const progress = createCliScanProgress(io.io); + await progress.update(0.82, 'Enriching schema metadata'); + const enrichmentProgress = progress.startPhase(0.18); + await enrichmentProgress.update(0.05, 'Loaded schema snapshot with 56 tables'); + const descriptionProgress = enrichmentProgress.startPhase(0.45); + await descriptionProgress.update(37 / 56, 'Generating descriptions 37/56 tables', { transient: true }); + await descriptionProgress.update(1, 'Generated descriptions for 56 tables'); + } finally { + if (previousCi === undefined) { + delete process.env.CI; + } else { + process.env.CI = previousCi; + } + } + + expect(io.stdout()).toContain('\r[88%] Generating descriptions 37/56 tables'); + expect(io.stdout()).toContain('\n[91%] Generated descriptions for 56 tables\n'); + expect(io.stdout()).not.toContain('[100%] Generating descriptions 37/56 tables'); + }); + it('flushes transient TTY progress messages before printing scan failures', async () => { await initKtxProject({ projectDir: tempDir, projectName: 'warehouse' }); const runLocalScan = vi.fn(async (input: RunLocalScanOptions): Promise => { diff --git a/packages/cli/src/scan.ts b/packages/cli/src/scan.ts index f89a9d18..e3bda577 100644 --- a/packages/cli/src/scan.ts +++ b/packages/cli/src/scan.ts @@ -527,7 +527,7 @@ export function createCliScanProgress( io.stdout.write(`${line}\n`); }, startPhase(phaseWeight: number) { - return createCliScanProgress(io, state, state.progress, phaseWeight); + return createCliScanProgress(io, state, state.progress, weight * phaseWeight); }, flush() { if (!shouldWrite || !state.hasPendingTransient) { diff --git a/packages/context/src/scan/description-generation.test.ts b/packages/context/src/scan/description-generation.test.ts index de69fb27..70117919 100644 --- a/packages/context/src/scan/description-generation.test.ts +++ b/packages/context/src/scan/description-generation.test.ts @@ -51,6 +51,29 @@ function createLlmProvider(text = 'generated description') { } as any; } +function createFailingLlmProvider(message = 'timeout exceeded when trying to connect') { + vi.mocked(generateText).mockRejectedValue(new Error(message) as never); + return { + getModel: vi.fn().mockReturnValue({ modelId: 'claude-sonnet-4-6', provider: 'anthropic' }), + getModelByName: vi.fn(), + cacheMarker: vi.fn(), + repairToolCallHandler: vi.fn(), + thinkingProviderOptions: vi.fn(), + telemetryConfig: vi.fn(), + promptCachingConfig: vi.fn(() => ({ + enabled: false, + systemTtl: '1h', + toolsTtl: '1h', + historyTtl: '5m', + cacheSystem: true, + cacheTools: true, + cacheHistory: true, + vertexFallbackTo5m: false, + })), + activeBackend: vi.fn(() => 'anthropic'), + } as any; +} + function createConnector(): KtxScanConnector { return { id: 'test-connector', @@ -274,6 +297,51 @@ describe('KtxDescriptionGenerator', () => { expect('introspect' in sampler).toBe(false); }); + it('does not turn LLM failures into generated descriptions', async () => { + const cache = createCache(); + const connector = createConnector(); + const generator = new KtxDescriptionGenerator({ + llmProvider: createFailingLlmProvider(), + cache, + settings: { + columnMaxWords: 12, + tableMaxWords: 18, + dataSourceMaxWords: 24, + }, + }); + + const columnResult = await generator.generateColumnDescriptions({ + connectionId: 'conn-1', + connector, + context: { runId: 'run-1' }, + dataSourceType: 'POSTGRESQL', + supportsNestedAnalysis: false, + table: { + catalog: null, + db: 'public', + name: 'orders', + columns: [{ name: 'status' }], + }, + }); + + await expect( + generator.generateTableDescription({ + connectionId: 'conn-1', + connector, + context: { runId: 'run-1' }, + dataSourceType: 'POSTGRESQL', + table: { catalog: null, db: 'public', name: 'orders' }, + }), + ).resolves.toBeNull(); + + expect(columnResult).toEqual({ + columnDescriptions: [['status', null]], + processedColumns: [], + skippedColumns: [], + }); + expect(cache.set).not.toHaveBeenCalled(); + }); + it('generates and caches table and data-source descriptions', async () => { const cache = createCache(); const connector = createConnector(); diff --git a/packages/context/src/scan/description-generation.ts b/packages/context/src/scan/description-generation.ts index dc30af04..c719ca65 100644 --- a/packages/context/src/scan/description-generation.ts +++ b/packages/context/src/scan/description-generation.ts @@ -348,7 +348,7 @@ export class KtxDescriptionGenerator { }; } - async generateTableDescription(input: KtxGenerateTableDescriptionInput): Promise { + async generateTableDescription(input: KtxGenerateTableDescriptionInput): Promise { const tableRef = toTableRef(input.table); const cacheKey = this.cache?.buildTableKey(tableRef); if (cacheKey) { @@ -386,7 +386,7 @@ export class KtxDescriptionGenerator { this.settings.tableMaxWords, 'ktx-table-description', ); - if (cacheKey) { + if (cacheKey && description) { await this.cache?.set(cacheKey, description); } return description; @@ -396,7 +396,7 @@ export class KtxDescriptionGenerator { } } - async generateDataSourceDescription(input: KtxGenerateDataSourceDescriptionInput): Promise { + async generateDataSourceDescription(input: KtxGenerateDataSourceDescriptionInput): Promise { if (input.tables.length === 0) { return 'No tables found in database'; } @@ -451,7 +451,7 @@ export class KtxDescriptionGenerator { this.settings.dataSourceMaxWords, 'ktx-data-source-description', ); - if (cacheKey) { + if (cacheKey && description) { await this.cache?.set(cacheKey, description); } return description; @@ -543,7 +543,7 @@ export class KtxDescriptionGenerator { 'ktx-column-description', ); - if (cacheKey) { + if (cacheKey && description) { await this.cache?.set(cacheKey, description); } @@ -551,20 +551,20 @@ export class KtxDescriptionGenerator { columnName: column.name, description, skipped: false, - processed: true, + processed: description !== null, }; } catch (error) { this.logger?.error(`Error analyzing column '${column.name}': ${errorMessage(error)}`); return { columnName: column.name, - description: `Error generating description: ${errorMessage(error)}`, + description: null, skipped: false, processed: false, }; } } - private async generateAiDescription(prompt: string, maxWords: number, _operationName: string): Promise { + private async generateAiDescription(prompt: string, maxWords: number, _operationName: string): Promise { try { const text = await generateKtxText({ llmProvider: this.llmProvider, @@ -573,10 +573,10 @@ export class KtxDescriptionGenerator { temperature: this.settings.temperature, }); const description = text.trim(); - return description || 'Failed to generate description'; + return description || null; } catch (error) { this.logger?.error(`Error generating AI description: ${errorMessage(error)}`); - return `Error generating description: ${errorMessage(error)}`; + return null; } } } diff --git a/packages/context/src/scan/local-enrichment-artifacts.test.ts b/packages/context/src/scan/local-enrichment-artifacts.test.ts index 0123f086..8e0c25fd 100644 --- a/packages/context/src/scan/local-enrichment-artifacts.test.ts +++ b/packages/context/src/scan/local-enrichment-artifacts.test.ts @@ -553,6 +553,47 @@ describe('writeLocalScanEnrichmentArtifacts', () => { }); }); + it('does not persist generated error descriptions in manifest shards', async () => { + await writeLocalScanManifestShards({ + project, + connectionId: 'warehouse', + syncId: 'sync-error-description', + driver: 'postgres', + snapshot, + descriptionUpdates: [ + { + table: { catalog: null, db: 'public', name: 'orders' }, + tableDescription: 'Error generating description: timeout exceeded when trying to connect', + columnDescriptions: { + id: 'Error generating description: timeout exceeded when trying to connect', + customer_id: 'AI customer reference', + }, + }, + ], + dryRun: false, + }); + + const shard = YAML.parse( + await readFile(join(tempDir, 'project/semantic-layer/warehouse/_schema/public.yaml'), 'utf8'), + ) as { + tables: { + orders: { + descriptions?: Record; + columns: Array<{ name: string; descriptions?: Record }>; + }; + }; + }; + + expect(shard.tables.orders.descriptions).toEqual({ db: 'DB orders table' }); + expect(shard.tables.orders.columns.find((column) => column.name === 'id')?.descriptions).toEqual({ + db: 'DB order id', + }); + expect(shard.tables.orders.columns.find((column) => column.name === 'customer_id')?.descriptions).toEqual({ + db: 'DB customer id', + ai: 'AI customer reference', + }); + }); + it('writes accepted composite relationships to relationship artifacts and manifest shards', async () => { const compositeSnapshot: KtxSchemaSnapshot = { connectionId: 'warehouse', diff --git a/packages/context/src/scan/local-enrichment-artifacts.ts b/packages/context/src/scan/local-enrichment-artifacts.ts index 101d062e..78f5e36d 100644 --- a/packages/context/src/scan/local-enrichment-artifacts.ts +++ b/packages/context/src/scan/local-enrichment-artifacts.ts @@ -62,6 +62,14 @@ interface ExistingManifestState { type LocalDescriptionUpdates = KtxLocalScanEnrichmentResult['descriptionUpdates']; +function isGeneratedErrorDescription(description: string | null | undefined): boolean { + const normalized = description?.trim().toLowerCase(); + return ( + normalized === 'failed to generate description' || + normalized?.startsWith('error generating description:') === true + ); +} + function artifactDir(connectionId: string, syncId: string): string { return `raw-sources/${connectionId}/${LIVE_DATABASE_ADAPTER}/${syncId}/enrichment`; } @@ -79,7 +87,7 @@ function tableDescription( if (table.comment) { descriptions.db = table.comment; } - if (update?.tableDescription) { + if (update?.tableDescription && !isGeneratedErrorDescription(update.tableDescription)) { descriptions.ai = update.tableDescription; } return Object.keys(descriptions).length > 0 ? descriptions : undefined; @@ -96,7 +104,7 @@ function columnDescription( if (column.comment) { descriptions.db = column.comment; } - if (aiDescription) { + if (aiDescription && !isGeneratedErrorDescription(aiDescription)) { descriptions.ai = aiDescription; } return Object.keys(descriptions).length > 0 ? descriptions : undefined;