mirror of
https://github.com/rowboatlabs/rowboat.git
synced 2026-05-11 00:02:38 +02:00
feat: native google sign-in for signed-in users
Signed-in users can now connect Gmail and Calendar directly through Rowboat instead of going through Composio. Cleaner connection, no third-party in the data path. How it works: - Click "Connect Google" anywhere it appears (sidebar, onboarding, settings) and the system browser opens to a Rowboat-hosted page. Authorize Google there and the app picks up the connection automatically — no client id or secret to paste. - Token refresh happens through Rowboat's backend, so Google credentials never need to live on the user's machine. - Disconnect cleanly revokes access on Google's side too. Migration for existing Composio users: - A one-time modal explains that we've moved off Composio and asks the user to reconnect Google directly. - Their old Composio Gmail / Calendar connections are disconnected automatically when the modal first appears. - All previously-synced emails and calendar events are preserved on disk — the new connection picks up where Composio left off rather than re-downloading the last week from scratch. - "I'll do this later" dismisses the modal permanently; the user can still reconnect anytime via the connectors UI. (Sync stops in the meantime; nothing is deleted.) Other coverage: - BYOK mode (users who paste their own Google client id + secret) is unchanged — same modal, same local OAuth flow, same behavior. - Composio integrations for non-Google services (Slack, Linear, etc.) are unaffected. Only the Gmail and Calendar paths moved. - The "Connect Google" button label and connection state now apply uniformly to Gmail + Calendar (one OAuth grant covers both). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
a76f8bae14
commit
d4850dace7
20 changed files with 780 additions and 904 deletions
|
|
@ -293,20 +293,6 @@ export function listConnected(): { toolkits: string[] } {
|
|||
return { toolkits: composioAccountsRepo.getConnectedToolkits() };
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if Composio should be used for Google services (Gmail, etc.)
|
||||
*/
|
||||
export async function useComposioForGoogle(): Promise<{ enabled: boolean }> {
|
||||
return { enabled: await composioClient.useComposioForGoogle() };
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if Composio should be used for Google Calendar
|
||||
*/
|
||||
export async function useComposioForGoogleCalendar(): Promise<{ enabled: boolean }> {
|
||||
return { enabled: await composioClient.useComposioForGoogleCalendar() };
|
||||
}
|
||||
|
||||
/**
|
||||
* List available Composio toolkits — filtered to curated list only.
|
||||
* Return type matches the ZToolkit schema from core/composio/types.ts.
|
||||
|
|
|
|||
|
|
@ -28,12 +28,19 @@ export function extractDeepLinkFromArgv(argv: readonly string[]): string | null
|
|||
}
|
||||
|
||||
/**
|
||||
* Dispatch any rowboat:// URL — chooses navigation vs action automatically.
|
||||
* Use this from notification click handlers and other URL entry points.
|
||||
* Dispatch any rowboat:// URL — chooses among action / oauth-completion /
|
||||
* navigation automatically. Use this from notification click handlers and
|
||||
* other URL entry points.
|
||||
*
|
||||
* OAuth completion (rowboat://oauth/google/done?session=<state>) is handled
|
||||
* in main, not the renderer, because claiming tokens writes oauth.json and
|
||||
* triggers sync — both main-process concerns.
|
||||
*/
|
||||
export function dispatchUrl(url: string): void {
|
||||
if (parseAction(url)) {
|
||||
void dispatchAction(url);
|
||||
} else if (parseOAuthCompletion(url)) {
|
||||
void dispatchOAuthCompletion(url);
|
||||
} else {
|
||||
dispatchDeepLink(url);
|
||||
}
|
||||
|
|
@ -111,6 +118,46 @@ async function handleTakeMeetingNotes(eventId: string, openMeeting: boolean): Pr
|
|||
win.webContents.send("app:takeMeetingNotes", payload);
|
||||
}
|
||||
|
||||
// --- OAuth completion (rowboat-mode Google connect) ---
|
||||
|
||||
interface OAuthCompletion {
|
||||
provider: "google";
|
||||
state: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Match rowboat://oauth/google/done?session=<state>. Returns null for
|
||||
* anything else — including paths with the right shape but wrong provider
|
||||
* or a missing `session` query param.
|
||||
*/
|
||||
function parseOAuthCompletion(url: string): OAuthCompletion | null {
|
||||
if (!url.startsWith(URL_PREFIX)) return null;
|
||||
const rest = url.slice(URL_PREFIX.length);
|
||||
const queryIdx = rest.indexOf("?");
|
||||
const path = queryIdx >= 0 ? rest.slice(0, queryIdx) : rest;
|
||||
const parts = path.split("/").filter(Boolean);
|
||||
if (parts.length !== 3 || parts[0] !== "oauth" || parts[2] !== "done") return null;
|
||||
if (parts[1] !== "google") return null;
|
||||
const params = new URLSearchParams(queryIdx >= 0 ? rest.slice(queryIdx + 1) : "");
|
||||
const state = params.get("session");
|
||||
return state ? { provider: "google", state } : null;
|
||||
}
|
||||
|
||||
async function dispatchOAuthCompletion(url: string): Promise<void> {
|
||||
const parsed = parseOAuthCompletion(url);
|
||||
if (!parsed) return;
|
||||
|
||||
// Bring the app to the front so the renderer can react to the
|
||||
// oauthEvent IPC that completeRowboatGoogleConnect emits.
|
||||
const win = mainWindowRef;
|
||||
if (win && !win.isDestroyed()) focusWindow(win);
|
||||
|
||||
// Lazy-import to keep deeplink.ts free of OAuth deps and avoid a
|
||||
// potential circular dep with oauth-handler.ts.
|
||||
const { completeRowboatGoogleConnect } = await import("./oauth-handler.js");
|
||||
await completeRowboatGoogleConnect(parsed.state);
|
||||
}
|
||||
|
||||
function focusWindow(win: BrowserWindow): void {
|
||||
if (win.isMinimized()) win.restore();
|
||||
win.show();
|
||||
|
|
|
|||
|
|
@ -35,6 +35,7 @@ import { ISlackConfigRepo } from '@x/core/dist/slack/repo.js';
|
|||
import { isOnboardingComplete, markOnboardingComplete } from '@x/core/dist/config/note_creation_config.js';
|
||||
import * as composioHandler from './composio-handler.js';
|
||||
import { consumePendingDeepLink } from './deeplink.js';
|
||||
import { qualifyAndDisconnectComposioGoogle } from '@x/core/dist/migrations/composio-google-migration.js';
|
||||
import { IAgentScheduleRepo } from '@x/core/dist/agent-schedule/repo.js';
|
||||
import { IAgentScheduleStateRepo } from '@x/core/dist/agent-schedule/state-repo.js';
|
||||
import { triggerRun as triggerAgentScheduleRun } from '@x/core/dist/agent-schedule/runner.js';
|
||||
|
|
@ -612,11 +613,8 @@ export function setupIpcHandlers() {
|
|||
'composio:list-toolkits': async () => {
|
||||
return composioHandler.listToolkits();
|
||||
},
|
||||
'composio:use-composio-for-google': async () => {
|
||||
return composioHandler.useComposioForGoogle();
|
||||
},
|
||||
'composio:use-composio-for-google-calendar': async () => {
|
||||
return composioHandler.useComposioForGoogleCalendar();
|
||||
'migration:check-composio-google': async () => {
|
||||
return qualifyAndDisconnectComposioGoogle();
|
||||
},
|
||||
// Agent schedule handlers
|
||||
'agent-schedule:getConfig': async () => {
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@ import { ElectronBrowserControlService } from "./browser/control-service.js";
|
|||
import { ElectronNotificationService } from "./notification/electron-notification-service.js";
|
||||
import {
|
||||
DEEP_LINK_SCHEME,
|
||||
dispatchDeepLink,
|
||||
dispatchUrl,
|
||||
extractDeepLinkFromArgv,
|
||||
setMainWindowForDeepLinks,
|
||||
} from "./deeplink.js";
|
||||
|
|
@ -77,19 +77,19 @@ if (process.defaultApp) {
|
|||
// First-launch URL on Windows/Linux comes through argv.
|
||||
{
|
||||
const initialUrl = extractDeepLinkFromArgv(process.argv);
|
||||
if (initialUrl) dispatchDeepLink(initialUrl);
|
||||
if (initialUrl) dispatchUrl(initialUrl);
|
||||
}
|
||||
|
||||
// macOS sends URLs via 'open-url' (both first launch and while running).
|
||||
app.on("open-url", (event, url) => {
|
||||
event.preventDefault();
|
||||
dispatchDeepLink(url);
|
||||
dispatchUrl(url);
|
||||
});
|
||||
|
||||
// Subsequent launches on Windows/Linux land here via the single-instance lock.
|
||||
app.on("second-instance", (_event, argv) => {
|
||||
const url = extractDeepLinkFromArgv(argv);
|
||||
if (url) dispatchDeepLink(url);
|
||||
if (url) dispatchUrl(url);
|
||||
});
|
||||
|
||||
// Fix PATH for packaged Electron apps on macOS/Linux.
|
||||
|
|
|
|||
|
|
@ -13,6 +13,9 @@ import { triggerSync as triggerFirefliesSync } from '@x/core/dist/knowledge/sync
|
|||
import { emitOAuthEvent } from './ipc.js';
|
||||
import { getBillingInfo } from '@x/core/dist/billing/billing.js';
|
||||
import { capture as analyticsCapture, identify as analyticsIdentify, reset as analyticsReset } from '@x/core/dist/analytics/posthog.js';
|
||||
import { isSignedIn } from '@x/core/dist/account/account.js';
|
||||
import { getWebappUrl } from '@x/core/dist/config/remote-config.js';
|
||||
import { claimTokensViaBackend } from '@x/core/dist/auth/google-backend-oauth.js';
|
||||
|
||||
const REDIRECT_URI = 'http://localhost:8080/oauth/callback';
|
||||
|
||||
|
|
@ -201,6 +204,23 @@ export async function connectProvider(provider: string, credentials?: { clientId
|
|||
|
||||
if (provider === 'google') {
|
||||
if (!credentials?.clientId || !credentials?.clientSecret) {
|
||||
// No credentials → rowboat mode if the user is signed in to Rowboat
|
||||
// (we use the company-owned Google client via the api + webapp).
|
||||
// Otherwise it's BYOK with missing creds → error.
|
||||
if (await isSignedIn()) {
|
||||
try {
|
||||
const webappUrl = await getWebappUrl();
|
||||
await shell.openExternal(`${webappUrl}/oauth/google/start`);
|
||||
console.log('[OAuth] Started rowboat-mode Google connect (browser opened to webapp)');
|
||||
return { success: true };
|
||||
} catch (error) {
|
||||
console.error('[OAuth] Failed to start rowboat-mode Google connect:', error);
|
||||
return {
|
||||
success: false,
|
||||
error: error instanceof Error ? error.message : 'Failed to open browser',
|
||||
};
|
||||
}
|
||||
}
|
||||
return { success: false, error: 'Google client ID and client secret are required to connect.' };
|
||||
}
|
||||
}
|
||||
|
|
@ -257,11 +277,15 @@ export async function connectProvider(provider: string, credentials?: { clientId
|
|||
state
|
||||
);
|
||||
|
||||
// Save tokens and credentials
|
||||
// Save tokens and credentials. For Google, BYOK is the only path
|
||||
// that reaches this token exchange (rowboat path returns above
|
||||
// before any local server runs); stamp mode: 'byok' so a future
|
||||
// refresh / reconnect can't get confused with a rowboat entry.
|
||||
console.log(`[OAuth] Token exchange successful for ${provider}`);
|
||||
await oauthRepo.upsert(provider, {
|
||||
tokens,
|
||||
...(credentials ? { clientId: credentials.clientId, clientSecret: credentials.clientSecret } : {}),
|
||||
...(provider === 'google' ? { mode: 'byok' as const } : {}),
|
||||
error: null,
|
||||
});
|
||||
|
||||
|
|
@ -358,12 +382,65 @@ export async function connectProvider(provider: string, credentials?: { clientId
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Complete a rowboat-mode Google connect: claim the tokens parked under
|
||||
* `state` by the webapp callback, persist them locally, and trigger sync.
|
||||
*
|
||||
* Called by the deep-link dispatcher (deeplink.ts) when the OS hands us a
|
||||
* rowboat://oauth/google/done?session=<state> URL.
|
||||
*/
|
||||
export async function completeRowboatGoogleConnect(state: string): Promise<void> {
|
||||
try {
|
||||
console.log('[OAuth] Claiming rowboat-mode Google tokens...');
|
||||
const tokens = await claimTokensViaBackend(state);
|
||||
const oauthRepo = getOAuthRepo();
|
||||
await oauthRepo.upsert('google', {
|
||||
tokens,
|
||||
mode: 'rowboat',
|
||||
// Explicitly null these — no client_id/secret on the desktop in this mode.
|
||||
clientId: null,
|
||||
clientSecret: null,
|
||||
error: null,
|
||||
});
|
||||
triggerGmailSync();
|
||||
triggerCalendarSync();
|
||||
emitOAuthEvent({ provider: 'google', success: true });
|
||||
console.log('[OAuth] Rowboat-mode Google connect complete');
|
||||
} catch (error) {
|
||||
console.error('[OAuth] Failed to complete rowboat-mode Google connect:', error);
|
||||
emitOAuthEvent({
|
||||
provider: 'google',
|
||||
success: false,
|
||||
error: error instanceof Error ? error.message : 'Failed to claim Google tokens',
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnect a provider (clear tokens)
|
||||
*/
|
||||
export async function disconnectProvider(provider: string): Promise<{ success: boolean }> {
|
||||
try {
|
||||
const oauthRepo = getOAuthRepo();
|
||||
|
||||
// For rowboat-mode Google, best-effort revoke at Google before clearing
|
||||
// local state. Google's revoke endpoint accepts an unauthenticated POST
|
||||
// with the access_token; failure is logged but doesn't block disconnect.
|
||||
if (provider === 'google') {
|
||||
const connection = await oauthRepo.read(provider);
|
||||
if (connection.mode === 'rowboat' && connection.tokens?.access_token) {
|
||||
try {
|
||||
const revokeUrl = `https://oauth2.googleapis.com/revoke?token=${encodeURIComponent(connection.tokens.access_token)}`;
|
||||
const res = await fetch(revokeUrl, { method: 'POST' });
|
||||
if (!res.ok) {
|
||||
console.warn(`[OAuth] Google revoke returned ${res.status}; continuing with local disconnect`);
|
||||
}
|
||||
} catch (error) {
|
||||
console.warn('[OAuth] Google revoke failed; continuing with local disconnect:', error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
await oauthRepo.delete(provider);
|
||||
if (provider === 'rowboat') {
|
||||
analyticsCapture('user_signed_out');
|
||||
|
|
|
|||
|
|
@ -56,6 +56,7 @@ import { stripKnowledgePrefix, toKnowledgePath, wikiLabel } from '@/lib/wiki-lin
|
|||
import { splitFrontmatter, joinFrontmatter } from '@/lib/frontmatter'
|
||||
import { extractConferenceLink } from '@/lib/calendar-event'
|
||||
import { OnboardingModal } from '@/components/onboarding'
|
||||
import { ComposioGoogleMigrationModal } from '@/components/composio-google-migration-modal'
|
||||
import { CommandPalette, type CommandPaletteContext, type CommandPaletteMention } from '@/components/search-dialog'
|
||||
import { TrackModal } from '@/components/track-modal'
|
||||
import { BackgroundTaskDetail } from '@/components/background-task-detail'
|
||||
|
|
@ -780,6 +781,30 @@ function App() {
|
|||
return cleanup
|
||||
}, [refreshVoiceAvailability])
|
||||
|
||||
// One-time Composio→native Google migration check. Runs on mount and again
|
||||
// after the user signs in to Rowboat (so we catch users who weren't signed
|
||||
// in at startup). The IPC is idempotent — once `dismissed_at` is set on the
|
||||
// main side, every subsequent call returns `{shouldShow: false}`.
|
||||
useEffect(() => {
|
||||
const run = async () => {
|
||||
try {
|
||||
const result = await window.ipc.invoke('migration:check-composio-google', null)
|
||||
if (result.shouldShow) {
|
||||
setShowComposioGoogleMigration(true)
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('[migration] check-composio-google failed:', error)
|
||||
}
|
||||
}
|
||||
void run()
|
||||
const cleanup = window.ipc.on('oauth:didConnect', (event) => {
|
||||
if (event.provider === 'rowboat' && event.success) {
|
||||
void run()
|
||||
}
|
||||
})
|
||||
return cleanup
|
||||
}, [])
|
||||
|
||||
const handleStartRecording = useCallback(() => {
|
||||
setIsRecording(true)
|
||||
isRecordingRef.current = true
|
||||
|
|
@ -1033,6 +1058,9 @@ function App() {
|
|||
// Onboarding state
|
||||
const [showOnboarding, setShowOnboarding] = useState(false)
|
||||
|
||||
// One-time Composio→native Google migration modal
|
||||
const [showComposioGoogleMigration, setShowComposioGoogleMigration] = useState(false)
|
||||
|
||||
// Search state
|
||||
const [isSearchOpen, setIsSearchOpen] = useState(false)
|
||||
|
||||
|
|
@ -4904,6 +4932,17 @@ function App() {
|
|||
open={showOnboarding}
|
||||
onComplete={handleOnboardingComplete}
|
||||
/>
|
||||
<ComposioGoogleMigrationModal
|
||||
open={showComposioGoogleMigration}
|
||||
onOpenChange={setShowComposioGoogleMigration}
|
||||
onReconnect={() => {
|
||||
// Trigger the rowboat-mode Google connect flow. With no credentials
|
||||
// and the user signed in to Rowboat, the main process opens the
|
||||
// webapp `/oauth/google/start` URL. The deep link returns and
|
||||
// completeRowboatGoogleConnect persists the tokens.
|
||||
void window.ipc.invoke('oauth:connect', { provider: 'google' })
|
||||
}}
|
||||
/>
|
||||
<Dialog open={showMeetingPermissions} onOpenChange={setShowMeetingPermissions}>
|
||||
<DialogContent showCloseButton={false}>
|
||||
<DialogHeader>
|
||||
|
|
|
|||
|
|
@ -0,0 +1,81 @@
|
|||
"use client"
|
||||
|
||||
import {
|
||||
Dialog,
|
||||
DialogContent,
|
||||
DialogDescription,
|
||||
DialogHeader,
|
||||
DialogTitle,
|
||||
} from "@/components/ui/dialog"
|
||||
import { Button } from "@/components/ui/button"
|
||||
|
||||
interface ComposioGoogleMigrationModalProps {
|
||||
open: boolean
|
||||
onOpenChange: (open: boolean) => void
|
||||
onReconnect: () => void
|
||||
}
|
||||
|
||||
/**
|
||||
* One-time modal shown to signed-in users who had Gmail/Calendar connected
|
||||
* via Composio before the native rowboat-mode OAuth flow shipped. By the
|
||||
* time this opens, the Composio Google accounts have already been
|
||||
* disconnected (fire-and-forget, on the qualification IPC) — the modal
|
||||
* just explains what happened and offers a one-click reconnect.
|
||||
*
|
||||
* Both buttons close the modal. The qualification IPC marks the migration
|
||||
* as dismissed before showing this, so neither button needs a follow-up
|
||||
* IPC of its own.
|
||||
*/
|
||||
export function ComposioGoogleMigrationModal({
|
||||
open,
|
||||
onOpenChange,
|
||||
onReconnect,
|
||||
}: ComposioGoogleMigrationModalProps) {
|
||||
return (
|
||||
<Dialog open={open} onOpenChange={onOpenChange}>
|
||||
<DialogContent className="w-[min(28rem,calc(100%-2rem))] max-w-md p-0 gap-0 overflow-hidden rounded-xl">
|
||||
<div className="p-6 pb-0">
|
||||
<DialogHeader className="space-y-1.5">
|
||||
<DialogTitle className="text-lg font-semibold">
|
||||
Reconnect Google to keep syncing
|
||||
</DialogTitle>
|
||||
<DialogDescription asChild>
|
||||
<div className="space-y-3 text-sm leading-relaxed">
|
||||
<p>
|
||||
Rowboat used to sync your Gmail and Calendar through{" "}
|
||||
<span className="font-medium text-foreground">Composio</span>, a
|
||||
third-party connector. We've now built a direct connection to
|
||||
Google — it's faster, more private, and doesn't rely on a
|
||||
middleman.
|
||||
</p>
|
||||
<p>
|
||||
We've disconnected the Composio connection. Reconnect Google
|
||||
directly to resume syncing — your existing emails and calendar
|
||||
events stay exactly where they are.
|
||||
</p>
|
||||
</div>
|
||||
</DialogDescription>
|
||||
</DialogHeader>
|
||||
</div>
|
||||
<div className="flex justify-end gap-2 px-6 py-4 mt-6 border-t bg-muted/30">
|
||||
<Button
|
||||
variant="ghost"
|
||||
size="sm"
|
||||
onClick={() => onOpenChange(false)}
|
||||
>
|
||||
I'll do this later
|
||||
</Button>
|
||||
<Button
|
||||
size="sm"
|
||||
onClick={() => {
|
||||
onReconnect()
|
||||
onOpenChange(false)
|
||||
}}
|
||||
>
|
||||
Reconnect Google
|
||||
</Button>
|
||||
</div>
|
||||
</DialogContent>
|
||||
</Dialog>
|
||||
)
|
||||
}
|
||||
|
|
@ -96,14 +96,14 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) {
|
|||
const [slackDiscovering, setSlackDiscovering] = useState(false)
|
||||
const [slackDiscoverError, setSlackDiscoverError] = useState<string | null>(null)
|
||||
|
||||
// Composio/Gmail state
|
||||
const [useComposioForGoogle, setUseComposioForGoogle] = useState(false)
|
||||
// Composio Gmail/Calendar sync was removed — flags are seeded false and
|
||||
// never flipped. Kept here so legacy gating expressions still type-check.
|
||||
const [useComposioForGoogle] = useState(false)
|
||||
const [gmailConnected, setGmailConnected] = useState(false)
|
||||
const [gmailLoading, setGmailLoading] = useState(true)
|
||||
const [gmailConnecting, setGmailConnecting] = useState(false)
|
||||
|
||||
// Composio/Google Calendar state
|
||||
const [useComposioForGoogleCalendar, setUseComposioForGoogleCalendar] = useState(false)
|
||||
const [useComposioForGoogleCalendar] = useState(false)
|
||||
const [googleCalendarConnected, setGoogleCalendarConnected] = useState(false)
|
||||
const [googleCalendarLoading, setGoogleCalendarLoading] = useState(true)
|
||||
const [googleCalendarConnecting, setGoogleCalendarConnecting] = useState(false)
|
||||
|
|
@ -151,25 +151,8 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) {
|
|||
setProvidersLoading(false)
|
||||
}
|
||||
}
|
||||
async function loadComposioForGoogleFlag() {
|
||||
try {
|
||||
const result = await window.ipc.invoke('composio:use-composio-for-google', null)
|
||||
setUseComposioForGoogle(result.enabled)
|
||||
} catch (error) {
|
||||
console.error('Failed to check composio-for-google flag:', error)
|
||||
}
|
||||
}
|
||||
async function loadComposioForGoogleCalendarFlag() {
|
||||
try {
|
||||
const result = await window.ipc.invoke('composio:use-composio-for-google-calendar', null)
|
||||
setUseComposioForGoogleCalendar(result.enabled)
|
||||
} catch (error) {
|
||||
console.error('Failed to check composio-for-google-calendar flag:', error)
|
||||
}
|
||||
}
|
||||
// (Composio Gmail/Calendar flag fetches removed — sync was deleted.)
|
||||
loadProviders()
|
||||
loadComposioForGoogleFlag()
|
||||
loadComposioForGoogleCalendarFlag()
|
||||
}, [open])
|
||||
|
||||
// Load LLM models catalog on open
|
||||
|
|
@ -622,12 +605,20 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) {
|
|||
// Connect to a provider
|
||||
const handleConnect = useCallback(async (provider: string) => {
|
||||
if (provider === 'google') {
|
||||
// Signed-in users use the rowboat (managed-credentials) flow: opens
|
||||
// the webapp in the browser, no BYOK modal. Falls back to BYOK modal
|
||||
// for not-signed-in users. (Mirrors useConnectors.handleConnect.)
|
||||
const isSignedIntoRowboat = providerStates.rowboat?.isConnected ?? false
|
||||
if (isSignedIntoRowboat) {
|
||||
await startConnect('google')
|
||||
return
|
||||
}
|
||||
setGoogleClientIdOpen(true)
|
||||
return
|
||||
}
|
||||
|
||||
await startConnect(provider)
|
||||
}, [startConnect])
|
||||
}, [startConnect, providerStates])
|
||||
|
||||
const handleGoogleClientIdSubmit = useCallback((clientId: string, clientSecret: string) => {
|
||||
setGoogleCredentials(clientId, clientSecret)
|
||||
|
|
|
|||
|
|
@ -66,16 +66,16 @@ export function useOnboardingState(open: boolean, onComplete: () => void) {
|
|||
// Inline upsell callout dismissed
|
||||
const [upsellDismissed, setUpsellDismissed] = useState(false)
|
||||
|
||||
// Composio/Gmail state (used when signed in with Rowboat account)
|
||||
const [useComposioForGoogle, setUseComposioForGoogle] = useState(false)
|
||||
// Composio Gmail/Calendar sync was removed — flags are seeded false and
|
||||
// never flipped. Kept here so legacy gating expressions still type-check.
|
||||
const [useComposioForGoogle] = useState(false)
|
||||
const [gmailConnected, setGmailConnected] = useState(false)
|
||||
const [gmailLoading, setGmailLoading] = useState(true)
|
||||
const [gmailConnecting, setGmailConnecting] = useState(false)
|
||||
const [composioApiKeyOpen, setComposioApiKeyOpen] = useState(false)
|
||||
const [composioApiKeyTarget, setComposioApiKeyTarget] = useState<'slack' | 'gmail'>('gmail')
|
||||
|
||||
// Composio/Google Calendar state
|
||||
const [useComposioForGoogleCalendar, setUseComposioForGoogleCalendar] = useState(false)
|
||||
const [useComposioForGoogleCalendar] = useState(false)
|
||||
const [googleCalendarConnected, setGoogleCalendarConnected] = useState(false)
|
||||
const [googleCalendarLoading, setGoogleCalendarLoading] = useState(true)
|
||||
const [googleCalendarConnecting, setGoogleCalendarConnecting] = useState(false)
|
||||
|
|
@ -123,25 +123,8 @@ export function useOnboardingState(open: boolean, onComplete: () => void) {
|
|||
setProvidersLoading(false)
|
||||
}
|
||||
}
|
||||
async function loadComposioForGoogleFlag() {
|
||||
try {
|
||||
const result = await window.ipc.invoke('composio:use-composio-for-google', null)
|
||||
setUseComposioForGoogle(result.enabled)
|
||||
} catch (error) {
|
||||
console.error('Failed to check composio-for-google flag:', error)
|
||||
}
|
||||
}
|
||||
async function loadComposioForGoogleCalendarFlag() {
|
||||
try {
|
||||
const result = await window.ipc.invoke('composio:use-composio-for-google-calendar', null)
|
||||
setUseComposioForGoogleCalendar(result.enabled)
|
||||
} catch (error) {
|
||||
console.error('Failed to check composio-for-google-calendar flag:', error)
|
||||
}
|
||||
}
|
||||
// (Composio Gmail/Calendar flag fetches removed — sync was deleted; flags stay false.)
|
||||
loadProviders()
|
||||
loadComposioForGoogleFlag()
|
||||
loadComposioForGoogleCalendarFlag()
|
||||
}, [open])
|
||||
|
||||
// Load LLM models catalog on open
|
||||
|
|
@ -539,17 +522,7 @@ export function useOnboardingState(open: boolean, onComplete: () => void) {
|
|||
|
||||
const cleanup = window.ipc.on('oauth:didConnect', async (event) => {
|
||||
if (event.provider === 'rowboat' && event.success) {
|
||||
// Re-check composio flags now that the account is connected
|
||||
try {
|
||||
const [googleResult, calendarResult] = await Promise.all([
|
||||
window.ipc.invoke('composio:use-composio-for-google', null),
|
||||
window.ipc.invoke('composio:use-composio-for-google-calendar', null),
|
||||
])
|
||||
setUseComposioForGoogle(googleResult.enabled)
|
||||
setUseComposioForGoogleCalendar(calendarResult.enabled)
|
||||
} catch (error) {
|
||||
console.error('Failed to re-check composio flags:', error)
|
||||
}
|
||||
// (Composio Gmail/Calendar flag re-check removed — sync was deleted.)
|
||||
setCurrentStep(2) // Go to Connect Accounts
|
||||
}
|
||||
})
|
||||
|
|
@ -609,12 +582,20 @@ export function useOnboardingState(open: boolean, onComplete: () => void) {
|
|||
// Connect to a provider
|
||||
const handleConnect = useCallback(async (provider: string) => {
|
||||
if (provider === 'google') {
|
||||
// Signed-in users use the rowboat (managed-credentials) flow: opens
|
||||
// the webapp in the browser, no BYOK modal. Falls back to BYOK modal
|
||||
// for not-signed-in users. (Mirrors useConnectors.handleConnect.)
|
||||
const isSignedIntoRowboat = providerStates.rowboat?.isConnected ?? false
|
||||
if (isSignedIntoRowboat) {
|
||||
await startConnect('google')
|
||||
return
|
||||
}
|
||||
setGoogleClientIdOpen(true)
|
||||
return
|
||||
}
|
||||
|
||||
await startConnect(provider)
|
||||
}, [startConnect])
|
||||
}, [startConnect, providerStates])
|
||||
|
||||
const handleGoogleClientIdSubmit = useCallback((clientId: string, clientSecret: string) => {
|
||||
setGoogleCredentials(clientId, clientSecret)
|
||||
|
|
|
|||
|
|
@ -38,16 +38,21 @@ export function useConnectors(active: boolean) {
|
|||
const [slackDiscovering, setSlackDiscovering] = useState(false)
|
||||
const [slackDiscoverError, setSlackDiscoverError] = useState<string | null>(null)
|
||||
|
||||
// Composio/Gmail state
|
||||
const [useComposioForGoogle, setUseComposioForGoogle] = useState(false)
|
||||
// Composio Gmail/Calendar sync was removed. These flags are seeded false
|
||||
// and never flipped — the IPC that used to set them is gone. The setters
|
||||
// remain so the legacy Composio-Gmail handlers below still type-check,
|
||||
// but those handlers are no longer reachable in the UI (the gating
|
||||
// condition `useComposioForGoogle` stays false).
|
||||
// TODO follow-up: drop these flags entirely and prune the dead UI branches
|
||||
// in connectors-popover, connected-accounts-settings, and onboarding-modal.
|
||||
const [useComposioForGoogle] = useState(false)
|
||||
const [gmailConnected, setGmailConnected] = useState(false)
|
||||
const [gmailLoading, setGmailLoading] = useState(true)
|
||||
const [gmailLoading, setGmailLoading] = useState(false)
|
||||
const [gmailConnecting, setGmailConnecting] = useState(false)
|
||||
|
||||
// Composio/Google Calendar state
|
||||
const [useComposioForGoogleCalendar, setUseComposioForGoogleCalendar] = useState(false)
|
||||
const [useComposioForGoogleCalendar] = useState(false)
|
||||
const [googleCalendarConnected, setGoogleCalendarConnected] = useState(false)
|
||||
const [googleCalendarLoading, setGoogleCalendarLoading] = useState(true)
|
||||
const [googleCalendarLoading, setGoogleCalendarLoading] = useState(false)
|
||||
const [googleCalendarConnecting, setGoogleCalendarConnecting] = useState(false)
|
||||
|
||||
// Load available providers on mount
|
||||
|
|
@ -67,28 +72,7 @@ export function useConnectors(active: boolean) {
|
|||
loadProviders()
|
||||
}, [])
|
||||
|
||||
// Re-check composio-for-google flags when active
|
||||
useEffect(() => {
|
||||
if (!active) return
|
||||
async function loadComposioForGoogleFlag() {
|
||||
try {
|
||||
const result = await window.ipc.invoke('composio:use-composio-for-google', null)
|
||||
setUseComposioForGoogle(result.enabled)
|
||||
} catch (error) {
|
||||
console.error('Failed to check composio-for-google flag:', error)
|
||||
}
|
||||
}
|
||||
async function loadComposioForGoogleCalendarFlag() {
|
||||
try {
|
||||
const result = await window.ipc.invoke('composio:use-composio-for-google-calendar', null)
|
||||
setUseComposioForGoogleCalendar(result.enabled)
|
||||
} catch (error) {
|
||||
console.error('Failed to check composio-for-google-calendar flag:', error)
|
||||
}
|
||||
}
|
||||
loadComposioForGoogleFlag()
|
||||
loadComposioForGoogleCalendarFlag()
|
||||
}, [active])
|
||||
// (Composio Gmail/Calendar flag-check effect removed — flags are constant false now.)
|
||||
|
||||
// Load Granola config
|
||||
const refreshGranolaConfig = useCallback(async () => {
|
||||
|
|
@ -346,13 +330,22 @@ export function useConnectors(active: boolean) {
|
|||
|
||||
const handleConnect = useCallback(async (provider: string) => {
|
||||
if (provider === 'google') {
|
||||
// Signed-in users use the rowboat (managed-credentials) flow: opens
|
||||
// the webapp in the browser, no BYOK modal. Main process detects
|
||||
// signed-in via isSignedIn() when oauth:connect arrives without creds.
|
||||
// Falls back to the BYOK modal for not-signed-in users.
|
||||
const isSignedIntoRowboat = providerStates.rowboat?.isConnected ?? false
|
||||
if (isSignedIntoRowboat) {
|
||||
await startConnect('google')
|
||||
return
|
||||
}
|
||||
setGoogleClientIdDescription(undefined)
|
||||
setGoogleClientIdOpen(true)
|
||||
return
|
||||
}
|
||||
|
||||
await startConnect(provider)
|
||||
}, [startConnect])
|
||||
}, [startConnect, providerStates])
|
||||
|
||||
const handleGoogleClientIdSubmit = useCallback((clientId: string, clientSecret: string) => {
|
||||
setGoogleCredentials(clientId, clientSecret)
|
||||
|
|
@ -485,19 +478,6 @@ export function useConnectors(active: boolean) {
|
|||
toast.success(`Connected to ${displayName}`)
|
||||
}
|
||||
|
||||
if (provider === 'rowboat') {
|
||||
try {
|
||||
const [googleResult, calendarResult] = await Promise.all([
|
||||
window.ipc.invoke('composio:use-composio-for-google', null),
|
||||
window.ipc.invoke('composio:use-composio-for-google-calendar', null),
|
||||
])
|
||||
setUseComposioForGoogle(googleResult.enabled)
|
||||
setUseComposioForGoogleCalendar(calendarResult.enabled)
|
||||
} catch (err) {
|
||||
console.error('Failed to re-check composio flags:', err)
|
||||
}
|
||||
}
|
||||
|
||||
refreshAllStatuses()
|
||||
}
|
||||
})
|
||||
|
|
|
|||
113
apps/x/packages/core/src/auth/google-backend-oauth.ts
Normal file
113
apps/x/packages/core/src/auth/google-backend-oauth.ts
Normal file
|
|
@ -0,0 +1,113 @@
|
|||
import { API_URL } from "../config/env.js";
|
||||
import { getAccessToken } from "./tokens.js";
|
||||
import { OAuthTokens } from "./types.js";
|
||||
|
||||
/**
|
||||
* Client for the rowboat-mode Google OAuth endpoints on the api:
|
||||
* POST /v1/google-oauth/claim — one-shot retrieval of tokens parked by
|
||||
* the webapp callback under a `state` ticket
|
||||
* POST /v1/google-oauth/refresh — exchange a refresh_token for fresh tokens
|
||||
* (the secret-requiring step that can't
|
||||
* happen on the desktop)
|
||||
*
|
||||
* Both are called with the user's Rowboat Supabase bearer (via getAccessToken).
|
||||
*
|
||||
* The api response shape uses `scope: string` (space-delimited); we convert
|
||||
* to the desktop's `scopes: string[]`. On refresh, api may omit `scope` and
|
||||
* `refresh_token` — caller-provided existingScopes / refreshToken are
|
||||
* preserved in those cases (Google rarely rotates refresh tokens).
|
||||
*/
|
||||
|
||||
/** Thrown when the api signals the user must reconnect (Google `invalid_grant`). */
|
||||
export class ReconnectRequiredError extends Error {
|
||||
constructor(message: string) {
|
||||
super(message);
|
||||
this.name = "ReconnectRequiredError";
|
||||
}
|
||||
}
|
||||
|
||||
interface ApiTokenResponse {
|
||||
access_token: string;
|
||||
refresh_token?: string;
|
||||
expires_at: number;
|
||||
scope?: string;
|
||||
token_type?: string;
|
||||
}
|
||||
|
||||
function toOAuthTokens(
|
||||
body: ApiTokenResponse,
|
||||
fallbackRefreshToken: string | null = null,
|
||||
fallbackScopes?: string[],
|
||||
): OAuthTokens {
|
||||
const refresh_token = body.refresh_token ?? fallbackRefreshToken;
|
||||
const scopes = body.scope
|
||||
? body.scope.split(" ").filter((s) => s.length > 0)
|
||||
: fallbackScopes;
|
||||
return {
|
||||
access_token: body.access_token,
|
||||
refresh_token,
|
||||
expires_at: body.expires_at,
|
||||
token_type: "Bearer",
|
||||
scopes,
|
||||
};
|
||||
}
|
||||
|
||||
async function postWithBearer(path: string, body: unknown): Promise<Response> {
|
||||
const bearer = await getAccessToken();
|
||||
return fetch(`${API_URL}${path}`, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"content-type": "application/json",
|
||||
authorization: `Bearer ${bearer}`,
|
||||
},
|
||||
body: JSON.stringify(body),
|
||||
});
|
||||
}
|
||||
|
||||
interface ErrorBody {
|
||||
error?: string;
|
||||
reconnectRequired?: boolean;
|
||||
}
|
||||
|
||||
async function readError(res: Response): Promise<ErrorBody> {
|
||||
try {
|
||||
return (await res.json()) as ErrorBody;
|
||||
} catch {
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
/** Claim the tokens parked under `state` after the webapp finished its callback. */
|
||||
export async function claimTokensViaBackend(state: string): Promise<OAuthTokens> {
|
||||
const res = await postWithBearer("/v1/google-oauth/claim", { session: state });
|
||||
if (!res.ok) {
|
||||
const err = await readError(res);
|
||||
throw new Error(`claim failed: ${res.status} ${err.error ?? ""}`.trim());
|
||||
}
|
||||
const body = (await res.json()) as ApiTokenResponse;
|
||||
return toOAuthTokens(body);
|
||||
}
|
||||
|
||||
/**
|
||||
* Refresh an access token via the api. Preserves caller's `refreshToken` and
|
||||
* `existingScopes` when Google omits them on the refresh response.
|
||||
*/
|
||||
export async function refreshTokensViaBackend(
|
||||
refreshToken: string,
|
||||
existingScopes?: string[],
|
||||
): Promise<OAuthTokens> {
|
||||
const res = await postWithBearer("/v1/google-oauth/refresh", { refreshToken });
|
||||
if (res.status === 409) {
|
||||
const err = await readError(res);
|
||||
if (err.reconnectRequired) {
|
||||
throw new ReconnectRequiredError(err.error ?? "Reconnect required");
|
||||
}
|
||||
throw new Error(`refresh failed: 409 ${err.error ?? ""}`.trim());
|
||||
}
|
||||
if (!res.ok) {
|
||||
const err = await readError(res);
|
||||
throw new Error(`refresh failed: ${res.status} ${err.error ?? ""}`.trim());
|
||||
}
|
||||
const body = (await res.json()) as ApiTokenResponse;
|
||||
return toOAuthTokens(body, refreshToken, existingScopes);
|
||||
}
|
||||
|
|
@ -8,6 +8,13 @@ const ProviderConnectionSchema = z.object({
|
|||
tokens: OAuthTokens.nullable().optional(),
|
||||
clientId: z.string().nullable().optional(),
|
||||
clientSecret: z.string().nullable().optional(),
|
||||
/**
|
||||
* `byok` (default for absent) — user provides their own client_id+secret;
|
||||
* tokens stored locally; refresh handled locally via openid-client.
|
||||
* `rowboat` — signed-in user; client_id+secret never on the desktop;
|
||||
* tokens stored locally but refresh goes through the api.
|
||||
*/
|
||||
mode: z.enum(['byok', 'rowboat']).optional(),
|
||||
error: z.string().nullable().optional(),
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -49,8 +49,6 @@ async function getAuthHeaders(): Promise<Record<string, string>> {
|
|||
*/
|
||||
const ZComposioConfig = z.object({
|
||||
apiKey: z.string().optional(),
|
||||
use_composio_for_google: z.boolean().optional(),
|
||||
use_composio_for_google_calendar: z.boolean().optional(),
|
||||
});
|
||||
|
||||
type ComposioConfig = z.infer<typeof ZComposioConfig>;
|
||||
|
|
@ -106,24 +104,6 @@ export async function isConfigured(): Promise<boolean> {
|
|||
return !!getApiKey();
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if Composio should be used for Google services (Gmail, etc.)
|
||||
*/
|
||||
export async function useComposioForGoogle(): Promise<boolean> {
|
||||
if (await isSignedIn()) return true;
|
||||
const config = loadConfig();
|
||||
return config.use_composio_for_google === true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if Composio should be used for Google Calendar
|
||||
*/
|
||||
export async function useComposioForGoogleCalendar(): Promise<boolean> {
|
||||
if (await isSignedIn()) return true;
|
||||
const config = loadConfig();
|
||||
return config.use_composio_for_google_calendar === true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Make an API call to Composio
|
||||
*/
|
||||
|
|
|
|||
51
apps/x/packages/core/src/config/remote-config.ts
Normal file
51
apps/x/packages/core/src/config/remote-config.ts
Normal file
|
|
@ -0,0 +1,51 @@
|
|||
import { API_URL } from "./env.js";
|
||||
|
||||
/**
|
||||
* Per-process cache of the unauthenticated `GET /v1/config` response from
|
||||
* the api. The api returns `{ appUrl, supabaseUrl, websocketApiUrl }` —
|
||||
* we use this to discover the webapp host (where the rowboat-mode OAuth
|
||||
* flow runs) without hardcoding it on the desktop side.
|
||||
*
|
||||
* Cached as a Promise so concurrent first-callers all await the same fetch
|
||||
* (no thundering herd). On failure the cache is cleared so the next call
|
||||
* can retry.
|
||||
*/
|
||||
|
||||
interface RemoteConfig {
|
||||
appUrl: string;
|
||||
supabaseUrl: string;
|
||||
websocketApiUrl: string;
|
||||
}
|
||||
|
||||
let _cached: Promise<RemoteConfig> | null = null;
|
||||
|
||||
async function fetchRemoteConfig(): Promise<RemoteConfig> {
|
||||
const res = await fetch(`${API_URL}/v1/config`);
|
||||
if (!res.ok) {
|
||||
throw new Error(`/v1/config returned ${res.status}`);
|
||||
}
|
||||
const body = (await res.json()) as Partial<RemoteConfig>;
|
||||
if (!body.appUrl) {
|
||||
throw new Error("/v1/config response missing appUrl");
|
||||
}
|
||||
return {
|
||||
appUrl: body.appUrl,
|
||||
supabaseUrl: body.supabaseUrl ?? "",
|
||||
websocketApiUrl: body.websocketApiUrl ?? "",
|
||||
};
|
||||
}
|
||||
|
||||
export async function getRemoteConfig(): Promise<RemoteConfig> {
|
||||
if (!_cached) {
|
||||
_cached = fetchRemoteConfig().catch((err) => {
|
||||
_cached = null; // allow retry
|
||||
throw err;
|
||||
});
|
||||
}
|
||||
return _cached;
|
||||
}
|
||||
|
||||
export async function getWebappUrl(): Promise<string> {
|
||||
const config = await getRemoteConfig();
|
||||
return config.appUrl;
|
||||
}
|
||||
|
|
@ -8,8 +8,6 @@ import { waitForRunCompletion } from '../agents/utils.js';
|
|||
import { serviceLogger } from '../services/service_logger.js';
|
||||
import { loadUserConfig, updateUserEmail } from '../config/user_config.js';
|
||||
import { GoogleClientFactory } from './google-client-factory.js';
|
||||
import { useComposioForGoogle, executeAction } from '../composio/client.js';
|
||||
import { composioAccountsRepo } from '../composio/repo.js';
|
||||
import {
|
||||
loadAgentNotesState,
|
||||
saveAgentNotesState,
|
||||
|
|
@ -199,30 +197,7 @@ async function ensureUserEmail(): Promise<string | null> {
|
|||
return existing.email;
|
||||
}
|
||||
|
||||
// Try Composio (used when signed in or composio configured)
|
||||
try {
|
||||
if (await useComposioForGoogle()) {
|
||||
const account = composioAccountsRepo.getAccount('gmail');
|
||||
if (account && account.status === 'ACTIVE') {
|
||||
const result = await executeAction('GMAIL_GET_PROFILE', {
|
||||
connected_account_id: account.id,
|
||||
user_id: 'rowboat-user',
|
||||
version: 'latest',
|
||||
arguments: { user_id: 'me' },
|
||||
});
|
||||
const email = (result.data as Record<string, unknown>)?.emailAddress as string | undefined;
|
||||
if (email) {
|
||||
updateUserEmail(email);
|
||||
console.log(`[AgentNotes] Auto-populated user email via Composio: ${email}`);
|
||||
return email;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.log('[AgentNotes] Could not fetch email via Composio:', error instanceof Error ? error.message : error);
|
||||
}
|
||||
|
||||
// Try direct Google OAuth
|
||||
// Try direct Google OAuth (covers both BYOK and rowboat modes)
|
||||
try {
|
||||
const auth = await GoogleClientFactory.getClient();
|
||||
if (auth) {
|
||||
|
|
|
|||
|
|
@ -6,20 +6,44 @@ import { getProviderConfig } from '../auth/providers.js';
|
|||
import * as oauthClient from '../auth/oauth-client.js';
|
||||
import type { Configuration } from '../auth/oauth-client.js';
|
||||
import { OAuthTokens } from '../auth/types.js';
|
||||
import {
|
||||
ReconnectRequiredError,
|
||||
refreshTokensViaBackend,
|
||||
} from '../auth/google-backend-oauth.js';
|
||||
|
||||
type Mode = 'byok' | 'rowboat';
|
||||
|
||||
/**
|
||||
* Factory for creating and managing Google OAuth2Client instances.
|
||||
* Handles caching, token refresh, and client reuse for Google API SDKs.
|
||||
*
|
||||
* Two connection modes share the same `oauth.json` provider entry:
|
||||
* - `byok` user supplied client_id+secret; refresh runs locally via
|
||||
* openid-client; OAuth2Client built with creds.
|
||||
* - `rowboat` signed-in user; client_id+secret never on the desktop;
|
||||
* refresh goes through the api at /v1/google-oauth/refresh;
|
||||
* OAuth2Client built without creds and without refresh_token
|
||||
* (we own all refreshes — see note below).
|
||||
*
|
||||
* **Auto-refresh disabled in rowboat mode:** google-auth-library's
|
||||
* OAuth2Client will, on a 401 from a Google API call, try to refresh using
|
||||
* the refresh_token + client secret it has on hand. In rowboat mode we have
|
||||
* no secret, so that would 401-loop. We block this by passing only
|
||||
* access_token + expiry_date in setCredentials (no refresh_token), which
|
||||
* leaves the library nothing to refresh with. Our proactive expiry check
|
||||
* in getClient() is the only refresh path.
|
||||
*/
|
||||
export class GoogleClientFactory {
|
||||
private static readonly PROVIDER_NAME = 'google';
|
||||
private static cache: {
|
||||
mode: Mode | null;
|
||||
config: Configuration | null;
|
||||
client: OAuth2Client | null;
|
||||
tokens: OAuthTokens | null;
|
||||
clientId: string | null;
|
||||
clientSecret: string | null;
|
||||
} = {
|
||||
mode: null,
|
||||
config: null,
|
||||
client: null,
|
||||
tokens: null,
|
||||
|
|
@ -27,7 +51,14 @@ export class GoogleClientFactory {
|
|||
clientSecret: null,
|
||||
};
|
||||
|
||||
private static async resolveCredentials(): Promise<{ clientId: string; clientSecret?: string }> {
|
||||
/**
|
||||
* Promise singleton so a burst of getClient() calls during the brief
|
||||
* expiry window all wait on a single refresh round-trip rather than
|
||||
* fanning out parallel refreshes.
|
||||
*/
|
||||
private static refreshInFlight: Promise<OAuth2Client | null> | null = null;
|
||||
|
||||
private static async resolveByokCredentials(): Promise<{ clientId: string; clientSecret?: string }> {
|
||||
const oauthRepo = container.resolve<IOAuthRepo>('oauthRepo');
|
||||
const connection = await oauthRepo.read(this.PROVIDER_NAME);
|
||||
if (!connection.clientId) {
|
||||
|
|
@ -41,80 +72,116 @@ export class GoogleClientFactory {
|
|||
* Get or create OAuth2Client, reusing cached instance when possible
|
||||
*/
|
||||
static async getClient(): Promise<OAuth2Client | null> {
|
||||
if (this.refreshInFlight) {
|
||||
return this.refreshInFlight;
|
||||
}
|
||||
|
||||
const oauthRepo = container.resolve<IOAuthRepo>('oauthRepo');
|
||||
const { tokens } = await oauthRepo.read(this.PROVIDER_NAME);
|
||||
const connection = await oauthRepo.read(this.PROVIDER_NAME);
|
||||
const tokens = connection.tokens ?? null;
|
||||
const mode: Mode = connection.mode ?? 'byok';
|
||||
|
||||
if (!tokens) {
|
||||
this.clearCache();
|
||||
return null;
|
||||
}
|
||||
|
||||
// Initialize config cache if needed
|
||||
try {
|
||||
await this.initializeConfigCache();
|
||||
} catch (error) {
|
||||
console.error("[OAuth] Failed to initialize Google OAuth configuration:", error);
|
||||
// Mode flipped (e.g. user disconnected then reconnected differently) — invalidate.
|
||||
if (this.cache.mode && this.cache.mode !== mode) {
|
||||
this.clearCache();
|
||||
return null;
|
||||
}
|
||||
if (!this.cache.config) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Check if token is expired
|
||||
// BYOK needs an openid-client Configuration for local refresh; rowboat doesn't.
|
||||
if (mode === 'byok') {
|
||||
try {
|
||||
await this.initializeConfigCache();
|
||||
} catch (error) {
|
||||
console.error('[OAuth] Failed to initialize Google OAuth configuration:', error);
|
||||
this.clearCache();
|
||||
return null;
|
||||
}
|
||||
if (!this.cache.config) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
// Check expiry against the cached tokens. Note: oauthClient.isTokenExpired
|
||||
// applies a small clock-skew margin so we refresh slightly before real
|
||||
// expiry — keeps long-running calls from racing the boundary.
|
||||
if (oauthClient.isTokenExpired(tokens)) {
|
||||
// Token expired, try to refresh
|
||||
if (!tokens.refresh_token) {
|
||||
console.log("[OAuth] Token expired and no refresh token available for Google.");
|
||||
console.log('[OAuth] Token expired and no refresh token available for Google.');
|
||||
await oauthRepo.upsert(this.PROVIDER_NAME, { error: 'Missing refresh token. Please reconnect.' });
|
||||
this.clearCache();
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
console.log(`[OAuth] Token expired, refreshing access token...`);
|
||||
const existingScopes = tokens.scopes;
|
||||
const refreshedTokens = await oauthClient.refreshTokens(
|
||||
this.cache.config,
|
||||
tokens.refresh_token,
|
||||
existingScopes
|
||||
);
|
||||
await oauthRepo.upsert(this.PROVIDER_NAME, { tokens: refreshedTokens });
|
||||
|
||||
// Update cached tokens and recreate client
|
||||
this.cache.tokens = refreshedTokens;
|
||||
if (!this.cache.clientId) {
|
||||
const creds = await this.resolveCredentials();
|
||||
this.cache.clientId = creds.clientId;
|
||||
this.cache.clientSecret = creds.clientSecret ?? null;
|
||||
}
|
||||
this.cache.client = this.createClientFromTokens(refreshedTokens, this.cache.clientId, this.cache.clientSecret ?? undefined);
|
||||
console.log(`[OAuth] Token refreshed successfully`);
|
||||
return this.cache.client;
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : 'Failed to refresh token for Google';
|
||||
await oauthRepo.upsert(this.PROVIDER_NAME, { error: message });
|
||||
console.error("[OAuth] Failed to refresh token for Google:", error);
|
||||
this.clearCache();
|
||||
return null;
|
||||
}
|
||||
this.refreshInFlight = this.refreshAndBuild(tokens, mode).finally(() => {
|
||||
this.refreshInFlight = null;
|
||||
});
|
||||
return this.refreshInFlight;
|
||||
}
|
||||
|
||||
// Reuse client if tokens haven't changed
|
||||
if (this.cache.client && this.cache.tokens && this.cache.tokens.access_token === tokens.access_token) {
|
||||
if (this.cache.client && this.cache.tokens && this.cache.tokens.access_token === tokens.access_token && this.cache.mode === mode) {
|
||||
return this.cache.client;
|
||||
}
|
||||
|
||||
// Create new client with current tokens
|
||||
console.log(`[OAuth] Creating new OAuth2Client instance`);
|
||||
this.cache.tokens = tokens;
|
||||
if (!this.cache.clientId) {
|
||||
const creds = await this.resolveCredentials();
|
||||
// Build a fresh client for current tokens
|
||||
return this.buildAndCacheClient(tokens, mode);
|
||||
}
|
||||
|
||||
private static async refreshAndBuild(tokens: OAuthTokens, mode: Mode): Promise<OAuth2Client | null> {
|
||||
const oauthRepo = container.resolve<IOAuthRepo>('oauthRepo');
|
||||
|
||||
try {
|
||||
console.log(`[OAuth] Token expired, refreshing via ${mode}...`);
|
||||
const existingScopes = tokens.scopes;
|
||||
|
||||
let refreshedTokens: OAuthTokens;
|
||||
if (mode === 'rowboat') {
|
||||
refreshedTokens = await refreshTokensViaBackend(tokens.refresh_token!, existingScopes);
|
||||
} else {
|
||||
if (!this.cache.config) {
|
||||
// Should not happen — initializeConfigCache ran above for byok.
|
||||
throw new Error('Google OAuth config not initialized');
|
||||
}
|
||||
refreshedTokens = await oauthClient.refreshTokens(this.cache.config, tokens.refresh_token!, existingScopes);
|
||||
}
|
||||
|
||||
await oauthRepo.upsert(this.PROVIDER_NAME, { tokens: refreshedTokens, error: null });
|
||||
console.log('[OAuth] Token refreshed successfully');
|
||||
return this.buildAndCacheClient(refreshedTokens, mode);
|
||||
} catch (error) {
|
||||
if (error instanceof ReconnectRequiredError) {
|
||||
console.log('[OAuth] Reconnect required for Google');
|
||||
await oauthRepo.upsert(this.PROVIDER_NAME, { error: 'Reconnect Google' });
|
||||
this.clearCache();
|
||||
return null;
|
||||
}
|
||||
const message = error instanceof Error ? error.message : 'Failed to refresh token for Google';
|
||||
await oauthRepo.upsert(this.PROVIDER_NAME, { error: message });
|
||||
console.error('[OAuth] Failed to refresh token for Google:', error);
|
||||
this.clearCache();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private static async buildAndCacheClient(tokens: OAuthTokens, mode: Mode): Promise<OAuth2Client> {
|
||||
if (mode === 'byok' && !this.cache.clientId) {
|
||||
const creds = await this.resolveByokCredentials();
|
||||
this.cache.clientId = creds.clientId;
|
||||
this.cache.clientSecret = creds.clientSecret ?? null;
|
||||
}
|
||||
this.cache.client = this.createClientFromTokens(tokens, this.cache.clientId, this.cache.clientSecret ?? undefined);
|
||||
return this.cache.client;
|
||||
|
||||
const client = mode === 'rowboat'
|
||||
? this.createRowboatClient(tokens)
|
||||
: this.createByokClient(tokens, this.cache.clientId!, this.cache.clientSecret ?? undefined);
|
||||
|
||||
this.cache.mode = mode;
|
||||
this.cache.tokens = tokens;
|
||||
this.cache.client = client;
|
||||
return client;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -139,7 +206,8 @@ export class GoogleClientFactory {
|
|||
* Clear cache (useful for testing or when credentials are revoked)
|
||||
*/
|
||||
static clearCache(): void {
|
||||
console.log(`[OAuth] Clearing Google auth cache`);
|
||||
console.log('[OAuth] Clearing Google auth cache');
|
||||
this.cache.mode = null;
|
||||
this.cache.config = null;
|
||||
this.cache.client = null;
|
||||
this.cache.tokens = null;
|
||||
|
|
@ -148,10 +216,10 @@ export class GoogleClientFactory {
|
|||
}
|
||||
|
||||
/**
|
||||
* Initialize cached configuration (called once)
|
||||
* Initialize cached configuration for BYOK mode (rowboat doesn't need it).
|
||||
*/
|
||||
private static async initializeConfigCache(): Promise<void> {
|
||||
const { clientId, clientSecret } = await this.resolveCredentials();
|
||||
const { clientId, clientSecret } = await this.resolveByokCredentials();
|
||||
|
||||
if (this.cache.config && this.cache.clientId === clientId && this.cache.clientSecret === (clientSecret ?? null)) {
|
||||
return; // Already initialized for these credentials
|
||||
|
|
@ -161,13 +229,13 @@ export class GoogleClientFactory {
|
|||
this.clearCache();
|
||||
}
|
||||
|
||||
console.log(`[OAuth] Initializing Google OAuth configuration...`);
|
||||
console.log('[OAuth] Initializing Google OAuth configuration...');
|
||||
const providerConfig = await getProviderConfig(this.PROVIDER_NAME);
|
||||
|
||||
if (providerConfig.discovery.mode === 'issuer') {
|
||||
if (providerConfig.client.mode === 'static') {
|
||||
// Discover endpoints, use static client ID
|
||||
console.log(`[OAuth] Discovery mode: issuer with static client ID`);
|
||||
console.log('[OAuth] Discovery mode: issuer with static client ID');
|
||||
this.cache.config = await oauthClient.discoverConfiguration(
|
||||
providerConfig.discovery.issuer,
|
||||
clientId,
|
||||
|
|
@ -175,7 +243,7 @@ export class GoogleClientFactory {
|
|||
);
|
||||
} else {
|
||||
// DCR mode - need existing registration
|
||||
console.log(`[OAuth] Discovery mode: issuer with DCR`);
|
||||
console.log('[OAuth] Discovery mode: issuer with DCR');
|
||||
const clientRepo = container.resolve<IClientRegistrationRepo>('clientRegistrationRepo');
|
||||
const existingRegistration = await clientRepo.getClientRegistration(this.PROVIDER_NAME);
|
||||
|
||||
|
|
@ -194,7 +262,7 @@ export class GoogleClientFactory {
|
|||
throw new Error('DCR requires discovery mode "issuer", not "static"');
|
||||
}
|
||||
|
||||
console.log(`[OAuth] Using static endpoints (no discovery)`);
|
||||
console.log('[OAuth] Using static endpoints (no discovery)');
|
||||
this.cache.config = oauthClient.createStaticConfiguration(
|
||||
providerConfig.discovery.authorizationEndpoint,
|
||||
providerConfig.discovery.tokenEndpoint,
|
||||
|
|
@ -206,27 +274,33 @@ export class GoogleClientFactory {
|
|||
|
||||
this.cache.clientId = clientId;
|
||||
this.cache.clientSecret = clientSecret ?? null;
|
||||
console.log(`[OAuth] Google OAuth configuration initialized`);
|
||||
console.log('[OAuth] Google OAuth configuration initialized');
|
||||
}
|
||||
|
||||
/**
|
||||
* Create OAuth2Client from OAuthTokens
|
||||
*/
|
||||
private static createClientFromTokens(tokens: OAuthTokens, clientId: string, clientSecret?: string): OAuth2Client {
|
||||
const client = new OAuth2Client(
|
||||
clientId,
|
||||
clientSecret ?? undefined,
|
||||
undefined // redirect_uri not needed for token usage
|
||||
);
|
||||
|
||||
// Set credentials
|
||||
/** BYOK OAuth2Client — has client_id + secret + refresh_token. */
|
||||
private static createByokClient(tokens: OAuthTokens, clientId: string, clientSecret?: string): OAuth2Client {
|
||||
const client = new OAuth2Client(clientId, clientSecret ?? undefined, undefined);
|
||||
client.setCredentials({
|
||||
access_token: tokens.access_token,
|
||||
refresh_token: tokens.refresh_token || undefined,
|
||||
expiry_date: tokens.expires_at * 1000, // Convert from seconds to milliseconds
|
||||
expiry_date: tokens.expires_at * 1000,
|
||||
scope: tokens.scopes?.join(' ') || undefined,
|
||||
});
|
||||
return client;
|
||||
}
|
||||
|
||||
/**
|
||||
* Rowboat OAuth2Client — no client_id/secret, no refresh_token.
|
||||
* Library auto-refresh is disabled by absence of refresh_token; our
|
||||
* proactive refresh in getClient() is the only refresh path.
|
||||
*/
|
||||
private static createRowboatClient(tokens: OAuthTokens): OAuth2Client {
|
||||
const client = new OAuth2Client();
|
||||
client.setCredentials({
|
||||
access_token: tokens.access_token,
|
||||
expiry_date: tokens.expires_at * 1000,
|
||||
scope: tokens.scopes?.join(' ') || undefined,
|
||||
});
|
||||
return client;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,10 +5,8 @@ import { OAuth2Client } from 'google-auth-library';
|
|||
import { NodeHtmlMarkdown } from 'node-html-markdown'
|
||||
import { WorkDir } from '../config/config.js';
|
||||
import { GoogleClientFactory } from './google-client-factory.js';
|
||||
import { serviceLogger, type ServiceRunContext } from '../services/service_logger.js';
|
||||
import { serviceLogger } from '../services/service_logger.js';
|
||||
import { limitEventItems } from './limit_event_items.js';
|
||||
import { executeAction, useComposioForGoogleCalendar } from '../composio/client.js';
|
||||
import { composioAccountsRepo } from '../composio/repo.js';
|
||||
import { createEvent } from './track/events.js';
|
||||
|
||||
const MAX_EVENTS_IN_DIGEST = 50;
|
||||
|
|
@ -138,7 +136,6 @@ async function publishCalendarSyncEvent(
|
|||
const SYNC_DIR = path.join(WorkDir, 'calendar_sync');
|
||||
const SYNC_INTERVAL_MS = 5 * 60 * 1000; // Check every 5 minutes
|
||||
const LOOKBACK_DAYS = 7;
|
||||
const COMPOSIO_LOOKBACK_DAYS = 7;
|
||||
const REQUIRED_SCOPES = [
|
||||
'https://www.googleapis.com/auth/calendar.events.readonly',
|
||||
'https://www.googleapis.com/auth/drive.readonly'
|
||||
|
|
@ -477,286 +474,17 @@ async function performSync(syncDir: string, lookbackDays: number) {
|
|||
}
|
||||
}
|
||||
|
||||
// --- Composio-based Sync ---
|
||||
|
||||
interface ComposioCalendarState {
|
||||
last_sync: string; // ISO string
|
||||
}
|
||||
|
||||
function loadComposioState(stateFile: string): ComposioCalendarState | null {
|
||||
if (fs.existsSync(stateFile)) {
|
||||
try {
|
||||
const data = JSON.parse(fs.readFileSync(stateFile, 'utf-8'));
|
||||
if (data.last_sync) {
|
||||
return { last_sync: data.last_sync };
|
||||
}
|
||||
} catch (e) {
|
||||
console.error('[Calendar] Failed to load composio state:', e);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function saveComposioState(stateFile: string, lastSync: string): void {
|
||||
fs.writeFileSync(stateFile, JSON.stringify({ last_sync: lastSync }, null, 2));
|
||||
}
|
||||
|
||||
/**
|
||||
* Save a Composio calendar event as JSON (same format used by Google OAuth path).
|
||||
* The event data from Composio is already structured similarly to Google Calendar API.
|
||||
*/
|
||||
function saveComposioEvent(eventData: Record<string, unknown>, syncDir: string): { changed: boolean; isNew: boolean; title: string } {
|
||||
const eventId = eventData.id as string | undefined;
|
||||
if (!eventId) return { changed: false, isNew: false, title: 'Unknown' };
|
||||
|
||||
const filePath = path.join(syncDir, `${eventId}.json`);
|
||||
const content = JSON.stringify(eventData, null, 2);
|
||||
const exists = fs.existsSync(filePath);
|
||||
|
||||
try {
|
||||
if (exists) {
|
||||
const existing = fs.readFileSync(filePath, 'utf-8');
|
||||
if (existing === content) {
|
||||
return { changed: false, isNew: false, title: (eventData.summary as string) || eventId };
|
||||
}
|
||||
}
|
||||
|
||||
fs.writeFileSync(filePath, content);
|
||||
return { changed: true, isNew: !exists, title: (eventData.summary as string) || eventId };
|
||||
} catch (e) {
|
||||
console.error(`[Calendar] Error saving event ${eventId}:`, e);
|
||||
return { changed: false, isNew: false, title: (eventData.summary as string) || eventId };
|
||||
}
|
||||
}
|
||||
|
||||
async function performSyncComposio() {
|
||||
const STATE_FILE = path.join(SYNC_DIR, 'composio_state.json');
|
||||
|
||||
if (!fs.existsSync(SYNC_DIR)) fs.mkdirSync(SYNC_DIR, { recursive: true });
|
||||
|
||||
const account = composioAccountsRepo.getAccount('googlecalendar');
|
||||
if (!account || account.status !== 'ACTIVE') {
|
||||
console.log('[Calendar] Google Calendar not connected via Composio. Skipping sync.');
|
||||
return;
|
||||
}
|
||||
|
||||
const connectedAccountId = account.id;
|
||||
|
||||
// Calculate time window: lookback + 14 days forward
|
||||
const now = new Date();
|
||||
const lookbackMs = COMPOSIO_LOOKBACK_DAYS * 24 * 60 * 60 * 1000;
|
||||
const twoWeeksForwardMs = 14 * 24 * 60 * 60 * 1000;
|
||||
|
||||
const timeMin = new Date(now.getTime() - lookbackMs).toISOString();
|
||||
const timeMax = new Date(now.getTime() + twoWeeksForwardMs).toISOString();
|
||||
|
||||
console.log(`[Calendar] Syncing via Composio from ${timeMin} to ${timeMax} (lookback: ${COMPOSIO_LOOKBACK_DAYS} days)...`);
|
||||
|
||||
let run: ServiceRunContext | null = null;
|
||||
const ensureRun = async (): Promise<ServiceRunContext> => {
|
||||
if (!run) {
|
||||
run = await serviceLogger.startRun({
|
||||
service: 'calendar',
|
||||
message: 'Syncing calendar (Composio)',
|
||||
trigger: 'timer',
|
||||
});
|
||||
}
|
||||
return run;
|
||||
};
|
||||
|
||||
try {
|
||||
const currentEventIds = new Set<string>();
|
||||
let newCount = 0;
|
||||
let updatedCount = 0;
|
||||
const changedTitles: string[] = [];
|
||||
const newEvents: AnyEvent[] = [];
|
||||
const updatedEvents: AnyEvent[] = [];
|
||||
let pageToken: string | null = null;
|
||||
const MAX_PAGES = 20;
|
||||
|
||||
for (let page = 0; page < MAX_PAGES; page++) {
|
||||
// Re-check connection in case user disconnected mid-sync
|
||||
if (!composioAccountsRepo.isConnected('googlecalendar')) {
|
||||
console.log('[Calendar] Account disconnected during sync. Stopping.');
|
||||
return;
|
||||
}
|
||||
|
||||
const args: Record<string, unknown> = {
|
||||
calendar_id: 'primary',
|
||||
time_min: timeMin,
|
||||
time_max: timeMax,
|
||||
single_events: true,
|
||||
order_by: 'startTime',
|
||||
};
|
||||
if (pageToken) {
|
||||
args.page_token = pageToken;
|
||||
}
|
||||
|
||||
const result = await executeAction(
|
||||
'GOOGLECALENDAR_FIND_EVENT',
|
||||
{
|
||||
connected_account_id: connectedAccountId,
|
||||
user_id: 'rowboat-user',
|
||||
version: 'latest',
|
||||
arguments: args,
|
||||
}
|
||||
);
|
||||
|
||||
if (!result.successful || !result.data) {
|
||||
console.error('[Calendar] Failed to list events via Composio:', result.error);
|
||||
return;
|
||||
}
|
||||
|
||||
const data = result.data as Record<string, unknown>;
|
||||
// Composio may return events in different structures
|
||||
let events: Array<Record<string, unknown>> = [];
|
||||
|
||||
if (Array.isArray(data.items)) {
|
||||
events = data.items as Array<Record<string, unknown>>;
|
||||
} else if (Array.isArray(data.events)) {
|
||||
events = data.events as Array<Record<string, unknown>>;
|
||||
} else if (data.event_data && typeof data.event_data === 'object') {
|
||||
const nested = data.event_data as Record<string, unknown>;
|
||||
if (Array.isArray(nested.event_data)) {
|
||||
events = nested.event_data as Array<Record<string, unknown>>;
|
||||
} else if (Array.isArray(data.event_data)) {
|
||||
events = data.event_data as Array<Record<string, unknown>>;
|
||||
}
|
||||
} else if (Array.isArray(data)) {
|
||||
events = data as unknown as Array<Record<string, unknown>>;
|
||||
}
|
||||
|
||||
if (events.length === 0 && page === 0) {
|
||||
console.log('[Calendar] No events found in this window.');
|
||||
} else if (events.length > 0) {
|
||||
console.log(`[Calendar] Page ${page + 1}: found ${events.length} events.`);
|
||||
for (const event of events) {
|
||||
const eventId = event.id as string | undefined;
|
||||
if (eventId) {
|
||||
const saveResult = saveComposioEvent(event, SYNC_DIR);
|
||||
currentEventIds.add(eventId);
|
||||
|
||||
if (saveResult.changed) {
|
||||
await ensureRun();
|
||||
changedTitles.push(saveResult.title);
|
||||
if (saveResult.isNew) {
|
||||
newCount++;
|
||||
newEvents.push(event);
|
||||
} else {
|
||||
updatedCount++;
|
||||
updatedEvents.push(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check for next page
|
||||
const nextToken = data.nextPageToken as string | undefined;
|
||||
if (nextToken) {
|
||||
pageToken = nextToken;
|
||||
console.log(`[Calendar] Fetching next page...`);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up events no longer in the window
|
||||
const deletedFiles = cleanUpOldFiles(currentEventIds, SYNC_DIR);
|
||||
let deletedCount = 0;
|
||||
if (deletedFiles.length > 0) {
|
||||
await ensureRun();
|
||||
deletedCount = deletedFiles.length;
|
||||
}
|
||||
|
||||
// Publish a single bundled event capturing all changes from this sync.
|
||||
await publishCalendarSyncEvent(newEvents, updatedEvents, deletedFiles);
|
||||
|
||||
// Log results if any changes were detected (run was started by ensureRun)
|
||||
if (run) {
|
||||
const r = run as ServiceRunContext;
|
||||
const totalChanges = newCount + updatedCount + deletedCount;
|
||||
const limitedTitles = limitEventItems(changedTitles);
|
||||
await serviceLogger.log({
|
||||
type: 'changes_identified',
|
||||
service: r.service,
|
||||
runId: r.runId,
|
||||
level: 'info',
|
||||
message: `Calendar updates: ${totalChanges} change${totalChanges === 1 ? '' : 's'}`,
|
||||
counts: {
|
||||
newEvents: newCount,
|
||||
updatedEvents: updatedCount,
|
||||
deletedFiles: deletedCount,
|
||||
},
|
||||
items: limitedTitles.items,
|
||||
truncated: limitedTitles.truncated,
|
||||
});
|
||||
await serviceLogger.log({
|
||||
type: 'run_complete',
|
||||
service: r.service,
|
||||
runId: r.runId,
|
||||
level: 'info',
|
||||
message: `Calendar sync complete: ${totalChanges} change${totalChanges === 1 ? '' : 's'}`,
|
||||
durationMs: Date.now() - r.startedAt,
|
||||
outcome: 'ok',
|
||||
summary: {
|
||||
newEvents: newCount,
|
||||
updatedEvents: updatedCount,
|
||||
deletedFiles: deletedCount,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
// Save state
|
||||
saveComposioState(STATE_FILE, new Date().toISOString());
|
||||
console.log(`[Calendar] Composio sync completed. ${newCount} new, ${updatedCount} updated, ${deletedCount} deleted.`);
|
||||
} catch (error) {
|
||||
console.error('[Calendar] Error during Composio sync:', error);
|
||||
const errRun = await ensureRun();
|
||||
await serviceLogger.log({
|
||||
type: 'error',
|
||||
service: errRun.service,
|
||||
runId: errRun.runId,
|
||||
level: 'error',
|
||||
message: 'Calendar sync error',
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
await serviceLogger.log({
|
||||
type: 'run_complete',
|
||||
service: errRun.service,
|
||||
runId: errRun.runId,
|
||||
level: 'error',
|
||||
message: 'Calendar sync failed',
|
||||
durationMs: Date.now() - errRun.startedAt,
|
||||
outcome: 'error',
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
export async function init() {
|
||||
console.log("Starting Google Calendar & Notes Sync (TS)...");
|
||||
console.log(`Will sync every ${SYNC_INTERVAL_MS / 1000} seconds.`);
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
const composioMode = await useComposioForGoogleCalendar();
|
||||
if (composioMode) {
|
||||
const isConnected = composioAccountsRepo.isConnected('googlecalendar');
|
||||
if (!isConnected) {
|
||||
console.log('[Calendar] Google Calendar not connected via Composio. Sleeping...');
|
||||
} else {
|
||||
await performSyncComposio();
|
||||
}
|
||||
const hasCredentials = await GoogleClientFactory.hasValidCredentials(REQUIRED_SCOPES);
|
||||
if (!hasCredentials) {
|
||||
console.log("Google OAuth credentials not available or missing required Calendar/Drive scopes. Sleeping...");
|
||||
} else {
|
||||
// Check if credentials are available with required scopes
|
||||
const hasCredentials = await GoogleClientFactory.hasValidCredentials(REQUIRED_SCOPES);
|
||||
|
||||
if (!hasCredentials) {
|
||||
console.log("Google OAuth credentials not available or missing required Calendar/Drive scopes. Sleeping...");
|
||||
} else {
|
||||
// Perform one sync
|
||||
await performSync(SYNC_DIR, LOOKBACK_DAYS);
|
||||
}
|
||||
await performSync(SYNC_DIR, LOOKBACK_DAYS);
|
||||
}
|
||||
} catch (error) {
|
||||
console.error("Error in main loop:", error);
|
||||
|
|
|
|||
|
|
@ -7,8 +7,6 @@ import { WorkDir } from '../config/config.js';
|
|||
import { GoogleClientFactory } from './google-client-factory.js';
|
||||
import { serviceLogger, type ServiceRunContext } from '../services/service_logger.js';
|
||||
import { limitEventItems } from './limit_event_items.js';
|
||||
import { executeAction, useComposioForGoogle } from '../composio/client.js';
|
||||
import { composioAccountsRepo } from '../composio/repo.js';
|
||||
import { createEvent } from './track/events.js';
|
||||
|
||||
// Configuration
|
||||
|
|
@ -225,7 +223,7 @@ async function processThread(auth: OAuth2Client, threadId: string, syncDir: stri
|
|||
}
|
||||
}
|
||||
|
||||
function loadState(stateFile: string): { historyId?: string } {
|
||||
function loadState(stateFile: string): { historyId?: string; last_sync?: string } {
|
||||
if (fs.existsSync(stateFile)) {
|
||||
return JSON.parse(fs.readFileSync(stateFile, 'utf-8'));
|
||||
}
|
||||
|
|
@ -240,9 +238,24 @@ function saveState(historyId: string, stateFile: string) {
|
|||
}
|
||||
|
||||
async function fullSync(auth: OAuth2Client, syncDir: string, attachmentsDir: string, stateFile: string, lookbackDays: number) {
|
||||
console.log(`Performing full sync of last ${lookbackDays} days...`);
|
||||
const gmail = google.gmail({ version: 'v1', auth });
|
||||
|
||||
// If the state file holds a last_sync timestamp (e.g. left over from a
|
||||
// prior Composio sync, or from a previous successful native sync that
|
||||
// we're falling back to after a history.list 404), use that as the
|
||||
// floor instead of the default lookback. Carries forward Composio's
|
||||
// last_sync on first migration so we don't refetch the last 7 days.
|
||||
const state = loadState(stateFile);
|
||||
let pastDate: Date;
|
||||
if (state.last_sync) {
|
||||
pastDate = new Date(state.last_sync);
|
||||
console.log(`Performing full sync from last_sync=${state.last_sync}...`);
|
||||
} else {
|
||||
pastDate = new Date();
|
||||
pastDate.setDate(pastDate.getDate() - lookbackDays);
|
||||
console.log(`Performing full sync of last ${lookbackDays} days...`);
|
||||
}
|
||||
|
||||
let run: ServiceRunContext | null = null;
|
||||
const ensureRun = async () => {
|
||||
if (!run) {
|
||||
|
|
@ -255,8 +268,6 @@ async function fullSync(auth: OAuth2Client, syncDir: string, attachmentsDir: str
|
|||
};
|
||||
|
||||
try {
|
||||
const pastDate = new Date();
|
||||
pastDate.setDate(pastDate.getDate() - lookbackDays);
|
||||
const dateQuery = pastDate.toISOString().split('T')[0].replace(/-/g, '/');
|
||||
|
||||
// Get History ID
|
||||
|
|
@ -498,386 +509,17 @@ async function performSync() {
|
|||
}
|
||||
}
|
||||
|
||||
// --- Composio-based Sync ---
|
||||
|
||||
const COMPOSIO_LOOKBACK_DAYS = 7;
|
||||
|
||||
interface ComposioSyncState {
|
||||
last_sync: string; // ISO string
|
||||
}
|
||||
|
||||
function loadComposioState(stateFile: string): ComposioSyncState | null {
|
||||
if (fs.existsSync(stateFile)) {
|
||||
try {
|
||||
const data = JSON.parse(fs.readFileSync(stateFile, 'utf-8'));
|
||||
if (data.last_sync) {
|
||||
return { last_sync: data.last_sync };
|
||||
}
|
||||
} catch (e) {
|
||||
console.error('[Gmail] Failed to load composio state:', e);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function saveComposioState(stateFile: string, lastSync: string): void {
|
||||
fs.writeFileSync(stateFile, JSON.stringify({ last_sync: lastSync }, null, 2));
|
||||
}
|
||||
|
||||
function tryParseDate(dateStr: string): Date | null {
|
||||
const d = new Date(dateStr);
|
||||
return isNaN(d.getTime()) ? null : d;
|
||||
}
|
||||
|
||||
interface ParsedMessage {
|
||||
from: string;
|
||||
date: string;
|
||||
subject: string;
|
||||
body: string;
|
||||
}
|
||||
|
||||
function parseMessageData(messageData: Record<string, unknown>): ParsedMessage {
|
||||
const headers = messageData.payload && typeof messageData.payload === 'object'
|
||||
? (messageData.payload as Record<string, unknown>).headers as Array<{ name: string; value: string }> | undefined
|
||||
: undefined;
|
||||
|
||||
const from = headers?.find(h => h.name === 'From')?.value || String(messageData.from || messageData.sender || 'Unknown');
|
||||
const date = headers?.find(h => h.name === 'Date')?.value || String(messageData.date || messageData.internalDate || 'Unknown');
|
||||
const subject = headers?.find(h => h.name === 'Subject')?.value || String(messageData.subject || '(No Subject)');
|
||||
|
||||
let body = '';
|
||||
|
||||
if (messageData.payload && typeof messageData.payload === 'object') {
|
||||
body = extractBodyFromPayload(messageData.payload as Record<string, unknown>);
|
||||
}
|
||||
|
||||
if (!body) {
|
||||
if (typeof messageData.body === 'string') {
|
||||
body = messageData.body;
|
||||
} else if (typeof messageData.snippet === 'string') {
|
||||
body = messageData.snippet;
|
||||
} else if (typeof messageData.text === 'string') {
|
||||
body = messageData.text;
|
||||
}
|
||||
}
|
||||
|
||||
if (body && (body.includes('<html') || body.includes('<div') || body.includes('<p'))) {
|
||||
body = nhm.translate(body);
|
||||
}
|
||||
|
||||
if (body) {
|
||||
body = body.split('\n').filter((line: string) => !line.trim().startsWith('>')).join('\n');
|
||||
}
|
||||
|
||||
return { from, date, subject, body };
|
||||
}
|
||||
|
||||
function extractBodyFromPayload(payload: Record<string, unknown>): string {
|
||||
const parts = payload.parts as Array<Record<string, unknown>> | undefined;
|
||||
|
||||
if (parts) {
|
||||
for (const part of parts) {
|
||||
const mimeType = part.mimeType as string | undefined;
|
||||
const bodyData = part.body && typeof part.body === 'object'
|
||||
? (part.body as Record<string, unknown>).data as string | undefined
|
||||
: undefined;
|
||||
|
||||
if ((mimeType === 'text/plain' || mimeType === 'text/html') && bodyData) {
|
||||
const decoded = Buffer.from(bodyData, 'base64').toString('utf-8');
|
||||
if (mimeType === 'text/html') {
|
||||
return nhm.translate(decoded);
|
||||
}
|
||||
return decoded;
|
||||
}
|
||||
|
||||
if (part.parts) {
|
||||
const result = extractBodyFromPayload(part as Record<string, unknown>);
|
||||
if (result) return result;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const bodyData = payload.body && typeof payload.body === 'object'
|
||||
? (payload.body as Record<string, unknown>).data as string | undefined
|
||||
: undefined;
|
||||
|
||||
if (bodyData) {
|
||||
const decoded = Buffer.from(bodyData, 'base64').toString('utf-8');
|
||||
const mimeType = payload.mimeType as string | undefined;
|
||||
if (mimeType === 'text/html') {
|
||||
return nhm.translate(decoded);
|
||||
}
|
||||
return decoded;
|
||||
}
|
||||
|
||||
return '';
|
||||
}
|
||||
|
||||
interface ComposioThreadResult {
|
||||
synced: SyncedThread | null;
|
||||
newestIsoPlusOne: string | null;
|
||||
}
|
||||
|
||||
async function processThreadComposio(connectedAccountId: string, threadId: string, syncDir: string): Promise<ComposioThreadResult> {
|
||||
let threadResult;
|
||||
try {
|
||||
threadResult = await executeAction(
|
||||
'GMAIL_FETCH_MESSAGE_BY_THREAD_ID',
|
||||
{
|
||||
connected_account_id: connectedAccountId,
|
||||
user_id: 'rowboat-user',
|
||||
version: 'latest',
|
||||
arguments: { thread_id: threadId, user_id: 'me' },
|
||||
}
|
||||
);
|
||||
} catch (error) {
|
||||
console.warn(`[Gmail] Skipping thread ${threadId} (fetch failed):`, error instanceof Error ? error.message : error);
|
||||
return { synced: null, newestIsoPlusOne: null };
|
||||
}
|
||||
|
||||
if (!threadResult.successful || !threadResult.data) {
|
||||
console.error(`[Gmail] Failed to fetch thread ${threadId}:`, threadResult.error);
|
||||
return { synced: null, newestIsoPlusOne: null };
|
||||
}
|
||||
|
||||
const data = threadResult.data as Record<string, unknown>;
|
||||
const messages = data.messages as Array<Record<string, unknown>> | undefined;
|
||||
|
||||
let newestDate: Date | null = null;
|
||||
let mdContent: string;
|
||||
let subjectForLog: string;
|
||||
|
||||
if (!messages || messages.length === 0) {
|
||||
const parsed = parseMessageData(data);
|
||||
mdContent = `# ${parsed.subject}\n\n` +
|
||||
`**Thread ID:** ${threadId}\n` +
|
||||
`**Message Count:** 1\n\n---\n\n` +
|
||||
`### From: ${parsed.from}\n` +
|
||||
`**Date:** ${parsed.date}\n\n` +
|
||||
`${parsed.body}\n\n---\n\n`;
|
||||
subjectForLog = parsed.subject;
|
||||
newestDate = tryParseDate(parsed.date);
|
||||
} else {
|
||||
const firstParsed = parseMessageData(messages[0]);
|
||||
mdContent = `# ${firstParsed.subject}\n\n`;
|
||||
mdContent += `**Thread ID:** ${threadId}\n`;
|
||||
mdContent += `**Message Count:** ${messages.length}\n\n---\n\n`;
|
||||
|
||||
for (const msg of messages) {
|
||||
const parsed = parseMessageData(msg);
|
||||
mdContent += `### From: ${parsed.from}\n`;
|
||||
mdContent += `**Date:** ${parsed.date}\n\n`;
|
||||
mdContent += `${parsed.body}\n\n`;
|
||||
mdContent += `---\n\n`;
|
||||
|
||||
const msgDate = tryParseDate(parsed.date);
|
||||
if (msgDate && (!newestDate || msgDate > newestDate)) {
|
||||
newestDate = msgDate;
|
||||
}
|
||||
}
|
||||
subjectForLog = firstParsed.subject;
|
||||
}
|
||||
|
||||
fs.writeFileSync(path.join(syncDir, `${cleanFilename(threadId)}.md`), mdContent);
|
||||
console.log(`[Gmail] Synced Thread: ${subjectForLog} (${threadId})`);
|
||||
|
||||
const newestIsoPlusOne = newestDate ? new Date(newestDate.getTime() + 1000).toISOString() : null;
|
||||
return { synced: { threadId, markdown: mdContent }, newestIsoPlusOne };
|
||||
}
|
||||
|
||||
async function performSyncComposio() {
|
||||
const ATTACHMENTS_DIR = path.join(SYNC_DIR, 'attachments');
|
||||
const STATE_FILE = path.join(SYNC_DIR, 'sync_state.json');
|
||||
|
||||
if (!fs.existsSync(SYNC_DIR)) fs.mkdirSync(SYNC_DIR, { recursive: true });
|
||||
if (!fs.existsSync(ATTACHMENTS_DIR)) fs.mkdirSync(ATTACHMENTS_DIR, { recursive: true });
|
||||
|
||||
const account = composioAccountsRepo.getAccount('gmail');
|
||||
if (!account || account.status !== 'ACTIVE') {
|
||||
console.log('[Gmail] Gmail not connected via Composio. Skipping sync.');
|
||||
return;
|
||||
}
|
||||
|
||||
const connectedAccountId = account.id;
|
||||
|
||||
const state = loadComposioState(STATE_FILE);
|
||||
let afterEpochSeconds: number;
|
||||
|
||||
if (state) {
|
||||
afterEpochSeconds = Math.floor(new Date(state.last_sync).getTime() / 1000);
|
||||
console.log(`[Gmail] Syncing messages since ${state.last_sync}...`);
|
||||
} else {
|
||||
const pastDate = new Date();
|
||||
pastDate.setDate(pastDate.getDate() - COMPOSIO_LOOKBACK_DAYS);
|
||||
afterEpochSeconds = Math.floor(pastDate.getTime() / 1000);
|
||||
console.log(`[Gmail] First sync - fetching last ${COMPOSIO_LOOKBACK_DAYS} days...`);
|
||||
}
|
||||
|
||||
let run: ServiceRunContext | null = null;
|
||||
const ensureRun = async () => {
|
||||
if (!run) {
|
||||
run = await serviceLogger.startRun({
|
||||
service: 'gmail',
|
||||
message: 'Syncing Gmail (Composio)',
|
||||
trigger: 'timer',
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
try {
|
||||
const allThreadIds: string[] = [];
|
||||
let pageToken: string | undefined;
|
||||
|
||||
do {
|
||||
const params: Record<string, unknown> = {
|
||||
query: `after:${afterEpochSeconds}`,
|
||||
max_results: 20,
|
||||
user_id: 'me',
|
||||
};
|
||||
if (pageToken) {
|
||||
params.page_token = pageToken;
|
||||
}
|
||||
|
||||
const result = await executeAction(
|
||||
'GMAIL_LIST_THREADS',
|
||||
{
|
||||
connected_account_id: connectedAccountId,
|
||||
user_id: 'rowboat-user',
|
||||
version: 'latest',
|
||||
arguments: params,
|
||||
}
|
||||
);
|
||||
|
||||
if (!result.successful || !result.data) {
|
||||
console.error('[Gmail] Failed to list threads:', result.error);
|
||||
return;
|
||||
}
|
||||
|
||||
const data = result.data as Record<string, unknown>;
|
||||
const threads = data.threads as Array<Record<string, unknown>> | undefined;
|
||||
|
||||
if (threads && threads.length > 0) {
|
||||
for (const thread of threads) {
|
||||
const threadId = thread.id as string | undefined;
|
||||
if (threadId) {
|
||||
allThreadIds.push(threadId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pageToken = data.nextPageToken as string | undefined;
|
||||
} while (pageToken);
|
||||
|
||||
if (allThreadIds.length === 0) {
|
||||
console.log('[Gmail] No new threads.');
|
||||
return;
|
||||
}
|
||||
|
||||
console.log(`[Gmail] Found ${allThreadIds.length} threads to sync.`);
|
||||
|
||||
await ensureRun();
|
||||
const limitedThreads = limitEventItems(allThreadIds);
|
||||
await serviceLogger.log({
|
||||
type: 'changes_identified',
|
||||
service: run!.service,
|
||||
runId: run!.runId,
|
||||
level: 'info',
|
||||
message: `Found ${allThreadIds.length} thread${allThreadIds.length === 1 ? '' : 's'} to sync`,
|
||||
counts: { threads: allThreadIds.length },
|
||||
items: limitedThreads.items,
|
||||
truncated: limitedThreads.truncated,
|
||||
});
|
||||
|
||||
// Process oldest first so high-water mark advances chronologically
|
||||
allThreadIds.reverse();
|
||||
|
||||
let highWaterMark: string | null = state?.last_sync ?? null;
|
||||
let processedCount = 0;
|
||||
const synced: SyncedThread[] = [];
|
||||
for (const threadId of allThreadIds) {
|
||||
// Re-check connection in case user disconnected mid-sync
|
||||
if (!composioAccountsRepo.isConnected('gmail')) {
|
||||
console.log('[Gmail] Account disconnected during sync. Stopping.');
|
||||
break;
|
||||
}
|
||||
try {
|
||||
const result = await processThreadComposio(connectedAccountId, threadId, SYNC_DIR);
|
||||
processedCount++;
|
||||
|
||||
if (result.synced) synced.push(result.synced);
|
||||
|
||||
if (result.newestIsoPlusOne) {
|
||||
if (!highWaterMark || new Date(result.newestIsoPlusOne) > new Date(highWaterMark)) {
|
||||
highWaterMark = result.newestIsoPlusOne;
|
||||
}
|
||||
saveComposioState(STATE_FILE, highWaterMark);
|
||||
}
|
||||
} catch (error) {
|
||||
console.error(`[Gmail] Error processing thread ${threadId}, skipping:`, error);
|
||||
}
|
||||
}
|
||||
|
||||
await publishGmailSyncEvent(synced);
|
||||
|
||||
await serviceLogger.log({
|
||||
type: 'run_complete',
|
||||
service: run!.service,
|
||||
runId: run!.runId,
|
||||
level: 'info',
|
||||
message: `Gmail sync complete: ${processedCount}/${allThreadIds.length} thread${allThreadIds.length === 1 ? '' : 's'}`,
|
||||
durationMs: Date.now() - run!.startedAt,
|
||||
outcome: 'ok',
|
||||
summary: { threads: processedCount },
|
||||
});
|
||||
|
||||
console.log(`[Gmail] Sync completed. Processed ${processedCount}/${allThreadIds.length} threads.`);
|
||||
} catch (error) {
|
||||
console.error('[Gmail] Error during sync:', error);
|
||||
await ensureRun();
|
||||
await serviceLogger.log({
|
||||
type: 'error',
|
||||
service: run!.service,
|
||||
runId: run!.runId,
|
||||
level: 'error',
|
||||
message: 'Gmail sync error',
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
await serviceLogger.log({
|
||||
type: 'run_complete',
|
||||
service: run!.service,
|
||||
runId: run!.runId,
|
||||
level: 'error',
|
||||
message: 'Gmail sync failed',
|
||||
durationMs: Date.now() - run!.startedAt,
|
||||
outcome: 'error',
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
export async function init() {
|
||||
console.log("Starting Gmail Sync (TS)...");
|
||||
console.log(`Will sync every ${SYNC_INTERVAL_MS / 1000} seconds.`);
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
const composioMode = await useComposioForGoogle();
|
||||
if (composioMode) {
|
||||
const isConnected = composioAccountsRepo.isConnected('gmail');
|
||||
if (!isConnected) {
|
||||
console.log('[Gmail] Gmail not connected via Composio. Sleeping...');
|
||||
} else {
|
||||
await performSyncComposio();
|
||||
}
|
||||
const hasCredentials = await GoogleClientFactory.hasValidCredentials(REQUIRED_SCOPE);
|
||||
if (!hasCredentials) {
|
||||
console.log("Google OAuth credentials not available or missing required Gmail scope. Sleeping...");
|
||||
} else {
|
||||
// Check if credentials are available with required scopes
|
||||
const hasCredentials = await GoogleClientFactory.hasValidCredentials(REQUIRED_SCOPE);
|
||||
|
||||
if (!hasCredentials) {
|
||||
console.log("Google OAuth credentials not available or missing required Gmail scope. Sleeping...");
|
||||
} else {
|
||||
// Perform one sync
|
||||
await performSync();
|
||||
}
|
||||
await performSync();
|
||||
}
|
||||
} catch (error) {
|
||||
console.error("Error in main loop:", error);
|
||||
|
|
|
|||
132
apps/x/packages/core/src/migrations/composio-google-migration.ts
Normal file
132
apps/x/packages/core/src/migrations/composio-google-migration.ts
Normal file
|
|
@ -0,0 +1,132 @@
|
|||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
import { z } from 'zod';
|
||||
import { WorkDir } from '../config/config.js';
|
||||
import { isSignedIn } from '../account/account.js';
|
||||
import { composioAccountsRepo } from '../composio/repo.js';
|
||||
import { deleteConnectedAccount } from '../composio/client.js';
|
||||
import container from '../di/container.js';
|
||||
import { IOAuthRepo } from '../auth/repo.js';
|
||||
|
||||
/**
|
||||
* One-time migration that moves Composio-connected Gmail/Calendar users
|
||||
* to the native rowboat-mode Google OAuth flow.
|
||||
*
|
||||
* Triggered by the renderer on app launch and after Rowboat sign-in. The
|
||||
* single guard is `dismissed_at` in the migration state file — once set,
|
||||
* none of the migration's side effects run again. This protects users who
|
||||
* later re-add Composio Google for non-sync purposes (e.g. a tool that
|
||||
* happens to use the Gmail toolkit) from having that connection blown
|
||||
* away on a future launch.
|
||||
*/
|
||||
|
||||
const STATE_FILE = path.join(WorkDir, 'config', 'composio-google-migration.json');
|
||||
|
||||
const ZState = z.object({
|
||||
dismissed_at: z.string().min(1).optional(),
|
||||
});
|
||||
type State = z.infer<typeof ZState>;
|
||||
|
||||
function loadState(): State {
|
||||
try {
|
||||
if (fs.existsSync(STATE_FILE)) {
|
||||
const raw = fs.readFileSync(STATE_FILE, 'utf-8');
|
||||
return ZState.parse(JSON.parse(raw));
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('[composio-google-migration] failed to load state:', error);
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
function saveState(state: State): void {
|
||||
const dir = path.dirname(STATE_FILE);
|
||||
if (!fs.existsSync(dir)) fs.mkdirSync(dir, { recursive: true });
|
||||
fs.writeFileSync(STATE_FILE, JSON.stringify(state, null, 2));
|
||||
}
|
||||
|
||||
function markDismissed(): void {
|
||||
saveState({ dismissed_at: new Date().toISOString() });
|
||||
}
|
||||
|
||||
async function disconnectComposioGoogle(): Promise<void> {
|
||||
for (const slug of ['gmail', 'googlecalendar'] as const) {
|
||||
const account = composioAccountsRepo.getAccount(slug);
|
||||
if (!account?.id) continue;
|
||||
|
||||
try {
|
||||
await deleteConnectedAccount(account.id);
|
||||
console.log(`[composio-google-migration] composio: deleted ${slug} (${account.id})`);
|
||||
} catch (error) {
|
||||
// Best-effort — logged but doesn't block the local cleanup.
|
||||
console.warn(`[composio-google-migration] composio delete failed for ${slug}:`, error);
|
||||
}
|
||||
|
||||
try {
|
||||
composioAccountsRepo.deleteAccount(slug);
|
||||
} catch (error) {
|
||||
console.warn(`[composio-google-migration] local delete failed for ${slug}:`, error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function cleanupCalendarComposioState(): void {
|
||||
const file = path.join(WorkDir, 'calendar_sync', 'composio_state.json');
|
||||
try {
|
||||
if (fs.existsSync(file)) {
|
||||
fs.unlinkSync(file);
|
||||
console.log('[composio-google-migration] removed stale calendar composio_state.json');
|
||||
}
|
||||
} catch (error) {
|
||||
console.warn('[composio-google-migration] failed to remove composio_state.json:', error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether the user qualifies for the migration. If they do, atomically
|
||||
* mark `dismissed_at`, fire-and-forget the Composio disconnect, and return
|
||||
* `{shouldShow: true}` so the renderer can show the modal.
|
||||
*
|
||||
* Idempotent: subsequent calls return `{shouldShow: false}` once `dismissed_at`
|
||||
* is set, regardless of whether the modal was actually shown or the user
|
||||
* completed the OAuth flow.
|
||||
*/
|
||||
export async function qualifyAndDisconnectComposioGoogle(): Promise<{ shouldShow: boolean }> {
|
||||
// Rule 4 — already processed
|
||||
const state = loadState();
|
||||
if (state.dismissed_at) {
|
||||
return { shouldShow: false };
|
||||
}
|
||||
|
||||
// Rule 1 — must be signed in to Rowboat
|
||||
if (!(await isSignedIn())) {
|
||||
return { shouldShow: false };
|
||||
}
|
||||
|
||||
// Rule 3 — already on native rowboat-mode Google → silently mark dismissed
|
||||
// (so we stop re-checking) and bail before touching Composio state.
|
||||
const oauthRepo = container.resolve<IOAuthRepo>('oauthRepo');
|
||||
const googleConnection = await oauthRepo.read('google');
|
||||
if (googleConnection.tokens && googleConnection.mode === 'rowboat') {
|
||||
markDismissed();
|
||||
return { shouldShow: false };
|
||||
}
|
||||
|
||||
// Rule 2 — must have at least one Composio Google toolkit connected
|
||||
const hasGmail = composioAccountsRepo.isConnected('gmail');
|
||||
const hasCalendar = composioAccountsRepo.isConnected('googlecalendar');
|
||||
if (!hasGmail && !hasCalendar) {
|
||||
return { shouldShow: false };
|
||||
}
|
||||
|
||||
// All rules pass. Mark dismissed atomically before any side effects so
|
||||
// a crash mid-migration leaves us in a deterministic post-migration state.
|
||||
markDismissed();
|
||||
|
||||
// Fire-and-forget: disconnect Composio Google + clean up the stale
|
||||
// calendar state file. Both are best-effort.
|
||||
void disconnectComposioGoogle();
|
||||
cleanupCalendarComposioState();
|
||||
|
||||
return { shouldShow: true };
|
||||
}
|
||||
|
|
@ -429,16 +429,10 @@ const ipcSchemas = {
|
|||
toolkits: z.array(z.string()),
|
||||
}),
|
||||
},
|
||||
'composio:use-composio-for-google': {
|
||||
'migration:check-composio-google': {
|
||||
req: z.null(),
|
||||
res: z.object({
|
||||
enabled: z.boolean(),
|
||||
}),
|
||||
},
|
||||
'composio:use-composio-for-google-calendar': {
|
||||
req: z.null(),
|
||||
res: z.object({
|
||||
enabled: z.boolean(),
|
||||
shouldShow: z.boolean(),
|
||||
}),
|
||||
},
|
||||
'composio:didConnect': {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue