fix(ingest): attribute historic-sql evidence writes in bundle report (#220)

The emit_historic_sql_evidence tool took rawPath as LLM-supplied input,
so projection actions frequently lacked defensible raw paths. As a
result, every row in bundle_ingest_reports fell through as
actionType: 'skipped' with null artifact metadata, hiding the wiki pages
and semantic-layer (SL) merges the run had actually produced (KLO-698).

The tool now reads the work unit's rawFiles from session.allowedRawPaths
and stores them on the evidence envelope as rawPaths. The projection
emits actions with those paths, and stale/archive actions are anchored
to manifest.json so they also surface as non-skipped provenance rows.
This commit is contained in:
Andrey Avtomonov 2026-05-26 12:21:53 +02:00 committed by GitHub
parent 2a6fb19ba4
commit 1071f9d1c9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 99 additions and 31 deletions

View file

@ -11,15 +11,14 @@ describe('emit_historic_sql_evidence tool', () => {
});
});
it('writes table usage evidence to the ignored run evidence directory', async () => {
const writeFile = vi.fn(async () => ({ success: true, commitHash: null }));
it('writes table usage evidence using the work unit allowed raw paths', async () => {
const writeFile = vi.fn(async (_path: string, _body: string) => ({ success: true, commitHash: null }));
const tool = createEmitHistoricSqlEvidenceTool();
const result = await tool.execute!(
{
kind: 'table_usage',
table: 'public.orders',
rawPath: 'tables/public.orders.json',
usage: {
narrative: 'Orders are repeatedly queried by paid status.',
frequencyTier: 'high',
@ -36,6 +35,7 @@ describe('emit_historic_sql_evidence tool', () => {
connectionId: 'warehouse',
session: {
ingest: { runId: 'run-1', jobId: 'job-1', syncId: 'sync-1', sourceKey: 'historic-sql' },
allowedRawPaths: new Set(['tables/public.orders.json']),
configService: { writeFile },
},
},
@ -45,12 +45,53 @@ describe('emit_historic_sql_evidence tool', () => {
expect(result).toBe('Recorded historic-SQL table_usage evidence for public.orders.');
expect(writeFile).toHaveBeenCalledWith(
'.ktx/ingest-evidence/historic-sql/run-1/historic-sql-table-public-orders.json',
expect.stringContaining('"kind": "table_usage"'),
expect.stringContaining('"rawPaths"'),
'System User',
'system@example.com',
'Record historic-SQL evidence: historic-sql-table-public-orders',
{ skipLock: true },
);
expect(writeFile).toHaveBeenCalledWith(
expect.any(String),
expect.stringContaining('tables/public.orders.json'),
expect.any(String),
expect.any(String),
expect.any(String),
expect.any(Object),
);
});
// Verifies that the tool refuses to record evidence when the session carries
// no allowedRawPaths. Note that the "rejection" is a resolved string message,
// not a rejected promise.
it('rejects calls without a WorkUnit raw file context', async () => {
const tool = createEmitHistoricSqlEvidenceTool();
await expect(
tool.execute!(
{
// Tool input: a pattern payload with no rawPath field supplied by the caller.
kind: 'pattern',
pattern: {
slug: 'orders',
title: 'Orders',
narrative: 'Orders pattern.',
definitionSql: 'select * from public.orders',
tablesInvolved: ['public.orders'],
slRefs: ['orders'],
constituentTemplateIds: ['pg:1'],
},
},
{
toolCallId: 'call-1',
messages: [],
abortSignal: new AbortController().signal,
experimental_context: {
connectionId: 'warehouse',
// The session is a historic-sql ingest but has no allowedRawPaths entry;
// that missing field is the condition under test.
session: {
ingest: { runId: 'run-1', jobId: 'job-1', syncId: 'sync-1', sourceKey: 'historic-sql' },
configService: { writeFile: vi.fn() },
},
},
// Cast because the stub context is only a partial options object.
} as never,
),
// The promise must resolve (not throw) with a message naming the missing WorkUnit context.
).resolves.toContain('emit_historic_sql_evidence requires a WorkUnit context');
});
it('rejects non-historic ingest sessions', async () => {
@ -60,7 +101,6 @@ describe('emit_historic_sql_evidence tool', () => {
tool.execute!(
{
kind: 'pattern',
rawPath: 'patterns-input.json',
pattern: {
slug: 'orders',
title: 'Orders',
@ -79,6 +119,7 @@ describe('emit_historic_sql_evidence tool', () => {
connectionId: 'warehouse',
session: {
ingest: { runId: 'run-1', jobId: 'job-1', syncId: 'sync-1', sourceKey: 'notion' },
allowedRawPaths: new Set(['patterns-input/part-0001.json']),
configService: { writeFile: vi.fn() },
},
},

View file

@ -12,7 +12,7 @@ describe('historic-sql evidence contracts', () => {
kind: 'table_usage',
connectionId: 'warehouse',
table: 'public.orders',
rawPath: 'tables/public.orders.json',
rawPaths: ['tables/public.orders.json'],
usage: {
narrative: 'Orders are repeatedly queried for paid/refunded lifecycle analysis.',
frequencyTier: 'high',
@ -32,7 +32,7 @@ describe('historic-sql evidence contracts', () => {
historicSqlEvidenceEnvelopeSchema.parse({
kind: 'pattern',
connectionId: 'warehouse',
rawPath: 'patterns-input.json',
rawPaths: ['patterns-input/part-0001.json'],
pattern: {
slug: 'order-lifecycle-analysis',
title: 'Order Lifecycle Analysis',

View file

@ -57,7 +57,6 @@ class HistoricSqlAcceptanceAgentRunner implements AgentRunnerPort {
const result = await emitEvidence.execute({
kind: 'table_usage',
table: 'public.orders',
rawPath: 'tables/public.orders.json',
usage: {
narrative: 'Analysts repeatedly inspect paid order lifecycle by customer segment.',
frequencyTier: 'high',
@ -76,7 +75,6 @@ class HistoricSqlAcceptanceAgentRunner implements AgentRunnerPort {
const result = await emitEvidence.execute({
kind: 'table_usage',
table: 'public.customers',
rawPath: 'tables/public.customers.json',
usage: {
narrative: 'Customers provide segment context for paid order lifecycle analysis.',
frequencyTier: 'mid',
@ -94,7 +92,6 @@ class HistoricSqlAcceptanceAgentRunner implements AgentRunnerPort {
if (params.telemetryTags.unitKey === 'historic-sql-patterns-part-0001') {
const result = await emitEvidence.execute({
kind: 'pattern',
rawPath: 'patterns-input/part-0001.json',
pattern: {
slug: 'paid-order-lifecycle',
title: 'Paid Order Lifecycle',
@ -257,6 +254,33 @@ describe('historic-SQL local ingest retrieval acceptance', () => {
]),
);
// Regression for KLO-698: the bundle report's provenance rows must
// attribute the table-usage merges and pattern-page writes back to
// their raw files instead of falling through as `actionType: 'skipped'`
// with null artifact metadata.
const provenanceRows = result.report.body.provenanceRows;
const nonSkipped = provenanceRows.filter((row) => row.actionType !== 'skipped');
expect(nonSkipped).toEqual(
expect.arrayContaining([
expect.objectContaining({
rawPath: 'tables/public.orders.json',
artifactKind: 'sl',
artifactKey: 'orders',
}),
expect.objectContaining({
rawPath: 'tables/public.customers.json',
artifactKind: 'sl',
artifactKey: 'customers',
}),
expect.objectContaining({
rawPath: 'patterns-input/part-0001.json',
artifactKind: 'wiki',
artifactKey: 'historic-sql-paid-order-lifecycle',
actionType: 'wiki_written',
}),
]),
);
await expect(readFile(join(project.projectDir, 'semantic-layer/warehouse/_schema/public.yaml'), 'utf-8')).resolves
.toContain('Analysts repeatedly inspect paid order lifecycle by customer segment.');
await expect(readFile(join(project.projectDir, 'wiki/global/historic-sql-paid-order-lifecycle.md'), 'utf-8'))

View file

@ -60,7 +60,7 @@ describe('projectHistoricSqlEvidence', () => {
kind: 'table_usage',
connectionId: 'warehouse',
table: 'public.orders',
rawPath: 'tables/public.orders.json',
rawPaths: ['tables/public.orders.json'],
usage: {
narrative: 'Orders are repeatedly queried for lifecycle analysis.',
frequencyTier: 'high',
@ -158,7 +158,7 @@ describe('projectHistoricSqlEvidence', () => {
await writeJson(workdir, '.ktx/ingest-evidence/historic-sql/run-1/pattern.json', {
kind: 'pattern',
connectionId: 'warehouse',
rawPath: 'patterns-input.json',
rawPaths: ['patterns-input/part-0001.json'],
pattern: {
slug: 'order-lifecycle-analysis',
title: 'Order Lifecycle Analysis',
@ -179,7 +179,7 @@ describe('projectHistoricSqlEvidence', () => {
expect.objectContaining({
target: 'wiki',
key: 'historic-sql-old-order-lifecycle',
rawPaths: ['patterns-input.json'],
rawPaths: ['patterns-input/part-0001.json'],
}),
]),
);
@ -234,7 +234,7 @@ describe('projectHistoricSqlEvidence', () => {
await writeJson(workdir, '.ktx/ingest-evidence/historic-sql/run-1/pattern.json', {
kind: 'pattern',
connectionId: 'warehouse',
rawPath: 'patterns-input.json',
rawPaths: ['patterns-input/part-0001.json'],
pattern: {
slug: 'order-lifecycle-analysis',
title: 'Order Lifecycle Analysis',
@ -343,7 +343,7 @@ describe('projectHistoricSqlEvidence', () => {
kind: 'table_usage',
connectionId: 'warehouse',
table: 'public.customers',
rawPath: 'tables/public.customers.json',
rawPaths: ['tables/public.customers.json'],
usage: {
narrative: 'Customers were queried.',
frequencyTier: 'low',
@ -380,7 +380,7 @@ describe('projectHistoricSqlEvidence', () => {
expect(result.touchedSources).toEqual([{ connectionId: 'warehouse', sourceName: 'orders' }]);
const staleAction = result.actions.find((action) => action.target === 'sl' && action.key === 'orders');
expect(staleAction).toEqual(expect.objectContaining({ target: 'sl', key: 'orders' }));
expect(staleAction?.rawPaths).toBeUndefined();
expect(staleAction?.rawPaths).toEqual(['manifest.json']);
const shard = YAML.parse(await readFile(join(workdir, 'semantic-layer/warehouse/_schema/public.yaml'), 'utf-8'));
expect(shard.tables.orders.usage).toEqual({
ownerNote: 'keep analyst annotation',

View file

@ -139,7 +139,6 @@ class HistoricSqlEvidenceAgentRunner implements AgentRunnerPort {
const result = await emitEvidence.execute({
kind: 'table_usage',
table: 'public.orders',
rawPath: 'tables/public.orders.json',
usage: {
narrative: 'Orders are repeatedly queried by lifecycle status.',
frequencyTier: 'high',