2026-05-18 13:38:06 +02:00
import { mkdir , mkdtemp , readFile , readdir , rm , writeFile } from 'node:fs/promises' ;
import { tmpdir } from 'node:os' ;
import { join } from 'node:path' ;
import { describe , expect , it , vi } from 'vitest' ;
test: split cli tests from source tree (#216)
* feat(cli): define full warehouse dialect contract
* test(cli): keep dialect edge tests focused
* fix(cli): stabilize dialect contract foundation
* refactor(connectors): own read-only query preparation
* refactor(connectors): resolve dialects through registry
* refactor(connectors): keep concrete dialect classes internal
* chore(workspace): enforce dialect import boundary
* refactor(cli): resolve relationship dialect at scan boundary
* refactor(cli): use dialect display parsing for entity details
* refactor(cli): use dialect display parsing for warehouse catalog
* refactor(cli): use dialect SQL in relationship workflows
* test(cli): verify solid dialect scan workflow closure
* test: split cli tests from source tree
* refactor(cli): standardize BigQuery scope listing
* feat(sqlite): implement connector scope listing
* test(connectors): cover required table listing
* feat(cli): add warehouse driver registry
* refactor(setup): route scope discovery through driver registry
* refactor(cli): route local query execution through driver registry
* refactor(historic-sql): route dialect support through driver registry
* refactor(cli): test warehouse connections through driver registry
* fix(cli): close driver registry type export gaps
* Improve setup daemon diagnostics
* refactor(setup): centralize rail-prefixed diagnostics + query-history fallback
Extract errorMessage, writePrefixedLines, and flushPrefixedBufferedCommandOutput
into clack.ts so the setup wizard, managed daemons, and embedding/agent steps
share one rail-formatted writer. setup-databases.ts also adds a
"disable query history and retry" option when the schema-context build fails
and query history is the likely culprit, surfaced via a new
failed-query-history-unavailable status.
* fix(cli): carry catalog through the picker so BigQuery/Snowflake/SQL Server scope filters match
The setup picker's KtxTableListEntry was a 2-level { schema, name }, so
qualifiedTableId always wrote db.name into enabled_tables. When BigQuery,
Snowflake, or SQL Server later ran fast ingest, their introspect step filtered
the scope set with scopedTableNames(scope, { catalog: projectId|database, db })
— catalog was non-null on the introspect side but null in the scope refs, so
every entry was rejected, the live-database adapter staged zero table files,
and detect() failed with 'Adapter "live-database" did not recognize fetched
source output'.
Align the picker boundary with the canonical 3-level KtxTableRef:
- Add catalog: string | null to KtxTableListEntry.
- BigQuery/Snowflake/SQL Server listTables populate catalog from the
resolved projectId / database; Postgres/MySQL/ClickHouse/SQLite set null.
- qualifiedTableId emits catalog.schema.name when catalog is non-null
(resolveEnabledTables already accepts the 3-part shape) and
schemasFromEnabledTables now goes through parseDottedTableEntry so it
recovers the schema correctly from both 2-part and 3-part entries.
- Export parseDottedTableEntry from enabled-tables.ts (@internal) for picker
reuse.
Update listTables expectations in all seven connector tests and the setup /
picker test fixtures. Add a picker regression test that covers the
catalog-bearing round-trip (save + refine).
* fix(cli): allow debug telemetry under opt-out env
2026-05-26 08:49:05 +02:00
import { GitService } from '../../../src/context/core/git.service.js' ;
import { SessionWorktreeService } from '../../../src/context/core/session-worktree.service.js' ;
import { LocalGitFileStore } from '../../../src/context/project/local-git-file-store.js' ;
import { addTouchedSlSource } from '../../../src/context/tools/touched-sl-sources.js' ;
import { IngestBundleRunner } from '../../../src/context/ingest/ingest-bundle.runner.js' ;
import type { IngestBundleRunnerDeps } from '../../../src/context/ingest/ports.js' ;
2026-05-18 13:38:06 +02:00
/**
 * Creates a real, on-disk git runtime inside a fresh temporary directory.
 *
 * The temp directory becomes `homeDir`; the git-backed config repository lives
 * in its `config` subdirectory. `GitService` is initialised via `onModuleInit`
 * before the file store and session-worktree service are built on top of it.
 *
 * @returns The temp paths plus the wired `git`, `configService` and
 *   `sessionWorktreeService` instances. The function does not delete
 *   `homeDir`; the tests in this file remove it in a `finally` block.
 */
async function makeRealGitRuntime() {
  const homeDir = await mkdtemp(join(tmpdir(), 'ktx-isolated-runner-'));
  const configDir = join(homeDir, 'config');

  // Both services take the same core configuration; build a fresh object for
  // each consumer so they never share a reference.
  const buildCoreConfig = () => ({
    storage: { configDir, homeDir },
    git: {
      userName: 'System User',
      userEmail: 'system@example.com',
      bootstrapMessage: 'init',
      bootstrapAuthor: 'system',
      bootstrapAuthorEmail: 'system@example.com',
    },
  });

  const git = new GitService(buildCoreConfig());
  await git.onModuleInit();

  const configService = new LocalGitFileStore({ rootDir: configDir, git });
  const sessionWorktreeService = new SessionWorktreeService({
    coreConfig: buildCoreConfig(),
    gitService: git,
    configService,
  });

  return { homeDir, configDir, git, configService, sessionWorktreeService };
}
/**
 * Resolves the directory a config service is rooted at.
 *
 * @param configService - Any value; only a string `rootDir` property is used.
 * @param fallback - Returned whenever no string `rootDir` can be read,
 *   including when `configService` is `null` or `undefined`.
 * @returns The service's `rootDir` when it is a string, otherwise `fallback`.
 */
function rootOfConfig(configService: unknown, fallback: string): string {
  // Optional chaining keeps a missing service on the fallback path instead of
  // throwing a TypeError on property access.
  const rootDir = (configService as { rootDir?: unknown } | null | undefined)?.rootDir;
  return typeof rootDir === 'string' ? rootDir : fallback;
}
/**
 * Fake semantic-layer loader driven by a single YAML file under `root`.
 *
 * Reads `semantic-layer/warehouse/mart_account_segments.yaml` as text (an
 * unreadable file counts as empty) and only checks which ARR measure name the
 * text mentions; the YAML itself is never parsed.
 *
 * @param root - Directory that contains the `semantic-layer` tree.
 * @returns `sources` with one synthetic `mart_account_segments` entry when
 *   either measure name appears in the file, otherwise an empty list.
 *   `loadErrors` is always empty.
 */
async function loadSourcesFromRoot(root: string) {
  const yamlPath = join(root, 'semantic-layer/warehouse/mart_account_segments.yaml');

  let raw: string;
  try {
    raw = await readFile(yamlPath, 'utf-8');
  } catch {
    raw = '';
  }

  const hasCents = raw.includes('total_contract_arr_cents');
  const hasDollars = raw.includes('total_contract_arr');
  // The cents name wins whenever it is present.
  const measureName = hasCents ? 'total_contract_arr_cents' : 'total_contract_arr';

  const sources = !(hasCents || hasDollars)
    ? []
    : [
        {
          name: 'mart_account_segments',
          grain: ['account_id'],
          columns: [{ name: 'account_id', type: 'string' }],
          joins: [],
          measures: [{ name: measureName, expr: 'sum(contract_arr)' }],
          table: 'analytics.mart_account_segments',
        },
      ];

  return { sources, loadErrors: [] };
}
/**
 * Lists the page keys of the markdown files directly inside `wiki/global`.
 *
 * @param root - Directory that contains the `wiki/global` folder.
 * @returns File names ending in `.md` with that suffix removed, in default
 *   sort order. An unreadable or missing directory yields an empty list.
 */
async function listGlobalWikiPageKeys(root: string): Promise<string[]> {
  const suffix = '.md';

  let names: string[];
  try {
    names = await readdir(join(root, 'wiki/global'));
  } catch {
    return [];
  }

  const keys: string[] = [];
  for (const name of names) {
    if (name.endsWith(suffix)) {
      keys.push(name.slice(0, -suffix.length));
    }
  }
  return keys.sort();
}
/**
 * Extracts a simple YAML block list (`key:` followed by `- item` lines) from
 * frontmatter text. This is a line-based match, not a YAML parser.
 *
 * @param yaml - Frontmatter text, without the `---` fences.
 * @param key - Top-level key whose list should be read. It is matched
 *   literally, so regular-expression metacharacters in it are safe.
 * @returns The list items with indentation and the leading `- ` removed, or an
 *   empty array when the key is absent or has no items.
 */
function frontmatterList(yaml: string, key: string): string[] {
  // Escape the key so characters such as `.` or `(` cannot change the pattern.
  const escapedKey = key.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
  // The key must start a line (so `refs` never matches inside `sl_refs`).
  // Items may be indented by any number of spaces or tabs, so the parser does
  // not depend on the writer's exact indentation width.
  const pattern = new RegExp(`(?:^|\\n)${escapedKey}:\\n((?:[ \\t]*- .+\\n?)*)`);
  const listBlock = pattern.exec(yaml)?.[1] ?? '';
  return listBlock
    .split('\n')
    .map((line) => line.trim().replace(/^- /, ''))
    .filter(Boolean);
}
/**
 * Returns the name of the legacy shared-worktree fallback setting.
 *
 * The name is assembled from two fragments rather than written as a single
 * literal — presumably so the full legacy identifier never appears verbatim
 * in the source tree; confirm before collapsing it.
 */
function legacyFallbackSettingKey(): string {
  const head = 'sharedWorktree';
  const tail = 'SourceKeys';
  return head + tail;
}
/**
 * Returns the name of the legacy shared-worktree trace event, which the tests
 * in this file assert is absent from ingest traces.
 *
 * The name is assembled from word fragments rather than written as a single
 * literal — presumably so the full legacy identifier never appears verbatim
 * in the source tree; confirm before collapsing it.
 */
function legacySharedTraceEvent(): string {
  const words = ['shared', 'worktree', 'path', 'enabled'];
  return words.reduce((name, word) => `${name}_${word}`);
}
/**
 * Builds a file-backed mock wiki service rooted at `root`.
 *
 * Every method is a `vi.fn` mock. Pages are read from and written to
 * `<root>/wiki/global`; the scope arguments of `readPage` and `writePage` are
 * accepted but not used to pick a directory.
 *
 * NOTE(review): this listing renders tokens separated by spaces, including
 * inside template literals. The exact whitespace of the template strings
 * below should be confirmed against the real file before relying on it.
 *
 * @param root - Directory that contains (or will contain) `wiki/global`.
 * @returns An object with `listPageKeys`, `readPage`, `writePage` and
 *   `syncFromCommit` mocks.
 */
function makeWikiService ( root : string ) {
return {
// Only the 'GLOBAL' scope has pages; any other scope lists nothing.
listPageKeys : vi.fn ( async ( scope : string ) = > ( scope === 'GLOBAL' ? listGlobalWikiPageKeys ( root ) : [ ] ) ) ,
// Reads one page file; resolves to null when the file cannot be read or is empty.
readPage : vi.fn ( async ( _scope : string , _scopeId : string | null , key : string ) = > {
const path = join ( root , 'wiki/global' , ` ${ key } .md ` ) ;
const raw = await readFile ( path , 'utf-8' ) . catch ( ( ) = > null ) ;
// `!raw` is true for both a failed read (null) and an empty file ('').
if ( ! raw ) {
return null ;
}
// Split the `---` fenced frontmatter from the body; both default to '' when the file has no fence.
const [ , yaml = '' , content = '' ] = /^---\n([\s\S]*?)\n---\n?([\s\S]*)$/ . exec ( raw ) ? ? [ ] ;
return {
pageKey : key ,
// Only `refs` and `sl_refs` are parsed from the file; `summary` is the page key and `usage_mode` is fixed.
frontmatter : {
summary : key ,
usage_mode : 'auto' ,
refs : frontmatterList ( yaml , 'refs' ) ,
sl_refs : frontmatterList ( yaml , 'sl_refs' ) ,
} ,
content : content.trim ( ) ,
} ;
} ) ,
// NOTE(review): the next line is a bare timestamp, not TypeScript; it looks like version-control annotation residue in this listing — confirm it is absent from the real file.
2026-05-20 14:17:10 +02:00
// Writes one page file, creating `wiki/global` first if needed.
writePage : vi.fn (
async (
_scope : string ,
_scopeId : string | null ,
key : string ,
frontmatter : { summary? : string ; usage_mode? : string ; refs? : string [ ] ; sl_refs? : string [ ] } ,
content : string ,
) = > {
await mkdir ( join ( root , 'wiki/global' ) , { recursive : true } ) ;
// Each ref becomes one `- item` line; a missing list becomes an empty string.
const refs = ( frontmatter . refs ? ? [ ] ) . map ( ( ref ) = > ` - ${ ref } ` ) . join ( '\n' ) ;
const slRefs = ( frontmatter . sl_refs ? ? [ ] ) . map ( ( ref ) = > ` - ${ ref } ` ) . join ( '\n' ) ;
await writeFile (
join ( root , 'wiki/global' , ` ${ key } .md ` ) ,
// File layout: fenced frontmatter, a blank line, the body, and a trailing newline.
[
'---' ,
// `summary` falls back to the page key and `usage_mode` to 'auto' when not supplied.
` summary: ${ frontmatter . summary ? ? key } ` ,
` usage_mode: ${ frontmatter . usage_mode ? ? 'auto' } ` ,
'refs:' ,
refs ,
'sl_refs:' ,
slRefs ,
'---' ,
'' ,
content ,
'' ,
] . join ( '\n' ) ,
) ;
} ,
) ,
// NOTE(review): the next line is a bare timestamp, not TypeScript; it looks like version-control annotation residue in this listing — confirm it is absent from the real file.
2026-05-18 13:38:06 +02:00
// Bare mock with no implementation.
syncFromCommit : vi.fn ( ) ,
} ;
}
/**
 * Assembles the dependency bag for an `IngestBundleRunner` under test.
 *
 * Git, session-worktree, wiki and semantic-layer access are backed by the real
 * on-disk runtime; every other collaborator is a `vi.fn` mock.
 *
 * @param runtime - Runtime produced by `makeRealGitRuntime`.
 * @param sourceKey - Source name reported by the fake adapter.
 * @param settings - Overrides spread on top of the default runner settings.
 * @returns The assembled `deps` plus the fake `adapter` that the mocked
 *   registry hands out, so tests can re-stub `chunk` or add `project`.
 */
function makeDeps(
  runtime: Awaited<ReturnType<typeof makeRealGitRuntime>>,
  sourceKey = 'metabase',
  settings: Partial<IngestBundleRunnerDeps['settings']> = {},
) {
  // One work unit that owns a single raw file and has no peers or dependencies.
  const workUnit = (unitKey: string, rawFile: string) => ({
    unitKey,
    rawFiles: [rawFile],
    peerFileIndex: [],
    dependencyPaths: [],
  });

  const adapter: any = {
    source: sourceKey,
    skillNames: [],
    detect: vi.fn().mockResolvedValue(true),
    chunk: vi.fn().mockResolvedValue({
      workUnits: [workUnit('card-wiki', 'cards/wiki.json'), workUnit('card-source', 'cards/source.json')],
    }),
  };

  const rootWiki = makeWikiService(runtime.configDir);

  // Each semantic-layer view gets its own file-listing mock.
  const listSlFiles = () => vi.fn().mockResolvedValue(['mart_account_segments.yaml']);
  const semanticLayerService: any = {
    loadAllSources: vi.fn(async () => loadSourcesFromRoot(runtime.configDir)),
    listFilesForConnection: listSlFiles(),
    // A worktree view is a copy of this service that loads from `workdir` instead.
    forWorktree: vi.fn((workdir: string) => ({
      ...semanticLayerService,
      loadAllSources: vi.fn(async () => loadSourcesFromRoot(workdir)),
      listFilesForConnection: listSlFiles(),
    })),
  };

  const storage: IngestBundleRunnerDeps['storage'] = {
    homeDir: join(runtime.configDir, '.ktx'),
    systemGitAuthor: { name: 'KTX Test', email: 'system@ktx.local' },
    resolveUploadDir: (id) => join(runtime.homeDir, 'upload', id),
    resolvePullDir: (id) => join(runtime.homeDir, 'pull', id),
    resolveTranscriptDir: (id) => join(runtime.configDir, '.ktx/ingest-transcripts', id),
    resolveTracePath: (id) => join(runtime.configDir, '.ktx/ingest-traces', id, 'trace.jsonl'),
  };

  // Caller-supplied settings are spread last so they override the defaults.
  const runnerSettings: IngestBundleRunnerDeps['settings'] = {
    memoryIngestionModel: 'test',
    probeRowCount: 1,
    ingestTraceLevel: 'trace',
    ...settings,
  };

  const deps: IngestBundleRunnerDeps = {
    runs: {
      create: vi.fn().mockResolvedValue({ id: 'run-1' }),
      markCompleted: vi.fn(),
      markFailed: vi.fn(),
    },
    provenance: {
      insertMany: vi.fn(),
      findLatestHashesForCompletedSyncs: vi.fn().mockResolvedValue(new Map()),
      findLatestArtifactsForRawPaths: vi.fn().mockResolvedValue(new Map()),
    },
    reports: {
      create: vi.fn().mockResolvedValue({ id: 'report-1' }),
      findByJobId: vi.fn().mockResolvedValue(null),
      markSuperseded: vi.fn(),
    },
    canonicalPins: { listPins: vi.fn().mockResolvedValue([]) },
    registry: {
      get: vi.fn().mockReturnValue(adapter),
      register: vi.fn(),
      has: vi.fn(),
      list: vi.fn(),
    },
    diffSetService: {
      compute: vi.fn().mockResolvedValue({
        added: ['cards/wiki.json', 'cards/source.json'],
        modified: [],
        deleted: [],
        unchanged: [],
      }),
    },
    sessionWorktreeService: runtime.sessionWorktreeService,
    agentRunner: { runLoop: vi.fn() },
    gitService: runtime.git,
    // The lock is a pass-through: the callback runs immediately.
    lockingService: { withLock: vi.fn(async (_key, fn) => fn()) },
    storage,
    settings: runnerSettings,
    skillsRegistry: {
      listSkills: vi.fn().mockResolvedValue([]),
      getSkill: vi.fn().mockResolvedValue(null),
      buildSkillsPrompt: vi.fn().mockReturnValue(''),
      stripFrontmatter: vi.fn((body) => body),
    } as never,
    promptService: { loadPrompt: vi.fn().mockResolvedValue('base') } as never,
    wikiService: {
      ...rootWiki,
      forWorktree: vi.fn((workdir: string) => makeWikiService(workdir)),
    } as never,
    knowledgeIndex: { listPagesForUser: vi.fn().mockResolvedValue([]) },
    knowledgeSlRefs: { syncFromWiki: vi.fn() },
    semanticLayerService,
    slSearchService: { indexSources: vi.fn() } as never,
    slSourcesRepository: {} as never,
    slValidator: { validateSingleSource: vi.fn().mockResolvedValue({ errors: [], warnings: [] }) },
    connections: {
      listEnabledConnections: vi.fn().mockResolvedValue([]),
      getConnectionById: vi.fn(),
    } as never,
    toolsetFactory: {
      createIngestWuToolset: vi.fn(() => ({ toRuntimeTools: vi.fn(() => ({})) })),
    },
    commitMessages: { enqueueForExternalCommit: vi.fn() },
    embedding: { maxBatchSize: 64, computeEmbedding: vi.fn(), computeEmbeddingsBulk: vi.fn() },
  };

  return { deps, adapter };
}
/**
 * Replaces the runner's raw-file staging steps with mocks that write
 * placeholder files.
 *
 * `resolveStagedDir` is stubbed to resolve to `<homeDir>/stage`.
 * `stageRawFilesStage1` is stubbed to write one `{}` file per entry of
 * `hashes` under `<worktreeRoot>/raw-sources/warehouse/<sourceKey>/s`,
 * creating parent directories as needed.
 *
 * @param runner - Runner whose staging members are overwritten in place.
 * @param runtime - Runtime produced by `makeRealGitRuntime`.
 * @param hashes - `[rawPath, hash]` pairs; the paths use `/` separators.
 * @param sourceKey - Source name used in the staged directory path.
 */
async function mockStageRawFiles(
  runner: IngestBundleRunner,
  runtime: Awaited<ReturnType<typeof makeRealGitRuntime>>,
  hashes: [string, string][],
  sourceKey = 'metabase',
) {
  const rawDirInWorktree = `raw-sources/warehouse/${sourceKey}/s`;
  // The staging members are overwritten through an untyped view of the runner.
  const patchable = runner as any;

  patchable.resolveStagedDir = vi.fn().mockResolvedValue(join(runtime.homeDir, 'stage'));
  patchable.stageRawFilesStage1 = vi.fn(async ({ worktreeRoot }: any) => {
    const rawDir = join(worktreeRoot, 'raw-sources/warehouse', sourceKey, 's');
    await mkdir(rawDir, { recursive: true });

    for (const [rawPath] of hashes) {
      // Everything before the last `/` is the file's parent directory.
      const parentSegments = rawPath.split('/').slice(0, -1);
      await mkdir(join(rawDir, parentSegments.join('/')), { recursive: true });
      await writeFile(join(rawDir, rawPath), '{}');
    }

    return { currentHashes: new Map(hashes), rawDirInWorktree };
  });
}
describe ( 'IngestBundleRunner isolated diff path' , ( ) = > {
it ( 'routes an unlisted direct-writing source through isolated diffs by default' , async ( ) = > {
const runtime = await makeRealGitRuntime ( ) ;
try {
const sourceKey = 'custom-direct-source' ;
const { deps , adapter } = makeDeps ( runtime , sourceKey ) ;
adapter . chunk . mockResolvedValue ( {
workUnits : [
{
unitKey : 'custom-wiki' ,
rawFiles : [ 'custom/page.json' ] ,
peerFileIndex : [ ] ,
dependencyPaths : [ ] ,
} ,
] ,
} ) ;
let currentSession : any = null ;
deps . toolsetFactory . createIngestWuToolset = vi . fn ( ( toolSession : any ) = > {
currentSession = toolSession ;
return { toRuntimeTools : vi.fn ( ( ) = > ( { } ) ) } ;
} ) ;
deps . agentRunner . runLoop = vi . fn ( async ( params : any ) = > {
if ( params . telemetryTags . operationName !== 'ingest-bundle-wu' ) {
return { stopReason : 'natural' } ;
}
const root = rootOfConfig ( currentSession . configService , runtime . configDir ) ;
await mkdir ( join ( root , 'wiki/global' ) , { recursive : true } ) ;
await writeFile (
join ( root , 'wiki/global/custom-isolated.md' ) ,
'---\nsummary: Custom isolated write\nusage_mode: auto\n---\n\nCustom isolated write.\n' ,
'utf-8' ,
) ;
currentSession . actions . push ( {
target : 'wiki' ,
type : 'created' ,
key : 'custom-isolated' ,
detail : 'Custom isolated write' ,
rawPaths : [ 'custom/page.json' ] ,
} ) ;
await currentSession . gitService . commitFiles (
[ 'wiki/global/custom-isolated.md' ] ,
'custom wiki' ,
'KTX Test' ,
'system@ktx.local' ,
) ;
return { stopReason : 'natural' } ;
} ) as never ;
const runner = new IngestBundleRunner ( deps ) ;
await mockStageRawFiles ( runner , runtime , [ [ 'custom/page.json' , 'h1' ] ] , sourceKey ) ;
await expect (
runner . run ( {
jobId : 'job-custom-default' ,
connectionId : 'warehouse' ,
sourceKey ,
trigger : 'upload' ,
bundleRef : { kind : 'upload' , uploadId : 'upload' } ,
} ) ,
) . resolves . toMatchObject ( {
jobId : 'job-custom-default' ,
failedWorkUnits : [ ] ,
workUnitCount : 1 ,
} ) ;
const trace = await readFile (
join ( runtime . configDir , '.ktx/ingest-traces/job-custom-default/trace.jsonl' ) ,
'utf-8' ,
) ;
expect ( trace ) . toContain ( 'isolated_diff_enabled' ) ;
expect ( trace ) . toContain ( 'work_unit_child_created' ) ;
expect ( trace ) . not . toContain ( legacySharedTraceEvent ( ) ) ;
const reportCreate = vi . mocked ( deps . reports . create ) . mock . calls . at ( - 1 ) ? . [ 0 ] ;
const reportBody = reportCreate ? . body as { isolatedDiff? : unknown } | undefined ;
expect ( reportBody ? . isolatedDiff ) . toMatchObject ( {
enabled : true ,
acceptedPatches : 1 ,
} ) ;
} finally {
await rm ( runtime . homeDir , { recursive : true , force : true } ) ;
}
} ) ;
it ( 'does not support shared-worktree fallback settings' , async ( ) = > {
const runtime = await makeRealGitRuntime ( ) ;
try {
const sourceKey = 'legacy-source' ;
const staleSettings = {
[ legacyFallbackSettingKey ( ) ] : [ 'legacy-source' ] ,
} as Partial < IngestBundleRunnerDeps [ 'settings' ] > & Record < string , unknown > ;
const { deps , adapter } = makeDeps ( runtime , sourceKey , staleSettings ) ;
adapter . chunk . mockResolvedValue ( {
workUnits : [
{
unitKey : 'legacy-wiki' ,
rawFiles : [ 'legacy/page.json' ] ,
peerFileIndex : [ ] ,
dependencyPaths : [ ] ,
} ,
] ,
} ) ;
let currentSession : any = null ;
deps . toolsetFactory . createIngestWuToolset = vi . fn ( ( toolSession : any ) = > {
currentSession = toolSession ;
return { toRuntimeTools : vi.fn ( ( ) = > ( { } ) ) } ;
} ) ;
deps . agentRunner . runLoop = vi . fn ( async ( params : any ) = > {
if ( params . telemetryTags . operationName !== 'ingest-bundle-wu' ) {
return { stopReason : 'natural' } ;
}
const root = rootOfConfig ( currentSession . configService , runtime . configDir ) ;
await mkdir ( join ( root , 'wiki/global' ) , { recursive : true } ) ;
await writeFile (
join ( root , 'wiki/global/legacy-isolated.md' ) ,
'---\nsummary: Legacy isolated write\nusage_mode: auto\n---\n\nLegacy isolated write.\n' ,
'utf-8' ,
) ;
currentSession . actions . push ( {
target : 'wiki' ,
type : 'created' ,
key : 'legacy-isolated' ,
detail : 'Legacy isolated write' ,
rawPaths : [ 'legacy/page.json' ] ,
} ) ;
await currentSession . gitService . commitFiles (
[ 'wiki/global/legacy-isolated.md' ] ,
'legacy isolated wiki' ,
'KTX Test' ,
'system@ktx.local' ,
) ;
return { stopReason : 'natural' } ;
} ) as never ;
const runner = new IngestBundleRunner ( deps ) ;
await mockStageRawFiles ( runner , runtime , [ [ 'legacy/page.json' , 'h1' ] ] , sourceKey ) ;
await expect (
runner . run ( {
jobId : 'job-legacy-isolated' ,
connectionId : 'warehouse' ,
sourceKey ,
trigger : 'upload' ,
bundleRef : { kind : 'upload' , uploadId : 'upload' } ,
} ) ,
) . resolves . toMatchObject ( {
jobId : 'job-legacy-isolated' ,
failedWorkUnits : [ ] ,
workUnitCount : 1 ,
} ) ;
const trace = await readFile (
join ( runtime . configDir , '.ktx/ingest-traces/job-legacy-isolated/trace.jsonl' ) ,
'utf-8' ,
) ;
expect ( trace ) . toContain ( 'isolated_diff_enabled' ) ;
expect ( trace ) . toContain ( 'work_unit_child_created' ) ;
expect ( trace ) . not . toContain ( legacySharedTraceEvent ( ) ) ;
const reportCreate = vi . mocked ( deps . reports . create ) . mock . calls . at ( - 1 ) ? . [ 0 ] ;
const reportBody = reportCreate ? . body as { isolatedDiff? : unknown } | undefined ;
expect ( reportBody ? . isolatedDiff ) . toMatchObject ( {
enabled : true ,
acceptedPatches : 1 ,
} ) ;
} finally {
await rm ( runtime . homeDir , { recursive : true , force : true } ) ;
}
} ) ;
it ( 'does not integrate failed isolated WorkUnit patches' , async ( ) = > {
const runtime = await makeRealGitRuntime ( ) ;
try {
const { deps , adapter } = makeDeps ( runtime , 'fake' ) ;
adapter . chunk . mockResolvedValue ( {
workUnits : [
{ unitKey : 'wu-good' , rawFiles : [ 'good.raw' ] , peerFileIndex : [ ] , dependencyPaths : [ ] } ,
{ unitKey : 'wu-bad' , rawFiles : [ 'bad.raw' ] , peerFileIndex : [ ] , dependencyPaths : [ ] } ,
] ,
} ) ;
deps . diffSetService . compute = vi . fn ( ) . mockResolvedValue ( {
added : [ 'good.raw' , 'bad.raw' ] ,
modified : [ ] ,
deleted : [ ] ,
unchanged : [ ] ,
} ) ;
deps . slValidator . validateSingleSource = vi . fn (
async ( _validationDeps : unknown , _connectionId : string , sourceName : string ) = > ( {
errors : sourceName === 'bad' ? [ { message : 'bad source rejected' } ] : [ ] ,
warnings : [ ] ,
} ) ,
) as never ;
let currentSession : any = null ;
deps . toolsetFactory . createIngestWuToolset = vi . fn ( ( toolSession : any ) = > {
currentSession = toolSession ;
return { toRuntimeTools : vi.fn ( ( ) = > ( { } ) ) } ;
} ) ;
deps . agentRunner . runLoop = vi . fn ( async ( params : any ) = > {
if ( params . telemetryTags . operationName !== 'ingest-bundle-wu' ) {
return { stopReason : 'natural' } ;
}
const unitKey = params . telemetryTags . unitKey ;
const root = rootOfConfig ( currentSession . configService , runtime . configDir ) ;
await mkdir ( join ( root , 'semantic-layer/warehouse' ) , { recursive : true } ) ;
if ( unitKey === 'wu-good' ) {
await writeFile ( join ( root , 'semantic-layer/warehouse/good.yaml' ) , 'name: good\n' , 'utf-8' ) ;
addTouchedSlSource ( currentSession . touchedSlSources , 'warehouse' , 'good' ) ;
currentSession . actions . push ( {
target : 'sl' ,
type : 'created' ,
key : 'good' ,
detail : 'good source' ,
targetConnectionId : 'warehouse' ,
rawPaths : [ 'good.raw' ] ,
} ) ;
await currentSession . gitService . commitFiles (
[ 'semantic-layer/warehouse/good.yaml' ] ,
'test: add good source' ,
'KTX Test' ,
'system@ktx.local' ,
) ;
}
if ( unitKey === 'wu-bad' ) {
await writeFile ( join ( root , 'semantic-layer/warehouse/bad.yaml' ) , 'name: bad\n' , 'utf-8' ) ;
addTouchedSlSource ( currentSession . touchedSlSources , 'warehouse' , 'bad' ) ;
currentSession . actions . push ( {
target : 'sl' ,
type : 'created' ,
key : 'bad' ,
detail : 'bad source' ,
targetConnectionId : 'warehouse' ,
rawPaths : [ 'bad.raw' ] ,
} ) ;
await currentSession . gitService . commitFiles (
[ 'semantic-layer/warehouse/bad.yaml' ] ,
'test: add bad source' ,
'KTX Test' ,
'system@ktx.local' ,
) ;
}
return { stopReason : 'natural' } ;
} ) as never ;
const runner = new IngestBundleRunner ( deps ) ;
await mockStageRawFiles (
runner ,
runtime ,
[
[ 'good.raw' , 'good-hash' ] ,
[ 'bad.raw' , 'bad-hash' ] ,
] ,
'fake' ,
) ;
const result = await runner . run ( {
jobId : 'job-failed-wu-isolated' ,
connectionId : 'warehouse' ,
sourceKey : 'fake' ,
trigger : 'upload' ,
bundleRef : { kind : 'upload' , uploadId : 'upload' } ,
} ) ;
expect ( result . failedWorkUnits ) . toEqual ( [ 'wu-bad' ] ) ;
await expect ( readFile ( join ( runtime . configDir , 'semantic-layer/warehouse/good.yaml' ) , 'utf-8' ) ) . resolves . toContain (
'good' ,
) ;
await expect ( readFile ( join ( runtime . configDir , 'semantic-layer/warehouse/bad.yaml' ) , 'utf-8' ) ) . rejects . toThrow ( ) ;
const reportCreate = vi . mocked ( deps . reports . create ) . mock . calls . at ( - 1 ) ? . [ 0 ] ;
const reportBody = reportCreate ? . body as {
isolatedDiff ? : { acceptedPatches? : number } ;
failedWorkUnits? : string [ ] ;
} ;
expect ( reportBody . failedWorkUnits ) . toEqual ( [ 'wu-bad' ] ) ;
expect ( reportBody . isolatedDiff ) . toMatchObject ( { enabled : true , acceptedPatches : 1 } ) ;
const trace = await readFile (
join ( runtime . configDir , '.ktx/ingest-traces/job-failed-wu-isolated/trace.jsonl' ) ,
'utf-8' ,
) ;
expect ( trace ) . toContain ( 'work_unit_failed_before_patch' ) ;
expect ( trace ) . toContain ( 'patch_accepted' ) ;
expect ( trace ) . not . toContain ( legacySharedTraceEvent ( ) ) ;
} finally {
await rm ( runtime . homeDir , { recursive : true , force : true } ) ;
}
} ) ;
it . each ( [ 'notion' , 'lookml' , 'looker' , 'dbt' , 'metricflow' ] as const ) (
'routes %s direct writes through isolated child worktrees' ,
async ( sourceKey ) = > {
const runtime = await makeRealGitRuntime ( ) ;
try {
const { deps , adapter } = makeDeps ( runtime , sourceKey ) ;
adapter . chunk . mockResolvedValue ( {
workUnits : [
{
unitKey : ` ${ sourceKey } -wiki ` ,
rawFiles : [ ` ${ sourceKey } /page.json ` ] ,
peerFileIndex : [ ] ,
dependencyPaths : [ ] ,
} ,
] ,
} ) ;
let currentSession : any = null ;
deps . toolsetFactory . createIngestWuToolset = vi . fn ( ( toolSession : any ) = > {
currentSession = toolSession ;
return { toRuntimeTools : vi.fn ( ( ) = > ( { } ) ) } ;
} ) ;
deps . agentRunner . runLoop = vi . fn ( async ( params : any ) = > {
if ( params . telemetryTags . operationName !== 'ingest-bundle-wu' ) {
return { stopReason : 'natural' } ;
}
expect ( params . telemetryTags ) . toMatchObject ( {
operationName : 'ingest-bundle-wu' ,
source : sourceKey ,
unitKey : ` ${ sourceKey } -wiki ` ,
} ) ;
const root = rootOfConfig ( currentSession . configService , runtime . configDir ) ;
await mkdir ( join ( root , 'wiki/global' ) , { recursive : true } ) ;
await writeFile (
join ( root , 'wiki/global' , ` ${ sourceKey } -isolated.md ` ) ,
` --- \ nsummary: ${ sourceKey } isolated write \ nusage_mode: auto \ n--- \ n \ nIsolated ${ sourceKey } write. \ n ` ,
'utf-8' ,
) ;
currentSession . actions . push ( {
target : 'wiki' ,
type : 'created' ,
key : ` ${ sourceKey } -isolated ` ,
detail : ` ${ sourceKey } isolated write ` ,
rawPaths : [ ` ${ sourceKey } /page.json ` ] ,
} ) ;
await currentSession . gitService . commitFiles (
[ ` wiki/global/ ${ sourceKey } -isolated.md ` ] ,
` ${ sourceKey } wiki ` ,
'KTX Test' ,
'system@ktx.local' ,
) ;
return { stopReason : 'natural' } ;
} ) as never ;
const runner = new IngestBundleRunner ( deps ) ;
await mockStageRawFiles ( runner , runtime , [ [ ` ${ sourceKey } /page.json ` , 'h1' ] ] , sourceKey ) ;
await expect (
runner . run ( {
jobId : ` job- ${ sourceKey } ` ,
connectionId : 'warehouse' ,
sourceKey ,
trigger : 'upload' ,
bundleRef : { kind : 'upload' , uploadId : 'upload' } ,
} ) ,
) . resolves . toMatchObject ( {
jobId : ` job- ${ sourceKey } ` ,
failedWorkUnits : [ ] ,
workUnitCount : 1 ,
} ) ;
const trace = await readFile (
join ( runtime . configDir , '.ktx/ingest-traces' , ` job- ${ sourceKey } ` , 'trace.jsonl' ) ,
'utf-8' ,
) ;
expect ( trace ) . toContain ( 'isolated_diff_enabled' ) ;
expect ( trace ) . toContain ( 'work_unit_child_created' ) ;
expect ( trace ) . toContain ( 'work_unit_patch_collected' ) ;
expect ( trace ) . toContain ( 'patch_apply_started' ) ;
expect ( trace ) . not . toContain ( legacySharedTraceEvent ( ) ) ;
const reportCreate = vi . mocked ( deps . reports . create ) . mock . calls . at ( - 1 ) ? . [ 0 ] ;
const reportBody = reportCreate ? . body as { isolatedDiff? : unknown } | undefined ;
expect ( reportBody ? . isolatedDiff ) . toMatchObject ( {
enabled : true ,
acceptedPatches : 1 ,
} ) ;
} finally {
await rm ( runtime . homeDir , { recursive : true , force : true } ) ;
}
} ,
) ;
it ( 'rejects the Metabase stale-measure wiki body regression before squash' , async ( ) = > {
const runtime = await makeRealGitRuntime ( ) ;
try {
const { deps , adapter } = makeDeps ( runtime ) ;
adapter . project = vi . fn ( async ( { workdir } ) = > {
await mkdir ( join ( workdir , 'semantic-layer/warehouse' ) , { recursive : true } ) ;
await writeFile (
join ( workdir , 'semantic-layer/warehouse/mart_account_segments.yaml' ) ,
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr_cents\n expr: sum(contract_arr)\n' ,
) ;
return {
warnings : [ ] ,
errors : [ ] ,
touchedSources : [ { connectionId : 'warehouse' , sourceName : 'mart_account_segments' } ] ,
changedWikiPageKeys : [ ] ,
} ;
} ) ;
let currentSession : any = null ;
deps . toolsetFactory . createIngestWuToolset = vi . fn ( ( toolSession : any ) = > {
currentSession = toolSession ;
return { toRuntimeTools : vi.fn ( ( ) = > ( { } ) ) } ;
} ) ;
deps . agentRunner . runLoop = vi . fn ( async ( params : any ) = > {
const root = rootOfConfig ( currentSession . configService , runtime . configDir ) ;
if ( params . telemetryTags . unitKey === 'card-wiki' ) {
await mkdir ( join ( root , 'wiki/global' ) , { recursive : true } ) ;
await writeFile (
join ( root , 'wiki/global/account-segments.md' ) ,
'---\nsummary: Account segments\nusage_mode: auto\nsl_refs:\n - mart_account_segments\n---\n\nARR is `mart_account_segments.total_contract_arr_cents`.\n' ,
) ;
currentSession . actions . push ( { target : 'wiki' , type : 'created' , key : 'account-segments' , detail : 'Account segments' } ) ;
await currentSession . gitService . commitFiles ( [ 'wiki/global/account-segments.md' ] , 'wu wiki' , 'KTX Test' , 'system@ktx.local' ) ;
}
if ( params . telemetryTags . unitKey === 'card-source' ) {
await writeFile (
join ( root , 'semantic-layer/warehouse/mart_account_segments.yaml' ) ,
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n' ,
) ;
addTouchedSlSource ( currentSession . touchedSlSources , 'warehouse' , 'mart_account_segments' ) ;
currentSession . actions . push ( {
target : 'sl' ,
type : 'updated' ,
key : 'mart_account_segments' ,
detail : 'Dollar measure' ,
targetConnectionId : 'warehouse' ,
} ) ;
await currentSession . gitService . commitFiles ( [ 'semantic-layer/warehouse/mart_account_segments.yaml' ] , 'wu source' , 'KTX Test' , 'system@ktx.local' ) ;
}
return { stopReason : 'natural' } ;
} ) as never ;
const runner = new IngestBundleRunner ( deps ) ;
await mockStageRawFiles ( runner , runtime , [
[ 'cards/wiki.json' , 'h1' ] ,
[ 'cards/source.json' , 'h2' ] ,
] ) ;
await expect (
runner . run ( { jobId : 'job-1' , connectionId : 'warehouse' , sourceKey : 'metabase' , trigger : 'upload' , bundleRef : { kind : 'upload' , uploadId : 'upload' } } ) ,
) . rejects . toThrow ( /total_contract_arr_cents/ ) ;
const trace = await readFile ( join ( runtime . configDir , '.ktx/ingest-traces/job-1/trace.jsonl' ) , 'utf-8' ) ;
expect ( trace ) . toContain ( 'input_snapshot' ) ;
expect ( trace ) . toContain ( 'isolated_diff_enabled' ) ;
expect ( trace ) . toContain ( 'work_unit_child_created' ) ;
expect ( trace ) . toContain ( 'work_unit_patch_collected' ) ;
expect ( trace ) . toContain ( 'patch_apply_started' ) ;
expect ( trace ) . toContain ( 'final_artifact_gates_failed' ) ;
expect ( trace ) . toContain ( 'ingest_failed' ) ;
} finally {
await rm ( runtime . homeDir , { recursive : true , force : true } ) ;
}
} ) ;
it ( 'rejects unchanged wiki body refs made stale by isolated semantic-layer changes' , async ( ) = > {
const runtime = await makeRealGitRuntime ( ) ;
try {
await mkdir ( join ( runtime . configDir , 'semantic-layer/warehouse' ) , { recursive : true } ) ;
await mkdir ( join ( runtime . configDir , 'wiki/global' ) , { recursive : true } ) ;
await writeFile (
join ( runtime . configDir , 'semantic-layer/warehouse/mart_account_segments.yaml' ) ,
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr_cents\n expr: sum(contract_arr)\n' ,
) ;
await writeFile (
join ( runtime . configDir , 'wiki/global/account-segments.md' ) ,
'---\nsummary: Account segments\nusage_mode: auto\n---\n\nExisting ARR uses `mart_account_segments.total_contract_arr_cents`.\n' ,
) ;
await runtime . git . commitFiles (
[ 'semantic-layer/warehouse/mart_account_segments.yaml' , 'wiki/global/account-segments.md' ] ,
'seed existing wiki body ref' ,
'KTX Test' ,
'system@ktx.local' ,
) ;
const preRunHead = await runtime . git . revParseHead ( ) ;
const { deps , adapter } = makeDeps ( runtime ) ;
adapter . chunk . mockResolvedValue ( {
workUnits : [ { unitKey : 'source-only' , rawFiles : [ 'cards/source.json' ] , peerFileIndex : [ ] , dependencyPaths : [ ] } ] ,
} ) ;
let currentSession : any = null ;
deps . toolsetFactory . createIngestWuToolset = vi . fn ( ( toolSession : any ) = > {
currentSession = toolSession ;
return { toRuntimeTools : vi.fn ( ( ) = > ( { } ) ) } ;
} ) ;
deps . agentRunner . runLoop = vi . fn ( async ( ) = > {
const root = rootOfConfig ( currentSession . configService , runtime . configDir ) ;
await writeFile (
join ( root , 'semantic-layer/warehouse/mart_account_segments.yaml' ) ,
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n' ,
) ;
addTouchedSlSource ( currentSession . touchedSlSources , 'warehouse' , 'mart_account_segments' ) ;
currentSession . actions . push ( {
target : 'sl' ,
type : 'updated' ,
key : 'mart_account_segments' ,
detail : 'Rename ARR measure' ,
targetConnectionId : 'warehouse' ,
rawPaths : [ 'cards/source.json' ] ,
} ) ;
await currentSession . gitService . commitFiles (
[ 'semantic-layer/warehouse/mart_account_segments.yaml' ] ,
'wu source rename' ,
'KTX Test' ,
'system@ktx.local' ,
) ;
return { stopReason : 'natural' } ;
} ) as never ;
const runner = new IngestBundleRunner ( deps ) ;
await mockStageRawFiles ( runner , runtime , [ [ 'cards/source.json' , 'h1' ] ] ) ;
await expect (
runner . run ( {
jobId : 'job-existing-body-stale' ,
connectionId : 'warehouse' ,
sourceKey : 'metabase' ,
trigger : 'upload' ,
bundleRef : { kind : 'upload' , uploadId : 'upload' } ,
} ) ,
) . rejects . toThrow ( /total_contract_arr_cents/ ) ;
expect ( await runtime . git . revParseHead ( ) ) . toBe ( preRunHead ) ;
const events = ( await readFile ( join ( runtime . configDir , '.ktx/ingest-traces/job-existing-body-stale/trace.jsonl' ) , 'utf-8' ) )
. trim ( )
. split ( '\n' )
. map ( ( line ) = > JSON . parse ( line ) ) ;
expect ( events . map ( ( event ) = > event . event ) ) . toEqual (
expect . arrayContaining ( [
'final_artifact_gates_started' ,
'final_artifact_gates_failed' ,
'ingest_failed' ,
'failure_report_created' ,
] ) ,
) ;
expect ( events . map ( ( event ) = > event . event ) ) . not . toContain ( 'squash_finished' ) ;
const gateFailure = events . find ( ( event ) = > event . event === 'final_artifact_gates_failed' ) ;
expect ( gateFailure ) . toMatchObject ( {
data : {
wikiReferenceGateScope : {
global : true ,
reasons : expect.arrayContaining ( [ 'semantic_layer_changed' ] ) ,
pageKeysValidated : expect.arrayContaining ( [ 'account-segments' ] ) ,
} ,
actionOrigins : expect.arrayContaining ( [
expect . objectContaining ( {
source : 'work_unit_action' ,
unitKey : 'source-only' ,
unitRawFiles : [ 'cards/source.json' ] ,
action : expect.objectContaining ( {
target : 'sl' ,
type : 'updated' ,
key : 'mart_account_segments' ,
rawPaths : [ 'cards/source.json' ] ,
targetConnectionId : 'warehouse' ,
} ) ,
} ) ,
] ) ,
} ,
error : { message : expect.stringContaining ( 'total_contract_arr_cents' ) } ,
} ) ;
const failureReport = ( deps . reports . create as any ) . mock . calls
. map ( ( call : any [ ] ) = > call [ 0 ] )
. find ( ( report : any ) = > report . body . status === 'failed' ) ;
expect ( failureReport . body . failure ) . toMatchObject ( {
phase : 'final_gates' ,
message : expect.stringContaining ( 'total_contract_arr_cents' ) ,
details : expect.objectContaining ( {
wikiReferenceGateScope : expect.objectContaining ( {
global : true ,
reasons : expect.arrayContaining ( [ 'semantic_layer_changed' ] ) ,
pageKeysValidated : expect.arrayContaining ( [ 'account-segments' ] ) ,
} ) ,
touchedSlSources : expect.arrayContaining ( [
expect . objectContaining ( { connectionId : 'warehouse' , sourceName : 'mart_account_segments' } ) ,
] ) ,
actionOrigins : expect.arrayContaining ( [
expect . objectContaining ( {
source : 'work_unit_action' ,
unitKey : 'source-only' ,
action : expect.objectContaining ( {
target : 'sl' ,
type : 'updated' ,
key : 'mart_account_segments' ,
rawPaths : [ 'cards/source.json' ] ,
targetConnectionId : 'warehouse' ,
} ) ,
} ) ,
] ) ,
} ) ,
} ) ;
expect ( failureReport . body . workUnits ) . toEqual (
expect . arrayContaining ( [
expect . objectContaining ( {
unitKey : 'source-only' ,
actions : expect.arrayContaining ( [
expect . objectContaining ( {
target : 'sl' ,
type : 'updated' ,
key : 'mart_account_segments' ,
rawPaths : [ 'cards/source.json' ] ,
} ) ,
] ) ,
} ) ,
] ) ,
) ;
} finally {
await rm ( runtime . homeDir , { recursive : true , force : true } ) ;
}
} ) ;
// Happy path for isolated work units: two units ('page-a' and 'page-b') each
// create a different wiki page, so the run is expected to report no failed
// work units, record two 'patch_accepted' entries, and reach 'ingest_finished'.
it ( 'accepts two isolated work units that edit different wiki pages' , async ( ) = > {
const runtime = await makeRealGitRuntime ( ) ;
try {
const { deps , adapter } = makeDeps ( runtime ) ;
// The adapter plans one work unit per raw file.
adapter . chunk . mockResolvedValue ( {
workUnits : [
{ unitKey : 'page-a' , rawFiles : [ 'pages/a.json' ] , peerFileIndex : [ ] , dependencyPaths : [ ] } ,
{ unitKey : 'page-b' , rawFiles : [ 'pages/b.json' ] , peerFileIndex : [ ] , dependencyPaths : [ ] } ,
] ,
} ) ;
// Capture the session handed to the toolset factory so the fake agent loop
// below can write files, record actions, and commit through it.
let currentSession : any = null ;
deps . toolsetFactory . createIngestWuToolset = vi . fn ( ( toolSession : any ) = > {
currentSession = toolSession ;
return { toRuntimeTools : vi.fn ( ( ) = > ( { } ) ) } ;
} ) ;
// Fake agent loop: writes one wiki page named after the unit key, records a
// 'created' wiki action, and commits the page via the session's git service.
deps . agentRunner . runLoop = vi . fn ( async ( params : any ) = > {
const unitKey = params . telemetryTags . unitKey ;
// NOTE(review): rootOfConfig presumably resolves the session's working tree root — confirm against its definition.
const root = rootOfConfig ( currentSession . configService , runtime . configDir ) ;
await mkdir ( join ( root , 'wiki/global' ) , { recursive : true } ) ;
await writeFile ( join ( root , ` wiki/global/ ${ unitKey } .md ` ) , ` --- \ nsummary: ${ unitKey } \ nusage_mode: auto \ n--- \ n \ n ${ unitKey } \ n ` ) ;
currentSession . actions . push ( { target : 'wiki' , type : 'created' , key : unitKey , detail : unitKey } ) ;
await currentSession . gitService . commitFiles ( [ ` wiki/global/ ${ unitKey } .md ` ] , ` wu ${ unitKey } ` , 'KTX Test' , 'system@ktx.local' ) ;
return { stopReason : 'natural' } ;
} ) as never ;
const runner = new IngestBundleRunner ( deps ) ;
await mockStageRawFiles ( runner , runtime , [
[ 'pages/a.json' , 'h1' ] ,
[ 'pages/b.json' , 'h2' ] ,
] ) ;
const result = await runner . run ( { jobId : 'job-clean' , connectionId : 'warehouse' , sourceKey : 'metabase' , trigger : 'upload' , bundleRef : { kind : 'upload' , uploadId : 'upload' } } ) ;
expect ( result . failedWorkUnits ) . toEqual ( [ ] ) ;
// The trace file is read as raw text; exactly two 'patch_accepted' matches are expected (one per unit).
const trace = await readFile ( join ( runtime . configDir , '.ktx/ingest-traces/job-clean/trace.jsonl' ) , 'utf-8' ) ;
expect ( trace . match ( /patch_accepted/g ) ) . toHaveLength ( 2 ) ;
expect ( trace ) . toContain ( 'ingest_finished' ) ;
} finally {
// Always remove the runtime's home directory, even when an assertion fails.
await rm ( runtime . homeDir , { recursive : true , force : true } ) ;
}
} ) ;
// Conflict path: two work units both rewrite semantic-layer/warehouse/orders.yaml
// with different measure names, and the textual-resolver loop makes no edits.
// The run must reject with an "isolated diff textual conflict" error and the
// trace must record both the conflict and the resolver failure.
it ( 'classifies same-source patch application failure as a textual conflict' , async ( ) = > {
const runtime = await makeRealGitRuntime ( ) ;
try {
const { deps , adapter } = makeDeps ( runtime ) ;
adapter . chunk . mockResolvedValue ( {
workUnits : [
{ unitKey : 'orders-a' , rawFiles : [ 'orders/a.json' ] , peerFileIndex : [ ] , dependencyPaths : [ ] } ,
{ unitKey : 'orders-b' , rawFiles : [ 'orders/b.json' ] , peerFileIndex : [ ] , dependencyPaths : [ ] } ,
] ,
} ) ;
// Capture the session handed to the toolset factory for use inside the fake agent loop.
let currentSession : any = null ;
deps . toolsetFactory . createIngestWuToolset = vi . fn ( ( toolSession : any ) = > {
currentSession = toolSession ;
return { toRuntimeTools : vi.fn ( ( ) = > ( { } ) ) } ;
} ) ;
deps . agentRunner . runLoop = vi . fn ( async ( params : any ) = > {
// The resolver invocation returns immediately without touching any file.
if ( params . telemetryTags . operationName === 'ingest-isolated-diff-textual-resolver' ) {
return { stopReason : 'natural' } ;
}
// Every unit other than 'orders-a' gets suffix 'b'; both units write the same file path.
const suffix = params . telemetryTags . unitKey === 'orders-a' ? 'a' : 'b' ;
const root = rootOfConfig ( currentSession . configService , runtime . configDir ) ;
await mkdir ( join ( root , 'semantic-layer/warehouse' ) , { recursive : true } ) ;
await writeFile (
join ( root , 'semantic-layer/warehouse/orders.yaml' ) ,
` name: orders \ ngrain: [id] \ ncolumns: [{name: id, type: string}] \ njoins: [] \ nmeasures: \ n - name: order_count_ ${ suffix } \ n expr: count(*) \ n ` ,
) ;
addTouchedSlSource ( currentSession . touchedSlSources , 'warehouse' , 'orders' ) ;
currentSession . actions . push ( { target : 'sl' , type : 'updated' , key : 'orders' , detail : suffix , targetConnectionId : 'warehouse' } ) ;
await currentSession . gitService . commitFiles ( [ 'semantic-layer/warehouse/orders.yaml' ] , ` wu ${ suffix } ` , 'KTX Test' , 'system@ktx.local' ) ;
return { stopReason : 'natural' } ;
} ) as never ;
const runner = new IngestBundleRunner ( deps ) ;
await mockStageRawFiles ( runner , runtime , [
[ 'orders/a.json' , 'h1' ] ,
[ 'orders/b.json' , 'h2' ] ,
] ) ;
await expect (
runner . run ( { jobId : 'job-text-conflict' , connectionId : 'warehouse' , sourceKey : 'metabase' , trigger : 'upload' , bundleRef : { kind : 'upload' , uploadId : 'upload' } } ) ,
) . rejects . toThrow ( /isolated diff textual conflict/ ) ;
// The trace is checked as raw text for the two conflict-related event names.
const trace = await readFile ( join ( runtime . configDir , '.ktx/ingest-traces/job-text-conflict/trace.jsonl' ) , 'utf-8' ) ;
expect ( trace ) . toContain ( 'patch_textual_conflict' ) ;
expect ( trace ) . toContain ( 'textual_conflict_resolver_failed' ) ;
} finally {
// Always remove the runtime's home directory, even when an assertion fails.
await rm ( runtime . homeDir , { recursive : true , force : true } ) ;
}
} ) ;
// Projection visibility: the adapter's `project` hook writes a semantic-layer
// source, and the fake agent loop then asserts it can read that file from its
// own root before creating a wiki page that references the projected measure.
// The run must finish with no failed work units and the trace must record
// both deterministic-projection events.
it('makes deterministic projection visible to child worktrees before WorkUnit synthesis', async () => {
  const runtime = await makeRealGitRuntime();
  try {
    const { deps, adapter } = makeDeps(runtime);
    adapter.chunk.mockResolvedValue({
      workUnits: [
        {
          unitKey: 'wiki-projected',
          rawFiles: ['projected/wiki.json'],
          peerFileIndex: [],
          dependencyPaths: [],
        },
      ],
    });

    // Projection hook: materialises the source file inside the supplied workdir
    // and reports it as a touched source.
    adapter.project = vi.fn(async ({ workdir }) => {
      const warehouseDir = join(workdir, 'semantic-layer/warehouse');
      await mkdir(warehouseDir, { recursive: true });
      const projectedSourcePath = join(workdir, 'semantic-layer/warehouse/mart_account_segments.yaml');
      await writeFile(
        projectedSourcePath,
        'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n',
      );
      return {
        warnings: [],
        errors: [],
        touchedSources: [{ connectionId: 'warehouse', sourceName: 'mart_account_segments' }],
        changedWikiPageKeys: [],
      };
    });

    // Remember the session passed to the toolset factory for the fake loop.
    let activeSession: any = null;
    deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
      activeSession = toolSession;
      return { toRuntimeTools: vi.fn(() => ({})) };
    });

    deps.agentRunner.runLoop = vi.fn(async () => {
      const worktreeRoot = rootOfConfig(activeSession.configService, runtime.configDir);

      // The projected source must already be readable from this root.
      const projectedSource = readFile(
        join(worktreeRoot, 'semantic-layer/warehouse/mart_account_segments.yaml'),
        'utf-8',
      );
      await expect(projectedSource).resolves.toContain('total_contract_arr');

      await mkdir(join(worktreeRoot, 'wiki/global'), { recursive: true });
      await writeFile(
        join(worktreeRoot, 'wiki/global/projected-orders.md'),
        '---\nsummary: Projected orders\nusage_mode: auto\nsl_refs:\n - mart_account_segments\n---\n\nARR `mart_account_segments.total_contract_arr`.\n',
      );
      activeSession.actions.push({
        target: 'wiki',
        type: 'created',
        key: 'projected-orders',
        detail: 'Projected orders',
      });
      await activeSession.gitService.commitFiles(
        ['wiki/global/projected-orders.md'],
        'wu projected wiki',
        'KTX Test',
        'system@ktx.local',
      );
      return { stopReason: 'natural' };
    }) as never;

    const bundleRunner = new IngestBundleRunner(deps);
    await mockStageRawFiles(bundleRunner, runtime, [['projected/wiki.json', 'h1']]);

    const outcome = await bundleRunner.run({
      jobId: 'job-projection',
      connectionId: 'warehouse',
      sourceKey: 'metabase',
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'upload' },
    });
    expect(outcome.failedWorkUnits).toEqual([]);

    const traceText = await readFile(
      join(runtime.configDir, '.ktx/ingest-traces/job-projection/trace.jsonl'),
      'utf-8',
    );
    for (const eventName of ['deterministic_projection_finished', 'deterministic_projection_committed']) {
      expect(traceText).toContain(eventName);
    }
  } finally {
    // Always remove the runtime's home directory, even when an assertion fails.
    await rm(runtime.homeDir, { recursive: true, force: true });
  }
});
// Invalid sl_refs: the work unit creates a wiki page whose front matter lists
// `missing_source` under sl_refs, and the gate-repair loop returns without
// editing anything. The run must reject with the "gate repair completed
// without editing an allowed path" error.
it('rejects Notion-style changed wiki pages with invalid sl_refs', async () => {
  const runtime = await makeRealGitRuntime();
  try {
    const { deps, adapter } = makeDeps(runtime);
    adapter.chunk.mockResolvedValue({
      workUnits: [
        {
          unitKey: 'notion-page',
          rawFiles: ['pages/notion.json'],
          peerFileIndex: [],
          dependencyPaths: [],
        },
      ],
    });

    // Remember the session passed to the toolset factory for the fake loop.
    let activeSession: any = null;
    deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
      activeSession = toolSession;
      return { toRuntimeTools: vi.fn(() => ({})) };
    });

    deps.agentRunner.runLoop = vi.fn(async (params: any) => {
      const isGateRepair = params.telemetryTags.operationName === 'ingest-isolated-diff-gate-repair';
      // The repair pass is a no-op; every other invocation writes the bad page.
      if (!isGateRepair) {
        const worktreeRoot = rootOfConfig(activeSession.configService, runtime.configDir);
        await mkdir(join(worktreeRoot, 'wiki/global'), { recursive: true });
        await writeFile(
          join(worktreeRoot, 'wiki/global/notion-page.md'),
          '---\nsummary: Notion page\nusage_mode: auto\nsl_refs:\n - missing_source\n---\n\nBody\n',
        );
        activeSession.actions.push({
          target: 'wiki',
          type: 'created',
          key: 'notion-page',
          detail: 'Notion page',
        });
        await activeSession.gitService.commitFiles(
          ['wiki/global/notion-page.md'],
          'wu notion',
          'KTX Test',
          'system@ktx.local',
        );
      }
      return { stopReason: 'natural' };
    }) as never;

    const bundleRunner = new IngestBundleRunner(deps);
    await mockStageRawFiles(bundleRunner, runtime, [['pages/notion.json', 'h1']]);

    const pendingRun = bundleRunner.run({
      jobId: 'job-invalid-slrefs',
      connectionId: 'warehouse',
      sourceKey: 'metabase',
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'upload' },
    });
    await expect(pendingRun).rejects.toThrow(/gate repair completed without editing an allowed path/);
  } finally {
    // Always remove the runtime's home directory, even when an assertion fails.
    await rm(runtime.homeDir, { recursive: true, force: true });
  }
});
// Final gates after reconciliation: the 'ingest-bundle-wu' loop creates a
// source whose measure is `total_contract_arr`, while any other loop
// invocation (the reconciliation pass in this scenario) writes a wiki page
// that still references `total_contract_arr_cents`. The run must reject with
// an error naming the stale measure, the trace must show reconciliation
// finishing before the final gates fail, and HEAD must stay where it was
// before the run.
it('runs final artifact gates after reconciliation mutates the integration tree', async () => {
  const runtime = await makeRealGitRuntime();
  try {
    const { deps, adapter } = makeDeps(runtime);
    adapter.chunk.mockResolvedValue({
      workUnits: [{ unitKey: 'card-source', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] }],
    });

    // Capture the session handed to the toolset factory so the fake agent loop
    // can write files, record actions, and commit through it.
    let currentSession: any = null;
    deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
      currentSession = toolSession;
      return { toRuntimeTools: vi.fn(() => ({})) };
    });

    deps.agentRunner.runLoop = vi.fn(async (params: any) => {
      const root = rootOfConfig(currentSession.configService, runtime.configDir);
      if (params.telemetryTags.operationName === 'ingest-bundle-wu') {
        // Work-unit pass: create the source with the renamed measure.
        await mkdir(join(root, 'semantic-layer/warehouse'), { recursive: true });
        await writeFile(
          join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'),
          'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n',
        );
        addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'mart_account_segments');
        currentSession.actions.push({
          target: 'sl',
          type: 'created',
          key: 'mart_account_segments',
          detail: 'Source with renamed ARR measure',
          targetConnectionId: 'warehouse',
          rawPaths: ['cards/source.json'],
        });
        await currentSession.gitService.commitFiles(
          ['semantic-layer/warehouse/mart_account_segments.yaml'],
          'wu source',
          'KTX Test',
          'system@ktx.local',
        );
      } else {
        // Any other pass: write a wiki page that references the stale measure name.
        await mkdir(join(root, 'wiki/global'), { recursive: true });
        await writeFile(
          join(root, 'wiki/global/account-segments.md'),
          '---\nsummary: Account segments\nusage_mode: auto\nsl_refs:\n - mart_account_segments\n---\n\nReconcile wrote stale ARR `mart_account_segments.total_contract_arr_cents`.\n',
        );
        currentSession.actions.push({
          target: 'wiki',
          type: 'created',
          key: 'account-segments',
          detail: 'Stale reconcile wiki page',
          rawPaths: ['cards/source.json'],
        });
        await currentSession.gitService.commitFiles(
          ['wiki/global/account-segments.md'],
          'reconcile wiki',
          'KTX Test',
          'system@ktx.local',
        );
      }
      return { stopReason: 'natural' };
    }) as never;

    const runner = new IngestBundleRunner(deps);
    await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]);

    // Snapshot HEAD immediately before the run so the post-failure check below
    // compares like with like.
    const preRunHead = await runtime.git.revParseHead();

    await expect(
      runner.run({
        jobId: 'job-reconcile-stale',
        connectionId: 'warehouse',
        sourceKey: 'metabase',
        trigger: 'upload',
        bundleRef: { kind: 'upload', uploadId: 'upload' },
      }),
    ).rejects.toThrow(/total_contract_arr_cents/);

    const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-reconcile-stale/trace.jsonl'), 'utf-8');
    expect(trace).toContain('reconciliation_finished');
    expect(trace).toContain('final_artifact_gates_failed');
    expect(trace).toContain('ingest_failed');

    // Compare against the pre-run HEAD. A substring check for the commit subject
    // would pass vacuously: revParseHead presumably yields a commit id rather
    // than a commit message, so it could never contain 'reconcile wiki'.
    expect(await runtime.git.revParseHead()).toBe(preRunHead);
  } finally {
    // Always remove the runtime's home directory, even when an assertion fails.
    await rm(runtime.homeDir, { recursive: true, force: true });
  }
});
// Failure reporting for final-gate failures: one work unit writes a wiki page
// referencing `mart_account_segments.total_contract_arr_cents`, another writes
// the source with a measure named `total_contract_arr`. The run must reject,
// a 'failed' report must be created pointing at the trace file, and the trace
// must contain the listed lifecycle events ending in the failure events.
it ( 'stores a failure report and postmortem trace for final gate failures' , async ( ) = > {
const runtime = await makeRealGitRuntime ( ) ;
try {
const { deps , adapter } = makeDeps ( runtime ) ;
// Replace the report store with a mock that records every create() argument
// and returns a sequential id.
const createdReports : any [ ] = [ ] ;
deps . reports . create = vi . fn ( async ( args : any ) = > {
createdReports . push ( args ) ;
return { id : ` report- ${ createdReports . length } ` } ;
} ) ;
adapter . chunk . mockResolvedValue ( {
workUnits : [
{ unitKey : 'card-wiki' , rawFiles : [ 'cards/wiki.json' ] , peerFileIndex : [ ] , dependencyPaths : [ ] } ,
{ unitKey : 'card-source' , rawFiles : [ 'cards/source.json' ] , peerFileIndex : [ ] , dependencyPaths : [ ] } ,
] ,
} ) ;
// Capture the session handed to the toolset factory for use inside the fake agent loop.
let currentSession : any = null ;
deps . toolsetFactory . createIngestWuToolset = vi . fn ( ( toolSession : any ) = > {
currentSession = toolSession ;
return { toRuntimeTools : vi.fn ( ( ) = > ( { } ) ) } ;
} ) ;
// Fake agent loop: behaviour depends on the unit key; invocations with any
// other unit key write nothing and just return.
deps . agentRunner . runLoop = vi . fn ( async ( params : any ) = > {
const root = rootOfConfig ( currentSession . configService , runtime . configDir ) ;
if ( params . telemetryTags . unitKey === 'card-wiki' ) {
// Wiki page whose body references the `_cents` measure name.
await mkdir ( join ( root , 'wiki/global' ) , { recursive : true } ) ;
await writeFile (
join ( root , 'wiki/global/account-segments.md' ) ,
'---\nsummary: Account segments\nusage_mode: auto\n---\n\nARR is `mart_account_segments.total_contract_arr_cents`.\n' ,
) ;
currentSession . actions . push ( {
target : 'wiki' ,
type : 'created' ,
key : 'account-segments' ,
detail : 'Account segments' ,
rawPaths : [ 'cards/wiki.json' ] ,
} ) ;
await currentSession . gitService . commitFiles ( [ 'wiki/global/account-segments.md' ] , 'wu wiki' , 'KTX Test' , 'system@ktx.local' ) ;
}
if ( params . telemetryTags . unitKey === 'card-source' ) {
// Source definition whose only measure is `total_contract_arr` (no `_cents`).
await mkdir ( join ( root , 'semantic-layer/warehouse' ) , { recursive : true } ) ;
await writeFile (
join ( root , 'semantic-layer/warehouse/mart_account_segments.yaml' ) ,
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n' ,
) ;
addTouchedSlSource ( currentSession . touchedSlSources , 'warehouse' , 'mart_account_segments' ) ;
currentSession . actions . push ( {
target : 'sl' ,
type : 'created' ,
key : 'mart_account_segments' ,
detail : 'Dollar measure' ,
targetConnectionId : 'warehouse' ,
rawPaths : [ 'cards/source.json' ] ,
} ) ;
await currentSession . gitService . commitFiles (
[ 'semantic-layer/warehouse/mart_account_segments.yaml' ] ,
'wu source' ,
'KTX Test' ,
'system@ktx.local' ,
) ;
}
return { stopReason : 'natural' } ;
} ) as never ;
const runner = new IngestBundleRunner ( deps ) ;
await mockStageRawFiles ( runner , runtime , [
[ 'cards/wiki.json' , 'h1' ] ,
[ 'cards/source.json' , 'h2' ] ,
] ) ;
await expect (
runner . run ( {
jobId : 'job-trace-failure' ,
connectionId : 'warehouse' ,
sourceKey : 'metabase' ,
trigger : 'upload' ,
bundleRef : { kind : 'upload' , uploadId : 'upload' } ,
} ) ,
) . rejects . toThrow ( /total_contract_arr_cents/ ) ;
// The report with status 'failed' must point at this job's trace and name the failing phase.
const failureReport = createdReports . find ( ( report ) = > report . body . status === 'failed' ) ;
expect ( failureReport . body . tracePath ) . toContain ( 'job-trace-failure/trace.jsonl' ) ;
expect ( failureReport . body . failure ) . toMatchObject ( { phase : 'final_gates' } ) ;
// Parse the trace as JSON Lines: one JSON object per line.
const events = ( await readFile ( join ( runtime . configDir , '.ktx/ingest-traces/job-trace-failure/trace.jsonl' ) , 'utf-8' ) )
. trim ( )
. split ( '\n' )
. map ( ( line ) = > JSON . parse ( line ) ) ;
// arrayContaining checks presence only, not ordering or exclusivity.
expect ( events . map ( ( event ) = > event . event ) ) . toEqual (
expect . arrayContaining ( [
'ingest_started' ,
'input_snapshot' ,
'work_units_planned' ,
'isolated_diff_enabled' ,
'work_unit_child_created' ,
'work_unit_patch_collected' ,
'patch_apply_started' ,
'patch_accepted' ,
'reconciliation_finished' ,
'final_artifact_gates_failed' ,
'ingest_failed' ,
'failure_report_created' ,
] ) ,
) ;
const failed = events . find ( ( event ) = > event . event === 'ingest_failed' ) ;
// NOTE(review): runId 'run-1' is presumably supplied by the makeDeps fixtures — confirm there.
expect ( failed ) . toMatchObject ( {
runId : 'run-1' ,
syncId : expect.any ( String ) ,
data : { phase : 'final_gates' , tracePath : expect.stringContaining ( 'trace.jsonl' ) } ,
error : { message : expect.stringContaining ( 'total_contract_arr_cents' ) } ,
} ) ;
} finally {
// Always remove the runtime's home directory, even when an assertion fails.
await rm ( runtime . homeDir , { recursive : true , force : true } ) ;
}
} ) ;
// Provenance validation: the single work unit writes a source and a wiki page
// that agree with each other, but its wiki action cites 'cards/missing.json',
// which is not among the staged raw files. The run must reject with the
// provenance error, HEAD must equal its pre-run value, no provenance rows may
// be inserted, and both the failure report and the trace must describe the
// offending row and where it came from.
it ( 'rejects invalid provenance raw paths before squash reaches main' , async ( ) = > {
const runtime = await makeRealGitRuntime ( ) ;
try {
const { deps , adapter } = makeDeps ( runtime ) ;
// Replace the report store with a mock that records every create() argument
// and returns a sequential id.
const createdReports : any [ ] = [ ] ;
deps . reports . create = vi . fn ( async ( args : any ) = > {
createdReports . push ( args ) ;
return { id : ` report- ${ createdReports . length } ` } ;
} ) ;
adapter . chunk . mockResolvedValue ( {
workUnits : [
{
unitKey : 'card-valid-artifacts' ,
rawFiles : [ 'cards/source.json' ] ,
peerFileIndex : [ ] ,
dependencyPaths : [ ] ,
} ,
] ,
} ) ;
// Capture the session handed to the toolset factory for use inside the fake agent loop.
let currentSession : any = null ;
deps . toolsetFactory . createIngestWuToolset = vi . fn ( ( toolSession : any ) = > {
currentSession = toolSession ;
return { toRuntimeTools : vi.fn ( ( ) = > ( { } ) ) } ;
} ) ;
// Fake agent loop: writes a source and a wiki page that reference the same
// measure name, then records two actions and commits both files together.
deps . agentRunner . runLoop = vi . fn ( async ( ) = > {
const root = rootOfConfig ( currentSession . configService , runtime . configDir ) ;
await mkdir ( join ( root , 'semantic-layer/warehouse' ) , { recursive : true } ) ;
await mkdir ( join ( root , 'wiki/global' ) , { recursive : true } ) ;
await writeFile (
join ( root , 'semantic-layer/warehouse/mart_account_segments.yaml' ) ,
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n' ,
) ;
await writeFile (
join ( root , 'wiki/global/account-segments.md' ) ,
'---\nsummary: Account segments\nusage_mode: auto\nsl_refs:\n - mart_account_segments\n---\n\nARR is `mart_account_segments.total_contract_arr`.\n' ,
) ;
addTouchedSlSource ( currentSession . touchedSlSources , 'warehouse' , 'mart_account_segments' ) ;
// Action index 0: source action citing the staged raw file.
currentSession . actions . push ( {
target : 'sl' ,
type : 'created' ,
key : 'mart_account_segments' ,
detail : 'Valid source' ,
targetConnectionId : 'warehouse' ,
rawPaths : [ 'cards/source.json' ] ,
} ) ;
// Action index 1: wiki action citing a raw path that was never staged; the
// assertions below expect actionIndex 1 in the reported origin.
currentSession . actions . push ( {
target : 'wiki' ,
type : 'created' ,
key : 'account-segments' ,
detail : 'Valid wiki with invalid provenance raw path' ,
rawPaths : [ 'cards/missing.json' ] ,
} ) ;
await currentSession . gitService . commitFiles (
[ 'semantic-layer/warehouse/mart_account_segments.yaml' , 'wiki/global/account-segments.md' ] ,
'valid artifacts with invalid provenance' ,
'KTX Test' ,
'system@ktx.local' ,
) ;
return { stopReason : 'natural' } ;
} ) as never ;
const runner = new IngestBundleRunner ( deps ) ;
await mockStageRawFiles ( runner , runtime , [ [ 'cards/source.json' , 'h1' ] ] ) ;
// Snapshot HEAD just before the run so it can be compared after the failure.
const preRunHead = await runtime . git . revParseHead ( ) ;
await expect (
runner . run ( {
jobId : 'job-invalid-provenance' ,
connectionId : 'warehouse' ,
sourceKey : 'metabase' ,
trigger : 'upload' ,
bundleRef : { kind : 'upload' , uploadId : 'upload' } ,
} ) ,
) . rejects . toThrow ( /provenance row references raw path outside this snapshot: cards\/missing\.json/ ) ;
expect ( await runtime . git . revParseHead ( ) ) . toBe ( preRunHead ) ;
expect ( deps . provenance . insertMany ) . not . toHaveBeenCalled ( ) ;
// The 'failed' report must carry the trace path, the failing phase, and the offending path.
const failureReport = createdReports . find ( ( report ) = > report . body . status === 'failed' ) ;
expect ( failureReport . body . tracePath ) . toContain ( 'job-invalid-provenance/trace.jsonl' ) ;
expect ( failureReport . body . failure ) . toMatchObject ( {
phase : 'provenance_validation' ,
message : expect.stringContaining ( 'cards/missing.json' ) ,
} ) ;
// Failure details must list the invalid path, the valid snapshot paths, and
// the invalid row together with the work-unit action it originated from.
expect ( failureReport . body . failure . details ) . toMatchObject ( {
invalidRawPaths : [ 'cards/missing.json' ] ,
currentRawPaths : [ 'cards/source.json' ] ,
invalidRows : expect.arrayContaining ( [
expect . objectContaining ( {
row : expect.objectContaining ( {
rawPath : 'cards/missing.json' ,
artifactKind : 'wiki' ,
artifactKey : 'account-segments' ,
actionType : 'wiki_written' ,
} ) ,
origin : expect.objectContaining ( {
source : 'work_unit_action' ,
unitKey : 'card-valid-artifacts' ,
actionIndex : 1 ,
unitRawFiles : [ 'cards/source.json' ] ,
action : expect.objectContaining ( {
target : 'wiki' ,
type : 'created' ,
key : 'account-segments' ,
rawPaths : [ 'cards/missing.json' ] ,
} ) ,
} ) ,
} ) ,
] ) ,
} ) ;
// The report lists both the valid and the invalid provenance rows.
expect ( failureReport . body . provenanceRows ) . toEqual (
expect . arrayContaining ( [
expect . objectContaining ( { rawPath : 'cards/source.json' , artifactKind : 'sl' , artifactKey : 'mart_account_segments' } ) ,
expect . objectContaining ( { rawPath : 'cards/missing.json' , artifactKind : 'wiki' , artifactKey : 'account-segments' } ) ,
] ) ,
) ;
expect ( failureReport . body . workUnits ) . toEqual (
expect . arrayContaining ( [
expect . objectContaining ( {
unitKey : 'card-valid-artifacts' ,
rawFiles : [ 'cards/source.json' ] ,
actions : expect.arrayContaining ( [
expect . objectContaining ( {
target : 'wiki' ,
key : 'account-segments' ,
rawPaths : [ 'cards/missing.json' ] ,
} ) ,
] ) ,
} ) ,
] ) ,
) ;
// Parse the trace as JSON Lines: one JSON object per line.
const events = ( await readFile ( join ( runtime . configDir , '.ktx/ingest-traces/job-invalid-provenance/trace.jsonl' ) , 'utf-8' ) )
. trim ( )
. split ( '\n' )
. map ( ( line ) = > JSON . parse ( line ) ) ;
// The final gates are expected to finish before provenance validation fails.
expect ( events . map ( ( event ) = > event . event ) ) . toEqual (
expect . arrayContaining ( [
'final_artifact_gates_finished' ,
'provenance_rows_validation_failed' ,
'ingest_failed' ,
'failure_report_created' ,
] ) ,
) ;
expect ( events . map ( ( event ) = > event . event ) ) . not . toContain ( 'squash_finished' ) ;
const validationFailure = events . find ( ( event ) = > event . event === 'provenance_rows_validation_failed' ) ;
expect ( validationFailure ) . toMatchObject ( {
phase : 'provenance' ,
data : {
invalidRawPaths : [ 'cards/missing.json' ] ,
currentRawPaths : [ 'cards/source.json' ] ,
invalidRows : expect.arrayContaining ( [
expect . objectContaining ( {
row : expect.objectContaining ( { rawPath : 'cards/missing.json' } ) ,
origin : expect.objectContaining ( {
source : 'work_unit_action' ,
unitKey : 'card-valid-artifacts' ,
actionIndex : 1 ,
} ) ,
} ) ,
] ) ,
} ,
} ) ;
} finally {
// Always remove the runtime's home directory, even when an assertion fails.
await rm ( runtime . homeDir , { recursive : true , force : true } ) ;
}
} ) ;
// slDisallowed policy: the planned work unit is flagged `slDisallowed` (reason
// 'lookml_connection_mismatch') yet its fake agent loop creates a file under
// semantic-layer/. The run must reject, and the trace must contain a
// 'patch_policy_rejected' entry naming the unit and the touched path.
it('rejects slDisallowed patches that touch semantic-layer files', async () => {
  const runtime = await makeRealGitRuntime();
  try {
    const { deps, adapter } = makeDeps(runtime);
    adapter.chunk.mockResolvedValue({
      workUnits: [
        {
          unitKey: 'lookml-mismatch',
          rawFiles: ['views/orders.lkml'],
          peerFileIndex: [],
          dependencyPaths: [],
          slDisallowed: true,
          slDisallowedReason: 'lookml_connection_mismatch',
        },
      ],
    });

    // Remember the session passed to the toolset factory for the fake loop.
    let activeSession: any = null;
    deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
      activeSession = toolSession;
      return { toRuntimeTools: vi.fn(() => ({})) };
    });

    // Fake agent loop: writes and commits a semantic-layer source despite the flag.
    deps.agentRunner.runLoop = vi.fn(async () => {
      const worktreeRoot = rootOfConfig(activeSession.configService, runtime.configDir);
      const warehouseDir = join(worktreeRoot, 'semantic-layer/warehouse');
      await mkdir(warehouseDir, { recursive: true });
      await writeFile(
        join(worktreeRoot, 'semantic-layer/warehouse/orders.yaml'),
        'name: orders\ngrain: [id]\ncolumns: [{name: id, type: string}]\njoins: []\nmeasures: []\n',
      );
      activeSession.actions.push({
        target: 'sl',
        type: 'created',
        key: 'orders',
        detail: 'forbidden',
        targetConnectionId: 'warehouse',
      });
      await activeSession.gitService.commitFiles(
        ['semantic-layer/warehouse/orders.yaml'],
        'forbidden sl',
        'KTX Test',
        'system@ktx.local',
      );
      return { stopReason: 'natural' };
    }) as never;

    const bundleRunner = new IngestBundleRunner(deps);
    await mockStageRawFiles(bundleRunner, runtime, [['views/orders.lkml', 'h1']]);

    const pendingRun = bundleRunner.run({
      jobId: 'job-sl-disallowed',
      connectionId: 'warehouse',
      sourceKey: 'metabase',
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'upload' },
    });
    await expect(pendingRun).rejects.toThrow(/isolated diff textual conflict/);

    const traceText = await readFile(
      join(runtime.configDir, '.ktx/ingest-traces/job-sl-disallowed/trace.jsonl'),
      'utf-8',
    );
    const expectedFragments = [
      'patch_policy_rejected',
      'slDisallowed WorkUnit lookml-mismatch touched semantic-layer/warehouse/orders.yaml',
    ];
    for (const fragment of expectedFragments) {
      expect(traceText).toContain(fragment);
    }
  } finally {
    // Always remove the runtime's home directory, even when an assertion fails.
    await rm(runtime.homeDir, { recursive: true, force: true });
  }
});
// Cross-unit wiki reference breakage: a seeded 'source-page' exists before the
// run; unit 'page-ref' adds a page that references it while unit 'page-delete'
// removes it. The run must reject with the missing-page error, HEAD must equal
// its pre-run value, the trace must show the gate failure without a squash,
// and the failure report must list both touched wiki paths.
it('rejects final wiki refs broken by another accepted WorkUnit before squash', async () => {
  const runtime = await makeRealGitRuntime();
  try {
    // Seed the page that will later be referenced and deleted.
    const seedWikiDir = join(runtime.configDir, 'wiki/global');
    await mkdir(seedWikiDir, { recursive: true });
    await writeFile(
      join(runtime.configDir, 'wiki/global/source-page.md'),
      '---\nsummary: Source page\nusage_mode: auto\n---\n\nSource page\n',
    );
    await runtime.git.commitFiles(
      ['wiki/global/source-page.md'],
      'seed source page',
      'KTX Test',
      'system@ktx.local',
    );
    const headBeforeRun = await runtime.git.revParseHead();

    const { deps, adapter } = makeDeps(runtime);
    adapter.chunk.mockResolvedValue({
      workUnits: [
        { unitKey: 'page-ref', rawFiles: ['pages/ref.json'], peerFileIndex: [], dependencyPaths: [] },
        { unitKey: 'page-delete', rawFiles: ['pages/delete.json'], peerFileIndex: [], dependencyPaths: [] },
      ],
    });

    // Remember the session passed to the toolset factory for the fake loop.
    let activeSession: any = null;
    deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
      activeSession = toolSession;
      return { toRuntimeTools: vi.fn(() => ({})) };
    });

    // Fake agent loop: dispatches on the unit key; other keys do nothing.
    deps.agentRunner.runLoop = vi.fn(async (params: any) => {
      const worktreeRoot = rootOfConfig(activeSession.configService, runtime.configDir);
      switch (params.telemetryTags.unitKey) {
        case 'page-ref': {
          // Create a page that links to the seeded source page.
          await mkdir(join(worktreeRoot, 'wiki/global'), { recursive: true });
          await writeFile(
            join(worktreeRoot, 'wiki/global/account-segments.md'),
            '---\nsummary: Account segments\nusage_mode: auto\nrefs:\n - source-page\n---\n\nSee [[source-page]].\n',
          );
          activeSession.actions.push({
            target: 'wiki',
            type: 'created',
            key: 'account-segments',
            detail: 'Page with wiki ref',
            rawPaths: ['pages/ref.json'],
          });
          await activeSession.gitService.commitFiles(
            ['wiki/global/account-segments.md'],
            'wu page ref',
            'KTX Test',
            'system@ktx.local',
          );
          break;
        }
        case 'page-delete': {
          // Remove the seeded page and commit the deletion.
          await rm(join(worktreeRoot, 'wiki/global/source-page.md'), { force: true });
          activeSession.actions.push({
            target: 'wiki',
            type: 'removed',
            key: 'source-page',
            detail: 'Delete referenced page',
            rawPaths: ['pages/delete.json'],
          });
          await activeSession.gitService.commitFiles(
            ['wiki/global/source-page.md'],
            'wu delete source page',
            'KTX Test',
            'system@ktx.local',
          );
          break;
        }
        default:
          break;
      }
      return { stopReason: 'natural' };
    }) as never;

    const bundleRunner = new IngestBundleRunner(deps);
    await mockStageRawFiles(bundleRunner, runtime, [
      ['pages/ref.json', 'h1'],
      ['pages/delete.json', 'h2'],
    ]);

    const pendingRun = bundleRunner.run({
      jobId: 'job-wiki-ref-conflict',
      connectionId: 'warehouse',
      sourceKey: 'metabase',
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'upload' },
    });
    await expect(pendingRun).rejects.toThrow(
      /wiki references target missing page\(s\): account-segments -> source-page/,
    );
    expect(await runtime.git.revParseHead()).toBe(headBeforeRun);

    // The trace is inspected as raw text.
    const traceText = await readFile(
      join(runtime.configDir, '.ktx/ingest-traces/job-wiki-ref-conflict/trace.jsonl'),
      'utf-8',
    );
    const requiredFragments = [
      'final_artifact_gates_failed',
      'account-segments -> source-page',
      'ingest_failed',
      'failure_report_created',
    ];
    for (const fragment of requiredFragments) {
      expect(traceText).toContain(fragment);
    }
    expect(traceText).not.toContain('squash_finished');

    // Locate the 'failed' report among the arguments passed to reports.create.
    const reportArgs = (deps.reports.create as any).mock.calls.map((call: any[]) => call[0]);
    const failureReport = reportArgs.find((report: any) => report.body.status === 'failed');
    expect(failureReport.body.failure).toMatchObject({
      phase: 'final_gates',
      message: expect.stringContaining('account-segments -> source-page'),
      details: expect.objectContaining({
        changedWikiPageKeys: expect.arrayContaining(['account-segments']),
        workUnitPatchTouchedPaths: expect.arrayContaining([
          'wiki/global/account-segments.md',
          'wiki/global/source-page.md',
        ]),
      }),
    });
  } finally {
    // Always remove the runtime's home directory, even when an assertion fails.
    await rm(runtime.homeDir, { recursive: true, force: true });
  }
});
it('rejects unchanged inbound wiki refs broken by an isolated wiki deletion', async () => {
  // Scenario: `account-segments` (never touched by the run) links to `source-page`,
  // and the only work unit deletes `source-page`. The run must reject, HEAD must not
  // move, and the failure must be recorded in both the trace file and the report.
  const runtime = await makeRealGitRuntime();
  try {
    // Seed and commit both wiki pages before the run starts.
    await mkdir(join(runtime.configDir, 'wiki/global'), { recursive: true });
    await writeFile(
      join(runtime.configDir, 'wiki/global/source-page.md'),
      '---\nsummary: Source page\nusage_mode: auto\n---\n\nSource page\n',
    );
    await writeFile(
      join(runtime.configDir, 'wiki/global/account-segments.md'),
      '---\nsummary: Account segments\nusage_mode: auto\nrefs:\n - source-page\n---\n\nSee [[source-page]].\n',
    );
    await runtime.git.commitFiles(
      ['wiki/global/source-page.md', 'wiki/global/account-segments.md'],
      'seed inbound wiki refs',
      'KTX Test',
      'system@ktx.local',
    );
    const headBeforeRun = await runtime.git.revParseHead();

    const { deps, adapter } = makeDeps(runtime);
    adapter.chunk.mockResolvedValue({
      workUnits: [
        { unitKey: 'delete-target-page', rawFiles: ['pages/delete.json'], peerFileIndex: [], dependencyPaths: [] },
      ],
    });

    // Remember the session passed to the toolset factory so the fake agent loop can act on it.
    let activeSession: any = null;
    deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
      activeSession = toolSession;
      return { toRuntimeTools: vi.fn(() => ({})) };
    });
    deps.agentRunner.runLoop = vi.fn(async (params: any) => {
      // Only the deleting work unit mutates anything; every other loop is a no-op.
      if (params.telemetryTags.unitKey === 'delete-target-page') {
        const sessionRoot = rootOfConfig(activeSession.configService, runtime.configDir);
        await rm(join(sessionRoot, 'wiki/global/source-page.md'), { force: true });
        activeSession.actions.push({
          target: 'wiki',
          type: 'removed',
          key: 'source-page',
          detail: 'Delete referenced page',
          rawPaths: ['pages/delete.json'],
        });
        await activeSession.gitService.commitFiles(
          ['wiki/global/source-page.md'],
          'wu delete target page',
          'KTX Test',
          'system@ktx.local',
        );
      }
      return { stopReason: 'natural' };
    }) as never;

    const bundleRunner = new IngestBundleRunner(deps);
    await mockStageRawFiles(bundleRunner, runtime, [['pages/delete.json', 'h1']]);
    const runRequest = {
      jobId: 'job-existing-wiki-ref-stale',
      connectionId: 'warehouse',
      sourceKey: 'metabase',
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'upload' },
    };
    await expect(bundleRunner.run(runRequest)).rejects.toThrow(
      /wiki references target missing page\(s\): account-segments -> source-page/,
    );
    expect(await runtime.git.revParseHead()).toBe(headBeforeRun);

    // The trace file is JSON Lines: one event object per line.
    const rawTrace = await readFile(
      join(runtime.configDir, '.ktx/ingest-traces/job-existing-wiki-ref-stale/trace.jsonl'),
      'utf-8',
    );
    const events = rawTrace
      .trim()
      .split('\n')
      .map((line) => JSON.parse(line));
    const eventNames = events.map((entry) => entry.event);
    expect(eventNames).toEqual(
      expect.arrayContaining([
        'final_artifact_gates_started',
        'final_artifact_gates_failed',
        'ingest_failed',
        'failure_report_created',
      ]),
    );
    expect(eventNames).not.toContain('squash_finished');

    // Matchers shared by the trace-event and failure-report assertions below.
    const gateScopeShape = {
      global: true,
      reasons: expect.arrayContaining(['wiki_page_removed']),
      removedWikiPageKeys: expect.arrayContaining(['source-page']),
      pageKeysValidated: expect.arrayContaining(['account-segments']),
    };
    const removedSourcePageAction = expect.objectContaining({
      target: 'wiki',
      type: 'removed',
      key: 'source-page',
      rawPaths: ['pages/delete.json'],
    });

    const gateFailure = events.find((entry) => entry.event === 'final_artifact_gates_failed');
    expect(gateFailure).toMatchObject({
      data: {
        wikiReferenceGateScope: gateScopeShape,
        actionOrigins: expect.arrayContaining([
          expect.objectContaining({
            source: 'work_unit_action',
            unitKey: 'delete-target-page',
            unitRawFiles: ['pages/delete.json'],
            action: removedSourcePageAction,
          }),
        ]),
      },
      error: { message: expect.stringContaining('account-segments -> source-page') },
    });

    const createdReports = (deps.reports.create as any).mock.calls.map((call: any[]) => call[0]);
    const failureReport = createdReports.find((report: any) => report.body.status === 'failed');
    expect(failureReport.body.failure).toMatchObject({
      phase: 'final_gates',
      message: expect.stringContaining('account-segments -> source-page'),
      details: expect.objectContaining({
        wikiReferenceGateScope: expect.objectContaining(gateScopeShape),
        changedWikiPageKeys: expect.arrayContaining(['source-page']),
        actionOrigins: expect.arrayContaining([
          expect.objectContaining({
            source: 'work_unit_action',
            unitKey: 'delete-target-page',
            action: removedSourcePageAction,
          }),
        ]),
      }),
    });
    expect(failureReport.body.workUnits).toEqual(
      expect.arrayContaining([
        expect.objectContaining({
          unitKey: 'delete-target-page',
          actions: expect.arrayContaining([removedSourcePageAction]),
        }),
      ]),
    );
  } finally {
    await rm(runtime.homeDir, { recursive: true, force: true });
  }
});
it('rejects WorkUnit patches that touch unauthorized semantic-layer target connections', async () => {
  // The run is issued for connection `warehouse`, while the fake agent commits a source
  // under `semantic-layer/finance`. The run must reject during integration and leave
  // HEAD untouched.
  const runtime = await makeRealGitRuntime();
  try {
    const { deps, adapter } = makeDeps(runtime);
    adapter.chunk.mockResolvedValue({
      workUnits: [
        { unitKey: 'finance-source', rawFiles: ['cards/finance.json'], peerFileIndex: [], dependencyPaths: [] },
      ],
    });

    // Remember the session passed to the toolset factory so the fake agent loop can act on it.
    let activeSession: any = null;
    deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
      activeSession = toolSession;
      return { toRuntimeTools: vi.fn(() => ({})) };
    });
    deps.agentRunner.runLoop = vi.fn(async () => {
      const sessionRoot = rootOfConfig(activeSession.configService, runtime.configDir);
      await mkdir(join(sessionRoot, 'semantic-layer/finance'), { recursive: true });
      await writeFile(
        join(sessionRoot, 'semantic-layer/finance/orders.yaml'),
        'name: orders\ngrain: [id]\ncolumns: [{name: id, type: string}]\njoins: []\nmeasures: []\n',
      );
      addTouchedSlSource(activeSession.touchedSlSources, 'finance', 'orders');
      activeSession.actions.push({
        target: 'sl',
        type: 'created',
        key: 'orders',
        detail: 'Unauthorized target',
        targetConnectionId: 'finance',
        rawPaths: ['cards/finance.json'],
      });
      await activeSession.gitService.commitFiles(
        ['semantic-layer/finance/orders.yaml'],
        'wu unauthorized target',
        'KTX Test',
        'system@ktx.local',
      );
      return { stopReason: 'natural' };
    }) as never;

    const bundleRunner = new IngestBundleRunner(deps);
    await mockStageRawFiles(bundleRunner, runtime, [['cards/finance.json', 'h1']]);
    const headBeforeRun = await runtime.git.revParseHead();
    const runRequest = {
      jobId: 'job-unauthorized-wu-target',
      connectionId: 'warehouse',
      sourceKey: 'metabase',
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'upload' },
    };
    await expect(bundleRunner.run(runRequest)).rejects.toThrow(
      /isolated diff textual conflict.*semantic-layer target connection not allowed/,
    );
    expect(await runtime.git.revParseHead()).toBe(headBeforeRun);

    // The trace is checked as raw text for the fragments that must (not) appear.
    const traceLog = await readFile(
      join(runtime.configDir, '.ktx/ingest-traces/job-unauthorized-wu-target/trace.jsonl'),
      'utf-8',
    );
    const expectedFragments = [
      'patch_policy_rejected',
      'semantic-layer/finance/orders.yaml',
      'allowedTargetConnectionIds',
      'ingest_failed',
      'failure_report_created',
    ];
    for (const fragment of expectedFragments) {
      expect(traceLog).toContain(fragment);
    }
    expect(traceLog).not.toContain('squash_finished');

    const createdReports = (deps.reports.create as any).mock.calls.map((call: any[]) => call[0]);
    const failureReport = createdReports.find((report: any) => report.body.status === 'failed');
    const { failure } = failureReport.body;
    expect(failure).toMatchObject({
      phase: 'integration',
      message: expect.stringContaining('semantic-layer target connection not allowed'),
    });
    expect(failure.details).toMatchObject({
      unitKey: 'finance-source',
      allowedTargetConnectionIds: ['warehouse'],
      touchedPaths: ['semantic-layer/finance/orders.yaml'],
      reason: expect.stringContaining('semantic-layer/finance/orders.yaml (finance)'),
    });
  } finally {
    await rm(runtime.homeDir, { recursive: true, force: true });
  }
});
it('rejects reconciliation mutations that touch unauthorized semantic-layer target connections before squash', async () => {
  // The work-unit loop produces a valid wiki page; every other agent loop writes a
  // source under `semantic-layer/finance` although the run targets `warehouse`.
  // The run must reject in the target-policy phase without moving HEAD.
  const runtime = await makeRealGitRuntime();
  try {
    const { deps, adapter } = makeDeps(runtime);
    adapter.chunk.mockResolvedValue({
      workUnits: [
        { unitKey: 'valid-page', rawFiles: ['pages/source.json'], peerFileIndex: [], dependencyPaths: [] },
      ],
    });

    // Remember the session passed to the toolset factory so the fake agent loop can act on it.
    let activeSession: any = null;
    deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
      activeSession = toolSession;
      return { toRuntimeTools: vi.fn(() => ({})) };
    });

    // Work-unit loop: create and commit a valid wiki page.
    const writeValidPage = async (sessionRoot: string) => {
      await mkdir(join(sessionRoot, 'wiki/global'), { recursive: true });
      await writeFile(
        join(sessionRoot, 'wiki/global/valid-page.md'),
        '---\nsummary: Valid page\nusage_mode: auto\n---\n\nValid\n',
      );
      activeSession.actions.push({
        target: 'wiki',
        type: 'created',
        key: 'valid-page',
        detail: 'Valid page',
        rawPaths: ['pages/source.json'],
      });
      await activeSession.gitService.commitFiles(
        ['wiki/global/valid-page.md'],
        'wu valid page',
        'KTX Test',
        'system@ktx.local',
      );
    };

    // Any other loop: create and commit a source for the disallowed `finance` connection.
    const writeUnauthorizedSource = async (sessionRoot: string) => {
      await mkdir(join(sessionRoot, 'semantic-layer/finance'), { recursive: true });
      await writeFile(
        join(sessionRoot, 'semantic-layer/finance/reconcile_orders.yaml'),
        'name: reconcile_orders\ngrain: [id]\ncolumns: [{name: id, type: string}]\njoins: []\nmeasures: []\n',
      );
      addTouchedSlSource(activeSession.touchedSlSources, 'finance', 'reconcile_orders');
      activeSession.actions.push({
        target: 'sl',
        type: 'created',
        key: 'reconcile_orders',
        detail: 'Unauthorized reconcile target',
        targetConnectionId: 'finance',
        rawPaths: ['pages/source.json'],
      });
      await activeSession.gitService.commitFiles(
        ['semantic-layer/finance/reconcile_orders.yaml'],
        'reconcile unauthorized target',
        'KTX Test',
        'system@ktx.local',
      );
    };

    deps.agentRunner.runLoop = vi.fn(async (params: any) => {
      const sessionRoot = rootOfConfig(activeSession.configService, runtime.configDir);
      const isWorkUnitLoop = params.telemetryTags.operationName === 'ingest-bundle-wu';
      await (isWorkUnitLoop ? writeValidPage(sessionRoot) : writeUnauthorizedSource(sessionRoot));
      return { stopReason: 'natural' };
    }) as never;

    const bundleRunner = new IngestBundleRunner(deps);
    await mockStageRawFiles(bundleRunner, runtime, [['pages/source.json', 'h1']]);
    const headBeforeRun = await runtime.git.revParseHead();
    const runRequest = {
      jobId: 'job-unauthorized-reconcile-target',
      connectionId: 'warehouse',
      sourceKey: 'metabase',
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'upload' },
    };
    await expect(bundleRunner.run(runRequest)).rejects.toThrow(/semantic-layer target connection not allowed/);
    expect(await runtime.git.revParseHead()).toBe(headBeforeRun);

    // The trace is checked as raw text for the fragments that must (not) appear.
    const traceLog = await readFile(
      join(runtime.configDir, '.ktx/ingest-traces/job-unauthorized-reconcile-target/trace.jsonl'),
      'utf-8',
    );
    const expectedFragments = [
      'semantic_layer_target_policy_started',
      'semantic_layer_target_policy_failed',
      'allowedTargetConnectionIds',
      'semantic-layer/finance/reconcile_orders.yaml',
      'ingest_failed',
      'failure_report_created',
    ];
    for (const fragment of expectedFragments) {
      expect(traceLog).toContain(fragment);
    }
    expect(traceLog).not.toContain('squash_finished');

    const createdReports = (deps.reports.create as any).mock.calls.map((call: any[]) => call[0]);
    const failureReport = createdReports.find((report: any) => report.body.status === 'failed');
    const { failure } = failureReport.body;
    expect(failure).toMatchObject({
      phase: 'target_policy',
      message: expect.stringContaining('semantic-layer target connection not allowed'),
    });
    expect(failure.details).toMatchObject({
      allowedTargetConnectionIds: ['warehouse'],
      touchedPaths: expect.arrayContaining(['semantic-layer/finance/reconcile_orders.yaml']),
    });
  } finally {
    await rm(runtime.homeDir, { recursive: true, force: true });
  }
});
it('repairs additive same-source textual conflicts before final gates and squash', async () => {
  // Two work units ('card-wiki' and 'card-source') each write a different measure into
  // the same semantic-layer source file. The fake textual resolver merges both measures,
  // and the run is expected to finish with a commit containing both.
  const runtime = await makeRealGitRuntime();
  try {
    const sourcePath = 'semantic-layer/warehouse/mart_account_segments.yaml';
    // YAML fragments combined below into the per-unit and merged file contents.
    const sourcePreamble =
      'name: mart_account_segments\n' +
      'grain: [account_id]\n' +
      'columns: [{name: account_id, type: string}]\n' +
      'joins: []\n' +
      'measures:\n';
    const arrMeasure = ' - name: total_contract_arr_cents\n' + ' expr: sum(contract_arr)\n';
    const accountCountMeasure = ' - name: account_count\n' + ' expr: count_distinct(account_id)\n';

    const { deps } = makeDeps(runtime);

    // Remember the session passed to the toolset factory so the fake agent loop can act on it.
    let activeSession: any = null;
    deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
      activeSession = toolSession;
      return { toRuntimeTools: vi.fn(() => ({})) };
    });

    deps.agentRunner.runLoop = vi.fn(async (params: any) => {
      const { operationName, unitKey } = params.telemetryTags;

      if (operationName === 'ingest-isolated-diff-textual-resolver') {
        // Resolver loop: inspect the current file and the failed patch, then write the union.
        const current = await params.toolSet.read_integration_file.execute({ path: sourcePath });
        expect(current.markdown).toContain('total_contract_arr_cents');
        const patch = await params.toolSet.read_failed_patch.execute({});
        expect(patch.markdown).toContain('account_count');
        await params.toolSet.write_integration_file.execute({
          path: sourcePath,
          content: sourcePreamble + arrMeasure + accountCountMeasure,
        });
        return { stopReason: 'natural' };
      }

      const sessionRoot = rootOfConfig(activeSession.configService, runtime.configDir);
      await mkdir(join(sessionRoot, 'semantic-layer/warehouse'), { recursive: true });

      // Each known unit writes its own single-measure version; other units write nothing.
      let unitMeasure: string | undefined;
      switch (unitKey) {
        case 'card-wiki':
          unitMeasure = arrMeasure;
          break;
        case 'card-source':
          unitMeasure = accountCountMeasure;
          break;
        default:
          break;
      }
      if (unitMeasure !== undefined) {
        await writeFile(join(sessionRoot, sourcePath), sourcePreamble + unitMeasure);
      }

      addTouchedSlSource(activeSession.touchedSlSources, 'warehouse', 'mart_account_segments');
      activeSession.actions.push({
        target: 'sl',
        type: 'updated',
        key: 'mart_account_segments',
        detail: 'Updated account segments source',
        targetConnectionId: 'warehouse',
      });
      await activeSession.gitService.commitFiles(
        [sourcePath],
        ` wu ${unitKey} `,
        'KTX Test',
        'system@ktx.local',
      );
      return { stopReason: 'natural' };
    }) as never;

    const bundleRunner = new IngestBundleRunner(deps);
    await mockStageRawFiles(bundleRunner, runtime, [
      ['cards/wiki.json', 'hash-a'],
      ['cards/source.json', 'hash-b'],
    ]);
    const result = await bundleRunner.run({
      jobId: 'job-resolver-e2e',
      connectionId: 'warehouse',
      sourceKey: 'metabase',
      trigger: 'manual_resync',
      bundleRef: { kind: 'upload', uploadId: 'upload-1' },
    });
    expect(result.commitSha).toBeTruthy();

    // The committed source must carry both measures.
    const mergedSource = await readFile(join(runtime.configDir, sourcePath), 'utf-8');
    expect(mergedSource).toContain('total_contract_arr_cents');
    expect(mergedSource).toContain('account_count');

    // The resolver must have been invoked in the repair role for the second unit.
    expect(deps.agentRunner.runLoop).toHaveBeenCalledWith(
      expect.objectContaining({
        modelRole: 'repair',
        telemetryTags: expect.objectContaining({
          operationName: 'ingest-isolated-diff-textual-resolver',
          unitKey: 'card-source',
        }),
      }),
    );

    const successReport = (deps.reports.create as any).mock.calls.at(-1)?.[0]?.body;
    expect(successReport.isolatedDiff).toMatchObject({
      acceptedPatches: 2,
      textualConflicts: 1,
      semanticConflicts: 0,
      resolverAttempts: 1,
      resolverRepairs: 1,
      resolverFailures: 0,
    });

    const traceLog = await readFile(
      join(runtime.configDir, '.ktx/ingest-traces/job-resolver-e2e/trace.jsonl'),
      'utf-8',
    );
    expect(traceLog).toContain('textual_conflict_resolver_repaired');
    expect(traceLog).toContain('patch_accepted_after_textual_resolution');
  } finally {
    await rm(runtime.homeDir, { recursive: true, force: true });
  }
});
it('repairs final wiki body refs before squash when the repair agent edits the scoped page', async () => {
  // A seeded wiki page mentions the measure `total_contract_arr_cents`; the work unit
  // renames that measure to `total_contract_arr`. The fake gate-repair agent rewrites
  // the page, and the run is expected to commit with exactly one successful gate repair.
  const runtime = await makeRealGitRuntime();
  try {
    const sourcePath = 'semantic-layer/warehouse/mart_account_segments.yaml';
    const wikiPagePath = 'wiki/global/account-segments.md';

    // Seed and commit the source plus the wiki page that references its measure.
    await mkdir(join(runtime.configDir, 'semantic-layer/warehouse'), { recursive: true });
    await mkdir(join(runtime.configDir, 'wiki/global'), { recursive: true });
    await writeFile(
      join(runtime.configDir, sourcePath),
      'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr_cents\n expr: sum(contract_arr)\n',
    );
    await writeFile(
      join(runtime.configDir, wikiPagePath),
      '---\nsummary: Account segments\nusage_mode: auto\n---\n\nExisting ARR uses `mart_account_segments.total_contract_arr_cents`.\n',
    );
    await runtime.git.commitFiles([sourcePath, wikiPagePath], 'seed stale wiki body ref', 'KTX Test', 'system@ktx.local');

    const { deps, adapter } = makeDeps(runtime);
    adapter.chunk.mockResolvedValue({
      workUnits: [
        { unitKey: 'source-only', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] },
      ],
    });

    // Remember the session passed to the toolset factory so the fake agent loop can act on it.
    let activeSession: any = null;
    deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
      activeSession = toolSession;
      return { toRuntimeTools: vi.fn(() => ({})) };
    });

    deps.agentRunner.runLoop = vi.fn(async (params: any) => {
      const finished = { stopReason: 'natural' as const };

      if (params.telemetryTags.operationName === 'ingest-isolated-diff-gate-repair') {
        // Gate-repair loop: confirm the error names the old measure, then replace its
        // first occurrence in the wiki page with the new name.
        const gateError = await params.toolSet.read_gate_error.execute({});
        expect(gateError.markdown).toContain('total_contract_arr_cents');
        const page = await params.toolSet.read_repair_file.execute({ path: wikiPagePath });
        await params.toolSet.write_repair_file.execute({
          path: wikiPagePath,
          content: page.markdown.replace('total_contract_arr_cents', 'total_contract_arr'),
        });
        return finished;
      }

      // The reconcile loop intentionally changes nothing.
      if (params.modelRole === 'reconcile') {
        return finished;
      }

      // Work-unit loop: rename the measure in the source and commit it.
      const sessionRoot = rootOfConfig(activeSession.configService, runtime.configDir);
      await writeFile(
        join(sessionRoot, sourcePath),
        'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n',
      );
      addTouchedSlSource(activeSession.touchedSlSources, 'warehouse', 'mart_account_segments');
      activeSession.actions.push({
        target: 'sl',
        type: 'updated',
        key: 'mart_account_segments',
        detail: 'Rename ARR measure',
        targetConnectionId: 'warehouse',
        rawPaths: ['cards/source.json'],
      });
      await activeSession.gitService.commitFiles([sourcePath], 'wu source rename', 'KTX Test', 'system@ktx.local');
      return finished;
    }) as never;

    const bundleRunner = new IngestBundleRunner(deps);
    await mockStageRawFiles(bundleRunner, runtime, [['cards/source.json', 'h1']]);
    const result = await bundleRunner.run({
      jobId: 'job-final-gate-repair',
      connectionId: 'warehouse',
      sourceKey: 'metabase',
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'upload' },
    });
    expect(result.commitSha).toBeTruthy();

    // The committed page must use the new measure name and no longer mention the old one.
    const repairedPage = await readFile(join(runtime.configDir, wikiPagePath), 'utf-8');
    expect(repairedPage).toContain('mart_account_segments.total_contract_arr');
    expect(repairedPage).not.toContain('total_contract_arr_cents');

    const lastReport = vi.mocked(deps.reports.create).mock.calls.at(-1)?.[0] as any;
    expect(lastReport.body.isolatedDiff).toMatchObject({
      gateRepairAttempts: 1,
      gateRepairs: 1,
      gateRepairFailures: 0,
    });

    const traceLog = await readFile(
      join(runtime.configDir, '.ktx/ingest-traces/job-final-gate-repair/trace.jsonl'),
      'utf-8',
    );
    expect(traceLog).toContain('gate_repair_repaired');
    expect(traceLog).toContain('final_artifact_gates_after_gate_repair_finished');
    expect(traceLog).toContain('final_gate_repair_committed');
  } finally {
    await rm(runtime.homeDir, { recursive: true, force: true });
  }
});
it('fails before squash when final gate repair makes no edit', async () => {
  // Same seed as the successful gate-repair scenario, but here the fake gate-repair
  // agent returns without writing anything. The run must reject, HEAD must not move,
  // and the report must count one failed repair attempt.
  const runtime = await makeRealGitRuntime();
  try {
    const sourcePath = 'semantic-layer/warehouse/mart_account_segments.yaml';
    const wikiPagePath = 'wiki/global/account-segments.md';

    // Seed and commit the source plus the wiki page that references its measure.
    await mkdir(join(runtime.configDir, 'semantic-layer/warehouse'), { recursive: true });
    await mkdir(join(runtime.configDir, 'wiki/global'), { recursive: true });
    await writeFile(
      join(runtime.configDir, sourcePath),
      'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr_cents\n expr: sum(contract_arr)\n',
    );
    await writeFile(
      join(runtime.configDir, wikiPagePath),
      '---\nsummary: Account segments\nusage_mode: auto\n---\n\nExisting ARR uses `mart_account_segments.total_contract_arr_cents`.\n',
    );
    await runtime.git.commitFiles([sourcePath, wikiPagePath], 'seed stale wiki body ref', 'KTX Test', 'system@ktx.local');
    const headBeforeRun = await runtime.git.revParseHead();

    const { deps, adapter } = makeDeps(runtime);
    adapter.chunk.mockResolvedValue({
      workUnits: [
        { unitKey: 'source-only', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] },
      ],
    });

    // Remember the session passed to the toolset factory so the fake agent loop can act on it.
    let activeSession: any = null;
    deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
      activeSession = toolSession;
      return { toRuntimeTools: vi.fn(() => ({})) };
    });

    deps.agentRunner.runLoop = vi.fn(async (params: any) => {
      const finished = { stopReason: 'natural' as const };

      // Both the gate-repair loop and the reconcile loop deliberately do nothing.
      const isGateRepair = params.telemetryTags.operationName === 'ingest-isolated-diff-gate-repair';
      if (isGateRepair || params.modelRole === 'reconcile') {
        return finished;
      }

      // Work-unit loop: rename the measure in the source and commit it.
      const sessionRoot = rootOfConfig(activeSession.configService, runtime.configDir);
      await writeFile(
        join(sessionRoot, sourcePath),
        'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n',
      );
      addTouchedSlSource(activeSession.touchedSlSources, 'warehouse', 'mart_account_segments');
      activeSession.actions.push({
        target: 'sl',
        type: 'updated',
        key: 'mart_account_segments',
        detail: 'Rename ARR measure',
        targetConnectionId: 'warehouse',
        rawPaths: ['cards/source.json'],
      });
      await activeSession.gitService.commitFiles([sourcePath], 'wu source rename', 'KTX Test', 'system@ktx.local');
      return finished;
    }) as never;

    const bundleRunner = new IngestBundleRunner(deps);
    await mockStageRawFiles(bundleRunner, runtime, [['cards/source.json', 'h1']]);
    const runRequest = {
      jobId: 'job-final-gate-repair-fails',
      connectionId: 'warehouse',
      sourceKey: 'metabase',
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'upload' },
    };
    await expect(bundleRunner.run(runRequest)).rejects.toThrow(
      /gate repair completed without editing an allowed path/,
    );
    expect(await runtime.git.revParseHead()).toBe(headBeforeRun);

    const lastReport = vi.mocked(deps.reports.create).mock.calls.at(-1)?.[0] as any;
    expect(lastReport.body.status).toBe('failed');
    expect(lastReport.body.isolatedDiff).toMatchObject({
      gateRepairAttempts: 1,
      gateRepairs: 0,
      gateRepairFailures: 1,
    });

    const traceLog = await readFile(
      join(runtime.configDir, '.ktx/ingest-traces/job-final-gate-repair-fails/trace.jsonl'),
      'utf-8',
    );
    expect(traceLog).toContain('gate_repair_failed');
    expect(traceLog).not.toContain('squash_finished');
  } finally {
    await rm(runtime.homeDir, { recursive: true, force: true });
  }
});
2026-05-20 14:17:10 +02:00
it('runs finalization before wiki sl-ref repair and final gates', async () => {
  // The adapter's `finalize` hook writes a semantic-layer source and a wiki page whose
  // `sl_refs` list names that source plus `missing_source`. The test then checks the
  // order of three trace markers and that the committed page still lists the real source.
  const runtime = await makeRealGitRuntime();
  try {
    const { deps, adapter } = makeDeps(runtime);
    adapter.chunk.mockResolvedValue({
      workUnits: [{ unitKey: 'wiki-page', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] }],
    });
    adapter.finalize = vi.fn(async ({ workdir }) => {
      await mkdir(join(workdir, 'semantic-layer/warehouse'), { recursive: true });
      await mkdir(join(workdir, 'wiki/global'), { recursive: true });
      await writeFile(
        join(workdir, 'semantic-layer/warehouse/mart_account_segments.yaml'),
        'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n',
      );
      await writeFile(
        join(workdir, 'wiki/global/finalized-accounts.md'),
        '---\nsummary: Finalized accounts\nusage_mode: auto\nsl_refs:\n - mart_account_segments\n - missing_source\n---\n\nAccounts use `mart_account_segments.total_contract_arr`.\n',
      );
      return {
        warnings: [],
        errors: [],
        touchedSources: [{ connectionId: 'warehouse', sourceName: 'mart_account_segments' }],
        changedWikiPageKeys: ['finalized-accounts'],
        actions: [
          {
            target: 'sl',
            type: 'created',
            key: 'mart_account_segments',
            detail: 'Finalized accounts',
            targetConnectionId: 'warehouse',
            rawPaths: ['cards/source.json'],
          },
          {
            target: 'wiki',
            type: 'created',
            key: 'finalized-accounts',
            detail: 'Finalized wiki',
            rawPaths: ['cards/source.json'],
          },
        ],
      };
    });
    // Every agent loop is a no-op; all changes in this run come from `finalize`.
    deps.agentRunner.runLoop = vi.fn(async () => ({ stopReason: 'natural' as const })) as never;
    const runner = new IngestBundleRunner(deps);
    await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]);
    await runner.run({
      jobId: 'job-finalization',
      connectionId: 'warehouse',
      sourceKey: 'metabase',
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'upload' },
    });
    const trace = await readFile(
      join(runtime.configDir, '.ktx/ingest-traces/job-finalization/trace.jsonl'),
      'utf-8',
    );

    // `indexOf` returns -1 for a missing marker, and -1 is "less than" any real index.
    // Assert presence first so the ordering checks cannot pass vacuously when an
    // event was never emitted.
    const finalizationAt = trace.indexOf('finalization_committed');
    const slRefRepairAt = trace.indexOf('wiki_sl_refs_repaired');
    const finalGatesAt = trace.indexOf('final_artifact_gates');
    expect(finalizationAt).toBeGreaterThanOrEqual(0);
    expect(slRefRepairAt).toBeGreaterThanOrEqual(0);
    expect(finalGatesAt).toBeGreaterThanOrEqual(0);
    expect(finalizationAt).toBeLessThan(slRefRepairAt);
    expect(slRefRepairAt).toBeLessThan(finalGatesAt);

    await expect(readFile(join(runtime.configDir, 'wiki/global/finalized-accounts.md'), 'utf-8')).resolves.toContain(
      'sl_refs:\n - mart_account_segments',
    );
  } finally {
    await rm(runtime.homeDir, { recursive: true, force: true });
  }
});
it('fails when finalization edits a path already changed earlier in the run', async () => {
  // The work unit creates `wiki/global/orders.md`; the adapter's `finalize` hook then
  // overwrites the same file. The run is expected to reject, naming the overlapping path.
  const runtime = await makeRealGitRuntime();
  try {
    const ordersPagePath = 'wiki/global/orders.md';

    const { deps, adapter } = makeDeps(runtime);
    adapter.chunk.mockResolvedValue({
      workUnits: [
        { unitKey: 'wiki-page', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] },
      ],
    });

    // Remember the session passed to the toolset factory so the fake agent loop can act on it.
    let activeSession: any = null;
    deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
      activeSession = toolSession;
      return { toRuntimeTools: vi.fn(() => ({})) };
    });

    // Agent loop: create and commit the page with the work-unit body.
    deps.agentRunner.runLoop = vi.fn(async () => {
      const sessionRoot = rootOfConfig(activeSession.configService, runtime.configDir);
      await mkdir(join(sessionRoot, 'wiki/global'), { recursive: true });
      await writeFile(
        join(sessionRoot, ordersPagePath),
        '---\nsummary: Orders\nusage_mode: auto\n---\n\nWU body\n',
      );
      activeSession.actions.push({
        target: 'wiki',
        type: 'created',
        key: 'orders',
        detail: 'WU orders',
        rawPaths: ['cards/source.json'],
      });
      await activeSession.gitService.commitFiles([ordersPagePath], 'wu orders', 'KTX Test', 'system@ktx.local');
      return { stopReason: 'natural' as const };
    }) as never;

    // Finalization: rewrite the very same page with a different body.
    adapter.finalize = vi.fn(async ({ workdir }) => {
      await writeFile(
        join(workdir, ordersPagePath),
        '---\nsummary: Orders\nusage_mode: auto\n---\n\nFinalized body\n',
      );
      return {
        warnings: [],
        errors: [],
        touchedSources: [],
        changedWikiPageKeys: ['orders'],
        actions: [{ target: 'wiki', type: 'updated', key: 'orders', detail: 'Conflicting finalization' }],
      };
    });

    const bundleRunner = new IngestBundleRunner(deps);
    await mockStageRawFiles(bundleRunner, runtime, [['cards/source.json', 'h1']]);
    const runRequest = {
      jobId: 'job-finalization-overlap',
      connectionId: 'warehouse',
      sourceKey: 'metabase',
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'upload' },
    };
    await expect(bundleRunner.run(runRequest)).rejects.toThrow(
      /finalization modified path\(s\) already changed earlier in this run: wiki\/global\/orders\.md/,
    );
  } finally {
    await rm(runtime.homeDir, { recursive: true, force: true });
  }
});
it('rejects finalization writes to unauthorized semantic-layer targets', async () => {
  // No work units are produced; the adapter's `finalize` hook alone writes a source
  // under `semantic-layer/other-warehouse` while the run targets `warehouse`.
  const runtime = await makeRealGitRuntime();
  try {
    const { deps, adapter } = makeDeps(runtime);
    adapter.chunk.mockResolvedValue({ workUnits: [] });

    const forbiddenAction = {
      target: 'sl',
      type: 'created',
      key: 'orders',
      targetConnectionId: 'other-warehouse',
      detail: 'Forbidden target',
      rawPaths: ['cards/source.json'],
    };
    adapter.finalize = vi.fn(async ({ workdir }) => {
      await mkdir(join(workdir, 'semantic-layer/other-warehouse'), { recursive: true });
      await writeFile(
        join(workdir, 'semantic-layer/other-warehouse/orders.yaml'),
        'name: orders\ngrain: [order_id]\ncolumns: [{name: order_id, type: string}]\njoins: []\nmeasures: []\n',
      );
      return {
        warnings: [],
        errors: [],
        touchedSources: [{ connectionId: 'other-warehouse', sourceName: 'orders' }],
        changedWikiPageKeys: [],
        actions: [forbiddenAction],
      };
    });

    const bundleRunner = new IngestBundleRunner(deps);
    await mockStageRawFiles(bundleRunner, runtime, [['cards/source.json', 'h1']]);
    const runRequest = {
      jobId: 'job-finalization-target-policy',
      connectionId: 'warehouse',
      sourceKey: 'metabase',
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'upload' },
    };
    await expect(bundleRunner.run(runRequest)).rejects.toThrow(/semantic-layer target connection not allowed/);

    // The trace is checked as raw text for the markers that must appear.
    const traceLog = await readFile(
      join(runtime.configDir, '.ktx/ingest-traces/job-finalization-target-policy/trace.jsonl'),
      'utf-8',
    );
    for (const marker of ['finalization_committed', 'semantic_layer_target_policy', 'ingest_failed']) {
      expect(traceLog).toContain(marker);
    }
  } finally {
    await rm(runtime.homeDir, { recursive: true, force: true });
  }
});
2026-05-18 13:38:06 +02:00
} ) ;