From 9f5d8f08debc3ff8e71a54de24e92d834eef1252 Mon Sep 17 00:00:00 2001 From: Ramnique Singh <30795890+ramnique@users.noreply.github.com> Date: Thu, 14 Aug 2025 22:15:21 +0530 Subject: [PATCH] billing for composio triggers --- apps/rowboat/app/lib/types/billing_types.ts | 8 ++ ...rigger-deployments.repository.interface.ts | 18 ++- ...andle-composio-webhook-request.use-case.ts | 111 ++++++++++-------- .../src/application/workers/jobs.worker.ts | 3 + ...composio-trigger-deployments.repository.ts | 49 +++----- 5 files changed, 99 insertions(+), 90 deletions(-) diff --git a/apps/rowboat/app/lib/types/billing_types.ts b/apps/rowboat/app/lib/types/billing_types.ts index 3371d335..adb13a8b 100644 --- a/apps/rowboat/app/lib/types/billing_types.ts +++ b/apps/rowboat/app/lib/types/billing_types.ts @@ -6,6 +6,7 @@ export const UsageTypeKey = z.enum([ "LLM_USAGE", "EMBEDDING_MODEL_USAGE", "COMPOSIO_TOOL_USAGE", + "COMPOSIO_TRIGGER_USAGE", "FIRECRAWL_SCRAPE_USAGE", ]); @@ -30,6 +31,12 @@ export const ComposioToolUsage = z.object({ context: z.string(), }); +export const ComposioTriggerUsage = z.object({ + type: z.literal(UsageTypeKey.Enum.COMPOSIO_TRIGGER_USAGE), + triggerSlug: z.string(), + context: z.string(), +}); + export const FirecrawlScrapeUsage = z.object({ type: z.literal(UsageTypeKey.Enum.FIRECRAWL_SCRAPE_USAGE), context: z.string(), @@ -39,6 +46,7 @@ export const UsageItem = z.discriminatedUnion("type", [ LLMUsage, EmbeddingModelUsage, ComposioToolUsage, + ComposioTriggerUsage, FirecrawlScrapeUsage, ]); diff --git a/apps/rowboat/src/application/repositories/composio-trigger-deployments.repository.interface.ts b/apps/rowboat/src/application/repositories/composio-trigger-deployments.repository.interface.ts index 8aba0da1..d013ea22 100644 --- a/apps/rowboat/src/application/repositories/composio-trigger-deployments.repository.interface.ts +++ b/apps/rowboat/src/application/repositories/composio-trigger-deployments.repository.interface.ts @@ -42,6 +42,14 @@ export interface IComposioTriggerDeploymentsRepository { * @returns Promise resolving to the deployment if found, null if not found */ fetch(id: string): Promise | null>; + + /** + * Fetches a trigger deployment by its Composio trigger ID. + * + * @param triggerId - The unique identifier of the Composio trigger + * @returns Promise resolving to the deployment if found, null if not found + */ + fetchByComposioTriggerId(triggerId: string): Promise | null>; /** * Deletes a Composio trigger deployment by its ID. @@ -70,16 +78,6 @@ export interface IComposioTriggerDeploymentsRepository { */ listByProjectId(projectId: string, cursor?: string, limit?: number): Promise>>>; - /** - * Retrieves all trigger deployments for a specific trigger. - * - * @param triggerId - The identifier of the trigger - * @param cursor - Optional cursor for pagination - * @param limit - Optional limit for the number of items to return - * @returns Promise resolving to a paginated list of deployments for the specified trigger - */ - listByTriggerId(triggerId: string, cursor?: string, limit?: number): Promise>>>; - /** * Deletes all trigger deployments associated with a specific connected account. * diff --git a/apps/rowboat/src/application/use-cases/composio/webhook/handle-composio-webhook-request.use-case.ts b/apps/rowboat/src/application/use-cases/composio/webhook/handle-composio-webhook-request.use-case.ts index e9fee78d..20316e4f 100644 --- a/apps/rowboat/src/application/use-cases/composio/webhook/handle-composio-webhook-request.use-case.ts +++ b/apps/rowboat/src/application/use-cases/composio/webhook/handle-composio-webhook-request.use-case.ts @@ -2,11 +2,14 @@ import { IJobsRepository } from "@/src/application/repositories/jobs.repository. import { IComposioTriggerDeploymentsRepository } from "@/src/application/repositories/composio-trigger-deployments.repository.interface"; import { createHmac, timingSafeEqual } from "crypto"; import { z } from "zod"; -import { BadRequestError } from "@/src/entities/errors/common"; +import { BadRequestError, BillingError, NotFoundError } from "@/src/entities/errors/common"; import { UserMessage } from "@/app/lib/types/types"; import { PrefixLogger } from "@/app/lib/utils"; import { IProjectsRepository } from "@/src/application/repositories/projects.repository.interface"; import { IPubSubService } from "@/src/application/services/pub-sub.service.interface"; +import { authorize, logUsage } from "@/app/lib/billing"; +import { getCustomerIdForProject } from "@/app/lib/billing"; +import { USE_BILLING } from "@/app/lib/feature_flags"; const WEBHOOK_SECRET = process.env.COMPOSIO_TRIGGERS_WEBHOOK_SECRET || "test"; @@ -91,60 +94,70 @@ export class HandleCompsioWebhookRequestUseCase implements IHandleCompsioWebhook const logger = new PrefixLogger(`composio-trigger-webhook-[${event.type}]-[${event.data.trigger_nano_id}]`); - // create a job for each deployment across all pages - const msg: z.infer = { - role: "user", - content: `This chat is being invoked through a trigger. Here is the trigger data:\n\n${JSON.stringify(event, null, 2)}`, - }; + // fetch trigger deployment data from db + const deployment = await this.composioTriggerDeploymentsRepository.fetchByComposioTriggerId(event.data.trigger_nano_id); + if (!deployment) { + throw new BadRequestError("Trigger not found"); + } - // fetch registered trigger deployments for this event type - let cursor: string | null = null; - let jobs = 0; - do { - const triggerDeployments = await this.composioTriggerDeploymentsRepository.listByTriggerId(event.data.trigger_nano_id, cursor || undefined); + const { projectId } = deployment; - // create a job for each deployment in the current page - for (const deployment of triggerDeployments.items) { - // fetch project - const project = await this.projectsRepository.fetch(deployment.projectId); - if (!project) { - logger.log(`Project ${deployment.projectId} not found`); - continue; - } + // Check billing auth + if (USE_BILLING) { + // get billing customer id for project + const billingCustomerId = await getCustomerIdForProject(projectId); - // ensure workflow - if (!project.liveWorkflow) { - logger.log(`Project ${deployment.projectId} has no live workflow`); - continue; - } - - // create job - const job = await this.jobsRepository.create({ - reason: { - type: "composio_trigger", - triggerId: event.data.trigger_nano_id, - triggerDeploymentId: deployment.id, - triggerTypeSlug: deployment.triggerTypeSlug, - payload: event, - }, - projectId: deployment.projectId, - input: { - messages: [msg], - }, - }); - - // notify workers - await this.pubSubService.publish('new_jobs', job.id); - - jobs++; - logger.log(`Created job ${job.id} for trigger deployment ${deployment.id}`); + // validate enough credits + const result = await authorize(billingCustomerId, { + type: "use_credits" + }); + if (!result.success) { + throw new BillingError("Not enough credits"); } - // check if there are more pages - cursor = triggerDeployments.nextCursor; - } while (cursor); + // log usage for composio trigger + await logUsage(billingCustomerId, { + items: [{ + type: "COMPOSIO_TRIGGER_USAGE", + triggerSlug: deployment.triggerTypeSlug, + context: "trigger.composio", + }], + }); + } - logger.log(`Created ${jobs} jobs for trigger ${event.data.trigger_nano_id}`); + // fetch project + const project = await this.projectsRepository.fetch(deployment.projectId); + if (!project) { + throw new NotFoundError("Project not found"); + } + + // ensure workflow + if (!project.liveWorkflow) { + throw new BadRequestError("Project has no live workflow"); + } + + // create job + const job = await this.jobsRepository.create({ + reason: { + type: "composio_trigger", + triggerId: event.data.trigger_nano_id, + triggerDeploymentId: deployment.id, + triggerTypeSlug: deployment.triggerTypeSlug, + payload: event, + }, + projectId: deployment.projectId, + input: { + messages: [{ + role: "user", + content: `This chat is being invoked through a trigger. Here is the trigger data:\n\n${JSON.stringify(event, null, 2)}`, + }], + }, + }); + + // notify workers + await this.pubSubService.publish('new_jobs', job.id); + + logger.log(`Created job ${job.id} for trigger deployment ${deployment.id}`); } private verifySignature(headers: Record, payload: string): void { diff --git a/apps/rowboat/src/application/workers/jobs.worker.ts b/apps/rowboat/src/application/workers/jobs.worker.ts index 450a6aba..ee67ab2a 100644 --- a/apps/rowboat/src/application/workers/jobs.worker.ts +++ b/apps/rowboat/src/application/workers/jobs.worker.ts @@ -94,6 +94,9 @@ export class JobsWorker implements IJobsWorker { logger.log(`Received event: ${event.type}`); if (event.type === "done") { turn = event.turn; + } else if (event.type === "error") { + logger.log(`Error: ${event.error}`); + throw new Error(event.error); } } if (!turn) { diff --git a/apps/rowboat/src/infrastructure/repositories/mongodb.composio-trigger-deployments.repository.ts b/apps/rowboat/src/infrastructure/repositories/mongodb.composio-trigger-deployments.repository.ts index 5619650a..d6044eb5 100644 --- a/apps/rowboat/src/infrastructure/repositories/mongodb.composio-trigger-deployments.repository.ts +++ b/apps/rowboat/src/infrastructure/repositories/mongodb.composio-trigger-deployments.repository.ts @@ -81,6 +81,24 @@ export class MongodbComposioTriggerDeploymentsRepository implements IComposioTri }; } + /** + * Fetches a trigger deployment by its Composio trigger ID. + */ + async fetchByComposioTriggerId(triggerId: string): Promise | null> { + const result = await this.collection.findOne({ triggerId }); + + if (!result) { + return null; + } + + const { _id, ...rest } = result; + + return { + ...rest, + id: _id.toString(), + }; + } + /** * Deletes a Composio trigger deployment by its ID. */ @@ -144,37 +162,6 @@ export class MongodbComposioTriggerDeploymentsRepository implements IComposioTri }; } - /** - * Retrieves all trigger deployments for a specific trigger with pagination. - */ - async listByTriggerId(triggerId: string, cursor?: string, limit: number = 50): Promise>>> { - const query: any = { triggerId }; - - if (cursor) { - query._id = { $gt: new ObjectId(cursor) }; - } - - const results = await this.collection - .find(query) - .sort({ _id: 1 }) - .limit(limit + 1) // Fetch one extra to determine if there's a next page - .toArray(); - - const hasNextPage = results.length > limit; - const items = results.slice(0, limit).map(doc => { - const { _id, ...rest } = doc; - return { - ...rest, - id: _id.toString(), - }; - }); - - return { - items, - nextCursor: hasNextPage ? results[limit - 1]._id.toString() : null, - }; - } - /** * Deletes all trigger deployments associated with a specific connected account. */