diff --git a/docs-site/content/docs/integrations/context-sources.mdx b/docs-site/content/docs/integrations/context-sources.mdx index 56e2f539..c2afd8ab 100644 --- a/docs-site/content/docs/integrations/context-sources.mdx +++ b/docs-site/content/docs/integrations/context-sources.mdx @@ -19,13 +19,19 @@ Agents must configure and ingest context sources in this order: 5. Review generated `semantic-layer/` YAML and `wiki/` Markdown files in git. 6. Validate changed semantic sources with `ktx sl validate`. -## Shared source fields +## Common source fields + +Git repository fields are source-specific. dbt uses top-level `repo_url`, +LookML uses top-level `repoUrl`, and MetricFlow uses nested +`metricflow.repoUrl`. | Field | Required | Description | |-------|----------|-------------| | `driver` | Yes | Source adapter: `dbt`, `metricflow`, `lookml`, `metabase`, `looker`, or `notion` | | `source_dir` | For local file sources | Absolute or project-relative source directory | -| `repo_url` | For Git-hosted sources | Git repository URL | +| `repo_url` | For Git-hosted dbt sources | Git repository URL | +| `repoUrl` | For Git-hosted LookML sources | Git repository URL | +| `metricflow.repoUrl` | For Git-hosted MetricFlow sources | Git repository URL | | `branch` | No | Git branch to read | | `path` | No | Subdirectory inside a monorepo | | `auth_token_ref` | For private APIs/repos | `env:NAME` or `file:/path/to/secret` token reference | @@ -351,7 +357,7 @@ Create an integration at [notion.so/my-integrations](https://www.notion.so/my-in | `root_page_ids` | Page IDs to crawl from (for `selected_roots`) | `[]` | | `root_database_ids` | Database IDs to include | `[]` | | `max_pages_per_run` | Pages processed per sync | `1000` | -| `max_knowledge_creates_per_run` | New pages created per sync | `5` | +| `max_knowledge_creates_per_run` | New pages created per sync | `25` | | `max_knowledge_updates_per_run` | Pages updated per sync | `20` | ### What gets ingested @@ -365,13 +371,14 @@ Create an integration at [notion.so/my-integrations](https://www.notion.so/my-in - Notion is knowledge-only — it does not produce semantic layer sources - Rate limits apply; large workspaces may require multiple ingestion runs -- `last_successful_cursor` is auto-managed for incremental sync +- Incremental sync cursors are stored in `.ktx/db.sqlite`; don't add + `last_successful_cursor` to `ktx.yaml` ## Common errors | Error or symptom | Likely cause | Recovery | |------------------|--------------|----------| -| Adapter cannot read source files | `source_dir`, `repo_url`, `branch`, or `path` is wrong | Verify the path locally or clone the repo manually with the same credentials | +| Adapter cannot read source files | `source_dir`, `repo_url`, `repoUrl`, `metricflow.repoUrl`, `branch`, or `path` is wrong | Verify the path locally or clone the repo manually with the same credentials | | Private repo/API authentication fails | Token env var or secret file is missing | Export the env var or update `auth_token_ref` to a readable file | | Ingest creates duplicate context | Existing source names or wiki pages do not match imported terminology | Review the diff, rename duplicates, and add wiki pages with canonical names | | Notion ingest skips pages | Integration lacks access or root ids are missing | Share pages with the Notion integration and set `root_page_ids` or use `all_accessible` carefully | diff --git a/docs-site/content/docs/integrations/primary-sources.mdx b/docs-site/content/docs/integrations/primary-sources.mdx index beb9cb27..b15d93ab 100644 --- a/docs-site/content/docs/integrations/primary-sources.mdx +++ b/docs-site/content/docs/integrations/primary-sources.mdx @@ -27,6 +27,9 @@ Agents should prefer environment or file references over literal secrets. | `schema` or `schemas` | No | schema-aware warehouses | Single schema or list of schemas to scan | | `context.queryHistory` | No | PostgreSQL, Snowflake, BigQuery | Enables query-history ingestion when the warehouse supports it | | `path` | Yes for path-style SQLite | SQLite | Local SQLite database path or `env:NAME` reference | +| `max_bytes_billed` | No | BigQuery | Maximum bytes billed per query job | +| `job_timeout_ms` | No | BigQuery | BigQuery query job timeout in milliseconds | +| `project_id` | No | BigQuery | Optional local descriptor and mapping metadata; not used for BigQuery authentication | ## PostgreSQL @@ -216,6 +219,9 @@ For multiple datasets: | Environment variable | `credentials_json: env:BIGQUERY_CREDENTIALS_JSON` | The project ID is extracted automatically from the service account JSON file. +If you set `project_id` in `ktx.yaml`, KTX treats it as local descriptor and +mapping metadata. The BigQuery connector still authenticates with the +`project_id` inside `credentials_json`. ### Features @@ -254,7 +260,7 @@ staged artifact shape as Postgres and Snowflake. - Parameter binding uses named `@param` syntax - Arrays flattened to comma-separated strings in results - Location specified at query execution time -- Supports `maxBytesBilled` and `jobTimeoutMs` limits +- Supports `max_bytes_billed` and `job_timeout_ms` limits from `ktx.yaml` --- diff --git a/packages/cli/src/doctor.test.ts b/packages/cli/src/doctor.test.ts index 5461a26b..b89e52d3 100644 --- a/packages/cli/src/doctor.test.ts +++ b/packages/cli/src/doctor.test.ts @@ -418,6 +418,70 @@ describe('runKtxDoctor', () => { expect(testIo.stdout()).toContain('ktx setup'); }); + it('warns about stale and unsupported per-driver connection fields', async () => { + process.env.ANTHROPIC_API_KEY = 'test-key'; // pragma: allowlist secret + process.env.WAREHOUSE_DATABASE_URL = 'postgresql://reader@example.test/warehouse'; + process.env.NOTION_TOKEN = 'notion-secret'; + await writeFile( + join(tempDir, 'ktx.yaml'), + [ + 'project: warehouse', + 'connections:', + ' warehouse:', + ' driver: postgres', + ' url: env:WAREHOUSE_DATABASE_URL', + ' readonly: true', + ' historicSql:', + ' enabled: true', + ' dialect: postgres', + ' windowDays: 30', + ' concurrency: 4', + ' local:', + ' driver: sqlite', + ' file_path: ./warehouse.db', + ' docs:', + ' driver: notion', + ' auth_token_ref: env:NOTION_TOKEN', + ' crawl_mode: all_accessible', + ' last_successful_cursor: \'{"phase":"all_accessible_pages","cursor":"cursor-1"}\'', + 'ingest:', + ' adapters:', + ' - live-database', + 'llm:', + ' provider:', + ' backend: anthropic', + '', + ].join('\n'), + 'utf-8', + ); + const testIo = makeIo(); + + await expect( + runKtxDoctor( + { command: 'project', projectDir: tempDir, outputMode: 'plain', inputMode: 'disabled' }, + testIo.io, + { + postgresQueryHistoryProbe: async () => ({ + pgServerVersion: 'PostgreSQL 16.4', + warnings: [], + info: [], + }), + }, + ), + ).resolves.toBe(0); + + const out = testIo.stdout(); + expect(out).toContain('Warnings'); + expect(out).toContain('connections.warehouse.readonly is no longer used.'); + expect(out).toContain('connections.warehouse.historicSql.concurrency is no longer used.'); + expect(out).toContain('connections.warehouse.historicSql.windowDays does not constrain pg_stat_statements.'); + expect(out).toContain('connections.local.file_path was removed.'); + expect(out).toContain('connections.docs.last_successful_cursor is local sync state.'); + delete process.env.ANTHROPIC_API_KEY; + delete process.env.WAREHOUSE_DATABASE_URL; + delete process.env.NOTION_TOKEN; + }); + it('warns when semantic-search embeddings are not configured', async () => { process.env.ANTHROPIC_API_KEY = 'test-key'; // pragma: allowlist secret await writeFile( diff --git a/packages/cli/src/local-scan-connectors.test.ts b/packages/cli/src/local-scan-connectors.test.ts index e8a5c1e9..d287b563 100644 --- a/packages/cli/src/local-scan-connectors.test.ts +++ b/packages/cli/src/local-scan-connectors.test.ts @@ -9,7 +9,6 @@ const bigQueryMock = vi.hoisted(() => ({ constructorInputs: [] as Array<{ connectionId: string; connection: unknown; - maxBytesBilled?: number | string; }>, })); @@ -20,7 +19,7 @@ vi.mock('@ktx/connector-bigquery', () => ({ readonly id: string; readonly driver = 'bigquery'; - constructor(options: { connectionId: string; connection: unknown; maxBytesBilled?: number | string }) { + constructor(options: { connectionId: string; connection: unknown }) { bigQueryMock.constructorInputs.push(options); this.id = `bigquery:${options.connectionId}`; } @@ -61,7 +60,7 @@ describe('createKtxCliScanConnector', () => { expect(connector.driver).toBe('sqlite'); }); - it('passes BigQuery max_bytes_billed from standalone config', async () => { + it('passes canonical BigQuery YAML scan limits through to the connector', async () => { await initKtxProject({ projectDir: tempDir, projectName: 'warehouse' }); await writeFile( join(tempDir, 'ktx.yaml'), @@ -72,6 +71,7 @@ describe('createKtxCliScanConnector', () => { ' driver: bigquery', ' dataset_id: analytics', ' max_bytes_billed: "987654321"', + ' job_timeout_ms: 30000', '', ].join('\n'), 'utf-8', @@ -85,9 +85,13 @@ describe('createKtxCliScanConnector', () => { expect(bigQueryMock.constructorInputs).toEqual([ expect.objectContaining({ connectionId: 'warehouse', - maxBytesBilled: '987654321', + connection: expect.objectContaining({ + max_bytes_billed: '987654321', + job_timeout_ms: 30000, + }), }), ]); + expect(bigQueryMock.constructorInputs[0]).not.toHaveProperty('maxBytesBilled'); }); it('throws for structural daemon-only fallback configs', async () => { diff --git a/packages/cli/src/local-scan-connectors.ts b/packages/cli/src/local-scan-connectors.ts index 3058b96e..b28e4f5a 100644 --- a/packages/cli/src/local-scan-connectors.ts +++ b/packages/cli/src/local-scan-connectors.ts @@ -3,20 +3,6 @@ import type { KtxScanConnector } from '@ktx/context/scan'; const SUPPORTED_DRIVERS = 'sqlite, postgres, mysql, clickhouse, sqlserver, bigquery, snowflake'; -function bigQueryMaxBytesBilled( - connection: KtxLocalProject['config']['connections'][string], -): number | string | undefined { - const raw = connection.max_bytes_billed; - if (typeof raw === 'number') { - return Number.isFinite(raw) && raw > 0 ? raw : undefined; - } - if (typeof raw === 'string') { - const trimmed = raw.trim(); - return trimmed.length > 0 ? trimmed : undefined; - } - return undefined; -} - export async function createKtxCliScanConnector( project: KtxLocalProject, connectionId: string, @@ -64,12 +50,7 @@ export async function createKtxCliScanConnector( if (driver === 'bigquery') { const { KtxBigQueryScanConnector, isKtxBigQueryConnectionConfig } = await import('@ktx/connector-bigquery'); if (isKtxBigQueryConnectionConfig(connection)) { - const maxBytesBilled = bigQueryMaxBytesBilled(connection); - return new KtxBigQueryScanConnector({ - connectionId, - connection, - ...(maxBytesBilled !== undefined ? { maxBytesBilled } : {}), - }); + return new KtxBigQueryScanConnector({ connectionId, connection }); } } if (driver === 'snowflake') { diff --git a/packages/cli/src/setup-sources.test.ts b/packages/cli/src/setup-sources.test.ts index 5b0187bf..d1d541b8 100644 --- a/packages/cli/src/setup-sources.test.ts +++ b/packages/cli/src/setup-sources.test.ts @@ -252,6 +252,7 @@ describe('setup sources step', () => { max_knowledge_creates_per_run: 25, max_knowledge_updates_per_run: 20, }); + expect((await readConfig()).connections['notion-main']?.last_successful_cursor).toBeUndefined(); }); it('accepts former ingest subcommand names as interactive source connection ids', async () => { diff --git a/packages/cli/src/setup-sources.ts b/packages/cli/src/setup-sources.ts index 359cf0d8..c55f3fd8 100644 --- a/packages/cli/src/setup-sources.ts +++ b/packages/cli/src/setup-sources.ts @@ -501,7 +501,6 @@ function buildNotionConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionC max_pages_per_run: 1000, max_knowledge_creates_per_run: DEFAULT_NOTION_MAX_KNOWLEDGE_CREATES_PER_RUN, max_knowledge_updates_per_run: 20, - last_successful_cursor: null, }; } diff --git a/packages/cli/src/standalone-smoke.test.ts b/packages/cli/src/standalone-smoke.test.ts index 3305b460..537f373c 100644 --- a/packages/cli/src/standalone-smoke.test.ts +++ b/packages/cli/src/standalone-smoke.test.ts @@ -166,18 +166,18 @@ describe('standalone built ktx CLI smoke', () => { expect(result.stderr).toContain("unknown command 'agent'"); }); - it('runs doctor setup through the built binary', async () => { - const env = { ...process.env }; - delete env.KTX_PROJECT_DIR; - const result = await runBuiltCli(['status', '--no-input'], { cwd: tempDir, env }); + it('runs status setup checks through the built binary', async () => { + const result = await runBuiltCli(['status', '--verbose', '--no-input']); - expect(result.stdout).toMatch(/KTX (setup doctor|project doctor|status)/); + expect(result.stdout).toMatch(/KTX status/); if (result.stdout.includes('No project here yet.')) { expect(result.stdout).toContain('ktx setup'); } else { expect(result.stdout).toContain('Node 22+'); expect(result.stdout).toContain('Workspace-local CLI'); } + expect(result.stdout).toContain('Node 22+'); + expect(result.stdout).toContain('Workspace-local CLI'); expect(result.stderr === '' || result.stderr.startsWith('Project: ')).toBe(true); expect([0, 1]).toContain(result.code); }); diff --git a/packages/cli/src/status-project.ts b/packages/cli/src/status-project.ts index d90cf37e..08686355 100644 --- a/packages/cli/src/status-project.ts +++ b/packages/cli/src/status-project.ts @@ -61,6 +61,14 @@ interface WarningItem { fix?: string; } +function isRecord(value: unknown): value is Record { + return typeof value === 'object' && value !== null && !Array.isArray(value); +} + +function hasOwnField(value: Record, key: string): boolean { + return Object.prototype.hasOwnProperty.call(value, key); +} + export interface ProjectStatus { projectName: string; projectDir: string; @@ -471,6 +479,51 @@ function buildWarnings( ): WarningItem[] { const warnings: WarningItem[] = []; + for (const [connectionId, connection] of Object.entries(config.connections)) { + const driver = String(connection.driver ?? '').toLowerCase(); + if (hasOwnField(connection, 'readonly')) { + warnings.push({ + message: `connections.${connectionId}.readonly is no longer used.`, + fix: `Remove connections.${connectionId}.readonly from ktx.yaml.`, + }); + } + + if ((driver === 'sqlite' || driver === 'sqlite3') && hasOwnField(connection, 'file_path')) { + warnings.push({ + message: `connections.${connectionId}.file_path was removed.`, + fix: `Rename connections.${connectionId}.file_path to path.`, + }); + } + + if (driver === 'notion' && hasOwnField(connection, 'last_successful_cursor')) { + warnings.push({ + message: `connections.${connectionId}.last_successful_cursor is local sync state.`, + fix: 'Remove it from ktx.yaml. KTX stores the Notion cursor in .ktx/db.sqlite.', + }); + } + + const historicSql = isRecord(connection.historicSql) ? connection.historicSql : null; + if (!historicSql) { + continue; + } + if (hasOwnField(historicSql, 'concurrency')) { + warnings.push({ + message: `connections.${connectionId}.historicSql.concurrency is no longer used.`, + fix: `Remove connections.${connectionId}.historicSql.concurrency from ktx.yaml.`, + }); + } + const historicDialect = String(historicSql.dialect ?? driver).toLowerCase(); + if ( + (historicDialect === 'postgres' || historicDialect === 'postgresql') && + hasOwnField(historicSql, 'windowDays') + ) { + warnings.push({ + message: `connections.${connectionId}.historicSql.windowDays does not constrain pg_stat_statements.`, + fix: `Remove connections.${connectionId}.historicSql.windowDays from ktx.yaml.`, + }); + } + } + for (const adapter of config.ingest.adapters) { const requiredDrivers = ADAPTER_DRIVER_REQUIREMENT[adapter]; if (!requiredDrivers) continue; diff --git a/packages/connector-bigquery/src/connector.test.ts b/packages/connector-bigquery/src/connector.test.ts index 46dc3b53..c78770e6 100644 --- a/packages/connector-bigquery/src/connector.test.ts +++ b/packages/connector-bigquery/src/connector.test.ts @@ -267,6 +267,30 @@ describe('KtxBigQueryScanConnector', () => { ); }); + it('applies canonical BigQuery YAML scan limits to query jobs', async () => { + const clientFactory = fakeClientFactory(); + const connector = new KtxBigQueryScanConnector({ + connectionId: 'warehouse', + connection: { ...connection, max_bytes_billed: '987654321', job_timeout_ms: 30_000 }, + clientFactory, + }); + + await expect( + connector.executeReadOnly( + { connectionId: 'warehouse', sql: 'select id, status from `project-1`.`analytics`.`orders`', maxRows: 1 }, + { runId: 'scan-run-1' }, + ), + ).resolves.toMatchObject({ rows: [[1, 'paid']], rowCount: 1 }); + + const client = vi.mocked(clientFactory.createClient).mock.results[0]?.value as KtxBigQueryClient; + expect(client.createQueryJob).toHaveBeenLastCalledWith( + expect.objectContaining({ + maximumBytesBilled: '987654321', + jobTimeoutMs: 30_000, + }), + ); + }); + it('adapts native snapshots to live-database introspection snapshots', async () => { const introspection = createBigQueryLiveDatabaseIntrospection({ connections: { warehouse: connection }, diff --git a/packages/connector-bigquery/src/connector.ts b/packages/connector-bigquery/src/connector.ts index 72cb8129..c69d6030 100644 --- a/packages/connector-bigquery/src/connector.ts +++ b/packages/connector-bigquery/src/connector.ts @@ -30,6 +30,8 @@ export interface KtxBigQueryConnectionConfig { dataset_ids?: string[]; credentials_json?: string; location?: string; + max_bytes_billed?: number | string; + job_timeout_ms?: number; [key: string]: unknown; } @@ -158,6 +160,28 @@ function datasetIds(connection: KtxBigQueryConnectionConfig, env: NodeJS.Process return datasetId ? [datasetId] : []; } +function bigQueryMaxBytesBilledFromConnection( + connection: KtxBigQueryConnectionConfig | undefined, +): number | string | undefined { + const value = connection?.max_bytes_billed; + if (typeof value === 'number') { + return Number.isFinite(value) && value > 0 ? value : undefined; + } + if (typeof value === 'string') { + const trimmed = value.trim(); + return trimmed.length > 0 ? trimmed : undefined; + } + return undefined; +} + +function bigQueryJobTimeoutMsFromConnection(connection: KtxBigQueryConnectionConfig | undefined): number | undefined { + const value = connection?.job_timeout_ms; + if (typeof value !== 'number') { + return undefined; + } + return Number.isInteger(value) && value > 0 ? value : undefined; +} + function tableKind(metadataType: string | undefined): KtxSchemaTable['kind'] { const type = String(metadataType ?? '').toUpperCase(); if (type === 'VIEW' || type === 'MATERIALIZED_VIEW') { @@ -258,8 +282,8 @@ export class KtxBigQueryScanConnector implements KtxScanConnector { }); this.clientFactory = options.clientFactory ?? new DefaultBigQueryClientFactory(); this.now = options.now ?? (() => new Date()); - this.maxBytesBilled = options.maxBytesBilled; - this.queryTimeoutMs = options.queryTimeoutMs; + this.maxBytesBilled = options.maxBytesBilled ?? bigQueryMaxBytesBilledFromConnection(options.connection); + this.queryTimeoutMs = options.queryTimeoutMs ?? bigQueryJobTimeoutMsFromConnection(options.connection); this.id = `bigquery:${options.connectionId}`; } diff --git a/packages/connector-sqlite/src/connector.test.ts b/packages/connector-sqlite/src/connector.test.ts index 9bee5567..c3c11d64 100644 --- a/packages/connector-sqlite/src/connector.test.ts +++ b/packages/connector-sqlite/src/connector.test.ts @@ -90,6 +90,13 @@ describe('KtxSqliteScanConnector', () => { connection: { driver: 'sqlite', path: 'warehouse.db' }, }), ).toBe(dbPath); + expect(() => + sqliteDatabasePathFromConfig({ + connectionId: 'warehouse', + projectDir: tempDir, + connection: { driver: 'sqlite', file_path: 'warehouse.db' }, + }), + ).toThrow('Native SQLite connector requires connections.warehouse.path or url'); } finally { if (originalDatabaseUrl === undefined) { delete process.env.KTX_SQLITE_TEST_URL; diff --git a/packages/connector-sqlite/src/connector.ts b/packages/connector-sqlite/src/connector.ts index 2a0d997e..137fc240 100644 --- a/packages/connector-sqlite/src/connector.ts +++ b/packages/connector-sqlite/src/connector.ts @@ -28,7 +28,6 @@ export interface KtxSqliteConnectionConfig { driver?: string; path?: string; url?: string; - file_path?: string; [key: string]: unknown; } @@ -146,12 +145,9 @@ export function sqliteDatabasePathFromConfig(input: SqliteDatabasePathInput): st if (!isKtxSqliteConnectionConfig(input.connection)) { throw new Error(`Native SQLite connector cannot run driver "${inputDriver}"`); } - const configuredPath = - stringConfigValue(input.connection, 'path') ?? - stringConfigValue(input.connection, 'file_path') ?? - sqlitePathFromUrl(stringConfigValue(input.connection, 'url') ?? ''); + const configuredPath = stringConfigValue(input.connection, 'path') ?? sqlitePathFromUrl(stringConfigValue(input.connection, 'url') ?? ''); if (!configuredPath) { - throw new Error(`Native SQLite connector requires connections.${input.connectionId}.path, file_path, or url`); + throw new Error(`Native SQLite connector requires connections.${input.connectionId}.path or url`); } return isAbsolute(configuredPath) ? configuredPath : resolve(input.projectDir ?? process.cwd(), configuredPath); } diff --git a/packages/context/src/connections/notion-config.test.ts b/packages/context/src/connections/notion-config.test.ts index 0167f186..6416bf99 100644 --- a/packages/context/src/connections/notion-config.test.ts +++ b/packages/context/src/connections/notion-config.test.ts @@ -39,8 +39,8 @@ describe('standalone Notion connection config', () => { max_pages_per_run: 1000, max_knowledge_creates_per_run: 25, max_knowledge_updates_per_run: 20, - last_successful_cursor: null, }); + expect(parsed).not.toHaveProperty('last_successful_cursor'); }); it('parses inline Notion auth tokens without requiring auth_token_ref', () => { @@ -132,7 +132,7 @@ describe('standalone Notion connection config', () => { maxPagesPerRun: 12, maxKnowledgeCreatesPerRun: 2, maxKnowledgeUpdatesPerRun: 7, - lastSuccessfulCursor: '{"phase":"all_accessible_pages","cursor":"cursor-1"}', + lastSuccessfulCursor: null, }); }); diff --git a/packages/context/src/connections/notion-config.ts b/packages/context/src/connections/notion-config.ts index d678ba2f..c7068512 100644 --- a/packages/context/src/connections/notion-config.ts +++ b/packages/context/src/connections/notion-config.ts @@ -24,7 +24,6 @@ export interface KtxNotionConnectionConfig extends KtxProjectConnectionConfig { max_pages_per_run: number; max_knowledge_creates_per_run: number; max_knowledge_updates_per_run: number; - last_successful_cursor: string | null; } export interface RedactedKtxNotionConnectionConfig { @@ -115,7 +114,6 @@ export function parseNotionConnectionConfig(raw: unknown): KtxNotionConnectionCo } return { - ...input, driver: 'notion', auth_token: authToken, auth_token_ref: authTokenRef, @@ -138,7 +136,6 @@ export function parseNotionConnectionConfig(raw: unknown): KtxNotionConnectionCo 0, 100, ), - last_successful_cursor: optionalString(input.last_successful_cursor), }; } @@ -206,6 +203,6 @@ export async function notionConnectionToPullConfig( maxPagesPerRun: config.max_pages_per_run, maxKnowledgeCreatesPerRun: config.max_knowledge_creates_per_run, maxKnowledgeUpdatesPerRun: config.max_knowledge_updates_per_run, - lastSuccessfulCursor: config.last_successful_cursor, + lastSuccessfulCursor: null, }); } diff --git a/packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.ts b/packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.ts index 2726ddf3..b9ee73b3 100644 --- a/packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.ts +++ b/packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.ts @@ -103,7 +103,7 @@ describe('BigQueryHistoricSqlQueryHistoryReader', () => { for await (const row of reader.fetchAggregated( client, { start: new Date('2026-02-10T00:00:00.000Z'), end: new Date('2026-05-11T00:00:00.000Z') }, - { dialect: 'bigquery', minExecutions: 5, windowDays: 90, concurrency: 12, enabledTables: [], filters: { dropTrivialProbes: true }, redactionPatterns: [], staleArchiveAfterDays: 90 }, + { dialect: 'bigquery', minExecutions: 5, windowDays: 90, enabledTables: [], filters: { dropTrivialProbes: true }, redactionPatterns: [], staleArchiveAfterDays: 90 }, )) { rows.push(row); } @@ -136,7 +136,6 @@ describe('BigQueryHistoricSqlQueryHistoryReader', () => { dialect: 'bigquery', minExecutions: 5, windowDays: 90, - concurrency: 12, enabledTables: [], filters: { dropTrivialProbes: true }, redactionPatterns: [], diff --git a/packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.test.ts b/packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.test.ts index b59e1e2e..a91171cd 100644 --- a/packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.test.ts +++ b/packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.test.ts @@ -215,7 +215,7 @@ describe('PostgresPgssReader aggregate path', () => { for await (const row of reader.fetchAggregated( { executeQuery }, { start: new Date('2026-02-10T00:00:00.000Z'), end: new Date('2026-05-11T00:00:00.000Z') }, - { dialect: 'postgres', minExecutions: 5, windowDays: 90, concurrency: 12, enabledTables: [], filters: { dropTrivialProbes: true }, redactionPatterns: [], staleArchiveAfterDays: 90 }, + { dialect: 'postgres', minExecutions: 5, enabledTables: [], filters: { dropTrivialProbes: true }, redactionPatterns: [], staleArchiveAfterDays: 90 }, )) { rows.push(row); } diff --git a/packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.ts b/packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.ts index 31ef22d5..b33183d7 100644 --- a/packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.ts +++ b/packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.ts @@ -102,7 +102,7 @@ describe('SnowflakeHistoricSqlQueryHistoryReader', () => { for await (const row of reader.fetchAggregated( client, { start: new Date('2026-02-10T00:00:00.000Z'), end: new Date('2026-05-11T00:00:00.000Z') }, - { dialect: 'snowflake', minExecutions: 5, windowDays: 90, concurrency: 12, enabledTables: [], filters: { dropTrivialProbes: true }, redactionPatterns: [], staleArchiveAfterDays: 90 }, + { dialect: 'snowflake', minExecutions: 5, windowDays: 90, enabledTables: [], filters: { dropTrivialProbes: true }, redactionPatterns: [], staleArchiveAfterDays: 90 }, )) { rows.push(row); } @@ -135,7 +135,6 @@ describe('SnowflakeHistoricSqlQueryHistoryReader', () => { dialect: 'snowflake', minExecutions: 5, windowDays: 90, - concurrency: 12, enabledTables: [], filters: { dropTrivialProbes: true }, redactionPatterns: [], diff --git a/packages/context/src/ingest/adapters/historic-sql/stage-unified.ts b/packages/context/src/ingest/adapters/historic-sql/stage-unified.ts index fddf9362..18675b11 100644 --- a/packages/context/src/ingest/adapters/historic-sql/stage-unified.ts +++ b/packages/context/src/ingest/adapters/historic-sql/stage-unified.ts @@ -148,6 +148,10 @@ function isEnabledTable(table: string, filter: EnabledTableFilter | null): boole return filter.exact.has(normalized) || filter.uniqueUnqualified.has(unqualifiedTableIdentifier(normalized)); } +function historicSqlWindowDays(config: HistoricSqlUnifiedPullConfig): number { + return 'windowDays' in config ? config.windowDays : 90; +} + function redactTemplateSql( template: AggregatedTemplate, redactors: readonly HistoricSqlRedactionPattern[], @@ -279,7 +283,7 @@ export async function stageHistoricSqlAggregatedSnapshot(input: StageHistoricSql const enabledTableFilter = buildEnabledTableFilter(config.enabledTables); const redactors = compileHistoricSqlRedactionPatterns(config.redactionPatterns); const now = input.now ?? new Date(); - const windowStart = new Date(now.getTime() - config.windowDays * 24 * 60 * 60 * 1000); + const windowStart = new Date(now.getTime() - historicSqlWindowDays(config) * 24 * 60 * 60 * 1000); const probe = await input.reader.probe(input.queryClient); const snapshot: AggregatedTemplate[] = []; let snapshotRowCount = 0; diff --git a/packages/context/src/ingest/adapters/historic-sql/types.test.ts b/packages/context/src/ingest/adapters/historic-sql/types.test.ts index f5a6f853..95b253a8 100644 --- a/packages/context/src/ingest/adapters/historic-sql/types.test.ts +++ b/packages/context/src/ingest/adapters/historic-sql/types.test.ts @@ -12,11 +12,15 @@ describe('historic-sql unified contracts', () => { expect(historicSqlUnifiedPullConfigSchema.parse({ dialect: 'postgres', minExecutions: 9 })).toMatchObject({ dialect: 'postgres', minExecutions: 9, - windowDays: 90, - concurrency: 12, redactionPatterns: [], staleArchiveAfterDays: 90, }); + expect(historicSqlUnifiedPullConfigSchema.parse({ dialect: 'postgres', minExecutions: 9 })).not.toHaveProperty( + 'windowDays', + ); + expect(historicSqlUnifiedPullConfigSchema.parse({ dialect: 'postgres', minExecutions: 9 })).not.toHaveProperty( + 'concurrency', + ); const parsed = historicSqlUnifiedPullConfigSchema.parse({ dialect: 'postgres', diff --git a/packages/context/src/ingest/adapters/historic-sql/types.ts b/packages/context/src/ingest/adapters/historic-sql/types.ts index b1d30d15..ddb1846a 100644 --- a/packages/context/src/ingest/adapters/historic-sql/types.ts +++ b/packages/context/src/ingest/adapters/historic-sql/types.ts @@ -8,11 +8,8 @@ export type HistoricSqlDialect = z.infer; const filterModeSchema = z.enum(['exclude', 'include', 'mark-only']); -export const historicSqlUnifiedPullConfigSchema = z.object({ - dialect: historicSqlDialectSchema, - windowDays: z.number().int().positive().default(90), +const historicSqlCommonPullConfigSchema = z.object({ minExecutions: z.number().int().nonnegative().default(5), - concurrency: z.number().int().positive().default(12), enabledTables: z.array(z.string().min(1)).default([]), filters: z.object({ serviceAccounts: z.object({ @@ -32,6 +29,20 @@ export const historicSqlUnifiedPullConfigSchema = z.object({ staleArchiveAfterDays: z.number().int().positive().default(90), }); +const historicSqlWindowedPullConfigSchema = historicSqlCommonPullConfigSchema.extend({ + dialect: z.enum(['snowflake', 'bigquery']), + windowDays: z.number().int().positive().default(90), +}); + +const historicSqlPostgresPullConfigSchema = historicSqlCommonPullConfigSchema.extend({ + dialect: z.literal('postgres'), +}); + +export const historicSqlUnifiedPullConfigSchema = z.discriminatedUnion('dialect', [ + historicSqlWindowedPullConfigSchema, + historicSqlPostgresPullConfigSchema, +]); + export type HistoricSqlUnifiedPullConfig = z.infer; export const aggregatedTemplateSchema = z.object({ diff --git a/packages/context/src/ingest/adapters/notion/local-state-store.test.ts b/packages/context/src/ingest/adapters/notion/local-state-store.test.ts new file mode 100644 index 00000000..892ea6c1 --- /dev/null +++ b/packages/context/src/ingest/adapters/notion/local-state-store.test.ts @@ -0,0 +1,36 @@ +import { mkdtemp, rm } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { LocalNotionRuntimeStore } from './local-state-store.js'; + +describe('LocalNotionRuntimeStore', () => { + let tempDir: string; + let dbPath: string; + + beforeEach(async () => { + tempDir = await mkdtemp(join(tmpdir(), 'ktx-notion-state-')); + dbPath = join(tempDir, '.ktx', 'db.sqlite'); + }); + + afterEach(async () => { + await rm(tempDir, { recursive: true, force: true }); + }); + + it('stores Notion cursors in local state and clears them after complete snapshots', async () => { + const cursor = '{"phase":"all_accessible_pages","cursor":"cursor-1"}'; + const store = new LocalNotionRuntimeStore({ + dbPath, + now: () => new Date('2026-05-13T10:00:00.000Z'), + }); + + await expect(store.readCursor('notion-main')).resolves.toBeNull(); + await store.setCursor('notion-main', cursor); + + const reopened = new LocalNotionRuntimeStore({ dbPath }); + await expect(reopened.readCursor('notion-main')).resolves.toBe(cursor); + + await reopened.setCursor('notion-main', null); + await expect(reopened.readCursor('notion-main')).resolves.toBeNull(); + }); +}); diff --git a/packages/context/src/ingest/adapters/notion/local-state-store.ts b/packages/context/src/ingest/adapters/notion/local-state-store.ts new file mode 100644 index 00000000..9fce32ed --- /dev/null +++ b/packages/context/src/ingest/adapters/notion/local-state-store.ts @@ -0,0 +1,60 @@ +import { mkdirSync } from 'node:fs'; +import { dirname } from 'node:path'; +import Database from 'better-sqlite3'; + +interface LocalNotionRuntimeStoreOptions { + dbPath: string; + now?: () => Date; +} + +export class LocalNotionRuntimeStore { + private readonly db: Database.Database; + private readonly now: () => Date; + + constructor(options: LocalNotionRuntimeStoreOptions) { + mkdirSync(dirname(options.dbPath), { recursive: true }); + this.db = new Database(options.dbPath); + this.db.pragma('journal_mode = WAL'); + this.db.pragma('foreign_keys = ON'); + this.now = options.now ?? (() => new Date()); + this.db.exec(` + CREATE TABLE IF NOT EXISTS local_notion_runtime_config ( + notion_connection_id TEXT PRIMARY KEY, + last_successful_cursor TEXT, + updated_at TEXT NOT NULL + ); + `); + } + + async readCursor(notionConnectionId: string): Promise { + const row = this.db + .prepare( + ` + SELECT last_successful_cursor + FROM local_notion_runtime_config + WHERE notion_connection_id = ? + `, + ) + .get(notionConnectionId) as { last_successful_cursor: string | null } | undefined; + + return row?.last_successful_cursor ?? null; + } + + async setCursor(notionConnectionId: string, cursor: string | null): Promise { + this.db + .prepare( + ` + INSERT INTO local_notion_runtime_config ( + notion_connection_id, + last_successful_cursor, + updated_at + ) + VALUES (?, ?, ?) + ON CONFLICT(notion_connection_id) DO UPDATE SET + last_successful_cursor = excluded.last_successful_cursor, + updated_at = excluded.updated_at + `, + ) + .run(notionConnectionId, cursor, this.now().toISOString()); + } +} diff --git a/packages/context/src/ingest/index.ts b/packages/context/src/ingest/index.ts index 3c238d98..1a7ed721 100644 --- a/packages/context/src/ingest/index.ts +++ b/packages/context/src/ingest/index.ts @@ -314,6 +314,7 @@ export type { } from './adapters/metricflow/pull-config.js'; export { NOTION_ORG_KNOWLEDGE_WARNING } from './adapters/notion/chunk.js'; export { NOTION_DEFAULT_MAX_KNOWLEDGE_CREATES_PER_RUN } from './adapters/notion/types.js'; +export { LocalNotionRuntimeStore } from './adapters/notion/local-state-store.js'; export { NotionSourceAdapter, type NotionSourceAdapterDeps } from './adapters/notion/notion.adapter.js'; export { NotionClient, type NotionApi, type NotionBotInfo } from './adapters/notion/notion-client.js'; export { bucketDistinctUsers, bucketErrorRate, bucketExecutions, bucketP95Runtime, bucketRecency } from './adapters/historic-sql/buckets.js'; diff --git a/packages/context/src/ingest/local-adapters.test.ts b/packages/context/src/ingest/local-adapters.test.ts index f5e78e5f..e8cbf5a5 100644 --- a/packages/context/src/ingest/local-adapters.test.ts +++ b/packages/context/src/ingest/local-adapters.test.ts @@ -1,4 +1,4 @@ -import { mkdtemp, rm } from 'node:fs/promises'; +import { mkdtemp, rm, writeFile } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; @@ -6,6 +6,7 @@ import { initKtxProject, type KtxLocalProject, loadKtxProject } from '../project import type { SqlAnalysisPort } from '../sql-analysis/index.js'; import type { HistoricSqlReader } from './adapters/historic-sql/types.js'; import { LocalLookerRuntimeStore } from './adapters/looker/local-runtime-store.js'; +import { LocalNotionRuntimeStore } from './adapters/notion/local-state-store.js'; import { createDefaultLocalIngestAdapters, localPullConfigForAdapter } from './local-adapters.js'; describe('local ingest adapters', () => { @@ -192,9 +193,7 @@ describe('local ingest adapters', () => { await expect(localPullConfigForAdapter(postgresProject, historicSql!, 'warehouse')).resolves.toEqual({ dialect: 'postgres', - windowDays: 90, minExecutions: 7, - concurrency: 12, enabledTables: [], filters: { serviceAccounts: { patterns: ['^svc_'], mode: 'exclude' }, @@ -223,7 +222,6 @@ describe('local ingest adapters', () => { await expect(localPullConfigForAdapter(project, adapter, 'warehouse')).resolves.toMatchObject({ dialect: 'postgres', - windowDays: 45, minExecutions: 7, filters: { dropTrivialProbes: true }, }); @@ -241,7 +239,7 @@ describe('local ingest adapters', () => { await expect(localPullConfigForAdapter(project, adapter, 'warehouse')).resolves.toMatchObject({ dialect: 'postgres', - windowDays: 30, + minExecutions: 5, }); }); @@ -566,6 +564,79 @@ describe('local ingest adapters', () => { await expect(notion?.listTargetConnectionIds?.('/tmp/staged-notion')).resolves.toEqual(['warehouse']); }); + it('reads Notion cursors from local state instead of ktx.yaml', async () => { + const cursor = '{"phase":"all_accessible_pages","cursor":"cursor-1"}'; + const notionProject = projectWithConnections({ + notion: { + driver: 'notion', + auth_token: 'secret', + crawl_mode: 'all_accessible', + last_successful_cursor: '{"phase":"all_accessible_pages","cursor":"stale-yaml"}', + }, + } as never); + await new LocalNotionRuntimeStore({ dbPath: join(notionProject.projectDir, '.ktx', 'db.sqlite') }).setCursor( + 'notion', + cursor, + ); + + const notion = createDefaultLocalIngestAdapters(notionProject).find((adapter) => adapter.source === 'notion'); + + await expect(localPullConfigForAdapter(notionProject, notion!, 'notion')).resolves.toMatchObject({ + lastSuccessfulCursor: cursor, + }); + }); + + it('persists Notion next cursors to local state after successful pulls', async () => { + const cursor = '{"phase":"all_accessible_data_sources","cursor":"cursor-2"}'; + const notionProject = projectWithConnections({ + notion: { + driver: 'notion', + auth_token: 'secret', + crawl_mode: 'all_accessible', + }, + } as never); + const stagedDir = await mkdtemp(join(tempDir, 'notion-staged-')); + await writeFile( + join(stagedDir, 'manifest.json'), + JSON.stringify({ + source: 'notion', + apiVersion: '2026-03-11', + crawlMode: 'all_accessible', + rootPageIds: [], + rootDatabaseIds: [], + rootDataSourceIds: [], + fetchedAt: '2026-05-13T10:00:00.000Z', + pageCount: 1, + databaseCount: 0, + dataSourceCount: 0, + capped: true, + continuedFromCursor: false, + partialSnapshot: true, + maxPagesPerRun: 1, + maxKnowledgeCreatesPerRun: 25, + maxKnowledgeUpdatesPerRun: 20, + nextSuccessfulCursor: cursor, + skipped: [], + warnings: [], + }), + 'utf-8', + ); + + const notion = createDefaultLocalIngestAdapters(notionProject).find((adapter) => adapter.source === 'notion'); + await notion?.onPullSucceeded?.({ + connectionId: 'notion', + sourceKey: 'notion', + syncId: 'sync-1', + trigger: 'scheduled_pull', + completedAt: new Date('2026-05-13T10:00:00.000Z'), + stagedDir, + }); + + await expect( + new LocalNotionRuntimeStore({ dbPath: join(notionProject.projectDir, '.ktx', 'db.sqlite') }).readCursor('notion'), + ).resolves.toBe(cursor); + }); + it('passes primary warehouse connection ids to local LookML and MetricFlow adapters', async () => { const adapters = createDefaultLocalIngestAdapters( projectWithConnections({ diff --git a/packages/context/src/ingest/local-adapters.ts b/packages/context/src/ingest/local-adapters.ts index 9c86b61e..2b13327a 100644 --- a/packages/context/src/ingest/local-adapters.ts +++ b/packages/context/src/ingest/local-adapters.ts @@ -1,7 +1,7 @@ import { join } from 'node:path'; import { localConnectionToWarehouseDescriptor, notionConnectionToPullConfig, parseNotionConnectionConfig } from '../connections/index.js'; import { resolveKtxConfigReference } from '../core/config-reference.js'; -import type { KtxLocalProject } from '../project/index.js'; +import { ktxLocalStateDbPath, type KtxLocalProject } from '../project/index.js'; import type { SqlAnalysisPort } from '../sql-analysis/index.js'; import { DbtSourceAdapter } from './adapters/dbt/dbt.adapter.js'; import { FakeSourceAdapter } from './adapters/fake/fake.adapter.js'; @@ -37,6 +37,7 @@ import type { MetabaseClientLogger } from './adapters/metabase/client.js'; import type { MetabaseFetchLogger } from './adapters/metabase/fetch.js'; import { MetricflowSourceAdapter } from './adapters/metricflow/metricflow.adapter.js'; import { pullConfigFromMetricflowIntegration } from './adapters/metricflow/pull-config.js'; +import { LocalNotionRuntimeStore } from './adapters/notion/local-state-store.js'; import { NotionSourceAdapter } from './adapters/notion/notion.adapter.js'; import type { NotionFetchLogger } from './adapters/notion/fetch.js'; import { seedLocalMappingStateFromKtxYaml } from './local-mapping-reconcile.js'; @@ -115,6 +116,9 @@ export function createDefaultLocalIngestAdapters( }), new NotionSourceAdapter({ targetConnectionIds: primaryWarehouseConnectionIds(project), + onPullSucceeded: async ({ connectionId, nextSuccessfulCursor }) => { + await localNotionRuntimeStore(project).setCursor(connectionId, nextSuccessfulCursor); + }, ...(options.logger ? { logger: options.logger } : {}), }), ]; @@ -152,6 +156,10 @@ function primaryWarehouseConnectionIds(project: KtxLocalProject): string[] { .sort((left, right) => left.localeCompare(right)); } +function localNotionRuntimeStore(project: KtxLocalProject): LocalNotionRuntimeStore { + return new LocalNotionRuntimeStore({ dbPath: ktxLocalStateDbPath(project) }); +} + function isRecord(value: unknown): value is Record { return typeof value === 'object' && value !== null && !Array.isArray(value); } @@ -293,7 +301,11 @@ export async function localPullConfigForAdapter( return localDbtPullConfigFromConnection(connection, options.looker?.env ?? process.env); } if (adapter.source === 'notion') { - return notionConnectionToPullConfig(parseNotionConnectionConfig(connection)); + const pullConfig = await notionConnectionToPullConfig(parseNotionConnectionConfig(connection)); + return { + ...pullConfig, + lastSuccessfulCursor: await localNotionRuntimeStore(project).readCursor(connectionId), + }; } if (adapter.source === 'metricflow') { const metricflow = connection.metricflow; diff --git a/scripts/package-artifacts.mjs b/scripts/package-artifacts.mjs index 1cd487c6..219f3e3c 100644 --- a/scripts/package-artifacts.mjs +++ b/scripts/package-artifacts.mjs @@ -912,11 +912,12 @@ try { requireStdout('ktx setup --help', setupHelp, /Usage: ktx setup/); requireStdout('ktx setup --help', setupHelp, /--no-input/); - const doctor = await run('pnpm', ['exec', 'ktx', 'status', '--no-input']); + const doctor = await run('pnpm', ['exec', 'ktx', 'status', '--verbose', '--no-input']); assert.ok([0, 1].includes(doctor.code), 'ktx status setup exit code must be 0 or 1'); requireStdout('ktx status setup', doctor, /KTX status/); requireStdout('ktx status setup', doctor, /No project here yet\\./); requireStdout('ktx status setup', doctor, /ktx setup/); + requireStdout('ktx status setup', doctor, /Node 22\\+/); assert.equal(doctor.stderr, '', 'ktx status setup wrote unexpected stderr'); } finally { await rm(root, { recursive: true, force: true }); diff --git a/scripts/package-artifacts.test.mjs b/scripts/package-artifacts.test.mjs index 708e9ccd..1f0bf164 100644 --- a/scripts/package-artifacts.test.mjs +++ b/scripts/package-artifacts.test.mjs @@ -510,7 +510,7 @@ describe('verification snippets', () => { assert.match(source, /pnpm', \['exec', 'ktx', 'setup', '--help'\]/); assert.match(source, /Usage: ktx setup/); assert.doesNotMatch(source, new RegExp(["'demo'", "'--mode'", "'deterministic'"].join(', '))); - assert.match(source, /'status', '--no-input'/); + assert.match(source, /'status', '--verbose', '--no-input'/); assert.match(source, /KTX status/); assert.match(source, /No project here yet/); assert.doesNotMatch(source, /function requireProjectStderr/);