mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-10 08:05:14 +02:00
feat(context): add dictionary search service
This commit is contained in:
parent
ff3a8c5777
commit
d0b8996456
3 changed files with 454 additions and 0 deletions
228
packages/context/src/sl/dictionary-search.test.ts
Normal file
228
packages/context/src/sl/dictionary-search.test.ts
Normal file
|
|
@ -0,0 +1,228 @@
|
|||
import { mkdtemp, rm } from 'node:fs/promises';
|
||||
import { tmpdir } from 'node:os';
|
||||
import { join } from 'node:path';
|
||||
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
|
||||
import { initKtxProject, type KtxLocalProject } from '../project/index.js';
|
||||
import { createKtxDictionarySearchService } from './dictionary-search.js';
|
||||
|
||||
describe('createKtxDictionarySearchService', () => {
|
||||
let tempDir: string;
|
||||
let project: KtxLocalProject;
|
||||
|
||||
beforeEach(async () => {
|
||||
tempDir = await mkdtemp(join(tmpdir(), 'ktx-dictionary-search-'));
|
||||
project = await initKtxProject({ projectDir: join(tempDir, 'project'), projectName: 'warehouse' });
|
||||
project.config.connections.warehouse = { driver: 'postgres', url: 'env:DATABASE_URL' };
|
||||
project.config.connections.billing = { driver: 'postgres', url: 'env:BILLING_DATABASE_URL' };
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await rm(tempDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
async function seedProfile(input: {
|
||||
connectionId: string;
|
||||
syncId: string;
|
||||
columns: Record<string, unknown>;
|
||||
}): Promise<void> {
|
||||
await project.fileStore.writeFile(
|
||||
`raw-sources/${input.connectionId}/live-database/${input.syncId}/enrichment/relationship-profile.json`,
|
||||
`${JSON.stringify(
|
||||
{
|
||||
connectionId: input.connectionId,
|
||||
driver: 'postgres',
|
||||
sqlAvailable: true,
|
||||
queryCount: 4,
|
||||
tables: [],
|
||||
columns: input.columns,
|
||||
warnings: [],
|
||||
},
|
||||
null,
|
||||
2,
|
||||
)}\n`,
|
||||
'ktx',
|
||||
'ktx@example.com',
|
||||
'Seed relationship profile',
|
||||
);
|
||||
}
|
||||
|
||||
it('returns matches and non-authoritative misses across configured connections', async () => {
|
||||
await seedProfile({
|
||||
connectionId: 'warehouse',
|
||||
syncId: 'sync-1',
|
||||
columns: {
|
||||
'orders.status': {
|
||||
table: { catalog: null, db: 'public', name: 'orders' },
|
||||
column: 'status',
|
||||
nativeType: 'text',
|
||||
normalizedType: 'string',
|
||||
distinctCount: 3,
|
||||
sampleValues: ['paid', 'refunded', 'pending'],
|
||||
},
|
||||
},
|
||||
});
|
||||
await seedProfile({
|
||||
connectionId: 'billing',
|
||||
syncId: 'sync-2',
|
||||
columns: {
|
||||
'customers.name': {
|
||||
table: { catalog: null, db: 'public', name: 'customers' },
|
||||
column: 'name',
|
||||
nativeType: 'text',
|
||||
normalizedType: 'string',
|
||||
distinctCount: 4,
|
||||
sampleValues: ['Acme Corp', 'Globex'],
|
||||
},
|
||||
},
|
||||
});
|
||||
const service = createKtxDictionarySearchService(project);
|
||||
|
||||
await expect(service.search({ values: ['PAID', 'missing'] })).resolves.toEqual({
|
||||
searched: [
|
||||
{
|
||||
connectionId: 'billing',
|
||||
coverage: {
|
||||
sampledRows: null,
|
||||
valuesPerColumn: null,
|
||||
profiledColumns: 1,
|
||||
syncId: 'sync-2',
|
||||
profiledAt: null,
|
||||
},
|
||||
status: 'ready',
|
||||
},
|
||||
{
|
||||
connectionId: 'warehouse',
|
||||
coverage: {
|
||||
sampledRows: null,
|
||||
valuesPerColumn: null,
|
||||
profiledColumns: 1,
|
||||
syncId: 'sync-1',
|
||||
profiledAt: null,
|
||||
},
|
||||
status: 'ready',
|
||||
},
|
||||
],
|
||||
results: [
|
||||
{
|
||||
value: 'PAID',
|
||||
matches: [
|
||||
{
|
||||
connectionId: 'warehouse',
|
||||
sourceName: 'orders',
|
||||
columnName: 'status',
|
||||
matchedValue: 'paid',
|
||||
cardinality: 3,
|
||||
},
|
||||
],
|
||||
misses: [{ connectionId: 'billing', reason: 'value_not_in_sample' }],
|
||||
},
|
||||
{
|
||||
value: 'missing',
|
||||
matches: [],
|
||||
misses: [
|
||||
{ connectionId: 'billing', reason: 'value_not_in_sample' },
|
||||
{ connectionId: 'warehouse', reason: 'value_not_in_sample' },
|
||||
],
|
||||
},
|
||||
],
|
||||
});
|
||||
});
|
||||
|
||||
it('distinguishes missing profile artifacts from profiles with no candidate columns', async () => {
|
||||
await seedProfile({
|
||||
connectionId: 'billing',
|
||||
syncId: 'sync-empty',
|
||||
columns: {
|
||||
'events.id': {
|
||||
table: { catalog: null, db: 'public', name: 'events' },
|
||||
column: 'id',
|
||||
nativeType: 'integer',
|
||||
normalizedType: 'integer',
|
||||
distinctCount: 100,
|
||||
sampleValues: [1, 2, 3],
|
||||
},
|
||||
},
|
||||
});
|
||||
const service = createKtxDictionarySearchService(project);
|
||||
|
||||
await expect(service.search({ values: ['Acme'] })).resolves.toEqual({
|
||||
searched: [
|
||||
{
|
||||
connectionId: 'billing',
|
||||
coverage: {
|
||||
sampledRows: null,
|
||||
valuesPerColumn: null,
|
||||
profiledColumns: 0,
|
||||
syncId: 'sync-empty',
|
||||
profiledAt: null,
|
||||
},
|
||||
status: 'no_candidate_columns',
|
||||
},
|
||||
{
|
||||
connectionId: 'warehouse',
|
||||
coverage: {
|
||||
sampledRows: null,
|
||||
valuesPerColumn: null,
|
||||
profiledColumns: 0,
|
||||
syncId: null,
|
||||
profiledAt: null,
|
||||
},
|
||||
status: 'no_profile_artifact',
|
||||
},
|
||||
],
|
||||
results: [
|
||||
{
|
||||
value: 'Acme',
|
||||
matches: [],
|
||||
misses: [
|
||||
{ connectionId: 'billing', reason: 'no_candidate_columns' },
|
||||
{ connectionId: 'warehouse', reason: 'no_profile_artifact' },
|
||||
],
|
||||
},
|
||||
],
|
||||
});
|
||||
});
|
||||
|
||||
it('scopes search to the requested connection', async () => {
|
||||
await seedProfile({
|
||||
connectionId: 'warehouse',
|
||||
syncId: 'sync-1',
|
||||
columns: {
|
||||
'orders.status': {
|
||||
table: { catalog: null, db: 'public', name: 'orders' },
|
||||
column: 'status',
|
||||
nativeType: 'text',
|
||||
normalizedType: 'string',
|
||||
distinctCount: 3,
|
||||
sampleValues: ['paid'],
|
||||
},
|
||||
},
|
||||
});
|
||||
await seedProfile({
|
||||
connectionId: 'billing',
|
||||
syncId: 'sync-2',
|
||||
columns: {
|
||||
'invoices.status': {
|
||||
table: { catalog: null, db: 'public', name: 'invoices' },
|
||||
column: 'status',
|
||||
nativeType: 'text',
|
||||
normalizedType: 'string',
|
||||
distinctCount: 2,
|
||||
sampleValues: ['paid'],
|
||||
},
|
||||
},
|
||||
});
|
||||
const service = createKtxDictionarySearchService(project);
|
||||
|
||||
await expect(service.search({ connectionId: 'billing', values: ['paid'] })).resolves.toMatchObject({
|
||||
searched: [{ connectionId: 'billing', status: 'ready' }],
|
||||
results: [
|
||||
{
|
||||
value: 'paid',
|
||||
matches: [{ connectionId: 'billing', sourceName: 'invoices', columnName: 'status', matchedValue: 'paid' }],
|
||||
misses: [],
|
||||
},
|
||||
],
|
||||
});
|
||||
});
|
||||
});
|
||||
214
packages/context/src/sl/dictionary-search.ts
Normal file
214
packages/context/src/sl/dictionary-search.ts
Normal file
|
|
@ -0,0 +1,214 @@
|
|||
import type { KtxLocalProject } from '../project/index.js';
|
||||
import { loadLatestSlDictionaryEntries, type SlDictionaryEntry } from './sl-dictionary-profile.js';
|
||||
|
||||
export type KtxDictionarySearchStatus = 'ready' | 'no_profile_artifact' | 'no_candidate_columns';
|
||||
export type KtxDictionarySearchMissReason = 'no_profile_artifact' | 'no_candidate_columns' | 'value_not_in_sample';
|
||||
|
||||
export interface KtxDictionarySearchInput {
|
||||
values: string[];
|
||||
connectionId?: string;
|
||||
}
|
||||
|
||||
export interface KtxDictionarySearchCoverage {
|
||||
sampledRows: number | null;
|
||||
valuesPerColumn: number | null;
|
||||
profiledColumns: number;
|
||||
syncId: string | null;
|
||||
profiledAt: string | null;
|
||||
}
|
||||
|
||||
export interface KtxDictionarySearchSearchedConnection {
|
||||
connectionId: string;
|
||||
coverage: KtxDictionarySearchCoverage;
|
||||
status: KtxDictionarySearchStatus;
|
||||
}
|
||||
|
||||
export interface KtxDictionarySearchMatch {
|
||||
connectionId: string;
|
||||
sourceName: string;
|
||||
columnName: string;
|
||||
matchedValue: string;
|
||||
cardinality: number | null;
|
||||
}
|
||||
|
||||
export interface KtxDictionarySearchMiss {
|
||||
connectionId: string;
|
||||
reason: KtxDictionarySearchMissReason;
|
||||
}
|
||||
|
||||
export interface KtxDictionarySearchValueResult {
|
||||
value: string;
|
||||
matches: KtxDictionarySearchMatch[];
|
||||
misses: KtxDictionarySearchMiss[];
|
||||
}
|
||||
|
||||
export interface KtxDictionarySearchResponse {
|
||||
searched: KtxDictionarySearchSearchedConnection[];
|
||||
results: KtxDictionarySearchValueResult[];
|
||||
}
|
||||
|
||||
interface RelationshipProfileArtifact {
|
||||
connectionId?: string;
|
||||
profileSampleRows?: unknown;
|
||||
sampleValuesPerColumn?: unknown;
|
||||
profiledAt?: unknown;
|
||||
extractedAt?: unknown;
|
||||
}
|
||||
|
||||
function uniqueSorted(values: Iterable<string>): string[] {
|
||||
return [...new Set([...values].filter((value) => value.trim().length > 0))].sort((left, right) =>
|
||||
left.localeCompare(right),
|
||||
);
|
||||
}
|
||||
|
||||
function latestProfileSyncId(path: string): string | null {
|
||||
const parts = path.split('/');
|
||||
return parts.at(-3) ?? null;
|
||||
}
|
||||
|
||||
function optionalNumber(value: unknown): number | null {
|
||||
return typeof value === 'number' && Number.isFinite(value) ? value : null;
|
||||
}
|
||||
|
||||
function optionalString(value: unknown): string | null {
|
||||
return typeof value === 'string' && value.trim().length > 0 ? value : null;
|
||||
}
|
||||
|
||||
async function latestProfilePath(project: KtxLocalProject, connectionId: string): Promise<string | null> {
|
||||
const root = `raw-sources/${connectionId}/live-database`;
|
||||
let files: string[];
|
||||
try {
|
||||
files = (await project.fileStore.listFiles(root)).files;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
|
||||
return (
|
||||
files
|
||||
.filter((path) => path.endsWith('/enrichment/relationship-profile.json'))
|
||||
.sort((left, right) => left.localeCompare(right))
|
||||
.at(-1) ?? null
|
||||
);
|
||||
}
|
||||
|
||||
async function readProfile(project: KtxLocalProject, path: string): Promise<RelationshipProfileArtifact> {
|
||||
const raw = await project.fileStore.readFile(path);
|
||||
const parsed = JSON.parse(raw.content) as unknown;
|
||||
return typeof parsed === 'object' && parsed !== null && !Array.isArray(parsed)
|
||||
? (parsed as RelationshipProfileArtifact)
|
||||
: {};
|
||||
}
|
||||
|
||||
function profiledColumnCount(entries: readonly SlDictionaryEntry[]): number {
|
||||
return new Set(entries.map((entry) => `${entry.sourceName}\u001f${entry.columnName}`)).size;
|
||||
}
|
||||
|
||||
async function searchedConnection(
|
||||
project: KtxLocalProject,
|
||||
connectionId: string,
|
||||
entries: readonly SlDictionaryEntry[],
|
||||
): Promise<KtxDictionarySearchSearchedConnection> {
|
||||
const path = await latestProfilePath(project, connectionId);
|
||||
if (!path) {
|
||||
return {
|
||||
connectionId,
|
||||
coverage: {
|
||||
sampledRows: null,
|
||||
valuesPerColumn: null,
|
||||
profiledColumns: 0,
|
||||
syncId: null,
|
||||
profiledAt: null,
|
||||
},
|
||||
status: 'no_profile_artifact',
|
||||
};
|
||||
}
|
||||
|
||||
const profile = await readProfile(project, path);
|
||||
const count = profiledColumnCount(entries);
|
||||
return {
|
||||
connectionId,
|
||||
coverage: {
|
||||
sampledRows: optionalNumber(profile.profileSampleRows),
|
||||
valuesPerColumn: optionalNumber(profile.sampleValuesPerColumn),
|
||||
profiledColumns: count,
|
||||
syncId: latestProfileSyncId(path),
|
||||
profiledAt: optionalString(profile.profiledAt) ?? optionalString(profile.extractedAt),
|
||||
},
|
||||
status: count > 0 ? 'ready' : 'no_candidate_columns',
|
||||
};
|
||||
}
|
||||
|
||||
function entryMatchesValue(entry: SlDictionaryEntry, value: string): boolean {
|
||||
return entry.value.toLowerCase().includes(value.toLowerCase());
|
||||
}
|
||||
|
||||
function toMatch(entry: SlDictionaryEntry): KtxDictionarySearchMatch {
|
||||
return {
|
||||
connectionId: entry.connectionId,
|
||||
sourceName: entry.sourceName,
|
||||
columnName: entry.columnName,
|
||||
matchedValue: entry.value,
|
||||
cardinality: entry.cardinality,
|
||||
};
|
||||
}
|
||||
|
||||
function sortMatches(matches: KtxDictionarySearchMatch[]): KtxDictionarySearchMatch[] {
|
||||
return matches.sort(
|
||||
(left, right) =>
|
||||
left.connectionId.localeCompare(right.connectionId) ||
|
||||
left.sourceName.localeCompare(right.sourceName) ||
|
||||
left.columnName.localeCompare(right.columnName) ||
|
||||
left.matchedValue.localeCompare(right.matchedValue),
|
||||
);
|
||||
}
|
||||
|
||||
function missReason(status: KtxDictionarySearchStatus): KtxDictionarySearchMissReason {
|
||||
return status === 'ready' ? 'value_not_in_sample' : status;
|
||||
}
|
||||
|
||||
export function createKtxDictionarySearchService(project: KtxLocalProject): {
|
||||
search(input: KtxDictionarySearchInput): Promise<KtxDictionarySearchResponse>;
|
||||
} {
|
||||
return {
|
||||
async search(input) {
|
||||
const connectionIds = input.connectionId
|
||||
? [input.connectionId]
|
||||
: uniqueSorted(Object.keys(project.config.connections));
|
||||
const entries = await loadLatestSlDictionaryEntries(project, connectionIds);
|
||||
const entriesByConnection = new Map<string, SlDictionaryEntry[]>();
|
||||
for (const connectionId of connectionIds) {
|
||||
entriesByConnection.set(
|
||||
connectionId,
|
||||
entries.filter((entry) => entry.connectionId === connectionId),
|
||||
);
|
||||
}
|
||||
|
||||
const searched = (
|
||||
await Promise.all(
|
||||
connectionIds.map((connectionId) =>
|
||||
searchedConnection(project, connectionId, entriesByConnection.get(connectionId) ?? []),
|
||||
),
|
||||
)
|
||||
).sort((left, right) => left.connectionId.localeCompare(right.connectionId));
|
||||
const searchedByConnection = new Map(searched.map((connection) => [connection.connectionId, connection]));
|
||||
|
||||
return {
|
||||
searched,
|
||||
results: input.values.map((value) => {
|
||||
const matches = sortMatches(entries.filter((entry) => entryMatchesValue(entry, value)).map(toMatch));
|
||||
const matchedConnections = new Set(matches.map((match) => match.connectionId));
|
||||
return {
|
||||
value,
|
||||
matches,
|
||||
misses: searched
|
||||
.filter((connection) => !matchedConnections.has(connection.connectionId))
|
||||
.map((connection) => ({
|
||||
connectionId: connection.connectionId,
|
||||
reason: missReason(searchedByConnection.get(connection.connectionId)?.status ?? 'no_profile_artifact'),
|
||||
})),
|
||||
};
|
||||
}),
|
||||
};
|
||||
},
|
||||
};
|
||||
}
|
||||
|
|
@ -25,6 +25,18 @@ export {
|
|||
} from './semantic-layer.service.js';
|
||||
export { loadLatestSlDictionaryEntries } from './sl-dictionary-profile.js';
|
||||
export type { SlDictionaryEntry } from './sl-dictionary-profile.js';
|
||||
export { createKtxDictionarySearchService } from './dictionary-search.js';
|
||||
export type {
|
||||
KtxDictionarySearchCoverage,
|
||||
KtxDictionarySearchInput,
|
||||
KtxDictionarySearchMatch,
|
||||
KtxDictionarySearchMiss,
|
||||
KtxDictionarySearchMissReason,
|
||||
KtxDictionarySearchResponse,
|
||||
KtxDictionarySearchSearchedConnection,
|
||||
KtxDictionarySearchStatus,
|
||||
KtxDictionarySearchValueResult,
|
||||
} from './dictionary-search.js';
|
||||
export { buildSemanticLayerSourceSearchText, SlSearchService } from './sl-search.service.js';
|
||||
export { SqliteSlSourcesIndex, type SqliteSlSourcesIndexOptions } from './sqlite-sl-sources-index.js';
|
||||
export * from './local-sl.js';
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue