2026-05-10 23:12:26 +02:00
import { access , mkdir , mkdtemp , readFile , rm , writeFile } from 'node:fs/promises' ;
import { tmpdir } from 'node:os' ;
import { join } from 'node:path' ;
test: split cli tests from source tree (#216)
* feat(cli): define full warehouse dialect contract
* test(cli): keep dialect edge tests focused
* fix(cli): stabilize dialect contract foundation
* refactor(connectors): own read-only query preparation
* refactor(connectors): resolve dialects through registry
* refactor(connectors): keep concrete dialect classes internal
* chore(workspace): enforce dialect import boundary
* refactor(cli): resolve relationship dialect at scan boundary
* refactor(cli): use dialect display parsing for entity details
* refactor(cli): use dialect display parsing for warehouse catalog
* refactor(cli): use dialect SQL in relationship workflows
* test(cli): verify solid dialect scan workflow closure
* test: split cli tests from source tree
* refactor(cli): standardize BigQuery scope listing
* feat(sqlite): implement connector scope listing
* test(connectors): cover required table listing
* feat(cli): add warehouse driver registry
* refactor(setup): route scope discovery through driver registry
* refactor(cli): route local query execution through driver registry
* refactor(historic-sql): route dialect support through driver registry
* refactor(cli): test warehouse connections through driver registry
* fix(cli): close driver registry type export gaps
* Improve setup daemon diagnostics
* refactor(setup): centralize rail-prefixed diagnostics + query-history fallback
Extract errorMessage, writePrefixedLines, and flushPrefixedBufferedCommandOutput
into clack.ts so the setup wizard, managed daemons, and embedding/agent steps
share one rail-formatted writer. setup-databases.ts also adds a
"disable query history and retry" option when the schema-context build fails
and query history is the likely culprit, surfaced via a new
failed-query-history-unavailable status.
* fix(cli): carry catalog through the picker so BigQuery/Snowflake/SQL Server scope filters match
The setup picker's KtxTableListEntry was a 2-level { schema, name }, so
qualifiedTableId always wrote db.name into enabled_tables. When BigQuery,
Snowflake, or SQL Server later ran fast ingest, their introspect step filtered
the scope set with scopedTableNames(scope, { catalog: projectId|database, db })
— catalog was non-null on the introspect side but null in the scope refs, so
every entry was rejected, the live-database adapter staged zero table files,
and detect() failed with 'Adapter "live-database" did not recognize fetched
source output'.
Align the picker boundary with the canonical 3-level KtxTableRef:
- Add catalog: string | null to KtxTableListEntry.
- BigQuery/Snowflake/SQL Server listTables populate catalog from the
resolved projectId / database; Postgres/MySQL/ClickHouse/SQLite set null.
- qualifiedTableId emits catalog.schema.name when catalog is non-null
(resolveEnabledTables already accepts the 3-part shape) and
schemasFromEnabledTables now goes through parseDottedTableEntry so it
recovers the schema correctly from both 2-part and 3-part entries.
- Export parseDottedTableEntry from enabled-tables.ts (@internal) for picker
reuse.
Update listTables expectations in all seven connector tests and the setup /
picker test fixtures. Add a picker regression test that covers the
catalog-bearing round-trip (save + refine).
* fix(cli): allow debug telemetry under opt-out env
2026-05-26 08:49:05 +02:00
import { LocalLookerRuntimeStore } from '../src/context/ingest/adapters/looker/local-runtime-store.js' ;
import { LocalMetabaseDiscoveryCache } from '../src/context/ingest/adapters/metabase/local-source-state-store.js' ;
import type { LocalIngestResult , LocalMetabaseFanoutProgress , RunLocalIngestOptions } from '../src/context/ingest/local-ingest.js' ;
import type { SourceAdapter } from '../src/context/ingest/types.js' ;
import { initKtxProject , loadKtxProject } from '../src/context/project/project.js' ;
import { ktxLocalStateDbPath } from '../src/context/project/local-state-db.js' ;
2026-05-10 23:12:26 +02:00
import { afterEach , beforeEach , describe , expect , it , vi } from 'vitest' ;
test: split cli tests from source tree (#216)
* feat(cli): define full warehouse dialect contract
* test(cli): keep dialect edge tests focused
* fix(cli): stabilize dialect contract foundation
* refactor(connectors): own read-only query preparation
* refactor(connectors): resolve dialects through registry
* refactor(connectors): keep concrete dialect classes internal
* chore(workspace): enforce dialect import boundary
* refactor(cli): resolve relationship dialect at scan boundary
* refactor(cli): use dialect display parsing for entity details
* refactor(cli): use dialect display parsing for warehouse catalog
* refactor(cli): use dialect SQL in relationship workflows
* test(cli): verify solid dialect scan workflow closure
* test: split cli tests from source tree
* refactor(cli): standardize BigQuery scope listing
* feat(sqlite): implement connector scope listing
* test(connectors): cover required table listing
* feat(cli): add warehouse driver registry
* refactor(setup): route scope discovery through driver registry
* refactor(cli): route local query execution through driver registry
* refactor(historic-sql): route dialect support through driver registry
* refactor(cli): test warehouse connections through driver registry
* fix(cli): close driver registry type export gaps
* Improve setup daemon diagnostics
* refactor(setup): centralize rail-prefixed diagnostics + query-history fallback
Extract errorMessage, writePrefixedLines, and flushPrefixedBufferedCommandOutput
into clack.ts so the setup wizard, managed daemons, and embedding/agent steps
share one rail-formatted writer. setup-databases.ts also adds a
"disable query history and retry" option when the schema-context build fails
and query history is the likely culprit, surfaced via a new
failed-query-history-unavailable status.
* fix(cli): carry catalog through the picker so BigQuery/Snowflake/SQL Server scope filters match
The setup picker's KtxTableListEntry was a 2-level { schema, name }, so
qualifiedTableId always wrote db.name into enabled_tables. When BigQuery,
Snowflake, or SQL Server later ran fast ingest, their introspect step filtered
the scope set with scopedTableNames(scope, { catalog: projectId|database, db })
— catalog was non-null on the introspect side but null in the scope refs, so
every entry was rejected, the live-database adapter staged zero table files,
and detect() failed with 'Adapter "live-database" did not recognize fetched
source output'.
Align the picker boundary with the canonical 3-level KtxTableRef:
- Add catalog: string | null to KtxTableListEntry.
- BigQuery/Snowflake/SQL Server listTables populate catalog from the
resolved projectId / database; Postgres/MySQL/ClickHouse/SQLite set null.
- qualifiedTableId emits catalog.schema.name when catalog is non-null
(resolveEnabledTables already accepts the 3-part shape) and
schemasFromEnabledTables now goes through parseDottedTableEntry so it
recovers the schema correctly from both 2-part and 3-part entries.
- Export parseDottedTableEntry from enabled-tables.ts (@internal) for picker
reuse.
Update listTables expectations in all seven connector tests and the setup /
picker test fixtures. Add a picker regression test that covers the
catalog-bearing round-trip (save + refine).
* fix(cli): allow debug telemetry under opt-out env
2026-05-26 08:49:05 +02:00
import { type KtxIngestArgs , type KtxIngestDeps , runKtxIngest } from '../src/ingest.js' ;
import type { KtxCliLocalIngestAdaptersOptions } from '../src/local-adapters.js' ;
2026-05-11 09:55:42 +02:00
import {
CliLookerSlWritingAgentRunner ,
CliMetabaseAgentRunner ,
CliMetabaseSourceAdapter ,
completedLocalBundleRun ,
failedLocalBundleRun ,
localFakeBundleReport ,
makeCliLookerParser ,
makeCliLookerRuntimeClient ,
makeIo ,
persistLocalBundleReport ,
runPublicMetabaseSyncModeCase ,
writeMetabaseConfig ,
writeWarehouseConfig ,
} from './ingest.test-utils.js' ;
test: split cli tests from source tree (#216)
* feat(cli): define full warehouse dialect contract
* test(cli): keep dialect edge tests focused
* fix(cli): stabilize dialect contract foundation
* refactor(connectors): own read-only query preparation
* refactor(connectors): resolve dialects through registry
* refactor(connectors): keep concrete dialect classes internal
* chore(workspace): enforce dialect import boundary
* refactor(cli): resolve relationship dialect at scan boundary
* refactor(cli): use dialect display parsing for entity details
* refactor(cli): use dialect display parsing for warehouse catalog
* refactor(cli): use dialect SQL in relationship workflows
* test(cli): verify solid dialect scan workflow closure
* test: split cli tests from source tree
* refactor(cli): standardize BigQuery scope listing
* feat(sqlite): implement connector scope listing
* test(connectors): cover required table listing
* feat(cli): add warehouse driver registry
* refactor(setup): route scope discovery through driver registry
* refactor(cli): route local query execution through driver registry
* refactor(historic-sql): route dialect support through driver registry
* refactor(cli): test warehouse connections through driver registry
* fix(cli): close driver registry type export gaps
* Improve setup daemon diagnostics
* refactor(setup): centralize rail-prefixed diagnostics + query-history fallback
Extract errorMessage, writePrefixedLines, and flushPrefixedBufferedCommandOutput
into clack.ts so the setup wizard, managed daemons, and embedding/agent steps
share one rail-formatted writer. setup-databases.ts also adds a
"disable query history and retry" option when the schema-context build fails
and query history is the likely culprit, surfaced via a new
failed-query-history-unavailable status.
* fix(cli): carry catalog through the picker so BigQuery/Snowflake/SQL Server scope filters match
The setup picker's KtxTableListEntry was a 2-level { schema, name }, so
qualifiedTableId always wrote db.name into enabled_tables. When BigQuery,
Snowflake, or SQL Server later ran fast ingest, their introspect step filtered
the scope set with scopedTableNames(scope, { catalog: projectId|database, db })
— catalog was non-null on the introspect side but null in the scope refs, so
every entry was rejected, the live-database adapter staged zero table files,
and detect() failed with 'Adapter "live-database" did not recognize fetched
source output'.
Align the picker boundary with the canonical 3-level KtxTableRef:
- Add catalog: string | null to KtxTableListEntry.
- BigQuery/Snowflake/SQL Server listTables populate catalog from the
resolved projectId / database; Postgres/MySQL/ClickHouse/SQLite set null.
- qualifiedTableId emits catalog.schema.name when catalog is non-null
(resolveEnabledTables already accepts the 3-part shape) and
schemasFromEnabledTables now goes through parseDottedTableEntry so it
recovers the schema correctly from both 2-part and 3-part entries.
- Export parseDottedTableEntry from enabled-tables.ts (@internal) for picker
reuse.
Update listTables expectations in all seven connector tests and the setup /
picker test fixtures. Add a picker regression test that covers the
catalog-bearing round-trip (save + refine).
* fix(cli): allow debug telemetry under opt-out env
2026-05-26 08:49:05 +02:00
import { resetVizFallbackWarningsForTest } from '../src/viz-fallback.js' ;
import { runKtxSetup } from '../src/setup.js' ;
2026-05-10 23:12:26 +02:00
2026-05-10 23:51:24 +02:00
describe ( 'runKtxIngest' , ( ) = > {
2026-05-10 23:12:26 +02:00
// Scratch directory for the current test; assigned in beforeEach and removed in afterEach.
let tempDir: string;
// TERM as it was before the current test, so afterEach can put it back (or unset it again).
let originalTerm: string | undefined;

/**
 * Copy of the current environment with CI explicitly set to 'false'.
 * NOTE(review): presumably used so CI detection treats the run as interactive — confirm at call sites.
 */
const interactiveEnv = (): NodeJS.ProcessEnv => Object.assign({}, process.env, { CI: 'false' });

/**
 * Stub result for the setup `runtime` dependency: the project at `projectDir` is ready,
 * enables only the 'core' feature, and has no outstanding requirements.
 */
function runtimeReady(projectDir: string) {
  return {
    status: 'ready' as const,
    projectDir,
    requirements: { features: ['core' as const], requirements: [] },
  };
}
2026-05-10 23:12:26 +02:00
beforeEach(async () => {
  // Clear viz-fallback warning state before each test.
  // NOTE(review): presumably module-level state in viz-fallback.ts — confirm.
  resetVizFallbackWarningsForTest();

  // Pin TERM to a known value so terminal-dependent output is stable across hosts,
  // remembering the caller's value for afterEach.
  originalTerm = process.env.TERM;
  process.env.TERM = 'xterm-256color';

  const scratchPrefix = join(tmpdir(), 'ktx-cli-ingest-');
  tempDir = await mkdtemp(scratchPrefix);
});

afterEach(async () => {
  // Restore TERM exactly. If it was absent before the test, remove it rather than
  // assigning undefined, which would store the string "undefined".
  if (originalTerm !== undefined) {
    process.env.TERM = originalTerm;
  } else {
    delete process.env.TERM;
  }
  await rm(tempDir, { recursive: true, force: true });
});
it('runs local ingest and reads status', async () => {
  const jobId = 'cli-local-run-1';
  const projectDir = join(tempDir, 'project');
  await writeWarehouseConfig(projectDir);

  // Minimal on-disk source tree handed to the fake adapter.
  const sourceDir = join(tempDir, 'source');
  const ordersDir = join(sourceDir, 'orders');
  await mkdir(ordersDir, { recursive: true });
  await writeFile(join(ordersDir, 'orders.json'), '{"name":"orders"}\n', 'utf-8');

  // Stub runner: complete the run and persist its report so the later `status` call can find it.
  const runLocal = vi.fn(async (input: RunLocalIngestOptions): Promise<LocalIngestResult> => {
    const completed = completedLocalBundleRun(input, jobId);
    await persistLocalBundleReport(projectDir, completed.report);
    return completed;
  });

  // Summary lines that both `run` and `status` must print for this job.
  const summaryLines = [
    'Report: report-live-1',
    'Run: run-live-1',
    `Job: ${jobId}`,
    'Status: done',
    'Diff: +2/~0/-0/=0',
  ];

  const runIo = makeIo();
  const runExit = runKtxIngest(
    {
      command: 'run',
      projectDir,
      connectionId: 'warehouse',
      adapter: 'fake',
      sourceDir,
      outputMode: 'plain',
    },
    runIo.io,
    { runLocalIngest: runLocal, jobIdFactory: () => jobId },
  );
  await expect(runExit).resolves.toBe(0);

  const runOut = runIo.stdout();
  for (const line of summaryLines) {
    expect(runOut).toContain(line);
  }
  expect(runOut).toContain('Saved memory: 1 wiki, 1 SL');

  // `status` reads back the persisted report by job id.
  const statusIo = makeIo();
  const statusExit = runKtxIngest({ command: 'status', projectDir, runId: jobId, outputMode: 'plain' }, statusIo.io);
  await expect(statusExit).resolves.toBe(0);

  const statusOut = statusIo.stdout();
  for (const line of summaryLines) {
    expect(statusOut).toContain(line);
  }
  expect(statusIo.stderr()).toBe('');
});
2026-05-14 01:43:06 +02:00
/**
 * Plain `status` output must show internally-keyed sources under a user-facing
 * label and must never leak the internal adapter key. There is one case per
 * internal source. `$kind` keeps the original per-case test names.
 *
 * The report is served by a stubbed `readReportFile`, so `reportFile` is never
 * read. It is placed under the per-test tempDir rather than a hard-coded POSIX
 * `/tmp` path, so the test does not assume that directory exists.
 */
it.each([
  {
    kind: 'database',
    jobId: 'scan-job-1',
    reportId: 'report-scan-1',
    runId: 'run-scan-1',
    sourceKey: 'live-database',
    reportFileName: 'scan-report.json',
    label: 'Database schema',
  },
  {
    kind: 'query-history',
    jobId: 'query-history-job-1',
    reportId: 'report-query-history-1',
    runId: 'run-query-history-1',
    sourceKey: 'historic-sql',
    reportFileName: 'query-history-report.json',
    label: 'Query history',
  },
] as const)(
  'labels internal $kind reports without adapter names in plain status output',
  async ({ jobId, reportId, runId, sourceKey, reportFileName, label }) => {
    const projectDir = join(tempDir, 'project');
    await writeWarehouseConfig(projectDir);
    const report = localFakeBundleReport(jobId, {
      id: reportId,
      runId,
      connectionId: 'warehouse',
      sourceKey,
    });
    const io = makeIo();
    await expect(
      runKtxIngest(
        {
          command: 'status',
          projectDir,
          reportFile: join(tempDir, reportFileName),
          outputMode: 'plain',
        },
        io.io,
        {
          readReportFile: vi.fn(async () => report),
        },
      ),
    ).resolves.toBe(0);
    expect(io.stdout()).toContain(`Source: ${label}\n`);
    expect(io.stdout()).not.toContain('Adapter:');
    // The internal adapter key must not appear anywhere in user-facing output.
    expect(io.stdout()).not.toContain(sourceKey);
    expect(io.stderr()).toBe('');
  },
);
2026-05-13 17:01:48 +02:00
it('emits structured progress for non-TTY local ingest runs', async () => {
  type ProgressUpdate = { percent: number; message: string; transient?: boolean };
  const jobId = 'cli-local-progress-1';
  const projectDir = join(tempDir, 'project');
  await writeWarehouseConfig(projectDir);

  const seen: ProgressUpdate[] = [];
  // Stub runner replays a fixed memory-flow sequence:
  // 2 files fetched, 2 work units planned, and one unit ('orders') in flight after 2 tool calls.
  const runLocal = vi.fn(async (input: RunLocalIngestOptions): Promise<LocalIngestResult> => {
    input.memoryFlow?.emit({ type: 'source_acquired', adapter: 'fake', trigger: 'manual_resync', fileCount: 2 });
    input.memoryFlow?.emit({ type: 'chunks_planned', chunkCount: 2, workUnitCount: 2, evictionCount: 0 });
    input.memoryFlow?.emit({ type: 'work_unit_started', unitKey: 'orders', skills: [] });
    input.memoryFlow?.emit({ type: 'work_unit_step', unitKey: 'orders', toolCalls: 2 });
    return completedLocalBundleRun(input, jobId);
  });

  const io = makeIo();
  const exitCode = runKtxIngest(
    {
      command: 'run',
      projectDir,
      connectionId: 'warehouse',
      adapter: 'fake',
      outputMode: 'plain',
    },
    io.io,
    {
      runLocalIngest: runLocal,
      jobIdFactory: () => jobId,
      progress: (event) => seen.push(event),
    },
  );
  await expect(exitCode).resolves.toBe(0);

  const expectedUpdates = [
    { percent: 5, message: 'Fetching source files for warehouse/fake' },
    { percent: 15, message: 'Fetched 2 source files from fake' },
    { percent: 45, message: 'Planned 2 tasks' },
    // The in-flight line is transient; its percent is not pinned.
    expect.objectContaining({
      message: 'Processing tasks: 0/2 complete, 1 active; latest orders · 2 actions',
      transient: true,
    }),
  ];
  expect(seen).toEqual(expect.arrayContaining(expectedUpdates));
  // The 15% step goes through the progress callback and is not echoed to stderr.
  expect(io.stderr()).not.toContain('[15%] Fetched 2 source files from fake');
});
it('describes zero-work-unit ingest progress as finalizing instead of appearing half-planned', async () => {
  const jobId = 'cli-local-zero-progress-1';
  const projectDir = join(tempDir, 'project');
  await writeWarehouseConfig(projectDir);

  const updates: Array<{ percent: number; message: string; transient?: boolean }> = [];
  // Stub runner: the source is fetched, but planning yields zero chunks and zero work units.
  const runLocal = vi.fn(async (input: RunLocalIngestOptions): Promise<LocalIngestResult> => {
    input.memoryFlow?.emit({ type: 'source_acquired', adapter: 'fake', trigger: 'manual_resync', fileCount: 2 });
    input.memoryFlow?.emit({ type: 'chunks_planned', chunkCount: 0, workUnitCount: 0, evictionCount: 0 });
    return completedLocalBundleRun(input, jobId);
  });

  const exitCode = runKtxIngest(
    {
      command: 'run',
      projectDir,
      connectionId: 'warehouse',
      adapter: 'fake',
      outputMode: 'plain',
    },
    makeIo().io,
    {
      runLocalIngest: runLocal,
      jobIdFactory: () => jobId,
      progress: (event) => updates.push(event),
    },
  );
  await expect(exitCode).resolves.toBe(0);

  expect(updates).toEqual(
    expect.arrayContaining([{ percent: 80, message: 'No tasks to process; finalizing ingest' }]),
  );
  // With nothing to plan, the generic "Planned N tasks" step must not be reported.
  expect(updates).not.toContainEqual({ percent: 45, message: 'Planned 0 tasks' });
});
2026-05-13 12:00:08 +02:00
it ( 'prints provider setup guidance when a skip-llm setup project runs ingest' , async ( ) = > {
2026-05-12 10:26:07 +02:00
const projectDir = join ( tempDir , 'project' ) ;
const setupIo = makeIo ( ) ;
await expect (
runKtxSetup (
{
command : 'run' ,
projectDir ,
2026-05-19 19:23:35 +02:00
mode : 'auto' ,
2026-05-12 10:26:07 +02:00
agents : false ,
agentScope : 'project' ,
skipAgents : true ,
inputMode : 'disabled' ,
yes : true ,
cliVersion : '0.0.0-test' ,
skipLlm : true ,
skipEmbeddings : true ,
databaseDrivers : [ 'postgres' ] ,
databaseConnectionId : 'warehouse' ,
databaseUrl : 'env:WAREHOUSE_URL' ,
2026-06-10 14:22:22 +02:00
databaseSchemas : [ 'public' ] ,
2026-05-14 01:43:06 +02:00
enableQueryHistory : true ,
2026-05-12 10:26:07 +02:00
skipDatabases : false ,
skipSources : true ,
} ,
setupIo . io ,
{
databasesDeps : {
testConnection : async ( _projectDir , _connectionId , io ) = > {
2026-05-14 16:21:18 +02:00
io . stdout . write ( 'Driver: postgres\nStatus: ok\n' ) ;
2026-05-12 10:26:07 +02:00
return 0 ;
} ,
scanConnection : async ( ) = > 0 ,
2026-05-24 19:30:06 +02:00
historicSqlReadinessProbe : async ( ) = > ( {
ok : true ,
dialect : 'postgres' ,
runner : {
dialect : 'postgres' ,
catalogName : 'pg_stat_statements' ,
async run() {
return { warnings : [ ] , info : [ ] } ;
} ,
formatSuccessDetail() {
return {
detail : 'pg_stat_statements ready (PostgreSQL 16.4)' ,
warnings : [ ] ,
} ;
} ,
fixAdvice() {
return {
failHeadline : 'pg_stat_statements unavailable' ,
remediation : 'Fix query-history grants.' ,
} ;
} ,
} ,
result : { pgServerVersion : 'PostgreSQL 16.4' , warnings : [ ] , info : [ ] } ,
} ) ,
2026-05-12 10:26:07 +02:00
} ,
context : async ( ) = > ( { status : 'skipped' , projectDir } ) ,
2026-05-17 10:27:29 +02:00
runtime : async ( ) = > runtimeReady ( projectDir ) ,
2026-05-12 10:26:07 +02:00
} ,
) ,
) . resolves . toBe ( 0 ) ;
const sourceDir = join ( tempDir , 'source' ) ;
await mkdir ( join ( sourceDir , 'orders' ) , { recursive : true } ) ;
await writeFile ( join ( sourceDir , 'orders' , 'orders.json' ) , '{"name":"orders"}\n' , 'utf-8' ) ;
const runIo = makeIo ( ) ;
await expect (
runKtxIngest (
{
command : 'run' ,
projectDir ,
connectionId : 'warehouse' ,
adapter : 'historic-sql' ,
sourceDir ,
2026-05-14 01:43:06 +02:00
allowImplicitAdapter : true ,
2026-05-12 10:26:07 +02:00
outputMode : 'plain' ,
} ,
runIo . io ,
) ,
) . resolves . toBe ( 1 ) ;
expect ( runIo . stdout ( ) ) . toBe ( '' ) ;
expect ( runIo . stderr ( ) ) . toContain (
feat: add codex llm backend for ktx runtime work (#253)
* feat: add codex sdk runner foundation
* feat: parse codex runtime events
* feat: expose codex runtime mcp tools
* feat: add codex llm runtime
* feat: wire codex llm backend
* test: avoid Array.fromAsync in codex runner test
* docs: document codex llm backend
* fix: tighten codex runtime config ownership
* fix: use codex sdk env and thread options
* fix: parse codex sdk event shapes
* test: add codex backend live smoke
* docs: clarify codex backend isolation
* fix: drive codex loop metrics from mcp events
* fix: enforce codex local step budget
* docs: disclose codex isolation limits
* fix: count all codex agent steps and stream step callbacks live
The agent-loop step budget only counted completed mcp_tool_call items, so
built-in command_execution steps (which the public Codex SDK/CLI surface can
still expose) never decremented the budget, letting ingest/reconciliation run
past stepBudget until Codex stopped on its own. onStepFinish was also replayed
only after the whole stream drained, so live work_unit_step / reconciliation
progress appeared stuck until the Codex process exited.
collectEvents is now the single live step accumulator: it counts every
completed agent-action item via a shared isCompletedAgentStep predicate
(command_execution, mcp_tool_call, file_change, web_search), fires onStepFinish
as each step completes, and enforces the budget on that broader count. A
no-tool turn still counts as one step. toolFailures stays MCP-specific, since a
non-zero command exit is normal agent exploration, not a loop failure.
* test: align ingest llm-guard assertions with codex backend
The skip-llm ingest guard message now lists codex as a valid backend and
mentions a Claude Code/Codex session plus a codex setup hint, but this slow
suite test still asserted the pre-codex wording. Update it to match the
production message (already covered by the local-bundle-runtime unit test) and
add the codex setup-line assertion.
* fix: treat codex error:null tool calls as success
The Codex SDK serializes error: null on successful mcp_tool_call items, so
the failure check (item.error !== undefined) flagged every successful tool
call as failed with the empty-payload default "Codex turn failed". This
killed every ingest work unit under the codex backend before it could
produce a patch.
Key on status === 'failed' (authoritative, always set) and only treat a
populated error object as a failure. Add a regression test built from a
verbatim real-SDK event capture.
* fix: default codex backend to gpt-5.5 and report real probe errors
The previous default gpt-5.3-codex is an API-key-only model that the OpenAI
API rejects under ChatGPT-account (subscription) auth, so codex status/setup
failed with a misleading "authentication is not usable" message even though
auth was fine.
- Default codex model is now gpt-5.5 (works on both subscription and API-key
auth); the curated setup picker offers gpt-5.5 / gpt-5.4 / gpt-5.4-mini and
keeps free-form entry for account-specific ids (e.g. gpt-5.3-codex-spark).
- runCodexAuthProbe now distinguishes "model not available" from an auth
failure and surfaces the real API error: collectEvents retains stream
events when the SDK throws on a non-zero exit, and the API error JSON
envelope is unwrapped to its human-readable message.
- The Codex isolation warning now renders inside the clack setup frame.
- Docs updated to gpt-5.5 with a note that *-codex ids require API-key auth.
* fix: require llm.models.default in status and match codex probe remediation
Status reported a project ready when a non-none LLM backend was configured
without llm.models.default, but the runtime (resolveModelSlots) hard-requires
it, so ingest/scan/memory threw after `ktx status` said the project was usable.
buildLlmStatus now fails for any non-none backend missing models.default and no
longer invents a fallback model for claude-code/codex.
Codex probe failures now carry a category-matched fix: a model-access failure
steers the user at llm.models.default instead of the auth/install remediation.
runCodexAuthProbe returns the fix and status consumes it; the message stays
self-sufficient so setup output is unchanged.
Docs: README now lists the codex backend and local Codex auth; ktx-setup.mdx
states --llm-model only accepts codex/default or gpt-*/codex-* ids.
Repaired four doctor fixtures that configured a backend without models.default
(the now-correctly-blocked config) and added coverage for the new behavior.
2026-06-02 13:57:11 +02:00
'ktx ingest requires llm.provider.backend: anthropic, vertex, gateway, claude-code, or codex, or an injected agentRunner.' ,
2026-05-12 10:26:07 +02:00
) ;
feat: add codex llm backend for ktx runtime work (#253)
* feat: add codex sdk runner foundation
* feat: parse codex runtime events
* feat: expose codex runtime mcp tools
* feat: add codex llm runtime
* feat: wire codex llm backend
* test: avoid Array.fromAsync in codex runner test
* docs: document codex llm backend
* fix: tighten codex runtime config ownership
* fix: use codex sdk env and thread options
* fix: parse codex sdk event shapes
* test: add codex backend live smoke
* docs: clarify codex backend isolation
* fix: drive codex loop metrics from mcp events
* fix: enforce codex local step budget
* docs: disclose codex isolation limits
* fix: count all codex agent steps and stream step callbacks live
The agent-loop step budget only counted completed mcp_tool_call items, so
built-in command_execution steps (which the public Codex SDK/CLI surface can
still expose) never decremented the budget, letting ingest/reconciliation run
past stepBudget until Codex stopped on its own. onStepFinish was also replayed
only after the whole stream drained, so live work_unit_step / reconciliation
progress appeared stuck until the Codex process exited.
collectEvents is now the single live step accumulator: it counts every
completed agent-action item via a shared isCompletedAgentStep predicate
(command_execution, mcp_tool_call, file_change, web_search), fires onStepFinish
as each step completes, and enforces the budget on that broader count. A
no-tool turn still counts as one step. toolFailures stays MCP-specific, since a
non-zero command exit is normal agent exploration, not a loop failure.
* test: align ingest llm-guard assertions with codex backend
The skip-llm ingest guard message now lists codex as a valid backend and
mentions a Claude Code/Codex session plus a codex setup hint, but this slow
suite test still asserted the pre-codex wording. Update it to match the
production message (already covered by the local-bundle-runtime unit test) and
add the codex setup-line assertion.
* fix: treat codex error:null tool calls as success
The Codex SDK serializes error: null on successful mcp_tool_call items, so
the failure check (item.error !== undefined) flagged every successful tool
call as failed with the empty-payload default "Codex turn failed". This
killed every ingest work unit under the codex backend before it could
produce a patch.
Key on status === 'failed' (authoritative, always set) and only treat a
populated error object as a failure. Add a regression test built from a
verbatim real-SDK event capture.
* fix: default codex backend to gpt-5.5 and report real probe errors
The previous default gpt-5.3-codex is an API-key-only model that the OpenAI
API rejects under ChatGPT-account (subscription) auth, so codex status/setup
failed with a misleading "authentication is not usable" message even though
auth was fine.
- Default codex model is now gpt-5.5 (works on both subscription and API-key
auth); the curated setup picker offers gpt-5.5 / gpt-5.4 / gpt-5.4-mini and
keeps free-form entry for account-specific ids (e.g. gpt-5.3-codex-spark).
- runCodexAuthProbe now distinguishes "model not available" from an auth
failure and surfaces the real API error: collectEvents retains stream
events when the SDK throws on a non-zero exit, and the API error JSON
envelope is unwrapped to its human-readable message.
- The Codex isolation warning now renders inside the clack setup frame.
- Docs updated to gpt-5.5 with a note that *-codex ids require API-key auth.
* fix: require llm.models.default in status and match codex probe remediation
Status reported a project ready when a non-none LLM backend was configured
without llm.models.default, but the runtime (resolveModelSlots) hard-requires
it, so ingest/scan/memory threw after `ktx status` said the project was usable.
buildLlmStatus now fails for any non-none backend missing models.default and no
longer invents a fallback model for claude-code/codex.
Codex probe failures now carry a category-matched fix: a model-access failure
steers the user at llm.models.default instead of the auth/install remediation.
runCodexAuthProbe returns the fix and status consumes it; the message stays
self-sufficient so setup output is unchanged.
Docs: README now lists the codex backend and local Codex auth; ktx-setup.mdx
states --llm-model only accepts codex/default or gpt-*/codex-* ids.
Repaired four doctor fixtures that configured a backend without models.default
(the now-correctly-blocked config) and added coverage for the new behavior.
2026-06-02 13:57:11 +02:00
expect ( runIo . stderr ( ) ) . toContain ( 'Configure a local Claude Code/Codex session or API-backed LLM, then rerun ingest:' ) ;
2026-05-16 12:06:34 +02:00
expect ( runIo . stderr ( ) ) . toContain ( ` ktx setup --project-dir ${ projectDir } --llm-backend claude-code --no-input ` ) ;
2026-06-08 15:30:48 +02:00
expect ( runIo . stderr ( ) ) . toContain ( ` ktx setup --project-dir ${ projectDir } --llm-backend codex --no-input ` ) ;
feat: add codex llm backend for ktx runtime work (#253)
* feat: add codex sdk runner foundation
* feat: parse codex runtime events
* feat: expose codex runtime mcp tools
* feat: add codex llm runtime
* feat: wire codex llm backend
* test: avoid Array.fromAsync in codex runner test
* docs: document codex llm backend
* fix: tighten codex runtime config ownership
* fix: use codex sdk env and thread options
* fix: parse codex sdk event shapes
* test: add codex backend live smoke
* docs: clarify codex backend isolation
* fix: drive codex loop metrics from mcp events
* fix: enforce codex local step budget
* docs: disclose codex isolation limits
* fix: count all codex agent steps and stream step callbacks live
The agent-loop step budget only counted completed mcp_tool_call items, so
built-in command_execution steps (which the public Codex SDK/CLI surface can
still expose) never decremented the budget, letting ingest/reconciliation run
past stepBudget until Codex stopped on its own. onStepFinish was also replayed
only after the whole stream drained, so live work_unit_step / reconciliation
progress appeared stuck until the Codex process exited.
collectEvents is now the single live step accumulator: it counts every
completed agent-action item via a shared isCompletedAgentStep predicate
(command_execution, mcp_tool_call, file_change, web_search), fires onStepFinish
as each step completes, and enforces the budget on that broader count. A
no-tool turn still counts as one step. toolFailures stays MCP-specific, since a
non-zero command exit is normal agent exploration, not a loop failure.
* test: align ingest llm-guard assertions with codex backend
The skip-llm ingest guard message now lists codex as a valid backend and
mentions a Claude Code/Codex session plus a codex setup hint, but this slow
suite test still asserted the pre-codex wording. Update it to match the
production message (already covered by the local-bundle-runtime unit test) and
add the codex setup-line assertion.
* fix: treat codex error:null tool calls as success
The Codex SDK serializes error: null on successful mcp_tool_call items, so
the failure check (item.error !== undefined) flagged every successful tool
call as failed with the empty-payload default "Codex turn failed". This
killed every ingest work unit under the codex backend before it could
produce a patch.
Key on status === 'failed' (authoritative, always set) and only treat a
populated error object as a failure. Add a regression test built from a
verbatim real-SDK event capture.
* fix: default codex backend to gpt-5.5 and report real probe errors
The previous default gpt-5.3-codex is an API-key-only model that the OpenAI
API rejects under ChatGPT-account (subscription) auth, so codex status/setup
failed with a misleading "authentication is not usable" message even though
auth was fine.
- Default codex model is now gpt-5.5 (works on both subscription and API-key
auth); the curated setup picker offers gpt-5.5 / gpt-5.4 / gpt-5.4-mini and
keeps free-form entry for account-specific ids (e.g. gpt-5.3-codex-spark).
- runCodexAuthProbe now distinguishes "model not available" from an auth
failure and surfaces the real API error: collectEvents retains stream
events when the SDK throws on a non-zero exit, and the API error JSON
envelope is unwrapped to its human-readable message.
- The Codex isolation warning now renders inside the clack setup frame.
- Docs updated to gpt-5.5 with a note that *-codex ids require API-key auth.
* fix: require llm.models.default in status and match codex probe remediation
Status reported a project ready when a non-none LLM backend was configured
without llm.models.default, but the runtime (resolveModelSlots) hard-requires
it, so ingest/scan/memory threw after `ktx status` said the project was usable.
buildLlmStatus now fails for any non-none backend missing models.default and no
longer invents a fallback model for claude-code/codex.
Codex probe failures now carry a category-matched fix: a model-access failure
steers the user at llm.models.default instead of the auth/install remediation.
runCodexAuthProbe returns the fix and status consumes it; the message stays
self-sufficient so setup output is unchanged.
Docs: README now lists the codex backend and local Codex auth; ktx-setup.mdx
states --llm-model only accepts codex/default or gpt-*/codex-* ids.
Repaired four doctor fixtures that configured a backend without models.default
(the now-correctly-blocked config) and added coverage for the new behavior.
2026-06-02 13:57:11 +02:00
expect ( runIo . stderr ( ) ) . toContain (
2026-06-08 15:30:48 +02:00
` ktx setup --project-dir ${ projectDir } --llm-backend anthropic --anthropic-api-key-env ANTHROPIC_API_KEY --no-input ` ,
2026-05-12 10:26:07 +02:00
) ;
} ) ;
2026-05-25 11:09:33 -04:00
it('routes metabase scheduled pulls to the fanout runner and prints child summaries', async () => {
  // A stubbed fanout runner reports a single successful child. The plain-text
  // summary must name that child on stdout, while the progress header is
  // written to stderr.
  const dir = join(tempDir, 'project');
  await writeMetabaseConfig(dir);
  const cli = makeIo();
  const childJobId = 'metabase-child-1';
  const childReport = localFakeBundleReport(childJobId, {
    id: 'report-metabase-child-1',
    runId: 'run-a',
    jobId: childJobId,
    connectionId: 'warehouse_a',
    sourceKey: 'metabase',
  });

  const run = runKtxIngest(
    { command: 'run', projectDir: dir, connectionId: 'prod-metabase', adapter: 'metabase', outputMode: 'plain' },
    cli.io,
    {
      runLocalMetabaseIngest: async () => ({
        metabaseConnectionId: 'prod-metabase',
        status: 'all_succeeded',
        totals: { workUnits: 2, failedWorkUnits: 0 },
        children: [
          {
            jobId: childJobId,
            metabaseConnectionId: 'prod-metabase',
            metabaseDatabaseId: 1,
            targetConnectionId: 'warehouse_a',
            result: {
              jobId: childJobId,
              runId: 'run-a',
              syncId: 'sync-a',
              diffSummary: { added: 0, modified: 0, deleted: 0, unchanged: 0 },
              workUnitCount: 1,
              failedWorkUnits: [],
              artifactsWritten: 0,
              commitSha: null,
            },
            report: childReport,
          },
        ],
      }),
    },
  );
  await expect(run).resolves.toBe(0);

  const stdout = cli.stdout();
  for (const expected of ['Metabase fanout: all_succeeded', 'warehouse_a', childJobId]) {
    expect(stdout).toContain(expected);
  }
  expect(cli.stderr()).toContain('Metabase ingest: prod-metabase');
});
2026-05-30 00:42:59 +02:00
it('returns a non-zero code when a Metabase fanout child fully fails', async () => {
  // Every work unit of the only child fails, so the fanout is all_failed and
  // the CLI must exit 1 and mark the child line as status=error.
  const dir = join(tempDir, 'project');
  await writeMetabaseConfig(dir);
  const cli = makeIo();
  const childJobId = 'metabase-child-1';
  const failedUnit = 'metabase-db-1';
  const childReport = localFakeBundleReport(childJobId, {
    id: 'report-metabase-child-1',
    runId: 'run-a',
    jobId: childJobId,
    connectionId: 'warehouse_a',
    sourceKey: 'metabase',
    body: {
      failedWorkUnits: [failedUnit],
      workUnits: [
        {
          unitKey: failedUnit,
          rawFiles: ['cards/1.json'],
          status: 'failed',
          reason: 'tool write failed',
          actions: [],
          touchedSlSources: [],
        },
      ],
    },
  });

  const run = runKtxIngest(
    { command: 'run', projectDir: dir, connectionId: 'prod-metabase', adapter: 'metabase', outputMode: 'plain' },
    cli.io,
    {
      runLocalMetabaseIngest: async () => ({
        metabaseConnectionId: 'prod-metabase',
        status: 'all_failed',
        totals: { workUnits: 1, failedWorkUnits: 1 },
        children: [
          {
            jobId: childJobId,
            metabaseConnectionId: 'prod-metabase',
            metabaseDatabaseId: 1,
            targetConnectionId: 'warehouse_a',
            result: {
              jobId: childJobId,
              runId: 'run-a',
              syncId: 'sync-a',
              diffSummary: { added: 0, modified: 0, deleted: 0, unchanged: 0 },
              workUnitCount: 1,
              failedWorkUnits: [failedUnit],
              artifactsWritten: 0,
              commitSha: null,
            },
            report: childReport,
          },
        ],
      }),
    },
  );
  await expect(run).resolves.toBe(1);

  const stdout = cli.stdout();
  expect(stdout).toContain('Metabase fanout: all_failed');
  expect(stdout).toContain('status=error');
});
it('exits 0 and reports status=partial when a Metabase child saved memory despite a failure', async () => {
  // One work unit succeeds (writing an SL update) and one fails. Because
  // memory was saved, the run is partial rather than an error and exits 0.
  const dir = join(tempDir, 'project');
  await writeMetabaseConfig(dir);
  const cli = makeIo();
  const childJobId = 'metabase-child-1';
  const childReport = localFakeBundleReport(childJobId, {
    id: 'report-metabase-child-1',
    runId: 'run-a',
    jobId: childJobId,
    connectionId: 'warehouse_a',
    sourceKey: 'metabase',
    body: {
      failedWorkUnits: ['metabase-db-2'],
      workUnits: [
        {
          unitKey: 'metabase-db-1',
          rawFiles: ['cards/1.json'],
          status: 'success',
          actions: [{ target: 'sl', type: 'updated', key: 'warehouse.orders', detail: 'measure' }],
          touchedSlSources: [],
        },
        {
          unitKey: 'metabase-db-2',
          rawFiles: ['cards/2.json'],
          status: 'failed',
          reason: 'bad SQL',
          actions: [],
          touchedSlSources: [],
        },
      ],
    },
  });

  const run = runKtxIngest(
    { command: 'run', projectDir: dir, connectionId: 'prod-metabase', adapter: 'metabase', outputMode: 'plain' },
    cli.io,
    {
      runLocalMetabaseIngest: async () => ({
        metabaseConnectionId: 'prod-metabase',
        status: 'partial_failure',
        totals: { workUnits: 2, failedWorkUnits: 1 },
        children: [
          {
            jobId: childJobId,
            metabaseConnectionId: 'prod-metabase',
            metabaseDatabaseId: 1,
            targetConnectionId: 'warehouse_a',
            result: {
              jobId: childJobId,
              runId: 'run-a',
              syncId: 'sync-a',
              diffSummary: { added: 1, modified: 0, deleted: 0, unchanged: 0 },
              workUnitCount: 2,
              failedWorkUnits: ['metabase-db-2'],
              artifactsWritten: 1,
              commitSha: 'abc',
            },
            report: childReport,
          },
        ],
      }),
    },
  );
  await expect(run).resolves.toBe(0);

  const stdout = cli.stdout();
  expect(stdout).toContain('Metabase fanout: partial_failure');
  expect(stdout).toContain('status=partial');
  expect(cli.stderr()).toContain('Metabase ingest: prod-metabase');
});
2026-05-25 11:09:33 -04:00
it('prints Metabase fanout progress before the final summary', async () => {
  // The stub fires the planned / started / completed progress hooks. Each
  // event must appear on stderr, and only the final summary may reach stdout.
  const dir = join(tempDir, 'project');
  await writeMetabaseConfig(dir);
  const cli = makeIo();
  const childReport = localFakeBundleReport('metabase-child-1', {
    id: 'report-metabase-child-1',
    runId: 'run-a',
    jobId: 'metabase-child-1',
    connectionId: 'warehouse_a',
    sourceKey: 'metabase',
  });

  const run = runKtxIngest(
    { command: 'run', projectDir: dir, connectionId: 'prod-metabase', adapter: 'metabase', outputMode: 'plain' },
    cli.io,
    {
      runLocalMetabaseIngest: async (input) => {
        const hooks = (input as { progress?: LocalMetabaseFanoutProgress }).progress;
        // Identity of the single mapped child, shared by the started/completed events.
        const child = {
          metabaseConnectionId: 'prod-metabase',
          metabaseDatabaseId: 1,
          targetConnectionId: 'warehouse_a',
          jobId: 'metabase-child-1',
        };
        hooks?.onMetabaseFanoutPlanned?.({
          metabaseConnectionId: 'prod-metabase',
          children: [{ metabaseDatabaseId: 1, targetConnectionId: 'warehouse_a' }],
        });
        hooks?.onMetabaseChildStarted?.({ ...child });
        hooks?.onMetabaseChildCompleted?.({ ...child, status: 'done' });
        return {
          metabaseConnectionId: 'prod-metabase',
          status: 'all_succeeded',
          totals: { workUnits: 2, failedWorkUnits: 0 },
          children: [
            {
              jobId: 'metabase-child-1',
              metabaseConnectionId: 'prod-metabase',
              metabaseDatabaseId: 1,
              targetConnectionId: 'warehouse_a',
              result: {
                jobId: 'metabase-child-1',
                runId: 'run-a',
                syncId: 'sync-a',
                diffSummary: { added: 0, modified: 0, deleted: 0, unchanged: 0 },
                workUnitCount: 1,
                failedWorkUnits: [],
                artifactsWritten: 0,
                commitSha: null,
              },
              report: childReport,
            },
          ],
        };
      },
    },
  );
  await expect(run).resolves.toBe(0);

  const stderr = cli.stderr();
  const stdout = cli.stdout();
  for (const line of [
    'Metabase ingest: prod-metabase',
    'Targets: 1 mapped database',
    '- database=1 target=warehouse_a status=running job=metabase-child-1',
    '- database=1 target=warehouse_a status=done job=metabase-child-1',
  ]) {
    expect(stderr).toContain(line);
  }
  expect(stdout).toContain('Metabase fanout: all_succeeded');
  expect(stdout).not.toContain('status=running job=metabase-child-1');
});
2026-05-25 11:09:33 -04:00
it('writes metabase fanout progress to stderr and final result to stdout', async () => {
  // With a TTY attached, the "running" child line is live progress: it goes
  // to stderr and must never leak into the stdout result.
  const dir = join(tempDir, 'project');
  await writeMetabaseConfig(dir);
  const cli = makeIo({ isTTY: true });

  const run = runKtxIngest(
    { command: 'run', projectDir: dir, connectionId: 'prod-metabase', adapter: 'metabase', outputMode: 'plain' },
    cli.io,
    {
      runLocalMetabaseIngest: async (input) => {
        const hooks = input.progress;
        hooks?.onMetabaseFanoutPlanned?.({
          metabaseConnectionId: 'prod-metabase',
          children: [{ metabaseDatabaseId: 1, targetConnectionId: 'warehouse_a' }],
        });
        hooks?.onMetabaseChildStarted?.({
          metabaseConnectionId: 'prod-metabase',
          metabaseDatabaseId: 1,
          targetConnectionId: 'warehouse_a',
          jobId: 'metabase-child-1',
        });
        return {
          metabaseConnectionId: 'prod-metabase',
          status: 'all_succeeded',
          totals: { workUnits: 0, failedWorkUnits: 0 },
          children: [],
        };
      },
    },
  );
  await expect(run).resolves.toBe(0);

  const runningLine = 'status=running job=metabase-child-1';
  const stderr = cli.stderr();
  expect(stderr).toContain('Metabase ingest: prod-metabase');
  expect(stderr).toContain(runningLine);
  const stdout = cli.stdout();
  expect(stdout).toContain('Metabase fanout: all_succeeded');
  expect(stdout).not.toContain(runningLine);
});
2026-05-25 11:09:33 -04:00
it('emits structured progress for Metabase fanout without writing progress to JSON output', async () => {
  // In JSON mode, fanout progress flows only through the structured progress
  // callback. stdout carries the JSON result and no human progress hits stderr.
  const dir = join(tempDir, 'project');
  await writeMetabaseConfig(dir);
  const cli = makeIo();
  const recorded: { percent: number; message: string }[] = [];

  const run = runKtxIngest(
    { command: 'run', projectDir: dir, connectionId: 'prod-metabase', adapter: 'metabase', outputMode: 'json' },
    cli.io,
    {
      progress: (event) => recorded.push(event),
      runLocalMetabaseIngest: async (input) => {
        // Identity of the single mapped child, shared by the started/completed events.
        const child = {
          metabaseConnectionId: 'prod-metabase',
          metabaseDatabaseId: 1,
          targetConnectionId: 'warehouse_a',
          jobId: 'metabase-child-1',
        };
        input.progress?.onMetabaseFanoutPlanned?.({
          metabaseConnectionId: 'prod-metabase',
          children: [{ metabaseDatabaseId: 1, targetConnectionId: 'warehouse_a' }],
        });
        input.progress?.onMetabaseChildStarted?.({ ...child });
        input.progress?.onMetabaseChildCompleted?.({ ...child, status: 'done' });
        return {
          metabaseConnectionId: 'prod-metabase',
          status: 'all_succeeded',
          totals: { workUnits: 0, failedWorkUnits: 0 },
          children: [],
        };
      },
    },
  );
  await expect(run).resolves.toBe(0);

  const expectedEvents = [
    { percent: 5, message: 'Checking Metabase mappings for prod-metabase' },
    { percent: 10, message: 'Metabase prod-metabase: 1 mapped database' },
    { percent: 25, message: 'Metabase database 1 -> warehouse_a running' },
    { percent: 90, message: 'Metabase database 1 -> warehouse_a done' },
  ];
  expect(recorded).toEqual(expect.arrayContaining(expectedEvents));
  expect(cli.stdout()).toContain('"status": "all_succeeded"');
  expect(cli.stderr()).not.toContain('Metabase ingest: prod-metabase');
});
2026-05-25 11:09:33 -04:00
it('emits structured child ingest progress during Metabase fanout', async () => {
  // Memory-flow events from inside a fanout child (planning, per-unit steps,
  // integration progress) must be translated into structured progress events.
  const dir = join(tempDir, 'project');
  await writeMetabaseConfig(dir);
  const cli = makeIo();
  const recorded: { percent: number; message: string; transient?: boolean }[] = [];

  const run = runKtxIngest(
    { command: 'run', projectDir: dir, connectionId: 'prod-metabase', adapter: 'metabase', outputMode: 'json' },
    cli.io,
    {
      progress: (event) => recorded.push(event),
      runLocalMetabaseIngest: async (input) => {
        // Plan a single work unit backed by one raw file and mark it started.
        const planAndStart = (unitKey: string, rawFile: string) => {
          input.memoryFlow?.update({
            plannedWorkUnits: [{ unitKey, rawFiles: [rawFile], peerFileCount: 0, dependencyCount: 0 }],
          });
          input.memoryFlow?.emit({ type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 });
          input.memoryFlow?.emit({ type: 'work_unit_started', unitKey, skills: ['sl_capture'] });
        };
        // Identity of the single mapped child, shared by the started/completed events.
        const child = {
          metabaseConnectionId: 'prod-metabase',
          metabaseDatabaseId: 1,
          targetConnectionId: 'warehouse_a',
          jobId: 'metabase-child-1',
        };

        input.progress?.onMetabaseFanoutPlanned?.({
          metabaseConnectionId: 'prod-metabase',
          children: [{ metabaseDatabaseId: 1, targetConnectionId: 'warehouse_a' }],
        });
        input.progress?.onMetabaseChildStarted?.({ ...child });

        planAndStart('metabase-col-6', 'cards/40.json');
        input.memoryFlow?.emit({ type: 'work_unit_step', unitKey: 'metabase-col-6', toolCalls: 7 });
        input.memoryFlow?.emit({
          type: 'stage_progress',
          stage: 'integration',
          percent: 81,
          message: 'Resolving text conflict for metabase-col-6',
        });
        input.memoryFlow?.emit({ type: 'work_unit_finished', unitKey: 'metabase-col-6', status: 'success' });

        planAndStart('metabase-col-7', 'cards/48.json');

        input.progress?.onMetabaseChildCompleted?.({ ...child, status: 'done' });
        return {
          metabaseConnectionId: 'prod-metabase',
          status: 'all_succeeded',
          totals: { workUnits: 1, failedWorkUnits: 0 },
          children: [],
        };
      },
    },
  );
  await expect(run).resolves.toBe(0);

  const expectedEvents = [
    { percent: 45, message: 'Planned 1 task' },
    { percent: 55, message: 'Processing 1/1 tasks: metabase-col-6' },
    {
      percent: 55,
      message: 'Processing tasks: 0/1 complete, 1 active; latest metabase-col-6 · 7 actions',
      transient: true,
    },
    { percent: 81, message: 'Resolving text conflict for metabase-col-6' },
    { percent: 81, message: 'Processing 1/1 tasks: metabase-col-7' },
  ];
  expect(recorded).toEqual(expect.arrayContaining(expectedEvents));
  expect(cli.stdout()).toContain('"status": "all_succeeded"');
  expect(cli.stderr()).not.toContain('Metabase ingest: prod-metabase');
});
2026-05-25 11:09:33 -04:00
it ( 'runs Metabase scheduled ingest through the public CLI command path with real fanout' , async ( ) = > {
// End-to-end check: run the real Metabase fanout through runKtxIngest with the
// CLI source adapter and agent runner, then read one child job back via 'status'.
2026-05-10 23:12:26 +02:00
const projectDir = join ( tempDir , 'metabase-cli-project' ) ;
2026-05-11 09:55:42 +02:00
await writeWarehouseConfig ( projectDir ) ;
2026-05-10 23:12:26 +02:00
// Overwrite ktx.yaml so Metabase databases 1 and 2 map to the warehouse_a /
// warehouse_b postgres connections with syncMode ALL. Leading whitespace inside
// these string literals is YAML indentation and therefore significant.
await writeFile (
2026-05-10 23:51:24 +02:00
join ( projectDir , 'ktx.yaml' ) ,
2026-05-10 23:12:26 +02:00
[
'connections:' ,
' prod-metabase:' ,
' driver: metabase' ,
' api_url: https://metabase.example.test' ,
' api_key: literal-test-key' ,
2026-05-13 13:55:21 +02:00
' mappings:' ,
' databaseMappings:' ,
' "1": warehouse_a' ,
' "2": warehouse_b' ,
' syncEnabled:' ,
' "1": true' ,
' "2": true' ,
' syncMode: ALL' ,
' defaultTagNames:' ,
' - ktx' ,
2026-05-10 23:12:26 +02:00
' warehouse_a:' ,
' driver: postgres' ,
' url: postgresql://readonly@db.example.test/warehouse_a' ,
' warehouse_b:' ,
' driver: postgres' ,
' url: postgresql://readonly@db.example.test/warehouse_b' ,
'ingest:' ,
' adapters:' ,
' - metabase' ,
' embeddings:' ,
2026-05-19 16:40:01 +02:00
' backend: none' ,
2026-05-10 23:12:26 +02:00
'' ,
] . join ( '\n' ) ,
'utf-8' ,
) ;
2026-05-10 23:51:24 +02:00
const project = await loadKtxProject ( { projectDir } ) ;
2026-05-13 13:55:21 +02:00
// Seed the project's local Metabase discovery cache with databases 1 and 2 so the
// fanout can resolve both mappings (presumably in place of a live discovery call — TODO confirm).
const discoveryCache = new LocalMetabaseDiscoveryCache ( { dbPath : ktxLocalStateDbPath ( project ) } ) ;
await discoveryCache . refreshDiscoveredDatabases ( {
2026-05-10 23:12:26 +02:00
connectionId : 'prod-metabase' ,
2026-05-13 13:55:21 +02:00
discovered : [
{ id : 1 , name : 'Warehouse A' , engine : 'postgres' , host : 'db.example.test' , dbName : 'warehouse_a' } ,
{ id : 2 , name : 'Warehouse B' , engine : 'postgres' , host : 'db.example.test' , dbName : 'warehouse_b' } ,
2026-05-10 23:12:26 +02:00
] ,
} ) ;
const adapter = new CliMetabaseSourceAdapter ( ) ;
const agentRunner = new CliMetabaseAgentRunner ( ) ;
// jobIdFactory below hands these ids out in order; any further child would get 'metabase-child-extra'.
const childJobIds = [ 'metabase-child-1' , 'metabase-child-2' ] ;
const io = makeIo ( ) ;
await expect (
2026-05-10 23:51:24 +02:00
runKtxIngest (
2026-05-10 23:12:26 +02:00
{
command : 'run' ,
projectDir ,
connectionId : 'prod-metabase' ,
adapter : 'metabase' ,
outputMode : 'plain' ,
} ,
io . io ,
{
createAdapters : vi.fn ( ( ) = > [ adapter ] ) ,
jobIdFactory : ( ) = > childJobIds . shift ( ) ? ? 'metabase-child-extra' ,
localIngestOptions : {
agentRunner ,
} ,
} ,
) ,
) . resolves . toBe ( 0 ) ;
2026-05-12 11:29:34 +02:00
expect ( io . stderr ( ) ) . toContain ( 'Metabase ingest: prod-metabase' ) ;
expect ( io . stderr ( ) ) . toContain ( 'Targets: 2 mapped databases' ) ;
2026-05-25 11:09:33 -04:00
expect ( io . stdout ( ) ) . toContain ( 'Metabase fanout: all_succeeded' ) ;
2026-05-10 23:12:26 +02:00
expect ( io . stdout ( ) ) . toContain ( 'Source: prod-metabase' ) ;
expect ( io . stdout ( ) ) . toContain ( 'Children: 2' ) ;
expect ( io . stdout ( ) ) . toContain ( 'target=warehouse_a database=1 status=done job=metabase-child-1' ) ;
expect ( io . stdout ( ) ) . toContain ( 'target=warehouse_b database=2 status=done job=metabase-child-2' ) ;
// The adapter must have received exactly one fetch per mapped database, in this order.
expect ( adapter . fetchCalls ) . toEqual ( [
{ metabaseConnectionId : 'prod-metabase' , metabaseDatabaseId : 1 , connectionId : 'warehouse_a' } ,
{ metabaseConnectionId : 'prod-metabase' , metabaseDatabaseId : 2 , connectionId : 'warehouse_b' } ,
] ) ;
// The first child's job must be readable afterwards through the 'status' command.
const statusIo = makeIo ( ) ;
await expect (
2026-05-10 23:51:24 +02:00
runKtxIngest (
2026-05-10 23:12:26 +02:00
{ command : 'status' , projectDir , runId : 'metabase-child-1' , outputMode : 'plain' } ,
statusIo . io ,
) ,
) . resolves . toBe ( 0 ) ;
expect ( statusIo . stdout ( ) ) . toContain ( 'Job: metabase-child-1' ) ;
2026-05-14 01:43:06 +02:00
expect ( statusIo . stdout ( ) ) . toContain ( 'Source: Metabase' ) ;
2026-05-10 23:12:26 +02:00
expect ( statusIo . stdout ( ) ) . toContain ( 'Connection: warehouse_a' ) ;
expect ( statusIo . stderr ( ) ) . toBe ( '' ) ;
} ) ;
it('runs public Metabase CLI scheduled ingest for ALL, ONLY, and EXCEPT sync modes', async () => {
  // Drive every sync mode through the public CLI helper, sequentially and in a
  // fixed order: ALL keeps everything, ONLY keeps collection 12, and EXCEPT
  // drops card 102.
  type SyncModeCase = Parameters<typeof runPublicMetabaseSyncModeCase>[1];
  const cases: SyncModeCase[] = [
    {
      name: 'all',
      syncMode: 'ALL',
      selections: [],
      expectedWorkUnitKeys: ['metabase-col-12', 'metabase-col-13'],
      expectedRawFiles: [
        'cards/101.json',
        'cards/102.json',
        'cards/103.json',
        'collections/12.json',
        'collections/13.json',
      ],
    },
    {
      name: 'only',
      syncMode: 'ONLY',
      selections: [{ selectionType: 'collection', metabaseObjectId: 12 }],
      expectedWorkUnitKeys: ['metabase-col-12'],
      expectedRawFiles: ['cards/101.json', 'cards/102.json', 'collections/12.json'],
    },
    {
      name: 'except',
      syncMode: 'EXCEPT',
      selections: [{ selectionType: 'item', metabaseObjectId: 102 }],
      expectedWorkUnitKeys: ['metabase-col-12', 'metabase-col-13'],
      expectedRawFiles: ['cards/101.json', 'cards/103.json', 'collections/12.json', 'collections/13.json'],
    },
  ];
  for (const syncModeCase of cases) {
    await runPublicMetabaseSyncModeCase(tempDir, syncModeCase);
  }
});
2026-05-25 11:09:33 -04:00
it('prints metabase fanout JSON results', async () => {
  // JSON mode serializes the fanout result itself to stdout and writes nothing to stderr.
  const dir = join(tempDir, 'project');
  await writeMetabaseConfig(dir);
  const cli = makeIo();

  const run = runKtxIngest(
    { command: 'run', projectDir: dir, connectionId: 'prod-metabase', adapter: 'metabase', outputMode: 'json' },
    cli.io,
    {
      runLocalMetabaseIngest: async () => ({
        metabaseConnectionId: 'prod-metabase',
        status: 'all_succeeded',
        totals: { workUnits: 0, failedWorkUnits: 0 },
        children: [],
      }),
    },
  );
  await expect(run).resolves.toBe(0);

  const printed = JSON.parse(cli.stdout());
  expect(printed).toMatchObject({
    metabaseConnectionId: 'prod-metabase',
    status: 'all_succeeded',
    children: [],
  });
  expect(cli.stderr()).toBe('');
});
2026-05-12 11:26:34 +02:00
it('keeps metabase JSON stdout free of operational adapter logs', async () => {
  // An adapter that logs a warning while it is being created must not corrupt
  // the JSON document on stdout, and in JSON mode nothing is echoed to stderr.
  const projectDir = join(tempDir, 'project');
  await writeMetabaseConfig(projectDir);
  const io = makeIo();
  let adapterOptions: KtxCliLocalIngestAdaptersOptions | undefined;
  await expect(
    runKtxIngest(
      {
        command: 'run',
        projectDir,
        connectionId: 'prod-metabase',
        adapter: 'metabase',
        outputMode: 'json',
      },
      io.io,
      {
        createAdapters: (_project, options) => {
          adapterOptions = options;
          options?.logger?.warn('adapter warning');
          return [];
        },
        runLocalMetabaseIngest: async (input) => {
          // The previous discarded `input.adapters.find(...)` only implicitly
          // required an adapter array; make that requirement an explicit check.
          expect(Array.isArray(input.adapters)).toBe(true);
          return {
            metabaseConnectionId: 'prod-metabase',
            status: 'all_succeeded',
            totals: { workUnits: 0, failedWorkUnits: 0 },
            children: [],
          };
        },
      },
    ),
  ).resolves.toBe(0);
  expect(adapterOptions?.logger).toEqual(expect.objectContaining({ warn: expect.any(Function) }));
  const stdout = io.stdout();
  expect(() => JSON.parse(stdout)).not.toThrow();
  // Pin the actual contract: the logged warning text never reaches stdout.
  expect(stdout).not.toContain('adapter warning');
  expect(io.stderr()).toBe('');
});
2026-05-25 11:09:33 -04:00
it('rejects source-dir uploads through the metabase fanout route', async () => {
  // A sourceDir combined with the Metabase adapter is refused with exit 1
  // before the fanout runner (which would throw) is ever reached.
  const dir = join(tempDir, 'project');
  await writeMetabaseConfig(dir);
  const cli = makeIo();

  const run = runKtxIngest(
    {
      command: 'run',
      projectDir: dir,
      adapter: 'metabase',
      connectionId: 'prod-metabase',
      sourceDir: dir,
      outputMode: 'plain',
    },
    cli.io,
    {
      runLocalMetabaseIngest: async () => {
        throw new Error('fanout should not be called');
      },
    },
  );
  await expect(run).resolves.toBe(1);

  const stderr = cli.stderr();
  expect(stderr).toContain('source-dir uploads are not supported for the Metabase fanout adapter');
  expect(stderr).not.toContain('ktx ingest requires llm.provider.backend');
  expect(cli.stdout()).toBe('');
});
it('prints previous run and diff summary for local ingest results', async () => {
  // A completed local bundle run prints its report id, job id and diff totals.
  const projectDir = join(tempDir, 'project');
  await writeWarehouseConfig(projectDir);
  const sourceDir = join(tempDir, 'source');
  const ordersDir = join(sourceDir, 'orders');
  await mkdir(ordersDir, { recursive: true });
  await writeFile(join(ordersDir, 'orders.json'), '{"name":"orders"}\n', 'utf-8');
  const runLocalIngest = vi.fn(async (input: RunLocalIngestOptions) => completedLocalBundleRun(input, 'local-job-1'));
  const cli = makeIo();

  const run = runKtxIngest(
    { command: 'run', projectDir, connectionId: 'warehouse', adapter: 'fake', sourceDir, outputMode: 'plain' },
    cli.io,
    { runLocalIngest, jobIdFactory: () => 'local-job-1' },
  );
  await expect(run).resolves.toBe(0);

  expect(cli.stderr()).toBe('');
  const stdout = cli.stdout();
  for (const line of ['Report: report-live-1\n', 'Job: local-job-1\n', 'Diff: +2/~0/-0/=0\n']) {
    expect(stdout).toContain(line);
  }
});
2026-05-11 22:52:47 +02:00
it ( 'includes historic-sql projection output in saved memory counts' , async ( ) = > {
// A historic-sql run whose report has no work units but does carry finalization
// output must still count that output under "Saved memory".
const projectDir = join ( tempDir , 'project' ) ;
await writeWarehouseConfig ( projectDir ) ;
const runLocal = vi . fn ( async ( input : RunLocalIngestOptions ) = > {
const result = completedLocalBundleRun ( input , 'historic-sql-projection' ) ;
return {
. . . result ,
report : localFakeBundleReport ( 'historic-sql-projection' , {
sourceKey : 'historic-sql' ,
body : {
workUnits : [ ] ,
2026-05-20 14:17:10 +02:00
finalization : {
2026-05-11 22:52:47 +02:00
sourceKey : 'historic-sql' ,
status : 'success' ,
2026-05-20 14:17:10 +02:00
commitSha : 'finalization-sha' ,
touchedPaths : [ 'semantic-layer/warehouse/_schema/public.yaml' , 'wiki/global/historic-sql-orders.md' ] ,
declaredTouchedSources : [ { connectionId : 'warehouse' , sourceName : 'orders' } ] ,
derivedTouchedSources : [ { connectionId : 'warehouse' , sourceName : 'orders' } ] ,
declaredChangedWikiPageKeys : [ 'historic-sql-orders' ] ,
derivedChangedWikiPageKeys : [ 'historic-sql-orders' ] ,
mismatches : [ ] ,
2026-05-11 22:52:47 +02:00
result : {
tableUsageMerged : 56 ,
staleTablesMarked : 1 ,
patternPagesWritten : 30 ,
stalePatternPagesMarked : 2 ,
archivedPatternPages : 3 ,
} ,
errors : [ ] ,
warnings : [ ] ,
2026-05-20 14:17:10 +02:00
// 57 SL actions and 35 wiki actions. These agree with finalization.result above
// (56 merged + 1 stale table = 57; 30 written + 2 stale + 3 archived pattern pages = 35),
// so the expected 'Saved memory: 35 wiki, 57 SL' line holds whichever of the two is counted.
actions : [
. . . Array . from ( { length : 57 } , ( _ , index ) = > ( {
target : 'sl' as const ,
type : 'updated' as const ,
key : ` orders- ${ index } ` ,
detail : 'Merged usage' ,
targetConnectionId : 'warehouse' ,
rawPaths : [ 'tables/public/orders.json' ] ,
} ) ) ,
. . . Array . from ( { length : 35 } , ( _ , index ) = > ( {
target : 'wiki' as const ,
type : 'updated' as const ,
key : ` historic-sql-orders- ${ index } ` ,
detail : 'Projected pattern' ,
rawPaths : [ 'patterns/orders.json' ] ,
} ) ) ,
] ,
provenanceExclusions : [ ] ,
2026-05-11 22:52:47 +02:00
} ,
} ,
} ) ,
} ;
} ) ;
const io = makeIo ( ) ;
await expect (
runKtxIngest (
{
command : 'run' ,
projectDir ,
connectionId : 'warehouse' ,
adapter : 'historic-sql' ,
outputMode : 'plain' ,
} ,
io . io ,
{
runLocalIngest : runLocal ,
// Minimal historic-sql adapter stub: detect always succeeds and chunking yields no work units.
createAdapters : vi.fn ( ( ) = > [
{ source : 'historic-sql' , skillNames : [ ] , detect : async ( ) = > true , chunk : async ( ) = > ( { workUnits : [ ] } ) } ,
] ) ,
jobIdFactory : ( ) = > 'historic-sql-projection' ,
} ,
) ,
) . resolves . toBe ( 0 ) ;
expect ( io . stderr ( ) ) . toBe ( '' ) ;
2026-05-14 01:43:06 +02:00
// "Query history" is the display label expected for the historic-sql source.
expect ( io . stdout ( ) ) . toContain ( 'Source: Query history\n' ) ;
2026-05-13 15:55:00 +02:00
expect ( io . stdout ( ) ) . toContain ( 'Saved memory: 35 wiki, 57 SL\n' ) ;
2026-05-11 22:52:47 +02:00
} ) ;
2026-05-10 23:13:17 -07:00
it('returns a non-zero code when local ingest reports failed work units', async () => {
  // A failed local bundle run exits 1 and reports "Status: error" on stdout,
  // with nothing written to stderr.
  const projectDir = join(tempDir, 'project');
  await writeWarehouseConfig(projectDir);
  const sourceDir = join(tempDir, 'source');
  const ordersDir = join(sourceDir, 'orders');
  await mkdir(ordersDir, { recursive: true });
  await writeFile(join(ordersDir, 'orders.json'), '{"name":"orders"}\n', 'utf-8');
  const runLocalIngest = vi.fn(async (input: RunLocalIngestOptions) => failedLocalBundleRun(input, 'local-job-failed'));
  const cli = makeIo();

  const run = runKtxIngest(
    { command: 'run', projectDir, connectionId: 'warehouse', adapter: 'fake', sourceDir, outputMode: 'plain' },
    cli.io,
    { runLocalIngest, jobIdFactory: () => 'local-job-failed' },
  );
  await expect(run).resolves.toBe(1);

  expect(cli.stderr()).toBe('');
  expect(cli.stdout()).toContain('Status: error\n');
});
2026-05-30 00:42:59 +02:00
it('exits 0 and reports Status: partial when a single-source ingest saved memory despite a failure', async () => {
  const projectDir = join(tempDir, 'project');
  await writeWarehouseConfig(projectDir);
  const sourceDir = join(tempDir, 'source');
  const ordersDir = join(sourceDir, 'orders');
  await mkdir(ordersDir, { recursive: true });
  await writeFile(join(ordersDir, 'orders.json'), '{"name":"orders"}\n', 'utf-8');

  // One unit succeeds (and creates a wiki page), the other fails.
  const partialReport = localFakeBundleReport('local-job-partial', {
    connectionId: 'warehouse',
    sourceKey: 'fake',
    body: {
      failedWorkUnits: ['orders-bad'],
      workUnits: [
        {
          unitKey: 'orders-ok',
          rawFiles: ['orders/orders.json'],
          status: 'success',
          actions: [{ target: 'wiki', type: 'created', key: 'wiki/orders.md', detail: 'orders' }],
          touchedSlSources: [],
        },
        {
          unitKey: 'orders-bad',
          rawFiles: ['orders/bad.json'],
          status: 'failed',
          reason: 'writer tool failed',
          actions: [],
          touchedSlSources: [],
        },
      ],
    },
  });
  const { runId, body } = partialReport;
  const runLocal = vi.fn(async (_input: RunLocalIngestOptions) => ({
    result: {
      jobId: 'local-job-partial',
      runId,
      syncId: body.syncId,
      diffSummary: body.diffSummary,
      workUnitCount: body.workUnits.length,
      failedWorkUnits: body.failedWorkUnits,
      artifactsWritten: 1,
      commitSha: body.commitSha,
    },
    report: partialReport,
  }));
  const terminal = makeIo();

  const exitCode = await runKtxIngest(
    { command: 'run', projectDir, connectionId: 'warehouse', adapter: 'fake', sourceDir, outputMode: 'plain' },
    terminal.io,
    { runLocalIngest: runLocal, jobIdFactory: () => 'local-job-partial' },
  );

  // Memory was saved for at least one unit, so the process exit code stays 0.
  expect(exitCode).toBe(0);
  expect(terminal.stdout()).toContain('Status: partial\n');
});
2026-05-18 13:38:06 +02:00
it('prints trace path and error status for stored failed ingest reports', async () => {
  const projectDir = join(tempDir, 'project');
  await writeWarehouseConfig(projectDir);
  const terminal = makeIo();
  const tracePath = '/project/.ktx/ingest-traces/job-failed/trace.jsonl';
  // Stored report of a run that failed in the `final_gates` phase with no commit.
  const storedReport = {
    id: 'report-failed',
    runId: 'run-failed',
    jobId: 'job-failed',
    connectionId: 'warehouse',
    sourceKey: 'metabase',
    createdAt: '2026-05-17T12:00:00.000Z',
    body: {
      status: 'failed',
      syncId: 'sync-failed',
      diffSummary: { added: 1, modified: 0, deleted: 0, unchanged: 0 },
      commitSha: null,
      tracePath,
      failure: { phase: 'final_gates', message: 'final artifact gates failed' },
      workUnits: [],
      failedWorkUnits: [],
      reconciliationSkipped: true,
      conflictsResolved: [],
      evictionsApplied: [],
      unmappedFallbacks: [],
      evictionInputs: [],
      unresolvedCards: [],
      supersededBy: null,
      overrideOf: null,
      provenanceRows: [],
      toolTranscripts: [],
    },
  };
  const readReportFile = vi.fn().mockResolvedValue(storedReport);

  await runKtxIngest(
    {
      command: 'status',
      projectDir,
      reportFile: '/project/report-failed.json',
      runId: 'run-failed',
      outputMode: 'plain',
      inputMode: 'disabled',
    },
    terminal.io,
    { readReportFile },
  );

  const stdout = terminal.stdout();
  for (const expected of [`Trace: ${tracePath}`, 'Status: error', 'Error: final artifact gates failed']) {
    expect(stdout).toContain(expected);
  }
});
2026-05-17 10:27:29 +02:00
it('prints a clear first failure reason when query-history work units fail', async () => {
  const projectDir = join(tempDir, 'project');
  await writeWarehouseConfig(projectDir);
  // Raw OAuth error payload used as the failure reason; the CLI output must not echo it verbatim.
  const rawReason =
    '{"error":"invalid_grant","error_description":"reauth related error (invalid_rapt)","error_uri":"https://support.google.com/a/answer/9368756","error_subtype":"invalid_rapt"}';
  const runLocal = vi.fn(async (input: RunLocalIngestOptions): Promise<LocalIngestResult> => {
    const baseUnit = localFakeBundleReport('query-history-failed').body.workUnits[0];
    const failedUnit = {
      ...baseUnit,
      unitKey: 'historic-sql-table-orders',
      rawFiles: ['tables/orders.json'],
      status: 'failed' as const,
      reason: rawReason,
      actions: [],
      touchedSlSources: [],
    };
    const failedReport = localFakeBundleReport('query-history-failed', {
      id: 'report-query-history-failed',
      runId: 'run-query-history-failed',
      connectionId: input.connectionId,
      sourceKey: 'historic-sql',
      body: {
        workUnits: [failedUnit],
        failedWorkUnits: [failedUnit.unitKey],
      },
    });
    const { body } = failedReport;
    return {
      result: {
        jobId: 'query-history-failed',
        runId: failedReport.runId,
        syncId: body.syncId,
        diffSummary: body.diffSummary,
        workUnitCount: body.workUnits.length,
        failedWorkUnits: body.failedWorkUnits,
        artifactsWritten: body.provenanceRows.length,
        commitSha: body.commitSha,
      },
      report: failedReport,
    };
  });
  const terminal = makeIo();

  const exitCode = await runKtxIngest(
    {
      command: 'run',
      projectDir,
      connectionId: 'warehouse',
      adapter: 'historic-sql',
      outputMode: 'plain',
    },
    terminal.io,
    {
      runLocalIngest: runLocal,
      jobIdFactory: () => 'query-history-failed',
    },
  );

  expect(exitCode).toBe(1);
  const stdout = terminal.stdout();
  expect(stdout).toContain('Status: error\n');
  expect(stdout).toContain('Failed tasks: 1\n');
  expect(stdout).toContain(
    'Error: Query history failed for 1 task. First failure: Google Cloud authentication failed while analyzing query history: application-default credentials expired or require reauthentication (invalid_grant / invalid_rapt). Run `gcloud auth application-default login`, then retry.',
  );
  // The raw JSON payload (e.g. its error_uri field) must not leak into the output.
  expect(stdout).not.toContain('error_uri');
});
2026-05-10 23:12:26 +02:00
it('passes the debug LLM request file to local ingest runs', async () => {
  const projectDir = join(tempDir, 'project');
  await writeWarehouseConfig(projectDir);
  const debugFile = join(projectDir, '.ktx', 'llm-debug.jsonl');
  const runLocalIngest = vi.fn(async (input: RunLocalIngestOptions) => completedLocalBundleRun(input, 'job-debug'));
  const terminal = makeIo();

  await expect(
    runKtxIngest(
      {
        command: 'run',
        projectDir,
        connectionId: 'warehouse',
        adapter: 'fake',
        outputMode: 'plain',
        debugLlmRequestFile: debugFile,
      },
      terminal.io,
      { runLocalIngest },
    ),
  ).resolves.toBe(0);
  // The CLI arg `debugLlmRequestFile` is forwarded as `llmDebugRequestFile`.
  expect(runLocalIngest).toHaveBeenCalledWith(expect.objectContaining({ llmDebugRequestFile: debugFile }));
});
2026-05-13 13:43:23 +02:00
it('supplies a scan-connector query executor to local ingest runs', async () => {
  const projectDir = join(tempDir, 'query-executor-project');
  await writeWarehouseConfig(projectDir);
  // Stub executor returning an empty SELECT result; the test checks it is forwarded by identity.
  const queryExecutor = {
    execute: vi.fn(async () => ({
      headers: [],
      rows: [],
      totalRows: 0,
      command: 'SELECT',
      rowCount: 0,
    })),
  };
  const runLocalIngest = vi.fn(
    async (input: RunLocalIngestOptions): Promise<LocalIngestResult> =>
      completedLocalBundleRun(input, 'query-executor-run'),
  );
  const terminal = makeIo();

  const exitCode = await runKtxIngest(
    {
      command: 'run',
      projectDir,
      connectionId: 'warehouse',
      adapter: 'fake',
      outputMode: 'json',
    },
    terminal.io,
    {
      runLocalIngest,
      createAdapters: () => [],
      createQueryExecutor: () => queryExecutor,
    },
  );

  expect(exitCode).toBe(0);
  expect(runLocalIngest).toHaveBeenCalledWith(expect.objectContaining({ queryExecutor }));
});
2026-05-10 23:12:26 +02:00
it('passes daemon database introspection URL to default local ingest adapters', async () => {
  const daemonUrl = 'http://127.0.0.1:8765';
  const projectDir = join(tempDir, 'project');
  await writeWarehouseConfig(projectDir);
  const sourceDir = join(tempDir, 'source');
  const ordersDir = join(sourceDir, 'orders');
  await mkdir(ordersDir, { recursive: true });
  await writeFile(join(ordersDir, 'orders.json'), '{"name":"orders"}\n', 'utf-8');
  const createdAdapters: SourceAdapter[] = [
    { source: 'fake', skillNames: [], detect: async () => true, chunk: async () => ({ workUnits: [] }) },
  ];
  const createAdapters = vi.fn(() => createdAdapters as never);
  const runLocal = vi.fn(async (input: RunLocalIngestOptions) =>
    completedLocalBundleRun(input, input.jobId ?? 'local-job-1'),
  );
  const terminal = makeIo();

  const exitCode = await runKtxIngest(
    {
      command: 'run',
      projectDir,
      connectionId: 'warehouse',
      adapter: 'fake',
      sourceDir,
      databaseIntrospectionUrl: daemonUrl,
      outputMode: 'plain',
    } satisfies KtxIngestArgs,
    terminal.io,
    {
      createAdapters,
      runLocalIngest: runLocal,
      jobIdFactory: () => 'local-job-1',
    },
  );
  expect(exitCode).toBe(0);

  // The adapter factory and the pull-config options must both receive the explicit daemon URL plus a logger.
  const daemonOptions = expect.objectContaining({
    databaseIntrospectionUrl: daemonUrl,
    logger: expect.any(Object),
  });
  expect(createAdapters).toHaveBeenCalledWith(expect.objectContaining({ projectDir }), daemonOptions);
  expect(runLocal).toHaveBeenCalledWith(
    expect.objectContaining({
      adapters: createdAdapters,
      adapter: 'fake',
      connectionId: 'warehouse',
      pullConfigOptions: daemonOptions,
    }),
  );
});
2026-06-11 13:49:45 +02:00
it('passes ktx daemon options to adapters and pull-config options when no explicit daemon URL is set', async () => {
  const projectDir = join(tempDir, 'managed-daemon-ingest-project');
  await initKtxProject({ projectDir });
  await writeWarehouseConfig(projectDir);
  const createdAdapters: SourceAdapter[] = [
    { source: 'fake', skillNames: [], detect: async () => true, chunk: async () => ({ workUnits: [] }) },
  ];
  const createAdapters = vi.fn(() => createdAdapters as never);
  const runLocal = vi.fn(async (input: RunLocalIngestOptions) =>
    completedLocalBundleRun(input, input.jobId ?? 'local-job-1'),
  );
  const terminal = makeIo();
  // Separate TTY-capable IO handed to the CLI as `runtimeIo`.
  const runtimeIo = makeIo({ isTTY: true });
  // No databaseIntrospectionUrl is passed, so managed-daemon options are expected instead.
  const expectedManagedDaemon = {
    cliVersion: '0.2.0',
    projectDir,
    installPolicy: 'auto',
    io: runtimeIo.io,
  };

  const exitCode = await runKtxIngest(
    {
      command: 'run',
      projectDir,
      connectionId: 'warehouse',
      adapter: 'fake',
      cliVersion: '0.2.0',
      runtimeInstallPolicy: 'auto',
      outputMode: 'plain',
    } satisfies KtxIngestArgs,
    terminal.io,
    {
      createAdapters,
      runLocalIngest: runLocal,
      jobIdFactory: () => 'local-job-1',
      runtimeIo: runtimeIo.io,
    } as KtxIngestDeps & {
      runtimeIo: typeof runtimeIo.io;
    },
  );
  expect(exitCode).toBe(0);

  const managedDaemonOptions = expect.objectContaining({
    managedDaemon: expectedManagedDaemon,
    logger: expect.any(Object),
  });
  expect(createAdapters).toHaveBeenCalledWith(expect.objectContaining({ projectDir }), managedDaemonOptions);
  expect(runLocal).toHaveBeenCalledWith(expect.objectContaining({ pullConfigOptions: managedDaemonOptions }));
});
chore(workspace): gate dead-code with knip production mode (#196)
* refactor(workspace): relocate @ktx/llm source into packages/cli/src/llm
* refactor(workspace): rewrite @ktx/llm imports to relative paths
* refactor(workspace): fold internal packages into cli
* chore(workspace): gate dead-code with knip production mode
Turn on production-mode knip plus an autofix run in pre-commit and the
`pnpm dead-code` script, document the `/** @internal */` convention for
test-only exports in AGENTS.md, annotate test-only exports across the
CLI with that JSDoc, and drop dead exports/wrappers the new gate
surfaced (e.g. `cli-project.ts`, `lookerRuntimeSourceToFileAdapterSource`,
`createLocalScanEnrichmentProvidersFromConfig`,
`PGLITE_OWNER_PROCESS_BACKEND_CAPABILITIES`, stale type re-exports).
Replace the loose `ignoreIssues` allowlist in `knip.json` with explicit
production entries so cross-package barrel leaks are caught.
* refactor(cli): delete internal barrel index.ts files
The 34 `index.ts` re-export barrels inside `packages/cli/src/` were
holdovers from the pre-fold multi-workspace structure. Post-fold-in they
served no production purpose: external consumers go through the single
package main entry, and in-repo callers mostly imported through them
only because the path was short. Internally, knip flagged most barrel
re-exports as production-dead (only reached via tests).
This change:
- Deletes every internal barrel except `packages/cli/src/index.ts`
(the published package entry).
- Rewrites ~270 source/test files to import each name directly from
the file that defines it.
- Moves `tools/warehouse-verification/index.ts` to
`create-warehouse-verification-tools.ts` (the function it defined
locally) and updates its single consumer.
- Renames `search/backend-conformance.ts` → `.test-utils.ts` to match
the existing test-helper file convention.
- Deletes 13 dead test-only chains (dbt-descriptions/*,
live-database/extracted-schema, live-database/structural-sync,
relationship-* feedback/review chain) plus their tests and a
cascading orphan integration test.
- Updates test mocks that pointed at deleted barrel paths
(notion-client, connector barrels in scan/local-scan-connectors
tests) to mock the source files instead.
- Points the maintainer benchmark script
(`scripts/relationship-benchmark-report.mjs`) at source files
instead of `dist/context/scan/index.js`.
- Drops the barrel `!` entries from `knip.json`; adds explicit
production entries only for the benchmark code reached via dist by
the maintainer script.
Net: 413 files changed, ~1.2k insertions, ~9.4k deletions.
`pnpm run dead-code` (Biome + knip default + knip production) and
`pnpm run type-check` are clean; 2277 tests pass.
* refactor(workspace): rename @ktx/cli to @kaelio/ktx and pack it directly
Promote the CLI workspace package to the public name `@kaelio/ktx` and
drop the separate `scripts/build-public-npm-package.mjs` wrapper. The
CLI package is now publishable in place (`publishConfig.access: public`,
`provenance: true`), so artifact packing uses `pnpm pack` against
`packages/cli/` instead of assembling a parallel package tree.
Updates all workspace filter invocations, docs, tests, and release
readiness checks to reference the new package name, and folds the
tarball-name helper into `scripts/public-npm-release-metadata.mjs`.
* docs: align "agent clients" and "data agents" terminology
Replace "client agents" with "agent clients" and "database agents" with
"data agents" across AGENTS.md, README.md, the docs-site copy, and the
matching setup-agents test description, matching the canonical
vocabulary in docs/terminology.md.
Also moves packages/cli/tsconfig.json's tsBuildInfoFile from
node_modules/.cache/ to dist/.tsbuildinfo so incremental builds survive
node_modules reinstalls.
* refactor(release): single source of truth for package version
Make packages/cli/package.json the single source of truth for the
@kaelio/ktx version. publicNpmPackageVersion() now reads it directly,
so artifact filenames, release-readiness checks, and the Python wheel
version all derive from one field. The duplicate
release-policy.json.publicNpmPackageVersion is removed.
Previously the two fields could drift: tarballs were named
kaelio-ktx-0.4.1.tgz while internally containing
@kaelio/ktx@0.0.0-private.
- update-public-release-version.mjs rewrites both Python pyproject.toml
files (ktx-daemon, ktx-sl) alongside the npm package.jsons,
normalizing the version for PEP 440 (e.g. 0.1.0-rc.2 -> 0.1.0rc2).
- semantic-release-config.cjs adds the two pyproject.toml files to
@semantic-release/git assets so the release commit back to main
carries every version source in lockstep.
- The six "?? '0.0.0-private'" fallback literals across the CLI are
replaced with "?? getKtxCliPackageInfo().version", and
createDefaultKtxMcpServer makes its version arg required.
- docs/release.md describes the actual commit-back model: the dev tree
always reflects the most recent release; no sentinel pin to
maintain.
Verified: pnpm run artifacts:build now produces
kaelio-ktx-0.4.1.tgz and kaelio_ktx-0.4.1-py3-none-any.whl with
@kaelio/ktx@0.4.1 inside. Full type-check, dead-code, and
2287 vitests + 173 script tests pass.
* refactor(cli): inject embedding provider resolution and detect sentence-transformers runtime
Make resolveProjectEmbeddingProvider and runtimeIo injectable in ingest and
scan command entrypoints so tests can stub them, and teach
resolvePublicIngestRuntimeRequirements to flag the local-embeddings runtime
feature when ktx.yaml selects sentence-transformers.
* chore(cli): mark buildLocalStatsStatus and LocalStatsStatus as @internal
Both symbols are consumed only by status-project.test.ts. Annotating with
/** @internal */ keeps knip's production-mode check clean without changing
runtime behavior.
* fix(cli): use real package metadata in print-command-tree
The stubbed package name embedded a forbidden product identifier that
tripped the boundary check in CI. Read the metadata from package.json
instead — keeps the rendered tree unchanged and removes a duplicate
source of truth.
* feat(cli): show embedding coverage in `ktx status`, drop duplicate disk counts
Inline `(N embedded)` next to the Wiki scope counts and Semantic-layer
source counts, computed with `SUM(embedding_json IS NOT NULL)` over
`knowledge_pages` and `local_sl_sources`. Rename the "Knowledge" label to
"Wiki" (canonical per `docs/terminology.md`) and rename the matching
`localStats.knowledgePages` field to `localStats.wikiPages`.
Drop `wiki=N md` and `semantic-layer=N yaml` from the Disk row — those
duplicated the per-surface rows above. Disk now reports only actual byte
usage (db, cache, raw-sources). The unused `wikiGlobalMarkdownCount` /
`semanticLayerYamlCount` fields, the `isMarkdownEntry` / `isYamlEntry`
helpers, and the `filter` arg on `summarizeDir` are removed.
2026-05-21 15:28:58 +02:00
it('uses runtime IO when resolving managed embedding runtime', async () => {
  const projectDir = join(tempDir, 'managed-embedding-ingest-project');
  await initKtxProject({ projectDir });
  await writeWarehouseConfig(projectDir);
  const createdAdapters: SourceAdapter[] = [
    { source: 'fake', skillNames: [], detect: async () => true, chunk: async () => ({ workUnits: [] }) },
  ];
  const createAdapters = vi.fn(() => createdAdapters as never);
  const runLocal = vi.fn(async (input: RunLocalIngestOptions) =>
    completedLocalBundleRun(input, input.jobId ?? 'local-job-1'),
  );
  // Stub resolver reporting a disabled provider; only its call arguments are inspected.
  const resolveEmbeddingProvider = vi.fn(async () => ({ kind: 'disabled' as const }));
  const terminal = makeIo();
  const runtimeIo = makeIo({ isTTY: true });

  const exitCode = await runKtxIngest(
    {
      command: 'run',
      projectDir,
      connectionId: 'warehouse',
      adapter: 'fake',
      cliVersion: '0.2.0',
      runtimeInstallPolicy: 'auto',
      outputMode: 'plain',
    } satisfies KtxIngestArgs,
    terminal.io,
    {
      createAdapters,
      runLocalIngest: runLocal,
      jobIdFactory: () => 'local-job-1',
      runtimeIo: runtimeIo.io,
      resolveEmbeddingProvider,
    },
  );

  expect(exitCode).toBe(0);
  // The second argument must carry the runtime IO, not the command's own IO.
  expect(resolveEmbeddingProvider).toHaveBeenCalledWith(
    expect.anything(),
    expect.objectContaining({ installPolicy: 'auto', io: runtimeIo.io }),
  );
});
2026-05-10 23:12:26 +02:00
it('passes the target connection id when constructing local historic-sql adapters', async () => {
  const projectDir = join(tempDir, 'historic-sql-project');
  await writeWarehouseConfig(projectDir);
  // ktx.yaml enabling query history on the warehouse connection and the historic-sql adapter.
  const ktxYaml = [
    'connections:',
    '  warehouse:',
    '    driver: postgres',
    '    url: env:WAREHOUSE_DATABASE_URL',
    '    context:',
    '      queryHistory:',
    '        enabled: true',
    '        minExecutions: 2',
    'ingest:',
    '  adapters:',
    '    - historic-sql',
    '',
  ].join('\n');
  await writeFile(join(projectDir, 'ktx.yaml'), ktxYaml, 'utf-8');
  const createdAdapters: SourceAdapter[] = [
    { source: 'historic-sql', skillNames: [], detect: async () => true, chunk: async () => ({ workUnits: [] }) },
  ];
  const createAdapters = vi.fn(() => createdAdapters as never);
  const runLocal = vi.fn(async (input: RunLocalIngestOptions) =>
    completedLocalBundleRun(input, input.jobId ?? 'local-historic-job'),
  );
  const terminal = makeIo();

  const exitCode = await runKtxIngest(
    {
      command: 'run',
      projectDir,
      connectionId: 'warehouse',
      adapter: 'historic-sql',
      outputMode: 'plain',
    },
    terminal.io,
    {
      createAdapters,
      runLocalIngest: runLocal,
      jobIdFactory: () => 'local-historic-job',
    },
  );

  expect(exitCode).toBe(0);
  expect(createAdapters).toHaveBeenCalledWith(
    expect.objectContaining({ projectDir }),
    expect.objectContaining({ historicSqlConnectionId: 'warehouse', logger: expect.any(Object) }),
  );
  expect(runLocal).toHaveBeenCalledWith(
    expect.objectContaining({ adapters: createdAdapters, adapter: 'historic-sql', connectionId: 'warehouse' }),
  );
});
2026-05-11 22:35:07 +02:00
it('prints live progress for plain local ingest in interactive terminals', async () => {
  const projectDir = join(tempDir, 'historic-sql-progress-project');
  await mkdir(projectDir, { recursive: true });
  const ktxYaml = [
    'connections:',
    '  warehouse:',
    '    driver: postgres',
    '    url: env:WAREHOUSE_DATABASE_URL',
    '    context:',
    '      queryHistory:',
    '        enabled: true',
    '        minExecutions: 2',
    'ingest:',
    '  adapters:',
    '    - historic-sql',
    '',
  ].join('\n');
  await writeFile(join(projectDir, 'ktx.yaml'), ktxYaml, 'utf-8');
  const createdAdapters: SourceAdapter[] = [
    { source: 'historic-sql', skillNames: [], detect: async () => true, chunk: async () => ({ workUnits: [] }) },
  ];
  const createAdapters = vi.fn(() => createdAdapters as never);
  const ordersUnit = 'historic-sql-table-public-orders';
  const syncId = 'sync-progress-1';
  // Replays a single-unit ingest lifecycle through the memoryFlow hooks.
  const runLocal = vi.fn(async (input: RunLocalIngestOptions) => {
    expect(input.memoryFlow).toBeDefined();
    const flow = input.memoryFlow;
    flow?.emit({ type: 'source_acquired', adapter: 'historic-sql', trigger: 'manual_resync', fileCount: 3 });
    flow?.update({ syncId });
    flow?.emit({ type: 'raw_snapshot_written', syncId, rawFileCount: 3 });
    flow?.emit({ type: 'diff_computed', added: 2, modified: 0, deleted: 0, unchanged: 1 });
    flow?.update({
      plannedWorkUnits: [
        { unitKey: ordersUnit, rawFiles: ['tables/public/orders.json'], peerFileCount: 0, dependencyCount: 0 },
      ],
    });
    flow?.emit({ type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 });
    flow?.emit({ type: 'work_unit_started', unitKey: ordersUnit, skills: ['historic_sql_table_digest'] });
    flow?.emit({ type: 'work_unit_finished', unitKey: ordersUnit, status: 'success' });
    flow?.emit({ type: 'saved', commitSha: null, wikiCount: 0, slCount: 1 });
    flow?.emit({ type: 'provenance_recorded', rowCount: 3 });
    flow?.emit({ type: 'report_created', runId: 'run-live-1', reportPath: 'report-live-1' });
    flow?.finish('done');
    return completedLocalBundleRun(input, input.jobId ?? 'historic-progress-job');
  });
  const terminal = makeIo({ isTTY: true });

  const exitCode = await runKtxIngest(
    {
      command: 'run',
      projectDir,
      connectionId: 'warehouse',
      adapter: 'historic-sql',
      outputMode: 'plain',
    },
    terminal.io,
    {
      env: interactiveEnv(),
      createAdapters,
      runLocalIngest: runLocal,
      jobIdFactory: () => 'historic-progress-job',
    },
  );
  expect(exitCode).toBe(0);

  // Percentage progress lines go to stderr; stdout only carries the final report.
  const stderr = terminal.stderr();
  for (const progressLine of [
    '[5%] Fetching source files for warehouse/historic-sql',
    '[15%] Fetched 3 source files from historic-sql',
    '[45%] Planned 1 task',
    '[80%] Processed 1/1 tasks',
    '[100%] Ingest completed',
  ]) {
    expect(stderr).toContain(progressLine);
  }
  const stdout = terminal.stdout();
  expect(stdout).toContain('Report: report-live-1');
  expect(stdout).not.toContain('[5%]');
});
2026-05-12 15:15:28 +02:00
it('writes plain TTY ingest progress to stderr and final report to stdout', async () => {
  const projectDir = join(tempDir, 'project');
  await writeWarehouseConfig(projectDir);
  const sourceDir = join(tempDir, 'source');
  const ordersDir = join(sourceDir, 'orders');
  await mkdir(ordersDir, { recursive: true });
  await writeFile(join(ordersDir, 'orders.json'), '{"name":"orders"}\n', 'utf-8');
  const terminal = makeIo({ isTTY: true });

  const exitCode = await runKtxIngest(
    {
      command: 'run',
      projectDir,
      connectionId: 'warehouse',
      adapter: 'fake',
      sourceDir,
      outputMode: 'plain',
    },
    terminal.io,
    {
      env: interactiveEnv(),
      runLocalIngest: vi.fn(async (input: RunLocalIngestOptions) => completedLocalBundleRun(input, 'local-job-1')),
    },
  );

  expect(exitCode).toBe(0);
  expect(terminal.stderr()).toContain('[5%] Fetching source files for warehouse/fake');
  const stdout = terminal.stdout();
  expect(stdout).toContain('Report: report-live-1');
  expect(stdout).not.toContain('[5%]');
});
2026-05-12 10:25:58 +02:00
it('prints plain WorkUnit step progress during long-running local ingest', async () => {
  const projectDir = join(tempDir, 'historic-sql-step-progress-project');
  await mkdir(projectDir, { recursive: true });
  const ktxYaml = [
    'connections:',
    '  warehouse:',
    '    driver: postgres',
    '    url: env:WAREHOUSE_DATABASE_URL',
    '    context:',
    '      queryHistory:',
    '        enabled: true',
    '        minExecutions: 2',
    'ingest:',
    '  adapters:',
    '    - historic-sql',
    '',
  ].join('\n');
  await writeFile(join(projectDir, 'ktx.yaml'), ktxYaml, 'utf-8');
  const createdAdapters: SourceAdapter[] = [
    { source: 'historic-sql', skillNames: [], detect: async () => true, chunk: async () => ({ workUnits: [] }) },
  ];
  const ordersUnit = 'historic-sql-table-public-orders';
  // Two units are planned; only the orders unit starts, reports 7 tool calls, and finishes.
  const runLocal = vi.fn(async (input: RunLocalIngestOptions) => {
    const flow = input.memoryFlow;
    flow?.update({
      plannedWorkUnits: [
        { unitKey: ordersUnit, rawFiles: ['tables/public/orders.json'], peerFileCount: 0, dependencyCount: 0 },
        {
          unitKey: 'historic-sql-table-public-customers',
          rawFiles: ['tables/public/customers.json'],
          peerFileCount: 0,
          dependencyCount: 0,
        },
      ],
    });
    flow?.emit({ type: 'chunks_planned', chunkCount: 2, workUnitCount: 2, evictionCount: 0 });
    flow?.emit({ type: 'work_unit_started', unitKey: ordersUnit, skills: ['historic_sql_table_digest'] });
    flow?.emit({ type: 'work_unit_step', unitKey: ordersUnit, toolCalls: 7 });
    flow?.emit({ type: 'work_unit_finished', unitKey: ordersUnit, status: 'success' });
    flow?.finish('done');
    return completedLocalBundleRun(input, input.jobId ?? 'historic-step-progress-job');
  });
  const terminal = makeIo({ isTTY: true });

  const exitCode = await runKtxIngest(
    {
      command: 'run',
      projectDir,
      connectionId: 'warehouse',
      adapter: 'historic-sql',
      outputMode: 'plain',
    },
    terminal.io,
    {
      env: interactiveEnv(),
      createAdapters: vi.fn(() => createdAdapters as never),
      runLocalIngest: runLocal,
      jobIdFactory: () => 'historic-step-progress-job',
    },
  );
  expect(exitCode).toBe(0);

  const stderr = terminal.stderr();
  expect(stderr).toContain('[45%] Planned 2 tasks');
  expect(stderr).toContain('[55%] Processing 1/2 tasks: historic-sql-table-public-orders');
  // Step updates are rewritten in place: leading \r plus an erase-to-end-of-line escape.
  expect(stderr).toContain(
    '\r[55%] Processing tasks: 0/2 complete, 1 active; latest historic-sql-table-public-orders · 7 actions\u001b[K',
  );
  expect(stderr).toContain('[68%] Processed 1/2 tasks');
});
2026-05-12 14:21:57 +02:00
it('renders concurrent WorkUnit step progress as transient aggregate status', async () => {
  const projectDir = join(tempDir, 'historic-sql-concurrent-progress-project');
  await mkdir(projectDir, { recursive: true });
  const ktxYaml = [
    'connections:',
    '  warehouse:',
    '    driver: postgres',
    '    url: env:WAREHOUSE_DATABASE_URL',
    '    context:',
    '      queryHistory:',
    '        enabled: true',
    '        minExecutions: 2',
    'ingest:',
    '  adapters:',
    '    - historic-sql',
    '',
  ].join('\n');
  await writeFile(join(projectDir, 'ktx.yaml'), ktxYaml, 'utf-8');
  const createdAdapters: SourceAdapter[] = [
    { source: 'historic-sql', skillNames: [], detect: async () => true, chunk: async () => ({ workUnits: [] }) },
  ];
  const workUnitKeys = [
    'historic-sql-table-public-orders',
    'historic-sql-table-public-customers',
    'historic-sql-table-public-line-items',
    'historic-sql-table-public-payments',
    'historic-sql-table-public-products',
    'historic-sql-table-public-suppliers',
  ];
  const runLocal = vi.fn(async (input: RunLocalIngestOptions) => {
    const flow = input.memoryFlow;
    flow?.update({
      plannedWorkUnits: workUnitKeys.map((unitKey) => ({
        unitKey,
        rawFiles: [`tables/${unitKey}.json`],
        peerFileCount: 0,
        dependencyCount: 0,
      })),
    });
    const unitCount = workUnitKeys.length;
    flow?.emit({ type: 'chunks_planned', chunkCount: unitCount, workUnitCount: unitCount, evictionCount: 0 });
    // Every unit starts before any reports a step, so all six are active when the steps arrive.
    workUnitKeys.forEach((unitKey) => {
      flow?.emit({ type: 'work_unit_started', unitKey, skills: ['historic_sql_table_digest'] });
    });
    workUnitKeys.forEach((unitKey) => {
      flow?.emit({ type: 'work_unit_step', unitKey, toolCalls: 1 });
    });
    flow?.finish('done');
    return completedLocalBundleRun(input, input.jobId ?? 'historic-concurrent-progress-job');
  });
  const terminal = makeIo({ isTTY: true });

  const exitCode = await runKtxIngest(
    {
      command: 'run',
      projectDir,
      connectionId: 'warehouse',
      adapter: 'historic-sql',
      outputMode: 'plain',
    },
    terminal.io,
    {
      env: interactiveEnv(),
      createAdapters: vi.fn(() => createdAdapters as never),
      runLocalIngest: runLocal,
      jobIdFactory: () => 'historic-concurrent-progress-job',
    },
  );
  expect(exitCode).toBe(0);

  const stderr = terminal.stderr();
  // Concurrent steps collapse into one in-place aggregate line rather than a new line per unit.
  expect(stderr).toContain(
    '\r[55%] Processing tasks: 0/6 complete, 6 active; latest historic-sql-table-public-suppliers · 1 action\u001b[K',
  );
  expect(stderr).not.toContain(
    '\n[55%] Processing 6/6 tasks: historic-sql-table-public-suppliers · 1 action\n',
  );
  expect(stderr).toContain('\n[100%] Ingest completed\n');
});
2026-05-10 23:12:26 +02:00
it('passes local Looker pull-config options and agent runner into scheduled ingest for Looker scheduled ingest', async () => {
  // Checks that the localIngestOptions handed to runKtxIngest are forwarded in
  // two places: the Looker parser reaches adapter construction, and the agent
  // runner plus the pull-config options reach runLocalIngest.
  const projectDir = join(tempDir, 'project');
  await writeWarehouseConfig(projectDir);

  const lookerParser = { parse: vi.fn() };
  const pullConfigOptions = { looker: { parser: lookerParser } };
  const agentRunner = { runLoop: vi.fn() } as never;

  // One adapter that always detects and yields no work units.
  const fakeAdapters: SourceAdapter[] = [
    { source: 'fake', skillNames: [], detect: async () => true, chunk: async () => ({ workUnits: [] }) },
  ];
  const createAdapters = vi.fn(() => fakeAdapters as never);
  const runLocalIngest = vi.fn(async (options: RunLocalIngestOptions) =>
    completedLocalBundleRun(options, options.jobId ?? 'local-job-1'),
  );
  const cli = makeIo();

  const exitCode = runKtxIngest(
    {
      command: 'run',
      projectDir,
      connectionId: 'warehouse',
      adapter: 'fake',
      outputMode: 'plain',
    } satisfies KtxIngestArgs,
    cli.io,
    {
      createAdapters,
      runLocalIngest,
      jobIdFactory: () => 'local-job-1',
      localIngestOptions: { agentRunner, pullConfigOptions },
    },
  );
  await expect(exitCode).resolves.toBe(0);

  // The adapter factory sees this project and is given the Looker parser.
  expect(createAdapters).toHaveBeenCalledWith(
    expect.objectContaining({ projectDir }),
    expect.objectContaining({
      logger: expect.any(Object),
      looker: { parser: lookerParser },
    }),
  );
  // Local ingest gets the same agent runner and (at least) the pull-config options.
  expect(runLocalIngest).toHaveBeenCalledWith(
    expect.objectContaining({
      agentRunner,
      pullConfigOptions: expect.objectContaining(pullConfigOptions),
    }),
  );
});
// End-to-end Looker ingest through runKtxIngest, using an injected runtime
// client, parser and SL-writing agent runner. It checks:
// - the run summary printed on stdout;
// - the views passed to the parser;
// - that the runtime client is cleaned up once;
// - the semantic-layer YAML written for the mapped warehouse;
// - that the `status` command can read the finished job back.
it ( 'runs Looker scheduled ingest through the public CLI command path' , async ( ) = > {
const projectDir = join ( tempDir , 'looker-project' ) ;
2026-05-11 09:55:42 +02:00
// NOTE(review): ktx.yaml is overwritten immediately below. This call is
// presumably kept for directory/other project setup; confirm it is still needed.
await writeWarehouseConfig ( projectDir ) ;
2026-05-10 23:12:26 +02:00
// Config: a Looker connection, the postgres warehouse it maps onto, the looker
// ingest adapter, and the embeddings backend set to none.
await writeFile (
2026-05-10 23:51:24 +02:00
join ( projectDir , 'ktx.yaml' ) ,
2026-05-10 23:12:26 +02:00
[
'connections:' ,
' prod-looker:' ,
' driver: looker' ,
' base_url: https://looker.example.test' ,
' client_id: client' ,
' prod-warehouse:' ,
' driver: postgres' ,
' url: postgresql://readonly@db.example.test/analytics' ,
'ingest:' ,
' adapters:' ,
' - looker' ,
' embeddings:' ,
2026-05-19 16:40:01 +02:00
' backend: none' ,
2026-05-10 23:12:26 +02:00
'' ,
] . join ( '\n' ) ,
'utf-8' ,
) ;
2026-05-10 23:51:24 +02:00
// Seed the project's local Looker runtime state: empty sync cursors for
// prod-looker, and a mapping from Looker connection 'analytics' to the ktx
// connection 'prod-warehouse'.
const project = await loadKtxProject ( { projectDir } ) ;
const store = new LocalLookerRuntimeStore ( { dbPath : ktxLocalStateDbPath ( project ) } ) ;
2026-05-10 23:12:26 +02:00
await store . setCursors ( 'prod-looker' , {
dashboardsLastSyncedAt : null ,
looksLastSyncedAt : null ,
} ) ;
await store . upsertConnectionMapping ( {
lookerConnectionId : 'prod-looker' ,
lookerConnectionName : 'analytics' ,
2026-05-10 23:51:24 +02:00
ktxConnectionId : 'prod-warehouse' ,
2026-05-10 23:12:26 +02:00
source : 'cli' ,
} ) ;
// Test doubles for the Looker runtime client, the parser, and the agent runner.
const runtimeClient = makeCliLookerRuntimeClient ( ) ;
const parser = makeCliLookerParser ( ) ;
const agentRunner = new CliLookerSlWritingAgentRunner ( ) ;
const io = makeIo ( ) ;
await expect (
2026-05-10 23:51:24 +02:00
runKtxIngest (
2026-05-10 23:12:26 +02:00
{
command : 'run' ,
projectDir ,
connectionId : 'prod-looker' ,
adapter : 'looker' ,
outputMode : 'plain' ,
} ,
io . io ,
{
jobIdFactory : ( ) = > 'cli-looker-job' ,
localIngestOptions : {
agentRunner ,
// The same runtime client instance is passed as both `client` and `runtimeClient`.
pullConfigOptions : {
looker : {
client : runtimeClient ,
runtimeClient ,
parser ,
} ,
} ,
} ,
} ,
) ,
) . resolves . toBe ( 0 ) ;
// The run summary goes to stdout, and nothing is written to stderr.
expect ( io . stderr ( ) ) . toBe ( '' ) ;
expect ( io . stdout ( ) ) . toContain ( 'Job: cli-looker-job' ) ;
2026-05-14 01:43:06 +02:00
expect ( io . stdout ( ) ) . toContain ( 'Source: Looker' ) ;
2026-05-10 23:12:26 +02:00
expect ( io . stdout ( ) ) . toContain ( 'Connection: prod-looker' ) ;
expect ( io . stdout ( ) ) . toContain ( 'Status: done' ) ;
expect ( io . stdout ( ) ) . toContain ( 'Saved memory: 0 wiki, 1 SL' ) ;
// The parser receives both views with their sql_table_name and the postgres dialect.
expect ( parser . parse ) . toHaveBeenCalledWith (
expect . arrayContaining ( [
expect . objectContaining ( { key : 'ecommerce.orders' , sql_table_name : 'public.orders' , dialect : 'postgres' } ) ,
expect . objectContaining ( { key : 'ecommerce.orders.users' , sql_table_name : 'public.users' , dialect : 'postgres' } ) ,
] ) ,
) ;
expect ( runtimeClient . cleanup ) . toHaveBeenCalledTimes ( 1 ) ;
// The SL file lands under semantic-layer/<mapped warehouse connection id>/.
const slPath = join ( projectDir , 'semantic-layer' , 'prod-warehouse' , 'looker__ecommerce__orders.yaml' ) ;
await access ( slPath ) ;
await expect ( readFile ( slPath , 'utf-8' ) ) . resolves . toContain ( 'table: public.orders' ) ;
// A follow-up `status` command reports the same job and source.
const statusIo = makeIo ( ) ;
await expect (
2026-05-10 23:51:24 +02:00
runKtxIngest (
2026-05-10 23:12:26 +02:00
{ command : 'status' , projectDir , runId : 'cli-looker-job' , outputMode : 'plain' } ,
statusIo . io ,
) ,
) . resolves . toBe ( 0 ) ;
expect ( statusIo . stdout ( ) ) . toContain ( 'Job: cli-looker-job' ) ;
2026-05-14 01:43:06 +02:00
expect ( statusIo . stdout ( ) ) . toContain ( 'Source: Looker' ) ;
2026-05-10 23:12:26 +02:00
expect ( statusIo . stderr ( ) ) . toBe ( '' ) ;
} ) ;
} ) ;