mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-10 08:05:14 +02:00
fix(ingest): tighten final artifact gates
This commit is contained in:
parent
ae1ba0e2bd
commit
cc1eb7c51b
4 changed files with 228 additions and 18 deletions
|
|
@ -52,4 +52,99 @@ describe('artifact gates', () => {
|
|||
}),
|
||||
).toThrow(/provenance row references raw path outside this snapshot: cards\/missing\.json/);
|
||||
});
|
||||
|
||||
// A frontmatter sl_refs entry whose source exists but whose entity name does not
// match any entity on that source must make the final gate reject.
it('fails measure-level wiki frontmatter sl_refs that point at missing entities', async () => {
  // The source exposes `total_contract_arr`; the page below refers to
  // `total_contract_arr_cents`, which is not defined on it.
  const accountSegmentsSource = {
    name: 'mart_account_segments',
    grain: ['account_id'],
    columns: [{ name: 'account_id', type: 'string' }],
    joins: [],
    measures: [{ name: 'total_contract_arr', expr: 'sum(contract_arr)' }],
    table: 'analytics.mart_account_segments',
  };
  const accountSegmentsPage = {
    pageKey: 'account-segments',
    frontmatter: {
      summary: 'Account segments',
      usage_mode: 'auto',
      sl_refs: ['mart_account_segments.total_contract_arr_cents'],
    },
    content: 'ARR uses a renamed measure.',
  };

  const readPage = vi.fn().mockResolvedValue(accountSegmentsPage);
  const loadAllSources = vi.fn().mockResolvedValue({
    sources: [accountSegmentsSource],
    loadErrors: [],
  });

  const gateRun = validateFinalIngestArtifacts({
    connectionIds: ['warehouse'],
    changedWikiPageKeys: ['account-segments'],
    touchedSlSources: [{ connectionId: 'warehouse', sourceName: 'mart_account_segments' }],
    wikiService: { readPage } as never,
    semanticLayerService: { loadAllSources } as never,
    // Source-level validation passes, so any failure comes from the sl_refs check.
    validateTouchedSources: async () => ({ invalidSources: [], validSources: ['warehouse:mart_account_segments'] }),
    tableExists: async () => true,
  });

  await expect(gateRun).rejects.toThrow(/unknown sl_refs entity mart_account_segments\.total_contract_arr_cents/);
});
|
||||
|
||||
// Touching `accounts` must also send every source that joins to it (`orders`,
// `segments`) through source validation.
it('validates direct declared-join neighbors of touched semantic-layer sources', async () => {
  const ordersSource = {
    name: 'orders',
    grain: ['order_id'],
    columns: [
      { name: 'order_id', type: 'string' },
      { name: 'account_id', type: 'string' },
    ],
    joins: [{ to: 'accounts', on: 'orders.account_id = accounts.account_id', relationship: 'many_to_one' }],
    measures: [{ name: 'order_count', expr: 'count(*)' }],
  };
  const accountsSource = {
    name: 'accounts',
    grain: ['account_id'],
    columns: [{ name: 'account_id', type: 'string' }],
    joins: [],
    measures: [{ name: 'account_count', expr: 'count(*)' }],
  };
  const segmentsSource = {
    name: 'segments',
    grain: ['segment_id'],
    columns: [
      { name: 'segment_id', type: 'string' },
      { name: 'account_id', type: 'string' },
    ],
    joins: [{ to: 'accounts', on: 'segments.account_id = accounts.account_id', relationship: 'many_to_one' }],
    measures: [],
  };

  const loadAllSources = vi.fn().mockResolvedValue({
    sources: [ordersSource, accountsSource, segmentsSource],
    loadErrors: [],
  });
  const validateTouchedSources = vi.fn().mockResolvedValue({ invalidSources: [], validSources: [] });

  await validateFinalIngestArtifacts({
    connectionIds: ['warehouse'],
    changedWikiPageKeys: [],
    touchedSlSources: [{ connectionId: 'warehouse', sourceName: 'accounts' }],
    wikiService: { readPage: vi.fn() } as never,
    semanticLayerService: { loadAllSources } as never,
    validateTouchedSources,
    tableExists: async () => true,
  });

  // The touched source plus both sources that declare a join to it.
  const expectedValidatedSources = [
    { connectionId: 'warehouse', sourceName: 'accounts' },
    { connectionId: 'warehouse', sourceName: 'orders' },
    { connectionId: 'warehouse', sourceName: 'segments' },
  ];
  expect(validateTouchedSources).toHaveBeenCalledWith(expectedValidatedSources);
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -24,17 +24,75 @@ export interface ProvenanceRawPathValidationInput {
|
|||
deletedRawPaths: Set<string>;
|
||||
}
|
||||
|
||||
/**
 * Reduces an `sl_refs` entry to its bare source name by dropping an optional
 * `connection/` prefix and anything from the first `.` onwards.
 *
 * @param ref - Reference of the form `[connection/]source[.entity]`.
 * @returns The source-name portion of `ref`.
 */
function bareSlRef(ref: string): string {
  return parseSlRef(ref).sourceName;
}

/**
 * Splits an `sl_refs` entry of the form `[connection/]source[.entity]` into its parts.
 *
 * @param ref - The raw reference string.
 * @returns `connectionId` is the text before the first `/`, or `null` when `ref`
 *   contains no `/`. `sourceName` is the text between that prefix and the first
 *   `.`. `entityName` is the text between the first and second `.`, or `null`
 *   when there is no `.`; anything after a second `.` is discarded.
 */
function parseSlRef(ref: string): { connectionId: string | null; sourceName: string; entityName: string | null } {
  // Locate the separator once; only the first `/` delimits the connection prefix.
  const slashIndex = ref.indexOf('/');
  const connectionId = slashIndex === -1 ? null : ref.slice(0, slashIndex);
  const withoutConnection = slashIndex === -1 ? ref : ref.slice(slashIndex + 1);
  // The limit of 2 keeps only `source` and the first entity segment.
  const [sourceName = '', entityName = null] = withoutConnection.split('.', 2);
  return { connectionId, sourceName, entityName };
}
|
||||
|
||||
/**
 * Collects the names of every entity declared on a semantic-layer source.
 *
 * @param source - One entry from the `sources` array returned by `loadAllSources`.
 * @returns A set holding the names of the source's measures, columns and
 *   segments; a missing collection contributes nothing.
 */
function slEntityNames(source: Awaited<ReturnType<SemanticLayerService['loadAllSources']>>['sources'][number]): Set<string> {
  const names = new Set<string>();
  for (const measure of source.measures ?? []) {
    names.add(measure.name);
  }
  for (const column of source.columns ?? []) {
    names.add(column.name);
  }
  for (const segment of source.segments ?? []) {
    names.add(segment.name);
  }
  return names;
}
|
||||
|
||||
/**
 * Removes duplicate (connectionId, sourceName) pairs and returns the survivors
 * in a stable order.
 *
 * Duplicates are tracked per connection rather than through a joined
 * `connectionId:sourceName` string, so two different pairs can never be
 * mistaken for each other when either part itself contains `:`.
 *
 * @param sources - Touched sources, possibly containing repeats.
 * @returns A new array with the first occurrence of each pair, sorted by
 *   `connectionId` and then `sourceName` using `localeCompare`. The input
 *   array is not modified.
 */
function uniqueTouchedSources(sources: TouchedSlSource[]): TouchedSlSource[] {
  const seenNamesByConnection = new Map<string, Set<string>>();
  const unique: TouchedSlSource[] = [];
  for (const source of sources) {
    let seenNames = seenNamesByConnection.get(source.connectionId);
    if (seenNames === undefined) {
      seenNames = new Set<string>();
      seenNamesByConnection.set(source.connectionId, seenNames);
    }
    if (seenNames.has(source.sourceName)) {
      continue;
    }
    seenNames.add(source.sourceName);
    unique.push(source);
  }
  // `unique` is a local array, so sorting it in place does not touch the caller's input.
  return unique.sort((left, right) => {
    const byConnection = left.connectionId.localeCompare(right.connectionId);
    return byConnection === 0 ? left.sourceName.localeCompare(right.sourceName) : byConnection;
  });
}
|
||||
|
||||
/**
 * Widens the touched-source list with sources one declared join away.
 *
 * For each connection in `input.connectionIds` that has at least one touched
 * source, all sources of that connection are loaded and two kinds of
 * neighbors are added: every join target of a touched source, and every
 * source that declares a join to a touched source. Touched sources whose
 * connection is not listed in `input.connectionIds` are kept but not expanded.
 *
 * @param input - Gate input; reads `touchedSlSources`, `connectionIds` and
 *   `semanticLayerService.loadAllSources`.
 * @returns The touched sources plus their direct join neighbors, passed
 *   through `uniqueTouchedSources`.
 */
async function expandTouchedSlSourcesWithDirectJoinNeighbors(input: FinalArtifactGateInput): Promise<TouchedSlSource[]> {
  // Index touched source names by connection for constant-time membership checks.
  const touchedNamesByConnection = new Map<string, Set<string>>();
  for (const { connectionId, sourceName } of input.touchedSlSources) {
    const names = touchedNamesByConnection.get(connectionId);
    if (names) {
      names.add(sourceName);
    } else {
      touchedNamesByConnection.set(connectionId, new Set<string>([sourceName]));
    }
  }

  const candidates = [...input.touchedSlSources];
  for (const connectionId of input.connectionIds) {
    const touchedNames = touchedNamesByConnection.get(connectionId);
    if (touchedNames === undefined || touchedNames.size === 0) {
      // Nothing touched on this connection, so its sources are not loaded.
      continue;
    }

    const loaded = await input.semanticLayerService.loadAllSources(connectionId);
    for (const source of loaded.sources) {
      const declaredJoins = source.joins ?? [];
      if (touchedNames.has(source.name)) {
        // Outgoing direction: everything a touched source joins to.
        candidates.push(...declaredJoins.map((join) => ({ connectionId, sourceName: join.to })));
      }
      if (declaredJoins.some((join) => touchedNames.has(join.to))) {
        // Incoming direction: a source that joins to a touched source.
        candidates.push({ connectionId, sourceName: source.name });
      }
    }
  }

  return uniqueTouchedSources(candidates);
}
|
||||
|
||||
async function validateWikiSlRefs(input: FinalArtifactGateInput): Promise<string[]> {
|
||||
const errors: string[] = [];
|
||||
const sourcesByConnection = new Map<string, Set<string>>();
|
||||
const sourcesByConnection = new Map<string, Awaited<ReturnType<SemanticLayerService['loadAllSources']>>['sources']>();
|
||||
for (const connectionId of input.connectionIds) {
|
||||
const { sources } = await input.semanticLayerService.loadAllSources(connectionId);
|
||||
sourcesByConnection.set(connectionId, new Set(sources.map((source) => source.name)));
|
||||
sourcesByConnection.set(connectionId, sources);
|
||||
}
|
||||
|
||||
for (const pageKey of input.changedWikiPageKeys) {
|
||||
|
|
@ -43,11 +101,21 @@ async function validateWikiSlRefs(input: FinalArtifactGateInput): Promise<string
|
|||
continue;
|
||||
}
|
||||
for (const ref of page.frontmatter.sl_refs ?? []) {
|
||||
const sourceName = bareSlRef(ref);
|
||||
const connectionId = ref.includes('/') ? ref.slice(0, ref.indexOf('/')) : null;
|
||||
const sourceSets = connectionId ? [sourcesByConnection.get(connectionId)] : [...sourcesByConnection.values()];
|
||||
if (!sourceSets.some((set) => set?.has(sourceName))) {
|
||||
const parsed = parseSlRef(ref);
|
||||
const candidateConnections = parsed.connectionId ? [parsed.connectionId] : input.connectionIds;
|
||||
let source: Awaited<ReturnType<SemanticLayerService['loadAllSources']>>['sources'][number] | undefined;
|
||||
for (const connectionId of candidateConnections) {
|
||||
source = sourcesByConnection.get(connectionId)?.find((candidate) => candidate.name === parsed.sourceName);
|
||||
if (source) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!source) {
|
||||
errors.push(`${pageKey}: unknown sl_refs entry ${ref}`);
|
||||
continue;
|
||||
}
|
||||
if (parsed.entityName && !slEntityNames(source).has(parsed.entityName)) {
|
||||
errors.push(`${pageKey}: unknown sl_refs entity ${ref}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -55,7 +123,8 @@ async function validateWikiSlRefs(input: FinalArtifactGateInput): Promise<string
|
|||
}
|
||||
|
||||
export async function validateFinalIngestArtifacts(input: FinalArtifactGateInput): Promise<void> {
|
||||
const validation = await input.validateTouchedSources(input.touchedSlSources);
|
||||
const touchedWithDependencies = await expandTouchedSlSourcesWithDirectJoinNeighbors(input);
|
||||
const validation = await input.validateTouchedSources(touchedWithDependencies);
|
||||
const errors: string[] = validation.invalidSources.map((source) => `semantic-layer validation failed for ${source}`);
|
||||
errors.push(...(await validateWikiSlRefs(input)));
|
||||
|
||||
|
|
|
|||
|
|
@ -67,4 +67,39 @@ describe('wiki body refs', () => {
|
|||
|
||||
expect(invalid).toEqual([]);
|
||||
});
|
||||
|
||||
// Inline code of the form `a.b` whose first part is not a known source must not
// be reported, and a `table:` reference that exists must pass.
it('ignores two-part inline code when the source is not visible', async () => {
  const bodyLines = [
    'A version token like `node.v22` is not a semantic-layer reference.',
    'A raw table must use `table:analytics.mart_account_segments`.',
  ];

  const invalid = await findInvalidWikiBodyRefs({
    pageKey: 'engineering-notes',
    body: bodyLines.join('\n'),
    visibleConnectionIds: ['warehouse'],
    loadSources: async () => sources,
    // Only the table named in the body is reported as existing.
    tableExists: async (_connectionId, tableRef) => tableRef === 'analytics.mart_account_segments',
  });

  expect(invalid).toEqual([]);
});
|
||||
|
||||
// Explicit `source:` and `table:` references that cannot be resolved are still
// reported, one error per reference, in body order.
it('still rejects explicit missing source and table references', async () => {
  const bodyLines = [
    '`source:missing_source`',
    '`warehouse/source:missing_source`',
    '`table:analytics.missing_table`',
  ];

  const invalid = await findInvalidWikiBodyRefs({
    pageKey: 'account-segments',
    body: bodyLines.join('\n'),
    visibleConnectionIds: ['warehouse'],
    loadSources: async () => sources,
    tableExists: async () => false,
  });

  const expectedErrors = [
    'account-segments: unknown semantic-layer source missing_source',
    'account-segments: unknown semantic-layer source warehouse/missing_source',
    'account-segments: unknown raw table analytics.missing_table',
  ];
  expect(invalid).toEqual(expectedErrors);
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -96,6 +96,19 @@ export async function findInvalidWikiBodyRefs(input: WikiBodyRefValidationInput)
|
|||
return sources;
|
||||
};
|
||||
|
||||
/**
 * Looks up a source by name across the given connections, in order, using the
 * enclosing `loadSources` helper.
 *
 * @param connectionIds - Connections to search, first match wins.
 * @param sourceName - Exact source name to look for.
 * @returns The first matching source with the connection it was found on, or
 *   `null` when no listed connection has a source with that name.
 */
const findSource = async (
  connectionIds: string[],
  sourceName: string,
): Promise<{ connectionId: string; source: SemanticLayerSource } | null> => {
  for (const connectionId of connectionIds) {
    const candidates = await loadSources(connectionId);
    const source = candidates.find((candidate) => candidate.name === sourceName);
    if (source) {
      return { connectionId, source };
    }
  }
  return null;
};
|
||||
|
||||
for (const ref of parseWikiBodyRefs(input.body)) {
|
||||
const connectionIds = ref.connectionId ? [ref.connectionId] : input.visibleConnectionIds;
|
||||
if (ref.kind === 'table') {
|
||||
|
|
@ -106,18 +119,16 @@ export async function findInvalidWikiBodyRefs(input: WikiBodyRefValidationInput)
|
|||
continue;
|
||||
}
|
||||
|
||||
let source: SemanticLayerSource | undefined;
|
||||
for (const connectionId of connectionIds) {
|
||||
source = (await loadSources(connectionId)).find((candidate) => candidate.name === ref.sourceName);
|
||||
if (source) {
|
||||
break;
|
||||
const found = await findSource(connectionIds, ref.sourceName);
|
||||
if (!found) {
|
||||
if (ref.kind === 'sl_source') {
|
||||
errors.push(
|
||||
`${input.pageKey}: unknown semantic-layer source ${ref.connectionId ? `${ref.connectionId}/` : ''}${ref.sourceName}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
if (!source) {
|
||||
errors.push(`${input.pageKey}: unknown semantic-layer source ${ref.sourceName}`);
|
||||
continue;
|
||||
}
|
||||
if (ref.kind === 'sl_entity' && !entityNames(source).has(ref.entityName)) {
|
||||
if (ref.kind === 'sl_entity' && !entityNames(found.source).has(ref.entityName)) {
|
||||
errors.push(`${input.pageKey}: unknown semantic-layer entity ${ref.sourceName}.${ref.entityName}`);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue