ktx/packages/context/src/sl/tools/sl-discover.tool.ts
Andrey Avtomonov cb8902f1e5
fix(context): merge overlay columns onto manifest columns by name (#94)
* fix(context): merge overlay columns onto manifest columns by name

composeOverlay was appending overlay columns to the manifest column list,
producing duplicate entries when dbt/metabase overlays declared a column
just to attach descriptions. The duplicates carried no `type`, so the
pydantic SourceDefinition rejected them at semantic-query time and broke
`ktx sl query` for every overlay-backed measure. Now overlay columns
match base columns by name (case-insensitive): same-name entries merge
onto the manifest (overlay fields win, type/role fall back to the base,
descriptions merge per source key) and only new names append.
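
A minimal sketch of the merge rule described above, for illustration only. The `Column` shape and the `mergeColumnsByName` helper are hypothetical stand-ins, not the actual `composeOverlay` code, and keeping the manifest's spelling of the name is an assumption.

```typescript
// Hypothetical, simplified column shape; the real types live in the ktx codebase.
interface Column {
  name: string;
  type?: string;
  role?: string;
  descriptions?: Record<string, string>;
}

// Merge overlay columns onto manifest (base) columns by case-insensitive name.
// Same-name entries merge (overlay fields win, type/role fall back to the base,
// descriptions merge per source key); only unseen names are appended.
function mergeColumnsByName(base: Column[], overlay: Column[]): Column[] {
  const merged: Column[] = base.map((c) => ({ ...c }));
  const indexByName = new Map<string, number>();
  merged.forEach((c, i) => indexByName.set(c.name.toLowerCase(), i));

  for (const o of overlay) {
    const key = o.name.toLowerCase();
    const i = indexByName.get(key);
    if (i === undefined) {
      // New name: append instead of merging.
      indexByName.set(key, merged.length);
      merged.push({ ...o });
      continue;
    }
    const b = merged[i];
    merged[i] = {
      ...b,
      ...o,
      name: b.name, // assumption: keep the manifest's spelling of the name
      type: o.type ?? b.type,
      role: o.role ?? b.role,
      descriptions: { ...b.descriptions, ...o.descriptions },
    };
  }
  return merged;
}
```

With this rule, an overlay entry that only carries a description no longer produces a second, untyped column.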

* refactor(sl): split overlay columns from column_overrides and enforce TS/Python wire contract

Overlay sources now have two distinct collections: `columns:` for computed
columns (requiring `expr` + `type`) and `column_overrides:` for metadata
patches to inherited manifest columns. Composing or loading an overlay that
mixes the two — or references an unknown column — fails with a typed error.
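
A rough sketch of how such a check could look. Everything here is hypothetical: the `ComputedColumn` / `ColumnOverride` shapes, the `OverlayValidationError` class, and its error codes are illustrative names, and "mixing the two" is read as a computed column that reuses an inherited column's name.

```typescript
// Hypothetical shapes for the two overlay collections.
interface ComputedColumn {
  name: string;
  expr?: string;
  type?: string;
}
interface ColumnOverride {
  name: string;
  descriptions?: Record<string, string>;
}

type OverlayErrorCode =
  | 'computed_column_incomplete'
  | 'computed_column_shadows_manifest'
  | 'unknown_override_target';

// Hypothetical typed error carrying a machine-readable code.
class OverlayValidationError extends Error {
  code: OverlayErrorCode;
  column: string;
  constructor(code: OverlayErrorCode, column: string) {
    super(`${code}: ${column}`);
    this.name = 'OverlayValidationError';
    this.code = code;
    this.column = column;
  }
}

function validateOverlay(
  manifestColumns: string[],
  columns: ComputedColumn[],
  columnOverrides: ColumnOverride[],
): void {
  const known = new Set(manifestColumns.map((n) => n.toLowerCase()));
  for (const c of columns) {
    // Computed columns must carry both an expression and a type.
    if (!c.expr || !c.type) {
      throw new OverlayValidationError('computed_column_incomplete', c.name);
    }
    // A computed column must not reuse an inherited column's name.
    if (known.has(c.name.toLowerCase())) {
      throw new OverlayValidationError('computed_column_shadows_manifest', c.name);
    }
  }
  for (const o of columnOverrides) {
    // Overrides may only patch columns that exist in the manifest.
    if (!known.has(o.name.toLowerCase())) {
      throw new OverlayValidationError('unknown_override_target', o.name);
    }
  }
}
```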

Introduce `ResolvedSemanticLayerSource` / `resolvedSourceSchema` /
`toResolvedWire` as the strict shape sent to the Python engine, and add a
schema contract test that diffs Zod against the Pydantic JSON schema dumped
by `python -m semantic_layer dump-schema`. `SourceDefinition` is now
`extra="forbid"` on the Python side.

`loadAllSources` surfaces per-file load errors instead of swallowing them,
so validation/query paths can report manifest shard parse failures.

* fix(context): make scan description generation resilient and quiet

A transient sampleTable failure during ingest used to take out every
table in a connection: generateTableDescription returned a hardcoded
'Table not found' string into descriptions.ai, and KtxDescriptionGenerator
was constructed without a logger, so the failure left no trail anywhere.

- sampleTable / sampleColumn calls retry 3x with 200/400/800ms backoff,
  honouring KtxScanContext.signal via a new KtxAbortedError.
- On retry exhaustion or missing capability, table generation falls back
  to a metadata-only prompt built from column name / native type / comment
  / rawDescriptions. The column path follows the same rule -- call the
  LLM when any of samples or rawDescriptions are available; skip only
  when both are absent.
- Logger is now threaded from KtxScanContext into the generator. Failures
  emit structured KtxScanWarning entries (new description_fallback_used
  code, plus existing sampling_failed / enrichment_failed /
  connector_capability_missing). ktx scan groups warnings by code so a
  batch of identical failures collapses to one summary line plus sample.
- Returns null on failure instead of the 'Table not found' sentinel; the
  manifest writer's existing guard already skips empty descriptions, so
  schema YAML no longer carries misleading text. SCAN_MANAGED_DESCRIPTION_KEYS
  already strips stale 'ai' on merge, so existing YAML clears on next run.
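
The retry behaviour in the first bullet can be sketched roughly as follows. This is an illustration under assumptions, not the shipped code: `AbortedError` stands in for `KtxAbortedError`, and `backoffDelays`, `sleep`, and `withRetry` are hypothetical helper names. Returning `null` on exhaustion mirrors the "fall back to a metadata-only prompt" path.

```typescript
// Hypothetical stand-in for KtxAbortedError.
class AbortedError extends Error {}

// Exponential delays: backoffDelays(3, 200) gives 200, 400, 800 (ms).
function backoffDelays(retries: number, baseMs: number): number[] {
  return Array.from({ length: retries }, (_, i) => baseMs * Math.pow(2, i));
}

// Sleep that rejects early when the signal is aborted.
function sleep(ms: number, signal?: AbortSignal): Promise<void> {
  return new Promise((resolve, reject) => {
    if (signal?.aborted) {
      reject(new AbortedError('aborted'));
      return;
    }
    const onAbort = () => {
      clearTimeout(timer);
      reject(new AbortedError('aborted'));
    };
    const timer = setTimeout(() => {
      signal?.removeEventListener('abort', onAbort);
      resolve();
    }, ms);
    signal?.addEventListener('abort', onAbort, { once: true });
  });
}

// One initial attempt plus one retry per delay. Resolves to null once the
// retries are exhausted so the caller can take its fallback path.
async function withRetry<T>(
  fn: () => Promise<T>,
  signal?: AbortSignal,
  delays: number[] = backoffDelays(3, 200),
): Promise<T | null> {
  for (let attempt = 0; ; attempt++) {
    if (signal?.aborted) throw new AbortedError('aborted');
    try {
      return await fn();
    } catch (_err) {
      if (signal?.aborted) throw new AbortedError('aborted');
      if (attempt >= delays.length) return null;
      await sleep(delays[attempt], signal);
    }
  }
}
```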

Also suppress AI SDK v6 'system in messages' warning: pull system messages
out of KtxMessageBuilder.wrapSimple's output via a new splitKtxSystemMessages
helper and pass them top-level to generateText (preserves cacheControl
providerOptions on the SystemModelMessage). Agent-runner's local
splitSystemPromptMessages dedupes onto the shared helper.
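
A sketch of what a split helper of this kind might look like. The `Message` shape and the `splitSystemMessages` name are hypothetical simplifications and not the real `splitKtxSystemMessages` signature. Message objects are passed through by reference, so any `providerOptions` on a system message survive the split.

```typescript
// Hypothetical, simplified message shape.
interface Message {
  role: 'system' | 'user' | 'assistant';
  content: string;
  providerOptions?: Record<string, unknown>;
}

// Partition a mixed message list into system messages and everything else,
// preserving the relative order within each group.
function splitSystemMessages(all: Message[]): { system: Message[]; messages: Message[] } {
  const system: Message[] = [];
  const messages: Message[] = [];
  for (const m of all) {
    (m.role === 'system' ? system : messages).push(m);
  }
  return { system, messages };
}
```

The caller can then hand `system` to the model call as a top-level option and pass only `messages` in the conversation list.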

* test(docs): align examples-docs assertions with revamped docs

PR #103 (setup/guide doc revamp) reworded several CLI examples and
connection labels; the assertions in scripts/examples-docs.test.mjs
still referenced the pre-revamp wording and were failing in CI on main.
Update the regexes to match the post-revamp content:

- drop the `--json` flag from the sl-query example expectation
- move the `Driver:` / `Status: ok` probe to the connection reference,
  which is where that output now lives (driver id is lowercase
  `postgres`, not the display name `PostgreSQL`)
- drop the obsolete `Install \`uv\`...` troubleshooting line
- accept `<connectionId>` everywhere; the docs no longer use the
  hyphenated `<connection-id>` form
- match the `warehouse` connection id used in the quickstart instead of
  the `postgres-warehouse` id only used in the README and setup ref

* fix(sl): skip TS/Python schema contract test when uv is unavailable

The TypeScript checks CI job does not install uv or Python, so the
module-level `execFileSync('uv', ...)` in schemas.contract.test.ts threw
ENOENT and failed the suite. Wrap the schema dump in a try/catch and
guard the describe block with `describe.skipIf` so the test skips in
environments without uv. Local dev and any CI job that has uv on PATH
still run the cross-language contract assertion.
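
The guard pattern can be sketched like this. `tryDumpSchema` is a hypothetical helper name, and the exact `uv` arguments in the usage comment are an assumption; only `describe.skipIf` and `execFileSync` are taken from the message above.

```typescript
// Run a schema dump that may fail (for example because the external tool is
// not installed). Returns the dumped text, or null when the dump throws.
function tryDumpSchema(dump: () => string): string | null {
  try {
    return dump();
  } catch (_err) {
    return null;
  }
}

// Hypothetical usage in a Vitest file (not executed here):
//
//   const pydanticSchema = tryDumpSchema(() =>
//     execFileSync('uv', ['run', 'python', '-m', 'semantic_layer', 'dump-schema'], {
//       encoding: 'utf8',
//     }),
//   );
//   describe.skipIf(pydanticSchema === null)('TS/Python schema contract', () => {
//     // ...diff the Zod schema against the parsed Pydantic JSON schema...
//   });
```

Doing the dump inside a guarded helper keeps the ENOENT out of module scope, so importing the test file never fails by itself.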
2026-05-15 02:11:04 +02:00

341 lines
12 KiB
TypeScript

import { z } from 'zod';
import { DEFAULT_PRIORITY, resolveDescription } from '../descriptions.js';
import type { SemanticLayerService } from '../semantic-layer.service.js';
import type { SemanticLayerSource } from '../types.js';
import type { ToolContext, ToolOutput } from '../../tools/index.js';
import { BaseSemanticLayerTool, type BaseSemanticLayerToolDeps } from './base-semantic-layer.tool.js';
import { slToolConnectionIdSchema } from './connection-id-schema.js';

export interface SlDiscoverySettings {
  maxSources: number;
  minRrfScore: number;
  maxDetailedSources: number;
}

const slDiscoverInputSchema = z.object({
  connectionId: slToolConnectionIdSchema
    .optional()
    .describe('Data source connection ID (omit to discover across all data sources)'),
  query: z.string().optional().describe('Search query to filter sources/columns/measures by name or description'),
  sourceName: z
    .string()
    .optional()
    .describe('Inspect a specific source in full detail (requires connectionId if multiple data sources)'),
});

type SlDiscoverInput = z.infer<typeof slDiscoverInputSchema>;

interface SlDiscoverStructured {
  sources: Array<{
    connectionId: string;
    connectionName: string;
    name: string;
    description?: string;
    columnCount: number;
    measureCount: number;
    joinCount: number;
  }>;
  detail?: Record<string, unknown>;
  totalSources: number;
}

export class SlDiscoverTool extends BaseSemanticLayerTool<typeof slDiscoverInputSchema> {
  readonly name = 'sl_discover';

  constructor(
    deps: BaseSemanticLayerToolDeps,
    private readonly discoverySettings: SlDiscoverySettings,
  ) {
    super(deps);
  }

  get description(): string {
    return `<purpose>
Discover available semantic layer sources, columns, measures, and joins.
When called without a connectionId, discovers sources across ALL data sources — grouped by data source name and ID.
Use this to understand what data is available before querying through the semantic layer.
</purpose>
<when_to_use>
- Before querying: understand available sources across all data sources
- To inspect a specific source in detail (columns, joins, measures, grain) — requires connectionId when multiple data sources exist
- To search for sources related to a concept (e.g., "revenue", "customers") across all data sources
</when_to_use>`;
  }

  get inputSchema() {
    return slDiscoverInputSchema;
  }

  async call(input: SlDiscoverInput, context: ToolContext): Promise<ToolOutput<SlDiscoverStructured>> {
    const { query, sourceName } = input;
    const semanticLayerService = context.session?.semanticLayerService ?? this.semanticLayerService;

    // Resolve connectionId: use provided value, or auto-detect
    let connectionId = input.connectionId;
    if (!connectionId) {
      const connections = await semanticLayerService.listConnectionIdsWithNames();
      if (connections.length === 0) {
        return {
          markdown: 'No semantic layer sources found. Run a schema scan first.',
          structured: { sources: [], totalSources: 0 },
        };
      }
      if (connections.length === 1) {
        connectionId = connections[0].id;
      } else {
        // Multiple connections: aggregate or prompt depending on operation
        if (sourceName) {
          const connectionList = connections
            .map((c) => `- **${c.name}** (${c.connectionType}): \`${c.id}\``)
            .join('\n');
          return {
            markdown: `Multiple data sources have semantic layer sources. Specify a connectionId to inspect source "${sourceName}":\n\n${connectionList}`,
            structured: { sources: [], totalSources: 0 },
          };
        }
        return this.discoverAcrossConnections(semanticLayerService, connections, query);
      }
    }

    // If inspecting a specific source, show the SL interface (columns, measures, joins)
    // without the raw SQL. Use `sl_read_source` to see the full YAML including SQL.
    if (sourceName) {
      const { sources } = await semanticLayerService.loadAllSources(connectionId);
      const source = sources.find((s) => s.name === sourceName);
      if (!source) {
        return {
          markdown: `Source **${sourceName}** not found for this connection.`,
          structured: { sources: [], totalSources: 0 },
        };
      }
      const parts: string[] = [];
      this.appendSourceDetail(parts, source);
      if (source.grain?.length) {
        parts.push(`Grain: ${source.grain.join(', ')}`);
      }
      return {
        markdown: parts.join('\n'),
        structured: {
          sources: [
            {
              connectionId,
              // The display name is not looked up on this path, so the ID is reused.
              connectionName: connectionId,
              name: source.name,
              description:
                resolveDescription(source.descriptions, { priority: DEFAULT_PRIORITY }) ?? undefined,
              columnCount: source.columns.length,
              measureCount: source.measures.length,
              joinCount: source.joins.length,
            },
          ],
          totalSources: 1,
        },
      };
    }

    // Single connection: list all sources
    const connections = await semanticLayerService.listConnectionIdsWithNames();
    const connInfo = connections.find((c) => c.id === connectionId);
    return this.discoverForConnection(semanticLayerService, connectionId, connInfo?.name ?? connectionId, query);
  }

  private async discoverAcrossConnections(
    semanticLayerService: SemanticLayerService,
    connections: Array<{ id: string; name: string; connectionType: string }>,
    query?: string,
  ): Promise<ToolOutput<SlDiscoverStructured>> {
    // Load sources from all connections in parallel
    const results = await Promise.all(
      connections.map(async (conn) => {
        const { sources } = await semanticLayerService.loadAllSources(conn.id);
        let filtered = sources;
        if (query) {
          filtered = await this.filterByQuery(conn.id, sources, query);
        }
        return { conn, sources: filtered };
      }),
    );

    const allSummaries: SlDiscoverStructured['sources'] = [];
    const parts: string[] = [];
    let totalSources = 0;
    for (const { conn, sources } of results) {
      if (sources.length === 0) {
        continue;
      }
      totalSources += sources.length;
parts.push(`## ${conn.name} (${conn.connectionType}) — \`${conn.id}\``);
      parts.push('');
      const config = { priority: DEFAULT_PRIORITY };
      for (const s of sources) {
        allSummaries.push({
          connectionId: conn.id,
          connectionName: conn.name,
          name: s.name,
          description: resolveDescription(s.descriptions, config) ?? undefined,
          columnCount: (s.columns ?? []).length,
          measureCount: (s.measures ?? []).length,
          joinCount: (s.joins ?? []).length,
        });
      }
      this.appendTieredSources(parts, sources, !!query);
    }

    if (totalSources === 0) {
      return {
        markdown: query
          ? `No semantic layer sources found matching "${query}".`
          : 'No semantic layer sources found. Run a schema scan first, or create sources with sl_write_source.',
        structured: { sources: [], totalSources: 0 },
      };
    }

    const header = `**${totalSources} source(s) found across ${results.filter((r) => r.sources.length > 0).length} data source(s)**${query ? ` matching "${query}"` : ''}:\n`;
    parts.unshift(header);
    return {
      markdown: parts.join('\n'),
      structured: { sources: allSummaries, totalSources },
    };
  }

  private async discoverForConnection(
    semanticLayerService: SemanticLayerService,
    connectionId: string,
    connectionName: string,
    query?: string,
  ): Promise<ToolOutput<SlDiscoverStructured>> {
    const { sources } = await semanticLayerService.loadAllSources(connectionId);
    if (sources.length === 0) {
      return {
        markdown: 'No semantic layer sources found. Run a schema scan first, or create sources with sl_write_source.',
        structured: { sources: [], totalSources: 0 },
      };
    }

    const filtered = query ? await this.filterByQuery(connectionId, sources, query) : sources;
    const config = { priority: DEFAULT_PRIORITY };
    const summaries = filtered.map((s) => ({
      connectionId,
      connectionName,
      name: s.name,
      description: resolveDescription(s.descriptions, config) ?? undefined,
      columnCount: (s.columns ?? []).length,
      measureCount: (s.measures ?? []).length,
      joinCount: (s.joins ?? []).length,
    }));

    const parts: string[] = [`**${filtered.length} source(s) found**${query ? ` matching "${query}"` : ''}:\n`];
    this.appendTieredSources(parts, filtered, !!query);
    return {
      markdown: parts.join('\n'),
      structured: { sources: summaries, totalSources: filtered.length },
    };
  }

  private async filterByQuery(
    connectionId: string,
    sources: SemanticLayerSource[],
    query: string,
  ): Promise<SemanticLayerSource[]> {
    const config = this.discoverySettings;
    const searchResults = await this.slSearchService.search(connectionId, query, config.maxSources, config.minRrfScore);
    if (searchResults.length > 0) {
      // Keep only the sources the search returned, ordered by search rank
      const rankedNames = new Set(searchResults.map((r) => r.sourceName));
      const nameOrder = new Map(searchResults.map((r, i) => [r.sourceName, i]));
      return sources
        .filter((s) => rankedNames.has(s.name))
        .sort((a, b) => (nameOrder.get(a.name) ?? 0) - (nameOrder.get(b.name) ?? 0));
    }
    // The search returned nothing: fall back to plain substring matching
    return this.fallbackTermMatch(sources, query);
  }

  private fallbackTermMatch(sources: SemanticLayerSource[], query: string): SemanticLayerSource[] {
    const config = { priority: DEFAULT_PRIORITY };
    const terms = query.toLowerCase().split(/\s+/).filter(Boolean);
    const scored = sources
      .map((s) => {
        // Searchable text: source name/description plus column and measure names/descriptions
        const searchText = [
          s.name,
          resolveDescription(s.descriptions, config) ?? '',
          ...s.columns.map((c) => `${c.name} ${resolveDescription(c.descriptions, config) ?? ''}`),
          ...s.measures.map((m) => `${m.name} ${m.description ?? ''}`),
        ]
          .join(' ')
          .toLowerCase();
        // Score = number of query terms that occur anywhere in the text
        const matchCount = terms.filter((term) => searchText.includes(term)).length;
        return { source: s, matchCount };
      })
      .filter((x) => x.matchCount > 0)
      .sort((a, b) => b.matchCount - a.matchCount);
    return scored.map((x) => x.source);
  }

  /**
   * Render sources in two tiers:
   * - With a query, the top N (already ranked by relevance) get full detail
   * - Remaining sources get a one-liner with name, description, and measure/column counts
   * Without a query, every source is rendered as a one-liner.
   */
  private appendTieredSources(parts: string[], sources: SemanticLayerSource[], hasQuery: boolean): void {
    const maxDetailed = this.discoverySettings.maxDetailedSources;
    const detailLimit = hasQuery ? maxDetailed : 0;
    const detailed = sources.slice(0, detailLimit);
    const rest = sources.slice(detailLimit);
    for (const s of detailed) {
      this.appendSourceDetail(parts, s);
    }
    if (rest.length > 0) {
      if (detailed.length > 0) {
        parts.push('**Other sources** (pass `sourceName` to inspect):');
      }
      const defaultConfig = { priority: DEFAULT_PRIORITY };
      for (const s of rest) {
        const resolvedDesc = resolveDescription(s.descriptions, defaultConfig);
        // Separate the name from its description so the two do not run together
        const desc = resolvedDesc ? `: ${resolvedDesc}` : '';
        const stats = [s.measures.length > 0 ? `${s.measures.length} measures` : null, `${s.columns.length} cols`]
          .filter(Boolean)
          .join(', ');
        parts.push(`- **${s.name}**${desc} (${stats})`);
      }
      parts.push('');
    }
  }

  /** Full detail for a single source: metadata, measures, joins, all public columns. */
  private appendSourceDetail(parts: string[], s: SemanticLayerSource): void {
    const detailDesc = resolveDescription(s.descriptions, { priority: DEFAULT_PRIORITY });
    // Separate the heading name from its description so the two do not run together
    parts.push(`### ${s.name}${detailDesc ? `: ${detailDesc}` : ''}`);
    parts.push(
      `Type: ${s.sql ? 'sql' : 'table'} | Columns: ${s.columns.length} | Measures: ${s.measures.length} | Joins: ${s.joins.length}`,
    );
    if (s.measures.length > 0) {
      parts.push(`Measures: ${s.measures.map((m) => `\`${m.name}\` (${m.expr})`).join(', ')}`);
    }
    if (s.joins.length > 0) {
      parts.push(`Joins: ${s.joins.map((j) => `${j.to} (${j.relationship})`).join(', ')}`);
    }
    const publicCols = s.columns.filter((c) => c.visibility !== 'hidden');
    if (publicCols.length > 0) {
      parts.push(`Columns: ${publicCols.map((c) => `\`${s.name}.${c.name}\` (${c.type})`).join(', ')}`);
    }
    parts.push('');
  }
}
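
The fallback ranking used when the search service returns no hits can be tried in isolation with a cut-down version. This is a sketch under assumptions: `MiniSource` and `termMatch` are hypothetical simplifications that use plain description strings instead of `resolveDescription`, and they are not part of the file above.

```typescript
// Hypothetical, flattened source shape for illustration.
interface MiniSource {
  name: string;
  description?: string;
  columns: string[];
}

// Rank sources by how many whitespace-separated query terms occur in their
// name, description, or column names; sources with no match are dropped.
function termMatch(sources: MiniSource[], query: string): MiniSource[] {
  const terms = query.toLowerCase().split(/\s+/).filter(Boolean);
  return sources
    .map((s) => {
      const text = [s.name, s.description ?? '', ...s.columns].join(' ').toLowerCase();
      const matchCount = terms.filter((t) => text.includes(t)).length;
      return { source: s, matchCount };
    })
    .filter((x) => x.matchCount > 0)
    .sort((a, b) => b.matchCount - a.matchCount)
    .map((x) => x.source);
}

const demoSources: MiniSource[] = [
  { name: 'orders', description: 'Order facts with revenue', columns: ['id', 'customer_id', 'amount'] },
  { name: 'customers', description: 'Customer dimension', columns: ['id', 'region'] },
];

// 'customers' matches both terms, 'orders' only matches 'customer' (via customer_id).
const ranked = termMatch(demoSources, 'customer region').map((s) => s.name);
console.log(ranked);
```

Because matching is by substring, a term like `customer` also hits `customer_id`, which is why both sources appear in the result.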