diff --git a/apps/rowboat/src/application/lib/utils/time-to-next-minute.ts b/apps/rowboat/src/application/lib/utils/time-to-next-minute.ts new file mode 100644 index 00000000..b5996681 --- /dev/null +++ b/apps/rowboat/src/application/lib/utils/time-to-next-minute.ts @@ -0,0 +1,6 @@ +// returns the number of seconds until the next minute +export function secondsToNextMinute(): number { + const now = new Date(); + const secondsUntilNextMinute = 60 - now.getSeconds(); + return secondsUntilNextMinute; +} \ No newline at end of file diff --git a/apps/rowboat/src/application/workers/job-rules.worker.ts b/apps/rowboat/src/application/workers/job-rules.worker.ts index 5b03e025..039665b7 100644 --- a/apps/rowboat/src/application/workers/job-rules.worker.ts +++ b/apps/rowboat/src/application/workers/job-rules.worker.ts @@ -8,6 +8,7 @@ import { z } from "zod"; import { nanoid } from "nanoid"; import { PrefixLogger } from "@/app/lib/utils"; import { RecurringJobRule } from "@/src/entities/models/recurring-job-rule"; +import { secondsToNextMinute } from "../lib/utils/time-to-next-minute"; export interface IJobRulesWorker { run(): Promise; @@ -20,8 +21,6 @@ export class JobRulesWorker implements IJobRulesWorker { private readonly jobsRepository: IJobsRepository; private readonly projectsRepository: IProjectsRepository; private readonly pubSubService: IPubSubService; - // Run polls aligned to minute marks at this offset (e.g., 2000 ms => :02 each minute) - private readonly minuteAlignmentOffsetMs: number = 2_000; private workerId: string; private logger: PrefixLogger; private isRunning: boolean = false; @@ -131,14 +130,6 @@ export class JobRulesWorker implements IJobRulesWorker { } } - // Calculates delay so the next run happens at next minute + minuteAlignmentOffsetMs - private calculateDelayToNextAlignedMinute(): number { - const now = new Date(); - const millisecondsUntilNextMinute = (60 - now.getSeconds()) * 1000 - now.getMilliseconds(); - const delayMs = millisecondsUntilNextMinute + this.minuteAlignmentOffsetMs; - return delayMs > 0 ? delayMs : this.minuteAlignmentOffsetMs; - } - private async pollScheduled(): Promise { const logger = this.logger.child(`poll-scheduled`); logger.log("Polling..."); @@ -176,7 +167,8 @@ export class JobRulesWorker implements IJobRulesWorker { } private scheduleNextPoll(): void { - const delayMs = this.calculateDelayToNextAlignedMinute(); + // schedule next poll for 2 s past the minute mark + const delayMs = (secondsToNextMinute() + 2) * 1000; this.logger.log(`Scheduling next poll in ${delayMs} ms`); this.pollTimeoutId = setTimeout(async () => { if (!this.isRunning) return; @@ -195,7 +187,6 @@ export class JobRulesWorker implements IJobRulesWorker { } this.isRunning = true; this.logger.log(`Starting worker ${this.workerId}`); - // No immediate polling; align to 2s past the next minute this.scheduleNextPoll(); } diff --git a/apps/rowboat/src/infrastructure/policies/redis.usage-quota.policy.ts b/apps/rowboat/src/infrastructure/policies/redis.usage-quota.policy.ts index 05fecbdb..29fe2d08 100644 --- a/apps/rowboat/src/infrastructure/policies/redis.usage-quota.policy.ts +++ b/apps/rowboat/src/infrastructure/policies/redis.usage-quota.policy.ts @@ -1,6 +1,7 @@ import { IUsageQuotaPolicy } from "@/src/application/policies/usage-quota.policy.interface"; import { redisClient } from "@/app/lib/redis"; import { QuotaExceededError } from "@/src/entities/errors/common"; +import { secondsToNextMinute } from "@/src/application/lib/utils/time-to-next-minute"; const MAX_QUERIES_PER_MINUTE = Number(process.env.MAX_QUERIES_PER_MINUTE) || 0; @@ -15,7 +16,7 @@ export class RedisUsageQuotaPolicy implements IUsageQuotaPolicy { const count = await redisClient.incr(key); if (count === 1) { - await redisClient.expire(key, 70); // Set TTL to clean up automatically + await redisClient.expire(key, secondsToNextMinute()); // Set TTL to clean up automatically } if (count > MAX_QUERIES_PER_MINUTE) {