diff --git a/apps/x/apps/main/src/ipc.ts b/apps/x/apps/main/src/ipc.ts index 6efd48df..2244c9ee 100644 --- a/apps/x/apps/main/src/ipc.ts +++ b/apps/x/apps/main/src/ipc.ts @@ -12,10 +12,12 @@ import { workspace as workspaceShared } from '@x/shared'; import * as mcpCore from '@x/core/dist/mcp/mcp.js'; import * as runsCore from '@x/core/dist/runs/runs.js'; import { bus } from '@x/core/dist/runs/bus.js'; +import { serviceBus } from '@x/core/dist/services/service_bus.js'; import type { FSWatcher } from 'chokidar'; import fs from 'node:fs/promises'; import z from 'zod'; import { RunEvent } from '@x/shared/dist/runs.js'; +import { ServiceEvent } from '@x/shared/dist/service-events.js'; import container from '@x/core/dist/di/container.js'; import { listOnboardingModels } from '@x/core/dist/models/models-dev.js'; import { testModelConnection } from '@x/core/dist/models/models.js'; @@ -218,6 +220,15 @@ function emitRunEvent(event: z.infer): void { } } +function emitServiceEvent(event: z.infer): void { + const windows = BrowserWindow.getAllWindows(); + for (const win of windows) { + if (!win.isDestroyed() && win.webContents) { + win.webContents.send('services:events', event); + } + } +} + export function emitOAuthEvent(event: { provider: string; success: boolean; error?: string }): void { const windows = BrowserWindow.getAllWindows(); for (const win of windows) { @@ -237,6 +248,16 @@ export async function startRunsWatcher(): Promise { }); } +let servicesWatcher: (() => void) | null = null; +export async function startServicesWatcher(): Promise { + if (servicesWatcher) { + return; + } + servicesWatcher = await serviceBus.subscribe(async (event) => { + emitServiceEvent(event); + }); +} + // ============================================================================ // Handler Implementations // ============================================================================ diff --git a/apps/x/apps/main/src/main.ts b/apps/x/apps/main/src/main.ts index 6bf82325..a77a8f7b 100644 --- a/apps/x/apps/main/src/main.ts +++ b/apps/x/apps/main/src/main.ts @@ -1,6 +1,6 @@ import { app, BrowserWindow, protocol, net, shell } from "electron"; import path from "node:path"; -import { setupIpcHandlers, startRunsWatcher, startWorkspaceWatcher, stopWorkspaceWatcher } from "./ipc.js"; +import { setupIpcHandlers, startRunsWatcher, startServicesWatcher, startWorkspaceWatcher, stopWorkspaceWatcher } from "./ipc.js"; import { fileURLToPath, pathToFileURL } from "node:url"; import { dirname } from "node:path"; import { updateElectronApp, UpdateSourceType } from "update-electron-app"; @@ -143,6 +143,9 @@ app.whenReady().then(async () => { // start runs watcher startRunsWatcher(); + // start services watcher + startServicesWatcher(); + // start gmail sync initGmailSync(); @@ -180,4 +183,4 @@ app.on("window-all-closed", () => { app.on("before-quit", () => { // Clean up watcher on app quit stopWorkspaceWatcher(); -}); \ No newline at end of file +}); diff --git a/apps/x/apps/renderer/src/components/sidebar-content.tsx b/apps/x/apps/renderer/src/components/sidebar-content.tsx index 02422b20..4be433d2 100644 --- a/apps/x/apps/renderer/src/components/sidebar-content.tsx +++ b/apps/x/apps/renderer/src/components/sidebar-content.tsx @@ -1,7 +1,7 @@ "use client" import * as React from "react" -import { useState } from "react" +import { useEffect, useRef, useState } from "react" import { Bot, ChevronRight, @@ -15,6 +15,7 @@ import { Mic, Network, Pencil, + LoaderIcon, Square, SquarePen, Trash2, @@ -28,6 +29,7 @@ import { import { Sidebar, SidebarContent, + SidebarFooter, SidebarGroup, SidebarGroupContent, SidebarHeader, @@ -36,6 +38,7 @@ import { SidebarMenuItem, SidebarMenuSub, SidebarRail, + useSidebar, } from "@/components/ui/sidebar" import { Tooltip, @@ -52,6 +55,8 @@ import { import { Input } from "@/components/ui/input" import { useSidebarSection } from "@/contexts/sidebar-context" import { toast } from "@/lib/toast" +import { ServiceEvent } from "@x/shared/src/service-events.js" +import z from "zod" interface TreeNode { path: string @@ -96,6 +101,11 @@ type BackgroundTaskItem = { lastRunAt?: string | null } +type ServiceEventType = z.infer + +const MAX_SYNC_EVENTS = 30 +const RUN_STALE_MS = 2 * 60 * 60 * 1000 + type TasksActions = { onNewChat: () => void onSelectRun: (runId: string) => void @@ -121,6 +131,117 @@ const sectionTitles = { tasks: "Chats", } +function formatEventTime(ts: string): string { + const date = new Date(ts) + if (Number.isNaN(date.getTime())) return "" + return date.toLocaleTimeString([], { hour: "numeric", minute: "2-digit" }) +} + +function SyncStatusBar() { + const { state, isMobile } = useSidebar() + const [events, setEvents] = useState([]) + const [activeRuns, setActiveRuns] = useState>(new Set()) + const [isExpanded, setIsExpanded] = useState(false) + const runTimeoutsRef = useRef>>(new Map()) + + useEffect(() => { + const cleanup = window.ipc.on('services:events', (event) => { + const nextEvent = event as ServiceEventType + setEvents((prev) => [nextEvent, ...prev].slice(0, MAX_SYNC_EVENTS)) + if (nextEvent.type === 'run_start') { + setActiveRuns((prev) => { + const next = new Set(prev) + next.add(nextEvent.runId) + return next + }) + const existingTimeout = runTimeoutsRef.current.get(nextEvent.runId) + if (existingTimeout) { + clearTimeout(existingTimeout) + } + const timeout = setTimeout(() => { + setActiveRuns((prev) => { + if (!prev.has(nextEvent.runId)) return prev + const next = new Set(prev) + next.delete(nextEvent.runId) + return next + }) + runTimeoutsRef.current.delete(nextEvent.runId) + }, RUN_STALE_MS) + runTimeoutsRef.current.set(nextEvent.runId, timeout) + } else if (nextEvent.type === 'run_complete') { + setActiveRuns((prev) => { + const next = new Set(prev) + next.delete(nextEvent.runId) + return next + }) + const existingTimeout = runTimeoutsRef.current.get(nextEvent.runId) + if (existingTimeout) { + clearTimeout(existingTimeout) + runTimeoutsRef.current.delete(nextEvent.runId) + } + } + }) + return cleanup + }, []) + + useEffect(() => { + return () => { + runTimeoutsRef.current.forEach((timeout) => clearTimeout(timeout)) + runTimeoutsRef.current.clear() + } + }, []) + + const isSyncing = activeRuns.size > 0 + const isCollapsed = state === "collapsed" + + return ( + <> + {!isMobile && isCollapsed && isSyncing && ( +
+ +
+ )} + + + {isExpanded && ( +
+ {events.length === 0 ? ( +
No recent activity.
+ ) : ( + events.map((event, idx) => ( +
+ + {formatEventTime(event.ts)} + + {event.message} +
+ )) + )} +
+ )} +
+ + ) +} + export function SidebarContentPanel({ tree, selectedPath, @@ -165,6 +286,7 @@ export function SidebarContentPanel({ /> )} + ) @@ -779,4 +901,3 @@ function TasksSection({ ) } - diff --git a/apps/x/packages/core/src/knowledge/build_graph.ts b/apps/x/packages/core/src/knowledge/build_graph.ts index 1c954bdc..2346005e 100644 --- a/apps/x/packages/core/src/knowledge/build_graph.ts +++ b/apps/x/packages/core/src/knowledge/build_graph.ts @@ -4,6 +4,7 @@ import { WorkDir } from '../config/config.js'; import { autoConfigureStrictnessIfNeeded } from '../config/strictness_analyzer.js'; import { createRun, createMessage } from '../runs/runs.js'; import { bus } from '../runs/bus.js'; +import { serviceLogger, type ServiceRunContext } from '../services/service_logger.js'; import { loadState, saveState, @@ -32,6 +33,23 @@ const SOURCE_FOLDERS = [ // Voice memos are now created directly in knowledge/Voice Memos// const VOICE_MEMOS_KNOWLEDGE_DIR = path.join(NOTES_OUTPUT_DIR, 'Voice Memos'); +const MAX_EVENT_ITEMS = 50; + +function limitEventItems(items: string[], max: number = MAX_EVENT_ITEMS): { items: string[]; truncated: boolean } { + if (items.length <= max) { + return { items, truncated: false }; + } + return { items: items.slice(0, max), truncated: true }; +} + +function extractPathFromToolInput(input: string): string | null { + try { + const parsed = JSON.parse(input) as { path?: string }; + return typeof parsed.path === 'string' ? parsed.path : null; + } catch { + return null; + } +} /** * Get unprocessed voice memo files from knowledge/Voice Memos/ @@ -148,7 +166,11 @@ async function waitForRunCompletion(runId: string): Promise { /** * Run note creation agent on a batch of files to extract entities and create/update notes */ -async function createNotesFromBatch(files: { path: string; content: string }[], batchNumber: number, knowledgeIndex: string): Promise { +async function createNotesFromBatch( + files: { path: string; content: string }[], + batchNumber: number, + knowledgeIndex: string +): Promise<{ runId: string; notesCreated: Set; notesModified: Set }> { // Ensure notes output directory exists if (!fs.existsSync(NOTES_OUTPUT_DIR)) { fs.mkdirSync(NOTES_OUTPUT_DIR, { recursive: true }); @@ -182,18 +204,155 @@ async function createNotesFromBatch(files: { path: string; content: string }[], message += `\n\n---\n\n`; }); + const notesCreated = new Set(); + const notesModified = new Set(); + + const unsubscribe = await bus.subscribe(run.id, async (event) => { + if (event.type !== "tool-invocation") { + return; + } + if (event.toolName !== "workspace-writeFile" && event.toolName !== "workspace-edit") { + return; + } + const toolPath = extractPathFromToolInput(event.input); + if (!toolPath) { + return; + } + if (event.toolName === "workspace-writeFile") { + notesCreated.add(toolPath); + } else if (event.toolName === "workspace-edit") { + notesModified.add(toolPath); + } + }); + await createMessage(run.id, message); // Wait for the run to complete await waitForRunCompletion(run.id); + unsubscribe(); - return run.id; + return { runId: run.id, notesCreated, notesModified }; } /** * Build the knowledge graph from all content files in the specified source directory * Only processes new or changed files based on state tracking */ +type BatchResult = { + processedFiles: string[]; + notesCreated: Set; + notesModified: Set; + hadError: boolean; +}; + +async function buildGraphWithFiles( + sourceDir: string, + filesToProcess: string[], + state: GraphState, + run?: ServiceRunContext +): Promise { + console.log(`[buildGraph] Starting build for directory: ${sourceDir}`); + + if (filesToProcess.length === 0) { + console.log(`[buildGraph] No new or changed files to process in ${path.basename(sourceDir)}`); + return { processedFiles: [], notesCreated: new Set(), notesModified: new Set(), hadError: false }; + } + + console.log(`[buildGraph] Found ${filesToProcess.length} new/changed files to process in ${path.basename(sourceDir)}`); + + // Read file contents + const contentFiles = await readFileContents(filesToProcess); + + if (contentFiles.length === 0) { + console.log(`No files could be read from ${sourceDir}`); + return { processedFiles: [], notesCreated: new Set(), notesModified: new Set(), hadError: false }; + } + + const BATCH_SIZE = 10; // Reduced from 25 to 10 files per agent run for faster processing + const totalBatches = Math.ceil(contentFiles.length / BATCH_SIZE); + + console.log(`Processing ${contentFiles.length} files in ${totalBatches} batches (${BATCH_SIZE} files per batch)...`); + + const processedFiles: string[] = []; + const notesCreated = new Set(); + const notesModified = new Set(); + let hadError = false; + + // Process files in batches + for (let i = 0; i < contentFiles.length; i += BATCH_SIZE) { + const batch = contentFiles.slice(i, i + BATCH_SIZE); + const batchNumber = Math.floor(i / BATCH_SIZE) + 1; + + try { + // Build fresh index before each batch to include notes from previous batches + console.log(`Building knowledge index for batch ${batchNumber}...`); + const indexStartTime = Date.now(); + const index = buildKnowledgeIndex(); + const indexForPrompt = formatIndexForPrompt(index); + const indexDuration = ((Date.now() - indexStartTime) / 1000).toFixed(2); + console.log(`Index built in ${indexDuration}s: ${index.people.length} people, ${index.organizations.length} orgs, ${index.projects.length} projects, ${index.topics.length} topics, ${index.other.length} other`); + + console.log(`Processing batch ${batchNumber}/${totalBatches} (${batch.length} files)...`); + if (run) { + await serviceLogger.log({ + type: 'progress', + service: run.service, + runId: run.runId, + level: 'info', + message: `Processing batch ${batchNumber}/${totalBatches} (${batch.length} files)`, + step: 'batch', + current: batchNumber, + total: totalBatches, + details: { filesInBatch: batch.length }, + }); + } + const agentStartTime = Date.now(); + const batchResult = await createNotesFromBatch(batch, batchNumber, indexForPrompt); + const agentDuration = ((Date.now() - agentStartTime) / 1000).toFixed(2); + console.log(`Batch ${batchNumber}/${totalBatches} complete in ${agentDuration}s`); + + for (const note of batchResult.notesCreated) { + notesCreated.add(note); + } + for (const note of batchResult.notesModified) { + notesModified.add(note); + } + + // Mark files in this batch as processed + for (const file of batch) { + markFileAsProcessed(file.path, state); + processedFiles.push(file.path); + } + + // Save state after each successful batch + // This ensures partial progress is saved even if later batches fail + saveState(state); + } catch (error) { + hadError = true; + console.error(`Error processing batch ${batchNumber}:`, error); + if (run) { + await serviceLogger.log({ + type: 'error', + service: run.service, + runId: run.runId, + level: 'error', + message: `Error processing batch ${batchNumber}`, + error: error instanceof Error ? error.message : String(error), + context: { batchNumber }, + }); + } + // Continue with next batch (without saving state for failed batch) + } + } + + // Update state with last build time and save + state.lastBuildTime = new Date().toISOString(); + saveState(state); + + console.log(`Knowledge graph build complete. Processed ${processedFiles.length} files.`); + return { processedFiles, notesCreated, notesModified, hadError }; +} + export async function buildGraph(sourceDir: string): Promise { console.log(`[buildGraph] Starting build for directory: ${sourceDir}`); @@ -210,62 +369,7 @@ export async function buildGraph(sourceDir: string): Promise { return; } - console.log(`[buildGraph] Found ${filesToProcess.length} new/changed files to process in ${path.basename(sourceDir)}`); - - // Read file contents - const contentFiles = await readFileContents(filesToProcess); - - if (contentFiles.length === 0) { - console.log(`No files could be read from ${sourceDir}`); - return; - } - - const BATCH_SIZE = 10; // Reduced from 25 to 10 files per agent run for faster processing - const totalBatches = Math.ceil(contentFiles.length / BATCH_SIZE); - - console.log(`Processing ${contentFiles.length} files in ${totalBatches} batches (${BATCH_SIZE} files per batch)...`); - - // Process files in batches - const processedFiles: string[] = []; - for (let i = 0; i < contentFiles.length; i += BATCH_SIZE) { - const batch = contentFiles.slice(i, i + BATCH_SIZE); - const batchNumber = Math.floor(i / BATCH_SIZE) + 1; - - try { - // Build fresh index before each batch to include notes from previous batches - console.log(`Building knowledge index for batch ${batchNumber}...`); - const indexStartTime = Date.now(); - const index = buildKnowledgeIndex(); - const indexForPrompt = formatIndexForPrompt(index); - const indexDuration = ((Date.now() - indexStartTime) / 1000).toFixed(2); - console.log(`Index built in ${indexDuration}s: ${index.people.length} people, ${index.organizations.length} orgs, ${index.projects.length} projects, ${index.topics.length} topics, ${index.other.length} other`); - - console.log(`Processing batch ${batchNumber}/${totalBatches} (${batch.length} files)...`); - const agentStartTime = Date.now(); - await createNotesFromBatch(batch, batchNumber, indexForPrompt); - const agentDuration = ((Date.now() - agentStartTime) / 1000).toFixed(2); - console.log(`Batch ${batchNumber}/${totalBatches} complete in ${agentDuration}s`); - - // Mark files in this batch as processed - for (const file of batch) { - markFileAsProcessed(file.path, state); - processedFiles.push(file.path); - } - - // Save state after each successful batch - // This ensures partial progress is saved even if later batches fail - saveState(state); - } catch (error) { - console.error(`Error processing batch ${batchNumber}:`, error); - // Continue with next batch (without saving state for failed batch) - } - } - - // Update state with last build time and save - state.lastBuildTime = new Date().toISOString(); - saveState(state); - - console.log(`Knowledge graph build complete. Processed ${processedFiles.length} files.`); + await buildGraphWithFiles(sourceDir, filesToProcess, state); } /** @@ -287,10 +391,39 @@ async function processVoiceMemosForKnowledge(): Promise { console.log(`[GraphBuilder] Processing ${unprocessedFiles.length} voice memo transcripts for entity extraction...`); console.log(`[GraphBuilder] Files to process: ${unprocessedFiles.map(f => path.basename(f)).join(', ')}`); + const run = await serviceLogger.startRun({ + service: 'voice_memo', + message: `Processing ${unprocessedFiles.length} voice memo${unprocessedFiles.length === 1 ? '' : 's'}`, + trigger: 'timer', + }); + + const relativeVoiceMemos = unprocessedFiles.map(filePath => path.relative(WorkDir, filePath)); + const limitedVoiceMemos = limitEventItems(relativeVoiceMemos); + await serviceLogger.log({ + type: 'changes_identified', + service: run.service, + runId: run.runId, + level: 'info', + message: `Found ${unprocessedFiles.length} new voice memo${unprocessedFiles.length === 1 ? '' : 's'}`, + counts: { voiceMemos: unprocessedFiles.length }, + items: limitedVoiceMemos.items, + truncated: limitedVoiceMemos.truncated, + }); + // Read the files const contentFiles = await readFileContents(unprocessedFiles); if (contentFiles.length === 0) { + await serviceLogger.log({ + type: 'run_complete', + service: run.service, + runId: run.runId, + level: 'info', + message: 'No voice memos could be read', + durationMs: Date.now() - run.startedAt, + outcome: 'error', + summary: { processedFiles: 0 }, + }); return false; } @@ -298,6 +431,10 @@ async function processVoiceMemosForKnowledge(): Promise { const BATCH_SIZE = 10; const totalBatches = Math.ceil(contentFiles.length / BATCH_SIZE); + const notesCreated = new Set(); + const notesModified = new Set(); + let hadError = false; + for (let i = 0; i < contentFiles.length; i += BATCH_SIZE) { const batch = contentFiles.slice(i, i + BATCH_SIZE); const batchNumber = Math.floor(i / BATCH_SIZE) + 1; @@ -309,9 +446,27 @@ async function processVoiceMemosForKnowledge(): Promise { const indexForPrompt = formatIndexForPrompt(index); console.log(`[GraphBuilder] Processing batch ${batchNumber}/${totalBatches} (${batch.length} files)...`); - await createNotesFromBatch(batch, batchNumber, indexForPrompt); + await serviceLogger.log({ + type: 'progress', + service: run.service, + runId: run.runId, + level: 'info', + message: `Processing batch ${batchNumber}/${totalBatches} (${batch.length} files)`, + step: 'batch', + current: batchNumber, + total: totalBatches, + details: { filesInBatch: batch.length }, + }); + const batchResult = await createNotesFromBatch(batch, batchNumber, indexForPrompt); console.log(`[GraphBuilder] Batch ${batchNumber}/${totalBatches} complete`); + for (const note of batchResult.notesCreated) { + notesCreated.add(note); + } + for (const note of batchResult.notesModified) { + notesModified.add(note); + } + // Mark files as processed for (const file of batch) { markFileAsProcessed(file.path, state); @@ -320,7 +475,17 @@ async function processVoiceMemosForKnowledge(): Promise { // Save state after each batch saveState(state); } catch (error) { + hadError = true; console.error(`[GraphBuilder] Error processing batch ${batchNumber}:`, error); + await serviceLogger.log({ + type: 'error', + service: run.service, + runId: run.runId, + level: 'error', + message: `Error processing voice memo batch ${batchNumber}`, + error: error instanceof Error ? error.message : String(error), + context: { batchNumber }, + }); } } @@ -328,6 +493,21 @@ async function processVoiceMemosForKnowledge(): Promise { state.lastBuildTime = new Date().toISOString(); saveState(state); + await serviceLogger.log({ + type: 'run_complete', + service: run.service, + runId: run.runId, + level: hadError ? 'error' : 'info', + message: `Voice memos processed: ${contentFiles.length} files, ${notesCreated.size} created, ${notesModified.size} updated`, + durationMs: Date.now() - run.startedAt, + outcome: hadError ? 'error' : 'ok', + summary: { + processedFiles: contentFiles.length, + notesCreated: notesCreated.size, + notesModified: notesModified.size, + }, + }); + return true; } @@ -352,6 +532,11 @@ async function processAllSources(): Promise { console.error('[GraphBuilder] Error processing voice memos:', error); } + const state = loadState(); + const folderChanges: { folder: string; sourceDir: string; files: string[] }[] = []; + const countsByFolder: Record = {}; + const allFiles: string[] = []; + for (const folder of SOURCE_FOLDERS) { const sourceDir = path.join(WorkDir, folder); @@ -362,14 +547,13 @@ async function processAllSources(): Promise { } try { - // Quick check if there are any files to process before doing the full build - const state = loadState(); const filesToProcess = getFilesToProcess(sourceDir, state); if (filesToProcess.length > 0) { console.log(`[GraphBuilder] Found ${filesToProcess.length} new/changed files in ${folder}`); - await buildGraph(sourceDir); - anyFilesProcessed = true; + folderChanges.push({ folder, sourceDir, files: filesToProcess }); + countsByFolder[folder] = filesToProcess.length; + allFiles.push(...filesToProcess); } } catch (error) { console.error(`[GraphBuilder] Error processing ${folder}:`, error); @@ -377,6 +561,63 @@ async function processAllSources(): Promise { } } + if (allFiles.length > 0) { + const run = await serviceLogger.startRun({ + service: 'graph', + message: 'Syncing knowledge graph', + trigger: 'timer', + config: { sources: SOURCE_FOLDERS }, + }); + + const relativeFiles = allFiles.map(filePath => path.relative(WorkDir, filePath)); + const limitedFiles = limitEventItems(relativeFiles); + const foldersList = Object.keys(countsByFolder).join(', '); + const folderMessage = foldersList ? ` across ${foldersList}` : ''; + + await serviceLogger.log({ + type: 'changes_identified', + service: run.service, + runId: run.runId, + level: 'info', + message: `Found ${allFiles.length} changed file${allFiles.length === 1 ? '' : 's'}${folderMessage}`, + counts: countsByFolder, + items: limitedFiles.items, + truncated: limitedFiles.truncated, + }); + + const notesCreated = new Set(); + const notesModified = new Set(); + const processedFiles: string[] = []; + let hadError = false; + + for (const entry of folderChanges) { + const result = await buildGraphWithFiles(entry.sourceDir, entry.files, state, run); + result.processedFiles.forEach(file => processedFiles.push(file)); + result.notesCreated.forEach(note => notesCreated.add(note)); + result.notesModified.forEach(note => notesModified.add(note)); + if (result.hadError) { + hadError = true; + } + } + + await serviceLogger.log({ + type: 'run_complete', + service: run.service, + runId: run.runId, + level: hadError ? 'error' : 'info', + message: `Graph sync complete: ${processedFiles.length} files, ${notesCreated.size} created, ${notesModified.size} updated`, + durationMs: Date.now() - run.startedAt, + outcome: hadError ? 'error' : 'ok', + summary: { + processedFiles: processedFiles.length, + notesCreated: notesCreated.size, + notesModified: notesModified.size, + }, + }); + + anyFilesProcessed = true; + } + if (!anyFilesProcessed) { console.log('[GraphBuilder] No new content to process'); } else { diff --git a/apps/x/packages/core/src/knowledge/granola/sync.ts b/apps/x/packages/core/src/knowledge/granola/sync.ts index 6c736085..5e303e8d 100644 --- a/apps/x/packages/core/src/knowledge/granola/sync.ts +++ b/apps/x/packages/core/src/knowledge/granola/sync.ts @@ -4,6 +4,7 @@ import { homedir } from 'os'; import { WorkDir } from '../../config/config.js'; import container from '../../di/container.js'; import { IGranolaConfigRepo } from './repo.js'; +import { serviceLogger } from '../../services/service_logger.js'; import { GetDocumentsResponse, SyncState, @@ -22,6 +23,14 @@ const API_DELAY_MS = 1000; // 1 second delay between API calls const RATE_LIMIT_RETRY_DELAY_MS = 60 * 1000; // Wait 1 minute on rate limit const MAX_RETRIES = 3; // Maximum retries for rate-limited requests const MAX_BATCH_SIZE = 10; // Process max 10 documents per folder per sync +const MAX_EVENT_ITEMS = 50; + +function limitEventItems(items: string[], max: number = MAX_EVENT_ITEMS): { items: string[]; truncated: boolean } { + if (items.length <= max) { + return { items, truncated: false }; + } + return { items: items.slice(0, max), truncated: true }; +} // --- Wake Signal for Immediate Sync Trigger --- let wakeResolve: (() => void) | null = null; @@ -325,102 +334,169 @@ function documentToMarkdown(doc: Document): string { async function syncNotes(): Promise { console.log('[Granola] Starting sync...'); - - // Check if enabled - const granolaRepo = container.resolve('granolaConfigRepo'); - const config = await granolaRepo.getConfig(); - if (!config.enabled) { - console.log('[Granola] Sync disabled in config'); - return; - } - - // Extract access token - const accessToken = extractAccessToken(); - if (!accessToken) { - console.log('[Granola] No access token available'); - return; - } - - // Ensure sync directory exists - ensureDir(SYNC_DIR); - - // Load state - const state = loadState(); - - let newCount = 0; - let updatedCount = 0; - let offset = 0; - let hasMore = true; - // Fetch documents with pagination - while (hasMore) { - // Delay before API call (except first) - if (offset > 0) { - await sleep(API_DELAY_MS); + let runId: string | null = null; + let runStartedAt = 0; + const ensureRun = async () => { + if (!runId) { + const run = await serviceLogger.startRun({ + service: 'granola', + message: 'Syncing Granola notes', + trigger: 'timer', + }); + runId = run.runId; + runStartedAt = run.startedAt; + } + }; + + try { + // Check if enabled + const granolaRepo = container.resolve('granolaConfigRepo'); + const config = await granolaRepo.getConfig(); + if (!config.enabled) { + console.log('[Granola] Sync disabled in config'); + return; } - const docsResponse = await getDocuments(accessToken, MAX_BATCH_SIZE, offset); - if (!docsResponse) { - console.log('[Granola] Failed to fetch documents'); - break; + // Extract access token + const accessToken = extractAccessToken(); + if (!accessToken) { + console.log('[Granola] No access token available'); + return; } - if (docsResponse.docs.length === 0) { - console.log('[Granola] No more documents to fetch'); - hasMore = false; - break; - } + // Ensure sync directory exists + ensureDir(SYNC_DIR); - // Process each document - for (const doc of docsResponse.docs) { - const docUpdatedAt = doc.updated_at || doc.created_at; - const lastSyncedAt = state.syncedDocs[doc.id]; + // Load state + const state = loadState(); - // Check if needs sync (new or updated) - const needsSync = !lastSyncedAt || lastSyncedAt !== docUpdatedAt; + let newCount = 0; + let updatedCount = 0; + let offset = 0; + let hasMore = true; + const changedTitles: string[] = []; - if (!needsSync) { - continue; + // Fetch documents with pagination + while (hasMore) { + // Delay before API call (except first) + if (offset > 0) { + await sleep(API_DELAY_MS); } - // Convert to markdown and save - const markdown = documentToMarkdown(doc); - const docTitle = doc.title || 'Untitled'; - const filename = `${doc.id}_${cleanFilename(docTitle)}.md`; - const filePath = path.join(SYNC_DIR, filename); - - fs.writeFileSync(filePath, markdown); - - if (lastSyncedAt) { - console.log(`[Granola] Updated: ${filename}`); - updatedCount++; - } else { - console.log(`[Granola] Saved: ${filename}`); - newCount++; + const docsResponse = await getDocuments(accessToken, MAX_BATCH_SIZE, offset); + if (!docsResponse) { + console.log('[Granola] Failed to fetch documents'); + break; } - // Update state - state.syncedDocs[doc.id] = docUpdatedAt; + if (docsResponse.docs.length === 0) { + console.log('[Granola] No more documents to fetch'); + hasMore = false; + break; + } + + // Process each document + for (const doc of docsResponse.docs) { + const docUpdatedAt = doc.updated_at || doc.created_at; + const lastSyncedAt = state.syncedDocs[doc.id]; + + // Check if needs sync (new or updated) + const needsSync = !lastSyncedAt || lastSyncedAt !== docUpdatedAt; + + if (!needsSync) { + continue; + } + + await ensureRun(); + const docTitle = doc.title || 'Untitled'; + changedTitles.push(docTitle); + + // Convert to markdown and save + const markdown = documentToMarkdown(doc); + const filename = `${doc.id}_${cleanFilename(docTitle)}.md`; + const filePath = path.join(SYNC_DIR, filename); + + fs.writeFileSync(filePath, markdown); + + if (lastSyncedAt) { + console.log(`[Granola] Updated: ${filename}`); + updatedCount++; + } else { + console.log(`[Granola] Saved: ${filename}`); + newCount++; + } + + // Update state + state.syncedDocs[doc.id] = docUpdatedAt; + } + + // Move to next page + offset += docsResponse.docs.length; + + // Stop if we got fewer docs than requested (last page) + if (docsResponse.docs.length < MAX_BATCH_SIZE) { + hasMore = false; + } } - // Move to next page - offset += docsResponse.docs.length; + // Save state + state.lastSyncDate = new Date().toISOString(); + saveState(state); - // Stop if we got fewer docs than requested (last page) - if (docsResponse.docs.length < MAX_BATCH_SIZE) { - hasMore = false; + console.log(`[Granola] Sync complete: ${newCount} new, ${updatedCount} updated`); + + if (runId) { + const totalChanges = newCount + updatedCount; + const limitedTitles = limitEventItems(changedTitles); + await serviceLogger.log({ + type: 'changes_identified', + service: 'granola', + runId, + level: 'info', + message: `Granola updates: ${totalChanges} change${totalChanges === 1 ? '' : 's'}`, + counts: { newNotes: newCount, updatedNotes: updatedCount }, + items: limitedTitles.items, + truncated: limitedTitles.truncated, + }); + await serviceLogger.log({ + type: 'run_complete', + service: 'granola', + runId, + level: 'info', + message: `Granola sync complete: ${newCount} new, ${updatedCount} updated`, + durationMs: Date.now() - runStartedAt, + outcome: 'ok', + summary: { newNotes: newCount, updatedNotes: updatedCount }, + }); } - } - // Save state - state.lastSyncDate = new Date().toISOString(); - saveState(state); - - console.log(`[Granola] Sync complete: ${newCount} new, ${updatedCount} updated`); - - // Build knowledge graph if there were changes - if (newCount > 0 || updatedCount > 0) { - // Graph building is now handled by the independent graph builder service + // Build knowledge graph if there were changes + if (newCount > 0 || updatedCount > 0) { + // Graph building is now handled by the independent graph builder service + } + } catch (error) { + console.error('[Granola] Error in sync:', error); + if (runId) { + await serviceLogger.log({ + type: 'error', + service: 'granola', + runId, + level: 'error', + message: 'Granola sync error', + error: error instanceof Error ? error.message : String(error), + }); + await serviceLogger.log({ + type: 'run_complete', + service: 'granola', + runId, + level: 'error', + message: 'Granola sync failed', + durationMs: Date.now() - runStartedAt, + outcome: 'error', + }); + } + throw error; } } @@ -443,4 +519,3 @@ export async function init(): Promise { await interruptibleSleep(SYNC_INTERVAL_MS); } } - diff --git a/apps/x/packages/core/src/knowledge/sync_calendar.ts b/apps/x/packages/core/src/knowledge/sync_calendar.ts index 46ec2e1e..36c83016 100644 --- a/apps/x/packages/core/src/knowledge/sync_calendar.ts +++ b/apps/x/packages/core/src/knowledge/sync_calendar.ts @@ -5,6 +5,7 @@ import { OAuth2Client } from 'google-auth-library'; import { NodeHtmlMarkdown } from 'node-html-markdown' import { WorkDir } from '../config/config.js'; import { GoogleClientFactory } from './google-client-factory.js'; +import { serviceLogger } from '../services/service_logger.js'; // Configuration const SYNC_DIR = path.join(WorkDir, 'calendar_sync'); @@ -14,6 +15,14 @@ const REQUIRED_SCOPES = [ 'https://www.googleapis.com/auth/calendar.events.readonly', 'https://www.googleapis.com/auth/drive.readonly' ]; +const MAX_EVENT_ITEMS = 50; + +function limitEventItems(items: string[], max: number = MAX_EVENT_ITEMS): { items: string[]; truncated: boolean } { + if (items.length <= max) { + return { items, truncated: false }; + } + return { items: items.slice(0, max), truncated: true }; +} const nhm = new NodeHtmlMarkdown(); @@ -49,10 +58,11 @@ function cleanFilename(name: string): string { // --- Sync Logic --- -function cleanUpOldFiles(currentEventIds: Set, syncDir: string) { - if (!fs.existsSync(syncDir)) return; +function cleanUpOldFiles(currentEventIds: Set, syncDir: string): string[] { + if (!fs.existsSync(syncDir)) return []; const files = fs.readdirSync(syncDir); + const deleted: string[] = []; for (const filename of files) { if (filename === 'sync_state.json') continue; @@ -79,36 +89,49 @@ function cleanUpOldFiles(currentEventIds: Set, syncDir: string) { try { fs.unlinkSync(path.join(syncDir, filename)); console.log(`Removed old/out-of-window file: ${filename}`); + deleted.push(filename); } catch (e) { console.error(`Error deleting file ${filename}:`, e); } } } + return deleted; } -async function saveEvent(event: cal.Schema$Event, syncDir: string): Promise { +async function saveEvent(event: cal.Schema$Event, syncDir: string): Promise<{ changed: boolean; isNew: boolean; title: string }> { const eventId = event.id; - if (!eventId) return false; + if (!eventId) return { changed: false, isNew: false, title: 'Unknown' }; const filePath = path.join(syncDir, `${eventId}.json`); + const content = JSON.stringify(event, null, 2); + const exists = fs.existsSync(filePath); try { - fs.writeFileSync(filePath, JSON.stringify(event, null, 2)); - return true; + if (exists) { + const existing = fs.readFileSync(filePath, 'utf-8'); + if (existing === content) { + return { changed: false, isNew: false, title: event.summary || eventId }; + } + } + + fs.writeFileSync(filePath, content); + return { changed: true, isNew: !exists, title: event.summary || eventId }; } catch (e) { console.error(`Error saving event ${eventId}:`, e); - return false; + return { changed: false, isNew: false, title: event.summary || eventId }; } } -async function processAttachments(drive: drive.Drive, event: cal.Schema$Event, syncDir: string) { - if (!event.attachments || event.attachments.length === 0) return; +async function processAttachments(drive: drive.Drive, event: cal.Schema$Event, syncDir: string): Promise { + if (!event.attachments || event.attachments.length === 0) return 0; const eventId = event.id; const eventTitle = event.summary || 'Untitled'; const eventDate = event.start?.dateTime || event.start?.date || 'Unknown'; const organizer = event.organizer?.email || 'Unknown'; + let savedCount = 0; + for (const att of event.attachments) { // We only care about Google Docs if (att.mimeType === 'application/vnd.google-apps.document') { @@ -145,12 +168,14 @@ async function processAttachments(drive: drive.Drive, event: cal.Schema$Event, s ].join('\n'); fs.writeFileSync(filePath, frontmatter + md); + savedCount++; console.log(`Synced Note: ${att.title} for event ${eventTitle}`); } catch (e) { console.error(`Failed to download note ${att.title}:`, e); } } } + return savedCount; } async function syncCalendarWindow(auth: OAuth2Client, syncDir: string, lookbackDays: number) { @@ -167,6 +192,26 @@ async function syncCalendarWindow(auth: OAuth2Client, syncDir: string, lookbackD const calendar = google.calendar({ version: 'v3', auth }); const drive = google.drive({ version: 'v3', auth }); + let runId: string | null = null; + let runStartedAt = 0; + let newCount = 0; + let updatedCount = 0; + let deletedCount = 0; + let attachmentCount = 0; + const changedTitles: string[] = []; + + const ensureRun = async () => { + if (!runId) { + const run = await serviceLogger.startRun({ + service: 'calendar', + message: 'Syncing calendar', + trigger: 'timer', + }); + runId = run.runId; + runStartedAt = run.startedAt; + } + }; + try { const res = await calendar.events.list({ calendarId: 'primary', @@ -185,17 +230,90 @@ async function syncCalendarWindow(auth: OAuth2Client, syncDir: string, lookbackD console.log(`Found ${events.length} events.`); for (const event of events) { if (event.id) { - await saveEvent(event, syncDir); - await processAttachments(drive, event, syncDir); + const result = await saveEvent(event, syncDir); + const attachmentsSaved = await processAttachments(drive, event, syncDir); currentEventIds.add(event.id); + + if (result.changed) { + await ensureRun(); + changedTitles.push(result.title); + if (result.isNew) { + newCount++; + } else { + updatedCount++; + } + } + + if (attachmentsSaved > 0) { + await ensureRun(); + attachmentCount += attachmentsSaved; + } } } } - cleanUpOldFiles(currentEventIds, syncDir); + const deletedFiles = cleanUpOldFiles(currentEventIds, syncDir); + if (deletedFiles.length > 0) { + await ensureRun(); + deletedCount = deletedFiles.length; + } + + if (runId) { + const totalChanges = newCount + updatedCount + deletedCount + attachmentCount; + const limitedTitles = limitEventItems(changedTitles); + await serviceLogger.log({ + type: 'changes_identified', + service: 'calendar', + runId, + level: 'info', + message: `Calendar updates: ${totalChanges} change${totalChanges === 1 ? '' : 's'}`, + counts: { + newEvents: newCount, + updatedEvents: updatedCount, + deletedFiles: deletedCount, + attachments: attachmentCount, + }, + items: limitedTitles.items, + truncated: limitedTitles.truncated, + }); + await serviceLogger.log({ + type: 'run_complete', + service: 'calendar', + runId, + level: 'info', + message: `Calendar sync complete: ${totalChanges} change${totalChanges === 1 ? '' : 's'}`, + durationMs: Date.now() - runStartedAt, + outcome: 'ok', + summary: { + newEvents: newCount, + updatedEvents: updatedCount, + deletedFiles: deletedCount, + attachments: attachmentCount, + }, + }); + } } catch (error) { console.error("An error occurred during calendar sync:", error); + if (runId) { + await serviceLogger.log({ + type: 'error', + service: 'calendar', + runId, + level: 'error', + message: 'Calendar sync error', + error: error instanceof Error ? error.message : String(error), + }); + await serviceLogger.log({ + type: 'run_complete', + service: 'calendar', + runId, + level: 'error', + message: 'Calendar sync failed', + durationMs: Date.now() - runStartedAt, + outcome: 'error', + }); + } // If 401, clear tokens to force re-auth next run const e = error as { response?: { status?: number } }; if (e.response?.status === 401) { @@ -256,4 +374,4 @@ export async function init() { console.log(`Sleeping for ${SYNC_INTERVAL_MS / 1000} seconds...`); await interruptibleSleep(SYNC_INTERVAL_MS); } -} \ No newline at end of file +} diff --git a/apps/x/packages/core/src/knowledge/sync_fireflies.ts b/apps/x/packages/core/src/knowledge/sync_fireflies.ts index e65529f6..093ee044 100644 --- a/apps/x/packages/core/src/knowledge/sync_fireflies.ts +++ b/apps/x/packages/core/src/knowledge/sync_fireflies.ts @@ -2,6 +2,7 @@ import fs from 'fs'; import path from 'path'; import { WorkDir } from '../config/config.js'; import { FirefliesClientFactory } from './fireflies-client-factory.js'; +import { serviceLogger, type ServiceRunContext } from '../services/service_logger.js'; // Configuration const SYNC_DIR = path.join(WorkDir, 'fireflies_transcripts'); @@ -11,6 +12,14 @@ const LOOKBACK_DAYS = 30; // Last 1 month const API_DELAY_MS = 2000; // 2 second delay between API calls const RATE_LIMIT_RETRY_DELAY_MS = 60 * 1000; // Wait 1 minute on rate limit const MAX_RETRIES = 3; // Maximum retries for rate-limited requests +const MAX_EVENT_ITEMS = 50; + +function limitEventItems(items: string[], max: number = MAX_EVENT_ITEMS): { items: string[]; truncated: boolean } { + if (items.length <= max) { + return { items, truncated: false }; + } + return { items: items.slice(0, max), truncated: true }; +} // --- Wake Signal for Immediate Sync Trigger --- let wakeResolve: (() => void) | null = null; @@ -414,6 +423,8 @@ async function syncMeetings() { console.log(`[Fireflies] Fetching meetings from ${fromDateStr} to ${toDateStr}...`); + let run: ServiceRunContext | null = null; + try { // Step 1: Get list of transcripts with rate limiting const transcriptsResult = await callWithRateLimit( @@ -456,6 +467,31 @@ async function syncMeetings() { } console.log(`[Fireflies] Found ${meetings.length} transcripts`); + + const newMeetings = meetings.filter(m => m.id && !syncedIds.has(m.id)); + if (newMeetings.length === 0) { + console.log('[Fireflies] No new transcripts to sync'); + saveState(toDateStr, Array.from(syncedIds), new Date().toISOString()); + return; + } + + run = await serviceLogger.startRun({ + service: 'fireflies', + message: 'Syncing Fireflies transcripts', + trigger: 'timer', + }); + const meetingTitles = newMeetings.map(m => m.title || m.id); + const limitedTitles = limitEventItems(meetingTitles); + await serviceLogger.log({ + type: 'changes_identified', + service: run.service, + runId: run.runId, + level: 'info', + message: `Found ${newMeetings.length} new transcript${newMeetings.length === 1 ? '' : 's'}`, + counts: { transcripts: newMeetings.length }, + items: limitedTitles.items, + truncated: limitedTitles.truncated, + }); // Step 2: Fetch and save each transcript let newCount = 0; @@ -559,9 +595,39 @@ async function syncMeetings() { // Save state with updated timestamp saveState(toDateStr, Array.from(syncedIds), new Date().toISOString()); + + await serviceLogger.log({ + type: 'run_complete', + service: run.service, + runId: run.runId, + level: 'info', + message: `Fireflies sync complete: ${newCount} transcript${newCount === 1 ? '' : 's'}`, + durationMs: Date.now() - run.startedAt, + outcome: newCount > 0 ? 'ok' : 'error', + summary: { transcripts: newCount }, + }); } catch (error) { console.error('[Fireflies] Error during sync:', error); + if (run) { + await serviceLogger.log({ + type: 'error', + service: run.service, + runId: run.runId, + level: 'error', + message: 'Fireflies sync error', + error: error instanceof Error ? error.message : String(error), + }); + await serviceLogger.log({ + type: 'run_complete', + service: run.service, + runId: run.runId, + level: 'error', + message: 'Fireflies sync failed', + durationMs: Date.now() - run.startedAt, + outcome: 'error', + }); + } // Check if it's an auth error const errorMessage = error instanceof Error ? error.message : String(error); @@ -600,4 +666,3 @@ export async function init() { await interruptibleSleep(SYNC_INTERVAL_MS); } } - diff --git a/apps/x/packages/core/src/knowledge/sync_gmail.ts b/apps/x/packages/core/src/knowledge/sync_gmail.ts index d1782a96..000c5f10 100644 --- a/apps/x/packages/core/src/knowledge/sync_gmail.ts +++ b/apps/x/packages/core/src/knowledge/sync_gmail.ts @@ -5,11 +5,20 @@ import { NodeHtmlMarkdown } from 'node-html-markdown' import { OAuth2Client } from 'google-auth-library'; import { WorkDir } from '../config/config.js'; import { GoogleClientFactory } from './google-client-factory.js'; +import { serviceLogger } from '../services/service_logger.js'; // Configuration const SYNC_DIR = path.join(WorkDir, 'gmail_sync'); const SYNC_INTERVAL_MS = 5 * 60 * 1000; // Check every 5 minutes const REQUIRED_SCOPE = 'https://www.googleapis.com/auth/gmail.readonly'; +const MAX_EVENT_ITEMS = 50; + +function limitEventItems(items: string[], max: number = MAX_EVENT_ITEMS): { items: string[]; truncated: boolean } { + if (items.length <= max) { + return { items, truncated: false }; + } + return { items: items.slice(0, max), truncated: true }; +} const nhm = new NodeHtmlMarkdown(); @@ -200,6 +209,7 @@ async function fullSync(auth: OAuth2Client, syncDir: string, attachmentsDir: str const profile = await gmail.users.getProfile({ userId: 'me' }); const currentHistoryId = profile.data.historyId!; + const threadIds: string[] = []; let pageToken: string | undefined; do { const res = await gmail.users.threads.list({ @@ -211,13 +221,52 @@ async function fullSync(auth: OAuth2Client, syncDir: string, attachmentsDir: str const threads = res.data.threads; if (threads) { for (const thread of threads) { - await processThread(auth, thread.id!, syncDir, attachmentsDir); + if (thread.id) { + threadIds.push(thread.id); + } } } pageToken = res.data.nextPageToken ?? undefined; } while (pageToken); + if (threadIds.length === 0) { + saveState(currentHistoryId, stateFile); + console.log("Full sync complete. No threads found."); + return; + } + + const run = await serviceLogger.startRun({ + service: 'gmail', + message: 'Syncing Gmail', + trigger: 'timer', + }); + const limitedThreads = limitEventItems(threadIds); + await serviceLogger.log({ + type: 'changes_identified', + service: run.service, + runId: run.runId, + level: 'info', + message: `Found ${threadIds.length} thread${threadIds.length === 1 ? '' : 's'} to sync`, + counts: { threads: threadIds.length }, + items: limitedThreads.items, + truncated: limitedThreads.truncated, + }); + + for (const threadId of threadIds) { + await processThread(auth, threadId, syncDir, attachmentsDir); + } + saveState(currentHistoryId, stateFile); + await serviceLogger.log({ + type: 'run_complete', + service: run.service, + runId: run.runId, + level: 'info', + message: `Gmail sync complete: ${threadIds.length} thread${threadIds.length === 1 ? '' : 's'}`, + durationMs: Date.now() - run.startedAt, + outcome: 'ok', + summary: { threads: threadIds.length }, + }); console.log("Full sync complete."); } @@ -253,12 +302,46 @@ async function partialSync(auth: OAuth2Client, startHistoryId: string, syncDir: } } - for (const tid of threadIds) { + if (threadIds.size === 0) { + const profile = await gmail.users.getProfile({ userId: 'me' }); + saveState(profile.data.historyId!, stateFile); + return; + } + + const run = await serviceLogger.startRun({ + service: 'gmail', + message: 'Syncing Gmail', + trigger: 'timer', + }); + const threadIdList = Array.from(threadIds); + const limitedThreads = limitEventItems(threadIdList); + await serviceLogger.log({ + type: 'changes_identified', + service: run.service, + runId: run.runId, + level: 'info', + message: `Found ${threadIdList.length} new thread${threadIdList.length === 1 ? '' : 's'}`, + counts: { threads: threadIdList.length }, + items: limitedThreads.items, + truncated: limitedThreads.truncated, + }); + + for (const tid of threadIdList) { await processThread(auth, tid, syncDir, attachmentsDir); } const profile = await gmail.users.getProfile({ userId: 'me' }); saveState(profile.data.historyId!, stateFile); + await serviceLogger.log({ + type: 'run_complete', + service: run.service, + runId: run.runId, + level: 'info', + message: `Gmail sync complete: ${threadIdList.length} thread${threadIdList.length === 1 ? '' : 's'}`, + durationMs: Date.now() - run.startedAt, + outcome: 'ok', + summary: { threads: threadIdList.length }, + }); } catch (error: unknown) { const e = error as { response?: { status?: number } }; diff --git a/apps/x/packages/core/src/services/service_bus.ts b/apps/x/packages/core/src/services/service_bus.ts new file mode 100644 index 00000000..66b7c662 --- /dev/null +++ b/apps/x/packages/core/src/services/service_bus.ts @@ -0,0 +1,24 @@ +import type { ServiceEventType } from "@x/shared/dist/service-events.js"; + +type ServiceEventHandler = (event: ServiceEventType) => Promise | void; + +export class ServiceBus { + private subscribers: ServiceEventHandler[] = []; + + async publish(event: ServiceEventType): Promise { + const pending = this.subscribers.map(async (handler) => handler(event)); + await Promise.all(pending); + } + + async subscribe(handler: ServiceEventHandler): Promise<() => void> { + this.subscribers.push(handler); + return () => { + const idx = this.subscribers.indexOf(handler); + if (idx >= 0) { + this.subscribers.splice(idx, 1); + } + }; + } +} + +export const serviceBus = new ServiceBus(); diff --git a/apps/x/packages/core/src/services/service_logger.ts b/apps/x/packages/core/src/services/service_logger.ts new file mode 100644 index 00000000..50cac7e8 --- /dev/null +++ b/apps/x/packages/core/src/services/service_logger.ts @@ -0,0 +1,108 @@ +import fs from "fs"; +import fsp from "fs/promises"; +import path from "path"; +import { WorkDir } from "../config/config.js"; +import { IdGen } from "../application/lib/id-gen.js"; +import type { ServiceEventType } from "@x/shared/dist/service-events.js"; +import { serviceBus } from "./service_bus.js"; + +type ServiceNameType = ServiceEventType["service"]; +type DistributiveOmit = T extends any ? Omit : never; +type ServiceEventInput = DistributiveOmit; + +const LOG_DIR = path.join(WorkDir, "logs"); +const LOG_FILE = path.join(LOG_DIR, "services.jsonl"); +const MAX_LOG_BYTES = 10 * 1024 * 1024; + +export type ServiceRunContext = { + runId: string; + service: ServiceNameType; + startedAt: number; +}; + +function safeTimestampForFile(ts: string): string { + return ts.replace(/[:.]/g, "-"); +} + +export class ServiceLogger { + private idGen = new IdGen(); + private stream: fs.WriteStream | null = null; + private currentSize = 0; + private initialized = false; + private writeQueue: Promise = Promise.resolve(); + + private async ensureReady(): Promise { + if (this.initialized) return; + await fsp.mkdir(LOG_DIR, { recursive: true }); + try { + const stats = await fsp.stat(LOG_FILE); + this.currentSize = stats.size; + } catch { + this.currentSize = 0; + } + this.stream = fs.createWriteStream(LOG_FILE, { flags: "a", encoding: "utf8" }); + this.initialized = true; + } + + private async rotateIfNeeded(nextBytes: number): Promise { + if (this.currentSize + nextBytes <= MAX_LOG_BYTES) return; + if (this.stream) { + this.stream.close(); + this.stream = null; + } + const ts = safeTimestampForFile(new Date().toISOString()); + const rotatedPath = path.join(LOG_DIR, `services.${ts}.jsonl`); + try { + await fsp.rename(LOG_FILE, rotatedPath); + } catch { + // Ignore if file doesn't exist or rename fails + } + this.currentSize = 0; + this.stream = fs.createWriteStream(LOG_FILE, { flags: "a", encoding: "utf8" }); + } + + async log(event: ServiceEventInput): Promise { + const payload = { + ...event, + ts: new Date().toISOString(), + } as ServiceEventType; + const line = JSON.stringify(payload) + "\n"; + const bytes = Buffer.byteLength(line, "utf8"); + + this.writeQueue = this.writeQueue.then(async () => { + await this.ensureReady(); + await this.rotateIfNeeded(bytes); + this.stream?.write(line); + this.currentSize += bytes; + try { + await serviceBus.publish(payload); + } catch { + // Ignore publish errors to avoid blocking log writes + } + }); + + return this.writeQueue; + } + + async startRun(opts: { + service: ServiceNameType; + message: string; + trigger?: "timer" | "manual" | "startup"; + config?: Record; + }): Promise { + const runId = `${opts.service}_${await this.idGen.next()}`; + const startedAt = Date.now(); + await this.log({ + type: "run_start", + service: opts.service, + runId, + level: "info", + message: opts.message, + trigger: opts.trigger, + config: opts.config, + }); + return { runId, service: opts.service, startedAt }; + } +} + +export const serviceLogger = new ServiceLogger(); diff --git a/apps/x/packages/shared/src/index.ts b/apps/x/packages/shared/src/index.ts index 5d54883f..4f10fc82 100644 --- a/apps/x/packages/shared/src/index.ts +++ b/apps/x/packages/shared/src/index.ts @@ -6,4 +6,5 @@ export * as workspace from './workspace.js'; export * as mcp from './mcp.js'; export * as agentSchedule from './agent-schedule.js'; export * as agentScheduleState from './agent-schedule-state.js'; +export * as serviceEvents from './service-events.js'; export { PrefixLogger }; diff --git a/apps/x/packages/shared/src/ipc.ts b/apps/x/packages/shared/src/ipc.ts index 767de9a0..67b29a28 100644 --- a/apps/x/packages/shared/src/ipc.ts +++ b/apps/x/packages/shared/src/ipc.ts @@ -5,6 +5,7 @@ import { AskHumanResponsePayload, CreateRunOptions, Run, ListRunsResponse, ToolP import { LlmModelConfig } from './models.js'; import { AgentScheduleConfig, AgentScheduleEntry } from './agent-schedule.js'; import { AgentScheduleState } from './agent-schedule-state.js'; +import { ServiceEvent } from './service-events.js'; // ============================================================================ // Runtime Validation Schemas (Single Source of Truth) @@ -176,6 +177,10 @@ const ipcSchemas = { req: z.null(), res: z.null(), }, + 'services:events': { + req: ServiceEvent, + res: z.null(), + }, 'models:list': { req: z.null(), res: z.object({ diff --git a/apps/x/packages/shared/src/service-events.ts b/apps/x/packages/shared/src/service-events.ts new file mode 100644 index 00000000..d214472c --- /dev/null +++ b/apps/x/packages/shared/src/service-events.ts @@ -0,0 +1,65 @@ +import z from 'zod'; + +export const ServiceName = z.enum([ + 'graph', + 'gmail', + 'calendar', + 'fireflies', + 'granola', + 'voice_memo', +]); + +const ServiceEventBase = z.object({ + service: ServiceName, + runId: z.string(), + ts: z.iso.datetime(), + level: z.enum(['info', 'warn', 'error']), + message: z.string(), +}); + +export const ServiceRunStartEvent = ServiceEventBase.extend({ + type: z.literal('run_start'), + trigger: z.enum(['timer', 'manual', 'startup']).optional(), + config: z.record(z.string(), z.unknown()).optional(), +}); + +export const ServiceChangesIdentifiedEvent = ServiceEventBase.extend({ + type: z.literal('changes_identified'), + counts: z.record(z.string(), z.number()).optional(), + items: z.array(z.string()).optional(), + truncated: z.boolean().optional(), +}); + +export const ServiceProgressEvent = ServiceEventBase.extend({ + type: z.literal('progress'), + step: z.string().optional(), + current: z.number().optional(), + total: z.number().optional(), + details: z.record(z.string(), z.unknown()).optional(), +}); + +export const ServiceRunCompleteEvent = ServiceEventBase.extend({ + type: z.literal('run_complete'), + durationMs: z.number(), + outcome: z.enum(['ok', 'idle', 'skipped', 'error']), + summary: z.record(z.string(), z.union([z.string(), z.number(), z.boolean()])).optional(), + items: z.array(z.string()).optional(), + truncated: z.boolean().optional(), +}); + +export const ServiceErrorEvent = ServiceEventBase.extend({ + type: z.literal('error'), + error: z.string(), + context: z.record(z.string(), z.unknown()).optional(), +}); + +export const ServiceEvent = z.union([ + ServiceRunStartEvent, + ServiceChangesIdentifiedEvent, + ServiceProgressEvent, + ServiceRunCompleteEvent, + ServiceErrorEvent, +]); + +export type ServiceNameType = z.infer; +export type ServiceEventType = z.infer;