diff --git a/apps/rowboat/app/actions/auth.actions.ts b/apps/rowboat/app/actions/auth.actions.ts index 14b483e4..584772ce 100644 --- a/apps/rowboat/app/actions/auth.actions.ts +++ b/apps/rowboat/app/actions/auth.actions.ts @@ -1,13 +1,15 @@ "use server"; import { auth0 } from "../lib/auth0"; import { USE_AUTH } from "../lib/feature_flags"; -import { WithStringId, User } from "../lib/types/types"; +import { User } from "@/src/entities/models/user"; import { getUserFromSessionId, GUEST_DB_USER } from "../lib/auth"; import { z } from "zod"; -import { ObjectId } from "mongodb"; -import { usersCollection } from "../lib/mongodb"; +import { container } from "@/di/container"; +import { IUsersRepository } from "@/src/application/repositories/users.repository.interface"; -export async function authCheck(): Promise>> { +const usersRepository = container.resolve("usersRepository"); + +export async function authCheck(): Promise> { if (!USE_AUTH) { return GUEST_DB_USER; } @@ -42,12 +44,5 @@ export async function updateUserEmail(email: string) { } // update customer email in db - await usersCollection.updateOne({ - _id: new ObjectId(user._id), - }, { - $set: { - email, - updatedAt: new Date().toISOString(), - } - }); + await usersRepository.updateEmail(user.id, email); } diff --git a/apps/rowboat/app/actions/composio.actions.ts b/apps/rowboat/app/actions/composio.actions.ts index 5609cf3c..daddc930 100644 --- a/apps/rowboat/app/actions/composio.actions.ts +++ b/apps/rowboat/app/actions/composio.actions.ts @@ -49,7 +49,7 @@ export async function listToolkits(projectId: string, cursor: string | null = nu const user = await authCheck(); return await listComposioToolkitsController.execute({ caller: 'user', - userId: user._id, + userId: user.id, projectId, cursor, }); @@ -59,7 +59,7 @@ export async function getToolkit(projectId: string, toolkitSlug: string): Promis const user = await authCheck(); return await getComposioToolkitController.execute({ caller: 'user', - userId: user._id, + userId: user.id, projectId, toolkitSlug, }); @@ -69,7 +69,7 @@ export async function listTools(projectId: string, toolkitSlug: string, searchQu const user = await authCheck(); return await listComposioToolsController.execute({ caller: 'user', - userId: user._id, + userId: user.id, projectId, toolkitSlug, searchQuery, @@ -81,7 +81,7 @@ export async function createComposioManagedOauth2ConnectedAccount(projectId: str const user = await authCheck(); return await createComposioManagedConnectedAccountController.execute({ caller: 'user', - userId: user._id, + userId: user.id, projectId, toolkitSlug, callbackUrl, @@ -92,7 +92,7 @@ export async function createCustomConnectedAccount(projectId: string, request: z const user = await authCheck(); return await createCustomConnectedAccountController.execute({ caller: 'user', - userId: user._id, + userId: user.id, projectId, toolkitSlug: request.toolkitSlug, authConfig: request.authConfig, @@ -104,7 +104,7 @@ export async function syncConnectedAccount(projectId: string, toolkitSlug: strin const user = await authCheck(); return await syncConnectedAccountController.execute({ caller: 'user', - userId: user._id, + userId: user.id, projectId, toolkitSlug, connectedAccountId, @@ -116,7 +116,7 @@ export async function deleteConnectedAccount(projectId: string, toolkitSlug: str await deleteComposioConnectedAccountController.execute({ caller: 'user', - userId: user._id, + userId: user.id, projectId, toolkitSlug, }); @@ -144,7 +144,7 @@ export async function createComposioTriggerDeployment(request: { // create trigger deployment return await createComposioTriggerDeploymentController.execute({ caller: 'user', - userId: user._id, + userId: user.id, projectId: request.projectId, data: { triggerTypeSlug: request.triggerTypeSlug, @@ -163,7 +163,7 @@ export async function listComposioTriggerDeployments(request: { // list trigger deployments return await listComposioTriggerDeploymentsController.execute({ caller: 'user', - userId: user._id, + userId: user.id, projectId: request.projectId, cursor: request.cursor, }); @@ -178,7 +178,7 @@ export async function deleteComposioTriggerDeployment(request: { // delete trigger deployment return await deleteComposioTriggerDeploymentController.execute({ caller: 'user', - userId: user._id, + userId: user.id, projectId: request.projectId, deploymentId: request.deploymentId, }); @@ -188,7 +188,7 @@ export async function fetchComposioTriggerDeployment(request: { deploymentId: st const user = await authCheck(); return await fetchComposioTriggerDeploymentController.execute({ caller: 'user', - userId: user._id, + userId: user.id, deploymentId: request.deploymentId, }); } \ No newline at end of file diff --git a/apps/rowboat/app/actions/conversation.actions.ts b/apps/rowboat/app/actions/conversation.actions.ts index 74389125..68f33fea 100644 --- a/apps/rowboat/app/actions/conversation.actions.ts +++ b/apps/rowboat/app/actions/conversation.actions.ts @@ -17,7 +17,7 @@ export async function listConversations(request: { return await listConversationsController.execute({ caller: 'user', - userId: user._id, + userId: user.id, projectId: request.projectId, cursor: request.cursor, limit: request.limit, @@ -31,7 +31,7 @@ export async function fetchConversation(request: { return await fetchConversationController.execute({ caller: 'user', - userId: user._id, + userId: user.id, conversationId: request.conversationId, }); } \ No newline at end of file diff --git a/apps/rowboat/app/actions/custom-mcp-server.actions.ts b/apps/rowboat/app/actions/custom-mcp-server.actions.ts index 265f9673..0ce2e567 100644 --- a/apps/rowboat/app/actions/custom-mcp-server.actions.ts +++ b/apps/rowboat/app/actions/custom-mcp-server.actions.ts @@ -32,7 +32,7 @@ export async function addServer(projectId: string, name: string, server: McpServ validateUrl(server.serverUrl); await addCustomMcpServerController.execute({ caller: 'user', - userId: user._id, + userId: user.id, projectId, name, server, @@ -43,7 +43,7 @@ export async function removeServer(projectId: string, name: string): Promise("createPlaygroundConversationController"); return await controller.execute({ - userId: user._id, + userId: user.id, projectId, workflow, isLiveWorkflow, @@ -41,7 +41,7 @@ export async function createCachedTurn({ const { key } = await createCachedTurnController.execute({ caller: "user", - userId: user._id, + userId: user.id, conversationId, input: { messages, diff --git a/apps/rowboat/app/actions/project.actions.ts b/apps/rowboat/app/actions/project.actions.ts index a11d8430..5f042694 100644 --- a/apps/rowboat/app/actions/project.actions.ts +++ b/apps/rowboat/app/actions/project.actions.ts @@ -57,7 +57,7 @@ export async function projectAuthCheck(projectId: string) { const user = await authCheck(); await projectActionAuthorizationPolicy.authorize({ caller: 'user', - userId: user._id, + userId: user.id, projectId, }); } @@ -69,7 +69,7 @@ export async function createProject(formData: FormData): Promise<{ id: string } try { const project = await createProjectController.execute({ - userId: user._id, + userId: user.id, data: { name: name || '', mode: { @@ -94,7 +94,7 @@ export async function createProjectFromWorkflowJson(formData: FormData): Promise try { const project = await createProjectController.execute({ - userId: user._id, + userId: user.id, data: { name: name || '', mode: { @@ -116,7 +116,7 @@ export async function fetchProject(projectId: string): Promise[]> { let cursor = undefined; do { const result = await listProjectsController.execute({ - userId: user._id, + userId: user.id, cursor, }); projects.push(...result.items); @@ -148,7 +148,7 @@ export async function rotateSecret(projectId: string): Promise { const user = await authCheck(); return await rotateSecretController.execute({ caller: 'user', - userId: user._id, + userId: user.id, projectId, }); } @@ -157,7 +157,7 @@ export async function updateWebhookUrl(projectId: string, url: string) { const user = await authCheck(); await updateWebhookUrlController.execute({ caller: 'user', - userId: user._id, + userId: user.id, projectId, url, }); @@ -167,7 +167,7 @@ export async function createApiKey(projectId: string): Promise> = { - _id: "guest_user", +export const GUEST_DB_USER: z.infer = { + id: "guest_user", auth0Id: "guest_user", name: "Guest", email: "guest@rowboatlabs.com", createdAt: new Date().toISOString(), - updatedAt: new Date().toISOString(), } /** @@ -33,7 +32,7 @@ export const GUEST_DB_USER: WithStringId> = { * const user = await requireAuth(); * ``` */ -export async function requireAuth(): Promise>> { +export async function requireAuth(): Promise> { if (!USE_AUTH) { return GUEST_DB_USER; } @@ -44,48 +43,26 @@ export async function requireAuth(): Promise>> } // fetch db user + const usersRepository = container.resolve("usersRepository"); let dbUser = await getUserFromSessionId(user.sub); // if db user does not exist, create one if (!dbUser) { - // create user record - const doc = { - _id: new ObjectId(), + dbUser = await usersRepository.create({ auth0Id: user.sub, - createdAt: new Date().toISOString(), - updatedAt: new Date().toISOString(), email: user.email, - }; - console.log(`creating new user id ${doc._id.toString()} for session id ${user.sub}`); - await usersCollection.insertOne(doc); - - dbUser = { - ...doc, - _id: doc._id.toString(), - }; + }); + console.log(`created new user id ${dbUser.id} for session id ${user.sub}`); } - const { _id, ...rest } = dbUser; - return { - ...rest, - _id: _id.toString(), - }; + return dbUser; } -export async function getUserFromSessionId(sessionUserId: string): Promise> | null> { +export async function getUserFromSessionId(sessionUserId: string): Promise | null> { if (!USE_AUTH) { return GUEST_DB_USER; } - let dbUser = await usersCollection.findOne({ - auth0Id: sessionUserId - }); - if (!dbUser) { - return null; - } - const { _id, ...rest } = dbUser; - return { - ...rest, - _id: _id.toString(), - }; + const usersRepository = container.resolve("usersRepository"); + return await usersRepository.fetchByAuth0Id(sessionUserId); } \ No newline at end of file diff --git a/apps/rowboat/app/lib/billing.ts b/apps/rowboat/app/lib/billing.ts index 538f3992..7a0c00ee 100644 --- a/apps/rowboat/app/lib/billing.ts +++ b/apps/rowboat/app/lib/billing.ts @@ -1,13 +1,12 @@ import { WithStringId } from './types/types'; import { z } from 'zod'; import { Customer, AuthorizeRequest, AuthorizeResponse, LogUsageRequest, UsageResponse, CustomerPortalSessionResponse, PricesResponse, UpdateSubscriptionPlanRequest, UpdateSubscriptionPlanResponse, ModelsResponse, UsageItem } from './types/billing_types'; -import { ObjectId } from 'mongodb'; -import { usersCollection } from './mongodb'; import { redirect } from 'next/navigation'; import { getUserFromSessionId, requireAuth } from './auth'; import { USE_BILLING } from './feature_flags'; import { container } from '@/di/container'; import { IProjectsRepository } from '@/src/application/repositories/projects.repository.interface'; +import { IUsersRepository } from '@/src/application/repositories/users.repository.interface'; const BILLING_API_URL = process.env.BILLING_API_URL || 'http://billing'; const BILLING_API_KEY = process.env.BILLING_API_KEY || 'test'; @@ -27,6 +26,7 @@ const GUEST_BILLING_CUSTOMER = { updatedAt: new Date().toISOString(), }; + export class UsageTracker{ private items: z.infer[] = []; @@ -42,7 +42,9 @@ export class UsageTracker{ } export async function getCustomerForUserId(userId: string): Promise> | null> { - const user = await usersCollection.findOne({ _id: new ObjectId(userId) }); + const usersRepository = container.resolve("usersRepository"); + + const user = await usersRepository.fetch(userId); if (!user) { throw new Error("User not found"); } @@ -266,11 +268,12 @@ export async function getEligibleModels(customerId: string): Promise>> { const user = await requireAuth(); + const usersRepository = container.resolve("usersRepository"); if (!USE_BILLING) { return { ...GUEST_BILLING_CUSTOMER, - userId: user._id, + userId: user.id, }; } @@ -284,18 +287,11 @@ export async function requireBillingCustomer(): Promise>("chats"); export const chatMessagesCollection = db.collection>("chat_messages"); export const twilioConfigsCollection = db.collection>("twilio_configs"); -export const usersCollection = db.collection>("users"); export const twilioInboundCallsCollection = db.collection>("twilio_inbound_calls"); // Create indexes diff --git a/apps/rowboat/app/lib/types/types.ts b/apps/rowboat/app/lib/types/types.ts index ab9de57a..6023ab13 100644 --- a/apps/rowboat/app/lib/types/types.ts +++ b/apps/rowboat/app/lib/types/types.ts @@ -124,15 +124,6 @@ export const McpServerResponse = z.object({ error: z.string().nullable(), }); -export const User = z.object({ - auth0Id: z.string(), - billingCustomerId: z.string().optional(), - name: z.string().optional(), - email: z.string().optional(), - createdAt: z.string().datetime(), - updatedAt: z.string().datetime(), -}); - export const Webpage = z.object({ _id: z.string(), title: z.string(), diff --git a/apps/rowboat/app/projects/[projectId]/workflow/page.tsx b/apps/rowboat/app/projects/[projectId]/workflow/page.tsx index 4c9ea94a..008f4317 100644 --- a/apps/rowboat/app/projects/[projectId]/workflow/page.tsx +++ b/apps/rowboat/app/projects/[projectId]/workflow/page.tsx @@ -32,7 +32,7 @@ export default async function Page( const project = await fetchProjectController.execute({ caller: "user", - userId: user._id, + userId: user.id, projectId: params.projectId, }); if (!project) { @@ -41,7 +41,7 @@ export default async function Page( const sources = await listDataSourcesController.execute({ caller: "user", - userId: user._id, + userId: user.id, projectId: params.projectId, }); diff --git a/apps/rowboat/di/container.ts b/apps/rowboat/di/container.ts index f2af8a95..be9bf344 100644 --- a/apps/rowboat/di/container.ts +++ b/apps/rowboat/di/container.ts @@ -145,6 +145,9 @@ import { UpdateLiveWorkflowController } from "@/src/interface-adapters/controlle import { RevertToLiveWorkflowUseCase } from "@/src/application/use-cases/projects/revert-to-live-workflow.use-case"; import { RevertToLiveWorkflowController } from "@/src/interface-adapters/controllers/projects/revert-to-live-workflow.controller"; +// users +import { MongoDBUsersRepository } from "@/src/infrastructure/repositories/mongodb.users.repository"; + export const container = createContainer({ injectionMode: InjectionMode.PROXY, strict: true, @@ -324,4 +327,8 @@ container.register({ runTurnController: asClass(RunTurnController).singleton(), listConversationsController: asClass(ListConversationsController).singleton(), fetchConversationController: asClass(FetchConversationController).singleton(), + + // users + // --- + usersRepository: asClass(MongoDBUsersRepository).singleton(), }); \ No newline at end of file diff --git a/apps/rowboat/src/application/repositories/users.repository.interface.ts b/apps/rowboat/src/application/repositories/users.repository.interface.ts new file mode 100644 index 00000000..64684fb1 --- /dev/null +++ b/apps/rowboat/src/application/repositories/users.repository.interface.ts @@ -0,0 +1,19 @@ +import { z } from "zod"; +import { User } from "@/src/entities/models/user"; + +export const CreateSchema = User.pick({ + auth0Id: true, + email: true, +}); + +export interface IUsersRepository { + create(data: z.infer): Promise>; + + fetch(id: string): Promise | null>; + + fetchByAuth0Id(auth0Id: string): Promise | null>; + + updateEmail(id: string, email: string): Promise>; + + updateBillingCustomerId(id: string, billingCustomerId: string): Promise>; +} \ No newline at end of file diff --git a/apps/rowboat/src/entities/models/user.ts b/apps/rowboat/src/entities/models/user.ts new file mode 100644 index 00000000..7d39002a --- /dev/null +++ b/apps/rowboat/src/entities/models/user.ts @@ -0,0 +1,11 @@ +import { z } from "zod"; + +export const User = z.object({ + id: z.string(), + auth0Id: z.string(), + billingCustomerId: z.string().optional(), + name: z.string().optional(), + email: z.string().optional(), + createdAt: z.string().datetime(), + updatedAt: z.string().datetime().optional(), +}); \ No newline at end of file diff --git a/apps/rowboat/src/infrastructure/mongodb/drop-indexes.ts b/apps/rowboat/src/infrastructure/mongodb/drop-indexes.ts index 2810cf6d..cd75ed57 100644 --- a/apps/rowboat/src/infrastructure/mongodb/drop-indexes.ts +++ b/apps/rowboat/src/infrastructure/mongodb/drop-indexes.ts @@ -9,6 +9,7 @@ import { PROJECT_MEMBERS_COLLECTION } from "../repositories/mongodb.project-memb import { RECURRING_JOB_RULES_COLLECTION } from "../repositories/mongodb.recurring-job-rules.indexes"; import { SCHEDULED_JOB_RULES_COLLECTION } from "../repositories/mongodb.scheduled-job-rules.indexes"; import { COMPOSIO_TRIGGER_DEPLOYMENTS_COLLECTION } from "../repositories/mongodb.composio-trigger-deployments.indexes"; +import { USERS_COLLECTION } from "../repositories/mongodb.users.indexes"; export async function dropAllIndexes(database: Db): Promise { const collections: string[] = [ @@ -22,6 +23,7 @@ export async function dropAllIndexes(database: Db): Promise { RECURRING_JOB_RULES_COLLECTION, SCHEDULED_JOB_RULES_COLLECTION, COMPOSIO_TRIGGER_DEPLOYMENTS_COLLECTION, + USERS_COLLECTION, ]; for (const collectionName of collections) { diff --git a/apps/rowboat/src/infrastructure/mongodb/ensure-indexes.ts b/apps/rowboat/src/infrastructure/mongodb/ensure-indexes.ts index 32d00fc9..d4cdd6b5 100644 --- a/apps/rowboat/src/infrastructure/mongodb/ensure-indexes.ts +++ b/apps/rowboat/src/infrastructure/mongodb/ensure-indexes.ts @@ -9,6 +9,7 @@ import { PROJECT_MEMBERS_COLLECTION, PROJECT_MEMBERS_INDEXES } from "../reposito import { RECURRING_JOB_RULES_COLLECTION, RECURRING_JOB_RULES_INDEXES } from "../repositories/mongodb.recurring-job-rules.indexes"; import { SCHEDULED_JOB_RULES_COLLECTION, SCHEDULED_JOB_RULES_INDEXES } from "../repositories/mongodb.scheduled-job-rules.indexes"; import { COMPOSIO_TRIGGER_DEPLOYMENTS_COLLECTION, COMPOSIO_TRIGGER_DEPLOYMENTS_INDEXES } from "../repositories/mongodb.composio-trigger-deployments.indexes"; +import { USERS_COLLECTION, USERS_INDEXES } from "../repositories/mongodb.users.indexes"; export async function ensureAllIndexes(database: Db): Promise { await database.collection(API_KEYS_COLLECTION).createIndexes(API_KEYS_INDEXES); @@ -21,4 +22,5 @@ export async function ensureAllIndexes(database: Db): Promise { await database.collection(RECURRING_JOB_RULES_COLLECTION).createIndexes(RECURRING_JOB_RULES_INDEXES); await database.collection(SCHEDULED_JOB_RULES_COLLECTION).createIndexes(SCHEDULED_JOB_RULES_INDEXES); await database.collection(COMPOSIO_TRIGGER_DEPLOYMENTS_COLLECTION).createIndexes(COMPOSIO_TRIGGER_DEPLOYMENTS_INDEXES); + await database.collection(USERS_COLLECTION).createIndexes(USERS_INDEXES); } \ No newline at end of file diff --git a/apps/rowboat/src/infrastructure/repositories/mongodb.users.indexes.ts b/apps/rowboat/src/infrastructure/repositories/mongodb.users.indexes.ts new file mode 100644 index 00000000..4458f7ea --- /dev/null +++ b/apps/rowboat/src/infrastructure/repositories/mongodb.users.indexes.ts @@ -0,0 +1,7 @@ +import { IndexDescription } from "mongodb"; + +export const USERS_COLLECTION = "users"; + +export const USERS_INDEXES: IndexDescription[] = [ + { key: { auth0Id: 1 }, name: "auth0Id_unique", unique: true }, +]; \ No newline at end of file diff --git a/apps/rowboat/src/infrastructure/repositories/mongodb.users.repository.ts b/apps/rowboat/src/infrastructure/repositories/mongodb.users.repository.ts new file mode 100644 index 00000000..9d23538f --- /dev/null +++ b/apps/rowboat/src/infrastructure/repositories/mongodb.users.repository.ts @@ -0,0 +1,82 @@ +import { z } from "zod"; +import { db } from "@/app/lib/mongodb"; +import { ObjectId } from "mongodb"; +import { CreateSchema, IUsersRepository } from "@/src/application/repositories/users.repository.interface"; +import { User } from "@/src/entities/models/user"; + +const DocSchema = User + .omit({ + id: true, + }); + +export class MongoDBUsersRepository implements IUsersRepository { + private readonly collection = db.collection>("users"); + + async create(data: z.infer): Promise> { + const now = new Date().toISOString(); + const _id = new ObjectId(); + + const doc = { + ...data, + createdAt: now, + }; + + await this.collection.insertOne({ + _id, + ...doc, + }); + + return { + ...doc, + id: _id.toString(), + }; + } + + async fetch(id: string): Promise | null> { + const result = await this.collection.findOne({ _id: new ObjectId(id) }); + if (!result) return null; + + return { + ...result, + id: result._id.toString(), + }; + } + + async fetchByAuth0Id(auth0Id: string): Promise | null> { + const result = await this.collection.findOne({ auth0Id }); + if (!result) return null; + + return { + ...result, + id: result._id.toString(), + }; + } + + async updateEmail(id: string, email: string): Promise> { + const result = await this.collection.findOneAndUpdate( + { _id: new ObjectId(id) }, + { $set: { email, updatedAt: new Date().toISOString() } } + ); + + if (!result) throw new Error("User not found"); + + return { + ...result, + id: result._id.toString(), + }; + } + + async updateBillingCustomerId(id: string, billingCustomerId: string): Promise> { + const result = await this.collection.findOneAndUpdate( + { _id: new ObjectId(id) }, + { $set: { billingCustomerId, updatedAt: new Date().toISOString() } } + ); + + if (!result) throw new Error("User not found"); + + return { + ...result, + id: result._id.toString(), + }; + } +} \ No newline at end of file