graph is a separate service; fireflies rate limit handling; better note agent instructions

This commit is contained in:
Arjun 2026-01-10 10:59:18 +05:30 committed by Ramnique Singh
parent 31e6eed96f
commit df93066fe0
6 changed files with 1173 additions and 522 deletions

View file

@ -20,6 +20,16 @@ import {
const NOTES_OUTPUT_DIR = path.join(WorkDir, 'notes');
const NOTE_CREATION_AGENT = 'note_creation';
// Configuration for the graph builder service
const SYNC_INTERVAL_MS = 5 * 60 * 1000; // Check every 5 minutes (reduced frequency)
const SOURCE_FOLDERS = [
'gmail_sync',
'fireflies_transcripts',
'granola_notes' // Corrected from 'granola_meetings'
];
const MAX_CONCURRENT_BATCHES = 1; // Process only 1 batch at a time to avoid overwhelming the agent
const BATCH_DELAY_MS = 5000; // 5 second delay between batches to avoid overwhelming the system
/**
* Read content for specific files
*/
@ -121,7 +131,7 @@ export async function buildGraph(sourceDir: string): Promise<void> {
return;
}
const BATCH_SIZE = 25; // Process 25 files per agent run
const BATCH_SIZE = 10; // Reduced from 25 to 10 files per agent run for faster processing
const totalBatches = Math.ceil(contentFiles.length / BATCH_SIZE);
console.log(`Processing ${contentFiles.length} files in ${totalBatches} batches (${BATCH_SIZE} files per batch)...`);
@ -137,6 +147,12 @@ export async function buildGraph(sourceDir: string): Promise<void> {
await createNotesFromBatch(batch, batchNumber);
console.log(`Batch ${batchNumber}/${totalBatches} complete`);
// Add delay between batches to avoid overwhelming the system
if (i + BATCH_SIZE < contentFiles.length) {
console.log(`Waiting ${BATCH_DELAY_MS/1000} seconds before next batch...`);
await new Promise(resolve => setTimeout(resolve, BATCH_DELAY_MS));
}
// Mark files in this batch as processed
for (const file of batch) {
markFileAsProcessed(file.path, state);
@ -160,11 +176,66 @@ export async function buildGraph(sourceDir: string): Promise<void> {
}
/**
* Main entry point - processes gmail_sync directory by default
* Process all configured source directories
*/
async function processAllSources(): Promise<void> {
console.log('[GraphBuilder] Checking for new content in all sources...');
let anyFilesProcessed = false;
for (const folder of SOURCE_FOLDERS) {
const sourceDir = path.join(WorkDir, folder);
// Skip if folder doesn't exist
if (!fs.existsSync(sourceDir)) {
// Don't log this every time - it's noisy
continue;
}
try {
// Quick check if there are any files to process before doing the full build
const state = loadState();
const filesToProcess = getFilesToProcess(sourceDir, state);
if (filesToProcess.length > 0) {
console.log(`[GraphBuilder] Found ${filesToProcess.length} new/changed files in ${folder}`);
await buildGraph(sourceDir);
anyFilesProcessed = true;
}
} catch (error) {
console.error(`[GraphBuilder] Error processing ${folder}:`, error);
// Continue with other folders even if one fails
}
}
if (!anyFilesProcessed) {
console.log('[GraphBuilder] No new content to process');
} else {
console.log('[GraphBuilder] Completed processing all sources');
}
}
/**
* Main entry point - runs as independent service monitoring all source folders
*/
export async function init() {
const defaultSourceDir = path.join(WorkDir, 'gmail_sync');
await buildGraph(defaultSourceDir);
console.log('[GraphBuilder] Starting Knowledge Graph Builder Service...');
console.log(`[GraphBuilder] Monitoring folders: ${SOURCE_FOLDERS.join(', ')}`);
console.log(`[GraphBuilder] Will check for new content every ${SYNC_INTERVAL_MS / 1000} seconds`);
// Initial run
await processAllSources();
// Set up periodic processing
while (true) {
await new Promise(resolve => setTimeout(resolve, SYNC_INTERVAL_MS));
try {
await processAllSources();
} catch (error) {
console.error('[GraphBuilder] Error in main loop:', error);
}
}
}
/**

View file

@ -2,7 +2,6 @@ import fs from 'fs';
import path from 'path';
import { homedir } from 'os';
import { WorkDir } from '../../config/config.js';
import { buildGraph } from '../build_graph.js';
import container from '../../di/container.js';
import { IGranolaConfigRepo } from './repo.js';
import {
@ -314,12 +313,7 @@ async function syncNotes(): Promise<void> {
// Build knowledge graph if there were changes
if (newCount > 0 || updatedCount > 0) {
console.log('[Granola] Starting knowledge graph build...');
try {
await buildGraph(SYNC_DIR);
} catch (error) {
console.error('[Granola] Error building knowledge graph:', error);
}
// Graph building is now handled by the independent graph builder service
}
}

File diff suppressed because it is too large Load diff

View file

@ -2,13 +2,15 @@ import fs from 'fs';
import path from 'path';
import { WorkDir } from '../config/config.js';
import { FirefliesClientFactory } from './fireflies-client-factory.js';
import { buildGraph } from './build_graph.js';
// Configuration
const SYNC_DIR = path.join(WorkDir, 'fireflies_transcripts');
const SYNC_INTERVAL_MS = 60 * 1000; // Check every minute
const SYNC_INTERVAL_MS = 30 * 60 * 1000; // Check every 30 minutes (reduced from 1 minute)
const STATE_FILE = path.join(SYNC_DIR, 'sync_state.json');
const LOOKBACK_DAYS = 30; // Last 1 month
const API_DELAY_MS = 2000; // 2 second delay between API calls
const RATE_LIMIT_RETRY_DELAY_MS = 60 * 1000; // Wait 1 minute on rate limit
const MAX_RETRIES = 3; // Maximum retries for rate-limited requests
// --- Types for Fireflies API responses ---
@ -77,6 +79,56 @@ interface McpToolResult {
// --- Helper Functions ---
/**
* Sleep for a specified number of milliseconds
*/
function sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
/**
* Execute an API call with rate limit handling and exponential backoff
*/
async function callWithRateLimit<T>(
operation: () => Promise<T>,
operationName: string
): Promise<T | null> {
let retries = 0;
let delay = RATE_LIMIT_RETRY_DELAY_MS;
while (retries < MAX_RETRIES) {
try {
const result = await operation();
return result;
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
// Check if it's a rate limit error (429 Too Many Requests)
if (errorMessage.includes('429') ||
errorMessage.includes('Too Many Requests') ||
errorMessage.includes('too many requests') ||
errorMessage.includes('rate limit')) {
retries++;
console.log(`[Fireflies] Rate limit hit for ${operationName}. Retry ${retries}/${MAX_RETRIES} in ${delay/1000}s...`);
if (retries >= MAX_RETRIES) {
console.error(`[Fireflies] Max retries reached for ${operationName}. Skipping.`);
return null;
}
await sleep(delay);
delay *= 2; // Exponential backoff
} else {
// Not a rate limit error, throw it
throw error;
}
}
}
return null;
}
function cleanFilename(name: string): string {
return name.replace(/[\\/*?:"<>|]/g, "_").substring(0, 100).trim();
}
@ -95,7 +147,11 @@ function formatTimestamp(seconds?: number): string {
return `[${mins.toString().padStart(2, '0')}:${secs.toString().padStart(2, '0')}]`;
}
function loadState(): { lastSyncDate?: string; syncedIds?: string[] } {
function loadState(): {
lastSyncDate?: string;
syncedIds?: string[];
lastCheckTime?: string;
} {
if (fs.existsSync(STATE_FILE)) {
try {
return JSON.parse(fs.readFileSync(STATE_FILE, 'utf-8'));
@ -106,10 +162,11 @@ function loadState(): { lastSyncDate?: string; syncedIds?: string[] } {
return {};
}
function saveState(lastSyncDate: string, syncedIds: string[]) {
function saveState(lastSyncDate: string, syncedIds: string[], lastCheckTime?: string) {
fs.writeFileSync(STATE_FILE, JSON.stringify({
lastSyncDate,
syncedIds,
lastCheckTime: lastCheckTime || new Date().toISOString(),
last_sync: new Date().toISOString()
}, null, 2));
}
@ -296,46 +353,68 @@ function meetingToMarkdown(meeting: FirefliesMeetingData): string {
async function syncMeetings() {
console.log('[Fireflies] Starting sync...');
// Ensure sync directory exists
if (!fs.existsSync(SYNC_DIR)) {
fs.mkdirSync(SYNC_DIR, { recursive: true });
}
const client = await FirefliesClientFactory.getClient();
if (!client) {
console.log('[Fireflies] No valid client available');
return;
}
const state = loadState();
const syncedIds = new Set(state.syncedIds || []);
// Skip if we checked very recently (within 5 minutes)
if (state.lastCheckTime) {
const lastCheck = new Date(state.lastCheckTime);
const now = new Date();
const minutesSinceLastCheck = (now.getTime() - lastCheck.getTime()) / (1000 * 60);
if (minutesSinceLastCheck < 5) {
console.log(`[Fireflies] Skipping - last check was ${minutesSinceLastCheck.toFixed(1)} minutes ago`);
return;
}
}
// Calculate date range (last 30 days)
const toDate = new Date();
const fromDate = new Date();
fromDate.setDate(fromDate.getDate() - LOOKBACK_DAYS);
const fromDateStr = fromDate.toISOString().split('T')[0]; // YYYY-MM-DD
const toDateStr = toDate.toISOString().split('T')[0];
console.log(`[Fireflies] Fetching meetings from ${fromDateStr} to ${toDateStr}...`);
try {
// Step 1: Get list of transcripts
const transcriptsResult = await client.callTool({
name: 'fireflies_get_transcripts',
arguments: {
fromDate: fromDateStr,
toDate: toDateStr,
limit: 50,
format: 'json',
},
}) as McpToolResult;
// Step 1: Get list of transcripts with rate limiting
const transcriptsResult = await callWithRateLimit(
async () => client.callTool({
name: 'fireflies_get_transcripts',
arguments: {
fromDate: fromDateStr,
toDate: toDateStr,
limit: 50,
format: 'json',
},
}) as McpToolResult,
'get_transcripts'
);
// Handle rate-limited failure
if (!transcriptsResult) {
console.log('[Fireflies] Failed to fetch transcripts due to rate limit');
saveState(toDateStr, Array.from(syncedIds), new Date().toISOString());
return;
}
// Parse result - API returns array directly, not { transcripts: [...] }
const parsedData = parseMcpResult<FirefliesMeeting[] | { transcripts?: FirefliesMeeting[] }>(transcriptsResult);
// Handle both array and object responses
let meetings: FirefliesMeeting[];
if (Array.isArray(parsedData)) {
@ -345,10 +424,10 @@ async function syncMeetings() {
} else {
meetings = [];
}
if (meetings.length === 0) {
console.log('[Fireflies] No transcripts found in date range');
saveState(toDateStr, Array.from(syncedIds));
saveState(toDateStr, Array.from(syncedIds), new Date().toISOString());
return;
}
@ -356,44 +435,66 @@ async function syncMeetings() {
// Step 2: Fetch and save each transcript
let newCount = 0;
let processedInBatch = 0;
const MAX_BATCH_SIZE = 5; // Process max 5 new transcripts per sync to avoid rate limits
for (const meeting of meetings) {
const meetingId = meeting.id;
// Skip if already synced
if (syncedIds.has(meetingId)) {
console.log(`[Fireflies] Skipping already synced: ${meeting.title || meetingId}`);
continue;
}
// Limit batch size to avoid too many API calls
if (processedInBatch >= MAX_BATCH_SIZE) {
console.log(`[Fireflies] Reached batch limit (${MAX_BATCH_SIZE}), will continue in next sync`);
break;
}
// Add delay between API calls to respect rate limits
if (processedInBatch > 0) {
console.log(`[Fireflies] Waiting ${API_DELAY_MS/1000}s before next API call...`);
await sleep(API_DELAY_MS);
}
try {
console.log(`[Fireflies] Fetching full transcript: ${meeting.title || meetingId}`);
// Try to get transcript sentences using fireflies_get_transcript
// Try to get transcript sentences using fireflies_get_transcript with rate limiting
let sentences: FirefliesTranscriptSentence[] = [];
try {
const transcriptResult = await client.callTool({
name: 'fireflies_get_transcript',
arguments: {
transcriptId: meetingId,
},
}) as McpToolResult;
const transcriptResult = await callWithRateLimit(
async () => client.callTool({
name: 'fireflies_get_transcript',
arguments: {
transcriptId: meetingId,
},
}) as McpToolResult,
`get_transcript_${meetingId}`
);
// Try JSON first
const transcriptData = parseMcpResult<{ sentences?: FirefliesTranscriptSentence[] } | FirefliesTranscriptSentence[]>(transcriptResult);
if (transcriptData) {
if (Array.isArray(transcriptData)) {
sentences = transcriptData;
} else if (transcriptData.sentences) {
sentences = transcriptData.sentences;
if (transcriptResult) {
// Try JSON first
const transcriptData = parseMcpResult<{ sentences?: FirefliesTranscriptSentence[] } | FirefliesTranscriptSentence[]>(transcriptResult);
if (transcriptData) {
if (Array.isArray(transcriptData)) {
sentences = transcriptData;
} else if (transcriptData.sentences) {
sentences = transcriptData.sentences;
}
} else {
// Try parsing toon format
const rawText = getRawText(transcriptResult);
if (rawText) {
sentences = parseToonTranscript(rawText);
console.log(`[Fireflies] Parsed ${sentences.length} sentences from toon format`);
}
}
} else {
// Try parsing toon format
const rawText = getRawText(transcriptResult);
if (rawText) {
sentences = parseToonTranscript(rawText);
console.log(`[Fireflies] Parsed ${sentences.length} sentences from toon format`);
}
console.log(`[Fireflies] Skipping transcript due to rate limit: ${meetingId}`);
}
} catch (err) {
console.log(`[Fireflies] Could not fetch transcript sentences: ${err}`);
@ -420,29 +521,20 @@ async function syncMeetings() {
fs.writeFileSync(filePath, markdown);
console.log(`[Fireflies] Saved: ${filename}`);
syncedIds.add(meetingId);
newCount++;
processedInBatch++;
} catch (error) {
console.error(`[Fireflies] Error fetching meeting ${meetingId}:`, error);
// Continue with next meeting
}
}
console.log(`[Fireflies] Synced ${newCount} new transcripts`);
// Save state
saveState(toDateStr, Array.from(syncedIds));
// Build knowledge graph after successful sync
if (newCount > 0) {
console.log('\n[Fireflies] Starting knowledge graph build...');
try {
await buildGraph(SYNC_DIR);
} catch (error) {
console.error('[Fireflies] Error building knowledge graph:', error);
}
}
console.log(`[Fireflies] Synced ${newCount} new transcripts in this batch`);
// Save state with updated timestamp
saveState(toDateStr, Array.from(syncedIds), new Date().toISOString());
} catch (error) {
console.error('[Fireflies] Error during sync:', error);

View file

@ -4,7 +4,6 @@ import { google, gmail_v1 as gmail } from 'googleapis';
import { NodeHtmlMarkdown } from 'node-html-markdown'
import { OAuth2Client } from 'google-auth-library';
import { WorkDir } from '../config/config.js';
import { buildGraph } from './build_graph.js';
import { GoogleClientFactory } from './google-client-factory.js';
// Configuration
@ -281,14 +280,6 @@ async function performSync() {
}
console.log("Sync completed.");
// Build knowledge graph after successful sync
console.log("\nStarting knowledge graph build...");
try {
await buildGraph(SYNC_DIR);
} catch (error) {
console.error("Error building knowledge graph:", error);
}
} catch (error) {
console.error("Error during sync:", error);
}