feat: add syncing update for graph building on the UI

This commit is contained in:
tusharmagar 2026-02-05 16:05:10 +05:30
parent 6425dbcf28
commit eefc6a9700
13 changed files with 1093 additions and 163 deletions

View file

@ -4,6 +4,7 @@ import { WorkDir } from '../config/config.js';
import { autoConfigureStrictnessIfNeeded } from '../config/strictness_analyzer.js';
import { createRun, createMessage } from '../runs/runs.js';
import { bus } from '../runs/bus.js';
import { serviceLogger, type ServiceRunContext } from '../services/service_logger.js';
import {
loadState,
saveState,
@ -32,6 +33,23 @@ const SOURCE_FOLDERS = [
// Voice memos are now created directly in knowledge/Voice Memos/<date>/
const VOICE_MEMOS_KNOWLEDGE_DIR = path.join(NOTES_OUTPUT_DIR, 'Voice Memos');
const MAX_EVENT_ITEMS = 50;
function limitEventItems(items: string[], max: number = MAX_EVENT_ITEMS): { items: string[]; truncated: boolean } {
if (items.length <= max) {
return { items, truncated: false };
}
return { items: items.slice(0, max), truncated: true };
}
function extractPathFromToolInput(input: string): string | null {
try {
const parsed = JSON.parse(input) as { path?: string };
return typeof parsed.path === 'string' ? parsed.path : null;
} catch {
return null;
}
}
/**
* Get unprocessed voice memo files from knowledge/Voice Memos/
@ -148,7 +166,11 @@ async function waitForRunCompletion(runId: string): Promise<void> {
/**
* Run note creation agent on a batch of files to extract entities and create/update notes
*/
async function createNotesFromBatch(files: { path: string; content: string }[], batchNumber: number, knowledgeIndex: string): Promise<string> {
async function createNotesFromBatch(
files: { path: string; content: string }[],
batchNumber: number,
knowledgeIndex: string
): Promise<{ runId: string; notesCreated: Set<string>; notesModified: Set<string> }> {
// Ensure notes output directory exists
if (!fs.existsSync(NOTES_OUTPUT_DIR)) {
fs.mkdirSync(NOTES_OUTPUT_DIR, { recursive: true });
@ -182,18 +204,155 @@ async function createNotesFromBatch(files: { path: string; content: string }[],
message += `\n\n---\n\n`;
});
const notesCreated = new Set<string>();
const notesModified = new Set<string>();
const unsubscribe = await bus.subscribe(run.id, async (event) => {
if (event.type !== "tool-invocation") {
return;
}
if (event.toolName !== "workspace-writeFile" && event.toolName !== "workspace-edit") {
return;
}
const toolPath = extractPathFromToolInput(event.input);
if (!toolPath) {
return;
}
if (event.toolName === "workspace-writeFile") {
notesCreated.add(toolPath);
} else if (event.toolName === "workspace-edit") {
notesModified.add(toolPath);
}
});
await createMessage(run.id, message);
// Wait for the run to complete
await waitForRunCompletion(run.id);
unsubscribe();
return run.id;
return { runId: run.id, notesCreated, notesModified };
}
/**
* Build the knowledge graph from all content files in the specified source directory
* Only processes new or changed files based on state tracking
*/
type BatchResult = {
processedFiles: string[];
notesCreated: Set<string>;
notesModified: Set<string>;
hadError: boolean;
};
async function buildGraphWithFiles(
sourceDir: string,
filesToProcess: string[],
state: GraphState,
run?: ServiceRunContext
): Promise<BatchResult> {
console.log(`[buildGraph] Starting build for directory: ${sourceDir}`);
if (filesToProcess.length === 0) {
console.log(`[buildGraph] No new or changed files to process in ${path.basename(sourceDir)}`);
return { processedFiles: [], notesCreated: new Set(), notesModified: new Set(), hadError: false };
}
console.log(`[buildGraph] Found ${filesToProcess.length} new/changed files to process in ${path.basename(sourceDir)}`);
// Read file contents
const contentFiles = await readFileContents(filesToProcess);
if (contentFiles.length === 0) {
console.log(`No files could be read from ${sourceDir}`);
return { processedFiles: [], notesCreated: new Set(), notesModified: new Set(), hadError: false };
}
const BATCH_SIZE = 10; // Reduced from 25 to 10 files per agent run for faster processing
const totalBatches = Math.ceil(contentFiles.length / BATCH_SIZE);
console.log(`Processing ${contentFiles.length} files in ${totalBatches} batches (${BATCH_SIZE} files per batch)...`);
const processedFiles: string[] = [];
const notesCreated = new Set<string>();
const notesModified = new Set<string>();
let hadError = false;
// Process files in batches
for (let i = 0; i < contentFiles.length; i += BATCH_SIZE) {
const batch = contentFiles.slice(i, i + BATCH_SIZE);
const batchNumber = Math.floor(i / BATCH_SIZE) + 1;
try {
// Build fresh index before each batch to include notes from previous batches
console.log(`Building knowledge index for batch ${batchNumber}...`);
const indexStartTime = Date.now();
const index = buildKnowledgeIndex();
const indexForPrompt = formatIndexForPrompt(index);
const indexDuration = ((Date.now() - indexStartTime) / 1000).toFixed(2);
console.log(`Index built in ${indexDuration}s: ${index.people.length} people, ${index.organizations.length} orgs, ${index.projects.length} projects, ${index.topics.length} topics, ${index.other.length} other`);
console.log(`Processing batch ${batchNumber}/${totalBatches} (${batch.length} files)...`);
if (run) {
await serviceLogger.log({
type: 'progress',
service: run.service,
runId: run.runId,
level: 'info',
message: `Processing batch ${batchNumber}/${totalBatches} (${batch.length} files)`,
step: 'batch',
current: batchNumber,
total: totalBatches,
details: { filesInBatch: batch.length },
});
}
const agentStartTime = Date.now();
const batchResult = await createNotesFromBatch(batch, batchNumber, indexForPrompt);
const agentDuration = ((Date.now() - agentStartTime) / 1000).toFixed(2);
console.log(`Batch ${batchNumber}/${totalBatches} complete in ${agentDuration}s`);
for (const note of batchResult.notesCreated) {
notesCreated.add(note);
}
for (const note of batchResult.notesModified) {
notesModified.add(note);
}
// Mark files in this batch as processed
for (const file of batch) {
markFileAsProcessed(file.path, state);
processedFiles.push(file.path);
}
// Save state after each successful batch
// This ensures partial progress is saved even if later batches fail
saveState(state);
} catch (error) {
hadError = true;
console.error(`Error processing batch ${batchNumber}:`, error);
if (run) {
await serviceLogger.log({
type: 'error',
service: run.service,
runId: run.runId,
level: 'error',
message: `Error processing batch ${batchNumber}`,
error: error instanceof Error ? error.message : String(error),
context: { batchNumber },
});
}
// Continue with next batch (without saving state for failed batch)
}
}
// Update state with last build time and save
state.lastBuildTime = new Date().toISOString();
saveState(state);
console.log(`Knowledge graph build complete. Processed ${processedFiles.length} files.`);
return { processedFiles, notesCreated, notesModified, hadError };
}
export async function buildGraph(sourceDir: string): Promise<void> {
console.log(`[buildGraph] Starting build for directory: ${sourceDir}`);
@ -210,62 +369,7 @@ export async function buildGraph(sourceDir: string): Promise<void> {
return;
}
console.log(`[buildGraph] Found ${filesToProcess.length} new/changed files to process in ${path.basename(sourceDir)}`);
// Read file contents
const contentFiles = await readFileContents(filesToProcess);
if (contentFiles.length === 0) {
console.log(`No files could be read from ${sourceDir}`);
return;
}
const BATCH_SIZE = 10; // Reduced from 25 to 10 files per agent run for faster processing
const totalBatches = Math.ceil(contentFiles.length / BATCH_SIZE);
console.log(`Processing ${contentFiles.length} files in ${totalBatches} batches (${BATCH_SIZE} files per batch)...`);
// Process files in batches
const processedFiles: string[] = [];
for (let i = 0; i < contentFiles.length; i += BATCH_SIZE) {
const batch = contentFiles.slice(i, i + BATCH_SIZE);
const batchNumber = Math.floor(i / BATCH_SIZE) + 1;
try {
// Build fresh index before each batch to include notes from previous batches
console.log(`Building knowledge index for batch ${batchNumber}...`);
const indexStartTime = Date.now();
const index = buildKnowledgeIndex();
const indexForPrompt = formatIndexForPrompt(index);
const indexDuration = ((Date.now() - indexStartTime) / 1000).toFixed(2);
console.log(`Index built in ${indexDuration}s: ${index.people.length} people, ${index.organizations.length} orgs, ${index.projects.length} projects, ${index.topics.length} topics, ${index.other.length} other`);
console.log(`Processing batch ${batchNumber}/${totalBatches} (${batch.length} files)...`);
const agentStartTime = Date.now();
await createNotesFromBatch(batch, batchNumber, indexForPrompt);
const agentDuration = ((Date.now() - agentStartTime) / 1000).toFixed(2);
console.log(`Batch ${batchNumber}/${totalBatches} complete in ${agentDuration}s`);
// Mark files in this batch as processed
for (const file of batch) {
markFileAsProcessed(file.path, state);
processedFiles.push(file.path);
}
// Save state after each successful batch
// This ensures partial progress is saved even if later batches fail
saveState(state);
} catch (error) {
console.error(`Error processing batch ${batchNumber}:`, error);
// Continue with next batch (without saving state for failed batch)
}
}
// Update state with last build time and save
state.lastBuildTime = new Date().toISOString();
saveState(state);
console.log(`Knowledge graph build complete. Processed ${processedFiles.length} files.`);
await buildGraphWithFiles(sourceDir, filesToProcess, state);
}
/**
@ -287,10 +391,39 @@ async function processVoiceMemosForKnowledge(): Promise<boolean> {
console.log(`[GraphBuilder] Processing ${unprocessedFiles.length} voice memo transcripts for entity extraction...`);
console.log(`[GraphBuilder] Files to process: ${unprocessedFiles.map(f => path.basename(f)).join(', ')}`);
const run = await serviceLogger.startRun({
service: 'voice_memo',
message: `Processing ${unprocessedFiles.length} voice memo${unprocessedFiles.length === 1 ? '' : 's'}`,
trigger: 'timer',
});
const relativeVoiceMemos = unprocessedFiles.map(filePath => path.relative(WorkDir, filePath));
const limitedVoiceMemos = limitEventItems(relativeVoiceMemos);
await serviceLogger.log({
type: 'changes_identified',
service: run.service,
runId: run.runId,
level: 'info',
message: `Found ${unprocessedFiles.length} new voice memo${unprocessedFiles.length === 1 ? '' : 's'}`,
counts: { voiceMemos: unprocessedFiles.length },
items: limitedVoiceMemos.items,
truncated: limitedVoiceMemos.truncated,
});
// Read the files
const contentFiles = await readFileContents(unprocessedFiles);
if (contentFiles.length === 0) {
await serviceLogger.log({
type: 'run_complete',
service: run.service,
runId: run.runId,
level: 'info',
message: 'No voice memos could be read',
durationMs: Date.now() - run.startedAt,
outcome: 'error',
summary: { processedFiles: 0 },
});
return false;
}
@ -298,6 +431,10 @@ async function processVoiceMemosForKnowledge(): Promise<boolean> {
const BATCH_SIZE = 10;
const totalBatches = Math.ceil(contentFiles.length / BATCH_SIZE);
const notesCreated = new Set<string>();
const notesModified = new Set<string>();
let hadError = false;
for (let i = 0; i < contentFiles.length; i += BATCH_SIZE) {
const batch = contentFiles.slice(i, i + BATCH_SIZE);
const batchNumber = Math.floor(i / BATCH_SIZE) + 1;
@ -309,9 +446,27 @@ async function processVoiceMemosForKnowledge(): Promise<boolean> {
const indexForPrompt = formatIndexForPrompt(index);
console.log(`[GraphBuilder] Processing batch ${batchNumber}/${totalBatches} (${batch.length} files)...`);
await createNotesFromBatch(batch, batchNumber, indexForPrompt);
await serviceLogger.log({
type: 'progress',
service: run.service,
runId: run.runId,
level: 'info',
message: `Processing batch ${batchNumber}/${totalBatches} (${batch.length} files)`,
step: 'batch',
current: batchNumber,
total: totalBatches,
details: { filesInBatch: batch.length },
});
const batchResult = await createNotesFromBatch(batch, batchNumber, indexForPrompt);
console.log(`[GraphBuilder] Batch ${batchNumber}/${totalBatches} complete`);
for (const note of batchResult.notesCreated) {
notesCreated.add(note);
}
for (const note of batchResult.notesModified) {
notesModified.add(note);
}
// Mark files as processed
for (const file of batch) {
markFileAsProcessed(file.path, state);
@ -320,7 +475,17 @@ async function processVoiceMemosForKnowledge(): Promise<boolean> {
// Save state after each batch
saveState(state);
} catch (error) {
hadError = true;
console.error(`[GraphBuilder] Error processing batch ${batchNumber}:`, error);
await serviceLogger.log({
type: 'error',
service: run.service,
runId: run.runId,
level: 'error',
message: `Error processing voice memo batch ${batchNumber}`,
error: error instanceof Error ? error.message : String(error),
context: { batchNumber },
});
}
}
@ -328,6 +493,21 @@ async function processVoiceMemosForKnowledge(): Promise<boolean> {
state.lastBuildTime = new Date().toISOString();
saveState(state);
await serviceLogger.log({
type: 'run_complete',
service: run.service,
runId: run.runId,
level: hadError ? 'error' : 'info',
message: `Voice memos processed: ${contentFiles.length} files, ${notesCreated.size} created, ${notesModified.size} updated`,
durationMs: Date.now() - run.startedAt,
outcome: hadError ? 'error' : 'ok',
summary: {
processedFiles: contentFiles.length,
notesCreated: notesCreated.size,
notesModified: notesModified.size,
},
});
return true;
}
@ -352,6 +532,11 @@ async function processAllSources(): Promise<void> {
console.error('[GraphBuilder] Error processing voice memos:', error);
}
const state = loadState();
const folderChanges: { folder: string; sourceDir: string; files: string[] }[] = [];
const countsByFolder: Record<string, number> = {};
const allFiles: string[] = [];
for (const folder of SOURCE_FOLDERS) {
const sourceDir = path.join(WorkDir, folder);
@ -362,14 +547,13 @@ async function processAllSources(): Promise<void> {
}
try {
// Quick check if there are any files to process before doing the full build
const state = loadState();
const filesToProcess = getFilesToProcess(sourceDir, state);
if (filesToProcess.length > 0) {
console.log(`[GraphBuilder] Found ${filesToProcess.length} new/changed files in ${folder}`);
await buildGraph(sourceDir);
anyFilesProcessed = true;
folderChanges.push({ folder, sourceDir, files: filesToProcess });
countsByFolder[folder] = filesToProcess.length;
allFiles.push(...filesToProcess);
}
} catch (error) {
console.error(`[GraphBuilder] Error processing ${folder}:`, error);
@ -377,6 +561,63 @@ async function processAllSources(): Promise<void> {
}
}
if (allFiles.length > 0) {
const run = await serviceLogger.startRun({
service: 'graph',
message: 'Syncing knowledge graph',
trigger: 'timer',
config: { sources: SOURCE_FOLDERS },
});
const relativeFiles = allFiles.map(filePath => path.relative(WorkDir, filePath));
const limitedFiles = limitEventItems(relativeFiles);
const foldersList = Object.keys(countsByFolder).join(', ');
const folderMessage = foldersList ? ` across ${foldersList}` : '';
await serviceLogger.log({
type: 'changes_identified',
service: run.service,
runId: run.runId,
level: 'info',
message: `Found ${allFiles.length} changed file${allFiles.length === 1 ? '' : 's'}${folderMessage}`,
counts: countsByFolder,
items: limitedFiles.items,
truncated: limitedFiles.truncated,
});
const notesCreated = new Set<string>();
const notesModified = new Set<string>();
const processedFiles: string[] = [];
let hadError = false;
for (const entry of folderChanges) {
const result = await buildGraphWithFiles(entry.sourceDir, entry.files, state, run);
result.processedFiles.forEach(file => processedFiles.push(file));
result.notesCreated.forEach(note => notesCreated.add(note));
result.notesModified.forEach(note => notesModified.add(note));
if (result.hadError) {
hadError = true;
}
}
await serviceLogger.log({
type: 'run_complete',
service: run.service,
runId: run.runId,
level: hadError ? 'error' : 'info',
message: `Graph sync complete: ${processedFiles.length} files, ${notesCreated.size} created, ${notesModified.size} updated`,
durationMs: Date.now() - run.startedAt,
outcome: hadError ? 'error' : 'ok',
summary: {
processedFiles: processedFiles.length,
notesCreated: notesCreated.size,
notesModified: notesModified.size,
},
});
anyFilesProcessed = true;
}
if (!anyFilesProcessed) {
console.log('[GraphBuilder] No new content to process');
} else {

View file

@ -4,6 +4,7 @@ import { homedir } from 'os';
import { WorkDir } from '../../config/config.js';
import container from '../../di/container.js';
import { IGranolaConfigRepo } from './repo.js';
import { serviceLogger } from '../../services/service_logger.js';
import {
GetDocumentsResponse,
SyncState,
@ -22,6 +23,14 @@ const API_DELAY_MS = 1000; // 1 second delay between API calls
const RATE_LIMIT_RETRY_DELAY_MS = 60 * 1000; // Wait 1 minute on rate limit
const MAX_RETRIES = 3; // Maximum retries for rate-limited requests
const MAX_BATCH_SIZE = 10; // Process max 10 documents per folder per sync
const MAX_EVENT_ITEMS = 50;
function limitEventItems(items: string[], max: number = MAX_EVENT_ITEMS): { items: string[]; truncated: boolean } {
if (items.length <= max) {
return { items, truncated: false };
}
return { items: items.slice(0, max), truncated: true };
}
// --- Wake Signal for Immediate Sync Trigger ---
let wakeResolve: (() => void) | null = null;
@ -325,102 +334,169 @@ function documentToMarkdown(doc: Document): string {
async function syncNotes(): Promise<void> {
console.log('[Granola] Starting sync...');
// Check if enabled
const granolaRepo = container.resolve<IGranolaConfigRepo>('granolaConfigRepo');
const config = await granolaRepo.getConfig();
if (!config.enabled) {
console.log('[Granola] Sync disabled in config');
return;
}
// Extract access token
const accessToken = extractAccessToken();
if (!accessToken) {
console.log('[Granola] No access token available');
return;
}
// Ensure sync directory exists
ensureDir(SYNC_DIR);
// Load state
const state = loadState();
let newCount = 0;
let updatedCount = 0;
let offset = 0;
let hasMore = true;
// Fetch documents with pagination
while (hasMore) {
// Delay before API call (except first)
if (offset > 0) {
await sleep(API_DELAY_MS);
let runId: string | null = null;
let runStartedAt = 0;
const ensureRun = async () => {
if (!runId) {
const run = await serviceLogger.startRun({
service: 'granola',
message: 'Syncing Granola notes',
trigger: 'timer',
});
runId = run.runId;
runStartedAt = run.startedAt;
}
};
try {
// Check if enabled
const granolaRepo = container.resolve<IGranolaConfigRepo>('granolaConfigRepo');
const config = await granolaRepo.getConfig();
if (!config.enabled) {
console.log('[Granola] Sync disabled in config');
return;
}
const docsResponse = await getDocuments(accessToken, MAX_BATCH_SIZE, offset);
if (!docsResponse) {
console.log('[Granola] Failed to fetch documents');
break;
// Extract access token
const accessToken = extractAccessToken();
if (!accessToken) {
console.log('[Granola] No access token available');
return;
}
if (docsResponse.docs.length === 0) {
console.log('[Granola] No more documents to fetch');
hasMore = false;
break;
}
// Ensure sync directory exists
ensureDir(SYNC_DIR);
// Process each document
for (const doc of docsResponse.docs) {
const docUpdatedAt = doc.updated_at || doc.created_at;
const lastSyncedAt = state.syncedDocs[doc.id];
// Load state
const state = loadState();
// Check if needs sync (new or updated)
const needsSync = !lastSyncedAt || lastSyncedAt !== docUpdatedAt;
let newCount = 0;
let updatedCount = 0;
let offset = 0;
let hasMore = true;
const changedTitles: string[] = [];
if (!needsSync) {
continue;
// Fetch documents with pagination
while (hasMore) {
// Delay before API call (except first)
if (offset > 0) {
await sleep(API_DELAY_MS);
}
// Convert to markdown and save
const markdown = documentToMarkdown(doc);
const docTitle = doc.title || 'Untitled';
const filename = `${doc.id}_${cleanFilename(docTitle)}.md`;
const filePath = path.join(SYNC_DIR, filename);
fs.writeFileSync(filePath, markdown);
if (lastSyncedAt) {
console.log(`[Granola] Updated: ${filename}`);
updatedCount++;
} else {
console.log(`[Granola] Saved: ${filename}`);
newCount++;
const docsResponse = await getDocuments(accessToken, MAX_BATCH_SIZE, offset);
if (!docsResponse) {
console.log('[Granola] Failed to fetch documents');
break;
}
// Update state
state.syncedDocs[doc.id] = docUpdatedAt;
if (docsResponse.docs.length === 0) {
console.log('[Granola] No more documents to fetch');
hasMore = false;
break;
}
// Process each document
for (const doc of docsResponse.docs) {
const docUpdatedAt = doc.updated_at || doc.created_at;
const lastSyncedAt = state.syncedDocs[doc.id];
// Check if needs sync (new or updated)
const needsSync = !lastSyncedAt || lastSyncedAt !== docUpdatedAt;
if (!needsSync) {
continue;
}
await ensureRun();
const docTitle = doc.title || 'Untitled';
changedTitles.push(docTitle);
// Convert to markdown and save
const markdown = documentToMarkdown(doc);
const filename = `${doc.id}_${cleanFilename(docTitle)}.md`;
const filePath = path.join(SYNC_DIR, filename);
fs.writeFileSync(filePath, markdown);
if (lastSyncedAt) {
console.log(`[Granola] Updated: ${filename}`);
updatedCount++;
} else {
console.log(`[Granola] Saved: ${filename}`);
newCount++;
}
// Update state
state.syncedDocs[doc.id] = docUpdatedAt;
}
// Move to next page
offset += docsResponse.docs.length;
// Stop if we got fewer docs than requested (last page)
if (docsResponse.docs.length < MAX_BATCH_SIZE) {
hasMore = false;
}
}
// Move to next page
offset += docsResponse.docs.length;
// Save state
state.lastSyncDate = new Date().toISOString();
saveState(state);
// Stop if we got fewer docs than requested (last page)
if (docsResponse.docs.length < MAX_BATCH_SIZE) {
hasMore = false;
console.log(`[Granola] Sync complete: ${newCount} new, ${updatedCount} updated`);
if (runId) {
const totalChanges = newCount + updatedCount;
const limitedTitles = limitEventItems(changedTitles);
await serviceLogger.log({
type: 'changes_identified',
service: 'granola',
runId,
level: 'info',
message: `Granola updates: ${totalChanges} change${totalChanges === 1 ? '' : 's'}`,
counts: { newNotes: newCount, updatedNotes: updatedCount },
items: limitedTitles.items,
truncated: limitedTitles.truncated,
});
await serviceLogger.log({
type: 'run_complete',
service: 'granola',
runId,
level: 'info',
message: `Granola sync complete: ${newCount} new, ${updatedCount} updated`,
durationMs: Date.now() - runStartedAt,
outcome: 'ok',
summary: { newNotes: newCount, updatedNotes: updatedCount },
});
}
}
// Save state
state.lastSyncDate = new Date().toISOString();
saveState(state);
console.log(`[Granola] Sync complete: ${newCount} new, ${updatedCount} updated`);
// Build knowledge graph if there were changes
if (newCount > 0 || updatedCount > 0) {
// Graph building is now handled by the independent graph builder service
// Build knowledge graph if there were changes
if (newCount > 0 || updatedCount > 0) {
// Graph building is now handled by the independent graph builder service
}
} catch (error) {
console.error('[Granola] Error in sync:', error);
if (runId) {
await serviceLogger.log({
type: 'error',
service: 'granola',
runId,
level: 'error',
message: 'Granola sync error',
error: error instanceof Error ? error.message : String(error),
});
await serviceLogger.log({
type: 'run_complete',
service: 'granola',
runId,
level: 'error',
message: 'Granola sync failed',
durationMs: Date.now() - runStartedAt,
outcome: 'error',
});
}
throw error;
}
}
@ -443,4 +519,3 @@ export async function init(): Promise<void> {
await interruptibleSleep(SYNC_INTERVAL_MS);
}
}

View file

@ -5,6 +5,7 @@ import { OAuth2Client } from 'google-auth-library';
import { NodeHtmlMarkdown } from 'node-html-markdown'
import { WorkDir } from '../config/config.js';
import { GoogleClientFactory } from './google-client-factory.js';
import { serviceLogger } from '../services/service_logger.js';
// Configuration
const SYNC_DIR = path.join(WorkDir, 'calendar_sync');
@ -14,6 +15,14 @@ const REQUIRED_SCOPES = [
'https://www.googleapis.com/auth/calendar.events.readonly',
'https://www.googleapis.com/auth/drive.readonly'
];
const MAX_EVENT_ITEMS = 50;
function limitEventItems(items: string[], max: number = MAX_EVENT_ITEMS): { items: string[]; truncated: boolean } {
if (items.length <= max) {
return { items, truncated: false };
}
return { items: items.slice(0, max), truncated: true };
}
const nhm = new NodeHtmlMarkdown();
@ -49,10 +58,11 @@ function cleanFilename(name: string): string {
// --- Sync Logic ---
function cleanUpOldFiles(currentEventIds: Set<string>, syncDir: string) {
if (!fs.existsSync(syncDir)) return;
function cleanUpOldFiles(currentEventIds: Set<string>, syncDir: string): string[] {
if (!fs.existsSync(syncDir)) return [];
const files = fs.readdirSync(syncDir);
const deleted: string[] = [];
for (const filename of files) {
if (filename === 'sync_state.json') continue;
@ -79,36 +89,49 @@ function cleanUpOldFiles(currentEventIds: Set<string>, syncDir: string) {
try {
fs.unlinkSync(path.join(syncDir, filename));
console.log(`Removed old/out-of-window file: ${filename}`);
deleted.push(filename);
} catch (e) {
console.error(`Error deleting file ${filename}:`, e);
}
}
}
return deleted;
}
async function saveEvent(event: cal.Schema$Event, syncDir: string): Promise<boolean> {
async function saveEvent(event: cal.Schema$Event, syncDir: string): Promise<{ changed: boolean; isNew: boolean; title: string }> {
const eventId = event.id;
if (!eventId) return false;
if (!eventId) return { changed: false, isNew: false, title: 'Unknown' };
const filePath = path.join(syncDir, `${eventId}.json`);
const content = JSON.stringify(event, null, 2);
const exists = fs.existsSync(filePath);
try {
fs.writeFileSync(filePath, JSON.stringify(event, null, 2));
return true;
if (exists) {
const existing = fs.readFileSync(filePath, 'utf-8');
if (existing === content) {
return { changed: false, isNew: false, title: event.summary || eventId };
}
}
fs.writeFileSync(filePath, content);
return { changed: true, isNew: !exists, title: event.summary || eventId };
} catch (e) {
console.error(`Error saving event ${eventId}:`, e);
return false;
return { changed: false, isNew: false, title: event.summary || eventId };
}
}
async function processAttachments(drive: drive.Drive, event: cal.Schema$Event, syncDir: string) {
if (!event.attachments || event.attachments.length === 0) return;
async function processAttachments(drive: drive.Drive, event: cal.Schema$Event, syncDir: string): Promise<number> {
if (!event.attachments || event.attachments.length === 0) return 0;
const eventId = event.id;
const eventTitle = event.summary || 'Untitled';
const eventDate = event.start?.dateTime || event.start?.date || 'Unknown';
const organizer = event.organizer?.email || 'Unknown';
let savedCount = 0;
for (const att of event.attachments) {
// We only care about Google Docs
if (att.mimeType === 'application/vnd.google-apps.document') {
@ -145,12 +168,14 @@ async function processAttachments(drive: drive.Drive, event: cal.Schema$Event, s
].join('\n');
fs.writeFileSync(filePath, frontmatter + md);
savedCount++;
console.log(`Synced Note: ${att.title} for event ${eventTitle}`);
} catch (e) {
console.error(`Failed to download note ${att.title}:`, e);
}
}
}
return savedCount;
}
async function syncCalendarWindow(auth: OAuth2Client, syncDir: string, lookbackDays: number) {
@ -167,6 +192,26 @@ async function syncCalendarWindow(auth: OAuth2Client, syncDir: string, lookbackD
const calendar = google.calendar({ version: 'v3', auth });
const drive = google.drive({ version: 'v3', auth });
let runId: string | null = null;
let runStartedAt = 0;
let newCount = 0;
let updatedCount = 0;
let deletedCount = 0;
let attachmentCount = 0;
const changedTitles: string[] = [];
const ensureRun = async () => {
if (!runId) {
const run = await serviceLogger.startRun({
service: 'calendar',
message: 'Syncing calendar',
trigger: 'timer',
});
runId = run.runId;
runStartedAt = run.startedAt;
}
};
try {
const res = await calendar.events.list({
calendarId: 'primary',
@ -185,17 +230,90 @@ async function syncCalendarWindow(auth: OAuth2Client, syncDir: string, lookbackD
console.log(`Found ${events.length} events.`);
for (const event of events) {
if (event.id) {
await saveEvent(event, syncDir);
await processAttachments(drive, event, syncDir);
const result = await saveEvent(event, syncDir);
const attachmentsSaved = await processAttachments(drive, event, syncDir);
currentEventIds.add(event.id);
if (result.changed) {
await ensureRun();
changedTitles.push(result.title);
if (result.isNew) {
newCount++;
} else {
updatedCount++;
}
}
if (attachmentsSaved > 0) {
await ensureRun();
attachmentCount += attachmentsSaved;
}
}
}
}
cleanUpOldFiles(currentEventIds, syncDir);
const deletedFiles = cleanUpOldFiles(currentEventIds, syncDir);
if (deletedFiles.length > 0) {
await ensureRun();
deletedCount = deletedFiles.length;
}
if (runId) {
const totalChanges = newCount + updatedCount + deletedCount + attachmentCount;
const limitedTitles = limitEventItems(changedTitles);
await serviceLogger.log({
type: 'changes_identified',
service: 'calendar',
runId,
level: 'info',
message: `Calendar updates: ${totalChanges} change${totalChanges === 1 ? '' : 's'}`,
counts: {
newEvents: newCount,
updatedEvents: updatedCount,
deletedFiles: deletedCount,
attachments: attachmentCount,
},
items: limitedTitles.items,
truncated: limitedTitles.truncated,
});
await serviceLogger.log({
type: 'run_complete',
service: 'calendar',
runId,
level: 'info',
message: `Calendar sync complete: ${totalChanges} change${totalChanges === 1 ? '' : 's'}`,
durationMs: Date.now() - runStartedAt,
outcome: 'ok',
summary: {
newEvents: newCount,
updatedEvents: updatedCount,
deletedFiles: deletedCount,
attachments: attachmentCount,
},
});
}
} catch (error) {
console.error("An error occurred during calendar sync:", error);
if (runId) {
await serviceLogger.log({
type: 'error',
service: 'calendar',
runId,
level: 'error',
message: 'Calendar sync error',
error: error instanceof Error ? error.message : String(error),
});
await serviceLogger.log({
type: 'run_complete',
service: 'calendar',
runId,
level: 'error',
message: 'Calendar sync failed',
durationMs: Date.now() - runStartedAt,
outcome: 'error',
});
}
// If 401, clear tokens to force re-auth next run
const e = error as { response?: { status?: number } };
if (e.response?.status === 401) {
@ -256,4 +374,4 @@ export async function init() {
console.log(`Sleeping for ${SYNC_INTERVAL_MS / 1000} seconds...`);
await interruptibleSleep(SYNC_INTERVAL_MS);
}
}
}

View file

@ -2,6 +2,7 @@ import fs from 'fs';
import path from 'path';
import { WorkDir } from '../config/config.js';
import { FirefliesClientFactory } from './fireflies-client-factory.js';
import { serviceLogger, type ServiceRunContext } from '../services/service_logger.js';
// Configuration
const SYNC_DIR = path.join(WorkDir, 'fireflies_transcripts');
@ -11,6 +12,14 @@ const LOOKBACK_DAYS = 30; // Last 1 month
const API_DELAY_MS = 2000; // 2 second delay between API calls
const RATE_LIMIT_RETRY_DELAY_MS = 60 * 1000; // Wait 1 minute on rate limit
const MAX_RETRIES = 3; // Maximum retries for rate-limited requests
const MAX_EVENT_ITEMS = 50;
function limitEventItems(items: string[], max: number = MAX_EVENT_ITEMS): { items: string[]; truncated: boolean } {
if (items.length <= max) {
return { items, truncated: false };
}
return { items: items.slice(0, max), truncated: true };
}
// --- Wake Signal for Immediate Sync Trigger ---
let wakeResolve: (() => void) | null = null;
@ -414,6 +423,8 @@ async function syncMeetings() {
console.log(`[Fireflies] Fetching meetings from ${fromDateStr} to ${toDateStr}...`);
let run: ServiceRunContext | null = null;
try {
// Step 1: Get list of transcripts with rate limiting
const transcriptsResult = await callWithRateLimit(
@ -456,6 +467,31 @@ async function syncMeetings() {
}
console.log(`[Fireflies] Found ${meetings.length} transcripts`);
const newMeetings = meetings.filter(m => m.id && !syncedIds.has(m.id));
if (newMeetings.length === 0) {
console.log('[Fireflies] No new transcripts to sync');
saveState(toDateStr, Array.from(syncedIds), new Date().toISOString());
return;
}
run = await serviceLogger.startRun({
service: 'fireflies',
message: 'Syncing Fireflies transcripts',
trigger: 'timer',
});
const meetingTitles = newMeetings.map(m => m.title || m.id);
const limitedTitles = limitEventItems(meetingTitles);
await serviceLogger.log({
type: 'changes_identified',
service: run.service,
runId: run.runId,
level: 'info',
message: `Found ${newMeetings.length} new transcript${newMeetings.length === 1 ? '' : 's'}`,
counts: { transcripts: newMeetings.length },
items: limitedTitles.items,
truncated: limitedTitles.truncated,
});
// Step 2: Fetch and save each transcript
let newCount = 0;
@ -559,9 +595,39 @@ async function syncMeetings() {
// Save state with updated timestamp
saveState(toDateStr, Array.from(syncedIds), new Date().toISOString());
await serviceLogger.log({
type: 'run_complete',
service: run.service,
runId: run.runId,
level: 'info',
message: `Fireflies sync complete: ${newCount} transcript${newCount === 1 ? '' : 's'}`,
durationMs: Date.now() - run.startedAt,
outcome: newCount > 0 ? 'ok' : 'error',
summary: { transcripts: newCount },
});
} catch (error) {
console.error('[Fireflies] Error during sync:', error);
if (run) {
await serviceLogger.log({
type: 'error',
service: run.service,
runId: run.runId,
level: 'error',
message: 'Fireflies sync error',
error: error instanceof Error ? error.message : String(error),
});
await serviceLogger.log({
type: 'run_complete',
service: run.service,
runId: run.runId,
level: 'error',
message: 'Fireflies sync failed',
durationMs: Date.now() - run.startedAt,
outcome: 'error',
});
}
// Check if it's an auth error
const errorMessage = error instanceof Error ? error.message : String(error);
@ -600,4 +666,3 @@ export async function init() {
await interruptibleSleep(SYNC_INTERVAL_MS);
}
}

View file

@ -5,11 +5,20 @@ import { NodeHtmlMarkdown } from 'node-html-markdown'
import { OAuth2Client } from 'google-auth-library';
import { WorkDir } from '../config/config.js';
import { GoogleClientFactory } from './google-client-factory.js';
import { serviceLogger } from '../services/service_logger.js';
// Configuration
const SYNC_DIR = path.join(WorkDir, 'gmail_sync');
const SYNC_INTERVAL_MS = 5 * 60 * 1000; // Check every 5 minutes
const REQUIRED_SCOPE = 'https://www.googleapis.com/auth/gmail.readonly';
const MAX_EVENT_ITEMS = 50;
function limitEventItems(items: string[], max: number = MAX_EVENT_ITEMS): { items: string[]; truncated: boolean } {
if (items.length <= max) {
return { items, truncated: false };
}
return { items: items.slice(0, max), truncated: true };
}
const nhm = new NodeHtmlMarkdown();
@ -200,6 +209,7 @@ async function fullSync(auth: OAuth2Client, syncDir: string, attachmentsDir: str
const profile = await gmail.users.getProfile({ userId: 'me' });
const currentHistoryId = profile.data.historyId!;
const threadIds: string[] = [];
let pageToken: string | undefined;
do {
const res = await gmail.users.threads.list({
@ -211,13 +221,52 @@ async function fullSync(auth: OAuth2Client, syncDir: string, attachmentsDir: str
const threads = res.data.threads;
if (threads) {
for (const thread of threads) {
await processThread(auth, thread.id!, syncDir, attachmentsDir);
if (thread.id) {
threadIds.push(thread.id);
}
}
}
pageToken = res.data.nextPageToken ?? undefined;
} while (pageToken);
if (threadIds.length === 0) {
saveState(currentHistoryId, stateFile);
console.log("Full sync complete. No threads found.");
return;
}
const run = await serviceLogger.startRun({
service: 'gmail',
message: 'Syncing Gmail',
trigger: 'timer',
});
const limitedThreads = limitEventItems(threadIds);
await serviceLogger.log({
type: 'changes_identified',
service: run.service,
runId: run.runId,
level: 'info',
message: `Found ${threadIds.length} thread${threadIds.length === 1 ? '' : 's'} to sync`,
counts: { threads: threadIds.length },
items: limitedThreads.items,
truncated: limitedThreads.truncated,
});
for (const threadId of threadIds) {
await processThread(auth, threadId, syncDir, attachmentsDir);
}
saveState(currentHistoryId, stateFile);
await serviceLogger.log({
type: 'run_complete',
service: run.service,
runId: run.runId,
level: 'info',
message: `Gmail sync complete: ${threadIds.length} thread${threadIds.length === 1 ? '' : 's'}`,
durationMs: Date.now() - run.startedAt,
outcome: 'ok',
summary: { threads: threadIds.length },
});
console.log("Full sync complete.");
}
@ -253,12 +302,46 @@ async function partialSync(auth: OAuth2Client, startHistoryId: string, syncDir:
}
}
for (const tid of threadIds) {
if (threadIds.size === 0) {
const profile = await gmail.users.getProfile({ userId: 'me' });
saveState(profile.data.historyId!, stateFile);
return;
}
const run = await serviceLogger.startRun({
service: 'gmail',
message: 'Syncing Gmail',
trigger: 'timer',
});
const threadIdList = Array.from(threadIds);
const limitedThreads = limitEventItems(threadIdList);
await serviceLogger.log({
type: 'changes_identified',
service: run.service,
runId: run.runId,
level: 'info',
message: `Found ${threadIdList.length} new thread${threadIdList.length === 1 ? '' : 's'}`,
counts: { threads: threadIdList.length },
items: limitedThreads.items,
truncated: limitedThreads.truncated,
});
for (const tid of threadIdList) {
await processThread(auth, tid, syncDir, attachmentsDir);
}
const profile = await gmail.users.getProfile({ userId: 'me' });
saveState(profile.data.historyId!, stateFile);
await serviceLogger.log({
type: 'run_complete',
service: run.service,
runId: run.runId,
level: 'info',
message: `Gmail sync complete: ${threadIdList.length} thread${threadIdList.length === 1 ? '' : 's'}`,
durationMs: Date.now() - run.startedAt,
outcome: 'ok',
summary: { threads: threadIdList.length },
});
} catch (error: unknown) {
const e = error as { response?: { status?: number } };

View file

@ -0,0 +1,24 @@
import type { ServiceEventType } from "@x/shared/dist/service-events.js";
type ServiceEventHandler = (event: ServiceEventType) => Promise<void> | void;
export class ServiceBus {
private subscribers: ServiceEventHandler[] = [];
async publish(event: ServiceEventType): Promise<void> {
const pending = this.subscribers.map(async (handler) => handler(event));
await Promise.all(pending);
}
async subscribe(handler: ServiceEventHandler): Promise<() => void> {
this.subscribers.push(handler);
return () => {
const idx = this.subscribers.indexOf(handler);
if (idx >= 0) {
this.subscribers.splice(idx, 1);
}
};
}
}
export const serviceBus = new ServiceBus();

View file

@ -0,0 +1,108 @@
import fs from "fs";
import fsp from "fs/promises";
import path from "path";
import { WorkDir } from "../config/config.js";
import { IdGen } from "../application/lib/id-gen.js";
import type { ServiceEventType } from "@x/shared/dist/service-events.js";
import { serviceBus } from "./service_bus.js";
type ServiceNameType = ServiceEventType["service"];
type DistributiveOmit<T, K extends keyof any> = T extends any ? Omit<T, K> : never;
type ServiceEventInput = DistributiveOmit<ServiceEventType, "ts">;
const LOG_DIR = path.join(WorkDir, "logs");
const LOG_FILE = path.join(LOG_DIR, "services.jsonl");
const MAX_LOG_BYTES = 10 * 1024 * 1024;
export type ServiceRunContext = {
runId: string;
service: ServiceNameType;
startedAt: number;
};
function safeTimestampForFile(ts: string): string {
return ts.replace(/[:.]/g, "-");
}
export class ServiceLogger {
private idGen = new IdGen();
private stream: fs.WriteStream | null = null;
private currentSize = 0;
private initialized = false;
private writeQueue: Promise<void> = Promise.resolve();
private async ensureReady(): Promise<void> {
if (this.initialized) return;
await fsp.mkdir(LOG_DIR, { recursive: true });
try {
const stats = await fsp.stat(LOG_FILE);
this.currentSize = stats.size;
} catch {
this.currentSize = 0;
}
this.stream = fs.createWriteStream(LOG_FILE, { flags: "a", encoding: "utf8" });
this.initialized = true;
}
private async rotateIfNeeded(nextBytes: number): Promise<void> {
if (this.currentSize + nextBytes <= MAX_LOG_BYTES) return;
if (this.stream) {
this.stream.close();
this.stream = null;
}
const ts = safeTimestampForFile(new Date().toISOString());
const rotatedPath = path.join(LOG_DIR, `services.${ts}.jsonl`);
try {
await fsp.rename(LOG_FILE, rotatedPath);
} catch {
// Ignore if file doesn't exist or rename fails
}
this.currentSize = 0;
this.stream = fs.createWriteStream(LOG_FILE, { flags: "a", encoding: "utf8" });
}
async log(event: ServiceEventInput): Promise<void> {
const payload = {
...event,
ts: new Date().toISOString(),
} as ServiceEventType;
const line = JSON.stringify(payload) + "\n";
const bytes = Buffer.byteLength(line, "utf8");
this.writeQueue = this.writeQueue.then(async () => {
await this.ensureReady();
await this.rotateIfNeeded(bytes);
this.stream?.write(line);
this.currentSize += bytes;
try {
await serviceBus.publish(payload);
} catch {
// Ignore publish errors to avoid blocking log writes
}
});
return this.writeQueue;
}
async startRun(opts: {
service: ServiceNameType;
message: string;
trigger?: "timer" | "manual" | "startup";
config?: Record<string, unknown>;
}): Promise<ServiceRunContext> {
const runId = `${opts.service}_${await this.idGen.next()}`;
const startedAt = Date.now();
await this.log({
type: "run_start",
service: opts.service,
runId,
level: "info",
message: opts.message,
trigger: opts.trigger,
config: opts.config,
});
return { runId, service: opts.service, startedAt };
}
}
export const serviceLogger = new ServiceLogger();