diff --git a/apps/x/apps/renderer/src/App.tsx b/apps/x/apps/renderer/src/App.tsx index 563d3cb2..7b84f2d6 100644 --- a/apps/x/apps/renderer/src/App.tsx +++ b/apps/x/apps/renderer/src/App.tsx @@ -1034,6 +1034,10 @@ function App() { const chatViewStateByTabRef = useRef(chatViewStateByTab) const chatDraftsRef = useRef(new Map()) const selectedModelByTabRef = useRef(new Map()) + // Work directory is per-chat. Keyed by tab id; null/absent means none set. + const [workDirByTab, setWorkDirByTab] = useState>({}) + const workDirByTabRef = useRef(workDirByTab) + workDirByTabRef.current = workDirByTab const chatScrollTopByTabRef = useRef(new Map()) const [toolOpenByTab, setToolOpenByTab] = useState>>({}) const [chatViewportAnchorByTab, setChatViewportAnchorByTab] = useState>({}) @@ -1046,6 +1050,36 @@ function App() { chatDraftsRef.current.delete(tabId) } }, []) + // Persist a run's work directory to its per-run sidecar config file. The agent + // runtime reads this same file (config/workdir-.json) on each turn. + const persistRunWorkDir = useCallback(async (runId: string, value: string | null) => { + try { + await window.ipc.invoke('workspace:writeFile', { + path: `config/workdir-${runId}.json`, + data: JSON.stringify(value ? { path: value } : {}, null, 2), + }) + } catch (err) { + console.error('Failed to persist work directory for run', runId, err) + } + }, []) + // Read a run's persisted work directory (used when (re)opening a run into a tab). + const loadRunWorkDir = useCallback(async (runId: string): Promise => { + try { + const result = await window.ipc.invoke('workspace:readFile', { path: `config/workdir-${runId}.json` }) + const parsed = JSON.parse(result.data) + const value = typeof parsed?.path === 'string' ? parsed.path.trim() : '' + return value || null + } catch { + return null + } + }, []) + const setTabWorkDir = useCallback((tabId: string, value: string | null) => { + setWorkDirByTab((prev) => ({ ...prev, [tabId]: value })) + // If the tab is already bound to a run, persist immediately so the change + // applies to that chat's subsequent messages. + const runId = chatTabsRef.current.find((t) => t.id === tabId)?.runId + if (runId) void persistRunWorkDir(runId, value) + }, [persistRunWorkDir]) const isToolOpenForTab = useCallback((tabId: string, toolId: string): boolean => { return toolOpenByTab[tabId]?.[toolId] ?? false }, [toolOpenByTab]) @@ -2023,10 +2057,16 @@ function App() { setPendingAskHumanRequests(pendingAsks) setAllPermissionRequests(allPermissionRequests) setPermissionResponses(permResponseMap) + + // Restore the run's per-chat work directory into the tab it was loaded into. + const tabId = activeChatTabIdRef.current + const wd = await loadRunWorkDir(id) + if (loadRunRequestIdRef.current !== requestId) return + setWorkDirByTab((prev) => ({ ...prev, [tabId]: wd })) } catch (err) { console.error('Failed to load run:', err) } - }, []) + }, [loadRunWorkDir]) const getStreamingBuffer = useCallback((id: string) => { const existing = streamingBuffersRef.current.get(id) @@ -2492,6 +2532,10 @@ function App() { ? { ...tab, runId: currentRunId } : tab ))) + // Flush this tab's pending work directory onto the freshly created run so + // the agent picks it up on the first turn. Done before createMessage below. + const pendingWorkDir = workDirByTabRef.current[submitTabId] ?? null + if (pendingWorkDir) await persistRunWorkDir(currentRunId, pendingWorkDir) isNewRun = true } @@ -2687,6 +2731,8 @@ function App() { ...prev, [activeChatTabIdRef.current]: createEmptyChatTabViewState(), })) + // A brand-new chat starts with no work directory. + setWorkDirByTab(prev => ({ ...prev, [activeChatTabIdRef.current]: null })) }, [setChatViewportAnchor]) // Chat tab operations @@ -2774,6 +2820,12 @@ function App() { chatDraftsRef.current.delete(tabId) selectedModelByTabRef.current.delete(tabId) chatScrollTopByTabRef.current.delete(tabId) + setWorkDirByTab((prev) => { + if (!(tabId in prev)) return prev + const next = { ...prev } + delete next[tabId] + return next + }) setToolOpenByTab((prev) => { if (!(tabId in prev)) return prev const next = { ...prev } @@ -5835,6 +5887,8 @@ function App() { selectedModelByTabRef.current.delete(tab.id) } }} + workDir={workDirByTab[tab.id] ?? null} + onWorkDirChange={(v) => setTabWorkDir(tab.id, v)} isRecording={isActive && isRecording} recordingText={isActive ? voice.interimText : undefined} recordingState={isActive ? (voice.state === 'connecting' ? 'connecting' : 'listening') : undefined} @@ -5904,6 +5958,8 @@ function App() { selectedModelByTabRef.current.delete(tabId) } }} + workDirByTab={workDirByTab} + onWorkDirChangeForTab={setTabWorkDir} pendingAskHumanRequests={pendingAskHumanRequests} allPermissionRequests={allPermissionRequests} permissionResponses={permissionResponses} diff --git a/apps/x/apps/renderer/src/components/chat-input-with-mentions.tsx b/apps/x/apps/renderer/src/components/chat-input-with-mentions.tsx index 81e25b4e..cf94d57f 100644 --- a/apps/x/apps/renderer/src/components/chat-input-with-mentions.tsx +++ b/apps/x/apps/renderer/src/components/chat-input-with-mentions.tsx @@ -29,7 +29,6 @@ import { DropdownMenuItem, DropdownMenuRadioGroup, DropdownMenuRadioItem, - DropdownMenuSeparator, DropdownMenuTrigger, } from '@/components/ui/dropdown-menu' import { @@ -134,6 +133,10 @@ interface ChatInputInnerProps { onTtsModeChange?: (mode: 'summary' | 'full') => void /** Fired when the user picks a different model in the dropdown (only when no run exists yet). */ onSelectedModelChange?: (model: SelectedModel | null) => void + /** Work directory for this chat (per-chat). Null when none is set. */ + workDir?: string | null + /** Fired when the user sets/changes/clears the work directory for this chat. */ + onWorkDirChange?: (value: string | null) => void } function ChatInputInner({ @@ -160,6 +163,8 @@ function ChatInputInner({ onToggleTts, onTtsModeChange, onSelectedModelChange, + workDir = null, + onWorkDirChange, }: ChatInputInnerProps) { const controller = usePromptInputController() const message = controller.textInput.value @@ -174,7 +179,6 @@ function ChatInputInner({ const [searchEnabled, setSearchEnabled] = useState(false) const [searchAvailable, setSearchAvailable] = useState(false) const [isRowboatConnected, setIsRowboatConnected] = useState(false) - const [workDir, setWorkDir] = useState(null) const [codingAgent, setCodingAgent] = useState<'claude' | 'codex'>('claude') const [codeModeEnabled, setCodeModeEnabled] = useState(false) const [codeModeFeatureEnabled, setCodeModeFeatureEnabled] = useState(false) @@ -332,24 +336,16 @@ function ChatInputInner({ }) }, []) - // Load currently configured work directory (and its agent preference) - const loadWorkDir = useCallback(async () => { - let dir: string | null = null - try { - const result = await window.ipc.invoke('workspace:readFile', { path: 'config/workdir.json' }) - const parsed = JSON.parse(result.data) - const value = typeof parsed?.path === 'string' ? parsed.path.trim() : '' - dir = value || null - } catch { - dir = null - } - setWorkDir(dir) - setCodingAgent(await loadCodingAgentFor(dir)) - }, [loadCodingAgentFor]) - + // Work directory is owned per-chat by the parent (App). This component only + // drives the picker dialog and reports changes up via onWorkDirChange. Whenever + // the work directory changes, load its persisted coding-agent preference. useEffect(() => { - loadWorkDir() - }, [isActive, loadWorkDir]) + let cancelled = false + loadCodingAgentFor(workDir).then((agent) => { + if (!cancelled) setCodingAgent(agent) + }) + return () => { cancelled = true } + }, [workDir, loadCodingAgentFor]) const handleSetWorkDir = useCallback(async () => { try { @@ -370,33 +366,20 @@ function ChatInputInner({ defaultPath, }) if (!chosen) return - await window.ipc.invoke('workspace:writeFile', { - path: 'config/workdir.json', - data: JSON.stringify({ path: chosen }, null, 2), - }) - setWorkDir(chosen) + onWorkDirChange?.(chosen) setCodingAgent(await loadCodingAgentFor(chosen)) toast.success(`Work directory set: ${chosen}`) } catch (err) { console.error('Failed to set work directory', err) toast.error('Failed to set work directory') } - }, [workDir, loadCodingAgentFor]) + }, [workDir, onWorkDirChange, loadCodingAgentFor]) - const handleClearWorkDir = useCallback(async () => { - try { - await window.ipc.invoke('workspace:writeFile', { - path: 'config/workdir.json', - data: JSON.stringify({}, null, 2), - }) - setWorkDir(null) - setCodingAgent('claude') - toast.success('Work directory cleared') - } catch (err) { - console.error('Failed to clear work directory', err) - toast.error('Failed to clear work directory') - } - }, []) + const handleClearWorkDir = useCallback(() => { + onWorkDirChange?.(null) + setCodingAgent('claude') + toast.success('Work directory cleared') + }, [onWorkDirChange]) const handleToggleCodingAgent = useCallback(async () => { const next: 'claude' | 'codex' = codingAgent === 'claude' ? 'codex' : 'claude' @@ -672,28 +655,29 @@ function ChatInputInner({ {workDir ? 'Change work directory' : 'Set work directory'} - {workDir && ( - <> - - { void handleClearWorkDir() }}> - - Clear work directory - - - )} {workDir && ( - +
+ + +
Work directory: {workDir} @@ -701,36 +685,28 @@ function ChatInputInner({
)} {searchAvailable && ( - searchEnabled ? ( - - - - - Web search on — click to disable - - ) : ( - - - - - Enable web search - - ) + )} {codeModeFeatureEnabled && (codeModeEnabled ? (
@@ -961,6 +937,8 @@ export interface ChatInputWithMentionsProps { onToggleTts?: () => void onTtsModeChange?: (mode: 'summary' | 'full') => void onSelectedModelChange?: (model: SelectedModel | null) => void + workDir?: string | null + onWorkDirChange?: (value: string | null) => void } export function ChatInputWithMentions({ @@ -990,6 +968,8 @@ export function ChatInputWithMentions({ onToggleTts, onTtsModeChange, onSelectedModelChange, + workDir, + onWorkDirChange, }: ChatInputWithMentionsProps) { return ( @@ -1017,6 +997,8 @@ export function ChatInputWithMentions({ onToggleTts={onToggleTts} onTtsModeChange={onTtsModeChange} onSelectedModelChange={onSelectedModelChange} + workDir={workDir} + onWorkDirChange={onWorkDirChange} /> ) diff --git a/apps/x/apps/renderer/src/components/chat-sidebar.tsx b/apps/x/apps/renderer/src/components/chat-sidebar.tsx index ff7c6921..ce6d8bda 100644 --- a/apps/x/apps/renderer/src/components/chat-sidebar.tsx +++ b/apps/x/apps/renderer/src/components/chat-sidebar.tsx @@ -143,6 +143,8 @@ interface ChatSidebarProps { getInitialDraft?: (tabId: string) => string | undefined onDraftChangeForTab?: (tabId: string, text: string) => void onSelectedModelChangeForTab?: (tabId: string, model: SelectedModel | null) => void + workDirByTab?: Record + onWorkDirChangeForTab?: (tabId: string, value: string | null) => void pendingAskHumanRequests?: ChatTabViewState['pendingAskHumanRequests'] allPermissionRequests?: ChatTabViewState['allPermissionRequests'] permissionResponses?: ChatTabViewState['permissionResponses'] @@ -199,6 +201,8 @@ export function ChatSidebar({ getInitialDraft, onDraftChangeForTab, onSelectedModelChangeForTab, + workDirByTab = {}, + onWorkDirChangeForTab, pendingAskHumanRequests = new Map(), allPermissionRequests = new Map(), permissionResponses = new Map(), @@ -690,6 +694,8 @@ export function ChatSidebar({ initialDraft={getInitialDraft?.(tab.id)} onDraftChange={onDraftChangeForTab ? (text) => onDraftChangeForTab(tab.id, text) : undefined} onSelectedModelChange={onSelectedModelChangeForTab ? (m) => onSelectedModelChangeForTab(tab.id, m) : undefined} + workDir={workDirByTab[tab.id] ?? null} + onWorkDirChange={onWorkDirChangeForTab ? (v) => onWorkDirChangeForTab(tab.id, v) : undefined} isRecording={isActive && isRecording} recordingText={isActive ? recordingText : undefined} recordingState={isActive ? recordingState : undefined} diff --git a/apps/x/apps/renderer/src/components/markdown-editor.tsx b/apps/x/apps/renderer/src/components/markdown-editor.tsx index 6146d2e4..7b742008 100644 --- a/apps/x/apps/renderer/src/components/markdown-editor.tsx +++ b/apps/x/apps/renderer/src/components/markdown-editor.tsx @@ -648,6 +648,13 @@ export const MarkdownEditor = forwardRef(null) + // Read wikiLinks lazily inside the editor config via this ref. wikiLinks changes + // identity whenever the workspace directory tree changes (file watcher → new file + // list), and it used to be a useEditor() dependency — so any background write to + // the workspace destroyed and recreated the entire editor, resetting scroll to the + // top. Keeping it off the dep array (and reading the ref at event time) means the + // editor instance survives directory changes. + const wikiLinksRef = useRef(wikiLinks) const [activeWikiLink, setActiveWikiLink] = useState(null) const [anchorPosition, setAnchorPosition] = useState<{ left: number; top: number } | null>(null) const [selectionHighlight, setSelectionHighlight] = useState(null) @@ -670,6 +677,7 @@ export const MarkdownEditor = forwardRef { - void wikiLinks.onCreate(path) - } - : undefined, + onCreate: (path: string) => { + void wikiLinksRef.current?.onCreate?.(path) + }, }), TaskList, TaskItem.configure({ @@ -912,7 +918,7 @@ export const MarkdownEditor = forwardRef { @@ -1203,11 +1211,37 @@ export const MarkdownEditor = forwardRef s.split('\n').map(line => line.trimEnd()).join('\n').trim() if (normalizeForCompare(currentContent) !== normalizeForCompare(content)) { + // Preserve scroll + selection across an external content sync. setContent() + // resets the selection to the top of the doc and ProseMirror scrolls it into + // view; without restoring, a background writer touching the open file (graph + // builder, live-note runner, version-history commit) yanks the viewport back + // to the top repeatedly — making the note impossible to scroll. This editor + // instance is bound to a single note path, so the prior scrollTop is always + // valid for the reloaded content. + const wrapper = wrapperRef.current + const prevScrollTop = wrapper?.scrollTop ?? 0 + const hadFocus = editor.isFocused + const { from: prevFrom, to: prevTo } = editor.state.selection + isInternalUpdate.current = true const preprocessed = preprocessMarkdown(content) // Treat tab-open content as baseline: do not add hydration to undo history. editor.chain().setMeta('addToHistory', false).setContent(preprocessed).run() + + // Only restore the caret for a focused editor, so we never steal focus or + // scroll for a passive viewer. Clamp to the (possibly shorter) new doc. + if (hadFocus) { + const docSize = editor.state.doc.content.size + const from = Math.min(prevFrom, docSize) + const to = Math.min(prevTo, docSize) + try { + editor.chain().setMeta('addToHistory', false).setTextSelection({ from, to }).run() + } catch { /* selection no longer valid in the new doc — ignore */ } + } isInternalUpdate.current = false + + // Restore scroll last so it wins over any scrollIntoView triggered above. + if (wrapper) wrapper.scrollTop = prevScrollTop } } }, [editor, content]) diff --git a/apps/x/packages/core/src/agents/runtime.ts b/apps/x/packages/core/src/agents/runtime.ts index 0b260eec..3146101e 100644 --- a/apps/x/packages/core/src/agents/runtime.ts +++ b/apps/x/packages/core/src/agents/runtime.ts @@ -38,7 +38,12 @@ import { getRaw as getInlineTaskAgentRaw } from "../knowledge/inline_task_agent. import { getRaw as getAgentNotesAgentRaw } from "../knowledge/agent_notes_agent.js"; const AGENT_NOTES_DIR = path.join(WorkDir, 'knowledge', 'Agent Notes'); -const WORKDIR_CONFIG_FILE = path.join(WorkDir, 'config', 'workdir.json'); + +// Work directory is scoped per run (per chat). Each run gets its own sidecar +// config file so setting it in one chat does not leak into others. +function workDirConfigFile(runId: string): string { + return path.join(WorkDir, 'config', `workdir-${runId}.json`); +} type ToolPermissionMetadataValue = z.infer; @@ -165,10 +170,11 @@ async function getToolPermissionMetadata( }; } -function loadUserWorkDir(): string | null { +function loadUserWorkDir(runId: string): string | null { try { - if (!fs.existsSync(WORKDIR_CONFIG_FILE)) return null; - const raw = fs.readFileSync(WORKDIR_CONFIG_FILE, 'utf-8'); + const file = workDirConfigFile(runId); + if (!fs.existsSync(file)) return null; + const raw = fs.readFileSync(file, 'utf-8'); const parsed = JSON.parse(raw) as { path?: unknown }; const value = typeof parsed.path === 'string' ? parsed.path.trim() : ''; return value || null; @@ -1264,7 +1270,7 @@ export async function* streamAgent({ if (agentNotesContext) { instructionsWithDateTime += `\n\n${agentNotesContext}`; } - const userWorkDir = loadUserWorkDir(); + const userWorkDir = loadUserWorkDir(runId); if (userWorkDir) { loopLogger.log('injecting user work directory', userWorkDir); instructionsWithDateTime += `\n\n# User Work Directory diff --git a/apps/x/packages/core/src/auth/google-backend-oauth.ts b/apps/x/packages/core/src/auth/google-backend-oauth.ts index a441d205..b3d77c42 100644 --- a/apps/x/packages/core/src/auth/google-backend-oauth.ts +++ b/apps/x/packages/core/src/auth/google-backend-oauth.ts @@ -26,6 +26,25 @@ export class ReconnectRequiredError extends Error { } } +/** + * Thrown when the api signals a transient failure (rate limit, in-flight dedup, + * upstream 5xx) — caller should leave stored tokens untouched and retry on its + * next tick rather than flagging the user for reconnect. + * + * In particular: the backend returns 429 with `Refresh in progress, retry shortly` + * when two desktop clients race the same refresh; the proactive in-flight dedup + * in GoogleClientFactory should make that unreachable, but this is the safety + * net if it ever isn't. + */ +export class TransientRefreshError extends Error { + readonly status: number; + constructor(message: string, status: number) { + super(message); + this.name = "TransientRefreshError"; + this.status = status; + } +} + interface ApiTokenResponse { access_token: string; refresh_token?: string; @@ -104,6 +123,17 @@ export async function refreshTokensViaBackend( } throw new Error(`refresh failed: 409 ${err.error ?? ""}`.trim()); } + // 429 = backend dedup said another refresh is in flight; 5xx = upstream + // hiccup. Either way the local tokens are still valid for the next attempt + // — surface as TransientRefreshError so the factory doesn't write a stuck + // error into oauth.json. + if (res.status === 429 || res.status >= 500) { + const err = await readError(res); + throw new TransientRefreshError( + `refresh failed: ${res.status} ${err.error ?? ""}`.trim(), + res.status, + ); + } if (!res.ok) { const err = await readError(res); throw new Error(`refresh failed: ${res.status} ${err.error ?? ""}`.trim()); diff --git a/apps/x/packages/core/src/knowledge/build_graph.ts b/apps/x/packages/core/src/knowledge/build_graph.ts index 0a6c0fd0..3e05e611 100644 --- a/apps/x/packages/core/src/knowledge/build_graph.ts +++ b/apps/x/packages/core/src/knowledge/build_graph.ts @@ -264,9 +264,10 @@ async function createNotesFromBatch( message += `**Instructions:**\n`; message += `- Use the KNOWLEDGE BASE INDEX below to resolve entities - DO NOT grep/search for existing notes\n`; message += `- Extract entities (people, organizations, projects, topics) from ALL files below\n`; + message += `- The source files below are INDEPENDENT — they are batched only for efficiency. Two entities are related ONLY if they co-occur within the same single source file (or in an existing note). NEVER link entities just because they appear in this batch (see "Source Scoping" in your instructions)\n`; message += `- Create or update notes in "knowledge" directory (workspace-relative paths like "knowledge/People/Name.md")\n`; message += `- You may also create or update "${SUGGESTED_TOPICS_REL_PATH}" to maintain curated suggested-topic cards\n`; - message += `- If the same entity appears in multiple files, merge the information into a single note\n`; + message += `- If the SAME entity appears in multiple files, merge the information into a single note (this is identity, not a relationship — do not link different entities across files)\n`; message += `- Use file tools to read existing notes or "${SUGGESTED_TOPICS_REL_PATH}" (when you need full content) and write updates\n`; message += `- Follow the note templates and guidelines in your instructions\n\n`; @@ -357,7 +358,7 @@ async function buildGraphWithFiles( return { processedFiles: [], notesCreated: new Set(), notesModified: new Set(), hadError: false }; } - const BATCH_SIZE = 10; // Reduced from 25 to 10 files per agent run for faster processing + const BATCH_SIZE = 1; // One source file per agent run — prevents cross-file entity contamination in the graph const totalBatches = Math.ceil(contentFiles.length / BATCH_SIZE); console.log(`Processing ${contentFiles.length} files in ${totalBatches} batches (${BATCH_SIZE} files per batch)...`); @@ -543,7 +544,7 @@ async function processVoiceMemosForKnowledge(): Promise { } // Process in batches like other sources - const BATCH_SIZE = 10; + const BATCH_SIZE = 1; // One source file per agent run — prevents cross-file entity contamination in the graph const totalBatches = Math.ceil(contentFiles.length / BATCH_SIZE); const notesCreated = new Set(); diff --git a/apps/x/packages/core/src/knowledge/google-client-factory.test.ts b/apps/x/packages/core/src/knowledge/google-client-factory.test.ts new file mode 100644 index 00000000..7e49845b --- /dev/null +++ b/apps/x/packages/core/src/knowledge/google-client-factory.test.ts @@ -0,0 +1,143 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import type { OAuthTokens } from '../auth/types.js'; + +/** + * Regression for the cold-start race that left a stuck `error` field in + * oauth.json: Gmail + Calendar both call getClient() in the same tick, the + * dedup singleton's check-and-assign were separated by an `await`, two + * parallel refreshes go out, backend 429s the second one, the upsert(error) + * write from the 429 path could land last and stick "Needs reconnect" in + * the UI even though tokens were valid. + */ + +interface MockOAuthRepo { + read: ReturnType; + upsert: ReturnType; + delete: ReturnType; + getClientFacingConfig: ReturnType; +} + +let refreshSpy: ReturnType; +let mockOAuthRepo: MockOAuthRepo; +let storedTokens: OAuthTokens; + +beforeEach(() => { + vi.resetModules(); + + // Expired 1 minute ago — forces the refresh path through getClient. + storedTokens = { + access_token: 'old-access', + refresh_token: 'rt', + expires_at: Math.floor(Date.now() / 1000) - 60, + token_type: 'Bearer', + scopes: ['https://www.googleapis.com/auth/gmail.modify'], + }; + + mockOAuthRepo = { + read: vi.fn(async () => ({ tokens: storedTokens, mode: 'rowboat' as const })), + upsert: vi.fn(async () => undefined), + delete: vi.fn(async () => undefined), + getClientFacingConfig: vi.fn(async () => ({})), + }; + + vi.doMock('../di/container.js', () => ({ + default: { + resolve: (key: string) => { + if (key === 'oauthRepo') return mockOAuthRepo; + throw new Error(`unexpected DI resolve in test: ${key}`); + }, + }, + })); + + // Real-ish delay so two concurrent callers actually have something to + // overlap on — without it the spy might resolve synchronously and mask + // the very race we're testing for. + refreshSpy = vi.fn(async (_rt: string, scopes?: string[]) => { + await new Promise((r) => setTimeout(r, 25)); + return { + access_token: 'new-access', + refresh_token: 'rt', + expires_at: Math.floor(Date.now() / 1000) + 3600, + token_type: 'Bearer' as const, + scopes, + }; + }); + + vi.doMock('../auth/google-backend-oauth.js', async () => { + const actual = await vi.importActual( + '../auth/google-backend-oauth.js', + ); + return { + ...actual, + refreshTokensViaBackend: refreshSpy, + }; + }); +}); + +afterEach(() => { + vi.doUnmock('../di/container.js'); + vi.doUnmock('../auth/google-backend-oauth.js'); + vi.resetModules(); +}); + +describe('GoogleClientFactory.getClient', () => { + it('coalesces concurrent callers into a single refresh', async () => { + const { GoogleClientFactory } = await import('./google-client-factory.js'); + GoogleClientFactory.clearCache(); + + // Same tick — this is the exact pattern that sync_gmail.init() and + // sync_calendar.init() produce on cold start. + const [a, b] = await Promise.all([ + GoogleClientFactory.getClient(), + GoogleClientFactory.getClient(), + ]); + + expect(refreshSpy).toHaveBeenCalledTimes(1); + expect(a).not.toBeNull(); + expect(a).toBe(b); + + // And the failure-path upsert (error: '429…') is never invoked, so + // oauth.json doesn't get a stuck error. + const errorUpserts = mockOAuthRepo.upsert.mock.calls.filter( + ([, conn]) => (conn as { error?: string | null }).error, + ); + expect(errorUpserts).toHaveLength(0); + }); + + it('returns cached client when tokens are not expired', async () => { + // Tokens valid for another hour — no refresh should fire. + storedTokens = { + access_token: 'fresh-access', + refresh_token: 'rt', + expires_at: Math.floor(Date.now() / 1000) + 3600, + token_type: 'Bearer', + scopes: ['https://www.googleapis.com/auth/gmail.modify'], + }; + mockOAuthRepo.read = vi.fn(async () => ({ tokens: storedTokens, mode: 'rowboat' as const })); + + const { GoogleClientFactory } = await import('./google-client-factory.js'); + GoogleClientFactory.clearCache(); + + const a = await GoogleClientFactory.getClient(); + const b = await GoogleClientFactory.getClient(); + + expect(refreshSpy).not.toHaveBeenCalled(); + expect(a).toBe(b); + }); + + it('does not stick an error on transient (429) refresh failure', async () => { + const { TransientRefreshError } = await import('../auth/google-backend-oauth.js'); + refreshSpy.mockRejectedValueOnce(new TransientRefreshError('refresh failed: 429 Refresh in progress', 429)); + + const { GoogleClientFactory } = await import('./google-client-factory.js'); + GoogleClientFactory.clearCache(); + + const result = await GoogleClientFactory.getClient(); + + expect(result).toBeNull(); + const errorUpserts = mockOAuthRepo.upsert.mock.calls.filter( + ([, conn]) => (conn as { error?: string | null }).error, + ); + expect(errorUpserts).toHaveLength(0); + }); +}); diff --git a/apps/x/packages/core/src/knowledge/google-client-factory.ts b/apps/x/packages/core/src/knowledge/google-client-factory.ts index db5da7c6..2e85366b 100644 --- a/apps/x/packages/core/src/knowledge/google-client-factory.ts +++ b/apps/x/packages/core/src/knowledge/google-client-factory.ts @@ -8,6 +8,7 @@ import type { Configuration } from '../auth/oauth-client.js'; import { OAuthTokens } from '../auth/types.js'; import { ReconnectRequiredError, + TransientRefreshError, refreshTokensViaBackend, } from '../auth/google-backend-oauth.js'; @@ -52,11 +53,14 @@ export class GoogleClientFactory { }; /** - * Promise singleton so a burst of getClient() calls during the brief - * expiry window all wait on a single refresh round-trip rather than - * fanning out parallel refreshes. + * Promise singleton so concurrent getClient() callers share a single + * pass through the read/refresh/build pipeline rather than fanning + * out parallel refreshes. The check-and-assign must be atomic (no + * `await` between them) so two callers in the same tick can't both + * pass the null check before either assigns — that's why getClient() + * is a thin synchronous wrapper around getClientInner(). */ - private static refreshInFlight: Promise | null = null; + private static inFlightClient: Promise | null = null; private static async resolveByokCredentials(): Promise<{ clientId: string; clientSecret?: string }> { const oauthRepo = container.resolve('oauthRepo'); @@ -69,13 +73,24 @@ export class GoogleClientFactory { } /** - * Get or create OAuth2Client, reusing cached instance when possible + * Get or create OAuth2Client, reusing the cached instance when possible. + * + * The check-and-assign of `inFlightClient` is synchronous so concurrent + * callers in the same tick coalesce onto a single pipeline run. The actual + * work lives in getClientInner(); this wrapper exists purely to guarantee + * the dedup invariant. */ static async getClient(): Promise { - if (this.refreshInFlight) { - return this.refreshInFlight; + if (this.inFlightClient) { + return this.inFlightClient; } + this.inFlightClient = this.getClientInner().finally(() => { + this.inFlightClient = null; + }); + return this.inFlightClient; + } + private static async getClientInner(): Promise { const oauthRepo = container.resolve('oauthRepo'); const connection = await oauthRepo.read(this.PROVIDER_NAME); const tokens = connection.tokens ?? null; @@ -110,16 +125,12 @@ export class GoogleClientFactory { // expiry — keeps long-running calls from racing the boundary. if (oauthClient.isTokenExpired(tokens)) { if (!tokens.refresh_token) { - console.log('[OAuth] Token expired and no refresh token available for Google.'); + console.log('[OAuth] Google token expired and no refresh token available.'); await oauthRepo.upsert(this.PROVIDER_NAME, { error: 'Missing refresh token. Please reconnect.' }); this.clearCache(); return null; } - - this.refreshInFlight = this.refreshAndBuild(tokens, mode).finally(() => { - this.refreshInFlight = null; - }); - return this.refreshInFlight; + return this.refreshAndBuild(tokens, mode); } // Reuse client if tokens haven't changed @@ -135,7 +146,8 @@ export class GoogleClientFactory { const oauthRepo = container.resolve('oauthRepo'); try { - console.log(`[OAuth] Token expired, refreshing via ${mode}...`); + const secsSinceExpiry = Math.floor(Date.now() / 1000) - tokens.expires_at; + console.log(`[OAuth] Google token expired ${secsSinceExpiry}s ago, refreshing via ${mode}...`); const existingScopes = tokens.scopes; let refreshedTokens: OAuthTokens; @@ -150,7 +162,8 @@ export class GoogleClientFactory { } await oauthRepo.upsert(this.PROVIDER_NAME, { tokens: refreshedTokens, error: null }); - console.log('[OAuth] Token refreshed successfully'); + const ttl = refreshedTokens.expires_at - Math.floor(Date.now() / 1000); + console.log(`[OAuth] Google token refreshed successfully (mode=${mode}, new expires_at=${refreshedTokens.expires_at}, ttl=${ttl}s)`); return this.buildAndCacheClient(refreshedTokens, mode); } catch (error) { if (error instanceof ReconnectRequiredError) { @@ -159,9 +172,24 @@ export class GoogleClientFactory { this.clearCache(); return null; } + if (error instanceof TransientRefreshError) { + // Transient (rate limit, in-flight dedup, upstream 5xx): leave + // stored tokens + cache alone, log, and let the next sync tick + // retry. Writing an `error` here would stick "Needs reconnect" + // in the UI for a problem the user can't fix by reconnecting. + console.warn(`[OAuth] Transient Google refresh failure (status=${error.status}): ${error.message} — will retry on next tick`); + return null; + } const message = error instanceof Error ? error.message : 'Failed to refresh token for Google'; await oauthRepo.upsert(this.PROVIDER_NAME, { error: message }); console.error('[OAuth] Failed to refresh token for Google:', error); + // Walk cause chain so we can see e.g. `Not signed into Rowboat` + // showing up under a generic `fetch failed` outer error. + let cause: unknown = error; + while (cause != null && typeof cause === 'object' && 'cause' in cause) { + cause = (cause as { cause?: unknown }).cause; + if (cause != null) console.error('[OAuth] Caused by:', cause); + } this.clearCache(); return null; } diff --git a/apps/x/packages/core/src/knowledge/note_creation.ts b/apps/x/packages/core/src/knowledge/note_creation.ts index bd804449..baee1d2b 100644 --- a/apps/x/packages/core/src/knowledge/note_creation.ts +++ b/apps/x/packages/core/src/knowledge/note_creation.ts @@ -37,7 +37,7 @@ Sources (emails, meetings, voice memos) are processed in roughly chronological o # Task -You are a memory agent. Given a single source file (email, meeting transcript, or voice memo), you will: +You are a memory agent. You are given one or more source files (emails, meeting transcripts, or voice memos) to process. **The files in a request are independent of each other** — they are batched together only for efficiency, not because they are related. Process each source file on its own terms (see "Source Scoping" below). For each source file you will: 1. **Determine source type (meeting or email)** 2. **Evaluate if the source is worth processing** @@ -49,7 +49,21 @@ You are a memory agent. Given a single source file (email, meeting transcript, o 8. Create new notes or update existing notes 9. **Apply state changes to existing notes** -The core rule: **Both meetings and emails can create notes, but emails require personalized content.** +The core rule: **Both meetings and emails can create notes, but emails require personalized content — and a new People/Organization note from an email also requires the user to have replied at least once in the thread (the Email Reply Gate). Emails can always update existing notes regardless.** + +# Source Scoping (Batch Isolation) — READ FIRST + +You may receive several source files in one request. **They are unrelated by default.** Two source files appearing in the same request tells you *nothing* about whether their entities are related. + +**The only relationship signal is co-occurrence WITHIN a single source file (or a relationship already recorded in existing notes).** Concretely: + +- **Create a link / relationship between two entities ONLY if the connection is evidenced within the same single source file, or is already documented in an existing note.** Example: if email A is between Sarah (Acme) and you, and email B is between David (Globex) and you, you must **not** link Sarah↔David or Acme↔Globex — they never appeared together. +- **Never infer a relationship from batch co-occurrence.** "Both showed up in this run" is not evidence. When the only thing two entities share is the batch, add no link. +- **The one allowed cross-file operation is identity merging:** if the *same* canonical entity appears in multiple source files in the batch, merge its information into a single note. That is recognizing one entity, not relating two. +- **Activity entries are per-source.** Each activity line describes one source file's interaction and links only the entities actually present in *that* source. +- **When in doubt, omit the link.** A missing edge is a minor gap; a fabricated edge is a wrong fact in the graph (the knowledge graph draws an edge for every \`[[link]]\` you write). + +This applies to every step below — entity resolution, content extraction, and especially the bidirectional links in Step 10. You have full read access to the existing knowledge directory. Use this extensively to: - Find existing notes for people, organizations, projects mentioned @@ -194,6 +208,7 @@ Emails containing calendar invites (\`.ics\` attachments or inline calendar data - Contains calendar metadata (VCALENDAR, VEVENT) **Rules for calendar invite emails:** +0. **Exempt from the Email Reply Gate** - a meeting actually scheduled with the user is direct engagement, so you may create the primary-contact note even if the user hasn't sent a text reply in the thread. 1. **CREATE a note for the primary contact** - the person you're actually meeting with 2. **Extract from the invite:** their name, email, organization (from email domain), meeting topic 3. **Skip automated notifications from Google/Outlook** - emails from calendar-no-reply@google.com with no human sender @@ -436,7 +451,7 @@ Resolution Map: **If source_type == "email":** - The email already passed label-based filtering in Step 1 - Resolved entities → Update existing notes -- New entities → Create notes (the labels already confirmed this email is worth processing) +- New entities → Create notes **only if the email-reply gate passes** (see Step 5 → "Email Reply Gate"). If the thread is purely inbound (the user never replied), update existing notes only — do not create new canonical People/Organization notes. ## 4c: Disambiguation Rules @@ -508,7 +523,7 @@ For entities not resolved to existing notes, determine if they warrant new notes **CREATE a note for people who are:** - External (not @user.domain) - People you directly interacted with in meetings -- Email correspondents directly participating in the thread (emails that reach this step already passed label-based filtering) +- Email correspondents directly participating in a thread the user has replied to (emails that reach this step already passed label-based filtering; new People/Org notes also require the Email Reply Gate) - Decision makers or contacts at customers, prospects, or partners - Investors or potential investors - Candidates you are interviewing @@ -579,6 +594,21 @@ For people who don't warrant their own note, add to Organization note's Contacts - Sarah Lee — Support, handled wire transfer issue \`\`\` +### Email Reply Gate (new People/Organization notes only) + +**Emails can always update existing notes. But an email may only CREATE a new canonical People or Organization note if the user has replied at least once in the thread.** This stops purely inbound email (cold outreach, newsletters, one-way notifications) from spawning new notes for people the user has never engaged. + +**How to check:** The email source lists each message as a \`### From: \` block. The user has replied if **at least one message in the thread was sent by the user** — a \`### From:\` line whose address matches \`user.email\`. A reply from someone at \`@user.domain\` (the user's own team) also counts as the user's side having engaged. + +**Rules:** +- **User replied at least once** → the thread is a two-way exchange; you may create new canonical People/Organization notes (still subject to the Direct Interaction and Weekly Importance tests below). +- **Purely inbound** (every message is from external senders; no \`### From:\` matches \`user.email\` or \`@user.domain\`) → do **NOT** create new canonical People/Organization notes. You may still: update notes that already exist, and create/update a suggestion card in \`suggested-topics.md\` if the entity looks strategically relevant. + +**Scope:** +- Applies **only to creating new** People/Organization notes from **emails**. It does not block updates to existing notes. +- Does **not** apply to meetings or voice memos (those always create). +- **Exception:** calendar-invite emails for a meeting actually scheduled with the user (see "Calendar Invite Emails") are exempt — a scheduled meeting is itself direct engagement, so create the primary-contact note even without a text reply. + ### Direct Interaction Test (People and Organizations) For **new canonical People and Organizations notes**, require **direct interaction**, not just mention. @@ -597,9 +627,13 @@ For **new canonical People and Organizations notes**, require **direct interacti - The source only establishes a second-degree relationship, not a direct one **Canonical note rule:** -- For **new People/Organizations**, create the canonical note only if both are true: - 1. There is **direct interaction** - 2. The entity clears the **weekly importance test** +- For **new People/Organizations**, create the canonical note only if all are true: + 1. For **email** sources, the **Email Reply Gate** passes (the user replied in the thread, or it's an exempt calendar invite) + 2. There is **direct interaction** + 3. The interaction is **not transactional** per the Transactional Interaction Check (see below) — reporting an issue, sending/paying an invoice, support questions, scheduling, etc. update existing notes only, never create new ones + 4. The entity clears the **weekly importance test** + 5. The interaction is **not purely temporary** per the ongoing-relationship soft check (see below) +- **Updates to existing notes are never gated by these checks** — a transactional or temporary interaction with a person/org that already has a note still gets logged as activity. If an entity seems strategically relevant but fails the direct interaction test, do **not** auto-create a canonical note. At most, create a suggestion card in \`suggested-topics.md\`. @@ -638,6 +672,42 @@ This test is mainly for **People** and **Organizations**. **Do NOT use it as the - Update the existing note even if the current source is weaker; the importance test is mainly for deciding whether to create a **new** People/Organization note - If a previously tentative person/org is now clearly important enough for a canonical note, create/update the note and remove any tentative suggestion card for that exact entity from \`suggested-topics.md\` +### Transactional Interaction Check (People and Organizations) + +**If the source is a transactional interaction — a discrete task or exchange that completes and closes — do NOT create a new canonical note. You may still UPDATE an existing note** (add an activity entry, mark an open item complete, update a field). The transaction is real activity worth logging when the person/org already matters, but on its own it is not evidence of a durable relationship worth minting a new note. + +**Transactional interactions include:** +- Reporting, acknowledging, or resolving an **issue / bug / outage / support ticket** +- Sending, requesting, or paying an **invoice, receipt, or payment confirmation** +- A **how-to or product question** that resolves within the thread +- **Scheduling / logistics / calendar** back-and-forth +- A one-time **purchase, refund, password reset, form submission, or signature request** +- Automated, templated, or notification-style messages + +The signal is the **nature of the exchange, not the sender's importance**: even someone at an important company, if they are only handling a transactional task here, does not earn a *new* note from that interaction alone. If the same person/org later shows non-transactional substance (an active deal, evaluation, partnership, ongoing thread), create the note then. + +### Ongoing-Relationship Test (soft check, People and Organizations) + +A softer companion to the transactional and weekly-importance checks, aimed at filtering out **temporary, one-off interactions** even when the single touchpoint looks substantive. + +**Ask:** _"Will the user still be in touch with this person/organization a month from now, or is this a temporary interaction that wraps up once this thread/issue is resolved?"_ + +If the honest answer is "this is temporary and won't carry forward," **don't create a canonical note** — even if there was a real two-way exchange. The interaction can still be logged on an existing org note (e.g. in Contacts) without minting a new People note. + +**Temporary / one-off (lean NO — don't create):** +- **Customer-support questions** — a support rep, or a customer asking a one-time support/how-to question, with no ongoing strategic relationship. Don't create a note for that person. +- A scheduling/logistics back-and-forth that ends when the meeting is booked +- A one-time transactional exchange (a single vendor purchase, a password reset, a refund, a form submission) +- A recruiter or service rep handling a single request +- Anyone where the interaction is clearly self-contained and resolves within this thread + +**Durable (lean YES — note is OK if the other gates pass):** +- An active customer, prospect, investor, partner, or candidate relationship likely to continue +- A contact in an ongoing deal, project, or evaluation +- Someone with whom a recurring cadence (calls, syncs, threads) is likely + +This is a **soft** check: weigh it alongside the weekly-importance and direct-interaction tests rather than as a hard veto. When the relationship is genuinely durable, a single temporary-looking exchange shouldn't block the note. When in doubt and the interaction looks temporary, prefer a suggestion card (or just logging the activity on an existing note) over creating a new canonical note. + ## Organizations **CREATE a note if:** @@ -651,6 +721,8 @@ This test is mainly for **People** and **Organizations**. **Do NOT use it as the - One-time transactional vendors - Consumer service companies - Organizations only referenced through third-party mention or offered introductions +- Transactional interactions (see Transactional Interaction Check) — invoices, support tickets, issue reports, scheduling. Update an existing org note if one exists; don't create a new one +- Temporary, self-contained interactions that won't carry forward a month from now (see Ongoing-Relationship Test) — e.g. a one-off support exchange ## Projects @@ -1056,6 +1128,8 @@ After writing, verify links go both ways. ## Bidirectional Link Rules +**Precondition (see "Source Scoping"):** only add a link when the relationship is evidenced **within a single source file** or already recorded in an existing note. Do **not** add links between entities that merely share this batch. Bidirectionality applies *after* a link is justified — it never justifies creating one. + | If you add... | Then also add... | |---------------|------------------| | Person → Organization | Organization → Person (in People section) | @@ -1064,6 +1138,8 @@ After writing, verify links go both ways. | Project → Topic | Topic → Project (in Related section) | | Person → Person | Person → Person (reverse link) | +**Before writing any \`[[link]]\`, ask:** "Did these two entities actually appear together in *this* source file (or an existing note)?" If the only thing they share is the batch, do not link them. + --- ${renderNoteTypesBlock()} @@ -1076,9 +1152,12 @@ ${renderNoteTypesBlock()} |-------------|---------------|----------------|------------------------| | Meeting | Yes | Yes | Yes | | Voice memo | Yes | Yes | Yes | -| Email (has create label) | Yes | Yes | Yes | +| Email (create label + user replied in thread) | Yes | Yes | Yes | +| Email (create label, purely inbound — no user reply) | Update-only (no new People/Org notes) | Yes | Yes | | Email (only skip labels) | No (SKIP) | No | No | +**Email Reply Gate:** New canonical People/Organization notes from an email require the user to have replied at least once in the thread (a \`### From:\` matching \`user.email\` or \`@user.domain\`). Purely inbound threads update existing notes only. Calendar invites for a scheduled meeting are exempt. + **Meeting activity format:** Always include a link to the source meeting note: \`\`\` **2025-01-15** (meeting): Discussed project timeline with [[People/Sarah Chen]]. See [[Meetings/granola/abc123_Weekly Sync]] @@ -1125,8 +1204,11 @@ Before completing, verify: **Filtering:** - [ ] Excluded self (user.name, user.email, @user.domain) - [ ] Applied relevance test to each person +- [ ] Applied the email reply gate to new People/Organizations from email sources (purely inbound threads create no new notes) - [ ] Applied the direct interaction test to new People/Organizations +- [ ] Applied the transactional interaction check (issue reports, invoices, support, scheduling update existing notes only — never create new ones) - [ ] Applied the weekly importance test to new People/Organizations +- [ ] Applied the ongoing-relationship soft check (temporary/one-off interactions create no new notes) - [ ] Transactional contacts in Org Contacts, not People notes - [ ] Source correctly classified (process vs skip) - [ ] Third-party mentions did not become new canonical People/Organizations notes @@ -1147,6 +1229,7 @@ Before completing, verify: - [ ] Logged all state changes in activity **Structure:** +- [ ] Every \`[[link]]\` reflects a real relationship from a single source file or existing note — none created from batch co-occurrence (Source Scoping) - [ ] All entity mentions use \`[[Folder/Name]]\` absolute links - [ ] Activity entries are reverse chronological - [ ] No duplicate activity entries diff --git a/apps/x/packages/core/src/models/defaults.ts b/apps/x/packages/core/src/models/defaults.ts index d6be0330..3163438c 100644 --- a/apps/x/packages/core/src/models/defaults.ts +++ b/apps/x/packages/core/src/models/defaults.ts @@ -7,7 +7,7 @@ import container from "../di/container.js"; const SIGNED_IN_DEFAULT_MODEL = "gpt-5.4"; const SIGNED_IN_DEFAULT_PROVIDER = "rowboat"; const SIGNED_IN_KG_MODEL = "google/gemini-3.1-flash-lite"; -const SIGNED_IN_LIVE_NOTE_AGENT_MODEL = "google/gemini-3.1-flash-lite-preview"; +const SIGNED_IN_LIVE_NOTE_AGENT_MODEL = "google/gemini-3.1-flash-lite"; /** * The single source of truth for "what model+provider should we use when