fix(ingest): recover textual-conflict gate failures; fix query-history adapter (#255)

* fix(ingest): recover textual-conflict gate failures; fix query-history adapter

Two latent gaps in the isolated-diff local-ingest pipeline that can abort an
otherwise-successful ingest:

- Metabase: when a work-unit patch hit both a textual conflict and a post-merge
  dangling sl_ref, the after-textual-resolution branch returned a hard
  semantic_conflict and rolled back the whole job. It now runs the same
  repairGateFailure recovery the clean-apply branch already uses (re-validate,
  then commit the union of resolved + repaired paths), reaching parity.

- Query history: the historic-sql adapter was registered only when ktx.yaml had
  context.queryHistory.enabled=true, so `--query-history` threw "Adapter not
  available for local ingest". Registration now resolves the dialect from driver
  capability, since the explicit --query-history request is itself the opt-in;
  the config-gated helper is unchanged for status/setup/probes.

Adds the previously-missing tests for both paths.

* chore: sync uv.lock to 0.8.0 (regenerated with pinned uv 0.11.11)

* fix(ingest): drop ktx's own scan probes and dedup tables in query history

Query history (historic-sql) mined two kinds of noise back into context:

- ktx's own warehouse scan emits relationship- and column-profiling probes
  (the relationship_profile_values aggregation and the child_values/parent_values
  FK-overlap CTEs) into pg_stat_statements. shouldDropBySql now filters these
  ktx-owned, dialect-stable signatures so ktx introspection is not ingested as
  usage history.

- The same physical table appears both bare (accounts, via search_path) and
  schema-qualified (orbit_raw.accounts), producing duplicate per-table work
  units. canonicalizeTableIdentifiers collapses a bare name into its unique
  qualified form before work-unit keying; ambiguous names are left untouched.

On the orbit demo this removes ~35% of sampled query templates (ktx self-probes)
and ~45 duplicate per-table work units.

* docs(agents): add Design Reasoning Defaults section
This commit is contained in:
Andrey Avtomonov 2026-06-03 13:05:59 +02:00 committed by GitHub
parent 9d3a0b751d
commit f5dea9a089
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 437 additions and 12 deletions

View file

@ -1,5 +1,8 @@
import { describe, expect, it } from 'vitest';
import { queryHistoryDialectForConnection } from '../../../../../src/context/ingest/adapters/historic-sql/connection-dialect.js';
import {
historicSqlDialectForConnectionDriver,
queryHistoryDialectForConnection,
} from '../../../../../src/context/ingest/adapters/historic-sql/connection-dialect.js';
describe('queryHistoryDialectForConnection', () => {
it.each([
@ -21,3 +24,19 @@ describe('queryHistoryDialectForConnection', () => {
expect(queryHistoryDialectForConnection({ driver: 'postgres', context: { queryHistory: { enabled: false } } })).toBeNull();
});
});
describe('historicSqlDialectForConnectionDriver', () => {
it('resolves the dialect from driver capability even when query history is disabled', () => {
expect(
historicSqlDialectForConnectionDriver({ driver: 'postgres', context: { queryHistory: { enabled: false } } }),
).toBe('postgres');
});
it('resolves the dialect when no query-history context is present', () => {
expect(historicSqlDialectForConnectionDriver({ driver: 'bigquery' })).toBe('bigquery');
});
it('returns null for drivers without a historic-SQL reader', () => {
expect(historicSqlDialectForConnectionDriver({ driver: 'mysql', context: { queryHistory: { enabled: true } } })).toBeNull();
});
});

View file

@ -433,4 +433,88 @@ describe('stageHistoricSqlAggregatedSnapshot', () => {
const manifest = await readJson<Record<string, any>>(stagedDir, 'manifest.json');
expect(manifest.warnings).toEqual([]);
});
it("drops ktx's own scan/relationship probes from query history", async () => {
const stagedDir = await tempDir();
const fkOverlapProbe =
'select * from (WITH child_values AS ( SELECT DISTINCT "account_id" AS value FROM "account_owners" WHERE "account_id" IS NOT NULL LIMIT $1 ), parent_values AS ( SELECT DISTINCT "account_id" AS value FROM "accounts" WHERE "account_id" IS NOT NULL ) SELECT (SELECT COUNT(*) FROM child_values) AS child_distinct, (SELECT COUNT(*) FROM parent_values) AS parent_distinct) probe';
const profileProbe =
'select * from (SELECT $1 AS column_name, (SELECT COUNT(*) FROM "orbit_raw"."accounts") AS total, (SELECT STRING_AGG(CAST(value AS TEXT), CHR(31)) FROM (SELECT DISTINCT "id" AS value FROM "orbit_raw"."accounts" LIMIT $2) AS relationship_profile_values) AS samples) profile';
const reader: HistoricSqlReader = {
async probe() {
return { warnings: [], info: [] };
},
async *fetchAggregated() {
yield aggregate({
templateId: 'analytic',
canonicalSql: 'select status, count(*) from public.orders group by status',
});
yield aggregate({ templateId: 'ktx-fk-overlap', canonicalSql: fkOverlapProbe });
yield aggregate({ templateId: 'ktx-profile', canonicalSql: profileProbe });
},
};
const sqlAnalysis: SqlAnalysisPort = {
analyzeForFingerprint: vi.fn(),
analyzeBatch: vi.fn(async () => new Map([
['analytic', { tablesTouched: ['public.orders'], columnsByClause: { select: ['status'], where: [], join: [], groupBy: ['status'] } }],
])),
validateReadOnly: vi.fn(async () => ({ ok: true })),
};
await stageHistoricSqlAggregatedSnapshot({
stagedDir,
connectionId: 'warehouse',
queryClient: {},
reader,
sqlAnalysis,
pullConfig: { dialect: 'postgres' },
now: new Date('2026-05-11T12:00:00.000Z'),
});
// ktx scan probes are filtered before SQL analysis, so only the analytic query is parsed.
expect(sqlAnalysis.analyzeBatch).toHaveBeenCalledWith(
[{ id: 'analytic', sql: 'select status, count(*) from public.orders group by status' }],
'postgres',
);
expect(await readdir(join(stagedDir, 'tables'))).toEqual(['public.orders.json']);
});
it('merges bare and schema-qualified references to the same table into one work unit', async () => {
const stagedDir = await tempDir();
const reader: HistoricSqlReader = {
async probe() {
return { warnings: [], info: [] };
},
async *fetchAggregated() {
yield aggregate({ templateId: 'qualified', canonicalSql: 'select count(*) from orbit_raw.accounts' });
yield aggregate({ templateId: 'bare', canonicalSql: 'select id from accounts where active' });
},
};
const sqlAnalysis: SqlAnalysisPort = {
analyzeForFingerprint: vi.fn(),
analyzeBatch: vi.fn(async () => new Map([
['qualified', { tablesTouched: ['orbit_raw.accounts'], columnsByClause: { select: [], where: [], join: [], groupBy: [] } }],
['bare', { tablesTouched: ['accounts'], columnsByClause: { select: ['id'], where: ['active'], join: [], groupBy: [] } }],
])),
validateReadOnly: vi.fn(async () => ({ ok: true })),
};
await stageHistoricSqlAggregatedSnapshot({
stagedDir,
connectionId: 'warehouse',
queryClient: {},
reader,
sqlAnalysis,
pullConfig: { dialect: 'postgres' },
now: new Date('2026-05-11T12:00:00.000Z'),
});
// The bare `accounts` reference resolves to the unique qualified `orbit_raw.accounts`,
// so the two templates collapse into a single work unit instead of two.
expect(await readdir(join(stagedDir, 'tables'))).toEqual(['orbit_raw.accounts.json']);
const merged = await readJson<Record<string, any>>(stagedDir, 'tables/orbit_raw.accounts.json');
expect(merged.topTemplates.map((t: any) => t.id).sort()).toEqual(['bare', 'qualified']);
const manifest = await readJson<Record<string, any>>(stagedDir, 'manifest.json');
expect(manifest.touchedTableCount).toBe(1);
});
});

View file

@ -401,4 +401,72 @@ describe('integrateWorkUnitPatch', () => {
});
await expect(readFile(join(configDir, 'wiki/global/a.md'), 'utf-8')).resolves.toBe('old\n');
});
it('repairs a semantic gate failure after a textual conflict is resolved', async () => {
const { homeDir, configDir, git } = await makeRepo();
await mkdir(join(configDir, 'wiki/global'), { recursive: true });
await writeFile(join(configDir, 'wiki/global/a.md'), 'base\n', 'utf-8');
await git.commitFiles(['wiki/global/a.md'], 'base page', 'System User', 'system@example.com');
const conflictBase = await git.revParseHead();
await writeFile(join(configDir, 'wiki/global/a.md'), 'accepted\n', 'utf-8');
await git.commitFiles(['wiki/global/a.md'], 'accepted edit', 'System User', 'system@example.com');
const childDir = join(homeDir, 'child-conflict-repair');
await git.addWorktree(childDir, 'child-conflict-repair', conflictBase);
const childGit = git.forWorktree(childDir);
await writeFile(join(childDir, 'wiki/global/a.md'), 'proposal\n', 'utf-8');
await childGit.commitFiles(['wiki/global/a.md'], 'proposal edit', 'System User', 'system@example.com');
const patchPath = join(homeDir, 'proposal-repair.patch');
await childGit.writeBinaryNoRenamePatch(conflictBase, 'HEAD', patchPath);
const trace = new FileIngestTraceWriter({
tracePath: join(homeDir, '.ktx/ingest-traces/job-resolver-repair/trace.jsonl'),
jobId: 'job-resolver-repair',
connectionId: 'warehouse',
sourceKey: 'metabase',
level: 'trace',
});
// Gate fails on the resolver's merged tree, then passes after the repair edit.
const validateAppliedTree = vi
.fn()
.mockRejectedValueOnce(
new Error('final artifact gates failed:\narr-definition: unknown sl_refs entity mart_arr_daily.arr_dollars'),
)
.mockResolvedValueOnce(undefined);
const repairGateFailure = vi.fn(async (context: { unitKey: string; touchedPaths: string[] }) => {
expect(context).toMatchObject({ unitKey: 'wu-conflict-repair', touchedPaths: ['wiki/global/a.md'] });
await writeFile(join(configDir, 'wiki/global/a.md'), 'accepted\nproposal repaired\n', 'utf-8');
return { status: 'repaired' as const, attempts: 1, changedPaths: ['wiki/global/a.md'] };
});
const result = await integrateWorkUnitPatch({
unitKey: 'wu-conflict-repair',
patchPath,
integrationGit: git,
trace,
author: { name: 'System User', email: 'system@example.com' },
slDisallowed: false,
allowedTargetConnectionIds: new Set(['warehouse']),
validateAppliedTree,
resolveTextualConflict: vi.fn(async () => {
await writeFile(join(configDir, 'wiki/global/a.md'), 'accepted\nproposal\n', 'utf-8');
return { status: 'repaired' as const, attempts: 1, changedPaths: ['wiki/global/a.md'] };
}),
repairGateFailure,
});
expect(result).toMatchObject({
status: 'accepted',
touchedPaths: ['wiki/global/a.md'],
textualResolution: { status: 'repaired' },
gateRepair: { status: 'repaired', attempts: 1, changedPaths: ['wiki/global/a.md'] },
});
expect(validateAppliedTree).toHaveBeenCalledTimes(2);
expect(repairGateFailure).toHaveBeenCalledOnce();
await expect(readFile(join(configDir, 'wiki/global/a.md'), 'utf-8')).resolves.toBe('accepted\nproposal repaired\n');
await expect(readFile(trace.tracePath, 'utf-8')).resolves.toContain('patch_accepted_after_textual_resolution');
});
});

View file

@ -70,6 +70,37 @@ describe('CLI local ingest adapters', () => {
]);
});
it('registers historic SQL when explicitly requested even if connection query history is disabled', async () => {
await writeProject(
tempDir,
[
'connections:',
' warehouse:',
' driver: postgres',
' url: env:WAREHOUSE_DATABASE_URL',
' readonly: true',
' context:',
' queryHistory:',
' enabled: false',
'ingest:',
' adapters:',
' - historic-sql',
'',
].join('\n'),
);
const project = await loadKtxProject({ projectDir: tempDir });
// `--query-history` sets historicSqlConnectionId for the run; that explicit
// request is the opt-in, so the persisted context.queryHistory.enabled flag
// must not gate adapter registration.
const adapters = createKtxCliLocalIngestAdapters(project, {
historicSqlConnectionId: 'warehouse',
sqlAnalysis: sqlAnalysisStub(),
});
expect(adapters.some((adapter) => adapter.source === 'historic-sql')).toBe(true);
});
it('registers BigQuery historic SQL from the requested connection', async () => {
await writeProject(
tempDir,