Merge scan into ingest flow

This commit is contained in:
Andrey Avtomonov 2026-05-13 23:53:45 +02:00
parent 75e04cfa56
commit ecebc018b9
31 changed files with 747 additions and 216 deletions

View file

@ -50,11 +50,8 @@ describe('AgentRunnerService.runLoop', () => {
telemetryTags: { source: 'test' },
});
const call = (generateText as any).mock.calls[0][0];
expect(call.messages).toEqual([
{ role: 'system', content: 'SYS' },
{ role: 'user', content: 'USR' },
]);
expect(call.system).toBeUndefined();
expect(call.system).toEqual({ role: 'system', content: 'SYS' });
expect(call.messages).toEqual([{ role: 'user', content: 'USR' }]);
expect(call.prompt).toBeUndefined();
expect(call.tools).toEqual(tools);
expect(call.stopWhen).toBe(17);
@ -77,10 +74,8 @@ describe('AgentRunnerService.runLoop', () => {
expect(llmProvider.getModel).toHaveBeenCalledWith('candidateExtraction');
expect(generateText).toHaveBeenCalledWith(
expect.objectContaining({
messages: [
{ role: 'system', content: 'system' },
{ role: 'user', content: 'user' },
],
system: { role: 'system', content: 'system' },
messages: [{ role: 'user', content: 'user' }],
}),
);
});

View file

@ -36,6 +36,14 @@ export interface AgentRunnerServiceDeps {
logger?: KtxLogger;
}
/**
 * Separates system-role messages from the rest of a built prompt.
 *
 * @param messages - The message list produced by the message builder.
 * @returns An object whose `system` is `undefined` when there are no system
 *   messages, the single message itself when there is exactly one, or the
 *   array of system messages when there are several; `messages` holds every
 *   non-system message in its original relative order.
 */
function splitSystemPromptMessages(messages: ReturnType<KtxMessageBuilder['wrapSimple']>['messages']) {
  // The filter callbacks are intentionally left unannotated so the compiler
  // infers the same element types as it would for inline role comparisons.
  const systemMessages = messages.filter((entry) => entry.role === 'system');
  const conversation = messages.filter((entry) => entry.role !== 'system');

  // Collapse the system messages into the most compact representation.
  const collapseSystem = () => {
    if (systemMessages.length === 0) {
      return undefined;
    }
    if (systemMessages.length === 1) {
      return systemMessages[0];
    }
    return systemMessages;
  };

  return {
    system: collapseSystem(),
    messages: conversation,
  };
}
export class AgentRunnerService {
private readonly logger: KtxLogger;
@ -54,6 +62,7 @@ export class AgentRunnerService {
tools: params.toolSet,
model,
});
const promptMessages = splitSystemPromptMessages(built.messages);
await this.deps.debugRequestRecorder?.record(
summarizeKtxLlmDebugRequest({
@ -73,7 +82,8 @@ export class AgentRunnerService {
temperature: 0,
stopWhen: stepCountIs(params.stepBudget),
experimental_telemetry: this.deps.telemetry?.createTelemetry(params.telemetryTags),
messages: built.messages,
...(promptMessages.system ? { system: promptMessages.system } : {}),
messages: promptMessages.messages,
tools: built.tools as Record<string, Tool>,
onStepFinish: async () => {
stepIndex += 1;

View file

@ -103,7 +103,7 @@ describe('BigQueryHistoricSqlQueryHistoryReader', () => {
for await (const row of reader.fetchAggregated(
client,
{ start: new Date('2026-02-10T00:00:00.000Z'), end: new Date('2026-05-11T00:00:00.000Z') },
{ dialect: 'bigquery', minExecutions: 5, windowDays: 90, concurrency: 12, filters: { dropTrivialProbes: true }, redactionPatterns: [], staleArchiveAfterDays: 90 },
{ dialect: 'bigquery', minExecutions: 5, windowDays: 90, concurrency: 12, enabledTables: [], filters: { dropTrivialProbes: true }, redactionPatterns: [], staleArchiveAfterDays: 90 },
)) {
rows.push(row);
}
@ -137,6 +137,7 @@ describe('BigQueryHistoricSqlQueryHistoryReader', () => {
minExecutions: 5,
windowDays: 90,
concurrency: 12,
enabledTables: [],
filters: { dropTrivialProbes: true },
redactionPatterns: [],
staleArchiveAfterDays: 90,

View file

@ -215,7 +215,7 @@ describe('PostgresPgssReader aggregate path', () => {
for await (const row of reader.fetchAggregated(
{ executeQuery },
{ start: new Date('2026-02-10T00:00:00.000Z'), end: new Date('2026-05-11T00:00:00.000Z') },
{ dialect: 'postgres', minExecutions: 5, windowDays: 90, concurrency: 12, filters: { dropTrivialProbes: true }, redactionPatterns: [], staleArchiveAfterDays: 90 },
{ dialect: 'postgres', minExecutions: 5, windowDays: 90, concurrency: 12, enabledTables: [], filters: { dropTrivialProbes: true }, redactionPatterns: [], staleArchiveAfterDays: 90 },
)) {
rows.push(row);
}

View file

@ -102,7 +102,7 @@ describe('SnowflakeHistoricSqlQueryHistoryReader', () => {
for await (const row of reader.fetchAggregated(
client,
{ start: new Date('2026-02-10T00:00:00.000Z'), end: new Date('2026-05-11T00:00:00.000Z') },
{ dialect: 'snowflake', minExecutions: 5, windowDays: 90, concurrency: 12, filters: { dropTrivialProbes: true }, redactionPatterns: [], staleArchiveAfterDays: 90 },
{ dialect: 'snowflake', minExecutions: 5, windowDays: 90, concurrency: 12, enabledTables: [], filters: { dropTrivialProbes: true }, redactionPatterns: [], staleArchiveAfterDays: 90 },
)) {
rows.push(row);
}
@ -136,6 +136,7 @@ describe('SnowflakeHistoricSqlQueryHistoryReader', () => {
minExecutions: 5,
windowDays: 90,
concurrency: 12,
enabledTables: [],
filters: { dropTrivialProbes: true },
redactionPatterns: [],
staleArchiveAfterDays: 90,

View file

@ -237,6 +237,80 @@ describe('stageHistoricSqlAggregatedSnapshot', () => {
expect(patternsJson).toContain('[REDACTED]');
});
// Verifies that `pullConfig.enabledTables` restricts which templates and
// per-table artifacts end up in the staged snapshot.
it('limits staged table artifacts to configured enabled tables', async () => {
const stagedDir = await tempDir();
// Stub reader yielding three templates: one touching an enabled table by its
// schema-qualified name, one touching an enabled table by bare name only, and
// one touching a table that is not listed in `enabledTables`.
const reader: HistoricSqlReader = {
async probe() {
return { warnings: [], info: [] };
},
async *fetchAggregated() {
yield aggregate({
templateId: 'selected-qualified',
canonicalSql: 'select count(*) from orbit_analytics.int_active_contract_arr',
});
yield aggregate({
templateId: 'selected-unqualified',
canonicalSql: 'select count(*) from int_customer_health_signals',
});
yield aggregate({
templateId: 'unselected',
canonicalSql: 'select count(*) from orbit_raw.accounts',
});
},
};
// Stub SQL analysis: `analyzeBatch` resolves to a map keyed by template id
// that reports the touched tables for each of the three templates above.
const sqlAnalysis: SqlAnalysisPort = {
analyzeForFingerprint: vi.fn(),
analyzeBatch: vi.fn(async () => new Map([
[
'selected-qualified',
{
tablesTouched: ['orbit_analytics.int_active_contract_arr'],
columnsByClause: { select: [], where: [], join: [], groupBy: [] },
},
],
[
'selected-unqualified',
{
tablesTouched: ['int_customer_health_signals'],
columnsByClause: { select: [], where: [], join: [], groupBy: [] },
},
],
[
'unselected',
{
tablesTouched: ['orbit_raw.accounts'],
columnsByClause: { select: [], where: [], join: [], groupBy: [] },
},
],
])),
};
await stageHistoricSqlAggregatedSnapshot({
stagedDir,
connectionId: 'warehouse',
queryClient: {},
reader,
sqlAnalysis,
pullConfig: {
dialect: 'postgres',
// Both enabled entries are schema-qualified; the second one is only ever
// referenced by its bare table name in the stubbed query history.
enabledTables: [
'orbit_analytics.int_active_contract_arr',
'orbit_analytics.int_customer_health_signals',
],
},
now: new Date('2026-05-11T12:00:00.000Z'),
});
// Only the two enabled tables get a table artifact; the file names use the
// identifier as reported in `tablesTouched` (the bare name stays bare).
expect(await readdir(join(stagedDir, 'tables'))).toEqual([
'int_customer_health_signals.json',
'orbit_analytics.int_active_contract_arr.json',
]);
const manifest = await readJson<Record<string, any>>(stagedDir, 'manifest.json');
expect(manifest.touchedTableCount).toBe(2);
// The template that touches only a non-enabled table is absent from the
// patterns input as well.
const patterns = await readJson<Record<string, any>>(stagedDir, 'patterns-input.json');
expect(patterns.templates.map((entry: any) => entry.id)).toEqual(['selected-qualified', 'selected-unqualified']);
});
it('preserves full patterns audit input and writes bounded cross-table pattern shards', async () => {
const stagedDir = await tempDir();
const largeSql = `select * from public.orders o join public.customers c on c.id = o.customer_id where payload = '${'x'.repeat(8000)}'`;

View file

@ -39,9 +39,15 @@ interface StageHistoricSqlAggregatedSnapshotInput {
/** An aggregated query template paired with the results of its SQL analysis. */
interface ParsedTemplate {
// The aggregated template after SQL redaction has been applied.
template: AggregatedTemplate;
// Every non-empty table the template touches, de-duplicated and sorted.
tablesTouched: string[];
// The subset of `tablesTouched` that passes the enabled-table filter; these
// are the tables the template is accumulated under.
includedTables: string[];
// Columns referenced per SQL clause, de-duplicated and sorted per clause.
columnsByClause: Record<string, string[]>;
}
/** Lookup sets used to decide whether a touched table is enabled. */
interface EnabledTableFilter {
// Normalized (trimmed, lower-cased) enabled table identifiers as configured.
exact: Set<string>;
// Bare table names (last dot-separated segment) that belong to exactly one
// entry of `exact`, so a bare name never maps to more than one enabled table.
uniqueUnqualified: Set<string>;
}
interface TableAccumulator {
table: string;
executions: number;
@ -103,6 +109,45 @@ function shouldDropTemplate(template: AggregatedTemplate, config: HistoricSqlUni
return false;
}
/**
 * Canonicalizes a table identifier for comparison.
 *
 * @param value - Raw table identifier.
 * @returns The identifier with surrounding whitespace removed and all
 *   characters lower-cased; dots and any quoting characters are left as-is.
 */
function normalizeTableIdentifier(value: string): string {
  const trimmed = value.trim();
  return trimmed.toLowerCase();
}
/**
 * Extracts the bare table name from a possibly qualified identifier.
 *
 * @param value - Table identifier such as `schema.table` or `table`.
 * @returns The last non-empty dot-separated segment of the normalized
 *   identifier, or an empty string when no such segment exists.
 */
function unqualifiedTableIdentifier(value: string): string {
  const segments = normalizeTableIdentifier(value)
    .split('.')
    .filter((segment) => segment.length > 0);
  // Indexing past the end of an empty list yields undefined, which falls back to ''.
  const lastSegment = segments[segments.length - 1];
  return lastSegment ?? '';
}
/**
 * Builds the lookup structure used to restrict staging to enabled tables.
 *
 * @param enabledTables - Configured table identifiers.
 * @returns `null` when no tables are configured (callers treat this as "no
 *   filtering"); otherwise the set of normalized identifiers plus the set of
 *   bare table names that occur under exactly one normalized identifier.
 */
function buildEnabledTableFilter(enabledTables: string[]): EnabledTableFilter | null {
  if (enabledTables.length === 0) {
    return null;
  }

  // Normalize every entry, dropping those that normalize to an empty string.
  const exact = new Set<string>();
  for (const entry of enabledTables) {
    const normalized = normalizeTableIdentifier(entry);
    if (normalized.length > 0) {
      exact.add(normalized);
    }
  }

  // Count how many distinct enabled identifiers share each bare table name.
  const occurrences = new Map<string, number>();
  for (const identifier of exact) {
    const bareName = unqualifiedTableIdentifier(identifier);
    if (bareName.length === 0) {
      continue;
    }
    occurrences.set(bareName, (occurrences.get(bareName) ?? 0) + 1);
  }

  // Keep only bare names that are unambiguous.
  const uniqueUnqualified = new Set<string>();
  for (const [bareName, seen] of occurrences) {
    if (seen === 1) {
      uniqueUnqualified.add(bareName);
    }
  }

  return { exact, uniqueUnqualified };
}
/**
 * Decides whether a touched table passes the enabled-table filter.
 *
 * @param table - Table identifier as reported for a query template.
 * @param filter - Filter built from the configured enabled tables, or `null`
 *   when no restriction applies.
 * @returns `true` when there is no filter, when the normalized identifier is
 *   an exact match, or when its bare table name is one of the unambiguous
 *   bare names of the filter.
 *
 * NOTE(review): the bare-name fallback is applied regardless of how `table`
 * itself is qualified, so `other_schema.t` matches an enabled `schema.t` when
 * `t` is unambiguous — confirm this is intended.
 */
function isEnabledTable(table: string, filter: EnabledTableFilter | null): boolean {
  if (!filter) {
    return true;
  }

  const candidate = normalizeTableIdentifier(table);
  if (filter.exact.has(candidate)) {
    return true;
  }

  const bareName = unqualifiedTableIdentifier(candidate);
  return filter.uniqueUnqualified.has(bareName);
}
function redactTemplateSql(
template: AggregatedTemplate,
redactors: readonly HistoricSqlRedactionPattern[],
@ -231,6 +276,7 @@ function toPatternsInput(parsedTemplates: ParsedTemplate[]): StagedPatternsInput
export async function stageHistoricSqlAggregatedSnapshot(input: StageHistoricSqlAggregatedSnapshotInput): Promise<void> {
const config = historicSqlUnifiedPullConfigSchema.parse(input.pullConfig);
const enabledTableFilter = buildEnabledTableFilter(config.enabledTables);
const redactors = compileHistoricSqlRedactionPatterns(config.redactionPatterns);
const now = input.now ?? new Date();
const windowStart = new Date(now.getTime() - config.windowDays * 24 * 60 * 60 * 1000);
@ -259,12 +305,14 @@ export async function stageHistoricSqlAggregatedSnapshot(input: StageHistoricSql
continue;
}
const tablesTouched = [...new Set(parsed.tablesTouched)].filter((table) => table.length > 0).sort();
if (tablesTouched.length === 0) {
const includedTables = tablesTouched.filter((table) => isEnabledTable(table, enabledTableFilter));
if (includedTables.length === 0) {
continue;
}
parsedTemplates.push({
template: redactTemplateSql(template, redactors),
tablesTouched,
includedTables,
columnsByClause: Object.fromEntries(
Object.entries(parsed.columnsByClause).map(([clause, columns]) => [clause, [...new Set(columns)].sort()]),
),
@ -273,7 +321,7 @@ export async function stageHistoricSqlAggregatedSnapshot(input: StageHistoricSql
const byTable = new Map<string, TableAccumulator>();
for (const parsed of parsedTemplates) {
for (const table of parsed.tablesTouched) {
for (const table of parsed.includedTables) {
const acc = byTable.get(table) ?? accumulatorFor(table);
addTemplate(acc, parsed);
byTable.set(table, acc);

View file

@ -13,6 +13,7 @@ export const historicSqlUnifiedPullConfigSchema = z.object({
windowDays: z.number().int().positive().default(90),
minExecutions: z.number().int().nonnegative().default(5),
concurrency: z.number().int().positive().default(12),
enabledTables: z.array(z.string().min(1)).default([]),
filters: z.object({
serviceAccounts: z.object({
patterns: z.array(z.string()).default([]),

View file

@ -2,15 +2,19 @@ import { describe, expect, it } from 'vitest';
import { buildDefaultKtxProjectConfig, parseKtxProjectConfig, serializeKtxProjectConfig } from './config.js';
describe('KTX project config', () => {
it.each(['status', 'replay', 'run', 'watch'])('rejects reserved ingest connection id "%s"', (connectionId) => {
expect(() =>
it.each(['status', 'replay', 'run', 'watch'])('accepts former ingest subcommand name "%s" as a connection id', (connectionId) => {
expect(
parseKtxProjectConfig(`
project: reserved-test
connections:
${connectionId}:
driver: postgres
`),
).toThrow(`"${connectionId}" is reserved for the KTX ingest command namespace`);
).toMatchObject({
connections: {
[connectionId]: { driver: 'postgres' },
},
});
});
it('builds the default standalone project config', () => {

View file

@ -112,25 +112,6 @@ function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === 'object' && value !== null && !Array.isArray(value);
}
/**
 * Connection ids that collide with ingest subcommand names, each mapped to a
 * description of what the id is reserved for.
 */
const RESERVED_INGEST_CONNECTION_IDS = new Map<string, string>(
  ['status', 'replay', 'run', 'watch'].map((connectionId): [string, string] => [
    connectionId,
    'the KTX ingest command namespace',
  ]),
);
/**
 * Produces the user-facing explanation for a reserved connection id.
 *
 * @param connectionId - Connection id to check.
 * @returns The error message when the id is reserved, otherwise `null`.
 */
export function reservedKtxIngestConnectionIdMessage(connectionId: string): string | null {
  const reservedFor = RESERVED_INGEST_CONNECTION_IDS.get(connectionId);
  if (!reservedFor) {
    return null;
  }
  return `"${connectionId}" is reserved for ${reservedFor}; choose a different connection id.`;
}
/**
 * Guards against using a reserved connection id.
 *
 * @param connectionId - Connection id to validate.
 * @throws Error carrying the reserved-id message when the id is reserved.
 */
export function assertKtxConnectionIdIsNotReserved(connectionId: string): void {
  const rejection = reservedKtxIngestConnectionIdMessage(connectionId);
  if (!rejection) {
    return;
  }
  throw new Error(rejection);
}
function stringArray(value: unknown, fallback: string[]): string[] {
if (!Array.isArray(value)) {
return fallback;
@ -507,9 +488,6 @@ export function parseKtxProjectConfig(raw: string): KtxProjectConfig {
const parsedConnections = isRecord(parsed.connections)
? (parsed.connections as Record<string, KtxProjectConnectionConfig>)
: defaults.connections;
for (const connectionId of Object.keys(parsedConnections)) {
assertKtxConnectionIdIsNotReserved(connectionId);
}
return {
project: project.trim(),

View file

@ -7,10 +7,8 @@ export type {
KtxStorageState,
} from './config.js';
export {
assertKtxConnectionIdIsNotReserved,
buildDefaultKtxProjectConfig,
parseKtxProjectConfig,
reservedKtxIngestConnectionIdMessage,
serializeKtxProjectConfig,
} from './config.js';
export type { LocalGitFileStoreDeps } from './local-git-file-store.js';