Merge remote-tracking branch 'origin/main' into luca-martial/schema-select-ux-text

This commit is contained in:
Luca Martial 2026-05-11 14:45:12 -07:00
commit 523d6ab68a
130 changed files with 17386 additions and 5942 deletions

View file

@ -92,7 +92,7 @@ export function registerIngestCommands(
sourceDir: options.sourceDir ? resolve(options.sourceDir) : undefined,
databaseIntrospectionUrl: options.databaseIntrospectionUrl || undefined,
cliVersion: context.packageInfo.version,
runtimeInstallPolicy: runtimeInstallPolicyFromFlags(options),
runtimeInstallPolicy: runtimeInstallPolicyFromFlags({ yes: options.yes }),
...(options.debugLlmRequestFile ? { debugLlmRequestFile: resolve(options.debugLlmRequestFile) } : {}),
outputMode: outputMode(options),
...inputMode(options),

View file

@ -117,6 +117,7 @@ function shouldShowSetupEntryMenu(
enableHistoricSql?: boolean;
disableHistoricSql?: boolean;
historicSqlWindowDays?: number;
historicSqlMinExecutions?: number;
historicSqlMinCalls?: number;
historicSqlServiceAccountPattern?: string[];
historicSqlRedactionPattern?: string[];
@ -186,6 +187,7 @@ function shouldShowSetupEntryMenu(
'enableHistoricSql',
'disableHistoricSql',
'historicSqlWindowDays',
'historicSqlMinExecutions',
'historicSqlMinCalls',
'skipDatabases',
'source',
@ -274,9 +276,10 @@ export function registerSetupCommands(program: Command, context: KtxCliCommandCo
.option('--enable-historic-sql', 'Enable Historic SQL when the selected database supports it', false)
.option('--disable-historic-sql', 'Disable Historic SQL for the selected database', false)
.option('--historic-sql-window-days <number>', 'Historic SQL query-history window', positiveInteger)
.option('--historic-sql-min-executions <number>', 'Minimum Historic SQL executions for a template', positiveInteger)
.option(
'--historic-sql-min-calls <number>',
'Postgres Historic SQL pg_stat_statements minimum calls floor',
'Alias for --historic-sql-min-executions',
positiveInteger,
)
.option(
@ -360,6 +363,7 @@ export function registerSetupCommands(program: Command, context: KtxCliCommandCo
const mode = options.new ? 'new' : options.existing ? 'existing' : 'auto';
const resolvedAgentScope = options.global ? 'global' : options.agentScope;
const historicSqlMinExecutions = options.historicSqlMinExecutions ?? options.historicSqlMinCalls;
await runSetupArgs(context, {
command: 'run',
projectDir: resolveCommandProjectDir(command),
@ -388,7 +392,7 @@ export function registerSetupCommands(program: Command, context: KtxCliCommandCo
...(options.enableHistoricSql ? { enableHistoricSql: true } : {}),
...(options.disableHistoricSql ? { disableHistoricSql: true } : {}),
...(options.historicSqlWindowDays !== undefined ? { historicSqlWindowDays: options.historicSqlWindowDays } : {}),
...(options.historicSqlMinCalls !== undefined ? { historicSqlMinCalls: options.historicSqlMinCalls } : {}),
...(historicSqlMinExecutions !== undefined ? { historicSqlMinExecutions } : {}),
...(options.historicSqlServiceAccountPattern.length > 0
? { historicSqlServiceAccountPatterns: options.historicSqlServiceAccountPattern }
: {}),

View file

@ -292,10 +292,9 @@ describe('runKtxDoctor', () => {
{
id: 'historic-sql-postgres-warehouse',
label: 'Postgres Historic SQL (warehouse)',
status: 'warn' as const,
status: 'pass' as const,
detail:
'pg_stat_statements ready (PostgreSQL 16.4) with warnings: pg_stat_statements.max is 1000; set it to at least 5000 to reduce query-template eviction churn',
fix: `Update the Postgres parameter group or config, then rerun \`ktx dev doctor --project-dir ${tempDir}\``,
'pg_stat_statements ready (PostgreSQL 16.4); info: pg_stat_statements.max is 1000; set it to at least 5000 to reduce query-template eviction churn',
},
]);
@ -313,8 +312,9 @@ describe('runKtxDoctor', () => {
).resolves.toBe(0);
expect(runHistoricSqlDoctorChecks).toHaveBeenCalledTimes(1);
expect(testIo.stdout()).toContain('WARN Postgres Historic SQL (warehouse): pg_stat_statements ready');
expect(testIo.stdout()).toContain('Fix: Update the Postgres parameter group or config');
expect(testIo.stdout()).toContain('PASS Postgres Historic SQL (warehouse): pg_stat_statements ready');
expect(testIo.stdout()).toContain('info: pg_stat_statements.max is 1000');
expect(testIo.stdout()).not.toContain('Fix: Update the Postgres parameter group or config');
});
it('warns when semantic-search embeddings are not configured', async () => {

View file

@ -81,7 +81,39 @@ describe('runPostgresHistoricSqlDoctorChecks', () => {
]);
});
it('warns when the PGSS probe succeeds with operational warnings', async () => {
it('passes with an informational note when only pg_stat_statements.max is below the recommended floor', async () => {
const checks = await runPostgresHistoricSqlDoctorChecks(
projectWithConnections({
warehouse: {
driver: 'postgres',
url: 'env:WAREHOUSE_DATABASE_URL',
readonly: true,
historicSql: { enabled: true, dialect: 'postgres' },
},
}),
{
postgresHistoricSqlProbe: async () => ({
pgServerVersion: 'PostgreSQL 16.4',
warnings: [],
info: [
'pg_stat_statements.max is 1000; set it to at least 5000 to reduce query-template eviction churn',
],
}),
},
);
expect(checks).toEqual([
{
id: 'historic-sql-postgres-warehouse',
label: 'Postgres Historic SQL (warehouse)',
status: 'pass',
detail:
'pg_stat_statements ready (PostgreSQL 16.4); info: pg_stat_statements.max is 1000; set it to at least 5000 to reduce query-template eviction churn',
},
]);
});
it('warns when pg_stat_statements tracking is disabled', async () => {
const checks = await runPostgresHistoricSqlDoctorChecks(
projectWithConnections({
warehouse: {
@ -95,6 +127,9 @@ describe('runPostgresHistoricSqlDoctorChecks', () => {
postgresHistoricSqlProbe: async () => ({
pgServerVersion: 'PostgreSQL 16.4',
warnings: [
'pg_stat_statements.track is none; set it to top or all in the Postgres parameter group or config',
],
info: [
'pg_stat_statements.max is 1000; set it to at least 5000 to reduce query-template eviction churn',
],
}),
@ -107,7 +142,7 @@ describe('runPostgresHistoricSqlDoctorChecks', () => {
label: 'Postgres Historic SQL (warehouse)',
status: 'warn',
detail:
'pg_stat_statements ready (PostgreSQL 16.4) with warnings: pg_stat_statements.max is 1000; set it to at least 5000 to reduce query-template eviction churn',
'pg_stat_statements ready (PostgreSQL 16.4) with warnings: pg_stat_statements.track is none; set it to top or all in the Postgres parameter group or config; info: pg_stat_statements.max is 1000; set it to at least 5000 to reduce query-template eviction churn',
fix: 'Update the Postgres parameter group or config, then rerun `ktx dev doctor --project-dir /tmp/ktx-project`',
},
]);

View file

@ -16,6 +16,7 @@ export interface PostgresHistoricSqlDoctorProbeInput {
export interface PostgresHistoricSqlDoctorProbeResult {
pgServerVersion: string;
warnings: string[];
info?: string[];
}
export type PostgresHistoricSqlDoctorProbe = (
@ -72,10 +73,17 @@ function failureDetail(error: unknown): string {
return String(error);
}
function readinessDetail(result: PostgresHistoricSqlDoctorProbeResult): string {
const warningText = result.warnings.length > 0 ? ` with warnings: ${result.warnings.join('; ')}` : '';
const info = result.info ?? [];
const infoText = info.length > 0 ? `; info: ${info.join('; ')}` : '';
return `pg_stat_statements ready (${result.pgServerVersion})${warningText}${infoText}`;
}
async function defaultPostgresHistoricSqlProbe(
input: PostgresHistoricSqlDoctorProbeInput,
): Promise<PostgresHistoricSqlDoctorProbeResult> {
const [{ PostgresPgssQueryHistoryReader }, { KtxPostgresHistoricSqlQueryClient, isKtxPostgresConnectionConfig }] =
const [{ PostgresPgssReader }, { KtxPostgresHistoricSqlQueryClient, isKtxPostgresConnectionConfig }] =
await Promise.all([import('@ktx/context/ingest'), import('@ktx/connector-postgres')]);
if (!isKtxPostgresConnectionConfig(input.connection)) {
@ -88,7 +96,7 @@ async function defaultPostgresHistoricSqlProbe(
env: input.env,
});
try {
return await new PostgresPgssQueryHistoryReader().probe(client);
return await new PostgresPgssReader().probe(client);
} finally {
await client.cleanup();
}
@ -134,14 +142,12 @@ export async function runPostgresHistoricSqlDoctorChecks(
'warn',
checkId(connectionId),
label,
`pg_stat_statements ready (${result.pgServerVersion}) with warnings: ${result.warnings.join('; ')}`,
readinessDetail(result),
`Update the Postgres parameter group or config, then rerun \`ktx dev doctor --project-dir ${project.projectDir}\``,
),
);
} else {
checks.push(
check('pass', checkId(connectionId), label, `pg_stat_statements ready (${result.pgServerVersion})`),
);
checks.push(check('pass', checkId(connectionId), label, readinessDetail(result)));
}
} catch (error) {
checks.push(

View file

@ -920,7 +920,7 @@ describe('runKtxCli', () => {
sourceDir: tempDir,
databaseIntrospectionUrl: undefined,
cliVersion: '0.0.0-private',
runtimeInstallPolicy: 'never',
runtimeInstallPolicy: 'prompt',
debugLlmRequestFile: `${tempDir}/debug.jsonl`,
outputMode: 'json',
inputMode: 'disabled',
@ -934,9 +934,9 @@ describe('runKtxCli', () => {
expect(ingestReplayHelpIo.stderr()).toBe('');
});
it('routes ingest managed runtime install policies', async () => {
it('routes ingest managed runtime install policy separately from visualization input mode', async () => {
const autoIo = makeIo();
const conflictIo = makeIo();
const nonInteractiveIo = makeIo();
const ingest = vi.fn(async () => 0);
await expect(
@ -972,10 +972,10 @@ describe('runKtxCli', () => {
'--yes',
'--no-input',
],
conflictIo.io,
nonInteractiveIo.io,
{ ingest },
),
).resolves.toBe(1);
).resolves.toBe(0);
expect(ingest).toHaveBeenCalledWith(
expect.objectContaining({
@ -985,7 +985,16 @@ describe('runKtxCli', () => {
}),
autoIo.io,
);
expect(conflictIo.stderr()).toContain('Choose only one runtime install mode: --yes or --no-input');
expect(ingest).toHaveBeenCalledWith(
expect.objectContaining({
command: 'run',
cliVersion: '0.0.0-private',
runtimeInstallPolicy: 'auto',
inputMode: 'disabled',
}),
nonInteractiveIo.io,
);
expect(nonInteractiveIo.stderr()).toBe('');
});
it('dispatches public connection through the existing connection implementation', async () => {
@ -1182,7 +1191,7 @@ describe('runKtxCli', () => {
'--enable-historic-sql',
'--historic-sql-window-days',
'30',
'--historic-sql-min-calls',
'--historic-sql-min-executions',
'12',
],
setupIo.io,
@ -1205,7 +1214,7 @@ describe('runKtxCli', () => {
databaseSchemas: ['public'],
enableHistoricSql: true,
historicSqlWindowDays: 30,
historicSqlMinCalls: 12,
historicSqlMinExecutions: 12,
skipDatabases: false,
}),
setupIo.io,

View file

@ -22,6 +22,7 @@ import { resetVizFallbackWarningsForTest } from './viz-fallback.js';
describe('runKtxIngest viz and replay', () => {
let tempDir: string;
let originalTerm: string | undefined;
const interactiveEnv = (): NodeJS.ProcessEnv => ({ ...process.env, CI: 'false' });
beforeEach(async () => {
resetVizFallbackWarningsForTest();
@ -304,7 +305,7 @@ describe('runKtxIngest viz and replay', () => {
expect(io.stdout()).toContain('KTX memory flow warehouse/fake done');
});
it('does not attach a live memory-flow sink for plain run output', async () => {
it('attaches a plain progress memory-flow sink for interactive plain run output', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
const sourceDir = join(tempDir, 'source');
@ -325,11 +326,12 @@ describe('runKtxIngest viz and replay', () => {
outputMode: 'plain',
},
io.io,
{ runLocalIngest: runLocal },
{ env: interactiveEnv(), runLocalIngest: runLocal },
),
).resolves.toBe(0);
expect(runLocal).toHaveBeenCalledWith(expect.not.objectContaining({ memoryFlow: expect.anything() }));
expect(runLocal).toHaveBeenCalledWith(expect.objectContaining({ memoryFlow: expect.anything() }));
expect(io.stdout()).toContain('[5%] Fetching source files for warehouse/fake');
expect(io.stdout()).toContain('Job: plain-run');
expect(io.stdout()).not.toContain('KTX memory flow');
});
@ -395,6 +397,7 @@ describe('runKtxIngest viz and replay', () => {
},
io.io,
{
env: interactiveEnv(),
runLocalIngest: runLocal,
startLiveMemoryFlow,
jobIdFactory: () => 'raw-missing-viz-run',
@ -403,7 +406,8 @@ describe('runKtxIngest viz and replay', () => {
).resolves.toBe(0);
expect(startLiveMemoryFlow).not.toHaveBeenCalled();
expect(runLocal).toHaveBeenCalledWith(expect.not.objectContaining({ memoryFlow: expect.anything() }));
expect(runLocal).toHaveBeenCalledWith(expect.objectContaining({ memoryFlow: expect.anything() }));
expect(io.stdout()).toContain('[5%] Fetching source files for warehouse/fake');
expect(io.stdout()).toContain('Job: raw-missing-viz-run');
expect(io.stdout()).not.toContain('KTX memory flow');
expect(io.stderr()).toContain(

View file

@ -36,6 +36,7 @@ import { resetVizFallbackWarningsForTest } from './viz-fallback.js';
describe('runKtxIngest', () => {
let tempDir: string;
let originalTerm: string | undefined;
const interactiveEnv = (): NodeJS.ProcessEnv => ({ ...process.env, CI: 'false' });
beforeEach(async () => {
resetVizFallbackWarningsForTest();
@ -544,6 +545,63 @@ describe('runKtxIngest', () => {
expect(io.stdout()).toContain('Diff: +2/~0/-0/=0\n');
});
it('includes historic-sql projection output in saved memory counts', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
const runLocal = vi.fn(async (input: RunLocalIngestOptions) => {
const result = completedLocalBundleRun(input, 'historic-sql-projection');
return {
...result,
report: localFakeBundleReport('historic-sql-projection', {
sourceKey: 'historic-sql',
body: {
workUnits: [],
postProcessor: {
sourceKey: 'historic-sql',
status: 'success',
result: {
tableUsageMerged: 56,
staleTablesMarked: 1,
patternPagesWritten: 30,
stalePatternPagesMarked: 2,
archivedPatternPages: 3,
legacyPagesDeleted: 4,
},
errors: [],
warnings: [],
touchedSources: [],
},
},
}),
};
});
const io = makeIo();
await expect(
runKtxIngest(
{
command: 'run',
projectDir,
connectionId: 'warehouse',
adapter: 'historic-sql',
outputMode: 'plain',
},
io.io,
{
runLocalIngest: runLocal,
createAdapters: vi.fn(() => [
{ source: 'historic-sql', skillNames: [], detect: async () => true, chunk: async () => ({ workUnits: [] }) },
]),
jobIdFactory: () => 'historic-sql-projection',
},
),
).resolves.toBe(0);
expect(io.stderr()).toBe('');
expect(io.stdout()).toContain('Adapter: historic-sql\n');
expect(io.stdout()).toContain('Saved memory: 39 wiki, 57 SL\n');
});
it('returns a non-zero code when local ingest reports failed work units', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
@ -715,7 +773,7 @@ describe('runKtxIngest', () => {
' historicSql:',
' enabled: true',
' dialect: postgres',
' minCalls: 2',
' minExecutions: 2',
'ingest:',
' adapters:',
' - historic-sql',
@ -762,6 +820,104 @@ describe('runKtxIngest', () => {
);
});
it('prints live progress for plain local ingest in interactive terminals', async () => {
const projectDir = join(tempDir, 'historic-sql-progress-project');
await mkdir(projectDir, { recursive: true });
await writeFile(
join(projectDir, 'ktx.yaml'),
[
'project: historic-sql-progress-project',
'connections:',
' warehouse:',
' driver: postgres',
' url: env:WAREHOUSE_DATABASE_URL',
' historicSql:',
' enabled: true',
' dialect: postgres',
' minExecutions: 2',
'ingest:',
' adapters:',
' - historic-sql',
'',
].join('\n'),
'utf-8',
);
const createdAdapters: SourceAdapter[] = [
{ source: 'historic-sql', skillNames: [], detect: async () => true, chunk: async () => ({ workUnits: [] }) },
];
const createAdapters = vi.fn(() => createdAdapters as never);
const runLocal = vi.fn(async (input: RunLocalIngestOptions) => {
expect(input.memoryFlow).toBeDefined();
input.memoryFlow?.emit({
type: 'source_acquired',
adapter: 'historic-sql',
trigger: 'manual_resync',
fileCount: 3,
});
input.memoryFlow?.update({ syncId: 'sync-progress-1' });
input.memoryFlow?.emit({ type: 'raw_snapshot_written', syncId: 'sync-progress-1', rawFileCount: 3 });
input.memoryFlow?.emit({ type: 'diff_computed', added: 2, modified: 0, deleted: 0, unchanged: 1 });
input.memoryFlow?.update({
plannedWorkUnits: [
{
unitKey: 'historic-sql-table-public-orders',
rawFiles: ['tables/public/orders.json'],
peerFileCount: 0,
dependencyCount: 0,
},
],
});
input.memoryFlow?.emit({ type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 });
input.memoryFlow?.emit({
type: 'work_unit_started',
unitKey: 'historic-sql-table-public-orders',
skills: ['historic_sql_table_digest'],
stepBudget: 40,
});
input.memoryFlow?.emit({
type: 'work_unit_finished',
unitKey: 'historic-sql-table-public-orders',
status: 'success',
});
input.memoryFlow?.emit({ type: 'saved', commitSha: null, wikiCount: 0, slCount: 1 });
input.memoryFlow?.emit({ type: 'provenance_recorded', rowCount: 3 });
input.memoryFlow?.emit({ type: 'report_created', runId: 'run-live-1', reportPath: 'report-live-1' });
input.memoryFlow?.finish('done');
return completedLocalBundleRun(input, input.jobId ?? 'historic-progress-job');
});
const io = makeIo({ isTTY: true });
await expect(
runKtxIngest(
{
command: 'run',
projectDir,
connectionId: 'warehouse',
adapter: 'historic-sql',
outputMode: 'plain',
},
io.io,
{
env: interactiveEnv(),
createAdapters,
runLocalIngest: runLocal,
jobIdFactory: () => 'historic-progress-job',
},
),
).resolves.toBe(0);
const stdout = io.stdout();
expect(stdout).toContain('[5%] Fetching source files for warehouse/historic-sql');
expect(stdout).toContain('[15%] Fetched 3 source files from historic-sql');
expect(stdout).toContain('[45%] Planned 1 work unit');
expect(stdout).toContain('[80%] Processed 1/1 work units');
expect(stdout).toContain('[100%] Ingest completed');
expect(stdout.indexOf('[5%] Fetching source files for warehouse/historic-sql')).toBeLessThan(
stdout.indexOf('Report: report-live-1'),
);
expect(io.stderr()).toBe('');
});
it('passes local Looker pull-config options and agent runner into scheduled ingest for Looker scheduled ingest', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);

View file

@ -8,11 +8,13 @@ import {
ingestReportToMemoryFlowReplay,
type LocalMetabaseFanoutResult,
type LocalMetabaseFanoutProgress,
type MemoryFlowEvent,
type MemoryFlowReplayInput,
type RunLocalIngestOptions,
renderMemoryFlowReplay,
runLocalIngest,
runLocalMetabaseIngest,
savedMemoryCountsForReport,
} from '@ktx/context/ingest';
import { loadKtxProject } from '@ktx/context/project';
import { readIngestReportSnapshotFile } from './ingest-report-file.js';
@ -88,16 +90,8 @@ function reportStatus(report: IngestReportSnapshot): 'done' | 'error' {
return report.body.failedWorkUnits.length > 0 ? 'error' : 'done';
}
function reportActionCounts(report: IngestReportSnapshot): { wikiCount: number; slCount: number } {
const actions = report.body.workUnits.flatMap((workUnit) => workUnit.actions);
return {
wikiCount: actions.filter((action) => action.target === 'wiki').length,
slCount: actions.filter((action) => action.target === 'sl').length,
};
}
function writeReportStatus(report: IngestReportSnapshot, io: KtxIngestIo): void {
const counts = reportActionCounts(report);
const counts = savedMemoryCountsForReport(report);
io.stdout.write(`Report: ${report.id}\n`);
io.stdout.write(`Run: ${report.runId}\n`);
io.stdout.write(`Job: ${report.jobId}\n`);
@ -116,7 +110,7 @@ function writeReportStatus(report: IngestReportSnapshot, io: KtxIngestIo): void
function writeMetabaseFanoutStatus(result: LocalMetabaseFanoutResult, io: KtxIngestIo): void {
const counts = result.children.reduce(
(acc, child) => {
const childCounts = reportActionCounts(child.report);
const childCounts = savedMemoryCountsForReport(child.report);
return {
wikiCount: acc.wikiCount + childCounts.wikiCount,
slCount: acc.slCount + childCounts.slCount,
@ -170,6 +164,118 @@ function createMetabaseFanoutProgress(
};
}
function formatDiffProgress(event: Extract<MemoryFlowEvent, { type: 'diff_computed' }>): string {
return `+${event.added}/~${event.modified}/-${event.deleted}/=${event.unchanged}`;
}
function completedWorkUnitCount(snapshot: MemoryFlowReplayInput): number {
return snapshot.events.filter((event) => event.type === 'work_unit_finished').length;
}
function plainIngestEventProgress(
event: MemoryFlowEvent,
snapshot: MemoryFlowReplayInput,
): { percent: number; message: string } | null {
switch (event.type) {
case 'source_acquired':
return {
percent: 15,
message: `Fetched ${pluralize(event.fileCount, 'source file')} from ${event.adapter}`,
};
case 'raw_snapshot_written':
return {
percent: 25,
message: `Wrote raw snapshot ${event.syncId} with ${pluralize(event.rawFileCount, 'file')}`,
};
case 'diff_computed':
return { percent: 35, message: `Computed source diff ${formatDiffProgress(event)}` };
case 'chunks_planned':
return {
percent: 45,
message: `Planned ${pluralize(event.workUnitCount, 'work unit')}`,
};
case 'stage_skipped':
return { percent: 45, message: `Skipped ${event.stage}: ${event.reason}` };
case 'work_unit_started':
return { percent: 55, message: `Processing ${event.unitKey}` };
case 'work_unit_finished': {
const total = snapshot.plannedWorkUnits.length || completedWorkUnitCount(snapshot);
const completed = completedWorkUnitCount(snapshot);
const percent = total > 0 ? 55 + Math.round((completed / total) * 25) : 80;
return {
percent,
message: `Processed ${completed}/${total} work units`,
};
}
case 'reconciliation_finished':
return {
percent: 85,
message: `Reconciled results with ${pluralize(event.conflictCount, 'conflict')} and ${pluralize(
event.fallbackCount,
'fallback',
)}`,
};
case 'saved':
return {
percent: 90,
message: `Saved memory updates (${event.wikiCount} wiki, ${event.slCount} SL)`,
};
case 'provenance_recorded':
return { percent: 95, message: `Recorded ${pluralize(event.rowCount, 'provenance row')}` };
case 'report_created':
return { percent: 98, message: `Created ingest report ${event.reportPath ?? event.runId}` };
case 'scope_detected':
case 'work_unit_step':
case 'candidate_action':
return null;
}
}
function shouldWritePlainIngestProgress(
outputMode: KtxIngestOutputMode,
io: KtxIngestIo,
env: NodeJS.ProcessEnv,
): boolean {
return outputMode === 'plain' && io.stdout.isTTY === true && env.CI !== 'true';
}
function createPlainIngestProgressRenderer(
args: Extract<KtxIngestArgs, { command: 'run' }>,
io: KtxIngestIo,
): { start(): void; update(snapshot: MemoryFlowReplayInput): void } {
let printedEvents = 0;
let lastPercent = 0;
let printedCompletion = false;
const write = (percent: number, message: string) => {
const nextPercent = Math.max(lastPercent, Math.max(0, Math.min(100, percent)));
lastPercent = nextPercent;
io.stdout.write(`[${nextPercent}%] ${message}\n`);
};
return {
start() {
write(5, `Fetching source files for ${args.connectionId}/${args.adapter}`);
},
update(snapshot) {
while (printedEvents < snapshot.events.length) {
const event = snapshot.events[printedEvents++];
if (!event) {
continue;
}
const progress = plainIngestEventProgress(event, snapshot);
if (progress) {
write(progress.percent, progress.message);
}
}
if (!printedCompletion && snapshot.status !== 'running') {
printedCompletion = true;
write(100, snapshot.status === 'done' ? 'Ingest completed' : 'Ingest failed');
}
},
};
}
function writeReportJson(report: IngestReportSnapshot, io: KtxIngestIo): void {
io.stdout.write(`${JSON.stringify(report, null, 2)}\n`);
}
@ -366,10 +472,14 @@ export async function runKtxIngest(
});
const shouldUseLiveViz =
runOutputMode === 'viz' && (args.inputMode ?? 'auto') === 'auto' && isInteractiveTerminal(io);
const initialMemoryFlow = shouldUseLiveViz ? initialRunMemoryFlowInput(args, jobId ?? 'pending') : undefined;
const plainProgress = shouldWritePlainIngestProgress(runOutputMode, io, env)
? createPlainIngestProgressRenderer(args, io)
: null;
const initialMemoryFlow =
shouldUseLiveViz || plainProgress ? initialRunMemoryFlowInput(args, jobId ?? 'pending') : undefined;
let latestMemoryFlowSnapshot: MemoryFlowReplayInput | null = initialMemoryFlow ?? null;
if (initialMemoryFlow && isTuiCapableIo(io)) {
if (shouldUseLiveViz && initialMemoryFlow && isTuiCapableIo(io)) {
const startLiveMemoryFlow = deps.startLiveMemoryFlow ?? startLiveMemoryFlowTui;
liveTui = await startLiveMemoryFlow(initialMemoryFlow, io);
}
@ -382,13 +492,17 @@ export async function runKtxIngest(
liveTui.update(snapshot);
return;
}
if (!liveTui) {
if (shouldUseLiveViz && !liveTui) {
writeMemoryFlowInput(snapshot, io, { clear: true });
return;
}
plainProgress?.update(snapshot);
},
})
: undefined;
plainProgress?.start();
try {
const result = await executeLocalIngest({
project,
@ -403,7 +517,7 @@ export async function runKtxIngest(
...(args.debugLlmRequestFile ? { llmDebugRequestFile: args.debugLlmRequestFile } : {}),
...(memoryFlow ? { memoryFlow } : {}),
});
if (memoryFlow) {
if (shouldUseLiveViz && memoryFlow) {
latestMemoryFlowSnapshot = memoryFlow.snapshot();
liveTui?.close();
liveTui = null;

View file

@ -2,6 +2,7 @@ import { mkdtemp, rm } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { initKtxProject } from '@ktx/context/project';
import type { KtxEmbeddingPort } from '@ktx/context';
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { runKtxKnowledge } from './knowledge.js';
@ -26,6 +27,19 @@ function makeIo() {
};
}
class FakeEmbeddingPort implements KtxEmbeddingPort {
readonly maxBatchSize = 16;
async computeEmbedding(text: string): Promise<number[]> {
const lower = text.toLowerCase();
return lower.includes('revenue') || lower.includes('arr') ? [1, 0] : [0, 1];
}
async computeEmbeddingsBulk(texts: string[]): Promise<number[][]> {
return Promise.all(texts.map((text) => this.computeEmbedding(text)));
}
}
describe('runKtxKnowledge', () => {
let tempDir: string;
@ -92,4 +106,39 @@ describe('runKtxKnowledge', () => {
expect(searchIo.stderr()).toContain('No local wiki pages found');
expect(searchIo.stderr()).toContain('ktx wiki write');
});
it('uses configured embeddings for semantic wiki search', async () => {
const projectDir = join(tempDir, 'semantic-project');
await initKtxProject({ projectDir, projectName: 'warehouse' });
await expect(
runKtxKnowledge(
{
command: 'write',
projectDir,
key: 'historic-sql/active-contract-arr-open-tickets',
scope: 'GLOBAL',
userId: 'local',
summary: 'Active Contract ARR Ranked by Open Support Ticket Count',
content: 'Accounts ranked by annual recurring contract value and support ticket load.',
tags: ['historic-sql'],
refs: [],
slRefs: [],
},
makeIo().io,
),
).resolves.toBe(0);
const searchIo = makeIo();
await expect(
runKtxKnowledge(
{ command: 'search', projectDir, query: 'revenue', userId: 'local' },
searchIo.io,
{ embeddingService: new FakeEmbeddingPort() },
),
).resolves.toBe(0);
expect(searchIo.stdout()).toContain('historic-sql/active-contract-arr-open-tickets');
expect(searchIo.stderr()).toBe('');
});
});

View file

@ -1,3 +1,8 @@
import {
createLocalKtxEmbeddingProviderFromConfig,
KtxIngestEmbeddingPortAdapter,
type KtxEmbeddingPort,
} from '@ktx/context';
import { loadKtxProject } from '@ktx/context/project';
import {
type LocalKnowledgeScope,
@ -29,7 +34,29 @@ interface KtxKnowledgeIo {
stderr: { write(chunk: string): void };
}
export async function runKtxKnowledge(args: KtxKnowledgeArgs, io: KtxKnowledgeIo = process): Promise<number> {
interface KtxKnowledgeDeps {
embeddingService?: KtxEmbeddingPort | null;
createEmbeddingProvider?: typeof createLocalKtxEmbeddingProviderFromConfig;
}
function wikiSearchEmbeddingService(
project: Awaited<ReturnType<typeof loadKtxProject>>,
deps: KtxKnowledgeDeps,
): KtxEmbeddingPort | null {
if ('embeddingService' in deps) {
return deps.embeddingService ?? null;
}
const provider = (deps.createEmbeddingProvider ?? createLocalKtxEmbeddingProviderFromConfig)(
project.config.ingest.embeddings,
);
return provider ? new KtxIngestEmbeddingPortAdapter(provider) : null;
}
export async function runKtxKnowledge(
args: KtxKnowledgeArgs,
io: KtxKnowledgeIo = process,
deps: KtxKnowledgeDeps = {},
): Promise<number> {
try {
const project = await loadKtxProject({ projectDir: args.projectDir });
if (args.command === 'list') {
@ -51,7 +78,11 @@ export async function runKtxKnowledge(args: KtxKnowledgeArgs, io: KtxKnowledgeIo
return 0;
}
if (args.command === 'search') {
const results = await searchLocalKnowledgePages(project, { query: args.query, userId: args.userId });
const results = await searchLocalKnowledgePages(project, {
query: args.query,
userId: args.userId,
embeddingService: wikiSearchEmbeddingService(project, deps),
});
if (results.length === 0) {
const pages = await listLocalKnowledgePages(project, { userId: args.userId });
if (pages.length === 0) {

View file

@ -0,0 +1,141 @@
import { mkdtemp, rm, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { loadKtxProject } from '@ktx/context/project';
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { createKtxCliLocalIngestAdapters } from './local-adapters.js';
function sqlAnalysisStub() {
return {
async analyzeForFingerprint(sql: string) {
return {
fingerprint: 'fp',
normalizedSql: sql,
tablesTouched: [],
literalSlots: [],
};
},
async analyzeBatch() {
return new Map();
},
};
}
async function writeProject(projectDir: string, body: string): Promise<void> {
await writeFile(join(projectDir, 'ktx.yaml'), body, 'utf-8');
}
describe('CLI local ingest adapters', () => {
let tempDir: string;
beforeEach(async () => {
tempDir = await mkdtemp(join(tmpdir(), 'ktx-cli-local-adapters-'));
});
afterEach(async () => {
await rm(tempDir, { recursive: true, force: true });
});
it('registers Postgres historic SQL from the requested connection', async () => {
await writeProject(
tempDir,
[
'project: warehouse',
'connections:',
' warehouse:',
' driver: postgres',
' url: env:WAREHOUSE_DATABASE_URL',
' readonly: true',
' historicSql:',
' enabled: true',
' dialect: postgres',
'ingest:',
' adapters:',
' - historic-sql',
'',
].join('\n'),
);
const project = await loadKtxProject({ projectDir: tempDir });
const adapters = createKtxCliLocalIngestAdapters(project, {
historicSqlConnectionId: 'warehouse',
sqlAnalysis: sqlAnalysisStub(),
});
expect(adapters.find((adapter) => adapter.source === 'historic-sql')?.skillNames).toEqual([
'historic_sql_table_digest',
'historic_sql_patterns',
]);
});
it('registers BigQuery historic SQL from the requested connection', async () => {
await writeProject(
tempDir,
[
'project: warehouse',
'connections:',
' bq:',
' driver: bigquery',
' readonly: true',
' dataset_id: analytics',
' location: us',
' credentials_json: \'{"project_id":"demo-project"}\'',
' historicSql:',
' enabled: true',
' dialect: bigquery',
'ingest:',
' adapters:',
' - historic-sql',
'',
].join('\n'),
);
const project = await loadKtxProject({ projectDir: tempDir });
const adapters = createKtxCliLocalIngestAdapters(project, {
historicSqlConnectionId: 'bq',
sqlAnalysis: sqlAnalysisStub(),
});
expect(adapters.find((adapter) => adapter.source === 'historic-sql')?.skillNames).toEqual([
'historic_sql_table_digest',
'historic_sql_patterns',
]);
});
it('registers Snowflake historic SQL from the requested connection', async () => {
await writeProject(
tempDir,
[
'project: warehouse',
'connections:',
' sf:',
' driver: snowflake',
' readonly: true',
' account: acct',
' warehouse: wh',
' database: ANALYTICS',
' schema_name: PUBLIC',
' username: reader',
' password: env:SNOWFLAKE_PASSWORD',
' historicSql:',
' enabled: true',
' dialect: snowflake',
'ingest:',
' adapters:',
' - historic-sql',
'',
].join('\n'),
);
const project = await loadKtxProject({ projectDir: tempDir });
const adapters = createKtxCliLocalIngestAdapters(project, {
historicSqlConnectionId: 'sf',
sqlAnalysis: sqlAnalysisStub(),
});
expect(adapters.find((adapter) => adapter.source === 'historic-sql')?.skillNames).toEqual([
'historic_sql_table_digest',
'historic_sql_patterns',
]);
});
});

View file

@ -1,5 +1,10 @@
import { join } from 'node:path';
import { createBigQueryLiveDatabaseIntrospection, isKtxBigQueryConnectionConfig } from '@ktx/connector-bigquery';
import {
createBigQueryLiveDatabaseIntrospection,
isKtxBigQueryConnectionConfig,
KtxBigQueryScanConnector,
type KtxBigQueryConnectionConfig,
} from '@ktx/connector-bigquery';
import { createClickHouseLiveDatabaseIntrospection, isKtxClickHouseConnectionConfig } from '@ktx/connector-clickhouse';
import { createMysqlLiveDatabaseIntrospection, isKtxMysqlConnectionConfig } from '@ktx/connector-mysql';
import {
@ -11,15 +16,19 @@ import {
import { createSqliteLiveDatabaseIntrospection, isKtxSqliteConnectionConfig } from '@ktx/connector-sqlite';
import { createSqlServerLiveDatabaseIntrospection, isKtxSqlServerConnectionConfig } from '@ktx/connector-sqlserver';
import {
BigQueryHistoricSqlQueryHistoryReader,
createDaemonLiveDatabaseIntrospection,
createDefaultLocalIngestAdapters,
type DefaultLocalIngestAdaptersOptions,
type HistoricSqlReader,
type LiveDatabaseIntrospectionPort,
LiveDatabaseSourceAdapter,
PostgresPgssReader,
SnowflakeHistoricSqlQueryHistoryReader,
type SourceAdapter,
} from '@ktx/context/ingest';
import type { KtxLocalProject } from '@ktx/context/project';
import { createHttpSqlAnalysisPort } from '@ktx/context/sql-analysis';
import { createHttpSqlAnalysisPort, type SqlAnalysisPort } from '@ktx/context/sql-analysis';
import {
createManagedDaemonLookerTableIdentifierParser,
createManagedDaemonSqlAnalysisPort,
@ -35,6 +44,8 @@ function hasSnowflakeDriver(connection: unknown): boolean {
);
}
type SnowflakeConnectorModule = typeof import('@ktx/connector-snowflake');
function ktxCliDaemonDatabaseIntrospectionOptions(
options: KtxCliLocalIngestAdaptersOptions,
): DefaultLocalIngestAdaptersOptions['databaseIntrospection'] {
@ -61,6 +72,9 @@ function ktxCliLookerOptions(
}
function ktxCliHistoricSqlAnalysis(options: KtxCliLocalIngestAdaptersOptions) {
if (options.sqlAnalysis) {
return options.sqlAnalysis;
}
if (options.sqlAnalysisUrl) {
return createHttpSqlAnalysisPort({ baseUrl: options.sqlAnalysisUrl });
}
@ -145,21 +159,32 @@ function createKtxCliLiveDatabaseIntrospection(
export interface KtxCliLocalIngestAdaptersOptions extends DefaultLocalIngestAdaptersOptions {
historicSqlConnectionId?: string;
sqlAnalysis?: SqlAnalysisPort;
sqlAnalysisUrl?: string;
managedDaemon?: ManagedPythonCoreDaemonOptions;
}
function isEnabledPostgresHistoricSqlConnection(connection: KtxPostgresConnectionConfig | undefined): boolean {
if (!connection || !isKtxPostgresConnectionConfig(connection)) {
return false;
function historicSqlRecord(connection: unknown): Record<string, unknown> | null {
if (
connection &&
typeof connection === 'object' &&
'historicSql' in connection &&
typeof (connection as { historicSql?: unknown }).historicSql === 'object' &&
(connection as { historicSql?: unknown }).historicSql !== null &&
!Array.isArray((connection as { historicSql?: unknown }).historicSql)
) {
return (connection as { historicSql: Record<string, unknown> }).historicSql;
}
const historicSql =
typeof connection.historicSql === 'object' &&
connection.historicSql !== null &&
!Array.isArray(connection.historicSql)
? (connection.historicSql as Record<string, unknown>)
: null;
return historicSql?.enabled === true && historicSql.dialect === 'postgres';
return null;
}
function enabledHistoricSqlDialect(connection: unknown): 'postgres' | 'bigquery' | 'snowflake' | null {
const historicSql = historicSqlRecord(connection);
if (historicSql?.enabled !== true) {
return null;
}
const dialect = String(historicSql.dialect ?? '').toLowerCase();
return dialect === 'postgres' || dialect === 'bigquery' || dialect === 'snowflake' ? dialect : null;
}
function createEphemeralPostgresHistoricSqlClient(project: KtxLocalProject, connectionId: string) {
@ -184,20 +209,131 @@ function createEphemeralPostgresHistoricSqlClient(project: KtxLocalProject, conn
};
}
function createEphemeralBigQueryHistoricSqlClient(project: KtxLocalProject, connectionId: string) {
const connection = project.config.connections[connectionId] as KtxBigQueryConnectionConfig | undefined;
if (!isKtxBigQueryConnectionConfig(connection)) {
throw new Error(
`Historic SQL local ingest requires a BigQuery connection, got ${String(connection?.driver ?? 'unknown')}`,
);
}
return {
async executeQuery(query: string) {
const connector = new KtxBigQueryScanConnector({
connectionId,
connection,
});
try {
const result = await connector.executeReadOnly({ connectionId, sql: query }, {} as never);
return {
headers: result.headers,
rows: result.rows,
totalRows: result.totalRows,
};
} finally {
await connector.cleanup();
}
},
};
}
async function createEphemeralSnowflakeHistoricSqlClient(
project: KtxLocalProject,
connectionId: string,
connectorModule: SnowflakeConnectorModule,
) {
const connection = project.config.connections[connectionId];
if (!connectorModule.isKtxSnowflakeConnectionConfig(connection)) {
throw new Error(
`Historic SQL local ingest requires a Snowflake connection, got ${String(connection?.driver ?? 'unknown')}`,
);
}
return {
async executeQuery(query: string) {
const connector = new connectorModule.KtxSnowflakeScanConnector({
connectionId,
connection,
});
try {
const result = await connector.executeReadOnly({ connectionId, sql: query }, {} as never);
return {
headers: result.headers,
rows: result.rows,
totalRows: result.totalRows,
};
} finally {
await connector.cleanup();
}
},
};
}
function bigQueryProjectId(connection: KtxBigQueryConnectionConfig, env: NodeJS.ProcessEnv): string {
const raw = typeof connection.credentials_json === 'string' ? connection.credentials_json : '';
const resolved = raw.startsWith('env:') ? env[raw.slice('env:'.length)] ?? '' : raw;
const parsed = JSON.parse(resolved) as { project_id?: unknown };
if (typeof parsed.project_id !== 'string' || parsed.project_id.trim().length === 0) {
throw new Error('Historic SQL BigQuery connection requires credentials_json.project_id');
}
return parsed.project_id;
}
function bigQueryRegion(connection: KtxBigQueryConnectionConfig): string {
return typeof connection.location === 'string' && connection.location.trim().length > 0
? connection.location.trim()
: 'us';
}
function historicSqlOptionsForLocalRun(project: KtxLocalProject, options: KtxCliLocalIngestAdaptersOptions) {
const connectionId = options.historicSqlConnectionId;
if (!connectionId) {
return undefined;
}
const connection = project.config.connections[connectionId] as KtxPostgresConnectionConfig | undefined;
if (!isEnabledPostgresHistoricSqlConnection(connection)) {
const connection = project.config.connections[connectionId];
const dialect = enabledHistoricSqlDialect(connection);
if (!dialect) {
return undefined;
}
return {
const base = {
sqlAnalysis: ktxCliHistoricSqlAnalysis(options),
postgresQueryClient: createEphemeralPostgresHistoricSqlClient(project, connectionId),
postgresBaselineRootDir: join(project.projectDir, '.ktx/cache/historic-sql'),
};
if (dialect === 'postgres') {
return {
...base,
reader: new PostgresPgssReader() satisfies HistoricSqlReader,
queryClient: createEphemeralPostgresHistoricSqlClient(project, connectionId),
};
}
if (dialect === 'bigquery') {
if (!isKtxBigQueryConnectionConfig(connection)) {
throw new Error(
`Historic SQL local ingest requires a BigQuery connection, got ${String(connection?.driver ?? 'unknown')}`,
);
}
return {
...base,
reader: new BigQueryHistoricSqlQueryHistoryReader({
projectId: bigQueryProjectId(connection, process.env),
region: bigQueryRegion(connection),
}) satisfies HistoricSqlReader,
queryClient: createEphemeralBigQueryHistoricSqlClient(project, connectionId),
};
}
return {
...base,
reader: new SnowflakeHistoricSqlQueryHistoryReader() satisfies HistoricSqlReader,
queryClient: {
async executeQuery(query: string) {
const connectorModule = await import('@ktx/connector-snowflake');
const client = await createEphemeralSnowflakeHistoricSqlClient(project, connectionId, connectorModule);
return client.executeQuery(query);
},
},
};
}
export function createKtxCliLocalIngestAdapters(

View file

@ -154,6 +154,37 @@ describe('managed daemon ingest ports', () => {
});
});
it('routes SQL batch analysis through the managed daemon runner', async () => {
const requestJson = vi.fn(async () => ({
results: {
orders: {
tables_touched: ['public.orders'],
columns_by_clause: { select: ['status'] },
error: null,
},
},
}));
const sqlAnalysis = createManagedDaemonSqlAnalysisPort({ requestJson });
await expect(sqlAnalysis.analyzeBatch([{ id: 'orders', sql: 'select status from public.orders' }], 'postgres'))
.resolves.toEqual(
new Map([
[
'orders',
{
tablesTouched: ['public.orders'],
columnsByClause: { select: ['status'] },
error: null,
},
],
]),
);
expect(requestJson).toHaveBeenCalledWith('/sql/analyze-batch', {
dialect: 'postgres',
items: [{ id: 'orders', sql: 'select status from public.orders' }],
});
});
it('returns live-database daemon request options backed by the managed runner', async () => {
const requestJson = vi.fn(async () => ({
connection_id: 'warehouse',

View file

@ -767,6 +767,9 @@ export async function runKtxSetupContextStep(
const missing = missingCapabilities(project);
if (missing.length > 0) {
if (args.allowEmpty === true) {
return { status: 'skipped', projectDir: args.projectDir };
}
writeMissingCapabilities(missing, io);
return { status: 'missing-input', projectDir: args.projectDir };
}

View file

@ -64,6 +64,8 @@ function textInputPrompt(message: string): string {
return `${title}\n\n${bodyLines.join('\n')}\nPress Escape to go back.\n`;
}
const legacyHistoricSqlServiceAccountPatternsKey = ['serviceAccount', 'UserPatterns'].join('');
describe('setup databases step', () => {
let tempDir: string;
@ -1240,14 +1242,21 @@ describe('setup databases step', () => {
enabled: true,
dialect: 'snowflake',
windowDays: 30,
serviceAccountUserPatterns: ['^svc_'],
filters: {
dropTrivialProbes: true,
serviceAccounts: {
patterns: ['^svc_'],
mode: 'exclude',
},
},
redactionPatterns: ['(?i)secret'],
},
});
expect(config.connections.snowflake.historicSql).not.toHaveProperty(legacyHistoricSqlServiceAccountPatternsKey);
expect(config.ingest.adapters).toContain('historic-sql');
});
it('writes Postgres Historic SQL config with minCalls and ignores window/redaction output', async () => {
it('writes Postgres Historic SQL config with minExecutions and ignores window/redaction output', async () => {
const io = makeIo();
const result = await runKtxSetupDatabasesStep(
{
@ -1259,7 +1268,7 @@ describe('setup databases step', () => {
databaseSchemas: ['public'],
enableHistoricSql: true,
historicSqlWindowDays: 30,
historicSqlMinCalls: 12,
historicSqlMinExecutions: 12,
historicSqlServiceAccountPatterns: ['^svc_'],
historicSqlRedactionPatterns: ['(?i)secret'],
skipDatabases: false,
@ -1281,13 +1290,20 @@ describe('setup databases step', () => {
historicSql: {
enabled: true,
dialect: 'postgres',
minCalls: 12,
maxTemplatesPerRun: 5000,
serviceAccountUserPatterns: ['^svc_'],
minExecutions: 12,
filters: {
dropTrivialProbes: true,
serviceAccounts: {
patterns: ['^svc_'],
mode: 'exclude',
},
},
},
});
expect(config.connections.warehouse.historicSql).not.toHaveProperty('minCalls');
expect(config.connections.warehouse.historicSql).not.toHaveProperty('windowDays');
expect(config.connections.warehouse.historicSql).not.toHaveProperty('redactionPatterns');
expect(config.connections.warehouse.historicSql).not.toHaveProperty(legacyHistoricSqlServiceAccountPatternsKey);
expect(config.ingest.adapters).toContain('historic-sql');
expect(io.stdout()).toContain('Historic SQL probe...');
expect(io.stdout()).toContain('pg_stat_statements ready');
@ -1334,10 +1350,13 @@ describe('setup databases step', () => {
enabled: true,
dialect: 'bigquery',
windowDays: 45,
serviceAccountUserPatterns: [],
filters: {
dropTrivialProbes: true,
},
redactionPatterns: [],
},
});
expect(config.connections.analytics.historicSql).not.toHaveProperty(legacyHistoricSqlServiceAccountPatternsKey);
expect(config.ingest.adapters).toContain('historic-sql');
});
@ -1364,7 +1383,7 @@ describe('setup databases step', () => {
databaseConnectionIds: ['warehouse'],
databaseSchemas: [],
enableHistoricSql: true,
historicSqlMinCalls: 8,
historicSqlMinExecutions: 8,
skipDatabases: false,
},
io.io,
@ -1381,11 +1400,13 @@ describe('setup databases step', () => {
historicSql: {
enabled: true,
dialect: 'postgres',
minCalls: 8,
maxTemplatesPerRun: 5000,
serviceAccountUserPatterns: [],
minExecutions: 8,
filters: {
dropTrivialProbes: true,
},
},
});
expect(config.connections.warehouse.historicSql).not.toHaveProperty(legacyHistoricSqlServiceAccountPatternsKey);
});
it('prints a non-blocking Postgres Historic SQL probe failure after connection test succeeds', async () => {

View file

@ -34,6 +34,7 @@ export interface KtxSetupDatabasesArgs {
enableHistoricSql?: boolean;
disableHistoricSql?: boolean;
historicSqlWindowDays?: number;
historicSqlMinExecutions?: number;
historicSqlMinCalls?: number;
historicSqlServiceAccountPatterns?: string[];
historicSqlRedactionPatterns?: string[];
@ -276,7 +277,7 @@ async function defaultHistoricSqlProbe(input: KtxSetupHistoricSqlProbeInput): Pr
const project = await loadKtxProject({ projectDir: input.projectDir });
const connection = project.config.connections[input.connectionId];
const [{ PostgresPgssQueryHistoryReader }, { KtxPostgresHistoricSqlQueryClient, isKtxPostgresConnectionConfig }] =
const [{ PostgresPgssReader }, { KtxPostgresHistoricSqlQueryClient, isKtxPostgresConnectionConfig }] =
await Promise.all([import('@ktx/context/ingest'), import('@ktx/connector-postgres')]);
const postgresConnection = connection as Parameters<typeof isKtxPostgresConnectionConfig>[0];
@ -292,7 +293,7 @@ async function defaultHistoricSqlProbe(input: KtxSetupHistoricSqlProbeInput): Pr
connection: postgresConnection,
});
try {
const result = await new PostgresPgssQueryHistoryReader().probe(client);
const result = await new PostgresPgssReader().probe(client);
return {
ok: true,
lines: [
@ -751,20 +752,20 @@ async function maybeApplyHistoricSqlConfig(input: {
return { ...input.connection, historicSql: { ...existing, enabled: false, dialect } };
}
const common = {
const common: Record<string, unknown> = {
...existing,
enabled: true,
dialect,
serviceAccountUserPatterns: input.args.historicSqlServiceAccountPatterns ?? [],
filters: historicSqlFiltersForSetup(input.args.historicSqlServiceAccountPatterns),
};
delete common[['serviceAccount', 'UserPatterns'].join('')];
if (dialect === 'postgres') {
return {
...input.connection,
historicSql: {
...common,
minCalls: input.args.historicSqlMinCalls ?? 5,
maxTemplatesPerRun: 5000,
minExecutions: input.args.historicSqlMinExecutions ?? input.args.historicSqlMinCalls ?? 5,
},
};
}
@ -779,6 +780,21 @@ async function maybeApplyHistoricSqlConfig(input: {
};
}
function historicSqlFiltersForSetup(patterns: string[] | undefined) {
const serviceAccountPatterns = patterns ?? [];
return {
dropTrivialProbes: true,
...(serviceAccountPatterns.length > 0
? {
serviceAccounts: {
patterns: serviceAccountPatterns,
mode: 'exclude' as const,
},
}
: {}),
};
}
async function defaultTestConnection(projectDir: string, connectionId: string, io: KtxCliIo): Promise<number> {
return await runKtxConnection({ command: 'test', projectDir, connectionId }, io);
}

View file

@ -1174,6 +1174,66 @@ describe('setup status', () => {
expect(calls).toEqual(['model', 'embeddings', 'databases', 'sources']);
});
it('does not fail context build when prerequisites were explicitly skipped and agents are skipped', async () => {
const calls: string[] = [];
const io = makeIo();
await writeFile(
join(tempDir, 'ktx.yaml'),
[
'project: revenue',
'connections:',
' warehouse:',
' driver: postgres',
' url: env:DEMO_DATABASE_URL',
' readonly: true',
'',
].join('\n'),
'utf-8',
);
await expect(
runKtxSetup(
{
command: 'run',
projectDir: tempDir,
mode: 'existing',
agents: false,
skipAgents: true,
inputMode: 'disabled',
yes: true,
cliVersion: '0.2.0',
skipLlm: true,
skipEmbeddings: true,
skipDatabases: true,
skipSources: true,
databaseSchemas: [],
},
io.io,
{
model: async () => {
calls.push('model');
return { status: 'skipped', projectDir: tempDir };
},
embeddings: async () => {
calls.push('embeddings');
return { status: 'skipped', projectDir: tempDir };
},
databases: async () => {
calls.push('databases');
return { status: 'skipped', projectDir: tempDir };
},
sources: async () => {
calls.push('sources');
return { status: 'skipped', projectDir: tempDir };
},
},
),
).resolves.toBe(0);
expect(calls).toEqual(['model', 'embeddings', 'databases', 'sources']);
expect(io.stderr()).not.toContain('KTX cannot build agent-ready context yet.');
});
it('runs context after sources and before agents in full setup', async () => {
const calls: string[] = [];
const io = makeIo();

View file

@ -82,6 +82,7 @@ export type KtxSetupArgs =
enableHistoricSql?: boolean;
disableHistoricSql?: boolean;
historicSqlWindowDays?: number;
historicSqlMinExecutions?: number;
historicSqlMinCalls?: number;
historicSqlServiceAccountPatterns?: string[];
historicSqlRedactionPatterns?: string[];
@ -644,6 +645,9 @@ async function runKtxSetupInner(args: KtxSetupArgs, io: KtxCliIo, deps: KtxSetup
...(args.enableHistoricSql !== undefined ? { enableHistoricSql: args.enableHistoricSql } : {}),
...(args.disableHistoricSql !== undefined ? { disableHistoricSql: args.disableHistoricSql } : {}),
...(args.historicSqlWindowDays !== undefined ? { historicSqlWindowDays: args.historicSqlWindowDays } : {}),
...(args.historicSqlMinExecutions !== undefined
? { historicSqlMinExecutions: args.historicSqlMinExecutions }
: {}),
...(args.historicSqlMinCalls !== undefined ? { historicSqlMinCalls: args.historicSqlMinCalls } : {}),
...(args.historicSqlServiceAccountPatterns
? { historicSqlServiceAccountPatterns: args.historicSqlServiceAccountPatterns }