diff --git a/apps/x/apps/main/src/ipc.ts b/apps/x/apps/main/src/ipc.ts index 2244c9ee..90377e62 100644 --- a/apps/x/apps/main/src/ipc.ts +++ b/apps/x/apps/main/src/ipc.ts @@ -258,6 +258,20 @@ export async function startServicesWatcher(): Promise { }); } +export function stopRunsWatcher(): void { + if (runsWatcher) { + runsWatcher(); + runsWatcher = null; + } +} + +export function stopServicesWatcher(): void { + if (servicesWatcher) { + servicesWatcher(); + servicesWatcher = null; + } +} + // ============================================================================ // Handler Implementations // ============================================================================ diff --git a/apps/x/apps/main/src/main.ts b/apps/x/apps/main/src/main.ts index d2ad14e1..2a5330ab 100644 --- a/apps/x/apps/main/src/main.ts +++ b/apps/x/apps/main/src/main.ts @@ -1,6 +1,14 @@ import { app, BrowserWindow, protocol, net, shell } from "electron"; import path from "node:path"; -import { setupIpcHandlers, startRunsWatcher, startServicesWatcher, startWorkspaceWatcher, stopWorkspaceWatcher } from "./ipc.js"; +import { + setupIpcHandlers, + startRunsWatcher, + startServicesWatcher, + startWorkspaceWatcher, + stopRunsWatcher, + stopServicesWatcher, + stopWorkspaceWatcher +} from "./ipc.js"; import { fileURLToPath, pathToFileURL } from "node:url"; import { dirname } from "node:path"; import { updateElectronApp, UpdateSourceType } from "update-electron-app"; @@ -185,4 +193,6 @@ app.on("window-all-closed", () => { app.on("before-quit", () => { // Clean up watcher on app quit stopWorkspaceWatcher(); + stopRunsWatcher(); + stopServicesWatcher(); }); diff --git a/apps/x/packages/core/src/config/note_creation_config.ts b/apps/x/packages/core/src/config/note_creation_config.ts index da4b3d02..a86e8c00 100644 --- a/apps/x/packages/core/src/config/note_creation_config.ts +++ b/apps/x/packages/core/src/config/note_creation_config.ts @@ -28,6 +28,7 @@ function readConfig(): NoteCreationConfig { ? config.strictness : DEFAULT_STRICTNESS, configured: config.configured === true, + onboardingComplete: config.onboardingComplete === true, }; } catch { return { strictness: DEFAULT_STRICTNESS, configured: false }; @@ -83,7 +84,10 @@ export function markStrictnessConfigured(): void { * Set strictness and mark as configured in one operation. */ export function setStrictnessAndMarkConfigured(strictness: NoteCreationStrictness): void { - writeConfig({ strictness, configured: true }); + const config = readConfig(); + config.strictness = strictness; + config.configured = true; + writeConfig(config); } /** diff --git a/apps/x/packages/core/src/knowledge/build_graph.ts b/apps/x/packages/core/src/knowledge/build_graph.ts index 2346005e..a1b7e135 100644 --- a/apps/x/packages/core/src/knowledge/build_graph.ts +++ b/apps/x/packages/core/src/knowledge/build_graph.ts @@ -14,6 +14,7 @@ import { type GraphState, } from './graph_state.js'; import { buildKnowledgeIndex, formatIndexForPrompt } from './knowledge_index.js'; +import { limitEventItems } from './limit_event_items.js'; /** * Build obsidian-style knowledge graph by running topic extraction @@ -33,14 +34,6 @@ const SOURCE_FOLDERS = [ // Voice memos are now created directly in knowledge/Voice Memos// const VOICE_MEMOS_KNOWLEDGE_DIR = path.join(NOTES_OUTPUT_DIR, 'Voice Memos'); -const MAX_EVENT_ITEMS = 50; - -function limitEventItems(items: string[], max: number = MAX_EVENT_ITEMS): { items: string[]; truncated: boolean } { - if (items.length <= max) { - return { items, truncated: false }; - } - return { items: items.slice(0, max), truncated: true }; -} function extractPathFromToolInput(input: string): string | null { try { diff --git a/apps/x/packages/core/src/knowledge/granola/sync.ts b/apps/x/packages/core/src/knowledge/granola/sync.ts index 5e303e8d..f03a8e06 100644 --- a/apps/x/packages/core/src/knowledge/granola/sync.ts +++ b/apps/x/packages/core/src/knowledge/granola/sync.ts @@ -5,6 +5,7 @@ import { WorkDir } from '../../config/config.js'; import container from '../../di/container.js'; import { IGranolaConfigRepo } from './repo.js'; import { serviceLogger } from '../../services/service_logger.js'; +import { limitEventItems } from '../limit_event_items.js'; import { GetDocumentsResponse, SyncState, @@ -23,14 +24,6 @@ const API_DELAY_MS = 1000; // 1 second delay between API calls const RATE_LIMIT_RETRY_DELAY_MS = 60 * 1000; // Wait 1 minute on rate limit const MAX_RETRIES = 3; // Maximum retries for rate-limited requests const MAX_BATCH_SIZE = 10; // Process max 10 documents per folder per sync -const MAX_EVENT_ITEMS = 50; - -function limitEventItems(items: string[], max: number = MAX_EVENT_ITEMS): { items: string[]; truncated: boolean } { - if (items.length <= max) { - return { items, truncated: false }; - } - return { items: items.slice(0, max), truncated: true }; -} // --- Wake Signal for Immediate Sync Trigger --- let wakeResolve: (() => void) | null = null; diff --git a/apps/x/packages/core/src/knowledge/limit_event_items.ts b/apps/x/packages/core/src/knowledge/limit_event_items.ts new file mode 100644 index 00000000..b3935112 --- /dev/null +++ b/apps/x/packages/core/src/knowledge/limit_event_items.ts @@ -0,0 +1,8 @@ +export const MAX_EVENT_ITEMS = 50; + +export function limitEventItems(items: string[], max: number = MAX_EVENT_ITEMS): { items: string[]; truncated: boolean } { + if (items.length <= max) { + return { items, truncated: false }; + } + return { items: items.slice(0, max), truncated: true }; +} diff --git a/apps/x/packages/core/src/knowledge/sync_calendar.ts b/apps/x/packages/core/src/knowledge/sync_calendar.ts index 36c83016..cf2f6387 100644 --- a/apps/x/packages/core/src/knowledge/sync_calendar.ts +++ b/apps/x/packages/core/src/knowledge/sync_calendar.ts @@ -6,6 +6,7 @@ import { NodeHtmlMarkdown } from 'node-html-markdown' import { WorkDir } from '../config/config.js'; import { GoogleClientFactory } from './google-client-factory.js'; import { serviceLogger } from '../services/service_logger.js'; +import { limitEventItems } from './limit_event_items.js'; // Configuration const SYNC_DIR = path.join(WorkDir, 'calendar_sync'); @@ -15,15 +16,6 @@ const REQUIRED_SCOPES = [ 'https://www.googleapis.com/auth/calendar.events.readonly', 'https://www.googleapis.com/auth/drive.readonly' ]; -const MAX_EVENT_ITEMS = 50; - -function limitEventItems(items: string[], max: number = MAX_EVENT_ITEMS): { items: string[]; truncated: boolean } { - if (items.length <= max) { - return { items, truncated: false }; - } - return { items: items.slice(0, max), truncated: true }; -} - const nhm = new NodeHtmlMarkdown(); // --- Wake Signal for Immediate Sync Trigger --- diff --git a/apps/x/packages/core/src/knowledge/sync_fireflies.ts b/apps/x/packages/core/src/knowledge/sync_fireflies.ts index 093ee044..5e0cca07 100644 --- a/apps/x/packages/core/src/knowledge/sync_fireflies.ts +++ b/apps/x/packages/core/src/knowledge/sync_fireflies.ts @@ -3,6 +3,7 @@ import path from 'path'; import { WorkDir } from '../config/config.js'; import { FirefliesClientFactory } from './fireflies-client-factory.js'; import { serviceLogger, type ServiceRunContext } from '../services/service_logger.js'; +import { limitEventItems } from './limit_event_items.js'; // Configuration const SYNC_DIR = path.join(WorkDir, 'fireflies_transcripts'); @@ -12,14 +13,6 @@ const LOOKBACK_DAYS = 30; // Last 1 month const API_DELAY_MS = 2000; // 2 second delay between API calls const RATE_LIMIT_RETRY_DELAY_MS = 60 * 1000; // Wait 1 minute on rate limit const MAX_RETRIES = 3; // Maximum retries for rate-limited requests -const MAX_EVENT_ITEMS = 50; - -function limitEventItems(items: string[], max: number = MAX_EVENT_ITEMS): { items: string[]; truncated: boolean } { - if (items.length <= max) { - return { items, truncated: false }; - } - return { items: items.slice(0, max), truncated: true }; -} // --- Wake Signal for Immediate Sync Trigger --- let wakeResolve: (() => void) | null = null; @@ -603,7 +596,7 @@ async function syncMeetings() { level: 'info', message: `Fireflies sync complete: ${newCount} transcript${newCount === 1 ? '' : 's'}`, durationMs: Date.now() - run.startedAt, - outcome: newCount > 0 ? 'ok' : 'error', + outcome: newCount > 0 ? 'ok' : 'idle', summary: { transcripts: newCount }, }); diff --git a/apps/x/packages/core/src/knowledge/sync_gmail.ts b/apps/x/packages/core/src/knowledge/sync_gmail.ts index 000c5f10..de73c016 100644 --- a/apps/x/packages/core/src/knowledge/sync_gmail.ts +++ b/apps/x/packages/core/src/knowledge/sync_gmail.ts @@ -5,21 +5,13 @@ import { NodeHtmlMarkdown } from 'node-html-markdown' import { OAuth2Client } from 'google-auth-library'; import { WorkDir } from '../config/config.js'; import { GoogleClientFactory } from './google-client-factory.js'; -import { serviceLogger } from '../services/service_logger.js'; +import { serviceLogger, type ServiceRunContext } from '../services/service_logger.js'; +import { limitEventItems } from './limit_event_items.js'; // Configuration const SYNC_DIR = path.join(WorkDir, 'gmail_sync'); const SYNC_INTERVAL_MS = 5 * 60 * 1000; // Check every 5 minutes const REQUIRED_SCOPE = 'https://www.googleapis.com/auth/gmail.readonly'; -const MAX_EVENT_ITEMS = 50; - -function limitEventItems(items: string[], max: number = MAX_EVENT_ITEMS): { items: string[]; truncated: boolean } { - if (items.length <= max) { - return { items, truncated: false }; - } - return { items: items.slice(0, max), truncated: true }; -} - const nhm = new NodeHtmlMarkdown(); // --- Wake Signal for Immediate Sync Trigger --- @@ -201,79 +193,120 @@ async function fullSync(auth: OAuth2Client, syncDir: string, attachmentsDir: str console.log(`Performing full sync of last ${lookbackDays} days...`); const gmail = google.gmail({ version: 'v1', auth }); - const pastDate = new Date(); - pastDate.setDate(pastDate.getDate() - lookbackDays); - const dateQuery = pastDate.toISOString().split('T')[0].replace(/-/g, '/'); + let run: ServiceRunContext | null = null; + const ensureRun = async () => { + if (!run) { + run = await serviceLogger.startRun({ + service: 'gmail', + message: 'Syncing Gmail', + trigger: 'timer', + }); + } + }; - // Get History ID - const profile = await gmail.users.getProfile({ userId: 'me' }); - const currentHistoryId = profile.data.historyId!; + try { + const pastDate = new Date(); + pastDate.setDate(pastDate.getDate() - lookbackDays); + const dateQuery = pastDate.toISOString().split('T')[0].replace(/-/g, '/'); - const threadIds: string[] = []; - let pageToken: string | undefined; - do { - const res = await gmail.users.threads.list({ - userId: 'me', - q: `after:${dateQuery}`, - pageToken - }); + // Get History ID + const profile = await gmail.users.getProfile({ userId: 'me' }); + const currentHistoryId = profile.data.historyId!; - const threads = res.data.threads; - if (threads) { - for (const thread of threads) { - if (thread.id) { - threadIds.push(thread.id); + const threadIds: string[] = []; + let pageToken: string | undefined; + do { + const res = await gmail.users.threads.list({ + userId: 'me', + q: `after:${dateQuery}`, + pageToken + }); + + const threads = res.data.threads; + if (threads) { + for (const thread of threads) { + if (thread.id) { + threadIds.push(thread.id); + } } } + pageToken = res.data.nextPageToken ?? undefined; + } while (pageToken); + + if (threadIds.length === 0) { + saveState(currentHistoryId, stateFile); + console.log("Full sync complete. No threads found."); + return; + } + + await ensureRun(); + const limitedThreads = limitEventItems(threadIds); + await serviceLogger.log({ + type: 'changes_identified', + service: run!.service, + runId: run!.runId, + level: 'info', + message: `Found ${threadIds.length} thread${threadIds.length === 1 ? '' : 's'} to sync`, + counts: { threads: threadIds.length }, + items: limitedThreads.items, + truncated: limitedThreads.truncated, + }); + + for (const threadId of threadIds) { + await processThread(auth, threadId, syncDir, attachmentsDir); } - pageToken = res.data.nextPageToken ?? undefined; - } while (pageToken); - if (threadIds.length === 0) { saveState(currentHistoryId, stateFile); - console.log("Full sync complete. No threads found."); - return; + await serviceLogger.log({ + type: 'run_complete', + service: run!.service, + runId: run!.runId, + level: 'info', + message: `Gmail sync complete: ${threadIds.length} thread${threadIds.length === 1 ? '' : 's'}`, + durationMs: Date.now() - run!.startedAt, + outcome: 'ok', + summary: { threads: threadIds.length }, + }); + console.log("Full sync complete."); + } catch (error) { + console.error("Error during full sync:", error); + await ensureRun(); + await serviceLogger.log({ + type: 'error', + service: run!.service, + runId: run!.runId, + level: 'error', + message: 'Gmail sync error', + error: error instanceof Error ? error.message : String(error), + }); + await serviceLogger.log({ + type: 'run_complete', + service: run!.service, + runId: run!.runId, + level: 'error', + message: 'Gmail sync failed', + durationMs: Date.now() - run!.startedAt, + outcome: 'error', + }); + throw error; } - - const run = await serviceLogger.startRun({ - service: 'gmail', - message: 'Syncing Gmail', - trigger: 'timer', - }); - const limitedThreads = limitEventItems(threadIds); - await serviceLogger.log({ - type: 'changes_identified', - service: run.service, - runId: run.runId, - level: 'info', - message: `Found ${threadIds.length} thread${threadIds.length === 1 ? '' : 's'} to sync`, - counts: { threads: threadIds.length }, - items: limitedThreads.items, - truncated: limitedThreads.truncated, - }); - - for (const threadId of threadIds) { - await processThread(auth, threadId, syncDir, attachmentsDir); - } - - saveState(currentHistoryId, stateFile); - await serviceLogger.log({ - type: 'run_complete', - service: run.service, - runId: run.runId, - level: 'info', - message: `Gmail sync complete: ${threadIds.length} thread${threadIds.length === 1 ? '' : 's'}`, - durationMs: Date.now() - run.startedAt, - outcome: 'ok', - summary: { threads: threadIds.length }, - }); - console.log("Full sync complete."); } async function partialSync(auth: OAuth2Client, startHistoryId: string, syncDir: string, attachmentsDir: string, stateFile: string, lookbackDays: number) { console.log(`Checking updates since historyId ${startHistoryId}...`); const gmail = google.gmail({ version: 'v1', auth }); + let run: ServiceRunContext | null = null; + const ensureRun = async () => { + if (!run) { + run = await serviceLogger.startRun({ + service: 'gmail', + message: 'Syncing Gmail', + trigger: 'timer', + }); + } + }; + try { const res = await gmail.users.history.list({ userId: 'me', @@ -308,17 +341,13 @@ async function partialSync(auth: OAuth2Client, startHistoryId: string, syncDir: return; } - const run = await serviceLogger.startRun({ - service: 'gmail', - message: 'Syncing Gmail', - trigger: 'timer', - }); + await ensureRun(); const threadIdList = Array.from(threadIds); const limitedThreads = limitEventItems(threadIdList); await serviceLogger.log({ type: 'changes_identified', - service: run.service, - runId: run.runId, + service: run!.service, + runId: run!.runId, level: 'info', message: `Found ${threadIdList.length} new thread${threadIdList.length === 1 ? '' : 's'}`, counts: { threads: threadIdList.length }, @@ -334,11 +363,11 @@ async function partialSync(auth: OAuth2Client, startHistoryId: string, syncDir: saveState(profile.data.historyId!, stateFile); await serviceLogger.log({ type: 'run_complete', - service: run.service, - runId: run.runId, + service: run!.service, + runId: run!.runId, level: 'info', message: `Gmail sync complete: ${threadIdList.length} thread${threadIdList.length === 1 ? '' : 's'}`, - durationMs: Date.now() - run.startedAt, + durationMs: Date.now() - run!.startedAt, outcome: 'ok', summary: { threads: threadIdList.length }, }); @@ -348,13 +377,32 @@ async function partialSync(auth: OAuth2Client, startHistoryId: string, syncDir: if (e.response?.status === 404) { console.log("History ID expired. Falling back to full sync."); await fullSync(auth, syncDir, attachmentsDir, stateFile, lookbackDays); - } else { - console.error("Error during partial sync:", error); - // If 401, clear tokens to force re-auth next run - if (e.response?.status === 401) { - console.log("401 Unauthorized, clearing cache"); - GoogleClientFactory.clearCache(); - } + return; + } + + console.error("Error during partial sync:", error); + await ensureRun(); + await serviceLogger.log({ + type: 'error', + service: run!.service, + runId: run!.runId, + level: 'error', + message: 'Gmail sync error', + error: error instanceof Error ? error.message : String(error), + }); + await serviceLogger.log({ + type: 'run_complete', + service: run!.service, + runId: run!.runId, + level: 'error', + message: 'Gmail sync failed', + durationMs: Date.now() - run!.startedAt, + outcome: 'error', + }); + // If 401, clear tokens to force re-auth next run + if (e.response?.status === 401) { + console.log("401 Unauthorized, clearing cache"); + GoogleClientFactory.clearCache(); } } } diff --git a/apps/x/packages/core/src/services/service_logger.ts b/apps/x/packages/core/src/services/service_logger.ts index 50cac7e8..886cf66d 100644 --- a/apps/x/packages/core/src/services/service_logger.ts +++ b/apps/x/packages/core/src/services/service_logger.ts @@ -7,7 +7,7 @@ import type { ServiceEventType } from "@x/shared/dist/service-events.js"; import { serviceBus } from "./service_bus.js"; type ServiceNameType = ServiceEventType["service"]; -type DistributiveOmit = T extends any ? Omit : never; +type DistributiveOmit = T extends unknown ? Omit : never; type ServiceEventInput = DistributiveOmit; const LOG_DIR = path.join(WorkDir, "logs"); @@ -47,8 +47,18 @@ export class ServiceLogger { private async rotateIfNeeded(nextBytes: number): Promise { if (this.currentSize + nextBytes <= MAX_LOG_BYTES) return; if (this.stream) { - this.stream.close(); + const stream = this.stream; this.stream = null; + await new Promise((resolve) => { + let settled = false; + const done = () => { + if (settled) return; + settled = true; + resolve(); + }; + stream.once("error", done); + stream.end(done); + }); } const ts = safeTimestampForFile(new Date().toISOString()); const rotatedPath = path.join(LOG_DIR, `services.${ts}.jsonl`);