diff --git a/.gitignore b/.gitignore index ed14196b..112e7faa 100644 --- a/.gitignore +++ b/.gitignore @@ -65,3 +65,4 @@ yarn-error.log* *.swo *~ .vercel +.devtools diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 00000000..555d7fc0 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,70 @@ +# See https://pre-commit.com for hook documentation. +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v6.0.0 + hooks: + - id: trailing-whitespace + - id: end-of-file-fixer + - id: check-yaml + - id: check-json + - id: check-toml + - id: check-added-large-files + args: ["--maxkb=1000"] + - id: check-merge-conflict + - id: check-case-conflict + - id: mixed-line-ending + + - repo: https://github.com/asottile/pyupgrade + rev: v3.21.2 + hooks: + - id: pyupgrade + name: pyupgrade (python) + files: ^python/ + args: [--py313-plus] + + - repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.15.2 + hooks: + - id: ruff + name: ruff (python) + files: ^python/ + args: [--fix, --exit-non-zero-on-fix] + - id: ruff-format + name: ruff format (python) + files: ^python/ + + - repo: local + hooks: + - id: ktx-package-checks + name: ktx package checks + entry: node scripts/precommit-check.mjs + language: system + files: ^(packages/|scripts/|python/|package\.json$|pnpm-lock\.yaml$|pnpm-workspace\.yaml$|release-policy\.json$|tsconfig\.base\.json$|pyproject\.toml$|uv\.lock$|uv\.toml$) + + - repo: https://github.com/Yelp/detect-secrets + rev: v1.5.0 + hooks: + - id: detect-secrets + exclude: | + (?x)^( + .*\.lock$| + .*pnpm-lock\.yaml$| + .*package-lock\.json$| + .*yarn\.lock$| + .*\.log$| + .*\.dump$| + .*\.sql$| + .*\.csv$| + .*\.db$| + .*\.sqlite$| + .*\.sqlite3$| + .*/node_modules/.*| + .*/\.venv/.*| + .*/dist/.*| + .*/build/.*| + .*/coverage/.*| + .*/htmlcov/.*| + .*\.gen\.ts$| + .*\.gen\.py$| + .*\.generated\.ts$ + )$ diff --git a/packages/cli/src/clack.ts b/packages/cli/src/clack.ts index e7083df9..fc24f1e7 100644 --- a/packages/cli/src/clack.ts +++ b/packages/cli/src/clack.ts @@ -1,4 +1,4 @@ -import { spinner } from '@clack/prompts'; +import { cancel, confirm, isCancel, log, spinner } from '@clack/prompts'; export interface KtxCliSpinner { start(message: string): void; @@ -6,6 +6,62 @@ export interface KtxCliSpinner { error(message: string): void; } +export interface KtxCliPromptAdapter { + confirm(options: { message: string; initialValue?: boolean }): Promise; + cancel(message: string): void; + log: { + info(message: string): void; + warn(message: string): void; + error(message: string): void; + success(message: string): void; + step(message: string): void; + }; + spinner(): KtxCliSpinner; +} + +export class KtxCliPromptCancelledError extends Error { + constructor(message = 'Operation cancelled.') { + super(message); + this.name = 'KtxCliPromptCancelledError'; + } +} + export function createClackSpinner(): KtxCliSpinner { return spinner(); } + +export function createClackPromptAdapter(): KtxCliPromptAdapter { + return { + async confirm(options) { + const value = await confirm(options); + if (isCancel(value)) { + cancel('Operation cancelled.'); + throw new KtxCliPromptCancelledError(); + } + return value; + }, + cancel(message) { + cancel(message); + }, + log: { + info(message) { + log.info(message); + }, + warn(message) { + log.warn(message); + }, + error(message) { + log.error(message); + }, + success(message) { + log.success(message); + }, + step(message) { + log.step(message); + }, + }, + spinner() { + return createClackSpinner(); + }, + }; +} diff --git a/packages/cli/src/index.test.ts b/packages/cli/src/index.test.ts index 4a45274b..87a0089f 100644 --- a/packages/cli/src/index.test.ts +++ b/packages/cli/src/index.test.ts @@ -2,6 +2,7 @@ import { mkdtemp, readFile, rm, writeFile } from 'node:fs/promises'; import { createRequire } from 'node:module'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; +import { initKtxProject } from '@ktx/context/project'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { @@ -333,6 +334,23 @@ describe('runKtxCli', () => { expect(testIo.stderr()).toBe(''); }); + it('keeps representative JSON command stdout parseable', async () => { + const projectDir = join(tempDir, 'project'); + await initKtxProject({ projectDir, projectName: 'warehouse' }); + const commands = [ + ['--project-dir', projectDir, 'setup', 'status', '--json'], + ['--project-dir', projectDir, 'sl', 'list', '--json'], + ]; + + for (const argv of commands) { + const testIo = makeIo(); + await expect(runKtxCli(argv, testIo.io)).resolves.toBe(0); + + expect(() => JSON.parse(testIo.stdout())).not.toThrow(); + expect(testIo.stderr()).toBe(''); + } + }); + it('starts setup for bare ktx in a TTY when no project is discoverable', async () => { const { mkdtemp, realpath, rm } = await import('node:fs/promises'); const { tmpdir } = await import('node:os'); diff --git a/packages/cli/src/ingest-viz.test.ts b/packages/cli/src/ingest-viz.test.ts index 1347b3a8..6963d277 100644 --- a/packages/cli/src/ingest-viz.test.ts +++ b/packages/cli/src/ingest-viz.test.ts @@ -331,8 +331,9 @@ describe('runKtxIngest viz and replay', () => { ).resolves.toBe(0); expect(runLocal).toHaveBeenCalledWith(expect.objectContaining({ memoryFlow: expect.anything() })); - expect(io.stdout()).toContain('[5%] Fetching source files for warehouse/fake'); + expect(io.stderr()).toContain('[5%] Fetching source files for warehouse/fake'); expect(io.stdout()).toContain('Job: plain-run'); + expect(io.stdout()).not.toContain('[5%]'); expect(io.stdout()).not.toContain('KTX memory flow'); }); @@ -407,8 +408,9 @@ describe('runKtxIngest viz and replay', () => { expect(startLiveMemoryFlow).not.toHaveBeenCalled(); expect(runLocal).toHaveBeenCalledWith(expect.objectContaining({ memoryFlow: expect.anything() })); - expect(io.stdout()).toContain('[5%] Fetching source files for warehouse/fake'); + expect(io.stderr()).toContain('[5%] Fetching source files for warehouse/fake'); expect(io.stdout()).toContain('Job: raw-missing-viz-run'); + expect(io.stdout()).not.toContain('[5%]'); expect(io.stdout()).not.toContain('KTX memory flow'); expect(io.stderr()).toContain( 'Visualization requested but stdin raw mode is unavailable; printing plain output.', diff --git a/packages/cli/src/ingest.test-utils.ts b/packages/cli/src/ingest.test-utils.ts index a83b38be..71d85c6c 100644 --- a/packages/cli/src/ingest.test-utils.ts +++ b/packages/cli/src/ingest.test-utils.ts @@ -546,7 +546,7 @@ export async function runPublicMetabaseSyncModeCase(tempDir: string, input: Sync ), ).resolves.toBe(0); - expect(io.stderr()).toBe(''); + expect(io.stderr()).toContain('Metabase ingest: prod-metabase'); expect(io.stdout()).toContain('Metabase fan-out: all_succeeded'); expect(io.stdout()).toContain(`target=warehouse_a database=1 status=done job=${jobId}`); diff --git a/packages/cli/src/ingest.test.ts b/packages/cli/src/ingest.test.ts index 0307ca9e..b547d185 100644 --- a/packages/cli/src/ingest.test.ts +++ b/packages/cli/src/ingest.test.ts @@ -14,6 +14,7 @@ import { import { initKtxProject, ktxLocalStateDbPath, loadKtxProject } from '@ktx/context/project'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { type KtxIngestArgs, runKtxIngest } from './ingest.js'; +import type { KtxCliLocalIngestAdaptersOptions } from './local-adapters.js'; import { CliLookerSlWritingAgentRunner, CliMetabaseAgentRunner, @@ -229,7 +230,7 @@ describe('runKtxIngest', () => { expect(io.stdout()).toContain('Metabase fan-out: all_succeeded'); expect(io.stdout()).toContain('warehouse_a'); expect(io.stdout()).toContain('metabase-child-1'); - expect(io.stderr()).toBe(''); + expect(io.stderr()).toContain('Metabase ingest: prod-metabase'); }); it('returns a non-zero code when Metabase fan-out has failed children', async () => { @@ -299,7 +300,7 @@ describe('runKtxIngest', () => { expect(io.stdout()).toContain('Metabase fan-out: partial_failure'); expect(io.stdout()).toContain('Failed work units: 1'); expect(io.stdout()).toContain('status=error'); - expect(io.stderr()).toBe(''); + expect(io.stderr()).toContain('Metabase ingest: prod-metabase'); }); it('prints Metabase fan-out progress before the final summary', async () => { @@ -373,12 +374,56 @@ describe('runKtxIngest', () => { ), ).resolves.toBe(0); - expect(io.stdout()).toContain('Metabase ingest: prod-metabase'); - expect(io.stdout()).toContain('Targets: 1 mapped database'); - expect(io.stdout()).toContain('- database=1 target=warehouse_a status=running job=metabase-child-1'); - expect(io.stdout()).toContain('- database=1 target=warehouse_a status=done job=metabase-child-1'); + expect(io.stderr()).toContain('Metabase ingest: prod-metabase'); + expect(io.stderr()).toContain('Targets: 1 mapped database'); + expect(io.stderr()).toContain('- database=1 target=warehouse_a status=running job=metabase-child-1'); + expect(io.stderr()).toContain('- database=1 target=warehouse_a status=done job=metabase-child-1'); expect(io.stdout()).toContain('Metabase fan-out: all_succeeded'); - expect(io.stderr()).toBe(''); + expect(io.stdout()).not.toContain('status=running job=metabase-child-1'); + }); + + it('writes metabase fan-out progress to stderr and final result to stdout', async () => { + const projectDir = join(tempDir, 'project'); + await writeMetabaseConfig(projectDir); + const io = makeIo({ isTTY: true }); + + await expect( + runKtxIngest( + { + command: 'run', + projectDir, + connectionId: 'prod-metabase', + adapter: 'metabase', + outputMode: 'plain', + }, + io.io, + { + runLocalMetabaseIngest: async (input) => { + input.progress?.onMetabaseFanoutPlanned?.({ + metabaseConnectionId: 'prod-metabase', + children: [{ metabaseDatabaseId: 1, targetConnectionId: 'warehouse_a' }], + }); + input.progress?.onMetabaseChildStarted?.({ + metabaseConnectionId: 'prod-metabase', + metabaseDatabaseId: 1, + targetConnectionId: 'warehouse_a', + jobId: 'metabase-child-1', + }); + return { + metabaseConnectionId: 'prod-metabase', + status: 'all_succeeded', + totals: { workUnits: 0, failedWorkUnits: 0 }, + children: [], + }; + }, + }, + ), + ).resolves.toBe(0); + + expect(io.stderr()).toContain('Metabase ingest: prod-metabase'); + expect(io.stderr()).toContain('status=running job=metabase-child-1'); + expect(io.stdout()).toContain('Metabase fan-out: all_succeeded'); + expect(io.stdout()).not.toContain('status=running job=metabase-child-1'); }); it('runs Metabase scheduled ingest through the public CLI command path with real fan-out', async () => { @@ -463,7 +508,8 @@ describe('runKtxIngest', () => { ), ).resolves.toBe(0); - expect(io.stderr()).toBe(''); + expect(io.stderr()).toContain('Metabase ingest: prod-metabase'); + expect(io.stderr()).toContain('Targets: 2 mapped databases'); expect(io.stdout()).toContain('Metabase fan-out: all_succeeded'); expect(io.stdout()).toContain('Source: prod-metabase'); expect(io.stdout()).toContain('Children: 2'); @@ -553,6 +599,46 @@ describe('runKtxIngest', () => { expect(io.stderr()).toBe(''); }); + it('keeps metabase JSON stdout free of operational adapter logs', async () => { + const projectDir = join(tempDir, 'project'); + await writeMetabaseConfig(projectDir); + const io = makeIo(); + let adapterOptions: KtxCliLocalIngestAdaptersOptions | undefined; + + await expect( + runKtxIngest( + { + command: 'run', + projectDir, + connectionId: 'prod-metabase', + adapter: 'metabase', + outputMode: 'json', + }, + io.io, + { + createAdapters: (_project, options) => { + adapterOptions = options; + options?.logger?.warn('adapter warning'); + return []; + }, + runLocalMetabaseIngest: async (input) => { + input.adapters.find((adapter) => adapter.source === 'metabase'); + return { + metabaseConnectionId: 'prod-metabase', + status: 'all_succeeded', + totals: { workUnits: 0, failedWorkUnits: 0 }, + children: [], + }; + }, + }, + ), + ).resolves.toBe(0); + + expect(adapterOptions?.logger).toEqual(expect.objectContaining({ warn: expect.any(Function) })); + expect(() => JSON.parse(io.stdout())).not.toThrow(); + expect(io.stderr()).toBe(''); + }); + it('rejects source-dir uploads through the metabase fan-out route', async () => { const projectDir = join(tempDir, 'project'); await writeMetabaseConfig(projectDir); @@ -764,17 +850,22 @@ describe('runKtxIngest', () => { ), ).resolves.toBe(0); - expect(createAdapters).toHaveBeenCalledWith(expect.objectContaining({ projectDir }), { - databaseIntrospectionUrl: 'http://127.0.0.1:8765', - }); + expect(createAdapters).toHaveBeenCalledWith( + expect.objectContaining({ projectDir }), + expect.objectContaining({ + databaseIntrospectionUrl: 'http://127.0.0.1:8765', + logger: expect.any(Object), + }), + ); expect(runLocal).toHaveBeenCalledWith( expect.objectContaining({ adapters: createdAdapters, adapter: 'fake', connectionId: 'warehouse', - pullConfigOptions: { + pullConfigOptions: expect.objectContaining({ databaseIntrospectionUrl: 'http://127.0.0.1:8765', - }, + logger: expect.any(Object), + }), }), ); }); @@ -817,14 +908,19 @@ describe('runKtxIngest', () => { installPolicy: 'auto', io: io.io, }; - expect(createAdapters).toHaveBeenCalledWith(expect.objectContaining({ projectDir }), { - managedDaemon: expectedManagedDaemon, - }); + expect(createAdapters).toHaveBeenCalledWith( + expect.objectContaining({ projectDir }), + expect.objectContaining({ + managedDaemon: expectedManagedDaemon, + logger: expect.any(Object), + }), + ); expect(runLocal).toHaveBeenCalledWith( expect.objectContaining({ - pullConfigOptions: { + pullConfigOptions: expect.objectContaining({ managedDaemon: expectedManagedDaemon, - }, + logger: expect.any(Object), + }), }), ); }); @@ -878,9 +974,13 @@ describe('runKtxIngest', () => { ), ).resolves.toBe(0); - expect(createAdapters).toHaveBeenCalledWith(expect.objectContaining({ projectDir }), { - historicSqlConnectionId: 'warehouse', - }); + expect(createAdapters).toHaveBeenCalledWith( + expect.objectContaining({ projectDir }), + expect.objectContaining({ + historicSqlConnectionId: 'warehouse', + logger: expect.any(Object), + }), + ); expect(runLocal).toHaveBeenCalledWith( expect.objectContaining({ adapters: createdAdapters, @@ -982,9 +1082,39 @@ describe('runKtxIngest', () => { expect(stdout).toContain('[45%] Planned 1 work unit'); expect(stdout).toContain('[80%] Processed 1/1 work units'); expect(stdout).toContain('[100%] Ingest completed'); - expect(stdout.indexOf('[5%] Fetching source files for warehouse/historic-sql')).toBeLessThan( - stdout.indexOf('Report: report-live-1'), - ); + expect(stdout).toContain('Report: report-live-1'); + expect(io.stderr()).toBe(''); + }); + + it('writes plain TTY ingest progress and final report to stdout', async () => { + const projectDir = join(tempDir, 'project'); + await writeWarehouseConfig(projectDir); + const sourceDir = join(tempDir, 'source'); + await mkdir(join(sourceDir, 'orders'), { recursive: true }); + await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders"}\n', 'utf-8'); + const runLocal = vi.fn(async (input: RunLocalIngestOptions) => completedLocalBundleRun(input, 'local-job-1')); + const io = makeIo({ isTTY: true }); + + await expect( + runKtxIngest( + { + command: 'run', + projectDir, + connectionId: 'warehouse', + adapter: 'fake', + sourceDir, + outputMode: 'plain', + }, + io.io, + { + env: interactiveEnv(), + runLocalIngest: runLocal, + }, + ), + ).resolves.toBe(0); + + expect(io.stdout()).toContain('[5%] Fetching source files for warehouse/fake'); + expect(io.stdout()).toContain('Report: report-live-1'); expect(io.stderr()).toBe(''); }); @@ -1214,15 +1344,19 @@ describe('runKtxIngest', () => { ), ).resolves.toBe(0); - expect(createAdapters).toHaveBeenCalledWith(expect.objectContaining({ projectDir }), { - looker: { - parser: pullConfigOptions.looker.parser, - }, - }); + expect(createAdapters).toHaveBeenCalledWith( + expect.objectContaining({ projectDir }), + expect.objectContaining({ + logger: expect.any(Object), + looker: { + parser: pullConfigOptions.looker.parser, + }, + }), + ); expect(runLocal).toHaveBeenCalledWith( expect.objectContaining({ agentRunner, - pullConfigOptions, + pullConfigOptions: expect.objectContaining(pullConfigOptions), }), ); }); diff --git a/packages/cli/src/ingest.ts b/packages/cli/src/ingest.ts index 5eadce29..571bc1ef 100644 --- a/packages/cli/src/ingest.ts +++ b/packages/cli/src/ingest.ts @@ -18,6 +18,7 @@ import { } from '@ktx/context/ingest'; import { loadKtxProject } from '@ktx/context/project'; import { readIngestReportSnapshotFile } from './ingest-report-file.js'; +import { createCliOperationalLogger } from './io/logger.js'; import { createKtxCliLocalIngestAdapters } from './local-adapters.js'; import type { KtxManagedPythonInstallPolicy } from './managed-python-command.js'; import { type KtxMemoryFlowStdin, renderMemoryFlowInteractively } from './memory-flow-interactive.js'; @@ -142,22 +143,22 @@ function createMetabaseFanoutProgress( connectionId: string, io: KtxIngestIo, ): LocalMetabaseFanoutProgress { - io.stdout.write(`Metabase ingest: ${connectionId}\n`); - io.stdout.write('Checking mappings and scheduled-pull targets...\n'); + io.stderr.write(`Metabase ingest: ${connectionId}\n`); + io.stderr.write('Checking mappings and scheduled-pull targets...\n'); return { onMetabaseFanoutPlanned(event) { - io.stdout.write(`Targets: ${pluralize(event.children.length, 'mapped database')}\n`); + io.stderr.write(`Targets: ${pluralize(event.children.length, 'mapped database')}\n`); for (const child of event.children) { - io.stdout.write(`- database=${child.metabaseDatabaseId} target=${child.targetConnectionId} status=queued\n`); + io.stderr.write(`- database=${child.metabaseDatabaseId} target=${child.targetConnectionId} status=queued\n`); } }, onMetabaseChildStarted(event) { - io.stdout.write( + io.stderr.write( `- database=${event.metabaseDatabaseId} target=${event.targetConnectionId} status=running job=${event.jobId}\n`, ); }, onMetabaseChildCompleted(event) { - io.stdout.write( + io.stderr.write( `- database=${event.metabaseDatabaseId} target=${event.targetConnectionId} status=${event.status} job=${event.jobId}\n`, ); }, @@ -506,11 +507,13 @@ export async function runKtxIngest( const executeLocalIngest = deps.runLocalIngest ?? runLocalIngest; const localIngestOptions = deps.localIngestOptions ?? {}; const managedDaemon = managedDaemonOptionsForIngestRun(args, io); + const operationalLogger = createCliOperationalLogger(io, args.outputMode); const adapterOptions = { ...(localIngestOptions.pullConfigOptions ?? {}), ...(args.databaseIntrospectionUrl ? { databaseIntrospectionUrl: args.databaseIntrospectionUrl } : {}), ...(managedDaemon ? { managedDaemon } : {}), ...(args.adapter === 'historic-sql' ? { historicSqlConnectionId: args.connectionId } : {}), + logger: operationalLogger, }; if (args.adapter === 'metabase' && args.sourceDir) { throw new Error('source-dir uploads are not supported for the Metabase fan-out adapter'); diff --git a/packages/cli/src/io/logger.test.ts b/packages/cli/src/io/logger.test.ts new file mode 100644 index 00000000..bf21a150 --- /dev/null +++ b/packages/cli/src/io/logger.test.ts @@ -0,0 +1,65 @@ +import { describe, expect, it, vi } from 'vitest'; +import { createCliOperationalLogger, createNoopOperationalLogger } from './logger.js'; + +function makeIo() { + let stdout = ''; + let stderr = ''; + return { + io: { + stdout: { + write: (chunk: string) => { + stdout += chunk; + }, + }, + stderr: { + write: (chunk: string) => { + stderr += chunk; + }, + }, + }, + stdout: () => stdout, + stderr: () => stderr, + }; +} + +describe('createCliOperationalLogger', () => { + it('routes operational messages to stderr outside JSON mode', () => { + const io = makeIo(); + const logger = createCliOperationalLogger(io.io, 'plain'); + + logger.log('progress'); + logger.warn('warning'); + logger.error('failure'); + logger.debug?.('debug'); + + expect(io.stdout()).toBe(''); + expect(io.stderr()).toBe('progress\nwarning\nfailure\ndebug\n'); + }); + + it('suppresses operational messages in JSON mode by default', () => { + const io = makeIo(); + const logger = createCliOperationalLogger(io.io, 'json'); + + logger.log('progress'); + logger.warn('warning'); + logger.error('failure'); + logger.debug?.('debug'); + + expect(io.stdout()).toBe(''); + expect(io.stderr()).toBe(''); + }); +}); + +describe('createNoopOperationalLogger', () => { + it('never writes', () => { + const logger = createNoopOperationalLogger(); + const warn = vi.spyOn(console, 'warn').mockImplementation(() => undefined); + + logger.log('progress'); + logger.warn('warning'); + logger.error('failure'); + logger.debug?.('debug'); + + expect(warn).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/cli/src/io/logger.ts b/packages/cli/src/io/logger.ts new file mode 100644 index 00000000..e9952254 --- /dev/null +++ b/packages/cli/src/io/logger.ts @@ -0,0 +1,40 @@ +import type { KtxCliIo } from '../cli-runtime.js'; +import type { KtxOutputMode } from './mode.js'; + +export interface KtxOperationalLogger { + log(message: string): void; + warn(message: string): void; + error(message: string): void; + debug?(message: string): void; +} + +export type KtxOperationalOutputMode = KtxOutputMode | 'viz'; + +function writeLine(io: KtxCliIo, message: string): void { + io.stderr.write(message.endsWith('\n') ? message : `${message}\n`); +} + +export function createNoopOperationalLogger(): KtxOperationalLogger { + return { + log: () => undefined, + warn: () => undefined, + error: () => undefined, + debug: () => undefined, + }; +} + +export function createCliOperationalLogger( + io: KtxCliIo, + mode: KtxOperationalOutputMode, +): KtxOperationalLogger { + if (mode === 'json') { + return createNoopOperationalLogger(); + } + + return { + log: (message) => writeLine(io, message), + warn: (message) => writeLine(io, message), + error: (message) => writeLine(io, message), + debug: (message) => writeLine(io, message), + }; +} diff --git a/packages/cli/src/io/print-list.ts b/packages/cli/src/io/print-list.ts index b66fa7ad..d2129d7d 100644 --- a/packages/cli/src/io/print-list.ts +++ b/packages/cli/src/io/print-list.ts @@ -28,6 +28,16 @@ export interface PrintListArgs { io: KtxCliIo; } +export interface KtxJsonResultEnvelope { + kind: string; + data: T; + meta?: Record; +} + +export function writeJsonResult(io: KtxCliIo, envelope: KtxJsonResultEnvelope): void { + io.stdout.write(`${JSON.stringify(envelope, null, 2)}\n`); +} + export function printList(args: PrintListArgs): void { switch (args.mode) { case 'json': @@ -61,12 +71,11 @@ function printListPlain(args: PrintListArgs): void { } function printListJson(args: PrintListArgs): void { - const envelope = { + writeJsonResult(args.io, { kind: 'list', data: { items: args.rows }, meta: { command: args.command }, - }; - args.io.stdout.write(`${JSON.stringify(envelope, null, 2)}\n`); + }); } function pluralize(count: number, singular: string): string { diff --git a/packages/cli/src/local-adapters.ts b/packages/cli/src/local-adapters.ts index d0a5f571..8557674c 100644 --- a/packages/cli/src/local-adapters.ts +++ b/packages/cli/src/local-adapters.ts @@ -35,6 +35,7 @@ import { managedDaemonDatabaseIntrospectionOptions, type ManagedPythonCoreDaemonOptions, } from './managed-python-http.js'; +import type { KtxOperationalLogger } from './io/logger.js'; function hasSnowflakeDriver(connection: unknown): boolean { return ( @@ -162,6 +163,7 @@ export interface KtxCliLocalIngestAdaptersOptions extends DefaultLocalIngestAdap sqlAnalysis?: SqlAnalysisPort; sqlAnalysisUrl?: string; managedDaemon?: ManagedPythonCoreDaemonOptions; + logger?: KtxOperationalLogger; } function historicSqlRecord(connection: unknown): Record | null { diff --git a/packages/cli/src/managed-python-command.test.ts b/packages/cli/src/managed-python-command.test.ts index d081c320..3dbf315a 100644 --- a/packages/cli/src/managed-python-command.test.ts +++ b/packages/cli/src/managed-python-command.test.ts @@ -214,6 +214,7 @@ describe('createManagedPythonSemanticLayerComputePort', () => { expect(confirmInstall).toHaveBeenCalledWith( 'KTX needs to install the core Python runtime. This downloads Python dependencies with uv. Continue?', + io.io, ); expect(installRuntime).toHaveBeenCalledWith({ cliVersion: '0.2.0', @@ -221,4 +222,45 @@ describe('createManagedPythonSemanticLayerComputePort', () => { force: false, }); }); + + it('uses injected runtime confirmation instead of reading process TTY directly', async () => { + const io = makeIo(); + const compute = { query: vi.fn(), validateSources: vi.fn(), generateSources: vi.fn() }; + const installRuntime = vi.fn(async (): Promise => installResult()); + const confirmInstall = vi.fn(async () => true); + + await expect( + createManagedPythonSemanticLayerComputePort({ + cliVersion: '0.2.0', + installPolicy: 'prompt', + io: io.io, + readStatus: async () => missingStatus(), + installRuntime, + confirmInstall, + createPythonCompute: () => compute, + }), + ).resolves.toBe(compute); + + expect(confirmInstall).toHaveBeenCalledWith( + 'KTX needs to install the core Python runtime. This downloads Python dependencies with uv. Continue?', + io.io, + ); + expect(io.stderr()).toContain('Installing KTX Python runtime (core) with uv...'); + }); + + it('can decide default runtime prompting from injected io capabilities', async () => { + const io = makeIo(); + Object.assign(io.io.stdout, { isTTY: false }); + + await expect( + createManagedPythonSemanticLayerComputePort({ + cliVersion: '0.2.0', + installPolicy: 'prompt', + io: io.io, + readStatus: async () => missingStatus(), + installRuntime: vi.fn(), + createPythonCompute: () => ({ query: vi.fn(), validateSources: vi.fn(), generateSources: vi.fn() }), + }), + ).rejects.toThrow('KTX Python runtime installation was cancelled'); + }); }); diff --git a/packages/cli/src/managed-python-command.ts b/packages/cli/src/managed-python-command.ts index 0a8a193c..ce7afe7b 100644 --- a/packages/cli/src/managed-python-command.ts +++ b/packages/cli/src/managed-python-command.ts @@ -1,6 +1,6 @@ -import { cancel, confirm, isCancel } from '@clack/prompts'; import { createPythonSemanticLayerComputePort, type KtxSemanticLayerComputePort } from '@ktx/context/daemon'; import type { KtxCliIo } from './cli-runtime.js'; +import { createClackPromptAdapter } from './clack.js'; import { installManagedPythonRuntime, readManagedPythonRuntimeStatus, @@ -36,7 +36,7 @@ export interface ManagedPythonCommandRuntime { export interface ManagedPythonCommandDeps { readStatus?: (options: ManagedPythonRuntimeLayoutOptions) => Promise; installRuntime?: (options: ManagedPythonRuntimeInstallOptions) => Promise; - confirmInstall?: (message: string) => Promise; + confirmInstall?: (message: string, io: KtxCliIo) => Promise; } export interface ManagedPythonCommandOptions extends ManagedPythonCommandDeps { @@ -69,16 +69,12 @@ function hasFeature(manifest: InstalledKtxRuntimeManifest, feature: KtxRuntimeFe return manifest.features.includes(feature); } -async function defaultConfirmInstall(message: string): Promise { - if (process.stdin.isTTY !== true || process.stdout.isTTY !== true) { +async function defaultConfirmInstall(message: string, io: KtxCliIo): Promise { + if (io.stdout.isTTY !== true) { return false; } - const response = await confirm({ message, initialValue: true }); - if (isCancel(response)) { - cancel('Runtime installation cancelled.'); - return false; - } - return response === true; + const prompts = createClackPromptAdapter(); + return await prompts.confirm({ message, initialValue: true }); } export async function ensureManagedPythonCommandRuntime( @@ -99,7 +95,7 @@ export async function ensureManagedPythonCommandRuntime( if (options.installPolicy === 'prompt') { const confirmInstall = options.confirmInstall ?? defaultConfirmInstall; - const confirmed = await confirmInstall(installPrompt(feature)); + const confirmed = await confirmInstall(installPrompt(feature), options.io); if (!confirmed) { throw new Error(`KTX Python runtime installation was cancelled. Run: ${managedRuntimeInstallCommand(feature)}`); } diff --git a/packages/cli/src/sl.test.ts b/packages/cli/src/sl.test.ts index bd746b0b..8752d0ec 100644 --- a/packages/cli/src/sl.test.ts +++ b/packages/cli/src/sl.test.ts @@ -398,10 +398,18 @@ joins: [] listIo.io, ); expect(code).toBe(0); + expect(listIo.stderr()).toBe(''); const parsed = JSON.parse(listIo.stdout()); - expect(parsed.kind).toBe('list'); - expect(parsed.meta).toEqual({ command: 'sl list' }); + expect(parsed).toMatchObject({ + kind: 'list', + data: { + items: expect.any(Array), + }, + meta: { + command: 'sl list', + }, + }); expect(parsed.data.items).toHaveLength(1); expect(parsed.data.items[0]).toMatchObject({ connectionId: 'warehouse', diff --git a/packages/context/src/ingest/adapters/looker/client.test.ts b/packages/context/src/ingest/adapters/looker/client.test.ts index a7d4e604..3b1822e0 100644 --- a/packages/context/src/ingest/adapters/looker/client.test.ts +++ b/packages/context/src/ingest/adapters/looker/client.test.ts @@ -112,6 +112,24 @@ describe('LookerClient', () => { }); }); + it('does not warn to console when optional prioritization inputs fail by default', async () => { + const warn = vi.spyOn(console, 'warn').mockImplementation(() => undefined); + const fakeSdk = sdk({ + search_dashboards: vi.fn().mockRejectedValue(new Error('dashboards unavailable')), + search_looks: vi.fn().mockRejectedValue(new Error('looks unavailable')), + }); + const client = new LookerClient(params(), { sdkFactory: () => fakeSdk }); + + await expect(client.getSignals()).resolves.toMatchObject({ + dashboardUsage: [], + lookUsage: [], + scheduledPlans: [], + favorites: [], + }); + + expect(warn).not.toHaveBeenCalled(); + }); + it('maps dashboards, looks, folders, models, explores, users, and groups to staged DTOs', async () => { const fakeSdk = sdk(); const client = new LookerClient(params(), { sdkFactory: () => fakeSdk }); diff --git a/packages/context/src/ingest/adapters/looker/client.ts b/packages/context/src/ingest/adapters/looker/client.ts index 50b0b104..90f9f466 100644 --- a/packages/context/src/ingest/adapters/looker/client.ts +++ b/packages/context/src/ingest/adapters/looker/client.ts @@ -80,10 +80,10 @@ export interface LookerClientDeps { } const defaultLogger: LookerClientLogger = { - log: (message) => console.log(message), - warn: (message) => console.warn(message), - error: (message) => console.error(message), - debug: (message) => console.debug(message), + log: () => undefined, + warn: () => undefined, + error: () => undefined, + debug: () => undefined, }; class InlineLookerSettings extends NodeSettings { diff --git a/packages/context/src/ingest/adapters/looker/local-looker.adapter.ts b/packages/context/src/ingest/adapters/looker/local-looker.adapter.ts index c1869da9..a29fecd1 100644 --- a/packages/context/src/ingest/adapters/looker/local-looker.adapter.ts +++ b/packages/context/src/ingest/adapters/looker/local-looker.adapter.ts @@ -1,4 +1,5 @@ import type { KtxLocalProject, KtxProjectConnectionConfig } from '../../../project/index.js'; +import type { LookerClientLogger } from './client.js'; import { DefaultLookerClientFactory, DefaultLookerConnectionClientFactory, @@ -59,8 +60,11 @@ export function createLocalLookerCredentialResolver( export function createLocalLookerSourceAdapter( project: KtxLocalProject, env: NodeJS.ProcessEnv = process.env, + logger?: LookerClientLogger, ): LookerSourceAdapter { - const connectionFactory = new DefaultLookerConnectionClientFactory(createLocalLookerCredentialResolver(project, env)); + const connectionFactory = new DefaultLookerConnectionClientFactory(createLocalLookerCredentialResolver(project, env), { + ...(logger ? { logger } : {}), + }); return new LookerSourceAdapter({ clientFactory: new DefaultLookerClientFactory(connectionFactory), }); diff --git a/packages/context/src/ingest/adapters/metabase/client.test.ts b/packages/context/src/ingest/adapters/metabase/client.test.ts index f81939c6..1ee3fe93 100644 --- a/packages/context/src/ingest/adapters/metabase/client.test.ts +++ b/packages/context/src/ingest/adapters/metabase/client.test.ts @@ -72,6 +72,27 @@ describe('MetabaseClient retry exhaustion', () => { vi.restoreAllMocks(); }); + it('does not warn to console when retrying by default', async () => { + const warn = vi.spyOn(console, 'warn').mockImplementation(() => undefined); + globalThis.fetch = vi + .fn() + .mockRejectedValueOnce(Object.assign(new Error('read ECONNRESET'), { code: 'ECONNRESET' })) + .mockResolvedValueOnce(new Response(JSON.stringify([]), { status: 200 })); + + const client = new MetabaseClient( + { apiUrl: 'https://metabase.example.test', apiKey: 'key' }, + { + ...DEFAULT_METABASE_CLIENT_CONFIG, + baseDelayMs: 0, + maxRetries: 1, + }, + ); + + await client.getDatabases(); + + expect(warn).not.toHaveBeenCalled(); + }); + it('wraps an exhausted ECONNRESET retry chain with method, path, attempt count, and original cause', async () => { const sysErr = Object.assign(new Error('read ECONNRESET'), { code: 'ECONNRESET', diff --git a/packages/context/src/ingest/adapters/metabase/client.ts b/packages/context/src/ingest/adapters/metabase/client.ts index 70e70964..2b70bc79 100644 --- a/packages/context/src/ingest/adapters/metabase/client.ts +++ b/packages/context/src/ingest/adapters/metabase/client.ts @@ -25,10 +25,10 @@ export interface MetabaseClientLogger { } const defaultLogger: MetabaseClientLogger = { - log: (message) => console.log(message), - warn: (message) => console.warn(message), - error: (message) => console.error(message), - debug: (message) => console.debug(message), + log: () => undefined, + warn: () => undefined, + error: () => undefined, + debug: () => undefined, }; interface TemplateTagInfo { diff --git a/packages/context/src/ingest/adapters/metabase/fetch.test.ts b/packages/context/src/ingest/adapters/metabase/fetch.test.ts index a86350ac..c8d4f4fb 100644 --- a/packages/context/src/ingest/adapters/metabase/fetch.test.ts +++ b/packages/context/src/ingest/adapters/metabase/fetch.test.ts @@ -86,6 +86,7 @@ describe('fetchMetabaseBundle', () => { }); afterEach(async () => { + vi.restoreAllMocks(); await rm(stagedDir, { recursive: true, force: true }); }); @@ -115,6 +116,41 @@ describe('fetchMetabaseBundle', () => { expect(card.archived).toBe(false); }); + it('does not write Metabase fetch progress to console by default', async () => { + const log = vi.spyOn(console, 'log').mockImplementation(() => undefined); + const warn = vi.spyOn(console, 'warn').mockImplementation(() => undefined); + + await fetchMetabaseBundle({ + pullConfig: { metabaseConnectionId, metabaseDatabaseId: 42 }, + stagedDir, + ctx: makeFetchContext(), + clientFactory, + sourceStateReader, + }); + + expect(log).not.toHaveBeenCalled(); + expect(warn).not.toHaveBeenCalled(); + }); + + it('routes Metabase fetch warnings through the injected logger', async () => { + const logger = { + log: vi.fn(), + warn: vi.fn(), + }; + clientFactory.__client.getCard.mockRejectedValueOnce(new Error('card read failed')); + + await fetchMetabaseBundle({ + pullConfig: { metabaseConnectionId, metabaseDatabaseId: 42 }, + stagedDir, + ctx: makeFetchContext(), + clientFactory, + sourceStateReader, + logger, + }); + + expect(logger.warn).toHaveBeenCalledWith('failed to load card 1: card read failed'); + }); + it('passes the Metabase source pull config and target fetch context to the client factory', async () => { await fetchMetabaseBundle({ pullConfig: { metabaseConnectionId, metabaseDatabaseId: 42 }, diff --git a/packages/context/src/ingest/adapters/metabase/fetch.ts b/packages/context/src/ingest/adapters/metabase/fetch.ts index f8cc1e12..9ccb2be6 100644 --- a/packages/context/src/ingest/adapters/metabase/fetch.ts +++ b/packages/context/src/ingest/adapters/metabase/fetch.ts @@ -21,9 +21,14 @@ class IngestInputError extends Error { } } -const logger = { - log: (message: string) => console.log(message), - warn: (message: string) => console.warn(message), +export interface MetabaseFetchLogger { + log(message: string): void; + warn(message: string): void; +} + +const noopMetabaseFetchLogger: MetabaseFetchLogger = { + log: () => undefined, + warn: () => undefined, }; export interface FetchMetabaseBundleParams { @@ -32,6 +37,7 @@ export interface FetchMetabaseBundleParams { ctx: FetchContext; clientFactory: MetabaseClientFactory; sourceStateReader: MetabaseSourceStateReader; + logger?: MetabaseFetchLogger; } interface CollectionNode { @@ -76,6 +82,7 @@ function resolvePath(index: Map, collectionId: export async function fetchMetabaseBundle(params: FetchMetabaseBundleParams): Promise { const pullConfig: MetabasePullConfig = parseMetabasePullConfig(params.pullConfig); + const logger = params.logger ?? noopMetabaseFetchLogger; const syncState = await params.sourceStateReader.getSourceState(pullConfig.metabaseConnectionId); const mapping = syncState.mappings.find( (m) => m.metabaseDatabaseId === pullConfig.metabaseDatabaseId && m.syncEnabled, diff --git a/packages/context/src/ingest/adapters/metabase/local-metabase.adapter.ts b/packages/context/src/ingest/adapters/metabase/local-metabase.adapter.ts index bd81413f..ec5e163e 100644 --- a/packages/context/src/ingest/adapters/metabase/local-metabase.adapter.ts +++ b/packages/context/src/ingest/adapters/metabase/local-metabase.adapter.ts @@ -1,12 +1,17 @@ import type { KtxLocalProject, KtxProjectConnectionConfig } from '../../../project/index.js'; import { ktxLocalStateDbPath } from '../../../project/index.js'; import { resolveKtxConfigReference } from '../../../core/config-reference.js'; -import { DEFAULT_METABASE_CLIENT_CONFIG, DefaultMetabaseConnectionClientFactory } from './client.js'; +import { + DEFAULT_METABASE_CLIENT_CONFIG, + DefaultMetabaseConnectionClientFactory, + type MetabaseClientLogger, +} from './client.js'; import { IngestMetabaseClientFactory, type MetabaseClientConfig, type MetabaseClientRuntimeConfig, } from './client-port.js'; +import type { MetabaseFetchLogger } from './fetch.js'; import { LocalMetabaseSourceStateReader } from './local-source-state-store.js'; import { MetabaseSourceAdapter } from './metabase.adapter.js'; @@ -50,6 +55,7 @@ export function metabaseRuntimeConfigFromLocalConnection( interface CreateLocalMetabaseSourceAdapterOptions { env?: NodeJS.ProcessEnv; defaultClientConfig?: MetabaseClientConfig; + logger?: MetabaseClientLogger & MetabaseFetchLogger; } export function createLocalMetabaseSourceAdapter( @@ -65,9 +71,11 @@ export function createLocalMetabaseSourceAdapter( options.env, ), options.defaultClientConfig ?? DEFAULT_METABASE_CLIENT_CONFIG, + options.logger, ); return new MetabaseSourceAdapter({ clientFactory: new IngestMetabaseClientFactory(connectionFactory), sourceStateReader, + ...(options.logger ? { logger: options.logger } : {}), }); } diff --git a/packages/context/src/ingest/adapters/metabase/metabase.adapter.ts b/packages/context/src/ingest/adapters/metabase/metabase.adapter.ts index 1c0bb53b..cff59641 100644 --- a/packages/context/src/ingest/adapters/metabase/metabase.adapter.ts +++ b/packages/context/src/ingest/adapters/metabase/metabase.adapter.ts @@ -4,7 +4,7 @@ import type { ChunkResult, DiffSet, FetchContext, ScopeDescriptor, SourceAdapter import { chunkMetabaseStagedDir } from './chunk.js'; import type { MetabaseClientFactory } from './client-port.js'; import { detectMetabaseStagedDir } from './detect.js'; -import { fetchMetabaseBundle } from './fetch.js'; +import { fetchMetabaseBundle, type MetabaseFetchLogger } from './fetch.js'; import { computeFetchScope, hashScope, isPathInMetabaseScope } from './fetch-scope.js'; import type { MetabaseSourceStateReader } from './source-state-port.js'; import { STAGED_FILES, stagedSyncConfigSchema } from './types.js'; @@ -12,6 +12,7 @@ import { STAGED_FILES, stagedSyncConfigSchema } from './types.js'; export interface MetabaseSourceAdapterDeps { clientFactory: MetabaseClientFactory; sourceStateReader: MetabaseSourceStateReader; + logger?: MetabaseFetchLogger; } export class MetabaseSourceAdapter implements SourceAdapter { @@ -31,6 +32,7 @@ export class MetabaseSourceAdapter implements SourceAdapter { ctx, clientFactory: this.deps.clientFactory, sourceStateReader: this.deps.sourceStateReader, + ...(this.deps.logger ? { logger: this.deps.logger } : {}), }); } diff --git a/packages/context/src/ingest/adapters/notion/fetch.test.ts b/packages/context/src/ingest/adapters/notion/fetch.test.ts index ae6d5fd5..b60170f7 100644 --- a/packages/context/src/ingest/adapters/notion/fetch.test.ts +++ b/packages/context/src/ingest/adapters/notion/fetch.test.ts @@ -89,12 +89,13 @@ describe('fetchNotionSnapshot', () => { }); it('logs skipped page materialization failures', async () => { - const warn = vi.spyOn(console, 'warn').mockImplementation(() => undefined); + const logger = { warn: vi.fn() }; (client.retrievePage as ReturnType).mockRejectedValueOnce(new Error('Notion API failed')); const manifest = await fetchNotionSnapshot({ client, stagedDir, + logger, config: { authToken: 'secret', crawlMode: 'selected_roots', @@ -109,7 +110,7 @@ describe('fetchNotionSnapshot', () => { }); expect(manifest.skipped).toEqual([{ externalId: 'page-1', reason: 'Notion API failed' }]); - expect(warn).toHaveBeenCalledWith('Skipping Notion page page-1: Notion API failed'); + expect(logger.warn).toHaveBeenCalledWith('Skipping Notion page page-1: Notion API failed'); }); it('recursively fetches selected-root child pages and derives scoped links', async () => { @@ -191,7 +192,7 @@ describe('fetchNotionSnapshot', () => { }); it('truncates deeply nested block trees and records a warning', async () => { - const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => undefined); + const logger = { warn: vi.fn() }; (client.listBlockChildren as ReturnType).mockImplementation((blockId: string) => { const currentDepth = blockId === 'page-1' ? 0 : Number(blockId.replace('block-', '')); const nextDepth = currentDepth + 1; @@ -215,6 +216,7 @@ describe('fetchNotionSnapshot', () => { await fetchNotionSnapshot({ client, stagedDir, + logger, config: { authToken: 'secret', crawlMode: 'selected_roots', @@ -232,11 +234,11 @@ describe('fetchNotionSnapshot', () => { const manifest = JSON.parse(await readFile(join(stagedDir, 'manifest.json'), 'utf-8')); expect(blocks).toHaveLength(10); expect(manifest.warnings).toContain('maxBlockDepth reached for page page-1 at depth 10'); - expect(warnSpy).toHaveBeenCalledWith('maxBlockDepth reached for page page-1 at depth 10'); + expect(logger.warn).toHaveBeenCalledWith('maxBlockDepth reached for page page-1 at depth 10'); }); it('truncates pages at the per-page block cap and records a warning', async () => { - const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => undefined); + const logger = { warn: vi.fn() }; (client.listBlockChildren as ReturnType).mockResolvedValue({ results: Array.from({ length: 2001 }, (_, index) => ({ id: `block-${index}`, @@ -250,6 +252,7 @@ describe('fetchNotionSnapshot', () => { await fetchNotionSnapshot({ client, stagedDir, + logger, config: { authToken: 'secret', crawlMode: 'selected_roots', @@ -267,7 +270,7 @@ describe('fetchNotionSnapshot', () => { const manifest = JSON.parse(await readFile(join(stagedDir, 'manifest.json'), 'utf-8')); expect(blocks).toHaveLength(2000); expect(manifest.warnings).toContain('maxBlocksPerPage reached for page page-1 at 2000 blocks'); - expect(warnSpy).toHaveBeenCalledWith('maxBlocksPerPage reached for page page-1 at 2000 blocks'); + expect(logger.warn).toHaveBeenCalledWith('maxBlocksPerPage reached for page page-1 at 2000 blocks'); }); it('uses all_accessible search for pages and data sources', async () => { diff --git a/packages/context/src/ingest/adapters/notion/fetch.ts b/packages/context/src/ingest/adapters/notion/fetch.ts index 6d0ee691..18974f83 100644 --- a/packages/context/src/ingest/adapters/notion/fetch.ts +++ b/packages/context/src/ingest/adapters/notion/fetch.ts @@ -12,10 +12,19 @@ import { type NotionPullConfig, } from './types.js'; +export interface NotionFetchLogger { + warn(message: string): void; +} + +const noopNotionFetchLogger: NotionFetchLogger = { + warn: () => undefined, +}; + interface FetchNotionSnapshotParams { client: NotionApi; config: NotionPullConfig; stagedDir: string; + logger?: NotionFetchLogger; } interface CrawlState { @@ -23,6 +32,7 @@ interface CrawlState { databaseCount: number; dataSourceCount: number; capped: boolean; + logger: NotionFetchLogger; skipped: Array<{ externalId: string; reason: string }>; warnings: string[]; materializedPageTargets: Set; @@ -44,9 +54,6 @@ interface NotionLinks { const DEFAULT_MAX_BLOCK_DEPTH = 10; const DEFAULT_MAX_BLOCKS_PER_PAGE = 2000; -const logger = { - warn: (message: string) => console.warn(message), -}; async function writeJson(path: string, value: unknown): Promise { await mkdir(dirname(path), { recursive: true }); @@ -58,7 +65,12 @@ async function writeText(path: string, value: string): Promise { await writeFile(path, value.endsWith('\n') ? value : `${value}\n`, 'utf-8'); } -function addWarning(warnings: string[], warning: string, logWarning = false): void { +function addWarning( + warnings: string[], + warning: string, + logWarning = false, + logger: NotionFetchLogger = noopNotionFetchLogger, +): void { if (!warnings.includes(warning)) { warnings.push(warning); if (logWarning) { @@ -119,11 +131,21 @@ async function visitPaginated(params: { } while (cursor); } -function addBlockCountWarning(state: BlockCollectionState, warnings: string[], pageId: string): void { +function addBlockCountWarning( + state: BlockCollectionState, + warnings: string[], + pageId: string, + logger: NotionFetchLogger, +): void { if (state.blockCountWarningWritten) { return; } - addWarning(warnings, `maxBlocksPerPage reached for page ${pageId} at ${DEFAULT_MAX_BLOCKS_PER_PAGE} blocks`, true); + addWarning( + warnings, + `maxBlocksPerPage reached for page ${pageId} at ${DEFAULT_MAX_BLOCKS_PER_PAGE} blocks`, + true, + logger, + ); state.blockCountWarningWritten = true; } @@ -134,18 +156,19 @@ async function collectBlockChildren(params: { depth: number; warnings: string[]; state: BlockCollectionState; + logger: NotionFetchLogger; }): Promise { let cursor: string | null = null; do { const remainingBlocks = DEFAULT_MAX_BLOCKS_PER_PAGE - params.state.blocks.length; if (remainingBlocks <= 0) { - addBlockCountWarning(params.state, params.warnings, params.pageId); + addBlockCountWarning(params.state, params.warnings, params.pageId, params.logger); return; } const page = await params.client.listBlockChildren(params.blockId, cursor, Math.min(remainingBlocks, 100)); for (let index = 0; index < page.results.length; index += 1) { if (params.state.blocks.length >= DEFAULT_MAX_BLOCKS_PER_PAGE) { - addBlockCountWarning(params.state, params.warnings, params.pageId); + addBlockCountWarning(params.state, params.warnings, params.pageId, params.logger); return; } @@ -159,9 +182,10 @@ async function collectBlockChildren(params: { params.warnings, `maxBlockDepth reached for page ${params.pageId} at depth ${DEFAULT_MAX_BLOCK_DEPTH}`, true, + params.logger, ); } else if (params.state.blocks.length >= DEFAULT_MAX_BLOCKS_PER_PAGE) { - addBlockCountWarning(params.state, params.warnings, params.pageId); + addBlockCountWarning(params.state, params.warnings, params.pageId, params.logger); return; } else { await collectBlockChildren({ @@ -171,6 +195,7 @@ async function collectBlockChildren(params: { depth: blockDepth, warnings: params.warnings, state: params.state, + logger: params.logger, }); } } @@ -179,7 +204,7 @@ async function collectBlockChildren(params: { params.state.blocks.length >= DEFAULT_MAX_BLOCKS_PER_PAGE && (index < page.results.length - 1 || page.hasMore) ) { - addBlockCountWarning(params.state, params.warnings, params.pageId); + addBlockCountWarning(params.state, params.warnings, params.pageId, params.logger); return; } } @@ -187,7 +212,12 @@ async function collectBlockChildren(params: { } while (cursor); } -async function collectBlockTree(client: NotionApi, pageId: string, warnings: string[]): Promise { +async function collectBlockTree( + client: NotionApi, + pageId: string, + warnings: string[], + logger: NotionFetchLogger, +): Promise { const state: BlockCollectionState = { blocks: [], blockCountWarningWritten: false }; await collectBlockChildren({ client, @@ -196,6 +226,7 @@ async function collectBlockTree(client: NotionApi, pageId: string, warnings: str depth: 0, warnings, state, + logger, }); return state.blocks; } @@ -341,7 +372,7 @@ async function materializePage(params: { if (params.skipDataSourceRows && !params.dataSourceId && parentDataSourceId(page)) { return; } - const blocks = await collectBlockTree(params.client, params.pageId, params.state.warnings); + const blocks = await collectBlockTree(params.client, params.pageId, params.state.warnings, params.state.logger); const metadata = normalizeNotionPageMetadata({ page, fallbackPath: params.fallbackPath, @@ -374,7 +405,9 @@ async function materializePage(params: { } } } catch (error) { - logger.warn(`Skipping Notion page ${params.pageId}: ${error instanceof Error ? error.message : String(error)}`); + params.state.logger.warn( + `Skipping Notion page ${params.pageId}: ${error instanceof Error ? error.message : String(error)}`, + ); params.state.skipped.push({ externalId: params.pageId, reason: error instanceof Error ? error.message : String(error), @@ -491,6 +524,7 @@ async function materializeDatabase(params: { export async function fetchNotionSnapshot(params: FetchNotionSnapshotParams): Promise { await mkdir(params.stagedDir, { recursive: true }); + const logger = params.logger ?? noopNotionFetchLogger; const configuredCursor = params.config.crawlMode === 'all_accessible' ? parseConfiguredCursor(params.config) : null; const continuedFromCursor = configuredCursor !== null; const state: CrawlState = { @@ -498,6 +532,7 @@ export async function fetchNotionSnapshot(params: FetchNotionSnapshotParams): Pr databaseCount: 0, dataSourceCount: 0, capped: false, + logger, skipped: [], warnings: [], materializedPageTargets: new Set(), diff --git a/packages/context/src/ingest/adapters/notion/notion.adapter.ts b/packages/context/src/ingest/adapters/notion/notion.adapter.ts index 896ef69f..fba68cee 100644 --- a/packages/context/src/ingest/adapters/notion/notion.adapter.ts +++ b/packages/context/src/ingest/adapters/notion/notion.adapter.ts @@ -14,7 +14,7 @@ import type { import { chunkNotionStagedDir, describeNotionScope } from './chunk.js'; import { clusterNotionWorkUnits } from './cluster.js'; import { detectNotionStagedDir } from './detect.js'; -import { fetchNotionSnapshot } from './fetch.js'; +import { fetchNotionSnapshot, type NotionFetchLogger } from './fetch.js'; import { NotionClient } from './notion-client.js'; import { parseNotionPullConfig } from './pull-config.js'; import { type NotionMetadata, notionManifestSchema, notionMetadataSchema } from './types.js'; @@ -31,6 +31,7 @@ interface NotionPullSucceededContext { export interface NotionSourceAdapterDeps { onPullSucceeded?: (ctx: NotionPullSucceededContext) => Promise; + logger?: NotionFetchLogger; } export class NotionSourceAdapter implements SourceAdapter { @@ -48,7 +49,12 @@ export class NotionSourceAdapter implements SourceAdapter { async fetch(pullConfig: unknown, stagedDir: string, _ctx: FetchContext): Promise { const config = parseNotionPullConfig(pullConfig); - await fetchNotionSnapshot({ client: new NotionClient(config.authToken), config, stagedDir }); + await fetchNotionSnapshot({ + client: new NotionClient(config.authToken), + config, + stagedDir, + ...(this.deps.logger ? { logger: this.deps.logger } : {}), + }); } chunk(stagedDir: string, diffSet?: DiffSet): Promise { diff --git a/packages/context/src/ingest/local-adapters.ts b/packages/context/src/ingest/local-adapters.ts index 93d6b063..dc4f50a4 100644 --- a/packages/context/src/ingest/local-adapters.ts +++ b/packages/context/src/ingest/local-adapters.ts @@ -19,6 +19,7 @@ import { } from './adapters/live-database/daemon-introspection.js'; import { LiveDatabaseSourceAdapter } from './adapters/live-database/live-database.adapter.js'; import { createDaemonLookerTableIdentifierParser } from './adapters/looker/daemon-table-identifier-parser.js'; +import type { LookerClientLogger } from './adapters/looker/client.js'; import { DefaultLookerConnectionClientFactory } from './adapters/looker/factory.js'; import { createLocalLookerCredentialResolver } from './adapters/looker/local-looker.adapter.js'; import { LocalLookerRuntimeStore } from './adapters/looker/local-runtime-store.js'; @@ -32,9 +33,12 @@ import type { LookerRuntimeClient } from './adapters/looker/fetch.js'; import { LookmlSourceAdapter } from './adapters/lookml/lookml.adapter.js'; import { pullConfigFromIntegrationConfig } from './adapters/lookml/pull-config.js'; import { createLocalMetabaseSourceAdapter } from './adapters/metabase/local-metabase.adapter.js'; +import type { MetabaseClientLogger } from './adapters/metabase/client.js'; +import type { MetabaseFetchLogger } from './adapters/metabase/fetch.js'; import { MetricflowSourceAdapter } from './adapters/metricflow/metricflow.adapter.js'; import { pullConfigFromMetricflowIntegration } from './adapters/metricflow/pull-config.js'; import { NotionSourceAdapter } from './adapters/notion/notion.adapter.js'; +import type { NotionFetchLogger } from './adapters/notion/fetch.js'; import { seedLocalMappingStateFromKtxYaml } from './local-mapping-reconcile.js'; import type { SourceAdapter } from './types.js'; @@ -56,14 +60,23 @@ export interface DefaultLocalIngestAdaptersOptions { parser?: LookerTableIdentifierParser; env?: NodeJS.ProcessEnv; }; + logger?: LocalIngestOperationalLogger; } +type LocalIngestOperationalLogger = MetabaseClientLogger & + MetabaseFetchLogger & + LookerClientLogger & + NotionFetchLogger; + export function createDefaultLocalIngestAdapters( project: KtxLocalProject, options: DefaultLocalIngestAdaptersOptions = {}, ): SourceAdapter[] { const lookerConnectionFactory = new DefaultLookerConnectionClientFactory( createLocalLookerCredentialResolver(project, options.looker?.env), + { + ...(options.logger ? { logger: options.logger } : {}), + }, ); const adapters: SourceAdapter[] = [ @@ -77,7 +90,9 @@ export function createDefaultLocalIngestAdapters( }), new LookmlSourceAdapter({ homeDir: join(project.projectDir, '.ktx/cache') }), new DbtSourceAdapter({ homeDir: join(project.projectDir, '.ktx/cache') }), - createLocalMetabaseSourceAdapter(project), + createLocalMetabaseSourceAdapter(project, { + ...(options.logger ? { logger: options.logger } : {}), + }), new LookerSourceAdapter({ clientFactory: { async createClient(config, ctx) { @@ -89,7 +104,9 @@ export function createDefaultLocalIngestAdapters( }, }), new MetricflowSourceAdapter({ homeDir: join(project.projectDir, '.ktx/cache') }), - new NotionSourceAdapter(), + new NotionSourceAdapter({ + ...(options.logger ? { logger: options.logger } : {}), + }), ]; if (options.historicSql) { diff --git a/packages/context/src/scan/local-enrichment.test.ts b/packages/context/src/scan/local-enrichment.test.ts index c25dae61..cbed687d 100644 --- a/packages/context/src/scan/local-enrichment.test.ts +++ b/packages/context/src/scan/local-enrichment.test.ts @@ -427,6 +427,69 @@ describe('local scan enrichment', () => { expect(result.relationships).toEqual({ accepted: 0, review: 1, rejected: 0, skipped: 0 }); }); + it('generates table descriptions with bounded table-level concurrency', async () => { + const concurrentSnapshot: KtxSchemaSnapshot = { + ...snapshot, + tables: Array.from({ length: 8 }, (_, index) => ({ + catalog: null, + db: 'public', + name: `table_${index + 1}`, + kind: 'table' as const, + comment: null, + estimatedRows: 2, + foreignKeys: [], + columns: [ + { + name: 'id', + nativeType: 'integer', + normalizedType: 'integer', + dimensionType: 'number' as const, + nullable: false, + primaryKey: true, + comment: null, + }, + ], + })), + }; + let activeColumnSamples = 0; + let maxActiveColumnSamples = 0; + const scanConnector = { + ...connector(), + introspect: vi.fn(async () => concurrentSnapshot), + sampleColumn: vi.fn(async () => { + activeColumnSamples += 1; + maxActiveColumnSamples = Math.max(maxActiveColumnSamples, activeColumnSamples); + await new Promise((resolve) => setTimeout(resolve, 10)); + activeColumnSamples -= 1; + return { + values: ['1'], + nullCount: 0, + distinctCount: 1, + }; + }), + sampleTable: vi.fn(async () => ({ + headers: ['id'], + rows: [[1]], + totalRows: 1, + })), + }; + const settings = { + ...buildDefaultKtxProjectConfig('test').scan.relationships, + enabled: false, + }; + + await runLocalScanEnrichment({ + connectionId: 'warehouse', + mode: 'enriched', + connector: scanConnector, + context: { runId: 'scan-run-concurrent-descriptions' }, + providers: createDeterministicLocalScanEnrichmentProviders({ embeddingDimensions: 3 }), + relationshipSettings: settings, + }); + + expect(maxActiveColumnSamples).toBe(6); + }); + it('reports enrichment progress for countable stages', async () => { const events: Array<{ progress: number; message?: string; transient?: boolean }> = []; const progress = { @@ -713,7 +776,7 @@ describe('local scan enrichment', () => { model: 'provider/embedding-model', dimensions: 1536, batchSize: 8, - openai: { api_key: 'env:OPENAI_API_KEY' }, + openai: { api_key: 'env:OPENAI_API_KEY' }, // pragma: allowlist secret }, }, { @@ -726,7 +789,7 @@ describe('local scan enrichment', () => { { createKtxLlmProvider: createKtxLlmProvider as any, createKtxEmbeddingProvider: createKtxEmbeddingProvider as any, - env: { OPENAI_API_KEY: 'openai-key' }, + env: { OPENAI_API_KEY: 'openai-key' }, // pragma: allowlist secret }, ); diff --git a/packages/context/src/scan/local-enrichment.ts b/packages/context/src/scan/local-enrichment.ts index cefecadb..5d58e189 100644 --- a/packages/context/src/scan/local-enrichment.ts +++ b/packages/context/src/scan/local-enrichment.ts @@ -1,4 +1,5 @@ import type { KtxLlmProvider } from '@ktx/llm'; +import pLimit from 'p-limit'; import { buildDefaultKtxProjectConfig, type KtxScanRelationshipConfig } from '../project/config.js'; import { type KtxDescriptionColumnTable, KtxDescriptionGenerator } from './description-generation.js'; import { buildKtxColumnEmbeddingText } from './embedding-text.js'; @@ -40,6 +41,8 @@ import type { KtxTableRef, } from './types.js'; +const DESCRIPTION_TABLE_CONCURRENCY = 6; + export interface DeterministicLocalScanEnrichmentProviderOptions { embeddingDimensions?: number; maxBatchSize?: number; @@ -322,41 +325,47 @@ async function generateDescriptions(input: { await input.progress?.update(1, 'No tables to describe'); return updates; } - for (const [index, table] of input.snapshot.tables.entries()) { - await input.progress?.update( - (index + 1) / totalTables, - `Generating descriptions ${index + 1}/${totalTables} tables`, - { - transient: true, - }, - ); - const tableInput = descriptionTable(table); - const columnResult = await generator.generateColumnDescriptions({ - connectionId: input.snapshot.connectionId, - connector: input.connector, - context: input.context, - dataSourceType: input.snapshot.driver, - supportsNestedAnalysis: input.connector.capabilities.nestedAnalysis, - table: tableInput, - }); - const tableDescription = await generator.generateTableDescription({ - connectionId: input.snapshot.connectionId, - connector: input.connector, - context: input.context, - dataSourceType: input.snapshot.driver, - table: { - catalog: table.catalog, - db: table.db, - name: table.name, - rawDescriptions: table.comment ? { db: table.comment } : {}, - }, - }); - updates.push({ - table: tableRef(table), - tableDescription, - columnDescriptions: Object.fromEntries(columnResult.columnDescriptions), - }); - } + const limitTable = pLimit(DESCRIPTION_TABLE_CONCURRENCY); + const tableUpdates = await Promise.all( + input.snapshot.tables.map((table, index) => + limitTable(async () => { + await input.progress?.update( + (index + 1) / totalTables, + `Generating descriptions ${index + 1}/${totalTables} tables`, + { + transient: true, + }, + ); + const tableInput = descriptionTable(table); + const columnResult = await generator.generateColumnDescriptions({ + connectionId: input.snapshot.connectionId, + connector: input.connector, + context: input.context, + dataSourceType: input.snapshot.driver, + supportsNestedAnalysis: input.connector.capabilities.nestedAnalysis, + table: tableInput, + }); + const tableDescription = await generator.generateTableDescription({ + connectionId: input.snapshot.connectionId, + connector: input.connector, + context: input.context, + dataSourceType: input.snapshot.driver, + table: { + catalog: table.catalog, + db: table.db, + name: table.name, + rawDescriptions: table.comment ? { db: table.comment } : {}, + }, + }); + return { + table: tableRef(table), + tableDescription, + columnDescriptions: Object.fromEntries(columnResult.columnDescriptions), + }; + }), + ), + ); + updates.push(...tableUpdates); await input.progress?.update(1, `Generated descriptions for ${totalTables} tables`); return updates; } diff --git a/scripts/precommit-check.mjs b/scripts/precommit-check.mjs index fdd405bf..299db534 100644 --- a/scripts/precommit-check.mjs +++ b/scripts/precommit-check.mjs @@ -1,12 +1,11 @@ #!/usr/bin/env node import { spawnSync } from 'node:child_process'; import { existsSync, readFileSync } from 'node:fs'; -import { dirname, join, relative, sep } from 'node:path'; +import { dirname, join, resolve } from 'node:path'; import { fileURLToPath } from 'node:url'; const scriptPath = fileURLToPath(import.meta.url); const ktxRoot = dirname(dirname(scriptPath)); -const repoRoot = dirname(ktxRoot); const packageNameByDir = new Map( [ @@ -35,7 +34,8 @@ const pythonPackageTests = new Map([ ]); function normalizeFilePath(filePath) { - return filePath.replaceAll('\\', '/').replace(/^\.\//, ''); + const normalized = filePath.replaceAll('\\', '/').replace(/^\.\//, ''); + return normalized.startsWith('ktx/') ? normalized.slice('ktx/'.length) : normalized; } function stablePush(commands, key, cmd, args) { @@ -68,13 +68,7 @@ export function planChecks(files) { let runAllPythonTests = false; for (const rawFile of files) { - const file = normalizeFilePath(rawFile); - - if (!file.startsWith('ktx/')) { - continue; - } - - const ktxFile = file.slice('ktx/'.length); + const ktxFile = normalizeFilePath(rawFile); if (ktxFile.startsWith('packages/')) { const [, packageDir, ...rest] = ktxFile.split('/'); @@ -189,6 +183,6 @@ export function runChecks(files) { return 0; } -if (process.argv[1] && relative(repoRoot, process.argv[1]).split(sep).join('/') === 'ktx/scripts/precommit-check.mjs') { +if (process.argv[1] && resolve(process.argv[1]) === scriptPath) { process.exitCode = runChecks(process.argv.slice(2)); } diff --git a/scripts/precommit-check.test.mjs b/scripts/precommit-check.test.mjs index 55ef66bb..40bd1716 100644 --- a/scripts/precommit-check.test.mjs +++ b/scripts/precommit-check.test.mjs @@ -12,7 +12,16 @@ describe('precommit-check', () => { assert.deepEqual(commandKeys(['outside-workspace/src/app.ts']), []); }); - it('runs only the touched package checks for package code', () => { + it('runs only the touched package checks for standalone package paths', () => { + assert.deepEqual(commandKeys(['packages/cli/src/index.ts']), [ + 'boundary-check', + 'type-check:@ktx/cli', + 'build:@ktx/cli', + 'test:@ktx/cli', + ]); + }); + + it('accepts legacy subtree-prefixed package paths', () => { assert.deepEqual(commandKeys(['ktx/packages/cli/src/index.ts']), [ 'boundary-check', 'type-check:@ktx/cli', @@ -22,12 +31,12 @@ describe('precommit-check', () => { }); it('runs the matching script test when a script changes', () => { - assert.deepEqual(commandKeys(['ktx/scripts/check-boundaries.mjs']), [ + assert.deepEqual(commandKeys(['scripts/check-boundaries.mjs']), [ 'script-test:scripts/check-boundaries.test.mjs', ]); }); it('runs the touched python package tests', () => { - assert.deepEqual(commandKeys(['ktx/python/ktx-sl/semantic_layer/parser.py']), ['pytest:ktx-sl']); + assert.deepEqual(commandKeys(['python/ktx-sl/semantic_layer/parser.py']), ['pytest:ktx-sl']); }); });