ktx/packages/cli/test/context/ingest/ingest-bundle.runner.isolated-diff.test.ts
Andrey Avtomonov 56985b7e09
test: split cli tests from source tree (#216)
* feat(cli): define full warehouse dialect contract

* test(cli): keep dialect edge tests focused

* fix(cli): stabilize dialect contract foundation

* refactor(connectors): own read-only query preparation

* refactor(connectors): resolve dialects through registry

* refactor(connectors): keep concrete dialect classes internal

* chore(workspace): enforce dialect import boundary

* refactor(cli): resolve relationship dialect at scan boundary

* refactor(cli): use dialect display parsing for entity details

* refactor(cli): use dialect display parsing for warehouse catalog

* refactor(cli): use dialect SQL in relationship workflows

* test(cli): verify solid dialect scan workflow closure

* test: split cli tests from source tree

* refactor(cli): standardize BigQuery scope listing

* feat(sqlite): implement connector scope listing

* test(connectors): cover required table listing

* feat(cli): add warehouse driver registry

* refactor(setup): route scope discovery through driver registry

* refactor(cli): route local query execution through driver registry

* refactor(historic-sql): route dialect support through driver registry

* refactor(cli): test warehouse connections through driver registry

* fix(cli): close driver registry type export gaps

* Improve setup daemon diagnostics

* refactor(setup): centralize rail-prefixed diagnostics + query-history fallback

Extract errorMessage, writePrefixedLines, and flushPrefixedBufferedCommandOutput
into clack.ts so the setup wizard, managed daemons, and embedding/agent steps
share one rail-formatted writer. setup-databases.ts also adds a
"disable query history and retry" option when the schema-context build fails
and query history is the likely culprit, surfaced via a new
failed-query-history-unavailable status.

* fix(cli): carry catalog through the picker so BigQuery/Snowflake/SQL Server scope filters match

The setup picker's KtxTableListEntry was a 2-level { schema, name }, so
qualifiedTableId always wrote db.name into enabled_tables. When BigQuery,
Snowflake, or SQL Server later ran fast ingest, their introspect step filtered
the scope set with scopedTableNames(scope, { catalog: projectId|database, db })
— catalog was non-null on the introspect side but null in the scope refs, so
every entry was rejected, the live-database adapter staged zero table files,
and detect() failed with 'Adapter "live-database" did not recognize fetched
source output'.

Align the picker boundary with the canonical 3-level KtxTableRef:

- Add catalog: string | null to KtxTableListEntry.
- BigQuery/Snowflake/SQL Server listTables populate catalog from the
  resolved projectId / database; Postgres/MySQL/ClickHouse/SQLite set null.
- qualifiedTableId emits catalog.schema.name when catalog is non-null
  (resolveEnabledTables already accepts the 3-part shape) and
  schemasFromEnabledTables now goes through parseDottedTableEntry so it
  recovers the schema correctly from both 2-part and 3-part entries.
- Export parseDottedTableEntry from enabled-tables.ts (@internal) for picker
  reuse.

Update listTables expectations in all seven connector tests and the setup /
picker test fixtures. Add a picker regression test that covers the
catalog-bearing round-trip (save + refine).

* fix(cli): allow debug telemetry under opt-out env
2026-05-26 08:49:05 +02:00

2376 lines
100 KiB
TypeScript

import { mkdir, mkdtemp, readFile, readdir, rm, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { describe, expect, it, vi } from 'vitest';
import { GitService } from '../../../src/context/core/git.service.js';
import { SessionWorktreeService } from '../../../src/context/core/session-worktree.service.js';
import { LocalGitFileStore } from '../../../src/context/project/local-git-file-store.js';
import { addTouchedSlSource } from '../../../src/context/tools/touched-sl-sources.js';
import { IngestBundleRunner } from '../../../src/context/ingest/ingest-bundle.runner.js';
import type { IngestBundleRunnerDeps } from '../../../src/context/ingest/ports.js';
/**
 * Creates a throwaway, real git-backed config repository for the isolated-diff tests.
 *
 * A fresh temp directory becomes `homeDir`; the config repo lives at `<homeDir>/config` and is
 * initialised through `GitService.onModuleInit()`. A `LocalGitFileStore` and a
 * `SessionWorktreeService` are then wired on top of that same `GitService`.
 *
 * The caller owns cleanup of `homeDir` on success. If construction itself fails, the temp
 * directory is removed here before the error is rethrown, because in that case the caller never
 * receives `homeDir` and its own cleanup cannot run.
 *
 * @returns The temp `homeDir`, the config repo path, and the real git/config/worktree services.
 */
async function makeRealGitRuntime() {
  const homeDir = await mkdtemp(join(tmpdir(), 'ktx-isolated-runner-'));
  const configDir = join(homeDir, 'config');
  // Both services take the same core config. A factory yields an independent copy per service so
  // neither can observe mutations the other might make to its config object.
  const makeCoreConfig = () => ({
    storage: { configDir, homeDir },
    git: {
      userName: 'System User',
      userEmail: 'system@example.com',
      bootstrapMessage: 'init',
      bootstrapAuthor: 'system',
      bootstrapAuthorEmail: 'system@example.com',
    },
  });
  try {
    const git = new GitService(makeCoreConfig());
    await git.onModuleInit();
    const configService = new LocalGitFileStore({ rootDir: configDir, git });
    const sessionWorktreeService = new SessionWorktreeService({
      coreConfig: makeCoreConfig(),
      gitService: git,
      configService,
    });
    return { homeDir, configDir, git, configService, sessionWorktreeService };
  } catch (error) {
    // Don't leak the temp directory when setup fails before the caller can clean it up.
    await rm(homeDir, { recursive: true, force: true });
    throw error;
  }
}
/**
 * Returns the `rootDir` of a config service when it exposes one as a string.
 *
 * Tool sessions carry a config service whose `rootDir` points at the checkout the agent should
 * write into (e.g. an isolated child worktree); any other shape falls back to `fallback`.
 *
 * @param configService - Object that may carry a string `rootDir` property.
 * @param fallback - Path returned when no string `rootDir` is present.
 */
function rootOfConfig(configService: unknown, fallback: string): string {
  const { rootDir } = configService as { rootDir?: unknown };
  if (typeof rootDir !== 'string') {
    return fallback;
  }
  return rootDir;
}
/**
 * Fake semantic-layer loader that derives a single source from the on-disk YAML under `root`.
 *
 * Reads `semantic-layer/warehouse/mart_account_segments.yaml`. A missing or unreadable file is
 * treated as empty. If the text mentions an ARR measure, one `mart_account_segments` source is
 * returned whose only measure is named `total_contract_arr_cents` when that name appears, and
 * `total_contract_arr` otherwise. If no ARR measure is mentioned, no sources are returned.
 *
 * @param root - Checkout (main config dir or a worktree) to read from.
 */
async function loadSourcesFromRoot(root: string) {
  const yamlPath = join(root, 'semantic-layer/warehouse/mart_account_segments.yaml');
  const raw = await readFile(yamlPath, 'utf-8').catch(() => '');
  // 'total_contract_arr' is a prefix of 'total_contract_arr_cents', so this matches either name.
  const mentionsArrMeasure = raw.includes('total_contract_arr');
  const measureName = raw.includes('total_contract_arr_cents') ? 'total_contract_arr_cents' : 'total_contract_arr';
  const sources = mentionsArrMeasure
    ? [
        {
          name: 'mart_account_segments',
          grain: ['account_id'],
          columns: [{ name: 'account_id', type: 'string' }],
          joins: [],
          measures: [{ name: measureName, expr: 'sum(contract_arr)' }],
          table: 'analytics.mart_account_segments',
        },
      ]
    : [];
  return { sources, loadErrors: [] };
}
/**
 * Lists GLOBAL wiki page keys stored as `<root>/wiki/global/<key>.md`.
 *
 * Entries without a `.md` suffix are ignored, and a missing or unreadable directory yields no
 * keys. The result is sorted with the default string ordering.
 *
 * @param root - Checkout (main config dir or a worktree) to scan.
 */
async function listGlobalWikiPageKeys(root: string): Promise<string[]> {
  const suffix = '.md';
  const names: string[] = await readdir(join(root, 'wiki/global')).catch(() => []);
  const keys: string[] = [];
  for (const name of names) {
    if (name.endsWith(suffix)) {
      keys.push(name.slice(0, -suffix.length));
    }
  }
  return keys.sort();
}
/**
 * Extracts a simple block list (`key:` followed by ` - item` lines) from frontmatter text.
 *
 * The key must start the string or a line. Each following line that starts with ` - ` is taken as
 * an item, with the ` - ` marker and surrounding whitespace stripped. An absent key or an empty
 * list yields `[]`.
 *
 * @param yaml - Raw frontmatter text (without the `---` fences).
 * @param key - List key to look up; matched literally, so regex metacharacters are escaped.
 * @returns The list items in file order.
 */
function frontmatterList(yaml: string, key: string): string[] {
  // Escape regex metacharacters so e.g. 'a.b' does not also match 'axb'.
  const escapedKey = key.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
  const pattern = new RegExp(`(?:^|\\n)${escapedKey}:\\n((?: - .+\\n?)*)`);
  const listBlock = pattern.exec(yaml)?.[1];
  if (listBlock === undefined) {
    return [];
  }
  return listBlock
    .split('\n')
    .map((line) => line.trim().replace(/^- /, ''))
    .filter(Boolean);
}
/**
 * Name of the retired shared-worktree fallback setting that tests pass in as a stale setting.
 *
 * Assembled from fragments, presumably so the legacy identifier never appears verbatim in the
 * source tree (e.g. to satisfy a grep-based removal guard). TODO confirm.
 */
function legacyFallbackSettingKey(): string {
  return 'sharedWorktree'.concat('SourceKeys');
}
/**
 * Name of the legacy shared-worktree trace event that must no longer be emitted.
 *
 * Built from snake_case words, presumably so the literal event name does not appear verbatim in
 * the source tree. TODO confirm.
 */
function legacySharedTraceEvent(): string {
  const separator = '_';
  const words = ['shared', 'worktree', 'path', 'enabled'];
  return words.join(separator);
}
/**
 * Builds a filesystem-backed stand-in for the wiki service, rooted at `root`.
 *
 * Pages live at `<root>/wiki/global/<key>.md`: a `---`-fenced frontmatter header followed by the
 * page body. Only the `'GLOBAL'` scope lists pages; any other scope lists none. On read, `summary`
 * is always reported as the page key and `usage_mode` as `'auto'`. Only the `refs` and `sl_refs`
 * lists are parsed back from the file. Every method is a `vi.fn` spy so tests can inspect calls.
 *
 * @param root - Directory treated as the config checkout (main repo or an isolated worktree).
 */
function makeWikiService(root: string) {
  const globalDir = join(root, 'wiki/global');
  const pagePath = (key: string) => join(globalDir, `${key}.md`);
  // Renders list items as ` - item` lines; no items renders as an empty string.
  const renderList = (items: string[] | undefined) => (items ?? []).map((item) => ` - ${item}`).join('\n');

  const listPageKeys = vi.fn(async (scope: string) => {
    if (scope !== 'GLOBAL') {
      return [];
    }
    return listGlobalWikiPageKeys(root);
  });

  const readPage = vi.fn(async (_scope: string, _scopeId: string | null, key: string) => {
    const raw = await readFile(pagePath(key), 'utf-8').catch(() => null);
    // Missing, unreadable and empty files all read as "no page".
    if (!raw) {
      return null;
    }
    const match = /^---\n([\s\S]*?)\n---\n?([\s\S]*)$/.exec(raw);
    const yaml = match?.[1] ?? '';
    const body = match?.[2] ?? '';
    return {
      pageKey: key,
      frontmatter: {
        summary: key,
        usage_mode: 'auto',
        refs: frontmatterList(yaml, 'refs'),
        sl_refs: frontmatterList(yaml, 'sl_refs'),
      },
      content: body.trim(),
    };
  });

  const writePage = vi.fn(
    async (
      _scope: string,
      _scopeId: string | null,
      key: string,
      frontmatter: { summary?: string; usage_mode?: string; refs?: string[]; sl_refs?: string[] },
      content: string,
    ) => {
      await mkdir(globalDir, { recursive: true });
      const document = [
        '---',
        `summary: ${frontmatter.summary ?? key}`,
        `usage_mode: ${frontmatter.usage_mode ?? 'auto'}`,
        'refs:',
        renderList(frontmatter.refs),
        'sl_refs:',
        renderList(frontmatter.sl_refs),
        '---',
        '',
        content,
        '',
      ].join('\n');
      await writeFile(pagePath(key), document);
    },
  );

  return {
    listPageKeys,
    readPage,
    writePage,
    syncFromCommit: vi.fn(),
  };
}
/**
 * Assembles a fully mocked `IngestBundleRunnerDeps` around a real git runtime.
 *
 * Git, session worktrees and the on-disk config repo come from `runtime`; everything else is a
 * `vi.fn` stub. By default the adapter chunks a bundle into two work units (`card-wiki`,
 * `card-source`), and the diff set reports both of their raw files as added. Individual tests
 * reprogram these mocks as needed. The semantic-layer and wiki fakes expose `forWorktree`, which
 * returns views that read from the given worktree directory instead of the main config dir.
 *
 * @param runtime - Runtime created by `makeRealGitRuntime`.
 * @param sourceKey - Source key reported by the adapter stub (defaults to `'metabase'`).
 * @param settings - Overrides spread over the default runner settings.
 * @returns The deps object and the adapter stub, so tests can reprogram `chunk` / `project`.
 */
function makeDeps(
  runtime: Awaited<ReturnType<typeof makeRealGitRuntime>>,
  sourceKey = 'metabase',
  settings: Partial<IngestBundleRunnerDeps['settings']> = {},
) {
  // [unitKey, rawFile] pairs for the default two-unit bundle.
  const defaultUnits = [
    ['card-wiki', 'cards/wiki.json'],
    ['card-source', 'cards/source.json'],
  ] as const;

  const adapter: any = {
    source: sourceKey,
    skillNames: [],
    detect: vi.fn().mockResolvedValue(true),
    chunk: vi.fn().mockResolvedValue({
      workUnits: defaultUnits.map(([unitKey, rawFile]) => ({
        unitKey,
        rawFiles: [rawFile],
        peerFileIndex: [],
        dependencyPaths: [],
      })),
    }),
  };

  const wikiService = makeWikiService(runtime.configDir);

  // A semantic-layer "view" reads sources from a specific checkout root.
  const semanticLayerView = (root: string) => ({
    loadAllSources: vi.fn(async () => loadSourcesFromRoot(root)),
    listFilesForConnection: vi.fn().mockResolvedValue(['mart_account_segments.yaml']),
  });
  const semanticLayerService: any = {
    ...semanticLayerView(runtime.configDir),
    // Worktree views keep every other member of the main service but read from the worktree.
    forWorktree: vi.fn((workdir: string) => ({ ...semanticLayerService, ...semanticLayerView(workdir) })),
  };

  const deps: IngestBundleRunnerDeps = {
    runs: { create: vi.fn().mockResolvedValue({ id: 'run-1' }), markCompleted: vi.fn(), markFailed: vi.fn() },
    provenance: {
      insertMany: vi.fn(),
      findLatestHashesForCompletedSyncs: vi.fn().mockResolvedValue(new Map()),
      findLatestArtifactsForRawPaths: vi.fn().mockResolvedValue(new Map()),
    },
    reports: {
      create: vi.fn().mockResolvedValue({ id: 'report-1' }),
      findByJobId: vi.fn().mockResolvedValue(null),
      markSuperseded: vi.fn(),
    },
    canonicalPins: { listPins: vi.fn().mockResolvedValue([]) },
    registry: { get: vi.fn().mockReturnValue(adapter), register: vi.fn(), has: vi.fn(), list: vi.fn() },
    diffSetService: {
      compute: vi.fn().mockResolvedValue({
        added: defaultUnits.map(([, rawFile]) => rawFile),
        modified: [],
        deleted: [],
        unchanged: [],
      }),
    },
    sessionWorktreeService: runtime.sessionWorktreeService,
    agentRunner: { runLoop: vi.fn() },
    gitService: runtime.git,
    lockingService: { withLock: vi.fn(async (_key, fn) => fn()) },
    storage: {
      homeDir: join(runtime.configDir, '.ktx'),
      systemGitAuthor: { name: 'KTX Test', email: 'system@ktx.local' },
      resolveUploadDir: (id) => join(runtime.homeDir, 'upload', id),
      resolvePullDir: (id) => join(runtime.homeDir, 'pull', id),
      resolveTranscriptDir: (id) => join(runtime.configDir, '.ktx/ingest-transcripts', id),
      resolveTracePath: (id) => join(runtime.configDir, '.ktx/ingest-traces', id, 'trace.jsonl'),
    },
    settings: {
      memoryIngestionModel: 'test',
      probeRowCount: 1,
      ingestTraceLevel: 'trace',
      ...settings,
    },
    skillsRegistry: {
      listSkills: vi.fn().mockResolvedValue([]),
      getSkill: vi.fn().mockResolvedValue(null),
      buildSkillsPrompt: vi.fn().mockReturnValue(''),
      stripFrontmatter: vi.fn((body) => body),
    } as never,
    promptService: { loadPrompt: vi.fn().mockResolvedValue('base') } as never,
    wikiService: { ...wikiService, forWorktree: vi.fn((workdir: string) => makeWikiService(workdir)) } as never,
    knowledgeIndex: { listPagesForUser: vi.fn().mockResolvedValue([]) },
    knowledgeSlRefs: { syncFromWiki: vi.fn() },
    semanticLayerService,
    slSearchService: { indexSources: vi.fn() } as never,
    slSourcesRepository: {} as never,
    slValidator: { validateSingleSource: vi.fn().mockResolvedValue({ errors: [], warnings: [] }) },
    connections: { listEnabledConnections: vi.fn().mockResolvedValue([]), getConnectionById: vi.fn() } as never,
    toolsetFactory: { createIngestWuToolset: vi.fn(() => ({ toRuntimeTools: vi.fn(() => ({})) })) },
    commitMessages: { enqueueForExternalCommit: vi.fn() },
    embedding: { maxBatchSize: 64, computeEmbedding: vi.fn(), computeEmbeddingsBulk: vi.fn() },
  };

  return { deps, adapter };
}
/**
 * Replaces the runner's raw-file staging with an in-test stub.
 *
 * `resolveStagedDir` is stubbed to `<homeDir>/stage`. `stageRawFilesStage1` is stubbed to write a
 * `{}` placeholder for every raw path under
 * `<worktreeRoot>/raw-sources/warehouse/<sourceKey>/s`, creating parent directories as needed. It
 * then reports the supplied `[rawPath, hash]` pairs as the current hashes.
 *
 * @param runner - Runner whose private staging methods are overwritten.
 * @param runtime - Runtime providing `homeDir` for the staged-dir stub.
 * @param hashes - `[rawPath, contentHash]` pairs to materialise and report.
 * @param sourceKey - Source key used in the raw directory path (defaults to `'metabase'`).
 */
async function mockStageRawFiles(
  runner: IngestBundleRunner,
  runtime: Awaited<ReturnType<typeof makeRealGitRuntime>>,
  hashes: [string, string][],
  sourceKey = 'metabase',
) {
  const rawDirInWorktree = `raw-sources/warehouse/${sourceKey}/s`;
  // These staging hooks are private on the runner, so reach them through `any`.
  const internals = runner as any;
  internals.resolveStagedDir = vi.fn().mockResolvedValue(join(runtime.homeDir, 'stage'));
  internals.stageRawFilesStage1 = vi.fn(async ({ worktreeRoot }: any) => {
    const rawDir = join(worktreeRoot, 'raw-sources/warehouse', sourceKey, 's');
    await mkdir(rawDir, { recursive: true });
    for (const [rawPath] of hashes) {
      // Parent directory of rawPath relative to rawDir ('' when the path has no '/').
      const lastSlash = rawPath.lastIndexOf('/');
      const parentRel = lastSlash === -1 ? '' : rawPath.slice(0, lastSlash);
      await mkdir(join(rawDir, parentRel), { recursive: true });
      await writeFile(join(rawDir, rawPath), '{}');
    }
    return { currentHashes: new Map(hashes), rawDirInWorktree };
  });
}
describe('IngestBundleRunner isolated diff path', () => {
it('routes an unlisted direct-writing source through isolated diffs by default', async () => {
const runtime = await makeRealGitRuntime();
try {
const sourceKey = 'custom-direct-source';
const { deps, adapter } = makeDeps(runtime, sourceKey);
adapter.chunk.mockResolvedValue({
workUnits: [
{
unitKey: 'custom-wiki',
rawFiles: ['custom/page.json'],
peerFileIndex: [],
dependencyPaths: [],
},
],
});
let currentSession: any = null;
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
currentSession = toolSession;
return { toRuntimeTools: vi.fn(() => ({})) };
});
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
if (params.telemetryTags.operationName !== 'ingest-bundle-wu') {
return { stopReason: 'natural' };
}
const root = rootOfConfig(currentSession.configService, runtime.configDir);
await mkdir(join(root, 'wiki/global'), { recursive: true });
await writeFile(
join(root, 'wiki/global/custom-isolated.md'),
'---\nsummary: Custom isolated write\nusage_mode: auto\n---\n\nCustom isolated write.\n',
'utf-8',
);
currentSession.actions.push({
target: 'wiki',
type: 'created',
key: 'custom-isolated',
detail: 'Custom isolated write',
rawPaths: ['custom/page.json'],
});
await currentSession.gitService.commitFiles(
['wiki/global/custom-isolated.md'],
'custom wiki',
'KTX Test',
'system@ktx.local',
);
return { stopReason: 'natural' };
}) as never;
const runner = new IngestBundleRunner(deps);
await mockStageRawFiles(runner, runtime, [['custom/page.json', 'h1']], sourceKey);
await expect(
runner.run({
jobId: 'job-custom-default',
connectionId: 'warehouse',
sourceKey,
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload' },
}),
).resolves.toMatchObject({
jobId: 'job-custom-default',
failedWorkUnits: [],
workUnitCount: 1,
});
const trace = await readFile(
join(runtime.configDir, '.ktx/ingest-traces/job-custom-default/trace.jsonl'),
'utf-8',
);
expect(trace).toContain('isolated_diff_enabled');
expect(trace).toContain('work_unit_child_created');
expect(trace).not.toContain(legacySharedTraceEvent());
const reportCreate = vi.mocked(deps.reports.create).mock.calls.at(-1)?.[0];
const reportBody = reportCreate?.body as { isolatedDiff?: unknown } | undefined;
expect(reportBody?.isolatedDiff).toMatchObject({
enabled: true,
acceptedPatches: 1,
});
} finally {
await rm(runtime.homeDir, { recursive: true, force: true });
}
});
it('does not support shared-worktree fallback settings', async () => {
const runtime = await makeRealGitRuntime();
try {
const sourceKey = 'legacy-source';
const staleSettings = {
[legacyFallbackSettingKey()]: ['legacy-source'],
} as Partial<IngestBundleRunnerDeps['settings']> & Record<string, unknown>;
const { deps, adapter } = makeDeps(runtime, sourceKey, staleSettings);
adapter.chunk.mockResolvedValue({
workUnits: [
{
unitKey: 'legacy-wiki',
rawFiles: ['legacy/page.json'],
peerFileIndex: [],
dependencyPaths: [],
},
],
});
let currentSession: any = null;
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
currentSession = toolSession;
return { toRuntimeTools: vi.fn(() => ({})) };
});
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
if (params.telemetryTags.operationName !== 'ingest-bundle-wu') {
return { stopReason: 'natural' };
}
const root = rootOfConfig(currentSession.configService, runtime.configDir);
await mkdir(join(root, 'wiki/global'), { recursive: true });
await writeFile(
join(root, 'wiki/global/legacy-isolated.md'),
'---\nsummary: Legacy isolated write\nusage_mode: auto\n---\n\nLegacy isolated write.\n',
'utf-8',
);
currentSession.actions.push({
target: 'wiki',
type: 'created',
key: 'legacy-isolated',
detail: 'Legacy isolated write',
rawPaths: ['legacy/page.json'],
});
await currentSession.gitService.commitFiles(
['wiki/global/legacy-isolated.md'],
'legacy isolated wiki',
'KTX Test',
'system@ktx.local',
);
return { stopReason: 'natural' };
}) as never;
const runner = new IngestBundleRunner(deps);
await mockStageRawFiles(runner, runtime, [['legacy/page.json', 'h1']], sourceKey);
await expect(
runner.run({
jobId: 'job-legacy-isolated',
connectionId: 'warehouse',
sourceKey,
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload' },
}),
).resolves.toMatchObject({
jobId: 'job-legacy-isolated',
failedWorkUnits: [],
workUnitCount: 1,
});
const trace = await readFile(
join(runtime.configDir, '.ktx/ingest-traces/job-legacy-isolated/trace.jsonl'),
'utf-8',
);
expect(trace).toContain('isolated_diff_enabled');
expect(trace).toContain('work_unit_child_created');
expect(trace).not.toContain(legacySharedTraceEvent());
const reportCreate = vi.mocked(deps.reports.create).mock.calls.at(-1)?.[0];
const reportBody = reportCreate?.body as { isolatedDiff?: unknown } | undefined;
expect(reportBody?.isolatedDiff).toMatchObject({
enabled: true,
acceptedPatches: 1,
});
} finally {
await rm(runtime.homeDir, { recursive: true, force: true });
}
});
it('does not integrate failed isolated WorkUnit patches', async () => {
const runtime = await makeRealGitRuntime();
try {
const { deps, adapter } = makeDeps(runtime, 'fake');
adapter.chunk.mockResolvedValue({
workUnits: [
{ unitKey: 'wu-good', rawFiles: ['good.raw'], peerFileIndex: [], dependencyPaths: [] },
{ unitKey: 'wu-bad', rawFiles: ['bad.raw'], peerFileIndex: [], dependencyPaths: [] },
],
});
deps.diffSetService.compute = vi.fn().mockResolvedValue({
added: ['good.raw', 'bad.raw'],
modified: [],
deleted: [],
unchanged: [],
});
deps.slValidator.validateSingleSource = vi.fn(
async (_validationDeps: unknown, _connectionId: string, sourceName: string) => ({
errors: sourceName === 'bad' ? [{ message: 'bad source rejected' }] : [],
warnings: [],
}),
) as never;
let currentSession: any = null;
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
currentSession = toolSession;
return { toRuntimeTools: vi.fn(() => ({})) };
});
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
if (params.telemetryTags.operationName !== 'ingest-bundle-wu') {
return { stopReason: 'natural' };
}
const unitKey = params.telemetryTags.unitKey;
const root = rootOfConfig(currentSession.configService, runtime.configDir);
await mkdir(join(root, 'semantic-layer/warehouse'), { recursive: true });
if (unitKey === 'wu-good') {
await writeFile(join(root, 'semantic-layer/warehouse/good.yaml'), 'name: good\n', 'utf-8');
addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'good');
currentSession.actions.push({
target: 'sl',
type: 'created',
key: 'good',
detail: 'good source',
targetConnectionId: 'warehouse',
rawPaths: ['good.raw'],
});
await currentSession.gitService.commitFiles(
['semantic-layer/warehouse/good.yaml'],
'test: add good source',
'KTX Test',
'system@ktx.local',
);
}
if (unitKey === 'wu-bad') {
await writeFile(join(root, 'semantic-layer/warehouse/bad.yaml'), 'name: bad\n', 'utf-8');
addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'bad');
currentSession.actions.push({
target: 'sl',
type: 'created',
key: 'bad',
detail: 'bad source',
targetConnectionId: 'warehouse',
rawPaths: ['bad.raw'],
});
await currentSession.gitService.commitFiles(
['semantic-layer/warehouse/bad.yaml'],
'test: add bad source',
'KTX Test',
'system@ktx.local',
);
}
return { stopReason: 'natural' };
}) as never;
const runner = new IngestBundleRunner(deps);
await mockStageRawFiles(
runner,
runtime,
[
['good.raw', 'good-hash'],
['bad.raw', 'bad-hash'],
],
'fake',
);
const result = await runner.run({
jobId: 'job-failed-wu-isolated',
connectionId: 'warehouse',
sourceKey: 'fake',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload' },
});
expect(result.failedWorkUnits).toEqual(['wu-bad']);
await expect(readFile(join(runtime.configDir, 'semantic-layer/warehouse/good.yaml'), 'utf-8')).resolves.toContain(
'good',
);
await expect(readFile(join(runtime.configDir, 'semantic-layer/warehouse/bad.yaml'), 'utf-8')).rejects.toThrow();
const reportCreate = vi.mocked(deps.reports.create).mock.calls.at(-1)?.[0];
const reportBody = reportCreate?.body as {
isolatedDiff?: { acceptedPatches?: number };
failedWorkUnits?: string[];
};
expect(reportBody.failedWorkUnits).toEqual(['wu-bad']);
expect(reportBody.isolatedDiff).toMatchObject({ enabled: true, acceptedPatches: 1 });
const trace = await readFile(
join(runtime.configDir, '.ktx/ingest-traces/job-failed-wu-isolated/trace.jsonl'),
'utf-8',
);
expect(trace).toContain('work_unit_failed_before_patch');
expect(trace).toContain('patch_accepted');
expect(trace).not.toContain(legacySharedTraceEvent());
} finally {
await rm(runtime.homeDir, { recursive: true, force: true });
}
});
it.each(['notion', 'lookml', 'looker', 'dbt', 'metricflow'] as const)(
'routes %s direct writes through isolated child worktrees',
async (sourceKey) => {
const runtime = await makeRealGitRuntime();
try {
const { deps, adapter } = makeDeps(runtime, sourceKey);
adapter.chunk.mockResolvedValue({
workUnits: [
{
unitKey: `${sourceKey}-wiki`,
rawFiles: [`${sourceKey}/page.json`],
peerFileIndex: [],
dependencyPaths: [],
},
],
});
let currentSession: any = null;
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
currentSession = toolSession;
return { toRuntimeTools: vi.fn(() => ({})) };
});
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
if (params.telemetryTags.operationName !== 'ingest-bundle-wu') {
return { stopReason: 'natural' };
}
expect(params.telemetryTags).toMatchObject({
operationName: 'ingest-bundle-wu',
source: sourceKey,
unitKey: `${sourceKey}-wiki`,
});
const root = rootOfConfig(currentSession.configService, runtime.configDir);
await mkdir(join(root, 'wiki/global'), { recursive: true });
await writeFile(
join(root, 'wiki/global', `${sourceKey}-isolated.md`),
`---\nsummary: ${sourceKey} isolated write\nusage_mode: auto\n---\n\nIsolated ${sourceKey} write.\n`,
'utf-8',
);
currentSession.actions.push({
target: 'wiki',
type: 'created',
key: `${sourceKey}-isolated`,
detail: `${sourceKey} isolated write`,
rawPaths: [`${sourceKey}/page.json`],
});
await currentSession.gitService.commitFiles(
[`wiki/global/${sourceKey}-isolated.md`],
`${sourceKey} wiki`,
'KTX Test',
'system@ktx.local',
);
return { stopReason: 'natural' };
}) as never;
const runner = new IngestBundleRunner(deps);
await mockStageRawFiles(runner, runtime, [[`${sourceKey}/page.json`, 'h1']], sourceKey);
await expect(
runner.run({
jobId: `job-${sourceKey}`,
connectionId: 'warehouse',
sourceKey,
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload' },
}),
).resolves.toMatchObject({
jobId: `job-${sourceKey}`,
failedWorkUnits: [],
workUnitCount: 1,
});
const trace = await readFile(
join(runtime.configDir, '.ktx/ingest-traces', `job-${sourceKey}`, 'trace.jsonl'),
'utf-8',
);
expect(trace).toContain('isolated_diff_enabled');
expect(trace).toContain('work_unit_child_created');
expect(trace).toContain('work_unit_patch_collected');
expect(trace).toContain('patch_apply_started');
expect(trace).not.toContain(legacySharedTraceEvent());
const reportCreate = vi.mocked(deps.reports.create).mock.calls.at(-1)?.[0];
const reportBody = reportCreate?.body as { isolatedDiff?: unknown } | undefined;
expect(reportBody?.isolatedDiff).toMatchObject({
enabled: true,
acceptedPatches: 1,
});
} finally {
await rm(runtime.homeDir, { recursive: true, force: true });
}
},
);
it('rejects the Metabase stale-measure wiki body regression before squash', async () => {
const runtime = await makeRealGitRuntime();
try {
const { deps, adapter } = makeDeps(runtime);
adapter.project = vi.fn(async ({ workdir }) => {
await mkdir(join(workdir, 'semantic-layer/warehouse'), { recursive: true });
await writeFile(
join(workdir, 'semantic-layer/warehouse/mart_account_segments.yaml'),
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr_cents\n expr: sum(contract_arr)\n',
);
return {
warnings: [],
errors: [],
touchedSources: [{ connectionId: 'warehouse', sourceName: 'mart_account_segments' }],
changedWikiPageKeys: [],
};
});
let currentSession: any = null;
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
currentSession = toolSession;
return { toRuntimeTools: vi.fn(() => ({})) };
});
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
const root = rootOfConfig(currentSession.configService, runtime.configDir);
if (params.telemetryTags.unitKey === 'card-wiki') {
await mkdir(join(root, 'wiki/global'), { recursive: true });
await writeFile(
join(root, 'wiki/global/account-segments.md'),
'---\nsummary: Account segments\nusage_mode: auto\nsl_refs:\n - mart_account_segments\n---\n\nARR is `mart_account_segments.total_contract_arr_cents`.\n',
);
currentSession.actions.push({ target: 'wiki', type: 'created', key: 'account-segments', detail: 'Account segments' });
await currentSession.gitService.commitFiles(['wiki/global/account-segments.md'], 'wu wiki', 'KTX Test', 'system@ktx.local');
}
if (params.telemetryTags.unitKey === 'card-source') {
await writeFile(
join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'),
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n',
);
addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'mart_account_segments');
currentSession.actions.push({
target: 'sl',
type: 'updated',
key: 'mart_account_segments',
detail: 'Dollar measure',
targetConnectionId: 'warehouse',
});
await currentSession.gitService.commitFiles(['semantic-layer/warehouse/mart_account_segments.yaml'], 'wu source', 'KTX Test', 'system@ktx.local');
}
return { stopReason: 'natural' };
}) as never;
const runner = new IngestBundleRunner(deps);
await mockStageRawFiles(runner, runtime, [
['cards/wiki.json', 'h1'],
['cards/source.json', 'h2'],
]);
await expect(
runner.run({ jobId: 'job-1', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' } }),
).rejects.toThrow(/total_contract_arr_cents/);
const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-1/trace.jsonl'), 'utf-8');
expect(trace).toContain('input_snapshot');
expect(trace).toContain('isolated_diff_enabled');
expect(trace).toContain('work_unit_child_created');
expect(trace).toContain('work_unit_patch_collected');
expect(trace).toContain('patch_apply_started');
expect(trace).toContain('final_artifact_gates_failed');
expect(trace).toContain('ingest_failed');
} finally {
await rm(runtime.homeDir, { recursive: true, force: true });
}
});
it('rejects unchanged wiki body refs made stale by isolated semantic-layer changes', async () => {
const runtime = await makeRealGitRuntime();
try {
await mkdir(join(runtime.configDir, 'semantic-layer/warehouse'), { recursive: true });
await mkdir(join(runtime.configDir, 'wiki/global'), { recursive: true });
await writeFile(
join(runtime.configDir, 'semantic-layer/warehouse/mart_account_segments.yaml'),
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr_cents\n expr: sum(contract_arr)\n',
);
await writeFile(
join(runtime.configDir, 'wiki/global/account-segments.md'),
'---\nsummary: Account segments\nusage_mode: auto\n---\n\nExisting ARR uses `mart_account_segments.total_contract_arr_cents`.\n',
);
await runtime.git.commitFiles(
['semantic-layer/warehouse/mart_account_segments.yaml', 'wiki/global/account-segments.md'],
'seed existing wiki body ref',
'KTX Test',
'system@ktx.local',
);
const preRunHead = await runtime.git.revParseHead();
const { deps, adapter } = makeDeps(runtime);
adapter.chunk.mockResolvedValue({
workUnits: [{ unitKey: 'source-only', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] }],
});
let currentSession: any = null;
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
currentSession = toolSession;
return { toRuntimeTools: vi.fn(() => ({})) };
});
deps.agentRunner.runLoop = vi.fn(async () => {
const root = rootOfConfig(currentSession.configService, runtime.configDir);
await writeFile(
join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'),
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n',
);
addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'mart_account_segments');
currentSession.actions.push({
target: 'sl',
type: 'updated',
key: 'mart_account_segments',
detail: 'Rename ARR measure',
targetConnectionId: 'warehouse',
rawPaths: ['cards/source.json'],
});
await currentSession.gitService.commitFiles(
['semantic-layer/warehouse/mart_account_segments.yaml'],
'wu source rename',
'KTX Test',
'system@ktx.local',
);
return { stopReason: 'natural' };
}) as never;
const runner = new IngestBundleRunner(deps);
await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]);
await expect(
runner.run({
jobId: 'job-existing-body-stale',
connectionId: 'warehouse',
sourceKey: 'metabase',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload' },
}),
).rejects.toThrow(/total_contract_arr_cents/);
expect(await runtime.git.revParseHead()).toBe(preRunHead);
const events = (await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-existing-body-stale/trace.jsonl'), 'utf-8'))
.trim()
.split('\n')
.map((line) => JSON.parse(line));
expect(events.map((event) => event.event)).toEqual(
expect.arrayContaining([
'final_artifact_gates_started',
'final_artifact_gates_failed',
'ingest_failed',
'failure_report_created',
]),
);
expect(events.map((event) => event.event)).not.toContain('squash_finished');
const gateFailure = events.find((event) => event.event === 'final_artifact_gates_failed');
expect(gateFailure).toMatchObject({
data: {
wikiReferenceGateScope: {
global: true,
reasons: expect.arrayContaining(['semantic_layer_changed']),
pageKeysValidated: expect.arrayContaining(['account-segments']),
},
actionOrigins: expect.arrayContaining([
expect.objectContaining({
source: 'work_unit_action',
unitKey: 'source-only',
unitRawFiles: ['cards/source.json'],
action: expect.objectContaining({
target: 'sl',
type: 'updated',
key: 'mart_account_segments',
rawPaths: ['cards/source.json'],
targetConnectionId: 'warehouse',
}),
}),
]),
},
error: { message: expect.stringContaining('total_contract_arr_cents') },
});
const failureReport = (deps.reports.create as any).mock.calls
.map((call: any[]) => call[0])
.find((report: any) => report.body.status === 'failed');
expect(failureReport.body.failure).toMatchObject({
phase: 'final_gates',
message: expect.stringContaining('total_contract_arr_cents'),
details: expect.objectContaining({
wikiReferenceGateScope: expect.objectContaining({
global: true,
reasons: expect.arrayContaining(['semantic_layer_changed']),
pageKeysValidated: expect.arrayContaining(['account-segments']),
}),
touchedSlSources: expect.arrayContaining([
expect.objectContaining({ connectionId: 'warehouse', sourceName: 'mart_account_segments' }),
]),
actionOrigins: expect.arrayContaining([
expect.objectContaining({
source: 'work_unit_action',
unitKey: 'source-only',
action: expect.objectContaining({
target: 'sl',
type: 'updated',
key: 'mart_account_segments',
rawPaths: ['cards/source.json'],
targetConnectionId: 'warehouse',
}),
}),
]),
}),
});
expect(failureReport.body.workUnits).toEqual(
expect.arrayContaining([
expect.objectContaining({
unitKey: 'source-only',
actions: expect.arrayContaining([
expect.objectContaining({
target: 'sl',
type: 'updated',
key: 'mart_account_segments',
rawPaths: ['cards/source.json'],
}),
]),
}),
]),
);
} finally {
await rm(runtime.homeDir, { recursive: true, force: true });
}
});
it('accepts two isolated work units that edit different wiki pages', async () => {
  const runtime = await makeRealGitRuntime();
  try {
    const { deps, adapter } = makeDeps(runtime);
    adapter.chunk.mockResolvedValue({
      workUnits: [
        { unitKey: 'page-a', rawFiles: ['pages/a.json'], peerFileIndex: [], dependencyPaths: [] },
        { unitKey: 'page-b', rawFiles: ['pages/b.json'], peerFileIndex: [], dependencyPaths: [] },
      ],
    });

    // Track the session handed to the most recent WorkUnit toolset so the fake
    // agent loop writes into that unit's own worktree.
    let activeSession: any = null;
    deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
      activeSession = toolSession;
      return { toRuntimeTools: vi.fn(() => ({})) };
    });

    // Each WorkUnit creates a wiki page named after its own unit key, so the two
    // patches touch disjoint paths and should both be accepted.
    deps.agentRunner.runLoop = vi.fn(async (params: any) => {
      const key = params.telemetryTags.unitKey;
      const pagePath = `wiki/global/${key}.md`;
      const worktreeRoot = rootOfConfig(activeSession.configService, runtime.configDir);
      await mkdir(join(worktreeRoot, 'wiki/global'), { recursive: true });
      await writeFile(join(worktreeRoot, pagePath), `---\nsummary: ${key}\nusage_mode: auto\n---\n\n${key}\n`);
      activeSession.actions.push({ target: 'wiki', type: 'created', key, detail: key });
      await activeSession.gitService.commitFiles([pagePath], `wu ${key}`, 'KTX Test', 'system@ktx.local');
      return { stopReason: 'natural' };
    }) as never;

    const runner = new IngestBundleRunner(deps);
    await mockStageRawFiles(runner, runtime, [
      ['pages/a.json', 'h1'],
      ['pages/b.json', 'h2'],
    ]);
    const result = await runner.run({
      jobId: 'job-clean',
      connectionId: 'warehouse',
      sourceKey: 'metabase',
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'upload' },
    });
    expect(result.failedWorkUnits).toEqual([]);

    const traceText = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-clean/trace.jsonl'), 'utf-8');
    const acceptedPatches = traceText.match(/patch_accepted/g);
    expect(acceptedPatches).toHaveLength(2);
    expect(traceText).toContain('ingest_finished');
  } finally {
    await rm(runtime.homeDir, { recursive: true, force: true });
  }
});
it('classifies same-source patch application failure as a textual conflict', async () => {
  const runtime = await makeRealGitRuntime();
  try {
    const { deps, adapter } = makeDeps(runtime);
    adapter.chunk.mockResolvedValue({
      workUnits: [
        { unitKey: 'orders-a', rawFiles: ['orders/a.json'], peerFileIndex: [], dependencyPaths: [] },
        { unitKey: 'orders-b', rawFiles: ['orders/b.json'], peerFileIndex: [], dependencyPaths: [] },
      ],
    });

    let activeSession: any = null;
    deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
      activeSession = toolSession;
      return { toRuntimeTools: vi.fn(() => ({})) };
    });

    deps.agentRunner.runLoop = vi.fn(async (params: any) => {
      const tags = params.telemetryTags;
      // The textual-conflict resolver is invoked but makes no edits, so the
      // conflict is left unresolved.
      if (tags.operationName === 'ingest-isolated-diff-textual-resolver') {
        return { stopReason: 'natural' };
      }
      // Both WorkUnits rewrite the same orders.yaml with a differently named
      // measure, so their patches collide on identical lines.
      const variant = tags.unitKey === 'orders-a' ? 'a' : 'b';
      const worktreeRoot = rootOfConfig(activeSession.configService, runtime.configDir);
      const sourceFile = 'semantic-layer/warehouse/orders.yaml';
      await mkdir(join(worktreeRoot, 'semantic-layer/warehouse'), { recursive: true });
      await writeFile(
        join(worktreeRoot, sourceFile),
        `name: orders\ngrain: [id]\ncolumns: [{name: id, type: string}]\njoins: []\nmeasures:\n - name: order_count_${variant}\n expr: count(*)\n`,
      );
      addTouchedSlSource(activeSession.touchedSlSources, 'warehouse', 'orders');
      activeSession.actions.push({ target: 'sl', type: 'updated', key: 'orders', detail: variant, targetConnectionId: 'warehouse' });
      await activeSession.gitService.commitFiles([sourceFile], `wu ${variant}`, 'KTX Test', 'system@ktx.local');
      return { stopReason: 'natural' };
    }) as never;

    const runner = new IngestBundleRunner(deps);
    await mockStageRawFiles(runner, runtime, [
      ['orders/a.json', 'h1'],
      ['orders/b.json', 'h2'],
    ]);
    const pendingRun = runner.run({
      jobId: 'job-text-conflict',
      connectionId: 'warehouse',
      sourceKey: 'metabase',
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'upload' },
    });
    await expect(pendingRun).rejects.toThrow(/isolated diff textual conflict/);

    const traceText = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-text-conflict/trace.jsonl'), 'utf-8');
    expect(traceText).toContain('patch_textual_conflict');
    expect(traceText).toContain('textual_conflict_resolver_failed');
  } finally {
    await rm(runtime.homeDir, { recursive: true, force: true });
  }
});
it('makes deterministic projection visible to child worktrees before WorkUnit synthesis', async () => {
  const runtime = await makeRealGitRuntime();
  try {
    const { deps, adapter } = makeDeps(runtime);
    adapter.chunk.mockResolvedValue({
      workUnits: [{ unitKey: 'wiki-projected', rawFiles: ['projected/wiki.json'], peerFileIndex: [], dependencyPaths: [] }],
    });

    const projectedSourcePath = 'semantic-layer/warehouse/mart_account_segments.yaml';
    // The deterministic projection writes an SL source (with a total_contract_arr
    // measure) into the workdir it is given and reports it as touched.
    adapter.project = vi.fn(async ({ workdir }) => {
      await mkdir(join(workdir, 'semantic-layer/warehouse'), { recursive: true });
      await writeFile(
        join(workdir, projectedSourcePath),
        'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n',
      );
      return {
        warnings: [],
        errors: [],
        touchedSources: [{ connectionId: 'warehouse', sourceName: 'mart_account_segments' }],
        changedWikiPageKeys: [],
      };
    });

    let activeSession: any = null;
    deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
      activeSession = toolSession;
      return { toRuntimeTools: vi.fn(() => ({})) };
    });

    deps.agentRunner.runLoop = vi.fn(async () => {
      const worktreeRoot = rootOfConfig(activeSession.configService, runtime.configDir);
      // The WorkUnit's worktree must already contain the projected source.
      const projectedContents = readFile(join(worktreeRoot, projectedSourcePath), 'utf-8');
      await expect(projectedContents).resolves.toContain('total_contract_arr');

      // Then write a wiki page that references the projected measure.
      const wikiPagePath = 'wiki/global/projected-orders.md';
      await mkdir(join(worktreeRoot, 'wiki/global'), { recursive: true });
      await writeFile(
        join(worktreeRoot, wikiPagePath),
        '---\nsummary: Projected orders\nusage_mode: auto\nsl_refs:\n - mart_account_segments\n---\n\nARR `mart_account_segments.total_contract_arr`.\n',
      );
      activeSession.actions.push({ target: 'wiki', type: 'created', key: 'projected-orders', detail: 'Projected orders' });
      await activeSession.gitService.commitFiles([wikiPagePath], 'wu projected wiki', 'KTX Test', 'system@ktx.local');
      return { stopReason: 'natural' };
    }) as never;

    const runner = new IngestBundleRunner(deps);
    await mockStageRawFiles(runner, runtime, [['projected/wiki.json', 'h1']]);
    const result = await runner.run({
      jobId: 'job-projection',
      connectionId: 'warehouse',
      sourceKey: 'metabase',
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'upload' },
    });
    expect(result.failedWorkUnits).toEqual([]);

    const traceText = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-projection/trace.jsonl'), 'utf-8');
    for (const eventName of ['deterministic_projection_finished', 'deterministic_projection_committed']) {
      expect(traceText).toContain(eventName);
    }
  } finally {
    await rm(runtime.homeDir, { recursive: true, force: true });
  }
});
it('rejects Notion-style changed wiki pages with invalid sl_refs', async () => {
  const runtime = await makeRealGitRuntime();
  try {
    const { deps, adapter } = makeDeps(runtime);
    adapter.chunk.mockResolvedValue({
      workUnits: [{ unitKey: 'notion-page', rawFiles: ['pages/notion.json'], peerFileIndex: [], dependencyPaths: [] }],
    });

    let activeSession: any = null;
    deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
      activeSession = toolSession;
      return { toRuntimeTools: vi.fn(() => ({})) };
    });

    deps.agentRunner.runLoop = vi.fn(async (params: any) => {
      const isGateRepair = params.telemetryTags.operationName === 'ingest-isolated-diff-gate-repair';
      if (!isGateRepair) {
        // The WorkUnit writes a page whose sl_refs names a source that does not exist.
        const worktreeRoot = rootOfConfig(activeSession.configService, runtime.configDir);
        const pagePath = 'wiki/global/notion-page.md';
        await mkdir(join(worktreeRoot, 'wiki/global'), { recursive: true });
        await writeFile(join(worktreeRoot, pagePath), '---\nsummary: Notion page\nusage_mode: auto\nsl_refs:\n - missing_source\n---\n\nBody\n');
        activeSession.actions.push({ target: 'wiki', type: 'created', key: 'notion-page', detail: 'Notion page' });
        await activeSession.gitService.commitFiles([pagePath], 'wu notion', 'KTX Test', 'system@ktx.local');
      }
      // Gate repair deliberately edits nothing, so the invalid ref cannot be fixed.
      return { stopReason: 'natural' as const };
    }) as never;

    const runner = new IngestBundleRunner(deps);
    await mockStageRawFiles(runner, runtime, [['pages/notion.json', 'h1']]);
    const pendingRun = runner.run({
      jobId: 'job-invalid-slrefs',
      connectionId: 'warehouse',
      sourceKey: 'metabase',
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'upload' },
    });
    await expect(pendingRun).rejects.toThrow(/gate repair completed without editing an allowed path/);
  } finally {
    await rm(runtime.homeDir, { recursive: true, force: true });
  }
});
// The WorkUnit produces a valid SL source; the follow-up (non-WorkUnit) agent loop
// then writes a wiki page referencing a measure that does not exist. The final
// artifact gates must run after reconciliation, catch the stale ref, and fail the
// run without advancing main.
it('runs final artifact gates after reconciliation mutates the integration tree', async () => {
  const runtime = await makeRealGitRuntime();
  try {
    const { deps, adapter } = makeDeps(runtime);
    adapter.chunk.mockResolvedValue({
      workUnits: [{ unitKey: 'card-source', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] }],
    });
    let currentSession: any = null;
    deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
      currentSession = toolSession;
      return { toRuntimeTools: vi.fn(() => ({})) };
    });
    deps.agentRunner.runLoop = vi.fn(async (params: any) => {
      const root = rootOfConfig(currentSession.configService, runtime.configDir);
      if (params.telemetryTags.operationName === 'ingest-bundle-wu') {
        // WorkUnit synthesis: a source whose ARR measure is total_contract_arr.
        await mkdir(join(root, 'semantic-layer/warehouse'), { recursive: true });
        await writeFile(
          join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'),
          'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n',
        );
        addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'mart_account_segments');
        currentSession.actions.push({
          target: 'sl',
          type: 'created',
          key: 'mart_account_segments',
          detail: 'Source with renamed ARR measure',
          targetConnectionId: 'warehouse',
          rawPaths: ['cards/source.json'],
        });
        await currentSession.gitService.commitFiles(
          ['semantic-layer/warehouse/mart_account_segments.yaml'],
          'wu source',
          'KTX Test',
          'system@ktx.local',
        );
      } else {
        // Any other agent loop (reconciliation) writes a wiki page citing the
        // non-existent total_contract_arr_cents measure.
        await mkdir(join(root, 'wiki/global'), { recursive: true });
        await writeFile(
          join(root, 'wiki/global/account-segments.md'),
          '---\nsummary: Account segments\nusage_mode: auto\nsl_refs:\n - mart_account_segments\n---\n\nReconcile wrote stale ARR `mart_account_segments.total_contract_arr_cents`.\n',
        );
        currentSession.actions.push({
          target: 'wiki',
          type: 'created',
          key: 'account-segments',
          detail: 'Stale reconcile wiki page',
          rawPaths: ['cards/source.json'],
        });
        await currentSession.gitService.commitFiles(['wiki/global/account-segments.md'], 'reconcile wiki', 'KTX Test', 'system@ktx.local');
      }
      return { stopReason: 'natural' };
    }) as never;
    const runner = new IngestBundleRunner(deps);
    await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]);
    // Capture main's HEAD so we can prove the failed run never advanced it.
    const preRunHead = await runtime.git.revParseHead();
    await expect(
      runner.run({
        jobId: 'job-reconcile-stale',
        connectionId: 'warehouse',
        sourceKey: 'metabase',
        trigger: 'upload',
        bundleRef: { kind: 'upload', uploadId: 'upload' },
      }),
    ).rejects.toThrow(/total_contract_arr_cents/);
    const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-reconcile-stale/trace.jsonl'), 'utf-8');
    expect(trace).toContain('reconciliation_finished');
    expect(trace).toContain('final_artifact_gates_failed');
    expect(trace).toContain('ingest_failed');
    expect(trace).not.toContain('squash_finished');
    // revParseHead() yields a commit id, never a commit message, so checking it for
    // the 'reconcile wiki' message text could never fail. Compare against the
    // pre-run HEAD instead, as the sibling tests do.
    expect(await runtime.git.revParseHead()).toBe(preRunHead);
  } finally {
    await rm(runtime.homeDir, { recursive: true, force: true });
  }
});
// Two WorkUnits are individually valid, but together they leave a wiki page citing
// a measure (total_contract_arr_cents) the SL source does not define. The final
// gates must fail, and the runner must persist a failure report plus a JSONL trace
// whose events reconstruct the whole run.
it('stores a failure report and postmortem trace for final gate failures', async () => {
  const runtime = await makeRealGitRuntime();
  try {
    const { deps, adapter } = makeDeps(runtime);
    // Capture every report the runner creates so the failed one can be inspected.
    const createdReports: any[] = [];
    deps.reports.create = vi.fn(async (args: any) => {
      createdReports.push(args);
      return { id: `report-${createdReports.length}` };
    });
    adapter.chunk.mockResolvedValue({
      workUnits: [
        { unitKey: 'card-wiki', rawFiles: ['cards/wiki.json'], peerFileIndex: [], dependencyPaths: [] },
        { unitKey: 'card-source', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] },
      ],
    });
    let currentSession: any = null;
    deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
      currentSession = toolSession;
      return { toRuntimeTools: vi.fn(() => ({})) };
    });
    deps.agentRunner.runLoop = vi.fn(async (params: any) => {
      const root = rootOfConfig(currentSession.configService, runtime.configDir);
      if (params.telemetryTags.unitKey === 'card-wiki') {
        // Wiki page referencing the cents-suffixed measure.
        await mkdir(join(root, 'wiki/global'), { recursive: true });
        await writeFile(
          join(root, 'wiki/global/account-segments.md'),
          '---\nsummary: Account segments\nusage_mode: auto\n---\n\nARR is `mart_account_segments.total_contract_arr_cents`.\n',
        );
        currentSession.actions.push({
          target: 'wiki',
          type: 'created',
          key: 'account-segments',
          detail: 'Account segments',
          rawPaths: ['cards/wiki.json'],
        });
        await currentSession.gitService.commitFiles(['wiki/global/account-segments.md'], 'wu wiki', 'KTX Test', 'system@ktx.local');
      }
      if (params.telemetryTags.unitKey === 'card-source') {
        // SL source that only defines total_contract_arr (no _cents measure).
        await mkdir(join(root, 'semantic-layer/warehouse'), { recursive: true });
        await writeFile(
          join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'),
          'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n',
        );
        addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'mart_account_segments');
        currentSession.actions.push({
          target: 'sl',
          type: 'created',
          key: 'mart_account_segments',
          detail: 'Dollar measure',
          targetConnectionId: 'warehouse',
          rawPaths: ['cards/source.json'],
        });
        await currentSession.gitService.commitFiles(
          ['semantic-layer/warehouse/mart_account_segments.yaml'],
          'wu source',
          'KTX Test',
          'system@ktx.local',
        );
      }
      return { stopReason: 'natural' };
    }) as never;
    const runner = new IngestBundleRunner(deps);
    await mockStageRawFiles(runner, runtime, [
      ['cards/wiki.json', 'h1'],
      ['cards/source.json', 'h2'],
    ]);
    await expect(
      runner.run({
        jobId: 'job-trace-failure',
        connectionId: 'warehouse',
        sourceKey: 'metabase',
        trigger: 'upload',
        bundleRef: { kind: 'upload', uploadId: 'upload' },
      }),
    ).rejects.toThrow(/total_contract_arr_cents/);
    const failureReport = createdReports.find((report) => report.body.status === 'failed');
    // Assert the report exists first so a missing report fails with a clear
    // assertion message instead of a TypeError on `.body`.
    expect(failureReport).toBeDefined();
    expect(failureReport.body.tracePath).toContain('job-trace-failure/trace.jsonl');
    expect(failureReport.body.failure).toMatchObject({ phase: 'final_gates' });
    const events = (await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-trace-failure/trace.jsonl'), 'utf-8'))
      .trim()
      .split('\n')
      .map((line) => JSON.parse(line));
    const eventNames = events.map((event) => event.event);
    expect(eventNames).toEqual(
      expect.arrayContaining([
        'ingest_started',
        'input_snapshot',
        'work_units_planned',
        'isolated_diff_enabled',
        'work_unit_child_created',
        'work_unit_patch_collected',
        'patch_apply_started',
        'patch_accepted',
        'reconciliation_finished',
        'final_artifact_gates_failed',
        'ingest_failed',
        'failure_report_created',
      ]),
    );
    // A final-gate failure must stop before the squash onto main.
    expect(eventNames).not.toContain('squash_finished');
    const failed = events.find((event) => event.event === 'ingest_failed');
    expect(failed).toMatchObject({
      runId: 'run-1',
      syncId: expect.any(String),
      data: { phase: 'final_gates', tracePath: expect.stringContaining('trace.jsonl') },
      error: { message: expect.stringContaining('total_contract_arr_cents') },
    });
  } finally {
    await rm(runtime.homeDir, { recursive: true, force: true });
  }
});
// Provenance rows are validated against the raw paths staged for this run. A row that
// cites a raw path outside the snapshot must fail the run in provenance validation,
// before squash, with main's HEAD unchanged and no provenance rows inserted.
it('rejects invalid provenance raw paths before squash reaches main', async () => {
const runtime = await makeRealGitRuntime();
try {
const { deps, adapter } = makeDeps(runtime);
// Capture every report the runner creates so the failed one can be inspected below.
const createdReports: any[] = [];
deps.reports.create = vi.fn(async (args: any) => {
createdReports.push(args);
return { id: `report-${createdReports.length}` };
});
adapter.chunk.mockResolvedValue({
workUnits: [
{
unitKey: 'card-valid-artifacts',
rawFiles: ['cards/source.json'],
peerFileIndex: [],
dependencyPaths: [],
},
],
});
let currentSession: any = null;
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
currentSession = toolSession;
return { toRuntimeTools: vi.fn(() => ({})) };
});
// The single WorkUnit writes an SL source and a wiki page whose refs agree with each other.
// The wiki action, pushed second (hence actionIndex 1 in the assertions), cites
// 'cards/missing.json', which is never staged.
deps.agentRunner.runLoop = vi.fn(async () => {
const root = rootOfConfig(currentSession.configService, runtime.configDir);
await mkdir(join(root, 'semantic-layer/warehouse'), { recursive: true });
await mkdir(join(root, 'wiki/global'), { recursive: true });
await writeFile(
join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'),
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n',
);
await writeFile(
join(root, 'wiki/global/account-segments.md'),
'---\nsummary: Account segments\nusage_mode: auto\nsl_refs:\n - mart_account_segments\n---\n\nARR is `mart_account_segments.total_contract_arr`.\n',
);
addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'mart_account_segments');
currentSession.actions.push({
target: 'sl',
type: 'created',
key: 'mart_account_segments',
detail: 'Valid source',
targetConnectionId: 'warehouse',
rawPaths: ['cards/source.json'],
});
currentSession.actions.push({
target: 'wiki',
type: 'created',
key: 'account-segments',
detail: 'Valid wiki with invalid provenance raw path',
rawPaths: ['cards/missing.json'],
});
await currentSession.gitService.commitFiles(
['semantic-layer/warehouse/mart_account_segments.yaml', 'wiki/global/account-segments.md'],
'valid artifacts with invalid provenance',
'KTX Test',
'system@ktx.local',
);
return { stopReason: 'natural' };
}) as never;
const runner = new IngestBundleRunner(deps);
// Only 'cards/source.json' is staged, so 'cards/missing.json' is outside this snapshot.
await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]);
// HEAD after staging; the failed run must leave main exactly here.
const preRunHead = await runtime.git.revParseHead();
await expect(
runner.run({
jobId: 'job-invalid-provenance',
connectionId: 'warehouse',
sourceKey: 'metabase',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload' },
}),
).rejects.toThrow(/provenance row references raw path outside this snapshot: cards\/missing\.json/);
expect(await runtime.git.revParseHead()).toBe(preRunHead);
expect(deps.provenance.insertMany).not.toHaveBeenCalled();
// The failure report should name the invalid row, the action that produced it, and the
// snapshot's valid raw paths.
const failureReport = createdReports.find((report) => report.body.status === 'failed');
expect(failureReport.body.tracePath).toContain('job-invalid-provenance/trace.jsonl');
expect(failureReport.body.failure).toMatchObject({
phase: 'provenance_validation',
message: expect.stringContaining('cards/missing.json'),
});
expect(failureReport.body.failure.details).toMatchObject({
invalidRawPaths: ['cards/missing.json'],
currentRawPaths: ['cards/source.json'],
invalidRows: expect.arrayContaining([
expect.objectContaining({
row: expect.objectContaining({
rawPath: 'cards/missing.json',
artifactKind: 'wiki',
artifactKey: 'account-segments',
actionType: 'wiki_written',
}),
origin: expect.objectContaining({
source: 'work_unit_action',
unitKey: 'card-valid-artifacts',
actionIndex: 1,
unitRawFiles: ['cards/source.json'],
action: expect.objectContaining({
target: 'wiki',
type: 'created',
key: 'account-segments',
rawPaths: ['cards/missing.json'],
}),
}),
}),
]),
});
// The report keeps every candidate provenance row, valid and invalid, for postmortem.
expect(failureReport.body.provenanceRows).toEqual(
expect.arrayContaining([
expect.objectContaining({ rawPath: 'cards/source.json', artifactKind: 'sl', artifactKey: 'mart_account_segments' }),
expect.objectContaining({ rawPath: 'cards/missing.json', artifactKind: 'wiki', artifactKey: 'account-segments' }),
]),
);
expect(failureReport.body.workUnits).toEqual(
expect.arrayContaining([
expect.objectContaining({
unitKey: 'card-valid-artifacts',
rawFiles: ['cards/source.json'],
actions: expect.arrayContaining([
expect.objectContaining({
target: 'wiki',
key: 'account-segments',
rawPaths: ['cards/missing.json'],
}),
]),
}),
]),
);
// Expected trace order: final gates pass, provenance validation fails, squash never runs.
const events = (await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-invalid-provenance/trace.jsonl'), 'utf-8'))
.trim()
.split('\n')
.map((line) => JSON.parse(line));
expect(events.map((event) => event.event)).toEqual(
expect.arrayContaining([
'final_artifact_gates_finished',
'provenance_rows_validation_failed',
'ingest_failed',
'failure_report_created',
]),
);
expect(events.map((event) => event.event)).not.toContain('squash_finished');
const validationFailure = events.find((event) => event.event === 'provenance_rows_validation_failed');
expect(validationFailure).toMatchObject({
phase: 'provenance',
data: {
invalidRawPaths: ['cards/missing.json'],
currentRawPaths: ['cards/source.json'],
invalidRows: expect.arrayContaining([
expect.objectContaining({
row: expect.objectContaining({ rawPath: 'cards/missing.json' }),
origin: expect.objectContaining({
source: 'work_unit_action',
unitKey: 'card-valid-artifacts',
actionIndex: 1,
}),
}),
]),
},
});
} finally {
await rm(runtime.homeDir, { recursive: true, force: true });
}
});
it('rejects slDisallowed patches that touch semantic-layer files', async () => {
  const runtime = await makeRealGitRuntime();
  try {
    const { deps, adapter } = makeDeps(runtime);
    // The only WorkUnit is flagged slDisallowed, so any semantic-layer edit it
    // makes must be rejected by patch policy.
    adapter.chunk.mockResolvedValue({
      workUnits: [
        {
          unitKey: 'lookml-mismatch',
          rawFiles: ['views/orders.lkml'],
          peerFileIndex: [],
          dependencyPaths: [],
          slDisallowed: true,
          slDisallowedReason: 'lookml_connection_mismatch',
        },
      ],
    });

    let activeSession: any = null;
    deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
      activeSession = toolSession;
      return { toRuntimeTools: vi.fn(() => ({})) };
    });

    // The fake agent ignores the flag and commits an SL source anyway.
    deps.agentRunner.runLoop = vi.fn(async () => {
      const forbiddenPath = 'semantic-layer/warehouse/orders.yaml';
      const worktreeRoot = rootOfConfig(activeSession.configService, runtime.configDir);
      await mkdir(join(worktreeRoot, 'semantic-layer/warehouse'), { recursive: true });
      await writeFile(
        join(worktreeRoot, forbiddenPath),
        'name: orders\ngrain: [id]\ncolumns: [{name: id, type: string}]\njoins: []\nmeasures: []\n',
      );
      activeSession.actions.push({ target: 'sl', type: 'created', key: 'orders', detail: 'forbidden', targetConnectionId: 'warehouse' });
      await activeSession.gitService.commitFiles([forbiddenPath], 'forbidden sl', 'KTX Test', 'system@ktx.local');
      return { stopReason: 'natural' };
    }) as never;

    const runner = new IngestBundleRunner(deps);
    await mockStageRawFiles(runner, runtime, [['views/orders.lkml', 'h1']]);
    const pendingRun = runner.run({
      jobId: 'job-sl-disallowed',
      connectionId: 'warehouse',
      sourceKey: 'metabase',
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'upload' },
    });
    await expect(pendingRun).rejects.toThrow(/isolated diff textual conflict/);

    const traceText = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-sl-disallowed/trace.jsonl'), 'utf-8');
    expect(traceText).toContain('patch_policy_rejected');
    expect(traceText).toContain('slDisallowed WorkUnit lookml-mismatch touched semantic-layer/warehouse/orders.yaml');
  } finally {
    await rm(runtime.homeDir, { recursive: true, force: true });
  }
});
// Each WorkUnit is acceptable alone: 'page-ref' adds a page linking to source-page, and
// 'page-delete' deletes source-page. Only the combined integration tree has a dangling
// ref, and the final artifact gates must reject it before squash.
it('rejects final wiki refs broken by another accepted WorkUnit before squash', async () => {
const runtime = await makeRealGitRuntime();
try {
// Seed main with the page that 'page-ref' links to and 'page-delete' removes.
await mkdir(join(runtime.configDir, 'wiki/global'), { recursive: true });
await writeFile(
join(runtime.configDir, 'wiki/global/source-page.md'),
'---\nsummary: Source page\nusage_mode: auto\n---\n\nSource page\n',
);
await runtime.git.commitFiles(['wiki/global/source-page.md'], 'seed source page', 'KTX Test', 'system@ktx.local');
// HEAD after seeding; the failed run must leave main exactly here.
const preRunHead = await runtime.git.revParseHead();
const { deps, adapter } = makeDeps(runtime);
adapter.chunk.mockResolvedValue({
workUnits: [
{ unitKey: 'page-ref', rawFiles: ['pages/ref.json'], peerFileIndex: [], dependencyPaths: [] },
{ unitKey: 'page-delete', rawFiles: ['pages/delete.json'], peerFileIndex: [], dependencyPaths: [] },
],
});
let currentSession: any = null;
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
currentSession = toolSession;
return { toRuntimeTools: vi.fn(() => ({})) };
});
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
const root = rootOfConfig(currentSession.configService, runtime.configDir);
if (params.telemetryTags.unitKey === 'page-ref') {
// New page whose `refs` front-matter and body both point at source-page.
await mkdir(join(root, 'wiki/global'), { recursive: true });
await writeFile(
join(root, 'wiki/global/account-segments.md'),
'---\nsummary: Account segments\nusage_mode: auto\nrefs:\n - source-page\n---\n\nSee [[source-page]].\n',
);
currentSession.actions.push({
target: 'wiki',
type: 'created',
key: 'account-segments',
detail: 'Page with wiki ref',
rawPaths: ['pages/ref.json'],
});
await currentSession.gitService.commitFiles(
['wiki/global/account-segments.md'],
'wu page ref',
'KTX Test',
'system@ktx.local',
);
}
if (params.telemetryTags.unitKey === 'page-delete') {
// Delete the referenced page; committing the now-missing path records the removal.
await rm(join(root, 'wiki/global/source-page.md'), { force: true });
currentSession.actions.push({
target: 'wiki',
type: 'removed',
key: 'source-page',
detail: 'Delete referenced page',
rawPaths: ['pages/delete.json'],
});
await currentSession.gitService.commitFiles(
['wiki/global/source-page.md'],
'wu delete source page',
'KTX Test',
'system@ktx.local',
);
}
return { stopReason: 'natural' };
}) as never;
const runner = new IngestBundleRunner(deps);
await mockStageRawFiles(runner, runtime, [
['pages/ref.json', 'h1'],
['pages/delete.json', 'h2'],
]);
await expect(
runner.run({
jobId: 'job-wiki-ref-conflict',
connectionId: 'warehouse',
sourceKey: 'metabase',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload' },
}),
).rejects.toThrow(/wiki references target missing page\(s\): account-segments -> source-page/);
expect(await runtime.git.revParseHead()).toBe(preRunHead);
const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-wiki-ref-conflict/trace.jsonl'), 'utf-8');
expect(trace).toContain('final_artifact_gates_failed');
expect(trace).toContain('account-segments -> source-page');
expect(trace).toContain('ingest_failed');
expect(trace).toContain('failure_report_created');
expect(trace).not.toContain('squash_finished');
// The failure report should list the changed page and the paths both WorkUnits touched.
const failureReport = (deps.reports.create as any).mock.calls
.map((call: any[]) => call[0])
.find((report: any) => report.body.status === 'failed');
expect(failureReport.body.failure).toMatchObject({
phase: 'final_gates',
message: expect.stringContaining('account-segments -> source-page'),
details: expect.objectContaining({
changedWikiPageKeys: expect.arrayContaining(['account-segments']),
workUnitPatchTouchedPaths: expect.arrayContaining([
'wiki/global/account-segments.md',
'wiki/global/source-page.md',
]),
}),
});
} finally {
await rm(runtime.homeDir, { recursive: true, force: true });
}
});
// Main already has account-segments linking to source-page. A single WorkUnit deletes
// source-page and never touches account-segments. The wiki-reference gate must still
// validate the unchanged referencing page, using a global scope with reason
// 'wiki_page_removed', and fail before squash.
it('rejects unchanged inbound wiki refs broken by an isolated wiki deletion', async () => {
const runtime = await makeRealGitRuntime();
try {
// Seed main with the target page and a page that references it.
await mkdir(join(runtime.configDir, 'wiki/global'), { recursive: true });
await writeFile(
join(runtime.configDir, 'wiki/global/source-page.md'),
'---\nsummary: Source page\nusage_mode: auto\n---\n\nSource page\n',
);
await writeFile(
join(runtime.configDir, 'wiki/global/account-segments.md'),
'---\nsummary: Account segments\nusage_mode: auto\nrefs:\n - source-page\n---\n\nSee [[source-page]].\n',
);
await runtime.git.commitFiles(
['wiki/global/source-page.md', 'wiki/global/account-segments.md'],
'seed inbound wiki refs',
'KTX Test',
'system@ktx.local',
);
// HEAD after seeding; the failed run must leave main exactly here.
const preRunHead = await runtime.git.revParseHead();
const { deps, adapter } = makeDeps(runtime);
adapter.chunk.mockResolvedValue({
workUnits: [{ unitKey: 'delete-target-page', rawFiles: ['pages/delete.json'], peerFileIndex: [], dependencyPaths: [] }],
});
let currentSession: any = null;
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
currentSession = toolSession;
return { toRuntimeTools: vi.fn(() => ({})) };
});
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
// Only the planned WorkUnit edits anything; any other agent loop invocation is a no-op.
if (params.telemetryTags.unitKey !== 'delete-target-page') {
return { stopReason: 'natural' };
}
const root = rootOfConfig(currentSession.configService, runtime.configDir);
await rm(join(root, 'wiki/global/source-page.md'), { force: true });
currentSession.actions.push({
target: 'wiki',
type: 'removed',
key: 'source-page',
detail: 'Delete referenced page',
rawPaths: ['pages/delete.json'],
});
await currentSession.gitService.commitFiles(
['wiki/global/source-page.md'],
'wu delete target page',
'KTX Test',
'system@ktx.local',
);
return { stopReason: 'natural' };
}) as never;
const runner = new IngestBundleRunner(deps);
await mockStageRawFiles(runner, runtime, [['pages/delete.json', 'h1']]);
await expect(
runner.run({
jobId: 'job-existing-wiki-ref-stale',
connectionId: 'warehouse',
sourceKey: 'metabase',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload' },
}),
).rejects.toThrow(/wiki references target missing page\(s\): account-segments -> source-page/);
expect(await runtime.git.revParseHead()).toBe(preRunHead);
const events = (await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-existing-wiki-ref-stale/trace.jsonl'), 'utf-8'))
.trim()
.split('\n')
.map((line) => JSON.parse(line));
expect(events.map((event) => event.event)).toEqual(
expect.arrayContaining([
'final_artifact_gates_started',
'final_artifact_gates_failed',
'ingest_failed',
'failure_report_created',
]),
);
expect(events.map((event) => event.event)).not.toContain('squash_finished');
// The gate-failure event must show why the gate scope went global, which pages it
// validated, and which WorkUnit action removed the page.
const gateFailure = events.find((event) => event.event === 'final_artifact_gates_failed');
expect(gateFailure).toMatchObject({
data: {
wikiReferenceGateScope: {
global: true,
reasons: expect.arrayContaining(['wiki_page_removed']),
removedWikiPageKeys: expect.arrayContaining(['source-page']),
pageKeysValidated: expect.arrayContaining(['account-segments']),
},
actionOrigins: expect.arrayContaining([
expect.objectContaining({
source: 'work_unit_action',
unitKey: 'delete-target-page',
unitRawFiles: ['pages/delete.json'],
action: expect.objectContaining({
target: 'wiki',
type: 'removed',
key: 'source-page',
rawPaths: ['pages/delete.json'],
}),
}),
]),
},
error: { message: expect.stringContaining('account-segments -> source-page') },
});
// The persisted failure report should carry the same scope and origin details.
const failureReport = (deps.reports.create as any).mock.calls
.map((call: any[]) => call[0])
.find((report: any) => report.body.status === 'failed');
expect(failureReport.body.failure).toMatchObject({
phase: 'final_gates',
message: expect.stringContaining('account-segments -> source-page'),
details: expect.objectContaining({
wikiReferenceGateScope: expect.objectContaining({
global: true,
reasons: expect.arrayContaining(['wiki_page_removed']),
removedWikiPageKeys: expect.arrayContaining(['source-page']),
pageKeysValidated: expect.arrayContaining(['account-segments']),
}),
changedWikiPageKeys: expect.arrayContaining(['source-page']),
actionOrigins: expect.arrayContaining([
expect.objectContaining({
source: 'work_unit_action',
unitKey: 'delete-target-page',
action: expect.objectContaining({
target: 'wiki',
type: 'removed',
key: 'source-page',
rawPaths: ['pages/delete.json'],
}),
}),
]),
}),
});
expect(failureReport.body.workUnits).toEqual(
expect.arrayContaining([
expect.objectContaining({
unitKey: 'delete-target-page',
actions: expect.arrayContaining([
expect.objectContaining({
target: 'wiki',
type: 'removed',
key: 'source-page',
rawPaths: ['pages/delete.json'],
}),
]),
}),
]),
);
} finally {
await rm(runtime.homeDir, { recursive: true, force: true });
}
});
it('rejects WorkUnit patches that touch unauthorized semantic-layer target connections', async () => {
  const runtime = await makeRealGitRuntime();
  try {
    const { deps, adapter } = makeDeps(runtime);
    adapter.chunk.mockResolvedValue({
      workUnits: [{ unitKey: 'finance-source', rawFiles: ['cards/finance.json'], peerFileIndex: [], dependencyPaths: [] }],
    });

    let activeSession: any = null;
    deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
      activeSession = toolSession;
      return { toRuntimeTools: vi.fn(() => ({})) };
    });

    // The run targets the 'warehouse' connection, but the WorkUnit writes an SL
    // source under the 'finance' connection's directory.
    deps.agentRunner.runLoop = vi.fn(async () => {
      const financeSourcePath = 'semantic-layer/finance/orders.yaml';
      const worktreeRoot = rootOfConfig(activeSession.configService, runtime.configDir);
      await mkdir(join(worktreeRoot, 'semantic-layer/finance'), { recursive: true });
      await writeFile(
        join(worktreeRoot, financeSourcePath),
        'name: orders\ngrain: [id]\ncolumns: [{name: id, type: string}]\njoins: []\nmeasures: []\n',
      );
      addTouchedSlSource(activeSession.touchedSlSources, 'finance', 'orders');
      activeSession.actions.push({
        target: 'sl',
        type: 'created',
        key: 'orders',
        detail: 'Unauthorized target',
        targetConnectionId: 'finance',
        rawPaths: ['cards/finance.json'],
      });
      await activeSession.gitService.commitFiles([financeSourcePath], 'wu unauthorized target', 'KTX Test', 'system@ktx.local');
      return { stopReason: 'natural' };
    }) as never;

    const runner = new IngestBundleRunner(deps);
    await mockStageRawFiles(runner, runtime, [['cards/finance.json', 'h1']]);
    // HEAD after staging; the rejected run must not move main.
    const preRunHead = await runtime.git.revParseHead();
    const pendingRun = runner.run({
      jobId: 'job-unauthorized-wu-target',
      connectionId: 'warehouse',
      sourceKey: 'metabase',
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'upload' },
    });
    await expect(pendingRun).rejects.toThrow(/isolated diff textual conflict.*semantic-layer target connection not allowed/);
    expect(await runtime.git.revParseHead()).toBe(preRunHead);

    const traceText = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-unauthorized-wu-target/trace.jsonl'), 'utf-8');
    const expectedFragments = [
      'patch_policy_rejected',
      'semantic-layer/finance/orders.yaml',
      'allowedTargetConnectionIds',
      'ingest_failed',
      'failure_report_created',
    ];
    for (const fragment of expectedFragments) {
      expect(traceText).toContain(fragment);
    }
    expect(traceText).not.toContain('squash_finished');

    // The failure report should describe the rejected patch and the allowed connections.
    const reportArgs = (deps.reports.create as any).mock.calls.map(([firstArg]: any[]) => firstArg);
    const failureReport = reportArgs.find((report: any) => report.body.status === 'failed');
    expect(failureReport.body.failure).toMatchObject({
      phase: 'integration',
      message: expect.stringContaining('semantic-layer target connection not allowed'),
    });
    expect(failureReport.body.failure.details).toMatchObject({
      unitKey: 'finance-source',
      allowedTargetConnectionIds: ['warehouse'],
      touchedPaths: ['semantic-layer/finance/orders.yaml'],
      reason: expect.stringContaining('semantic-layer/finance/orders.yaml (finance)'),
    });
  } finally {
    await rm(runtime.homeDir, { recursive: true, force: true });
  }
});
// The WorkUnit itself is valid (it only writes a wiki page), but the follow-up agent
// call writes a semantic-layer source under the unauthorized 'finance' connection.
// The target-policy check must fail the run before squash and leave HEAD unchanged.
it('rejects reconciliation mutations that touch unauthorized semantic-layer target connections before squash', async () => {
  const runtime = await makeRealGitRuntime();
  try {
    const { deps, adapter } = makeDeps(runtime);
    adapter.chunk.mockResolvedValue({
      workUnits: [{ unitKey: 'valid-page', rawFiles: ['pages/source.json'], peerFileIndex: [], dependencyPaths: [] }],
    });
    // Capture the most recently created WU toolset session for the runLoop mock.
    let currentSession: any = null;
    deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
      currentSession = toolSession;
      return { toRuntimeTools: vi.fn(() => ({})) };
    });
    deps.agentRunner.runLoop = vi.fn(async (params: any) => {
      const root = rootOfConfig(currentSession.configService, runtime.configDir);
      if (params.telemetryTags.operationName === 'ingest-bundle-wu') {
        // WorkUnit step: a valid wiki page only.
        await mkdir(join(root, 'wiki/global'), { recursive: true });
        await writeFile(join(root, 'wiki/global/valid-page.md'), '---\nsummary: Valid page\nusage_mode: auto\n---\n\nValid\n');
        currentSession.actions.push({
          target: 'wiki',
          type: 'created',
          key: 'valid-page',
          detail: 'Valid page',
          rawPaths: ['pages/source.json'],
        });
        await currentSession.gitService.commitFiles(['wiki/global/valid-page.md'], 'wu valid page', 'KTX Test', 'system@ktx.local');
      } else {
        // Any other agent call (reconciliation, per the test name) writes an SL source
        // under the 'finance' connection, which the job ('warehouse') may not target.
        await mkdir(join(root, 'semantic-layer/finance'), { recursive: true });
        await writeFile(
          join(root, 'semantic-layer/finance/reconcile_orders.yaml'),
          'name: reconcile_orders\ngrain: [id]\ncolumns: [{name: id, type: string}]\njoins: []\nmeasures: []\n',
        );
        addTouchedSlSource(currentSession.touchedSlSources, 'finance', 'reconcile_orders');
        currentSession.actions.push({
          target: 'sl',
          type: 'created',
          key: 'reconcile_orders',
          detail: 'Unauthorized reconcile target',
          targetConnectionId: 'finance',
          rawPaths: ['pages/source.json'],
        });
        await currentSession.gitService.commitFiles(
          ['semantic-layer/finance/reconcile_orders.yaml'],
          'reconcile unauthorized target',
          'KTX Test',
          'system@ktx.local',
        );
      }
      return { stopReason: 'natural' };
    }) as never;
    const runner = new IngestBundleRunner(deps);
    await mockStageRawFiles(runner, runtime, [['pages/source.json', 'h1']]);
    const preRunHead = await runtime.git.revParseHead();
    await expect(
      runner.run({
        jobId: 'job-unauthorized-reconcile-target',
        connectionId: 'warehouse',
        sourceKey: 'metabase',
        trigger: 'upload',
        bundleRef: { kind: 'upload', uploadId: 'upload' },
      }),
    ).rejects.toThrow(/semantic-layer target connection not allowed/);
    // Nothing from the failed run may reach HEAD.
    expect(await runtime.git.revParseHead()).toBe(preRunHead);
    const trace = await readFile(
      join(runtime.configDir, '.ktx/ingest-traces/job-unauthorized-reconcile-target/trace.jsonl'),
      'utf-8',
    );
    expect(trace).toContain('semantic_layer_target_policy_started');
    expect(trace).toContain('semantic_layer_target_policy_failed');
    expect(trace).toContain('allowedTargetConnectionIds');
    expect(trace).toContain('semantic-layer/finance/reconcile_orders.yaml');
    expect(trace).toContain('ingest_failed');
    expect(trace).toContain('failure_report_created');
    expect(trace).not.toContain('squash_finished');
    const failureReport = (deps.reports.create as any).mock.calls
      .map((call: any[]) => call[0])
      .find((report: any) => report.body.status === 'failed');
    // Fail with a clear assertion (instead of a TypeError on `.body`) if no failed report exists.
    expect(failureReport).toBeDefined();
    expect(failureReport.body.failure).toMatchObject({
      phase: 'target_policy',
      message: expect.stringContaining('semantic-layer target connection not allowed'),
    });
    expect(failureReport.body.failure.details).toMatchObject({
      allowedTargetConnectionIds: ['warehouse'],
      touchedPaths: expect.arrayContaining(['semantic-layer/finance/reconcile_orders.yaml']),
    });
  } finally {
    await rm(runtime.homeDir, { recursive: true, force: true });
  }
});
// End-to-end textual-conflict repair: two WorkUnits each write a different measure into
// the same semantic-layer source file. The second patch conflicts, the textual resolver
// agent writes a merged file containing both measures, and the run squashes successfully.
// NOTE(review): adapter.chunk is not mocked here, so this relies on makeDeps' default
// chunking producing the 'card-wiki' and 'card-source' units — confirm in makeDeps.
it('repairs additive same-source textual conflicts before final gates and squash', async () => {
const runtime = await makeRealGitRuntime();
try {
const { deps } = makeDeps(runtime);
// Most recently created WU toolset session; the runLoop mock writes into its config root.
let currentSession: any = null;
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
currentSession = toolSession;
return { toRuntimeTools: vi.fn(() => ({})) };
});
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
// Resolver call: check it can see both sides of the conflict (the integrated file
// and the failed patch), then write a merged source with both measures.
if (params.telemetryTags.operationName === 'ingest-isolated-diff-textual-resolver') {
const current = await params.toolSet.read_integration_file.execute({
path: 'semantic-layer/warehouse/mart_account_segments.yaml',
});
expect(current.markdown).toContain('total_contract_arr_cents');
const patch = await params.toolSet.read_failed_patch.execute({});
expect(patch.markdown).toContain('account_count');
await params.toolSet.write_integration_file.execute({
path: 'semantic-layer/warehouse/mart_account_segments.yaml',
content:
'name: mart_account_segments\n' +
'grain: [account_id]\n' +
'columns: [{name: account_id, type: string}]\n' +
'joins: []\n' +
'measures:\n' +
' - name: total_contract_arr_cents\n' +
' expr: sum(contract_arr)\n' +
' - name: account_count\n' +
' expr: count_distinct(account_id)\n',
});
return { stopReason: 'natural' };
}
// WorkUnit calls: each unit writes its own single-measure version of the same file,
// so whichever patch integrates second conflicts textually with the first.
const root = rootOfConfig(currentSession.configService, runtime.configDir);
await mkdir(join(root, 'semantic-layer/warehouse'), { recursive: true });
if (params.telemetryTags.unitKey === 'card-wiki') {
await writeFile(
join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'),
'name: mart_account_segments\n' +
'grain: [account_id]\n' +
'columns: [{name: account_id, type: string}]\n' +
'joins: []\n' +
'measures:\n' +
' - name: total_contract_arr_cents\n' +
' expr: sum(contract_arr)\n',
);
} else if (params.telemetryTags.unitKey === 'card-source') {
await writeFile(
join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'),
'name: mart_account_segments\n' +
'grain: [account_id]\n' +
'columns: [{name: account_id, type: string}]\n' +
'joins: []\n' +
'measures:\n' +
' - name: account_count\n' +
' expr: count_distinct(account_id)\n',
);
}
addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'mart_account_segments');
currentSession.actions.push({
target: 'sl',
type: 'updated',
key: 'mart_account_segments',
detail: 'Updated account segments source',
targetConnectionId: 'warehouse',
});
await currentSession.gitService.commitFiles(
['semantic-layer/warehouse/mart_account_segments.yaml'],
`wu ${params.telemetryTags.unitKey}`,
'KTX Test',
'system@ktx.local',
);
return { stopReason: 'natural' };
}) as never;
const runner = new IngestBundleRunner(deps);
await mockStageRawFiles(runner, runtime, [
['cards/wiki.json', 'hash-a'],
['cards/source.json', 'hash-b'],
]);
const result = await runner.run({
jobId: 'job-resolver-e2e',
connectionId: 'warehouse',
sourceKey: 'metabase',
trigger: 'manual_resync',
bundleRef: { kind: 'upload', uploadId: 'upload-1' },
});
expect(result.commitSha).toBeTruthy();
// The squashed file on disk must carry both measures.
const source = await readFile(join(runtime.configDir, 'semantic-layer/warehouse/mart_account_segments.yaml'), 'utf-8');
expect(source).toContain('total_contract_arr_cents');
expect(source).toContain('account_count');
// The resolver ran with the 'repair' model role for the 'card-source' unit's patch.
expect(deps.agentRunner.runLoop).toHaveBeenCalledWith(
expect.objectContaining({
modelRole: 'repair',
telemetryTags: expect.objectContaining({
operationName: 'ingest-isolated-diff-textual-resolver',
unitKey: 'card-source',
}),
}),
);
// The last created report is treated as the success report for this run.
const successReport = (deps.reports.create as any).mock.calls.at(-1)?.[0]?.body;
expect(successReport.isolatedDiff).toMatchObject({
acceptedPatches: 2,
textualConflicts: 1,
semanticConflicts: 0,
resolverAttempts: 1,
resolverRepairs: 1,
resolverFailures: 0,
});
const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-resolver-e2e/trace.jsonl'), 'utf-8');
expect(trace).toContain('textual_conflict_resolver_repaired');
expect(trace).toContain('patch_accepted_after_textual_resolution');
} finally {
await rm(runtime.homeDir, { recursive: true, force: true });
}
});
// Final-gate repair success path. The seeded, committed repo has an SL measure
// `total_contract_arr_cents` and a wiki page whose body references it. The WorkUnit
// renames the measure to `total_contract_arr`, leaving the wiki body ref stale.
// The gate-repair agent reads the gate error, rewrites the page to the new name,
// and the run squashes successfully.
it('repairs final wiki body refs before squash when the repair agent edits the scoped page', async () => {
const runtime = await makeRealGitRuntime();
try {
// Seed the repo: SL source with the old measure name plus a wiki page referencing it.
await mkdir(join(runtime.configDir, 'semantic-layer/warehouse'), { recursive: true });
await mkdir(join(runtime.configDir, 'wiki/global'), { recursive: true });
await writeFile(
join(runtime.configDir, 'semantic-layer/warehouse/mart_account_segments.yaml'),
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr_cents\n expr: sum(contract_arr)\n',
);
await writeFile(
join(runtime.configDir, 'wiki/global/account-segments.md'),
'---\nsummary: Account segments\nusage_mode: auto\n---\n\nExisting ARR uses `mart_account_segments.total_contract_arr_cents`.\n',
);
await runtime.git.commitFiles(
['semantic-layer/warehouse/mart_account_segments.yaml', 'wiki/global/account-segments.md'],
'seed stale wiki body ref',
'KTX Test',
'system@ktx.local',
);
const { deps, adapter } = makeDeps(runtime);
adapter.chunk.mockResolvedValue({
workUnits: [{ unitKey: 'source-only', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] }],
});
// Most recently created WU toolset session; used by the WorkUnit branch below.
let currentSession: any = null;
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
currentSession = toolSession;
return { toRuntimeTools: vi.fn(() => ({})) };
});
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
// Gate-repair call: the gate error should mention the stale measure; the agent
// replaces it in the page body through the repair-file tools.
if (params.telemetryTags.operationName === 'ingest-isolated-diff-gate-repair') {
const gateError = await params.toolSet.read_gate_error.execute({});
expect(gateError.markdown).toContain('total_contract_arr_cents');
const page = await params.toolSet.read_repair_file.execute({
path: 'wiki/global/account-segments.md',
});
await params.toolSet.write_repair_file.execute({
path: 'wiki/global/account-segments.md',
content: page.markdown.replace('total_contract_arr_cents', 'total_contract_arr'),
});
return { stopReason: 'natural' as const };
}
// Reconciliation is a no-op in this scenario.
if (params.modelRole === 'reconcile') {
return { stopReason: 'natural' as const };
}
// WorkUnit call: rename the measure without touching the wiki page.
const root = rootOfConfig(currentSession.configService, runtime.configDir);
await writeFile(
join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'),
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n',
);
addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'mart_account_segments');
currentSession.actions.push({
target: 'sl',
type: 'updated',
key: 'mart_account_segments',
detail: 'Rename ARR measure',
targetConnectionId: 'warehouse',
rawPaths: ['cards/source.json'],
});
await currentSession.gitService.commitFiles(
['semantic-layer/warehouse/mart_account_segments.yaml'],
'wu source rename',
'KTX Test',
'system@ktx.local',
);
return { stopReason: 'natural' as const };
}) as never;
const runner = new IngestBundleRunner(deps);
await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]);
const result = await runner.run({
jobId: 'job-final-gate-repair',
connectionId: 'warehouse',
sourceKey: 'metabase',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload' },
});
expect(result.commitSha).toBeTruthy();
// The squashed page references the renamed measure and no longer the old one.
await expect(readFile(join(runtime.configDir, 'wiki/global/account-segments.md'), 'utf-8')).resolves.toContain(
'mart_account_segments.total_contract_arr',
);
await expect(readFile(join(runtime.configDir, 'wiki/global/account-segments.md'), 'utf-8')).resolves.not.toContain(
'total_contract_arr_cents',
);
// The last created report records exactly one successful gate repair.
const reportCreate = vi.mocked(deps.reports.create).mock.calls.at(-1)?.[0] as any;
expect(reportCreate.body.isolatedDiff).toMatchObject({
gateRepairAttempts: 1,
gateRepairs: 1,
gateRepairFailures: 0,
});
const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-final-gate-repair/trace.jsonl'), 'utf-8');
expect(trace).toContain('gate_repair_repaired');
expect(trace).toContain('final_artifact_gates_after_gate_repair_finished');
expect(trace).toContain('final_gate_repair_committed');
} finally {
await rm(runtime.homeDir, { recursive: true, force: true });
}
});
// Final-gate repair failure path. The seed and WorkUnit rename match the success-path
// test: an SL measure is renamed while a wiki page still references the old name.
// Here the gate-repair agent returns without editing anything. The run must reject,
// leave HEAD unchanged, and record a failed gate repair.
it('fails before squash when final gate repair makes no edit', async () => {
const runtime = await makeRealGitRuntime();
try {
// Seed the repo: SL source with the old measure name plus a wiki page referencing it.
await mkdir(join(runtime.configDir, 'semantic-layer/warehouse'), { recursive: true });
await mkdir(join(runtime.configDir, 'wiki/global'), { recursive: true });
await writeFile(
join(runtime.configDir, 'semantic-layer/warehouse/mart_account_segments.yaml'),
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr_cents\n expr: sum(contract_arr)\n',
);
await writeFile(
join(runtime.configDir, 'wiki/global/account-segments.md'),
'---\nsummary: Account segments\nusage_mode: auto\n---\n\nExisting ARR uses `mart_account_segments.total_contract_arr_cents`.\n',
);
await runtime.git.commitFiles(
['semantic-layer/warehouse/mart_account_segments.yaml', 'wiki/global/account-segments.md'],
'seed stale wiki body ref',
'KTX Test',
'system@ktx.local',
);
// HEAD after seeding; the failed run must leave it unchanged.
const preRunHead = await runtime.git.revParseHead();
const { deps, adapter } = makeDeps(runtime);
adapter.chunk.mockResolvedValue({
workUnits: [{ unitKey: 'source-only', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] }],
});
// Most recently created WU toolset session; used by the WorkUnit branch below.
let currentSession: any = null;
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
currentSession = toolSession;
return { toRuntimeTools: vi.fn(() => ({})) };
});
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
// Gate-repair call deliberately does nothing, so no allowed path is edited.
if (params.telemetryTags.operationName === 'ingest-isolated-diff-gate-repair') {
return { stopReason: 'natural' as const };
}
// Reconciliation is a no-op in this scenario.
if (params.modelRole === 'reconcile') {
return { stopReason: 'natural' as const };
}
// WorkUnit call: rename the measure without touching the wiki page.
const root = rootOfConfig(currentSession.configService, runtime.configDir);
await writeFile(
join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'),
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n',
);
addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'mart_account_segments');
currentSession.actions.push({
target: 'sl',
type: 'updated',
key: 'mart_account_segments',
detail: 'Rename ARR measure',
targetConnectionId: 'warehouse',
rawPaths: ['cards/source.json'],
});
await currentSession.gitService.commitFiles(
['semantic-layer/warehouse/mart_account_segments.yaml'],
'wu source rename',
'KTX Test',
'system@ktx.local',
);
return { stopReason: 'natural' as const };
}) as never;
const runner = new IngestBundleRunner(deps);
await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]);
await expect(
runner.run({
jobId: 'job-final-gate-repair-fails',
connectionId: 'warehouse',
sourceKey: 'metabase',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload' },
}),
).rejects.toThrow(/gate repair completed without editing an allowed path/);
expect(await runtime.git.revParseHead()).toBe(preRunHead);
// The last created report is the failure report, with one failed repair attempt.
const reportCreate = vi.mocked(deps.reports.create).mock.calls.at(-1)?.[0] as any;
expect(reportCreate.body.status).toBe('failed');
expect(reportCreate.body.isolatedDiff).toMatchObject({
gateRepairAttempts: 1,
gateRepairs: 0,
gateRepairFailures: 1,
});
const trace = await readFile(
join(runtime.configDir, '.ktx/ingest-traces/job-final-gate-repair-fails/trace.jsonl'),
'utf-8',
);
expect(trace).toContain('gate_repair_failed');
expect(trace).not.toContain('squash_finished');
} finally {
await rm(runtime.homeDir, { recursive: true, force: true });
}
});
// Finalization output flows through the post-processing pipeline in order: the
// finalization commit happens first, then the wiki sl-ref repair, then the final
// artifact gates. The finalized page lists a valid ref (mart_account_segments) and an
// unknown one (missing_source); the valid ref must survive.
it('runs finalization before wiki sl-ref repair and final gates', async () => {
  const runtime = await makeRealGitRuntime();
  try {
    const { deps, adapter } = makeDeps(runtime);
    adapter.chunk.mockResolvedValue({
      workUnits: [{ unitKey: 'wiki-page', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] }],
    });
    // Finalization writes an SL source plus a wiki page whose sl_refs include a source
    // that does not exist.
    adapter.finalize = vi.fn(async ({ workdir }) => {
      await mkdir(join(workdir, 'semantic-layer/warehouse'), { recursive: true });
      await mkdir(join(workdir, 'wiki/global'), { recursive: true });
      await writeFile(
        join(workdir, 'semantic-layer/warehouse/mart_account_segments.yaml'),
        'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n',
      );
      await writeFile(
        join(workdir, 'wiki/global/finalized-accounts.md'),
        '---\nsummary: Finalized accounts\nusage_mode: auto\nsl_refs:\n - mart_account_segments\n - missing_source\n---\n\nAccounts use `mart_account_segments.total_contract_arr`.\n',
      );
      return {
        warnings: [],
        errors: [],
        touchedSources: [{ connectionId: 'warehouse', sourceName: 'mart_account_segments' }],
        changedWikiPageKeys: ['finalized-accounts'],
        actions: [
          {
            target: 'sl',
            type: 'created',
            key: 'mart_account_segments',
            detail: 'Finalized accounts',
            targetConnectionId: 'warehouse',
            rawPaths: ['cards/source.json'],
          },
          {
            target: 'wiki',
            type: 'created',
            key: 'finalized-accounts',
            detail: 'Finalized wiki',
            rawPaths: ['cards/source.json'],
          },
        ],
      };
    });
    // Every agent call is a no-op; all content comes from finalization.
    deps.agentRunner.runLoop = vi.fn(async () => ({ stopReason: 'natural' as const })) as never;
    const runner = new IngestBundleRunner(deps);
    await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]);
    await runner.run({
      jobId: 'job-finalization',
      connectionId: 'warehouse',
      sourceKey: 'metabase',
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'upload' },
    });
    const trace = await readFile(
      join(runtime.configDir, '.ktx/ingest-traces/job-finalization/trace.jsonl'),
      'utf-8',
    );
    // indexOf returns -1 for a missing marker, which would make a bare
    // `indexOf(a) < indexOf(b)` comparison pass vacuously. Require every marker to be
    // present before comparing positions.
    const finalizationAt = trace.indexOf('finalization_committed');
    const slRefRepairAt = trace.indexOf('wiki_sl_refs_repaired');
    const finalGatesAt = trace.indexOf('final_artifact_gates');
    expect(finalizationAt).toBeGreaterThanOrEqual(0);
    expect(slRefRepairAt).toBeGreaterThanOrEqual(0);
    expect(finalGatesAt).toBeGreaterThanOrEqual(0);
    expect(finalizationAt).toBeLessThan(slRefRepairAt);
    expect(slRefRepairAt).toBeLessThan(finalGatesAt);
    await expect(readFile(join(runtime.configDir, 'wiki/global/finalized-accounts.md'), 'utf-8')).resolves.toContain(
      'sl_refs:\n - mart_account_segments',
    );
  } finally {
    await rm(runtime.homeDir, { recursive: true, force: true });
  }
});
// Finalization must not rewrite a file that a WorkUnit already changed in the same run.
// The WU creates wiki/global/orders.md, finalization then overwrites that same path,
// and the runner is expected to reject with an overlap error naming the path.
it('fails when finalization edits a path already changed earlier in the run', async () => {
  const runtime = await makeRealGitRuntime();
  try {
    const { deps, adapter } = makeDeps(runtime);
    // Both the WorkUnit and finalization write to this page.
    const overlappingPage = 'wiki/global/orders.md';
    const rawPath = 'cards/source.json';
    adapter.chunk.mockResolvedValue({
      workUnits: [{ unitKey: 'wiki-page', rawFiles: [rawPath], peerFileIndex: [], dependencyPaths: [] }],
    });

    // Remember the WU toolset session so the agent mock can write into its config root.
    let wuSession: any = null;
    deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
      wuSession = toolSession;
      return { toRuntimeTools: vi.fn(() => ({})) };
    });

    // Simulated WU agent: create and commit the page.
    deps.agentRunner.runLoop = vi.fn(async () => {
      const wuRoot = rootOfConfig(wuSession.configService, runtime.configDir);
      await mkdir(join(wuRoot, 'wiki/global'), { recursive: true });
      await writeFile(join(wuRoot, overlappingPage), '---\nsummary: Orders\nusage_mode: auto\n---\n\nWU body\n');
      wuSession.actions.push({
        target: 'wiki',
        type: 'created',
        key: 'orders',
        detail: 'WU orders',
        rawPaths: [rawPath],
      });
      await wuSession.gitService.commitFiles([overlappingPage], 'wu orders', 'KTX Test', 'system@ktx.local');
      return { stopReason: 'natural' as const };
    }) as never;

    // Finalization overwrites the page the WorkUnit already produced.
    adapter.finalize = vi.fn(async ({ workdir }) => {
      await writeFile(join(workdir, overlappingPage), '---\nsummary: Orders\nusage_mode: auto\n---\n\nFinalized body\n');
      return {
        warnings: [],
        errors: [],
        touchedSources: [],
        changedWikiPageKeys: ['orders'],
        actions: [{ target: 'wiki', type: 'updated', key: 'orders', detail: 'Conflicting finalization' }],
      };
    });

    const runner = new IngestBundleRunner(deps);
    await mockStageRawFiles(runner, runtime, [[rawPath, 'h1']]);
    const runPromise = runner.run({
      jobId: 'job-finalization-overlap',
      connectionId: 'warehouse',
      sourceKey: 'metabase',
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'upload' },
    });
    await expect(runPromise).rejects.toThrow(
      /finalization modified path\(s\) already changed earlier in this run: wiki\/global\/orders\.md/,
    );
  } finally {
    await rm(runtime.homeDir, { recursive: true, force: true });
  }
});
// The semantic-layer target policy also covers finalization output. With no WorkUnits,
// finalization alone writes an SL source under 'other-warehouse' while the job's
// connection is 'warehouse'. The run must reject, and the trace must show the
// finalization commit, the policy check and the failure.
it('rejects finalization writes to unauthorized semantic-layer targets', async () => {
  const runtime = await makeRealGitRuntime();
  try {
    const { deps, adapter } = makeDeps(runtime);
    // No WorkUnits: finalization is the only step that writes anything.
    adapter.chunk.mockResolvedValue({ workUnits: [] });
    adapter.finalize = vi.fn(async ({ workdir }) => {
      const forbiddenDir = join(workdir, 'semantic-layer/other-warehouse');
      await mkdir(forbiddenDir, { recursive: true });
      await writeFile(
        join(forbiddenDir, 'orders.yaml'),
        'name: orders\ngrain: [order_id]\ncolumns: [{name: order_id, type: string}]\njoins: []\nmeasures: []\n',
      );
      return {
        warnings: [],
        errors: [],
        touchedSources: [{ connectionId: 'other-warehouse', sourceName: 'orders' }],
        changedWikiPageKeys: [],
        actions: [
          {
            target: 'sl',
            type: 'created',
            key: 'orders',
            targetConnectionId: 'other-warehouse',
            detail: 'Forbidden target',
            rawPaths: ['cards/source.json'],
          },
        ],
      };
    });

    const runner = new IngestBundleRunner(deps);
    await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]);
    const runPromise = runner.run({
      jobId: 'job-finalization-target-policy',
      connectionId: 'warehouse',
      sourceKey: 'metabase',
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'upload' },
    });
    await expect(runPromise).rejects.toThrow(/semantic-layer target connection not allowed/);

    const tracePath = join(runtime.configDir, '.ktx/ingest-traces/job-finalization-target-policy/trace.jsonl');
    const trace = await readFile(tracePath, 'utf-8');
    for (const marker of ['finalization_committed', 'semantic_layer_target_policy', 'ingest_failed']) {
      expect(trace).toContain(marker);
    }
  } finally {
    await rm(runtime.homeDir, { recursive: true, force: true });
  }
});
});