mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-10 08:05:14 +02:00
* feat(cli): define full warehouse dialect contract
* test(cli): keep dialect edge tests focused
* fix(cli): stabilize dialect contract foundation
* refactor(connectors): own read-only query preparation
* refactor(connectors): resolve dialects through registry
* refactor(connectors): keep concrete dialect classes internal
* chore(workspace): enforce dialect import boundary
* refactor(cli): resolve relationship dialect at scan boundary
* refactor(cli): use dialect display parsing for entity details
* refactor(cli): use dialect display parsing for warehouse catalog
* refactor(cli): use dialect SQL in relationship workflows
* test(cli): verify solid dialect scan workflow closure
* test: split cli tests from source tree
* refactor(cli): standardize BigQuery scope listing
* feat(sqlite): implement connector scope listing
* test(connectors): cover required table listing
* feat(cli): add warehouse driver registry
* refactor(setup): route scope discovery through driver registry
* refactor(cli): route local query execution through driver registry
* refactor(historic-sql): route dialect support through driver registry
* refactor(cli): test warehouse connections through driver registry
* fix(cli): close driver registry type export gaps
* Improve setup daemon diagnostics
* refactor(setup): centralize rail-prefixed diagnostics + query-history fallback
Extract errorMessage, writePrefixedLines, and flushPrefixedBufferedCommandOutput
into clack.ts so the setup wizard, managed daemons, and embedding/agent steps
share one rail-formatted writer. setup-databases.ts also adds a
"disable query history and retry" option when the schema-context build fails
and query history is the likely culprit, surfaced via a new
failed-query-history-unavailable status.
* fix(cli): carry catalog through the picker so BigQuery/Snowflake/SQL Server scope filters match
The setup picker's KtxTableListEntry was a 2-level { schema, name }, so
qualifiedTableId always wrote db.name into enabled_tables. When BigQuery,
Snowflake, or SQL Server later ran fast ingest, their introspect step filtered
the scope set with scopedTableNames(scope, { catalog: projectId|database, db })
— catalog was non-null on the introspect side but null in the scope refs, so
every entry was rejected, the live-database adapter staged zero table files,
and detect() failed with 'Adapter "live-database" did not recognize fetched
source output'.
Align the picker boundary with the canonical 3-level KtxTableRef:
- Add catalog: string | null to KtxTableListEntry.
- BigQuery/Snowflake/SQL Server listTables populate catalog from the
resolved projectId / database; Postgres/MySQL/ClickHouse/SQLite set null.
- qualifiedTableId emits catalog.schema.name when catalog is non-null
(resolveEnabledTables already accepts the 3-part shape) and
schemasFromEnabledTables now goes through parseDottedTableEntry so it
recovers the schema correctly from both 2-part and 3-part entries.
- Export parseDottedTableEntry from enabled-tables.ts (@internal) for picker
reuse.
Update listTables expectations in all seven connector tests and the setup /
picker test fixtures. Add a picker regression test that covers the
catalog-bearing round-trip (save + refine).
* fix(cli): allow debug telemetry under opt-out env
2376 lines
100 KiB
TypeScript
2376 lines
100 KiB
TypeScript
import { mkdir, mkdtemp, readFile, readdir, rm, writeFile } from 'node:fs/promises';
|
|
import { tmpdir } from 'node:os';
|
|
import { join } from 'node:path';
|
|
import { describe, expect, it, vi } from 'vitest';
|
|
import { GitService } from '../../../src/context/core/git.service.js';
|
|
import { SessionWorktreeService } from '../../../src/context/core/session-worktree.service.js';
|
|
import { LocalGitFileStore } from '../../../src/context/project/local-git-file-store.js';
|
|
import { addTouchedSlSource } from '../../../src/context/tools/touched-sl-sources.js';
|
|
import { IngestBundleRunner } from '../../../src/context/ingest/ingest-bundle.runner.js';
|
|
import type { IngestBundleRunnerDeps } from '../../../src/context/ingest/ports.js';
|
|
|
|
async function makeRealGitRuntime() {
|
|
const homeDir = await mkdtemp(join(tmpdir(), 'ktx-isolated-runner-'));
|
|
const configDir = join(homeDir, 'config');
|
|
const git = new GitService({
|
|
storage: { configDir, homeDir },
|
|
git: {
|
|
userName: 'System User',
|
|
userEmail: 'system@example.com',
|
|
bootstrapMessage: 'init',
|
|
bootstrapAuthor: 'system',
|
|
bootstrapAuthorEmail: 'system@example.com',
|
|
},
|
|
});
|
|
await git.onModuleInit();
|
|
const configService = new LocalGitFileStore({ rootDir: configDir, git });
|
|
const sessionWorktreeService = new SessionWorktreeService({
|
|
coreConfig: {
|
|
storage: { configDir, homeDir },
|
|
git: {
|
|
userName: 'System User',
|
|
userEmail: 'system@example.com',
|
|
bootstrapMessage: 'init',
|
|
bootstrapAuthor: 'system',
|
|
bootstrapAuthorEmail: 'system@example.com',
|
|
},
|
|
},
|
|
gitService: git,
|
|
configService,
|
|
});
|
|
return { homeDir, configDir, git, configService, sessionWorktreeService };
|
|
}
|
|
|
|
function rootOfConfig(configService: unknown, fallback: string): string {
|
|
const rootDir = (configService as { rootDir?: unknown }).rootDir;
|
|
return typeof rootDir === 'string' ? rootDir : fallback;
|
|
}
|
|
|
|
async function loadSourcesFromRoot(root: string) {
|
|
const raw = await readFile(join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'), 'utf-8').catch(
|
|
() => '',
|
|
);
|
|
const hasCents = raw.includes('total_contract_arr_cents');
|
|
const hasDollars = raw.includes('total_contract_arr');
|
|
return {
|
|
sources:
|
|
hasCents || hasDollars
|
|
? [
|
|
{
|
|
name: 'mart_account_segments',
|
|
grain: ['account_id'],
|
|
columns: [{ name: 'account_id', type: 'string' }],
|
|
joins: [],
|
|
measures: [{ name: hasCents ? 'total_contract_arr_cents' : 'total_contract_arr', expr: 'sum(contract_arr)' }],
|
|
table: 'analytics.mart_account_segments',
|
|
},
|
|
]
|
|
: [],
|
|
loadErrors: [],
|
|
};
|
|
}
|
|
|
|
async function listGlobalWikiPageKeys(root: string): Promise<string[]> {
|
|
const dir = join(root, 'wiki/global');
|
|
const entries = await readdir(dir).catch(() => []);
|
|
return entries
|
|
.filter((entry) => entry.endsWith('.md'))
|
|
.map((entry) => entry.slice(0, -'.md'.length))
|
|
.sort();
|
|
}
|
|
|
|
function frontmatterList(yaml: string, key: string): string[] {
|
|
const pattern = new RegExp(`(?:^|\\n)${key}:\\n((?: - .+\\n?)*)`);
|
|
return (
|
|
pattern
|
|
.exec(yaml)?.[1]
|
|
?.split('\n')
|
|
.map((line) => line.trim().replace(/^- /, ''))
|
|
.filter(Boolean) ?? []
|
|
);
|
|
}
|
|
|
|
function legacyFallbackSettingKey(): string {
|
|
return ['sharedWorktree', 'SourceKeys'].join('');
|
|
}
|
|
|
|
function legacySharedTraceEvent(): string {
|
|
return ['shared', 'worktree', 'path', 'enabled'].join('_');
|
|
}
|
|
|
|
function makeWikiService(root: string) {
|
|
return {
|
|
listPageKeys: vi.fn(async (scope: string) => (scope === 'GLOBAL' ? listGlobalWikiPageKeys(root) : [])),
|
|
readPage: vi.fn(async (_scope: string, _scopeId: string | null, key: string) => {
|
|
const path = join(root, 'wiki/global', `${key}.md`);
|
|
const raw = await readFile(path, 'utf-8').catch(() => null);
|
|
if (!raw) {
|
|
return null;
|
|
}
|
|
const [, yaml = '', content = ''] = /^---\n([\s\S]*?)\n---\n?([\s\S]*)$/.exec(raw) ?? [];
|
|
return {
|
|
pageKey: key,
|
|
frontmatter: {
|
|
summary: key,
|
|
usage_mode: 'auto',
|
|
refs: frontmatterList(yaml, 'refs'),
|
|
sl_refs: frontmatterList(yaml, 'sl_refs'),
|
|
},
|
|
content: content.trim(),
|
|
};
|
|
}),
|
|
writePage: vi.fn(
|
|
async (
|
|
_scope: string,
|
|
_scopeId: string | null,
|
|
key: string,
|
|
frontmatter: { summary?: string; usage_mode?: string; refs?: string[]; sl_refs?: string[] },
|
|
content: string,
|
|
) => {
|
|
await mkdir(join(root, 'wiki/global'), { recursive: true });
|
|
const refs = (frontmatter.refs ?? []).map((ref) => ` - ${ref}`).join('\n');
|
|
const slRefs = (frontmatter.sl_refs ?? []).map((ref) => ` - ${ref}`).join('\n');
|
|
await writeFile(
|
|
join(root, 'wiki/global', `${key}.md`),
|
|
[
|
|
'---',
|
|
`summary: ${frontmatter.summary ?? key}`,
|
|
`usage_mode: ${frontmatter.usage_mode ?? 'auto'}`,
|
|
'refs:',
|
|
refs,
|
|
'sl_refs:',
|
|
slRefs,
|
|
'---',
|
|
'',
|
|
content,
|
|
'',
|
|
].join('\n'),
|
|
);
|
|
},
|
|
),
|
|
syncFromCommit: vi.fn(),
|
|
};
|
|
}
|
|
|
|
function makeDeps(
|
|
runtime: Awaited<ReturnType<typeof makeRealGitRuntime>>,
|
|
sourceKey = 'metabase',
|
|
settings: Partial<IngestBundleRunnerDeps['settings']> = {},
|
|
) {
|
|
const adapter: any = {
|
|
source: sourceKey,
|
|
skillNames: [],
|
|
detect: vi.fn().mockResolvedValue(true),
|
|
chunk: vi.fn().mockResolvedValue({
|
|
workUnits: [
|
|
{ unitKey: 'card-wiki', rawFiles: ['cards/wiki.json'], peerFileIndex: [], dependencyPaths: [] },
|
|
{ unitKey: 'card-source', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] },
|
|
],
|
|
}),
|
|
};
|
|
const wikiService = makeWikiService(runtime.configDir);
|
|
const semanticLayerService: any = {
|
|
loadAllSources: vi.fn(async () => loadSourcesFromRoot(runtime.configDir)),
|
|
listFilesForConnection: vi.fn().mockResolvedValue(['mart_account_segments.yaml']),
|
|
};
|
|
semanticLayerService.forWorktree = vi.fn((workdir: string) => ({
|
|
...semanticLayerService,
|
|
loadAllSources: vi.fn(async () => loadSourcesFromRoot(workdir)),
|
|
listFilesForConnection: vi.fn().mockResolvedValue(['mart_account_segments.yaml']),
|
|
}));
|
|
|
|
const deps: IngestBundleRunnerDeps = {
|
|
runs: { create: vi.fn().mockResolvedValue({ id: 'run-1' }), markCompleted: vi.fn(), markFailed: vi.fn() },
|
|
provenance: {
|
|
insertMany: vi.fn(),
|
|
findLatestHashesForCompletedSyncs: vi.fn().mockResolvedValue(new Map()),
|
|
findLatestArtifactsForRawPaths: vi.fn().mockResolvedValue(new Map()),
|
|
},
|
|
reports: { create: vi.fn().mockResolvedValue({ id: 'report-1' }), findByJobId: vi.fn().mockResolvedValue(null), markSuperseded: vi.fn() },
|
|
canonicalPins: { listPins: vi.fn().mockResolvedValue([]) },
|
|
registry: { get: vi.fn().mockReturnValue(adapter), register: vi.fn(), has: vi.fn(), list: vi.fn() },
|
|
diffSetService: {
|
|
compute: vi.fn().mockResolvedValue({ added: ['cards/wiki.json', 'cards/source.json'], modified: [], deleted: [], unchanged: [] }),
|
|
},
|
|
sessionWorktreeService: runtime.sessionWorktreeService,
|
|
agentRunner: { runLoop: vi.fn() },
|
|
gitService: runtime.git,
|
|
lockingService: { withLock: vi.fn(async (_key, fn) => fn()) },
|
|
storage: {
|
|
homeDir: join(runtime.configDir, '.ktx'),
|
|
systemGitAuthor: { name: 'KTX Test', email: 'system@ktx.local' },
|
|
resolveUploadDir: (id) => join(runtime.homeDir, 'upload', id),
|
|
resolvePullDir: (id) => join(runtime.homeDir, 'pull', id),
|
|
resolveTranscriptDir: (id) => join(runtime.configDir, '.ktx/ingest-transcripts', id),
|
|
resolveTracePath: (id) => join(runtime.configDir, '.ktx/ingest-traces', id, 'trace.jsonl'),
|
|
},
|
|
settings: {
|
|
memoryIngestionModel: 'test',
|
|
probeRowCount: 1,
|
|
ingestTraceLevel: 'trace',
|
|
...settings,
|
|
},
|
|
skillsRegistry: {
|
|
listSkills: vi.fn().mockResolvedValue([]),
|
|
getSkill: vi.fn().mockResolvedValue(null),
|
|
buildSkillsPrompt: vi.fn().mockReturnValue(''),
|
|
stripFrontmatter: vi.fn((body) => body),
|
|
} as never,
|
|
promptService: { loadPrompt: vi.fn().mockResolvedValue('base') } as never,
|
|
wikiService: { ...wikiService, forWorktree: vi.fn((workdir: string) => makeWikiService(workdir)) } as never,
|
|
knowledgeIndex: { listPagesForUser: vi.fn().mockResolvedValue([]) },
|
|
knowledgeSlRefs: { syncFromWiki: vi.fn() },
|
|
semanticLayerService,
|
|
slSearchService: { indexSources: vi.fn() } as never,
|
|
slSourcesRepository: {} as never,
|
|
slValidator: { validateSingleSource: vi.fn().mockResolvedValue({ errors: [], warnings: [] }) },
|
|
connections: { listEnabledConnections: vi.fn().mockResolvedValue([]), getConnectionById: vi.fn() } as never,
|
|
toolsetFactory: { createIngestWuToolset: vi.fn(() => ({ toRuntimeTools: vi.fn(() => ({})) })) },
|
|
commitMessages: { enqueueForExternalCommit: vi.fn() },
|
|
embedding: { maxBatchSize: 64, computeEmbedding: vi.fn(), computeEmbeddingsBulk: vi.fn() },
|
|
};
|
|
return { deps, adapter };
|
|
}
|
|
|
|
async function mockStageRawFiles(
|
|
runner: IngestBundleRunner,
|
|
runtime: Awaited<ReturnType<typeof makeRealGitRuntime>>,
|
|
hashes: [string, string][],
|
|
sourceKey = 'metabase',
|
|
) {
|
|
(runner as any).resolveStagedDir = vi.fn().mockResolvedValue(join(runtime.homeDir, 'stage'));
|
|
(runner as any).stageRawFilesStage1 = vi.fn(async ({ worktreeRoot }: any) => {
|
|
const rawDir = join(worktreeRoot, 'raw-sources/warehouse', sourceKey, 's');
|
|
await mkdir(rawDir, { recursive: true });
|
|
for (const [rawPath] of hashes) {
|
|
await mkdir(join(rawDir, rawPath.split('/').slice(0, -1).join('/')), { recursive: true });
|
|
await writeFile(join(rawDir, rawPath), '{}');
|
|
}
|
|
return { currentHashes: new Map(hashes), rawDirInWorktree: `raw-sources/warehouse/${sourceKey}/s` };
|
|
});
|
|
}
|
|
|
|
describe('IngestBundleRunner isolated diff path', () => {
|
|
it('routes an unlisted direct-writing source through isolated diffs by default', async () => {
|
|
const runtime = await makeRealGitRuntime();
|
|
try {
|
|
const sourceKey = 'custom-direct-source';
|
|
const { deps, adapter } = makeDeps(runtime, sourceKey);
|
|
adapter.chunk.mockResolvedValue({
|
|
workUnits: [
|
|
{
|
|
unitKey: 'custom-wiki',
|
|
rawFiles: ['custom/page.json'],
|
|
peerFileIndex: [],
|
|
dependencyPaths: [],
|
|
},
|
|
],
|
|
});
|
|
|
|
let currentSession: any = null;
|
|
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
|
|
currentSession = toolSession;
|
|
return { toRuntimeTools: vi.fn(() => ({})) };
|
|
});
|
|
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
|
|
if (params.telemetryTags.operationName !== 'ingest-bundle-wu') {
|
|
return { stopReason: 'natural' };
|
|
}
|
|
const root = rootOfConfig(currentSession.configService, runtime.configDir);
|
|
await mkdir(join(root, 'wiki/global'), { recursive: true });
|
|
await writeFile(
|
|
join(root, 'wiki/global/custom-isolated.md'),
|
|
'---\nsummary: Custom isolated write\nusage_mode: auto\n---\n\nCustom isolated write.\n',
|
|
'utf-8',
|
|
);
|
|
currentSession.actions.push({
|
|
target: 'wiki',
|
|
type: 'created',
|
|
key: 'custom-isolated',
|
|
detail: 'Custom isolated write',
|
|
rawPaths: ['custom/page.json'],
|
|
});
|
|
await currentSession.gitService.commitFiles(
|
|
['wiki/global/custom-isolated.md'],
|
|
'custom wiki',
|
|
'KTX Test',
|
|
'system@ktx.local',
|
|
);
|
|
return { stopReason: 'natural' };
|
|
}) as never;
|
|
|
|
const runner = new IngestBundleRunner(deps);
|
|
await mockStageRawFiles(runner, runtime, [['custom/page.json', 'h1']], sourceKey);
|
|
|
|
await expect(
|
|
runner.run({
|
|
jobId: 'job-custom-default',
|
|
connectionId: 'warehouse',
|
|
sourceKey,
|
|
trigger: 'upload',
|
|
bundleRef: { kind: 'upload', uploadId: 'upload' },
|
|
}),
|
|
).resolves.toMatchObject({
|
|
jobId: 'job-custom-default',
|
|
failedWorkUnits: [],
|
|
workUnitCount: 1,
|
|
});
|
|
|
|
const trace = await readFile(
|
|
join(runtime.configDir, '.ktx/ingest-traces/job-custom-default/trace.jsonl'),
|
|
'utf-8',
|
|
);
|
|
expect(trace).toContain('isolated_diff_enabled');
|
|
expect(trace).toContain('work_unit_child_created');
|
|
expect(trace).not.toContain(legacySharedTraceEvent());
|
|
|
|
const reportCreate = vi.mocked(deps.reports.create).mock.calls.at(-1)?.[0];
|
|
const reportBody = reportCreate?.body as { isolatedDiff?: unknown } | undefined;
|
|
expect(reportBody?.isolatedDiff).toMatchObject({
|
|
enabled: true,
|
|
acceptedPatches: 1,
|
|
});
|
|
} finally {
|
|
await rm(runtime.homeDir, { recursive: true, force: true });
|
|
}
|
|
});
|
|
|
|
it('does not support shared-worktree fallback settings', async () => {
|
|
const runtime = await makeRealGitRuntime();
|
|
try {
|
|
const sourceKey = 'legacy-source';
|
|
const staleSettings = {
|
|
[legacyFallbackSettingKey()]: ['legacy-source'],
|
|
} as Partial<IngestBundleRunnerDeps['settings']> & Record<string, unknown>;
|
|
const { deps, adapter } = makeDeps(runtime, sourceKey, staleSettings);
|
|
adapter.chunk.mockResolvedValue({
|
|
workUnits: [
|
|
{
|
|
unitKey: 'legacy-wiki',
|
|
rawFiles: ['legacy/page.json'],
|
|
peerFileIndex: [],
|
|
dependencyPaths: [],
|
|
},
|
|
],
|
|
});
|
|
|
|
let currentSession: any = null;
|
|
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
|
|
currentSession = toolSession;
|
|
return { toRuntimeTools: vi.fn(() => ({})) };
|
|
});
|
|
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
|
|
if (params.telemetryTags.operationName !== 'ingest-bundle-wu') {
|
|
return { stopReason: 'natural' };
|
|
}
|
|
const root = rootOfConfig(currentSession.configService, runtime.configDir);
|
|
await mkdir(join(root, 'wiki/global'), { recursive: true });
|
|
await writeFile(
|
|
join(root, 'wiki/global/legacy-isolated.md'),
|
|
'---\nsummary: Legacy isolated write\nusage_mode: auto\n---\n\nLegacy isolated write.\n',
|
|
'utf-8',
|
|
);
|
|
currentSession.actions.push({
|
|
target: 'wiki',
|
|
type: 'created',
|
|
key: 'legacy-isolated',
|
|
detail: 'Legacy isolated write',
|
|
rawPaths: ['legacy/page.json'],
|
|
});
|
|
await currentSession.gitService.commitFiles(
|
|
['wiki/global/legacy-isolated.md'],
|
|
'legacy isolated wiki',
|
|
'KTX Test',
|
|
'system@ktx.local',
|
|
);
|
|
return { stopReason: 'natural' };
|
|
}) as never;
|
|
|
|
const runner = new IngestBundleRunner(deps);
|
|
await mockStageRawFiles(runner, runtime, [['legacy/page.json', 'h1']], sourceKey);
|
|
|
|
await expect(
|
|
runner.run({
|
|
jobId: 'job-legacy-isolated',
|
|
connectionId: 'warehouse',
|
|
sourceKey,
|
|
trigger: 'upload',
|
|
bundleRef: { kind: 'upload', uploadId: 'upload' },
|
|
}),
|
|
).resolves.toMatchObject({
|
|
jobId: 'job-legacy-isolated',
|
|
failedWorkUnits: [],
|
|
workUnitCount: 1,
|
|
});
|
|
|
|
const trace = await readFile(
|
|
join(runtime.configDir, '.ktx/ingest-traces/job-legacy-isolated/trace.jsonl'),
|
|
'utf-8',
|
|
);
|
|
expect(trace).toContain('isolated_diff_enabled');
|
|
expect(trace).toContain('work_unit_child_created');
|
|
expect(trace).not.toContain(legacySharedTraceEvent());
|
|
|
|
const reportCreate = vi.mocked(deps.reports.create).mock.calls.at(-1)?.[0];
|
|
const reportBody = reportCreate?.body as { isolatedDiff?: unknown } | undefined;
|
|
expect(reportBody?.isolatedDiff).toMatchObject({
|
|
enabled: true,
|
|
acceptedPatches: 1,
|
|
});
|
|
} finally {
|
|
await rm(runtime.homeDir, { recursive: true, force: true });
|
|
}
|
|
});
|
|
|
|
it('does not integrate failed isolated WorkUnit patches', async () => {
|
|
const runtime = await makeRealGitRuntime();
|
|
try {
|
|
const { deps, adapter } = makeDeps(runtime, 'fake');
|
|
adapter.chunk.mockResolvedValue({
|
|
workUnits: [
|
|
{ unitKey: 'wu-good', rawFiles: ['good.raw'], peerFileIndex: [], dependencyPaths: [] },
|
|
{ unitKey: 'wu-bad', rawFiles: ['bad.raw'], peerFileIndex: [], dependencyPaths: [] },
|
|
],
|
|
});
|
|
deps.diffSetService.compute = vi.fn().mockResolvedValue({
|
|
added: ['good.raw', 'bad.raw'],
|
|
modified: [],
|
|
deleted: [],
|
|
unchanged: [],
|
|
});
|
|
deps.slValidator.validateSingleSource = vi.fn(
|
|
async (_validationDeps: unknown, _connectionId: string, sourceName: string) => ({
|
|
errors: sourceName === 'bad' ? [{ message: 'bad source rejected' }] : [],
|
|
warnings: [],
|
|
}),
|
|
) as never;
|
|
|
|
let currentSession: any = null;
|
|
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
|
|
currentSession = toolSession;
|
|
return { toRuntimeTools: vi.fn(() => ({})) };
|
|
});
|
|
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
|
|
if (params.telemetryTags.operationName !== 'ingest-bundle-wu') {
|
|
return { stopReason: 'natural' };
|
|
}
|
|
const unitKey = params.telemetryTags.unitKey;
|
|
const root = rootOfConfig(currentSession.configService, runtime.configDir);
|
|
await mkdir(join(root, 'semantic-layer/warehouse'), { recursive: true });
|
|
if (unitKey === 'wu-good') {
|
|
await writeFile(join(root, 'semantic-layer/warehouse/good.yaml'), 'name: good\n', 'utf-8');
|
|
addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'good');
|
|
currentSession.actions.push({
|
|
target: 'sl',
|
|
type: 'created',
|
|
key: 'good',
|
|
detail: 'good source',
|
|
targetConnectionId: 'warehouse',
|
|
rawPaths: ['good.raw'],
|
|
});
|
|
await currentSession.gitService.commitFiles(
|
|
['semantic-layer/warehouse/good.yaml'],
|
|
'test: add good source',
|
|
'KTX Test',
|
|
'system@ktx.local',
|
|
);
|
|
}
|
|
if (unitKey === 'wu-bad') {
|
|
await writeFile(join(root, 'semantic-layer/warehouse/bad.yaml'), 'name: bad\n', 'utf-8');
|
|
addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'bad');
|
|
currentSession.actions.push({
|
|
target: 'sl',
|
|
type: 'created',
|
|
key: 'bad',
|
|
detail: 'bad source',
|
|
targetConnectionId: 'warehouse',
|
|
rawPaths: ['bad.raw'],
|
|
});
|
|
await currentSession.gitService.commitFiles(
|
|
['semantic-layer/warehouse/bad.yaml'],
|
|
'test: add bad source',
|
|
'KTX Test',
|
|
'system@ktx.local',
|
|
);
|
|
}
|
|
return { stopReason: 'natural' };
|
|
}) as never;
|
|
|
|
const runner = new IngestBundleRunner(deps);
|
|
await mockStageRawFiles(
|
|
runner,
|
|
runtime,
|
|
[
|
|
['good.raw', 'good-hash'],
|
|
['bad.raw', 'bad-hash'],
|
|
],
|
|
'fake',
|
|
);
|
|
|
|
const result = await runner.run({
|
|
jobId: 'job-failed-wu-isolated',
|
|
connectionId: 'warehouse',
|
|
sourceKey: 'fake',
|
|
trigger: 'upload',
|
|
bundleRef: { kind: 'upload', uploadId: 'upload' },
|
|
});
|
|
|
|
expect(result.failedWorkUnits).toEqual(['wu-bad']);
|
|
await expect(readFile(join(runtime.configDir, 'semantic-layer/warehouse/good.yaml'), 'utf-8')).resolves.toContain(
|
|
'good',
|
|
);
|
|
await expect(readFile(join(runtime.configDir, 'semantic-layer/warehouse/bad.yaml'), 'utf-8')).rejects.toThrow();
|
|
|
|
const reportCreate = vi.mocked(deps.reports.create).mock.calls.at(-1)?.[0];
|
|
const reportBody = reportCreate?.body as {
|
|
isolatedDiff?: { acceptedPatches?: number };
|
|
failedWorkUnits?: string[];
|
|
};
|
|
expect(reportBody.failedWorkUnits).toEqual(['wu-bad']);
|
|
expect(reportBody.isolatedDiff).toMatchObject({ enabled: true, acceptedPatches: 1 });
|
|
|
|
const trace = await readFile(
|
|
join(runtime.configDir, '.ktx/ingest-traces/job-failed-wu-isolated/trace.jsonl'),
|
|
'utf-8',
|
|
);
|
|
expect(trace).toContain('work_unit_failed_before_patch');
|
|
expect(trace).toContain('patch_accepted');
|
|
expect(trace).not.toContain(legacySharedTraceEvent());
|
|
} finally {
|
|
await rm(runtime.homeDir, { recursive: true, force: true });
|
|
}
|
|
});
|
|
|
|
it.each(['notion', 'lookml', 'looker', 'dbt', 'metricflow'] as const)(
|
|
'routes %s direct writes through isolated child worktrees',
|
|
async (sourceKey) => {
|
|
const runtime = await makeRealGitRuntime();
|
|
try {
|
|
const { deps, adapter } = makeDeps(runtime, sourceKey);
|
|
adapter.chunk.mockResolvedValue({
|
|
workUnits: [
|
|
{
|
|
unitKey: `${sourceKey}-wiki`,
|
|
rawFiles: [`${sourceKey}/page.json`],
|
|
peerFileIndex: [],
|
|
dependencyPaths: [],
|
|
},
|
|
],
|
|
});
|
|
|
|
let currentSession: any = null;
|
|
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
|
|
currentSession = toolSession;
|
|
return { toRuntimeTools: vi.fn(() => ({})) };
|
|
});
|
|
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
|
|
if (params.telemetryTags.operationName !== 'ingest-bundle-wu') {
|
|
return { stopReason: 'natural' };
|
|
}
|
|
|
|
expect(params.telemetryTags).toMatchObject({
|
|
operationName: 'ingest-bundle-wu',
|
|
source: sourceKey,
|
|
unitKey: `${sourceKey}-wiki`,
|
|
});
|
|
|
|
const root = rootOfConfig(currentSession.configService, runtime.configDir);
|
|
await mkdir(join(root, 'wiki/global'), { recursive: true });
|
|
await writeFile(
|
|
join(root, 'wiki/global', `${sourceKey}-isolated.md`),
|
|
`---\nsummary: ${sourceKey} isolated write\nusage_mode: auto\n---\n\nIsolated ${sourceKey} write.\n`,
|
|
'utf-8',
|
|
);
|
|
currentSession.actions.push({
|
|
target: 'wiki',
|
|
type: 'created',
|
|
key: `${sourceKey}-isolated`,
|
|
detail: `${sourceKey} isolated write`,
|
|
rawPaths: [`${sourceKey}/page.json`],
|
|
});
|
|
await currentSession.gitService.commitFiles(
|
|
[`wiki/global/${sourceKey}-isolated.md`],
|
|
`${sourceKey} wiki`,
|
|
'KTX Test',
|
|
'system@ktx.local',
|
|
);
|
|
return { stopReason: 'natural' };
|
|
}) as never;
|
|
|
|
const runner = new IngestBundleRunner(deps);
|
|
await mockStageRawFiles(runner, runtime, [[`${sourceKey}/page.json`, 'h1']], sourceKey);
|
|
|
|
await expect(
|
|
runner.run({
|
|
jobId: `job-${sourceKey}`,
|
|
connectionId: 'warehouse',
|
|
sourceKey,
|
|
trigger: 'upload',
|
|
bundleRef: { kind: 'upload', uploadId: 'upload' },
|
|
}),
|
|
).resolves.toMatchObject({
|
|
jobId: `job-${sourceKey}`,
|
|
failedWorkUnits: [],
|
|
workUnitCount: 1,
|
|
});
|
|
|
|
const trace = await readFile(
|
|
join(runtime.configDir, '.ktx/ingest-traces', `job-${sourceKey}`, 'trace.jsonl'),
|
|
'utf-8',
|
|
);
|
|
expect(trace).toContain('isolated_diff_enabled');
|
|
expect(trace).toContain('work_unit_child_created');
|
|
expect(trace).toContain('work_unit_patch_collected');
|
|
expect(trace).toContain('patch_apply_started');
|
|
expect(trace).not.toContain(legacySharedTraceEvent());
|
|
|
|
const reportCreate = vi.mocked(deps.reports.create).mock.calls.at(-1)?.[0];
|
|
const reportBody = reportCreate?.body as { isolatedDiff?: unknown } | undefined;
|
|
expect(reportBody?.isolatedDiff).toMatchObject({
|
|
enabled: true,
|
|
acceptedPatches: 1,
|
|
});
|
|
} finally {
|
|
await rm(runtime.homeDir, { recursive: true, force: true });
|
|
}
|
|
},
|
|
);
|
|
|
|
it('rejects the Metabase stale-measure wiki body regression before squash', async () => {
|
|
const runtime = await makeRealGitRuntime();
|
|
try {
|
|
const { deps, adapter } = makeDeps(runtime);
|
|
adapter.project = vi.fn(async ({ workdir }) => {
|
|
await mkdir(join(workdir, 'semantic-layer/warehouse'), { recursive: true });
|
|
await writeFile(
|
|
join(workdir, 'semantic-layer/warehouse/mart_account_segments.yaml'),
|
|
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr_cents\n expr: sum(contract_arr)\n',
|
|
);
|
|
return {
|
|
warnings: [],
|
|
errors: [],
|
|
touchedSources: [{ connectionId: 'warehouse', sourceName: 'mart_account_segments' }],
|
|
changedWikiPageKeys: [],
|
|
};
|
|
});
|
|
let currentSession: any = null;
|
|
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
|
|
currentSession = toolSession;
|
|
return { toRuntimeTools: vi.fn(() => ({})) };
|
|
});
|
|
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
|
|
const root = rootOfConfig(currentSession.configService, runtime.configDir);
|
|
if (params.telemetryTags.unitKey === 'card-wiki') {
|
|
await mkdir(join(root, 'wiki/global'), { recursive: true });
|
|
await writeFile(
|
|
join(root, 'wiki/global/account-segments.md'),
|
|
'---\nsummary: Account segments\nusage_mode: auto\nsl_refs:\n - mart_account_segments\n---\n\nARR is `mart_account_segments.total_contract_arr_cents`.\n',
|
|
);
|
|
currentSession.actions.push({ target: 'wiki', type: 'created', key: 'account-segments', detail: 'Account segments' });
|
|
await currentSession.gitService.commitFiles(['wiki/global/account-segments.md'], 'wu wiki', 'KTX Test', 'system@ktx.local');
|
|
}
|
|
if (params.telemetryTags.unitKey === 'card-source') {
|
|
await writeFile(
|
|
join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'),
|
|
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n',
|
|
);
|
|
addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'mart_account_segments');
|
|
currentSession.actions.push({
|
|
target: 'sl',
|
|
type: 'updated',
|
|
key: 'mart_account_segments',
|
|
detail: 'Dollar measure',
|
|
targetConnectionId: 'warehouse',
|
|
});
|
|
await currentSession.gitService.commitFiles(['semantic-layer/warehouse/mart_account_segments.yaml'], 'wu source', 'KTX Test', 'system@ktx.local');
|
|
}
|
|
return { stopReason: 'natural' };
|
|
}) as never;
|
|
|
|
const runner = new IngestBundleRunner(deps);
|
|
await mockStageRawFiles(runner, runtime, [
|
|
['cards/wiki.json', 'h1'],
|
|
['cards/source.json', 'h2'],
|
|
]);
|
|
|
|
await expect(
|
|
runner.run({ jobId: 'job-1', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' } }),
|
|
).rejects.toThrow(/total_contract_arr_cents/);
|
|
const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-1/trace.jsonl'), 'utf-8');
|
|
expect(trace).toContain('input_snapshot');
|
|
expect(trace).toContain('isolated_diff_enabled');
|
|
expect(trace).toContain('work_unit_child_created');
|
|
expect(trace).toContain('work_unit_patch_collected');
|
|
expect(trace).toContain('patch_apply_started');
|
|
expect(trace).toContain('final_artifact_gates_failed');
|
|
expect(trace).toContain('ingest_failed');
|
|
} finally {
|
|
await rm(runtime.homeDir, { recursive: true, force: true });
|
|
}
|
|
});
|
|
|
|
it('rejects unchanged wiki body refs made stale by isolated semantic-layer changes', async () => {
|
|
const runtime = await makeRealGitRuntime();
|
|
try {
|
|
await mkdir(join(runtime.configDir, 'semantic-layer/warehouse'), { recursive: true });
|
|
await mkdir(join(runtime.configDir, 'wiki/global'), { recursive: true });
|
|
await writeFile(
|
|
join(runtime.configDir, 'semantic-layer/warehouse/mart_account_segments.yaml'),
|
|
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr_cents\n expr: sum(contract_arr)\n',
|
|
);
|
|
await writeFile(
|
|
join(runtime.configDir, 'wiki/global/account-segments.md'),
|
|
'---\nsummary: Account segments\nusage_mode: auto\n---\n\nExisting ARR uses `mart_account_segments.total_contract_arr_cents`.\n',
|
|
);
|
|
await runtime.git.commitFiles(
|
|
['semantic-layer/warehouse/mart_account_segments.yaml', 'wiki/global/account-segments.md'],
|
|
'seed existing wiki body ref',
|
|
'KTX Test',
|
|
'system@ktx.local',
|
|
);
|
|
const preRunHead = await runtime.git.revParseHead();
|
|
|
|
const { deps, adapter } = makeDeps(runtime);
|
|
adapter.chunk.mockResolvedValue({
|
|
workUnits: [{ unitKey: 'source-only', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] }],
|
|
});
|
|
|
|
let currentSession: any = null;
|
|
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
|
|
currentSession = toolSession;
|
|
return { toRuntimeTools: vi.fn(() => ({})) };
|
|
});
|
|
deps.agentRunner.runLoop = vi.fn(async () => {
|
|
const root = rootOfConfig(currentSession.configService, runtime.configDir);
|
|
await writeFile(
|
|
join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'),
|
|
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n',
|
|
);
|
|
addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'mart_account_segments');
|
|
currentSession.actions.push({
|
|
target: 'sl',
|
|
type: 'updated',
|
|
key: 'mart_account_segments',
|
|
detail: 'Rename ARR measure',
|
|
targetConnectionId: 'warehouse',
|
|
rawPaths: ['cards/source.json'],
|
|
});
|
|
await currentSession.gitService.commitFiles(
|
|
['semantic-layer/warehouse/mart_account_segments.yaml'],
|
|
'wu source rename',
|
|
'KTX Test',
|
|
'system@ktx.local',
|
|
);
|
|
return { stopReason: 'natural' };
|
|
}) as never;
|
|
|
|
const runner = new IngestBundleRunner(deps);
|
|
await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]);
|
|
|
|
await expect(
|
|
runner.run({
|
|
jobId: 'job-existing-body-stale',
|
|
connectionId: 'warehouse',
|
|
sourceKey: 'metabase',
|
|
trigger: 'upload',
|
|
bundleRef: { kind: 'upload', uploadId: 'upload' },
|
|
}),
|
|
).rejects.toThrow(/total_contract_arr_cents/);
|
|
|
|
expect(await runtime.git.revParseHead()).toBe(preRunHead);
|
|
const events = (await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-existing-body-stale/trace.jsonl'), 'utf-8'))
|
|
.trim()
|
|
.split('\n')
|
|
.map((line) => JSON.parse(line));
|
|
expect(events.map((event) => event.event)).toEqual(
|
|
expect.arrayContaining([
|
|
'final_artifact_gates_started',
|
|
'final_artifact_gates_failed',
|
|
'ingest_failed',
|
|
'failure_report_created',
|
|
]),
|
|
);
|
|
expect(events.map((event) => event.event)).not.toContain('squash_finished');
|
|
const gateFailure = events.find((event) => event.event === 'final_artifact_gates_failed');
|
|
expect(gateFailure).toMatchObject({
|
|
data: {
|
|
wikiReferenceGateScope: {
|
|
global: true,
|
|
reasons: expect.arrayContaining(['semantic_layer_changed']),
|
|
pageKeysValidated: expect.arrayContaining(['account-segments']),
|
|
},
|
|
actionOrigins: expect.arrayContaining([
|
|
expect.objectContaining({
|
|
source: 'work_unit_action',
|
|
unitKey: 'source-only',
|
|
unitRawFiles: ['cards/source.json'],
|
|
action: expect.objectContaining({
|
|
target: 'sl',
|
|
type: 'updated',
|
|
key: 'mart_account_segments',
|
|
rawPaths: ['cards/source.json'],
|
|
targetConnectionId: 'warehouse',
|
|
}),
|
|
}),
|
|
]),
|
|
},
|
|
error: { message: expect.stringContaining('total_contract_arr_cents') },
|
|
});
|
|
|
|
const failureReport = (deps.reports.create as any).mock.calls
|
|
.map((call: any[]) => call[0])
|
|
.find((report: any) => report.body.status === 'failed');
|
|
expect(failureReport.body.failure).toMatchObject({
|
|
phase: 'final_gates',
|
|
message: expect.stringContaining('total_contract_arr_cents'),
|
|
details: expect.objectContaining({
|
|
wikiReferenceGateScope: expect.objectContaining({
|
|
global: true,
|
|
reasons: expect.arrayContaining(['semantic_layer_changed']),
|
|
pageKeysValidated: expect.arrayContaining(['account-segments']),
|
|
}),
|
|
touchedSlSources: expect.arrayContaining([
|
|
expect.objectContaining({ connectionId: 'warehouse', sourceName: 'mart_account_segments' }),
|
|
]),
|
|
actionOrigins: expect.arrayContaining([
|
|
expect.objectContaining({
|
|
source: 'work_unit_action',
|
|
unitKey: 'source-only',
|
|
action: expect.objectContaining({
|
|
target: 'sl',
|
|
type: 'updated',
|
|
key: 'mart_account_segments',
|
|
rawPaths: ['cards/source.json'],
|
|
targetConnectionId: 'warehouse',
|
|
}),
|
|
}),
|
|
]),
|
|
}),
|
|
});
|
|
expect(failureReport.body.workUnits).toEqual(
|
|
expect.arrayContaining([
|
|
expect.objectContaining({
|
|
unitKey: 'source-only',
|
|
actions: expect.arrayContaining([
|
|
expect.objectContaining({
|
|
target: 'sl',
|
|
type: 'updated',
|
|
key: 'mart_account_segments',
|
|
rawPaths: ['cards/source.json'],
|
|
}),
|
|
]),
|
|
}),
|
|
]),
|
|
);
|
|
} finally {
|
|
await rm(runtime.homeDir, { recursive: true, force: true });
|
|
}
|
|
});
|
|
|
|
it('accepts two isolated work units that edit different wiki pages', async () => {
|
|
const runtime = await makeRealGitRuntime();
|
|
try {
|
|
const { deps, adapter } = makeDeps(runtime);
|
|
adapter.chunk.mockResolvedValue({
|
|
workUnits: [
|
|
{ unitKey: 'page-a', rawFiles: ['pages/a.json'], peerFileIndex: [], dependencyPaths: [] },
|
|
{ unitKey: 'page-b', rawFiles: ['pages/b.json'], peerFileIndex: [], dependencyPaths: [] },
|
|
],
|
|
});
|
|
let currentSession: any = null;
|
|
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
|
|
currentSession = toolSession;
|
|
return { toRuntimeTools: vi.fn(() => ({})) };
|
|
});
|
|
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
|
|
const unitKey = params.telemetryTags.unitKey;
|
|
const root = rootOfConfig(currentSession.configService, runtime.configDir);
|
|
await mkdir(join(root, 'wiki/global'), { recursive: true });
|
|
await writeFile(join(root, `wiki/global/${unitKey}.md`), `---\nsummary: ${unitKey}\nusage_mode: auto\n---\n\n${unitKey}\n`);
|
|
currentSession.actions.push({ target: 'wiki', type: 'created', key: unitKey, detail: unitKey });
|
|
await currentSession.gitService.commitFiles([`wiki/global/${unitKey}.md`], `wu ${unitKey}`, 'KTX Test', 'system@ktx.local');
|
|
return { stopReason: 'natural' };
|
|
}) as never;
|
|
const runner = new IngestBundleRunner(deps);
|
|
await mockStageRawFiles(runner, runtime, [
|
|
['pages/a.json', 'h1'],
|
|
['pages/b.json', 'h2'],
|
|
]);
|
|
|
|
const result = await runner.run({ jobId: 'job-clean', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' } });
|
|
expect(result.failedWorkUnits).toEqual([]);
|
|
const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-clean/trace.jsonl'), 'utf-8');
|
|
expect(trace.match(/patch_accepted/g)).toHaveLength(2);
|
|
expect(trace).toContain('ingest_finished');
|
|
} finally {
|
|
await rm(runtime.homeDir, { recursive: true, force: true });
|
|
}
|
|
});
|
|
|
|
it('classifies same-source patch application failure as a textual conflict', async () => {
|
|
const runtime = await makeRealGitRuntime();
|
|
try {
|
|
const { deps, adapter } = makeDeps(runtime);
|
|
adapter.chunk.mockResolvedValue({
|
|
workUnits: [
|
|
{ unitKey: 'orders-a', rawFiles: ['orders/a.json'], peerFileIndex: [], dependencyPaths: [] },
|
|
{ unitKey: 'orders-b', rawFiles: ['orders/b.json'], peerFileIndex: [], dependencyPaths: [] },
|
|
],
|
|
});
|
|
let currentSession: any = null;
|
|
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
|
|
currentSession = toolSession;
|
|
return { toRuntimeTools: vi.fn(() => ({})) };
|
|
});
|
|
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
|
|
if (params.telemetryTags.operationName === 'ingest-isolated-diff-textual-resolver') {
|
|
return { stopReason: 'natural' };
|
|
}
|
|
const suffix = params.telemetryTags.unitKey === 'orders-a' ? 'a' : 'b';
|
|
const root = rootOfConfig(currentSession.configService, runtime.configDir);
|
|
await mkdir(join(root, 'semantic-layer/warehouse'), { recursive: true });
|
|
await writeFile(
|
|
join(root, 'semantic-layer/warehouse/orders.yaml'),
|
|
`name: orders\ngrain: [id]\ncolumns: [{name: id, type: string}]\njoins: []\nmeasures:\n - name: order_count_${suffix}\n expr: count(*)\n`,
|
|
);
|
|
addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'orders');
|
|
currentSession.actions.push({ target: 'sl', type: 'updated', key: 'orders', detail: suffix, targetConnectionId: 'warehouse' });
|
|
await currentSession.gitService.commitFiles(['semantic-layer/warehouse/orders.yaml'], `wu ${suffix}`, 'KTX Test', 'system@ktx.local');
|
|
return { stopReason: 'natural' };
|
|
}) as never;
|
|
const runner = new IngestBundleRunner(deps);
|
|
await mockStageRawFiles(runner, runtime, [
|
|
['orders/a.json', 'h1'],
|
|
['orders/b.json', 'h2'],
|
|
]);
|
|
|
|
await expect(
|
|
runner.run({ jobId: 'job-text-conflict', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' } }),
|
|
).rejects.toThrow(/isolated diff textual conflict/);
|
|
const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-text-conflict/trace.jsonl'), 'utf-8');
|
|
expect(trace).toContain('patch_textual_conflict');
|
|
expect(trace).toContain('textual_conflict_resolver_failed');
|
|
} finally {
|
|
await rm(runtime.homeDir, { recursive: true, force: true });
|
|
}
|
|
});
|
|
|
|
it('makes deterministic projection visible to child worktrees before WorkUnit synthesis', async () => {
|
|
const runtime = await makeRealGitRuntime();
|
|
try {
|
|
const { deps, adapter } = makeDeps(runtime);
|
|
adapter.chunk.mockResolvedValue({
|
|
workUnits: [{ unitKey: 'wiki-projected', rawFiles: ['projected/wiki.json'], peerFileIndex: [], dependencyPaths: [] }],
|
|
});
|
|
adapter.project = vi.fn(async ({ workdir }) => {
|
|
await mkdir(join(workdir, 'semantic-layer/warehouse'), { recursive: true });
|
|
await writeFile(
|
|
join(workdir, 'semantic-layer/warehouse/mart_account_segments.yaml'),
|
|
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n',
|
|
);
|
|
return {
|
|
warnings: [],
|
|
errors: [],
|
|
touchedSources: [{ connectionId: 'warehouse', sourceName: 'mart_account_segments' }],
|
|
changedWikiPageKeys: [],
|
|
};
|
|
});
|
|
let currentSession: any = null;
|
|
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
|
|
currentSession = toolSession;
|
|
return { toRuntimeTools: vi.fn(() => ({})) };
|
|
});
|
|
deps.agentRunner.runLoop = vi.fn(async () => {
|
|
const root = rootOfConfig(currentSession.configService, runtime.configDir);
|
|
await expect(readFile(join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'), 'utf-8')).resolves.toContain(
|
|
'total_contract_arr',
|
|
);
|
|
await mkdir(join(root, 'wiki/global'), { recursive: true });
|
|
await writeFile(
|
|
join(root, 'wiki/global/projected-orders.md'),
|
|
'---\nsummary: Projected orders\nusage_mode: auto\nsl_refs:\n - mart_account_segments\n---\n\nARR `mart_account_segments.total_contract_arr`.\n',
|
|
);
|
|
currentSession.actions.push({ target: 'wiki', type: 'created', key: 'projected-orders', detail: 'Projected orders' });
|
|
await currentSession.gitService.commitFiles(['wiki/global/projected-orders.md'], 'wu projected wiki', 'KTX Test', 'system@ktx.local');
|
|
return { stopReason: 'natural' };
|
|
}) as never;
|
|
const runner = new IngestBundleRunner(deps);
|
|
await mockStageRawFiles(runner, runtime, [['projected/wiki.json', 'h1']]);
|
|
|
|
const result = await runner.run({ jobId: 'job-projection', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' } });
|
|
expect(result.failedWorkUnits).toEqual([]);
|
|
const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-projection/trace.jsonl'), 'utf-8');
|
|
expect(trace).toContain('deterministic_projection_finished');
|
|
expect(trace).toContain('deterministic_projection_committed');
|
|
} finally {
|
|
await rm(runtime.homeDir, { recursive: true, force: true });
|
|
}
|
|
});
|
|
|
|
it('rejects Notion-style changed wiki pages with invalid sl_refs', async () => {
|
|
const runtime = await makeRealGitRuntime();
|
|
try {
|
|
const { deps, adapter } = makeDeps(runtime);
|
|
adapter.chunk.mockResolvedValue({
|
|
workUnits: [{ unitKey: 'notion-page', rawFiles: ['pages/notion.json'], peerFileIndex: [], dependencyPaths: [] }],
|
|
});
|
|
let currentSession: any = null;
|
|
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
|
|
currentSession = toolSession;
|
|
return { toRuntimeTools: vi.fn(() => ({})) };
|
|
});
|
|
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
|
|
if (params.telemetryTags.operationName === 'ingest-isolated-diff-gate-repair') {
|
|
return { stopReason: 'natural' as const };
|
|
}
|
|
const root = rootOfConfig(currentSession.configService, runtime.configDir);
|
|
await mkdir(join(root, 'wiki/global'), { recursive: true });
|
|
await writeFile(join(root, 'wiki/global/notion-page.md'), '---\nsummary: Notion page\nusage_mode: auto\nsl_refs:\n - missing_source\n---\n\nBody\n');
|
|
currentSession.actions.push({ target: 'wiki', type: 'created', key: 'notion-page', detail: 'Notion page' });
|
|
await currentSession.gitService.commitFiles(['wiki/global/notion-page.md'], 'wu notion', 'KTX Test', 'system@ktx.local');
|
|
return { stopReason: 'natural' };
|
|
}) as never;
|
|
const runner = new IngestBundleRunner(deps);
|
|
await mockStageRawFiles(runner, runtime, [['pages/notion.json', 'h1']]);
|
|
|
|
await expect(
|
|
runner.run({ jobId: 'job-invalid-slrefs', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' } }),
|
|
).rejects.toThrow(/gate repair completed without editing an allowed path/);
|
|
} finally {
|
|
await rm(runtime.homeDir, { recursive: true, force: true });
|
|
}
|
|
});
|
|
|
|
it('runs final artifact gates after reconciliation mutates the integration tree', async () => {
|
|
const runtime = await makeRealGitRuntime();
|
|
try {
|
|
const { deps, adapter } = makeDeps(runtime);
|
|
adapter.chunk.mockResolvedValue({
|
|
workUnits: [{ unitKey: 'card-source', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] }],
|
|
});
|
|
let currentSession: any = null;
|
|
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
|
|
currentSession = toolSession;
|
|
return { toRuntimeTools: vi.fn(() => ({})) };
|
|
});
|
|
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
|
|
const root = rootOfConfig(currentSession.configService, runtime.configDir);
|
|
if (params.telemetryTags.operationName === 'ingest-bundle-wu') {
|
|
await mkdir(join(root, 'semantic-layer/warehouse'), { recursive: true });
|
|
await writeFile(
|
|
join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'),
|
|
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n',
|
|
);
|
|
addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'mart_account_segments');
|
|
currentSession.actions.push({
|
|
target: 'sl',
|
|
type: 'created',
|
|
key: 'mart_account_segments',
|
|
detail: 'Source with renamed ARR measure',
|
|
targetConnectionId: 'warehouse',
|
|
rawPaths: ['cards/source.json'],
|
|
});
|
|
await currentSession.gitService.commitFiles(
|
|
['semantic-layer/warehouse/mart_account_segments.yaml'],
|
|
'wu source',
|
|
'KTX Test',
|
|
'system@ktx.local',
|
|
);
|
|
} else {
|
|
await mkdir(join(root, 'wiki/global'), { recursive: true });
|
|
await writeFile(
|
|
join(root, 'wiki/global/account-segments.md'),
|
|
'---\nsummary: Account segments\nusage_mode: auto\nsl_refs:\n - mart_account_segments\n---\n\nReconcile wrote stale ARR `mart_account_segments.total_contract_arr_cents`.\n',
|
|
);
|
|
currentSession.actions.push({
|
|
target: 'wiki',
|
|
type: 'created',
|
|
key: 'account-segments',
|
|
detail: 'Stale reconcile wiki page',
|
|
rawPaths: ['cards/source.json'],
|
|
});
|
|
await currentSession.gitService.commitFiles(['wiki/global/account-segments.md'], 'reconcile wiki', 'KTX Test', 'system@ktx.local');
|
|
}
|
|
return { stopReason: 'natural' };
|
|
}) as never;
|
|
|
|
const runner = new IngestBundleRunner(deps);
|
|
await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]);
|
|
|
|
await expect(
|
|
runner.run({
|
|
jobId: 'job-reconcile-stale',
|
|
connectionId: 'warehouse',
|
|
sourceKey: 'metabase',
|
|
trigger: 'upload',
|
|
bundleRef: { kind: 'upload', uploadId: 'upload' },
|
|
}),
|
|
).rejects.toThrow(/total_contract_arr_cents/);
|
|
|
|
const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-reconcile-stale/trace.jsonl'), 'utf-8');
|
|
expect(trace).toContain('reconciliation_finished');
|
|
expect(trace).toContain('final_artifact_gates_failed');
|
|
expect(trace).toContain('ingest_failed');
|
|
expect(await runtime.git.revParseHead()).not.toContain('reconcile wiki');
|
|
} finally {
|
|
await rm(runtime.homeDir, { recursive: true, force: true });
|
|
}
|
|
});
|
|
|
|
it('stores a failure report and postmortem trace for final gate failures', async () => {
|
|
const runtime = await makeRealGitRuntime();
|
|
try {
|
|
const { deps, adapter } = makeDeps(runtime);
|
|
const createdReports: any[] = [];
|
|
deps.reports.create = vi.fn(async (args: any) => {
|
|
createdReports.push(args);
|
|
return { id: `report-${createdReports.length}` };
|
|
});
|
|
adapter.chunk.mockResolvedValue({
|
|
workUnits: [
|
|
{ unitKey: 'card-wiki', rawFiles: ['cards/wiki.json'], peerFileIndex: [], dependencyPaths: [] },
|
|
{ unitKey: 'card-source', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] },
|
|
],
|
|
});
|
|
let currentSession: any = null;
|
|
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
|
|
currentSession = toolSession;
|
|
return { toRuntimeTools: vi.fn(() => ({})) };
|
|
});
|
|
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
|
|
const root = rootOfConfig(currentSession.configService, runtime.configDir);
|
|
if (params.telemetryTags.unitKey === 'card-wiki') {
|
|
await mkdir(join(root, 'wiki/global'), { recursive: true });
|
|
await writeFile(
|
|
join(root, 'wiki/global/account-segments.md'),
|
|
'---\nsummary: Account segments\nusage_mode: auto\n---\n\nARR is `mart_account_segments.total_contract_arr_cents`.\n',
|
|
);
|
|
currentSession.actions.push({
|
|
target: 'wiki',
|
|
type: 'created',
|
|
key: 'account-segments',
|
|
detail: 'Account segments',
|
|
rawPaths: ['cards/wiki.json'],
|
|
});
|
|
await currentSession.gitService.commitFiles(['wiki/global/account-segments.md'], 'wu wiki', 'KTX Test', 'system@ktx.local');
|
|
}
|
|
if (params.telemetryTags.unitKey === 'card-source') {
|
|
await mkdir(join(root, 'semantic-layer/warehouse'), { recursive: true });
|
|
await writeFile(
|
|
join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'),
|
|
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n',
|
|
);
|
|
addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'mart_account_segments');
|
|
currentSession.actions.push({
|
|
target: 'sl',
|
|
type: 'created',
|
|
key: 'mart_account_segments',
|
|
detail: 'Dollar measure',
|
|
targetConnectionId: 'warehouse',
|
|
rawPaths: ['cards/source.json'],
|
|
});
|
|
await currentSession.gitService.commitFiles(
|
|
['semantic-layer/warehouse/mart_account_segments.yaml'],
|
|
'wu source',
|
|
'KTX Test',
|
|
'system@ktx.local',
|
|
);
|
|
}
|
|
return { stopReason: 'natural' };
|
|
}) as never;
|
|
|
|
const runner = new IngestBundleRunner(deps);
|
|
await mockStageRawFiles(runner, runtime, [
|
|
['cards/wiki.json', 'h1'],
|
|
['cards/source.json', 'h2'],
|
|
]);
|
|
|
|
await expect(
|
|
runner.run({
|
|
jobId: 'job-trace-failure',
|
|
connectionId: 'warehouse',
|
|
sourceKey: 'metabase',
|
|
trigger: 'upload',
|
|
bundleRef: { kind: 'upload', uploadId: 'upload' },
|
|
}),
|
|
).rejects.toThrow(/total_contract_arr_cents/);
|
|
|
|
const failureReport = createdReports.find((report) => report.body.status === 'failed');
|
|
expect(failureReport.body.tracePath).toContain('job-trace-failure/trace.jsonl');
|
|
expect(failureReport.body.failure).toMatchObject({ phase: 'final_gates' });
|
|
|
|
const events = (await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-trace-failure/trace.jsonl'), 'utf-8'))
|
|
.trim()
|
|
.split('\n')
|
|
.map((line) => JSON.parse(line));
|
|
expect(events.map((event) => event.event)).toEqual(
|
|
expect.arrayContaining([
|
|
'ingest_started',
|
|
'input_snapshot',
|
|
'work_units_planned',
|
|
'isolated_diff_enabled',
|
|
'work_unit_child_created',
|
|
'work_unit_patch_collected',
|
|
'patch_apply_started',
|
|
'patch_accepted',
|
|
'reconciliation_finished',
|
|
'final_artifact_gates_failed',
|
|
'ingest_failed',
|
|
'failure_report_created',
|
|
]),
|
|
);
|
|
const failed = events.find((event) => event.event === 'ingest_failed');
|
|
expect(failed).toMatchObject({
|
|
runId: 'run-1',
|
|
syncId: expect.any(String),
|
|
data: { phase: 'final_gates', tracePath: expect.stringContaining('trace.jsonl') },
|
|
error: { message: expect.stringContaining('total_contract_arr_cents') },
|
|
});
|
|
} finally {
|
|
await rm(runtime.homeDir, { recursive: true, force: true });
|
|
}
|
|
});
|
|
|
|
it('rejects invalid provenance raw paths before squash reaches main', async () => {
|
|
const runtime = await makeRealGitRuntime();
|
|
try {
|
|
const { deps, adapter } = makeDeps(runtime);
|
|
const createdReports: any[] = [];
|
|
deps.reports.create = vi.fn(async (args: any) => {
|
|
createdReports.push(args);
|
|
return { id: `report-${createdReports.length}` };
|
|
});
|
|
adapter.chunk.mockResolvedValue({
|
|
workUnits: [
|
|
{
|
|
unitKey: 'card-valid-artifacts',
|
|
rawFiles: ['cards/source.json'],
|
|
peerFileIndex: [],
|
|
dependencyPaths: [],
|
|
},
|
|
],
|
|
});
|
|
|
|
let currentSession: any = null;
|
|
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
|
|
currentSession = toolSession;
|
|
return { toRuntimeTools: vi.fn(() => ({})) };
|
|
});
|
|
deps.agentRunner.runLoop = vi.fn(async () => {
|
|
const root = rootOfConfig(currentSession.configService, runtime.configDir);
|
|
await mkdir(join(root, 'semantic-layer/warehouse'), { recursive: true });
|
|
await mkdir(join(root, 'wiki/global'), { recursive: true });
|
|
await writeFile(
|
|
join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'),
|
|
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n',
|
|
);
|
|
await writeFile(
|
|
join(root, 'wiki/global/account-segments.md'),
|
|
'---\nsummary: Account segments\nusage_mode: auto\nsl_refs:\n - mart_account_segments\n---\n\nARR is `mart_account_segments.total_contract_arr`.\n',
|
|
);
|
|
addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'mart_account_segments');
|
|
currentSession.actions.push({
|
|
target: 'sl',
|
|
type: 'created',
|
|
key: 'mart_account_segments',
|
|
detail: 'Valid source',
|
|
targetConnectionId: 'warehouse',
|
|
rawPaths: ['cards/source.json'],
|
|
});
|
|
currentSession.actions.push({
|
|
target: 'wiki',
|
|
type: 'created',
|
|
key: 'account-segments',
|
|
detail: 'Valid wiki with invalid provenance raw path',
|
|
rawPaths: ['cards/missing.json'],
|
|
});
|
|
await currentSession.gitService.commitFiles(
|
|
['semantic-layer/warehouse/mart_account_segments.yaml', 'wiki/global/account-segments.md'],
|
|
'valid artifacts with invalid provenance',
|
|
'KTX Test',
|
|
'system@ktx.local',
|
|
);
|
|
return { stopReason: 'natural' };
|
|
}) as never;
|
|
|
|
const runner = new IngestBundleRunner(deps);
|
|
await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]);
|
|
const preRunHead = await runtime.git.revParseHead();
|
|
|
|
await expect(
|
|
runner.run({
|
|
jobId: 'job-invalid-provenance',
|
|
connectionId: 'warehouse',
|
|
sourceKey: 'metabase',
|
|
trigger: 'upload',
|
|
bundleRef: { kind: 'upload', uploadId: 'upload' },
|
|
}),
|
|
).rejects.toThrow(/provenance row references raw path outside this snapshot: cards\/missing\.json/);
|
|
|
|
expect(await runtime.git.revParseHead()).toBe(preRunHead);
|
|
expect(deps.provenance.insertMany).not.toHaveBeenCalled();
|
|
|
|
const failureReport = createdReports.find((report) => report.body.status === 'failed');
|
|
expect(failureReport.body.tracePath).toContain('job-invalid-provenance/trace.jsonl');
|
|
expect(failureReport.body.failure).toMatchObject({
|
|
phase: 'provenance_validation',
|
|
message: expect.stringContaining('cards/missing.json'),
|
|
});
|
|
expect(failureReport.body.failure.details).toMatchObject({
|
|
invalidRawPaths: ['cards/missing.json'],
|
|
currentRawPaths: ['cards/source.json'],
|
|
invalidRows: expect.arrayContaining([
|
|
expect.objectContaining({
|
|
row: expect.objectContaining({
|
|
rawPath: 'cards/missing.json',
|
|
artifactKind: 'wiki',
|
|
artifactKey: 'account-segments',
|
|
actionType: 'wiki_written',
|
|
}),
|
|
origin: expect.objectContaining({
|
|
source: 'work_unit_action',
|
|
unitKey: 'card-valid-artifacts',
|
|
actionIndex: 1,
|
|
unitRawFiles: ['cards/source.json'],
|
|
action: expect.objectContaining({
|
|
target: 'wiki',
|
|
type: 'created',
|
|
key: 'account-segments',
|
|
rawPaths: ['cards/missing.json'],
|
|
}),
|
|
}),
|
|
}),
|
|
]),
|
|
});
|
|
expect(failureReport.body.provenanceRows).toEqual(
|
|
expect.arrayContaining([
|
|
expect.objectContaining({ rawPath: 'cards/source.json', artifactKind: 'sl', artifactKey: 'mart_account_segments' }),
|
|
expect.objectContaining({ rawPath: 'cards/missing.json', artifactKind: 'wiki', artifactKey: 'account-segments' }),
|
|
]),
|
|
);
|
|
expect(failureReport.body.workUnits).toEqual(
|
|
expect.arrayContaining([
|
|
expect.objectContaining({
|
|
unitKey: 'card-valid-artifacts',
|
|
rawFiles: ['cards/source.json'],
|
|
actions: expect.arrayContaining([
|
|
expect.objectContaining({
|
|
target: 'wiki',
|
|
key: 'account-segments',
|
|
rawPaths: ['cards/missing.json'],
|
|
}),
|
|
]),
|
|
}),
|
|
]),
|
|
);
|
|
|
|
const events = (await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-invalid-provenance/trace.jsonl'), 'utf-8'))
|
|
.trim()
|
|
.split('\n')
|
|
.map((line) => JSON.parse(line));
|
|
expect(events.map((event) => event.event)).toEqual(
|
|
expect.arrayContaining([
|
|
'final_artifact_gates_finished',
|
|
'provenance_rows_validation_failed',
|
|
'ingest_failed',
|
|
'failure_report_created',
|
|
]),
|
|
);
|
|
expect(events.map((event) => event.event)).not.toContain('squash_finished');
|
|
const validationFailure = events.find((event) => event.event === 'provenance_rows_validation_failed');
|
|
expect(validationFailure).toMatchObject({
|
|
phase: 'provenance',
|
|
data: {
|
|
invalidRawPaths: ['cards/missing.json'],
|
|
currentRawPaths: ['cards/source.json'],
|
|
invalidRows: expect.arrayContaining([
|
|
expect.objectContaining({
|
|
row: expect.objectContaining({ rawPath: 'cards/missing.json' }),
|
|
origin: expect.objectContaining({
|
|
source: 'work_unit_action',
|
|
unitKey: 'card-valid-artifacts',
|
|
actionIndex: 1,
|
|
}),
|
|
}),
|
|
]),
|
|
},
|
|
});
|
|
} finally {
|
|
await rm(runtime.homeDir, { recursive: true, force: true });
|
|
}
|
|
});
|
|
|
|
it('rejects slDisallowed patches that touch semantic-layer files', async () => {
|
|
const runtime = await makeRealGitRuntime();
|
|
try {
|
|
const { deps, adapter } = makeDeps(runtime);
|
|
adapter.chunk.mockResolvedValue({
|
|
workUnits: [
|
|
{
|
|
unitKey: 'lookml-mismatch',
|
|
rawFiles: ['views/orders.lkml'],
|
|
peerFileIndex: [],
|
|
dependencyPaths: [],
|
|
slDisallowed: true,
|
|
slDisallowedReason: 'lookml_connection_mismatch',
|
|
},
|
|
],
|
|
});
|
|
let currentSession: any = null;
|
|
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
|
|
currentSession = toolSession;
|
|
return { toRuntimeTools: vi.fn(() => ({})) };
|
|
});
|
|
deps.agentRunner.runLoop = vi.fn(async () => {
|
|
const root = rootOfConfig(currentSession.configService, runtime.configDir);
|
|
await mkdir(join(root, 'semantic-layer/warehouse'), { recursive: true });
|
|
await writeFile(
|
|
join(root, 'semantic-layer/warehouse/orders.yaml'),
|
|
'name: orders\ngrain: [id]\ncolumns: [{name: id, type: string}]\njoins: []\nmeasures: []\n',
|
|
);
|
|
currentSession.actions.push({ target: 'sl', type: 'created', key: 'orders', detail: 'forbidden', targetConnectionId: 'warehouse' });
|
|
await currentSession.gitService.commitFiles(['semantic-layer/warehouse/orders.yaml'], 'forbidden sl', 'KTX Test', 'system@ktx.local');
|
|
return { stopReason: 'natural' };
|
|
}) as never;
|
|
const runner = new IngestBundleRunner(deps);
|
|
await mockStageRawFiles(runner, runtime, [['views/orders.lkml', 'h1']]);
|
|
|
|
await expect(
|
|
runner.run({ jobId: 'job-sl-disallowed', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' } }),
|
|
).rejects.toThrow(/isolated diff textual conflict/);
|
|
const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-sl-disallowed/trace.jsonl'), 'utf-8');
|
|
expect(trace).toContain('patch_policy_rejected');
|
|
expect(trace).toContain('slDisallowed WorkUnit lookml-mismatch touched semantic-layer/warehouse/orders.yaml');
|
|
} finally {
|
|
await rm(runtime.homeDir, { recursive: true, force: true });
|
|
}
|
|
});
|
|
|
|
it('rejects final wiki refs broken by another accepted WorkUnit before squash', async () => {
|
|
const runtime = await makeRealGitRuntime();
|
|
try {
|
|
await mkdir(join(runtime.configDir, 'wiki/global'), { recursive: true });
|
|
await writeFile(
|
|
join(runtime.configDir, 'wiki/global/source-page.md'),
|
|
'---\nsummary: Source page\nusage_mode: auto\n---\n\nSource page\n',
|
|
);
|
|
await runtime.git.commitFiles(['wiki/global/source-page.md'], 'seed source page', 'KTX Test', 'system@ktx.local');
|
|
const preRunHead = await runtime.git.revParseHead();
|
|
const { deps, adapter } = makeDeps(runtime);
|
|
adapter.chunk.mockResolvedValue({
|
|
workUnits: [
|
|
{ unitKey: 'page-ref', rawFiles: ['pages/ref.json'], peerFileIndex: [], dependencyPaths: [] },
|
|
{ unitKey: 'page-delete', rawFiles: ['pages/delete.json'], peerFileIndex: [], dependencyPaths: [] },
|
|
],
|
|
});
|
|
let currentSession: any = null;
|
|
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
|
|
currentSession = toolSession;
|
|
return { toRuntimeTools: vi.fn(() => ({})) };
|
|
});
|
|
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
|
|
const root = rootOfConfig(currentSession.configService, runtime.configDir);
|
|
if (params.telemetryTags.unitKey === 'page-ref') {
|
|
await mkdir(join(root, 'wiki/global'), { recursive: true });
|
|
await writeFile(
|
|
join(root, 'wiki/global/account-segments.md'),
|
|
'---\nsummary: Account segments\nusage_mode: auto\nrefs:\n - source-page\n---\n\nSee [[source-page]].\n',
|
|
);
|
|
currentSession.actions.push({
|
|
target: 'wiki',
|
|
type: 'created',
|
|
key: 'account-segments',
|
|
detail: 'Page with wiki ref',
|
|
rawPaths: ['pages/ref.json'],
|
|
});
|
|
await currentSession.gitService.commitFiles(
|
|
['wiki/global/account-segments.md'],
|
|
'wu page ref',
|
|
'KTX Test',
|
|
'system@ktx.local',
|
|
);
|
|
}
|
|
if (params.telemetryTags.unitKey === 'page-delete') {
|
|
await rm(join(root, 'wiki/global/source-page.md'), { force: true });
|
|
currentSession.actions.push({
|
|
target: 'wiki',
|
|
type: 'removed',
|
|
key: 'source-page',
|
|
detail: 'Delete referenced page',
|
|
rawPaths: ['pages/delete.json'],
|
|
});
|
|
await currentSession.gitService.commitFiles(
|
|
['wiki/global/source-page.md'],
|
|
'wu delete source page',
|
|
'KTX Test',
|
|
'system@ktx.local',
|
|
);
|
|
}
|
|
return { stopReason: 'natural' };
|
|
}) as never;
|
|
|
|
const runner = new IngestBundleRunner(deps);
|
|
await mockStageRawFiles(runner, runtime, [
|
|
['pages/ref.json', 'h1'],
|
|
['pages/delete.json', 'h2'],
|
|
]);
|
|
|
|
await expect(
|
|
runner.run({
|
|
jobId: 'job-wiki-ref-conflict',
|
|
connectionId: 'warehouse',
|
|
sourceKey: 'metabase',
|
|
trigger: 'upload',
|
|
bundleRef: { kind: 'upload', uploadId: 'upload' },
|
|
}),
|
|
).rejects.toThrow(/wiki references target missing page\(s\): account-segments -> source-page/);
|
|
|
|
expect(await runtime.git.revParseHead()).toBe(preRunHead);
|
|
const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-wiki-ref-conflict/trace.jsonl'), 'utf-8');
|
|
expect(trace).toContain('final_artifact_gates_failed');
|
|
expect(trace).toContain('account-segments -> source-page');
|
|
expect(trace).toContain('ingest_failed');
|
|
expect(trace).toContain('failure_report_created');
|
|
expect(trace).not.toContain('squash_finished');
|
|
|
|
const failureReport = (deps.reports.create as any).mock.calls
|
|
.map((call: any[]) => call[0])
|
|
.find((report: any) => report.body.status === 'failed');
|
|
expect(failureReport.body.failure).toMatchObject({
|
|
phase: 'final_gates',
|
|
message: expect.stringContaining('account-segments -> source-page'),
|
|
details: expect.objectContaining({
|
|
changedWikiPageKeys: expect.arrayContaining(['account-segments']),
|
|
workUnitPatchTouchedPaths: expect.arrayContaining([
|
|
'wiki/global/account-segments.md',
|
|
'wiki/global/source-page.md',
|
|
]),
|
|
}),
|
|
});
|
|
} finally {
|
|
await rm(runtime.homeDir, { recursive: true, force: true });
|
|
}
|
|
});
|
|
|
|
it('rejects unchanged inbound wiki refs broken by an isolated wiki deletion', async () => {
|
|
const runtime = await makeRealGitRuntime();
|
|
try {
|
|
await mkdir(join(runtime.configDir, 'wiki/global'), { recursive: true });
|
|
await writeFile(
|
|
join(runtime.configDir, 'wiki/global/source-page.md'),
|
|
'---\nsummary: Source page\nusage_mode: auto\n---\n\nSource page\n',
|
|
);
|
|
await writeFile(
|
|
join(runtime.configDir, 'wiki/global/account-segments.md'),
|
|
'---\nsummary: Account segments\nusage_mode: auto\nrefs:\n - source-page\n---\n\nSee [[source-page]].\n',
|
|
);
|
|
await runtime.git.commitFiles(
|
|
['wiki/global/source-page.md', 'wiki/global/account-segments.md'],
|
|
'seed inbound wiki refs',
|
|
'KTX Test',
|
|
'system@ktx.local',
|
|
);
|
|
const preRunHead = await runtime.git.revParseHead();
|
|
|
|
const { deps, adapter } = makeDeps(runtime);
|
|
adapter.chunk.mockResolvedValue({
|
|
workUnits: [{ unitKey: 'delete-target-page', rawFiles: ['pages/delete.json'], peerFileIndex: [], dependencyPaths: [] }],
|
|
});
|
|
|
|
let currentSession: any = null;
|
|
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
|
|
currentSession = toolSession;
|
|
return { toRuntimeTools: vi.fn(() => ({})) };
|
|
});
|
|
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
|
|
if (params.telemetryTags.unitKey !== 'delete-target-page') {
|
|
return { stopReason: 'natural' };
|
|
}
|
|
const root = rootOfConfig(currentSession.configService, runtime.configDir);
|
|
await rm(join(root, 'wiki/global/source-page.md'), { force: true });
|
|
currentSession.actions.push({
|
|
target: 'wiki',
|
|
type: 'removed',
|
|
key: 'source-page',
|
|
detail: 'Delete referenced page',
|
|
rawPaths: ['pages/delete.json'],
|
|
});
|
|
await currentSession.gitService.commitFiles(
|
|
['wiki/global/source-page.md'],
|
|
'wu delete target page',
|
|
'KTX Test',
|
|
'system@ktx.local',
|
|
);
|
|
return { stopReason: 'natural' };
|
|
}) as never;
|
|
|
|
const runner = new IngestBundleRunner(deps);
|
|
await mockStageRawFiles(runner, runtime, [['pages/delete.json', 'h1']]);
|
|
|
|
await expect(
|
|
runner.run({
|
|
jobId: 'job-existing-wiki-ref-stale',
|
|
connectionId: 'warehouse',
|
|
sourceKey: 'metabase',
|
|
trigger: 'upload',
|
|
bundleRef: { kind: 'upload', uploadId: 'upload' },
|
|
}),
|
|
).rejects.toThrow(/wiki references target missing page\(s\): account-segments -> source-page/);
|
|
|
|
expect(await runtime.git.revParseHead()).toBe(preRunHead);
|
|
const events = (await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-existing-wiki-ref-stale/trace.jsonl'), 'utf-8'))
|
|
.trim()
|
|
.split('\n')
|
|
.map((line) => JSON.parse(line));
|
|
expect(events.map((event) => event.event)).toEqual(
|
|
expect.arrayContaining([
|
|
'final_artifact_gates_started',
|
|
'final_artifact_gates_failed',
|
|
'ingest_failed',
|
|
'failure_report_created',
|
|
]),
|
|
);
|
|
expect(events.map((event) => event.event)).not.toContain('squash_finished');
|
|
const gateFailure = events.find((event) => event.event === 'final_artifact_gates_failed');
|
|
expect(gateFailure).toMatchObject({
|
|
data: {
|
|
wikiReferenceGateScope: {
|
|
global: true,
|
|
reasons: expect.arrayContaining(['wiki_page_removed']),
|
|
removedWikiPageKeys: expect.arrayContaining(['source-page']),
|
|
pageKeysValidated: expect.arrayContaining(['account-segments']),
|
|
},
|
|
actionOrigins: expect.arrayContaining([
|
|
expect.objectContaining({
|
|
source: 'work_unit_action',
|
|
unitKey: 'delete-target-page',
|
|
unitRawFiles: ['pages/delete.json'],
|
|
action: expect.objectContaining({
|
|
target: 'wiki',
|
|
type: 'removed',
|
|
key: 'source-page',
|
|
rawPaths: ['pages/delete.json'],
|
|
}),
|
|
}),
|
|
]),
|
|
},
|
|
error: { message: expect.stringContaining('account-segments -> source-page') },
|
|
});
|
|
|
|
const failureReport = (deps.reports.create as any).mock.calls
|
|
.map((call: any[]) => call[0])
|
|
.find((report: any) => report.body.status === 'failed');
|
|
expect(failureReport.body.failure).toMatchObject({
|
|
phase: 'final_gates',
|
|
message: expect.stringContaining('account-segments -> source-page'),
|
|
details: expect.objectContaining({
|
|
wikiReferenceGateScope: expect.objectContaining({
|
|
global: true,
|
|
reasons: expect.arrayContaining(['wiki_page_removed']),
|
|
removedWikiPageKeys: expect.arrayContaining(['source-page']),
|
|
pageKeysValidated: expect.arrayContaining(['account-segments']),
|
|
}),
|
|
changedWikiPageKeys: expect.arrayContaining(['source-page']),
|
|
actionOrigins: expect.arrayContaining([
|
|
expect.objectContaining({
|
|
source: 'work_unit_action',
|
|
unitKey: 'delete-target-page',
|
|
action: expect.objectContaining({
|
|
target: 'wiki',
|
|
type: 'removed',
|
|
key: 'source-page',
|
|
rawPaths: ['pages/delete.json'],
|
|
}),
|
|
}),
|
|
]),
|
|
}),
|
|
});
|
|
expect(failureReport.body.workUnits).toEqual(
|
|
expect.arrayContaining([
|
|
expect.objectContaining({
|
|
unitKey: 'delete-target-page',
|
|
actions: expect.arrayContaining([
|
|
expect.objectContaining({
|
|
target: 'wiki',
|
|
type: 'removed',
|
|
key: 'source-page',
|
|
rawPaths: ['pages/delete.json'],
|
|
}),
|
|
]),
|
|
}),
|
|
]),
|
|
);
|
|
} finally {
|
|
await rm(runtime.homeDir, { recursive: true, force: true });
|
|
}
|
|
});
|
|
|
|
it('rejects WorkUnit patches that touch unauthorized semantic-layer target connections', async () => {
|
|
const runtime = await makeRealGitRuntime();
|
|
try {
|
|
const { deps, adapter } = makeDeps(runtime);
|
|
adapter.chunk.mockResolvedValue({
|
|
workUnits: [{ unitKey: 'finance-source', rawFiles: ['cards/finance.json'], peerFileIndex: [], dependencyPaths: [] }],
|
|
});
|
|
let currentSession: any = null;
|
|
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
|
|
currentSession = toolSession;
|
|
return { toRuntimeTools: vi.fn(() => ({})) };
|
|
});
|
|
deps.agentRunner.runLoop = vi.fn(async () => {
|
|
const root = rootOfConfig(currentSession.configService, runtime.configDir);
|
|
await mkdir(join(root, 'semantic-layer/finance'), { recursive: true });
|
|
await writeFile(
|
|
join(root, 'semantic-layer/finance/orders.yaml'),
|
|
'name: orders\ngrain: [id]\ncolumns: [{name: id, type: string}]\njoins: []\nmeasures: []\n',
|
|
);
|
|
addTouchedSlSource(currentSession.touchedSlSources, 'finance', 'orders');
|
|
currentSession.actions.push({
|
|
target: 'sl',
|
|
type: 'created',
|
|
key: 'orders',
|
|
detail: 'Unauthorized target',
|
|
targetConnectionId: 'finance',
|
|
rawPaths: ['cards/finance.json'],
|
|
});
|
|
await currentSession.gitService.commitFiles(
|
|
['semantic-layer/finance/orders.yaml'],
|
|
'wu unauthorized target',
|
|
'KTX Test',
|
|
'system@ktx.local',
|
|
);
|
|
return { stopReason: 'natural' };
|
|
}) as never;
|
|
|
|
const runner = new IngestBundleRunner(deps);
|
|
await mockStageRawFiles(runner, runtime, [['cards/finance.json', 'h1']]);
|
|
const preRunHead = await runtime.git.revParseHead();
|
|
|
|
await expect(
|
|
runner.run({
|
|
jobId: 'job-unauthorized-wu-target',
|
|
connectionId: 'warehouse',
|
|
sourceKey: 'metabase',
|
|
trigger: 'upload',
|
|
bundleRef: { kind: 'upload', uploadId: 'upload' },
|
|
}),
|
|
).rejects.toThrow(/isolated diff textual conflict.*semantic-layer target connection not allowed/);
|
|
|
|
expect(await runtime.git.revParseHead()).toBe(preRunHead);
|
|
const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-unauthorized-wu-target/trace.jsonl'), 'utf-8');
|
|
expect(trace).toContain('patch_policy_rejected');
|
|
expect(trace).toContain('semantic-layer/finance/orders.yaml');
|
|
expect(trace).toContain('allowedTargetConnectionIds');
|
|
expect(trace).toContain('ingest_failed');
|
|
expect(trace).toContain('failure_report_created');
|
|
expect(trace).not.toContain('squash_finished');
|
|
|
|
const failureReport = (deps.reports.create as any).mock.calls
|
|
.map((call: any[]) => call[0])
|
|
.find((report: any) => report.body.status === 'failed');
|
|
expect(failureReport.body.failure).toMatchObject({
|
|
phase: 'integration',
|
|
message: expect.stringContaining('semantic-layer target connection not allowed'),
|
|
});
|
|
expect(failureReport.body.failure.details).toMatchObject({
|
|
unitKey: 'finance-source',
|
|
allowedTargetConnectionIds: ['warehouse'],
|
|
touchedPaths: ['semantic-layer/finance/orders.yaml'],
|
|
reason: expect.stringContaining('semantic-layer/finance/orders.yaml (finance)'),
|
|
});
|
|
} finally {
|
|
await rm(runtime.homeDir, { recursive: true, force: true });
|
|
}
|
|
});
|
|
|
|
it('rejects reconciliation mutations that touch unauthorized semantic-layer target connections before squash', async () => {
|
|
const runtime = await makeRealGitRuntime();
|
|
try {
|
|
const { deps, adapter } = makeDeps(runtime);
|
|
adapter.chunk.mockResolvedValue({
|
|
workUnits: [{ unitKey: 'valid-page', rawFiles: ['pages/source.json'], peerFileIndex: [], dependencyPaths: [] }],
|
|
});
|
|
let currentSession: any = null;
|
|
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
|
|
currentSession = toolSession;
|
|
return { toRuntimeTools: vi.fn(() => ({})) };
|
|
});
|
|
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
|
|
const root = rootOfConfig(currentSession.configService, runtime.configDir);
|
|
if (params.telemetryTags.operationName === 'ingest-bundle-wu') {
|
|
await mkdir(join(root, 'wiki/global'), { recursive: true });
|
|
await writeFile(join(root, 'wiki/global/valid-page.md'), '---\nsummary: Valid page\nusage_mode: auto\n---\n\nValid\n');
|
|
currentSession.actions.push({
|
|
target: 'wiki',
|
|
type: 'created',
|
|
key: 'valid-page',
|
|
detail: 'Valid page',
|
|
rawPaths: ['pages/source.json'],
|
|
});
|
|
await currentSession.gitService.commitFiles(['wiki/global/valid-page.md'], 'wu valid page', 'KTX Test', 'system@ktx.local');
|
|
} else {
|
|
await mkdir(join(root, 'semantic-layer/finance'), { recursive: true });
|
|
await writeFile(
|
|
join(root, 'semantic-layer/finance/reconcile_orders.yaml'),
|
|
'name: reconcile_orders\ngrain: [id]\ncolumns: [{name: id, type: string}]\njoins: []\nmeasures: []\n',
|
|
);
|
|
addTouchedSlSource(currentSession.touchedSlSources, 'finance', 'reconcile_orders');
|
|
currentSession.actions.push({
|
|
target: 'sl',
|
|
type: 'created',
|
|
key: 'reconcile_orders',
|
|
detail: 'Unauthorized reconcile target',
|
|
targetConnectionId: 'finance',
|
|
rawPaths: ['pages/source.json'],
|
|
});
|
|
await currentSession.gitService.commitFiles(
|
|
['semantic-layer/finance/reconcile_orders.yaml'],
|
|
'reconcile unauthorized target',
|
|
'KTX Test',
|
|
'system@ktx.local',
|
|
);
|
|
}
|
|
return { stopReason: 'natural' };
|
|
}) as never;
|
|
|
|
const runner = new IngestBundleRunner(deps);
|
|
await mockStageRawFiles(runner, runtime, [['pages/source.json', 'h1']]);
|
|
const preRunHead = await runtime.git.revParseHead();
|
|
|
|
await expect(
|
|
runner.run({
|
|
jobId: 'job-unauthorized-reconcile-target',
|
|
connectionId: 'warehouse',
|
|
sourceKey: 'metabase',
|
|
trigger: 'upload',
|
|
bundleRef: { kind: 'upload', uploadId: 'upload' },
|
|
}),
|
|
).rejects.toThrow(/semantic-layer target connection not allowed/);
|
|
|
|
expect(await runtime.git.revParseHead()).toBe(preRunHead);
|
|
const trace = await readFile(
|
|
join(runtime.configDir, '.ktx/ingest-traces/job-unauthorized-reconcile-target/trace.jsonl'),
|
|
'utf-8',
|
|
);
|
|
expect(trace).toContain('semantic_layer_target_policy_started');
|
|
expect(trace).toContain('semantic_layer_target_policy_failed');
|
|
expect(trace).toContain('allowedTargetConnectionIds');
|
|
expect(trace).toContain('semantic-layer/finance/reconcile_orders.yaml');
|
|
expect(trace).toContain('ingest_failed');
|
|
expect(trace).toContain('failure_report_created');
|
|
expect(trace).not.toContain('squash_finished');
|
|
const failureReport = (deps.reports.create as any).mock.calls
|
|
.map((call: any[]) => call[0])
|
|
.find((report: any) => report.body.status === 'failed');
|
|
expect(failureReport.body.failure).toMatchObject({
|
|
phase: 'target_policy',
|
|
message: expect.stringContaining('semantic-layer target connection not allowed'),
|
|
});
|
|
expect(failureReport.body.failure.details).toMatchObject({
|
|
allowedTargetConnectionIds: ['warehouse'],
|
|
touchedPaths: expect.arrayContaining(['semantic-layer/finance/reconcile_orders.yaml']),
|
|
});
|
|
} finally {
|
|
await rm(runtime.homeDir, { recursive: true, force: true });
|
|
}
|
|
});
|
|
|
|
it('repairs additive same-source textual conflicts before final gates and squash', async () => {
|
|
const runtime = await makeRealGitRuntime();
|
|
try {
|
|
const { deps } = makeDeps(runtime);
|
|
let currentSession: any = null;
|
|
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
|
|
currentSession = toolSession;
|
|
return { toRuntimeTools: vi.fn(() => ({})) };
|
|
});
|
|
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
|
|
if (params.telemetryTags.operationName === 'ingest-isolated-diff-textual-resolver') {
|
|
const current = await params.toolSet.read_integration_file.execute({
|
|
path: 'semantic-layer/warehouse/mart_account_segments.yaml',
|
|
});
|
|
expect(current.markdown).toContain('total_contract_arr_cents');
|
|
const patch = await params.toolSet.read_failed_patch.execute({});
|
|
expect(patch.markdown).toContain('account_count');
|
|
await params.toolSet.write_integration_file.execute({
|
|
path: 'semantic-layer/warehouse/mart_account_segments.yaml',
|
|
content:
|
|
'name: mart_account_segments\n' +
|
|
'grain: [account_id]\n' +
|
|
'columns: [{name: account_id, type: string}]\n' +
|
|
'joins: []\n' +
|
|
'measures:\n' +
|
|
' - name: total_contract_arr_cents\n' +
|
|
' expr: sum(contract_arr)\n' +
|
|
' - name: account_count\n' +
|
|
' expr: count_distinct(account_id)\n',
|
|
});
|
|
return { stopReason: 'natural' };
|
|
}
|
|
|
|
const root = rootOfConfig(currentSession.configService, runtime.configDir);
|
|
await mkdir(join(root, 'semantic-layer/warehouse'), { recursive: true });
|
|
if (params.telemetryTags.unitKey === 'card-wiki') {
|
|
await writeFile(
|
|
join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'),
|
|
'name: mart_account_segments\n' +
|
|
'grain: [account_id]\n' +
|
|
'columns: [{name: account_id, type: string}]\n' +
|
|
'joins: []\n' +
|
|
'measures:\n' +
|
|
' - name: total_contract_arr_cents\n' +
|
|
' expr: sum(contract_arr)\n',
|
|
);
|
|
} else if (params.telemetryTags.unitKey === 'card-source') {
|
|
await writeFile(
|
|
join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'),
|
|
'name: mart_account_segments\n' +
|
|
'grain: [account_id]\n' +
|
|
'columns: [{name: account_id, type: string}]\n' +
|
|
'joins: []\n' +
|
|
'measures:\n' +
|
|
' - name: account_count\n' +
|
|
' expr: count_distinct(account_id)\n',
|
|
);
|
|
}
|
|
addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'mart_account_segments');
|
|
currentSession.actions.push({
|
|
target: 'sl',
|
|
type: 'updated',
|
|
key: 'mart_account_segments',
|
|
detail: 'Updated account segments source',
|
|
targetConnectionId: 'warehouse',
|
|
});
|
|
await currentSession.gitService.commitFiles(
|
|
['semantic-layer/warehouse/mart_account_segments.yaml'],
|
|
`wu ${params.telemetryTags.unitKey}`,
|
|
'KTX Test',
|
|
'system@ktx.local',
|
|
);
|
|
return { stopReason: 'natural' };
|
|
}) as never;
|
|
|
|
const runner = new IngestBundleRunner(deps);
|
|
await mockStageRawFiles(runner, runtime, [
|
|
['cards/wiki.json', 'hash-a'],
|
|
['cards/source.json', 'hash-b'],
|
|
]);
|
|
|
|
const result = await runner.run({
|
|
jobId: 'job-resolver-e2e',
|
|
connectionId: 'warehouse',
|
|
sourceKey: 'metabase',
|
|
trigger: 'manual_resync',
|
|
bundleRef: { kind: 'upload', uploadId: 'upload-1' },
|
|
});
|
|
|
|
expect(result.commitSha).toBeTruthy();
|
|
const source = await readFile(join(runtime.configDir, 'semantic-layer/warehouse/mart_account_segments.yaml'), 'utf-8');
|
|
expect(source).toContain('total_contract_arr_cents');
|
|
expect(source).toContain('account_count');
|
|
expect(deps.agentRunner.runLoop).toHaveBeenCalledWith(
|
|
expect.objectContaining({
|
|
modelRole: 'repair',
|
|
telemetryTags: expect.objectContaining({
|
|
operationName: 'ingest-isolated-diff-textual-resolver',
|
|
unitKey: 'card-source',
|
|
}),
|
|
}),
|
|
);
|
|
const successReport = (deps.reports.create as any).mock.calls.at(-1)?.[0]?.body;
|
|
expect(successReport.isolatedDiff).toMatchObject({
|
|
acceptedPatches: 2,
|
|
textualConflicts: 1,
|
|
semanticConflicts: 0,
|
|
resolverAttempts: 1,
|
|
resolverRepairs: 1,
|
|
resolverFailures: 0,
|
|
});
|
|
const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-resolver-e2e/trace.jsonl'), 'utf-8');
|
|
expect(trace).toContain('textual_conflict_resolver_repaired');
|
|
expect(trace).toContain('patch_accepted_after_textual_resolution');
|
|
} finally {
|
|
await rm(runtime.homeDir, { recursive: true, force: true });
|
|
}
|
|
});
|
|
|
|
it('repairs final wiki body refs before squash when the repair agent edits the scoped page', async () => {
|
|
const runtime = await makeRealGitRuntime();
|
|
try {
|
|
await mkdir(join(runtime.configDir, 'semantic-layer/warehouse'), { recursive: true });
|
|
await mkdir(join(runtime.configDir, 'wiki/global'), { recursive: true });
|
|
await writeFile(
|
|
join(runtime.configDir, 'semantic-layer/warehouse/mart_account_segments.yaml'),
|
|
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr_cents\n expr: sum(contract_arr)\n',
|
|
);
|
|
await writeFile(
|
|
join(runtime.configDir, 'wiki/global/account-segments.md'),
|
|
'---\nsummary: Account segments\nusage_mode: auto\n---\n\nExisting ARR uses `mart_account_segments.total_contract_arr_cents`.\n',
|
|
);
|
|
await runtime.git.commitFiles(
|
|
['semantic-layer/warehouse/mart_account_segments.yaml', 'wiki/global/account-segments.md'],
|
|
'seed stale wiki body ref',
|
|
'KTX Test',
|
|
'system@ktx.local',
|
|
);
|
|
|
|
const { deps, adapter } = makeDeps(runtime);
|
|
adapter.chunk.mockResolvedValue({
|
|
workUnits: [{ unitKey: 'source-only', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] }],
|
|
});
|
|
|
|
let currentSession: any = null;
|
|
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
|
|
currentSession = toolSession;
|
|
return { toRuntimeTools: vi.fn(() => ({})) };
|
|
});
|
|
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
|
|
if (params.telemetryTags.operationName === 'ingest-isolated-diff-gate-repair') {
|
|
const gateError = await params.toolSet.read_gate_error.execute({});
|
|
expect(gateError.markdown).toContain('total_contract_arr_cents');
|
|
const page = await params.toolSet.read_repair_file.execute({
|
|
path: 'wiki/global/account-segments.md',
|
|
});
|
|
await params.toolSet.write_repair_file.execute({
|
|
path: 'wiki/global/account-segments.md',
|
|
content: page.markdown.replace('total_contract_arr_cents', 'total_contract_arr'),
|
|
});
|
|
return { stopReason: 'natural' as const };
|
|
}
|
|
if (params.modelRole === 'reconcile') {
|
|
return { stopReason: 'natural' as const };
|
|
}
|
|
|
|
const root = rootOfConfig(currentSession.configService, runtime.configDir);
|
|
await writeFile(
|
|
join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'),
|
|
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n',
|
|
);
|
|
addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'mart_account_segments');
|
|
currentSession.actions.push({
|
|
target: 'sl',
|
|
type: 'updated',
|
|
key: 'mart_account_segments',
|
|
detail: 'Rename ARR measure',
|
|
targetConnectionId: 'warehouse',
|
|
rawPaths: ['cards/source.json'],
|
|
});
|
|
await currentSession.gitService.commitFiles(
|
|
['semantic-layer/warehouse/mart_account_segments.yaml'],
|
|
'wu source rename',
|
|
'KTX Test',
|
|
'system@ktx.local',
|
|
);
|
|
return { stopReason: 'natural' as const };
|
|
}) as never;
|
|
|
|
const runner = new IngestBundleRunner(deps);
|
|
await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]);
|
|
|
|
const result = await runner.run({
|
|
jobId: 'job-final-gate-repair',
|
|
connectionId: 'warehouse',
|
|
sourceKey: 'metabase',
|
|
trigger: 'upload',
|
|
bundleRef: { kind: 'upload', uploadId: 'upload' },
|
|
});
|
|
|
|
expect(result.commitSha).toBeTruthy();
|
|
await expect(readFile(join(runtime.configDir, 'wiki/global/account-segments.md'), 'utf-8')).resolves.toContain(
|
|
'mart_account_segments.total_contract_arr',
|
|
);
|
|
await expect(readFile(join(runtime.configDir, 'wiki/global/account-segments.md'), 'utf-8')).resolves.not.toContain(
|
|
'total_contract_arr_cents',
|
|
);
|
|
const reportCreate = vi.mocked(deps.reports.create).mock.calls.at(-1)?.[0] as any;
|
|
expect(reportCreate.body.isolatedDiff).toMatchObject({
|
|
gateRepairAttempts: 1,
|
|
gateRepairs: 1,
|
|
gateRepairFailures: 0,
|
|
});
|
|
const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-final-gate-repair/trace.jsonl'), 'utf-8');
|
|
expect(trace).toContain('gate_repair_repaired');
|
|
expect(trace).toContain('final_artifact_gates_after_gate_repair_finished');
|
|
expect(trace).toContain('final_gate_repair_committed');
|
|
} finally {
|
|
await rm(runtime.homeDir, { recursive: true, force: true });
|
|
}
|
|
});
|
|
|
|
it('fails before squash when final gate repair makes no edit', async () => {
|
|
const runtime = await makeRealGitRuntime();
|
|
try {
|
|
await mkdir(join(runtime.configDir, 'semantic-layer/warehouse'), { recursive: true });
|
|
await mkdir(join(runtime.configDir, 'wiki/global'), { recursive: true });
|
|
await writeFile(
|
|
join(runtime.configDir, 'semantic-layer/warehouse/mart_account_segments.yaml'),
|
|
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr_cents\n expr: sum(contract_arr)\n',
|
|
);
|
|
await writeFile(
|
|
join(runtime.configDir, 'wiki/global/account-segments.md'),
|
|
'---\nsummary: Account segments\nusage_mode: auto\n---\n\nExisting ARR uses `mart_account_segments.total_contract_arr_cents`.\n',
|
|
);
|
|
await runtime.git.commitFiles(
|
|
['semantic-layer/warehouse/mart_account_segments.yaml', 'wiki/global/account-segments.md'],
|
|
'seed stale wiki body ref',
|
|
'KTX Test',
|
|
'system@ktx.local',
|
|
);
|
|
const preRunHead = await runtime.git.revParseHead();
|
|
|
|
const { deps, adapter } = makeDeps(runtime);
|
|
adapter.chunk.mockResolvedValue({
|
|
workUnits: [{ unitKey: 'source-only', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] }],
|
|
});
|
|
|
|
let currentSession: any = null;
|
|
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
|
|
currentSession = toolSession;
|
|
return { toRuntimeTools: vi.fn(() => ({})) };
|
|
});
|
|
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
|
|
if (params.telemetryTags.operationName === 'ingest-isolated-diff-gate-repair') {
|
|
return { stopReason: 'natural' as const };
|
|
}
|
|
if (params.modelRole === 'reconcile') {
|
|
return { stopReason: 'natural' as const };
|
|
}
|
|
|
|
const root = rootOfConfig(currentSession.configService, runtime.configDir);
|
|
await writeFile(
|
|
join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'),
|
|
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n',
|
|
);
|
|
addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'mart_account_segments');
|
|
currentSession.actions.push({
|
|
target: 'sl',
|
|
type: 'updated',
|
|
key: 'mart_account_segments',
|
|
detail: 'Rename ARR measure',
|
|
targetConnectionId: 'warehouse',
|
|
rawPaths: ['cards/source.json'],
|
|
});
|
|
await currentSession.gitService.commitFiles(
|
|
['semantic-layer/warehouse/mart_account_segments.yaml'],
|
|
'wu source rename',
|
|
'KTX Test',
|
|
'system@ktx.local',
|
|
);
|
|
return { stopReason: 'natural' as const };
|
|
}) as never;
|
|
|
|
const runner = new IngestBundleRunner(deps);
|
|
await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]);
|
|
|
|
await expect(
|
|
runner.run({
|
|
jobId: 'job-final-gate-repair-fails',
|
|
connectionId: 'warehouse',
|
|
sourceKey: 'metabase',
|
|
trigger: 'upload',
|
|
bundleRef: { kind: 'upload', uploadId: 'upload' },
|
|
}),
|
|
).rejects.toThrow(/gate repair completed without editing an allowed path/);
|
|
|
|
expect(await runtime.git.revParseHead()).toBe(preRunHead);
|
|
const reportCreate = vi.mocked(deps.reports.create).mock.calls.at(-1)?.[0] as any;
|
|
expect(reportCreate.body.status).toBe('failed');
|
|
expect(reportCreate.body.isolatedDiff).toMatchObject({
|
|
gateRepairAttempts: 1,
|
|
gateRepairs: 0,
|
|
gateRepairFailures: 1,
|
|
});
|
|
const trace = await readFile(
|
|
join(runtime.configDir, '.ktx/ingest-traces/job-final-gate-repair-fails/trace.jsonl'),
|
|
'utf-8',
|
|
);
|
|
expect(trace).toContain('gate_repair_failed');
|
|
expect(trace).not.toContain('squash_finished');
|
|
} finally {
|
|
await rm(runtime.homeDir, { recursive: true, force: true });
|
|
}
|
|
});
|
|
it('runs finalization before wiki sl-ref repair and final gates', async () => {
|
|
const runtime = await makeRealGitRuntime();
|
|
try {
|
|
const { deps, adapter } = makeDeps(runtime);
|
|
adapter.chunk.mockResolvedValue({
|
|
workUnits: [{ unitKey: 'wiki-page', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] }],
|
|
});
|
|
adapter.finalize = vi.fn(async ({ workdir }) => {
|
|
await mkdir(join(workdir, 'semantic-layer/warehouse'), { recursive: true });
|
|
await mkdir(join(workdir, 'wiki/global'), { recursive: true });
|
|
await writeFile(
|
|
join(workdir, 'semantic-layer/warehouse/mart_account_segments.yaml'),
|
|
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n',
|
|
);
|
|
await writeFile(
|
|
join(workdir, 'wiki/global/finalized-accounts.md'),
|
|
'---\nsummary: Finalized accounts\nusage_mode: auto\nsl_refs:\n - mart_account_segments\n - missing_source\n---\n\nAccounts use `mart_account_segments.total_contract_arr`.\n',
|
|
);
|
|
return {
|
|
warnings: [],
|
|
errors: [],
|
|
touchedSources: [{ connectionId: 'warehouse', sourceName: 'mart_account_segments' }],
|
|
changedWikiPageKeys: ['finalized-accounts'],
|
|
actions: [
|
|
{
|
|
target: 'sl',
|
|
type: 'created',
|
|
key: 'mart_account_segments',
|
|
detail: 'Finalized accounts',
|
|
targetConnectionId: 'warehouse',
|
|
rawPaths: ['cards/source.json'],
|
|
},
|
|
{
|
|
target: 'wiki',
|
|
type: 'created',
|
|
key: 'finalized-accounts',
|
|
detail: 'Finalized wiki',
|
|
rawPaths: ['cards/source.json'],
|
|
},
|
|
],
|
|
};
|
|
});
|
|
deps.agentRunner.runLoop = vi.fn(async () => ({ stopReason: 'natural' as const })) as never;
|
|
const runner = new IngestBundleRunner(deps);
|
|
await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]);
|
|
|
|
await runner.run({
|
|
jobId: 'job-finalization',
|
|
connectionId: 'warehouse',
|
|
sourceKey: 'metabase',
|
|
trigger: 'upload',
|
|
bundleRef: { kind: 'upload', uploadId: 'upload' },
|
|
});
|
|
|
|
const trace = await readFile(
|
|
join(runtime.configDir, '.ktx/ingest-traces/job-finalization/trace.jsonl'),
|
|
'utf-8',
|
|
);
|
|
expect(trace.indexOf('finalization_committed')).toBeLessThan(trace.indexOf('wiki_sl_refs_repaired'));
|
|
expect(trace.indexOf('wiki_sl_refs_repaired')).toBeLessThan(trace.indexOf('final_artifact_gates'));
|
|
await expect(readFile(join(runtime.configDir, 'wiki/global/finalized-accounts.md'), 'utf-8')).resolves.toContain(
|
|
'sl_refs:\n - mart_account_segments',
|
|
);
|
|
} finally {
|
|
await rm(runtime.homeDir, { recursive: true, force: true });
|
|
}
|
|
});
|
|
|
|
it('fails when finalization edits a path already changed earlier in the run', async () => {
|
|
const runtime = await makeRealGitRuntime();
|
|
try {
|
|
const { deps, adapter } = makeDeps(runtime);
|
|
adapter.chunk.mockResolvedValue({
|
|
workUnits: [{ unitKey: 'wiki-page', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] }],
|
|
});
|
|
let currentSession: any = null;
|
|
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
|
|
currentSession = toolSession;
|
|
return { toRuntimeTools: vi.fn(() => ({})) };
|
|
});
|
|
deps.agentRunner.runLoop = vi.fn(async () => {
|
|
const root = rootOfConfig(currentSession.configService, runtime.configDir);
|
|
await mkdir(join(root, 'wiki/global'), { recursive: true });
|
|
await writeFile(
|
|
join(root, 'wiki/global/orders.md'),
|
|
'---\nsummary: Orders\nusage_mode: auto\n---\n\nWU body\n',
|
|
);
|
|
currentSession.actions.push({
|
|
target: 'wiki',
|
|
type: 'created',
|
|
key: 'orders',
|
|
detail: 'WU orders',
|
|
rawPaths: ['cards/source.json'],
|
|
});
|
|
await currentSession.gitService.commitFiles(
|
|
['wiki/global/orders.md'],
|
|
'wu orders',
|
|
'KTX Test',
|
|
'system@ktx.local',
|
|
);
|
|
return { stopReason: 'natural' as const };
|
|
}) as never;
|
|
adapter.finalize = vi.fn(async ({ workdir }) => {
|
|
await writeFile(
|
|
join(workdir, 'wiki/global/orders.md'),
|
|
'---\nsummary: Orders\nusage_mode: auto\n---\n\nFinalized body\n',
|
|
);
|
|
return {
|
|
warnings: [],
|
|
errors: [],
|
|
touchedSources: [],
|
|
changedWikiPageKeys: ['orders'],
|
|
actions: [{ target: 'wiki', type: 'updated', key: 'orders', detail: 'Conflicting finalization' }],
|
|
};
|
|
});
|
|
const runner = new IngestBundleRunner(deps);
|
|
await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]);
|
|
|
|
await expect(
|
|
runner.run({
|
|
jobId: 'job-finalization-overlap',
|
|
connectionId: 'warehouse',
|
|
sourceKey: 'metabase',
|
|
trigger: 'upload',
|
|
bundleRef: { kind: 'upload', uploadId: 'upload' },
|
|
}),
|
|
).rejects.toThrow(/finalization modified path\(s\) already changed earlier in this run: wiki\/global\/orders\.md/);
|
|
} finally {
|
|
await rm(runtime.homeDir, { recursive: true, force: true });
|
|
}
|
|
});
|
|
|
|
it('rejects finalization writes to unauthorized semantic-layer targets', async () => {
|
|
const runtime = await makeRealGitRuntime();
|
|
try {
|
|
const { deps, adapter } = makeDeps(runtime);
|
|
adapter.chunk.mockResolvedValue({ workUnits: [] });
|
|
adapter.finalize = vi.fn(async ({ workdir }) => {
|
|
await mkdir(join(workdir, 'semantic-layer/other-warehouse'), { recursive: true });
|
|
await writeFile(
|
|
join(workdir, 'semantic-layer/other-warehouse/orders.yaml'),
|
|
'name: orders\ngrain: [order_id]\ncolumns: [{name: order_id, type: string}]\njoins: []\nmeasures: []\n',
|
|
);
|
|
return {
|
|
warnings: [],
|
|
errors: [],
|
|
touchedSources: [{ connectionId: 'other-warehouse', sourceName: 'orders' }],
|
|
changedWikiPageKeys: [],
|
|
actions: [
|
|
{
|
|
target: 'sl',
|
|
type: 'created',
|
|
key: 'orders',
|
|
targetConnectionId: 'other-warehouse',
|
|
detail: 'Forbidden target',
|
|
rawPaths: ['cards/source.json'],
|
|
},
|
|
],
|
|
};
|
|
});
|
|
const runner = new IngestBundleRunner(deps);
|
|
await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]);
|
|
|
|
await expect(
|
|
runner.run({
|
|
jobId: 'job-finalization-target-policy',
|
|
connectionId: 'warehouse',
|
|
sourceKey: 'metabase',
|
|
trigger: 'upload',
|
|
bundleRef: { kind: 'upload', uploadId: 'upload' },
|
|
}),
|
|
).rejects.toThrow(/semantic-layer target connection not allowed/);
|
|
const trace = await readFile(
|
|
join(runtime.configDir, '.ktx/ingest-traces/job-finalization-target-policy/trace.jsonl'),
|
|
'utf-8',
|
|
);
|
|
expect(trace).toContain('finalization_committed');
|
|
expect(trace).toContain('semantic_layer_target_policy');
|
|
expect(trace).toContain('ingest_failed');
|
|
} finally {
|
|
await rm(runtime.homeDir, { recursive: true, force: true });
|
|
}
|
|
});
|
|
});
|