Fix historic SQL ingest setup and progress

This commit is contained in:
Andrey Avtomonov 2026-05-11 22:35:07 +02:00
parent f3f6b36551
commit 1bd29c7eb1
14 changed files with 877 additions and 34 deletions

View file

@ -92,7 +92,7 @@ export function registerIngestCommands(
sourceDir: options.sourceDir ? resolve(options.sourceDir) : undefined,
databaseIntrospectionUrl: options.databaseIntrospectionUrl || undefined,
cliVersion: context.packageInfo.version,
runtimeInstallPolicy: runtimeInstallPolicyFromFlags(options),
runtimeInstallPolicy: runtimeInstallPolicyFromFlags({ yes: options.yes }),
...(options.debugLlmRequestFile ? { debugLlmRequestFile: resolve(options.debugLlmRequestFile) } : {}),
outputMode: outputMode(options),
...inputMode(options),

View file

@ -920,7 +920,7 @@ describe('runKtxCli', () => {
sourceDir: tempDir,
databaseIntrospectionUrl: undefined,
cliVersion: '0.0.0-private',
runtimeInstallPolicy: 'never',
runtimeInstallPolicy: 'prompt',
debugLlmRequestFile: `${tempDir}/debug.jsonl`,
outputMode: 'json',
inputMode: 'disabled',
@ -934,9 +934,9 @@ describe('runKtxCli', () => {
expect(ingestReplayHelpIo.stderr()).toBe('');
});
it('routes ingest managed runtime install policies', async () => {
it('routes ingest managed runtime install policy separately from visualization input mode', async () => {
const autoIo = makeIo();
const conflictIo = makeIo();
const nonInteractiveIo = makeIo();
const ingest = vi.fn(async () => 0);
await expect(
@ -972,10 +972,10 @@ describe('runKtxCli', () => {
'--yes',
'--no-input',
],
conflictIo.io,
nonInteractiveIo.io,
{ ingest },
),
).resolves.toBe(1);
).resolves.toBe(0);
expect(ingest).toHaveBeenCalledWith(
expect.objectContaining({
@ -985,7 +985,16 @@ describe('runKtxCli', () => {
}),
autoIo.io,
);
expect(conflictIo.stderr()).toContain('Choose only one runtime install mode: --yes or --no-input');
expect(ingest).toHaveBeenCalledWith(
expect.objectContaining({
command: 'run',
cliVersion: '0.0.0-private',
runtimeInstallPolicy: 'auto',
inputMode: 'disabled',
}),
nonInteractiveIo.io,
);
expect(nonInteractiveIo.stderr()).toBe('');
});
it('dispatches public connection through the existing connection implementation', async () => {

View file

@ -304,7 +304,7 @@ describe('runKtxIngest viz and replay', () => {
expect(io.stdout()).toContain('KTX memory flow warehouse/fake done');
});
it('does not attach a live memory-flow sink for plain run output', async () => {
it('attaches a plain progress memory-flow sink for interactive plain run output', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
const sourceDir = join(tempDir, 'source');
@ -329,7 +329,8 @@ describe('runKtxIngest viz and replay', () => {
),
).resolves.toBe(0);
expect(runLocal).toHaveBeenCalledWith(expect.not.objectContaining({ memoryFlow: expect.anything() }));
expect(runLocal).toHaveBeenCalledWith(expect.objectContaining({ memoryFlow: expect.anything() }));
expect(io.stdout()).toContain('[5%] Fetching source files for warehouse/fake');
expect(io.stdout()).toContain('Job: plain-run');
expect(io.stdout()).not.toContain('KTX memory flow');
});
@ -403,7 +404,8 @@ describe('runKtxIngest viz and replay', () => {
).resolves.toBe(0);
expect(startLiveMemoryFlow).not.toHaveBeenCalled();
expect(runLocal).toHaveBeenCalledWith(expect.not.objectContaining({ memoryFlow: expect.anything() }));
expect(runLocal).toHaveBeenCalledWith(expect.objectContaining({ memoryFlow: expect.anything() }));
expect(io.stdout()).toContain('[5%] Fetching source files for warehouse/fake');
expect(io.stdout()).toContain('Job: raw-missing-viz-run');
expect(io.stdout()).not.toContain('KTX memory flow');
expect(io.stderr()).toContain(

View file

@ -762,6 +762,103 @@ describe('runKtxIngest', () => {
);
});
it('prints live progress for plain local ingest in interactive terminals', async () => {
const projectDir = join(tempDir, 'historic-sql-progress-project');
await mkdir(projectDir, { recursive: true });
await writeFile(
join(projectDir, 'ktx.yaml'),
[
'project: historic-sql-progress-project',
'connections:',
' warehouse:',
' driver: postgres',
' url: env:WAREHOUSE_DATABASE_URL',
' historicSql:',
' enabled: true',
' dialect: postgres',
' minExecutions: 2',
'ingest:',
' adapters:',
' - historic-sql',
'',
].join('\n'),
'utf-8',
);
const createdAdapters: SourceAdapter[] = [
{ source: 'historic-sql', skillNames: [], detect: async () => true, chunk: async () => ({ workUnits: [] }) },
];
const createAdapters = vi.fn(() => createdAdapters as never);
const runLocal = vi.fn(async (input: RunLocalIngestOptions) => {
expect(input.memoryFlow).toBeDefined();
input.memoryFlow?.emit({
type: 'source_acquired',
adapter: 'historic-sql',
trigger: 'manual_resync',
fileCount: 3,
});
input.memoryFlow?.update({ syncId: 'sync-progress-1' });
input.memoryFlow?.emit({ type: 'raw_snapshot_written', syncId: 'sync-progress-1', rawFileCount: 3 });
input.memoryFlow?.emit({ type: 'diff_computed', added: 2, modified: 0, deleted: 0, unchanged: 1 });
input.memoryFlow?.update({
plannedWorkUnits: [
{
unitKey: 'historic-sql-table-public-orders',
rawFiles: ['tables/public/orders.json'],
peerFileCount: 0,
dependencyCount: 0,
},
],
});
input.memoryFlow?.emit({ type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 });
input.memoryFlow?.emit({
type: 'work_unit_started',
unitKey: 'historic-sql-table-public-orders',
skills: ['historic_sql_table_digest'],
stepBudget: 40,
});
input.memoryFlow?.emit({
type: 'work_unit_finished',
unitKey: 'historic-sql-table-public-orders',
status: 'success',
});
input.memoryFlow?.emit({ type: 'saved', commitSha: null, wikiCount: 0, slCount: 1 });
input.memoryFlow?.emit({ type: 'provenance_recorded', rowCount: 3 });
input.memoryFlow?.emit({ type: 'report_created', runId: 'run-live-1', reportPath: 'report-live-1' });
input.memoryFlow?.finish('done');
return completedLocalBundleRun(input, input.jobId ?? 'historic-progress-job');
});
const io = makeIo({ isTTY: true });
await expect(
runKtxIngest(
{
command: 'run',
projectDir,
connectionId: 'warehouse',
adapter: 'historic-sql',
outputMode: 'plain',
},
io.io,
{
createAdapters,
runLocalIngest: runLocal,
jobIdFactory: () => 'historic-progress-job',
},
),
).resolves.toBe(0);
const stdout = io.stdout();
expect(stdout).toContain('[5%] Fetching source files for warehouse/historic-sql');
expect(stdout).toContain('[15%] Fetched 3 source files from historic-sql');
expect(stdout).toContain('[45%] Planned 1 work unit');
expect(stdout).toContain('[80%] Processed 1/1 work units');
expect(stdout).toContain('[100%] Ingest completed');
expect(stdout.indexOf('[5%] Fetching source files for warehouse/historic-sql')).toBeLessThan(
stdout.indexOf('Report: report-live-1'),
);
expect(io.stderr()).toBe('');
});
it('passes local Looker pull-config options and agent runner into scheduled ingest for Looker scheduled ingest', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);

View file

@ -8,6 +8,7 @@ import {
ingestReportToMemoryFlowReplay,
type LocalMetabaseFanoutResult,
type LocalMetabaseFanoutProgress,
type MemoryFlowEvent,
type MemoryFlowReplayInput,
type RunLocalIngestOptions,
renderMemoryFlowReplay,
@ -170,6 +171,118 @@ function createMetabaseFanoutProgress(
};
}
function formatDiffProgress(event: Extract<MemoryFlowEvent, { type: 'diff_computed' }>): string {
return `+${event.added}/~${event.modified}/-${event.deleted}/=${event.unchanged}`;
}
function completedWorkUnitCount(snapshot: MemoryFlowReplayInput): number {
return snapshot.events.filter((event) => event.type === 'work_unit_finished').length;
}
function plainIngestEventProgress(
event: MemoryFlowEvent,
snapshot: MemoryFlowReplayInput,
): { percent: number; message: string } | null {
switch (event.type) {
case 'source_acquired':
return {
percent: 15,
message: `Fetched ${pluralize(event.fileCount, 'source file')} from ${event.adapter}`,
};
case 'raw_snapshot_written':
return {
percent: 25,
message: `Wrote raw snapshot ${event.syncId} with ${pluralize(event.rawFileCount, 'file')}`,
};
case 'diff_computed':
return { percent: 35, message: `Computed source diff ${formatDiffProgress(event)}` };
case 'chunks_planned':
return {
percent: 45,
message: `Planned ${pluralize(event.workUnitCount, 'work unit')}`,
};
case 'stage_skipped':
return { percent: 45, message: `Skipped ${event.stage}: ${event.reason}` };
case 'work_unit_started':
return { percent: 55, message: `Processing ${event.unitKey}` };
case 'work_unit_finished': {
const total = snapshot.plannedWorkUnits.length || completedWorkUnitCount(snapshot);
const completed = completedWorkUnitCount(snapshot);
const percent = total > 0 ? 55 + Math.round((completed / total) * 25) : 80;
return {
percent,
message: `Processed ${completed}/${total} work units`,
};
}
case 'reconciliation_finished':
return {
percent: 85,
message: `Reconciled results with ${pluralize(event.conflictCount, 'conflict')} and ${pluralize(
event.fallbackCount,
'fallback',
)}`,
};
case 'saved':
return {
percent: 90,
message: `Saved memory updates (${event.wikiCount} wiki, ${event.slCount} SL)`,
};
case 'provenance_recorded':
return { percent: 95, message: `Recorded ${pluralize(event.rowCount, 'provenance row')}` };
case 'report_created':
return { percent: 98, message: `Created ingest report ${event.reportPath ?? event.runId}` };
case 'scope_detected':
case 'work_unit_step':
case 'candidate_action':
return null;
}
}
function shouldWritePlainIngestProgress(
outputMode: KtxIngestOutputMode,
io: KtxIngestIo,
env: NodeJS.ProcessEnv,
): boolean {
return outputMode === 'plain' && io.stdout.isTTY === true && env.CI !== 'true';
}
function createPlainIngestProgressRenderer(
args: Extract<KtxIngestArgs, { command: 'run' }>,
io: KtxIngestIo,
): { start(): void; update(snapshot: MemoryFlowReplayInput): void } {
let printedEvents = 0;
let lastPercent = 0;
let printedCompletion = false;
const write = (percent: number, message: string) => {
const nextPercent = Math.max(lastPercent, Math.max(0, Math.min(100, percent)));
lastPercent = nextPercent;
io.stdout.write(`[${nextPercent}%] ${message}\n`);
};
return {
start() {
write(5, `Fetching source files for ${args.connectionId}/${args.adapter}`);
},
update(snapshot) {
while (printedEvents < snapshot.events.length) {
const event = snapshot.events[printedEvents++];
if (!event) {
continue;
}
const progress = plainIngestEventProgress(event, snapshot);
if (progress) {
write(progress.percent, progress.message);
}
}
if (!printedCompletion && snapshot.status !== 'running') {
printedCompletion = true;
write(100, snapshot.status === 'done' ? 'Ingest completed' : 'Ingest failed');
}
},
};
}
function writeReportJson(report: IngestReportSnapshot, io: KtxIngestIo): void {
io.stdout.write(`${JSON.stringify(report, null, 2)}\n`);
}
@ -366,10 +479,14 @@ export async function runKtxIngest(
});
const shouldUseLiveViz =
runOutputMode === 'viz' && (args.inputMode ?? 'auto') === 'auto' && isInteractiveTerminal(io);
const initialMemoryFlow = shouldUseLiveViz ? initialRunMemoryFlowInput(args, jobId ?? 'pending') : undefined;
const plainProgress = shouldWritePlainIngestProgress(runOutputMode, io, env)
? createPlainIngestProgressRenderer(args, io)
: null;
const initialMemoryFlow =
shouldUseLiveViz || plainProgress ? initialRunMemoryFlowInput(args, jobId ?? 'pending') : undefined;
let latestMemoryFlowSnapshot: MemoryFlowReplayInput | null = initialMemoryFlow ?? null;
if (initialMemoryFlow && isTuiCapableIo(io)) {
if (shouldUseLiveViz && initialMemoryFlow && isTuiCapableIo(io)) {
const startLiveMemoryFlow = deps.startLiveMemoryFlow ?? startLiveMemoryFlowTui;
liveTui = await startLiveMemoryFlow(initialMemoryFlow, io);
}
@ -382,13 +499,17 @@ export async function runKtxIngest(
liveTui.update(snapshot);
return;
}
if (!liveTui) {
if (shouldUseLiveViz && !liveTui) {
writeMemoryFlowInput(snapshot, io, { clear: true });
return;
}
plainProgress?.update(snapshot);
},
})
: undefined;
plainProgress?.start();
try {
const result = await executeLocalIngest({
project,
@ -403,7 +524,7 @@ export async function runKtxIngest(
...(args.debugLlmRequestFile ? { llmDebugRequestFile: args.debugLlmRequestFile } : {}),
...(memoryFlow ? { memoryFlow } : {}),
});
if (memoryFlow) {
if (shouldUseLiveViz && memoryFlow) {
latestMemoryFlowSnapshot = memoryFlow.snapshot();
liveTui?.close();
liveTui = null;

View file

@ -767,6 +767,9 @@ export async function runKtxSetupContextStep(
const missing = missingCapabilities(project);
if (missing.length > 0) {
if (args.allowEmpty === true) {
return { status: 'skipped', projectDir: args.projectDir };
}
writeMissingCapabilities(missing, io);
return { status: 'missing-input', projectDir: args.projectDir };
}

View file

@ -1174,6 +1174,66 @@ describe('setup status', () => {
expect(calls).toEqual(['model', 'embeddings', 'databases', 'sources']);
});
it('does not fail context build when prerequisites were explicitly skipped and agents are skipped', async () => {
const calls: string[] = [];
const io = makeIo();
await writeFile(
join(tempDir, 'ktx.yaml'),
[
'project: revenue',
'connections:',
' warehouse:',
' driver: postgres',
' url: env:DEMO_DATABASE_URL',
' readonly: true',
'',
].join('\n'),
'utf-8',
);
await expect(
runKtxSetup(
{
command: 'run',
projectDir: tempDir,
mode: 'existing',
agents: false,
skipAgents: true,
inputMode: 'disabled',
yes: true,
cliVersion: '0.2.0',
skipLlm: true,
skipEmbeddings: true,
skipDatabases: true,
skipSources: true,
databaseSchemas: [],
},
io.io,
{
model: async () => {
calls.push('model');
return { status: 'skipped', projectDir: tempDir };
},
embeddings: async () => {
calls.push('embeddings');
return { status: 'skipped', projectDir: tempDir };
},
databases: async () => {
calls.push('databases');
return { status: 'skipped', projectDir: tempDir };
},
sources: async () => {
calls.push('sources');
return { status: 'skipped', projectDir: tempDir };
},
},
),
).resolves.toBe(0);
expect(calls).toEqual(['model', 'embeddings', 'databases', 'sources']);
expect(io.stderr()).not.toContain('KTX cannot build agent-ready context yet.');
});
it('runs context after sources and before agents in full setup', async () => {
const calls: string[] = [];
const io = makeIo();

View file

@ -129,6 +129,25 @@ describe('KtxPostgresScanConnector', () => {
options: '-c search_path=analytics,public',
ssl: { rejectUnauthorized: false },
});
const libpqPreferConfig = postgresPoolConfigFromConfig({
connectionId: 'warehouse',
connection: {
driver: 'postgres',
url: 'env:DEMO_DATABASE_URL',
readonly: true,
},
env: {
DEMO_DATABASE_URL: 'postgresql://reader@start.kaelio.com:5432/demo?sslmode=prefer',
},
});
expect(libpqPreferConfig).toMatchObject({
host: 'start.kaelio.com',
port: 5432,
database: 'demo',
user: 'reader',
});
expect(libpqPreferConfig).not.toHaveProperty('connectionString');
expect(libpqPreferConfig).not.toHaveProperty('ssl');
expect(() =>
postgresPoolConfigFromConfig({
connectionId: 'warehouse',

View file

@ -57,6 +57,8 @@ export interface KtxPostgresConnectionConfig {
schema?: string;
schemas?: string[];
ssl?: boolean;
sslmode?: string;
sslMode?: string;
rejectUnauthorized?: boolean;
readonly?: boolean;
[key: string]: unknown;
@ -253,15 +255,22 @@ function numberValue(value: unknown): number | undefined {
function parsePostgresUrl(url: string): Partial<KtxPostgresConnectionConfig> {
const parsed = new URL(url);
const sslmode = parsed.searchParams.get('sslmode') ?? undefined;
return {
host: parsed.hostname,
port: parsed.port ? Number(parsed.port) : undefined,
database: parsed.pathname.replace(/^\/+/, '') || undefined,
username: parsed.username ? decodeURIComponent(parsed.username) : undefined,
password: parsed.password ? decodeURIComponent(parsed.password) : undefined,
...(sslmode ? { sslmode } : {}),
};
}
function normalizedSslMode(connection: KtxPostgresConnectionConfig): string | undefined {
const value = connection.sslmode ?? connection.sslMode;
return typeof value === 'string' && value.trim().length > 0 ? value.trim().toLowerCase() : undefined;
}
function schemasFromConnection(connection: KtxPostgresConnectionConfig): string[] {
if (Array.isArray(connection.schemas) && connection.schemas.length > 0) {
return connection.schemas.filter((schema): schema is string => typeof schema === 'string' && schema.length > 0);
@ -299,6 +308,7 @@ export function postgresPoolConfigFromConfig(input: {
const database = stringConfigValue(merged, 'database', env);
const user = stringConfigValue(merged, 'username', env) ?? stringConfigValue(merged, 'user', env);
const password = stringConfigValue(merged, 'password', env);
const sslmode = normalizedSslMode(merged);
if (!referencedUrl && !host) {
throw new Error(`Native PostgreSQL connector requires connections.${input.connectionId}.host or url`);
@ -314,7 +324,7 @@ export function postgresPoolConfigFromConfig(input: {
max: 10,
idleTimeoutMillis: 30_000,
connectionTimeoutMillis: 10_000,
...(referencedUrl
...(referencedUrl && sslmode !== 'prefer' && sslmode !== 'disable'
? { connectionString: referencedUrl }
: { host, port: numberValue(merged.port) ?? 5432, database, user, password }),
};
@ -322,7 +332,7 @@ export function postgresPoolConfigFromConfig(input: {
if (searchPathSchemas.length > 0) {
config.options = `-c search_path=${searchPathSchemas.join(',')}`;
}
if (merged.ssl) {
if (merged.ssl && sslmode !== 'prefer' && sslmode !== 'disable') {
config.ssl = { rejectUnauthorized: merged.rejectUnauthorized ?? true };
}
return config;

View file

@ -1,7 +1,16 @@
import { describe, expect, it, vi } from 'vitest';
import { asSchema } from 'ai';
import { createEmitHistoricSqlEvidenceTool } from './evidence-tool.js';
describe('emit_historic_sql_evidence tool', () => {
it('exposes an AI SDK v6 tool input schema with top-level object type', async () => {
const tool = createEmitHistoricSqlEvidenceTool();
expect(await asSchema(tool.inputSchema).jsonSchema).toMatchObject({
type: 'object',
});
});
it('writes table usage evidence to the ignored run evidence directory', async () => {
const writeFile = vi.fn(async () => ({ success: true, commitHash: null }));
const tool = createEmitHistoricSqlEvidenceTool();

View file

@ -6,6 +6,42 @@ import { patternOutputSchema, tableUsageOutputSchema } from './skill-schemas.js'
const SYSTEM_AUTHOR = 'System User';
const SYSTEM_EMAIL = 'system@example.com';
const emitHistoricSqlEvidenceInputSchema = z
.object({
kind: z.enum(['table_usage', 'pattern']),
table: z.string().min(1).optional(),
rawPath: z.string().min(1),
usage: tableUsageOutputSchema.optional(),
pattern: patternOutputSchema.optional(),
})
.superRefine((input, ctx) => {
if (input.kind === 'table_usage') {
if (!input.table) {
ctx.addIssue({
code: 'custom',
path: ['table'],
message: 'table is required when kind is table_usage',
});
}
if (!input.usage) {
ctx.addIssue({
code: 'custom',
path: ['usage'],
message: 'usage is required when kind is table_usage',
});
}
}
if (input.kind === 'pattern' && !input.pattern) {
ctx.addIssue({
code: 'custom',
path: ['pattern'],
message: 'pattern is required when kind is pattern',
});
}
});
type EmitHistoricSqlEvidenceInput = z.infer<typeof emitHistoricSqlEvidenceInputSchema>;
interface EmitHistoricSqlEvidenceToolContext {
connectionId?: string | null;
session?: {
@ -23,30 +59,42 @@ interface EmitHistoricSqlEvidenceToolContext {
};
}
function unitKeyForEvidence(input: { kind: string; table?: string; pattern?: { slug: string } }): string {
function unitKeyForEvidence(input: EmitHistoricSqlEvidenceInput): string {
if (input.kind === 'table_usage') {
return `historic-sql-table-${String(input.table).replace(/[^a-zA-Z0-9]+/g, '-').replace(/^-+|-+$/g, '')}`;
}
return `historic-sql-pattern-${String(input.pattern?.slug).replace(/[^a-zA-Z0-9]+/g, '-').replace(/^-+|-+$/g, '')}`;
}
function evidenceEnvelope(input: EmitHistoricSqlEvidenceInput, connectionId: string) {
if (input.kind === 'table_usage') {
if (!input.table || !input.usage) {
throw new Error('Invalid historic-SQL table usage evidence input.');
}
return {
kind: 'table_usage' as const,
connectionId,
table: input.table,
rawPath: input.rawPath,
usage: input.usage,
};
}
if (!input.pattern) {
throw new Error('Invalid historic-SQL pattern evidence input.');
}
return {
kind: 'pattern' as const,
connectionId,
rawPath: input.rawPath,
pattern: input.pattern,
};
}
export function createEmitHistoricSqlEvidenceTool(defaultContext?: EmitHistoricSqlEvidenceToolContext) {
return tool({
description:
'Record typed historic-SQL evidence for deterministic projection. Use this instead of wiki_write, sl_write_source, sl_edit_source, or context_candidate_write during historic-SQL WorkUnits.',
inputSchema: z.discriminatedUnion('kind', [
z.object({
kind: z.literal('table_usage'),
table: z.string().min(1),
rawPath: z.string().min(1),
usage: tableUsageOutputSchema,
}),
z.object({
kind: z.literal('pattern'),
rawPath: z.string().min(1),
pattern: patternOutputSchema,
}),
]),
inputSchema: emitHistoricSqlEvidenceInputSchema,
execute: async (input, options): Promise<string> => {
const context = (options.experimental_context as EmitHistoricSqlEvidenceToolContext | undefined) ?? defaultContext;
const ingest = context?.session?.ingest;
@ -56,7 +104,8 @@ export function createEmitHistoricSqlEvidenceTool(defaultContext?: EmitHistoricS
}
const unitKey = unitKeyForEvidence(input);
const content = serializeHistoricSqlEvidence({ ...input, connectionId: context.connectionId });
const evidence = evidenceEnvelope(input, context.connectionId);
const content = serializeHistoricSqlEvidence(evidence);
await configService.writeFile(
historicSqlEvidencePath(ingest.runId, unitKey),
content,
@ -65,7 +114,7 @@ export function createEmitHistoricSqlEvidenceTool(defaultContext?: EmitHistoricS
`Record historic-SQL evidence: ${unitKey}`,
{ skipLock: true },
);
const label = input.kind === 'table_usage' ? input.table : input.pattern.slug;
const label = evidence.kind === 'table_usage' ? evidence.table : evidence.pattern.slug;
return `Recorded historic-SQL ${input.kind} evidence for ${label}.`;
},
});