mirror of
https://github.com/rowboatlabs/rowboat.git
synced 2026-05-08 06:42:39 +02:00
refactor usage quota service as a policy
This commit is contained in:
parent
5b6d592d09
commit
831356a155
9 changed files with 37 additions and 37 deletions
|
|
@ -14,9 +14,9 @@ import { USE_BILLING } from "../lib/feature_flags";
|
||||||
import { WithStringId } from "../lib/types/types";
|
import { WithStringId } from "../lib/types/types";
|
||||||
import { getEditAgentInstructionsResponse } from "../lib/copilot/copilot";
|
import { getEditAgentInstructionsResponse } from "../lib/copilot/copilot";
|
||||||
import { container } from "@/di/container";
|
import { container } from "@/di/container";
|
||||||
import { IUsageQuotaPolicyService } from "@/src/application/services/usage-quota-policy.service.interface";
|
import { IUsageQuotaPolicy } from "@/src/application/policies/usage-quota.policy.interface";
|
||||||
|
|
||||||
const usageQuotaPolicyService = container.resolve<IUsageQuotaPolicyService>('usageQuotaPolicyService');
|
const usageQuotaPolicy = container.resolve<IUsageQuotaPolicy>('usageQuotaPolicy');
|
||||||
|
|
||||||
export async function getCopilotResponseStream(
|
export async function getCopilotResponseStream(
|
||||||
projectId: string,
|
projectId: string,
|
||||||
|
|
@ -28,7 +28,7 @@ export async function getCopilotResponseStream(
|
||||||
streamId: string;
|
streamId: string;
|
||||||
} | { billingError: string }> {
|
} | { billingError: string }> {
|
||||||
await projectAuthCheck(projectId);
|
await projectAuthCheck(projectId);
|
||||||
await usageQuotaPolicyService.assertAndConsume(projectId);
|
await usageQuotaPolicy.assertAndConsume(projectId);
|
||||||
|
|
||||||
// Check billing authorization
|
// Check billing authorization
|
||||||
const authResponse = await authorizeUserAction({
|
const authResponse = await authorizeUserAction({
|
||||||
|
|
@ -39,7 +39,7 @@ export async function getCopilotResponseStream(
|
||||||
return { billingError: authResponse.error || 'Billing error' };
|
return { billingError: authResponse.error || 'Billing error' };
|
||||||
}
|
}
|
||||||
|
|
||||||
await usageQuotaPolicyService.assertAndConsume(projectId);
|
await usageQuotaPolicy.assertAndConsume(projectId);
|
||||||
|
|
||||||
// prepare request
|
// prepare request
|
||||||
const request: z.infer<typeof CopilotAPIRequest> = {
|
const request: z.infer<typeof CopilotAPIRequest> = {
|
||||||
|
|
@ -71,7 +71,7 @@ export async function getCopilotAgentInstructions(
|
||||||
agentName: string,
|
agentName: string,
|
||||||
): Promise<string | { billingError: string }> {
|
): Promise<string | { billingError: string }> {
|
||||||
await projectAuthCheck(projectId);
|
await projectAuthCheck(projectId);
|
||||||
await usageQuotaPolicyService.assertAndConsume(projectId);
|
await usageQuotaPolicy.assertAndConsume(projectId);
|
||||||
|
|
||||||
// Check billing authorization
|
// Check billing authorization
|
||||||
const authResponse = await authorizeUserAction({
|
const authResponse = await authorizeUserAction({
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,7 @@ import { authorize, getCustomerIdForProject, logUsage } from "@/app/lib/billing"
|
||||||
import { USE_BILLING } from "@/app/lib/feature_flags";
|
import { USE_BILLING } from "@/app/lib/feature_flags";
|
||||||
import { getResponse } from "@/app/lib/agents";
|
import { getResponse } from "@/app/lib/agents";
|
||||||
import { Message, AssistantMessage, AssistantMessageWithToolCalls, ToolMessage } from "@/app/lib/types/types";
|
import { Message, AssistantMessage, AssistantMessageWithToolCalls, ToolMessage } from "@/app/lib/types/types";
|
||||||
import { IUsageQuotaPolicyService } from "@/src/application/services/usage-quota-policy.service.interface";
|
import { IUsageQuotaPolicy } from "@/src/application/policies/usage-quota.policy.interface";
|
||||||
import { container } from "@/di/container";
|
import { container } from "@/di/container";
|
||||||
|
|
||||||
function convert(messages: z.infer<typeof apiV1.ChatMessage>[]): z.infer<typeof Message>[] {
|
function convert(messages: z.infer<typeof apiV1.ChatMessage>[]): z.infer<typeof Message>[] {
|
||||||
|
|
@ -125,8 +125,8 @@ export async function POST(
|
||||||
}
|
}
|
||||||
|
|
||||||
// assert and consume quota
|
// assert and consume quota
|
||||||
const usageQuotaPolicyService = container.resolve<IUsageQuotaPolicyService>('usageQuotaPolicyService');
|
const usageQuotaPolicy = container.resolve<IUsageQuotaPolicy>('usageQuotaPolicy');
|
||||||
await usageQuotaPolicyService.assertAndConsume(session.projectId);
|
await usageQuotaPolicy.assertAndConsume(session.projectId);
|
||||||
|
|
||||||
// parse and validate the request body
|
// parse and validate the request body
|
||||||
let body;
|
let body;
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,7 @@ import { CreateCachedTurnUseCase } from "@/src/application/use-cases/conversatio
|
||||||
import { FetchCachedTurnUseCase } from "@/src/application/use-cases/conversations/fetch-cached-turn.use-case";
|
import { FetchCachedTurnUseCase } from "@/src/application/use-cases/conversations/fetch-cached-turn.use-case";
|
||||||
import { CreateCachedTurnController } from "@/src/interface-adapters/controllers/conversations/create-cached-turn.controller";
|
import { CreateCachedTurnController } from "@/src/interface-adapters/controllers/conversations/create-cached-turn.controller";
|
||||||
import { RunTurnController } from "@/src/interface-adapters/controllers/conversations/run-turn.controller";
|
import { RunTurnController } from "@/src/interface-adapters/controllers/conversations/run-turn.controller";
|
||||||
import { RedisUsageQuotaPolicyService } from "@/src/infrastructure/services/redis.usage-quota-policy.service";
|
import { RedisUsageQuotaPolicy } from "@/src/infrastructure/policies/redis.usage-quota.policy";
|
||||||
|
|
||||||
export const container = createContainer({
|
export const container = createContainer({
|
||||||
injectionMode: InjectionMode.PROXY,
|
injectionMode: InjectionMode.PROXY,
|
||||||
|
|
@ -20,7 +20,7 @@ container.register({
|
||||||
// services
|
// services
|
||||||
// ---
|
// ---
|
||||||
cacheService: asClass(RedisCacheService).singleton(),
|
cacheService: asClass(RedisCacheService).singleton(),
|
||||||
usageQuotaPolicyService: asClass(RedisUsageQuotaPolicyService).singleton(),
|
usageQuotaPolicy: asClass(RedisUsageQuotaPolicy).singleton(),
|
||||||
|
|
||||||
// conversations
|
// conversations
|
||||||
// ---
|
// ---
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
export interface IUsageQuotaPolicyService {
|
export interface IUsageQuotaPolicy {
|
||||||
// this method will throw a QuotaExceededError if the quota is exceeded
|
// this method will throw a QuotaExceededError if the quota is exceeded
|
||||||
assertAndConsume(projectId: string): Promise<void>;
|
assertAndConsume(projectId: string): Promise<void>;
|
||||||
}
|
}
|
||||||
|
|
@ -5,7 +5,7 @@ import { z } from "zod";
|
||||||
import { nanoid } from 'nanoid';
|
import { nanoid } from 'nanoid';
|
||||||
import { ICacheService } from '@/src/application/services/cache.service.interface';
|
import { ICacheService } from '@/src/application/services/cache.service.interface';
|
||||||
import { CachedTurnRequest, Turn } from '@/src/entities/models/turn';
|
import { CachedTurnRequest, Turn } from '@/src/entities/models/turn';
|
||||||
import { IUsageQuotaPolicyService } from '../../services/usage-quota-policy.service.interface';
|
import { IUsageQuotaPolicy } from '../../policies/usage-quota.policy.interface';
|
||||||
|
|
||||||
const inputSchema = z.object({
|
const inputSchema = z.object({
|
||||||
caller: z.enum(["user", "api"]),
|
caller: z.enum(["user", "api"]),
|
||||||
|
|
@ -22,20 +22,20 @@ export interface ICreateCachedTurnUseCase {
|
||||||
export class CreateCachedTurnUseCase implements ICreateCachedTurnUseCase {
|
export class CreateCachedTurnUseCase implements ICreateCachedTurnUseCase {
|
||||||
private readonly cacheService: ICacheService;
|
private readonly cacheService: ICacheService;
|
||||||
private readonly conversationsRepository: IConversationsRepository;
|
private readonly conversationsRepository: IConversationsRepository;
|
||||||
private readonly usageQuotaPolicyService: IUsageQuotaPolicyService;
|
private readonly usageQuotaPolicy: IUsageQuotaPolicy;
|
||||||
|
|
||||||
constructor({
|
constructor({
|
||||||
cacheService,
|
cacheService,
|
||||||
conversationsRepository,
|
conversationsRepository,
|
||||||
usageQuotaPolicyService,
|
usageQuotaPolicy,
|
||||||
}: {
|
}: {
|
||||||
cacheService: ICacheService,
|
cacheService: ICacheService,
|
||||||
conversationsRepository: IConversationsRepository,
|
conversationsRepository: IConversationsRepository,
|
||||||
usageQuotaPolicyService: IUsageQuotaPolicyService,
|
usageQuotaPolicy: IUsageQuotaPolicy,
|
||||||
}) {
|
}) {
|
||||||
this.cacheService = cacheService;
|
this.cacheService = cacheService;
|
||||||
this.conversationsRepository = conversationsRepository;
|
this.conversationsRepository = conversationsRepository;
|
||||||
this.usageQuotaPolicyService = usageQuotaPolicyService;
|
this.usageQuotaPolicy = usageQuotaPolicy;
|
||||||
}
|
}
|
||||||
|
|
||||||
async execute(data: z.infer<typeof inputSchema>): Promise<{ key: string }> {
|
async execute(data: z.infer<typeof inputSchema>): Promise<{ key: string }> {
|
||||||
|
|
@ -49,7 +49,7 @@ export class CreateCachedTurnUseCase implements ICreateCachedTurnUseCase {
|
||||||
const { projectId } = conversation;
|
const { projectId } = conversation;
|
||||||
|
|
||||||
// assert and consume quota
|
// assert and consume quota
|
||||||
await this.usageQuotaPolicyService.assertAndConsume(projectId);
|
await this.usageQuotaPolicy.assertAndConsume(projectId);
|
||||||
|
|
||||||
// if caller is a user, ensure they are a member of project
|
// if caller is a user, ensure they are a member of project
|
||||||
if (data.caller === "user") {
|
if (data.caller === "user") {
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,7 @@ import { IConversationsRepository } from "@/src/application/repositories/convers
|
||||||
import { z } from "zod";
|
import { z } from "zod";
|
||||||
import { Conversation } from "@/src/entities/models/conversation";
|
import { Conversation } from "@/src/entities/models/conversation";
|
||||||
import { Workflow } from "@/app/lib/types/workflow_types";
|
import { Workflow } from "@/app/lib/types/workflow_types";
|
||||||
import { IUsageQuotaPolicyService } from '../../services/usage-quota-policy.service.interface';
|
import { IUsageQuotaPolicy } from '../../policies/usage-quota.policy.interface';
|
||||||
|
|
||||||
const inputSchema = z.object({
|
const inputSchema = z.object({
|
||||||
caller: z.enum(["user", "api"]),
|
caller: z.enum(["user", "api"]),
|
||||||
|
|
@ -21,17 +21,17 @@ export interface ICreateConversationUseCase {
|
||||||
|
|
||||||
export class CreateConversationUseCase implements ICreateConversationUseCase {
|
export class CreateConversationUseCase implements ICreateConversationUseCase {
|
||||||
private readonly conversationsRepository: IConversationsRepository;
|
private readonly conversationsRepository: IConversationsRepository;
|
||||||
private readonly usageQuotaPolicyService: IUsageQuotaPolicyService;
|
private readonly usageQuotaPolicy: IUsageQuotaPolicy;
|
||||||
|
|
||||||
constructor({
|
constructor({
|
||||||
conversationsRepository,
|
conversationsRepository,
|
||||||
usageQuotaPolicyService,
|
usageQuotaPolicy,
|
||||||
}: {
|
}: {
|
||||||
conversationsRepository: IConversationsRepository,
|
conversationsRepository: IConversationsRepository,
|
||||||
usageQuotaPolicyService: IUsageQuotaPolicyService,
|
usageQuotaPolicy: IUsageQuotaPolicy,
|
||||||
}) {
|
}) {
|
||||||
this.conversationsRepository = conversationsRepository;
|
this.conversationsRepository = conversationsRepository;
|
||||||
this.usageQuotaPolicyService = usageQuotaPolicyService;
|
this.usageQuotaPolicy = usageQuotaPolicy;
|
||||||
}
|
}
|
||||||
|
|
||||||
async execute(data: z.infer<typeof inputSchema>): Promise<z.infer<typeof Conversation>> {
|
async execute(data: z.infer<typeof inputSchema>): Promise<z.infer<typeof Conversation>> {
|
||||||
|
|
@ -40,7 +40,7 @@ export class CreateConversationUseCase implements ICreateConversationUseCase {
|
||||||
let workflow = data.workflow;
|
let workflow = data.workflow;
|
||||||
|
|
||||||
// assert and consume quota
|
// assert and consume quota
|
||||||
await this.usageQuotaPolicyService.assertAndConsume(projectId);
|
await this.usageQuotaPolicy.assertAndConsume(projectId);
|
||||||
|
|
||||||
// if caller is a user, ensure they are a member of project
|
// if caller is a user, ensure they are a member of project
|
||||||
if (caller === "user") {
|
if (caller === "user") {
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,7 @@ import { IConversationsRepository } from "@/src/application/repositories/convers
|
||||||
import { z } from "zod";
|
import { z } from "zod";
|
||||||
import { ICacheService } from '@/src/application/services/cache.service.interface';
|
import { ICacheService } from '@/src/application/services/cache.service.interface';
|
||||||
import { CachedTurnRequest, Turn } from '@/src/entities/models/turn';
|
import { CachedTurnRequest, Turn } from '@/src/entities/models/turn';
|
||||||
import { IUsageQuotaPolicyService } from '../../services/usage-quota-policy.service.interface';
|
import { IUsageQuotaPolicy } from '../../policies/usage-quota.policy.interface';
|
||||||
|
|
||||||
const inputSchema = z.object({
|
const inputSchema = z.object({
|
||||||
caller: z.enum(["user", "api"]),
|
caller: z.enum(["user", "api"]),
|
||||||
|
|
@ -20,20 +20,20 @@ export interface IFetchCachedTurnUseCase {
|
||||||
export class FetchCachedTurnUseCase implements IFetchCachedTurnUseCase {
|
export class FetchCachedTurnUseCase implements IFetchCachedTurnUseCase {
|
||||||
private readonly cacheService: ICacheService;
|
private readonly cacheService: ICacheService;
|
||||||
private readonly conversationsRepository: IConversationsRepository;
|
private readonly conversationsRepository: IConversationsRepository;
|
||||||
private readonly usageQuotaPolicyService: IUsageQuotaPolicyService;
|
private readonly usageQuotaPolicy: IUsageQuotaPolicy;
|
||||||
|
|
||||||
constructor({
|
constructor({
|
||||||
cacheService,
|
cacheService,
|
||||||
conversationsRepository,
|
conversationsRepository,
|
||||||
usageQuotaPolicyService,
|
usageQuotaPolicy,
|
||||||
}: {
|
}: {
|
||||||
cacheService: ICacheService,
|
cacheService: ICacheService,
|
||||||
conversationsRepository: IConversationsRepository,
|
conversationsRepository: IConversationsRepository,
|
||||||
usageQuotaPolicyService: IUsageQuotaPolicyService,
|
usageQuotaPolicy: IUsageQuotaPolicy,
|
||||||
}) {
|
}) {
|
||||||
this.cacheService = cacheService;
|
this.cacheService = cacheService;
|
||||||
this.conversationsRepository = conversationsRepository;
|
this.conversationsRepository = conversationsRepository;
|
||||||
this.usageQuotaPolicyService = usageQuotaPolicyService;
|
this.usageQuotaPolicy = usageQuotaPolicy;
|
||||||
}
|
}
|
||||||
|
|
||||||
async execute(data: z.infer<typeof inputSchema>): Promise<z.infer<typeof CachedTurnRequest>> {
|
async execute(data: z.infer<typeof inputSchema>): Promise<z.infer<typeof CachedTurnRequest>> {
|
||||||
|
|
@ -56,7 +56,7 @@ export class FetchCachedTurnUseCase implements IFetchCachedTurnUseCase {
|
||||||
const { projectId } = conversation;
|
const { projectId } = conversation;
|
||||||
|
|
||||||
// assert and consume quota
|
// assert and consume quota
|
||||||
await this.usageQuotaPolicyService.assertAndConsume(projectId);
|
await this.usageQuotaPolicy.assertAndConsume(projectId);
|
||||||
|
|
||||||
// if caller is a user, ensure they are a member of project
|
// if caller is a user, ensure they are a member of project
|
||||||
if (data.caller === "user") {
|
if (data.caller === "user") {
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,7 @@ import { IConversationsRepository } from "@/src/application/repositories/convers
|
||||||
import { streamResponse } from "@/app/lib/agents";
|
import { streamResponse } from "@/app/lib/agents";
|
||||||
import { z } from "zod";
|
import { z } from "zod";
|
||||||
import { Message } from "@/app/lib/types/types";
|
import { Message } from "@/app/lib/types/types";
|
||||||
import { IUsageQuotaPolicyService } from '../../services/usage-quota-policy.service.interface';
|
import { IUsageQuotaPolicy } from '../../policies/usage-quota.policy.interface';
|
||||||
|
|
||||||
const inputSchema = z.object({
|
const inputSchema = z.object({
|
||||||
caller: z.enum(["user", "api"]),
|
caller: z.enum(["user", "api"]),
|
||||||
|
|
@ -24,17 +24,17 @@ export interface IRunConversationTurnUseCase {
|
||||||
|
|
||||||
export class RunConversationTurnUseCase implements IRunConversationTurnUseCase {
|
export class RunConversationTurnUseCase implements IRunConversationTurnUseCase {
|
||||||
private readonly conversationsRepository: IConversationsRepository;
|
private readonly conversationsRepository: IConversationsRepository;
|
||||||
private readonly usageQuotaPolicyService: IUsageQuotaPolicyService;
|
private readonly usageQuotaPolicy: IUsageQuotaPolicy;
|
||||||
|
|
||||||
constructor({
|
constructor({
|
||||||
conversationsRepository,
|
conversationsRepository,
|
||||||
usageQuotaPolicyService,
|
usageQuotaPolicy,
|
||||||
}: {
|
}: {
|
||||||
conversationsRepository: IConversationsRepository,
|
conversationsRepository: IConversationsRepository,
|
||||||
usageQuotaPolicyService: IUsageQuotaPolicyService,
|
usageQuotaPolicy: IUsageQuotaPolicy,
|
||||||
}) {
|
}) {
|
||||||
this.conversationsRepository = conversationsRepository;
|
this.conversationsRepository = conversationsRepository;
|
||||||
this.usageQuotaPolicyService = usageQuotaPolicyService;
|
this.usageQuotaPolicy = usageQuotaPolicy;
|
||||||
}
|
}
|
||||||
|
|
||||||
async *execute(data: z.infer<typeof inputSchema>): AsyncGenerator<z.infer<typeof TurnEvent>, void, unknown> {
|
async *execute(data: z.infer<typeof inputSchema>): AsyncGenerator<z.infer<typeof TurnEvent>, void, unknown> {
|
||||||
|
|
@ -48,7 +48,7 @@ export class RunConversationTurnUseCase implements IRunConversationTurnUseCase {
|
||||||
const { id: conversationId, projectId } = conversation;
|
const { id: conversationId, projectId } = conversation;
|
||||||
|
|
||||||
// assert and consume quota
|
// assert and consume quota
|
||||||
await this.usageQuotaPolicyService.assertAndConsume(projectId);
|
await this.usageQuotaPolicy.assertAndConsume(projectId);
|
||||||
|
|
||||||
// if caller is a user, ensure they are a member of project
|
// if caller is a user, ensure they are a member of project
|
||||||
if (data.caller === "user") {
|
if (data.caller === "user") {
|
||||||
|
|
|
||||||
|
|
@ -1,10 +1,10 @@
|
||||||
import { IUsageQuotaPolicyService } from "@/src/application/services/usage-quota-policy.service.interface";
|
import { IUsageQuotaPolicy } from "@/src/application/policies/usage-quota.policy.interface";
|
||||||
import { redisClient } from "@/app/lib/redis";
|
import { redisClient } from "@/app/lib/redis";
|
||||||
import { QuotaExceededError } from "@/src/entities/errors/common";
|
import { QuotaExceededError } from "@/src/entities/errors/common";
|
||||||
|
|
||||||
const MAX_QUERIES_PER_MINUTE = Number(process.env.MAX_QUERIES_PER_MINUTE) || 0;
|
const MAX_QUERIES_PER_MINUTE = Number(process.env.MAX_QUERIES_PER_MINUTE) || 0;
|
||||||
|
|
||||||
export class RedisUsageQuotaPolicyService implements IUsageQuotaPolicyService {
|
export class RedisUsageQuotaPolicy implements IUsageQuotaPolicy {
|
||||||
async assertAndConsume(projectId: string): Promise<void> {
|
async assertAndConsume(projectId: string): Promise<void> {
|
||||||
if (MAX_QUERIES_PER_MINUTE === 0) {
|
if (MAX_QUERIES_PER_MINUTE === 0) {
|
||||||
return;
|
return;
|
||||||
Loading…
Add table
Add a link
Reference in a new issue