diff --git a/apps/x/packages/core/src/agents/runtime.ts b/apps/x/packages/core/src/agents/runtime.ts
index 34e2b401..014f67fc 100644
--- a/apps/x/packages/core/src/agents/runtime.ts
+++ b/apps/x/packages/core/src/agents/runtime.ts
@@ -15,6 +15,7 @@ import { isBlocked, extractCommandNames } from "../application/lib/command-execu
 import container from "../di/container.js";
 import { IModelConfigRepo } from "../models/repo.js";
 import { createProvider } from "../models/models.js";
+import { markChatActive, markChatIdle, waitIfChatActive } from "../models/llm-queue.js";
 import { isSignedIn } from "../account/account.js";
 import { getGatewayProvider } from "../models/gateway.js";
 import { IAgentsRepo } from "./repo.js";
@@ -857,6 +858,8 @@
   const knowledgeGraphAgents = ["note_creation", "email-draft", "meeting-prep", "labeling_agent", "note_tagging_agent", "agent_notes_agent"];
   const isKgAgent = knowledgeGraphAgents.includes(state.agentName!);
   const isInlineTaskAgent = state.agentName === "inline_task_agent";
+  const isBackgroundAgent = isKgAgent || isInlineTaskAgent;
+  const providerFlavor = modelConfig.provider.flavor;
   const defaultModel = signedIn ? "gpt-5.4" : modelConfig.model;
   const defaultKgModel = signedIn ? "gpt-5.4-mini" : defaultModel;
   const defaultInlineTaskModel = signedIn ? "gpt-5.4" : defaultModel;
@@ -1062,6 +1065,13 @@
     instructionsWithDateTime += `\n\n# Search\nThe user has requested a search. Use the web-search tool to answer their query.`;
   }
   let streamError: string | null = null;
+  // Local-provider prioritization: background agents yield to active chat
+  if (isBackgroundAgent) {
+    await waitIfChatActive(providerFlavor, signal);
+  } else {
+    markChatActive();
+  }
+  try {
   for await (const event of streamLlm(
     model,
     state.messages,
@@ -1087,6 +1097,11 @@
       break;
     }
   }
+  } finally {
+    if (!isBackgroundAgent) {
+      markChatIdle();
+    }
+  }
 
   // build and emit final message from agent response
   const message = messageBuilder.get();
diff --git a/apps/x/packages/core/src/knowledge/inline_tasks.ts b/apps/x/packages/core/src/knowledge/inline_tasks.ts
index 3f7c5ffa..fdc53fa1 100644
--- a/apps/x/packages/core/src/knowledge/inline_tasks.ts
+++ b/apps/x/packages/core/src/knowledge/inline_tasks.ts
@@ -2,6 +2,7 @@ import fs from 'fs';
 import path from 'path';
 import { CronExpressionParser } from 'cron-parser';
 import { generateText } from 'ai';
+import { waitIfChatActive } from '../models/llm-queue.js';
 import { WorkDir } from '../config/config.js';
 import { createRun, createMessage, fetchRun } from '../runs/runs.js';
 import { bus } from '../runs/bus.js';
@@ -692,6 +693,7 @@ Default end time (local): ${localEnd}
 Respond with ONLY valid JSON: either a schedule object or null. No other text.`;
 
   try {
+    await waitIfChatActive(config.provider.flavor);
     const result = await generateText({
       model,
       system: systemPrompt,
diff --git a/apps/x/packages/core/src/knowledge/summarize_meeting.ts b/apps/x/packages/core/src/knowledge/summarize_meeting.ts
index 30e3c5d4..60ce37aa 100644
--- a/apps/x/packages/core/src/knowledge/summarize_meeting.ts
+++ b/apps/x/packages/core/src/knowledge/summarize_meeting.ts
@@ -1,6 +1,7 @@
 import fs from 'fs';
 import path from 'path';
 import { generateText } from 'ai';
+import { waitIfChatActive } from '../models/llm-queue.js';
 import container from '../di/container.js';
 import type { IModelConfigRepo } from '../models/repo.js';
 import { createProvider } from '../models/models.js';
@@ -159,6 +160,7 @@ export async function summarizeMeeting(transcript: string, meetingStartTime?: st
 
   const prompt = `Meeting recording started at: ${meetingStartTime || 'unknown'}\n\n${transcript}${calendarContext}`;
 
+  await waitIfChatActive(config.provider.flavor);
   const result = await generateText({
     model,
     system: SYSTEM_PROMPT,
diff --git a/apps/x/packages/core/src/models/llm-queue.ts b/apps/x/packages/core/src/models/llm-queue.ts
new file mode 100644
index 00000000..2b48a128
--- /dev/null
+++ b/apps/x/packages/core/src/models/llm-queue.ts
@@ -0,0 +1,74 @@
+/**
+ * Local-provider-aware LLM request prioritization.
+ *
+ * When the configured provider is local (Ollama, local openai-compatible),
+ * background services (knowledge graph, email labeling, etc.) pause their
+ * LLM calls while an interactive chat stream is in progress. This prevents
+ * background inference from competing with the user's chat on a single GPU/CPU.
+ *
+ * Cloud providers bypass this entirely — they handle concurrency fine.
+ */
+
+let chatActiveCount = 0;
+let chatIdleResolvers: Array<() => void> = [];
+
+/**
+ * Call when an interactive chat LLM stream starts.
+ * Nestable — supports concurrent interactive streams.
+ */
+export function markChatActive(): void {
+  chatActiveCount++;
+}
+
+/**
+ * Call when an interactive chat LLM stream ends.
+ * When all interactive streams finish, waiting background tasks resume.
+ */
+export function markChatIdle(): void {
+  chatActiveCount = Math.max(0, chatActiveCount - 1);
+  if (chatActiveCount === 0) {
+    const resolvers = chatIdleResolvers;
+    chatIdleResolvers = [];
+    for (const resolve of resolvers) {
+      resolve();
+    }
+  }
+}
+
+/**
+ * Returns true if the provider flavor represents a local inference server.
+ */
+export function isLocalProvider(flavor: string): boolean {
+  return flavor === "ollama" || flavor === "openai-compatible";
+}
+
+/**
+ * Background services call this before each LLM request.
+ * - If the provider is cloud-based, returns immediately.
+ * - If the provider is local and no chat is active, returns immediately.
+ * - If the provider is local and chat IS active, waits until chat finishes.
+ *
+ * @param providerFlavor - The provider flavor string (e.g. "ollama", "openai", "anthropic")
+ * @param signal - Optional AbortSignal to cancel the wait
+ */
+export function waitIfChatActive(providerFlavor: string, signal?: AbortSignal): Promise<void> {
+  if (!isLocalProvider(providerFlavor)) {
+    return Promise.resolve();
+  }
+  if (chatActiveCount === 0) {
+    return Promise.resolve();
+  }
+  return new Promise((resolve, reject) => {
+    const onAbort = () => {
+      const idx = chatIdleResolvers.indexOf(wrappedResolve);
+      if (idx !== -1) chatIdleResolvers.splice(idx, 1);
+      reject(signal!.reason ?? new DOMException("Aborted", "AbortError"));
+    };
+    const wrappedResolve = () => {
+      signal?.removeEventListener("abort", onAbort);
+      resolve();
+    };
+    chatIdleResolvers.push(wrappedResolve);
+    signal?.addEventListener("abort", onAbort, { once: true });
+  });
+}
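
For reference, a background caller pairs waitIfChatActive with an AbortSignal so a queued task can bail out while it is parked behind an active chat. A minimal sketch under assumed names (runBackgroundJob is hypothetical and not part of this diff; only the import path comes from the change above):

import { waitIfChatActive } from "../models/llm-queue.js";

// Hypothetical background job: on local providers it parks until the
// interactive chat stream finishes; on cloud providers it proceeds at once.
async function runBackgroundJob(providerFlavor: string, signal: AbortSignal): Promise<void> {
  try {
    await waitIfChatActive(providerFlavor, signal);
  } catch (err) {
    // waitIfChatActive rejects with the abort reason once `signal` fires.
    if (err instanceof DOMException && err.name === "AbortError") return;
    throw err;
  }
  // ...issue the LLM request here...
}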