Integrate Google Calendar sync functionality with Composio, enhancing the connection handling in composio-handler and oauth-handler. Update onboarding modal and connectors-popover to manage connection states and provide user feedback during the sync process. Implement Composio-based event syncing in sync_calendar.ts.

This commit is contained in:
tusharmagar 2026-03-16 21:01:50 +05:30
parent 71e2e43ed1
commit bc54ef2177
7 changed files with 267 additions and 72 deletions

View file

@ -5,13 +5,16 @@ import { OAuth2Client } from 'google-auth-library';
import { NodeHtmlMarkdown } from 'node-html-markdown'
import { WorkDir } from '../config/config.js';
import { GoogleClientFactory } from './google-client-factory.js';
import { serviceLogger } from '../services/service_logger.js';
import { serviceLogger, type ServiceRunContext } from '../services/service_logger.js';
import { limitEventItems } from './limit_event_items.js';
import { executeAction, useComposioForGoogleCalendar } from '../composio/client.js';
import { composioAccountsRepo } from '../composio/repo.js';
// Configuration
const SYNC_DIR = path.join(WorkDir, 'calendar_sync');
const SYNC_INTERVAL_MS = 5 * 60 * 1000; // Check every 5 minutes
const LOOKBACK_DAYS = 14;
const COMPOSIO_LOOKBACK_DAYS = 14;
const REQUIRED_SCOPES = [
'https://www.googleapis.com/auth/calendar.events.readonly',
'https://www.googleapis.com/auth/drive.readonly'
@ -56,7 +59,7 @@ function cleanUpOldFiles(currentEventIds: Set<string>, syncDir: string): string[
const files = fs.readdirSync(syncDir);
const deleted: string[] = [];
for (const filename of files) {
if (filename === 'sync_state.json') continue;
if (filename === 'sync_state.json' || filename === 'composio_state.json') continue;
// We expect files like:
// {eventId}.json
@ -133,10 +136,10 @@ async function processAttachments(drive: drive.Drive, event: cal.Schema$Event, s
const filename = `${eventId}_doc_${safeTitle}.md`;
const filePath = path.join(syncDir, filename);
// Simple cache check: if file exists, skip.
// Simple cache check: if file exists, skip.
// Ideally we check modifiedTime, but that requires an extra API call per file.
// Given the loop interval, we can just check existence to save quota.
// If user updates notes, they might want them re-synced.
// If user updates notes, they might want them re-synced.
// For now, let's just check existence. To be smarter, we'd need a state file or check API.
if (fs.existsSync(filePath)) continue;
@ -343,20 +346,248 @@ async function performSync(syncDir: string, lookbackDays: number) {
}
}
// --- Composio-based Sync ---
interface ComposioCalendarState {
last_sync: string; // ISO string
}
function loadComposioState(stateFile: string): ComposioCalendarState | null {
if (fs.existsSync(stateFile)) {
try {
const data = JSON.parse(fs.readFileSync(stateFile, 'utf-8'));
if (data.last_sync) {
return { last_sync: data.last_sync };
}
} catch (e) {
console.error('[Calendar] Failed to load composio state:', e);
}
}
return null;
}
function saveComposioState(stateFile: string, lastSync: string): void {
fs.writeFileSync(stateFile, JSON.stringify({ last_sync: lastSync }, null, 2));
}
/**
* Save a Composio calendar event as JSON (same format used by Google OAuth path).
* The event data from Composio is already structured similarly to Google Calendar API.
*/
function saveComposioEvent(eventData: Record<string, unknown>, syncDir: string): { changed: boolean; isNew: boolean; title: string } {
const eventId = eventData.id as string | undefined;
if (!eventId) return { changed: false, isNew: false, title: 'Unknown' };
const filePath = path.join(syncDir, `${eventId}.json`);
const content = JSON.stringify(eventData, null, 2);
const exists = fs.existsSync(filePath);
try {
if (exists) {
const existing = fs.readFileSync(filePath, 'utf-8');
if (existing === content) {
return { changed: false, isNew: false, title: (eventData.summary as string) || eventId };
}
}
fs.writeFileSync(filePath, content);
return { changed: true, isNew: !exists, title: (eventData.summary as string) || eventId };
} catch (e) {
console.error(`[Calendar] Error saving event ${eventId}:`, e);
return { changed: false, isNew: false, title: (eventData.summary as string) || eventId };
}
}
async function performSyncComposio() {
const STATE_FILE = path.join(SYNC_DIR, 'composio_state.json');
if (!fs.existsSync(SYNC_DIR)) fs.mkdirSync(SYNC_DIR, { recursive: true });
const account = composioAccountsRepo.getAccount('googlecalendar');
if (!account || account.status !== 'ACTIVE') {
console.log('[Calendar] Google Calendar not connected via Composio. Skipping sync.');
return;
}
const connectedAccountId = account.id;
// Calculate time window: lookback + 14 days forward
const now = new Date();
const lookbackMs = COMPOSIO_LOOKBACK_DAYS * 24 * 60 * 60 * 1000;
const twoWeeksForwardMs = 14 * 24 * 60 * 60 * 1000;
const timeMin = new Date(now.getTime() - lookbackMs).toISOString();
const timeMax = new Date(now.getTime() + twoWeeksForwardMs).toISOString();
console.log(`[Calendar] Syncing via Composio from ${timeMin} to ${timeMax} (lookback: ${COMPOSIO_LOOKBACK_DAYS} days)...`);
let run: ServiceRunContext | null = null;
const ensureRun = async (): Promise<ServiceRunContext> => {
if (!run) {
run = await serviceLogger.startRun({
service: 'calendar',
message: 'Syncing calendar (Composio)',
trigger: 'timer',
});
}
return run;
};
try {
const result = await executeAction(
'GOOGLECALENDAR_FIND_EVENT',
{
connected_account_id: connectedAccountId,
user_id: 'rowboat-user',
version: 'latest',
arguments: {
calendar_id: 'primary',
time_min: timeMin,
time_max: timeMax,
single_events: true,
order_by: 'startTime',
},
}
);
if (!result.successful || !result.data) {
console.error('[Calendar] Failed to list events via Composio:', result.error);
return;
}
const data = result.data as Record<string, unknown>;
// Composio may return events in different structures
let events: Array<Record<string, unknown>> = [];
if (Array.isArray(data.items)) {
events = data.items as Array<Record<string, unknown>>;
} else if (Array.isArray(data.events)) {
events = data.events as Array<Record<string, unknown>>;
} else if (Array.isArray(data)) {
events = data as unknown as Array<Record<string, unknown>>;
}
const currentEventIds = new Set<string>();
let newCount = 0;
let updatedCount = 0;
const changedTitles: string[] = [];
if (events.length === 0) {
console.log('[Calendar] No events found in this window.');
} else {
console.log(`[Calendar] Found ${events.length} events.`);
for (const event of events) {
const eventId = event.id as string | undefined;
if (eventId) {
const saveResult = saveComposioEvent(event, SYNC_DIR);
currentEventIds.add(eventId);
if (saveResult.changed) {
await ensureRun();
changedTitles.push(saveResult.title);
if (saveResult.isNew) {
newCount++;
} else {
updatedCount++;
}
}
}
}
}
// Clean up events no longer in the window
const deletedFiles = cleanUpOldFiles(currentEventIds, SYNC_DIR);
let deletedCount = 0;
if (deletedFiles.length > 0) {
await ensureRun();
deletedCount = deletedFiles.length;
}
// Log results if any changes were detected (run was started by ensureRun)
if (run) {
const r = run as ServiceRunContext;
const totalChanges = newCount + updatedCount + deletedCount;
const limitedTitles = limitEventItems(changedTitles);
await serviceLogger.log({
type: 'changes_identified',
service: r.service,
runId: r.runId,
level: 'info',
message: `Calendar updates: ${totalChanges} change${totalChanges === 1 ? '' : 's'}`,
counts: {
newEvents: newCount,
updatedEvents: updatedCount,
deletedFiles: deletedCount,
},
items: limitedTitles.items,
truncated: limitedTitles.truncated,
});
await serviceLogger.log({
type: 'run_complete',
service: r.service,
runId: r.runId,
level: 'info',
message: `Calendar sync complete: ${totalChanges} change${totalChanges === 1 ? '' : 's'}`,
durationMs: Date.now() - r.startedAt,
outcome: 'ok',
summary: {
newEvents: newCount,
updatedEvents: updatedCount,
deletedFiles: deletedCount,
},
});
}
// Save state
saveComposioState(STATE_FILE, new Date().toISOString());
console.log(`[Calendar] Composio sync completed. ${newCount} new, ${updatedCount} updated, ${deletedCount} deleted.`);
} catch (error) {
console.error('[Calendar] Error during Composio sync:', error);
const errRun = await ensureRun();
await serviceLogger.log({
type: 'error',
service: errRun.service,
runId: errRun.runId,
level: 'error',
message: 'Calendar sync error',
error: error instanceof Error ? error.message : String(error),
});
await serviceLogger.log({
type: 'run_complete',
service: errRun.service,
runId: errRun.runId,
level: 'error',
message: 'Calendar sync failed',
durationMs: Date.now() - errRun.startedAt,
outcome: 'error',
});
}
}
export async function init() {
console.log("Starting Google Calendar & Notes Sync (TS)...");
console.log(`Will sync every ${SYNC_INTERVAL_MS / 1000} seconds.`);
while (true) {
try {
// Check if credentials are available with required scopes
const hasCredentials = await GoogleClientFactory.hasValidCredentials(REQUIRED_SCOPES);
if (!hasCredentials) {
console.log("Google OAuth credentials not available or missing required Calendar/Drive scopes. Sleeping...");
const composioMode = await useComposioForGoogleCalendar();
if (composioMode) {
const isConnected = composioAccountsRepo.isConnected('googlecalendar');
if (!isConnected) {
console.log('[Calendar] Google Calendar not connected via Composio. Sleeping...');
} else {
await performSyncComposio();
}
} else {
// Perform one sync
await performSync(SYNC_DIR, LOOKBACK_DAYS);
// Check if credentials are available with required scopes
const hasCredentials = await GoogleClientFactory.hasValidCredentials(REQUIRED_SCOPES);
if (!hasCredentials) {
console.log("Google OAuth credentials not available or missing required Calendar/Drive scopes. Sleeping...");
} else {
// Perform one sync
await performSync(SYNC_DIR, LOOKBACK_DAYS);
}
}
} catch (error) {
console.error("Error in main loop:", error);

View file

@ -786,13 +786,9 @@ export async function init() {
console.log("Starting Gmail Sync (TS)...");
console.log(`Will sync every ${SYNC_INTERVAL_MS / 1000} seconds.`);
const composioMode = await useComposioForGoogle();
if (composioMode) {
console.log('[Gmail] Using Composio backend for Gmail sync.');
}
while (true) {
try {
const composioMode = await useComposioForGoogle();
if (composioMode) {
const isConnected = composioAccountsRepo.isConnected('gmail');
if (!isConnected) {