feat(cli): route ingest adapter logs through operational logger

This commit is contained in:
Andrey Avtomonov 2026-05-12 11:26:34 +02:00
parent 9e80add72c
commit d7fb092cb0
10 changed files with 232 additions and 26 deletions

View file

@ -14,6 +14,7 @@ import {
import { initKtxProject, ktxLocalStateDbPath, loadKtxProject } from '@ktx/context/project';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { type KtxIngestArgs, runKtxIngest } from './ingest.js';
import type { KtxCliLocalIngestAdaptersOptions } from './local-adapters.js';
import {
CliLookerSlWritingAgentRunner,
CliMetabaseAgentRunner,
@ -553,6 +554,46 @@ describe('runKtxIngest', () => {
expect(io.stderr()).toBe('');
});
it('keeps metabase JSON stdout free of operational adapter logs', async () => {
const projectDir = join(tempDir, 'project');
await writeMetabaseConfig(projectDir);
const io = makeIo();
let adapterOptions: KtxCliLocalIngestAdaptersOptions | undefined;
await expect(
runKtxIngest(
{
command: 'run',
projectDir,
connectionId: 'prod-metabase',
adapter: 'metabase',
outputMode: 'json',
},
io.io,
{
createAdapters: (_project, options) => {
adapterOptions = options;
options?.logger?.warn('adapter warning');
return [];
},
runLocalMetabaseIngest: async (input) => {
input.adapters.find((adapter) => adapter.source === 'metabase');
return {
metabaseConnectionId: 'prod-metabase',
status: 'all_succeeded',
totals: { workUnits: 0, failedWorkUnits: 0 },
children: [],
};
},
},
),
).resolves.toBe(0);
expect(adapterOptions?.logger).toEqual(expect.objectContaining({ warn: expect.any(Function) }));
expect(() => JSON.parse(io.stdout())).not.toThrow();
expect(io.stderr()).toBe('');
});
it('rejects source-dir uploads through the metabase fan-out route', async () => {
const projectDir = join(tempDir, 'project');
await writeMetabaseConfig(projectDir);
@ -764,17 +805,22 @@ describe('runKtxIngest', () => {
),
).resolves.toBe(0);
expect(createAdapters).toHaveBeenCalledWith(expect.objectContaining({ projectDir }), {
databaseIntrospectionUrl: 'http://127.0.0.1:8765',
});
expect(createAdapters).toHaveBeenCalledWith(
expect.objectContaining({ projectDir }),
expect.objectContaining({
databaseIntrospectionUrl: 'http://127.0.0.1:8765',
logger: expect.any(Object),
}),
);
expect(runLocal).toHaveBeenCalledWith(
expect.objectContaining({
adapters: createdAdapters,
adapter: 'fake',
connectionId: 'warehouse',
pullConfigOptions: {
pullConfigOptions: expect.objectContaining({
databaseIntrospectionUrl: 'http://127.0.0.1:8765',
},
logger: expect.any(Object),
}),
}),
);
});
@ -817,14 +863,19 @@ describe('runKtxIngest', () => {
installPolicy: 'auto',
io: io.io,
};
expect(createAdapters).toHaveBeenCalledWith(expect.objectContaining({ projectDir }), {
managedDaemon: expectedManagedDaemon,
});
expect(createAdapters).toHaveBeenCalledWith(
expect.objectContaining({ projectDir }),
expect.objectContaining({
managedDaemon: expectedManagedDaemon,
logger: expect.any(Object),
}),
);
expect(runLocal).toHaveBeenCalledWith(
expect.objectContaining({
pullConfigOptions: {
pullConfigOptions: expect.objectContaining({
managedDaemon: expectedManagedDaemon,
},
logger: expect.any(Object),
}),
}),
);
});
@ -878,9 +929,13 @@ describe('runKtxIngest', () => {
),
).resolves.toBe(0);
expect(createAdapters).toHaveBeenCalledWith(expect.objectContaining({ projectDir }), {
historicSqlConnectionId: 'warehouse',
});
expect(createAdapters).toHaveBeenCalledWith(
expect.objectContaining({ projectDir }),
expect.objectContaining({
historicSqlConnectionId: 'warehouse',
logger: expect.any(Object),
}),
);
expect(runLocal).toHaveBeenCalledWith(
expect.objectContaining({
adapters: createdAdapters,
@ -1119,15 +1174,19 @@ describe('runKtxIngest', () => {
),
).resolves.toBe(0);
expect(createAdapters).toHaveBeenCalledWith(expect.objectContaining({ projectDir }), {
looker: {
parser: pullConfigOptions.looker.parser,
},
});
expect(createAdapters).toHaveBeenCalledWith(
expect.objectContaining({ projectDir }),
expect.objectContaining({
logger: expect.any(Object),
looker: {
parser: pullConfigOptions.looker.parser,
},
}),
);
expect(runLocal).toHaveBeenCalledWith(
expect.objectContaining({
agentRunner,
pullConfigOptions,
pullConfigOptions: expect.objectContaining(pullConfigOptions),
}),
);
});

View file

@ -18,6 +18,7 @@ import {
} from '@ktx/context/ingest';
import { loadKtxProject } from '@ktx/context/project';
import { readIngestReportSnapshotFile } from './ingest-report-file.js';
import { createCliOperationalLogger } from './io/logger.js';
import { createKtxCliLocalIngestAdapters } from './local-adapters.js';
import type { KtxManagedPythonInstallPolicy } from './managed-python-command.js';
import { type KtxMemoryFlowStdin, renderMemoryFlowInteractively } from './memory-flow-interactive.js';
@ -475,11 +476,13 @@ export async function runKtxIngest(
const executeLocalIngest = deps.runLocalIngest ?? runLocalIngest;
const localIngestOptions = deps.localIngestOptions ?? {};
const managedDaemon = managedDaemonOptionsForIngestRun(args, io);
const operationalLogger = createCliOperationalLogger(io, args.outputMode);
const adapterOptions = {
...(localIngestOptions.pullConfigOptions ?? {}),
...(args.databaseIntrospectionUrl ? { databaseIntrospectionUrl: args.databaseIntrospectionUrl } : {}),
...(managedDaemon ? { managedDaemon } : {}),
...(args.adapter === 'historic-sql' ? { historicSqlConnectionId: args.connectionId } : {}),
logger: operationalLogger,
};
if (args.adapter === 'metabase' && args.sourceDir) {
throw new Error('source-dir uploads are not supported for the Metabase fan-out adapter');

View file

@ -0,0 +1,65 @@
import { describe, expect, it, vi } from 'vitest';
import { createCliOperationalLogger, createNoopOperationalLogger } from './logger.js';
function makeIo() {
let stdout = '';
let stderr = '';
return {
io: {
stdout: {
write: (chunk: string) => {
stdout += chunk;
},
},
stderr: {
write: (chunk: string) => {
stderr += chunk;
},
},
},
stdout: () => stdout,
stderr: () => stderr,
};
}
describe('createCliOperationalLogger', () => {
it('routes operational messages to stderr outside JSON mode', () => {
const io = makeIo();
const logger = createCliOperationalLogger(io.io, 'plain');
logger.log('progress');
logger.warn('warning');
logger.error('failure');
logger.debug?.('debug');
expect(io.stdout()).toBe('');
expect(io.stderr()).toBe('progress\nwarning\nfailure\ndebug\n');
});
it('suppresses operational messages in JSON mode by default', () => {
const io = makeIo();
const logger = createCliOperationalLogger(io.io, 'json');
logger.log('progress');
logger.warn('warning');
logger.error('failure');
logger.debug?.('debug');
expect(io.stdout()).toBe('');
expect(io.stderr()).toBe('');
});
});
describe('createNoopOperationalLogger', () => {
it('never writes', () => {
const logger = createNoopOperationalLogger();
const warn = vi.spyOn(console, 'warn').mockImplementation(() => undefined);
logger.log('progress');
logger.warn('warning');
logger.error('failure');
logger.debug?.('debug');
expect(warn).not.toHaveBeenCalled();
});
});

View file

@ -0,0 +1,40 @@
import type { KtxCliIo } from '../cli-runtime.js';
import type { KtxOutputMode } from './mode.js';
export interface KtxOperationalLogger {
log(message: string): void;
warn(message: string): void;
error(message: string): void;
debug?(message: string): void;
}
export type KtxOperationalOutputMode = KtxOutputMode | 'viz';
function writeLine(io: KtxCliIo, message: string): void {
io.stderr.write(message.endsWith('\n') ? message : `${message}\n`);
}
export function createNoopOperationalLogger(): KtxOperationalLogger {
return {
log: () => undefined,
warn: () => undefined,
error: () => undefined,
debug: () => undefined,
};
}
export function createCliOperationalLogger(
io: KtxCliIo,
mode: KtxOperationalOutputMode,
): KtxOperationalLogger {
if (mode === 'json') {
return createNoopOperationalLogger();
}
return {
log: (message) => writeLine(io, message),
warn: (message) => writeLine(io, message),
error: (message) => writeLine(io, message),
debug: (message) => writeLine(io, message),
};
}

View file

@ -35,6 +35,7 @@ import {
managedDaemonDatabaseIntrospectionOptions,
type ManagedPythonCoreDaemonOptions,
} from './managed-python-http.js';
import type { KtxOperationalLogger } from './io/logger.js';
function hasSnowflakeDriver(connection: unknown): boolean {
return (
@ -162,6 +163,7 @@ export interface KtxCliLocalIngestAdaptersOptions extends DefaultLocalIngestAdap
sqlAnalysis?: SqlAnalysisPort;
sqlAnalysisUrl?: string;
managedDaemon?: ManagedPythonCoreDaemonOptions;
logger?: KtxOperationalLogger;
}
function historicSqlRecord(connection: unknown): Record<string, unknown> | null {