ktx/docs/superpowers/plans/2026-05-12-warehouse-verification-tools.md
Andrey Avtomonov c22248dabf
feat(context): add warehouse verification tools (#46)
* feat(context): add warehouse dialect dispatch

* feat(context): read warehouse scan catalog

* feat(context): add entity details verification tool

* feat(context): add ingest SQL verification tool

* feat(context): add raw warehouse discovery tool

* feat(context): expose warehouse verification tools to ingest

* docs(context): add ingest identifier verification protocol

* test(context): guard ingest identifier verification prompts

* chore(context): verify warehouse verification tools

* docs: add warehouse verification tools plan and spec

* fix(context): expose target warehouses to Notion ingest

* fix(context): update ingest prompts for warehouse verification tools

* fix(context): scope raw schema discovery to allowed connections

* fix(context): verify warehouse column display targets

* docs: add notion warehouse verification gap closure plan

* fix(context): include raw discovery connection names

* fix(context): expose warehouse targets for LookML and MetricFlow

* fix(context): pass connection config to ingest query executors

* fix(cli): enable read-only SQL probes for local ingest

* docs: add warehouse verification final v1 closure plan

* fix(context): align warehouse sql probe prompt shape

* docs: add warehouse verification prompt shape closure plan

* test(context): catch connectionless sql execution prompt examples

* fix(context): include connection name in sl capture sql example

* docs: add warehouse verification sql example closure plan

* fix(context): report structured entity detail misses

* docs: add warehouse verification structured target miss closure plan

* fix: report untracked squash merge conflicts

* feat: require ingest verification ledger

* fix: stabilize ingest wiki references
2026-05-13 13:43:23 +02:00

59 KiB

Warehouse Verification Tools Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Add synthesis-time warehouse verification tools so ingest agents can verify raw warehouse tables, columns, and sample values before writing wiki pages, SL sources, tables: frontmatter, sl_refs, or unmapped fallback records.

Architecture: Add a raw scan catalog service over raw-sources/<connection>/live-database/<sync>/, three BaseTool-backed ingest tools, and runner/tool-session scoping for allowed warehouse connections. Register the tools in the local ingest toolset so both WorkUnit and reconcile stages receive them through the existing toAiSdkTools() path.

Tech Stack: TypeScript, Node 22, Vitest, AI SDK v6 tools, Zod, KTX file store, KTX semantic layer and wiki tools.


Audit summary

The current repo has the original spec file only; no matching plan or implementation exists under docs/superpowers/plans. The following v1-blocking gaps remain:

  • packages/context/src/connections/dialects.ts does not exist.
  • packages/context/src/ingest/tools/warehouse-verification/ does not exist.
  • entity_details, sql_execution, and discover_data are not available to ingest WU or reconcile toolsets.
  • ToolSession does not carry the ingest stage's allowed warehouse connection IDs.
  • Prompt updates are absent from the 11 writer skills named in the spec.
  • Cleanup strings remain: orbit_analytics.customer, wiki_sl_search, and sl_describe_table.
  • Prompt-bundling and warehouse-tool tests are absent.

Non-blocking gaps remain out of scope for this plan:

  • Hard write-time validation in wiki_write and emit_unmapped_fallback.
  • dictionary_search.
  • semantic_query in synthesis toolsets.
  • A raw-schema FTS index.
  • A UUID identity layer for tables and columns.

One repo-specific adjustment is required: do not import @ktx/connector-* dialect classes into @ktx/context, because every connector package already depends on @ktx/context. Add a minimal context-local dialect dispatch instead.

File structure

Create these files:

  • packages/context/src/connections/dialects.ts: Context-local driver dispatch for identifier quoting and display formatting.
  • packages/context/src/connections/dialects.test.ts: Driver dispatch and display-format tests.
  • packages/context/src/ingest/tools/warehouse-verification/warehouse-catalog.service.ts: Reads the latest live-database scan, resolves display identifiers, and searches table and column metadata.
  • packages/context/src/ingest/tools/warehouse-verification/warehouse-catalog.service.test.ts: Fixture-backed catalog tests.
  • packages/context/src/ingest/tools/warehouse-verification/entity-details.tool.ts: entity_details ingest tool.
  • packages/context/src/ingest/tools/warehouse-verification/entity-details.tool.test.ts: Tool contract tests.
  • packages/context/src/ingest/tools/warehouse-verification/sql-execution.tool.ts: sql_execution ingest tool.
  • packages/context/src/ingest/tools/warehouse-verification/sql-execution.tool.test.ts: Read-only SQL and output tests.
  • packages/context/src/ingest/tools/warehouse-verification/discover-data.tool.ts: discover_data ingest tool composing wiki, SL, and raw-schema search.
  • packages/context/src/ingest/tools/warehouse-verification/discover-data.tool.test.ts: Discovery composition tests.
  • packages/context/src/ingest/tools/warehouse-verification/index.ts: Exports tool classes and createWarehouseVerificationTools().
  • packages/context/skills/_shared/identifier-verification.md: Shared protocol text kept in the tree for review even though writer skills inline it.

Modify these files:

  • packages/context/src/connections/index.ts: Export the dialect helper.
  • packages/context/src/tools/tool-session.ts: Add allowedConnectionNames.
  • packages/context/src/ingest/ingest-bundle.runner.ts: Populate allowedConnectionNames for WU and reconcile sessions.
  • packages/context/src/ingest/local-bundle-runtime.ts: Register the warehouse verification tools in LocalIngestToolsetFactory.
  • packages/context/src/ingest/ingest-bundle.runner.test.ts: Assert the runner scopes allowed warehouse connections.
  • packages/context/src/memory/memory-runtime-assets.test.ts: Assert writer skills contain the protocol and banned strings are gone.
  • packages/context/src/ingest/ingest-runtime-assets.test.ts: Assert ingest skill packaging includes the protocol.
  • packages/context/src/ingest/tools/emit-unmapped-fallback.tool.ts: Replace the fictional table example.
  • packages/context/src/sl/tools/sl-warehouse-validation.ts: Replace the stale sl_describe_table hint.
  • packages/context/skills/*/SKILL.md: Inline protocol updates for the writer skills listed in the spec.

Task 1: Add context-local dialect dispatch

Files:

  • Create: packages/context/src/connections/dialects.ts

  • Create: packages/context/src/connections/dialects.test.ts

  • Modify: packages/context/src/connections/index.ts

  • Step 1: Write the failing dialect tests

Create packages/context/src/connections/dialects.test.ts:

import { describe, expect, it } from 'vitest';
import { getDialectForDriver } from './dialects.js';

describe('getDialectForDriver', () => {
  it.each([
    ['postgres', '"public"."orders"'],
    ['postgresql', '"public"."orders"'],
    ['mysql', '`public`.`orders`'],
    ['clickhouse', '`public`.`orders`'],
    ['sqlite', '"orders"'],
    ['snowflake', '"analytics"."public"."orders"'],
    ['bigquery', '`analytics`.`public`.`orders`'],
    ['sqlserver', '[analytics].[public].[orders]'],
  ] as const)('formats table names for %s', (driver, expected) => {
    const dialect = getDialectForDriver(driver);
    expect(
      dialect.formatTableName({
        catalog: driver === 'snowflake' || driver === 'bigquery' || driver === 'sqlserver' ? 'analytics' : null,
        db: driver === 'sqlite' ? null : 'public',
        name: 'orders',
      }),
    ).toBe(expected);
  });

  it('throws with a supported-driver list for unknown drivers', () => {
    expect(() => getDialectForDriver('oracle')).toThrow(
      'Unsupported warehouse driver "oracle". Supported drivers: bigquery, clickhouse, mysql, postgres, postgresql, sqlite, sqlite3, snowflake, sqlserver',
    );
  });
});
  • Step 2: Run the failing test

Run:

pnpm --filter @ktx/context exec vitest run src/connections/dialects.test.ts

Expected: FAIL because ./dialects.js does not exist.

  • Step 3: Add the minimal dialect implementation

Create packages/context/src/connections/dialects.ts:

import type { KtxSchemaDimensionType, KtxTableRef } from '../scan/types.js';

export type SupportedDriver =
  | 'postgres'
  | 'postgresql'
  | 'mysql'
  | 'sqlserver'
  | 'snowflake'
  | 'bigquery'
  | 'clickhouse'
  | 'sqlite'
  | 'sqlite3';

export interface KtxDialect {
  readonly type: SupportedDriver;
  quoteIdentifier(identifier: string): string;
  formatTableName(table: KtxTableRef): string;
  mapToDimensionType(nativeType: string): KtxSchemaDimensionType;
}

const supportedDrivers: SupportedDriver[] = [
  'bigquery',
  'clickhouse',
  'mysql',
  'postgres',
  'postgresql',
  'sqlite',
  'sqlite3',
  'snowflake',
  'sqlserver',
];

function doubleQuoted(identifier: string): string {
  return `"${identifier.replace(/"/g, '""')}"`;
}

function backtickQuoted(identifier: string): string {
  return `\`${identifier.replace(/`/g, '``')}\``;
}

function bigQueryQuoted(identifier: string): string {
  return `\`${identifier.replace(/`/g, '\\`')}\``;
}

function bracketQuoted(identifier: string): string {
  return `[${identifier.replace(/\]/g, ']]')}]`;
}

function inferDimensionType(nativeType: string): KtxSchemaDimensionType {
  const normalized = nativeType.toLowerCase().trim();
  if (normalized.includes('date') || normalized.includes('time')) {
    return 'time';
  }
  if (
    normalized.includes('int') ||
    normalized.includes('num') ||
    normalized.includes('dec') ||
    normalized.includes('float') ||
    normalized.includes('double') ||
    normalized.includes('real')
  ) {
    return 'number';
  }
  if (normalized.includes('bool') || normalized === 'bit') {
    return 'boolean';
  }
  return 'string';
}

function formatWithParts(table: KtxTableRef, quote: (identifier: string) => string, sqlite = false): string {
  const parts = sqlite ? [table.name] : [table.catalog, table.db, table.name].filter((part): part is string => !!part);
  return parts.map(quote).join('.');
}

function createDialect(type: SupportedDriver, quote: (identifier: string) => string, sqlite = false): KtxDialect {
  return {
    type,
    quoteIdentifier: quote,
    formatTableName: (table) => formatWithParts(table, quote, sqlite),
    mapToDimensionType: inferDimensionType,
  };
}

const dialects: Record<SupportedDriver, KtxDialect> = {
  postgres: createDialect('postgres', doubleQuoted),
  postgresql: createDialect('postgresql', doubleQuoted),
  mysql: createDialect('mysql', backtickQuoted),
  clickhouse: createDialect('clickhouse', backtickQuoted),
  sqlite: createDialect('sqlite', doubleQuoted, true),
  sqlite3: createDialect('sqlite3', doubleQuoted, true),
  snowflake: createDialect('snowflake', doubleQuoted),
  bigquery: createDialect('bigquery', bigQueryQuoted),
  sqlserver: createDialect('sqlserver', bracketQuoted),
};

export function getDialectForDriver(driver: string): KtxDialect {
  const normalized = driver.toLowerCase().trim();
  if (normalized in dialects) {
    return dialects[normalized as SupportedDriver];
  }
  throw new Error(`Unsupported warehouse driver "${driver}". Supported drivers: ${supportedDrivers.join(', ')}`);
}

Modify packages/context/src/connections/index.ts:

export type { KtxDialect, SupportedDriver } from './dialects.js';
export { getDialectForDriver } from './dialects.js';
  • Step 4: Run the dialect tests

Run:

pnpm --filter @ktx/context exec vitest run src/connections/dialects.test.ts

Expected: PASS.

  • Step 5: Commit

Run:

git add packages/context/src/connections/dialects.ts packages/context/src/connections/dialects.test.ts packages/context/src/connections/index.ts
git commit -m "feat(context): add warehouse dialect dispatch"

Task 2: Add the raw scan warehouse catalog service

Files:

  • Create: packages/context/src/ingest/tools/warehouse-verification/warehouse-catalog.service.ts

  • Create: packages/context/src/ingest/tools/warehouse-verification/warehouse-catalog.service.test.ts

  • Step 1: Write failing catalog tests

Create packages/context/src/ingest/tools/warehouse-verification/warehouse-catalog.service.test.ts:

import { mkdtemp, rm } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { initKtxProject, type KtxLocalProject } from '../../../project/index.js';
import { WarehouseCatalogService } from './warehouse-catalog.service.js';

describe('WarehouseCatalogService', () => {
  let tempDir: string;
  let project: KtxLocalProject;

  beforeEach(async () => {
    tempDir = await mkdtemp(join(tmpdir(), 'ktx-warehouse-catalog-'));
    project = await initKtxProject({ projectDir: join(tempDir, 'project'), projectName: 'warehouse' });
  });

  afterEach(async () => {
    await rm(tempDir, { recursive: true, force: true });
  });

  async function seedLiveDatabaseScan(connectionName = 'warehouse', syncId = 'sync-2', driver = 'postgres') {
    const root = `raw-sources/${connectionName}/live-database/${syncId}`;
    await project.fileStore.writeFile(
      `${root}/connection.json`,
      JSON.stringify({ connectionId: connectionName, driver, extractedAt: '2026-05-12T00:00:00.000Z' }, null, 2),
      'ktx',
      'ktx@example.com',
      'seed connection',
    );
    await project.fileStore.writeFile(
      `${root}/tables/orders.json`,
      JSON.stringify(
        {
          catalog: null,
          db: driver === 'sqlite' ? null : 'public',
          name: 'orders',
          kind: 'table',
          comment: 'Customer orders',
          estimatedRows: 12,
          columns: [
            {
              name: 'id',
              nativeType: 'integer',
              normalizedType: 'integer',
              dimensionType: 'number',
              nullable: false,
              primaryKey: true,
              comment: 'Order id',
            },
            {
              name: 'status',
              nativeType: 'text',
              normalizedType: 'text',
              dimensionType: 'string',
              nullable: false,
              primaryKey: false,
              comment: 'Order status',
            },
          ],
          foreignKeys: [],
        },
        null,
        2,
      ),
      'ktx',
      'ktx@example.com',
      'seed orders',
    );
    await project.fileStore.writeFile(
      `${root}/enrichment/relationship-profile.json`,
      JSON.stringify(
        {
          connectionId: connectionName,
          driver,
          sqlAvailable: true,
          queryCount: 3,
          tables: [{ table: { catalog: null, db: driver === 'sqlite' ? null : 'public', name: 'orders' }, rowCount: 12 }],
          columns: {
            'orders.status': {
              table: { catalog: null, db: driver === 'sqlite' ? null : 'public', name: 'orders' },
              column: 'status',
              nativeType: 'text',
              normalizedType: 'text',
              rowCount: 12,
              nullCount: 0,
              distinctCount: 2,
              uniquenessRatio: 0.1667,
              nullRate: 0,
              sampleValues: ['paid', 'refunded'],
              minTextLength: 4,
              maxTextLength: 8,
            },
          },
          warnings: [],
        },
        null,
        2,
      ),
      'ktx',
      'ktx@example.com',
      'seed profile',
    );
  }

  it('finds the latest sync and merges table schema with relationship profile values', async () => {
    await seedLiveDatabaseScan('warehouse', 'sync-1');
    await seedLiveDatabaseScan('warehouse', 'sync-2');
    const catalog = new WarehouseCatalogService({ fileStore: project.fileStore });

    await expect(catalog.getLatestSyncId('warehouse')).resolves.toBe('sync-2');
    const detail = await catalog.getTable({ connectionName: 'warehouse', catalog: null, db: 'public', name: 'orders' });

    expect(detail).toMatchObject({
      connectionName: 'warehouse',
      display: 'public.orders',
      rowCount: 12,
      columns: [
        { name: 'id', nativeType: 'integer', primaryKey: true },
        { name: 'status', nativeType: 'text', sampleValues: ['paid', 'refunded'], distinctCount: 2 },
      ],
    });
  });

  it('returns scanAvailable=false when no live-database scan exists', async () => {
    const catalog = new WarehouseCatalogService({ fileStore: project.fileStore });
    await expect(catalog.getTable({ connectionName: 'missing', catalog: null, db: 'public', name: 'orders' })).resolves.toBeNull();
    await expect(catalog.hasScan('missing')).resolves.toBe(false);
  });

  it('resolves postgres display strings and returns closest candidates for missing tables', async () => {
    await seedLiveDatabaseScan();
    const catalog = new WarehouseCatalogService({ fileStore: project.fileStore });

    await expect(catalog.resolveDisplay('warehouse', 'public.orders')).resolves.toMatchObject({
      resolved: { catalog: null, db: 'public', name: 'orders' },
      candidates: [],
      dialect: 'postgres',
    });
    await expect(catalog.resolveDisplay('warehouse', 'public.orderz')).resolves.toMatchObject({
      resolved: null,
      candidates: [{ name: 'orders' }],
    });
  });

  it('treats two-part BigQuery identifiers as ambiguous instead of guessing', async () => {
    await seedLiveDatabaseScan('warehouse', 'sync-bigquery', 'bigquery');
    const catalog = new WarehouseCatalogService({ fileStore: project.fileStore });

    await expect(catalog.resolveDisplay('warehouse', 'public.orders')).resolves.toMatchObject({
      resolved: null,
      dialect: 'bigquery',
    });
  });

  it('searches table names, column names, comments, and descriptions', async () => {
    await seedLiveDatabaseScan();
    const catalog = new WarehouseCatalogService({ fileStore: project.fileStore });

    await expect(catalog.searchByName('warehouse', 'status', 10)).resolves.toEqual(
      expect.arrayContaining([
        expect.objectContaining({
          kind: 'column',
          ref: expect.objectContaining({ db: 'public', name: 'orders', column: 'status' }),
          matchedOn: 'name',
        }),
      ]),
    );
  });
});
  • Step 2: Run the failing catalog tests

Run:

pnpm --filter @ktx/context exec vitest run src/ingest/tools/warehouse-verification/warehouse-catalog.service.test.ts

Expected: FAIL because the service file does not exist.

  • Step 3: Add the catalog service

Create packages/context/src/ingest/tools/warehouse-verification/warehouse-catalog.service.ts with these exported shapes and behavior:

import type { KtxFileStorePort } from '../../../core/index.js';
import { getDialectForDriver } from '../../../connections/index.js';
import type { KtxConnectionDriver, KtxSchemaColumn, KtxSchemaForeignKey, KtxSchemaTable, KtxTableRef } from '../../../scan/types.js';

export interface WarehouseCatalogServiceDeps {
  fileStore: KtxFileStorePort;
}

export interface WarehouseColumnDetail extends KtxSchemaColumn {
  descriptions: Record<string, string>;
  rowCount: number | null;
  nullCount: number | null;
  distinctCount: number | null;
  nullRate: number | null;
  sampleValues: string[];
}

export interface TableDetail {
  connectionName: string;
  catalog: string | null;
  db: string | null;
  name: string;
  display: string;
  kind: string;
  comment: string | null;
  description: string | null;
  rowCount: number | null;
  columns: WarehouseColumnDetail[];
  foreignKeys: KtxSchemaForeignKey[];
}

export type RawSchemaHit =
  | { kind: 'table'; ref: KtxTableRef; display: string; matchedOn: 'name' | 'db' | 'comment' | 'description' }
  | { kind: 'column'; ref: KtxTableRef & { column: string }; display: string; matchedOn: 'name' | 'comment' | 'description' };

interface ConnectionArtifact {
  driver?: KtxConnectionDriver;
}

interface RelationshipProfileColumn {
  table?: KtxTableRef;
  column?: string;
  rowCount?: number;
  nullCount?: number;
  distinctCount?: number;
  nullRate?: number;
  sampleValues?: unknown[];
}

interface RelationshipProfileArtifact {
  driver?: KtxConnectionDriver;
  tables?: Array<{ table?: KtxTableRef; rowCount?: number }>;
  columns?: Record<string, RelationshipProfileColumn>;
}

interface ConnectionCatalog {
  connectionName: string;
  syncId: string;
  driver: KtxConnectionDriver;
  tables: KtxSchemaTable[];
  profile: RelationshipProfileArtifact | null;
}

The implementation must:

  • Use fileStore.listFiles("raw-sources/<connectionName>/live-database") and choose the lexicographically latest path ending in /connection.json.
  • Read every JSON file under <latestRoot>/tables/ rather than reconstructing a path from the table ref. This supports encoded and simple table filenames already present in tests.
  • Parse display strings by driver:
    • Postgres, MySQL, and ClickHouse: schema.table.
    • SQL Server, Snowflake, and BigQuery: catalog.schema.table.
    • SQLite: table.
    • For BigQuery, a two-part display must return resolved: null and candidate matches.
  • Match table refs case-insensitively, while preserving stored casing in outputs.
  • Merge relationship-profile fields by (catalog, db, name, column), with fallback matching on table.name + "." + column.
  • Cache a loaded connection catalog per connectionName within the service instance.
  • Return null from getTable() when the scan is absent or the table ref is not found.

Use these method signatures:

export class WarehouseCatalogService {
  constructor(private readonly deps: WarehouseCatalogServiceDeps) {}

  async hasScan(connectionName: string): Promise<boolean>;
  async getLatestSyncId(connectionName: string): Promise<string | null>;
  async listTables(connectionName: string): Promise<KtxTableRef[]>;
  async getTable(ref: { connectionName: string } & KtxTableRef): Promise<TableDetail | null>;
  async resolveDisplay(connectionName: string, display: string): Promise<{
    resolved: KtxTableRef | null;
    candidates: KtxTableRef[];
    dialect: string;
  }>;
  async searchByName(connectionName: string, query: string, limit: number): Promise<RawSchemaHit[]>;
}
  • Step 4: Run the catalog tests

Run:

pnpm --filter @ktx/context exec vitest run src/ingest/tools/warehouse-verification/warehouse-catalog.service.test.ts

Expected: PASS.

  • Step 5: Commit

Run:

git add packages/context/src/ingest/tools/warehouse-verification/warehouse-catalog.service.ts packages/context/src/ingest/tools/warehouse-verification/warehouse-catalog.service.test.ts
git commit -m "feat(context): read warehouse scan catalog"

Task 3: Add entity_details

Files:

  • Create: packages/context/src/ingest/tools/warehouse-verification/entity-details.tool.ts

  • Create: packages/context/src/ingest/tools/warehouse-verification/entity-details.tool.test.ts

  • Step 1: Write failing entity_details tests

Create tests that instantiate the tool with a seeded WarehouseCatalogService and a ToolContext whose session has allowedConnectionNames: new Set(['warehouse']). Test these cases:

it('returns scoped table detail for a display target', async () => {
  const result = await tool.call(
    { connectionName: 'warehouse', targets: [{ display: 'public.orders' }] },
    context,
  );
  expect(result.markdown).toContain('### public.orders');
  expect(result.markdown).toContain('- status (text, nullable=false)');
  expect(result.markdown).toContain('sample: ["paid","refunded"]');
  expect(result.structured.scanAvailable).toBe(true);
  expect(result.structured.resolved).toHaveLength(1);
});

it('returns a no-scan state distinct from not found', async () => {
  const result = await tool.call(
    { connectionName: 'empty', targets: [{ display: 'public.orders' }] },
    { ...context, session: { ...context.session!, allowedConnectionNames: new Set(['empty']) } },
  );
  expect(result.markdown).toContain('No live-database scan available for connection "empty"; run `ktx scan` first.');
  expect(result.structured.scanAvailable).toBe(false);
});

it('refuses out-of-scope connections', async () => {
  const result = await tool.call(
    { connectionName: 'billing', targets: [{ display: 'public.orders' }] },
    context,
  );
  expect(result.markdown).toContain('Connection "billing" is not available to this ingest stage.');
  expect(result.structured.scanAvailable).toBe(false);
});
  • Step 2: Run the failing tests

Run:

pnpm --filter @ktx/context exec vitest run src/ingest/tools/warehouse-verification/entity-details.tool.test.ts

Expected: FAIL because the tool file does not exist.

  • Step 3: Implement the tool

Create packages/context/src/ingest/tools/warehouse-verification/entity-details.tool.ts:

import { z } from 'zod';
import { BaseTool, type ToolContext, type ToolOutput } from '../../../tools/index.js';
import type { KtxTableRef } from '../../../scan/types.js';
import { WarehouseCatalogService, type TableDetail } from './warehouse-catalog.service.js';

const targetSchema = z.union([
  z.object({ display: z.string().min(1) }),
  z.object({
    catalog: z.string().nullable(),
    db: z.string().nullable(),
    name: z.string().min(1),
    column: z.string().optional(),
  }),
]);

const entityDetailsInputSchema = z.object({
  connectionName: z.string().regex(/^[a-zA-Z0-9][a-zA-Z0-9_-]*$/),
  targets: z.array(targetSchema).min(1).max(50),
});

type EntityDetailsInput = z.infer<typeof entityDetailsInputSchema>;

export interface EntityDetailsStructured {
  resolved: TableDetail[];
  missing: Array<{ target: unknown; candidates: KtxTableRef[] }>;
  scanAvailable: boolean;
}

function allowedConnectionNames(context: ToolContext): ReadonlySet<string> | null {
  return context.session?.allowedConnectionNames ?? null;
}

function sampleText(values: string[]): string {
  return values.length > 0 ? ` - sample: ${JSON.stringify(values.slice(0, 10))}` : '';
}

function appendTableMarkdown(parts: string[], detail: TableDetail, columnName?: string): void {
  const columns = columnName ? detail.columns.filter((column) => column.name === columnName) : detail.columns;
  parts.push(`### ${detail.display}`);
  parts.push(`Type: ${detail.kind} | Native columns: ${detail.columns.length}`);
  if (detail.description || detail.comment) {
    parts.push(`Description: ${detail.description ?? detail.comment}`);
  }
  parts.push('', 'Columns:');
  for (const column of columns) {
    const pk = column.primaryKey ? ', PK' : '';
    parts.push(`- ${column.name} (${column.nativeType}, nullable=${column.nullable}${pk})${sampleText(column.sampleValues)}`);
  }
  parts.push('');
}

export class EntityDetailsTool extends BaseTool<typeof entityDetailsInputSchema> {
  readonly name = 'entity_details';

  constructor(private readonly catalogFactory: (context: ToolContext) => WarehouseCatalogService) {
    super();
  }

  get description(): string {
    return 'Verify warehouse tables and columns from the latest live-database scan before writing them into wiki or semantic-layer output.';
  }

  get inputSchema() {
    return entityDetailsInputSchema;
  }

  async call(input: EntityDetailsInput, context: ToolContext): Promise<ToolOutput<EntityDetailsStructured>> {
    const allowed = allowedConnectionNames(context);
    if (allowed && !allowed.has(input.connectionName)) {
      return {
        markdown: `Connection "${input.connectionName}" is not available to this ingest stage.`,
        structured: { resolved: [], missing: [], scanAvailable: false },
      };
    }

    const catalog = this.catalogFactory(context);
    const scanAvailable = await catalog.hasScan(input.connectionName);
    if (!scanAvailable) {
      return {
        markdown: `No live-database scan available for connection "${input.connectionName}"; run \`ktx scan\` first.`,
        structured: { resolved: [], missing: [], scanAvailable: false },
      };
    }

    const parts: string[] = [];
    const resolved: TableDetail[] = [];
    const missing: EntityDetailsStructured['missing'] = [];

    for (const target of input.targets) {
      const resolution =
        'display' in target
          ? await catalog.resolveDisplay(input.connectionName, target.display)
          : { resolved: { catalog: target.catalog, db: target.db, name: target.name }, candidates: [], dialect: '' };
      if (!resolution.resolved) {
        missing.push({ target, candidates: resolution.candidates });
        parts.push(`Not found in scan: ${'display' in target ? target.display : target.name}`);
        if (resolution.candidates.length > 0) {
          parts.push(`Closest matches: ${resolution.candidates.map((candidate) => candidate.name).join(', ')}`);
        }
        continue;
      }
      const detail = await catalog.getTable({ connectionName: input.connectionName, ...resolution.resolved });
      if (!detail) {
        missing.push({ target, candidates: resolution.candidates });
        continue;
      }
      resolved.push(detail);
      appendTableMarkdown(parts, detail, 'column' in target ? target.column : undefined);
    }

    return {
      markdown: parts.join('\n').trim(),
      structured: { resolved, missing, scanAvailable: true },
    };
  }
}
  • Step 4: Run the entity_details tests

Run:

pnpm --filter @ktx/context exec vitest run src/ingest/tools/warehouse-verification/entity-details.tool.test.ts

Expected: PASS.

  • Step 5: Commit

Run:

git add packages/context/src/ingest/tools/warehouse-verification/entity-details.tool.ts packages/context/src/ingest/tools/warehouse-verification/entity-details.tool.test.ts
git commit -m "feat(context): add entity details verification tool"

Task 4: Add sql_execution

Files:

  • Create: packages/context/src/ingest/tools/warehouse-verification/sql-execution.tool.ts

  • Create: packages/context/src/ingest/tools/warehouse-verification/sql-execution.tool.test.ts

  • Step 1: Write failing sql_execution tests

Create tests for:

it('wraps read-only SQL with a capped row limit', async () => {
  connections.executeQuery.mockResolvedValue({ headers: ['status'], rows: [['paid']], totalRows: 1 });
  const result = await tool.call(
    { connectionName: 'warehouse', sql: 'select status from public.orders', rowLimit: 5 },
    context,
  );
  expect(connections.executeQuery).toHaveBeenCalledWith(
    'warehouse',
    'select * from (select status from public.orders) as ktx_query_result limit 5',
  );
  expect(result.markdown).toContain('| status |');
  expect(result.structured.wrappedSql).toContain('limit 5');
});

it.each(['insert into x values (1)', 'drop table x', 'vacuum'])('rejects mutating SQL: %s', async (sql) => {
  const result = await tool.call({ connectionName: 'warehouse', sql }, context);
  expect(result.markdown).toContain('Only read-only SELECT/WITH queries can be executed locally.');
  expect(connections.executeQuery).not.toHaveBeenCalled();
});

it('surfaces connector errors verbatim', async () => {
  connections.executeQuery.mockRejectedValue(new Error('relation "orbit_analytics.customer" does not exist'));
  const result = await tool.call(
    { connectionName: 'warehouse', sql: 'select 1 from orbit_analytics.customer', rowLimit: 1 },
    context,
  );
  expect(result.markdown).toContain('relation "orbit_analytics.customer" does not exist');
  expect(result.structured.error).toContain('relation "orbit_analytics.customer" does not exist');
});
  • Step 2: Run the failing tests

Run:

pnpm --filter @ktx/context exec vitest run src/ingest/tools/warehouse-verification/sql-execution.tool.test.ts

Expected: FAIL because the tool file does not exist.

  • Step 3: Implement the tool

Create packages/context/src/ingest/tools/warehouse-verification/sql-execution.tool.ts:

import { z } from 'zod';
import { assertReadOnlySql, limitSqlForExecution } from '../../../connections/index.js';
import type { SlConnectionCatalogPort } from '../../../sl/index.js';
import { BaseTool, type ToolContext, type ToolOutput } from '../../../tools/index.js';

const sqlExecutionInputSchema = z.object({
  connectionName: z.string().regex(/^[a-zA-Z0-9][a-zA-Z0-9_-]*$/),
  sql: z.string().min(1),
  rowLimit: z.number().int().positive().max(1000).optional().default(100),
});

type SqlExecutionInput = z.infer<typeof sqlExecutionInputSchema>;

export interface SqlExecutionStructured {
  headers: string[];
  rows: unknown[][];
  rowCount: number;
  truncated: boolean;
  sql: string;
  wrappedSql: string;
  error?: string;
}

function markdownTable(headers: string[], rows: unknown[][], totalRows: number): string {
  if (headers.length === 0) {
    return rows.length === 0 ? 'Query returned no rows.' : JSON.stringify(rows.slice(0, 20));
  }
  const visible = rows.slice(0, 20);
  const lines = [
    `| ${headers.join(' | ')} |`,
    `| ${headers.map(() => '---').join(' | ')} |`,
    ...visible.map((row) => `| ${row.map((value) => String(value ?? '')).join(' | ')} |`),
  ];
  if (totalRows > visible.length) {
    lines.push(`... +${totalRows - visible.length} more rows`);
  }
  return lines.join('\n');
}

export class SqlExecutionTool extends BaseTool<typeof sqlExecutionInputSchema> {
  readonly name = 'sql_execution';

  constructor(private readonly connections: SlConnectionCatalogPort) {
    super();
  }

  get description(): string {
    return 'Run a single read-only SELECT or WITH probe against an allowed warehouse connection and return a capped markdown table or the warehouse error.';
  }

  get inputSchema() {
    return sqlExecutionInputSchema;
  }

  async call(input: SqlExecutionInput, context: ToolContext): Promise<ToolOutput<SqlExecutionStructured>> {
    const allowed = context.session?.allowedConnectionNames;
    if (allowed && !allowed.has(input.connectionName)) {
      return {
        markdown: `Connection "${input.connectionName}" is not available to this ingest stage.`,
        structured: { headers: [], rows: [], rowCount: 0, truncated: false, sql: input.sql, wrappedSql: '', error: 'connection_not_allowed' },
      };
    }

    let sql: string;
    let wrappedSql: string;
    try {
      sql = assertReadOnlySql(input.sql);
      wrappedSql = limitSqlForExecution(sql, input.rowLimit);
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      return {
        markdown: message,
        structured: { headers: [], rows: [], rowCount: 0, truncated: false, sql: input.sql, wrappedSql: '', error: message },
      };
    }

    try {
      const result = await this.connections.executeQuery(input.connectionName, wrappedSql);
      const headers = result.headers ?? [];
      const rows = result.rows ?? [];
      const rowCount = result.totalRows ?? rows.length;
      return {
        markdown: markdownTable(headers, rows, rowCount),
        structured: { headers, rows, rowCount, truncated: rowCount > rows.length, sql, wrappedSql },
      };
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      return {
        markdown: `SQL execution failed: ${message}`,
        structured: { headers: [], rows: [], rowCount: 0, truncated: false, sql, wrappedSql, error: message },
      };
    }
  }
}
  • Step 4: Run the sql_execution tests

Run:

pnpm --filter @ktx/context exec vitest run src/ingest/tools/warehouse-verification/sql-execution.tool.test.ts

Expected: PASS.

  • Step 5: Commit

Run:

git add packages/context/src/ingest/tools/warehouse-verification/sql-execution.tool.ts packages/context/src/ingest/tools/warehouse-verification/sql-execution.tool.test.ts
git commit -m "feat(context): add ingest SQL verification tool"

Task 5: Add discover_data

Files:

  • Create: packages/context/src/ingest/tools/warehouse-verification/discover-data.tool.ts

  • Create: packages/context/src/ingest/tools/warehouse-verification/discover-data.tool.test.ts

  • Create: packages/context/src/ingest/tools/warehouse-verification/index.ts

  • Step 1: Write failing discover_data tests

Create tests with fake wikiSearchTool.call, slDiscoverTool.call, and WarehouseCatalogService.searchByName. Cover:

it('groups wiki, semantic layer, and raw schema hits with routing hints', async () => {
  const result = await tool.call({ query: 'orders', connectionName: 'warehouse', limit: 5 }, context);
  expect(result.markdown).toContain('## Wiki Pages');
  expect(result.markdown).toContain('use `wiki_read(blockKey)` for full content');
  expect(result.markdown).toContain('## Semantic Layer Sources');
  expect(result.markdown).toContain('use `sl_read_source(sourceName)` for the YAML');
  expect(result.markdown).toContain('## Raw Warehouse Schema');
  expect(result.markdown).toContain('use `entity_details({connectionName, targets: [{display}]})`');
  expect(result.structured.raw?.hits).toHaveLength(1);
});

it('delegates sourceName inspect mode to sl_discover only', async () => {
  const result = await tool.call({ sourceName: 'orders', connectionName: 'warehouse' }, context);
  expect(slDiscoverTool.call).toHaveBeenCalledWith({ sourceName: 'orders', connectionId: 'warehouse' }, context);
  expect(wikiSearchTool.call).not.toHaveBeenCalled();
  expect(catalog.searchByName).not.toHaveBeenCalled();
  expect(result.markdown).toContain('source detail');
});

it('returns the empty-state message when all sections are empty', async () => {
  const result = await tool.call({ query: 'customer source', connectionName: 'warehouse' }, emptyContext);
  expect(result.markdown).toContain('No matches for "customer source" across wiki, semantic layer, or raw warehouse schema.');
});
  • Step 2: Run the failing tests

Run:

pnpm --filter @ktx/context exec vitest run src/ingest/tools/warehouse-verification/discover-data.tool.test.ts

Expected: FAIL because the tool file does not exist.

  • Step 3: Implement the tool and index export

Create packages/context/src/ingest/tools/warehouse-verification/discover-data.tool.ts:

import { z } from 'zod';
import type { BaseTool, ToolContext, ToolOutput } from '../../../tools/index.js';
import { BaseTool as ToolBase } from '../../../tools/index.js';
import { WarehouseCatalogService, type RawSchemaHit } from './warehouse-catalog.service.js';

const discoverDataInputSchema = z.object({
  query: z.string().optional(),
  connectionName: z.string().regex(/^[a-zA-Z0-9][a-zA-Z0-9_-]*$/).optional(),
  limit: z.number().int().positive().max(50).optional().default(10),
  sourceName: z.string().optional(),
});

type DiscoverDataInput = z.infer<typeof discoverDataInputSchema>;

export interface DiscoverDataStructured {
  wiki: unknown | null;
  sl: unknown | null;
  raw: { hits: RawSchemaHit[] } | null;
}

interface DiscoverDataDeps {
  wikiSearchTool: BaseTool;
  slDiscoverTool: BaseTool;
  catalogFactory: (context: ToolContext) => WarehouseCatalogService;
}

export class DiscoverDataTool extends ToolBase<typeof discoverDataInputSchema> {
  readonly name = 'discover_data';

  constructor(private readonly deps: DiscoverDataDeps) {
    super();
  }

  get description(): string {
    return 'Discover existing wiki pages, semantic layer sources, and raw warehouse schema hits before writing ingest output.';
  }

  get inputSchema() {
    return discoverDataInputSchema;
  }

  async call(input: DiscoverDataInput, context: ToolContext): Promise<ToolOutput<DiscoverDataStructured>> {
    if (input.sourceName) {
      const sl = await this.deps.slDiscoverTool.call(
        { sourceName: input.sourceName, connectionId: input.connectionName },
        context,
      );
      return { markdown: sl.markdown, structured: { wiki: null, sl: sl.structured, raw: null } };
    }

    const query = input.query?.trim() || '';
    const limit = input.limit ?? 10;
    const parts: string[] = [];
    let wiki: unknown | null = null;
    let sl: unknown | null = null;
    let raw: DiscoverDataStructured['raw'] = null;

    if (query) {
      const wikiResult = await this.deps.wikiSearchTool.call({ query, limit }, context);
      if (wikiResult.structured?.totalFound > 0) {
        parts.push('## Wiki Pages', '> use `wiki_read(blockKey)` for full content', wikiResult.markdown, '');
        wiki = wikiResult.structured;
      }
    }

    const slResult = await this.deps.slDiscoverTool.call(
      { query: query || undefined, connectionId: input.connectionName },
      context,
    );
    if (slResult.structured?.totalSources > 0) {
      parts.push('## Semantic Layer Sources', '> use `sl_read_source(sourceName)` for the YAML, or `entity_details` for warehouse-shape details', slResult.markdown, '');
      sl = slResult.structured;
    }

    const catalog = this.deps.catalogFactory(context);
    const connections = input.connectionName
      ? [input.connectionName]
      : [...(context.session?.allowedConnectionNames ?? [])].sort();
    const rawHits: RawSchemaHit[] = [];
    for (const connectionName of connections) {
      rawHits.push(...(await catalog.searchByName(connectionName, query, limit)));
    }
    if (rawHits.length > 0) {
      parts.push('## Raw Warehouse Schema', '> use `entity_details({connectionName, targets: [{display}]})` for full DDL + sample values');
      parts.push(
        rawHits
          .slice(0, limit)
          .map((hit) => `- ${hit.kind}: ${hit.display} (matched on ${hit.matchedOn})`)
          .join('\n'),
      );
      raw = { hits: rawHits.slice(0, limit) };
    }

    if (parts.length === 0) {
      return {
        markdown: `No matches for "${query}" across wiki, semantic layer, or raw warehouse schema. Try broader terms; this concept may not exist yet.`,
        structured: { wiki, sl, raw },
      };
    }

    return { markdown: parts.join('\n'), structured: { wiki, sl, raw } };
  }
}

Create packages/context/src/ingest/tools/warehouse-verification/index.ts:

import type { BaseTool, ToolContext } from '../../../tools/index.js';
import type { KtxFileStorePort } from '../../../core/index.js';
import type { SlConnectionCatalogPort } from '../../../sl/index.js';
import { DiscoverDataTool } from './discover-data.tool.js';
import { EntityDetailsTool } from './entity-details.tool.js';
import { SqlExecutionTool } from './sql-execution.tool.js';
import { WarehouseCatalogService } from './warehouse-catalog.service.js';

export { DiscoverDataTool } from './discover-data.tool.js';
export { EntityDetailsTool } from './entity-details.tool.js';
export { SqlExecutionTool } from './sql-execution.tool.js';
export { WarehouseCatalogService } from './warehouse-catalog.service.js';
export type { TableDetail, WarehouseColumnDetail, RawSchemaHit } from './warehouse-catalog.service.js';

export function createWarehouseVerificationTools(deps: {
  connections: SlConnectionCatalogPort;
  fallbackFileStore: KtxFileStorePort;
  wikiSearchTool: BaseTool;
  slDiscoverTool: BaseTool;
}): BaseTool[] {
  const catalogFactory = (context: ToolContext) =>
    new WarehouseCatalogService({
      fileStore: context.session?.configService ?? deps.fallbackFileStore,
    });
  return [
    new EntityDetailsTool(catalogFactory),
    new SqlExecutionTool(deps.connections),
    new DiscoverDataTool({
      wikiSearchTool: deps.wikiSearchTool,
      slDiscoverTool: deps.slDiscoverTool,
      catalogFactory,
    }),
  ];
}
  • Step 4: Run the discover_data tests

Run:

pnpm --filter @ktx/context exec vitest run src/ingest/tools/warehouse-verification/discover-data.tool.test.ts

Expected: PASS.

  • Step 5: Commit

Run:

git add packages/context/src/ingest/tools/warehouse-verification/discover-data.tool.ts packages/context/src/ingest/tools/warehouse-verification/discover-data.tool.test.ts packages/context/src/ingest/tools/warehouse-verification/index.ts
git commit -m "feat(context): add raw warehouse discovery tool"

Task 6: Wire tools into ingest sessions

Files:

  • Modify: packages/context/src/tools/tool-session.ts

  • Modify: packages/context/src/ingest/ingest-bundle.runner.ts

  • Modify: packages/context/src/ingest/local-bundle-runtime.ts

  • Modify: packages/context/src/ingest/ingest-bundle.runner.test.ts

  • Step 1: Write failing scoping test

Add to packages/context/src/ingest/ingest-bundle.runner.test.ts:

it('threads target warehouse connection names into WorkUnit and reconcile tool sessions', async () => {
  const deps = makeDeps();
  const sessions: any[] = [];
  deps.adapter.listTargetConnectionIds = vi.fn().mockResolvedValue(['warehouse']);
  deps.toolsetFactory.createIngestWuToolset.mockImplementation((toolSession: any) => {
    sessions.push(toolSession);
    return {
      toAiSdkTools: vi.fn().mockReturnValue({}),
      getAllTools: vi.fn().mockReturnValue([]),
      getToolNames: vi.fn().mockReturnValue([]),
    };
  });
  deps.agentRunner.runLoop.mockResolvedValue({ stopReason: 'natural' });

  const runner = buildRunner(deps);
  (runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({
    currentHashes: new Map([['a.yml', 'h1']]),
    rawDirInWorktree: 'raw-sources/notion/fake/s',
  });
  (runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x');

  await runner.run({
    jobId: 'j1',
    connectionId: 'notion',
    sourceKey: 'fake',
    trigger: 'upload',
    bundleRef: { kind: 'upload', uploadId: 'upload-x' },
  });

  expect([...sessions[0].allowedConnectionNames].sort()).toEqual(['notion', 'warehouse']);
});
  • Step 2: Run the failing runner test

Run:

pnpm --filter @ktx/context exec vitest run src/ingest/ingest-bundle.runner.test.ts -t "threads target warehouse connection names"

Expected: FAIL because allowedConnectionNames is absent.

  • Step 3: Thread allowed connection names

Modify packages/context/src/tools/tool-session.ts:

  allowedRawPaths?: ReadonlySet<string>;
  allowedConnectionNames?: ReadonlySet<string>;
  semanticLayerService: SemanticLayerService;

Modify WU session creation in packages/context/src/ingest/ingest-bundle.runner.ts:

            allowedRawPaths: new Set(wu.rawFiles),
            allowedConnectionNames: new Set(slConnectionIds),
            semanticLayerService: scopedSemanticLayerService,

Modify reconcile session creation in the same file:

        allowedRawPaths: reconciliationAllowedRawPaths,
        allowedConnectionNames: new Set(slConnectionIds),
        semanticLayerService: rcScopedSl,
  • Step 4: Register the tools in the local ingest toolset

Modify packages/context/src/ingest/local-bundle-runtime.ts:

import {
  createWarehouseVerificationTools,
} from './tools/warehouse-verification/index.js';

Refactor the existing inline wiki and SL tool instances in LocalIngestToolsetFactory so wikiSearchTool and slDiscoverTool are named constants, then add the warehouse tools:

    const wikiSearchTool = new WikiSearchTool({
      search: async (input) => {
        const results = await searchLocalKnowledgePages(deps.project, {
          userId: input.userId,
          query: input.query,
          limit: input.limit,
          embeddingService: deps.embedding,
        });
        return {
          results: results.slice(0, input.limit).map((result) => ({
            key: result.key,
            path: result.path,
            summary: result.summary,
            score: result.score,
            matchReasons: result.matchReasons,
            lanes: result.lanes,
          })),
          totalFound: results.length,
        };
      },
    });
    const slDiscoverTool = new SlDiscoverTool(slDeps, { maxSources: 25, minRrfScore: 0, maxDetailedSources: 5 });
    const warehouseVerificationTools = createWarehouseVerificationTools({
      connections: deps.connections,
      fallbackFileStore: deps.project.fileStore,
      wikiSearchTool,
      slDiscoverTool,
    });

    this.baseTools = [
      new WikiReadTool(deps.wikiService, deps.knowledgeIndex),
      wikiSearchTool,
      new WikiListTagsTool(deps.wikiService, deps.knowledgeIndex),
      new WikiWriteTool(deps.wikiService, deps.knowledgeIndex, deps.knowledgeEvents),
      new WikiRemoveTool(deps.wikiService, deps.knowledgeIndex, deps.knowledgeEvents),
      slDiscoverTool,
      new SlEditSourceTool(slDeps),
      new SlReadSourceTool(slDeps),
      new SlWriteSourceTool(slDeps),
      new SlValidateTool(slDeps),
      new SlRollbackTool(deps.slSourcesRepository, deps.connections, 0),
      ...warehouseVerificationTools,
    ];
  • Step 5: Run integration and toolset tests

Run:

pnpm --filter @ktx/context exec vitest run src/ingest/ingest-bundle.runner.test.ts -t "threads target warehouse connection names"
pnpm --filter @ktx/context exec vitest run src/ingest/local-bundle-runtime.test.ts

Expected: PASS.

  • Step 6: Commit

Run:

git add packages/context/src/tools/tool-session.ts packages/context/src/ingest/ingest-bundle.runner.ts packages/context/src/ingest/local-bundle-runtime.ts packages/context/src/ingest/ingest-bundle.runner.test.ts
git commit -m "feat(context): expose warehouse verification tools to ingest"

Task 7: Update writer prompts and cleanup stale references

Files:

  • Create: packages/context/skills/_shared/identifier-verification.md

  • Modify: packages/context/skills/notion_synthesize/SKILL.md

  • Modify: packages/context/skills/dbt_ingest/SKILL.md

  • Modify: packages/context/skills/lookml_ingest/SKILL.md

  • Modify: packages/context/skills/looker_ingest/SKILL.md

  • Modify: packages/context/skills/metabase_ingest/SKILL.md

  • Modify: packages/context/skills/metricflow_ingest/SKILL.md

  • Modify: packages/context/skills/live_database_ingest/SKILL.md

  • Modify: packages/context/skills/historic_sql_table_digest/SKILL.md

  • Modify: packages/context/skills/historic_sql_patterns/SKILL.md

  • Modify: packages/context/skills/knowledge_capture/SKILL.md

  • Modify: packages/context/skills/sl_capture/SKILL.md

  • Modify: packages/context/skills/sl/SKILL.md

  • Modify: packages/context/src/ingest/tools/emit-unmapped-fallback.tool.ts

  • Modify: packages/context/src/sl/tools/sl-warehouse-validation.ts

  • Step 1: Add the shared protocol file

Create packages/context/skills/_shared/identifier-verification.md:

## Identifier Verification Protocol

Before writing a wiki page or SL source on any topic:

1. `discover_data({query: "<topic>"})` - see what wikis, SL sources, and raw
   tables already exist. Prefer updating existing pages over creating new ones.

Before emitting any `schema.table` or `schema.table.column` into a wiki body,
SL source, `tables:` frontmatter, `sl_refs`, or `emit_unmapped_fallback`:

2. `entity_details({connectionName, targets: [{display: "<identifier>"}]})` -
   confirm the identifier resolves; inspect native types, FK/PK, and
   sampleValues.
3. For literal values from the source, such as status codes or plan tiers,
   check whether they appear in `entity_details` sampleValues for the relevant
   column. If sampleValues is short or the sample may have missed real values,
   run a `sql_execution` probe:
   `SELECT DISTINCT <col> FROM <ref> LIMIT 50`.
4. If the candidate identifier still does not resolve, do one of:
   - Use `sql_execution` with `SELECT 1 FROM <ref> LIMIT 0`. If it errors, the
     identifier is fictional.
   - Wrap the identifier in `[unverified - from <rawPath>]` in the wiki body,
     citing the exact raw path that mentioned it.
   - When recording `emit_unmapped_fallback` with `no_physical_table`, include
     the failing probe error in `clarification`.
5. Never copy `<schema>.<table>` placeholder strings from these instructions
   into output.
  • Step 2: Inline the protocol into writer skills

Add the same protocol block to these skills:

packages/context/skills/notion_synthesize/SKILL.md
packages/context/skills/dbt_ingest/SKILL.md
packages/context/skills/lookml_ingest/SKILL.md
packages/context/skills/looker_ingest/SKILL.md
packages/context/skills/metabase_ingest/SKILL.md
packages/context/skills/metricflow_ingest/SKILL.md
packages/context/skills/live_database_ingest/SKILL.md
packages/context/skills/historic_sql_patterns/SKILL.md
packages/context/skills/knowledge_capture/SKILL.md
packages/context/skills/sl_capture/SKILL.md

For packages/context/skills/historic_sql_table_digest/SKILL.md, add this shorter block:

## Identifier Verification Protocol

Only mention columns visible in the table's scan record. Use
`entity_details({connectionName, targets: [{display: "<identifier>"}]})` if
the table or column attribution is uncertain. Do not infer join columns or
filters from neighboring SQL unless the scan record confirms the column exists
on the named table.

For packages/context/skills/sl/SKILL.md, add this cross-reference:

For capture-time identifier verification, load `sl_capture`. Synthesis writer
skills must verify warehouse identifiers with `discover_data`,
`entity_details`, and `sql_execution` before emitting table or column names.
  • Step 3: Apply per-skill edits

Make these exact content changes:

  • In notion_synthesize, add discover_data, entity_details, and sql_execution to the Allowed: line. Replace tableRef: "orbit_analytics.customer" with tableRef: "<schema>.<table>".

  • In dbt_ingest, replace wiki_sl_search with discover_data and sl_describe_table with entity_details.

  • In lookml_ingest, add: Verify each sql_table_name from the LookML view with entity_details before mapping to an SL source.

  • In looker_ingest, add: For every Looker field reference, call entity_details on the underlying schema.table.column before promoting it to sl_refs or quoting it in wiki body.

  • In metabase_ingest, add: Before writing a wiki page derived from a Metabase question SQL, verify each schema.table.column mentioned with entity_details.

  • In metricflow_ingest, add: Verify each MetricFlow model source table with entity_details before producing the corresponding sl_write_source.

  • In live_database_ingest, add: Sample values come from the scan record; do not invent values not present in relationship-profile.json.

  • In historic_sql_patterns, add: Every join column mentioned in pattern descriptions must be verified via entity_details for both sides of the join.

  • In knowledge_capture, update the workflow to call discover_data first when a page relates to data or SL concepts.

  • In sl_capture, add: Before sl_write_source, call entity_details on the target table to confirm column names and types match the YAML being written.

  • Step 4: Remove stale code and prompt strings

Modify packages/context/src/ingest/tools/emit-unmapped-fallback.tool.ts:

.describe('The fully-qualified table or source reference that triggered the fallback (e.g. "<schema>.<table>"). Used to generate canonical detail text.'),

Modify packages/context/src/sl/tools/sl-warehouse-validation.ts:

          `that inherits the manifest schema. Call sl_read_source to inspect the existing source first.`,
  • Step 5: Commit

Run:

git add packages/context/skills packages/context/src/ingest/tools/emit-unmapped-fallback.tool.ts packages/context/src/sl/tools/sl-warehouse-validation.ts
git commit -m "docs(context): add ingest identifier verification protocol"

Task 8: Add prompt-bundling and banned-string tests

Files:

  • Modify: packages/context/src/memory/memory-runtime-assets.test.ts

  • Modify: packages/context/src/ingest/ingest-runtime-assets.test.ts

  • Step 1: Add failing asset tests

Add to packages/context/src/memory/memory-runtime-assets.test.ts:

const verificationWriterSkills = [
  'notion_synthesize',
  'dbt_ingest',
  'lookml_ingest',
  'looker_ingest',
  'metabase_ingest',
  'metricflow_ingest',
  'live_database_ingest',
  'historic_sql_table_digest',
  'historic_sql_patterns',
  'knowledge_capture',
  'sl_capture',
] as const;

it('ships identifier verification protocol in every synthesis writer skill', async () => {
  for (const skillName of verificationWriterSkills) {
    const body = await readFile(join(skillsDir, skillName, 'SKILL.md'), 'utf-8');
    expect(body).toContain('## Identifier Verification Protocol');
    expect(body).toMatch(/discover_data|entity_details/);
  }
});

it('does not ship stale warehouse verification tool names or fictional identifiers', async () => {
  for (const skillName of verificationWriterSkills) {
    const body = await readFile(join(skillsDir, skillName, 'SKILL.md'), 'utf-8');
    expect(body).not.toContain('orbit_analytics.customer');
    expect(body).not.toContain('wiki_sl_search');
    expect(body).not.toContain('sl_describe_table');
  }
});

Add to packages/context/src/ingest/ingest-runtime-assets.test.ts:

it('packages identifier verification prompt assets', async () => {
  const shared = await readFile(join(skillsDir, '_shared', 'identifier-verification.md'), 'utf-8');
  expect(shared).toContain('## Identifier Verification Protocol');
  expect(shared).toContain('discover_data');
  expect(shared).toContain('entity_details');
  expect(shared).toContain('sql_execution');
});
  • Step 2: Run the asset tests

Run:

pnpm --filter @ktx/context exec vitest run src/memory/memory-runtime-assets.test.ts src/ingest/ingest-runtime-assets.test.ts

Expected: PASS after Task 7.

  • Step 3: Commit

Run:

git add packages/context/src/memory/memory-runtime-assets.test.ts packages/context/src/ingest/ingest-runtime-assets.test.ts
git commit -m "test(context): guard ingest identifier verification prompts"

Task 9: Run the full v1 verification set

Files:

  • Verify all files changed by Tasks 1-8.

  • Step 1: Run focused tests

Run:

pnpm --filter @ktx/context exec vitest run \
  src/connections/dialects.test.ts \
  src/ingest/tools/warehouse-verification/warehouse-catalog.service.test.ts \
  src/ingest/tools/warehouse-verification/entity-details.tool.test.ts \
  src/ingest/tools/warehouse-verification/sql-execution.tool.test.ts \
  src/ingest/tools/warehouse-verification/discover-data.tool.test.ts \
  src/ingest/ingest-bundle.runner.test.ts \
  src/memory/memory-runtime-assets.test.ts \
  src/ingest/ingest-runtime-assets.test.ts

Expected: PASS.

  • Step 2: Run package type-check

Run:

pnpm --filter @ktx/context run type-check

Expected: PASS.

  • Step 3: Run package tests

Run:

pnpm --filter @ktx/context run test

Expected: PASS.

  • Step 4: Run pre-commit on changed files when configured

Run:

uv run pre-commit run --files \
  packages/context/src/connections/dialects.ts \
  packages/context/src/connections/dialects.test.ts \
  packages/context/src/connections/index.ts \
  packages/context/src/tools/tool-session.ts \
  packages/context/src/ingest/ingest-bundle.runner.ts \
  packages/context/src/ingest/local-bundle-runtime.ts \
  packages/context/src/ingest/ingest-bundle.runner.test.ts \
  packages/context/src/ingest/tools/emit-unmapped-fallback.tool.ts \
  packages/context/src/sl/tools/sl-warehouse-validation.ts \
  packages/context/src/memory/memory-runtime-assets.test.ts \
  packages/context/src/ingest/ingest-runtime-assets.test.ts \
  packages/context/src/ingest/tools/warehouse-verification/warehouse-catalog.service.ts \
  packages/context/src/ingest/tools/warehouse-verification/warehouse-catalog.service.test.ts \
  packages/context/src/ingest/tools/warehouse-verification/entity-details.tool.ts \
  packages/context/src/ingest/tools/warehouse-verification/entity-details.tool.test.ts \
  packages/context/src/ingest/tools/warehouse-verification/sql-execution.tool.ts \
  packages/context/src/ingest/tools/warehouse-verification/sql-execution.tool.test.ts \
  packages/context/src/ingest/tools/warehouse-verification/discover-data.tool.ts \
  packages/context/src/ingest/tools/warehouse-verification/discover-data.tool.test.ts \
  packages/context/src/ingest/tools/warehouse-verification/index.ts \
  packages/context/skills/_shared/identifier-verification.md \
  packages/context/skills/notion_synthesize/SKILL.md \
  packages/context/skills/dbt_ingest/SKILL.md \
  packages/context/skills/lookml_ingest/SKILL.md \
  packages/context/skills/looker_ingest/SKILL.md \
  packages/context/skills/metabase_ingest/SKILL.md \
  packages/context/skills/metricflow_ingest/SKILL.md \
  packages/context/skills/live_database_ingest/SKILL.md \
  packages/context/skills/historic_sql_table_digest/SKILL.md \
  packages/context/skills/historic_sql_patterns/SKILL.md \
  packages/context/skills/knowledge_capture/SKILL.md \
  packages/context/skills/sl_capture/SKILL.md \
  packages/context/skills/sl/SKILL.md

Expected: PASS. If the repo has no pre-commit config or the local uv version cannot satisfy the project pin, record the exact error and rely on the focused tests plus type-check.

  • Step 5: Commit final verification notes if any files changed during checks

Run:

git status --short

Expected: only intentional files are modified. Commit any formatter-driven edits with:

git add packages/context
git commit -m "chore(context): verify warehouse verification tools"

Self-review checklist

  • Spec coverage: the plan covers dialect dispatch, raw scan catalog reads, entity_details, sql_execution, discover_data, WU and reconcile availability, prompt updates, cleanups, and tests.
  • Placeholder scan: no task relies on unnamed future work.
  • Type consistency: tool inputs use connectionName; existing sl_discover calls receive connectionId internally; raw SQL execution uses SlConnectionCatalogPort.executeQuery() because SemanticLayerService.executeQuery() currently accepts semantic-layer query input, not raw SQL.