2026-05-10 23:51:24 +02:00
import { type KtxLocalProject , type KtxProjectConnectionConfig , loadKtxProject } from '@ktx/context/project' ;
2026-05-13 17:01:48 +02:00
import type { KtxProgressPort } from '@ktx/context/scan' ;
2026-05-10 23:51:24 +02:00
import type { KtxCliIo } from './index.js' ;
2026-05-13 17:01:48 +02:00
import type { KtxIngestArgs , KtxIngestDeps , KtxIngestProgressUpdate } from './ingest.js' ;
2026-05-14 01:43:06 +02:00
import {
type KtxDatabaseContextDepth ,
databaseContextDepth ,
deepReadinessGaps ,
isDatabaseDriver ,
normalizeConnectionDriver ,
} from './ingest-depth.js' ;
2026-05-17 10:27:29 +02:00
import {
ensureManagedPythonCommandRuntime ,
type KtxManagedPythonInstallPolicy ,
type ManagedPythonCommandRuntime ,
} from './managed-python-command.js' ;
import type { KtxRuntimeFeature } from './managed-python-runtime.js' ;
2026-05-14 01:43:06 +02:00
import { publicIngestOutputLine } from './public-ingest-copy.js' ;
2026-05-17 10:27:29 +02:00
import { resolvePublicIngestRuntimeRequirements } from './runtime-requirements.js' ;
2026-05-13 17:01:48 +02:00
import type { KtxScanArgs , KtxScanDeps } from './scan.js' ;
2026-05-10 23:12:26 +02:00
import { profileMark } from './startup-profile.js' ;
// Startup-profiling marker, recorded once when this module is evaluated
// (presumably used to measure CLI import cost — confirm in startup-profile.js).
profileMark ( 'module:public-ingest' ) ;
2026-05-14 01:43:06 +02:00
/** Names of the per-target steps a public ingest run reports on, in display order. */
type KtxPublicIngestStepName = 'database-schema' | 'query-history' | 'source-ingest' | 'memory-update' ;
2026-05-13 13:33:28 +02:00
/** Outcome of a single step; `not-run` marks planned steps that never started (e.g. after an earlier failure). */
type KtxPublicIngestStepStatus = 'done' | 'skipped' | 'failed' | 'not-run' ;
/** `auto` allows interactive/visual terminal output when a TTY is available; `disabled` forces plain output. */
type KtxPublicIngestInputMode = 'auto' | 'disabled' ;
2026-05-14 01:43:06 +02:00
/** Database ingest depth (this file uses `'fast'` and `'deep'`). */
type KtxPublicIngestDepth = KtxDatabaseContextDepth ;
/** Command-line query-history choice; `'default'` defers to the connection's ktx.yaml setting. */
type KtxPublicIngestQueryHistoryFlag = 'default' | 'enabled' | 'disabled' ;
/** SQL dialects whose query history can be ingested; drivers without one are reported as unsupported. */
type HistoricSqlDialect = 'postgres' | 'bigquery' | 'snowflake' ;
2026-05-10 23:12:26 +02:00
2026-05-10 23:51:24 +02:00
/**
 * Arguments for the public `ktx ingest` entry point.
 *
 * Kept as a union discriminated by `command` (currently only `'run'`) so call
 * sites can narrow with `Extract<KtxPublicIngestArgs, { command: 'run' }>`.
 */
export type KtxPublicIngestArgs = {
  command: 'run';
  /** Project directory the run operates on. */
  projectDir: string;
  /** Connection to ingest when `all` is false. */
  targetConnectionId?: string;
  /** Ingest every configured connection instead of a single one. */
  all: boolean;
  /** JSON output requested; disables the interactive foreground view. */
  json: boolean;
  /** `'auto'` permits interactive terminal output; `'disabled'` forces plain output. */
  inputMode: KtxPublicIngestInputMode;
  /** Explicit database depth; takes precedence over the legacy scan mode and ktx.yaml. */
  depth?: KtxPublicIngestDepth;
  /** Query-history override; `'default'` (or absent) defers to ktx.yaml. */
  queryHistory?: KtxPublicIngestQueryHistoryFlag;
  /** Query-history lookback window in days; passing it also counts as requesting query history. */
  queryHistoryWindowDays?: number;
  /** Legacy scan mode; `'enriched'` and `'relationships'` map to deep ingest. */
  scanMode?: Extract<KtxScanArgs, { command: 'run' }>['mode'];
  detectRelationships?: boolean;
  /** CLI version forwarded to the scan and ingest runners. */
  cliVersion?: string;
  /** Managed Python runtime install policy forwarded to the scan and ingest runners. */
  runtimeInstallPolicy?: KtxManagedPythonInstallPolicy;
};
2026-05-10 23:12:26 +02:00
2026-05-10 23:51:24 +02:00
/** One connection's entry in a public ingest plan: what will run for it and how. */
export interface KtxPublicIngestPlanTarget {
  connectionId: string;
  /** Normalized driver name of the connection. */
  driver: string;
  /** Database connections are scanned; everything else goes through a source adapter. */
  operation: 'database-ingest' | 'source-ingest';
  /** Source adapter name (source-ingest targets only). */
  adapter?: string;
  /** Trimmed `source_dir` from the connection config, when configured. */
  sourceDir?: string;
  /** Debug command attached to this target's step results. */
  debugCommand: string;
  /** Planned steps, in execution order. */
  steps: KtxPublicIngestStepName[];
  /** Resolved depth (database targets only). */
  databaseDepth?: KtxPublicIngestDepth;
  /** Whether the schema scan detects relationships (deep database targets only). */
  detectRelationships?: boolean;
  /** When present, the target fails with this message before any step runs. */
  preflightFailure?: string;
  /** Resolved query-history settings (database targets only). */
  queryHistory?: {
    enabled: boolean;
    dialect?: HistoricSqlDialect;
    windowDays?: number;
    /** Pull configuration handed to the historic-sql ingest adapter. */
    pullConfig?: Record<string, unknown>;
    /** Query history was requested but the driver has no supported dialect. */
    unsupported?: boolean;
    /** Query history is enabled in ktx.yaml but an explicit `--fast` skipped it. */
    skippedStoredByFast?: boolean;
  };
}
2026-05-10 23:51:24 +02:00
/** A complete ingest plan: ordered targets plus messages to show the user. */
export interface KtxPublicIngestPlan {
  projectDir: string;
  /** Database targets first, then source targets; each group ordered by connection id. */
  targets: KtxPublicIngestPlanTarget[];
  /** Warnings about options that were ignored or adjusted while planning. */
  warnings: string[];
  /** Informational notes; the property is omitted when there are none. */
  notices?: string[];
}
2026-05-10 23:51:24 +02:00
/** Outcome of running one plan target, with a status for every step name. */
export interface KtxPublicIngestTargetResult {
  connectionId: string;
  driver: string;
  /** One entry per step name, including skipped and never-run steps. */
  steps: {
    operation: KtxPublicIngestStepName;
    status: KtxPublicIngestStepStatus;
    /** Failure message (with a retry hint) for a failed step. */
    detail?: string;
    /** Debug command, present on steps owned by the target's operation. */
    debugCommand?: string;
  }[];
}
2026-05-10 23:51:24 +02:00
/** The subset of a loaded project that public ingest reads: its directory and parsed config. */
export type KtxPublicIngestProject = Pick < KtxLocalProject , 'projectDir' | 'config' > ;
2026-05-10 23:12:26 +02:00
2026-05-14 01:43:06 +02:00
/** Phases reported through `onPhaseStart`/`onPhaseEnd`; `memory-update` is not reported as its own phase. */
type KtxPublicIngestPhaseKey = 'database-schema' | 'query-history' | 'source-ingest' ;
2026-05-10 23:51:24 +02:00
/**
 * Optional collaborators and callbacks for public ingest. Runners that are not
 * supplied fall back to the real implementations (`runKtxScan`, `runKtxIngest`).
 */
export interface KtxPublicIngestDeps {
  /** Project loader; takes the same options as `loadKtxProject`. */
  loadProject?: (options: Parameters<typeof loadKtxProject>[0]) => Promise<KtxPublicIngestProject>;
  /** Schema scan runner; resolves to an exit code where 0 means success. */
  runScan?: (args: KtxScanArgs, io: KtxCliIo, deps?: KtxScanDeps) => Promise<number>;
  /** Ingest runner used for query history and source adapters; resolves to an exit code. */
  runIngest?: (args: KtxIngestArgs, io: KtxCliIo, deps?: KtxIngestDeps) => Promise<number>;
  /** Context-build runner. */
  runContextBuild?: (
    project: KtxPublicIngestProject,
    args: KtxPublicContextBuildArgs,
    io: KtxCliIo,
  ) => Promise<{ exitCode: number }>;
  /** Scan progress port; without it, scan output is captured instead of written to the caller's IO. */
  scanProgress?: KtxProgressPort;
  /** Ingest progress callback; without it, ingest output is captured. */
  ingestProgress?: (update: KtxIngestProgressUpdate) => void;
  /** Managed Python runtime provisioner. */
  ensureRuntime?: (options: {
    cliVersion: string;
    installPolicy: KtxManagedPythonInstallPolicy;
    io: KtxCliIo;
    feature: KtxRuntimeFeature;
  }) => Promise<ManagedPythonCommandRuntime>;
  /** Environment override. */
  env?: NodeJS.ProcessEnv;
  /** IO forwarded to the scan and ingest runners as `runtimeIo`. */
  runtimeIo?: KtxCliIo;
  /** Invoked when a phase starts for a target. */
  onPhaseStart?: (phaseKey: KtxPublicIngestPhaseKey) => void;
  /** Invoked when a phase ends, with its outcome and an optional summary. */
  onPhaseEnd?: (phaseKey: KtxPublicIngestPhaseKey, status: 'done' | 'failed' | 'skipped', summary?: string) => void;
}
/** Arguments handed to an injected `runContextBuild`; mirrors the public ingest run options. */
interface KtxPublicContextBuildArgs {
  projectDir: string;
  inputMode: KtxPublicIngestInputMode;
  targetConnectionId?: string;
  all?: boolean;
  depth?: KtxPublicIngestDepth;
  queryHistory?: KtxPublicIngestQueryHistoryFlag;
  queryHistoryWindowDays?: number;
  scanMode?: Extract<KtxScanArgs, { command: 'run' }>['mode'];
  detectRelationships?: boolean;
  cliVersion?: string;
  runtimeInstallPolicy?: KtxManagedPythonInstallPolicy;
}
/**
 * Drivers handled by a source adapter (checked before database drivers), mapped
 * to the adapter name. Local variants share the adapter of their hosted counterpart.
 */
const sourceAdapterByDriver = new Map<string, string>(
  Object.entries<string>({
    metabase: 'metabase',
    local_metabase: 'metabase',
    looker: 'looker',
    local_looker: 'looker',
    notion: 'notion',
    metricflow: 'metricflow',
    dbt: 'dbt',
    lookml: 'lookml',
  }),
);
2026-05-14 01:43:06 +02:00
/**
 * Drivers whose query history can be ingested, mapped to the historic-SQL dialect.
 * A driver missing from this map makes query-history requests "unsupported".
 */
const queryHistoryDialectByDriver = new Map<string, HistoricSqlDialect>(
  Object.entries<HistoricSqlDialect>({
    postgres: 'postgres',
    postgresql: 'postgres',
    bigquery: 'bigquery',
    snowflake: 'snowflake',
  }),
);
2026-05-14 01:43:06 +02:00
/** A database connection where query history was requested but its driver has no supported dialect. */
interface KtxUnsupportedQueryHistoryWarning {
  /** `'explicit'` when requested on the command line, `'stored'` when enabled in ktx.yaml. */
  reason: 'explicit' | 'stored';
  connectionId: string;
  driver: string;
}
/** Warning inputs collected while targets are planned, merged into final messages afterwards. */
interface KtxPublicIngestWarningAccumulator {
  /** Ready-to-print warnings. */
  warnings: string[];
  /** Source connections for which an explicit depth flag was ignored. */
  ignoredDepthForSources: string[];
  /** Source connections for which query-history options were ignored. */
  ignoredQueryHistoryForSources: string[];
  /** Database connections where query history was requested but is unsupported. */
  unsupportedQueryHistoryForDatabases: KtxUnsupportedQueryHistoryWarning[];
}
/** Returns a fresh, empty warning accumulator (new arrays on every call). */
function createWarningAccumulator(): KtxPublicIngestWarningAccumulator {
  const accumulator: KtxPublicIngestWarningAccumulator = {
    warnings: [],
    ignoredDepthForSources: [],
    ignoredQueryHistoryForSources: [],
    unsupportedQueryHistoryForDatabases: [],
  };
  return accumulator;
}
/**
 * Phrases a warning for a database-only option that was ignored for source connections.
 *
 * @param option - The flag as the user typed it, e.g. `--deep`.
 * @param connectionIds - Source connections the option was ignored for.
 * @param all - Whether this is an `--all` run; those report a count instead of a name.
 * @returns The warning text, or `null` when no connection was affected.
 */
function sourceIgnoredWarning(option: string, connectionIds: string[], all: boolean): string | null {
  const count = connectionIds.length;
  if (count === 0) {
    return null;
  }
  if (!all) {
    return `${option} affects database ingest only; ignoring it for ${connectionIds[0]}.`;
  }
  const noun = count === 1 ? 'source' : 'sources';
  return `${option} ignored for ${count} non-database ${noun}.`;
}
/** Returns the distinct driver names of `entries`, sorted and comma-separated. */
function unsupportedDriverList(entries: KtxUnsupportedQueryHistoryWarning[]): string {
  const drivers = new Set<string>();
  for (const { driver } of entries) {
    drivers.add(driver);
  }
  return Array.from(drivers)
    .sort((left, right) => left.localeCompare(right))
    .join(', ');
}
/**
 * Phrases warnings for database connections whose drivers cannot ingest query history.
 *
 * Command-line requests (`explicit`) and ktx.yaml requests (`stored`) are reported
 * separately. A group is listed per connection when it has exactly one entry or the
 * run is not `--all`; otherwise it collapses into one summary line.
 */
function unsupportedQueryHistoryWarnings(
  entries: KtxUnsupportedQueryHistoryWarning[],
  all: boolean,
): string[] {
  const explicitEntries = entries.filter((entry) => entry.reason === 'explicit');
  const storedEntries = entries.filter((entry) => entry.reason === 'stored');
  const listIndividually = (group: KtxUnsupportedQueryHistoryWarning[]): boolean =>
    group.length === 1 || (!all && group.length > 0);
  const messages: string[] = [];

  if (listIndividually(explicitEntries)) {
    for (const { driver, connectionId } of explicitEntries) {
      messages.push(`--query-history is not supported for ${driver}; running schema ingest for ${connectionId}.`);
    }
  } else if (explicitEntries.length > 1) {
    messages.push(
      `--query-history is not supported for ${explicitEntries.length} database connections (${unsupportedDriverList(
        explicitEntries,
      )}); running schema ingest for those connections.`,
    );
  }

  if (listIndividually(storedEntries)) {
    for (const { driver, connectionId } of storedEntries) {
      messages.push(
        `${connectionId} has query history enabled in ktx.yaml, but ${driver} does not support it; running schema ingest.`,
      );
    }
  } else if (storedEntries.length > 1) {
    messages.push(
      `${storedEntries.length} database connections have query history enabled in ktx.yaml, but their drivers do not support it; running schema ingest for those connections.`,
    );
  }

  return messages;
}
/**
 * Builds the final warning list for a plan.
 *
 * Order: accumulated warnings, then unsupported-query-history warnings, then the
 * "ignored for sources" warning for an explicit depth flag, then the one for
 * query-history options.
 */
function finalizeWarnings(
  accumulator: KtxPublicIngestWarningAccumulator,
  args: {
    all: boolean;
    depth?: KtxPublicIngestDepth;
    queryHistory?: KtxPublicIngestQueryHistoryFlag;
    queryHistoryWindowDays?: number;
  },
): string[] {
  const unsupported = unsupportedQueryHistoryWarnings(accumulator.unsupportedQueryHistoryForDatabases, args.all);
  const depthWarning = args.depth
    ? sourceIgnoredWarning(`--${args.depth}`, accumulator.ignoredDepthForSources, args.all)
    : null;
  const queryHistoryRequested = args.queryHistory === 'enabled' || args.queryHistoryWindowDays !== undefined;
  const queryHistoryWarning = queryHistoryRequested
    ? sourceIgnoredWarning('--query-history', accumulator.ignoredQueryHistoryForSources, args.all)
    : null;
  return [
    ...accumulator.warnings,
    ...unsupported,
    ...(depthWarning ? [depthWarning] : []),
    ...(queryHistoryWarning ? [queryHistoryWarning] : []),
  ];
}
/**
 * When `--query-history` was passed explicitly, tells the user that schema ingest
 * runs before query history for the affected targets.
 *
 * @returns The notice, or `null` when the flag was not given or no target ingests query history.
 */
function schemaFirstQueryHistoryNotice(
  targets: KtxPublicIngestPlanTarget[],
  args: { queryHistory?: KtxPublicIngestQueryHistoryFlag },
): string | null {
  if (args.queryHistory !== 'enabled') {
    return null;
  }
  const ids = targets.filter((target) => target.queryHistory?.enabled === true).map((target) => target.connectionId);
  switch (ids.length) {
    case 0:
      return null;
    case 1:
      return `Schema ingest runs before query history for ${ids[0]}.`;
    default:
      return `Schema ingest runs before query history for ${ids.length} database connections.`;
  }
}
/**
 * Reads `context.queryHistory` from a connection's config. Returns `{}` unless both
 * `context` and `queryHistory` are non-null, non-array objects.
 */
function storedQueryHistory(connection: KtxProjectConnectionConfig): Record<string, unknown> {
  const asRecord = (value: unknown): Record<string, unknown> =>
    typeof value === 'object' && value !== null && !Array.isArray(value) ? (value as Record<string, unknown>) : {};
  return asRecord(asRecord(connection.context).queryHistory);
}
/** Returns `value` when it is an integer number greater than zero, otherwise `undefined`. */
function positiveInteger(value: unknown): number | undefined {
  if (typeof value !== 'number' || !Number.isInteger(value) || value <= 0) {
    return undefined;
  }
  return value;
}
/**
 * Returns the connection's `enabled_tables` entries that are non-blank strings
 * (kept as written, not trimmed), or `undefined` when none remain.
 */
function enabledTablesForConnection(connection: KtxProjectConnectionConfig): string[] | undefined {
  const configured: unknown = connection.enabled_tables;
  if (!Array.isArray(configured)) {
    return undefined;
  }
  const tables: string[] = [];
  for (const entry of configured) {
    if (typeof entry === 'string' && entry.trim() !== '') {
      tables.push(entry);
    }
  }
  return tables.length === 0 ? undefined : tables;
}
/**
 * Builds the historic-sql pull configuration for a connection.
 *
 * Stored ktx.yaml settings are passed through except `enabled` and `dialect`. The
 * resolved dialect always wins. `enabledTables` and `windowDays` override stored
 * values only when provided.
 */
function queryHistoryPullConfig(input: {
  stored: Record<string, unknown>;
  dialect: HistoricSqlDialect;
  windowDays?: number;
  enabledTables?: string[];
}): Record<string, unknown> {
  const { enabled: _enabled, dialect: _dialect, ...passthrough } = input.stored;
  const config: Record<string, unknown> = { ...passthrough, dialect: input.dialect };
  if (input.enabledTables) {
    config.enabledTables = input.enabledTables;
  }
  if (input.windowDays !== undefined) {
    config.windowDays = input.windowDays;
  }
  return config;
}
/** Maps a legacy scan mode to a depth: `enriched`/`relationships` mean deep; anything else gives no preference. */
function depthFromLegacyScanMode(
  mode: Extract<KtxScanArgs, { command: 'run' }>['mode'] | undefined,
): KtxPublicIngestDepth | undefined {
  switch (mode) {
    case 'enriched':
    case 'relationships':
      return 'deep';
    default:
      return undefined;
  }
}
2026-05-10 23:51:24 +02:00
/** Returns the trimmed `source_dir` of a connection, or `undefined` when it is missing, non-string or blank. */
function sourceDirForConnection(connection: KtxProjectConnectionConfig): string | undefined {
  const configured: unknown = connection.source_dir;
  if (typeof configured !== 'string') {
    return undefined;
  }
  const trimmed = configured.trim();
  return trimmed === '' ? undefined : trimmed;
}
2026-05-14 01:43:06 +02:00
/**
 * Resolves depth, query-history settings and planned steps for one database connection.
 *
 * Depth precedence: explicit `depth`, then the legacy `scanMode`, then the
 * connection's configured depth, then `'fast'`.
 *
 * Query history counts as requested when the flag is `'enabled'`, when a window
 * override is passed, or when ktx.yaml enables it. An explicit `'disabled'` flag
 * suppresses all three. Outcomes:
 * - requested, but the driver has no dialect: schema only, recorded as unsupported;
 * - enabled only in ktx.yaml while `--fast` was passed explicitly: `--fast` wins,
 *   query history is skipped with a warning (`skippedStoredByFast`);
 * - requested otherwise: depth is forced to `'deep'` and a query-history step is added.
 *
 * Side effect: pushes warnings into `input.warnings`.
 */
function resolveDatabaseTargetOptions(input: {
  connectionId: string;
  driver: string;
  connection: KtxProjectConnectionConfig;
  args: {
    depth?: KtxPublicIngestDepth;
    queryHistory?: KtxPublicIngestQueryHistoryFlag;
    queryHistoryWindowDays?: number;
    scanMode?: Extract<KtxScanArgs, { command: 'run' }>['mode'];
  };
  warnings: KtxPublicIngestWarningAccumulator;
}): Pick<KtxPublicIngestPlanTarget, 'databaseDepth' | 'queryHistory' | 'steps'> {
  const { connectionId, driver, connection, args, warnings } = input;
  const storedQh = storedQueryHistory(connection);
  const dialect = queryHistoryDialectByDriver.get(driver);
  const explicitQueryHistory = args.queryHistory ?? 'default';
  const storedEnabled = storedQh.enabled === true;
  const windowOverrideRequested = args.queryHistoryWindowDays !== undefined;
  // Requested on the command line (flag or window override), as opposed to only in ktx.yaml.
  const cliRequestedQh =
    explicitQueryHistory === 'enabled' || (explicitQueryHistory !== 'disabled' && windowOverrideRequested);
  const requestedQh = cliRequestedQh || (explicitQueryHistory !== 'disabled' && storedEnabled);
  const depth =
    args.depth ?? depthFromLegacyScanMode(args.scanMode) ?? databaseContextDepth(connection) ?? 'fast';
  const windowDays = windowOverrideRequested ? args.queryHistoryWindowDays : positiveInteger(storedQh.windowDays);
  const queryHistory = {
    enabled: false,
    ...(windowDays !== undefined ? { windowDays } : {}),
  };

  if (requestedQh && !dialect) {
    warnings.unsupportedQueryHistoryForDatabases.push({
      connectionId,
      driver,
      reason: cliRequestedQh ? 'explicit' : 'stored',
    });
    return {
      databaseDepth: depth,
      queryHistory: { ...queryHistory, unsupported: true },
      steps: ['database-schema'],
    };
  }

  // An explicit --fast outranks query history that is only enabled in ktx.yaml; without
  // this check such runs were silently upgraded to deep ingest.
  if (args.depth === 'fast' && storedEnabled && !cliRequestedQh) {
    warnings.warnings.push(
      `${connectionId} has query history enabled in ktx.yaml, but --fast skips query-history processing.`,
    );
    return {
      databaseDepth: 'fast',
      queryHistory: { ...queryHistory, skippedStoredByFast: true },
      steps: ['database-schema'],
    };
  }

  if (requestedQh && dialect) {
    if (depth === 'fast') {
      warnings.warnings.push(`--query-history requires deep ingest; running ${connectionId} with --deep.`);
    }
    return {
      databaseDepth: 'deep',
      queryHistory: {
        ...queryHistory,
        enabled: true,
        dialect,
        pullConfig: queryHistoryPullConfig({
          stored: storedQh,
          dialect,
          windowDays,
          enabledTables: enabledTablesForConnection(connection),
        }),
      },
      steps: ['database-schema', 'query-history'],
    };
  }

  return {
    databaseDepth: depth,
    queryHistory,
    steps: ['database-schema'],
  };
}
/**
 * Plans one connection. Drivers with a source adapter become `source-ingest`
 * targets; database drivers become `database-ingest` targets.
 *
 * Side effect: records ignored or adjusted options in `warnings`.
 *
 * @throws Error when the driver is neither a source adapter nor a database driver.
 */
function targetForConnection(
  connectionId: string,
  connection: KtxProjectConnectionConfig,
  projectConfig: KtxPublicIngestProject['config'],
  args: {
    depth?: KtxPublicIngestDepth;
    queryHistory?: KtxPublicIngestQueryHistoryFlag;
    queryHistoryWindowDays?: number;
    scanMode?: Extract<KtxScanArgs, { command: 'run' }>['mode'];
  },
  warnings: KtxPublicIngestWarningAccumulator,
): KtxPublicIngestPlanTarget {
  const driver = normalizeConnectionDriver(connection);
  const debugCommand = `ktx ingest ${connectionId} --debug`;
  const adapter = sourceAdapterByDriver.get(driver);

  if (adapter) {
    // Depth and query-history options only apply to databases; note them for a combined warning.
    if (args.depth) {
      warnings.ignoredDepthForSources.push(connectionId);
    }
    if (args.queryHistory === 'enabled' || args.queryHistoryWindowDays !== undefined) {
      warnings.ignoredQueryHistoryForSources.push(connectionId);
    }
    const sourceDir = sourceDirForConnection(connection);
    return {
      connectionId,
      driver,
      operation: 'source-ingest',
      adapter,
      ...(sourceDir ? { sourceDir } : {}),
      debugCommand,
      steps: ['source-ingest', 'memory-update'],
    };
  }

  if (!isDatabaseDriver(driver)) {
    throw new Error(`Connection "${connectionId}" uses unsupported public ingest driver "${driver || 'unknown'}"`);
  }

  const options = resolveDatabaseTargetOptions({ connectionId, driver, connection, args, warnings });
  const isDeep = options.databaseDepth === 'deep';
  const gaps = isDeep ? deepReadinessGaps(projectConfig) : [];
  return {
    connectionId,
    driver,
    operation: 'database-ingest',
    debugCommand,
    detectRelationships: isDeep && projectConfig.scan.relationships.enabled,
    ...(gaps.length > 0
      ? {
          preflightFailure: `${connectionId} requires deep ingest readiness: ${gaps.join(
            ', ',
          )}. Run ktx setup or rerun with --fast.`,
        }
      : {}),
    ...options,
  };
}
/**
 * Plans a public ingest run: selects connections, resolves per-connection options
 * and gathers warnings and notices.
 *
 * Connections are processed in connection-id order. All database targets come
 * before all source targets.
 *
 * @throws Error when neither a connection id nor `all` is given, when the requested
 *   connection is missing from ktx.yaml, when no connections are configured, or when
 *   a selected connection uses an unsupported driver.
 */
export function buildPublicIngestPlan(
  project: KtxPublicIngestProject,
  args: {
    projectDir: string;
    targetConnectionId?: string;
    all: boolean;
    depth?: KtxPublicIngestDepth;
    queryHistory?: KtxPublicIngestQueryHistoryFlag;
    queryHistoryWindowDays?: number;
    scanMode?: Extract<KtxScanArgs, { command: 'run' }>['mode'];
  },
): KtxPublicIngestPlan {
  if (!args.all && !args.targetConnectionId) {
    throw new Error('Context build requires a connection id or all targets');
  }
  const byConnectionId = Object.entries(project.config.connections).sort(([left], [right]) =>
    left.localeCompare(right),
  );
  const selected = args.all
    ? byConnectionId
    : byConnectionId.filter(([connectionId]) => connectionId === args.targetConnectionId);
  if (selected.length === 0) {
    throw new Error(
      args.all
        ? 'No configured connections are eligible for ingest'
        : `Connection "${args.targetConnectionId}" is not configured in ktx.yaml`,
    );
  }

  const warnings = createWarningAccumulator();
  const databaseTargets: KtxPublicIngestPlanTarget[] = [];
  const sourceTargets: KtxPublicIngestPlanTarget[] = [];
  for (const [connectionId, connection] of selected) {
    const target = targetForConnection(connectionId, connection, project.config, args, warnings);
    if (target.operation === 'database-ingest') {
      databaseTargets.push(target);
    } else if (target.operation === 'source-ingest') {
      sourceTargets.push(target);
    }
  }

  const targets = [...databaseTargets, ...sourceTargets];
  const notice = schemaFirstQueryHistoryNotice(targets, args);
  return {
    projectDir: args.projectDir,
    targets,
    warnings: finalizeWarnings(warnings, args),
    ...(notice ? { notices: [notice] } : {}),
  };
}
2026-05-10 23:51:24 +02:00
/**
 * Initial per-step results for a target: one entry per step name, `not-run` when
 * the target plans that step and `skipped` otherwise. Steps that belong to the
 * target's operation (schema and query history for databases, source ingest and
 * memory update for sources) carry the target's debug command.
 */
function defaultSteps(target: KtxPublicIngestPlanTarget): KtxPublicIngestTargetResult['steps'] {
  const stepOwners: ReadonlyArray<readonly [KtxPublicIngestStepName, KtxPublicIngestPlanTarget['operation']]> = [
    ['database-schema', 'database-ingest'],
    ['query-history', 'database-ingest'],
    ['source-ingest', 'source-ingest'],
    ['memory-update', 'source-ingest'],
  ];
  return stepOwners.map(
    ([operation, owner]): KtxPublicIngestTargetResult['steps'][number] => ({
      operation,
      status: target.steps.includes(operation) ? 'not-run' : 'skipped',
      ...(target.operation === owner ? { debugCommand: target.debugCommand } : {}),
    }),
  );
}
2026-05-14 01:43:06 +02:00
/**
 * Builds the `ktx ingest` command that retries a single target with the depth and
 * query-history settings the plan resolved for it.
 */
function retryCommandForTarget(
  target: KtxPublicIngestPlanTarget,
  args: Extract<KtxPublicIngestArgs, { command: 'run' }>,
): string {
  const words = ['ktx ingest', target.connectionId, '--project-dir', args.projectDir];
  if (target.databaseDepth) {
    words.push(`--${target.databaseDepth}`);
  }
  const queryHistory = target.queryHistory;
  if (queryHistory?.enabled === true) {
    words.push('--query-history');
    if (queryHistory.windowDays !== undefined) {
      words.push('--query-history-window-days', `${queryHistory.windowDays}`);
    }
  }
  return words.join(' ');
}
/** Drops a single trailing `.` from `value`, if present. */
function trimTrailingPeriod(value: string): string {
  if (!value.endsWith('.')) {
    return value;
  }
  return value.substring(0, value.length - 1);
}
/**
 * Formats a failure message for a target and appends a retry command.
 *
 * A detail that already names the connection (`"<id> ..."`) is used as-is. Any
 * other detail is prefixed with `"<id> failed: "`. With no detail, the failed
 * operation is named instead.
 */
function failureDetailWithRetry(input: {
  target: KtxPublicIngestPlanTarget;
  args: Extract<KtxPublicIngestArgs, { command: 'run' }>;
  failedOperation: KtxPublicIngestStepName;
  failureDetail?: string;
}): string {
  const { target, args, failedOperation } = input;
  const detail = input.failureDetail?.trim();
  let base: string;
  if (!detail) {
    base = `${target.connectionId} failed at ${failedOperation}.`;
  } else if (detail.startsWith(`${target.connectionId} `)) {
    base = detail;
  } else {
    base = `${target.connectionId} failed: ${detail}`;
  }
  return `${trimTrailingPeriod(base)}. Retry: ${retryCommandForTarget(target, args)}`;
}
/**
 * Builds the per-step result for a finished target.
 *
 * On success every planned step is `done`. On failure, planned steps before the
 * failing one are `done`, the failing step is `failed` with a retry hint, and later
 * planned steps are `not-run`. Unplanned steps keep their default (`skipped`) status.
 *
 * @param failedOperation - The failing step; defaults to `database-schema` for
 *   database targets and `source-ingest` otherwise.
 */
function markTargetResult(
  target: KtxPublicIngestPlanTarget,
  args: Extract<KtxPublicIngestArgs, { command: 'run' }>,
  status: 'done' | 'failed',
  failedOperation?: KtxPublicIngestStepName,
  failureDetail?: string,
): KtxPublicIngestTargetResult {
  type StepResult = KtxPublicIngestTargetResult['steps'][number];
  const failingStep =
    failedOperation ?? (target.operation === 'database-ingest' ? 'database-schema' : 'source-ingest');
  const failingPosition = target.steps.indexOf(failingStep);

  const resolveStep = (step: StepResult): StepResult => {
    const position = target.steps.indexOf(step.operation);
    if (position === -1) {
      return step;
    }
    if (status === 'done') {
      return { ...step, status: 'done' };
    }
    if (failingPosition >= 0 && position < failingPosition) {
      return { ...step, status: 'done' };
    }
    if (step.operation !== failingStep) {
      return { ...step, status: 'not-run' };
    }
    return {
      ...step,
      status: 'failed',
      detail: failureDetailWithRetry({ target, args, failedOperation: failingStep, failureDetail }),
    };
  };

  return {
    connectionId: target.connectionId,
    driver: target.driver,
    steps: defaultSteps(target).map((step) => resolveStep(step)),
  };
}
2026-05-10 23:51:24 +02:00
/** True when any step of the result failed. */
function resultFailed(result: KtxPublicIngestTargetResult): boolean {
  return !result.steps.every((step) => step.status !== 'failed');
}
2026-05-10 23:51:24 +02:00
/** Status of `operation` in the result, or `'not-run'` when the result has no such step. */
function stepStatus(result: KtxPublicIngestTargetResult, operation: KtxPublicIngestStepName): string {
  for (const step of result.steps) {
    if (step.operation === operation) {
      return step.status ?? 'not-run';
    }
  }
  return 'not-run';
}
2026-05-10 23:51:24 +02:00
/**
 * Prints a plain-text run summary: a headline, one row of step statuses per target,
 * then the failure detail of every target with a failed step.
 *
 * The header and the rows are formatted from the same column definitions so they
 * always line up. The Source column widens to fit the longest connection id
 * (minimum 14 characters) instead of letting long ids push a row out of alignment.
 */
function renderPlainResults(results: KtxPublicIngestTargetResult[], io: KtxCliIo): void {
  const failures = results.filter(resultFailed);
  io.stdout.write(failures.length > 0 ? 'Ingest finished with partial failures\n' : 'Ingest finished\n');
  io.stdout.write('\n');

  // Every column except the last is padded to its width; columns are separated by one space.
  const statusColumns: Array<{ title: string; operation: KtxPublicIngestStepName; width?: number }> = [
    { title: 'Database schema', operation: 'database-schema', width: 16 },
    { title: 'Query history', operation: 'query-history', width: 14 },
    { title: 'Source ingest', operation: 'source-ingest', width: 14 },
    { title: 'Memory update', operation: 'memory-update' },
  ];
  const sourceWidth = results.reduce((width, result) => Math.max(width, result.connectionId.length), 14);
  const formatRow = (source: string, cellFor: (column: (typeof statusColumns)[number]) => string): string => {
    const cells = statusColumns.map((column) => {
      const text = cellFor(column);
      return column.width === undefined ? text : text.padEnd(column.width);
    });
    return `${[source.padEnd(sourceWidth), ...cells].join(' ')}\n`;
  };

  io.stdout.write(formatRow('Source', (column) => column.title));
  for (const result of results) {
    io.stdout.write(formatRow(result.connectionId, (column) => stepStatus(result, column.operation)));
  }

  if (failures.length === 0) {
    return;
  }
  io.stdout.write('\nFailed sources:\n');
  for (const result of failures) {
    const failedStep = result.steps.find((step) => step.status === 'failed');
    if (failedStep) {
      io.stdout.write(`${failedStep.detail ?? `${result.connectionId} failed.`}\n`);
    }
  }
}
2026-05-10 23:51:24 +02:00
/** True when the IO exposes a TTY stdin that supports raw mode. */
function hasInteractiveInput(io: KtxCliIo): boolean {
  const { stdin } = io as { stdin?: { isTTY?: boolean; setRawMode?: (value: boolean) => void } };
  if (!stdin || stdin.isTTY !== true) {
    return false;
  }
  return typeof stdin.setRawMode === 'function';
}
2026-05-10 23:51:24 +02:00
/** Chooses `viz` output only for `auto` input mode with a TTY stdout and interactive stdin; otherwise `plain`. */
function sourceIngestOutputMode(args: Extract<KtxPublicIngestArgs, { command: 'run' }>, io: KtxCliIo): 'plain' | 'viz' {
  if (args.inputMode !== 'auto' || io.stdout.isTTY !== true) {
    return 'plain';
  }
  return hasInteractiveInput(io) ? 'viz' : 'plain';
}
2026-05-14 01:43:06 +02:00
/**
 * True when the interactive foreground view can be shown: `auto` input mode, no
 * JSON output, a TTY stdout and an interactive stdin.
 */
function shouldUseForegroundContextBuildView(
  args: Extract<KtxPublicIngestArgs, { command: 'run' }>,
  io: KtxCliIo,
): boolean {
  if (args.inputMode !== 'auto' || args.json === true) {
    return false;
  }
  return io.stdout.isTTY === true && hasInteractiveInput(io);
}
/**
 * CLI IO whose writes are buffered in memory instead of printed, so output from a
 * failed scan or ingest can be inspected afterwards.
 */
interface CapturedPublicIngestIo extends KtxCliIo {
// Returns the text written to this IO so far.
capturedOutput ( ) : string ;
}
/**
 * Creates an IO that appends stdout and stderr writes, in call order, to one
 * in-memory buffer. `stdout.isTTY` is `false`, so TTY checks treat it as
 * non-interactive.
 */
function createCapturedPublicIngestIo(): CapturedPublicIngestIo {
  let buffer = '';
  const append = (chunk: string): void => {
    buffer += chunk;
  };
  return {
    stdout: { isTTY: false, write: append },
    stderr: { write: append },
    capturedOutput: () => buffer,
  };
}
// Captured-output lines that report internal run status ("Report:", "Job:", "Tasks:", ...).
// They are dropped before a failure message is chosen from the output.
const INTERNAL_STATUS_LINE_RE =
/^(Report|Run|Job|Status|Adapter|Connection|Sync|Diff|Tasks|Work units|Failed tasks|Saved memory|Provenance rows):\s*/ ;
2026-05-17 01:04:44 +02:00
// Line prefixes treated as the actionable failure message when scanning captured output;
// the first matching line is preferred over other output.
const ACTIONABLE_FAILURE_LINE_RE =
/^(Missing bundled Python runtime manifest|KTX Python runtime is required|KTX managed daemon|Error:|Failed\b|Could not\b|Cannot\b)/ ;
// Generic runtime retry hint; filtered out, presumably because public ingest appends its own "Retry:" command.
const RUNTIME_BACKED_RETRY_LINE_RE = /^Then retry the runtime-backed KTX command\.?$/ ;
2026-05-14 01:43:06 +02:00
2026-05-17 10:27:29 +02:00
/** Removes a leading `Error:` label and the whitespace after it; other lines are returned unchanged. */
function trimErrorPrefix(line: string): string {
  const label = 'Error:';
  return line.startsWith(label) ? line.slice(label.length).trimStart() : line;
}
2026-05-17 01:04:44 +02:00
/**
 * Picks the message to show for a failed scan or ingest whose output was captured.
 *
 * Blank lines, the "KTX scan completed" banner and internal status lines are
 * dropped, and the rest is mapped through `publicIngestOutputLine`. The first
 * actionable line wins; otherwise the first remaining line is used. In both cases a
 * leading `Error:` is removed. For a missing runtime manifest, the following
 * source-checkout build hint line is kept as well, one per line.
 *
 * @returns The message, or `undefined` when nothing usable was captured.
 */
function capturedFailureMessage(output: string): string | undefined {
  const visibleLines = output
    .split(/\r?\n/)
    .map((line) => line.trim())
    .filter(
      (line) => line.length > 0 && !line.startsWith('KTX scan completed') && !INTERNAL_STATUS_LINE_RE.test(line),
    )
    .map(publicIngestOutputLine);

  const actionableIndex = visibleLines.findIndex((line) => ACTIONABLE_FAILURE_LINE_RE.test(line));
  if (actionableIndex === -1) {
    const fallback = visibleLines.find((line) => line.length > 0);
    return fallback ? trimErrorPrefix(fallback) : undefined;
  }

  const headline = visibleLines[actionableIndex] ?? '';
  if (!headline.startsWith('Missing bundled Python runtime manifest')) {
    return trimErrorPrefix(headline);
  }
  const buildHints = visibleLines
    .slice(actionableIndex + 1)
    .filter((line) => !RUNTIME_BACKED_RETRY_LINE_RE.test(line) && !/\bRetry:\s/.test(line))
    .filter((line) => line.startsWith('In a source checkout, build the local runtime assets with:'));
  return [headline, ...buildHints].join('\n');
}
2026-05-10 23:12:26 +02:00
export async function executePublicIngestTarget (
2026-05-10 23:51:24 +02:00
target : KtxPublicIngestPlanTarget ,
args : Extract < KtxPublicIngestArgs , { command : 'run' } > ,
io : KtxCliIo ,
deps : KtxPublicIngestDeps ,
) : Promise < KtxPublicIngestTargetResult > {
2026-05-14 01:43:06 +02:00
if ( target . preflightFailure ) {
if ( target . operation === 'database-ingest' ) {
deps . onPhaseEnd ? . ( 'database-schema' , 'failed' , target . preflightFailure ) ;
if ( target . queryHistory ? . enabled === true ) {
deps . onPhaseEnd ? . ( 'query-history' , 'skipped' ) ;
}
} else {
deps . onPhaseEnd ? . ( 'source-ingest' , 'failed' , target . preflightFailure ) ;
}
return {
connectionId : target.connectionId ,
driver : target.driver ,
steps : defaultSteps ( target ) . map ( ( step ) = >
step . operation === 'database-schema'
? {
. . . step ,
status : 'failed' ,
detail : target.preflightFailure ,
}
: step ,
) ,
} ;
}
if ( target . operation === 'database-ingest' ) {
2026-05-10 23:51:24 +02:00
const { runKtxScan } = await import ( './scan.js' ) ;
2026-05-13 17:01:48 +02:00
const scanArgs : KtxScanArgs = {
command : 'run' ,
projectDir : args.projectDir ,
connectionId : target.connectionId ,
2026-05-14 01:43:06 +02:00
mode : target.databaseDepth === 'deep' ? 'enriched' : 'structural' ,
detectRelationships : target.detectRelationships === true ,
2026-05-13 17:01:48 +02:00
dryRun : false ,
2026-05-14 01:43:06 +02:00
. . . ( args . cliVersion ? { cliVersion : args.cliVersion } : { } ) ,
. . . ( args . runtimeInstallPolicy ? { runtimeInstallPolicy : args.runtimeInstallPolicy } : { } ) ,
2026-05-13 17:01:48 +02:00
} ;
const runScan = deps . runScan ? ? runKtxScan ;
2026-05-14 01:43:06 +02:00
const capturedScanIo = deps . scanProgress ? null : createCapturedPublicIngestIo ( ) ;
const scanIo = capturedScanIo ? ? io ;
2026-05-16 11:39:43 +02:00
const scanDeps = {
. . . ( deps . scanProgress ? { progress : deps.scanProgress } : { } ) ,
. . . ( deps . runtimeIo ? { runtimeIo : deps.runtimeIo } : { } ) ,
} ;
2026-05-14 01:43:06 +02:00
deps . onPhaseStart ? . ( 'database-schema' ) ;
2026-05-16 11:39:43 +02:00
const scanExitCode =
Object . keys ( scanDeps ) . length > 0 ? await runScan ( scanArgs , scanIo , scanDeps ) : await runScan ( scanArgs , scanIo ) ;
2026-05-14 01:43:06 +02:00
if ( scanExitCode !== 0 ) {
deps . onPhaseEnd ? . ( 'database-schema' , 'failed' ) ;
if ( target . queryHistory ? . enabled === true ) {
deps . onPhaseEnd ? . ( 'query-history' , 'skipped' ) ;
}
return markTargetResult (
target ,
args ,
'failed' ,
'database-schema' ,
2026-05-17 01:04:44 +02:00
capturedScanIo ? capturedFailureMessage ( capturedScanIo . capturedOutput ( ) ) : undefined ,
2026-05-14 01:43:06 +02:00
) ;
}
deps . onPhaseEnd ? . ( 'database-schema' , 'done' ) ;
if ( target . queryHistory ? . enabled === true ) {
const { runKtxIngest } = await import ( './ingest.js' ) ;
const runIngest = deps . runIngest ? ? runKtxIngest ;
const ingestArgs : KtxIngestArgs = {
command : 'run' ,
projectDir : args.projectDir ,
connectionId : target.connectionId ,
adapter : 'historic-sql' ,
outputMode : sourceIngestOutputMode ( args , io ) ,
inputMode : args.inputMode ,
. . . ( args . cliVersion ? { cliVersion : args.cliVersion } : { } ) ,
. . . ( args . runtimeInstallPolicy ? { runtimeInstallPolicy : args.runtimeInstallPolicy } : { } ) ,
allowImplicitAdapter : true ,
historicSqlPullConfigOverride :
target . queryHistory . pullConfig ? ? {
dialect : target.queryHistory.dialect ,
. . . ( target . queryHistory . windowDays !== undefined ? { windowDays : target.queryHistory.windowDays } : { } ) ,
} ,
} ;
const capturedIngestIo = deps . ingestProgress ? null : createCapturedPublicIngestIo ( ) ;
const ingestIo = capturedIngestIo ? ? io ;
2026-05-16 11:39:43 +02:00
const ingestDeps = {
. . . ( deps . ingestProgress ? { progress : deps.ingestProgress } : { } ) ,
. . . ( deps . runtimeIo ? { runtimeIo : deps.runtimeIo } : { } ) ,
} ;
2026-05-14 01:43:06 +02:00
deps . onPhaseStart ? . ( 'query-history' ) ;
2026-05-16 11:39:43 +02:00
const qhExitCode =
Object . keys ( ingestDeps ) . length > 0
? await runIngest ( ingestArgs , ingestIo , ingestDeps )
: await runIngest ( ingestArgs , ingestIo ) ;
2026-05-14 01:43:06 +02:00
if ( qhExitCode !== 0 ) {
deps . onPhaseEnd ? . ( 'query-history' , 'failed' ) ;
return markTargetResult (
target ,
args ,
'failed' ,
'query-history' ,
2026-05-17 01:04:44 +02:00
capturedIngestIo ? capturedFailureMessage ( capturedIngestIo . capturedOutput ( ) ) : undefined ,
2026-05-14 01:43:06 +02:00
) ;
}
deps . onPhaseEnd ? . ( 'query-history' , 'done' ) ;
}
return markTargetResult ( target , args , 'done' ) ;
2026-05-10 23:12:26 +02:00
}
2026-05-10 23:51:24 +02:00
const { runKtxIngest } = await import ( './ingest.js' ) ;
2026-05-13 17:01:48 +02:00
const ingestArgs : KtxIngestArgs = {
command : 'run' ,
projectDir : args.projectDir ,
connectionId : target.connectionId ,
adapter : target.adapter ? ? target . driver ,
. . . ( target . sourceDir ? { sourceDir : target.sourceDir } : { } ) ,
outputMode : sourceIngestOutputMode ( args , io ) ,
inputMode : args.inputMode ,
2026-05-14 01:43:06 +02:00
. . . ( args . cliVersion ? { cliVersion : args.cliVersion } : { } ) ,
. . . ( args . runtimeInstallPolicy ? { runtimeInstallPolicy : args.runtimeInstallPolicy } : { } ) ,
allowImplicitAdapter : true ,
2026-05-13 17:01:48 +02:00
} ;
const runIngest = deps . runIngest ? ? runKtxIngest ;
2026-05-14 01:43:06 +02:00
const capturedIngestIo = deps . ingestProgress ? null : createCapturedPublicIngestIo ( ) ;
const ingestIo = capturedIngestIo ? ? io ;
2026-05-16 11:39:43 +02:00
const ingestDeps = {
. . . ( deps . ingestProgress ? { progress : deps.ingestProgress } : { } ) ,
. . . ( deps . runtimeIo ? { runtimeIo : deps.runtimeIo } : { } ) ,
} ;
2026-05-14 01:43:06 +02:00
deps . onPhaseStart ? . ( 'source-ingest' ) ;
2026-05-16 11:39:43 +02:00
const exitCode =
Object . keys ( ingestDeps ) . length > 0
? await runIngest ( ingestArgs , ingestIo , ingestDeps )
: await runIngest ( ingestArgs , ingestIo ) ;
2026-05-14 01:43:06 +02:00
deps . onPhaseEnd ? . ( 'source-ingest' , exitCode === 0 ? 'done' : 'failed' ) ;
return markTargetResult (
target ,
args ,
exitCode === 0 ? 'done' : 'failed' ,
'source-ingest' ,
2026-05-17 01:04:44 +02:00
capturedIngestIo ? capturedFailureMessage ( capturedIngestIo . capturedOutput ( ) ) : undefined ,
2026-05-14 01:43:06 +02:00
) ;
2026-05-10 23:12:26 +02:00
}
2026-05-10 23:51:24 +02:00
/**
 * Entry point for the public `ingest run` command.
 *
 * Loads the project and builds the ingest plan, then follows one of two paths:
 * - Foreground context-build view (when `shouldUseForegroundContextBuildView` returns true):
 *   every managed Python runtime feature that the plan requires is ensured first. Control then
 *   passes to the context-build view, and its exit code is returned.
 * - Otherwise: plan notices (stdout) and warnings (stderr) are printed unless `--json` is set.
 *   Each planned target is then executed one after another. Results are reported either as
 *   pretty-printed JSON (`{ plan, results }`) or through `renderPlainResults`.
 *
 * @param args Parsed arguments for the public ingest command.
 * @param io CLI stdout/stderr sinks.
 * @param deps Optional injected collaborators (project loader, env, runtime installer,
 *   context-build runner); also forwarded to `executePublicIngestTarget`.
 * @returns `1` when runtime preparation throws; otherwise the context-build exit code on the
 *   foreground path; otherwise `1` if any target result is failed per `resultFailed`, else `0`.
 */
export async function runKtxPublicIngest(
  args: KtxPublicIngestArgs,
  io: KtxCliIo,
  deps: KtxPublicIngestDeps = {},
): Promise<number> {
  const loadProject = deps.loadProject ?? loadKtxProject;
  const project = await loadProject({ projectDir: args.projectDir });

  // Decide the presentation mode first, then build the plan that both modes consume.
  const useForegroundView = shouldUseForegroundContextBuildView(args, io);
  const plan = buildPublicIngestPlan(project, args);

  if (useForegroundView) {
    const requirements = resolvePublicIngestRuntimeRequirements(plan, { env: deps.env ?? process.env });
    const ensureRuntime = deps.ensureRuntime ?? ensureManagedPythonCommandRuntime;
    const cliVersion = args.cliVersion ?? '0.0.0-private';
    const installPolicy = args.runtimeInstallPolicy ?? 'prompt';

    // Prepare each required runtime feature before starting the view; the first failure aborts.
    for (const feature of requirements.features) {
      try {
        await ensureRuntime({ cliVersion, installPolicy, io, feature });
      } catch (error) {
        const reason = error instanceof Error ? error.message : String(error);
        io.stderr.write(`${reason}\n`);
        return 1;
      }
    }

    // Loaded lazily so the view module is only imported on this path.
    const { runContextBuild } = await import('./context-build-view.js');
    const contextBuild = deps.runContextBuild ?? runContextBuild;
    const { exitCode } = await contextBuild(
      project,
      {
        projectDir: args.projectDir,
        ...(args.targetConnectionId ? { targetConnectionId: args.targetConnectionId } : {}),
        all: args.all,
        entrypoint: 'ingest',
        inputMode: args.inputMode,
        ...(args.depth ? { depth: args.depth } : {}),
        ...(args.queryHistory ? { queryHistory: args.queryHistory } : {}),
        ...(args.queryHistoryWindowDays !== undefined ? { queryHistoryWindowDays: args.queryHistoryWindowDays } : {}),
        ...(args.scanMode ? { scanMode: args.scanMode } : {}),
        ...(args.detectRelationships !== undefined ? { detectRelationships: args.detectRelationships } : {}),
        ...(args.cliVersion ? { cliVersion: args.cliVersion } : {}),
        ...(args.runtimeInstallPolicy ? { runtimeInstallPolicy: args.runtimeInstallPolicy } : {}),
      },
      io,
    );
    return exitCode;
  }

  // Human-readable preamble is suppressed in JSON mode so stdout stays machine-parseable.
  if (!args.json) {
    for (const notice of plan.notices ?? []) {
      io.stdout.write(`${notice}\n`);
    }
    for (const warning of plan.warnings) {
      io.stderr.write(`Warning: ${warning}\n`);
    }
  }

  const results: KtxPublicIngestTargetResult[] = [];
  // Targets run one at a time, in plan order.
  for (const target of plan.targets) {
    const outcome = await executePublicIngestTarget(target, args, io, deps);
    results.push(outcome);
  }

  if (!args.json) {
    renderPlainResults(results, io);
  } else {
    io.stdout.write(`${JSON.stringify({ plan, results }, null, 2)}\n`);
  }

  return results.some(resultFailed) ? 1 : 0;
}