diff --git a/apps/x/packages/core/src/auth/google-backend-oauth.ts b/apps/x/packages/core/src/auth/google-backend-oauth.ts
index a441d205..b3d77c42 100644
--- a/apps/x/packages/core/src/auth/google-backend-oauth.ts
+++ b/apps/x/packages/core/src/auth/google-backend-oauth.ts
@@ -26,6 +26,25 @@ export class ReconnectRequiredError extends Error {
     }
 }
 
+/**
+ * Thrown when the API signals a transient failure (rate limit, in-flight dedup,
+ * upstream 5xx) — caller should leave stored tokens untouched and retry on its
+ * next tick rather than flagging the user for reconnect.
+ *
+ * In particular: the backend returns 429 with `Refresh in progress, retry shortly`
+ * when two desktop clients race the same refresh; the proactive in-flight dedup
+ * in GoogleClientFactory should make that unreachable, but this is the safety
+ * net if it ever isn't.
+ */
+export class TransientRefreshError extends Error {
+    readonly status: number;
+    constructor(message: string, status: number) {
+        super(message);
+        this.name = "TransientRefreshError";
+        this.status = status;
+    }
+}
+
 interface ApiTokenResponse {
     access_token: string;
     refresh_token?: string;
@@ -104,6 +123,17 @@ export async function refreshTokensViaBackend(
         }
         throw new Error(`refresh failed: 409 ${err.error ?? ""}`.trim());
     }
+    // 429 = backend dedup said another refresh is in flight; 5xx = upstream
+    // hiccup. Either way the local tokens are still valid for the next attempt
+    // — surface as TransientRefreshError so the factory doesn't write a stuck
+    // error into oauth.json.
+    if (res.status === 429 || res.status >= 500) {
+        const err = await readError(res);
+        throw new TransientRefreshError(
+            `refresh failed: ${res.status} ${err.error ?? ""}`.trim(),
+            res.status,
+        );
+    }
     if (!res.ok) {
         const err = await readError(res);
         throw new Error(`refresh failed: ${res.status} ${err.error ?? ""}`.trim());
diff --git a/apps/x/packages/core/src/knowledge/google-client-factory.test.ts b/apps/x/packages/core/src/knowledge/google-client-factory.test.ts
new file mode 100644
index 00000000..7e49845b
--- /dev/null
+++ b/apps/x/packages/core/src/knowledge/google-client-factory.test.ts
@@ -0,0 +1,143 @@
+import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
+import type { OAuthTokens } from '../auth/types.js';
+
+/**
+ * Regression for the cold-start race that left a stuck `error` field in
+ * oauth.json: Gmail + Calendar both call getClient() in the same tick, the
+ * dedup singleton's check-and-assign was separated by an `await`, two
+ * parallel refreshes go out, backend 429s the second one, the upsert(error)
+ * write from the 429 path could land last and stick "Needs reconnect" in
+ * the UI even though tokens were valid.
+ */
+
+interface MockOAuthRepo {
+    read: ReturnType;
+    upsert: ReturnType;
+    delete: ReturnType;
+    getClientFacingConfig: ReturnType;
+}
+
+let refreshSpy: ReturnType;
+let mockOAuthRepo: MockOAuthRepo;
+let storedTokens: OAuthTokens;
+
+beforeEach(() => {
+    vi.resetModules();
+
+    // Expired 1 minute ago — forces the refresh path through getClient.
+    storedTokens = {
+        access_token: 'old-access',
+        refresh_token: 'rt',
+        expires_at: Math.floor(Date.now() / 1000) - 60,
+        token_type: 'Bearer',
+        scopes: ['https://www.googleapis.com/auth/gmail.modify'],
+    };
+
+    mockOAuthRepo = {
+        read: vi.fn(async () => ({ tokens: storedTokens, mode: 'rowboat' as const })),
+        upsert: vi.fn(async () => undefined),
+        delete: vi.fn(async () => undefined),
+        getClientFacingConfig: vi.fn(async () => ({})),
+    };
+
+    vi.doMock('../di/container.js', () => ({
+        default: {
+            resolve: (key: string) => {
+                if (key === 'oauthRepo') return mockOAuthRepo;
+                throw new Error(`unexpected DI resolve in test: ${key}`);
+            },
+        },
+    }));
+
+    // Real-ish delay so two concurrent callers actually have something to
+    // overlap on — without it the spy might resolve synchronously and mask
+    // the very race we're testing for.
+    refreshSpy = vi.fn(async (_rt: string, scopes?: string[]) => {
+        await new Promise((r) => setTimeout(r, 25));
+        return {
+            access_token: 'new-access',
+            refresh_token: 'rt',
+            expires_at: Math.floor(Date.now() / 1000) + 3600,
+            token_type: 'Bearer' as const,
+            scopes,
+        };
+    });
+
+    vi.doMock('../auth/google-backend-oauth.js', async () => {
+        const actual = await vi.importActual(
+            '../auth/google-backend-oauth.js',
+        );
+        return {
+            ...actual,
+            refreshTokensViaBackend: refreshSpy,
+        };
+    });
+});
+
+afterEach(() => {
+    vi.doUnmock('../di/container.js');
+    vi.doUnmock('../auth/google-backend-oauth.js');
+    vi.resetModules();
+});
+
+describe('GoogleClientFactory.getClient', () => {
+    it('coalesces concurrent callers into a single refresh', async () => {
+        const { GoogleClientFactory } = await import('./google-client-factory.js');
+        GoogleClientFactory.clearCache();
+
+        // Same tick — this is the exact pattern that sync_gmail.init() and
+        // sync_calendar.init() produce on cold start.
+        const [a, b] = await Promise.all([
+            GoogleClientFactory.getClient(),
+            GoogleClientFactory.getClient(),
+        ]);
+
+        expect(refreshSpy).toHaveBeenCalledTimes(1);
+        expect(a).not.toBeNull();
+        expect(a).toBe(b);
+
+        // And the failure-path upsert (error: '429…') is never invoked, so
+        // oauth.json doesn't get a stuck error.
+        const errorUpserts = mockOAuthRepo.upsert.mock.calls.filter(
+            ([, conn]) => (conn as { error?: string | null }).error,
+        );
+        expect(errorUpserts).toHaveLength(0);
+    });
+
+    it('returns cached client when tokens are not expired', async () => {
+        // Tokens valid for another hour — no refresh should fire.
+        storedTokens = {
+            access_token: 'fresh-access',
+            refresh_token: 'rt',
+            expires_at: Math.floor(Date.now() / 1000) + 3600,
+            token_type: 'Bearer',
+            scopes: ['https://www.googleapis.com/auth/gmail.modify'],
+        };
+        mockOAuthRepo.read = vi.fn(async () => ({ tokens: storedTokens, mode: 'rowboat' as const }));
+
+        const { GoogleClientFactory } = await import('./google-client-factory.js');
+        GoogleClientFactory.clearCache();
+
+        const a = await GoogleClientFactory.getClient();
+        const b = await GoogleClientFactory.getClient();
+
+        expect(refreshSpy).not.toHaveBeenCalled();
+        expect(a).toBe(b);
+    });
+
+    it('does not stick an error on transient (429) refresh failure', async () => {
+        const { TransientRefreshError } = await import('../auth/google-backend-oauth.js');
+        refreshSpy.mockRejectedValueOnce(new TransientRefreshError('refresh failed: 429 Refresh in progress', 429));
+
+        const { GoogleClientFactory } = await import('./google-client-factory.js');
+        GoogleClientFactory.clearCache();
+
+        const result = await GoogleClientFactory.getClient();
+
+        expect(result).toBeNull();
+        const errorUpserts = mockOAuthRepo.upsert.mock.calls.filter(
+            ([, conn]) => (conn as { error?: string | null }).error,
+        );
+        expect(errorUpserts).toHaveLength(0);
+    });
+});
diff --git a/apps/x/packages/core/src/knowledge/google-client-factory.ts b/apps/x/packages/core/src/knowledge/google-client-factory.ts
index db5da7c6..2e85366b 100644
--- a/apps/x/packages/core/src/knowledge/google-client-factory.ts
+++ b/apps/x/packages/core/src/knowledge/google-client-factory.ts
@@ -8,6 +8,7 @@ import type { Configuration } from '../auth/oauth-client.js';
 import { OAuthTokens } from '../auth/types.js';
 import {
     ReconnectRequiredError,
+    TransientRefreshError,
     refreshTokensViaBackend,
 } from '../auth/google-backend-oauth.js';
 
@@ -52,11 +53,14 @@ export class GoogleClientFactory {
     };
 
     /**
-     * Promise singleton so a burst of getClient() calls during the brief
-     * expiry window all wait on a single refresh round-trip rather than
-     * fanning out parallel refreshes.
+     * Promise singleton so concurrent getClient() callers share a single
+     * pass through the read/refresh/build pipeline rather than fanning
+     * out parallel refreshes. The check-and-assign must be atomic (no
+     * `await` between them) so two callers in the same tick can't both
+     * pass the null check before either assigns — that's why getClient()
+     * assigns it before its first `await` and leaves the work to getClientInner().
      */
-    private static refreshInFlight: Promise | null = null;
+    private static inFlightClient: Promise | null = null;
 
     private static async resolveByokCredentials(): Promise<{ clientId: string; clientSecret?: string }> {
         const oauthRepo = container.resolve('oauthRepo');
@@ -69,13 +73,24 @@ export class GoogleClientFactory {
     }
 
     /**
-     * Get or create OAuth2Client, reusing cached instance when possible
+     * Get or create OAuth2Client, reusing the cached instance when possible.
+     *
+     * The check-and-assign of `inFlightClient` is synchronous so concurrent
+     * callers in the same tick coalesce onto a single pipeline run. The actual
+     * work lives in getClientInner(); this wrapper exists purely to guarantee
+     * the dedup invariant.
      */
     static async getClient(): Promise {
-        if (this.refreshInFlight) {
-            return this.refreshInFlight;
+        if (this.inFlightClient) {
+            return this.inFlightClient;
         }
+        this.inFlightClient = this.getClientInner().finally(() => {
+            this.inFlightClient = null;
+        });
+        return this.inFlightClient;
+    }
 
+    private static async getClientInner(): Promise {
         const oauthRepo = container.resolve('oauthRepo');
         const connection = await oauthRepo.read(this.PROVIDER_NAME);
         const tokens = connection.tokens ?? null;
@@ -110,16 +125,12 @@ export class GoogleClientFactory {
         // expiry — keeps long-running calls from racing the boundary.
         if (oauthClient.isTokenExpired(tokens)) {
             if (!tokens.refresh_token) {
-                console.log('[OAuth] Token expired and no refresh token available for Google.');
+                console.log('[OAuth] Google token expired and no refresh token available.');
                 await oauthRepo.upsert(this.PROVIDER_NAME, { error: 'Missing refresh token. Please reconnect.' });
                 this.clearCache();
                 return null;
             }
-
-            this.refreshInFlight = this.refreshAndBuild(tokens, mode).finally(() => {
-                this.refreshInFlight = null;
-            });
-            return this.refreshInFlight;
+            return this.refreshAndBuild(tokens, mode);
         }
 
         // Reuse client if tokens haven't changed
@@ -135,7 +146,8 @@ export class GoogleClientFactory {
         const oauthRepo = container.resolve('oauthRepo');
 
         try {
-            console.log(`[OAuth] Token expired, refreshing via ${mode}...`);
+            const secsSinceExpiry = Math.floor(Date.now() / 1000) - tokens.expires_at;
+            console.log(`[OAuth] Google token expired ${secsSinceExpiry}s ago, refreshing via ${mode}...`);
             const existingScopes = tokens.scopes;
             let refreshedTokens: OAuthTokens;
 
@@ -150,7 +162,8 @@ export class GoogleClientFactory {
             }
 
             await oauthRepo.upsert(this.PROVIDER_NAME, { tokens: refreshedTokens, error: null });
-            console.log('[OAuth] Token refreshed successfully');
+            const ttl = refreshedTokens.expires_at - Math.floor(Date.now() / 1000);
+            console.log(`[OAuth] Google token refreshed successfully (mode=${mode}, new expires_at=${refreshedTokens.expires_at}, ttl=${ttl}s)`);
             return this.buildAndCacheClient(refreshedTokens, mode);
         } catch (error) {
             if (error instanceof ReconnectRequiredError) {
@@ -159,9 +172,24 @@ export class GoogleClientFactory {
                 this.clearCache();
                 return null;
             }
+            if (error instanceof TransientRefreshError) {
+                // Transient (rate limit, in-flight dedup, upstream 5xx): leave
+                // stored tokens + cache alone, log, and let the next sync tick
+                // retry. Writing an `error` here would stick "Needs reconnect"
+                // in the UI for a problem the user can't fix by reconnecting.
+                console.warn(`[OAuth] Transient Google refresh failure (status=${error.status}): ${error.message} — will retry on next tick`);
+                return null;
+            }
             const message = error instanceof Error ? error.message : 'Failed to refresh token for Google';
             await oauthRepo.upsert(this.PROVIDER_NAME, { error: message });
             console.error('[OAuth] Failed to refresh token for Google:', error);
+            // Walk cause chain so we can see e.g. `Not signed into Rowboat`
+            // showing up under a generic `fetch failed` outer error. NOTE(review): no depth/cycle guard — a self-referential `cause` would keep this loop running forever.
+            let cause: unknown = error;
+            while (cause != null && typeof cause === 'object' && 'cause' in cause) {
+                cause = (cause as { cause?: unknown }).cause;
+                if (cause != null) console.error('[OAuth] Caused by:', cause);
+            }
             this.clearCache();
             return null;
         }