From 5cdae74825a784f4e74aa3eeca93585e28976596 Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov <7889985+andreybavt@users.noreply.github.com> Date: Tue, 12 May 2026 01:35:19 +0200 Subject: [PATCH] feat(cli): route ingest adapter logs through operational logger --- packages/cli/src/ingest.test.ts | 52 +++++++++++++-- packages/cli/src/ingest.ts | 3 + packages/cli/src/io/logger.test.ts | 65 +++++++++++++++++++ packages/cli/src/io/logger.ts | 40 ++++++++++++ packages/cli/src/local-adapters.ts | 2 + .../adapters/looker/local-looker.adapter.ts | 6 +- .../src/ingest/adapters/metabase/fetch.ts | 2 +- .../metabase/local-metabase.adapter.ts | 8 ++- .../adapters/metabase/metabase.adapter.ts | 4 +- .../src/ingest/adapters/notion/fetch.ts | 2 +- .../ingest/adapters/notion/notion.adapter.ts | 10 ++- packages/context/src/ingest/local-adapters.ts | 24 ++++++- 12 files changed, 203 insertions(+), 15 deletions(-) create mode 100644 packages/cli/src/io/logger.test.ts create mode 100644 packages/cli/src/io/logger.ts diff --git a/packages/cli/src/ingest.test.ts b/packages/cli/src/ingest.test.ts index 9fc4dc82..2a3496ae 100644 --- a/packages/cli/src/ingest.test.ts +++ b/packages/cli/src/ingest.test.ts @@ -483,6 +483,39 @@ describe('runKtxIngest', () => { expect(io.stderr()).toBe(''); }); + it('keeps metabase JSON stdout free of operational adapter logs', async () => { + const projectDir = join(tempDir, 'project'); + await writeMetabaseConfig(projectDir); + const io = makeIo(); + + await expect( + runKtxIngest( + { + command: 'run', + projectDir, + connectionId: 'prod-metabase', + adapter: 'metabase', + outputMode: 'json', + }, + io.io, + { + runLocalMetabaseIngest: async (input) => { + input.adapters.find((adapter) => adapter.source === 'metabase'); + return { + metabaseConnectionId: 'prod-metabase', + status: 'all_succeeded', + totals: { workUnits: 0, failedWorkUnits: 0 }, + children: [], + }; + }, + }, + ), + ).resolves.toBe(0); + + expect(() => JSON.parse(io.stdout())).not.toThrow(); + expect(io.stderr()).toBe(''); + }); + it('rejects source-dir uploads through the metabase fan-out route', async () => { const projectDir = join(tempDir, 'project'); await writeMetabaseConfig(projectDir); @@ -696,15 +729,17 @@ describe('runKtxIngest', () => { expect(createAdapters).toHaveBeenCalledWith(expect.objectContaining({ projectDir }), { databaseIntrospectionUrl: 'http://127.0.0.1:8765', + logger: expect.any(Object), }); expect(runLocal).toHaveBeenCalledWith( expect.objectContaining({ adapters: createdAdapters, adapter: 'fake', connectionId: 'warehouse', - pullConfigOptions: { + pullConfigOptions: expect.objectContaining({ databaseIntrospectionUrl: 'http://127.0.0.1:8765', - }, + logger: expect.any(Object), + }), }), ); }); @@ -749,12 +784,14 @@ describe('runKtxIngest', () => { }; expect(createAdapters).toHaveBeenCalledWith(expect.objectContaining({ projectDir }), { managedDaemon: expectedManagedDaemon, + logger: expect.any(Object), }); expect(runLocal).toHaveBeenCalledWith( expect.objectContaining({ - pullConfigOptions: { + pullConfigOptions: expect.objectContaining({ managedDaemon: expectedManagedDaemon, - }, + logger: expect.any(Object), + }), }), ); }); @@ -810,6 +847,7 @@ describe('runKtxIngest', () => { expect(createAdapters).toHaveBeenCalledWith(expect.objectContaining({ projectDir }), { historicSqlConnectionId: 'warehouse', + logger: expect.any(Object), }); expect(runLocal).toHaveBeenCalledWith( expect.objectContaining({ @@ -962,11 +1000,15 @@ describe('runKtxIngest', () => { looker: { parser: pullConfigOptions.looker.parser, }, + logger: expect.any(Object), }); expect(runLocal).toHaveBeenCalledWith( expect.objectContaining({ agentRunner, - pullConfigOptions, + pullConfigOptions: expect.objectContaining({ + ...pullConfigOptions, + logger: expect.any(Object), + }), }), ); }); diff --git a/packages/cli/src/ingest.ts b/packages/cli/src/ingest.ts index a580b3d5..4b562aba 100644 --- a/packages/cli/src/ingest.ts +++ b/packages/cli/src/ingest.ts @@ -27,6 +27,7 @@ import { renderMemoryFlowTui, startLiveMemoryFlowTui, } from './memory-flow-tui.js'; +import { createCliOperationalLogger } from './io/logger.js'; import { resolveVizFallback, warnVizFallbackOnce } from './viz-fallback.js'; import { profileMark } from './startup-profile.js'; @@ -435,11 +436,13 @@ export async function runKtxIngest( const executeLocalIngest = deps.runLocalIngest ?? runLocalIngest; const localIngestOptions = deps.localIngestOptions ?? {}; const managedDaemon = managedDaemonOptionsForIngestRun(args, io); + const operationalLogger = createCliOperationalLogger(io, args.outputMode); const adapterOptions = { ...(localIngestOptions.pullConfigOptions ?? {}), ...(args.databaseIntrospectionUrl ? { databaseIntrospectionUrl: args.databaseIntrospectionUrl } : {}), ...(managedDaemon ? { managedDaemon } : {}), ...(args.adapter === 'historic-sql' ? { historicSqlConnectionId: args.connectionId } : {}), + logger: operationalLogger, }; if (args.adapter === 'metabase' && args.sourceDir) { throw new Error('source-dir uploads are not supported for the Metabase fan-out adapter'); diff --git a/packages/cli/src/io/logger.test.ts b/packages/cli/src/io/logger.test.ts new file mode 100644 index 00000000..bf21a150 --- /dev/null +++ b/packages/cli/src/io/logger.test.ts @@ -0,0 +1,65 @@ +import { describe, expect, it, vi } from 'vitest'; +import { createCliOperationalLogger, createNoopOperationalLogger } from './logger.js'; + +function makeIo() { + let stdout = ''; + let stderr = ''; + return { + io: { + stdout: { + write: (chunk: string) => { + stdout += chunk; + }, + }, + stderr: { + write: (chunk: string) => { + stderr += chunk; + }, + }, + }, + stdout: () => stdout, + stderr: () => stderr, + }; +} + +describe('createCliOperationalLogger', () => { + it('routes operational messages to stderr outside JSON mode', () => { + const io = makeIo(); + const logger = createCliOperationalLogger(io.io, 'plain'); + + logger.log('progress'); + logger.warn('warning'); + logger.error('failure'); + logger.debug?.('debug'); + + expect(io.stdout()).toBe(''); + expect(io.stderr()).toBe('progress\nwarning\nfailure\ndebug\n'); + }); + + it('suppresses operational messages in JSON mode by default', () => { + const io = makeIo(); + const logger = createCliOperationalLogger(io.io, 'json'); + + logger.log('progress'); + logger.warn('warning'); + logger.error('failure'); + logger.debug?.('debug'); + + expect(io.stdout()).toBe(''); + expect(io.stderr()).toBe(''); + }); +}); + +describe('createNoopOperationalLogger', () => { + it('never writes', () => { + const logger = createNoopOperationalLogger(); + const warn = vi.spyOn(console, 'warn').mockImplementation(() => undefined); + + logger.log('progress'); + logger.warn('warning'); + logger.error('failure'); + logger.debug?.('debug'); + + expect(warn).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/cli/src/io/logger.ts b/packages/cli/src/io/logger.ts new file mode 100644 index 00000000..e9952254 --- /dev/null +++ b/packages/cli/src/io/logger.ts @@ -0,0 +1,40 @@ +import type { KtxCliIo } from '../cli-runtime.js'; +import type { KtxOutputMode } from './mode.js'; + +export interface KtxOperationalLogger { + log(message: string): void; + warn(message: string): void; + error(message: string): void; + debug?(message: string): void; +} + +export type KtxOperationalOutputMode = KtxOutputMode | 'viz'; + +function writeLine(io: KtxCliIo, message: string): void { + io.stderr.write(message.endsWith('\n') ? message : `${message}\n`); +} + +export function createNoopOperationalLogger(): KtxOperationalLogger { + return { + log: () => undefined, + warn: () => undefined, + error: () => undefined, + debug: () => undefined, + }; +} + +export function createCliOperationalLogger( + io: KtxCliIo, + mode: KtxOperationalOutputMode, +): KtxOperationalLogger { + if (mode === 'json') { + return createNoopOperationalLogger(); + } + + return { + log: (message) => writeLine(io, message), + warn: (message) => writeLine(io, message), + error: (message) => writeLine(io, message), + debug: (message) => writeLine(io, message), + }; +} diff --git a/packages/cli/src/local-adapters.ts b/packages/cli/src/local-adapters.ts index d0a5f571..8557674c 100644 --- a/packages/cli/src/local-adapters.ts +++ b/packages/cli/src/local-adapters.ts @@ -35,6 +35,7 @@ import { managedDaemonDatabaseIntrospectionOptions, type ManagedPythonCoreDaemonOptions, } from './managed-python-http.js'; +import type { KtxOperationalLogger } from './io/logger.js'; function hasSnowflakeDriver(connection: unknown): boolean { return ( @@ -162,6 +163,7 @@ export interface KtxCliLocalIngestAdaptersOptions extends DefaultLocalIngestAdap sqlAnalysis?: SqlAnalysisPort; sqlAnalysisUrl?: string; managedDaemon?: ManagedPythonCoreDaemonOptions; + logger?: KtxOperationalLogger; } function historicSqlRecord(connection: unknown): Record | null { diff --git a/packages/context/src/ingest/adapters/looker/local-looker.adapter.ts b/packages/context/src/ingest/adapters/looker/local-looker.adapter.ts index c1869da9..a29fecd1 100644 --- a/packages/context/src/ingest/adapters/looker/local-looker.adapter.ts +++ b/packages/context/src/ingest/adapters/looker/local-looker.adapter.ts @@ -1,4 +1,5 @@ import type { KtxLocalProject, KtxProjectConnectionConfig } from '../../../project/index.js'; +import type { LookerClientLogger } from './client.js'; import { DefaultLookerClientFactory, DefaultLookerConnectionClientFactory, @@ -59,8 +60,11 @@ export function createLocalLookerCredentialResolver( export function createLocalLookerSourceAdapter( project: KtxLocalProject, env: NodeJS.ProcessEnv = process.env, + logger?: LookerClientLogger, ): LookerSourceAdapter { - const connectionFactory = new DefaultLookerConnectionClientFactory(createLocalLookerCredentialResolver(project, env)); + const connectionFactory = new DefaultLookerConnectionClientFactory(createLocalLookerCredentialResolver(project, env), { + ...(logger ? { logger } : {}), + }); return new LookerSourceAdapter({ clientFactory: new DefaultLookerClientFactory(connectionFactory), }); diff --git a/packages/context/src/ingest/adapters/metabase/fetch.ts b/packages/context/src/ingest/adapters/metabase/fetch.ts index bb8ff338..9ccb2be6 100644 --- a/packages/context/src/ingest/adapters/metabase/fetch.ts +++ b/packages/context/src/ingest/adapters/metabase/fetch.ts @@ -21,7 +21,7 @@ class IngestInputError extends Error { } } -interface MetabaseFetchLogger { +export interface MetabaseFetchLogger { log(message: string): void; warn(message: string): void; } diff --git a/packages/context/src/ingest/adapters/metabase/local-metabase.adapter.ts b/packages/context/src/ingest/adapters/metabase/local-metabase.adapter.ts index bd81413f..0541f24e 100644 --- a/packages/context/src/ingest/adapters/metabase/local-metabase.adapter.ts +++ b/packages/context/src/ingest/adapters/metabase/local-metabase.adapter.ts @@ -1,12 +1,13 @@ import type { KtxLocalProject, KtxProjectConnectionConfig } from '../../../project/index.js'; import { ktxLocalStateDbPath } from '../../../project/index.js'; import { resolveKtxConfigReference } from '../../../core/config-reference.js'; -import { DEFAULT_METABASE_CLIENT_CONFIG, DefaultMetabaseConnectionClientFactory } from './client.js'; +import { DEFAULT_METABASE_CLIENT_CONFIG, DefaultMetabaseConnectionClientFactory, type MetabaseClientLogger } from './client.js'; import { IngestMetabaseClientFactory, type MetabaseClientConfig, type MetabaseClientRuntimeConfig, } from './client-port.js'; +import type { MetabaseFetchLogger } from './fetch.js'; import { LocalMetabaseSourceStateReader } from './local-source-state-store.js'; import { MetabaseSourceAdapter } from './metabase.adapter.js'; @@ -50,6 +51,7 @@ export function metabaseRuntimeConfigFromLocalConnection( interface CreateLocalMetabaseSourceAdapterOptions { env?: NodeJS.ProcessEnv; defaultClientConfig?: MetabaseClientConfig; + logger?: MetabaseClientLogger & MetabaseFetchLogger; } export function createLocalMetabaseSourceAdapter( @@ -63,11 +65,13 @@ export function createLocalMetabaseSourceAdapter( metabaseConnectionId, project.config.connections[metabaseConnectionId], options.env, - ), + ), options.defaultClientConfig ?? DEFAULT_METABASE_CLIENT_CONFIG, + options.logger, ); return new MetabaseSourceAdapter({ clientFactory: new IngestMetabaseClientFactory(connectionFactory), sourceStateReader, + ...(options.logger ? { logger: options.logger } : {}), }); } diff --git a/packages/context/src/ingest/adapters/metabase/metabase.adapter.ts b/packages/context/src/ingest/adapters/metabase/metabase.adapter.ts index 1c0bb53b..cff59641 100644 --- a/packages/context/src/ingest/adapters/metabase/metabase.adapter.ts +++ b/packages/context/src/ingest/adapters/metabase/metabase.adapter.ts @@ -4,7 +4,7 @@ import type { ChunkResult, DiffSet, FetchContext, ScopeDescriptor, SourceAdapter import { chunkMetabaseStagedDir } from './chunk.js'; import type { MetabaseClientFactory } from './client-port.js'; import { detectMetabaseStagedDir } from './detect.js'; -import { fetchMetabaseBundle } from './fetch.js'; +import { fetchMetabaseBundle, type MetabaseFetchLogger } from './fetch.js'; import { computeFetchScope, hashScope, isPathInMetabaseScope } from './fetch-scope.js'; import type { MetabaseSourceStateReader } from './source-state-port.js'; import { STAGED_FILES, stagedSyncConfigSchema } from './types.js'; @@ -12,6 +12,7 @@ import { STAGED_FILES, stagedSyncConfigSchema } from './types.js'; export interface MetabaseSourceAdapterDeps { clientFactory: MetabaseClientFactory; sourceStateReader: MetabaseSourceStateReader; + logger?: MetabaseFetchLogger; } export class MetabaseSourceAdapter implements SourceAdapter { @@ -31,6 +32,7 @@ export class MetabaseSourceAdapter implements SourceAdapter { ctx, clientFactory: this.deps.clientFactory, sourceStateReader: this.deps.sourceStateReader, + ...(this.deps.logger ? { logger: this.deps.logger } : {}), }); } diff --git a/packages/context/src/ingest/adapters/notion/fetch.ts b/packages/context/src/ingest/adapters/notion/fetch.ts index 427dce70..f1a46b72 100644 --- a/packages/context/src/ingest/adapters/notion/fetch.ts +++ b/packages/context/src/ingest/adapters/notion/fetch.ts @@ -12,7 +12,7 @@ import { type NotionPullConfig, } from './types.js'; -interface NotionFetchLogger { +export interface NotionFetchLogger { warn(message: string): void; } diff --git a/packages/context/src/ingest/adapters/notion/notion.adapter.ts b/packages/context/src/ingest/adapters/notion/notion.adapter.ts index 896ef69f..fba68cee 100644 --- a/packages/context/src/ingest/adapters/notion/notion.adapter.ts +++ b/packages/context/src/ingest/adapters/notion/notion.adapter.ts @@ -14,7 +14,7 @@ import type { import { chunkNotionStagedDir, describeNotionScope } from './chunk.js'; import { clusterNotionWorkUnits } from './cluster.js'; import { detectNotionStagedDir } from './detect.js'; -import { fetchNotionSnapshot } from './fetch.js'; +import { fetchNotionSnapshot, type NotionFetchLogger } from './fetch.js'; import { NotionClient } from './notion-client.js'; import { parseNotionPullConfig } from './pull-config.js'; import { type NotionMetadata, notionManifestSchema, notionMetadataSchema } from './types.js'; @@ -31,6 +31,7 @@ interface NotionPullSucceededContext { export interface NotionSourceAdapterDeps { onPullSucceeded?: (ctx: NotionPullSucceededContext) => Promise; + logger?: NotionFetchLogger; } export class NotionSourceAdapter implements SourceAdapter { @@ -48,7 +49,12 @@ export class NotionSourceAdapter implements SourceAdapter { async fetch(pullConfig: unknown, stagedDir: string, _ctx: FetchContext): Promise { const config = parseNotionPullConfig(pullConfig); - await fetchNotionSnapshot({ client: new NotionClient(config.authToken), config, stagedDir }); + await fetchNotionSnapshot({ + client: new NotionClient(config.authToken), + config, + stagedDir, + ...(this.deps.logger ? { logger: this.deps.logger } : {}), + }); } chunk(stagedDir: string, diffSet?: DiffSet): Promise { diff --git a/packages/context/src/ingest/local-adapters.ts b/packages/context/src/ingest/local-adapters.ts index 93d6b063..31d35fb0 100644 --- a/packages/context/src/ingest/local-adapters.ts +++ b/packages/context/src/ingest/local-adapters.ts @@ -19,6 +19,7 @@ import { } from './adapters/live-database/daemon-introspection.js'; import { LiveDatabaseSourceAdapter } from './adapters/live-database/live-database.adapter.js'; import { createDaemonLookerTableIdentifierParser } from './adapters/looker/daemon-table-identifier-parser.js'; +import type { LookerClientLogger } from './adapters/looker/client.js'; import { DefaultLookerConnectionClientFactory } from './adapters/looker/factory.js'; import { createLocalLookerCredentialResolver } from './adapters/looker/local-looker.adapter.js'; import { LocalLookerRuntimeStore } from './adapters/looker/local-runtime-store.js'; @@ -32,12 +33,20 @@ import type { LookerRuntimeClient } from './adapters/looker/fetch.js'; import { LookmlSourceAdapter } from './adapters/lookml/lookml.adapter.js'; import { pullConfigFromIntegrationConfig } from './adapters/lookml/pull-config.js'; import { createLocalMetabaseSourceAdapter } from './adapters/metabase/local-metabase.adapter.js'; +import type { MetabaseClientLogger } from './adapters/metabase/client.js'; +import type { MetabaseFetchLogger } from './adapters/metabase/fetch.js'; import { MetricflowSourceAdapter } from './adapters/metricflow/metricflow.adapter.js'; import { pullConfigFromMetricflowIntegration } from './adapters/metricflow/pull-config.js'; +import type { NotionFetchLogger } from './adapters/notion/fetch.js'; import { NotionSourceAdapter } from './adapters/notion/notion.adapter.js'; import { seedLocalMappingStateFromKtxYaml } from './local-mapping-reconcile.js'; import type { SourceAdapter } from './types.js'; +type LocalIngestOperationalLogger = MetabaseClientLogger & + MetabaseFetchLogger & + LookerClientLogger & + NotionFetchLogger; + export interface DefaultLocalIngestAdaptersOptions { databaseIntrospectionUrl?: string; databaseIntrospection?: Omit; @@ -56,6 +65,7 @@ export interface DefaultLocalIngestAdaptersOptions { parser?: LookerTableIdentifierParser; env?: NodeJS.ProcessEnv; }; + logger?: LocalIngestOperationalLogger; } export function createDefaultLocalIngestAdapters( @@ -64,6 +74,9 @@ export function createDefaultLocalIngestAdapters( ): SourceAdapter[] { const lookerConnectionFactory = new DefaultLookerConnectionClientFactory( createLocalLookerCredentialResolver(project, options.looker?.env), + { + ...(options.logger ? { logger: options.logger } : {}), + }, ); const adapters: SourceAdapter[] = [ @@ -77,7 +90,9 @@ export function createDefaultLocalIngestAdapters( }), new LookmlSourceAdapter({ homeDir: join(project.projectDir, '.ktx/cache') }), new DbtSourceAdapter({ homeDir: join(project.projectDir, '.ktx/cache') }), - createLocalMetabaseSourceAdapter(project), + createLocalMetabaseSourceAdapter(project, { + ...(options.logger ? { logger: options.logger } : {}), + }), new LookerSourceAdapter({ clientFactory: { async createClient(config, ctx) { @@ -89,7 +104,9 @@ export function createDefaultLocalIngestAdapters( }, }), new MetricflowSourceAdapter({ homeDir: join(project.projectDir, '.ktx/cache') }), - new NotionSourceAdapter(), + new NotionSourceAdapter({ + ...(options.logger ? { logger: options.logger } : {}), + }), ]; if (options.historicSql) { @@ -205,6 +222,9 @@ export async function localPullConfigForAdapter( } else { const runtimeClient = await new DefaultLookerConnectionClientFactory( createLocalLookerCredentialResolver(project, options.looker?.env), + { + ...(options.logger ? { logger: options.logger } : {}), + }, ).createClient(connectionId); cleanupClient = runtimeClient; client = runtimeClient;