diff --git a/apps/x/packages/core/src/knowledge/README.md b/apps/x/packages/core/src/knowledge/README.md new file mode 100644 index 00000000..b548a554 --- /dev/null +++ b/apps/x/packages/core/src/knowledge/README.md @@ -0,0 +1,149 @@ +# Knowledge Graph System + +This directory contains the knowledge graph building system that processes emails and meeting transcripts to create an Obsidian-style knowledge base. + +## Components + +### `build_graph.ts` +Main orchestrator that: +- Processes source files (emails/transcripts) in batches +- Runs the `note_creation` agent to extract entities +- Only processes new or changed files (tracked via state) + +### `graph_state.ts` +State management module that tracks which files have been processed: +- Uses hybrid mtime + hash approach for change detection +- Stores state in `~/.rowboat/knowledge_graph_state.json` +- Provides modular functions for state operations + +### `sync_gmail.ts` & `sync_fireflies.ts` +Sync scripts that: +- Pull data from Gmail and Fireflies +- Save as markdown files in their respective directories +- Trigger knowledge graph build after successful sync + +## How It Works + +### Change Detection Strategy + +The system uses a **hybrid mtime + hash approach**: + +1. **Quick check**: Compare file modification time (mtime) + - If mtime unchanged → file definitely hasn't changed → skip + +2. **Verification**: If mtime changed, compute content hash + - If hash unchanged → false positive (mtime changed but content didn't) → skip + - If hash changed → file actually changed → process + +This is efficient (only hashes potentially changed files) and reliable (confirms actual content changes). 
+ +### State File Structure + +`~/.rowboat/knowledge_graph_state.json`: +```json +{ + "processedFiles": { + "/path/to/file.md": { + "mtime": "2026-01-07T10:30:00.000Z", + "hash": "a3f5e9d2c8b1...", + "lastProcessed": "2026-01-07T10:35:00.000Z" + } + }, + "lastBuildTime": "2026-01-07T10:35:00.000Z" +} +``` + +### Processing Flow + +1. **Sync runs** (Gmail or Fireflies) + - Fetches new/updated data + - Saves as markdown files + - Calls `buildGraph(SYNC_DIR)` + +2. **buildGraph()** + - Loads state + - Scans source directory for files + - Filters to only new/changed files + - Processes in batches of 25 + - Updates state after each successful batch (saves progress incrementally) + +3. **Agent processes batch** + - Extracts entities (people, orgs, projects, topics) + - Creates/updates notes in `~/.rowboat/notes/` + - Merges information for entities appearing in multiple files + +## Replacing the Change Detection Logic + +The state management is modular. To implement a different change detection strategy: + +### Option 1: Modify `graph_state.ts` + +Replace the functions while keeping the same interface: + +```typescript +// Current: mtime + hash +export function hasFileChanged(filePath: string, state: GraphState): boolean { + // Your custom logic here +} + +export function markFileAsProcessed(filePath: string, state: GraphState): void { + // Your custom tracking here +} +``` + +### Option 2: Create a new state module + +Create `graph_state_v2.ts` with the same exported interface: + +```typescript +export interface FileState { /* ... */ } +export interface GraphState { /* ... */ } +export function loadState(): GraphState { /* ... */ } +export function saveState(state: GraphState): void { /* ... */ } +export function getFilesToProcess(sourceDir: string, state: GraphState): string[] { /* ... */ } +export function markFileAsProcessed(filePath: string, state: GraphState): void { /* ... */ } +``` + +Then update the import in `build_graph.ts`: +```typescript +import { /* ... 
*/ } from './graph_state_v2.js'; +``` + +### Option 3: Pass a strategy object + +Refactor to accept a change detection strategy: + +```typescript +interface ChangeDetectionStrategy { + hasFileChanged(filePath: string, state: GraphState): boolean; + markFileAsProcessed(filePath: string, state: GraphState): void; +} + +export async function buildGraph(sourceDir: string, strategy?: ChangeDetectionStrategy) { + const detector = strategy || defaultStrategy; + // Use detector.hasFileChanged(), etc. +} +``` + +## Resetting State + +To force reprocessing of all files: + +```typescript +import { resetGraphState } from './build_graph.js'; + +resetGraphState(); // Clears the state file +``` + +Or manually delete: `~/.rowboat/knowledge_graph_state.json` + +## Configuration + +### Batch Size +Change `BATCH_SIZE` in `build_graph.ts` (currently 25 files per batch) + +### State File Location +Change `STATE_FILE` in `graph_state.ts` (currently `~/.rowboat/knowledge_graph_state.json`) + +### Hash Algorithm +Change `crypto.createHash('sha256')` in `graph_state.ts` to use a different algorithm (md5, sha1, etc.) 
diff --git a/apps/x/packages/core/src/knowledge/build_graph.ts b/apps/x/packages/core/src/knowledge/build_graph.ts index 82860bdc..a57fd6a7 100644 --- a/apps/x/packages/core/src/knowledge/build_graph.ts +++ b/apps/x/packages/core/src/knowledge/build_graph.ts @@ -3,6 +3,14 @@ import path from 'path'; import { WorkDir } from '../config/config.js'; import { createRun, createMessage } from '../runs/runs.js'; import { bus } from '../runs/bus.js'; +import { + loadState, + saveState, + getFilesToProcess, + markFileAsProcessed, + resetState, + type GraphState, +} from './graph_state.js'; /** * Build obsidian-style knowledge graph by running topic extraction @@ -13,24 +21,17 @@ const NOTES_OUTPUT_DIR = path.join(WorkDir, 'notes'); const NOTE_CREATION_AGENT = 'note_creation'; /** - * Read all markdown files from the specified source directory + * Read content for specific files */ -async function getContentFiles(sourceDir: string): Promise<{ path: string; content: string }[]> { - if (!fs.existsSync(sourceDir)) { - console.log(`Knowledge source directory not found: ${sourceDir}`); - return []; - } - +async function readFileContents(filePaths: string[]): Promise<{ path: string; content: string }[]> { const files: { path: string; content: string }[] = []; - const entries = fs.readdirSync(sourceDir); - for (const entry of entries) { - const fullPath = path.join(sourceDir, entry); - const stat = fs.statSync(fullPath); - - if (stat.isFile() && entry.endsWith('.md')) { - const content = fs.readFileSync(fullPath, 'utf-8'); - files.push({ path: fullPath, content }); + for (const filePath of filePaths) { + try { + const content = fs.readFileSync(filePath, 'utf-8'); + files.push({ path: filePath, content }); + } catch (error) { + console.error(`Error reading file ${filePath}:`, error); } } @@ -92,21 +93,41 @@ async function createNotesFromBatch(files: { path: string; content: string }[], /** * Build the knowledge graph from all content files in the specified source directory + * Only 
processes new or changed files based on state tracking */ export async function buildGraph(sourceDir: string): Promise<void> { - const contentFiles = await getContentFiles(sourceDir); + console.log(`[buildGraph] Starting build for directory: ${sourceDir}`); - if (contentFiles.length === 0) { - console.log(`No files found in ${sourceDir}`); + // Load current state + const state = loadState(); + const previouslyProcessedCount = Object.keys(state.processedFiles).length; + console.log(`[buildGraph] State loaded. Previously processed: ${previouslyProcessedCount} files`); + + // Get files that need processing (new or changed) + const filesToProcess = getFilesToProcess(sourceDir, state); + + if (filesToProcess.length === 0) { + console.log(`[buildGraph] No new or changed files to process in ${path.basename(sourceDir)}`); return; } - const BATCH_SIZE = 10; // Process 10 emails per agent run + console.log(`[buildGraph] Found ${filesToProcess.length} new/changed files to process in ${path.basename(sourceDir)}`); + + // Read file contents + const contentFiles = await readFileContents(filesToProcess); + + if (contentFiles.length === 0) { + console.log(`No files could be read from ${sourceDir}`); + return; + } + + const BATCH_SIZE = 25; // Process 25 files per agent run const totalBatches = Math.ceil(contentFiles.length / BATCH_SIZE); - console.log(`Processing ${contentFiles.length} files from ${path.basename(sourceDir)} in ${totalBatches} batches (${BATCH_SIZE} files per batch)...`); + console.log(`Processing ${contentFiles.length} files in ${totalBatches} batches (${BATCH_SIZE} files per batch)...`); // Process files in batches + const processedFiles: string[] = []; for (let i = 0; i < contentFiles.length; i += BATCH_SIZE) { const batch = contentFiles.slice(i, i + BATCH_SIZE); const batchNumber = Math.floor(i / BATCH_SIZE) + 1; @@ -115,13 +136,27 @@ export async function buildGraph(sourceDir: string): Promise<void> { console.log(`Processing batch ${batchNumber}/${totalBatches} 
(${batch.length} files)...`); await createNotesFromBatch(batch, batchNumber); console.log(`Batch ${batchNumber}/${totalBatches} complete`); + + // Mark files in this batch as processed + for (const file of batch) { + markFileAsProcessed(file.path, state); + processedFiles.push(file.path); + } + + // Save state after each successful batch + // This ensures partial progress is saved even if later batches fail + saveState(state); } catch (error) { console.error(`Error processing batch ${batchNumber}:`, error); - // Continue with next batch + // Continue with next batch (without saving state for failed batch) } } - console.log('Knowledge graph build complete'); + // Update state with last build time and save + state.lastBuildTime = new Date().toISOString(); + saveState(state); + + console.log(`Knowledge graph build complete. Processed ${processedFiles.length} files.`); } /** @@ -131,3 +166,13 @@ export async function init() { const defaultSourceDir = path.join(WorkDir, 'gmail_sync'); await buildGraph(defaultSourceDir); } + +/** + * Reset the knowledge graph state - forces reprocessing of all files on next run + * Useful for debugging or when you want to rebuild everything from scratch + */ +export function resetGraphState(): void { + console.log('Resetting knowledge graph state...'); + resetState(); + console.log('State reset complete. 
All files will be reprocessed on next build.'); +} diff --git a/apps/x/packages/core/src/knowledge/graph_state.ts b/apps/x/packages/core/src/knowledge/graph_state.ts new file mode 100644 index 00000000..ad5ccfe3 --- /dev/null +++ b/apps/x/packages/core/src/knowledge/graph_state.ts @@ -0,0 +1,140 @@ +import fs from 'fs'; +import path from 'path'; +import crypto from 'crypto'; +import { WorkDir } from '../config/config.js'; + +/** + * State tracking for knowledge graph processing + * Uses mtime + hash hybrid approach to detect file changes + */ + +const STATE_FILE = path.join(WorkDir, 'knowledge_graph_state.json'); + +export interface FileState { + mtime: string; // ISO timestamp of last modification + hash: string; // Content hash + lastProcessed: string; // ISO timestamp of when it was processed +} + +export interface GraphState { + processedFiles: Record<string, FileState>; // filepath -> FileState + lastBuildTime: string; // ISO timestamp of last successful build +} + +/** + * Load the current state from disk + */ +export function loadState(): GraphState { + if (fs.existsSync(STATE_FILE)) { + try { + return JSON.parse(fs.readFileSync(STATE_FILE, 'utf-8')); + } catch (error) { + console.error('Error loading knowledge graph state:', error); + } + } + + return { + processedFiles: {}, + lastBuildTime: new Date(0).toISOString(), // epoch + }; +} + +/** + * Save the current state to disk + */ +export function saveState(state: GraphState): void { + try { + fs.writeFileSync(STATE_FILE, JSON.stringify(state, null, 2)); + } catch (error) { + console.error('Error saving knowledge graph state:', error); + throw error; + } +} + +/** + * Compute hash of file content + */ +export function computeFileHash(filePath: string): string { + const content = fs.readFileSync(filePath, 'utf-8'); + return crypto.createHash('sha256').update(content).digest('hex'); +} + +/** + * Check if a file has changed since it was last processed + * Uses mtime for quick check, then hash for verification + */ +export 
function hasFileChanged(filePath: string, state: GraphState): boolean { + const fileState = state.processedFiles[filePath]; + + // New file - never processed + if (!fileState) { + return true; + } + + // Check mtime first (fast) + const stats = fs.statSync(filePath); + const currentMtime = stats.mtime.toISOString(); + + // If mtime is the same, file definitely hasn't changed + if (currentMtime === fileState.mtime) { + return false; + } + + // mtime changed - verify with hash to confirm actual content change + const currentHash = computeFileHash(filePath); + return currentHash !== fileState.hash; +} + +/** + * Update state after processing a file + */ +export function markFileAsProcessed(filePath: string, state: GraphState): void { + const stats = fs.statSync(filePath); + const hash = computeFileHash(filePath); + + state.processedFiles[filePath] = { + mtime: stats.mtime.toISOString(), + hash: hash, + lastProcessed: new Date().toISOString(), + }; +} + +/** + * Get list of files that need processing from a source directory + * Returns only new or changed files + */ +export function getFilesToProcess( + sourceDir: string, + state: GraphState +): string[] { + if (!fs.existsSync(sourceDir)) { + return []; + } + + const filesToProcess: string[] = []; + const entries = fs.readdirSync(sourceDir); + + for (const entry of entries) { + const fullPath = path.join(sourceDir, entry); + const stat = fs.statSync(fullPath); + + if (stat.isFile() && entry.endsWith('.md')) { + if (hasFileChanged(fullPath, state)) { + filesToProcess.push(fullPath); + } + } + } + + return filesToProcess; +} + +/** + * Reset state - useful for reprocessing everything + */ +export function resetState(): void { + const emptyState: GraphState = { + processedFiles: {}, + lastBuildTime: new Date().toISOString(), + }; + saveState(emptyState); +}