refactor and add local execution profile

This commit is contained in:
Arjun 2026-02-12 15:11:50 +05:30 committed by Ramnique Singh
parent e1d50c62da
commit 1a82226ea6
21 changed files with 380 additions and 53 deletions

View file

@ -15,8 +15,8 @@ import { CopilotAgent } from "../application/assistant/agent.js";
import { isBlocked } from "../application/lib/command-executor.js";
import container from "../di/container.js";
import { IModelConfigRepo } from "../models/repo.js";
import { createProvider } from "../models/models.js";
import { IAgentsRepo } from "./repo.js";
import type { ILlmService } from "../execution/llm-service.js";
import { IMonotonicallyIncreasingIdGenerator } from "../application/lib/id-gen.js";
import { IBus } from "../application/lib/bus.js";
import { IMessageQueue } from "../application/lib/message-queue.js";
@ -41,6 +41,7 @@ export class AgentRuntime implements IAgentRuntime {
private modelConfigRepo: IModelConfigRepo;
private runsLock: IRunsLock;
private abortRegistry: IAbortRegistry;
private llmService: ILlmService;
constructor({
runsRepo,
@ -50,6 +51,7 @@ export class AgentRuntime implements IAgentRuntime {
modelConfigRepo,
runsLock,
abortRegistry,
llmService,
}: {
runsRepo: IRunsRepo;
idGenerator: IMonotonicallyIncreasingIdGenerator;
@ -58,6 +60,7 @@ export class AgentRuntime implements IAgentRuntime {
modelConfigRepo: IModelConfigRepo;
runsLock: IRunsLock;
abortRegistry: IAbortRegistry;
llmService: ILlmService;
}) {
this.runsRepo = runsRepo;
this.idGenerator = idGenerator;
@ -66,6 +69,7 @@ export class AgentRuntime implements IAgentRuntime {
this.modelConfigRepo = modelConfigRepo;
this.runsLock = runsLock;
this.abortRegistry = abortRegistry;
this.llmService = llmService;
}
async trigger(runId: string): Promise<void> {
@ -104,6 +108,7 @@ export class AgentRuntime implements IAgentRuntime {
modelConfigRepo: this.modelConfigRepo,
signal,
abortRegistry: this.abortRegistry,
llmService: this.llmService,
})) {
eventCount++;
if (event.type !== "llm-stream-event") {
@ -629,6 +634,7 @@ export async function* streamAgent({
modelConfigRepo,
signal,
abortRegistry,
llmService,
}: {
state: AgentState,
idGenerator: IMonotonicallyIncreasingIdGenerator;
@ -637,6 +643,7 @@ export async function* streamAgent({
modelConfigRepo: IModelConfigRepo;
signal: AbortSignal;
abortRegistry: IAbortRegistry;
llmService: ILlmService;
}): AsyncGenerator<z.infer<typeof RunEvent>, void, unknown> {
const logger = new PrefixLogger(`run-${runId}-${state.agentName}`);
@ -657,8 +664,7 @@ export async function* streamAgent({
const tools = await buildTools(agent);
// set up provider + model
const provider = createProvider(modelConfig.provider);
const model = provider.languageModel(modelConfig.model);
const model = llmService.getLanguageModel(modelConfig);
let loopCounter = 0;
while (true) {
@ -731,6 +737,7 @@ export async function* streamAgent({
modelConfigRepo,
signal,
abortRegistry,
llmService,
})) {
yield* processEvent({
...event,

View file

@ -13,12 +13,12 @@ import * as workspace from "../../workspace/workspace.js";
import { IAgentsRepo } from "../../agents/repo.js";
import { WorkDir } from "../../config/config.js";
import { composioAccountsRepo } from "../../composio/repo.js";
import { executeAction as executeComposioAction, isConfigured as isComposioConfigured, listToolkitTools } from "../../composio/client.js";
import { slackToolCatalog } from "../assistant/skills/slack/tool-catalog.js";
import type { ToolContext } from "./exec-tool.js";
import { generateText } from "ai";
import { createProvider } from "../../models/models.js";
import { IModelConfigRepo } from "../../models/repo.js";
import type { ILlmService } from "../../execution/llm-service.js";
import type { IComposioService } from "../../execution/composio-service.js";
// Parser libraries are loaded dynamically inside parseFile.execute()
// to avoid pulling pdfjs-dist's DOM polyfills into the main bundle.
// Import paths are computed so esbuild cannot statically resolve them.
@ -144,8 +144,9 @@ async function executeSlackTool(
return { success: false, error: 'Slack is not connected' };
}
try {
const composioService = container.resolve<IComposioService>('composioService');
const toolSlug = await resolveSlackToolSlug(hintKey);
return await executeComposioAction(toolSlug, account.id, compactObject(params));
return await composioService.executeAction(toolSlug, account.id, compactObject(params));
} catch (error) {
return {
success: false,
@ -249,7 +250,8 @@ const resolveSlackToolSlug = async (hintKey: keyof typeof slackToolHints) => {
const allSlug = resolveFromTools(slackToolCatalog);
if (!allSlug) {
const fallback = await listToolkitTools("slack", hint.search || null);
const composioService = container.resolve<IComposioService>('composioService');
const fallback = await composioService.listToolkitTools("slack", hint.search || null);
const fallbackSlug = resolveFromTools(fallback.items || []);
if (!fallbackSlug) {
throw new Error(`Unable to resolve Slack tool for ${hintKey}. Try slack-listAvailableTools.`);
@ -858,11 +860,11 @@ export const BuiltinTools: z.infer<typeof BuiltinToolsSchema> = {
const base64 = buffer.toString('base64');
// Resolve model config from DI container
// Resolve model config and LLM service from DI container
const modelConfigRepo = container.resolve<IModelConfigRepo>('modelConfigRepo');
const modelConfig = await modelConfigRepo.getConfig();
const provider = createProvider(modelConfig.provider);
const model = provider.languageModel(modelConfig.model);
const llmService = container.resolve<ILlmService>('llmService');
const model = llmService.getLanguageModel(modelConfig);
const userPrompt = prompt || 'Convert this file to well-structured markdown.';
@ -1117,7 +1119,8 @@ export const BuiltinTools: z.infer<typeof BuiltinToolsSchema> = {
description: 'Check if Slack is connected and ready to use. Use this before other Slack operations.',
inputSchema: z.object({}),
execute: async () => {
if (!isComposioConfigured()) {
const composioService = container.resolve<IComposioService>('composioService');
if (!composioService.isConfigured()) {
return {
connected: false,
error: 'Composio is not configured. Please set up your Composio API key first.',
@ -1143,12 +1146,13 @@ export const BuiltinTools: z.infer<typeof BuiltinToolsSchema> = {
search: z.string().optional().describe('Optional search query to filter tools (e.g., "message", "channel", "user")'),
}),
execute: async ({ search }: { search?: string }) => {
if (!isComposioConfigured()) {
const composioService = container.resolve<IComposioService>('composioService');
if (!composioService.isConfigured()) {
return { success: false, error: 'Composio is not configured' };
}
try {
const result = await listToolkitTools('slack', search || null);
const result = await composioService.listToolkitTools('slack', search || null);
return {
success: true,
tools: result.items,
@ -1176,7 +1180,8 @@ export const BuiltinTools: z.infer<typeof BuiltinToolsSchema> = {
}
try {
const result = await executeComposioAction(toolSlug, account.id, input);
const composioService = container.resolve<IComposioService>('composioService');
const result = await composioService.executeAction(toolSlug, account.id, input);
return result;
} catch (error) {
return {

View file

@ -14,6 +14,14 @@ import { FSGranolaConfigRepo, IGranolaConfigRepo } from "../knowledge/granola/re
import { IAbortRegistry, InMemoryAbortRegistry } from "../runs/abort-registry.js";
import { FSAgentScheduleRepo, IAgentScheduleRepo } from "../agent-schedule/repo.js";
import { FSAgentScheduleStateRepo, IAgentScheduleStateRepo } from "../agent-schedule/state-repo.js";
import type { ILlmService } from "../execution/llm-service.js";
import type { IGmailService } from "../execution/gmail-service.js";
import type { ISttService } from "../execution/stt-service.js";
import type { IComposioService } from "../execution/composio-service.js";
import { LocalLlmService } from "../execution/local/local-llm-service.js";
import { LocalGmailService } from "../execution/local/local-gmail-service.js";
import { LocalSttService } from "../execution/local/local-stt-service.js";
import { LocalComposioService } from "../execution/local/local-composio-service.js";
const container = createContainer({
injectionMode: InjectionMode.PROXY,
@ -37,6 +45,11 @@ container.register({
granolaConfigRepo: asClass<IGranolaConfigRepo>(FSGranolaConfigRepo).singleton(),
agentScheduleRepo: asClass<IAgentScheduleRepo>(FSAgentScheduleRepo).singleton(),
agentScheduleStateRepo: asClass<IAgentScheduleStateRepo>(FSAgentScheduleStateRepo).singleton(),
llmService: asClass<ILlmService>(LocalLlmService).singleton(),
gmailService: asClass<IGmailService>(LocalGmailService).singleton(),
sttService: asClass<ISttService>(LocalSttService).singleton(),
composioService: asClass<IComposioService>(LocalComposioService).singleton(),
});
export default container;

View file

@ -0,0 +1,51 @@
import { z } from "zod";
import {
ZCreateAuthConfigRequest,
ZCreateAuthConfigResponse,
ZAuthConfig,
ZConnectedAccount,
ZCreateConnectedAccountRequest,
ZCreateConnectedAccountResponse,
ZDeleteOperationResponse,
ZExecuteActionResponse,
ZListResponse,
ZToolkit,
} from "../composio/types.js";
export interface IComposioService {
isConfigured(): boolean;
setApiKey(apiKey: string): void;
executeAction(
actionSlug: string,
connectedAccountId: string,
input: Record<string, unknown>,
): Promise<z.infer<typeof ZExecuteActionResponse>>;
listToolkits(
cursor?: string | null,
): Promise<z.infer<ReturnType<typeof ZListResponse<typeof ZToolkit>>>>;
getToolkit(toolkitSlug: string): Promise<z.infer<typeof ZToolkit>>;
listToolkitTools(
toolkitSlug: string,
searchQuery?: string | null,
): Promise<{ items: Array<{ slug: string; name: string; description: string }> }>;
listAuthConfigs(
toolkitSlug: string,
cursor?: string | null,
managedOnly?: boolean,
): Promise<z.infer<ReturnType<typeof ZListResponse<typeof ZAuthConfig>>>>;
createAuthConfig(
request: z.infer<typeof ZCreateAuthConfigRequest>,
): Promise<z.infer<typeof ZCreateAuthConfigResponse>>;
deleteAuthConfig(
authConfigId: string,
): Promise<z.infer<typeof ZDeleteOperationResponse>>;
createConnectedAccount(
request: z.infer<typeof ZCreateConnectedAccountRequest>,
): Promise<z.infer<typeof ZCreateConnectedAccountResponse>>;
getConnectedAccount(
connectedAccountId: string,
): Promise<z.infer<typeof ZConnectedAccount>>;
deleteConnectedAccount(
connectedAccountId: string,
): Promise<z.infer<typeof ZDeleteOperationResponse>>;
}

View file

@ -0,0 +1,30 @@
import { ExecutionProfile } from "@x/shared/dist/execution-profile.js";
import { ILlmService } from "./llm-service.js";
import { IGmailService } from "./gmail-service.js";
import { ISttService } from "./stt-service.js";
import { IComposioService } from "./composio-service.js";
import { LocalLlmService } from "./local/local-llm-service.js";
import { LocalGmailService } from "./local/local-gmail-service.js";
import { LocalSttService } from "./local/local-stt-service.js";
import { LocalComposioService } from "./local/local-composio-service.js";
export interface ExecutionServices {
llm: ILlmService;
gmail: IGmailService;
stt: ISttService;
composio: IComposioService;
}
export function createServices(profile: ExecutionProfile): ExecutionServices {
switch (profile.mode) {
case "local":
return {
llm: new LocalLlmService(),
gmail: new LocalGmailService(),
stt: new LocalSttService(),
composio: new LocalComposioService(),
};
default:
throw new Error(`Unsupported execution profile mode: ${(profile as { mode: string }).mode}`);
}
}

View file

@ -0,0 +1,4 @@
export interface IGmailService {
init(): void;
triggerSync(): void;
}

View file

@ -0,0 +1,12 @@
export type { ILlmService } from "./llm-service.js";
export type { IGmailService } from "./gmail-service.js";
export type { ISttService } from "./stt-service.js";
export type { IComposioService } from "./composio-service.js";
export { createServices } from "./factory.js";
export type { ExecutionServices } from "./factory.js";
// Local implementations
export { LocalLlmService } from "./local/local-llm-service.js";
export { LocalGmailService } from "./local/local-gmail-service.js";
export { LocalSttService } from "./local/local-stt-service.js";
export { LocalComposioService } from "./local/local-composio-service.js";

View file

@ -0,0 +1,12 @@
import { LanguageModel } from "ai";
import { z } from "zod";
import { LlmModelConfig, LlmProvider } from "@x/shared/dist/models.js";
export interface ILlmService {
getLanguageModel(config: z.infer<typeof LlmModelConfig>): LanguageModel;
testConnection(
provider: z.infer<typeof LlmProvider>,
model: string,
timeoutMs?: number,
): Promise<{ success: boolean; error?: string }>;
}

View file

@ -0,0 +1,88 @@
import { z } from "zod";
import { IComposioService } from "../composio-service.js";
import * as composioClient from "../../composio/client.js";
import {
ZCreateAuthConfigRequest,
ZCreateAuthConfigResponse,
ZAuthConfig,
ZConnectedAccount,
ZCreateConnectedAccountRequest,
ZCreateConnectedAccountResponse,
ZDeleteOperationResponse,
ZExecuteActionResponse,
ZListResponse,
ZToolkit,
} from "../../composio/types.js";
export class LocalComposioService implements IComposioService {
isConfigured(): boolean {
return composioClient.isConfigured();
}
setApiKey(apiKey: string): void {
composioClient.setApiKey(apiKey);
}
async executeAction(
actionSlug: string,
connectedAccountId: string,
input: Record<string, unknown>,
): Promise<z.infer<typeof ZExecuteActionResponse>> {
return composioClient.executeAction(actionSlug, connectedAccountId, input);
}
async listToolkits(
cursor: string | null = null,
): Promise<z.infer<ReturnType<typeof ZListResponse<typeof ZToolkit>>>> {
return composioClient.listToolkits(cursor);
}
async getToolkit(toolkitSlug: string): Promise<z.infer<typeof ZToolkit>> {
return composioClient.getToolkit(toolkitSlug);
}
async listToolkitTools(
toolkitSlug: string,
searchQuery: string | null = null,
): Promise<{ items: Array<{ slug: string; name: string; description: string }> }> {
return composioClient.listToolkitTools(toolkitSlug, searchQuery);
}
async listAuthConfigs(
toolkitSlug: string,
cursor: string | null = null,
managedOnly: boolean = false,
): Promise<z.infer<ReturnType<typeof ZListResponse<typeof ZAuthConfig>>>> {
return composioClient.listAuthConfigs(toolkitSlug, cursor, managedOnly);
}
async createAuthConfig(
request: z.infer<typeof ZCreateAuthConfigRequest>,
): Promise<z.infer<typeof ZCreateAuthConfigResponse>> {
return composioClient.createAuthConfig(request);
}
async deleteAuthConfig(
authConfigId: string,
): Promise<z.infer<typeof ZDeleteOperationResponse>> {
return composioClient.deleteAuthConfig(authConfigId);
}
async createConnectedAccount(
request: z.infer<typeof ZCreateConnectedAccountRequest>,
): Promise<z.infer<typeof ZCreateConnectedAccountResponse>> {
return composioClient.createConnectedAccount(request);
}
async getConnectedAccount(
connectedAccountId: string,
): Promise<z.infer<typeof ZConnectedAccount>> {
return composioClient.getConnectedAccount(connectedAccountId);
}
async deleteConnectedAccount(
connectedAccountId: string,
): Promise<z.infer<typeof ZDeleteOperationResponse>> {
return composioClient.deleteConnectedAccount(connectedAccountId);
}
}

View file

@ -0,0 +1,12 @@
import { IGmailService } from "../gmail-service.js";
import { init, triggerSync } from "../../knowledge/sync_gmail.js";
export class LocalGmailService implements IGmailService {
init(): void {
init();
}
triggerSync(): void {
triggerSync();
}
}

View file

@ -0,0 +1,20 @@
import { LanguageModel } from "ai";
import { z } from "zod";
import { LlmModelConfig, LlmProvider } from "@x/shared/dist/models.js";
import { ILlmService } from "../llm-service.js";
import { createProvider, testModelConnection } from "../../models/models.js";
export class LocalLlmService implements ILlmService {
getLanguageModel(config: z.infer<typeof LlmModelConfig>): LanguageModel {
const provider = createProvider(config.provider);
return provider.languageModel(config.model);
}
async testConnection(
providerConfig: z.infer<typeof LlmProvider>,
model: string,
timeoutMs?: number,
): Promise<{ success: boolean; error?: string }> {
return testModelConnection(providerConfig, model, timeoutMs);
}
}

View file

@ -0,0 +1,46 @@
import fs from "fs";
import path from "path";
import { ISttService } from "../stt-service.js";
import { WorkDir } from "../../config/config.js";
export class LocalSttService implements ISttService {
async transcribe(audioBase64: string, mimeType: string): Promise<string | null> {
try {
const configPath = path.join(WorkDir, 'config', 'deepgram.json');
if (!fs.existsSync(configPath)) {
throw new Error('Deepgram config not found');
}
const configData = fs.readFileSync(configPath, 'utf-8');
const { apiKey } = JSON.parse(configData) as { apiKey: string };
if (!apiKey) throw new Error('No apiKey in deepgram.json');
const audioBuffer = Buffer.from(audioBase64, 'base64');
const response = await fetch(
'https://api.deepgram.com/v1/listen?model=nova-2&smart_format=true',
{
method: 'POST',
headers: {
Authorization: `Token ${apiKey}`,
'Content-Type': mimeType,
},
body: audioBuffer,
},
);
if (!response.ok) throw new Error(`Deepgram API error: ${response.status}`);
const result = await response.json() as {
results?: {
channels?: Array<{
alternatives?: Array<{ transcript?: string }>;
}>;
};
};
return result.results?.channels?.[0]?.alternatives?.[0]?.transcript ?? null;
} catch (err) {
console.error('Deepgram transcription failed:', err);
return null;
}
}
}

View file

@ -0,0 +1,3 @@
export interface ISttService {
transcribe(audioBase64: string, mimeType: string): Promise<string | null>;
}

View file

@ -0,0 +1,15 @@
import { z } from 'zod';
export const ExecutionProfile = z.discriminatedUnion('mode', [
z.object({ mode: z.literal('local') }),
z.object({
mode: z.literal('cloud'),
session: z.object({
accessToken: z.string(),
refreshToken: z.string().optional(),
userId: z.string(),
}),
}),
]);
export type ExecutionProfile = z.infer<typeof ExecutionProfile>;

View file

@ -7,4 +7,5 @@ export * as mcp from './mcp.js';
export * as agentSchedule from './agent-schedule.js';
export * as agentScheduleState from './agent-schedule-state.js';
export * as serviceEvents from './service-events.js';
export * as executionProfile from './execution-profile.js';
export { PrefixLogger };

View file

@ -395,6 +395,10 @@ const ipcSchemas = {
req: z.object({ path: z.string() }),
res: z.object({ data: z.string(), mimeType: z.string(), size: z.number() }),
},
'stt:transcribe': {
req: z.object({ audioBase64: z.string(), mimeType: z.string() }),
res: z.object({ transcript: z.string().nullable() }),
},
} as const;
// ============================================================================