mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-10 08:05:14 +02:00
refactor(cli): use dialect SQL in relationship workflows
This commit is contained in:
parent
47f3206979
commit
33356d38da
7 changed files with 110 additions and 224 deletions
|
|
@ -1,6 +1,7 @@
|
|||
import Database from 'better-sqlite3';
|
||||
import { join } from 'node:path';
|
||||
import { describe, expect, it } from 'vitest';
|
||||
import { getDialectForDriver } from '../connections/dialects.js';
|
||||
import { snapshotToKtxEnrichedSchema } from './local-enrichment.js';
|
||||
import { loadKtxRelationshipBenchmarkFixture, maskKtxRelationshipBenchmarkSnapshot } from './relationship-benchmarks.js';
|
||||
import { discoverKtxCompositeRelationships } from './relationship-composite-candidates.js';
|
||||
|
|
@ -41,7 +42,7 @@ describe('composite relationship discovery detector', () => {
|
|||
const executor = new TestSqliteExecutor(fixture.dataPath ?? '');
|
||||
const profiles = await profileKtxRelationshipSchema({
|
||||
connectionId: snapshot.connectionId,
|
||||
driver: snapshot.driver,
|
||||
dialect: getDialectForDriver(snapshot.driver),
|
||||
schema,
|
||||
executor,
|
||||
ctx: { runId: 'test:composite-profile' },
|
||||
|
|
@ -49,7 +50,7 @@ describe('composite relationship discovery detector', () => {
|
|||
|
||||
const result = await discoverKtxCompositeRelationships({
|
||||
connectionId: snapshot.connectionId,
|
||||
driver: snapshot.driver,
|
||||
dialect: getDialectForDriver(snapshot.driver),
|
||||
schema,
|
||||
profiles,
|
||||
executor,
|
||||
|
|
|
|||
|
|
@ -1,11 +1,10 @@
|
|||
import type { KtxDialect } from '../connections/dialects.js';
|
||||
import type { KtxEnrichedColumn, KtxEnrichedSchema, KtxEnrichedTable, KtxRelationshipType } from './enrichment-types.js';
|
||||
import {
|
||||
formatKtxRelationshipTableRef,
|
||||
quoteKtxRelationshipIdentifier,
|
||||
type KtxRelationshipProfileArtifact,
|
||||
type KtxRelationshipReadOnlyExecutor,
|
||||
} from './relationship-profiling.js';
|
||||
import type { KtxConnectionDriver, KtxQueryResult, KtxScanContext, KtxTableRef } from './types.js';
|
||||
import type { KtxQueryResult, KtxScanContext, KtxTableRef } from './types.js';
|
||||
|
||||
type KtxCompositeRelationshipStatus = 'accepted' | 'review' | 'rejected';
|
||||
|
||||
|
|
@ -57,7 +56,7 @@ export interface KtxCompositeRelationshipCandidate {
|
|||
|
||||
export interface DiscoverKtxCompositeRelationshipsInput {
|
||||
connectionId: string;
|
||||
driver: KtxConnectionDriver;
|
||||
dialect: KtxDialect;
|
||||
schema: KtxEnrichedSchema;
|
||||
profiles: KtxRelationshipProfileArtifact;
|
||||
executor: KtxRelationshipReadOnlyExecutor | null;
|
||||
|
|
@ -224,28 +223,16 @@ function numberAt(result: KtxQueryResult, header: string): number {
|
|||
return 0;
|
||||
}
|
||||
|
||||
function topSql(driver: KtxConnectionDriver, limit: number): string {
|
||||
if (driver === 'sqlserver') {
|
||||
return ` TOP (${Math.max(1, Math.floor(limit))})`;
|
||||
}
|
||||
return '';
|
||||
function sqlSuffix(fragment: string): string {
|
||||
return fragment ? ` ${fragment}` : '';
|
||||
}
|
||||
|
||||
function limitSql(driver: KtxConnectionDriver, limit: number): string {
|
||||
if (driver === 'sqlserver') {
|
||||
return '';
|
||||
}
|
||||
return ` LIMIT ${Math.max(1, Math.floor(limit))}`;
|
||||
function aliasedTupleSelect(dialect: KtxDialect, columns: readonly string[]): string {
|
||||
return columns.map((column, index) => `${dialect.quoteIdentifier(column)} AS c${index}`).join(', ');
|
||||
}
|
||||
|
||||
function aliasedTupleSelect(driver: KtxConnectionDriver, columns: readonly string[]): string {
|
||||
return columns
|
||||
.map((column, index) => `${quoteKtxRelationshipIdentifier(driver, column)} AS c${index}`)
|
||||
.join(', ');
|
||||
}
|
||||
|
||||
function nonNullPredicate(driver: KtxConnectionDriver, columns: readonly string[]): string {
|
||||
return columns.map((column) => `${quoteKtxRelationshipIdentifier(driver, column)} IS NOT NULL`).join(' AND ');
|
||||
function nonNullPredicate(dialect: KtxDialect, columns: readonly string[]): string {
|
||||
return columns.map((column) => `${dialect.quoteIdentifier(column)} IS NOT NULL`).join(' AND ');
|
||||
}
|
||||
|
||||
function tupleEquality(columns: number): string {
|
||||
|
|
@ -255,39 +242,39 @@ function tupleEquality(columns: number): string {
|
|||
}
|
||||
|
||||
function buildTupleDistinctSql(input: {
|
||||
driver: KtxConnectionDriver;
|
||||
dialect: KtxDialect;
|
||||
table: KtxTableRef;
|
||||
columns: readonly string[];
|
||||
}): string {
|
||||
const tableSql = formatKtxRelationshipTableRef(input.driver, input.table);
|
||||
const tableSql = input.dialect.formatTableName(input.table);
|
||||
return [
|
||||
'WITH tuple_values AS (',
|
||||
`SELECT DISTINCT ${aliasedTupleSelect(input.driver, input.columns)} FROM ${tableSql}`,
|
||||
`WHERE ${nonNullPredicate(input.driver, input.columns)}`,
|
||||
`SELECT DISTINCT ${aliasedTupleSelect(input.dialect, input.columns)} FROM ${tableSql}`,
|
||||
`WHERE ${nonNullPredicate(input.dialect, input.columns)}`,
|
||||
')',
|
||||
'SELECT COUNT(*) AS distinct_count FROM tuple_values',
|
||||
].join(' ');
|
||||
}
|
||||
|
||||
function buildCompositeCoverageSql(input: {
|
||||
driver: KtxConnectionDriver;
|
||||
dialect: KtxDialect;
|
||||
childTable: KtxTableRef;
|
||||
childColumns: readonly string[];
|
||||
parentTable: KtxTableRef;
|
||||
parentColumns: readonly string[];
|
||||
maxDistinctSourceValues: number;
|
||||
}): string {
|
||||
const childTableSql = formatKtxRelationshipTableRef(input.driver, input.childTable);
|
||||
const parentTableSql = formatKtxRelationshipTableRef(input.driver, input.parentTable);
|
||||
const top = topSql(input.driver, input.maxDistinctSourceValues);
|
||||
const limit = limitSql(input.driver, input.maxDistinctSourceValues);
|
||||
const childTableSql = input.dialect.formatTableName(input.childTable);
|
||||
const parentTableSql = input.dialect.formatTableName(input.parentTable);
|
||||
const top = input.dialect.getTopClause(input.maxDistinctSourceValues);
|
||||
const limit = sqlSuffix(input.dialect.getLimitOffsetClause(input.maxDistinctSourceValues));
|
||||
return [
|
||||
'WITH child_values AS (',
|
||||
`SELECT DISTINCT${top} ${aliasedTupleSelect(input.driver, input.childColumns)} FROM ${childTableSql}`,
|
||||
`WHERE ${nonNullPredicate(input.driver, input.childColumns)}${limit}`,
|
||||
`SELECT DISTINCT${top ? ` ${top}` : ''} ${aliasedTupleSelect(input.dialect, input.childColumns)} FROM ${childTableSql}`,
|
||||
`WHERE ${nonNullPredicate(input.dialect, input.childColumns)}${limit}`,
|
||||
'), parent_values AS (',
|
||||
`SELECT DISTINCT ${aliasedTupleSelect(input.driver, input.parentColumns)} FROM ${parentTableSql}`,
|
||||
`WHERE ${nonNullPredicate(input.driver, input.parentColumns)}`,
|
||||
`SELECT DISTINCT ${aliasedTupleSelect(input.dialect, input.parentColumns)} FROM ${parentTableSql}`,
|
||||
`WHERE ${nonNullPredicate(input.dialect, input.parentColumns)}`,
|
||||
')',
|
||||
'SELECT',
|
||||
'(SELECT COUNT(*) FROM child_values) AS child_distinct,',
|
||||
|
|
@ -335,7 +322,7 @@ function hasAcceptedSubset(
|
|||
|
||||
async function detectCompositePrimaryKeys(input: {
|
||||
connectionId: string;
|
||||
driver: KtxConnectionDriver;
|
||||
dialect: KtxDialect;
|
||||
table: KtxEnrichedTable;
|
||||
profiles: KtxRelationshipProfileArtifact;
|
||||
executor: KtxRelationshipReadOnlyExecutor;
|
||||
|
|
@ -379,7 +366,7 @@ async function detectCompositePrimaryKeys(input: {
|
|||
{
|
||||
connectionId: input.connectionId,
|
||||
sql: buildTupleDistinctSql({
|
||||
driver: input.driver,
|
||||
dialect: input.dialect,
|
||||
table: input.table.ref,
|
||||
columns: columnNames,
|
||||
}),
|
||||
|
|
@ -439,7 +426,7 @@ function compatibleTuple(sourceColumns: readonly KtxEnrichedColumn[], targetColu
|
|||
|
||||
async function validateCompositeRelationship(input: {
|
||||
connectionId: string;
|
||||
driver: KtxConnectionDriver;
|
||||
dialect: KtxDialect;
|
||||
sourceTable: KtxEnrichedTable;
|
||||
sourceColumns: readonly KtxEnrichedColumn[];
|
||||
targetKey: KtxCompositePrimaryKeyCandidate;
|
||||
|
|
@ -454,7 +441,7 @@ async function validateCompositeRelationship(input: {
|
|||
{
|
||||
connectionId: input.connectionId,
|
||||
sql: buildCompositeCoverageSql({
|
||||
driver: input.driver,
|
||||
dialect: input.dialect,
|
||||
childTable: input.sourceTable.ref,
|
||||
childColumns: input.sourceColumns.map((column) => column.name),
|
||||
parentTable: input.targetTable.ref,
|
||||
|
|
@ -552,7 +539,7 @@ export async function discoverKtxCompositeRelationships(
|
|||
for (const table of tables) {
|
||||
const result = await detectCompositePrimaryKeys({
|
||||
connectionId: input.connectionId,
|
||||
driver: input.driver,
|
||||
dialect: input.dialect,
|
||||
table,
|
||||
profiles: input.profiles,
|
||||
executor: input.executor,
|
||||
|
|
@ -595,7 +582,7 @@ export async function discoverKtxCompositeRelationships(
|
|||
|
||||
const result = await validateCompositeRelationship({
|
||||
connectionId: input.connectionId,
|
||||
driver: input.driver,
|
||||
dialect: input.dialect,
|
||||
sourceTable,
|
||||
sourceColumns,
|
||||
targetKey,
|
||||
|
|
|
|||
|
|
@ -135,7 +135,7 @@ async function detectCompositeRelationships(input: {
|
|||
try {
|
||||
const compositeDetection = await discoverKtxCompositeRelationships({
|
||||
connectionId: input.connectionId,
|
||||
driver: input.dialect.type,
|
||||
dialect: input.dialect,
|
||||
schema: input.schema,
|
||||
profiles: input.profile,
|
||||
executor: input.executor,
|
||||
|
|
@ -223,7 +223,7 @@ export async function discoverKtxRelationships(
|
|||
const profileCache = createKtxRelationshipProfileCache();
|
||||
const profile = await profileKtxRelationshipSchema({
|
||||
connectionId: input.connectionId,
|
||||
driver: input.dialect.type,
|
||||
dialect: input.dialect,
|
||||
schema: input.schema,
|
||||
executor,
|
||||
ctx: input.context,
|
||||
|
|
@ -256,7 +256,7 @@ export async function discoverKtxRelationships(
|
|||
warnings.push(...llmProposalResult.warnings);
|
||||
const validated = await validateKtxRelationshipDiscoveryCandidates({
|
||||
connectionId: input.connectionId,
|
||||
driver: input.dialect.type,
|
||||
dialect: input.dialect,
|
||||
candidates,
|
||||
profiles: profile,
|
||||
executor,
|
||||
|
|
|
|||
|
|
@ -2,15 +2,11 @@ import { readFile } from 'node:fs/promises';
|
|||
import { join } from 'node:path';
|
||||
import Database from 'better-sqlite3';
|
||||
import { afterEach, describe, expect, it, vi } from 'vitest';
|
||||
import { getDialectForDriver } from '../connections/dialects.js';
|
||||
import type { KtxEnrichedColumn, KtxEnrichedSchema, KtxEnrichedTable } from './enrichment-types.js';
|
||||
import { snapshotToKtxEnrichedSchema } from './local-enrichment.js';
|
||||
import { loadKtxRelationshipBenchmarkFixture, maskKtxRelationshipBenchmarkSnapshot } from './relationship-benchmarks.js';
|
||||
import {
|
||||
createKtxRelationshipProfileCache,
|
||||
formatKtxRelationshipTableRef,
|
||||
profileKtxRelationshipSchema,
|
||||
quoteKtxRelationshipIdentifier,
|
||||
} from './relationship-profiling.js';
|
||||
import { createKtxRelationshipProfileCache, profileKtxRelationshipSchema } from './relationship-profiling.js';
|
||||
import type { KtxQueryResult, KtxReadOnlyQueryInput, KtxScanContext } from './types.js';
|
||||
|
||||
class InMemorySqliteExecutor {
|
||||
|
|
@ -112,16 +108,6 @@ describe('relationship profiling', () => {
|
|||
expect(source).toMatch(/UNION ALL/);
|
||||
});
|
||||
|
||||
it('quotes identifiers and formats table refs for supported local SQL drivers', () => {
|
||||
expect(quoteKtxRelationshipIdentifier('sqlite', 'odd"name')).toBe('"odd""name"');
|
||||
expect(quoteKtxRelationshipIdentifier('mysql', 'odd`name')).toBe('`odd``name`');
|
||||
expect(quoteKtxRelationshipIdentifier('sqlserver', 'odd]name')).toBe('[odd]]name]');
|
||||
expect(formatKtxRelationshipTableRef('sqlite', { catalog: null, db: null, name: 'accounts' })).toBe('"accounts"');
|
||||
expect(formatKtxRelationshipTableRef('postgres', { catalog: null, db: 'analytics', name: 'accounts' })).toBe(
|
||||
'"analytics"."accounts"',
|
||||
);
|
||||
});
|
||||
|
||||
it('profiles row count, null rate, uniqueness, sample values, and text lengths', async () => {
|
||||
executor = new InMemorySqliteExecutor();
|
||||
executor.db.exec(`
|
||||
|
|
@ -135,7 +121,7 @@ describe('relationship profiling', () => {
|
|||
|
||||
const result = await profileKtxRelationshipSchema({
|
||||
connectionId: 'warehouse',
|
||||
driver: 'sqlite',
|
||||
dialect: getDialectForDriver('sqlite'),
|
||||
schema: schema([
|
||||
table('accounts', [
|
||||
column('accounts', 'id', { primaryKey: false, nullable: false }),
|
||||
|
|
@ -197,7 +183,7 @@ describe('relationship profiling', () => {
|
|||
|
||||
const result = await profileKtxRelationshipSchema({
|
||||
connectionId: 'warehouse',
|
||||
driver: 'sqlite',
|
||||
dialect: getDialectForDriver('sqlite'),
|
||||
schema: schema([
|
||||
table('accounts', [
|
||||
column('accounts', 'id', { nullable: false }),
|
||||
|
|
@ -240,7 +226,7 @@ describe('relationship profiling', () => {
|
|||
|
||||
const profiles = await profileKtxRelationshipSchema({
|
||||
connectionId: 'warehouse',
|
||||
driver: 'sqlite',
|
||||
dialect: getDialectForDriver('sqlite'),
|
||||
schema: schema([
|
||||
table('accounts', [
|
||||
column('accounts', 'id', { nullable: false }),
|
||||
|
|
@ -291,7 +277,7 @@ describe('relationship profiling', () => {
|
|||
|
||||
const first = await profileKtxRelationshipSchema({
|
||||
connectionId: 'warehouse',
|
||||
driver: 'sqlite',
|
||||
dialect: getDialectForDriver('sqlite'),
|
||||
schema: relationshipSchema,
|
||||
executor,
|
||||
ctx: { runId: 'profile-cache-run' },
|
||||
|
|
@ -299,7 +285,7 @@ describe('relationship profiling', () => {
|
|||
});
|
||||
const second = await profileKtxRelationshipSchema({
|
||||
connectionId: 'warehouse',
|
||||
driver: 'sqlite',
|
||||
dialect: getDialectForDriver('sqlite'),
|
||||
schema: relationshipSchema,
|
||||
executor,
|
||||
ctx: { runId: 'profile-cache-run' },
|
||||
|
|
@ -307,7 +293,7 @@ describe('relationship profiling', () => {
|
|||
});
|
||||
const third = await profileKtxRelationshipSchema({
|
||||
connectionId: 'warehouse',
|
||||
driver: 'sqlite',
|
||||
dialect: getDialectForDriver('sqlite'),
|
||||
schema: relationshipSchema,
|
||||
executor,
|
||||
ctx: { runId: 'profile-cache-fresh-run' },
|
||||
|
|
@ -336,7 +322,7 @@ describe('relationship profiling', () => {
|
|||
try {
|
||||
const result = await profileKtxRelationshipSchema({
|
||||
connectionId: fixture.snapshot.connectionId,
|
||||
driver: fixture.snapshot.driver,
|
||||
dialect: getDialectForDriver(fixture.snapshot.driver),
|
||||
schema: snapshotToKtxEnrichedSchema(maskedSnapshot, new Map()),
|
||||
executor: scaleExecutor,
|
||||
ctx: { runId: 'scale-stress-profile-query-count' },
|
||||
|
|
@ -381,7 +367,7 @@ describe('relationship profiling', () => {
|
|||
|
||||
await profileKtxRelationshipSchema({
|
||||
connectionId: 'warehouse',
|
||||
driver: 'sqlite',
|
||||
dialect: getDialectForDriver('sqlite'),
|
||||
schema: schemaWithTables(['accounts', 'orders', 'payments', 'refunds']),
|
||||
executor,
|
||||
ctx: { runId: 'profile-concurrency' },
|
||||
|
|
@ -417,7 +403,7 @@ describe('relationship profiling', () => {
|
|||
|
||||
const result = await profileKtxRelationshipSchema({
|
||||
connectionId: 'warehouse',
|
||||
driver: 'sqlite',
|
||||
dialect: getDialectForDriver('sqlite'),
|
||||
schema: schemaWithTables(['accounts', 'orders']),
|
||||
executor,
|
||||
ctx: { runId: 'profile-error-isolated' },
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
import type { KtxDialect } from '../connections/dialects.js';
|
||||
import type { KtxEnrichedColumn, KtxEnrichedSchema, KtxEnrichedTable } from './enrichment-types.js';
|
||||
import { mapWithConcurrency } from './relationship-validation.js';
|
||||
import type {
|
||||
|
|
@ -55,7 +56,7 @@ export interface KtxRelationshipProfileCache {
|
|||
|
||||
export interface ProfileKtxRelationshipSchemaInput {
|
||||
connectionId: string;
|
||||
driver: KtxConnectionDriver;
|
||||
dialect: KtxDialect;
|
||||
schema: KtxEnrichedSchema;
|
||||
executor: KtxRelationshipReadOnlyExecutor | null;
|
||||
ctx: KtxScanContext;
|
||||
|
|
@ -71,75 +72,6 @@ export function createKtxRelationshipProfileCache(): KtxRelationshipProfileCache
|
|||
|
||||
const SAMPLE_VALUE_DELIMITER = '\u001f';
|
||||
|
||||
type QuoteStyle = 'double' | 'backtick' | 'bracket';
|
||||
|
||||
function quoteStyle(driver: KtxConnectionDriver): QuoteStyle {
|
||||
if (driver === 'mysql' || driver === 'clickhouse') {
|
||||
return 'backtick';
|
||||
}
|
||||
if (driver === 'sqlserver') {
|
||||
return 'bracket';
|
||||
}
|
||||
return 'double';
|
||||
}
|
||||
|
||||
export function quoteKtxRelationshipIdentifier(driver: KtxConnectionDriver, identifier: string): string {
|
||||
switch (quoteStyle(driver)) {
|
||||
case 'backtick':
|
||||
return `\`${identifier.replace(/`/g, '``')}\``;
|
||||
case 'bracket':
|
||||
return `[${identifier.replace(/\]/g, ']]')}]`;
|
||||
case 'double':
|
||||
return `"${identifier.replace(/"/g, '""')}"`;
|
||||
}
|
||||
}
|
||||
|
||||
export function formatKtxRelationshipTableRef(driver: KtxConnectionDriver, table: KtxTableRef): string {
|
||||
const parts =
|
||||
driver === 'sqlite'
|
||||
? [table.name]
|
||||
: [table.catalog, table.db, table.name].filter((value): value is string => Boolean(value));
|
||||
return parts.map((part) => quoteKtxRelationshipIdentifier(driver, part)).join('.');
|
||||
}
|
||||
|
||||
function textLengthExpression(driver: KtxConnectionDriver, columnSql: string): string {
|
||||
if (driver === 'mysql') {
|
||||
return `CHAR_LENGTH(CAST(${columnSql} AS CHAR))`;
|
||||
}
|
||||
if (driver === 'sqlserver') {
|
||||
return `LEN(CAST(${columnSql} AS NVARCHAR(MAX)))`;
|
||||
}
|
||||
if (driver === 'bigquery') {
|
||||
return `LENGTH(CAST(${columnSql} AS STRING))`;
|
||||
}
|
||||
if (driver === 'clickhouse') {
|
||||
return `length(toString(${columnSql}))`;
|
||||
}
|
||||
return `LENGTH(CAST(${columnSql} AS TEXT))`;
|
||||
}
|
||||
|
||||
function limitSql(driver: KtxConnectionDriver, limit: number): string {
|
||||
if (driver === 'sqlserver') {
|
||||
return '';
|
||||
}
|
||||
return ` LIMIT ${Math.max(1, Math.floor(limit))}`;
|
||||
}
|
||||
|
||||
function topSql(driver: KtxConnectionDriver, limit: number): string {
|
||||
if (driver === 'sqlserver') {
|
||||
return ` TOP (${Math.max(1, Math.floor(limit))})`;
|
||||
}
|
||||
return '';
|
||||
}
|
||||
|
||||
function sampledTableSql(driver: KtxConnectionDriver, tableSql: string, limit: number): string {
|
||||
const safeLimit = Math.max(1, Math.floor(limit));
|
||||
if (driver === 'sqlserver') {
|
||||
return `(SELECT TOP (${safeLimit}) * FROM ${tableSql}) AS relationship_profile_sample`;
|
||||
}
|
||||
return `(SELECT * FROM ${tableSql}${limitSql(driver, safeLimit)}) AS relationship_profile_sample`;
|
||||
}
|
||||
|
||||
function firstRow(result: KtxQueryResult): unknown[] {
|
||||
return result.rows[0] ?? [];
|
||||
}
|
||||
|
|
@ -191,7 +123,7 @@ function columnKey(table: KtxEnrichedTable, column: KtxEnrichedColumn): string {
|
|||
|
||||
function tableProfileCacheKey(input: {
|
||||
connectionId: string;
|
||||
driver: KtxConnectionDriver;
|
||||
dialect: KtxDialect;
|
||||
ctx: KtxScanContext;
|
||||
table: KtxTableRef;
|
||||
sampleValuesPerColumn: number;
|
||||
|
|
@ -200,7 +132,7 @@ function tableProfileCacheKey(input: {
|
|||
return [
|
||||
input.ctx.runId,
|
||||
input.connectionId,
|
||||
input.driver,
|
||||
input.dialect.type,
|
||||
input.table.catalog ?? '',
|
||||
input.table.db ?? '',
|
||||
input.table.name,
|
||||
|
|
@ -213,57 +145,47 @@ function sqlStringLiteral(value: string): string {
|
|||
return `'${value.replace(/'/g, "''")}'`;
|
||||
}
|
||||
|
||||
function sampleAggregateSql(driver: KtxConnectionDriver, innerSql: string): string {
|
||||
if (driver === 'postgres') {
|
||||
return `(SELECT STRING_AGG(CAST(value AS TEXT), CHR(31)) FROM (${innerSql}) AS relationship_profile_values)`;
|
||||
function sqlSuffix(fragment: string): string {
|
||||
return fragment ? ` ${fragment}` : '';
|
||||
}
|
||||
|
||||
function sampledTableSql(dialect: KtxDialect, tableSql: string, limit: number): string {
|
||||
const top = dialect.getTopClause(limit);
|
||||
if (top) {
|
||||
return `(SELECT ${top} * FROM ${tableSql}) AS relationship_profile_sample`;
|
||||
}
|
||||
if (driver === 'bigquery') {
|
||||
return `(SELECT STRING_AGG(CAST(value AS STRING), '\\u001F') FROM (${innerSql}) AS relationship_profile_values)`;
|
||||
}
|
||||
if (driver === 'mysql') {
|
||||
return `(SELECT GROUP_CONCAT(CAST(value AS CHAR) SEPARATOR CHAR(31)) FROM (${innerSql}) AS relationship_profile_values)`;
|
||||
}
|
||||
if (driver === 'sqlserver') {
|
||||
return `(SELECT STRING_AGG(CAST(value AS NVARCHAR(MAX)), CHAR(31)) FROM (${innerSql}) AS relationship_profile_values)`;
|
||||
}
|
||||
if (driver === 'clickhouse') {
|
||||
return `(SELECT arrayStringConcat(groupArray(toString(value)), '\\x1F') FROM (${innerSql}) AS relationship_profile_values)`;
|
||||
}
|
||||
if (driver === 'snowflake') {
|
||||
return `(SELECT LISTAGG(CAST(value AS VARCHAR), '\\x1f') FROM (${innerSql}) AS relationship_profile_values)`;
|
||||
}
|
||||
return `(SELECT GROUP_CONCAT(CAST(value AS TEXT), char(31)) FROM (${innerSql}) AS relationship_profile_values)`;
|
||||
return `(SELECT * FROM ${tableSql}${sqlSuffix(dialect.getLimitOffsetClause(limit))}) AS relationship_profile_sample`;
|
||||
}
|
||||
|
||||
function sampleValuesSql(input: {
|
||||
driver: KtxConnectionDriver;
|
||||
dialect: KtxDialect;
|
||||
tableSql: string;
|
||||
columnSql: string;
|
||||
limit: number;
|
||||
}): string {
|
||||
const top = input.dialect.getTopClause(input.limit);
|
||||
return [
|
||||
`SELECT${topSql(input.driver, input.limit)} ${input.columnSql} AS value`,
|
||||
`SELECT${top ? ` ${top}` : ''} ${input.columnSql} AS value`,
|
||||
`FROM ${input.tableSql}`,
|
||||
`WHERE ${input.columnSql} IS NOT NULL`,
|
||||
`GROUP BY ${input.columnSql}`,
|
||||
`ORDER BY COUNT(*) DESC, ${input.columnSql} ASC`,
|
||||
limitSql(input.driver, input.limit),
|
||||
sqlSuffix(input.dialect.getLimitOffsetClause(input.limit)),
|
||||
].join(' ');
|
||||
}
|
||||
|
||||
function columnProfileSelectSql(input: {
|
||||
connectionDriver: KtxConnectionDriver;
|
||||
dialect: KtxDialect;
|
||||
tableSql: string;
|
||||
profileTableSql: string;
|
||||
column: KtxEnrichedColumn;
|
||||
sampleValuesPerColumn: number;
|
||||
}): string {
|
||||
const columnSql = quoteKtxRelationshipIdentifier(input.connectionDriver, input.column.name);
|
||||
const textLengthSql = textLengthExpression(input.connectionDriver, columnSql);
|
||||
const samplesSql = sampleAggregateSql(
|
||||
input.connectionDriver,
|
||||
const columnSql = input.dialect.quoteIdentifier(input.column.name);
|
||||
const textLengthSql = input.dialect.textLengthExpression(columnSql);
|
||||
const samplesSql = input.dialect.getSampleValueAggregation(
|
||||
sampleValuesSql({
|
||||
driver: input.connectionDriver,
|
||||
dialect: input.dialect,
|
||||
tableSql: input.profileTableSql,
|
||||
columnSql,
|
||||
limit: input.sampleValuesPerColumn,
|
||||
|
|
@ -296,12 +218,12 @@ function splitSampleValues(value: unknown): string[] {
|
|||
|
||||
async function queryCount(input: {
|
||||
connectionId: string;
|
||||
driver: KtxConnectionDriver;
|
||||
dialect: KtxDialect;
|
||||
table: KtxTableRef;
|
||||
executor: KtxRelationshipReadOnlyExecutor;
|
||||
ctx: KtxScanContext;
|
||||
}): Promise<{ rowCount: number; queryCount: number }> {
|
||||
const tableSql = formatKtxRelationshipTableRef(input.driver, input.table);
|
||||
const tableSql = input.dialect.formatTableName(input.table);
|
||||
const result = await input.executor.executeReadOnly(
|
||||
{ connectionId: input.connectionId, sql: `SELECT COUNT(*) AS row_count FROM ${tableSql}`, maxRows: 1 },
|
||||
input.ctx,
|
||||
|
|
@ -311,7 +233,7 @@ async function queryCount(input: {
|
|||
|
||||
async function queryTableProfile(input: {
|
||||
connectionId: string;
|
||||
driver: KtxConnectionDriver;
|
||||
dialect: KtxDialect;
|
||||
table: KtxEnrichedTable;
|
||||
executor: KtxRelationshipReadOnlyExecutor;
|
||||
ctx: KtxScanContext;
|
||||
|
|
@ -325,7 +247,7 @@ async function queryTableProfile(input: {
|
|||
if (input.table.columns.length === 0) {
|
||||
const rowCount = await queryCount({
|
||||
connectionId: input.connectionId,
|
||||
driver: input.driver,
|
||||
dialect: input.dialect,
|
||||
table: input.table.ref,
|
||||
executor: input.executor,
|
||||
ctx: input.ctx,
|
||||
|
|
@ -337,12 +259,12 @@ async function queryTableProfile(input: {
|
|||
};
|
||||
}
|
||||
|
||||
const tableSql = formatKtxRelationshipTableRef(input.driver, input.table.ref);
|
||||
const profileTableSql = sampledTableSql(input.driver, tableSql, input.profileSampleRows);
|
||||
const tableSql = input.dialect.formatTableName(input.table.ref);
|
||||
const profileTableSql = sampledTableSql(input.dialect, tableSql, input.profileSampleRows);
|
||||
const sql = input.table.columns
|
||||
.map((column) =>
|
||||
columnProfileSelectSql({
|
||||
connectionDriver: input.driver,
|
||||
dialect: input.dialect,
|
||||
tableSql,
|
||||
profileTableSql,
|
||||
column,
|
||||
|
|
@ -401,7 +323,7 @@ export async function profileKtxRelationshipSchema(
|
|||
if (!input.executor) {
|
||||
return {
|
||||
connectionId: input.connectionId,
|
||||
driver: input.driver,
|
||||
driver: input.dialect.type,
|
||||
sqlAvailable: false,
|
||||
queryCount: 0,
|
||||
tables: [],
|
||||
|
|
@ -425,7 +347,7 @@ export async function profileKtxRelationshipSchema(
|
|||
const profileSampleRows = input.profileSampleRows ?? 10000;
|
||||
const cacheKey = tableProfileCacheKey({
|
||||
connectionId: input.connectionId,
|
||||
driver: input.driver,
|
||||
dialect: input.dialect,
|
||||
ctx: input.ctx,
|
||||
table: table.ref,
|
||||
sampleValuesPerColumn,
|
||||
|
|
@ -439,7 +361,7 @@ export async function profileKtxRelationshipSchema(
|
|||
try {
|
||||
const tableProfile = await queryTableProfile({
|
||||
connectionId: input.connectionId,
|
||||
driver: input.driver,
|
||||
dialect: input.dialect,
|
||||
table,
|
||||
executor,
|
||||
ctx: input.ctx,
|
||||
|
|
@ -481,7 +403,7 @@ export async function profileKtxRelationshipSchema(
|
|||
|
||||
return {
|
||||
connectionId: input.connectionId,
|
||||
driver: input.driver,
|
||||
driver: input.dialect.type,
|
||||
sqlAvailable: true,
|
||||
queryCount: queryTotal,
|
||||
tables,
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
import Database from 'better-sqlite3';
|
||||
import { afterEach, describe, expect, it } from 'vitest';
|
||||
import { getDialectForDriver } from '../connections/dialects.js';
|
||||
import type { KtxEnrichedColumn, KtxEnrichedSchema, KtxEnrichedTable } from './enrichment-types.js';
|
||||
import { generateKtxRelationshipDiscoveryCandidates } from './relationship-candidates.js';
|
||||
import type { KtxRelationshipProfileArtifact } from './relationship-profiling.js';
|
||||
|
|
@ -99,7 +100,7 @@ describe('relationship validation', () => {
|
|||
const testSchema = schema();
|
||||
const profiles = await profileKtxRelationshipSchema({
|
||||
connectionId: 'warehouse',
|
||||
driver: 'sqlite',
|
||||
dialect: getDialectForDriver('sqlite'),
|
||||
schema: testSchema,
|
||||
executor,
|
||||
ctx: { runId: 'validate-test' },
|
||||
|
|
@ -110,7 +111,7 @@ describe('relationship validation', () => {
|
|||
|
||||
const validated = await validateKtxRelationshipDiscoveryCandidates({
|
||||
connectionId: 'warehouse',
|
||||
driver: 'sqlite',
|
||||
dialect: getDialectForDriver('sqlite'),
|
||||
candidates,
|
||||
profiles,
|
||||
executor,
|
||||
|
|
@ -148,7 +149,7 @@ describe('relationship validation', () => {
|
|||
const testSchema = schema();
|
||||
const profiles = await profileKtxRelationshipSchema({
|
||||
connectionId: 'warehouse',
|
||||
driver: 'sqlite',
|
||||
dialect: getDialectForDriver('sqlite'),
|
||||
schema: testSchema,
|
||||
executor,
|
||||
ctx: { runId: 'validate-test' },
|
||||
|
|
@ -159,7 +160,7 @@ describe('relationship validation', () => {
|
|||
|
||||
const validated = await validateKtxRelationshipDiscoveryCandidates({
|
||||
connectionId: 'warehouse',
|
||||
driver: 'sqlite',
|
||||
dialect: getDialectForDriver('sqlite'),
|
||||
candidates,
|
||||
profiles,
|
||||
executor,
|
||||
|
|
@ -198,7 +199,7 @@ describe('relationship validation', () => {
|
|||
const testSchema = schema();
|
||||
const profiles = await profileKtxRelationshipSchema({
|
||||
connectionId: 'warehouse',
|
||||
driver: 'sqlite',
|
||||
dialect: getDialectForDriver('sqlite'),
|
||||
schema: testSchema,
|
||||
executor,
|
||||
ctx: { runId: 'validate-budget-profile' },
|
||||
|
|
@ -211,7 +212,7 @@ describe('relationship validation', () => {
|
|||
|
||||
const validated = await validateKtxRelationshipDiscoveryCandidates({
|
||||
connectionId: 'warehouse',
|
||||
driver: 'sqlite',
|
||||
dialect: getDialectForDriver('sqlite'),
|
||||
candidates,
|
||||
profiles,
|
||||
executor,
|
||||
|
|
@ -253,7 +254,7 @@ describe('relationship validation', () => {
|
|||
]);
|
||||
const profiles = await profileKtxRelationshipSchema({
|
||||
connectionId: 'warehouse',
|
||||
driver: 'sqlite',
|
||||
dialect: getDialectForDriver('sqlite'),
|
||||
schema: testSchema,
|
||||
executor,
|
||||
ctx: { runId: 'validate-zero-budget-profile' },
|
||||
|
|
@ -263,7 +264,7 @@ describe('relationship validation', () => {
|
|||
|
||||
const validated = await validateKtxRelationshipDiscoveryCandidates({
|
||||
connectionId: 'warehouse',
|
||||
driver: 'sqlite',
|
||||
dialect: getDialectForDriver('sqlite'),
|
||||
candidates,
|
||||
profiles,
|
||||
executor,
|
||||
|
|
@ -300,7 +301,7 @@ describe('relationship validation', () => {
|
|||
]);
|
||||
const profiles = await profileKtxRelationshipSchema({
|
||||
connectionId: 'warehouse',
|
||||
driver: 'sqlite',
|
||||
dialect: getDialectForDriver('sqlite'),
|
||||
schema: testSchema,
|
||||
executor,
|
||||
ctx: { runId: 'llm-rejected-validation' },
|
||||
|
|
@ -329,7 +330,7 @@ describe('relationship validation', () => {
|
|||
|
||||
const [validated] = await validateKtxRelationshipDiscoveryCandidates({
|
||||
connectionId: 'warehouse',
|
||||
driver: 'sqlite',
|
||||
dialect: getDialectForDriver('sqlite'),
|
||||
candidates: [llmCandidate],
|
||||
profiles,
|
||||
executor,
|
||||
|
|
@ -374,7 +375,7 @@ describe('relationship validation', () => {
|
|||
]);
|
||||
const profiles = await profileKtxRelationshipSchema({
|
||||
connectionId: 'warehouse',
|
||||
driver: 'sqlite',
|
||||
dialect: getDialectForDriver('sqlite'),
|
||||
schema: testSchema,
|
||||
executor,
|
||||
ctx: { runId: 'validation-concurrency-profile' },
|
||||
|
|
@ -383,7 +384,7 @@ describe('relationship validation', () => {
|
|||
|
||||
await validateKtxRelationshipDiscoveryCandidates({
|
||||
connectionId: 'warehouse',
|
||||
driver: 'sqlite',
|
||||
dialect: getDialectForDriver('sqlite'),
|
||||
candidates,
|
||||
profiles,
|
||||
executor: throttled,
|
||||
|
|
@ -475,7 +476,7 @@ describe('relationship validation', () => {
|
|||
|
||||
const [validated] = await validateKtxRelationshipDiscoveryCandidates({
|
||||
connectionId: 'warehouse',
|
||||
driver: 'sqlite',
|
||||
dialect: getDialectForDriver('sqlite'),
|
||||
candidates: [candidate],
|
||||
profiles,
|
||||
executor,
|
||||
|
|
|
|||
|
|
@ -1,13 +1,12 @@
|
|||
import type { KtxDialect } from '../connections/dialects.js';
|
||||
import type { KtxRelationshipEndpoint } from './enrichment-types.js';
|
||||
import { applyKtxRelationshipValidationBudget, type KtxRelationshipValidationBudget } from './relationship-budget.js';
|
||||
import type { KtxRelationshipDiscoveryCandidate } from './relationship-candidates.js';
|
||||
import {
|
||||
formatKtxRelationshipTableRef,
|
||||
type KtxRelationshipProfileArtifact,
|
||||
type KtxRelationshipReadOnlyExecutor,
|
||||
quoteKtxRelationshipIdentifier,
|
||||
} from './relationship-profiling.js';
|
||||
import type { KtxConnectionDriver, KtxQueryResult, KtxScanContext } from './types.js';
|
||||
import type { KtxQueryResult, KtxScanContext, KtxTableRef } from './types.js';
|
||||
|
||||
type KtxValidatedRelationshipStatus = 'accepted' | 'review' | 'rejected';
|
||||
|
||||
|
|
@ -45,7 +44,7 @@ export interface KtxValidatedRelationshipDiscoveryCandidate
|
|||
|
||||
export interface ValidateKtxRelationshipDiscoveryCandidatesInput {
|
||||
connectionId: string;
|
||||
driver: KtxConnectionDriver;
|
||||
dialect: KtxDialect;
|
||||
candidates: readonly KtxRelationshipDiscoveryCandidate[];
|
||||
profiles: KtxRelationshipProfileArtifact;
|
||||
executor: KtxRelationshipReadOnlyExecutor | null;
|
||||
|
|
@ -104,38 +103,28 @@ function numberAt(result: KtxQueryResult, header: string): number {
|
|||
return 0;
|
||||
}
|
||||
|
||||
function limitSql(driver: KtxConnectionDriver, limit: number): string {
|
||||
if (driver === 'sqlserver') {
|
||||
return '';
|
||||
}
|
||||
return ` LIMIT ${Math.max(1, Math.floor(limit))}`;
|
||||
}
|
||||
|
||||
function topSql(driver: KtxConnectionDriver, limit: number): string {
|
||||
if (driver === 'sqlserver') {
|
||||
return ` TOP (${Math.max(1, Math.floor(limit))})`;
|
||||
}
|
||||
return '';
|
||||
function sqlSuffix(fragment: string): string {
|
||||
return fragment ? ` ${fragment}` : '';
|
||||
}
|
||||
|
||||
function buildCoverageSql(input: {
|
||||
driver: KtxConnectionDriver;
|
||||
childTable: string;
|
||||
dialect: KtxDialect;
|
||||
childTable: KtxTableRef;
|
||||
childColumn: string;
|
||||
parentTable: string;
|
||||
parentTable: KtxTableRef;
|
||||
parentColumn: string;
|
||||
maxDistinctSourceValues: number;
|
||||
}): string {
|
||||
const childTable = formatKtxRelationshipTableRef(input.driver, { catalog: null, db: null, name: input.childTable });
|
||||
const parentTable = formatKtxRelationshipTableRef(input.driver, { catalog: null, db: null, name: input.parentTable });
|
||||
const childColumn = quoteKtxRelationshipIdentifier(input.driver, input.childColumn);
|
||||
const parentColumn = quoteKtxRelationshipIdentifier(input.driver, input.parentColumn);
|
||||
const limit = limitSql(input.driver, input.maxDistinctSourceValues);
|
||||
const top = topSql(input.driver, input.maxDistinctSourceValues);
|
||||
const childTable = input.dialect.formatTableName(input.childTable);
|
||||
const parentTable = input.dialect.formatTableName(input.parentTable);
|
||||
const childColumn = input.dialect.quoteIdentifier(input.childColumn);
|
||||
const parentColumn = input.dialect.quoteIdentifier(input.parentColumn);
|
||||
const limit = sqlSuffix(input.dialect.getLimitOffsetClause(input.maxDistinctSourceValues));
|
||||
const top = input.dialect.getTopClause(input.maxDistinctSourceValues);
|
||||
|
||||
return [
|
||||
'WITH child_values AS (',
|
||||
`SELECT DISTINCT${top} ${childColumn} AS value FROM ${childTable} WHERE ${childColumn} IS NOT NULL${limit}`,
|
||||
`SELECT DISTINCT${top ? ` ${top}` : ''} ${childColumn} AS value FROM ${childTable} WHERE ${childColumn} IS NOT NULL${limit}`,
|
||||
'), parent_values AS (',
|
||||
`SELECT DISTINCT ${parentColumn} AS value FROM ${parentTable} WHERE ${parentColumn} IS NOT NULL`,
|
||||
')',
|
||||
|
|
@ -271,10 +260,10 @@ export async function validateKtxRelationshipDiscoveryCandidates(
|
|||
{
|
||||
connectionId: input.connectionId,
|
||||
sql: buildCoverageSql({
|
||||
driver: input.driver,
|
||||
childTable: candidate.from.table.name,
|
||||
dialect: input.dialect,
|
||||
childTable: candidate.from.table,
|
||||
childColumn: sourceColumn,
|
||||
parentTable: candidate.to.table.name,
|
||||
parentTable: candidate.to.table,
|
||||
parentColumn: targetColumn,
|
||||
maxDistinctSourceValues: settings.maxDistinctSourceValues,
|
||||
}),
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue