diff --git a/packages/context/src/search/discover.test.ts b/packages/context/src/search/discover.test.ts new file mode 100644 index 00000000..a2551c49 --- /dev/null +++ b/packages/context/src/search/discover.test.ts @@ -0,0 +1,264 @@ +import { mkdtemp, rm } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { initKtxProject, type KtxLocalProject } from '../project/index.js'; +import { writeLocalKnowledgePage } from '../wiki/local-knowledge.js'; +import { createKtxDiscoverDataService } from './discover.js'; + +describe('createKtxDiscoverDataService', () => { + let tempDir: string; + let project: KtxLocalProject; + + beforeEach(async () => { + tempDir = await mkdtemp(join(tmpdir(), 'ktx-discover-data-')); + project = await initKtxProject({ projectDir: join(tempDir, 'project'), projectName: 'warehouse' }); + project.config.connections.warehouse = { driver: 'postgres', url: 'env:DATABASE_URL' }; + project.config.connections.billing = { driver: 'postgres', url: 'env:BILLING_DATABASE_URL' }; + }); + + afterEach(async () => { + await rm(tempDir, { recursive: true, force: true }); + }); + + async function seedWiki(): Promise { + await writeLocalKnowledgePage(project, { + key: 'orders-playbook', + scope: 'GLOBAL', + summary: 'Paid order operations', + content: 'Use paid orders and order_count to inspect monthly customer activity for Acme Corp.', + tags: ['orders'], + }); + } + + async function seedSl(): Promise { + await project.fileStore.writeFile( + 'semantic-layer/warehouse/orders.yaml', + [ + 'name: orders', + 'descriptions:', + ' user: Paid order facts', + 'table: public.orders', + 'grain: [id]', + 'columns:', + ' - name: status', + ' type: string', + ' descriptions:', + ' user: Payment status for the order', + ' - name: ordered_at', + ' type: time', + 'measures:', + ' - name: order_count', + ' expr: count(*)', + ' description: Number of paid orders', + '', + ].join('\n'), + 'ktx', + 'ktx@example.com', + 'seed sl source', + ); + } + + async function seedScan(input: { + connectionId?: string; + syncId: string; + tableName?: string; + comment?: string; + sampleValues?: string[]; + }): Promise { + const connectionId = input.connectionId ?? 'warehouse'; + const root = `raw-sources/${connectionId}/live-database/${input.syncId}`; + const tableName = input.tableName ?? 'orders'; + await project.fileStore.writeFile( + `${root}/connection.json`, + JSON.stringify( + { + connectionId, + driver: 'postgres', + extractedAt: `2026-05-14T09:00:00.000Z`, + scope: { schemas: ['public'] }, + }, + null, + 2, + ), + 'ktx', + 'ktx@example.com', + 'seed scan connection', + ); + await project.fileStore.writeFile( + `${root}/tables/public-${tableName}.json`, + JSON.stringify( + { + catalog: null, + db: 'public', + name: tableName, + kind: 'table', + comment: input.comment ?? 'Orders table from warehouse', + estimatedRows: 123, + descriptions: { db: input.comment ?? 'Orders table from warehouse' }, + columns: [ + { + name: 'id', + nativeType: 'integer', + normalizedType: 'integer', + dimensionType: 'number', + nullable: false, + primaryKey: true, + comment: 'Order id', + }, + { + name: 'status', + nativeType: 'text', + normalizedType: 'text', + dimensionType: 'string', + nullable: false, + primaryKey: false, + comment: 'Order status', + sampleValues: input.sampleValues ?? ['paid', 'pending'], + }, + ], + foreignKeys: [], + }, + null, + 2, + ), + 'ktx', + 'ktx@example.com', + 'seed table', + ); + await project.fileStore.writeFile( + `${root}/scan-report.json`, + JSON.stringify( + { + connectionId, + driver: 'postgres', + syncId: input.syncId, + runId: `scan-${input.syncId}`, + trigger: 'mcp', + mode: 'enriched', + dryRun: false, + artifactPaths: { + rawSourcesDir: root, + reportPath: `${root}/scan-report.json`, + manifestShards: [], + enrichmentArtifacts: [], + }, + diffSummary: { + tablesAdded: 1, + tablesModified: 0, + tablesDeleted: 0, + tablesUnchanged: 0, + columnsAdded: 0, + columnsModified: 0, + columnsDeleted: 0, + }, + manifestShardsWritten: 0, + structuralSyncStats: { + tablesCreated: 0, + tablesUpdated: 0, + tablesDeleted: 0, + columnsCreated: 0, + columnsUpdated: 0, + columnsDeleted: 0, + }, + enrichment: { + dataDictionary: 'completed', + tableDescriptions: 'completed', + columnDescriptions: 'completed', + embeddings: 'skipped', + deterministicRelationships: 'skipped', + llmRelationshipValidation: 'skipped', + statisticalValidation: 'skipped', + }, + capabilityGaps: [], + warnings: [], + relationships: { accepted: 0, review: 0, rejected: 0, skipped: 0 }, + enrichmentState: { resumedStages: [], completedStages: [], failedStages: [] }, + createdAt: '2026-05-14T09:00:00.000Z', + }, + null, + 2, + ), + 'ktx', + 'ktx@example.com', + 'seed scan report', + ); + } + + it('returns unified ranked refs across wiki, semantic-layer, and raw schema', async () => { + await seedWiki(); + await seedSl(); + await seedScan({ syncId: 'sync-1', sampleValues: ['paid', 'refunded'] }); + const service = createKtxDiscoverDataService(project, { userId: 'local-user' }); + + const results = await service.search({ query: 'paid orders', connectionId: 'warehouse', limit: 10 }); + + expect(results.map((result) => result.kind)).toEqual( + expect.arrayContaining(['wiki', 'sl_source', 'sl_measure', 'sl_dimension', 'table', 'column']), + ); + expect(results.every((result) => result.score >= 0 && result.score <= 1)).toBe(true); + expect(results.every((result) => result.snippet === null || result.snippet.length <= 200)).toBe(true); + expect(results).toContainEqual( + expect.objectContaining({ + kind: 'table', + id: 'public.orders', + connectionId: 'warehouse', + tableRef: { catalog: null, db: 'public', name: 'orders' }, + matchedOn: expect.stringMatching(/name|description|comment|display/), + }), + ); + expect(results).toContainEqual( + expect.objectContaining({ + kind: 'column', + id: 'public.orders.status', + connectionId: 'warehouse', + columnName: 'status', + matchedOn: expect.stringMatching(/name|comment|description|sample_value/), + }), + ); + expect(results).toContainEqual( + expect.objectContaining({ + kind: 'sl_measure', + id: 'orders.order_count', + connectionId: 'warehouse', + summary: 'Number of paid orders', + snippet: 'count(*)', + matchedOn: expect.stringMatching(/name|description|expr/), + }), + ); + }); + + it('honors kind filters and connection scope', async () => { + await seedWiki(); + await seedSl(); + await seedScan({ syncId: 'sync-1', connectionId: 'warehouse', tableName: 'orders' }); + await seedScan({ syncId: 'sync-2', connectionId: 'billing', tableName: 'invoices', comment: 'Billing invoices' }); + const service = createKtxDiscoverDataService(project); + + const results = await service.search({ + query: 'orders', + connectionId: 'warehouse', + kinds: ['table', 'column'], + limit: 10, + }); + + expect(results.every((result) => result.kind === 'table' || result.kind === 'column')).toBe(true); + expect(results.every((result) => result.connectionId === 'warehouse')).toBe(true); + expect(results.some((result) => result.id.includes('invoices'))).toBe(false); + expect(results.some((result) => result.kind === 'wiki')).toBe(false); + }); + + it('re-reads the latest scan artifacts on each call', async () => { + await seedScan({ syncId: 'sync-1', tableName: 'orders', comment: 'Old orders table' }); + const service = createKtxDiscoverDataService(project); + await expect( + service.search({ query: 'orders', connectionId: 'warehouse', kinds: ['table'], limit: 10 }), + ).resolves.toEqual(expect.arrayContaining([expect.objectContaining({ id: 'public.orders' })])); + + await seedScan({ syncId: 'sync-2', tableName: 'invoices', comment: 'Invoice facts' }); + const fresh = await service.search({ query: 'invoice', connectionId: 'warehouse', kinds: ['table'], limit: 10 }); + + expect(fresh).toEqual(expect.arrayContaining([expect.objectContaining({ id: 'public.invoices' })])); + expect(fresh.some((result) => result.id === 'public.orders')).toBe(false); + }); +}); diff --git a/packages/context/src/search/discover.ts b/packages/context/src/search/discover.ts new file mode 100644 index 00000000..53694f6a --- /dev/null +++ b/packages/context/src/search/discover.ts @@ -0,0 +1,466 @@ +import type { KtxEmbeddingPort } from '../core/index.js'; +import type { KtxLocalProject } from '../project/index.js'; +import type { KtxScanReport, KtxSchemaColumn, KtxSchemaTable, KtxTableRef } from '../scan/index.js'; +import { DEFAULT_PRIORITY, loadLocalSlSourceRecords, resolveDescription } from '../sl/index.js'; +import { readLocalKnowledgePage, searchLocalKnowledgePages } from '../wiki/local-knowledge.js'; +import { HybridSearchCore, type FusedSearchCandidate, type SearchCandidateGenerator } from './index.js'; + +export type KtxDiscoverDataKind = 'wiki' | 'sl_source' | 'sl_measure' | 'sl_dimension' | 'table' | 'column'; +export type KtxDiscoverDataMatchedOn = 'name' | 'display' | 'description' | 'comment' | 'expr' | 'sample_value' | 'body'; + +export interface KtxDiscoverDataInput { + query: string; + connectionId?: string; + kinds?: KtxDiscoverDataKind[]; + limit?: number; +} + +export interface KtxDiscoverDataRef { + kind: KtxDiscoverDataKind; + id: string; + score: number; + summary: string | null; + snippet: string | null; + matchedOn: KtxDiscoverDataMatchedOn; + connectionId?: string; + tableRef?: KtxTableRef; + columnName?: string; +} + +export type KtxDiscoverDataResponse = KtxDiscoverDataRef[]; + +export interface KtxDiscoverDataServiceOptions { + userId?: string; + embeddingService?: KtxEmbeddingPort | null; +} + +interface CandidateRecord { + ref: Omit; + rankScore: number; +} + +type RawTable = KtxSchemaTable & { + descriptions?: Record; + columns: Array; sampleValues?: unknown[] }>; +}; + +interface LatestScan { + report: KtxScanReport; + rawSourcesDir: string; + tables: RawTable[]; +} + +const ALL_KINDS: KtxDiscoverDataKind[] = ['wiki', 'sl_source', 'sl_measure', 'sl_dimension', 'table', 'column']; + +function normalize(value: string | null | undefined): string { + return (value ?? '').toLowerCase(); +} + +function queryTerms(query: string): string[] { + return query + .toLowerCase() + .split(/[^a-z0-9_]+/u) + .map((term) => term.trim()) + .filter(Boolean); +} + +function hasKind(kinds: ReadonlySet, kind: KtxDiscoverDataKind): boolean { + return kinds.has(kind); +} + +function cap200(value: string | null | undefined): string | null { + if (!value) { + return null; + } + const compact = value.replace(/\s+/g, ' ').trim(); + return compact.length > 200 ? compact.slice(0, 200) : compact; +} + +function snippetAround(text: string | null | undefined, terms: readonly string[]): string | null { + if (!text) { + return null; + } + const lower = text.toLowerCase(); + const index = + terms + .map((term) => lower.indexOf(term)) + .filter((position) => position >= 0) + .sort((a, b) => a - b)[0] ?? 0; + return cap200(text.slice(Math.max(0, index - 60), index + 140)); +} + +function textScore(value: string | null | undefined, terms: readonly string[]): number { + const haystack = normalize(value); + if (!haystack || terms.length === 0) { + return 0; + } + const matched = terms.filter((term) => haystack.includes(term)).length; + return matched / terms.length; +} + +function bestField( + fields: Array<{ matchedOn: KtxDiscoverDataMatchedOn; text: string | null | undefined; weight: number }>, + terms: readonly string[], +): { matchedOn: KtxDiscoverDataMatchedOn; score: number; text: string | null } | null { + const scored = fields + .map((field) => ({ + matchedOn: field.matchedOn, + score: textScore(field.text, terms) * field.weight, + text: field.text ?? null, + })) + .filter((field) => field.score > 0) + .sort((left, right) => right.score - left.score || left.matchedOn.localeCompare(right.matchedOn)); + return scored[0] ?? null; +} + +function displayForTable(table: KtxTableRef): string { + return [table.catalog, table.db, table.name].filter((part): part is string => Boolean(part)).join('.'); +} + +function tableRef(table: KtxSchemaTable): KtxTableRef { + return { catalog: table.catalog, db: table.db, name: table.name }; +} + +async function readJson(project: KtxLocalProject, path: string): Promise { + return JSON.parse((await project.fileStore.readFile(path)).content) as T; +} + +async function latestScan(project: KtxLocalProject, connectionId: string): Promise { + const root = `raw-sources/${connectionId}/live-database`; + let files: string[]; + try { + files = (await project.fileStore.listFiles(root)).files; + } catch { + return null; + } + + const reportPath = files + .filter((path) => path.endsWith('/scan-report.json')) + .sort() + .at(-1); + if (!reportPath) { + return null; + } + const report = await readJson(project, reportPath); + const rawSourcesDir = report.artifactPaths.rawSourcesDir ?? reportPath.slice(0, -'/scan-report.json'.length); + const listedTables = await project.fileStore.listFiles(`${rawSourcesDir}/tables`); + const tables: RawTable[] = []; + for (const path of listedTables.files.filter((file) => file.endsWith('.json')).sort()) { + tables.push(await readJson(project, path)); + } + return { report, rawSourcesDir, tables }; +} + +function configuredConnectionIds(project: KtxLocalProject, connectionId?: string): string[] { + return connectionId ? [connectionId] : Object.keys(project.config.connections).sort(); +} + +async function wikiCandidates( + project: KtxLocalProject, + input: KtxDiscoverDataInput, + options: KtxDiscoverDataServiceOptions, + terms: readonly string[], +): Promise { + const searchResults = await searchLocalKnowledgePages(project, { + query: input.query, + userId: options.userId, + embeddingService: options.embeddingService ?? null, + limit: Math.max(input.limit ?? 15, 25), + }); + const records: CandidateRecord[] = []; + for (const result of searchResults) { + const page = await readLocalKnowledgePage(project, { key: result.key, userId: options.userId }); + const content = page?.content ?? ''; + const matched = bestField( + [ + { matchedOn: 'name', text: result.key, weight: 1.1 }, + { matchedOn: 'description', text: result.summary, weight: 1 }, + { matchedOn: 'body', text: content, weight: 0.8 }, + ], + terms, + ); + records.push({ + rankScore: result.score + (matched?.score ?? 0), + ref: { + kind: 'wiki', + id: result.key, + summary: result.summary || null, + snippet: snippetAround(content, terms), + matchedOn: matched?.matchedOn ?? 'body', + }, + }); + } + return records.sort((left, right) => right.rankScore - left.rankScore || left.ref.id.localeCompare(right.ref.id)); +} + +async function slCandidates( + project: KtxLocalProject, + input: KtxDiscoverDataInput, + kinds: ReadonlySet, + terms: readonly string[], +): Promise { + const records: CandidateRecord[] = []; + for (const connectionId of configuredConnectionIds(project, input.connectionId)) { + const sources = await loadLocalSlSourceRecords(project, { connectionId }).catch(() => []); + for (const sourceRecord of sources) { + const source = sourceRecord.source; + if (hasKind(kinds, 'sl_source')) { + const description = resolveDescription(source.descriptions, { priority: DEFAULT_PRIORITY }); + const matched = bestField( + [ + { matchedOn: 'name', text: source.name, weight: 1.2 }, + { matchedOn: 'description', text: description, weight: 1 }, + { matchedOn: 'display', text: source.table ?? source.sql ?? null, weight: 0.8 }, + ], + terms, + ); + if (matched) { + records.push({ + rankScore: matched.score, + ref: { + kind: 'sl_source', + id: source.name, + connectionId, + summary: description, + snippet: + matched.matchedOn === 'description' + ? snippetAround(description, terms) + : cap200( + `${source.name}: ${[ + ...source.measures.map((measure) => measure.name), + ...source.columns.map((column) => column.name), + ] + .slice(0, 3) + .join(', ')}`, + ), + matchedOn: matched.matchedOn, + }, + }); + } + } + + if (hasKind(kinds, 'sl_measure')) { + for (const measure of source.measures) { + const matched = bestField( + [ + { matchedOn: 'name', text: measure.name, weight: 1.2 }, + { matchedOn: 'description', text: measure.description, weight: 1 }, + { matchedOn: 'expr', text: measure.expr, weight: 0.9 }, + ], + terms, + ); + if (matched) { + records.push({ + rankScore: matched.score, + ref: { + kind: 'sl_measure', + id: `${source.name}.${measure.name}`, + connectionId, + summary: measure.description ?? null, + snippet: cap200(measure.expr), + matchedOn: matched.matchedOn, + }, + }); + } + } + } + + if (hasKind(kinds, 'sl_dimension')) { + for (const column of source.columns) { + const description = resolveDescription(column.descriptions, { priority: DEFAULT_PRIORITY }); + const matched = bestField( + [ + { matchedOn: 'name', text: column.name, weight: 1.2 }, + { matchedOn: 'display', text: `${source.name}.${column.name}`, weight: 1.1 }, + { matchedOn: 'description', text: description, weight: 1 }, + { matchedOn: 'expr', text: column.expr, weight: 0.9 }, + ], + terms, + ); + if (matched) { + records.push({ + rankScore: matched.score, + ref: { + kind: 'sl_dimension', + id: `${source.name}.${column.name}`, + connectionId, + summary: description, + snippet: cap200(`${column.name} (${column.type})`), + matchedOn: matched.matchedOn, + }, + }); + } + } + } + } + } + return records.sort((left, right) => right.rankScore - left.rankScore || left.ref.id.localeCompare(right.ref.id)); +} + +async function rawCandidates( + project: KtxLocalProject, + input: KtxDiscoverDataInput, + kinds: ReadonlySet, + terms: readonly string[], +): Promise { + const records: CandidateRecord[] = []; + for (const connectionId of configuredConnectionIds(project, input.connectionId)) { + const scan = await latestScan(project, connectionId); + if (!scan) { + continue; + } + for (const table of scan.tables) { + const ref = tableRef(table); + const display = displayForTable(ref); + const tableDescription = resolveDescription(table.descriptions, { priority: DEFAULT_PRIORITY }) ?? table.comment; + if (hasKind(kinds, 'table')) { + const matched = bestField( + [ + { matchedOn: 'name', text: table.name, weight: 1.2 }, + { matchedOn: 'display', text: display, weight: 1.1 }, + { matchedOn: 'description', text: tableDescription, weight: 1 }, + { matchedOn: 'comment', text: table.comment, weight: 1 }, + ], + terms, + ); + if (matched) { + records.push({ + rankScore: matched.score, + ref: { + kind: 'table', + id: display, + connectionId, + tableRef: ref, + summary: tableDescription, + snippet: + matched.matchedOn === 'description' || matched.matchedOn === 'comment' + ? snippetAround(matched.text, terms) + : cap200(table.columns.slice(0, 5).map((column) => column.name).join(', ')), + matchedOn: matched.matchedOn, + }, + }); + } + } + + if (hasKind(kinds, 'column')) { + for (const column of table.columns) { + const columnDescription = resolveDescription(column.descriptions, { priority: DEFAULT_PRIORITY }) ?? column.comment; + const samples = (column.sampleValues ?? []).map((value) => String(value)).slice(0, 5); + const matched = bestField( + [ + { matchedOn: 'name', text: column.name, weight: 1.2 }, + { matchedOn: 'display', text: `${display}.${column.name}`, weight: 1.1 }, + { matchedOn: 'description', text: columnDescription, weight: 1 }, + { matchedOn: 'comment', text: column.comment, weight: 1 }, + { matchedOn: 'sample_value', text: samples.join(' '), weight: 1.3 }, + ], + terms, + ); + if (matched) { + records.push({ + rankScore: matched.score, + ref: { + kind: 'column', + id: `${display}.${column.name}`, + connectionId, + tableRef: ref, + columnName: column.name, + summary: columnDescription, + snippet: + matched.matchedOn === 'sample_value' + ? cap200(`${column.nativeType} - samples: ${samples.join(', ')}`) + : matched.matchedOn === 'description' || matched.matchedOn === 'comment' + ? snippetAround(matched.text, terms) + : cap200(column.nativeType), + matchedOn: matched.matchedOn, + }, + }); + } + } + } + } + } + return records.sort((left, right) => right.rankScore - left.rankScore || left.ref.id.localeCompare(right.ref.id)); +} + +function generator( + name: string, + candidates: CandidateRecord[], + refsByKey: Map>, +): SearchCandidateGenerator { + candidates.forEach((candidate) => + refsByKey.set(`${candidate.ref.kind}:${candidate.ref.connectionId ?? ''}:${candidate.ref.id}`, candidate.ref), + ); + return { + lane: name, + weight: 1, + async generate() { + return { + candidates: candidates.map((candidate, index) => ({ + id: `${candidate.ref.kind}:${candidate.ref.connectionId ?? ''}:${candidate.ref.id}`, + rank: index + 1, + rawScore: candidate.rankScore, + })), + }; + }, + }; +} + +function hydrate( + fused: FusedSearchCandidate[], + refsByKey: Map>, +): KtxDiscoverDataRef[] { + const maxScore = Math.max(...fused.map((candidate) => candidate.score), 0); + return fused + .map((candidate) => { + const ref = refsByKey.get(candidate.id); + if (!ref) { + return null; + } + return { + ...ref, + score: maxScore > 0 ? Number((candidate.score / maxScore).toFixed(6)) : 0, + }; + }) + .filter((result): result is KtxDiscoverDataRef => result !== null); +} + +export function createKtxDiscoverDataService( + project: KtxLocalProject, + options: KtxDiscoverDataServiceOptions = {}, +): { search(input: KtxDiscoverDataInput): Promise } { + return { + async search(input) { + const limit = Math.max(1, Math.min(input.limit ?? 15, 50)); + const query = input.query.trim(); + if (!query) { + return []; + } + const kinds = new Set(input.kinds ?? ALL_KINDS); + const terms = queryTerms(query); + const refsByKey = new Map>(); + const generators: SearchCandidateGenerator[] = []; + + if (hasKind(kinds, 'wiki')) { + generators.push(generator('wiki', await wikiCandidates(project, { ...input, limit }, options, terms), refsByKey)); + } + if (hasKind(kinds, 'sl_source') || hasKind(kinds, 'sl_measure') || hasKind(kinds, 'sl_dimension')) { + generators.push(generator('semantic_layer', await slCandidates(project, { ...input, limit }, kinds, terms), refsByKey)); + } + if (hasKind(kinds, 'table') || hasKind(kinds, 'column')) { + generators.push(generator('raw_schema', await rawCandidates(project, { ...input, limit }, kinds, terms), refsByKey)); + } + if (generators.length === 0) { + return []; + } + + const result = await new HybridSearchCore().search({ + queryText: query, + limit, + generators, + laneWeights: { wiki: 1, semantic_layer: 1, raw_schema: 1 }, + }); + return hydrate(result.results, refsByKey); + }, + }; +} diff --git a/packages/context/src/search/index.ts b/packages/context/src/search/index.ts index a62ae7bc..9cec3602 100644 --- a/packages/context/src/search/index.ts +++ b/packages/context/src/search/index.ts @@ -10,6 +10,15 @@ export { assertSearchBackendCapabilities, assertSearchBackendConformanceCase, } from './backend-conformance.js'; +export { createKtxDiscoverDataService } from './discover.js'; +export type { + KtxDiscoverDataInput, + KtxDiscoverDataKind, + KtxDiscoverDataMatchedOn, + KtxDiscoverDataRef, + KtxDiscoverDataResponse, + KtxDiscoverDataServiceOptions, +} from './discover.js'; export { HybridSearchCore } from './hybrid-search-core.js'; export { defaultLaneCandidatePoolLimit, normalizeSearchQuery } from './query.js'; export {