Merge origin/main into merge-scan-into-ingest-v1

This commit is contained in:
Andrey Avtomonov 2026-05-14 01:40:11 +02:00
commit e501d1d81c
28 changed files with 432 additions and 71 deletions

View file

@ -418,6 +418,70 @@ describe('runKtxDoctor', () => {
expect(testIo.stdout()).toContain('ktx setup');
});
it('warns about stale and unsupported per-driver connection fields', async () => {
process.env.ANTHROPIC_API_KEY = 'test-key'; // pragma: allowlist secret
process.env.WAREHOUSE_DATABASE_URL = 'postgresql://reader@example.test/warehouse';
process.env.NOTION_TOKEN = 'notion-secret';
await writeFile(
join(tempDir, 'ktx.yaml'),
[
'project: warehouse',
'connections:',
' warehouse:',
' driver: postgres',
' url: env:WAREHOUSE_DATABASE_URL',
' readonly: true',
' historicSql:',
' enabled: true',
' dialect: postgres',
' windowDays: 30',
' concurrency: 4',
' local:',
' driver: sqlite',
' file_path: ./warehouse.db',
' docs:',
' driver: notion',
' auth_token_ref: env:NOTION_TOKEN',
' crawl_mode: all_accessible',
' last_successful_cursor: \'{"phase":"all_accessible_pages","cursor":"cursor-1"}\'',
'ingest:',
' adapters:',
' - live-database',
'llm:',
' provider:',
' backend: anthropic',
'',
].join('\n'),
'utf-8',
);
const testIo = makeIo();
await expect(
runKtxDoctor(
{ command: 'project', projectDir: tempDir, outputMode: 'plain', inputMode: 'disabled' },
testIo.io,
{
postgresQueryHistoryProbe: async () => ({
pgServerVersion: 'PostgreSQL 16.4',
warnings: [],
info: [],
}),
},
),
).resolves.toBe(0);
const out = testIo.stdout();
expect(out).toContain('Warnings');
expect(out).toContain('connections.warehouse.readonly is no longer used.');
expect(out).toContain('connections.warehouse.historicSql.concurrency is no longer used.');
expect(out).toContain('connections.warehouse.historicSql.windowDays does not constrain pg_stat_statements.');
expect(out).toContain('connections.local.file_path was removed.');
expect(out).toContain('connections.docs.last_successful_cursor is local sync state.');
delete process.env.ANTHROPIC_API_KEY;
delete process.env.WAREHOUSE_DATABASE_URL;
delete process.env.NOTION_TOKEN;
});
it('warns when semantic-search embeddings are not configured', async () => {
process.env.ANTHROPIC_API_KEY = 'test-key'; // pragma: allowlist secret
await writeFile(

View file

@ -9,7 +9,6 @@ const bigQueryMock = vi.hoisted(() => ({
constructorInputs: [] as Array<{
connectionId: string;
connection: unknown;
maxBytesBilled?: number | string;
}>,
}));
@ -20,7 +19,7 @@ vi.mock('@ktx/connector-bigquery', () => ({
readonly id: string;
readonly driver = 'bigquery';
constructor(options: { connectionId: string; connection: unknown; maxBytesBilled?: number | string }) {
constructor(options: { connectionId: string; connection: unknown }) {
bigQueryMock.constructorInputs.push(options);
this.id = `bigquery:${options.connectionId}`;
}
@ -61,7 +60,7 @@ describe('createKtxCliScanConnector', () => {
expect(connector.driver).toBe('sqlite');
});
it('passes BigQuery max_bytes_billed from standalone config', async () => {
it('passes canonical BigQuery YAML scan limits through to the connector', async () => {
await initKtxProject({ projectDir: tempDir, projectName: 'warehouse' });
await writeFile(
join(tempDir, 'ktx.yaml'),
@ -72,6 +71,7 @@ describe('createKtxCliScanConnector', () => {
' driver: bigquery',
' dataset_id: analytics',
' max_bytes_billed: "987654321"',
' job_timeout_ms: 30000',
'',
].join('\n'),
'utf-8',
@ -85,9 +85,13 @@ describe('createKtxCliScanConnector', () => {
expect(bigQueryMock.constructorInputs).toEqual([
expect.objectContaining({
connectionId: 'warehouse',
maxBytesBilled: '987654321',
connection: expect.objectContaining({
max_bytes_billed: '987654321',
job_timeout_ms: 30000,
}),
}),
]);
expect(bigQueryMock.constructorInputs[0]).not.toHaveProperty('maxBytesBilled');
});
it('throws for structural daemon-only fallback configs', async () => {

View file

@ -3,20 +3,6 @@ import type { KtxScanConnector } from '@ktx/context/scan';
const SUPPORTED_DRIVERS = 'sqlite, postgres, mysql, clickhouse, sqlserver, bigquery, snowflake';
function bigQueryMaxBytesBilled(
connection: KtxLocalProject['config']['connections'][string],
): number | string | undefined {
const raw = connection.max_bytes_billed;
if (typeof raw === 'number') {
return Number.isFinite(raw) && raw > 0 ? raw : undefined;
}
if (typeof raw === 'string') {
const trimmed = raw.trim();
return trimmed.length > 0 ? trimmed : undefined;
}
return undefined;
}
export async function createKtxCliScanConnector(
project: KtxLocalProject,
connectionId: string,
@ -64,12 +50,7 @@ export async function createKtxCliScanConnector(
if (driver === 'bigquery') {
const { KtxBigQueryScanConnector, isKtxBigQueryConnectionConfig } = await import('@ktx/connector-bigquery');
if (isKtxBigQueryConnectionConfig(connection)) {
const maxBytesBilled = bigQueryMaxBytesBilled(connection);
return new KtxBigQueryScanConnector({
connectionId,
connection,
...(maxBytesBilled !== undefined ? { maxBytesBilled } : {}),
});
return new KtxBigQueryScanConnector({ connectionId, connection });
}
}
if (driver === 'snowflake') {

View file

@ -252,6 +252,7 @@ describe('setup sources step', () => {
max_knowledge_creates_per_run: 25,
max_knowledge_updates_per_run: 20,
});
expect((await readConfig()).connections['notion-main']?.last_successful_cursor).toBeUndefined();
});
it('accepts former ingest subcommand names as interactive source connection ids', async () => {

View file

@ -501,7 +501,6 @@ function buildNotionConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionC
max_pages_per_run: 1000,
max_knowledge_creates_per_run: DEFAULT_NOTION_MAX_KNOWLEDGE_CREATES_PER_RUN,
max_knowledge_updates_per_run: 20,
last_successful_cursor: null,
};
}

View file

@ -166,18 +166,18 @@ describe('standalone built ktx CLI smoke', () => {
expect(result.stderr).toContain("unknown command 'agent'");
});
it('runs doctor setup through the built binary', async () => {
const env = { ...process.env };
delete env.KTX_PROJECT_DIR;
const result = await runBuiltCli(['status', '--no-input'], { cwd: tempDir, env });
it('runs status setup checks through the built binary', async () => {
const result = await runBuiltCli(['status', '--verbose', '--no-input']);
expect(result.stdout).toMatch(/KTX (setup doctor|project doctor|status)/);
expect(result.stdout).toMatch(/KTX status/);
if (result.stdout.includes('No project here yet.')) {
expect(result.stdout).toContain('ktx setup');
} else {
expect(result.stdout).toContain('Node 22+');
expect(result.stdout).toContain('Workspace-local CLI');
}
expect(result.stdout).toContain('Node 22+');
expect(result.stdout).toContain('Workspace-local CLI');
expect(result.stderr === '' || result.stderr.startsWith('Project: ')).toBe(true);
expect([0, 1]).toContain(result.code);
});

View file

@ -61,6 +61,14 @@ interface WarningItem {
fix?: string;
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === 'object' && value !== null && !Array.isArray(value);
}
function hasOwnField(value: Record<string, unknown>, key: string): boolean {
return Object.prototype.hasOwnProperty.call(value, key);
}
export interface ProjectStatus {
projectName: string;
projectDir: string;
@ -471,6 +479,51 @@ function buildWarnings(
): WarningItem[] {
const warnings: WarningItem[] = [];
for (const [connectionId, connection] of Object.entries(config.connections)) {
const driver = String(connection.driver ?? '').toLowerCase();
if (hasOwnField(connection, 'readonly')) {
warnings.push({
message: `connections.${connectionId}.readonly is no longer used.`,
fix: `Remove connections.${connectionId}.readonly from ktx.yaml.`,
});
}
if ((driver === 'sqlite' || driver === 'sqlite3') && hasOwnField(connection, 'file_path')) {
warnings.push({
message: `connections.${connectionId}.file_path was removed.`,
fix: `Rename connections.${connectionId}.file_path to path.`,
});
}
if (driver === 'notion' && hasOwnField(connection, 'last_successful_cursor')) {
warnings.push({
message: `connections.${connectionId}.last_successful_cursor is local sync state.`,
fix: 'Remove it from ktx.yaml. KTX stores the Notion cursor in .ktx/db.sqlite.',
});
}
const historicSql = isRecord(connection.historicSql) ? connection.historicSql : null;
if (!historicSql) {
continue;
}
if (hasOwnField(historicSql, 'concurrency')) {
warnings.push({
message: `connections.${connectionId}.historicSql.concurrency is no longer used.`,
fix: `Remove connections.${connectionId}.historicSql.concurrency from ktx.yaml.`,
});
}
const historicDialect = String(historicSql.dialect ?? driver).toLowerCase();
if (
(historicDialect === 'postgres' || historicDialect === 'postgresql') &&
hasOwnField(historicSql, 'windowDays')
) {
warnings.push({
message: `connections.${connectionId}.historicSql.windowDays does not constrain pg_stat_statements.`,
fix: `Remove connections.${connectionId}.historicSql.windowDays from ktx.yaml.`,
});
}
}
for (const adapter of config.ingest.adapters) {
const requiredDrivers = ADAPTER_DRIVER_REQUIREMENT[adapter];
if (!requiredDrivers) continue;

View file

@ -267,6 +267,30 @@ describe('KtxBigQueryScanConnector', () => {
);
});
it('applies canonical BigQuery YAML scan limits to query jobs', async () => {
const clientFactory = fakeClientFactory();
const connector = new KtxBigQueryScanConnector({
connectionId: 'warehouse',
connection: { ...connection, max_bytes_billed: '987654321', job_timeout_ms: 30_000 },
clientFactory,
});
await expect(
connector.executeReadOnly(
{ connectionId: 'warehouse', sql: 'select id, status from `project-1`.`analytics`.`orders`', maxRows: 1 },
{ runId: 'scan-run-1' },
),
).resolves.toMatchObject({ rows: [[1, 'paid']], rowCount: 1 });
const client = vi.mocked(clientFactory.createClient).mock.results[0]?.value as KtxBigQueryClient;
expect(client.createQueryJob).toHaveBeenLastCalledWith(
expect.objectContaining({
maximumBytesBilled: '987654321',
jobTimeoutMs: 30_000,
}),
);
});
it('adapts native snapshots to live-database introspection snapshots', async () => {
const introspection = createBigQueryLiveDatabaseIntrospection({
connections: { warehouse: connection },

View file

@ -30,6 +30,8 @@ export interface KtxBigQueryConnectionConfig {
dataset_ids?: string[];
credentials_json?: string;
location?: string;
max_bytes_billed?: number | string;
job_timeout_ms?: number;
[key: string]: unknown;
}
@ -158,6 +160,28 @@ function datasetIds(connection: KtxBigQueryConnectionConfig, env: NodeJS.Process
return datasetId ? [datasetId] : [];
}
function bigQueryMaxBytesBilledFromConnection(
connection: KtxBigQueryConnectionConfig | undefined,
): number | string | undefined {
const value = connection?.max_bytes_billed;
if (typeof value === 'number') {
return Number.isFinite(value) && value > 0 ? value : undefined;
}
if (typeof value === 'string') {
const trimmed = value.trim();
return trimmed.length > 0 ? trimmed : undefined;
}
return undefined;
}
function bigQueryJobTimeoutMsFromConnection(connection: KtxBigQueryConnectionConfig | undefined): number | undefined {
const value = connection?.job_timeout_ms;
if (typeof value !== 'number') {
return undefined;
}
return Number.isInteger(value) && value > 0 ? value : undefined;
}
function tableKind(metadataType: string | undefined): KtxSchemaTable['kind'] {
const type = String(metadataType ?? '').toUpperCase();
if (type === 'VIEW' || type === 'MATERIALIZED_VIEW') {
@ -258,8 +282,8 @@ export class KtxBigQueryScanConnector implements KtxScanConnector {
});
this.clientFactory = options.clientFactory ?? new DefaultBigQueryClientFactory();
this.now = options.now ?? (() => new Date());
this.maxBytesBilled = options.maxBytesBilled;
this.queryTimeoutMs = options.queryTimeoutMs;
this.maxBytesBilled = options.maxBytesBilled ?? bigQueryMaxBytesBilledFromConnection(options.connection);
this.queryTimeoutMs = options.queryTimeoutMs ?? bigQueryJobTimeoutMsFromConnection(options.connection);
this.id = `bigquery:${options.connectionId}`;
}

View file

@ -90,6 +90,13 @@ describe('KtxSqliteScanConnector', () => {
connection: { driver: 'sqlite', path: 'warehouse.db' },
}),
).toBe(dbPath);
expect(() =>
sqliteDatabasePathFromConfig({
connectionId: 'warehouse',
projectDir: tempDir,
connection: { driver: 'sqlite', file_path: 'warehouse.db' },
}),
).toThrow('Native SQLite connector requires connections.warehouse.path or url');
} finally {
if (originalDatabaseUrl === undefined) {
delete process.env.KTX_SQLITE_TEST_URL;

View file

@ -28,7 +28,6 @@ export interface KtxSqliteConnectionConfig {
driver?: string;
path?: string;
url?: string;
file_path?: string;
[key: string]: unknown;
}
@ -146,12 +145,9 @@ export function sqliteDatabasePathFromConfig(input: SqliteDatabasePathInput): st
if (!isKtxSqliteConnectionConfig(input.connection)) {
throw new Error(`Native SQLite connector cannot run driver "${inputDriver}"`);
}
const configuredPath =
stringConfigValue(input.connection, 'path') ??
stringConfigValue(input.connection, 'file_path') ??
sqlitePathFromUrl(stringConfigValue(input.connection, 'url') ?? '');
const configuredPath = stringConfigValue(input.connection, 'path') ?? sqlitePathFromUrl(stringConfigValue(input.connection, 'url') ?? '');
if (!configuredPath) {
throw new Error(`Native SQLite connector requires connections.${input.connectionId}.path, file_path, or url`);
throw new Error(`Native SQLite connector requires connections.${input.connectionId}.path or url`);
}
return isAbsolute(configuredPath) ? configuredPath : resolve(input.projectDir ?? process.cwd(), configuredPath);
}

View file

@ -39,8 +39,8 @@ describe('standalone Notion connection config', () => {
max_pages_per_run: 1000,
max_knowledge_creates_per_run: 25,
max_knowledge_updates_per_run: 20,
last_successful_cursor: null,
});
expect(parsed).not.toHaveProperty('last_successful_cursor');
});
it('parses inline Notion auth tokens without requiring auth_token_ref', () => {
@ -132,7 +132,7 @@ describe('standalone Notion connection config', () => {
maxPagesPerRun: 12,
maxKnowledgeCreatesPerRun: 2,
maxKnowledgeUpdatesPerRun: 7,
lastSuccessfulCursor: '{"phase":"all_accessible_pages","cursor":"cursor-1"}',
lastSuccessfulCursor: null,
});
});

View file

@ -24,7 +24,6 @@ export interface KtxNotionConnectionConfig extends KtxProjectConnectionConfig {
max_pages_per_run: number;
max_knowledge_creates_per_run: number;
max_knowledge_updates_per_run: number;
last_successful_cursor: string | null;
}
export interface RedactedKtxNotionConnectionConfig {
@ -115,7 +114,6 @@ export function parseNotionConnectionConfig(raw: unknown): KtxNotionConnectionCo
}
return {
...input,
driver: 'notion',
auth_token: authToken,
auth_token_ref: authTokenRef,
@ -138,7 +136,6 @@ export function parseNotionConnectionConfig(raw: unknown): KtxNotionConnectionCo
0,
100,
),
last_successful_cursor: optionalString(input.last_successful_cursor),
};
}
@ -206,6 +203,6 @@ export async function notionConnectionToPullConfig(
maxPagesPerRun: config.max_pages_per_run,
maxKnowledgeCreatesPerRun: config.max_knowledge_creates_per_run,
maxKnowledgeUpdatesPerRun: config.max_knowledge_updates_per_run,
lastSuccessfulCursor: config.last_successful_cursor,
lastSuccessfulCursor: null,
});
}

View file

@ -103,7 +103,7 @@ describe('BigQueryHistoricSqlQueryHistoryReader', () => {
for await (const row of reader.fetchAggregated(
client,
{ start: new Date('2026-02-10T00:00:00.000Z'), end: new Date('2026-05-11T00:00:00.000Z') },
{ dialect: 'bigquery', minExecutions: 5, windowDays: 90, concurrency: 12, enabledTables: [], filters: { dropTrivialProbes: true }, redactionPatterns: [], staleArchiveAfterDays: 90 },
{ dialect: 'bigquery', minExecutions: 5, windowDays: 90, enabledTables: [], filters: { dropTrivialProbes: true }, redactionPatterns: [], staleArchiveAfterDays: 90 },
)) {
rows.push(row);
}
@ -136,7 +136,6 @@ describe('BigQueryHistoricSqlQueryHistoryReader', () => {
dialect: 'bigquery',
minExecutions: 5,
windowDays: 90,
concurrency: 12,
enabledTables: [],
filters: { dropTrivialProbes: true },
redactionPatterns: [],

View file

@ -215,7 +215,7 @@ describe('PostgresPgssReader aggregate path', () => {
for await (const row of reader.fetchAggregated(
{ executeQuery },
{ start: new Date('2026-02-10T00:00:00.000Z'), end: new Date('2026-05-11T00:00:00.000Z') },
{ dialect: 'postgres', minExecutions: 5, windowDays: 90, concurrency: 12, enabledTables: [], filters: { dropTrivialProbes: true }, redactionPatterns: [], staleArchiveAfterDays: 90 },
{ dialect: 'postgres', minExecutions: 5, enabledTables: [], filters: { dropTrivialProbes: true }, redactionPatterns: [], staleArchiveAfterDays: 90 },
)) {
rows.push(row);
}

View file

@ -102,7 +102,7 @@ describe('SnowflakeHistoricSqlQueryHistoryReader', () => {
for await (const row of reader.fetchAggregated(
client,
{ start: new Date('2026-02-10T00:00:00.000Z'), end: new Date('2026-05-11T00:00:00.000Z') },
{ dialect: 'snowflake', minExecutions: 5, windowDays: 90, concurrency: 12, enabledTables: [], filters: { dropTrivialProbes: true }, redactionPatterns: [], staleArchiveAfterDays: 90 },
{ dialect: 'snowflake', minExecutions: 5, windowDays: 90, enabledTables: [], filters: { dropTrivialProbes: true }, redactionPatterns: [], staleArchiveAfterDays: 90 },
)) {
rows.push(row);
}
@ -135,7 +135,6 @@ describe('SnowflakeHistoricSqlQueryHistoryReader', () => {
dialect: 'snowflake',
minExecutions: 5,
windowDays: 90,
concurrency: 12,
enabledTables: [],
filters: { dropTrivialProbes: true },
redactionPatterns: [],

View file

@ -148,6 +148,10 @@ function isEnabledTable(table: string, filter: EnabledTableFilter | null): boole
return filter.exact.has(normalized) || filter.uniqueUnqualified.has(unqualifiedTableIdentifier(normalized));
}
function historicSqlWindowDays(config: HistoricSqlUnifiedPullConfig): number {
return 'windowDays' in config ? config.windowDays : 90;
}
function redactTemplateSql(
template: AggregatedTemplate,
redactors: readonly HistoricSqlRedactionPattern[],
@ -279,7 +283,7 @@ export async function stageHistoricSqlAggregatedSnapshot(input: StageHistoricSql
const enabledTableFilter = buildEnabledTableFilter(config.enabledTables);
const redactors = compileHistoricSqlRedactionPatterns(config.redactionPatterns);
const now = input.now ?? new Date();
const windowStart = new Date(now.getTime() - config.windowDays * 24 * 60 * 60 * 1000);
const windowStart = new Date(now.getTime() - historicSqlWindowDays(config) * 24 * 60 * 60 * 1000);
const probe = await input.reader.probe(input.queryClient);
const snapshot: AggregatedTemplate[] = [];
let snapshotRowCount = 0;

View file

@ -12,11 +12,15 @@ describe('historic-sql unified contracts', () => {
expect(historicSqlUnifiedPullConfigSchema.parse({ dialect: 'postgres', minExecutions: 9 })).toMatchObject({
dialect: 'postgres',
minExecutions: 9,
windowDays: 90,
concurrency: 12,
redactionPatterns: [],
staleArchiveAfterDays: 90,
});
expect(historicSqlUnifiedPullConfigSchema.parse({ dialect: 'postgres', minExecutions: 9 })).not.toHaveProperty(
'windowDays',
);
expect(historicSqlUnifiedPullConfigSchema.parse({ dialect: 'postgres', minExecutions: 9 })).not.toHaveProperty(
'concurrency',
);
const parsed = historicSqlUnifiedPullConfigSchema.parse({
dialect: 'postgres',

View file

@ -8,11 +8,8 @@ export type HistoricSqlDialect = z.infer<typeof historicSqlDialectSchema>;
const filterModeSchema = z.enum(['exclude', 'include', 'mark-only']);
export const historicSqlUnifiedPullConfigSchema = z.object({
dialect: historicSqlDialectSchema,
windowDays: z.number().int().positive().default(90),
const historicSqlCommonPullConfigSchema = z.object({
minExecutions: z.number().int().nonnegative().default(5),
concurrency: z.number().int().positive().default(12),
enabledTables: z.array(z.string().min(1)).default([]),
filters: z.object({
serviceAccounts: z.object({
@ -32,6 +29,20 @@ export const historicSqlUnifiedPullConfigSchema = z.object({
staleArchiveAfterDays: z.number().int().positive().default(90),
});
const historicSqlWindowedPullConfigSchema = historicSqlCommonPullConfigSchema.extend({
dialect: z.enum(['snowflake', 'bigquery']),
windowDays: z.number().int().positive().default(90),
});
const historicSqlPostgresPullConfigSchema = historicSqlCommonPullConfigSchema.extend({
dialect: z.literal('postgres'),
});
export const historicSqlUnifiedPullConfigSchema = z.discriminatedUnion('dialect', [
historicSqlWindowedPullConfigSchema,
historicSqlPostgresPullConfigSchema,
]);
export type HistoricSqlUnifiedPullConfig = z.infer<typeof historicSqlUnifiedPullConfigSchema>;
export const aggregatedTemplateSchema = z.object({

View file

@ -0,0 +1,36 @@
import { mkdtemp, rm } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { LocalNotionRuntimeStore } from './local-state-store.js';
describe('LocalNotionRuntimeStore', () => {
let tempDir: string;
let dbPath: string;
beforeEach(async () => {
tempDir = await mkdtemp(join(tmpdir(), 'ktx-notion-state-'));
dbPath = join(tempDir, '.ktx', 'db.sqlite');
});
afterEach(async () => {
await rm(tempDir, { recursive: true, force: true });
});
it('stores Notion cursors in local state and clears them after complete snapshots', async () => {
const cursor = '{"phase":"all_accessible_pages","cursor":"cursor-1"}';
const store = new LocalNotionRuntimeStore({
dbPath,
now: () => new Date('2026-05-13T10:00:00.000Z'),
});
await expect(store.readCursor('notion-main')).resolves.toBeNull();
await store.setCursor('notion-main', cursor);
const reopened = new LocalNotionRuntimeStore({ dbPath });
await expect(reopened.readCursor('notion-main')).resolves.toBe(cursor);
await reopened.setCursor('notion-main', null);
await expect(reopened.readCursor('notion-main')).resolves.toBeNull();
});
});

View file

@ -0,0 +1,60 @@
import { mkdirSync } from 'node:fs';
import { dirname } from 'node:path';
import Database from 'better-sqlite3';
interface LocalNotionRuntimeStoreOptions {
dbPath: string;
now?: () => Date;
}
export class LocalNotionRuntimeStore {
private readonly db: Database.Database;
private readonly now: () => Date;
constructor(options: LocalNotionRuntimeStoreOptions) {
mkdirSync(dirname(options.dbPath), { recursive: true });
this.db = new Database(options.dbPath);
this.db.pragma('journal_mode = WAL');
this.db.pragma('foreign_keys = ON');
this.now = options.now ?? (() => new Date());
this.db.exec(`
CREATE TABLE IF NOT EXISTS local_notion_runtime_config (
notion_connection_id TEXT PRIMARY KEY,
last_successful_cursor TEXT,
updated_at TEXT NOT NULL
);
`);
}
async readCursor(notionConnectionId: string): Promise<string | null> {
const row = this.db
.prepare(
`
SELECT last_successful_cursor
FROM local_notion_runtime_config
WHERE notion_connection_id = ?
`,
)
.get(notionConnectionId) as { last_successful_cursor: string | null } | undefined;
return row?.last_successful_cursor ?? null;
}
async setCursor(notionConnectionId: string, cursor: string | null): Promise<void> {
this.db
.prepare(
`
INSERT INTO local_notion_runtime_config (
notion_connection_id,
last_successful_cursor,
updated_at
)
VALUES (?, ?, ?)
ON CONFLICT(notion_connection_id) DO UPDATE SET
last_successful_cursor = excluded.last_successful_cursor,
updated_at = excluded.updated_at
`,
)
.run(notionConnectionId, cursor, this.now().toISOString());
}
}

View file

@ -314,6 +314,7 @@ export type {
} from './adapters/metricflow/pull-config.js';
export { NOTION_ORG_KNOWLEDGE_WARNING } from './adapters/notion/chunk.js';
export { NOTION_DEFAULT_MAX_KNOWLEDGE_CREATES_PER_RUN } from './adapters/notion/types.js';
export { LocalNotionRuntimeStore } from './adapters/notion/local-state-store.js';
export { NotionSourceAdapter, type NotionSourceAdapterDeps } from './adapters/notion/notion.adapter.js';
export { NotionClient, type NotionApi, type NotionBotInfo } from './adapters/notion/notion-client.js';
export { bucketDistinctUsers, bucketErrorRate, bucketExecutions, bucketP95Runtime, bucketRecency } from './adapters/historic-sql/buckets.js';

View file

@ -1,4 +1,4 @@
import { mkdtemp, rm } from 'node:fs/promises';
import { mkdtemp, rm, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
@ -6,6 +6,7 @@ import { initKtxProject, type KtxLocalProject, loadKtxProject } from '../project
import type { SqlAnalysisPort } from '../sql-analysis/index.js';
import type { HistoricSqlReader } from './adapters/historic-sql/types.js';
import { LocalLookerRuntimeStore } from './adapters/looker/local-runtime-store.js';
import { LocalNotionRuntimeStore } from './adapters/notion/local-state-store.js';
import { createDefaultLocalIngestAdapters, localPullConfigForAdapter } from './local-adapters.js';
describe('local ingest adapters', () => {
@ -192,9 +193,7 @@ describe('local ingest adapters', () => {
await expect(localPullConfigForAdapter(postgresProject, historicSql!, 'warehouse')).resolves.toEqual({
dialect: 'postgres',
windowDays: 90,
minExecutions: 7,
concurrency: 12,
enabledTables: [],
filters: {
serviceAccounts: { patterns: ['^svc_'], mode: 'exclude' },
@ -223,7 +222,6 @@ describe('local ingest adapters', () => {
await expect(localPullConfigForAdapter(project, adapter, 'warehouse')).resolves.toMatchObject({
dialect: 'postgres',
windowDays: 45,
minExecutions: 7,
filters: { dropTrivialProbes: true },
});
@ -241,7 +239,7 @@ describe('local ingest adapters', () => {
await expect(localPullConfigForAdapter(project, adapter, 'warehouse')).resolves.toMatchObject({
dialect: 'postgres',
windowDays: 30,
minExecutions: 5,
});
});
@ -566,6 +564,79 @@ describe('local ingest adapters', () => {
await expect(notion?.listTargetConnectionIds?.('/tmp/staged-notion')).resolves.toEqual(['warehouse']);
});
it('reads Notion cursors from local state instead of ktx.yaml', async () => {
const cursor = '{"phase":"all_accessible_pages","cursor":"cursor-1"}';
const notionProject = projectWithConnections({
notion: {
driver: 'notion',
auth_token: 'secret',
crawl_mode: 'all_accessible',
last_successful_cursor: '{"phase":"all_accessible_pages","cursor":"stale-yaml"}',
},
} as never);
await new LocalNotionRuntimeStore({ dbPath: join(notionProject.projectDir, '.ktx', 'db.sqlite') }).setCursor(
'notion',
cursor,
);
const notion = createDefaultLocalIngestAdapters(notionProject).find((adapter) => adapter.source === 'notion');
await expect(localPullConfigForAdapter(notionProject, notion!, 'notion')).resolves.toMatchObject({
lastSuccessfulCursor: cursor,
});
});
it('persists Notion next cursors to local state after successful pulls', async () => {
const cursor = '{"phase":"all_accessible_data_sources","cursor":"cursor-2"}';
const notionProject = projectWithConnections({
notion: {
driver: 'notion',
auth_token: 'secret',
crawl_mode: 'all_accessible',
},
} as never);
const stagedDir = await mkdtemp(join(tempDir, 'notion-staged-'));
await writeFile(
join(stagedDir, 'manifest.json'),
JSON.stringify({
source: 'notion',
apiVersion: '2026-03-11',
crawlMode: 'all_accessible',
rootPageIds: [],
rootDatabaseIds: [],
rootDataSourceIds: [],
fetchedAt: '2026-05-13T10:00:00.000Z',
pageCount: 1,
databaseCount: 0,
dataSourceCount: 0,
capped: true,
continuedFromCursor: false,
partialSnapshot: true,
maxPagesPerRun: 1,
maxKnowledgeCreatesPerRun: 25,
maxKnowledgeUpdatesPerRun: 20,
nextSuccessfulCursor: cursor,
skipped: [],
warnings: [],
}),
'utf-8',
);
const notion = createDefaultLocalIngestAdapters(notionProject).find((adapter) => adapter.source === 'notion');
await notion?.onPullSucceeded?.({
connectionId: 'notion',
sourceKey: 'notion',
syncId: 'sync-1',
trigger: 'scheduled_pull',
completedAt: new Date('2026-05-13T10:00:00.000Z'),
stagedDir,
});
await expect(
new LocalNotionRuntimeStore({ dbPath: join(notionProject.projectDir, '.ktx', 'db.sqlite') }).readCursor('notion'),
).resolves.toBe(cursor);
});
it('passes primary warehouse connection ids to local LookML and MetricFlow adapters', async () => {
const adapters = createDefaultLocalIngestAdapters(
projectWithConnections({

View file

@ -1,7 +1,7 @@
import { join } from 'node:path';
import { localConnectionToWarehouseDescriptor, notionConnectionToPullConfig, parseNotionConnectionConfig } from '../connections/index.js';
import { resolveKtxConfigReference } from '../core/config-reference.js';
import type { KtxLocalProject } from '../project/index.js';
import { ktxLocalStateDbPath, type KtxLocalProject } from '../project/index.js';
import type { SqlAnalysisPort } from '../sql-analysis/index.js';
import { DbtSourceAdapter } from './adapters/dbt/dbt.adapter.js';
import { FakeSourceAdapter } from './adapters/fake/fake.adapter.js';
@ -37,6 +37,7 @@ import type { MetabaseClientLogger } from './adapters/metabase/client.js';
import type { MetabaseFetchLogger } from './adapters/metabase/fetch.js';
import { MetricflowSourceAdapter } from './adapters/metricflow/metricflow.adapter.js';
import { pullConfigFromMetricflowIntegration } from './adapters/metricflow/pull-config.js';
import { LocalNotionRuntimeStore } from './adapters/notion/local-state-store.js';
import { NotionSourceAdapter } from './adapters/notion/notion.adapter.js';
import type { NotionFetchLogger } from './adapters/notion/fetch.js';
import { seedLocalMappingStateFromKtxYaml } from './local-mapping-reconcile.js';
@ -115,6 +116,9 @@ export function createDefaultLocalIngestAdapters(
}),
new NotionSourceAdapter({
targetConnectionIds: primaryWarehouseConnectionIds(project),
onPullSucceeded: async ({ connectionId, nextSuccessfulCursor }) => {
await localNotionRuntimeStore(project).setCursor(connectionId, nextSuccessfulCursor);
},
...(options.logger ? { logger: options.logger } : {}),
}),
];
@ -152,6 +156,10 @@ function primaryWarehouseConnectionIds(project: KtxLocalProject): string[] {
.sort((left, right) => left.localeCompare(right));
}
function localNotionRuntimeStore(project: KtxLocalProject): LocalNotionRuntimeStore {
return new LocalNotionRuntimeStore({ dbPath: ktxLocalStateDbPath(project) });
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === 'object' && value !== null && !Array.isArray(value);
}
@ -293,7 +301,11 @@ export async function localPullConfigForAdapter(
return localDbtPullConfigFromConnection(connection, options.looker?.env ?? process.env);
}
if (adapter.source === 'notion') {
return notionConnectionToPullConfig(parseNotionConnectionConfig(connection));
const pullConfig = await notionConnectionToPullConfig(parseNotionConnectionConfig(connection));
return {
...pullConfig,
lastSuccessfulCursor: await localNotionRuntimeStore(project).readCursor(connectionId),
};
}
if (adapter.source === 'metricflow') {
const metricflow = connection.metricflow;