Track raw paths for ingest actions

This commit is contained in:
Luca Martial 2026-05-11 23:03:16 -07:00
parent f8aedc858b
commit 3db57465db
22 changed files with 306 additions and 37 deletions

View file

@ -5,7 +5,7 @@ import pLimit from 'p-limit';
import { z } from 'zod';
import { type KtxLogger, noopLogger } from '../core/index.js';
import type { CaptureSession, MemoryAction } from '../memory/index.js';
import type { SlValidationDeps } from '../sl/index.js';
import type { SemanticLayerService, SemanticLayerSource, SlValidationDeps } from '../sl/index.js';
import { createTouchedSlSources, type ToolContext, type ToolSession } from '../tools/index.js';
import { actionTargetConnectionId } from './action-identity.js';
import { NOTION_DEFAULT_MAX_KNOWLEDGE_CREATES_PER_RUN } from './adapters/notion/types.js';
@ -86,6 +86,47 @@ function reportIdFromCreateResult(result: unknown): string | undefined {
return typeof id === 'string' && id.length > 0 ? id : undefined;
}
function normalizeTableReference(value: string): string {
return value
.trim()
.replace(/["`]/g, '')
.replace(/[\[\]]/g, '')
.toLowerCase();
}
function finalReferenceSegment(value: string): string {
const parts = value.split('.').filter((part) => part.length > 0);
return parts.at(-1) ?? value;
}
function semanticSourceMatchesTableRef(source: SemanticLayerSource, tableRef: string): boolean {
const normalizedRef = normalizeTableReference(tableRef);
if (!normalizedRef) {
return false;
}
const normalizedSourceName = normalizeTableReference(source.name);
if (normalizedSourceName === normalizedRef) {
return true;
}
const table = typeof source.table === 'string' ? normalizeTableReference(source.table) : '';
if (table && table === normalizedRef) {
return true;
}
const refIsQualified = normalizedRef.includes('.');
if (refIsQualified && normalizedSourceName === finalReferenceSegment(normalizedRef)) {
return true;
}
return false;
}
function rawPathsForAction(action: MemoryAction, fallbackRawPaths: string[]): string[] {
return action.rawPaths && action.rawPaths.length > 0 ? [...new Set(action.rawPaths)] : fallbackRawPaths;
}
export class IngestBundleRunner {
private readonly logger: KtxLogger;
private readonly chainByConnection = new Map<string, Promise<unknown>>();
@ -281,6 +322,24 @@ export class IngestBundleRunner {
return blocks.join('\n\n');
}
private async tableRefExistsInSemanticLayer(
semanticLayerService: SemanticLayerService,
connectionIds: string[],
tableRef: string,
): Promise<boolean> {
for (const connectionId of connectionIds) {
try {
const sources = await semanticLayerService.loadAllSources(connectionId);
if (sources.some((source) => semanticSourceMatchesTableRef(source, tableRef))) {
return true;
}
} catch {
// Fallback diagnostics should not fail an ingest stage if an index lookup is temporarily unavailable.
}
}
return false;
}
private resolveContextCuratorBudget(
bundleRef: IngestBundleJob['bundleRef'],
stageIndex: StageIndex,
@ -603,6 +662,7 @@ export class IngestBundleRunner {
preHead: sessionWorktree.baseSha,
touchedSlSources: session.touchedSlSources,
actions: sessionActions,
allowedRawPaths: new Set(wu.rawFiles),
semanticLayerService: scopedSemanticLayerService,
wikiService: scopedWikiService,
configService: sessionWorktree.config,
@ -667,6 +727,8 @@ export class IngestBundleRunner {
emit_unmapped_fallback: createEmitUnmappedFallbackTool({
stageIndex,
allowedPaths: new Set(wu.rawFiles),
tableRefExists: (tableRef) =>
this.tableRefExistsInSemanticLayer(scopedSemanticLayerService, slConnectionIds, tableRef),
}),
};
@ -825,6 +887,10 @@ export class IngestBundleRunner {
const reconcileActions: MemoryAction[] = [];
const rcScopedWiki = this.deps.wikiService.forWorktree(sessionWorktree.workdir);
const rcScopedSl = this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir);
const reconciliationAllowedRawPaths = new Set<string>([
...currentHashes.keys(),
...(eviction?.deletedRawPaths ?? []),
]);
const rcToolSession: ToolSession = {
connectionId: job.connectionId,
@ -832,6 +898,7 @@ export class IngestBundleRunner {
preHead: reconcileSession.preHead,
touchedSlSources: reconcileSession.touchedSlSources,
actions: reconcileActions,
allowedRawPaths: reconciliationAllowedRawPaths,
semanticLayerService: rcScopedSl,
wikiService: rcScopedWiki,
configService: sessionWorktree.config,
@ -896,6 +963,7 @@ export class IngestBundleRunner {
emit_unmapped_fallback: createEmitUnmappedFallbackTool({
stageIndex,
allowedPaths: allStagedPaths,
tableRefExists: (tableRef) => this.tableRefExistsInSemanticLayer(rcScopedSl, slConnectionIds, tableRef),
}),
};
@ -1153,26 +1221,34 @@ export class IngestBundleRunner {
return a.type === 'created' ? 'source_created' : 'measure_added';
};
const producedPaths = new Set<string>();
const pushActionProvenance = (rawPath: string, action: MemoryAction): void => {
const hash = currentHashes.get(rawPath) ?? 'unknown';
provenanceRows.push({
connectionId: job.connectionId,
sourceKey: job.sourceKey,
syncId,
rawPath,
rawContentHash: hash,
artifactKind: action.target,
artifactKey: action.key,
targetConnectionId: action.target === 'sl' ? actionTargetConnectionId(action, job.connectionId) : null,
artifactContentHash: null,
actionType: actionToType(action),
});
producedPaths.add(rawPath);
};
for (const wu of stageIndex.workUnits) {
for (const rawPath of wu.rawFiles) {
const hash = currentHashes.get(rawPath) ?? 'unknown';
for (const action of wu.actions) {
provenanceRows.push({
connectionId: job.connectionId,
sourceKey: job.sourceKey,
syncId,
rawPath,
rawContentHash: hash,
artifactKind: action.target,
artifactKey: action.key,
targetConnectionId: action.target === 'sl' ? (action.targetConnectionId ?? null) : null,
artifactContentHash: null,
actionType: actionToType(action),
});
producedPaths.add(rawPath);
for (const action of wu.actions) {
for (const rawPath of rawPathsForAction(action, wu.rawFiles)) {
pushActionProvenance(rawPath, action);
}
}
}
for (const action of reconcileActions) {
for (const rawPath of action.rawPaths ?? []) {
pushActionProvenance(rawPath, action);
}
}
for (const resolution of stageIndex.artifactResolutions ?? []) {
const hash = currentHashes.get(resolution.rawPath) ?? 'unknown';
provenanceRows.push({

View file

@ -88,6 +88,35 @@ class WikiWritingAgentRunner extends AgentRunnerService {
}
}
class WikiWritingWithRawPathAgentRunner extends AgentRunnerService {
override runLoop = vi.fn(async (params: any) => {
if (params.telemetryTags?.operationName === 'ingest-bundle-wu') {
const wikiWrite = params.toolSet.wiki_write;
if (!wikiWrite?.execute) {
throw new Error('wiki_write tool was not available to the WorkUnit');
}
const result = await wikiWrite.execute(
{
key: 'orders_context',
summary: 'Orders source context',
content: 'Orders are purchase records used for revenue analysis.',
tags: ['orders'],
rawPaths: ['orders/orders.json'],
},
{ toolCallId: 'wiki-write' },
);
if (!result.structured.success) {
throw new Error(result.markdown);
}
}
return { stopReason: 'natural' as const };
});
constructor() {
super({ llmProvider: { getModel: () => ({}) as never } as never });
}
}
class HistoricSqlEvidenceAgentRunner extends AgentRunnerService {
override runLoop = vi.fn(async (params: any) => {
if (
@ -374,6 +403,42 @@ describe('canonical local ingest', () => {
}
});
it('uses explicit action raw paths to avoid over-attributing work-unit provenance', async () => {
const sourceDir = join(tempDir, 'source');
await mkdir(join(sourceDir, 'orders'), { recursive: true });
await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders"}\n', 'utf-8');
await writeFile(join(sourceDir, 'orders', 'unrelated.json'), '{"name":"unrelated"}\n', 'utf-8');
const agentRunner = new WikiWritingWithRawPathAgentRunner();
const result = await runLocalIngest({
project,
adapters: [new FakeSourceAdapter()],
adapter: 'fake',
connectionId: 'warehouse',
sourceDir,
jobId: 'wiki-raw-path-local-1',
agentRunner,
});
expect(result.result.failedWorkUnits).toEqual([]);
expect(result.report.body.provenanceRows).toEqual([
{
rawPath: 'orders/orders.json',
artifactKind: 'wiki',
artifactKey: 'orders_context',
targetConnectionId: null,
actionType: 'wiki_written',
},
{
rawPath: 'orders/unrelated.json',
artifactKind: null,
artifactKey: null,
targetConnectionId: null,
actionType: 'skipped',
},
]);
});
it('runs historic-SQL evidence projection through the local bundle post-processor', async () => {
const projectDir = join(tempDir, 'historic-sql-project');
await initKtxProject({ projectDir, projectName: 'warehouse' });

View file

@ -16,6 +16,7 @@ const ingestActionSchema = z.object({
key: z.string(),
detail: z.string(),
targetConnectionId: z.string().nullable().default(null),
rawPaths: z.array(z.string()).optional(),
});
const touchedSlSourceSchema = z.object({

View file

@ -201,6 +201,27 @@ describe('reconciliation emit tools', () => {
expect(stageIndex.unmappedFallbacks).toEqual([]);
});
it('rejects missing-table fallback decisions when the table resolves to an existing semantic source', async () => {
const stageIndex = makeStageIndex();
const tool = createEmitUnmappedFallbackTool({
stageIndex,
allowedPaths: new Set(['cards/revenue.json']),
tableRefExists: async (tableRef) => tableRef === 'orbit_analytics.mart_revenue_daily',
});
const output = await executeTool(tool, {
rawPath: 'cards/revenue.json',
reason: 'no_physical_table',
tableRef: 'orbit_analytics.mart_revenue_daily',
fallback: 'wiki_only',
});
expect(output).toContain(
'Error: tableRef "orbit_analytics.mart_revenue_daily" already resolves to a semantic source',
);
expect(stageIndex.unmappedFallbacks).toEqual([]);
});
it('records explicit artifact resolutions for provenance rows', async () => {
const stageIndex = makeStageIndex();
const tool = createEmitArtifactResolutionTool({

View file

@ -5,6 +5,7 @@ import type { StageIndex, UnmappedFallbackRecord, UnmappedFallbackReason } from
interface EmitUnmappedFallbackDeps {
stageIndex: StageIndex;
allowedPaths: ReadonlySet<string>;
tableRefExists?: (tableRef: string) => Promise<boolean>;
}
const unmappedFallbackReasonSchema = z.enum([
@ -49,6 +50,10 @@ function canonicalDetail(reason: UnmappedFallbackReason, tableRef: string | unde
}
}
function requiresMissingTableValidation(reason: UnmappedFallbackReason): boolean {
return reason === 'no_physical_table' || reason === 'missing_target_table';
}
export function createEmitUnmappedFallbackTool(deps: EmitUnmappedFallbackDeps) {
return tool({
description:
@ -70,6 +75,12 @@ export function createEmitUnmappedFallbackTool(deps: EmitUnmappedFallbackDeps) {
if (!deps.allowedPaths.has(input.rawPath)) {
return `Error: rawPath "${input.rawPath}" is not available to this ingest stage`;
}
if (input.tableRef && requiresMissingTableValidation(input.reason) && deps.tableRefExists) {
const exists = await deps.tableRefExists(input.tableRef);
if (exists) {
return `Error: tableRef "${input.tableRef}" already resolves to a semantic source; do not record ${input.reason} for an existing table.`;
}
}
const base = canonicalDetail(input.reason, input.tableRef);
const detail = input.clarification ? `${base} ${input.clarification.trim()}`.trim() : base;

View file

@ -34,6 +34,7 @@ export interface MemoryAction {
key: string;
detail: string;
targetConnectionId?: string | null;
rawPaths?: string[];
}
export interface MemoryAgentResult {

View file

@ -1,6 +1,6 @@
import YAML from 'yaml';
import { z } from 'zod';
import { addTouchedSlSource, type ToolContext, type ToolOutput } from '../../tools/index.js';
import { addTouchedSlSource, type ToolContext, type ToolOutput, validateActionRawPaths } from '../../tools/index.js';
import { applySqlEdits } from '../../tools/sql-edit-replacer.js';
import { normalizeSemanticLayerDescriptions } from '../description-normalization.js';
import type { SemanticLayerSource } from '../types.js';
@ -25,6 +25,10 @@ const slEditSourceInputSchema = z.object({
.optional()
.describe('Targeted exact-match search/replace edits on the raw YAML content.'),
delete: z.boolean().optional().describe('Set to true to delete this source entirely'),
rawPaths: z
.array(z.string().min(1))
.optional()
.describe('In ingest sessions, raw source file paths that directly support this SL action.'),
});
type SlEditSourceInput = z.infer<typeof slEditSourceInputSchema>;
@ -75,6 +79,10 @@ If no source exists yet, use sl_write_source instead — this tool will reject t
const semanticLayerService = context.session?.semanticLayerService ?? this.semanticLayerService;
const skipIndex = context.session?.isWorktreeScoped === true;
const rawPathValidation = validateActionRawPaths(context.session, input.rawPaths);
if (!rawPathValidation.ok) {
return this.buildOutput(false, [rawPathValidation.error], sourceName);
}
// Handle delete
if (input.delete) {
@ -88,6 +96,7 @@ If no source exists yet, use sl_write_source instead — this tool will reject t
key: sourceName,
detail: 'Deleted source',
targetConnectionId: actionTargetConnectionId(context.session.connectionId, connectionId),
...(rawPathValidation.rawPaths ? { rawPaths: rawPathValidation.rawPaths } : {}),
});
}
return this.buildOutput(true, [], sourceName, { yaml: undefined, commitHash: undefined });
@ -184,6 +193,7 @@ If no source exists yet, use sl_write_source instead — this tool will reject t
key: sourceName,
detail: `Applied ${editCount} edit(s)`,
targetConnectionId: actionTargetConnectionId(context.session.connectionId, connectionId),
...(rawPathValidation.rawPaths ? { rawPaths: rawPathValidation.rawPaths } : {}),
});
}

View file

@ -1,6 +1,6 @@
import YAML from 'yaml';
import { z } from 'zod';
import { addTouchedSlSource, type ToolContext, type ToolOutput } from '../../tools/index.js';
import { addTouchedSlSource, type ToolContext, type ToolOutput, validateActionRawPaths } from '../../tools/index.js';
import { sourceOverlaySchema } from '../schemas.js';
import type { SemanticLayerService } from '../semantic-layer.service.js';
import type { SemanticLayerSource } from '../types.js';
@ -25,6 +25,10 @@ const slWriteSourceInputSchema = z.object({
.optional()
.describe('Source definition (standalone with table/sql) or overlay (measures, computed columns, etc.)'),
delete: z.boolean().optional().describe('Set to true to delete this source entirely'),
rawPaths: z
.array(z.string().min(1))
.optional()
.describe('In ingest sessions, raw source file paths that directly support this SL action.'),
});
type SlWriteSourceInput = z.infer<typeof slWriteSourceInputSchema>;
@ -99,6 +103,10 @@ Do NOT join back to a table that the SQL already aggregates from if the grain co
const semanticLayerService = context.session?.semanticLayerService ?? this.semanticLayerService;
const skipIndex = context.session?.isWorktreeScoped === true;
const rawPathValidation = validateActionRawPaths(context.session, input.rawPaths);
if (!rawPathValidation.ok) {
return this.buildOutput(false, [rawPathValidation.error], sourceName);
}
// Handle delete
if (input.delete) {
@ -116,6 +124,7 @@ Do NOT join back to a table that the SQL already aggregates from if the grain co
key: sourceName,
detail: 'Deleted source',
targetConnectionId: actionTargetConnectionId(context.session.connectionId, connectionId),
...(rawPathValidation.rawPaths ? { rawPaths: rawPathValidation.rawPaths } : {}),
});
}
return this.buildOutput(true, [], sourceName, { yaml: undefined, commitHash: undefined });
@ -142,6 +151,7 @@ Do NOT join back to a table that the SQL already aggregates from if the grain co
context,
semanticLayerService,
skipIndex,
rawPathValidation.rawPaths,
);
}
@ -154,6 +164,7 @@ Do NOT join back to a table that the SQL already aggregates from if the grain co
context: ToolContext,
semanticLayerService: SemanticLayerService,
skipIndex: boolean,
rawPaths: string[] | undefined,
): Promise<ToolOutput<SemanticLayerStructured>> {
const normalizedSource = normalizeSemanticLayerDescriptions(source, { fillMissing: !!context.session?.ingest });
const isOverlay =
@ -211,6 +222,7 @@ Do NOT join back to a table that the SQL already aggregates from if the grain co
key: sourceName,
detail: existing ? `Rewrote source` : `Created source`,
targetConnectionId: actionTargetConnectionId(context.session.connectionId, connectionId),
...(rawPaths ? { rawPaths } : {}),
});
}

View file

@ -0,0 +1,30 @@
import type { ToolSession } from './tool-session.js';
type ActionRawPathValidation =
| { ok: true; rawPaths?: string[] }
| { ok: false; error: string };
export function validateActionRawPaths(
session: ToolSession | undefined,
rawPaths: readonly string[] | undefined,
): ActionRawPathValidation {
if (!rawPaths || rawPaths.length === 0) {
return { ok: true };
}
const uniqueRawPaths = [...new Set(rawPaths)];
const allowedRawPaths = session?.allowedRawPaths;
if (!allowedRawPaths) {
return { ok: true, rawPaths: uniqueRawPaths };
}
const unavailable = uniqueRawPaths.filter((rawPath) => !allowedRawPaths.has(rawPath));
if (unavailable.length > 0) {
return {
ok: false,
error: `rawPaths include unavailable ingest file(s): ${unavailable.join(', ')}`,
};
}
return { ok: true, rawPaths: uniqueRawPaths };
}

View file

@ -31,6 +31,7 @@ export { ingestMetadataRequired, resolveIngestMetadata } from './context-ingest-
export type { SqlEdit } from './sql-edit-replacer.js';
export { applySqlEdits } from './sql-edit-replacer.js';
export type { IngestToolMetadata, MemoryAction, ToolSession } from './tool-session.js';
export { validateActionRawPaths } from './action-raw-paths.js';
export type { TouchedSlSource, TouchedSlSourceSet } from './touched-sl-sources.js';
export {
addTouchedSlSource,

View file

@ -16,6 +16,7 @@ export interface MemoryAction {
key: string;
detail: string;
targetConnectionId?: string | null;
rawPaths?: string[];
}
interface EvictionDecisionRecord {
@ -45,6 +46,7 @@ export interface ToolSession {
preHead: string | null;
touchedSlSources: TouchedSlSourceSet;
actions: MemoryAction[];
allowedRawPaths?: ReadonlySet<string>;
semanticLayerService: SemanticLayerService;
wikiService: KnowledgeWikiService;
configService: KtxFileStorePort;

View file

@ -3,13 +3,17 @@ import type { KnowledgeIndexPort } from '../ports.js';
import type { KnowledgeEventPort } from '../ports.js';
type BlockScope = 'GLOBAL' | 'USER';
import { KnowledgeWikiService } from '../index.js';
import { BaseTool, type ToolContext, type ToolOutput } from '../../tools/index.js';
import { BaseTool, type ToolContext, type ToolOutput, validateActionRawPaths } from '../../tools/index.js';
const SYSTEM_AUTHOR = 'System User';
const SYSTEM_EMAIL = 'system@example.com';
const wikiRemoveInputSchema = z.object({
key: z.string().describe('The page key to remove'),
rawPaths: z
.array(z.string().min(1))
.optional()
.describe('In ingest sessions, raw source file paths that directly support this removal.'),
});
type WikiRemoveInput = z.infer<typeof wikiRemoveInputSchema>;
@ -42,6 +46,13 @@ export class WikiRemoveTool extends BaseTool<typeof wikiRemoveInputSchema> {
const wikiService = context.session?.wikiService ?? this.wikiService;
const writesGlobal = !!context.session;
const skipIndex = context.session?.isWorktreeScoped === true;
const rawPathValidation = validateActionRawPaths(context.session, input.rawPaths);
if (!rawPathValidation.ok) {
return {
markdown: `Error: ${rawPathValidation.error}`,
structured: { success: false, key: input.key },
};
}
const scope: BlockScope = writesGlobal ? 'GLOBAL' : 'USER';
const scopeId = scope === 'USER' ? context.userId : null;
@ -76,6 +87,7 @@ export class WikiRemoveTool extends BaseTool<typeof wikiRemoveInputSchema> {
type: 'removed',
key: input.key,
detail: `Removed page "${input.key}"`,
...(rawPathValidation.rawPaths ? { rawPaths: rawPathValidation.rawPaths } : {}),
});
}

View file

@ -4,7 +4,7 @@ import type { KnowledgeEventPort } from '../ports.js';
type BlockScope = 'GLOBAL' | 'USER';
import { KnowledgeWikiService, type WikiFrontmatter } from '../index.js';
import { applySqlEdits } from '../../tools/sql-edit-replacer.js';
import { BaseTool, type ToolContext, type ToolOutput } from '../../tools/index.js';
import { BaseTool, type ToolContext, type ToolOutput, validateActionRawPaths } from '../../tools/index.js';
const MAX_USER_BLOCKS = 100;
const SYSTEM_AUTHOR = 'System User';
@ -37,6 +37,10 @@ const wikiWriteInputSchema = z.object({
representative_sql: z.string().optional(),
usage: historicSqlUsageFrontmatterSchema.optional(),
fingerprints: z.array(z.string()).optional(),
rawPaths: z
.array(z.string().min(1))
.optional()
.describe('In ingest sessions, raw source file paths that directly support this wiki action.'),
});
type WikiWriteInput = z.infer<typeof wikiWriteInputSchema>;
@ -156,6 +160,13 @@ tags/refs/sl_refs use REPLACE semantics: omit to keep existing on update, [] to
const wikiService = context.session?.wikiService ?? this.wikiService;
const writesGlobal = !!context.session;
const skipIndex = context.session?.isWorktreeScoped === true;
const rawPathValidation = validateActionRawPaths(context.session, input.rawPaths);
if (!rawPathValidation.ok) {
return {
markdown: `Error: ${rawPathValidation.error}`,
structured: { success: false, key: input.key },
};
}
const scope: BlockScope = writesGlobal ? 'GLOBAL' : 'USER';
const scopeId = scope === 'USER' ? context.userId : null;
@ -261,7 +272,13 @@ tags/refs/sl_refs use REPLACE semantics: omit to keep existing on update, [] to
const action = existing ? 'updated' : 'created';
if (context.session) {
context.session.actions.push({ target: 'wiki', type: action, key: input.key, detail: input.summary });
context.session.actions.push({
target: 'wiki',
type: action,
key: input.key,
detail: input.summary,
...(rawPathValidation.rawPaths ? { rawPaths: rawPathValidation.rawPaths } : {}),
});
}
// When the LLM used `replacements` (edit mode), it doesn't have the