enforce max jobs per hour

This commit is contained in:
Ramnique Singh 2025-08-20 17:08:19 +05:30
parent 0b31585141
commit d2e590956b
55 changed files with 123 additions and 60 deletions

View file

@ -28,7 +28,7 @@ export async function getCopilotResponseStream(
streamId: string;
} | { billingError: string }> {
await projectAuthCheck(projectId);
await usageQuotaPolicy.assertAndConsume(projectId);
await usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
// Check billing authorization
const authResponse = await authorizeUserAction({
@ -38,7 +38,7 @@ export async function getCopilotResponseStream(
return { billingError: authResponse.error || 'Billing error' };
}
await usageQuotaPolicy.assertAndConsume(projectId);
await usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
// prepare request
const request: z.infer<typeof CopilotAPIRequest> = {
@ -70,7 +70,7 @@ export async function getCopilotAgentInstructions(
agentName: string,
): Promise<string | { billingError: string }> {
await projectAuthCheck(projectId);
await usageQuotaPolicy.assertAndConsume(projectId);
await usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
// Check billing authorization
const authResponse = await authorizeUserAction({

View file

@ -3,4 +3,10 @@ export function secondsToNextMinute(): number {
const now = new Date();
const secondsUntilNextMinute = 60 - now.getSeconds();
return secondsUntilNextMinute;
}
export function minutesToNextHour(): number {
const now = new Date();
const minutesUntilNextHour = 60 - now.getMinutes();
return minutesUntilNextHour;
}

View file

@ -1,4 +1,21 @@
import { QuotaExceededError } from "@/src/entities/errors/common";
export interface IUsageQuotaPolicy {
// this method will throw a QuotaExceededError if the quota is exceeded
assertAndConsume(projectId: string): Promise<void>;
/**
* Asserts that the project has not exceeded its usage quota and consumes the action.
* Used for general project actions.
*
* @param projectId - The ID of the project to assert and consume.
* @throws QuotaExceededError if the quota is exceeded.
*/
assertAndConsumeProjectAction(projectId: string): Promise<void>;
/**
* Asserts that the project has not exceeded its usage quota for running jobs.
*
* @param projectId - The ID of the project to assert and consume.
* @throws QuotaExceededError if the quota is exceeded.
*/
assertAndConsumeRunJobAction(projectId: string): Promise<void>;
}

View file

@ -59,7 +59,7 @@ export class CreateComposioTriggerDeploymentUseCase implements ICreateComposioTr
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
// get trigger type info
const triggerType = await getTriggersType(request.data.triggerTypeSlug);

View file

@ -54,7 +54,7 @@ export class DeleteComposioTriggerDeploymentUseCase implements IDeleteComposioTr
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
// ensure deployment belongs to this project
const deployment = await this.composioTriggerDeploymentsRepository.fetch(request.deploymentId);

View file

@ -53,7 +53,7 @@ export class FetchComposioTriggerDeploymentUseCase implements IFetchComposioTrig
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
return deployment;
}

View file

@ -51,7 +51,7 @@ export class ListComposioTriggerDeploymentsUseCase implements IListComposioTrigg
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
// fetch deployments for project
return await this.composioTriggerDeploymentsRepository.listByProjectId(projectId, request.cursor, limit);

View file

@ -61,7 +61,7 @@ export class CreateCachedTurnUseCase implements ICreateCachedTurnUseCase {
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
// create cache entry
const key = nanoid();

View file

@ -61,7 +61,7 @@ export class CreateConversationUseCase implements ICreateConversationUseCase {
}
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
// if workflow is not provided, fetch workflow
if (!workflow) {

View file

@ -68,7 +68,7 @@ export class FetchCachedTurnUseCase implements IFetchCachedTurnUseCase {
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
// delete from cache
await this.cacheService.delete(`turn-${data.key}`);

View file

@ -54,7 +54,7 @@ export class FetchConversationUseCase implements IFetchConversationUseCase {
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
// return the conversation
return conversation;

View file

@ -51,7 +51,7 @@ export class ListConversationsUseCase implements IListConversationsUseCase {
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
// fetch conversations for project
return await this.conversationsRepository.list(projectId, request.cursor, limit);

View file

@ -62,7 +62,7 @@ export class RunConversationTurnUseCase implements IRunConversationTurnUseCase {
}
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
// Check billing auth
let billingCustomerId: string | null = null;

View file

@ -55,7 +55,7 @@ export class AddDocsToDataSourceUseCase implements IAddDocsToDataSourceUseCase {
projectId: source.projectId,
});
await this.usageQuotaPolicy.assertAndConsume(source.projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(source.projectId);
await this.dataSourceDocsRepository.bulkCreate(source.projectId, sourceId, docs);

View file

@ -44,7 +44,7 @@ export class CreateDataSourceUseCase implements ICreateDataSourceUseCase {
projectId,
});
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
let _status = "pending";
// Only set status for non-file data sources

View file

@ -49,7 +49,7 @@ export class DeleteDataSourceUseCase implements IDeleteDataSourceUseCase {
projectId,
});
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
await this.dataSourcesRepository.update(request.sourceId, {
status: 'deleted',

View file

@ -54,7 +54,7 @@ export class DeleteDocFromDataSourceUseCase implements IDeleteDocFromDataSourceU
projectId: doc.projectId,
});
await this.usageQuotaPolicy.assertAndConsume(doc.projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(doc.projectId);
await this.dataSourceDocsRepository.markAsDeleted(docId);

View file

@ -50,7 +50,7 @@ export class FetchDataSourceUseCase implements IFetchDataSourceUseCase {
projectId,
});
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
return source;
}

View file

@ -58,7 +58,7 @@ export class GetDownloadUrlForFileUseCase implements IGetDownloadUrlForFileUseCa
projectId: file.projectId,
});
await this.usageQuotaPolicy.assertAndConsume(file.projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(file.projectId);
if (file.data.type === 'file_local') {
// use the file id instead of path here

View file

@ -60,7 +60,7 @@ export class GetUploadUrlsForFilesUseCase implements IGetUploadUrlsForFilesUseCa
projectId: source.projectId,
});
await this.usageQuotaPolicy.assertAndConsume(source.projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(source.projectId);
const urls: { fileId: string, uploadUrl: string, path: string }[] = [];
for (const file of files) {

View file

@ -44,7 +44,7 @@ export class ListDataSourcesUseCase implements IListDataSourcesUseCase {
projectId,
});
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
// list all sources for now
const sources = [];

View file

@ -55,7 +55,7 @@ export class ListDocsInDataSourceUseCase implements IListDocsInDataSourceUseCase
projectId: source.projectId,
});
await this.usageQuotaPolicy.assertAndConsume(source.projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(source.projectId);
// fetch all docs
const docs = [];

View file

@ -58,7 +58,7 @@ export class RecrawlWebDataSourceUseCase implements IRecrawlWebDataSourceUseCase
projectId,
});
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
await this.dataSourceDocsRepository.markSourceDocsPending(request.sourceId);

View file

@ -51,7 +51,7 @@ export class ToggleDataSourceUseCase implements IToggleDataSourceUseCase {
projectId,
});
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
return await this.dataSourcesRepository.update(request.sourceId, { active: request.active });
}

View file

@ -55,7 +55,7 @@ export class UpdateDataSourceUseCase implements IUpdateDataSourceUseCase {
projectId,
});
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
return await this.dataSourcesRepository.update(request.sourceId, request.data, true);
}

View file

@ -54,7 +54,7 @@ export class FetchJobUseCase implements IFetchJobUseCase {
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
// return the job
return job;

View file

@ -52,7 +52,7 @@ export class ListJobsUseCase implements IListJobsUseCase {
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
// fetch jobs for project
return await this.jobsRepository.list(projectId, request.filters, request.cursor, limit);

View file

@ -48,7 +48,7 @@ export class AddCustomMcpServerUseCase implements IAddCustomMcpServerUseCase {
const { caller, userId, apiKey, projectId, name } = request;
await this.projectActionAuthorizationPolicy.authorize({ caller, userId, apiKey, projectId });
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
// Validate server URL
const serverUrl = validateHttpHttpsUrl(request.server.serverUrl);

View file

@ -44,7 +44,7 @@ export class CreateComposioManagedConnectedAccountUseCase implements ICreateComp
const { caller, userId, apiKey, projectId, toolkitSlug, callbackUrl } = request;
await this.projectActionAuthorizationPolicy.authorize({ caller, userId, apiKey, projectId });
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
// fetch managed auth configs
const configs = await listAuthConfigs(toolkitSlug, null, true);

View file

@ -50,7 +50,7 @@ export class CreateCustomConnectedAccountUseCase implements ICreateCustomConnect
const { caller, userId, apiKey, projectId, toolkitSlug, authConfig, callbackUrl } = request;
await this.projectActionAuthorizationPolicy.authorize({ caller, userId, apiKey, projectId });
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
// create custom auth config
const created: z.infer<typeof ZCreateAuthConfigResponse> = await createAuthConfig({

View file

@ -113,7 +113,7 @@ export class CreateProjectUseCase implements ICreateProjectUseCase {
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(project.id);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(project.id);
return project;
}

View file

@ -56,7 +56,7 @@ export class DeleteComposioConnectedAccountUseCase implements IDeleteComposioCon
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
// fetch project
const project = await this.projectsRepository.fetch(projectId);

View file

@ -52,7 +52,7 @@ export class FetchProjectUseCase implements IFetchProjectUseCase {
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
return await this.projectsRepository.fetch(projectId);
}

View file

@ -28,7 +28,7 @@ export class GetComposioToolkitUseCase implements IGetComposioToolkitUseCase {
async execute(request: z.infer<typeof InputSchema>): Promise<z.infer<typeof ZGetToolkitResponse>> {
const { caller, userId, apiKey, projectId, toolkitSlug } = request;
await this.projectActionAuthorizationPolicy.authorize({ caller, userId, apiKey, projectId });
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
return await getToolkit(toolkitSlug);
}
}

View file

@ -29,7 +29,7 @@ export class ListComposioToolkitsUseCase implements IListComposioToolkitsUseCase
async execute(request: z.infer<typeof InputSchema>): Promise<z.infer<ReturnType<typeof ZListResponse<typeof ZToolkit>>>> {
const { caller, userId, apiKey, projectId, cursor } = request;
await this.projectActionAuthorizationPolicy.authorize({ caller, userId, apiKey, projectId });
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
return await listToolkits(cursor ?? null);
}
}

View file

@ -31,7 +31,7 @@ export class ListComposioToolsUseCase implements IListComposioToolsUseCase {
async execute(request: z.infer<typeof InputSchema>): Promise<z.infer<ReturnType<typeof ZListResponse<typeof ZTool>>>> {
const { caller, userId, apiKey, projectId, toolkitSlug, searchQuery, cursor } = request;
await this.projectActionAuthorizationPolicy.authorize({ caller, userId, apiKey, projectId });
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
return await listTools(toolkitSlug, searchQuery ?? null, cursor ?? null);
}
}

View file

@ -37,7 +37,7 @@ export class RemoveCustomMcpServerUseCase implements IRemoveCustomMcpServerUseCa
async execute(request: z.infer<typeof InputSchema>): Promise<void> {
const { caller, userId, apiKey, projectId, name } = request;
await this.projectActionAuthorizationPolicy.authorize({ caller, userId, apiKey, projectId });
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
await this.projectsRepository.deleteCustomMcpServer(projectId, name);
}
}

View file

@ -42,7 +42,7 @@ export class RevertToLiveWorkflowUseCase implements IRevertToLiveWorkflowUseCase
apiKey: request.apiKey,
projectId,
});
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
const project = await this.projectsRepository.fetch(projectId);
if (!project) {

View file

@ -37,7 +37,7 @@ export class RotateSecretUseCase implements IRotateSecretUseCase {
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
const secret = crypto.randomBytes(32).toString("hex");
await this.projectsRepository.updateSecret(projectId, secret);

View file

@ -42,7 +42,7 @@ export class SyncConnectedAccountUseCase implements ISyncConnectedAccountUseCase
const { caller, userId, apiKey, projectId, toolkitSlug, connectedAccountId } = request;
await this.projectActionAuthorizationPolicy.authorize({ caller, userId, apiKey, projectId });
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
// fetch project & account to verify
const project = await this.projectsRepository.fetch(projectId);

View file

@ -43,7 +43,7 @@ export class UpdateDraftWorkflowUseCase implements IUpdateDraftWorkflowUseCase {
apiKey: request.apiKey,
projectId,
});
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
const workflow = { ...request.workflow, lastUpdatedAt: new Date().toISOString() } as z.infer<typeof Workflow>;
await this.projectsRepository.updateDraftWorkflow(projectId, workflow);

View file

@ -43,7 +43,7 @@ export class UpdateLiveWorkflowUseCase implements IUpdateLiveWorkflowUseCase {
apiKey: request.apiKey,
projectId,
});
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
const workflow = { ...request.workflow, lastUpdatedAt: new Date().toISOString() } as z.infer<typeof Workflow>;
await this.projectsRepository.updateLiveWorkflow(projectId, workflow);

View file

@ -36,7 +36,7 @@ export class UpdateProjectNameUseCase implements IUpdateProjectNameUseCase {
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
await this.projectsRepository.updateName(projectId, name);
}

View file

@ -36,7 +36,7 @@ export class UpdateWebhookUrlUseCase implements IUpdateWebhookUrlUseCase {
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
await this.projectsRepository.updateWebhookUrl(projectId, url);
}

View file

@ -55,7 +55,7 @@ export class CreateRecurringJobRuleUseCase implements ICreateRecurringJobRuleUse
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(request.projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(request.projectId);
// create the recurring job rule
const rule = await this.recurringJobRulesRepository.create({

View file

@ -45,7 +45,7 @@ export class DeleteRecurringJobRuleUseCase implements IDeleteRecurringJobRuleUse
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(request.projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(request.projectId);
// ensure rule belongs to this project
const rule = await this.recurringJobRulesRepository.fetch(request.ruleId);

View file

@ -54,7 +54,7 @@ export class FetchRecurringJobRuleUseCase implements IFetchRecurringJobRuleUseCa
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
// return the rule
return rule;

View file

@ -49,7 +49,7 @@ export class ListRecurringJobRulesUseCase implements IListRecurringJobRulesUseCa
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
// fetch recurring job rules for project
return await this.recurringJobRulesRepository.list(projectId, request.cursor, limit);

View file

@ -55,7 +55,7 @@ export class ToggleRecurringJobRuleUseCase implements IToggleRecurringJobRuleUse
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
// update the rule
return await this.recurringJobRulesRepository.toggle(request.ruleId, request.disabled);

View file

@ -50,7 +50,7 @@ export class CreateScheduledJobRuleUseCase implements ICreateScheduledJobRuleUse
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(request.projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(request.projectId);
// create the scheduled job rule with UTC time
const rule = await this.scheduledJobRulesRepository.create({

View file

@ -45,7 +45,7 @@ export class DeleteScheduledJobRuleUseCase implements IDeleteScheduledJobRuleUse
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(request.projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(request.projectId);
// ensure rule belongs to this project
const rule = await this.scheduledJobRulesRepository.fetch(request.ruleId);

View file

@ -54,7 +54,7 @@ export class FetchScheduledJobRuleUseCase implements IFetchScheduledJobRuleUseCa
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
// return the scheduled job rule
return rule;

View file

@ -49,7 +49,7 @@ export class ListScheduledJobRulesUseCase implements IListScheduledJobRulesUseCa
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(projectId);
await this.usageQuotaPolicy.assertAndConsumeProjectAction(projectId);
// fetch scheduled job rules for project
return await this.scheduledJobRulesRepository.list(projectId, request.cursor, limit);

View file

@ -8,6 +8,8 @@ import { IPubSubService, Subscription } from "../services/pub-sub.service.interf
import { nanoid } from "nanoid";
import { z } from "zod";
import { PrefixLogger } from "@/app/lib/utils";
import { IUsageQuotaPolicy } from "../policies/usage-quota.policy.interface";
import { QuotaExceededError } from "@/src/entities/errors/common";
export interface IJobsWorker {
run(): Promise<void>;
@ -20,6 +22,7 @@ export class JobsWorker implements IJobsWorker {
private readonly createConversationUseCase: ICreateConversationUseCase;
private readonly runConversationTurnUseCase: IRunConversationTurnUseCase;
private readonly pubSubService: IPubSubService;
private readonly usageQuotaPolicy: IUsageQuotaPolicy;
private workerId: string;
private subscription: Subscription | null = null;
private isRunning: boolean = false;
@ -33,18 +36,21 @@ export class JobsWorker implements IJobsWorker {
createConversationUseCase,
runConversationTurnUseCase,
pubSubService,
usageQuotaPolicy,
}: {
jobsRepository: IJobsRepository;
projectsRepository: IProjectsRepository;
createConversationUseCase: ICreateConversationUseCase;
runConversationTurnUseCase: IRunConversationTurnUseCase;
pubSubService: IPubSubService;
usageQuotaPolicy: IUsageQuotaPolicy;
}) {
this.jobsRepository = jobsRepository;
this.projectsRepository = projectsRepository;
this.createConversationUseCase = createConversationUseCase;
this.runConversationTurnUseCase = runConversationTurnUseCase;
this.pubSubService = pubSubService;
this.usageQuotaPolicy = usageQuotaPolicy;
this.workerId = nanoid();
this.logger = new PrefixLogger(`jobs-worker-[${this.workerId}]`);
}
@ -52,7 +58,7 @@ export class JobsWorker implements IJobsWorker {
async processJob(job: z.infer<typeof Job>): Promise<void> {
const logger = this.logger.child(`job-${job.id}`);
logger.log('Processing job');
try {
// extract project id from job
const { projectId } = job;
@ -63,6 +69,9 @@ export class JobsWorker implements IJobsWorker {
throw new Error("Project not found");
}
// check job-run quota usage
await this.usageQuotaPolicy.assertAndConsumeRunJobAction(projectId);
// create conversation
logger.log('Creating conversation');
const conversation = await this.createConversationUseCase.execute({
@ -114,6 +123,18 @@ export class JobsWorker implements IJobsWorker {
});
logger.log(`Completed successfully`);
} catch (error) {
if (error instanceof QuotaExceededError) {
logger.log(`Failed due to quota exceeded`);
// update job
await this.jobsRepository.update(job.id, {
status: "failed",
output: {
error: (error instanceof QuotaExceededError) ? error.message : "Usage quota exceeded.",
},
});
return;
}
logger.log(`Failed: ${error instanceof Error ? error.message : "Unknown error"}`);
// update job
@ -174,7 +195,7 @@ export class JobsWorker implements IJobsWorker {
}
logger.log(`Found job ${job.id} via polling`);
// process job
await this.processJob(job);
} catch (error) {
@ -185,7 +206,7 @@ export class JobsWorker implements IJobsWorker {
private async startPolling(): Promise<void> {
const logger = this.logger.child(`start-polling`);
logger.log("Starting polling mechanism");
const scheduleNextPoll = () => {
this.pollTimeoutId = setTimeout(async () => {
await this.pollForJobs();

View file

@ -1,12 +1,13 @@
import { IUsageQuotaPolicy } from "@/src/application/policies/usage-quota.policy.interface";
import { redisClient } from "@/app/lib/redis";
import { QuotaExceededError } from "@/src/entities/errors/common";
import { secondsToNextMinute } from "@/src/application/lib/utils/time-to-next-minute";
import { secondsToNextMinute, minutesToNextHour } from "@/src/application/lib/utils/time-to-next-minute";
const MAX_QUERIES_PER_MINUTE = Number(process.env.MAX_QUERIES_PER_MINUTE) || 0;
const MAX_JOBS_PER_HOUR = Number(process.env.MAX_JOBS_PER_HOUR) || 0;
export class RedisUsageQuotaPolicy implements IUsageQuotaPolicy {
async assertAndConsume(projectId: string): Promise<void> {
async assertAndConsumeProjectAction(projectId: string): Promise<void> {
if (MAX_QUERIES_PER_MINUTE === 0) {
return;
}
@ -23,4 +24,22 @@ export class RedisUsageQuotaPolicy implements IUsageQuotaPolicy {
throw new QuotaExceededError(`Quota exceeded for project ${projectId}`);
}
}
async assertAndConsumeRunJobAction(projectId: string): Promise<void> {
if (MAX_JOBS_PER_HOUR === 0) {
return;
}
const hour_of_the_day = new Date().getHours();
const key = `jobs_limit:${projectId}:${hour_of_the_day}`;
const count = await redisClient.incr(key);
if (count === 1) {
await redisClient.expire(key, minutesToNextHour() * 60); // Set TTL to clean up automatically
}
if (count > MAX_JOBS_PER_HOUR) {
throw new QuotaExceededError(`Jobs quota exceeded for project ${projectId}`);
}
}
}