billing for composio triggers

This commit is contained in:
Ramnique Singh 2025-08-14 22:15:21 +05:30
parent bf29af3083
commit 9f5d8f08de
5 changed files with 99 additions and 90 deletions

View file

@ -6,6 +6,7 @@ export const UsageTypeKey = z.enum([
"LLM_USAGE", "LLM_USAGE",
"EMBEDDING_MODEL_USAGE", "EMBEDDING_MODEL_USAGE",
"COMPOSIO_TOOL_USAGE", "COMPOSIO_TOOL_USAGE",
"COMPOSIO_TRIGGER_USAGE",
"FIRECRAWL_SCRAPE_USAGE", "FIRECRAWL_SCRAPE_USAGE",
]); ]);
@ -30,6 +31,12 @@ export const ComposioToolUsage = z.object({
context: z.string(), context: z.string(),
}); });
export const ComposioTriggerUsage = z.object({
type: z.literal(UsageTypeKey.Enum.COMPOSIO_TRIGGER_USAGE),
triggerSlug: z.string(),
context: z.string(),
});
export const FirecrawlScrapeUsage = z.object({ export const FirecrawlScrapeUsage = z.object({
type: z.literal(UsageTypeKey.Enum.FIRECRAWL_SCRAPE_USAGE), type: z.literal(UsageTypeKey.Enum.FIRECRAWL_SCRAPE_USAGE),
context: z.string(), context: z.string(),
@ -39,6 +46,7 @@ export const UsageItem = z.discriminatedUnion("type", [
LLMUsage, LLMUsage,
EmbeddingModelUsage, EmbeddingModelUsage,
ComposioToolUsage, ComposioToolUsage,
ComposioTriggerUsage,
FirecrawlScrapeUsage, FirecrawlScrapeUsage,
]); ]);

View file

@ -42,6 +42,14 @@ export interface IComposioTriggerDeploymentsRepository {
* @returns Promise resolving to the deployment if found, null if not found * @returns Promise resolving to the deployment if found, null if not found
*/ */
fetch(id: string): Promise<z.infer<typeof ComposioTriggerDeployment> | null>; fetch(id: string): Promise<z.infer<typeof ComposioTriggerDeployment> | null>;
/**
* Fetches a trigger deployment by its Composio trigger ID.
*
* @param triggerId - The unique identifier of the Composio trigger
* @returns Promise resolving to the deployment if found, null if not found
*/
fetchByComposioTriggerId(triggerId: string): Promise<z.infer<typeof ComposioTriggerDeployment> | null>;
/** /**
* Deletes a Composio trigger deployment by its ID. * Deletes a Composio trigger deployment by its ID.
@ -70,16 +78,6 @@ export interface IComposioTriggerDeploymentsRepository {
*/ */
listByProjectId(projectId: string, cursor?: string, limit?: number): Promise<z.infer<ReturnType<typeof PaginatedList<typeof ComposioTriggerDeployment>>>>; listByProjectId(projectId: string, cursor?: string, limit?: number): Promise<z.infer<ReturnType<typeof PaginatedList<typeof ComposioTriggerDeployment>>>>;
/**
* Retrieves all trigger deployments for a specific trigger.
*
* @param triggerId - The identifier of the trigger
* @param cursor - Optional cursor for pagination
* @param limit - Optional limit for the number of items to return
* @returns Promise resolving to a paginated list of deployments for the specified trigger
*/
listByTriggerId(triggerId: string, cursor?: string, limit?: number): Promise<z.infer<ReturnType<typeof PaginatedList<typeof ComposioTriggerDeployment>>>>;
/** /**
* Deletes all trigger deployments associated with a specific connected account. * Deletes all trigger deployments associated with a specific connected account.
* *

View file

@ -2,11 +2,14 @@ import { IJobsRepository } from "@/src/application/repositories/jobs.repository.
import { IComposioTriggerDeploymentsRepository } from "@/src/application/repositories/composio-trigger-deployments.repository.interface"; import { IComposioTriggerDeploymentsRepository } from "@/src/application/repositories/composio-trigger-deployments.repository.interface";
import { createHmac, timingSafeEqual } from "crypto"; import { createHmac, timingSafeEqual } from "crypto";
import { z } from "zod"; import { z } from "zod";
import { BadRequestError } from "@/src/entities/errors/common"; import { BadRequestError, BillingError, NotFoundError } from "@/src/entities/errors/common";
import { UserMessage } from "@/app/lib/types/types"; import { UserMessage } from "@/app/lib/types/types";
import { PrefixLogger } from "@/app/lib/utils"; import { PrefixLogger } from "@/app/lib/utils";
import { IProjectsRepository } from "@/src/application/repositories/projects.repository.interface"; import { IProjectsRepository } from "@/src/application/repositories/projects.repository.interface";
import { IPubSubService } from "@/src/application/services/pub-sub.service.interface"; import { IPubSubService } from "@/src/application/services/pub-sub.service.interface";
import { authorize, logUsage } from "@/app/lib/billing";
import { getCustomerIdForProject } from "@/app/lib/billing";
import { USE_BILLING } from "@/app/lib/feature_flags";
const WEBHOOK_SECRET = process.env.COMPOSIO_TRIGGERS_WEBHOOK_SECRET || "test"; const WEBHOOK_SECRET = process.env.COMPOSIO_TRIGGERS_WEBHOOK_SECRET || "test";
@ -91,60 +94,70 @@ export class HandleCompsioWebhookRequestUseCase implements IHandleCompsioWebhook
const logger = new PrefixLogger(`composio-trigger-webhook-[${event.type}]-[${event.data.trigger_nano_id}]`); const logger = new PrefixLogger(`composio-trigger-webhook-[${event.type}]-[${event.data.trigger_nano_id}]`);
// create a job for each deployment across all pages // fetch trigger deployment data from db
const msg: z.infer<typeof UserMessage> = { const deployment = await this.composioTriggerDeploymentsRepository.fetchByComposioTriggerId(event.data.trigger_nano_id);
role: "user", if (!deployment) {
content: `This chat is being invoked through a trigger. Here is the trigger data:\n\n${JSON.stringify(event, null, 2)}`, throw new BadRequestError("Trigger not found");
}; }
// fetch registered trigger deployments for this event type const { projectId } = deployment;
let cursor: string | null = null;
let jobs = 0;
do {
const triggerDeployments = await this.composioTriggerDeploymentsRepository.listByTriggerId(event.data.trigger_nano_id, cursor || undefined);
// create a job for each deployment in the current page // Check billing auth
for (const deployment of triggerDeployments.items) { if (USE_BILLING) {
// fetch project // get billing customer id for project
const project = await this.projectsRepository.fetch(deployment.projectId); const billingCustomerId = await getCustomerIdForProject(projectId);
if (!project) {
logger.log(`Project ${deployment.projectId} not found`);
continue;
}
// ensure workflow // validate enough credits
if (!project.liveWorkflow) { const result = await authorize(billingCustomerId, {
logger.log(`Project ${deployment.projectId} has no live workflow`); type: "use_credits"
continue; });
} if (!result.success) {
throw new BillingError("Not enough credits");
// create job
const job = await this.jobsRepository.create({
reason: {
type: "composio_trigger",
triggerId: event.data.trigger_nano_id,
triggerDeploymentId: deployment.id,
triggerTypeSlug: deployment.triggerTypeSlug,
payload: event,
},
projectId: deployment.projectId,
input: {
messages: [msg],
},
});
// notify workers
await this.pubSubService.publish('new_jobs', job.id);
jobs++;
logger.log(`Created job ${job.id} for trigger deployment ${deployment.id}`);
} }
// check if there are more pages // log usage for composio trigger
cursor = triggerDeployments.nextCursor; await logUsage(billingCustomerId, {
} while (cursor); items: [{
type: "COMPOSIO_TRIGGER_USAGE",
triggerSlug: deployment.triggerTypeSlug,
context: "trigger.composio",
}],
});
}
logger.log(`Created ${jobs} jobs for trigger ${event.data.trigger_nano_id}`); // fetch project
const project = await this.projectsRepository.fetch(deployment.projectId);
if (!project) {
throw new NotFoundError("Project not found");
}
// ensure workflow
if (!project.liveWorkflow) {
throw new BadRequestError("Project has no live workflow");
}
// create job
const job = await this.jobsRepository.create({
reason: {
type: "composio_trigger",
triggerId: event.data.trigger_nano_id,
triggerDeploymentId: deployment.id,
triggerTypeSlug: deployment.triggerTypeSlug,
payload: event,
},
projectId: deployment.projectId,
input: {
messages: [{
role: "user",
content: `This chat is being invoked through a trigger. Here is the trigger data:\n\n${JSON.stringify(event, null, 2)}`,
}],
},
});
// notify workers
await this.pubSubService.publish('new_jobs', job.id);
logger.log(`Created job ${job.id} for trigger deployment ${deployment.id}`);
} }
private verifySignature(headers: Record<string, string>, payload: string): void { private verifySignature(headers: Record<string, string>, payload: string): void {

View file

@ -94,6 +94,9 @@ export class JobsWorker implements IJobsWorker {
logger.log(`Received event: ${event.type}`); logger.log(`Received event: ${event.type}`);
if (event.type === "done") { if (event.type === "done") {
turn = event.turn; turn = event.turn;
} else if (event.type === "error") {
logger.log(`Error: ${event.error}`);
throw new Error(event.error);
} }
} }
if (!turn) { if (!turn) {

View file

@ -81,6 +81,24 @@ export class MongodbComposioTriggerDeploymentsRepository implements IComposioTri
}; };
} }
/**
* Fetches a trigger deployment by its Composio trigger ID.
*/
async fetchByComposioTriggerId(triggerId: string): Promise<z.infer<typeof ComposioTriggerDeployment> | null> {
const result = await this.collection.findOne({ triggerId });
if (!result) {
return null;
}
const { _id, ...rest } = result;
return {
...rest,
id: _id.toString(),
};
}
/** /**
* Deletes a Composio trigger deployment by its ID. * Deletes a Composio trigger deployment by its ID.
*/ */
@ -144,37 +162,6 @@ export class MongodbComposioTriggerDeploymentsRepository implements IComposioTri
}; };
} }
/**
* Retrieves all trigger deployments for a specific trigger with pagination.
*/
async listByTriggerId(triggerId: string, cursor?: string, limit: number = 50): Promise<z.infer<ReturnType<typeof PaginatedList<typeof ComposioTriggerDeployment>>>> {
const query: any = { triggerId };
if (cursor) {
query._id = { $gt: new ObjectId(cursor) };
}
const results = await this.collection
.find(query)
.sort({ _id: 1 })
.limit(limit + 1) // Fetch one extra to determine if there's a next page
.toArray();
const hasNextPage = results.length > limit;
const items = results.slice(0, limit).map(doc => {
const { _id, ...rest } = doc;
return {
...rest,
id: _id.toString(),
};
});
return {
items,
nextCursor: hasNextPage ? results[limit - 1]._id.toString() : null,
};
}
/** /**
* Deletes all trigger deployments associated with a specific connected account. * Deletes all trigger deployments associated with a specific connected account.
*/ */