diff --git a/apps/x/packages/core/src/knowledge/label_emails.ts b/apps/x/packages/core/src/knowledge/label_emails.ts index 567e73b6..68bca5a1 100644 --- a/apps/x/packages/core/src/knowledge/label_emails.ts +++ b/apps/x/packages/core/src/knowledge/label_emails.ts @@ -14,6 +14,7 @@ import { const SYNC_INTERVAL_MS = 15 * 1000; // 15 seconds const BATCH_SIZE = 15; +const DEFAULT_CONCURRENCY = 3; const LABELING_AGENT = 'labeling_agent'; const GMAIL_SYNC_DIR = path.join(WorkDir, 'gmail_sync'); const MAX_CONTENT_LENGTH = 8000; @@ -129,7 +130,7 @@ async function labelEmailBatch( /** * Process all unlabeled emails in batches */ -export async function processUnlabeledEmails(): Promise { +export async function processUnlabeledEmails(concurrency: number = DEFAULT_CONCURRENCY): Promise { console.log('[EmailLabeling] Checking for unlabeled emails...'); const state = loadLabelingState(); @@ -140,7 +141,7 @@ export async function processUnlabeledEmails(): Promise { return; } - console.log(`[EmailLabeling] Found ${unlabeled.length} unlabeled emails`); + console.log(`[EmailLabeling] Found ${unlabeled.length} unlabeled emails (concurrency: ${concurrency})`); const run = await serviceLogger.startRun({ service: 'email_labeling', @@ -161,69 +162,81 @@ export async function processUnlabeledEmails(): Promise { truncated: limitedFiles.truncated, }); - const totalBatches = Math.ceil(unlabeled.length / BATCH_SIZE); - let totalEdited = 0; - let hadError = false; - + // Build all batches upfront + const batches: { batchNumber: number; files: { path: string; content: string }[] }[] = []; for (let i = 0; i < unlabeled.length; i += BATCH_SIZE) { const batchPaths = unlabeled.slice(i, i + BATCH_SIZE); const batchNumber = Math.floor(i / BATCH_SIZE) + 1; - - try { - // Read file contents for the batch - const files: { path: string; content: string }[] = []; - for (const filePath of batchPaths) { - try { - const content = fs.readFileSync(filePath, 'utf-8'); - files.push({ path: filePath, content }); - } catch (error) { - console.error(`[EmailLabeling] Error reading ${filePath}:`, error); - } + const files: { path: string; content: string }[] = []; + for (const filePath of batchPaths) { + try { + const content = fs.readFileSync(filePath, 'utf-8'); + files.push({ path: filePath, content }); + } catch (error) { + console.error(`[EmailLabeling] Error reading ${filePath}:`, error); } - - if (files.length === 0) { - continue; - } - - console.log(`[EmailLabeling] Processing batch ${batchNumber}/${totalBatches} (${files.length} files)`); - await serviceLogger.log({ - type: 'progress', - service: run.service, - runId: run.runId, - level: 'info', - message: `Processing batch ${batchNumber}/${totalBatches} (${files.length} files)`, - step: 'batch', - current: batchNumber, - total: totalBatches, - details: { filesInBatch: files.length }, - }); - - const result = await labelEmailBatch(files); - totalEdited += result.filesEdited.size; - - // Only mark files that were actually edited by the agent - for (const file of files) { - const relativePath = path.relative(WorkDir, file.path); - if (result.filesEdited.has(relativePath)) { - markFileAsLabeled(file.path, state); - } - } - - saveLabelingState(state); - console.log(`[EmailLabeling] Batch ${batchNumber}/${totalBatches} complete, ${result.filesEdited.size} files edited`); - } catch (error) { - hadError = true; - console.error(`[EmailLabeling] Error processing batch ${batchNumber}:`, error); - await serviceLogger.log({ - type: 'error', - service: run.service, - runId: run.runId, - level: 'error', - message: `Error processing batch ${batchNumber}`, - error: error instanceof Error ? error.message : String(error), - context: { batchNumber }, - }); } + if (files.length > 0) { + batches.push({ batchNumber, files }); + } + } + + const totalBatches = batches.length; + let totalEdited = 0; + let hadError = false; + + // Process batches with concurrency limit + for (let i = 0; i < batches.length; i += concurrency) { + const chunk = batches.slice(i, i + concurrency); + + const promises = chunk.map(async ({ batchNumber, files }) => { + try { + console.log(`[EmailLabeling] Processing batch ${batchNumber}/${totalBatches} (${files.length} files)`); + await serviceLogger.log({ + type: 'progress', + service: run.service, + runId: run.runId, + level: 'info', + message: `Processing batch ${batchNumber}/${totalBatches} (${files.length} files)`, + step: 'batch', + current: batchNumber, + total: totalBatches, + details: { filesInBatch: files.length }, + }); + + const result = await labelEmailBatch(files); + + // Only mark files that were actually edited by the agent + for (const file of files) { + const relativePath = path.relative(WorkDir, file.path); + if (result.filesEdited.has(relativePath)) { + markFileAsLabeled(file.path, state); + } + } + + console.log(`[EmailLabeling] Batch ${batchNumber}/${totalBatches} complete, ${result.filesEdited.size} files edited`); + return result.filesEdited.size; + } catch (error) { + hadError = true; + console.error(`[EmailLabeling] Error processing batch ${batchNumber}:`, error); + await serviceLogger.log({ + type: 'error', + service: run.service, + runId: run.runId, + level: 'error', + message: `Error processing batch ${batchNumber}`, + error: error instanceof Error ? error.message : String(error), + context: { batchNumber }, + }); + return 0; + } + }); + + const results = await Promise.all(promises); + totalEdited += results.reduce((sum, n) => sum + n, 0); + + // Save state after each concurrent chunk completes + saveLabelingState(state); } state.lastRunTime = new Date().toISOString(); diff --git a/apps/x/packages/core/src/knowledge/run_pipeline.ts b/apps/x/packages/core/src/knowledge/run_pipeline.ts index e0072bfd..3bde77c2 100644 --- a/apps/x/packages/core/src/knowledge/run_pipeline.ts +++ b/apps/x/packages/core/src/knowledge/run_pipeline.ts @@ -27,28 +27,36 @@ import path from 'path'; const VALID_STEPS = ['label', 'graph', 'tag'] as const; type Step = typeof VALID_STEPS[number]; -function parseArgs(): { workdir: string; steps: Step[] } { +function parseArgs(): { workdir: string; steps: Step[]; concurrency: number } { const args = process.argv.slice(2); let workdir: string | undefined; let stepsRaw: string | undefined; + let concurrency = 3; for (let i = 0; i < args.length; i++) { if (args[i] === '--workdir' && args[i + 1]) { workdir = args[++i]; } else if (args[i] === '--steps' && args[i + 1]) { stepsRaw = args[++i]; + } else if (args[i] === '--concurrency' && args[i + 1]) { + concurrency = parseInt(args[++i], 10); + if (isNaN(concurrency) || concurrency < 1) { + console.error('Error: --concurrency must be a positive integer'); + process.exit(1); + } } else if (args[i] === '--help' || args[i] === '-h') { console.log(` -Usage: run_pipeline --workdir [--steps label,graph,tag] +Usage: run_pipeline --workdir [--steps label,graph,tag] [--concurrency N] Options: - --workdir Working directory containing gmail_sync/ folder (required) - --steps Comma-separated steps to run: label, graph, tag (default: all) - --help, -h Show this help message + --workdir Working directory containing gmail_sync/ folder (required) + --steps Comma-separated steps to run: label, graph, tag (default: all) + --concurrency Number of parallel batches for labeling (default: 3) + --help, -h Show this help message Examples: run_pipeline --workdir ./my-emails - run_pipeline --workdir ./my-emails --steps label + run_pipeline --workdir ./my-emails --steps label --concurrency 5 run_pipeline --workdir ./my-emails --steps label,graph run_pipeline --workdir ./my-emails --steps graph,tag `); @@ -83,10 +91,10 @@ Examples: steps = [...VALID_STEPS]; } - return { workdir, steps }; + return { workdir, steps, concurrency }; } -const { workdir, steps } = parseArgs(); +const { workdir, steps, concurrency } = parseArgs(); // Set env BEFORE importing core modules (WorkDir is read at module load time) process.env.ROWBOAT_WORKDIR = workdir; @@ -96,6 +104,7 @@ process.env.ROWBOAT_WORKDIR = workdir; async function main() { console.log(`[Pipeline] Working directory: ${workdir}`); console.log(`[Pipeline] Steps to run: ${steps.join(', ')}`); + console.log(`[Pipeline] Concurrency: ${concurrency}`); console.log(); // Verify gmail_sync exists if label or graph step is requested @@ -109,7 +118,7 @@ async function main() { if (steps.includes('label')) { console.log('[Pipeline] === Step 1: Email Labeling ==='); const { processUnlabeledEmails } = await import('./label_emails.js'); - await processUnlabeledEmails(); + await processUnlabeledEmails(concurrency); console.log(); }