diff --git a/packages/context/src/ingest/tools/warehouse-verification/entity-details.tool.test.ts b/packages/context/src/ingest/tools/warehouse-verification/entity-details.tool.test.ts new file mode 100644 index 00000000..6511580d --- /dev/null +++ b/packages/context/src/ingest/tools/warehouse-verification/entity-details.tool.test.ts @@ -0,0 +1,137 @@ +import { mkdtemp, rm } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { initKtxProject, type KtxLocalProject } from '../../../project/index.js'; +import type { ToolContext } from '../../../tools/index.js'; +import { EntityDetailsTool } from './entity-details.tool.js'; +import { WarehouseCatalogService } from './warehouse-catalog.service.js'; + +describe('EntityDetailsTool', () => { + let tempDir: string; + let project: KtxLocalProject; + let tool: EntityDetailsTool; + let context: ToolContext; + + beforeEach(async () => { + tempDir = await mkdtemp(join(tmpdir(), 'ktx-entity-details-')); + project = await initKtxProject({ projectDir: join(tempDir, 'project'), projectName: 'warehouse' }); + await seedLiveDatabaseScan(); + tool = new EntityDetailsTool(() => new WarehouseCatalogService({ fileStore: project.fileStore })); + context = { + sourceId: 'ingest', + messageId: 'm1', + userId: 'system', + session: { + allowedConnectionNames: new Set(['warehouse']), + } as any, + }; + }); + + afterEach(async () => { + await rm(tempDir, { recursive: true, force: true }); + }); + + async function seedLiveDatabaseScan(connectionName = 'warehouse', syncId = 'sync-1') { + const root = `raw-sources/${connectionName}/live-database/${syncId}`; + await project.fileStore.writeFile( + `${root}/connection.json`, + JSON.stringify({ connectionId: connectionName, driver: 'postgres', extractedAt: '2026-05-12T00:00:00.000Z' }, null, 2), + 'ktx', + 'ktx@example.com', + 'seed connection', + ); + await project.fileStore.writeFile( + `${root}/tables/orders.json`, + JSON.stringify( + { + catalog: null, + db: 'public', + name: 'orders', + kind: 'table', + comment: 'Customer orders', + estimatedRows: 12, + columns: [ + { + name: 'id', + nativeType: 'integer', + normalizedType: 'integer', + dimensionType: 'number', + nullable: false, + primaryKey: true, + comment: 'Order id', + }, + { + name: 'status', + nativeType: 'text', + normalizedType: 'text', + dimensionType: 'string', + nullable: false, + primaryKey: false, + comment: 'Order status', + }, + ], + foreignKeys: [], + }, + null, + 2, + ), + 'ktx', + 'ktx@example.com', + 'seed orders', + ); + await project.fileStore.writeFile( + `${root}/enrichment/relationship-profile.json`, + JSON.stringify( + { + connectionId: connectionName, + driver: 'postgres', + tables: [{ table: { catalog: null, db: 'public', name: 'orders' }, rowCount: 12 }], + columns: { + 'orders.status': { + table: { catalog: null, db: 'public', name: 'orders' }, + column: 'status', + rowCount: 12, + nullCount: 0, + distinctCount: 2, + nullRate: 0, + sampleValues: ['paid', 'refunded'], + }, + }, + }, + null, + 2, + ), + 'ktx', + 'ktx@example.com', + 'seed profile', + ); + } + + it('returns scoped table detail for a display target', async () => { + const result = await tool.call({ connectionName: 'warehouse', targets: [{ display: 'public.orders' }] }, context); + + expect(result.markdown).toContain('### public.orders'); + expect(result.markdown).toContain('- status (text, nullable=false)'); + expect(result.markdown).toContain('sample: ["paid","refunded"]'); + expect(result.structured.scanAvailable).toBe(true); + expect(result.structured.resolved).toHaveLength(1); + }); + + it('returns a no-scan state distinct from not found', async () => { + const result = await tool.call( + { connectionName: 'empty', targets: [{ display: 'public.orders' }] }, + { ...context, session: { ...context.session!, allowedConnectionNames: new Set(['empty']) } }, + ); + + expect(result.markdown).toContain('No live-database scan available for connection "empty"; run `ktx scan` first.'); + expect(result.structured.scanAvailable).toBe(false); + }); + + it('refuses out-of-scope connections', async () => { + const result = await tool.call({ connectionName: 'billing', targets: [{ display: 'public.orders' }] }, context); + + expect(result.markdown).toContain('Connection "billing" is not available to this ingest stage.'); + expect(result.structured.scanAvailable).toBe(false); + }); +}); diff --git a/packages/context/src/ingest/tools/warehouse-verification/entity-details.tool.ts b/packages/context/src/ingest/tools/warehouse-verification/entity-details.tool.ts new file mode 100644 index 00000000..58f5fb35 --- /dev/null +++ b/packages/context/src/ingest/tools/warehouse-verification/entity-details.tool.ts @@ -0,0 +1,116 @@ +import { z } from 'zod'; +import type { KtxTableRef } from '../../../scan/types.js'; +import { BaseTool, type ToolContext, type ToolOutput } from '../../../tools/index.js'; +import { WarehouseCatalogService, type TableDetail } from './warehouse-catalog.service.js'; + +const targetSchema = z.union([ + z.object({ display: z.string().min(1) }), + z.object({ + catalog: z.string().nullable(), + db: z.string().nullable(), + name: z.string().min(1), + column: z.string().optional(), + }), +]); + +const entityDetailsInputSchema = z.object({ + connectionName: z.string().regex(/^[a-zA-Z0-9][a-zA-Z0-9_-]*$/), + targets: z.array(targetSchema).min(1).max(50), +}); + +type EntityDetailsInput = z.infer; + +export interface EntityDetailsStructured { + resolved: TableDetail[]; + missing: Array<{ target: unknown; candidates: KtxTableRef[] }>; + scanAvailable: boolean; +} + +function allowedConnectionNames(context: ToolContext): ReadonlySet | null { + return context.session?.allowedConnectionNames ?? null; +} + +function sampleText(values: string[]): string { + return values.length > 0 ? ` - sample: ${JSON.stringify(values.slice(0, 10))}` : ''; +} + +function appendTableMarkdown(parts: string[], detail: TableDetail, columnName?: string): void { + const columns = columnName ? detail.columns.filter((column) => column.name === columnName) : detail.columns; + parts.push(`### ${detail.display}`); + parts.push(`Type: ${detail.kind} | Native columns: ${detail.columns.length}`); + if (detail.description || detail.comment) { + parts.push(`Description: ${detail.description ?? detail.comment}`); + } + parts.push('', 'Columns:'); + for (const column of columns) { + const pk = column.primaryKey ? ', PK' : ''; + parts.push(`- ${column.name} (${column.nativeType}, nullable=${column.nullable}${pk})${sampleText(column.sampleValues)}`); + } + parts.push(''); +} + +export class EntityDetailsTool extends BaseTool { + readonly name = 'entity_details'; + + constructor(private readonly catalogFactory: (context: ToolContext) => WarehouseCatalogService) { + super(); + } + + get description(): string { + return 'Verify warehouse tables and columns from the latest live-database scan before writing them into wiki or semantic-layer output.'; + } + + get inputSchema() { + return entityDetailsInputSchema; + } + + async call(input: EntityDetailsInput, context: ToolContext): Promise> { + const allowed = allowedConnectionNames(context); + if (allowed && !allowed.has(input.connectionName)) { + return { + markdown: `Connection "${input.connectionName}" is not available to this ingest stage.`, + structured: { resolved: [], missing: [], scanAvailable: false }, + }; + } + + const catalog = this.catalogFactory(context); + const scanAvailable = await catalog.hasScan(input.connectionName); + if (!scanAvailable) { + return { + markdown: `No live-database scan available for connection "${input.connectionName}"; run \`ktx scan\` first.`, + structured: { resolved: [], missing: [], scanAvailable: false }, + }; + } + + const parts: string[] = []; + const resolved: TableDetail[] = []; + const missing: EntityDetailsStructured['missing'] = []; + + for (const target of input.targets) { + const resolution = + 'display' in target + ? await catalog.resolveDisplay(input.connectionName, target.display) + : { resolved: { catalog: target.catalog, db: target.db, name: target.name }, candidates: [], dialect: '' }; + if (!resolution.resolved) { + missing.push({ target, candidates: resolution.candidates }); + parts.push(`Not found in scan: ${'display' in target ? target.display : target.name}`); + if (resolution.candidates.length > 0) { + parts.push(`Closest matches: ${resolution.candidates.map((candidate) => candidate.name).join(', ')}`); + } + continue; + } + const detail = await catalog.getTable({ connectionName: input.connectionName, ...resolution.resolved }); + if (!detail) { + missing.push({ target, candidates: resolution.candidates }); + continue; + } + resolved.push(detail); + appendTableMarkdown(parts, detail, 'column' in target ? target.column : undefined); + } + + return { + markdown: parts.join('\n').trim(), + structured: { resolved, missing, scanAvailable: true }, + }; + } +}