mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-10 08:05:14 +02:00
feat(context): read warehouse scan catalog
This commit is contained in:
parent
8488261c00
commit
f165584fd8
2 changed files with 539 additions and 0 deletions
|
|
@ -0,0 +1,169 @@
|
|||
import { mkdtemp, rm } from 'node:fs/promises';
|
||||
import { tmpdir } from 'node:os';
|
||||
import { join } from 'node:path';
|
||||
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
|
||||
import { initKtxProject, type KtxLocalProject } from '../../../project/index.js';
|
||||
import { WarehouseCatalogService } from './warehouse-catalog.service.js';
|
||||
|
||||
describe('WarehouseCatalogService', () => {
|
||||
let tempDir: string;
|
||||
let project: KtxLocalProject;
|
||||
|
||||
beforeEach(async () => {
|
||||
tempDir = await mkdtemp(join(tmpdir(), 'ktx-warehouse-catalog-'));
|
||||
project = await initKtxProject({ projectDir: join(tempDir, 'project'), projectName: 'warehouse' });
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await rm(tempDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
async function seedLiveDatabaseScan(connectionName = 'warehouse', syncId = 'sync-2', driver = 'postgres') {
|
||||
const root = `raw-sources/${connectionName}/live-database/${syncId}`;
|
||||
await project.fileStore.writeFile(
|
||||
`${root}/connection.json`,
|
||||
JSON.stringify({ connectionId: connectionName, driver, extractedAt: '2026-05-12T00:00:00.000Z' }, null, 2),
|
||||
'ktx',
|
||||
'ktx@example.com',
|
||||
'seed connection',
|
||||
);
|
||||
await project.fileStore.writeFile(
|
||||
`${root}/tables/orders.json`,
|
||||
JSON.stringify(
|
||||
{
|
||||
catalog: null,
|
||||
db: driver === 'sqlite' ? null : 'public',
|
||||
name: 'orders',
|
||||
kind: 'table',
|
||||
comment: 'Customer orders',
|
||||
estimatedRows: 12,
|
||||
columns: [
|
||||
{
|
||||
name: 'id',
|
||||
nativeType: 'integer',
|
||||
normalizedType: 'integer',
|
||||
dimensionType: 'number',
|
||||
nullable: false,
|
||||
primaryKey: true,
|
||||
comment: 'Order id',
|
||||
},
|
||||
{
|
||||
name: 'status',
|
||||
nativeType: 'text',
|
||||
normalizedType: 'text',
|
||||
dimensionType: 'string',
|
||||
nullable: false,
|
||||
primaryKey: false,
|
||||
comment: 'Order status',
|
||||
},
|
||||
],
|
||||
foreignKeys: [],
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
'ktx',
|
||||
'ktx@example.com',
|
||||
'seed orders',
|
||||
);
|
||||
await project.fileStore.writeFile(
|
||||
`${root}/enrichment/relationship-profile.json`,
|
||||
JSON.stringify(
|
||||
{
|
||||
connectionId: connectionName,
|
||||
driver,
|
||||
sqlAvailable: true,
|
||||
queryCount: 3,
|
||||
tables: [{ table: { catalog: null, db: driver === 'sqlite' ? null : 'public', name: 'orders' }, rowCount: 12 }],
|
||||
columns: {
|
||||
'orders.status': {
|
||||
table: { catalog: null, db: driver === 'sqlite' ? null : 'public', name: 'orders' },
|
||||
column: 'status',
|
||||
nativeType: 'text',
|
||||
normalizedType: 'text',
|
||||
rowCount: 12,
|
||||
nullCount: 0,
|
||||
distinctCount: 2,
|
||||
uniquenessRatio: 0.1667,
|
||||
nullRate: 0,
|
||||
sampleValues: ['paid', 'refunded'],
|
||||
minTextLength: 4,
|
||||
maxTextLength: 8,
|
||||
},
|
||||
},
|
||||
warnings: [],
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
'ktx',
|
||||
'ktx@example.com',
|
||||
'seed profile',
|
||||
);
|
||||
}
|
||||
|
||||
it('finds the latest sync and merges table schema with relationship profile values', async () => {
|
||||
await seedLiveDatabaseScan('warehouse', 'sync-1');
|
||||
await seedLiveDatabaseScan('warehouse', 'sync-2');
|
||||
const catalog = new WarehouseCatalogService({ fileStore: project.fileStore });
|
||||
|
||||
await expect(catalog.getLatestSyncId('warehouse')).resolves.toBe('sync-2');
|
||||
const detail = await catalog.getTable({ connectionName: 'warehouse', catalog: null, db: 'public', name: 'orders' });
|
||||
|
||||
expect(detail).toMatchObject({
|
||||
connectionName: 'warehouse',
|
||||
display: 'public.orders',
|
||||
rowCount: 12,
|
||||
columns: [
|
||||
{ name: 'id', nativeType: 'integer', primaryKey: true },
|
||||
{ name: 'status', nativeType: 'text', sampleValues: ['paid', 'refunded'], distinctCount: 2 },
|
||||
],
|
||||
});
|
||||
});
|
||||
|
||||
it('returns scanAvailable=false when no live-database scan exists', async () => {
|
||||
const catalog = new WarehouseCatalogService({ fileStore: project.fileStore });
|
||||
await expect(catalog.getTable({ connectionName: 'missing', catalog: null, db: 'public', name: 'orders' })).resolves.toBeNull();
|
||||
await expect(catalog.hasScan('missing')).resolves.toBe(false);
|
||||
});
|
||||
|
||||
it('resolves postgres display strings and returns closest candidates for missing tables', async () => {
|
||||
await seedLiveDatabaseScan();
|
||||
const catalog = new WarehouseCatalogService({ fileStore: project.fileStore });
|
||||
|
||||
await expect(catalog.resolveDisplay('warehouse', 'public.orders')).resolves.toMatchObject({
|
||||
resolved: { catalog: null, db: 'public', name: 'orders' },
|
||||
candidates: [],
|
||||
dialect: 'postgres',
|
||||
});
|
||||
await expect(catalog.resolveDisplay('warehouse', 'public.orderz')).resolves.toMatchObject({
|
||||
resolved: null,
|
||||
candidates: [{ name: 'orders' }],
|
||||
});
|
||||
});
|
||||
|
||||
it('treats two-part BigQuery identifiers as ambiguous instead of guessing', async () => {
|
||||
await seedLiveDatabaseScan('warehouse', 'sync-bigquery', 'bigquery');
|
||||
const catalog = new WarehouseCatalogService({ fileStore: project.fileStore });
|
||||
|
||||
await expect(catalog.resolveDisplay('warehouse', 'public.orders')).resolves.toMatchObject({
|
||||
resolved: null,
|
||||
dialect: 'bigquery',
|
||||
});
|
||||
});
|
||||
|
||||
it('searches table names, column names, comments, and descriptions', async () => {
|
||||
await seedLiveDatabaseScan();
|
||||
const catalog = new WarehouseCatalogService({ fileStore: project.fileStore });
|
||||
|
||||
await expect(catalog.searchByName('warehouse', 'status', 10)).resolves.toEqual(
|
||||
expect.arrayContaining([
|
||||
expect.objectContaining({
|
||||
kind: 'column',
|
||||
ref: expect.objectContaining({ db: 'public', name: 'orders', column: 'status' }),
|
||||
matchedOn: 'name',
|
||||
}),
|
||||
]),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
|
@ -0,0 +1,370 @@
|
|||
import { getDialectForDriver } from '../../../connections/index.js';
|
||||
import type { KtxFileStorePort } from '../../../core/index.js';
|
||||
import type {
|
||||
KtxConnectionDriver,
|
||||
KtxSchemaColumn,
|
||||
KtxSchemaForeignKey,
|
||||
KtxSchemaTable,
|
||||
KtxTableRef,
|
||||
} from '../../../scan/types.js';
|
||||
|
||||
type CatalogDriver = KtxConnectionDriver | 'sqlite3';
|
||||
|
||||
export interface WarehouseCatalogServiceDeps {
|
||||
fileStore: KtxFileStorePort;
|
||||
}
|
||||
|
||||
export interface WarehouseColumnDetail extends KtxSchemaColumn {
|
||||
descriptions: Record<string, string>;
|
||||
rowCount: number | null;
|
||||
nullCount: number | null;
|
||||
distinctCount: number | null;
|
||||
nullRate: number | null;
|
||||
sampleValues: string[];
|
||||
}
|
||||
|
||||
export interface TableDetail {
|
||||
connectionName: string;
|
||||
catalog: string | null;
|
||||
db: string | null;
|
||||
name: string;
|
||||
display: string;
|
||||
kind: string;
|
||||
comment: string | null;
|
||||
description: string | null;
|
||||
rowCount: number | null;
|
||||
columns: WarehouseColumnDetail[];
|
||||
foreignKeys: KtxSchemaForeignKey[];
|
||||
}
|
||||
|
||||
export type RawSchemaHit =
|
||||
| { kind: 'table'; ref: KtxTableRef; display: string; matchedOn: 'name' | 'db' | 'comment' | 'description' }
|
||||
| { kind: 'column'; ref: KtxTableRef & { column: string }; display: string; matchedOn: 'name' | 'comment' | 'description' };
|
||||
|
||||
interface ConnectionArtifact {
|
||||
driver?: CatalogDriver;
|
||||
}
|
||||
|
||||
interface RelationshipProfileColumn {
|
||||
table?: KtxTableRef;
|
||||
column?: string;
|
||||
rowCount?: number;
|
||||
nullCount?: number;
|
||||
distinctCount?: number;
|
||||
nullRate?: number;
|
||||
sampleValues?: unknown[];
|
||||
}
|
||||
|
||||
interface RelationshipProfileArtifact {
|
||||
driver?: CatalogDriver;
|
||||
tables?: Array<{ table?: KtxTableRef; rowCount?: number }>;
|
||||
columns?: Record<string, RelationshipProfileColumn>;
|
||||
}
|
||||
|
||||
interface ConnectionCatalog {
|
||||
connectionName: string;
|
||||
syncId: string;
|
||||
driver: CatalogDriver;
|
||||
tables: KtxSchemaTable[];
|
||||
profile: RelationshipProfileArtifact | null;
|
||||
}
|
||||
|
||||
type TableWithDescriptions = KtxSchemaTable & {
|
||||
description?: string | null;
|
||||
descriptions?: Record<string, string>;
|
||||
columns: Array<KtxSchemaColumn & { description?: string | null; descriptions?: Record<string, string> }>;
|
||||
};
|
||||
|
||||
function normalize(value: string | null | undefined): string {
|
||||
return (value ?? '').toLowerCase();
|
||||
}
|
||||
|
||||
function refsEqual(left: KtxTableRef, right: KtxTableRef): boolean {
|
||||
return (
|
||||
normalize(left.catalog) === normalize(right.catalog) &&
|
||||
normalize(left.db) === normalize(right.db) &&
|
||||
normalize(left.name) === normalize(right.name)
|
||||
);
|
||||
}
|
||||
|
||||
function refKey(ref: KtxTableRef): string {
|
||||
return [ref.catalog, ref.db, ref.name].map((part) => normalize(part)).join('.');
|
||||
}
|
||||
|
||||
function columnKey(ref: KtxTableRef, column: string): string {
|
||||
return `${refKey(ref)}.${normalize(column)}`;
|
||||
}
|
||||
|
||||
function readJson<T>(content: string): T {
|
||||
return JSON.parse(content) as T;
|
||||
}
|
||||
|
||||
function cleanIdentifierPart(part: string): string {
|
||||
return part.trim().replace(/^["'`\[]|["'`\]]$/g, '');
|
||||
}
|
||||
|
||||
function splitDisplay(display: string): string[] {
|
||||
return display
|
||||
.trim()
|
||||
.split('.')
|
||||
.map(cleanIdentifierPart)
|
||||
.filter(Boolean);
|
||||
}
|
||||
|
||||
function formatDisplay(driver: CatalogDriver, table: KtxTableRef): string {
|
||||
if (driver === 'sqlite' || driver === 'sqlite3') {
|
||||
return table.name;
|
||||
}
|
||||
return [table.catalog, table.db, table.name].filter((part): part is string => Boolean(part)).join('.');
|
||||
}
|
||||
|
||||
function parseDisplay(driver: CatalogDriver, display: string): KtxTableRef | null {
|
||||
const parts = splitDisplay(display);
|
||||
if (driver === 'sqlite' || driver === 'sqlite3') {
|
||||
return parts.length === 1 ? { catalog: null, db: null, name: parts[0]! } : null;
|
||||
}
|
||||
if (driver === 'bigquery' || driver === 'snowflake' || driver === 'sqlserver') {
|
||||
if (parts.length !== 3) {
|
||||
return null;
|
||||
}
|
||||
return { catalog: parts[0]!, db: parts[1]!, name: parts[2]! };
|
||||
}
|
||||
if (parts.length === 2) {
|
||||
return { catalog: null, db: parts[0]!, name: parts[1]! };
|
||||
}
|
||||
if (parts.length === 3) {
|
||||
return { catalog: parts[0]!, db: parts[1]!, name: parts[2]! };
|
||||
}
|
||||
return parts.length === 1 ? { catalog: null, db: null, name: parts[0]! } : null;
|
||||
}
|
||||
|
||||
function bestCandidates(tables: KtxSchemaTable[], display: string, limit = 5): KtxTableRef[] {
|
||||
const needle = normalize(splitDisplay(display).at(-1) ?? display);
|
||||
return tables
|
||||
.map((table) => {
|
||||
const name = normalize(table.name);
|
||||
let score = 0;
|
||||
if (name === needle) {
|
||||
score = 100;
|
||||
} else if (name.includes(needle) || needle.includes(name)) {
|
||||
score = 80;
|
||||
} else {
|
||||
const samePrefix = [...name].filter((char, index) => needle[index] === char).length;
|
||||
score = samePrefix / Math.max(name.length, needle.length, 1);
|
||||
}
|
||||
return { table, score };
|
||||
})
|
||||
.filter((entry) => entry.score > 0)
|
||||
.sort((left, right) => right.score - left.score || left.table.name.localeCompare(right.table.name))
|
||||
.slice(0, limit)
|
||||
.map(({ table }) => ({ catalog: table.catalog, db: table.db, name: table.name }));
|
||||
}
|
||||
|
||||
function firstDescription(descriptions: Record<string, string> | undefined): string | null {
|
||||
return Object.values(descriptions ?? {}).find((value) => value.trim().length > 0) ?? null;
|
||||
}
|
||||
|
||||
function matchedOnTable(table: TableWithDescriptions, query: string): RawSchemaHit['matchedOn'] | null {
|
||||
const q = normalize(query);
|
||||
if (!q) {
|
||||
return null;
|
||||
}
|
||||
if (normalize(table.name).includes(q)) {
|
||||
return 'name';
|
||||
}
|
||||
if (normalize(table.db).includes(q)) {
|
||||
return 'db';
|
||||
}
|
||||
if (normalize(table.comment).includes(q)) {
|
||||
return 'comment';
|
||||
}
|
||||
if (normalize(firstDescription(table.descriptions) ?? table.description).includes(q)) {
|
||||
return 'description';
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function matchedOnColumn(
|
||||
column: KtxSchemaColumn & { description?: string | null; descriptions?: Record<string, string> },
|
||||
query: string,
|
||||
): 'name' | 'comment' | 'description' | null {
|
||||
const q = normalize(query);
|
||||
if (!q) {
|
||||
return null;
|
||||
}
|
||||
if (normalize(column.name).includes(q)) {
|
||||
return 'name';
|
||||
}
|
||||
if (normalize(column.comment).includes(q)) {
|
||||
return 'comment';
|
||||
}
|
||||
if (normalize(firstDescription(column.descriptions) ?? column.description).includes(q)) {
|
||||
return 'description';
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
export class WarehouseCatalogService {
|
||||
private readonly catalogs = new Map<string, Promise<ConnectionCatalog | null>>();
|
||||
|
||||
constructor(private readonly deps: WarehouseCatalogServiceDeps) {}
|
||||
|
||||
async hasScan(connectionName: string): Promise<boolean> {
|
||||
return (await this.loadCatalog(connectionName)) !== null;
|
||||
}
|
||||
|
||||
async getLatestSyncId(connectionName: string): Promise<string | null> {
|
||||
return (await this.loadCatalog(connectionName))?.syncId ?? null;
|
||||
}
|
||||
|
||||
async listTables(connectionName: string): Promise<KtxTableRef[]> {
|
||||
const catalog = await this.loadCatalog(connectionName);
|
||||
return catalog?.tables.map((table) => ({ catalog: table.catalog, db: table.db, name: table.name })) ?? [];
|
||||
}
|
||||
|
||||
async getTable(ref: { connectionName: string } & KtxTableRef): Promise<TableDetail | null> {
|
||||
const catalog = await this.loadCatalog(ref.connectionName);
|
||||
if (!catalog) {
|
||||
return null;
|
||||
}
|
||||
const table = catalog.tables.find((candidate) => refsEqual(candidate, ref)) as TableWithDescriptions | undefined;
|
||||
if (!table) {
|
||||
return null;
|
||||
}
|
||||
const profileTables = catalog.profile?.tables ?? [];
|
||||
const profileTable = profileTables.find((candidate) => candidate.table && refsEqual(candidate.table, table));
|
||||
const profileColumns = catalog.profile?.columns ?? {};
|
||||
|
||||
return {
|
||||
connectionName: ref.connectionName,
|
||||
catalog: table.catalog,
|
||||
db: table.db,
|
||||
name: table.name,
|
||||
display: formatDisplay(catalog.driver, table),
|
||||
kind: table.kind,
|
||||
comment: table.comment,
|
||||
description: table.description ?? firstDescription(table.descriptions),
|
||||
rowCount: profileTable?.rowCount ?? table.estimatedRows ?? null,
|
||||
columns: table.columns.map((column) => {
|
||||
const profileColumn =
|
||||
profileColumns[columnKey(table, column.name)] ??
|
||||
Object.entries(profileColumns).find(
|
||||
([key, value]) =>
|
||||
normalize(key) === `${normalize(table.name)}.${normalize(column.name)}` ||
|
||||
(value.table && refsEqual(value.table, table) && normalize(value.column) === normalize(column.name)),
|
||||
)?.[1];
|
||||
return {
|
||||
...column,
|
||||
descriptions: column.descriptions ?? {},
|
||||
rowCount: profileColumn?.rowCount ?? null,
|
||||
nullCount: profileColumn?.nullCount ?? null,
|
||||
distinctCount: profileColumn?.distinctCount ?? null,
|
||||
nullRate: profileColumn?.nullRate ?? null,
|
||||
sampleValues: (profileColumn?.sampleValues ?? []).map((value) => String(value)),
|
||||
};
|
||||
}),
|
||||
foreignKeys: table.foreignKeys,
|
||||
};
|
||||
}
|
||||
|
||||
async resolveDisplay(
|
||||
connectionName: string,
|
||||
display: string,
|
||||
): Promise<{
|
||||
resolved: KtxTableRef | null;
|
||||
candidates: KtxTableRef[];
|
||||
dialect: string;
|
||||
}> {
|
||||
const catalog = await this.loadCatalog(connectionName);
|
||||
if (!catalog) {
|
||||
return { resolved: null, candidates: [], dialect: 'unknown' };
|
||||
}
|
||||
const dialect = getDialectForDriver(catalog.driver).type;
|
||||
const parsed = parseDisplay(catalog.driver, display);
|
||||
if (!parsed) {
|
||||
return { resolved: null, candidates: bestCandidates(catalog.tables, display), dialect };
|
||||
}
|
||||
const table = catalog.tables.find((candidate) => refsEqual(candidate, parsed));
|
||||
if (!table) {
|
||||
return { resolved: null, candidates: bestCandidates(catalog.tables, display), dialect };
|
||||
}
|
||||
return { resolved: { catalog: table.catalog, db: table.db, name: table.name }, candidates: [], dialect };
|
||||
}
|
||||
|
||||
async searchByName(connectionName: string, query: string, limit: number): Promise<RawSchemaHit[]> {
|
||||
const catalog = await this.loadCatalog(connectionName);
|
||||
if (!catalog) {
|
||||
return [];
|
||||
}
|
||||
const hits: RawSchemaHit[] = [];
|
||||
for (const table of catalog.tables as TableWithDescriptions[]) {
|
||||
const tableMatch = matchedOnTable(table, query);
|
||||
if (tableMatch) {
|
||||
hits.push({
|
||||
kind: 'table',
|
||||
ref: { catalog: table.catalog, db: table.db, name: table.name },
|
||||
display: formatDisplay(catalog.driver, table),
|
||||
matchedOn: tableMatch,
|
||||
});
|
||||
}
|
||||
for (const column of table.columns) {
|
||||
const columnMatch = matchedOnColumn(column, query);
|
||||
if (!columnMatch) {
|
||||
continue;
|
||||
}
|
||||
hits.push({
|
||||
kind: 'column',
|
||||
ref: { catalog: table.catalog, db: table.db, name: table.name, column: column.name },
|
||||
display: `${formatDisplay(catalog.driver, table)}.${column.name}`,
|
||||
matchedOn: columnMatch,
|
||||
});
|
||||
}
|
||||
}
|
||||
return hits.slice(0, Math.max(0, limit));
|
||||
}
|
||||
|
||||
private loadCatalog(connectionName: string): Promise<ConnectionCatalog | null> {
|
||||
const existing = this.catalogs.get(connectionName);
|
||||
if (existing) {
|
||||
return existing;
|
||||
}
|
||||
const pending = this.readCatalog(connectionName);
|
||||
this.catalogs.set(connectionName, pending);
|
||||
return pending;
|
||||
}
|
||||
|
||||
private async readCatalog(connectionName: string): Promise<ConnectionCatalog | null> {
|
||||
const root = `raw-sources/${connectionName}/live-database`;
|
||||
const listed = await this.deps.fileStore.listFiles(root);
|
||||
const connectionFiles = listed.files.filter((file) => file.endsWith('/connection.json')).sort();
|
||||
const latestConnectionPath = connectionFiles.at(-1);
|
||||
if (!latestConnectionPath) {
|
||||
return null;
|
||||
}
|
||||
const latestRoot = latestConnectionPath.slice(0, -'/connection.json'.length);
|
||||
const syncId = latestRoot.split('/').at(-1) ?? '';
|
||||
const connection = readJson<ConnectionArtifact>((await this.deps.fileStore.readFile(latestConnectionPath)).content);
|
||||
const tablesListing = await this.deps.fileStore.listFiles(`${latestRoot}/tables`);
|
||||
const tables: KtxSchemaTable[] = [];
|
||||
for (const tablePath of tablesListing.files.filter((file) => file.endsWith('.json')).sort()) {
|
||||
tables.push(readJson<KtxSchemaTable>((await this.deps.fileStore.readFile(tablePath)).content));
|
||||
}
|
||||
|
||||
let profile: RelationshipProfileArtifact | null = null;
|
||||
try {
|
||||
profile = readJson<RelationshipProfileArtifact>(
|
||||
(await this.deps.fileStore.readFile(`${latestRoot}/enrichment/relationship-profile.json`)).content,
|
||||
);
|
||||
} catch {
|
||||
profile = null;
|
||||
}
|
||||
|
||||
return {
|
||||
connectionName,
|
||||
syncId,
|
||||
driver: connection.driver ?? profile?.driver ?? 'postgres',
|
||||
tables,
|
||||
profile,
|
||||
};
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue