mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-10 08:05:14 +02:00
Validate SL sources against manifests
This commit is contained in:
parent
d7a6d78c72
commit
7ae692cce0
10 changed files with 646 additions and 10 deletions
|
|
@ -84,6 +84,56 @@ describe('runKtxSl', () => {
|
|||
expect(listIo.stdout()).toContain('warehouse\torders\tcolumns=1\tmeasures=0\tjoins=0');
|
||||
});
|
||||
|
||||
it('fails validation when a table-backed source declares columns absent from a matching warehouse manifest', async () => {
|
||||
const projectDir = join(tempDir, 'project');
|
||||
const project = await initKtxProject({ projectDir, projectName: 'warehouse' });
|
||||
await project.fileStore.writeFile(
|
||||
'semantic-layer/postgres-warehouse/_schema/orbit_analytics.yaml',
|
||||
`tables:
|
||||
int_active_contract_arr:
|
||||
table: orbit_analytics.int_active_contract_arr
|
||||
columns:
|
||||
- { name: contract_id, type: string }
|
||||
- { name: contract_arr_cents, type: number }
|
||||
`,
|
||||
'ktx',
|
||||
'ktx@example.com',
|
||||
'Add warehouse manifest',
|
||||
);
|
||||
await project.fileStore.writeFile(
|
||||
'semantic-layer/dbt-main/int_active_contract_arr.yaml',
|
||||
`name: int_active_contract_arr
|
||||
table: orbit_analytics.int_active_contract_arr
|
||||
grain: [contract_id]
|
||||
columns:
|
||||
- { name: contract_id, type: string }
|
||||
- { name: arr_cents, type: number }
|
||||
measures:
|
||||
- { name: arr, expr: sum(arr_cents) }
|
||||
joins: []
|
||||
`,
|
||||
'ktx',
|
||||
'ktx@example.com',
|
||||
'Add invalid dbt source',
|
||||
);
|
||||
|
||||
const validateIo = makeIo();
|
||||
await expect(
|
||||
runKtxSl(
|
||||
{
|
||||
command: 'validate',
|
||||
projectDir,
|
||||
connectionId: 'dbt-main',
|
||||
sourceName: 'int_active_contract_arr',
|
||||
},
|
||||
validateIo.io,
|
||||
),
|
||||
).resolves.toBe(1);
|
||||
|
||||
expect(validateIo.stderr()).toContain('arr_cents');
|
||||
expect(validateIo.stderr()).toContain('absent from physical table');
|
||||
});
|
||||
|
||||
it('runs sl query and prints SQL output', async () => {
|
||||
const projectDir = join(tempDir, 'project');
|
||||
const project = await initKtxProject({ projectDir, projectName: 'warehouse' });
|
||||
|
|
|
|||
|
|
@ -97,7 +97,7 @@ export async function runKtxSl(args: KtxSlArgs, io: KtxSlIo = process, deps: Ktx
|
|||
if (!source) {
|
||||
throw new Error(`Semantic-layer source "${args.connectionId}/${args.sourceName}" was not found`);
|
||||
}
|
||||
const result = await validateLocalSlSource(source.yaml);
|
||||
const result = await validateLocalSlSource(source.yaml, { project, connectionId: args.connectionId });
|
||||
if (!result.valid) {
|
||||
for (const error of result.errors) {
|
||||
io.stderr.write(`${error}\n`);
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ All wiki writes go to the GLOBAL scope. Bundle ingests are not personal. The `wi
|
|||
<do_not>
|
||||
- Do not read peer files; only files listed in `rawFiles` or `dependencyPaths` are accessible. `read_raw_file` will reject everything else.
|
||||
- Do not invent measures/joins/rules not declared in the raw files.
|
||||
- Do not invent physical column names or grain keys. For table-backed SL sources, every `columns:`, `grain:`, `joins:`, `segments:`, and `measures[].expr` column must come from raw-file column declarations or warehouse-backed discovery (`wiki_sl_search`, `sl_discover`, `sl_describe_table`). If column names are not confirmed, capture the business context in wiki instead of writing a full SL source.
|
||||
- Do not duplicate an artifact that prior provenance says you already produced; update it.
|
||||
- Do not silently accept a name collision with a prior WU's write when the formula differs. Trigger `ingest_triage`.
|
||||
</do_not>
|
||||
|
|
|
|||
|
|
@ -19,6 +19,14 @@ Use this skill for **uploaded** dbt projects (`dbt_project.yml` at stage root, `
|
|||
| `accepted_values` | Add a **brief** line in the column description: allowed values (truncate long lists) | Also mention enum-like use in `wiki_sl_search` / filters. |
|
||||
| `relationships` | Add or confirm `joins:` on the overlay **only** when `to` resolves to a real table via `read_raw_file` + `wiki_sl_search` / `sl_describe_table` | If the ref cannot be resolved, capture the intent in a wiki page instead. |
|
||||
|
||||
## Physical schema grounding
|
||||
|
||||
dbt YAML is documentation and test metadata; it is not permission to invent physical columns. Before writing any table-backed SL source, confirm the real warehouse shape with `wiki_sl_search`, `sl_discover`, or `sl_describe_table` and use only confirmed column names in `columns:`, `grain:`, `joins:`, `segments:`, and `measures[].expr`.
|
||||
|
||||
If a `models:` entry has no `columns:` block, or the available raw files do not confirm the physical column names, do **not** synthesize a full standalone source. Write a wiki note or a description-only overlay for the resolved manifest table instead. If a business metric is described but its referenced column is not confirmed in the warehouse schema, omit the measure and capture the unresolved intent in the wiki.
|
||||
|
||||
After every `sl_write_source`, call `sl_validate`. A validation error saying a declared column or measure reference is absent from the physical table is a hard stop: re-read the warehouse-backed source and rewrite with confirmed names, or remove the invalid SL fields.
|
||||
|
||||
## 1.1 test hints (descriptions / meta)
|
||||
|
||||
When YAML shows `accepted_values` or `not_null`, add **short** hints into `columns[].descriptions` (e.g. under `user`) or freeform column notes so chat and validation see intent before the next git sync refreshes `constraints` / `enum_values` in `_schema`. Keep hints under a few words when possible.
|
||||
|
|
@ -30,5 +38,7 @@ If the same bundle also has MetricFlow `semantic_models:` / `metrics:`, the **`m
|
|||
## Do not
|
||||
|
||||
- Do not run `dbt` CLI or assume `target/` / `manifest.json` exists in the upload.
|
||||
- Do not invent column names, grain keys, or measure expressions from dbt model names, descriptions, tests, or common naming patterns.
|
||||
- Do not write `columns:`, `grain:`, or `measures:` for a dbt model unless those exact column names are confirmed by dbt YAML columns or warehouse schema discovery.
|
||||
- Do not invent joins from `relationships` tests if the target model/table is not found in SL or the warehouse.
|
||||
- Do not read `peerFileIndex` paths — use `read_raw_file` only on `rawFiles` and `dependencyPaths` from the WorkUnit.
|
||||
|
|
|
|||
|
|
@ -89,6 +89,39 @@ describe('local semantic-layer helpers', () => {
|
|||
await expect(validateLocalSlSource(ORDERS_YAML)).resolves.toEqual({ valid: true, errors: [] });
|
||||
});
|
||||
|
||||
it('validates table-backed sources against matching physical manifests when project context is provided', async () => {
|
||||
await project.fileStore.writeFile(
|
||||
'semantic-layer/postgres-warehouse/_schema/orbit_analytics.yaml',
|
||||
`tables:
|
||||
int_active_contract_arr:
|
||||
table: orbit_analytics.int_active_contract_arr
|
||||
columns:
|
||||
- { name: contract_id, type: string }
|
||||
- { name: contract_arr_cents, type: number }
|
||||
`,
|
||||
'ktx',
|
||||
'ktx@example.com',
|
||||
'Add warehouse manifest',
|
||||
);
|
||||
|
||||
const invalidDbtSource = [
|
||||
'name: int_active_contract_arr',
|
||||
'table: orbit_analytics.int_active_contract_arr',
|
||||
'grain: [contract_id]',
|
||||
'columns:',
|
||||
' - { name: contract_id, type: string }',
|
||||
' - { name: arr_cents, type: number }',
|
||||
'measures:',
|
||||
' - { name: arr, expr: sum(arr_cents) }',
|
||||
'',
|
||||
].join('\n');
|
||||
|
||||
const result = await validateLocalSlSource(invalidDbtSource, { project, connectionId: 'dbt-main' });
|
||||
expect(result.valid).toBe(false);
|
||||
expect(result.errors.join('\n')).toContain('arr_cents');
|
||||
expect(result.errors.join('\n')).toContain('absent from physical table');
|
||||
});
|
||||
|
||||
it('lists and reads manifest-backed scan sources as queryable sources', async () => {
|
||||
await project.fileStore.writeFile(
|
||||
'semantic-layer/warehouse/_schema/public.yaml',
|
||||
|
|
|
|||
|
|
@ -7,7 +7,12 @@ import { HybridSearchCore, type SearchCandidateGenerator } from '../search/index
|
|||
import { DEFAULT_PRIORITY, resolveDescription } from './descriptions.js';
|
||||
import { normalizeSemanticLayerDescriptions } from './description-normalization.js';
|
||||
import { sourceDefinitionSchema, sourceOverlaySchema } from './schemas.js';
|
||||
import { composeOverlay, type ManifestTableEntry, projectManifestEntry } from './semantic-layer.service.js';
|
||||
import {
|
||||
composeOverlay,
|
||||
type ManifestTableEntry,
|
||||
projectManifestEntry,
|
||||
SemanticLayerService,
|
||||
} from './semantic-layer.service.js';
|
||||
import type { PgliteSlSearchPrototypeOwnerOptions } from './pglite-sl-search-prototype.js';
|
||||
import { loadLatestSlDictionaryEntries } from './sl-dictionary-profile.js';
|
||||
import { buildSemanticLayerSourceSearchText, SlSearchService } from './sl-search.service.js';
|
||||
|
|
@ -246,12 +251,24 @@ export async function loadLocalSlSourceRecords(
|
|||
return [...sources.values()].sort((left, right) => left.name.localeCompare(right.name));
|
||||
}
|
||||
|
||||
export async function validateLocalSlSource(rawYaml: string): Promise<LocalSlValidationResult> {
|
||||
export async function validateLocalSlSource(
|
||||
rawYaml: string,
|
||||
options?: { project?: KtxLocalProject; connectionId?: string },
|
||||
): Promise<LocalSlValidationResult> {
|
||||
try {
|
||||
const parsed = parseYamlRecord(rawYaml);
|
||||
const schema = parsed.table || parsed.sql ? sourceDefinitionSchema : sourceOverlaySchema;
|
||||
schema.parse(parsed);
|
||||
return { valid: true, errors: [] };
|
||||
const result = schema.parse(parsed);
|
||||
const errors: string[] = [];
|
||||
|
||||
if (options?.project && options.connectionId && 'table' in result && result.table) {
|
||||
const service = new SemanticLayerService(options.project.fileStore, {} as never, {} as never);
|
||||
errors.push(
|
||||
...(await service.validatePhysicalTableReferences(options.connectionId, [result as SemanticLayerSource])),
|
||||
);
|
||||
}
|
||||
|
||||
return { valid: errors.length === 0, errors };
|
||||
} catch (error) {
|
||||
return { valid: false, errors: validationErrors(error) };
|
||||
}
|
||||
|
|
@ -261,7 +278,7 @@ export async function writeLocalSlSource(
|
|||
project: KtxLocalProject,
|
||||
input: { connectionId: string; sourceName: string; yaml: string },
|
||||
): Promise<KtxFileWriteResult> {
|
||||
const validation = await validateLocalSlSource(input.yaml);
|
||||
const validation = await validateLocalSlSource(input.yaml, { project, connectionId: input.connectionId });
|
||||
if (!validation.valid) {
|
||||
throw new Error(`Invalid semantic-layer source: ${validation.errors.join('; ')}`);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -784,6 +784,156 @@ describe('validateWithProposedSource', () => {
|
|||
expect(result.errors[0]).toMatch(/Overlay 'orphan' has no matching manifest entry/);
|
||||
expect(pythonPort.validateSources).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('rejects table-backed sources whose declared columns are absent from a matching physical manifest', async () => {
|
||||
const schemaPath = 'semantic-layer/postgres-warehouse/_schema/orbit_analytics.yaml';
|
||||
configService.listFiles.mockImplementation((dir: string) => {
|
||||
if (dir === 'semantic-layer/dbt-main') {
|
||||
return Promise.resolve({ files: [] });
|
||||
}
|
||||
if (dir === 'semantic-layer') {
|
||||
return Promise.resolve({ files: [schemaPath] });
|
||||
}
|
||||
if (dir === 'semantic-layer/dbt-main/_schema' || dir === 'semantic-layer/postgres-warehouse/_schema') {
|
||||
return Promise.resolve({ files: dir.endsWith('postgres-warehouse/_schema') ? [schemaPath] : [] });
|
||||
}
|
||||
return Promise.resolve({ files: [] });
|
||||
});
|
||||
configService.readFile.mockImplementation((path: string) => {
|
||||
if (path === schemaPath) {
|
||||
return Promise.resolve({
|
||||
content: [
|
||||
'tables:',
|
||||
' int_procurement_qualifying_actions:',
|
||||
' table: orbit_analytics.int_procurement_qualifying_actions',
|
||||
' columns:',
|
||||
' - { name: action_id, type: string }',
|
||||
' - { name: account_id, type: string }',
|
||||
' - { name: user_id, type: string }',
|
||||
' - { name: action_date, type: time }',
|
||||
' - { name: action_type, type: string }',
|
||||
].join('\n'),
|
||||
});
|
||||
}
|
||||
return Promise.reject(new Error(`Unexpected readFile: ${path}`));
|
||||
});
|
||||
pythonPort.validateSources.mockResolvedValue({
|
||||
data: { errors: [], warnings: [] },
|
||||
});
|
||||
|
||||
const result = await service.validateWithProposedSource('dbt-main', {
|
||||
name: 'int_procurement_qualifying_actions',
|
||||
table: 'orbit_analytics.int_procurement_qualifying_actions',
|
||||
grain: ['purchase_request_id'],
|
||||
columns: [
|
||||
{ name: 'purchase_request_id', type: 'string' },
|
||||
{ name: 'account_id', type: 'string' },
|
||||
{ name: 'requester_user_id', type: 'string' },
|
||||
{ name: 'action_week', type: 'time' },
|
||||
],
|
||||
joins: [],
|
||||
measures: [{ name: 'qualifying_action_count', expr: 'count(purchase_request_id)' }],
|
||||
});
|
||||
|
||||
expect(result.errors.join('\n')).toMatch(/declared column\(s\) absent from physical table/);
|
||||
expect(result.errors.join('\n')).toMatch(/purchase_request_id/);
|
||||
expect(result.errors.join('\n')).toMatch(/requester_user_id/);
|
||||
expect(result.errors.join('\n')).toMatch(/action_week/);
|
||||
expect(result.errors.join('\n')).toMatch(/measure "qualifying_action_count" references unknown column\(s\)/);
|
||||
});
|
||||
|
||||
it('keeps valid table-backed sources clean when a physical manifest matches', async () => {
|
||||
const schemaPath = 'semantic-layer/postgres-warehouse/_schema/orbit_analytics.yaml';
|
||||
configService.listFiles.mockImplementation((dir: string) => {
|
||||
if (dir === 'semantic-layer/dbt-main') {
|
||||
return Promise.resolve({ files: [] });
|
||||
}
|
||||
if (dir === 'semantic-layer') {
|
||||
return Promise.resolve({ files: [schemaPath] });
|
||||
}
|
||||
if (dir === 'semantic-layer/dbt-main/_schema' || dir === 'semantic-layer/postgres-warehouse/_schema') {
|
||||
return Promise.resolve({ files: dir.endsWith('postgres-warehouse/_schema') ? [schemaPath] : [] });
|
||||
}
|
||||
return Promise.resolve({ files: [] });
|
||||
});
|
||||
configService.readFile.mockResolvedValue({
|
||||
content: [
|
||||
'tables:',
|
||||
' mart_revenue_daily:',
|
||||
' table: orbit_analytics.mart_revenue_daily',
|
||||
' columns:',
|
||||
' - { name: revenue_date, type: time }',
|
||||
' - { name: gross_revenue_cents, type: number }',
|
||||
' - { name: credits_cents, type: number }',
|
||||
' - { name: refunds_cents, type: number }',
|
||||
' - { name: net_revenue_cents, type: number }',
|
||||
].join('\n'),
|
||||
});
|
||||
pythonPort.validateSources.mockResolvedValue({
|
||||
data: { errors: [], warnings: [] },
|
||||
});
|
||||
|
||||
const result = await service.validateWithProposedSource('dbt-main', {
|
||||
name: 'mart_revenue_daily',
|
||||
table: 'orbit_analytics.mart_revenue_daily',
|
||||
grain: ['revenue_date'],
|
||||
columns: [
|
||||
{ name: 'revenue_date', type: 'time' },
|
||||
{ name: 'gross_revenue_cents', type: 'number' },
|
||||
{ name: 'credits_cents', type: 'number' },
|
||||
{ name: 'refunds_cents', type: 'number' },
|
||||
{ name: 'net_revenue_cents', type: 'number' },
|
||||
],
|
||||
joins: [],
|
||||
measures: [{ name: 'net_revenue', expr: 'sum(net_revenue_cents)' }],
|
||||
});
|
||||
|
||||
expect(result.errors).toEqual([]);
|
||||
});
|
||||
|
||||
it('rejects join keys that are absent from matched physical sources', async () => {
|
||||
const schemaPath = 'semantic-layer/postgres-warehouse/_schema/orbit_analytics.yaml';
|
||||
configService.listFiles.mockImplementation((dir: string) => {
|
||||
if (dir === 'semantic-layer/dbt-main') {
|
||||
return Promise.resolve({ files: [] });
|
||||
}
|
||||
if (dir === 'semantic-layer') {
|
||||
return Promise.resolve({ files: [schemaPath] });
|
||||
}
|
||||
if (dir === 'semantic-layer/dbt-main/_schema' || dir === 'semantic-layer/postgres-warehouse/_schema') {
|
||||
return Promise.resolve({ files: dir.endsWith('postgres-warehouse/_schema') ? [schemaPath] : [] });
|
||||
}
|
||||
return Promise.resolve({ files: [] });
|
||||
});
|
||||
configService.readFile.mockResolvedValue({
|
||||
content: [
|
||||
'tables:',
|
||||
' activity:',
|
||||
' table: orbit_analytics.activity',
|
||||
' columns:',
|
||||
' - { name: account_id, type: string }',
|
||||
' accounts:',
|
||||
' table: orbit_analytics.accounts',
|
||||
' columns:',
|
||||
' - { name: account_id, type: string }',
|
||||
].join('\n'),
|
||||
});
|
||||
pythonPort.validateSources.mockResolvedValue({
|
||||
data: { errors: [], warnings: [] },
|
||||
});
|
||||
|
||||
const result = await service.validateWithProposedSource('dbt-main', {
|
||||
name: 'activity',
|
||||
table: 'orbit_analytics.activity',
|
||||
grain: ['account_id'],
|
||||
columns: [{ name: 'account_id', type: 'string' }],
|
||||
joins: [{ to: 'accounts', on: 'activity.account_name = accounts.account_uuid', relationship: 'many_to_one' }],
|
||||
measures: [],
|
||||
});
|
||||
|
||||
expect(result.errors.join('\n')).toMatch(/local column "account_name"/);
|
||||
expect(result.errors.join('\n')).toMatch(/target column "account_uuid"/);
|
||||
});
|
||||
});
|
||||
|
||||
describe('findDanglingSegmentRefs', () => {
|
||||
|
|
|
|||
|
|
@ -396,6 +396,174 @@ export class SemanticLayerService {
|
|||
return null;
|
||||
}
|
||||
|
||||
/**
 * Resolves a table reference to a manifest entry, trying the preferred
 * connection first and then every other connection's manifest entries.
 *
 * @param preferredConnectionId - connection whose manifest is consulted first.
 * @param ref - table reference to resolve.
 * @returns the owning connection id and manifest source, or `null` when no
 *   manifest entry matches.
 */
async findManifestEntryByTableRefAcrossConnections(
  preferredConnectionId: string,
  ref: string,
): Promise<{ connectionId: string; source: SemanticLayerSource } | null> {
  const local = await this.findManifestEntryByTableRef(preferredConnectionId, ref);
  if (local) {
    return { connectionId: preferredConnectionId, source: local };
  }

  // The preferred connection was already checked above, so only entries owned
  // by other connections are candidates here; the first match wins.
  const everyEntry = await this.listAllManifestEntries();
  const elsewhere = everyEntry.find(
    (candidate) =>
      candidate.connectionId !== preferredConnectionId && manifestEntryMatchesRef(candidate.source, ref),
  );
  return elsewhere ?? null;
}
|
||||
|
||||
/**
 * Cross-checks table-backed sources against the manifest entry matching their
 * `table` reference and returns one message per problem found.
 *
 * Sources without a `table`, and sources whose table matches no manifest entry
 * (looked up in `connectionId` first, then across connections), are skipped.
 * For each matched source the checks run in this order:
 *   1. declared non-computed columns missing from the manifest columns,
 *   2. grain keys that are undeclared or (when not computed) missing from the manifest,
 *   3. identifiers referenced by computed column expressions,
 *   4. identifiers referenced by segment expressions,
 *   5. identifiers referenced by measure expressions and measure filters,
 *   6. join keys on the local side and, when the target resolves, on the target side.
 *
 * @param connectionId - connection searched first when resolving manifest entries.
 * @param sources - sources to validate; also used to resolve join targets by name.
 * @returns the collected error messages; empty when no problem was found.
 */
async validatePhysicalTableReferences(
  connectionId: string,
  sources: SemanticLayerSource[],
): Promise<string[]> {
  const errors: string[] = [];
  const sourceNames = new Set(sources.map((s) => s.name.toLowerCase()));
  const sourcesByName = new Map(sources.map((s) => [s.name.toLowerCase(), s]));

  // Manifest lookups are memoised for the duration of this call: a lookup that
  // misses the preferred connection falls back to scanning every manifest
  // entry, so repeating it per source and per join target is wasted work.
  type ManifestMatch = { connectionId: string; source: SemanticLayerSource } | null;
  const manifestLookups = new Map<string, ManifestMatch>();
  const findManifest = async (ref: string): Promise<ManifestMatch> => {
    const cached = manifestLookups.get(ref);
    if (cached !== undefined) {
      return cached;
    }
    const found = await this.findManifestEntryByTableRefAcrossConnections(connectionId, ref);
    manifestLookups.set(ref, found);
    return found;
  };

  for (const source of sources) {
    if (!source.table) {
      continue;
    }

    const manifestMatch = await findManifest(source.table);
    if (!manifestMatch) {
      continue;
    }

    const manifestSource = manifestMatch.source;
    // lower-cased manifest column name -> name as written in the manifest
    const manifestColumns = new Map(manifestSource.columns.map((c) => [c.name.toLowerCase(), c.name]));
    const declaredColumns = source.columns ?? [];
    const declaredByLower = new Map(declaredColumns.map((c) => [c.name.toLowerCase(), c]));
    // Declared columns that are either computed (`expr`) or present in the manifest.
    const validOutputColumns = new Set(
      declaredColumns
        .filter((c) => c.expr || manifestColumns.has(c.name.toLowerCase()))
        .map((c) => c.name.toLowerCase()),
    );
    // Computed columns may reference manifest columns as well as valid output columns.
    const computedColumnScope = new Set([...manifestColumns.keys(), ...validOutputColumns]);
    const measureNames = new Set((source.measures ?? []).map((m) => m.name.toLowerCase()));
    const manifestLabel =
      manifestMatch.connectionId === connectionId
        ? manifestSource.name
        : `${manifestMatch.connectionId}/${manifestSource.name}`;

    const absentDeclaredColumns = declaredColumns
      .filter((c) => !c.expr && !manifestColumns.has(c.name.toLowerCase()))
      .map((c) => c.name);
    if (absentDeclaredColumns.length > 0) {
      errors.push(
        `${source.name}.yaml: table "${source.table}" matched manifest ${manifestLabel}, ` +
          `but declared column(s) absent from physical table: ${absentDeclaredColumns.join(', ')}. ` +
          `Available columns: ${[...manifestColumns.values()].join(', ')}`,
      );
    }

    const missingGrainColumns = (source.grain ?? []).filter((grain) => {
      const declared = declaredByLower.get(grain.toLowerCase());
      return !declared || (!declared.expr && !manifestColumns.has(grain.toLowerCase()));
    });
    if (missingGrainColumns.length > 0) {
      errors.push(
        `${source.name}.yaml: grain column(s) absent from physical table "${source.table}": ${missingGrainColumns.join(', ')}`,
      );
    }

    for (const column of declaredColumns) {
      if (!column.expr) {
        continue;
      }
      const missing = missingLocalExpressionRefs({
        expr: column.expr,
        sourceName: source.name,
        sourceNames,
        validColumns: computedColumnScope,
        validMeasures: new Set(),
      });
      if (missing.length > 0) {
        errors.push(
          `${source.name}.yaml: computed column "${column.name}" references unknown column(s): ${missing.join(', ')}`,
        );
      }
    }

    for (const segment of source.segments ?? []) {
      const missing = missingLocalExpressionRefs({
        expr: segment.expr,
        sourceName: source.name,
        sourceNames,
        validColumns: validOutputColumns,
        validMeasures: new Set(),
      });
      if (missing.length > 0) {
        errors.push(
          `${source.name}.yaml: segment "${segment.name}" references unknown column(s): ${missing.join(', ')}`,
        );
      }
    }

    for (const measure of source.measures ?? []) {
      // Measure expressions may also reference other measures of the same source.
      const exprMissing = missingLocalExpressionRefs({
        expr: measure.expr,
        sourceName: source.name,
        sourceNames,
        validColumns: validOutputColumns,
        validMeasures: measureNames,
      });
      if (exprMissing.length > 0) {
        errors.push(
          `${source.name}.yaml: measure "${measure.name}" references unknown column(s): ${exprMissing.join(', ')}`,
        );
      }

      if (measure.filter) {
        const filterMissing = missingLocalExpressionRefs({
          expr: measure.filter,
          sourceName: source.name,
          sourceNames,
          validColumns: validOutputColumns,
          validMeasures: new Set(),
        });
        if (filterMissing.length > 0) {
          errors.push(
            `${source.name}.yaml: measure "${measure.name}" filter references unknown column(s): ${filterMissing.join(', ')}`,
          );
        }
      }
    }

    for (const join of source.joins ?? []) {
      // Join conditions that parseJoinColumns cannot parse are not checked.
      const parsed = parseJoinColumns(join.on, source.name, join.to);
      if (!parsed) {
        continue;
      }
      if (!validOutputColumns.has(parsed.localColumn.toLowerCase())) {
        errors.push(
          `${source.name}.yaml: join to "${join.to}" references local column ` +
            `"${parsed.localColumn}" that is not a valid output column`,
        );
      }

      // Prefer a source from this batch; otherwise fall back to a manifest entry.
      const targetSource = sourcesByName.get(join.to.toLowerCase()) ?? (await findManifest(join.to))?.source;
      if (targetSource) {
        const targetColumns = new Set(targetSource.columns.map((c) => c.name.toLowerCase()));
        if (!targetColumns.has(parsed.targetColumn.toLowerCase())) {
          errors.push(
            `${source.name}.yaml: join to "${join.to}" references target column ` +
              `"${parsed.targetColumn}" that does not exist on the target source`,
          );
        }
      }
    }
  }

  return errors;
}
|
||||
|
||||
async getDialectForConnection(connectionId: string): Promise<string> {
|
||||
const connection = await this.connections.getConnectionById(connectionId);
|
||||
if (!connection) {
|
||||
|
|
@ -500,10 +668,15 @@ export class SemanticLayerService {
|
|||
return { errors: [errorMsg], warnings: [], perSourceWarnings: {} };
|
||||
}
|
||||
if (!data) {
|
||||
return { errors: [], warnings: [], perSourceWarnings: {} };
|
||||
return {
|
||||
errors: await this.validatePhysicalTableReferences(connectionId, validatable),
|
||||
warnings: [],
|
||||
perSourceWarnings: {},
|
||||
};
|
||||
}
|
||||
const physicalErrors = await this.validatePhysicalTableReferences(connectionId, validatable);
|
||||
return {
|
||||
errors: data.errors ?? [],
|
||||
errors: [...(data.errors ?? []), ...physicalErrors],
|
||||
warnings: data.warnings ?? [],
|
||||
perSourceWarnings: data.per_source_warnings ?? {},
|
||||
};
|
||||
|
|
@ -529,14 +702,40 @@ export class SemanticLayerService {
|
|||
return { errors: [formatPortError(error, 'Unknown validation error')], warnings: [] };
|
||||
}
|
||||
if (!data) {
|
||||
return { errors: [], warnings: [] };
|
||||
return { errors: await this.validatePhysicalTableReferences(connectionId, sources), warnings: [] };
|
||||
}
|
||||
const physicalErrors = await this.validatePhysicalTableReferences(connectionId, sources);
|
||||
return {
|
||||
errors: data.errors ?? [],
|
||||
errors: [...(data.errors ?? []), ...physicalErrors],
|
||||
warnings: data.warnings ?? [],
|
||||
};
|
||||
}
|
||||
|
||||
/**
 * Loads every manifest table entry from all `_schema` YAML shards found under
 * the semantic-layer directory, tagged with the connection id taken from the
 * shard's path.
 *
 * Best effort: a failed listing yields an empty result, and a shard that
 * cannot be read or parsed is skipped.
 */
private async listAllManifestEntries(): Promise<Array<{ connectionId: string; source: SemanticLayerSource }>> {
  let listed: string[];
  try {
    const listing = await this.configService.listFiles(SL_DIR_PREFIX);
    listed = listing.files;
  } catch {
    return [];
  }

  const collected: Array<{ connectionId: string; source: SemanticLayerSource }> = [];
  const shardPaths = listed.filter((candidate) => /^semantic-layer\/[^/]+\/_schema\/.+\.ya?ml$/.test(candidate));
  for (const shardPath of shardPaths) {
    // Path shape is semantic-layer/<connectionId>/_schema/<file>, so segment 1 is the owner.
    const connectionId = shardPath.split('/')[1];
    try {
      const file = await this.configService.readFile(shardPath);
      const shard = YAML.parse(file.content) as { tables?: Record<string, ManifestTableEntry> };
      const tables = Object.entries(shard?.tables ?? {});
      for (const [tableName, tableEntry] of tables) {
        collected.push({ connectionId, source: projectManifestEntry(tableName, tableEntry) });
      }
    } catch {
      // skip unparseable shards
    }
  }
  return collected;
}
|
||||
|
||||
/**
|
||||
* Validate overlays and standalone sources against the current manifest.
|
||||
* Returns warnings for stale references (non-blocking).
|
||||
|
|
@ -969,6 +1168,32 @@ const SQL_KEYWORDS = new Set([
|
|||
'false',
|
||||
'asc',
|
||||
'desc',
|
||||
'date',
|
||||
'day',
|
||||
'month',
|
||||
'quarter',
|
||||
'week',
|
||||
'year',
|
||||
'interval',
|
||||
'extract',
|
||||
'from',
|
||||
'over',
|
||||
'partition',
|
||||
'by',
|
||||
'rows',
|
||||
'range',
|
||||
'current',
|
||||
'row',
|
||||
'numeric',
|
||||
'decimal',
|
||||
'int',
|
||||
'integer',
|
||||
'float',
|
||||
'double',
|
||||
'string',
|
||||
'timestamp',
|
||||
'bool',
|
||||
'boolean',
|
||||
]);
|
||||
|
||||
function extractColumnReferences(expr: string): string[] {
|
||||
|
|
@ -977,6 +1202,121 @@ function extractColumnReferences(expr: string): string[] {
|
|||
return [...new Set(tokens.filter((t) => !SQL_KEYWORDS.has(t.toLowerCase())))];
|
||||
}
|
||||
|
||||
/**
 * Reports whether a manifest source answers to `ref`, compared case-insensitively.
 *
 * A match is any of: the source name equals `ref`; the source table equals
 * `ref`; or the source table ends with `.` followed by `ref` (so an
 * unqualified or partially qualified ref matches a more qualified table).
 */
function manifestEntryMatchesRef(source: SemanticLayerSource, ref: string): boolean {
  const entryName = source.name.toLowerCase();
  const wanted = ref.toLowerCase();
  if (entryName === wanted) {
    return true;
  }

  const physical = source.table?.toLowerCase();
  if (!physical) {
    return false;
  }
  return physical === wanted || physical.endsWith(`.${wanted}`);
}
|
||||
|
||||
/**
 * Prepares a SQL expression for identifier scanning.
 *
 * String literals, `--` line comments and block comments are replaced by a
 * single space; double-quoted, backtick-quoted and bracket-quoted identifiers
 * are unwrapped to their inner text.
 *
 * All lexemes are recognised in one left-to-right pass so that whichever
 * construct starts first wins. Stripping comments in a separate earlier pass
 * would treat a `--` or a block-comment opener inside a string literal as a
 * comment, swallowing real identifiers and leaking literal text.
 *
 * @param expr - raw SQL expression.
 * @returns the expression with literals/comments blanked and quoting removed.
 */
function normalizeSqlExpressionForIdentifierScan(expr: string): string {
  // Alternatives, in order: 'string' (with '' escapes) | -- comment | /* comment */
  // | "ident" | `ident` | [ident]. Only the three identifier forms capture.
  const lexeme = /'(?:[^']|'')*'|--.*|\/\*[\s\S]*?\*\/|"([^"]+)"|`([^`]+)`|\[([^\]]+)\]/g;
  return expr.replace(
    lexeme,
    (_whole: string, doubleQuoted?: string, backticked?: string, bracketed?: string): string =>
      doubleQuoted ?? backticked ?? bracketed ?? ' ',
  );
}
|
||||
|
||||
/**
 * Collects the distinct identifier references found in a SQL expression.
 *
 * The expression is normalised first, then scanned for dotted identifier
 * paths. For each path the last segment is the referenced name and the segment
 * directly before it (if any) is its qualifier, so `schema.table.column`
 * yields `{ qualifier: 'table', name: 'column' }` rather than reporting the
 * table segment as a column.
 *
 * Skipped: unqualified names directly followed by `(` (function calls), and
 * references whose name or qualifier is in `SQL_KEYWORDS`. References are
 * de-duplicated case-insensitively on qualifier + name; the last spelling seen
 * is kept.
 *
 * @param expr - raw SQL expression.
 * @returns references in first-seen order.
 */
function extractSqlIdentifierRefs(expr: string): Array<{ qualifier?: string; name: string }> {
  const normalized = normalizeSqlExpressionForIdentifierScan(expr);
  const refs = new Map<string, { qualifier?: string; name: string }>();
  const dottedPath = /\b[A-Za-z_][\w$]*(?:\s*\.\s*[A-Za-z_][\w$]*)*\b/g;
  // Sticky lookahead anchored at the end of each match; avoids copying the
  // remainder of the string for every identifier.
  const openParenAhead = /\s*\(/y;

  for (const match of normalized.matchAll(dottedPath)) {
    const segments = match[0].split('.').map((segment) => segment.trim());
    const name = segments[segments.length - 1];
    const qualifier = segments.length > 1 ? segments[segments.length - 2] : undefined;
    if (!name) {
      continue;
    }
    const nameLower = name.toLowerCase();
    const qualifierLower = qualifier?.toLowerCase();

    openParenAhead.lastIndex = (match.index ?? 0) + match[0].length;
    if (!qualifier && openParenAhead.test(normalized)) {
      continue;
    }
    if (SQL_KEYWORDS.has(nameLower) || (qualifierLower && SQL_KEYWORDS.has(qualifierLower))) {
      continue;
    }
    refs.set(`${qualifierLower ?? ''}.${nameLower}`, qualifier ? { qualifier, name } : { name });
  }
  return [...refs.values()];
}
|
||||
|
||||
/**
 * Decides whether an identifier reference should be validated against the
 * source named `sourceName`.
 *
 * Unqualified references always belong to it. A qualified reference belongs
 * when its qualifier (case-insensitive) is the source's own name, or is not
 * the name of any source in `sourceNames`; a qualifier naming a different
 * known source is excluded.
 */
function refBelongsToSource(
  ref: { qualifier?: string; name: string },
  sourceName: string,
  sourceNames: Set<string>,
): boolean {
  const owner = ref.qualifier?.toLowerCase();
  if (!owner) {
    return true;
  }
  return owner === sourceName.toLowerCase() || !sourceNames.has(owner);
}
|
||||
|
||||
/**
 * Lists the identifiers in `expr` that belong to the given source but resolve
 * to neither a valid column nor a valid measure.
 *
 * Lookups are case-insensitive; the returned names keep the spelling used in
 * the expression, are de-duplicated, and are sorted with the default string
 * ordering.
 */
function missingLocalExpressionRefs(input: {
  expr: string;
  sourceName: string;
  sourceNames: Set<string>;
  validColumns: Set<string>;
  validMeasures: Set<string>;
}): string[] {
  const { expr, sourceName, sourceNames, validColumns, validMeasures } = input;
  const unresolved = extractSqlIdentifierRefs(expr)
    .filter((ref) => refBelongsToSource(ref, sourceName, sourceNames))
    .filter((ref) => {
      const key = ref.name.toLowerCase();
      return !(validColumns.has(key) || validMeasures.has(key));
    })
    .map((ref) => ref.name);
  return [...new Set(unresolved)].sort();
}
|
||||
|
||||
/**
 * Parses one side of a join condition of the form `column` or
 * `qualifier.column` (word characters only, surrounding whitespace ignored).
 *
 * @returns the column and, when present, its qualifier; `null` for anything
 *   else (expressions, multi-part paths, empty input).
 */
function parseJoinSide(side: string): { qualifier?: string; column: string } | null {
  const parts = /^(?:(\w+)\.)?(\w+)$/.exec(side.trim());
  if (parts === null) {
    return null;
  }
  const [, qualifier, column] = parts;
  return qualifier ? { qualifier, column } : { column };
}
|
||||
|
||||
/**
 * Splits a simple equi-join condition (`left = right`) into the column that
 * belongs to the joining source and the column that belongs to the target.
 *
 * The sides are swapped only when the left side is qualified with the target
 * name or the right side is qualified with the source name (case-insensitive);
 * in every other case the left side is taken as the local column.
 *
 * @returns `null` when the condition does not contain exactly one `=` or when
 *   either side is not a plain `column` / `qualifier.column`.
 */
function parseJoinColumns(
  on: string,
  sourceName: string,
  targetName: string,
): { localColumn: string; targetColumn: string } | null {
  const operands = on.split('=');
  if (operands.length !== 2) {
    return null;
  }
  const lhs = parseJoinSide(operands[0]);
  const rhs = parseJoinSide(operands[1]);
  if (!lhs || !rhs) {
    return null;
  }

  const self = sourceName.toLowerCase();
  const other = targetName.toLowerCase();
  const lhsOwner = lhs.qualifier?.toLowerCase();
  const rhsOwner = rhs.qualifier?.toLowerCase();

  const reversed = lhsOwner === other || rhsOwner === self;
  return reversed
    ? { localColumn: rhs.column, targetColumn: lhs.column }
    : { localColumn: lhs.column, targetColumn: rhs.column };
}
|
||||
|
||||
/**
|
||||
* Returns one message per measure-level segment reference that doesn't resolve to
|
||||
* a segment defined on the source. Array is empty when every reference checks out.
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ function makeDeps(opts: { sourceYaml: string; executeQuery: ReturnType<typeof vi
|
|||
listManifestSourceNames: vi.fn().mockResolvedValue([]),
|
||||
loadSource: vi.fn().mockResolvedValue(null),
|
||||
loadAllSources: vi.fn().mockResolvedValue([]),
|
||||
validatePhysicalTableReferences: vi.fn().mockResolvedValue([]),
|
||||
} as never,
|
||||
connections: {
|
||||
executeQuery: opts.executeQuery,
|
||||
|
|
@ -117,4 +118,29 @@ joins: []
|
|||
expect(probeSql).toMatch(/LIMIT 1\b/);
|
||||
expect(probeSql).not.toMatch(/LIMIT 0\b/);
|
||||
});
|
||||
|
||||
it('adds physical manifest errors for table-backed sources', async () => {
|
||||
const yaml = `name: int_active_contract_arr
|
||||
table: orbit_analytics.int_active_contract_arr
|
||||
grain: [contract_id]
|
||||
columns:
|
||||
- {name: contract_id, type: string}
|
||||
- {name: arr_cents, type: number}
|
||||
measures:
|
||||
- {name: arr, expr: sum(arr_cents)}
|
||||
joins: []
|
||||
`;
|
||||
const executeQuery = vi.fn();
|
||||
const deps = makeDeps({ sourceYaml: yaml, executeQuery }) as any;
|
||||
deps.semanticLayerService.validatePhysicalTableReferences.mockResolvedValue([
|
||||
'int_active_contract_arr.yaml: declared column(s) absent from physical table: arr_cents',
|
||||
]);
|
||||
|
||||
const result = await validateSingleSource(deps, 'conn-1', 'int_active_contract_arr');
|
||||
|
||||
expect(result.errors).toContain(
|
||||
'int_active_contract_arr.yaml: declared column(s) absent from physical table: arr_cents',
|
||||
);
|
||||
expect(executeQuery).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import { SYSTEM_GIT_AUTHOR } from '../../tools/index.js';
|
|||
import type { SlConnectionCatalogPort, SlSourcesIndexPort } from '../ports.js';
|
||||
import { sourceOverlaySchema } from '../schemas.js';
|
||||
import { SemanticLayerService } from '../semantic-layer.service.js';
|
||||
import type { SemanticLayerSource } from '../types.js';
|
||||
import { sourceDefinitionSchema } from './base-semantic-layer.tool.js';
|
||||
|
||||
export interface SlValidationDeps {
|
||||
|
|
@ -118,6 +119,14 @@ export async function validateSingleSource(
|
|||
return { errors, warnings };
|
||||
}
|
||||
|
||||
if (!isOverlay && 'table' in result.data && result.data.table) {
|
||||
errors.push(
|
||||
...(await deps.semanticLayerService.validatePhysicalTableReferences(connectionId, [
|
||||
result.data as SemanticLayerSource,
|
||||
])),
|
||||
);
|
||||
}
|
||||
|
||||
const measures = (parsed.measures as Array<{ name: string }> | undefined) ?? [];
|
||||
const seenMeasures = new Set<string>();
|
||||
for (const m of measures) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue