feat: run historic sql deterministic projection

This commit is contained in:
Andrey Avtomonov 2026-05-11 18:58:19 +02:00
parent fb541cb9e5
commit da263c0957
7 changed files with 311 additions and 22 deletions

View file

@ -6,6 +6,23 @@ import { patternOutputSchema, tableUsageOutputSchema } from './skill-schemas.js'
const SYSTEM_AUTHOR = 'System User';
const SYSTEM_EMAIL = 'system@example.com';
interface EmitHistoricSqlEvidenceToolContext {
connectionId?: string | null;
session?: {
ingest?: { runId: string; sourceKey: string };
configService?: {
writeFile(
path: string,
content: string,
author: string,
authorEmail: string,
commitMessage: string,
options?: { skipLock?: boolean },
): Promise<unknown>;
};
};
}
function unitKeyForEvidence(input: { kind: string; table?: string; pattern?: { slug: string } }): string {
if (input.kind === 'table_usage') {
return `historic-sql-table-${String(input.table).replace(/[^a-zA-Z0-9]+/g, '-').replace(/^-+|-+$/g, '')}`;
@ -13,7 +30,7 @@ function unitKeyForEvidence(input: { kind: string; table?: string; pattern?: { s
return `historic-sql-pattern-${String(input.pattern?.slug).replace(/[^a-zA-Z0-9]+/g, '-').replace(/^-+|-+$/g, '')}`;
}
export function createEmitHistoricSqlEvidenceTool() {
export function createEmitHistoricSqlEvidenceTool(defaultContext?: EmitHistoricSqlEvidenceToolContext) {
return tool({
description:
'Record typed historic-SQL evidence for deterministic projection. Use this instead of wiki_write, sl_write_source, sl_edit_source, or context_candidate_write during historic-SQL WorkUnits.',
@ -31,24 +48,7 @@ export function createEmitHistoricSqlEvidenceTool() {
}),
]),
execute: async (input, options): Promise<string> => {
const context = options.experimental_context as
| {
connectionId?: string | null;
session?: {
ingest?: { runId: string; sourceKey: string };
configService?: {
writeFile(
path: string,
content: string,
author: string,
authorEmail: string,
commitMessage: string,
options?: { skipLock?: boolean },
): Promise<unknown>;
};
};
}
| undefined;
const context = (options.experimental_context as EmitHistoricSqlEvidenceToolContext | undefined) ?? defaultContext;
const ingest = context?.session?.ingest;
const configService = context?.session?.configService;
if (!ingest || ingest.sourceKey !== 'historic-sql' || !configService || !context?.connectionId) {

View file

@ -0,0 +1,74 @@
import { mkdir, mkdtemp, readFile, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import YAML from 'yaml';
import { describe, expect, it } from 'vitest';
import { HistoricSqlProjectionPostProcessor } from './post-processor.js';
async function tempWorkdir(): Promise<string> {
return mkdtemp(join(tmpdir(), 'historic-sql-post-processor-'));
}
async function writeJson(root: string, relPath: string, value: unknown): Promise<void> {
const target = join(root, relPath);
await mkdir(join(target, '..'), { recursive: true });
await writeFile(target, `${JSON.stringify(value, null, 2)}\n`, 'utf-8');
}
describe('HistoricSqlProjectionPostProcessor', () => {
it('projects current run evidence before the ingest squash commit', async () => {
const workdir = await tempWorkdir();
await mkdir(join(workdir, 'semantic-layer/warehouse/_schema'), { recursive: true });
await writeFile(
join(workdir, 'semantic-layer/warehouse/_schema/public.yaml'),
YAML.stringify({ tables: { orders: { table: 'public.orders', columns: [{ name: 'id', type: 'string' }] } } }),
'utf-8',
);
await writeJson(workdir, 'raw-sources/warehouse/historic-sql/sync-1/manifest.json', {
source: 'historic-sql',
connectionId: 'warehouse',
dialect: 'postgres',
fetchedAt: '2026-05-11T00:00:00.000Z',
windowStart: '2026-02-10T00:00:00.000Z',
windowEnd: '2026-05-11T00:00:00.000Z',
snapshotRowCount: 1,
touchedTableCount: 1,
parseFailures: 0,
warnings: [],
probeWarnings: [],
staleArchiveAfterDays: 90,
});
await writeJson(workdir, 'raw-sources/warehouse/historic-sql/sync-1/tables/public.orders.json', { table: 'public.orders' });
await writeJson(workdir, '.ktx/ingest-evidence/historic-sql/run-1/orders.json', {
kind: 'table_usage',
connectionId: 'warehouse',
table: 'public.orders',
rawPath: 'tables/public.orders.json',
usage: {
narrative: 'Orders are repeatedly queried by lifecycle status.',
frequencyTier: 'high',
commonFilters: ['status'],
commonJoins: [],
staleSince: null,
},
});
const result = await new HistoricSqlProjectionPostProcessor().run({
connectionId: 'warehouse',
sourceKey: 'historic-sql',
syncId: 'sync-1',
jobId: 'job-1',
runId: 'run-1',
workdir,
parseArtifacts: null,
});
expect(result.errors).toEqual([]);
expect(result.warnings).toEqual([]);
expect(result.touchedSources).toEqual([{ connectionId: 'warehouse', sourceName: 'orders' }]);
expect(result.result).toMatchObject({ tableUsageMerged: 1 });
await expect(readFile(join(workdir, 'semantic-layer/warehouse/_schema/public.yaml'), 'utf-8')).resolves.toContain(
'Orders are repeatedly queried by lifecycle status.',
);
});
});

View file

@ -0,0 +1,41 @@
import type { IngestBundlePostProcessorInput, IngestBundlePostProcessorPort, IngestBundlePostProcessorResult } from '../../ports.js';
import { createSimpleGit } from '../../../core/git-env.js';
import { projectHistoricSqlEvidence } from './projection.js';
async function commitProjectionChanges(workdir: string): Promise<void> {
const git = createSimpleGit(workdir);
if (!(await git.checkIsRepo().catch(() => false))) {
return;
}
const status = await git.status();
const paths = status.files
.map((file) => file.path)
.filter((path) => path.startsWith('semantic-layer/') || path.startsWith('knowledge/global/historic-sql/'));
if (paths.length === 0) {
return;
}
await git.add(paths);
const staged = await git.diff(['--cached', '--name-only']);
if (!staged.trim()) {
return;
}
await git.commit('Project historic SQL evidence', { '--author': 'System User <system@example.com>' });
}
export class HistoricSqlProjectionPostProcessor implements IngestBundlePostProcessorPort {
async run(input: IngestBundlePostProcessorInput): Promise<IngestBundlePostProcessorResult> {
const projection = await projectHistoricSqlEvidence({
workdir: input.workdir,
connectionId: input.connectionId,
syncId: input.syncId,
runId: input.runId,
});
await commitProjectionChanges(input.workdir);
return {
result: projection,
warnings: projection.warnings,
errors: [],
touchedSources: projection.touchedSources,
};
}
}

View file

@ -347,6 +347,7 @@ export type {
HistoricSqlTableUsageEvidence,
} from './adapters/historic-sql/evidence.js';
export { createEmitHistoricSqlEvidenceTool } from './adapters/historic-sql/evidence-tool.js';
export { HistoricSqlProjectionPostProcessor } from './adapters/historic-sql/post-processor.js';
export { projectHistoricSqlEvidence } from './adapters/historic-sql/projection.js';
export type { HistoricSqlProjectionInput, HistoricSqlProjectionResult } from './adapters/historic-sql/projection.js';
export {

View file

@ -2,6 +2,7 @@ import { mkdir, mkdtemp, readFile, rm, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import Database from 'better-sqlite3';
import YAML from 'yaml';
import { AgentRunnerService } from '../agent/index.js';
import { initKtxProject, type KtxLocalProject, loadKtxProject } from '../project/index.js';
import { makeLocalGitRepo } from '../test/make-local-git-repo.js';
@ -10,6 +11,7 @@ import { FakeSourceAdapter } from './adapters/fake/fake.adapter.js';
import { LocalLookerRuntimeStore } from './adapters/looker/local-runtime-store.js';
import { createDefaultLocalIngestAdapters, localPullConfigForAdapter } from './local-adapters.js';
import { getLocalIngestStatus, runLocalIngest } from './local-ingest.js';
import type { ChunkResult, DiffSet, SourceAdapter } from './types.js';
class TestAgentRunner extends AgentRunnerService {
override runLoop = vi.fn().mockResolvedValue({ stopReason: 'natural' as const });
@ -86,6 +88,70 @@ class WikiWritingAgentRunner extends AgentRunnerService {
}
}
class HistoricSqlEvidenceAgentRunner extends AgentRunnerService {
override runLoop = vi.fn(async (params: any) => {
if (
params.telemetryTags?.operationName === 'ingest-bundle-wu' &&
params.telemetryTags?.unitKey === 'historic-sql-table-public-orders'
) {
const emitEvidence = params.toolSet.emit_historic_sql_evidence;
if (!emitEvidence?.execute) {
throw new Error('emit_historic_sql_evidence tool was not available to the historic-SQL WorkUnit');
}
const result = await emitEvidence.execute(
{
kind: 'table_usage',
table: 'public.orders',
rawPath: 'tables/public.orders.json',
usage: {
narrative: 'Orders are repeatedly queried by lifecycle status.',
frequencyTier: 'high',
commonFilters: ['status'],
commonJoins: [],
staleSince: null,
},
},
{ toolCallId: 'historic-sql-evidence' },
);
if (!String(result).includes('Recorded historic-SQL table_usage evidence')) {
throw new Error(`Unexpected historic-SQL evidence result: ${String(result)}`);
}
}
return { stopReason: 'natural' as const };
});
constructor() {
super({ llmProvider: { getModel: () => ({}) as never } as never });
}
}
class HistoricSqlEvidenceTestAdapter implements SourceAdapter {
readonly source = 'historic-sql';
readonly skillNames = ['historic_sql_table_digest'];
readonly reconcileSkillNames: string[] = [];
readonly triageSupported = false;
detect(): Promise<boolean> {
return Promise.resolve(true);
}
chunk(_stagedDir: string, _diffSet?: DiffSet): Promise<ChunkResult> {
return Promise.resolve({
workUnits: [
{
unitKey: 'historic-sql-table-public-orders',
displayLabel: 'public.orders',
rawFiles: ['tables/public.orders.json'],
peerFileIndex: [],
dependencyPaths: ['manifest.json'],
notes:
'Use historic_sql_table_digest. Read this table usage JSON and emit exactly one table_usage object with emit_historic_sql_evidence.',
},
],
});
}
}
function makeLookerRuntimeClient() {
const lookerModels = {
models: [{ name: 'ecommerce', label: 'Ecommerce', explores: [{ name: 'orders', label: 'Orders' }] }],
@ -308,6 +374,90 @@ describe('canonical local ingest', () => {
}
});
it('runs historic-SQL evidence projection through the local bundle post-processor', async () => {
const projectDir = join(tempDir, 'historic-sql-project');
await initKtxProject({ projectDir, projectName: 'warehouse' });
await writeFile(
join(projectDir, 'ktx.yaml'),
[
'project: warehouse',
'connections:',
' warehouse:',
' driver: postgres',
'ingest:',
' adapters:',
' - historic-sql',
' embeddings:',
' backend: deterministic',
'storage:',
' state: sqlite',
' search: sqlite-fts5',
' git:',
' auto_commit: false',
' author: KTX Test <system@ktx.local>',
'',
].join('\n'),
'utf-8',
);
const historicProject = await loadKtxProject({ projectDir });
await historicProject.fileStore.writeFile(
'semantic-layer/warehouse/_schema/public.yaml',
YAML.stringify({ tables: { orders: { table: 'public.orders', columns: [{ name: 'id', type: 'string' }] } } }),
'KTX Test',
'system@ktx.local',
'Seed schema shard',
);
const sourceDir = join(tempDir, 'historic-sql-source');
await mkdir(join(sourceDir, 'tables'), { recursive: true });
await writeFile(
join(sourceDir, 'manifest.json'),
`${JSON.stringify(
{
source: 'historic-sql',
connectionId: 'warehouse',
dialect: 'postgres',
fetchedAt: '2026-05-11T00:00:00.000Z',
windowStart: '2026-02-10T00:00:00.000Z',
windowEnd: '2026-05-11T00:00:00.000Z',
snapshotRowCount: 1,
touchedTableCount: 1,
parseFailures: 0,
warnings: [],
probeWarnings: [],
staleArchiveAfterDays: 90,
},
null,
2,
)}\n`,
'utf-8',
);
await writeFile(join(sourceDir, 'tables/public.orders.json'), '{"table":"public.orders"}\n', 'utf-8');
await writeFile(join(sourceDir, 'patterns-input.json'), '{"templates":[]}\n', 'utf-8');
const agentRunner = new HistoricSqlEvidenceAgentRunner();
const result = await runLocalIngest({
project: historicProject,
adapters: [new HistoricSqlEvidenceTestAdapter()],
adapter: 'historic-sql',
connectionId: 'warehouse',
sourceDir,
jobId: 'historic-sql-local-projection',
agentRunner,
});
expect(result.result.failedWorkUnits).toEqual([]);
expect(result.report.body.postProcessor).toMatchObject({
sourceKey: 'historic-sql',
status: 'success',
result: { tableUsageMerged: 1 },
touchedSources: [{ connectionId: 'warehouse', sourceName: 'orders' }],
});
await expect(readFile(join(projectDir, 'semantic-layer/warehouse/_schema/public.yaml'), 'utf-8')).resolves.toContain(
'Orders are repeatedly queried by lifecycle status.',
);
});
it('rejects direct Metabase scheduled pulls before requiring a local ingest LLM provider', async () => {
const projectDir = join(tempDir, 'metabase-project');
await initKtxProject({ projectDir, projectName: 'warehouse' });

View file

@ -2,6 +2,7 @@ import { mkdirSync } from 'node:fs';
import { join } from 'node:path';
import { fileURLToPath } from 'node:url';
import type { KtxLlmProvider } from '@ktx/llm';
import type { Tool } from 'ai';
import YAML from 'yaml';
import type { AgentRunnerService } from '../agent/index.js';
import { AgentRunnerService as DefaultAgentRunnerService } from '../agent/index.js';
@ -69,6 +70,8 @@ import {
ContextCandidateCarryforwardService,
CuratorPaginationService,
} from './context-candidates/index.js';
import { createEmitHistoricSqlEvidenceTool } from './adapters/historic-sql/evidence-tool.js';
import { HistoricSqlProjectionPostProcessor } from './adapters/historic-sql/post-processor.js';
import { ContextEvidenceIndexService, SqliteContextEvidenceStore } from './context-evidence/index.js';
import { DiffSetService } from './diff-set.service.js';
import { IngestBundleRunner } from './ingest-bundle.runner.js';
@ -427,10 +430,16 @@ class NoopKnowledgeEventPort implements KnowledgeEventPort {
}
class LocalIngestToolSet implements IngestToolsetLike {
constructor(private readonly tools: BaseTool[]) {}
constructor(
private readonly tools: BaseTool[],
private readonly sourceTools: Record<string, Tool> = {},
) {}
toAiSdkTools(context: ToolContext) {
return Object.fromEntries(this.tools.map((tool) => [tool.name, tool.toAiSdkTool(context)]));
return {
...Object.fromEntries(this.tools.map((tool) => [tool.name, tool.toAiSdkTool(context)])),
...this.sourceTools,
};
}
}
@ -498,9 +507,19 @@ class LocalIngestToolsetFactory implements IngestToolsetFactoryPort {
];
}
createIngestWuToolset(_session: ToolSession, options?: { includeContextEvidenceTools?: boolean }): IngestToolsetLike {
createIngestWuToolset(session: ToolSession, options?: { includeContextEvidenceTools?: boolean }): IngestToolsetLike {
const sourceTools =
session.ingest?.sourceKey === 'historic-sql'
? {
emit_historic_sql_evidence: createEmitHistoricSqlEvidenceTool({
connectionId: session.connectionId,
session,
}),
}
: {};
return new LocalIngestToolSet(
options?.includeContextEvidenceTools ? [...this.baseTools, ...this.contextTools] : this.baseTools,
sourceTools,
);
}
}
@ -656,6 +675,9 @@ export function createLocalBundleIngestRuntime(
settings: { batchSize: 8, maxPasses: 8, stepBudgetPerPass: 60 },
logger,
}),
postProcessors: {
'historic-sql': new HistoricSqlProjectionPostProcessor(),
},
logger,
};

View file

@ -246,6 +246,7 @@ describe('@ktx/context package exports', () => {
expect(ingest.historicSqlEvidenceEnvelopeSchema).toBeDefined();
expect(ingest.historicSqlEvidencePath).toBeTypeOf('function');
expect(ingest.createEmitHistoricSqlEvidenceTool).toBeTypeOf('function');
expect(ingest.HistoricSqlProjectionPostProcessor).toBeTypeOf('function');
expect(ingest.SqliteContextEvidenceStore).toBeTypeOf('function');
expect(ingest.SqliteBundleIngestStore).toBeTypeOf('function');
expect(ingest.CuratorPaginationService).toBeTypeOf('function');