ktx/packages/cli/src/scan.ts
Andrey Avtomonov cb8902f1e5
fix(context): merge overlay columns onto manifest columns by name (#94)
* fix(context): merge overlay columns onto manifest columns by name

composeOverlay was appending overlay columns to the manifest column list,
producing duplicate entries when dbt/metabase overlays declared a column
just to attach descriptions. The duplicates carried no `type`, so the
pydantic SourceDefinition rejected them at semantic-query time and broke
`ktx sl query` for every overlay-backed measure. Now overlay columns
match base columns by name (case-insensitive): same-name entries merge
onto the manifest (overlay fields win, type/role fall back to the base,
descriptions merge per source key) and only new names append.

* refactor(sl): split overlay columns from column_overrides and enforce TS/Python wire contract

Overlay sources now have two distinct collections: `columns:` for computed
columns (requiring `expr` + `type`) and `column_overrides:` for metadata
patches to inherited manifest columns. Composing or loading an overlay that
mixes the two — or references an unknown column — fails with a typed error.

Introduce `ResolvedSemanticLayerSource` / `resolvedSourceSchema` /
`toResolvedWire` as the strict shape sent to the Python engine, and add a
schema contract test that diffs Zod against the Pydantic JSON schema dumped
by `python -m semantic_layer dump-schema`. `SourceDefinition` is now
`extra="forbid"` on the Python side.

`loadAllSources` surfaces per-file load errors instead of swallowing them,
so validation/query paths can report manifest shard parse failures.

* fix(context): make scan description generation resilient and quiet

A transient sampleTable failure during ingest used to take out every
table in a connection: generateTableDescription returned a hardcoded
'Table not found' string into descriptions.ai, and KtxDescriptionGenerator
was constructed without a logger, so the failure left no trail anywhere.

- sampleTable / sampleColumn calls retry 3x with 200/400/800ms backoff,
  honouring KtxScanContext.signal via a new KtxAbortedError.
- On retry exhaustion or missing capability, table generation falls back
  to a metadata-only prompt built from column name / native type / comment
  / rawDescriptions. The column path follows the same rule: call the
  LLM when samples or rawDescriptions (or both) are available; skip
  only when both are absent.
- Logger is now threaded from KtxScanContext into the generator. Failures
  emit structured KtxScanWarning entries (new description_fallback_used
  code, plus existing sampling_failed / enrichment_failed /
  connector_capability_missing). ktx scan groups warnings by code so a
  batch of identical failures collapses to one summary line plus sample.
- Returns null on failure instead of the 'Table not found' sentinel; the
  manifest writer's existing guard already skips empty descriptions, so
  schema YAML no longer carries misleading text. SCAN_MANAGED_DESCRIPTION_KEYS
  already strips stale 'ai' on merge, so existing YAML clears on next run.

Also suppress the AI SDK v6 'system in messages' warning: pull system messages
out of KtxMessageBuilder.wrapSimple's output via a new splitKtxSystemMessages
helper and pass them top-level to generateText (preserves cacheControl
providerOptions on the SystemModelMessage). Agent-runner's local
splitSystemPromptMessages dedupes onto the shared helper.

* test(docs): align examples-docs assertions with revamped docs

PR #103 (setup/guide doc revamp) reworded several CLI examples and
connection labels; the assertions in scripts/examples-docs.test.mjs
still referenced the pre-revamp wording and were failing in CI on main.
Update the regexes to match the post-revamp content:

- drop the `--json` flag from the sl-query example expectation
- move the `Driver:` / `Status: ok` probe to the connection reference,
  which is where that output now lives (driver id is lowercase
  `postgres`, not the display name `PostgreSQL`)
- drop the obsolete `Install \`uv\`...` troubleshooting line
- accept `<connectionId>` everywhere; the docs no longer use the
  hyphenated `<connection-id>` form
- match the `warehouse` connection id used in the quickstart instead of
  the `postgres-warehouse` id only used in the README and setup ref

* fix(sl): skip TS/Python schema contract test when uv is unavailable

The TypeScript checks CI job does not install uv or Python, so the
module-level `execFileSync('uv', ...)` in schemas.contract.test.ts threw
ENOENT and failed the suite. Wrap the schema dump in a try/catch and
guard the describe block with `describe.skipIf` so the test skips in
environments without uv. Local dev and any CI job that has uv on PATH
still runs the cross-language contract assertion.
2026-05-15 02:11:04 +02:00

349 lines
13 KiB
TypeScript

import { loadKtxProject } from '@ktx/context/project';
import {
type KtxProgressPort,
type KtxScanMode,
type KtxScanReport,
type KtxScanWarning,
runLocalScan,
} from '@ktx/context/scan';
import type { KtxCliIo } from './index.js';
import { createKtxCliLocalIngestAdapters } from './local-adapters.js';
import { createKtxCliScanConnector } from './local-scan-connectors.js';
import type { KtxManagedPythonInstallPolicy } from './managed-python-command.js';
import { profileMark } from './startup-profile.js';
// Records a startup-profiling mark when this module is evaluated (helper from ./startup-profile.js).
profileMark('module:scan');
/**
 * Parsed arguments for `ktx scan`.
 */
export interface KtxScanArgs {
/** Subcommand; `run` is the only variant this module handles. */
command: 'run';
/** Project directory passed to `loadKtxProject` and echoed in the "Next:" hint. */
projectDir: string;
/** Connection to scan; used to build the scan connector and forwarded to the scan runner. */
connectionId: string;
/** Scan mode; `structural` skips creating a scan connector unless `detectRelationships` is set. */
mode: KtxScanMode;
/** Forwarded to the scan runner; when true, a scan connector is created even in `structural` mode. */
detectRelationships: boolean;
/** Forwarded to the scan runner as `dryRun`. */
dryRun: boolean;
/** When set, forwarded to the runner and ingest adapters, and no managed-daemon options are built. */
databaseIntrospectionUrl?: string;
/** CLI version; required (together with `runtimeInstallPolicy`) to build managed-daemon options. */
cliVersion?: string;
/** Managed runtime install policy; required (together with `cliVersion`) to build managed-daemon options. */
runtimeInstallPolicy?: KtxManagedPythonInstallPolicy;
}
/**
 * Optional dependency overrides for `runKtxScan`.
 * NOTE(review): presumably used by tests/embedders to stub side effects; confirm.
 */
export interface KtxScanDeps {
/** Replaces the default `runLocalScan` implementation. */
runLocalScan?: typeof runLocalScan;
/** Replaces the default local ingest adapter factory. */
createLocalIngestAdapters?: typeof createKtxCliLocalIngestAdapters;
/** Progress port to use; when provided, the built-in terminal progress renderer is not created. */
progress?: KtxProgressPort;
}
/**
 * Decides whether ANSI styling may be emitted.
 *
 * Styling is used only for an interactive stdout. It is disabled when `NO_COLOR`
 * is non-empty, when `TERM` is `dumb`, or when `CI` is non-empty.
 */
function shouldUseStyledOutput(io: KtxCliIo): boolean {
  if (io.stdout.isTTY !== true) {
    return false;
  }
  const env = process.env;
  if (env.NO_COLOR) {
    return false;
  }
  if (env.TERM === 'dumb') {
    return false;
  }
  return !env.CI;
}
/** Wraps `text` in the ANSI green foreground code, then resets to the default foreground. */
function green(text: string): string {
  const open = '\u001b[32m';
  const close = '\u001b[39m';
  return open + text + close;
}
/** Wraps `text` in the ANSI faint/dim code, then resets normal intensity. */
function dim(text: string): string {
  const open = '\u001b[2m';
  const close = '\u001b[22m';
  return open + text + close;
}
/**
 * Quotes a value for display in a POSIX-shell command hint.
 *
 * Values made only of a conservative safe character set are returned unchanged.
 * Anything else, including the empty string, is wrapped in single quotes, and each
 * embedded `'` is written as `'\''`.
 */
function quoteCliArg(value: string): string {
  const isShellSafe = /^[A-Za-z0-9_./:@=-]+$/.test(value);
  return isShellSafe ? value : `'${value.split("'").join("'\\''")}'`;
}
/**
 * Chooses the singular or plural form of a noun for `count`.
 *
 * @param count - Quantity being described; only exactly 1 is singular.
 * @param singular - Singular form.
 * @param pluralValue - Plural form; defaults to `singular` + "s".
 */
function plural(count: number, singular: string, pluralValue = `${singular}s`): string {
  if (count === 1) {
    return singular;
  }
  return pluralValue;
}
/** Number of tables the scan added, modified, or deleted. */
function tableChangeCount(report: KtxScanReport): number {
  const { tablesAdded, tablesModified, tablesDeleted } = report.diffSummary;
  return tablesAdded + tablesModified + tablesDeleted;
}
/** Number of tables covered by the diff: changed (added/modified/deleted) plus unchanged. */
function totalTableCount(report: KtxScanReport): number {
  const diff = report.diffSummary;
  const changed = diff.tablesAdded + diff.tablesModified + diff.tablesDeleted;
  return changed + diff.tablesUnchanged;
}
/** Writes the run/connection/mode/sync/dry-run identity lines, one stdout write per line. */
function writeScanIdentity(report: KtxScanReport, io: KtxCliIo): void {
  const lines = [
    `Run: ${report.runId}\n`,
    `Connection: ${report.connectionId}\n`,
    `Mode: ${report.mode}\n`,
    `Sync: ${report.syncId}\n`,
    `Dry run: ${report.dryRun ? 'yes' : 'no'}\n`,
  ];
  for (const line of lines) {
    io.stdout.write(line);
  }
}
/**
 * Writes the "What changed" section: a one-line summary, then per-category table counts.
 * Column counts are appended only when at least one column was added, changed, or removed.
 */
function writeWhatChanged(report: KtxScanReport, io: KtxCliIo): void {
  const diff = report.diffSummary;
  const changed = tableChangeCount(report);
  const total = totalTableCount(report);
  const lines = [
    '\nWhat changed\n',
    ` Semantic layer comparison found ${changed} ${plural(changed, 'change')} across ${total} ${plural(total, 'table')}\n`,
    ` New tables: ${diff.tablesAdded}\n`,
    ` Changed tables: ${diff.tablesModified}\n`,
    ` Removed tables: ${diff.tablesDeleted}\n`,
    ` Unchanged tables: ${diff.tablesUnchanged}\n`,
  ];
  const anyColumnChange = diff.columnsAdded > 0 || diff.columnsModified > 0 || diff.columnsDeleted > 0;
  if (anyColumnChange) {
    lines.push(
      ` New columns: ${diff.columnsAdded}\n`,
      ` Changed columns: ${diff.columnsModified}\n`,
      ` Removed columns: ${diff.columnsDeleted}\n`,
    );
  }
  for (const line of lines) {
    io.stdout.write(line);
  }
}
/** True when any relationship bucket (accepted/review/rejected/skipped) has a positive count. */
function hasRelationshipResults(report: KtxScanReport): boolean {
  const { accepted, review, rejected, skipped } = report.relationships;
  return [accepted, review, rejected, skipped].some((count) => count > 0);
}
/** Writes the "Relationships" section with per-bucket counts; writes nothing when every count is zero. */
function writeRelationships(report: KtxScanReport, io: KtxCliIo): void {
  if (hasRelationshipResults(report)) {
    const counts = report.relationships;
    const lines = [
      '\nRelationships\n',
      ` Accepted: ${counts.accepted}\n`,
      ` Review: ${counts.review}\n`,
      ` Rejected: ${counts.rejected}\n`,
      ` Skipped: ${counts.skipped}\n`,
    ];
    for (const line of lines) {
      io.stdout.write(line);
    }
  }
}
/** Explains the practical impact of a missing connector capability, falling back to a generic note. */
function capabilityGapMessage(gap: string): string {
  switch (gap) {
    case 'columnStats':
      return 'columnStats is unavailable; relationship confidence may be lower.';
    case 'tableSampling':
    case 'columnSampling':
      return `${gap} is unavailable; descriptions may be less specific.`;
    case 'readOnlySql':
      return 'readOnlySql is unavailable; relationship and validation checks may be limited.';
    default:
      return `${gap} is unavailable; scan results may be less complete.`;
  }
}
/** Formats one warning as `code: table[.column]: message`; the location prefix is omitted without a table. */
function warningLine(warning: KtxScanWarning): string {
  let location = '';
  if (warning.table) {
    location = warning.column ? `${warning.table}.${warning.column}: ` : `${warning.table}: `;
  }
  return `${warning.code}: ${location}${warning.message}`;
}
/**
 * Buckets warnings by `code`.
 * Both the bucket order and the order within a bucket follow the first occurrence in the input.
 */
function groupWarningsByCode(warnings: readonly KtxScanWarning[]): Map<string, KtxScanWarning[]> {
  const buckets = new Map<string, KtxScanWarning[]>();
  warnings.forEach((warning) => {
    const bucket = buckets.get(warning.code);
    if (bucket === undefined) {
      buckets.set(warning.code, [warning]);
      return;
    }
    bucket.push(warning);
  });
  return buckets;
}
/**
 * Renders the one-line summary for a group of scan warnings that share `code`.
 *
 * @param code - Warning code shared by every warning in the group.
 * @param count - Number of warnings in the group; drives pluralization and verb agreement.
 * @returns A human-readable sentence. Unknown codes fall back to a generic "N warnings (code)".
 */
function describeWarningGroup(code: string, count: number): string {
  switch (code) {
    case 'sampling_failed':
      return `${count} ${plural(count, 'table')} could not be sampled (retries exhausted); descriptions used metadata-only fallback or were skipped.`;
    case 'description_fallback_used':
      return `${count} ${plural(count, 'table')} got an AI description from column metadata only (no sample rows available).`;
    case 'enrichment_failed':
      // Pluralize both halves of the compound noun: "tables/columns", not "table/columns".
      return `${count} ${plural(count, 'table/column', 'tables/columns')} could not be enriched.`;
    case 'connector_capability_missing':
      return `${count} ${plural(count, 'table')} affected by missing connector capability.`;
    case 'statistics_failed':
      return `${count} statistics ${plural(count, 'lookup')} failed.`;
    case 'llm_unavailable':
      return 'LLM provider unavailable; AI enrichment was skipped.';
    case 'embedding_unavailable':
      return 'Embedding provider unavailable; embeddings were skipped.';
    case 'relationship_validation_failed':
      return `${count} relationship ${plural(count, 'validation')} could not run.`;
    case 'relationship_llm_invalid_reference':
      return `${count} LLM-proposed ${plural(count, 'relationship')} referenced unknown columns.`;
    case 'relationship_llm_proposal_failed':
      return `${count} LLM relationship ${plural(count, 'proposal')} failed.`;
    case 'scan_enrichment_backend_not_configured':
      return 'Scan enrichment backend is not configured; AI stages were skipped.';
    case 'credential_redacted':
      // Make the verb agree with the count: "1 credential was", "2 credentials were".
      return `${count} ${plural(count, 'credential')} ${plural(count, 'was', 'were')} redacted from scan output.`;
    default:
      return `${count} ${plural(count, 'warning')} (${code})`;
  }
}
/**
 * Builds managed-daemon options for the local ingest adapters.
 *
 * Options are returned only when no explicit `databaseIntrospectionUrl` was given
 * and both `cliVersion` and `runtimeInstallPolicy` are set. Otherwise the result
 * is `undefined`.
 */
function managedDaemonOptionsForScanRun(args: Extract<KtxScanArgs, { command: 'run' }>, io: KtxCliIo) {
  const { databaseIntrospectionUrl, cliVersion, runtimeInstallPolicy } = args;
  if (!databaseIntrospectionUrl && cliVersion && runtimeInstallPolicy) {
    return {
      cliVersion,
      projectDir: args.projectDir,
      installPolicy: runtimeInstallPolicy,
      io,
    };
  }
  return undefined;
}
/**
 * Writes the "Needs attention" section.
 *
 * Warnings are grouped by code. Each group gets a summary line, the first warning
 * in full, and up to three further table/column locations (with a trailing ", …"
 * when more warnings remain). Capability gaps are listed afterwards. When there are
 * neither warnings nor gaps, the section reads " None".
 */
function writeNeedsAttention(report: KtxScanReport, io: KtxCliIo): void {
  io.stdout.write('\nNeeds attention\n');
  const warnings = report.warnings;
  const gaps = report.capabilityGaps;
  if (warnings.length === 0 && gaps.length === 0) {
    io.stdout.write(' None\n');
    return;
  }

  if (warnings.length > 0) {
    io.stdout.write(` ${warnings.length} ${plural(warnings.length, 'warning')}\n`);
    for (const [code, group] of groupWarningsByCode(warnings)) {
      io.stdout.write(` - ${describeWarningGroup(code, group.length)}\n`);
      const [head, ...rest] = group;
      if (head) {
        io.stdout.write(` ${warningLine(head)}\n`);
      }
      if (rest.length === 0) {
        continue;
      }
      // Collect at most three extra locations, skipping warnings without a table.
      const locations: string[] = [];
      for (const warning of rest) {
        if (locations.length === 3) {
          break;
        }
        if (!warning.table) {
          continue;
        }
        locations.push(warning.column ? `${warning.table}.${warning.column}` : warning.table);
      }
      if (locations.length > 0) {
        const suffix = rest.length > locations.length ? `, …` : '';
        io.stdout.write(` also: ${locations.join(', ')}${suffix}\n`);
      }
    }
  }

  if (gaps.length > 0) {
    io.stdout.write(` ${gaps.length} capability ${plural(gaps.length, 'gap')}\n`);
    for (const gap of gaps) {
      io.stdout.write(` - ${capabilityGapMessage(gap)}\n`);
    }
  }
}
/**
 * Writes the "Artifacts" section.
 *
 * Report and raw-source paths always appear, shown as "none" when null/undefined.
 * Shard and enrichment-artifact counts appear only when non-zero.
 */
function writeArtifacts(report: KtxScanReport, io: KtxCliIo): void {
  const paths = report.artifactPaths;
  const shardCount = paths.manifestShards.length;
  const enrichmentCount = paths.enrichmentArtifacts.length;
  io.stdout.write('\nArtifacts\n');
  io.stdout.write(` Report: ${paths.reportPath ?? 'none'}\n`);
  io.stdout.write(` Raw sources: ${paths.rawSourcesDir ?? 'none'}\n`);
  if (shardCount > 0) {
    io.stdout.write(` Schema shards: ${shardCount}\n`);
  }
  if (enrichmentCount > 0) {
    io.stdout.write(` Enrichment artifacts: ${enrichmentCount}\n`);
  }
}
/** Writes every human-readable report section in display order. */
function writeHumanReportBody(report: KtxScanReport, io: KtxCliIo): void {
  const sections = [writeScanIdentity, writeWhatChanged, writeRelationships, writeNeedsAttention, writeArtifacts];
  for (const writeSection of sections) {
    writeSection(report, io);
  }
}
/**
 * Writes the completion banner, the full report body, and a "Next:" hint pointing at `ktx status`.
 * When styled output is allowed, the banner gets a green check mark and the hint command is dimmed.
 */
function writeRunSummary(report: KtxScanReport, projectDir: string, io: KtxCliIo): void {
  const styled = shouldUseStyledOutput(io);
  const headline = styled ? `${green('✓')} KTX scan completed\n` : 'KTX scan completed\n';
  io.stdout.write(headline);
  io.stdout.write('Status: done\n');
  writeHumanReportBody(report, io);
  io.stdout.write('\nNext:\n');
  const statusCommand = styled ? dim('ktx status') : 'ktx status';
  io.stdout.write(` ${statusCommand} --project-dir ${quoteCliArg(projectDir)}\n`);
}
/**
 * Mutable state shared by a CLI progress reporter and every phase reporter derived from it via `startPhase`.
 */
interface KtxCliScanProgressState {
/** Highest absolute progress seen so far, capped at 1; new phases start from this value. */
progress: number;
/** True while a transient line has been written without a trailing newline. */
hasPendingTransient: boolean;
}
/** Per-update options for the CLI progress reporter. */
interface KtxCliScanProgressUpdateOptions {
/** When true, the line overwrites the current terminal line instead of appending a new one. */
transient?: boolean;
}
/**
 * Progress port used by the CLI. Its `update` accepts rendering options, and it adds `flush`.
 */
interface KtxCliScanProgress extends Omit<KtxProgressPort, 'update'> {
update(progress: number, message?: string, options?: KtxCliScanProgressUpdateOptions): Promise<void>;
/** Terminates a pending transient line with a newline; no-op otherwise. */
flush(): void;
}
/**
 * Creates a terminal progress reporter that writes `[NN%] message` lines to stdout.
 *
 * Nothing is written unless stdout is a TTY and `CI` is unset/empty. Transient updates
 * overwrite the current line (carriage return + erase-to-end-of-line), and `flush()`
 * terminates such a line with a newline. A regular update flushes first, then
 * appends its own line.
 *
 * @param io - Output streams.
 * @param state - State shared with every phase reporter created via `startPhase`.
 * @param start - Absolute progress (0..1) at which this reporter's range begins.
 * @param weight - Width of the absolute range this reporter's 0..1 input maps onto.
 * @returns A progress reporter; `startPhase(w)` returns a nested reporter starting at the
 *   current high-water mark and spanning `weight * w`.
 */
export function createCliScanProgress(
  io: KtxCliIo,
  state: KtxCliScanProgressState = { progress: 0, hasPendingTransient: false },
  start = 0,
  weight = 1,
): KtxCliScanProgress {
  const enabled = io.stdout.isTTY === true && !process.env.CI;
  const clamp = (value: number, low: number, high: number): number => Math.max(low, Math.min(high, value));

  const reporter: KtxCliScanProgress = {
    async update(value: number, message?: string, options?: KtxCliScanProgressUpdateOptions) {
      const absolute = start + clamp(value, 0, 1) * weight;
      // Track a monotonic high-water mark so later phases never start below earlier progress.
      state.progress = Math.max(state.progress, Math.min(1, absolute));
      if (!enabled || !message) {
        return;
      }
      const line = `[${clamp(Math.round(absolute * 100), 0, 100)}%] ${message}`;
      if (options?.transient !== true) {
        reporter.flush();
        io.stdout.write(`${line}\n`);
        return;
      }
      io.stdout.write(`\r${line}\u001b[K`);
      state.hasPendingTransient = true;
    },
    startPhase(phaseWeight: number) {
      return createCliScanProgress(io, state, state.progress, weight * phaseWeight);
    },
    flush() {
      if (enabled && state.hasPendingTransient) {
        io.stdout.write('\n');
        state.hasPendingTransient = false;
      }
    },
  };
  return reporter;
}
/**
 * Entry point for `ktx scan run`: loads the project, runs a local scan, and prints a human-readable summary.
 *
 * @param args - Parsed CLI arguments.
 * @param io - Output streams; defaults to the current process.
 * @param deps - Optional overrides for the scan runner, ingest-adapter factory, and progress port.
 * @returns Exit code: 0 on success, 1 when any step throws (the error message is written to stderr).
 */
export async function runKtxScan(args: KtxScanArgs, io: KtxCliIo = process, deps: KtxScanDeps = {}): Promise<number> {
  try {
    const project = await loadKtxProject({ projectDir: args.projectDir });
    const managedDaemon = managedDaemonOptionsForScanRun(args, io);
    // A scan connector is only created for non-structural scans or when relationships are detected.
    const needsConnector = args.mode !== 'structural' || args.detectRelationships;
    const connector = needsConnector ? await createKtxCliScanConnector(project, args.connectionId) : undefined;
    // Render our own terminal progress only when the caller did not inject a progress port.
    const cliProgress = deps.progress ? null : createCliScanProgress(io);
    const progress = deps.progress ?? cliProgress;
    const scan = deps.runLocalScan ?? runLocalScan;
    const createAdapters = deps.createLocalIngestAdapters ?? createKtxCliLocalIngestAdapters;
    try {
      const { report } = await scan({
        project,
        connectionId: args.connectionId,
        mode: args.mode,
        detectRelationships: args.detectRelationships,
        dryRun: args.dryRun,
        trigger: 'cli',
        databaseIntrospectionUrl: args.databaseIntrospectionUrl,
        connector,
        adapters: createAdapters(project, {
          ...(args.databaseIntrospectionUrl ? { databaseIntrospectionUrl: args.databaseIntrospectionUrl } : {}),
          ...(managedDaemon ? { managedDaemon } : {}),
        }),
        ...(progress ? { progress } : {}),
      });
      // Terminate any transient progress line so the summary starts on a fresh line.
      cliProgress?.flush();
      writeRunSummary(report, args.projectDir, io);
    } finally {
      // Also terminate a pending transient line when the scan fails.
      cliProgress?.flush();
    }
    return 0;
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    io.stderr.write(`${message}\n`);
    return 1;
  }
}