ddd refactor: copilot streaming

This commit is contained in:
Ramnique Singh 2025-08-23 12:27:58 +05:30
parent 531f9feea6
commit 8f40b5fb33
9 changed files with 352 additions and 104 deletions

View file

@ -12,6 +12,7 @@ import { CURRENT_WORKFLOW_PROMPT } from "./current_workflow";
import { USE_COMPOSIO_TOOLS } from "@/app/lib/feature_flags";
import { composio, getTool } from "../composio/composio";
import { UsageTracker } from "@/app/lib/billing";
import { CopilotStreamEvent } from "@/src/entities/models/copilot";
const PROVIDER_API_KEY = process.env.PROVIDER_API_KEY || process.env.OPENAI_API_KEY || '';
const PROVIDER_BASE_URL = process.env.PROVIDER_BASE_URL || undefined;
@ -35,30 +36,6 @@ const openai = createOpenAI({
compatibility: "strict",
});
const ZTextEvent = z.object({
content: z.string(),
});
const ZToolCallEvent = z.object({
type: z.literal('tool-call'),
toolName: z.string(),
toolCallId: z.string(),
args: z.record(z.any()),
query: z.string().optional(),
});
const ZToolResultEvent = z.object({
type: z.literal('tool-result'),
toolCallId: z.string(),
result: z.any(),
});
const ZDoneEvent = z.object({
done: z.literal(true),
});
const ZEvent = z.union([ZTextEvent, ZToolCallEvent, ZToolResultEvent, ZDoneEvent]);
const composioToolSearchToolSuggestion = z.object({
toolkit: z.string(),
tool_slug: z.string(),
@ -273,7 +250,7 @@ export async function* streamMultiAgentResponse(
messages: z.infer<typeof CopilotMessage>[],
workflow: z.infer<typeof Workflow>,
dataSources: z.infer<typeof DataSourceSchemaForCopilot>[]
): AsyncIterable<z.infer<typeof ZEvent>> {
): AsyncIterable<z.infer<typeof CopilotStreamEvent>> {
const logger = new PrefixLogger('copilot /stream');
logger.log('context', context);
logger.log('projectId', projectId);
@ -375,9 +352,4 @@ export async function* streamMultiAgentResponse(
projectId,
totalChunks: chunkCount
});
// done
yield {
done: true,
};
}

View file

@ -0,0 +1,87 @@
import { z } from "zod";
import { nanoid } from 'nanoid';
import { ICacheService } from '@/src/application/services/cache.service.interface';
import { IUsageQuotaPolicy } from '@/src/application/policies/usage-quota.policy.interface';
import { IProjectActionAuthorizationPolicy } from '@/src/application/policies/project-action-authorization.policy';
import { CopilotChatContext, CopilotMessage, DataSourceSchemaForCopilot } from '@/src/entities/models/copilot';
import { Workflow } from '@/app/lib/types/workflow_types';
import { USE_BILLING } from "@/app/lib/feature_flags";
import { authorize, getCustomerIdForProject } from "@/app/lib/billing";
import { BillingError } from "@/src/entities/errors/common";
const inputSchema = z.object({
caller: z.enum(["user", "api"]),
userId: z.string().optional(),
apiKey: z.string().optional(),
data: z.object({
projectId: z.string(),
messages: z.array(CopilotMessage),
workflow: Workflow,
context: CopilotChatContext.nullable(),
dataSources: z.array(DataSourceSchemaForCopilot).optional(),
}),
});
export interface ICreateCopilotCachedTurnUseCase {
execute(data: z.infer<typeof inputSchema>): Promise<{ key: string }>;
}
export class CreateCopilotCachedTurnUseCase implements ICreateCopilotCachedTurnUseCase {
private readonly cacheService: ICacheService;
private readonly usageQuotaPolicy: IUsageQuotaPolicy;
private readonly projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy;
constructor({
cacheService,
usageQuotaPolicy,
projectActionAuthorizationPolicy,
}: {
cacheService: ICacheService,
usageQuotaPolicy: IUsageQuotaPolicy,
projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy,
}) {
this.cacheService = cacheService;
this.usageQuotaPolicy = usageQuotaPolicy;
this.projectActionAuthorizationPolicy = projectActionAuthorizationPolicy;
}
async execute(data: z.infer<typeof inputSchema>): Promise<{ key: string }> {
const { projectId } = data.data;
// check auth
await this.projectActionAuthorizationPolicy.authorize({
projectId,
caller: data.caller,
userId: data.userId,
apiKey: data.apiKey,
});
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
// check billing authorization
if (USE_BILLING) {
// get billing customer id for this project
const billingCustomerId = await getCustomerIdForProject(projectId);
// validate enough credits
const result = await authorize(billingCustomerId, {
type: "use_credits"
});
if (!result.success) {
throw new BillingError(result.error || 'Billing error');
}
}
// serialize request
const payload = JSON.stringify(data.data);
// create unique id for stream
const key = nanoid();
// store in cache
await this.cacheService.set(`copilot-stream-${key}`, payload, 60 * 10); // expire in 10 minutes
return {
key,
}
}
}

View file

@ -0,0 +1,104 @@
import { z } from "zod";
import { ICacheService } from '@/src/application/services/cache.service.interface';
import { IUsageQuotaPolicy } from '@/src/application/policies/usage-quota.policy.interface';
import { IProjectActionAuthorizationPolicy } from '@/src/application/policies/project-action-authorization.policy';
import { CopilotAPIRequest, CopilotStreamEvent } from '@/src/entities/models/copilot';
import { USE_BILLING } from "@/app/lib/feature_flags";
import { authorize, getCustomerIdForProject, logUsage, UsageTracker } from "@/app/lib/billing";
import { BillingError, NotFoundError } from "@/src/entities/errors/common";
import { streamMultiAgentResponse } from "@/src/application/lib/copilot/copilot";
const inputSchema = z.object({
caller: z.enum(["user", "api"]),
userId: z.string().optional(),
apiKey: z.string().optional(),
key: z.string(),
});
export interface IRunCopilotCachedTurnUseCase {
execute(data: z.infer<typeof inputSchema>): AsyncGenerator<z.infer<typeof CopilotStreamEvent>, void, unknown>;
}
export class RunCopilotCachedTurnUseCase implements IRunCopilotCachedTurnUseCase {
private readonly cacheService: ICacheService;
private readonly usageQuotaPolicy: IUsageQuotaPolicy;
private readonly projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy;
constructor({
cacheService,
usageQuotaPolicy,
projectActionAuthorizationPolicy,
}: {
cacheService: ICacheService,
usageQuotaPolicy: IUsageQuotaPolicy,
projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy,
}) {
this.cacheService = cacheService;
this.usageQuotaPolicy = usageQuotaPolicy;
this.projectActionAuthorizationPolicy = projectActionAuthorizationPolicy;
}
async *execute(data: z.infer<typeof inputSchema>): AsyncGenerator<z.infer<typeof CopilotStreamEvent>, void, unknown> {
// fetch cached turn
const lookupKey = `copilot-stream-${data.key}`;
const payload = await this.cacheService.get(lookupKey);
if (!payload) {
throw new NotFoundError('Cached turn not found');
}
// delete from cache
await this.cacheService.delete(lookupKey);
// parse cached turn
const cachedTurn = CopilotAPIRequest.parse(JSON.parse(payload));
const { projectId } = cachedTurn;
// check auth
await this.projectActionAuthorizationPolicy.authorize({
projectId,
caller: data.caller,
userId: data.userId,
apiKey: data.apiKey,
});
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
// check billing authorization
let billingCustomerId: string | null = null;
if (USE_BILLING) {
// get billing customer id for this project
billingCustomerId = await getCustomerIdForProject(projectId);
// validate enough credits
const result = await authorize(billingCustomerId, {
type: "use_credits"
});
if (!result.success) {
throw new BillingError(result.error || 'Billing error');
}
}
// init usage tracking
const usageTracker = new UsageTracker();
try {
for await (const event of streamMultiAgentResponse(
usageTracker,
projectId,
cachedTurn.context,
cachedTurn.messages,
cachedTurn.workflow,
cachedTurn.dataSources || [],
)) {
yield event;
}
} finally {
if (USE_BILLING && billingCustomerId) {
await logUsage(billingCustomerId, {
items: usageTracker.flush(),
});
}
}
}
}