Merge remote-tracking branch 'origin/main' into luca-martial/schema-select-ux-text

# Conflicts:
#	packages/cli/src/demo.test.ts
#	packages/context/src/ingest/local-adapters.ts
This commit is contained in:
Andrey Avtomonov 2026-05-12 14:47:28 +02:00
commit d0f650f44a
123 changed files with 3739 additions and 933 deletions

View file

@ -23,14 +23,14 @@ describe('standalone Notion connection config', () => {
it('parses selected-root Notion config with safe defaults', () => {
const parsed = parseNotionConnectionConfig({
driver: 'notion',
auth_token_ref: 'env:NOTION_AUTH_TOKEN',
auth_token_ref: 'env:NOTION_TOKEN',
crawl_mode: 'selected_roots',
root_page_ids: ['page-1'],
});
expect(parsed).toEqual({
driver: 'notion',
auth_token_ref: 'env:NOTION_AUTH_TOKEN',
auth_token_ref: 'env:NOTION_TOKEN',
crawl_mode: 'selected_roots',
root_page_ids: ['page-1'],
root_database_ids: [],
@ -70,7 +70,7 @@ describe('standalone Notion connection config', () => {
expect(() =>
parseNotionConnectionConfig({
driver: 'notion',
auth_token_ref: 'env:NOTION_AUTH_TOKEN',
auth_token_ref: 'env:NOTION_TOKEN',
crawl_mode: 'selected_roots',
}),
).toThrow('selected_roots requires at least one root page, database, or data source id');
@ -81,8 +81,8 @@ describe('standalone Notion connection config', () => {
await writeFile(tokenPath, 'ntn_file_token\n', 'utf-8');
await expect(
resolveNotionAuthToken('env:NOTION_AUTH_TOKEN', {
env: { NOTION_AUTH_TOKEN: 'ntn_env_token' },
resolveNotionAuthToken('env:NOTION_TOKEN', {
env: { NOTION_TOKEN: 'ntn_env_token' },
}),
).resolves.toBe('ntn_env_token');
await expect(resolveNotionAuthToken(`file:${tokenPath}`)).resolves.toBe('ntn_file_token');
@ -95,14 +95,14 @@ describe('standalone Notion connection config', () => {
const pullConfig = await notionConnectionToPullConfig(
parseNotionConnectionConfig({
driver: 'notion',
auth_token_ref: 'env:NOTION_AUTH_TOKEN',
auth_token_ref: 'env:NOTION_TOKEN',
crawl_mode: 'all_accessible',
max_pages_per_run: 12,
max_knowledge_creates_per_run: 2,
max_knowledge_updates_per_run: 7,
last_successful_cursor: '{"phase":"all_accessible_pages","cursor":"cursor-1"}',
}),
{ env: { NOTION_AUTH_TOKEN: 'ntn_env_token' } },
{ env: { NOTION_TOKEN: 'ntn_env_token' } },
);
expect(pullConfig).toEqual({

View file

@ -112,6 +112,24 @@ describe('LookerClient', () => {
});
});
it('does not warn to console when optional prioritization inputs fail by default', async () => {
const warn = vi.spyOn(console, 'warn').mockImplementation(() => undefined);
const fakeSdk = sdk({
search_dashboards: vi.fn().mockRejectedValue(new Error('dashboards unavailable')),
search_looks: vi.fn().mockRejectedValue(new Error('looks unavailable')),
});
const client = new LookerClient(params(), { sdkFactory: () => fakeSdk });
await expect(client.getSignals()).resolves.toMatchObject({
dashboardUsage: [],
lookUsage: [],
scheduledPlans: [],
favorites: [],
});
expect(warn).not.toHaveBeenCalled();
});
it('maps dashboards, looks, folders, models, explores, users, and groups to staged DTOs', async () => {
const fakeSdk = sdk();
const client = new LookerClient(params(), { sdkFactory: () => fakeSdk });

View file

@ -80,10 +80,10 @@ export interface LookerClientDeps {
}
const defaultLogger: LookerClientLogger = {
log: (message) => console.log(message),
warn: (message) => console.warn(message),
error: (message) => console.error(message),
debug: (message) => console.debug(message),
log: () => undefined,
warn: () => undefined,
error: () => undefined,
debug: () => undefined,
};
class InlineLookerSettings extends NodeSettings {

View file

@ -1,4 +1,5 @@
import type { KtxLocalProject, KtxProjectConnectionConfig } from '../../../project/index.js';
import type { LookerClientLogger } from './client.js';
import {
DefaultLookerClientFactory,
DefaultLookerConnectionClientFactory,
@ -59,8 +60,11 @@ export function createLocalLookerCredentialResolver(
export function createLocalLookerSourceAdapter(
project: KtxLocalProject,
env: NodeJS.ProcessEnv = process.env,
logger?: LookerClientLogger,
): LookerSourceAdapter {
const connectionFactory = new DefaultLookerConnectionClientFactory(createLocalLookerCredentialResolver(project, env));
const connectionFactory = new DefaultLookerConnectionClientFactory(createLocalLookerCredentialResolver(project, env), {
...(logger ? { logger } : {}),
});
return new LookerSourceAdapter({
clientFactory: new DefaultLookerClientFactory(connectionFactory),
});

View file

@ -72,6 +72,27 @@ describe('MetabaseClient retry exhaustion', () => {
vi.restoreAllMocks();
});
it('does not warn to console when retrying by default', async () => {
const warn = vi.spyOn(console, 'warn').mockImplementation(() => undefined);
globalThis.fetch = vi
.fn<typeof fetch>()
.mockRejectedValueOnce(Object.assign(new Error('read ECONNRESET'), { code: 'ECONNRESET' }))
.mockResolvedValueOnce(new Response(JSON.stringify([]), { status: 200 }));
const client = new MetabaseClient(
{ apiUrl: 'https://metabase.example.test', apiKey: 'key' },
{
...DEFAULT_METABASE_CLIENT_CONFIG,
baseDelayMs: 0,
maxRetries: 1,
},
);
await client.getDatabases();
expect(warn).not.toHaveBeenCalled();
});
it('wraps an exhausted ECONNRESET retry chain with method, path, attempt count, and original cause', async () => {
const sysErr = Object.assign(new Error('read ECONNRESET'), {
code: 'ECONNRESET',

View file

@ -25,10 +25,10 @@ export interface MetabaseClientLogger {
}
const defaultLogger: MetabaseClientLogger = {
log: (message) => console.log(message),
warn: (message) => console.warn(message),
error: (message) => console.error(message),
debug: (message) => console.debug(message),
log: () => undefined,
warn: () => undefined,
error: () => undefined,
debug: () => undefined,
};
interface TemplateTagInfo {

View file

@ -86,6 +86,7 @@ describe('fetchMetabaseBundle', () => {
});
afterEach(async () => {
vi.restoreAllMocks();
await rm(stagedDir, { recursive: true, force: true });
});
@ -115,6 +116,41 @@ describe('fetchMetabaseBundle', () => {
expect(card.archived).toBe(false);
});
it('does not write Metabase fetch progress to console by default', async () => {
const log = vi.spyOn(console, 'log').mockImplementation(() => undefined);
const warn = vi.spyOn(console, 'warn').mockImplementation(() => undefined);
await fetchMetabaseBundle({
pullConfig: { metabaseConnectionId, metabaseDatabaseId: 42 },
stagedDir,
ctx: makeFetchContext(),
clientFactory,
sourceStateReader,
});
expect(log).not.toHaveBeenCalled();
expect(warn).not.toHaveBeenCalled();
});
it('routes Metabase fetch warnings through the injected logger', async () => {
const logger = {
log: vi.fn(),
warn: vi.fn(),
};
clientFactory.__client.getCard.mockRejectedValueOnce(new Error('card read failed'));
await fetchMetabaseBundle({
pullConfig: { metabaseConnectionId, metabaseDatabaseId: 42 },
stagedDir,
ctx: makeFetchContext(),
clientFactory,
sourceStateReader,
logger,
});
expect(logger.warn).toHaveBeenCalledWith('failed to load card 1: card read failed');
});
it('passes the Metabase source pull config and target fetch context to the client factory', async () => {
await fetchMetabaseBundle({
pullConfig: { metabaseConnectionId, metabaseDatabaseId: 42 },

View file

@ -21,9 +21,14 @@ class IngestInputError extends Error {
}
}
const logger = {
log: (message: string) => console.log(message),
warn: (message: string) => console.warn(message),
export interface MetabaseFetchLogger {
log(message: string): void;
warn(message: string): void;
}
const noopMetabaseFetchLogger: MetabaseFetchLogger = {
log: () => undefined,
warn: () => undefined,
};
export interface FetchMetabaseBundleParams {
@ -32,6 +37,7 @@ export interface FetchMetabaseBundleParams {
ctx: FetchContext;
clientFactory: MetabaseClientFactory;
sourceStateReader: MetabaseSourceStateReader;
logger?: MetabaseFetchLogger;
}
interface CollectionNode {
@ -76,6 +82,7 @@ function resolvePath(index: Map<number | 'root', CollectionNode>, collectionId:
export async function fetchMetabaseBundle(params: FetchMetabaseBundleParams): Promise<void> {
const pullConfig: MetabasePullConfig = parseMetabasePullConfig(params.pullConfig);
const logger = params.logger ?? noopMetabaseFetchLogger;
const syncState = await params.sourceStateReader.getSourceState(pullConfig.metabaseConnectionId);
const mapping = syncState.mappings.find(
(m) => m.metabaseDatabaseId === pullConfig.metabaseDatabaseId && m.syncEnabled,

View file

@ -1,12 +1,17 @@
import type { KtxLocalProject, KtxProjectConnectionConfig } from '../../../project/index.js';
import { ktxLocalStateDbPath } from '../../../project/index.js';
import { resolveKtxConfigReference } from '../../../core/config-reference.js';
import { DEFAULT_METABASE_CLIENT_CONFIG, DefaultMetabaseConnectionClientFactory } from './client.js';
import {
DEFAULT_METABASE_CLIENT_CONFIG,
DefaultMetabaseConnectionClientFactory,
type MetabaseClientLogger,
} from './client.js';
import {
IngestMetabaseClientFactory,
type MetabaseClientConfig,
type MetabaseClientRuntimeConfig,
} from './client-port.js';
import type { MetabaseFetchLogger } from './fetch.js';
import { LocalMetabaseSourceStateReader } from './local-source-state-store.js';
import { MetabaseSourceAdapter } from './metabase.adapter.js';
@ -50,6 +55,7 @@ export function metabaseRuntimeConfigFromLocalConnection(
interface CreateLocalMetabaseSourceAdapterOptions {
env?: NodeJS.ProcessEnv;
defaultClientConfig?: MetabaseClientConfig;
logger?: MetabaseClientLogger & MetabaseFetchLogger;
}
export function createLocalMetabaseSourceAdapter(
@ -65,9 +71,11 @@ export function createLocalMetabaseSourceAdapter(
options.env,
),
options.defaultClientConfig ?? DEFAULT_METABASE_CLIENT_CONFIG,
options.logger,
);
return new MetabaseSourceAdapter({
clientFactory: new IngestMetabaseClientFactory(connectionFactory),
sourceStateReader,
...(options.logger ? { logger: options.logger } : {}),
});
}

View file

@ -4,7 +4,7 @@ import type { ChunkResult, DiffSet, FetchContext, ScopeDescriptor, SourceAdapter
import { chunkMetabaseStagedDir } from './chunk.js';
import type { MetabaseClientFactory } from './client-port.js';
import { detectMetabaseStagedDir } from './detect.js';
import { fetchMetabaseBundle } from './fetch.js';
import { fetchMetabaseBundle, type MetabaseFetchLogger } from './fetch.js';
import { computeFetchScope, hashScope, isPathInMetabaseScope } from './fetch-scope.js';
import type { MetabaseSourceStateReader } from './source-state-port.js';
import { STAGED_FILES, stagedSyncConfigSchema } from './types.js';
@ -12,6 +12,7 @@ import { STAGED_FILES, stagedSyncConfigSchema } from './types.js';
export interface MetabaseSourceAdapterDeps {
clientFactory: MetabaseClientFactory;
sourceStateReader: MetabaseSourceStateReader;
logger?: MetabaseFetchLogger;
}
export class MetabaseSourceAdapter implements SourceAdapter {
@ -31,6 +32,7 @@ export class MetabaseSourceAdapter implements SourceAdapter {
ctx,
clientFactory: this.deps.clientFactory,
sourceStateReader: this.deps.sourceStateReader,
...(this.deps.logger ? { logger: this.deps.logger } : {}),
});
}

View file

@ -89,12 +89,13 @@ describe('fetchNotionSnapshot', () => {
});
it('logs skipped page materialization failures', async () => {
const warn = vi.spyOn(console, 'warn').mockImplementation(() => undefined);
const logger = { warn: vi.fn() };
(client.retrievePage as ReturnType<typeof vi.fn>).mockRejectedValueOnce(new Error('Notion API failed'));
const manifest = await fetchNotionSnapshot({
client,
stagedDir,
logger,
config: {
authToken: 'secret',
crawlMode: 'selected_roots',
@ -109,7 +110,7 @@ describe('fetchNotionSnapshot', () => {
});
expect(manifest.skipped).toEqual([{ externalId: 'page-1', reason: 'Notion API failed' }]);
expect(warn).toHaveBeenCalledWith('Skipping Notion page page-1: Notion API failed');
expect(logger.warn).toHaveBeenCalledWith('Skipping Notion page page-1: Notion API failed');
});
it('recursively fetches selected-root child pages and derives scoped links', async () => {
@ -191,7 +192,7 @@ describe('fetchNotionSnapshot', () => {
});
it('truncates deeply nested block trees and records a warning', async () => {
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => undefined);
const logger = { warn: vi.fn() };
(client.listBlockChildren as ReturnType<typeof vi.fn>).mockImplementation((blockId: string) => {
const currentDepth = blockId === 'page-1' ? 0 : Number(blockId.replace('block-', ''));
const nextDepth = currentDepth + 1;
@ -215,6 +216,7 @@ describe('fetchNotionSnapshot', () => {
await fetchNotionSnapshot({
client,
stagedDir,
logger,
config: {
authToken: 'secret',
crawlMode: 'selected_roots',
@ -232,11 +234,11 @@ describe('fetchNotionSnapshot', () => {
const manifest = JSON.parse(await readFile(join(stagedDir, 'manifest.json'), 'utf-8'));
expect(blocks).toHaveLength(10);
expect(manifest.warnings).toContain('maxBlockDepth reached for page page-1 at depth 10');
expect(warnSpy).toHaveBeenCalledWith('maxBlockDepth reached for page page-1 at depth 10');
expect(logger.warn).toHaveBeenCalledWith('maxBlockDepth reached for page page-1 at depth 10');
});
it('truncates pages at the per-page block cap and records a warning', async () => {
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => undefined);
const logger = { warn: vi.fn() };
(client.listBlockChildren as ReturnType<typeof vi.fn>).mockResolvedValue({
results: Array.from({ length: 2001 }, (_, index) => ({
id: `block-${index}`,
@ -250,6 +252,7 @@ describe('fetchNotionSnapshot', () => {
await fetchNotionSnapshot({
client,
stagedDir,
logger,
config: {
authToken: 'secret',
crawlMode: 'selected_roots',
@ -267,7 +270,7 @@ describe('fetchNotionSnapshot', () => {
const manifest = JSON.parse(await readFile(join(stagedDir, 'manifest.json'), 'utf-8'));
expect(blocks).toHaveLength(2000);
expect(manifest.warnings).toContain('maxBlocksPerPage reached for page page-1 at 2000 blocks');
expect(warnSpy).toHaveBeenCalledWith('maxBlocksPerPage reached for page page-1 at 2000 blocks');
expect(logger.warn).toHaveBeenCalledWith('maxBlocksPerPage reached for page page-1 at 2000 blocks');
});
it('uses all_accessible search for pages and data sources', async () => {

View file

@ -12,10 +12,19 @@ import {
type NotionPullConfig,
} from './types.js';
export interface NotionFetchLogger {
warn(message: string): void;
}
const noopNotionFetchLogger: NotionFetchLogger = {
warn: () => undefined,
};
interface FetchNotionSnapshotParams {
client: NotionApi;
config: NotionPullConfig;
stagedDir: string;
logger?: NotionFetchLogger;
}
interface CrawlState {
@ -23,6 +32,7 @@ interface CrawlState {
databaseCount: number;
dataSourceCount: number;
capped: boolean;
logger: NotionFetchLogger;
skipped: Array<{ externalId: string; reason: string }>;
warnings: string[];
materializedPageTargets: Set<string>;
@ -44,9 +54,6 @@ interface NotionLinks {
const DEFAULT_MAX_BLOCK_DEPTH = 10;
const DEFAULT_MAX_BLOCKS_PER_PAGE = 2000;
const logger = {
warn: (message: string) => console.warn(message),
};
async function writeJson(path: string, value: unknown): Promise<void> {
await mkdir(dirname(path), { recursive: true });
@ -58,7 +65,12 @@ async function writeText(path: string, value: string): Promise<void> {
await writeFile(path, value.endsWith('\n') ? value : `${value}\n`, 'utf-8');
}
function addWarning(warnings: string[], warning: string, logWarning = false): void {
function addWarning(
warnings: string[],
warning: string,
logWarning = false,
logger: NotionFetchLogger = noopNotionFetchLogger,
): void {
if (!warnings.includes(warning)) {
warnings.push(warning);
if (logWarning) {
@ -119,11 +131,21 @@ async function visitPaginated<T>(params: {
} while (cursor);
}
function addBlockCountWarning(state: BlockCollectionState, warnings: string[], pageId: string): void {
function addBlockCountWarning(
state: BlockCollectionState,
warnings: string[],
pageId: string,
logger: NotionFetchLogger,
): void {
if (state.blockCountWarningWritten) {
return;
}
addWarning(warnings, `maxBlocksPerPage reached for page ${pageId} at ${DEFAULT_MAX_BLOCKS_PER_PAGE} blocks`, true);
addWarning(
warnings,
`maxBlocksPerPage reached for page ${pageId} at ${DEFAULT_MAX_BLOCKS_PER_PAGE} blocks`,
true,
logger,
);
state.blockCountWarningWritten = true;
}
@ -134,18 +156,19 @@ async function collectBlockChildren(params: {
depth: number;
warnings: string[];
state: BlockCollectionState;
logger: NotionFetchLogger;
}): Promise<void> {
let cursor: string | null = null;
do {
const remainingBlocks = DEFAULT_MAX_BLOCKS_PER_PAGE - params.state.blocks.length;
if (remainingBlocks <= 0) {
addBlockCountWarning(params.state, params.warnings, params.pageId);
addBlockCountWarning(params.state, params.warnings, params.pageId, params.logger);
return;
}
const page = await params.client.listBlockChildren(params.blockId, cursor, Math.min(remainingBlocks, 100));
for (let index = 0; index < page.results.length; index += 1) {
if (params.state.blocks.length >= DEFAULT_MAX_BLOCKS_PER_PAGE) {
addBlockCountWarning(params.state, params.warnings, params.pageId);
addBlockCountWarning(params.state, params.warnings, params.pageId, params.logger);
return;
}
@ -159,9 +182,10 @@ async function collectBlockChildren(params: {
params.warnings,
`maxBlockDepth reached for page ${params.pageId} at depth ${DEFAULT_MAX_BLOCK_DEPTH}`,
true,
params.logger,
);
} else if (params.state.blocks.length >= DEFAULT_MAX_BLOCKS_PER_PAGE) {
addBlockCountWarning(params.state, params.warnings, params.pageId);
addBlockCountWarning(params.state, params.warnings, params.pageId, params.logger);
return;
} else {
await collectBlockChildren({
@ -171,6 +195,7 @@ async function collectBlockChildren(params: {
depth: blockDepth,
warnings: params.warnings,
state: params.state,
logger: params.logger,
});
}
}
@ -179,7 +204,7 @@ async function collectBlockChildren(params: {
params.state.blocks.length >= DEFAULT_MAX_BLOCKS_PER_PAGE &&
(index < page.results.length - 1 || page.hasMore)
) {
addBlockCountWarning(params.state, params.warnings, params.pageId);
addBlockCountWarning(params.state, params.warnings, params.pageId, params.logger);
return;
}
}
@ -187,7 +212,12 @@ async function collectBlockChildren(params: {
} while (cursor);
}
async function collectBlockTree(client: NotionApi, pageId: string, warnings: string[]): Promise<NotionBlock[]> {
async function collectBlockTree(
client: NotionApi,
pageId: string,
warnings: string[],
logger: NotionFetchLogger,
): Promise<NotionBlock[]> {
const state: BlockCollectionState = { blocks: [], blockCountWarningWritten: false };
await collectBlockChildren({
client,
@ -196,6 +226,7 @@ async function collectBlockTree(client: NotionApi, pageId: string, warnings: str
depth: 0,
warnings,
state,
logger,
});
return state.blocks;
}
@ -341,7 +372,7 @@ async function materializePage(params: {
if (params.skipDataSourceRows && !params.dataSourceId && parentDataSourceId(page)) {
return;
}
const blocks = await collectBlockTree(params.client, params.pageId, params.state.warnings);
const blocks = await collectBlockTree(params.client, params.pageId, params.state.warnings, params.state.logger);
const metadata = normalizeNotionPageMetadata({
page,
fallbackPath: params.fallbackPath,
@ -374,7 +405,9 @@ async function materializePage(params: {
}
}
} catch (error) {
logger.warn(`Skipping Notion page ${params.pageId}: ${error instanceof Error ? error.message : String(error)}`);
params.state.logger.warn(
`Skipping Notion page ${params.pageId}: ${error instanceof Error ? error.message : String(error)}`,
);
params.state.skipped.push({
externalId: params.pageId,
reason: error instanceof Error ? error.message : String(error),
@ -491,6 +524,7 @@ async function materializeDatabase(params: {
export async function fetchNotionSnapshot(params: FetchNotionSnapshotParams): Promise<NotionManifest> {
await mkdir(params.stagedDir, { recursive: true });
const logger = params.logger ?? noopNotionFetchLogger;
const configuredCursor = params.config.crawlMode === 'all_accessible' ? parseConfiguredCursor(params.config) : null;
const continuedFromCursor = configuredCursor !== null;
const state: CrawlState = {
@ -498,6 +532,7 @@ export async function fetchNotionSnapshot(params: FetchNotionSnapshotParams): Pr
databaseCount: 0,
dataSourceCount: 0,
capped: false,
logger,
skipped: [],
warnings: [],
materializedPageTargets: new Set(),

View file

@ -14,7 +14,7 @@ import type {
import { chunkNotionStagedDir, describeNotionScope } from './chunk.js';
import { clusterNotionWorkUnits } from './cluster.js';
import { detectNotionStagedDir } from './detect.js';
import { fetchNotionSnapshot } from './fetch.js';
import { fetchNotionSnapshot, type NotionFetchLogger } from './fetch.js';
import { NotionClient } from './notion-client.js';
import { parseNotionPullConfig } from './pull-config.js';
import { type NotionMetadata, notionManifestSchema, notionMetadataSchema } from './types.js';
@ -31,6 +31,7 @@ interface NotionPullSucceededContext {
export interface NotionSourceAdapterDeps {
onPullSucceeded?: (ctx: NotionPullSucceededContext) => Promise<void>;
logger?: NotionFetchLogger;
}
export class NotionSourceAdapter implements SourceAdapter {
@ -48,7 +49,12 @@ export class NotionSourceAdapter implements SourceAdapter {
async fetch(pullConfig: unknown, stagedDir: string, _ctx: FetchContext): Promise<void> {
const config = parseNotionPullConfig(pullConfig);
await fetchNotionSnapshot({ client: new NotionClient(config.authToken), config, stagedDir });
await fetchNotionSnapshot({
client: new NotionClient(config.authToken),
config,
stagedDir,
...(this.deps.logger ? { logger: this.deps.logger } : {}),
});
}
chunk(stagedDir: string, diffSet?: DiffSet): Promise<ChunkResult> {

View file

@ -19,6 +19,7 @@ import {
} from './adapters/live-database/daemon-introspection.js';
import { LiveDatabaseSourceAdapter } from './adapters/live-database/live-database.adapter.js';
import { createDaemonLookerTableIdentifierParser } from './adapters/looker/daemon-table-identifier-parser.js';
import type { LookerClientLogger } from './adapters/looker/client.js';
import { DefaultLookerConnectionClientFactory } from './adapters/looker/factory.js';
import { createLocalLookerCredentialResolver } from './adapters/looker/local-looker.adapter.js';
import { LocalLookerRuntimeStore } from './adapters/looker/local-runtime-store.js';
@ -32,9 +33,12 @@ import type { LookerRuntimeClient } from './adapters/looker/fetch.js';
import { LookmlSourceAdapter } from './adapters/lookml/lookml.adapter.js';
import { pullConfigFromIntegrationConfig } from './adapters/lookml/pull-config.js';
import { createLocalMetabaseSourceAdapter } from './adapters/metabase/local-metabase.adapter.js';
import type { MetabaseClientLogger } from './adapters/metabase/client.js';
import type { MetabaseFetchLogger } from './adapters/metabase/fetch.js';
import { MetricflowSourceAdapter } from './adapters/metricflow/metricflow.adapter.js';
import { pullConfigFromMetricflowIntegration } from './adapters/metricflow/pull-config.js';
import { NotionSourceAdapter } from './adapters/notion/notion.adapter.js';
import type { NotionFetchLogger } from './adapters/notion/fetch.js';
import { seedLocalMappingStateFromKtxYaml } from './local-mapping-reconcile.js';
import type { SourceAdapter } from './types.js';
@ -56,14 +60,23 @@ export interface DefaultLocalIngestAdaptersOptions {
parser?: LookerTableIdentifierParser;
env?: NodeJS.ProcessEnv;
};
logger?: LocalIngestOperationalLogger;
}
type LocalIngestOperationalLogger = MetabaseClientLogger &
MetabaseFetchLogger &
LookerClientLogger &
NotionFetchLogger;
export function createDefaultLocalIngestAdapters(
project: KtxLocalProject,
options: DefaultLocalIngestAdaptersOptions = {},
): SourceAdapter[] {
const lookerConnectionFactory = new DefaultLookerConnectionClientFactory(
createLocalLookerCredentialResolver(project, options.looker?.env),
{
...(options.logger ? { logger: options.logger } : {}),
},
);
const adapters: SourceAdapter[] = [
@ -80,7 +93,9 @@ export function createDefaultLocalIngestAdapters(
homeDir: join(project.projectDir, '.ktx/cache'),
targetConnectionIds: primaryWarehouseConnectionIds(project),
}),
createLocalMetabaseSourceAdapter(project),
createLocalMetabaseSourceAdapter(project, {
...(options.logger ? { logger: options.logger } : {}),
}),
new LookerSourceAdapter({
clientFactory: {
async createClient(config, ctx) {
@ -92,7 +107,9 @@ export function createDefaultLocalIngestAdapters(
},
}),
new MetricflowSourceAdapter({ homeDir: join(project.projectDir, '.ktx/cache') }),
new NotionSourceAdapter(),
new NotionSourceAdapter({
...(options.logger ? { logger: options.logger } : {}),
}),
];
if (options.historicSql) {

View file

@ -53,7 +53,13 @@ describe('createLocalBundleIngestRuntime', () => {
project,
adapters: [new FakeSourceAdapter()],
}),
).toThrow('ktx dev ingest run requires llm.provider.backend: anthropic, vertex, or gateway, or an injected agentRunner');
).toThrow(
[
'ktx dev ingest run requires llm.provider.backend: anthropic, vertex, or gateway, or an injected agentRunner.',
`Configure an Anthropic provider, then rerun ingest:`,
` ktx setup --project-dir ${project.projectDir} --anthropic-api-key-env ANTHROPIC_API_KEY --anthropic-model claude-sonnet-4-6 --no-input`,
].join('\n'),
);
});
it('builds runner deps with local SQLite stores and context tools enabled', async () => {

View file

@ -548,6 +548,14 @@ function nextLocalJobId(): string {
return `local-${Date.now().toString(36)}`;
}
function localIngestLlmProviderGuardMessage(projectDir: string): string {
return [
'ktx dev ingest run requires llm.provider.backend: anthropic, vertex, or gateway, or an injected agentRunner.',
'Configure an Anthropic provider, then rerun ingest:',
` ktx setup --project-dir ${projectDir} --anthropic-api-key-env ANTHROPIC_API_KEY --anthropic-model claude-sonnet-4-6 --no-input`,
].join('\n');
}
function resolveAgentRunner(options: CreateLocalBundleIngestRuntimeOptions): {
agentRunner: AgentRunnerService;
llmProvider?: KtxLlmProvider;
@ -560,9 +568,7 @@ function resolveAgentRunner(options: CreateLocalBundleIngestRuntimeOptions): {
}
if (!llmProvider) {
throw new Error(
'ktx dev ingest run requires llm.provider.backend: anthropic, vertex, or gateway, or an injected agentRunner',
);
throw new Error(localIngestLlmProviderGuardMessage(options.project.projectDir));
}
return {

View file

@ -569,8 +569,8 @@ describe('local ingest', () => {
});
it('passes resolved standalone Notion config into fetch adapters', async () => {
const priorToken = process.env.NOTION_AUTH_TOKEN;
process.env.NOTION_AUTH_TOKEN = 'ntn_local_test_token';
const priorToken = process.env.NOTION_TOKEN;
process.env.NOTION_TOKEN = 'ntn_local_test_token';
try {
await writeFile(
join(project.projectDir, 'ktx.yaml'),
@ -579,7 +579,7 @@ describe('local ingest', () => {
'connections:',
' notion-main:',
' driver: notion',
' auth_token_ref: env:NOTION_AUTH_TOKEN',
' auth_token_ref: env:NOTION_TOKEN',
' crawl_mode: selected_roots',
' root_page_ids:',
' - page-1',
@ -667,9 +667,9 @@ describe('local ingest', () => {
});
} finally {
if (priorToken === undefined) {
delete process.env.NOTION_AUTH_TOKEN;
delete process.env.NOTION_TOKEN;
} else {
process.env.NOTION_AUTH_TOKEN = priorToken;
process.env.NOTION_TOKEN = priorToken;
}
}
});

View file

@ -37,6 +37,7 @@ interface BuiltMocks {
agentRunner: any;
slValidator: any;
toolsetFactory: any;
logger: any;
}
const buildMocks = (overrides: Partial<BuiltMocks> = {}): BuiltMocks => {
@ -131,6 +132,7 @@ const buildMocks = (overrides: Partial<BuiltMocks> = {}): BuiltMocks => {
getAllTools: vi.fn().mockReturnValue([]),
}),
},
logger: { log: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() },
};
return { ...defaults, ...overrides };
@ -179,6 +181,7 @@ const buildService = (mocks: BuiltMocks): MemoryAgentService =>
telemetry: {
trackMemoryIngestion: mocks.eventTracker.trackEvent,
},
logger: mocks.logger,
});
const baseInput = {
@ -238,6 +241,27 @@ describe('MemoryAgentService.ingest — session-branch orchestration', () => {
expect(result.commitHash).toBe('cafebabe');
});
it('logs prompt debug output when KTX_MEMORY_AGENT_DEBUG_PROMPTS is enabled', async () => {
const previousDebugPrompts = process.env.KTX_MEMORY_AGENT_DEBUG_PROMPTS;
const mocks = buildMocks();
const svc = buildService(mocks);
try {
process.env.KTX_MEMORY_AGENT_DEBUG_PROMPTS = '1';
await svc.ingest(baseInput);
expect(mocks.logger.debug).toHaveBeenCalledWith(expect.stringContaining('[memory-agent prompt-debug] system='));
expect(mocks.logger.debug).toHaveBeenCalledWith(expect.stringContaining('[memory-agent prompt-debug] user='));
} finally {
if (previousDebugPrompts === undefined) {
delete process.env.KTX_MEMORY_AGENT_DEBUG_PROMPTS;
} else {
process.env.KTX_MEMORY_AGENT_DEBUG_PROMPTS = previousDebugPrompts;
}
}
});
it('empty path: squash returns no touched paths → no enqueue, cleanup(empty), commitHash=null', async () => {
const mocks = buildMocks();
mocks.gitService.squashMergeIntoMain.mockResolvedValue({

View file

@ -192,7 +192,7 @@ export class MemoryAgentService {
`[memory-agent] chat=${chatId} running (sourceType=${sourceType}, hasSL=${hasSL}, budget=${stepBudget}, model=${modelName})${signalsSuffix}${dialectSuffix}`,
);
if (process.env.MEMORY_AGENT_DEBUG_PROMPTS === '1') {
if (process.env.KTX_MEMORY_AGENT_DEBUG_PROMPTS === '1') {
this.logger.debug(`[memory-agent prompt-debug] system=${systemPrompt}`);
this.logger.debug(`[memory-agent prompt-debug] user=${prompt}`);
}

View file

@ -51,6 +51,29 @@ function createLlmProvider(text = 'generated description') {
} as any;
}
function createFailingLlmProvider(message = 'timeout exceeded when trying to connect') {
vi.mocked(generateText).mockRejectedValue(new Error(message) as never);
return {
getModel: vi.fn().mockReturnValue({ modelId: 'claude-sonnet-4-6', provider: 'anthropic' }),
getModelByName: vi.fn(),
cacheMarker: vi.fn(),
repairToolCallHandler: vi.fn(),
thinkingProviderOptions: vi.fn(),
telemetryConfig: vi.fn(),
promptCachingConfig: vi.fn(() => ({
enabled: false,
systemTtl: '1h',
toolsTtl: '1h',
historyTtl: '5m',
cacheSystem: true,
cacheTools: true,
cacheHistory: true,
vertexFallbackTo5m: false,
})),
activeBackend: vi.fn(() => 'anthropic'),
} as any;
}
function createConnector(): KtxScanConnector {
return {
id: 'test-connector',
@ -274,6 +297,51 @@ describe('KtxDescriptionGenerator', () => {
expect('introspect' in sampler).toBe(false);
});
it('does not turn LLM failures into generated descriptions', async () => {
const cache = createCache();
const connector = createConnector();
const generator = new KtxDescriptionGenerator({
llmProvider: createFailingLlmProvider(),
cache,
settings: {
columnMaxWords: 12,
tableMaxWords: 18,
dataSourceMaxWords: 24,
},
});
const columnResult = await generator.generateColumnDescriptions({
connectionId: 'conn-1',
connector,
context: { runId: 'run-1' },
dataSourceType: 'POSTGRESQL',
supportsNestedAnalysis: false,
table: {
catalog: null,
db: 'public',
name: 'orders',
columns: [{ name: 'status' }],
},
});
await expect(
generator.generateTableDescription({
connectionId: 'conn-1',
connector,
context: { runId: 'run-1' },
dataSourceType: 'POSTGRESQL',
table: { catalog: null, db: 'public', name: 'orders' },
}),
).resolves.toBeNull();
expect(columnResult).toEqual({
columnDescriptions: [['status', null]],
processedColumns: [],
skippedColumns: [],
});
expect(cache.set).not.toHaveBeenCalled();
});
it('generates and caches table and data-source descriptions', async () => {
const cache = createCache();
const connector = createConnector();

View file

@ -348,7 +348,7 @@ export class KtxDescriptionGenerator {
};
}
async generateTableDescription(input: KtxGenerateTableDescriptionInput): Promise<string> {
async generateTableDescription(input: KtxGenerateTableDescriptionInput): Promise<string | null> {
const tableRef = toTableRef(input.table);
const cacheKey = this.cache?.buildTableKey(tableRef);
if (cacheKey) {
@ -386,7 +386,7 @@ export class KtxDescriptionGenerator {
this.settings.tableMaxWords,
'ktx-table-description',
);
if (cacheKey) {
if (cacheKey && description) {
await this.cache?.set(cacheKey, description);
}
return description;
@ -396,7 +396,7 @@ export class KtxDescriptionGenerator {
}
}
async generateDataSourceDescription(input: KtxGenerateDataSourceDescriptionInput): Promise<string> {
async generateDataSourceDescription(input: KtxGenerateDataSourceDescriptionInput): Promise<string | null> {
if (input.tables.length === 0) {
return 'No tables found in database';
}
@ -451,7 +451,7 @@ export class KtxDescriptionGenerator {
this.settings.dataSourceMaxWords,
'ktx-data-source-description',
);
if (cacheKey) {
if (cacheKey && description) {
await this.cache?.set(cacheKey, description);
}
return description;
@ -543,7 +543,7 @@ export class KtxDescriptionGenerator {
'ktx-column-description',
);
if (cacheKey) {
if (cacheKey && description) {
await this.cache?.set(cacheKey, description);
}
@ -551,20 +551,20 @@ export class KtxDescriptionGenerator {
columnName: column.name,
description,
skipped: false,
processed: true,
processed: description !== null,
};
} catch (error) {
this.logger?.error(`Error analyzing column '${column.name}': ${errorMessage(error)}`);
return {
columnName: column.name,
description: `Error generating description: ${errorMessage(error)}`,
description: null,
skipped: false,
processed: false,
};
}
}
private async generateAiDescription(prompt: string, maxWords: number, _operationName: string): Promise<string> {
private async generateAiDescription(prompt: string, maxWords: number, _operationName: string): Promise<string | null> {
try {
const text = await generateKtxText({
llmProvider: this.llmProvider,
@ -573,10 +573,10 @@ export class KtxDescriptionGenerator {
temperature: this.settings.temperature,
});
const description = text.trim();
return description || 'Failed to generate description';
return description || null;
} catch (error) {
this.logger?.error(`Error generating AI description: ${errorMessage(error)}`);
return `Error generating description: ${errorMessage(error)}`;
return null;
}
}
}

View file

@ -553,6 +553,47 @@ describe('writeLocalScanEnrichmentArtifacts', () => {
});
});
it('does not persist generated error descriptions in manifest shards', async () => {
await writeLocalScanManifestShards({
project,
connectionId: 'warehouse',
syncId: 'sync-error-description',
driver: 'postgres',
snapshot,
descriptionUpdates: [
{
table: { catalog: null, db: 'public', name: 'orders' },
tableDescription: 'Error generating description: timeout exceeded when trying to connect',
columnDescriptions: {
id: 'Error generating description: timeout exceeded when trying to connect',
customer_id: 'AI customer reference',
},
},
],
dryRun: false,
});
const shard = YAML.parse(
await readFile(join(tempDir, 'project/semantic-layer/warehouse/_schema/public.yaml'), 'utf8'),
) as {
tables: {
orders: {
descriptions?: Record<string, string>;
columns: Array<{ name: string; descriptions?: Record<string, string> }>;
};
};
};
expect(shard.tables.orders.descriptions).toEqual({ db: 'DB orders table' });
expect(shard.tables.orders.columns.find((column) => column.name === 'id')?.descriptions).toEqual({
db: 'DB order id',
});
expect(shard.tables.orders.columns.find((column) => column.name === 'customer_id')?.descriptions).toEqual({
db: 'DB customer id',
ai: 'AI customer reference',
});
});
it('writes accepted composite relationships to relationship artifacts and manifest shards', async () => {
const compositeSnapshot: KtxSchemaSnapshot = {
connectionId: 'warehouse',

View file

@ -62,6 +62,14 @@ interface ExistingManifestState {
type LocalDescriptionUpdates = KtxLocalScanEnrichmentResult['descriptionUpdates'];
function isGeneratedErrorDescription(description: string | null | undefined): boolean {
const normalized = description?.trim().toLowerCase();
return (
normalized === 'failed to generate description' ||
normalized?.startsWith('error generating description:') === true
);
}
function artifactDir(connectionId: string, syncId: string): string {
return `raw-sources/${connectionId}/${LIVE_DATABASE_ADAPTER}/${syncId}/enrichment`;
}
@ -79,7 +87,7 @@ function tableDescription(
if (table.comment) {
descriptions.db = table.comment;
}
if (update?.tableDescription) {
if (update?.tableDescription && !isGeneratedErrorDescription(update.tableDescription)) {
descriptions.ai = update.tableDescription;
}
return Object.keys(descriptions).length > 0 ? descriptions : undefined;
@ -96,7 +104,7 @@ function columnDescription(
if (column.comment) {
descriptions.db = column.comment;
}
if (aiDescription) {
if (aiDescription && !isGeneratedErrorDescription(aiDescription)) {
descriptions.ai = aiDescription;
}
return Object.keys(descriptions).length > 0 ? descriptions : undefined;

View file

@ -427,6 +427,69 @@ describe('local scan enrichment', () => {
expect(result.relationships).toEqual({ accepted: 0, review: 1, rejected: 0, skipped: 0 });
});
it('generates table descriptions with bounded table-level concurrency', async () => {
const concurrentSnapshot: KtxSchemaSnapshot = {
...snapshot,
tables: Array.from({ length: 8 }, (_, index) => ({
catalog: null,
db: 'public',
name: `table_${index + 1}`,
kind: 'table' as const,
comment: null,
estimatedRows: 2,
foreignKeys: [],
columns: [
{
name: 'id',
nativeType: 'integer',
normalizedType: 'integer',
dimensionType: 'number' as const,
nullable: false,
primaryKey: true,
comment: null,
},
],
})),
};
let activeColumnSamples = 0;
let maxActiveColumnSamples = 0;
const scanConnector = {
...connector(),
introspect: vi.fn(async () => concurrentSnapshot),
sampleColumn: vi.fn(async () => {
activeColumnSamples += 1;
maxActiveColumnSamples = Math.max(maxActiveColumnSamples, activeColumnSamples);
await new Promise((resolve) => setTimeout(resolve, 10));
activeColumnSamples -= 1;
return {
values: ['1'],
nullCount: 0,
distinctCount: 1,
};
}),
sampleTable: vi.fn(async () => ({
headers: ['id'],
rows: [[1]],
totalRows: 1,
})),
};
const settings = {
...buildDefaultKtxProjectConfig('test').scan.relationships,
enabled: false,
};
await runLocalScanEnrichment({
connectionId: 'warehouse',
mode: 'enriched',
connector: scanConnector,
context: { runId: 'scan-run-concurrent-descriptions' },
providers: createDeterministicLocalScanEnrichmentProviders({ embeddingDimensions: 3 }),
relationshipSettings: settings,
});
expect(maxActiveColumnSamples).toBe(6);
});
it('reports enrichment progress for countable stages', async () => {
const events: Array<{ progress: number; message?: string; transient?: boolean }> = [];
const progress = {
@ -713,7 +776,7 @@ describe('local scan enrichment', () => {
model: 'provider/embedding-model',
dimensions: 1536,
batchSize: 8,
openai: { api_key: 'env:OPENAI_API_KEY' },
openai: { api_key: 'env:OPENAI_API_KEY' }, // pragma: allowlist secret
},
},
{
@ -726,7 +789,7 @@ describe('local scan enrichment', () => {
{
createKtxLlmProvider: createKtxLlmProvider as any,
createKtxEmbeddingProvider: createKtxEmbeddingProvider as any,
env: { OPENAI_API_KEY: 'openai-key' },
env: { OPENAI_API_KEY: 'openai-key' }, // pragma: allowlist secret
},
);

View file

@ -1,4 +1,5 @@
import type { KtxLlmProvider } from '@ktx/llm';
import pLimit from 'p-limit';
import { buildDefaultKtxProjectConfig, type KtxScanRelationshipConfig } from '../project/config.js';
import { type KtxDescriptionColumnTable, KtxDescriptionGenerator } from './description-generation.js';
import { buildKtxColumnEmbeddingText } from './embedding-text.js';
@ -40,6 +41,8 @@ import type {
KtxTableRef,
} from './types.js';
const DESCRIPTION_TABLE_CONCURRENCY = 6;
export interface DeterministicLocalScanEnrichmentProviderOptions {
embeddingDimensions?: number;
maxBatchSize?: number;
@ -322,41 +325,47 @@ async function generateDescriptions(input: {
await input.progress?.update(1, 'No tables to describe');
return updates;
}
for (const [index, table] of input.snapshot.tables.entries()) {
await input.progress?.update(
(index + 1) / totalTables,
`Generating descriptions ${index + 1}/${totalTables} tables`,
{
transient: true,
},
);
const tableInput = descriptionTable(table);
const columnResult = await generator.generateColumnDescriptions({
connectionId: input.snapshot.connectionId,
connector: input.connector,
context: input.context,
dataSourceType: input.snapshot.driver,
supportsNestedAnalysis: input.connector.capabilities.nestedAnalysis,
table: tableInput,
});
const tableDescription = await generator.generateTableDescription({
connectionId: input.snapshot.connectionId,
connector: input.connector,
context: input.context,
dataSourceType: input.snapshot.driver,
table: {
catalog: table.catalog,
db: table.db,
name: table.name,
rawDescriptions: table.comment ? { db: table.comment } : {},
},
});
updates.push({
table: tableRef(table),
tableDescription,
columnDescriptions: Object.fromEntries(columnResult.columnDescriptions),
});
}
const limitTable = pLimit(DESCRIPTION_TABLE_CONCURRENCY);
const tableUpdates = await Promise.all(
input.snapshot.tables.map((table, index) =>
limitTable(async () => {
await input.progress?.update(
(index + 1) / totalTables,
`Generating descriptions ${index + 1}/${totalTables} tables`,
{
transient: true,
},
);
const tableInput = descriptionTable(table);
const columnResult = await generator.generateColumnDescriptions({
connectionId: input.snapshot.connectionId,
connector: input.connector,
context: input.context,
dataSourceType: input.snapshot.driver,
supportsNestedAnalysis: input.connector.capabilities.nestedAnalysis,
table: tableInput,
});
const tableDescription = await generator.generateTableDescription({
connectionId: input.snapshot.connectionId,
connector: input.connector,
context: input.context,
dataSourceType: input.snapshot.driver,
table: {
catalog: table.catalog,
db: table.db,
name: table.name,
rawDescriptions: table.comment ? { db: table.comment } : {},
},
});
return {
table: tableRef(table),
tableDescription,
columnDescriptions: Object.fromEntries(columnResult.columnDescriptions),
};
}),
),
);
updates.push(...tableUpdates);
await input.progress?.update(1, `Generated descriptions for ${totalTables} tables`);
return updates;
}

View file

@ -38,6 +38,31 @@ const baseTable: SemanticLayerSource = {
measures: [],
};
describe('listConnectionIdsWithNames', () => {
it('discovers local KTX connection ids from semantic-layer directories', async () => {
const configService = {
listFiles: vi.fn().mockResolvedValue({
files: [
'semantic-layer/warehouse/_schema/public.yaml',
'semantic-layer/dbt-main/orders.yaml',
'semantic-layer/.gitkeep',
],
}),
};
const catalog = connectionCatalog();
catalog.listEnabledConnections.mockImplementation(async (ids: string[]) =>
ids.map((id) => ({ id, name: id, connectionType: id === 'warehouse' ? 'postgres' : 'dbt' })),
);
const service = new SemanticLayerService(configService as never, catalog, pythonPort);
await expect(service.listConnectionIdsWithNames()).resolves.toEqual([
{ id: 'dbt-main', name: 'dbt-main', connectionType: 'dbt' },
{ id: 'warehouse', name: 'warehouse', connectionType: 'postgres' },
]);
expect(catalog.listEnabledConnections).toHaveBeenCalledWith(['dbt-main', 'warehouse']);
});
});
describe('composeOverlay', () => {
it('carries top-level segments from overlay into the composed source', () => {
const overlay = {

View file

@ -12,6 +12,7 @@ interface WriteSourceOptions {
}
const SL_DIR_PREFIX = 'semantic-layer';
const CONNECTION_ID_PATTERN = /^[a-zA-Z0-9][a-zA-Z0-9_-]*$/;
function formatPortError(error: unknown, fallback: string): string {
if (typeof error === 'string') {
@ -61,11 +62,12 @@ export class SemanticLayerService {
async listConnectionIds(): Promise<string[]> {
try {
const result = await this.configService.listFiles(SL_DIR_PREFIX);
// Directories under semantic-layer/ are connectionIds (UUIDs)
const uuidPattern = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
// Directories under semantic-layer/ are connectionIds. Local KTX projects use
// readable ids like "warehouse" and "dbt-main", not only UUIDs.
return result.files
.map((f) => f.replace(`${SL_DIR_PREFIX}/`, '').split('/')[0])
.filter((name, i, arr) => uuidPattern.test(name) && arr.indexOf(name) === i);
.filter((name, i, arr) => CONNECTION_ID_PATTERN.test(name) && arr.indexOf(name) === i)
.sort();
} catch {
return [];
}

View file

@ -0,0 +1,80 @@
import { describe, expect, it, vi } from 'vitest';
import type { ToolContext, ToolSession } from '../../tools/index.js';
import { createTouchedSlSources } from '../../tools/index.js';
import type { SemanticLayerSource } from '../types.js';
import { SlDiscoverTool } from './sl-discover.tool.js';
function makeTool() {
const semanticLayerService = {
listConnectionIdsWithNames: vi.fn(async () => [] as Array<{ id: string; name: string; connectionType: string }>),
loadAllSources: vi.fn(async () => [] as SemanticLayerSource[]),
};
const slSearchService = {
search: vi.fn(async () => []),
};
const tool = new SlDiscoverTool(
{
semanticLayerService: semanticLayerService as never,
slSearchService: slSearchService as never,
authorResolver: { resolve: vi.fn() },
},
{ maxSources: 25, minRrfScore: 0, maxDetailedSources: 5 },
);
return { tool, semanticLayerService, slSearchService };
}
function makeContext(overrides: Partial<ToolContext> = {}): ToolContext {
return {
sourceId: 'src',
messageId: 'msg',
userId: 'user',
...overrides,
};
}
function makeSession(semanticLayerService: Record<string, unknown>): ToolSession {
return {
connectionId: 'dbt-main',
isWorktreeScoped: true,
preHead: 'base',
touchedSlSources: createTouchedSlSources(),
actions: [],
semanticLayerService: semanticLayerService as never,
wikiService: {} as never,
configService: {} as never,
gitService: {} as never,
};
}
describe('SlDiscoverTool - session-scoped reads', () => {
it('discovers sources through context.session.semanticLayerService when a session is present', async () => {
const { tool, semanticLayerService } = makeTool();
const sessionSemanticLayerService = {
listConnectionIdsWithNames: vi.fn().mockResolvedValue([
{ id: 'warehouse', name: 'warehouse', connectionType: 'postgres' },
]),
loadAllSources: vi.fn().mockResolvedValue([
{
name: 'orders',
table: 'public.orders',
grain: ['order_id'],
columns: [{ name: 'order_id', type: 'string' }],
measures: [],
joins: [],
},
]),
};
const result = await tool.call({}, makeContext({ session: makeSession(sessionSemanticLayerService) }));
expect(result.structured.totalSources).toBe(1);
expect(result.structured.sources[0]).toMatchObject({
connectionId: 'warehouse',
name: 'orders',
columnCount: 1,
});
expect(sessionSemanticLayerService.listConnectionIdsWithNames).toHaveBeenCalled();
expect(sessionSemanticLayerService.loadAllSources).toHaveBeenCalledWith('warehouse');
expect(semanticLayerService.listConnectionIdsWithNames).not.toHaveBeenCalled();
});
});

View file

@ -1,5 +1,6 @@
import { z } from 'zod';
import { DEFAULT_PRIORITY, resolveDescription } from '../descriptions.js';
import type { SemanticLayerService } from '../semantic-layer.service.js';
import type { SemanticLayerSource } from '../types.js';
import type { ToolContext, ToolOutput } from '../../tools/index.js';
import { BaseSemanticLayerTool, type BaseSemanticLayerToolDeps } from './base-semantic-layer.tool.js';
@ -66,13 +67,14 @@ Use this to understand what data is available before writing a semantic_query.
return slDiscoverInputSchema;
}
async call(input: SlDiscoverInput, _context: ToolContext): Promise<ToolOutput<SlDiscoverStructured>> {
async call(input: SlDiscoverInput, context: ToolContext): Promise<ToolOutput<SlDiscoverStructured>> {
const { query, sourceName } = input;
const semanticLayerService = context.session?.semanticLayerService ?? this.semanticLayerService;
// Resolve connectionId: use provided value, or auto-detect
let connectionId = input.connectionId;
if (!connectionId) {
const connections = await this.semanticLayerService.listConnectionIdsWithNames();
const connections = await semanticLayerService.listConnectionIdsWithNames();
if (connections.length === 0) {
return {
markdown: 'No semantic layer sources found. Run a schema scan first.',
@ -92,14 +94,14 @@ Use this to understand what data is available before writing a semantic_query.
structured: { sources: [], totalSources: 0 },
};
}
return this.discoverAcrossConnections(connections, query);
return this.discoverAcrossConnections(semanticLayerService, connections, query);
}
}
// If inspecting a specific source — show the SL interface (columns, measures, joins)
// without the raw SQL. Use `sl_read_source` to see the full YAML including SQL.
if (sourceName) {
const sources = await this.semanticLayerService.loadAllSources(connectionId);
const sources = await semanticLayerService.loadAllSources(connectionId);
const source = sources.find((s) => s.name === sourceName);
if (!source) {
return {
@ -136,19 +138,20 @@ Use this to understand what data is available before writing a semantic_query.
}
// Single connection: list all sources
const connections = await this.semanticLayerService.listConnectionIdsWithNames();
const connections = await semanticLayerService.listConnectionIdsWithNames();
const connInfo = connections.find((c) => c.id === connectionId);
return this.discoverForConnection(connectionId, connInfo?.name ?? connectionId, query);
return this.discoverForConnection(semanticLayerService, connectionId, connInfo?.name ?? connectionId, query);
}
private async discoverAcrossConnections(
semanticLayerService: SemanticLayerService,
connections: Array<{ id: string; name: string; connectionType: string }>,
query?: string,
): Promise<ToolOutput<SlDiscoverStructured>> {
// Load sources from all connections in parallel
const results = await Promise.all(
connections.map(async (conn) => {
const sources = await this.semanticLayerService.loadAllSources(conn.id);
const sources = await semanticLayerService.loadAllSources(conn.id);
let filtered = sources;
if (query) {
filtered = await this.filterByQuery(conn.id, sources, query);
@ -205,11 +208,12 @@ Use this to understand what data is available before writing a semantic_query.
}
private async discoverForConnection(
semanticLayerService: SemanticLayerService,
connectionId: string,
connectionName: string,
query?: string,
): Promise<ToolOutput<SlDiscoverStructured>> {
const sources = await this.semanticLayerService.loadAllSources(connectionId);
const sources = await semanticLayerService.loadAllSources(connectionId);
if (sources.length === 0) {
return {