process only added / changed files when building graph

This commit is contained in:
Arjun 2026-01-07 21:27:56 +05:30 committed by Ramnique Singh
parent 85024dd69b
commit 6a0e5981dd
3 changed files with 356 additions and 22 deletions

View file

@ -0,0 +1,149 @@
# Knowledge Graph System
This directory contains the knowledge graph building system that processes emails and meeting transcripts to create an Obsidian-style knowledge base.
## Components
### `build_graph.ts`
Main orchestrator that:
- Processes source files (emails/transcripts) in batches
- Runs the `note_creation` agent to extract entities
- Only processes new or changed files (tracked via state)
### `graph_state.ts`
State management module that tracks which files have been processed:
- Uses hybrid mtime + hash approach for change detection
- Stores state in `~/.rowboat/knowledge_graph_state.json`
- Provides modular functions for state operations
### `sync_gmail.ts` & `sync_fireflies.ts`
Sync scripts that:
- Pull data from Gmail and Fireflies
- Save as markdown files in their respective directories
- Trigger knowledge graph build after successful sync
## How It Works
### Change Detection Strategy
The system uses a **hybrid mtime + hash approach**:
1. **Quick check**: Compare file modification time (mtime)
- If mtime unchanged → file assumed unchanged → skip (reliable for normal writes, though a tool that deliberately preserves mtime across an edit would evade this check)
2. **Verification**: If mtime changed, compute content hash
- If hash unchanged → false positive (mtime changed but content didn't) → skip
- If hash changed → file actually changed → process
This is efficient (only hashes potentially changed files) and reliable (confirms actual content changes).
### State File Structure
`~/.rowboat/knowledge_graph_state.json`:
```json
{
"processedFiles": {
"/path/to/file.md": {
"mtime": "2026-01-07T10:30:00.000Z",
"hash": "a3f5e9d2c8b1...",
"lastProcessed": "2026-01-07T10:35:00.000Z"
}
},
"lastBuildTime": "2026-01-07T10:35:00.000Z"
}
```
### Processing Flow
1. **Sync runs** (Gmail or Fireflies)
- Fetches new/updated data
- Saves as markdown files
- Calls `buildGraph(SYNC_DIR)`
2. **buildGraph()**
- Loads state
- Scans source directory for files
- Filters to only new/changed files
- Processes in batches of 25
- Updates state after each successful batch (saves progress incrementally)
3. **Agent processes batch**
- Extracts entities (people, orgs, projects, topics)
- Creates/updates notes in `~/.rowboat/notes/`
- Merges information for entities appearing in multiple files
## Replacing the Change Detection Logic
The state management is modular. To implement a different change detection strategy:
### Option 1: Modify `graph_state.ts`
Replace the functions while keeping the same interface:
```typescript
// Current: mtime + hash
export function hasFileChanged(filePath: string, state: GraphState): boolean {
// Your custom logic here
}
export function markFileAsProcessed(filePath: string, state: GraphState): void {
// Your custom tracking here
}
```
### Option 2: Create a new state module
Create `graph_state_v2.ts` with the same exported interface:
```typescript
export interface FileState { /* ... */ }
export interface GraphState { /* ... */ }
export function loadState(): GraphState { /* ... */ }
export function saveState(state: GraphState): void { /* ... */ }
export function getFilesToProcess(sourceDir: string, state: GraphState): string[] { /* ... */ }
export function markFileAsProcessed(filePath: string, state: GraphState): void { /* ... */ }
```
Then update the import in `build_graph.ts`:
```typescript
import { /* ... */ } from './graph_state_v2.js';
```
### Option 3: Pass a strategy object
Refactor to accept a change detection strategy:
```typescript
interface ChangeDetectionStrategy {
hasFileChanged(filePath: string, state: GraphState): boolean;
markFileAsProcessed(filePath: string, state: GraphState): void;
}
export async function buildGraph(sourceDir: string, strategy?: ChangeDetectionStrategy) {
const detector = strategy || defaultStrategy;
// Use detector.hasFileChanged(), etc.
}
```
## Resetting State
To force reprocessing of all files:
```typescript
import { resetGraphState } from './build_graph.js';
resetGraphState(); // Clears the state file
```
Or manually delete: `~/.rowboat/knowledge_graph_state.json`
## Configuration
### Batch Size
Change `BATCH_SIZE` in `build_graph.ts` (currently 25 files per batch)
### State File Location
Change `STATE_FILE` in `graph_state.ts` (currently `~/.rowboat/knowledge_graph_state.json`)
### Hash Algorithm
Change `crypto.createHash('sha256')` in `graph_state.ts` to use a different algorithm (e.g. md5 or sha1 — faster but cryptographically weaker; acceptable here since the hash is used only for change detection, not for security)

View file

@ -3,6 +3,14 @@ import path from 'path';
import { WorkDir } from '../config/config.js';
import { createRun, createMessage } from '../runs/runs.js';
import { bus } from '../runs/bus.js';
import {
loadState,
saveState,
getFilesToProcess,
markFileAsProcessed,
resetState,
type GraphState,
} from './graph_state.js';
/**
* Build obsidian-style knowledge graph by running topic extraction
@ -13,24 +21,17 @@ const NOTES_OUTPUT_DIR = path.join(WorkDir, 'notes');
const NOTE_CREATION_AGENT = 'note_creation';
/**
* Read all markdown files from the specified source directory
* Read content for specific files
*/
async function getContentFiles(sourceDir: string): Promise<{ path: string; content: string }[]> {
if (!fs.existsSync(sourceDir)) {
console.log(`Knowledge source directory not found: ${sourceDir}`);
return [];
}
async function readFileContents(filePaths: string[]): Promise<{ path: string; content: string }[]> {
const files: { path: string; content: string }[] = [];
const entries = fs.readdirSync(sourceDir);
for (const entry of entries) {
const fullPath = path.join(sourceDir, entry);
const stat = fs.statSync(fullPath);
if (stat.isFile() && entry.endsWith('.md')) {
const content = fs.readFileSync(fullPath, 'utf-8');
files.push({ path: fullPath, content });
for (const filePath of filePaths) {
try {
const content = fs.readFileSync(filePath, 'utf-8');
files.push({ path: filePath, content });
} catch (error) {
console.error(`Error reading file ${filePath}:`, error);
}
}
@ -92,21 +93,41 @@ async function createNotesFromBatch(files: { path: string; content: string }[],
/**
* Build the knowledge graph from all content files in the specified source directory
* Only processes new or changed files based on state tracking
*/
export async function buildGraph(sourceDir: string): Promise<void> {
const contentFiles = await getContentFiles(sourceDir);
console.log(`[buildGraph] Starting build for directory: ${sourceDir}`);
if (contentFiles.length === 0) {
console.log(`No files found in ${sourceDir}`);
// Load current state
const state = loadState();
const previouslyProcessedCount = Object.keys(state.processedFiles).length;
console.log(`[buildGraph] State loaded. Previously processed: ${previouslyProcessedCount} files`);
// Get files that need processing (new or changed)
const filesToProcess = getFilesToProcess(sourceDir, state);
if (filesToProcess.length === 0) {
console.log(`[buildGraph] No new or changed files to process in ${path.basename(sourceDir)}`);
return;
}
const BATCH_SIZE = 10; // Process 10 emails per agent run
console.log(`[buildGraph] Found ${filesToProcess.length} new/changed files to process in ${path.basename(sourceDir)}`);
// Read file contents
const contentFiles = await readFileContents(filesToProcess);
if (contentFiles.length === 0) {
console.log(`No files could be read from ${sourceDir}`);
return;
}
const BATCH_SIZE = 25; // Process 25 files per agent run
const totalBatches = Math.ceil(contentFiles.length / BATCH_SIZE);
console.log(`Processing ${contentFiles.length} files from ${path.basename(sourceDir)} in ${totalBatches} batches (${BATCH_SIZE} files per batch)...`);
console.log(`Processing ${contentFiles.length} files in ${totalBatches} batches (${BATCH_SIZE} files per batch)...`);
// Process files in batches
const processedFiles: string[] = [];
for (let i = 0; i < contentFiles.length; i += BATCH_SIZE) {
const batch = contentFiles.slice(i, i + BATCH_SIZE);
const batchNumber = Math.floor(i / BATCH_SIZE) + 1;
@ -115,13 +136,27 @@ export async function buildGraph(sourceDir: string): Promise<void> {
console.log(`Processing batch ${batchNumber}/${totalBatches} (${batch.length} files)...`);
await createNotesFromBatch(batch, batchNumber);
console.log(`Batch ${batchNumber}/${totalBatches} complete`);
// Mark files in this batch as processed
for (const file of batch) {
markFileAsProcessed(file.path, state);
processedFiles.push(file.path);
}
// Save state after each successful batch
// This ensures partial progress is saved even if later batches fail
saveState(state);
} catch (error) {
console.error(`Error processing batch ${batchNumber}:`, error);
// Continue with next batch
// Continue with next batch (without saving state for failed batch)
}
}
console.log('Knowledge graph build complete');
// Update state with last build time and save
state.lastBuildTime = new Date().toISOString();
saveState(state);
console.log(`Knowledge graph build complete. Processed ${processedFiles.length} files.`);
}
/**
@ -131,3 +166,13 @@ export async function init() {
const defaultSourceDir = path.join(WorkDir, 'gmail_sync');
await buildGraph(defaultSourceDir);
}
/**
 * Reset the knowledge graph state - forces reprocessing of all files on next run
 * Useful for debugging or when you want to rebuild everything from scratch
 */
export function resetGraphState(): void {
  console.log('Resetting knowledge graph state...');
  // Delegates to graph_state.resetState(), which overwrites the state file
  // with an empty processedFiles map (see graph_state.ts).
  resetState();
  console.log('State reset complete. All files will be reprocessed on next build.');
}

View file

@ -0,0 +1,140 @@
import fs from 'fs';
import path from 'path';
import crypto from 'crypto';
import { WorkDir } from '../config/config.js';
/**
 * State tracking for knowledge graph processing
 * Uses mtime + hash hybrid approach to detect file changes
 */

// Persistent location of the processing state; WorkDir resolves to the
// app's data directory (~/.rowboat per the project README).
const STATE_FILE = path.join(WorkDir, 'knowledge_graph_state.json');
/** Per-file bookkeeping used to decide whether a file must be reprocessed. */
export interface FileState {
  mtime: string; // ISO timestamp of last modification (fast change check)
  hash: string; // Content hash (sha256 hex; authoritative change check)
  lastProcessed: string; // ISO timestamp of when it was processed
}

/** Top-level persisted state: one FileState per absolute file path. */
export interface GraphState {
  processedFiles: Record<string, FileState>; // filepath -> FileState
  lastBuildTime: string; // ISO timestamp of last successful build
}
/**
 * Load the current state from disk.
 *
 * Falls back to a fresh, empty state when the file is missing, unreadable,
 * or parses to something other than the expected shape — a corrupted state
 * file must not crash later code that reads `state.processedFiles`.
 *
 * @returns The persisted GraphState, or a default empty state.
 */
export function loadState(): GraphState {
  if (fs.existsSync(STATE_FILE)) {
    try {
      const parsed: unknown = JSON.parse(fs.readFileSync(STATE_FILE, 'utf-8'));
      // Validate the minimal shape before trusting the parsed value.
      if (
        typeof parsed === 'object' &&
        parsed !== null &&
        typeof (parsed as GraphState).processedFiles === 'object' &&
        (parsed as GraphState).processedFiles !== null
      ) {
        const state = parsed as GraphState;
        if (typeof state.lastBuildTime !== 'string') {
          // Repair a missing/invalid timestamp with the epoch sentinel.
          state.lastBuildTime = new Date(0).toISOString();
        }
        return state;
      }
      console.error('Error loading knowledge graph state:', new Error('unexpected state file shape'));
    } catch (error) {
      console.error('Error loading knowledge graph state:', error);
    }
  }
  // Default: nothing processed yet; epoch marks "never built".
  return {
    processedFiles: {},
    lastBuildTime: new Date(0).toISOString(), // epoch
  };
}
/**
 * Persist the given state to the state file as pretty-printed JSON.
 * Logs the failure and rethrows so callers can react to a failed save.
 */
export function saveState(state: GraphState): void {
  try {
    const serialized = JSON.stringify(state, null, 2);
    fs.writeFileSync(STATE_FILE, serialized);
  } catch (error) {
    console.error('Error saving knowledge graph state:', error);
    throw error;
  }
}
/**
 * Compute the SHA-256 hex digest of a file's content.
 *
 * Hashes the raw bytes (Buffer) rather than a decoded string, so the
 * digest is stable even for files that are not valid UTF-8 — the previous
 * decode/re-encode round trip was lossy for malformed sequences.
 *
 * @param filePath Path of the file to hash.
 * @returns Lowercase hex SHA-256 digest of the file's bytes.
 */
export function computeFileHash(filePath: string): string {
  // readFileSync without an encoding returns the raw Buffer.
  return crypto.createHash('sha256').update(fs.readFileSync(filePath)).digest('hex');
}
/**
 * Decide whether a file needs (re)processing.
 *
 * Fast path: an unchanged mtime is treated as "no change". Slow path: when
 * the mtime differs, the content hash settles whether the file really
 * changed (sync tools often rewrite identical content, bumping mtime only).
 *
 * NOTE(review): when mtime changes but the hash does not, the stored mtime
 * stays stale, so this file is re-hashed on every subsequent run until it
 * is processed again — harmless, but a known inefficiency.
 */
export function hasFileChanged(filePath: string, state: GraphState): boolean {
  const previous = state.processedFiles[filePath];
  if (previous === undefined) {
    // Never seen before — must be processed.
    return true;
  }
  const mtimeNow = fs.statSync(filePath).mtime.toISOString();
  if (mtimeNow === previous.mtime) {
    // mtime untouched: assume content untouched (fast path).
    return false;
  }
  // mtime moved — confirm an actual content change before doing real work.
  return computeFileHash(filePath) !== previous.hash;
}
/**
 * Record a file as processed: capture its current mtime, content hash,
 * and the processing timestamp in the state's processedFiles map.
 */
export function markFileAsProcessed(filePath: string, state: GraphState): void {
  const { mtime } = fs.statSync(filePath);
  state.processedFiles[filePath] = {
    mtime: mtime.toISOString(),
    hash: computeFileHash(filePath),
    lastProcessed: new Date().toISOString(),
  };
}
/**
 * Get list of files that need processing from a source directory.
 * Returns only new or changed markdown files (non-recursive scan).
 *
 * @param sourceDir Directory containing .md source files.
 * @param state     Current processing state (not mutated here).
 * @returns Absolute paths needing processing; empty when the directory
 *          is missing.
 */
export function getFilesToProcess(
  sourceDir: string,
  state: GraphState
): string[] {
  if (!fs.existsSync(sourceDir)) {
    return [];
  }
  const filesToProcess: string[] = [];
  for (const entry of fs.readdirSync(sourceDir)) {
    // Cheap name filter first to avoid stat-ing unrelated files.
    if (!entry.endsWith('.md')) {
      continue;
    }
    const fullPath = path.join(sourceDir, entry);
    try {
      // statSync / hasFileChanged can throw (e.g. ENOENT) if the entry
      // disappears between the directory listing and this check; skip
      // such entries instead of aborting the whole scan.
      if (fs.statSync(fullPath).isFile() && hasFileChanged(fullPath, state)) {
        filesToProcess.push(fullPath);
      }
    } catch (error) {
      console.error(`Error inspecting file ${fullPath}:`, error);
    }
  }
  return filesToProcess;
}
/**
 * Wipe all processing history so the next build reprocesses every file.
 * Overwrites the state file with an empty processedFiles map.
 */
export function resetState(): void {
  saveState({
    processedFiles: {},
    lastBuildTime: new Date().toISOString(),
  });
}