diff --git a/packages/cli/src/sl.test.ts b/packages/cli/src/sl.test.ts index bd746b0b..e7debffd 100644 --- a/packages/cli/src/sl.test.ts +++ b/packages/cli/src/sl.test.ts @@ -84,6 +84,56 @@ describe('runKtxSl', () => { expect(listIo.stdout()).toContain('warehouse\torders\tcolumns=1\tmeasures=0\tjoins=0'); }); + it('fails validation when a table-backed source declares columns absent from a matching warehouse manifest', async () => { + const projectDir = join(tempDir, 'project'); + const project = await initKtxProject({ projectDir, projectName: 'warehouse' }); + await project.fileStore.writeFile( + 'semantic-layer/postgres-warehouse/_schema/orbit_analytics.yaml', + `tables: + int_active_contract_arr: + table: orbit_analytics.int_active_contract_arr + columns: + - { name: contract_id, type: string } + - { name: contract_arr_cents, type: number } +`, + 'ktx', + 'ktx@example.com', + 'Add warehouse manifest', + ); + await project.fileStore.writeFile( + 'semantic-layer/dbt-main/int_active_contract_arr.yaml', + `name: int_active_contract_arr +table: orbit_analytics.int_active_contract_arr +grain: [contract_id] +columns: + - { name: contract_id, type: string } + - { name: arr_cents, type: number } +measures: + - { name: arr, expr: sum(arr_cents) } +joins: [] +`, + 'ktx', + 'ktx@example.com', + 'Add invalid dbt source', + ); + + const validateIo = makeIo(); + await expect( + runKtxSl( + { + command: 'validate', + projectDir, + connectionId: 'dbt-main', + sourceName: 'int_active_contract_arr', + }, + validateIo.io, + ), + ).resolves.toBe(1); + + expect(validateIo.stderr()).toContain('arr_cents'); + expect(validateIo.stderr()).toContain('absent from physical table'); + }); + it('runs sl query and prints SQL output', async () => { const projectDir = join(tempDir, 'project'); const project = await initKtxProject({ projectDir, projectName: 'warehouse' }); diff --git a/packages/cli/src/sl.ts b/packages/cli/src/sl.ts index a82f84d8..fb0f129e 100644 --- a/packages/cli/src/sl.ts +++ b/packages/cli/src/sl.ts @@ -97,7 +97,7 @@ export async function runKtxSl(args: KtxSlArgs, io: KtxSlIo = process, deps: Ktx if (!source) { throw new Error(`Semantic-layer source "${args.connectionId}/${args.sourceName}" was not found`); } - const result = await validateLocalSlSource(source.yaml); + const result = await validateLocalSlSource(source.yaml, { project, connectionId: args.connectionId }); if (!result.valid) { for (const error of result.errors) { io.stderr.write(`${error}\n`); diff --git a/packages/context/prompts/memory_agent_bundle_ingest_work_unit.md b/packages/context/prompts/memory_agent_bundle_ingest_work_unit.md index b0fa8997..5fdde49b 100644 --- a/packages/context/prompts/memory_agent_bundle_ingest_work_unit.md +++ b/packages/context/prompts/memory_agent_bundle_ingest_work_unit.md @@ -23,6 +23,7 @@ All wiki writes go to the GLOBAL scope. Bundle ingests are not personal. The `wi - Do not read peer files; only files listed in `rawFiles` or `dependencyPaths` are accessible. `read_raw_file` will reject everything else. - Do not invent measures/joins/rules not declared in the raw files. +- Do not invent physical column names or grain keys. For table-backed SL sources, every `columns:`, `grain:`, `joins:`, `segments:`, and `measures[].expr` column must come from raw-file column declarations or warehouse-backed discovery (`wiki_sl_search`, `sl_discover`, `sl_describe_table`). If column names are not confirmed, capture the business context in wiki instead of writing a full SL source. - Do not duplicate an artifact that prior provenance says you already produced; update it. - Do not silently accept a name collision with a prior WU's write when the formula differs. Trigger `ingest_triage`. diff --git a/packages/context/skills/dbt_ingest/SKILL.md b/packages/context/skills/dbt_ingest/SKILL.md index 0f7f7904..0d189661 100644 --- a/packages/context/skills/dbt_ingest/SKILL.md +++ b/packages/context/skills/dbt_ingest/SKILL.md @@ -19,6 +19,14 @@ Use this skill for **uploaded** dbt projects (`dbt_project.yml` at stage root, ` | `accepted_values` | Add a **brief** line in the column description: allowed values (truncate long lists) | Also mention enum-like use in `wiki_sl_search` / filters. | | `relationships` | Add or confirm `joins:` on the overlay **only** when `to` resolves to a real table via `read_raw_file` + `wiki_sl_search` / `sl_describe_table` | If the ref cannot be resolved, capture the intent in a wiki page instead. | +## Physical schema grounding + +dbt YAML is documentation and test metadata; it is not permission to invent physical columns. Before writing any table-backed SL source, confirm the real warehouse shape with `wiki_sl_search`, `sl_discover`, or `sl_describe_table` and use only confirmed column names in `columns:`, `grain:`, `joins:`, `segments:`, and `measures[].expr`. + +If a `models:` entry has no `columns:` block, or the available raw files do not confirm the physical column names, do **not** synthesize a full standalone source. Write a wiki note or a description-only overlay for the resolved manifest table instead. If a business metric is described but its referenced column is not confirmed in the warehouse schema, omit the measure and capture the unresolved intent in the wiki. + +After every `sl_write_source`, call `sl_validate`. A validation error saying a declared column or measure reference is absent from the physical table is a hard stop: re-read the warehouse-backed source and rewrite with confirmed names, or remove the invalid SL fields. + ## 1.1 test hints (descriptions / meta) When YAML shows `accepted_values` or `not_null`, add **short** hints into `columns[].descriptions` (e.g. under `user`) or freeform column notes so chat and validation see intent before the next git sync refreshes `constraints` / `enum_values` in `_schema`. Keep hints under a few words when possible. @@ -30,5 +38,7 @@ If the same bundle also has MetricFlow `semantic_models:` / `metrics:`, the **`m ## Do not - Do not run `dbt` CLI or assume `target/` / `manifest.json` exists in the upload. +- Do not invent column names, grain keys, or measure expressions from dbt model names, descriptions, tests, or common naming patterns. +- Do not write `columns:`, `grain:`, or `measures:` for a dbt model unless those exact column names are confirmed by dbt YAML columns or warehouse schema discovery. - Do not invent joins from `relationships` tests if the target model/table is not found in SL or the warehouse. - Do not read `peerFileIndex` paths — use `read_raw_file` only on `rawFiles` and `dependencyPaths` from the WorkUnit. diff --git a/packages/context/src/sl/local-sl.test.ts b/packages/context/src/sl/local-sl.test.ts index 99d20c35..aa48546b 100644 --- a/packages/context/src/sl/local-sl.test.ts +++ b/packages/context/src/sl/local-sl.test.ts @@ -89,6 +89,39 @@ describe('local semantic-layer helpers', () => { await expect(validateLocalSlSource(ORDERS_YAML)).resolves.toEqual({ valid: true, errors: [] }); }); + it('validates table-backed sources against matching physical manifests when project context is provided', async () => { + await project.fileStore.writeFile( + 'semantic-layer/postgres-warehouse/_schema/orbit_analytics.yaml', + `tables: + int_active_contract_arr: + table: orbit_analytics.int_active_contract_arr + columns: + - { name: contract_id, type: string } + - { name: contract_arr_cents, type: number } +`, + 'ktx', + 'ktx@example.com', + 'Add warehouse manifest', + ); + + const invalidDbtSource = [ + 'name: int_active_contract_arr', + 'table: orbit_analytics.int_active_contract_arr', + 'grain: [contract_id]', + 'columns:', + ' - { name: contract_id, type: string }', + ' - { name: arr_cents, type: number }', + 'measures:', + ' - { name: arr, expr: sum(arr_cents) }', + '', + ].join('\n'); + + const result = await validateLocalSlSource(invalidDbtSource, { project, connectionId: 'dbt-main' }); + expect(result.valid).toBe(false); + expect(result.errors.join('\n')).toContain('arr_cents'); + expect(result.errors.join('\n')).toContain('absent from physical table'); + }); + it('lists and reads manifest-backed scan sources as queryable sources', async () => { await project.fileStore.writeFile( 'semantic-layer/warehouse/_schema/public.yaml', diff --git a/packages/context/src/sl/local-sl.ts b/packages/context/src/sl/local-sl.ts index fe201d3f..bf170d0a 100644 --- a/packages/context/src/sl/local-sl.ts +++ b/packages/context/src/sl/local-sl.ts @@ -7,7 +7,12 @@ import { HybridSearchCore, type SearchCandidateGenerator } from '../search/index import { DEFAULT_PRIORITY, resolveDescription } from './descriptions.js'; import { normalizeSemanticLayerDescriptions } from './description-normalization.js'; import { sourceDefinitionSchema, sourceOverlaySchema } from './schemas.js'; -import { composeOverlay, type ManifestTableEntry, projectManifestEntry } from './semantic-layer.service.js'; +import { + composeOverlay, + type ManifestTableEntry, + projectManifestEntry, + SemanticLayerService, +} from './semantic-layer.service.js'; import type { PgliteSlSearchPrototypeOwnerOptions } from './pglite-sl-search-prototype.js'; import { loadLatestSlDictionaryEntries } from './sl-dictionary-profile.js'; import { buildSemanticLayerSourceSearchText, SlSearchService } from './sl-search.service.js'; @@ -246,12 +251,24 @@ export async function loadLocalSlSourceRecords( return [...sources.values()].sort((left, right) => left.name.localeCompare(right.name)); } -export async function validateLocalSlSource(rawYaml: string): Promise { +export async function validateLocalSlSource( + rawYaml: string, + options?: { project?: KtxLocalProject; connectionId?: string }, +): Promise { try { const parsed = parseYamlRecord(rawYaml); const schema = parsed.table || parsed.sql ? sourceDefinitionSchema : sourceOverlaySchema; - schema.parse(parsed); - return { valid: true, errors: [] }; + const result = schema.parse(parsed); + const errors: string[] = []; + + if (options?.project && options.connectionId && 'table' in result && result.table) { + const service = new SemanticLayerService(options.project.fileStore, {} as never, {} as never); + errors.push( + ...(await service.validatePhysicalTableReferences(options.connectionId, [result as SemanticLayerSource])), + ); + } + + return { valid: errors.length === 0, errors }; } catch (error) { return { valid: false, errors: validationErrors(error) }; } @@ -261,7 +278,7 @@ export async function writeLocalSlSource( project: KtxLocalProject, input: { connectionId: string; sourceName: string; yaml: string }, ): Promise { - const validation = await validateLocalSlSource(input.yaml); + const validation = await validateLocalSlSource(input.yaml, { project, connectionId: input.connectionId }); if (!validation.valid) { throw new Error(`Invalid semantic-layer source: ${validation.errors.join('; ')}`); } diff --git a/packages/context/src/sl/semantic-layer.service.test.ts b/packages/context/src/sl/semantic-layer.service.test.ts index 78f5accf..7c70950b 100644 --- a/packages/context/src/sl/semantic-layer.service.test.ts +++ b/packages/context/src/sl/semantic-layer.service.test.ts @@ -784,6 +784,156 @@ describe('validateWithProposedSource', () => { expect(result.errors[0]).toMatch(/Overlay 'orphan' has no matching manifest entry/); expect(pythonPort.validateSources).not.toHaveBeenCalled(); }); + + it('rejects table-backed sources whose declared columns are absent from a matching physical manifest', async () => { + const schemaPath = 'semantic-layer/postgres-warehouse/_schema/orbit_analytics.yaml'; + configService.listFiles.mockImplementation((dir: string) => { + if (dir === 'semantic-layer/dbt-main') { + return Promise.resolve({ files: [] }); + } + if (dir === 'semantic-layer') { + return Promise.resolve({ files: [schemaPath] }); + } + if (dir === 'semantic-layer/dbt-main/_schema' || dir === 'semantic-layer/postgres-warehouse/_schema') { + return Promise.resolve({ files: dir.endsWith('postgres-warehouse/_schema') ? [schemaPath] : [] }); + } + return Promise.resolve({ files: [] }); + }); + configService.readFile.mockImplementation((path: string) => { + if (path === schemaPath) { + return Promise.resolve({ + content: [ + 'tables:', + ' int_procurement_qualifying_actions:', + ' table: orbit_analytics.int_procurement_qualifying_actions', + ' columns:', + ' - { name: action_id, type: string }', + ' - { name: account_id, type: string }', + ' - { name: user_id, type: string }', + ' - { name: action_date, type: time }', + ' - { name: action_type, type: string }', + ].join('\n'), + }); + } + return Promise.reject(new Error(`Unexpected readFile: ${path}`)); + }); + pythonPort.validateSources.mockResolvedValue({ + data: { errors: [], warnings: [] }, + }); + + const result = await service.validateWithProposedSource('dbt-main', { + name: 'int_procurement_qualifying_actions', + table: 'orbit_analytics.int_procurement_qualifying_actions', + grain: ['purchase_request_id'], + columns: [ + { name: 'purchase_request_id', type: 'string' }, + { name: 'account_id', type: 'string' }, + { name: 'requester_user_id', type: 'string' }, + { name: 'action_week', type: 'time' }, + ], + joins: [], + measures: [{ name: 'qualifying_action_count', expr: 'count(purchase_request_id)' }], + }); + + expect(result.errors.join('\n')).toMatch(/declared column\(s\) absent from physical table/); + expect(result.errors.join('\n')).toMatch(/purchase_request_id/); + expect(result.errors.join('\n')).toMatch(/requester_user_id/); + expect(result.errors.join('\n')).toMatch(/action_week/); + expect(result.errors.join('\n')).toMatch(/measure "qualifying_action_count" references unknown column\(s\)/); + }); + + it('keeps valid table-backed sources clean when a physical manifest matches', async () => { + const schemaPath = 'semantic-layer/postgres-warehouse/_schema/orbit_analytics.yaml'; + configService.listFiles.mockImplementation((dir: string) => { + if (dir === 'semantic-layer/dbt-main') { + return Promise.resolve({ files: [] }); + } + if (dir === 'semantic-layer') { + return Promise.resolve({ files: [schemaPath] }); + } + if (dir === 'semantic-layer/dbt-main/_schema' || dir === 'semantic-layer/postgres-warehouse/_schema') { + return Promise.resolve({ files: dir.endsWith('postgres-warehouse/_schema') ? [schemaPath] : [] }); + } + return Promise.resolve({ files: [] }); + }); + configService.readFile.mockResolvedValue({ + content: [ + 'tables:', + ' mart_revenue_daily:', + ' table: orbit_analytics.mart_revenue_daily', + ' columns:', + ' - { name: revenue_date, type: time }', + ' - { name: gross_revenue_cents, type: number }', + ' - { name: credits_cents, type: number }', + ' - { name: refunds_cents, type: number }', + ' - { name: net_revenue_cents, type: number }', + ].join('\n'), + }); + pythonPort.validateSources.mockResolvedValue({ + data: { errors: [], warnings: [] }, + }); + + const result = await service.validateWithProposedSource('dbt-main', { + name: 'mart_revenue_daily', + table: 'orbit_analytics.mart_revenue_daily', + grain: ['revenue_date'], + columns: [ + { name: 'revenue_date', type: 'time' }, + { name: 'gross_revenue_cents', type: 'number' }, + { name: 'credits_cents', type: 'number' }, + { name: 'refunds_cents', type: 'number' }, + { name: 'net_revenue_cents', type: 'number' }, + ], + joins: [], + measures: [{ name: 'net_revenue', expr: 'sum(net_revenue_cents)' }], + }); + + expect(result.errors).toEqual([]); + }); + + it('rejects join keys that are absent from matched physical sources', async () => { + const schemaPath = 'semantic-layer/postgres-warehouse/_schema/orbit_analytics.yaml'; + configService.listFiles.mockImplementation((dir: string) => { + if (dir === 'semantic-layer/dbt-main') { + return Promise.resolve({ files: [] }); + } + if (dir === 'semantic-layer') { + return Promise.resolve({ files: [schemaPath] }); + } + if (dir === 'semantic-layer/dbt-main/_schema' || dir === 'semantic-layer/postgres-warehouse/_schema') { + return Promise.resolve({ files: dir.endsWith('postgres-warehouse/_schema') ? [schemaPath] : [] }); + } + return Promise.resolve({ files: [] }); + }); + configService.readFile.mockResolvedValue({ + content: [ + 'tables:', + ' activity:', + ' table: orbit_analytics.activity', + ' columns:', + ' - { name: account_id, type: string }', + ' accounts:', + ' table: orbit_analytics.accounts', + ' columns:', + ' - { name: account_id, type: string }', + ].join('\n'), + }); + pythonPort.validateSources.mockResolvedValue({ + data: { errors: [], warnings: [] }, + }); + + const result = await service.validateWithProposedSource('dbt-main', { + name: 'activity', + table: 'orbit_analytics.activity', + grain: ['account_id'], + columns: [{ name: 'account_id', type: 'string' }], + joins: [{ to: 'accounts', on: 'activity.account_name = accounts.account_uuid', relationship: 'many_to_one' }], + measures: [], + }); + + expect(result.errors.join('\n')).toMatch(/local column "account_name"/); + expect(result.errors.join('\n')).toMatch(/target column "account_uuid"/); + }); }); describe('findDanglingSegmentRefs', () => { diff --git a/packages/context/src/sl/semantic-layer.service.ts b/packages/context/src/sl/semantic-layer.service.ts index 33a44d11..6ec21a58 100644 --- a/packages/context/src/sl/semantic-layer.service.ts +++ b/packages/context/src/sl/semantic-layer.service.ts @@ -396,6 +396,174 @@ export class SemanticLayerService { return null; } + async findManifestEntryByTableRefAcrossConnections( + preferredConnectionId: string, + ref: string, + ): Promise<{ connectionId: string; source: SemanticLayerSource } | null> { + const preferred = await this.findManifestEntryByTableRef(preferredConnectionId, ref); + if (preferred) { + return { connectionId: preferredConnectionId, source: preferred }; + } + + for (const entry of await this.listAllManifestEntries()) { + if (entry.connectionId === preferredConnectionId) { + continue; + } + if (manifestEntryMatchesRef(entry.source, ref)) { + return entry; + } + } + + return null; + } + + async validatePhysicalTableReferences( + connectionId: string, + sources: SemanticLayerSource[], + ): Promise { + const errors: string[] = []; + const sourceNames = new Set(sources.map((s) => s.name.toLowerCase())); + const sourcesByName = new Map(sources.map((s) => [s.name.toLowerCase(), s])); + + for (const source of sources) { + if (!source.table) { + continue; + } + + const manifestMatch = await this.findManifestEntryByTableRefAcrossConnections(connectionId, source.table); + if (!manifestMatch) { + continue; + } + + const manifestSource = manifestMatch.source; + const manifestColumns = new Map(manifestSource.columns.map((c) => [c.name.toLowerCase(), c.name])); + const declaredColumns = source.columns ?? []; + const declaredByLower = new Map(declaredColumns.map((c) => [c.name.toLowerCase(), c])); + const validOutputColumns = new Set( + declaredColumns + .filter((c) => c.expr || manifestColumns.has(c.name.toLowerCase())) + .map((c) => c.name.toLowerCase()), + ); + const measureNames = new Set((source.measures ?? []).map((m) => m.name.toLowerCase())); + const manifestLabel = + manifestMatch.connectionId === connectionId + ? manifestSource.name + : `${manifestMatch.connectionId}/${manifestSource.name}`; + + const absentDeclaredColumns = declaredColumns + .filter((c) => !c.expr && !manifestColumns.has(c.name.toLowerCase())) + .map((c) => c.name); + if (absentDeclaredColumns.length > 0) { + errors.push( + `${source.name}.yaml: table "${source.table}" matched manifest ${manifestLabel}, ` + + `but declared column(s) absent from physical table: ${absentDeclaredColumns.join(', ')}. ` + + `Available columns: ${[...manifestColumns.values()].join(', ')}`, + ); + } + + const missingGrainColumns = (source.grain ?? []).filter((grain) => { + const declared = declaredByLower.get(grain.toLowerCase()); + return !declared || (!declared.expr && !manifestColumns.has(grain.toLowerCase())); + }); + if (missingGrainColumns.length > 0) { + errors.push( + `${source.name}.yaml: grain column(s) absent from physical table "${source.table}": ${missingGrainColumns.join(', ')}`, + ); + } + + for (const column of declaredColumns) { + if (!column.expr) { + continue; + } + const missing = missingLocalExpressionRefs({ + expr: column.expr, + sourceName: source.name, + sourceNames, + validColumns: new Set([...manifestColumns.keys(), ...validOutputColumns]), + validMeasures: new Set(), + }); + if (missing.length > 0) { + errors.push( + `${source.name}.yaml: computed column "${column.name}" references unknown column(s): ${missing.join(', ')}`, + ); + } + } + + for (const segment of source.segments ?? []) { + const missing = missingLocalExpressionRefs({ + expr: segment.expr, + sourceName: source.name, + sourceNames, + validColumns: validOutputColumns, + validMeasures: new Set(), + }); + if (missing.length > 0) { + errors.push( + `${source.name}.yaml: segment "${segment.name}" references unknown column(s): ${missing.join(', ')}`, + ); + } + } + + for (const measure of source.measures ?? []) { + const exprMissing = missingLocalExpressionRefs({ + expr: measure.expr, + sourceName: source.name, + sourceNames, + validColumns: validOutputColumns, + validMeasures: measureNames, + }); + if (exprMissing.length > 0) { + errors.push( + `${source.name}.yaml: measure "${measure.name}" references unknown column(s): ${exprMissing.join(', ')}`, + ); + } + + if (measure.filter) { + const filterMissing = missingLocalExpressionRefs({ + expr: measure.filter, + sourceName: source.name, + sourceNames, + validColumns: validOutputColumns, + validMeasures: new Set(), + }); + if (filterMissing.length > 0) { + errors.push( + `${source.name}.yaml: measure "${measure.name}" filter references unknown column(s): ${filterMissing.join(', ')}`, + ); + } + } + } + + for (const join of source.joins ?? []) { + const parsed = parseJoinColumns(join.on, source.name, join.to); + if (!parsed) { + continue; + } + if (!validOutputColumns.has(parsed.localColumn.toLowerCase())) { + errors.push( + `${source.name}.yaml: join to "${join.to}" references local column ` + + `"${parsed.localColumn}" that is not a valid output column`, + ); + } + + const targetSource = + sourcesByName.get(join.to.toLowerCase()) ?? + (await this.findManifestEntryByTableRefAcrossConnections(connectionId, join.to))?.source; + if (targetSource) { + const targetColumns = new Set(targetSource.columns.map((c) => c.name.toLowerCase())); + if (!targetColumns.has(parsed.targetColumn.toLowerCase())) { + errors.push( + `${source.name}.yaml: join to "${join.to}" references target column ` + + `"${parsed.targetColumn}" that does not exist on the target source`, + ); + } + } + } + } + + return errors; + } + async getDialectForConnection(connectionId: string): Promise { const connection = await this.connections.getConnectionById(connectionId); if (!connection) { @@ -500,10 +668,15 @@ export class SemanticLayerService { return { errors: [errorMsg], warnings: [], perSourceWarnings: {} }; } if (!data) { - return { errors: [], warnings: [], perSourceWarnings: {} }; + return { + errors: await this.validatePhysicalTableReferences(connectionId, validatable), + warnings: [], + perSourceWarnings: {}, + }; } + const physicalErrors = await this.validatePhysicalTableReferences(connectionId, validatable); return { - errors: data.errors ?? [], + errors: [...(data.errors ?? []), ...physicalErrors], warnings: data.warnings ?? [], perSourceWarnings: data.per_source_warnings ?? {}, }; @@ -529,14 +702,40 @@ export class SemanticLayerService { return { errors: [formatPortError(error, 'Unknown validation error')], warnings: [] }; } if (!data) { - return { errors: [], warnings: [] }; + return { errors: await this.validatePhysicalTableReferences(connectionId, sources), warnings: [] }; } + const physicalErrors = await this.validatePhysicalTableReferences(connectionId, sources); return { - errors: data.errors ?? [], + errors: [...(data.errors ?? []), ...physicalErrors], warnings: data.warnings ?? [], }; } + private async listAllManifestEntries(): Promise> { + let files: string[]; + try { + files = (await this.configService.listFiles(SL_DIR_PREFIX)).files; + } catch { + return []; + } + + const schemaFiles = files.filter((file) => /^semantic-layer\/[^/]+\/_schema\/.+\.ya?ml$/.test(file)); + const entries: Array<{ connectionId: string; source: SemanticLayerSource }> = []; + for (const filePath of schemaFiles) { + const connectionId = filePath.split('/')[1]; + try { + const { content } = await this.configService.readFile(filePath); + const shard = YAML.parse(content) as { tables?: Record }; + for (const [name, entry] of Object.entries(shard?.tables ?? {})) { + entries.push({ connectionId, source: projectManifestEntry(name, entry) }); + } + } catch { + // skip unparseable shards + } + } + return entries; + } + /** * Validate overlays and standalone sources against the current manifest. * Returns warnings for stale references (non-blocking). @@ -969,6 +1168,32 @@ const SQL_KEYWORDS = new Set([ 'false', 'asc', 'desc', + 'date', + 'day', + 'month', + 'quarter', + 'week', + 'year', + 'interval', + 'extract', + 'from', + 'over', + 'partition', + 'by', + 'rows', + 'range', + 'current', + 'row', + 'numeric', + 'decimal', + 'int', + 'integer', + 'float', + 'double', + 'string', + 'timestamp', + 'bool', + 'boolean', ]); function extractColumnReferences(expr: string): string[] { @@ -977,6 +1202,121 @@ function extractColumnReferences(expr: string): string[] { return [...new Set(tokens.filter((t) => !SQL_KEYWORDS.has(t.toLowerCase())))]; } +function manifestEntryMatchesRef(source: SemanticLayerSource, ref: string): boolean { + if (source.name.toLowerCase() === ref.toLowerCase()) { + return true; + } + const table = source.table?.toLowerCase(); + const lowered = ref.toLowerCase(); + return !!table && (table === lowered || table.endsWith(`.${lowered}`)); +} + +function normalizeSqlExpressionForIdentifierScan(expr: string): string { + return expr + .replace(/--.*$/gm, ' ') + .replace(/\/\*[\s\S]*?\*\//g, ' ') + .replace(/'([^']|'')*'/g, ' ') + .replace(/"([^"]+)"/g, '$1') + .replace(/`([^`]+)`/g, '$1') + .replace(/\[([^\]]+)\]/g, '$1'); +} + +function extractSqlIdentifierRefs(expr: string): Array<{ qualifier?: string; name: string }> { + const normalized = normalizeSqlExpressionForIdentifierScan(expr); + const refs = new Map(); + const re = /(?:\b([A-Za-z_][\w$]*)\s*\.\s*)?(\b[A-Za-z_][\w$]*)\b/g; + for (const match of normalized.matchAll(re)) { + const qualifier = match[1]; + const name = match[2]; + if (!name) { + continue; + } + const nameLower = name.toLowerCase(); + const qualifierLower = qualifier?.toLowerCase(); + const after = normalized.slice((match.index ?? 0) + match[0].length).trimStart(); + if (!qualifier && after.startsWith('(')) { + continue; + } + if (SQL_KEYWORDS.has(nameLower) || (qualifierLower && SQL_KEYWORDS.has(qualifierLower))) { + continue; + } + refs.set(`${qualifierLower ?? ''}.${nameLower}`, qualifier ? { qualifier, name } : { name }); + } + return [...refs.values()]; +} + +function refBelongsToSource( + ref: { qualifier?: string; name: string }, + sourceName: string, + sourceNames: Set, +): boolean { + if (!ref.qualifier) { + return true; + } + const qualifier = ref.qualifier.toLowerCase(); + if (qualifier === sourceName.toLowerCase()) { + return true; + } + return !sourceNames.has(qualifier); +} + +function missingLocalExpressionRefs(input: { + expr: string; + sourceName: string; + sourceNames: Set; + validColumns: Set; + validMeasures: Set; +}): string[] { + const missing = new Set(); + for (const ref of extractSqlIdentifierRefs(input.expr)) { + if (!refBelongsToSource(ref, input.sourceName, input.sourceNames)) { + continue; + } + const name = ref.name.toLowerCase(); + if (!input.validColumns.has(name) && !input.validMeasures.has(name)) { + missing.add(ref.name); + } + } + return [...missing].sort(); +} + +function parseJoinSide(side: string): { qualifier?: string; column: string } | null { + const match = side.trim().match(/^(?:(\w+)\.)?(\w+)$/); + if (!match) { + return null; + } + return match[1] ? { qualifier: match[1], column: match[2] } : { column: match[2] }; +} + +function parseJoinColumns( + on: string, + sourceName: string, + targetName: string, +): { localColumn: string; targetColumn: string } | null { + const sides = on.split('='); + if (sides.length !== 2) { + return null; + } + const left = parseJoinSide(sides[0]); + const right = parseJoinSide(sides[1]); + if (!left || !right) { + return null; + } + + const sourceLower = sourceName.toLowerCase(); + const targetLower = targetName.toLowerCase(); + const leftQualifier = left.qualifier?.toLowerCase(); + const rightQualifier = right.qualifier?.toLowerCase(); + + if (leftQualifier === targetLower || rightQualifier === sourceLower) { + return { localColumn: right.column, targetColumn: left.column }; + } + if (rightQualifier === targetLower || leftQualifier === sourceLower || !leftQualifier) { + return { localColumn: left.column, targetColumn: right.column }; + } + return { localColumn: left.column, targetColumn: right.column }; +} + /** * Returns one message per measure-level segment reference that doesn't resolve to * a segment defined on the source. Array is empty when every reference checks out. diff --git a/packages/context/src/sl/tools/sl-warehouse-validation.test.ts b/packages/context/src/sl/tools/sl-warehouse-validation.test.ts index d0f7f04a..16d4ec7f 100644 --- a/packages/context/src/sl/tools/sl-warehouse-validation.test.ts +++ b/packages/context/src/sl/tools/sl-warehouse-validation.test.ts @@ -9,6 +9,7 @@ function makeDeps(opts: { sourceYaml: string; executeQuery: ReturnType { + const yaml = `name: int_active_contract_arr +table: orbit_analytics.int_active_contract_arr +grain: [contract_id] +columns: + - {name: contract_id, type: string} + - {name: arr_cents, type: number} +measures: + - {name: arr, expr: sum(arr_cents)} +joins: [] +`; + const executeQuery = vi.fn(); + const deps = makeDeps({ sourceYaml: yaml, executeQuery }) as any; + deps.semanticLayerService.validatePhysicalTableReferences.mockResolvedValue([ + 'int_active_contract_arr.yaml: declared column(s) absent from physical table: arr_cents', + ]); + + const result = await validateSingleSource(deps, 'conn-1', 'int_active_contract_arr'); + + expect(result.errors).toContain( + 'int_active_contract_arr.yaml: declared column(s) absent from physical table: arr_cents', + ); + expect(executeQuery).not.toHaveBeenCalled(); + }); }); diff --git a/packages/context/src/sl/tools/sl-warehouse-validation.ts b/packages/context/src/sl/tools/sl-warehouse-validation.ts index b2799974..f9c5e4fd 100644 --- a/packages/context/src/sl/tools/sl-warehouse-validation.ts +++ b/packages/context/src/sl/tools/sl-warehouse-validation.ts @@ -4,6 +4,7 @@ import { SYSTEM_GIT_AUTHOR } from '../../tools/index.js'; import type { SlConnectionCatalogPort, SlSourcesIndexPort } from '../ports.js'; import { sourceOverlaySchema } from '../schemas.js'; import { SemanticLayerService } from '../semantic-layer.service.js'; +import type { SemanticLayerSource } from '../types.js'; import { sourceDefinitionSchema } from './base-semantic-layer.tool.js'; export interface SlValidationDeps { @@ -118,6 +119,14 @@ export async function validateSingleSource( return { errors, warnings }; } + if (!isOverlay && 'table' in result.data && result.data.table) { + errors.push( + ...(await deps.semanticLayerService.validatePhysicalTableReferences(connectionId, [ + result.data as SemanticLayerSource, + ])), + ); + } + const measures = (parsed.measures as Array<{ name: string }> | undefined) ?? []; const seenMeasures = new Set(); for (const m of measures) {