ddd: create users repository

This commit is contained in:
Ramnique Singh 2025-08-23 08:15:58 +05:30
parent eac001527c
commit 219d4c7901
23 changed files with 220 additions and 134 deletions

View file

@ -1,13 +1,15 @@
"use server";
import { auth0 } from "../lib/auth0";
import { USE_AUTH } from "../lib/feature_flags";
import { WithStringId, User } from "../lib/types/types";
import { User } from "@/src/entities/models/user";
import { getUserFromSessionId, GUEST_DB_USER } from "../lib/auth";
import { z } from "zod";
import { ObjectId } from "mongodb";
import { usersCollection } from "../lib/mongodb";
import { container } from "@/di/container";
import { IUsersRepository } from "@/src/application/repositories/users.repository.interface";
export async function authCheck(): Promise<WithStringId<z.infer<typeof User>>> {
const usersRepository = container.resolve<IUsersRepository>("usersRepository");
export async function authCheck(): Promise<z.infer<typeof User>> {
if (!USE_AUTH) {
return GUEST_DB_USER;
}
@ -42,12 +44,5 @@ export async function updateUserEmail(email: string) {
}
// update customer email in db
await usersCollection.updateOne({
_id: new ObjectId(user._id),
}, {
$set: {
email,
updatedAt: new Date().toISOString(),
}
});
await usersRepository.updateEmail(user.id, email);
}

View file

@ -49,7 +49,7 @@ export async function listToolkits(projectId: string, cursor: string | null = nu
const user = await authCheck();
return await listComposioToolkitsController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
projectId,
cursor,
});
@ -59,7 +59,7 @@ export async function getToolkit(projectId: string, toolkitSlug: string): Promis
const user = await authCheck();
return await getComposioToolkitController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
projectId,
toolkitSlug,
});
@ -69,7 +69,7 @@ export async function listTools(projectId: string, toolkitSlug: string, searchQu
const user = await authCheck();
return await listComposioToolsController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
projectId,
toolkitSlug,
searchQuery,
@ -81,7 +81,7 @@ export async function createComposioManagedOauth2ConnectedAccount(projectId: str
const user = await authCheck();
return await createComposioManagedConnectedAccountController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
projectId,
toolkitSlug,
callbackUrl,
@ -92,7 +92,7 @@ export async function createCustomConnectedAccount(projectId: string, request: z
const user = await authCheck();
return await createCustomConnectedAccountController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
projectId,
toolkitSlug: request.toolkitSlug,
authConfig: request.authConfig,
@ -104,7 +104,7 @@ export async function syncConnectedAccount(projectId: string, toolkitSlug: strin
const user = await authCheck();
return await syncConnectedAccountController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
projectId,
toolkitSlug,
connectedAccountId,
@ -116,7 +116,7 @@ export async function deleteConnectedAccount(projectId: string, toolkitSlug: str
await deleteComposioConnectedAccountController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
projectId,
toolkitSlug,
});
@ -144,7 +144,7 @@ export async function createComposioTriggerDeployment(request: {
// create trigger deployment
return await createComposioTriggerDeploymentController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
projectId: request.projectId,
data: {
triggerTypeSlug: request.triggerTypeSlug,
@ -163,7 +163,7 @@ export async function listComposioTriggerDeployments(request: {
// list trigger deployments
return await listComposioTriggerDeploymentsController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
projectId: request.projectId,
cursor: request.cursor,
});
@ -178,7 +178,7 @@ export async function deleteComposioTriggerDeployment(request: {
// delete trigger deployment
return await deleteComposioTriggerDeploymentController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
projectId: request.projectId,
deploymentId: request.deploymentId,
});
@ -188,7 +188,7 @@ export async function fetchComposioTriggerDeployment(request: { deploymentId: st
const user = await authCheck();
return await fetchComposioTriggerDeploymentController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
deploymentId: request.deploymentId,
});
}

View file

@ -17,7 +17,7 @@ export async function listConversations(request: {
return await listConversationsController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
projectId: request.projectId,
cursor: request.cursor,
limit: request.limit,
@ -31,7 +31,7 @@ export async function fetchConversation(request: {
return await fetchConversationController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
conversationId: request.conversationId,
});
}

View file

@ -32,7 +32,7 @@ export async function addServer(projectId: string, name: string, server: McpServ
validateUrl(server.serverUrl);
await addCustomMcpServerController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
projectId,
name,
server,
@ -43,7 +43,7 @@ export async function removeServer(projectId: string, name: string): Promise<voi
const user = await authCheck();
await removeCustomMcpServerController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
projectId,
name,
});

View file

@ -35,7 +35,7 @@ export async function getDataSource(sourceId: string): Promise<z.infer<typeof Da
return await fetchDataSourceController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
sourceId,
});
}
@ -45,7 +45,7 @@ export async function listDataSources(projectId: string): Promise<z.infer<typeof
return await listDataSourcesController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
projectId,
});
}
@ -66,7 +66,7 @@ export async function createDataSource({
const user = await authCheck();
return await createDataSourceController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
data: {
projectId,
name,
@ -82,7 +82,7 @@ export async function recrawlWebDataSource(sourceId: string) {
return await recrawlWebDataSourceController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
sourceId,
});
}
@ -92,7 +92,7 @@ export async function deleteDataSource(sourceId: string) {
return await deleteDataSourceController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
sourceId,
});
}
@ -102,7 +102,7 @@ export async function toggleDataSource(sourceId: string, active: boolean) {
return await toggleDataSourceController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
sourceId,
active,
});
@ -122,7 +122,7 @@ export async function addDocsToDataSource({
return await addDocsToDataSourceController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
sourceId,
docs: docData,
});
@ -144,7 +144,7 @@ export async function listDocsInDataSource({
const docs = await listDocsInDataSourceController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
sourceId,
});
@ -162,7 +162,7 @@ export async function deleteDocFromDataSource({
const user = await authCheck();
return await deleteDocFromDataSourceController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
docId,
});
}
@ -174,7 +174,7 @@ export async function getDownloadUrlForFile(
return await getDownloadUrlForFileController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
fileId,
});
}
@ -191,7 +191,7 @@ export async function getUploadUrlsForFilesDataSource(
return await getUploadUrlsForFilesController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
sourceId,
files,
});
@ -208,7 +208,7 @@ export async function updateDataSource({
return await updateDataSourceController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
sourceId,
data: {
description,

View file

@ -20,7 +20,7 @@ export async function listJobs(request: {
return await listJobsController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
projectId: request.projectId,
filters: request.filters,
cursor: request.cursor,
@ -35,7 +35,7 @@ export async function fetchJob(request: {
return await fetchJobController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
jobId: request.jobId,
});
}

View file

@ -22,7 +22,7 @@ export async function createConversation({
const controller = container.resolve<ICreatePlaygroundConversationController>("createPlaygroundConversationController");
return await controller.execute({
userId: user._id,
userId: user.id,
projectId,
workflow,
isLiveWorkflow,
@ -41,7 +41,7 @@ export async function createCachedTurn({
const { key } = await createCachedTurnController.execute({
caller: "user",
userId: user._id,
userId: user.id,
conversationId,
input: {
messages,

View file

@ -57,7 +57,7 @@ export async function projectAuthCheck(projectId: string) {
const user = await authCheck();
await projectActionAuthorizationPolicy.authorize({
caller: 'user',
userId: user._id,
userId: user.id,
projectId,
});
}
@ -69,7 +69,7 @@ export async function createProject(formData: FormData): Promise<{ id: string }
try {
const project = await createProjectController.execute({
userId: user._id,
userId: user.id,
data: {
name: name || '',
mode: {
@ -94,7 +94,7 @@ export async function createProjectFromWorkflowJson(formData: FormData): Promise
try {
const project = await createProjectController.execute({
userId: user._id,
userId: user.id,
data: {
name: name || '',
mode: {
@ -116,7 +116,7 @@ export async function fetchProject(projectId: string): Promise<z.infer<typeof Pr
const user = await authCheck();
const project = await fetchProjectController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
projectId,
});
@ -134,7 +134,7 @@ export async function listProjects(): Promise<z.infer<typeof Project>[]> {
let cursor = undefined;
do {
const result = await listProjectsController.execute({
userId: user._id,
userId: user.id,
cursor,
});
projects.push(...result.items);
@ -148,7 +148,7 @@ export async function rotateSecret(projectId: string): Promise<string> {
const user = await authCheck();
return await rotateSecretController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
projectId,
});
}
@ -157,7 +157,7 @@ export async function updateWebhookUrl(projectId: string, url: string) {
const user = await authCheck();
await updateWebhookUrlController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
projectId,
url,
});
@ -167,7 +167,7 @@ export async function createApiKey(projectId: string): Promise<z.infer<typeof Ap
const user = await authCheck();
return await createApiKeyController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
projectId,
});
}
@ -176,7 +176,7 @@ export async function deleteApiKey(projectId: string, id: string) {
const user = await authCheck();
return await deleteApiKeyController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
projectId,
id,
});
@ -186,7 +186,7 @@ export async function listApiKeys(projectId: string): Promise<z.infer<typeof Api
const user = await authCheck();
return await listApiKeysController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
projectId,
});
}
@ -195,7 +195,7 @@ export async function updateProjectName(projectId: string, name: string) {
const user = await authCheck();
await updateProjectNameController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
projectId,
name,
});
@ -205,7 +205,7 @@ export async function deleteProject(projectId: string) {
const user = await authCheck();
await deleteProjectController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
projectId,
});
@ -216,7 +216,7 @@ export async function saveWorkflow(projectId: string, workflow: z.infer<typeof W
const user = await authCheck();
await updateDraftWorkflowController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
projectId,
workflow,
});
@ -226,7 +226,7 @@ export async function publishWorkflow(projectId: string, workflow: z.infer<typeo
const user = await authCheck();
await updateLiveWorkflowController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
projectId,
workflow,
});
@ -236,7 +236,7 @@ export async function revertToLiveWorkflow(projectId: string) {
const user = await authCheck();
await revertToLiveWorkflowController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
projectId,
});
}

View file

@ -27,7 +27,7 @@ export async function createRecurringJobRule(request: {
return await createRecurringJobRuleController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
projectId: request.projectId,
input: request.input,
cron: request.cron,
@ -43,7 +43,7 @@ export async function listRecurringJobRules(request: {
return await listRecurringJobRulesController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
projectId: request.projectId,
cursor: request.cursor,
limit: request.limit,
@ -57,7 +57,7 @@ export async function fetchRecurringJobRule(request: {
return await fetchRecurringJobRuleController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
ruleId: request.ruleId,
});
}
@ -70,7 +70,7 @@ export async function toggleRecurringJobRule(request: {
return await toggleRecurringJobRuleController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
ruleId: request.ruleId,
disabled: request.disabled,
});
@ -84,7 +84,7 @@ export async function deleteRecurringJobRule(request: {
return await deleteRecurringJobRuleController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
projectId: request.projectId,
ruleId: request.ruleId,
});

View file

@ -25,7 +25,7 @@ export async function createScheduledJobRule(request: {
return await createScheduledJobRuleController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
projectId: request.projectId,
input: request.input,
scheduledTime: request.scheduledTime,
@ -41,7 +41,7 @@ export async function listScheduledJobRules(request: {
return await listScheduledJobRulesController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
projectId: request.projectId,
cursor: request.cursor,
limit: request.limit,
@ -55,7 +55,7 @@ export async function fetchScheduledJobRule(request: {
return await fetchScheduledJobRuleController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
ruleId: request.ruleId,
});
}
@ -68,7 +68,7 @@ export async function deleteScheduledJobRule(request: {
return await deleteScheduledJobRuleController.execute({
caller: 'user',
userId: user._id,
userId: user.id,
projectId: request.projectId,
ruleId: request.ruleId,
});

View file

@ -22,7 +22,7 @@ export async function GET(request: Request, props: { params: Promise<{ streamId:
// Iterate over the generator
for await (const event of runCachedTurnController.execute({
caller: "user",
userId: user._id,
userId: user.id,
cachedTurnKey: params.streamId,
})) {
controller.enqueue(encoder.encode(`event: message\ndata: ${JSON.stringify(event)}\n\n`));

View file

@ -1,10 +1,10 @@
import { z } from "zod";
import { ObjectId } from "mongodb";
import { usersCollection } from "./mongodb";
import { auth0 } from "./auth0";
import { User, WithStringId } from "./types/types";
import { User } from "@/src/entities/models/user";
import { USE_AUTH } from "./feature_flags";
import { redirect } from "next/navigation";
import { container } from "@/di/container";
import { IUsersRepository } from "@/src/application/repositories/users.repository.interface";
export const GUEST_SESSION = {
email: "guest@rowboatlabs.com",
@ -12,13 +12,12 @@ export const GUEST_SESSION = {
sub: "guest_user",
}
export const GUEST_DB_USER: WithStringId<z.infer<typeof User>> = {
_id: "guest_user",
export const GUEST_DB_USER: z.infer<typeof User> = {
id: "guest_user",
auth0Id: "guest_user",
name: "Guest",
email: "guest@rowboatlabs.com",
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
}
/**
@ -33,7 +32,7 @@ export const GUEST_DB_USER: WithStringId<z.infer<typeof User>> = {
* const user = await requireAuth();
* ```
*/
export async function requireAuth(): Promise<WithStringId<z.infer<typeof User>>> {
export async function requireAuth(): Promise<z.infer<typeof User>> {
if (!USE_AUTH) {
return GUEST_DB_USER;
}
@ -44,48 +43,26 @@ export async function requireAuth(): Promise<WithStringId<z.infer<typeof User>>>
}
// fetch db user
const usersRepository = container.resolve<IUsersRepository>("usersRepository");
let dbUser = await getUserFromSessionId(user.sub);
// if db user does not exist, create one
if (!dbUser) {
// create user record
const doc = {
_id: new ObjectId(),
dbUser = await usersRepository.create({
auth0Id: user.sub,
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
email: user.email,
};
console.log(`creating new user id ${doc._id.toString()} for session id ${user.sub}`);
await usersCollection.insertOne(doc);
dbUser = {
...doc,
_id: doc._id.toString(),
};
});
console.log(`created new user id ${dbUser.id} for session id ${user.sub}`);
}
const { _id, ...rest } = dbUser;
return {
...rest,
_id: _id.toString(),
};
return dbUser;
}
export async function getUserFromSessionId(sessionUserId: string): Promise<WithStringId<z.infer<typeof User>> | null> {
export async function getUserFromSessionId(sessionUserId: string): Promise<z.infer<typeof User> | null> {
if (!USE_AUTH) {
return GUEST_DB_USER;
}
let dbUser = await usersCollection.findOne({
auth0Id: sessionUserId
});
if (!dbUser) {
return null;
}
const { _id, ...rest } = dbUser;
return {
...rest,
_id: _id.toString(),
};
const usersRepository = container.resolve<IUsersRepository>("usersRepository");
return await usersRepository.fetchByAuth0Id(sessionUserId);
}

View file

@ -1,13 +1,12 @@
import { WithStringId } from './types/types';
import { z } from 'zod';
import { Customer, AuthorizeRequest, AuthorizeResponse, LogUsageRequest, UsageResponse, CustomerPortalSessionResponse, PricesResponse, UpdateSubscriptionPlanRequest, UpdateSubscriptionPlanResponse, ModelsResponse, UsageItem } from './types/billing_types';
import { ObjectId } from 'mongodb';
import { usersCollection } from './mongodb';
import { redirect } from 'next/navigation';
import { getUserFromSessionId, requireAuth } from './auth';
import { USE_BILLING } from './feature_flags';
import { container } from '@/di/container';
import { IProjectsRepository } from '@/src/application/repositories/projects.repository.interface';
import { IUsersRepository } from '@/src/application/repositories/users.repository.interface';
const BILLING_API_URL = process.env.BILLING_API_URL || 'http://billing';
const BILLING_API_KEY = process.env.BILLING_API_KEY || 'test';
@ -27,6 +26,7 @@ const GUEST_BILLING_CUSTOMER = {
updatedAt: new Date().toISOString(),
};
export class UsageTracker{
private items: z.infer<typeof UsageItem>[] = [];
@ -42,7 +42,9 @@ export class UsageTracker{
}
export async function getCustomerForUserId(userId: string): Promise<WithStringId<z.infer<typeof Customer>> | null> {
const user = await usersCollection.findOne({ _id: new ObjectId(userId) });
const usersRepository = container.resolve<IUsersRepository>("usersRepository");
const user = await usersRepository.fetch(userId);
if (!user) {
throw new Error("User not found");
}
@ -266,11 +268,12 @@ export async function getEligibleModels(customerId: string): Promise<z.infer<typ
*/
export async function requireBillingCustomer(): Promise<WithStringId<z.infer<typeof Customer>>> {
const user = await requireAuth();
const usersRepository = container.resolve<IUsersRepository>("usersRepository");
if (!USE_BILLING) {
return {
...GUEST_BILLING_CUSTOMER,
userId: user._id,
userId: user.id,
};
}
@ -284,18 +287,11 @@ export async function requireBillingCustomer(): Promise<WithStringId<z.infer<typ
if (user.billingCustomerId) {
customer = await getBillingCustomer(user.billingCustomerId);
} else {
customer = await createBillingCustomer(user._id, user.email);
console.log("created billing customer", JSON.stringify({ userId: user._id, customer }));
customer = await createBillingCustomer(user.id, user.email);
console.log("created billing customer", JSON.stringify({ userId: user.id, customer }));
// update customer id in db
await usersCollection.updateOne({
_id: new ObjectId(user._id),
}, {
$set: {
billingCustomerId: customer._id,
updatedAt: new Date().toISOString(),
}
});
await usersRepository.updateBillingCustomerId(user.id, customer._id);
}
if (!customer) {
throw new Error("Failed to fetch or create billing customer");
@ -323,5 +319,4 @@ export async function requireActiveBillingSubscription(): Promise<WithStringId<z
redirect('/billing');
}
return billingCustomer;
}
}

View file

@ -1,5 +1,4 @@
import { MongoClient } from "mongodb";
import { User } from "./types/types";
import { TwilioConfig, TwilioInboundCall } from "./types/voice_types";
import { z } from 'zod';
import { apiV1 } from "rowboat-shared";
@ -10,7 +9,6 @@ export const db = client.db("rowboat");
export const chatsCollection = db.collection<z.infer<typeof apiV1.Chat>>("chats");
export const chatMessagesCollection = db.collection<z.infer<typeof apiV1.ChatMessage>>("chat_messages");
export const twilioConfigsCollection = db.collection<z.infer<typeof TwilioConfig>>("twilio_configs");
export const usersCollection = db.collection<z.infer<typeof User>>("users");
export const twilioInboundCallsCollection = db.collection<z.infer<typeof TwilioInboundCall>>("twilio_inbound_calls");
// Create indexes

View file

@ -124,15 +124,6 @@ export const McpServerResponse = z.object({
error: z.string().nullable(),
});
export const User = z.object({
auth0Id: z.string(),
billingCustomerId: z.string().optional(),
name: z.string().optional(),
email: z.string().optional(),
createdAt: z.string().datetime(),
updatedAt: z.string().datetime(),
});
export const Webpage = z.object({
_id: z.string(),
title: z.string(),

View file

@ -32,7 +32,7 @@ export default async function Page(
const project = await fetchProjectController.execute({
caller: "user",
userId: user._id,
userId: user.id,
projectId: params.projectId,
});
if (!project) {
@ -41,7 +41,7 @@ export default async function Page(
const sources = await listDataSourcesController.execute({
caller: "user",
userId: user._id,
userId: user.id,
projectId: params.projectId,
});

View file

@ -145,6 +145,9 @@ import { UpdateLiveWorkflowController } from "@/src/interface-adapters/controlle
import { RevertToLiveWorkflowUseCase } from "@/src/application/use-cases/projects/revert-to-live-workflow.use-case";
import { RevertToLiveWorkflowController } from "@/src/interface-adapters/controllers/projects/revert-to-live-workflow.controller";
// users
import { MongoDBUsersRepository } from "@/src/infrastructure/repositories/mongodb.users.repository";
export const container = createContainer({
injectionMode: InjectionMode.PROXY,
strict: true,
@ -324,4 +327,8 @@ container.register({
runTurnController: asClass(RunTurnController).singleton(),
listConversationsController: asClass(ListConversationsController).singleton(),
fetchConversationController: asClass(FetchConversationController).singleton(),
// users
// ---
usersRepository: asClass(MongoDBUsersRepository).singleton(),
});

View file

@ -0,0 +1,19 @@
import { z } from "zod";
import { User } from "@/src/entities/models/user";
export const CreateSchema = User.pick({
auth0Id: true,
email: true,
});
export interface IUsersRepository {
create(data: z.infer<typeof CreateSchema>): Promise<z.infer<typeof User>>;
fetch(id: string): Promise<z.infer<typeof User> | null>;
fetchByAuth0Id(auth0Id: string): Promise<z.infer<typeof User> | null>;
updateEmail(id: string, email: string): Promise<z.infer<typeof User>>;
updateBillingCustomerId(id: string, billingCustomerId: string): Promise<z.infer<typeof User>>;
}

View file

@ -0,0 +1,11 @@
import { z } from "zod";
export const User = z.object({
id: z.string(),
auth0Id: z.string(),
billingCustomerId: z.string().optional(),
name: z.string().optional(),
email: z.string().optional(),
createdAt: z.string().datetime(),
updatedAt: z.string().datetime().optional(),
});

View file

@ -9,6 +9,7 @@ import { PROJECT_MEMBERS_COLLECTION } from "../repositories/mongodb.project-memb
import { RECURRING_JOB_RULES_COLLECTION } from "../repositories/mongodb.recurring-job-rules.indexes";
import { SCHEDULED_JOB_RULES_COLLECTION } from "../repositories/mongodb.scheduled-job-rules.indexes";
import { COMPOSIO_TRIGGER_DEPLOYMENTS_COLLECTION } from "../repositories/mongodb.composio-trigger-deployments.indexes";
import { USERS_COLLECTION } from "../repositories/mongodb.users.indexes";
export async function dropAllIndexes(database: Db): Promise<void> {
const collections: string[] = [
@ -22,6 +23,7 @@ export async function dropAllIndexes(database: Db): Promise<void> {
RECURRING_JOB_RULES_COLLECTION,
SCHEDULED_JOB_RULES_COLLECTION,
COMPOSIO_TRIGGER_DEPLOYMENTS_COLLECTION,
USERS_COLLECTION,
];
for (const collectionName of collections) {

View file

@ -9,6 +9,7 @@ import { PROJECT_MEMBERS_COLLECTION, PROJECT_MEMBERS_INDEXES } from "../reposito
import { RECURRING_JOB_RULES_COLLECTION, RECURRING_JOB_RULES_INDEXES } from "../repositories/mongodb.recurring-job-rules.indexes";
import { SCHEDULED_JOB_RULES_COLLECTION, SCHEDULED_JOB_RULES_INDEXES } from "../repositories/mongodb.scheduled-job-rules.indexes";
import { COMPOSIO_TRIGGER_DEPLOYMENTS_COLLECTION, COMPOSIO_TRIGGER_DEPLOYMENTS_INDEXES } from "../repositories/mongodb.composio-trigger-deployments.indexes";
import { USERS_COLLECTION, USERS_INDEXES } from "../repositories/mongodb.users.indexes";
export async function ensureAllIndexes(database: Db): Promise<void> {
await database.collection(API_KEYS_COLLECTION).createIndexes(API_KEYS_INDEXES);
@ -21,4 +22,5 @@ export async function ensureAllIndexes(database: Db): Promise<void> {
await database.collection(RECURRING_JOB_RULES_COLLECTION).createIndexes(RECURRING_JOB_RULES_INDEXES);
await database.collection(SCHEDULED_JOB_RULES_COLLECTION).createIndexes(SCHEDULED_JOB_RULES_INDEXES);
await database.collection(COMPOSIO_TRIGGER_DEPLOYMENTS_COLLECTION).createIndexes(COMPOSIO_TRIGGER_DEPLOYMENTS_INDEXES);
await database.collection(USERS_COLLECTION).createIndexes(USERS_INDEXES);
}

View file

@ -0,0 +1,7 @@
import { IndexDescription } from "mongodb";
export const USERS_COLLECTION = "users";
export const USERS_INDEXES: IndexDescription[] = [
{ key: { auth0Id: 1 }, name: "auth0Id_unique", unique: true },
];

View file

@ -0,0 +1,82 @@
import { z } from "zod";
import { db } from "@/app/lib/mongodb";
import { ObjectId } from "mongodb";
import { CreateSchema, IUsersRepository } from "@/src/application/repositories/users.repository.interface";
import { User } from "@/src/entities/models/user";
const DocSchema = User
.omit({
id: true,
});
export class MongoDBUsersRepository implements IUsersRepository {
private readonly collection = db.collection<z.infer<typeof DocSchema>>("users");
async create(data: z.infer<typeof CreateSchema>): Promise<z.infer<typeof User>> {
const now = new Date().toISOString();
const _id = new ObjectId();
const doc = {
...data,
createdAt: now,
};
await this.collection.insertOne({
_id,
...doc,
});
return {
...doc,
id: _id.toString(),
};
}
async fetch(id: string): Promise<z.infer<typeof User> | null> {
const result = await this.collection.findOne({ _id: new ObjectId(id) });
if (!result) return null;
return {
...result,
id: result._id.toString(),
};
}
async fetchByAuth0Id(auth0Id: string): Promise<z.infer<typeof User> | null> {
const result = await this.collection.findOne({ auth0Id });
if (!result) return null;
return {
...result,
id: result._id.toString(),
};
}
async updateEmail(id: string, email: string): Promise<z.infer<typeof User>> {
const result = await this.collection.findOneAndUpdate(
{ _id: new ObjectId(id) },
{ $set: { email, updatedAt: new Date().toISOString() } }
);
if (!result) throw new Error("User not found");
return {
...result,
id: result._id.toString(),
};
}
async updateBillingCustomerId(id: string, billingCustomerId: string): Promise<z.infer<typeof User>> {
const result = await this.collection.findOneAndUpdate(
{ _id: new ObjectId(id) },
{ $set: { billingCustomerId, updatedAt: new Date().toISOString() } }
);
if (!result) throw new Error("User not found");
return {
...result,
id: result._id.toString(),
};
}
}