Improve Notion ingest UX

This commit is contained in:
Andrey Avtomonov 2026-05-12 16:21:05 +02:00
parent d0f650f44a
commit 9a9e40939a
20 changed files with 615 additions and 279 deletions

View file

@ -21,6 +21,7 @@ export {
notionConnectionToPullConfig,
parseNotionConnectionConfig,
redactNotionConnectionConfig,
resolveNotionConnectionAuthToken,
resolveNotionAuthToken,
type KtxNotionConnectionConfig,
type RedactedKtxNotionConnectionConfig,

View file

@ -30,6 +30,7 @@ describe('standalone Notion connection config', () => {
expect(parsed).toEqual({
driver: 'notion',
auth_token: null,
auth_token_ref: 'env:NOTION_TOKEN',
crawl_mode: 'selected_roots',
root_page_ids: ['page-1'],
@ -42,6 +43,23 @@ describe('standalone Notion connection config', () => {
});
});
it('parses inline Notion auth tokens without requiring auth_token_ref', () => {
const parsed = parseNotionConnectionConfig({
driver: 'notion',
auth_token: ' ntn_inline_token ',
crawl_mode: 'selected_roots',
root_page_ids: ['page-1'],
});
expect(parsed).toMatchObject({
driver: 'notion',
auth_token: 'ntn_inline_token',
auth_token_ref: null,
crawl_mode: 'selected_roots',
root_page_ids: ['page-1'],
});
});
it('redacts token references from display output', () => {
expect(
redactNotionConnectionConfig(
@ -117,4 +135,23 @@ describe('standalone Notion connection config', () => {
lastSuccessfulCursor: '{"phase":"all_accessible_pages","cursor":"cursor-1"}',
});
});
it('uses inline Notion auth_token when building adapter pull config', async () => {
const pullConfig = await notionConnectionToPullConfig(
parseNotionConnectionConfig({
driver: 'notion',
auth_token: 'ntn_inline_token',
auth_token_ref: 'env:STALE_NOTION_TOKEN',
crawl_mode: 'all_accessible',
}),
{
env: {},
readTextFile: async () => {
throw new Error('readTextFile should not be called for inline auth_token');
},
},
);
expect(pullConfig.authToken).toBe('ntn_inline_token');
});
});

View file

@ -15,7 +15,8 @@ type KtxNotionCrawlMode = 'all_accessible' | 'selected_roots';
export interface KtxNotionConnectionConfig extends KtxProjectConnectionConfig {
driver: 'notion';
auth_token_ref: string;
auth_token: string | null;
auth_token_ref: string | null;
crawl_mode: KtxNotionCrawlMode;
root_page_ids: string[];
root_database_ids: string[];
@ -93,11 +94,12 @@ export function parseNotionConnectionConfig(raw: unknown): KtxNotionConnectionCo
if (input.driver !== 'notion') {
throw new Error('Notion connection config requires driver: notion');
}
const authTokenRef = stringValue(input.auth_token_ref, '');
if (!authTokenRef) {
throw new Error('Notion connection config requires auth_token_ref');
const authToken = optionalString(input.auth_token);
const authTokenRef = optionalString(input.auth_token_ref);
if (!authToken && !authTokenRef) {
throw new Error('Notion connection config requires auth_token or auth_token_ref');
}
if (!authTokenRef.startsWith('env:') && !authTokenRef.startsWith('file:')) {
if (authTokenRef && !authTokenRef.startsWith('env:') && !authTokenRef.startsWith('file:')) {
throw new Error('Notion auth_token_ref must use env:NAME or file:/path');
}
@ -115,6 +117,7 @@ export function parseNotionConnectionConfig(raw: unknown): KtxNotionConnectionCo
return {
...input,
driver: 'notion',
auth_token: authToken,
auth_token_ref: authTokenRef,
crawl_mode: crawlMode,
root_page_ids: rootPageIds,
@ -142,7 +145,7 @@ export function parseNotionConnectionConfig(raw: unknown): KtxNotionConnectionCo
export function redactNotionConnectionConfig(config: KtxNotionConnectionConfig): RedactedKtxNotionConnectionConfig {
return {
driver: 'notion',
hasAuthToken: Boolean(config.auth_token_ref),
hasAuthToken: Boolean(config.auth_token ?? config.auth_token_ref),
crawlMode: config.crawl_mode,
rootPageIds: config.root_page_ids,
rootDatabaseIds: config.root_database_ids,
@ -182,12 +185,20 @@ export async function resolveNotionAuthToken(
throw new Error('Notion auth_token_ref must use env:NAME or file:/path');
}
export async function resolveNotionConnectionAuthToken(
config: Pick<KtxNotionConnectionConfig, 'auth_token' | 'auth_token_ref'>,
options: ResolveNotionTokenOptions = {},
): Promise<string> {
return config.auth_token ?? (await resolveNotionAuthToken(config.auth_token_ref ?? '', options));
}
export async function notionConnectionToPullConfig(
config: KtxNotionConnectionConfig,
options: ResolveNotionTokenOptions = {},
): Promise<NotionPullConfig> {
const authToken = await resolveNotionConnectionAuthToken(config, options);
return notionPullConfigSchema.parse({
authToken: await resolveNotionAuthToken(config.auth_token_ref, options),
authToken,
crawlMode: config.crawl_mode,
rootPageIds: config.root_page_ids,
rootDatabaseIds: config.root_database_ids,

View file

@ -253,6 +253,43 @@ describe('NotionSourceAdapter', () => {
expect(result.contextReport).toEqual({ capped: false, warnings: [NOTION_ORG_KNOWLEDGE_WARNING] });
});
it('chunks retried pages when failed provenance makes unchanged raw files look added again', async () => {
await writeFile(
join(stagedDir, 'manifest.json'),
JSON.stringify({
source: 'notion',
apiVersion: '2026-03-11',
crawlMode: 'selected_roots',
rootPageIds: ['page-1'],
rootDatabaseIds: [],
rootDataSourceIds: [],
fetchedAt: '2026-04-28T00:00:00.000Z',
pageCount: 1,
databaseCount: 0,
dataSourceCount: 0,
capped: false,
continuedFromCursor: false,
partialSnapshot: false,
maxPagesPerRun: 100,
maxKnowledgeCreatesPerRun: 25,
maxKnowledgeUpdatesPerRun: 20,
skipped: [],
warnings: [],
}),
'utf-8',
);
await writePage('page-1', 'Retry Me');
const result = await adapter.chunk(stagedDir, {
added: ['pages/page-1/metadata.json', 'pages/page-1/page.md'],
modified: [],
deleted: [],
unchanged: ['manifest.json', 'pages/page-1/blocks.json'],
});
expect(result.workUnits.map((workUnit) => workUnit.unitKey)).toEqual(['notion-page-page-1']);
});
it('reports malformed manifests with a Notion-specific error', async () => {
await writeFile(join(stagedDir, 'manifest.json'), '{bad json', 'utf-8');

View file

@ -0,0 +1,28 @@
interface MemoryFlowErrorContext {
adapter: string;
}
export function isNotionAuthorizationExpired(
context: MemoryFlowErrorContext,
reason: string | undefined,
): boolean {
if (context.adapter !== 'notion') {
return false;
}
const normalized = (reason ?? '').toLowerCase();
return (
normalized.includes('invalid_grant') &&
(normalized.includes('invalid_rapt') || normalized.includes('reauth'))
);
}
export function formatNotionAuthorizationExpiredDetail(unitKey: string): string {
return `${unitKey} could not read Notion because the saved OAuth grant expired or requires reauthentication (invalid_grant / invalid_rapt).`;
}
export function notionAuthorizationFixSuggestions(connectionId: string): string[] {
return [
`Refresh the Notion token referenced by auth_token_ref for ${connectionId}. If it uses env:NAME, export a fresh token in that variable; if it uses file:/path, replace that file.`,
`Run ktx connection notion pick ${connectionId} to confirm Notion access, then rerun ktx ingest ${connectionId}.`,
];
}

View file

@ -60,6 +60,36 @@ describe('formatMemoryFlowFinalSummary', () => {
).toContain('Trust issues: 3');
});
it('explains expired Notion authorization with fix suggestions', () => {
const rawReason =
'notion-cluster-1 failed: {"error":"invalid_grant","error_description":"reauth related error (invalid_rapt)","error_uri":"https://accounts.example/reauth"}';
const summary = formatMemoryFlowFinalSummary(
input({
connectionId: 'notion-main',
adapter: 'notion',
status: 'error',
events: [
{ type: 'source_acquired', adapter: 'notion', trigger: 'manual_resync', fileCount: 37 },
{ type: 'chunks_planned', chunkCount: 2, workUnitCount: 2, evictionCount: 0 },
{ type: 'work_unit_finished', unitKey: 'notion-cluster-1', status: 'failed', reason: rawReason },
],
}),
);
expect(summary).toContain('Memory-flow summary: error');
expect(summary).toContain(
'Notion authorization expired: notion-cluster-1 could not read Notion because the saved OAuth grant expired or requires reauthentication (invalid_grant / invalid_rapt).',
);
expect(summary).toContain('Fix suggestions:');
expect(summary).toContain(
'- Refresh the Notion token referenced by auth_token_ref for notion-main. If it uses env:NAME, export a fresh token in that variable; if it uses file:/path, replace that file.',
);
expect(summary).toContain(
'- Run ktx connection notion pick notion-main to confirm Notion access, then rerun ktx ingest notion-main.',
);
expect(summary).not.toContain('error_uri');
});
it('labels replay source metadata in final summaries', () => {
const summary = formatMemoryFlowFinalSummary({
metadata: {

View file

@ -1,6 +1,7 @@
import { sanitizeMemoryFlowError } from './live-buffer.js';
import type { MemoryFlowEvent, MemoryFlowReplayInput } from './types.js';
import { buildMemoryFlowViewModel } from './view-model.js';
import { isNotionAuthorizationExpired, notionAuthorizationFixSuggestions } from './known-errors.js';
function latest<T extends MemoryFlowEvent['type']>(
events: MemoryFlowEvent[],
@ -42,6 +43,14 @@ function humanizeSummaryText(value: string): string {
.replace(/\bSL\b/g, 'semantic layer');
}
function fixSuggestions(input: MemoryFlowReplayInput): string[] {
const workUnitReasons = eventsOf(input.events, 'work_unit_finished').map((event) => event.reason);
const hasNotionAuthFailure = [...workUnitReasons, ...input.errors].some((reason) =>
isNotionAuthorizationExpired(input, reason),
);
return hasNotionAuthFailure ? notionAuthorizationFixSuggestions(input.connectionId) : [];
}
export function formatMemoryFlowFinalSummary(input: MemoryFlowReplayInput): string {
const sources = eventsOf(input.events, 'source_acquired');
const source = sources.at(-1);
@ -84,6 +93,14 @@ export function formatMemoryFlowFinalSummary(input: MemoryFlowReplayInput): stri
}
}
const suggestions = fixSuggestions(input);
if (suggestions.length > 0) {
lines.push('Fix suggestions:');
for (const suggestion of suggestions) {
lines.push(`- ${suggestion}`);
}
}
for (const error of input.errors.slice(0, 3)) {
lines.push(`Error: ${sanitizeMemoryFlowError(error)}`);
}

View file

@ -9,6 +9,7 @@ import type {
MemoryFlowViewModel,
} from './types.js';
import { sanitizeMemoryFlowError } from './live-buffer.js';
import { formatNotionAuthorizationExpiredDetail, isNotionAuthorizationExpired } from './known-errors.js';
function latest<T extends MemoryFlowEvent['type']>(
events: MemoryFlowEvent[],
@ -109,7 +110,7 @@ function errorDetails(input: MemoryFlowReplayInput): string[] {
}
function isValidationFailure(reason: string | undefined): boolean {
return /semantic-layer|validation|invalid/i.test(reason ?? '');
return /semantic-layer|validation/i.test(reason ?? '');
}
function failedWorkUnitDetails(failed: Array<Extract<MemoryFlowEvent, { type: 'work_unit_finished' }>>): string[] {
@ -180,11 +181,14 @@ function buildMemoryFlowTrustIssues(input: MemoryFlowReplayInput): MemoryFlowTru
for (const event of failed) {
const reason = sanitizeMemoryFlowError(event.reason ?? 'failed');
const knownNotionAuthFailure = isNotionAuthorizationExpired(input, event.reason);
issues.push({
id: `work-unit-failed:${event.unitKey}`,
severity: 'failed',
title: 'WorkUnit failed',
detail: `${event.unitKey} failed: ${reason}`,
title: knownNotionAuthFailure ? 'Notion authorization expired' : 'WorkUnit failed',
detail: knownNotionAuthFailure
? formatNotionAuthorizationExpiredDetail(event.unitKey)
: `${event.unitKey} failed: ${reason}`,
columnId: 'workUnits',
targetLabel: event.unitKey,
});

View file

@ -66,6 +66,27 @@ function reportBody(syncId: string, supersededBy: string | null = null): IngestR
};
}
function emptyReportBody(syncId: string, overrides: Partial<IngestReportBody> = {}): IngestReportBody {
return {
syncId,
diffSummary: diffSummary({ added: 0, modified: 0, deleted: 0, unchanged: 1 }),
commitSha: null,
workUnits: [],
failedWorkUnits: [],
reconciliationSkipped: true,
conflictsResolved: [],
evictionsApplied: [],
unmappedFallbacks: [],
evictionInputs: [],
unresolvedCards: [],
supersededBy: null,
overrideOf: null,
provenanceRows: [],
toolTranscripts: [],
...overrides,
};
}
describe('SqliteBundleIngestStore', () => {
let tempDir: string;
let dbPath: string;
@ -226,6 +247,204 @@ describe('SqliteBundleIngestStore', () => {
);
});
it('does not baseline skipped provenance from failed work units or zero-work retry runs', async () => {
const store = new SqliteBundleIngestStore({ dbPath });
const rawHashes = new Map([
['pages/page-1/metadata.json', 'hash-metadata'],
['pages/page-1/page.md', 'hash-page'],
]);
const failedRun = await store.create(runArgs({ jobId: 'job-failed-review', syncId: 'sync-failed-review' }));
await store.insertMany(
[...rawHashes].map(([rawPath, rawContentHash]) => ({
connectionId: 'docs',
sourceKey: 'notion',
syncId: 'sync-failed-review',
rawPath,
rawContentHash,
artifactKind: null,
artifactKey: null,
artifactContentHash: null,
actionType: 'skipped' as const,
})),
);
await store.markCompleted(failedRun.id, diffSummary({ added: 2 }));
await store.create({
runId: failedRun.id,
jobId: 'job-failed-review',
connectionId: 'docs',
sourceKey: 'notion',
body: emptyReportBody('sync-failed-review', {
workUnits: [
{
unitKey: 'notion-page-page-1',
rawFiles: [...rawHashes.keys()],
status: 'failed',
reason: 'invalid_grant',
actions: [],
touchedSlSources: [],
},
],
failedWorkUnits: ['notion-page-page-1'],
}),
});
const noWorkRun = await store.create(runArgs({ jobId: 'job-no-work', syncId: 'sync-no-work' }));
await store.insertMany(
[...rawHashes].map(([rawPath, rawContentHash]) => ({
connectionId: 'docs',
sourceKey: 'notion',
syncId: 'sync-no-work',
rawPath,
rawContentHash,
artifactKind: null,
artifactKey: null,
artifactContentHash: null,
actionType: 'skipped' as const,
})),
);
await store.markCompleted(noWorkRun.id, diffSummary({ unchanged: 2 }));
await store.create({
runId: noWorkRun.id,
jobId: 'job-no-work',
connectionId: 'docs',
sourceKey: 'notion',
body: emptyReportBody('sync-no-work', { workUnits: [], failedWorkUnits: [] }),
});
await expect(store.findLatestHashesForCompletedSyncs('docs', 'notion')).resolves.toEqual(new Map());
await expect(new DiffSetService(store).compute('docs', 'notion', rawHashes)).resolves.toEqual({
added: ['pages/page-1/metadata.json', 'pages/page-1/page.md'],
modified: [],
deleted: [],
unchanged: [],
});
});
it('baselines skipped provenance from successful no-output work unit runs', async () => {
const store = new SqliteBundleIngestStore({ dbPath });
const run = await store.create(runArgs({ jobId: 'job-reviewed-no-output', syncId: 'sync-reviewed-no-output' }));
await store.insertMany([
{
connectionId: 'docs',
sourceKey: 'notion',
syncId: 'sync-reviewed-no-output',
rawPath: 'pages/page-1/page.md',
rawContentHash: 'hash-reviewed',
artifactKind: null,
artifactKey: null,
artifactContentHash: null,
actionType: 'skipped',
},
]);
await store.markCompleted(run.id, diffSummary({ added: 1 }));
await store.create({
runId: run.id,
jobId: 'job-reviewed-no-output',
connectionId: 'docs',
sourceKey: 'notion',
body: emptyReportBody('sync-reviewed-no-output', {
workUnits: [
{
unitKey: 'notion-page-page-1',
rawFiles: ['pages/page-1/page.md'],
status: 'success',
actions: [],
touchedSlSources: [],
},
],
failedWorkUnits: [],
}),
});
await expect(store.findLatestHashesForCompletedSyncs('docs', 'notion')).resolves.toEqual(
new Map([['pages/page-1/page.md', 'hash-reviewed']]),
);
await expect(
new DiffSetService(store).compute('docs', 'notion', new Map([['pages/page-1/page.md', 'hash-reviewed']])),
).resolves.toMatchObject({
added: [],
unchanged: ['pages/page-1/page.md'],
});
});
it('baselines artifact provenance in partial failures but not skipped-only failed paths', async () => {
const store = new SqliteBundleIngestStore({ dbPath });
const run = await store.create(runArgs({ jobId: 'job-partial', syncId: 'sync-partial' }));
await store.insertMany([
{
connectionId: 'docs',
sourceKey: 'notion',
syncId: 'sync-partial',
rawPath: 'pages/success/page.md',
rawContentHash: 'hash-success',
artifactKind: 'wiki',
artifactKey: 'knowledge/notion/success.md',
artifactContentHash: 'artifact-success',
actionType: 'wiki_written',
},
{
connectionId: 'docs',
sourceKey: 'notion',
syncId: 'sync-partial',
rawPath: 'pages/failed/page.md',
rawContentHash: 'hash-failed',
artifactKind: null,
artifactKey: null,
artifactContentHash: null,
actionType: 'skipped',
},
]);
await store.markCompleted(run.id, diffSummary({ added: 2 }));
await store.create({
runId: run.id,
jobId: 'job-partial',
connectionId: 'docs',
sourceKey: 'notion',
body: emptyReportBody('sync-partial', {
workUnits: [
{
unitKey: 'notion-page-success',
rawFiles: ['pages/success/page.md'],
status: 'success',
actions: [],
touchedSlSources: [],
},
{
unitKey: 'notion-page-failed',
rawFiles: ['pages/failed/page.md'],
status: 'failed',
reason: 'invalid_grant',
actions: [],
touchedSlSources: [],
},
],
failedWorkUnits: ['notion-page-failed'],
}),
});
await expect(store.findLatestHashesForCompletedSyncs('docs', 'notion')).resolves.toEqual(
new Map([['pages/success/page.md', 'hash-success']]),
);
await expect(
new DiffSetService(store).compute(
'docs',
'notion',
new Map([
['pages/success/page.md', 'hash-success'],
['pages/failed/page.md', 'hash-failed'],
]),
),
).resolves.toEqual({
added: ['pages/failed/page.md'],
modified: [],
deleted: [],
unchanged: ['pages/success/page.md'],
});
});
it('returns the latest stored report across bundle ingest runs', async () => {
const store = new SqliteBundleIngestStore({
dbPath,

View file

@ -46,6 +46,13 @@ interface ProvenanceRow {
action_type: string;
}
interface ProvenanceHashCandidateRow {
raw_path: string;
raw_content_hash: string;
action_type: string;
report_body_json: string | null;
}
function parseArtifactKind(kind: string | null): IngestProvenanceRow['artifact_kind'] {
if (kind === null || kind === 'sl' || kind === 'wiki') {
return kind;
@ -93,6 +100,31 @@ function toPortProvenanceRow(row: ProvenanceRow): IngestProvenanceRow {
};
}
function recordValue(value: unknown, key: string): unknown {
return typeof value === 'object' && value !== null && !Array.isArray(value)
? (value as Record<string, unknown>)[key]
: undefined;
}
function isSuccessfulNoOutputSkippedBaseline(reportBodyJson: string | null): boolean {
if (reportBodyJson === null) {
return true;
}
const body = JSON.parse(reportBodyJson) as unknown;
const workUnits = recordValue(body, 'workUnits');
const failedWorkUnits = recordValue(body, 'failedWorkUnits');
return (
Array.isArray(workUnits) &&
workUnits.length > 0 &&
Array.isArray(failedWorkUnits) &&
failedWorkUnits.length === 0
);
}
function isProcessedHashBaseline(row: ProvenanceHashCandidateRow): boolean {
return row.action_type !== 'skipped' || isSuccessfulNoOutputSkippedBaseline(row.report_body_json);
}
function placeholders(values: readonly unknown[]): string {
return values.map(() => '?').join(', ');
}
@ -275,23 +307,34 @@ export class SqliteBundleIngestStore
const rows = this.db
.prepare(
`
SELECT p.raw_path, p.raw_content_hash
SELECT
p.raw_path,
p.raw_content_hash,
p.action_type,
br.body_json AS report_body_json
FROM bundle_ingest_provenance p
INNER JOIN bundle_ingest_runs r
ON r.connection_id = p.connection_id
AND r.source_key = p.source_key
AND r.sync_id = p.sync_id
LEFT JOIN bundle_ingest_reports br
ON br.run_id = r.id
WHERE p.connection_id = ?
AND p.source_key = ?
AND r.status = 'completed'
ORDER BY r.completed_at DESC, r.rowid DESC, p.created_at DESC, p.rowid DESC
`,
)
.all(connectionId, sourceKey) as Array<{ raw_path: string; raw_content_hash: string }>;
.all(connectionId, sourceKey) as ProvenanceHashCandidateRow[];
const latest = new Map<string, string>();
const seen = new Set<string>();
for (const row of rows) {
if (!latest.has(row.raw_path)) {
if (seen.has(row.raw_path)) {
continue;
}
seen.add(row.raw_path);
if (isProcessedHashBaseline(row)) {
latest.set(row.raw_path, row.raw_content_hash);
}
}