diff --git a/apps/rowboat/app/actions/copilot.actions.ts b/apps/rowboat/app/actions/copilot.actions.ts index 5dc6c897..5712e0af 100644 --- a/apps/rowboat/app/actions/copilot.actions.ts +++ b/apps/rowboat/app/actions/copilot.actions.ts @@ -28,7 +28,7 @@ export async function getCopilotResponseStream( streamId: string; } | { billingError: string }> { await projectAuthCheck(projectId); - await usageQuotaPolicy.assertAndConsume(projectId); + await usageQuotaPolicy.assertAndConsumeProjectAction(projectId); // Check billing authorization const authResponse = await authorizeUserAction({ @@ -38,7 +38,7 @@ export async function getCopilotResponseStream( return { billingError: authResponse.error || 'Billing error' }; } - await usageQuotaPolicy.assertAndConsume(projectId); + await usageQuotaPolicy.assertAndConsumeProjectAction(projectId); // prepare request const request: z.infer = { @@ -70,7 +70,7 @@ export async function getCopilotAgentInstructions( agentName: string, ): Promise { await projectAuthCheck(projectId); - await usageQuotaPolicy.assertAndConsume(projectId); + await usageQuotaPolicy.assertAndConsumeProjectAction(projectId); // Check billing authorization const authResponse = await authorizeUserAction({ diff --git a/apps/rowboat/src/application/lib/utils/time-to-next-minute.ts b/apps/rowboat/src/application/lib/utils/time-to-next-minute.ts index b5996681..808cbe99 100644 --- a/apps/rowboat/src/application/lib/utils/time-to-next-minute.ts +++ b/apps/rowboat/src/application/lib/utils/time-to-next-minute.ts @@ -3,4 +3,10 @@ export function secondsToNextMinute(): number { const now = new Date(); const secondsUntilNextMinute = 60 - now.getSeconds(); return secondsUntilNextMinute; +} + +export function minutesToNextHour(): number { + const now = new Date(); + const minutesUntilNextHour = 60 - now.getMinutes(); + return minutesUntilNextHour; } \ No newline at end of file diff --git a/apps/rowboat/src/application/policies/usage-quota.policy.interface.ts b/apps/rowboat/src/application/policies/usage-quota.policy.interface.ts index 20618588..4e311d63 100644 --- a/apps/rowboat/src/application/policies/usage-quota.policy.interface.ts +++ b/apps/rowboat/src/application/policies/usage-quota.policy.interface.ts @@ -1,4 +1,21 @@ +import { QuotaExceededError } from "@/src/entities/errors/common"; + export interface IUsageQuotaPolicy { - // this method will throw a QuotaExceededError if the quota is exceeded - assertAndConsume(projectId: string): Promise; + /** + * Asserts that the project has not exceeded its usage quota and consumes the action. + * Used for general project actions. + * + * @param projectId - The ID of the project to assert and consume. + * @throws QuotaExceededError if the quota is exceeded. + */ + assertAndConsumeProjectAction(projectId: string): Promise; + + + /** + * Asserts that the project has not exceeded its usage quota for running jobs. + * + * @param projectId - The ID of the project to assert and consume. + * @throws QuotaExceededError if the quota is exceeded. + */ + assertAndConsumeRunJobAction(projectId: string): Promise; } \ No newline at end of file diff --git a/apps/rowboat/src/application/use-cases/composio-trigger-deployments/create-composio-trigger-deployment.use-case.ts b/apps/rowboat/src/application/use-cases/composio-trigger-deployments/create-composio-trigger-deployment.use-case.ts index fdd3bbfc..1b6c3786 100644 --- a/apps/rowboat/src/application/use-cases/composio-trigger-deployments/create-composio-trigger-deployment.use-case.ts +++ b/apps/rowboat/src/application/use-cases/composio-trigger-deployments/create-composio-trigger-deployment.use-case.ts @@ -59,7 +59,7 @@ export class CreateComposioTriggerDeploymentUseCase implements ICreateComposioTr }); // assert and consume quota - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); // get trigger type info const triggerType = await getTriggersType(request.data.triggerTypeSlug); diff --git a/apps/rowboat/src/application/use-cases/composio-trigger-deployments/delete-composio-trigger-deployment.use-case.ts b/apps/rowboat/src/application/use-cases/composio-trigger-deployments/delete-composio-trigger-deployment.use-case.ts index f3d998f2..26373815 100644 --- a/apps/rowboat/src/application/use-cases/composio-trigger-deployments/delete-composio-trigger-deployment.use-case.ts +++ b/apps/rowboat/src/application/use-cases/composio-trigger-deployments/delete-composio-trigger-deployment.use-case.ts @@ -54,7 +54,7 @@ export class DeleteComposioTriggerDeploymentUseCase implements IDeleteComposioTr }); // assert and consume quota - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); // ensure deployment belongs to this project const deployment = await this.composioTriggerDeploymentsRepository.fetch(request.deploymentId); diff --git a/apps/rowboat/src/application/use-cases/composio-trigger-deployments/fetch-composio-trigger-deployment.use-case.ts b/apps/rowboat/src/application/use-cases/composio-trigger-deployments/fetch-composio-trigger-deployment.use-case.ts index ed0be2de..32aa7909 100644 --- a/apps/rowboat/src/application/use-cases/composio-trigger-deployments/fetch-composio-trigger-deployment.use-case.ts +++ b/apps/rowboat/src/application/use-cases/composio-trigger-deployments/fetch-composio-trigger-deployment.use-case.ts @@ -53,7 +53,7 @@ export class FetchComposioTriggerDeploymentUseCase implements IFetchComposioTrig }); // assert and consume quota - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); return deployment; } diff --git a/apps/rowboat/src/application/use-cases/composio-trigger-deployments/list-composio-trigger-deployments.use-case.ts b/apps/rowboat/src/application/use-cases/composio-trigger-deployments/list-composio-trigger-deployments.use-case.ts index d233b096..f915e187 100644 --- a/apps/rowboat/src/application/use-cases/composio-trigger-deployments/list-composio-trigger-deployments.use-case.ts +++ b/apps/rowboat/src/application/use-cases/composio-trigger-deployments/list-composio-trigger-deployments.use-case.ts @@ -51,7 +51,7 @@ export class ListComposioTriggerDeploymentsUseCase implements IListComposioTrigg }); // assert and consume quota - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); // fetch deployments for project return await this.composioTriggerDeploymentsRepository.listByProjectId(projectId, request.cursor, limit); diff --git a/apps/rowboat/src/application/use-cases/conversations/create-cached-turn.use-case.ts b/apps/rowboat/src/application/use-cases/conversations/create-cached-turn.use-case.ts index 3d7bdb3b..c122e7f4 100644 --- a/apps/rowboat/src/application/use-cases/conversations/create-cached-turn.use-case.ts +++ b/apps/rowboat/src/application/use-cases/conversations/create-cached-turn.use-case.ts @@ -61,7 +61,7 @@ export class CreateCachedTurnUseCase implements ICreateCachedTurnUseCase { }); // assert and consume quota - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); // create cache entry const key = nanoid(); diff --git a/apps/rowboat/src/application/use-cases/conversations/create-conversation.use-case.ts b/apps/rowboat/src/application/use-cases/conversations/create-conversation.use-case.ts index e3ac496a..d73ac340 100644 --- a/apps/rowboat/src/application/use-cases/conversations/create-conversation.use-case.ts +++ b/apps/rowboat/src/application/use-cases/conversations/create-conversation.use-case.ts @@ -61,7 +61,7 @@ export class CreateConversationUseCase implements ICreateConversationUseCase { } // assert and consume quota - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); // if workflow is not provided, fetch workflow if (!workflow) { diff --git a/apps/rowboat/src/application/use-cases/conversations/fetch-cached-turn.use-case.ts b/apps/rowboat/src/application/use-cases/conversations/fetch-cached-turn.use-case.ts index 273fdab3..3154f6f7 100644 --- a/apps/rowboat/src/application/use-cases/conversations/fetch-cached-turn.use-case.ts +++ b/apps/rowboat/src/application/use-cases/conversations/fetch-cached-turn.use-case.ts @@ -68,7 +68,7 @@ export class FetchCachedTurnUseCase implements IFetchCachedTurnUseCase { }); // assert and consume quota - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); // delete from cache await this.cacheService.delete(`turn-${data.key}`); diff --git a/apps/rowboat/src/application/use-cases/conversations/fetch-conversation.use-case.ts b/apps/rowboat/src/application/use-cases/conversations/fetch-conversation.use-case.ts index b6a5ed26..0e1f5c01 100644 --- a/apps/rowboat/src/application/use-cases/conversations/fetch-conversation.use-case.ts +++ b/apps/rowboat/src/application/use-cases/conversations/fetch-conversation.use-case.ts @@ -54,7 +54,7 @@ export class FetchConversationUseCase implements IFetchConversationUseCase { }); // assert and consume quota - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); // return the conversation return conversation; diff --git a/apps/rowboat/src/application/use-cases/conversations/list-conversations.use-case.ts b/apps/rowboat/src/application/use-cases/conversations/list-conversations.use-case.ts index d3b69afa..7891cc71 100644 --- a/apps/rowboat/src/application/use-cases/conversations/list-conversations.use-case.ts +++ b/apps/rowboat/src/application/use-cases/conversations/list-conversations.use-case.ts @@ -51,7 +51,7 @@ export class ListConversationsUseCase implements IListConversationsUseCase { }); // assert and consume quota - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); // fetch conversations for project return await this.conversationsRepository.list(projectId, request.cursor, limit); diff --git a/apps/rowboat/src/application/use-cases/conversations/run-conversation-turn.use-case.ts b/apps/rowboat/src/application/use-cases/conversations/run-conversation-turn.use-case.ts index d8b05aed..635a960d 100644 --- a/apps/rowboat/src/application/use-cases/conversations/run-conversation-turn.use-case.ts +++ b/apps/rowboat/src/application/use-cases/conversations/run-conversation-turn.use-case.ts @@ -62,7 +62,7 @@ export class RunConversationTurnUseCase implements IRunConversationTurnUseCase { } // assert and consume quota - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); // Check billing auth let billingCustomerId: string | null = null; diff --git a/apps/rowboat/src/application/use-cases/data-sources/add-docs-to-data-source.use-case.ts b/apps/rowboat/src/application/use-cases/data-sources/add-docs-to-data-source.use-case.ts index be0cf3f3..2d732788 100644 --- a/apps/rowboat/src/application/use-cases/data-sources/add-docs-to-data-source.use-case.ts +++ b/apps/rowboat/src/application/use-cases/data-sources/add-docs-to-data-source.use-case.ts @@ -55,7 +55,7 @@ export class AddDocsToDataSourceUseCase implements IAddDocsToDataSourceUseCase { projectId: source.projectId, }); - await this.usageQuotaPolicy.assertAndConsume(source.projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(source.projectId); await this.dataSourceDocsRepository.bulkCreate(source.projectId, sourceId, docs); diff --git a/apps/rowboat/src/application/use-cases/data-sources/create-data-source.use-case.ts b/apps/rowboat/src/application/use-cases/data-sources/create-data-source.use-case.ts index 3c7c4dbb..7d5e7110 100644 --- a/apps/rowboat/src/application/use-cases/data-sources/create-data-source.use-case.ts +++ b/apps/rowboat/src/application/use-cases/data-sources/create-data-source.use-case.ts @@ -44,7 +44,7 @@ export class CreateDataSourceUseCase implements ICreateDataSourceUseCase { projectId, }); - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); let _status = "pending"; // Only set status for non-file data sources diff --git a/apps/rowboat/src/application/use-cases/data-sources/delete-data-source.use-case.ts b/apps/rowboat/src/application/use-cases/data-sources/delete-data-source.use-case.ts index b09c38b0..9ef47942 100644 --- a/apps/rowboat/src/application/use-cases/data-sources/delete-data-source.use-case.ts +++ b/apps/rowboat/src/application/use-cases/data-sources/delete-data-source.use-case.ts @@ -49,7 +49,7 @@ export class DeleteDataSourceUseCase implements IDeleteDataSourceUseCase { projectId, }); - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); await this.dataSourcesRepository.update(request.sourceId, { status: 'deleted', diff --git a/apps/rowboat/src/application/use-cases/data-sources/delete-doc-from-data-source.use-case.ts b/apps/rowboat/src/application/use-cases/data-sources/delete-doc-from-data-source.use-case.ts index f879f62d..7490a702 100644 --- a/apps/rowboat/src/application/use-cases/data-sources/delete-doc-from-data-source.use-case.ts +++ b/apps/rowboat/src/application/use-cases/data-sources/delete-doc-from-data-source.use-case.ts @@ -54,7 +54,7 @@ export class DeleteDocFromDataSourceUseCase implements IDeleteDocFromDataSourceU projectId: doc.projectId, }); - await this.usageQuotaPolicy.assertAndConsume(doc.projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(doc.projectId); await this.dataSourceDocsRepository.markAsDeleted(docId); diff --git a/apps/rowboat/src/application/use-cases/data-sources/fetch-data-source.use-case.ts b/apps/rowboat/src/application/use-cases/data-sources/fetch-data-source.use-case.ts index f3d5f0a2..19b6a311 100644 --- a/apps/rowboat/src/application/use-cases/data-sources/fetch-data-source.use-case.ts +++ b/apps/rowboat/src/application/use-cases/data-sources/fetch-data-source.use-case.ts @@ -50,7 +50,7 @@ export class FetchDataSourceUseCase implements IFetchDataSourceUseCase { projectId, }); - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); return source; } diff --git a/apps/rowboat/src/application/use-cases/data-sources/get-download-url-for-file.use-case.ts b/apps/rowboat/src/application/use-cases/data-sources/get-download-url-for-file.use-case.ts index f3da5d6e..e45dabfe 100644 --- a/apps/rowboat/src/application/use-cases/data-sources/get-download-url-for-file.use-case.ts +++ b/apps/rowboat/src/application/use-cases/data-sources/get-download-url-for-file.use-case.ts @@ -58,7 +58,7 @@ export class GetDownloadUrlForFileUseCase implements IGetDownloadUrlForFileUseCa projectId: file.projectId, }); - await this.usageQuotaPolicy.assertAndConsume(file.projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(file.projectId); if (file.data.type === 'file_local') { // use the file id instead of path here diff --git a/apps/rowboat/src/application/use-cases/data-sources/get-upload-urls-for-files.use-case.ts b/apps/rowboat/src/application/use-cases/data-sources/get-upload-urls-for-files.use-case.ts index d044d85a..348fa7bd 100644 --- a/apps/rowboat/src/application/use-cases/data-sources/get-upload-urls-for-files.use-case.ts +++ b/apps/rowboat/src/application/use-cases/data-sources/get-upload-urls-for-files.use-case.ts @@ -60,7 +60,7 @@ export class GetUploadUrlsForFilesUseCase implements IGetUploadUrlsForFilesUseCa projectId: source.projectId, }); - await this.usageQuotaPolicy.assertAndConsume(source.projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(source.projectId); const urls: { fileId: string, uploadUrl: string, path: string }[] = []; for (const file of files) { diff --git a/apps/rowboat/src/application/use-cases/data-sources/list-data-sources.use-case.ts b/apps/rowboat/src/application/use-cases/data-sources/list-data-sources.use-case.ts index 851266a0..99f70eae 100644 --- a/apps/rowboat/src/application/use-cases/data-sources/list-data-sources.use-case.ts +++ b/apps/rowboat/src/application/use-cases/data-sources/list-data-sources.use-case.ts @@ -44,7 +44,7 @@ export class ListDataSourcesUseCase implements IListDataSourcesUseCase { projectId, }); - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); // list all sources for now const sources = []; diff --git a/apps/rowboat/src/application/use-cases/data-sources/list-docs-in-data-source.use-case.ts b/apps/rowboat/src/application/use-cases/data-sources/list-docs-in-data-source.use-case.ts index 7a1eb9e4..94c66877 100644 --- a/apps/rowboat/src/application/use-cases/data-sources/list-docs-in-data-source.use-case.ts +++ b/apps/rowboat/src/application/use-cases/data-sources/list-docs-in-data-source.use-case.ts @@ -55,7 +55,7 @@ export class ListDocsInDataSourceUseCase implements IListDocsInDataSourceUseCase projectId: source.projectId, }); - await this.usageQuotaPolicy.assertAndConsume(source.projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(source.projectId); // fetch all docs const docs = []; diff --git a/apps/rowboat/src/application/use-cases/data-sources/recrawl-web-data-source.use-case.ts b/apps/rowboat/src/application/use-cases/data-sources/recrawl-web-data-source.use-case.ts index 6ebf11dc..d9572034 100644 --- a/apps/rowboat/src/application/use-cases/data-sources/recrawl-web-data-source.use-case.ts +++ b/apps/rowboat/src/application/use-cases/data-sources/recrawl-web-data-source.use-case.ts @@ -58,7 +58,7 @@ export class RecrawlWebDataSourceUseCase implements IRecrawlWebDataSourceUseCase projectId, }); - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); await this.dataSourceDocsRepository.markSourceDocsPending(request.sourceId); diff --git a/apps/rowboat/src/application/use-cases/data-sources/toggle-data-source.use-case.ts b/apps/rowboat/src/application/use-cases/data-sources/toggle-data-source.use-case.ts index 47f141a2..cafc0fbc 100644 --- a/apps/rowboat/src/application/use-cases/data-sources/toggle-data-source.use-case.ts +++ b/apps/rowboat/src/application/use-cases/data-sources/toggle-data-source.use-case.ts @@ -51,7 +51,7 @@ export class ToggleDataSourceUseCase implements IToggleDataSourceUseCase { projectId, }); - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); return await this.dataSourcesRepository.update(request.sourceId, { active: request.active }); } diff --git a/apps/rowboat/src/application/use-cases/data-sources/update-data-source.use-case.ts b/apps/rowboat/src/application/use-cases/data-sources/update-data-source.use-case.ts index abed87fd..fa4c08e2 100644 --- a/apps/rowboat/src/application/use-cases/data-sources/update-data-source.use-case.ts +++ b/apps/rowboat/src/application/use-cases/data-sources/update-data-source.use-case.ts @@ -55,7 +55,7 @@ export class UpdateDataSourceUseCase implements IUpdateDataSourceUseCase { projectId, }); - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); return await this.dataSourcesRepository.update(request.sourceId, request.data, true); } diff --git a/apps/rowboat/src/application/use-cases/jobs/fetch-job.use-case.ts b/apps/rowboat/src/application/use-cases/jobs/fetch-job.use-case.ts index 0156bc64..e18627fe 100644 --- a/apps/rowboat/src/application/use-cases/jobs/fetch-job.use-case.ts +++ b/apps/rowboat/src/application/use-cases/jobs/fetch-job.use-case.ts @@ -54,7 +54,7 @@ export class FetchJobUseCase implements IFetchJobUseCase { }); // assert and consume quota - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); // return the job return job; diff --git a/apps/rowboat/src/application/use-cases/jobs/list-jobs.use-case.ts b/apps/rowboat/src/application/use-cases/jobs/list-jobs.use-case.ts index 95d4474a..d91adeae 100644 --- a/apps/rowboat/src/application/use-cases/jobs/list-jobs.use-case.ts +++ b/apps/rowboat/src/application/use-cases/jobs/list-jobs.use-case.ts @@ -52,7 +52,7 @@ export class ListJobsUseCase implements IListJobsUseCase { }); // assert and consume quota - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); // fetch jobs for project return await this.jobsRepository.list(projectId, request.filters, request.cursor, limit); diff --git a/apps/rowboat/src/application/use-cases/projects/add-custom-mcp-server.use-case.ts b/apps/rowboat/src/application/use-cases/projects/add-custom-mcp-server.use-case.ts index dd4669ac..fc5855c3 100644 --- a/apps/rowboat/src/application/use-cases/projects/add-custom-mcp-server.use-case.ts +++ b/apps/rowboat/src/application/use-cases/projects/add-custom-mcp-server.use-case.ts @@ -48,7 +48,7 @@ export class AddCustomMcpServerUseCase implements IAddCustomMcpServerUseCase { const { caller, userId, apiKey, projectId, name } = request; await this.projectActionAuthorizationPolicy.authorize({ caller, userId, apiKey, projectId }); - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); // Validate server URL const serverUrl = validateHttpHttpsUrl(request.server.serverUrl); diff --git a/apps/rowboat/src/application/use-cases/projects/create-composio-managed-connected-account.use-case.ts b/apps/rowboat/src/application/use-cases/projects/create-composio-managed-connected-account.use-case.ts index 4536c8fd..c53b7aa9 100644 --- a/apps/rowboat/src/application/use-cases/projects/create-composio-managed-connected-account.use-case.ts +++ b/apps/rowboat/src/application/use-cases/projects/create-composio-managed-connected-account.use-case.ts @@ -44,7 +44,7 @@ export class CreateComposioManagedConnectedAccountUseCase implements ICreateComp const { caller, userId, apiKey, projectId, toolkitSlug, callbackUrl } = request; await this.projectActionAuthorizationPolicy.authorize({ caller, userId, apiKey, projectId }); - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); // fetch managed auth configs const configs = await listAuthConfigs(toolkitSlug, null, true); diff --git a/apps/rowboat/src/application/use-cases/projects/create-custom-connected-account.use-case.ts b/apps/rowboat/src/application/use-cases/projects/create-custom-connected-account.use-case.ts index da3110fa..c547a1e9 100644 --- a/apps/rowboat/src/application/use-cases/projects/create-custom-connected-account.use-case.ts +++ b/apps/rowboat/src/application/use-cases/projects/create-custom-connected-account.use-case.ts @@ -50,7 +50,7 @@ export class CreateCustomConnectedAccountUseCase implements ICreateCustomConnect const { caller, userId, apiKey, projectId, toolkitSlug, authConfig, callbackUrl } = request; await this.projectActionAuthorizationPolicy.authorize({ caller, userId, apiKey, projectId }); - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); // create custom auth config const created: z.infer = await createAuthConfig({ diff --git a/apps/rowboat/src/application/use-cases/projects/create-project.use-case.ts b/apps/rowboat/src/application/use-cases/projects/create-project.use-case.ts index 260139cd..c439c788 100644 --- a/apps/rowboat/src/application/use-cases/projects/create-project.use-case.ts +++ b/apps/rowboat/src/application/use-cases/projects/create-project.use-case.ts @@ -113,7 +113,7 @@ export class CreateProjectUseCase implements ICreateProjectUseCase { }); // assert and consume quota - await this.usageQuotaPolicy.assertAndConsume(project.id); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(project.id); return project; } diff --git a/apps/rowboat/src/application/use-cases/projects/delete-composio-connected-account.use-case.ts b/apps/rowboat/src/application/use-cases/projects/delete-composio-connected-account.use-case.ts index cdc36f22..56cce560 100644 --- a/apps/rowboat/src/application/use-cases/projects/delete-composio-connected-account.use-case.ts +++ b/apps/rowboat/src/application/use-cases/projects/delete-composio-connected-account.use-case.ts @@ -56,7 +56,7 @@ export class DeleteComposioConnectedAccountUseCase implements IDeleteComposioCon }); // assert and consume quota - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); // fetch project const project = await this.projectsRepository.fetch(projectId); diff --git a/apps/rowboat/src/application/use-cases/projects/fetch-project.use-case.ts b/apps/rowboat/src/application/use-cases/projects/fetch-project.use-case.ts index 7773c38f..03b292c3 100644 --- a/apps/rowboat/src/application/use-cases/projects/fetch-project.use-case.ts +++ b/apps/rowboat/src/application/use-cases/projects/fetch-project.use-case.ts @@ -52,7 +52,7 @@ export class FetchProjectUseCase implements IFetchProjectUseCase { }); // assert and consume quota - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); return await this.projectsRepository.fetch(projectId); } diff --git a/apps/rowboat/src/application/use-cases/projects/get-composio-toolkit.use-case.ts b/apps/rowboat/src/application/use-cases/projects/get-composio-toolkit.use-case.ts index b7611069..a2d2d9a6 100644 --- a/apps/rowboat/src/application/use-cases/projects/get-composio-toolkit.use-case.ts +++ b/apps/rowboat/src/application/use-cases/projects/get-composio-toolkit.use-case.ts @@ -28,7 +28,7 @@ export class GetComposioToolkitUseCase implements IGetComposioToolkitUseCase { async execute(request: z.infer): Promise> { const { caller, userId, apiKey, projectId, toolkitSlug } = request; await this.projectActionAuthorizationPolicy.authorize({ caller, userId, apiKey, projectId }); - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); return await getToolkit(toolkitSlug); } } diff --git a/apps/rowboat/src/application/use-cases/projects/list-composio-toolkits.use-case.ts b/apps/rowboat/src/application/use-cases/projects/list-composio-toolkits.use-case.ts index becf43d7..b952f342 100644 --- a/apps/rowboat/src/application/use-cases/projects/list-composio-toolkits.use-case.ts +++ b/apps/rowboat/src/application/use-cases/projects/list-composio-toolkits.use-case.ts @@ -29,7 +29,7 @@ export class ListComposioToolkitsUseCase implements IListComposioToolkitsUseCase async execute(request: z.infer): Promise>>> { const { caller, userId, apiKey, projectId, cursor } = request; await this.projectActionAuthorizationPolicy.authorize({ caller, userId, apiKey, projectId }); - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); return await listToolkits(cursor ?? null); } } diff --git a/apps/rowboat/src/application/use-cases/projects/list-composio-tools.use-case.ts b/apps/rowboat/src/application/use-cases/projects/list-composio-tools.use-case.ts index 3a53992c..04278aa9 100644 --- a/apps/rowboat/src/application/use-cases/projects/list-composio-tools.use-case.ts +++ b/apps/rowboat/src/application/use-cases/projects/list-composio-tools.use-case.ts @@ -31,7 +31,7 @@ export class ListComposioToolsUseCase implements IListComposioToolsUseCase { async execute(request: z.infer): Promise>>> { const { caller, userId, apiKey, projectId, toolkitSlug, searchQuery, cursor } = request; await this.projectActionAuthorizationPolicy.authorize({ caller, userId, apiKey, projectId }); - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); return await listTools(toolkitSlug, searchQuery ?? null, cursor ?? null); } } diff --git a/apps/rowboat/src/application/use-cases/projects/remove-custom-mcp-server.use-case.ts b/apps/rowboat/src/application/use-cases/projects/remove-custom-mcp-server.use-case.ts index 6a588045..74455fbb 100644 --- a/apps/rowboat/src/application/use-cases/projects/remove-custom-mcp-server.use-case.ts +++ b/apps/rowboat/src/application/use-cases/projects/remove-custom-mcp-server.use-case.ts @@ -37,7 +37,7 @@ export class RemoveCustomMcpServerUseCase implements IRemoveCustomMcpServerUseCa async execute(request: z.infer): Promise { const { caller, userId, apiKey, projectId, name } = request; await this.projectActionAuthorizationPolicy.authorize({ caller, userId, apiKey, projectId }); - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); await this.projectsRepository.deleteCustomMcpServer(projectId, name); } } diff --git a/apps/rowboat/src/application/use-cases/projects/revert-to-live-workflow.use-case.ts b/apps/rowboat/src/application/use-cases/projects/revert-to-live-workflow.use-case.ts index 150dc5f8..a8d0aae4 100644 --- a/apps/rowboat/src/application/use-cases/projects/revert-to-live-workflow.use-case.ts +++ b/apps/rowboat/src/application/use-cases/projects/revert-to-live-workflow.use-case.ts @@ -42,7 +42,7 @@ export class RevertToLiveWorkflowUseCase implements IRevertToLiveWorkflowUseCase apiKey: request.apiKey, projectId, }); - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); const project = await this.projectsRepository.fetch(projectId); if (!project) { diff --git a/apps/rowboat/src/application/use-cases/projects/rotate-secret.use-case.ts b/apps/rowboat/src/application/use-cases/projects/rotate-secret.use-case.ts index f2abdbaa..67e33737 100644 --- a/apps/rowboat/src/application/use-cases/projects/rotate-secret.use-case.ts +++ b/apps/rowboat/src/application/use-cases/projects/rotate-secret.use-case.ts @@ -37,7 +37,7 @@ export class RotateSecretUseCase implements IRotateSecretUseCase { }); // assert and consume quota - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); const secret = crypto.randomBytes(32).toString("hex"); await this.projectsRepository.updateSecret(projectId, secret); diff --git a/apps/rowboat/src/application/use-cases/projects/sync-connected-account.use-case.ts b/apps/rowboat/src/application/use-cases/projects/sync-connected-account.use-case.ts index c3d908fc..089585c9 100644 --- a/apps/rowboat/src/application/use-cases/projects/sync-connected-account.use-case.ts +++ b/apps/rowboat/src/application/use-cases/projects/sync-connected-account.use-case.ts @@ -42,7 +42,7 @@ export class SyncConnectedAccountUseCase implements ISyncConnectedAccountUseCase const { caller, userId, apiKey, projectId, toolkitSlug, connectedAccountId } = request; await this.projectActionAuthorizationPolicy.authorize({ caller, userId, apiKey, projectId }); - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); // fetch project & account to verify const project = await this.projectsRepository.fetch(projectId); diff --git a/apps/rowboat/src/application/use-cases/projects/update-draft-workflow.use-case.ts b/apps/rowboat/src/application/use-cases/projects/update-draft-workflow.use-case.ts index fe1ecc37..14743e61 100644 --- a/apps/rowboat/src/application/use-cases/projects/update-draft-workflow.use-case.ts +++ b/apps/rowboat/src/application/use-cases/projects/update-draft-workflow.use-case.ts @@ -43,7 +43,7 @@ export class UpdateDraftWorkflowUseCase implements IUpdateDraftWorkflowUseCase { apiKey: request.apiKey, projectId, }); - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); const workflow = { ...request.workflow, lastUpdatedAt: new Date().toISOString() } as z.infer; await this.projectsRepository.updateDraftWorkflow(projectId, workflow); diff --git a/apps/rowboat/src/application/use-cases/projects/update-live-workflow.use-case.ts b/apps/rowboat/src/application/use-cases/projects/update-live-workflow.use-case.ts index e1c770f0..ebf13e38 100644 --- a/apps/rowboat/src/application/use-cases/projects/update-live-workflow.use-case.ts +++ b/apps/rowboat/src/application/use-cases/projects/update-live-workflow.use-case.ts @@ -43,7 +43,7 @@ export class UpdateLiveWorkflowUseCase implements IUpdateLiveWorkflowUseCase { apiKey: request.apiKey, projectId, }); - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); const workflow = { ...request.workflow, lastUpdatedAt: new Date().toISOString() } as z.infer; await this.projectsRepository.updateLiveWorkflow(projectId, workflow); diff --git a/apps/rowboat/src/application/use-cases/projects/update-project-name.use-case.ts b/apps/rowboat/src/application/use-cases/projects/update-project-name.use-case.ts index 7c768fc3..b1562d1d 100644 --- a/apps/rowboat/src/application/use-cases/projects/update-project-name.use-case.ts +++ b/apps/rowboat/src/application/use-cases/projects/update-project-name.use-case.ts @@ -36,7 +36,7 @@ export class UpdateProjectNameUseCase implements IUpdateProjectNameUseCase { }); // assert and consume quota - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); await this.projectsRepository.updateName(projectId, name); } diff --git a/apps/rowboat/src/application/use-cases/projects/update-webhook-url.use-case.ts b/apps/rowboat/src/application/use-cases/projects/update-webhook-url.use-case.ts index 56ed6268..b71fd487 100644 --- a/apps/rowboat/src/application/use-cases/projects/update-webhook-url.use-case.ts +++ b/apps/rowboat/src/application/use-cases/projects/update-webhook-url.use-case.ts @@ -36,7 +36,7 @@ export class UpdateWebhookUrlUseCase implements IUpdateWebhookUrlUseCase { }); // assert and consume quota - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); await this.projectsRepository.updateWebhookUrl(projectId, url); } diff --git a/apps/rowboat/src/application/use-cases/recurring-job-rules/create-recurring-job-rule.use-case.ts b/apps/rowboat/src/application/use-cases/recurring-job-rules/create-recurring-job-rule.use-case.ts index e7bf8365..0c2c2c0c 100644 --- a/apps/rowboat/src/application/use-cases/recurring-job-rules/create-recurring-job-rule.use-case.ts +++ b/apps/rowboat/src/application/use-cases/recurring-job-rules/create-recurring-job-rule.use-case.ts @@ -55,7 +55,7 @@ export class CreateRecurringJobRuleUseCase implements ICreateRecurringJobRuleUse }); // assert and consume quota - await this.usageQuotaPolicy.assertAndConsume(request.projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(request.projectId); // create the recurring job rule const rule = await this.recurringJobRulesRepository.create({ diff --git a/apps/rowboat/src/application/use-cases/recurring-job-rules/delete-recurring-job-rule.use-case.ts b/apps/rowboat/src/application/use-cases/recurring-job-rules/delete-recurring-job-rule.use-case.ts index dec59bca..2cb3c98e 100644 --- a/apps/rowboat/src/application/use-cases/recurring-job-rules/delete-recurring-job-rule.use-case.ts +++ b/apps/rowboat/src/application/use-cases/recurring-job-rules/delete-recurring-job-rule.use-case.ts @@ -45,7 +45,7 @@ export class DeleteRecurringJobRuleUseCase implements IDeleteRecurringJobRuleUse }); // assert and consume quota - await this.usageQuotaPolicy.assertAndConsume(request.projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(request.projectId); // ensure rule belongs to this project const rule = await this.recurringJobRulesRepository.fetch(request.ruleId); diff --git a/apps/rowboat/src/application/use-cases/recurring-job-rules/fetch-recurring-job-rule.use-case.ts b/apps/rowboat/src/application/use-cases/recurring-job-rules/fetch-recurring-job-rule.use-case.ts index cd89ba2c..6d3846d9 100644 --- a/apps/rowboat/src/application/use-cases/recurring-job-rules/fetch-recurring-job-rule.use-case.ts +++ b/apps/rowboat/src/application/use-cases/recurring-job-rules/fetch-recurring-job-rule.use-case.ts @@ -54,7 +54,7 @@ export class FetchRecurringJobRuleUseCase implements IFetchRecurringJobRuleUseCa }); // assert and consume quota - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); // return the rule return rule; diff --git a/apps/rowboat/src/application/use-cases/recurring-job-rules/list-recurring-job-rules.use-case.ts b/apps/rowboat/src/application/use-cases/recurring-job-rules/list-recurring-job-rules.use-case.ts index 22f45186..9dbe0a42 100644 --- a/apps/rowboat/src/application/use-cases/recurring-job-rules/list-recurring-job-rules.use-case.ts +++ b/apps/rowboat/src/application/use-cases/recurring-job-rules/list-recurring-job-rules.use-case.ts @@ -49,7 +49,7 @@ export class ListRecurringJobRulesUseCase implements IListRecurringJobRulesUseCa }); // assert and consume quota - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); // fetch recurring job rules for project return await this.recurringJobRulesRepository.list(projectId, request.cursor, limit); diff --git a/apps/rowboat/src/application/use-cases/recurring-job-rules/toggle-recurring-job-rule.use-case.ts b/apps/rowboat/src/application/use-cases/recurring-job-rules/toggle-recurring-job-rule.use-case.ts index e7258fa5..8bb52a28 100644 --- a/apps/rowboat/src/application/use-cases/recurring-job-rules/toggle-recurring-job-rule.use-case.ts +++ b/apps/rowboat/src/application/use-cases/recurring-job-rules/toggle-recurring-job-rule.use-case.ts @@ -55,7 +55,7 @@ export class ToggleRecurringJobRuleUseCase implements IToggleRecurringJobRuleUse }); // assert and consume quota - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); // update the rule return await this.recurringJobRulesRepository.toggle(request.ruleId, request.disabled); diff --git a/apps/rowboat/src/application/use-cases/scheduled-job-rules/create-scheduled-job-rule.use-case.ts b/apps/rowboat/src/application/use-cases/scheduled-job-rules/create-scheduled-job-rule.use-case.ts index 06170434..544a5519 100644 --- a/apps/rowboat/src/application/use-cases/scheduled-job-rules/create-scheduled-job-rule.use-case.ts +++ b/apps/rowboat/src/application/use-cases/scheduled-job-rules/create-scheduled-job-rule.use-case.ts @@ -50,7 +50,7 @@ export class CreateScheduledJobRuleUseCase implements ICreateScheduledJobRuleUse }); // assert and consume quota - await this.usageQuotaPolicy.assertAndConsume(request.projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(request.projectId); // create the scheduled job rule with UTC time const rule = await this.scheduledJobRulesRepository.create({ diff --git a/apps/rowboat/src/application/use-cases/scheduled-job-rules/delete-scheduled-job-rule.use-case.ts b/apps/rowboat/src/application/use-cases/scheduled-job-rules/delete-scheduled-job-rule.use-case.ts index b7b7123f..6977fbbd 100644 --- a/apps/rowboat/src/application/use-cases/scheduled-job-rules/delete-scheduled-job-rule.use-case.ts +++ b/apps/rowboat/src/application/use-cases/scheduled-job-rules/delete-scheduled-job-rule.use-case.ts @@ -45,7 +45,7 @@ export class DeleteScheduledJobRuleUseCase implements IDeleteScheduledJobRuleUse }); // assert and consume quota - await this.usageQuotaPolicy.assertAndConsume(request.projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(request.projectId); // ensure rule belongs to this project const rule = await this.scheduledJobRulesRepository.fetch(request.ruleId); diff --git a/apps/rowboat/src/application/use-cases/scheduled-job-rules/fetch-scheduled-job-rule.use-case.ts b/apps/rowboat/src/application/use-cases/scheduled-job-rules/fetch-scheduled-job-rule.use-case.ts index 6b5004cd..33b6b78c 100644 --- a/apps/rowboat/src/application/use-cases/scheduled-job-rules/fetch-scheduled-job-rule.use-case.ts +++ b/apps/rowboat/src/application/use-cases/scheduled-job-rules/fetch-scheduled-job-rule.use-case.ts @@ -54,7 +54,7 @@ export class FetchScheduledJobRuleUseCase implements IFetchScheduledJobRuleUseCa }); // assert and consume quota - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); // return the scheduled job rule return rule; diff --git a/apps/rowboat/src/application/use-cases/scheduled-job-rules/list-scheduled-job-rules.use-case.ts b/apps/rowboat/src/application/use-cases/scheduled-job-rules/list-scheduled-job-rules.use-case.ts index b5f79ce8..17a929fc 100644 --- a/apps/rowboat/src/application/use-cases/scheduled-job-rules/list-scheduled-job-rules.use-case.ts +++ b/apps/rowboat/src/application/use-cases/scheduled-job-rules/list-scheduled-job-rules.use-case.ts @@ -49,7 +49,7 @@ export class ListScheduledJobRulesUseCase implements IListScheduledJobRulesUseCa }); // assert and consume quota - await this.usageQuotaPolicy.assertAndConsume(projectId); + await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId); // fetch scheduled job rules for project return await this.scheduledJobRulesRepository.list(projectId, request.cursor, limit); diff --git a/apps/rowboat/src/application/workers/jobs.worker.ts b/apps/rowboat/src/application/workers/jobs.worker.ts index ee5a74a1..93e6af93 100644 --- a/apps/rowboat/src/application/workers/jobs.worker.ts +++ b/apps/rowboat/src/application/workers/jobs.worker.ts @@ -8,6 +8,8 @@ import { IPubSubService, Subscription } from "../services/pub-sub.service.interf import { nanoid } from "nanoid"; import { z } from "zod"; import { PrefixLogger } from "@/app/lib/utils"; +import { IUsageQuotaPolicy } from "../policies/usage-quota.policy.interface"; +import { QuotaExceededError } from "@/src/entities/errors/common"; export interface IJobsWorker { run(): Promise; @@ -20,6 +22,7 @@ export class JobsWorker implements IJobsWorker { private readonly createConversationUseCase: ICreateConversationUseCase; private readonly runConversationTurnUseCase: IRunConversationTurnUseCase; private readonly pubSubService: IPubSubService; + private readonly usageQuotaPolicy: IUsageQuotaPolicy; private workerId: string; private subscription: Subscription | null = null; private isRunning: boolean = false; @@ -33,18 +36,21 @@ export class JobsWorker implements IJobsWorker { createConversationUseCase, runConversationTurnUseCase, pubSubService, + usageQuotaPolicy, }: { jobsRepository: IJobsRepository; projectsRepository: IProjectsRepository; createConversationUseCase: ICreateConversationUseCase; runConversationTurnUseCase: IRunConversationTurnUseCase; pubSubService: IPubSubService; + usageQuotaPolicy: IUsageQuotaPolicy; }) { this.jobsRepository = jobsRepository; this.projectsRepository = projectsRepository; this.createConversationUseCase = createConversationUseCase; this.runConversationTurnUseCase = runConversationTurnUseCase; this.pubSubService = pubSubService; + this.usageQuotaPolicy = usageQuotaPolicy; this.workerId = nanoid(); this.logger = new PrefixLogger(`jobs-worker-[${this.workerId}]`); } @@ -52,7 +58,7 @@ export class JobsWorker implements IJobsWorker { async processJob(job: z.infer): Promise { const logger = this.logger.child(`job-${job.id}`); logger.log('Processing job'); - + try { // extract project id from job const { projectId } = job; @@ -63,6 +69,9 @@ export class JobsWorker implements IJobsWorker { throw new Error("Project not found"); } + // check job-run quota usage + await this.usageQuotaPolicy.assertAndConsumeRunJobAction(projectId); + // create conversation logger.log('Creating conversation'); const conversation = await this.createConversationUseCase.execute({ @@ -114,6 +123,18 @@ export class JobsWorker implements IJobsWorker { }); logger.log(`Completed successfully`); } catch (error) { + if (error instanceof QuotaExceededError) { + logger.log(`Failed due to quota exceeded`); + + // update job + await this.jobsRepository.update(job.id, { + status: "failed", + output: { + error: (error instanceof QuotaExceededError) ? error.message : "Usage quota exceeded.", + }, + }); + return; + } logger.log(`Failed: ${error instanceof Error ? error.message : "Unknown error"}`); // update job @@ -174,7 +195,7 @@ export class JobsWorker implements IJobsWorker { } logger.log(`Found job ${job.id} via polling`); - + // process job await this.processJob(job); } catch (error) { @@ -185,7 +206,7 @@ export class JobsWorker implements IJobsWorker { private async startPolling(): Promise { const logger = this.logger.child(`start-polling`); logger.log("Starting polling mechanism"); - + const scheduleNextPoll = () => { this.pollTimeoutId = setTimeout(async () => { await this.pollForJobs(); diff --git a/apps/rowboat/src/infrastructure/policies/redis.usage-quota.policy.ts b/apps/rowboat/src/infrastructure/policies/redis.usage-quota.policy.ts index 29fe2d08..4b97ace0 100644 --- a/apps/rowboat/src/infrastructure/policies/redis.usage-quota.policy.ts +++ b/apps/rowboat/src/infrastructure/policies/redis.usage-quota.policy.ts @@ -1,12 +1,13 @@ import { IUsageQuotaPolicy } from "@/src/application/policies/usage-quota.policy.interface"; import { redisClient } from "@/app/lib/redis"; import { QuotaExceededError } from "@/src/entities/errors/common"; -import { secondsToNextMinute } from "@/src/application/lib/utils/time-to-next-minute"; +import { secondsToNextMinute, minutesToNextHour } from "@/src/application/lib/utils/time-to-next-minute"; const MAX_QUERIES_PER_MINUTE = Number(process.env.MAX_QUERIES_PER_MINUTE) || 0; +const MAX_JOBS_PER_HOUR = Number(process.env.MAX_JOBS_PER_HOUR) || 0; export class RedisUsageQuotaPolicy implements IUsageQuotaPolicy { - async assertAndConsume(projectId: string): Promise { + async assertAndConsumeProjectAction(projectId: string): Promise { if (MAX_QUERIES_PER_MINUTE === 0) { return; } @@ -23,4 +24,22 @@ export class RedisUsageQuotaPolicy implements IUsageQuotaPolicy { throw new QuotaExceededError(`Quota exceeded for project ${projectId}`); } } + + async assertAndConsumeRunJobAction(projectId: string): Promise { + if (MAX_JOBS_PER_HOUR === 0) { + return; + } + + const hour_of_the_day = new Date().getHours(); + const key = `jobs_limit:${projectId}:${hour_of_the_day}`; + + const count = await redisClient.incr(key); + if (count === 1) { + await redisClient.expire(key, minutesToNextHour() * 60); // Set TTL to clean up automatically + } + + if (count > MAX_JOBS_PER_HOUR) { + throw new QuotaExceededError(`Jobs quota exceeded for project ${projectId}`); + } + } } \ No newline at end of file