diff --git a/apps/cli/src/knowledge/sync_calendar.ts b/apps/cli/src/knowledge/sync_calendar.ts new file mode 100644 index 00000000..98270ea1 --- /dev/null +++ b/apps/cli/src/knowledge/sync_calendar.ts @@ -0,0 +1,196 @@ +import fs from 'fs'; +import path from 'path'; +import { google } from 'googleapis'; +import { authenticate } from '@google-cloud/local-auth'; +import { OAuth2Client } from 'google-auth-library'; + +// Configuration +const CREDENTIALS_PATH = path.join(process.cwd(), 'credentials.json'); +const TOKEN_PATH = path.join(process.cwd(), 'token_calendar.json'); +const SYNC_INTERVAL_MS = 60 * 1000; +const SCOPES = ['https://www.googleapis.com/auth/calendar.readonly']; + +// --- Auth Functions --- + +async function loadSavedCredentialsIfExist(): Promise { + try { + const tokenContent = fs.readFileSync(TOKEN_PATH, 'utf-8'); + const tokenData = JSON.parse(tokenContent); + + const credsContent = fs.readFileSync(CREDENTIALS_PATH, 'utf-8'); + const keys = JSON.parse(credsContent); + const key = keys.installed || keys.web; + + const credentials = { + type: 'authorized_user', + client_id: key.client_id, + client_secret: key.client_secret, + refresh_token: tokenData.refresh_token || tokenData.refreshToken, + access_token: tokenData.token || tokenData.access_token, + expiry_date: tokenData.expiry || tokenData.expiry_date + }; + return google.auth.fromJSON(credentials) as OAuth2Client; + } catch (err) { + // console.error("Error loading saved credentials:", err); // Optional: silence if just not found + return null; + } +} + +async function saveCredentials(client: OAuth2Client) { + const content = fs.readFileSync(CREDENTIALS_PATH, 'utf-8'); + const keys = JSON.parse(content); + const key = keys.installed || keys.web; + const payload = JSON.stringify({ + type: 'authorized_user', + client_id: key.client_id, + client_secret: key.client_secret, + refresh_token: client.credentials.refresh_token, + access_token: client.credentials.access_token, + expiry_date: client.credentials.expiry_date, + }, null, 2); + fs.writeFileSync(TOKEN_PATH, payload); +} + +async function authorize(): Promise { + let client = await loadSavedCredentialsIfExist(); + if (client && client.credentials && client.credentials.expiry_date && client.credentials.expiry_date > Date.now()) { + console.log("Using existing valid token."); + return client; + } + + if (client && client.credentials && (!client.credentials.expiry_date || client.credentials.expiry_date <= Date.now()) && client.credentials.refresh_token) { + console.log("Refreshing expired token..."); + try { + await client.refreshAccessToken(); + await saveCredentials(client); + return client; + } catch (e) { + console.error("Failed to refresh token:", e); + if (fs.existsSync(TOKEN_PATH)) fs.unlinkSync(TOKEN_PATH); + } + } + + console.log("Performing new OAuth authentication..."); + client = await authenticate({ + scopes: SCOPES, + keyfilePath: CREDENTIALS_PATH, + }) as any; + if (client && client.credentials) { + await saveCredentials(client); + } + return client!; +} + +// --- Sync Logic --- + +function cleanUpOldFiles(currentEventIds: Set, syncDir: string) { + if (!fs.existsSync(syncDir)) return; + + const files = fs.readdirSync(syncDir); + for (const filename of files) { + if (!filename.endsWith('.json') || filename === 'sync_state.json') continue; + + const eventId = filename.replace('.json', ''); + if (!currentEventIds.has(eventId)) { + try { + fs.unlinkSync(path.join(syncDir, filename)); + console.log(`Removed old/out-of-window event: ${eventId}`); + } catch (e) { + console.error(`Error deleting file ${filename}:`, e); + } + } + } +} + +async function saveEvent(event: any, syncDir: string): Promise { + const eventId = event.id; + if (!eventId) return false; + + const filePath = path.join(syncDir, `${eventId}.json`); + + try { + fs.writeFileSync(filePath, JSON.stringify(event, null, 2)); + return true; + } catch (e) { + console.error(`Error saving event ${eventId}:`, e); + return false; + } +} + +async function syncCalendarWindow(auth: OAuth2Client, syncDir: string, lookbackDays: number) { + // Calculate window: -lookbackDays to +2 weeks + const now = new Date(); + const lookbackMs = lookbackDays * 24 * 60 * 60 * 1000; + const twoWeeksForwardMs = 14 * 24 * 60 * 60 * 1000; // Remaining constant as per original python script + + const timeMin = new Date(now.getTime() - lookbackMs).toISOString(); + const timeMax = new Date(now.getTime() + twoWeeksForwardMs).toISOString(); + + console.log(`Syncing calendar from ${timeMin} to ${timeMax} (lookback: ${lookbackDays} days)...`); + + const calendar = google.calendar({ version: 'v3', auth }); + + try { + const res = await calendar.events.list({ + calendarId: 'primary', + timeMin: timeMin, + timeMax: timeMax, + singleEvents: true, + orderBy: 'startTime' + }); + + const events = res.data.items || []; + const currentEventIds = new Set(); + + if (events.length === 0) { + console.log("No events found in this window."); + } else { + console.log(`Found ${events.length} events.`); + for (const event of events) { + if (event.id) { + await saveEvent(event, syncDir); + currentEventIds.add(event.id); + } + } + } + + cleanUpOldFiles(currentEventIds, syncDir); + + } catch (error) { + console.error("An error occurred during calendar sync:", error); + } +} + +async function main() { + console.log("Starting Google Calendar Sync (TS)..."); + + const syncDirArg = process.argv[2]; + const lookbackDaysArg = process.argv[3]; + + const SYNC_DIR = syncDirArg || 'synced_calendar_events'; + const LOOKBACK_DAYS = lookbackDaysArg ? parseInt(lookbackDaysArg, 10) : 14; // Default to 14 days + + if (isNaN(LOOKBACK_DAYS) || LOOKBACK_DAYS <= 0) { + console.error("Error: Lookback days must be a positive number."); + process.exit(1); + } + + if (!fs.existsSync(SYNC_DIR)) { + fs.mkdirSync(SYNC_DIR, { recursive: true }); + } + + try { + const auth = await authorize(); + console.log("Authorization successful."); + + while (true) { + await syncCalendarWindow(auth, SYNC_DIR, LOOKBACK_DAYS); + console.log(`Sleeping for ${SYNC_INTERVAL_MS / 1000} seconds...`); + await new Promise(resolve => setTimeout(resolve, SYNC_INTERVAL_MS)); + } + } catch (error) { + console.error("Fatal error in main loop:", error); + } +} + +main().catch(console.error); diff --git a/apps/cli/src/knowledge/sync_gmail.ts b/apps/cli/src/knowledge/sync_gmail.ts new file mode 100644 index 00000000..f9c5d80b --- /dev/null +++ b/apps/cli/src/knowledge/sync_gmail.ts @@ -0,0 +1,372 @@ +import fs from 'fs'; +import path from 'path'; +import { google } from 'googleapis'; +import { authenticate } from '@google-cloud/local-auth'; +import TurndownService from 'turndown'; +import { OAuth2Client } from 'google-auth-library'; + +// Configuration +const DEFAULT_SYNC_DIR = 'synced_emails_ts'; +const CREDENTIALS_PATH = path.join(process.cwd(), 'credentials.json'); +const TOKEN_PATH = path.join(process.cwd(), 'token_api.json'); // Reuse Python's token +const SYNC_INTERVAL_MS = 60 * 1000; +const SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']; + +// Initialize Turndown service for HTML to Markdown +const turndownService = new TurndownService({ + headingStyle: 'atx', + codeBlockStyle: 'fenced' +}); + +// --- Auth Functions --- + +async function loadSavedCredentialsIfExist(): Promise { + try { + const tokenContent = fs.readFileSync(TOKEN_PATH, 'utf-8'); + const tokenData = JSON.parse(tokenContent); + + const credsContent = fs.readFileSync(CREDENTIALS_PATH, 'utf-8'); + const keys = JSON.parse(credsContent); + const key = keys.installed || keys.web; + + // Manually construct credentials for google.auth.fromJSON + const credentials = { + type: 'authorized_user', + client_id: key.client_id, + client_secret: key.client_secret, + refresh_token: tokenData.refresh_token || tokenData.refreshToken, // Handle both cases + access_token: tokenData.token || tokenData.access_token, // Handle both cases + expiry_date: tokenData.expiry || tokenData.expiry_date + }; + return google.auth.fromJSON(credentials) as OAuth2Client; + } catch (err) { + console.error("Error loading saved credentials:", err); + return null; + } +} + +async function saveCredentials(client: OAuth2Client) { + const content = fs.readFileSync(CREDENTIALS_PATH, 'utf-8'); + const keys = JSON.parse(content); + const key = keys.installed || keys.web; + const payload = JSON.stringify({ + type: 'authorized_user', + client_id: key.client_id, + client_secret: key.client_secret, + refresh_token: client.credentials.refresh_token, + access_token: client.credentials.access_token, + expiry_date: client.credentials.expiry_date, + }, null, 2); + fs.writeFileSync(TOKEN_PATH, payload); +} + +async function authorize(): Promise { + let client = await loadSavedCredentialsIfExist(); + if (client && client.credentials && client.credentials.expiry_date && client.credentials.expiry_date > Date.now()) { + console.log("Using existing valid token."); + return client; + } + + if (client && client.credentials && (!client.credentials.expiry_date || client.credentials.expiry_date <= Date.now()) && client.credentials.refresh_token) { + console.log("Refreshing expired token..."); + try { + await client.refreshAccessToken(); + await saveCredentials(client); // Save refreshed token + return client; + } catch (e) { + console.error("Failed to refresh token:", e); + // Fall through to full re-auth if refresh fails + fs.existsSync(TOKEN_PATH) && fs.unlinkSync(TOKEN_PATH); + } + } + + console.log("Performing new OAuth authentication..."); + client = await authenticate({ + scopes: SCOPES, + keyfilePath: CREDENTIALS_PATH, + }) as any; + if (client && client.credentials) { + await saveCredentials(client); + } + return client!; +} + +// --- Helper Functions --- + +function cleanFilename(name: string): string { + return name.replace(/[\\/*?:":<>|]/g, "").substring(0, 100).trim(); +} + +function decodeBase64(data: string): string { + return Buffer.from(data, 'base64').toString('utf-8'); +} + +function getBody(payload: any): string { + let body = ""; + if (payload.parts) { + for (const part of payload.parts) { + if (part.mimeType === 'text/plain' && part.body && part.body.data) { + const text = decodeBase64(part.body.data); + // Strip quoted lines + const cleanLines = text.split('\n').filter((line: string) => !line.trim().startsWith('>')); + body += cleanLines.join('\n'); + } else if (part.mimeType === 'text/html' && part.body && part.body.data) { + const html = decodeBase64(part.body.data); + let md = turndownService.turndown(html); + // Simple quote stripping for MD + const cleanLines = md.split('\n').filter((line: string) => !line.trim().startsWith('>')); + body += cleanLines.join('\n'); + } else if (part.parts) { + body += getBody(part); + } + } + } else if (payload.body && payload.body.data) { + const data = decodeBase64(payload.body.data); + if (payload.mimeType === 'text/html') { + let md = turndownService.turndown(data); + body += md.split('\n').filter((line: string) => !line.trim().startsWith('>')).join('\n'); + } else { + body += data.split('\n').filter((line: string) => !line.trim().startsWith('>')).join('\n'); + } + } + return body; +} + +async function saveAttachment(gmail: any, userId: string, msgId: string, part: any, attachmentsDir: string): Promise { + const filename = part.filename; + const attId = part.body?.attachmentId; + if (!filename || !attId) return null; + + const safeName = `${msgId}_${cleanFilename(filename)}`; + const filePath = path.join(attachmentsDir, safeName); + + if (fs.existsSync(filePath)) return safeName; + + try { + const res = await gmail.users.messages.attachments.get({ + userId, + messageId: msgId, + id: attId + }); + + const data = res.data.data; + if (data) { + fs.writeFileSync(filePath, Buffer.from(data, 'base64')); + console.log(`Saved attachment: ${safeName}`); + return safeName; + } + } catch (e) { + console.error(`Error saving attachment ${filename}:`, e); + } + return null; +} + +// --- Sync Logic --- + +async function processThread(auth: OAuth2Client, threadId: string, syncDir: string, attachmentsDir: string) { + const gmail = google.gmail({ version: 'v1', auth }); + try { + const res = await gmail.users.threads.get({ userId: 'me', id: threadId }); + const thread = res.data; + const messages = thread.messages; + + if (!messages || messages.length === 0) return; + + // Subject from first message + const firstHeader = messages[0].payload?.headers; + const subject = firstHeader?.find(h => h.name === 'Subject')?.value || '(No Subject)'; + + let mdContent = `# ${subject}\n\n`; + mdContent += `**Thread ID:** ${threadId}\n`; + mdContent += `**Message Count:** ${messages.length}\n\n---\n\n`; + + for (const msg of messages) { + const msgId = msg.id!; + const headers = msg.payload?.headers || []; + const from = headers.find(h => h.name === 'From')?.value || 'Unknown'; + const date = headers.find(h => h.name === 'Date')?.value || 'Unknown'; + + mdContent += `### From: ${from}\n`; + mdContent += `**Date:** ${date}\n\n`; + + const body = getBody(msg.payload); + mdContent += `${body}\n\n`; + + // Attachments + const parts: any[] = []; + const traverseParts = (pList: any[]) => { + for (const p of pList) { + parts.push(p); + if (p.parts) traverseParts(p.parts); + } + }; + if (msg.payload?.parts) traverseParts(msg.payload.parts); + + let attachmentsFound = false; + for (const part of parts) { + if (part.filename && part.body?.attachmentId) { + const savedName = await saveAttachment(gmail, 'me', msgId, part, attachmentsDir); + if (savedName) { + if (!attachmentsFound) { + mdContent += "**Attachments:**\n"; + attachmentsFound = true; + } + mdContent += `- [${part.filename}](attachments/${savedName})\n`; + } + } + } + mdContent += "\n---\n\n"; + } + + fs.writeFileSync(path.join(syncDir, `${threadId}.md`), mdContent); + console.log(`Synced Thread: ${subject} (${threadId})`); + + } catch (error) { + console.error(`Error processing thread ${threadId}:`, error); + } +} + +function loadState(stateFile: string): { historyId?: string } { + if (fs.existsSync(stateFile)) { + return JSON.parse(fs.readFileSync(stateFile, 'utf-8')); + } + return {}; +} + +function saveState(historyId: string, stateFile: string) { + fs.writeFileSync(stateFile, JSON.stringify({ + historyId, + last_sync: new Date().toISOString() + }, null, 2)); +} + +async function fullSync(auth: OAuth2Client, syncDir: string, attachmentsDir: string, stateFile: string, lookbackDays: number) { + console.log(`Performing full sync of last ${lookbackDays} days...`); + const gmail = google.gmail({ version: 'v1', auth }); + + const pastDate = new Date(); + pastDate.setDate(pastDate.getDate() - lookbackDays); + const dateQuery = pastDate.toISOString().split('T')[0].replace(/-/g, '/'); + + // Get History ID + const profile = await gmail.users.getProfile({ userId: 'me' }); + const currentHistoryId = profile.data.historyId!; + + let pageToken: string | undefined; + do { + const res: any = await gmail.users.threads.list({ + userId: 'me', + q: `after:${dateQuery}`, + pageToken + }); + + const threads = res.data.threads; + if (threads) { + for (const thread of threads) { + await processThread(auth, thread.id!, syncDir, attachmentsDir); + } + } + pageToken = res.data.nextPageToken; + } while (pageToken); + + saveState(currentHistoryId, stateFile); + console.log("Full sync complete."); +} + +async function partialSync(auth: OAuth2Client, startHistoryId: string, syncDir: string, attachmentsDir: string, stateFile: string, lookbackDays: number) { + console.log(`Checking updates since historyId ${startHistoryId}...`); + const gmail = google.gmail({ version: 'v1', auth }); + + try { + const res = await gmail.users.history.list({ + userId: 'me', + startHistoryId, + historyTypes: ['messageAdded'] + }); + + const changes = res.data.history; + if (!changes || changes.length === 0) { + console.log("No new changes."); + const profile = await gmail.users.getProfile({ userId: 'me' }); + saveState(profile.data.historyId!, stateFile); + return; + } + + console.log(`Found ${changes.length} history records.`); + const threadIds = new Set(); + + for (const record of changes) { + if (record.messagesAdded) { + for (const item of record.messagesAdded) { + if (item.message?.threadId) { + threadIds.add(item.message.threadId); + } + } + } + } + + for (const tid of threadIds) { + await processThread(auth, tid, syncDir, attachmentsDir); + } + + const profile = await gmail.users.getProfile({ userId: 'me' }); + saveState(profile.data.historyId!, stateFile); + + } catch (error: any) { + if (error.response?.status === 404) { + console.log("History ID expired. Falling back to full sync."); + await fullSync(auth, syncDir, attachmentsDir, stateFile, lookbackDays); + } else { + console.error("Error during partial sync:", error); + // If 401, remove token to force re-auth next run + if (error.response?.status === 401 && fs.existsSync(TOKEN_PATH)) { + console.log("401 Unauthorized. Deleting token to force re-authentication."); + fs.unlinkSync(TOKEN_PATH); + } + } + } +} + +async function main() { + console.log("Starting Gmail Sync (TS)..."); + const syncDirArg = process.argv[2]; + const lookbackDaysArg = process.argv[3]; + + const SYNC_DIR = syncDirArg || DEFAULT_SYNC_DIR; + const LOOKBACK_DAYS = lookbackDaysArg ? parseInt(lookbackDaysArg, 10) : 7; // Default to 7 days + + if (isNaN(LOOKBACK_DAYS) || LOOKBACK_DAYS <= 0) { + console.error("Error: Lookback days must be a positive number."); + process.exit(1); + } + + const ATTACHMENTS_DIR = path.join(SYNC_DIR, 'attachments'); + const STATE_FILE = path.join(SYNC_DIR, 'sync_state.json'); + + // Ensure directories exist + if (!fs.existsSync(SYNC_DIR)) fs.mkdirSync(SYNC_DIR, { recursive: true }); + if (!fs.existsSync(ATTACHMENTS_DIR)) fs.mkdirSync(ATTACHMENTS_DIR, { recursive: true }); + + try { + const auth = await authorize(); + console.log("Authorization successful."); + + while (true) { + const state = loadState(STATE_FILE); + if (!state.historyId) { + console.log("No history ID found, starting full sync..."); + await fullSync(auth, SYNC_DIR, ATTACHMENTS_DIR, STATE_FILE, LOOKBACK_DAYS); + } else { + console.log("History ID found, starting partial sync..."); + await partialSync(auth, state.historyId, SYNC_DIR, ATTACHMENTS_DIR, STATE_FILE, LOOKBACK_DAYS); + } + + console.log(`Sleeping for ${SYNC_INTERVAL_MS / 1000} seconds...`); + await new Promise(resolve => setTimeout(resolve, SYNC_INTERVAL_MS)); + } + } catch (error) { + console.error("Fatal error in main loop:", error); + } +} + +main().catch(console.error);