fix(context): make ingest adapter logging explicit

This commit is contained in:
Andrey Avtomonov 2026-05-12 01:33:03 +02:00
parent 61ed276b44
commit 20ac0329b8
8 changed files with 156 additions and 33 deletions

View file

@ -103,6 +103,24 @@ function sdk(overrides: Partial<LookerSdkPort> = {}): LookerSdkPort {
}
describe('LookerClient', () => {
it('does not warn to console when optional prioritization inputs fail by default', async () => {
const warn = vi.spyOn(console, 'warn').mockImplementation(() => undefined);
const fakeSdk = sdk({
search_dashboards: vi.fn().mockRejectedValue(new Error('dashboards unavailable')),
search_looks: vi.fn().mockRejectedValue(new Error('looks unavailable')),
});
const client = new LookerClient(params(), { sdkFactory: () => fakeSdk });
await expect(client.getSignals()).resolves.toEqual({
dashboardUsage: [],
lookUsage: [],
scheduledPlans: [],
favorites: [],
});
expect(warn).not.toHaveBeenCalled();
});
it('validates credentials with me()', async () => {
const client = new LookerClient(params(), { sdkFactory: () => sdk() });

View file

@ -80,10 +80,10 @@ export interface LookerClientDeps {
}
const defaultLogger: LookerClientLogger = {
log: (message) => console.log(message),
warn: (message) => console.warn(message),
error: (message) => console.error(message),
debug: (message) => console.debug(message),
log: () => undefined,
warn: () => undefined,
error: () => undefined,
debug: () => undefined,
};
class InlineLookerSettings extends NodeSettings {

View file

@ -72,6 +72,27 @@ describe('MetabaseClient retry exhaustion', () => {
vi.restoreAllMocks();
});
it('does not warn to console when retrying by default', async () => {
const warn = vi.spyOn(console, 'warn').mockImplementation(() => undefined);
globalThis.fetch = vi
.fn<typeof fetch>()
.mockRejectedValueOnce(Object.assign(new Error('read ECONNRESET'), { code: 'ECONNRESET' }))
.mockResolvedValueOnce(new Response(JSON.stringify([]), { status: 200 }));
const client = new MetabaseClient(
{ apiUrl: 'https://metabase.example.test', apiKey: 'key' },
{
...DEFAULT_METABASE_CLIENT_CONFIG,
baseDelayMs: 0,
maxRetries: 1,
},
);
await client.getDatabases();
expect(warn).not.toHaveBeenCalled();
});
it('wraps an exhausted ECONNRESET retry chain with method, path, attempt count, and original cause', async () => {
const sysErr = Object.assign(new Error('read ECONNRESET'), {
code: 'ECONNRESET',

View file

@ -25,10 +25,10 @@ export interface MetabaseClientLogger {
}
const defaultLogger: MetabaseClientLogger = {
log: (message) => console.log(message),
warn: (message) => console.warn(message),
error: (message) => console.error(message),
debug: (message) => console.debug(message),
log: () => undefined,
warn: () => undefined,
error: () => undefined,
debug: () => undefined,
};
interface TemplateTagInfo {

View file

@ -115,6 +115,41 @@ describe('fetchMetabaseBundle', () => {
expect(card.archived).toBe(false);
});
it('does not write Metabase fetch progress to console by default', async () => {
const log = vi.spyOn(console, 'log').mockImplementation(() => undefined);
const warn = vi.spyOn(console, 'warn').mockImplementation(() => undefined);
await fetchMetabaseBundle({
pullConfig: { metabaseConnectionId, metabaseDatabaseId: 42 },
stagedDir,
ctx: makeFetchContext(),
clientFactory,
sourceStateReader,
});
expect(log).not.toHaveBeenCalled();
expect(warn).not.toHaveBeenCalled();
});
it('routes Metabase fetch warnings through the injected logger', async () => {
const logger = {
log: vi.fn(),
warn: vi.fn(),
};
clientFactory.__client.getCard.mockRejectedValueOnce(new Error('card read failed'));
await fetchMetabaseBundle({
pullConfig: { metabaseConnectionId, metabaseDatabaseId: 42 },
stagedDir,
ctx: makeFetchContext(),
clientFactory,
sourceStateReader,
logger,
});
expect(logger.warn).toHaveBeenCalledWith('failed to load card 1: card read failed');
});
it('passes the Metabase source pull config and target fetch context to the client factory', async () => {
await fetchMetabaseBundle({
pullConfig: { metabaseConnectionId, metabaseDatabaseId: 42 },

View file

@ -21,9 +21,14 @@ class IngestInputError extends Error {
}
}
const logger = {
log: (message: string) => console.log(message),
warn: (message: string) => console.warn(message),
interface MetabaseFetchLogger {
log(message: string): void;
warn(message: string): void;
}
const noopMetabaseFetchLogger: MetabaseFetchLogger = {
log: () => undefined,
warn: () => undefined,
};
export interface FetchMetabaseBundleParams {
@ -32,6 +37,7 @@ export interface FetchMetabaseBundleParams {
ctx: FetchContext;
clientFactory: MetabaseClientFactory;
sourceStateReader: MetabaseSourceStateReader;
logger?: MetabaseFetchLogger;
}
interface CollectionNode {
@ -76,6 +82,7 @@ function resolvePath(index: Map<number | 'root', CollectionNode>, collectionId:
export async function fetchMetabaseBundle(params: FetchMetabaseBundleParams): Promise<void> {
const pullConfig: MetabasePullConfig = parseMetabasePullConfig(params.pullConfig);
const logger = params.logger ?? noopMetabaseFetchLogger;
const syncState = await params.sourceStateReader.getSourceState(pullConfig.metabaseConnectionId);
const mapping = syncState.mappings.find(
(m) => m.metabaseDatabaseId === pullConfig.metabaseDatabaseId && m.syncEnabled,

View file

@ -89,12 +89,13 @@ describe('fetchNotionSnapshot', () => {
});
it('logs skipped page materialization failures', async () => {
const warn = vi.spyOn(console, 'warn').mockImplementation(() => undefined);
const logger = { warn: vi.fn() };
(client.retrievePage as ReturnType<typeof vi.fn>).mockRejectedValueOnce(new Error('Notion API failed'));
const manifest = await fetchNotionSnapshot({
client,
stagedDir,
logger,
config: {
authToken: 'secret',
crawlMode: 'selected_roots',
@ -109,7 +110,7 @@ describe('fetchNotionSnapshot', () => {
});
expect(manifest.skipped).toEqual([{ externalId: 'page-1', reason: 'Notion API failed' }]);
expect(warn).toHaveBeenCalledWith('Skipping Notion page page-1: Notion API failed');
expect(logger.warn).toHaveBeenCalledWith('Skipping Notion page page-1: Notion API failed');
});
it('recursively fetches selected-root child pages and derives scoped links', async () => {
@ -191,7 +192,7 @@ describe('fetchNotionSnapshot', () => {
});
it('truncates deeply nested block trees and records a warning', async () => {
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => undefined);
const logger = { warn: vi.fn() };
(client.listBlockChildren as ReturnType<typeof vi.fn>).mockImplementation((blockId: string) => {
const currentDepth = blockId === 'page-1' ? 0 : Number(blockId.replace('block-', ''));
const nextDepth = currentDepth + 1;
@ -215,6 +216,7 @@ describe('fetchNotionSnapshot', () => {
await fetchNotionSnapshot({
client,
stagedDir,
logger,
config: {
authToken: 'secret',
crawlMode: 'selected_roots',
@ -232,11 +234,11 @@ describe('fetchNotionSnapshot', () => {
const manifest = JSON.parse(await readFile(join(stagedDir, 'manifest.json'), 'utf-8'));
expect(blocks).toHaveLength(10);
expect(manifest.warnings).toContain('maxBlockDepth reached for page page-1 at depth 10');
expect(warnSpy).toHaveBeenCalledWith('maxBlockDepth reached for page page-1 at depth 10');
expect(logger.warn).toHaveBeenCalledWith('maxBlockDepth reached for page page-1 at depth 10');
});
it('truncates pages at the per-page block cap and records a warning', async () => {
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => undefined);
const logger = { warn: vi.fn() };
(client.listBlockChildren as ReturnType<typeof vi.fn>).mockResolvedValue({
results: Array.from({ length: 2001 }, (_, index) => ({
id: `block-${index}`,
@ -250,6 +252,7 @@ describe('fetchNotionSnapshot', () => {
await fetchNotionSnapshot({
client,
stagedDir,
logger,
config: {
authToken: 'secret',
crawlMode: 'selected_roots',
@ -267,7 +270,7 @@ describe('fetchNotionSnapshot', () => {
const manifest = JSON.parse(await readFile(join(stagedDir, 'manifest.json'), 'utf-8'));
expect(blocks).toHaveLength(2000);
expect(manifest.warnings).toContain('maxBlocksPerPage reached for page page-1 at 2000 blocks');
expect(warnSpy).toHaveBeenCalledWith('maxBlocksPerPage reached for page page-1 at 2000 blocks');
expect(logger.warn).toHaveBeenCalledWith('maxBlocksPerPage reached for page page-1 at 2000 blocks');
});
it('uses all_accessible search for pages and data sources', async () => {

View file

@ -12,10 +12,19 @@ import {
type NotionPullConfig,
} from './types.js';
interface NotionFetchLogger {
warn(message: string): void;
}
const noopNotionFetchLogger: NotionFetchLogger = {
warn: () => undefined,
};
interface FetchNotionSnapshotParams {
client: NotionApi;
config: NotionPullConfig;
stagedDir: string;
logger?: NotionFetchLogger;
}
interface CrawlState {
@ -44,9 +53,6 @@ interface NotionLinks {
const DEFAULT_MAX_BLOCK_DEPTH = 10;
const DEFAULT_MAX_BLOCKS_PER_PAGE = 2000;
const logger = {
warn: (message: string) => console.warn(message),
};
async function writeJson(path: string, value: unknown): Promise<void> {
await mkdir(dirname(path), { recursive: true });
@ -58,11 +64,15 @@ async function writeText(path: string, value: string): Promise<void> {
await writeFile(path, value.endsWith('\n') ? value : `${value}\n`, 'utf-8');
}
function addWarning(warnings: string[], warning: string, logWarning = false): void {
function addWarning(
warnings: string[],
warning: string,
options: { logWarning?: boolean; logger?: NotionFetchLogger } = {},
): void {
if (!warnings.includes(warning)) {
warnings.push(warning);
if (logWarning) {
logger.warn(warning);
if (options.logWarning) {
options.logger?.warn(warning);
}
}
}
@ -119,11 +129,19 @@ async function visitPaginated<T>(params: {
} while (cursor);
}
function addBlockCountWarning(state: BlockCollectionState, warnings: string[], pageId: string): void {
function addBlockCountWarning(
state: BlockCollectionState,
warnings: string[],
pageId: string,
logger: NotionFetchLogger,
): void {
if (state.blockCountWarningWritten) {
return;
}
addWarning(warnings, `maxBlocksPerPage reached for page ${pageId} at ${DEFAULT_MAX_BLOCKS_PER_PAGE} blocks`, true);
addWarning(warnings, `maxBlocksPerPage reached for page ${pageId} at ${DEFAULT_MAX_BLOCKS_PER_PAGE} blocks`, {
logWarning: true,
logger,
});
state.blockCountWarningWritten = true;
}
@ -134,18 +152,19 @@ async function collectBlockChildren(params: {
depth: number;
warnings: string[];
state: BlockCollectionState;
logger: NotionFetchLogger;
}): Promise<void> {
let cursor: string | null = null;
do {
const remainingBlocks = DEFAULT_MAX_BLOCKS_PER_PAGE - params.state.blocks.length;
if (remainingBlocks <= 0) {
addBlockCountWarning(params.state, params.warnings, params.pageId);
addBlockCountWarning(params.state, params.warnings, params.pageId, params.logger);
return;
}
const page = await params.client.listBlockChildren(params.blockId, cursor, Math.min(remainingBlocks, 100));
for (let index = 0; index < page.results.length; index += 1) {
if (params.state.blocks.length >= DEFAULT_MAX_BLOCKS_PER_PAGE) {
addBlockCountWarning(params.state, params.warnings, params.pageId);
addBlockCountWarning(params.state, params.warnings, params.pageId, params.logger);
return;
}
@ -158,10 +177,10 @@ async function collectBlockChildren(params: {
addWarning(
params.warnings,
`maxBlockDepth reached for page ${params.pageId} at depth ${DEFAULT_MAX_BLOCK_DEPTH}`,
true,
{ logWarning: true, logger: params.logger },
);
} else if (params.state.blocks.length >= DEFAULT_MAX_BLOCKS_PER_PAGE) {
addBlockCountWarning(params.state, params.warnings, params.pageId);
addBlockCountWarning(params.state, params.warnings, params.pageId, params.logger);
return;
} else {
await collectBlockChildren({
@ -171,6 +190,7 @@ async function collectBlockChildren(params: {
depth: blockDepth,
warnings: params.warnings,
state: params.state,
logger: params.logger,
});
}
}
@ -179,7 +199,7 @@ async function collectBlockChildren(params: {
params.state.blocks.length >= DEFAULT_MAX_BLOCKS_PER_PAGE &&
(index < page.results.length - 1 || page.hasMore)
) {
addBlockCountWarning(params.state, params.warnings, params.pageId);
addBlockCountWarning(params.state, params.warnings, params.pageId, params.logger);
return;
}
}
@ -187,7 +207,12 @@ async function collectBlockChildren(params: {
} while (cursor);
}
async function collectBlockTree(client: NotionApi, pageId: string, warnings: string[]): Promise<NotionBlock[]> {
async function collectBlockTree(
client: NotionApi,
pageId: string,
warnings: string[],
logger: NotionFetchLogger,
): Promise<NotionBlock[]> {
const state: BlockCollectionState = { blocks: [], blockCountWarningWritten: false };
await collectBlockChildren({
client,
@ -196,6 +221,7 @@ async function collectBlockTree(client: NotionApi, pageId: string, warnings: str
depth: 0,
warnings,
state,
logger,
});
return state.blocks;
}
@ -325,6 +351,7 @@ async function materializePage(params: {
rowPath?: string | null;
page?: Record<string, unknown> | null;
skipDataSourceRows?: boolean;
logger: NotionFetchLogger;
}): Promise<void> {
const dir = params.rowPath ?? join('pages', params.pageId);
if (params.state.materializedPageTargets.has(dir)) {
@ -341,7 +368,7 @@ async function materializePage(params: {
if (params.skipDataSourceRows && !params.dataSourceId && parentDataSourceId(page)) {
return;
}
const blocks = await collectBlockTree(params.client, params.pageId, params.state.warnings);
const blocks = await collectBlockTree(params.client, params.pageId, params.state.warnings, params.logger);
const metadata = normalizeNotionPageMetadata({
page,
fallbackPath: params.fallbackPath,
@ -370,11 +397,12 @@ async function materializePage(params: {
fallbackPath: [...params.fallbackPath, metadata.title],
state: params.state,
config: params.config,
logger: params.logger,
});
}
}
} catch (error) {
logger.warn(`Skipping Notion page ${params.pageId}: ${error instanceof Error ? error.message : String(error)}`);
params.logger.warn(`Skipping Notion page ${params.pageId}: ${error instanceof Error ? error.message : String(error)}`);
params.state.skipped.push({
externalId: params.pageId,
reason: error instanceof Error ? error.message : String(error),
@ -392,6 +420,7 @@ async function materializeDataSource(params: {
databaseId?: string | null;
dataSourceSearchCursorAfterThis?: string | null;
rowStartCursor?: string | null;
logger: NotionFetchLogger;
}): Promise<void> {
const baseDir = params.databaseId
? join('databases', params.databaseId, 'data-sources', params.dataSourceId)
@ -431,6 +460,7 @@ async function materializeDataSource(params: {
dataSourceId: params.dataSourceId,
rowPath: join(baseDir, 'rows', row.id),
page: row,
logger: params.logger,
});
if (!hasPageBudget(params.state, params.config) && nextCursor) {
markCapped(
@ -456,6 +486,7 @@ async function materializeDatabase(params: {
stagedRoot: string;
state: CrawlState;
config: NotionPullConfig;
logger: NotionFetchLogger;
}): Promise<void> {
const database: NotionDatabaseContainer = await params.client.retrieveDatabase(params.databaseId);
await writeJson(join(params.stagedRoot, 'databases', params.databaseId, 'metadata.json'), {
@ -485,11 +516,13 @@ async function materializeDatabase(params: {
state: params.state,
config: params.config,
databaseId: params.databaseId,
logger: params.logger,
});
}
}
export async function fetchNotionSnapshot(params: FetchNotionSnapshotParams): Promise<NotionManifest> {
const logger = params.logger ?? noopNotionFetchLogger;
await mkdir(params.stagedDir, { recursive: true });
const configuredCursor = params.config.crawlMode === 'all_accessible' ? parseConfiguredCursor(params.config) : null;
const continuedFromCursor = configuredCursor !== null;
@ -522,6 +555,7 @@ export async function fetchNotionSnapshot(params: FetchNotionSnapshotParams): Pr
config: params.config,
dataSourceSearchCursorAfterThis: configuredCursor.dataSourceSearchCursor,
rowStartCursor: configuredCursor.rowCursor,
logger,
});
if (!hasPageBudget(state, params.config) && !state.capped && configuredCursor.dataSourceSearchCursor) {
markCapped(state, params.config, {
@ -546,6 +580,7 @@ export async function fetchNotionSnapshot(params: FetchNotionSnapshotParams): Pr
state,
config: params.config,
skipDataSourceRows: true,
logger,
});
if (!hasPageBudget(state, params.config) && nextCursor) {
markCapped(state, params.config, { phase: 'all_accessible_pages', cursor: nextCursor });
@ -577,6 +612,7 @@ export async function fetchNotionSnapshot(params: FetchNotionSnapshotParams): Pr
state,
config: params.config,
dataSourceSearchCursorAfterThis: nextCursor,
logger,
});
if (!hasPageBudget(state, params.config) && state.nextSuccessfulCursor === null) {
markCapped(state, params.config, { phase: 'all_accessible_data_sources', cursor: nextCursor });
@ -596,6 +632,7 @@ export async function fetchNotionSnapshot(params: FetchNotionSnapshotParams): Pr
fallbackPath: [],
state,
config: params.config,
logger,
});
}
for (const databaseId of params.config.rootDatabaseIds) {
@ -608,6 +645,7 @@ export async function fetchNotionSnapshot(params: FetchNotionSnapshotParams): Pr
stagedRoot: params.stagedDir,
state,
config: params.config,
logger,
});
}
for (const dataSourceId of params.config.rootDataSourceIds) {
@ -621,6 +659,7 @@ export async function fetchNotionSnapshot(params: FetchNotionSnapshotParams): Pr
fallbackPath: [dataSourceId],
state,
config: params.config,
logger,
});
}
}