From f5dea9a0891305e7c4d90b0156638681fe75c1dc Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov Date: Wed, 3 Jun 2026 13:05:59 +0200 Subject: [PATCH] fix(ingest): recover textual-conflict gate failures; fix query-history adapter (#255) * fix(ingest): recover textual-conflict gate failures; fix query-history adapter Two latent gaps in the isolated-diff local-ingest pipeline that can abort an otherwise-successful ingest: - Metabase: when a work-unit patch hit both a textual conflict and a post-merge dangling sl_ref, the after-textual-resolution branch returned a hard semantic_conflict and rolled back the whole job. It now runs the same repairGateFailure recovery the clean-apply branch already uses (re-validate, then commit the union of resolved + repaired paths), reaching parity. - Query history: the historic-sql adapter was registered only when ktx.yaml had context.queryHistory.enabled=true, so `--query-history` threw "Adapter not available for local ingest". Registration now resolves the dialect from driver capability, since the explicit --query-history request is itself the opt-in; the config-gated helper is unchanged for status/setup/probes. Adds the previously-missing tests for both paths. * chore: sync uv.lock to 0.8.0 (regenerated with pinned uv 0.11.11) * fix(ingest): drop ktx's own scan probes and dedup tables in query history Query history (historic-sql) mined two kinds of noise back into context: - ktx's own warehouse scan emits relationship- and column-profiling probes (the relationship_profile_values aggregation and the child_values/parent_values FK-overlap CTEs) into pg_stat_statements. shouldDropBySql now filters these ktx-owned, dialect-stable signatures so ktx introspection is not ingested as usage history. - The same physical table appears both bare (accounts, via search_path) and schema-qualified (orbit_raw.accounts), producing duplicate per-table work units. canonicalizeTableIdentifiers collapses a bare name into its unique qualified form before work-unit keying; ambiguous names are left untouched. On the orbit demo this removes ~35% of sampled query templates (ktx self-probes) and ~45 duplicate per-table work units. * docs(agents): add Design Reasoning Defaults section --- AGENTS.md | 59 ++++++++++++ .../historic-sql/connection-dialect.ts | 20 +++- .../adapters/historic-sql/stage-unified.ts | 62 ++++++++++++ .../ingest/isolated-diff/patch-integrator.ts | 95 ++++++++++++++++++- packages/cli/src/local-adapters.ts | 9 +- .../historic-sql/connection-dialect.test.ts | 21 +++- .../historic-sql/stage-unified.test.ts | 84 ++++++++++++++++ .../isolated-diff/patch-integrator.test.ts | 68 +++++++++++++ packages/cli/test/local-adapters.test.ts | 31 ++++++ 9 files changed, 437 insertions(+), 12 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 0cd9da93..20f9bcdf 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -159,6 +159,65 @@ and naming asymmetries are bugs in waiting — see [`docs/code-design.md`](docs/code-design.md). Treat the `MUST` / `MUST NOT` rules there with the same weight as the ones in this file. +## Design Reasoning Defaults + +When proposing a design, an approach, or any non-trivial change, apply these +defaults and run the self-check before presenting it. They encode the +corrections users most often have to make; reaching these conclusions +autonomously — without being asked the leading question — is the bar. + +- **MUST**: Optimize for the best outcome, not for an unstated constraint. Do not + silently adopt "smallest change", "least effort", "cheapest", or "least user + intervention" as the goal unless the user said so. Default to the most correct, + durable solution, and present cost / effort / scope as information for the user + to weigh — not as a ceiling you impose on their behalf. +- **MUST**: Separate one-time cost from recurring cost before discarding an + option. A fixed cost paid once (a setup-time computation, an extra LLM call + during setup, a contract change) to make every later run cheaper or more + correct is usually worth it. Do not reject it with recurring-cost reasoning; + quantify both sides. (Example smell: "don't add an LLM call to a cost-cutting + feature" — wrong when the call is one-time and the savings recur.) +- **MUST**: Treat a user's example as a representative of a class, not as the + spec. Design for the general population the example stands for, then stress-test + against deliberately different instances — another warehouse, dialect, stack + layout, or input shape — before committing. If a design only works because of an + incidental property of the example (e.g. "the noise happened to be in a separate + schema *on this demo*"), it is curve-fitting; generalize it or state the + assumption explicitly. +- **MUST**: Prefer deriving from the system's own state over enumerating cases. + Favor an allowlist computed from declared/observed state (config, scanned + catalog, query log, the user's own inputs) over a denylist of known-bad + specifics (particular tables, schemas, tools, or vendors). A hardcoded or + hand-maintained list of external specifics is a smell: it rots and fails on the + next stack. The only acceptable static patterns are genuinely universal + invariants (e.g. DB-engine system catalogs) and ktx's own self-emitted + signatures. +- **SHOULD**: Before inventing an abstraction or hand-rolling structural logic, + search for what already exists and reuse it — the codebase's canonical + representation (a structured ref/key type) instead of a parallel string scheme, + and a mandated/available tool (e.g. `sqlglot` for SQL structure; see + [SQL and Structured Parsing](#sql-and-structured-parsing)) instead of + hand-parsing. Normalize ambiguous input to the canonical form at the boundary; + do not carry the ambiguity downstream. This is the single-source-of-truth / DRY + item from the Priority Hierarchy applied at design time. + +Before presenting a design, answer these explicitly: + +1. Am I optimizing for a goal the user actually stated, or one I assumed? +2. Does this generalize beyond the example in front of me? Name a real case where + it would break. +3. Am I enumerating known-bad cases when I could derive scope from the system's + own declared/observed state? +4. Is there an existing canonical representation or mandated tool I should reuse + instead of building or parsing my own? +5. Am I discarding the better option on a weak or misapplied constraint + (one-time vs recurring cost, "more surface area", "more work now")? + +A user question that nudges toward an alternative ("would X help?", "should I +always do Y?", "will you hardcode Z?") is a signal that a better option exists. +Investigate the implied direction and reason it through *before* defending the +original proposal — and prefer to have asked yourself the question first. + ## TypeScript Standards - Use Node 22+ and pnpm workspace commands. diff --git a/packages/cli/src/context/ingest/adapters/historic-sql/connection-dialect.ts b/packages/cli/src/context/ingest/adapters/historic-sql/connection-dialect.ts index dd95f87a..7845cbbc 100644 --- a/packages/cli/src/context/ingest/adapters/historic-sql/connection-dialect.ts +++ b/packages/cli/src/context/ingest/adapters/historic-sql/connection-dialect.ts @@ -26,6 +26,21 @@ export function isQueryHistoryEnabled(connection: unknown): boolean { return queryHistoryRecord(connection)?.enabled === true; } +/** + * Resolves the query-history dialect from the connection's driver capability + * alone, ignoring whether query history is enabled in ktx.yaml. Use this on the + * adapter-registration path when query history has been explicitly requested + * for the run (e.g. via `--query-history`, which is itself the opt-in): the + * persisted `context.queryHistory.enabled` flag must not gate registration. + * Returns null when the connection's driver has no query-history reader. + */ +export function historicSqlDialectForConnectionDriver(connection: unknown): HistoricSqlDialect | null { + const conn = recordOrNull(connection); + const driver = String(conn?.driver ?? '').toLowerCase(); + const registration = getDriverRegistration(driver); + return registration?.hasHistoricSqlReader ? historicSqlDialectForDriver(registration.driver) : null; +} + /** * Resolves the query-history dialect for a connection. Returns null when * query history is disabled, or when the connection's driver has no @@ -35,8 +50,5 @@ export function queryHistoryDialectForConnection(connection: unknown): HistoricS if (!isQueryHistoryEnabled(connection)) { return null; } - const conn = recordOrNull(connection); - const driver = String(conn?.driver ?? '').toLowerCase(); - const registration = getDriverRegistration(driver); - return registration?.hasHistoricSqlReader ? historicSqlDialectForDriver(registration.driver) : null; + return historicSqlDialectForConnectionDriver(connection); } diff --git a/packages/cli/src/context/ingest/adapters/historic-sql/stage-unified.ts b/packages/cli/src/context/ingest/adapters/historic-sql/stage-unified.ts index 70997648..853a3e68 100644 --- a/packages/cli/src/context/ingest/adapters/historic-sql/stage-unified.ts +++ b/packages/cli/src/context/ingest/adapters/historic-sql/stage-unified.ts @@ -79,8 +79,21 @@ function matchesAny(value: string | null, patterns: RegExp[]): boolean { return !!value && patterns.some((pattern) => pattern.test(value)); } +// ktx's own warehouse scan emits relationship- and column-profiling probes that land in +// pg_stat_statements (relationship-validation, relationship-composite-candidates, and each +// dialect's relationship value aggregation). They are ktx introspection, not genuine query +// usage, so they must not be mined back as query history. The markers are ktx-owned +// identifiers, stable across dialects. +function isKtxScanProbe(sql: string): boolean { + if (/\brelationship_profile_values\b/i.test(sql)) { + return true; + } + return /\bchild_values\b/i.test(sql) && /\bparent_values\b/i.test(sql); +} + function shouldDropBySql(sql: string, config: HistoricSqlUnifiedPullConfig): boolean { if (NOISE_PREFIX_RE.test(sql) || SYSTEM_TABLE_RE.test(sql)) return true; + if (isKtxScanProbe(sql)) return true; if (config.filters.dropTrivialProbes !== false && TRIVIAL_SQL_RE.test(sql)) return true; return false; } @@ -148,6 +161,53 @@ function isEnabledTable(table: string, filter: EnabledTableFilter | null): boole return filter.exact.has(normalized) || filter.uniqueUnqualified.has(unqualifiedTableIdentifier(normalized)); } +/** + * pg_stat_statements records queries as written, so the same physical table can appear + * both bare (`accounts`, resolved via search_path) and schema-qualified + * (`orbit_raw.accounts`). Collapse a bare identifier into its schema-qualified form when + * exactly one qualified form shares its unqualified name, so the two never become separate + * work units. Ambiguous bare names (two qualified forms) are left untouched. + */ +function canonicalizeTableIdentifiers(parsedTemplates: ParsedTemplate[]): void { + const all = new Set(); + for (const parsed of parsedTemplates) { + for (const table of parsed.includedTables) { + all.add(table); + } + } + const qualifiedByUnqualified = new Map>(); + for (const table of all) { + if (!table.includes('.')) { + continue; + } + const unqualified = unqualifiedTableIdentifier(table); + if (unqualified.length === 0) { + continue; + } + const forms = qualifiedByUnqualified.get(unqualified) ?? new Set(); + forms.add(table); + qualifiedByUnqualified.set(unqualified, forms); + } + const canonical = new Map(); + for (const table of all) { + if (table.includes('.')) { + continue; + } + const forms = qualifiedByUnqualified.get(unqualifiedTableIdentifier(table)); + if (forms && forms.size === 1) { + canonical.set(table, [...forms][0]); + } + } + if (canonical.size === 0) { + return; + } + const remap = (table: string): string => canonical.get(table) ?? table; + for (const parsed of parsedTemplates) { + parsed.includedTables = [...new Set(parsed.includedTables.map(remap))].sort(); + parsed.tablesTouched = [...new Set(parsed.tablesTouched.map(remap))].sort(); + } +} + function historicSqlWindowDays(config: HistoricSqlUnifiedPullConfig): number { return 'windowDays' in config ? config.windowDays : 90; } @@ -323,6 +383,8 @@ export async function stageHistoricSqlAggregatedSnapshot(input: StageHistoricSql }); } + canonicalizeTableIdentifiers(parsedTemplates); + const byTable = new Map(); for (const parsed of parsedTemplates) { for (const table of parsed.includedTables) { diff --git a/packages/cli/src/context/ingest/isolated-diff/patch-integrator.ts b/packages/cli/src/context/ingest/isolated-diff/patch-integrator.ts index 869c019e..1e2f0cee 100644 --- a/packages/cli/src/context/ingest/isolated-diff/patch-integrator.ts +++ b/packages/cli/src/context/ingest/isolated-diff/patch-integrator.ts @@ -155,18 +155,103 @@ export async function integrateWorkUnitPatch(input: IntegrateWorkUnitPatchInput) }, ); } catch (semanticError) { - if (preApplyHead) { - await input.integrationGit.resetHardTo(preApplyHead); - } + const reason = errorMessage(semanticError); await input.trace.event('error', 'integration', 'patch_semantic_conflict_after_textual_resolution', { unitKey: input.unitKey, patchPath: input.patchPath, touchedPaths: textualResolution.changedPaths, - reason: errorMessage(semanticError), + reason, }); + + // A textual conflict and a semantic-gate failure can co-occur: the resolver + // reconciles the text but can leave wiki sl_refs pointing at measures the + // merged source no longer defines. Recover via the same gate repair the + // clean-apply branch uses, instead of hard-failing the whole job. + if (input.repairGateFailure) { + const gateRepair = await input.repairGateFailure({ + unitKey: input.unitKey, + patchPath: input.patchPath, + touchedPaths: textualResolution.changedPaths, + reason, + }); + + if (gateRepair.status !== 'failed') { + // The resolver wrote its merge to the worktree (unstaged); the repair + // edited a subset on top. Commit the union so neither is dropped. + const resolvedAndRepairedPaths = [ + ...new Set([...textualResolution.changedPaths, ...gateRepair.changedPaths]), + ].sort(); + try { + await traceTimed( + input.trace, + 'integration', + 'semantic_gate_after_gate_repair', + { unitKey: input.unitKey, touchedPaths: gateRepair.changedPaths }, + async () => { + await input.validateAppliedTree(gateRepair.changedPaths); + }, + ); + + const commit = await input.integrationGit.commitFiles( + resolvedAndRepairedPaths, + `ingest: resolve WorkUnit ${input.unitKey} conflict`, + input.author.name, + input.author.email, + ); + if (commit.created) { + await input.trace.event('debug', 'integration', 'patch_accepted_after_textual_resolution', { + unitKey: input.unitKey, + commitSha: commit.commitHash, + touchedPaths: resolvedAndRepairedPaths, + attempts: textualResolution.attempts, + gateRepairAttempts: gateRepair.attempts, + }); + return { + status: 'accepted', + commitSha: commit.commitHash, + touchedPaths: resolvedAndRepairedPaths, + textualResolution, + gateRepair, + }; + } + } catch (repairValidationError) { + if (preApplyHead) { + await input.integrationGit.resetHardTo(preApplyHead); + } + await input.trace.event('error', 'integration', 'patch_semantic_conflict_after_textual_resolution', { + unitKey: input.unitKey, + patchPath: input.patchPath, + touchedPaths: gateRepair.changedPaths, + reason: errorMessage(repairValidationError), + }); + return { + status: 'semantic_conflict', + reason: errorMessage(repairValidationError), + touchedPaths: gateRepair.changedPaths, + textualResolution, + gateRepair, + }; + } + } + + if (preApplyHead) { + await input.integrationGit.resetHardTo(preApplyHead); + } + return { + status: 'semantic_conflict', + reason: gateRepair.status === 'failed' ? gateRepair.reason : reason, + touchedPaths: textualResolution.changedPaths, + textualResolution, + gateRepair, + }; + } + + if (preApplyHead) { + await input.integrationGit.resetHardTo(preApplyHead); + } return { status: 'semantic_conflict', - reason: errorMessage(semanticError), + reason, touchedPaths: textualResolution.changedPaths, textualResolution, }; diff --git a/packages/cli/src/local-adapters.ts b/packages/cli/src/local-adapters.ts index cfc57adc..0cd2d940 100644 --- a/packages/cli/src/local-adapters.ts +++ b/packages/cli/src/local-adapters.ts @@ -12,7 +12,7 @@ import { isKtxSqliteConnectionConfig } from './connectors/sqlite/connector.js'; import { createSqlServerLiveDatabaseIntrospection } from './connectors/sqlserver/live-database-introspection.js'; import { isKtxSqlServerConnectionConfig } from './connectors/sqlserver/connector.js'; import { BigQueryHistoricSqlQueryHistoryReader } from './context/ingest/adapters/historic-sql/bigquery-query-history-reader.js'; -import { queryHistoryDialectForConnection } from './context/ingest/adapters/historic-sql/connection-dialect.js'; +import { historicSqlDialectForConnectionDriver } from './context/ingest/adapters/historic-sql/connection-dialect.js'; import { createDaemonLiveDatabaseIntrospection } from './context/ingest/adapters/live-database/daemon-introspection.js'; import { createDefaultLocalIngestAdapters, type DefaultLocalIngestAdaptersOptions } from './context/ingest/local-adapters.js'; import type { HistoricSqlReader } from './context/ingest/adapters/historic-sql/types.js'; @@ -268,7 +268,12 @@ function historicSqlOptionsForLocalRun(project: KtxLocalProject, options: KtxCli return undefined; } const connection = project.config.connections[connectionId]; - const dialect = queryHistoryDialectForConnection(connection); + // historicSqlConnectionId is only set when query history was explicitly + // requested for this run (e.g. `--query-history`), so resolve the dialect from + // driver capability rather than the persisted context.queryHistory.enabled + // flag — otherwise the adapter is missing and findAdapter('historic-sql') + // throws even though the run asked for it. + const dialect = historicSqlDialectForConnectionDriver(connection); if (!dialect) { return undefined; } diff --git a/packages/cli/test/context/ingest/adapters/historic-sql/connection-dialect.test.ts b/packages/cli/test/context/ingest/adapters/historic-sql/connection-dialect.test.ts index 8dc2ec88..935bab8e 100644 --- a/packages/cli/test/context/ingest/adapters/historic-sql/connection-dialect.test.ts +++ b/packages/cli/test/context/ingest/adapters/historic-sql/connection-dialect.test.ts @@ -1,5 +1,8 @@ import { describe, expect, it } from 'vitest'; -import { queryHistoryDialectForConnection } from '../../../../../src/context/ingest/adapters/historic-sql/connection-dialect.js'; +import { + historicSqlDialectForConnectionDriver, + queryHistoryDialectForConnection, +} from '../../../../../src/context/ingest/adapters/historic-sql/connection-dialect.js'; describe('queryHistoryDialectForConnection', () => { it.each([ @@ -21,3 +24,19 @@ describe('queryHistoryDialectForConnection', () => { expect(queryHistoryDialectForConnection({ driver: 'postgres', context: { queryHistory: { enabled: false } } })).toBeNull(); }); }); + +describe('historicSqlDialectForConnectionDriver', () => { + it('resolves the dialect from driver capability even when query history is disabled', () => { + expect( + historicSqlDialectForConnectionDriver({ driver: 'postgres', context: { queryHistory: { enabled: false } } }), + ).toBe('postgres'); + }); + + it('resolves the dialect when no query-history context is present', () => { + expect(historicSqlDialectForConnectionDriver({ driver: 'bigquery' })).toBe('bigquery'); + }); + + it('returns null for drivers without a historic-SQL reader', () => { + expect(historicSqlDialectForConnectionDriver({ driver: 'mysql', context: { queryHistory: { enabled: true } } })).toBeNull(); + }); +}); diff --git a/packages/cli/test/context/ingest/adapters/historic-sql/stage-unified.test.ts b/packages/cli/test/context/ingest/adapters/historic-sql/stage-unified.test.ts index b930d695..630a3939 100644 --- a/packages/cli/test/context/ingest/adapters/historic-sql/stage-unified.test.ts +++ b/packages/cli/test/context/ingest/adapters/historic-sql/stage-unified.test.ts @@ -433,4 +433,88 @@ describe('stageHistoricSqlAggregatedSnapshot', () => { const manifest = await readJson>(stagedDir, 'manifest.json'); expect(manifest.warnings).toEqual([]); }); + + it("drops ktx's own scan/relationship probes from query history", async () => { + const stagedDir = await tempDir(); + const fkOverlapProbe = + 'select * from (WITH child_values AS ( SELECT DISTINCT "account_id" AS value FROM "account_owners" WHERE "account_id" IS NOT NULL LIMIT $1 ), parent_values AS ( SELECT DISTINCT "account_id" AS value FROM "accounts" WHERE "account_id" IS NOT NULL ) SELECT (SELECT COUNT(*) FROM child_values) AS child_distinct, (SELECT COUNT(*) FROM parent_values) AS parent_distinct) probe'; + const profileProbe = + 'select * from (SELECT $1 AS column_name, (SELECT COUNT(*) FROM "orbit_raw"."accounts") AS total, (SELECT STRING_AGG(CAST(value AS TEXT), CHR(31)) FROM (SELECT DISTINCT "id" AS value FROM "orbit_raw"."accounts" LIMIT $2) AS relationship_profile_values) AS samples) profile'; + const reader: HistoricSqlReader = { + async probe() { + return { warnings: [], info: [] }; + }, + async *fetchAggregated() { + yield aggregate({ + templateId: 'analytic', + canonicalSql: 'select status, count(*) from public.orders group by status', + }); + yield aggregate({ templateId: 'ktx-fk-overlap', canonicalSql: fkOverlapProbe }); + yield aggregate({ templateId: 'ktx-profile', canonicalSql: profileProbe }); + }, + }; + const sqlAnalysis: SqlAnalysisPort = { + analyzeForFingerprint: vi.fn(), + analyzeBatch: vi.fn(async () => new Map([ + ['analytic', { tablesTouched: ['public.orders'], columnsByClause: { select: ['status'], where: [], join: [], groupBy: ['status'] } }], + ])), + validateReadOnly: vi.fn(async () => ({ ok: true })), + }; + + await stageHistoricSqlAggregatedSnapshot({ + stagedDir, + connectionId: 'warehouse', + queryClient: {}, + reader, + sqlAnalysis, + pullConfig: { dialect: 'postgres' }, + now: new Date('2026-05-11T12:00:00.000Z'), + }); + + // ktx scan probes are filtered before SQL analysis, so only the analytic query is parsed. + expect(sqlAnalysis.analyzeBatch).toHaveBeenCalledWith( + [{ id: 'analytic', sql: 'select status, count(*) from public.orders group by status' }], + 'postgres', + ); + expect(await readdir(join(stagedDir, 'tables'))).toEqual(['public.orders.json']); + }); + + it('merges bare and schema-qualified references to the same table into one work unit', async () => { + const stagedDir = await tempDir(); + const reader: HistoricSqlReader = { + async probe() { + return { warnings: [], info: [] }; + }, + async *fetchAggregated() { + yield aggregate({ templateId: 'qualified', canonicalSql: 'select count(*) from orbit_raw.accounts' }); + yield aggregate({ templateId: 'bare', canonicalSql: 'select id from accounts where active' }); + }, + }; + const sqlAnalysis: SqlAnalysisPort = { + analyzeForFingerprint: vi.fn(), + analyzeBatch: vi.fn(async () => new Map([ + ['qualified', { tablesTouched: ['orbit_raw.accounts'], columnsByClause: { select: [], where: [], join: [], groupBy: [] } }], + ['bare', { tablesTouched: ['accounts'], columnsByClause: { select: ['id'], where: ['active'], join: [], groupBy: [] } }], + ])), + validateReadOnly: vi.fn(async () => ({ ok: true })), + }; + + await stageHistoricSqlAggregatedSnapshot({ + stagedDir, + connectionId: 'warehouse', + queryClient: {}, + reader, + sqlAnalysis, + pullConfig: { dialect: 'postgres' }, + now: new Date('2026-05-11T12:00:00.000Z'), + }); + + // The bare `accounts` reference resolves to the unique qualified `orbit_raw.accounts`, + // so the two templates collapse into a single work unit instead of two. + expect(await readdir(join(stagedDir, 'tables'))).toEqual(['orbit_raw.accounts.json']); + const merged = await readJson>(stagedDir, 'tables/orbit_raw.accounts.json'); + expect(merged.topTemplates.map((t: any) => t.id).sort()).toEqual(['bare', 'qualified']); + const manifest = await readJson>(stagedDir, 'manifest.json'); + expect(manifest.touchedTableCount).toBe(1); + }); }); diff --git a/packages/cli/test/context/ingest/isolated-diff/patch-integrator.test.ts b/packages/cli/test/context/ingest/isolated-diff/patch-integrator.test.ts index 1deabfe8..e822472d 100644 --- a/packages/cli/test/context/ingest/isolated-diff/patch-integrator.test.ts +++ b/packages/cli/test/context/ingest/isolated-diff/patch-integrator.test.ts @@ -401,4 +401,72 @@ describe('integrateWorkUnitPatch', () => { }); await expect(readFile(join(configDir, 'wiki/global/a.md'), 'utf-8')).resolves.toBe('old\n'); }); + + it('repairs a semantic gate failure after a textual conflict is resolved', async () => { + const { homeDir, configDir, git } = await makeRepo(); + await mkdir(join(configDir, 'wiki/global'), { recursive: true }); + await writeFile(join(configDir, 'wiki/global/a.md'), 'base\n', 'utf-8'); + await git.commitFiles(['wiki/global/a.md'], 'base page', 'System User', 'system@example.com'); + const conflictBase = await git.revParseHead(); + + await writeFile(join(configDir, 'wiki/global/a.md'), 'accepted\n', 'utf-8'); + await git.commitFiles(['wiki/global/a.md'], 'accepted edit', 'System User', 'system@example.com'); + + const childDir = join(homeDir, 'child-conflict-repair'); + await git.addWorktree(childDir, 'child-conflict-repair', conflictBase); + const childGit = git.forWorktree(childDir); + await writeFile(join(childDir, 'wiki/global/a.md'), 'proposal\n', 'utf-8'); + await childGit.commitFiles(['wiki/global/a.md'], 'proposal edit', 'System User', 'system@example.com'); + const patchPath = join(homeDir, 'proposal-repair.patch'); + await childGit.writeBinaryNoRenamePatch(conflictBase, 'HEAD', patchPath); + + const trace = new FileIngestTraceWriter({ + tracePath: join(homeDir, '.ktx/ingest-traces/job-resolver-repair/trace.jsonl'), + jobId: 'job-resolver-repair', + connectionId: 'warehouse', + sourceKey: 'metabase', + level: 'trace', + }); + + // Gate fails on the resolver's merged tree, then passes after the repair edit. + const validateAppliedTree = vi + .fn() + .mockRejectedValueOnce( + new Error('final artifact gates failed:\narr-definition: unknown sl_refs entity mart_arr_daily.arr_dollars'), + ) + .mockResolvedValueOnce(undefined); + + const repairGateFailure = vi.fn(async (context: { unitKey: string; touchedPaths: string[] }) => { + expect(context).toMatchObject({ unitKey: 'wu-conflict-repair', touchedPaths: ['wiki/global/a.md'] }); + await writeFile(join(configDir, 'wiki/global/a.md'), 'accepted\nproposal repaired\n', 'utf-8'); + return { status: 'repaired' as const, attempts: 1, changedPaths: ['wiki/global/a.md'] }; + }); + + const result = await integrateWorkUnitPatch({ + unitKey: 'wu-conflict-repair', + patchPath, + integrationGit: git, + trace, + author: { name: 'System User', email: 'system@example.com' }, + slDisallowed: false, + allowedTargetConnectionIds: new Set(['warehouse']), + validateAppliedTree, + resolveTextualConflict: vi.fn(async () => { + await writeFile(join(configDir, 'wiki/global/a.md'), 'accepted\nproposal\n', 'utf-8'); + return { status: 'repaired' as const, attempts: 1, changedPaths: ['wiki/global/a.md'] }; + }), + repairGateFailure, + }); + + expect(result).toMatchObject({ + status: 'accepted', + touchedPaths: ['wiki/global/a.md'], + textualResolution: { status: 'repaired' }, + gateRepair: { status: 'repaired', attempts: 1, changedPaths: ['wiki/global/a.md'] }, + }); + expect(validateAppliedTree).toHaveBeenCalledTimes(2); + expect(repairGateFailure).toHaveBeenCalledOnce(); + await expect(readFile(join(configDir, 'wiki/global/a.md'), 'utf-8')).resolves.toBe('accepted\nproposal repaired\n'); + await expect(readFile(trace.tracePath, 'utf-8')).resolves.toContain('patch_accepted_after_textual_resolution'); + }); }); diff --git a/packages/cli/test/local-adapters.test.ts b/packages/cli/test/local-adapters.test.ts index c7ae58cc..345f662a 100644 --- a/packages/cli/test/local-adapters.test.ts +++ b/packages/cli/test/local-adapters.test.ts @@ -70,6 +70,37 @@ describe('CLI local ingest adapters', () => { ]); }); + it('registers historic SQL when explicitly requested even if connection query history is disabled', async () => { + await writeProject( + tempDir, + [ + 'connections:', + ' warehouse:', + ' driver: postgres', + ' url: env:WAREHOUSE_DATABASE_URL', + ' readonly: true', + ' context:', + ' queryHistory:', + ' enabled: false', + 'ingest:', + ' adapters:', + ' - historic-sql', + '', + ].join('\n'), + ); + const project = await loadKtxProject({ projectDir: tempDir }); + + // `--query-history` sets historicSqlConnectionId for the run; that explicit + // request is the opt-in, so the persisted context.queryHistory.enabled flag + // must not gate adapter registration. + const adapters = createKtxCliLocalIngestAdapters(project, { + historicSqlConnectionId: 'warehouse', + sqlAnalysis: sqlAnalysisStub(), + }); + + expect(adapters.some((adapter) => adapter.source === 'historic-sql')).toBe(true); + }); + it('registers BigQuery historic SQL from the requested connection', async () => { await writeProject( tempDir,