Merge pull request #34 from Kaelio/andreybavt/execute-context7-plan

feat(cli): standardize output handling and pre-commit checks
This commit is contained in:
Andrey Avtomonov 2026-05-12 14:40:53 +02:00 committed by GitHub
commit a0ea1609ac
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
32 changed files with 822 additions and 144 deletions

1
.gitignore vendored
View file

@ -65,3 +65,4 @@ yarn-error.log*
*.swo
*~
.vercel
.devtools

70
.pre-commit-config.yaml Normal file
View file

@ -0,0 +1,70 @@
# See https://pre-commit.com for hook documentation.
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v6.0.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-json
- id: check-toml
- id: check-added-large-files
args: ["--maxkb=1000"]
- id: check-merge-conflict
- id: check-case-conflict
- id: mixed-line-ending
- repo: https://github.com/asottile/pyupgrade
rev: v3.21.2
hooks:
- id: pyupgrade
name: pyupgrade (python)
files: ^python/
args: [--py313-plus]
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.15.2
hooks:
- id: ruff
name: ruff (python)
files: ^python/
args: [--fix, --exit-non-zero-on-fix]
- id: ruff-format
name: ruff format (python)
files: ^python/
- repo: local
hooks:
- id: ktx-package-checks
name: ktx package checks
entry: node scripts/precommit-check.mjs
language: system
files: ^(packages/|scripts/|python/|package\.json$|pnpm-lock\.yaml$|pnpm-workspace\.yaml$|release-policy\.json$|tsconfig\.base\.json$|pyproject\.toml$|uv\.lock$|uv\.toml$)
- repo: https://github.com/Yelp/detect-secrets
rev: v1.5.0
hooks:
- id: detect-secrets
exclude: |
(?x)^(
.*\.lock$|
.*pnpm-lock\.yaml$|
.*package-lock\.json$|
.*yarn\.lock$|
.*\.log$|
.*\.dump$|
.*\.sql$|
.*\.csv$|
.*\.db$|
.*\.sqlite$|
.*\.sqlite3$|
.*/node_modules/.*|
.*/\.venv/.*|
.*/dist/.*|
.*/build/.*|
.*/coverage/.*|
.*/htmlcov/.*|
.*\.gen\.ts$|
.*\.gen\.py$|
.*\.generated\.ts$
)$

View file

@ -1,4 +1,4 @@
import { spinner } from '@clack/prompts';
import { cancel, confirm, isCancel, log, spinner } from '@clack/prompts';
export interface KtxCliSpinner {
start(message: string): void;
@ -6,6 +6,62 @@ export interface KtxCliSpinner {
error(message: string): void;
}
export interface KtxCliPromptAdapter {
confirm(options: { message: string; initialValue?: boolean }): Promise<boolean>;
cancel(message: string): void;
log: {
info(message: string): void;
warn(message: string): void;
error(message: string): void;
success(message: string): void;
step(message: string): void;
};
spinner(): KtxCliSpinner;
}
export class KtxCliPromptCancelledError extends Error {
constructor(message = 'Operation cancelled.') {
super(message);
this.name = 'KtxCliPromptCancelledError';
}
}
export function createClackSpinner(): KtxCliSpinner {
return spinner();
}
export function createClackPromptAdapter(): KtxCliPromptAdapter {
return {
async confirm(options) {
const value = await confirm(options);
if (isCancel(value)) {
cancel('Operation cancelled.');
throw new KtxCliPromptCancelledError();
}
return value;
},
cancel(message) {
cancel(message);
},
log: {
info(message) {
log.info(message);
},
warn(message) {
log.warn(message);
},
error(message) {
log.error(message);
},
success(message) {
log.success(message);
},
step(message) {
log.step(message);
},
},
spinner() {
return createClackSpinner();
},
};
}

View file

@ -2,6 +2,7 @@ import { mkdtemp, readFile, rm, writeFile } from 'node:fs/promises';
import { createRequire } from 'node:module';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { initKtxProject } from '@ktx/context/project';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import {
@ -333,6 +334,23 @@ describe('runKtxCli', () => {
expect(testIo.stderr()).toBe('');
});
it('keeps representative JSON command stdout parseable', async () => {
const projectDir = join(tempDir, 'project');
await initKtxProject({ projectDir, projectName: 'warehouse' });
const commands = [
['--project-dir', projectDir, 'setup', 'status', '--json'],
['--project-dir', projectDir, 'sl', 'list', '--json'],
];
for (const argv of commands) {
const testIo = makeIo();
await expect(runKtxCli(argv, testIo.io)).resolves.toBe(0);
expect(() => JSON.parse(testIo.stdout())).not.toThrow();
expect(testIo.stderr()).toBe('');
}
});
it('starts setup for bare ktx in a TTY when no project is discoverable', async () => {
const { mkdtemp, realpath, rm } = await import('node:fs/promises');
const { tmpdir } = await import('node:os');

View file

@ -331,8 +331,9 @@ describe('runKtxIngest viz and replay', () => {
).resolves.toBe(0);
expect(runLocal).toHaveBeenCalledWith(expect.objectContaining({ memoryFlow: expect.anything() }));
expect(io.stdout()).toContain('[5%] Fetching source files for warehouse/fake');
expect(io.stderr()).toContain('[5%] Fetching source files for warehouse/fake');
expect(io.stdout()).toContain('Job: plain-run');
expect(io.stdout()).not.toContain('[5%]');
expect(io.stdout()).not.toContain('KTX memory flow');
});
@ -407,8 +408,9 @@ describe('runKtxIngest viz and replay', () => {
expect(startLiveMemoryFlow).not.toHaveBeenCalled();
expect(runLocal).toHaveBeenCalledWith(expect.objectContaining({ memoryFlow: expect.anything() }));
expect(io.stdout()).toContain('[5%] Fetching source files for warehouse/fake');
expect(io.stderr()).toContain('[5%] Fetching source files for warehouse/fake');
expect(io.stdout()).toContain('Job: raw-missing-viz-run');
expect(io.stdout()).not.toContain('[5%]');
expect(io.stdout()).not.toContain('KTX memory flow');
expect(io.stderr()).toContain(
'Visualization requested but stdin raw mode is unavailable; printing plain output.',

View file

@ -546,7 +546,7 @@ export async function runPublicMetabaseSyncModeCase(tempDir: string, input: Sync
),
).resolves.toBe(0);
expect(io.stderr()).toBe('');
expect(io.stderr()).toContain('Metabase ingest: prod-metabase');
expect(io.stdout()).toContain('Metabase fan-out: all_succeeded');
expect(io.stdout()).toContain(`target=warehouse_a database=1 status=done job=${jobId}`);

View file

@ -14,6 +14,7 @@ import {
import { initKtxProject, ktxLocalStateDbPath, loadKtxProject } from '@ktx/context/project';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { type KtxIngestArgs, runKtxIngest } from './ingest.js';
import type { KtxCliLocalIngestAdaptersOptions } from './local-adapters.js';
import {
CliLookerSlWritingAgentRunner,
CliMetabaseAgentRunner,
@ -229,7 +230,7 @@ describe('runKtxIngest', () => {
expect(io.stdout()).toContain('Metabase fan-out: all_succeeded');
expect(io.stdout()).toContain('warehouse_a');
expect(io.stdout()).toContain('metabase-child-1');
expect(io.stderr()).toBe('');
expect(io.stderr()).toContain('Metabase ingest: prod-metabase');
});
it('returns a non-zero code when Metabase fan-out has failed children', async () => {
@ -299,7 +300,7 @@ describe('runKtxIngest', () => {
expect(io.stdout()).toContain('Metabase fan-out: partial_failure');
expect(io.stdout()).toContain('Failed work units: 1');
expect(io.stdout()).toContain('status=error');
expect(io.stderr()).toBe('');
expect(io.stderr()).toContain('Metabase ingest: prod-metabase');
});
it('prints Metabase fan-out progress before the final summary', async () => {
@ -373,12 +374,56 @@ describe('runKtxIngest', () => {
),
).resolves.toBe(0);
expect(io.stdout()).toContain('Metabase ingest: prod-metabase');
expect(io.stdout()).toContain('Targets: 1 mapped database');
expect(io.stdout()).toContain('- database=1 target=warehouse_a status=running job=metabase-child-1');
expect(io.stdout()).toContain('- database=1 target=warehouse_a status=done job=metabase-child-1');
expect(io.stderr()).toContain('Metabase ingest: prod-metabase');
expect(io.stderr()).toContain('Targets: 1 mapped database');
expect(io.stderr()).toContain('- database=1 target=warehouse_a status=running job=metabase-child-1');
expect(io.stderr()).toContain('- database=1 target=warehouse_a status=done job=metabase-child-1');
expect(io.stdout()).toContain('Metabase fan-out: all_succeeded');
expect(io.stderr()).toBe('');
expect(io.stdout()).not.toContain('status=running job=metabase-child-1');
});
it('writes metabase fan-out progress to stderr and final result to stdout', async () => {
const projectDir = join(tempDir, 'project');
await writeMetabaseConfig(projectDir);
const io = makeIo({ isTTY: true });
await expect(
runKtxIngest(
{
command: 'run',
projectDir,
connectionId: 'prod-metabase',
adapter: 'metabase',
outputMode: 'plain',
},
io.io,
{
runLocalMetabaseIngest: async (input) => {
input.progress?.onMetabaseFanoutPlanned?.({
metabaseConnectionId: 'prod-metabase',
children: [{ metabaseDatabaseId: 1, targetConnectionId: 'warehouse_a' }],
});
input.progress?.onMetabaseChildStarted?.({
metabaseConnectionId: 'prod-metabase',
metabaseDatabaseId: 1,
targetConnectionId: 'warehouse_a',
jobId: 'metabase-child-1',
});
return {
metabaseConnectionId: 'prod-metabase',
status: 'all_succeeded',
totals: { workUnits: 0, failedWorkUnits: 0 },
children: [],
};
},
},
),
).resolves.toBe(0);
expect(io.stderr()).toContain('Metabase ingest: prod-metabase');
expect(io.stderr()).toContain('status=running job=metabase-child-1');
expect(io.stdout()).toContain('Metabase fan-out: all_succeeded');
expect(io.stdout()).not.toContain('status=running job=metabase-child-1');
});
it('runs Metabase scheduled ingest through the public CLI command path with real fan-out', async () => {
@ -463,7 +508,8 @@ describe('runKtxIngest', () => {
),
).resolves.toBe(0);
expect(io.stderr()).toBe('');
expect(io.stderr()).toContain('Metabase ingest: prod-metabase');
expect(io.stderr()).toContain('Targets: 2 mapped databases');
expect(io.stdout()).toContain('Metabase fan-out: all_succeeded');
expect(io.stdout()).toContain('Source: prod-metabase');
expect(io.stdout()).toContain('Children: 2');
@ -553,6 +599,46 @@ describe('runKtxIngest', () => {
expect(io.stderr()).toBe('');
});
it('keeps metabase JSON stdout free of operational adapter logs', async () => {
const projectDir = join(tempDir, 'project');
await writeMetabaseConfig(projectDir);
const io = makeIo();
let adapterOptions: KtxCliLocalIngestAdaptersOptions | undefined;
await expect(
runKtxIngest(
{
command: 'run',
projectDir,
connectionId: 'prod-metabase',
adapter: 'metabase',
outputMode: 'json',
},
io.io,
{
createAdapters: (_project, options) => {
adapterOptions = options;
options?.logger?.warn('adapter warning');
return [];
},
runLocalMetabaseIngest: async (input) => {
input.adapters.find((adapter) => adapter.source === 'metabase');
return {
metabaseConnectionId: 'prod-metabase',
status: 'all_succeeded',
totals: { workUnits: 0, failedWorkUnits: 0 },
children: [],
};
},
},
),
).resolves.toBe(0);
expect(adapterOptions?.logger).toEqual(expect.objectContaining({ warn: expect.any(Function) }));
expect(() => JSON.parse(io.stdout())).not.toThrow();
expect(io.stderr()).toBe('');
});
it('rejects source-dir uploads through the metabase fan-out route', async () => {
const projectDir = join(tempDir, 'project');
await writeMetabaseConfig(projectDir);
@ -764,17 +850,22 @@ describe('runKtxIngest', () => {
),
).resolves.toBe(0);
expect(createAdapters).toHaveBeenCalledWith(expect.objectContaining({ projectDir }), {
databaseIntrospectionUrl: 'http://127.0.0.1:8765',
});
expect(createAdapters).toHaveBeenCalledWith(
expect.objectContaining({ projectDir }),
expect.objectContaining({
databaseIntrospectionUrl: 'http://127.0.0.1:8765',
logger: expect.any(Object),
}),
);
expect(runLocal).toHaveBeenCalledWith(
expect.objectContaining({
adapters: createdAdapters,
adapter: 'fake',
connectionId: 'warehouse',
pullConfigOptions: {
pullConfigOptions: expect.objectContaining({
databaseIntrospectionUrl: 'http://127.0.0.1:8765',
},
logger: expect.any(Object),
}),
}),
);
});
@ -817,14 +908,19 @@ describe('runKtxIngest', () => {
installPolicy: 'auto',
io: io.io,
};
expect(createAdapters).toHaveBeenCalledWith(expect.objectContaining({ projectDir }), {
managedDaemon: expectedManagedDaemon,
});
expect(createAdapters).toHaveBeenCalledWith(
expect.objectContaining({ projectDir }),
expect.objectContaining({
managedDaemon: expectedManagedDaemon,
logger: expect.any(Object),
}),
);
expect(runLocal).toHaveBeenCalledWith(
expect.objectContaining({
pullConfigOptions: {
pullConfigOptions: expect.objectContaining({
managedDaemon: expectedManagedDaemon,
},
logger: expect.any(Object),
}),
}),
);
});
@ -878,9 +974,13 @@ describe('runKtxIngest', () => {
),
).resolves.toBe(0);
expect(createAdapters).toHaveBeenCalledWith(expect.objectContaining({ projectDir }), {
historicSqlConnectionId: 'warehouse',
});
expect(createAdapters).toHaveBeenCalledWith(
expect.objectContaining({ projectDir }),
expect.objectContaining({
historicSqlConnectionId: 'warehouse',
logger: expect.any(Object),
}),
);
expect(runLocal).toHaveBeenCalledWith(
expect.objectContaining({
adapters: createdAdapters,
@ -982,9 +1082,39 @@ describe('runKtxIngest', () => {
expect(stdout).toContain('[45%] Planned 1 work unit');
expect(stdout).toContain('[80%] Processed 1/1 work units');
expect(stdout).toContain('[100%] Ingest completed');
expect(stdout.indexOf('[5%] Fetching source files for warehouse/historic-sql')).toBeLessThan(
stdout.indexOf('Report: report-live-1'),
);
expect(stdout).toContain('Report: report-live-1');
expect(io.stderr()).toBe('');
});
it('writes plain TTY ingest progress and final report to stdout', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
const sourceDir = join(tempDir, 'source');
await mkdir(join(sourceDir, 'orders'), { recursive: true });
await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders"}\n', 'utf-8');
const runLocal = vi.fn(async (input: RunLocalIngestOptions) => completedLocalBundleRun(input, 'local-job-1'));
const io = makeIo({ isTTY: true });
await expect(
runKtxIngest(
{
command: 'run',
projectDir,
connectionId: 'warehouse',
adapter: 'fake',
sourceDir,
outputMode: 'plain',
},
io.io,
{
env: interactiveEnv(),
runLocalIngest: runLocal,
},
),
).resolves.toBe(0);
expect(io.stdout()).toContain('[5%] Fetching source files for warehouse/fake');
expect(io.stdout()).toContain('Report: report-live-1');
expect(io.stderr()).toBe('');
});
@ -1214,15 +1344,19 @@ describe('runKtxIngest', () => {
),
).resolves.toBe(0);
expect(createAdapters).toHaveBeenCalledWith(expect.objectContaining({ projectDir }), {
looker: {
parser: pullConfigOptions.looker.parser,
},
});
expect(createAdapters).toHaveBeenCalledWith(
expect.objectContaining({ projectDir }),
expect.objectContaining({
logger: expect.any(Object),
looker: {
parser: pullConfigOptions.looker.parser,
},
}),
);
expect(runLocal).toHaveBeenCalledWith(
expect.objectContaining({
agentRunner,
pullConfigOptions,
pullConfigOptions: expect.objectContaining(pullConfigOptions),
}),
);
});

View file

@ -18,6 +18,7 @@ import {
} from '@ktx/context/ingest';
import { loadKtxProject } from '@ktx/context/project';
import { readIngestReportSnapshotFile } from './ingest-report-file.js';
import { createCliOperationalLogger } from './io/logger.js';
import { createKtxCliLocalIngestAdapters } from './local-adapters.js';
import type { KtxManagedPythonInstallPolicy } from './managed-python-command.js';
import { type KtxMemoryFlowStdin, renderMemoryFlowInteractively } from './memory-flow-interactive.js';
@ -142,22 +143,22 @@ function createMetabaseFanoutProgress(
connectionId: string,
io: KtxIngestIo,
): LocalMetabaseFanoutProgress {
io.stdout.write(`Metabase ingest: ${connectionId}\n`);
io.stdout.write('Checking mappings and scheduled-pull targets...\n');
io.stderr.write(`Metabase ingest: ${connectionId}\n`);
io.stderr.write('Checking mappings and scheduled-pull targets...\n');
return {
onMetabaseFanoutPlanned(event) {
io.stdout.write(`Targets: ${pluralize(event.children.length, 'mapped database')}\n`);
io.stderr.write(`Targets: ${pluralize(event.children.length, 'mapped database')}\n`);
for (const child of event.children) {
io.stdout.write(`- database=${child.metabaseDatabaseId} target=${child.targetConnectionId} status=queued\n`);
io.stderr.write(`- database=${child.metabaseDatabaseId} target=${child.targetConnectionId} status=queued\n`);
}
},
onMetabaseChildStarted(event) {
io.stdout.write(
io.stderr.write(
`- database=${event.metabaseDatabaseId} target=${event.targetConnectionId} status=running job=${event.jobId}\n`,
);
},
onMetabaseChildCompleted(event) {
io.stdout.write(
io.stderr.write(
`- database=${event.metabaseDatabaseId} target=${event.targetConnectionId} status=${event.status} job=${event.jobId}\n`,
);
},
@ -506,11 +507,13 @@ export async function runKtxIngest(
const executeLocalIngest = deps.runLocalIngest ?? runLocalIngest;
const localIngestOptions = deps.localIngestOptions ?? {};
const managedDaemon = managedDaemonOptionsForIngestRun(args, io);
const operationalLogger = createCliOperationalLogger(io, args.outputMode);
const adapterOptions = {
...(localIngestOptions.pullConfigOptions ?? {}),
...(args.databaseIntrospectionUrl ? { databaseIntrospectionUrl: args.databaseIntrospectionUrl } : {}),
...(managedDaemon ? { managedDaemon } : {}),
...(args.adapter === 'historic-sql' ? { historicSqlConnectionId: args.connectionId } : {}),
logger: operationalLogger,
};
if (args.adapter === 'metabase' && args.sourceDir) {
throw new Error('source-dir uploads are not supported for the Metabase fan-out adapter');

View file

@ -0,0 +1,65 @@
import { describe, expect, it, vi } from 'vitest';
import { createCliOperationalLogger, createNoopOperationalLogger } from './logger.js';
function makeIo() {
let stdout = '';
let stderr = '';
return {
io: {
stdout: {
write: (chunk: string) => {
stdout += chunk;
},
},
stderr: {
write: (chunk: string) => {
stderr += chunk;
},
},
},
stdout: () => stdout,
stderr: () => stderr,
};
}
describe('createCliOperationalLogger', () => {
it('routes operational messages to stderr outside JSON mode', () => {
const io = makeIo();
const logger = createCliOperationalLogger(io.io, 'plain');
logger.log('progress');
logger.warn('warning');
logger.error('failure');
logger.debug?.('debug');
expect(io.stdout()).toBe('');
expect(io.stderr()).toBe('progress\nwarning\nfailure\ndebug\n');
});
it('suppresses operational messages in JSON mode by default', () => {
const io = makeIo();
const logger = createCliOperationalLogger(io.io, 'json');
logger.log('progress');
logger.warn('warning');
logger.error('failure');
logger.debug?.('debug');
expect(io.stdout()).toBe('');
expect(io.stderr()).toBe('');
});
});
describe('createNoopOperationalLogger', () => {
it('never writes', () => {
const logger = createNoopOperationalLogger();
const warn = vi.spyOn(console, 'warn').mockImplementation(() => undefined);
logger.log('progress');
logger.warn('warning');
logger.error('failure');
logger.debug?.('debug');
expect(warn).not.toHaveBeenCalled();
});
});

View file

@ -0,0 +1,40 @@
import type { KtxCliIo } from '../cli-runtime.js';
import type { KtxOutputMode } from './mode.js';
export interface KtxOperationalLogger {
log(message: string): void;
warn(message: string): void;
error(message: string): void;
debug?(message: string): void;
}
export type KtxOperationalOutputMode = KtxOutputMode | 'viz';
function writeLine(io: KtxCliIo, message: string): void {
io.stderr.write(message.endsWith('\n') ? message : `${message}\n`);
}
export function createNoopOperationalLogger(): KtxOperationalLogger {
return {
log: () => undefined,
warn: () => undefined,
error: () => undefined,
debug: () => undefined,
};
}
export function createCliOperationalLogger(
io: KtxCliIo,
mode: KtxOperationalOutputMode,
): KtxOperationalLogger {
if (mode === 'json') {
return createNoopOperationalLogger();
}
return {
log: (message) => writeLine(io, message),
warn: (message) => writeLine(io, message),
error: (message) => writeLine(io, message),
debug: (message) => writeLine(io, message),
};
}

View file

@ -28,6 +28,16 @@ export interface PrintListArgs<Row> {
io: KtxCliIo;
}
export interface KtxJsonResultEnvelope<T> {
kind: string;
data: T;
meta?: Record<string, unknown>;
}
export function writeJsonResult<T>(io: KtxCliIo, envelope: KtxJsonResultEnvelope<T>): void {
io.stdout.write(`${JSON.stringify(envelope, null, 2)}\n`);
}
export function printList<Row extends object>(args: PrintListArgs<Row>): void {
switch (args.mode) {
case 'json':
@ -61,12 +71,11 @@ function printListPlain<Row extends object>(args: PrintListArgs<Row>): void {
}
function printListJson<Row extends object>(args: PrintListArgs<Row>): void {
const envelope = {
writeJsonResult(args.io, {
kind: 'list',
data: { items: args.rows },
meta: { command: args.command },
};
args.io.stdout.write(`${JSON.stringify(envelope, null, 2)}\n`);
});
}
function pluralize(count: number, singular: string): string {

View file

@ -35,6 +35,7 @@ import {
managedDaemonDatabaseIntrospectionOptions,
type ManagedPythonCoreDaemonOptions,
} from './managed-python-http.js';
import type { KtxOperationalLogger } from './io/logger.js';
function hasSnowflakeDriver(connection: unknown): boolean {
return (
@ -162,6 +163,7 @@ export interface KtxCliLocalIngestAdaptersOptions extends DefaultLocalIngestAdap
sqlAnalysis?: SqlAnalysisPort;
sqlAnalysisUrl?: string;
managedDaemon?: ManagedPythonCoreDaemonOptions;
logger?: KtxOperationalLogger;
}
function historicSqlRecord(connection: unknown): Record<string, unknown> | null {

View file

@ -214,6 +214,7 @@ describe('createManagedPythonSemanticLayerComputePort', () => {
expect(confirmInstall).toHaveBeenCalledWith(
'KTX needs to install the core Python runtime. This downloads Python dependencies with uv. Continue?',
io.io,
);
expect(installRuntime).toHaveBeenCalledWith({
cliVersion: '0.2.0',
@ -221,4 +222,45 @@ describe('createManagedPythonSemanticLayerComputePort', () => {
force: false,
});
});
it('uses injected runtime confirmation instead of reading process TTY directly', async () => {
const io = makeIo();
const compute = { query: vi.fn(), validateSources: vi.fn(), generateSources: vi.fn() };
const installRuntime = vi.fn(async (): Promise<ManagedPythonRuntimeInstallResult> => installResult());
const confirmInstall = vi.fn(async () => true);
await expect(
createManagedPythonSemanticLayerComputePort({
cliVersion: '0.2.0',
installPolicy: 'prompt',
io: io.io,
readStatus: async () => missingStatus(),
installRuntime,
confirmInstall,
createPythonCompute: () => compute,
}),
).resolves.toBe(compute);
expect(confirmInstall).toHaveBeenCalledWith(
'KTX needs to install the core Python runtime. This downloads Python dependencies with uv. Continue?',
io.io,
);
expect(io.stderr()).toContain('Installing KTX Python runtime (core) with uv...');
});
it('can decide default runtime prompting from injected io capabilities', async () => {
const io = makeIo();
Object.assign(io.io.stdout, { isTTY: false });
await expect(
createManagedPythonSemanticLayerComputePort({
cliVersion: '0.2.0',
installPolicy: 'prompt',
io: io.io,
readStatus: async () => missingStatus(),
installRuntime: vi.fn(),
createPythonCompute: () => ({ query: vi.fn(), validateSources: vi.fn(), generateSources: vi.fn() }),
}),
).rejects.toThrow('KTX Python runtime installation was cancelled');
});
});

View file

@ -1,6 +1,6 @@
import { cancel, confirm, isCancel } from '@clack/prompts';
import { createPythonSemanticLayerComputePort, type KtxSemanticLayerComputePort } from '@ktx/context/daemon';
import type { KtxCliIo } from './cli-runtime.js';
import { createClackPromptAdapter } from './clack.js';
import {
installManagedPythonRuntime,
readManagedPythonRuntimeStatus,
@ -36,7 +36,7 @@ export interface ManagedPythonCommandRuntime {
export interface ManagedPythonCommandDeps {
readStatus?: (options: ManagedPythonRuntimeLayoutOptions) => Promise<ManagedPythonRuntimeStatus>;
installRuntime?: (options: ManagedPythonRuntimeInstallOptions) => Promise<ManagedPythonRuntimeInstallResult>;
confirmInstall?: (message: string) => Promise<boolean>;
confirmInstall?: (message: string, io: KtxCliIo) => Promise<boolean>;
}
export interface ManagedPythonCommandOptions extends ManagedPythonCommandDeps {
@ -69,16 +69,12 @@ function hasFeature(manifest: InstalledKtxRuntimeManifest, feature: KtxRuntimeFe
return manifest.features.includes(feature);
}
async function defaultConfirmInstall(message: string): Promise<boolean> {
if (process.stdin.isTTY !== true || process.stdout.isTTY !== true) {
async function defaultConfirmInstall(message: string, io: KtxCliIo): Promise<boolean> {
if (io.stdout.isTTY !== true) {
return false;
}
const response = await confirm({ message, initialValue: true });
if (isCancel(response)) {
cancel('Runtime installation cancelled.');
return false;
}
return response === true;
const prompts = createClackPromptAdapter();
return await prompts.confirm({ message, initialValue: true });
}
export async function ensureManagedPythonCommandRuntime(
@ -99,7 +95,7 @@ export async function ensureManagedPythonCommandRuntime(
if (options.installPolicy === 'prompt') {
const confirmInstall = options.confirmInstall ?? defaultConfirmInstall;
const confirmed = await confirmInstall(installPrompt(feature));
const confirmed = await confirmInstall(installPrompt(feature), options.io);
if (!confirmed) {
throw new Error(`KTX Python runtime installation was cancelled. Run: ${managedRuntimeInstallCommand(feature)}`);
}

View file

@ -398,10 +398,18 @@ joins: []
listIo.io,
);
expect(code).toBe(0);
expect(listIo.stderr()).toBe('');
const parsed = JSON.parse(listIo.stdout());
expect(parsed.kind).toBe('list');
expect(parsed.meta).toEqual({ command: 'sl list' });
expect(parsed).toMatchObject({
kind: 'list',
data: {
items: expect.any(Array),
},
meta: {
command: 'sl list',
},
});
expect(parsed.data.items).toHaveLength(1);
expect(parsed.data.items[0]).toMatchObject({
connectionId: 'warehouse',

View file

@ -112,6 +112,24 @@ describe('LookerClient', () => {
});
});
it('does not warn to console when optional prioritization inputs fail by default', async () => {
const warn = vi.spyOn(console, 'warn').mockImplementation(() => undefined);
const fakeSdk = sdk({
search_dashboards: vi.fn().mockRejectedValue(new Error('dashboards unavailable')),
search_looks: vi.fn().mockRejectedValue(new Error('looks unavailable')),
});
const client = new LookerClient(params(), { sdkFactory: () => fakeSdk });
await expect(client.getSignals()).resolves.toMatchObject({
dashboardUsage: [],
lookUsage: [],
scheduledPlans: [],
favorites: [],
});
expect(warn).not.toHaveBeenCalled();
});
it('maps dashboards, looks, folders, models, explores, users, and groups to staged DTOs', async () => {
const fakeSdk = sdk();
const client = new LookerClient(params(), { sdkFactory: () => fakeSdk });

View file

@ -80,10 +80,10 @@ export interface LookerClientDeps {
}
const defaultLogger: LookerClientLogger = {
log: (message) => console.log(message),
warn: (message) => console.warn(message),
error: (message) => console.error(message),
debug: (message) => console.debug(message),
log: () => undefined,
warn: () => undefined,
error: () => undefined,
debug: () => undefined,
};
class InlineLookerSettings extends NodeSettings {

View file

@ -1,4 +1,5 @@
import type { KtxLocalProject, KtxProjectConnectionConfig } from '../../../project/index.js';
import type { LookerClientLogger } from './client.js';
import {
DefaultLookerClientFactory,
DefaultLookerConnectionClientFactory,
@ -59,8 +60,11 @@ export function createLocalLookerCredentialResolver(
export function createLocalLookerSourceAdapter(
project: KtxLocalProject,
env: NodeJS.ProcessEnv = process.env,
logger?: LookerClientLogger,
): LookerSourceAdapter {
const connectionFactory = new DefaultLookerConnectionClientFactory(createLocalLookerCredentialResolver(project, env));
const connectionFactory = new DefaultLookerConnectionClientFactory(createLocalLookerCredentialResolver(project, env), {
...(logger ? { logger } : {}),
});
return new LookerSourceAdapter({
clientFactory: new DefaultLookerClientFactory(connectionFactory),
});

View file

@ -72,6 +72,27 @@ describe('MetabaseClient retry exhaustion', () => {
vi.restoreAllMocks();
});
it('does not warn to console when retrying by default', async () => {
const warn = vi.spyOn(console, 'warn').mockImplementation(() => undefined);
globalThis.fetch = vi
.fn<typeof fetch>()
.mockRejectedValueOnce(Object.assign(new Error('read ECONNRESET'), { code: 'ECONNRESET' }))
.mockResolvedValueOnce(new Response(JSON.stringify([]), { status: 200 }));
const client = new MetabaseClient(
{ apiUrl: 'https://metabase.example.test', apiKey: 'key' },
{
...DEFAULT_METABASE_CLIENT_CONFIG,
baseDelayMs: 0,
maxRetries: 1,
},
);
await client.getDatabases();
expect(warn).not.toHaveBeenCalled();
});
it('wraps an exhausted ECONNRESET retry chain with method, path, attempt count, and original cause', async () => {
const sysErr = Object.assign(new Error('read ECONNRESET'), {
code: 'ECONNRESET',

View file

@ -25,10 +25,10 @@ export interface MetabaseClientLogger {
}
const defaultLogger: MetabaseClientLogger = {
log: (message) => console.log(message),
warn: (message) => console.warn(message),
error: (message) => console.error(message),
debug: (message) => console.debug(message),
log: () => undefined,
warn: () => undefined,
error: () => undefined,
debug: () => undefined,
};
interface TemplateTagInfo {

View file

@ -86,6 +86,7 @@ describe('fetchMetabaseBundle', () => {
});
afterEach(async () => {
vi.restoreAllMocks();
await rm(stagedDir, { recursive: true, force: true });
});
@ -115,6 +116,41 @@ describe('fetchMetabaseBundle', () => {
expect(card.archived).toBe(false);
});
it('does not write Metabase fetch progress to console by default', async () => {
const log = vi.spyOn(console, 'log').mockImplementation(() => undefined);
const warn = vi.spyOn(console, 'warn').mockImplementation(() => undefined);
await fetchMetabaseBundle({
pullConfig: { metabaseConnectionId, metabaseDatabaseId: 42 },
stagedDir,
ctx: makeFetchContext(),
clientFactory,
sourceStateReader,
});
expect(log).not.toHaveBeenCalled();
expect(warn).not.toHaveBeenCalled();
});
it('routes Metabase fetch warnings through the injected logger', async () => {
const logger = {
log: vi.fn(),
warn: vi.fn(),
};
clientFactory.__client.getCard.mockRejectedValueOnce(new Error('card read failed'));
await fetchMetabaseBundle({
pullConfig: { metabaseConnectionId, metabaseDatabaseId: 42 },
stagedDir,
ctx: makeFetchContext(),
clientFactory,
sourceStateReader,
logger,
});
expect(logger.warn).toHaveBeenCalledWith('failed to load card 1: card read failed');
});
it('passes the Metabase source pull config and target fetch context to the client factory', async () => {
await fetchMetabaseBundle({
pullConfig: { metabaseConnectionId, metabaseDatabaseId: 42 },

View file

@ -21,9 +21,14 @@ class IngestInputError extends Error {
}
}
const logger = {
log: (message: string) => console.log(message),
warn: (message: string) => console.warn(message),
export interface MetabaseFetchLogger {
log(message: string): void;
warn(message: string): void;
}
const noopMetabaseFetchLogger: MetabaseFetchLogger = {
log: () => undefined,
warn: () => undefined,
};
export interface FetchMetabaseBundleParams {
@ -32,6 +37,7 @@ export interface FetchMetabaseBundleParams {
ctx: FetchContext;
clientFactory: MetabaseClientFactory;
sourceStateReader: MetabaseSourceStateReader;
logger?: MetabaseFetchLogger;
}
interface CollectionNode {
@ -76,6 +82,7 @@ function resolvePath(index: Map<number | 'root', CollectionNode>, collectionId:
export async function fetchMetabaseBundle(params: FetchMetabaseBundleParams): Promise<void> {
const pullConfig: MetabasePullConfig = parseMetabasePullConfig(params.pullConfig);
const logger = params.logger ?? noopMetabaseFetchLogger;
const syncState = await params.sourceStateReader.getSourceState(pullConfig.metabaseConnectionId);
const mapping = syncState.mappings.find(
(m) => m.metabaseDatabaseId === pullConfig.metabaseDatabaseId && m.syncEnabled,

View file

@ -1,12 +1,17 @@
import type { KtxLocalProject, KtxProjectConnectionConfig } from '../../../project/index.js';
import { ktxLocalStateDbPath } from '../../../project/index.js';
import { resolveKtxConfigReference } from '../../../core/config-reference.js';
import { DEFAULT_METABASE_CLIENT_CONFIG, DefaultMetabaseConnectionClientFactory } from './client.js';
import {
DEFAULT_METABASE_CLIENT_CONFIG,
DefaultMetabaseConnectionClientFactory,
type MetabaseClientLogger,
} from './client.js';
import {
IngestMetabaseClientFactory,
type MetabaseClientConfig,
type MetabaseClientRuntimeConfig,
} from './client-port.js';
import type { MetabaseFetchLogger } from './fetch.js';
import { LocalMetabaseSourceStateReader } from './local-source-state-store.js';
import { MetabaseSourceAdapter } from './metabase.adapter.js';
@ -50,6 +55,7 @@ export function metabaseRuntimeConfigFromLocalConnection(
interface CreateLocalMetabaseSourceAdapterOptions {
env?: NodeJS.ProcessEnv;
defaultClientConfig?: MetabaseClientConfig;
logger?: MetabaseClientLogger & MetabaseFetchLogger;
}
export function createLocalMetabaseSourceAdapter(
@ -65,9 +71,11 @@ export function createLocalMetabaseSourceAdapter(
options.env,
),
options.defaultClientConfig ?? DEFAULT_METABASE_CLIENT_CONFIG,
options.logger,
);
return new MetabaseSourceAdapter({
clientFactory: new IngestMetabaseClientFactory(connectionFactory),
sourceStateReader,
...(options.logger ? { logger: options.logger } : {}),
});
}

View file

@ -4,7 +4,7 @@ import type { ChunkResult, DiffSet, FetchContext, ScopeDescriptor, SourceAdapter
import { chunkMetabaseStagedDir } from './chunk.js';
import type { MetabaseClientFactory } from './client-port.js';
import { detectMetabaseStagedDir } from './detect.js';
import { fetchMetabaseBundle } from './fetch.js';
import { fetchMetabaseBundle, type MetabaseFetchLogger } from './fetch.js';
import { computeFetchScope, hashScope, isPathInMetabaseScope } from './fetch-scope.js';
import type { MetabaseSourceStateReader } from './source-state-port.js';
import { STAGED_FILES, stagedSyncConfigSchema } from './types.js';
@ -12,6 +12,7 @@ import { STAGED_FILES, stagedSyncConfigSchema } from './types.js';
export interface MetabaseSourceAdapterDeps {
clientFactory: MetabaseClientFactory;
sourceStateReader: MetabaseSourceStateReader;
logger?: MetabaseFetchLogger;
}
export class MetabaseSourceAdapter implements SourceAdapter {
@ -31,6 +32,7 @@ export class MetabaseSourceAdapter implements SourceAdapter {
ctx,
clientFactory: this.deps.clientFactory,
sourceStateReader: this.deps.sourceStateReader,
...(this.deps.logger ? { logger: this.deps.logger } : {}),
});
}

View file

@ -89,12 +89,13 @@ describe('fetchNotionSnapshot', () => {
});
it('logs skipped page materialization failures', async () => {
const warn = vi.spyOn(console, 'warn').mockImplementation(() => undefined);
const logger = { warn: vi.fn() };
(client.retrievePage as ReturnType<typeof vi.fn>).mockRejectedValueOnce(new Error('Notion API failed'));
const manifest = await fetchNotionSnapshot({
client,
stagedDir,
logger,
config: {
authToken: 'secret',
crawlMode: 'selected_roots',
@ -109,7 +110,7 @@ describe('fetchNotionSnapshot', () => {
});
expect(manifest.skipped).toEqual([{ externalId: 'page-1', reason: 'Notion API failed' }]);
expect(warn).toHaveBeenCalledWith('Skipping Notion page page-1: Notion API failed');
expect(logger.warn).toHaveBeenCalledWith('Skipping Notion page page-1: Notion API failed');
});
it('recursively fetches selected-root child pages and derives scoped links', async () => {
@ -191,7 +192,7 @@ describe('fetchNotionSnapshot', () => {
});
it('truncates deeply nested block trees and records a warning', async () => {
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => undefined);
const logger = { warn: vi.fn() };
(client.listBlockChildren as ReturnType<typeof vi.fn>).mockImplementation((blockId: string) => {
const currentDepth = blockId === 'page-1' ? 0 : Number(blockId.replace('block-', ''));
const nextDepth = currentDepth + 1;
@ -215,6 +216,7 @@ describe('fetchNotionSnapshot', () => {
await fetchNotionSnapshot({
client,
stagedDir,
logger,
config: {
authToken: 'secret',
crawlMode: 'selected_roots',
@ -232,11 +234,11 @@ describe('fetchNotionSnapshot', () => {
const manifest = JSON.parse(await readFile(join(stagedDir, 'manifest.json'), 'utf-8'));
expect(blocks).toHaveLength(10);
expect(manifest.warnings).toContain('maxBlockDepth reached for page page-1 at depth 10');
expect(warnSpy).toHaveBeenCalledWith('maxBlockDepth reached for page page-1 at depth 10');
expect(logger.warn).toHaveBeenCalledWith('maxBlockDepth reached for page page-1 at depth 10');
});
it('truncates pages at the per-page block cap and records a warning', async () => {
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => undefined);
const logger = { warn: vi.fn() };
(client.listBlockChildren as ReturnType<typeof vi.fn>).mockResolvedValue({
results: Array.from({ length: 2001 }, (_, index) => ({
id: `block-${index}`,
@ -250,6 +252,7 @@ describe('fetchNotionSnapshot', () => {
await fetchNotionSnapshot({
client,
stagedDir,
logger,
config: {
authToken: 'secret',
crawlMode: 'selected_roots',
@ -267,7 +270,7 @@ describe('fetchNotionSnapshot', () => {
const manifest = JSON.parse(await readFile(join(stagedDir, 'manifest.json'), 'utf-8'));
expect(blocks).toHaveLength(2000);
expect(manifest.warnings).toContain('maxBlocksPerPage reached for page page-1 at 2000 blocks');
expect(warnSpy).toHaveBeenCalledWith('maxBlocksPerPage reached for page page-1 at 2000 blocks');
expect(logger.warn).toHaveBeenCalledWith('maxBlocksPerPage reached for page page-1 at 2000 blocks');
});
it('uses all_accessible search for pages and data sources', async () => {

View file

@ -12,10 +12,19 @@ import {
type NotionPullConfig,
} from './types.js';
export interface NotionFetchLogger {
warn(message: string): void;
}
const noopNotionFetchLogger: NotionFetchLogger = {
warn: () => undefined,
};
interface FetchNotionSnapshotParams {
client: NotionApi;
config: NotionPullConfig;
stagedDir: string;
logger?: NotionFetchLogger;
}
interface CrawlState {
@ -23,6 +32,7 @@ interface CrawlState {
databaseCount: number;
dataSourceCount: number;
capped: boolean;
logger: NotionFetchLogger;
skipped: Array<{ externalId: string; reason: string }>;
warnings: string[];
materializedPageTargets: Set<string>;
@ -44,9 +54,6 @@ interface NotionLinks {
const DEFAULT_MAX_BLOCK_DEPTH = 10;
const DEFAULT_MAX_BLOCKS_PER_PAGE = 2000;
const logger = {
warn: (message: string) => console.warn(message),
};
async function writeJson(path: string, value: unknown): Promise<void> {
await mkdir(dirname(path), { recursive: true });
@ -58,7 +65,12 @@ async function writeText(path: string, value: string): Promise<void> {
await writeFile(path, value.endsWith('\n') ? value : `${value}\n`, 'utf-8');
}
function addWarning(warnings: string[], warning: string, logWarning = false): void {
function addWarning(
warnings: string[],
warning: string,
logWarning = false,
logger: NotionFetchLogger = noopNotionFetchLogger,
): void {
if (!warnings.includes(warning)) {
warnings.push(warning);
if (logWarning) {
@ -119,11 +131,21 @@ async function visitPaginated<T>(params: {
} while (cursor);
}
function addBlockCountWarning(state: BlockCollectionState, warnings: string[], pageId: string): void {
function addBlockCountWarning(
state: BlockCollectionState,
warnings: string[],
pageId: string,
logger: NotionFetchLogger,
): void {
if (state.blockCountWarningWritten) {
return;
}
addWarning(warnings, `maxBlocksPerPage reached for page ${pageId} at ${DEFAULT_MAX_BLOCKS_PER_PAGE} blocks`, true);
addWarning(
warnings,
`maxBlocksPerPage reached for page ${pageId} at ${DEFAULT_MAX_BLOCKS_PER_PAGE} blocks`,
true,
logger,
);
state.blockCountWarningWritten = true;
}
@ -134,18 +156,19 @@ async function collectBlockChildren(params: {
depth: number;
warnings: string[];
state: BlockCollectionState;
logger: NotionFetchLogger;
}): Promise<void> {
let cursor: string | null = null;
do {
const remainingBlocks = DEFAULT_MAX_BLOCKS_PER_PAGE - params.state.blocks.length;
if (remainingBlocks <= 0) {
addBlockCountWarning(params.state, params.warnings, params.pageId);
addBlockCountWarning(params.state, params.warnings, params.pageId, params.logger);
return;
}
const page = await params.client.listBlockChildren(params.blockId, cursor, Math.min(remainingBlocks, 100));
for (let index = 0; index < page.results.length; index += 1) {
if (params.state.blocks.length >= DEFAULT_MAX_BLOCKS_PER_PAGE) {
addBlockCountWarning(params.state, params.warnings, params.pageId);
addBlockCountWarning(params.state, params.warnings, params.pageId, params.logger);
return;
}
@ -159,9 +182,10 @@ async function collectBlockChildren(params: {
params.warnings,
`maxBlockDepth reached for page ${params.pageId} at depth ${DEFAULT_MAX_BLOCK_DEPTH}`,
true,
params.logger,
);
} else if (params.state.blocks.length >= DEFAULT_MAX_BLOCKS_PER_PAGE) {
addBlockCountWarning(params.state, params.warnings, params.pageId);
addBlockCountWarning(params.state, params.warnings, params.pageId, params.logger);
return;
} else {
await collectBlockChildren({
@ -171,6 +195,7 @@ async function collectBlockChildren(params: {
depth: blockDepth,
warnings: params.warnings,
state: params.state,
logger: params.logger,
});
}
}
@ -179,7 +204,7 @@ async function collectBlockChildren(params: {
params.state.blocks.length >= DEFAULT_MAX_BLOCKS_PER_PAGE &&
(index < page.results.length - 1 || page.hasMore)
) {
addBlockCountWarning(params.state, params.warnings, params.pageId);
addBlockCountWarning(params.state, params.warnings, params.pageId, params.logger);
return;
}
}
@ -187,7 +212,12 @@ async function collectBlockChildren(params: {
} while (cursor);
}
async function collectBlockTree(client: NotionApi, pageId: string, warnings: string[]): Promise<NotionBlock[]> {
async function collectBlockTree(
client: NotionApi,
pageId: string,
warnings: string[],
logger: NotionFetchLogger,
): Promise<NotionBlock[]> {
const state: BlockCollectionState = { blocks: [], blockCountWarningWritten: false };
await collectBlockChildren({
client,
@ -196,6 +226,7 @@ async function collectBlockTree(client: NotionApi, pageId: string, warnings: str
depth: 0,
warnings,
state,
logger,
});
return state.blocks;
}
@ -341,7 +372,7 @@ async function materializePage(params: {
if (params.skipDataSourceRows && !params.dataSourceId && parentDataSourceId(page)) {
return;
}
const blocks = await collectBlockTree(params.client, params.pageId, params.state.warnings);
const blocks = await collectBlockTree(params.client, params.pageId, params.state.warnings, params.state.logger);
const metadata = normalizeNotionPageMetadata({
page,
fallbackPath: params.fallbackPath,
@ -374,7 +405,9 @@ async function materializePage(params: {
}
}
} catch (error) {
logger.warn(`Skipping Notion page ${params.pageId}: ${error instanceof Error ? error.message : String(error)}`);
params.state.logger.warn(
`Skipping Notion page ${params.pageId}: ${error instanceof Error ? error.message : String(error)}`,
);
params.state.skipped.push({
externalId: params.pageId,
reason: error instanceof Error ? error.message : String(error),
@ -491,6 +524,7 @@ async function materializeDatabase(params: {
export async function fetchNotionSnapshot(params: FetchNotionSnapshotParams): Promise<NotionManifest> {
await mkdir(params.stagedDir, { recursive: true });
const logger = params.logger ?? noopNotionFetchLogger;
const configuredCursor = params.config.crawlMode === 'all_accessible' ? parseConfiguredCursor(params.config) : null;
const continuedFromCursor = configuredCursor !== null;
const state: CrawlState = {
@ -498,6 +532,7 @@ export async function fetchNotionSnapshot(params: FetchNotionSnapshotParams): Pr
databaseCount: 0,
dataSourceCount: 0,
capped: false,
logger,
skipped: [],
warnings: [],
materializedPageTargets: new Set(),

View file

@ -14,7 +14,7 @@ import type {
import { chunkNotionStagedDir, describeNotionScope } from './chunk.js';
import { clusterNotionWorkUnits } from './cluster.js';
import { detectNotionStagedDir } from './detect.js';
import { fetchNotionSnapshot } from './fetch.js';
import { fetchNotionSnapshot, type NotionFetchLogger } from './fetch.js';
import { NotionClient } from './notion-client.js';
import { parseNotionPullConfig } from './pull-config.js';
import { type NotionMetadata, notionManifestSchema, notionMetadataSchema } from './types.js';
@ -31,6 +31,7 @@ interface NotionPullSucceededContext {
export interface NotionSourceAdapterDeps {
onPullSucceeded?: (ctx: NotionPullSucceededContext) => Promise<void>;
logger?: NotionFetchLogger;
}
export class NotionSourceAdapter implements SourceAdapter {
@ -48,7 +49,12 @@ export class NotionSourceAdapter implements SourceAdapter {
async fetch(pullConfig: unknown, stagedDir: string, _ctx: FetchContext): Promise<void> {
const config = parseNotionPullConfig(pullConfig);
await fetchNotionSnapshot({ client: new NotionClient(config.authToken), config, stagedDir });
await fetchNotionSnapshot({
client: new NotionClient(config.authToken),
config,
stagedDir,
...(this.deps.logger ? { logger: this.deps.logger } : {}),
});
}
chunk(stagedDir: string, diffSet?: DiffSet): Promise<ChunkResult> {

View file

@ -19,6 +19,7 @@ import {
} from './adapters/live-database/daemon-introspection.js';
import { LiveDatabaseSourceAdapter } from './adapters/live-database/live-database.adapter.js';
import { createDaemonLookerTableIdentifierParser } from './adapters/looker/daemon-table-identifier-parser.js';
import type { LookerClientLogger } from './adapters/looker/client.js';
import { DefaultLookerConnectionClientFactory } from './adapters/looker/factory.js';
import { createLocalLookerCredentialResolver } from './adapters/looker/local-looker.adapter.js';
import { LocalLookerRuntimeStore } from './adapters/looker/local-runtime-store.js';
@ -32,9 +33,12 @@ import type { LookerRuntimeClient } from './adapters/looker/fetch.js';
import { LookmlSourceAdapter } from './adapters/lookml/lookml.adapter.js';
import { pullConfigFromIntegrationConfig } from './adapters/lookml/pull-config.js';
import { createLocalMetabaseSourceAdapter } from './adapters/metabase/local-metabase.adapter.js';
import type { MetabaseClientLogger } from './adapters/metabase/client.js';
import type { MetabaseFetchLogger } from './adapters/metabase/fetch.js';
import { MetricflowSourceAdapter } from './adapters/metricflow/metricflow.adapter.js';
import { pullConfigFromMetricflowIntegration } from './adapters/metricflow/pull-config.js';
import { NotionSourceAdapter } from './adapters/notion/notion.adapter.js';
import type { NotionFetchLogger } from './adapters/notion/fetch.js';
import { seedLocalMappingStateFromKtxYaml } from './local-mapping-reconcile.js';
import type { SourceAdapter } from './types.js';
@ -56,14 +60,23 @@ export interface DefaultLocalIngestAdaptersOptions {
parser?: LookerTableIdentifierParser;
env?: NodeJS.ProcessEnv;
};
logger?: LocalIngestOperationalLogger;
}
type LocalIngestOperationalLogger = MetabaseClientLogger &
MetabaseFetchLogger &
LookerClientLogger &
NotionFetchLogger;
export function createDefaultLocalIngestAdapters(
project: KtxLocalProject,
options: DefaultLocalIngestAdaptersOptions = {},
): SourceAdapter[] {
const lookerConnectionFactory = new DefaultLookerConnectionClientFactory(
createLocalLookerCredentialResolver(project, options.looker?.env),
{
...(options.logger ? { logger: options.logger } : {}),
},
);
const adapters: SourceAdapter[] = [
@ -77,7 +90,9 @@ export function createDefaultLocalIngestAdapters(
}),
new LookmlSourceAdapter({ homeDir: join(project.projectDir, '.ktx/cache') }),
new DbtSourceAdapter({ homeDir: join(project.projectDir, '.ktx/cache') }),
createLocalMetabaseSourceAdapter(project),
createLocalMetabaseSourceAdapter(project, {
...(options.logger ? { logger: options.logger } : {}),
}),
new LookerSourceAdapter({
clientFactory: {
async createClient(config, ctx) {
@ -89,7 +104,9 @@ export function createDefaultLocalIngestAdapters(
},
}),
new MetricflowSourceAdapter({ homeDir: join(project.projectDir, '.ktx/cache') }),
new NotionSourceAdapter(),
new NotionSourceAdapter({
...(options.logger ? { logger: options.logger } : {}),
}),
];
if (options.historicSql) {

View file

@ -427,6 +427,69 @@ describe('local scan enrichment', () => {
expect(result.relationships).toEqual({ accepted: 0, review: 1, rejected: 0, skipped: 0 });
});
it('generates table descriptions with bounded table-level concurrency', async () => {
const concurrentSnapshot: KtxSchemaSnapshot = {
...snapshot,
tables: Array.from({ length: 8 }, (_, index) => ({
catalog: null,
db: 'public',
name: `table_${index + 1}`,
kind: 'table' as const,
comment: null,
estimatedRows: 2,
foreignKeys: [],
columns: [
{
name: 'id',
nativeType: 'integer',
normalizedType: 'integer',
dimensionType: 'number' as const,
nullable: false,
primaryKey: true,
comment: null,
},
],
})),
};
let activeColumnSamples = 0;
let maxActiveColumnSamples = 0;
const scanConnector = {
...connector(),
introspect: vi.fn(async () => concurrentSnapshot),
sampleColumn: vi.fn(async () => {
activeColumnSamples += 1;
maxActiveColumnSamples = Math.max(maxActiveColumnSamples, activeColumnSamples);
await new Promise((resolve) => setTimeout(resolve, 10));
activeColumnSamples -= 1;
return {
values: ['1'],
nullCount: 0,
distinctCount: 1,
};
}),
sampleTable: vi.fn(async () => ({
headers: ['id'],
rows: [[1]],
totalRows: 1,
})),
};
const settings = {
...buildDefaultKtxProjectConfig('test').scan.relationships,
enabled: false,
};
await runLocalScanEnrichment({
connectionId: 'warehouse',
mode: 'enriched',
connector: scanConnector,
context: { runId: 'scan-run-concurrent-descriptions' },
providers: createDeterministicLocalScanEnrichmentProviders({ embeddingDimensions: 3 }),
relationshipSettings: settings,
});
expect(maxActiveColumnSamples).toBe(6);
});
it('reports enrichment progress for countable stages', async () => {
const events: Array<{ progress: number; message?: string; transient?: boolean }> = [];
const progress = {
@ -713,7 +776,7 @@ describe('local scan enrichment', () => {
model: 'provider/embedding-model',
dimensions: 1536,
batchSize: 8,
openai: { api_key: 'env:OPENAI_API_KEY' },
openai: { api_key: 'env:OPENAI_API_KEY' }, // pragma: allowlist secret
},
},
{
@ -726,7 +789,7 @@ describe('local scan enrichment', () => {
{
createKtxLlmProvider: createKtxLlmProvider as any,
createKtxEmbeddingProvider: createKtxEmbeddingProvider as any,
env: { OPENAI_API_KEY: 'openai-key' },
env: { OPENAI_API_KEY: 'openai-key' }, // pragma: allowlist secret
},
);

View file

@ -1,4 +1,5 @@
import type { KtxLlmProvider } from '@ktx/llm';
import pLimit from 'p-limit';
import { buildDefaultKtxProjectConfig, type KtxScanRelationshipConfig } from '../project/config.js';
import { type KtxDescriptionColumnTable, KtxDescriptionGenerator } from './description-generation.js';
import { buildKtxColumnEmbeddingText } from './embedding-text.js';
@ -40,6 +41,8 @@ import type {
KtxTableRef,
} from './types.js';
const DESCRIPTION_TABLE_CONCURRENCY = 6;
export interface DeterministicLocalScanEnrichmentProviderOptions {
embeddingDimensions?: number;
maxBatchSize?: number;
@ -322,41 +325,47 @@ async function generateDescriptions(input: {
await input.progress?.update(1, 'No tables to describe');
return updates;
}
for (const [index, table] of input.snapshot.tables.entries()) {
await input.progress?.update(
(index + 1) / totalTables,
`Generating descriptions ${index + 1}/${totalTables} tables`,
{
transient: true,
},
);
const tableInput = descriptionTable(table);
const columnResult = await generator.generateColumnDescriptions({
connectionId: input.snapshot.connectionId,
connector: input.connector,
context: input.context,
dataSourceType: input.snapshot.driver,
supportsNestedAnalysis: input.connector.capabilities.nestedAnalysis,
table: tableInput,
});
const tableDescription = await generator.generateTableDescription({
connectionId: input.snapshot.connectionId,
connector: input.connector,
context: input.context,
dataSourceType: input.snapshot.driver,
table: {
catalog: table.catalog,
db: table.db,
name: table.name,
rawDescriptions: table.comment ? { db: table.comment } : {},
},
});
updates.push({
table: tableRef(table),
tableDescription,
columnDescriptions: Object.fromEntries(columnResult.columnDescriptions),
});
}
const limitTable = pLimit(DESCRIPTION_TABLE_CONCURRENCY);
const tableUpdates = await Promise.all(
input.snapshot.tables.map((table, index) =>
limitTable(async () => {
await input.progress?.update(
(index + 1) / totalTables,
`Generating descriptions ${index + 1}/${totalTables} tables`,
{
transient: true,
},
);
const tableInput = descriptionTable(table);
const columnResult = await generator.generateColumnDescriptions({
connectionId: input.snapshot.connectionId,
connector: input.connector,
context: input.context,
dataSourceType: input.snapshot.driver,
supportsNestedAnalysis: input.connector.capabilities.nestedAnalysis,
table: tableInput,
});
const tableDescription = await generator.generateTableDescription({
connectionId: input.snapshot.connectionId,
connector: input.connector,
context: input.context,
dataSourceType: input.snapshot.driver,
table: {
catalog: table.catalog,
db: table.db,
name: table.name,
rawDescriptions: table.comment ? { db: table.comment } : {},
},
});
return {
table: tableRef(table),
tableDescription,
columnDescriptions: Object.fromEntries(columnResult.columnDescriptions),
};
}),
),
);
updates.push(...tableUpdates);
await input.progress?.update(1, `Generated descriptions for ${totalTables} tables`);
return updates;
}

View file

@ -1,12 +1,11 @@
#!/usr/bin/env node
import { spawnSync } from 'node:child_process';
import { existsSync, readFileSync } from 'node:fs';
import { dirname, join, relative, sep } from 'node:path';
import { dirname, join, resolve } from 'node:path';
import { fileURLToPath } from 'node:url';
const scriptPath = fileURLToPath(import.meta.url);
const ktxRoot = dirname(dirname(scriptPath));
const repoRoot = dirname(ktxRoot);
const packageNameByDir = new Map(
[
@ -35,7 +34,8 @@ const pythonPackageTests = new Map([
]);
function normalizeFilePath(filePath) {
return filePath.replaceAll('\\', '/').replace(/^\.\//, '');
const normalized = filePath.replaceAll('\\', '/').replace(/^\.\//, '');
return normalized.startsWith('ktx/') ? normalized.slice('ktx/'.length) : normalized;
}
function stablePush(commands, key, cmd, args) {
@ -68,13 +68,7 @@ export function planChecks(files) {
let runAllPythonTests = false;
for (const rawFile of files) {
const file = normalizeFilePath(rawFile);
if (!file.startsWith('ktx/')) {
continue;
}
const ktxFile = file.slice('ktx/'.length);
const ktxFile = normalizeFilePath(rawFile);
if (ktxFile.startsWith('packages/')) {
const [, packageDir, ...rest] = ktxFile.split('/');
@ -189,6 +183,6 @@ export function runChecks(files) {
return 0;
}
if (process.argv[1] && relative(repoRoot, process.argv[1]).split(sep).join('/') === 'ktx/scripts/precommit-check.mjs') {
if (process.argv[1] && resolve(process.argv[1]) === scriptPath) {
process.exitCode = runChecks(process.argv.slice(2));
}

View file

@ -12,7 +12,16 @@ describe('precommit-check', () => {
assert.deepEqual(commandKeys(['outside-workspace/src/app.ts']), []);
});
it('runs only the touched package checks for package code', () => {
it('runs only the touched package checks for standalone package paths', () => {
assert.deepEqual(commandKeys(['packages/cli/src/index.ts']), [
'boundary-check',
'type-check:@ktx/cli',
'build:@ktx/cli',
'test:@ktx/cli',
]);
});
it('accepts legacy subtree-prefixed package paths', () => {
assert.deepEqual(commandKeys(['ktx/packages/cli/src/index.ts']), [
'boundary-check',
'type-check:@ktx/cli',
@ -22,12 +31,12 @@ describe('precommit-check', () => {
});
it('runs the matching script test when a script changes', () => {
assert.deepEqual(commandKeys(['ktx/scripts/check-boundaries.mjs']), [
assert.deepEqual(commandKeys(['scripts/check-boundaries.mjs']), [
'script-test:scripts/check-boundaries.test.mjs',
]);
});
it('runs the touched python package tests', () => {
assert.deepEqual(commandKeys(['ktx/python/ktx-sl/semantic_layer/parser.py']), ['pytest:ktx-sl']);
assert.deepEqual(commandKeys(['python/ktx-sl/semantic_layer/parser.py']), ['pytest:ktx-sl']);
});
});