diff --git a/apps/rowboat/di/container.ts b/apps/rowboat/di/container.ts index 8054910b..247ab5cf 100644 --- a/apps/rowboat/di/container.ts +++ b/apps/rowboat/di/container.ts @@ -10,6 +10,9 @@ import { FetchCachedTurnUseCase } from "@/src/application/use-cases/conversation import { CreateCachedTurnController } from "@/src/interface-adapters/controllers/conversations/create-cached-turn.controller"; import { RunTurnController } from "@/src/interface-adapters/controllers/conversations/run-turn.controller"; import { RedisUsageQuotaPolicy } from "@/src/infrastructure/policies/redis.usage-quota.policy"; +import { ProjectActionAuthorizationPolicy } from "@/src/application/policies/project-action-authorization.policy"; +import { MongoDBProjectMembersRepository } from "@/src/infrastructure/repositories/mongodb.project-members.repository"; +import { MongoDBApiKeysRepository } from "@/src/infrastructure/repositories/mongodb.api-keys.repository"; export const container = createContainer({ injectionMode: InjectionMode.PROXY, @@ -20,7 +23,19 @@ container.register({ // services // --- cacheService: asClass(RedisCacheService).singleton(), + + // policies + // --- usageQuotaPolicy: asClass(RedisUsageQuotaPolicy).singleton(), + projectActionAuthorizationPolicy: asClass(ProjectActionAuthorizationPolicy).singleton(), + + // project members + // --- + projectMembersRepository: asClass(MongoDBProjectMembersRepository).singleton(), + + // api keys + // --- + apiKeysRepository: asClass(MongoDBApiKeysRepository).singleton(), // conversations // --- diff --git a/apps/rowboat/src/application/policies/project-action-authorization.policy.ts b/apps/rowboat/src/application/policies/project-action-authorization.policy.ts new file mode 100644 index 00000000..b79d5c1f --- /dev/null +++ b/apps/rowboat/src/application/policies/project-action-authorization.policy.ts @@ -0,0 +1,55 @@ +import { BadRequestError, NotAuthorizedError } from "@/src/entities/errors/common"; +import { IProjectMembersRepository } from "../repositories/project-members.repository.interface"; +import { z } from "zod"; +import { IApiKeysRepository } from "../repositories/api-keys.repository.interface"; + +const inputSchema = z.object({ + caller: z.enum(["user", "api"]), + userId: z.string().optional(), + apiKey: z.string().optional(), + projectId: z.string(), +}); + +export interface IProjectActionAuthorizationPolicy { + authorize(data: z.infer): Promise; +} + +export class ProjectActionAuthorizationPolicy implements IProjectActionAuthorizationPolicy { + private readonly projectMembersRepository: IProjectMembersRepository; + private readonly apiKeysRepository: IApiKeysRepository; + + constructor({ + projectMembersRepository, + apiKeysRepository, + }: { + projectMembersRepository: IProjectMembersRepository; + apiKeysRepository: IApiKeysRepository; + }) { + this.projectMembersRepository = projectMembersRepository; + this.apiKeysRepository = apiKeysRepository; + } + + async authorize(data: z.infer): Promise { + const { caller, userId, apiKey, projectId } = data; + + if (caller === "user") { + if (!userId) { + throw new BadRequestError('User ID is required'); + } + const membership = await this.projectMembersRepository.checkMembership(projectId, userId); + if (!membership) { + throw new NotAuthorizedError('User is not a member of the project'); + } + } else { + if (!apiKey) { + throw new BadRequestError('API key is required'); + } + // check and consume api key + // while also updating last used timestamp + const result = await this.apiKeysRepository.checkAndConsumeKey(projectId, apiKey); + if (!result) { + throw new NotAuthorizedError('Invalid API key'); + } + } + } +} \ No newline at end of file diff --git a/apps/rowboat/src/application/repositories/api-keys.repository.interface.ts b/apps/rowboat/src/application/repositories/api-keys.repository.interface.ts new file mode 100644 index 00000000..383443d0 --- /dev/null +++ b/apps/rowboat/src/application/repositories/api-keys.repository.interface.ts @@ -0,0 +1,3 @@ +export interface IApiKeysRepository { + checkAndConsumeKey(projectId: string, apiKey: string): Promise; +} \ No newline at end of file diff --git a/apps/rowboat/src/application/repositories/project-members.repository.interface.ts b/apps/rowboat/src/application/repositories/project-members.repository.interface.ts new file mode 100644 index 00000000..d20063dd --- /dev/null +++ b/apps/rowboat/src/application/repositories/project-members.repository.interface.ts @@ -0,0 +1,3 @@ +export interface IProjectMembersRepository { + checkMembership(projectId: string, userId: string): Promise; +} \ No newline at end of file diff --git a/apps/rowboat/src/application/use-cases/conversations/create-cached-turn.use-case.ts b/apps/rowboat/src/application/use-cases/conversations/create-cached-turn.use-case.ts index d10d0c0e..d1bf4df5 100644 --- a/apps/rowboat/src/application/use-cases/conversations/create-cached-turn.use-case.ts +++ b/apps/rowboat/src/application/use-cases/conversations/create-cached-turn.use-case.ts @@ -1,11 +1,11 @@ -import { BadRequestError, NotAuthorizedError, NotFoundError } from '@/src/entities/errors/common'; -import { apiKeysCollection, projectMembersCollection } from "@/app/lib/mongodb"; +import { NotFoundError } from '@/src/entities/errors/common'; import { IConversationsRepository } from "@/src/application/repositories/conversations.repository.interface"; import { z } from "zod"; import { nanoid } from 'nanoid'; import { ICacheService } from '@/src/application/services/cache.service.interface'; import { CachedTurnRequest, Turn } from '@/src/entities/models/turn'; import { IUsageQuotaPolicy } from '../../policies/usage-quota.policy.interface'; +import { IProjectActionAuthorizationPolicy } from '../../policies/project-action-authorization.policy'; const inputSchema = z.object({ caller: z.enum(["user", "api"]), @@ -23,19 +23,23 @@ export class CreateCachedTurnUseCase implements ICreateCachedTurnUseCase { private readonly cacheService: ICacheService; private readonly conversationsRepository: IConversationsRepository; private readonly usageQuotaPolicy: IUsageQuotaPolicy; + private readonly projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy; constructor({ cacheService, conversationsRepository, usageQuotaPolicy, + projectActionAuthorizationPolicy, }: { cacheService: ICacheService, conversationsRepository: IConversationsRepository, usageQuotaPolicy: IUsageQuotaPolicy, + projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy, }) { this.cacheService = cacheService; this.conversationsRepository = conversationsRepository; this.usageQuotaPolicy = usageQuotaPolicy; + this.projectActionAuthorizationPolicy = projectActionAuthorizationPolicy; } async execute(data: z.infer): Promise<{ key: string }> { @@ -51,35 +55,13 @@ export class CreateCachedTurnUseCase implements ICreateCachedTurnUseCase { // assert and consume quota await this.usageQuotaPolicy.assertAndConsume(projectId); - // if caller is a user, ensure they are a member of project - if (data.caller === "user") { - if (!data.userId) { - throw new BadRequestError('User ID is required'); - } - const membership = await projectMembersCollection.findOne({ - projectId, - userId: data.userId, - }); - if (!membership) { - throw new NotAuthorizedError('User not a member of project'); - } - } else { - if (!data.apiKey) { - throw new BadRequestError('API key is required'); - } - // check if api key is valid - // while also updating last used timestamp - const result = await apiKeysCollection.findOneAndUpdate( - { - projectId, - key: data.apiKey, - }, - { $set: { lastUsedAt: new Date().toISOString() } } - ); - if (!result) { - throw new NotAuthorizedError('Invalid API key'); - } - } + // authz check + await this.projectActionAuthorizationPolicy.authorize({ + caller: data.caller, + userId: data.userId, + apiKey: data.apiKey, + projectId, + }); // create cache entry const key = nanoid(); diff --git a/apps/rowboat/src/application/use-cases/conversations/create-conversation.use-case.ts b/apps/rowboat/src/application/use-cases/conversations/create-conversation.use-case.ts index 0fb9a35e..5c06c0c0 100644 --- a/apps/rowboat/src/application/use-cases/conversations/create-conversation.use-case.ts +++ b/apps/rowboat/src/application/use-cases/conversations/create-conversation.use-case.ts @@ -1,10 +1,11 @@ -import { BadRequestError, NotAuthorizedError, NotFoundError } from '@/src/entities/errors/common'; -import { apiKeysCollection, projectMembersCollection, projectsCollection } from "@/app/lib/mongodb"; +import { BadRequestError, NotFoundError } from '@/src/entities/errors/common'; +import { projectsCollection } from "@/app/lib/mongodb"; import { IConversationsRepository } from "@/src/application/repositories/conversations.repository.interface"; import { z } from "zod"; import { Conversation } from "@/src/entities/models/conversation"; import { Workflow } from "@/app/lib/types/workflow_types"; import { IUsageQuotaPolicy } from '../../policies/usage-quota.policy.interface'; +import { IProjectActionAuthorizationPolicy } from '../../policies/project-action-authorization.policy'; const inputSchema = z.object({ caller: z.enum(["user", "api"]), @@ -22,16 +23,20 @@ export interface ICreateConversationUseCase { export class CreateConversationUseCase implements ICreateConversationUseCase { private readonly conversationsRepository: IConversationsRepository; private readonly usageQuotaPolicy: IUsageQuotaPolicy; + private readonly projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy; constructor({ conversationsRepository, usageQuotaPolicy, + projectActionAuthorizationPolicy, }: { conversationsRepository: IConversationsRepository, usageQuotaPolicy: IUsageQuotaPolicy, + projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy, }) { this.conversationsRepository = conversationsRepository; this.usageQuotaPolicy = usageQuotaPolicy; + this.projectActionAuthorizationPolicy = projectActionAuthorizationPolicy; } async execute(data: z.infer): Promise> { @@ -42,35 +47,13 @@ export class CreateConversationUseCase implements ICreateConversationUseCase { // assert and consume quota await this.usageQuotaPolicy.assertAndConsume(projectId); - // if caller is a user, ensure they are a member of project - if (caller === "user") { - if (!userId) { - throw new BadRequestError('User ID is required'); - } - const membership = await projectMembersCollection.findOne({ - projectId, - userId, - }); - if (!membership) { - throw new NotAuthorizedError('User not a member of project'); - } - } else { - if (!apiKey) { - throw new BadRequestError('API key is required'); - } - // check if api key is valid - // while also updating last used timestamp - const result = await apiKeysCollection.findOneAndUpdate( - { - projectId, - key: apiKey, - }, - { $set: { lastUsedAt: new Date().toISOString() } } - ); - if (!result) { - throw new NotAuthorizedError('Invalid API key'); - } - } + // authz check + await this.projectActionAuthorizationPolicy.authorize({ + caller, + userId, + apiKey, + projectId, + }); // if workflow is not provided, fetch workflow if (!workflow) { diff --git a/apps/rowboat/src/application/use-cases/conversations/fetch-cached-turn.use-case.ts b/apps/rowboat/src/application/use-cases/conversations/fetch-cached-turn.use-case.ts index f5047e13..6e16e5b4 100644 --- a/apps/rowboat/src/application/use-cases/conversations/fetch-cached-turn.use-case.ts +++ b/apps/rowboat/src/application/use-cases/conversations/fetch-cached-turn.use-case.ts @@ -1,10 +1,10 @@ -import { BadRequestError, NotAuthorizedError, NotFoundError } from '@/src/entities/errors/common'; -import { apiKeysCollection, projectMembersCollection } from "@/app/lib/mongodb"; +import { NotFoundError } from '@/src/entities/errors/common'; import { IConversationsRepository } from "@/src/application/repositories/conversations.repository.interface"; import { z } from "zod"; import { ICacheService } from '@/src/application/services/cache.service.interface'; import { CachedTurnRequest, Turn } from '@/src/entities/models/turn'; import { IUsageQuotaPolicy } from '../../policies/usage-quota.policy.interface'; +import { IProjectActionAuthorizationPolicy } from '../../policies/project-action-authorization.policy'; const inputSchema = z.object({ caller: z.enum(["user", "api"]), @@ -21,19 +21,23 @@ export class FetchCachedTurnUseCase implements IFetchCachedTurnUseCase { private readonly cacheService: ICacheService; private readonly conversationsRepository: IConversationsRepository; private readonly usageQuotaPolicy: IUsageQuotaPolicy; + private readonly projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy; constructor({ cacheService, conversationsRepository, usageQuotaPolicy, + projectActionAuthorizationPolicy, }: { cacheService: ICacheService, conversationsRepository: IConversationsRepository, usageQuotaPolicy: IUsageQuotaPolicy, + projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy, }) { this.cacheService = cacheService; this.conversationsRepository = conversationsRepository; this.usageQuotaPolicy = usageQuotaPolicy; + this.projectActionAuthorizationPolicy = projectActionAuthorizationPolicy; } async execute(data: z.infer): Promise> { @@ -58,35 +62,13 @@ export class FetchCachedTurnUseCase implements IFetchCachedTurnUseCase { // assert and consume quota await this.usageQuotaPolicy.assertAndConsume(projectId); - // if caller is a user, ensure they are a member of project - if (data.caller === "user") { - if (!data.userId) { - throw new BadRequestError('User ID is required'); - } - const membership = await projectMembersCollection.findOne({ - projectId, - userId: data.userId, - }); - if (!membership) { - throw new NotAuthorizedError('User not a member of project'); - } - } else { - if (!data.apiKey) { - throw new BadRequestError('API key is required'); - } - // check if api key is valid - // while also updating last used timestamp - const result = await apiKeysCollection.findOneAndUpdate( - { - projectId, - key: data.apiKey, - }, - { $set: { lastUsedAt: new Date().toISOString() } } - ); - if (!result) { - throw new NotAuthorizedError('Invalid API key'); - } - } + // authz check + await this.projectActionAuthorizationPolicy.authorize({ + caller: data.caller, + userId: data.userId, + apiKey: data.apiKey, + projectId, + }); // delete from cache await this.cacheService.delete(`turn-${data.key}`); diff --git a/apps/rowboat/src/application/use-cases/conversations/run-conversation-turn.use-case.ts b/apps/rowboat/src/application/use-cases/conversations/run-conversation-turn.use-case.ts index 67c481f6..824e60ac 100644 --- a/apps/rowboat/src/application/use-cases/conversations/run-conversation-turn.use-case.ts +++ b/apps/rowboat/src/application/use-cases/conversations/run-conversation-turn.use-case.ts @@ -1,13 +1,13 @@ import { Turn, TurnEvent } from "@/src/entities/models/turn"; import { USE_BILLING } from "@/app/lib/feature_flags"; import { authorize, getCustomerIdForProject } from "@/app/lib/billing"; -import { BadRequestError, BillingError, NotAuthorizedError, NotFoundError } from '@/src/entities/errors/common'; -import { apiKeysCollection, projectMembersCollection } from "@/app/lib/mongodb"; +import { NotFoundError } from '@/src/entities/errors/common'; import { IConversationsRepository } from "@/src/application/repositories/conversations.repository.interface"; import { streamResponse } from "@/app/lib/agents"; import { z } from "zod"; import { Message } from "@/app/lib/types/types"; import { IUsageQuotaPolicy } from '../../policies/usage-quota.policy.interface'; +import { IProjectActionAuthorizationPolicy } from '../../policies/project-action-authorization.policy'; const inputSchema = z.object({ caller: z.enum(["user", "api"]), @@ -25,16 +25,20 @@ export interface IRunConversationTurnUseCase { export class RunConversationTurnUseCase implements IRunConversationTurnUseCase { private readonly conversationsRepository: IConversationsRepository; private readonly usageQuotaPolicy: IUsageQuotaPolicy; + private readonly projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy; constructor({ conversationsRepository, usageQuotaPolicy, + projectActionAuthorizationPolicy, }: { conversationsRepository: IConversationsRepository, usageQuotaPolicy: IUsageQuotaPolicy, + projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy, }) { this.conversationsRepository = conversationsRepository; this.usageQuotaPolicy = usageQuotaPolicy; + this.projectActionAuthorizationPolicy = projectActionAuthorizationPolicy; } async *execute(data: z.infer): AsyncGenerator, void, unknown> { @@ -50,35 +54,13 @@ export class RunConversationTurnUseCase implements IRunConversationTurnUseCase { // assert and consume quota await this.usageQuotaPolicy.assertAndConsume(projectId); - // if caller is a user, ensure they are a member of project - if (data.caller === "user") { - if (!data.userId) { - throw new BadRequestError('User ID is required'); - } - const membership = await projectMembersCollection.findOne({ - projectId, - userId: data.userId, - }); - if (!membership) { - throw new NotAuthorizedError('User not a member of project'); - } - } else { - if (!data.apiKey) { - throw new BadRequestError('API key is required'); - } - // check if api key is valid - // while also updating last used timestamp - const result = await apiKeysCollection.findOneAndUpdate( - { - projectId, - key: data.apiKey, - }, - { $set: { lastUsedAt: new Date().toISOString() } } - ); - if (!result) { - throw new NotAuthorizedError('Invalid API key'); - } - } + // authz check + await this.projectActionAuthorizationPolicy.authorize({ + caller: data.caller, + userId: data.userId, + apiKey: data.apiKey, + projectId, + }); // Check billing auth if (USE_BILLING) { diff --git a/apps/rowboat/src/infrastructure/repositories/mongodb.api-keys.repository.ts b/apps/rowboat/src/infrastructure/repositories/mongodb.api-keys.repository.ts new file mode 100644 index 00000000..657a2461 --- /dev/null +++ b/apps/rowboat/src/infrastructure/repositories/mongodb.api-keys.repository.ts @@ -0,0 +1,12 @@ +import { IApiKeysRepository } from "@/src/application/repositories/api-keys.repository.interface"; +import { apiKeysCollection } from "@/app/lib/mongodb"; + +export class MongoDBApiKeysRepository implements IApiKeysRepository { + async checkAndConsumeKey(projectId: string, apiKey: string): Promise { + const result = await apiKeysCollection.findOneAndUpdate( + { projectId, key: apiKey }, + { $set: { lastUsedAt: new Date().toISOString() } } + ); + return !!result; + } +} \ No newline at end of file diff --git a/apps/rowboat/src/infrastructure/repositories/mongodb.project-members.repository.ts b/apps/rowboat/src/infrastructure/repositories/mongodb.project-members.repository.ts new file mode 100644 index 00000000..6b0416d9 --- /dev/null +++ b/apps/rowboat/src/infrastructure/repositories/mongodb.project-members.repository.ts @@ -0,0 +1,12 @@ +import { IProjectMembersRepository } from "@/src/application/repositories/project-members.repository.interface"; +import { db } from "@/app/lib/mongodb"; + +export class MongoDBProjectMembersRepository implements IProjectMembersRepository { + async checkMembership(projectId: string, userId: string): Promise { + const membership = await db.collection('project_members').findOne({ + projectId, + userId, + }); + return !!membership; + } +} \ No newline at end of file