add concurrency to labeling

This commit is contained in:
Arjun 2026-03-25 22:16:40 +05:30
parent ff58be2df0
commit f8aa76076e
2 changed files with 91 additions and 69 deletions

View file

@ -14,6 +14,7 @@ import {
const SYNC_INTERVAL_MS = 15 * 1000; // 15 seconds const SYNC_INTERVAL_MS = 15 * 1000; // 15 seconds
const BATCH_SIZE = 15; const BATCH_SIZE = 15;
const DEFAULT_CONCURRENCY = 3;
const LABELING_AGENT = 'labeling_agent'; const LABELING_AGENT = 'labeling_agent';
const GMAIL_SYNC_DIR = path.join(WorkDir, 'gmail_sync'); const GMAIL_SYNC_DIR = path.join(WorkDir, 'gmail_sync');
const MAX_CONTENT_LENGTH = 8000; const MAX_CONTENT_LENGTH = 8000;
@ -129,7 +130,7 @@ async function labelEmailBatch(
/** /**
* Process all unlabeled emails in batches * Process all unlabeled emails in batches
*/ */
export async function processUnlabeledEmails(): Promise<void> { export async function processUnlabeledEmails(concurrency: number = DEFAULT_CONCURRENCY): Promise<void> {
console.log('[EmailLabeling] Checking for unlabeled emails...'); console.log('[EmailLabeling] Checking for unlabeled emails...');
const state = loadLabelingState(); const state = loadLabelingState();
@ -140,7 +141,7 @@ export async function processUnlabeledEmails(): Promise<void> {
return; return;
} }
console.log(`[EmailLabeling] Found ${unlabeled.length} unlabeled emails`); console.log(`[EmailLabeling] Found ${unlabeled.length} unlabeled emails (concurrency: ${concurrency})`);
const run = await serviceLogger.startRun({ const run = await serviceLogger.startRun({
service: 'email_labeling', service: 'email_labeling',
@ -161,16 +162,11 @@ export async function processUnlabeledEmails(): Promise<void> {
truncated: limitedFiles.truncated, truncated: limitedFiles.truncated,
}); });
const totalBatches = Math.ceil(unlabeled.length / BATCH_SIZE); // Build all batches upfront
let totalEdited = 0; const batches: { batchNumber: number; files: { path: string; content: string }[] }[] = [];
let hadError = false;
for (let i = 0; i < unlabeled.length; i += BATCH_SIZE) { for (let i = 0; i < unlabeled.length; i += BATCH_SIZE) {
const batchPaths = unlabeled.slice(i, i + BATCH_SIZE); const batchPaths = unlabeled.slice(i, i + BATCH_SIZE);
const batchNumber = Math.floor(i / BATCH_SIZE) + 1; const batchNumber = Math.floor(i / BATCH_SIZE) + 1;
try {
// Read file contents for the batch
const files: { path: string; content: string }[] = []; const files: { path: string; content: string }[] = [];
for (const filePath of batchPaths) { for (const filePath of batchPaths) {
try { try {
@ -180,11 +176,21 @@ export async function processUnlabeledEmails(): Promise<void> {
console.error(`[EmailLabeling] Error reading ${filePath}:`, error); console.error(`[EmailLabeling] Error reading ${filePath}:`, error);
} }
} }
if (files.length > 0) {
if (files.length === 0) { batches.push({ batchNumber, files });
continue; }
} }
const totalBatches = batches.length;
let totalEdited = 0;
let hadError = false;
// Process batches with concurrency limit
for (let i = 0; i < batches.length; i += concurrency) {
const chunk = batches.slice(i, i + concurrency);
const promises = chunk.map(async ({ batchNumber, files }) => {
try {
console.log(`[EmailLabeling] Processing batch ${batchNumber}/${totalBatches} (${files.length} files)`); console.log(`[EmailLabeling] Processing batch ${batchNumber}/${totalBatches} (${files.length} files)`);
await serviceLogger.log({ await serviceLogger.log({
type: 'progress', type: 'progress',
@ -199,7 +205,6 @@ export async function processUnlabeledEmails(): Promise<void> {
}); });
const result = await labelEmailBatch(files); const result = await labelEmailBatch(files);
totalEdited += result.filesEdited.size;
// Only mark files that were actually edited by the agent // Only mark files that were actually edited by the agent
for (const file of files) { for (const file of files) {
@ -209,8 +214,8 @@ export async function processUnlabeledEmails(): Promise<void> {
} }
} }
saveLabelingState(state);
console.log(`[EmailLabeling] Batch ${batchNumber}/${totalBatches} complete, ${result.filesEdited.size} files edited`); console.log(`[EmailLabeling] Batch ${batchNumber}/${totalBatches} complete, ${result.filesEdited.size} files edited`);
return result.filesEdited.size;
} catch (error) { } catch (error) {
hadError = true; hadError = true;
console.error(`[EmailLabeling] Error processing batch ${batchNumber}:`, error); console.error(`[EmailLabeling] Error processing batch ${batchNumber}:`, error);
@ -223,7 +228,15 @@ export async function processUnlabeledEmails(): Promise<void> {
error: error instanceof Error ? error.message : String(error), error: error instanceof Error ? error.message : String(error),
context: { batchNumber }, context: { batchNumber },
}); });
return 0;
} }
});
const results = await Promise.all(promises);
totalEdited += results.reduce((sum, n) => sum + n, 0);
// Save state after each concurrent chunk completes
saveLabelingState(state);
} }
state.lastRunTime = new Date().toISOString(); state.lastRunTime = new Date().toISOString();

View file

@ -27,28 +27,36 @@ import path from 'path';
const VALID_STEPS = ['label', 'graph', 'tag'] as const; const VALID_STEPS = ['label', 'graph', 'tag'] as const;
type Step = typeof VALID_STEPS[number]; type Step = typeof VALID_STEPS[number];
function parseArgs(): { workdir: string; steps: Step[] } { function parseArgs(): { workdir: string; steps: Step[]; concurrency: number } {
const args = process.argv.slice(2); const args = process.argv.slice(2);
let workdir: string | undefined; let workdir: string | undefined;
let stepsRaw: string | undefined; let stepsRaw: string | undefined;
let concurrency = 3;
for (let i = 0; i < args.length; i++) { for (let i = 0; i < args.length; i++) {
if (args[i] === '--workdir' && args[i + 1]) { if (args[i] === '--workdir' && args[i + 1]) {
workdir = args[++i]; workdir = args[++i];
} else if (args[i] === '--steps' && args[i + 1]) { } else if (args[i] === '--steps' && args[i + 1]) {
stepsRaw = args[++i]; stepsRaw = args[++i];
} else if (args[i] === '--concurrency' && args[i + 1]) {
concurrency = parseInt(args[++i], 10);
if (isNaN(concurrency) || concurrency < 1) {
console.error('Error: --concurrency must be a positive integer');
process.exit(1);
}
} else if (args[i] === '--help' || args[i] === '-h') { } else if (args[i] === '--help' || args[i] === '-h') {
console.log(` console.log(`
Usage: run_pipeline --workdir <path> [--steps label,graph,tag] Usage: run_pipeline --workdir <path> [--steps label,graph,tag] [--concurrency N]
Options: Options:
--workdir <path> Working directory containing gmail_sync/ folder (required) --workdir <path> Working directory containing gmail_sync/ folder (required)
--steps <list> Comma-separated steps to run: label, graph, tag (default: all) --steps <list> Comma-separated steps to run: label, graph, tag (default: all)
--concurrency <N> Number of parallel batches for labeling (default: 3)
--help, -h Show this help message --help, -h Show this help message
Examples: Examples:
run_pipeline --workdir ./my-emails run_pipeline --workdir ./my-emails
run_pipeline --workdir ./my-emails --steps label run_pipeline --workdir ./my-emails --steps label --concurrency 5
run_pipeline --workdir ./my-emails --steps label,graph run_pipeline --workdir ./my-emails --steps label,graph
run_pipeline --workdir ./my-emails --steps graph,tag run_pipeline --workdir ./my-emails --steps graph,tag
`); `);
@ -83,10 +91,10 @@ Examples:
steps = [...VALID_STEPS]; steps = [...VALID_STEPS];
} }
return { workdir, steps }; return { workdir, steps, concurrency };
} }
const { workdir, steps } = parseArgs(); const { workdir, steps, concurrency } = parseArgs();
// Set env BEFORE importing core modules (WorkDir is read at module load time) // Set env BEFORE importing core modules (WorkDir is read at module load time)
process.env.ROWBOAT_WORKDIR = workdir; process.env.ROWBOAT_WORKDIR = workdir;
@ -96,6 +104,7 @@ process.env.ROWBOAT_WORKDIR = workdir;
async function main() { async function main() {
console.log(`[Pipeline] Working directory: ${workdir}`); console.log(`[Pipeline] Working directory: ${workdir}`);
console.log(`[Pipeline] Steps to run: ${steps.join(', ')}`); console.log(`[Pipeline] Steps to run: ${steps.join(', ')}`);
console.log(`[Pipeline] Concurrency: ${concurrency}`);
console.log(); console.log();
// Verify gmail_sync exists if label or graph step is requested // Verify gmail_sync exists if label or graph step is requested
@ -109,7 +118,7 @@ async function main() {
if (steps.includes('label')) { if (steps.includes('label')) {
console.log('[Pipeline] === Step 1: Email Labeling ==='); console.log('[Pipeline] === Step 1: Email Labeling ===');
const { processUnlabeledEmails } = await import('./label_emails.js'); const { processUnlabeledEmails } = await import('./label_emails.js');
await processUnlabeledEmails(); await processUnlabeledEmails(concurrency);
console.log(); console.log();
} }