* feat(context): add warehouse dialect dispatch
* feat(context): read warehouse scan catalog
* feat(context): add entity details verification tool
* feat(context): add ingest SQL verification tool
* feat(context): add raw warehouse discovery tool
* feat(context): expose warehouse verification tools to ingest
* docs(context): add ingest identifier verification protocol
* test(context): guard ingest identifier verification prompts
* chore(context): verify warehouse verification tools
* docs: add warehouse verification tools plan and spec
* fix(context): expose target warehouses to Notion ingest
* fix(context): update ingest prompts for warehouse verification tools
* fix(context): scope raw schema discovery to allowed connections
* fix(context): verify warehouse column display targets
* docs: add notion warehouse verification gap closure plan
* fix(context): include raw discovery connection names
* fix(context): expose warehouse targets for LookML and MetricFlow
* fix(context): pass connection config to ingest query executors
* fix(cli): enable read-only SQL probes for local ingest
* docs: add warehouse verification final v1 closure plan
* fix(context): align warehouse sql probe prompt shape
* docs: add warehouse verification prompt shape closure plan
* test(context): catch connectionless sql execution prompt examples
* fix(context): include connection name in sl capture sql example
* docs: add warehouse verification sql example closure plan
* fix(context): report structured entity detail misses
* docs: add warehouse verification structured target miss closure plan
* fix: report untracked squash merge conflicts
* feat: require ingest verification ledger
* fix: stabilize ingest wiki references
Warehouse Verification Tools Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Add synthesis-time warehouse verification tools so ingest agents can verify raw warehouse tables, columns, and sample values before writing wiki pages, SL sources, tables: frontmatter, sl_refs, or unmapped fallback records.
Architecture: Add a raw scan catalog service over raw-sources/<connection>/live-database/<sync>/, three BaseTool-backed ingest tools, and runner/tool-session scoping for allowed warehouse connections. Register the tools in the local ingest toolset so both WorkUnit and reconcile stages receive them through the existing toAiSdkTools() path.
Tech Stack: TypeScript, Node 22, Vitest, AI SDK v6 tools, Zod, KTX file store, KTX semantic layer and wiki tools.
Audit summary
The current repo has the original spec file only; no matching plan or implementation exists under docs/superpowers/plans. The following v1-blocking gaps remain:
- packages/context/src/connections/dialects.ts does not exist.
- packages/context/src/ingest/tools/warehouse-verification/ does not exist.
- entity_details, sql_execution, and discover_data are not available to ingest WU or reconcile toolsets.
- ToolSession does not carry the ingest stage's allowed warehouse connection IDs.
- Prompt updates are absent from the 11 writer skills named in the spec.
- Cleanup strings remain: orbit_analytics.customer, wiki_sl_search, and sl_describe_table.
- Prompt-bundling and warehouse-tool tests are absent.
Non-blocking gaps remain out of scope for this plan:
- Hard write-time validation in wiki_write and emit_unmapped_fallback.
- dictionary_search.
- semantic_query in synthesis toolsets.
- A raw-schema FTS index.
- A UUID identity layer for tables and columns.
One repo-specific adjustment is required: do not import @ktx/connector-*
dialect classes into @ktx/context, because every connector package already
depends on @ktx/context. Add a minimal context-local dialect dispatch instead.
File structure
Create these files:
- packages/context/src/connections/dialects.ts: Context-local driver dispatch for identifier quoting and display formatting.
- packages/context/src/connections/dialects.test.ts: Driver dispatch and display-format tests.
- packages/context/src/ingest/tools/warehouse-verification/warehouse-catalog.service.ts: Reads the latest live-database scan, resolves display identifiers, and searches table and column metadata.
- packages/context/src/ingest/tools/warehouse-verification/warehouse-catalog.service.test.ts: Fixture-backed catalog tests.
- packages/context/src/ingest/tools/warehouse-verification/entity-details.tool.ts: entity_details ingest tool.
- packages/context/src/ingest/tools/warehouse-verification/entity-details.tool.test.ts: Tool contract tests.
- packages/context/src/ingest/tools/warehouse-verification/sql-execution.tool.ts: sql_execution ingest tool.
- packages/context/src/ingest/tools/warehouse-verification/sql-execution.tool.test.ts: Read-only SQL and output tests.
- packages/context/src/ingest/tools/warehouse-verification/discover-data.tool.ts: discover_data ingest tool composing wiki, SL, and raw-schema search.
- packages/context/src/ingest/tools/warehouse-verification/discover-data.tool.test.ts: Discovery composition tests.
- packages/context/src/ingest/tools/warehouse-verification/index.ts: Exports tool classes and createWarehouseVerificationTools().
- packages/context/skills/_shared/identifier-verification.md: Shared protocol text kept in the tree for review even though writer skills inline it.
Modify these files:
- packages/context/src/connections/index.ts: Export the dialect helper.
- packages/context/src/tools/tool-session.ts: Add allowedConnectionNames.
- packages/context/src/ingest/ingest-bundle.runner.ts: Populate allowedConnectionNames for WU and reconcile sessions.
- packages/context/src/ingest/local-bundle-runtime.ts: Register the warehouse verification tools in LocalIngestToolsetFactory.
- packages/context/src/ingest/ingest-bundle.runner.test.ts: Assert the runner scopes allowed warehouse connections.
- packages/context/src/memory/memory-runtime-assets.test.ts: Assert writer skills contain the protocol and banned strings are gone.
- packages/context/src/ingest/ingest-runtime-assets.test.ts: Assert ingest skill packaging includes the protocol.
- packages/context/src/ingest/tools/emit-unmapped-fallback.tool.ts: Replace the fictional table example.
- packages/context/src/sl/tools/sl-warehouse-validation.ts: Replace the stale sl_describe_table hint.
- packages/context/skills/*/SKILL.md: Inline protocol updates for the writer skills listed in the spec.
Task 1: Add context-local dialect dispatch
Files:
- Create: packages/context/src/connections/dialects.ts
- Create: packages/context/src/connections/dialects.test.ts
- Modify: packages/context/src/connections/index.ts

- Step 1: Write the failing dialect tests
Create packages/context/src/connections/dialects.test.ts:
import { describe, expect, it } from 'vitest';
import { getDialectForDriver } from './dialects.js';
describe('getDialectForDriver', () => {
it.each([
['postgres', '"public"."orders"'],
['postgresql', '"public"."orders"'],
['mysql', '`public`.`orders`'],
['clickhouse', '`public`.`orders`'],
['sqlite', '"orders"'],
['snowflake', '"analytics"."public"."orders"'],
['bigquery', '`analytics`.`public`.`orders`'],
['sqlserver', '[analytics].[public].[orders]'],
] as const)('formats table names for %s', (driver, expected) => {
const dialect = getDialectForDriver(driver);
expect(
dialect.formatTableName({
catalog: driver === 'snowflake' || driver === 'bigquery' || driver === 'sqlserver' ? 'analytics' : null,
db: driver === 'sqlite' ? null : 'public',
name: 'orders',
}),
).toBe(expected);
});
it('throws with a supported-driver list for unknown drivers', () => {
expect(() => getDialectForDriver('oracle')).toThrow(
'Unsupported warehouse driver "oracle". Supported drivers: bigquery, clickhouse, mysql, postgres, postgresql, sqlite, sqlite3, snowflake, sqlserver',
);
});
});
- Step 2: Run the failing test
Run:
pnpm --filter @ktx/context exec vitest run src/connections/dialects.test.ts
Expected: FAIL because ./dialects.js does not exist.
- Step 3: Add the minimal dialect implementation
Create packages/context/src/connections/dialects.ts:
import type { KtxSchemaDimensionType, KtxTableRef } from '../scan/types.js';
export type SupportedDriver =
| 'postgres'
| 'postgresql'
| 'mysql'
| 'sqlserver'
| 'snowflake'
| 'bigquery'
| 'clickhouse'
| 'sqlite'
| 'sqlite3';
export interface KtxDialect {
readonly type: SupportedDriver;
quoteIdentifier(identifier: string): string;
formatTableName(table: KtxTableRef): string;
mapToDimensionType(nativeType: string): KtxSchemaDimensionType;
}
const supportedDrivers: SupportedDriver[] = [
'bigquery',
'clickhouse',
'mysql',
'postgres',
'postgresql',
'sqlite',
'sqlite3',
'snowflake',
'sqlserver',
];
function doubleQuoted(identifier: string): string {
return `"${identifier.replace(/"/g, '""')}"`;
}
function backtickQuoted(identifier: string): string {
return `\`${identifier.replace(/`/g, '``')}\``;
}
function bigQueryQuoted(identifier: string): string {
return `\`${identifier.replace(/`/g, '\\`')}\``;
}
function bracketQuoted(identifier: string): string {
return `[${identifier.replace(/\]/g, ']]')}]`;
}
function inferDimensionType(nativeType: string): KtxSchemaDimensionType {
const normalized = nativeType.toLowerCase().trim();
if (normalized.includes('date') || normalized.includes('time')) {
return 'time';
}
if (
normalized.includes('int') ||
normalized.includes('num') ||
normalized.includes('dec') ||
normalized.includes('float') ||
normalized.includes('double') ||
normalized.includes('real')
) {
return 'number';
}
if (normalized.includes('bool') || normalized === 'bit') {
return 'boolean';
}
return 'string';
}
function formatWithParts(table: KtxTableRef, quote: (identifier: string) => string, sqlite = false): string {
const parts = sqlite ? [table.name] : [table.catalog, table.db, table.name].filter((part): part is string => !!part);
return parts.map(quote).join('.');
}
function createDialect(type: SupportedDriver, quote: (identifier: string) => string, sqlite = false): KtxDialect {
return {
type,
quoteIdentifier: quote,
formatTableName: (table) => formatWithParts(table, quote, sqlite),
mapToDimensionType: inferDimensionType,
};
}
const dialects: Record<SupportedDriver, KtxDialect> = {
postgres: createDialect('postgres', doubleQuoted),
postgresql: createDialect('postgresql', doubleQuoted),
mysql: createDialect('mysql', backtickQuoted),
clickhouse: createDialect('clickhouse', backtickQuoted),
sqlite: createDialect('sqlite', doubleQuoted, true),
sqlite3: createDialect('sqlite3', doubleQuoted, true),
snowflake: createDialect('snowflake', doubleQuoted),
bigquery: createDialect('bigquery', bigQueryQuoted),
sqlserver: createDialect('sqlserver', bracketQuoted),
};
export function getDialectForDriver(driver: string): KtxDialect {
const normalized = driver.toLowerCase().trim();
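// Own-property check: `in` would also match inherited keys such as "constructor".
if (Object.prototype.hasOwnProperty.call(dialects, normalized)) {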
return dialects[normalized as SupportedDriver];
}
throw new Error(`Unsupported warehouse driver "${driver}". Supported drivers: ${supportedDrivers.join(', ')}`);
}
Modify packages/context/src/connections/index.ts:
export type { KtxDialect, SupportedDriver } from './dialects.js';
export { getDialectForDriver } from './dialects.js';
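To make the escaping rules concrete, here is a small self-contained sketch of the double-quote, backtick, and bracket quoting styles. The names `quoteDouble`, `quoteBacktick`, `quoteBracket`, and `formatParts` are illustrative stand-ins and not the exports of dialects.ts; the sketch only shows how an embedded quote character is doubled and how empty parts of a table ref are skipped.

```typescript
// Illustrative stand-ins for the quoting helpers (hypothetical names).
type Quote = (identifier: string) => string;

const quoteDouble: Quote = (id) => `"${id.replace(/"/g, '""')}"`;
const quoteBacktick: Quote = (id) => `\`${id.replace(/`/g, '``')}\``;
const quoteBracket: Quote = (id) => `[${id.replace(/\]/g, ']]')}]`;

// Joins the non-empty parts of a table reference using the given quoting style.
function formatParts(parts: Array<string | null>, quote: Quote): string {
  return parts
    .filter((part): part is string => !!part)
    .map(quote)
    .join('.');
}

// Identifiers that contain the quote character itself.
const pg = formatParts([null, 'public', 'we"ird'], quoteDouble);
const mssql = formatParts(['analytics', 'dbo', 'a]b'], quoteBracket);
const mysql = formatParts([null, 'shop', 'or`ders'], quoteBacktick);
```

Only the closing bracket needs doubling in the bracket style, because an opening bracket inside a bracketed identifier is not ambiguous.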
- Step 4: Run the dialect tests
Run:
pnpm --filter @ktx/context exec vitest run src/connections/dialects.test.ts
Expected: PASS.
- Step 5: Commit
Run:
git add packages/context/src/connections/dialects.ts packages/context/src/connections/dialects.test.ts packages/context/src/connections/index.ts
git commit -m "feat(context): add warehouse dialect dispatch"
Task 2: Add the raw scan warehouse catalog service
Files:
- Create: packages/context/src/ingest/tools/warehouse-verification/warehouse-catalog.service.ts
- Create: packages/context/src/ingest/tools/warehouse-verification/warehouse-catalog.service.test.ts

- Step 1: Write failing catalog tests
Create packages/context/src/ingest/tools/warehouse-verification/warehouse-catalog.service.test.ts:
import { mkdtemp, rm } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { initKtxProject, type KtxLocalProject } from '../../../project/index.js';
import { WarehouseCatalogService } from './warehouse-catalog.service.js';
describe('WarehouseCatalogService', () => {
let tempDir: string;
let project: KtxLocalProject;
beforeEach(async () => {
tempDir = await mkdtemp(join(tmpdir(), 'ktx-warehouse-catalog-'));
project = await initKtxProject({ projectDir: join(tempDir, 'project'), projectName: 'warehouse' });
});
afterEach(async () => {
await rm(tempDir, { recursive: true, force: true });
});
async function seedLiveDatabaseScan(connectionName = 'warehouse', syncId = 'sync-2', driver = 'postgres') {
const root = `raw-sources/${connectionName}/live-database/${syncId}`;
await project.fileStore.writeFile(
`${root}/connection.json`,
JSON.stringify({ connectionId: connectionName, driver, extractedAt: '2026-05-12T00:00:00.000Z' }, null, 2),
'ktx',
'ktx@example.com',
'seed connection',
);
await project.fileStore.writeFile(
`${root}/tables/orders.json`,
JSON.stringify(
{
catalog: null,
db: driver === 'sqlite' ? null : 'public',
name: 'orders',
kind: 'table',
comment: 'Customer orders',
estimatedRows: 12,
columns: [
{
name: 'id',
nativeType: 'integer',
normalizedType: 'integer',
dimensionType: 'number',
nullable: false,
primaryKey: true,
comment: 'Order id',
},
{
name: 'status',
nativeType: 'text',
normalizedType: 'text',
dimensionType: 'string',
nullable: false,
primaryKey: false,
comment: 'Order status',
},
],
foreignKeys: [],
},
null,
2,
),
'ktx',
'ktx@example.com',
'seed orders',
);
await project.fileStore.writeFile(
`${root}/enrichment/relationship-profile.json`,
JSON.stringify(
{
connectionId: connectionName,
driver,
sqlAvailable: true,
queryCount: 3,
tables: [{ table: { catalog: null, db: driver === 'sqlite' ? null : 'public', name: 'orders' }, rowCount: 12 }],
columns: {
'orders.status': {
table: { catalog: null, db: driver === 'sqlite' ? null : 'public', name: 'orders' },
column: 'status',
nativeType: 'text',
normalizedType: 'text',
rowCount: 12,
nullCount: 0,
distinctCount: 2,
uniquenessRatio: 0.1667,
nullRate: 0,
sampleValues: ['paid', 'refunded'],
minTextLength: 4,
maxTextLength: 8,
},
},
warnings: [],
},
null,
2,
),
'ktx',
'ktx@example.com',
'seed profile',
);
}
it('finds the latest sync and merges table schema with relationship profile values', async () => {
await seedLiveDatabaseScan('warehouse', 'sync-1');
await seedLiveDatabaseScan('warehouse', 'sync-2');
const catalog = new WarehouseCatalogService({ fileStore: project.fileStore });
await expect(catalog.getLatestSyncId('warehouse')).resolves.toBe('sync-2');
const detail = await catalog.getTable({ connectionName: 'warehouse', catalog: null, db: 'public', name: 'orders' });
expect(detail).toMatchObject({
connectionName: 'warehouse',
display: 'public.orders',
rowCount: 12,
columns: [
{ name: 'id', nativeType: 'integer', primaryKey: true },
{ name: 'status', nativeType: 'text', sampleValues: ['paid', 'refunded'], distinctCount: 2 },
],
});
});
it('returns null from getTable and false from hasScan when no live-database scan exists', async () => {
const catalog = new WarehouseCatalogService({ fileStore: project.fileStore });
await expect(catalog.getTable({ connectionName: 'missing', catalog: null, db: 'public', name: 'orders' })).resolves.toBeNull();
await expect(catalog.hasScan('missing')).resolves.toBe(false);
});
it('resolves postgres display strings and returns closest candidates for missing tables', async () => {
await seedLiveDatabaseScan();
const catalog = new WarehouseCatalogService({ fileStore: project.fileStore });
await expect(catalog.resolveDisplay('warehouse', 'public.orders')).resolves.toMatchObject({
resolved: { catalog: null, db: 'public', name: 'orders' },
candidates: [],
dialect: 'postgres',
});
await expect(catalog.resolveDisplay('warehouse', 'public.orderz')).resolves.toMatchObject({
resolved: null,
candidates: [{ name: 'orders' }],
});
});
it('treats two-part BigQuery identifiers as ambiguous instead of guessing', async () => {
await seedLiveDatabaseScan('warehouse', 'sync-bigquery', 'bigquery');
const catalog = new WarehouseCatalogService({ fileStore: project.fileStore });
await expect(catalog.resolveDisplay('warehouse', 'public.orders')).resolves.toMatchObject({
resolved: null,
dialect: 'bigquery',
});
});
it('searches table names, column names, comments, and descriptions', async () => {
await seedLiveDatabaseScan();
const catalog = new WarehouseCatalogService({ fileStore: project.fileStore });
await expect(catalog.searchByName('warehouse', 'status', 10)).resolves.toEqual(
expect.arrayContaining([
expect.objectContaining({
kind: 'column',
ref: expect.objectContaining({ db: 'public', name: 'orders', column: 'status' }),
matchedOn: 'name',
}),
]),
);
});
});
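The resolveDisplay test expects `public.orderz` to surface `orders` as a candidate, but the plan does not say how candidates are ranked. One possible approach, shown here purely as an assumption, is to rank known table names by edit distance. `editDistance`, `closestTableNames`, and the `maxDistance` threshold are hypothetical names and values chosen for illustration.

```typescript
// Hypothetical candidate ranking; the plan does not prescribe an algorithm.
function editDistance(a: string, b: string): number {
  // Levenshtein distance computed with a rolling row of the DP table.
  let previous = Array.from({ length: b.length + 1 }, (_, j) => j);
  for (let i = 1; i <= a.length; i++) {
    const current = [i];
    for (let j = 1; j <= b.length; j++) {
      const substitution = previous[j - 1]! + (a[i - 1] === b[j - 1] ? 0 : 1);
      current.push(Math.min(previous[j]! + 1, current[j - 1]! + 1, substitution));
    }
    previous = current;
  }
  return previous[b.length]!;
}

// Returns known names within maxDistance edits, closest first, compared case-insensitively.
function closestTableNames(wanted: string, known: string[], maxDistance = 2): string[] {
  const needle = wanted.toLowerCase();
  return known
    .map((name) => ({ name, distance: editDistance(needle, name.toLowerCase()) }))
    .filter((entry) => entry.distance <= maxDistance)
    .sort((left, right) => left.distance - right.distance || left.name.localeCompare(right.name))
    .map((entry) => entry.name);
}
```

A substring or prefix match would satisfy the same test; edit distance is used here only because it handles single-character typos such as `orderz`.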
- Step 2: Run the failing catalog tests
Run:
pnpm --filter @ktx/context exec vitest run src/ingest/tools/warehouse-verification/warehouse-catalog.service.test.ts
Expected: FAIL because the service file does not exist.
- Step 3: Add the catalog service
Create packages/context/src/ingest/tools/warehouse-verification/warehouse-catalog.service.ts with these exported shapes and behavior:
import type { KtxFileStorePort } from '../../../core/index.js';
import { getDialectForDriver } from '../../../connections/index.js';
import type { KtxConnectionDriver, KtxSchemaColumn, KtxSchemaForeignKey, KtxSchemaTable, KtxTableRef } from '../../../scan/types.js';
export interface WarehouseCatalogServiceDeps {
fileStore: KtxFileStorePort;
}
export interface WarehouseColumnDetail extends KtxSchemaColumn {
descriptions: Record<string, string>;
rowCount: number | null;
nullCount: number | null;
distinctCount: number | null;
nullRate: number | null;
sampleValues: string[];
}
export interface TableDetail {
connectionName: string;
catalog: string | null;
db: string | null;
name: string;
display: string;
kind: string;
comment: string | null;
description: string | null;
rowCount: number | null;
columns: WarehouseColumnDetail[];
foreignKeys: KtxSchemaForeignKey[];
}
export type RawSchemaHit =
| { kind: 'table'; ref: KtxTableRef; display: string; matchedOn: 'name' | 'db' | 'comment' | 'description' }
| { kind: 'column'; ref: KtxTableRef & { column: string }; display: string; matchedOn: 'name' | 'comment' | 'description' };
interface ConnectionArtifact {
driver?: KtxConnectionDriver;
}
interface RelationshipProfileColumn {
table?: KtxTableRef;
column?: string;
rowCount?: number;
nullCount?: number;
distinctCount?: number;
nullRate?: number;
sampleValues?: unknown[];
}
interface RelationshipProfileArtifact {
driver?: KtxConnectionDriver;
tables?: Array<{ table?: KtxTableRef; rowCount?: number }>;
columns?: Record<string, RelationshipProfileColumn>;
}
interface ConnectionCatalog {
connectionName: string;
syncId: string;
driver: KtxConnectionDriver;
tables: KtxSchemaTable[];
profile: RelationshipProfileArtifact | null;
}
The implementation must:
- Use fileStore.listFiles("raw-sources/<connectionName>/live-database") and choose the lexicographically latest path ending in /connection.json.
- Read every JSON file under <latestRoot>/tables/ rather than reconstructing a path from the table ref. This supports encoded and simple table filenames already present in tests.
- Parse display strings by driver:
  - Postgres, MySQL, and ClickHouse: schema.table.
  - SQL Server, Snowflake, and BigQuery: catalog.schema.table.
  - SQLite: table.
  - For BigQuery, a two-part display must return resolved: null and candidate matches.
- Match table refs case-insensitively, while preserving stored casing in outputs.
- Merge relationship-profile fields by (catalog, db, name, column), with fallback matching on table.name + "." + column.
- Cache a loaded connection catalog per connectionName within the service instance.
- Return null from getTable() when the scan is absent or the table ref is not found.
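The "lexicographically latest" rule for choosing a sync can be sketched as below. This is a minimal illustration that assumes the file listing yields full repository-relative paths; `latestSyncId` and its `paths` parameter are hypothetical and stand in for whatever fileStore.listFiles() actually returns.

```typescript
// Sketch only: `paths` stands in for the result of listing the live-database folder.
function latestSyncId(paths: string[], connectionName: string): string | null {
  const prefix = `raw-sources/${connectionName}/live-database/`;
  const suffix = '/connection.json';
  const syncIds = paths
    .filter((path) => path.startsWith(prefix) && path.endsWith(suffix))
    .map((path) => path.slice(prefix.length, path.length - suffix.length))
    // A sync id is a direct child folder, so it must not contain further slashes.
    .filter((syncId) => syncId.length > 0 && !syncId.includes('/'));
  if (syncIds.length === 0) {
    return null;
  }
  // Default sort compares UTF-16 code units, so 'sync-10' sorts before 'sync-2'.
  syncIds.sort();
  return syncIds[syncIds.length - 1] ?? null;
}
```

Because the comparison is purely lexicographic, sync IDs need to be sortable as strings (for example zero-padded counters or ISO timestamps) for "latest" to mean "most recent".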
Use these method signatures:
export class WarehouseCatalogService {
constructor(private readonly deps: WarehouseCatalogServiceDeps) {}
async hasScan(connectionName: string): Promise<boolean>;
async getLatestSyncId(connectionName: string): Promise<string | null>;
async listTables(connectionName: string): Promise<KtxTableRef[]>;
async getTable(ref: { connectionName: string } & KtxTableRef): Promise<TableDetail | null>;
async resolveDisplay(connectionName: string, display: string): Promise<{
resolved: KtxTableRef | null;
candidates: KtxTableRef[];
dialect: string;
}>;
async searchByName(connectionName: string, query: string, limit: number): Promise<RawSchemaHit[]>;
}
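The per-driver display parsing rules required of resolveDisplay() can be sketched as a part-count table. This is an illustration under simplifying assumptions: `TableRef`, `expectedParts`, and `parseDisplay` are hypothetical names, and the sketch splits on every dot, so quoted identifiers that contain dots are not handled.

```typescript
// Simplified stand-in for the table ref shape used by the catalog service.
interface TableRef {
  catalog: string | null;
  db: string | null;
  name: string;
}

// Number of dot-separated parts each driver's display form is expected to have.
const expectedParts = new Map<string, number>([
  ['postgres', 2], ['postgresql', 2], ['mysql', 2], ['clickhouse', 2],
  ['sqlserver', 3], ['snowflake', 3], ['bigquery', 3],
  ['sqlite', 1], ['sqlite3', 1],
]);

// Returns null when the part count does not match the driver, which is how a
// two-part BigQuery display stays unresolved instead of being guessed.
function parseDisplay(driver: string, display: string): TableRef | null {
  const expected = expectedParts.get(driver.toLowerCase());
  const parts = display.split('.').map((part) => part.trim());
  if (expected === undefined || parts.length !== expected || parts.some((part) => part === '')) {
    return null;
  }
  const name = parts[parts.length - 1] as string;
  const db = expected >= 2 ? (parts[parts.length - 2] as string) : null;
  const catalog = expected === 3 ? (parts[0] as string) : null;
  return { catalog, db, name };
}
```

A null result here would still be followed by a candidate search in the real service, so the caller can offer closest matches.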
- Step 4: Run the catalog tests
Run:
pnpm --filter @ktx/context exec vitest run src/ingest/tools/warehouse-verification/warehouse-catalog.service.test.ts
Expected: PASS.
- Step 5: Commit
Run:
git add packages/context/src/ingest/tools/warehouse-verification/warehouse-catalog.service.ts packages/context/src/ingest/tools/warehouse-verification/warehouse-catalog.service.test.ts
git commit -m "feat(context): read warehouse scan catalog"
Task 3: Add entity_details
Files:
- Create: packages/context/src/ingest/tools/warehouse-verification/entity-details.tool.ts
- Create: packages/context/src/ingest/tools/warehouse-verification/entity-details.tool.test.ts

- Step 1: Write failing entity_details tests
Create tests that instantiate the tool with a seeded WarehouseCatalogService and a ToolContext whose session has allowedConnectionNames: new Set(['warehouse']). Test these cases:
it('returns scoped table detail for a display target', async () => {
const result = await tool.call(
{ connectionName: 'warehouse', targets: [{ display: 'public.orders' }] },
context,
);
expect(result.markdown).toContain('### public.orders');
expect(result.markdown).toContain('- status (text, nullable=false)');
expect(result.markdown).toContain('sample: ["paid","refunded"]');
expect(result.structured.scanAvailable).toBe(true);
expect(result.structured.resolved).toHaveLength(1);
});
it('returns a no-scan state distinct from not found', async () => {
const result = await tool.call(
{ connectionName: 'empty', targets: [{ display: 'public.orders' }] },
{ ...context, session: { ...context.session!, allowedConnectionNames: new Set(['empty']) } },
);
expect(result.markdown).toContain('No live-database scan available for connection "empty"; run `ktx scan` first.');
expect(result.structured.scanAvailable).toBe(false);
});
it('refuses out-of-scope connections', async () => {
const result = await tool.call(
{ connectionName: 'billing', targets: [{ display: 'public.orders' }] },
context,
);
expect(result.markdown).toContain('Connection "billing" is not available to this ingest stage.');
expect(result.structured.scanAvailable).toBe(false);
});
- Step 2: Run the failing tests
Run:
pnpm --filter @ktx/context exec vitest run src/ingest/tools/warehouse-verification/entity-details.tool.test.ts
Expected: FAIL because the tool file does not exist.
- Step 3: Implement the tool
Create packages/context/src/ingest/tools/warehouse-verification/entity-details.tool.ts:
import { z } from 'zod';
import { BaseTool, type ToolContext, type ToolOutput } from '../../../tools/index.js';
import type { KtxTableRef } from '../../../scan/types.js';
import { WarehouseCatalogService, type TableDetail } from './warehouse-catalog.service.js';
const targetSchema = z.union([
z.object({ display: z.string().min(1) }),
z.object({
catalog: z.string().nullable(),
db: z.string().nullable(),
name: z.string().min(1),
column: z.string().optional(),
}),
]);
const entityDetailsInputSchema = z.object({
connectionName: z.string().regex(/^[a-zA-Z0-9][a-zA-Z0-9_-]*$/),
targets: z.array(targetSchema).min(1).max(50),
});
type EntityDetailsInput = z.infer<typeof entityDetailsInputSchema>;
export interface EntityDetailsStructured {
resolved: TableDetail[];
missing: Array<{ target: unknown; candidates: KtxTableRef[] }>;
scanAvailable: boolean;
}
function allowedConnectionNames(context: ToolContext): ReadonlySet<string> | null {
return context.session?.allowedConnectionNames ?? null;
}
function sampleText(values: string[]): string {
return values.length > 0 ? ` - sample: ${JSON.stringify(values.slice(0, 10))}` : '';
}
function appendTableMarkdown(parts: string[], detail: TableDetail, columnName?: string): void {
const columns = columnName ? detail.columns.filter((column) => column.name === columnName) : detail.columns;
parts.push(`### ${detail.display}`);
parts.push(`Type: ${detail.kind} | Native columns: ${detail.columns.length}`);
if (detail.description || detail.comment) {
parts.push(`Description: ${detail.description ?? detail.comment}`);
}
parts.push('', 'Columns:');
for (const column of columns) {
const pk = column.primaryKey ? ', PK' : '';
parts.push(`- ${column.name} (${column.nativeType}, nullable=${column.nullable}${pk})${sampleText(column.sampleValues)}`);
}
parts.push('');
}
export class EntityDetailsTool extends BaseTool<typeof entityDetailsInputSchema> {
readonly name = 'entity_details';
constructor(private readonly catalogFactory: (context: ToolContext) => WarehouseCatalogService) {
super();
}
get description(): string {
return 'Verify warehouse tables and columns from the latest live-database scan before writing them into wiki or semantic-layer output.';
}
get inputSchema() {
return entityDetailsInputSchema;
}
async call(input: EntityDetailsInput, context: ToolContext): Promise<ToolOutput<EntityDetailsStructured>> {
const allowed = allowedConnectionNames(context);
if (allowed && !allowed.has(input.connectionName)) {
return {
markdown: `Connection "${input.connectionName}" is not available to this ingest stage.`,
structured: { resolved: [], missing: [], scanAvailable: false },
};
}
const catalog = this.catalogFactory(context);
const scanAvailable = await catalog.hasScan(input.connectionName);
if (!scanAvailable) {
return {
markdown: `No live-database scan available for connection "${input.connectionName}"; run \`ktx scan\` first.`,
structured: { resolved: [], missing: [], scanAvailable: false },
};
}
const parts: string[] = [];
const resolved: TableDetail[] = [];
const missing: EntityDetailsStructured['missing'] = [];
for (const target of input.targets) {
const resolution =
'display' in target
? await catalog.resolveDisplay(input.connectionName, target.display)
: { resolved: { catalog: target.catalog, db: target.db, name: target.name }, candidates: [], dialect: '' };
if (!resolution.resolved) {
missing.push({ target, candidates: resolution.candidates });
parts.push(`Not found in scan: ${'display' in target ? target.display : target.name}`);
if (resolution.candidates.length > 0) {
parts.push(`Closest matches: ${resolution.candidates.map((candidate) => candidate.name).join(', ')}`);
}
continue;
}
const detail = await catalog.getTable({ connectionName: input.connectionName, ...resolution.resolved });
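if (!detail) {
// Report the miss in markdown too, so structured targets do not fail silently.
missing.push({ target, candidates: resolution.candidates });
parts.push(`Not found in scan: ${'display' in target ? target.display : target.name}`);
continue;
}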
resolved.push(detail);
appendTableMarkdown(parts, detail, 'column' in target ? target.column : undefined);
}
return {
markdown: parts.join('\n').trim(),
structured: { resolved, missing, scanAvailable: true },
};
}
}
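The order of the two early returns in call() matters: the scope check runs before the scan check, so an out-of-scope connection never reveals whether a scan exists. The sketch below isolates that gate with simplified stand-in types. `checkAccess`, `Gate`, and the `no_scan` reason string are hypothetical; `connection_not_allowed` mirrors the error code used by sql_execution in this plan.

```typescript
// Simplified stand-in for the tool's two early-return gates.
type Gate =
  | { ok: true }
  | { ok: false; reason: 'connection_not_allowed' | 'no_scan' };

function checkAccess(
  connectionName: string,
  allowed: ReadonlySet<string> | null,
  scannedConnections: ReadonlySet<string>,
): Gate {
  // A null allow-list means the session did not restrict connections (fail-open).
  if (allowed && !allowed.has(connectionName)) {
    return { ok: false, reason: 'connection_not_allowed' };
  }
  // Only in-scope connections get to learn whether a scan is present.
  if (!scannedConnections.has(connectionName)) {
    return { ok: false, reason: 'no_scan' };
  }
  return { ok: true };
}
```

Note the fail-open behavior when no allow-list is present on the session; whether that is acceptable depends on the runner always populating allowedConnectionNames for ingest stages.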
- Step 4: Run the entity_details tests
Run:
pnpm --filter @ktx/context exec vitest run src/ingest/tools/warehouse-verification/entity-details.tool.test.ts
Expected: PASS.
- Step 5: Commit
Run:
git add packages/context/src/ingest/tools/warehouse-verification/entity-details.tool.ts packages/context/src/ingest/tools/warehouse-verification/entity-details.tool.test.ts
git commit -m "feat(context): add entity details verification tool"
Task 4: Add sql_execution
Files:
- Create: packages/context/src/ingest/tools/warehouse-verification/sql-execution.tool.ts
- Create: packages/context/src/ingest/tools/warehouse-verification/sql-execution.tool.test.ts

- Step 1: Write failing sql_execution tests
Create tests for:
it('wraps read-only SQL with a capped row limit', async () => {
connections.executeQuery.mockResolvedValue({ headers: ['status'], rows: [['paid']], totalRows: 1 });
const result = await tool.call(
{ connectionName: 'warehouse', sql: 'select status from public.orders', rowLimit: 5 },
context,
);
expect(connections.executeQuery).toHaveBeenCalledWith(
'warehouse',
'select * from (select status from public.orders) as ktx_query_result limit 5',
);
expect(result.markdown).toContain('| status |');
expect(result.structured.wrappedSql).toContain('limit 5');
});
it.each(['insert into x values (1)', 'drop table x', 'vacuum'])('rejects mutating SQL: %s', async (sql) => {
const result = await tool.call({ connectionName: 'warehouse', sql }, context);
expect(result.markdown).toContain('Only read-only SELECT/WITH queries can be executed locally.');
expect(connections.executeQuery).not.toHaveBeenCalled();
});
it('surfaces connector errors verbatim', async () => {
connections.executeQuery.mockRejectedValue(new Error('relation "orbit_analytics.customer" does not exist'));
const result = await tool.call(
{ connectionName: 'warehouse', sql: 'select 1 from orbit_analytics.customer', rowLimit: 1 },
context,
);
expect(result.markdown).toContain('relation "orbit_analytics.customer" does not exist');
expect(result.structured.error).toContain('relation "orbit_analytics.customer" does not exist');
});
- Step 2: Run the failing tests
Run:
pnpm --filter @ktx/context exec vitest run src/ingest/tools/warehouse-verification/sql-execution.tool.test.ts
Expected: FAIL because the tool file does not exist.
- Step 3: Implement the tool
Create packages/context/src/ingest/tools/warehouse-verification/sql-execution.tool.ts:
import { z } from 'zod';
import { assertReadOnlySql, limitSqlForExecution } from '../../../connections/index.js';
import type { SlConnectionCatalogPort } from '../../../sl/index.js';
import { BaseTool, type ToolContext, type ToolOutput } from '../../../tools/index.js';
const sqlExecutionInputSchema = z.object({
connectionName: z.string().regex(/^[a-zA-Z0-9][a-zA-Z0-9_-]*$/),
sql: z.string().min(1),
rowLimit: z.number().int().positive().max(1000).optional().default(100),
});
type SqlExecutionInput = z.infer<typeof sqlExecutionInputSchema>;
export interface SqlExecutionStructured {
headers: string[];
rows: unknown[][];
rowCount: number;
truncated: boolean;
sql: string;
wrappedSql: string;
error?: string;
}
function markdownTable(headers: string[], rows: unknown[][], totalRows: number): string {
if (headers.length === 0) {
return rows.length === 0 ? 'Query returned no rows.' : JSON.stringify(rows.slice(0, 20));
}
const visible = rows.slice(0, 20);
const lines = [
`| ${headers.join(' | ')} |`,
`| ${headers.map(() => '---').join(' | ')} |`,
...visible.map((row) => `| ${row.map((value) => String(value ?? '')).join(' | ')} |`),
];
if (totalRows > visible.length) {
lines.push(`... +${totalRows - visible.length} more rows`);
}
return lines.join('\n');
}
export class SqlExecutionTool extends BaseTool<typeof sqlExecutionInputSchema> {
readonly name = 'sql_execution';
constructor(private readonly connections: SlConnectionCatalogPort) {
super();
}
get description(): string {
return 'Run a single read-only SELECT or WITH probe against an allowed warehouse connection and return a capped markdown table or the warehouse error.';
}
get inputSchema() {
return sqlExecutionInputSchema;
}
async call(input: SqlExecutionInput, context: ToolContext): Promise<ToolOutput<SqlExecutionStructured>> {
const allowed = context.session?.allowedConnectionNames;
if (allowed && !allowed.has(input.connectionName)) {
return {
markdown: `Connection "${input.connectionName}" is not available to this ingest stage.`,
structured: { headers: [], rows: [], rowCount: 0, truncated: false, sql: input.sql, wrappedSql: '', error: 'connection_not_allowed' },
};
}
let sql: string;
let wrappedSql: string;
try {
sql = assertReadOnlySql(input.sql);
wrappedSql = limitSqlForExecution(sql, input.rowLimit);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
return {
markdown: message,
structured: { headers: [], rows: [], rowCount: 0, truncated: false, sql: input.sql, wrappedSql: '', error: message },
};
}
try {
const result = await this.connections.executeQuery(input.connectionName, wrappedSql);
const headers = result.headers ?? [];
const rows = result.rows ?? [];
const rowCount = result.totalRows ?? rows.length;
return {
markdown: markdownTable(headers, rows, rowCount),
structured: { headers, rows, rowCount, truncated: rowCount > rows.length, sql, wrappedSql },
};
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
return {
markdown: `SQL execution failed: ${message}`,
structured: { headers: [], rows: [], rowCount: 0, truncated: false, sql, wrappedSql, error: message },
};
}
}
}
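To make the 20-row cap in `markdownTable` concrete, the sketch below repeats the helper from the listing above so it runs standalone, then renders 25 single-column rows. The names `sampleRows` and `rendered` are illustrative only and not part of the plan.

```typescript
// Copy of the markdownTable helper from sql-execution.tool.ts above, repeated so the example runs standalone.
function markdownTable(headers: string[], rows: unknown[][], totalRows: number): string {
  if (headers.length === 0) {
    return rows.length === 0 ? 'Query returned no rows.' : JSON.stringify(rows.slice(0, 20));
  }
  const visible = rows.slice(0, 20);
  const lines = [
    `| ${headers.join(' | ')} |`,
    `| ${headers.map(() => '---').join(' | ')} |`,
    ...visible.map((row) => `| ${row.map((value) => String(value ?? '')).join(' | ')} |`),
  ];
  if (totalRows > visible.length) {
    lines.push(`... +${totalRows - visible.length} more rows`);
  }
  return lines.join('\n');
}

// Illustrative input (hypothetical data): 25 rows with a single "id" column.
const sampleRows: unknown[][] = Array.from({ length: 25 }, (_, i) => [i]);
const rendered = markdownTable(['id'], sampleRows, sampleRows.length).split('\n');

console.log(rendered.length); // 23: header, separator, 20 data rows, overflow note
console.log(rendered[rendered.length - 1]); // "... +5 more rows"
```

Cell values are interpolated verbatim, so a value containing `|` would show up as an extra column in the rendered table. Whether that matters depends on the data being probed.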
- Step 4: Run the `sql_execution` tests
Run:
pnpm --filter @ktx/context exec vitest run src/ingest/tools/warehouse-verification/sql-execution.tool.test.ts
Expected: PASS.
- Step 5: Commit
Run:
git add packages/context/src/ingest/tools/warehouse-verification/sql-execution.tool.ts packages/context/src/ingest/tools/warehouse-verification/sql-execution.tool.test.ts
git commit -m "feat(context): add ingest SQL verification tool"
Task 5: Add discover_data
Files:
- Create: `packages/context/src/ingest/tools/warehouse-verification/discover-data.tool.ts`
- Create: `packages/context/src/ingest/tools/warehouse-verification/discover-data.tool.test.ts`
- Create: `packages/context/src/ingest/tools/warehouse-verification/index.ts`

- Step 1: Write failing `discover_data` tests
Create tests with fake wikiSearchTool.call, slDiscoverTool.call, and WarehouseCatalogService.searchByName. Cover:
it('groups wiki, semantic layer, and raw schema hits with routing hints', async () => {
const result = await tool.call({ query: 'orders', connectionName: 'warehouse', limit: 5 }, context);
expect(result.markdown).toContain('## Wiki Pages');
expect(result.markdown).toContain('use `wiki_read(blockKey)` for full content');
expect(result.markdown).toContain('## Semantic Layer Sources');
expect(result.markdown).toContain('use `sl_read_source(sourceName)` for the YAML');
expect(result.markdown).toContain('## Raw Warehouse Schema');
expect(result.markdown).toContain('use `entity_details({connectionName, targets: [{display}]})`');
expect(result.structured.raw?.hits).toHaveLength(1);
});
it('delegates sourceName inspect mode to sl_discover only', async () => {
const result = await tool.call({ sourceName: 'orders', connectionName: 'warehouse' }, context);
expect(slDiscoverTool.call).toHaveBeenCalledWith({ sourceName: 'orders', connectionId: 'warehouse' }, context);
expect(wikiSearchTool.call).not.toHaveBeenCalled();
expect(catalog.searchByName).not.toHaveBeenCalled();
expect(result.markdown).toContain('source detail');
});
it('returns the empty-state message when all sections are empty', async () => {
const result = await tool.call({ query: 'customer source', connectionName: 'warehouse' }, emptyContext);
expect(result.markdown).toContain('No matches for "customer source" across wiki, semantic layer, or raw warehouse schema.');
});
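The fakes these tests rely on can be very small. A minimal sketch, assuming a tool only needs an async `call(input, context)` method that resolves to `{ markdown, structured }`; `makeFakeTool` and `FakeToolOutput` are hypothetical names, and in the real test file `vi.fn().mockResolvedValue(...)` would serve the same purpose.

```typescript
// Hypothetical stand-in for the package's ToolOutput type.
interface FakeToolOutput {
  markdown: string;
  structured: Record<string, unknown>;
}

// Builds a fake tool that records every input it receives and always resolves to the same output.
function makeFakeTool(output: FakeToolOutput) {
  const calls: unknown[] = [];
  return {
    calls,
    call(input: unknown, _context: unknown): Promise<FakeToolOutput> {
      calls.push(input);
      return Promise.resolve(output);
    },
  };
}

// Fakes shaped like the fields discover_data reads: totalFound for wiki, totalSources for SL.
const fakeWikiSearch = makeFakeTool({ markdown: '- orders-overview', structured: { totalFound: 1 } });
const fakeSlDiscover = makeFakeTool({ markdown: 'source detail: orders', structured: { totalSources: 1 } });

// Inspect mode should reach sl_discover only, so after one call the wiki fake stays untouched.
void fakeSlDiscover.call({ sourceName: 'orders', connectionId: 'warehouse' }, {});
console.log(fakeSlDiscover.calls.length, fakeWikiSearch.calls.length); // 1 0
```

Recording inputs in a plain array keeps the assertions in the three cases above readable: each test can check which fake was called and with what arguments.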
- Step 2: Run the failing tests
Run:
pnpm --filter @ktx/context exec vitest run src/ingest/tools/warehouse-verification/discover-data.tool.test.ts
Expected: FAIL because the tool file does not exist.
- Step 3: Implement the tool and index export
Create packages/context/src/ingest/tools/warehouse-verification/discover-data.tool.ts:
import { z } from 'zod';
import type { BaseTool, ToolContext, ToolOutput } from '../../../tools/index.js';
import { BaseTool as ToolBase } from '../../../tools/index.js';
import { WarehouseCatalogService, type RawSchemaHit } from './warehouse-catalog.service.js';
const discoverDataInputSchema = z.object({
query: z.string().optional(),
connectionName: z.string().regex(/^[a-zA-Z0-9][a-zA-Z0-9_-]*$/).optional(),
limit: z.number().int().positive().max(50).optional().default(10),
sourceName: z.string().optional(),
});
type DiscoverDataInput = z.infer<typeof discoverDataInputSchema>;
export interface DiscoverDataStructured {
wiki: unknown | null;
sl: unknown | null;
raw: { hits: RawSchemaHit[] } | null;
}
interface DiscoverDataDeps {
wikiSearchTool: BaseTool;
slDiscoverTool: BaseTool;
catalogFactory: (context: ToolContext) => WarehouseCatalogService;
}
export class DiscoverDataTool extends ToolBase<typeof discoverDataInputSchema> {
readonly name = 'discover_data';
constructor(private readonly deps: DiscoverDataDeps) {
super();
}
get description(): string {
return 'Discover existing wiki pages, semantic layer sources, and raw warehouse schema hits before writing ingest output.';
}
get inputSchema() {
return discoverDataInputSchema;
}
async call(input: DiscoverDataInput, context: ToolContext): Promise<ToolOutput<DiscoverDataStructured>> {
if (input.sourceName) {
const sl = await this.deps.slDiscoverTool.call(
{ sourceName: input.sourceName, connectionId: input.connectionName },
context,
);
return { markdown: sl.markdown, structured: { wiki: null, sl: sl.structured, raw: null } };
}
const query = input.query?.trim() || '';
const limit = input.limit ?? 10;
const parts: string[] = [];
let wiki: unknown | null = null;
let sl: unknown | null = null;
let raw: DiscoverDataStructured['raw'] = null;
if (query) {
const wikiResult = await this.deps.wikiSearchTool.call({ query, limit }, context);
if (wikiResult.structured?.totalFound > 0) {
parts.push('## Wiki Pages', '> use `wiki_read(blockKey)` for full content', wikiResult.markdown, '');
wiki = wikiResult.structured;
}
}
const slResult = await this.deps.slDiscoverTool.call(
{ query: query || undefined, connectionId: input.connectionName },
context,
);
if (slResult.structured?.totalSources > 0) {
parts.push('## Semantic Layer Sources', '> use `sl_read_source(sourceName)` for the YAML, or `entity_details` for warehouse-shape details', slResult.markdown, '');
sl = slResult.structured;
}
const catalog = this.deps.catalogFactory(context);
const connections = input.connectionName
? [input.connectionName]
: [...(context.session?.allowedConnectionNames ?? [])].sort();
const rawHits: RawSchemaHit[] = [];
for (const connectionName of connections) {
rawHits.push(...(await catalog.searchByName(connectionName, query, limit)));
}
if (rawHits.length > 0) {
parts.push('## Raw Warehouse Schema', '> use `entity_details({connectionName, targets: [{display}]})` for full DDL + sample values');
parts.push(
rawHits
.slice(0, limit)
.map((hit) => `- ${hit.kind}: ${hit.display} (matched on ${hit.matchedOn})`)
.join('\n'),
);
raw = { hits: rawHits.slice(0, limit) };
}
if (parts.length === 0) {
return {
markdown: `No matches for "${query}" across wiki, semantic layer, or raw warehouse schema. Try broader terms; this concept may not exist yet.`,
structured: { wiki, sl, raw },
};
}
return { markdown: parts.join('\n'), structured: { wiki, sl, raw } };
}
}
Create packages/context/src/ingest/tools/warehouse-verification/index.ts:
import type { BaseTool, ToolContext } from '../../../tools/index.js';
import type { KtxFileStorePort } from '../../../core/index.js';
import type { SlConnectionCatalogPort } from '../../../sl/index.js';
import { DiscoverDataTool } from './discover-data.tool.js';
import { EntityDetailsTool } from './entity-details.tool.js';
import { SqlExecutionTool } from './sql-execution.tool.js';
import { WarehouseCatalogService } from './warehouse-catalog.service.js';
export { DiscoverDataTool } from './discover-data.tool.js';
export { EntityDetailsTool } from './entity-details.tool.js';
export { SqlExecutionTool } from './sql-execution.tool.js';
export { WarehouseCatalogService } from './warehouse-catalog.service.js';
export type { TableDetail, WarehouseColumnDetail, RawSchemaHit } from './warehouse-catalog.service.js';
export function createWarehouseVerificationTools(deps: {
connections: SlConnectionCatalogPort;
fallbackFileStore: KtxFileStorePort;
wikiSearchTool: BaseTool;
slDiscoverTool: BaseTool;
}): BaseTool[] {
const catalogFactory = (context: ToolContext) =>
new WarehouseCatalogService({
fileStore: context.session?.configService ?? deps.fallbackFileStore,
});
return [
new EntityDetailsTool(catalogFactory),
new SqlExecutionTool(deps.connections),
new DiscoverDataTool({
wikiSearchTool: deps.wikiSearchTool,
slDiscoverTool: deps.slDiscoverTool,
catalogFactory,
}),
];
}
- Step 4: Run the `discover_data` tests
Run:
pnpm --filter @ktx/context exec vitest run src/ingest/tools/warehouse-verification/discover-data.tool.test.ts
Expected: PASS.
- Step 5: Commit
Run:
git add packages/context/src/ingest/tools/warehouse-verification/discover-data.tool.ts packages/context/src/ingest/tools/warehouse-verification/discover-data.tool.test.ts packages/context/src/ingest/tools/warehouse-verification/index.ts
git commit -m "feat(context): add raw warehouse discovery tool"
Task 6: Wire tools into ingest sessions
Files:
- Modify: `packages/context/src/tools/tool-session.ts`
- Modify: `packages/context/src/ingest/ingest-bundle.runner.ts`
- Modify: `packages/context/src/ingest/local-bundle-runtime.ts`
- Modify: `packages/context/src/ingest/ingest-bundle.runner.test.ts`

- Step 1: Write failing scoping test
Add to packages/context/src/ingest/ingest-bundle.runner.test.ts:
it('threads target warehouse connection names into WorkUnit and reconcile tool sessions', async () => {
const deps = makeDeps();
const sessions: any[] = [];
deps.adapter.listTargetConnectionIds = vi.fn().mockResolvedValue(['warehouse']);
deps.toolsetFactory.createIngestWuToolset.mockImplementation((toolSession: any) => {
sessions.push(toolSession);
return {
toAiSdkTools: vi.fn().mockReturnValue({}),
getAllTools: vi.fn().mockReturnValue([]),
getToolNames: vi.fn().mockReturnValue([]),
};
});
deps.agentRunner.runLoop.mockResolvedValue({ stopReason: 'natural' });
const runner = buildRunner(deps);
(runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({
currentHashes: new Map([['a.yml', 'h1']]),
rawDirInWorktree: 'raw-sources/notion/fake/s',
});
(runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x');
await runner.run({
jobId: 'j1',
connectionId: 'notion',
sourceKey: 'fake',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload-x' },
});
expect([...sessions[0].allowedConnectionNames].sort()).toEqual(['notion', 'warehouse']);
});
- Step 2: Run the failing runner test
Run:
pnpm --filter @ktx/context exec vitest run src/ingest/ingest-bundle.runner.test.ts -t "threads target warehouse connection names"
Expected: FAIL because allowedConnectionNames is absent.
- Step 3: Thread allowed connection names
Modify packages/context/src/tools/tool-session.ts:
allowedRawPaths?: ReadonlySet<string>;
allowedConnectionNames?: ReadonlySet<string>;
semanticLayerService: SemanticLayerService;
Modify WU session creation in packages/context/src/ingest/ingest-bundle.runner.ts:
allowedRawPaths: new Set(wu.rawFiles),
allowedConnectionNames: new Set(slConnectionIds),
semanticLayerService: scopedSemanticLayerService,
Modify reconcile session creation in the same file:
allowedRawPaths: reconciliationAllowedRawPaths,
allowedConnectionNames: new Set(slConnectionIds),
semanticLayerService: rcScopedSl,
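The scoping threaded through in this step can be pictured as a small gate. A minimal sketch, assuming the allowed set combines the ingest source connection with its target warehouses (which is what the runner test above expects for `notion` and `warehouse`); `buildAllowedConnectionNames` and `isConnectionAllowed` are hypothetical helpers, not functions in the package, and how `slConnectionIds` is actually derived is not shown in this plan.

```typescript
// Hypothetical helper: combine the source connection with its target warehouse connections.
function buildAllowedConnectionNames(
  sourceConnectionId: string,
  targetConnectionIds: string[],
): ReadonlySet<string> {
  return new Set([sourceConnectionId, ...targetConnectionIds]);
}

// Mirrors the check in SqlExecutionTool: a missing set means no restriction is applied.
function isConnectionAllowed(allowed: ReadonlySet<string> | undefined, connectionName: string): boolean {
  return allowed === undefined || allowed.has(connectionName);
}

const allowedNames = buildAllowedConnectionNames('notion', ['warehouse']);
console.log(Array.from(allowedNames).sort()); // ['notion', 'warehouse']
console.log(isConnectionAllowed(allowedNames, 'warehouse')); // true
console.log(isConnectionAllowed(allowedNames, 'billing')); // false
console.log(isConnectionAllowed(undefined, 'billing')); // true (unscoped session)
```

Because an absent set is treated as unrestricted, a session that should be scoped has to set `allowedConnectionNames` explicitly, which is what this step does for both the WU and reconcile sessions.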
- Step 4: Register the tools in the local ingest toolset
Modify packages/context/src/ingest/local-bundle-runtime.ts:
import {
createWarehouseVerificationTools,
} from './tools/warehouse-verification/index.js';
Refactor the existing inline wiki and SL tool instances in LocalIngestToolsetFactory so wikiSearchTool and slDiscoverTool are named constants, then add the warehouse tools:
const wikiSearchTool = new WikiSearchTool({
search: async (input) => {
const results = await searchLocalKnowledgePages(deps.project, {
userId: input.userId,
query: input.query,
limit: input.limit,
embeddingService: deps.embedding,
});
return {
results: results.slice(0, input.limit).map((result) => ({
key: result.key,
path: result.path,
summary: result.summary,
score: result.score,
matchReasons: result.matchReasons,
lanes: result.lanes,
})),
totalFound: results.length,
};
},
});
const slDiscoverTool = new SlDiscoverTool(slDeps, { maxSources: 25, minRrfScore: 0, maxDetailedSources: 5 });
const warehouseVerificationTools = createWarehouseVerificationTools({
connections: deps.connections,
fallbackFileStore: deps.project.fileStore,
wikiSearchTool,
slDiscoverTool,
});
this.baseTools = [
new WikiReadTool(deps.wikiService, deps.knowledgeIndex),
wikiSearchTool,
new WikiListTagsTool(deps.wikiService, deps.knowledgeIndex),
new WikiWriteTool(deps.wikiService, deps.knowledgeIndex, deps.knowledgeEvents),
new WikiRemoveTool(deps.wikiService, deps.knowledgeIndex, deps.knowledgeEvents),
slDiscoverTool,
new SlEditSourceTool(slDeps),
new SlReadSourceTool(slDeps),
new SlWriteSourceTool(slDeps),
new SlValidateTool(slDeps),
new SlRollbackTool(deps.slSourcesRepository, deps.connections, 0),
...warehouseVerificationTools,
];
- Step 5: Run integration and toolset tests
Run:
pnpm --filter @ktx/context exec vitest run src/ingest/ingest-bundle.runner.test.ts -t "threads target warehouse connection names"
pnpm --filter @ktx/context exec vitest run src/ingest/local-bundle-runtime.test.ts
Expected: PASS.
- Step 6: Commit
Run:
git add packages/context/src/tools/tool-session.ts packages/context/src/ingest/ingest-bundle.runner.ts packages/context/src/ingest/local-bundle-runtime.ts packages/context/src/ingest/ingest-bundle.runner.test.ts
git commit -m "feat(context): expose warehouse verification tools to ingest"
Task 7: Update writer prompts and clean up stale references
Files:
- Create: `packages/context/skills/_shared/identifier-verification.md`
- Modify: `packages/context/skills/notion_synthesize/SKILL.md`
- Modify: `packages/context/skills/dbt_ingest/SKILL.md`
- Modify: `packages/context/skills/lookml_ingest/SKILL.md`
- Modify: `packages/context/skills/looker_ingest/SKILL.md`
- Modify: `packages/context/skills/metabase_ingest/SKILL.md`
- Modify: `packages/context/skills/metricflow_ingest/SKILL.md`
- Modify: `packages/context/skills/live_database_ingest/SKILL.md`
- Modify: `packages/context/skills/historic_sql_table_digest/SKILL.md`
- Modify: `packages/context/skills/historic_sql_patterns/SKILL.md`
- Modify: `packages/context/skills/knowledge_capture/SKILL.md`
- Modify: `packages/context/skills/sl_capture/SKILL.md`
- Modify: `packages/context/skills/sl/SKILL.md`
- Modify: `packages/context/src/ingest/tools/emit-unmapped-fallback.tool.ts`
- Modify: `packages/context/src/sl/tools/sl-warehouse-validation.ts`

- Step 1: Add the shared protocol file
Create packages/context/skills/_shared/identifier-verification.md:
## Identifier Verification Protocol
Before writing a wiki page or SL source on any topic:
1. `discover_data({query: "<topic>"})` - see what wikis, SL sources, and raw
tables already exist. Prefer updating existing pages over creating new ones.
Before emitting any `schema.table` or `schema.table.column` into a wiki body,
SL source, `tables:` frontmatter, `sl_refs`, or `emit_unmapped_fallback`:
2. `entity_details({connectionName, targets: [{display: "<identifier>"}]})` -
confirm the identifier resolves; inspect native types, FK/PK, and
sampleValues.
3. For literal values from the source, such as status codes or plan tiers,
check whether they appear in `entity_details` sampleValues for the relevant
column. If sampleValues is short or the sample may have missed real values,
run a `sql_execution` probe:
`SELECT DISTINCT <col> FROM <ref> LIMIT 50`.
4. If the candidate identifier still does not resolve, do one of:
- Use `sql_execution` with `SELECT 1 FROM <ref> LIMIT 0`. If it errors, the
identifier is fictional.
- Wrap the identifier in `[unverified - from <rawPath>]` in the wiki body,
citing the exact raw path that mentioned it.
- When recording `emit_unmapped_fallback` with `no_physical_table`, include
the failing probe error in `clarification`.
5. Never copy `<schema>.<table>` placeholder strings from these instructions
into output.
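Steps 2 and 4 of the protocol amount to a small decision procedure. A minimal sketch, assuming simplified result shapes for `entity_details` and `sql_execution` (`EntityLookup` and `ProbeOutcome` are hypothetical, and the real tool outputs may differ). It also assumes that wrapping the identifier means appending the marker after it, which is one reading of the instruction, and that a probe that succeeds counts as verification.

```typescript
// Hypothetical, simplified views of the two tool results.
interface EntityLookup { resolved: boolean }
interface ProbeOutcome { error?: string }

type Verdict =
  | { kind: 'verified' }
  | { kind: 'needs_probe'; sql: string }
  | { kind: 'unverified'; wikiText: string; clarification: string };

// Existence probe from step 4: LIMIT 0 asks for no rows, but the table reference must still resolve.
function existenceProbeSql(tableRef: string): string {
  return `SELECT 1 FROM ${tableRef} LIMIT 0`;
}

function decideVerification(
  tableRef: string,
  rawPath: string,
  lookup: EntityLookup,
  probe?: ProbeOutcome,
): Verdict {
  // Step 2: entity_details resolved the identifier.
  if (lookup.resolved) return { kind: 'verified' };
  // Step 4, first option: no probe has run yet, so ask for one.
  if (probe === undefined) return { kind: 'needs_probe', sql: existenceProbeSql(tableRef) };
  // Assumption: a probe without an error means the table exists even though the scan missed it.
  if (probe.error === undefined) return { kind: 'verified' };
  // Step 4, remaining options: mark the identifier and keep the probe error for `clarification`.
  return {
    kind: 'unverified',
    wikiText: `${tableRef} [unverified - from ${rawPath}]`,
    clarification: probe.error,
  };
}

// Illustrative values only; the table name, raw path, and error text are made up.
const outcome = decideVerification(
  'analytics.orders',
  'raw-sources/notion/page.md',
  { resolved: false },
  { error: 'relation "analytics.orders" does not exist' },
);
console.log(outcome.kind); // "unverified"
```

Keeping the decision pure, with tool results passed in, makes the branches easy to test without a warehouse connection.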
- Step 2: Inline the protocol into writer skills
Add the same protocol block to these skills:
packages/context/skills/notion_synthesize/SKILL.md
packages/context/skills/dbt_ingest/SKILL.md
packages/context/skills/lookml_ingest/SKILL.md
packages/context/skills/looker_ingest/SKILL.md
packages/context/skills/metabase_ingest/SKILL.md
packages/context/skills/metricflow_ingest/SKILL.md
packages/context/skills/live_database_ingest/SKILL.md
packages/context/skills/historic_sql_patterns/SKILL.md
packages/context/skills/knowledge_capture/SKILL.md
packages/context/skills/sl_capture/SKILL.md
For packages/context/skills/historic_sql_table_digest/SKILL.md, add this shorter block:
## Identifier Verification Protocol
Only mention columns visible in the table's scan record. Use
`entity_details({connectionName, targets: [{display: "<identifier>"}]})` if
the table or column attribution is uncertain. Do not infer join columns or
filters from neighboring SQL unless the scan record confirms the column exists
on the named table.
For packages/context/skills/sl/SKILL.md, add this cross-reference:
For capture-time identifier verification, load `sl_capture`. Synthesis writer
skills must verify warehouse identifiers with `discover_data`,
`entity_details`, and `sql_execution` before emitting table or column names.
- Step 3: Apply per-skill edits
Make these exact content changes:
- In `notion_synthesize`, add `discover_data`, `entity_details`, and `sql_execution` to the `Allowed:` line. Replace `tableRef: "orbit_analytics.customer"` with `tableRef: "<schema>.<table>"`.
- In `dbt_ingest`, replace `wiki_sl_search` with `discover_data` and `sl_describe_table` with `entity_details`.
- In `lookml_ingest`, add: `Verify each sql_table_name from the LookML view with entity_details before mapping to an SL source.`
- In `looker_ingest`, add: `For every Looker field reference, call entity_details on the underlying schema.table.column before promoting it to sl_refs or quoting it in wiki body.`
- In `metabase_ingest`, add: `Before writing a wiki page derived from a Metabase question SQL, verify each schema.table.column mentioned with entity_details.`
- In `metricflow_ingest`, add: `Verify each MetricFlow model source table with entity_details before producing the corresponding sl_write_source.`
- In `live_database_ingest`, add: `Sample values come from the scan record; do not invent values not present in relationship-profile.json.`
- In `historic_sql_patterns`, add: `Every join column mentioned in pattern descriptions must be verified via entity_details for both sides of the join.`
- In `knowledge_capture`, update the workflow to call `discover_data` first when a page relates to data or SL concepts.
- In `sl_capture`, add: `Before sl_write_source, call entity_details on the target table to confirm column names and types match the YAML being written.`

- Step 4: Remove stale code and prompt strings
Modify packages/context/src/ingest/tools/emit-unmapped-fallback.tool.ts:
.describe('The fully-qualified table or source reference that triggered the fallback (e.g. "<schema>.<table>"). Used to generate canonical detail text.'),
Modify packages/context/src/sl/tools/sl-warehouse-validation.ts:
`that inherits the manifest schema. Call sl_read_source to inspect the existing source first.`,
- Step 5: Commit
Run:
git add packages/context/skills packages/context/src/ingest/tools/emit-unmapped-fallback.tool.ts packages/context/src/sl/tools/sl-warehouse-validation.ts
git commit -m "docs(context): add ingest identifier verification protocol"
Task 8: Add prompt-bundling and banned-string tests
Files:
- Modify: `packages/context/src/memory/memory-runtime-assets.test.ts`
- Modify: `packages/context/src/ingest/ingest-runtime-assets.test.ts`

- Step 1: Add failing asset tests
Add to packages/context/src/memory/memory-runtime-assets.test.ts:
const verificationWriterSkills = [
'notion_synthesize',
'dbt_ingest',
'lookml_ingest',
'looker_ingest',
'metabase_ingest',
'metricflow_ingest',
'live_database_ingest',
'historic_sql_table_digest',
'historic_sql_patterns',
'knowledge_capture',
'sl_capture',
] as const;
it('ships identifier verification protocol in every synthesis writer skill', async () => {
for (const skillName of verificationWriterSkills) {
const body = await readFile(join(skillsDir, skillName, 'SKILL.md'), 'utf-8');
expect(body).toContain('## Identifier Verification Protocol');
expect(body).toMatch(/discover_data|entity_details/);
}
});
it('does not ship stale warehouse verification tool names or fictional identifiers', async () => {
for (const skillName of verificationWriterSkills) {
const body = await readFile(join(skillsDir, skillName, 'SKILL.md'), 'utf-8');
expect(body).not.toContain('orbit_analytics.customer');
expect(body).not.toContain('wiki_sl_search');
expect(body).not.toContain('sl_describe_table');
}
});
Add to packages/context/src/ingest/ingest-runtime-assets.test.ts:
it('packages identifier verification prompt assets', async () => {
const shared = await readFile(join(skillsDir, '_shared', 'identifier-verification.md'), 'utf-8');
expect(shared).toContain('## Identifier Verification Protocol');
expect(shared).toContain('discover_data');
expect(shared).toContain('entity_details');
expect(shared).toContain('sql_execution');
});
- Step 2: Run the asset tests
Run:
pnpm --filter @ktx/context exec vitest run src/memory/memory-runtime-assets.test.ts src/ingest/ingest-runtime-assets.test.ts
Expected: PASS once the Task 7 prompt edits are in place. On a checkout without Task 7, these tests fail.
- Step 3: Commit
Run:
git add packages/context/src/memory/memory-runtime-assets.test.ts packages/context/src/ingest/ingest-runtime-assets.test.ts
git commit -m "test(context): guard ingest identifier verification prompts"
Task 9: Run the full v1 verification set
Files:
- Verify all files changed by Tasks 1-8.

- Step 1: Run focused tests
Run:
pnpm --filter @ktx/context exec vitest run \
src/connections/dialects.test.ts \
src/ingest/tools/warehouse-verification/warehouse-catalog.service.test.ts \
src/ingest/tools/warehouse-verification/entity-details.tool.test.ts \
src/ingest/tools/warehouse-verification/sql-execution.tool.test.ts \
src/ingest/tools/warehouse-verification/discover-data.tool.test.ts \
src/ingest/ingest-bundle.runner.test.ts \
src/memory/memory-runtime-assets.test.ts \
src/ingest/ingest-runtime-assets.test.ts
Expected: PASS.
- Step 2: Run package type-check
Run:
pnpm --filter @ktx/context run type-check
Expected: PASS.
- Step 3: Run package tests
Run:
pnpm --filter @ktx/context run test
Expected: PASS.
- Step 4: Run pre-commit on changed files when configured
Run:
uv run pre-commit run --files \
packages/context/src/connections/dialects.ts \
packages/context/src/connections/dialects.test.ts \
packages/context/src/connections/index.ts \
packages/context/src/tools/tool-session.ts \
packages/context/src/ingest/ingest-bundle.runner.ts \
packages/context/src/ingest/local-bundle-runtime.ts \
packages/context/src/ingest/ingest-bundle.runner.test.ts \
packages/context/src/ingest/tools/emit-unmapped-fallback.tool.ts \
packages/context/src/sl/tools/sl-warehouse-validation.ts \
packages/context/src/memory/memory-runtime-assets.test.ts \
packages/context/src/ingest/ingest-runtime-assets.test.ts \
packages/context/src/ingest/tools/warehouse-verification/warehouse-catalog.service.ts \
packages/context/src/ingest/tools/warehouse-verification/warehouse-catalog.service.test.ts \
packages/context/src/ingest/tools/warehouse-verification/entity-details.tool.ts \
packages/context/src/ingest/tools/warehouse-verification/entity-details.tool.test.ts \
packages/context/src/ingest/tools/warehouse-verification/sql-execution.tool.ts \
packages/context/src/ingest/tools/warehouse-verification/sql-execution.tool.test.ts \
packages/context/src/ingest/tools/warehouse-verification/discover-data.tool.ts \
packages/context/src/ingest/tools/warehouse-verification/discover-data.tool.test.ts \
packages/context/src/ingest/tools/warehouse-verification/index.ts \
packages/context/skills/_shared/identifier-verification.md \
packages/context/skills/notion_synthesize/SKILL.md \
packages/context/skills/dbt_ingest/SKILL.md \
packages/context/skills/lookml_ingest/SKILL.md \
packages/context/skills/looker_ingest/SKILL.md \
packages/context/skills/metabase_ingest/SKILL.md \
packages/context/skills/metricflow_ingest/SKILL.md \
packages/context/skills/live_database_ingest/SKILL.md \
packages/context/skills/historic_sql_table_digest/SKILL.md \
packages/context/skills/historic_sql_patterns/SKILL.md \
packages/context/skills/knowledge_capture/SKILL.md \
packages/context/skills/sl_capture/SKILL.md \
packages/context/skills/sl/SKILL.md
Expected: PASS. If the repo has no pre-commit config or the local uv version cannot satisfy the project pin, record the exact error and rely on the focused tests plus type-check.
- Step 5: Commit final verification notes if any files changed during checks
Run:
git status --short
Expected: only intentional files are modified. Commit any formatter-driven edits with:
git add packages/context
git commit -m "chore(context): verify warehouse verification tools"
Self-review checklist
- Spec coverage: the plan covers dialect dispatch, raw scan catalog reads, `entity_details`, `sql_execution`, `discover_data`, WU and reconcile availability, prompt updates, cleanups, and tests.
- Placeholder scan: no task relies on unnamed future work.
- Type consistency: tool inputs use `connectionName`; existing `sl_discover` calls receive `connectionId` internally; raw SQL execution uses `SlConnectionCatalogPort.executeQuery()` because `SemanticLayerService.executeQuery()` currently accepts semantic-layer query input, not raw SQL.