Implement adapter-owned ingest finalization v1

Moves finalization from runner-owned post-processors into typed
SourceAdapter.finalize() contracts. Adds finalization report schema,
scope derivation, override replay context, and migrates historic-SQL
projection. Removes IngestBundlePostProcessorPort wiring and
HistoricSqlProjectionPostProcessor.
This commit is contained in:
Andrey Avtomonov 2026-05-18 15:55:07 +02:00
parent e8d461fcb8
commit 013998387e
25 changed files with 3176 additions and 295 deletions

File diff suppressed because it is too large Load diff

View file

@ -1020,9 +1020,16 @@ describe('runKtxIngest', () => {
sourceKey: 'historic-sql',
body: {
workUnits: [],
postProcessor: {
finalization: {
sourceKey: 'historic-sql',
status: 'success',
commitSha: 'finalization-sha',
touchedPaths: ['semantic-layer/warehouse/_schema/public.yaml', 'wiki/global/historic-sql-orders.md'],
declaredTouchedSources: [{ connectionId: 'warehouse', sourceName: 'orders' }],
derivedTouchedSources: [{ connectionId: 'warehouse', sourceName: 'orders' }],
declaredChangedWikiPageKeys: ['historic-sql-orders'],
derivedChangedWikiPageKeys: ['historic-sql-orders'],
mismatches: [],
result: {
tableUsageMerged: 56,
staleTablesMarked: 1,
@ -1032,7 +1039,24 @@ describe('runKtxIngest', () => {
},
errors: [],
warnings: [],
touchedSources: [],
actions: [
...Array.from({ length: 57 }, (_, index) => ({
target: 'sl' as const,
type: 'updated' as const,
key: `orders-${index}`,
detail: 'Merged usage',
targetConnectionId: 'warehouse',
rawPaths: ['tables/public/orders.json'],
})),
...Array.from({ length: 35 }, (_, index) => ({
target: 'wiki' as const,
type: 'updated' as const,
key: `historic-sql-orders-${index}`,
detail: 'Projected pattern',
rawPaths: ['patterns/orders.json'],
})),
],
provenanceExclusions: [],
},
},
}),

View file

@ -533,6 +533,19 @@ export class GitService {
return out;
}
/**
 * List the paths `git status` currently reports as changed, de-duplicated and sorted.
 *
 * Parses `git status --porcelain=v1 -z` output. Each record is `XY <path>` terminated
 * by NUL. Rename and copy records are followed by one extra NUL-terminated field that
 * holds the origin path WITHOUT the three-character `XY ` prefix, so that field must
 * be consumed explicitly instead of being parsed as a status record.
 *
 * NOTE(review): with git's default untracked-file mode an untracked directory is
 * reported as a single `dir/` entry rather than one entry per file — confirm callers
 * are fine with that granularity.
 *
 * @returns Sorted unique paths; for renames both the destination and the origin path.
 */
async changedPaths(): Promise<string[]> {
  const raw = await this.git.raw(['status', '--porcelain=v1', '-z']);
  const fields = raw.split('\0');
  const paths = new Set<string>();
  for (let index = 0; index < fields.length; index += 1) {
    const field = fields[index] ?? '';
    // A valid record is two status letters, a space, then a non-empty path.
    if (field.length <= 3) {
      continue;
    }
    const status = field.slice(0, 2);
    paths.add(field.slice(3));
    const renamed = status.includes('R');
    if (renamed || status.includes('C')) {
      // The next field is the origin path of the rename/copy; skip over it so it is
      // never mistaken for a status record and truncated by the prefix slice.
      index += 1;
      const origin = fields[index] ?? '';
      // A rename removes the origin path, so it changed too; a copy leaves it intact.
      if (renamed && origin.length > 0) {
        paths.add(origin);
      }
    }
  }
  return [...paths].sort();
}
/**
* List all paths under the working tree that match `pathSpec`, scoped to HEAD.
* Used for the reconciler's first-ever run when there's no watermark to diff from.

View file

@ -1,6 +1,15 @@
import type { ChunkResult, DiffSet, FetchContext, ScopeDescriptor, SourceAdapter } from '../../types.js';
import type {
ChunkResult,
DeterministicFinalizationContext,
DiffSet,
FetchContext,
FinalizationResult,
ScopeDescriptor,
SourceAdapter,
} from '../../types.js';
import { chunkHistoricSqlUnifiedStagedDir, describeHistoricSqlUnifiedScope } from './chunk-unified.js';
import { detectHistoricSqlStagedDir } from './detect.js';
import { projectHistoricSqlEvidence } from './projection.js';
import { stageHistoricSqlAggregatedSnapshot } from './stage-unified.js';
import { type HistoricSqlSourceAdapterDeps } from './types.js';
@ -35,4 +44,22 @@ export class HistoricSqlSourceAdapter implements SourceAdapter {
/**
 * Describe the scope of a staged directory.
 *
 * Delegates directly to `describeHistoricSqlUnifiedScope` and returns its promise unchanged.
 *
 * @param stagedDir Staged directory passed through to the helper.
 */
describeScope(stagedDir: string): Promise<ScopeDescriptor> {
return describeHistoricSqlUnifiedScope(stagedDir);
}
/**
 * Run historic-SQL finalization for this adapter.
 *
 * Forwards the workdir, connection, sync/run ids and optional override-replay
 * context to `projectHistoricSqlEvidence`, then reshapes the projection into a
 * `FinalizationResult`. The full projection is exposed as `result`; `errors` is
 * always an empty array here.
 *
 * @param ctx Finalization context supplied by the caller.
 * @returns The projection plus its warnings, touched sources, changed wiki page keys and actions.
 */
async finalize(ctx: DeterministicFinalizationContext): Promise<FinalizationResult> {
  const { workdir, connectionId, syncId, runId, overrideReplay } = ctx;
  const projected = await projectHistoricSqlEvidence({ workdir, connectionId, syncId, runId, overrideReplay });
  const { warnings, touchedSources, changedWikiPageKeys, actions } = projected;
  return {
    result: projected,
    warnings,
    errors: [],
    touchedSources,
    changedWikiPageKeys,
    actions,
  };
}
}

View file

@ -242,12 +242,12 @@ describe('historic-SQL local ingest retrieval acceptance', () => {
expect(result.result.failedWorkUnits).toEqual([]);
expect(result.result.workUnitCount).toBe(3);
expect(agentRunner.runLoop).toHaveBeenCalledTimes(3);
const postProcessor = result.report.body.postProcessor;
expect(postProcessor).toBeDefined();
if (!postProcessor) {
throw new Error('Expected historic-SQL post-processor result');
const finalization = result.report.body.finalization;
expect(finalization).toBeDefined();
if (!finalization) {
throw new Error('Expected historic-SQL finalization result');
}
expect(postProcessor).toMatchObject({
expect(finalization).toMatchObject({
sourceKey: 'historic-sql',
status: 'success',
result: {
@ -255,7 +255,7 @@ describe('historic-SQL local ingest retrieval acceptance', () => {
patternPagesWritten: 1,
},
});
expect(postProcessor.touchedSources).toEqual(
expect(finalization.declaredTouchedSources).toEqual(
expect.arrayContaining([
{ connectionId: 'warehouse', sourceName: 'customers' },
{ connectionId: 'warehouse', sourceName: 'orders' },

View file

@ -1,74 +0,0 @@
import { mkdir, mkdtemp, readFile, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import YAML from 'yaml';
import { describe, expect, it } from 'vitest';
import { HistoricSqlProjectionPostProcessor } from './post-processor.js';
/**
 * Create a uniquely named scratch directory under the OS temp directory.
 *
 * @returns The path of the new directory, prefixed with `historic-sql-post-processor-`.
 */
async function tempWorkdir(): Promise<string> {
  const prefix = join(tmpdir(), 'historic-sql-post-processor-');
  return mkdtemp(prefix);
}
/**
 * Write `value` as pretty-printed JSON (2-space indent, trailing newline) to
 * `relPath` under `root`, creating missing parent directories first.
 *
 * @param root Base directory.
 * @param relPath File path relative to `root`.
 * @param value Value to serialize.
 */
async function writeJson(root: string, relPath: string, value: unknown): Promise<void> {
  const filePath = join(root, relPath);
  const parentDir = join(filePath, '..');
  await mkdir(parentDir, { recursive: true });
  // Serialize only after the directory exists, matching the original ordering.
  const serialized = `${JSON.stringify(value, null, 2)}\n`;
  await writeFile(filePath, serialized, 'utf-8');
}
describe('HistoricSqlProjectionPostProcessor', () => {
it('projects current run evidence before the ingest squash commit', async () => {
const workdir = await tempWorkdir();
await mkdir(join(workdir, 'semantic-layer/warehouse/_schema'), { recursive: true });
await writeFile(
join(workdir, 'semantic-layer/warehouse/_schema/public.yaml'),
YAML.stringify({ tables: { orders: { table: 'public.orders', columns: [{ name: 'id', type: 'string' }] } } }),
'utf-8',
);
await writeJson(workdir, 'raw-sources/warehouse/historic-sql/sync-1/manifest.json', {
source: 'historic-sql',
connectionId: 'warehouse',
dialect: 'postgres',
fetchedAt: '2026-05-11T00:00:00.000Z',
windowStart: '2026-02-10T00:00:00.000Z',
windowEnd: '2026-05-11T00:00:00.000Z',
snapshotRowCount: 1,
touchedTableCount: 1,
parseFailures: 0,
warnings: [],
probeWarnings: [],
staleArchiveAfterDays: 90,
});
await writeJson(workdir, 'raw-sources/warehouse/historic-sql/sync-1/tables/public.orders.json', { table: 'public.orders' });
await writeJson(workdir, '.ktx/ingest-evidence/historic-sql/run-1/orders.json', {
kind: 'table_usage',
connectionId: 'warehouse',
table: 'public.orders',
rawPath: 'tables/public.orders.json',
usage: {
narrative: 'Orders are repeatedly queried by lifecycle status.',
frequencyTier: 'high',
commonFilters: ['status'],
commonJoins: [],
staleSince: null,
},
});
const result = await new HistoricSqlProjectionPostProcessor().run({
connectionId: 'warehouse',
sourceKey: 'historic-sql',
syncId: 'sync-1',
jobId: 'job-1',
runId: 'run-1',
workdir,
parseArtifacts: null,
});
expect(result.errors).toEqual([]);
expect(result.warnings).toEqual([]);
expect(result.touchedSources).toEqual([{ connectionId: 'warehouse', sourceName: 'orders' }]);
expect(result.result).toMatchObject({ tableUsageMerged: 1 });
await expect(readFile(join(workdir, 'semantic-layer/warehouse/_schema/public.yaml'), 'utf-8')).resolves.toContain(
'Orders are repeatedly queried by lifecycle status.',
);
});
});

View file

@ -1,41 +0,0 @@
import type { IngestBundlePostProcessorInput, IngestBundlePostProcessorPort, IngestBundlePostProcessorResult } from '../../ports.js';
import { createSimpleGit } from '../../../core/git-env.js';
import { projectHistoricSqlEvidence } from './projection.js';
/**
 * Commit projection output found in `workdir`, if there is any.
 *
 * Does nothing when `workdir` is not a git repository (a failing repository check
 * is treated as "not a repo"), when no changed path lies under `semantic-layer/`
 * or starts with `wiki/global/historic-sql`, or when staging those paths leaves
 * the index empty.
 *
 * @param workdir Working directory handed to `createSimpleGit`.
 */
async function commitProjectionChanges(workdir: string): Promise<void> {
  const repo = createSimpleGit(workdir);
  const insideRepo = await repo.checkIsRepo().catch(() => false);
  if (!insideRepo) {
    return;
  }

  const { files } = await repo.status();
  const projectedPaths: string[] = [];
  for (const entry of files) {
    const candidate = entry.path;
    if (candidate.startsWith('semantic-layer/') || candidate.startsWith('wiki/global/historic-sql')) {
      projectedPaths.push(candidate);
    }
  }
  if (projectedPaths.length === 0) {
    return;
  }

  await repo.add(projectedPaths);
  // Staging can still produce an empty index diff; skip the commit in that case.
  const stagedNames = await repo.diff(['--cached', '--name-only']);
  if (stagedNames.trim().length === 0) {
    return;
  }
  await repo.commit('Project historic SQL evidence', { '--author': 'System User <system@example.com>' });
}
/**
 * Post-processor that projects historic-SQL evidence and then commits the
 * resulting changes via `commitProjectionChanges`.
 */
export class HistoricSqlProjectionPostProcessor implements IngestBundlePostProcessorPort {
  /**
   * Project evidence for the given run, commit the projection output, and report
   * the outcome. `errors` is always an empty array here.
   *
   * @param input Post-processor input; only `workdir`, `connectionId`, `syncId` and `runId` are read.
   * @returns The projection as `result` together with its warnings and touched sources.
   */
  async run(input: IngestBundlePostProcessorInput): Promise<IngestBundlePostProcessorResult> {
    const { workdir, connectionId, syncId, runId } = input;
    const projection = await projectHistoricSqlEvidence({ workdir, connectionId, syncId, runId });
    await commitProjectionChanges(workdir);
    const { warnings, touchedSources } = projection;
    return {
      result: projection,
      warnings,
      errors: [],
      touchedSources,
    };
  }
}

View file

@ -74,6 +74,15 @@ describe('projectHistoricSqlEvidence', () => {
const result = await projectHistoricSqlEvidence({ workdir, connectionId: 'warehouse', syncId: 'sync-1', runId: 'run-1' });
expect(result.touchedSources).toEqual([{ connectionId: 'warehouse', sourceName: 'orders' }]);
expect(result.actions).toEqual(
expect.arrayContaining([
expect.objectContaining({
target: 'sl',
key: 'orders',
rawPaths: ['tables/public.orders.json'],
}),
]),
);
const shard = YAML.parse(await readFile(join(workdir, 'semantic-layer/warehouse/_schema/public.yaml'), 'utf-8'));
expect(shard.tables.orders.usage).toEqual({
ownerNote: 'keep me',
@ -164,6 +173,16 @@ describe('projectHistoricSqlEvidence', () => {
const result = await projectHistoricSqlEvidence({ workdir, connectionId: 'warehouse', syncId: 'sync-1', runId: 'run-1' });
expect(result.patternPagesWritten).toBe(1);
expect(result.changedWikiPageKeys).toContain('historic-sql-old-order-lifecycle');
expect(result.actions).toEqual(
expect.arrayContaining([
expect.objectContaining({
target: 'wiki',
key: 'historic-sql-old-order-lifecycle',
rawPaths: ['patterns-input.json'],
}),
]),
);
await expect(readFile(join(workdir, 'wiki/global/historic-sql-old-order-lifecycle.md'), 'utf-8')).resolves.toContain(
'Order Lifecycle Analysis',
);
@ -320,6 +339,19 @@ describe('projectHistoricSqlEvidence', () => {
probeWarnings: [],
staleArchiveAfterDays: 90,
});
await writeJson(workdir, '.ktx/ingest-evidence/historic-sql/run-1/customers.json', {
kind: 'table_usage',
connectionId: 'warehouse',
table: 'public.customers',
rawPath: 'tables/public.customers.json',
usage: {
narrative: 'Customers were queried.',
frequencyTier: 'low',
commonFilters: [],
commonJoins: [],
staleSince: null,
},
});
await writeText(
workdir,
'wiki/global/historic-sql-old-template.md',
@ -346,6 +378,9 @@ describe('projectHistoricSqlEvidence', () => {
expect(result.staleTablesMarked).toBe(1);
expect(result.touchedSources).toEqual([{ connectionId: 'warehouse', sourceName: 'orders' }]);
const staleAction = result.actions.find((action) => action.target === 'sl' && action.key === 'orders');
expect(staleAction).toEqual(expect.objectContaining({ target: 'sl', key: 'orders' }));
expect(staleAction?.rawPaths).toBeUndefined();
const shard = YAML.parse(await readFile(join(workdir, 'semantic-layer/warehouse/_schema/public.yaml'), 'utf-8'));
expect(shard.tables.orders.usage).toEqual({
ownerNote: 'keep analyst annotation',
@ -360,4 +395,63 @@ describe('projectHistoricSqlEvidence', () => {
'Old body',
);
});
it('does not mark stale or archive pages when override replay has no current-run evidence', async () => {
const workdir = await tempWorkdir();
await writeText(
workdir,
'semantic-layer/warehouse/_schema/public.yaml',
YAML.stringify({
tables: {
orders: {
table: 'public.orders',
usage: {
narrative: 'Orders were active before.',
frequencyTier: 'high',
commonFilters: ['status'],
commonGroupBys: ['status'],
commonJoins: [],
},
columns: [{ name: 'id', type: 'string' }],
},
},
}),
);
await writeJson(workdir, 'raw-sources/warehouse/historic-sql/override-sync/manifest.json', {
source: 'historic-sql',
connectionId: 'warehouse',
dialect: 'postgres',
fetchedAt: '2026-05-11T00:00:00.000Z',
windowStart: '2026-02-10T00:00:00.000Z',
windowEnd: '2026-05-11T00:00:00.000Z',
snapshotRowCount: 0,
touchedTableCount: 0,
parseFailures: 0,
warnings: [],
probeWarnings: [],
staleArchiveAfterDays: 90,
});
const result = await projectHistoricSqlEvidence({
workdir,
connectionId: 'warehouse',
syncId: 'override-sync',
runId: 'override-run',
overrideReplay: {
priorJobId: 'prior-job',
priorRunId: 'prior-run',
priorSyncId: 'prior-sync',
evictionRawPaths: ['tables/public/orders.json'],
},
});
expect(result.tableUsageMerged).toBe(0);
expect(result.staleTablesMarked).toBe(0);
expect(result.patternPagesWritten).toBe(0);
expect(result.stalePatternPagesMarked).toBe(0);
expect(result.archivedPatternPages).toBe(0);
expect(result.touchedSources).toEqual([]);
expect(result.changedWikiPageKeys).toEqual([]);
expect(result.actions).toEqual([]);
});
});

View file

@ -1,7 +1,9 @@
import { access, mkdir, readdir, readFile, rename, writeFile } from 'node:fs/promises';
import { dirname, join, relative } from 'node:path';
import YAML from 'yaml';
import type { MemoryAction } from '../../../memory/index.js';
import { rawSourcesDirForSync } from '../../raw-sources-paths.js';
import type { FinalizationOverrideReplay } from '../../types.js';
import { mergeUsagePreservingExternal } from '../live-database/manifest.js';
import { historicSqlEvidenceEnvelopeSchema, type HistoricSqlEvidenceEnvelope } from './evidence.js';
import type { TableUsageOutput } from './skill-schemas.js';
@ -12,6 +14,7 @@ export interface HistoricSqlProjectionInput {
connectionId: string;
syncId: string;
runId: string;
overrideReplay?: FinalizationOverrideReplay;
}
export interface HistoricSqlProjectionResult {
@ -21,6 +24,8 @@ export interface HistoricSqlProjectionResult {
stalePatternPagesMarked: number;
archivedPatternPages: number;
touchedSources: Array<{ connectionId: string; sourceName: string }>;
changedWikiPageKeys: string[];
actions: MemoryAction[];
warnings: string[];
}
@ -223,6 +228,8 @@ export async function projectHistoricSqlEvidence(input: HistoricSqlProjectionInp
stalePatternPagesMarked: 0,
archivedPatternPages: 0,
touchedSources: [],
changedWikiPageKeys: [],
actions: [],
warnings: [],
};
const touchedKeys = new Set<string>();
@ -230,6 +237,16 @@ export async function projectHistoricSqlEvidence(input: HistoricSqlProjectionInp
const manifest = stagedManifestSchema.parse(await readJson(join(rawDir, 'manifest.json')));
const currentTables = await currentStagedTables(rawDir);
const evidence = await loadEvidence(input.workdir, input.runId);
if (input.overrideReplay && evidence.length === 0) {
result.warnings.push(
'historic-sql finalization skipped stale/archive cleanup during override replay without current-run evidence',
);
return result;
}
if (evidence.length === 0) {
result.warnings.push('historic-sql finalization skipped because no current-run evidence was emitted');
return result;
}
const tableEvidence = evidence.filter((entry): entry is HistoricSqlEvidenceEnvelope & { kind: 'table_usage' } => entry.kind === 'table_usage');
const patternEvidence = evidence.filter((entry): entry is HistoricSqlEvidenceEnvelope & { kind: 'pattern' } => entry.kind === 'pattern');
@ -255,6 +272,14 @@ export async function projectHistoricSqlEvidence(input: HistoricSqlProjectionInp
touchedKeys.add(key);
result.touchedSources.push({ connectionId: input.connectionId, sourceName });
}
result.actions.push({
target: 'sl',
type: 'updated',
key: sourceName,
targetConnectionId: input.connectionId,
detail: `Merged historic-SQL usage for ${matchingEvidence.table}`,
rawPaths: [matchingEvidence.rawPath],
});
}
} else if (entry.usage && !currentTables.has(tableRef)) {
const merged = mergeUsagePreservingExternal(entry.usage as TableUsageOutput | undefined, staleUsage(manifest.fetchedAt));
@ -267,6 +292,13 @@ export async function projectHistoricSqlEvidence(input: HistoricSqlProjectionInp
touchedKeys.add(key);
result.touchedSources.push({ connectionId: input.connectionId, sourceName });
}
result.actions.push({
target: 'sl',
type: 'updated',
key: sourceName,
targetConnectionId: input.connectionId,
detail: `Marked historic-SQL usage stale for ${tableRef}`,
});
}
}
}
@ -303,6 +335,14 @@ export async function projectHistoricSqlEvidence(input: HistoricSqlProjectionInp
await writeFile(pagePath, renderMarkdownPage(frontmatter, renderPatternMarkdown(pattern)), 'utf-8');
writtenKeys.add(key);
result.patternPagesWritten += 1;
result.changedWikiPageKeys.push(key);
result.actions.push({
target: 'wiki',
type: reusable ? 'updated' : 'created',
key,
detail: `Projected historic-SQL pattern ${pattern.pattern.title}`,
rawPaths: [pattern.rawPath],
});
}
for (const page of patternPages) {
@ -315,6 +355,13 @@ export async function projectHistoricSqlEvidence(input: HistoricSqlProjectionInp
'utf-8',
);
result.archivedPatternPages += 1;
result.changedWikiPageKeys.push(page.key);
result.actions.push({
target: 'wiki',
type: 'updated',
key: page.key,
detail: `Archived stale historic-SQL pattern page ${page.key}`,
});
continue;
}
const tags = [...new Set([...stringArray(page.frontmatter.tags), 'stale'])];
@ -324,7 +371,15 @@ export async function projectHistoricSqlEvidence(input: HistoricSqlProjectionInp
'utf-8',
);
result.stalePatternPagesMarked += 1;
result.changedWikiPageKeys.push(page.key);
result.actions.push({
target: 'wiki',
type: 'updated',
key: page.key,
detail: `Marked historic-SQL pattern page ${page.key} stale`,
});
}
result.changedWikiPageKeys = [...new Set(result.changedWikiPageKeys)].sort();
return result;
}

View file

@ -0,0 +1,131 @@
import { describe, expect, it } from 'vitest';
import {
compareFinalizationDeclarations,
deriveFinalizationTouchedSources,
deriveFinalizationWikiPageKeys,
} from './finalization-scope.js';
describe('deriveFinalizationWikiPageKeys', () => {
it('maps changed global wiki markdown paths to page keys', () => {
expect(
deriveFinalizationWikiPageKeys([
'wiki/global/historic-sql-orders.md',
'wiki/global/nested/page.md',
'README.md',
]),
).toEqual(['historic-sql-orders']);
});
});
describe('deriveFinalizationTouchedSources', () => {
it('maps standalone semantic-layer files directly', async () => {
const result = await deriveFinalizationTouchedSources({
changedPaths: ['semantic-layer/warehouse/orders.yaml'],
beforeSourcesByConnection: new Map(),
afterSourcesByConnection: new Map(),
});
expect(result).toEqual({
touchedSources: [{ connectionId: 'warehouse', sourceName: 'orders' }],
unresolvedPaths: [],
});
});
it('resolves aggregate _schema changes by comparing loaded source snapshots', async () => {
const beforeSourcesByConnection = new Map([
[
'warehouse',
[
{
name: 'orders',
grain: ['order_id'],
columns: [{ name: 'order_id', type: 'string' }],
joins: [],
measures: [],
usage: {
narrative: 'old',
frequencyTier: 'low' as const,
commonFilters: [],
commonJoins: [],
},
},
],
],
]);
const afterSourcesByConnection = new Map([
[
'warehouse',
[
{
name: 'orders',
grain: ['order_id'],
columns: [{ name: 'order_id', type: 'string' }],
joins: [],
measures: [],
usage: {
narrative: 'new',
frequencyTier: 'high' as const,
commonFilters: [],
commonJoins: [],
},
},
],
],
]);
const result = await deriveFinalizationTouchedSources({
changedPaths: ['semantic-layer/warehouse/_schema/public.yaml'],
beforeSourcesByConnection,
afterSourcesByConnection,
});
expect(result).toEqual({
touchedSources: [{ connectionId: 'warehouse', sourceName: 'orders' }],
unresolvedPaths: [],
});
});
it('flags aggregate _schema changes that cannot be resolved to logical sources', async () => {
const beforeSourcesByConnection = new Map([['warehouse', []]]);
const afterSourcesByConnection = new Map([['warehouse', []]]);
const result = await deriveFinalizationTouchedSources({
changedPaths: ['semantic-layer/warehouse/_schema/public.yaml'],
beforeSourcesByConnection,
afterSourcesByConnection,
});
expect(result).toEqual({
touchedSources: [],
unresolvedPaths: ['semantic-layer/warehouse/_schema/public.yaml'],
});
});
});
describe('compareFinalizationDeclarations', () => {
it('reports missing and extra adapter declarations', () => {
expect(
compareFinalizationDeclarations({
declaredTouchedSources: [{ connectionId: 'warehouse', sourceName: 'orders' }],
derivedTouchedSources: [{ connectionId: 'warehouse', sourceName: 'customers' }],
declaredChangedWikiPageKeys: ['orders'],
derivedChangedWikiPageKeys: ['orders', 'patterns'],
}),
).toEqual([
{
artifactKind: 'sl',
key: 'warehouse:customers',
direction: 'missing_from_adapter_declaration',
},
{
artifactKind: 'sl',
key: 'warehouse:orders',
direction: 'extra_in_adapter_declaration',
},
{
artifactKind: 'wiki',
key: 'patterns',
direction: 'missing_from_adapter_declaration',
},
]);
});
});

View file

@ -0,0 +1,145 @@
import type { SemanticLayerSource } from '../sl/index.js';
import type { TouchedSlSource } from '../tools/index.js';
import type { IngestReportFinalizationMismatch } from './reports.js';
/** Input for `deriveFinalizationTouchedSources`. */
interface DeriveTouchedSourcesInput {
/** Changed paths to inspect; only `semantic-layer/` paths ending in `.yaml` or `.yml` are considered. */
changedPaths: string[];
/** "Before" snapshot of sources keyed by connection id; diffed against the "after" snapshot for `_schema` paths. */
beforeSourcesByConnection: Map<string, SemanticLayerSource[]>;
/** "After" snapshot of sources keyed by connection id. */
afterSourcesByConnection: Map<string, SemanticLayerSource[]>;
}
/** Result of `deriveFinalizationTouchedSources`. */
interface DeriveTouchedSourcesResult {
/** Sources resolved from the changed paths, one entry per `connectionId:sourceName` pair. */
touchedSources: TouchedSlSource[];
/** Semantic-layer YAML paths that could not be mapped to any source. */
unresolvedPaths: string[];
}
/** Input for `compareFinalizationDeclarations`: declared values are checked against derived ones. */
interface CompareFinalizationDeclarationsInput {
/** Semantic-layer sources on the "declared" side of the comparison. */
declaredTouchedSources: TouchedSlSource[];
/** Semantic-layer sources on the "derived" side of the comparison. */
derivedTouchedSources: TouchedSlSource[];
/** Wiki page keys on the "declared" side of the comparison. */
declaredChangedWikiPageKeys: string[];
/** Wiki page keys on the "derived" side of the comparison. */
derivedChangedWikiPageKeys: string[];
}
/**
 * Drop empty strings and duplicates, then sort with the default string ordering.
 *
 * @param values Strings to normalize; the input array is not modified.
 * @returns A new sorted array of distinct, non-empty strings.
 */
function uniqueSorted(values: string[]): string[] {
  const distinct = new Set<string>();
  for (const value of values) {
    if (value.length > 0) {
      distinct.add(value);
    }
  }
  return Array.from(distinct).sort();
}
/**
 * Build the `connectionId:sourceName` identity string for a touched source.
 *
 * @param source Touched semantic-layer source.
 * @returns The two fields joined by a colon.
 */
function touchedKey(source: TouchedSlSource): string {
  const { connectionId, sourceName } = source;
  return `${connectionId}:${sourceName}`;
}
/**
 * Serialize a value to JSON text with object keys emitted in sorted order, so two
 * structurally equal values always produce the same string regardless of key
 * insertion order.
 *
 * Values that JSON cannot represent are handled the way `JSON.stringify` handles
 * them: object members whose value is `undefined`, a function or a symbol are
 * omitted, and such values inside arrays (including holes) or at the top level
 * become `null`. This keeps `{ a: undefined }` equal to `{}` when snapshots are
 * compared and guarantees the result is always a string.
 *
 * @param value Any JSON-like value.
 * @returns Canonical JSON text for `value`.
 */
function stableJson(value: unknown): string {
  if (Array.isArray(value)) {
    // Array.from also visits holes, which then serialize as null like every
    // other non-representable array entry.
    return `[${Array.from(value, (entry) => stableJson(entry)).join(',')}]`;
  }
  if (value !== null && typeof value === 'object') {
    const record = value as Record<string, unknown>;
    const members: string[] = [];
    for (const key of Object.keys(record).sort()) {
      const member = record[key];
      if (member === undefined || typeof member === 'function' || typeof member === 'symbol') {
        // Not representable in JSON: omit the member entirely.
        continue;
      }
      members.push(`${JSON.stringify(key)}:${stableJson(member)}`);
    }
    return `{${members.join(',')}}`;
  }
  // JSON.stringify yields undefined for undefined, functions and symbols.
  return JSON.stringify(value) ?? 'null';
}
/**
 * Names of sources whose canonical serialization differs between two snapshots,
 * including sources that exist in only one of them.
 *
 * @param beforeSources Snapshot taken first.
 * @param afterSources Snapshot taken second.
 * @returns Sorted, distinct, non-empty source names that were added, removed or modified.
 */
function changedSourceNames(
  beforeSources: SemanticLayerSource[],
  afterSources: SemanticLayerSource[],
): string[] {
  // When a snapshot repeats a name, the last occurrence wins.
  const fingerprint = (sources: SemanticLayerSource[]): Map<string, string> => {
    const byName = new Map<string, string>();
    for (const source of sources) {
      byName.set(source.name, stableJson(source));
    }
    return byName;
  };
  const previous = fingerprint(beforeSources);
  const current = fingerprint(afterSources);

  const changed: string[] = [];
  // Candidates are already distinct and sorted, so the filtered list is as well.
  for (const name of uniqueSorted([...previous.keys(), ...current.keys()])) {
    if (previous.get(name) !== current.get(name)) {
      changed.push(name);
    }
  }
  return changed;
}
/**
 * Map changed paths to global wiki page keys.
 *
 * Only markdown files directly inside `wiki/global/` count; files in nested
 * folders and paths elsewhere are ignored. The key is the file name without the
 * `.md` extension.
 *
 * @param paths Changed paths.
 * @returns Sorted, distinct page keys.
 */
export function deriveFinalizationWikiPageKeys(paths: string[]): string[] {
  const prefix = 'wiki/global/';
  const suffix = '.md';
  const pageKeys: string[] = [];
  for (const path of paths) {
    if (!path.startsWith(prefix) || !path.endsWith(suffix)) {
      continue;
    }
    const key = path.slice(prefix.length, -suffix.length);
    if (key.includes('/')) {
      continue;
    }
    pageKeys.push(key);
  }
  return uniqueSorted(pageKeys);
}
/**
 * Derive the semantic-layer sources touched by a set of changed paths.
 *
 * Only paths under `semantic-layer/` ending in `.yaml` or `.yml` are considered:
 * - `semantic-layer/<connectionId>/.../<name>.yaml` outside `_schema` maps directly
 *   to the source `<name>` of `<connectionId>`.
 * - `semantic-layer/<connectionId>/_schema/...` is an aggregate file, so the touched
 *   sources are those whose serialized form differs between the before and after
 *   snapshots of that connection. The diff is computed once per connection.
 *
 * A path is reported as unresolved when it has no connection segment, when it sits
 * directly under `semantic-layer/` (there is no connection directory to attribute
 * it to), when its file name is empty after stripping the extension, or when an
 * aggregate change yields no differing source.
 *
 * The function is `async` only to keep its promise-returning interface; it performs
 * no asynchronous work itself.
 *
 * @param input Changed paths plus before/after source snapshots per connection.
 * @returns Touched sources ordered by `connectionId:sourceName`, and the sorted unresolved paths.
 */
export async function deriveFinalizationTouchedSources(
  input: DeriveTouchedSourcesInput,
): Promise<DeriveTouchedSourcesResult> {
  // Keyed by `connectionId:sourceName`, which also de-duplicates repeated hits.
  const touched = new Map<string, TouchedSlSource>();
  const unresolved = new Set<string>();
  const changedNamesByConnection = new Map<string, string[]>();

  for (const path of input.changedPaths) {
    if (!path.startsWith('semantic-layer/') || !(path.endsWith('.yaml') || path.endsWith('.yml'))) {
      continue;
    }
    const parts = path.split('/');
    const connectionId = parts[1] ?? '';
    // With fewer than three segments the second segment is the file itself, not a
    // connection directory, so the change cannot be attributed to a connection.
    if (!connectionId || parts.length < 3) {
      unresolved.add(path);
      continue;
    }

    if (parts[2] !== '_schema') {
      const fileName = parts[parts.length - 1] ?? '';
      const sourceName = fileName.replace(/\.ya?ml$/, '');
      if (!sourceName) {
        unresolved.add(path);
        continue;
      }
      touched.set(`${connectionId}:${sourceName}`, { connectionId, sourceName });
      continue;
    }

    let changedNames = changedNamesByConnection.get(connectionId);
    if (changedNames === undefined) {
      changedNames = changedSourceNames(
        input.beforeSourcesByConnection.get(connectionId) ?? [],
        input.afterSourcesByConnection.get(connectionId) ?? [],
      );
      changedNamesByConnection.set(connectionId, changedNames);
    }
    if (changedNames.length === 0) {
      unresolved.add(path);
      continue;
    }
    for (const sourceName of changedNames) {
      touched.set(`${connectionId}:${sourceName}`, { connectionId, sourceName });
    }
  }

  return {
    touchedSources: [...touched.entries()]
      .sort(([leftKey], [rightKey]) => leftKey.localeCompare(rightKey))
      .map(([, source]) => source),
    unresolvedPaths: [...unresolved].sort(),
  };
}
/**
 * Compare the declared touched sources and wiki page keys with the derived ones.
 *
 * Mismatches are emitted in a fixed order: semantic-layer keys present only on the
 * derived side (`missing_from_adapter_declaration`), semantic-layer keys present
 * only on the declared side (`extra_in_adapter_declaration`), then the same two
 * groups for wiki page keys. Keys are sorted within each group.
 *
 * @param input Declared and derived sources and wiki page keys.
 * @returns The list of mismatches; empty when both sides agree.
 */
export function compareFinalizationDeclarations(
  input: CompareFinalizationDeclarationsInput,
): IngestReportFinalizationMismatch[] {
  const mismatches: IngestReportFinalizationMismatch[] = [];

  // Appends the two mismatch groups for one artifact kind.
  const collect = (artifactKind: 'sl' | 'wiki', declared: string[], derived: string[]): void => {
    const declaredKeys = new Set(declared);
    const derivedKeys = new Set(derived);
    for (const key of [...derivedKeys].sort()) {
      if (!declaredKeys.has(key)) {
        mismatches.push({ artifactKind, key, direction: 'missing_from_adapter_declaration' });
      }
    }
    for (const key of [...declaredKeys].sort()) {
      if (!derivedKeys.has(key)) {
        mismatches.push({ artifactKind, key, direction: 'extra_in_adapter_declaration' });
      }
    }
  };

  collect('sl', input.declaredTouchedSources.map(touchedKey), input.derivedTouchedSources.map(touchedKey));
  collect('wiki', input.declaredChangedWikiPageKeys, input.derivedChangedWikiPageKeys);
  return mismatches;
}

View file

@ -349,7 +349,6 @@ export type {
HistoricSqlTableUsageEvidence,
} from './adapters/historic-sql/evidence.js';
export { createEmitHistoricSqlEvidenceTool } from './adapters/historic-sql/evidence-tool.js';
export { HistoricSqlProjectionPostProcessor } from './adapters/historic-sql/post-processor.js';
export { projectHistoricSqlEvidence } from './adapters/historic-sql/projection.js';
export type { HistoricSqlProjectionInput, HistoricSqlProjectionResult } from './adapters/historic-sql/projection.js';
export {

View file

@ -118,6 +118,35 @@ function makeWikiService(root: string) {
content: content.trim(),
};
}),
writePage: vi.fn(
async (
_scope: string,
_scopeId: string | null,
key: string,
frontmatter: { summary?: string; usage_mode?: string; refs?: string[]; sl_refs?: string[] },
content: string,
) => {
await mkdir(join(root, 'wiki/global'), { recursive: true });
const refs = (frontmatter.refs ?? []).map((ref) => ` - ${ref}`).join('\n');
const slRefs = (frontmatter.sl_refs ?? []).map((ref) => ` - ${ref}`).join('\n');
await writeFile(
join(root, 'wiki/global', `${key}.md`),
[
'---',
`summary: ${frontmatter.summary ?? key}`,
`usage_mode: ${frontmatter.usage_mode ?? 'auto'}`,
'refs:',
refs,
'sl_refs:',
slRefs,
'---',
'',
content,
'',
].join('\n'),
);
},
),
syncFromCommit: vi.fn(),
};
}
@ -2160,4 +2189,187 @@ describe('IngestBundleRunner isolated diff path', () => {
await rm(runtime.homeDir, { recursive: true, force: true });
}
});
it('runs finalization before wiki sl-ref repair and final gates', async () => {
const runtime = await makeRealGitRuntime();
try {
const { deps, adapter } = makeDeps(runtime);
adapter.chunk.mockResolvedValue({
workUnits: [{ unitKey: 'wiki-page', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] }],
});
adapter.finalize = vi.fn(async ({ workdir }) => {
await mkdir(join(workdir, 'semantic-layer/warehouse'), { recursive: true });
await mkdir(join(workdir, 'wiki/global'), { recursive: true });
await writeFile(
join(workdir, 'semantic-layer/warehouse/mart_account_segments.yaml'),
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n',
);
await writeFile(
join(workdir, 'wiki/global/finalized-accounts.md'),
'---\nsummary: Finalized accounts\nusage_mode: auto\nsl_refs:\n - mart_account_segments\n - missing_source\n---\n\nAccounts use `mart_account_segments.total_contract_arr`.\n',
);
return {
warnings: [],
errors: [],
touchedSources: [{ connectionId: 'warehouse', sourceName: 'mart_account_segments' }],
changedWikiPageKeys: ['finalized-accounts'],
actions: [
{
target: 'sl',
type: 'created',
key: 'mart_account_segments',
detail: 'Finalized accounts',
targetConnectionId: 'warehouse',
rawPaths: ['cards/source.json'],
},
{
target: 'wiki',
type: 'created',
key: 'finalized-accounts',
detail: 'Finalized wiki',
rawPaths: ['cards/source.json'],
},
],
};
});
deps.agentRunner.runLoop = vi.fn(async () => ({ stopReason: 'natural' as const })) as never;
const runner = new IngestBundleRunner(deps);
await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]);
await runner.run({
jobId: 'job-finalization',
connectionId: 'warehouse',
sourceKey: 'metabase',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload' },
});
const trace = await readFile(
join(runtime.configDir, '.ktx/ingest-traces/job-finalization/trace.jsonl'),
'utf-8',
);
expect(trace.indexOf('finalization_committed')).toBeLessThan(trace.indexOf('wiki_sl_refs_repaired'));
expect(trace.indexOf('wiki_sl_refs_repaired')).toBeLessThan(trace.indexOf('final_artifact_gates'));
await expect(readFile(join(runtime.configDir, 'wiki/global/finalized-accounts.md'), 'utf-8')).resolves.toContain(
'sl_refs:\n - mart_account_segments',
);
} finally {
await rm(runtime.homeDir, { recursive: true, force: true });
}
});
it('fails when finalization edits a path already changed earlier in the run', async () => {
const runtime = await makeRealGitRuntime();
try {
const { deps, adapter } = makeDeps(runtime);
adapter.chunk.mockResolvedValue({
workUnits: [{ unitKey: 'wiki-page', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] }],
});
let currentSession: any = null;
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
currentSession = toolSession;
return { toRuntimeTools: vi.fn(() => ({})) };
});
deps.agentRunner.runLoop = vi.fn(async () => {
const root = rootOfConfig(currentSession.configService, runtime.configDir);
await mkdir(join(root, 'wiki/global'), { recursive: true });
await writeFile(
join(root, 'wiki/global/orders.md'),
'---\nsummary: Orders\nusage_mode: auto\n---\n\nWU body\n',
);
currentSession.actions.push({
target: 'wiki',
type: 'created',
key: 'orders',
detail: 'WU orders',
rawPaths: ['cards/source.json'],
});
await currentSession.gitService.commitFiles(
['wiki/global/orders.md'],
'wu orders',
'KTX Test',
'system@ktx.local',
);
return { stopReason: 'natural' as const };
}) as never;
adapter.finalize = vi.fn(async ({ workdir }) => {
await writeFile(
join(workdir, 'wiki/global/orders.md'),
'---\nsummary: Orders\nusage_mode: auto\n---\n\nFinalized body\n',
);
return {
warnings: [],
errors: [],
touchedSources: [],
changedWikiPageKeys: ['orders'],
actions: [{ target: 'wiki', type: 'updated', key: 'orders', detail: 'Conflicting finalization' }],
};
});
const runner = new IngestBundleRunner(deps);
await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]);
await expect(
runner.run({
jobId: 'job-finalization-overlap',
connectionId: 'warehouse',
sourceKey: 'metabase',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload' },
}),
).rejects.toThrow(/finalization modified path\(s\) already changed earlier in this run: wiki\/global\/orders\.md/);
} finally {
await rm(runtime.homeDir, { recursive: true, force: true });
}
});
// The job runs against connection `warehouse`; the adapter's finalize step writes (and declares)
// a semantic-layer source under `other-warehouse`. The run is expected to reject with the
// target-policy error, and the trace file is expected to mention the finalization commit,
// the target-policy check, and the ingest failure.
it('rejects finalization writes to unauthorized semantic-layer targets', async () => {
  const runtime = await makeRealGitRuntime();
  try {
    const fixture = makeDeps(runtime);
    // No work units, so finalization is the only step that writes artifacts in this test.
    fixture.adapter.chunk.mockResolvedValue({ workUnits: [] });
    fixture.adapter.finalize = vi.fn(async ({ workdir }) => {
      const connectionDir = join(workdir, 'semantic-layer/other-warehouse');
      const sourceFile = join(workdir, 'semantic-layer/other-warehouse/orders.yaml');
      await mkdir(connectionDir, { recursive: true });
      await writeFile(
        sourceFile,
        'name: orders\ngrain: [order_id]\ncolumns: [{name: order_id, type: string}]\njoins: []\nmeasures: []\n',
      );
      return {
        warnings: [],
        errors: [],
        touchedSources: [{ connectionId: 'other-warehouse', sourceName: 'orders' }],
        changedWikiPageKeys: [],
        actions: [
          {
            target: 'sl',
            type: 'created',
            key: 'orders',
            targetConnectionId: 'other-warehouse',
            detail: 'Forbidden target',
            rawPaths: ['cards/source.json'],
          },
        ],
      };
    });

    const runner = new IngestBundleRunner(fixture.deps);
    await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]);

    const pendingRun = runner.run({
      jobId: 'job-finalization-target-policy',
      connectionId: 'warehouse',
      sourceKey: 'metabase',
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'upload' },
    });
    await expect(pendingRun).rejects.toThrow(/semantic-layer target connection not allowed/);

    const tracePath = join(runtime.configDir, '.ktx/ingest-traces/job-finalization-target-policy/trace.jsonl');
    const trace = await readFile(tracePath, 'utf-8');
    for (const marker of ['finalization_committed', 'semantic_layer_target_policy', 'ingest_failed']) {
      expect(trace).toContain(marker);
    }
  } finally {
    // Always remove the temporary runtime home, even when an assertion above fails.
    await rm(runtime.homeDir, { recursive: true, force: true });
  }
});
});

View file

@ -95,6 +95,7 @@ const makeDeps = () => {
triageSupported: undefined as undefined | boolean,
detect: vi.fn().mockResolvedValue(true),
listTargetConnectionIds: undefined as undefined | ((stagedDir: string) => Promise<string[]>),
finalize: undefined as any,
chunk: vi.fn().mockResolvedValue({
workUnits: [{ unitKey: 'u1', rawFiles: ['a.yml'], peerFileIndex: [], dependencyPaths: [] }],
}),
@ -131,6 +132,7 @@ const makeDeps = () => {
}),
applyPatchFile3WayIndex: vi.fn(),
diffNameStatus: vi.fn().mockResolvedValue([]),
changedPaths: vi.fn().mockResolvedValue([]),
};
const sessionWorktreeService = {
create: vi.fn().mockResolvedValue({
@ -1574,27 +1576,69 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
);
});
it('runs a registered post-processor before squash, records the outcome, and reindexes touched sources after squash', async () => {
it('runs adapter finalization before squash, records the outcome, and reindexes touched sources', async () => {
const deps = makeDeps();
deps.adapter.source = 'metricflow';
deps.registry.get.mockReturnValue(deps.adapter);
deps.adapter.chunk.mockResolvedValue({
workUnits: [{ unitKey: 'u1', rawFiles: ['semantic_models.yml'], peerFileIndex: [], dependencyPaths: [] }],
workUnits: [],
parseArtifacts: { semanticModels: [{ name: 'orders' }] },
});
deps.adapter.listTargetConnectionIds = vi.fn().mockResolvedValue(['warehouse-2']);
deps.adapter.finalize = vi.fn().mockResolvedValue({
result: { sourcesTouched: 1 },
warnings: ['kept going'],
errors: [],
touchedSources: [{ connectionId: 'warehouse-2', sourceName: 'orders' }],
changedWikiPageKeys: [],
actions: [
{
target: 'sl',
type: 'updated',
key: 'orders',
targetConnectionId: 'warehouse-2',
detail: 'Finalized orders usage',
rawPaths: ['semantic_models.yml'],
},
],
});
deps.semanticLayerService.loadAllSources.mockImplementation((connectionId: string) =>
Promise.resolve({ sources: [{ name: `${connectionId}_source` }], loadErrors: [] }),
);
const postProcessor = {
run: vi.fn().mockResolvedValue({
result: { sourcesCreated: 1 },
warnings: ['kept going'],
errors: [],
touchedSources: [{ connectionId: 'warehouse-2', sourceName: 'orders' }],
let head = 'pre-finalization';
const git = {
revParseHead: vi.fn(async () => head),
commitFiles: vi.fn().mockImplementation(async (paths: string[]) => {
if (paths.includes('semantic-layer/warehouse-2/orders.yaml')) {
head = 'post-finalization';
return { created: true, commitHash: 'finalization-sha' };
}
return { created: true, commitHash: head };
}),
commitStaged: vi.fn().mockResolvedValue({ created: false, commitHash: 'post-finalization' }),
resetHardTo: vi.fn(),
assertWorktreeClean: vi.fn().mockResolvedValue(undefined),
writeBinaryNoRenamePatch: vi.fn(async (_base: string, _head: string, patchPath: string) => {
await writeFile(patchPath, '', 'utf-8');
}),
applyPatchFile3WayIndex: vi.fn(),
diffNameStatus: vi.fn().mockImplementation(async (from: string, to: string) =>
from === 'pre-finalization' && to === 'post-finalization'
? [{ status: 'M', path: 'semantic-layer/warehouse-2/orders.yaml' }]
: [],
),
changedPaths: vi.fn().mockResolvedValue(['semantic-layer/warehouse-2/orders.yaml']),
};
const runner = buildRunner(deps, { postProcessors: { metricflow: postProcessor } });
deps.sessionWorktreeService.create.mockResolvedValue({
chatId: 'j1',
workdir: '/tmp/wt',
branch: 'session/j1',
baseSha: 'b',
createdAt: new Date(),
git,
config: {},
});
const runner = buildRunner(deps);
(runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({
currentHashes: new Map([['semantic_models.yml', 'h1']]),
rawDirInWorktree: 'raw-sources/c1/metricflow/s',
@ -1609,26 +1653,29 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
bundleRef: { kind: 'upload', uploadId: 'upload-x' },
});
expect(postProcessor.run).toHaveBeenCalledWith({
connectionId: 'c1',
sourceKey: 'metricflow',
syncId: expect.any(String),
jobId: 'j1',
runId: 'run-1',
workdir: '/tmp/wt',
parseArtifacts: { semanticModels: [{ name: 'orders' }] },
});
expect(deps.adapter.finalize).toHaveBeenCalledWith(
expect.objectContaining({
connectionId: 'c1',
sourceKey: 'metricflow',
syncId: expect.any(String),
jobId: 'j1',
runId: 'run-1',
workdir: '/tmp/wt',
parseArtifacts: { semanticModels: [{ name: 'orders' }] },
}),
);
expect(deps.reportsRepo.create).toHaveBeenCalledWith(
expect.objectContaining({
body: expect.objectContaining({
postProcessor: {
finalization: expect.objectContaining({
sourceKey: 'metricflow',
status: 'success',
result: { sourcesCreated: 1 },
warnings: ['kept going'],
errors: [],
touchedSources: [{ connectionId: 'warehouse-2', sourceName: 'orders' }],
},
commitSha: 'finalization-sha',
touchedPaths: ['semantic-layer/warehouse-2/orders.yaml'],
derivedTouchedSources: [{ connectionId: 'warehouse-2', sourceName: 'orders' }],
declaredTouchedSources: [{ connectionId: 'warehouse-2', sourceName: 'orders' }],
actions: [expect.objectContaining({ key: 'orders' })],
}),
}),
}),
);
@ -1637,7 +1684,7 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
expect(deps.sessionWorktreeService.cleanup).toHaveBeenCalledWith(expect.any(Object), 'success');
});
it('includes historic-sql post-processor output in memory-flow saved counts', async () => {
it('includes finalization actions in memory-flow saved counts', async () => {
const deps = makeDeps();
deps.adapter.source = 'historic-sql';
deps.registry.get.mockReturnValue(deps.adapter);
@ -1651,21 +1698,19 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
},
],
});
const postProcessor = {
run: vi.fn().mockResolvedValue({
result: {
tableUsageMerged: 2,
staleTablesMarked: 1,
patternPagesWritten: 3,
stalePatternPagesMarked: 1,
archivedPatternPages: 1,
},
warnings: [],
errors: [],
touchedSources: [{ connectionId: 'c1', sourceName: 'orders' }],
}),
};
const runner = buildRunner(deps, { postProcessors: { 'historic-sql': postProcessor } });
deps.adapter.finalize = vi.fn().mockResolvedValue({
warnings: [],
errors: [],
touchedSources: [],
changedWikiPageKeys: [],
actions: [
{ target: 'sl', type: 'updated', key: 'orders', detail: 'Merged usage' },
{ target: 'sl', type: 'updated', key: 'customers', detail: 'Merged usage' },
{ target: 'wiki', type: 'created', key: 'historic-sql-orders', detail: 'Projected pattern' },
{ target: 'wiki', type: 'updated', key: 'historic-sql-customers', detail: 'Projected pattern' },
],
});
const runner = buildRunner(deps);
(runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({
currentHashes: new Map([['tables/public/orders.json', 'h1']]),
rawDirInWorktree: 'raw-sources/c1/historic-sql/s',
@ -1691,13 +1736,13 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
expect(memoryFlow.snapshot().events).toContainEqual(
expect.objectContaining({
type: 'saved',
wikiCount: 5,
slCount: 3,
wikiCount: 2,
slCount: 2,
}),
);
});
it('marks post-processor infrastructure failure as failed and preserves worktree cleanup state', async () => {
it('marks finalization infrastructure failure as failed and preserves worktree cleanup state', async () => {
const deps = makeDeps();
deps.adapter.source = 'metricflow';
deps.registry.get.mockReturnValue(deps.adapter);
@ -1705,8 +1750,8 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
workUnits: [{ unitKey: 'u1', rawFiles: ['semantic_models.yml'], peerFileIndex: [], dependencyPaths: [] }],
parseArtifacts: { semanticModels: [{ name: 'orders' }] },
});
const postProcessor = { run: vi.fn().mockRejectedValue(new Error('worktree write failed')) };
const runner = buildRunner(deps, { postProcessors: { metricflow: postProcessor } });
deps.adapter.finalize = vi.fn().mockRejectedValue(new Error('worktree write failed'));
const runner = buildRunner(deps);
(runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({
currentHashes: new Map([['semantic_models.yml', 'h1']]),
rawDirInWorktree: 'raw-sources/c1/metricflow/s',
@ -1728,6 +1773,132 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
expect(deps.sessionWorktreeService.cleanup).toHaveBeenCalledWith(expect.any(Object), 'crash');
});
// Two finalization actions are returned: one with no raw paths at all, and one whose only raw
// path (`missing.json`) is not among the staged hashes (only `current.json` is staged).
// Both are expected to show up as provenance exclusions in the report, and no provenance
// insert is expected to carry `missing.json`.
it('reports finalization actions excluded from provenance when raw paths are not defensible', async () => {
  const deps = makeDeps();
  const actionWithoutRawPaths = {
    target: 'wiki',
    type: 'updated',
    key: 'historic-sql-pattern',
    detail: 'No raw path',
  };
  const actionWithUnknownRawPath = {
    target: 'sl',
    type: 'updated',
    key: 'orders',
    detail: 'Invalid raw path',
    rawPaths: ['missing.json'],
  };
  deps.adapter.finalize = vi.fn().mockResolvedValue({
    warnings: [],
    errors: [],
    touchedSources: [],
    changedWikiPageKeys: [],
    actions: [actionWithoutRawPaths, actionWithUnknownRawPath],
  });

  const runner = buildRunner(deps);
  // Replace the staging internals on the runner instance so no real staging happens.
  Object.assign(runner, {
    stageRawFilesStage1: vi.fn().mockResolvedValue({
      currentHashes: new Map([['current.json', 'h1']]),
      rawDirInWorktree: 'raw-sources/c1/fake/s',
    }),
    resolveStagedDir: vi.fn().mockResolvedValue('/tmp/stage/upload-x'),
  });

  await runner.run({
    jobId: 'j1',
    connectionId: 'c1',
    sourceKey: 'fake',
    trigger: 'upload',
    bundleRef: { kind: 'upload', uploadId: 'upload-x' },
  });

  const expectedExclusions = [
    expect.objectContaining({ reason: 'missing_raw_paths' }),
    expect.objectContaining({ reason: 'raw_path_not_defensible', invalidRawPaths: ['missing.json'] }),
  ];
  const expectedFinalization = expect.objectContaining({ provenanceExclusions: expectedExclusions });
  expect(deps.reportsRepo.create).toHaveBeenCalledWith(
    expect.objectContaining({
      body: expect.objectContaining({ finalization: expectedFinalization }),
    }),
  );

  const rowForMissingPath = expect.objectContaining({ rawPath: 'missing.json' });
  expect(deps.provenanceRepo.insertMany).not.toHaveBeenCalledWith(expect.arrayContaining([rowForMissingPath]));
});
// Override run replaying `prior-job`: finalize is expected to receive an empty
// `workUnitOutcomes` list and an `overrideReplay` object built from the prior report
// (job id, run id, sync id, and the prior report's `evictionInputs`). The prior report's
// `evictionsApplied` entry (`do-not-replay.json`) is expected NOT to appear in the replay paths.
it('passes explicit override replay metadata and no current work unit outcomes', async () => {
  const deps = makeDeps();

  // Finalization itself is a no-op here; only the arguments it is called with matter.
  deps.adapter.finalize = vi.fn().mockResolvedValue({
    warnings: [],
    errors: [],
    touchedSources: [],
    changedWikiPageKeys: [],
    actions: [],
  });
  deps.gitService.listFilesAtHead.mockResolvedValue(['raw-sources/c1/fake/prior-sync/prior.json']);
  deps.gitService.getFileAtCommit.mockResolvedValue('{"id":1}\n');
  deps.reportsRepo.findByJobId.mockResolvedValue({
    id: 'prior-report',
    runId: 'prior-run',
    jobId: 'prior-job',
    connectionId: 'c1',
    sourceKey: 'fake',
    createdAt: '2026-05-18T00:00:00.000Z',
    body: {
      status: 'completed',
      syncId: 'prior-sync',
      diffSummary: { added: 0, modified: 0, deleted: 0, unchanged: 0 },
      commitSha: 'prior-sha',
      workUnits: [
        {
          unitKey: 'prior-unit',
          rawFiles: ['prior.json'],
          status: 'success',
          actions: [{ target: 'wiki', type: 'created', key: 'prior', detail: 'prior' }],
          touchedSlSources: [],
        },
      ],
      failedWorkUnits: [],
      reconciliationSkipped: false,
      conflictsResolved: [],
      evictionsApplied: [
        {
          rawPath: 'do-not-replay.json',
          artifactKind: 'wiki',
          artifactKey: 'old',
          action: 'removed',
          reason: 'prior',
        },
      ],
      unmappedFallbacks: [],
      artifactResolutions: [],
      evictionInputs: ['evicted-from-prior-report.json'],
      unresolvedCards: [],
      supersededBy: null,
      overrideOf: null,
      provenanceRows: [],
      toolTranscripts: [],
    },
  });

  const runner = buildRunner(deps);
  // Replace the staging internals on the runner instance so no real staging happens.
  Object.assign(runner, {
    stageRawFilesStage1: vi.fn().mockResolvedValue({
      currentHashes: new Map([['prior.json', 'h1']]),
      rawDirInWorktree: 'raw-sources/c1/fake/prior-sync',
    }),
    resolveStagedDir: vi.fn().mockResolvedValue('/tmp/stage/prior'),
  });

  await runner.run({
    jobId: 'override-job',
    connectionId: 'c1',
    sourceKey: 'fake',
    trigger: 'manual_override',
    bundleRef: { kind: 'override', priorJobId: 'prior-job' },
  });

  const expectedOverrideReplay = {
    priorJobId: 'prior-job',
    priorRunId: 'prior-run',
    priorSyncId: 'prior-sync',
    evictionRawPaths: ['evicted-from-prior-report.json'],
  };
  expect(deps.adapter.finalize).toHaveBeenCalledWith(
    expect.objectContaining({ workUnitOutcomes: [], overrideReplay: expectedOverrideReplay }),
  );
});
it('includes existing global wiki pages in WorkUnit prompts', async () => {
const deps = makeDeps();
deps.knowledgeIndex.listPagesForUser.mockResolvedValue([

View file

@ -14,6 +14,11 @@ import { NOTION_DEFAULT_MAX_KNOWLEDGE_CREATES_PER_RUN } from './adapters/notion/
import { validateFinalIngestArtifacts, validateProvenanceRawPaths } from './artifact-gates.js';
import { selectRelevantCanonicalPins } from './canonical-pins.js';
import { finalGateRepairPaths, repairFinalGateFailure } from './final-gate-repair.js';
import {
compareFinalizationDeclarations,
deriveFinalizationTouchedSources,
deriveFinalizationWikiPageKeys,
} from './finalization-scope.js';
import { FileIngestTraceWriter, ingestTracePathForJob, type IngestTraceWriter, traceTimed } from './ingest-trace.js';
import { integrateWorkUnitPatch } from './isolated-diff/patch-integrator.js';
import { resolveTextualConflict } from './isolated-diff/textual-conflict-resolver.js';
@ -33,8 +38,8 @@ import type {
import { buildSyncId, rawSourcesDirForSync } from './raw-sources-paths.js';
import {
buildStageIndexFromReportBody,
postProcessorSavedMemoryCounts,
type IngestReportPostProcessorOutcome,
type IngestReportFinalizationProvenanceExclusion,
type IngestReportFinalizationOutcome,
type IngestReportProvenanceDetail,
type IngestReportSnapshot,
type IngestReportWorkUnit,
@ -174,6 +179,11 @@ type ProvenanceRowOrigin =
actionIndex: number;
action: MemoryAction;
}
| {
source: 'finalization_action';
actionIndex: number;
action: MemoryAction;
}
| {
source: 'artifact_resolution';
resolutionIndex: number;
@ -411,6 +421,19 @@ export class IngestBundleRunner {
return false;
}
/**
 * Loads the semantic-layer sources of each requested connection as they exist in `workdir`.
 *
 * Connections are loaded one at a time, in the order given. Only the `sources` field of each
 * load result is kept; anything else the service returns alongside it is ignored here.
 *
 * @param workdir Worktree directory the semantic-layer service is bound to via `forWorktree`.
 * @param connectionIds Connection ids to load; a repeated id is loaded again and overwrites its entry.
 * @returns Map from connection id to the sources loaded for that connection.
 */
private async loadSourcesByConnection(
  workdir: string,
  connectionIds: string[],
): Promise<Map<string, SemanticLayerSource[]>> {
  const worktreeSemanticLayer = this.deps.semanticLayerService.forWorktree(workdir);
  const sourcesByConnection = new Map<string, SemanticLayerSource[]>();
  for (const id of connectionIds) {
    const loaded = await worktreeSemanticLayer.loadAllSources(id);
    sourcesByConnection.set(id, loaded.sources);
  }
  return sourcesByConnection;
}
private resolveContextCuratorBudget(
bundleRef: IngestBundleJob['bundleRef'],
stageIndex: StageIndex,
@ -466,6 +489,7 @@ export class IngestBundleRunner {
currentHashes: Map<string, string>;
stageIndex: StageIndex;
reconcileActions: MemoryAction[];
finalizationActions: MemoryAction[];
}): ProvenancePlan {
const rows: IngestProvenanceInsert[] = [];
const diagnostics: ProvenanceRowDiagnostic[] = [];
@ -523,6 +547,15 @@ export class IngestBundleRunner {
});
}
});
input.finalizationActions.forEach((action, actionIndex) => {
for (const rawPath of action.rawPaths ?? []) {
pushActionProvenance(rawPath, action, {
source: 'finalization_action',
actionIndex,
action,
});
}
});
(input.stageIndex.artifactResolutions ?? []).forEach((resolution, resolutionIndex) => {
const hash = input.currentHashes.get(resolution.rawPath) ?? '';
pushRow(
@ -569,6 +602,35 @@ export class IngestBundleRunner {
return { rows, diagnostics };
}
/**
 * Splits finalization actions into those that may produce provenance rows and those that may not.
 *
 * A raw path counts as defensible when it appears in any of the three input sets. An action is
 * kept only if it lists at least one raw path and every listed path is defensible. Otherwise it
 * is reported as an exclusion:
 *  - `missing_raw_paths` when the action has no raw paths (absent or empty list);
 *  - `raw_path_not_defensible` when at least one path is outside the sets, with the offending
 *    paths attached in sorted order.
 *
 * @param input.actions Finalization actions, processed in order.
 * @param input.currentRawPaths Raw paths of the current run.
 * @param input.currentEvictionRawPaths Raw paths of evictions in the current run.
 * @param input.overrideEvictionRawPaths Raw paths of evictions from an override replay.
 * @returns The kept actions and the exclusions, each in input order.
 */
private partitionFinalizationActionsForProvenance(input: {
  actions: MemoryAction[];
  currentRawPaths: Set<string>;
  currentEvictionRawPaths: Set<string>;
  overrideEvictionRawPaths: Set<string>;
}): { actions: MemoryAction[]; exclusions: IngestReportFinalizationProvenanceExclusion[] } {
  // Union of every raw path an action is allowed to cite.
  const allowedRawPaths = new Set<string>();
  for (const group of [input.currentRawPaths, input.currentEvictionRawPaths, input.overrideEvictionRawPaths]) {
    for (const rawPath of group) {
      allowedRawPaths.add(rawPath);
    }
  }

  const kept: MemoryAction[] = [];
  const excluded: IngestReportFinalizationProvenanceExclusion[] = [];
  for (const action of input.actions) {
    const citedRawPaths = action.rawPaths ?? [];
    if (citedRawPaths.length === 0) {
      excluded.push({ action, reason: 'missing_raw_paths' });
    } else {
      // `filter` returns a fresh array, so sorting it leaves the action's own list untouched.
      const invalidRawPaths = citedRawPaths.filter((rawPath) => !allowedRawPaths.has(rawPath)).sort();
      if (invalidRawPaths.length > 0) {
        excluded.push({ action, reason: 'raw_path_not_defensible', invalidRawPaths });
      } else {
        kept.push(action);
      }
    }
  }
  return { actions: kept, exclusions: excluded };
}
private toReportProvenanceRows(rows: IngestProvenanceInsert[]): IngestReportProvenanceDetail[] {
return rows.map(({ rawPath, artifactKind, artifactKey, actionType, targetConnectionId }) => ({
rawPath,
@ -951,6 +1013,7 @@ export class IngestBundleRunner {
let latestEvictionInputs: string[] = [];
let latestUnresolvedCards: UnresolvedCardInfo[] = [];
let latestReportProvenanceRows: IngestReportProvenanceDetail[] = [];
let latestFinalizationOutcome: IngestReportFinalizationOutcome | undefined;
let activeFailureDetails: Record<string, unknown> | undefined;
let latestIsolatedDiffSummary:
| {
@ -1174,7 +1237,7 @@ export class IngestBundleRunner {
let unresolvedCards: UnresolvedCardInfo[] | undefined;
let sourceContextReport: { capped?: boolean; warnings?: string[] } | undefined;
let parseArtifacts: unknown;
let postProcessorOutcome: IngestReportPostProcessorOutcome | undefined;
let finalizationOutcome: IngestReportFinalizationOutcome | undefined;
let wikiSlRefRepairResult: WikiSlRefRepairResult | null = null;
let reconcileNotes: string[] = [];
let triageResult: PageTriageRunResult | null = null;
@ -1954,62 +2017,215 @@ export class IngestBundleRunner {
await stage4?.updateProgress(1.0, reconcileOutcome.skipped ? 'No reconciliation needed' : 'Reconciled');
const postProcessor = this.deps.postProcessors?.[job.sourceKey];
activePhase = 'post_processor';
if (postProcessor) {
const stagePostProcessor = ctx?.startPhase(0.04);
emitStageProgress('post_processor', 87, 'Running deterministic imports');
await stagePostProcessor?.updateProgress(0.0, 'Running deterministic imports');
try {
const result = await traceTimed(
runTrace,
'post_processor',
'post_processor',
{ sourceKey: job.sourceKey },
() =>
postProcessor.run({
connectionId: job.connectionId,
sourceKey: job.sourceKey,
syncId,
jobId: job.jobId,
runId: createdRunRow.id,
workdir: sessionWorktree.workdir,
parseArtifacts,
}),
);
postProcessorOutcome = {
const preFinalizationSha = await sessionWorktree.git.revParseHead();
const preFinalizationSourcesByConnection = await this.loadSourcesByConnection(
sessionWorktree.workdir,
slConnectionIds,
);
let finalizationActions: MemoryAction[] = [];
let finalizationTouchedPaths: string[] = [];
let finalizationTouchedSources: TouchedSlSource[] = [];
let finalizationChangedWikiPageKeys: string[] = [];
let finalizationSha: string | null = null;
activePhase = 'finalization';
if (adapter.finalize) {
const stageFinalization = ctx?.startPhase(0.04);
emitStageProgress('finalization', 87, 'Running deterministic finalization');
await stageFinalization?.updateProgress(0.0, 'Running deterministic finalization');
await runTrace.event('debug', 'finalization', 'finalization_started', { sourceKey: job.sourceKey });
const result = await adapter.finalize({
connectionId: job.connectionId,
sourceKey: job.sourceKey,
syncId,
jobId: job.jobId,
runId: createdRunRow.id,
stagedDir,
workdir: sessionWorktree.workdir,
...(overrideReport ? {} : { parseArtifacts }),
stageIndex,
workUnitOutcomes,
reconciliationActions: reconcileActions,
...(overrideReport
? {
overrideReplay: {
priorJobId: overrideReport.jobId,
priorRunId: overrideReport.runId,
priorSyncId: overrideReport.body.syncId,
evictionRawPaths: overrideReport.body.evictionInputs,
},
}
: {}),
});
if (result.errors.length > 0) {
finalizationOutcome = {
sourceKey: job.sourceKey,
status: result.errors.length > 0 && result.touchedSources.length === 0 ? 'failed' : 'success',
status: 'failed',
commitSha: null,
touchedPaths: [],
declaredTouchedSources: result.touchedSources,
derivedTouchedSources: [],
declaredChangedWikiPageKeys: result.changedWikiPageKeys,
derivedChangedWikiPageKeys: [],
mismatches: [],
result: result.result,
errors: result.errors,
warnings: result.warnings,
touchedSources: result.touchedSources,
actions: result.actions ?? [],
provenanceExclusions: [],
};
emitStageProgress('post_processor', 88, 'Deterministic imports complete');
await stagePostProcessor?.updateProgress(1.0, 'Deterministic imports complete');
} catch (error) {
postProcessorOutcome = {
latestFinalizationOutcome = finalizationOutcome;
await runTrace.event('error', 'finalization', 'finalization_failed', {
sourceKey: job.sourceKey,
errors: result.errors,
warnings: result.warnings,
});
throw new Error(`deterministic finalization failed: ${result.errors.join('; ')}`);
}
const changedBeforeFinalization = new Set([
...projectionTouchedPaths,
...workUnitOutcomes.flatMap((outcome) => outcome.patchTouchedPaths ?? []),
...(preReconciliationSha && preFinalizationSha !== preReconciliationSha
? (await sessionWorktree.git.diffNameStatus(preReconciliationSha, preFinalizationSha)).map(
(entry) => entry.path,
)
: []),
]);
finalizationTouchedPaths = await sessionWorktree.git.changedPaths();
const overlapping = finalizationTouchedPaths.filter((path) => changedBeforeFinalization.has(path));
if (overlapping.length > 0) {
await runTrace.event('error', 'finalization', 'finalization_failed', {
sourceKey: job.sourceKey,
reason: 'path_overlap',
overlappingPaths: overlapping.sort(),
});
throw new Error(
`finalization modified path(s) already changed earlier in this run: ${overlapping.sort().join(', ')}`,
);
}
const finalizationCommit =
finalizationTouchedPaths.length > 0
? await sessionWorktree.git.commitFiles(
finalizationTouchedPaths,
`ingest(${job.sourceKey}): deterministic finalization syncId=${syncId}`,
this.deps.storage.systemGitAuthor.name,
this.deps.storage.systemGitAuthor.email,
)
: await sessionWorktree.git.commitStaged(
`ingest(${job.sourceKey}): deterministic finalization syncId=${syncId}`,
this.deps.storage.systemGitAuthor.name,
this.deps.storage.systemGitAuthor.email,
);
finalizationSha = finalizationCommit.created ? finalizationCommit.commitHash : null;
const postFinalizationSha = await sessionWorktree.git.revParseHead();
finalizationTouchedPaths =
preFinalizationSha !== postFinalizationSha
? (await sessionWorktree.git.diffNameStatus(preFinalizationSha, postFinalizationSha)).map(
(entry) => entry.path,
)
: [];
const changedConnectionIds = [
...new Set([
...slConnectionIds,
...finalizationTouchedPaths
.filter((path) => path.startsWith('semantic-layer/'))
.map((path) => path.split('/')[1])
.filter((connectionId): connectionId is string => Boolean(connectionId)),
]),
].sort();
const postFinalizationSourcesByConnection = await this.loadSourcesByConnection(
sessionWorktree.workdir,
changedConnectionIds,
);
const scope = await deriveFinalizationTouchedSources({
changedPaths: finalizationTouchedPaths,
beforeSourcesByConnection: preFinalizationSourcesByConnection,
afterSourcesByConnection: postFinalizationSourcesByConnection,
});
if (scope.unresolvedPaths.length > 0) {
await runTrace.event('error', 'finalization', 'finalization_failed', {
sourceKey: job.sourceKey,
reason: 'unresolved_semantic_layer_paths',
unresolvedPaths: scope.unresolvedPaths,
});
throw new Error(`could not resolve finalization semantic-layer path(s): ${scope.unresolvedPaths.join(', ')}`);
}
finalizationTouchedSources = scope.touchedSources;
finalizationChangedWikiPageKeys = deriveFinalizationWikiPageKeys(finalizationTouchedPaths);
const mismatches = compareFinalizationDeclarations({
declaredTouchedSources: result.touchedSources,
derivedTouchedSources: finalizationTouchedSources,
declaredChangedWikiPageKeys: result.changedWikiPageKeys,
derivedChangedWikiPageKeys: finalizationChangedWikiPageKeys,
});
if (mismatches.length > 0) {
finalizationOutcome = {
sourceKey: job.sourceKey,
status: 'failed',
errors: [error instanceof Error ? error.message : String(error)],
warnings: [],
touchedSources: [],
commitSha: finalizationSha,
touchedPaths: finalizationTouchedPaths,
declaredTouchedSources: result.touchedSources,
derivedTouchedSources: finalizationTouchedSources,
declaredChangedWikiPageKeys: result.changedWikiPageKeys,
derivedChangedWikiPageKeys: finalizationChangedWikiPageKeys,
mismatches,
result: result.result,
errors: ['finalization touched artifact declaration mismatch'],
warnings: result.warnings,
actions: result.actions ?? [],
provenanceExclusions: [],
};
await this.deps.runs.markFailed(runRow.id);
throw error;
latestFinalizationOutcome = finalizationOutcome;
await runTrace.event('error', 'finalization', 'finalization_failed', {
sourceKey: job.sourceKey,
reason: 'declaration_mismatch',
mismatches,
});
throw new Error(
`finalization touched artifact declaration mismatch: ${mismatches
.map((mismatch) => `${mismatch.direction}:${mismatch.artifactKind}:${mismatch.key}`)
.join(', ')}`,
);
}
finalizationActions = result.actions ?? [];
finalizationOutcome = {
sourceKey: job.sourceKey,
status: 'success',
commitSha: finalizationSha,
touchedPaths: finalizationTouchedPaths,
declaredTouchedSources: result.touchedSources,
derivedTouchedSources: finalizationTouchedSources,
declaredChangedWikiPageKeys: result.changedWikiPageKeys,
derivedChangedWikiPageKeys: finalizationChangedWikiPageKeys,
mismatches,
result: result.result,
errors: [],
warnings: result.warnings,
actions: finalizationActions,
provenanceExclusions: [],
};
latestFinalizationOutcome = finalizationOutcome;
emitStageProgress('finalization', 88, 'Deterministic finalization complete');
await stageFinalization?.updateProgress(1.0, 'Deterministic finalization complete');
await runTrace.event('debug', 'finalization', 'finalization_committed', {
sourceKey: job.sourceKey,
commitSha: finalizationSha,
touchedPaths: finalizationTouchedPaths,
touchedSources: finalizationTouchedSources,
changedWikiPageKeys: finalizationChangedWikiPageKeys,
warnings: result.warnings,
});
} else {
await runTrace.event('debug', 'finalization', 'finalization_skipped', { sourceKey: job.sourceKey });
}
await runTrace.event('debug', 'post_processor', 'post_processor_finished', {
sourceKey: job.sourceKey,
status: postProcessorOutcome?.status ?? 'skipped',
touchedSources: postProcessorOutcome?.touchedSources ?? [],
warnings: postProcessorOutcome?.warnings ?? [],
});
const repairConnectionIds = [
...new Set([
...slConnectionIds,
...(postProcessorOutcome?.touchedSources ?? []).map((source) => source.connectionId),
...finalizationTouchedSources.map((source) => source.connectionId),
]),
].sort();
activePhase = 'wiki_sl_ref_repair';
@ -2044,6 +2260,7 @@ export class IngestBundleRunner {
.flatMap((outcome) => outcome.patchTouchedPaths ?? [])
.flatMap((path) => this.wikiPageKeysFromPaths([path])),
...this.wikiPageKeysFromActions(reconcileActions),
...finalizationChangedWikiPageKeys,
...postReconciliationPaths.flatMap((path) => this.wikiPageKeysFromPaths([path])),
...wikiSlRefRepairResult.repairs.filter((repair) => repair.scope === 'GLOBAL').map((repair) => repair.pageKey),
]);
@ -2052,7 +2269,7 @@ export class IngestBundleRunner {
...workUnitOutcomes.flatMap((outcome) => outcome.touchedSlSources),
...this.touchedSlSourcesFromActions(reconcileActions, job.connectionId),
...this.touchedSlSourcesFromPaths(postReconciliationPaths),
...(postProcessorOutcome?.touchedSources ?? []),
...finalizationTouchedSources,
]);
const finalWikiGateScope = await this.wikiPageKeysForFinalGates({
wikiService: this.deps.wikiService.forWorktree(sessionWorktree.workdir),
@ -2066,9 +2283,7 @@ export class IngestBundleRunner {
...projectionTouchedPaths,
...workUnitOutcomes.flatMap((outcome) => outcome.patchTouchedPaths ?? []),
...postReconciliationPaths,
...(postProcessorOutcome?.touchedSources ?? []).map(
(source) => `semantic-layer/${source.connectionId}/${source.sourceName}.yaml`,
),
...finalizationTouchedPaths,
];
const targetPolicyTraceData = {
allowedTargetConnectionIds: slConnectionIds,
@ -2235,12 +2450,23 @@ export class IngestBundleRunner {
latestArtifactResolutions = stageIndex.artifactResolutions ?? [];
latestEvictionInputs = eviction?.deletedRawPaths ?? [];
latestUnresolvedCards = unresolvedCards ?? [];
const finalizationProvenance = this.partitionFinalizationActionsForProvenance({
actions: finalizationActions,
currentRawPaths: new Set(currentHashes.keys()),
currentEvictionRawPaths: new Set(stageIndex.evictionsApplied.map((entry) => entry.rawPath)),
overrideEvictionRawPaths: new Set(overrideReport?.body.evictionInputs ?? []),
});
if (finalizationOutcome) {
finalizationOutcome.provenanceExclusions = finalizationProvenance.exclusions;
latestFinalizationOutcome = finalizationOutcome;
}
const provenancePlan = this.buildProvenancePlan({
job,
syncId,
currentHashes,
stageIndex,
reconcileActions,
finalizationActions: finalizationProvenance.actions,
});
const provenanceRows = provenancePlan.rows;
const currentRawPaths = new Set(currentHashes.keys());
@ -2300,13 +2526,15 @@ export class IngestBundleRunner {
commitSha,
touchedPaths: mergeResult.touchedPaths,
});
const memoryFlowSavedActions = stageIndex.workUnits.flatMap((wu) => wu.actions).concat(reconcileActions);
const postProcessorMemoryCounts = postProcessorSavedMemoryCounts(postProcessorOutcome);
const memoryFlowSavedActions = stageIndex.workUnits
.flatMap((wu) => wu.actions)
.concat(reconcileActions)
.concat(finalizationActions);
memoryFlow?.emit({
type: 'saved',
commitSha,
wikiCount: countMemoryFlowActions(memoryFlowSavedActions, 'wiki') + postProcessorMemoryCounts.wikiCount,
slCount: countMemoryFlowActions(memoryFlowSavedActions, 'sl') + postProcessorMemoryCounts.slCount,
wikiCount: countMemoryFlowActions(memoryFlowSavedActions, 'wiki'),
slCount: countMemoryFlowActions(memoryFlowSavedActions, 'sl'),
});
await stage6?.updateProgress(1.0, commitSha ? `Saved changes (${commitSha.slice(0, 8)})` : 'No changes to save');
@ -2325,7 +2553,7 @@ export class IngestBundleRunner {
memoryFlowSavedActions
.filter((action) => action.target === 'sl')
.map((action) => actionTargetConnectionId(action, job.connectionId))
.concat((postProcessorOutcome?.touchedSources ?? []).map((source) => source.connectionId)),
.concat(finalizationTouchedSources.map((source) => source.connectionId)),
),
].sort();
for (const connectionId of touchedConnections) {
@ -2416,7 +2644,7 @@ export class IngestBundleRunner {
overrideOf: overrideReport?.jobId ?? null,
provenanceRows: reportProvenanceRows,
toolTranscripts: reportToolTranscripts,
postProcessor: postProcessorOutcome,
finalization: finalizationOutcome,
wikiSlRefRepairs: wikiSlRefRepairResult.repairs,
wikiSlRefRepairWarnings: wikiSlRefRepairResult.warnings,
...(reportMemoryFlow ? { memoryFlow: reportMemoryFlow } : {}),
@ -2585,6 +2813,7 @@ export class IngestBundleRunner {
artifactResolutions: latestArtifactResolutions,
evictionInputs: latestEvictionInputs,
reconciliationActions: latestReconciliationActions,
finalization: latestFinalizationOutcome,
evictionDecisions: [],
unresolvedCards: latestUnresolvedCards,
supersededBy: null,

View file

@ -548,11 +548,12 @@ describe('canonical local ingest', () => {
});
expect(result.result.failedWorkUnits).toEqual([]);
expect(result.report.body.postProcessor).toMatchObject({
expect(result.report.body.finalization).toMatchObject({
sourceKey: 'historic-sql',
status: 'success',
result: { tableUsageMerged: 1 },
touchedSources: [{ connectionId: 'warehouse', sourceName: 'orders' }],
declaredTouchedSources: [{ connectionId: 'warehouse', sourceName: 'orders' }],
derivedTouchedSources: [{ connectionId: 'warehouse', sourceName: 'orders' }],
});
await expect(readFile(join(projectDir, 'semantic-layer/warehouse/_schema/public.yaml'), 'utf-8')).resolves.toContain(
'Orders are repeatedly queried by lifecycle status.',

View file

@ -72,7 +72,6 @@ import {
CuratorPaginationService,
} from './context-candidates/index.js';
import { createEmitHistoricSqlEvidenceTool } from './adapters/historic-sql/evidence-tool.js';
import { HistoricSqlProjectionPostProcessor } from './adapters/historic-sql/post-processor.js';
import { ContextEvidenceIndexService, SqliteContextEvidenceStore } from './context-evidence/index.js';
import { DiffSetService } from './diff-set.service.js';
import { ingestTracePathForJob, type IngestTraceLevel } from './ingest-trace.js';
@ -771,9 +770,6 @@ export function createLocalBundleIngestRuntime(
settings: { batchSize: 8, maxPasses: 8, stepBudgetPerPass: 60 },
logger,
}),
postProcessors: {
'historic-sql': new HistoricSqlProjectionPostProcessor(),
},
logger,
};

View file

@ -321,13 +321,6 @@ async function recordLocalMetabaseChildFailure(options: {
overrideOf: null,
provenanceRows: [],
toolTranscripts: [],
postProcessor: {
sourceKey: 'metabase',
status: 'failed',
errors: [reason],
warnings: [],
touchedSources: [],
},
};
const report = await store.create({

View file

@ -59,7 +59,7 @@ export const memoryFlowEventSchema = z.discriminatedUnion('type', [
'source',
'integration',
'reconciliation',
'post_processor',
'finalization',
'wiki_sl_ref_repair',
'final_gates',
'save',

View file

@ -50,7 +50,7 @@ type MemoryFlowEventPayload =
| 'source'
| 'integration'
| 'reconciliation'
| 'post_processor'
| 'finalization'
| 'wiki_sl_ref_repair'
| 'final_gates'
| 'save'

View file

@ -13,7 +13,7 @@ import type {
SlValidationDeps,
SlValidatorPort,
} from '../sl/index.js';
import type { ToolContext, ToolSession, TouchedSlSource } from '../tools/index.js';
import type { ToolContext, ToolSession } from '../tools/index.js';
import type { KnowledgeIndexPort, KnowledgeWikiService } from '../wiki/index.js';
import type { CanonicalPin } from './canonical-pins.js';
import type { IngestTraceLevel } from './ingest-trace.js';
@ -323,27 +323,6 @@ export interface CuratorPaginationPort {
}): Promise<ReconciliationOutcome & { report: CuratorPaginationReport; warnings: string[] }>;
}
export interface IngestBundlePostProcessorInput {
connectionId: string;
sourceKey: string;
syncId: string;
jobId: string;
runId: string;
workdir: string;
parseArtifacts: unknown;
}
export interface IngestBundlePostProcessorResult {
result?: unknown;
warnings: string[];
errors: string[];
touchedSources: TouchedSlSource[];
}
export interface IngestBundlePostProcessorPort {
run(input: IngestBundlePostProcessorInput): Promise<IngestBundlePostProcessorResult>;
}
export interface IngestBundleRunnerDeps {
runs: IngestRunsPort;
provenance: IngestProvenancePort;
@ -377,7 +356,6 @@ export interface IngestBundleRunnerDeps {
candidateDedup?: CandidateDedupPort;
contextCandidateCarryforward?: ContextCandidateCarryforwardPort;
curatorPagination?: CuratorPaginationPort;
postProcessors?: Record<string, IngestBundlePostProcessorPort>;
logger?: KtxLogger;
}

View file

@ -129,6 +129,35 @@ const ingestReportFailureSchema = z.object({
details: z.record(z.string(), z.unknown()).optional(),
});
/**
 * Zod schema for a single entry of `finalizationOutcomeSchema.mismatches`.
 *
 * NOTE(review): presumably one entry is recorded per artifact whose
 * adapter-declared scope disagrees with the derived scope (compare the
 * `declared*` / `derived*` arrays on the outcome) — confirm against the runner.
 */
const finalizationMismatchSchema = z.object({
// Which kind of artifact the mismatch refers to.
artifactKind: z.enum(['sl', 'wiki']),
// Non-empty key of the mismatching artifact.
key: z.string().min(1),
// Which side of the comparison the key is absent from / surplus on.
direction: z.enum(['missing_from_adapter_declaration', 'extra_in_adapter_declaration']),
});
/**
 * Zod schema for a single entry of `finalizationOutcomeSchema.provenanceExclusions`:
 * an action paired with the reason it was excluded.
 *
 * NOTE(review): presumably these are finalization actions left out of
 * provenance rows — confirm against the code that populates this list.
 */
const finalizationProvenanceExclusionSchema = z.object({
// The excluded action, validated with the shared ingest action schema.
action: ingestActionSchema,
// Why the action was excluded.
reason: z.enum(['missing_raw_paths', 'raw_path_not_defensible']),
// Optional list of raw paths; may be omitted entirely.
invalidRawPaths: z.array(z.string()).optional(),
});
/**
 * Zod schema for the optional `finalization` field of an ingest report body.
 *
 * `mismatches`, `actions` and `provenanceExclusions` default to an empty array
 * when absent from the parsed input; every other array field is required.
 */
const finalizationOutcomeSchema = z.object({
// Non-empty source key (e.g. 'historic-sql' in the tests).
sourceKey: z.string().min(1),
status: z.enum(['success', 'failed', 'skipped']),
// Required but nullable: must be present, may be null.
commitSha: z.string().nullable(),
touchedPaths: z.array(z.string()),
// Declared vs. derived scope are stored side by side for both SL sources and wiki page keys.
declaredTouchedSources: z.array(touchedSlSourceSchema),
derivedTouchedSources: z.array(touchedSlSourceSchema),
declaredChangedWikiPageKeys: z.array(z.string()),
derivedChangedWikiPageKeys: z.array(z.string()),
mismatches: z.array(finalizationMismatchSchema).default([]),
// Opaque payload; not validated beyond being optional.
result: z.unknown().optional(),
errors: z.array(z.string()),
warnings: z.array(z.string()),
actions: z.array(ingestActionSchema).default([]),
provenanceExclusions: z.array(finalizationProvenanceExclusionSchema).default([]),
});
export const ingestReportSnapshotSchema = z
.object({
id: z.string().min(1),
@ -188,6 +217,7 @@ export const ingestReportSnapshotSchema = z
overrideOf: z.string().nullable().default(null),
provenanceRows: z.array(provenanceDetailSchema).default([]),
toolTranscripts: z.array(toolTranscriptSummarySchema).default([]),
finalization: finalizationOutcomeSchema.optional(),
memoryFlow: memoryFlowReplayInputSchema.optional(),
})
.passthrough(),

View file

@ -39,13 +39,33 @@ export interface IngestReportToolTranscriptSummary {
toolNames: string[];
}
export interface IngestReportPostProcessorOutcome {
/**
 * One entry of `IngestReportFinalizationOutcome.mismatches`.
 *
 * NOTE(review): presumably describes a disagreement between what the adapter
 * declared and what was derived for an artifact — confirm against the runner.
 */
export interface IngestReportFinalizationMismatch {
/** Kind of artifact the mismatch refers to. */
artifactKind: 'sl' | 'wiki';
/** Key of the mismatching artifact. */
key: string;
/** Whether the key is missing from, or extra in, the adapter declaration. */
direction: 'missing_from_adapter_declaration' | 'extra_in_adapter_declaration';
}
/**
 * One entry of `IngestReportFinalizationOutcome.provenanceExclusions`:
 * a memory action together with the reason it was excluded.
 */
export interface IngestReportFinalizationProvenanceExclusion {
/** The excluded action. */
action: MemoryAction;
/** Why the action was excluded. */
reason: 'missing_raw_paths' | 'raw_path_not_defensible';
/** Optional list of raw paths; NOTE(review): presumably only set for 'raw_path_not_defensible' — confirm. */
invalidRawPaths?: string[];
}
export interface IngestReportFinalizationOutcome {
sourceKey: string;
status: 'success' | 'failed';
status: 'success' | 'failed' | 'skipped';
commitSha: string | null;
touchedPaths: string[];
declaredTouchedSources: TouchedSlSource[];
derivedTouchedSources: TouchedSlSource[];
declaredChangedWikiPageKeys: string[];
derivedChangedWikiPageKeys: string[];
mismatches: IngestReportFinalizationMismatch[];
result?: unknown;
errors: string[];
warnings: string[];
touchedSources: TouchedSlSource[];
actions: MemoryAction[];
provenanceExclusions: IngestReportFinalizationProvenanceExclusion[];
}
export interface IngestReportFailure {
@ -94,7 +114,7 @@ export interface IngestReportBody {
overrideOf: string | null;
provenanceRows: IngestReportProvenanceDetail[];
toolTranscripts: IngestReportToolTranscriptSummary[];
postProcessor?: IngestReportPostProcessorOutcome;
finalization?: IngestReportFinalizationOutcome;
wikiSlRefRepairs?: WikiSlRefRepair[];
wikiSlRefRepairWarnings?: string[];
memoryFlow?: MemoryFlowReplayInput;
@ -115,44 +135,25 @@ export interface IngestSavedMemoryCounts {
slCount: number;
}
function numericResultField(result: Record<string, unknown>, field: string): number {
const value = result[field];
return typeof value === 'number' && Number.isFinite(value) && value > 0 ? value : 0;
}
export function postProcessorSavedMemoryCounts(
postProcessor: IngestReportPostProcessorOutcome | undefined,
export function finalizationSavedMemoryCounts(
finalization: IngestReportFinalizationOutcome | undefined,
): IngestSavedMemoryCounts {
if (!postProcessor || postProcessor.sourceKey !== 'historic-sql') {
return { wikiCount: 0, slCount: 0 };
}
const result = postProcessor.result;
if (!result || typeof result !== 'object' || Array.isArray(result)) {
return { wikiCount: 0, slCount: 0 };
}
const record = result as Record<string, unknown>;
const actions = finalization?.actions ?? [];
return {
wikiCount:
numericResultField(record, 'patternPagesWritten') +
numericResultField(record, 'stalePatternPagesMarked') +
numericResultField(record, 'archivedPatternPages'),
slCount: numericResultField(record, 'tableUsageMerged') + numericResultField(record, 'staleTablesMarked'),
wikiCount: actions.filter((action) => action.target === 'wiki').length,
slCount: actions.filter((action) => action.target === 'sl').length,
};
}
export function savedMemoryCountsForReport(report: IngestReportSnapshot): IngestSavedMemoryCounts {
const workUnitActions = report.body.workUnits.flatMap((workUnit) => workUnit.actions);
const reconciliationActions = report.body.reconciliationActions ?? [];
const actions = [...workUnitActions, ...reconciliationActions];
const directCounts = {
const finalizationActions = report.body.finalization?.actions ?? [];
const actions = [...workUnitActions, ...reconciliationActions, ...finalizationActions];
return {
wikiCount: actions.filter((action) => action.target === 'wiki').length,
slCount: actions.filter((action) => action.target === 'sl').length,
};
const postProcessorCounts = postProcessorSavedMemoryCounts(report.body.postProcessor);
return {
wikiCount: directCounts.wikiCount + postProcessorCounts.wikiCount,
slCount: directCounts.slCount + postProcessorCounts.slCount,
};
}
export function buildStageIndexFromReportBody(jobId: string, connectionId: string, body: IngestReportBody): StageIndex {

View file

@ -1,6 +1,10 @@
import type { KtxEmbeddingPort } from '../core/embedding.js';
import type { MemoryAction } from '../memory/index.js';
import type { SemanticLayerService } from '../sl/index.js';
import type { TouchedSlSource } from '../tools/index.js';
import type { MemoryFlowEventSink } from './memory-flow/types.js';
import type { StageIndex } from './stages/stage-index.types.js';
import type { WorkUnitOutcome } from './stages/stage-3-work-units.js';
export type IngestTrigger = 'upload' | 'scheduled_pull' | 'manual_resync' | 'manual_override';
@ -118,6 +122,37 @@ export interface ProjectionResult {
result?: unknown;
}
/**
 * Optional override-replay information attached to
 * `DeterministicFinalizationContext.overrideReplay`.
 *
 * NOTE(review): the `prior*` fields presumably identify the earlier ingest
 * being overridden — confirm against the runner that builds this object.
 */
export interface FinalizationOverrideReplay {
priorJobId: string;
priorRunId: string;
priorSyncId: string;
/** Raw paths associated with eviction; exact semantics are not visible here — TODO confirm. */
evictionRawPaths: string[];
}
/**
 * Argument passed to the optional `SourceAdapter.finalize()` hook.
 *
 * Only `parseArtifacts` and `overrideReplay` are optional; every other field
 * must be supplied by the caller.
 */
export interface DeterministicFinalizationContext {
connectionId: string;
sourceKey: string;
syncId: string;
jobId: string;
runId: string;
stagedDir: string;
workdir: string;
/** Untyped on purpose (`unknown`): the adapter must narrow it before use. */
parseArtifacts?: unknown;
stageIndex: StageIndex;
workUnitOutcomes: WorkUnitOutcome[];
reconciliationActions: MemoryAction[];
/** Present only when override-replay information is available — presumably for manual overrides; confirm. */
overrideReplay?: FinalizationOverrideReplay;
}
/**
 * Value resolved by the optional `SourceAdapter.finalize()` hook.
 *
 * NOTE(review): `touchedSources` and `changedWikiPageKeys` presumably surface
 * in the ingest report as the `declared*` fields of the finalization outcome —
 * confirm against the runner.
 */
export interface FinalizationResult {
warnings: string[];
errors: string[];
touchedSources: TouchedSlSource[];
changedWikiPageKeys: string[];
/** Optional; adapters may omit it. */
actions?: MemoryAction[];
/** Opaque adapter-specific payload; consumers must narrow before use. */
result?: unknown;
}
export interface SourceAdapter {
readonly source: string;
readonly skillNames: string[];
@ -132,6 +167,7 @@ export interface SourceAdapter {
chunk(stagedDir: string, diffSet?: DiffSet): Promise<ChunkResult>;
clusterWorkUnits?(ctx: ClusterWorkUnitsContext): Promise<WorkUnit[]>;
project?(ctx: DeterministicProjectionContext): Promise<ProjectionResult>;
finalize?(ctx: DeterministicFinalizationContext): Promise<FinalizationResult>;
describeScope?(stagedDir: string): Promise<ScopeDescriptor>;
onPullSucceeded?(ctx: {
connectionId: string;

View file

@ -243,7 +243,6 @@ describe('@ktx/context package exports', () => {
expect(ingest.historicSqlEvidenceEnvelopeSchema).toBeDefined();
expect(ingest.historicSqlEvidencePath).toBeTypeOf('function');
expect(ingest.createEmitHistoricSqlEvidenceTool).toBeTypeOf('function');
expect(ingest.HistoricSqlProjectionPostProcessor).toBeTypeOf('function');
expect(ingest.SqliteContextEvidenceStore).toBeTypeOf('function');
expect(ingest.SqliteBundleIngestStore).toBeTypeOf('function');
expect(ingest.CuratorPaginationService).toBeTypeOf('function');