refactor(context): use connectionId in warehouse verification tools

This commit is contained in:
Andrey Avtomonov 2026-05-14 19:25:03 +02:00
parent 6179667e45
commit e4f2863fed
10 changed files with 176 additions and 103 deletions

View file

@ -1,7 +1,7 @@
import { beforeEach, describe, expect, it, vi } from 'vitest';
import type { WarehouseCatalogService } from '../../../scan/warehouse-catalog.js';
import type { BaseTool, ToolContext } from '../../../tools/index.js';
import { DiscoverDataTool } from './discover-data.tool.js';
import type { WarehouseCatalogService } from './warehouse-catalog.service.js';
describe('DiscoverDataTool', () => {
const wikiSearchTool = { call: vi.fn() } as unknown as BaseTool & { call: ReturnType<typeof vi.fn> };
@ -36,7 +36,7 @@ describe('DiscoverDataTool', () => {
catalog.searchByName.mockResolvedValue([
{
kind: 'table',
connectionName: 'warehouse',
connectionId: 'warehouse',
ref: { catalog: null, db: 'public', name: 'orders' },
display: 'public.orders',
matchedOn: 'name',
@ -45,28 +45,28 @@ describe('DiscoverDataTool', () => {
});
it('groups wiki, semantic layer, and raw schema hits with routing hints', async () => {
const result = await tool.call({ query: 'orders', connectionName: 'warehouse', limit: 5 }, context);
const result = await tool.call({ query: 'orders', connectionId: 'warehouse', limit: 5 }, context);
expect(result.markdown).toContain('## Wiki Pages');
expect(result.markdown).toContain('use `wiki_read(blockKey)` for full content');
expect(result.markdown).toContain('## Semantic Layer Sources');
expect(result.markdown).toContain('use `sl_read_source(sourceName)` for the YAML');
expect(result.markdown).toContain('## Raw Warehouse Schema');
expect(result.markdown).toContain('use `entity_details({connectionName, targets: [{display}]})`');
expect(result.markdown).toContain('use `entity_details({connectionId, targets: [{display}]})`');
expect(result.structured.raw?.hits).toHaveLength(1);
});
it('includes connectionName on raw schema hits so entity_details can follow up', async () => {
it('includes connectionId on raw schema hits so entity_details can follow up', async () => {
const multiConnectionContext: ToolContext = {
...context,
session: { allowedConnectionNames: new Set(['warehouse', 'analytics']) } as any,
};
catalog.searchByName.mockImplementation(async (connectionName: string, query: string) => [
catalog.searchByName.mockImplementation(async (connectionId: string, query: string) => [
{
kind: 'table',
connectionName,
ref: { catalog: null, db: 'public', name: `${connectionName}_${query}` },
display: `public.${connectionName}_${query}`,
connectionId,
ref: { catalog: null, db: 'public', name: `${connectionId}_${query}` },
display: `public.${connectionId}_${query}`,
matchedOn: 'name',
},
]);
@ -75,16 +75,16 @@ describe('DiscoverDataTool', () => {
expect(catalog.searchByName).toHaveBeenCalledWith('analytics', 'orders', 10);
expect(catalog.searchByName).toHaveBeenCalledWith('warehouse', 'orders', 10);
expect(result.markdown).toContain('connectionName=analytics');
expect(result.markdown).toContain('connectionName=warehouse');
expect(result.markdown).toContain('connectionId=analytics');
expect(result.markdown).toContain('connectionId=warehouse');
expect(result.markdown).toContain(
'entity_details({connectionName: "analytics", targets: [{display: "public.analytics_orders"}]})',
'entity_details({connectionId: "analytics", targets: [{display: "public.analytics_orders"}]})',
);
expect(result.structured.raw?.hits.map((hit) => hit.connectionName)).toEqual(['analytics', 'warehouse']);
expect(result.structured.raw?.hits.map((hit) => hit.connectionId)).toEqual(['analytics', 'warehouse']);
});
it('refuses explicit out-of-scope connection names', async () => {
const result = await tool.call({ query: 'orders', connectionName: 'billing' }, context);
const result = await tool.call({ query: 'orders', connectionId: 'billing' }, context);
expect(result.markdown).toContain('Connection "billing" is not available to this ingest stage.');
expect(result.structured).toEqual({ wiki: null, sl: null, raw: null });
@ -99,7 +99,7 @@ describe('DiscoverDataTool', () => {
structured: { sourceName: 'orders' },
});
const result = await tool.call({ sourceName: 'orders', connectionName: 'warehouse' }, context);
const result = await tool.call({ sourceName: 'orders', connectionId: 'warehouse' }, context);
expect(slDiscoverTool.call).toHaveBeenCalledWith({ sourceName: 'orders', connectionId: 'warehouse' }, context);
expect(wikiSearchTool.call).not.toHaveBeenCalled();
@ -112,8 +112,20 @@ describe('DiscoverDataTool', () => {
slDiscoverTool.call.mockResolvedValueOnce({ markdown: '', structured: { totalSources: 0, sources: [] } });
catalog.searchByName.mockResolvedValueOnce([]);
const result = await tool.call({ query: 'customer source', connectionName: 'warehouse' }, context);
const result = await tool.call({ query: 'customer source', connectionId: 'warehouse' }, context);
expect(result.markdown).toContain('No matches for "customer source" across wiki, semantic layer, or raw warehouse schema.');
});
it('uses connectionId as the optional connection filter', () => {
const legacyConnectionField = ['connection', 'Name'].join('');
expect(tool.parseInput({ query: 'orders', connectionId: 'warehouse', limit: 5 })).toEqual({
query: 'orders',
connectionId: 'warehouse',
limit: 5,
});
expect(() => tool.parseInput({ query: 'orders', [legacyConnectionField]: 'warehouse', limit: 5 })).toThrow();
});
});

View file

@ -1,13 +1,13 @@
import { z } from 'zod';
import { WarehouseCatalogService, type RawSchemaHit } from '../../../scan/warehouse-catalog.js';
import { BaseTool, type ToolContext, type ToolOutput } from '../../../tools/index.js';
import { WarehouseCatalogService, type RawSchemaHit } from './warehouse-catalog.service.js';
const discoverDataInputSchema = z.object({
query: z.string().optional(),
connectionName: z.string().regex(/^[a-zA-Z0-9][a-zA-Z0-9_-]*$/).optional(),
connectionId: z.string().regex(/^[a-zA-Z0-9][a-zA-Z0-9_-]*$/).optional(),
limit: z.number().int().positive().max(50).optional().default(10),
sourceName: z.string().optional(),
});
}).strict();
type DiscoverDataInput = z.input<typeof discoverDataInputSchema>;
@ -62,16 +62,16 @@ export class DiscoverDataTool extends BaseTool<typeof discoverDataInputSchema> {
async call(input: DiscoverDataInput, context: ToolContext): Promise<ToolOutput<DiscoverDataStructured>> {
const allowed = allowedConnectionNames(context);
if (input.connectionName && allowed && !allowed.has(input.connectionName)) {
if (input.connectionId && allowed && !allowed.has(input.connectionId)) {
return {
markdown: `Connection "${input.connectionName}" is not available to this ingest stage.`,
markdown: `Connection "${input.connectionId}" is not available to this ingest stage.`,
structured: { wiki: null, sl: null, raw: null },
};
}
if (input.sourceName) {
const sl = await this.deps.slDiscoverTool.call(
{ sourceName: input.sourceName, connectionId: input.connectionName },
{ sourceName: input.sourceName, connectionId: input.connectionId },
context,
);
return { markdown: sl.markdown, structured: { wiki: null, sl: sl.structured, raw: null } };
@ -93,7 +93,7 @@ export class DiscoverDataTool extends BaseTool<typeof discoverDataInputSchema> {
}
const slResult = await this.deps.slDiscoverTool.call(
{ query: query || undefined, connectionId: input.connectionName },
{ query: query || undefined, connectionId: input.connectionId },
context,
);
if (totalSources(slResult.structured) > 0) {
@ -107,23 +107,23 @@ export class DiscoverDataTool extends BaseTool<typeof discoverDataInputSchema> {
}
const catalog = this.deps.catalogFactory(context);
const connections = input.connectionName ? [input.connectionName] : [...(allowed ?? [])].sort();
const connections = input.connectionId ? [input.connectionId] : [...(allowed ?? [])].sort();
const rawHits: RawSchemaHit[] = [];
for (const connectionName of connections) {
rawHits.push(...(await catalog.searchByName(connectionName, query, limit)));
for (const connectionId of connections) {
rawHits.push(...(await catalog.searchByName(connectionId, query, limit)));
}
if (rawHits.length > 0) {
parts.push(
'## Raw Warehouse Schema',
'> use `entity_details({connectionName, targets: [{display}]})` for full DDL + sample values',
'> use `entity_details({connectionId, targets: [{display}]})` for full DDL + sample values',
);
parts.push(
rawHits
.slice(0, limit)
.map(
(hit) =>
`- ${hit.kind}: ${hit.display} [connectionName=${hit.connectionName}] (matched on ${hit.matchedOn}) - ` +
`follow up with \`entity_details({connectionName: "${hit.connectionName}", targets: [{display: "${hit.display}"}]})\``,
`- ${hit.kind}: ${hit.display} [connectionId=${hit.connectionId}] (matched on ${hit.matchedOn}) - ` +
`follow up with \`entity_details({connectionId: "${hit.connectionId}", targets: [{display: "${hit.display}"}]})\``,
)
.join('\n'),
);

View file

@ -3,9 +3,9 @@ import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { initKtxProject, type KtxLocalProject } from '../../../project/index.js';
import { WarehouseCatalogService } from '../../../scan/warehouse-catalog.js';
import type { ToolContext } from '../../../tools/index.js';
import { EntityDetailsTool } from './entity-details.tool.js';
import { WarehouseCatalogService } from './warehouse-catalog.service.js';
describe('EntityDetailsTool', () => {
let tempDir: string;
@ -32,11 +32,11 @@ describe('EntityDetailsTool', () => {
await rm(tempDir, { recursive: true, force: true });
});
async function seedLiveDatabaseScan(connectionName = 'warehouse', syncId = 'sync-1') {
const root = `raw-sources/${connectionName}/live-database/${syncId}`;
async function seedLiveDatabaseScan(connectionId = 'warehouse', syncId = 'sync-1') {
const root = `raw-sources/${connectionId}/live-database/${syncId}`;
await project.fileStore.writeFile(
`${root}/connection.json`,
JSON.stringify({ connectionId: connectionName, driver: 'postgres', extractedAt: '2026-05-12T00:00:00.000Z' }, null, 2),
JSON.stringify({ connectionId, driver: 'postgres', extractedAt: '2026-05-12T00:00:00.000Z' }, null, 2),
'ktx',
'ktx@example.com',
'seed connection',
@ -84,7 +84,7 @@ describe('EntityDetailsTool', () => {
`${root}/enrichment/relationship-profile.json`,
JSON.stringify(
{
connectionId: connectionName,
connectionId,
driver: 'postgres',
tables: [{ table: { catalog: null, db: 'public', name: 'orders' }, rowCount: 12 }],
columns: {
@ -109,7 +109,7 @@ describe('EntityDetailsTool', () => {
}
it('returns scoped table detail for a display target', async () => {
const result = await tool.call({ connectionName: 'warehouse', targets: [{ display: 'public.orders' }] }, context);
const result = await tool.call({ connectionId: 'warehouse', targets: [{ display: 'public.orders' }] }, context);
expect(result.markdown).toContain('### public.orders');
expect(result.markdown).toContain('- status (text, nullable=false)');
@ -120,7 +120,7 @@ describe('EntityDetailsTool', () => {
it('resolves display targets that include a column name', async () => {
const result = await tool.call(
{ connectionName: 'warehouse', targets: [{ display: 'public.orders.status' }] },
{ connectionId: 'warehouse', targets: [{ display: 'public.orders.status' }] },
context,
);
@ -133,7 +133,7 @@ describe('EntityDetailsTool', () => {
it('reports missing explicit columns instead of returning an empty column list', async () => {
const result = await tool.call(
{ connectionName: 'warehouse', targets: [{ display: 'public.orders.plan_tier' }] },
{ connectionId: 'warehouse', targets: [{ display: 'public.orders.plan_tier' }] },
context,
);
@ -146,7 +146,7 @@ describe('EntityDetailsTool', () => {
it('reports missing structured table targets in model-visible markdown', async () => {
const result = await tool.call(
{
connectionName: 'warehouse',
connectionId: 'warehouse',
targets: [{ catalog: null, db: 'public', name: 'orderz' }],
},
context,
@ -161,7 +161,7 @@ describe('EntityDetailsTool', () => {
it('reports missing structured column targets in model-visible markdown', async () => {
const result = await tool.call(
{
connectionName: 'warehouse',
connectionId: 'warehouse',
targets: [{ catalog: null, db: 'public', name: 'orders', column: 'plan_tier' }],
},
context,
@ -175,7 +175,7 @@ describe('EntityDetailsTool', () => {
it('returns a no-scan state distinct from not found', async () => {
const result = await tool.call(
{ connectionName: 'empty', targets: [{ display: 'public.orders' }] },
{ connectionId: 'empty', targets: [{ display: 'public.orders' }] },
{ ...context, session: { ...context.session!, allowedConnectionNames: new Set(['empty']) } },
);
@ -184,9 +184,30 @@ describe('EntityDetailsTool', () => {
});
it('refuses out-of-scope connections', async () => {
const result = await tool.call({ connectionName: 'billing', targets: [{ display: 'public.orders' }] }, context);
const result = await tool.call({ connectionId: 'billing', targets: [{ display: 'public.orders' }] }, context);
expect(result.markdown).toContain('Connection "billing" is not available to this ingest stage.');
expect(result.structured.scanAvailable).toBe(false);
});
it('uses connectionId as the public input field', async () => {
const legacyConnectionField = ['connection', 'Name'].join('');
expect(
tool.parseInput({
connectionId: 'warehouse',
targets: [{ display: 'public.orders' }],
}),
).toEqual({
connectionId: 'warehouse',
targets: [{ display: 'public.orders' }],
});
expect(() =>
tool.parseInput({
[legacyConnectionField]: 'warehouse',
targets: [{ display: 'public.orders' }],
}),
).toThrow();
});
});

View file

@ -1,7 +1,7 @@
import { z } from 'zod';
import type { KtxTableRef } from '../../../scan/types.js';
import { WarehouseCatalogService, type TableDetail } from '../../../scan/warehouse-catalog.js';
import { BaseTool, type ToolContext, type ToolOutput } from '../../../tools/index.js';
import { WarehouseCatalogService, type TableDetail } from './warehouse-catalog.service.js';
const targetSchema = z.union([
z.object({ display: z.string().min(1) }),
@ -14,9 +14,9 @@ const targetSchema = z.union([
]);
const entityDetailsInputSchema = z.object({
connectionName: z.string().regex(/^[a-zA-Z0-9][a-zA-Z0-9_-]*$/),
connectionId: z.string().regex(/^[a-zA-Z0-9][a-zA-Z0-9_-]*$/),
targets: z.array(targetSchema).min(1).max(50),
});
}).strict();
type EntityDetailsInput = z.infer<typeof entityDetailsInputSchema>;
type EntityDetailsTarget = EntityDetailsInput['targets'][number];
@ -47,14 +47,14 @@ function appendMissingTargetMarkdown(parts: string[], target: EntityDetailsTarge
async function resolveTarget(
catalog: WarehouseCatalogService,
connectionName: string,
connectionId: string,
target: EntityDetailsTarget,
): Promise<{ resolved: (KtxTableRef & { column?: string }) | null; candidates: KtxTableRef[] }> {
if ('display' in target) {
return catalog.resolveDisplayTarget(connectionName, target.display);
return catalog.resolveDisplayTarget(connectionId, target.display);
}
const candidateResolution = await catalog.resolveDisplayTarget(connectionName, targetLabel(target));
const candidateResolution = await catalog.resolveDisplayTarget(connectionId, targetLabel(target));
return {
resolved: {
catalog: target.catalog,
@ -107,18 +107,18 @@ export class EntityDetailsTool extends BaseTool<typeof entityDetailsInputSchema>
async call(input: EntityDetailsInput, context: ToolContext): Promise<ToolOutput<EntityDetailsStructured>> {
const allowed = allowedConnectionNames(context);
if (allowed && !allowed.has(input.connectionName)) {
if (allowed && !allowed.has(input.connectionId)) {
return {
markdown: `Connection "${input.connectionName}" is not available to this ingest stage.`,
markdown: `Connection "${input.connectionId}" is not available to this ingest stage.`,
structured: { resolved: [], missing: [], scanAvailable: false },
};
}
const catalog = this.catalogFactory(context);
const scanAvailable = await catalog.hasScan(input.connectionName);
const scanAvailable = await catalog.hasScan(input.connectionId);
if (!scanAvailable) {
return {
markdown: `No live-database scan available for connection "${input.connectionName}"; run \`ktx scan\` first.`,
markdown: `No live-database scan available for connection "${input.connectionId}"; run \`ktx scan\` first.`,
structured: { resolved: [], missing: [], scanAvailable: false },
};
}
@ -128,13 +128,13 @@ export class EntityDetailsTool extends BaseTool<typeof entityDetailsInputSchema>
const missing: EntityDetailsStructured['missing'] = [];
for (const target of input.targets) {
const resolution = await resolveTarget(catalog, input.connectionName, target);
const resolution = await resolveTarget(catalog, input.connectionId, target);
if (!resolution.resolved) {
missing.push({ target, candidates: resolution.candidates });
appendMissingTargetMarkdown(parts, target, resolution.candidates);
continue;
}
const detail = await catalog.getTable({ connectionName: input.connectionName, ...resolution.resolved });
const detail = await catalog.getTable({ connectionId: input.connectionId, ...resolution.resolved });
if (!detail) {
missing.push({ target, candidates: resolution.candidates });
appendMissingTargetMarkdown(parts, target, resolution.candidates);

View file

@ -1,10 +1,10 @@
import type { KtxFileStorePort } from '../../../core/index.js';
import type { SlConnectionCatalogPort } from '../../../sl/index.js';
import { WarehouseCatalogService } from '../../../scan/warehouse-catalog.js';
import type { BaseTool, ToolContext } from '../../../tools/index.js';
import { DiscoverDataTool } from './discover-data.tool.js';
import { EntityDetailsTool } from './entity-details.tool.js';
import { SqlExecutionTool } from './sql-execution.tool.js';
import { WarehouseCatalogService } from './warehouse-catalog.service.js';
export function createWarehouseVerificationTools(deps: {
connections: SlConnectionCatalogPort;

View file

@ -19,7 +19,7 @@ describe('SqlExecutionTool', () => {
connections.executeQuery.mockResolvedValue({ headers: ['status'], rows: [['paid']], totalRows: 1 });
const result = await tool.call(
{ connectionName: 'warehouse', sql: 'select status from public.orders', rowLimit: 5 },
{ connectionId: 'warehouse', sql: 'select status from public.orders', rowLimit: 5 },
context,
);
@ -34,7 +34,7 @@ describe('SqlExecutionTool', () => {
it.each(['insert into x values (1)', 'drop table x', 'vacuum'])('rejects mutating SQL: %s', async (sql) => {
connections.executeQuery.mockClear();
const result = await tool.call({ connectionName: 'warehouse', sql }, context);
const result = await tool.call({ connectionId: 'warehouse', sql }, context);
expect(result.markdown).toContain('Only read-only SELECT/WITH queries can be executed locally.');
expect(connections.executeQuery).not.toHaveBeenCalled();
@ -44,11 +44,35 @@ describe('SqlExecutionTool', () => {
connections.executeQuery.mockRejectedValue(new Error('relation "orbit_analytics.customer" does not exist'));
const result = await tool.call(
{ connectionName: 'warehouse', sql: 'select 1 from orbit_analytics.customer', rowLimit: 1 },
{ connectionId: 'warehouse', sql: 'select 1 from orbit_analytics.customer', rowLimit: 1 },
context,
);
expect(result.markdown).toContain('relation "orbit_analytics.customer" does not exist');
expect(result.structured.error).toContain('relation "orbit_analytics.customer" does not exist');
});
it('uses connectionId as the public input field', () => {
const legacyConnectionField = ['connection', 'Name'].join('');
expect(
tool.parseInput({
connectionId: 'warehouse',
sql: 'select 1',
rowLimit: 5,
}),
).toEqual({
connectionId: 'warehouse',
sql: 'select 1',
rowLimit: 5,
});
expect(() =>
tool.parseInput({
[legacyConnectionField]: 'warehouse',
sql: 'select 1',
rowLimit: 5,
}),
).toThrow();
});
});

View file

@ -4,10 +4,10 @@ import type { SlConnectionCatalogPort } from '../../../sl/index.js';
import { BaseTool, type ToolContext, type ToolOutput } from '../../../tools/index.js';
const sqlExecutionInputSchema = z.object({
connectionName: z.string().regex(/^[a-zA-Z0-9][a-zA-Z0-9_-]*$/),
connectionId: z.string().regex(/^[a-zA-Z0-9][a-zA-Z0-9_-]*$/),
sql: z.string().min(1),
rowLimit: z.number().int().positive().max(1000).optional().default(100),
});
}).strict();
type SqlExecutionInput = z.input<typeof sqlExecutionInputSchema>;
@ -54,9 +54,9 @@ export class SqlExecutionTool extends BaseTool<typeof sqlExecutionInputSchema> {
async call(input: SqlExecutionInput, context: ToolContext): Promise<ToolOutput<SqlExecutionStructured>> {
const allowed = context.session?.allowedConnectionNames;
if (allowed && !allowed.has(input.connectionName)) {
if (allowed && !allowed.has(input.connectionId)) {
return {
markdown: `Connection "${input.connectionName}" is not available to this ingest stage.`,
markdown: `Connection "${input.connectionId}" is not available to this ingest stage.`,
structured: {
headers: [],
rows: [],
@ -83,7 +83,7 @@ export class SqlExecutionTool extends BaseTool<typeof sqlExecutionInputSchema> {
}
try {
const result = await this.connections.executeQuery(input.connectionName, wrappedSql);
const result = await this.connections.executeQuery(input.connectionId, wrappedSql);
const headers = result.headers ?? [];
const rows = result.rows ?? [];
const rowCount = result.totalRows ?? rows.length;

View file

@ -72,6 +72,13 @@ export type {
KtxEntityDetailsTableInput,
} from './entity-details.js';
export { createKtxEntityDetailsService } from './entity-details.js';
export type {
DisplayTargetResolution,
RawSchemaHit,
TableDetail,
WarehouseCatalogServiceDeps,
} from './warehouse-catalog.js';
export { WarehouseCatalogService } from './warehouse-catalog.js';
export type {
KtxColumnSampleUpdate,
KtxDescriptionSource,

View file

@ -2,8 +2,8 @@ import { mkdtemp, rm } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { initKtxProject, type KtxLocalProject } from '../../../project/index.js';
import { WarehouseCatalogService } from './warehouse-catalog.service.js';
import { initKtxProject, type KtxLocalProject } from '../project/index.js';
import { WarehouseCatalogService } from './warehouse-catalog.js';
describe('WarehouseCatalogService', () => {
let tempDir: string;
@ -18,8 +18,8 @@ describe('WarehouseCatalogService', () => {
await rm(tempDir, { recursive: true, force: true });
});
async function seedLiveDatabaseScan(connectionName = 'warehouse', syncId = 'sync-2', driver = 'postgres') {
const root = `raw-sources/${connectionName}/live-database/${syncId}`;
async function seedLiveDatabaseScan(connectionId = 'warehouse', syncId = 'sync-2', driver = 'postgres') {
const root = `raw-sources/${connectionId}/live-database/${syncId}`;
const tableRef = {
catalog: driver === 'bigquery' ? 'analytics' : null,
db: driver === 'sqlite' ? null : 'public',
@ -27,7 +27,7 @@ describe('WarehouseCatalogService', () => {
};
await project.fileStore.writeFile(
`${root}/connection.json`,
JSON.stringify({ connectionId: connectionName, driver, extractedAt: '2026-05-12T00:00:00.000Z' }, null, 2),
JSON.stringify({ connectionId, driver, extractedAt: '2026-05-12T00:00:00.000Z' }, null, 2),
'ktx',
'ktx@example.com',
'seed connection',
@ -75,7 +75,7 @@ describe('WarehouseCatalogService', () => {
`${root}/enrichment/relationship-profile.json`,
JSON.stringify(
{
connectionId: connectionName,
connectionId,
driver,
sqlAvailable: true,
queryCount: 3,
@ -113,10 +113,10 @@ describe('WarehouseCatalogService', () => {
const catalog = new WarehouseCatalogService({ fileStore: project.fileStore });
await expect(catalog.getLatestSyncId('warehouse')).resolves.toBe('sync-2');
const detail = await catalog.getTable({ connectionName: 'warehouse', catalog: null, db: 'public', name: 'orders' });
const detail = await catalog.getTable({ connectionId: 'warehouse', catalog: null, db: 'public', name: 'orders' });
expect(detail).toMatchObject({
connectionName: 'warehouse',
connectionId: 'warehouse',
display: 'public.orders',
rowCount: 12,
columns: [
@ -124,11 +124,20 @@ describe('WarehouseCatalogService', () => {
{ name: 'status', nativeType: 'text', sampleValues: ['paid', 'refunded'], distinctCount: 2 },
],
});
expect(detail).not.toHaveProperty(['connection', 'Name'].join(''));
const hits = await catalog.searchByName('warehouse', 'orders', 5);
expect(hits[0]).toMatchObject({
kind: 'table',
connectionId: 'warehouse',
display: 'public.orders',
});
expect(hits[0]).not.toHaveProperty(['connection', 'Name'].join(''));
});
it('returns scanAvailable=false when no live-database scan exists', async () => {
const catalog = new WarehouseCatalogService({ fileStore: project.fileStore });
await expect(catalog.getTable({ connectionName: 'missing', catalog: null, db: 'public', name: 'orders' })).resolves.toBeNull();
await expect(catalog.getTable({ connectionId: 'missing', catalog: null, db: 'public', name: 'orders' })).resolves.toBeNull();
await expect(catalog.hasScan('missing')).resolves.toBe(false);
});

View file

@ -1,12 +1,12 @@
import { getDialectForDriver } from '../../../connections/index.js';
import type { KtxFileStorePort } from '../../../core/index.js';
import { getDialectForDriver } from '../connections/index.js';
import type { KtxFileStorePort } from '../core/index.js';
import type {
KtxConnectionDriver,
KtxSchemaColumn,
KtxSchemaForeignKey,
KtxSchemaTable,
KtxTableRef,
} from '../../../scan/types.js';
} from './types.js';
type CatalogDriver = KtxConnectionDriver | 'sqlite3';
@ -24,7 +24,7 @@ interface WarehouseColumnDetail extends KtxSchemaColumn {
}
export interface TableDetail {
connectionName: string;
connectionId: string;
catalog: string | null;
db: string | null;
name: string;
@ -40,14 +40,14 @@ export interface TableDetail {
export type RawSchemaHit =
| {
kind: 'table';
connectionName: string;
connectionId: string;
ref: KtxTableRef;
display: string;
matchedOn: 'name' | 'db' | 'comment' | 'description';
}
| {
kind: 'column';
connectionName: string;
connectionId: string;
ref: KtxTableRef & { column: string };
display: string;
matchedOn: 'name' | 'comment' | 'description';
@ -80,7 +80,7 @@ interface RelationshipProfileArtifact {
}
interface ConnectionCatalog {
connectionName: string;
connectionId: string;
syncId: string;
driver: CatalogDriver;
tables: KtxSchemaTable[];
@ -250,21 +250,21 @@ export class WarehouseCatalogService {
constructor(private readonly deps: WarehouseCatalogServiceDeps) {}
async hasScan(connectionName: string): Promise<boolean> {
return (await this.loadCatalog(connectionName)) !== null;
async hasScan(connectionId: string): Promise<boolean> {
return (await this.loadCatalog(connectionId)) !== null;
}
async getLatestSyncId(connectionName: string): Promise<string | null> {
return (await this.loadCatalog(connectionName))?.syncId ?? null;
async getLatestSyncId(connectionId: string): Promise<string | null> {
return (await this.loadCatalog(connectionId))?.syncId ?? null;
}
async listTables(connectionName: string): Promise<KtxTableRef[]> {
const catalog = await this.loadCatalog(connectionName);
async listTables(connectionId: string): Promise<KtxTableRef[]> {
const catalog = await this.loadCatalog(connectionId);
return catalog?.tables.map((table) => ({ catalog: table.catalog, db: table.db, name: table.name })) ?? [];
}
async getTable(ref: { connectionName: string } & KtxTableRef): Promise<TableDetail | null> {
const catalog = await this.loadCatalog(ref.connectionName);
async getTable(ref: { connectionId: string } & KtxTableRef): Promise<TableDetail | null> {
const catalog = await this.loadCatalog(ref.connectionId);
if (!catalog) {
return null;
}
@ -277,7 +277,7 @@ export class WarehouseCatalogService {
const profileColumns = catalog.profile?.columns ?? {};
return {
connectionName: ref.connectionName,
connectionId: ref.connectionId,
catalog: table.catalog,
db: table.db,
name: table.name,
@ -310,14 +310,14 @@ export class WarehouseCatalogService {
}
async resolveDisplay(
connectionName: string,
connectionId: string,
display: string,
): Promise<{
resolved: KtxTableRef | null;
candidates: KtxTableRef[];
dialect: string;
}> {
const catalog = await this.loadCatalog(connectionName);
const catalog = await this.loadCatalog(connectionId);
if (!catalog) {
return { resolved: null, candidates: [], dialect: 'unknown' };
}
@ -333,14 +333,14 @@ export class WarehouseCatalogService {
return { resolved: { catalog: table.catalog, db: table.db, name: table.name }, candidates: [], dialect };
}
async resolveDisplayTarget(connectionName: string, display: string): Promise<DisplayTargetResolution> {
const catalog = await this.loadCatalog(connectionName);
async resolveDisplayTarget(connectionId: string, display: string): Promise<DisplayTargetResolution> {
const catalog = await this.loadCatalog(connectionId);
if (!catalog) {
return { resolved: null, candidates: [], dialect: 'unknown' };
}
const dialect = getDialectForDriver(catalog.driver).type;
const tableResolution = await this.resolveDisplay(connectionName, display);
const tableResolution = await this.resolveDisplay(connectionId, display);
if (tableResolution.resolved) {
return tableResolution;
}
@ -367,8 +367,8 @@ export class WarehouseCatalogService {
};
}
async searchByName(connectionName: string, query: string, limit: number): Promise<RawSchemaHit[]> {
const catalog = await this.loadCatalog(connectionName);
async searchByName(connectionId: string, query: string, limit: number): Promise<RawSchemaHit[]> {
const catalog = await this.loadCatalog(connectionId);
if (!catalog) {
return [];
}
@ -378,7 +378,7 @@ export class WarehouseCatalogService {
if (tableMatch) {
hits.push({
kind: 'table',
connectionName,
connectionId,
ref: { catalog: table.catalog, db: table.db, name: table.name },
display: formatDisplay(catalog.driver, table),
matchedOn: tableMatch,
@ -391,7 +391,7 @@ export class WarehouseCatalogService {
}
hits.push({
kind: 'column',
connectionName,
connectionId,
ref: { catalog: table.catalog, db: table.db, name: table.name, column: column.name },
display: `${formatDisplay(catalog.driver, table)}.${column.name}`,
matchedOn: columnMatch,
@ -401,18 +401,18 @@ export class WarehouseCatalogService {
return hits.slice(0, Math.max(0, limit));
}
private loadCatalog(connectionName: string): Promise<ConnectionCatalog | null> {
const existing = this.catalogs.get(connectionName);
private loadCatalog(connectionId: string): Promise<ConnectionCatalog | null> {
const existing = this.catalogs.get(connectionId);
if (existing) {
return existing;
}
const pending = this.readCatalog(connectionName);
this.catalogs.set(connectionName, pending);
const pending = this.readCatalog(connectionId);
this.catalogs.set(connectionId, pending);
return pending;
}
private async readCatalog(connectionName: string): Promise<ConnectionCatalog | null> {
const root = `raw-sources/${connectionName}/live-database`;
private async readCatalog(connectionId: string): Promise<ConnectionCatalog | null> {
const root = `raw-sources/${connectionId}/live-database`;
const listed = await this.deps.fileStore.listFiles(root);
const connectionFiles = listed.files.filter((file) => file.endsWith('/connection.json')).sort();
const latestConnectionPath = connectionFiles.at(-1);
@ -438,7 +438,7 @@ export class WarehouseCatalogService {
}
return {
connectionName,
connectionId,
syncId,
driver: connection.driver ?? profile?.driver ?? 'postgres',
tables,