mirror of
https://github.com/rowboatlabs/rowboat.git
synced 2026-04-25 00:16:29 +02:00
refactor waitForRunCompletion, extractAgentResponse
This commit is contained in:
parent
490b14ad58
commit
b462643e6d
7 changed files with 46 additions and 112 deletions
40
apps/x/packages/core/src/agents/utils.ts
Normal file
40
apps/x/packages/core/src/agents/utils.ts
Normal file
|
|
@ -0,0 +1,40 @@
|
|||
import { bus } from "../runs/bus.js";
|
||||
import { fetchRun } from "../runs/runs.js";
|
||||
|
||||
/**
|
||||
* Extract the assistant's final text response from a run's log.
|
||||
* @param runId
|
||||
* @returns The assistant's final text response or null if not found.
|
||||
*/
|
||||
export async function extractAgentResponse(runId: string): Promise<string | null> {
|
||||
const run = await fetchRun(runId);
|
||||
for (let i = run.log.length - 1; i >= 0; i--) {
|
||||
const event = run.log[i];
|
||||
if (event.type === 'message' && event.message.role === 'assistant') {
|
||||
const content = event.message.content;
|
||||
if (typeof content === 'string') return content;
|
||||
if (Array.isArray(content)) {
|
||||
const text = content
|
||||
.filter((p) => p.type === 'text')
|
||||
.map((p) => 'text' in p ? p.text : '')
|
||||
.join('');
|
||||
return text || null;
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for a run to complete by listening for run-processing-end event
|
||||
*/
|
||||
export async function waitForRunCompletion(runId: string): Promise<void> {
|
||||
return new Promise(async (resolve) => {
|
||||
const unsubscribe = await bus.subscribe('*', async (event) => {
|
||||
if (event.type === 'run-processing-end' && event.runId === runId) {
|
||||
unsubscribe();
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
|
@ -3,7 +3,7 @@ import path from 'path';
|
|||
import { google } from 'googleapis';
|
||||
import { WorkDir } from '../config/config.js';
|
||||
import { createRun, createMessage } from '../runs/runs.js';
|
||||
import { bus } from '../runs/bus.js';
|
||||
import { waitForRunCompletion } from '../agents/utils.js';
|
||||
import { serviceLogger } from '../services/service_logger.js';
|
||||
import { loadUserConfig, updateUserEmail } from '../config/user_config.js';
|
||||
import { GoogleClientFactory } from './google-client-factory.js';
|
||||
|
|
@ -190,19 +190,6 @@ function extractConversationMessages(runFilePath: string): { role: string; text:
|
|||
return messages;
|
||||
}
|
||||
|
||||
// --- Wait for agent run completion ---
|
||||
|
||||
async function waitForRunCompletion(runId: string): Promise<void> {
|
||||
return new Promise(async (resolve) => {
|
||||
const unsubscribe = await bus.subscribe('*', async (event) => {
|
||||
if (event.type === 'run-processing-end' && event.runId === runId) {
|
||||
unsubscribe();
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// --- User email resolution ---
|
||||
|
||||
async function ensureUserEmail(): Promise<string | null> {
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ import path from 'path';
|
|||
import { WorkDir } from '../config/config.js';
|
||||
import { createRun, createMessage } from '../runs/runs.js';
|
||||
import { bus } from '../runs/bus.js';
|
||||
import { waitForRunCompletion } from '../agents/utils.js';
|
||||
import { serviceLogger, type ServiceRunContext } from '../services/service_logger.js';
|
||||
import {
|
||||
loadState,
|
||||
|
|
@ -185,20 +186,6 @@ async function readFileContents(filePaths: string[]): Promise<{ path: string; co
|
|||
return files;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for a run to complete by listening for run-processing-end event
|
||||
*/
|
||||
async function waitForRunCompletion(runId: string): Promise<void> {
|
||||
return new Promise(async (resolve) => {
|
||||
const unsubscribe = await bus.subscribe('*', async (event) => {
|
||||
if (event.type === 'run-processing-end' && event.runId === runId) {
|
||||
unsubscribe();
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Run note creation agent on a batch of files to extract entities and create/update notes
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -4,11 +4,11 @@ import { CronExpressionParser } from 'cron-parser';
|
|||
import { generateText } from 'ai';
|
||||
import { WorkDir } from '../config/config.js';
|
||||
import { createRun, createMessage, fetchRun } from '../runs/runs.js';
|
||||
import { bus } from '../runs/bus.js';
|
||||
import container from '../di/container.js';
|
||||
import type { IModelConfigRepo } from '../models/repo.js';
|
||||
import { createProvider } from '../models/models.js';
|
||||
import { inlineTask } from '@x/shared';
|
||||
import { extractAgentResponse, waitForRunCompletion } from '../agents/utils.js';
|
||||
|
||||
const SYNC_INTERVAL_MS = 15 * 1000; // 15 seconds
|
||||
const INLINE_TASK_AGENT = 'inline_task_agent';
|
||||
|
|
@ -129,46 +129,6 @@ function scanDirectoryRecursive(dir: string): string[] {
|
|||
return files;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for a run to complete by listening for run-processing-end event
|
||||
*/
|
||||
async function waitForRunCompletion(runId: string): Promise<void> {
|
||||
return new Promise(async (resolve) => {
|
||||
const unsubscribe = await bus.subscribe('*', async (event) => {
|
||||
if (event.type === 'run-processing-end' && event.runId === runId) {
|
||||
unsubscribe();
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract the assistant's final text response from a run's log.
|
||||
*/
|
||||
async function extractAgentResponse(runId: string): Promise<string | null> {
|
||||
const run = await fetchRun(runId);
|
||||
// Walk backwards through the log to find the last assistant message
|
||||
for (let i = run.log.length - 1; i >= 0; i--) {
|
||||
const event = run.log[i];
|
||||
if (event.type === 'message' && event.message.role === 'assistant') {
|
||||
const content = event.message.content;
|
||||
if (typeof content === 'string') {
|
||||
return content;
|
||||
}
|
||||
// Content may be an array of parts — concatenate text parts
|
||||
if (Array.isArray(content)) {
|
||||
const text = content
|
||||
.filter((p) => p.type === 'text')
|
||||
.map((p) => (p as { type: 'text'; text: string }).text)
|
||||
.join('');
|
||||
return text || null;
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
interface InlineTask {
|
||||
instruction: string;
|
||||
schedule: InlineTaskSchedule | null;
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ import path from 'path';
|
|||
import { WorkDir } from '../config/config.js';
|
||||
import { createRun, createMessage } from '../runs/runs.js';
|
||||
import { bus } from '../runs/bus.js';
|
||||
import { waitForRunCompletion } from '../agents/utils.js';
|
||||
import { serviceLogger } from '../services/service_logger.js';
|
||||
import { limitEventItems } from './limit_event_items.js';
|
||||
import {
|
||||
|
|
@ -62,20 +63,6 @@ function getUnlabeledEmails(state: LabelingState): string[] {
|
|||
return unlabeled;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for a run to complete by listening for run-processing-end event
|
||||
*/
|
||||
async function waitForRunCompletion(runId: string): Promise<void> {
|
||||
return new Promise(async (resolve) => {
|
||||
const unsubscribe = await bus.subscribe('*', async (event) => {
|
||||
if (event.type === 'run-processing-end' && event.runId === runId) {
|
||||
unsubscribe();
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Label a batch of email files using the labeling agent
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ import path from 'path';
|
|||
import { WorkDir } from '../config/config.js';
|
||||
import { createRun, createMessage } from '../runs/runs.js';
|
||||
import { bus } from '../runs/bus.js';
|
||||
import { waitForRunCompletion } from '../agents/utils.js';
|
||||
import { serviceLogger } from '../services/service_logger.js';
|
||||
import { limitEventItems } from './limit_event_items.js';
|
||||
import {
|
||||
|
|
@ -75,20 +76,6 @@ function getUntaggedNotes(state: NoteTaggingState): string[] {
|
|||
return untagged;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for a run to complete by listening for run-processing-end event
|
||||
*/
|
||||
async function waitForRunCompletion(runId: string): Promise<void> {
|
||||
return new Promise(async (resolve) => {
|
||||
const unsubscribe = await bus.subscribe('*', async (event) => {
|
||||
if (event.type === 'run-processing-end' && event.runId === runId) {
|
||||
unsubscribe();
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Tag a batch of note files using the tagging agent
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ import fs from 'fs';
|
|||
import path from 'path';
|
||||
import { WorkDir } from '../config/config.js';
|
||||
import { createRun, createMessage } from '../runs/runs.js';
|
||||
import { bus } from '../runs/bus.js';
|
||||
import { waitForRunCompletion } from '../agents/utils.js';
|
||||
import {
|
||||
loadConfig,
|
||||
loadState,
|
||||
|
|
@ -18,20 +18,6 @@ import { PREBUILT_AGENTS } from './types.js';
|
|||
const CHECK_INTERVAL_MS = 60 * 1000; // Check every minute which agents need to run
|
||||
const PREBUILT_DIR = path.join(WorkDir, 'pre-built');
|
||||
|
||||
/**
|
||||
* Wait for a run to complete by listening for run-processing-end event
|
||||
*/
|
||||
async function waitForRunCompletion(runId: string): Promise<void> {
|
||||
return new Promise(async (resolve) => {
|
||||
const unsubscribe = await bus.subscribe('*', async (event) => {
|
||||
if (event.type === 'run-processing-end' && event.runId === runId) {
|
||||
unsubscribe();
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Run a pre-built agent by name
|
||||
*/
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue