diff --git a/apps/x/packages/core/src/agents/utils.ts b/apps/x/packages/core/src/agents/utils.ts new file mode 100644 index 00000000..bd4c84af --- /dev/null +++ b/apps/x/packages/core/src/agents/utils.ts @@ -0,0 +1,40 @@ +import { bus } from "../runs/bus.js"; +import { fetchRun } from "../runs/runs.js"; + +/** + * Extract the assistant's final text response from a run's log. + * @param runId + * @returns The assistant's final text response or null if not found. + */ +export async function extractAgentResponse(runId: string): Promise { + const run = await fetchRun(runId); + for (let i = run.log.length - 1; i >= 0; i--) { + const event = run.log[i]; + if (event.type === 'message' && event.message.role === 'assistant') { + const content = event.message.content; + if (typeof content === 'string') return content; + if (Array.isArray(content)) { + const text = content + .filter((p) => p.type === 'text') + .map((p) => 'text' in p ? p.text : '') + .join(''); + return text || null; + } + } + } + return null; +} + +/** + * Wait for a run to complete by listening for run-processing-end event + */ +export async function waitForRunCompletion(runId: string): Promise { + return new Promise(async (resolve) => { + const unsubscribe = await bus.subscribe('*', async (event) => { + if (event.type === 'run-processing-end' && event.runId === runId) { + unsubscribe(); + resolve(); + } + }); + }); +} \ No newline at end of file diff --git a/apps/x/packages/core/src/knowledge/agent_notes.ts b/apps/x/packages/core/src/knowledge/agent_notes.ts index 5ec3e801..16307bb5 100644 --- a/apps/x/packages/core/src/knowledge/agent_notes.ts +++ b/apps/x/packages/core/src/knowledge/agent_notes.ts @@ -3,7 +3,7 @@ import path from 'path'; import { google } from 'googleapis'; import { WorkDir } from '../config/config.js'; import { createRun, createMessage } from '../runs/runs.js'; -import { bus } from '../runs/bus.js'; +import { waitForRunCompletion } from '../agents/utils.js'; import { serviceLogger } from '../services/service_logger.js'; import { loadUserConfig, updateUserEmail } from '../config/user_config.js'; import { GoogleClientFactory } from './google-client-factory.js'; @@ -190,19 +190,6 @@ function extractConversationMessages(runFilePath: string): { role: string; text: return messages; } -// --- Wait for agent run completion --- - -async function waitForRunCompletion(runId: string): Promise { - return new Promise(async (resolve) => { - const unsubscribe = await bus.subscribe('*', async (event) => { - if (event.type === 'run-processing-end' && event.runId === runId) { - unsubscribe(); - resolve(); - } - }); - }); -} - // --- User email resolution --- async function ensureUserEmail(): Promise { diff --git a/apps/x/packages/core/src/knowledge/build_graph.ts b/apps/x/packages/core/src/knowledge/build_graph.ts index f408a844..06fd1194 100644 --- a/apps/x/packages/core/src/knowledge/build_graph.ts +++ b/apps/x/packages/core/src/knowledge/build_graph.ts @@ -3,6 +3,7 @@ import path from 'path'; import { WorkDir } from '../config/config.js'; import { createRun, createMessage } from '../runs/runs.js'; import { bus } from '../runs/bus.js'; +import { waitForRunCompletion } from '../agents/utils.js'; import { serviceLogger, type ServiceRunContext } from '../services/service_logger.js'; import { loadState, @@ -185,20 +186,6 @@ async function readFileContents(filePaths: string[]): Promise<{ path: string; co return files; } -/** - * Wait for a run to complete by listening for run-processing-end event - */ -async function waitForRunCompletion(runId: string): Promise { - return new Promise(async (resolve) => { - const unsubscribe = await bus.subscribe('*', async (event) => { - if (event.type === 'run-processing-end' && event.runId === runId) { - unsubscribe(); - resolve(); - } - }); - }); -} - /** * Run note creation agent on a batch of files to extract entities and create/update notes */ diff --git a/apps/x/packages/core/src/knowledge/inline_tasks.ts b/apps/x/packages/core/src/knowledge/inline_tasks.ts index 3f7c5ffa..01d22352 100644 --- a/apps/x/packages/core/src/knowledge/inline_tasks.ts +++ b/apps/x/packages/core/src/knowledge/inline_tasks.ts @@ -4,11 +4,11 @@ import { CronExpressionParser } from 'cron-parser'; import { generateText } from 'ai'; import { WorkDir } from '../config/config.js'; import { createRun, createMessage, fetchRun } from '../runs/runs.js'; -import { bus } from '../runs/bus.js'; import container from '../di/container.js'; import type { IModelConfigRepo } from '../models/repo.js'; import { createProvider } from '../models/models.js'; import { inlineTask } from '@x/shared'; +import { extractAgentResponse, waitForRunCompletion } from '../agents/utils.js'; const SYNC_INTERVAL_MS = 15 * 1000; // 15 seconds const INLINE_TASK_AGENT = 'inline_task_agent'; @@ -129,46 +129,6 @@ function scanDirectoryRecursive(dir: string): string[] { return files; } -/** - * Wait for a run to complete by listening for run-processing-end event - */ -async function waitForRunCompletion(runId: string): Promise { - return new Promise(async (resolve) => { - const unsubscribe = await bus.subscribe('*', async (event) => { - if (event.type === 'run-processing-end' && event.runId === runId) { - unsubscribe(); - resolve(); - } - }); - }); -} - -/** - * Extract the assistant's final text response from a run's log. - */ -async function extractAgentResponse(runId: string): Promise { - const run = await fetchRun(runId); - // Walk backwards through the log to find the last assistant message - for (let i = run.log.length - 1; i >= 0; i--) { - const event = run.log[i]; - if (event.type === 'message' && event.message.role === 'assistant') { - const content = event.message.content; - if (typeof content === 'string') { - return content; - } - // Content may be an array of parts — concatenate text parts - if (Array.isArray(content)) { - const text = content - .filter((p) => p.type === 'text') - .map((p) => (p as { type: 'text'; text: string }).text) - .join(''); - return text || null; - } - } - } - return null; -} - interface InlineTask { instruction: string; schedule: InlineTaskSchedule | null; diff --git a/apps/x/packages/core/src/knowledge/label_emails.ts b/apps/x/packages/core/src/knowledge/label_emails.ts index 68bca5a1..98b10c2f 100644 --- a/apps/x/packages/core/src/knowledge/label_emails.ts +++ b/apps/x/packages/core/src/knowledge/label_emails.ts @@ -3,6 +3,7 @@ import path from 'path'; import { WorkDir } from '../config/config.js'; import { createRun, createMessage } from '../runs/runs.js'; import { bus } from '../runs/bus.js'; +import { waitForRunCompletion } from '../agents/utils.js'; import { serviceLogger } from '../services/service_logger.js'; import { limitEventItems } from './limit_event_items.js'; import { @@ -62,20 +63,6 @@ function getUnlabeledEmails(state: LabelingState): string[] { return unlabeled; } -/** - * Wait for a run to complete by listening for run-processing-end event - */ -async function waitForRunCompletion(runId: string): Promise { - return new Promise(async (resolve) => { - const unsubscribe = await bus.subscribe('*', async (event) => { - if (event.type === 'run-processing-end' && event.runId === runId) { - unsubscribe(); - resolve(); - } - }); - }); -} - /** * Label a batch of email files using the labeling agent */ diff --git a/apps/x/packages/core/src/knowledge/tag_notes.ts b/apps/x/packages/core/src/knowledge/tag_notes.ts index 39d46a3b..8fdabb86 100644 --- a/apps/x/packages/core/src/knowledge/tag_notes.ts +++ b/apps/x/packages/core/src/knowledge/tag_notes.ts @@ -3,6 +3,7 @@ import path from 'path'; import { WorkDir } from '../config/config.js'; import { createRun, createMessage } from '../runs/runs.js'; import { bus } from '../runs/bus.js'; +import { waitForRunCompletion } from '../agents/utils.js'; import { serviceLogger } from '../services/service_logger.js'; import { limitEventItems } from './limit_event_items.js'; import { @@ -75,20 +76,6 @@ function getUntaggedNotes(state: NoteTaggingState): string[] { return untagged; } -/** - * Wait for a run to complete by listening for run-processing-end event - */ -async function waitForRunCompletion(runId: string): Promise { - return new Promise(async (resolve) => { - const unsubscribe = await bus.subscribe('*', async (event) => { - if (event.type === 'run-processing-end' && event.runId === runId) { - unsubscribe(); - resolve(); - } - }); - }); -} - /** * Tag a batch of note files using the tagging agent */ diff --git a/apps/x/packages/core/src/pre_built/runner.ts b/apps/x/packages/core/src/pre_built/runner.ts index 4856dd7f..c1985380 100644 --- a/apps/x/packages/core/src/pre_built/runner.ts +++ b/apps/x/packages/core/src/pre_built/runner.ts @@ -2,7 +2,7 @@ import fs from 'fs'; import path from 'path'; import { WorkDir } from '../config/config.js'; import { createRun, createMessage } from '../runs/runs.js'; -import { bus } from '../runs/bus.js'; +import { waitForRunCompletion } from '../agents/utils.js'; import { loadConfig, loadState, @@ -18,20 +18,6 @@ import { PREBUILT_AGENTS } from './types.js'; const CHECK_INTERVAL_MS = 60 * 1000; // Check every minute which agents need to run const PREBUILT_DIR = path.join(WorkDir, 'pre-built'); -/** - * Wait for a run to complete by listening for run-processing-end event - */ -async function waitForRunCompletion(runId: string): Promise { - return new Promise(async (resolve) => { - const unsubscribe = await bus.subscribe('*', async (event) => { - if (event.type === 'run-processing-end' && event.runId === runId) { - unsubscribe(); - resolve(); - } - }); - }); -} - /** * Run a pre-built agent by name */