Improve schema setup and Notion ingest UX

This commit is contained in:
Luca Martial 2026-05-11 12:34:33 -07:00
parent 155613c794
commit 72a4ace13c
21 changed files with 540 additions and 118 deletions

View file

@ -327,8 +327,19 @@ describe('createRepainter', () => {
repainter.paint('hello');
repainter.paint('bye');
expect(io.stdout()).toContain('\rbye');
expect(io.stdout()).not.toContain('\u001b[1A\rbye');
expect(io.stdout()).toContain('bye');
expect(io.stdout()).not.toMatch(/\[\d+A/);
});
it('does not undershoot cursor-up when a line is exactly the terminal width', () => {
const io = makeIo({ isTTY: true, columns: 10 });
const repainter = createRepainter(io.io);
repainter.paint('0123456789\nsecond\n');
repainter.paint('0123456789\nsecond\n');
const cursorMoves = [...io.stdout().matchAll(/\[(\d+)A/g)].map((m) => Number(m[1]));
expect(cursorMoves).toEqual([2]);
});
});

View file

@ -378,7 +378,8 @@ export function createRepainter(io: KtxCliIo) {
}
io.stdout.write('\r');
}
io.stdout.write(content.replaceAll('\n', `${ESC}[K\n`));
io.stdout.write(`${ESC}[2K`);
io.stdout.write(content.replaceAll('\n', `\n${ESC}[2K`));
io.stdout.write(`${ESC}[J`);
hasPainted = true;
lastCursorUpRows = cursorUpRowsAfterWrite(content);

View file

