diff --git a/apps/x/packages/core/src/knowledge/granola/sync.ts b/apps/x/packages/core/src/knowledge/granola/sync.ts index fcff60d7..ee873928 100644 --- a/apps/x/packages/core/src/knowledge/granola/sync.ts +++ b/apps/x/packages/core/src/knowledge/granola/sync.ts @@ -5,9 +5,7 @@ import { WorkDir } from '../../config/config.js'; import container from '../../di/container.js'; import { IGranolaConfigRepo } from './repo.js'; import { - GetWorkspacesResponse, - GetDocumentListsResponse, - GetDocumentsBatchResponse, + GetDocumentsResponse, SyncState, Document, } from './types.js'; @@ -19,7 +17,11 @@ const GRANOLA_API_BASE = 'https://api.granola.ai'; const GRANOLA_CONFIG_PATH = path.join(homedir(), 'Library', 'Application Support', 'Granola', 'supabase.json'); const SYNC_DIR = path.join(WorkDir, 'granola_notes'); const STATE_FILE = path.join(SYNC_DIR, 'sync_state.json'); -const SYNC_INTERVAL_MS = 60 * 1000; // Check every minute +const SYNC_INTERVAL_MS = 5 * 60 * 1000; // Check every 5 minutes +const API_DELAY_MS = 1000; // 1 second delay between API calls +const RATE_LIMIT_RETRY_DELAY_MS = 60 * 1000; // Wait 1 minute on rate limit +const MAX_RETRIES = 3; // Maximum retries for rate-limited requests +const MAX_BATCH_SIZE = 10; // Process max 10 documents per folder per sync // --- Token Extraction --- @@ -63,6 +65,52 @@ function extractAccessToken(): string | null { } } +// --- Helper Functions --- + +function sleep(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); +} + +async function callWithRateLimit( + operation: () => Promise, + operationName: string +): Promise { + let retries = 0; + let delay = RATE_LIMIT_RETRY_DELAY_MS; + + while (retries < MAX_RETRIES) { + try { + const result = await operation(); + return result; + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error); + + // Check if it's a rate limit error (429 Too Many Requests) + if (errorMessage.includes('429') || + errorMessage.includes('Too Many Requests') || + errorMessage.includes('too many requests') || + errorMessage.includes('rate limit')) { + + retries++; + console.log(`[Granola] Rate limit hit for ${operationName}. Retry ${retries}/${MAX_RETRIES} in ${delay/1000}s...`); + + if (retries >= MAX_RETRIES) { + console.error(`[Granola] Max retries reached for ${operationName}. Skipping.`); + return null; + } + + await sleep(delay); + delay *= 2; // Exponential backoff + } else { + // Not a rate limit error, throw it + throw error; + } + } + } + + return null; +} + // --- API Client --- function getHeaders(accessToken: string): Record { @@ -78,63 +126,44 @@ async function apiCall( endpoint: string, accessToken: string, body: Record = {} -): Promise { - try { - const response = await fetch(`${GRANOLA_API_BASE}${endpoint}`, { - method: 'POST', - headers: getHeaders(accessToken), - body: JSON.stringify(body), - }); - - if (!response.ok) { - console.error(`[Granola] API error ${response.status}: ${response.statusText}`); - return null; - } - - return await response.json() as T; - } catch (error) { - console.error(`[Granola] API call failed for ${endpoint}:`, error); - return null; - } -} - -async function getWorkspaces(accessToken: string) { - const response = await apiCall('/v1/get-workspaces', accessToken); - if (!response) return null; - - try { - return GetWorkspacesResponse.parse(response); - } catch (error) { - console.error('[Granola] Failed to parse workspaces response:', error); - return null; - } -} - -async function getDocumentLists(accessToken: string) { - const response = await apiCall('/v2/get-document-lists', accessToken); - if (!response) return null; - - try { - return GetDocumentListsResponse.parse(response); - } catch (error) { - console.error('[Granola] Failed to parse document lists response:', error); - return null; - } -} - -async function getDocumentsBatch(accessToken: string, documentIds: string[]) { - if (documentIds.length === 0) return { docs: [] }; - - const response = await apiCall('/v1/get-documents-batch', accessToken, { - document_ids: documentIds, - include_last_viewed_panel: true, +): Promise { + console.log(`[Granola] API call: ${endpoint}`); + const response = await fetch(`${GRANOLA_API_BASE}${endpoint}`, { + method: 'POST', + headers: getHeaders(accessToken), + body: JSON.stringify(body), }); + + if (!response.ok) { + const errorText = await response.text().catch(() => 'no body'); + console.error(`[Granola] API error ${response.status}: ${response.statusText} - ${errorText.slice(0, 200)}`); + // Throw error with status code so rate limit handler can detect 429 + throw new Error(`${response.status}: ${response.statusText}`); + } + + const data = await response.json() as T; + console.log(`[Granola] API success: ${endpoint}`); + return data; +} + +async function getDocuments(accessToken: string, limit: number, offset: number) { + const response = await callWithRateLimit( + () => apiCall('/v2/get-documents', accessToken, { + limit, + offset, + include_last_viewed_panel: true, + }), + 'get-documents' + ); if (!response) return null; - + try { - return GetDocumentsBatchResponse.parse(response); + const parsed = GetDocumentsResponse.parse(response); + console.log(`[Granola] Fetched ${parsed.docs.length} documents (offset: ${offset})`); + return parsed; } catch (error) { - console.error('[Granola] Failed to parse documents batch response:', error); + console.error('[Granola] Failed to parse documents response:', error); + console.error('[Granola] Raw response:', JSON.stringify(response, null, 2).slice(0, 1000)); return null; } } @@ -169,25 +198,102 @@ function ensureDir(dirPath: string): void { } } +interface ProseMirrorNode { + type: string; + attrs?: Record; + content?: ProseMirrorNode[]; + text?: string; +} + +function convertProseMirrorToMarkdown(content: ProseMirrorNode | undefined): string { + if (!content || typeof content !== 'object' || !content.content) { + return ''; + } + + function processNode(node: ProseMirrorNode): string { + if (!node || typeof node !== 'object') { + return ''; + } + + const nodeType = node.type || ''; + const children = node.content || []; + const text = node.text || ''; + + if (nodeType === 'heading') { + const level = (node.attrs?.level as number) || 1; + const headingText = children.map(processNode).join(''); + return `${'#'.repeat(level)} ${headingText}\n\n`; + } + + if (nodeType === 'paragraph') { + const paraText = children.map(processNode).join(''); + return `${paraText}\n\n`; + } + + if (nodeType === 'bulletList') { + const items: string[] = []; + for (const item of children) { + if (item.type === 'listItem') { + const itemContent = (item.content || []).map(processNode).join('').trim(); + items.push(`- ${itemContent}`); + } + } + return items.join('\n') + '\n\n'; + } + + if (nodeType === 'orderedList') { + const items: string[] = []; + let num = 1; + for (const item of children) { + if (item.type === 'listItem') { + const itemContent = (item.content || []).map(processNode).join('').trim(); + items.push(`${num}. ${itemContent}`); + num++; + } + } + return items.join('\n') + '\n\n'; + } + + if (nodeType === 'text') { + return text; + } + + if (nodeType === 'hardBreak') { + return '\n'; + } + + // For other node types, recursively process children + return children.map(processNode).join(''); + } + + return processNode(content); +} + function documentToMarkdown(doc: Document): string { const title = doc.title || 'Untitled'; const createdAt = doc.created_at; const updatedAt = doc.updated_at || doc.created_at; - + let md = `---\n`; md += `granola_id: ${doc.id}\n`; md += `title: "${title.replace(/"/g, '\\"')}"\n`; md += `created_at: ${createdAt}\n`; md += `updated_at: ${updatedAt}\n`; md += `---\n\n`; - - // Use notes_markdown if available, otherwise notes_plain - if (doc.notes_markdown) { + + // Try last_viewed_panel content first (ProseMirror format) + const lastViewedContent = doc.last_viewed_panel?.content; + if (lastViewedContent && typeof lastViewedContent === 'object' && lastViewedContent.type === 'doc') { + md += convertProseMirrorToMarkdown(lastViewedContent as ProseMirrorNode); + } else if (doc.notes && typeof doc.notes === 'object' && doc.notes.type === 'doc') { + // Fall back to notes field (also ProseMirror format) + md += convertProseMirrorToMarkdown(doc.notes as ProseMirrorNode); + } else if (doc.notes_markdown) { md += doc.notes_markdown; } else if (doc.notes_plain) { md += doc.notes_plain; } - + return md; } @@ -217,81 +323,50 @@ async function syncNotes(): Promise { // Load state const state = loadState(); - // Get workspaces - const workspacesResponse = await getWorkspaces(accessToken); - if (!workspacesResponse) { - console.log('[Granola] Failed to fetch workspaces'); - return; - } - - console.log(`[Granola] Found ${workspacesResponse.workspaces.length} workspaces`); - - // Build workspace lookup - const workspaceMap = new Map(); - for (const ws of workspacesResponse.workspaces) { - workspaceMap.set(ws.workspace.workspace_id, { - slug: ws.workspace.slug, - displayName: ws.workspace.display_name, - }); - } - - // Get document lists (folders) - const listsResponse = await getDocumentLists(accessToken); - if (!listsResponse) { - console.log('[Granola] Failed to fetch document lists'); - return; - } - - console.log(`[Granola] Found ${listsResponse.lists.length} folders`); - let newCount = 0; let updatedCount = 0; - - // Process each folder - for (const list of listsResponse.lists) { - const folderName = cleanFilename(list.title); - const folderPath = path.join(SYNC_DIR, folderName); - - // Get document IDs from the list - const docIds = list.documents.map(d => d.id); - - if (docIds.length === 0) { - console.log(`[Granola] Folder "${list.title}" is empty, skipping`); - continue; + let offset = 0; + let hasMore = true; + + // Fetch documents with pagination + while (hasMore) { + // Delay before API call (except first) + if (offset > 0) { + await sleep(API_DELAY_MS); } - - console.log(`[Granola] Processing folder "${list.title}" with ${docIds.length} documents`); - - // Fetch full documents - const docsResponse = await getDocumentsBatch(accessToken, docIds); + + const docsResponse = await getDocuments(accessToken, MAX_BATCH_SIZE, offset); if (!docsResponse) { - console.log(`[Granola] Failed to fetch documents for folder "${list.title}"`); - continue; + console.log('[Granola] Failed to fetch documents'); + break; } - + + if (docsResponse.docs.length === 0) { + console.log('[Granola] No more documents to fetch'); + hasMore = false; + break; + } + // Process each document for (const doc of docsResponse.docs) { const docUpdatedAt = doc.updated_at || doc.created_at; const lastSyncedAt = state.syncedDocs[doc.id]; - + // Check if needs sync (new or updated) const needsSync = !lastSyncedAt || lastSyncedAt !== docUpdatedAt; - + if (!needsSync) { continue; } - - // Ensure folder exists - ensureDir(folderPath); - + // Convert to markdown and save const markdown = documentToMarkdown(doc); const docTitle = doc.title || 'Untitled'; const filename = `${doc.id}_${cleanFilename(docTitle)}.md`; - const filePath = path.join(folderPath, filename); - + const filePath = path.join(SYNC_DIR, filename); + fs.writeFileSync(filePath, markdown); - + if (lastSyncedAt) { console.log(`[Granola] Updated: ${filename}`); updatedCount++; @@ -299,12 +374,20 @@ async function syncNotes(): Promise { console.log(`[Granola] Saved: ${filename}`); newCount++; } - + // Update state state.syncedDocs[doc.id] = docUpdatedAt; } + + // Move to next page + offset += docsResponse.docs.length; + + // Stop if we got fewer docs than requested (last page) + if (docsResponse.docs.length < MAX_BATCH_SIZE) { + hasMore = false; + } } - + // Save state state.lastSyncDate = new Date().toISOString(); saveState(state); @@ -321,18 +404,18 @@ async function syncNotes(): Promise { export async function init(): Promise { console.log('[Granola] Starting Granola Sync...'); - console.log(`[Granola] Will check every ${SYNC_INTERVAL_MS / 1000} seconds.`); + console.log(`[Granola] Will check every ${SYNC_INTERVAL_MS / 60000} minutes.`); console.log(`[Granola] Notes will be saved to: ${SYNC_DIR}`); - + while (true) { try { await syncNotes(); } catch (error) { console.error('[Granola] Error in sync loop:', error); } - + // Sleep before next check - console.log(`[Granola] Sleeping for ${SYNC_INTERVAL_MS / 1000} seconds...`); + console.log(`[Granola] Sleeping for ${SYNC_INTERVAL_MS / 60000} minutes...`); await new Promise(resolve => setTimeout(resolve, SYNC_INTERVAL_MS)); } } diff --git a/apps/x/packages/core/src/knowledge/granola/types.ts b/apps/x/packages/core/src/knowledge/granola/types.ts index aeec884b..f322cb02 100644 --- a/apps/x/packages/core/src/knowledge/granola/types.ts +++ b/apps/x/packages/core/src/knowledge/granola/types.ts @@ -9,32 +9,36 @@ export type GranolaConfig = z.infer; // --- API Schemas --- +// ProseMirror node (recursive structure) +export const ProseMirrorNode: z.ZodType<{ + type: string; + attrs?: Record; + content?: unknown[]; + text?: string; +}> = z.object({ + type: z.string(), + attrs: z.record(z.string(), z.unknown()).optional(), + content: z.array(z.lazy(() => ProseMirrorNode)).optional(), + text: z.string().optional(), +}).passthrough(); + export const Document = z.object({ id: z.string(), created_at: z.string(), - updated_at: z.string().nullable(), - deleted_at: z.string().nullable(), - notes: z.object({ - type: z.string(), - content: z.array(z.object({ - type: z.string(), - attrs: z.object({ - id: z.string(), - }).optional(), - content: z.array(z.object({ - type: z.string(), - text: z.string().optional(), - })).optional(), - })), - }).optional(), - title: z.string().nullable(), - type: z.string(), - user_id: z.string(), - notes_plain: z.string().optional(), - notes_markdown: z.string().optional(), - workspace_id: z.string().nullable(), - public: z.boolean(), -}); + updated_at: z.string().nullable().optional(), + deleted_at: z.string().nullable().optional(), + title: z.string().nullable().optional(), + type: z.string().nullable().optional(), + user_id: z.string().optional(), + workspace_id: z.string().nullable().optional(), + public: z.boolean().optional(), + notes: ProseMirrorNode.optional().nullable(), + notes_plain: z.string().nullable().optional(), + notes_markdown: z.string().nullable().optional(), + last_viewed_panel: z.object({ + content: z.union([ProseMirrorNode, z.string()]).optional().nullable(), + }).passthrough().optional().nullable(), +}).passthrough(); // Allow additional fields export type Document = z.infer; export const GetWorkspacesResponse = z.object({ @@ -76,12 +80,17 @@ export const GetDocumentTranscriptResponse = z.array(z.object({ })); export type GetDocumentTranscriptResponse = z.infer; +// Document reference in a list (may be partial, we only need id) +export const DocumentRef = z.object({ + id: z.string(), +}).passthrough(); // Allow additional fields + export const DocumentListItem = z.object({ id: z.string(), title: z.string(), created_at: z.string(), updated_at: z.string(), - documents: z.array(Document), + documents: z.array(DocumentRef), }); export type DocumentListItem = z.infer;