fix(ingest): attribute historic-sql evidence writes in bundle report (#220)

The emit_historic_sql_evidence tool took rawPath as LLM-supplied input,
so projection actions frequently lacked defensible raw paths. As a result,
every row in bundle_ingest_reports fell through as actionType: 'skipped'
with null artifact metadata, hiding the wiki pages and semantic-layer (SL)
merges the run had actually produced (KLO-698).

The tool now reads the work unit's rawFiles from session.allowedRawPaths
and stores them on the evidence envelope; the projection emits actions
with those paths, and stale/archive actions are anchored to manifest.json
so they also surface as non-skipped provenance rows.
This commit is contained in:
Andrey Avtomonov 2026-05-26 12:21:53 +02:00 committed by GitHub
parent 2a6fb19ba4
commit 1071f9d1c9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 99 additions and 31 deletions

View file

@ -60,7 +60,7 @@ export async function chunkHistoricSqlUnifiedStagedDir(stagedDir: string, diffSe
dependencyPaths: ['manifest.json'],
peerFileIndex: files.filter((file) => file !== path && file !== 'manifest.json').sort(),
notes:
`Use historic_sql_patterns. Read ${path} and emit pattern objects with emit_historic_sql_evidence using rawPath "${path}". Do not call wiki_write or sl_write_source.`,
`Use historic_sql_patterns. Read ${path} and emit pattern objects with emit_historic_sql_evidence. Do not call wiki_write or sl_write_source.`,
});
}

View file

@ -10,7 +10,6 @@ const emitHistoricSqlEvidenceInputSchema = z
.object({
kind: z.enum(['table_usage', 'pattern']),
table: z.string().min(1).optional(),
rawPath: z.string().min(1),
usage: tableUsageOutputSchema.optional(),
pattern: patternOutputSchema.optional(),
})
@ -46,6 +45,7 @@ interface EmitHistoricSqlEvidenceToolContext {
connectionId?: string | null;
session?: {
ingest?: { runId: string; sourceKey: string };
allowedRawPaths?: ReadonlySet<string>;
configService?: {
writeFile(
path: string,
@ -66,7 +66,7 @@ function unitKeyForEvidence(input: EmitHistoricSqlEvidenceInput): string {
return `historic-sql-pattern-${String(input.pattern?.slug).replace(/[^a-zA-Z0-9]+/g, '-').replace(/^-+|-+$/g, '')}`;
}
function evidenceEnvelope(input: EmitHistoricSqlEvidenceInput, connectionId: string) {
function evidenceEnvelope(input: EmitHistoricSqlEvidenceInput, connectionId: string, rawPaths: string[]) {
if (input.kind === 'table_usage') {
if (!input.table || !input.usage) {
throw new Error('Invalid historic-SQL table usage evidence input.');
@ -75,7 +75,7 @@ function evidenceEnvelope(input: EmitHistoricSqlEvidenceInput, connectionId: str
kind: 'table_usage' as const,
connectionId,
table: input.table,
rawPath: input.rawPath,
rawPaths,
usage: input.usage,
};
}
@ -85,7 +85,7 @@ function evidenceEnvelope(input: EmitHistoricSqlEvidenceInput, connectionId: str
return {
kind: 'pattern' as const,
connectionId,
rawPath: input.rawPath,
rawPaths,
pattern: input.pattern,
};
}
@ -102,9 +102,13 @@ export function createEmitHistoricSqlEvidenceTool(defaultContext?: EmitHistoricS
if (!ingest || ingest.sourceKey !== 'historic-sql' || !configService || !context?.connectionId) {
return 'Error: emit_historic_sql_evidence is only available during historic-sql ingest.';
}
const rawPaths = context.session?.allowedRawPaths ? [...context.session.allowedRawPaths].sort() : [];
if (rawPaths.length === 0) {
return 'Error: emit_historic_sql_evidence requires a WorkUnit context with at least one raw file.';
}
const unitKey = unitKeyForEvidence(input);
const evidence = evidenceEnvelope(input, context.connectionId);
const evidence = evidenceEnvelope(input, context.connectionId, rawPaths);
const content = serializeHistoricSqlEvidence(evidence);
await configService.writeFile(
historicSqlEvidencePath(ingest.runId, unitKey),

View file

@ -14,7 +14,7 @@ export const historicSqlTableUsageEvidenceSchema = z.object({
kind: z.literal('table_usage'),
connectionId: z.string().min(1),
table: z.string().min(1),
rawPath: z.string().min(1),
rawPaths: z.array(z.string().min(1)).min(1),
usage: tableUsageOutputSchema,
});
@ -22,7 +22,7 @@ export const historicSqlTableUsageEvidenceSchema = z.object({
export const historicSqlPatternEvidenceSchema = z.object({
kind: z.literal('pattern'),
connectionId: z.string().min(1),
rawPath: z.string().min(1),
rawPaths: z.array(z.string().min(1)).min(1),
pattern: patternOutputSchema,
});

View file

@ -278,7 +278,7 @@ export async function projectHistoricSqlEvidence(input: HistoricSqlProjectionInp
key: sourceName,
targetConnectionId: input.connectionId,
detail: `Merged historic-SQL usage for ${matchingEvidence.table}`,
rawPaths: [matchingEvidence.rawPath],
rawPaths: matchingEvidence.rawPaths,
});
}
} else if (entry.usage && !currentTables.has(tableRef)) {
@ -298,6 +298,7 @@ export async function projectHistoricSqlEvidence(input: HistoricSqlProjectionInp
key: sourceName,
targetConnectionId: input.connectionId,
detail: `Marked historic-SQL usage stale for ${tableRef}`,
rawPaths: ['manifest.json'],
});
}
}
@ -341,7 +342,7 @@ export async function projectHistoricSqlEvidence(input: HistoricSqlProjectionInp
type: reusable ? 'updated' : 'created',
key,
detail: `Projected historic-SQL pattern ${pattern.pattern.title}`,
rawPaths: [pattern.rawPath],
rawPaths: pattern.rawPaths,
});
}
@ -361,6 +362,7 @@ export async function projectHistoricSqlEvidence(input: HistoricSqlProjectionInp
type: 'updated',
key: page.key,
detail: `Archived stale historic-SQL pattern page ${page.key}`,
rawPaths: ['manifest.json'],
});
continue;
}
@ -377,6 +379,7 @@ export async function projectHistoricSqlEvidence(input: HistoricSqlProjectionInp
type: 'updated',
key: page.key,
detail: `Marked historic-SQL pattern page ${page.key} stale`,
rawPaths: ['manifest.json'],
});
}

