mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-07 07:55:13 +02:00
fix(ingest): recover textual-conflict gate failures; fix query-history adapter (#255)
* fix(ingest): recover textual-conflict gate failures; fix query-history adapter Two latent gaps in the isolated-diff local-ingest pipeline that can abort an otherwise-successful ingest: - Metabase: when a work-unit patch hit both a textual conflict and a post-merge dangling sl_ref, the after-textual-resolution branch returned a hard semantic_conflict and rolled back the whole job. It now runs the same repairGateFailure recovery the clean-apply branch already uses (re-validate, then commit the union of resolved + repaired paths), reaching parity. - Query history: the historic-sql adapter was registered only when ktx.yaml had context.queryHistory.enabled=true, so `--query-history` threw "Adapter not available for local ingest". Registration now resolves the dialect from driver capability, since the explicit --query-history request is itself the opt-in; the config-gated helper is unchanged for status/setup/probes. Adds the previously-missing tests for both paths. * chore: sync uv.lock to 0.8.0 (regenerated with pinned uv 0.11.11) * fix(ingest): drop ktx's own scan probes and dedup tables in query history Query history (historic-sql) mined two kinds of noise back into context: - ktx's own warehouse scan emits relationship- and column-profiling probes (the relationship_profile_values aggregation and the child_values/parent_values FK-overlap CTEs) into pg_stat_statements. shouldDropBySql now filters these ktx-owned, dialect-stable signatures so ktx introspection is not ingested as usage history. - The same physical table appears both bare (accounts, via search_path) and schema-qualified (orbit_raw.accounts), producing duplicate per-table work units. canonicalizeTableIdentifiers collapses a bare name into its unique qualified form before work-unit keying; ambiguous names are left untouched. On the orbit demo this removes ~35% of sampled query templates (ktx self-probes) and ~45 duplicate per-table work units. * docs(agents): add Design Reasoning Defaults section
This commit is contained in:
parent
9d3a0b751d
commit
f5dea9a089
9 changed files with 437 additions and 12 deletions
59
AGENTS.md
59
AGENTS.md
|
|
@ -159,6 +159,65 @@ and naming asymmetries are bugs in waiting — see
|
||||||
[`docs/code-design.md`](docs/code-design.md). Treat the `MUST` / `MUST NOT`
|
[`docs/code-design.md`](docs/code-design.md). Treat the `MUST` / `MUST NOT`
|
||||||
rules there with the same weight as the ones in this file.
|
rules there with the same weight as the ones in this file.
|
||||||
|
|
||||||
|
## Design Reasoning Defaults
|
||||||
|
|
||||||
|
When proposing a design, an approach, or any non-trivial change, apply these
|
||||||
|
defaults and run the self-check before presenting it. They encode the
|
||||||
|
corrections users most often have to make; reaching these conclusions
|
||||||
|
autonomously — without being asked the leading question — is the bar.
|
||||||
|
|
||||||
|
- **MUST**: Optimize for the best outcome, not for an unstated constraint. Do not
|
||||||
|
silently adopt "smallest change", "least effort", "cheapest", or "least user
|
||||||
|
intervention" as the goal unless the user said so. Default to the most correct,
|
||||||
|
durable solution, and present cost / effort / scope as information for the user
|
||||||
|
to weigh — not as a ceiling you impose on their behalf.
|
||||||
|
- **MUST**: Separate one-time cost from recurring cost before discarding an
|
||||||
|
option. A fixed cost paid once (a setup-time computation, an extra LLM call
|
||||||
|
during setup, a contract change) to make every later run cheaper or more
|
||||||
|
correct is usually worth it. Do not reject it with recurring-cost reasoning;
|
||||||
|
quantify both sides. (Example smell: "don't add an LLM call to a cost-cutting
|
||||||
|
feature" — wrong when the call is one-time and the savings recur.)
|
||||||
|
- **MUST**: Treat a user's example as a representative of a class, not as the
|
||||||
|
spec. Design for the general population the example stands for, then stress-test
|
||||||
|
against deliberately different instances — another warehouse, dialect, stack
|
||||||
|
layout, or input shape — before committing. If a design only works because of an
|
||||||
|
incidental property of the example (e.g. "the noise happened to be in a separate
|
||||||
|
schema *on this demo*"), it is curve-fitting; generalize it or state the
|
||||||
|
assumption explicitly.
|
||||||
|
- **MUST**: Prefer deriving from the system's own state over enumerating cases.
|
||||||
|
Favor an allowlist computed from declared/observed state (config, scanned
|
||||||
|
catalog, query log, the user's own inputs) over a denylist of known-bad
|
||||||
|
specifics (particular tables, schemas, tools, or vendors). A hardcoded or
|
||||||
|
hand-maintained list of external specifics is a smell: it rots and fails on the
|
||||||
|
next stack. The only acceptable static patterns are genuinely universal
|
||||||
|
invariants (e.g. DB-engine system catalogs) and ktx's own self-emitted
|
||||||
|
signatures.
|
||||||
|
- **SHOULD**: Before inventing an abstraction or hand-rolling structural logic,
|
||||||
|
search for what already exists and reuse it — the codebase's canonical
|
||||||
|
representation (a structured ref/key type) instead of a parallel string scheme,
|
||||||
|
and a mandated/available tool (e.g. `sqlglot` for SQL structure; see
|
||||||
|
[SQL and Structured Parsing](#sql-and-structured-parsing)) instead of
|
||||||
|
hand-parsing. Normalize ambiguous input to the canonical form at the boundary;
|
||||||
|
do not carry the ambiguity downstream. This is the single-source-of-truth / DRY
|
||||||
|
item from the Priority Hierarchy applied at design time.
|
||||||
|
|
||||||
|
Before presenting a design, answer these explicitly:
|
||||||
|
|
||||||
|
1. Am I optimizing for a goal the user actually stated, or one I assumed?
|
||||||
|
2. Does this generalize beyond the example in front of me? Name a real case where
|
||||||
|
it would break.
|
||||||
|
3. Am I enumerating known-bad cases when I could derive scope from the system's
|
||||||
|
own declared/observed state?
|
||||||
|
4. Is there an existing canonical representation or mandated tool I should reuse
|
||||||
|
instead of building or parsing my own?
|
||||||
|
5. Am I discarding the better option on a weak or misapplied constraint
|
||||||
|
(one-time vs recurring cost, "more surface area", "more work now")?
|
||||||
|
|
||||||
|
A user question that nudges toward an alternative ("would X help?", "should I
|
||||||
|
always do Y?", "will you hardcode Z?") is a signal that a better option exists.
|
||||||
|
Investigate the implied direction and reason it through *before* defending the
|
||||||
|
original proposal — and prefer to have asked yourself the question first.
|
||||||
|
|
||||||
## TypeScript Standards
|
## TypeScript Standards
|
||||||
|
|
||||||
- Use Node 22+ and pnpm workspace commands.
|
- Use Node 22+ and pnpm workspace commands.
|
||||||
|
|
|
||||||
|
|
@ -26,6 +26,21 @@ export function isQueryHistoryEnabled(connection: unknown): boolean {
|
||||||
return queryHistoryRecord(connection)?.enabled === true;
|
return queryHistoryRecord(connection)?.enabled === true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Resolves the query-history dialect from the connection's driver capability
|
||||||
|
* alone, ignoring whether query history is enabled in ktx.yaml. Use this on the
|
||||||
|
* adapter-registration path when query history has been explicitly requested
|
||||||
|
* for the run (e.g. via `--query-history`, which is itself the opt-in): the
|
||||||
|
* persisted `context.queryHistory.enabled` flag must not gate registration.
|
||||||
|
* Returns null when the connection's driver has no query-history reader.
|
||||||
|
*/
|
||||||
|
export function historicSqlDialectForConnectionDriver(connection: unknown): HistoricSqlDialect | null {
|
||||||
|
const conn = recordOrNull(connection);
|
||||||
|
const driver = String(conn?.driver ?? '').toLowerCase();
|
||||||
|
const registration = getDriverRegistration(driver);
|
||||||
|
return registration?.hasHistoricSqlReader ? historicSqlDialectForDriver(registration.driver) : null;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Resolves the query-history dialect for a connection. Returns null when
|
* Resolves the query-history dialect for a connection. Returns null when
|
||||||
* query history is disabled, or when the connection's driver has no
|
* query history is disabled, or when the connection's driver has no
|
||||||
|
|
@ -35,8 +50,5 @@ export function queryHistoryDialectForConnection(connection: unknown): HistoricS
|
||||||
if (!isQueryHistoryEnabled(connection)) {
|
if (!isQueryHistoryEnabled(connection)) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
const conn = recordOrNull(connection);
|
return historicSqlDialectForConnectionDriver(connection);
|
||||||
const driver = String(conn?.driver ?? '').toLowerCase();
|
|
||||||
const registration = getDriverRegistration(driver);
|
|
||||||
return registration?.hasHistoricSqlReader ? historicSqlDialectForDriver(registration.driver) : null;
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -79,8 +79,21 @@ function matchesAny(value: string | null, patterns: RegExp[]): boolean {
|
||||||
return !!value && patterns.some((pattern) => pattern.test(value));
|
return !!value && patterns.some((pattern) => pattern.test(value));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ktx's own warehouse scan emits relationship- and column-profiling probes that land in
|
||||||
|
// pg_stat_statements (relationship-validation, relationship-composite-candidates, and each
|
||||||
|
// dialect's relationship value aggregation). They are ktx introspection, not genuine query
|
||||||
|
// usage, so they must not be mined back as query history. The markers are ktx-owned
|
||||||
|
// identifiers, stable across dialects.
|
||||||
|
function isKtxScanProbe(sql: string): boolean {
|
||||||
|
if (/\brelationship_profile_values\b/i.test(sql)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return /\bchild_values\b/i.test(sql) && /\bparent_values\b/i.test(sql);
|
||||||
|
}
|
||||||
|
|
||||||
function shouldDropBySql(sql: string, config: HistoricSqlUnifiedPullConfig): boolean {
|
function shouldDropBySql(sql: string, config: HistoricSqlUnifiedPullConfig): boolean {
|
||||||
if (NOISE_PREFIX_RE.test(sql) || SYSTEM_TABLE_RE.test(sql)) return true;
|
if (NOISE_PREFIX_RE.test(sql) || SYSTEM_TABLE_RE.test(sql)) return true;
|
||||||
|
if (isKtxScanProbe(sql)) return true;
|
||||||
if (config.filters.dropTrivialProbes !== false && TRIVIAL_SQL_RE.test(sql)) return true;
|
if (config.filters.dropTrivialProbes !== false && TRIVIAL_SQL_RE.test(sql)) return true;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
@ -148,6 +161,53 @@ function isEnabledTable(table: string, filter: EnabledTableFilter | null): boole
|
||||||
return filter.exact.has(normalized) || filter.uniqueUnqualified.has(unqualifiedTableIdentifier(normalized));
|
return filter.exact.has(normalized) || filter.uniqueUnqualified.has(unqualifiedTableIdentifier(normalized));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* pg_stat_statements records queries as written, so the same physical table can appear
|
||||||
|
* both bare (`accounts`, resolved via search_path) and schema-qualified
|
||||||
|
* (`orbit_raw.accounts`). Collapse a bare identifier into its schema-qualified form when
|
||||||
|
* exactly one qualified form shares its unqualified name, so the two never become separate
|
||||||
|
* work units. Ambiguous bare names (two qualified forms) are left untouched.
|
||||||
|
*/
|
||||||
|
function canonicalizeTableIdentifiers(parsedTemplates: ParsedTemplate[]): void {
|
||||||
|
const all = new Set<string>();
|
||||||
|
for (const parsed of parsedTemplates) {
|
||||||
|
for (const table of parsed.includedTables) {
|
||||||
|
all.add(table);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
const qualifiedByUnqualified = new Map<string, Set<string>>();
|
||||||
|
for (const table of all) {
|
||||||
|
if (!table.includes('.')) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
const unqualified = unqualifiedTableIdentifier(table);
|
||||||
|
if (unqualified.length === 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
const forms = qualifiedByUnqualified.get(unqualified) ?? new Set<string>();
|
||||||
|
forms.add(table);
|
||||||
|
qualifiedByUnqualified.set(unqualified, forms);
|
||||||
|
}
|
||||||
|
const canonical = new Map<string, string>();
|
||||||
|
for (const table of all) {
|
||||||
|
if (table.includes('.')) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
const forms = qualifiedByUnqualified.get(unqualifiedTableIdentifier(table));
|
||||||
|
if (forms && forms.size === 1) {
|
||||||
|
canonical.set(table, [...forms][0]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (canonical.size === 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const remap = (table: string): string => canonical.get(table) ?? table;
|
||||||
|
for (const parsed of parsedTemplates) {
|
||||||
|
parsed.includedTables = [...new Set(parsed.includedTables.map(remap))].sort();
|
||||||
|
parsed.tablesTouched = [...new Set(parsed.tablesTouched.map(remap))].sort();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
function historicSqlWindowDays(config: HistoricSqlUnifiedPullConfig): number {
|
function historicSqlWindowDays(config: HistoricSqlUnifiedPullConfig): number {
|
||||||
return 'windowDays' in config ? config.windowDays : 90;
|
return 'windowDays' in config ? config.windowDays : 90;
|
||||||
}
|
}
|
||||||
|
|
@ -323,6 +383,8 @@ export async function stageHistoricSqlAggregatedSnapshot(input: StageHistoricSql
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
canonicalizeTableIdentifiers(parsedTemplates);
|
||||||
|
|
||||||
const byTable = new Map<string, TableAccumulator>();
|
const byTable = new Map<string, TableAccumulator>();
|
||||||
for (const parsed of parsedTemplates) {
|
for (const parsed of parsedTemplates) {
|
||||||
for (const table of parsed.includedTables) {
|
for (const table of parsed.includedTables) {
|
||||||
|
|
|
||||||
|
|
@ -155,18 +155,103 @@ export async function integrateWorkUnitPatch(input: IntegrateWorkUnitPatchInput)
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
} catch (semanticError) {
|
} catch (semanticError) {
|
||||||
if (preApplyHead) {
|
const reason = errorMessage(semanticError);
|
||||||
await input.integrationGit.resetHardTo(preApplyHead);
|
|
||||||
}
|
|
||||||
await input.trace.event('error', 'integration', 'patch_semantic_conflict_after_textual_resolution', {
|
await input.trace.event('error', 'integration', 'patch_semantic_conflict_after_textual_resolution', {
|
||||||
unitKey: input.unitKey,
|
unitKey: input.unitKey,
|
||||||
patchPath: input.patchPath,
|
patchPath: input.patchPath,
|
||||||
touchedPaths: textualResolution.changedPaths,
|
touchedPaths: textualResolution.changedPaths,
|
||||||
reason: errorMessage(semanticError),
|
reason,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// A textual conflict and a semantic-gate failure can co-occur: the resolver
|
||||||
|
// reconciles the text but can leave wiki sl_refs pointing at measures the
|
||||||
|
// merged source no longer defines. Recover via the same gate repair the
|
||||||
|
// clean-apply branch uses, instead of hard-failing the whole job.
|
||||||
|
if (input.repairGateFailure) {
|
||||||
|
const gateRepair = await input.repairGateFailure({
|
||||||
|
unitKey: input.unitKey,
|
||||||
|
patchPath: input.patchPath,
|
||||||
|
touchedPaths: textualResolution.changedPaths,
|
||||||
|
reason,
|
||||||
|
});
|
||||||
|
|
||||||
|
if (gateRepair.status !== 'failed') {
|
||||||
|
// The resolver wrote its merge to the worktree (unstaged); the repair
|
||||||
|
// edited a subset on top. Commit the union so neither is dropped.
|
||||||
|
const resolvedAndRepairedPaths = [
|
||||||
|
...new Set([...textualResolution.changedPaths, ...gateRepair.changedPaths]),
|
||||||
|
].sort();
|
||||||
|
try {
|
||||||
|
await traceTimed(
|
||||||
|
input.trace,
|
||||||
|
'integration',
|
||||||
|
'semantic_gate_after_gate_repair',
|
||||||
|
{ unitKey: input.unitKey, touchedPaths: gateRepair.changedPaths },
|
||||||
|
async () => {
|
||||||
|
await input.validateAppliedTree(gateRepair.changedPaths);
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
const commit = await input.integrationGit.commitFiles(
|
||||||
|
resolvedAndRepairedPaths,
|
||||||
|
`ingest: resolve WorkUnit ${input.unitKey} conflict`,
|
||||||
|
input.author.name,
|
||||||
|
input.author.email,
|
||||||
|
);
|
||||||
|
if (commit.created) {
|
||||||
|
await input.trace.event('debug', 'integration', 'patch_accepted_after_textual_resolution', {
|
||||||
|
unitKey: input.unitKey,
|
||||||
|
commitSha: commit.commitHash,
|
||||||
|
touchedPaths: resolvedAndRepairedPaths,
|
||||||
|
attempts: textualResolution.attempts,
|
||||||
|
gateRepairAttempts: gateRepair.attempts,
|
||||||
|
});
|
||||||
|
return {
|
||||||
|
status: 'accepted',
|
||||||
|
commitSha: commit.commitHash,
|
||||||
|
touchedPaths: resolvedAndRepairedPaths,
|
||||||
|
textualResolution,
|
||||||
|
gateRepair,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
} catch (repairValidationError) {
|
||||||
|
if (preApplyHead) {
|
||||||
|
await input.integrationGit.resetHardTo(preApplyHead);
|
||||||
|
}
|
||||||
|
await input.trace.event('error', 'integration', 'patch_semantic_conflict_after_textual_resolution', {
|
||||||
|
unitKey: input.unitKey,
|
||||||
|
patchPath: input.patchPath,
|
||||||
|
touchedPaths: gateRepair.changedPaths,
|
||||||
|
reason: errorMessage(repairValidationError),
|
||||||
|
});
|
||||||
|
return {
|
||||||
|
status: 'semantic_conflict',
|
||||||
|
reason: errorMessage(repairValidationError),
|
||||||
|
touchedPaths: gateRepair.changedPaths,
|
||||||
|
textualResolution,
|
||||||
|
gateRepair,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (preApplyHead) {
|
||||||
|
await input.integrationGit.resetHardTo(preApplyHead);
|
||||||
|
}
|
||||||
|
return {
|
||||||
|
status: 'semantic_conflict',
|
||||||
|
reason: gateRepair.status === 'failed' ? gateRepair.reason : reason,
|
||||||
|
touchedPaths: textualResolution.changedPaths,
|
||||||
|
textualResolution,
|
||||||
|
gateRepair,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
if (preApplyHead) {
|
||||||
|
await input.integrationGit.resetHardTo(preApplyHead);
|
||||||
|
}
|
||||||
return {
|
return {
|
||||||
status: 'semantic_conflict',
|
status: 'semantic_conflict',
|
||||||
reason: errorMessage(semanticError),
|
reason,
|
||||||
touchedPaths: textualResolution.changedPaths,
|
touchedPaths: textualResolution.changedPaths,
|
||||||
textualResolution,
|
textualResolution,
|
||||||
};
|
};
|
||||||
|
|
|
||||||
|
|
@ -12,7 +12,7 @@ import { isKtxSqliteConnectionConfig } from './connectors/sqlite/connector.js';
|
||||||
import { createSqlServerLiveDatabaseIntrospection } from './connectors/sqlserver/live-database-introspection.js';
|
import { createSqlServerLiveDatabaseIntrospection } from './connectors/sqlserver/live-database-introspection.js';
|
||||||
import { isKtxSqlServerConnectionConfig } from './connectors/sqlserver/connector.js';
|
import { isKtxSqlServerConnectionConfig } from './connectors/sqlserver/connector.js';
|
||||||
import { BigQueryHistoricSqlQueryHistoryReader } from './context/ingest/adapters/historic-sql/bigquery-query-history-reader.js';
|
import { BigQueryHistoricSqlQueryHistoryReader } from './context/ingest/adapters/historic-sql/bigquery-query-history-reader.js';
|
||||||
import { queryHistoryDialectForConnection } from './context/ingest/adapters/historic-sql/connection-dialect.js';
|
import { historicSqlDialectForConnectionDriver } from './context/ingest/adapters/historic-sql/connection-dialect.js';
|
||||||
import { createDaemonLiveDatabaseIntrospection } from './context/ingest/adapters/live-database/daemon-introspection.js';
|
import { createDaemonLiveDatabaseIntrospection } from './context/ingest/adapters/live-database/daemon-introspection.js';
|
||||||
import { createDefaultLocalIngestAdapters, type DefaultLocalIngestAdaptersOptions } from './context/ingest/local-adapters.js';
|
import { createDefaultLocalIngestAdapters, type DefaultLocalIngestAdaptersOptions } from './context/ingest/local-adapters.js';
|
||||||
import type { HistoricSqlReader } from './context/ingest/adapters/historic-sql/types.js';
|
import type { HistoricSqlReader } from './context/ingest/adapters/historic-sql/types.js';
|
||||||
|
|
@ -268,7 +268,12 @@ function historicSqlOptionsForLocalRun(project: KtxLocalProject, options: KtxCli
|
||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
const connection = project.config.connections[connectionId];
|
const connection = project.config.connections[connectionId];
|
||||||
const dialect = queryHistoryDialectForConnection(connection);
|
// historicSqlConnectionId is only set when query history was explicitly
|
||||||
|
// requested for this run (e.g. `--query-history`), so resolve the dialect from
|
||||||
|
// driver capability rather than the persisted context.queryHistory.enabled
|
||||||
|
// flag — otherwise the adapter is missing and findAdapter('historic-sql')
|
||||||
|
// throws even though the run asked for it.
|
||||||
|
const dialect = historicSqlDialectForConnectionDriver(connection);
|
||||||
if (!dialect) {
|
if (!dialect) {
|
||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,8 @@
|
||||||
import { describe, expect, it } from 'vitest';
|
import { describe, expect, it } from 'vitest';
|
||||||
import { queryHistoryDialectForConnection } from '../../../../../src/context/ingest/adapters/historic-sql/connection-dialect.js';
|
import {
|
||||||
|
historicSqlDialectForConnectionDriver,
|
||||||
|
queryHistoryDialectForConnection,
|
||||||
|
} from '../../../../../src/context/ingest/adapters/historic-sql/connection-dialect.js';
|
||||||
|
|
||||||
describe('queryHistoryDialectForConnection', () => {
|
describe('queryHistoryDialectForConnection', () => {
|
||||||
it.each([
|
it.each([
|
||||||
|
|
@ -21,3 +24,19 @@ describe('queryHistoryDialectForConnection', () => {
|
||||||
expect(queryHistoryDialectForConnection({ driver: 'postgres', context: { queryHistory: { enabled: false } } })).toBeNull();
|
expect(queryHistoryDialectForConnection({ driver: 'postgres', context: { queryHistory: { enabled: false } } })).toBeNull();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('historicSqlDialectForConnectionDriver', () => {
|
||||||
|
it('resolves the dialect from driver capability even when query history is disabled', () => {
|
||||||
|
expect(
|
||||||
|
historicSqlDialectForConnectionDriver({ driver: 'postgres', context: { queryHistory: { enabled: false } } }),
|
||||||
|
).toBe('postgres');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('resolves the dialect when no query-history context is present', () => {
|
||||||
|
expect(historicSqlDialectForConnectionDriver({ driver: 'bigquery' })).toBe('bigquery');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns null for drivers without a historic-SQL reader', () => {
|
||||||
|
expect(historicSqlDialectForConnectionDriver({ driver: 'mysql', context: { queryHistory: { enabled: true } } })).toBeNull();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
|
||||||
|
|
@ -433,4 +433,88 @@ describe('stageHistoricSqlAggregatedSnapshot', () => {
|
||||||
const manifest = await readJson<Record<string, any>>(stagedDir, 'manifest.json');
|
const manifest = await readJson<Record<string, any>>(stagedDir, 'manifest.json');
|
||||||
expect(manifest.warnings).toEqual([]);
|
expect(manifest.warnings).toEqual([]);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("drops ktx's own scan/relationship probes from query history", async () => {
|
||||||
|
const stagedDir = await tempDir();
|
||||||
|
const fkOverlapProbe =
|
||||||
|
'select * from (WITH child_values AS ( SELECT DISTINCT "account_id" AS value FROM "account_owners" WHERE "account_id" IS NOT NULL LIMIT $1 ), parent_values AS ( SELECT DISTINCT "account_id" AS value FROM "accounts" WHERE "account_id" IS NOT NULL ) SELECT (SELECT COUNT(*) FROM child_values) AS child_distinct, (SELECT COUNT(*) FROM parent_values) AS parent_distinct) probe';
|
||||||
|
const profileProbe =
|
||||||
|
'select * from (SELECT $1 AS column_name, (SELECT COUNT(*) FROM "orbit_raw"."accounts") AS total, (SELECT STRING_AGG(CAST(value AS TEXT), CHR(31)) FROM (SELECT DISTINCT "id" AS value FROM "orbit_raw"."accounts" LIMIT $2) AS relationship_profile_values) AS samples) profile';
|
||||||
|
const reader: HistoricSqlReader = {
|
||||||
|
async probe() {
|
||||||
|
return { warnings: [], info: [] };
|
||||||
|
},
|
||||||
|
async *fetchAggregated() {
|
||||||
|
yield aggregate({
|
||||||
|
templateId: 'analytic',
|
||||||
|
canonicalSql: 'select status, count(*) from public.orders group by status',
|
||||||
|
});
|
||||||
|
yield aggregate({ templateId: 'ktx-fk-overlap', canonicalSql: fkOverlapProbe });
|
||||||
|
yield aggregate({ templateId: 'ktx-profile', canonicalSql: profileProbe });
|
||||||
|
},
|
||||||
|
};
|
||||||
|
const sqlAnalysis: SqlAnalysisPort = {
|
||||||
|
analyzeForFingerprint: vi.fn(),
|
||||||
|
analyzeBatch: vi.fn(async () => new Map([
|
||||||
|
['analytic', { tablesTouched: ['public.orders'], columnsByClause: { select: ['status'], where: [], join: [], groupBy: ['status'] } }],
|
||||||
|
])),
|
||||||
|
validateReadOnly: vi.fn(async () => ({ ok: true })),
|
||||||
|
};
|
||||||
|
|
||||||
|
await stageHistoricSqlAggregatedSnapshot({
|
||||||
|
stagedDir,
|
||||||
|
connectionId: 'warehouse',
|
||||||
|
queryClient: {},
|
||||||
|
reader,
|
||||||
|
sqlAnalysis,
|
||||||
|
pullConfig: { dialect: 'postgres' },
|
||||||
|
now: new Date('2026-05-11T12:00:00.000Z'),
|
||||||
|
});
|
||||||
|
|
||||||
|
// ktx scan probes are filtered before SQL analysis, so only the analytic query is parsed.
|
||||||
|
expect(sqlAnalysis.analyzeBatch).toHaveBeenCalledWith(
|
||||||
|
[{ id: 'analytic', sql: 'select status, count(*) from public.orders group by status' }],
|
||||||
|
'postgres',
|
||||||
|
);
|
||||||
|
expect(await readdir(join(stagedDir, 'tables'))).toEqual(['public.orders.json']);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('merges bare and schema-qualified references to the same table into one work unit', async () => {
|
||||||
|
const stagedDir = await tempDir();
|
||||||
|
const reader: HistoricSqlReader = {
|
||||||
|
async probe() {
|
||||||
|
return { warnings: [], info: [] };
|
||||||
|
},
|
||||||
|
async *fetchAggregated() {
|
||||||
|
yield aggregate({ templateId: 'qualified', canonicalSql: 'select count(*) from orbit_raw.accounts' });
|
||||||
|
yield aggregate({ templateId: 'bare', canonicalSql: 'select id from accounts where active' });
|
||||||
|
},
|
||||||
|
};
|
||||||
|
const sqlAnalysis: SqlAnalysisPort = {
|
||||||
|
analyzeForFingerprint: vi.fn(),
|
||||||
|
analyzeBatch: vi.fn(async () => new Map([
|
||||||
|
['qualified', { tablesTouched: ['orbit_raw.accounts'], columnsByClause: { select: [], where: [], join: [], groupBy: [] } }],
|
||||||
|
['bare', { tablesTouched: ['accounts'], columnsByClause: { select: ['id'], where: ['active'], join: [], groupBy: [] } }],
|
||||||
|
])),
|
||||||
|
validateReadOnly: vi.fn(async () => ({ ok: true })),
|
||||||
|
};
|
||||||
|
|
||||||
|
await stageHistoricSqlAggregatedSnapshot({
|
||||||
|
stagedDir,
|
||||||
|
connectionId: 'warehouse',
|
||||||
|
queryClient: {},
|
||||||
|
reader,
|
||||||
|
sqlAnalysis,
|
||||||
|
pullConfig: { dialect: 'postgres' },
|
||||||
|
now: new Date('2026-05-11T12:00:00.000Z'),
|
||||||
|
});
|
||||||
|
|
||||||
|
// The bare `accounts` reference resolves to the unique qualified `orbit_raw.accounts`,
|
||||||
|
// so the two templates collapse into a single work unit instead of two.
|
||||||
|
expect(await readdir(join(stagedDir, 'tables'))).toEqual(['orbit_raw.accounts.json']);
|
||||||
|
const merged = await readJson<Record<string, any>>(stagedDir, 'tables/orbit_raw.accounts.json');
|
||||||
|
expect(merged.topTemplates.map((t: any) => t.id).sort()).toEqual(['bare', 'qualified']);
|
||||||
|
const manifest = await readJson<Record<string, any>>(stagedDir, 'manifest.json');
|
||||||
|
expect(manifest.touchedTableCount).toBe(1);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
|
||||||
|
|
@ -401,4 +401,72 @@ describe('integrateWorkUnitPatch', () => {
|
||||||
});
|
});
|
||||||
await expect(readFile(join(configDir, 'wiki/global/a.md'), 'utf-8')).resolves.toBe('old\n');
|
await expect(readFile(join(configDir, 'wiki/global/a.md'), 'utf-8')).resolves.toBe('old\n');
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('repairs a semantic gate failure after a textual conflict is resolved', async () => {
|
||||||
|
const { homeDir, configDir, git } = await makeRepo();
|
||||||
|
await mkdir(join(configDir, 'wiki/global'), { recursive: true });
|
||||||
|
await writeFile(join(configDir, 'wiki/global/a.md'), 'base\n', 'utf-8');
|
||||||
|
await git.commitFiles(['wiki/global/a.md'], 'base page', 'System User', 'system@example.com');
|
||||||
|
const conflictBase = await git.revParseHead();
|
||||||
|
|
||||||
|
await writeFile(join(configDir, 'wiki/global/a.md'), 'accepted\n', 'utf-8');
|
||||||
|
await git.commitFiles(['wiki/global/a.md'], 'accepted edit', 'System User', 'system@example.com');
|
||||||
|
|
||||||
|
const childDir = join(homeDir, 'child-conflict-repair');
|
||||||
|
await git.addWorktree(childDir, 'child-conflict-repair', conflictBase);
|
||||||
|
const childGit = git.forWorktree(childDir);
|
||||||
|
await writeFile(join(childDir, 'wiki/global/a.md'), 'proposal\n', 'utf-8');
|
||||||
|
await childGit.commitFiles(['wiki/global/a.md'], 'proposal edit', 'System User', 'system@example.com');
|
||||||
|
const patchPath = join(homeDir, 'proposal-repair.patch');
|
||||||
|
await childGit.writeBinaryNoRenamePatch(conflictBase, 'HEAD', patchPath);
|
||||||
|
|
||||||
|
const trace = new FileIngestTraceWriter({
|
||||||
|
tracePath: join(homeDir, '.ktx/ingest-traces/job-resolver-repair/trace.jsonl'),
|
||||||
|
jobId: 'job-resolver-repair',
|
||||||
|
connectionId: 'warehouse',
|
||||||
|
sourceKey: 'metabase',
|
||||||
|
level: 'trace',
|
||||||
|
});
|
||||||
|
|
||||||
|
// Gate fails on the resolver's merged tree, then passes after the repair edit.
|
||||||
|
const validateAppliedTree = vi
|
||||||
|
.fn()
|
||||||
|
.mockRejectedValueOnce(
|
||||||
|
new Error('final artifact gates failed:\narr-definition: unknown sl_refs entity mart_arr_daily.arr_dollars'),
|
||||||
|
)
|
||||||
|
.mockResolvedValueOnce(undefined);
|
||||||
|
|
||||||
|
const repairGateFailure = vi.fn(async (context: { unitKey: string; touchedPaths: string[] }) => {
|
||||||
|
expect(context).toMatchObject({ unitKey: 'wu-conflict-repair', touchedPaths: ['wiki/global/a.md'] });
|
||||||
|
await writeFile(join(configDir, 'wiki/global/a.md'), 'accepted\nproposal repaired\n', 'utf-8');
|
||||||
|
return { status: 'repaired' as const, attempts: 1, changedPaths: ['wiki/global/a.md'] };
|
||||||
|
});
|
||||||
|
|
||||||
|
const result = await integrateWorkUnitPatch({
|
||||||
|
unitKey: 'wu-conflict-repair',
|
||||||
|
patchPath,
|
||||||
|
integrationGit: git,
|
||||||
|
trace,
|
||||||
|
author: { name: 'System User', email: 'system@example.com' },
|
||||||
|
slDisallowed: false,
|
||||||
|
allowedTargetConnectionIds: new Set(['warehouse']),
|
||||||
|
validateAppliedTree,
|
||||||
|
resolveTextualConflict: vi.fn(async () => {
|
||||||
|
await writeFile(join(configDir, 'wiki/global/a.md'), 'accepted\nproposal\n', 'utf-8');
|
||||||
|
return { status: 'repaired' as const, attempts: 1, changedPaths: ['wiki/global/a.md'] };
|
||||||
|
}),
|
||||||
|
repairGateFailure,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(result).toMatchObject({
|
||||||
|
status: 'accepted',
|
||||||
|
touchedPaths: ['wiki/global/a.md'],
|
||||||
|
textualResolution: { status: 'repaired' },
|
||||||
|
gateRepair: { status: 'repaired', attempts: 1, changedPaths: ['wiki/global/a.md'] },
|
||||||
|
});
|
||||||
|
expect(validateAppliedTree).toHaveBeenCalledTimes(2);
|
||||||
|
expect(repairGateFailure).toHaveBeenCalledOnce();
|
||||||
|
await expect(readFile(join(configDir, 'wiki/global/a.md'), 'utf-8')).resolves.toBe('accepted\nproposal repaired\n');
|
||||||
|
await expect(readFile(trace.tracePath, 'utf-8')).resolves.toContain('patch_accepted_after_textual_resolution');
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
|
||||||
|
|
@ -70,6 +70,37 @@ describe('CLI local ingest adapters', () => {
|
||||||
]);
|
]);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('registers historic SQL when explicitly requested even if connection query history is disabled', async () => {
|
||||||
|
await writeProject(
|
||||||
|
tempDir,
|
||||||
|
[
|
||||||
|
'connections:',
|
||||||
|
' warehouse:',
|
||||||
|
' driver: postgres',
|
||||||
|
' url: env:WAREHOUSE_DATABASE_URL',
|
||||||
|
' readonly: true',
|
||||||
|
' context:',
|
||||||
|
' queryHistory:',
|
||||||
|
' enabled: false',
|
||||||
|
'ingest:',
|
||||||
|
' adapters:',
|
||||||
|
' - historic-sql',
|
||||||
|
'',
|
||||||
|
].join('\n'),
|
||||||
|
);
|
||||||
|
const project = await loadKtxProject({ projectDir: tempDir });
|
||||||
|
|
||||||
|
// `--query-history` sets historicSqlConnectionId for the run; that explicit
|
||||||
|
// request is the opt-in, so the persisted context.queryHistory.enabled flag
|
||||||
|
// must not gate adapter registration.
|
||||||
|
const adapters = createKtxCliLocalIngestAdapters(project, {
|
||||||
|
historicSqlConnectionId: 'warehouse',
|
||||||
|
sqlAnalysis: sqlAnalysisStub(),
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(adapters.some((adapter) => adapter.source === 'historic-sql')).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
it('registers BigQuery historic SQL from the requested connection', async () => {
|
it('registers BigQuery historic SQL from the requested connection', async () => {
|
||||||
await writeProject(
|
await writeProject(
|
||||||
tempDir,
|
tempDir,
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue