fix(ingest): recover textual-conflict gate failures; fix query-history adapter (#255)

* fix(ingest): recover textual-conflict gate failures; fix query-history adapter

Two latent gaps in the isolated-diff local-ingest pipeline that can abort an
otherwise-successful ingest:

- Metabase: when a work-unit patch hit both a textual conflict and a post-merge
  dangling sl_ref, the after-textual-resolution branch returned a hard
  semantic_conflict and rolled back the whole job. It now runs the same
  repairGateFailure recovery the clean-apply branch already uses (re-validate,
  then commit the union of resolved + repaired paths), reaching parity.

- Query history: the historic-sql adapter was registered only when ktx.yaml had
  context.queryHistory.enabled=true, so `--query-history` threw "Adapter not
  available for local ingest". Registration now resolves the dialect from driver
  capability, since the explicit --query-history request is itself the opt-in;
  the config-gated helper is unchanged for status/setup/probes.

Adds the previously-missing tests for both paths.

* chore: sync uv.lock to 0.8.0 (regenerated with pinned uv 0.11.11)

* fix(ingest): drop ktx's own scan probes and dedup tables in query history

Query history (historic-sql) mined two kinds of noise back into context:

- ktx's own warehouse scan emits relationship- and column-profiling probes
  (the relationship_profile_values aggregation and the child_values/parent_values
  FK-overlap CTEs) into pg_stat_statements. shouldDropBySql now filters these
  ktx-owned, dialect-stable signatures so ktx introspection is not ingested as
  usage history.

- The same physical table appears both bare (accounts, via search_path) and
  schema-qualified (orbit_raw.accounts), producing duplicate per-table work
  units. canonicalizeTableIdentifiers collapses a bare name into its unique
  qualified form before work-unit keying; ambiguous names are left untouched.

On the orbit demo this removes ~35% of sampled query templates (ktx self-probes)
and ~45 duplicate per-table work units.

* docs(agents): add Design Reasoning Defaults section
This commit is contained in:
Andrey Avtomonov 2026-06-03 13:05:59 +02:00 committed by GitHub
parent 9d3a0b751d
commit f5dea9a089
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 437 additions and 12 deletions

View file

@ -159,6 +159,65 @@ and naming asymmetries are bugs in waiting — see
[`docs/code-design.md`](docs/code-design.md). Treat the `MUST` / `MUST NOT`
rules there with the same weight as the ones in this file.
## Design Reasoning Defaults
When proposing a design, an approach, or any non-trivial change, apply these
defaults and run the self-check before presenting it. They encode the
corrections users most often have to make; reaching these conclusions
autonomously — without being asked the leading question — is the bar.
- **MUST**: Optimize for the best outcome, not for an unstated constraint. Do not
silently adopt "smallest change", "least effort", "cheapest", or "least user
intervention" as the goal unless the user said so. Default to the most correct,
durable solution, and present cost / effort / scope as information for the user
to weigh — not as a ceiling you impose on their behalf.
- **MUST**: Separate one-time cost from recurring cost before discarding an
option. A fixed cost paid once (a setup-time computation, an extra LLM call
during setup, a contract change) to make every later run cheaper or more
correct is usually worth it. Do not reject it with recurring-cost reasoning;
quantify both sides. (Example smell: "don't add an LLM call to a cost-cutting
feature" — wrong when the call is one-time and the savings recur.)
- **MUST**: Treat a user's example as a representative of a class, not as the
spec. Design for the general population the example stands for, then stress-test
against deliberately different instances — another warehouse, dialect, stack
layout, or input shape — before committing. If a design only works because of an
incidental property of the example (e.g. "the noise happened to be in a separate
schema *on this demo*"), it is curve-fitting; generalize it or state the
assumption explicitly.
- **MUST**: Prefer deriving from the system's own state over enumerating cases.
Favor an allowlist computed from declared/observed state (config, scanned
catalog, query log, the user's own inputs) over a denylist of known-bad
specifics (particular tables, schemas, tools, or vendors). A hardcoded or
hand-maintained list of external specifics is a smell: it rots and fails on the
next stack. The only acceptable static patterns are genuinely universal
invariants (e.g. DB-engine system catalogs) and ktx's own self-emitted
signatures.
- **SHOULD**: Before inventing an abstraction or hand-rolling structural logic,
search for what already exists and reuse it — the codebase's canonical
representation (a structured ref/key type) instead of a parallel string scheme,
and a mandated/available tool (e.g. `sqlglot` for SQL structure; see
[SQL and Structured Parsing](#sql-and-structured-parsing)) instead of
hand-parsing. Normalize ambiguous input to the canonical form at the boundary;
do not carry the ambiguity downstream. This is the single-source-of-truth / DRY
item from the Priority Hierarchy applied at design time.
Before presenting a design, answer these explicitly:
1. Am I optimizing for a goal the user actually stated, or one I assumed?
2. Does this generalize beyond the example in front of me? Name a real case where
it would break.
3. Am I enumerating known-bad cases when I could derive scope from the system's
own declared/observed state?
4. Is there an existing canonical representation or mandated tool I should reuse
instead of building or parsing my own?
5. Am I discarding the better option on a weak or misapplied constraint
(one-time vs recurring cost, "more surface area", "more work now")?
A user question that nudges toward an alternative ("would X help?", "should I
always do Y?", "will you hardcode Z?") is a signal that a better option exists.
Investigate the implied direction and reason it through *before* defending the
original proposal — and prefer to have asked yourself the question first.
## TypeScript Standards
- Use Node 22+ and pnpm workspace commands.

View file

@ -26,6 +26,21 @@ export function isQueryHistoryEnabled(connection: unknown): boolean {
return queryHistoryRecord(connection)?.enabled === true;
}
/**
* Resolves the query-history dialect from the connection's driver capability
* alone, ignoring whether query history is enabled in ktx.yaml. Use this on the
* adapter-registration path when query history has been explicitly requested
* for the run (e.g. via `--query-history`, which is itself the opt-in): the
* persisted `context.queryHistory.enabled` flag must not gate registration.
* Returns null when the connection's driver has no query-history reader.
*/
export function historicSqlDialectForConnectionDriver(connection: unknown): HistoricSqlDialect | null {
const conn = recordOrNull(connection);
const driver = String(conn?.driver ?? '').toLowerCase();
const registration = getDriverRegistration(driver);
return registration?.hasHistoricSqlReader ? historicSqlDialectForDriver(registration.driver) : null;
}
/**
* Resolves the query-history dialect for a connection. Returns null when
* query history is disabled, or when the connection's driver has no
@ -35,8 +50,5 @@ export function queryHistoryDialectForConnection(connection: unknown): HistoricS
if (!isQueryHistoryEnabled(connection)) {
return null;
}
const conn = recordOrNull(connection);
const driver = String(conn?.driver ?? '').toLowerCase();
const registration = getDriverRegistration(driver);
return registration?.hasHistoricSqlReader ? historicSqlDialectForDriver(registration.driver) : null;
return historicSqlDialectForConnectionDriver(connection);
}

View file

@ -79,8 +79,21 @@ function matchesAny(value: string | null, patterns: RegExp[]): boolean {
return !!value && patterns.some((pattern) => pattern.test(value));
}
// ktx's own warehouse scan emits relationship- and column-profiling probes that land in
// pg_stat_statements (relationship-validation, relationship-composite-candidates, and each
// dialect's relationship value aggregation). They are ktx introspection, not genuine query
// usage, so they must not be mined back as query history. The markers are ktx-owned
// identifiers, stable across dialects.
function isKtxScanProbe(sql: string): boolean {
if (/\brelationship_profile_values\b/i.test(sql)) {
return true;
}
return /\bchild_values\b/i.test(sql) && /\bparent_values\b/i.test(sql);
}
function shouldDropBySql(sql: string, config: HistoricSqlUnifiedPullConfig): boolean {
if (NOISE_PREFIX_RE.test(sql) || SYSTEM_TABLE_RE.test(sql)) return true;
if (isKtxScanProbe(sql)) return true;
if (config.filters.dropTrivialProbes !== false && TRIVIAL_SQL_RE.test(sql)) return true;
return false;
}
@ -148,6 +161,53 @@ function isEnabledTable(table: string, filter: EnabledTableFilter | null): boole
return filter.exact.has(normalized) || filter.uniqueUnqualified.has(unqualifiedTableIdentifier(normalized));
}
/**
* pg_stat_statements records queries as written, so the same physical table can appear
* both bare (`accounts`, resolved via search_path) and schema-qualified
* (`orbit_raw.accounts`). Collapse a bare identifier into its schema-qualified form when
* exactly one qualified form shares its unqualified name, so the two never become separate
* work units. Ambiguous bare names (two qualified forms) are left untouched.
*/
function canonicalizeTableIdentifiers(parsedTemplates: ParsedTemplate[]): void {
const all = new Set<string>();
for (const parsed of parsedTemplates) {
for (const table of parsed.includedTables) {
all.add(table);
}
}
const qualifiedByUnqualified = new Map<string, Set<string>>();
for (const table of all) {
if (!table.includes('.')) {
continue;
}
const unqualified = unqualifiedTableIdentifier(table);
if (unqualified.length === 0) {
continue;
}
const forms = qualifiedByUnqualified.get(unqualified) ?? new Set<string>();
forms.add(table);
qualifiedByUnqualified.set(unqualified, forms);
}
const canonical = new Map<string, string>();
for (const table of all) {
if (table.includes('.')) {
continue;
}
const forms = qualifiedByUnqualified.get(unqualifiedTableIdentifier(table));
if (forms && forms.size === 1) {
canonical.set(table, [...forms][0]);
}
}
if (canonical.size === 0) {
return;
}
const remap = (table: string): string => canonical.get(table) ?? table;
for (const parsed of parsedTemplates) {
parsed.includedTables = [...new Set(parsed.includedTables.map(remap))].sort();
parsed.tablesTouched = [...new Set(parsed.tablesTouched.map(remap))].sort();
}
}
function historicSqlWindowDays(config: HistoricSqlUnifiedPullConfig): number {
return 'windowDays' in config ? config.windowDays : 90;
}
@ -323,6 +383,8 @@ export async function stageHistoricSqlAggregatedSnapshot(input: StageHistoricSql
});
}
canonicalizeTableIdentifiers(parsedTemplates);
const byTable = new Map<string, TableAccumulator>();
for (const parsed of parsedTemplates) {
for (const table of parsed.includedTables) {

View file

@ -155,18 +155,103 @@ export async function integrateWorkUnitPatch(input: IntegrateWorkUnitPatchInput)
},
);
} catch (semanticError) {
if (preApplyHead) {
await input.integrationGit.resetHardTo(preApplyHead);
}
const reason = errorMessage(semanticError);
await input.trace.event('error', 'integration', 'patch_semantic_conflict_after_textual_resolution', {
unitKey: input.unitKey,
patchPath: input.patchPath,
touchedPaths: textualResolution.changedPaths,
reason: errorMessage(semanticError),
reason,
});
// A textual conflict and a semantic-gate failure can co-occur: the resolver
// reconciles the text but can leave wiki sl_refs pointing at measures the
// merged source no longer defines. Recover via the same gate repair the
// clean-apply branch uses, instead of hard-failing the whole job.
if (input.repairGateFailure) {
const gateRepair = await input.repairGateFailure({
unitKey: input.unitKey,
patchPath: input.patchPath,
touchedPaths: textualResolution.changedPaths,
reason,
});
if (gateRepair.status !== 'failed') {
// The resolver wrote its merge to the worktree (unstaged); the repair
// edited a subset on top. Commit the union so neither is dropped.
const resolvedAndRepairedPaths = [
...new Set([...textualResolution.changedPaths, ...gateRepair.changedPaths]),
].sort();
try {
await traceTimed(
input.trace,
'integration',
'semantic_gate_after_gate_repair',
{ unitKey: input.unitKey, touchedPaths: gateRepair.changedPaths },
async () => {
await input.validateAppliedTree(gateRepair.changedPaths);
},
);
const commit = await input.integrationGit.commitFiles(
resolvedAndRepairedPaths,
`ingest: resolve WorkUnit ${input.unitKey} conflict`,
input.author.name,
input.author.email,
);
if (commit.created) {
await input.trace.event('debug', 'integration', 'patch_accepted_after_textual_resolution', {
unitKey: input.unitKey,
commitSha: commit.commitHash,
touchedPaths: resolvedAndRepairedPaths,
attempts: textualResolution.attempts,
gateRepairAttempts: gateRepair.attempts,
});
return {
status: 'accepted',
commitSha: commit.commitHash,
touchedPaths: resolvedAndRepairedPaths,
textualResolution,
gateRepair,
};
}
} catch (repairValidationError) {
if (preApplyHead) {
await input.integrationGit.resetHardTo(preApplyHead);
}
await input.trace.event('error', 'integration', 'patch_semantic_conflict_after_textual_resolution', {
unitKey: input.unitKey,
patchPath: input.patchPath,
touchedPaths: gateRepair.changedPaths,
reason: errorMessage(repairValidationError),
});
return {
status: 'semantic_conflict',
reason: errorMessage(repairValidationError),
touchedPaths: gateRepair.changedPaths,
textualResolution,
gateRepair,
};
}
}
if (preApplyHead) {
await input.integrationGit.resetHardTo(preApplyHead);
}
return {
status: 'semantic_conflict',
reason: gateRepair.status === 'failed' ? gateRepair.reason : reason,
touchedPaths: textualResolution.changedPaths,
textualResolution,
gateRepair,
};
}
if (preApplyHead) {
await input.integrationGit.resetHardTo(preApplyHead);
}
return {
status: 'semantic_conflict',
reason: errorMessage(semanticError),
reason,
touchedPaths: textualResolution.changedPaths,
textualResolution,
};

View file

@ -12,7 +12,7 @@ import { isKtxSqliteConnectionConfig } from './connectors/sqlite/connector.js';
import { createSqlServerLiveDatabaseIntrospection } from './connectors/sqlserver/live-database-introspection.js';
import { isKtxSqlServerConnectionConfig } from './connectors/sqlserver/connector.js';
import { BigQueryHistoricSqlQueryHistoryReader } from './context/ingest/adapters/historic-sql/bigquery-query-history-reader.js';
import { queryHistoryDialectForConnection } from './context/ingest/adapters/historic-sql/connection-dialect.js';
import { historicSqlDialectForConnectionDriver } from './context/ingest/adapters/historic-sql/connection-dialect.js';
import { createDaemonLiveDatabaseIntrospection } from './context/ingest/adapters/live-database/daemon-introspection.js';
import { createDefaultLocalIngestAdapters, type DefaultLocalIngestAdaptersOptions } from './context/ingest/local-adapters.js';
import type { HistoricSqlReader } from './context/ingest/adapters/historic-sql/types.js';
@ -268,7 +268,12 @@ function historicSqlOptionsForLocalRun(project: KtxLocalProject, options: KtxCli
return undefined;
}
const connection = project.config.connections[connectionId];
const dialect = queryHistoryDialectForConnection(connection);
// historicSqlConnectionId is only set when query history was explicitly
// requested for this run (e.g. `--query-history`), so resolve the dialect from
// driver capability rather than the persisted context.queryHistory.enabled
// flag — otherwise the adapter is missing and findAdapter('historic-sql')
// throws even though the run asked for it.
const dialect = historicSqlDialectForConnectionDriver(connection);
if (!dialect) {
return undefined;
}

View file

@ -1,5 +1,8 @@
import { describe, expect, it } from 'vitest';
import { queryHistoryDialectForConnection } from '../../../../../src/context/ingest/adapters/historic-sql/connection-dialect.js';
import {
historicSqlDialectForConnectionDriver,
queryHistoryDialectForConnection,
} from '../../../../../src/context/ingest/adapters/historic-sql/connection-dialect.js';
describe('queryHistoryDialectForConnection', () => {
it.each([
@ -21,3 +24,19 @@ describe('queryHistoryDialectForConnection', () => {
expect(queryHistoryDialectForConnection({ driver: 'postgres', context: { queryHistory: { enabled: false } } })).toBeNull();
});
});
describe('historicSqlDialectForConnectionDriver', () => {
it('resolves the dialect from driver capability even when query history is disabled', () => {
expect(
historicSqlDialectForConnectionDriver({ driver: 'postgres', context: { queryHistory: { enabled: false } } }),
).toBe('postgres');
});
it('resolves the dialect when no query-history context is present', () => {
expect(historicSqlDialectForConnectionDriver({ driver: 'bigquery' })).toBe('bigquery');
});
it('returns null for drivers without a historic-SQL reader', () => {
expect(historicSqlDialectForConnectionDriver({ driver: 'mysql', context: { queryHistory: { enabled: true } } })).toBeNull();
});
});

View file

@ -433,4 +433,88 @@ describe('stageHistoricSqlAggregatedSnapshot', () => {
const manifest = await readJson<Record<string, any>>(stagedDir, 'manifest.json');
expect(manifest.warnings).toEqual([]);
});
it("drops ktx's own scan/relationship probes from query history", async () => {
const stagedDir = await tempDir();
const fkOverlapProbe =
'select * from (WITH child_values AS ( SELECT DISTINCT "account_id" AS value FROM "account_owners" WHERE "account_id" IS NOT NULL LIMIT $1 ), parent_values AS ( SELECT DISTINCT "account_id" AS value FROM "accounts" WHERE "account_id" IS NOT NULL ) SELECT (SELECT COUNT(*) FROM child_values) AS child_distinct, (SELECT COUNT(*) FROM parent_values) AS parent_distinct) probe';
const profileProbe =
'select * from (SELECT $1 AS column_name, (SELECT COUNT(*) FROM "orbit_raw"."accounts") AS total, (SELECT STRING_AGG(CAST(value AS TEXT), CHR(31)) FROM (SELECT DISTINCT "id" AS value FROM "orbit_raw"."accounts" LIMIT $2) AS relationship_profile_values) AS samples) profile';
const reader: HistoricSqlReader = {
async probe() {
return { warnings: [], info: [] };
},
async *fetchAggregated() {
yield aggregate({
templateId: 'analytic',
canonicalSql: 'select status, count(*) from public.orders group by status',
});
yield aggregate({ templateId: 'ktx-fk-overlap', canonicalSql: fkOverlapProbe });
yield aggregate({ templateId: 'ktx-profile', canonicalSql: profileProbe });
},
};
const sqlAnalysis: SqlAnalysisPort = {
analyzeForFingerprint: vi.fn(),
analyzeBatch: vi.fn(async () => new Map([
['analytic', { tablesTouched: ['public.orders'], columnsByClause: { select: ['status'], where: [], join: [], groupBy: ['status'] } }],
])),
validateReadOnly: vi.fn(async () => ({ ok: true })),
};
await stageHistoricSqlAggregatedSnapshot({
stagedDir,
connectionId: 'warehouse',
queryClient: {},
reader,
sqlAnalysis,
pullConfig: { dialect: 'postgres' },
now: new Date('2026-05-11T12:00:00.000Z'),
});
// ktx scan probes are filtered before SQL analysis, so only the analytic query is parsed.
expect(sqlAnalysis.analyzeBatch).toHaveBeenCalledWith(
[{ id: 'analytic', sql: 'select status, count(*) from public.orders group by status' }],
'postgres',
);
expect(await readdir(join(stagedDir, 'tables'))).toEqual(['public.orders.json']);
});
it('merges bare and schema-qualified references to the same table into one work unit', async () => {
const stagedDir = await tempDir();
const reader: HistoricSqlReader = {
async probe() {
return { warnings: [], info: [] };
},
async *fetchAggregated() {
yield aggregate({ templateId: 'qualified', canonicalSql: 'select count(*) from orbit_raw.accounts' });
yield aggregate({ templateId: 'bare', canonicalSql: 'select id from accounts where active' });
},
};
const sqlAnalysis: SqlAnalysisPort = {
analyzeForFingerprint: vi.fn(),
analyzeBatch: vi.fn(async () => new Map([
['qualified', { tablesTouched: ['orbit_raw.accounts'], columnsByClause: { select: [], where: [], join: [], groupBy: [] } }],
['bare', { tablesTouched: ['accounts'], columnsByClause: { select: ['id'], where: ['active'], join: [], groupBy: [] } }],
])),
validateReadOnly: vi.fn(async () => ({ ok: true })),
};
await stageHistoricSqlAggregatedSnapshot({
stagedDir,
connectionId: 'warehouse',
queryClient: {},
reader,
sqlAnalysis,
pullConfig: { dialect: 'postgres' },
now: new Date('2026-05-11T12:00:00.000Z'),
});
// The bare `accounts` reference resolves to the unique qualified `orbit_raw.accounts`,
// so the two templates collapse into a single work unit instead of two.
expect(await readdir(join(stagedDir, 'tables'))).toEqual(['orbit_raw.accounts.json']);
const merged = await readJson<Record<string, any>>(stagedDir, 'tables/orbit_raw.accounts.json');
expect(merged.topTemplates.map((t: any) => t.id).sort()).toEqual(['bare', 'qualified']);
const manifest = await readJson<Record<string, any>>(stagedDir, 'manifest.json');
expect(manifest.touchedTableCount).toBe(1);
});
});

View file

@ -401,4 +401,72 @@ describe('integrateWorkUnitPatch', () => {
});
await expect(readFile(join(configDir, 'wiki/global/a.md'), 'utf-8')).resolves.toBe('old\n');
});
it('repairs a semantic gate failure after a textual conflict is resolved', async () => {
const { homeDir, configDir, git } = await makeRepo();
await mkdir(join(configDir, 'wiki/global'), { recursive: true });
await writeFile(join(configDir, 'wiki/global/a.md'), 'base\n', 'utf-8');
await git.commitFiles(['wiki/global/a.md'], 'base page', 'System User', 'system@example.com');
const conflictBase = await git.revParseHead();
await writeFile(join(configDir, 'wiki/global/a.md'), 'accepted\n', 'utf-8');
await git.commitFiles(['wiki/global/a.md'], 'accepted edit', 'System User', 'system@example.com');
const childDir = join(homeDir, 'child-conflict-repair');
await git.addWorktree(childDir, 'child-conflict-repair', conflictBase);
const childGit = git.forWorktree(childDir);
await writeFile(join(childDir, 'wiki/global/a.md'), 'proposal\n', 'utf-8');
await childGit.commitFiles(['wiki/global/a.md'], 'proposal edit', 'System User', 'system@example.com');
const patchPath = join(homeDir, 'proposal-repair.patch');
await childGit.writeBinaryNoRenamePatch(conflictBase, 'HEAD', patchPath);
const trace = new FileIngestTraceWriter({
tracePath: join(homeDir, '.ktx/ingest-traces/job-resolver-repair/trace.jsonl'),
jobId: 'job-resolver-repair',
connectionId: 'warehouse',
sourceKey: 'metabase',
level: 'trace',
});
// Gate fails on the resolver's merged tree, then passes after the repair edit.
const validateAppliedTree = vi
.fn()
.mockRejectedValueOnce(
new Error('final artifact gates failed:\narr-definition: unknown sl_refs entity mart_arr_daily.arr_dollars'),
)
.mockResolvedValueOnce(undefined);
const repairGateFailure = vi.fn(async (context: { unitKey: string; touchedPaths: string[] }) => {
expect(context).toMatchObject({ unitKey: 'wu-conflict-repair', touchedPaths: ['wiki/global/a.md'] });
await writeFile(join(configDir, 'wiki/global/a.md'), 'accepted\nproposal repaired\n', 'utf-8');
return { status: 'repaired' as const, attempts: 1, changedPaths: ['wiki/global/a.md'] };
});
const result = await integrateWorkUnitPatch({
unitKey: 'wu-conflict-repair',
patchPath,
integrationGit: git,
trace,
author: { name: 'System User', email: 'system@example.com' },
slDisallowed: false,
allowedTargetConnectionIds: new Set(['warehouse']),
validateAppliedTree,
resolveTextualConflict: vi.fn(async () => {
await writeFile(join(configDir, 'wiki/global/a.md'), 'accepted\nproposal\n', 'utf-8');
return { status: 'repaired' as const, attempts: 1, changedPaths: ['wiki/global/a.md'] };
}),
repairGateFailure,
});
expect(result).toMatchObject({
status: 'accepted',
touchedPaths: ['wiki/global/a.md'],
textualResolution: { status: 'repaired' },
gateRepair: { status: 'repaired', attempts: 1, changedPaths: ['wiki/global/a.md'] },
});
expect(validateAppliedTree).toHaveBeenCalledTimes(2);
expect(repairGateFailure).toHaveBeenCalledOnce();
await expect(readFile(join(configDir, 'wiki/global/a.md'), 'utf-8')).resolves.toBe('accepted\nproposal repaired\n');
await expect(readFile(trace.tracePath, 'utf-8')).resolves.toContain('patch_accepted_after_textual_resolution');
});
});

View file

@ -70,6 +70,37 @@ describe('CLI local ingest adapters', () => {
]);
});
it('registers historic SQL when explicitly requested even if connection query history is disabled', async () => {
await writeProject(
tempDir,
[
'connections:',
' warehouse:',
' driver: postgres',
' url: env:WAREHOUSE_DATABASE_URL',
' readonly: true',
' context:',
' queryHistory:',
' enabled: false',
'ingest:',
' adapters:',
' - historic-sql',
'',
].join('\n'),
);
const project = await loadKtxProject({ projectDir: tempDir });
// `--query-history` sets historicSqlConnectionId for the run; that explicit
// request is the opt-in, so the persisted context.queryHistory.enabled flag
// must not gate adapter registration.
const adapters = createKtxCliLocalIngestAdapters(project, {
historicSqlConnectionId: 'warehouse',
sqlAnalysis: sqlAnalysisStub(),
});
expect(adapters.some((adapter) => adapter.source === 'historic-sql')).toBe(true);
});
it('registers BigQuery historic SQL from the requested connection', async () => {
await writeProject(
tempDir,