ktx/packages/cli/test/context/ingest/ingest-bundle.runner.test.ts
Andrey Avtomonov 2896f9fb91
fix(ingest): drive work-unit progress from tool calls, not turn counts (#269)
The ingest HUD showed "step 70/40" because the Claude subscription runtime
re-derived a per-turn counter that could not match the SDK's num_turns and
overshot the maxTurns budget. Replace the turn-based work_unit_step heartbeat
with a real, observed tool-call count (no denominator), report
metrics.stepCount from the SDK's authoritative num_turns, and delete the
brittle countsAsAssistantTurn denylist plus the now-unused onStepFinish
callback across the runtime port and all three runtimes. Reconcile and curator
progress move to the same tool-call heartbeat.
2026-06-08 15:30:35 +02:00

2378 lines
83 KiB
TypeScript

import { mkdtemp, readFile, rm, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { beforeEach, describe, expect, it, vi } from 'vitest';
import { addTouchedSlSource } from '../../../src/context/tools/touched-sl-sources.js';
import { IngestBundleRunner } from '../../../src/context/ingest/ingest-bundle.runner.js';
import { createMemoryFlowLiveBuffer } from '../../../src/context/ingest/memory-flow/live-buffer.js';
import type { MemoryFlowReplayInput } from '../../../src/context/ingest/memory-flow/types.js';
import type { IngestBundleRunnerDeps } from '../../../src/context/ingest/ports.js';
/**
 * Minimal stand-in for the job context passed to `IngestBundleRunner.run`.
 *
 * Progress values are clamped into [0, 1]. A context produced by `startPhase`
 * maps its own local [0, 1] progress onto the window `[start, start + span]`
 * of its parent and forwards the mapped value upward; only the root context
 * (the one without a parent) invokes `updateProgressFn`.
 */
class TestJobContext {
  /** Last clamped progress reported to this context, in this context's local scale. */
  private currentProgress = 0;

  constructor(
    public readonly jobId: string,
    public readonly userId: string | null | undefined,
    public readonly checkCancellation: () => Promise<void>,
    private readonly updateProgressFn: (progress: number, message?: string) => Promise<void>,
    private readonly parent?: TestJobContext,
    private readonly start = 0,
    private readonly span = 1,
  ) {}

  /** Clamps `value` into the closed interval [0, 1]. */
  private static clamp01(value: number): number {
    return Math.min(1, Math.max(0, value));
  }

  /**
   * Records `progress` (clamped) and reports it, mapped through every enclosing phase,
   * to the root callback.
   */
  async updateProgress(progress: number, message?: string): Promise<void> {
    const clamped = TestJobContext.clamp01(progress);
    this.currentProgress = clamped;
    if (!this.parent) {
      await this.updateProgressFn(clamped, message);
      return;
    }
    // Project local phase progress onto the parent's [start, start + span] window.
    const mapped = TestJobContext.clamp01(this.start + this.span * clamped);
    await this.parent.updateProgress(mapped, message);
  }

  /**
   * Opens a child phase that begins at this context's current progress and spans
   * `fraction` (clamped into [0, 1]) of this context's scale.
   */
  startPhase(fraction: number): TestJobContext {
    const childSpan = TestJobContext.clamp01(fraction);
    return new TestJobContext(
      this.jobId,
      this.userId,
      this.checkCancellation,
      this.updateProgressFn,
      this,
      this.currentProgress,
      childSpan,
    );
  }
}
/**
 * Creates a promise together with its settle functions, so a test decides exactly
 * when an in-flight async operation completes.
 *
 * `reject` is exposed alongside `resolve` so tests can also drive failure paths.
 * Existing callers that only destructure `promise`/`resolve` are unaffected.
 */
const deferred = <T>() => {
  let resolve!: (v: T) => void;
  let reject!: (reason?: unknown) => void;
  const promise = new Promise<T>((res, rej) => {
    resolve = res;
    reject = rej;
  });
  return { promise, resolve, reject };
};
/**
 * Seed snapshot for a memory-flow live buffer in bundle-ingest tests.
 *
 * The run and sync ids are `'pending'` placeholders, the run is `'running'` for
 * connection `c1` with the `fake` adapter, and every collection starts empty.
 * A brand-new object (with brand-new arrays) is returned on each call, so buffers
 * never share mutable state.
 */
function bundleReplayInput(): MemoryFlowReplayInput {
  const placeholderId = 'pending';
  const emptyDetails = { actions: [], provenance: [], transcripts: [] };
  return {
    runId: placeholderId,
    syncId: placeholderId,
    connectionId: 'c1',
    adapter: 'fake',
    sourceDir: '/tmp/stage/upload-x',
    status: 'running',
    plannedWorkUnits: [],
    events: [],
    errors: [],
    details: emptyDetails,
  };
}
/**
 * Builds a fresh set of vitest fakes for every collaborator the runner under test uses.
 *
 * Each call creates new `vi.fn()` instances, so a test may reconfigure any mock
 * (e.g. `deps.adapter.chunk.mockResolvedValue(...)`) without affecting other tests.
 * Default happy path: one work unit `u1` over `a.yml`, diffed as added, kept by
 * page triage, and squash-merged as commit `sq` touching one raw-source path.
 */
const makeDeps = () => {
// Ingest run rows: creation always yields `run-1`; no previously completed run exists.
const runsRepo = {
create: vi.fn().mockResolvedValue({ id: 'run-1' }),
findMostRecentCompleted: vi.fn().mockResolvedValue(null),
markFailed: vi.fn(),
markCompleted: vi.fn(),
};
// Provenance store: starts with no known hashes or prior artifacts.
const provenanceRepo = {
insertMany: vi.fn(),
findHashesBySync: vi.fn().mockResolvedValue(new Map()),
findLatestArtifactsForRawPaths: vi.fn().mockResolvedValue(new Map()),
};
// Report rows: creation always yields `report-1`; no existing report for the job.
const reportsRepo = {
create: vi.fn().mockResolvedValue({ id: 'report-1' }),
findByJobId: vi.fn().mockResolvedValue(null),
markSuperseded: vi.fn().mockResolvedValue(undefined),
};
const canonicalPins = {
listPins: vi.fn().mockResolvedValue([]),
};
// Fake source adapter. The `undefined` capability fields are placeholders that
// individual tests assign directly (e.g. `listTargetConnectionIds`, `evidenceIndexing`).
const adapter = {
source: 'fake',
skillNames: [] as string[],
reconcileSkillNames: undefined as undefined | string[],
evidenceIndexing: undefined as undefined | 'documents',
triageSupported: undefined as undefined | boolean,
detect: vi.fn().mockResolvedValue(true),
listTargetConnectionIds: undefined as undefined | ((stagedDir: string) => Promise<string[]>),
finalize: undefined as any,
chunk: vi.fn().mockResolvedValue({
workUnits: [{ unitKey: 'u1', rawFiles: ['a.yml'], peerFileIndex: [], dependencyPaths: [] }],
}),
};
// Every registry lookup resolves to the same fake adapter instance.
const registry = { get: vi.fn().mockReturnValue(adapter) };
// `a.yml` is reported as newly added; nothing else changed.
const diffSetService = {
compute: vi.fn().mockResolvedValue({ added: ['a.yml'], modified: [], deleted: [], unchanged: [] }),
};
const contextEvidenceIndex = {
indexStagedDir: vi.fn().mockResolvedValue({
documentsIndexed: 1,
chunksIndexed: 1,
documentsDeleted: 0,
embeddingFailures: 0,
warnings: [],
}),
publishSync: vi.fn().mockResolvedValue(undefined),
};
// Triage keeps `a.yml` for full processing.
const pageTriage = {
triageRun: vi.fn().mockResolvedValue({
enabled: true,
fullRawPaths: new Set(['a.yml']),
warnings: [],
}),
};
// Git operations scoped to the session worktree (exposed as `git` on the worktree below).
const scopedGit = {
revParseHead: vi.fn().mockResolvedValue('h'),
commitFiles: vi.fn().mockResolvedValue({ created: true, commitHash: 'h' }),
commitStaged: vi.fn().mockResolvedValue({ created: false, commitHash: 'h' }),
resetHardTo: vi.fn(),
assertWorktreeClean: vi.fn().mockResolvedValue(undefined),
// Writes an empty file at `patchPath`; presumably so code that reads the patch back
// finds a real (empty) file — verify against the runner.
writeBinaryNoRenamePatch: vi.fn(async (_base: string, _head: string, patchPath: string) => {
await writeFile(patchPath, '', 'utf-8');
}),
applyPatchFile3WayIndex: vi.fn(),
diffNameStatus: vi.fn().mockResolvedValue([]),
changedPaths: vi.fn().mockResolvedValue([]),
};
// Session worktree for job `j1` at `/tmp/wt` on branch `session/j1`, backed by `scopedGit`.
const sessionWorktreeService = {
create: vi.fn().mockResolvedValue({
chatId: 'j1',
workdir: '/tmp/wt',
branch: 'session/j1',
baseSha: 'b',
createdAt: new Date(),
git: scopedGit,
config: {},
}),
cleanup: vi.fn(),
};
// By default every agent loop ends naturally without doing any work.
const agentRunner = { runLoop: vi.fn().mockResolvedValue({ stopReason: 'natural' }) };
// Main-repo git: the squash merge succeeds as `sq` and touches exactly one raw-source path.
const gitService = {
revParseHead: vi.fn().mockResolvedValue('base'),
listFilesAtHead: vi.fn().mockResolvedValue([]),
getFileAtCommit: vi.fn(),
squashMergeIntoMain: vi
.fn()
.mockResolvedValue({ ok: true, squashSha: 'sq', touchedPaths: ['raw-sources/c1/fake/s/a.yml'] }),
};
// Invokes the critical section directly; no real locking takes place.
const lockingService = {
withLock: vi.fn().mockImplementation(async (_k: string, fn: () => Promise<unknown>) => fn()),
};
// NOTE(review): returned for tests, but `buildRunner` passes its own inline `settings`
// rather than reading this object.
const appSettingsService = {
settings: {
ai: { slValidation: { probeRowCount: 1 } },
llm: { memoryIngestionModel: 'test-model' },
},
};
// No skills are registered; frontmatter stripping is the identity function.
const skillsRegistry = {
listSkills: vi.fn().mockResolvedValue([]),
getSkill: vi.fn().mockResolvedValue(null),
buildSkillsPrompt: vi.fn().mockReturnValue(''),
stripFrontmatter: vi.fn().mockImplementation((s: string) => s),
};
const promptService = {
loadPrompt: vi.fn().mockResolvedValue('base-framing'),
};
// Empty wiki. `forWorktree` returns this same fake, so worktree-scoped and
// unscoped wiki calls share one set of mocks.
const wikiService = {
forWorktree: vi.fn(),
listPageKeys: vi.fn().mockResolvedValue([]),
readPage: vi.fn().mockResolvedValue(null),
syncFromCommit: vi.fn().mockResolvedValue(undefined),
};
wikiService.forWorktree.mockReturnValue(wikiService);
const knowledgeSlRefs = {
syncFromWiki: vi.fn().mockResolvedValue({ inserted: 1, deleted: 0 }),
};
const knowledgeIndex = {
listPagesForUser: vi.fn().mockResolvedValue([]),
};
// Semantic layer: only connection `warehouse-2` has a source (`looker__orders`).
// `forWorktree` returns this same fake, mirroring `wikiService`.
const semanticLayerService = {
forWorktree: vi.fn(),
listFilesForConnection: vi
.fn()
.mockImplementation((connectionId: string) =>
Promise.resolve(connectionId === 'warehouse-2' ? ['looker__orders.yaml'] : []),
),
loadAllSources: vi
.fn()
.mockImplementation((connectionId: string) =>
Promise.resolve({
sources: connectionId === 'warehouse-2' ? [{ name: 'looker__orders' }] : [],
loadErrors: [],
}),
),
};
semanticLayerService.forWorktree.mockReturnValue(semanticLayerService);
const slSearchService = {
indexSources: vi.fn().mockResolvedValue(undefined),
};
const slSourcesRepository = {};
// Every semantic-layer source validates cleanly.
const slValidator = { validateSingleSource: vi.fn().mockResolvedValue({ errors: [], warnings: [] }) };
// Work-unit toolsets expose no tools unless a test replaces this mock.
const toolsetFactory = {
createIngestWuToolset: vi.fn().mockReturnValue({
toRuntimeTools: vi.fn().mockReturnValue({}),
getAllTools: vi.fn().mockReturnValue([]),
getToolNames: vi.fn().mockReturnValue([]),
}),
};
// Sink for commit-message jobs; `buildRunner` wires it in as `commitMessages.enqueueForExternalCommit`.
const configService = {
enqueueCommitMessageJobForExternalCommit: vi.fn().mockResolvedValue(undefined),
};
// Expose every fake so tests can assert on it or reconfigure it.
return {
runsRepo,
provenanceRepo,
reportsRepo,
canonicalPins,
adapter,
registry,
diffSetService,
contextEvidenceIndex,
pageTriage,
sessionWorktreeService,
agentRunner,
gitService,
lockingService,
slValidator,
appSettingsService,
skillsRegistry,
promptService,
wikiService,
knowledgeSlRefs,
knowledgeIndex,
semanticLayerService,
slSearchService,
slSourcesRepository,
toolsetFactory,
configService,
};
};
/**
 * Instantiates the runner under test on top of the fakes from `makeDeps`.
 *
 * Fixed environment:
 * - storage is rooted at `/tmp/ktx-test`;
 * - settings use one probe row and the `test-model` ingestion model;
 * - connections resolve any id to `c1`/`warehouse` (POSTGRES) and queries return no rows;
 * - embeddings are constant `[0]` vectors with a batch size of 10.
 *
 * `overrides` is spread last, so any key it supplies replaces the corresponding default.
 */
const buildRunner = (
  deps: ReturnType<typeof makeDeps> = makeDeps(),
  overrides: Partial<IngestBundleRunnerDeps> = {},
) => {
  const {
    runsRepo,
    provenanceRepo,
    reportsRepo,
    canonicalPins,
    registry,
    diffSetService,
    contextEvidenceIndex,
    pageTriage,
    sessionWorktreeService,
    agentRunner,
    gitService,
    lockingService,
    slValidator,
    skillsRegistry,
    promptService,
    wikiService,
    knowledgeSlRefs,
    knowledgeIndex,
    semanticLayerService,
    slSearchService,
    slSourcesRepository,
    toolsetFactory,
    configService,
  } = deps;

  return new IngestBundleRunner({
    runs: runsRepo as any,
    provenance: provenanceRepo as any,
    registry: registry as any,
    diffSetService: diffSetService as any,
    contextEvidenceIndex,
    pageTriage: pageTriage as any,
    sessionWorktreeService: sessionWorktreeService as any,
    agentRunner: agentRunner as any,
    gitService: gitService as any,
    lockingService: lockingService as any,
    // Path resolvers stay inline so their parameters are contextually typed.
    storage: {
      homeDir: '/tmp/ktx-test',
      systemGitAuthor: { name: 'KTX Test', email: 'system@ktx.local' },
      resolveUploadDir: (uploadId) => `/tmp/ktx-test/ingest-uploads/${uploadId}`,
      resolvePullDir: (jobId) => `/tmp/ktx-test/ingest-pulls/${jobId}`,
      resolveTranscriptDir: (jobId) => `/tmp/ktx-test/run/wu-transcripts/${jobId}`,
      resolveTracePath: (jobId) => `/tmp/ktx-test/ingest-traces/${jobId}/trace.jsonl`,
    },
    settings: {
      probeRowCount: 1,
      memoryIngestionModel: 'test-model',
    },
    skillsRegistry: skillsRegistry as any,
    promptService: promptService as any,
    wikiService: wikiService as any,
    knowledgeSlRefs: knowledgeSlRefs as any,
    knowledgeIndex,
    semanticLayerService: semanticLayerService as any,
    slSearchService: slSearchService as any,
    slSourcesRepository: slSourcesRepository as any,
    connections: {
      listEnabledConnections: vi.fn().mockResolvedValue([]),
      getConnectionById: vi.fn().mockResolvedValue({ id: 'c1', name: 'warehouse', connectionType: 'POSTGRES' }),
      executeQuery: vi.fn().mockResolvedValue({ headers: [], rows: [] }),
    },
    reports: reportsRepo as any,
    canonicalPins,
    slValidator: slValidator as any,
    toolsetFactory: toolsetFactory as any,
    commitMessages: {
      enqueueForExternalCommit: configService.enqueueCommitMessageJobForExternalCommit,
    },
    embedding: {
      maxBatchSize: 10,
      computeEmbedding: async () => [0],
      computeEmbeddingsBulk: async (texts: string[]) => texts.map(() => [0]),
    },
    // Must stay last so caller-supplied keys win.
    ...overrides,
  });
};
/**
 * Scheduling tests: `runner.run` must serialize jobs that share a connectionId
 * and let jobs on different connections proceed concurrently. `runInner` is
 * stubbed, so only ordering is observed, not pipeline behavior.
 */
describe('IngestBundleRunner — FIFO-per-connection', () => {
  // Records when each stubbed `runInner` call starts (and, in the first test, finishes).
  let spy: ReturnType<typeof vi.fn>;
  beforeEach(() => {
    spy = vi.fn();
  });

  /** Minimal successful `runInner` result; these tests only look at call ordering. */
  const emptyResult = () => ({
    runId: 'r',
    syncId: 's',
    diffSummary: { added: 0, modified: 0, deleted: 0, unchanged: 0 },
    workUnitCount: 0,
    failedWorkUnits: [],
    artifactsWritten: 0,
    commitSha: null,
  });

  it('serializes two jobs on the same connectionId', async () => {
    const runner = buildRunner();
    // The 5 ms pause gives j2 a window to interleave if the runner failed to queue
    // it behind j1 on the shared connection.
    (runner as any).runInner = async (job: any) => {
      spy(job.jobId);
      await new Promise((r) => setTimeout(r, 5));
      spy(`done-${job.jobId}`);
      return emptyResult();
    };
    const p1 = runner.run({
      jobId: 'j1',
      connectionId: 'c1',
      sourceKey: 'fake',
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'u1' },
    });
    const p2 = runner.run({
      jobId: 'j2',
      connectionId: 'c1',
      sourceKey: 'fake',
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'u2' },
    });
    await Promise.all([p1, p2]);
    expect(spy.mock.calls.map((c: unknown[]) => c[0])).toEqual(['j1', 'done-j1', 'j2', 'done-j2']);
  });

  it('runs jobs on different connections in parallel', async () => {
    const runner = buildRunner();
    const d1 = deferred<void>();
    const d2 = deferred<void>();
    // Each job blocks until its deferred is released, so both can only be "started"
    // at the same time if the runner does not serialize across connections.
    (runner as any).runInner = async (job: any) => {
      spy(`start-${job.jobId}`);
      if (job.jobId === 'j1') {
        await d1.promise;
      }
      if (job.jobId === 'j2') {
        await d2.promise;
      }
      return emptyResult();
    };
    const p1 = runner.run({
      jobId: 'j1',
      connectionId: 'c1',
      sourceKey: 'fake',
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'u1' },
    });
    const p2 = runner.run({
      jobId: 'j2',
      connectionId: 'c2',
      sourceKey: 'fake',
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'u2' },
    });
    try {
      await new Promise((r) => setTimeout(r, 10));
      expect(spy.mock.calls.map((c: unknown[]) => c[0]).sort()).toEqual(['start-j1', 'start-j2']);
    } finally {
      // Always release both jobs: if the assertion above fails, the stubbed jobs would
      // otherwise stay blocked forever and leave the runner's work pending.
      d1.resolve();
      d2.resolve();
    }
    await Promise.all([p1, p2]);
  });
});
describe('IngestBundleRunner — Stages 1 → 7', () => {
it('runs the full pipeline, creates a run row, stages files, chunks, squashes, writes provenance', async () => {
const deps = makeDeps();
const runner = buildRunner(deps);
(runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({
currentHashes: new Map([['a.yml', 'h1']]),
rawDirInWorktree: 'raw-sources/c1/fake/s',
});
(runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x');
const result = await runner.run({
jobId: 'j1',
connectionId: 'c1',
sourceKey: 'fake',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload-x' },
});
expect(deps.runsRepo.create).toHaveBeenCalledWith(
expect.objectContaining({ jobId: 'j1', connectionId: 'c1', sourceKey: 'fake', trigger: 'upload' }),
);
expect(deps.adapter.detect).toHaveBeenCalled();
expect(deps.adapter.chunk).toHaveBeenCalled();
expect(result.workUnitCount).toBe(1);
expect(deps.diffSetService.compute).toHaveBeenCalled();
expect(deps.gitService.squashMergeIntoMain).toHaveBeenCalledWith(
'session/j1',
expect.any(String),
expect.any(String),
expect.stringContaining('ingest(fake): j1'),
);
expect(deps.provenanceRepo.insertMany).toHaveBeenCalled();
expect(result.commitSha).toBe('sq');
expect(deps.runsRepo.markCompleted).toHaveBeenCalledWith('run-1', expect.any(Object), 'completed');
// Single touched path → path-scoped diff for the LLM commit-message note.
expect(deps.configService.enqueueCommitMessageJobForExternalCommit).toHaveBeenCalledWith(
{ commitHash: 'sq' },
expect.stringContaining('ingest(fake): j1'),
'raw-sources/c1/fake/s/a.yml',
);
});
it('uses the rate-limit governor for work-unit start slots', async () => {
const deps = makeDeps();
const acquireWorkSlot = vi.fn(async () => vi.fn());
const runner = buildRunner(deps, {
settings: {
probeRowCount: 1,
memoryIngestionModel: 'test-model',
workUnitMaxConcurrency: 2,
rateLimitGovernor: { acquireWorkSlot, subscribe: vi.fn(() => vi.fn()) } as never,
},
});
deps.adapter.chunk.mockResolvedValue({
workUnits: [
{ unitKey: 'u1', rawFiles: ['a.yml'], peerFileIndex: [], dependencyPaths: [] },
{ unitKey: 'u2', rawFiles: ['b.yml'], peerFileIndex: [], dependencyPaths: [] },
],
});
(runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({
currentHashes: new Map([
['a.yml', 'h1'],
['b.yml', 'h2'],
]),
rawDirInWorktree: 'raw-sources/c1/fake/s',
});
(runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x');
await runner.run({
jobId: 'j1',
connectionId: 'c1',
sourceKey: 'fake',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload-x' },
});
expect(acquireWorkSlot).toHaveBeenCalledTimes(2);
});
it('passes the job abort signal into rate-limit work-unit slots', async () => {
const deps = makeDeps();
const controller = new AbortController();
const acquireWorkSlot = vi.fn(async () => vi.fn());
const runner = buildRunner(deps, {
settings: {
probeRowCount: 1,
memoryIngestionModel: 'test-model',
workUnitMaxConcurrency: 1,
rateLimitGovernor: { acquireWorkSlot, subscribe: vi.fn(() => vi.fn()) } as never,
},
});
(runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({
currentHashes: new Map([['a.yml', 'h1']]),
rawDirInWorktree: 'raw-sources/c1/fake/s',
});
(runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x');
await runner.run(
{
jobId: 'j1',
connectionId: 'c1',
sourceKey: 'fake',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload-x' },
},
{ jobId: 'j1', abortSignal: controller.signal, startPhase: () => new TestJobContext('j1', null, async () => undefined, async () => undefined) } as any,
);
expect(acquireWorkSlot).toHaveBeenCalledWith(controller.signal);
});
it('does not convert aborted work-unit agent loops into failed work units', async () => {
const deps = makeDeps();
const controller = new AbortController();
deps.agentRunner.runLoop.mockImplementation(async () => {
controller.abort();
throw new DOMException('Aborted', 'AbortError');
});
const runner = buildRunner(deps, {
settings: {
probeRowCount: 1,
memoryIngestionModel: 'test-model',
workUnitMaxConcurrency: 1,
},
});
(runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({
currentHashes: new Map([['a.yml', 'h1']]),
rawDirInWorktree: 'raw-sources/c1/fake/s',
});
(runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x');
await expect(
runner.run(
{
jobId: 'j1',
connectionId: 'c1',
sourceKey: 'fake',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload-x' },
},
{ jobId: 'j1', abortSignal: controller.signal, startPhase: () => new TestJobContext('j1', null, async () => undefined, async () => undefined) } as any,
),
).rejects.toThrow(/Aborted/);
expect(deps.runsRepo.markFailed).toHaveBeenCalledWith('run-1');
expect(deps.reportsRepo.create).not.toHaveBeenCalledWith(
expect.objectContaining({
body: expect.objectContaining({
failedWorkUnits: expect.arrayContaining(['u1']),
}),
}),
);
});
it('emits trace and memory-flow status for rate-limit waits', async () => {
const deps = makeDeps();
let subscriber: ((state: any) => void) | undefined;
const memoryFlow = createMemoryFlowLiveBuffer(bundleReplayInput());
const runner = buildRunner(deps, {
settings: {
probeRowCount: 1,
memoryIngestionModel: 'test-model',
rateLimitGovernor: {
acquireWorkSlot: vi.fn(async () => vi.fn()),
subscribe: vi.fn((cb: (state: any) => void) => {
subscriber = cb;
return vi.fn();
}),
} as never,
},
});
(runner as any).runInner = async (_job: any, ctx: any) => {
subscriber?.({
kind: 'wait_tick',
provider: 'claude-subscription',
rateLimitType: 'five_hour',
resumeAtMs: 2_000,
remainingMs: 1_000,
});
ctx.memoryFlow.emit({ type: 'report_created', runId: 'run-1' });
return {
runId: 'run-1',
syncId: 'sync-1',
diffSummary: { added: 0, modified: 0, deleted: 0, unchanged: 0 },
workUnitCount: 0,
failedWorkUnits: [],
artifactsWritten: 0,
commitSha: null,
};
};
await runner.run(
{
jobId: 'j1',
connectionId: 'c1',
sourceKey: 'fake',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload-x' },
},
{ memoryFlow } as any,
);
expect(memoryFlow.snapshot().events).toContainEqual(
expect.objectContaining({
type: 'rate_limit_wait',
provider: 'claude-subscription',
rateLimitType: 'five_hour',
resumeAtMs: 2_000,
remainingMs: 1_000,
}),
);
});
it('fails before squash when reconciliation leaves a touched wiki page with dangling refs', async () => {
const deps = makeDeps();
let currentToolSession: any = null;
const scopedWiki = {
listPageKeys: vi.fn().mockResolvedValue(['page-a']),
readPage: vi.fn().mockImplementation((_scope: string, _scopeId: string | null, key: string) => {
if (key === 'page-a') {
return Promise.resolve({
pageKey: 'page-a',
frontmatter: { summary: 'Page A', usage_mode: 'auto', refs: ['missing-page'] },
content: 'See [[missing-page]].',
});
}
return Promise.resolve(null);
}),
};
deps.wikiService.forWorktree.mockReturnValue(scopedWiki);
deps.toolsetFactory.createIngestWuToolset.mockImplementation((toolSession: any) => {
currentToolSession = toolSession;
return {
toRuntimeTools: vi.fn().mockReturnValue({}),
getAllTools: vi.fn().mockReturnValue([]),
getToolNames: vi.fn().mockReturnValue([]),
};
});
deps.agentRunner.runLoop.mockImplementation(async (params: any) => {
if (params.telemetryTags.operationName === 'ingest-bundle-wu') {
currentToolSession.actions.push({ target: 'sl', type: 'updated', key: 'orders', detail: 'Orders source' });
}
if (params.telemetryTags.operationName === 'ingest-bundle-reconcile') {
currentToolSession.actions.push({ target: 'wiki', type: 'created', key: 'page-a', detail: 'Page A' });
}
return { stopReason: 'natural' };
});
const runner = buildRunner(deps);
(runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({
currentHashes: new Map([['a.yml', 'h1']]),
rawDirInWorktree: 'raw-sources/c1/fake/s',
});
(runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x');
await expect(
runner.run({
jobId: 'j1',
connectionId: 'c1',
sourceKey: 'fake',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload-x' },
}),
).rejects.toThrow(/wiki references target missing page\(s\): page-a -> missing-page/);
expect(deps.runsRepo.markFailed).toHaveBeenCalledWith('run-1');
expect(deps.gitService.squashMergeIntoMain).not.toHaveBeenCalled();
});
it('allows reconciliation to save circular wiki refs once both pages exist', async () => {
const deps = makeDeps();
let currentToolSession: any = null;
const scopedWiki = {
listPageKeys: vi.fn().mockResolvedValue(['page-a', 'page-b']),
readPage: vi.fn().mockImplementation((_scope: string, _scopeId: string | null, key: string) => {
if (key === 'page-a') {
return Promise.resolve({
pageKey: 'page-a',
frontmatter: { summary: 'Page A', usage_mode: 'auto', refs: ['page-b'] },
content: 'See [[page-b]].',
});
}
if (key === 'page-b') {
return Promise.resolve({
pageKey: 'page-b',
frontmatter: { summary: 'Page B', usage_mode: 'auto', refs: ['page-a'] },
content: 'See [[page-a]].',
});
}
return Promise.resolve(null);
}),
};
deps.wikiService.forWorktree.mockReturnValue(scopedWiki);
deps.toolsetFactory.createIngestWuToolset.mockImplementation((toolSession: any) => {
currentToolSession = toolSession;
return {
toRuntimeTools: vi.fn().mockReturnValue({}),
getAllTools: vi.fn().mockReturnValue([]),
getToolNames: vi.fn().mockReturnValue([]),
};
});
deps.agentRunner.runLoop.mockImplementation(async (params: any) => {
if (params.telemetryTags.operationName === 'ingest-bundle-wu') {
currentToolSession.actions.push({ target: 'sl', type: 'updated', key: 'orders', detail: 'Orders source' });
}
if (params.telemetryTags.operationName === 'ingest-bundle-reconcile') {
currentToolSession.actions.push(
{ target: 'wiki', type: 'created', key: 'page-a', detail: 'Page A' },
{ target: 'wiki', type: 'created', key: 'page-b', detail: 'Page B' },
);
}
return { stopReason: 'natural' };
});
const runner = buildRunner(deps);
(runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({
currentHashes: new Map([['a.yml', 'h1']]),
rawDirInWorktree: 'raw-sources/c1/fake/s',
});
(runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x');
const result = await runner.run({
jobId: 'j1',
connectionId: 'c1',
sourceKey: 'fake',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload-x' },
});
expect(result.failedWorkUnits).toEqual([]);
expect(deps.gitService.squashMergeIntoMain).toHaveBeenCalled();
expect(deps.runsRepo.markFailed).not.toHaveBeenCalled();
});
it('threads target warehouse connection names into WorkUnit and reconcile tool sessions', async () => {
const deps = makeDeps();
const sessions: any[] = [];
deps.adapter.listTargetConnectionIds = vi.fn().mockResolvedValue(['warehouse']);
deps.toolsetFactory.createIngestWuToolset.mockImplementation((toolSession: any) => {
sessions.push(toolSession);
return {
toRuntimeTools: vi.fn().mockReturnValue({}),
getAllTools: vi.fn().mockReturnValue([]),
getToolNames: vi.fn().mockReturnValue([]),
};
});
deps.agentRunner.runLoop.mockResolvedValue({ stopReason: 'natural' });
const runner = buildRunner(deps);
(runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({
currentHashes: new Map([['a.yml', 'h1']]),
rawDirInWorktree: 'raw-sources/notion/fake/s',
});
(runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x');
await runner.run({
jobId: 'j1',
connectionId: 'notion',
sourceKey: 'fake',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload-x' },
});
expect([...sessions[0].allowedConnectionNames].sort()).toEqual(['notion', 'warehouse']);
});
it('reuses document evidence indexing and page triage for document WorkUnits', async () => {
const deps = makeDeps();
deps.adapter.source = 'notion';
deps.adapter.skillNames = ['notion_synthesize'];
deps.adapter.reconcileSkillNames = [];
deps.adapter.evidenceIndexing = 'documents';
deps.adapter.triageSupported = true;
deps.adapter.chunk.mockResolvedValue({
workUnits: [
{ unitKey: 'full', rawFiles: ['pages/full/metadata.json'], dependencyPaths: [], peerFileIndex: [] },
{ unitKey: 'skip', rawFiles: ['pages/skip/metadata.json'], dependencyPaths: [], peerFileIndex: [] },
],
});
deps.diffSetService.compute.mockResolvedValue({
added: ['pages/full/metadata.json', 'pages/skip/metadata.json'],
modified: [],
deleted: [],
unchanged: [],
});
deps.pageTriage.triageRun.mockResolvedValue({
enabled: true,
fullRawPaths: new Set(['pages/full/metadata.json']),
warnings: [],
});
const runner = buildRunner(deps);
(runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({
currentHashes: new Map([
['pages/full/metadata.json', 'h-full'],
['pages/skip/metadata.json', 'h-skip'],
]),
rawDirInWorktree: 'raw-sources/c1/notion/s',
});
(runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x');
const result = await runner.run({
jobId: 'j1',
connectionId: 'c1',
sourceKey: 'notion',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload-x' },
});
const workUnitCalls = deps.agentRunner.runLoop.mock.calls.filter(
([params]) => params.telemetryTags?.operationName === 'ingest-bundle-wu',
);
expect(deps.contextEvidenceIndex.indexStagedDir).toHaveBeenCalled();
expect(deps.pageTriage.triageRun).toHaveBeenCalled();
expect(workUnitCalls).toHaveLength(1);
expect(workUnitCalls[0][0].telemetryTags.unitKey).toBe('full');
expect(result.workUnitCount).toBe(1);
});
it('emits memory-flow source and planning events for bundle ingest', async () => {
const deps = makeDeps();
deps.adapter.chunk.mockResolvedValue({
workUnits: [
{
unitKey: 'u1',
rawFiles: ['a.yml'],
peerFileIndex: ['peer.yml'],
dependencyPaths: ['manifest.yml'],
},
],
eviction: { deletedRawPaths: ['old.yml'] },
});
const runner = buildRunner(deps);
(runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({
currentHashes: new Map([['a.yml', 'h1']]),
rawDirInWorktree: 'raw-sources/c1/fake/s',
});
(runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x');
const snapshots: MemoryFlowReplayInput[] = [];
const memoryFlow = createMemoryFlowLiveBuffer(bundleReplayInput(), {
onChange: (snapshot) => snapshots.push(snapshot),
});
const ctx = new TestJobContext(
'j1',
null,
() => Promise.resolve(),
() => Promise.resolve(),
);
(ctx as any).memoryFlow = memoryFlow;
await runner.run(
{
jobId: 'j1',
connectionId: 'c1',
sourceKey: 'fake',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload-x' },
},
ctx,
);
expect(memoryFlow.snapshot()).toMatchObject({
runId: 'run-1',
connectionId: 'c1',
adapter: 'fake',
sourceDir: '/tmp/stage/upload-x',
});
expect(memoryFlow.snapshot().plannedWorkUnits).toEqual([
{
unitKey: 'u1',
rawFiles: ['a.yml'],
peerFileCount: 1,
dependencyCount: 1,
},
]);
expect(memoryFlow.snapshot().events).toEqual(
expect.arrayContaining([
expect.objectContaining({ type: 'source_acquired', adapter: 'fake', trigger: 'upload', fileCount: 1 }),
expect.objectContaining({ type: 'scope_detected', fingerprint: null }),
expect.objectContaining({ type: 'raw_snapshot_written', rawFileCount: 1 }),
expect.objectContaining({ type: 'diff_computed', added: 1, modified: 0, deleted: 0, unchanged: 0 }),
expect.objectContaining({ type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 1 }),
]),
);
expect(snapshots.length).toBeGreaterThan(4);
expect(deps.reportsRepo.create).toHaveBeenCalledWith(
expect.objectContaining({
body: expect.objectContaining({
memoryFlow: expect.objectContaining({
metadata: expect.objectContaining({
schemaVersion: 1,
mode: 'full',
origin: 'captured',
timing: 'captured',
}),
events: expect.arrayContaining([
expect.objectContaining({
type: 'source_acquired',
emittedAt: expect.stringMatching(/^\d{4}-\d{2}-\d{2}T/),
}),
]),
}),
}),
}),
);
});
it('emits memory-flow WorkUnit step, candidate action, and finish events', async () => {
const deps = makeDeps();
let currentToolSession: any = null;
deps.toolsetFactory.createIngestWuToolset.mockImplementation((toolSession: any) => {
currentToolSession = toolSession;
return {
toRuntimeTools: vi.fn().mockReturnValue({}),
getAllTools: vi.fn().mockReturnValue([]),
getToolNames: vi.fn().mockReturnValue([]),
};
});
deps.agentRunner.runLoop.mockImplementation(async (params: any) => {
if (params.telemetryTags.operationName === 'ingest-bundle-wu') {
// A real tool call drives the live work_unit_step heartbeat.
await params.toolSet.record_verification_ledger.execute(
{ summary: 'Captured order context.', verifiedIdentifiers: [], unverifiedIdentifiers: [] },
{ toolCallId: 'ledger-1', messages: [] },
);
currentToolSession.actions.push({
target: 'wiki',
type: 'created',
key: 'wiki/orders.md',
detail: 'captured order context',
});
}
return { stopReason: 'natural' };
});
const runner = buildRunner(deps);
(runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({
currentHashes: new Map([['a.yml', 'h1']]),
rawDirInWorktree: 'raw-sources/c1/fake/s',
});
(runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x');
const memoryFlow = createMemoryFlowLiveBuffer(bundleReplayInput());
const ctx = new TestJobContext(
'j1',
null,
() => Promise.resolve(),
() => Promise.resolve(),
);
(ctx as any).memoryFlow = memoryFlow;
await runner.run(
{
jobId: 'j1',
connectionId: 'c1',
sourceKey: 'fake',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload-x' },
},
ctx,
);
expect(memoryFlow.snapshot().events).toEqual(
expect.arrayContaining([
expect.objectContaining({
type: 'work_unit_started',
unitKey: 'u1',
skills: ['ingest_triage', 'sl_capture', 'wiki_capture'],
}),
expect.objectContaining({ type: 'work_unit_step', unitKey: 'u1', toolCalls: 1 }),
expect.objectContaining({
type: 'candidate_action',
unitKey: 'u1',
target: 'wiki',
action: 'created',
key: 'wiki/orders.md',
}),
expect.objectContaining({ type: 'work_unit_finished', unitKey: 'u1', status: 'success' }),
]),
);
});
it('emits memory-flow gate, saved, provenance, and report events', async () => {
const deps = makeDeps();
let currentToolSession: any = null;
deps.toolsetFactory.createIngestWuToolset.mockImplementation((toolSession: any) => {
currentToolSession = toolSession;
return {
toRuntimeTools: vi.fn().mockReturnValue({}),
getAllTools: vi.fn().mockReturnValue([]),
getToolNames: vi.fn().mockReturnValue([]),
};
});
deps.agentRunner.runLoop.mockImplementation(async (params: any) => {
if (params.telemetryTags.operationName === 'ingest-bundle-wu') {
currentToolSession.actions.push({
target: 'sl',
type: 'updated',
key: 'orders',
detail: 'captured gross revenue',
});
}
if (params.telemetryTags.operationName === 'ingest-bundle-reconcile') {
await params.toolSet.record_verification_ledger.execute(
{
summary: 'Reconciliation emits no warehouse identifiers before fallback recording.',
verifiedIdentifiers: [],
unverifiedIdentifiers: [],
},
{ toolCallId: 'ledger-1', messages: [] },
);
await params.toolSet.emit_conflict_resolution.execute(
{
kind: 'near_duplicate',
artifactKey: 'sl:orders',
detail: 'orders retained as canonical',
flaggedForHuman: false,
},
{ toolCallId: 'conflict-1', messages: [] },
);
await params.toolSet.emit_unmapped_fallback.execute(
{
rawPath: 'a.yml',
reason: 'parse_error',
clarification: 'semantic_not_representable',
fallback: 'flagged',
},
{ toolCallId: 'fallback-1', messages: [] },
);
}
return { stopReason: 'natural' };
});
const runner = buildRunner(deps);
(runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({
currentHashes: new Map([['a.yml', 'h1']]),
rawDirInWorktree: 'raw-sources/c1/fake/s',
});
(runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x');
const memoryFlow = createMemoryFlowLiveBuffer(bundleReplayInput());
const ctx = new TestJobContext(
'j1',
null,
() => Promise.resolve(),
() => Promise.resolve(),
);
(ctx as any).memoryFlow = memoryFlow;
await runner.run(
{
jobId: 'j1',
connectionId: 'c1',
sourceKey: 'fake',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload-x' },
},
ctx,
);
expect(memoryFlow.snapshot()).toMatchObject({
reportId: 'report-1',
reportPath: 'report-1',
});
expect(memoryFlow.snapshot().events).toEqual(
expect.arrayContaining([
expect.objectContaining({ type: 'reconciliation_finished', conflictCount: 1, fallbackCount: 1 }),
expect.objectContaining({ type: 'saved', commitSha: 'sq', wikiCount: 0, slCount: 1 }),
expect.objectContaining({ type: 'provenance_recorded', rowCount: 1 }),
expect.objectContaining({ type: 'report_created', runId: 'run-1', reportPath: 'report-1' }),
]),
);
});
it('finishes successful bundle memory-flow runs as done', async () => {
  // Default deps; raw-file staging and staged-dir resolution are stubbed so the run
  // never touches the real filesystem.
  const deps = makeDeps();
  const bundleRunner = buildRunner(deps);
  Object.assign(bundleRunner as any, {
    stageRawFilesStage1: vi.fn().mockResolvedValue({
      currentHashes: new Map([['a.yml', 'h1']]),
      rawDirInWorktree: 'raw-sources/c1/fake/s',
    }),
    resolveStagedDir: vi.fn().mockResolvedValue('/tmp/stage/upload-x'),
  });

  // The live buffer is attached to the job context passed to run().
  const memoryFlow = createMemoryFlowLiveBuffer(bundleReplayInput());
  const jobContext = new TestJobContext(
    'j1',
    null,
    () => Promise.resolve(),
    () => Promise.resolve(),
  );
  (jobContext as any).memoryFlow = memoryFlow;

  await bundleRunner.run(
    {
      jobId: 'j1',
      connectionId: 'c1',
      sourceKey: 'fake',
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'upload-x' },
    },
    jobContext,
  );

  const { status } = memoryFlow.snapshot();
  expect(status).toBe('done');
});
it('finishes bundle memory-flow runs with sanitized errors when the runner fails', async () => {
  const deps = makeDeps();
  // Failure message carrying a DSN password, an api_key query param and a token=…
  // pair. It is assembled from fragments (presumably so no single literal looks like
  // a real credential to secret scanners).
  const sensitiveMessage = ''.concat(
    'failed to read postgres://user',
    ':password',
    '@localhost:5432/db?api_key=abc',
    ' token=',
    'secret',
  );
  // Detection fails after the source has been acquired.
  deps.adapter.detect.mockRejectedValue(new Error(sensitiveMessage));

  const bundleRunner = buildRunner(deps);
  Object.assign(bundleRunner as any, {
    stageRawFilesStage1: vi.fn().mockResolvedValue({
      currentHashes: new Map([['a.yml', 'h1']]),
      rawDirInWorktree: 'raw-sources/c1/fake/s',
    }),
    resolveStagedDir: vi.fn().mockResolvedValue('/tmp/stage/upload-x'),
  });

  const memoryFlow = createMemoryFlowLiveBuffer(bundleReplayInput());
  const jobContext = new TestJobContext(
    'j1',
    null,
    () => Promise.resolve(),
    () => Promise.resolve(),
  );
  (jobContext as any).memoryFlow = memoryFlow;

  const pendingRun = bundleRunner.run(
    {
      jobId: 'j1',
      connectionId: 'c1',
      sourceKey: 'fake',
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'upload-x' },
    },
    jobContext,
  );
  // The raw error propagates to the caller; only the memory-flow copy is redacted.
  await expect(pendingRun).rejects.toThrow(/failed to read/);

  const snapshot = memoryFlow.snapshot();
  expect(snapshot).toMatchObject({
    status: 'error',
    errors: ['failed to read postgres://[redacted] token=[redacted]'],
  });
  expect(snapshot.events).toEqual(
    expect.arrayContaining([
      expect.objectContaining({ type: 'source_acquired', adapter: 'fake', trigger: 'upload', fileCount: 1 }),
    ]),
  );
});
it('stores memory-flow provenance and transcript summaries in the ingest report body', async () => {
  const deps = makeDeps();
  // Adapter-specific runtime tools exposed to the WorkUnit loop. The mock returns this
  // same object on every call.
  const runtimeTools = {
    read_raw_span: {
      description: 'read a raw span',
      inputSchema: {},
      execute: vi.fn().mockResolvedValue('safe excerpt'),
    },
    wiki_write: {
      description: 'write wiki',
      inputSchema: {},
      execute: vi.fn().mockResolvedValue('written'),
    },
  };
  deps.toolsetFactory.createIngestWuToolset.mockReturnValue({
    toRuntimeTools: vi.fn().mockReturnValue(runtimeTools),
    getAllTools: vi.fn().mockReturnValue([]),
    getToolNames: vi.fn().mockReturnValue([]),
  });

  // The WorkUnit loop makes three tool calls (read, ledger, wiki write). Every other
  // loop returns immediately.
  deps.agentRunner.runLoop.mockImplementation(async ({ telemetryTags, toolSet }: any) => {
    if (telemetryTags.operationName !== 'ingest-bundle-wu') {
      return { stopReason: 'natural' };
    }
    await toolSet.read_raw_span.execute(
      { path: 'a.yml', startLine: 1, endLine: 2 },
      { toolCallId: 'read-1', messages: [] },
    );
    await toolSet.record_verification_ledger.execute(
      {
        summary: 'Wiki write contains no warehouse identifiers.',
        verifiedIdentifiers: [],
        unverifiedIdentifiers: [],
      },
      { toolCallId: 'ledger-1', messages: [] },
    );
    await toolSet.wiki_write.execute(
      { key: 'wiki/a.md', content: 'safe summary' },
      { toolCallId: 'wiki-1', messages: [] },
    );
    return { stopReason: 'natural' };
  });

  const bundleRunner = buildRunner(deps);
  Object.assign(bundleRunner as any, {
    stageRawFilesStage1: vi.fn().mockResolvedValue({
      currentHashes: new Map([['a.yml', 'h1']]),
      rawDirInWorktree: 'raw-sources/c1/fake/s',
    }),
    resolveStagedDir: vi.fn().mockResolvedValue('/tmp/stage/upload-x'),
  });

  await bundleRunner.run({
    jobId: 'j1',
    connectionId: 'c1',
    sourceKey: 'fake',
    trigger: 'upload',
    bundleRef: { kind: 'upload', uploadId: 'upload-x' },
  });

  const expectedProvenanceRow = expect.objectContaining({
    rawPath: 'a.yml',
    artifactKind: null,
    artifactKey: null,
    actionType: 'skipped',
    targetConnectionId: null,
  });
  const expectedTranscript = {
    unitKey: 'u1',
    path: '/tmp/ktx-test/run/wu-transcripts/j1/u1.jsonl',
    toolCallCount: 3,
    errorCount: 0,
    toolNames: ['read_raw_span', 'record_verification_ledger', 'wiki_write'],
  };
  expect(deps.reportsRepo.create).toHaveBeenCalledWith(
    expect.objectContaining({
      body: expect.objectContaining({
        provenanceRows: [expectedProvenanceRow],
        toolTranscripts: [expectedTranscript],
      }),
    }),
  );
});
it('persists WorkUnit unmapped fallback records in the report body', async () => {
  const deps = makeDeps();
  // Only the WorkUnit loop emits anything; every other loop finishes immediately.
  deps.agentRunner.runLoop.mockImplementation(async ({ telemetryTags, toolSet }: any) => {
    const isWorkUnitLoop = telemetryTags.operationName === 'ingest-bundle-wu';
    if (isWorkUnitLoop) {
      await toolSet.record_verification_ledger.execute(
        {
          summary: 'Unmapped fallback records an unsupported conversion metric without verified warehouse identifiers.',
          verifiedIdentifiers: [],
          unverifiedIdentifiers: [],
        },
        { toolCallId: 'ledger-1', messages: [] },
      );
      // No `detail` is supplied here; the assertion below expects the runner to fill one in.
      await toolSet.emit_unmapped_fallback.execute(
        { rawPath: 'a.yml', reason: 'conversion_metric_unsupported', fallback: 'flagged' },
        { toolCallId: 'fallback-1', messages: [] },
      );
    }
    return { stopReason: 'natural' };
  });

  const bundleRunner = buildRunner(deps);
  Object.assign(bundleRunner as any, {
    stageRawFilesStage1: vi.fn().mockResolvedValue({
      currentHashes: new Map([['a.yml', 'h1']]),
      rawDirInWorktree: 'raw-sources/c1/fake/s',
    }),
    resolveStagedDir: vi.fn().mockResolvedValue('/tmp/stage/upload-x'),
  });

  await bundleRunner.run({
    jobId: 'j1',
    connectionId: 'c1',
    sourceKey: 'fake',
    trigger: 'upload',
    bundleRef: { kind: 'upload', uploadId: 'upload-x' },
  });

  const expectedFallback = {
    rawPath: 'a.yml',
    reason: 'conversion_metric_unsupported',
    detail: expect.stringContaining('conversion metric'),
    fallback: 'flagged',
  };
  expect(deps.reportsRepo.create).toHaveBeenCalledWith(
    expect.objectContaining({
      body: expect.objectContaining({ unmappedFallbacks: [expectedFallback] }),
    }),
  );
});
it('persists reconciliation conflict and eviction records in the report body', async () => {
  const staleView = 'views/old_orders.view.lkml';
  const deps = makeDeps();
  // The old view vanished from the bundle: no WorkUnits, one eviction input for reconciliation.
  deps.diffSetService.compute.mockResolvedValue({
    added: [],
    modified: [],
    deleted: [staleView],
    unchanged: [],
  });
  deps.adapter.chunk.mockResolvedValue({
    workUnits: [],
    eviction: { deletedRawPaths: [staleView] },
  });

  // Records the reconciliation loop emits. The report should contain them verbatim.
  const conflict = {
    kind: 'near_duplicate',
    artifactKey: 'sl:orders',
    detail: 'orders and old_orders overlapped; orders is retained as canonical',
    flaggedForHuman: false,
  };
  const eviction = {
    rawPath: staleView,
    artifactKind: 'sl',
    artifactKey: 'old_orders',
    action: 'removed',
    reason: 'raw source disappeared in this sync',
  };

  deps.agentRunner.runLoop.mockImplementation(async ({ telemetryTags, toolSet }: any) => {
    if (telemetryTags.operationName !== 'ingest-bundle-reconcile') {
      return { stopReason: 'natural' };
    }
    await toolSet.record_verification_ledger.execute(
      {
        summary: 'Reconciliation records conflict, eviction, and fallback decisions without warehouse identifiers.',
        verifiedIdentifiers: [],
        unverifiedIdentifiers: [],
      },
      { toolCallId: 'ledger-1', messages: [] },
    );
    // Fresh copies are passed so the tools never share a reference with the expectations.
    await toolSet.emit_conflict_resolution.execute({ ...conflict }, { toolCallId: 'conflict-1', messages: [] });
    await toolSet.emit_eviction_decision.execute({ ...eviction }, { toolCallId: 'eviction-1', messages: [] });
    await toolSet.emit_unmapped_fallback.execute(
      {
        rawPath: 'cards/untranslated.json',
        reason: 'parse_error',
        clarification: 'metabase_sql_untranslated',
        fallback: 'flagged',
      },
      { toolCallId: 'fallback-1', messages: [] },
    );
    return { stopReason: 'natural' };
  });

  const bundleRunner = buildRunner(deps);
  Object.assign(bundleRunner as any, {
    stageRawFilesStage1: vi.fn().mockResolvedValue({
      currentHashes: new Map([['cards/untranslated.json', 'h-card']]),
      rawDirInWorktree: 'raw-sources/c1/fake/s',
    }),
    resolveStagedDir: vi.fn().mockResolvedValue('/tmp/stage/upload-x'),
  });

  await bundleRunner.run({
    jobId: 'j1',
    connectionId: 'c1',
    sourceKey: 'fake',
    trigger: 'upload',
    bundleRef: { kind: 'upload', uploadId: 'upload-x' },
  });

  expect(deps.reportsRepo.create).toHaveBeenCalledWith(
    expect.objectContaining({
      body: expect.objectContaining({
        conflictsResolved: [conflict],
        evictionsApplied: [eviction],
        unmappedFallbacks: [
          {
            rawPath: 'cards/untranslated.json',
            reason: 'parse_error',
            // The clarification code is expected to surface inside the generated detail text.
            detail: expect.stringContaining('metabase_sql_untranslated'),
            fallback: 'flagged',
          },
        ],
      }),
    }),
  );
});
it('persists reconciliation artifact resolutions as provenance rows', async () => {
  const explorePath = 'explores/b2b/sales_pipeline.json';
  const deps = makeDeps();
  deps.diffSetService.compute.mockResolvedValue({
    added: [],
    modified: [],
    deleted: ['looks/20.json'],
    unchanged: [explorePath],
  });
  deps.adapter.chunk.mockResolvedValue({
    workUnits: [],
    eviction: { deletedRawPaths: ['looks/20.json'] },
  });

  // The reconciliation loop declares that the Looker explore is subsumed by an existing source.
  const resolution = {
    rawPath: explorePath,
    artifactKind: 'sl',
    artifactKey: 'looker__b2b__sales_pipeline',
    actionType: 'subsumed',
    reason: 'File adapter source b2b__sales_pipeline is canonical.',
  };
  deps.agentRunner.runLoop.mockImplementation(async ({ telemetryTags, toolSet }: any) => {
    if (telemetryTags.operationName === 'ingest-bundle-reconcile') {
      await toolSet.emit_artifact_resolution.execute(
        { ...resolution },
        { toolCallId: 'resolution-1', messages: [] },
      );
    }
    return { stopReason: 'natural' };
  });

  const bundleRunner = buildRunner(deps);
  Object.assign(bundleRunner as any, {
    stageRawFilesStage1: vi.fn().mockResolvedValue({
      currentHashes: new Map([[explorePath, 'h-explore']]),
      rawDirInWorktree: 'raw-sources/c1/looker/s',
    }),
    resolveStagedDir: vi.fn().mockResolvedValue('/tmp/stage/upload-x'),
  });

  await bundleRunner.run({
    jobId: 'j1',
    connectionId: 'c1',
    sourceKey: 'looker',
    trigger: 'upload',
    bundleRef: { kind: 'upload', uploadId: 'upload-x' },
  });

  // The resolution becomes a provenance row (the reason text is not part of the row match).
  expect(deps.provenanceRepo.insertMany).toHaveBeenCalledWith(
    expect.arrayContaining([
      expect.objectContaining({
        rawPath: resolution.rawPath,
        artifactKind: resolution.artifactKind,
        artifactKey: resolution.artifactKey,
        actionType: resolution.actionType,
      }),
    ]),
  );
  // The full resolution, reason included, is echoed into the report body.
  expect(deps.reportsRepo.create).toHaveBeenCalledWith(
    expect.objectContaining({
      body: expect.objectContaining({ artifactResolutions: [resolution] }),
    }),
  );
});
it('runs manual override reconciliation from the prior report snapshot and marks the prior report superseded', async () => {
  // Scratch home directory for the override runner's storage. It is removed in `finally`
  // so a failing expectation does not leak temp directories.
  const tempRoot = await mkdtemp(join(tmpdir(), 'ktx-override-'));
  try {
    const deps = makeDeps();
    // Prior report the override replays: one successful WorkUnit plus a conflict that was
    // flagged for human review.
    deps.reportsRepo.findByJobId.mockResolvedValue({
      id: 'report-old',
      runId: 'run-old',
      jobId: 'job-old',
      connectionId: 'c1',
      sourceKey: 'fake',
      createdAt: '2026-04-27T10:00:00.000Z',
      body: {
        syncId: '2026-04-27-100000-job-old',
        diffSummary: { added: 1, modified: 0, deleted: 0, unchanged: 0 },
        commitSha: 'old-sha',
        workUnits: [
          {
            unitKey: 'wu-orders',
            rawFiles: ['a.yml'],
            status: 'success',
            actions: [
              {
                target: 'sl',
                type: 'updated',
                key: 'orders',
                detail: 'captured gross_revenue as orders.gross_revenue',
              },
            ],
            touchedSlSources: ['orders'],
          },
        ],
        failedWorkUnits: [],
        reconciliationSkipped: false,
        conflictsResolved: [
          {
            kind: 'definitional_contradiction',
            contestedKey: 'gross_revenue',
            artifactKey: 'orders.gross_revenue',
            detail: 'billing and orders disagree',
            flaggedForHuman: true,
          },
        ],
        evictionsApplied: [],
        unmappedFallbacks: [],
        evictionInputs: [],
        unresolvedCards: [],
        supersededBy: null,
        overrideOf: null,
      },
    });
    // Raw files come from the prior sync's git snapshot. The readFile assertion below
    // expects a.yml under ingest-pulls/job-new.
    deps.gitService.listFilesAtHead.mockResolvedValue(['raw-sources/c1/fake/2026-04-27-100000-job-old/a.yml']);
    deps.gitService.getFileAtCommit.mockResolvedValue('name: orders\n');
    deps.diffSetService.compute.mockResolvedValue({ added: [], modified: [], deleted: [], unchanged: ['a.yml'] });
    // Reconciliation re-resolves the previously flagged conflict without human review.
    deps.agentRunner.runLoop.mockImplementation(async (args: any) => {
      await args.toolSet.emit_conflict_resolution.execute(
        {
          kind: 'definitional_contradiction',
          contestedKey: 'gross_revenue',
          artifactKey: 'orders.gross_revenue',
          detail: 'canonical pin applied',
          flaggedForHuman: false,
        },
        { toolCallId: 'tc-1', messages: [] },
      );
      return { stopReason: 'natural' };
    });
    // Reuse the default runner deps, but point all storage paths into the temp root.
    const runner = new IngestBundleRunner({
      ...(buildRunner(deps) as any).deps,
      storage: {
        homeDir: tempRoot,
        systemGitAuthor: { name: 'KTX Test', email: 'system@ktx.local' },
        resolveUploadDir: (uploadId: string) => join(tempRoot, 'ingest-uploads', uploadId),
        resolvePullDir: (jobId: string) => join(tempRoot, 'ingest-pulls', jobId),
        resolveTranscriptDir: (jobId: string) => join(tempRoot, 'run', 'wu-transcripts', jobId),
      },
    });
    await runner.run({
      jobId: 'job-new',
      connectionId: 'c1',
      sourceKey: 'fake',
      trigger: 'manual_override',
      bundleRef: { kind: 'override', priorJobId: 'job-old' },
    });
    await expect(readFile(join(tempRoot, 'ingest-pulls/job-new/a.yml'), 'utf-8')).resolves.toBe('name: orders\n');
    // Override runs skip chunking and go straight to reconciliation.
    expect(deps.adapter.chunk).not.toHaveBeenCalled();
    expect(deps.agentRunner.runLoop).toHaveBeenCalled();
    expect(deps.reportsRepo.create).toHaveBeenCalledWith(
      expect.objectContaining({
        jobId: 'job-new',
        body: expect.objectContaining({
          overrideOf: 'job-old',
          supersededBy: null,
          conflictsResolved: [
            expect.objectContaining({
              contestedKey: 'gross_revenue',
              flaggedForHuman: false,
            }),
          ],
        }),
      }),
    );
    expect(deps.reportsRepo.markSuperseded).toHaveBeenCalledWith('job-old', 'job-new');
  } finally {
    await rm(tempRoot, { recursive: true, force: true });
  }
});
it('passes connection canonical pins into each WorkUnit system prompt', async () => {
  const deps = makeDeps();
  deps.adapter.chunk.mockResolvedValue({
    workUnits: [
      {
        unitKey: 'wu-orders',
        rawFiles: ['cards/orders.yml'],
        peerFileIndex: [],
        dependencyPaths: [],
      },
    ],
  });
  // A single pin on the run's own connection.
  const revenuePin = {
    contestedKey: 'gross_revenue',
    canonicalArtifactKey: 'finance.gross_revenue',
    pinnedAt: '2026-04-27T12:00:00.000Z',
    pinnedBy: 'user-1',
    reason: 'finance owns revenue definitions',
  };
  deps.canonicalPins.listPins.mockResolvedValue([revenuePin]);
  deps.agentRunner.runLoop.mockResolvedValue({ stopReason: 'natural' });

  const bundleRunner = buildRunner(deps);
  Object.assign(bundleRunner as any, {
    stageRawFilesStage1: vi.fn().mockResolvedValue({
      currentHashes: new Map([['cards/orders.yml', 'h1']]),
      rawDirInWorktree: 'raw-sources/c1/fake/s',
    }),
    resolveStagedDir: vi.fn().mockResolvedValue('/tmp/stage/upload-x'),
  });

  await bundleRunner.run({
    jobId: 'j1',
    connectionId: 'c1',
    sourceKey: 'fake',
    trigger: 'upload',
    bundleRef: { kind: 'upload', uploadId: 'upload-x' },
  });

  const [workUnitParams] =
    deps.agentRunner.runLoop.mock.calls.find(
      ([params]: any[]) => params.telemetryTags.operationName === 'ingest-bundle-wu',
    ) ?? [];
  const systemPrompt = workUnitParams?.systemPrompt;
  for (const fragment of [
    '<canonical_pins>',
    'contestedKey: gross_revenue',
    'canonicalArtifactKey: finance.gross_revenue',
  ]) {
    expect(systemPrompt).toContain(fragment);
  }
  // Pins are loaded once per run, not once per WorkUnit.
  expect(deps.canonicalPins.listPins).toHaveBeenCalledTimes(1);
  expect(deps.canonicalPins.listPins).toHaveBeenCalledWith(['c1']);
});
it('builds WorkUnit SL index and canonical pins across adapter target connections', async () => {
  const deps = makeDeps();
  // The adapter reports one extra target connection beyond the run's own connection.
  deps.adapter.listTargetConnectionIds = vi.fn().mockResolvedValue(['warehouse-2']);
  deps.adapter.chunk.mockResolvedValue({
    workUnits: [
      {
        unitKey: 'looker-explore-b2b-orders',
        rawFiles: ['explores/b2b/orders.json'],
        peerFileIndex: [],
        dependencyPaths: [],
      },
    ],
  });
  deps.canonicalPins.listPins.mockResolvedValue([
    {
      contestedKey: 'gross_revenue',
      canonicalArtifactKey: 'finance.gross_revenue',
      pinnedAt: '2026-05-01T12:00:00.000Z',
      pinnedBy: 'user-1',
      reason: 'finance owns revenue definitions',
    },
  ]);

  const bundleRunner = buildRunner(deps);
  Object.assign(bundleRunner as any, {
    stageRawFilesStage1: vi.fn().mockResolvedValue({
      currentHashes: new Map([['explores/b2b/orders.json', 'h1']]),
      rawDirInWorktree: 'raw-sources/looker-run/fake/s',
    }),
    resolveStagedDir: vi.fn().mockResolvedValue('/tmp/stage/upload-x'),
  });

  await bundleRunner.run({
    jobId: 'j1',
    connectionId: 'looker-run',
    sourceKey: 'fake',
    trigger: 'upload',
    bundleRef: { kind: 'upload', uploadId: 'upload-x' },
  });

  const isWorkUnitCall = ([params]: any[]) => params.telemetryTags.operationName === 'ingest-bundle-wu';
  const workUnitCall = deps.agentRunner.runLoop.mock.calls.find(isWorkUnitCall);

  expect(deps.adapter.listTargetConnectionIds).toHaveBeenCalledWith('/tmp/stage/upload-x');
  // The SL index is loaded for the run connection and for every target connection.
  for (const connectionId of ['looker-run', 'warehouse-2']) {
    expect(deps.semanticLayerService.loadAllSources).toHaveBeenCalledWith(connectionId);
  }
  expect(workUnitCall?.[0].userPrompt).toContain('looker__orders');
  expect(deps.canonicalPins.listPins).toHaveBeenCalledWith(['looker-run', 'warehouse-2']);
});
it('syncs wiki refs, reindexes, and records provenance on SL target connections', async () => {
// Artifacts a WorkUnit writes into a different connection (warehouse-2) should drive
// wiki-ref sync, SL reindexing, provenance rows and report touchedSlSources for that
// target connection, not for the run's own connection (looker-run).
const deps = makeDeps();
// Captured by the createIngestWuToolset mock below so the runLoop mock can push actions
// into the same tool session the runner passed to the toolset factory.
// NOTE(review): relies on createIngestWuToolset being called before runLoop for the WU.
let currentToolSession: any = null;
deps.adapter.listTargetConnectionIds = vi.fn().mockResolvedValue(['warehouse-2']);
// The wiki page's frontmatter references a member of the SL source created in warehouse-2.
// The syncFromWiki assertion expects it resolved to { warehouse-2, looker__b2b__sales_pipeline }.
deps.wikiService.readPage = vi.fn().mockResolvedValue({
frontmatter: { sl_refs: ['looker__b2b__sales_pipeline.arr'] },
});
// Source names embed the connection id so the indexSources assertion can tell which
// connection was reloaded and reindexed.
deps.semanticLayerService.loadAllSources.mockImplementation((connectionId: string) =>
Promise.resolve({ sources: [{ name: `${connectionId}_source` }], loadErrors: [] }),
);
// Simulate the WorkUnit agent writing directly to the session instead of running real
// tools: one wiki page (no target connection) and one SL source targeted at warehouse-2.
deps.agentRunner.runLoop.mockImplementation(async (params: any) => {
if (params.telemetryTags.operationName === 'ingest-bundle-wu') {
currentToolSession.actions.push(
{
target: 'wiki',
type: 'created',
key: 'wiki/global/pipeline.md',
detail: 'Pipeline article',
},
{
target: 'sl',
type: 'created',
key: 'looker__b2b__sales_pipeline',
detail: 'Created warehouse source',
targetConnectionId: 'warehouse-2',
},
);
addTouchedSlSource(currentToolSession.touchedSlSources, 'warehouse-2', 'looker__b2b__sales_pipeline');
}
return { stopReason: 'natural' };
});
// Expose no runtime tools. The factory mock exists only to capture the session.
deps.toolsetFactory.createIngestWuToolset.mockImplementation((toolSession: any) => {
currentToolSession = toolSession;
return {
toRuntimeTools: vi.fn().mockReturnValue({}),
getAllTools: vi.fn().mockReturnValue([]),
getToolNames: vi.fn().mockReturnValue([]),
};
});
const runner = buildRunner(deps);
(runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({
currentHashes: new Map([['a.yml', 'h1']]),
rawDirInWorktree: 'raw-sources/looker-run/fake/s',
});
(runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x');
await runner.run({
jobId: 'j1',
connectionId: 'looker-run',
sourceKey: 'fake',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload-x' },
});
// Wiki refs are stored against the target connection that owns the referenced source.
expect(deps.knowledgeSlRefs.syncFromWiki).toHaveBeenCalledWith({
wikiPageKey: 'wiki/global/pipeline.md',
wikiScope: 'GLOBAL',
wikiScopeId: null,
refs: [{ connectionId: 'warehouse-2', sourceName: 'looker__b2b__sales_pipeline' }],
});
// The touched target connection is reloaded and reindexed.
expect(deps.semanticLayerService.loadAllSources).toHaveBeenCalledWith('warehouse-2');
expect(deps.slSearchService.indexSources).toHaveBeenCalledWith('warehouse-2', [{ name: 'warehouse-2_source' }]);
// Provenance rows keep the run connection as connectionId. Only SL rows carry a targetConnectionId.
expect(deps.provenanceRepo.insertMany).toHaveBeenCalledWith(
expect.arrayContaining([
expect.objectContaining({
connectionId: 'looker-run',
targetConnectionId: 'warehouse-2',
artifactKind: 'sl',
artifactKey: 'looker__b2b__sales_pipeline',
}),
expect.objectContaining({
connectionId: 'looker-run',
targetConnectionId: null,
artifactKind: 'wiki',
artifactKey: 'wiki/global/pipeline.md',
}),
]),
);
expect(deps.reportsRepo.create).toHaveBeenCalledWith(
expect.objectContaining({
body: expect.objectContaining({
workUnits: [
expect.objectContaining({
touchedSlSources: [{ connectionId: 'warehouse-2', sourceName: 'looker__b2b__sales_pipeline' }],
}),
],
provenanceRows: expect.arrayContaining([
expect.objectContaining({
artifactKind: 'sl',
artifactKey: 'looker__b2b__sales_pipeline',
targetConnectionId: 'warehouse-2',
}),
]),
}),
}),
);
});
it('runs adapter finalization before squash, records the outcome, and reindexes touched sources', async () => {
const deps = makeDeps();
deps.adapter.source = 'metricflow';
deps.registry.get.mockReturnValue(deps.adapter);
// No WorkUnits. Finalization gets the adapter's parse artifacts as-is (asserted below).
deps.adapter.chunk.mockResolvedValue({
workUnits: [],
parseArtifacts: { semanticModels: [{ name: 'orders' }] },
});
deps.adapter.listTargetConnectionIds = vi.fn().mockResolvedValue(['warehouse-2']);
// Finalization declares one touched source in warehouse-2 and an action with a raw path.
deps.adapter.finalize = vi.fn().mockResolvedValue({
result: { sourcesTouched: 1 },
warnings: ['kept going'],
errors: [],
touchedSources: [{ connectionId: 'warehouse-2', sourceName: 'orders' }],
changedWikiPageKeys: [],
actions: [
{
target: 'sl',
type: 'updated',
key: 'orders',
targetConnectionId: 'warehouse-2',
detail: 'Finalized orders usage',
rawPaths: ['semantic_models.yml'],
},
],
});
deps.semanticLayerService.loadAllSources.mockImplementation((connectionId: string) =>
Promise.resolve({ sources: [{ name: `${connectionId}_source` }], loadErrors: [] }),
);
// Simulated HEAD of the session worktree. It moves to 'post-finalization' once a commit
// includes the warehouse-2 orders file.
let head = 'pre-finalization';
const git = {
revParseHead: vi.fn(async () => head),
// A commit including semantic-layer/warehouse-2/orders.yaml advances HEAD and returns
// 'finalization-sha'. Other commits just echo the current HEAD.
commitFiles: vi.fn().mockImplementation(async (paths: string[]) => {
if (paths.includes('semantic-layer/warehouse-2/orders.yaml')) {
head = 'post-finalization';
return { created: true, commitHash: 'finalization-sha' };
}
return { created: true, commitHash: head };
}),
commitStaged: vi.fn().mockResolvedValue({ created: false, commitHash: 'post-finalization' }),
resetHardTo: vi.fn(),
assertWorktreeClean: vi.fn().mockResolvedValue(undefined),
// Writes an empty patch file at the requested path.
writeBinaryNoRenamePatch: vi.fn(async (_base: string, _head: string, patchPath: string) => {
await writeFile(patchPath, '', 'utf-8');
}),
applyPatchFile3WayIndex: vi.fn(),
// Only the pre→post finalization range reports a change. The report's touchedPaths
// assertion below depends on this.
diffNameStatus: vi.fn().mockImplementation(async (from: string, to: string) =>
from === 'pre-finalization' && to === 'post-finalization'
? [{ status: 'M', path: 'semantic-layer/warehouse-2/orders.yaml' }]
: [],
),
changedPaths: vi.fn().mockResolvedValue(['semantic-layer/warehouse-2/orders.yaml']),
};
deps.sessionWorktreeService.create.mockResolvedValue({
chatId: 'j1',
workdir: '/tmp/wt',
branch: 'session/j1',
baseSha: 'b',
createdAt: new Date(),
git,
config: {},
});
const runner = buildRunner(deps);
(runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({
currentHashes: new Map([['semantic_models.yml', 'h1']]),
rawDirInWorktree: 'raw-sources/c1/metricflow/s',
});
(runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x');
await runner.run({
jobId: 'j1',
connectionId: 'c1',
sourceKey: 'metricflow',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload-x' },
});
// finalize receives the run identity, the session workdir and the chunker's parse artifacts.
expect(deps.adapter.finalize).toHaveBeenCalledWith(
expect.objectContaining({
connectionId: 'c1',
sourceKey: 'metricflow',
syncId: expect.any(String),
jobId: 'j1',
runId: 'run-1',
workdir: '/tmp/wt',
parseArtifacts: { semanticModels: [{ name: 'orders' }] },
}),
);
// The finalization outcome is recorded with git-derived touched paths and sources next to
// the sources the adapter declared.
expect(deps.reportsRepo.create).toHaveBeenCalledWith(
expect.objectContaining({
body: expect.objectContaining({
finalization: expect.objectContaining({
sourceKey: 'metricflow',
status: 'success',
commitSha: 'finalization-sha',
touchedPaths: ['semantic-layer/warehouse-2/orders.yaml'],
derivedTouchedSources: [{ connectionId: 'warehouse-2', sourceName: 'orders' }],
declaredTouchedSources: [{ connectionId: 'warehouse-2', sourceName: 'orders' }],
actions: [expect.objectContaining({ key: 'orders' })],
}),
}),
}),
);
expect(deps.semanticLayerService.loadAllSources).toHaveBeenCalledWith('warehouse-2');
expect(deps.slSearchService.indexSources).toHaveBeenCalledWith('warehouse-2', [{ name: 'warehouse-2_source' }]);
expect(deps.sessionWorktreeService.cleanup).toHaveBeenCalledWith(expect.any(Object), 'success');
});
it('includes finalization actions in memory-flow saved counts', async () => {
  const deps = makeDeps();
  deps.adapter.source = 'historic-sql';
  deps.registry.get.mockReturnValue(deps.adapter);
  deps.adapter.chunk.mockResolvedValue({
    workUnits: [
      {
        unitKey: 'historic-sql-table-public-orders',
        rawFiles: ['tables/public/orders.json'],
        peerFileIndex: [],
        dependencyPaths: [],
      },
    ],
  });
  // Two SL and two wiki actions come only from finalization. The saved event should count them.
  const finalizationActions = [
    { target: 'sl', type: 'updated', key: 'orders', detail: 'Merged usage' },
    { target: 'sl', type: 'updated', key: 'customers', detail: 'Merged usage' },
    { target: 'wiki', type: 'created', key: 'historic-sql-orders', detail: 'Projected pattern' },
    { target: 'wiki', type: 'updated', key: 'historic-sql-customers', detail: 'Projected pattern' },
  ];
  deps.adapter.finalize = vi.fn().mockResolvedValue({
    warnings: [],
    errors: [],
    touchedSources: [],
    changedWikiPageKeys: [],
    actions: finalizationActions,
  });

  const bundleRunner = buildRunner(deps);
  Object.assign(bundleRunner as any, {
    stageRawFilesStage1: vi.fn().mockResolvedValue({
      currentHashes: new Map([['tables/public/orders.json', 'h1']]),
      rawDirInWorktree: 'raw-sources/c1/historic-sql/s',
    }),
    resolveStagedDir: vi.fn().mockResolvedValue('/tmp/stage/upload-x'),
  });

  const memoryFlow = createMemoryFlowLiveBuffer(bundleReplayInput());
  // Plain context object: memoryFlow is a direct field, and each phase gets a fresh TestJobContext.
  const runContext = {
    jobId: 'j1',
    memoryFlow,
    startPhase: () => new TestJobContext('j1', null, () => Promise.resolve(), () => Promise.resolve()),
  };
  await bundleRunner.run(
    {
      jobId: 'j1',
      connectionId: 'c1',
      sourceKey: 'historic-sql',
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'upload-x' },
    },
    runContext,
  );

  expect(memoryFlow.snapshot().events).toContainEqual(
    expect.objectContaining({ type: 'saved', wikiCount: 2, slCount: 2 }),
  );
});
it('marks finalization infrastructure failure as failed and preserves worktree cleanup state', async () => {
  const deps = makeDeps();
  deps.adapter.source = 'metricflow';
  deps.registry.get.mockReturnValue(deps.adapter);
  deps.adapter.chunk.mockResolvedValue({
    workUnits: [{ unitKey: 'u1', rawFiles: ['semantic_models.yml'], peerFileIndex: [], dependencyPaths: [] }],
    parseArtifacts: { semanticModels: [{ name: 'orders' }] },
  });
  // finalize throws, as opposed to resolving with an `errors` array.
  const infraFailure = new Error('worktree write failed');
  deps.adapter.finalize = vi.fn().mockRejectedValue(infraFailure);

  const bundleRunner = buildRunner(deps);
  Object.assign(bundleRunner as any, {
    stageRawFilesStage1: vi.fn().mockResolvedValue({
      currentHashes: new Map([['semantic_models.yml', 'h1']]),
      rawDirInWorktree: 'raw-sources/c1/metricflow/s',
    }),
    resolveStagedDir: vi.fn().mockResolvedValue('/tmp/stage/upload-x'),
  });

  const pendingRun = bundleRunner.run({
    jobId: 'j1',
    connectionId: 'c1',
    sourceKey: 'metricflow',
    trigger: 'upload',
    bundleRef: { kind: 'upload', uploadId: 'upload-x' },
  });
  await expect(pendingRun).rejects.toThrow('worktree write failed');

  // The run is marked failed, nothing is squashed, and the worktree is cleaned up as a crash.
  expect(deps.runsRepo.markFailed).toHaveBeenCalledWith('run-1');
  expect(deps.gitService.squashMergeIntoMain).not.toHaveBeenCalled();
  expect(deps.sessionWorktreeService.cleanup).toHaveBeenCalledWith(expect.any(Object), 'crash');
});
it('reports finalization actions excluded from provenance when raw paths are not defensible', async () => {
  const deps = makeDeps();
  // The first action has no rawPaths at all. The second cites a path that is not in the
  // staged bundle (only current.json is staged below).
  deps.adapter.finalize = vi.fn().mockResolvedValue({
    warnings: [],
    errors: [],
    touchedSources: [],
    changedWikiPageKeys: [],
    actions: [
      { target: 'wiki', type: 'updated', key: 'historic-sql-pattern', detail: 'No raw path' },
      { target: 'sl', type: 'updated', key: 'orders', detail: 'Invalid raw path', rawPaths: ['missing.json'] },
    ],
  });

  const bundleRunner = buildRunner(deps);
  Object.assign(bundleRunner as any, {
    stageRawFilesStage1: vi.fn().mockResolvedValue({
      currentHashes: new Map([['current.json', 'h1']]),
      rawDirInWorktree: 'raw-sources/c1/fake/s',
    }),
    resolveStagedDir: vi.fn().mockResolvedValue('/tmp/stage/upload-x'),
  });

  await bundleRunner.run({
    jobId: 'j1',
    connectionId: 'c1',
    sourceKey: 'fake',
    trigger: 'upload',
    bundleRef: { kind: 'upload', uploadId: 'upload-x' },
  });

  const expectedExclusions = [
    expect.objectContaining({ reason: 'missing_raw_paths' }),
    expect.objectContaining({ reason: 'raw_path_not_defensible', invalidRawPaths: ['missing.json'] }),
  ];
  expect(deps.reportsRepo.create).toHaveBeenCalledWith(
    expect.objectContaining({
      body: expect.objectContaining({
        finalization: expect.objectContaining({ provenanceExclusions: expectedExclusions }),
      }),
    }),
  );
  // The undefensible path must never reach the provenance table.
  expect(deps.provenanceRepo.insertMany).not.toHaveBeenCalledWith(
    expect.arrayContaining([expect.objectContaining({ rawPath: 'missing.json' })]),
  );
});
it('passes explicit override replay metadata and no current work unit outcomes', async () => {
  const deps = makeDeps();
  // Prior report whose evictionInputs (not its evictionsApplied) should be forwarded to
  // finalize as the override's eviction raw paths.
  const priorReport = {
    id: 'prior-report',
    runId: 'prior-run',
    jobId: 'prior-job',
    connectionId: 'c1',
    sourceKey: 'fake',
    createdAt: '2026-05-18T00:00:00.000Z',
    body: {
      status: 'completed',
      syncId: 'prior-sync',
      diffSummary: { added: 0, modified: 0, deleted: 0, unchanged: 0 },
      commitSha: 'prior-sha',
      workUnits: [
        {
          unitKey: 'prior-unit',
          rawFiles: ['prior.json'],
          status: 'success',
          actions: [{ target: 'wiki', type: 'created', key: 'prior', detail: 'prior' }],
          touchedSlSources: [],
        },
      ],
      failedWorkUnits: [],
      reconciliationSkipped: false,
      conflictsResolved: [],
      evictionsApplied: [
        {
          rawPath: 'do-not-replay.json',
          artifactKind: 'wiki',
          artifactKey: 'old',
          action: 'removed',
          reason: 'prior',
        },
      ],
      unmappedFallbacks: [],
      artifactResolutions: [],
      evictionInputs: ['evicted-from-prior-report.json'],
      unresolvedCards: [],
      supersededBy: null,
      overrideOf: null,
      provenanceRows: [],
      toolTranscripts: [],
    },
  };
  deps.reportsRepo.findByJobId.mockResolvedValue(priorReport);
  deps.adapter.finalize = vi.fn().mockResolvedValue({
    warnings: [],
    errors: [],
    touchedSources: [],
    changedWikiPageKeys: [],
    actions: [],
  });
  deps.gitService.listFilesAtHead.mockResolvedValue(['raw-sources/c1/fake/prior-sync/prior.json']);
  deps.gitService.getFileAtCommit.mockResolvedValue('{"id":1}\n');

  const bundleRunner = buildRunner(deps);
  Object.assign(bundleRunner as any, {
    stageRawFilesStage1: vi.fn().mockResolvedValue({
      currentHashes: new Map([['prior.json', 'h1']]),
      rawDirInWorktree: 'raw-sources/c1/fake/prior-sync',
    }),
    resolveStagedDir: vi.fn().mockResolvedValue('/tmp/stage/prior'),
  });

  await bundleRunner.run({
    jobId: 'override-job',
    connectionId: 'c1',
    sourceKey: 'fake',
    trigger: 'manual_override',
    bundleRef: { kind: 'override', priorJobId: 'prior-job' },
  });

  const expectedReplay = {
    priorJobId: 'prior-job',
    priorRunId: 'prior-run',
    priorSyncId: 'prior-sync',
    evictionRawPaths: ['evicted-from-prior-report.json'],
  };
  expect(deps.adapter.finalize).toHaveBeenCalledWith(
    expect.objectContaining({ workUnitOutcomes: [], overrideReplay: expectedReplay }),
  );
});
it('includes existing global wiki pages in WorkUnit prompts', async () => {
  const deps = makeDeps();
  // One existing GLOBAL wiki page that should be listed in the WorkUnit prompt.
  const existingPage = {
    page_key: 'revenue-recognition',
    summary: 'Recognize revenue net of refunds after fulfillment.',
    scope: 'GLOBAL',
    scope_id: null,
  };
  deps.knowledgeIndex.listPagesForUser.mockResolvedValue([existingPage]);

  const bundleRunner = buildRunner(deps);
  Object.assign(bundleRunner as any, {
    stageRawFilesStage1: vi.fn().mockResolvedValue({
      currentHashes: new Map([['cards/orders.yml', 'h1']]),
      rawDirInWorktree: 'raw-sources/c1/fake/s',
    }),
    resolveStagedDir: vi.fn().mockResolvedValue('/tmp/stage/upload-x'),
  });

  await bundleRunner.run({
    jobId: 'j1',
    connectionId: 'c1',
    sourceKey: 'fake',
    trigger: 'upload',
    bundleRef: { kind: 'upload', uploadId: 'upload-x' },
  });

  const workUnitCall = deps.agentRunner.runLoop.mock.calls.find(
    ([params]: any[]) => params.telemetryTags.operationName === 'ingest-bundle-wu',
  );
  const userPrompt = workUnitCall?.[0].userPrompt;
  expect(userPrompt).toContain('## Wiki Pages');
  expect(userPrompt).toContain('- revenue-recognition: Recognize revenue net of refunds after fulfillment.');
  // Pages are listed as seen by the system user.
  expect(deps.knowledgeIndex.listPagesForUser).toHaveBeenCalledWith('system');
});
it('includes manifest-backed target sources in WorkUnit prompts', async () => {
  const deps = makeDeps();
  deps.adapter.listTargetConnectionIds = vi.fn().mockResolvedValue(['postgres-warehouse']);
  // Only the target warehouse has a source. The run's own connection (dbt-main) is empty.
  deps.semanticLayerService.loadAllSources.mockImplementation((connectionId: string) => {
    const sources = connectionId === 'postgres-warehouse' ? [{ name: 'stg_accounts' }] : [];
    return Promise.resolve({ sources, loadErrors: [] });
  });

  const bundleRunner = buildRunner(deps);
  Object.assign(bundleRunner as any, {
    stageRawFilesStage1: vi.fn().mockResolvedValue({
      currentHashes: new Map([['models/schema.yml', 'h1']]),
      rawDirInWorktree: 'raw-sources/dbt-main/dbt/s',
    }),
    resolveStagedDir: vi.fn().mockResolvedValue('/tmp/stage/upload-x'),
  });

  await bundleRunner.run({
    jobId: 'j1',
    connectionId: 'dbt-main',
    sourceKey: 'fake',
    trigger: 'upload',
    bundleRef: { kind: 'upload', uploadId: 'upload-x' },
  });

  const workUnitCall = deps.agentRunner.runLoop.mock.calls.find(
    ([params]: any[]) => params.telemetryTags.operationName === 'ingest-bundle-wu',
  );
  const userPrompt = workUnitCall?.[0].userPrompt;
  // The target connection gets its own section listing its sources.
  expect(userPrompt).toContain('## postgres-warehouse');
  expect(userPrompt).toContain('stg_accounts');
  expect(deps.canonicalPins.listPins).toHaveBeenCalledWith(['dbt-main', 'postgres-warehouse']);
});
it('does not resolve qualified fallback table refs by source name alone', async () => {
  const deps = makeDeps();
  // A source named `orders` backed by the schema-qualified table sales.orders.
  deps.semanticLayerService.loadAllSources.mockResolvedValue({
    sources: [{ name: 'orders', table: 'sales.orders' }],
    loadErrors: [],
  });
  const bundleRunner = buildRunner(deps);
  const refExists = (tableRef: string) =>
    (bundleRunner as any).tableRefExistsInSemanticLayer(deps.semanticLayerService, ['warehouse'], tableRef);

  // A matching bare name in a different schema must not count. The exact qualified table must.
  await expect(refExists('finance.orders')).resolves.toBe(false);
  await expect(refExists('sales.orders')).resolves.toBe(true);
});
it('passes relevant canonical pins into the reconciliation system prompt', async () => {
  const deps = makeDeps();
  // The diff contains one deleted file, which the chunk result below evicts.
  deps.diffSetService.compute.mockResolvedValue({
    added: [],
    modified: [],
    deleted: ['metrics/old.yml'],
    unchanged: [],
  });
  deps.adapter.chunk.mockResolvedValue({
    workUnits: [
      {
        unitKey: 'wu-billing',
        rawFiles: ['metrics/churn_risk_score.yml'],
        peerFileIndex: [],
        dependencyPaths: [],
      },
    ],
    eviction: { deletedRawPaths: ['metrics/old.yml'] },
  });
  // Two pins. The assertions below expect only churn_risk_score (the key the
  // work unit's raw file is named after) to reach the reconcile prompt, while
  // the unrelated gross_margin pin is filtered out.
  deps.canonicalPins.listPins.mockResolvedValue([
    {
      contestedKey: 'churn_risk_score',
      canonicalArtifactKey: 'billing.churn_risk_score',
      pinnedAt: '2026-04-27T12:00:00.000Z',
      pinnedBy: 'user-1',
      reason: 'billing owns the contractual definition',
    },
    {
      contestedKey: 'gross_margin',
      canonicalArtifactKey: 'finance.gross_margin',
      pinnedAt: '2026-04-27T12:01:00.000Z',
      pinnedBy: 'user-2',
      reason: null,
    },
  ]);
  // Every agent loop (work unit and reconcile alike) finishes naturally; the
  // previous per-operation branch returned the same value on both paths.
  deps.agentRunner.runLoop.mockImplementation(async () => ({ stopReason: 'natural' }));
  const runner = buildRunner(deps);
  (runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({
    currentHashes: new Map([
      ['metrics/churn_risk_score.yml', 'h1'],
      ['metrics/old.yml', 'h2'],
    ]),
    rawDirInWorktree: 'raw-sources/c1/fake/s',
  });
  (runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x');
  await runner.run({
    jobId: 'j1',
    connectionId: 'c1',
    sourceKey: 'fake',
    trigger: 'upload',
    bundleRef: { kind: 'upload', uploadId: 'upload-x' },
  });
  const reconcileCall = deps.agentRunner.runLoop.mock.calls.find(
    ([params]: any[]) => params.telemetryTags.operationName === 'ingest-bundle-reconcile',
  );
  // Fail with a clear message if the reconcile loop never ran, instead of an
  // opaque "expected undefined to contain …" on the prompt checks below.
  expect(reconcileCall).toBeDefined();
  const reconcilePrompt = reconcileCall?.[0].systemPrompt;
  expect(reconcilePrompt).toContain('<canonical_pins>');
  expect(reconcilePrompt).toContain('contestedKey: churn_risk_score');
  expect(reconcilePrompt).not.toContain('gross_margin');
  expect(deps.canonicalPins.listPins).toHaveBeenCalledWith(['c1']);
});
it('emits a monotonically non-decreasing progress sequence reaching 1.0, covering all 7 stages', async () => {
  const deps = makeDeps();
  deps.agentRunner.runLoop.mockImplementation(async () => ({ stopReason: 'natural' }));
  const runner = buildRunner(deps);
  (runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({
    currentHashes: new Map([['a.yml', 'h1']]),
    rawDirInWorktree: 'raw-sources/c1/fake/s',
  });
  (runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x');
  // Record every (progress, message) pair the runner reports through the job context.
  const observed: Array<{ p: number; m?: string }> = [];
  const ctx = new TestJobContext(
    'j1',
    null,
    () => Promise.resolve(),
    (p, m) => {
      observed.push({ p, m });
      return Promise.resolve();
    },
  );
  await runner.run(
    {
      jobId: 'j1',
      connectionId: 'c1',
      sourceKey: 'fake',
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'upload-x' },
    },
    ctx,
  );

  // With no reports at all, the monotonic check below would pass vacuously.
  expect(observed.length).toBeGreaterThan(0);

  // Monotonic: collect every backwards step so a failure names the offending
  // index and values. `!(cur >= prev)` rather than `cur < prev` keeps NaN
  // progress a failure, as toBeGreaterThanOrEqual would.
  const regressions: Array<{ index: number; from: number; to: number; message?: string }> = [];
  observed.forEach((current, index) => {
    const previous = observed[index - 1];
    if (previous && !(current.p >= previous.p)) {
      regressions.push({ index, from: previous.p, to: current.p, message: current.m });
    }
  });
  expect(regressions).toEqual([]);

  // Reaches completion.
  expect(observed.at(-1)?.p).toBeCloseTo(1.0, 3);

  // Every stage surfaces a user-facing message; report the phases that never appeared.
  const phaseLabels: Array<string | RegExp> = [
    'Fetching source files',
    'Planning updates',
    'Processing',
    /Reconcil|reconcil/,
    'Saving changes',
    'Recording history',
    'Wrapping up',
  ];
  const mentions = (label: string | RegExp, message: string | undefined): boolean =>
    typeof label === 'string' ? (message?.includes(label) ?? false) : label.test(message ?? '');
  const missingPhases = phaseLabels.filter((label) => !observed.some((o) => mentions(label, o.m)));
  expect(missingPhases).toEqual([]);
});
it('a Stage 3 failure leaves the shared knowledge table untouched', async () => {
  const deps = makeDeps();
  // Make the squash-merge into main reject, so the run fails before its
  // session changes are merged. syncFromCommit is what updates the shared
  // knowledge table, so it must never be reached when the merge fails. Any
  // wiki writes made while processing work units are expected to stay scoped
  // to the session and not touch the shared index.
  // NOTE(review): the failure is injected at squash-merge rather than inside
  // the work-unit (Stage 3) loop itself; consider whether the title should
  // name the merge step instead.
  deps.gitService.squashMergeIntoMain.mockRejectedValue(new Error('simulated squash failure'));
  const runner = buildRunner(deps);
  (runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({
    currentHashes: new Map([['a.yml', 'h1']]),
    rawDirInWorktree: 'raw-sources/c1/fake/s',
  });
  (runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x');
  await expect(
    runner.run({
      jobId: 'j1',
      connectionId: 'c1',
      sourceKey: 'fake',
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'upload-x' },
    }),
  ).rejects.toThrow(/simulated squash failure/);
  expect(deps.wikiService.syncFromCommit).not.toHaveBeenCalled();
});
it('refuses to squash-merge when the session worktree has an in-progress sequencer op', async () => {
  const deps = makeDeps();
  // The session worktree's cleanliness guard rejects, as it would with an
  // unfinished git sequencer operation left behind.
  const dirtyWorktreeError = new Error('Worktree has in-progress git operation (sequencer ...); refusing to proceed');
  // Patch writer stub: materialize an empty patch file at the requested path.
  const writeEmptyPatch = async (_base: string, _head: string, patchPath: string) => {
    await writeFile(patchPath, '', 'utf-8');
  };
  const sessionGit = {
    revParseHead: vi.fn().mockResolvedValue('h'),
    commitFiles: vi.fn().mockResolvedValue({ created: true, commitHash: 'h' }),
    commitStaged: vi.fn().mockResolvedValue({ created: false, commitHash: 'h' }),
    resetHardTo: vi.fn(),
    assertWorktreeClean: vi.fn().mockRejectedValue(dirtyWorktreeError),
    writeBinaryNoRenamePatch: vi.fn(writeEmptyPatch),
    applyPatchFile3WayIndex: vi.fn(),
    diffNameStatus: vi.fn().mockResolvedValue([]),
  };
  deps.sessionWorktreeService.create.mockResolvedValue({
    chatId: 'j1',
    workdir: '/tmp/wt',
    branch: 'session/j1',
    baseSha: 'b',
    createdAt: new Date(),
    git: sessionGit,
    config: {},
  });
  const runner = buildRunner(deps);
  const stubbed = runner as any;
  stubbed.stageRawFilesStage1 = vi.fn().mockResolvedValue({
    currentHashes: new Map([['a.yml', 'h1']]),
    rawDirInWorktree: 'raw-sources/c1/fake/s',
  });
  stubbed.resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x');

  const pending = runner.run({
    jobId: 'j1',
    connectionId: 'c1',
    sourceKey: 'fake',
    trigger: 'upload',
    bundleRef: { kind: 'upload', uploadId: 'upload-x' },
  });
  await expect(pending).rejects.toThrow(/in-progress git operation/);

  // The run is marked failed and nothing is merged into main.
  expect(deps.runsRepo.markFailed).toHaveBeenCalledWith('run-1');
  expect(deps.gitService.squashMergeIntoMain).not.toHaveBeenCalled();
});
it('fails the run and rethrows when the adapter cannot detect the bundle', async () => {
  const deps = makeDeps();
  // The adapter does not recognize the staged bundle.
  deps.adapter.detect.mockResolvedValue(false);
  const runner = buildRunner(deps);
  // Replace the staging steps with canned results for a single uploaded file.
  Object.assign(runner as any, {
    stageRawFilesStage1: vi.fn().mockResolvedValue({
      currentHashes: new Map([['a.yml', 'h1']]),
      rawDirInWorktree: 'raw-sources/c1/fake/s',
    }),
    resolveStagedDir: vi.fn().mockResolvedValue('/tmp/stage/upload-x'),
  });

  const pending = runner.run({
    jobId: 'j1',
    connectionId: 'c1',
    sourceKey: 'fake',
    trigger: 'upload',
    bundleRef: { kind: 'upload', uploadId: 'upload-x' },
  });
  await expect(pending).rejects.toThrow(/did not recognize/);
  // The run record is still transitioned to failed before the error propagates.
  expect(deps.runsRepo.markFailed).toHaveBeenCalledWith('run-1');
});
});