diff --git a/packages/context/src/ingest/adapters/historic-sql/evidence-tool.ts b/packages/context/src/ingest/adapters/historic-sql/evidence-tool.ts index 634b7593..a6641a74 100644 --- a/packages/context/src/ingest/adapters/historic-sql/evidence-tool.ts +++ b/packages/context/src/ingest/adapters/historic-sql/evidence-tool.ts @@ -6,6 +6,23 @@ import { patternOutputSchema, tableUsageOutputSchema } from './skill-schemas.js' const SYSTEM_AUTHOR = 'System User'; const SYSTEM_EMAIL = 'system@example.com'; +interface EmitHistoricSqlEvidenceToolContext { + connectionId?: string | null; + session?: { + ingest?: { runId: string; sourceKey: string }; + configService?: { + writeFile( + path: string, + content: string, + author: string, + authorEmail: string, + commitMessage: string, + options?: { skipLock?: boolean }, + ): Promise; + }; + }; +} + function unitKeyForEvidence(input: { kind: string; table?: string; pattern?: { slug: string } }): string { if (input.kind === 'table_usage') { return `historic-sql-table-${String(input.table).replace(/[^a-zA-Z0-9]+/g, '-').replace(/^-+|-+$/g, '')}`; @@ -13,7 +30,7 @@ function unitKeyForEvidence(input: { kind: string; table?: string; pattern?: { s return `historic-sql-pattern-${String(input.pattern?.slug).replace(/[^a-zA-Z0-9]+/g, '-').replace(/^-+|-+$/g, '')}`; } -export function createEmitHistoricSqlEvidenceTool() { +export function createEmitHistoricSqlEvidenceTool(defaultContext?: EmitHistoricSqlEvidenceToolContext) { return tool({ description: 'Record typed historic-SQL evidence for deterministic projection. Use this instead of wiki_write, sl_write_source, sl_edit_source, or context_candidate_write during historic-SQL WorkUnits.', @@ -31,24 +48,7 @@ export function createEmitHistoricSqlEvidenceTool() { }), ]), execute: async (input, options): Promise => { - const context = options.experimental_context as - | { - connectionId?: string | null; - session?: { - ingest?: { runId: string; sourceKey: string }; - configService?: { - writeFile( - path: string, - content: string, - author: string, - authorEmail: string, - commitMessage: string, - options?: { skipLock?: boolean }, - ): Promise; - }; - }; - } - | undefined; + const context = (options.experimental_context as EmitHistoricSqlEvidenceToolContext | undefined) ?? defaultContext; const ingest = context?.session?.ingest; const configService = context?.session?.configService; if (!ingest || ingest.sourceKey !== 'historic-sql' || !configService || !context?.connectionId) { diff --git a/packages/context/src/ingest/adapters/historic-sql/post-processor.test.ts b/packages/context/src/ingest/adapters/historic-sql/post-processor.test.ts new file mode 100644 index 00000000..c96461c1 --- /dev/null +++ b/packages/context/src/ingest/adapters/historic-sql/post-processor.test.ts @@ -0,0 +1,74 @@ +import { mkdir, mkdtemp, readFile, writeFile } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import YAML from 'yaml'; +import { describe, expect, it } from 'vitest'; +import { HistoricSqlProjectionPostProcessor } from './post-processor.js'; + +async function tempWorkdir(): Promise { + return mkdtemp(join(tmpdir(), 'historic-sql-post-processor-')); +} + +async function writeJson(root: string, relPath: string, value: unknown): Promise { + const target = join(root, relPath); + await mkdir(join(target, '..'), { recursive: true }); + await writeFile(target, `${JSON.stringify(value, null, 2)}\n`, 'utf-8'); +} + +describe('HistoricSqlProjectionPostProcessor', () => { + it('projects current run evidence before the ingest squash commit', async () => { + const workdir = await tempWorkdir(); + await mkdir(join(workdir, 'semantic-layer/warehouse/_schema'), { recursive: true }); + await writeFile( + join(workdir, 'semantic-layer/warehouse/_schema/public.yaml'), + YAML.stringify({ tables: { orders: { table: 'public.orders', columns: [{ name: 'id', type: 'string' }] } } }), + 'utf-8', + ); + await writeJson(workdir, 'raw-sources/warehouse/historic-sql/sync-1/manifest.json', { + source: 'historic-sql', + connectionId: 'warehouse', + dialect: 'postgres', + fetchedAt: '2026-05-11T00:00:00.000Z', + windowStart: '2026-02-10T00:00:00.000Z', + windowEnd: '2026-05-11T00:00:00.000Z', + snapshotRowCount: 1, + touchedTableCount: 1, + parseFailures: 0, + warnings: [], + probeWarnings: [], + staleArchiveAfterDays: 90, + }); + await writeJson(workdir, 'raw-sources/warehouse/historic-sql/sync-1/tables/public.orders.json', { table: 'public.orders' }); + await writeJson(workdir, '.ktx/ingest-evidence/historic-sql/run-1/orders.json', { + kind: 'table_usage', + connectionId: 'warehouse', + table: 'public.orders', + rawPath: 'tables/public.orders.json', + usage: { + narrative: 'Orders are repeatedly queried by lifecycle status.', + frequencyTier: 'high', + commonFilters: ['status'], + commonJoins: [], + staleSince: null, + }, + }); + + const result = await new HistoricSqlProjectionPostProcessor().run({ + connectionId: 'warehouse', + sourceKey: 'historic-sql', + syncId: 'sync-1', + jobId: 'job-1', + runId: 'run-1', + workdir, + parseArtifacts: null, + }); + + expect(result.errors).toEqual([]); + expect(result.warnings).toEqual([]); + expect(result.touchedSources).toEqual([{ connectionId: 'warehouse', sourceName: 'orders' }]); + expect(result.result).toMatchObject({ tableUsageMerged: 1 }); + await expect(readFile(join(workdir, 'semantic-layer/warehouse/_schema/public.yaml'), 'utf-8')).resolves.toContain( + 'Orders are repeatedly queried by lifecycle status.', + ); + }); +}); diff --git a/packages/context/src/ingest/adapters/historic-sql/post-processor.ts b/packages/context/src/ingest/adapters/historic-sql/post-processor.ts new file mode 100644 index 00000000..815b6798 --- /dev/null +++ b/packages/context/src/ingest/adapters/historic-sql/post-processor.ts @@ -0,0 +1,41 @@ +import type { IngestBundlePostProcessorInput, IngestBundlePostProcessorPort, IngestBundlePostProcessorResult } from '../../ports.js'; +import { createSimpleGit } from '../../../core/git-env.js'; +import { projectHistoricSqlEvidence } from './projection.js'; + +async function commitProjectionChanges(workdir: string): Promise { + const git = createSimpleGit(workdir); + if (!(await git.checkIsRepo().catch(() => false))) { + return; + } + const status = await git.status(); + const paths = status.files + .map((file) => file.path) + .filter((path) => path.startsWith('semantic-layer/') || path.startsWith('knowledge/global/historic-sql/')); + if (paths.length === 0) { + return; + } + await git.add(paths); + const staged = await git.diff(['--cached', '--name-only']); + if (!staged.trim()) { + return; + } + await git.commit('Project historic SQL evidence', { '--author': 'System User ' }); +} + +export class HistoricSqlProjectionPostProcessor implements IngestBundlePostProcessorPort { + async run(input: IngestBundlePostProcessorInput): Promise { + const projection = await projectHistoricSqlEvidence({ + workdir: input.workdir, + connectionId: input.connectionId, + syncId: input.syncId, + runId: input.runId, + }); + await commitProjectionChanges(input.workdir); + return { + result: projection, + warnings: projection.warnings, + errors: [], + touchedSources: projection.touchedSources, + }; + } +} diff --git a/packages/context/src/ingest/index.ts b/packages/context/src/ingest/index.ts index f5a1ab0e..d69db419 100644 --- a/packages/context/src/ingest/index.ts +++ b/packages/context/src/ingest/index.ts @@ -347,6 +347,7 @@ export type { HistoricSqlTableUsageEvidence, } from './adapters/historic-sql/evidence.js'; export { createEmitHistoricSqlEvidenceTool } from './adapters/historic-sql/evidence-tool.js'; +export { HistoricSqlProjectionPostProcessor } from './adapters/historic-sql/post-processor.js'; export { projectHistoricSqlEvidence } from './adapters/historic-sql/projection.js'; export type { HistoricSqlProjectionInput, HistoricSqlProjectionResult } from './adapters/historic-sql/projection.js'; export { diff --git a/packages/context/src/ingest/local-bundle-ingest.test.ts b/packages/context/src/ingest/local-bundle-ingest.test.ts index 6e9aa4aa..2fa014d0 100644 --- a/packages/context/src/ingest/local-bundle-ingest.test.ts +++ b/packages/context/src/ingest/local-bundle-ingest.test.ts @@ -2,6 +2,7 @@ import { mkdir, mkdtemp, readFile, rm, writeFile } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; import Database from 'better-sqlite3'; +import YAML from 'yaml'; import { AgentRunnerService } from '../agent/index.js'; import { initKtxProject, type KtxLocalProject, loadKtxProject } from '../project/index.js'; import { makeLocalGitRepo } from '../test/make-local-git-repo.js'; @@ -10,6 +11,7 @@ import { FakeSourceAdapter } from './adapters/fake/fake.adapter.js'; import { LocalLookerRuntimeStore } from './adapters/looker/local-runtime-store.js'; import { createDefaultLocalIngestAdapters, localPullConfigForAdapter } from './local-adapters.js'; import { getLocalIngestStatus, runLocalIngest } from './local-ingest.js'; +import type { ChunkResult, DiffSet, SourceAdapter } from './types.js'; class TestAgentRunner extends AgentRunnerService { override runLoop = vi.fn().mockResolvedValue({ stopReason: 'natural' as const }); @@ -86,6 +88,70 @@ class WikiWritingAgentRunner extends AgentRunnerService { } } +class HistoricSqlEvidenceAgentRunner extends AgentRunnerService { + override runLoop = vi.fn(async (params: any) => { + if ( + params.telemetryTags?.operationName === 'ingest-bundle-wu' && + params.telemetryTags?.unitKey === 'historic-sql-table-public-orders' + ) { + const emitEvidence = params.toolSet.emit_historic_sql_evidence; + if (!emitEvidence?.execute) { + throw new Error('emit_historic_sql_evidence tool was not available to the historic-SQL WorkUnit'); + } + const result = await emitEvidence.execute( + { + kind: 'table_usage', + table: 'public.orders', + rawPath: 'tables/public.orders.json', + usage: { + narrative: 'Orders are repeatedly queried by lifecycle status.', + frequencyTier: 'high', + commonFilters: ['status'], + commonJoins: [], + staleSince: null, + }, + }, + { toolCallId: 'historic-sql-evidence' }, + ); + if (!String(result).includes('Recorded historic-SQL table_usage evidence')) { + throw new Error(`Unexpected historic-SQL evidence result: ${String(result)}`); + } + } + return { stopReason: 'natural' as const }; + }); + + constructor() { + super({ llmProvider: { getModel: () => ({}) as never } as never }); + } +} + +class HistoricSqlEvidenceTestAdapter implements SourceAdapter { + readonly source = 'historic-sql'; + readonly skillNames = ['historic_sql_table_digest']; + readonly reconcileSkillNames: string[] = []; + readonly triageSupported = false; + + detect(): Promise { + return Promise.resolve(true); + } + + chunk(_stagedDir: string, _diffSet?: DiffSet): Promise { + return Promise.resolve({ + workUnits: [ + { + unitKey: 'historic-sql-table-public-orders', + displayLabel: 'public.orders', + rawFiles: ['tables/public.orders.json'], + peerFileIndex: [], + dependencyPaths: ['manifest.json'], + notes: + 'Use historic_sql_table_digest. Read this table usage JSON and emit exactly one table_usage object with emit_historic_sql_evidence.', + }, + ], + }); + } +} + function makeLookerRuntimeClient() { const lookerModels = { models: [{ name: 'ecommerce', label: 'Ecommerce', explores: [{ name: 'orders', label: 'Orders' }] }], @@ -308,6 +374,90 @@ describe('canonical local ingest', () => { } }); + it('runs historic-SQL evidence projection through the local bundle post-processor', async () => { + const projectDir = join(tempDir, 'historic-sql-project'); + await initKtxProject({ projectDir, projectName: 'warehouse' }); + await writeFile( + join(projectDir, 'ktx.yaml'), + [ + 'project: warehouse', + 'connections:', + ' warehouse:', + ' driver: postgres', + 'ingest:', + ' adapters:', + ' - historic-sql', + ' embeddings:', + ' backend: deterministic', + 'storage:', + ' state: sqlite', + ' search: sqlite-fts5', + ' git:', + ' auto_commit: false', + ' author: KTX Test ', + '', + ].join('\n'), + 'utf-8', + ); + const historicProject = await loadKtxProject({ projectDir }); + await historicProject.fileStore.writeFile( + 'semantic-layer/warehouse/_schema/public.yaml', + YAML.stringify({ tables: { orders: { table: 'public.orders', columns: [{ name: 'id', type: 'string' }] } } }), + 'KTX Test', + 'system@ktx.local', + 'Seed schema shard', + ); + + const sourceDir = join(tempDir, 'historic-sql-source'); + await mkdir(join(sourceDir, 'tables'), { recursive: true }); + await writeFile( + join(sourceDir, 'manifest.json'), + `${JSON.stringify( + { + source: 'historic-sql', + connectionId: 'warehouse', + dialect: 'postgres', + fetchedAt: '2026-05-11T00:00:00.000Z', + windowStart: '2026-02-10T00:00:00.000Z', + windowEnd: '2026-05-11T00:00:00.000Z', + snapshotRowCount: 1, + touchedTableCount: 1, + parseFailures: 0, + warnings: [], + probeWarnings: [], + staleArchiveAfterDays: 90, + }, + null, + 2, + )}\n`, + 'utf-8', + ); + await writeFile(join(sourceDir, 'tables/public.orders.json'), '{"table":"public.orders"}\n', 'utf-8'); + await writeFile(join(sourceDir, 'patterns-input.json'), '{"templates":[]}\n', 'utf-8'); + const agentRunner = new HistoricSqlEvidenceAgentRunner(); + + const result = await runLocalIngest({ + project: historicProject, + adapters: [new HistoricSqlEvidenceTestAdapter()], + adapter: 'historic-sql', + connectionId: 'warehouse', + sourceDir, + jobId: 'historic-sql-local-projection', + agentRunner, + }); + + expect(result.result.failedWorkUnits).toEqual([]); + expect(result.report.body.postProcessor).toMatchObject({ + sourceKey: 'historic-sql', + status: 'success', + result: { tableUsageMerged: 1 }, + touchedSources: [{ connectionId: 'warehouse', sourceName: 'orders' }], + }); + await expect(readFile(join(projectDir, 'semantic-layer/warehouse/_schema/public.yaml'), 'utf-8')).resolves.toContain( + 'Orders are repeatedly queried by lifecycle status.', + ); + }); + it('rejects direct Metabase scheduled pulls before requiring a local ingest LLM provider', async () => { const projectDir = join(tempDir, 'metabase-project'); await initKtxProject({ projectDir, projectName: 'warehouse' }); diff --git a/packages/context/src/ingest/local-bundle-runtime.ts b/packages/context/src/ingest/local-bundle-runtime.ts index f7c8be80..f8c7099e 100644 --- a/packages/context/src/ingest/local-bundle-runtime.ts +++ b/packages/context/src/ingest/local-bundle-runtime.ts @@ -2,6 +2,7 @@ import { mkdirSync } from 'node:fs'; import { join } from 'node:path'; import { fileURLToPath } from 'node:url'; import type { KtxLlmProvider } from '@ktx/llm'; +import type { Tool } from 'ai'; import YAML from 'yaml'; import type { AgentRunnerService } from '../agent/index.js'; import { AgentRunnerService as DefaultAgentRunnerService } from '../agent/index.js'; @@ -69,6 +70,8 @@ import { ContextCandidateCarryforwardService, CuratorPaginationService, } from './context-candidates/index.js'; +import { createEmitHistoricSqlEvidenceTool } from './adapters/historic-sql/evidence-tool.js'; +import { HistoricSqlProjectionPostProcessor } from './adapters/historic-sql/post-processor.js'; import { ContextEvidenceIndexService, SqliteContextEvidenceStore } from './context-evidence/index.js'; import { DiffSetService } from './diff-set.service.js'; import { IngestBundleRunner } from './ingest-bundle.runner.js'; @@ -427,10 +430,16 @@ class NoopKnowledgeEventPort implements KnowledgeEventPort { } class LocalIngestToolSet implements IngestToolsetLike { - constructor(private readonly tools: BaseTool[]) {} + constructor( + private readonly tools: BaseTool[], + private readonly sourceTools: Record = {}, + ) {} toAiSdkTools(context: ToolContext) { - return Object.fromEntries(this.tools.map((tool) => [tool.name, tool.toAiSdkTool(context)])); + return { + ...Object.fromEntries(this.tools.map((tool) => [tool.name, tool.toAiSdkTool(context)])), + ...this.sourceTools, + }; } } @@ -498,9 +507,19 @@ class LocalIngestToolsetFactory implements IngestToolsetFactoryPort { ]; } - createIngestWuToolset(_session: ToolSession, options?: { includeContextEvidenceTools?: boolean }): IngestToolsetLike { + createIngestWuToolset(session: ToolSession, options?: { includeContextEvidenceTools?: boolean }): IngestToolsetLike { + const sourceTools = + session.ingest?.sourceKey === 'historic-sql' + ? { + emit_historic_sql_evidence: createEmitHistoricSqlEvidenceTool({ + connectionId: session.connectionId, + session, + }), + } + : {}; return new LocalIngestToolSet( options?.includeContextEvidenceTools ? [...this.baseTools, ...this.contextTools] : this.baseTools, + sourceTools, ); } } @@ -656,6 +675,9 @@ export function createLocalBundleIngestRuntime( settings: { batchSize: 8, maxPasses: 8, stepBudgetPerPass: 60 }, logger, }), + postProcessors: { + 'historic-sql': new HistoricSqlProjectionPostProcessor(), + }, logger, }; diff --git a/packages/context/src/package-exports.test.ts b/packages/context/src/package-exports.test.ts index 25a3ddb1..588c6391 100644 --- a/packages/context/src/package-exports.test.ts +++ b/packages/context/src/package-exports.test.ts @@ -246,6 +246,7 @@ describe('@ktx/context package exports', () => { expect(ingest.historicSqlEvidenceEnvelopeSchema).toBeDefined(); expect(ingest.historicSqlEvidencePath).toBeTypeOf('function'); expect(ingest.createEmitHistoricSqlEvidenceTool).toBeTypeOf('function'); + expect(ingest.HistoricSqlProjectionPostProcessor).toBeTypeOf('function'); expect(ingest.SqliteContextEvidenceStore).toBeTypeOf('function'); expect(ingest.SqliteBundleIngestStore).toBeTypeOf('function'); expect(ingest.CuratorPaginationService).toBeTypeOf('function');