mirror of
https://github.com/rowboatlabs/rowboat.git
synced 2026-05-12 00:32:38 +02:00
improve minute alignment logic
This commit is contained in:
parent
08e76edd51
commit
0b31585141
3 changed files with 11 additions and 13 deletions
|
|
@ -0,0 +1,6 @@
|
|||
// returns the number of seconds until the next minute
|
||||
export function secondsToNextMinute(): number {
|
||||
const now = new Date();
|
||||
const secondsUntilNextMinute = 60 - now.getSeconds();
|
||||
return secondsUntilNextMinute;
|
||||
}
|
||||
|
|
@ -8,6 +8,7 @@ import { z } from "zod";
|
|||
import { nanoid } from "nanoid";
|
||||
import { PrefixLogger } from "@/app/lib/utils";
|
||||
import { RecurringJobRule } from "@/src/entities/models/recurring-job-rule";
|
||||
import { secondsToNextMinute } from "../lib/utils/time-to-next-minute";
|
||||
|
||||
export interface IJobRulesWorker {
|
||||
run(): Promise<void>;
|
||||
|
|
@ -20,8 +21,6 @@ export class JobRulesWorker implements IJobRulesWorker {
|
|||
private readonly jobsRepository: IJobsRepository;
|
||||
private readonly projectsRepository: IProjectsRepository;
|
||||
private readonly pubSubService: IPubSubService;
|
||||
// Run polls aligned to minute marks at this offset (e.g., 2000 ms => :02 each minute)
|
||||
private readonly minuteAlignmentOffsetMs: number = 2_000;
|
||||
private workerId: string;
|
||||
private logger: PrefixLogger;
|
||||
private isRunning: boolean = false;
|
||||
|
|
@ -131,14 +130,6 @@ export class JobRulesWorker implements IJobRulesWorker {
|
|||
}
|
||||
}
|
||||
|
||||
// Calculates delay so the next run happens at next minute + minuteAlignmentOffsetMs
|
||||
private calculateDelayToNextAlignedMinute(): number {
|
||||
const now = new Date();
|
||||
const millisecondsUntilNextMinute = (60 - now.getSeconds()) * 1000 - now.getMilliseconds();
|
||||
const delayMs = millisecondsUntilNextMinute + this.minuteAlignmentOffsetMs;
|
||||
return delayMs > 0 ? delayMs : this.minuteAlignmentOffsetMs;
|
||||
}
|
||||
|
||||
private async pollScheduled(): Promise<void> {
|
||||
const logger = this.logger.child(`poll-scheduled`);
|
||||
logger.log("Polling...");
|
||||
|
|
@ -176,7 +167,8 @@ export class JobRulesWorker implements IJobRulesWorker {
|
|||
}
|
||||
|
||||
private scheduleNextPoll(): void {
|
||||
const delayMs = this.calculateDelayToNextAlignedMinute();
|
||||
// schedule next poll for 2 s past the minute mark
|
||||
const delayMs = (secondsToNextMinute() + 2) * 1000;
|
||||
this.logger.log(`Scheduling next poll in ${delayMs} ms`);
|
||||
this.pollTimeoutId = setTimeout(async () => {
|
||||
if (!this.isRunning) return;
|
||||
|
|
@ -195,7 +187,6 @@ export class JobRulesWorker implements IJobRulesWorker {
|
|||
}
|
||||
this.isRunning = true;
|
||||
this.logger.log(`Starting worker ${this.workerId}`);
|
||||
// No immediate polling; align to 2s past the next minute
|
||||
this.scheduleNextPoll();
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
import { IUsageQuotaPolicy } from "@/src/application/policies/usage-quota.policy.interface";
|
||||
import { redisClient } from "@/app/lib/redis";
|
||||
import { QuotaExceededError } from "@/src/entities/errors/common";
|
||||
import { secondsToNextMinute } from "@/src/application/lib/utils/time-to-next-minute";
|
||||
|
||||
const MAX_QUERIES_PER_MINUTE = Number(process.env.MAX_QUERIES_PER_MINUTE) || 0;
|
||||
|
||||
|
|
@ -15,7 +16,7 @@ export class RedisUsageQuotaPolicy implements IUsageQuotaPolicy {
|
|||
|
||||
const count = await redisClient.incr(key);
|
||||
if (count === 1) {
|
||||
await redisClient.expire(key, 70); // Set TTL to clean up automatically
|
||||
await redisClient.expire(key, secondsToNextMinute()); // Set TTL to clean up automatically
|
||||
}
|
||||
|
||||
if (count > MAX_QUERIES_PER_MINUTE) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue