fix: allow dbt ingest to discover warehouse schemas (#20)

Co-authored-by: Andrey Avtomonov <7889985+andreybavt@users.noreply.github.com>
This commit is contained in:
Andrey Avtomonov 2026-05-12 10:25:56 +02:00 committed by GitHub
parent 106ce161ee
commit 9d3b1015cc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 122 additions and 11 deletions

View file

@ -38,6 +38,31 @@ const baseTable: SemanticLayerSource = {
measures: [],
};
describe('listConnectionIdsWithNames', () => {
it('discovers local KTX connection ids from semantic-layer directories', async () => {
const configService = {
listFiles: vi.fn().mockResolvedValue({
files: [
'semantic-layer/warehouse/_schema/public.yaml',
'semantic-layer/dbt-main/orders.yaml',
'semantic-layer/.gitkeep',
],
}),
};
const catalog = connectionCatalog();
catalog.listEnabledConnections.mockImplementation(async (ids: string[]) =>
ids.map((id) => ({ id, name: id, connectionType: id === 'warehouse' ? 'postgres' : 'dbt' })),
);
const service = new SemanticLayerService(configService as never, catalog, pythonPort);
await expect(service.listConnectionIdsWithNames()).resolves.toEqual([
{ id: 'dbt-main', name: 'dbt-main', connectionType: 'dbt' },
{ id: 'warehouse', name: 'warehouse', connectionType: 'postgres' },
]);
expect(catalog.listEnabledConnections).toHaveBeenCalledWith(['dbt-main', 'warehouse']);
});
});
describe('composeOverlay', () => {
it('carries top-level segments from overlay into the composed source', () => {
const overlay = {

View file

@ -12,6 +12,7 @@ interface WriteSourceOptions {
}
const SL_DIR_PREFIX = 'semantic-layer';
const CONNECTION_ID_PATTERN = /^[a-zA-Z0-9][a-zA-Z0-9_-]*$/;
function formatPortError(error: unknown, fallback: string): string {
if (typeof error === 'string') {
@ -61,11 +62,12 @@ export class SemanticLayerService {
async listConnectionIds(): Promise<string[]> {
try {
const result = await this.configService.listFiles(SL_DIR_PREFIX);
// Directories under semantic-layer/ are connectionIds (UUIDs)
const uuidPattern = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
// Directories under semantic-layer/ are connectionIds. Local KTX projects use
// readable ids like "warehouse" and "dbt-main", not only UUIDs.
return result.files
.map((f) => f.replace(`${SL_DIR_PREFIX}/`, '').split('/')[0])
.filter((name, i, arr) => uuidPattern.test(name) && arr.indexOf(name) === i);
.filter((name, i, arr) => CONNECTION_ID_PATTERN.test(name) && arr.indexOf(name) === i)
.sort();
} catch {
return [];
}

View file

@ -0,0 +1,80 @@
import { describe, expect, it, vi } from 'vitest';
import type { ToolContext, ToolSession } from '../../tools/index.js';
import { createTouchedSlSources } from '../../tools/index.js';
import type { SemanticLayerSource } from '../types.js';
import { SlDiscoverTool } from './sl-discover.tool.js';
function makeTool() {
const semanticLayerService = {
listConnectionIdsWithNames: vi.fn(async () => [] as Array<{ id: string; name: string; connectionType: string }>),
loadAllSources: vi.fn(async () => [] as SemanticLayerSource[]),
};
const slSearchService = {
search: vi.fn(async () => []),
};
const tool = new SlDiscoverTool(
{
semanticLayerService: semanticLayerService as never,
slSearchService: slSearchService as never,
authorResolver: { resolve: vi.fn() },
},
{ maxSources: 25, minRrfScore: 0, maxDetailedSources: 5 },
);
return { tool, semanticLayerService, slSearchService };
}
function makeContext(overrides: Partial<ToolContext> = {}): ToolContext {
return {
sourceId: 'src',
messageId: 'msg',
userId: 'user',
...overrides,
};
}
function makeSession(semanticLayerService: Record<string, unknown>): ToolSession {
return {
connectionId: 'dbt-main',
isWorktreeScoped: true,
preHead: 'base',
touchedSlSources: createTouchedSlSources(),
actions: [],
semanticLayerService: semanticLayerService as never,
wikiService: {} as never,
configService: {} as never,
gitService: {} as never,
};
}
describe('SlDiscoverTool - session-scoped reads', () => {
it('discovers sources through context.session.semanticLayerService when a session is present', async () => {
const { tool, semanticLayerService } = makeTool();
const sessionSemanticLayerService = {
listConnectionIdsWithNames: vi.fn().mockResolvedValue([
{ id: 'warehouse', name: 'warehouse', connectionType: 'postgres' },
]),
loadAllSources: vi.fn().mockResolvedValue([
{
name: 'orders',
table: 'public.orders',
grain: ['order_id'],
columns: [{ name: 'order_id', type: 'string' }],
measures: [],
joins: [],
},
]),
};
const result = await tool.call({}, makeContext({ session: makeSession(sessionSemanticLayerService) }));
expect(result.structured.totalSources).toBe(1);
expect(result.structured.sources[0]).toMatchObject({
connectionId: 'warehouse',
name: 'orders',
columnCount: 1,
});
expect(sessionSemanticLayerService.listConnectionIdsWithNames).toHaveBeenCalled();
expect(sessionSemanticLayerService.loadAllSources).toHaveBeenCalledWith('warehouse');
expect(semanticLayerService.listConnectionIdsWithNames).not.toHaveBeenCalled();
});
});

View file

@ -1,5 +1,6 @@
import { z } from 'zod';
import { DEFAULT_PRIORITY, resolveDescription } from '../descriptions.js';
import type { SemanticLayerService } from '../semantic-layer.service.js';
import type { SemanticLayerSource } from '../types.js';
import type { ToolContext, ToolOutput } from '../../tools/index.js';
import { BaseSemanticLayerTool, type BaseSemanticLayerToolDeps } from './base-semantic-layer.tool.js';
@ -66,13 +67,14 @@ Use this to understand what data is available before writing a semantic_query.
return slDiscoverInputSchema;
}
async call(input: SlDiscoverInput, _context: ToolContext): Promise<ToolOutput<SlDiscoverStructured>> {
async call(input: SlDiscoverInput, context: ToolContext): Promise<ToolOutput<SlDiscoverStructured>> {
const { query, sourceName } = input;
const semanticLayerService = context.session?.semanticLayerService ?? this.semanticLayerService;
// Resolve connectionId: use provided value, or auto-detect
let connectionId = input.connectionId;
if (!connectionId) {
const connections = await this.semanticLayerService.listConnectionIdsWithNames();
const connections = await semanticLayerService.listConnectionIdsWithNames();
if (connections.length === 0) {
return {
markdown: 'No semantic layer sources found. Run a schema scan first.',
@ -92,14 +94,14 @@ Use this to understand what data is available before writing a semantic_query.
structured: { sources: [], totalSources: 0 },
};
}
return this.discoverAcrossConnections(connections, query);
return this.discoverAcrossConnections(semanticLayerService, connections, query);
}
}
// If inspecting a specific source — show the SL interface (columns, measures, joins)
// without the raw SQL. Use `sl_read_source` to see the full YAML including SQL.
if (sourceName) {
const sources = await this.semanticLayerService.loadAllSources(connectionId);
const sources = await semanticLayerService.loadAllSources(connectionId);
const source = sources.find((s) => s.name === sourceName);
if (!source) {
return {
@ -136,19 +138,20 @@ Use this to understand what data is available before writing a semantic_query.
}
// Single connection: list all sources
const connections = await this.semanticLayerService.listConnectionIdsWithNames();
const connections = await semanticLayerService.listConnectionIdsWithNames();
const connInfo = connections.find((c) => c.id === connectionId);
return this.discoverForConnection(connectionId, connInfo?.name ?? connectionId, query);
return this.discoverForConnection(semanticLayerService, connectionId, connInfo?.name ?? connectionId, query);
}
private async discoverAcrossConnections(
semanticLayerService: SemanticLayerService,
connections: Array<{ id: string; name: string; connectionType: string }>,
query?: string,
): Promise<ToolOutput<SlDiscoverStructured>> {
// Load sources from all connections in parallel
const results = await Promise.all(
connections.map(async (conn) => {
const sources = await this.semanticLayerService.loadAllSources(conn.id);
const sources = await semanticLayerService.loadAllSources(conn.id);
let filtered = sources;
if (query) {
filtered = await this.filterByQuery(conn.id, sources, query);
@ -205,11 +208,12 @@ Use this to understand what data is available before writing a semantic_query.
}
private async discoverForConnection(
semanticLayerService: SemanticLayerService,
connectionId: string,
connectionName: string,
query?: string,
): Promise<ToolOutput<SlDiscoverStructured>> {
const sources = await this.semanticLayerService.loadAllSources(connectionId);
const sources = await semanticLayerService.loadAllSources(connectionId);
if (sources.length === 0) {
return {