@ -531,7 +531,7 @@ describe('setup databases step', () => {
message: 'Primary sources already configured: warehouse\nWhat would you like to do?',
options: [
{ value: 'add', label: 'Add another primary source' },
{ value: 'continue', label: 'Continue setup' },
{ value: 'continue', label: 'Continue to knowledge sources' },
{ value: 'back', label: 'Back' },
],
});
@ -582,7 +582,7 @@ describe('setup databases step', () => {
message: 'Primary sources already configured: warehouse\nWhat would you like to do?',
options: [
{ value: 'add', label: 'Add another primary source' },
{ value: 'continue', label: 'Continue setup' },
{ value: 'continue', label: 'Continue to knowledge sources' },
{ value: 'back', label: 'Back' },
],
});
@ -617,7 +617,7 @@ describe('setup databases step', () => {
message: 'Primary sources already configured: postgres-warehouse\nWhat would you like to do?',
options: [
{ value: 'add', label: 'Add another primary source' },
{ value: 'continue', label: 'Continue setup' },
{ value: 'continue', label: 'Continue to knowledge sources' },
{ value: 'back', label: 'Back' },
],
});
@ -652,7 +652,7 @@ describe('setup databases step', () => {
message: 'Primary sources already configured: postgres-warehouse\nWhat would you like to do?',
options: [
{ value: 'add', label: 'Add another primary source' },
{ value: 'continue', label: 'Continue setup' },
{ value: 'continue', label: 'Continue to knowledge sources' },
{ value: 'back', label: 'Back' },
],
});
@ -695,7 +695,7 @@ describe('setup databases step', () => {
message: 'Primary sources already configured: warehouse\nWhat would you like to do?',
options: [
{ value: 'add', label: 'Add another primary source' },
{ value: 'continue', label: 'Continue setup' },
{ value: 'continue', label: 'Continue to knowledge sources' },
{ value: 'back', label: 'Back' },
],
});
@ -918,6 +918,10 @@ describe('setup databases step', () => {
'│ ✓ Connection test passed',
'│ Driver: PostgreSQL · Tables: 2',
'│',
].join('\n'),
);
expect(io.stdout()).toContain(
[
'◇ Scanning postgres-warehouse',
'│ ✓ Structural scan completed',
'│ Changes: 2 new tables',
@ -1007,7 +1011,7 @@ describe('setup databases step', () => {
expect(config.connections['postgres-warehouse']).toMatchObject({
schemas: ['orbit_analytics', 'orbit_raw'],
});
expect(io.stdout()).toContain('Schemas: orbit_analytics, orbit_raw');
expect(io.stdout()).toContain(' orbit_analytics, orbit_raw');
});
it('auto-selects all discovered Postgres schemas in non-interactive setup', async () => {
@ -1043,7 +1047,7 @@ describe('setup databases step', () => {
expect(config.connections.warehouse).toMatchObject({
schemas: ['orbit_analytics', 'orbit_raw', 'public'],
});
expect(io.stdout()).toContain('Schemas: orbit_analytics, orbit_raw, public');
expect(io.stdout()).toContain(' orbit_analytics, orbit_raw, public');
});
it('adds one non-interactive Postgres URL connection, tests it, scans it, and marks databases complete', async () => {

View file

@ -112,6 +112,56 @@ const DEFAULT_CONNECTION_IDS: Record<KtxSetupDatabaseDriver, string> = {
snowflake: 'snowflake-warehouse',
};
interface ScopeDiscoverySpec {
noun: string;
nounPlural: string;
promptLabel: string;
configArrayField: string;
configSingleField: string;
defaultSelection: (values: string[]) => string[];
}
const SCOPE_DISCOVERY_SPECS: Partial<Record<KtxSetupDatabaseDriver, ScopeDiscoverySpec>> = {
postgres: {
noun: 'schema',
nounPlural: 'schemas',
promptLabel: 'PostgreSQL schemas',
configArrayField: 'schemas',
configSingleField: 'schema',
defaultSelection(schemas) {
const nonPublic = schemas.filter((s) => s !== 'public');
return nonPublic.length > 0 ? nonPublic : schemas;
},
},
sqlserver: {
noun: 'schema',
nounPlural: 'schemas',
promptLabel: 'SQL Server schemas',
configArrayField: 'schemas',
configSingleField: 'schema',
defaultSelection: (schemas) => schemas,
},
bigquery: {
noun: 'dataset',
nounPlural: 'datasets',
promptLabel: 'BigQuery datasets',
configArrayField: 'dataset_ids',
configSingleField: 'dataset_id',
defaultSelection: (datasets) => datasets,
},
snowflake: {
noun: 'schema',
nounPlural: 'schemas',
promptLabel: 'Snowflake schemas',
configArrayField: 'schema_names',
configSingleField: 'schema_name',
defaultSelection(schemas) {
const nonPublic = schemas.filter((s) => s !== 'PUBLIC');
return nonPublic.length > 0 ? nonPublic : schemas;
},
},
};
type UrlDriverType = Extract<KtxSetupDatabaseDriver, 'postgres' | 'mysql' | 'clickhouse' | 'sqlserver'>;
const DRIVER_CONNECTION_DEFAULTS: Record<UrlDriverType, { port: string }> = {
@ -260,16 +310,53 @@ async function defaultHistoricSqlProbe(input: KtxSetupHistoricSqlProbeInput): Pr
async function defaultListSchemas(projectDir: string, connectionId: string): Promise<string[]> {
const project = await loadKtxProject({ projectDir });
const connection = project.config.connections[connectionId];
const { KtxPostgresScanConnector, isKtxPostgresConnectionConfig } = await import('@ktx/connector-postgres');
if (!isKtxPostgresConnectionConfig(connection)) {
return [];
const driver = normalizeDriver(connection?.driver);
if (driver === 'postgres') {
const { KtxPostgresScanConnector, isKtxPostgresConnectionConfig } = await import('@ktx/connector-postgres');
if (!isKtxPostgresConnectionConfig(connection)) return [];
const connector = new KtxPostgresScanConnector({ connectionId, connection });
try {
return await connector.listSchemas();
} finally {
await connector.cleanup();
}
}
const connector = new KtxPostgresScanConnector({ connectionId, connection });
try {
return await connector.listSchemas();
} finally {
await connector.cleanup();
if (driver === 'sqlserver') {
const { KtxSqlServerScanConnector, isKtxSqlServerConnectionConfig } = await import('@ktx/connector-sqlserver');
if (!isKtxSqlServerConnectionConfig(connection)) return [];
const connector = new KtxSqlServerScanConnector({ connectionId, connection });
try {
return await connector.listSchemas();
} finally {
await connector.cleanup();
}
}
if (driver === 'bigquery') {
const { KtxBigQueryScanConnector, isKtxBigQueryConnectionConfig } = await import('@ktx/connector-bigquery');
if (!isKtxBigQueryConnectionConfig(connection)) return [];
const connector = new KtxBigQueryScanConnector({ connectionId, connection });
try {
return await connector.listDatasets();
} finally {
await connector.cleanup();
}
}
if (driver === 'snowflake') {
const { KtxSnowflakeScanConnector, isKtxSnowflakeConnectionConfig } = await import('@ktx/connector-snowflake');
if (!isKtxSnowflakeConnectionConfig(connection)) return [];
const connector = new KtxSnowflakeScanConnector({ connectionId, connection });
try {
return await connector.listSchemas();
} finally {
await connector.cleanup();
}
}
return [];
}
function existingConnectionIdsByDriver(
@ -307,7 +394,7 @@ function configuredPrimarySourcesPrompt(connectionIds: string[]): {
message: `Primary sources already configured: ${connectionIds.join(', ')}\nWhat would you like to do?`,
options: [
{ value: 'add', label: 'Add another primary source' },
{ value: 'continue', label: 'Continue setup' },
{ value: 'continue', label: 'Continue to knowledge sources' },
{ value: 'back', label: 'Back' },
],
};
@ -831,41 +918,44 @@ async function writeConnectionConfig(input: {
}
}
function configuredSchemas(connection: KtxProjectConnectionConfig | undefined): string[] {
function configuredScopeValues(
connection: KtxProjectConnectionConfig | undefined,
spec: ScopeDiscoverySpec,
): string[] {
if (!connection) return [];
if (Array.isArray(connection.schemas)) {
return connection.schemas
.filter((schema): schema is string => typeof schema === 'string' && schema.trim().length > 0)
.map((schema) => schema.trim());
const arrayVal = connection[spec.configArrayField];
if (Array.isArray(arrayVal)) {
return arrayVal
.filter((v): v is string => typeof v === 'string' && v.trim().length > 0)
.map((v) => v.trim());
}
return typeof connection.schema === 'string' && connection.schema.trim().length > 0 ? [connection.schema.trim()] : [];
const singleVal = connection[spec.configSingleField];
return typeof singleVal === 'string' && singleVal.trim().length > 0 ? [singleVal.trim()] : [];
}
function defaultSchemaSelection(schemas: string[]): string[] {
const nonPublic = schemas.filter((schema) => schema !== 'public');
return nonPublic.length > 0 ? nonPublic : schemas;
}
async function writeConnectionSchemas(input: {
async function writeScopeConfig(input: {
projectDir: string;
connectionId: string;
schemas: string[];
values: string[];
spec: ScopeDiscoverySpec;
}): Promise<void> {
const project = await loadKtxProject({ projectDir: input.projectDir });
const connection = project.config.connections[input.connectionId];
if (!connection) return;
const { schema: _schema, ...connectionWithoutLegacySchema } = connection;
const cleaned = Object.fromEntries(
Object.entries(connection).filter(([key]) => key !== input.spec.configSingleField),
) as KtxProjectConnectionConfig;
await writeConnectionConfig({
projectDir: input.projectDir,
connectionId: input.connectionId,
connection: {
...connectionWithoutLegacySchema,
schemas: unique(input.schemas),
...cleaned,
[input.spec.configArrayField]: unique(input.values),
},
});
}
async function maybeConfigurePostgresSchemas(input: {
async function maybeConfigureSchemaScope(input: {
projectDir: string;
connectionId: string;
args: KtxSetupDatabasesArgs;
@ -875,65 +965,77 @@ async function maybeConfigurePostgresSchemas(input: {
}): Promise<boolean> {
const project = await loadKtxProject({ projectDir: input.projectDir });
const connection = project.config.connections[input.connectionId];
if (normalizeDriver(connection?.driver) !== 'postgres') {
return true;
}
const driver = normalizeDriver(connection?.driver);
if (!driver) return true;
if (configuredSchemas(connection).length > 0) {
const spec = SCOPE_DISCOVERY_SPECS[driver];
if (!spec) return true;
const arrayVal = connection?.[spec.configArrayField];
if (Array.isArray(arrayVal) && arrayVal.length > 0) {
return true;
}
if (input.args.databaseSchemas.length > 0) {
await writeConnectionSchemas({
await writeScopeConfig({
projectDir: input.projectDir,
connectionId: input.connectionId,
schemas: input.args.databaseSchemas,
values: input.args.databaseSchemas,
spec,
});
return true;
}
let discoveredSchemas: string[];
writeSetupSection(input.io, `Discovering ${spec.promptLabel.toLowerCase()}`, [
`Connecting to ${input.connectionId}`,
]);
let discovered: string[];
try {
discoveredSchemas = unique(
discovered = unique(
await (input.deps.listSchemas ?? defaultListSchemas)(input.projectDir, input.connectionId),
);
} catch (error) {
input.io.stderr.write(
`Could not discover PostgreSQL schemas for ${input.connectionId}; continuing with existing schema scope. ` +
`Could not discover ${spec.promptLabel.toLowerCase()} for ${input.connectionId}; continuing with existing ${spec.noun} scope. ` +
`Pass --database-schema to set it explicitly. ${error instanceof Error ? error.message : String(error)}\n`,
);
return true;
}
if (discoveredSchemas.length === 0) {
if (discovered.length === 0) {
return true;
}
let selectedSchemas: string[];
if (input.args.inputMode === 'disabled' || discoveredSchemas.length === 1) {
selectedSchemas = discoveredSchemas;
let selected: string[];
if (input.args.inputMode === 'disabled' || discovered.length === 1) {
selected = discovered;
} else {
const initialValues = defaultSchemaSelection(discoveredSchemas);
const preconfigured = configuredScopeValues(connection, spec).filter((v) => discovered.includes(v));
const initialValues = preconfigured.length > 0 ? preconfigured : spec.defaultSelection(discovered);
const choices = await input.prompts.multiselect({
message: withMultiselectNavigation(
'PostgreSQL schemas to scan\nKTX found multiple non-system schemas. Select every schema agents should use.',
`${spec.promptLabel} to scan\n` +
`KTX found multiple ${spec.nounPlural}. Select every ${spec.noun} agents should use.`,
),
options: discoveredSchemas.map((schema) => ({ value: schema, label: schema })),
options: discovered.map((v) => ({ value: v, label: v })),
initialValues,
required: true,
});
if (choices.includes('back')) {
return false;
}
selectedSchemas = choices.length > 0 ? choices : initialValues;
selected = choices.length > 0 ? choices : initialValues;
}
await writeConnectionSchemas({
await writeScopeConfig({
projectDir: input.projectDir,
connectionId: input.connectionId,
schemas: selectedSchemas,
values: selected,
spec,
});
writeSetupSection(input.io, `Selecting schemas for ${input.connectionId}`, [
`Schemas: ${selectedSchemas.join(', ')}`,
const capitalNounPlural = spec.nounPlural[0]!.toUpperCase() + spec.nounPlural.slice(1);
writeSetupSection(input.io, `${capitalNounPlural} saved for ${input.connectionId}`, [
`${selected.join(', ')}`,
]);
return true;
}
@ -1049,7 +1151,7 @@ async function validateAndScanConnection(input: {
testLines.push(`Driver: ${driverDisplay}${Number.isFinite(tableCount) ? ` · Tables: ${tableCount}` : ''}`);
writeSetupSection(input.io, `Testing ${input.connectionId}`, testLines);
if (!(await maybeConfigurePostgresSchemas(input))) {
if (!(await maybeConfigureSchemaScope(input))) {
return false;
}

View file

@ -211,6 +211,37 @@ describe('setup sources step', () => {
expect(runMapping).toHaveBeenCalledWith(projectDir, 'prod_metabase', io.io);
});
it('writes Notion config with the full default knowledge create budget', async () => {
await addPrimarySource();
const validateNotion = vi.fn(async () => ({ ok: true as const, detail: 'roots=1' }));
await expect(
runKtxSetupSourcesStep(
{
projectDir,
inputMode: 'disabled',
source: 'notion',
sourceConnectionId: 'notion-main',
sourceApiKeyRef: 'env:NOTION_TOKEN',
notionCrawlMode: 'selected_roots',
notionRootPageIds: ['page-1'],
runInitialSourceIngest: false,
skipSources: false,
},
makeIo().io,
{ validateNotion },
),
).resolves.toEqual({ status: 'ready', projectDir, connectionIds: ['notion-main'] });
expect((await readConfig()).connections['notion-main']).toMatchObject({
driver: 'notion',
auth_token_ref: 'env:NOTION_TOKEN',
root_page_ids: ['page-1'],
max_knowledge_creates_per_run: 25,
max_knowledge_updates_per_run: 20,
});
});
it('defaults interactive Metabase and Looker source setup to the only warehouse connection', async () => {
await addPrimarySource();
const cases: Array<{

View file

@ -36,6 +36,8 @@ import { writeProjectLocalSecretReference } from './setup-secrets.js';
export type KtxSetupSourceType = 'dbt' | 'metricflow' | 'metabase' | 'looker' | 'lookml' | 'notion';
const DEFAULT_NOTION_MAX_KNOWLEDGE_CREATES_PER_RUN = 25;
export interface KtxSetupSourcesArgs {
projectDir: string;
inputMode: 'auto' | 'disabled';
@ -521,7 +523,7 @@ function buildNotionConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionC
root_database_ids: [],
root_data_source_ids: [],
max_pages_per_run: 1000,
max_knowledge_creates_per_run: 5,
max_knowledge_creates_per_run: DEFAULT_NOTION_MAX_KNOWLEDGE_CREATES_PER_RUN,
max_knowledge_updates_per_run: 20,
last_successful_cursor: null,
};

View file

@ -19,7 +19,7 @@ Each WorkUnit is either a single Notion page/span or a topical cluster of relate
3. Search `wiki_search` for existing pages that overlap the WorkUnit topics. Prefer updating an existing page over creating a duplicate.
4. Use `context_evidence_search`, `context_evidence_read`, and `context_evidence_neighbors` to pull supporting chunks when indexed evidence is relevant. Pass `chunkId` and `documentId` values verbatim as returned by the evidence tools.
5. Write durable business knowledge with `wiki_write`. Aim for a small number of high-quality pages per WorkUnit or cluster.
6. When the Notion content defines a reusable dataset, metric, segment, join rule, source-of-truth mapping, or table with explicit columns, load `sl_capture`, discover existing sources first with `sl_discover` or `sl_read_source`, then use `sl_write_source` or `sl_edit_source`.
6. When the Notion content defines a reusable dataset, metric, segment, join rule, source-of-truth mapping, or table with explicit columns, load `sl_capture`, discover existing sources first with `sl_discover` or `sl_read_source`, then use `sl_write_source` or `sl_edit_source` only for a confirmed mapped non-Notion target source. If no mapped target exists, call `emit_unmapped_fallback` and keep the content wiki-only.
7. For every deleted raw path in the Eviction Set, call `eviction_list`, decide retention, then `context_eviction_decision_write`. Do this even when no wiki write is needed.
## What To Capture
@ -61,9 +61,10 @@ If a clustered WorkUnit includes several related pages, synthesize the shared ru
- Discover existing sources first with `sl_discover`; read existing source YAML before editing.
- Prefer overlays on manifest-backed sources over standalone SQL.
- If Notion describes a dashboard or metric but does not define executable logic, write a wiki page and attach `sl_refs` only after confirming the referenced source exists.
- Do not create SL sources under the Notion connection just because a page mentions a warehouse, dbt, Looker, or Metabase object. Use the mapped warehouse/source connection after discovery, or emit an unmapped fallback and write wiki-only.
## Tools
Allowed: `read_raw_file`, `read_raw_span`, `wiki_search`, `wiki_read`, `wiki_write`, `sl_discover`, `sl_read_source`, `sl_write_source`, `sl_edit_source`, `sl_validate`, `context_evidence_search`, `context_evidence_read`, `context_evidence_neighbors`, `eviction_list`, `context_eviction_decision_write`.
Allowed: `read_raw_file`, `read_raw_span`, `wiki_search`, `wiki_read`, `wiki_write`, `sl_discover`, `sl_read_source`, `sl_write_source`, `sl_edit_source`, `sl_validate`, `context_evidence_search`, `context_evidence_read`, `context_evidence_neighbors`, `emit_unmapped_fallback`, `eviction_list`, `context_eviction_decision_write`.
Not allowed: `context_candidate_write`, `context_candidate_mark`.

View file

@ -36,7 +36,7 @@ describe('standalone Notion connection config', () => {
root_database_ids: [],
root_data_source_ids: [],
max_pages_per_run: 1000,
max_knowledge_creates_per_run: 5,
max_knowledge_creates_per_run: 25,
max_knowledge_updates_per_run: 20,
last_successful_cursor: null,
});
@ -60,7 +60,7 @@ describe('standalone Notion connection config', () => {
rootDatabaseIds: [],
rootDataSourceIds: [],
maxPagesPerRun: 80,
maxKnowledgeCreatesPerRun: 5,
maxKnowledgeCreatesPerRun: 25,
maxKnowledgeUpdatesPerRun: 20,
warning: 'Anything accessible to this Notion integration can become organization knowledge.',
});

View file

@ -1,7 +1,11 @@
import { readFile } from 'node:fs/promises';
import { homedir } from 'node:os';
import { resolve } from 'node:path';
import { type NotionPullConfig, notionPullConfigSchema } from '../ingest/adapters/notion/types.js';
import {
NOTION_DEFAULT_MAX_KNOWLEDGE_CREATES_PER_RUN,
type NotionPullConfig,
notionPullConfigSchema,
} from '../ingest/adapters/notion/types.js';
import type { KtxProjectConnectionConfig } from '../project/config.js';
export const KTX_NOTION_ORG_KNOWLEDGE_WARNING =
@ -119,7 +123,7 @@ export function parseNotionConnectionConfig(raw: unknown): KtxNotionConnectionCo
max_pages_per_run: boundedInteger(input.max_pages_per_run, 1000, 'max_pages_per_run', 1, 10_000),
max_knowledge_creates_per_run: boundedInteger(
input.max_knowledge_creates_per_run,
5,
NOTION_DEFAULT_MAX_KNOWLEDGE_CREATES_PER_RUN,
'max_knowledge_creates_per_run',
0,
25,

View file

@ -7,6 +7,8 @@ import { notionManifestSchema, notionMetadataSchema } from './types.js';
const MAX_NOTION_WORK_UNIT_CHARS = 40_000;
export const NOTION_ORG_KNOWLEDGE_WARNING =
'Anything accessible to this Notion integration can become organization knowledge.';
const NOTION_SL_WRITE_GUIDANCE =
'Write wiki entries with wiki_write. Only write or edit SL sources after sl_discover/sl_read_source confirms a mapped non-Notion target source; if no mapped target exists, emit_unmapped_fallback and keep the fact wiki-only. Do not create SL sources under the Notion connection just because a page mentions a warehouse table.';
async function walk(root: string): Promise<string[]> {
const entries = await readdir(root, { withFileTypes: true, recursive: true });
@ -92,7 +94,7 @@ export async function chunkNotionStagedDir(stagedDir: string, diffSet?: DiffSet)
rawFiles,
dependencyPaths,
peerFileIndex,
notes: `Synthesize durable wiki and SL knowledge from this Notion page span only. Use read_raw_span on ${pagePath} for lines ${range.startLine}-${range.endLine}; do not call read_raw_file for oversized pages. Cite evidence chunk/page IDs.`,
notes: `Synthesize durable wiki and SL knowledge from this Notion page span only. Use read_raw_span on ${pagePath} for lines ${range.startLine}-${range.endLine}; do not call read_raw_file for oversized pages. ${NOTION_SL_WRITE_GUIDANCE} Cite evidence chunk/page IDs.`,
});
}
continue;
@ -105,7 +107,7 @@ export async function chunkNotionStagedDir(stagedDir: string, diffSet?: DiffSet)
dependencyPaths,
peerFileIndex,
notes:
'Synthesize durable wiki and SL knowledge from this Notion page. Write wiki entries with wiki_write and SL sources with sl_write_source; cite evidence chunk/page IDs.',
`Synthesize durable wiki and SL knowledge from this Notion page. ${NOTION_SL_WRITE_GUIDANCE} Cite evidence chunk/page IDs.`,
});
}

View file

@ -79,6 +79,8 @@ describe('clusterNotionWorkUnits', () => {
expect(wu.unitKey).toMatch(/^notion-cluster-\d+$/);
expect(wu.rawFiles.length).toBeGreaterThan(0);
expect(wu.notes).toMatch(/Synthesize/);
expect(wu.notes).toContain('emit_unmapped_fallback');
expect(wu.notes).toContain('Do not create SL sources under the Notion connection');
}
});

View file

@ -8,6 +8,8 @@ import { notionMetadataSchema } from './types.js';
export const MIN_PAGES_TO_CLUSTER = 5;
const CLUSTER_TEXT_BODY_CHARS = 1024;
const CLUSTER_SEED = 42;
const NOTION_CLUSTER_SL_WRITE_GUIDANCE =
'Write wiki entries directly with wiki_write. Only write or edit SL sources after sl_discover/sl_read_source confirms a mapped non-Notion target source; if no mapped target exists, emit_unmapped_fallback and keep the fact wiki-only. Do not create SL sources under the Notion connection just because a page mentions a warehouse table.';
interface ClusterNotionWorkUnitsArgs {
workUnits: WorkUnit[];
@ -63,7 +65,7 @@ function mergeWorkUnits(bucket: WorkUnit[], clusterIndex: number): WorkUnit {
`Synthesize durable wiki and SL knowledge from these ${bucket.length} related Notion pages. ` +
'Read each page with read_raw_file (or read_raw_span for oversized pages). ' +
'Search nearby evidence with context_evidence_search/_read/_neighbors when needed. ' +
'Write wiki entries directly with wiki_write and SL sources directly with sl_write_source. ' +
`${NOTION_CLUSTER_SL_WRITE_GUIDANCE} ` +
'Do not call context_candidate_write.',
};
}

View file

@ -117,7 +117,7 @@ describe('NotionSourceAdapter', () => {
continuedFromCursor: false,
partialSnapshot: true,
maxPagesPerRun: 1,
maxKnowledgeCreatesPerRun: 5,
maxKnowledgeCreatesPerRun: 25,
maxKnowledgeUpdatesPerRun: 20,
skipped: [],
warnings: ['maxPagesPerRun reached at 1'],
@ -167,7 +167,7 @@ describe('NotionSourceAdapter', () => {
continuedFromCursor: true,
partialSnapshot: true,
maxPagesPerRun: 100,
maxKnowledgeCreatesPerRun: 5,
maxKnowledgeCreatesPerRun: 25,
maxKnowledgeUpdatesPerRun: 20,
nextSuccessfulCursor: null,
skipped: [],
@ -218,7 +218,7 @@ describe('NotionSourceAdapter', () => {
continuedFromCursor: false,
partialSnapshot: false,
maxPagesPerRun: 100,
maxKnowledgeCreatesPerRun: 5,
maxKnowledgeCreatesPerRun: 25,
maxKnowledgeUpdatesPerRun: 20,
skipped: [],
warnings: [],
@ -241,8 +241,10 @@ describe('NotionSourceAdapter', () => {
dependencyPaths: ['manifest.json', 'pages/page-1/blocks.json'],
});
expect(result.workUnits[0].notes).toContain('Synthesize durable wiki and SL knowledge');
expect(result.workUnits[0].notes).toContain('emit_unmapped_fallback');
expect(result.workUnits[0].notes).toContain('Do not create SL sources under the Notion connection');
expect(result.reconcileNotes).toEqual([
'Notion maxKnowledgeCreatesPerRun=5',
'Notion maxKnowledgeCreatesPerRun=25',
'Notion maxKnowledgeUpdatesPerRun=20',
]);
expect(result.contextReport).toEqual({ capped: false, warnings: [NOTION_ORG_KNOWLEDGE_WARNING] });

View file

@ -2,6 +2,7 @@ import { z } from 'zod';
export const NOTION_API_VERSION = '2026-03-11';
export const NOTION_SOURCE_KEY = 'notion';
export const NOTION_DEFAULT_MAX_KNOWLEDGE_CREATES_PER_RUN = 25;
export const notionPullConfigSchema = z.object({
authToken: z.string().min(1),
@ -10,7 +11,7 @@ export const notionPullConfigSchema = z.object({
rootDatabaseIds: z.array(z.string().min(1)).default([]),
rootDataSourceIds: z.array(z.string().min(1)).default([]),
maxPagesPerRun: z.number().int().min(1).max(10_000).default(1000),
maxKnowledgeCreatesPerRun: z.number().int().min(0).max(25).default(5),
maxKnowledgeCreatesPerRun: z.number().int().min(0).max(25).default(NOTION_DEFAULT_MAX_KNOWLEDGE_CREATES_PER_RUN),
maxKnowledgeUpdatesPerRun: z.number().int().min(0).max(100).default(20),
lastSuccessfulCursor: z.string().nullable().default(null),
});

View file

@ -315,6 +315,7 @@ export type {
MetricflowPullConfig,
} from './adapters/metricflow/pull-config.js';
export { NOTION_ORG_KNOWLEDGE_WARNING } from './adapters/notion/chunk.js';
export { NOTION_DEFAULT_MAX_KNOWLEDGE_CREATES_PER_RUN } from './adapters/notion/types.js';
export { NotionSourceAdapter, type NotionSourceAdapterDeps } from './adapters/notion/notion.adapter.js';
export { NotionClient, type NotionApi, type NotionBotInfo } from './adapters/notion/notion-client.js';
export { chunkHistoricSqlStagedDir, describeHistoricSqlScope } from './adapters/historic-sql/chunk.js';

View file

@ -8,6 +8,7 @@ import type { CaptureSession, MemoryAction } from '../memory/index.js';
import type { SlValidationDeps } from '../sl/index.js';
import { createTouchedSlSources, type ToolContext, type ToolSession } from '../tools/index.js';
import { actionTargetConnectionId } from './action-identity.js';
import { NOTION_DEFAULT_MAX_KNOWLEDGE_CREATES_PER_RUN } from './adapters/notion/types.js';
import { selectRelevantCanonicalPins } from './canonical-pins.js';
import { sanitizeMemoryFlowError } from './memory-flow/live-buffer.js';
import type { MemoryFlowPlannedWorkUnit } from './memory-flow/types.js';
@ -38,6 +39,11 @@ import { createReadRawSpanTool } from './tools/read-raw-span.tool.js';
import { createStageDiffTool } from './tools/stage-diff.tool.js';
import { createStageListTool } from './tools/stage-list.tool.js';
import { type ToolCallLogEntry, wrapToolsWithLogger } from './tools/tool-call-logger.js';
import {
createMutableToolTranscriptSummary,
recordToolTranscriptEntry,
type MutableToolTranscriptSummary,
} from './tools/tool-transcript-summary.js';
import type {
EvictionUnit,
IngestBundleJob,
@ -47,14 +53,6 @@ import type {
WorkUnit,
} from './types.js';
interface MutableToolTranscriptSummary {
unitKey: string;
path: string;
toolCallCount: number;
errorCount: number;
toolNames: Set<string>;
}
function workUnitToMemoryFlowPlannedWorkUnit(workUnit: WorkUnit): MemoryFlowPlannedWorkUnit {
return {
unitKey: workUnit.unitKey,
@ -79,21 +77,6 @@ function countMemoryFlowActions(actions: MemoryAction[], target: MemoryAction['t
return actions.filter((action) => action.target === target).length;
}
function isStructuredToolFailure(output: unknown): boolean {
if (!output || typeof output !== 'object') {
return false;
}
const structured = (output as { structured?: unknown }).structured;
return !!structured && typeof structured === 'object' && (structured as { success?: unknown }).success === false;
}
function isFailedToolCall(entry: ToolCallLogEntry): boolean {
if (entry.error) {
return true;
}
return (entry.toolName === 'sl_write_source' || entry.toolName === 'wiki_write') && isStructuredToolFailure(entry.output);
}
function reportIdFromCreateResult(result: unknown): string | undefined {
if (!result || typeof result !== 'object' || !('id' in result)) {
return undefined;
@ -296,7 +279,9 @@ export class IngestBundleRunner {
? (bundleRef.config as Record<string, unknown>)
: {};
const configuredCreates =
typeof rawConfig.maxKnowledgeCreatesPerRun === 'number' ? rawConfig.maxKnowledgeCreatesPerRun : 5;
typeof rawConfig.maxKnowledgeCreatesPerRun === 'number'
? rawConfig.maxKnowledgeCreatesPerRun
: NOTION_DEFAULT_MAX_KNOWLEDGE_CREATES_PER_RUN;
const configuredUpdates =
typeof rawConfig.maxKnowledgeUpdatesPerRun === 'number' ? rawConfig.maxKnowledgeUpdatesPerRun : 20;
const wikiActions = stageIndex.workUnits.flatMap((wu) => wu.actions).filter((action) => action.target === 'wiki');
@ -350,17 +335,8 @@ export class IngestBundleRunner {
(path: string) =>
(entry: ToolCallLogEntry): void => {
const current =
transcriptSummaries.get(entry.wuKey) ??
({
unitKey: entry.wuKey,
path,
toolCallCount: 0,
errorCount: 0,
toolNames: new Set<string>(),
} satisfies MutableToolTranscriptSummary);
current.toolCallCount += 1;
current.errorCount += isFailedToolCall(entry) ? 1 : 0;
current.toolNames.add(entry.toolName);
transcriptSummaries.get(entry.wuKey) ?? createMutableToolTranscriptSummary(entry.wuKey, path);
recordToolTranscriptEntry(current, entry);
transcriptSummaries.set(entry.wuKey, current);
};
const overrideReport = await this.loadOverrideReport(job);
@ -727,7 +703,7 @@ export class IngestBundleRunner {
sourceKey: job.sourceKey,
connectionId: job.connectionId,
jobId: job.jobId,
toolFailureCount: (unitKey) => transcriptSummaries.get(unitKey)?.errorCount ?? 0,
toolFailureCount: (unitKey) => transcriptSummaries.get(unitKey)?.fatalErrorCount ?? 0,
onStepFinish: ({ stepIndex, stepBudget }) => {
memoryFlow?.emit({ type: 'work_unit_step', unitKey: wu.unitKey, stepIndex, stepBudget });
},

View file

@ -611,7 +611,7 @@ describe('local ingest', () => {
continuedFromCursor: false,
partialSnapshot: false,
maxPagesPerRun: 1000,
maxKnowledgeCreatesPerRun: 5,
maxKnowledgeCreatesPerRun: 25,
maxKnowledgeUpdatesPerRun: 20,
nextSuccessfulCursor: null,
skipped: [],
@ -654,6 +654,7 @@ describe('local ingest', () => {
crawlMode: 'selected_roots',
rootPageIds: ['page-1'],
maxPagesPerRun: 1000,
maxKnowledgeCreatesPerRun: 25,
}),
expect.any(String),
{ connectionId: 'notion-main', sourceKey: 'notion' },

View file

@ -0,0 +1,99 @@
import { describe, expect, it } from 'vitest';
import type { ToolCallLogEntry } from './tool-call-logger.js';
import { createMutableToolTranscriptSummary, recordToolTranscriptEntry } from './tool-transcript-summary.js';
function entry(overrides: Partial<ToolCallLogEntry>): ToolCallLogEntry {
return {
ts: '2026-05-11T00:00:00.000Z',
wuKey: 'wu-1',
toolName: 'wiki_write',
durationMs: 1,
input: {},
...overrides,
};
}
describe('tool transcript summaries', () => {
it('keeps recovered wiki_write structured failures out of fatal failures', () => {
const summary = createMutableToolTranscriptSummary('wu-1', '/tmp/wu-1.jsonl');
recordToolTranscriptEntry(
summary,
entry({
input: { key: 'orbit-customers' },
output: { structured: { success: false, key: 'orbit-customers' } },
}),
);
recordToolTranscriptEntry(
summary,
entry({
input: { key: 'orbit-customers' },
output: { structured: { success: true, key: 'orbit-customers' } },
}),
);
expect(summary.errorCount).toBe(1);
expect(summary.fatalErrorCount).toBe(0);
});
it('keeps unrecovered structured write failures fatal', () => {
const summary = createMutableToolTranscriptSummary('wu-1', '/tmp/wu-1.jsonl');
recordToolTranscriptEntry(
summary,
entry({
input: { key: 'orbit-customers' },
output: { structured: { success: false, key: 'orbit-customers' } },
}),
);
expect(summary.errorCount).toBe(1);
expect(summary.fatalErrorCount).toBe(1);
});
it('treats a later sl_edit_source success as recovery for the same SL source', () => {
const summary = createMutableToolTranscriptSummary('wu-1', '/tmp/wu-1.jsonl');
recordToolTranscriptEntry(
summary,
entry({
toolName: 'sl_write_source',
input: { connectionId: 'warehouse', sourceName: 'orbit_customers' },
output: { structured: { success: false, sourceName: 'orbit_customers' } },
}),
);
recordToolTranscriptEntry(
summary,
entry({
toolName: 'sl_edit_source',
input: { connectionId: 'warehouse', sourceName: 'orbit_customers' },
output: { structured: { success: true, sourceName: 'orbit_customers' } },
}),
);
expect(summary.errorCount).toBe(1);
expect(summary.fatalErrorCount).toBe(0);
});
it('keeps thrown tool errors fatal even after a successful write', () => {
const summary = createMutableToolTranscriptSummary('wu-1', '/tmp/wu-1.jsonl');
recordToolTranscriptEntry(
summary,
entry({
input: { key: 'orbit-customers' },
error: { message: 'tool crashed' },
}),
);
recordToolTranscriptEntry(
summary,
entry({
input: { key: 'orbit-customers' },
output: { structured: { success: true, key: 'orbit-customers' } },
}),
);
expect(summary.errorCount).toBe(1);
expect(summary.fatalErrorCount).toBe(1);
});
});

View file

@ -0,0 +1,130 @@
import type { ToolCallLogEntry } from './tool-call-logger.js';
export interface MutableToolTranscriptSummary {
unitKey: string;
path: string;
toolCallCount: number;
errorCount: number;
fatalErrorCount: number;
toolNames: Set<string>;
hardErrorCount: number;
recoverableFailureCounts: Map<string, number>;
}
export function createMutableToolTranscriptSummary(unitKey: string, path: string): MutableToolTranscriptSummary {
return {
unitKey,
path,
toolCallCount: 0,
errorCount: 0,
fatalErrorCount: 0,
toolNames: new Set<string>(),
hardErrorCount: 0,
recoverableFailureCounts: new Map<string, number>(),
};
}
export function recordToolTranscriptEntry(summary: MutableToolTranscriptSummary, entry: ToolCallLogEntry): void {
summary.toolCallCount += 1;
summary.toolNames.add(entry.toolName);
if (entry.error) {
summary.errorCount += 1;
summary.hardErrorCount += 1;
refreshFatalErrorCount(summary);
return;
}
const recoverableFailureKey = recoverableStructuredFailureKey(entry);
if (recoverableFailureKey) {
summary.errorCount += 1;
summary.recoverableFailureCounts.set(
recoverableFailureKey,
(summary.recoverableFailureCounts.get(recoverableFailureKey) ?? 0) + 1,
);
refreshFatalErrorCount(summary);
return;
}
const recoveryKey = recoverableStructuredSuccessKey(entry);
if (recoveryKey) {
summary.recoverableFailureCounts.delete(recoveryKey);
}
refreshFatalErrorCount(summary);
}
function refreshFatalErrorCount(summary: MutableToolTranscriptSummary): void {
summary.fatalErrorCount =
summary.hardErrorCount + [...summary.recoverableFailureCounts.values()].reduce((sum, count) => sum + count, 0);
}
function recoverableStructuredFailureKey(entry: ToolCallLogEntry): string | null {
if (!isStructuredToolFailure(entry.output)) {
return null;
}
if (entry.toolName === 'wiki_write') {
return wikiTargetKey(entry);
}
if (entry.toolName === 'sl_write_source') {
return slTargetKey(entry);
}
return null;
}
function recoverableStructuredSuccessKey(entry: ToolCallLogEntry): string | null {
if (!isStructuredToolSuccess(entry.output)) {
return null;
}
if (entry.toolName === 'wiki_write') {
return wikiTargetKey(entry);
}
if (entry.toolName === 'sl_write_source' || entry.toolName === 'sl_edit_source') {
return slTargetKey(entry);
}
return null;
}
function isStructuredToolFailure(output: unknown): boolean {
return structuredSuccess(output) === false;
}
function isStructuredToolSuccess(output: unknown): boolean {
return structuredSuccess(output) === true;
}
function structuredSuccess(output: unknown): boolean | null {
const structured = recordField(output, 'structured');
const success = structured?.success;
return typeof success === 'boolean' ? success : null;
}
function wikiTargetKey(entry: ToolCallLogEntry): string | null {
const key = stringField(recordField(entry.output, 'structured'), 'key') ?? stringField(entry.input, 'key');
return key ? `wiki:${key}` : null;
}
function slTargetKey(entry: ToolCallLogEntry): string | null {
const structured = recordField(entry.output, 'structured');
const sourceName = stringField(structured, 'sourceName') ?? stringField(entry.input, 'sourceName');
if (!sourceName) {
return null;
}
const connectionId = stringField(entry.input, 'connectionId') ?? '';
return `sl:${connectionId}:${sourceName}`;
}
function recordField(value: unknown, field: string): Record<string, unknown> | null {
if (!value || typeof value !== 'object' || Array.isArray(value)) {
return null;
}
const nested = (value as Record<string, unknown>)[field];
return nested && typeof nested === 'object' && !Array.isArray(nested) ? (nested as Record<string, unknown>) : null;
}
function stringField(value: unknown, field: string): string | null {
if (!value || typeof value !== 'object' || Array.isArray(value)) {
return null;
}
const raw = (value as Record<string, unknown>)[field];
return typeof raw === 'string' && raw.length > 0 ? raw : null;
}

View file

@ -100,6 +100,50 @@ describe('WikiWriteTool', () => {
expect(result.markdown).toMatch(/content.*or.*replacements/i);
});
it('updates frontmatter only on an existing page while preserving content', async () => {
const { tool, wikiService } = makeTool({
wikiService: {
readPage: vi.fn().mockResolvedValue({
pageKey: 'orbit-customers',
frontmatter: {
summary: 'Customer source details',
usage_mode: 'auto',
sort_order: 0,
tags: ['notion'],
refs: ['notion:old'],
sl_refs: ['postgres-warehouse/orbit_analytics.customer'],
},
content: '# Orbit Customers\n\nSource: Notion - Orbit Customers Source.',
}),
},
});
const result = await tool.call(
{
key: 'orbit-customers',
summary: 'Customer source details mapped to the warehouse customer view',
sl_refs: ['postgres-warehouse/orbit_analytics.customer', 'dbt-main/customer'],
} as any,
baseContext,
);
expect(result.structured).toEqual({ success: true, key: 'orbit-customers', action: 'updated' });
expect(wikiService.writePage).toHaveBeenCalledWith(
'USER',
'u',
'orbit-customers',
expect.objectContaining({
summary: 'Customer source details mapped to the warehouse customer view',
tags: ['notion'],
refs: ['notion:old'],
sl_refs: ['postgres-warehouse/orbit_analytics.customer', 'dbt-main/customer'],
}),
'# Orbit Customers\n\nSource: Notion - Orbit Customers Source.',
expect.any(String),
expect.any(String),
);
});
it('writes historic-SQL frontmatter fields', async () => {
const { tool, wikiService } = makeTool();

View file

@ -77,6 +77,7 @@ export class WikiWriteTool extends BaseTool<typeof wikiWriteInputSchema> {
get description(): string {
return `<purpose>
Create or update a knowledge page. Provide content for create/rewrite, or replacements for targeted edits.
For existing pages, you may provide only frontmatter fields such as summary, tags, refs, or sl_refs to update metadata while preserving content.
tags/refs/sl_refs use REPLACE semantics: omit to keep existing on update, [] to clear, [values] to set.
</purpose>`;
}
@ -90,17 +91,20 @@ tags/refs/sl_refs use REPLACE semantics: omit to keep existing on update, [] to
const writesGlobal = !!context.session;
const skipIndex = context.session?.isWorktreeScoped === true;
if (!input.content && (!input.replacements || input.replacements.length === 0)) {
const scope: BlockScope = writesGlobal ? 'GLOBAL' : 'USER';
const scopeId = scope === 'USER' ? context.userId : null;
const existing = await wikiService.readPage(scope, scopeId, input.key);
const content = input.content;
const hasContent = typeof content === 'string' && content.length > 0;
const hasReplacements = !!input.replacements && input.replacements.length > 0;
if (!existing && !hasContent && !hasReplacements) {
return {
markdown: 'Error: provide either content (for create/rewrite) or replacements (for edits).',
structured: { success: false, key: input.key },
};
}
const scope: BlockScope = writesGlobal ? 'GLOBAL' : 'USER';
const scopeId = scope === 'USER' ? context.userId : null;
const existing = await wikiService.readPage(scope, scopeId, input.key);
if (!existing && !input.content) {
return {
markdown: `Page "${input.key}" does not exist. Provide content to create it.`,
@ -140,9 +144,9 @@ tags/refs/sl_refs use REPLACE semantics: omit to keep existing on update, [] to
fingerprints: input.fingerprints === undefined ? existingFm?.fingerprints : input.fingerprints,
};
if (input.content) {
finalContent = normalizeAccidentalEscapedMarkdownNewlines(input.content);
} else {
if (hasContent) {
finalContent = normalizeAccidentalEscapedMarkdownNewlines(content);
} else if (hasReplacements) {
const editResult = applySqlEdits(existing?.content ?? '', input.replacements ?? []);
if (!editResult.success) {
return {
@ -151,6 +155,8 @@ tags/refs/sl_refs use REPLACE semantics: omit to keep existing on update, [] to
};
}
finalContent = editResult.sql;
} else {
finalContent = existing?.content ?? '';
}
await wikiService.writePage(scope, scopeId, input.key, finalFm, finalContent, SYSTEM_AUTHOR, SYSTEM_EMAIL);