From 4ec5903aa5d710c8dd675ef9f902f23af52c43d0 Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov Date: Wed, 20 May 2026 14:17:10 +0200 Subject: [PATCH] feat(ingest): adapter-owned finalization replaces post-processor escape hatch (#136) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Refine adapter-owned ingest finalization design after adversarial review iteration 1 * Refine adapter-owned ingest finalization design after adversarial review iteration 2 * Refine adapter-owned ingest finalization design after adversarial review iteration 3 * Implement adapter-owned ingest finalization v1 Moves finalization from runner-owned post-processors into typed SourceAdapter.finalize() contracts. Adds finalization report schema, scope derivation, override replay context, and migrates historic-SQL projection. Removes IngestBundlePostProcessorPort wiring and HistoricSqlProjectionPostProcessor. * feat(ingest): export finalization adapter contract types * test(ingest): exercise historic sql finalization locally * docs(plans): add adapter-owned finalization v1 closure plan * fix(setup): unblock clean Linux installs and add enabled_tables allowlist - Pin managed Python runtime to 3.13 via `uv venv --python 3.13` so installs don't pick the system 3.12 on Ubuntu 24.04 and fail at wheel install. - Sanitize NO_PROXY/no_proxy for the daemon child process — drop IPv6 CIDR entries that httpx rejects with InvalidURL (OrbStack injects these by default). - Add `enabled_tables` allowlist on warehouse connections (zod schema + live-database introspection filter) to scope ingest to specific tables. - Add `getting-started/troubleshooting-linux` docs page covering the Python 3.13 prerequisite, IPv6 proxy gotcha, and a minimal working recipe; link it from the quickstart troubleshooting table and the llms-docs map. - Make docs-site origin overridable via `KTX_DOCS_ORIGIN` so local builds can serve under host.docker.internal. * Move docs changes to specs repo * fix(cli): keep managed runtime python version private * Deduplicate enabled tables filtering --- packages/cli/src/ingest.test.ts | 28 +- packages/context/src/core/git.service.ts | 13 + .../historic-sql/historic-sql.adapter.ts | 29 +- .../local-ingest-acceptance.test.ts | 12 +- .../historic-sql/post-processor.test.ts | 74 ---- .../adapters/historic-sql/post-processor.ts | 41 --- .../adapters/historic-sql/projection.test.ts | 94 +++++ .../adapters/historic-sql/projection.ts | 55 +++ .../daemon-introspection.test.ts | 36 ++ .../live-database/daemon-introspection.ts | 5 +- .../src/ingest/finalization-scope.test.ts | 131 +++++++ .../context/src/ingest/finalization-scope.ts | 145 ++++++++ packages/context/src/ingest/index.ts | 4 +- ...ingest-bundle.runner.isolated-diff.test.ts | 212 +++++++++++ .../src/ingest/ingest-bundle.runner.test.ts | 261 +++++++++++--- .../src/ingest/ingest-bundle.runner.ts | 339 +++++++++++++++--- .../src/ingest/local-bundle-ingest.test.ts | 35 +- .../src/ingest/local-bundle-runtime.ts | 4 - packages/context/src/ingest/local-ingest.ts | 7 - .../context/src/ingest/memory-flow/schema.ts | 2 +- .../context/src/ingest/memory-flow/types.ts | 2 +- packages/context/src/ingest/ports.ts | 24 +- .../context/src/ingest/report-snapshot.ts | 30 ++ packages/context/src/ingest/reports.ts | 63 ++-- packages/context/src/ingest/types.ts | 36 ++ packages/context/src/package-exports.test.ts | 13 +- .../context/src/project/driver-schemas.ts | 6 + packages/context/src/scan/enabled-tables.ts | 17 + packages/context/src/scan/local-scan.ts | 19 +- 29 files changed, 1423 insertions(+), 314 deletions(-) delete mode 100644 packages/context/src/ingest/adapters/historic-sql/post-processor.test.ts delete mode 100644 packages/context/src/ingest/adapters/historic-sql/post-processor.ts create mode 100644 packages/context/src/ingest/finalization-scope.test.ts create mode 100644 packages/context/src/ingest/finalization-scope.ts create mode 100644 packages/context/src/scan/enabled-tables.ts diff --git a/packages/cli/src/ingest.test.ts b/packages/cli/src/ingest.test.ts index d3451e0c..03f04d8e 100644 --- a/packages/cli/src/ingest.test.ts +++ b/packages/cli/src/ingest.test.ts @@ -1020,9 +1020,16 @@ describe('runKtxIngest', () => { sourceKey: 'historic-sql', body: { workUnits: [], - postProcessor: { + finalization: { sourceKey: 'historic-sql', status: 'success', + commitSha: 'finalization-sha', + touchedPaths: ['semantic-layer/warehouse/_schema/public.yaml', 'wiki/global/historic-sql-orders.md'], + declaredTouchedSources: [{ connectionId: 'warehouse', sourceName: 'orders' }], + derivedTouchedSources: [{ connectionId: 'warehouse', sourceName: 'orders' }], + declaredChangedWikiPageKeys: ['historic-sql-orders'], + derivedChangedWikiPageKeys: ['historic-sql-orders'], + mismatches: [], result: { tableUsageMerged: 56, staleTablesMarked: 1, @@ -1032,7 +1039,24 @@ describe('runKtxIngest', () => { }, errors: [], warnings: [], - touchedSources: [], + actions: [ + ...Array.from({ length: 57 }, (_, index) => ({ + target: 'sl' as const, + type: 'updated' as const, + key: `orders-${index}`, + detail: 'Merged usage', + targetConnectionId: 'warehouse', + rawPaths: ['tables/public/orders.json'], + })), + ...Array.from({ length: 35 }, (_, index) => ({ + target: 'wiki' as const, + type: 'updated' as const, + key: `historic-sql-orders-${index}`, + detail: 'Projected pattern', + rawPaths: ['patterns/orders.json'], + })), + ], + provenanceExclusions: [], }, }, }), diff --git a/packages/context/src/core/git.service.ts b/packages/context/src/core/git.service.ts index a3e0c133..216183ff 100644 --- a/packages/context/src/core/git.service.ts +++ b/packages/context/src/core/git.service.ts @@ -533,6 +533,19 @@ export class GitService { return out; } + async changedPaths(): Promise { + const raw = await this.git.raw(['status', '--porcelain=v1', '-z']); + const fields = raw.split('\0').filter(Boolean); + const paths: string[] = []; + for (const field of fields) { + const path = field.slice(3); + if (path.length > 0) { + paths.push(path); + } + } + return [...new Set(paths)].sort(); + } + /** * List all paths under the working tree that match `pathSpec`, scoped to HEAD. * Used for the reconciler's first-ever run when there's no watermark to diff from. diff --git a/packages/context/src/ingest/adapters/historic-sql/historic-sql.adapter.ts b/packages/context/src/ingest/adapters/historic-sql/historic-sql.adapter.ts index be2fc9f0..b72adb9e 100644 --- a/packages/context/src/ingest/adapters/historic-sql/historic-sql.adapter.ts +++ b/packages/context/src/ingest/adapters/historic-sql/historic-sql.adapter.ts @@ -1,6 +1,15 @@ -import type { ChunkResult, DiffSet, FetchContext, ScopeDescriptor, SourceAdapter } from '../../types.js'; +import type { + ChunkResult, + DeterministicFinalizationContext, + DiffSet, + FetchContext, + FinalizationResult, + ScopeDescriptor, + SourceAdapter, +} from '../../types.js'; import { chunkHistoricSqlUnifiedStagedDir, describeHistoricSqlUnifiedScope } from './chunk-unified.js'; import { detectHistoricSqlStagedDir } from './detect.js'; +import { projectHistoricSqlEvidence } from './projection.js'; import { stageHistoricSqlAggregatedSnapshot } from './stage-unified.js'; import { type HistoricSqlSourceAdapterDeps } from './types.js'; @@ -35,4 +44,22 @@ export class HistoricSqlSourceAdapter implements SourceAdapter { describeScope(stagedDir: string): Promise { return describeHistoricSqlUnifiedScope(stagedDir); } + + async finalize(ctx: DeterministicFinalizationContext): Promise { + const projection = await projectHistoricSqlEvidence({ + workdir: ctx.workdir, + connectionId: ctx.connectionId, + syncId: ctx.syncId, + runId: ctx.runId, + overrideReplay: ctx.overrideReplay, + }); + return { + result: projection, + warnings: projection.warnings, + errors: [], + touchedSources: projection.touchedSources, + changedWikiPageKeys: projection.changedWikiPageKeys, + actions: projection.actions, + }; + } } diff --git a/packages/context/src/ingest/adapters/historic-sql/local-ingest-acceptance.test.ts b/packages/context/src/ingest/adapters/historic-sql/local-ingest-acceptance.test.ts index 19bcd6be..971a05ec 100644 --- a/packages/context/src/ingest/adapters/historic-sql/local-ingest-acceptance.test.ts +++ b/packages/context/src/ingest/adapters/historic-sql/local-ingest-acceptance.test.ts @@ -242,12 +242,12 @@ describe('historic-SQL local ingest retrieval acceptance', () => { expect(result.result.failedWorkUnits).toEqual([]); expect(result.result.workUnitCount).toBe(3); expect(agentRunner.runLoop).toHaveBeenCalledTimes(3); - const postProcessor = result.report.body.postProcessor; - expect(postProcessor).toBeDefined(); - if (!postProcessor) { - throw new Error('Expected historic-SQL post-processor result'); + const finalization = result.report.body.finalization; + expect(finalization).toBeDefined(); + if (!finalization) { + throw new Error('Expected historic-SQL finalization result'); } - expect(postProcessor).toMatchObject({ + expect(finalization).toMatchObject({ sourceKey: 'historic-sql', status: 'success', result: { @@ -255,7 +255,7 @@ describe('historic-SQL local ingest retrieval acceptance', () => { patternPagesWritten: 1, }, }); - expect(postProcessor.touchedSources).toEqual( + expect(finalization.declaredTouchedSources).toEqual( expect.arrayContaining([ { connectionId: 'warehouse', sourceName: 'customers' }, { connectionId: 'warehouse', sourceName: 'orders' }, diff --git a/packages/context/src/ingest/adapters/historic-sql/post-processor.test.ts b/packages/context/src/ingest/adapters/historic-sql/post-processor.test.ts deleted file mode 100644 index c96461c1..00000000 --- a/packages/context/src/ingest/adapters/historic-sql/post-processor.test.ts +++ /dev/null @@ -1,74 +0,0 @@ -import { mkdir, mkdtemp, readFile, writeFile } from 'node:fs/promises'; -import { tmpdir } from 'node:os'; -import { join } from 'node:path'; -import YAML from 'yaml'; -import { describe, expect, it } from 'vitest'; -import { HistoricSqlProjectionPostProcessor } from './post-processor.js'; - -async function tempWorkdir(): Promise { - return mkdtemp(join(tmpdir(), 'historic-sql-post-processor-')); -} - -async function writeJson(root: string, relPath: string, value: unknown): Promise { - const target = join(root, relPath); - await mkdir(join(target, '..'), { recursive: true }); - await writeFile(target, `${JSON.stringify(value, null, 2)}\n`, 'utf-8'); -} - -describe('HistoricSqlProjectionPostProcessor', () => { - it('projects current run evidence before the ingest squash commit', async () => { - const workdir = await tempWorkdir(); - await mkdir(join(workdir, 'semantic-layer/warehouse/_schema'), { recursive: true }); - await writeFile( - join(workdir, 'semantic-layer/warehouse/_schema/public.yaml'), - YAML.stringify({ tables: { orders: { table: 'public.orders', columns: [{ name: 'id', type: 'string' }] } } }), - 'utf-8', - ); - await writeJson(workdir, 'raw-sources/warehouse/historic-sql/sync-1/manifest.json', { - source: 'historic-sql', - connectionId: 'warehouse', - dialect: 'postgres', - fetchedAt: '2026-05-11T00:00:00.000Z', - windowStart: '2026-02-10T00:00:00.000Z', - windowEnd: '2026-05-11T00:00:00.000Z', - snapshotRowCount: 1, - touchedTableCount: 1, - parseFailures: 0, - warnings: [], - probeWarnings: [], - staleArchiveAfterDays: 90, - }); - await writeJson(workdir, 'raw-sources/warehouse/historic-sql/sync-1/tables/public.orders.json', { table: 'public.orders' }); - await writeJson(workdir, '.ktx/ingest-evidence/historic-sql/run-1/orders.json', { - kind: 'table_usage', - connectionId: 'warehouse', - table: 'public.orders', - rawPath: 'tables/public.orders.json', - usage: { - narrative: 'Orders are repeatedly queried by lifecycle status.', - frequencyTier: 'high', - commonFilters: ['status'], - commonJoins: [], - staleSince: null, - }, - }); - - const result = await new HistoricSqlProjectionPostProcessor().run({ - connectionId: 'warehouse', - sourceKey: 'historic-sql', - syncId: 'sync-1', - jobId: 'job-1', - runId: 'run-1', - workdir, - parseArtifacts: null, - }); - - expect(result.errors).toEqual([]); - expect(result.warnings).toEqual([]); - expect(result.touchedSources).toEqual([{ connectionId: 'warehouse', sourceName: 'orders' }]); - expect(result.result).toMatchObject({ tableUsageMerged: 1 }); - await expect(readFile(join(workdir, 'semantic-layer/warehouse/_schema/public.yaml'), 'utf-8')).resolves.toContain( - 'Orders are repeatedly queried by lifecycle status.', - ); - }); -}); diff --git a/packages/context/src/ingest/adapters/historic-sql/post-processor.ts b/packages/context/src/ingest/adapters/historic-sql/post-processor.ts deleted file mode 100644 index f5e0aaec..00000000 --- a/packages/context/src/ingest/adapters/historic-sql/post-processor.ts +++ /dev/null @@ -1,41 +0,0 @@ -import type { IngestBundlePostProcessorInput, IngestBundlePostProcessorPort, IngestBundlePostProcessorResult } from '../../ports.js'; -import { createSimpleGit } from '../../../core/git-env.js'; -import { projectHistoricSqlEvidence } from './projection.js'; - -async function commitProjectionChanges(workdir: string): Promise { - const git = createSimpleGit(workdir); - if (!(await git.checkIsRepo().catch(() => false))) { - return; - } - const status = await git.status(); - const paths = status.files - .map((file) => file.path) - .filter((path) => path.startsWith('semantic-layer/') || path.startsWith('wiki/global/historic-sql')); - if (paths.length === 0) { - return; - } - await git.add(paths); - const staged = await git.diff(['--cached', '--name-only']); - if (!staged.trim()) { - return; - } - await git.commit('Project historic SQL evidence', { '--author': 'System User ' }); -} - -export class HistoricSqlProjectionPostProcessor implements IngestBundlePostProcessorPort { - async run(input: IngestBundlePostProcessorInput): Promise { - const projection = await projectHistoricSqlEvidence({ - workdir: input.workdir, - connectionId: input.connectionId, - syncId: input.syncId, - runId: input.runId, - }); - await commitProjectionChanges(input.workdir); - return { - result: projection, - warnings: projection.warnings, - errors: [], - touchedSources: projection.touchedSources, - }; - } -} diff --git a/packages/context/src/ingest/adapters/historic-sql/projection.test.ts b/packages/context/src/ingest/adapters/historic-sql/projection.test.ts index 0b3c5604..28ddf5f8 100644 --- a/packages/context/src/ingest/adapters/historic-sql/projection.test.ts +++ b/packages/context/src/ingest/adapters/historic-sql/projection.test.ts @@ -74,6 +74,15 @@ describe('projectHistoricSqlEvidence', () => { const result = await projectHistoricSqlEvidence({ workdir, connectionId: 'warehouse', syncId: 'sync-1', runId: 'run-1' }); expect(result.touchedSources).toEqual([{ connectionId: 'warehouse', sourceName: 'orders' }]); + expect(result.actions).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + target: 'sl', + key: 'orders', + rawPaths: ['tables/public.orders.json'], + }), + ]), + ); const shard = YAML.parse(await readFile(join(workdir, 'semantic-layer/warehouse/_schema/public.yaml'), 'utf-8')); expect(shard.tables.orders.usage).toEqual({ ownerNote: 'keep me', @@ -164,6 +173,16 @@ describe('projectHistoricSqlEvidence', () => { const result = await projectHistoricSqlEvidence({ workdir, connectionId: 'warehouse', syncId: 'sync-1', runId: 'run-1' }); expect(result.patternPagesWritten).toBe(1); + expect(result.changedWikiPageKeys).toContain('historic-sql-old-order-lifecycle'); + expect(result.actions).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + target: 'wiki', + key: 'historic-sql-old-order-lifecycle', + rawPaths: ['patterns-input.json'], + }), + ]), + ); await expect(readFile(join(workdir, 'wiki/global/historic-sql-old-order-lifecycle.md'), 'utf-8')).resolves.toContain( 'Order Lifecycle Analysis', ); @@ -320,6 +339,19 @@ describe('projectHistoricSqlEvidence', () => { probeWarnings: [], staleArchiveAfterDays: 90, }); + await writeJson(workdir, '.ktx/ingest-evidence/historic-sql/run-1/customers.json', { + kind: 'table_usage', + connectionId: 'warehouse', + table: 'public.customers', + rawPath: 'tables/public.customers.json', + usage: { + narrative: 'Customers were queried.', + frequencyTier: 'low', + commonFilters: [], + commonJoins: [], + staleSince: null, + }, + }); await writeText( workdir, 'wiki/global/historic-sql-old-template.md', @@ -346,6 +378,9 @@ describe('projectHistoricSqlEvidence', () => { expect(result.staleTablesMarked).toBe(1); expect(result.touchedSources).toEqual([{ connectionId: 'warehouse', sourceName: 'orders' }]); + const staleAction = result.actions.find((action) => action.target === 'sl' && action.key === 'orders'); + expect(staleAction).toEqual(expect.objectContaining({ target: 'sl', key: 'orders' })); + expect(staleAction?.rawPaths).toBeUndefined(); const shard = YAML.parse(await readFile(join(workdir, 'semantic-layer/warehouse/_schema/public.yaml'), 'utf-8')); expect(shard.tables.orders.usage).toEqual({ ownerNote: 'keep analyst annotation', @@ -360,4 +395,63 @@ describe('projectHistoricSqlEvidence', () => { 'Old body', ); }); + + it('does not mark stale or archive pages when override replay has no current-run evidence', async () => { + const workdir = await tempWorkdir(); + await writeText( + workdir, + 'semantic-layer/warehouse/_schema/public.yaml', + YAML.stringify({ + tables: { + orders: { + table: 'public.orders', + usage: { + narrative: 'Orders were active before.', + frequencyTier: 'high', + commonFilters: ['status'], + commonGroupBys: ['status'], + commonJoins: [], + }, + columns: [{ name: 'id', type: 'string' }], + }, + }, + }), + ); + await writeJson(workdir, 'raw-sources/warehouse/historic-sql/override-sync/manifest.json', { + source: 'historic-sql', + connectionId: 'warehouse', + dialect: 'postgres', + fetchedAt: '2026-05-11T00:00:00.000Z', + windowStart: '2026-02-10T00:00:00.000Z', + windowEnd: '2026-05-11T00:00:00.000Z', + snapshotRowCount: 0, + touchedTableCount: 0, + parseFailures: 0, + warnings: [], + probeWarnings: [], + staleArchiveAfterDays: 90, + }); + + const result = await projectHistoricSqlEvidence({ + workdir, + connectionId: 'warehouse', + syncId: 'override-sync', + runId: 'override-run', + overrideReplay: { + priorJobId: 'prior-job', + priorRunId: 'prior-run', + priorSyncId: 'prior-sync', + evictionRawPaths: ['tables/public/orders.json'], + }, + }); + + expect(result.tableUsageMerged).toBe(0); + expect(result.staleTablesMarked).toBe(0); + expect(result.patternPagesWritten).toBe(0); + expect(result.stalePatternPagesMarked).toBe(0); + expect(result.archivedPatternPages).toBe(0); + expect(result.touchedSources).toEqual([]); + expect(result.changedWikiPageKeys).toEqual([]); + expect(result.actions).toEqual([]); + }); }); diff --git a/packages/context/src/ingest/adapters/historic-sql/projection.ts b/packages/context/src/ingest/adapters/historic-sql/projection.ts index 36a7be19..56b1e360 100644 --- a/packages/context/src/ingest/adapters/historic-sql/projection.ts +++ b/packages/context/src/ingest/adapters/historic-sql/projection.ts @@ -1,7 +1,9 @@ import { access, mkdir, readdir, readFile, rename, writeFile } from 'node:fs/promises'; import { dirname, join, relative } from 'node:path'; import YAML from 'yaml'; +import type { MemoryAction } from '../../../memory/index.js'; import { rawSourcesDirForSync } from '../../raw-sources-paths.js'; +import type { FinalizationOverrideReplay } from '../../types.js'; import { mergeUsagePreservingExternal } from '../live-database/manifest.js'; import { historicSqlEvidenceEnvelopeSchema, type HistoricSqlEvidenceEnvelope } from './evidence.js'; import type { TableUsageOutput } from './skill-schemas.js'; @@ -12,6 +14,7 @@ export interface HistoricSqlProjectionInput { connectionId: string; syncId: string; runId: string; + overrideReplay?: FinalizationOverrideReplay; } export interface HistoricSqlProjectionResult { @@ -21,6 +24,8 @@ export interface HistoricSqlProjectionResult { stalePatternPagesMarked: number; archivedPatternPages: number; touchedSources: Array<{ connectionId: string; sourceName: string }>; + changedWikiPageKeys: string[]; + actions: MemoryAction[]; warnings: string[]; } @@ -223,6 +228,8 @@ export async function projectHistoricSqlEvidence(input: HistoricSqlProjectionInp stalePatternPagesMarked: 0, archivedPatternPages: 0, touchedSources: [], + changedWikiPageKeys: [], + actions: [], warnings: [], }; const touchedKeys = new Set(); @@ -230,6 +237,16 @@ export async function projectHistoricSqlEvidence(input: HistoricSqlProjectionInp const manifest = stagedManifestSchema.parse(await readJson(join(rawDir, 'manifest.json'))); const currentTables = await currentStagedTables(rawDir); const evidence = await loadEvidence(input.workdir, input.runId); + if (input.overrideReplay && evidence.length === 0) { + result.warnings.push( + 'historic-sql finalization skipped stale/archive cleanup during override replay without current-run evidence', + ); + return result; + } + if (evidence.length === 0) { + result.warnings.push('historic-sql finalization skipped because no current-run evidence was emitted'); + return result; + } const tableEvidence = evidence.filter((entry): entry is HistoricSqlEvidenceEnvelope & { kind: 'table_usage' } => entry.kind === 'table_usage'); const patternEvidence = evidence.filter((entry): entry is HistoricSqlEvidenceEnvelope & { kind: 'pattern' } => entry.kind === 'pattern'); @@ -255,6 +272,14 @@ export async function projectHistoricSqlEvidence(input: HistoricSqlProjectionInp touchedKeys.add(key); result.touchedSources.push({ connectionId: input.connectionId, sourceName }); } + result.actions.push({ + target: 'sl', + type: 'updated', + key: sourceName, + targetConnectionId: input.connectionId, + detail: `Merged historic-SQL usage for ${matchingEvidence.table}`, + rawPaths: [matchingEvidence.rawPath], + }); } } else if (entry.usage && !currentTables.has(tableRef)) { const merged = mergeUsagePreservingExternal(entry.usage as TableUsageOutput | undefined, staleUsage(manifest.fetchedAt)); @@ -267,6 +292,13 @@ export async function projectHistoricSqlEvidence(input: HistoricSqlProjectionInp touchedKeys.add(key); result.touchedSources.push({ connectionId: input.connectionId, sourceName }); } + result.actions.push({ + target: 'sl', + type: 'updated', + key: sourceName, + targetConnectionId: input.connectionId, + detail: `Marked historic-SQL usage stale for ${tableRef}`, + }); } } } @@ -303,6 +335,14 @@ export async function projectHistoricSqlEvidence(input: HistoricSqlProjectionInp await writeFile(pagePath, renderMarkdownPage(frontmatter, renderPatternMarkdown(pattern)), 'utf-8'); writtenKeys.add(key); result.patternPagesWritten += 1; + result.changedWikiPageKeys.push(key); + result.actions.push({ + target: 'wiki', + type: reusable ? 'updated' : 'created', + key, + detail: `Projected historic-SQL pattern ${pattern.pattern.title}`, + rawPaths: [pattern.rawPath], + }); } for (const page of patternPages) { @@ -315,6 +355,13 @@ export async function projectHistoricSqlEvidence(input: HistoricSqlProjectionInp 'utf-8', ); result.archivedPatternPages += 1; + result.changedWikiPageKeys.push(page.key); + result.actions.push({ + target: 'wiki', + type: 'updated', + key: page.key, + detail: `Archived stale historic-SQL pattern page ${page.key}`, + }); continue; } const tags = [...new Set([...stringArray(page.frontmatter.tags), 'stale'])]; @@ -324,7 +371,15 @@ export async function projectHistoricSqlEvidence(input: HistoricSqlProjectionInp 'utf-8', ); result.stalePatternPagesMarked += 1; + result.changedWikiPageKeys.push(page.key); + result.actions.push({ + target: 'wiki', + type: 'updated', + key: page.key, + detail: `Marked historic-SQL pattern page ${page.key} stale`, + }); } + result.changedWikiPageKeys = [...new Set(result.changedWikiPageKeys)].sort(); return result; } diff --git a/packages/context/src/ingest/adapters/live-database/daemon-introspection.test.ts b/packages/context/src/ingest/adapters/live-database/daemon-introspection.test.ts index 93a9739d..8237d903 100644 --- a/packages/context/src/ingest/adapters/live-database/daemon-introspection.test.ts +++ b/packages/context/src/ingest/adapters/live-database/daemon-introspection.test.ts @@ -216,4 +216,40 @@ describe('createDaemonLiveDatabaseIntrospection', () => { ); expect(runJson).not.toHaveBeenCalled(); }); + + it('filters out tables not on the enabled_tables allowlist', async () => { + const runJson = vi.fn(async () => daemonResponse); + const introspection = createDaemonLiveDatabaseIntrospection({ + connections: { + warehouse: { + driver: 'postgres', + url: 'postgres://localhost:5432/warehouse', + enabled_tables: ['public.orders'], + }, + }, + schemas: ['public'], + runJson, + }); + + const snapshot = await introspection.extractSchema('warehouse'); + expect(snapshot.tables.map((table) => `${table.db}.${table.name}`)).toEqual(['public.orders']); + }); + + it('passes through every table when enabled_tables is omitted or empty', async () => { + const runJson = vi.fn(async () => daemonResponse); + const introspection = createDaemonLiveDatabaseIntrospection({ + connections: { + warehouse: { + driver: 'postgres', + url: 'postgres://localhost:5432/warehouse', + enabled_tables: [], + }, + }, + schemas: ['public'], + runJson, + }); + + const snapshot = await introspection.extractSchema('warehouse'); + expect(snapshot.tables.map((table) => table.name)).toEqual(['customers', 'orders']); + }); }); diff --git a/packages/context/src/ingest/adapters/live-database/daemon-introspection.ts b/packages/context/src/ingest/adapters/live-database/daemon-introspection.ts index 6c333385..d20b0adc 100644 --- a/packages/context/src/ingest/adapters/live-database/daemon-introspection.ts +++ b/packages/context/src/ingest/adapters/live-database/daemon-introspection.ts @@ -3,6 +3,7 @@ import { request as httpRequest } from 'node:http'; import { request as httpsRequest } from 'node:https'; import { URL } from 'node:url'; import type { KtxProjectConnectionConfig } from '../../../project/config.js'; +import { filterSnapshotTables, resolveEnabledTables } from '../../../scan/enabled-tables.js'; import type { KtxSchemaColumn, KtxSchemaForeignKey, KtxSchemaSnapshot, KtxSchemaTable } from '../../../scan/types.js'; import { inferKtxDimensionType, normalizeKtxNativeType } from '../../../scan/type-normalization.js'; import type { LiveDatabaseIntrospectionPort } from './types.js'; @@ -243,11 +244,13 @@ export function createDaemonLiveDatabaseIntrospection( const raw = requestJson ? await requestJson('/database/introspect', payload) : await runJson('database-introspect', payload); - return mapDaemonSnapshot(raw, { + const snapshot = mapDaemonSnapshot(raw, { connectionId, extractedAt: now().toISOString(), schemas, }); + const enabledTables = resolveEnabledTables(connection); + return enabledTables ? filterSnapshotTables(snapshot, enabledTables) : snapshot; }, }; } diff --git a/packages/context/src/ingest/finalization-scope.test.ts b/packages/context/src/ingest/finalization-scope.test.ts new file mode 100644 index 00000000..28d0b863 --- /dev/null +++ b/packages/context/src/ingest/finalization-scope.test.ts @@ -0,0 +1,131 @@ +import { describe, expect, it } from 'vitest'; +import { + compareFinalizationDeclarations, + deriveFinalizationTouchedSources, + deriveFinalizationWikiPageKeys, +} from './finalization-scope.js'; + +describe('deriveFinalizationWikiPageKeys', () => { + it('maps changed global wiki markdown paths to page keys', () => { + expect( + deriveFinalizationWikiPageKeys([ + 'wiki/global/historic-sql-orders.md', + 'wiki/global/nested/page.md', + 'README.md', + ]), + ).toEqual(['historic-sql-orders']); + }); +}); + +describe('deriveFinalizationTouchedSources', () => { + it('maps standalone semantic-layer files directly', async () => { + const result = await deriveFinalizationTouchedSources({ + changedPaths: ['semantic-layer/warehouse/orders.yaml'], + beforeSourcesByConnection: new Map(), + afterSourcesByConnection: new Map(), + }); + expect(result).toEqual({ + touchedSources: [{ connectionId: 'warehouse', sourceName: 'orders' }], + unresolvedPaths: [], + }); + }); + + it('resolves aggregate _schema changes by comparing loaded source snapshots', async () => { + const beforeSourcesByConnection = new Map([ + [ + 'warehouse', + [ + { + name: 'orders', + grain: ['order_id'], + columns: [{ name: 'order_id', type: 'string' }], + joins: [], + measures: [], + usage: { + narrative: 'old', + frequencyTier: 'low' as const, + commonFilters: [], + commonJoins: [], + }, + }, + ], + ], + ]); + const afterSourcesByConnection = new Map([ + [ + 'warehouse', + [ + { + name: 'orders', + grain: ['order_id'], + columns: [{ name: 'order_id', type: 'string' }], + joins: [], + measures: [], + usage: { + narrative: 'new', + frequencyTier: 'high' as const, + commonFilters: [], + commonJoins: [], + }, + }, + ], + ], + ]); + + const result = await deriveFinalizationTouchedSources({ + changedPaths: ['semantic-layer/warehouse/_schema/public.yaml'], + beforeSourcesByConnection, + afterSourcesByConnection, + }); + + expect(result).toEqual({ + touchedSources: [{ connectionId: 'warehouse', sourceName: 'orders' }], + unresolvedPaths: [], + }); + }); + + it('flags aggregate _schema changes that cannot be resolved to logical sources', async () => { + const beforeSourcesByConnection = new Map([['warehouse', []]]); + const afterSourcesByConnection = new Map([['warehouse', []]]); + + const result = await deriveFinalizationTouchedSources({ + changedPaths: ['semantic-layer/warehouse/_schema/public.yaml'], + beforeSourcesByConnection, + afterSourcesByConnection, + }); + + expect(result).toEqual({ + touchedSources: [], + unresolvedPaths: ['semantic-layer/warehouse/_schema/public.yaml'], + }); + }); +}); + +describe('compareFinalizationDeclarations', () => { + it('reports missing and extra adapter declarations', () => { + expect( + compareFinalizationDeclarations({ + declaredTouchedSources: [{ connectionId: 'warehouse', sourceName: 'orders' }], + derivedTouchedSources: [{ connectionId: 'warehouse', sourceName: 'customers' }], + declaredChangedWikiPageKeys: ['orders'], + derivedChangedWikiPageKeys: ['orders', 'patterns'], + }), + ).toEqual([ + { + artifactKind: 'sl', + key: 'warehouse:customers', + direction: 'missing_from_adapter_declaration', + }, + { + artifactKind: 'sl', + key: 'warehouse:orders', + direction: 'extra_in_adapter_declaration', + }, + { + artifactKind: 'wiki', + key: 'patterns', + direction: 'missing_from_adapter_declaration', + }, + ]); + }); +}); diff --git a/packages/context/src/ingest/finalization-scope.ts b/packages/context/src/ingest/finalization-scope.ts new file mode 100644 index 00000000..6ecdc83e --- /dev/null +++ b/packages/context/src/ingest/finalization-scope.ts @@ -0,0 +1,145 @@ +import type { SemanticLayerSource } from '../sl/index.js'; +import type { TouchedSlSource } from '../tools/index.js'; +import type { IngestReportFinalizationMismatch } from './reports.js'; + +interface DeriveTouchedSourcesInput { + changedPaths: string[]; + beforeSourcesByConnection: Map; + afterSourcesByConnection: Map; +} + +interface DeriveTouchedSourcesResult { + touchedSources: TouchedSlSource[]; + unresolvedPaths: string[]; +} + +interface CompareFinalizationDeclarationsInput { + declaredTouchedSources: TouchedSlSource[]; + derivedTouchedSources: TouchedSlSource[]; + declaredChangedWikiPageKeys: string[]; + derivedChangedWikiPageKeys: string[]; +} + +function uniqueSorted(values: string[]): string[] { + return [...new Set(values.filter((value) => value.length > 0))].sort(); +} + +function touchedKey(source: TouchedSlSource): string { + return `${source.connectionId}:${source.sourceName}`; +} + +function stableJson(value: unknown): string { + if (Array.isArray(value)) { + return `[${value.map((entry) => stableJson(entry)).join(',')}]`; + } + if (value && typeof value === 'object') { + const record = value as Record; + return `{${Object.keys(record) + .sort() + .map((key) => `${JSON.stringify(key)}:${stableJson(record[key])}`) + .join(',')}}`; + } + return JSON.stringify(value); +} + +function changedSourceNames( + beforeSources: SemanticLayerSource[], + afterSources: SemanticLayerSource[], +): string[] { + const before = new Map(beforeSources.map((source) => [source.name, stableJson(source)])); + const after = new Map(afterSources.map((source) => [source.name, stableJson(source)])); + return uniqueSorted( + uniqueSorted([...before.keys(), ...after.keys()]).filter( + (sourceName) => before.get(sourceName) !== after.get(sourceName), + ), + ); +} + +export function deriveFinalizationWikiPageKeys(paths: string[]): string[] { + return uniqueSorted( + paths + .filter((path) => path.startsWith('wiki/global/') && path.endsWith('.md')) + .filter((path) => !path.slice('wiki/global/'.length, -'.md'.length).includes('/')) + .map((path) => path.slice('wiki/global/'.length, -'.md'.length)), + ); +} + +export async function deriveFinalizationTouchedSources( + input: DeriveTouchedSourcesInput, +): Promise { + const touched = new Map(); + const unresolvedPaths: string[] = []; + + for (const path of input.changedPaths) { + if (!path.startsWith('semantic-layer/') || !(path.endsWith('.yaml') || path.endsWith('.yml'))) { + continue; + } + const parts = path.split('/'); + const connectionId = parts[1] ?? ''; + if (!connectionId) { + unresolvedPaths.push(path); + continue; + } + if (parts[2] !== '_schema') { + const fileName = parts.at(-1) ?? ''; + const sourceName = fileName.replace(/\.ya?ml$/, ''); + if (!sourceName) { + unresolvedPaths.push(path); + continue; + } + touched.set(`${connectionId}:${sourceName}`, { connectionId, sourceName }); + continue; + } + + const changedNames = changedSourceNames( + input.beforeSourcesByConnection.get(connectionId) ?? [], + input.afterSourcesByConnection.get(connectionId) ?? [], + ); + if (changedNames.length === 0) { + unresolvedPaths.push(path); + continue; + } + for (const sourceName of changedNames) { + touched.set(`${connectionId}:${sourceName}`, { connectionId, sourceName }); + } + } + + return { + touchedSources: [...touched.values()].sort((left, right) => + touchedKey(left).localeCompare(touchedKey(right)), + ), + unresolvedPaths: uniqueSorted(unresolvedPaths), + }; +} + +export function compareFinalizationDeclarations( + input: CompareFinalizationDeclarationsInput, +): IngestReportFinalizationMismatch[] { + const mismatches: IngestReportFinalizationMismatch[] = []; + const declaredSl = new Set(input.declaredTouchedSources.map(touchedKey)); + const derivedSl = new Set(input.derivedTouchedSources.map(touchedKey)); + const declaredWiki = new Set(input.declaredChangedWikiPageKeys); + const derivedWiki = new Set(input.derivedChangedWikiPageKeys); + + for (const key of [...derivedSl].sort()) { + if (!declaredSl.has(key)) { + mismatches.push({ artifactKind: 'sl', key, direction: 'missing_from_adapter_declaration' }); + } + } + for (const key of [...declaredSl].sort()) { + if (!derivedSl.has(key)) { + mismatches.push({ artifactKind: 'sl', key, direction: 'extra_in_adapter_declaration' }); + } + } + for (const key of [...derivedWiki].sort()) { + if (!declaredWiki.has(key)) { + mismatches.push({ artifactKind: 'wiki', key, direction: 'missing_from_adapter_declaration' }); + } + } + for (const key of [...declaredWiki].sort()) { + if (!derivedWiki.has(key)) { + mismatches.push({ artifactKind: 'wiki', key, direction: 'extra_in_adapter_declaration' }); + } + } + return mismatches; +} diff --git a/packages/context/src/ingest/index.ts b/packages/context/src/ingest/index.ts index 450306dc..d8e9c856 100644 --- a/packages/context/src/ingest/index.ts +++ b/packages/context/src/ingest/index.ts @@ -349,7 +349,6 @@ export type { HistoricSqlTableUsageEvidence, } from './adapters/historic-sql/evidence.js'; export { createEmitHistoricSqlEvidenceTool } from './adapters/historic-sql/evidence-tool.js'; -export { HistoricSqlProjectionPostProcessor } from './adapters/historic-sql/post-processor.js'; export { projectHistoricSqlEvidence } from './adapters/historic-sql/projection.js'; export type { HistoricSqlProjectionInput, HistoricSqlProjectionResult } from './adapters/historic-sql/projection.js'; export { @@ -664,5 +663,8 @@ export type { WorkUnit, DeterministicProjectionContext, ProjectionResult, + DeterministicFinalizationContext, + FinalizationOverrideReplay, + FinalizationResult, } from './types.js'; export * from './wiki-body-refs.js'; diff --git a/packages/context/src/ingest/ingest-bundle.runner.isolated-diff.test.ts b/packages/context/src/ingest/ingest-bundle.runner.isolated-diff.test.ts index f84d8fd1..fee60600 100644 --- a/packages/context/src/ingest/ingest-bundle.runner.isolated-diff.test.ts +++ b/packages/context/src/ingest/ingest-bundle.runner.isolated-diff.test.ts @@ -118,6 +118,35 @@ function makeWikiService(root: string) { content: content.trim(), }; }), + writePage: vi.fn( + async ( + _scope: string, + _scopeId: string | null, + key: string, + frontmatter: { summary?: string; usage_mode?: string; refs?: string[]; sl_refs?: string[] }, + content: string, + ) => { + await mkdir(join(root, 'wiki/global'), { recursive: true }); + const refs = (frontmatter.refs ?? []).map((ref) => ` - ${ref}`).join('\n'); + const slRefs = (frontmatter.sl_refs ?? []).map((ref) => ` - ${ref}`).join('\n'); + await writeFile( + join(root, 'wiki/global', `${key}.md`), + [ + '---', + `summary: ${frontmatter.summary ?? key}`, + `usage_mode: ${frontmatter.usage_mode ?? 'auto'}`, + 'refs:', + refs, + 'sl_refs:', + slRefs, + '---', + '', + content, + '', + ].join('\n'), + ); + }, + ), syncFromCommit: vi.fn(), }; } @@ -2160,4 +2189,187 @@ describe('IngestBundleRunner isolated diff path', () => { await rm(runtime.homeDir, { recursive: true, force: true }); } }); + it('runs finalization before wiki sl-ref repair and final gates', async () => { + const runtime = await makeRealGitRuntime(); + try { + const { deps, adapter } = makeDeps(runtime); + adapter.chunk.mockResolvedValue({ + workUnits: [{ unitKey: 'wiki-page', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] }], + }); + adapter.finalize = vi.fn(async ({ workdir }) => { + await mkdir(join(workdir, 'semantic-layer/warehouse'), { recursive: true }); + await mkdir(join(workdir, 'wiki/global'), { recursive: true }); + await writeFile( + join(workdir, 'semantic-layer/warehouse/mart_account_segments.yaml'), + 'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n', + ); + await writeFile( + join(workdir, 'wiki/global/finalized-accounts.md'), + '---\nsummary: Finalized accounts\nusage_mode: auto\nsl_refs:\n - mart_account_segments\n - missing_source\n---\n\nAccounts use `mart_account_segments.total_contract_arr`.\n', + ); + return { + warnings: [], + errors: [], + touchedSources: [{ connectionId: 'warehouse', sourceName: 'mart_account_segments' }], + changedWikiPageKeys: ['finalized-accounts'], + actions: [ + { + target: 'sl', + type: 'created', + key: 'mart_account_segments', + detail: 'Finalized accounts', + targetConnectionId: 'warehouse', + rawPaths: ['cards/source.json'], + }, + { + target: 'wiki', + type: 'created', + key: 'finalized-accounts', + detail: 'Finalized wiki', + rawPaths: ['cards/source.json'], + }, + ], + }; + }); + deps.agentRunner.runLoop = vi.fn(async () => ({ stopReason: 'natural' as const })) as never; + const runner = new IngestBundleRunner(deps); + await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]); + + await runner.run({ + jobId: 'job-finalization', + connectionId: 'warehouse', + sourceKey: 'metabase', + trigger: 'upload', + bundleRef: { kind: 'upload', uploadId: 'upload' }, + }); + + const trace = await readFile( + join(runtime.configDir, '.ktx/ingest-traces/job-finalization/trace.jsonl'), + 'utf-8', + ); + expect(trace.indexOf('finalization_committed')).toBeLessThan(trace.indexOf('wiki_sl_refs_repaired')); + expect(trace.indexOf('wiki_sl_refs_repaired')).toBeLessThan(trace.indexOf('final_artifact_gates')); + await expect(readFile(join(runtime.configDir, 'wiki/global/finalized-accounts.md'), 'utf-8')).resolves.toContain( + 'sl_refs:\n - mart_account_segments', + ); + } finally { + await rm(runtime.homeDir, { recursive: true, force: true }); + } + }); + + it('fails when finalization edits a path already changed earlier in the run', async () => { + const runtime = await makeRealGitRuntime(); + try { + const { deps, adapter } = makeDeps(runtime); + adapter.chunk.mockResolvedValue({ + workUnits: [{ unitKey: 'wiki-page', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] }], + }); + let currentSession: any = null; + deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => { + currentSession = toolSession; + return { toRuntimeTools: vi.fn(() => ({})) }; + }); + deps.agentRunner.runLoop = vi.fn(async () => { + const root = rootOfConfig(currentSession.configService, runtime.configDir); + await mkdir(join(root, 'wiki/global'), { recursive: true }); + await writeFile( + join(root, 'wiki/global/orders.md'), + '---\nsummary: Orders\nusage_mode: auto\n---\n\nWU body\n', + ); + currentSession.actions.push({ + target: 'wiki', + type: 'created', + key: 'orders', + detail: 'WU orders', + rawPaths: ['cards/source.json'], + }); + await currentSession.gitService.commitFiles( + ['wiki/global/orders.md'], + 'wu orders', + 'KTX Test', + 'system@ktx.local', + ); + return { stopReason: 'natural' as const }; + }) as never; + adapter.finalize = vi.fn(async ({ workdir }) => { + await writeFile( + join(workdir, 'wiki/global/orders.md'), + '---\nsummary: Orders\nusage_mode: auto\n---\n\nFinalized body\n', + ); + return { + warnings: [], + errors: [], + touchedSources: [], + changedWikiPageKeys: ['orders'], + actions: [{ target: 'wiki', type: 'updated', key: 'orders', detail: 'Conflicting finalization' }], + }; + }); + const runner = new IngestBundleRunner(deps); + await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]); + + await expect( + runner.run({ + jobId: 'job-finalization-overlap', + connectionId: 'warehouse', + sourceKey: 'metabase', + trigger: 'upload', + bundleRef: { kind: 'upload', uploadId: 'upload' }, + }), + ).rejects.toThrow(/finalization modified path\(s\) already changed earlier in this run: wiki\/global\/orders\.md/); + } finally { + await rm(runtime.homeDir, { recursive: true, force: true }); + } + }); + + it('rejects finalization writes to unauthorized semantic-layer targets', async () => { + const runtime = await makeRealGitRuntime(); + try { + const { deps, adapter } = makeDeps(runtime); + adapter.chunk.mockResolvedValue({ workUnits: [] }); + adapter.finalize = vi.fn(async ({ workdir }) => { + await mkdir(join(workdir, 'semantic-layer/other-warehouse'), { recursive: true }); + await writeFile( + join(workdir, 'semantic-layer/other-warehouse/orders.yaml'), + 'name: orders\ngrain: [order_id]\ncolumns: [{name: order_id, type: string}]\njoins: []\nmeasures: []\n', + ); + return { + warnings: [], + errors: [], + touchedSources: [{ connectionId: 'other-warehouse', sourceName: 'orders' }], + changedWikiPageKeys: [], + actions: [ + { + target: 'sl', + type: 'created', + key: 'orders', + targetConnectionId: 'other-warehouse', + detail: 'Forbidden target', + rawPaths: ['cards/source.json'], + }, + ], + }; + }); + const runner = new IngestBundleRunner(deps); + await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]); + + await expect( + runner.run({ + jobId: 'job-finalization-target-policy', + connectionId: 'warehouse', + sourceKey: 'metabase', + trigger: 'upload', + bundleRef: { kind: 'upload', uploadId: 'upload' }, + }), + ).rejects.toThrow(/semantic-layer target connection not allowed/); + const trace = await readFile( + join(runtime.configDir, '.ktx/ingest-traces/job-finalization-target-policy/trace.jsonl'), + 'utf-8', + ); + expect(trace).toContain('finalization_committed'); + expect(trace).toContain('semantic_layer_target_policy'); + expect(trace).toContain('ingest_failed'); + } finally { + await rm(runtime.homeDir, { recursive: true, force: true }); + } + }); }); diff --git a/packages/context/src/ingest/ingest-bundle.runner.test.ts b/packages/context/src/ingest/ingest-bundle.runner.test.ts index e3a95c5f..ae6f4c14 100644 --- a/packages/context/src/ingest/ingest-bundle.runner.test.ts +++ b/packages/context/src/ingest/ingest-bundle.runner.test.ts @@ -95,6 +95,7 @@ const makeDeps = () => { triageSupported: undefined as undefined | boolean, detect: vi.fn().mockResolvedValue(true), listTargetConnectionIds: undefined as undefined | ((stagedDir: string) => Promise), + finalize: undefined as any, chunk: vi.fn().mockResolvedValue({ workUnits: [{ unitKey: 'u1', rawFiles: ['a.yml'], peerFileIndex: [], dependencyPaths: [] }], }), @@ -131,6 +132,7 @@ const makeDeps = () => { }), applyPatchFile3WayIndex: vi.fn(), diffNameStatus: vi.fn().mockResolvedValue([]), + changedPaths: vi.fn().mockResolvedValue([]), }; const sessionWorktreeService = { create: vi.fn().mockResolvedValue({ @@ -1574,27 +1576,69 @@ describe('IngestBundleRunner — Stages 1 → 7', () => { ); }); - it('runs a registered post-processor before squash, records the outcome, and reindexes touched sources after squash', async () => { + it('runs adapter finalization before squash, records the outcome, and reindexes touched sources', async () => { const deps = makeDeps(); deps.adapter.source = 'metricflow'; deps.registry.get.mockReturnValue(deps.adapter); deps.adapter.chunk.mockResolvedValue({ - workUnits: [{ unitKey: 'u1', rawFiles: ['semantic_models.yml'], peerFileIndex: [], dependencyPaths: [] }], + workUnits: [], parseArtifacts: { semanticModels: [{ name: 'orders' }] }, }); deps.adapter.listTargetConnectionIds = vi.fn().mockResolvedValue(['warehouse-2']); + deps.adapter.finalize = vi.fn().mockResolvedValue({ + result: { sourcesTouched: 1 }, + warnings: ['kept going'], + errors: [], + touchedSources: [{ connectionId: 'warehouse-2', sourceName: 'orders' }], + changedWikiPageKeys: [], + actions: [ + { + target: 'sl', + type: 'updated', + key: 'orders', + targetConnectionId: 'warehouse-2', + detail: 'Finalized orders usage', + rawPaths: ['semantic_models.yml'], + }, + ], + }); deps.semanticLayerService.loadAllSources.mockImplementation((connectionId: string) => Promise.resolve({ sources: [{ name: `${connectionId}_source` }], loadErrors: [] }), ); - const postProcessor = { - run: vi.fn().mockResolvedValue({ - result: { sourcesCreated: 1 }, - warnings: ['kept going'], - errors: [], - touchedSources: [{ connectionId: 'warehouse-2', sourceName: 'orders' }], + let head = 'pre-finalization'; + const git = { + revParseHead: vi.fn(async () => head), + commitFiles: vi.fn().mockImplementation(async (paths: string[]) => { + if (paths.includes('semantic-layer/warehouse-2/orders.yaml')) { + head = 'post-finalization'; + return { created: true, commitHash: 'finalization-sha' }; + } + return { created: true, commitHash: head }; }), + commitStaged: vi.fn().mockResolvedValue({ created: false, commitHash: 'post-finalization' }), + resetHardTo: vi.fn(), + assertWorktreeClean: vi.fn().mockResolvedValue(undefined), + writeBinaryNoRenamePatch: vi.fn(async (_base: string, _head: string, patchPath: string) => { + await writeFile(patchPath, '', 'utf-8'); + }), + applyPatchFile3WayIndex: vi.fn(), + diffNameStatus: vi.fn().mockImplementation(async (from: string, to: string) => + from === 'pre-finalization' && to === 'post-finalization' + ? [{ status: 'M', path: 'semantic-layer/warehouse-2/orders.yaml' }] + : [], + ), + changedPaths: vi.fn().mockResolvedValue(['semantic-layer/warehouse-2/orders.yaml']), }; - const runner = buildRunner(deps, { postProcessors: { metricflow: postProcessor } }); + deps.sessionWorktreeService.create.mockResolvedValue({ + chatId: 'j1', + workdir: '/tmp/wt', + branch: 'session/j1', + baseSha: 'b', + createdAt: new Date(), + git, + config: {}, + }); + const runner = buildRunner(deps); (runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({ currentHashes: new Map([['semantic_models.yml', 'h1']]), rawDirInWorktree: 'raw-sources/c1/metricflow/s', @@ -1609,26 +1653,29 @@ describe('IngestBundleRunner — Stages 1 → 7', () => { bundleRef: { kind: 'upload', uploadId: 'upload-x' }, }); - expect(postProcessor.run).toHaveBeenCalledWith({ - connectionId: 'c1', - sourceKey: 'metricflow', - syncId: expect.any(String), - jobId: 'j1', - runId: 'run-1', - workdir: '/tmp/wt', - parseArtifacts: { semanticModels: [{ name: 'orders' }] }, - }); + expect(deps.adapter.finalize).toHaveBeenCalledWith( + expect.objectContaining({ + connectionId: 'c1', + sourceKey: 'metricflow', + syncId: expect.any(String), + jobId: 'j1', + runId: 'run-1', + workdir: '/tmp/wt', + parseArtifacts: { semanticModels: [{ name: 'orders' }] }, + }), + ); expect(deps.reportsRepo.create).toHaveBeenCalledWith( expect.objectContaining({ body: expect.objectContaining({ - postProcessor: { + finalization: expect.objectContaining({ sourceKey: 'metricflow', status: 'success', - result: { sourcesCreated: 1 }, - warnings: ['kept going'], - errors: [], - touchedSources: [{ connectionId: 'warehouse-2', sourceName: 'orders' }], - }, + commitSha: 'finalization-sha', + touchedPaths: ['semantic-layer/warehouse-2/orders.yaml'], + derivedTouchedSources: [{ connectionId: 'warehouse-2', sourceName: 'orders' }], + declaredTouchedSources: [{ connectionId: 'warehouse-2', sourceName: 'orders' }], + actions: [expect.objectContaining({ key: 'orders' })], + }), }), }), ); @@ -1637,7 +1684,7 @@ describe('IngestBundleRunner — Stages 1 → 7', () => { expect(deps.sessionWorktreeService.cleanup).toHaveBeenCalledWith(expect.any(Object), 'success'); }); - it('includes historic-sql post-processor output in memory-flow saved counts', async () => { + it('includes finalization actions in memory-flow saved counts', async () => { const deps = makeDeps(); deps.adapter.source = 'historic-sql'; deps.registry.get.mockReturnValue(deps.adapter); @@ -1651,21 +1698,19 @@ describe('IngestBundleRunner — Stages 1 → 7', () => { }, ], }); - const postProcessor = { - run: vi.fn().mockResolvedValue({ - result: { - tableUsageMerged: 2, - staleTablesMarked: 1, - patternPagesWritten: 3, - stalePatternPagesMarked: 1, - archivedPatternPages: 1, - }, - warnings: [], - errors: [], - touchedSources: [{ connectionId: 'c1', sourceName: 'orders' }], - }), - }; - const runner = buildRunner(deps, { postProcessors: { 'historic-sql': postProcessor } }); + deps.adapter.finalize = vi.fn().mockResolvedValue({ + warnings: [], + errors: [], + touchedSources: [], + changedWikiPageKeys: [], + actions: [ + { target: 'sl', type: 'updated', key: 'orders', detail: 'Merged usage' }, + { target: 'sl', type: 'updated', key: 'customers', detail: 'Merged usage' }, + { target: 'wiki', type: 'created', key: 'historic-sql-orders', detail: 'Projected pattern' }, + { target: 'wiki', type: 'updated', key: 'historic-sql-customers', detail: 'Projected pattern' }, + ], + }); + const runner = buildRunner(deps); (runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({ currentHashes: new Map([['tables/public/orders.json', 'h1']]), rawDirInWorktree: 'raw-sources/c1/historic-sql/s', @@ -1691,13 +1736,13 @@ describe('IngestBundleRunner — Stages 1 → 7', () => { expect(memoryFlow.snapshot().events).toContainEqual( expect.objectContaining({ type: 'saved', - wikiCount: 5, - slCount: 3, + wikiCount: 2, + slCount: 2, }), ); }); - it('marks post-processor infrastructure failure as failed and preserves worktree cleanup state', async () => { + it('marks finalization infrastructure failure as failed and preserves worktree cleanup state', async () => { const deps = makeDeps(); deps.adapter.source = 'metricflow'; deps.registry.get.mockReturnValue(deps.adapter); @@ -1705,8 +1750,8 @@ describe('IngestBundleRunner — Stages 1 → 7', () => { workUnits: [{ unitKey: 'u1', rawFiles: ['semantic_models.yml'], peerFileIndex: [], dependencyPaths: [] }], parseArtifacts: { semanticModels: [{ name: 'orders' }] }, }); - const postProcessor = { run: vi.fn().mockRejectedValue(new Error('worktree write failed')) }; - const runner = buildRunner(deps, { postProcessors: { metricflow: postProcessor } }); + deps.adapter.finalize = vi.fn().mockRejectedValue(new Error('worktree write failed')); + const runner = buildRunner(deps); (runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({ currentHashes: new Map([['semantic_models.yml', 'h1']]), rawDirInWorktree: 'raw-sources/c1/metricflow/s', @@ -1728,6 +1773,132 @@ describe('IngestBundleRunner — Stages 1 → 7', () => { expect(deps.sessionWorktreeService.cleanup).toHaveBeenCalledWith(expect.any(Object), 'crash'); }); + it('reports finalization actions excluded from provenance when raw paths are not defensible', async () => { + const deps = makeDeps(); + deps.adapter.finalize = vi.fn().mockResolvedValue({ + warnings: [], + errors: [], + touchedSources: [], + changedWikiPageKeys: [], + actions: [ + { target: 'wiki', type: 'updated', key: 'historic-sql-pattern', detail: 'No raw path' }, + { target: 'sl', type: 'updated', key: 'orders', detail: 'Invalid raw path', rawPaths: ['missing.json'] }, + ], + }); + const runner = buildRunner(deps); + (runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({ + currentHashes: new Map([['current.json', 'h1']]), + rawDirInWorktree: 'raw-sources/c1/fake/s', + }); + (runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x'); + + await runner.run({ + jobId: 'j1', + connectionId: 'c1', + sourceKey: 'fake', + trigger: 'upload', + bundleRef: { kind: 'upload', uploadId: 'upload-x' }, + }); + + expect(deps.reportsRepo.create).toHaveBeenCalledWith( + expect.objectContaining({ + body: expect.objectContaining({ + finalization: expect.objectContaining({ + provenanceExclusions: [ + expect.objectContaining({ reason: 'missing_raw_paths' }), + expect.objectContaining({ reason: 'raw_path_not_defensible', invalidRawPaths: ['missing.json'] }), + ], + }), + }), + }), + ); + expect(deps.provenanceRepo.insertMany).not.toHaveBeenCalledWith( + expect.arrayContaining([expect.objectContaining({ rawPath: 'missing.json' })]), + ); + }); + + it('passes explicit override replay metadata and no current work unit outcomes', async () => { + const deps = makeDeps(); + deps.reportsRepo.findByJobId.mockResolvedValue({ + id: 'prior-report', + runId: 'prior-run', + jobId: 'prior-job', + connectionId: 'c1', + sourceKey: 'fake', + createdAt: '2026-05-18T00:00:00.000Z', + body: { + status: 'completed', + syncId: 'prior-sync', + diffSummary: { added: 0, modified: 0, deleted: 0, unchanged: 0 }, + commitSha: 'prior-sha', + workUnits: [ + { + unitKey: 'prior-unit', + rawFiles: ['prior.json'], + status: 'success', + actions: [{ target: 'wiki', type: 'created', key: 'prior', detail: 'prior' }], + touchedSlSources: [], + }, + ], + failedWorkUnits: [], + reconciliationSkipped: false, + conflictsResolved: [], + evictionsApplied: [ + { + rawPath: 'do-not-replay.json', + artifactKind: 'wiki', + artifactKey: 'old', + action: 'removed', + reason: 'prior', + }, + ], + unmappedFallbacks: [], + artifactResolutions: [], + evictionInputs: ['evicted-from-prior-report.json'], + unresolvedCards: [], + supersededBy: null, + overrideOf: null, + provenanceRows: [], + toolTranscripts: [], + }, + }); + deps.adapter.finalize = vi.fn().mockResolvedValue({ + warnings: [], + errors: [], + touchedSources: [], + changedWikiPageKeys: [], + actions: [], + }); + deps.gitService.listFilesAtHead.mockResolvedValue(['raw-sources/c1/fake/prior-sync/prior.json']); + deps.gitService.getFileAtCommit.mockResolvedValue('{"id":1}\n'); + const runner = buildRunner(deps); + (runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({ + currentHashes: new Map([['prior.json', 'h1']]), + rawDirInWorktree: 'raw-sources/c1/fake/prior-sync', + }); + (runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/prior'); + + await runner.run({ + jobId: 'override-job', + connectionId: 'c1', + sourceKey: 'fake', + trigger: 'manual_override', + bundleRef: { kind: 'override', priorJobId: 'prior-job' }, + }); + + expect(deps.adapter.finalize).toHaveBeenCalledWith( + expect.objectContaining({ + workUnitOutcomes: [], + overrideReplay: { + priorJobId: 'prior-job', + priorRunId: 'prior-run', + priorSyncId: 'prior-sync', + evictionRawPaths: ['evicted-from-prior-report.json'], + }, + }), + ); + }); + it('includes existing global wiki pages in WorkUnit prompts', async () => { const deps = makeDeps(); deps.knowledgeIndex.listPagesForUser.mockResolvedValue([ diff --git a/packages/context/src/ingest/ingest-bundle.runner.ts b/packages/context/src/ingest/ingest-bundle.runner.ts index a390ef08..43a2b251 100644 --- a/packages/context/src/ingest/ingest-bundle.runner.ts +++ b/packages/context/src/ingest/ingest-bundle.runner.ts @@ -14,6 +14,11 @@ import { NOTION_DEFAULT_MAX_KNOWLEDGE_CREATES_PER_RUN } from './adapters/notion/ import { validateFinalIngestArtifacts, validateProvenanceRawPaths } from './artifact-gates.js'; import { selectRelevantCanonicalPins } from './canonical-pins.js'; import { finalGateRepairPaths, repairFinalGateFailure } from './final-gate-repair.js'; +import { + compareFinalizationDeclarations, + deriveFinalizationTouchedSources, + deriveFinalizationWikiPageKeys, +} from './finalization-scope.js'; import { FileIngestTraceWriter, ingestTracePathForJob, type IngestTraceWriter, traceTimed } from './ingest-trace.js'; import { integrateWorkUnitPatch } from './isolated-diff/patch-integrator.js'; import { resolveTextualConflict } from './isolated-diff/textual-conflict-resolver.js'; @@ -33,8 +38,8 @@ import type { import { buildSyncId, rawSourcesDirForSync } from './raw-sources-paths.js'; import { buildStageIndexFromReportBody, - postProcessorSavedMemoryCounts, - type IngestReportPostProcessorOutcome, + type IngestReportFinalizationProvenanceExclusion, + type IngestReportFinalizationOutcome, type IngestReportProvenanceDetail, type IngestReportSnapshot, type IngestReportWorkUnit, @@ -174,6 +179,11 @@ type ProvenanceRowOrigin = actionIndex: number; action: MemoryAction; } + | { + source: 'finalization_action'; + actionIndex: number; + action: MemoryAction; + } | { source: 'artifact_resolution'; resolutionIndex: number; @@ -411,6 +421,19 @@ export class IngestBundleRunner { return false; } + private async loadSourcesByConnection( + workdir: string, + connectionIds: string[], + ): Promise> { + const service = this.deps.semanticLayerService.forWorktree(workdir); + const result = new Map(); + for (const connectionId of connectionIds) { + const { sources } = await service.loadAllSources(connectionId); + result.set(connectionId, sources); + } + return result; + } + private resolveContextCuratorBudget( bundleRef: IngestBundleJob['bundleRef'], stageIndex: StageIndex, @@ -466,6 +489,7 @@ export class IngestBundleRunner { currentHashes: Map; stageIndex: StageIndex; reconcileActions: MemoryAction[]; + finalizationActions: MemoryAction[]; }): ProvenancePlan { const rows: IngestProvenanceInsert[] = []; const diagnostics: ProvenanceRowDiagnostic[] = []; @@ -523,6 +547,15 @@ export class IngestBundleRunner { }); } }); + input.finalizationActions.forEach((action, actionIndex) => { + for (const rawPath of action.rawPaths ?? []) { + pushActionProvenance(rawPath, action, { + source: 'finalization_action', + actionIndex, + action, + }); + } + }); (input.stageIndex.artifactResolutions ?? []).forEach((resolution, resolutionIndex) => { const hash = input.currentHashes.get(resolution.rawPath) ?? ''; pushRow( @@ -569,6 +602,35 @@ export class IngestBundleRunner { return { rows, diagnostics }; } + private partitionFinalizationActionsForProvenance(input: { + actions: MemoryAction[]; + currentRawPaths: Set; + currentEvictionRawPaths: Set; + overrideEvictionRawPaths: Set; + }): { actions: MemoryAction[]; exclusions: IngestReportFinalizationProvenanceExclusion[] } { + const defensible = new Set([ + ...input.currentRawPaths, + ...input.currentEvictionRawPaths, + ...input.overrideEvictionRawPaths, + ]); + const actions: MemoryAction[] = []; + const exclusions: IngestReportFinalizationProvenanceExclusion[] = []; + for (const action of input.actions) { + const rawPaths = action.rawPaths ?? []; + if (rawPaths.length === 0) { + exclusions.push({ action, reason: 'missing_raw_paths' }); + continue; + } + const invalidRawPaths = rawPaths.filter((rawPath) => !defensible.has(rawPath)).sort(); + if (invalidRawPaths.length > 0) { + exclusions.push({ action, reason: 'raw_path_not_defensible', invalidRawPaths }); + continue; + } + actions.push(action); + } + return { actions, exclusions }; + } + private toReportProvenanceRows(rows: IngestProvenanceInsert[]): IngestReportProvenanceDetail[] { return rows.map(({ rawPath, artifactKind, artifactKey, actionType, targetConnectionId }) => ({ rawPath, @@ -951,6 +1013,7 @@ export class IngestBundleRunner { let latestEvictionInputs: string[] = []; let latestUnresolvedCards: UnresolvedCardInfo[] = []; let latestReportProvenanceRows: IngestReportProvenanceDetail[] = []; + let latestFinalizationOutcome: IngestReportFinalizationOutcome | undefined; let activeFailureDetails: Record | undefined; let latestIsolatedDiffSummary: | { @@ -1174,7 +1237,7 @@ export class IngestBundleRunner { let unresolvedCards: UnresolvedCardInfo[] | undefined; let sourceContextReport: { capped?: boolean; warnings?: string[] } | undefined; let parseArtifacts: unknown; - let postProcessorOutcome: IngestReportPostProcessorOutcome | undefined; + let finalizationOutcome: IngestReportFinalizationOutcome | undefined; let wikiSlRefRepairResult: WikiSlRefRepairResult | null = null; let reconcileNotes: string[] = []; let triageResult: PageTriageRunResult | null = null; @@ -1954,62 +2017,215 @@ export class IngestBundleRunner { await stage4?.updateProgress(1.0, reconcileOutcome.skipped ? 'No reconciliation needed' : 'Reconciled'); - const postProcessor = this.deps.postProcessors?.[job.sourceKey]; - activePhase = 'post_processor'; - if (postProcessor) { - const stagePostProcessor = ctx?.startPhase(0.04); - emitStageProgress('post_processor', 87, 'Running deterministic imports'); - await stagePostProcessor?.updateProgress(0.0, 'Running deterministic imports'); - try { - const result = await traceTimed( - runTrace, - 'post_processor', - 'post_processor', - { sourceKey: job.sourceKey }, - () => - postProcessor.run({ - connectionId: job.connectionId, - sourceKey: job.sourceKey, - syncId, - jobId: job.jobId, - runId: createdRunRow.id, - workdir: sessionWorktree.workdir, - parseArtifacts, - }), - ); - postProcessorOutcome = { + const preFinalizationSha = await sessionWorktree.git.revParseHead(); + const preFinalizationSourcesByConnection = await this.loadSourcesByConnection( + sessionWorktree.workdir, + slConnectionIds, + ); + let finalizationActions: MemoryAction[] = []; + let finalizationTouchedPaths: string[] = []; + let finalizationTouchedSources: TouchedSlSource[] = []; + let finalizationChangedWikiPageKeys: string[] = []; + let finalizationSha: string | null = null; + + activePhase = 'finalization'; + if (adapter.finalize) { + const stageFinalization = ctx?.startPhase(0.04); + emitStageProgress('finalization', 87, 'Running deterministic finalization'); + await stageFinalization?.updateProgress(0.0, 'Running deterministic finalization'); + await runTrace.event('debug', 'finalization', 'finalization_started', { sourceKey: job.sourceKey }); + const result = await adapter.finalize({ + connectionId: job.connectionId, + sourceKey: job.sourceKey, + syncId, + jobId: job.jobId, + runId: createdRunRow.id, + stagedDir, + workdir: sessionWorktree.workdir, + ...(overrideReport ? {} : { parseArtifacts }), + stageIndex, + workUnitOutcomes, + reconciliationActions: reconcileActions, + ...(overrideReport + ? { + overrideReplay: { + priorJobId: overrideReport.jobId, + priorRunId: overrideReport.runId, + priorSyncId: overrideReport.body.syncId, + evictionRawPaths: overrideReport.body.evictionInputs, + }, + } + : {}), + }); + if (result.errors.length > 0) { + finalizationOutcome = { sourceKey: job.sourceKey, - status: result.errors.length > 0 && result.touchedSources.length === 0 ? 'failed' : 'success', + status: 'failed', + commitSha: null, + touchedPaths: [], + declaredTouchedSources: result.touchedSources, + derivedTouchedSources: [], + declaredChangedWikiPageKeys: result.changedWikiPageKeys, + derivedChangedWikiPageKeys: [], + mismatches: [], result: result.result, errors: result.errors, warnings: result.warnings, - touchedSources: result.touchedSources, + actions: result.actions ?? [], + provenanceExclusions: [], }; - emitStageProgress('post_processor', 88, 'Deterministic imports complete'); - await stagePostProcessor?.updateProgress(1.0, 'Deterministic imports complete'); - } catch (error) { - postProcessorOutcome = { + latestFinalizationOutcome = finalizationOutcome; + await runTrace.event('error', 'finalization', 'finalization_failed', { + sourceKey: job.sourceKey, + errors: result.errors, + warnings: result.warnings, + }); + throw new Error(`deterministic finalization failed: ${result.errors.join('; ')}`); + } + + const changedBeforeFinalization = new Set([ + ...projectionTouchedPaths, + ...workUnitOutcomes.flatMap((outcome) => outcome.patchTouchedPaths ?? []), + ...(preReconciliationSha && preFinalizationSha !== preReconciliationSha + ? (await sessionWorktree.git.diffNameStatus(preReconciliationSha, preFinalizationSha)).map( + (entry) => entry.path, + ) + : []), + ]); + finalizationTouchedPaths = await sessionWorktree.git.changedPaths(); + const overlapping = finalizationTouchedPaths.filter((path) => changedBeforeFinalization.has(path)); + if (overlapping.length > 0) { + await runTrace.event('error', 'finalization', 'finalization_failed', { + sourceKey: job.sourceKey, + reason: 'path_overlap', + overlappingPaths: overlapping.sort(), + }); + throw new Error( + `finalization modified path(s) already changed earlier in this run: ${overlapping.sort().join(', ')}`, + ); + } + + const finalizationCommit = + finalizationTouchedPaths.length > 0 + ? await sessionWorktree.git.commitFiles( + finalizationTouchedPaths, + `ingest(${job.sourceKey}): deterministic finalization syncId=${syncId}`, + this.deps.storage.systemGitAuthor.name, + this.deps.storage.systemGitAuthor.email, + ) + : await sessionWorktree.git.commitStaged( + `ingest(${job.sourceKey}): deterministic finalization syncId=${syncId}`, + this.deps.storage.systemGitAuthor.name, + this.deps.storage.systemGitAuthor.email, + ); + finalizationSha = finalizationCommit.created ? finalizationCommit.commitHash : null; + const postFinalizationSha = await sessionWorktree.git.revParseHead(); + finalizationTouchedPaths = + preFinalizationSha !== postFinalizationSha + ? (await sessionWorktree.git.diffNameStatus(preFinalizationSha, postFinalizationSha)).map( + (entry) => entry.path, + ) + : []; + + const changedConnectionIds = [ + ...new Set([ + ...slConnectionIds, + ...finalizationTouchedPaths + .filter((path) => path.startsWith('semantic-layer/')) + .map((path) => path.split('/')[1]) + .filter((connectionId): connectionId is string => Boolean(connectionId)), + ]), + ].sort(); + const postFinalizationSourcesByConnection = await this.loadSourcesByConnection( + sessionWorktree.workdir, + changedConnectionIds, + ); + const scope = await deriveFinalizationTouchedSources({ + changedPaths: finalizationTouchedPaths, + beforeSourcesByConnection: preFinalizationSourcesByConnection, + afterSourcesByConnection: postFinalizationSourcesByConnection, + }); + if (scope.unresolvedPaths.length > 0) { + await runTrace.event('error', 'finalization', 'finalization_failed', { + sourceKey: job.sourceKey, + reason: 'unresolved_semantic_layer_paths', + unresolvedPaths: scope.unresolvedPaths, + }); + throw new Error(`could not resolve finalization semantic-layer path(s): ${scope.unresolvedPaths.join(', ')}`); + } + finalizationTouchedSources = scope.touchedSources; + finalizationChangedWikiPageKeys = deriveFinalizationWikiPageKeys(finalizationTouchedPaths); + const mismatches = compareFinalizationDeclarations({ + declaredTouchedSources: result.touchedSources, + derivedTouchedSources: finalizationTouchedSources, + declaredChangedWikiPageKeys: result.changedWikiPageKeys, + derivedChangedWikiPageKeys: finalizationChangedWikiPageKeys, + }); + if (mismatches.length > 0) { + finalizationOutcome = { sourceKey: job.sourceKey, status: 'failed', - errors: [error instanceof Error ? error.message : String(error)], - warnings: [], - touchedSources: [], + commitSha: finalizationSha, + touchedPaths: finalizationTouchedPaths, + declaredTouchedSources: result.touchedSources, + derivedTouchedSources: finalizationTouchedSources, + declaredChangedWikiPageKeys: result.changedWikiPageKeys, + derivedChangedWikiPageKeys: finalizationChangedWikiPageKeys, + mismatches, + result: result.result, + errors: ['finalization touched artifact declaration mismatch'], + warnings: result.warnings, + actions: result.actions ?? [], + provenanceExclusions: [], }; - await this.deps.runs.markFailed(runRow.id); - throw error; + latestFinalizationOutcome = finalizationOutcome; + await runTrace.event('error', 'finalization', 'finalization_failed', { + sourceKey: job.sourceKey, + reason: 'declaration_mismatch', + mismatches, + }); + throw new Error( + `finalization touched artifact declaration mismatch: ${mismatches + .map((mismatch) => `${mismatch.direction}:${mismatch.artifactKind}:${mismatch.key}`) + .join(', ')}`, + ); } + finalizationActions = result.actions ?? []; + finalizationOutcome = { + sourceKey: job.sourceKey, + status: 'success', + commitSha: finalizationSha, + touchedPaths: finalizationTouchedPaths, + declaredTouchedSources: result.touchedSources, + derivedTouchedSources: finalizationTouchedSources, + declaredChangedWikiPageKeys: result.changedWikiPageKeys, + derivedChangedWikiPageKeys: finalizationChangedWikiPageKeys, + mismatches, + result: result.result, + errors: [], + warnings: result.warnings, + actions: finalizationActions, + provenanceExclusions: [], + }; + latestFinalizationOutcome = finalizationOutcome; + emitStageProgress('finalization', 88, 'Deterministic finalization complete'); + await stageFinalization?.updateProgress(1.0, 'Deterministic finalization complete'); + await runTrace.event('debug', 'finalization', 'finalization_committed', { + sourceKey: job.sourceKey, + commitSha: finalizationSha, + touchedPaths: finalizationTouchedPaths, + touchedSources: finalizationTouchedSources, + changedWikiPageKeys: finalizationChangedWikiPageKeys, + warnings: result.warnings, + }); + } else { + await runTrace.event('debug', 'finalization', 'finalization_skipped', { sourceKey: job.sourceKey }); } - await runTrace.event('debug', 'post_processor', 'post_processor_finished', { - sourceKey: job.sourceKey, - status: postProcessorOutcome?.status ?? 'skipped', - touchedSources: postProcessorOutcome?.touchedSources ?? [], - warnings: postProcessorOutcome?.warnings ?? [], - }); const repairConnectionIds = [ ...new Set([ ...slConnectionIds, - ...(postProcessorOutcome?.touchedSources ?? []).map((source) => source.connectionId), + ...finalizationTouchedSources.map((source) => source.connectionId), ]), ].sort(); activePhase = 'wiki_sl_ref_repair'; @@ -2044,6 +2260,7 @@ export class IngestBundleRunner { .flatMap((outcome) => outcome.patchTouchedPaths ?? []) .flatMap((path) => this.wikiPageKeysFromPaths([path])), ...this.wikiPageKeysFromActions(reconcileActions), + ...finalizationChangedWikiPageKeys, ...postReconciliationPaths.flatMap((path) => this.wikiPageKeysFromPaths([path])), ...wikiSlRefRepairResult.repairs.filter((repair) => repair.scope === 'GLOBAL').map((repair) => repair.pageKey), ]); @@ -2052,7 +2269,7 @@ export class IngestBundleRunner { ...workUnitOutcomes.flatMap((outcome) => outcome.touchedSlSources), ...this.touchedSlSourcesFromActions(reconcileActions, job.connectionId), ...this.touchedSlSourcesFromPaths(postReconciliationPaths), - ...(postProcessorOutcome?.touchedSources ?? []), + ...finalizationTouchedSources, ]); const finalWikiGateScope = await this.wikiPageKeysForFinalGates({ wikiService: this.deps.wikiService.forWorktree(sessionWorktree.workdir), @@ -2066,9 +2283,7 @@ export class IngestBundleRunner { ...projectionTouchedPaths, ...workUnitOutcomes.flatMap((outcome) => outcome.patchTouchedPaths ?? []), ...postReconciliationPaths, - ...(postProcessorOutcome?.touchedSources ?? []).map( - (source) => `semantic-layer/${source.connectionId}/${source.sourceName}.yaml`, - ), + ...finalizationTouchedPaths, ]; const targetPolicyTraceData = { allowedTargetConnectionIds: slConnectionIds, @@ -2235,12 +2450,23 @@ export class IngestBundleRunner { latestArtifactResolutions = stageIndex.artifactResolutions ?? []; latestEvictionInputs = eviction?.deletedRawPaths ?? []; latestUnresolvedCards = unresolvedCards ?? []; + const finalizationProvenance = this.partitionFinalizationActionsForProvenance({ + actions: finalizationActions, + currentRawPaths: new Set(currentHashes.keys()), + currentEvictionRawPaths: new Set(stageIndex.evictionsApplied.map((entry) => entry.rawPath)), + overrideEvictionRawPaths: new Set(overrideReport?.body.evictionInputs ?? []), + }); + if (finalizationOutcome) { + finalizationOutcome.provenanceExclusions = finalizationProvenance.exclusions; + latestFinalizationOutcome = finalizationOutcome; + } const provenancePlan = this.buildProvenancePlan({ job, syncId, currentHashes, stageIndex, reconcileActions, + finalizationActions: finalizationProvenance.actions, }); const provenanceRows = provenancePlan.rows; const currentRawPaths = new Set(currentHashes.keys()); @@ -2300,13 +2526,15 @@ export class IngestBundleRunner { commitSha, touchedPaths: mergeResult.touchedPaths, }); - const memoryFlowSavedActions = stageIndex.workUnits.flatMap((wu) => wu.actions).concat(reconcileActions); - const postProcessorMemoryCounts = postProcessorSavedMemoryCounts(postProcessorOutcome); + const memoryFlowSavedActions = stageIndex.workUnits + .flatMap((wu) => wu.actions) + .concat(reconcileActions) + .concat(finalizationActions); memoryFlow?.emit({ type: 'saved', commitSha, - wikiCount: countMemoryFlowActions(memoryFlowSavedActions, 'wiki') + postProcessorMemoryCounts.wikiCount, - slCount: countMemoryFlowActions(memoryFlowSavedActions, 'sl') + postProcessorMemoryCounts.slCount, + wikiCount: countMemoryFlowActions(memoryFlowSavedActions, 'wiki'), + slCount: countMemoryFlowActions(memoryFlowSavedActions, 'sl'), }); await stage6?.updateProgress(1.0, commitSha ? `Saved changes (${commitSha.slice(0, 8)})` : 'No changes to save'); @@ -2325,7 +2553,7 @@ export class IngestBundleRunner { memoryFlowSavedActions .filter((action) => action.target === 'sl') .map((action) => actionTargetConnectionId(action, job.connectionId)) - .concat((postProcessorOutcome?.touchedSources ?? []).map((source) => source.connectionId)), + .concat(finalizationTouchedSources.map((source) => source.connectionId)), ), ].sort(); for (const connectionId of touchedConnections) { @@ -2416,7 +2644,7 @@ export class IngestBundleRunner { overrideOf: overrideReport?.jobId ?? null, provenanceRows: reportProvenanceRows, toolTranscripts: reportToolTranscripts, - postProcessor: postProcessorOutcome, + finalization: finalizationOutcome, wikiSlRefRepairs: wikiSlRefRepairResult.repairs, wikiSlRefRepairWarnings: wikiSlRefRepairResult.warnings, ...(reportMemoryFlow ? { memoryFlow: reportMemoryFlow } : {}), @@ -2585,6 +2813,7 @@ export class IngestBundleRunner { artifactResolutions: latestArtifactResolutions, evictionInputs: latestEvictionInputs, reconciliationActions: latestReconciliationActions, + finalization: latestFinalizationOutcome, evictionDecisions: [], unresolvedCards: latestUnresolvedCards, supersededBy: null, diff --git a/packages/context/src/ingest/local-bundle-ingest.test.ts b/packages/context/src/ingest/local-bundle-ingest.test.ts index faea9c2f..5a336f35 100644 --- a/packages/context/src/ingest/local-bundle-ingest.test.ts +++ b/packages/context/src/ingest/local-bundle-ingest.test.ts @@ -8,10 +8,17 @@ import { initKtxProject, type KtxLocalProject, loadKtxProject } from '../project import { makeLocalGitRepo } from '../test/make-local-git-repo.js'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { FakeSourceAdapter } from './adapters/fake/fake.adapter.js'; +import { projectHistoricSqlEvidence } from './adapters/historic-sql/projection.js'; import { LocalLookerRuntimeStore } from './adapters/looker/local-runtime-store.js'; import { createDefaultLocalIngestAdapters, localPullConfigForAdapter } from './local-adapters.js'; import { getLocalIngestStatus, runLocalIngest } from './local-ingest.js'; -import type { ChunkResult, DiffSet, SourceAdapter } from './types.js'; +import type { + ChunkResult, + DeterministicFinalizationContext, + DiffSet, + FinalizationResult, + SourceAdapter, +} from './types.js'; class TestAgentRunner implements AgentRunnerPort { runLoop = vi.fn().mockResolvedValue({ stopReason: 'natural' as const }); @@ -174,6 +181,25 @@ class HistoricSqlEvidenceTestAdapter implements SourceAdapter { ], }); } + + async finalize(ctx: DeterministicFinalizationContext): Promise { + const projection = await projectHistoricSqlEvidence({ + workdir: ctx.workdir, + connectionId: ctx.connectionId, + syncId: ctx.syncId, + runId: ctx.runId, + overrideReplay: ctx.overrideReplay, + }); + + return { + result: projection, + warnings: projection.warnings, + errors: [], + touchedSources: projection.touchedSources, + changedWikiPageKeys: projection.changedWikiPageKeys, + actions: projection.actions, + }; + } } function makeLookerRuntimeClient() { @@ -478,7 +504,7 @@ describe('canonical local ingest', () => { ]); }); - it('runs historic-SQL evidence projection through the local bundle post-processor', async () => { + it('runs historic-SQL evidence projection through local bundle finalization', async () => { const projectDir = join(tempDir, 'historic-sql-project'); await initKtxProject({ projectDir }); await writeFile( @@ -550,11 +576,12 @@ describe('canonical local ingest', () => { }); expect(result.result.failedWorkUnits).toEqual([]); - expect(result.report.body.postProcessor).toMatchObject({ + expect(result.report.body.finalization).toMatchObject({ sourceKey: 'historic-sql', status: 'success', result: { tableUsageMerged: 1 }, - touchedSources: [{ connectionId: 'warehouse', sourceName: 'orders' }], + declaredTouchedSources: [{ connectionId: 'warehouse', sourceName: 'orders' }], + derivedTouchedSources: [{ connectionId: 'warehouse', sourceName: 'orders' }], }); await expect(readFile(join(projectDir, 'semantic-layer/warehouse/_schema/public.yaml'), 'utf-8')).resolves.toContain( 'Orders are repeatedly queried by lifecycle status.', diff --git a/packages/context/src/ingest/local-bundle-runtime.ts b/packages/context/src/ingest/local-bundle-runtime.ts index 7e0fc1e5..f5bb73bc 100644 --- a/packages/context/src/ingest/local-bundle-runtime.ts +++ b/packages/context/src/ingest/local-bundle-runtime.ts @@ -72,7 +72,6 @@ import { CuratorPaginationService, } from './context-candidates/index.js'; import { createEmitHistoricSqlEvidenceTool } from './adapters/historic-sql/evidence-tool.js'; -import { HistoricSqlProjectionPostProcessor } from './adapters/historic-sql/post-processor.js'; import { ContextEvidenceIndexService, SqliteContextEvidenceStore } from './context-evidence/index.js'; import { DiffSetService } from './diff-set.service.js'; import { ingestTracePathForJob, type IngestTraceLevel } from './ingest-trace.js'; @@ -774,9 +773,6 @@ export function createLocalBundleIngestRuntime( settings: { batchSize: 8, maxPasses: 8, stepBudgetPerPass: 60 }, logger, }), - postProcessors: { - 'historic-sql': new HistoricSqlProjectionPostProcessor(), - }, logger, }; diff --git a/packages/context/src/ingest/local-ingest.ts b/packages/context/src/ingest/local-ingest.ts index b64fdcb7..0ac300c4 100644 --- a/packages/context/src/ingest/local-ingest.ts +++ b/packages/context/src/ingest/local-ingest.ts @@ -321,13 +321,6 @@ async function recordLocalMetabaseChildFailure(options: { overrideOf: null, provenanceRows: [], toolTranscripts: [], - postProcessor: { - sourceKey: 'metabase', - status: 'failed', - errors: [reason], - warnings: [], - touchedSources: [], - }, }; const report = await store.create({ diff --git a/packages/context/src/ingest/memory-flow/schema.ts b/packages/context/src/ingest/memory-flow/schema.ts index 7f00cde3..09cba418 100644 --- a/packages/context/src/ingest/memory-flow/schema.ts +++ b/packages/context/src/ingest/memory-flow/schema.ts @@ -59,7 +59,7 @@ export const memoryFlowEventSchema = z.discriminatedUnion('type', [ 'source', 'integration', 'reconciliation', - 'post_processor', + 'finalization', 'wiki_sl_ref_repair', 'final_gates', 'save', diff --git a/packages/context/src/ingest/memory-flow/types.ts b/packages/context/src/ingest/memory-flow/types.ts index df8dfff3..ab4619a6 100644 --- a/packages/context/src/ingest/memory-flow/types.ts +++ b/packages/context/src/ingest/memory-flow/types.ts @@ -50,7 +50,7 @@ type MemoryFlowEventPayload = | 'source' | 'integration' | 'reconciliation' - | 'post_processor' + | 'finalization' | 'wiki_sl_ref_repair' | 'final_gates' | 'save' diff --git a/packages/context/src/ingest/ports.ts b/packages/context/src/ingest/ports.ts index 32410cbc..15123089 100644 --- a/packages/context/src/ingest/ports.ts +++ b/packages/context/src/ingest/ports.ts @@ -13,7 +13,7 @@ import type { SlValidationDeps, SlValidatorPort, } from '../sl/index.js'; -import type { ToolContext, ToolSession, TouchedSlSource } from '../tools/index.js'; +import type { ToolContext, ToolSession } from '../tools/index.js'; import type { KnowledgeIndexPort, KnowledgeWikiService } from '../wiki/index.js'; import type { CanonicalPin } from './canonical-pins.js'; import type { IngestTraceLevel } from './ingest-trace.js'; @@ -323,27 +323,6 @@ export interface CuratorPaginationPort { }): Promise; } -export interface IngestBundlePostProcessorInput { - connectionId: string; - sourceKey: string; - syncId: string; - jobId: string; - runId: string; - workdir: string; - parseArtifacts: unknown; -} - -export interface IngestBundlePostProcessorResult { - result?: unknown; - warnings: string[]; - errors: string[]; - touchedSources: TouchedSlSource[]; -} - -export interface IngestBundlePostProcessorPort { - run(input: IngestBundlePostProcessorInput): Promise; -} - export interface IngestBundleRunnerDeps { runs: IngestRunsPort; provenance: IngestProvenancePort; @@ -377,7 +356,6 @@ export interface IngestBundleRunnerDeps { candidateDedup?: CandidateDedupPort; contextCandidateCarryforward?: ContextCandidateCarryforwardPort; curatorPagination?: CuratorPaginationPort; - postProcessors?: Record; logger?: KtxLogger; } diff --git a/packages/context/src/ingest/report-snapshot.ts b/packages/context/src/ingest/report-snapshot.ts index eef64b48..3cde6a4e 100644 --- a/packages/context/src/ingest/report-snapshot.ts +++ b/packages/context/src/ingest/report-snapshot.ts @@ -129,6 +129,35 @@ const ingestReportFailureSchema = z.object({ details: z.record(z.string(), z.unknown()).optional(), }); +const finalizationMismatchSchema = z.object({ + artifactKind: z.enum(['sl', 'wiki']), + key: z.string().min(1), + direction: z.enum(['missing_from_adapter_declaration', 'extra_in_adapter_declaration']), +}); + +const finalizationProvenanceExclusionSchema = z.object({ + action: ingestActionSchema, + reason: z.enum(['missing_raw_paths', 'raw_path_not_defensible']), + invalidRawPaths: z.array(z.string()).optional(), +}); + +const finalizationOutcomeSchema = z.object({ + sourceKey: z.string().min(1), + status: z.enum(['success', 'failed', 'skipped']), + commitSha: z.string().nullable(), + touchedPaths: z.array(z.string()), + declaredTouchedSources: z.array(touchedSlSourceSchema), + derivedTouchedSources: z.array(touchedSlSourceSchema), + declaredChangedWikiPageKeys: z.array(z.string()), + derivedChangedWikiPageKeys: z.array(z.string()), + mismatches: z.array(finalizationMismatchSchema).default([]), + result: z.unknown().optional(), + errors: z.array(z.string()), + warnings: z.array(z.string()), + actions: z.array(ingestActionSchema).default([]), + provenanceExclusions: z.array(finalizationProvenanceExclusionSchema).default([]), +}); + export const ingestReportSnapshotSchema = z .object({ id: z.string().min(1), @@ -188,6 +217,7 @@ export const ingestReportSnapshotSchema = z overrideOf: z.string().nullable().default(null), provenanceRows: z.array(provenanceDetailSchema).default([]), toolTranscripts: z.array(toolTranscriptSummarySchema).default([]), + finalization: finalizationOutcomeSchema.optional(), memoryFlow: memoryFlowReplayInputSchema.optional(), }) .passthrough(), diff --git a/packages/context/src/ingest/reports.ts b/packages/context/src/ingest/reports.ts index 431e4063..280b224f 100644 --- a/packages/context/src/ingest/reports.ts +++ b/packages/context/src/ingest/reports.ts @@ -39,13 +39,33 @@ export interface IngestReportToolTranscriptSummary { toolNames: string[]; } -export interface IngestReportPostProcessorOutcome { +export interface IngestReportFinalizationMismatch { + artifactKind: 'sl' | 'wiki'; + key: string; + direction: 'missing_from_adapter_declaration' | 'extra_in_adapter_declaration'; +} + +export interface IngestReportFinalizationProvenanceExclusion { + action: MemoryAction; + reason: 'missing_raw_paths' | 'raw_path_not_defensible'; + invalidRawPaths?: string[]; +} + +export interface IngestReportFinalizationOutcome { sourceKey: string; - status: 'success' | 'failed'; + status: 'success' | 'failed' | 'skipped'; + commitSha: string | null; + touchedPaths: string[]; + declaredTouchedSources: TouchedSlSource[]; + derivedTouchedSources: TouchedSlSource[]; + declaredChangedWikiPageKeys: string[]; + derivedChangedWikiPageKeys: string[]; + mismatches: IngestReportFinalizationMismatch[]; result?: unknown; errors: string[]; warnings: string[]; - touchedSources: TouchedSlSource[]; + actions: MemoryAction[]; + provenanceExclusions: IngestReportFinalizationProvenanceExclusion[]; } export interface IngestReportFailure { @@ -94,7 +114,7 @@ export interface IngestReportBody { overrideOf: string | null; provenanceRows: IngestReportProvenanceDetail[]; toolTranscripts: IngestReportToolTranscriptSummary[]; - postProcessor?: IngestReportPostProcessorOutcome; + finalization?: IngestReportFinalizationOutcome; wikiSlRefRepairs?: WikiSlRefRepair[]; wikiSlRefRepairWarnings?: string[]; memoryFlow?: MemoryFlowReplayInput; @@ -115,44 +135,25 @@ export interface IngestSavedMemoryCounts { slCount: number; } -function numericResultField(result: Record, field: string): number { - const value = result[field]; - return typeof value === 'number' && Number.isFinite(value) && value > 0 ? value : 0; -} - -export function postProcessorSavedMemoryCounts( - postProcessor: IngestReportPostProcessorOutcome | undefined, +export function finalizationSavedMemoryCounts( + finalization: IngestReportFinalizationOutcome | undefined, ): IngestSavedMemoryCounts { - if (!postProcessor || postProcessor.sourceKey !== 'historic-sql') { - return { wikiCount: 0, slCount: 0 }; - } - const result = postProcessor.result; - if (!result || typeof result !== 'object' || Array.isArray(result)) { - return { wikiCount: 0, slCount: 0 }; - } - const record = result as Record; + const actions = finalization?.actions ?? []; return { - wikiCount: - numericResultField(record, 'patternPagesWritten') + - numericResultField(record, 'stalePatternPagesMarked') + - numericResultField(record, 'archivedPatternPages'), - slCount: numericResultField(record, 'tableUsageMerged') + numericResultField(record, 'staleTablesMarked'), + wikiCount: actions.filter((action) => action.target === 'wiki').length, + slCount: actions.filter((action) => action.target === 'sl').length, }; } export function savedMemoryCountsForReport(report: IngestReportSnapshot): IngestSavedMemoryCounts { const workUnitActions = report.body.workUnits.flatMap((workUnit) => workUnit.actions); const reconciliationActions = report.body.reconciliationActions ?? []; - const actions = [...workUnitActions, ...reconciliationActions]; - const directCounts = { + const finalizationActions = report.body.finalization?.actions ?? []; + const actions = [...workUnitActions, ...reconciliationActions, ...finalizationActions]; + return { wikiCount: actions.filter((action) => action.target === 'wiki').length, slCount: actions.filter((action) => action.target === 'sl').length, }; - const postProcessorCounts = postProcessorSavedMemoryCounts(report.body.postProcessor); - return { - wikiCount: directCounts.wikiCount + postProcessorCounts.wikiCount, - slCount: directCounts.slCount + postProcessorCounts.slCount, - }; } export function buildStageIndexFromReportBody(jobId: string, connectionId: string, body: IngestReportBody): StageIndex { diff --git a/packages/context/src/ingest/types.ts b/packages/context/src/ingest/types.ts index 370c7511..e0317141 100644 --- a/packages/context/src/ingest/types.ts +++ b/packages/context/src/ingest/types.ts @@ -1,6 +1,10 @@ import type { KtxEmbeddingPort } from '../core/embedding.js'; +import type { MemoryAction } from '../memory/index.js'; import type { SemanticLayerService } from '../sl/index.js'; +import type { TouchedSlSource } from '../tools/index.js'; import type { MemoryFlowEventSink } from './memory-flow/types.js'; +import type { StageIndex } from './stages/stage-index.types.js'; +import type { WorkUnitOutcome } from './stages/stage-3-work-units.js'; export type IngestTrigger = 'upload' | 'scheduled_pull' | 'manual_resync' | 'manual_override'; @@ -118,6 +122,37 @@ export interface ProjectionResult { result?: unknown; } +export interface FinalizationOverrideReplay { + priorJobId: string; + priorRunId: string; + priorSyncId: string; + evictionRawPaths: string[]; +} + +export interface DeterministicFinalizationContext { + connectionId: string; + sourceKey: string; + syncId: string; + jobId: string; + runId: string; + stagedDir: string; + workdir: string; + parseArtifacts?: unknown; + stageIndex: StageIndex; + workUnitOutcomes: WorkUnitOutcome[]; + reconciliationActions: MemoryAction[]; + overrideReplay?: FinalizationOverrideReplay; +} + +export interface FinalizationResult { + warnings: string[]; + errors: string[]; + touchedSources: TouchedSlSource[]; + changedWikiPageKeys: string[]; + actions?: MemoryAction[]; + result?: unknown; +} + export interface SourceAdapter { readonly source: string; readonly skillNames: string[]; @@ -132,6 +167,7 @@ export interface SourceAdapter { chunk(stagedDir: string, diffSet?: DiffSet): Promise; clusterWorkUnits?(ctx: ClusterWorkUnitsContext): Promise; project?(ctx: DeterministicProjectionContext): Promise; + finalize?(ctx: DeterministicFinalizationContext): Promise; describeScope?(stagedDir: string): Promise; onPullSucceeded?(ctx: { connectionId: string; diff --git a/packages/context/src/package-exports.test.ts b/packages/context/src/package-exports.test.ts index ea6c6592..1bd1313a 100644 --- a/packages/context/src/package-exports.test.ts +++ b/packages/context/src/package-exports.test.ts @@ -1,4 +1,9 @@ import { describe, expect, it } from 'vitest'; +import type { + DeterministicFinalizationContext, + FinalizationOverrideReplay, + FinalizationResult, +} from './ingest/index.js'; import type { ApplyLocalScanRelationshipReviewDecisionsInput, ApplyLocalScanRelationshipReviewDecisionsResult, @@ -9,6 +14,12 @@ const scanTypeExportCoverage: Partial<{ result: ApplyLocalScanRelationshipReviewDecisionsResult; }> = {}; +const ingestFinalizationTypeExportCoverage: Partial<{ + context: DeterministicFinalizationContext; + overrideReplay: FinalizationOverrideReplay; + result: FinalizationResult; +}> = {}; + describe('@ktx/context package exports', () => { it('exports package entry points used by host adapters', async () => { const core = await import('./core/index.js'); @@ -41,6 +52,7 @@ describe('@ktx/context package exports', () => { expect(connections.notionConnectionToPullConfig).toBeTypeOf('function'); expect(scan).toBeDefined(); expect(scanTypeExportCoverage).toEqual({}); + expect(ingestFinalizationTypeExportCoverage).toEqual({}); expect(scan.createKtxConnectorCapabilities).toBeTypeOf('function'); expect(`liveDatabaseSnapshotToKtx${'SchemaSnapshot'}` in scan).toBe(false); expect(scan.normalizeKtxNativeType).toBeTypeOf('function'); @@ -243,7 +255,6 @@ describe('@ktx/context package exports', () => { expect(ingest.historicSqlEvidenceEnvelopeSchema).toBeDefined(); expect(ingest.historicSqlEvidencePath).toBeTypeOf('function'); expect(ingest.createEmitHistoricSqlEvidenceTool).toBeTypeOf('function'); - expect(ingest.HistoricSqlProjectionPostProcessor).toBeTypeOf('function'); expect(ingest.SqliteContextEvidenceStore).toBeTypeOf('function'); expect(ingest.SqliteBundleIngestStore).toBeTypeOf('function'); expect(ingest.CuratorPaginationService).toBeTypeOf('function'); diff --git a/packages/context/src/project/driver-schemas.ts b/packages/context/src/project/driver-schemas.ts index 1815975d..c3b819ea 100644 --- a/packages/context/src/project/driver-schemas.ts +++ b/packages/context/src/project/driver-schemas.ts @@ -27,6 +27,12 @@ function warehouseConnectionSchema(driver: .min(1) .optional() .describe('Warehouse connection URL or DSN; may contain environment-variable references like env:DATABASE_URL.'), + enabled_tables: z + .array(z.string().min(1)) + .optional() + .describe( + 'Optional allowlist of fully-qualified table names ("schema.table") to ingest. When set, live-database ingest discards any table whose schema-qualified name is not in this list. Useful for smoke-testing deep ingest on a single table.', + ), }) .describe( `${driver} warehouse connection. Additional driver-tunable fields (e.g. historicSql, context.queryHistory) are accepted and passed through.`, diff --git a/packages/context/src/scan/enabled-tables.ts b/packages/context/src/scan/enabled-tables.ts new file mode 100644 index 00000000..f522d44f --- /dev/null +++ b/packages/context/src/scan/enabled-tables.ts @@ -0,0 +1,17 @@ +import type { KtxSchemaSnapshot } from './types.js'; + +export function resolveEnabledTables(connection: Record | undefined): Set | null { + const raw = connection?.enabled_tables; + if (!Array.isArray(raw) || raw.length === 0) return null; + return new Set(raw.filter((v): v is string => typeof v === 'string')); +} + +export function filterSnapshotTables(snapshot: KtxSchemaSnapshot, enabledTables: Set): KtxSchemaSnapshot { + return { + ...snapshot, + tables: snapshot.tables.filter((table) => { + const key = table.db ? `${table.db}.${table.name}` : table.name; + return enabledTables.has(key); + }), + }; +} diff --git a/packages/context/src/scan/local-scan.ts b/packages/context/src/scan/local-scan.ts index f9ac77d3..e878f874 100644 --- a/packages/context/src/scan/local-scan.ts +++ b/packages/context/src/scan/local-scan.ts @@ -15,6 +15,7 @@ import type { KtxProjectLlmConfig, KtxScanEnrichmentConfig, KtxScanRelationshipC import type { KtxLocalProject } from '../project/index.js'; import { ktxLocalStateDbPath } from '../project/local-state-db.js'; import { redactKtxScanReport } from './credentials.js'; +import { filterSnapshotTables, resolveEnabledTables } from './enabled-tables.js'; import { completedKtxScanEnrichmentStateSummary } from './enrichment-state.js'; import { failedKtxScanEnrichmentSummary, ktxScanErrorMessage } from './enrichment-summary.js'; import { @@ -320,22 +321,6 @@ async function readScanReport( } } -export function resolveEnabledTables(connection: Record | undefined): Set | null { - const raw = connection?.enabled_tables; - if (!Array.isArray(raw) || raw.length === 0) return null; - return new Set(raw.filter((v): v is string => typeof v === 'string')); -} - -export function filterSnapshotTables(snapshot: KtxSchemaSnapshot, enabledTables: Set): KtxSchemaSnapshot { - return { - ...snapshot, - tables: snapshot.tables.filter((table) => { - const key = table.db ? `${table.db}.${table.name}` : table.name; - return enabledTables.has(key); - }), - }; -} - function createFilteredConnector(connector: KtxScanConnector, enabledTables: Set): KtxScanConnector { return { ...connector, @@ -346,6 +331,8 @@ function createFilteredConnector(connector: KtxScanConnector, enabledTables: Set }; } +export { filterSnapshotTables, resolveEnabledTables } from './enabled-tables.js'; + function withInternalLiveDatabaseAdapter(project: KtxLocalProject): KtxLocalProject { if (project.config.ingest.adapters.includes(LIVE_DATABASE_ADAPTER)) { return project;