mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-07 07:55:13 +02:00
fix(context): make ingest adapter logging explicit
This commit is contained in:
parent
a2dcd4eb08
commit
9e80add72c
8 changed files with 150 additions and 30 deletions
|
|
@ -112,6 +112,24 @@ describe('LookerClient', () => {
|
|||
});
|
||||
});
|
||||
|
||||
it('does not warn to console when optional prioritization inputs fail by default', async () => {
|
||||
const warn = vi.spyOn(console, 'warn').mockImplementation(() => undefined);
|
||||
const fakeSdk = sdk({
|
||||
search_dashboards: vi.fn().mockRejectedValue(new Error('dashboards unavailable')),
|
||||
search_looks: vi.fn().mockRejectedValue(new Error('looks unavailable')),
|
||||
});
|
||||
const client = new LookerClient(params(), { sdkFactory: () => fakeSdk });
|
||||
|
||||
await expect(client.getSignals()).resolves.toMatchObject({
|
||||
dashboardUsage: [],
|
||||
lookUsage: [],
|
||||
scheduledPlans: [],
|
||||
favorites: [],
|
||||
});
|
||||
|
||||
expect(warn).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('maps dashboards, looks, folders, models, explores, users, and groups to staged DTOs', async () => {
|
||||
const fakeSdk = sdk();
|
||||
const client = new LookerClient(params(), { sdkFactory: () => fakeSdk });
|
||||
|
|
|
|||
|
|
@ -80,10 +80,10 @@ export interface LookerClientDeps {
|
|||
}
|
||||
|
||||
const defaultLogger: LookerClientLogger = {
|
||||
log: (message) => console.log(message),
|
||||
warn: (message) => console.warn(message),
|
||||
error: (message) => console.error(message),
|
||||
debug: (message) => console.debug(message),
|
||||
log: () => undefined,
|
||||
warn: () => undefined,
|
||||
error: () => undefined,
|
||||
debug: () => undefined,
|
||||
};
|
||||
|
||||
class InlineLookerSettings extends NodeSettings {
|
||||
|
|
|
|||
|
|
@ -72,6 +72,27 @@ describe('MetabaseClient retry exhaustion', () => {
|
|||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
it('does not warn to console when retrying by default', async () => {
|
||||
const warn = vi.spyOn(console, 'warn').mockImplementation(() => undefined);
|
||||
globalThis.fetch = vi
|
||||
.fn<typeof fetch>()
|
||||
.mockRejectedValueOnce(Object.assign(new Error('read ECONNRESET'), { code: 'ECONNRESET' }))
|
||||
.mockResolvedValueOnce(new Response(JSON.stringify([]), { status: 200 }));
|
||||
|
||||
const client = new MetabaseClient(
|
||||
{ apiUrl: 'https://metabase.example.test', apiKey: 'key' },
|
||||
{
|
||||
...DEFAULT_METABASE_CLIENT_CONFIG,
|
||||
baseDelayMs: 0,
|
||||
maxRetries: 1,
|
||||
},
|
||||
);
|
||||
|
||||
await client.getDatabases();
|
||||
|
||||
expect(warn).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('wraps an exhausted ECONNRESET retry chain with method, path, attempt count, and original cause', async () => {
|
||||
const sysErr = Object.assign(new Error('read ECONNRESET'), {
|
||||
code: 'ECONNRESET',
|
||||
|
|
|
|||
|
|
@ -25,10 +25,10 @@ export interface MetabaseClientLogger {
|
|||
}
|
||||
|
||||
const defaultLogger: MetabaseClientLogger = {
|
||||
log: (message) => console.log(message),
|
||||
warn: (message) => console.warn(message),
|
||||
error: (message) => console.error(message),
|
||||
debug: (message) => console.debug(message),
|
||||
log: () => undefined,
|
||||
warn: () => undefined,
|
||||
error: () => undefined,
|
||||
debug: () => undefined,
|
||||
};
|
||||
|
||||
interface TemplateTagInfo {
|
||||
|
|
|
|||
|
|
@ -86,6 +86,7 @@ describe('fetchMetabaseBundle', () => {
|
|||
});
|
||||
|
||||
afterEach(async () => {
|
||||
vi.restoreAllMocks();
|
||||
await rm(stagedDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
|
|
@ -115,6 +116,41 @@ describe('fetchMetabaseBundle', () => {
|
|||
expect(card.archived).toBe(false);
|
||||
});
|
||||
|
||||
it('does not write Metabase fetch progress to console by default', async () => {
|
||||
const log = vi.spyOn(console, 'log').mockImplementation(() => undefined);
|
||||
const warn = vi.spyOn(console, 'warn').mockImplementation(() => undefined);
|
||||
|
||||
await fetchMetabaseBundle({
|
||||
pullConfig: { metabaseConnectionId, metabaseDatabaseId: 42 },
|
||||
stagedDir,
|
||||
ctx: makeFetchContext(),
|
||||
clientFactory,
|
||||
sourceStateReader,
|
||||
});
|
||||
|
||||
expect(log).not.toHaveBeenCalled();
|
||||
expect(warn).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('routes Metabase fetch warnings through the injected logger', async () => {
|
||||
const logger = {
|
||||
log: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
};
|
||||
clientFactory.__client.getCard.mockRejectedValueOnce(new Error('card read failed'));
|
||||
|
||||
await fetchMetabaseBundle({
|
||||
pullConfig: { metabaseConnectionId, metabaseDatabaseId: 42 },
|
||||
stagedDir,
|
||||
ctx: makeFetchContext(),
|
||||
clientFactory,
|
||||
sourceStateReader,
|
||||
logger,
|
||||
});
|
||||
|
||||
expect(logger.warn).toHaveBeenCalledWith('failed to load card 1: card read failed');
|
||||
});
|
||||
|
||||
it('passes the Metabase source pull config and target fetch context to the client factory', async () => {
|
||||
await fetchMetabaseBundle({
|
||||
pullConfig: { metabaseConnectionId, metabaseDatabaseId: 42 },
|
||||
|
|
|
|||
|
|
@ -21,9 +21,14 @@ class IngestInputError extends Error {
|
|||
}
|
||||
}
|
||||
|
||||
const logger = {
|
||||
log: (message: string) => console.log(message),
|
||||
warn: (message: string) => console.warn(message),
|
||||
export interface MetabaseFetchLogger {
|
||||
log(message: string): void;
|
||||
warn(message: string): void;
|
||||
}
|
||||
|
||||
const noopMetabaseFetchLogger: MetabaseFetchLogger = {
|
||||
log: () => undefined,
|
||||
warn: () => undefined,
|
||||
};
|
||||
|
||||
export interface FetchMetabaseBundleParams {
|
||||
|
|
@ -32,6 +37,7 @@ export interface FetchMetabaseBundleParams {
|
|||
ctx: FetchContext;
|
||||
clientFactory: MetabaseClientFactory;
|
||||
sourceStateReader: MetabaseSourceStateReader;
|
||||
logger?: MetabaseFetchLogger;
|
||||
}
|
||||
|
||||
interface CollectionNode {
|
||||
|
|
@ -76,6 +82,7 @@ function resolvePath(index: Map<number | 'root', CollectionNode>, collectionId:
|
|||
|
||||
export async function fetchMetabaseBundle(params: FetchMetabaseBundleParams): Promise<void> {
|
||||
const pullConfig: MetabasePullConfig = parseMetabasePullConfig(params.pullConfig);
|
||||
const logger = params.logger ?? noopMetabaseFetchLogger;
|
||||
const syncState = await params.sourceStateReader.getSourceState(pullConfig.metabaseConnectionId);
|
||||
const mapping = syncState.mappings.find(
|
||||
(m) => m.metabaseDatabaseId === pullConfig.metabaseDatabaseId && m.syncEnabled,
|
||||
|
|
|
|||
|
|
@ -89,12 +89,13 @@ describe('fetchNotionSnapshot', () => {
|
|||
});
|
||||
|
||||
it('logs skipped page materialization failures', async () => {
|
||||
const warn = vi.spyOn(console, 'warn').mockImplementation(() => undefined);
|
||||
const logger = { warn: vi.fn() };
|
||||
(client.retrievePage as ReturnType<typeof vi.fn>).mockRejectedValueOnce(new Error('Notion API failed'));
|
||||
|
||||
const manifest = await fetchNotionSnapshot({
|
||||
client,
|
||||
stagedDir,
|
||||
logger,
|
||||
config: {
|
||||
authToken: 'secret',
|
||||
crawlMode: 'selected_roots',
|
||||
|
|
@ -109,7 +110,7 @@ describe('fetchNotionSnapshot', () => {
|
|||
});
|
||||
|
||||
expect(manifest.skipped).toEqual([{ externalId: 'page-1', reason: 'Notion API failed' }]);
|
||||
expect(warn).toHaveBeenCalledWith('Skipping Notion page page-1: Notion API failed');
|
||||
expect(logger.warn).toHaveBeenCalledWith('Skipping Notion page page-1: Notion API failed');
|
||||
});
|
||||
|
||||
it('recursively fetches selected-root child pages and derives scoped links', async () => {
|
||||
|
|
@ -191,7 +192,7 @@ describe('fetchNotionSnapshot', () => {
|
|||
});
|
||||
|
||||
it('truncates deeply nested block trees and records a warning', async () => {
|
||||
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => undefined);
|
||||
const logger = { warn: vi.fn() };
|
||||
(client.listBlockChildren as ReturnType<typeof vi.fn>).mockImplementation((blockId: string) => {
|
||||
const currentDepth = blockId === 'page-1' ? 0 : Number(blockId.replace('block-', ''));
|
||||
const nextDepth = currentDepth + 1;
|
||||
|
|
@ -215,6 +216,7 @@ describe('fetchNotionSnapshot', () => {
|
|||
await fetchNotionSnapshot({
|
||||
client,
|
||||
stagedDir,
|
||||
logger,
|
||||
config: {
|
||||
authToken: 'secret',
|
||||
crawlMode: 'selected_roots',
|
||||
|
|
@ -232,11 +234,11 @@ describe('fetchNotionSnapshot', () => {
|
|||
const manifest = JSON.parse(await readFile(join(stagedDir, 'manifest.json'), 'utf-8'));
|
||||
expect(blocks).toHaveLength(10);
|
||||
expect(manifest.warnings).toContain('maxBlockDepth reached for page page-1 at depth 10');
|
||||
expect(warnSpy).toHaveBeenCalledWith('maxBlockDepth reached for page page-1 at depth 10');
|
||||
expect(logger.warn).toHaveBeenCalledWith('maxBlockDepth reached for page page-1 at depth 10');
|
||||
});
|
||||
|
||||
it('truncates pages at the per-page block cap and records a warning', async () => {
|
||||
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => undefined);
|
||||
const logger = { warn: vi.fn() };
|
||||
(client.listBlockChildren as ReturnType<typeof vi.fn>).mockResolvedValue({
|
||||
results: Array.from({ length: 2001 }, (_, index) => ({
|
||||
id: `block-${index}`,
|
||||
|
|
@ -250,6 +252,7 @@ describe('fetchNotionSnapshot', () => {
|
|||
await fetchNotionSnapshot({
|
||||
client,
|
||||
stagedDir,
|
||||
logger,
|
||||
config: {
|
||||
authToken: 'secret',
|
||||
crawlMode: 'selected_roots',
|
||||
|
|
@ -267,7 +270,7 @@ describe('fetchNotionSnapshot', () => {
|
|||
const manifest = JSON.parse(await readFile(join(stagedDir, 'manifest.json'), 'utf-8'));
|
||||
expect(blocks).toHaveLength(2000);
|
||||
expect(manifest.warnings).toContain('maxBlocksPerPage reached for page page-1 at 2000 blocks');
|
||||
expect(warnSpy).toHaveBeenCalledWith('maxBlocksPerPage reached for page page-1 at 2000 blocks');
|
||||
expect(logger.warn).toHaveBeenCalledWith('maxBlocksPerPage reached for page page-1 at 2000 blocks');
|
||||
});
|
||||
|
||||
it('uses all_accessible search for pages and data sources', async () => {
|
||||
|
|
|
|||
|
|
@ -12,10 +12,19 @@ import {
|
|||
type NotionPullConfig,
|
||||
} from './types.js';
|
||||
|
||||
export interface NotionFetchLogger {
|
||||
warn(message: string): void;
|
||||
}
|
||||
|
||||
const noopNotionFetchLogger: NotionFetchLogger = {
|
||||
warn: () => undefined,
|
||||
};
|
||||
|
||||
interface FetchNotionSnapshotParams {
|
||||
client: NotionApi;
|
||||
config: NotionPullConfig;
|
||||
stagedDir: string;
|
||||
logger?: NotionFetchLogger;
|
||||
}
|
||||
|
||||
interface CrawlState {
|
||||
|
|
@ -23,6 +32,7 @@ interface CrawlState {
|
|||
databaseCount: number;
|
||||
dataSourceCount: number;
|
||||
capped: boolean;
|
||||
logger: NotionFetchLogger;
|
||||
skipped: Array<{ externalId: string; reason: string }>;
|
||||
warnings: string[];
|
||||
materializedPageTargets: Set<string>;
|
||||
|
|
@ -44,9 +54,6 @@ interface NotionLinks {
|
|||
|
||||
const DEFAULT_MAX_BLOCK_DEPTH = 10;
|
||||
const DEFAULT_MAX_BLOCKS_PER_PAGE = 2000;
|
||||
const logger = {
|
||||
warn: (message: string) => console.warn(message),
|
||||
};
|
||||
|
||||
async function writeJson(path: string, value: unknown): Promise<void> {
|
||||
await mkdir(dirname(path), { recursive: true });
|
||||
|
|
@ -58,7 +65,12 @@ async function writeText(path: string, value: string): Promise<void> {
|
|||
await writeFile(path, value.endsWith('\n') ? value : `${value}\n`, 'utf-8');
|
||||
}
|
||||
|
||||
function addWarning(warnings: string[], warning: string, logWarning = false): void {
|
||||
function addWarning(
|
||||
warnings: string[],
|
||||
warning: string,
|
||||
logWarning = false,
|
||||
logger: NotionFetchLogger = noopNotionFetchLogger,
|
||||
): void {
|
||||
if (!warnings.includes(warning)) {
|
||||
warnings.push(warning);
|
||||
if (logWarning) {
|
||||
|
|
@ -119,11 +131,21 @@ async function visitPaginated<T>(params: {
|
|||
} while (cursor);
|
||||
}
|
||||
|
||||
function addBlockCountWarning(state: BlockCollectionState, warnings: string[], pageId: string): void {
|
||||
function addBlockCountWarning(
|
||||
state: BlockCollectionState,
|
||||
warnings: string[],
|
||||
pageId: string,
|
||||
logger: NotionFetchLogger,
|
||||
): void {
|
||||
if (state.blockCountWarningWritten) {
|
||||
return;
|
||||
}
|
||||
addWarning(warnings, `maxBlocksPerPage reached for page ${pageId} at ${DEFAULT_MAX_BLOCKS_PER_PAGE} blocks`, true);
|
||||
addWarning(
|
||||
warnings,
|
||||
`maxBlocksPerPage reached for page ${pageId} at ${DEFAULT_MAX_BLOCKS_PER_PAGE} blocks`,
|
||||
true,
|
||||
logger,
|
||||
);
|
||||
state.blockCountWarningWritten = true;
|
||||
}
|
||||
|
||||
|
|
@ -134,18 +156,19 @@ async function collectBlockChildren(params: {
|
|||
depth: number;
|
||||
warnings: string[];
|
||||
state: BlockCollectionState;
|
||||
logger: NotionFetchLogger;
|
||||
}): Promise<void> {
|
||||
let cursor: string | null = null;
|
||||
do {
|
||||
const remainingBlocks = DEFAULT_MAX_BLOCKS_PER_PAGE - params.state.blocks.length;
|
||||
if (remainingBlocks <= 0) {
|
||||
addBlockCountWarning(params.state, params.warnings, params.pageId);
|
||||
addBlockCountWarning(params.state, params.warnings, params.pageId, params.logger);
|
||||
return;
|
||||
}
|
||||
const page = await params.client.listBlockChildren(params.blockId, cursor, Math.min(remainingBlocks, 100));
|
||||
for (let index = 0; index < page.results.length; index += 1) {
|
||||
if (params.state.blocks.length >= DEFAULT_MAX_BLOCKS_PER_PAGE) {
|
||||
addBlockCountWarning(params.state, params.warnings, params.pageId);
|
||||
addBlockCountWarning(params.state, params.warnings, params.pageId, params.logger);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -159,9 +182,10 @@ async function collectBlockChildren(params: {
|
|||
params.warnings,
|
||||
`maxBlockDepth reached for page ${params.pageId} at depth ${DEFAULT_MAX_BLOCK_DEPTH}`,
|
||||
true,
|
||||
params.logger,
|
||||
);
|
||||
} else if (params.state.blocks.length >= DEFAULT_MAX_BLOCKS_PER_PAGE) {
|
||||
addBlockCountWarning(params.state, params.warnings, params.pageId);
|
||||
addBlockCountWarning(params.state, params.warnings, params.pageId, params.logger);
|
||||
return;
|
||||
} else {
|
||||
await collectBlockChildren({
|
||||
|
|
@ -171,6 +195,7 @@ async function collectBlockChildren(params: {
|
|||
depth: blockDepth,
|
||||
warnings: params.warnings,
|
||||
state: params.state,
|
||||
logger: params.logger,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
@ -179,7 +204,7 @@ async function collectBlockChildren(params: {
|
|||
params.state.blocks.length >= DEFAULT_MAX_BLOCKS_PER_PAGE &&
|
||||
(index < page.results.length - 1 || page.hasMore)
|
||||
) {
|
||||
addBlockCountWarning(params.state, params.warnings, params.pageId);
|
||||
addBlockCountWarning(params.state, params.warnings, params.pageId, params.logger);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
|
@ -187,7 +212,12 @@ async function collectBlockChildren(params: {
|
|||
} while (cursor);
|
||||
}
|
||||
|
||||
async function collectBlockTree(client: NotionApi, pageId: string, warnings: string[]): Promise<NotionBlock[]> {
|
||||
async function collectBlockTree(
|
||||
client: NotionApi,
|
||||
pageId: string,
|
||||
warnings: string[],
|
||||
logger: NotionFetchLogger,
|
||||
): Promise<NotionBlock[]> {
|
||||
const state: BlockCollectionState = { blocks: [], blockCountWarningWritten: false };
|
||||
await collectBlockChildren({
|
||||
client,
|
||||
|
|
@ -196,6 +226,7 @@ async function collectBlockTree(client: NotionApi, pageId: string, warnings: str
|
|||
depth: 0,
|
||||
warnings,
|
||||
state,
|
||||
logger,
|
||||
});
|
||||
return state.blocks;
|
||||
}
|
||||
|
|
@ -341,7 +372,7 @@ async function materializePage(params: {
|
|||
if (params.skipDataSourceRows && !params.dataSourceId && parentDataSourceId(page)) {
|
||||
return;
|
||||
}
|
||||
const blocks = await collectBlockTree(params.client, params.pageId, params.state.warnings);
|
||||
const blocks = await collectBlockTree(params.client, params.pageId, params.state.warnings, params.state.logger);
|
||||
const metadata = normalizeNotionPageMetadata({
|
||||
page,
|
||||
fallbackPath: params.fallbackPath,
|
||||
|
|
@ -374,7 +405,9 @@ async function materializePage(params: {
|
|||
}
|
||||
}
|
||||
} catch (error) {
|
||||
logger.warn(`Skipping Notion page ${params.pageId}: ${error instanceof Error ? error.message : String(error)}`);
|
||||
params.state.logger.warn(
|
||||
`Skipping Notion page ${params.pageId}: ${error instanceof Error ? error.message : String(error)}`,
|
||||
);
|
||||
params.state.skipped.push({
|
||||
externalId: params.pageId,
|
||||
reason: error instanceof Error ? error.message : String(error),
|
||||
|
|
@ -491,6 +524,7 @@ async function materializeDatabase(params: {
|
|||
|
||||
export async function fetchNotionSnapshot(params: FetchNotionSnapshotParams): Promise<NotionManifest> {
|
||||
await mkdir(params.stagedDir, { recursive: true });
|
||||
const logger = params.logger ?? noopNotionFetchLogger;
|
||||
const configuredCursor = params.config.crawlMode === 'all_accessible' ? parseConfiguredCursor(params.config) : null;
|
||||
const continuedFromCursor = configuredCursor !== null;
|
||||
const state: CrawlState = {
|
||||
|
|
@ -498,6 +532,7 @@ export async function fetchNotionSnapshot(params: FetchNotionSnapshotParams): Pr
|
|||
databaseCount: 0,
|
||||
dataSourceCount: 0,
|
||||
capped: false,
|
||||
logger,
|
||||
skipped: [],
|
||||
warnings: [],
|
||||
materializedPageTargets: new Set(),
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue