Revert "feat: move gmail sync to composio OAuth and remove calendar sync"

This reverts commit d12150f1bf.
This commit is contained in:
Ramnique Singh 2026-02-03 22:54:33 +05:30
parent 9747c55d0e
commit fbdf9cd834
6 changed files with 282 additions and 543 deletions

View file

@ -1,14 +1,15 @@
import fs from 'fs';
import path from 'path';
import { google, gmail_v1 as gmail } from 'googleapis';
import { NodeHtmlMarkdown } from 'node-html-markdown'
import { OAuth2Client } from 'google-auth-library';
import { WorkDir } from '../config/config.js';
import { executeAction } from '../composio/client.js';
import { composioAccountsRepo } from '../composio/repo.js';
import { GoogleClientFactory } from './google-client-factory.js';
// Configuration
const SYNC_DIR = path.join(WorkDir, 'gmail_sync');
const SYNC_INTERVAL_MS = 5 * 60 * 1000; // Check every 5 minutes
const LOOKBACK_DAYS = 7;
const REQUIRED_SCOPE = 'https://www.googleapis.com/auth/gmail.readonly';
const nhm = new NodeHtmlMarkdown();
@ -42,209 +43,241 @@ function cleanFilename(name: string): string {
return name.replace(/[\\/*?:":<>|]/g, "").substring(0, 100).trim();
}
// --- State Management ---
interface SyncState {
last_sync: string; // ISO string — human-readable, source of truth
/**
 * Decode a base64-encoded string into UTF-8 text.
 *
 * @param data - Base64 text to decode.
 * @returns The decoded bytes interpreted as UTF-8.
 */
function decodeBase64(data: string): string {
  const bytes = Buffer.from(data, 'base64');
  return bytes.toString('utf-8');
}
function loadState(stateFile: string): SyncState | null {
if (fs.existsSync(stateFile)) {
try {
const data = JSON.parse(fs.readFileSync(stateFile, 'utf-8'));
if (data.last_sync) {
return { last_sync: data.last_sync };
function getBody(payload: gmail.Schema$MessagePart): string {
let body = "";
if (payload.parts) {
for (const part of payload.parts) {
if (part.mimeType === 'text/plain' && part.body && part.body.data) {
const text = decodeBase64(part.body.data);
// Strip quoted lines
const cleanLines = text.split('\n').filter((line: string) => !line.trim().startsWith('>'));
body += cleanLines.join('\n');
} else if (part.mimeType === 'text/html' && part.body && part.body.data) {
const html = decodeBase64(part.body.data);
const md = nhm.translate(html);
// Simple quote stripping for MD
const cleanLines = md.split('\n').filter((line: string) => !line.trim().startsWith('>'));
body += cleanLines.join('\n');
} else if (part.parts) {
body += getBody(part);
}
} catch (e) {
console.error('[Gmail] Failed to load state:', e);
}
} else if (payload.body && payload.body.data) {
const data = decodeBase64(payload.body.data);
if (payload.mimeType === 'text/html') {
const md = nhm.translate(data);
body += md.split('\n').filter((line: string) => !line.trim().startsWith('>')).join('\n');
} else {
body += data.split('\n').filter((line: string) => !line.trim().startsWith('>')).join('\n');
}
}
return body;
}
/**
 * Download one attachment part of a message into `attachmentsDir`.
 *
 * The file is stored as `<msgId>_<cleaned filename>`. If a file with that name
 * already exists it is not downloaded again.
 *
 * @param gmail - Gmail API client used to fetch the attachment bytes.
 * @param userId - User id passed through to the attachments endpoint.
 * @param msgId - Id of the message that owns the attachment.
 * @param part - Message part; must carry both a filename and an attachment id.
 * @param attachmentsDir - Directory the attachment is written to.
 * @returns The stored file name, or null when the part is not an attachment,
 *          the response carried no data, or the download failed.
 */
async function saveAttachment(gmail: gmail.Gmail, userId: string, msgId: string, part: gmail.Schema$MessagePart, attachmentsDir: string): Promise<string | null> {
  const filename = part.filename;
  const attachmentId = part.body?.attachmentId;
  if (!filename || !attachmentId) {
    return null;
  }

  const storedName = `${msgId}_${cleanFilename(filename)}`;
  const targetPath = path.join(attachmentsDir, storedName);

  // Already on disk from an earlier run: reuse it.
  if (fs.existsSync(targetPath)) {
    return storedName;
  }

  try {
    const response = await gmail.users.messages.attachments.get({
      userId,
      messageId: msgId,
      id: attachmentId,
    });
    const encoded = response.data.data;
    if (!encoded) {
      return null;
    }
    fs.writeFileSync(targetPath, Buffer.from(encoded, 'base64'));
    console.log(`Saved attachment: ${storedName}`);
    return storedName;
  } catch (e) {
    // Best effort: a failed attachment is logged and reported as null.
    console.error(`Error saving attachment ${filename}:`, e);
    return null;
  }
}
/**
 * Persist the sync timestamp to the state file as pretty-printed JSON.
 *
 * @param stateFile - Path of the JSON state file to (over)write.
 * @param lastSync - Timestamp string stored under the `last_sync` key.
 */
function saveState(stateFile: string, lastSync: string): void {
  const serialized = JSON.stringify({ last_sync: lastSync }, null, 2);
  fs.writeFileSync(stateFile, serialized);
}
/**
 * Parse a date string, reporting failure as null rather than an Invalid Date.
 *
 * @param dateStr - Text handed to the `Date` constructor.
 * @returns The parsed Date, or null when the text cannot be parsed.
 */
function tryParseDate(dateStr: string): Date | null {
  const parsed = new Date(dateStr);
  if (Number.isNaN(parsed.getTime())) {
    return null;
  }
  return parsed;
}
/**
 * Convert a date string to whole seconds since the Unix epoch (rounded down).
 *
 * @param isoString - Date text handed to the `Date` constructor.
 * @returns Epoch seconds, or NaN when the text cannot be parsed.
 */
function toEpochSeconds(isoString: string): number {
  const epochMillis = new Date(isoString).getTime();
  const epochSeconds = epochMillis / 1000;
  return Math.floor(epochSeconds);
}
// --- Message Parsing ---
/** Normalized fields of a single message, as returned by parseMessageData. */
interface ParsedMessage {
/** Sender text; parseMessageData falls back to 'Unknown' when none is found. */
from: string;
/** Date as a raw string (not parsed); may be 'Unknown' when none is found. */
date: string;
/** Subject line; parseMessageData falls back to '(No Subject)'. */
subject: string;
/** Message text after HTML-to-markdown conversion and removal of '>' quoted lines. */
body: string;
}
/**
 * Normalize a raw message record into sender, date, subject and body text.
 *
 * Header values are taken from `payload.headers` when present, otherwise from
 * top-level fields of the record (`from`/`sender`, `date`/`internalDate`,
 * `subject`). Header names are compared case-insensitively because mail header
 * field names are not case-sensitive and are not guaranteed to arrive as
 * "From"/"Date"/"Subject".
 *
 * The body comes from the payload if it yields any text, else from the `body`,
 * `snippet` or `text` string fields (first one that is a string). It is then
 * converted from HTML to markdown when it looks like HTML, and lines starting
 * with '>' are dropped.
 *
 * @param messageData - Untyped message record; its shape is not validated
 *                      beyond the checks performed here.
 * @returns The normalized message fields.
 */
function parseMessageData(messageData: Record<string, unknown>): ParsedMessage {
  const payload = messageData.payload && typeof messageData.payload === 'object'
    ? (messageData.payload as Record<string, unknown>)
    : undefined;

  // The record is untyped, so only trust `headers` when it really is an array.
  const rawHeaders = payload?.headers;
  const headers: Array<{ name?: unknown; value?: unknown } | null | undefined> =
    Array.isArray(rawHeaders) ? rawHeaders : [];

  // First header whose name matches (ignoring case) and whose value is a string.
  const headerValue = (wanted: string): string | undefined => {
    const target = wanted.toLowerCase();
    for (const header of headers) {
      if (
        header &&
        typeof header.name === 'string' &&
        header.name.toLowerCase() === target &&
        typeof header.value === 'string'
      ) {
        return header.value;
      }
    }
    return undefined;
  };

  // `||` (not `??`) on purpose: an empty header value should fall through too.
  const from = headerValue('From') || String(messageData.from || messageData.sender || 'Unknown');
  const date = headerValue('Date') || String(messageData.date || messageData.internalDate || 'Unknown');
  const subject = headerValue('Subject') || String(messageData.subject || '(No Subject)');

  let body = '';

  // Try to extract body from payload structure (Gmail API format)
  if (payload) {
    body = extractBodyFromPayload(payload);
  }

  // Fallback: try snippet or body fields
  if (!body) {
    if (typeof messageData.body === 'string') {
      body = messageData.body;
    } else if (typeof messageData.snippet === 'string') {
      body = messageData.snippet;
    } else if (typeof messageData.text === 'string') {
      body = messageData.text;
    }
  }

  // Convert HTML to markdown if body looks like HTML
  if (body && (body.includes('<html') || body.includes('<div') || body.includes('<p'))) {
    body = nhm.translate(body);
  }

  // Strip quoted lines
  if (body) {
    body = body.split('\n').filter((line: string) => !line.trim().startsWith('>')).join('\n');
  }

  return { from, date, subject, body };
}
/**
 * Pull the first usable text body out of a message payload.
 *
 * Parts are walked depth-first in order. The first `text/plain` or `text/html`
 * part with inline data wins; HTML is converted to markdown. If no part yields
 * text, the payload's own inline data is decoded (again converting HTML).
 *
 * The payload is untyped, so every field is checked before use: a `parts`
 * value that is not an array, part entries that are not objects, and inline
 * data that is not a non-empty string are ignored rather than throwing.
 *
 * @param payload - Untyped payload node (may itself be a nested part).
 * @returns The decoded body text, or '' when nothing usable is found.
 */
function extractBodyFromPayload(payload: Record<string, unknown>): string {
  // Inline base64 data of a node, if it has any.
  const inlineData = (node: Record<string, unknown>): string | undefined => {
    const body = node.body;
    if (!body || typeof body !== 'object') {
      return undefined;
    }
    const data = (body as Record<string, unknown>).data;
    return typeof data === 'string' && data !== '' ? data : undefined;
  };

  // Decode inline data, converting HTML to markdown.
  const decode = (data: string, mimeType: unknown): string => {
    const text = Buffer.from(data, 'base64').toString('utf-8');
    return mimeType === 'text/html' ? nhm.translate(text) : text;
  };

  const parts = payload.parts;
  if (Array.isArray(parts)) {
    for (const candidate of parts) {
      if (!candidate || typeof candidate !== 'object') {
        continue;
      }
      const part = candidate as Record<string, unknown>;
      const mimeType = part.mimeType;
      const data = inlineData(part);

      if ((mimeType === 'text/plain' || mimeType === 'text/html') && data) {
        return decode(data, mimeType);
      }

      // Recurse into nested parts
      if (part.parts) {
        const nested = extractBodyFromPayload(part);
        if (nested) {
          return nested;
        }
      }
    }
  }

  // Single-part message
  const rootData = inlineData(payload);
  return rootData ? decode(rootData, payload.mimeType) : '';
}
// --- Sync Logic ---
/**
* Process a thread and write its .md file.
* Returns the newest message date (as ISO string) found in the thread, or null.
*/
async function processThread(connectedAccountId: string, threadId: string, syncDir: string): Promise<string | null> {
let threadResult;
async function processThread(auth: OAuth2Client, threadId: string, syncDir: string, attachmentsDir: string) {
const gmail = google.gmail({ version: 'v1', auth });
try {
threadResult = await executeAction(
'GMAIL_FETCH_MESSAGE_BY_THREAD_ID',
connectedAccountId,
{ thread_id: threadId, user_id: 'me' }
);
} catch (error) {
console.warn(`[Gmail] Skipping thread ${threadId} (fetch failed):`, error instanceof Error ? error.message : error);
return null;
}
const res = await gmail.users.threads.get({ userId: 'me', id: threadId });
const thread = res.data;
const messages = thread.messages;
if (!threadResult.success || !threadResult.data) {
console.error(`[Gmail] Failed to fetch thread ${threadId}:`, threadResult.error);
return null;
}
if (!messages || messages.length === 0) return;
const data = threadResult.data as Record<string, unknown>;
const messages = data.messages as Array<Record<string, unknown>> | undefined;
// Subject from first message
const firstHeader = messages[0].payload?.headers;
const subject = firstHeader?.find(h => h.name === 'Subject')?.value || '(No Subject)';
let newestDate: Date | null = null;
if (!messages || messages.length === 0) {
// Single message response
const parsed = parseMessageData(data);
const mdContent = `# ${parsed.subject}\n\n` +
`**Thread ID:** ${threadId}\n` +
`**Message Count:** 1\n\n---\n\n` +
`### From: ${parsed.from}\n` +
`**Date:** ${parsed.date}\n\n` +
`${parsed.body}\n\n---\n\n`;
fs.writeFileSync(path.join(syncDir, `${cleanFilename(threadId)}.md`), mdContent);
console.log(`[Gmail] Synced Thread: ${parsed.subject} (${threadId})`);
newestDate = tryParseDate(parsed.date);
} else {
// Multi-message thread
const firstParsed = parseMessageData(messages[0]);
let mdContent = `# ${firstParsed.subject}\n\n`;
let mdContent = `# ${subject}\n\n`;
mdContent += `**Thread ID:** ${threadId}\n`;
mdContent += `**Message Count:** ${messages.length}\n\n---\n\n`;
for (const msg of messages) {
const parsed = parseMessageData(msg);
mdContent += `### From: ${parsed.from}\n`;
mdContent += `**Date:** ${parsed.date}\n\n`;
mdContent += `${parsed.body}\n\n`;
mdContent += `---\n\n`;
const msgId = msg.id!;
const headers = msg.payload?.headers || [];
const from = headers.find(h => h.name === 'From')?.value || 'Unknown';
const date = headers.find(h => h.name === 'Date')?.value || 'Unknown';
const msgDate = tryParseDate(parsed.date);
if (msgDate && (!newestDate || msgDate > newestDate)) {
newestDate = msgDate;
mdContent += `### From: ${from}\n`;
mdContent += `**Date:** ${date}\n\n`;
if (msg.payload) {
const body = getBody(msg.payload);
mdContent += `${body}\n\n`;
}
// Attachments
const parts: gmail.Schema$MessagePart[] = [];
const traverseParts = (pList: gmail.Schema$MessagePart[]) => {
for (const p of pList) {
parts.push(p);
if (p.parts) traverseParts(p.parts);
}
};
if (msg.payload?.parts) traverseParts(msg.payload.parts);
let attachmentsFound = false;
for (const part of parts) {
if (part.filename && part.body?.attachmentId) {
const savedName = await saveAttachment(gmail, 'me', msgId, part, attachmentsDir);
if (savedName) {
if (!attachmentsFound) {
mdContent += "**Attachments:**\n";
attachmentsFound = true;
}
mdContent += `- [${part.filename}](attachments/${savedName})\n`;
}
}
}
mdContent += "\n---\n\n";
}
fs.writeFileSync(path.join(syncDir, `${threadId}.md`), mdContent);
console.log(`Synced Thread: ${subject} (${threadId})`);
} catch (error) {
console.error(`Error processing thread ${threadId}:`, error);
}
}
/**
 * Read the persisted sync state.
 *
 * A missing, unreadable, or malformed state file is treated as "no state":
 * the function returns `{}` instead of throwing, so one bad file cannot make
 * every later sync attempt fail. Only a non-empty string `historyId` is
 * accepted.
 *
 * @param stateFile - Path of the JSON state file.
 * @returns `{ historyId }` when the file holds a usable history id, else `{}`.
 */
function loadState(stateFile: string): { historyId?: string } {
  if (!fs.existsSync(stateFile)) {
    return {};
  }
  try {
    const parsed: unknown = JSON.parse(fs.readFileSync(stateFile, 'utf-8'));
    if (parsed !== null && typeof parsed === 'object') {
      const historyId = (parsed as Record<string, unknown>).historyId;
      if (typeof historyId === 'string' && historyId !== '') {
        return { historyId };
      }
    }
  } catch (e) {
    // Corrupt or unreadable state: log it and fall back to an empty state.
    console.error('Failed to load sync state:', e);
  }
  return {};
}
/**
 * Write the given history id plus the current time to the state file.
 *
 * @param historyId - History id stored under the `historyId` key.
 * @param stateFile - Path of the JSON state file to (over)write.
 */
function saveState(historyId: string, stateFile: string) {
  const snapshot = {
    historyId,
    last_sync: new Date().toISOString(),
  };
  const serialized = JSON.stringify(snapshot, null, 2);
  fs.writeFileSync(stateFile, serialized);
}
/**
 * Sync every thread matched by an `after:<date>` query covering the last
 * `lookbackDays` days, then record the mailbox history id.
 *
 * The history id is read from the profile before the threads are listed and
 * saved only after all pages have been processed (presumably so that changes
 * arriving during the sync are still seen by the next incremental sync).
 *
 * @param auth - OAuth client used to build the Gmail API client.
 * @param syncDir - Directory passed to processThread for thread output.
 * @param attachmentsDir - Directory passed to processThread for attachments.
 * @param stateFile - State file that receives the history id on completion.
 * @param lookbackDays - Number of days to look back from now.
 */
async function fullSync(auth: OAuth2Client, syncDir: string, attachmentsDir: string, stateFile: string, lookbackDays: number) {
  console.log(`Performing full sync of last ${lookbackDays} days...`);
  const client = google.gmail({ version: 'v1', auth });

  // Build the cutoff as YYYY/MM/DD from the ISO date portion.
  const cutoff = new Date();
  cutoff.setDate(cutoff.getDate() - lookbackDays);
  const isoDay = cutoff.toISOString().split('T')[0];
  const dateQuery = isoDay.replace(/-/g, '/');

  // Get History ID
  const profile = await client.users.getProfile({ userId: 'me' });
  const currentHistoryId = profile.data.historyId!;

  let pageToken: string | undefined;
  do {
    const page = await client.users.threads.list({
      userId: 'me',
      q: `after:${dateQuery}`,
      pageToken,
    });

    // Threads are processed one at a time, in listing order.
    for (const thread of page.data.threads ?? []) {
      await processThread(auth, thread.id!, syncDir, attachmentsDir);
    }

    pageToken = page.data.nextPageToken ?? undefined;
  } while (pageToken);

  saveState(currentHistoryId, stateFile);
  console.log("Full sync complete.");
}
async function partialSync(auth: OAuth2Client, startHistoryId: string, syncDir: string, attachmentsDir: string, stateFile: string, lookbackDays: number) {
console.log(`Checking updates since historyId ${startHistoryId}...`);
const gmail = google.gmail({ version: 'v1', auth });
try {
const res = await gmail.users.history.list({
userId: 'me',
startHistoryId,
historyTypes: ['messageAdded']
});
const changes = res.data.history;
if (!changes || changes.length === 0) {
console.log("No new changes.");
const profile = await gmail.users.getProfile({ userId: 'me' });
saveState(profile.data.historyId!, stateFile);
return;
}
console.log(`Found ${changes.length} history records.`);
const threadIds = new Set<string>();
for (const record of changes) {
if (record.messagesAdded) {
for (const item of record.messagesAdded) {
if (item.message?.threadId) {
threadIds.add(item.message.threadId);
}
}
}
}
fs.writeFileSync(path.join(syncDir, `${cleanFilename(threadId)}.md`), mdContent);
console.log(`[Gmail] Synced Thread: ${firstParsed.subject} (${threadId})`);
}
for (const tid of threadIds) {
await processThread(auth, tid, syncDir, attachmentsDir);
}
if (!newestDate) return null;
// Add 1 second so the `after:` query (epoch-second granularity) excludes this email next sync
return new Date(newestDate.getTime() + 1000).toISOString();
const profile = await gmail.users.getProfile({ userId: 'me' });
saveState(profile.data.historyId!, stateFile);
} catch (error: unknown) {
const e = error as { response?: { status?: number } };
if (e.response?.status === 404) {
console.log("History ID expired. Falling back to full sync.");
await fullSync(auth, syncDir, attachmentsDir, stateFile, lookbackDays);
} else {
console.error("Error during partial sync:", error);
// If 401, clear tokens to force re-auth next run
if (e.response?.status === 401) {
console.log("401 Unauthorized, clearing cache");
GoogleClientFactory.clearCache();
}
}
}
}
async function performSync() {
const LOOKBACK_DAYS = 30; // Default to 1 month
const ATTACHMENTS_DIR = path.join(SYNC_DIR, 'attachments');
const STATE_FILE = path.join(SYNC_DIR, 'sync_state.json');
@ -252,127 +285,51 @@ async function performSync() {
if (!fs.existsSync(SYNC_DIR)) fs.mkdirSync(SYNC_DIR, { recursive: true });
if (!fs.existsSync(ATTACHMENTS_DIR)) fs.mkdirSync(ATTACHMENTS_DIR, { recursive: true });
const account = composioAccountsRepo.getAccount('gmail');
if (!account || account.status !== 'ACTIVE') {
console.log('[Gmail] Gmail not connected via Composio. Skipping sync.');
return;
}
const connectedAccountId = account.id;
// Determine query timestamp
const state = loadState(STATE_FILE);
let afterEpochSeconds: number;
if (state) {
afterEpochSeconds = toEpochSeconds(state.last_sync);
console.log(`[Gmail] Syncing messages since ${state.last_sync}...`);
} else {
const pastDate = new Date();
pastDate.setDate(pastDate.getDate() - LOOKBACK_DAYS);
afterEpochSeconds = Math.floor(pastDate.getTime() / 1000);
console.log(`[Gmail] First sync - fetching last ${LOOKBACK_DAYS} days...`);
}
try {
// List threads since last sync (lightweight - returns IDs only)
const allThreadIds: string[] = [];
let pageToken: string | undefined;
do {
const params: Record<string, unknown> = {
query: `after:${afterEpochSeconds}`,
max_results: 20,
user_id: 'me',
};
if (pageToken) {
params.page_token = pageToken;
}
const result = await executeAction(
'GMAIL_LIST_THREADS',
connectedAccountId,
params
);
if (!result.success || !result.data) {
console.error('[Gmail] Failed to list threads:', result.error);
return;
}
const data = result.data as Record<string, unknown>;
const threads = data.threads as Array<Record<string, unknown>> | undefined;
if (threads && threads.length > 0) {
for (const thread of threads) {
const threadId = thread.id as string | undefined;
if (threadId) {
allThreadIds.push(threadId);
}
}
}
pageToken = data.nextPageToken as string | undefined;
} while (pageToken);
if (allThreadIds.length === 0) {
console.log('[Gmail] No new threads.');
const auth = await GoogleClientFactory.getClient();
if (!auth) {
console.log("No valid OAuth credentials available.");
return;
}
console.log(`[Gmail] Found ${allThreadIds.length} threads to sync.`);
console.log("Authorization successful. Starting sync...");
// Reverse so we process oldest first. Gmail returns newest first,
// so processing in reverse lets the high-water mark advance
// chronologically — safe to save state after each thread.
allThreadIds.reverse();
// Process each thread, saving state after each one with the
// newest email date seen so far (high-water mark).
let highWaterMark: string | null = state?.last_sync ?? null;
let processedCount = 0;
for (const threadId of allThreadIds) {
try {
const newestInThread = await processThread(connectedAccountId, threadId, SYNC_DIR);
processedCount++;
// Advance high-water mark if this thread has a newer email
if (newestInThread) {
if (!highWaterMark || new Date(newestInThread) > new Date(highWaterMark)) {
highWaterMark = newestInThread;
}
saveState(STATE_FILE, highWaterMark);
}
} catch (error) {
console.error(`[Gmail] Error processing thread ${threadId}, skipping:`, error);
}
const state = loadState(STATE_FILE);
if (!state.historyId) {
console.log("No history ID found, starting full sync...");
await fullSync(auth, SYNC_DIR, ATTACHMENTS_DIR, STATE_FILE, LOOKBACK_DAYS);
} else {
console.log("History ID found, starting partial sync...");
await partialSync(auth, state.historyId, SYNC_DIR, ATTACHMENTS_DIR, STATE_FILE, LOOKBACK_DAYS);
}
console.log(`[Gmail] Sync completed. Processed ${processedCount}/${allThreadIds.length} threads.`);
console.log("Sync completed.");
} catch (error) {
console.error('[Gmail] Error during sync:', error);
console.error("Error during sync:", error);
}
}
export async function init() {
console.log('[Gmail] Starting Gmail Sync (Composio)...');
console.log(`[Gmail] Will sync every ${SYNC_INTERVAL_MS / 1000} seconds.`);
console.log("Starting Gmail Sync (TS)...");
console.log(`Will sync every ${SYNC_INTERVAL_MS / 1000} seconds.`);
while (true) {
try {
const isConnected = composioAccountsRepo.isConnected('gmail');
if (!isConnected) {
console.log('[Gmail] Gmail not connected via Composio. Sleeping...');
// Check if credentials are available with required scopes
const hasCredentials = await GoogleClientFactory.hasValidCredentials(REQUIRED_SCOPE);
if (!hasCredentials) {
console.log("Google OAuth credentials not available or missing required Gmail scope. Sleeping...");
} else {
// Perform one sync
await performSync();
}
} catch (error) {
console.error('[Gmail] Error in main loop:', error);
console.error("Error in main loop:", error);
}
console.log(`[Gmail] Sleeping for ${SYNC_INTERVAL_MS / 1000} seconds...`);
// Sleep for N minutes before next check (can be interrupted by triggerSync)
console.log(`Sleeping for ${SYNC_INTERVAL_MS / 1000} seconds...`);
await interruptibleSleep(SYNC_INTERVAL_MS);
}
}