View file

@ -15,8 +15,7 @@ Use this skill when the WorkUnit raw file is a `patterns-input/part-0001.json` s
3. Call `read_raw_file` for that exact raw file path.
4. Identify recurring analytical intents that span at least two tables and have repeated usage signal.
5. Emit one `pattern` evidence object per durable cross-table intent by calling `emit_historic_sql_evidence`.
6. Set each evidence object's `rawPath` to the exact raw file path read in step 3.
7. Stop after all pattern evidence has been emitted.
6. Stop after all pattern evidence has been emitted.
Every join column mentioned in pattern descriptions must be verified via
entity_details for both sides of the join.
@ -56,7 +55,6 @@ Each call to `emit_historic_sql_evidence` must use this shape:
```json
{
"kind": "pattern",
"rawPath": "patterns-input/part-0001.json",
"pattern": {
"slug": "order-lifecycle-analysis",
"title": "Order Lifecycle Analysis",

View file

@ -53,7 +53,6 @@ Call `emit_historic_sql_evidence` with this shape:
{
"kind": "table_usage",
"table": "public.orders",
"rawPath": "tables/public.orders.json",
"usage": {
"narrative": "Orders are repeatedly queried for paid/refunded lifecycle analysis and customer-level rollups.",
"frequencyTier": "high",

View file

@ -11,15 +11,14 @@ describe('emit_historic_sql_evidence tool', () => {
});
});
it('writes table usage evidence to the ignored run evidence directory', async () => {
const writeFile = vi.fn(async () => ({ success: true, commitHash: null }));
it('writes table usage evidence using the work unit allowed raw paths', async () => {
const writeFile = vi.fn(async (_path: string, _body: string) => ({ success: true, commitHash: null }));
const tool = createEmitHistoricSqlEvidenceTool();
const result = await tool.execute!(
{
kind: 'table_usage',
table: 'public.orders',
rawPath: 'tables/public.orders.json',
usage: {
narrative: 'Orders are repeatedly queried by paid status.',
frequencyTier: 'high',
@ -36,6 +35,7 @@ describe('emit_historic_sql_evidence tool', () => {
connectionId: 'warehouse',
session: {
ingest: { runId: 'run-1', jobId: 'job-1', syncId: 'sync-1', sourceKey: 'historic-sql' },
allowedRawPaths: new Set(['tables/public.orders.json']),
configService: { writeFile },
},
},
@ -45,12 +45,53 @@ describe('emit_historic_sql_evidence tool', () => {
expect(result).toBe('Recorded historic-SQL table_usage evidence for public.orders.');
expect(writeFile).toHaveBeenCalledWith(
'.ktx/ingest-evidence/historic-sql/run-1/historic-sql-table-public-orders.json',
expect.stringContaining('"kind": "table_usage"'),
expect.stringContaining('"rawPaths"'),
'System User',
'system@example.com',
'Record historic-SQL evidence: historic-sql-table-public-orders',
{ skipLock: true },
);
expect(writeFile).toHaveBeenCalledWith(
expect.any(String),
expect.stringContaining('tables/public.orders.json'),
expect.any(String),
expect.any(String),
expect.any(String),
expect.any(Object),
);
});
// Verifies the tool refuses to record evidence when the session carries no
// raw file paths to attribute that evidence to.
it('rejects calls without a WorkUnit raw file context', async () => {
const tool = createEmitHistoricSqlEvidenceTool();
// The tool is expected to resolve with an error string rather than reject,
// hence `.resolves` below.
await expect(
tool.execute!(
{
kind: 'pattern',
pattern: {
slug: 'orders',
title: 'Orders',
narrative: 'Orders pattern.',
definitionSql: 'select * from public.orders',
tablesInvolved: ['public.orders'],
slRefs: ['orders'],
constituentTemplateIds: ['pg:1'],
},
},
{
toolCallId: 'call-1',
messages: [],
abortSignal: new AbortController().signal,
experimental_context: {
connectionId: 'warehouse',
// The session is a valid historic-sql ingest, but `allowedRawPaths` is
// deliberately omitted; that omission is the condition under test.
session: {
ingest: { runId: 'run-1', jobId: 'job-1', syncId: 'sync-1', sourceKey: 'historic-sql' },
configService: { writeFile: vi.fn() },
},
},
} as never,
),
).resolves.toContain('emit_historic_sql_evidence requires a WorkUnit context');
});
it('rejects non-historic ingest sessions', async () => {
@ -60,7 +101,6 @@ describe('emit_historic_sql_evidence tool', () => {
tool.execute!(
{
kind: 'pattern',
rawPath: 'patterns-input.json',
pattern: {
slug: 'orders',
title: 'Orders',
@ -79,6 +119,7 @@ describe('emit_historic_sql_evidence tool', () => {
connectionId: 'warehouse',
session: {
ingest: { runId: 'run-1', jobId: 'job-1', syncId: 'sync-1', sourceKey: 'notion' },
allowedRawPaths: new Set(['patterns-input/part-0001.json']),
configService: { writeFile: vi.fn() },
},
},

View file

@ -12,7 +12,7 @@ describe('historic-sql evidence contracts', () => {
kind: 'table_usage',
connectionId: 'warehouse',
table: 'public.orders',
rawPath: 'tables/public.orders.json',
rawPaths: ['tables/public.orders.json'],
usage: {
narrative: 'Orders are repeatedly queried for paid/refunded lifecycle analysis.',
frequencyTier: 'high',
@ -32,7 +32,7 @@ describe('historic-sql evidence contracts', () => {
historicSqlEvidenceEnvelopeSchema.parse({
kind: 'pattern',
connectionId: 'warehouse',
rawPath: 'patterns-input.json',
rawPaths: ['patterns-input/part-0001.json'],
pattern: {
slug: 'order-lifecycle-analysis',
title: 'Order Lifecycle Analysis',

View file

@ -57,7 +57,6 @@ class HistoricSqlAcceptanceAgentRunner implements AgentRunnerPort {
const result = await emitEvidence.execute({
kind: 'table_usage',
table: 'public.orders',
rawPath: 'tables/public.orders.json',
usage: {
narrative: 'Analysts repeatedly inspect paid order lifecycle by customer segment.',
frequencyTier: 'high',
@ -76,7 +75,6 @@ class HistoricSqlAcceptanceAgentRunner implements AgentRunnerPort {
const result = await emitEvidence.execute({
kind: 'table_usage',
table: 'public.customers',
rawPath: 'tables/public.customers.json',
usage: {
narrative: 'Customers provide segment context for paid order lifecycle analysis.',
frequencyTier: 'mid',
@ -94,7 +92,6 @@ class HistoricSqlAcceptanceAgentRunner implements AgentRunnerPort {
if (params.telemetryTags.unitKey === 'historic-sql-patterns-part-0001') {
const result = await emitEvidence.execute({
kind: 'pattern',
rawPath: 'patterns-input/part-0001.json',
pattern: {
slug: 'paid-order-lifecycle',
title: 'Paid Order Lifecycle',
@ -257,6 +254,33 @@ describe('historic-SQL local ingest retrieval acceptance', () => {
]),
);
// Regression for KLO-698: the bundle report's provenance rows must
// attribute the table-usage merges and pattern-page writes back to
// their raw files instead of falling through as `actionType: 'skipped'`
// with null artifact metadata.
const provenanceRows = result.report.body.provenanceRows;
const nonSkipped = provenanceRows.filter((row) => row.actionType !== 'skipped');
expect(nonSkipped).toEqual(
expect.arrayContaining([
expect.objectContaining({
rawPath: 'tables/public.orders.json',
artifactKind: 'sl',
artifactKey: 'orders',
}),
expect.objectContaining({
rawPath: 'tables/public.customers.json',
artifactKind: 'sl',
artifactKey: 'customers',
}),
expect.objectContaining({
rawPath: 'patterns-input/part-0001.json',
artifactKind: 'wiki',
artifactKey: 'historic-sql-paid-order-lifecycle',
actionType: 'wiki_written',
}),
]),
);
await expect(readFile(join(project.projectDir, 'semantic-layer/warehouse/_schema/public.yaml'), 'utf-8')).resolves
.toContain('Analysts repeatedly inspect paid order lifecycle by customer segment.');
await expect(readFile(join(project.projectDir, 'wiki/global/historic-sql-paid-order-lifecycle.md'), 'utf-8'))

View file

@ -60,7 +60,7 @@ describe('projectHistoricSqlEvidence', () => {
kind: 'table_usage',
connectionId: 'warehouse',
table: 'public.orders',
rawPath: 'tables/public.orders.json',
rawPaths: ['tables/public.orders.json'],
usage: {
narrative: 'Orders are repeatedly queried for lifecycle analysis.',
frequencyTier: 'high',
@ -158,7 +158,7 @@ describe('projectHistoricSqlEvidence', () => {
await writeJson(workdir, '.ktx/ingest-evidence/historic-sql/run-1/pattern.json', {
kind: 'pattern',
connectionId: 'warehouse',
rawPath: 'patterns-input.json',
rawPaths: ['patterns-input/part-0001.json'],
pattern: {
slug: 'order-lifecycle-analysis',
title: 'Order Lifecycle Analysis',
@ -179,7 +179,7 @@ describe('projectHistoricSqlEvidence', () => {
expect.objectContaining({
target: 'wiki',
key: 'historic-sql-old-order-lifecycle',
rawPaths: ['patterns-input.json'],
rawPaths: ['patterns-input/part-0001.json'],
}),
]),
);
@ -234,7 +234,7 @@ describe('projectHistoricSqlEvidence', () => {
await writeJson(workdir, '.ktx/ingest-evidence/historic-sql/run-1/pattern.json', {
kind: 'pattern',
connectionId: 'warehouse',
rawPath: 'patterns-input.json',
rawPaths: ['patterns-input/part-0001.json'],
pattern: {
slug: 'order-lifecycle-analysis',
title: 'Order Lifecycle Analysis',
@ -343,7 +343,7 @@ describe('projectHistoricSqlEvidence', () => {
kind: 'table_usage',
connectionId: 'warehouse',
table: 'public.customers',
rawPath: 'tables/public.customers.json',
rawPaths: ['tables/public.customers.json'],
usage: {
narrative: 'Customers were queried.',
frequencyTier: 'low',
@ -380,7 +380,7 @@ describe('projectHistoricSqlEvidence', () => {
expect(result.touchedSources).toEqual([{ connectionId: 'warehouse', sourceName: 'orders' }]);
const staleAction = result.actions.find((action) => action.target === 'sl' && action.key === 'orders');
expect(staleAction).toEqual(expect.objectContaining({ target: 'sl', key: 'orders' }));
expect(staleAction?.rawPaths).toBeUndefined();
expect(staleAction?.rawPaths).toEqual(['manifest.json']);
const shard = YAML.parse(await readFile(join(workdir, 'semantic-layer/warehouse/_schema/public.yaml'), 'utf-8'));
expect(shard.tables.orders.usage).toEqual({
ownerNote: 'keep analyst annotation',

View file

@ -139,7 +139,6 @@ class HistoricSqlEvidenceAgentRunner implements AgentRunnerPort {
const result = await emitEvidence.execute({
kind: 'table_usage',
table: 'public.orders',
rawPath: 'tables/public.orders.json',
usage: {
narrative: 'Orders are repeatedly queried by lifecycle status.',
frequencyTier: 'high',