From d4850dace70171afaf2b47389216b9aa919db23e Mon Sep 17 00:00:00 2001 From: Ramnique Singh <30795890+ramnique@users.noreply.github.com> Date: Tue, 5 May 2026 14:28:46 +0530 Subject: [PATCH] feat: native google sign-in for signed-in users MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-in users can now connect Gmail and Calendar directly through Rowboat instead of going through Composio. Cleaner connection, no third-party in the data path. How it works: - Click "Connect Google" anywhere it appears (sidebar, onboarding, settings) and the system browser opens to a Rowboat-hosted page. Authorize Google there and the app picks up the connection automatically — no client id or secret to paste. - Token refresh happens through Rowboat's backend, so Google credentials never need to live on the user's machine. - Disconnect cleanly revokes access on Google's side too. Migration for existing Composio users: - A one-time modal explains that we've moved off Composio and asks the user to reconnect Google directly. - Their old Composio Gmail / Calendar connections are disconnected automatically when the modal first appears. - All previously-synced emails and calendar events are preserved on disk — the new connection picks up where Composio left off rather than re-downloading the last week from scratch. - "I'll do this later" dismisses the modal permanently; the user can still reconnect anytime via the connectors UI. (Sync stops in the meantime; nothing is deleted.) Other coverage: - BYOK mode (users who paste their own Google client id + secret) is unchanged — same modal, same local OAuth flow, same behavior. - Composio integrations for non-Google services (Slack, Linear, etc.) are unaffected. Only the Gmail and Calendar paths moved. - The "Connect Google" button label and connection state now apply uniformly to Gmail + Calendar (one OAuth grant covers both). Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/x/apps/main/src/composio-handler.ts | 14 - apps/x/apps/main/src/deeplink.ts | 51 ++- apps/x/apps/main/src/ipc.ts | 8 +- apps/x/apps/main/src/main.ts | 8 +- apps/x/apps/main/src/oauth-handler.ts | 79 +++- apps/x/apps/renderer/src/App.tsx | 39 ++ .../composio-google-migration-modal.tsx | 81 ++++ .../src/components/onboarding-modal.tsx | 37 +- .../onboarding/use-onboarding-state.ts | 49 +-- .../apps/renderer/src/hooks/useConnectors.ts | 64 +-- .../core/src/auth/google-backend-oauth.ts | 113 +++++ apps/x/packages/core/src/auth/repo.ts | 7 + apps/x/packages/core/src/composio/client.ts | 20 - .../packages/core/src/config/remote-config.ts | 51 +++ .../core/src/knowledge/agent_notes.ts | 27 +- .../src/knowledge/google-client-factory.ts | 212 +++++++--- .../core/src/knowledge/sync_calendar.ts | 282 +----------- .../packages/core/src/knowledge/sync_gmail.ts | 400 +----------------- .../migrations/composio-google-migration.ts | 132 ++++++ apps/x/packages/shared/src/ipc.ts | 10 +- 20 files changed, 780 insertions(+), 904 deletions(-) create mode 100644 apps/x/apps/renderer/src/components/composio-google-migration-modal.tsx create mode 100644 apps/x/packages/core/src/auth/google-backend-oauth.ts create mode 100644 apps/x/packages/core/src/config/remote-config.ts create mode 100644 apps/x/packages/core/src/migrations/composio-google-migration.ts diff --git a/apps/x/apps/main/src/composio-handler.ts b/apps/x/apps/main/src/composio-handler.ts index 274cfb2a..8fc4b754 100644 --- a/apps/x/apps/main/src/composio-handler.ts +++ b/apps/x/apps/main/src/composio-handler.ts @@ -293,20 +293,6 @@ export function listConnected(): { toolkits: string[] } { return { toolkits: composioAccountsRepo.getConnectedToolkits() }; } -/** - * Check if Composio should be used for Google services (Gmail, etc.) - */ -export async function useComposioForGoogle(): Promise<{ enabled: boolean }> { - return { enabled: await composioClient.useComposioForGoogle() }; -} - -/** - * Check if Composio should be used for Google Calendar - */ -export async function useComposioForGoogleCalendar(): Promise<{ enabled: boolean }> { - return { enabled: await composioClient.useComposioForGoogleCalendar() }; -} - /** * List available Composio toolkits — filtered to curated list only. * Return type matches the ZToolkit schema from core/composio/types.ts. diff --git a/apps/x/apps/main/src/deeplink.ts b/apps/x/apps/main/src/deeplink.ts index 605990d1..aaaaa3bc 100644 --- a/apps/x/apps/main/src/deeplink.ts +++ b/apps/x/apps/main/src/deeplink.ts @@ -28,12 +28,19 @@ export function extractDeepLinkFromArgv(argv: readonly string[]): string | null } /** - * Dispatch any rowboat:// URL — chooses navigation vs action automatically. - * Use this from notification click handlers and other URL entry points. + * Dispatch any rowboat:// URL — chooses among action / oauth-completion / + * navigation automatically. Use this from notification click handlers and + * other URL entry points. + * + * OAuth completion (rowboat://oauth/google/done?session=) is handled + * in main, not the renderer, because claiming tokens writes oauth.json and + * triggers sync — both main-process concerns. */ export function dispatchUrl(url: string): void { if (parseAction(url)) { void dispatchAction(url); + } else if (parseOAuthCompletion(url)) { + void dispatchOAuthCompletion(url); } else { dispatchDeepLink(url); } @@ -111,6 +118,46 @@ async function handleTakeMeetingNotes(eventId: string, openMeeting: boolean): Pr win.webContents.send("app:takeMeetingNotes", payload); } +// --- OAuth completion (rowboat-mode Google connect) --- + +interface OAuthCompletion { + provider: "google"; + state: string; +} + +/** + * Match rowboat://oauth/google/done?session=. Returns null for + * anything else — including paths with the right shape but wrong provider + * or a missing `session` query param. + */ +function parseOAuthCompletion(url: string): OAuthCompletion | null { + if (!url.startsWith(URL_PREFIX)) return null; + const rest = url.slice(URL_PREFIX.length); + const queryIdx = rest.indexOf("?"); + const path = queryIdx >= 0 ? rest.slice(0, queryIdx) : rest; + const parts = path.split("/").filter(Boolean); + if (parts.length !== 3 || parts[0] !== "oauth" || parts[2] !== "done") return null; + if (parts[1] !== "google") return null; + const params = new URLSearchParams(queryIdx >= 0 ? rest.slice(queryIdx + 1) : ""); + const state = params.get("session"); + return state ? { provider: "google", state } : null; +} + +async function dispatchOAuthCompletion(url: string): Promise { + const parsed = parseOAuthCompletion(url); + if (!parsed) return; + + // Bring the app to the front so the renderer can react to the + // oauthEvent IPC that completeRowboatGoogleConnect emits. + const win = mainWindowRef; + if (win && !win.isDestroyed()) focusWindow(win); + + // Lazy-import to keep deeplink.ts free of OAuth deps and avoid a + // potential circular dep with oauth-handler.ts. + const { completeRowboatGoogleConnect } = await import("./oauth-handler.js"); + await completeRowboatGoogleConnect(parsed.state); +} + function focusWindow(win: BrowserWindow): void { if (win.isMinimized()) win.restore(); win.show(); diff --git a/apps/x/apps/main/src/ipc.ts b/apps/x/apps/main/src/ipc.ts index d70192cc..056bb4c3 100644 --- a/apps/x/apps/main/src/ipc.ts +++ b/apps/x/apps/main/src/ipc.ts @@ -35,6 +35,7 @@ import { ISlackConfigRepo } from '@x/core/dist/slack/repo.js'; import { isOnboardingComplete, markOnboardingComplete } from '@x/core/dist/config/note_creation_config.js'; import * as composioHandler from './composio-handler.js'; import { consumePendingDeepLink } from './deeplink.js'; +import { qualifyAndDisconnectComposioGoogle } from '@x/core/dist/migrations/composio-google-migration.js'; import { IAgentScheduleRepo } from '@x/core/dist/agent-schedule/repo.js'; import { IAgentScheduleStateRepo } from '@x/core/dist/agent-schedule/state-repo.js'; import { triggerRun as triggerAgentScheduleRun } from '@x/core/dist/agent-schedule/runner.js'; @@ -612,11 +613,8 @@ export function setupIpcHandlers() { 'composio:list-toolkits': async () => { return composioHandler.listToolkits(); }, - 'composio:use-composio-for-google': async () => { - return composioHandler.useComposioForGoogle(); - }, - 'composio:use-composio-for-google-calendar': async () => { - return composioHandler.useComposioForGoogleCalendar(); + 'migration:check-composio-google': async () => { + return qualifyAndDisconnectComposioGoogle(); }, // Agent schedule handlers 'agent-schedule:getConfig': async () => { diff --git a/apps/x/apps/main/src/main.ts b/apps/x/apps/main/src/main.ts index cd0717a4..c3618000 100644 --- a/apps/x/apps/main/src/main.ts +++ b/apps/x/apps/main/src/main.ts @@ -42,7 +42,7 @@ import { ElectronBrowserControlService } from "./browser/control-service.js"; import { ElectronNotificationService } from "./notification/electron-notification-service.js"; import { DEEP_LINK_SCHEME, - dispatchDeepLink, + dispatchUrl, extractDeepLinkFromArgv, setMainWindowForDeepLinks, } from "./deeplink.js"; @@ -77,19 +77,19 @@ if (process.defaultApp) { // First-launch URL on Windows/Linux comes through argv. { const initialUrl = extractDeepLinkFromArgv(process.argv); - if (initialUrl) dispatchDeepLink(initialUrl); + if (initialUrl) dispatchUrl(initialUrl); } // macOS sends URLs via 'open-url' (both first launch and while running). app.on("open-url", (event, url) => { event.preventDefault(); - dispatchDeepLink(url); + dispatchUrl(url); }); // Subsequent launches on Windows/Linux land here via the single-instance lock. app.on("second-instance", (_event, argv) => { const url = extractDeepLinkFromArgv(argv); - if (url) dispatchDeepLink(url); + if (url) dispatchUrl(url); }); // Fix PATH for packaged Electron apps on macOS/Linux. diff --git a/apps/x/apps/main/src/oauth-handler.ts b/apps/x/apps/main/src/oauth-handler.ts index d3caba38..f61b59cc 100644 --- a/apps/x/apps/main/src/oauth-handler.ts +++ b/apps/x/apps/main/src/oauth-handler.ts @@ -13,6 +13,9 @@ import { triggerSync as triggerFirefliesSync } from '@x/core/dist/knowledge/sync import { emitOAuthEvent } from './ipc.js'; import { getBillingInfo } from '@x/core/dist/billing/billing.js'; import { capture as analyticsCapture, identify as analyticsIdentify, reset as analyticsReset } from '@x/core/dist/analytics/posthog.js'; +import { isSignedIn } from '@x/core/dist/account/account.js'; +import { getWebappUrl } from '@x/core/dist/config/remote-config.js'; +import { claimTokensViaBackend } from '@x/core/dist/auth/google-backend-oauth.js'; const REDIRECT_URI = 'http://localhost:8080/oauth/callback'; @@ -201,6 +204,23 @@ export async function connectProvider(provider: string, credentials?: { clientId if (provider === 'google') { if (!credentials?.clientId || !credentials?.clientSecret) { + // No credentials → rowboat mode if the user is signed in to Rowboat + // (we use the company-owned Google client via the api + webapp). + // Otherwise it's BYOK with missing creds → error. + if (await isSignedIn()) { + try { + const webappUrl = await getWebappUrl(); + await shell.openExternal(`${webappUrl}/oauth/google/start`); + console.log('[OAuth] Started rowboat-mode Google connect (browser opened to webapp)'); + return { success: true }; + } catch (error) { + console.error('[OAuth] Failed to start rowboat-mode Google connect:', error); + return { + success: false, + error: error instanceof Error ? error.message : 'Failed to open browser', + }; + } + } return { success: false, error: 'Google client ID and client secret are required to connect.' }; } } @@ -257,11 +277,15 @@ export async function connectProvider(provider: string, credentials?: { clientId state ); - // Save tokens and credentials + // Save tokens and credentials. For Google, BYOK is the only path + // that reaches this token exchange (rowboat path returns above + // before any local server runs); stamp mode: 'byok' so a future + // refresh / reconnect can't get confused with a rowboat entry. console.log(`[OAuth] Token exchange successful for ${provider}`); await oauthRepo.upsert(provider, { tokens, ...(credentials ? { clientId: credentials.clientId, clientSecret: credentials.clientSecret } : {}), + ...(provider === 'google' ? { mode: 'byok' as const } : {}), error: null, }); @@ -358,12 +382,65 @@ export async function connectProvider(provider: string, credentials?: { clientId } } +/** + * Complete a rowboat-mode Google connect: claim the tokens parked under + * `state` by the webapp callback, persist them locally, and trigger sync. + * + * Called by the deep-link dispatcher (deeplink.ts) when the OS hands us a + * rowboat://oauth/google/done?session= URL. + */ +export async function completeRowboatGoogleConnect(state: string): Promise { + try { + console.log('[OAuth] Claiming rowboat-mode Google tokens...'); + const tokens = await claimTokensViaBackend(state); + const oauthRepo = getOAuthRepo(); + await oauthRepo.upsert('google', { + tokens, + mode: 'rowboat', + // Explicitly null these — no client_id/secret on the desktop in this mode. + clientId: null, + clientSecret: null, + error: null, + }); + triggerGmailSync(); + triggerCalendarSync(); + emitOAuthEvent({ provider: 'google', success: true }); + console.log('[OAuth] Rowboat-mode Google connect complete'); + } catch (error) { + console.error('[OAuth] Failed to complete rowboat-mode Google connect:', error); + emitOAuthEvent({ + provider: 'google', + success: false, + error: error instanceof Error ? error.message : 'Failed to claim Google tokens', + }); + } +} + /** * Disconnect a provider (clear tokens) */ export async function disconnectProvider(provider: string): Promise<{ success: boolean }> { try { const oauthRepo = getOAuthRepo(); + + // For rowboat-mode Google, best-effort revoke at Google before clearing + // local state. Google's revoke endpoint accepts an unauthenticated POST + // with the access_token; failure is logged but doesn't block disconnect. + if (provider === 'google') { + const connection = await oauthRepo.read(provider); + if (connection.mode === 'rowboat' && connection.tokens?.access_token) { + try { + const revokeUrl = `https://oauth2.googleapis.com/revoke?token=${encodeURIComponent(connection.tokens.access_token)}`; + const res = await fetch(revokeUrl, { method: 'POST' }); + if (!res.ok) { + console.warn(`[OAuth] Google revoke returned ${res.status}; continuing with local disconnect`); + } + } catch (error) { + console.warn('[OAuth] Google revoke failed; continuing with local disconnect:', error); + } + } + } + await oauthRepo.delete(provider); if (provider === 'rowboat') { analyticsCapture('user_signed_out'); diff --git a/apps/x/apps/renderer/src/App.tsx b/apps/x/apps/renderer/src/App.tsx index d0ed5284..07f91e0b 100644 --- a/apps/x/apps/renderer/src/App.tsx +++ b/apps/x/apps/renderer/src/App.tsx @@ -56,6 +56,7 @@ import { stripKnowledgePrefix, toKnowledgePath, wikiLabel } from '@/lib/wiki-lin import { splitFrontmatter, joinFrontmatter } from '@/lib/frontmatter' import { extractConferenceLink } from '@/lib/calendar-event' import { OnboardingModal } from '@/components/onboarding' +import { ComposioGoogleMigrationModal } from '@/components/composio-google-migration-modal' import { CommandPalette, type CommandPaletteContext, type CommandPaletteMention } from '@/components/search-dialog' import { TrackModal } from '@/components/track-modal' import { BackgroundTaskDetail } from '@/components/background-task-detail' @@ -780,6 +781,30 @@ function App() { return cleanup }, [refreshVoiceAvailability]) + // One-time Composio→native Google migration check. Runs on mount and again + // after the user signs in to Rowboat (so we catch users who weren't signed + // in at startup). The IPC is idempotent — once `dismissed_at` is set on the + // main side, every subsequent call returns `{shouldShow: false}`. + useEffect(() => { + const run = async () => { + try { + const result = await window.ipc.invoke('migration:check-composio-google', null) + if (result.shouldShow) { + setShowComposioGoogleMigration(true) + } + } catch (error) { + console.error('[migration] check-composio-google failed:', error) + } + } + void run() + const cleanup = window.ipc.on('oauth:didConnect', (event) => { + if (event.provider === 'rowboat' && event.success) { + void run() + } + }) + return cleanup + }, []) + const handleStartRecording = useCallback(() => { setIsRecording(true) isRecordingRef.current = true @@ -1033,6 +1058,9 @@ function App() { // Onboarding state const [showOnboarding, setShowOnboarding] = useState(false) + // One-time Composio→native Google migration modal + const [showComposioGoogleMigration, setShowComposioGoogleMigration] = useState(false) + // Search state const [isSearchOpen, setIsSearchOpen] = useState(false) @@ -4904,6 +4932,17 @@ function App() { open={showOnboarding} onComplete={handleOnboardingComplete} /> + { + // Trigger the rowboat-mode Google connect flow. With no credentials + // and the user signed in to Rowboat, the main process opens the + // webapp `/oauth/google/start` URL. The deep link returns and + // completeRowboatGoogleConnect persists the tokens. + void window.ipc.invoke('oauth:connect', { provider: 'google' }) + }} + /> diff --git a/apps/x/apps/renderer/src/components/composio-google-migration-modal.tsx b/apps/x/apps/renderer/src/components/composio-google-migration-modal.tsx new file mode 100644 index 00000000..8afea839 --- /dev/null +++ b/apps/x/apps/renderer/src/components/composio-google-migration-modal.tsx @@ -0,0 +1,81 @@ +"use client" + +import { + Dialog, + DialogContent, + DialogDescription, + DialogHeader, + DialogTitle, +} from "@/components/ui/dialog" +import { Button } from "@/components/ui/button" + +interface ComposioGoogleMigrationModalProps { + open: boolean + onOpenChange: (open: boolean) => void + onReconnect: () => void +} + +/** + * One-time modal shown to signed-in users who had Gmail/Calendar connected + * via Composio before the native rowboat-mode OAuth flow shipped. By the + * time this opens, the Composio Google accounts have already been + * disconnected (fire-and-forget, on the qualification IPC) — the modal + * just explains what happened and offers a one-click reconnect. + * + * Both buttons close the modal. The qualification IPC marks the migration + * as dismissed before showing this, so neither button needs a follow-up + * IPC of its own. + */ +export function ComposioGoogleMigrationModal({ + open, + onOpenChange, + onReconnect, +}: ComposioGoogleMigrationModalProps) { + return ( + + +
+ + + Reconnect Google to keep syncing + + +
+

+ Rowboat used to sync your Gmail and Calendar through{" "} + Composio, a + third-party connector. We've now built a direct connection to + Google — it's faster, more private, and doesn't rely on a + middleman. +

+

+ We've disconnected the Composio connection. Reconnect Google + directly to resume syncing — your existing emails and calendar + events stay exactly where they are. +

+
+
+
+
+
+ + +
+
+
+ ) +} diff --git a/apps/x/apps/renderer/src/components/onboarding-modal.tsx b/apps/x/apps/renderer/src/components/onboarding-modal.tsx index 469ac35d..33c89231 100644 --- a/apps/x/apps/renderer/src/components/onboarding-modal.tsx +++ b/apps/x/apps/renderer/src/components/onboarding-modal.tsx @@ -96,14 +96,14 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) { const [slackDiscovering, setSlackDiscovering] = useState(false) const [slackDiscoverError, setSlackDiscoverError] = useState(null) - // Composio/Gmail state - const [useComposioForGoogle, setUseComposioForGoogle] = useState(false) + // Composio Gmail/Calendar sync was removed — flags are seeded false and + // never flipped. Kept here so legacy gating expressions still type-check. + const [useComposioForGoogle] = useState(false) const [gmailConnected, setGmailConnected] = useState(false) const [gmailLoading, setGmailLoading] = useState(true) const [gmailConnecting, setGmailConnecting] = useState(false) - // Composio/Google Calendar state - const [useComposioForGoogleCalendar, setUseComposioForGoogleCalendar] = useState(false) + const [useComposioForGoogleCalendar] = useState(false) const [googleCalendarConnected, setGoogleCalendarConnected] = useState(false) const [googleCalendarLoading, setGoogleCalendarLoading] = useState(true) const [googleCalendarConnecting, setGoogleCalendarConnecting] = useState(false) @@ -151,25 +151,8 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) { setProvidersLoading(false) } } - async function loadComposioForGoogleFlag() { - try { - const result = await window.ipc.invoke('composio:use-composio-for-google', null) - setUseComposioForGoogle(result.enabled) - } catch (error) { - console.error('Failed to check composio-for-google flag:', error) - } - } - async function loadComposioForGoogleCalendarFlag() { - try { - const result = await window.ipc.invoke('composio:use-composio-for-google-calendar', null) - setUseComposioForGoogleCalendar(result.enabled) - } catch (error) { - console.error('Failed to check composio-for-google-calendar flag:', error) - } - } + // (Composio Gmail/Calendar flag fetches removed — sync was deleted.) loadProviders() - loadComposioForGoogleFlag() - loadComposioForGoogleCalendarFlag() }, [open]) // Load LLM models catalog on open @@ -622,12 +605,20 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) { // Connect to a provider const handleConnect = useCallback(async (provider: string) => { if (provider === 'google') { + // Signed-in users use the rowboat (managed-credentials) flow: opens + // the webapp in the browser, no BYOK modal. Falls back to BYOK modal + // for not-signed-in users. (Mirrors useConnectors.handleConnect.) + const isSignedIntoRowboat = providerStates.rowboat?.isConnected ?? false + if (isSignedIntoRowboat) { + await startConnect('google') + return + } setGoogleClientIdOpen(true) return } await startConnect(provider) - }, [startConnect]) + }, [startConnect, providerStates]) const handleGoogleClientIdSubmit = useCallback((clientId: string, clientSecret: string) => { setGoogleCredentials(clientId, clientSecret) diff --git a/apps/x/apps/renderer/src/components/onboarding/use-onboarding-state.ts b/apps/x/apps/renderer/src/components/onboarding/use-onboarding-state.ts index edb3616b..b06ec862 100644 --- a/apps/x/apps/renderer/src/components/onboarding/use-onboarding-state.ts +++ b/apps/x/apps/renderer/src/components/onboarding/use-onboarding-state.ts @@ -66,16 +66,16 @@ export function useOnboardingState(open: boolean, onComplete: () => void) { // Inline upsell callout dismissed const [upsellDismissed, setUpsellDismissed] = useState(false) - // Composio/Gmail state (used when signed in with Rowboat account) - const [useComposioForGoogle, setUseComposioForGoogle] = useState(false) + // Composio Gmail/Calendar sync was removed — flags are seeded false and + // never flipped. Kept here so legacy gating expressions still type-check. + const [useComposioForGoogle] = useState(false) const [gmailConnected, setGmailConnected] = useState(false) const [gmailLoading, setGmailLoading] = useState(true) const [gmailConnecting, setGmailConnecting] = useState(false) const [composioApiKeyOpen, setComposioApiKeyOpen] = useState(false) const [composioApiKeyTarget, setComposioApiKeyTarget] = useState<'slack' | 'gmail'>('gmail') - // Composio/Google Calendar state - const [useComposioForGoogleCalendar, setUseComposioForGoogleCalendar] = useState(false) + const [useComposioForGoogleCalendar] = useState(false) const [googleCalendarConnected, setGoogleCalendarConnected] = useState(false) const [googleCalendarLoading, setGoogleCalendarLoading] = useState(true) const [googleCalendarConnecting, setGoogleCalendarConnecting] = useState(false) @@ -123,25 +123,8 @@ export function useOnboardingState(open: boolean, onComplete: () => void) { setProvidersLoading(false) } } - async function loadComposioForGoogleFlag() { - try { - const result = await window.ipc.invoke('composio:use-composio-for-google', null) - setUseComposioForGoogle(result.enabled) - } catch (error) { - console.error('Failed to check composio-for-google flag:', error) - } - } - async function loadComposioForGoogleCalendarFlag() { - try { - const result = await window.ipc.invoke('composio:use-composio-for-google-calendar', null) - setUseComposioForGoogleCalendar(result.enabled) - } catch (error) { - console.error('Failed to check composio-for-google-calendar flag:', error) - } - } + // (Composio Gmail/Calendar flag fetches removed — sync was deleted; flags stay false.) loadProviders() - loadComposioForGoogleFlag() - loadComposioForGoogleCalendarFlag() }, [open]) // Load LLM models catalog on open @@ -539,17 +522,7 @@ export function useOnboardingState(open: boolean, onComplete: () => void) { const cleanup = window.ipc.on('oauth:didConnect', async (event) => { if (event.provider === 'rowboat' && event.success) { - // Re-check composio flags now that the account is connected - try { - const [googleResult, calendarResult] = await Promise.all([ - window.ipc.invoke('composio:use-composio-for-google', null), - window.ipc.invoke('composio:use-composio-for-google-calendar', null), - ]) - setUseComposioForGoogle(googleResult.enabled) - setUseComposioForGoogleCalendar(calendarResult.enabled) - } catch (error) { - console.error('Failed to re-check composio flags:', error) - } + // (Composio Gmail/Calendar flag re-check removed — sync was deleted.) setCurrentStep(2) // Go to Connect Accounts } }) @@ -609,12 +582,20 @@ export function useOnboardingState(open: boolean, onComplete: () => void) { // Connect to a provider const handleConnect = useCallback(async (provider: string) => { if (provider === 'google') { + // Signed-in users use the rowboat (managed-credentials) flow: opens + // the webapp in the browser, no BYOK modal. Falls back to BYOK modal + // for not-signed-in users. (Mirrors useConnectors.handleConnect.) + const isSignedIntoRowboat = providerStates.rowboat?.isConnected ?? false + if (isSignedIntoRowboat) { + await startConnect('google') + return + } setGoogleClientIdOpen(true) return } await startConnect(provider) - }, [startConnect]) + }, [startConnect, providerStates]) const handleGoogleClientIdSubmit = useCallback((clientId: string, clientSecret: string) => { setGoogleCredentials(clientId, clientSecret) diff --git a/apps/x/apps/renderer/src/hooks/useConnectors.ts b/apps/x/apps/renderer/src/hooks/useConnectors.ts index 7285fe04..af56b921 100644 --- a/apps/x/apps/renderer/src/hooks/useConnectors.ts +++ b/apps/x/apps/renderer/src/hooks/useConnectors.ts @@ -38,16 +38,21 @@ export function useConnectors(active: boolean) { const [slackDiscovering, setSlackDiscovering] = useState(false) const [slackDiscoverError, setSlackDiscoverError] = useState(null) - // Composio/Gmail state - const [useComposioForGoogle, setUseComposioForGoogle] = useState(false) + // Composio Gmail/Calendar sync was removed. These flags are seeded false + // and never flipped — the IPC that used to set them is gone. The setters + // remain so the legacy Composio-Gmail handlers below still type-check, + // but those handlers are no longer reachable in the UI (the gating + // condition `useComposioForGoogle` stays false). + // TODO follow-up: drop these flags entirely and prune the dead UI branches + // in connectors-popover, connected-accounts-settings, and onboarding-modal. + const [useComposioForGoogle] = useState(false) const [gmailConnected, setGmailConnected] = useState(false) - const [gmailLoading, setGmailLoading] = useState(true) + const [gmailLoading, setGmailLoading] = useState(false) const [gmailConnecting, setGmailConnecting] = useState(false) - // Composio/Google Calendar state - const [useComposioForGoogleCalendar, setUseComposioForGoogleCalendar] = useState(false) + const [useComposioForGoogleCalendar] = useState(false) const [googleCalendarConnected, setGoogleCalendarConnected] = useState(false) - const [googleCalendarLoading, setGoogleCalendarLoading] = useState(true) + const [googleCalendarLoading, setGoogleCalendarLoading] = useState(false) const [googleCalendarConnecting, setGoogleCalendarConnecting] = useState(false) // Load available providers on mount @@ -67,28 +72,7 @@ export function useConnectors(active: boolean) { loadProviders() }, []) - // Re-check composio-for-google flags when active - useEffect(() => { - if (!active) return - async function loadComposioForGoogleFlag() { - try { - const result = await window.ipc.invoke('composio:use-composio-for-google', null) - setUseComposioForGoogle(result.enabled) - } catch (error) { - console.error('Failed to check composio-for-google flag:', error) - } - } - async function loadComposioForGoogleCalendarFlag() { - try { - const result = await window.ipc.invoke('composio:use-composio-for-google-calendar', null) - setUseComposioForGoogleCalendar(result.enabled) - } catch (error) { - console.error('Failed to check composio-for-google-calendar flag:', error) - } - } - loadComposioForGoogleFlag() - loadComposioForGoogleCalendarFlag() - }, [active]) + // (Composio Gmail/Calendar flag-check effect removed — flags are constant false now.) // Load Granola config const refreshGranolaConfig = useCallback(async () => { @@ -346,13 +330,22 @@ export function useConnectors(active: boolean) { const handleConnect = useCallback(async (provider: string) => { if (provider === 'google') { + // Signed-in users use the rowboat (managed-credentials) flow: opens + // the webapp in the browser, no BYOK modal. Main process detects + // signed-in via isSignedIn() when oauth:connect arrives without creds. + // Falls back to the BYOK modal for not-signed-in users. + const isSignedIntoRowboat = providerStates.rowboat?.isConnected ?? false + if (isSignedIntoRowboat) { + await startConnect('google') + return + } setGoogleClientIdDescription(undefined) setGoogleClientIdOpen(true) return } await startConnect(provider) - }, [startConnect]) + }, [startConnect, providerStates]) const handleGoogleClientIdSubmit = useCallback((clientId: string, clientSecret: string) => { setGoogleCredentials(clientId, clientSecret) @@ -485,19 +478,6 @@ export function useConnectors(active: boolean) { toast.success(`Connected to ${displayName}`) } - if (provider === 'rowboat') { - try { - const [googleResult, calendarResult] = await Promise.all([ - window.ipc.invoke('composio:use-composio-for-google', null), - window.ipc.invoke('composio:use-composio-for-google-calendar', null), - ]) - setUseComposioForGoogle(googleResult.enabled) - setUseComposioForGoogleCalendar(calendarResult.enabled) - } catch (err) { - console.error('Failed to re-check composio flags:', err) - } - } - refreshAllStatuses() } }) diff --git a/apps/x/packages/core/src/auth/google-backend-oauth.ts b/apps/x/packages/core/src/auth/google-backend-oauth.ts new file mode 100644 index 00000000..a441d205 --- /dev/null +++ b/apps/x/packages/core/src/auth/google-backend-oauth.ts @@ -0,0 +1,113 @@ +import { API_URL } from "../config/env.js"; +import { getAccessToken } from "./tokens.js"; +import { OAuthTokens } from "./types.js"; + +/** + * Client for the rowboat-mode Google OAuth endpoints on the api: + * POST /v1/google-oauth/claim — one-shot retrieval of tokens parked by + * the webapp callback under a `state` ticket + * POST /v1/google-oauth/refresh — exchange a refresh_token for fresh tokens + * (the secret-requiring step that can't + * happen on the desktop) + * + * Both are called with the user's Rowboat Supabase bearer (via getAccessToken). + * + * The api response shape uses `scope: string` (space-delimited); we convert + * to the desktop's `scopes: string[]`. On refresh, api may omit `scope` and + * `refresh_token` — caller-provided existingScopes / refreshToken are + * preserved in those cases (Google rarely rotates refresh tokens). + */ + +/** Thrown when the api signals the user must reconnect (Google `invalid_grant`). */ +export class ReconnectRequiredError extends Error { + constructor(message: string) { + super(message); + this.name = "ReconnectRequiredError"; + } +} + +interface ApiTokenResponse { + access_token: string; + refresh_token?: string; + expires_at: number; + scope?: string; + token_type?: string; +} + +function toOAuthTokens( + body: ApiTokenResponse, + fallbackRefreshToken: string | null = null, + fallbackScopes?: string[], +): OAuthTokens { + const refresh_token = body.refresh_token ?? fallbackRefreshToken; + const scopes = body.scope + ? body.scope.split(" ").filter((s) => s.length > 0) + : fallbackScopes; + return { + access_token: body.access_token, + refresh_token, + expires_at: body.expires_at, + token_type: "Bearer", + scopes, + }; +} + +async function postWithBearer(path: string, body: unknown): Promise { + const bearer = await getAccessToken(); + return fetch(`${API_URL}${path}`, { + method: "POST", + headers: { + "content-type": "application/json", + authorization: `Bearer ${bearer}`, + }, + body: JSON.stringify(body), + }); +} + +interface ErrorBody { + error?: string; + reconnectRequired?: boolean; +} + +async function readError(res: Response): Promise { + try { + return (await res.json()) as ErrorBody; + } catch { + return {}; + } +} + +/** Claim the tokens parked under `state` after the webapp finished its callback. */ +export async function claimTokensViaBackend(state: string): Promise { + const res = await postWithBearer("/v1/google-oauth/claim", { session: state }); + if (!res.ok) { + const err = await readError(res); + throw new Error(`claim failed: ${res.status} ${err.error ?? ""}`.trim()); + } + const body = (await res.json()) as ApiTokenResponse; + return toOAuthTokens(body); +} + +/** + * Refresh an access token via the api. Preserves caller's `refreshToken` and + * `existingScopes` when Google omits them on the refresh response. + */ +export async function refreshTokensViaBackend( + refreshToken: string, + existingScopes?: string[], +): Promise { + const res = await postWithBearer("/v1/google-oauth/refresh", { refreshToken }); + if (res.status === 409) { + const err = await readError(res); + if (err.reconnectRequired) { + throw new ReconnectRequiredError(err.error ?? "Reconnect required"); + } + throw new Error(`refresh failed: 409 ${err.error ?? ""}`.trim()); + } + if (!res.ok) { + const err = await readError(res); + throw new Error(`refresh failed: ${res.status} ${err.error ?? ""}`.trim()); + } + const body = (await res.json()) as ApiTokenResponse; + return toOAuthTokens(body, refreshToken, existingScopes); +} diff --git a/apps/x/packages/core/src/auth/repo.ts b/apps/x/packages/core/src/auth/repo.ts index 5276faea..08f6b56d 100644 --- a/apps/x/packages/core/src/auth/repo.ts +++ b/apps/x/packages/core/src/auth/repo.ts @@ -8,6 +8,13 @@ const ProviderConnectionSchema = z.object({ tokens: OAuthTokens.nullable().optional(), clientId: z.string().nullable().optional(), clientSecret: z.string().nullable().optional(), + /** + * `byok` (default for absent) — user provides their own client_id+secret; + * tokens stored locally; refresh handled locally via openid-client. + * `rowboat` — signed-in user; client_id+secret never on the desktop; + * tokens stored locally but refresh goes through the api. + */ + mode: z.enum(['byok', 'rowboat']).optional(), error: z.string().nullable().optional(), }); diff --git a/apps/x/packages/core/src/composio/client.ts b/apps/x/packages/core/src/composio/client.ts index 2844fc28..8080d923 100644 --- a/apps/x/packages/core/src/composio/client.ts +++ b/apps/x/packages/core/src/composio/client.ts @@ -49,8 +49,6 @@ async function getAuthHeaders(): Promise> { */ const ZComposioConfig = z.object({ apiKey: z.string().optional(), - use_composio_for_google: z.boolean().optional(), - use_composio_for_google_calendar: z.boolean().optional(), }); type ComposioConfig = z.infer; @@ -106,24 +104,6 @@ export async function isConfigured(): Promise { return !!getApiKey(); } -/** - * Check if Composio should be used for Google services (Gmail, etc.) - */ -export async function useComposioForGoogle(): Promise { - if (await isSignedIn()) return true; - const config = loadConfig(); - return config.use_composio_for_google === true; -} - -/** - * Check if Composio should be used for Google Calendar - */ -export async function useComposioForGoogleCalendar(): Promise { - if (await isSignedIn()) return true; - const config = loadConfig(); - return config.use_composio_for_google_calendar === true; -} - /** * Make an API call to Composio */ diff --git a/apps/x/packages/core/src/config/remote-config.ts b/apps/x/packages/core/src/config/remote-config.ts new file mode 100644 index 00000000..87174ef7 --- /dev/null +++ b/apps/x/packages/core/src/config/remote-config.ts @@ -0,0 +1,51 @@ +import { API_URL } from "./env.js"; + +/** + * Per-process cache of the unauthenticated `GET /v1/config` response from + * the api. The api returns `{ appUrl, supabaseUrl, websocketApiUrl }` — + * we use this to discover the webapp host (where the rowboat-mode OAuth + * flow runs) without hardcoding it on the desktop side. + * + * Cached as a Promise so concurrent first-callers all await the same fetch + * (no thundering herd). On failure the cache is cleared so the next call + * can retry. + */ + +interface RemoteConfig { + appUrl: string; + supabaseUrl: string; + websocketApiUrl: string; +} + +let _cached: Promise | null = null; + +async function fetchRemoteConfig(): Promise { + const res = await fetch(`${API_URL}/v1/config`); + if (!res.ok) { + throw new Error(`/v1/config returned ${res.status}`); + } + const body = (await res.json()) as Partial; + if (!body.appUrl) { + throw new Error("/v1/config response missing appUrl"); + } + return { + appUrl: body.appUrl, + supabaseUrl: body.supabaseUrl ?? "", + websocketApiUrl: body.websocketApiUrl ?? "", + }; +} + +export async function getRemoteConfig(): Promise { + if (!_cached) { + _cached = fetchRemoteConfig().catch((err) => { + _cached = null; // allow retry + throw err; + }); + } + return _cached; +} + +export async function getWebappUrl(): Promise { + const config = await getRemoteConfig(); + return config.appUrl; +} diff --git a/apps/x/packages/core/src/knowledge/agent_notes.ts b/apps/x/packages/core/src/knowledge/agent_notes.ts index 471bfecd..301c10a6 100644 --- a/apps/x/packages/core/src/knowledge/agent_notes.ts +++ b/apps/x/packages/core/src/knowledge/agent_notes.ts @@ -8,8 +8,6 @@ import { waitForRunCompletion } from '../agents/utils.js'; import { serviceLogger } from '../services/service_logger.js'; import { loadUserConfig, updateUserEmail } from '../config/user_config.js'; import { GoogleClientFactory } from './google-client-factory.js'; -import { useComposioForGoogle, executeAction } from '../composio/client.js'; -import { composioAccountsRepo } from '../composio/repo.js'; import { loadAgentNotesState, saveAgentNotesState, @@ -199,30 +197,7 @@ async function ensureUserEmail(): Promise { return existing.email; } - // Try Composio (used when signed in or composio configured) - try { - if (await useComposioForGoogle()) { - const account = composioAccountsRepo.getAccount('gmail'); - if (account && account.status === 'ACTIVE') { - const result = await executeAction('GMAIL_GET_PROFILE', { - connected_account_id: account.id, - user_id: 'rowboat-user', - version: 'latest', - arguments: { user_id: 'me' }, - }); - const email = (result.data as Record)?.emailAddress as string | undefined; - if (email) { - updateUserEmail(email); - console.log(`[AgentNotes] Auto-populated user email via Composio: ${email}`); - return email; - } - } - } - } catch (error) { - console.log('[AgentNotes] Could not fetch email via Composio:', error instanceof Error ? error.message : error); - } - - // Try direct Google OAuth + // Try direct Google OAuth (covers both BYOK and rowboat modes) try { const auth = await GoogleClientFactory.getClient(); if (auth) { diff --git a/apps/x/packages/core/src/knowledge/google-client-factory.ts b/apps/x/packages/core/src/knowledge/google-client-factory.ts index 9e0ad2d1..0c48ae37 100644 --- a/apps/x/packages/core/src/knowledge/google-client-factory.ts +++ b/apps/x/packages/core/src/knowledge/google-client-factory.ts @@ -6,20 +6,44 @@ import { getProviderConfig } from '../auth/providers.js'; import * as oauthClient from '../auth/oauth-client.js'; import type { Configuration } from '../auth/oauth-client.js'; import { OAuthTokens } from '../auth/types.js'; +import { + ReconnectRequiredError, + refreshTokensViaBackend, +} from '../auth/google-backend-oauth.js'; + +type Mode = 'byok' | 'rowboat'; /** * Factory for creating and managing Google OAuth2Client instances. * Handles caching, token refresh, and client reuse for Google API SDKs. + * + * Two connection modes share the same `oauth.json` provider entry: + * - `byok` user supplied client_id+secret; refresh runs locally via + * openid-client; OAuth2Client built with creds. + * - `rowboat` signed-in user; client_id+secret never on the desktop; + * refresh goes through the api at /v1/google-oauth/refresh; + * OAuth2Client built without creds and without refresh_token + * (we own all refreshes — see note below). + * + * **Auto-refresh disabled in rowboat mode:** google-auth-library's + * OAuth2Client will, on a 401 from a Google API call, try to refresh using + * the refresh_token + client secret it has on hand. In rowboat mode we have + * no secret, so that would 401-loop. We block this by passing only + * access_token + expiry_date in setCredentials (no refresh_token), which + * leaves the library nothing to refresh with. Our proactive expiry check + * in getClient() is the only refresh path. */ export class GoogleClientFactory { private static readonly PROVIDER_NAME = 'google'; private static cache: { + mode: Mode | null; config: Configuration | null; client: OAuth2Client | null; tokens: OAuthTokens | null; clientId: string | null; clientSecret: string | null; } = { + mode: null, config: null, client: null, tokens: null, @@ -27,7 +51,14 @@ export class GoogleClientFactory { clientSecret: null, }; - private static async resolveCredentials(): Promise<{ clientId: string; clientSecret?: string }> { + /** + * Promise singleton so a burst of getClient() calls during the brief + * expiry window all wait on a single refresh round-trip rather than + * fanning out parallel refreshes. + */ + private static refreshInFlight: Promise | null = null; + + private static async resolveByokCredentials(): Promise<{ clientId: string; clientSecret?: string }> { const oauthRepo = container.resolve('oauthRepo'); const connection = await oauthRepo.read(this.PROVIDER_NAME); if (!connection.clientId) { @@ -41,80 +72,116 @@ export class GoogleClientFactory { * Get or create OAuth2Client, reusing cached instance when possible */ static async getClient(): Promise { + if (this.refreshInFlight) { + return this.refreshInFlight; + } + const oauthRepo = container.resolve('oauthRepo'); - const { tokens } = await oauthRepo.read(this.PROVIDER_NAME); + const connection = await oauthRepo.read(this.PROVIDER_NAME); + const tokens = connection.tokens ?? null; + const mode: Mode = connection.mode ?? 'byok'; if (!tokens) { this.clearCache(); return null; } - // Initialize config cache if needed - try { - await this.initializeConfigCache(); - } catch (error) { - console.error("[OAuth] Failed to initialize Google OAuth configuration:", error); + // Mode flipped (e.g. user disconnected then reconnected differently) — invalidate. + if (this.cache.mode && this.cache.mode !== mode) { this.clearCache(); - return null; - } - if (!this.cache.config) { - return null; } - // Check if token is expired + // BYOK needs an openid-client Configuration for local refresh; rowboat doesn't. + if (mode === 'byok') { + try { + await this.initializeConfigCache(); + } catch (error) { + console.error('[OAuth] Failed to initialize Google OAuth configuration:', error); + this.clearCache(); + return null; + } + if (!this.cache.config) { + return null; + } + } + + // Check expiry against the cached tokens. Note: oauthClient.isTokenExpired + // applies a small clock-skew margin so we refresh slightly before real + // expiry — keeps long-running calls from racing the boundary. if (oauthClient.isTokenExpired(tokens)) { - // Token expired, try to refresh if (!tokens.refresh_token) { - console.log("[OAuth] Token expired and no refresh token available for Google."); + console.log('[OAuth] Token expired and no refresh token available for Google.'); await oauthRepo.upsert(this.PROVIDER_NAME, { error: 'Missing refresh token. Please reconnect.' }); this.clearCache(); return null; } - try { - console.log(`[OAuth] Token expired, refreshing access token...`); - const existingScopes = tokens.scopes; - const refreshedTokens = await oauthClient.refreshTokens( - this.cache.config, - tokens.refresh_token, - existingScopes - ); - await oauthRepo.upsert(this.PROVIDER_NAME, { tokens: refreshedTokens }); - - // Update cached tokens and recreate client - this.cache.tokens = refreshedTokens; - if (!this.cache.clientId) { - const creds = await this.resolveCredentials(); - this.cache.clientId = creds.clientId; - this.cache.clientSecret = creds.clientSecret ?? null; - } - this.cache.client = this.createClientFromTokens(refreshedTokens, this.cache.clientId, this.cache.clientSecret ?? undefined); - console.log(`[OAuth] Token refreshed successfully`); - return this.cache.client; - } catch (error) { - const message = error instanceof Error ? error.message : 'Failed to refresh token for Google'; - await oauthRepo.upsert(this.PROVIDER_NAME, { error: message }); - console.error("[OAuth] Failed to refresh token for Google:", error); - this.clearCache(); - return null; - } + this.refreshInFlight = this.refreshAndBuild(tokens, mode).finally(() => { + this.refreshInFlight = null; + }); + return this.refreshInFlight; } // Reuse client if tokens haven't changed - if (this.cache.client && this.cache.tokens && this.cache.tokens.access_token === tokens.access_token) { + if (this.cache.client && this.cache.tokens && this.cache.tokens.access_token === tokens.access_token && this.cache.mode === mode) { return this.cache.client; } - // Create new client with current tokens - console.log(`[OAuth] Creating new OAuth2Client instance`); - this.cache.tokens = tokens; - if (!this.cache.clientId) { - const creds = await this.resolveCredentials(); + // Build a fresh client for current tokens + return this.buildAndCacheClient(tokens, mode); + } + + private static async refreshAndBuild(tokens: OAuthTokens, mode: Mode): Promise { + const oauthRepo = container.resolve('oauthRepo'); + + try { + console.log(`[OAuth] Token expired, refreshing via ${mode}...`); + const existingScopes = tokens.scopes; + + let refreshedTokens: OAuthTokens; + if (mode === 'rowboat') { + refreshedTokens = await refreshTokensViaBackend(tokens.refresh_token!, existingScopes); + } else { + if (!this.cache.config) { + // Should not happen — initializeConfigCache ran above for byok. + throw new Error('Google OAuth config not initialized'); + } + refreshedTokens = await oauthClient.refreshTokens(this.cache.config, tokens.refresh_token!, existingScopes); + } + + await oauthRepo.upsert(this.PROVIDER_NAME, { tokens: refreshedTokens, error: null }); + console.log('[OAuth] Token refreshed successfully'); + return this.buildAndCacheClient(refreshedTokens, mode); + } catch (error) { + if (error instanceof ReconnectRequiredError) { + console.log('[OAuth] Reconnect required for Google'); + await oauthRepo.upsert(this.PROVIDER_NAME, { error: 'Reconnect Google' }); + this.clearCache(); + return null; + } + const message = error instanceof Error ? error.message : 'Failed to refresh token for Google'; + await oauthRepo.upsert(this.PROVIDER_NAME, { error: message }); + console.error('[OAuth] Failed to refresh token for Google:', error); + this.clearCache(); + return null; + } + } + + private static async buildAndCacheClient(tokens: OAuthTokens, mode: Mode): Promise { + if (mode === 'byok' && !this.cache.clientId) { + const creds = await this.resolveByokCredentials(); this.cache.clientId = creds.clientId; this.cache.clientSecret = creds.clientSecret ?? null; } - this.cache.client = this.createClientFromTokens(tokens, this.cache.clientId, this.cache.clientSecret ?? undefined); - return this.cache.client; + + const client = mode === 'rowboat' + ? this.createRowboatClient(tokens) + : this.createByokClient(tokens, this.cache.clientId!, this.cache.clientSecret ?? undefined); + + this.cache.mode = mode; + this.cache.tokens = tokens; + this.cache.client = client; + return client; } /** @@ -139,7 +206,8 @@ export class GoogleClientFactory { * Clear cache (useful for testing or when credentials are revoked) */ static clearCache(): void { - console.log(`[OAuth] Clearing Google auth cache`); + console.log('[OAuth] Clearing Google auth cache'); + this.cache.mode = null; this.cache.config = null; this.cache.client = null; this.cache.tokens = null; @@ -148,10 +216,10 @@ export class GoogleClientFactory { } /** - * Initialize cached configuration (called once) + * Initialize cached configuration for BYOK mode (rowboat doesn't need it). */ private static async initializeConfigCache(): Promise { - const { clientId, clientSecret } = await this.resolveCredentials(); + const { clientId, clientSecret } = await this.resolveByokCredentials(); if (this.cache.config && this.cache.clientId === clientId && this.cache.clientSecret === (clientSecret ?? null)) { return; // Already initialized for these credentials @@ -161,13 +229,13 @@ export class GoogleClientFactory { this.clearCache(); } - console.log(`[OAuth] Initializing Google OAuth configuration...`); + console.log('[OAuth] Initializing Google OAuth configuration...'); const providerConfig = await getProviderConfig(this.PROVIDER_NAME); if (providerConfig.discovery.mode === 'issuer') { if (providerConfig.client.mode === 'static') { // Discover endpoints, use static client ID - console.log(`[OAuth] Discovery mode: issuer with static client ID`); + console.log('[OAuth] Discovery mode: issuer with static client ID'); this.cache.config = await oauthClient.discoverConfiguration( providerConfig.discovery.issuer, clientId, @@ -175,7 +243,7 @@ export class GoogleClientFactory { ); } else { // DCR mode - need existing registration - console.log(`[OAuth] Discovery mode: issuer with DCR`); + console.log('[OAuth] Discovery mode: issuer with DCR'); const clientRepo = container.resolve('clientRegistrationRepo'); const existingRegistration = await clientRepo.getClientRegistration(this.PROVIDER_NAME); @@ -194,7 +262,7 @@ export class GoogleClientFactory { throw new Error('DCR requires discovery mode "issuer", not "static"'); } - console.log(`[OAuth] Using static endpoints (no discovery)`); + console.log('[OAuth] Using static endpoints (no discovery)'); this.cache.config = oauthClient.createStaticConfiguration( providerConfig.discovery.authorizationEndpoint, providerConfig.discovery.tokenEndpoint, @@ -206,27 +274,33 @@ export class GoogleClientFactory { this.cache.clientId = clientId; this.cache.clientSecret = clientSecret ?? null; - console.log(`[OAuth] Google OAuth configuration initialized`); + console.log('[OAuth] Google OAuth configuration initialized'); } - /** - * Create OAuth2Client from OAuthTokens - */ - private static createClientFromTokens(tokens: OAuthTokens, clientId: string, clientSecret?: string): OAuth2Client { - const client = new OAuth2Client( - clientId, - clientSecret ?? undefined, - undefined // redirect_uri not needed for token usage - ); - - // Set credentials + /** BYOK OAuth2Client — has client_id + secret + refresh_token. */ + private static createByokClient(tokens: OAuthTokens, clientId: string, clientSecret?: string): OAuth2Client { + const client = new OAuth2Client(clientId, clientSecret ?? undefined, undefined); client.setCredentials({ access_token: tokens.access_token, refresh_token: tokens.refresh_token || undefined, - expiry_date: tokens.expires_at * 1000, // Convert from seconds to milliseconds + expiry_date: tokens.expires_at * 1000, scope: tokens.scopes?.join(' ') || undefined, }); + return client; + } + /** + * Rowboat OAuth2Client — no client_id/secret, no refresh_token. + * Library auto-refresh is disabled by absence of refresh_token; our + * proactive refresh in getClient() is the only refresh path. + */ + private static createRowboatClient(tokens: OAuthTokens): OAuth2Client { + const client = new OAuth2Client(); + client.setCredentials({ + access_token: tokens.access_token, + expiry_date: tokens.expires_at * 1000, + scope: tokens.scopes?.join(' ') || undefined, + }); return client; } } diff --git a/apps/x/packages/core/src/knowledge/sync_calendar.ts b/apps/x/packages/core/src/knowledge/sync_calendar.ts index b6258975..b311dfa2 100644 --- a/apps/x/packages/core/src/knowledge/sync_calendar.ts +++ b/apps/x/packages/core/src/knowledge/sync_calendar.ts @@ -5,10 +5,8 @@ import { OAuth2Client } from 'google-auth-library'; import { NodeHtmlMarkdown } from 'node-html-markdown' import { WorkDir } from '../config/config.js'; import { GoogleClientFactory } from './google-client-factory.js'; -import { serviceLogger, type ServiceRunContext } from '../services/service_logger.js'; +import { serviceLogger } from '../services/service_logger.js'; import { limitEventItems } from './limit_event_items.js'; -import { executeAction, useComposioForGoogleCalendar } from '../composio/client.js'; -import { composioAccountsRepo } from '../composio/repo.js'; import { createEvent } from './track/events.js'; const MAX_EVENTS_IN_DIGEST = 50; @@ -138,7 +136,6 @@ async function publishCalendarSyncEvent( const SYNC_DIR = path.join(WorkDir, 'calendar_sync'); const SYNC_INTERVAL_MS = 5 * 60 * 1000; // Check every 5 minutes const LOOKBACK_DAYS = 7; -const COMPOSIO_LOOKBACK_DAYS = 7; const REQUIRED_SCOPES = [ 'https://www.googleapis.com/auth/calendar.events.readonly', 'https://www.googleapis.com/auth/drive.readonly' @@ -477,286 +474,17 @@ async function performSync(syncDir: string, lookbackDays: number) { } } -// --- Composio-based Sync --- - -interface ComposioCalendarState { - last_sync: string; // ISO string -} - -function loadComposioState(stateFile: string): ComposioCalendarState | null { - if (fs.existsSync(stateFile)) { - try { - const data = JSON.parse(fs.readFileSync(stateFile, 'utf-8')); - if (data.last_sync) { - return { last_sync: data.last_sync }; - } - } catch (e) { - console.error('[Calendar] Failed to load composio state:', e); - } - } - return null; -} - -function saveComposioState(stateFile: string, lastSync: string): void { - fs.writeFileSync(stateFile, JSON.stringify({ last_sync: lastSync }, null, 2)); -} - -/** - * Save a Composio calendar event as JSON (same format used by Google OAuth path). - * The event data from Composio is already structured similarly to Google Calendar API. - */ -function saveComposioEvent(eventData: Record, syncDir: string): { changed: boolean; isNew: boolean; title: string } { - const eventId = eventData.id as string | undefined; - if (!eventId) return { changed: false, isNew: false, title: 'Unknown' }; - - const filePath = path.join(syncDir, `${eventId}.json`); - const content = JSON.stringify(eventData, null, 2); - const exists = fs.existsSync(filePath); - - try { - if (exists) { - const existing = fs.readFileSync(filePath, 'utf-8'); - if (existing === content) { - return { changed: false, isNew: false, title: (eventData.summary as string) || eventId }; - } - } - - fs.writeFileSync(filePath, content); - return { changed: true, isNew: !exists, title: (eventData.summary as string) || eventId }; - } catch (e) { - console.error(`[Calendar] Error saving event ${eventId}:`, e); - return { changed: false, isNew: false, title: (eventData.summary as string) || eventId }; - } -} - -async function performSyncComposio() { - const STATE_FILE = path.join(SYNC_DIR, 'composio_state.json'); - - if (!fs.existsSync(SYNC_DIR)) fs.mkdirSync(SYNC_DIR, { recursive: true }); - - const account = composioAccountsRepo.getAccount('googlecalendar'); - if (!account || account.status !== 'ACTIVE') { - console.log('[Calendar] Google Calendar not connected via Composio. Skipping sync.'); - return; - } - - const connectedAccountId = account.id; - - // Calculate time window: lookback + 14 days forward - const now = new Date(); - const lookbackMs = COMPOSIO_LOOKBACK_DAYS * 24 * 60 * 60 * 1000; - const twoWeeksForwardMs = 14 * 24 * 60 * 60 * 1000; - - const timeMin = new Date(now.getTime() - lookbackMs).toISOString(); - const timeMax = new Date(now.getTime() + twoWeeksForwardMs).toISOString(); - - console.log(`[Calendar] Syncing via Composio from ${timeMin} to ${timeMax} (lookback: ${COMPOSIO_LOOKBACK_DAYS} days)...`); - - let run: ServiceRunContext | null = null; - const ensureRun = async (): Promise => { - if (!run) { - run = await serviceLogger.startRun({ - service: 'calendar', - message: 'Syncing calendar (Composio)', - trigger: 'timer', - }); - } - return run; - }; - - try { - const currentEventIds = new Set(); - let newCount = 0; - let updatedCount = 0; - const changedTitles: string[] = []; - const newEvents: AnyEvent[] = []; - const updatedEvents: AnyEvent[] = []; - let pageToken: string | null = null; - const MAX_PAGES = 20; - - for (let page = 0; page < MAX_PAGES; page++) { - // Re-check connection in case user disconnected mid-sync - if (!composioAccountsRepo.isConnected('googlecalendar')) { - console.log('[Calendar] Account disconnected during sync. Stopping.'); - return; - } - - const args: Record = { - calendar_id: 'primary', - time_min: timeMin, - time_max: timeMax, - single_events: true, - order_by: 'startTime', - }; - if (pageToken) { - args.page_token = pageToken; - } - - const result = await executeAction( - 'GOOGLECALENDAR_FIND_EVENT', - { - connected_account_id: connectedAccountId, - user_id: 'rowboat-user', - version: 'latest', - arguments: args, - } - ); - - if (!result.successful || !result.data) { - console.error('[Calendar] Failed to list events via Composio:', result.error); - return; - } - - const data = result.data as Record; - // Composio may return events in different structures - let events: Array> = []; - - if (Array.isArray(data.items)) { - events = data.items as Array>; - } else if (Array.isArray(data.events)) { - events = data.events as Array>; - } else if (data.event_data && typeof data.event_data === 'object') { - const nested = data.event_data as Record; - if (Array.isArray(nested.event_data)) { - events = nested.event_data as Array>; - } else if (Array.isArray(data.event_data)) { - events = data.event_data as Array>; - } - } else if (Array.isArray(data)) { - events = data as unknown as Array>; - } - - if (events.length === 0 && page === 0) { - console.log('[Calendar] No events found in this window.'); - } else if (events.length > 0) { - console.log(`[Calendar] Page ${page + 1}: found ${events.length} events.`); - for (const event of events) { - const eventId = event.id as string | undefined; - if (eventId) { - const saveResult = saveComposioEvent(event, SYNC_DIR); - currentEventIds.add(eventId); - - if (saveResult.changed) { - await ensureRun(); - changedTitles.push(saveResult.title); - if (saveResult.isNew) { - newCount++; - newEvents.push(event); - } else { - updatedCount++; - updatedEvents.push(event); - } - } - } - } - } - - // Check for next page - const nextToken = data.nextPageToken as string | undefined; - if (nextToken) { - pageToken = nextToken; - console.log(`[Calendar] Fetching next page...`); - } else { - break; - } - } - - // Clean up events no longer in the window - const deletedFiles = cleanUpOldFiles(currentEventIds, SYNC_DIR); - let deletedCount = 0; - if (deletedFiles.length > 0) { - await ensureRun(); - deletedCount = deletedFiles.length; - } - - // Publish a single bundled event capturing all changes from this sync. - await publishCalendarSyncEvent(newEvents, updatedEvents, deletedFiles); - - // Log results if any changes were detected (run was started by ensureRun) - if (run) { - const r = run as ServiceRunContext; - const totalChanges = newCount + updatedCount + deletedCount; - const limitedTitles = limitEventItems(changedTitles); - await serviceLogger.log({ - type: 'changes_identified', - service: r.service, - runId: r.runId, - level: 'info', - message: `Calendar updates: ${totalChanges} change${totalChanges === 1 ? '' : 's'}`, - counts: { - newEvents: newCount, - updatedEvents: updatedCount, - deletedFiles: deletedCount, - }, - items: limitedTitles.items, - truncated: limitedTitles.truncated, - }); - await serviceLogger.log({ - type: 'run_complete', - service: r.service, - runId: r.runId, - level: 'info', - message: `Calendar sync complete: ${totalChanges} change${totalChanges === 1 ? '' : 's'}`, - durationMs: Date.now() - r.startedAt, - outcome: 'ok', - summary: { - newEvents: newCount, - updatedEvents: updatedCount, - deletedFiles: deletedCount, - }, - }); - } - - // Save state - saveComposioState(STATE_FILE, new Date().toISOString()); - console.log(`[Calendar] Composio sync completed. ${newCount} new, ${updatedCount} updated, ${deletedCount} deleted.`); - } catch (error) { - console.error('[Calendar] Error during Composio sync:', error); - const errRun = await ensureRun(); - await serviceLogger.log({ - type: 'error', - service: errRun.service, - runId: errRun.runId, - level: 'error', - message: 'Calendar sync error', - error: error instanceof Error ? error.message : String(error), - }); - await serviceLogger.log({ - type: 'run_complete', - service: errRun.service, - runId: errRun.runId, - level: 'error', - message: 'Calendar sync failed', - durationMs: Date.now() - errRun.startedAt, - outcome: 'error', - }); - } -} - export async function init() { console.log("Starting Google Calendar & Notes Sync (TS)..."); console.log(`Will sync every ${SYNC_INTERVAL_MS / 1000} seconds.`); while (true) { try { - const composioMode = await useComposioForGoogleCalendar(); - if (composioMode) { - const isConnected = composioAccountsRepo.isConnected('googlecalendar'); - if (!isConnected) { - console.log('[Calendar] Google Calendar not connected via Composio. Sleeping...'); - } else { - await performSyncComposio(); - } + const hasCredentials = await GoogleClientFactory.hasValidCredentials(REQUIRED_SCOPES); + if (!hasCredentials) { + console.log("Google OAuth credentials not available or missing required Calendar/Drive scopes. Sleeping..."); } else { - // Check if credentials are available with required scopes - const hasCredentials = await GoogleClientFactory.hasValidCredentials(REQUIRED_SCOPES); - - if (!hasCredentials) { - console.log("Google OAuth credentials not available or missing required Calendar/Drive scopes. Sleeping..."); - } else { - // Perform one sync - await performSync(SYNC_DIR, LOOKBACK_DAYS); - } + await performSync(SYNC_DIR, LOOKBACK_DAYS); } } catch (error) { console.error("Error in main loop:", error); diff --git a/apps/x/packages/core/src/knowledge/sync_gmail.ts b/apps/x/packages/core/src/knowledge/sync_gmail.ts index 2aa48944..81a63edf 100644 --- a/apps/x/packages/core/src/knowledge/sync_gmail.ts +++ b/apps/x/packages/core/src/knowledge/sync_gmail.ts @@ -7,8 +7,6 @@ import { WorkDir } from '../config/config.js'; import { GoogleClientFactory } from './google-client-factory.js'; import { serviceLogger, type ServiceRunContext } from '../services/service_logger.js'; import { limitEventItems } from './limit_event_items.js'; -import { executeAction, useComposioForGoogle } from '../composio/client.js'; -import { composioAccountsRepo } from '../composio/repo.js'; import { createEvent } from './track/events.js'; // Configuration @@ -225,7 +223,7 @@ async function processThread(auth: OAuth2Client, threadId: string, syncDir: stri } } -function loadState(stateFile: string): { historyId?: string } { +function loadState(stateFile: string): { historyId?: string; last_sync?: string } { if (fs.existsSync(stateFile)) { return JSON.parse(fs.readFileSync(stateFile, 'utf-8')); } @@ -240,9 +238,24 @@ function saveState(historyId: string, stateFile: string) { } async function fullSync(auth: OAuth2Client, syncDir: string, attachmentsDir: string, stateFile: string, lookbackDays: number) { - console.log(`Performing full sync of last ${lookbackDays} days...`); const gmail = google.gmail({ version: 'v1', auth }); + // If the state file holds a last_sync timestamp (e.g. left over from a + // prior Composio sync, or from a previous successful native sync that + // we're falling back to after a history.list 404), use that as the + // floor instead of the default lookback. Carries forward Composio's + // last_sync on first migration so we don't refetch the last 7 days. + const state = loadState(stateFile); + let pastDate: Date; + if (state.last_sync) { + pastDate = new Date(state.last_sync); + console.log(`Performing full sync from last_sync=${state.last_sync}...`); + } else { + pastDate = new Date(); + pastDate.setDate(pastDate.getDate() - lookbackDays); + console.log(`Performing full sync of last ${lookbackDays} days...`); + } + let run: ServiceRunContext | null = null; const ensureRun = async () => { if (!run) { @@ -255,8 +268,6 @@ async function fullSync(auth: OAuth2Client, syncDir: string, attachmentsDir: str }; try { - const pastDate = new Date(); - pastDate.setDate(pastDate.getDate() - lookbackDays); const dateQuery = pastDate.toISOString().split('T')[0].replace(/-/g, '/'); // Get History ID @@ -498,386 +509,17 @@ async function performSync() { } } -// --- Composio-based Sync --- - -const COMPOSIO_LOOKBACK_DAYS = 7; - -interface ComposioSyncState { - last_sync: string; // ISO string -} - -function loadComposioState(stateFile: string): ComposioSyncState | null { - if (fs.existsSync(stateFile)) { - try { - const data = JSON.parse(fs.readFileSync(stateFile, 'utf-8')); - if (data.last_sync) { - return { last_sync: data.last_sync }; - } - } catch (e) { - console.error('[Gmail] Failed to load composio state:', e); - } - } - return null; -} - -function saveComposioState(stateFile: string, lastSync: string): void { - fs.writeFileSync(stateFile, JSON.stringify({ last_sync: lastSync }, null, 2)); -} - -function tryParseDate(dateStr: string): Date | null { - const d = new Date(dateStr); - return isNaN(d.getTime()) ? null : d; -} - -interface ParsedMessage { - from: string; - date: string; - subject: string; - body: string; -} - -function parseMessageData(messageData: Record): ParsedMessage { - const headers = messageData.payload && typeof messageData.payload === 'object' - ? (messageData.payload as Record).headers as Array<{ name: string; value: string }> | undefined - : undefined; - - const from = headers?.find(h => h.name === 'From')?.value || String(messageData.from || messageData.sender || 'Unknown'); - const date = headers?.find(h => h.name === 'Date')?.value || String(messageData.date || messageData.internalDate || 'Unknown'); - const subject = headers?.find(h => h.name === 'Subject')?.value || String(messageData.subject || '(No Subject)'); - - let body = ''; - - if (messageData.payload && typeof messageData.payload === 'object') { - body = extractBodyFromPayload(messageData.payload as Record); - } - - if (!body) { - if (typeof messageData.body === 'string') { - body = messageData.body; - } else if (typeof messageData.snippet === 'string') { - body = messageData.snippet; - } else if (typeof messageData.text === 'string') { - body = messageData.text; - } - } - - if (body && (body.includes(' !line.trim().startsWith('>')).join('\n'); - } - - return { from, date, subject, body }; -} - -function extractBodyFromPayload(payload: Record): string { - const parts = payload.parts as Array> | undefined; - - if (parts) { - for (const part of parts) { - const mimeType = part.mimeType as string | undefined; - const bodyData = part.body && typeof part.body === 'object' - ? (part.body as Record).data as string | undefined - : undefined; - - if ((mimeType === 'text/plain' || mimeType === 'text/html') && bodyData) { - const decoded = Buffer.from(bodyData, 'base64').toString('utf-8'); - if (mimeType === 'text/html') { - return nhm.translate(decoded); - } - return decoded; - } - - if (part.parts) { - const result = extractBodyFromPayload(part as Record); - if (result) return result; - } - } - } - - const bodyData = payload.body && typeof payload.body === 'object' - ? (payload.body as Record).data as string | undefined - : undefined; - - if (bodyData) { - const decoded = Buffer.from(bodyData, 'base64').toString('utf-8'); - const mimeType = payload.mimeType as string | undefined; - if (mimeType === 'text/html') { - return nhm.translate(decoded); - } - return decoded; - } - - return ''; -} - -interface ComposioThreadResult { - synced: SyncedThread | null; - newestIsoPlusOne: string | null; -} - -async function processThreadComposio(connectedAccountId: string, threadId: string, syncDir: string): Promise { - let threadResult; - try { - threadResult = await executeAction( - 'GMAIL_FETCH_MESSAGE_BY_THREAD_ID', - { - connected_account_id: connectedAccountId, - user_id: 'rowboat-user', - version: 'latest', - arguments: { thread_id: threadId, user_id: 'me' }, - } - ); - } catch (error) { - console.warn(`[Gmail] Skipping thread ${threadId} (fetch failed):`, error instanceof Error ? error.message : error); - return { synced: null, newestIsoPlusOne: null }; - } - - if (!threadResult.successful || !threadResult.data) { - console.error(`[Gmail] Failed to fetch thread ${threadId}:`, threadResult.error); - return { synced: null, newestIsoPlusOne: null }; - } - - const data = threadResult.data as Record; - const messages = data.messages as Array> | undefined; - - let newestDate: Date | null = null; - let mdContent: string; - let subjectForLog: string; - - if (!messages || messages.length === 0) { - const parsed = parseMessageData(data); - mdContent = `# ${parsed.subject}\n\n` + - `**Thread ID:** ${threadId}\n` + - `**Message Count:** 1\n\n---\n\n` + - `### From: ${parsed.from}\n` + - `**Date:** ${parsed.date}\n\n` + - `${parsed.body}\n\n---\n\n`; - subjectForLog = parsed.subject; - newestDate = tryParseDate(parsed.date); - } else { - const firstParsed = parseMessageData(messages[0]); - mdContent = `# ${firstParsed.subject}\n\n`; - mdContent += `**Thread ID:** ${threadId}\n`; - mdContent += `**Message Count:** ${messages.length}\n\n---\n\n`; - - for (const msg of messages) { - const parsed = parseMessageData(msg); - mdContent += `### From: ${parsed.from}\n`; - mdContent += `**Date:** ${parsed.date}\n\n`; - mdContent += `${parsed.body}\n\n`; - mdContent += `---\n\n`; - - const msgDate = tryParseDate(parsed.date); - if (msgDate && (!newestDate || msgDate > newestDate)) { - newestDate = msgDate; - } - } - subjectForLog = firstParsed.subject; - } - - fs.writeFileSync(path.join(syncDir, `${cleanFilename(threadId)}.md`), mdContent); - console.log(`[Gmail] Synced Thread: ${subjectForLog} (${threadId})`); - - const newestIsoPlusOne = newestDate ? new Date(newestDate.getTime() + 1000).toISOString() : null; - return { synced: { threadId, markdown: mdContent }, newestIsoPlusOne }; -} - -async function performSyncComposio() { - const ATTACHMENTS_DIR = path.join(SYNC_DIR, 'attachments'); - const STATE_FILE = path.join(SYNC_DIR, 'sync_state.json'); - - if (!fs.existsSync(SYNC_DIR)) fs.mkdirSync(SYNC_DIR, { recursive: true }); - if (!fs.existsSync(ATTACHMENTS_DIR)) fs.mkdirSync(ATTACHMENTS_DIR, { recursive: true }); - - const account = composioAccountsRepo.getAccount('gmail'); - if (!account || account.status !== 'ACTIVE') { - console.log('[Gmail] Gmail not connected via Composio. Skipping sync.'); - return; - } - - const connectedAccountId = account.id; - - const state = loadComposioState(STATE_FILE); - let afterEpochSeconds: number; - - if (state) { - afterEpochSeconds = Math.floor(new Date(state.last_sync).getTime() / 1000); - console.log(`[Gmail] Syncing messages since ${state.last_sync}...`); - } else { - const pastDate = new Date(); - pastDate.setDate(pastDate.getDate() - COMPOSIO_LOOKBACK_DAYS); - afterEpochSeconds = Math.floor(pastDate.getTime() / 1000); - console.log(`[Gmail] First sync - fetching last ${COMPOSIO_LOOKBACK_DAYS} days...`); - } - - let run: ServiceRunContext | null = null; - const ensureRun = async () => { - if (!run) { - run = await serviceLogger.startRun({ - service: 'gmail', - message: 'Syncing Gmail (Composio)', - trigger: 'timer', - }); - } - }; - - try { - const allThreadIds: string[] = []; - let pageToken: string | undefined; - - do { - const params: Record = { - query: `after:${afterEpochSeconds}`, - max_results: 20, - user_id: 'me', - }; - if (pageToken) { - params.page_token = pageToken; - } - - const result = await executeAction( - 'GMAIL_LIST_THREADS', - { - connected_account_id: connectedAccountId, - user_id: 'rowboat-user', - version: 'latest', - arguments: params, - } - ); - - if (!result.successful || !result.data) { - console.error('[Gmail] Failed to list threads:', result.error); - return; - } - - const data = result.data as Record; - const threads = data.threads as Array> | undefined; - - if (threads && threads.length > 0) { - for (const thread of threads) { - const threadId = thread.id as string | undefined; - if (threadId) { - allThreadIds.push(threadId); - } - } - } - - pageToken = data.nextPageToken as string | undefined; - } while (pageToken); - - if (allThreadIds.length === 0) { - console.log('[Gmail] No new threads.'); - return; - } - - console.log(`[Gmail] Found ${allThreadIds.length} threads to sync.`); - - await ensureRun(); - const limitedThreads = limitEventItems(allThreadIds); - await serviceLogger.log({ - type: 'changes_identified', - service: run!.service, - runId: run!.runId, - level: 'info', - message: `Found ${allThreadIds.length} thread${allThreadIds.length === 1 ? '' : 's'} to sync`, - counts: { threads: allThreadIds.length }, - items: limitedThreads.items, - truncated: limitedThreads.truncated, - }); - - // Process oldest first so high-water mark advances chronologically - allThreadIds.reverse(); - - let highWaterMark: string | null = state?.last_sync ?? null; - let processedCount = 0; - const synced: SyncedThread[] = []; - for (const threadId of allThreadIds) { - // Re-check connection in case user disconnected mid-sync - if (!composioAccountsRepo.isConnected('gmail')) { - console.log('[Gmail] Account disconnected during sync. Stopping.'); - break; - } - try { - const result = await processThreadComposio(connectedAccountId, threadId, SYNC_DIR); - processedCount++; - - if (result.synced) synced.push(result.synced); - - if (result.newestIsoPlusOne) { - if (!highWaterMark || new Date(result.newestIsoPlusOne) > new Date(highWaterMark)) { - highWaterMark = result.newestIsoPlusOne; - } - saveComposioState(STATE_FILE, highWaterMark); - } - } catch (error) { - console.error(`[Gmail] Error processing thread ${threadId}, skipping:`, error); - } - } - - await publishGmailSyncEvent(synced); - - await serviceLogger.log({ - type: 'run_complete', - service: run!.service, - runId: run!.runId, - level: 'info', - message: `Gmail sync complete: ${processedCount}/${allThreadIds.length} thread${allThreadIds.length === 1 ? '' : 's'}`, - durationMs: Date.now() - run!.startedAt, - outcome: 'ok', - summary: { threads: processedCount }, - }); - - console.log(`[Gmail] Sync completed. Processed ${processedCount}/${allThreadIds.length} threads.`); - } catch (error) { - console.error('[Gmail] Error during sync:', error); - await ensureRun(); - await serviceLogger.log({ - type: 'error', - service: run!.service, - runId: run!.runId, - level: 'error', - message: 'Gmail sync error', - error: error instanceof Error ? error.message : String(error), - }); - await serviceLogger.log({ - type: 'run_complete', - service: run!.service, - runId: run!.runId, - level: 'error', - message: 'Gmail sync failed', - durationMs: Date.now() - run!.startedAt, - outcome: 'error', - }); - } -} - export async function init() { console.log("Starting Gmail Sync (TS)..."); console.log(`Will sync every ${SYNC_INTERVAL_MS / 1000} seconds.`); while (true) { try { - const composioMode = await useComposioForGoogle(); - if (composioMode) { - const isConnected = composioAccountsRepo.isConnected('gmail'); - if (!isConnected) { - console.log('[Gmail] Gmail not connected via Composio. Sleeping...'); - } else { - await performSyncComposio(); - } + const hasCredentials = await GoogleClientFactory.hasValidCredentials(REQUIRED_SCOPE); + if (!hasCredentials) { + console.log("Google OAuth credentials not available or missing required Gmail scope. Sleeping..."); } else { - // Check if credentials are available with required scopes - const hasCredentials = await GoogleClientFactory.hasValidCredentials(REQUIRED_SCOPE); - - if (!hasCredentials) { - console.log("Google OAuth credentials not available or missing required Gmail scope. Sleeping..."); - } else { - // Perform one sync - await performSync(); - } + await performSync(); } } catch (error) { console.error("Error in main loop:", error); diff --git a/apps/x/packages/core/src/migrations/composio-google-migration.ts b/apps/x/packages/core/src/migrations/composio-google-migration.ts new file mode 100644 index 00000000..3e8f699d --- /dev/null +++ b/apps/x/packages/core/src/migrations/composio-google-migration.ts @@ -0,0 +1,132 @@ +import fs from 'fs'; +import path from 'path'; +import { z } from 'zod'; +import { WorkDir } from '../config/config.js'; +import { isSignedIn } from '../account/account.js'; +import { composioAccountsRepo } from '../composio/repo.js'; +import { deleteConnectedAccount } from '../composio/client.js'; +import container from '../di/container.js'; +import { IOAuthRepo } from '../auth/repo.js'; + +/** + * One-time migration that moves Composio-connected Gmail/Calendar users + * to the native rowboat-mode Google OAuth flow. + * + * Triggered by the renderer on app launch and after Rowboat sign-in. The + * single guard is `dismissed_at` in the migration state file — once set, + * none of the migration's side effects run again. This protects users who + * later re-add Composio Google for non-sync purposes (e.g. a tool that + * happens to use the Gmail toolkit) from having that connection blown + * away on a future launch. + */ + +const STATE_FILE = path.join(WorkDir, 'config', 'composio-google-migration.json'); + +const ZState = z.object({ + dismissed_at: z.string().min(1).optional(), +}); +type State = z.infer; + +function loadState(): State { + try { + if (fs.existsSync(STATE_FILE)) { + const raw = fs.readFileSync(STATE_FILE, 'utf-8'); + return ZState.parse(JSON.parse(raw)); + } + } catch (error) { + console.error('[composio-google-migration] failed to load state:', error); + } + return {}; +} + +function saveState(state: State): void { + const dir = path.dirname(STATE_FILE); + if (!fs.existsSync(dir)) fs.mkdirSync(dir, { recursive: true }); + fs.writeFileSync(STATE_FILE, JSON.stringify(state, null, 2)); +} + +function markDismissed(): void { + saveState({ dismissed_at: new Date().toISOString() }); +} + +async function disconnectComposioGoogle(): Promise { + for (const slug of ['gmail', 'googlecalendar'] as const) { + const account = composioAccountsRepo.getAccount(slug); + if (!account?.id) continue; + + try { + await deleteConnectedAccount(account.id); + console.log(`[composio-google-migration] composio: deleted ${slug} (${account.id})`); + } catch (error) { + // Best-effort — logged but doesn't block the local cleanup. + console.warn(`[composio-google-migration] composio delete failed for ${slug}:`, error); + } + + try { + composioAccountsRepo.deleteAccount(slug); + } catch (error) { + console.warn(`[composio-google-migration] local delete failed for ${slug}:`, error); + } + } +} + +function cleanupCalendarComposioState(): void { + const file = path.join(WorkDir, 'calendar_sync', 'composio_state.json'); + try { + if (fs.existsSync(file)) { + fs.unlinkSync(file); + console.log('[composio-google-migration] removed stale calendar composio_state.json'); + } + } catch (error) { + console.warn('[composio-google-migration] failed to remove composio_state.json:', error); + } +} + +/** + * Check whether the user qualifies for the migration. If they do, atomically + * mark `dismissed_at`, fire-and-forget the Composio disconnect, and return + * `{shouldShow: true}` so the renderer can show the modal. + * + * Idempotent: subsequent calls return `{shouldShow: false}` once `dismissed_at` + * is set, regardless of whether the modal was actually shown or the user + * completed the OAuth flow. + */ +export async function qualifyAndDisconnectComposioGoogle(): Promise<{ shouldShow: boolean }> { + // Rule 4 — already processed + const state = loadState(); + if (state.dismissed_at) { + return { shouldShow: false }; + } + + // Rule 1 — must be signed in to Rowboat + if (!(await isSignedIn())) { + return { shouldShow: false }; + } + + // Rule 3 — already on native rowboat-mode Google → silently mark dismissed + // (so we stop re-checking) and bail before touching Composio state. + const oauthRepo = container.resolve('oauthRepo'); + const googleConnection = await oauthRepo.read('google'); + if (googleConnection.tokens && googleConnection.mode === 'rowboat') { + markDismissed(); + return { shouldShow: false }; + } + + // Rule 2 — must have at least one Composio Google toolkit connected + const hasGmail = composioAccountsRepo.isConnected('gmail'); + const hasCalendar = composioAccountsRepo.isConnected('googlecalendar'); + if (!hasGmail && !hasCalendar) { + return { shouldShow: false }; + } + + // All rules pass. Mark dismissed atomically before any side effects so + // a crash mid-migration leaves us in a deterministic post-migration state. + markDismissed(); + + // Fire-and-forget: disconnect Composio Google + clean up the stale + // calendar state file. Both are best-effort. + void disconnectComposioGoogle(); + cleanupCalendarComposioState(); + + return { shouldShow: true }; +} diff --git a/apps/x/packages/shared/src/ipc.ts b/apps/x/packages/shared/src/ipc.ts index ab7d7f73..605b26d9 100644 --- a/apps/x/packages/shared/src/ipc.ts +++ b/apps/x/packages/shared/src/ipc.ts @@ -429,16 +429,10 @@ const ipcSchemas = { toolkits: z.array(z.string()), }), }, - 'composio:use-composio-for-google': { + 'migration:check-composio-google': { req: z.null(), res: z.object({ - enabled: z.boolean(), - }), - }, - 'composio:use-composio-for-google-calendar': { - req: z.null(), - res: z.object({ - enabled: z.boolean(), + shouldShow: z.boolean(), }), }, 'composio:didConnect': {