diff --git a/packages/cli/src/scan.ts b/packages/cli/src/scan.ts index ce68cfca..0b152d09 100644 --- a/packages/cli/src/scan.ts +++ b/packages/cli/src/scan.ts @@ -133,6 +133,50 @@ function warningLine(warning: KtxScanWarning): string { return `${warning.code}: ${location}${warning.message}`; } +function groupWarningsByCode(warnings: readonly KtxScanWarning[]): Map { + const groups = new Map(); + for (const warning of warnings) { + const list = groups.get(warning.code); + if (list) { + list.push(warning); + } else { + groups.set(warning.code, [warning]); + } + } + return groups; +} + +function describeWarningGroup(code: string, count: number): string { + switch (code) { + case 'sampling_failed': + return `${count} ${plural(count, 'table')} could not be sampled (retries exhausted); descriptions used metadata-only fallback or were skipped.`; + case 'description_fallback_used': + return `${count} ${plural(count, 'table')} got an AI description from column metadata only (no sample rows available).`; + case 'enrichment_failed': + return `${count} ${plural(count, 'table/column')} could not be enriched.`; + case 'connector_capability_missing': + return `${count} ${plural(count, 'table')} affected by missing connector capability.`; + case 'statistics_failed': + return `${count} statistics ${plural(count, 'lookup')} failed.`; + case 'llm_unavailable': + return 'LLM provider unavailable; AI enrichment was skipped.'; + case 'embedding_unavailable': + return 'Embedding provider unavailable; embeddings were skipped.'; + case 'relationship_validation_failed': + return `${count} relationship ${plural(count, 'validation')} could not run.`; + case 'relationship_llm_invalid_reference': + return `${count} LLM-proposed ${plural(count, 'relationship')} referenced unknown columns.`; + case 'relationship_llm_proposal_failed': + return `${count} LLM relationship ${plural(count, 'proposal')} failed.`; + case 'scan_enrichment_backend_not_configured': + return 'Scan enrichment backend is not 
configured; AI stages were skipped.'; + case 'credential_redacted': + return `${count} ${plural(count, 'credential')} were redacted from scan output.`; + default: + return `${count} ${plural(count, 'warning')} (${code})`; + } +} + function managedDaemonOptionsForScanRun(args: Extract, io: KtxCliIo) { if (args.databaseIntrospectionUrl || !args.cliVersion || !args.runtimeInstallPolicy) { return undefined; @@ -153,11 +197,26 @@ function writeNeedsAttention(report: KtxScanReport, io: KtxCliIo): void { } if (report.warnings.length > 0) { io.stdout.write(` ${report.warnings.length} ${plural(report.warnings.length, 'warning')}\n`); - for (const warning of report.warnings.slice(0, 5)) { - io.stdout.write(` - ${warningLine(warning)}\n`); - } - if (report.warnings.length > 5) { - io.stdout.write(` - ${report.warnings.length - 5} more warnings in the JSON report\n`); + const groups = groupWarningsByCode(report.warnings); + for (const [code, warnings] of groups) { + io.stdout.write(` - ${describeWarningGroup(code, warnings.length)}\n`); + const first = warnings[0]; + if (first) { + io.stdout.write(` ${warningLine(first)}\n`); + } + if (warnings.length > 1) { + const moreTables = warnings + .slice(1) + .map((warning) => + warning.table ? (warning.column ? `${warning.table}.${warning.column}` : warning.table) : null, + ) + .filter((value): value is string => value !== null) + .slice(0, 3); + if (moreTables.length > 0) { + const suffix = warnings.length - 1 > moreTables.length ? 
`, …` : ''; + io.stdout.write(` also: ${moreTables.join(', ')}${suffix}\n`); + } + } } } if (report.capabilityGaps.length > 0) { diff --git a/packages/cli/src/sl.ts b/packages/cli/src/sl.ts index 59c2a6f5..e7594feb 100644 --- a/packages/cli/src/sl.ts +++ b/packages/cli/src/sl.ts @@ -213,7 +213,11 @@ export async function runKtxSl(args: KtxSlArgs, io: KtxSlIo = process, deps: Ktx if (!source) { throw new Error(`Semantic-layer source "${args.connectionId}/${args.sourceName}" was not found`); } - const result = await validateLocalSlSource(source.yaml, { project, connectionId: args.connectionId }); + const result = await validateLocalSlSource(source.yaml, { + project, + connectionId: args.connectionId, + sourceName: args.sourceName, + }); if (!result.valid) { for (const error of result.errors) { io.stderr.write(`${error}\n`); diff --git a/packages/context/package.json b/packages/context/package.json index 32423c20..28ec5190 100644 --- a/packages/context/package.json +++ b/packages/context/package.json @@ -153,6 +153,7 @@ "@types/node": "^25.7.0", "@types/pg": "^8.20.0", "@vitest/coverage-v8": "^4.1.6", + "ajv": "8.20.0", "typescript": "^6.0.3", "vitest": "^4.1.6" }, diff --git a/packages/context/skills/dbt_ingest/SKILL.md b/packages/context/skills/dbt_ingest/SKILL.md index 02a3e05b..a3ce0151 100644 --- a/packages/context/skills/dbt_ingest/SKILL.md +++ b/packages/context/skills/dbt_ingest/SKILL.md @@ -14,14 +14,14 @@ Use this skill for **uploaded** dbt projects (`dbt_project.yml` at stage root, ` |-----|--------|--------| | `models:` entry with `columns:` | **Overlay** on the manifest table with the same name (after `discover_data` / `entity_details`) | One SL source per physical table; model name may differ from DB name - resolve with `read_raw_file` + warehouse context. | | `sources:` → `tables:` | Same as models; use `identifier` when present instead of logical `name`. | Schema + name must match how the connection sees tables. 
| -| Column `description` | `descriptions.user` or merged `descriptions` map on the column | Do not overwrite `dbt` description keys from sync. | +| Column `description` | `column_overrides[].descriptions.user` on the overlay | Do not overwrite `dbt` description keys from sync. | | `data_tests: not_null` / `unique` | Short hint in column `descriptions` or notes: “dbt: not null”, “dbt: unique” | Full structured metadata lands in manifest via **sync**; the skill keeps bundle-time SL text useful for the agent. | | `accepted_values` | Add a **brief** line in the column description: allowed values (truncate long lists) | Also mention enum-like use in `discover_data` / filters. | | `relationships` | Add or confirm `joins:` on the overlay **only** when `to` resolves to a real table via `read_raw_file` + `discover_data` / `entity_details` | If the ref cannot be resolved, capture the intent in a wiki page instead. | ## Physical schema grounding -dbt YAML is documentation and test metadata; it is not permission to invent physical columns. Before writing any table-backed SL source, confirm the real warehouse shape with `discover_data`, `sl_discover`, or `entity_details` and use only confirmed column names in `columns:`, `grain:`, `joins:`, `segments:`, and `measures[].expr`. +dbt YAML is documentation and test metadata; it is not permission to invent physical columns. Before writing any table-backed SL source, confirm the real warehouse shape with `discover_data`, `sl_discover`, or `entity_details` and use only confirmed column names in `column_overrides:`, computed-only `columns:`, `grain:`, `joins:`, `segments:`, and `measures[].expr`. For dbt context-source ingest, the dbt connection is usually not the warehouse connection. Call `sl_discover` without `connectionId` first, then write overlays to the connection that owns the matching manifest-backed source (for example `postgres-warehouse`), not to the dbt connection (for example `dbt-main`). 
If no matching manifest-backed source is visible on any warehouse connection, do not call `sl_write_source`; record `emit_unmapped_fallback` and keep the fact wiki-only. @@ -61,7 +61,7 @@ SL source, `tables:` frontmatter, `sl_refs`, or `emit_unmapped_fallback`: ## 1.1 test hints (descriptions / meta) -When YAML shows `accepted_values` or `not_null`, add **short** hints into `columns[].descriptions` (e.g. under `user`) or freeform column notes so chat and validation see intent before the next git sync refreshes `constraints` / `enum_values` in `_schema`. Keep hints under a few words when possible. +When YAML shows `accepted_values` or `not_null`, add **short** hints into `column_overrides[].descriptions` (for example under `user`) or freeform column notes so chat and validation see intent before the next git sync refreshes `constraints` / `enum_values` in `_schema`. Keep hints under a few words when possible. ## Overlap with MetricFlow @@ -71,6 +71,6 @@ If the same bundle also has MetricFlow `semantic_models:` / `metrics:`, the **`m - Do not run `dbt` CLI or assume `target/` / `manifest.json` exists in the upload. - Do not invent column names, grain keys, or measure expressions from dbt model names, descriptions, tests, or common naming patterns. -- Do not write `columns:`, `grain:`, or `measures:` for a dbt model unless those exact column names are confirmed by dbt YAML columns or warehouse schema discovery. +- Do not write computed `columns:`, `column_overrides:`, `grain:`, or `measures:` for a dbt model unless those exact column names are confirmed by dbt YAML columns or warehouse schema discovery. - Do not invent joins from `relationships` tests if the target model/table is not found in SL or the warehouse. - Do not read `peerFileIndex` paths - use `read_raw_file` only on `rawFiles` and `dependencyPaths` from the WorkUnit. 
diff --git a/packages/context/skills/lookml_ingest/SKILL.md b/packages/context/skills/lookml_ingest/SKILL.md index cc723737..52b08438 100644 --- a/packages/context/skills/lookml_ingest/SKILL.md +++ b/packages/context/skills/lookml_ingest/SKILL.md @@ -12,7 +12,7 @@ LookML views map to SL sources, `measure:` to measures, `explore: { join: }` to | LookML | KTX form | Notes | |---|---|---| -| `view: X { sql_table_name: …; measure:/dimension:/join: }` | **Overlay** at `/X.yaml` with `measures`, `columns` (computed), `joins`, `segments` | Manifest-backed; inherit grain/columns | +| `view: X { sql_table_name: …; measure:/dimension:/join: }` | **Overlay** at `/X.yaml` with `measures`, computed-only `columns`, `column_overrides`, `joins`, `segments` | Manifest-backed; inherit grain/columns | | `view: X { derived_table: { sql: … } }` | **Standalone** with top-level `sql:`, explicit `grain:` + `columns:` | No manifest entry exists | | `view: X { sql_always_where:

<p> }` | **Standalone** with `sql: SELECT * FROM <table> WHERE <p>

` | Enforcement, not opt-in | | `explore: { join: Y { sql_on: …; relationship: … } }` | `joins:` entry `{ to: Y, on: " = Y.", relationship: … }` | On the overlay or standalone | @@ -136,7 +136,8 @@ KTX overlay at `/fct_labs.yaml`: ```yaml name: fct_labs -description: "Lab-order fact table. One row per lab order event." +descriptions: + user: "Lab-order fact table. One row per lab order event." columns: - name: is_byol type: boolean diff --git a/packages/context/skills/metabase_ingest/SKILL.md b/packages/context/skills/metabase_ingest/SKILL.md index af25288f..aefd067f 100644 --- a/packages/context/skills/metabase_ingest/SKILL.md +++ b/packages/context/skills/metabase_ingest/SKILL.md @@ -79,7 +79,7 @@ SL source, `tables:` frontmatter, `sl_refs`, or `emit_unmapped_fallback`: For each card: 1. Analyze `resolvedSql` + `resultMetadata`: identify base tables, aggregations, joins, filters, column types. -2. **REQUIRED before any write**: call `sl_discover` for every candidate target source name. The response tells you whether the name is manifest-backed (`Type: table` or `Type: sql`). For manifest-backed names you MUST use the overlay shape (`name:` + `measures:`/`segments:`/`description:` only - no `sql:`, `table:`, `grain:`, or `columns:`); the tool will reject a standalone write and you'll have wasted the call. If `sl_discover` returns nothing for the name, you can write a standalone source. Also call `sl_read_source` on existing sources you intend to extend so you don't duplicate measures. +2. **REQUIRED before any write**: call `sl_discover` for every candidate target source name. The response tells you whether the name is manifest-backed (`Type: table` or `Type: sql`). 
For manifest-backed names you MUST use the overlay shape (`name:` plus overlay fields such as `measures:`, `segments:`, `descriptions:`, `joins:`, `disable_joins:`, `column_overrides:`, and computed-only `columns:` entries with `expr` + `type`; no `sql:`, `table:`, `grain:`, or base-table `columns:`); the tool will reject a standalone write and you'll have wasted the call. If `sl_discover` returns nothing for the name, you can write a standalone source. Also call `sl_read_source` on existing sources you intend to extend so you don't duplicate measures. 3. Include `rawPaths: ["cards/.json"]` on every `sl_write_source`, `sl_edit_source`, and `wiki_write` call. If one artifact generalizes multiple near-duplicate cards, include each contributing card path and no unrelated cards. 4. Decide: - Simple aggregation on a table that already has a source → `sl_edit_source` to add a measure. @@ -98,7 +98,7 @@ measures: expr: "" ``` -Overlay shape: `name:` plus any of `measures:`, `segments:`, `descriptions:`, `joins:`, `disable_joins:`. Never include `sql:`, `table:`, `grain:`, or `columns:` on a manifest-backed name - those would shadow the manifest's schema and drop its joins. Overlay `joins:` are merged additively with the manifest's joins (deduped by `to` + `on`); use `disable_joins: [""]` to suppress a specific manifest join. After the overlay exists, use `sl_edit_source` for further tweaks. See `sl_capture` skill for the canonical overlay rule. +Overlay shape: `name:` plus any of `measures:`, `segments:`, `descriptions:`, `joins:`, `disable_joins:`, `exclude_columns:`, `column_overrides:`, or computed-only `columns:` entries with `expr` + `type`. Never include `sql:`, `table:`, `grain:`, or base-table `columns:` on a manifest-backed name — those would shadow the manifest's schema and drop its joins. Use `column_overrides:` for inherited column descriptions. 
Overlay `joins:` are merged additively with the manifest's joins (deduped by `to` + `on`); use `disable_joins: [""]` to suppress a specific manifest join. After the overlay exists, use `sl_edit_source` for further tweaks. See `sl_capture` skill for the canonical overlay rule. **Join discovery:** When your card's SQL references warehouse tables (e.g. in `FROM` or `JOIN` clauses), call `sl_discover({ query: '' })` before writing. The matching manifest entry's `name` is the value you use in `joins: [- to: ]` only when the card output exposes a local key that matches the target source grain (for example `account_id = mart_account_segments.account_id`). Do not declare a KTX join just because the card SQL joins that table internally. If the output only exposes display fields such as `account_name`, keep the SQL source self-contained or project the key before adding the join. Use `many_to_one` for FK-to-dimension joins, `one_to_many` for the reverse. diff --git a/packages/context/skills/metricflow_ingest/SKILL.md b/packages/context/skills/metricflow_ingest/SKILL.md index 646dedb8..42caf604 100644 --- a/packages/context/skills/metricflow_ingest/SKILL.md +++ b/packages/context/skills/metricflow_ingest/SKILL.md @@ -12,7 +12,7 @@ A MetricFlow `semantic_model` maps to an SL source; MetricFlow `measures` map to | MetricFlow | KTX form | Notes | |---|---|---| -| `semantic_model: X { model: ref('t') }` with measures + dimensions | **Overlay** at `/X.yaml` with `measures`, `columns` (computed), `joins` | The `model:` ref resolves to a manifest table. | +| `semantic_model: X { model: ref('t') }` with measures + dimensions | **Overlay** at `/X.yaml` with `measures`, computed-only `columns`, `column_overrides`, `joins` | The `model:` ref resolves to a manifest table. | | `semantic_model: X { model: source('s','t') }` | **Overlay** at `/X.yaml` over table `t`. | Same shape; `source()` still resolves to a physical table. 
| | `semantic_model: X { model: }` with no manifest entry | **Standalone** with explicit `sql:`, `grain:`, `columns:` | Happens when the dbt manifest isn't available. | | `semantic_model: Y { extends: X }` | **Merge** Y's measures/dimensions/entities into X's overlay, or write a single overlay named for the most-derived child (Y) containing both X's and Y's primitives | Do not emit a second overlay for X - flatten. | @@ -84,7 +84,7 @@ If `sl_discover` errors because no such table exists, use `discover_data` and `entity_details` to find the warehouse target. If a SQL probe is still needed, call `sql_execution` with the same warehouse connection name, for example: `sql_execution({connectionName: "warehouse", sql: "SELECT 1 FROM analytics.orders LIMIT 0"})`. -**Never invent column names** - every column in `columns:`, `grain:`, and +**Never invent column names** - every column in computed `columns:`, `column_overrides:`, `grain:`, and `sql:` must be sourced from raw files, `entity_details`, or a successful SQL probe. diff --git a/packages/context/skills/sl/SKILL.md b/packages/context/skills/sl/SKILL.md index 2a1e4a09..d5f334fe 100644 --- a/packages/context/skills/sl/SKILL.md +++ b/packages/context/skills/sl/SKILL.md @@ -39,6 +39,10 @@ columns: # computed dimensions only - name: is_large_order type: boolean expr: "amount > 1000" +column_overrides: # metadata patches for inherited columns + - name: status + descriptions: + user: "Order lifecycle status." segments: - name: paid_non_refunded expr: "is_paid = true AND is_refunded = false" @@ -51,6 +55,7 @@ joins: Rules: - Do **not** repeat base-table columns, grain, `table`, or `source_type` in an overlay - those are inherited. - Overlay columns MUST be computed (`expr` + `type`). +- Use `column_overrides` to add descriptions or metadata to inherited manifest columns. Do not put `type` or `expr` in `column_overrides`. 
- `exclude_columns` hides specific manifest columns; `disable_joins` suppresses specific auto-detected joins. ### Standalone table sources @@ -110,7 +115,7 @@ An SQL source is a one-shot answer: the aggregation is frozen, callers cannot re ### Columns -Every standalone column requires `name` and `type`. Overlays have computed columns only. +Every standalone column requires `name` and `type`. Overlays have computed columns in `columns:` and manifest column metadata patches in `column_overrides:`. - `type`: one of `string`, `number`, `boolean`, `time`. Map LookML `date`/`datetime`/`timestamp` → `time`. Map LookML `yesno` → `boolean`. - `role` (optional): `time` enables time-granularity queries (month, week, day). `default` is the implicit fallback. diff --git a/packages/context/skills/sl_capture/SKILL.md b/packages/context/skills/sl_capture/SKILL.md index 3d19118f..22e55859 100644 --- a/packages/context/skills/sl_capture/SKILL.md +++ b/packages/context/skills/sl_capture/SKILL.md @@ -100,7 +100,33 @@ measures: **Extract repeated filter bundles into named segments.** If the same predicate appears on multiple measures of the same source, lift it to a `segments[]` entry and have each measure reference it. One edit updates every measure that depends on it. -**Never write a standalone file on a manifest-backed name.** If `sl_discover({ query: "" })` finds an existing schema for that name, you MUST write an overlay (`name:` + `measures:`/`segments:`/`descriptions:` only - no `sql:`, `table:`, `grain:`, `columns:`, `joins:`). A standalone with `sql:` or `table:` on a manifest-backed name clobbers the inherited columns and joins; `sl_write_source` and `sl_validate` both reject this shape with a clear fix hint. Always run `sl_discover` before your first write on any existing name. +**Never write a standalone file on a manifest-backed name.** If `sl_discover({ query: "" })` finds an existing schema for that name, you MUST write an overlay. 
A standalone with `sql:` or `table:` on a manifest-backed name clobbers the inherited columns and joins; `sl_write_source` and `sl_validate` both reject this shape with a clear fix hint. Always run `sl_discover` before your first write on any existing name. + +Overlay before/after examples: + +```yaml +# Wrong: patches an inherited manifest column through columns: +name: fct_orders +columns: + - name: status + descriptions: + user: "Order lifecycle status." +``` + +```yaml +# Right: patch inherited columns with column_overrides: +name: fct_orders +column_overrides: + - name: status + descriptions: + user: "Order lifecycle status." +columns: + - name: is_large_order + type: boolean + expr: "amount > 1000" +``` + +Overlay YAML may include `measures:`, `segments:`, `descriptions:`, `joins:`, `disable_joins:`, `exclude_columns:`, `column_overrides:`, and computed-only `columns:` entries with `expr` and `type`. Do not include `sql:`, `table:`, `grain:`, or base-table `columns:`. **Prefer overlay decomposition over standalone SQL sources.** Before reaching for `source_type: sql`, check whether the metric decomposes into measures on existing overlays (including cross-source derived measures). 
Use `source_type: sql` only when: - The metric requires per-user/per-entity derivation that cannot be expressed as a single `expr` (e.g., `EXISTS` over a time-windowed subset), OR diff --git a/packages/context/src/agent/agent-runner.service.ts b/packages/context/src/agent/agent-runner.service.ts index 11a0715c..128818f9 100644 --- a/packages/context/src/agent/agent-runner.service.ts +++ b/packages/context/src/agent/agent-runner.service.ts @@ -1,4 +1,4 @@ -import { KtxMessageBuilder, type KtxLlmProvider, type KtxModelRole } from '@ktx/llm'; +import { KtxMessageBuilder, splitKtxSystemMessages, type KtxLlmProvider, type KtxModelRole } from '@ktx/llm'; import { generateText, stepCountIs, type TelemetrySettings, type Tool } from 'ai'; import { noopLogger, type KtxLogger } from '../core/index.js'; import { summarizeKtxLlmDebugRequest, type KtxLlmDebugRequestRecorder } from '../llm/index.js'; @@ -36,14 +36,6 @@ export interface AgentRunnerServiceDeps { logger?: KtxLogger; } -function splitSystemPromptMessages(messages: ReturnType['messages']) { - const systemMessages = messages.filter((message) => message.role === 'system'); - return { - system: systemMessages.length === 0 ? undefined : systemMessages.length === 1 ? 
systemMessages[0] : systemMessages, - messages: messages.filter((message) => message.role !== 'system'), - }; -} - export class AgentRunnerService { private readonly logger: KtxLogger; @@ -62,7 +54,7 @@ export class AgentRunnerService { tools: params.toolSet, model, }); - const promptMessages = splitSystemPromptMessages(built.messages); + const promptMessages = splitKtxSystemMessages(built.messages); await this.deps.debugRequestRecorder?.record( summarizeKtxLlmDebugRequest({ diff --git a/packages/context/src/ingest/adapters/metricflow/import-semantic-models.test.ts b/packages/context/src/ingest/adapters/metricflow/import-semantic-models.test.ts index c0d72e35..d5a7e3c5 100644 --- a/packages/context/src/ingest/adapters/metricflow/import-semantic-models.test.ts +++ b/packages/context/src/ingest/adapters/metricflow/import-semantic-models.test.ts @@ -38,7 +38,7 @@ describe('importMetricflowSemanticModels', () => { const scoped = { getManifestEntry: vi.fn().mockResolvedValue(null), isManifestBacked: vi.fn().mockResolvedValue(false), - loadAllSources: vi.fn().mockResolvedValue([]), + loadAllSources: vi.fn().mockResolvedValue({ sources: [], loadErrors: [] }), loadSource: vi.fn().mockResolvedValue(null), writeSource: vi.fn().mockResolvedValue({ warnings: [] }), }; @@ -104,7 +104,7 @@ describe('importMetricflowSemanticModels', () => { const scoped = { getManifestEntry: vi.fn().mockResolvedValue(null), isManifestBacked: vi.fn().mockResolvedValue(false), - loadAllSources: vi.fn().mockResolvedValue([]), + loadAllSources: vi.fn().mockResolvedValue({ sources: [], loadErrors: [] }), loadSource: vi.fn().mockImplementation((connectionId: string, sourceName: string) => Promise.resolve(sourceName === 'orders' ? 
{ name: 'orders' } : null), ), @@ -139,7 +139,7 @@ describe('importMetricflowSemanticModels', () => { const scoped = { getManifestEntry: vi.fn().mockResolvedValue(null), isManifestBacked: vi.fn().mockResolvedValue(false), - loadAllSources: vi.fn().mockResolvedValue([]), + loadAllSources: vi.fn().mockResolvedValue({ sources: [], loadErrors: [] }), loadSource: vi.fn().mockResolvedValue(null), writeSource: vi.fn().mockRejectedValueOnce(new Error('cannot write orders')).mockResolvedValue({ warnings: [] }), }; @@ -190,7 +190,7 @@ describe('importMetricflowSemanticModels', () => { isManifestBacked: vi.fn().mockImplementation(async (_connectionId: string, sourceName: string) => { return sourceName === 'orders'; }), - loadAllSources: vi.fn().mockResolvedValue([]), + loadAllSources: vi.fn().mockResolvedValue({ sources: [], loadErrors: [] }), loadSource: vi.fn().mockResolvedValue(null), writeSource: vi.fn().mockImplementation(async (_connectionId: string, source: (typeof written)[number]) => { written.push(source); @@ -268,7 +268,7 @@ describe('importMetricflowSemanticModels', () => { isManifestBacked: vi.fn().mockImplementation(async (_connectionId: string, sourceName: string) => { return sourceName === 'orders'; }), - loadAllSources: vi.fn().mockResolvedValue([]), + loadAllSources: vi.fn().mockResolvedValue({ sources: [], loadErrors: [] }), loadSource: vi.fn().mockResolvedValue(null), writeSource: vi.fn().mockResolvedValue({ warnings: [] }), }; @@ -311,7 +311,7 @@ describe('importMetricflowSemanticModels', () => { const scoped = { getManifestEntry: vi.fn().mockResolvedValue(null), isManifestBacked: vi.fn().mockResolvedValue(false), - loadAllSources: vi.fn().mockResolvedValue([]), + loadAllSources: vi.fn().mockResolvedValue({ sources: [], loadErrors: [] }), loadSource: vi.fn().mockResolvedValue(null), writeSource: vi .fn() diff --git a/packages/context/src/ingest/adapters/metricflow/import-semantic-models.ts 
b/packages/context/src/ingest/adapters/metricflow/import-semantic-models.ts index 13127a3d..7d80a9c0 100644 --- a/packages/context/src/ingest/adapters/metricflow/import-semantic-models.ts +++ b/packages/context/src/ingest/adapters/metricflow/import-semantic-models.ts @@ -71,7 +71,7 @@ export async function importMetricflowSemanticModels( let crossModelSourcesCreated = 0; const preexistingSourceNames = new Set( - (await semanticLayerService.loadAllSources(input.connectionId)).map((source) => source.name), + (await semanticLayerService.loadAllSources(input.connectionId)).sources.map((source) => source.name), ); const modelContexts: MetricflowSemanticModelImportContext[] = []; const sourceNameByModelRef = new Map(); diff --git a/packages/context/src/ingest/ingest-bundle.runner.test.ts b/packages/context/src/ingest/ingest-bundle.runner.test.ts index bc25308f..c73eb436 100644 --- a/packages/context/src/ingest/ingest-bundle.runner.test.ts +++ b/packages/context/src/ingest/ingest-bundle.runner.test.ts @@ -187,7 +187,10 @@ const makeDeps = () => { loadAllSources: vi .fn() .mockImplementation((connectionId: string) => - Promise.resolve(connectionId === 'warehouse-2' ? [{ name: 'looker__orders' }] : []), + Promise.resolve({ + sources: connectionId === 'warehouse-2' ? 
[{ name: 'looker__orders' }] : [], + loadErrors: [], + }), ), }; const slSearchService = { @@ -1347,7 +1350,7 @@ describe('IngestBundleRunner — Stages 1 → 7', () => { frontmatter: { sl_refs: ['looker__b2b__sales_pipeline.arr'] }, }); deps.semanticLayerService.loadAllSources.mockImplementation((connectionId: string) => - Promise.resolve([{ name: `${connectionId}_source` }]), + Promise.resolve({ sources: [{ name: `${connectionId}_source` }], loadErrors: [] }), ); deps.agentRunner.runLoop.mockImplementation(async (params: any) => { if (params.telemetryTags.operationName === 'ingest-bundle-wu') { @@ -1447,7 +1450,7 @@ describe('IngestBundleRunner — Stages 1 → 7', () => { parseArtifacts: { semanticModels: [{ name: 'orders' }] }, }); deps.semanticLayerService.loadAllSources.mockImplementation((connectionId: string) => - Promise.resolve([{ name: `${connectionId}_source` }]), + Promise.resolve({ sources: [{ name: `${connectionId}_source` }], loadErrors: [] }), ); const postProcessor = { run: vi.fn().mockResolvedValue({ @@ -1631,7 +1634,10 @@ describe('IngestBundleRunner — Stages 1 → 7', () => { const deps = makeDeps(); deps.adapter.listTargetConnectionIds = vi.fn().mockResolvedValue(['postgres-warehouse']); deps.semanticLayerService.loadAllSources.mockImplementation((connectionId: string) => - Promise.resolve(connectionId === 'postgres-warehouse' ? [{ name: 'stg_accounts' }] : []), + Promise.resolve({ + sources: connectionId === 'postgres-warehouse' ? 
[{ name: 'stg_accounts' }] : [], + loadErrors: [], + }), ); const runner = buildRunner(deps); @@ -1659,7 +1665,10 @@ describe('IngestBundleRunner — Stages 1 → 7', () => { it('does not resolve qualified fallback table refs by source name alone', async () => { const deps = makeDeps(); - deps.semanticLayerService.loadAllSources.mockResolvedValue([{ name: 'orders', table: 'sales.orders' }]); + deps.semanticLayerService.loadAllSources.mockResolvedValue({ + sources: [{ name: 'orders', table: 'sales.orders' }], + loadErrors: [], + }); const runner = buildRunner(deps); await expect( diff --git a/packages/context/src/ingest/ingest-bundle.runner.ts b/packages/context/src/ingest/ingest-bundle.runner.ts index 582cbbf3..33495736 100644 --- a/packages/context/src/ingest/ingest-bundle.runner.ts +++ b/packages/context/src/ingest/ingest-bundle.runner.ts @@ -300,7 +300,7 @@ export class IngestBundleRunner { const blocks = await Promise.all( connectionIds.map(async (connectionId) => { try { - const sources = await this.deps.semanticLayerService.loadAllSources(connectionId); + const { sources } = await this.deps.semanticLayerService.loadAllSources(connectionId); const names = sources.map((source) => source.name).sort((left, right) => left.localeCompare(right)); const body = names.length > 0 ? 
names.join('\n') : '(no sources yet)'; return `## ${connectionId}\n${body}`; @@ -329,7 +329,7 @@ export class IngestBundleRunner { ): Promise { for (const connectionId of connectionIds) { try { - const sources = await semanticLayerService.loadAllSources(connectionId); + const { sources } = await semanticLayerService.loadAllSources(connectionId); if (sources.some((source) => semanticSourceMatchesTableRef(source, tableRef))) { return true; } @@ -1211,7 +1211,7 @@ export class IngestBundleRunner { ].sort(); for (const connectionId of touchedConnections) { try { - const allSources = await this.deps.semanticLayerService.loadAllSources(connectionId); + const { sources: allSources } = await this.deps.semanticLayerService.loadAllSources(connectionId); await this.deps.slSearchService.indexSources(connectionId, allSources); } catch (err) { this.logger.warn( diff --git a/packages/context/src/ingest/page-triage/page-triage.service.test.ts b/packages/context/src/ingest/page-triage/page-triage.service.test.ts index 2fa367aa..4fd57c42 100644 --- a/packages/context/src/ingest/page-triage/page-triage.service.test.ts +++ b/packages/context/src/ingest/page-triage/page-triage.service.test.ts @@ -227,9 +227,10 @@ describe('PageTriageService', () => { }); generateTextMock .mockImplementationOnce((args: any) => { - const systemMessage = args.messages.find((m: { role: string }) => m.role === 'system'); + const systemMessage = args.system ?? args.messages.find((m: { role: string }) => m.role === 'system'); const userMessage = args.messages.find((m: { role: string }) => m.role === 'user'); - const systemText = systemMessage.content as string; + const systemText = + typeof systemMessage === 'string' ? 
systemMessage : (systemMessage.content as string); const userText = userMessage.content as string; expect(systemText).toContain( 'Reusable templates and scripts are durable knowledge regardless of subject matter.', diff --git a/packages/context/src/ingest/page-triage/page-triage.service.ts b/packages/context/src/ingest/page-triage/page-triage.service.ts index f4e6e65f..765b4c21 100644 --- a/packages/context/src/ingest/page-triage/page-triage.service.ts +++ b/packages/context/src/ingest/page-triage/page-triage.service.ts @@ -1,7 +1,7 @@ import { createHash } from 'node:crypto'; import { readdir, readFile } from 'node:fs/promises'; import { dirname, join, relative } from 'node:path'; -import { KtxMessageBuilder, type KtxLlmProvider } from '@ktx/llm'; +import { KtxMessageBuilder, splitKtxSystemMessages, type KtxLlmProvider } from '@ktx/llm'; import { generateText, type ToolSet } from 'ai'; import pLimit from 'p-limit'; import { z } from 'zod'; @@ -346,10 +346,12 @@ export class PageTriageService { tools: {}, model, }); + const split = splitKtxSystemMessages(built.messages); const result = await this.runGenerateText({ model, temperature: 0, - messages: built.messages, + ...(split.system ? 
{ system: split.system } : {}), + messages: split.messages, tools: built.tools as ToolSet, }); return result.text; diff --git a/packages/context/src/ingest/wiki-sl-ref-repair.test.ts b/packages/context/src/ingest/wiki-sl-ref-repair.test.ts index 68f2b349..bcf4a993 100644 --- a/packages/context/src/ingest/wiki-sl-ref-repair.test.ts +++ b/packages/context/src/ingest/wiki-sl-ref-repair.test.ts @@ -44,23 +44,26 @@ describe('repairWikiSlRefs', () => { })), }; const semanticLayerService = { - loadAllSources: vi.fn(async () => [ - { - name: 'mart_customer_health', - grain: [], - columns: [], - joins: [], - measures: [{ name: 'high_risk_account_count', expr: 'count(*)' }], - segments: [{ name: 'high_risk', expr: "risk_level = 'high'" }], - }, - { - name: 'int_procurement_qualifying_actions', - grain: [], - columns: [], - joins: [], - measures: [], - }, - ]), + loadAllSources: vi.fn(async () => ({ + sources: [ + { + name: 'mart_customer_health', + grain: [], + columns: [], + joins: [], + measures: [{ name: 'high_risk_account_count', expr: 'count(*)' }], + segments: [{ name: 'high_risk', expr: "risk_level = 'high'" }], + }, + { + name: 'int_procurement_qualifying_actions', + grain: [], + columns: [], + joins: [], + measures: [], + }, + ], + loadErrors: [], + })), }; const result = await repairWikiSlRefs({ diff --git a/packages/context/src/ingest/wiki-sl-ref-repair.ts b/packages/context/src/ingest/wiki-sl-ref-repair.ts index e416c52b..3205ebbd 100644 --- a/packages/context/src/ingest/wiki-sl-ref-repair.ts +++ b/packages/context/src/ingest/wiki-sl-ref-repair.ts @@ -56,7 +56,8 @@ async function loadVisibleSlRefs( const warnings: string[] = []; for (const connectionId of connectionIds) { try { - for (const source of await semanticLayerService.loadAllSources(connectionId)) { + const { sources } = await semanticLayerService.loadAllSources(connectionId); + for (const source of sources) { for (const ref of entityRefsForSource(source)) { refs.add(ref); } diff --git 
a/packages/context/src/llm/generation.ts b/packages/context/src/llm/generation.ts index 1bbdbcab..7cb11d58 100644 --- a/packages/context/src/llm/generation.ts +++ b/packages/context/src/llm/generation.ts @@ -1,4 +1,4 @@ -import { KtxMessageBuilder, type KtxLlmProvider, type KtxModelRole } from '@ktx/llm'; +import { KtxMessageBuilder, splitKtxSystemMessages, type KtxLlmProvider, type KtxModelRole } from '@ktx/llm'; import { generateText, Output, type FlexibleSchema, type ToolSet } from 'ai'; type GenerateTextInput = Parameters[0]; @@ -29,10 +29,12 @@ export async function generateKtxText(input: GenerateKtxTextInput): Promise( tools: input.tools ?? {}, model, }); + const split = splitKtxSystemMessages(built.messages); const result = await (input.generateText ?? generateText)({ model, temperature: input.temperature ?? 0, - messages: built.messages, + ...(split.system ? { system: split.system } : {}), + messages: split.messages, tools: built.tools as ToolSet, ...(hasTools(built.tools as ToolSet) ? 
{ diff --git a/packages/context/src/memory/memory-agent.service.ingest.test.ts b/packages/context/src/memory/memory-agent.service.ingest.test.ts index 6375e494..2df4140c 100644 --- a/packages/context/src/memory/memory-agent.service.ingest.test.ts +++ b/packages/context/src/memory/memory-agent.service.ingest.test.ts @@ -89,7 +89,7 @@ const buildMocks = (overrides: Partial = {}): BuiltMocks => { embeddingService: { computeEmbedding: vi.fn() }, semanticLayerService: { forWorktree: vi.fn().mockReturnThis(), - loadAllSources: vi.fn().mockResolvedValue([]), + loadAllSources: vi.fn().mockResolvedValue({ sources: [], loadErrors: [] }), readSourceFile: vi.fn(), }, slSearchService: { indexSources: vi.fn(), buildSearchText: vi.fn() }, diff --git a/packages/context/src/memory/memory-agent.service.ts b/packages/context/src/memory/memory-agent.service.ts index 437111e4..d7e86d3d 100644 --- a/packages/context/src/memory/memory-agent.service.ts +++ b/packages/context/src/memory/memory-agent.service.ts @@ -308,7 +308,7 @@ export class MemoryAgentService { // Reindex SL search if any SL actions actually landed on main. 
if (hasSL && finalActions.some((a) => a.target === 'sl')) { try { - const allSources = await this.deps.semanticLayerService.loadAllSources(input.connectionId!); + const { sources: allSources } = await this.deps.semanticLayerService.loadAllSources(input.connectionId!); await this.deps.slSearchService.indexSources(input.connectionId!, allSources); } catch (e) { this.logger.warn( @@ -610,7 +610,7 @@ export class MemoryAgentService { private async buildSlIndex(connectionId: string): Promise { const [sources, warehouseLine] = await Promise.all([ - this.deps.semanticLayerService.loadAllSources(connectionId), + this.deps.semanticLayerService.loadAllSources(connectionId).then((result) => result.sources), this.buildWarehouseLine(connectionId), ]); const indexLines = diff --git a/packages/context/src/scan/description-generation.test.ts b/packages/context/src/scan/description-generation.test.ts index bd9621df..8ffd3b5c 100644 --- a/packages/context/src/scan/description-generation.test.ts +++ b/packages/context/src/scan/description-generation.test.ts @@ -203,11 +203,11 @@ describe('KtxDescriptionGenerator', () => { expect(generateText).toHaveBeenCalledWith( expect.objectContaining({ temperature: 0.2, + system: expect.objectContaining({ + role: 'system', + content: expect.stringContaining('Please provide a concise description in 12 words or less.'), + }), messages: expect.arrayContaining([ - expect.objectContaining({ - role: 'system', - content: expect.stringContaining('Please provide a concise description in 12 words or less.'), - }), expect.objectContaining({ role: 'user', content: expect.stringContaining(' status '), @@ -215,6 +215,8 @@ describe('KtxDescriptionGenerator', () => { ]), }), ); + const lastCall = vi.mocked(generateText).mock.calls.at(-1)?.[0]; + expect(lastCall?.messages?.some((message) => message.role === 'system')).toBe(false); }); it('samples through the connector when column values are not pre-fetched', async () => { @@ -391,3 +393,289 @@ 
describe('KtxDescriptionGenerator', () => { expect(cache.set).toHaveBeenCalledWith('__connection:Warehouse', 'Commerce orders'); }); }); + +describe('KtxDescriptionGenerator resilience', () => { + function createLogger() { + return { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }; + } + + it('retries sampleTable on transient failure and uses sampled rows when it eventually succeeds', async () => { + const sampleTable = vi + .fn>() + .mockRejectedValueOnce(new Error('pool: transient ECONNRESET')) + .mockRejectedValueOnce(new Error('pool: transient ECONNRESET')) + .mockResolvedValue({ + headers: ['id', 'status'], + rows: [ + [1, 'paid'], + [2, 'refunded'], + ], + totalRows: 2, + }); + const connector: KtxScanConnector = { + ...createConnector(), + sampleTable, + }; + const logger = createLogger(); + const warnings: Array<{ code: string; table?: string }> = []; + const generator = new KtxDescriptionGenerator({ + llmProvider: createLlmProvider('Commerce orders'), + logger, + onWarning: (warning) => warnings.push({ code: warning.code, ...(warning.table ? 
{ table: warning.table } : {}) }), + settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24, concurrencyLimit: 2 }, + }); + + const description = await generator.generateTableDescription({ + connectionId: 'conn-1', + connector, + context: { runId: 'run-1' }, + dataSourceType: 'POSTGRESQL', + table: { catalog: null, db: 'public', name: 'orders' }, + }); + + expect(description).toBe('Commerce orders'); + expect(sampleTable).toHaveBeenCalledTimes(3); + expect(logger.warn).toHaveBeenCalledTimes(2); + expect(warnings).toEqual([]); + }); + + it('falls back to metadata-only prompt when sampleTable retries exhaust', async () => { + const sampleTable = vi + .fn>() + .mockRejectedValue(new Error('pool: connection refused')); + const connector: KtxScanConnector = { + ...createConnector(), + sampleTable, + }; + const logger = createLogger(); + const warnings: Array<{ code: string; table?: string; metadata?: Record }> = []; + const generator = new KtxDescriptionGenerator({ + llmProvider: createLlmProvider('Customer reference data'), + logger, + onWarning: (warning) => + warnings.push({ + code: warning.code, + ...(warning.table ? { table: warning.table } : {}), + ...(warning.metadata ? 
{ metadata: warning.metadata } : {}), + }), + settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24, concurrencyLimit: 2 }, + }); + + const description = await generator.generateTableDescription({ + connectionId: 'conn-1', + connector, + context: { runId: 'run-1' }, + dataSourceType: 'POSTGRESQL', + table: { + catalog: null, + db: 'public', + name: 'customers', + columns: [ + { name: 'id', nativeType: 'uuid' }, + { name: 'email', nativeType: 'text', comment: 'Primary contact email' }, + ], + }, + }); + + expect(description).toBe('Customer reference data'); + expect(sampleTable).toHaveBeenCalledTimes(3); + expect(warnings.map((warning) => warning.code)).toEqual(['sampling_failed', 'description_fallback_used']); + expect(warnings[1]?.metadata?.reason).toBe('sampling_failed'); + const userPrompt = (vi.mocked(generateText).mock.calls.at(-1)?.[0] as { messages: Array<{ role: string; content: string }> }) + .messages.find((message) => message.role === 'user')?.content; + expect(userPrompt).toContain('Columns (metadata only, no sample rows)'); + expect(userPrompt).toContain('email (text)'); + expect(userPrompt).toContain('Primary contact email'); + }); + + it('emits enrichment_failed and returns null when both sampling and metadata-only LLM fail', async () => { + const sampleTable = vi + .fn>() + .mockRejectedValue(new Error('pool: connection refused')); + const connector: KtxScanConnector = { + ...createConnector(), + sampleTable, + }; + const warnings: string[] = []; + const generator = new KtxDescriptionGenerator({ + llmProvider: createFailingLlmProvider(), + onWarning: (warning) => warnings.push(warning.code), + settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 }, + }); + + const description = await generator.generateTableDescription({ + connectionId: 'conn-1', + connector, + context: { runId: 'run-1' }, + dataSourceType: 'POSTGRESQL', + table: { catalog: null, db: 'public', name: 'orphan', columns: [{ name: 'id' }] }, + }); 
+ + expect(description).toBeNull(); + expect(warnings).toEqual(['sampling_failed', 'enrichment_failed']); + }); + + it('uses metadata-only fallback when connector has no sampleTable', async () => { + const connector = createConnector(); + const samplerWithoutTable: KtxScanConnector = { + ...connector, + sampleTable: undefined, + }; + const warnings: string[] = []; + const generator = new KtxDescriptionGenerator({ + llmProvider: createLlmProvider('Orders mart'), + onWarning: (warning) => warnings.push(warning.code), + settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 }, + }); + + const description = await generator.generateTableDescription({ + connectionId: 'conn-1', + connector: samplerWithoutTable, + context: { runId: 'run-1' }, + dataSourceType: 'POSTGRESQL', + table: { + catalog: null, + db: 'public', + name: 'mart_orders', + columns: [{ name: 'order_id', nativeType: 'uuid' }], + }, + }); + + expect(description).toBe('Orders mart'); + expect(warnings).toEqual(['connector_capability_missing', 'description_fallback_used']); + }); + + it('aborts retry loop when the scan context signal fires', async () => { + const controller = new AbortController(); + const sampleTable = vi.fn>().mockImplementation(async () => { + controller.abort(); + throw new Error('first attempt blew up'); + }); + const connector: KtxScanConnector = { + ...createConnector(), + sampleTable, + }; + const warnings: string[] = []; + const generator = new KtxDescriptionGenerator({ + llmProvider: createLlmProvider('should not be called'), + onWarning: (warning) => warnings.push(warning.code), + settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 }, + }); + + await expect( + generator.generateTableDescription({ + connectionId: 'conn-1', + connector, + context: { runId: 'run-1', signal: controller.signal }, + dataSourceType: 'POSTGRESQL', + table: { catalog: null, db: 'public', name: 'orders' }, + }), + ).rejects.toThrow('aborted'); + + 
expect(sampleTable).toHaveBeenCalledTimes(1); + expect(warnings).toEqual([]); + }); + + it('generates column descriptions from rawDescriptions when sampleColumn is unavailable', async () => { + const samplerWithoutColumn: KtxScanConnector = { + ...createConnector(), + sampleColumn: undefined, + }; + const logger = createLogger(); + const generator = new KtxDescriptionGenerator({ + llmProvider: createLlmProvider('Payment lifecycle state'), + logger, + settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 }, + }); + + const result = await generator.generateColumnDescriptions({ + connectionId: 'conn-1', + connector: samplerWithoutColumn, + context: { runId: 'run-1' }, + dataSourceType: 'POSTGRESQL', + supportsNestedAnalysis: false, + table: { + catalog: null, + db: 'public', + name: 'orders', + columns: [{ name: 'status', rawDescriptions: { db: 'order lifecycle state' } }], + }, + }); + + expect(result.columnDescriptions).toEqual([['status', 'Payment lifecycle state']]); + expect(logger.warn).toHaveBeenCalled(); + const userPrompt = ( + vi.mocked(generateText).mock.calls.at(-1)?.[0] as { messages: Array<{ role: string; content: string }> } + ).messages.find((message) => message.role === 'user')?.content; + expect(userPrompt).toContain(' unavailable '); + expect(userPrompt).toContain(' order lifecycle state '); + }); + + it('generates column descriptions from rawDescriptions when sampleColumn retries exhaust', async () => { + const sampleColumn = vi + .fn>() + .mockRejectedValue(new Error('pool: connection refused')); + const flakyConnector: KtxScanConnector = { + ...createConnector(), + sampleColumn, + }; + const generator = new KtxDescriptionGenerator({ + llmProvider: createLlmProvider('Customer reference identifier'), + settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 }, + }); + + const result = await generator.generateColumnDescriptions({ + connectionId: 'conn-1', + connector: flakyConnector, + context: { runId: 'run-1' 
}, + dataSourceType: 'POSTGRESQL', + supportsNestedAnalysis: false, + table: { + catalog: null, + db: 'public', + name: 'orders', + columns: [{ name: 'customer_id', rawDescriptions: { db: 'FK to customers.id' } }], + }, + }); + + expect(sampleColumn).toHaveBeenCalledTimes(3); + expect(result.columnDescriptions).toEqual([['customer_id', 'Customer reference identifier']]); + }); + + it('skips column LLM call only when neither samples nor rawDescriptions are available', async () => { + const sampleColumn = vi + .fn>() + .mockResolvedValue({ values: [null, null], nullCount: 2, distinctCount: 0 }); + const connector: KtxScanConnector = { + ...createConnector(), + sampleColumn, + }; + vi.mocked(generateText).mockClear(); + const generator = new KtxDescriptionGenerator({ + llmProvider: createLlmProvider('should not be called'), + settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 }, + }); + + const result = await generator.generateColumnDescriptions({ + connectionId: 'conn-1', + connector, + context: { runId: 'run-1' }, + dataSourceType: 'POSTGRESQL', + supportsNestedAnalysis: false, + table: { + catalog: null, + db: 'public', + name: 'orders', + columns: [{ name: 'opaque_blob' }], + }, + }); + + expect(result.columnDescriptions).toEqual([['opaque_blob', null]]); + expect(generateText).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/context/src/scan/description-generation.ts b/packages/context/src/scan/description-generation.ts index 22e33f93..184827ff 100644 --- a/packages/context/src/scan/description-generation.ts +++ b/packages/context/src/scan/description-generation.ts @@ -5,11 +5,18 @@ import type { KtxColumnSampleResult, KtxScanContext, KtxScanLoggerPort, + KtxScanWarning, KtxTableRef, KtxTableSampleInput, KtxTableSampleResult, } from './types.js'; +interface KtxDescriptionTableColumn { + name: string; + nativeType?: string | null; + comment?: string | null; +} + export interface KtxDescriptionCachePort { buildTableKey(table: 
KtxTableRef): string; buildColumnKey(table: KtxTableRef, columnName: string): string; @@ -53,6 +60,7 @@ export interface KtxDescriptionColumnTable extends KtxTableRef { export interface KtxDescriptionTableInput extends KtxTableRef { rawDescriptions?: Record; + columns?: KtxDescriptionTableColumn[]; } export interface KtxColumnAnalysisResult { @@ -72,7 +80,8 @@ export interface KtxColumnDescriptionPromptInput { export interface KtxTableDescriptionPromptInput { tableName: string; - sampleData: KtxTableSampleResult; + sampleData?: KtxTableSampleResult; + columns?: KtxDescriptionTableColumn[]; dataSourceType: string; rawDescriptions?: Record; } @@ -114,6 +123,7 @@ export interface KtxDescriptionGeneratorOptions { llmProvider: KtxLlmProvider; cache?: KtxDescriptionCachePort; logger?: KtxScanLoggerPort; + onWarning?: (warning: KtxScanWarning) => void; settings: KtxDescriptionGenerationSettings; } @@ -136,6 +146,66 @@ function errorMessage(error: unknown): string { return error instanceof Error ? 
error.message : String(error); } +class KtxAbortedError extends Error { + constructor() { + super('aborted'); + this.name = 'KtxAbortedError'; + } +} + +async function delayWithAbort(ms: number, signal?: AbortSignal): Promise { + if (!signal) { + await new Promise((resolve) => setTimeout(resolve, ms)); + return; + } + if (signal.aborted) { + throw new KtxAbortedError(); + } + await new Promise((resolve, reject) => { + const timer = setTimeout(() => { + signal.removeEventListener('abort', onAbort); + resolve(); + }, ms); + const onAbort = (): void => { + clearTimeout(timer); + reject(new KtxAbortedError()); + }; + signal.addEventListener('abort', onAbort, { once: true }); + }); +} + +interface RetryAsyncOptions { + attempts: number; + baseDelayMs: number; + signal?: AbortSignal; + onAttemptFailure?: (error: unknown, attempt: number) => void; +} + +async function retryAsync(fn: () => Promise, options: RetryAsyncOptions): Promise { + const attempts = Math.max(1, options.attempts); + let lastError: unknown; + for (let attempt = 1; attempt <= attempts; attempt += 1) { + if (options.signal?.aborted) { + throw new KtxAbortedError(); + } + try { + return await fn(); + } catch (error) { + lastError = error; + if (error instanceof KtxAbortedError) { + throw error; + } + options.onAttemptFailure?.(error, attempt); + if (attempt === attempts) { + break; + } + const delay = options.baseDelayMs * 2 ** (attempt - 1); + await delayWithAbort(delay, options.signal); + } + } + throw lastError; +} + function toTableRef(table: KtxTableRef): KtxTableRef { return { catalog: table.catalog, @@ -205,11 +275,12 @@ Example: systemParts.push(wordLimitLine(input.maxWords)); } + const sampleValuesContent = valuesStr.length > 0 ? 
valuesStr : 'unavailable'; let user = ` ${input.tableContext} ${input.columnName} - ${valuesStr} + ${sampleValuesContent} `; const sources = descriptionSources(input.rawDescriptions); @@ -228,16 +299,6 @@ Example: export function buildKtxTableDescriptionPrompt( input: KtxTableDescriptionPromptInput & { maxWords?: number }, ): KtxDescriptionPrompt { - const columnInfo: string[] = []; - for (let index = 0; index < Math.min(input.sampleData.headers.length, 10); index += 1) { - const header = input.sampleData.headers[index]; - const sampleValues = input.sampleData.rows - .slice(0, 3) - .map((row) => row[index]) - .filter((value) => value !== null && value !== undefined); - columnInfo.push(`${header}: ${sampleValues.map((value) => String(value)).join(', ')}`); - } - const systemParts: string[] = [ `Analyze database tables and provide a concise description. @@ -256,9 +317,38 @@ Example: "Information about healthcare professionals used for workforce manageme systemParts.push(wordLimitLine(input.maxWords)); } + const hasSamples = !!input.sampleData && input.sampleData.rows.length > 0; + let columnsLine: string; + let rowsLine: string; + if (hasSamples) { + const sampleData = input.sampleData!; + const columnInfo: string[] = []; + for (let index = 0; index < Math.min(sampleData.headers.length, 10); index += 1) { + const header = sampleData.headers[index]; + const sampleValues = sampleData.rows + .slice(0, 3) + .map((row) => row[index]) + .filter((value) => value !== null && value !== undefined); + columnInfo.push(`${header}: ${sampleValues.map((value) => String(value)).join(', ')}`); + } + columnsLine = `Columns and sample data: ${columnInfo.join(' | ')}`; + rowsLine = `Total rows in sample: ${sampleData.rows.length}`; + } else if (input.columns && input.columns.length > 0) { + const columnInfo = input.columns.slice(0, 30).map((column) => { + const typePart = column.nativeType ? ` (${column.nativeType})` : ''; + const commentPart = column.comment ? 
` — ${column.comment}` : ''; + return `${column.name}${typePart}${commentPart}`; + }); + columnsLine = `Columns (metadata only, no sample rows): ${columnInfo.join(' | ')}`; + rowsLine = 'Sample rows: unavailable'; + } else { + columnsLine = 'Columns: unavailable'; + rowsLine = 'Sample rows: unavailable'; + } + let user = `Table: ${input.tableName} -Columns and sample data: ${columnInfo.join(' | ')} -Total rows in sample: ${input.sampleData.rows.length} +${columnsLine} +${rowsLine} Data source type: ${input.dataSourceType}`; const sources = descriptionSources(input.rawDescriptions); @@ -313,12 +403,14 @@ export class KtxDescriptionGenerator { private readonly llmProvider: KtxLlmProvider; private readonly cache?: KtxDescriptionCachePort; private readonly logger?: KtxScanLoggerPort; + private readonly onWarning?: (warning: KtxScanWarning) => void; private readonly settings: ResolvedKtxDescriptionGenerationSettings; constructor(options: KtxDescriptionGeneratorOptions) { this.llmProvider = options.llmProvider; this.cache = options.cache; this.logger = options.logger; + this.onWarning = options.onWarning; this.settings = { columnMaxWords: options.settings.columnMaxWords, tableMaxWords: options.settings.tableMaxWords, @@ -366,26 +458,82 @@ export class KtxDescriptionGenerator { } } - if (!input.connector.sampleTable) { - this.logger?.warn('KTX scan connector does not support table sampling for table description generation', { + const sampleTable = input.connector.sampleTable; + let sampleData: KtxTableSampleResult | null = null; + let fallbackReason: 'capability_missing' | 'sampling_failed' | 'empty_sample' | null = null; + + if (!sampleTable) { + fallbackReason = 'capability_missing'; + this.logger?.warn('KTX scan connector does not support table sampling; falling back to metadata-only prompt', { connectorId: input.connector.id, table: input.table.name, }); - return 'Table not found'; + this.onWarning?.({ + code: 'connector_capability_missing', + message: `Connector 
${input.connector.id} does not support sampleTable; using metadata-only description prompt`, + table: input.table.name, + recoverable: true, + metadata: { connectorId: input.connector.id, capability: 'sampleTable' }, + }); + } else { + try { + sampleData = await retryAsync( + () => + sampleTable( + { + connectionId: input.connectionId, + table: tableRef, + limit: 20, + }, + input.context, + ), + { + attempts: 3, + baseDelayMs: 200, + signal: input.context.signal, + onAttemptFailure: (error, attempt) => { + this.logger?.warn( + `sampleTable attempt ${attempt} failed for ${input.table.name}: ${errorMessage(error)}`, + { + connectorId: input.connector.id, + table: input.table.name, + attempt, + }, + ); + }, + }, + ); + if (sampleData.rows.length === 0) { + fallbackReason = 'empty_sample'; + this.logger?.warn('sampleTable returned no rows; using metadata-only prompt', { + connectorId: input.connector.id, + table: input.table.name, + }); + } + } catch (error) { + if (error instanceof KtxAbortedError) { + throw error; + } + fallbackReason = 'sampling_failed'; + this.logger?.error(`sampleTable exhausted retries for ${input.table.name}: ${errorMessage(error)}`, { + connectorId: input.connector.id, + table: input.table.name, + }); + this.onWarning?.({ + code: 'sampling_failed', + message: `Failed to sample table ${input.table.name} after retries: ${errorMessage(error)}`, + table: input.table.name, + recoverable: true, + metadata: { connectorId: input.connector.id, error: errorMessage(error) }, + }); + } } try { - const sampleData = await input.connector.sampleTable( - { - connectionId: input.connectionId, - table: tableRef, - limit: 20, - }, - input.context, - ); const prompt = buildKtxTableDescriptionPrompt({ tableName: input.table.name, - sampleData, + ...(fallbackReason === null && sampleData ? { sampleData } : {}), + ...(input.table.columns && input.table.columns.length > 0 ? 
{ columns: input.table.columns } : {}), dataSourceType: input.dataSourceType, rawDescriptions: input.table.rawDescriptions, maxWords: this.settings.tableMaxWords, @@ -394,10 +542,38 @@ export class KtxDescriptionGenerator { if (cacheKey && description) { await this.cache?.set(cacheKey, description); } + if (description && fallbackReason !== null) { + this.onWarning?.({ + code: 'description_fallback_used', + message: `Generated table description without sample rows for ${input.table.name} (reason: ${fallbackReason})`, + table: input.table.name, + recoverable: true, + metadata: { connectorId: input.connector.id, reason: fallbackReason }, + }); + } + if (!description) { + this.onWarning?.({ + code: 'enrichment_failed', + message: `Failed to generate description for table ${input.table.name}`, + table: input.table.name, + recoverable: true, + metadata: { connectorId: input.connector.id, usedFallback: fallbackReason !== null }, + }); + } return description; } catch (error) { - this.logger?.error(`Error generating table description: ${errorMessage(error)}`); - return 'Table not found'; + this.logger?.error(`Error generating table description: ${errorMessage(error)}`, { + connectorId: input.connector.id, + table: input.table.name, + }); + this.onWarning?.({ + code: 'enrichment_failed', + message: `Failed to generate description for table ${input.table.name}: ${errorMessage(error)}`, + table: input.table.name, + recoverable: true, + metadata: { connectorId: input.connector.id }, + }); + return null; } } @@ -496,33 +672,64 @@ export class KtxDescriptionGenerator { let columnValues = column.sampleValues; if (!columnValues || columnValues.length === 0) { if (!input.connector.sampleColumn) { - this.logger?.warn('KTX scan connector does not support column sampling for column description generation', { + this.logger?.warn('KTX scan connector does not support column sampling; using available metadata only', { connectorId: input.connector.id, table: input.table.name, column: 
column.name, }); - return { - columnName: column.name, - description: null, - skipped: false, - processed: false, - }; + columnValues = []; + } else { + const sampleColumn = input.connector.sampleColumn; + try { + const sample = await retryAsync( + () => + sampleColumn( + { + connectionId: input.connectionId, + table: tableRef, + column: column.name, + limit: 50, + }, + input.context, + ), + { + attempts: 3, + baseDelayMs: 200, + signal: input.context.signal, + onAttemptFailure: (error, attempt) => { + this.logger?.warn( + `sampleColumn attempt ${attempt} failed for ${input.table.name}.${column.name}: ${errorMessage(error)}`, + { + connectorId: input.connector.id, + table: input.table.name, + column: column.name, + attempt, + }, + ); + }, + }, + ); + columnValues = sample.values; + } catch (error) { + if (error instanceof KtxAbortedError) { + throw error; + } + this.logger?.warn( + `sampleColumn exhausted retries for ${input.table.name}.${column.name}; using available metadata only: ${errorMessage(error)}`, + { + connectorId: input.connector.id, + table: input.table.name, + column: column.name, + }, + ); + columnValues = []; + } } - - const sample = await input.connector.sampleColumn( - { - connectionId: input.connectionId, - table: tableRef, - column: column.name, - limit: 50, - }, - input.context, - ); - columnValues = sample.values; } const nonNullValues = (columnValues ?? 
[]).filter((value) => value !== null && value !== undefined); - if (nonNullValues.length === 0) { + const hasRawDescriptions = descriptionSources(column.rawDescriptions).length > 0; + if (nonNullValues.length === 0 && !hasRawDescriptions) { return { columnName: column.name, description: null, @@ -553,7 +760,14 @@ export class KtxDescriptionGenerator { processed: description !== null, }; } catch (error) { - this.logger?.error(`Error analyzing column '${column.name}': ${errorMessage(error)}`); + if (error instanceof KtxAbortedError) { + throw error; + } + this.logger?.error(`Error analyzing column '${column.name}': ${errorMessage(error)}`, { + connectorId: input.connector.id, + table: input.table.name, + column: column.name, + }); return { columnName: column.name, description: null, diff --git a/packages/context/src/scan/local-enrichment.test.ts b/packages/context/src/scan/local-enrichment.test.ts index 86a595d4..f0ddd448 100644 --- a/packages/context/src/scan/local-enrichment.test.ts +++ b/packages/context/src/scan/local-enrichment.test.ts @@ -404,6 +404,41 @@ describe('local scan enrichment', () => { expect(result.resolvedRelationships).toBeNull(); }); + it('forwards context.logger and emits warnings when sampleTable fails repeatedly', async () => { + const failingConnector: KtxScanConnector = { + ...connector(), + sampleTable: vi.fn(async () => { + throw new Error('pool: ECONNRESET'); + }), + }; + const logger = { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }; + + const result = await runLocalScanEnrichment({ + connectionId: 'warehouse', + mode: 'enriched', + detectRelationships: false, + connector: failingConnector, + context: { runId: 'scan-run-warnings', logger }, + providers: createDeterministicLocalScanEnrichmentProviders({ embeddingDimensions: 6 }), + }); + + const codes = result.warnings.map((warning) => warning.code); + expect(codes).toContain('sampling_failed'); + expect(codes).toContain('description_fallback_used'); + 
expect(result.warnings.some((warning) => warning.table === 'customers')).toBe(true); + expect(logger.warn).toHaveBeenCalled(); + expect(logger.error).toHaveBeenCalled(); + // Each of the two tables produced sampling_failed + description_fallback_used, so 2 + 2 = 4 warnings minimum. + expect(result.warnings.length).toBeGreaterThanOrEqual(4); + // Sampling was retried 3× for each of the 2 tables = 6 calls + expect(failingConnector.sampleTable).toHaveBeenCalledTimes(6); + }); + it('runs configured deterministic enrichment with descriptions and embeddings', async () => { const result = await runLocalScanEnrichment({ connectionId: 'warehouse', diff --git a/packages/context/src/scan/local-enrichment.ts b/packages/context/src/scan/local-enrichment.ts index ffefd923..e6a9976b 100644 --- a/packages/context/src/scan/local-enrichment.ts +++ b/packages/context/src/scan/local-enrichment.ts @@ -298,6 +298,18 @@ function descriptionTable(table: KtxSchemaTable): KtxDescriptionColumnTable { }; } +function tableMetadataColumns(table: KtxSchemaTable): Array<{ + name: string; + nativeType?: string | null; + comment?: string | null; +}> { + return table.columns.map((column) => ({ + name: column.name, + nativeType: column.nativeType ?? null, + comment: column.comment ?? null, + })); +} + function embeddingBatchSize(maxBatchSize: number): number { return Number.isInteger(maxBatchSize) && maxBatchSize > 0 ? maxBatchSize : 100; } @@ -308,9 +320,19 @@ async function generateDescriptions(input: { context: KtxScanContext; providers: KtxLocalScanEnrichmentProviders; progress?: KtxProgressPort; + warnings?: KtxScanWarning[]; }): Promise { + const warningSink = input.warnings; const generator = new KtxDescriptionGenerator({ llmProvider: input.providers.llm, + ...(input.context.logger ? { logger: input.context.logger } : {}), + ...(warningSink + ? 
{ + onWarning: (warning: KtxScanWarning) => { + warningSink.push(warning); + }, + } + : {}), settings: { columnMaxWords: 16, tableMaxWords: 24, @@ -355,6 +377,7 @@ async function generateDescriptions(input: { db: table.db, name: table.name, rawDescriptions: table.comment ? { db: table.comment } : {}, + columns: tableMetadataColumns(table), }, }); return { @@ -559,6 +582,7 @@ export async function runLocalScanEnrichment( context: input.context, providers, progress: descriptionProgress, + warnings, }), }); const embeddingProgress = progress?.startPhase(0.2); diff --git a/packages/context/src/scan/relationship-llm-proposal.test.ts b/packages/context/src/scan/relationship-llm-proposal.test.ts index eb05638e..ed43ff02 100644 --- a/packages/context/src/scan/relationship-llm-proposal.test.ts +++ b/packages/context/src/scan/relationship-llm-proposal.test.ts @@ -166,11 +166,11 @@ describe('relationship LLM proposals', () => { }); expect(generateText).toHaveBeenCalledWith( expect.objectContaining({ + system: expect.objectContaining({ + role: 'system', + content: expect.stringContaining('You are helping KTX review possible SQL relationships'), + }), messages: expect.arrayContaining([ - expect.objectContaining({ - role: 'system', - content: expect.stringContaining('You are helping KTX review possible SQL relationships'), - }), expect.objectContaining({ role: 'user', content: expect.stringContaining('"tables"'), @@ -178,9 +178,12 @@ describe('relationship LLM proposals', () => { ]), }), ); - const call = (generateText.mock.calls as unknown as Array<[{ messages: Array<{ role: string; content: string }> }]>)[0]?.[0]; + const call = ( + generateText.mock.calls as unknown as Array<[{ messages: Array<{ role: string; content: string }> }]> + )[0]?.[0]; const userMessage = call?.messages.find((m) => m.role === 'user'); expect(userMessage?.content).not.toContain('You are helping KTX review possible SQL relationships'); + expect(call?.messages.some((m) => m.role === 
'system')).toBe(false); }); it('skips deterministic providers without calling generateText', async () => { diff --git a/packages/context/src/scan/types.ts b/packages/context/src/scan/types.ts index c21d21bf..bc8959f5 100644 --- a/packages/context/src/scan/types.ts +++ b/packages/context/src/scan/types.ts @@ -345,7 +345,8 @@ export type KtxScanWarningCode = | 'relationship_llm_invalid_reference' | 'relationship_llm_proposal_failed' | 'credential_redacted' - | 'enrichment_failed'; + | 'enrichment_failed' + | 'description_fallback_used'; export interface KtxScanWarning { code: KtxScanWarningCode; diff --git a/packages/context/src/sl/local-sl.test.ts b/packages/context/src/sl/local-sl.test.ts index 5e5fb6c4..00c00874 100644 --- a/packages/context/src/sl/local-sl.test.ts +++ b/packages/context/src/sl/local-sl.test.ts @@ -392,6 +392,26 @@ describe('local semantic-layer helpers', () => { ).rejects.toThrow('Invalid semantic-layer source'); }); + it('reports legacy overlay column patches with a file-attributed migration hint', async () => { + const invalidYaml = [ + 'name: orders', + 'columns:', + ' - name: status', + ' descriptions:', + ' user: Order status.', + '', + ].join('\n'); + + await expect( + validateLocalSlSource(invalidYaml, { project, connectionId: 'warehouse', sourceName: 'orders' }), + ).resolves.toEqual({ + valid: false, + errors: [ + "semantic-layer/warehouse/orders.yaml: column 'status' patches a manifest column but is in 'columns:' — move it to 'column_overrides:'", + ], + }); + }); + it('rejects unsafe source paths', async () => { await expect( readLocalSlSource(project, { diff --git a/packages/context/src/sl/local-sl.ts b/packages/context/src/sl/local-sl.ts index bf170d0a..f4844b7b 100644 --- a/packages/context/src/sl/local-sl.ts +++ b/packages/context/src/sl/local-sl.ts @@ -12,6 +12,7 @@ import { type ManifestTableEntry, projectManifestEntry, SemanticLayerService, + toResolvedWire, } from './semantic-layer.service.js'; import type { 
PgliteSlSearchPrototypeOwnerOptions } from './pglite-sl-search-prototype.js'; import { loadLatestSlDictionaryEntries } from './sl-dictionary-profile.js'; @@ -240,7 +241,12 @@ export async function loadLocalSlSourceRecords( if (!base) { continue; } - const source = composeOverlay(base.source, parsed); + let source: SemanticLayerSource; + try { + source = composeOverlay(base.source, parsed); + } catch (error) { + throw new Error(`${path}: ${error instanceof Error ? error.message : String(error)}`); + } sources.set(name, { ...summarizeSemanticSource({ connectionId, path, source }), yaml: sourceToYaml(source), @@ -253,11 +259,28 @@ export async function loadLocalSlSourceRecords( export async function validateLocalSlSource( rawYaml: string, - options?: { project?: KtxLocalProject; connectionId?: string }, + options?: { project?: KtxLocalProject; connectionId?: string; sourceName?: string }, ): Promise { try { const parsed = parseYamlRecord(rawYaml); const schema = parsed.table || parsed.sql ? sourceDefinitionSchema : sourceOverlaySchema; + if (schema === sourceOverlaySchema && Array.isArray(parsed.columns)) { + const sourceName = options?.sourceName ?? (typeof parsed.name === 'string' ? parsed.name : 'source'); + const path = + options?.connectionId && isSafeConnectionId(options.connectionId) + ? 
`semantic-layer/${options.connectionId}/${sourceName}.yaml` + : `${sourceName}.yaml`; + const legacyColumnPatchErrors = parsed.columns + .filter((column): column is Record => isRecord(column)) + .filter((column) => typeof column.name === 'string' && (!column.expr || !column.type)) + .map( + (column) => + `${path}: column '${column.name}' patches a manifest column but is in 'columns:' — move it to 'column_overrides:'`, + ); + if (legacyColumnPatchErrors.length > 0) { + return { valid: false, errors: legacyColumnPatchErrors }; + } + } const result = schema.parse(parsed); const errors: string[] = []; @@ -268,6 +291,10 @@ export async function validateLocalSlSource( ); } + if ('table' in result || 'sql' in result) { + toResolvedWire(result as SemanticLayerSource); + } + return { valid: errors.length === 0, errors }; } catch (error) { return { valid: false, errors: validationErrors(error) }; diff --git a/packages/context/src/sl/ports.ts b/packages/context/src/sl/ports.ts index 08e1ca6d..888248d5 100644 --- a/packages/context/src/sl/ports.ts +++ b/packages/context/src/sl/ports.ts @@ -1,4 +1,4 @@ -import type { SemanticLayerQueryInput, SemanticLayerSource } from './types.js'; +import type { ResolvedSemanticLayerSource, SemanticLayerQueryInput } from './types.js'; export interface KtxConnectionInfo { id: string; @@ -20,7 +20,7 @@ export interface SlConnectionCatalogPort { export interface SlPythonPort { validateSources(input: { - sources: SemanticLayerSource[]; + sources: ResolvedSemanticLayerSource[]; dialect: string; recently_touched?: string[]; }): Promise<{ @@ -28,7 +28,7 @@ export interface SlPythonPort { error?: unknown; }>; query(input: { - sources: SemanticLayerSource[]; + sources: ResolvedSemanticLayerSource[]; query: SemanticLayerQueryInput; dialect: string; }): Promise<{ data?: { sql?: string; plan?: Record } | null; error?: unknown }>; diff --git a/packages/context/src/sl/schemas.contract.test.ts b/packages/context/src/sl/schemas.contract.test.ts new file mode 
100644 index 00000000..1b0dac20 --- /dev/null +++ b/packages/context/src/sl/schemas.contract.test.ts @@ -0,0 +1,68 @@ +import { execFileSync } from 'node:child_process'; +import { Ajv2020 } from 'ajv/dist/2020.js'; +import { describe, expect, it } from 'vitest'; + +import { resolvedSourceSchema } from './schemas.js'; +import { toResolvedWire } from './semantic-layer.service.js'; +import type { SemanticLayerSource } from './types.js'; + +function loadPythonSourceDefinitionSchema(): Record | null { + try { + const stdout = execFileSync('uv', ['run', 'python', '-m', 'semantic_layer', 'dump-schema'], { + cwd: new URL('../../../../', import.meta.url), + encoding: 'utf8', + stdio: ['ignore', 'pipe', 'ignore'], + }); + return JSON.parse(stdout) as Record; + } catch { + return null; + } +} + +const sourceDefinitionJsonSchema = loadPythonSourceDefinitionSchema(); + +const fixtures: SemanticLayerSource[] = [ + { + name: 'orders', + table: 'public.orders', + grain: ['id'], + columns: [ + { name: 'id', type: 'number' }, + { + name: 'status', + type: 'string', + descriptions: { dbt: 'Order lifecycle status.' 
}, + constraints: { dbt: { not_null: true } }, + enum_values: { dbt: ['placed', 'shipped'] }, + tests: { dbt: [{ name: 'accepted_values', package: 'dbt' }] }, + }, + ], + joins: [{ to: 'customers', on: 'orders.customer_id = customers.id', relationship: 'many_to_one' }], + measures: [{ name: 'order_count', expr: 'count(id)' }], + segments: [{ name: 'paid', expr: "status = 'paid'" }], + default_time_dimension: { dbt: 'created_at' }, + tags: { dbt: ['mart'] }, + freshness: { dbt: { loaded_at_field: 'updated_at' } }, + }, + { + name: 'aav_orders', + sql: 'select id, status from public.orders where status = paid', + grain: ['id'], + columns: [{ name: 'id', type: 'number' }], + joins: [], + measures: [], + }, +]; + +describe.skipIf(sourceDefinitionJsonSchema === null)('resolved source JSON Schema contract', () => { + it('keeps TS resolved-source fixtures accepted by the Python SourceDefinition schema', () => { + const ajv = new Ajv2020({ allErrors: true, strict: false }); + const validate = ajv.compile(sourceDefinitionJsonSchema as Record); + + for (const fixture of fixtures) { + const wire = toResolvedWire(fixture); + expect(resolvedSourceSchema.safeParse(wire).success).toBe(true); + expect(validate(wire), JSON.stringify(validate.errors, null, 2)).toBe(true); + } + }); +}); diff --git a/packages/context/src/sl/schemas.ts b/packages/context/src/sl/schemas.ts index a57359d4..bb66a1ca 100644 --- a/packages/context/src/sl/schemas.ts +++ b/packages/context/src/sl/schemas.ts @@ -78,6 +78,8 @@ const joinDeclarationSchema = z.object({ alias: z.string().optional(), }); +const resolvedJoinDeclarationSchema = joinDeclarationSchema.strict(); + const sourceColumnSchema = z.object({ name: unqualifiedNameSchema, // type/descriptions optional on standalone sources: compose-time enrichment fills them @@ -89,24 +91,39 @@ const sourceColumnSchema = z.object({ visibility: z.enum(columnVisibilityValues).optional(), descriptions: descriptionsSchema.optional(), expr: z.string().optional(), + 
natural_granularity: z.string().optional(), constraints: sourceKeyedColumnConstraintsSchema.optional(), enum_values: sourceKeyedStringArraySchema.optional(), tests: dbtColumnTestsSchema.optional(), }); -/** Overlay column: type requires expr (structural types are inherited from manifest). */ +const resolvedSourceColumnSchema = sourceColumnSchema.extend({ + type: z.enum(columnTypeValues), +}).strict(); + +/** Overlay column: computed columns only. Structural columns live in the manifest. */ const overlayColumnSchema = z .object({ name: unqualifiedNameSchema, - type: z.enum(columnTypeValues).optional(), + type: z.enum(columnTypeValues), role: z.enum(columnRoleValues).optional(), visibility: z.enum(columnVisibilityValues).optional(), descriptions: descriptionsSchema.optional(), - expr: z.string().optional(), + expr: z.string().min(1), }) - .refine((col) => !col.type || col.expr, { - message: "Overlay column with 'type' must also have 'expr' (only computed columns may specify a type)", - }); + .strict(); + +const columnOverrideSchema = z + .object({ + name: unqualifiedNameSchema, + role: z.enum(columnRoleValues).optional(), + visibility: z.enum(columnVisibilityValues).optional(), + descriptions: descriptionsSchema.optional(), + constraints: sourceKeyedColumnConstraintsSchema.optional(), + enum_values: sourceKeyedStringArraySchema.optional(), + tests: dbtColumnTestsSchema.optional(), + }) + .strict(); /** Standalone source: has `table` or `sql`, requires grain + columns. 
*/ export const sourceDefinitionSchema = z @@ -143,6 +160,26 @@ export const sourceDefinitionSchema = z message: "Standalone source must have exactly one of 'table' or 'sql' (not both)", }); +export const resolvedSourceSchema = z + .object({ + name: z.string().min(1), + descriptions: descriptionsSchema.optional(), + table: z.string().optional(), + sql: z.string().optional(), + grain: z.array(unqualifiedNameSchema).min(1), + columns: z.array(resolvedSourceColumnSchema).min(1), + joins: z.array(resolvedJoinDeclarationSchema).default([]), + measures: z.array(slMeasureDefinitionSchema).default([]), + segments: z.array(segmentDefinitionSchema).optional(), + default_time_dimension: defaultTimeDimensionDbtSchema.optional(), + tags: sourceKeyedStringArraySchema.optional(), + freshness: sourceFreshnessSchema.optional(), + }) + .strict() + .refine((s) => (s.table || s.sql) && !(s.table && s.sql), { + message: "Resolved source must have exactly one of 'table' or 'sql' (not both)", + }); + /** Overlay source: no table/sql, all fields optional except name. 
*/ export const sourceOverlaySchema = z .object({ @@ -150,6 +187,7 @@ export const sourceOverlaySchema = z descriptions: z.record(z.string(), z.string()).optional(), grain: z.array(unqualifiedNameSchema).optional(), columns: z.array(overlayColumnSchema).optional(), + column_overrides: z.array(columnOverrideSchema).optional(), joins: z.array(joinDeclarationSchema).optional(), measures: z.array(slMeasureDefinitionSchema).optional(), segments: z.array(segmentDefinitionSchema).optional(), diff --git a/packages/context/src/sl/semantic-layer.service.test.ts b/packages/context/src/sl/semantic-layer.service.test.ts index 179904d5..6ac3460a 100644 --- a/packages/context/src/sl/semantic-layer.service.test.ts +++ b/packages/context/src/sl/semantic-layer.service.test.ts @@ -2,13 +2,17 @@ import type { Mock } from 'vitest'; import { beforeEach, describe, expect, it, vi } from 'vitest'; import { + ColumnNameCollisionError, composeOverlay, + ConflictingExcludeAndOverrideError, enrichColumnsFromManifest, findDanglingSegmentRefs, projectManifestEntry, SemanticLayerService, + toResolvedWire, + UnknownColumnOverrideError, } from './semantic-layer.service.js'; -import { sourceDefinitionSchema } from './schemas.js'; +import { resolvedSourceSchema, sourceDefinitionSchema, sourceOverlaySchema } from './schemas.js'; import type { SemanticLayerSource } from './types.js'; const pythonPort = { @@ -139,6 +143,69 @@ describe('composeOverlay', () => { expect(composed.measures).toHaveLength(1); }); + it('applies column_overrides to same-named manifest columns', () => { + const overlay = { + name: 'fct_labs', + column_overrides: [ + { name: 'lab_order_id', descriptions: { user: 'Primary key' } }, + { name: 'admin_user_id', descriptions: { user: 'FK to admin_users' } }, + ], + }; + const composed = composeOverlay(baseTable, overlay); + // No duplicate columns appended — same-named overlay entries merged onto the base. 
+ expect(composed.columns).toHaveLength(3); + const labOrder = composed.columns.find((c) => c.name === 'lab_order_id'); + expect(labOrder?.type).toBe('string'); + expect(labOrder?.descriptions).toEqual({ user: 'Primary key' }); + const adminUser = composed.columns.find((c) => c.name === 'admin_user_id'); + expect(adminUser?.type).toBe('string'); + expect(adminUser?.descriptions).toEqual({ user: 'FK to admin_users' }); + }); + + it('appends computed columns alongside column overrides', () => { + const overlay = { + name: 'fct_labs', + column_overrides: [ + { name: 'lab_order_id', descriptions: { user: 'PK doc' } }, + ], + columns: [ + { name: 'is_byol', type: 'boolean', expr: "lab_type = 'byol'" }, + ], + }; + const composed = composeOverlay(baseTable, overlay); + expect(composed.columns).toHaveLength(4); + expect(composed.columns.find((c) => c.name === 'is_byol')?.expr).toBe("lab_type = 'byol'"); + expect(composed.columns.find((c) => c.name === 'lab_order_id')?.type).toBe('string'); + }); + + it('rejects column_overrides that target unknown manifest columns', () => { + expect(() => + composeOverlay(baseTable, { + name: 'fct_labs', + column_overrides: [{ name: 'missing', descriptions: { user: 'Nope' } }], + }), + ).toThrow(UnknownColumnOverrideError); + }); + + it('rejects computed columns whose names collide with manifest columns', () => { + expect(() => + composeOverlay(baseTable, { + name: 'fct_labs', + columns: [{ name: 'lab_order_id', type: 'string', expr: 'lab_order_id' }], + }), + ).toThrow(ColumnNameCollisionError); + }); + + it('rejects exclude/override conflicts before applying exclusions', () => { + expect(() => + composeOverlay(baseTable, { + name: 'fct_labs', + exclude_columns: ['lab_order_id'], + column_overrides: [{ name: 'lab_order_id', descriptions: { user: 'Hidden PK' } }], + }), + ).toThrow(ConflictingExcludeAndOverrideError); + }); + it('merges overlay descriptions (plural) with base descriptions keyed by source', () => { const 
baseWithDescriptions: SemanticLayerSource = { ...baseTable, @@ -418,6 +485,62 @@ describe('sourceDefinitionSchema', () => { }); }); +describe('sourceOverlaySchema', () => { + it('accepts column_overrides and keeps columns computed-only', () => { + const result = sourceOverlaySchema.safeParse({ + name: 'orders', + column_overrides: [{ name: 'status', descriptions: { user: 'Lifecycle status' } }], + columns: [{ name: 'is_paid', type: 'boolean', expr: "status = 'paid'" }], + }); + expect(result.success).toBe(true); + }); + + it('rejects typeless overlay columns and singular description on overrides', () => { + const result = sourceOverlaySchema.safeParse({ + name: 'orders', + column_overrides: [{ name: 'status', description: 'Lifecycle status' }], + columns: [{ name: 'status', descriptions: { user: 'Lifecycle status' } }], + }); + expect(result.success).toBe(false); + if (!result.success) { + const paths = result.error.issues.map((issue) => issue.path.join('.')); + expect(paths).toContain('column_overrides.0'); + expect(paths).toContain('columns.0.type'); + expect(paths).toContain('columns.0.expr'); + } + }); +}); + +describe('toResolvedWire', () => { + it('strips TS-only authoring and provenance fields before the Python boundary', () => { + const wire = toResolvedWire({ + name: 'orders', + table: 'public.orders', + inherits_columns_from: 'orders', + grain: ['id'], + columns: [{ name: 'id', type: 'string' }], + joins: [{ to: 'customers', on: 'orders.customer_id = customers.id', relationship: 'many_to_one', source: 'formal' }], + measures: [], + usage: { + narrative: 'Frequently queried orders.', + frequencyTier: 'high', + commonFilters: ['status'], + commonJoins: [], + }, + }); + + expect(wire).toEqual({ + name: 'orders', + table: 'public.orders', + grain: ['id'], + columns: [{ name: 'id', type: 'string' }], + joins: [{ to: 'customers', on: 'orders.customer_id = customers.id', relationship: 'many_to_one' }], + measures: [], + }); + 
expect(resolvedSourceSchema.parse(wire)).toEqual(wire); + }); +}); + describe('projectManifestEntry', () => { it('projects manifest usage onto the semantic-layer source', () => { const source = projectManifestEntry('orders', { @@ -537,7 +660,8 @@ describe('loadAllSources — standalone enrichment via inherits_columns_from', ( ].join('\n'), }); - const sources = await service.loadAllSources('conn-1'); + const { sources, loadErrors } = await service.loadAllSources('conn-1'); + expect(loadErrors).toEqual([]); expect(sources[0]).toMatchObject({ name: 'orders', @@ -601,7 +725,8 @@ describe('loadAllSources — standalone enrichment via inherits_columns_from', ( return Promise.reject(new Error(`Unexpected readFile: ${path}`)); }); - const sources = await service.loadAllSources('conn-1'); + const { sources, loadErrors } = await service.loadAllSources('conn-1'); + expect(loadErrors).toEqual([]); const aav = sources.find((s) => s.name === 'aav_consignments'); expect(aav).toBeDefined(); expect(aav?.columns).toEqual([ @@ -646,7 +771,8 @@ describe('loadAllSources — standalone enrichment via inherits_columns_from', ( }); }); - const sources = await service.loadAllSources('conn-1'); + const { sources, loadErrors } = await service.loadAllSources('conn-1'); + expect(loadErrors).toEqual([]); const aav = sources.find((s) => s.name === 'aav_consignments'); expect(aav?.columns[0].type).toBe('string'); }); @@ -670,7 +796,8 @@ describe('loadAllSources — standalone enrichment via inherits_columns_from', ( ].join('\n'), }); - const sources = await service.loadAllSources('conn-1'); + const { sources, loadErrors } = await service.loadAllSources('conn-1'); + expect(loadErrors).toEqual([]); const aav = sources.find((s) => s.name === 'aav_consignments'); expect(aav?.columns).toEqual([{ name: 'FOO', type: 'string' }]); }); @@ -693,7 +820,8 @@ describe('loadAllSources — standalone enrichment via inherits_columns_from', ( ].join('\n'), }); - const sources = await service.loadAllSources('conn-1'); + 
const { sources, loadErrors } = await service.loadAllSources('conn-1'); + expect(loadErrors).toEqual([]); expect(sources[0]).toMatchObject({ name: 'orders', @@ -701,6 +829,33 @@ describe('loadAllSources — standalone enrichment via inherits_columns_from', ( columns: [{ name: 'id', type: 'string', descriptions: { user: 'Stable order identifier.' } }], }); }); + + it('reports file-attributed errors for legacy overlay column patches', async () => { + const schemaPath = 'semantic-layer/conn-1/_schema/marts.yaml'; + const overlayPath = 'semantic-layer/conn-1/orders.yaml'; + configService.listFiles.mockResolvedValue({ files: [schemaPath, overlayPath] }); + configService.readFile.mockImplementation((path: string) => { + if (path === schemaPath) { + return Promise.resolve({ + content: [ + 'tables:', + ' orders:', + ' table: public.orders', + ' columns:', + ' - { name: id, type: string, pk: true }', + ].join('\n'), + }); + } + return Promise.resolve({ + content: ['name: orders', 'columns:', ' - name: id', ' descriptions: { user: "Stable id." 
}'].join('\n'), + }); + }); + + const { loadErrors } = await service.loadAllSources('conn-1'); + + expect(loadErrors.join('\n')).toContain(overlayPath); + expect(loadErrors.join('\n')).toContain("move it to 'column_overrides:'"); + }); }); describe('validateWithProposedSource', () => { diff --git a/packages/context/src/sl/semantic-layer.service.ts b/packages/context/src/sl/semantic-layer.service.ts index 7d13d10a..00149d3b 100644 --- a/packages/context/src/sl/semantic-layer.service.ts +++ b/packages/context/src/sl/semantic-layer.service.ts @@ -4,8 +4,14 @@ import { noopLogger } from '../core/index.js'; import type { TableUsageOutput } from '../ingest/adapters/historic-sql/skill-schemas.js'; import type { SlConnectionCatalogPort, SlPythonPort } from './ports.js'; import { normalizeSemanticLayerDescriptions } from './description-normalization.js'; -import { isOverlaySource, sourceDefinitionSchema, sourceOverlaySchema } from './schemas.js'; -import type { SemanticLayerQueryExecutionResult, SemanticLayerQueryInput, SemanticLayerSource } from './types.js'; +import { isOverlaySource, resolvedSourceSchema, sourceDefinitionSchema, sourceOverlaySchema } from './schemas.js'; +import type { + ResolvedSemanticLayerSource, + SemanticLayerColumnOverride, + SemanticLayerQueryExecutionResult, + SemanticLayerQueryInput, + SemanticLayerSource, +} from './types.js'; interface WriteSourceOptions { skipValidation?: boolean; @@ -14,6 +20,30 @@ interface WriteSourceOptions { const SL_DIR_PREFIX = 'semantic-layer'; const CONNECTION_ID_PATTERN = /^[a-zA-Z0-9][a-zA-Z0-9_-]*$/; +export interface LoadAllSourcesResult { + sources: SemanticLayerSource[]; + loadErrors: string[]; +} + +export class UnknownColumnOverrideError extends Error {} +export class ColumnNameCollisionError extends Error {} +export class ConflictingExcludeAndOverrideError extends Error {} +class ComposeContractError extends Error {} + +function isComposeError(error: unknown): boolean { + return ( + error instanceof 
UnknownColumnOverrideError || + error instanceof ColumnNameCollisionError || + error instanceof ConflictingExcludeAndOverrideError || + error instanceof ComposeContractError + ); +} + +function formatComposeError(filePath: string, error: unknown): string { + const message = error instanceof Error ? error.message : String(error); + return `${filePath}: ${message}`; +} + function formatPortError(error: unknown, fallback: string): string { if (typeof error === 'string') { return error; @@ -37,6 +67,24 @@ function formatPortError(error: unknown, fallback: string): string { return fallback; } +export function toResolvedWire(source: SemanticLayerSource): ResolvedSemanticLayerSource { + const stripped = { + ...source, + columns: source.columns.map((column) => ({ ...column })), + joins: source.joins.map(({ source: _source, ...join }) => join), + } as Record; + delete stripped.inherits_columns_from; + delete stripped.usage; + delete stripped.source_type; + + const parsed = resolvedSourceSchema.safeParse(stripped); + if (!parsed.success) { + const issues = parsed.error.issues.map((issue) => `${issue.path.join('.')}: ${issue.message}`).join('; '); + throw new ComposeContractError(`resolved source '${source.name}' violates the TS/Python contract: ${issues}`); + } + return parsed.data as ResolvedSemanticLayerSource; +} + export class SemanticLayerService { constructor( private readonly configService: KtxFileStorePort, @@ -158,16 +206,17 @@ export class SemanticLayerService { } } - async loadAllSources(connectionId: string): Promise { + async loadAllSources(connectionId: string): Promise { const dir = `${SL_DIR_PREFIX}/${connectionId}`; const schemaDir = `${dir}/_schema`; + const loadErrors: string[] = []; let allFiles: string[]; try { const result = await this.configService.listFiles(dir); allFiles = result.files.filter((f) => f.endsWith('.yaml')); } catch { - return []; + return { sources: [], loadErrors }; } // 1. 
Load manifest shards from _schema/*.yaml → project to sources @@ -184,7 +233,9 @@ export class SemanticLayerService { } } } catch (e) { - this.logger.warn(`Failed to parse manifest shard ${filePath}: ${e}`); + const message = `Failed to parse manifest shard ${filePath}: ${e instanceof Error ? e.message : String(e)}`; + loadErrors.push(message); + this.logger.warn(message); } } @@ -227,6 +278,7 @@ export class SemanticLayerService { ); } } + toResolvedWire(standalone); sources.set(name, standalone); } else { // Overlay — compose with manifest entry if present @@ -238,11 +290,15 @@ export class SemanticLayerService { } } } catch (e) { - this.logger.warn(`Failed to parse YAML file ${filePath}: ${e}`); + const message = isComposeError(e) + ? formatComposeError(filePath, e) + : `Failed to parse YAML file ${filePath}: ${e instanceof Error ? e.message : String(e)}`; + loadErrors.push(message); + this.logger.warn(message); } } - return Array.from(sources.values()); + return { sources: Array.from(sources.values()), loadErrors }; } /** @@ -622,8 +678,10 @@ export class SemanticLayerService { connectionId: string, proposedSource: SemanticLayerSource, ): Promise<{ errors: string[]; warnings: string[]; perSourceWarnings: Record }> { - const existing = await this.loadAllSources(connectionId); + const loaded = await this.loadAllSources(connectionId); + const existing = loaded.sources; const merged = existing.filter((s) => s.name !== proposedSource.name); + const loadErrors = [...loaded.loadErrors]; // Overlays (no table/sql) must be composed with their manifest base before // validation, otherwise the filter below drops them and the edited source @@ -641,11 +699,27 @@ export class SemanticLayerService { perSourceWarnings: {}, }; } - toPush = composeOverlay(base, { ...proposedSource }); + try { + toPush = composeOverlay(base, { ...proposedSource }); + } catch (error) { + return { + errors: [...loadErrors, formatComposeError(`${proposedSource.name}.yaml`, error)], + warnings: [], + 
perSourceWarnings: {}, + }; + } } else if (proposedSource.inherits_columns_from) { const base = await this.findManifestEntryByTableRef(connectionId, proposedSource.inherits_columns_from); if (base) { - toPush = enrichColumnsFromManifest(proposedSource, base); + try { + toPush = enrichColumnsFromManifest(proposedSource, base); + } catch (error) { + return { + errors: [...loadErrors, formatComposeError(`${proposedSource.name}.yaml`, error)], + warnings: [], + perSourceWarnings: {}, + }; + } } // Miss is non-fatal — the source ships unenriched, validator will surface // any column-without-type errors via the warehouse probe. @@ -654,37 +728,37 @@ export class SemanticLayerService { const validatable = merged.filter((s) => s.table != null || s.sql != null); if (validatable.length === 0) { - return { errors: [], warnings: [], perSourceWarnings: {} }; + return { errors: loadErrors, warnings: [], perSourceWarnings: {} }; } const dialect = await this.getDialectForConnection(connectionId); try { const { data, error } = await this.python.validateSources({ - sources: validatable, + sources: validatable.map(toResolvedWire), dialect, recently_touched: [proposedSource.name], }); if (error) { const errorMsg = formatPortError(error, 'Unknown validation error'); - return { errors: [errorMsg], warnings: [], perSourceWarnings: {} }; + return { errors: [...loadErrors, errorMsg], warnings: [], perSourceWarnings: {} }; } if (!data) { return { - errors: await this.validatePhysicalTableReferences(connectionId, validatable), + errors: [...loadErrors, ...(await this.validatePhysicalTableReferences(connectionId, validatable))], warnings: [], perSourceWarnings: {}, }; } const physicalErrors = await this.validatePhysicalTableReferences(connectionId, validatable); return { - errors: [...(data.errors ?? []), ...physicalErrors], + errors: [...loadErrors, ...(data.errors ?? []), ...physicalErrors], warnings: data.warnings ?? [], perSourceWarnings: data.per_source_warnings ?? 
{}, }; } catch (e) { return { - errors: [`Validation call failed: ${e instanceof Error ? e.message : String(e)}`], + errors: [...loadErrors, `Validation call failed: ${e instanceof Error ? e.message : String(e)}`], warnings: [], perSourceWarnings: {}, }; @@ -692,23 +766,23 @@ export class SemanticLayerService { } async validateSourcesForConnection(connectionId: string): Promise<{ errors: string[]; warnings: string[] }> { - const allSources = await this.loadAllSources(connectionId); + const { sources: allSources, loadErrors } = await this.loadAllSources(connectionId); const sources = allSources.filter((source) => source.table != null || source.sql != null); if (sources.length === 0) { - return { errors: [], warnings: [] }; + return { errors: loadErrors, warnings: [] }; } const dialect = await this.getDialectForConnection(connectionId); - const { data, error } = await this.python.validateSources({ sources, dialect }); + const { data, error } = await this.python.validateSources({ sources: sources.map(toResolvedWire), dialect }); if (error) { - return { errors: [formatPortError(error, 'Unknown validation error')], warnings: [] }; + return { errors: [...loadErrors, formatPortError(error, 'Unknown validation error')], warnings: [] }; } if (!data) { - return { errors: await this.validatePhysicalTableReferences(connectionId, sources), warnings: [] }; + return { errors: [...loadErrors, ...(await this.validatePhysicalTableReferences(connectionId, sources))], warnings: [] }; } const physicalErrors = await this.validatePhysicalTableReferences(connectionId, sources); return { - errors: [...(data.errors ?? []), ...physicalErrors], + errors: [...loadErrors, ...(data.errors ?? []), ...physicalErrors], warnings: data.warnings ?? [], }; } @@ -802,6 +876,7 @@ export class SemanticLayerService { } else { // Overlay — check references against manifest const excludeColumns = (data.exclude_columns as string[]) ?? 
[]; + const columnOverrides = (data.column_overrides as Array<{ name: string }> | undefined) ?? []; const disableJoins = (data.disable_joins as string[]) ?? []; const cols = manifestColumns.get(name); const joins = manifestJoins.get(name); @@ -817,6 +892,16 @@ export class SemanticLayerService { } } + const excluded = new Set(excludeColumns); + for (const override of columnOverrides) { + if (!cols.has(override.name)) { + warnings.push(`${name}: column_overrides references non-existent column '${override.name}'`); + } + if (excluded.has(override.name)) { + warnings.push(`${name}: column '${override.name}' appears in both exclude_columns and column_overrides`); + } + } + for (const joinOn of disableJoins) { const normalized = joinOn.replace(/\s+/g, ' ').trim(); if (!joins?.has(normalized)) { @@ -999,7 +1084,10 @@ export class SemanticLayerService { */ async executeQuery(connectionId: string, query: SemanticLayerQueryInput): Promise { // 1. Load sources, filtering out sources with no table or sql - const allSources = await this.loadAllSources(connectionId); + const { sources: allSources, loadErrors } = await this.loadAllSources(connectionId); + if (loadErrors.length > 0) { + throw new Error(`Semantic layer source load failed: ${loadErrors.join('; ')}`); + } const sources = allSources.filter((s) => { if (!s.table && !s.sql) { this.logger.warn(`Skipping source "${s.name}" with no table or sql defined`); @@ -1021,7 +1109,7 @@ export class SemanticLayerService { // 3. Generate SQL via python SL engine const { data: slResult, error: slError } = await this.python.query({ - sources, + sources: sources.map(toResolvedWire), query, dialect, }); @@ -1092,18 +1180,20 @@ export function projectManifestEntry(name: string, entry: ManifestTableEntry): S const grain = pkColumns.length > 0 ? pkColumns : entry.columns.map((c) => c.name); // Table-level dbt config from manifest shards is surfaced on the source for search / tools. 
- return { + const source: SemanticLayerSource = { name, table: entry.table, descriptions: entry.descriptions, grain, columns, - joins: (entry.joins ?? []).map((j) => ({ to: j.to, on: j.on, relationship: j.relationship, source: j.source })), + joins: (entry.joins ?? []).map((j) => ({ to: j.to, on: j.on, relationship: j.relationship })), measures: [], ...(entry.tags?.dbt?.length ? { tags: entry.tags } : {}), ...(entry.freshness?.dbt ? { freshness: entry.freshness } : {}), ...(entry.usage ? { usage: entry.usage } : {}), }; + toResolvedWire(source); + return source; } function normalizeWs(s: string): string { @@ -1331,6 +1421,7 @@ const COMPOSE_KNOWN_KEYS = new Set([ 'descriptions', 'grain', 'columns', + 'column_overrides', 'joins', 'measures', 'segments', @@ -1365,14 +1456,48 @@ export function composeOverlay(base: SemanticLayerSource, overlay: Record !excluded.has(c.name)); + const columnOverrides = (normalizedOverlay.column_overrides as SemanticLayerColumnOverride[] | undefined) ?? []; + const overrideNames = columnOverrides.map((column) => column.name); + const conflictingOverrides = overrideNames.filter((name) => excluded.has(name)); + if (conflictingOverrides.length > 0) { + throw new ConflictingExcludeAndOverrideError( + `column_overrides conflict with exclude_columns for '${base.name}': ${conflictingOverrides.join(', ')}`, + ); + } - // Append overlay computed columns - const overlayColumns = (normalizedOverlay.columns as SemanticLayerSource['columns'] | undefined) ?? 
[]; - columns = [...columns, ...overlayColumns]; - result.columns = columns; + const baseByLowerName = new Map(base.columns.map((column) => [column.name.toLowerCase(), column])); + const columnsByLowerName = new Map( + result.columns.filter((column) => !excluded.has(column.name)).map((column) => [column.name.toLowerCase(), column]), + ); + + for (const override of columnOverrides) { + const key = override.name.toLowerCase(); + const baseColumn = baseByLowerName.get(key); + if (!baseColumn) { + throw new UnknownColumnOverrideError( + `column '${override.name}' in column_overrides does not exist on manifest source '${base.name}'`, + ); + } + const baseDescriptions = baseColumn.descriptions ?? {}; + const overrideDescriptions = override.descriptions ?? {}; + const merged = { ...baseColumn, ...override }; + if (Object.keys(baseDescriptions).length > 0 || Object.keys(overrideDescriptions).length > 0) { + merged.descriptions = { ...baseDescriptions, ...overrideDescriptions }; + } + columnsByLowerName.set(key, merged); + } + + const computedColumns = (normalizedOverlay.columns as SemanticLayerSource['columns'] | undefined) ?? []; + for (const column of computedColumns) { + if (baseByLowerName.has(column.name.toLowerCase())) { + throw new ColumnNameCollisionError( + `column '${column.name}' in columns patches a manifest column on '${base.name}' — move it to 'column_overrides:'`, + ); + } + columnsByLowerName.set(column.name.toLowerCase(), column); + } + result.columns = [...columnsByLowerName.values()]; // Measures from overlay only result.measures = (normalizedOverlay.measures as SemanticLayerSource['measures'] | undefined) ?? 
[]; @@ -1401,6 +1526,12 @@ export function composeOverlay(base: SemanticLayerSource, overlay: Record !existingKeys.has(`${j.to}::${normalizeWs(j.on)}`)); result.joins = [...manifestJoins, ...newJoins]; + const overlayParse = sourceOverlaySchema.safeParse(normalizedOverlay); + if (!overlayParse.success) { + const issues = overlayParse.error.issues.map((issue) => `${issue.path.join('.')}: ${issue.message}`).join('; '); + throw new ComposeContractError(`overlay for '${base.name}' violates the authoring schema: ${issues}`); + } + toResolvedWire(result); return result; } @@ -1464,5 +1595,7 @@ export function enrichColumnsFromManifest( } return merged; }); - return { ...source, columns: enrichedColumns }; + const enriched = { ...source, columns: enrichedColumns }; + toResolvedWire(enriched); + return enriched; } diff --git a/packages/context/src/sl/tools/sl-discover.tool.test.ts b/packages/context/src/sl/tools/sl-discover.tool.test.ts index 3277d45d..1b961141 100644 --- a/packages/context/src/sl/tools/sl-discover.tool.test.ts +++ b/packages/context/src/sl/tools/sl-discover.tool.test.ts @@ -7,7 +7,7 @@ import { SlDiscoverTool } from './sl-discover.tool.js'; function makeTool() { const semanticLayerService = { listConnectionIdsWithNames: vi.fn(async () => [] as Array<{ id: string; name: string; connectionType: string }>), - loadAllSources: vi.fn(async () => [] as SemanticLayerSource[]), + loadAllSources: vi.fn(async () => ({ sources: [] as SemanticLayerSource[], loadErrors: [] })), }; const slSearchService = { search: vi.fn(async () => []), @@ -53,7 +53,8 @@ describe('SlDiscoverTool - session-scoped reads', () => { listConnectionIdsWithNames: vi.fn().mockResolvedValue([ { id: 'warehouse', name: 'warehouse', connectionType: 'postgres' }, ]), - loadAllSources: vi.fn().mockResolvedValue([ + loadAllSources: vi.fn().mockResolvedValue({ + sources: [ { name: 'orders', table: 'public.orders', @@ -62,7 +63,9 @@ describe('SlDiscoverTool - session-scoped reads', () => { measures: [], 
joins: [], }, - ]), + ], + loadErrors: [], + }), }; const result = await tool.call({}, makeContext({ session: makeSession(sessionSemanticLayerService) })); diff --git a/packages/context/src/sl/tools/sl-discover.tool.ts b/packages/context/src/sl/tools/sl-discover.tool.ts index 74570c2e..fb55175d 100644 --- a/packages/context/src/sl/tools/sl-discover.tool.ts +++ b/packages/context/src/sl/tools/sl-discover.tool.ts @@ -101,7 +101,7 @@ Use this to understand what data is available before querying through the semant // If inspecting a specific source — show the SL interface (columns, measures, joins) // without the raw SQL. Use `sl_read_source` to see the full YAML including SQL. if (sourceName) { - const sources = await semanticLayerService.loadAllSources(connectionId); + const { sources } = await semanticLayerService.loadAllSources(connectionId); const source = sources.find((s) => s.name === sourceName); if (!source) { return { @@ -151,7 +151,7 @@ Use this to understand what data is available before querying through the semant // Load sources from all connections in parallel const results = await Promise.all( connections.map(async (conn) => { - const sources = await semanticLayerService.loadAllSources(conn.id); + const { sources } = await semanticLayerService.loadAllSources(conn.id); let filtered = sources; if (query) { filtered = await this.filterByQuery(conn.id, sources, query); @@ -213,7 +213,7 @@ Use this to understand what data is available before querying through the semant connectionName: string, query?: string, ): Promise> { - const sources = await semanticLayerService.loadAllSources(connectionId); + const { sources } = await semanticLayerService.loadAllSources(connectionId); if (sources.length === 0) { return { diff --git a/packages/context/src/sl/tools/sl-edit-source.tool.test.ts b/packages/context/src/sl/tools/sl-edit-source.tool.test.ts index d90f0356..49a8f757 100644 --- a/packages/context/src/sl/tools/sl-edit-source.tool.test.ts +++ 
b/packages/context/src/sl/tools/sl-edit-source.tool.test.ts @@ -11,7 +11,7 @@ function makeTool(overrides: any = {}) { }), validateWithProposedSource: vi.fn().mockResolvedValue({ errors: [], warnings: [] }), writeSource: vi.fn().mockResolvedValue({ commitHash: 'c1' }), - loadAllSources: vi.fn().mockResolvedValue([]), + loadAllSources: vi.fn().mockResolvedValue({ sources: [], loadErrors: [] }), deleteSource: vi.fn().mockResolvedValue(undefined), isManifestBacked: vi.fn().mockResolvedValue(false), ...overrides.semanticLayerService, @@ -44,7 +44,7 @@ function makeSession(overrides: Partial = {}): ToolSession { }), validateWithProposedSource: vi.fn().mockResolvedValue({ errors: [], warnings: [] }), writeSource: vi.fn().mockResolvedValue({ commitHash: 'c1' }), - loadAllSources: vi.fn().mockResolvedValue([]), + loadAllSources: vi.fn().mockResolvedValue({ sources: [], loadErrors: [] }), } as any, wikiService: {} as any, configService: {} as any, @@ -191,9 +191,10 @@ describe('SlEditSourceTool — manifest-backed source without overlay', () => { expect(joinedErrors).toContain('manifest'); expect(joinedErrors).toContain('sl_write_source'); expect(joinedErrors).toContain('overlay'); - // Overlay shape: only name + measures/segments/description + // Overlay shape: name plus overlay-only fields. 
expect(joinedErrors).toContain('measures'); expect(joinedErrors).toContain('segments'); + expect(joinedErrors).toContain('column_overrides'); }); it('still returns the plain "Source not found" error for truly-missing names', async () => { diff --git a/packages/context/src/sl/tools/sl-edit-source.tool.ts b/packages/context/src/sl/tools/sl-edit-source.tool.ts index 30972707..813072c0 100644 --- a/packages/context/src/sl/tools/sl-edit-source.tool.ts +++ b/packages/context/src/sl/tools/sl-edit-source.tool.ts @@ -127,7 +127,8 @@ If no source exists yet, use sl_write_source instead — this tool will reject t ` - name: `, ` expr: ""`, ` description: ""`, - `Overlay shape: "name:" plus any of "measures:", "segments:", "descriptions:". Do NOT include "sql:", "table:", "grain:", "columns:", or "joins:" — those are inherited from the manifest.`, + `Overlay shape: "name:" plus any of "measures:", "segments:", "descriptions:", "joins:", "disable_joins:", "exclude_columns:", "column_overrides:", or computed-only "columns:" entries with expr + type.`, + `Do NOT include "sql:", "table:", "grain:", or base-table "columns:" — those are inherited from the manifest.`, ].join('\n'), ], sourceName, @@ -181,7 +182,7 @@ If no source exists yet, use sl_write_source instead — this tool will reject t const result = await semanticLayerService.writeSource(connectionId, source, author, authorEmail, commitMessage); if (!skipIndex) { - const allSources = await semanticLayerService.loadAllSources(connectionId); + const { sources: allSources } = await semanticLayerService.loadAllSources(connectionId); await this.slSearchService.indexSources(connectionId, allSources).catch(() => {}); } diff --git a/packages/context/src/sl/tools/sl-validate.tool.test.ts b/packages/context/src/sl/tools/sl-validate.tool.test.ts index 190a7e12..b6725c08 100644 --- a/packages/context/src/sl/tools/sl-validate.tool.test.ts +++ b/packages/context/src/sl/tools/sl-validate.tool.test.ts @@ -34,7 +34,7 @@ 
describe('SlValidateTool — session-aware touched-set filtering', () => { { name: 'customers', table: 'x.customers', grain: ['id'], columns: [], joins: [], measures: [] }, ]; const serviceMock = { - loadAllSources: vi.fn().mockResolvedValue(sources), + loadAllSources: vi.fn().mockResolvedValue({ sources, loadErrors: [] }), validateSourcesForConnection: vi.fn().mockResolvedValue({ errors: ['orders: missing join target', 'customers: invalid grain'], warnings: ['orders: disconnected-components warning'], diff --git a/packages/context/src/sl/tools/sl-validate.tool.ts b/packages/context/src/sl/tools/sl-validate.tool.ts index 4b457f0c..8117fbcf 100644 --- a/packages/context/src/sl/tools/sl-validate.tool.ts +++ b/packages/context/src/sl/tools/sl-validate.tool.ts @@ -62,7 +62,7 @@ Checks: all join targets exist, grain is valid, no missing references. const semanticLayerService = context.session?.semanticLayerService ?? this.semanticLayerService; - const sources = await semanticLayerService.loadAllSources(connectionId); + const { sources } = await semanticLayerService.loadAllSources(connectionId); if (sources.length === 0) { return this.buildOutput(true, [], '(all)', { validationErrors: ['No sources found for this connection.'], diff --git a/packages/context/src/sl/tools/sl-warehouse-validation.test.ts b/packages/context/src/sl/tools/sl-warehouse-validation.test.ts index 16d4ec7f..5796cdb7 100644 --- a/packages/context/src/sl/tools/sl-warehouse-validation.test.ts +++ b/packages/context/src/sl/tools/sl-warehouse-validation.test.ts @@ -8,7 +8,7 @@ function makeDeps(opts: { sourceYaml: string; executeQuery: ReturnType s.name === sourceName); } catch (e) { errors.push( diff --git a/packages/context/src/sl/tools/sl-write-source.tool.test.ts b/packages/context/src/sl/tools/sl-write-source.tool.test.ts index d9c58225..6f9cdbc0 100644 --- a/packages/context/src/sl/tools/sl-write-source.tool.test.ts +++ b/packages/context/src/sl/tools/sl-write-source.tool.test.ts @@ -8,7 +8,7 @@ 
function makeTool(overrides: Partial> = {}) { listManifestSourceNames: vi.fn().mockResolvedValue(['ACCOUNTS', 'ORDERS']), isManifestBacked: vi.fn().mockResolvedValue(false), loadSource: vi.fn().mockResolvedValue(null), - loadAllSources: vi.fn().mockResolvedValue([]), + loadAllSources: vi.fn().mockResolvedValue({ sources: [], loadErrors: [] }), validateWithProposedSource: vi.fn().mockResolvedValue({ errors: [], warnings: [] }), writeSource: vi.fn().mockResolvedValue({ commitHash: 'c1' }), deleteSource: vi.fn().mockResolvedValue(undefined), @@ -59,7 +59,7 @@ describe('SlWriteSourceTool — session gating', () => { actions: [], semanticLayerService: { loadSource: vi.fn().mockResolvedValue(null), - loadAllSources: vi.fn().mockResolvedValue([]), + loadAllSources: vi.fn().mockResolvedValue({ sources: [], loadErrors: [] }), validateWithProposedSource: vi.fn().mockResolvedValue({ errors: [], warnings: [] }), writeSource: vi.fn().mockResolvedValue({ commitHash: 'c1' }), deleteSource: vi.fn().mockResolvedValue(undefined), @@ -213,7 +213,7 @@ describe('SlWriteSourceTool — session gating', () => { ingest: { runId: 'run-1', jobId: 'job-1', syncId: 'sync-1', sourceKey: 'metabase' }, semanticLayerService: { loadSource: vi.fn().mockResolvedValue(null), - loadAllSources: vi.fn().mockResolvedValue([]), + loadAllSources: vi.fn().mockResolvedValue({ sources: [], loadErrors: [] }), validateWithProposedSource: vi.fn().mockResolvedValue({ errors: [], warnings: [] }), writeSource: vi.fn().mockResolvedValue({ commitHash: 'c1' }), deleteSource: vi.fn().mockResolvedValue(undefined), diff --git a/packages/context/src/sl/tools/sl-write-source.tool.ts b/packages/context/src/sl/tools/sl-write-source.tool.ts index e7efb357..357e7ca0 100644 --- a/packages/context/src/sl/tools/sl-write-source.tool.ts +++ b/packages/context/src/sl/tools/sl-write-source.tool.ts @@ -23,7 +23,9 @@ const slWriteSourceInputSchema = z.object({ .describe('Name of the source to create, edit, or delete'), source: 
sourceInputSchema .optional() - .describe('Source definition (standalone with table/sql) or overlay (measures, computed columns, etc.)'), + .describe( + 'Source definition (standalone with table/sql) or overlay (measures, column_overrides, computed columns, etc.)', + ), delete: z.boolean().optional().describe('Set to true to delete this source entirely'), rawPaths: z .array(z.string().min(1)) @@ -73,7 +75,8 @@ If the source already exists, this tool will overwrite it with the new definitio - table: For physical table/view sources (e.g., "public.orders"). Mutually exclusive with sql. - sql: For SQL-based sources (the SQL query). Mutually exclusive with table. - grain: What one row represents (e.g., ["id"], ["customer_id", "product_id"]) -- columns: All columns with type (string/number/time/boolean) and optional descriptions +- columns: All columns with type (string/number/time/boolean) and optional descriptions. On overlays, columns are computed-only and require expr + type. +- column_overrides: Overlay-only metadata patches for existing manifest columns (descriptions, role, visibility, constraints, enum_values, tests). Do not include type or expr. - joins: Relationships to other sources (to, on, relationship: many_to_one/one_to_many/one_to_one) - measures: Pre-defined aggregations (name, expr like "sum(amount)", optional filter, optional segments — bare names of segments defined on the same source, optional description) - segments: Named, reusable boolean predicates scoped to this source (name, expr — a SQL boolean over this source's columns, optional description). A measure references one with \`segments: [name]\`; a query references one with the dotted form \`source.segment_name\`. Use when the same predicate appears on 3+ measures — e.g. extract \`is_paid = true and is_refunded = '0'\` as \`segments: [{name: paid_non_refunded, expr: "..."}]\` and have each measure use \`segments: [paid_non_refunded]\` instead of re-typing the predicate inside \`sum(case when ... 
then x end)\`. Segments are predicates only — they cannot be selected as dimensions or grouped by; if you need to group by the predicate, add a \`columns[]\` entry instead. @@ -113,7 +116,7 @@ Do NOT join back to a table that the SQL already aggregates from if the grain co try { await semanticLayerService.deleteSource(connectionId, sourceName, author, authorEmail); if (!skipIndex) { - const allSources = await semanticLayerService.loadAllSources(connectionId); + const { sources: allSources } = await semanticLayerService.loadAllSources(connectionId); await this.slSearchService.indexSources(connectionId, allSources).catch(() => {}); } if (context.session) { @@ -210,7 +213,7 @@ Do NOT join back to a table that the SQL already aggregates from if the grain co ); if (!skipIndex) { - const allSources = await semanticLayerService.loadAllSources(connectionId); + const { sources: allSources } = await semanticLayerService.loadAllSources(connectionId); await this.slSearchService.indexSources(connectionId, allSources).catch(() => {}); } @@ -317,8 +320,9 @@ Do NOT join back to a table that the SQL already aggregates from if the grain co `Error: cannot write "${sourceName}" as a standalone source — a manifest entry with that name already exists.`, ` Writing standalone would drop the manifest's columns and joins, leaving only what you list here.`, `To add measures/segments on top of the manifest, rewrite this YAML as an overlay:`, - ` - Remove "sql:", "table:", "grain:", "columns:", and "joins:".`, - ` - Keep only "name:", plus "measures:", "segments:", and/or "descriptions:".`, + ` - Remove "sql:", "table:", "grain:", and base-table "columns:".`, + ` - Keep "name:" plus "measures:", "segments:", "descriptions:", "joins:", "disable_joins:",`, + ` "exclude_columns:", "column_overrides:", and/or computed-only "columns:" entries with expr + type.`, ` - The manifest's schema is inherited automatically.`, `If you really need a different base table, use a different source name.`, 
].join('\n'); diff --git a/packages/context/src/sl/types.ts b/packages/context/src/sl/types.ts index 7f153c58..da1ea9aa 100644 --- a/packages/context/src/sl/types.ts +++ b/packages/context/src/sl/types.ts @@ -47,6 +47,32 @@ export interface SemanticLayerSource { usage?: TableUsageOutput; } +type SemanticLayerColumn = SemanticLayerSource['columns'][number]; +type SemanticLayerJoin = SemanticLayerSource['joins'][number]; + +export interface SemanticLayerColumnOverride { + name: string; + role?: string; + visibility?: string; + descriptions?: Record; + constraints?: { dbt?: { not_null?: boolean; unique?: boolean } }; + enum_values?: { dbt?: string[] }; + tests?: { + dbt?: Array<{ name: string; package: string; kwargs?: Record }>; + dbt_by_package?: Record; + }; +} + +export type ResolvedSemanticLayerSource = Omit< + SemanticLayerSource, + 'inherits_columns_from' | 'usage' | 'joins' +> & { + table?: string; + sql?: string; + columns: Array; + joins: Array>; +}; + export interface SemanticLayerQueryInput { measures: Array; dimensions: Array; diff --git a/packages/llm/src/index.ts b/packages/llm/src/index.ts index 164ba183..ab2ad341 100644 --- a/packages/llm/src/index.ts +++ b/packages/llm/src/index.ts @@ -1,6 +1,7 @@ export { createKtxEmbeddingProvider } from './embedding-provider.js'; export { runKtxEmbeddingHealthCheck } from './embedding-health.js'; -export { KtxMessageBuilder } from './message-builder.js'; +export { KtxMessageBuilder, splitKtxSystemMessages } from './message-builder.js'; +export type { KtxSplitSystemMessagesResult } from './message-builder.js'; export type { KtxEmbeddingHealthCheckOptions, KtxEmbeddingHealthCheckResult } from './embedding-health.js'; export type { KtxEmbeddingProviderDeps } from './embedding-provider.js'; export type { KtxLlmHealthCheckDeps, KtxLlmHealthCheckOptions, KtxLlmHealthCheckResult } from './model-health.js'; diff --git a/packages/llm/src/message-builder.test.ts b/packages/llm/src/message-builder.test.ts index 
bc13a7e1..60f7d948 100644 --- a/packages/llm/src/message-builder.test.ts +++ b/packages/llm/src/message-builder.test.ts @@ -1,6 +1,6 @@ import type { ModelMessage } from 'ai'; import { describe, expect, it } from 'vitest'; -import { KtxMessageBuilder } from './message-builder.js'; +import { KtxMessageBuilder, splitKtxSystemMessages } from './message-builder.js'; import { createKtxLlmProvider } from './model-provider.js'; function makeBuilder(overrides: Parameters[0]['promptCaching'] = {}) { @@ -111,3 +111,36 @@ describe('KtxMessageBuilder.build', () => { expect((out.tools.z as { providerOptions: any }).providerOptions.anthropic.cacheControl.ttl).toBe('5m'); }); }); + +describe('splitKtxSystemMessages', () => { + it('returns undefined system when no system messages are present', () => { + const split = splitKtxSystemMessages([ + { role: 'user', content: 'hello' }, + { role: 'assistant', content: 'hi' }, + ]); + expect(split.system).toBeUndefined(); + expect(split.messages).toHaveLength(2); + }); + + it('returns a single system message object when one system message is present, preserving providerOptions', () => { + const systemMessage = { + role: 'system' as const, + content: 'You are helpful.', + providerOptions: { anthropic: { cacheControl: { type: 'ephemeral' } } }, + }; + const split = splitKtxSystemMessages([systemMessage, { role: 'user', content: 'hello' }]); + expect(split.system).toBe(systemMessage); + expect(split.messages).toEqual([{ role: 'user', content: 'hello' }]); + }); + + it('returns an array of system messages when multiple are present, in order', () => { + const split = splitKtxSystemMessages([ + { role: 'system', content: 'cached' }, + { role: 'system', content: 'fresh' }, + { role: 'user', content: 'hello' }, + ]); + expect(Array.isArray(split.system)).toBe(true); + expect(split.system).toHaveLength(2); + expect(split.messages).toEqual([{ role: 'user', content: 'hello' }]); + }); +}); diff --git a/packages/llm/src/message-builder.ts 
b/packages/llm/src/message-builder.ts index a98a0375..f3336f3c 100644 --- a/packages/llm/src/message-builder.ts +++ b/packages/llm/src/message-builder.ts @@ -1,7 +1,29 @@ -import type { LanguageModel, ModelMessage, ToolSet } from 'ai'; +import type { LanguageModel, ModelMessage, SystemModelMessage, ToolSet } from 'ai'; import { isAnthropicProtocolModel } from './model-provider.js'; import type { KtxLlmProvider, KtxPromptCacheTtl, KtxPromptParts } from './types.js'; +export interface KtxSplitSystemMessagesResult { + system: SystemModelMessage | SystemModelMessage[] | undefined; + messages: ModelMessage[]; +} + +export function splitKtxSystemMessages(messages: readonly ModelMessage[]): KtxSplitSystemMessagesResult { + const systemMessages: SystemModelMessage[] = []; + const otherMessages: ModelMessage[] = []; + for (const message of messages) { + if (message.role === 'system') { + systemMessages.push(message); + } else { + otherMessages.push(message); + } + } + return { + system: + systemMessages.length === 0 ? undefined : systemMessages.length === 1 ? 
systemMessages[0] : systemMessages, + messages: otherMessages, + }; +} + type ToolMap = ToolSet | Record>; interface KtxMessageBuilderOptions { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index de92ff9e..9c66ab51 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -376,6 +376,9 @@ importers: '@vitest/coverage-v8': specifier: ^4.1.6 version: 4.1.6(vitest@4.1.6) + ajv: + specifier: 8.20.0 + version: 8.20.0 typescript: specifier: ^6.0.3 version: 6.0.3 diff --git a/python/ktx-sl/semantic_layer/__main__.py b/python/ktx-sl/semantic_layer/__main__.py index 22ec0dd6..c797009c 100644 --- a/python/ktx-sl/semantic_layer/__main__.py +++ b/python/ktx-sl/semantic_layer/__main__.py @@ -1,3 +1,22 @@ -from semantic_layer.cli import main +from __future__ import annotations -main() +import json +import sys + +from semantic_layer.cli import main as cli_main +from semantic_layer.models import SourceDefinition + + +def dump_schema() -> None: + json.dump( + SourceDefinition.model_json_schema(), sys.stdout, indent=2, sort_keys=True + ) + sys.stdout.write("\n") + + +if __name__ == "__main__": + if len(sys.argv) > 1 and sys.argv[1] in {"dump-schema", "schema"}: + sys.argv.pop(1) + dump_schema() + else: + cli_main() diff --git a/python/ktx-sl/semantic_layer/loader.py b/python/ktx-sl/semantic_layer/loader.py index 02b5b2c4..55a3a0ee 100644 --- a/python/ktx-sl/semantic_layer/loader.py +++ b/python/ktx-sl/semantic_layer/loader.py @@ -87,18 +87,23 @@ class SourceLoader: sources[name] = SourceDefinition(**data) else: # Overlay — validate and compose with matching manifest entry - errors = validate_overlay(data) - if errors: - raise ValueError( - f"Invalid overlay '{name}' in {path}: {'; '.join(errors)}" - ) base = sources.get(name) if base: + errors = validate_overlay(data, {c.name for c in base.columns}) + if errors: + raise ValueError( + f"Invalid overlay '{name}' in {path}: {'; '.join(errors)}" + ) ( sources[name], description_sources[name], ) = self._compose(base, data, 
description_sources.get(name)) else: + errors = validate_overlay(data) + if errors: + raise ValueError( + f"Invalid overlay '{name}' in {path}: {'; '.join(errors)}" + ) logger.warning( "Orphan overlay '%s' in %s: no matching manifest entry, skipping", name, @@ -149,12 +154,55 @@ class SourceLoader: description_sources or None, ) - # Filter columns + excluded = set(overlay.get("exclude_columns", [])) + overrides = overlay.get("column_overrides", []) + override_names = {override.get("name") for override in overrides} + conflicts = sorted(name for name in override_names if name in excluded) + if conflicts: + raise ValueError( + "column_overrides conflict with exclude_columns: " + + ", ".join(conflicts) + ) + + base_by_name = {column.name: column for column in base.columns} + + for override in overrides: + name = override.get("name") + base_column = base_by_name.get(name) + if base_column is None: + raise ValueError( + f"column '{name}' in column_overrides does not exist on manifest source '{base.name}'" + ) + excluded = set(overlay.get("exclude_columns", [])) source.columns = [c for c in source.columns if c.name not in excluded] - # Append computed columns (overlay columns with expr) + columns_by_name = {column.name: column for column in source.columns} + + for override in overrides: + name = override["name"] + base_column = base_by_name[name] + merged = base_column.model_dump(mode="python", exclude_none=True) + base_descriptions = merged.get("descriptions") or {} + override_data = dict(override) + override_descriptions = override_data.get("descriptions") or {} + merged.update(override_data) + if base_descriptions or override_descriptions: + merged["descriptions"] = { + **base_descriptions, + **override_descriptions, + } + columns_by_name[name] = SourceColumn(**merged) + source.columns = list(columns_by_name.values()) + + # Append computed columns. Manifest column names cannot be reused here; + # use column_overrides for metadata patches. 
for col in overlay.get("columns", []): + name = col.get("name") + if name in base_by_name: + raise ValueError( + f"column '{name}' in columns patches a manifest column on '{base.name}' — move it to 'column_overrides:'" + ) source.columns.append(SourceColumn(**col)) # Set measures @@ -181,6 +229,11 @@ class SourceLoader: ] source.joins = manifest_joins + new_joins + if not source.table and not source.sql: + raise ValueError("resolved source must have 'table' or 'sql'") + if source.table and source.sql: + raise ValueError("'table' and 'sql' are mutually exclusive") + return source, (description_sources or None) def _validate_cross_references(self, sources: dict[str, SourceDefinition]) -> None: diff --git a/python/ktx-sl/semantic_layer/manifest.py b/python/ktx-sl/semantic_layer/manifest.py index 432019e8..3cc0c299 100644 --- a/python/ktx-sl/semantic_layer/manifest.py +++ b/python/ktx-sl/semantic_layer/manifest.py @@ -143,7 +143,9 @@ class Manifest(BaseModel): # ── Projection ────────────────────────────────────────────────────── -def validate_overlay(data: dict) -> list[str]: +def validate_overlay( + data: dict, manifest_column_names: set[str] | None = None +) -> list[str]: """Validate that overlay data doesn't contain structural fields. Returns a list of error messages (empty if valid). 
@@ -162,11 +164,26 @@ def validate_overlay(data: dict) -> list[str]: errors.append( f"Overlay column '{col.get('name', '?')}' must use 'descriptions'" ) - if "type" in col and "expr" not in col: + if "expr" not in col: errors.append( - f"Overlay column '{col.get('name', '?')}' specifies 'type' without 'expr' " - f"(structural types are inherited from manifest — only computed columns may specify a type)" + f"Overlay column '{col.get('name', '?')}' in 'columns' must define " + f"'expr' and 'type' (use 'column_overrides' to patch manifest columns)" ) + if "type" not in col: + errors.append( + f"Overlay column '{col.get('name', '?')}' in 'columns' must define " + f"'type' and 'expr' (use 'column_overrides' to patch manifest columns)" + ) + for col in data.get("column_overrides", []): + name = col.get("name", "?") + if "description" in col: + errors.append(f"Column override '{name}' must use 'descriptions'") + if "type" in col: + errors.append(f"Column override '{name}' must not contain 'type'") + if "expr" in col: + errors.append(f"Column override '{name}' must not contain 'expr'") + if manifest_column_names is not None and name not in manifest_column_names: + errors.append(f"Column override '{name}' does not match a manifest column") return errors diff --git a/python/ktx-sl/semantic_layer/models.py b/python/ktx-sl/semantic_layer/models.py index 7e922933..08e9fd42 100644 --- a/python/ktx-sl/semantic_layer/models.py +++ b/python/ktx-sl/semantic_layer/models.py @@ -3,7 +3,7 @@ from __future__ import annotations from enum import Enum from typing import Any, Literal -from pydantic import BaseModel, Field, model_validator +from pydantic import BaseModel, ConfigDict, Field, model_validator # ── Source Definition Models ────────────────────────────────────────── @@ -105,6 +105,8 @@ class DefaultTimeDimensionDbt(BaseModel): class SourceDefinition(BaseModel): + model_config = ConfigDict(extra="forbid") + name: str description: str | None = None descriptions: dict[str, str] | 
None = None @@ -123,6 +125,8 @@ class SourceDefinition(BaseModel): def validate_source(self) -> SourceDefinition: if self.description is None: self.description = _resolve_description_map(self.descriptions) + if not self.table and not self.sql: + raise ValueError("resolved source must have 'table' or 'sql'") if self.table and self.sql: raise ValueError("'table' and 'sql' are mutually exclusive") if not self.grain: diff --git a/python/ktx-sl/tests/test_loader.py b/python/ktx-sl/tests/test_loader.py index 73c5c23a..d69540d0 100644 --- a/python/ktx-sl/tests/test_loader.py +++ b/python/ktx-sl/tests/test_loader.py @@ -148,11 +148,21 @@ class TestLoaderEdgeCases: with open(Path(tmpdir) / "test.yaml", "w") as f: yaml.dump(data, f) loader = SourceLoader(tmpdir) - try: - sources = loader.load_all() - assert "test" in sources - except Exception: - pass + with pytest.raises(Exception, match="unknown_field"): + loader.load_all() + + def test_source_requires_table_or_sql(self): + with tempfile.TemporaryDirectory() as tmpdir: + data = { + "name": "test", + "grain": ["id"], + "columns": [{"name": "id", "type": "number"}], + } + with open(Path(tmpdir) / "test.yaml", "w") as f: + yaml.dump(data, f) + loader = SourceLoader(tmpdir) + with pytest.raises(Exception, match="table.*sql"): + loader.load_file(Path(tmpdir) / "test.yaml") def test_subdirectory_sources(self): with tempfile.TemporaryDirectory() as tmpdir: diff --git a/python/ktx-sl/tests/test_manifest.py b/python/ktx-sl/tests/test_manifest.py index 1007fc89..83420cbb 100644 --- a/python/ktx-sl/tests/test_manifest.py +++ b/python/ktx-sl/tests/test_manifest.py @@ -205,12 +205,15 @@ class TestValidateOverlay: "descriptions": {"user": "Revenue-bearing orders"}, "grain": ["id"], "measures": [{"name": "revenue", "expr": "sum(total)"}], + "column_overrides": [ + {"name": "status", "descriptions": {"user": "Order lifecycle status"}} + ], "columns": [ {"name": "is_high_value", "expr": "total > 1000", "type": "boolean"} ], 
"exclude_columns": ["status"], } - errors = validate_overlay(data) + errors = validate_overlay(data, {"status", "total"}) assert errors == [] def test_validate_overlay_rejects_table(self): @@ -225,14 +228,13 @@ class TestValidateOverlay: assert len(errors) == 1 assert "sql" in errors[0].lower() - def test_validate_overlay_rejects_type_without_expr(self): + def test_validate_overlay_rejects_column_without_expr(self): data = { "name": "orders", "columns": [{"name": "status", "type": "string"}], } errors = validate_overlay(data) assert len(errors) == 1 - assert "type" in errors[0].lower() assert "expr" in errors[0].lower() def test_validate_overlay_allows_type_with_expr(self): @@ -243,6 +245,33 @@ class TestValidateOverlay: errors = validate_overlay(data) assert errors == [] + def test_validate_overlay_rejects_column_override_structural_fields(self): + data = { + "name": "orders", + "column_overrides": [ + { + "name": "status", + "description": "Status", + "type": "string", + "expr": "status", + } + ], + } + errors = validate_overlay(data, {"status"}) + assert len(errors) == 3 + assert "descriptions" in errors[0] + assert "type" in errors[1] + assert "expr" in errors[2] + + def test_validate_overlay_rejects_unknown_column_override(self): + data = { + "name": "orders", + "column_overrides": [{"name": "missing", "descriptions": {"user": "Nope"}}], + } + errors = validate_overlay(data, {"status"}) + assert len(errors) == 1 + assert "does not match" in errors[0] + # ── Two-Tier Loading Tests ───────────────────────────────────────── @@ -502,6 +531,77 @@ class TestTwoTierLoading: assert hv.expr == "total > 1000" assert hv.type == "boolean" + def test_overlay_column_overrides_patch_manifest_columns(self, tmp_path: Path): + schema_dir = tmp_path / "_schema" + _write_yaml(schema_dir / "public.yaml", _manifest_tables()) + + overlay = { + "name": "orders", + "column_overrides": [ + {"name": "status", "descriptions": {"user": "Order lifecycle status"}} + ], + } + 
_write_yaml(tmp_path / "orders.yaml", overlay) + _write_yaml(tmp_path / "customers.yaml", {"name": "customers"}) + + loader = SourceLoader(tmp_path) + sources = loader.load_all() + + status = next(c for c in sources["orders"].columns if c.name == "status") + assert status.type == "string" + assert status.description == "Order lifecycle status" + assert status.descriptions == {"user": "Order lifecycle status"} + + def test_overlay_rejects_unknown_column_override(self, tmp_path: Path): + schema_dir = tmp_path / "_schema" + _write_yaml(schema_dir / "public.yaml", _manifest_tables()) + + overlay = { + "name": "orders", + "column_overrides": [ + {"name": "missing", "descriptions": {"user": "No such column"}} + ], + } + _write_yaml(tmp_path / "orders.yaml", overlay) + _write_yaml(tmp_path / "customers.yaml", {"name": "customers"}) + + loader = SourceLoader(tmp_path) + with pytest.raises(ValueError, match="Column override 'missing'"): + loader.load_all() + + def test_overlay_rejects_computed_column_name_collision(self, tmp_path: Path): + schema_dir = tmp_path / "_schema" + _write_yaml(schema_dir / "public.yaml", _manifest_tables()) + + overlay = { + "name": "orders", + "columns": [{"name": "status", "type": "string", "expr": "status"}], + } + _write_yaml(tmp_path / "orders.yaml", overlay) + _write_yaml(tmp_path / "customers.yaml", {"name": "customers"}) + + loader = SourceLoader(tmp_path) + with pytest.raises(ValueError, match="move it to 'column_overrides:'"): + loader.load_all() + + def test_overlay_rejects_exclude_override_conflict(self, tmp_path: Path): + schema_dir = tmp_path / "_schema" + _write_yaml(schema_dir / "public.yaml", _manifest_tables()) + + overlay = { + "name": "orders", + "exclude_columns": ["status"], + "column_overrides": [ + {"name": "status", "descriptions": {"user": "Hidden status"}} + ], + } + _write_yaml(tmp_path / "orders.yaml", overlay) + _write_yaml(tmp_path / "customers.yaml", {"name": "customers"}) + + loader = SourceLoader(tmp_path) + with 
pytest.raises(ValueError, match="conflict with exclude_columns"): + loader.load_all() + def test_overlay_measures_set(self, tmp_path: Path): schema_dir = tmp_path / "_schema" _write_yaml(schema_dir / "public.yaml", _manifest_tables()) diff --git a/scripts/examples-docs.test.mjs b/scripts/examples-docs.test.mjs index 5e6a93d7..bc96e372 100644 --- a/scripts/examples-docs.test.mjs +++ b/scripts/examples-docs.test.mjs @@ -168,7 +168,7 @@ describe('standalone example docs', () => { 'ktx status --json', 'ktx sl list --json', 'ktx sl search "revenue" --json', - 'ktx sl query --json', + 'ktx sl query', 'ktx wiki search "revenue recognition" --json', ]) { assert.match(servingAgents, new RegExp(command.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'))); @@ -183,7 +183,8 @@ describe('standalone example docs', () => { assert.match(connectionReference, /ktx connection test my-warehouse/); assert.match(connectionReference, /ktx connection test --all/); assert.match(quickstart, /Connection test passed/); - assert.match(quickstart, /Driver: PostgreSQL .* Status: ok/); + assert.match(connectionReference, /Driver: postgres/); + assert.match(connectionReference, /Status: ok/); }); it('documents public npm and managed runtime usage', async () => { @@ -195,7 +196,6 @@ describe('standalone example docs', () => { assert.match(quickstart, publicPackagePattern('npm install -g {package}')); assert.match(quickstart, /ktx dev runtime install --feature local-embeddings --yes/); assert.match(quickstart, /ktx dev runtime start --feature local-embeddings/); - assert.match(quickstart, /Install `uv`, run `ktx dev runtime status`/); assert.match(packageArtifacts, /requires `uv` on `PATH`/); assert.match(packageArtifacts, /ktx dev runtime status/); assert.match(packageArtifacts, /ktx dev runtime status/); @@ -255,15 +255,15 @@ describe('standalone example docs', () => { assert.match(ingestReference, /ktx ingest /); assert.match(ingestReference, /ktx ingest --all --deep/); assert.match(ingestReference, 
/--query-history-window-days /); - assert.match(buildingContext, /ktx ingest /); + assert.match(buildingContext, /ktx ingest /); assert.match(buildingContext, /ktx ingest --all/); assert.match(contextSources, /ktx ingest /); assert.match(contextAsCode, /ktx ingest --all --no-input/); assert.match(quickstart, /schema context/); assert.match(primarySources, /context:\n queryHistory:/); assert.match(rootReadme, /Databases configured: yes \(postgres-warehouse\)/); - assert.match(quickstart, /Databases:\n postgres-warehouse: deep context complete/); - assert.match(quickstart, /Databases configured: yes \(postgres-warehouse\)/); + assert.match(quickstart, /Databases:\n warehouse: deep context complete/); + assert.match(quickstart, /Databases configured: yes \(warehouse\)/); assert.match(setupReference, /Databases configured: yes \(postgres-warehouse\)/); assert.doesNotMatch(rootReadme, new RegExp(['Primary sources', 'configured'].join(' '))); assert.doesNotMatch(quickstart, new RegExp(['Primary', 'sources'].join(' ')));