From d0b8996456b61fdfbb998dfef08f8151b8249ba3 Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov <7889985+andreybavt@users.noreply.github.com> Date: Thu, 14 May 2026 18:21:52 +0200 Subject: [PATCH] feat(context): add dictionary search service --- .../context/src/sl/dictionary-search.test.ts | 228 ++++++++++++++++++ packages/context/src/sl/dictionary-search.ts | 214 ++++++++++++++++ packages/context/src/sl/index.ts | 12 + 3 files changed, 454 insertions(+) create mode 100644 packages/context/src/sl/dictionary-search.test.ts create mode 100644 packages/context/src/sl/dictionary-search.ts diff --git a/packages/context/src/sl/dictionary-search.test.ts b/packages/context/src/sl/dictionary-search.test.ts new file mode 100644 index 00000000..29a269c7 --- /dev/null +++ b/packages/context/src/sl/dictionary-search.test.ts @@ -0,0 +1,228 @@ +import { mkdtemp, rm } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { initKtxProject, type KtxLocalProject } from '../project/index.js'; +import { createKtxDictionarySearchService } from './dictionary-search.js'; + +describe('createKtxDictionarySearchService', () => { + let tempDir: string; + let project: KtxLocalProject; + + beforeEach(async () => { + tempDir = await mkdtemp(join(tmpdir(), 'ktx-dictionary-search-')); + project = await initKtxProject({ projectDir: join(tempDir, 'project'), projectName: 'warehouse' }); + project.config.connections.warehouse = { driver: 'postgres', url: 'env:DATABASE_URL' }; + project.config.connections.billing = { driver: 'postgres', url: 'env:BILLING_DATABASE_URL' }; + }); + + afterEach(async () => { + await rm(tempDir, { recursive: true, force: true }); + }); + + async function seedProfile(input: { + connectionId: string; + syncId: string; + columns: Record; + }): Promise { + await project.fileStore.writeFile( + `raw-sources/${input.connectionId}/live-database/${input.syncId}/enrichment/relationship-profile.json`, + `${JSON.stringify( + { + connectionId: input.connectionId, + driver: 'postgres', + sqlAvailable: true, + queryCount: 4, + tables: [], + columns: input.columns, + warnings: [], + }, + null, + 2, + )}\n`, + 'ktx', + 'ktx@example.com', + 'Seed relationship profile', + ); + } + + it('returns matches and non-authoritative misses across configured connections', async () => { + await seedProfile({ + connectionId: 'warehouse', + syncId: 'sync-1', + columns: { + 'orders.status': { + table: { catalog: null, db: 'public', name: 'orders' }, + column: 'status', + nativeType: 'text', + normalizedType: 'string', + distinctCount: 3, + sampleValues: ['paid', 'refunded', 'pending'], + }, + }, + }); + await seedProfile({ + connectionId: 'billing', + syncId: 'sync-2', + columns: { + 'customers.name': { + table: { catalog: null, db: 'public', name: 'customers' }, + column: 'name', + nativeType: 'text', + normalizedType: 'string', + distinctCount: 4, + sampleValues: ['Acme Corp', 'Globex'], + }, + }, + }); + const service = createKtxDictionarySearchService(project); + + await expect(service.search({ values: ['PAID', 'missing'] })).resolves.toEqual({ + searched: [ + { + connectionId: 'billing', + coverage: { + sampledRows: null, + valuesPerColumn: null, + profiledColumns: 1, + syncId: 'sync-2', + profiledAt: null, + }, + status: 'ready', + }, + { + connectionId: 'warehouse', + coverage: { + sampledRows: null, + valuesPerColumn: null, + profiledColumns: 1, + syncId: 'sync-1', + profiledAt: null, + }, + status: 'ready', + }, + ], + results: [ + { + value: 'PAID', + matches: [ + { + connectionId: 'warehouse', + sourceName: 'orders', + columnName: 'status', + matchedValue: 'paid', + cardinality: 3, + }, + ], + misses: [{ connectionId: 'billing', reason: 'value_not_in_sample' }], + }, + { + value: 'missing', + matches: [], + misses: [ + { connectionId: 'billing', reason: 'value_not_in_sample' }, + { connectionId: 'warehouse', reason: 'value_not_in_sample' }, + ], + }, + ], + }); + }); + + it('distinguishes missing profile artifacts from profiles with no candidate columns', async () => { + await seedProfile({ + connectionId: 'billing', + syncId: 'sync-empty', + columns: { + 'events.id': { + table: { catalog: null, db: 'public', name: 'events' }, + column: 'id', + nativeType: 'integer', + normalizedType: 'integer', + distinctCount: 100, + sampleValues: [1, 2, 3], + }, + }, + }); + const service = createKtxDictionarySearchService(project); + + await expect(service.search({ values: ['Acme'] })).resolves.toEqual({ + searched: [ + { + connectionId: 'billing', + coverage: { + sampledRows: null, + valuesPerColumn: null, + profiledColumns: 0, + syncId: 'sync-empty', + profiledAt: null, + }, + status: 'no_candidate_columns', + }, + { + connectionId: 'warehouse', + coverage: { + sampledRows: null, + valuesPerColumn: null, + profiledColumns: 0, + syncId: null, + profiledAt: null, + }, + status: 'no_profile_artifact', + }, + ], + results: [ + { + value: 'Acme', + matches: [], + misses: [ + { connectionId: 'billing', reason: 'no_candidate_columns' }, + { connectionId: 'warehouse', reason: 'no_profile_artifact' }, + ], + }, + ], + }); + }); + + it('scopes search to the requested connection', async () => { + await seedProfile({ + connectionId: 'warehouse', + syncId: 'sync-1', + columns: { + 'orders.status': { + table: { catalog: null, db: 'public', name: 'orders' }, + column: 'status', + nativeType: 'text', + normalizedType: 'string', + distinctCount: 3, + sampleValues: ['paid'], + }, + }, + }); + await seedProfile({ + connectionId: 'billing', + syncId: 'sync-2', + columns: { + 'invoices.status': { + table: { catalog: null, db: 'public', name: 'invoices' }, + column: 'status', + nativeType: 'text', + normalizedType: 'string', + distinctCount: 2, + sampleValues: ['paid'], + }, + }, + }); + const service = createKtxDictionarySearchService(project); + + await expect(service.search({ connectionId: 'billing', values: ['paid'] })).resolves.toMatchObject({ + searched: [{ connectionId: 'billing', status: 'ready' }], + results: [ + { + value: 'paid', + matches: [{ connectionId: 'billing', sourceName: 'invoices', columnName: 'status', matchedValue: 'paid' }], + misses: [], + }, + ], + }); + }); +}); diff --git a/packages/context/src/sl/dictionary-search.ts b/packages/context/src/sl/dictionary-search.ts new file mode 100644 index 00000000..041b828d --- /dev/null +++ b/packages/context/src/sl/dictionary-search.ts @@ -0,0 +1,214 @@ +import type { KtxLocalProject } from '../project/index.js'; +import { loadLatestSlDictionaryEntries, type SlDictionaryEntry } from './sl-dictionary-profile.js'; + +export type KtxDictionarySearchStatus = 'ready' | 'no_profile_artifact' | 'no_candidate_columns'; +export type KtxDictionarySearchMissReason = 'no_profile_artifact' | 'no_candidate_columns' | 'value_not_in_sample'; + +export interface KtxDictionarySearchInput { + values: string[]; + connectionId?: string; +} + +export interface KtxDictionarySearchCoverage { + sampledRows: number | null; + valuesPerColumn: number | null; + profiledColumns: number; + syncId: string | null; + profiledAt: string | null; +} + +export interface KtxDictionarySearchSearchedConnection { + connectionId: string; + coverage: KtxDictionarySearchCoverage; + status: KtxDictionarySearchStatus; +} + +export interface KtxDictionarySearchMatch { + connectionId: string; + sourceName: string; + columnName: string; + matchedValue: string; + cardinality: number | null; +} + +export interface KtxDictionarySearchMiss { + connectionId: string; + reason: KtxDictionarySearchMissReason; +} + +export interface KtxDictionarySearchValueResult { + value: string; + matches: KtxDictionarySearchMatch[]; + misses: KtxDictionarySearchMiss[]; +} + +export interface KtxDictionarySearchResponse { + searched: KtxDictionarySearchSearchedConnection[]; + results: KtxDictionarySearchValueResult[]; +} + +interface RelationshipProfileArtifact { + connectionId?: string; + profileSampleRows?: unknown; + sampleValuesPerColumn?: unknown; + profiledAt?: unknown; + extractedAt?: unknown; +} + +function uniqueSorted(values: Iterable): string[] { + return [...new Set([...values].filter((value) => value.trim().length > 0))].sort((left, right) => + left.localeCompare(right), + ); +} + +function latestProfileSyncId(path: string): string | null { + const parts = path.split('/'); + return parts.at(-3) ?? null; +} + +function optionalNumber(value: unknown): number | null { + return typeof value === 'number' && Number.isFinite(value) ? value : null; +} + +function optionalString(value: unknown): string | null { + return typeof value === 'string' && value.trim().length > 0 ? value : null; +} + +async function latestProfilePath(project: KtxLocalProject, connectionId: string): Promise { + const root = `raw-sources/${connectionId}/live-database`; + let files: string[]; + try { + files = (await project.fileStore.listFiles(root)).files; + } catch { + return null; + } + + return ( + files + .filter((path) => path.endsWith('/enrichment/relationship-profile.json')) + .sort((left, right) => left.localeCompare(right)) + .at(-1) ?? null + ); +} + +async function readProfile(project: KtxLocalProject, path: string): Promise { + const raw = await project.fileStore.readFile(path); + const parsed = JSON.parse(raw.content) as unknown; + return typeof parsed === 'object' && parsed !== null && !Array.isArray(parsed) + ? (parsed as RelationshipProfileArtifact) + : {}; +} + +function profiledColumnCount(entries: readonly SlDictionaryEntry[]): number { + return new Set(entries.map((entry) => `${entry.sourceName}\u001f${entry.columnName}`)).size; +} + +async function searchedConnection( + project: KtxLocalProject, + connectionId: string, + entries: readonly SlDictionaryEntry[], +): Promise { + const path = await latestProfilePath(project, connectionId); + if (!path) { + return { + connectionId, + coverage: { + sampledRows: null, + valuesPerColumn: null, + profiledColumns: 0, + syncId: null, + profiledAt: null, + }, + status: 'no_profile_artifact', + }; + } + + const profile = await readProfile(project, path); + const count = profiledColumnCount(entries); + return { + connectionId, + coverage: { + sampledRows: optionalNumber(profile.profileSampleRows), + valuesPerColumn: optionalNumber(profile.sampleValuesPerColumn), + profiledColumns: count, + syncId: latestProfileSyncId(path), + profiledAt: optionalString(profile.profiledAt) ?? optionalString(profile.extractedAt), + }, + status: count > 0 ? 'ready' : 'no_candidate_columns', + }; +} + +function entryMatchesValue(entry: SlDictionaryEntry, value: string): boolean { + return entry.value.toLowerCase().includes(value.toLowerCase()); +} + +function toMatch(entry: SlDictionaryEntry): KtxDictionarySearchMatch { + return { + connectionId: entry.connectionId, + sourceName: entry.sourceName, + columnName: entry.columnName, + matchedValue: entry.value, + cardinality: entry.cardinality, + }; +} + +function sortMatches(matches: KtxDictionarySearchMatch[]): KtxDictionarySearchMatch[] { + return matches.sort( + (left, right) => + left.connectionId.localeCompare(right.connectionId) || + left.sourceName.localeCompare(right.sourceName) || + left.columnName.localeCompare(right.columnName) || + left.matchedValue.localeCompare(right.matchedValue), + ); +} + +function missReason(status: KtxDictionarySearchStatus): KtxDictionarySearchMissReason { + return status === 'ready' ? 'value_not_in_sample' : status; +} + +export function createKtxDictionarySearchService(project: KtxLocalProject): { + search(input: KtxDictionarySearchInput): Promise; +} { + return { + async search(input) { + const connectionIds = input.connectionId + ? [input.connectionId] + : uniqueSorted(Object.keys(project.config.connections)); + const entries = await loadLatestSlDictionaryEntries(project, connectionIds); + const entriesByConnection = new Map(); + for (const connectionId of connectionIds) { + entriesByConnection.set( + connectionId, + entries.filter((entry) => entry.connectionId === connectionId), + ); + } + + const searched = ( + await Promise.all( + connectionIds.map((connectionId) => + searchedConnection(project, connectionId, entriesByConnection.get(connectionId) ?? []), + ), + ) + ).sort((left, right) => left.connectionId.localeCompare(right.connectionId)); + const searchedByConnection = new Map(searched.map((connection) => [connection.connectionId, connection])); + + return { + searched, + results: input.values.map((value) => { + const matches = sortMatches(entries.filter((entry) => entryMatchesValue(entry, value)).map(toMatch)); + const matchedConnections = new Set(matches.map((match) => match.connectionId)); + return { + value, + matches, + misses: searched + .filter((connection) => !matchedConnections.has(connection.connectionId)) + .map((connection) => ({ + connectionId: connection.connectionId, + reason: missReason(searchedByConnection.get(connection.connectionId)?.status ?? 'no_profile_artifact'), + })), + }; + }), + }; + }, + }; +} diff --git a/packages/context/src/sl/index.ts b/packages/context/src/sl/index.ts index 1a0167cb..600a5a93 100644 --- a/packages/context/src/sl/index.ts +++ b/packages/context/src/sl/index.ts @@ -25,6 +25,18 @@ export { } from './semantic-layer.service.js'; export { loadLatestSlDictionaryEntries } from './sl-dictionary-profile.js'; export type { SlDictionaryEntry } from './sl-dictionary-profile.js'; +export { createKtxDictionarySearchService } from './dictionary-search.js'; +export type { + KtxDictionarySearchCoverage, + KtxDictionarySearchInput, + KtxDictionarySearchMatch, + KtxDictionarySearchMiss, + KtxDictionarySearchMissReason, + KtxDictionarySearchResponse, + KtxDictionarySearchSearchedConnection, + KtxDictionarySearchStatus, + KtxDictionarySearchValueResult, +} from './dictionary-search.js'; export { buildSemanticLayerSourceSearchText, SlSearchService } from './sl-search.service.js'; export { SqliteSlSourcesIndex, type SqliteSlSourcesIndexOptions } from './sqlite-sl-sources-index.js'; export * from './local-sl.js';