feat: merge ingest and scan

* docs: add CLI component reuse guidance

* docs: add unified ingest ux design

* Refine unified ingest UX design after adversarial review iteration 1

* Refine unified ingest UX design after adversarial review iteration 2

* Refine unified ingest UX design after adversarial review iteration 3

* feat(cli): route public connection ingest command

* feat(cli): hide standalone scan from public help

* feat(cli): plan public ingest depth and query history

* feat(cli): execute public database ingest facets

* feat(ingest): read connection query history config

* fix(cli): use public ingest wording

* fix(config): stop generating ingest adapter allow lists

* docs: document public ingest command

* test: align ingest surface expectations

* docs: add unified ingest public CLI surface plan

* feat(cli): preflight deep public ingest readiness

* feat(setup): store query history in connection context

* feat(setup): store database context depth

* feat(setup): verify context readiness by database depth

* fix(setup): keep context build foreground only

* fix(config): reject reserved ingest connection ids

* test: close unified ingest v1 expectations

* docs: add unified ingest v1 closure plan

* fix(ingest): bypass adapter allow-list for public source ingest

* fix(ingest): honor query history window intent

* fix(ingest): hide scan internals from public database ingest

* feat(ingest): use foreground view for interactive public ingest

* fix(setup): use schema context and query history wording

* test(cli): verify unified ingest public output

* docs: add unified ingest v1 public output closure plan

* fix(setup): forward query history flags

* fix(setup): prompt for postgres query history

* fix(status): report query history readiness

* fix(ingest): remove legacy public guidance

* fix(ingest): polish foreground retry copy

* docs(examples): use unified query history wording

* chore(ingest): finish public query history cleanup

* docs: add unified ingest v1 query history status cleanup plan

* test(docs): cover unified ingest public docs

* docs: align ingest CLI reference with unified UX

* docs: update context build guides for unified ingest

* docs: update setup and primary source ingest wording

* docs: stop advertising adapter-backed example ingest

* docs: close unified ingest public docs gaps

* docs: add unified ingest v1 docs site closure plan

* fix: render unified ingest foreground warnings

* fix: explain query history schema order

* fix: add public ingest retry guidance

* fix: align setup next steps with unified ingest

* fix: remove scan wording from demo progress

* test: verify unified ingest ux closure

* docs: add unified ingest v1 foreground and retry closure plan

* fix(cli): preserve query-history pull config in public ingest

* fix(cli): omit hidden commands from docs command tree

* test(cli): close unified ingest final public surface checks

* docs: add unified ingest v1 final public surface closure plan

* fix(cli): use public source labels in ingest reports

* fix(cli): suppress low-level public ingest output

* test(cli): verify unified ingest public plain output

* docs: add unified ingest v1 public plain output closure plan

* fix(cli): add public ingest copy sanitizers

* fix(cli): sanitize public ingest progress copy

* fix(cli): rename setup schema scope prompt

* docs(plan): add progress copy closure; test: align setup back-nav fixture

Adds the iter9 plan and updates the setup back-navigation test fixture
to pass disableQueryHistory plus listSchemas/listTables stubs that the
unified ingest setup step now requires.

* docs(plan): add final ux labels plan with narrowed label scans

* fix(cli): aggregate unsupported query-history warnings

* fix(cli): align setup database labels

* test(cli): fix setup database test type-check

* fix(cli): remove primary-source wording from setup output

* test(cli): verify unified ingest setup closure

* docs(plan): add unified ingest v1 verification copy closure plan

* fix(cli): remove top-level scan command

* fix(cli): remove legacy ingest and wiki commands

* Merge scan into ingest flow

* feat(cli): split ingest progress into per-phase rows, rename work units to tasks

Each database target in the unified ingest dashboard now renders one row per
real subprocess (Schema, then Query history when enabled) instead of a single
combined bar. Each phase has its own monotonic 0-100% bar so the progress
never snaps back to zero when historic-sql starts after scan completes.
Completed phases keep their final bar, summary, and elapsed time visible as
an inline audit trail; queued and skipped phases are shown explicitly.

Also rename user-facing "work units" / "Failed work units" to "tasks" /
"Failed tasks" in ingest output and parseIngestSummary. The parser still
accepts the legacy "Work units:" wording in captured output for backward
compat. Internal memory-flow event names and type fields are left alone.

* Fix test harness failures

* Fix CI smoke checks

---------

Co-authored-by: Andrey Avtomonov <7889985+andreybavt@users.noreply.github.com>
This commit is contained in:
Andrey Avtomonov 2026-05-14 01:43:06 +02:00 committed by GitHub
parent 1a472cf3ed
commit b00c1a11a9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
118 changed files with 16890 additions and 2992 deletions

View file

@ -52,11 +52,8 @@ describe('AgentRunnerService.runLoop', () => {
telemetryTags: { source: 'test' },
});
const call = (generateText as any).mock.calls[0][0];
expect(call.messages).toEqual([
{ role: 'system', content: 'SYS' },
{ role: 'user', content: 'USR' },
]);
expect(call.system).toBeUndefined();
expect(call.system).toEqual({ role: 'system', content: 'SYS' });
expect(call.messages).toEqual([{ role: 'user', content: 'USR' }]);
expect(call.prompt).toBeUndefined();
expect(call.tools).toEqual(tools);
expect(call.stopWhen).toBe(17);
@ -81,10 +78,8 @@ describe('AgentRunnerService.runLoop', () => {
expect(llmProvider.getModel).toHaveBeenCalledWith('candidateExtraction');
expect(generateText).toHaveBeenCalledWith(
expect.objectContaining({
messages: [
{ role: 'system', content: 'system' },
{ role: 'user', content: 'user' },
],
system: { role: 'system', content: 'system' },
messages: [{ role: 'user', content: 'user' }],
}),
);
});

View file

@ -36,6 +36,14 @@ export interface AgentRunnerServiceDeps {
logger?: KtxLogger;
}
function splitSystemPromptMessages(messages: ReturnType<KtxMessageBuilder['wrapSimple']>['messages']) {
const systemMessages = messages.filter((message) => message.role === 'system');
return {
system: systemMessages.length === 0 ? undefined : systemMessages.length === 1 ? systemMessages[0] : systemMessages,
messages: messages.filter((message) => message.role !== 'system'),
};
}
export class AgentRunnerService {
private readonly logger: KtxLogger;
@ -54,6 +62,7 @@ export class AgentRunnerService {
tools: params.toolSet,
model,
});
const promptMessages = splitSystemPromptMessages(built.messages);
await this.deps.debugRequestRecorder?.record(
summarizeKtxLlmDebugRequest({
@ -76,7 +85,8 @@ export class AgentRunnerService {
experimental_repairToolCall: this.deps.llmProvider.repairToolCallHandler({
source: params.telemetryTags.operationName ?? 'ktx-agent-runner',
}),
messages: built.messages,
...(promptMessages.system ? { system: promptMessages.system } : {}),
messages: promptMessages.messages,
tools: built.tools as Record<string, Tool>,
onStepFinish: async () => {
stepIndex += 1;

View file

@ -103,7 +103,7 @@ describe('BigQueryHistoricSqlQueryHistoryReader', () => {
for await (const row of reader.fetchAggregated(
client,
{ start: new Date('2026-02-10T00:00:00.000Z'), end: new Date('2026-05-11T00:00:00.000Z') },
{ dialect: 'bigquery', minExecutions: 5, windowDays: 90, filters: { dropTrivialProbes: true }, redactionPatterns: [], staleArchiveAfterDays: 90 },
{ dialect: 'bigquery', minExecutions: 5, windowDays: 90, enabledTables: [], filters: { dropTrivialProbes: true }, redactionPatterns: [], staleArchiveAfterDays: 90 },
)) {
rows.push(row);
}
@ -136,6 +136,7 @@ describe('BigQueryHistoricSqlQueryHistoryReader', () => {
dialect: 'bigquery',
minExecutions: 5,
windowDays: 90,
enabledTables: [],
filters: { dropTrivialProbes: true },
redactionPatterns: [],
staleArchiveAfterDays: 90,

View file

@ -215,7 +215,7 @@ describe('PostgresPgssReader aggregate path', () => {
for await (const row of reader.fetchAggregated(
{ executeQuery },
{ start: new Date('2026-02-10T00:00:00.000Z'), end: new Date('2026-05-11T00:00:00.000Z') },
{ dialect: 'postgres', minExecutions: 5, filters: { dropTrivialProbes: true }, redactionPatterns: [], staleArchiveAfterDays: 90 },
{ dialect: 'postgres', minExecutions: 5, enabledTables: [], filters: { dropTrivialProbes: true }, redactionPatterns: [], staleArchiveAfterDays: 90 },
)) {
rows.push(row);
}

View file

@ -102,7 +102,7 @@ describe('SnowflakeHistoricSqlQueryHistoryReader', () => {
for await (const row of reader.fetchAggregated(
client,
{ start: new Date('2026-02-10T00:00:00.000Z'), end: new Date('2026-05-11T00:00:00.000Z') },
{ dialect: 'snowflake', minExecutions: 5, windowDays: 90, filters: { dropTrivialProbes: true }, redactionPatterns: [], staleArchiveAfterDays: 90 },
{ dialect: 'snowflake', minExecutions: 5, windowDays: 90, enabledTables: [], filters: { dropTrivialProbes: true }, redactionPatterns: [], staleArchiveAfterDays: 90 },
)) {
rows.push(row);
}
@ -135,6 +135,7 @@ describe('SnowflakeHistoricSqlQueryHistoryReader', () => {
dialect: 'snowflake',
minExecutions: 5,
windowDays: 90,
enabledTables: [],
filters: { dropTrivialProbes: true },
redactionPatterns: [],
staleArchiveAfterDays: 90,

View file

@ -237,6 +237,80 @@ describe('stageHistoricSqlAggregatedSnapshot', () => {
expect(patternsJson).toContain('[REDACTED]');
});
it('limits staged table artifacts to configured enabled tables', async () => {
const stagedDir = await tempDir();
const reader: HistoricSqlReader = {
async probe() {
return { warnings: [], info: [] };
},
async *fetchAggregated() {
yield aggregate({
templateId: 'selected-qualified',
canonicalSql: 'select count(*) from orbit_analytics.int_active_contract_arr',
});
yield aggregate({
templateId: 'selected-unqualified',
canonicalSql: 'select count(*) from int_customer_health_signals',
});
yield aggregate({
templateId: 'unselected',
canonicalSql: 'select count(*) from orbit_raw.accounts',
});
},
};
const sqlAnalysis: SqlAnalysisPort = {
analyzeForFingerprint: vi.fn(),
analyzeBatch: vi.fn(async () => new Map([
[
'selected-qualified',
{
tablesTouched: ['orbit_analytics.int_active_contract_arr'],
columnsByClause: { select: [], where: [], join: [], groupBy: [] },
},
],
[
'selected-unqualified',
{
tablesTouched: ['int_customer_health_signals'],
columnsByClause: { select: [], where: [], join: [], groupBy: [] },
},
],
[
'unselected',
{
tablesTouched: ['orbit_raw.accounts'],
columnsByClause: { select: [], where: [], join: [], groupBy: [] },
},
],
])),
};
await stageHistoricSqlAggregatedSnapshot({
stagedDir,
connectionId: 'warehouse',
queryClient: {},
reader,
sqlAnalysis,
pullConfig: {
dialect: 'postgres',
enabledTables: [
'orbit_analytics.int_active_contract_arr',
'orbit_analytics.int_customer_health_signals',
],
},
now: new Date('2026-05-11T12:00:00.000Z'),
});
expect(await readdir(join(stagedDir, 'tables'))).toEqual([
'int_customer_health_signals.json',
'orbit_analytics.int_active_contract_arr.json',
]);
const manifest = await readJson<Record<string, any>>(stagedDir, 'manifest.json');
expect(manifest.touchedTableCount).toBe(2);
const patterns = await readJson<Record<string, any>>(stagedDir, 'patterns-input.json');
expect(patterns.templates.map((entry: any) => entry.id)).toEqual(['selected-qualified', 'selected-unqualified']);
});
it('preserves full patterns audit input and writes bounded cross-table pattern shards', async () => {
const stagedDir = await tempDir();
const largeSql = `select * from public.orders o join public.customers c on c.id = o.customer_id where payload = '${'x'.repeat(8000)}'`;

View file

@ -39,9 +39,15 @@ interface StageHistoricSqlAggregatedSnapshotInput {
interface ParsedTemplate {
template: AggregatedTemplate;
tablesTouched: string[];
includedTables: string[];
columnsByClause: Record<string, string[]>;
}
interface EnabledTableFilter {
exact: Set<string>;
uniqueUnqualified: Set<string>;
}
interface TableAccumulator {
table: string;
executions: number;
@ -103,6 +109,45 @@ function shouldDropTemplate(template: AggregatedTemplate, config: HistoricSqlUni
return false;
}
function normalizeTableIdentifier(value: string): string {
return value.trim().toLowerCase();
}
function unqualifiedTableIdentifier(value: string): string {
const parts = normalizeTableIdentifier(value).split('.').filter(Boolean);
return parts.at(-1) ?? '';
}
function buildEnabledTableFilter(enabledTables: string[]): EnabledTableFilter | null {
if (enabledTables.length === 0) {
return null;
}
const exact = new Set(enabledTables.map(normalizeTableIdentifier).filter((value) => value.length > 0));
const unqualifiedCounts = new Map<string, number>();
for (const table of exact) {
const unqualified = unqualifiedTableIdentifier(table);
if (unqualified.length > 0) {
unqualifiedCounts.set(unqualified, (unqualifiedCounts.get(unqualified) ?? 0) + 1);
}
}
return {
exact,
uniqueUnqualified: new Set(
[...unqualifiedCounts.entries()]
.filter(([, count]) => count === 1)
.map(([table]) => table),
),
};
}
function isEnabledTable(table: string, filter: EnabledTableFilter | null): boolean {
if (!filter) {
return true;
}
const normalized = normalizeTableIdentifier(table);
return filter.exact.has(normalized) || filter.uniqueUnqualified.has(unqualifiedTableIdentifier(normalized));
}
function historicSqlWindowDays(config: HistoricSqlUnifiedPullConfig): number {
return 'windowDays' in config ? config.windowDays : 90;
}
@ -235,6 +280,7 @@ function toPatternsInput(parsedTemplates: ParsedTemplate[]): StagedPatternsInput
export async function stageHistoricSqlAggregatedSnapshot(input: StageHistoricSqlAggregatedSnapshotInput): Promise<void> {
const config = historicSqlUnifiedPullConfigSchema.parse(input.pullConfig);
const enabledTableFilter = buildEnabledTableFilter(config.enabledTables);
const redactors = compileHistoricSqlRedactionPatterns(config.redactionPatterns);
const now = input.now ?? new Date();
const windowStart = new Date(now.getTime() - historicSqlWindowDays(config) * 24 * 60 * 60 * 1000);
@ -263,12 +309,14 @@ export async function stageHistoricSqlAggregatedSnapshot(input: StageHistoricSql
continue;
}
const tablesTouched = [...new Set(parsed.tablesTouched)].filter((table) => table.length > 0).sort();
if (tablesTouched.length === 0) {
const includedTables = tablesTouched.filter((table) => isEnabledTable(table, enabledTableFilter));
if (includedTables.length === 0) {
continue;
}
parsedTemplates.push({
template: redactTemplateSql(template, redactors),
tablesTouched,
includedTables,
columnsByClause: Object.fromEntries(
Object.entries(parsed.columnsByClause).map(([clause, columns]) => [clause, [...new Set(columns)].sort()]),
),
@ -277,7 +325,7 @@ export async function stageHistoricSqlAggregatedSnapshot(input: StageHistoricSql
const byTable = new Map<string, TableAccumulator>();
for (const parsed of parsedTemplates) {
for (const table of parsed.tablesTouched) {
for (const table of parsed.includedTables) {
const acc = byTable.get(table) ?? accumulatorFor(table);
addTemplate(acc, parsed);
byTable.set(table, acc);

View file

@ -10,6 +10,7 @@ const filterModeSchema = z.enum(['exclude', 'include', 'mark-only']);
const historicSqlCommonPullConfigSchema = z.object({
minExecutions: z.number().int().nonnegative().default(5),
enabledTables: z.array(z.string().min(1)).default([]),
filters: z.object({
serviceAccounts: z.object({
patterns: z.array(z.string()).default([]),

View file

@ -194,6 +194,7 @@ describe('local ingest adapters', () => {
await expect(localPullConfigForAdapter(postgresProject, historicSql!, 'warehouse')).resolves.toEqual({
dialect: 'postgres',
minExecutions: 7,
enabledTables: [],
filters: {
serviceAccounts: { patterns: ['^svc_'], mode: 'exclude' },
dropTrivialProbes: true,
@ -203,6 +204,45 @@ describe('local ingest adapters', () => {
});
});
it('maps connection context.queryHistory to historic-sql pull config', async () => {
const project = projectWithConnections({
warehouse: {
driver: 'postgres',
context: {
queryHistory: {
enabled: true,
windowDays: 45,
minExecutions: 7,
filters: { dropTrivialProbes: true },
},
},
},
});
const adapter = { source: 'historic-sql' } as never;
await expect(localPullConfigForAdapter(project, adapter, 'warehouse')).resolves.toMatchObject({
dialect: 'postgres',
minExecutions: 7,
filters: { dropTrivialProbes: true },
});
});
it('prefers context.queryHistory over legacy historicSql', async () => {
const project = projectWithConnections({
warehouse: {
driver: 'postgres',
historicSql: { enabled: true, dialect: 'postgres', windowDays: 90 },
context: { queryHistory: { enabled: true, windowDays: 30 } },
},
});
const adapter = { source: 'historic-sql' } as never;
await expect(localPullConfigForAdapter(project, adapter, 'warehouse')).resolves.toMatchObject({
dialect: 'postgres',
minExecutions: 5,
});
});
it('rejects local historic-sql pulls when the connection has not enabled historic SQL', async () => {
const historicSql = createDefaultLocalIngestAdapters(project, {
historicSql: {
@ -234,7 +274,7 @@ describe('local ingest adapters', () => {
});
await expect(localPullConfigForAdapter(postgresProject, historicSql!, 'warehouse')).rejects.toThrow(
'Connection "warehouse" does not have historicSql.enabled: true',
'Connection "warehouse" does not have context.queryHistory.enabled: true',
);
});

View file

@ -53,6 +53,7 @@ export interface DefaultLocalIngestAdaptersOptions {
postgresQueryClient?: KtxPostgresQueryClient;
now?: () => Date;
};
historicSqlPullConfigOverride?: Record<string, unknown>;
looker?: {
daemonBaseUrl?: string;
client?: Pick<LookerMappingClient, 'listLookmlModels' | 'getExplore'>;
@ -163,6 +164,28 @@ function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === 'object' && value !== null && !Array.isArray(value);
}
const historicSqlDialectByDriver = new Map<string, 'postgres' | 'bigquery' | 'snowflake'>([
['postgres', 'postgres'],
['postgresql', 'postgres'],
['bigquery', 'bigquery'],
['snowflake', 'snowflake'],
]);
function queryHistoryRecord(connection: unknown): Record<string, unknown> | null {
if (!isRecord(connection)) return null;
const context = isRecord(connection.context) ? connection.context : null;
const queryHistory = isRecord(context?.queryHistory) ? context.queryHistory : null;
return queryHistory;
}
function queryHistoryPullConfig(connection: unknown): Record<string, unknown> | null {
const queryHistory = queryHistoryRecord(connection);
if (queryHistory?.enabled !== true || !isRecord(connection)) return null;
const dialect = historicSqlDialectByDriver.get(String(connection.driver ?? '').toLowerCase());
if (!dialect) return null;
return { ...queryHistory, dialect };
}
function stringField(value: unknown): string | null {
return typeof value === 'string' && value.trim().length > 0 ? value.trim() : null;
}
@ -213,14 +236,21 @@ export async function localPullConfigForAdapter(
): Promise<unknown> {
if (adapter.source === 'metabase') {
throw new Error(
'Metabase scheduled pulls fan out by mapping. Call runLocalMetabaseIngest() or use `ktx ingest run --adapter metabase --connection-id <metabase-source-id>` from the CLI.',
'Metabase scheduled pulls fan out by mapping. Call runLocalMetabaseIngest() or use `ktx ingest <metabase-source-id>` from the CLI.',
);
}
const connection = project.config.connections[connectionId];
if (adapter.source === HISTORIC_SQL_SOURCE_KEY) {
if (options.historicSqlPullConfigOverride) {
return historicSqlUnifiedPullConfigSchema.parse(options.historicSqlPullConfigOverride);
}
const queryHistory = queryHistoryPullConfig(connection);
if (queryHistory) {
return historicSqlUnifiedPullConfigSchema.parse(queryHistory);
}
const historicSql = isRecord(connection?.historicSql) ? connection.historicSql : null;
if (historicSql?.enabled !== true) {
throw new Error(`Connection "${connectionId}" does not have historicSql.enabled: true`);
throw new Error(`Connection "${connectionId}" does not have context.queryHistory.enabled: true`);
}
return historicSqlUnifiedPullConfigSchema.parse({
...historicSql,

View file

@ -56,7 +56,7 @@ describe('createLocalBundleIngestRuntime', () => {
}),
).toThrow(
[
'ktx ingest run requires llm.provider.backend: anthropic, vertex, or gateway, or an injected agentRunner.',
'ktx ingest requires llm.provider.backend: anthropic, vertex, or gateway, or an injected agentRunner.',
`Configure an Anthropic provider, then rerun ingest:`,
` ktx setup --project-dir ${project.projectDir} --anthropic-api-key-env ANTHROPIC_API_KEY --anthropic-model claude-sonnet-4-6 --no-input`,
].join('\n'),

View file

@ -571,7 +571,7 @@ function nextLocalJobId(): string {
function localIngestLlmProviderGuardMessage(projectDir: string): string {
return [
'ktx ingest run requires llm.provider.backend: anthropic, vertex, or gateway, or an injected agentRunner.',
'ktx ingest requires llm.provider.backend: anthropic, vertex, or gateway, or an injected agentRunner.',
'Configure an Anthropic provider, then rerun ingest:',
` ktx setup --project-dir ${projectDir} --anthropic-api-key-env ANTHROPIC_API_KEY --anthropic-model claude-sonnet-4-6 --no-input`,
].join('\n');

View file

@ -2,6 +2,21 @@ import { describe, expect, it } from 'vitest';
import { buildDefaultKtxProjectConfig, parseKtxProjectConfig, serializeKtxProjectConfig } from './config.js';
describe('KTX project config', () => {
it.each(['status', 'replay', 'run', 'watch'])('accepts former ingest subcommand name "%s" as a connection id', (connectionId) => {
expect(
parseKtxProjectConfig(`
project: reserved-test
connections:
${connectionId}:
driver: postgres
`),
).toMatchObject({
connections: {
[connectionId]: { driver: 'postgres' },
},
});
});
it('builds the default standalone project config', () => {
expect(buildDefaultKtxProjectConfig('warehouse')).toEqual({
project: 'warehouse',
@ -21,7 +36,7 @@ describe('KTX project config', () => {
models: {},
},
ingest: {
adapters: ['live-database', 'lookml', 'metabase', 'metricflow', 'notion'],
adapters: [],
embeddings: {
backend: 'deterministic',
model: 'deterministic',
@ -67,13 +82,12 @@ describe('KTX project config', () => {
const parsed = parseKtxProjectConfig(serialized);
expect(serialized).toContain('project: warehouse');
expect(serialized).toContain('live-database');
expect(serialized).toContain('notion');
expect(serialized).not.toContain('live-database');
expect(serialized).toContain(
' embeddings:\n backend: deterministic\n model: deterministic\n dimensions: 8',
);
expect(parsed.project).toBe('warehouse');
expect(parsed.ingest.adapters).toEqual(['live-database', 'lookml', 'metabase', 'metricflow', 'notion']);
expect(parsed.ingest.adapters).toEqual([]);
expect(parsed.ingest.embeddings).toEqual({
backend: 'deterministic',
model: 'deterministic',

View file

@ -391,7 +391,7 @@ export function buildDefaultKtxProjectConfig(projectName = 'ktx-project'): KtxPr
models: {},
},
ingest: {
adapters: ['live-database', 'lookml', 'metabase', 'metricflow', 'notion'],
adapters: [],
embeddings: {
backend: 'deterministic',
model: 'deterministic',
@ -484,6 +484,9 @@ export function parseKtxProjectConfig(raw: string): KtxProjectConfig {
...(isRecord(scanEnrichment.embeddings) ? { embeddings: scanEmbeddings } : {}),
};
const parsedScanRelationships = parseScanRelationshipConfig(scanRelationships, defaults.scan.relationships);
const parsedConnections = isRecord(parsed.connections)
? (parsed.connections as Record<string, KtxProjectConnectionConfig>)
: defaults.connections;
return {
project: project.trim(),
@ -494,9 +497,7 @@ export function parseKtxProjectConfig(raw: string): KtxProjectConfig {
},
}
: {}),
connections: isRecord(parsed.connections)
? (parsed.connections as Record<string, KtxProjectConnectionConfig>)
: defaults.connections,
connections: parsedConnections,
storage: {
state: storage.state === 'sqlite' ? 'sqlite' : defaults.storage.state,
search: storage.search === 'sqlite-fts5' ? 'sqlite-fts5' : defaults.storage.search,
@ -529,5 +530,15 @@ export function parseKtxProjectConfig(raw: string): KtxProjectConfig {
}
export function serializeKtxProjectConfig(config: KtxProjectConfig): string {
return `${YAML.stringify(config, { indent: 2, lineWidth: 0 }).trimEnd()}\n`;
const serializedConfig =
config.ingest.adapters.length === 0
? {
...config,
ingest: {
embeddings: config.ingest.embeddings,
workUnits: config.ingest.workUnits,
},
}
: config;
return `${YAML.stringify(serializedConfig, { indent: 2, lineWidth: 0 }).trimEnd()}\n`;
}

View file

@ -6,7 +6,11 @@ export type {
KtxSearchBackend,
KtxStorageState,
} from './config.js';
export { buildDefaultKtxProjectConfig, parseKtxProjectConfig, serializeKtxProjectConfig } from './config.js';
export {
buildDefaultKtxProjectConfig,
parseKtxProjectConfig,
serializeKtxProjectConfig,
} from './config.js';
export type { LocalGitFileStoreDeps } from './local-git-file-store.js';
export { LocalGitFileStore } from './local-git-file-store.js';
export { ktxLocalStateDbPath } from './local-state-db.js';

View file

@ -119,6 +119,22 @@ async function writeLiveDatabaseConfig(projectDir: string): Promise<void> {
);
}
async function writeDatabaseConfigWithoutIngestAdapters(projectDir: string): Promise<void> {
await writeFile(
join(projectDir, 'ktx.yaml'),
[
'project: warehouse',
'connections:',
' warehouse:',
' driver: postgres',
' url: env:DATABASE_URL',
' readonly: true',
'',
].join('\n'),
'utf-8',
);
}
function fetchOnlyAdapter(options: { extractedAt?: () => string } = {}): SourceAdapter {
return {
source: 'live-database',
@ -243,6 +259,27 @@ describe('local scan', () => {
});
});
it('runs a structural database scan when live-database is not listed in ktx.yaml', async () => {
await writeDatabaseConfigWithoutIngestAdapters(project.projectDir);
project = await loadKtxProject({ projectDir: project.projectDir });
const result = await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
connectionId: 'warehouse',
jobId: 'scan-run-without-public-adapter',
now: () => new Date('2026-04-29T09:10:00.000Z'),
});
expect(result.report).toMatchObject({
connectionId: 'warehouse',
runId: 'scan-run-without-public-adapter',
artifactPaths: {
reportPath: 'raw-sources/warehouse/live-database/2026-04-29-091000-scan-run-without-public-adapter/scan-report.json',
},
});
});
it('reuses scan report and raw-source paths when the same local scan run id is retried', async () => {
const first = await runLocalScan({
project,

View file

@ -342,6 +342,22 @@ function createFilteredConnector(connector: KtxScanConnector, enabledTables: Set
};
}
function withInternalLiveDatabaseAdapter(project: KtxLocalProject): KtxLocalProject {
if (project.config.ingest.adapters.includes(LIVE_DATABASE_ADAPTER)) {
return project;
}
return {
...project,
config: {
...project.config,
ingest: {
...project.config.ingest,
adapters: [...project.config.ingest.adapters, LIVE_DATABASE_ADAPTER],
},
},
};
}
export async function runLocalScan(options: RunLocalScanOptions): Promise<LocalScanRunResult> {
const mode = options.mode ?? 'structural';
assertSupportedMode(mode);
@ -367,7 +383,7 @@ export async function runLocalScan(options: RunLocalScanOptions): Promise<LocalS
await options.progress?.update(0.15, 'Inspecting database schema');
const record = await runLocalStageOnlyIngest({
project: options.project,
project: withInternalLiveDatabaseAdapter(options.project),
adapters,
adapter: LIVE_DATABASE_ADAPTER,
connectionId: options.connectionId,