feat(query-history): scope mining to modeled schemas by default (#258)

* feat(query-history): structure SQL analysis table refs

* feat(query-history): qualify SQL analysis table refs

* feat(query-history): wire modeled scope floor through ingest

* chore(query-history): verify scope floor

* test(query-history): align daemon SQL batch endpoint contract

* feat(query-history): build scope from same-run scan catalog

* feat(query-history): fail open on scope-floor catalog failures

* chore(query-history): verify scope-floor v1 closure

* refactor(query-history): share scope membership

* feat(setup): apply derived query history filters

* docs: document derived query history filters

* fix(query-history): redact filter picker LLM prompt SQL

* fix(setup): run filter picker SQL analysis through managed daemon

* chore(query-history): verify filter picker v1 closure

* fix(query-history): fail open on partial service-account attribution

* fix(query-history): aggregate BigQuery users by execution count

* fix(query-history): aggregate Snowflake users by execution count

* fix(query-history): use BigQuery query info hash
This commit is contained in:
Andrey Avtomonov 2026-06-03 17:19:42 +02:00 committed by GitHub
parent ce1516b357
commit e70ae1e63b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
42 changed files with 3090 additions and 274 deletions

View file

@ -91,7 +91,10 @@ describe('BigQueryHistoricSqlQueryHistoryReader', () => {
40,
0.05,
null,
JSON.stringify([{ user: 'analyst@example.test', executions: 1 }]),
JSON.stringify([
{ user: 'svc-loader@example.test', executions: 40 },
{ user: 'analyst@example.test', executions: 2 },
]),
],
],
totalRows: 1,
@ -103,15 +106,25 @@ describe('BigQueryHistoricSqlQueryHistoryReader', () => {
for await (const row of reader.fetchAggregated(
client,
{ start: new Date('2026-02-10T00:00:00.000Z'), end: new Date('2026-05-11T00:00:00.000Z') },
{ dialect: 'bigquery', minExecutions: 5, windowDays: 90, enabledTables: [], filters: { dropTrivialProbes: true }, redactionPatterns: [], staleArchiveAfterDays: 90 },
{ dialect: 'bigquery', minExecutions: 5, windowDays: 90, enabledTables: [], enabledSchemas: [], modeledTableCatalog: [], scopeFloorWarnings: [], filters: { dropTrivialProbes: true }, redactionPatterns: [], staleArchiveAfterDays: 90 },
)) {
rows.push(row);
}
const sql = firstQuery(client);
expect(sql).toContain('WITH filtered_jobs AS');
expect(sql).toContain('query_info.query_hashes.normalized_literals');
expect(sql).toContain('TO_HEX(SHA256(query))');
expect(sql).toContain('AS template_id');
expect(sql).toContain('template_stats AS');
expect(sql).toContain('template_users AS');
expect(sql).toContain('COUNT(*) AS executions');
expect(sql).toContain('COUNT(DISTINCT user_email) AS distinct_users');
expect(sql).toContain('GROUP BY query_hash');
expect(sql).toContain('GROUP BY template_id');
expect(sql).toContain('GROUP BY template_id, user_email');
expect(sql).toContain('ORDER BY users.executions DESC');
expect(sql).not.toMatch(/\bquery_hash\b/);
expect(sql).not.toContain('LIMIT 5');
expect(sql).toContain('HAVING COUNT(*) >= 5');
expect(rows).toMatchObject([
{
@ -120,7 +133,10 @@ describe('BigQueryHistoricSqlQueryHistoryReader', () => {
executions: 42,
errorRate: 0.05,
},
topUsers: [{ user: 'analyst@example.test', executions: 1 }],
topUsers: [
{ user: 'svc-loader@example.test', executions: 40 },
{ user: 'analyst@example.test', executions: 2 },
],
},
]);
});
@ -137,6 +153,9 @@ describe('BigQueryHistoricSqlQueryHistoryReader', () => {
minExecutions: 5,
windowDays: 90,
enabledTables: [],
enabledSchemas: [],
modeledTableCatalog: [],
scopeFloorWarnings: [],
filters: { dropTrivialProbes: true },
redactionPatterns: [],
staleArchiveAfterDays: 90,

View file

@ -30,6 +30,7 @@ async function writeUnifiedStagedDir(root: string): Promise<void> {
});
await writeJson(root, 'tables/public.orders.json', {
table: 'public.orders',
tableRef: { catalog: null, db: 'public', name: 'orders' },
stats: {
executionsBucket: '10-100',
distinctUsersBucket: '2-5',
@ -46,7 +47,10 @@ async function writeUnifiedStagedDir(root: string): Promise<void> {
{
id: 'orders',
canonicalSql: 'select * from public.orders join public.customers on true',
tablesTouched: ['public.orders', 'public.customers'],
tablesTouched: [
{ catalog: null, db: 'public', name: 'orders' },
{ catalog: null, db: 'public', name: 'customers' },
],
executionsBucket: '10-100',
distinctUsersBucket: '2-5',
dialect: 'postgres',
@ -58,7 +62,10 @@ async function writeUnifiedStagedDir(root: string): Promise<void> {
{
id: 'orders',
canonicalSql: 'select * from public.orders join public.customers on true',
tablesTouched: ['public.orders', 'public.customers'],
tablesTouched: [
{ catalog: null, db: 'public', name: 'orders' },
{ catalog: null, db: 'public', name: 'customers' },
],
executionsBucket: '10-100',
distinctUsersBucket: '2-5',
dialect: 'postgres',
@ -155,7 +162,10 @@ describe('chunkHistoricSqlUnifiedStagedDir', () => {
{
id: 'line-items',
canonicalSql: 'select * from public.orders join public.line_items on true',
tablesTouched: ['public.orders', 'public.line_items'],
tablesTouched: [
{ catalog: null, db: 'public', name: 'orders' },
{ catalog: null, db: 'public', name: 'line_items' },
],
executionsBucket: '10-100',
distinctUsersBucket: '2-5',
dialect: 'postgres',

View file

@ -76,7 +76,10 @@ describe('HistoricSqlSourceAdapter', () => {
[
'pg:1',
{
tablesTouched: ['public.orders', 'public.customers'],
tablesTouched: [
{ catalog: null, db: 'public', name: 'orders' },
{ catalog: null, db: 'public', name: 'customers' },
],
columnsByClause: { select: ['status'], join: ['customer_id', 'id'], groupBy: ['status'] },
},
],

View file

@ -126,7 +126,10 @@ function acceptanceSqlAnalysis(): SqlAnalysisPort {
items.map((item) => [
item.id,
{
tablesTouched: ['public.orders', 'public.customers'],
tablesTouched: [
{ catalog: null, db: 'public', name: 'orders' },
{ catalog: null, db: 'public', name: 'customers' },
],
columnsByClause: {
select: ['status', 'segment'],
where: ['status'],

View file

@ -9,11 +9,18 @@ import type { StagedPatternsInput } from '../../../../../src/context/ingest/adap
type PatternTemplate = StagedPatternsInput['templates'][number];
/**
 * Parses a dotted table name into a structured table reference.
 *
 * - `catalog.db.name` fills all three fields.
 * - `db.name` leaves `catalog` as `null`.
 * - Any other segment count (one, or more than three) keeps the whole input
 *   as `name` with `catalog` and `db` both `null`.
 */
function tableRef(value: string): { catalog: string | null; db: string | null; name: string } {
  const segments = value.split('.');
  switch (segments.length) {
    case 3: {
      const [catalog, db, name] = segments as [string, string, string];
      return { catalog, db, name };
    }
    case 2: {
      const [db, name] = segments as [string, string];
      return { catalog: null, db, name };
    }
    default:
      return { catalog: null, db: null, name: value };
  }
}
function template(id: string, tablesTouched: string[], canonicalSql = 'select 1'): PatternTemplate {
return {
id,
canonicalSql,
tablesTouched,
tablesTouched: tablesTouched.map(tableRef),
executionsBucket: '10-100',
distinctUsersBucket: '2-5',
dialect: 'postgres',
@ -32,7 +39,7 @@ describe('historic-SQL pattern input sharding', () => {
],
};
const result = splitHistoricSqlPatternInputs(input, { maxBytes: 760 });
const result = splitHistoricSqlPatternInputs(input, { maxBytes: 1200 });
expect(result.auditInput.templates.map((entry) => entry.id)).toEqual([
'orders-customers-1',
@ -51,7 +58,7 @@ describe('historic-SQL pattern input sharding', () => {
'orders-customers-1',
'orders-customers-2',
]);
expect(result.shards.every((shard) => shard.byteLength <= 760)).toBe(true);
expect(result.shards.every((shard) => shard.byteLength <= 1200)).toBe(true);
expect(result.shards.flatMap((shard) => shard.input.templates).some((entry) => entry.id === 'single-table-orders')).toBe(false);
expect(result.warnings).toEqual([]);
});

View file

@ -215,7 +215,7 @@ describe('PostgresPgssReader aggregate path', () => {
for await (const row of reader.fetchAggregated(
{ executeQuery },
{ start: new Date('2026-02-10T00:00:00.000Z'), end: new Date('2026-05-11T00:00:00.000Z') },
{ dialect: 'postgres', minExecutions: 5, enabledTables: [], filters: { dropTrivialProbes: true }, redactionPatterns: [], staleArchiveAfterDays: 90 },
{ dialect: 'postgres', minExecutions: 5, enabledTables: [], enabledSchemas: [], modeledTableCatalog: [], scopeFloorWarnings: [], filters: { dropTrivialProbes: true }, redactionPatterns: [], staleArchiveAfterDays: 90 },
)) {
rows.push(row);
}

View file

@ -0,0 +1,274 @@
import { describe, expect, it, vi } from 'vitest';
import type { KtxLlmRuntimePort } from '../../../../../src/context/llm/runtime-port.js';
import type {
SqlAnalysisBatchItem,
SqlAnalysisBatchResult,
SqlAnalysisPort,
} from '../../../../../src/context/sql-analysis/ports.js';
import {
proposeQueryHistoryServiceAccountFilters,
regexEscapeForExactRolePattern,
} from '../../../../../src/context/ingest/adapters/historic-sql/query-history-filter-picker.js';
import type {
AggregatedTemplate,
HistoricSqlReader,
} from '../../../../../src/context/ingest/adapters/historic-sql/types.js';
/**
 * Builds an `AggregatedTemplate` fixture.
 *
 * `templateId` and `canonicalSql` are required; `dialect`, `stats`, and
 * `topUsers` fall back to the defaults below when the override is null or
 * undefined.
 */
function aggregate(overrides: Partial<AggregatedTemplate> & { templateId: string; canonicalSql: string }): AggregatedTemplate {
  const { templateId, canonicalSql, dialect, stats, topUsers } = overrides;
  // Built per call so tests never share a mutable default object.
  const fallbackStats: AggregatedTemplate['stats'] = {
    executions: 25,
    distinctUsers: 1,
    firstSeen: '2026-05-01T00:00:00.000Z',
    lastSeen: '2026-06-01T00:00:00.000Z',
    p50RuntimeMs: 50,
    p95RuntimeMs: 100,
    errorRate: 0,
    rowsProduced: 10,
  };
  return {
    templateId,
    canonicalSql,
    dialect: dialect ?? 'postgres',
    stats: stats ?? fallbackStats,
    topUsers: topUsers ?? [{ user: 'analyst', executions: 25 }],
  };
}
/**
 * Creates an in-memory `HistoricSqlReader` stub.
 *
 * `probe` resolves with empty `warnings` and `info` lists; `fetchAggregated`
 * ignores its arguments and yields the supplied templates in the order given.
 */
function reader(...templates: AggregatedTemplate[]): HistoricSqlReader {
  return {
    probe: async () => ({ warnings: [], info: [] }),
    fetchAggregated: async function* () {
      yield* templates;
    },
  };
}
/**
 * Creates a `SqlAnalysisPort` stub backed by vitest mocks.
 *
 * `analyzeBatch` answers every requested item id with the table refs
 * registered under that id in `tablesById` (an empty list when the id is
 * unknown) and an empty `columnsByClause`. `validateReadOnly` always resolves
 * `{ ok: true }`; `analyzeForFingerprint` is a bare mock.
 */
function sqlAnalysis(tablesById: Record<string, Array<{ catalog: string | null; db: string | null; name: string }>>): SqlAnalysisPort {
  const analyzeBatch = vi.fn(async (items: SqlAnalysisBatchItem[]): Promise<Map<string, SqlAnalysisBatchResult>> => {
    const resultsById = new Map<string, SqlAnalysisBatchResult>();
    for (const { id } of items) {
      resultsById.set(id, {
        tablesTouched: tablesById[id] ?? [],
        columnsByClause: {},
      });
    }
    return resultsById;
  });
  return {
    analyzeForFingerprint: vi.fn(),
    analyzeBatch,
    validateReadOnly: vi.fn(async () => ({ ok: true })),
  };
}
/**
 * Creates a `KtxLlmRuntimePort` stub whose `generateObject` mock always
 * resolves `{ roles: decisions }`. `generateText` and `runAgentLoop` are bare
 * mocks with no implementation.
 */
function llm(decisions: Array<{ role: string; exclude: boolean; reason: string }>): KtxLlmRuntimePort {
  const respondWithDecisions = async () => ({ roles: decisions });
  return {
    generateText: vi.fn(),
    // Cast because the mock's inferred signature is narrower than the port's.
    generateObject: vi.fn(respondWithDecisions) as KtxLlmRuntimePort['generateObject'],
    runAgentLoop: vi.fn(),
  };
}
// Exercises proposeQueryHistoryServiceAccountFilters and regexEscapeForExactRolePattern
// against a stubbed reader, SQL-analysis port, and LLM runtime.
describe('query-history filter picker', () => {
// Two roles are observed and the stubbed LLM excludes only 'svc.loader+prod'.
// The proposal must carry an anchored, regex-escaped pattern for that role, and
// generateObject must have been invoked exactly once for both roles together.
it('emits anchored escaped patterns for excluded roles from one batched LLM call', async () => {
const runtime = llm([
{ role: 'svc.loader+prod', exclude: true, reason: 'Runs recurring loader traffic only.' },
{ role: 'analyst', exclude: false, reason: 'Interactive analytic usage.' },
]);
const analysis = sqlAnalysis({
loader: [{ catalog: null, db: 'analytics', name: 'orders' }],
analyst: [{ catalog: null, db: 'analytics', name: 'orders' }],
});
const proposal = await proposeQueryHistoryServiceAccountFilters({
connectionId: 'warehouse',
dialect: 'postgres',
queryClient: {},
reader: reader(
aggregate({
templateId: 'loader',
canonicalSql: 'merge into analytics.orders using staging.orders_delta on orders.id = orders_delta.id',
topUsers: [{ user: 'svc.loader+prod', executions: 40 }],
}),
aggregate({
templateId: 'analyst',
canonicalSql: 'select status, count(*) from analytics.orders group by status',
topUsers: [{ user: 'analyst', executions: 25 }],
}),
),
sqlAnalysis: analysis,
llmRuntime: runtime,
pullConfig: {
dialect: 'postgres',
enabledSchemas: ['analytics'],
enabledTables: [],
modeledTableCatalog: [{ catalog: null, db: 'analytics', name: 'orders' }],
filters: { dropTrivialProbes: true },
},
now: new Date('2026-06-03T00:00:00.000Z'),
});
expect(runtime.generateObject).toHaveBeenCalledTimes(1);
expect(proposal).toMatchObject({
excludedRoles: [
{
role: 'svc.loader+prod',
pattern: '^svc\\.loader\\+prod$',
reason: 'Runs recurring loader traffic only.',
},
],
consideredRoleCount: 2,
skipped: null,
warnings: [],
});
});
// SQL analysis must receive the original, unredacted SQL, while the prompt handed
// to generateObject must contain '[REDACTED]' and neither secret literal.
// NOTE(review): the '(?i)' prefix in the second redaction pattern is not native
// JavaScript RegExp syntax — presumably the implementation translates it into a
// case-insensitive match (the fixture value is 'Secret_Token_9f'); confirm.
it('redacts representative SQL before sending role records to the LLM', async () => {
const originalSql =
"select * from public.api_events where api_key = 'sk_live_abc123' and note = 'Secret_Token_9f'"; // pragma: allowlist secret
const runtime = llm([
{ role: 'svc_loader', exclude: false, reason: 'Keep by default.' },
{ role: 'analyst', exclude: false, reason: 'Interactive analytic usage.' },
]);
const analysis = sqlAnalysis({
secret: [{ catalog: null, db: 'public', name: 'api_events' }],
analyst: [{ catalog: null, db: 'public', name: 'orders' }],
});
await proposeQueryHistoryServiceAccountFilters({
connectionId: 'warehouse',
dialect: 'postgres',
queryClient: {},
reader: reader(
aggregate({
templateId: 'secret',
canonicalSql: originalSql,
topUsers: [{ user: 'svc_loader', executions: 30 }],
}),
aggregate({
templateId: 'analyst',
canonicalSql: 'select status, count(*) from public.orders group by status',
topUsers: [{ user: 'analyst', executions: 25 }],
}),
),
sqlAnalysis: analysis,
llmRuntime: runtime,
pullConfig: {
dialect: 'postgres',
enabledSchemas: ['public'],
enabledTables: [],
modeledTableCatalog: [],
redactionPatterns: ['sk_live_[A-Za-z0-9]+', '(?i)secret_token_[a-z0-9]+'],
filters: { dropTrivialProbes: true },
},
now: new Date('2026-06-03T00:00:00.000Z'),
});
// The third argument is asserted to be undefined for this configuration.
expect(analysis.analyzeBatch).toHaveBeenCalledWith(
[
{ id: 'secret', sql: originalSql },
{ id: 'analyst', sql: 'select status, count(*) from public.orders group by status' },
],
'postgres',
undefined,
);
// Inspect the first argument of the first generateObject call.
const call = vi.mocked(runtime.generateObject).mock.calls[0]?.[0];
expect(call?.prompt).toContain('[REDACTED]');
expect(call?.prompt).not.toContain('sk_live_abc123');
expect(call?.prompt).not.toContain('Secret_Token_9f');
});
// With llmRuntime set to null the picker must return an empty proposal marked
// as skipped with reason 'no-llm' instead of throwing.
it('fails open with no LLM runtime', async () => {
const proposal = await proposeQueryHistoryServiceAccountFilters({
connectionId: 'warehouse',
dialect: 'postgres',
queryClient: {},
reader: reader(),
sqlAnalysis: sqlAnalysis({}),
llmRuntime: null,
pullConfig: { dialect: 'postgres', filters: { dropTrivialProbes: true } },
});
expect(proposal).toEqual({
excludedRoles: [],
consideredRoleCount: 0,
skipped: { reason: 'no-llm' },
warnings: [],
});
});
// Only one role appears in the history. The stubbed LLM would exclude it if
// asked, so the test asserts the LLM is never called and nothing is excluded.
it('proposes nothing for a single-role stack', async () => {
const runtime = llm([{ role: 'warehouse_user', exclude: true, reason: 'Only observed role.' }]);
const proposal = await proposeQueryHistoryServiceAccountFilters({
connectionId: 'warehouse',
dialect: 'postgres',
queryClient: {},
reader: reader(
aggregate({
templateId: 'single-role',
canonicalSql: 'select * from analytics.orders',
topUsers: [{ user: 'warehouse_user', executions: 40 }],
}),
),
sqlAnalysis: sqlAnalysis({
'single-role': [{ catalog: null, db: 'analytics', name: 'orders' }],
}),
llmRuntime: runtime,
pullConfig: { dialect: 'postgres', enabledSchemas: ['analytics'], filters: { dropTrivialProbes: true } },
});
expect(runtime.generateObject).not.toHaveBeenCalled();
expect(proposal.excludedRoles).toEqual([]);
expect(proposal.skipped).toEqual({ reason: 'no-in-scope-history' });
});
// BigQuery variant with two roles that the stubbed LLM both keeps: both roles
// count as considered, nothing is excluded, and the run is not marked skipped.
it('keeps clean in-scope history when the model excludes nothing', async () => {
const proposal = await proposeQueryHistoryServiceAccountFilters({
connectionId: 'warehouse',
dialect: 'bigquery',
queryClient: {},
reader: reader(
aggregate({
templateId: 'dashboard',
canonicalSql: 'select status, count(*) from `demo.analytics.orders` group by status',
dialect: 'bigquery',
topUsers: [{ user: 'bi_runner', executions: 1 }],
}),
aggregate({
templateId: 'analyst',
canonicalSql: 'select * from `demo.analytics.orders` where id = @id',
dialect: 'bigquery',
topUsers: [{ user: 'analyst', executions: 1 }],
}),
),
sqlAnalysis: sqlAnalysis({
dashboard: [{ catalog: 'demo', db: 'analytics', name: 'orders' }],
analyst: [{ catalog: 'demo', db: 'analytics', name: 'orders' }],
}),
llmRuntime: llm([
{ role: 'bi_runner', exclude: false, reason: 'Dashboard usage is analytic.' },
{ role: 'analyst', exclude: false, reason: 'Interactive analyst usage.' },
]),
pullConfig: {
dialect: 'bigquery',
windowDays: 90,
enabledSchemas: ['analytics'],
filters: { dropTrivialProbes: true },
},
});
expect(proposal.excludedRoles).toEqual([]);
expect(proposal.consideredRoleCount).toBe(2);
expect(proposal.skipped).toBeNull();
});
// The helper must escape regex metacharacters and wrap the role in ^...$ anchors.
it('escapes regex metacharacters for exact role matches', () => {
expect(regexEscapeForExactRolePattern('svc.loader+prod')).toBe('^svc\\.loader\\+prod$');
expect(regexEscapeForExactRolePattern('team[etl](west)')).toBe('^team\\[etl\\]\\(west\\)$');
});
});

View file

@ -0,0 +1,194 @@
import { mkdir, mkdtemp, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { describe, expect, it } from 'vitest';
import { resolveQueryHistoryScopeFloor } from '../../../../../src/context/ingest/adapters/historic-sql/scope-floor.js';
/**
 * Creates a fresh, uniquely named directory under the OS temp dir (prefix
 * `ktx-qh-scope-`) and resolves with its path.
 */
async function tempProject(): Promise<string> {
  const prefix = join(tmpdir(), 'ktx-qh-scope-');
  return await mkdtemp(prefix);
}
/**
 * Seeds a minimal live-database scan on disk for one table.
 *
 * Writes three pretty-printed JSON files (two-space indent, trailing newline)
 * under `<projectDir>/raw-sources/<connectionId>/live-database/<syncId>/`:
 * `connection.json`, `tables/<db or "default">-<name>.json`, and
 * `scan-report.json`.
 *
 * @param projectDir Root directory of the test project.
 * @param connectionId Connection whose scan is being seeded.
 * @param syncId Identifier of the seeded scan run.
 * @param table Table reference stored in the table file.
 */
async function seedLiveScanTable(
  projectDir: string,
  connectionId: string,
  syncId: string,
  table: { catalog: string | null; db: string | null; name: string },
): Promise<void> {
  const scanRoot = join(projectDir, 'raw-sources', connectionId, 'live-database', syncId);
  const tablesDir = join(scanRoot, 'tables');
  // Every artifact shares the same serialisation, so route them through one writer.
  const writePrettyJson = (filePath: string, payload: unknown): Promise<void> =>
    writeFile(filePath, `${JSON.stringify(payload, null, 2)}\n`, 'utf-8');

  await mkdir(tablesDir, { recursive: true });

  await writePrettyJson(join(scanRoot, 'connection.json'), { connectionId, driver: 'postgres' });

  await writePrettyJson(join(tablesDir, `${table.db ?? 'default'}-${table.name}.json`), {
    ...table,
    kind: 'table',
    comment: null,
    estimatedRows: null,
    columns: [],
    foreignKeys: [],
  });

  await writePrettyJson(join(scanRoot, 'scan-report.json'), {
    connectionId,
    driver: 'postgres',
    syncId,
    runId: `scan-${syncId}`,
    trigger: 'cli',
    mode: 'enriched',
    dryRun: false,
    artifactPaths: {
      rawSourcesDir: `raw-sources/${connectionId}/live-database/${syncId}`,
      reportPath: `raw-sources/${connectionId}/live-database/${syncId}/scan-report.json`,
      manifestShards: [],
      enrichmentArtifacts: [],
    },
    counts: {},
    warnings: [],
    enrichment: {},
    enrichmentState: {},
  });
}
// Exercises resolveQueryHistoryScopeFloor against throwaway project directories
// seeded with a semantic-layer YAML file and/or a live-database scan.
describe('resolveQueryHistoryScopeFloor', () => {
// Fixture: a semantic-layer source on orbit_analytics.mart_revenue, a seeded scan
// containing orbit_raw.accounts, and a connection configured with schema
// 'orbit_raw'. Both schemas and both tables are expected in the resolved scope.
it('computes modeled schemas from connection schemas plus semantic source tables', async () => {
const projectDir = await tempProject();
await mkdir(join(projectDir, 'semantic-layer/warehouse'), { recursive: true });
await writeFile(
join(projectDir, 'semantic-layer/warehouse/revenue.yaml'),
[
'name: revenue',
'table: orbit_analytics.mart_revenue',
'grain: [id]',
'columns:',
' - name: id',
' type: string',
'',
].join('\n'),
'utf-8',
);
await seedLiveScanTable(projectDir, 'warehouse', 'sync-1', {
catalog: null,
db: 'orbit_raw',
name: 'accounts',
});
const scope = await resolveQueryHistoryScopeFloor({
projectDir,
connectionId: 'warehouse',
driver: 'postgres',
connection: { driver: 'postgres', schemas: ['orbit_raw'] },
storedQueryHistory: {},
});
expect(scope.enabledSchemas).toEqual(['orbit_analytics', 'orbit_raw']);
expect(scope.modeledTableCatalog).toEqual([
{ catalog: null, db: 'orbit_analytics', name: 'mart_revenue' },
{ catalog: null, db: 'orbit_raw', name: 'accounts' },
]);
expect(scope.enabledTables).toEqual([]);
expect(scope.floorDisabled).toBe(false);
});
// Stored config lists both enabledTables and enabledSchemas. The resolved scope
// keeps only the parsed enabledTables and reports an empty enabledSchemas list.
it('uses explicit enabledTables before explicit enabledSchemas and computed scope', async () => {
const scope = await resolveQueryHistoryScopeFloor({
projectDir: await tempProject(),
connectionId: 'warehouse',
driver: 'postgres',
connection: { driver: 'postgres', schemas: ['orbit_raw'] },
storedQueryHistory: {
enabledTables: ['orbit_analytics.mart_revenue'],
enabledSchemas: ['orbit_raw'],
},
});
expect(scope.enabledTables).toEqual([{ catalog: null, db: 'orbit_analytics', name: 'mart_revenue' }]);
expect(scope.enabledSchemas).toEqual([]);
expect(scope.floorDisabled).toBe(false);
});
// An explicit '*' in stored enabledSchemas is passed through and flags the floor
// as disabled.
it('disables the floor for enabledSchemas star', async () => {
const scope = await resolveQueryHistoryScopeFloor({
projectDir: await tempProject(),
connectionId: 'warehouse',
driver: 'postgres',
connection: { driver: 'postgres', schemas: ['orbit_raw'] },
storedQueryHistory: { enabledSchemas: ['*'] },
});
expect(scope.enabledTables).toEqual([]);
expect(scope.enabledSchemas).toEqual(['*']);
expect(scope.floorDisabled).toBe(true);
});
// NOTE(review): the fixture and inputs here are the same as in the first test of
// this suite; the only additional assertion is that no warnings are produced.
// Consider merging the two or differentiating the fixture.
it('adds latest live-database scan tables to the modeled table catalog', async () => {
const projectDir = await tempProject();
await mkdir(join(projectDir, 'semantic-layer/warehouse'), { recursive: true });
await writeFile(
join(projectDir, 'semantic-layer/warehouse/revenue.yaml'),
[
'name: revenue',
'table: orbit_analytics.mart_revenue',
'grain: [id]',
'columns:',
' - name: id',
' type: string',
'',
].join('\n'),
'utf-8',
);
await seedLiveScanTable(projectDir, 'warehouse', 'sync-1', {
catalog: null,
db: 'orbit_raw',
name: 'accounts',
});
const scope = await resolveQueryHistoryScopeFloor({
projectDir,
connectionId: 'warehouse',
driver: 'postgres',
connection: { driver: 'postgres', schemas: ['orbit_raw'] },
storedQueryHistory: {},
});
expect(scope.enabledSchemas).toEqual(['orbit_analytics', 'orbit_raw']);
expect(scope.modeledTableCatalog).toEqual([
{ catalog: null, db: 'orbit_analytics', name: 'mart_revenue' },
{ catalog: null, db: 'orbit_raw', name: 'accounts' },
]);
expect(scope.warnings).toEqual([]);
expect(scope.floorDisabled).toBe(false);
});
// No scan is seeded in the fresh project directory, so although the connection
// lists a schema the resolver is expected to fall back to wildcard scope, mark
// the floor disabled, and emit the catalog_unavailable warning.
it('fails open when schema scope exists but the scan catalog is unavailable', async () => {
const scope = await resolveQueryHistoryScopeFloor({
projectDir: await tempProject(),
connectionId: 'warehouse',
driver: 'postgres',
connection: { driver: 'postgres', schemas: ['orbit_raw'] },
storedQueryHistory: {},
});
expect(scope.enabledTables).toEqual([]);
expect(scope.enabledSchemas).toEqual(['*']);
expect(scope.modeledTableCatalog).toEqual([]);
expect(scope.floorDisabled).toBe(true);
expect(scope.warnings).toContain('query_history_scope_floor_disabled:catalog_unavailable');
});
});

View file

@ -0,0 +1,51 @@
import { describe, expect, it } from 'vitest';
import {
includedQueryHistoryTableRefs,
isQueryHistoryScopeFloorDisabled,
shouldFailOpenQueryHistoryScope,
} from '../../../../../src/context/ingest/adapters/historic-sql/scope-membership.js';
import type { KtxTableRef } from '../../../../../src/context/scan/types.js';
/**
 * Shorthand for building a `KtxTableRef` in tests. Arguments are ordered
 * `(db, name, catalog)` so the common two-part case can omit the catalog,
 * which then defaults to `null`.
 */
function ref(db: string | null, name: string, catalog: string | null = null): KtxTableRef {
  const tableRef: KtxTableRef = { catalog, db, name };
  return tableRef;
}
// Exercises the shared scope-membership helpers: includedQueryHistoryTableRefs,
// isQueryHistoryScopeFloorDisabled, and shouldFailOpenQueryHistoryScope.
describe('query-history scope membership', () => {
// enabledSchemas would admit the 'metabase' ref, but a non-empty enabledTables
// list takes priority, so only the explicitly enabled table is kept.
it('prefers explicit enabled tables over schema scope', () => {
const orders = ref('analytics', 'orders');
const noise = ref('metabase', 'application_table');
expect(
includedQueryHistoryTableRefs([orders, noise], {
enabledTables: [orders],
enabledSchemas: ['metabase'],
}),
).toEqual([orders]);
});
// Both refs carry a catalog; the schema filter is expected to compare only the
// db component, so the catalog value does not prevent a match.
it('matches schema scope by the db component across catalogs', () => {
const modeled = ref('orbit_analytics', 'orders', 'demo-project');
const noise = ref('metabase', 'application_table', 'demo-project');
expect(
includedQueryHistoryTableRefs([modeled, noise], {
enabledTables: [],
enabledSchemas: ['orbit_analytics'],
}),
).toEqual([modeled]);
});
// A '*' schema entry is reported as a disabled floor, and every touched ref passes.
it('keeps every touched ref when wildcard scope disables the floor', () => {
const tables = [ref('analytics', 'orders'), ref('metabase', 'application_table')];
expect(isQueryHistoryScopeFloorDisabled({ enabledTables: [], enabledSchemas: ['*'] })).toBe(true);
expect(includedQueryHistoryTableRefs(tables, { enabledTables: [], enabledSchemas: ['*'] })).toEqual(tables);
});
// With both lists empty there is nothing to scope against, so the helpers are
// expected to fail open and keep every touched ref.
it('fails open when no tables, schemas, or wildcard are configured', () => {
const tables = [ref('metabase', 'application_table')];
expect(shouldFailOpenQueryHistoryScope({ enabledTables: [], enabledSchemas: [] })).toBe(true);
expect(includedQueryHistoryTableRefs(tables, { enabledTables: [], enabledSchemas: [] })).toEqual(tables);
});
});

View file

@ -90,7 +90,10 @@ describe('SnowflakeHistoricSqlQueryHistoryReader', () => {
40,
0.05,
100,
JSON.stringify([{ user: 'ANALYST', executions: 1 }]),
JSON.stringify([
{ user: 'SVC_LOADER', executions: 40 },
{ user: 'ANALYST', executions: 2 },
]),
],
],
totalRows: 1,
@ -102,15 +105,20 @@ describe('SnowflakeHistoricSqlQueryHistoryReader', () => {
for await (const row of reader.fetchAggregated(
client,
{ start: new Date('2026-02-10T00:00:00.000Z'), end: new Date('2026-05-11T00:00:00.000Z') },
{ dialect: 'snowflake', minExecutions: 5, windowDays: 90, enabledTables: [], filters: { dropTrivialProbes: true }, redactionPatterns: [], staleArchiveAfterDays: 90 },
{ dialect: 'snowflake', minExecutions: 5, windowDays: 90, enabledTables: [], enabledSchemas: [], modeledTableCatalog: [], scopeFloorWarnings: [], filters: { dropTrivialProbes: true }, redactionPatterns: [], staleArchiveAfterDays: 90 },
)) {
rows.push(row);
}
const sql = firstQuery(client);
expect(sql).toContain('WITH filtered_queries AS');
expect(sql).toContain('template_stats AS');
expect(sql).toContain('template_users AS');
expect(sql).toContain('SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY');
expect(sql).toContain('COUNT(*) AS executions');
expect(sql).toContain('GROUP BY query_hash');
expect(sql).toContain('COUNT(DISTINCT user_name) AS distinct_users');
expect(sql).toContain('GROUP BY query_hash, user_name');
expect(sql).toContain('ORDER BY users.executions DESC');
expect(sql).toContain('HAVING COUNT(*) >= 5');
expect(rows).toMatchObject([
{
@ -119,7 +127,10 @@ describe('SnowflakeHistoricSqlQueryHistoryReader', () => {
executions: 42,
errorRate: 0.05,
},
topUsers: [{ user: 'ANALYST', executions: 1 }],
topUsers: [
{ user: 'SVC_LOADER', executions: 40 },
{ user: 'ANALYST', executions: 2 },
],
},
]);
});
@ -136,6 +147,9 @@ describe('SnowflakeHistoricSqlQueryHistoryReader', () => {
minExecutions: 5,
windowDays: 90,
enabledTables: [],
enabledSchemas: [],
modeledTableCatalog: [],
scopeFloorWarnings: [],
filters: { dropTrivialProbes: true },
redactionPatterns: [],
staleArchiveAfterDays: 90,

View file

@ -14,6 +14,13 @@ async function readJson<T>(root: string, relPath: string): Promise<T> {
return JSON.parse(await readFile(join(root, relPath), 'utf-8')) as T;
}
/**
 * Turns a dotted table name into a structured table reference.
 *
 * Two segments are read as `db.name`, three as `catalog.db.name`. Any other
 * segment count is treated as unqualified: the full input becomes `name` and
 * both `catalog` and `db` are `null`.
 */
function tableRef(value: string): { catalog: string | null; db: string | null; name: string } {
  const pieces = value.split('.');
  const isQualified = pieces.length === 2 || pieces.length === 3;
  if (!isQualified) {
    return { catalog: null, db: null, name: value };
  }
  // Read from the right so the two- and three-part forms share one code path.
  const name = pieces[pieces.length - 1]!;
  const db = pieces[pieces.length - 2]!;
  const catalog = pieces.length === 3 ? pieces[0]! : null;
  return { catalog, db, name };
}
function aggregate(overrides: Partial<AggregatedTemplate> & { templateId: string; canonicalSql: string }): AggregatedTemplate {
return {
templateId: overrides.templateId,
@ -72,7 +79,7 @@ describe('stageHistoricSqlAggregatedSnapshot', () => {
[
'orders-by-status',
{
tablesTouched: ['public.orders', 'public.customers'],
tablesTouched: [tableRef('public.orders'), tableRef('public.customers')],
columnsByClause: {
select: ['status'],
where: ['created_at'],
@ -94,6 +101,7 @@ describe('stageHistoricSqlAggregatedSnapshot', () => {
sqlAnalysis,
pullConfig: {
dialect: 'postgres',
enabledSchemas: ['public'],
filters: {
serviceAccounts: { patterns: ['^svc_'], mode: 'exclude' },
},
@ -111,6 +119,7 @@ describe('stageHistoricSqlAggregatedSnapshot', () => {
{ id: 'bad-parse', sql: 'select broken from' },
],
'postgres',
undefined,
);
expect(await readdir(join(stagedDir, 'tables'))).toEqual(['public.customers.json', 'public.orders.json']);
@ -131,6 +140,7 @@ describe('stageHistoricSqlAggregatedSnapshot', () => {
const orders = await readJson<Record<string, any>>(stagedDir, 'tables/public.orders.json');
expect(orders).toMatchObject({
table: 'public.orders',
tableRef: tableRef('public.orders'),
stats: {
executionsBucket: '10-100',
distinctUsersBucket: '2-5',
@ -159,7 +169,7 @@ describe('stageHistoricSqlAggregatedSnapshot', () => {
{
id: 'orders-by-status',
canonicalSql: expect.stringContaining('public.orders'),
tablesTouched: ['public.customers', 'public.orders'],
tablesTouched: [tableRef('public.customers'), tableRef('public.orders')],
executionsBucket: '10-100',
distinctUsersBucket: '2-5',
dialect: 'postgres',
@ -167,6 +177,129 @@ describe('stageHistoricSqlAggregatedSnapshot', () => {
]);
});
it('keeps templates when service-account topUsers are only a partial execution sample', async () => {
const stagedDir = await tempDir();
const reader: HistoricSqlReader = {
async probe() {
return { warnings: [], info: [] };
},
async *fetchAggregated() {
yield aggregate({
templateId: 'shared-bigquery-template',
canonicalSql: 'select status, count(*) from `demo.analytics.orders` group by status',
dialect: 'bigquery',
stats: {
executions: 42,
distinctUsers: 2,
firstSeen: '2026-05-01T00:00:00.000Z',
lastSeen: '2026-05-11T00:00:00.000Z',
p50RuntimeMs: 20,
p95RuntimeMs: 80,
errorRate: 0,
rowsProduced: null,
},
topUsers: [{ user: 'svc_loader', executions: 5 }],
});
},
};
const sqlAnalysis: SqlAnalysisPort = {
analyzeForFingerprint: vi.fn(),
analyzeBatch: vi.fn(async () =>
new Map([
[
'shared-bigquery-template',
{
tablesTouched: [tableRef('demo.analytics.orders')],
columnsByClause: { select: ['status'], groupBy: ['status'] },
},
],
]),
),
validateReadOnly: vi.fn(async () => ({ ok: true })),
};
await stageHistoricSqlAggregatedSnapshot({
stagedDir,
connectionId: 'warehouse',
queryClient: {},
reader,
sqlAnalysis,
pullConfig: {
dialect: 'bigquery',
windowDays: 90,
enabledSchemas: ['analytics'],
filters: {
serviceAccounts: { patterns: ['^svc_loader$'], mode: 'exclude' },
},
},
now: new Date('2026-05-11T12:00:00.000Z'),
});
const patterns = await readJson<Record<string, any>>(stagedDir, 'patterns-input.json');
expect(patterns.templates.map((template: { id: string }) => template.id)).toEqual([
'shared-bigquery-template',
]);
const orders = await readJson<Record<string, any>>(stagedDir, 'tables/demo.analytics.orders.json');
expect(orders.topTemplates).toEqual([
{
id: 'shared-bigquery-template',
canonicalSql: 'select status, count(*) from `demo.analytics.orders` group by status',
topUsers: [{ user: 'svc_loader' }],
},
]);
});
it('drops service-account-only templates when matched users cover all executions', async () => {
const stagedDir = await tempDir();
const reader: HistoricSqlReader = {
async probe() {
return { warnings: [], info: [] };
},
async *fetchAggregated() {
yield aggregate({
templateId: 'service-only-template',
canonicalSql: 'merge into analytics.orders using staging.orders_delta on orders.id = orders_delta.id',
stats: {
executions: 12,
distinctUsers: 1,
firstSeen: '2026-05-01T00:00:00.000Z',
lastSeen: '2026-05-11T00:00:00.000Z',
p50RuntimeMs: 20,
p95RuntimeMs: 80,
errorRate: 0,
rowsProduced: 0,
},
topUsers: [{ user: 'svc_loader', executions: 12 }],
});
},
};
const sqlAnalysis: SqlAnalysisPort = {
analyzeForFingerprint: vi.fn(),
analyzeBatch: vi.fn(async () => new Map()),
validateReadOnly: vi.fn(async () => ({ ok: true })),
};
await stageHistoricSqlAggregatedSnapshot({
stagedDir,
connectionId: 'warehouse',
queryClient: {},
reader,
sqlAnalysis,
pullConfig: {
dialect: 'postgres',
enabledSchemas: ['analytics'],
filters: {
serviceAccounts: { patterns: ['^svc_loader$'], mode: 'exclude' },
},
},
now: new Date('2026-05-11T12:00:00.000Z'),
});
expect(sqlAnalysis.analyzeBatch).toHaveBeenCalledWith([], 'postgres', undefined);
const patterns = await readJson<Record<string, any>>(stagedDir, 'patterns-input.json');
expect(patterns.templates).toEqual([]);
});
it('redacts configured SQL substrings in staged artifacts while analyzing original SQL', async () => {
const stagedDir = await tempDir();
const originalSql =
@ -198,7 +331,7 @@ describe('stageHistoricSqlAggregatedSnapshot', () => {
[
'api-events-with-secret',
{
tablesTouched: ['public.api_events'],
tablesTouched: [tableRef('public.api_events')],
columnsByClause: {
select: [],
where: ['api_key', 'note'],
@ -219,6 +352,7 @@ describe('stageHistoricSqlAggregatedSnapshot', () => {
sqlAnalysis,
pullConfig: {
dialect: 'postgres',
enabledSchemas: ['public'],
redactionPatterns: ['sk_live_[A-Za-z0-9]+', '(?i)secret_token_[a-z0-9]+'],
},
now: new Date('2026-05-11T12:00:00.000Z'),
@ -227,6 +361,7 @@ describe('stageHistoricSqlAggregatedSnapshot', () => {
expect(sqlAnalysis.analyzeBatch).toHaveBeenCalledWith(
[{ id: 'api-events-with-secret', sql: originalSql }],
'postgres',
undefined,
);
const tableJson = await readFile(join(stagedDir, 'tables/public.api_events.json'), 'utf-8');
@ -266,21 +401,21 @@ describe('stageHistoricSqlAggregatedSnapshot', () => {
[
'selected-qualified',
{
tablesTouched: ['orbit_analytics.int_active_contract_arr'],
tablesTouched: [tableRef('orbit_analytics.int_active_contract_arr')],
columnsByClause: { select: [], where: [], join: [], groupBy: [] },
},
],
[
'selected-unqualified',
{
tablesTouched: ['int_customer_health_signals'],
tablesTouched: [tableRef('orbit_analytics.int_customer_health_signals')],
columnsByClause: { select: [], where: [], join: [], groupBy: [] },
},
],
[
'unselected',
{
tablesTouched: ['orbit_raw.accounts'],
tablesTouched: [tableRef('orbit_raw.accounts')],
columnsByClause: { select: [], where: [], join: [], groupBy: [] },
},
],
@ -297,16 +432,16 @@ describe('stageHistoricSqlAggregatedSnapshot', () => {
pullConfig: {
dialect: 'postgres',
enabledTables: [
'orbit_analytics.int_active_contract_arr',
'orbit_analytics.int_customer_health_signals',
tableRef('orbit_analytics.int_active_contract_arr'),
tableRef('orbit_analytics.int_customer_health_signals'),
],
},
now: new Date('2026-05-11T12:00:00.000Z'),
});
expect(await readdir(join(stagedDir, 'tables'))).toEqual([
'int_customer_health_signals.json',
'orbit_analytics.int_active_contract_arr.json',
'orbit_analytics.int_customer_health_signals.json',
]);
const manifest = await readJson<Record<string, any>>(stagedDir, 'manifest.json');
expect(manifest.touchedTableCount).toBe(2);
@ -372,7 +507,7 @@ describe('stageHistoricSqlAggregatedSnapshot', () => {
[
'orders-customers-a',
{
tablesTouched: ['public.orders', 'public.customers'],
tablesTouched: [tableRef('public.orders'), tableRef('public.customers')],
columnsByClause: {
select: [],
where: ['payload'],
@ -384,7 +519,7 @@ describe('stageHistoricSqlAggregatedSnapshot', () => {
[
'orders-customers-b',
{
tablesTouched: ['public.orders', 'public.customers'],
tablesTouched: [tableRef('public.orders'), tableRef('public.customers')],
columnsByClause: {
select: [],
where: ['payload_b'],
@ -396,7 +531,7 @@ describe('stageHistoricSqlAggregatedSnapshot', () => {
[
'orders-single-table',
{
tablesTouched: ['public.orders'],
tablesTouched: [tableRef('public.orders')],
columnsByClause: {
select: [],
where: [],
@ -415,7 +550,7 @@ describe('stageHistoricSqlAggregatedSnapshot', () => {
queryClient: {},
reader,
sqlAnalysis,
pullConfig: { dialect: 'postgres' },
pullConfig: { dialect: 'postgres', enabledSchemas: ['public'] },
now: new Date('2026-05-11T12:00:00.000Z'),
});
@ -456,7 +591,13 @@ describe('stageHistoricSqlAggregatedSnapshot', () => {
const sqlAnalysis: SqlAnalysisPort = {
analyzeForFingerprint: vi.fn(),
analyzeBatch: vi.fn(async () => new Map([
['analytic', { tablesTouched: ['public.orders'], columnsByClause: { select: ['status'], where: [], join: [], groupBy: ['status'] } }],
[
'analytic',
{
tablesTouched: [tableRef('public.orders')],
columnsByClause: { select: ['status'], where: [], join: [], groupBy: ['status'] },
},
],
])),
validateReadOnly: vi.fn(async () => ({ ok: true })),
};
@ -467,7 +608,7 @@ describe('stageHistoricSqlAggregatedSnapshot', () => {
queryClient: {},
reader,
sqlAnalysis,
pullConfig: { dialect: 'postgres' },
pullConfig: { dialect: 'postgres', enabledSchemas: ['public'] },
now: new Date('2026-05-11T12:00:00.000Z'),
});
@ -475,26 +616,27 @@ describe('stageHistoricSqlAggregatedSnapshot', () => {
expect(sqlAnalysis.analyzeBatch).toHaveBeenCalledWith(
[{ id: 'analytic', sql: 'select status, count(*) from public.orders group by status' }],
'postgres',
undefined,
);
expect(await readdir(join(stagedDir, 'tables'))).toEqual(['public.orders.json']);
});
it('merges bare and schema-qualified references to the same table into one work unit', async () => {
it('keeps modeled-schema refs and drops unmodeled-schema refs by default', async () => {
const stagedDir = await tempDir();
const reader: HistoricSqlReader = {
async probe() {
return { warnings: [], info: [] };
},
async *fetchAggregated() {
yield aggregate({ templateId: 'qualified', canonicalSql: 'select count(*) from orbit_raw.accounts' });
yield aggregate({ templateId: 'bare', canonicalSql: 'select id from accounts where active' });
yield aggregate({ templateId: 'modeled', canonicalSql: 'select count(*) from orbit_raw.accounts' });
yield aggregate({ templateId: 'noise', canonicalSql: 'select count(*) from metabase.application_table' });
},
};
const sqlAnalysis: SqlAnalysisPort = {
analyzeForFingerprint: vi.fn(),
analyzeBatch: vi.fn(async () => new Map([
['qualified', { tablesTouched: ['orbit_raw.accounts'], columnsByClause: { select: [], where: [], join: [], groupBy: [] } }],
['bare', { tablesTouched: ['accounts'], columnsByClause: { select: ['id'], where: ['active'], join: [], groupBy: [] } }],
['modeled', { tablesTouched: [{ catalog: null, db: 'orbit_raw', name: 'accounts' }], columnsByClause: {} }],
['noise', { tablesTouched: [{ catalog: null, db: 'metabase', name: 'application_table' }], columnsByClause: {} }],
])),
validateReadOnly: vi.fn(async () => ({ ok: true })),
};
@ -505,16 +647,213 @@ describe('stageHistoricSqlAggregatedSnapshot', () => {
queryClient: {},
reader,
sqlAnalysis,
pullConfig: { dialect: 'postgres' },
pullConfig: {
dialect: 'postgres',
enabledSchemas: ['orbit_raw'],
modeledTableCatalog: [{ catalog: null, db: 'orbit_raw', name: 'accounts' }],
},
now: new Date('2026-05-11T12:00:00.000Z'),
});
// The bare `accounts` reference resolves to the unique qualified `orbit_raw.accounts`,
// so the two templates collapse into a single work unit instead of two.
expect(await readdir(join(stagedDir, 'tables'))).toEqual(['orbit_raw.accounts.json']);
const merged = await readJson<Record<string, any>>(stagedDir, 'tables/orbit_raw.accounts.json');
expect(merged.topTemplates.map((t: any) => t.id).sort()).toEqual(['bare', 'qualified']);
const manifest = await readJson<Record<string, any>>(stagedDir, 'manifest.json');
expect(manifest.touchedTableCount).toBe(1);
});
// Both enabledSchemas and modeledTableCatalog are empty here: the touched table is still
// staged and the manifest carries the empty_modeled_scope warning.
it('fails open when the implicit modeled scope is empty', async () => {
  const outputDir = await tempDir();
  const unmodeledSql = 'select count(*) from metabase.application_table';

  const historyReader: HistoricSqlReader = {
    probe: async () => ({ warnings: [], info: [] }),
    async *fetchAggregated() {
      yield aggregate({ templateId: 'any-table', canonicalSql: unmodeledSql });
    },
  };

  const analysisPort: SqlAnalysisPort = {
    analyzeForFingerprint: vi.fn(),
    analyzeBatch: vi.fn(async () => new Map([
      [
        'any-table',
        {
          tablesTouched: [{ catalog: null, db: 'metabase', name: 'application_table' }],
          columnsByClause: {},
        },
      ],
    ])),
    validateReadOnly: vi.fn(async () => ({ ok: true })),
  };

  await stageHistoricSqlAggregatedSnapshot({
    stagedDir: outputDir,
    connectionId: 'warehouse',
    queryClient: {},
    reader: historyReader,
    sqlAnalysis: analysisPort,
    pullConfig: { dialect: 'postgres', enabledSchemas: [], modeledTableCatalog: [] },
    now: new Date('2026-05-11T12:00:00.000Z'),
  });

  const stagedTableFiles = await readdir(join(outputDir, 'tables'));
  expect(stagedTableFiles).toEqual(['metabase.application_table.json']);

  const stagedManifest = await readJson<Record<string, any>>(outputDir, 'manifest.json');
  expect(stagedManifest.warnings).toContain('query_history_scope_floor_disabled:empty_modeled_scope');
});
// enabledSchemas: ['*'] is combined with a modeled catalog that does not contain the touched
// table; the table outside that catalog is staged anyway.
it('lets enabledSchemas star disable the floor', async () => {
  const outputDir = await tempDir();
  const noiseSql = 'select count(*) from metabase.application_table';

  const historyReader: HistoricSqlReader = {
    probe: async () => ({ warnings: [], info: [] }),
    async *fetchAggregated() {
      yield aggregate({ templateId: 'noise', canonicalSql: noiseSql });
    },
  };

  const analysisPort: SqlAnalysisPort = {
    analyzeForFingerprint: vi.fn(),
    analyzeBatch: vi.fn(async () => new Map([
      [
        'noise',
        {
          tablesTouched: [{ catalog: null, db: 'metabase', name: 'application_table' }],
          columnsByClause: {},
        },
      ],
    ])),
    validateReadOnly: vi.fn(async () => ({ ok: true })),
  };

  await stageHistoricSqlAggregatedSnapshot({
    stagedDir: outputDir,
    connectionId: 'warehouse',
    queryClient: {},
    reader: historyReader,
    sqlAnalysis: analysisPort,
    pullConfig: {
      dialect: 'postgres',
      enabledSchemas: ['*'],
      modeledTableCatalog: [{ catalog: null, db: 'orbit_raw', name: 'accounts' }],
    },
    now: new Date('2026-05-11T12:00:00.000Z'),
  });

  const stagedTableFiles = await readdir(join(outputDir, 'tables'));
  expect(stagedTableFiles).toEqual(['metabase.application_table.json']);
});
// Table refs carry a catalog ('demo-project') while enabledSchemas lists only the dataset name;
// only the ref in the enabled dataset is expected to be staged.
it('matches BigQuery dataset scope even when refs include a catalog', async () => {
  const outputDir = await tempDir();
  const modeledSql = 'select count(*) from `demo-project.orbit_analytics.orders`';
  const noiseSql = 'select count(*) from `demo-project.metabase.application_table`';

  const historyReader: HistoricSqlReader = {
    probe: async () => ({ warnings: [], info: [] }),
    async *fetchAggregated() {
      yield aggregate({ templateId: 'modeled', canonicalSql: modeledSql });
      yield aggregate({ templateId: 'noise', canonicalSql: noiseSql });
    },
  };

  const analysisPort: SqlAnalysisPort = {
    analyzeForFingerprint: vi.fn(),
    analyzeBatch: vi.fn(async () => new Map([
      [
        'modeled',
        {
          tablesTouched: [{ catalog: 'demo-project', db: 'orbit_analytics', name: 'orders' }],
          columnsByClause: {},
        },
      ],
      [
        'noise',
        {
          tablesTouched: [{ catalog: 'demo-project', db: 'metabase', name: 'application_table' }],
          columnsByClause: {},
        },
      ],
    ])),
    validateReadOnly: vi.fn(async () => ({ ok: true })),
  };

  await stageHistoricSqlAggregatedSnapshot({
    stagedDir: outputDir,
    connectionId: 'warehouse',
    queryClient: {},
    reader: historyReader,
    sqlAnalysis: analysisPort,
    pullConfig: {
      dialect: 'bigquery',
      enabledSchemas: ['orbit_analytics'],
      modeledTableCatalog: [{ catalog: 'demo-project', db: 'orbit_analytics', name: 'orders' }],
    },
    now: new Date('2026-05-11T12:00:00.000Z'),
  });

  const stagedTableFiles = await readdir(join(outputDir, 'tables'));
  expect(stagedTableFiles).toEqual(['demo-project.orbit_analytics.orders.json']);
});
// A warning handed in through pullConfig.scopeFloorWarnings must show up in the staged
// manifest, and the touched table is still written.
it('writes propagated scope-floor warnings to the staged manifest', async () => {
  const outputDir = await tempDir();
  const propagatedWarning = 'query_history_scope_floor_disabled:catalog_unavailable';

  const historyReader: HistoricSqlReader = {
    probe: async () => ({ warnings: [], info: [] }),
    async *fetchAggregated() {
      yield aggregate({
        templateId: 'any-table',
        canonicalSql: 'select count(*) from metabase.application_table',
      });
    },
  };

  const analysisPort: SqlAnalysisPort = {
    analyzeForFingerprint: vi.fn(),
    analyzeBatch: vi.fn(async () => new Map([
      [
        'any-table',
        {
          tablesTouched: [{ catalog: null, db: 'metabase', name: 'application_table' }],
          columnsByClause: {},
        },
      ],
    ])),
    validateReadOnly: vi.fn(async () => ({ ok: true })),
  };

  await stageHistoricSqlAggregatedSnapshot({
    stagedDir: outputDir,
    connectionId: 'warehouse',
    queryClient: {},
    reader: historyReader,
    sqlAnalysis: analysisPort,
    pullConfig: {
      dialect: 'postgres',
      enabledSchemas: ['*'],
      scopeFloorWarnings: [propagatedWarning],
    },
    now: new Date('2026-05-11T12:00:00.000Z'),
  });

  const stagedManifest = await readJson<Record<string, any>>(outputDir, 'manifest.json');
  expect(stagedManifest.warnings).toContain(propagatedWarning);

  const stagedTableFiles = await readdir(join(outputDir, 'tables'));
  expect(stagedTableFiles).toEqual(['metabase.application_table.json']);
});
// The first analyzeBatch call (made with the modeled catalog) rejects; the second call is
// expected to omit the catalog, keep the out-of-scope table, and record a warning.
it('retries without the catalog and disables the floor when catalog qualification fails wholesale', async () => {
  const outputDir = await tempDir();
  const noiseSql = 'select count(*) from metabase.application_table';
  // Only used on the expectation side, for both analyzeBatch calls.
  const expectedItems = [{ id: 'noise', sql: noiseSql }];

  const historyReader: HistoricSqlReader = {
    probe: async () => ({ warnings: [], info: [] }),
    async *fetchAggregated() {
      yield aggregate({ templateId: 'noise', canonicalSql: noiseSql });
    },
  };

  const analysisPort: SqlAnalysisPort = {
    analyzeForFingerprint: vi.fn(),
    analyzeBatch: vi
      .fn()
      .mockRejectedValueOnce(new Error('catalog qualification failed'))
      .mockResolvedValueOnce(
        new Map([
          [
            'noise',
            {
              tablesTouched: [{ catalog: null, db: 'metabase', name: 'application_table' }],
              columnsByClause: {},
            },
          ],
        ]),
      ),
    validateReadOnly: vi.fn(async () => ({ ok: true })),
  };

  await stageHistoricSqlAggregatedSnapshot({
    stagedDir: outputDir,
    connectionId: 'warehouse',
    queryClient: {},
    reader: historyReader,
    sqlAnalysis: analysisPort,
    pullConfig: {
      dialect: 'postgres',
      enabledSchemas: ['orbit_raw'],
      modeledTableCatalog: [{ catalog: null, db: 'orbit_raw', name: 'accounts' }],
    },
    now: new Date('2026-05-11T12:00:00.000Z'),
  });

  expect(analysisPort.analyzeBatch).toHaveBeenCalledTimes(2);
  // Attempt 1: catalog supplied.
  expect(analysisPort.analyzeBatch).toHaveBeenNthCalledWith(1, expectedItems, 'postgres', {
    catalog: { tables: [{ catalog: null, db: 'orbit_raw', name: 'accounts' }] },
  });
  // Attempt 2: same items, no catalog.
  expect(analysisPort.analyzeBatch).toHaveBeenNthCalledWith(2, expectedItems, 'postgres', undefined);

  const stagedTableFiles = await readdir(join(outputDir, 'tables'));
  expect(stagedTableFiles).toEqual(['metabase.application_table.json']);

  const stagedManifest = await readJson<Record<string, any>>(outputDir, 'manifest.json');
  expect(stagedManifest.warnings).toContain('query_history_scope_floor_disabled:catalog_qualification_failed');
});
});

View file

@ -59,6 +59,7 @@ describe('historic-sql unified contracts', () => {
expect(
stagedTableInputSchema.parse({
table: 'public.orders',
tableRef: { catalog: null, db: 'public', name: 'orders' },
stats: {
executionsBucket: '10-100',
distinctUsersBucket: '2-5',
@ -81,7 +82,7 @@ describe('historic-sql unified contracts', () => {
{
id: 'pg:123',
canonicalSql: 'select * from public.orders',
tablesTouched: ['public.orders'],
tablesTouched: [{ catalog: null, db: 'public', name: 'orders' }],
executionsBucket: '10-100',
distinctUsersBucket: '2-5',
dialect: 'postgres',

View file

@ -1,4 +1,4 @@
import { mkdtemp, rm, writeFile } from 'node:fs/promises';
import { mkdir, mkdtemp, rm, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
@ -34,6 +34,36 @@ describe('local ingest adapters', () => {
};
}
/**
 * Seeds a minimal live-database scan snapshot for a single table.
 *
 * Two pretty-printed JSON files are written below
 * `<projectDir>/raw-sources/<connectionId>/live-database/sync-1`:
 * `connection.json` (connection id plus a fixed `postgres` driver) and
 * `tables/<db>-<name>.json`, where a null `db` is rendered as `default`.
 *
 * @param projectDir Root directory that receives the `raw-sources` tree.
 * @param connectionId Connection id used for the directory name and `connection.json`.
 * @param table Table reference whose fields are copied into the table file.
 */
async function seedLiveScanTable(
  projectDir: string,
  connectionId: string,
  table: { catalog: string | null; db: string | null; name: string },
): Promise<void> {
  // Both files are two-space pretty-printed JSON with a trailing newline.
  const toJsonFile = (value: unknown): string => `${JSON.stringify(value, null, 2)}\n`;

  const syncRoot = join(projectDir, 'raw-sources', connectionId, 'live-database', 'sync-1');
  const tablesDir = join(syncRoot, 'tables');
  const tableFileName = `${table.db ?? 'default'}-${table.name}.json`;

  // Creating the nested tables directory also creates the sync root.
  await mkdir(tablesDir, { recursive: true });

  await writeFile(join(syncRoot, 'connection.json'), toJsonFile({ connectionId, driver: 'postgres' }), 'utf-8');
  await writeFile(
    join(tablesDir, tableFileName),
    toJsonFile({
      ...table,
      kind: 'table',
      comment: null,
      estimatedRows: null,
      columns: [],
      foreignKeys: [],
    }),
    'utf-8',
  );
}
it('registers Metabase locally as a staged-bundle adapter', () => {
const adapters = createDefaultLocalIngestAdapters(project);
@ -205,11 +235,14 @@ describe('local ingest adapters', () => {
dialect: 'postgres',
minExecutions: 7,
enabledTables: [],
enabledSchemas: [],
modeledTableCatalog: [],
filters: {
serviceAccounts: { patterns: ['^svc_'], mode: 'exclude' },
dropTrivialProbes: true,
},
redactionPatterns: [],
scopeFloorWarnings: [],
staleArchiveAfterDays: 90,
});
});
@ -237,6 +270,71 @@ describe('local ingest adapters', () => {
});
});
// Seeds one semantic-layer model (orbit_analytics.mart_revenue) and one live-scan table
// (orbit_raw.accounts), then checks both appear in the adapter's pull config scope.
it('passes computed modeled scope to direct historic-sql adapter pull config', async () => {
  const semanticLayerDir = join(project.projectDir, 'semantic-layer/warehouse');
  const revenueModelYaml = [
    'name: revenue',
    'table: orbit_analytics.mart_revenue',
    'grain: [id]',
    'columns:',
    '  - name: id',
    '    type: string',
    '',
  ].join('\n');

  await mkdir(semanticLayerDir, { recursive: true });
  await writeFile(join(semanticLayerDir, 'revenue.yaml'), revenueModelYaml, 'utf-8');
  await seedLiveScanTable(project.projectDir, 'warehouse', {
    catalog: null,
    db: 'orbit_raw',
    name: 'accounts',
  });

  const queryHistoryProject = projectWithConnections({
    warehouse: {
      driver: 'postgres',
      schemas: ['orbit_raw'],
      context: {
        queryHistory: {
          enabled: true,
          minExecutions: 7,
          filters: { dropTrivialProbes: true },
        },
      },
    },
  });
  const historicSqlAdapter = { source: 'historic-sql' } as never;

  const pendingPullConfig = localPullConfigForAdapter(queryHistoryProject, historicSqlAdapter, 'warehouse');
  await expect(pendingPullConfig).resolves.toMatchObject({
    dialect: 'postgres',
    minExecutions: 7,
    enabledSchemas: ['orbit_analytics', 'orbit_raw'],
    modeledTableCatalog: [
      { catalog: null, db: 'orbit_analytics', name: 'mart_revenue' },
      { catalog: null, db: 'orbit_raw', name: 'accounts' },
    ],
  });
});
// This test seeds no scan artifacts or semantic-layer models itself, and expects the pull
// config to fall back to enabledSchemas ['*'] with a catalog_unavailable warning.
it('passes query-history scope fail-open warnings to direct historic-sql pull config', async () => {
  const projectDir = await mkdtemp(join(tmpdir(), 'ktx-local-qh-scope-warning-'));
  // try/finally: the scratch directory must be removed even when the expectation fails.
  try {
    // Named distinctly so it does not shadow the suite-level `project` fixture.
    const scratchProject = await initKtxProject({ projectDir });
    scratchProject.config.connections.warehouse = {
      driver: 'postgres',
      schemas: ['orbit_raw'],
      context: { queryHistory: { enabled: true } },
    } as never;
    const adapter = { source: 'historic-sql' } as never;

    await expect(localPullConfigForAdapter(scratchProject, adapter, 'warehouse')).resolves.toMatchObject({
      dialect: 'postgres',
      enabledSchemas: ['*'],
      scopeFloorWarnings: ['query_history_scope_floor_disabled:catalog_unavailable'],
    });
  } finally {
    await rm(projectDir, { recursive: true, force: true });
  }
});
it('rejects local historic-sql pulls when the connection has not enabled historic SQL', async () => {
const historicSql = createDefaultLocalIngestAdapters(project, {
historicSql: {

View file

@ -49,7 +49,10 @@ describe('createHttpSqlAnalysisPort', () => {
const requestJson = vi.fn(async () => ({
results: {
orders: {
tables_touched: ['public.orders', 'public.customers'],
tables_touched: [
{ catalog: null, db: 'public', name: 'orders' },
{ catalog: null, db: 'public', name: 'customers' },
],
columns_by_clause: {
select: ['status'],
where: ['created_at'],
@ -79,7 +82,10 @@ describe('createHttpSqlAnalysisPort', () => {
[
'orders',
{
tablesTouched: ['public.orders', 'public.customers'],
tablesTouched: [
{ catalog: null, db: 'public', name: 'orders' },
{ catalog: null, db: 'public', name: 'customers' },
],
columnsByClause: {
select: ['status'],
where: ['created_at'],
@ -108,6 +114,62 @@ describe('createHttpSqlAnalysisPort', () => {
});
});
// Checks both directions of the HTTP port: the catalog option is forwarded verbatim in the
// request payload, and snake_case structured refs in the response come back camelCased.
it('passes an optional catalog and maps structured table refs for SQL batch analysis', async () => {
  const postStub = vi.fn(async () => ({
    results: {
      orders: {
        tables_touched: [
          { catalog: null, db: 'orbit_raw', name: 'accounts' },
          { catalog: 'demo_project', db: 'orbit_analytics', name: 'orders' },
        ],
        columns_by_clause: { select: ['id'] },
        error: null,
      },
    },
  }));
  const analysisPort = createHttpSqlAnalysisPort({ baseUrl: 'http://python.test', requestJson: postStub });

  const expectedAnalysis = new Map([
    [
      'orders',
      {
        tablesTouched: [
          { catalog: null, db: 'orbit_raw', name: 'accounts' },
          { catalog: 'demo_project', db: 'orbit_analytics', name: 'orders' },
        ],
        columnsByClause: { select: ['id'] },
        error: null,
      },
    ],
  ]);

  const pendingAnalysis = analysisPort.analyzeBatch(
    [{ id: 'orders', sql: 'select id from accounts' }],
    'postgres',
    {
      catalog: {
        tables: [
          { catalog: null, db: 'orbit_raw', name: 'accounts', columns: ['id'] },
          { catalog: 'demo_project', db: 'orbit_analytics', name: 'orders', columns: ['id'] },
        ],
      },
    },
  );
  await expect(pendingAnalysis).resolves.toEqual(expectedAnalysis);

  expect(postStub).toHaveBeenCalledWith('/sql/analyze-batch', {
    dialect: 'postgres',
    items: [{ id: 'orders', sql: 'select id from accounts' }],
    catalog: {
      tables: [
        { catalog: null, db: 'orbit_raw', name: 'accounts', columns: ['id'] },
        { catalog: 'demo_project', db: 'orbit_analytics', name: 'orders', columns: ['id'] },
      ],
    },
  });
});
it('maps read-only SQL validation responses', async () => {
const requests: Array<{ path: string; payload: Record<string, unknown> }> = [];
const port = createHttpSqlAnalysisPort({
@ -150,7 +212,7 @@ describe('createHttpSqlAnalysisPort', () => {
const requestJson = vi.fn(async () => ({
results: {
orders: {
tables_touched: ['public.orders'],
tables_touched: [{ catalog: null, db: 'public', name: 'orders' }],
columns_by_clause: { select: ['status'], where: [42] },
error: null,
},

View file

@ -2,8 +2,8 @@ import { mkdtemp, rm, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { loadKtxProject } from '../src/context/project/project.js';
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { createKtxCliLocalIngestAdapters } from '../src/local-adapters.js';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { createKtxCliHistoricSqlRuntime, createKtxCliLocalIngestAdapters } from '../src/local-adapters.js';
function sqlAnalysisStub() {
return {
@ -70,6 +70,116 @@ describe('CLI local ingest adapters', () => {
]);
});
// With an injected sqlAnalysis, the runtime should hand back that same object together with
// a dialect, a reader and a query client.
it('creates reusable query-history runtime dependencies for setup', async () => {
  const projectYaml = [
    'connections:',
    '  warehouse:',
    '    driver: postgres',
    '    url: env:WAREHOUSE_DATABASE_URL',
    '    readonly: true',
    '    context:',
    '      queryHistory:',
    '        enabled: true',
    '',
  ].join('\n');
  await writeProject(tempDir, projectYaml);

  const loadedProject = await loadKtxProject({ projectDir: tempDir });
  const analysisStub = sqlAnalysisStub();

  const runtime = createKtxCliHistoricSqlRuntime(loadedProject, 'warehouse', { sqlAnalysis: analysisStub });

  expect(runtime).toMatchObject({
    dialect: 'postgres',
    sqlAnalysis: analysisStub,
  });
  expect(runtime?.reader).toBeDefined();
  expect(runtime?.queryClient).toBeDefined();
});
// No sqlAnalysis is injected here; only managedDaemon options are. Calling analyzeBatch must
// ensure the runtime, start the daemon, and POST the batch to the daemon's base URL.
it('uses managed daemon SQL analysis when query-history runtime gets managed daemon options', async () => {
  const cliVersion = '0.2.0';
  const daemonBaseUrl = 'http://127.0.0.1:61234';

  const projectYaml = [
    'connections:',
    '  warehouse:',
    '    driver: postgres',
    '    url: env:WAREHOUSE_DATABASE_URL',
    '    readonly: true',
    '    context:',
    '      queryHistory:',
    '        enabled: true',
    '',
  ].join('\n');
  await writeProject(tempDir, projectYaml);
  const loadedProject = await loadKtxProject({ projectDir: tempDir });

  const capturedIo = {
    stdout: { write: vi.fn() },
    stderr: { write: vi.fn() },
  };
  const ensureRuntimeStub = vi.fn(async () => ({
    layout: {} as never,
    manifest: {} as never,
  }));
  const startDaemonStub = vi.fn(async () => ({
    status: 'started' as const,
    layout: {} as never,
    state: { pid: 1234 } as never,
    baseUrl: daemonBaseUrl,
  }));
  const postJsonStub = vi.fn(async () => ({
    results: {
      probe: {
        tables_touched: [],
        columns_by_clause: {},
        error: null,
      },
    },
  }));

  const runtime = createKtxCliHistoricSqlRuntime(loadedProject, 'warehouse', {
    managedDaemon: {
      cliVersion,
      projectDir: tempDir,
      installPolicy: 'auto',
      io: capturedIo,
      ensureRuntime: ensureRuntimeStub,
      startDaemon: startDaemonStub,
      postJson: postJsonStub,
    },
  });

  const pendingAnalysis = runtime?.sqlAnalysis.analyzeBatch([{ id: 'probe', sql: 'select 1' }], 'postgres');
  await expect(pendingAnalysis).resolves.toEqual(
    new Map([
      [
        'probe',
        {
          tablesTouched: [],
          columnsByClause: {},
          error: null,
        },
      ],
    ]),
  );

  expect(ensureRuntimeStub).toHaveBeenCalledWith({
    cliVersion,
    installPolicy: 'auto',
    io: capturedIo,
    feature: 'core',
  });
  expect(startDaemonStub).toHaveBeenCalledWith({
    cliVersion,
    projectDir: tempDir,
    features: ['core'],
    force: false,
  });
  expect(postJsonStub).toHaveBeenCalledWith(daemonBaseUrl, '/sql/analyze-batch', {
    dialect: 'postgres',
    items: [{ id: 'probe', sql: 'select 1' }],
  });
});
it('registers historic SQL when explicitly requested even if connection query history is disabled', async () => {
await writeProject(
tempDir,

View file

@ -161,7 +161,7 @@ describe('KTX daemon ingest ports', () => {
const requestJson = vi.fn(async () => ({
results: {
orders: {
tables_touched: ['public.orders'],
tables_touched: [{ catalog: null, db: 'public', name: 'orders' }],
columns_by_clause: { select: ['status'] },
error: null,
},
@ -175,7 +175,7 @@ describe('KTX daemon ingest ports', () => {
[
'orders',
{
tablesTouched: ['public.orders'],
tablesTouched: [{ catalog: null, db: 'public', name: 'orders' }],
columnsByClause: { select: ['status'] },
error: null,
},

View file

@ -1,4 +1,4 @@
import { mkdtemp, rm } from 'node:fs/promises';
import { mkdir, mkdtemp, rm, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { buildDefaultKtxProjectConfig, type KtxProjectConfig } from '../src/context/project/config.js';
@ -668,12 +668,134 @@ describe('runKtxPublicIngest', () => {
dropFailedBelow: { errorRate: 0.5, executions: 3 },
},
redactionPatterns: ['(?i)secret'],
enabledTables: ['orbit_analytics.int_active_contract_arr'],
enabledTables: [{ catalog: null, db: 'orbit_analytics', name: 'int_active_contract_arr' }],
},
});
expect(ingestArgs?.historicSqlPullConfigOverride).not.toHaveProperty('enabled');
});
// The fake scan writes a semantic-layer model, a live-database table and a scan report into
// the project directory. The pull config later handed to ingest must reflect those files,
// which shows the scope is resolved after the scan has run.
it('resolves query-history scope after the schema scan writes artifacts', async () => {
  const io = makeIo();
  const projectDir = await mkdtemp(join(tmpdir(), 'ktx-public-qh-scope-'));
  // try/finally: the scratch directory must be removed even when an expectation fails.
  try {
    const project = deepReadyProject({
      warehouse: {
        driver: 'postgres',
        schemas: ['orbit_raw'],
        context: { queryHistory: { enabled: true } },
      },
    });

    const runScan = vi.fn(async () => {
      // Semantic-layer model pointing at orbit_analytics.mart_revenue.
      await mkdir(join(projectDir, 'semantic-layer/warehouse'), { recursive: true });
      await writeFile(
        join(projectDir, 'semantic-layer/warehouse/revenue.yaml'),
        [
          'name: revenue',
          'table: orbit_analytics.mart_revenue',
          'grain: [id]',
          'columns:',
          '  - name: id',
          '    type: string',
          '',
        ].join('\n'),
        'utf-8',
      );

      // Live-database scan output: connection descriptor, one table, and the scan report.
      const rawRoot = join(projectDir, 'raw-sources/warehouse/live-database/sync-1');
      await mkdir(join(rawRoot, 'tables'), { recursive: true });
      await writeFile(
        join(rawRoot, 'connection.json'),
        `${JSON.stringify({ connectionId: 'warehouse', driver: 'postgres' }, null, 2)}\n`,
        'utf-8',
      );
      await writeFile(
        join(rawRoot, 'tables/accounts.json'),
        `${JSON.stringify(
          {
            catalog: null,
            db: 'orbit_raw',
            name: 'accounts',
            kind: 'table',
            comment: null,
            estimatedRows: null,
            columns: [
              {
                name: 'id',
                nativeType: 'integer',
                normalizedType: 'integer',
                dimensionType: 'number',
                nullable: false,
                primaryKey: true,
                comment: null,
              },
            ],
            foreignKeys: [],
          },
          null,
          2,
        )}\n`,
        'utf-8',
      );
      await writeFile(
        join(rawRoot, 'scan-report.json'),
        `${JSON.stringify(
          {
            connectionId: 'warehouse',
            driver: 'postgres',
            syncId: 'sync-1',
            runId: 'scan-sync-1',
            trigger: 'cli',
            mode: 'enriched',
            dryRun: false,
            artifactPaths: {
              rawSourcesDir: 'raw-sources/warehouse/live-database/sync-1',
              reportPath: 'raw-sources/warehouse/live-database/sync-1/scan-report.json',
              manifestShards: [],
              enrichmentArtifacts: [],
            },
            counts: {},
            warnings: [],
            enrichment: {},
            enrichmentState: {},
          },
          null,
          2,
        )}\n`,
        'utf-8',
      );
      return 0;
    });
    const runIngest = vi.fn<NonNullable<KtxPublicIngestDeps['runIngest']>>(async () => 0);

    await expect(
      runKtxPublicIngest(
        {
          command: 'run',
          projectDir,
          targetConnectionId: 'warehouse',
          all: false,
          json: false,
          inputMode: 'disabled',
          queryHistory: 'enabled',
        },
        io.io,
        { loadProject: vi.fn(async () => ({ ...project, projectDir })), runScan, runIngest },
      ),
    ).resolves.toBe(0);

    const ingestArgs = runIngest.mock.calls[0]?.[0] as
      | Extract<Parameters<NonNullable<KtxPublicIngestDeps['runIngest']>>[0], { command: 'run' }>
      | undefined;
    expect(ingestArgs?.historicSqlPullConfigOverride).toMatchObject({
      dialect: 'postgres',
      enabledSchemas: ['orbit_analytics', 'orbit_raw'],
      modeledTableCatalog: [
        { catalog: null, db: 'orbit_analytics', name: 'mart_revenue' },
        { catalog: null, db: 'orbit_raw', name: 'accounts' },
      ],
    });
  } finally {
    await rm(projectDir, { recursive: true, force: true });
  }
});
it('prints the schema-first notice for explicit query-history runs', async () => {
const io = makeIo();
const project = deepReadyProject({

View file

@ -6,6 +6,7 @@ import { parseKtxProjectConfig } from '../src/context/project/config.js';
import { readKtxSetupState, writeKtxSetupState } from '../src/context/project/setup-config.js';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import {
managedDaemonOptionsForSetupQueryHistoryPicker,
type KtxSetupDatabaseDriver,
type KtxSetupDatabasesDeps,
type KtxSetupDatabasesPromptAdapter,
@ -137,6 +138,22 @@ function textInputPrompt(message: string): string {
return `${title}\n│\n│ ${bodyLines.join('\n│ ')}\n│ Press Escape to go back.\n│`;
}
/**
 * Digs `context.queryHistory` out of an untyped connection config entry.
 *
 * @param connection Value of unknown shape, e.g. one entry of a parsed `connections` map.
 * @returns The `queryHistory` object when `connection`, `connection.context` and
 *   `connection.context.queryHistory` are all non-null, non-array objects; otherwise `undefined`.
 */
function queryHistoryFromConfig(connection: unknown): {
  filters?: { serviceAccounts?: unknown; dropTrivialProbes?: boolean };
} | undefined {
  // Shared shape check for every level of the walk: an object that is neither null nor an array.
  const isPlainRecord = (value: unknown): value is Record<string, unknown> =>
    typeof value === 'object' && value !== null && !Array.isArray(value);

  if (!isPlainRecord(connection)) {
    return undefined;
  }
  const contextSection = connection.context;
  if (!isPlainRecord(contextSection)) {
    return undefined;
  }
  const queryHistorySection = contextSection.queryHistory;
  if (!isPlainRecord(queryHistorySection)) {
    return undefined;
  }
  return queryHistorySection as { filters?: { serviceAccounts?: unknown; dropTrivialProbes?: boolean } };
}
describe('setup databases step', () => {
let tempDir: string;
@ -150,6 +167,61 @@ describe('setup databases step', () => {
await rm(tempDir, { recursive: true, force: true });
});
// Explicit cliVersion and runtimeInstallPolicy in args are carried over unchanged.
it('builds managed daemon options for setup query-history SQL analysis', () => {
  const io = makeIo();

  const daemonOptions = managedDaemonOptionsForSetupQueryHistoryPicker({
    projectDir: tempDir,
    args: {
      inputMode: 'disabled',
      cliVersion: '0.2.0',
      runtimeInstallPolicy: 'auto',
    },
    io: io.io,
  });

  expect(daemonOptions).toEqual({
    cliVersion: '0.2.0',
    projectDir: tempDir,
    installPolicy: 'auto',
    io: io.io,
  });
});
// Without cliVersion / runtimeInstallPolicy in args, the install policy follows the input
// mode: 'disabled' maps to 'never' and 'auto' maps to 'prompt'.
it('defaults managed daemon setup options when the database step is called directly', () => {
  const io = makeIo();
  const expectedPolicyByInputMode = [
    ['disabled', 'never'],
    ['auto', 'prompt'],
  ] as const;

  for (const [inputMode, installPolicy] of expectedPolicyByInputMode) {
    const daemonOptions = managedDaemonOptionsForSetupQueryHistoryPicker({
      projectDir: tempDir,
      args: {
        inputMode,
      },
      io: io.io,
    });

    expect(daemonOptions).toMatchObject({
      cliVersion: expect.any(String),
      projectDir: tempDir,
      installPolicy,
      io: io.io,
    });
  }
});
it('shows every supported database in the interactive checklist', async () => {
const prompts = makePromptAdapter({ multiselectValues: [['back']] });
@ -2569,6 +2641,190 @@ describe('setup databases step', () => {
expect(io.stdout()).toContain('pg_stat_statements ready');
});
// Non-interactive run (inputMode 'disabled', yes: true): the picker's single proposed
// exclusion is written to ktx.yaml and the proposal is echoed on stdout.
it('auto-applies derived query-history service-account filters in non-interactive setup', async () => {
  const io = makeIo();
  const filterPickerStub = vi.fn(async () => ({
    excludedRoles: [
      {
        role: 'svc_loader',
        pattern: '^svc_loader$',
        reason: 'Runs recurring loader traffic against modeled tables.',
      },
    ],
    consideredRoleCount: 2,
    skipped: null,
    warnings: [],
  }));

  const stepResult = await runKtxSetupDatabasesStep(
    {
      projectDir: tempDir,
      inputMode: 'disabled',
      yes: true,
      databaseDrivers: ['postgres'],
      databaseConnectionId: 'warehouse',
      databaseUrl: 'env:DATABASE_URL',
      databaseSchemas: ['public'],
      enableQueryHistory: true,
      skipDatabases: false,
    },
    io.io,
    {
      testConnection: vi.fn(async () => 0),
      scanConnection: vi.fn(async () => 0),
      historicSqlReadinessProbe: vi.fn(async () => ({
        ok: true as const,
        dialect: 'postgres' as const,
        runner: fakeHistoricSqlRunner('postgres', 'pg_stat_statements'),
        result: { pgServerVersion: 'PostgreSQL 16.4', warnings: [], info: [] },
      })),
      queryHistoryFilterPicker: filterPickerStub,
      createQueryHistoryLlmRuntime: vi.fn(() => null),
    },
  );

  expect(stepResult.status).toBe('ready');
  expect(filterPickerStub).toHaveBeenCalledTimes(1);

  const writtenYaml = await readFile(join(tempDir, 'ktx.yaml'), 'utf-8');
  const writtenConfig = parseKtxProjectConfig(writtenYaml);
  expect(writtenConfig.connections.warehouse).toMatchObject({
    context: {
      queryHistory: {
        filters: {
          dropTrivialProbes: true,
          serviceAccounts: {
            mode: 'exclude',
            patterns: ['^svc_loader$'],
          },
        },
      },
    },
  });

  const printed = io.stdout();
  expect(printed).toContain('Proposed query-history service-account filters');
  expect(printed).toContain('svc_loader');
});
// Interactive run where the select prompt answers 'skip': the proposed exclusion is not
// written, so the stored filters contain only dropTrivialProbes.
it('lets interactive setup skip applying derived filters', async () => {
  const io = makeIo();
  const promptAdapter = makePromptAdapter({
    selectValues: ['skip'],
  });

  const stepResult = await runKtxSetupDatabasesStep(
    {
      projectDir: tempDir,
      inputMode: 'auto',
      yes: false,
      databaseDrivers: ['postgres'],
      databaseConnectionId: 'warehouse',
      databaseUrl: 'env:DATABASE_URL',
      databaseSchemas: ['public'],
      enableQueryHistory: true,
      skipDatabases: false,
    },
    io.io,
    {
      prompts: promptAdapter,
      testConnection: vi.fn(async () => 0),
      scanConnection: vi.fn(async () => 0),
      historicSqlReadinessProbe: vi.fn(async () => ({
        ok: true as const,
        dialect: 'postgres' as const,
        runner: fakeHistoricSqlRunner('postgres', 'pg_stat_statements'),
        result: { pgServerVersion: 'PostgreSQL 16.4', warnings: [], info: [] },
      })),
      queryHistoryFilterPicker: vi.fn(async () => ({
        excludedRoles: [{ role: 'svc_loader', pattern: '^svc_loader$', reason: 'Loader traffic.' }],
        consideredRoleCount: 2,
        skipped: null,
        warnings: [],
      })),
      createQueryHistoryLlmRuntime: vi.fn(() => null),
    },
  );

  expect(stepResult.status).toBe('ready');

  const writtenYaml = await readFile(join(tempDir, 'ktx.yaml'), 'utf-8');
  const writtenConfig = parseKtxProjectConfig(writtenYaml);
  expect(queryHistoryFromConfig(writtenConfig.connections.warehouse)?.filters).toEqual({ dropTrivialProbes: true });

  // The user was asked exactly this apply-or-skip question.
  expect(promptAdapter.select).toHaveBeenCalledWith({
    message: 'Apply 1 derived query-history service-account exclusion?',
    options: [
      { value: 'apply', label: 'Apply derived filters (recommended)' },
      { value: 'skip', label: 'Leave query history filters unchanged' },
    ],
  });
});
// Non-interactive setup (`inputMode: 'disabled'`, `yes: true`) against a project file that
// already declares a `serviceAccounts` filter: the picker stub reports the run as skipped
// with reason 'user-block-present', and the existing block must survive untouched.
it('does not overwrite an existing serviceAccounts block', async () => {
  // YAML block mappings nest by indentation, so each level below must be indented deeper
  // than its parent; otherwise every key collapses into a sibling under `connections` and
  // `connections.warehouse` parses as null.
  const existingConfigYaml = [
    'connections:',
    '  warehouse:',
    '    driver: postgres',
    '    url: env:DATABASE_URL',
    '    context:',
    '      queryHistory:',
    '        enabled: true',
    '        filters:',
    '          dropTrivialProbes: true',
    '          serviceAccounts:',
    '            mode: exclude',
    '            patterns:',
    "              - '^existing$'",
    '',
  ].join('\n');
  await writeFile(join(tempDir, 'ktx.yaml'), existingConfigYaml, 'utf-8');

  const io = makeIo();
  const result = await runKtxSetupDatabasesStep(
    {
      projectDir: tempDir,
      inputMode: 'disabled',
      yes: true,
      databaseConnectionIds: ['warehouse'],
      databaseSchemas: [],
      enableQueryHistory: true,
      skipDatabases: false,
    },
    io.io,
    {
      testConnection: vi.fn(async () => 0),
      scanConnection: vi.fn(async () => 0),
      // Readiness probe stub: reports a ready Postgres source backed by a fake runner.
      historicSqlReadinessProbe: vi.fn(async () => {
        const runner = fakeHistoricSqlRunner('postgres', 'pg_stat_statements');
        return {
          ok: true as const,
          dialect: 'postgres' as const,
          runner,
          result: { pgServerVersion: 'PostgreSQL 16.4', warnings: [], info: [] },
        };
      }),
      // Picker stub: still carries a proposed exclusion, but flags the run as skipped
      // because a user-authored block is present.
      queryHistoryFilterPicker: vi.fn(async () => ({
        excludedRoles: [{ role: 'svc_loader', pattern: '^svc_loader$', reason: 'Loader traffic.' }],
        consideredRoleCount: 2,
        skipped: { reason: 'user-block-present' as const },
        warnings: [],
      })),
      createQueryHistoryLlmRuntime: vi.fn(() => null),
    },
  );

  expect(result.status).toBe('ready');

  // The persisted serviceAccounts block must still be the pre-existing one.
  const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8'));
  expect(queryHistoryFromConfig(config.connections.warehouse)?.filters?.serviceAccounts).toEqual({
    mode: 'exclude',
    patterns: ['^existing$'],
  });
  expect(io.stdout()).toContain('Existing query-history service-account filters left unchanged');
});
it('asks interactive Postgres setup whether to enable query history', async () => {
await writeFile(
join(tempDir, 'ktx.yaml'),

View file

@ -1684,6 +1684,9 @@ describe('setup status', () => {
expect.objectContaining({
projectDir: tempDir,
inputMode: 'disabled',
yes: true,
cliVersion: '0.2.0',
runtimeInstallPolicy: 'auto',
databaseDrivers: ['postgres'],
databaseConnectionId: 'warehouse',
databaseUrl: 'env:DATABASE_URL',

View file

@ -33,7 +33,7 @@ function makeIo(options: { isTTY?: boolean } = {}) {
/**
 * Builds a stubbed `SqlAnalysisPort` for tests.
 *
 * - `analyzeForFingerprint` is a bare mock with no implementation.
 * - `analyzeBatch` resolves to a map with a single `'cli-sql'` entry that touches one
 *   structured table reference (`orders`, with `catalog` and `db` set to null) and has
 *   no columns by clause.
 * - `validateReadOnly` resolves to the supplied `result`.
 *
 * @param result Value the stubbed `validateReadOnly` resolves to.
 * @returns The stubbed port.
 */
function makeSqlAnalysis(result: Awaited<ReturnType<SqlAnalysisPort['validateReadOnly']>>): SqlAnalysisPort {
  return {
    analyzeForFingerprint: vi.fn(),
    // Single definition only: an object literal must not declare `analyzeBatch` twice.
    analyzeBatch: vi.fn(
      async () =>
        new Map([
          ['cli-sql', { tablesTouched: [{ catalog: null, db: null, name: 'orders' }], columnsByClause: {} }],
        ]),
    ),
    validateReadOnly: vi.fn(async () => result),
  };
}