2026-05-10 23:12:26 +02:00
import { mkdir , mkdtemp , rm , writeFile } from 'node:fs/promises' ;
import { tmpdir } from 'node:os' ;
import { join } from 'node:path' ;
2026-05-25 13:17:46 +02:00
import type { AgentRunnerPort } from '../../../src/context/llm/runtime-port.js' ;
import { initKtxProject , type KtxLocalProject , loadKtxProject } from '../../../src/context/project/project.js' ;
2026-05-13 13:43:23 +02:00
import { afterEach , beforeEach , describe , expect , it , vi } from 'vitest' ;
2026-05-25 13:17:46 +02:00
import { FakeSourceAdapter } from '../../../src/context/ingest/adapters/fake/fake.adapter.js' ;
import { createLocalBundleIngestRuntime } from '../../../src/context/ingest/local-bundle-runtime.js' ;
2026-05-10 23:12:26 +02:00
type RuntimeWithConnectionDeps = {
deps : {
connections : {
listEnabledConnections ( ids : string [ ] ) : Promise < Array < { id : string ; name : string ; connectionType : string } > > ;
getConnectionById ( connectionId : string ) : Promise < { id : string ; name : string ; connectionType : string } | null > ;
2026-05-13 13:43:23 +02:00
executeQuery ( connectionId : string , sql : string ) : Promise < unknown > ;
2026-05-10 23:12:26 +02:00
} ;
} ;
} ;
2026-05-18 13:38:06 +02:00
type RuntimeWithSlValidationDeps = {
deps : {
slValidator : {
validateSingleSource (
deps : unknown ,
connectionId : string ,
sourceName : string ,
) : Promise < { errors : string [ ] ; warnings : string [ ] } > ;
} ;
} ;
} ;
type RuntimeWithSettingsDeps = {
deps : {
settings : Record < string , unknown > ;
} ;
} ;
2026-05-16 12:06:34 +02:00
function testAgentRunner ( ) : AgentRunnerPort {
return { runLoop : vi.fn ( ) . mockResolvedValue ( { stopReason : 'natural' as const } ) } ;
}
2026-05-10 23:12:26 +02:00
describe ( 'createLocalBundleIngestRuntime' , ( ) = > {
let tempDir : string ;
2026-05-10 23:51:24 +02:00
let project : KtxLocalProject ;
2026-05-10 23:12:26 +02:00
beforeEach ( async ( ) = > {
2026-05-10 23:51:24 +02:00
tempDir = await mkdtemp ( join ( tmpdir ( ) , 'ktx-local-bundle-runtime-' ) ) ;
2026-05-10 23:12:26 +02:00
const projectDir = join ( tempDir , 'project' ) ;
2026-05-14 17:39:31 +02:00
await initKtxProject ( { projectDir } ) ;
2026-05-10 23:12:26 +02:00
await writeFile (
2026-05-10 23:51:24 +02:00
join ( projectDir , 'ktx.yaml' ) ,
2026-05-10 23:12:26 +02:00
[
'connections:' ,
' warehouse:' ,
' driver: postgres' ,
'ingest:' ,
' adapters:' ,
' - fake' ,
' embeddings:' ,
2026-05-19 16:40:01 +02:00
' backend: none' ,
2026-05-10 23:12:26 +02:00
'' ,
] . join ( '\n' ) ,
'utf-8' ,
) ;
2026-05-10 23:51:24 +02:00
project = await loadKtxProject ( { projectDir } ) ;
2026-05-10 23:12:26 +02:00
} ) ;
afterEach ( async ( ) = > {
await rm ( tempDir , { recursive : true , force : true } ) ;
} ) ;
it ( 'requires an agent runner or configured local ingest LLM' , ( ) = > {
expect ( ( ) = >
createLocalBundleIngestRuntime ( {
project ,
adapters : [ new FakeSourceAdapter ( ) ] ,
} ) ,
2026-05-12 10:26:07 +02:00
) . toThrow (
[
2026-05-16 12:06:34 +02:00
'ktx ingest requires llm.provider.backend: anthropic, vertex, gateway, or claude-code, or an injected agentRunner.' ,
'Configure a local Claude Code session or API-backed LLM, then rerun ingest:' ,
` ktx setup --project-dir ${ project . projectDir } --llm-backend claude-code --no-input ` ,
2026-05-19 19:23:35 +02:00
` ktx setup --project-dir ${ project . projectDir } --llm-backend anthropic --anthropic-api-key-env ANTHROPIC_API_KEY --llm-model claude-sonnet-4-6 --no-input ` ,
2026-05-12 10:26:07 +02:00
] . join ( '\n' ) ,
) ;
2026-05-10 23:12:26 +02:00
} ) ;
2026-05-16 12:06:34 +02:00
it ( 'uses a runtime-backed agent runner when claude-code is configured' , ( ) = > {
const runtime = {
generateText : vi.fn ( ) ,
generateObject : vi.fn ( ) ,
runAgentLoop : vi.fn ( async ( ) = > ( { stopReason : 'natural' as const } ) ) ,
} ;
project . config . llm = {
provider : { backend : 'claude-code' } ,
models : { default : 'sonnet' } ,
promptCaching : { enabled : false } ,
} ;
const createLlmRuntime = vi . fn ( ( ) = > runtime ) ;
const created = createLocalBundleIngestRuntime ( {
project ,
adapters : [ new FakeSourceAdapter ( ) ] ,
createLlmRuntime ,
} ) ;
expect ( created ) . toBeDefined ( ) ;
expect ( createLlmRuntime ) . toHaveBeenCalledWith (
project . config . llm ,
expect . objectContaining ( { projectDir : project.projectDir } ) ,
) ;
} ) ;
2026-05-21 10:38:23 +02:00
it ( 'warns when embeddings are configured but no embedding provider is supplied' , ( ) = > {
const logger = { log : vi.fn ( ) , warn : vi.fn ( ) , error : vi.fn ( ) } ;
project . config . ingest . embeddings = {
backend : 'openai' ,
model : 'text-embedding-3-small' ,
dimensions : 1536 ,
} ;
createLocalBundleIngestRuntime ( {
project ,
adapters : [ new FakeSourceAdapter ( ) ] ,
agentRunner : testAgentRunner ( ) ,
logger : logger as never ,
} ) ;
expect ( logger . warn ) . toHaveBeenCalledWith (
'[local-bundle-runtime] embeddings backend "openai" is configured but no embedding provider was passed; embedding-dependent stages will run against a no-op embedding port.' ,
) ;
} ) ;
2026-05-10 23:12:26 +02:00
it ( 'builds runner deps with local SQLite stores and context tools enabled' , async ( ) = > {
2026-05-16 12:06:34 +02:00
const agentRunner = testAgentRunner ( ) ;
2026-05-10 23:12:26 +02:00
const runtime = createLocalBundleIngestRuntime ( {
project ,
adapters : [ new FakeSourceAdapter ( ) ] ,
agentRunner ,
jobIdFactory : ( ) = > 'job-1' ,
} ) ;
expect ( runtime . nextJobId ( ) ) . toBe ( 'job-1' ) ;
2026-05-10 23:51:24 +02:00
expect ( runtime . storage . resolvePullDir ( 'job-1' ) ) . toBe ( join ( project . projectDir , '.ktx/cache/local-ingest/job-1/pull' ) ) ;
2026-05-10 23:12:26 +02:00
expect ( runtime . storage . resolveUploadDir ( 'job-1' ) ) . toBe (
2026-05-10 23:51:24 +02:00
join ( project . projectDir , '.ktx/cache/local-ingest/job-1/upload' ) ,
2026-05-10 23:12:26 +02:00
) ;
expect ( runtime . storage . resolveTranscriptDir ( 'job-1' ) ) . toBe (
2026-05-10 23:51:24 +02:00
join ( project . projectDir , '.ktx/ingest-transcripts/job-1' ) ,
2026-05-10 23:12:26 +02:00
) ;
await mkdir ( runtime . storage . resolveUploadDir ( 'job-1' ) , { recursive : true } ) ;
} ) ;
it ( 'exposes canonical warehouse connection types to local ingest SL tools' , async ( ) = > {
project . config . connections . warehouse = {
driver : 'postgres' ,
url : 'postgresql://readonly@db.example.test/analytics' ,
} ;
project . config . connections . bq = {
driver : 'bigquery' ,
project_id : 'acme' ,
dataset_id : 'warehouse' ,
} ;
2026-05-16 12:06:34 +02:00
const agentRunner = testAgentRunner ( ) ;
2026-05-10 23:12:26 +02:00
const runtime = createLocalBundleIngestRuntime ( {
project ,
adapters : [ new FakeSourceAdapter ( ) ] ,
agentRunner ,
} ) ;
const connections = ( runtime . runner as unknown as RuntimeWithConnectionDeps ) . deps . connections ;
await expect ( connections . getConnectionById ( 'warehouse' ) ) . resolves . toMatchObject ( {
id : 'warehouse' ,
connectionType : 'POSTGRESQL' ,
} ) ;
await expect ( connections . listEnabledConnections ( [ 'warehouse' , 'bq' ] ) ) . resolves . toEqual ( [
{ id : 'warehouse' , name : 'warehouse' , connectionType : 'POSTGRESQL' } ,
{ id : 'bq' , name : 'bq' , connectionType : 'BIGQUERY' } ,
] ) ;
} ) ;
2026-05-18 13:38:06 +02:00
it ( 'validates manifest-backed scan sources during local ingest gates' , async ( ) = > {
await project . fileStore . writeFile (
'semantic-layer/warehouse/_schema/public.yaml' ,
[
'tables:' ,
' payments:' ,
' table: public.payments' ,
' columns:' ,
' - name: payment_id' ,
' type: string' ,
' - name: amount' ,
' type: number' ,
'' ,
] . join ( '\n' ) ,
'ktx' ,
'ktx@example.com' ,
'Add warehouse manifest' ,
) ;
const agentRunner = testAgentRunner ( ) ;
const runtime = createLocalBundleIngestRuntime ( {
project ,
adapters : [ new FakeSourceAdapter ( ) ] ,
agentRunner ,
} ) ;
const deps = ( runtime . runner as unknown as RuntimeWithSlValidationDeps ) . deps ;
await expect ( deps . slValidator . validateSingleSource ( deps , 'warehouse' , 'payments' ) ) . resolves . toEqual ( {
errors : [ ] ,
warnings : expect.any ( Array ) ,
} ) ;
} ) ;
it ( 'does not mask malformed direct overlays with manifest-backed fallback validation' , async ( ) = > {
await project . fileStore . writeFile (
'semantic-layer/warehouse/_schema/public.yaml' ,
[
'tables:' ,
' payments:' ,
' table: public.payments' ,
' columns:' ,
' - name: payment_id' ,
' type: string' ,
'' ,
] . join ( '\n' ) ,
'ktx' ,
'ktx@example.com' ,
'Add warehouse manifest' ,
) ;
await project . fileStore . writeFile (
'semantic-layer/warehouse/payments.yaml' ,
[ 'name: payments' , 'columns:' , ' - [' , '' ] . join ( '\n' ) ,
'ktx' ,
'ktx@example.com' ,
'Add malformed overlay' ,
) ;
const agentRunner = testAgentRunner ( ) ;
const runtime = createLocalBundleIngestRuntime ( {
project ,
adapters : [ new FakeSourceAdapter ( ) ] ,
agentRunner ,
} ) ;
const deps = ( runtime . runner as unknown as RuntimeWithSlValidationDeps ) . deps ;
await expect ( deps . slValidator . validateSingleSource ( deps , 'warehouse' , 'payments' ) ) . resolves . toEqual ( {
errors : [ expect . stringContaining ( 'invalid YAML' ) ] ,
warnings : [ ] ,
} ) ;
} ) ;
2026-05-13 13:43:23 +02:00
it ( 'passes project connection config to local ingest query executors' , async ( ) = > {
2026-05-16 12:06:34 +02:00
const agentRunner = testAgentRunner ( ) ;
2026-05-13 13:43:23 +02:00
const queryExecutor = {
execute : vi.fn ( async ( ) = > ( {
headers : [ 'answer' ] ,
rows : [ [ 1 ] ] ,
totalRows : 1 ,
command : 'SELECT' ,
rowCount : 1 ,
} ) ) ,
} ;
const runtime = createLocalBundleIngestRuntime ( {
project ,
adapters : [ new FakeSourceAdapter ( ) ] ,
agentRunner ,
queryExecutor ,
} ) ;
const connections = ( runtime . runner as unknown as RuntimeWithConnectionDeps ) . deps . connections ;
await expect ( connections . executeQuery ( 'warehouse' , 'select 1' ) ) . resolves . toMatchObject ( {
headers : [ 'answer' ] ,
} ) ;
expect ( queryExecutor . execute ) . toHaveBeenCalledWith ( {
connectionId : 'warehouse' ,
projectDir : project.projectDir ,
connection : project.config.connections.warehouse ,
sql : 'select 1' ,
} ) ;
} ) ;
2026-05-18 13:38:06 +02:00
it ( 'defaults local bundle ingest to isolated diffs without a shared-worktree fallback setting' , ( ) = > {
const runtime = createLocalBundleIngestRuntime ( {
project ,
adapters : [ new FakeSourceAdapter ( ) ] ,
agentRunner : testAgentRunner ( ) ,
} ) ;
const settings = ( runtime . runner as unknown as RuntimeWithSettingsDeps ) . deps . settings ;
const fallbackSettingKey = [ 'sharedWorktree' , 'SourceKeys' ] . join ( '' ) ;
expect ( settings ) . not . toHaveProperty ( fallbackSettingKey ) ;
expect ( Object . keys ( settings ) . sort ( ) ) . toEqual ( [
'ingestTraceLevel' ,
'memoryIngestionModel' ,
'probeRowCount' ,
'workUnitFailureMode' ,
'workUnitMaxConcurrency' ,
'workUnitStepBudget' ,
] ) ;
} ) ;
2026-05-10 23:12:26 +02:00
it ( 'accepts a debug LLM request file when constructing the default agent runner' , async ( ) = > {
await writeFile (
2026-05-10 23:51:24 +02:00
join ( project . projectDir , 'ktx.yaml' ) ,
2026-05-10 23:12:26 +02:00
[
'connections:' ,
' warehouse:' ,
' driver: postgres' ,
'llm:' ,
' provider:' ,
' backend: gateway' ,
' gateway:' ,
' base_url: https://gateway.example/v1' ,
' models:' ,
' default: anthropic/claude-sonnet-4-6' ,
'ingest:' ,
' adapters:' ,
' - fake' ,
' embeddings:' ,
2026-05-19 16:40:01 +02:00
' backend: none' ,
2026-05-10 23:12:26 +02:00
'' ,
] . join ( '\n' ) ,
'utf-8' ,
) ;
2026-05-10 23:51:24 +02:00
project = await loadKtxProject ( { projectDir : project.projectDir } ) ;
2026-05-10 23:12:26 +02:00
const runtime = createLocalBundleIngestRuntime ( {
project ,
adapters : [ new FakeSourceAdapter ( ) ] ,
2026-05-10 23:51:24 +02:00
llmDebugRequestFile : join ( project . projectDir , '.ktx' , 'llm-debug.jsonl' ) ,
2026-05-10 23:12:26 +02:00
} ) ;
2026-05-10 23:51:24 +02:00
expect ( runtime . storage . resolvePullDir ( 'job-1' ) ) . toBe ( join ( project . projectDir , '.ktx/cache/local-ingest/job-1/pull' ) ) ;
2026-05-10 23:12:26 +02:00
} ) ;
} ) ;