mirror of
https://github.com/rowboatlabs/rowboat.git
synced 2026-04-25 00:16:29 +02:00
LLM prioritization on local models
This commit is contained in:
parent
884b5d0414
commit
1dabb971c4
4 changed files with 93 additions and 0 deletions
|
|
@ -15,6 +15,7 @@ import { isBlocked, extractCommandNames } from "../application/lib/command-execu
|
|||
import container from "../di/container.js";
|
||||
import { IModelConfigRepo } from "../models/repo.js";
|
||||
import { createProvider } from "../models/models.js";
|
||||
import { markChatActive, markChatIdle, waitIfChatActive } from "../models/llm-queue.js";
|
||||
import { isSignedIn } from "../account/account.js";
|
||||
import { getGatewayProvider } from "../models/gateway.js";
|
||||
import { IAgentsRepo } from "./repo.js";
|
||||
|
|
@ -857,6 +858,8 @@ export async function* streamAgent({
|
|||
const knowledgeGraphAgents = ["note_creation", "email-draft", "meeting-prep", "labeling_agent", "note_tagging_agent", "agent_notes_agent"];
|
||||
const isKgAgent = knowledgeGraphAgents.includes(state.agentName!);
|
||||
const isInlineTaskAgent = state.agentName === "inline_task_agent";
|
||||
const isBackgroundAgent = isKgAgent || isInlineTaskAgent;
|
||||
const providerFlavor = modelConfig.provider.flavor;
|
||||
const defaultModel = signedIn ? "gpt-5.4" : modelConfig.model;
|
||||
const defaultKgModel = signedIn ? "gpt-5.4-mini" : defaultModel;
|
||||
const defaultInlineTaskModel = signedIn ? "gpt-5.4" : defaultModel;
|
||||
|
|
@ -1062,6 +1065,13 @@ export async function* streamAgent({
|
|||
instructionsWithDateTime += `\n\n# Search\nThe user has requested a search. Use the web-search tool to answer their query.`;
|
||||
}
|
||||
let streamError: string | null = null;
|
||||
// Local-provider prioritization: background agents yield to active chat
|
||||
if (isBackgroundAgent) {
|
||||
await waitIfChatActive(providerFlavor, signal);
|
||||
} else {
|
||||
markChatActive();
|
||||
}
|
||||
try {
|
||||
for await (const event of streamLlm(
|
||||
model,
|
||||
state.messages,
|
||||
|
|
@ -1087,6 +1097,11 @@ export async function* streamAgent({
|
|||
break;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (!isBackgroundAgent) {
|
||||
markChatIdle();
|
||||
}
|
||||
}
|
||||
|
||||
// build and emit final message from agent response
|
||||
const message = messageBuilder.get();
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ import fs from 'fs';
|
|||
import path from 'path';
|
||||
import { CronExpressionParser } from 'cron-parser';
|
||||
import { generateText } from 'ai';
|
||||
import { waitIfChatActive } from '../models/llm-queue.js';
|
||||
import { WorkDir } from '../config/config.js';
|
||||
import { createRun, createMessage, fetchRun } from '../runs/runs.js';
|
||||
import { bus } from '../runs/bus.js';
|
||||
|
|
@ -692,6 +693,7 @@ Default end time (local): ${localEnd}
|
|||
Respond with ONLY valid JSON: either a schedule object or null. No other text.`;
|
||||
|
||||
try {
|
||||
await waitIfChatActive(config.provider.flavor);
|
||||
const result = await generateText({
|
||||
model,
|
||||
system: systemPrompt,
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
import { generateText } from 'ai';
|
||||
import { waitIfChatActive } from '../models/llm-queue.js';
|
||||
import container from '../di/container.js';
|
||||
import type { IModelConfigRepo } from '../models/repo.js';
|
||||
import { createProvider } from '../models/models.js';
|
||||
|
|
@ -159,6 +160,7 @@ export async function summarizeMeeting(transcript: string, meetingStartTime?: st
|
|||
|
||||
const prompt = `Meeting recording started at: ${meetingStartTime || 'unknown'}\n\n${transcript}${calendarContext}`;
|
||||
|
||||
await waitIfChatActive(config.provider.flavor);
|
||||
const result = await generateText({
|
||||
model,
|
||||
system: SYSTEM_PROMPT,
|
||||
|
|
|
|||
74
apps/x/packages/core/src/models/llm-queue.ts
Normal file
74
apps/x/packages/core/src/models/llm-queue.ts
Normal file
|
|
@ -0,0 +1,74 @@
|
|||
/**
|
||||
* Local-provider-aware LLM request prioritization.
|
||||
*
|
||||
* When the configured provider is local (Ollama, local openai-compatible),
|
||||
* background services (knowledge graph, email labeling, etc.) pause their
|
||||
* LLM calls while an interactive chat stream is in progress. This prevents
|
||||
* background inference from competing with the user's chat on a single GPU/CPU.
|
||||
*
|
||||
* Cloud providers bypass this entirely — they handle concurrency fine.
|
||||
*/
|
||||
|
||||
let chatActiveCount = 0;
|
||||
let chatIdleResolvers: Array<() => void> = [];
|
||||
|
||||
/**
|
||||
* Call when an interactive chat LLM stream starts.
|
||||
* Nestable — supports concurrent interactive streams.
|
||||
*/
|
||||
export function markChatActive(): void {
|
||||
chatActiveCount++;
|
||||
}
|
||||
|
||||
/**
|
||||
* Call when an interactive chat LLM stream ends.
|
||||
* When all interactive streams finish, waiting background tasks resume.
|
||||
*/
|
||||
export function markChatIdle(): void {
|
||||
chatActiveCount = Math.max(0, chatActiveCount - 1);
|
||||
if (chatActiveCount === 0) {
|
||||
const resolvers = chatIdleResolvers;
|
||||
chatIdleResolvers = [];
|
||||
for (const resolve of resolvers) {
|
||||
resolve();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if the provider flavor represents a local inference server.
|
||||
*/
|
||||
export function isLocalProvider(flavor: string): boolean {
|
||||
return flavor === "ollama" || flavor === "openai-compatible";
|
||||
}
|
||||
|
||||
/**
|
||||
* Background services call this before each LLM request.
|
||||
* - If the provider is cloud-based, returns immediately.
|
||||
* - If the provider is local and no chat is active, returns immediately.
|
||||
* - If the provider is local and chat IS active, waits until chat finishes.
|
||||
*
|
||||
* @param providerFlavor - The provider flavor string (e.g. "ollama", "openai", "anthropic")
|
||||
* @param signal - Optional AbortSignal to cancel the wait
|
||||
*/
|
||||
export function waitIfChatActive(providerFlavor: string, signal?: AbortSignal): Promise<void> {
|
||||
if (!isLocalProvider(providerFlavor)) {
|
||||
return Promise.resolve();
|
||||
}
|
||||
if (chatActiveCount === 0) {
|
||||
return Promise.resolve();
|
||||
}
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
const onAbort = () => {
|
||||
const idx = chatIdleResolvers.indexOf(wrappedResolve);
|
||||
if (idx !== -1) chatIdleResolvers.splice(idx, 1);
|
||||
reject(signal!.reason ?? new DOMException("Aborted", "AbortError"));
|
||||
};
|
||||
const wrappedResolve = () => {
|
||||
signal?.removeEventListener("abort", onAbort);
|
||||
resolve();
|
||||
};
|
||||
chatIdleResolvers.push(wrappedResolve);
|
||||
signal?.addEventListener("abort", onAbort, { once: true });
|
||||
});
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue