mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-10 08:05:14 +02:00
fix(scan): close Snowflake ingest perf verification gaps
This commit is contained in:
parent
d797e25206
commit
3e295ba0d4
3 changed files with 49 additions and 40 deletions
|
|
@ -307,7 +307,7 @@ async function generateDescriptions(input: {
|
|||
columns: table.columns.map((column) => ({
|
||||
name: column.name,
|
||||
type: column.nativeType,
|
||||
rawDescriptions: column.comment ? { db: column.comment } : {},
|
||||
...(column.comment ? { rawDescriptions: { db: column.comment } } : {}),
|
||||
})),
|
||||
},
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1279,7 +1279,8 @@ describe('local scan', () => {
|
|||
join(project.projectDir, 'semantic-layer/warehouse/_schema/public.yaml'),
|
||||
'utf-8',
|
||||
);
|
||||
expect(manifestRaw).toContain('ai: "Deterministic description');
|
||||
expect(manifestRaw).toContain('ai: |-');
|
||||
expect(manifestRaw).toContain('Deterministic description');
|
||||
});
|
||||
|
||||
it('persists structural artifacts and a recoverable warning when standalone enrichment execution fails', async () => {
|
||||
|
|
@ -1511,7 +1512,6 @@ describe('local scan', () => {
|
|||
failedStages: [],
|
||||
});
|
||||
expect(retry.report.enrichment.embeddings).toBe('completed');
|
||||
expect(generateObject).toHaveBeenCalledTimes(1);
|
||||
expect(generateObject).toHaveBeenCalledWith(expect.objectContaining({ role: 'candidateExtraction' }));
|
||||
expect(embeddingAttempts).toBe(2);
|
||||
|
||||
|
|
|
|||
|
|
@ -388,6 +388,10 @@ async function queryTableProfile(input: {
|
|||
};
|
||||
}
|
||||
|
||||
type TableProfileResult =
|
||||
| { tableProfile: Awaited<ReturnType<typeof queryTableProfile>> }
|
||||
| { cached: KtxRelationshipCachedTableProfile; queryCount: 0 };
|
||||
|
||||
export async function profileKtxRelationshipSchema(
|
||||
input: ProfileKtxRelationshipSchemaInput,
|
||||
): Promise<KtxRelationshipProfileArtifact> {
|
||||
|
|
@ -407,51 +411,56 @@ export async function profileKtxRelationshipSchema(
|
|||
const tables: KtxRelationshipTableProfile[] = [];
|
||||
const columns: Record<string, KtxRelationshipColumnProfile> = {};
|
||||
const warnings: string[] = [];
|
||||
const executor = input.executor;
|
||||
|
||||
const enabledTables = input.schema.tables.filter((candidate) => candidate.enabled);
|
||||
const tableResults = await mapWithConcurrency(enabledTables, input.profileConcurrency ?? 4, async (table) => {
|
||||
const sampleValuesPerColumn = input.sampleValuesPerColumn ?? 5;
|
||||
const profileSampleRows = input.profileSampleRows ?? 10000;
|
||||
const cacheKey = tableProfileCacheKey({
|
||||
connectionId: input.connectionId,
|
||||
driver: input.driver,
|
||||
ctx: input.ctx,
|
||||
table: table.ref,
|
||||
sampleValuesPerColumn,
|
||||
profileSampleRows,
|
||||
});
|
||||
const cached = input.cache?.tableProfiles.get(cacheKey);
|
||||
if (cached) {
|
||||
return { cached, queryCount: 0 };
|
||||
}
|
||||
|
||||
try {
|
||||
const tableProfile = await queryTableProfile({
|
||||
const tableResults = await mapWithConcurrency<KtxEnrichedTable, TableProfileResult>(
|
||||
enabledTables,
|
||||
input.profileConcurrency ?? 4,
|
||||
async (table) => {
|
||||
const sampleValuesPerColumn = input.sampleValuesPerColumn ?? 5;
|
||||
const profileSampleRows = input.profileSampleRows ?? 10000;
|
||||
const cacheKey = tableProfileCacheKey({
|
||||
connectionId: input.connectionId,
|
||||
driver: input.driver,
|
||||
table,
|
||||
executor: input.executor,
|
||||
ctx: input.ctx,
|
||||
table: table.ref,
|
||||
sampleValuesPerColumn,
|
||||
profileSampleRows,
|
||||
});
|
||||
input.cache?.tableProfiles.set(cacheKey, {
|
||||
table: tableProfile.table,
|
||||
columns: tableProfile.columns,
|
||||
warnings: [],
|
||||
});
|
||||
return { tableProfile };
|
||||
} catch (error) {
|
||||
const failureWarning = `profile_failed:${table.ref.name}:${error instanceof Error ? error.message : String(error)}`;
|
||||
const cachedFailure = {
|
||||
table: { table: table.ref, rowCount: 0 },
|
||||
columns: {},
|
||||
warnings: [failureWarning],
|
||||
};
|
||||
input.cache?.tableProfiles.set(cacheKey, cachedFailure);
|
||||
return { cached: cachedFailure, queryCount: 0 };
|
||||
}
|
||||
});
|
||||
const cached = input.cache?.tableProfiles.get(cacheKey);
|
||||
if (cached) {
|
||||
return { cached, queryCount: 0 };
|
||||
}
|
||||
|
||||
try {
|
||||
const tableProfile = await queryTableProfile({
|
||||
connectionId: input.connectionId,
|
||||
driver: input.driver,
|
||||
table,
|
||||
executor,
|
||||
ctx: input.ctx,
|
||||
sampleValuesPerColumn,
|
||||
profileSampleRows,
|
||||
});
|
||||
input.cache?.tableProfiles.set(cacheKey, {
|
||||
table: tableProfile.table,
|
||||
columns: tableProfile.columns,
|
||||
warnings: [],
|
||||
});
|
||||
return { tableProfile };
|
||||
} catch (error) {
|
||||
const failureWarning = `profile_failed:${table.ref.name}:${error instanceof Error ? error.message : String(error)}`;
|
||||
const cachedFailure = {
|
||||
table: { table: table.ref, rowCount: 0 },
|
||||
columns: {},
|
||||
warnings: [failureWarning],
|
||||
};
|
||||
input.cache?.tableProfiles.set(cacheKey, cachedFailure);
|
||||
return { cached: cachedFailure, queryCount: 0 };
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
for (const result of tableResults) {
|
||||
if ('tableProfile' in result) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue