From c561f4425f9a48b64e690d5922fd0e963d64f18a Mon Sep 17 00:00:00 2001 From: Ramnique Singh <30795890+ramnique@users.noreply.github.com> Date: Mon, 18 Aug 2025 16:50:36 +0530 Subject: [PATCH] add index maintenance scripts --- .../app/scripts/mongodb-drop-indexes.ts | 14 ++++++ .../app/scripts/mongodb-ensure-indexes.ts | 14 ++++++ apps/rowboat/package.json | 4 +- .../infrastructure/mongodb/drop-indexes.ts | 49 +++++++++++++++++++ .../infrastructure/mongodb/ensure-indexes.ts | 24 +++++++++ .../repositories/mongodb.api-keys.indexes.ts | 8 +++ ...db.composio-trigger-deployments.indexes.ts | 9 ++++ ...composio-trigger-deployments.repository.ts | 17 ------- .../mongodb.conversations.indexes.ts | 7 +++ .../mongodb.data-source-docs.indexes.ts | 8 +++ .../mongodb.data-sources.indexes.ts | 9 ++++ .../repositories/mongodb.jobs.indexes.ts | 11 +++++ .../mongodb.project-members.indexes.ts | 8 +++ .../repositories/mongodb.projects.indexes.ts | 7 +++ .../mongodb.recurring-job-rules.indexes.ts | 8 +++ .../mongodb.scheduled-job-rules.indexes.ts | 8 +++ 16 files changed, 187 insertions(+), 18 deletions(-) create mode 100644 apps/rowboat/app/scripts/mongodb-drop-indexes.ts create mode 100644 apps/rowboat/app/scripts/mongodb-ensure-indexes.ts create mode 100644 apps/rowboat/src/infrastructure/mongodb/drop-indexes.ts create mode 100644 apps/rowboat/src/infrastructure/mongodb/ensure-indexes.ts create mode 100644 apps/rowboat/src/infrastructure/repositories/mongodb.api-keys.indexes.ts create mode 100644 apps/rowboat/src/infrastructure/repositories/mongodb.composio-trigger-deployments.indexes.ts create mode 100644 apps/rowboat/src/infrastructure/repositories/mongodb.conversations.indexes.ts create mode 100644 apps/rowboat/src/infrastructure/repositories/mongodb.data-source-docs.indexes.ts create mode 100644 apps/rowboat/src/infrastructure/repositories/mongodb.data-sources.indexes.ts create mode 100644 apps/rowboat/src/infrastructure/repositories/mongodb.jobs.indexes.ts create mode 100644 apps/rowboat/src/infrastructure/repositories/mongodb.project-members.indexes.ts create mode 100644 apps/rowboat/src/infrastructure/repositories/mongodb.projects.indexes.ts create mode 100644 apps/rowboat/src/infrastructure/repositories/mongodb.recurring-job-rules.indexes.ts create mode 100644 apps/rowboat/src/infrastructure/repositories/mongodb.scheduled-job-rules.indexes.ts diff --git a/apps/rowboat/app/scripts/mongodb-drop-indexes.ts b/apps/rowboat/app/scripts/mongodb-drop-indexes.ts new file mode 100644 index 00000000..145156a3 --- /dev/null +++ b/apps/rowboat/app/scripts/mongodb-drop-indexes.ts @@ -0,0 +1,14 @@ +import '../lib/loadenv'; +import { db } from '../lib/mongodb'; +import { dropAllIndexes } from "../../src/infrastructure/mongodb/drop-indexes"; + +async function main() { + await dropAllIndexes(db); + console.log("Indexes dropped (non-_id)"); +} + +main().catch((err) => { + // eslint-disable-next-line no-console + console.error(err); + process.exit(1); +}); \ No newline at end of file diff --git a/apps/rowboat/app/scripts/mongodb-ensure-indexes.ts b/apps/rowboat/app/scripts/mongodb-ensure-indexes.ts new file mode 100644 index 00000000..b1bd8359 --- /dev/null +++ b/apps/rowboat/app/scripts/mongodb-ensure-indexes.ts @@ -0,0 +1,14 @@ +import '../lib/loadenv'; +import { db } from '../lib/mongodb'; +import { ensureAllIndexes } from "../../src/infrastructure/mongodb/ensure-indexes"; + +async function main() { + await ensureAllIndexes(db); + console.log("Indexes ensured"); +} + +main().catch((err) => { + // eslint-disable-next-line no-console + console.error(err); + process.exit(1); +}); \ No newline at end of file diff --git a/apps/rowboat/package.json b/apps/rowboat/package.json index 025d88dc..ff5d8f61 100644 --- a/apps/rowboat/package.json +++ b/apps/rowboat/package.json @@ -11,7 +11,9 @@ "setupQdrant": "tsx app/scripts/setup_qdrant.ts", "deleteQdrant": "tsx app/scripts/delete_qdrant.ts", "rag-worker": "tsx app/scripts/rag-worker.ts", - "jobs-worker": "tsx app/scripts/jobs-worker.ts" + "jobs-worker": "tsx app/scripts/jobs-worker.ts", + "mongodb-drop-indexes": "tsx app/scripts/mongodb-drop-indexes.ts", + "mongodb-ensure-indexes": "tsx app/scripts/mongodb-ensure-indexes.ts" }, "dependencies": { "@ai-sdk/openai": "^1.3.21", diff --git a/apps/rowboat/src/infrastructure/mongodb/drop-indexes.ts b/apps/rowboat/src/infrastructure/mongodb/drop-indexes.ts new file mode 100644 index 00000000..2810cf6d --- /dev/null +++ b/apps/rowboat/src/infrastructure/mongodb/drop-indexes.ts @@ -0,0 +1,49 @@ +import { Db } from "mongodb"; +import { API_KEYS_COLLECTION } from "../repositories/mongodb.api-keys.indexes"; +import { PROJECTS_COLLECTION } from "../repositories/mongodb.projects.indexes"; +import { JOBS_COLLECTION } from "../repositories/mongodb.jobs.indexes"; +import { CONVERSATIONS_COLLECTION } from "../repositories/mongodb.conversations.indexes"; +import { DATA_SOURCES_COLLECTION } from "../repositories/mongodb.data-sources.indexes"; +import { DATA_SOURCE_DOCS_COLLECTION } from "../repositories/mongodb.data-source-docs.indexes"; +import { PROJECT_MEMBERS_COLLECTION } from "../repositories/mongodb.project-members.indexes"; +import { RECURRING_JOB_RULES_COLLECTION } from "../repositories/mongodb.recurring-job-rules.indexes"; +import { SCHEDULED_JOB_RULES_COLLECTION } from "../repositories/mongodb.scheduled-job-rules.indexes"; +import { COMPOSIO_TRIGGER_DEPLOYMENTS_COLLECTION } from "../repositories/mongodb.composio-trigger-deployments.indexes"; + +export async function dropAllIndexes(database: Db): Promise { + const collections: string[] = [ + API_KEYS_COLLECTION, + PROJECTS_COLLECTION, + JOBS_COLLECTION, + CONVERSATIONS_COLLECTION, + DATA_SOURCES_COLLECTION, + DATA_SOURCE_DOCS_COLLECTION, + PROJECT_MEMBERS_COLLECTION, + RECURRING_JOB_RULES_COLLECTION, + SCHEDULED_JOB_RULES_COLLECTION, + COMPOSIO_TRIGGER_DEPLOYMENTS_COLLECTION, + ]; + + for (const collectionName of collections) { + try { + // Drops all non-_id indexes for the collection + await database.collection(collectionName).dropIndexes(); + } catch (err: any) { + // Ignore errors for non-existent collections or missing indexes + const codeName = err?.codeName; + const code = err?.code; + const message: string | undefined = err?.message; + + if ( + codeName === "NamespaceNotFound" || + code === 26 || // NamespaceNotFound + codeName === "IndexNotFound" || + (message && (message.includes("ns not found") || message.includes("index not found"))) + ) { + continue; + } + + throw err; + } + } +} \ No newline at end of file diff --git a/apps/rowboat/src/infrastructure/mongodb/ensure-indexes.ts b/apps/rowboat/src/infrastructure/mongodb/ensure-indexes.ts new file mode 100644 index 00000000..32d00fc9 --- /dev/null +++ b/apps/rowboat/src/infrastructure/mongodb/ensure-indexes.ts @@ -0,0 +1,24 @@ +import { Db } from "mongodb"; +import { API_KEYS_COLLECTION, API_KEYS_INDEXES } from "../repositories/mongodb.api-keys.indexes"; +import { PROJECTS_COLLECTION, PROJECTS_INDEXES } from "../repositories/mongodb.projects.indexes"; +import { JOBS_COLLECTION, JOBS_INDEXES } from "../repositories/mongodb.jobs.indexes"; +import { CONVERSATIONS_COLLECTION, CONVERSATIONS_INDEXES } from "../repositories/mongodb.conversations.indexes"; +import { DATA_SOURCES_COLLECTION, DATA_SOURCES_INDEXES } from "../repositories/mongodb.data-sources.indexes"; +import { DATA_SOURCE_DOCS_COLLECTION, DATA_SOURCE_DOCS_INDEXES } from "../repositories/mongodb.data-source-docs.indexes"; +import { PROJECT_MEMBERS_COLLECTION, PROJECT_MEMBERS_INDEXES } from "../repositories/mongodb.project-members.indexes"; +import { RECURRING_JOB_RULES_COLLECTION, RECURRING_JOB_RULES_INDEXES } from "../repositories/mongodb.recurring-job-rules.indexes"; +import { SCHEDULED_JOB_RULES_COLLECTION, SCHEDULED_JOB_RULES_INDEXES } from "../repositories/mongodb.scheduled-job-rules.indexes"; +import { COMPOSIO_TRIGGER_DEPLOYMENTS_COLLECTION, COMPOSIO_TRIGGER_DEPLOYMENTS_INDEXES } from "../repositories/mongodb.composio-trigger-deployments.indexes"; + +export async function ensureAllIndexes(database: Db): Promise { + await database.collection(API_KEYS_COLLECTION).createIndexes(API_KEYS_INDEXES); + await database.collection(PROJECTS_COLLECTION).createIndexes(PROJECTS_INDEXES); + await database.collection(JOBS_COLLECTION).createIndexes(JOBS_INDEXES); + await database.collection(CONVERSATIONS_COLLECTION).createIndexes(CONVERSATIONS_INDEXES); + await database.collection(DATA_SOURCES_COLLECTION).createIndexes(DATA_SOURCES_INDEXES); + await database.collection(DATA_SOURCE_DOCS_COLLECTION).createIndexes(DATA_SOURCE_DOCS_INDEXES); + await database.collection(PROJECT_MEMBERS_COLLECTION).createIndexes(PROJECT_MEMBERS_INDEXES); + await database.collection(RECURRING_JOB_RULES_COLLECTION).createIndexes(RECURRING_JOB_RULES_INDEXES); + await database.collection(SCHEDULED_JOB_RULES_COLLECTION).createIndexes(SCHEDULED_JOB_RULES_INDEXES); + await database.collection(COMPOSIO_TRIGGER_DEPLOYMENTS_COLLECTION).createIndexes(COMPOSIO_TRIGGER_DEPLOYMENTS_INDEXES); +} \ No newline at end of file diff --git a/apps/rowboat/src/infrastructure/repositories/mongodb.api-keys.indexes.ts b/apps/rowboat/src/infrastructure/repositories/mongodb.api-keys.indexes.ts new file mode 100644 index 00000000..7b40e8a2 --- /dev/null +++ b/apps/rowboat/src/infrastructure/repositories/mongodb.api-keys.indexes.ts @@ -0,0 +1,8 @@ +import { IndexDescription } from "mongodb"; + +export const API_KEYS_COLLECTION = "api_keys"; + +export const API_KEYS_INDEXES: IndexDescription[] = [ + { key: { projectId: 1, key: 1 }, name: "projectId_key" }, + { key: { projectId: 1, createdAt: -1 }, name: "projectId_createdAt_desc" }, +]; \ No newline at end of file diff --git a/apps/rowboat/src/infrastructure/repositories/mongodb.composio-trigger-deployments.indexes.ts b/apps/rowboat/src/infrastructure/repositories/mongodb.composio-trigger-deployments.indexes.ts new file mode 100644 index 00000000..0cf245fe --- /dev/null +++ b/apps/rowboat/src/infrastructure/repositories/mongodb.composio-trigger-deployments.indexes.ts @@ -0,0 +1,9 @@ +import { IndexDescription } from "mongodb"; + +export const COMPOSIO_TRIGGER_DEPLOYMENTS_COLLECTION = "composio_trigger_deployments"; + +export const COMPOSIO_TRIGGER_DEPLOYMENTS_INDEXES: IndexDescription[] = [ + { key: { projectId: 1 }, name: "projectId_idx" }, + { key: { triggerId: 1 }, name: "triggerId_idx" }, + { key: { triggerTypeSlug: 1, connectedAccountId: 1 }, name: "triggerTypeSlug_connectedAccountId" }, +]; \ No newline at end of file diff --git a/apps/rowboat/src/infrastructure/repositories/mongodb.composio-trigger-deployments.repository.ts b/apps/rowboat/src/infrastructure/repositories/mongodb.composio-trigger-deployments.repository.ts index 296c44e7..1a16273f 100644 --- a/apps/rowboat/src/infrastructure/repositories/mongodb.composio-trigger-deployments.repository.ts +++ b/apps/rowboat/src/infrastructure/repositories/mongodb.composio-trigger-deployments.repository.ts @@ -22,23 +22,6 @@ const DocSchema = ComposioTriggerDeployment.omit({ export class MongodbComposioTriggerDeploymentsRepository implements IComposioTriggerDeploymentsRepository { private readonly collection = db.collection>("composio_trigger_deployments"); - constructor() { - // Create indexes for efficient querying - this.createIndexes(); - } - - /** - * Creates the necessary indexes for efficient querying. - */ - private async createIndexes(): Promise { - await this.collection.createIndexes([ - { key: { projectId: 1 }, name: "projectId_idx" }, - { key: { triggerTypeSlug: 1 }, name: "triggerTypeSlug_idx" }, - { key: { connectedAccountId: 1 }, name: "connectedAccountId_idx" }, - { key: { triggerId: 1 }, name: "triggerId_idx" }, - ]); - } - /** * Creates a new Composio trigger deployment. */ diff --git a/apps/rowboat/src/infrastructure/repositories/mongodb.conversations.indexes.ts b/apps/rowboat/src/infrastructure/repositories/mongodb.conversations.indexes.ts new file mode 100644 index 00000000..4eecd938 --- /dev/null +++ b/apps/rowboat/src/infrastructure/repositories/mongodb.conversations.indexes.ts @@ -0,0 +1,7 @@ +import { IndexDescription } from "mongodb"; + +export const CONVERSATIONS_COLLECTION = "conversations"; + +export const CONVERSATIONS_INDEXES: IndexDescription[] = [ + { key: { projectId: 1, _id: -1 }, name: "projectId__id_desc" }, +]; \ No newline at end of file diff --git a/apps/rowboat/src/infrastructure/repositories/mongodb.data-source-docs.indexes.ts b/apps/rowboat/src/infrastructure/repositories/mongodb.data-source-docs.indexes.ts new file mode 100644 index 00000000..7e9b61d1 --- /dev/null +++ b/apps/rowboat/src/infrastructure/repositories/mongodb.data-source-docs.indexes.ts @@ -0,0 +1,8 @@ +import { IndexDescription } from "mongodb"; + +export const DATA_SOURCE_DOCS_COLLECTION = "source_docs"; + +export const DATA_SOURCE_DOCS_INDEXES: IndexDescription[] = [ + { key: { sourceId: 1, status: 1, _id: -1 }, name: "sourceId_status__id_desc" }, + { key: { projectId: 1 }, name: "projectId_idx" }, +]; \ No newline at end of file diff --git a/apps/rowboat/src/infrastructure/repositories/mongodb.data-sources.indexes.ts b/apps/rowboat/src/infrastructure/repositories/mongodb.data-sources.indexes.ts new file mode 100644 index 00000000..c1d74c03 --- /dev/null +++ b/apps/rowboat/src/infrastructure/repositories/mongodb.data-sources.indexes.ts @@ -0,0 +1,9 @@ +import { IndexDescription } from "mongodb"; + +export const DATA_SOURCES_COLLECTION = "sources"; + +export const DATA_SOURCES_INDEXES: IndexDescription[] = [ + { key: { projectId: 1, _id: -1 }, name: "projectId__id_desc" }, + { key: { status: 1, createdAt: 1 }, name: "status_createdAt" }, + { key: { status: 1, lastAttemptAt: 1, attempts: 1, createdAt: 1 }, name: "status_attempts_createdAt" }, +]; \ No newline at end of file diff --git a/apps/rowboat/src/infrastructure/repositories/mongodb.jobs.indexes.ts b/apps/rowboat/src/infrastructure/repositories/mongodb.jobs.indexes.ts new file mode 100644 index 00000000..a5b1be9c --- /dev/null +++ b/apps/rowboat/src/infrastructure/repositories/mongodb.jobs.indexes.ts @@ -0,0 +1,11 @@ +import { IndexDescription } from "mongodb"; + +export const JOBS_COLLECTION = "jobs"; + +export const JOBS_INDEXES: IndexDescription[] = [ + { key: { status: 1, workerId: 1, createdAt: 1 }, name: "status_workerId_createdAt" }, + { key: { projectId: 1, _id: -1 }, name: "projectId__id_desc" }, + { key: { status: 1, projectId: 1, _id: -1 }, name: "status_projectId__id_desc" }, + { key: { "reason.type": 1, "reason.ruleId": 1, _id: -1 }, name: "reason_rule__id_desc" }, + { key: { "reason.type": 1, "reason.triggerDeploymentId": 1, _id: -1 }, name: "reason_trigger__id_desc" }, +]; \ No newline at end of file diff --git a/apps/rowboat/src/infrastructure/repositories/mongodb.project-members.indexes.ts b/apps/rowboat/src/infrastructure/repositories/mongodb.project-members.indexes.ts new file mode 100644 index 00000000..4e70296b --- /dev/null +++ b/apps/rowboat/src/infrastructure/repositories/mongodb.project-members.indexes.ts @@ -0,0 +1,8 @@ +import { IndexDescription } from "mongodb"; + +export const PROJECT_MEMBERS_COLLECTION = "project_members"; + +export const PROJECT_MEMBERS_INDEXES: IndexDescription[] = [ + { key: { userId: 1, _id: -1 }, name: "userId__id_desc" }, + { key: { userId: 1, projectId: 1 }, name: "userId_projectId" }, +]; \ No newline at end of file diff --git a/apps/rowboat/src/infrastructure/repositories/mongodb.projects.indexes.ts b/apps/rowboat/src/infrastructure/repositories/mongodb.projects.indexes.ts new file mode 100644 index 00000000..2d0e6915 --- /dev/null +++ b/apps/rowboat/src/infrastructure/repositories/mongodb.projects.indexes.ts @@ -0,0 +1,7 @@ +import { IndexDescription } from "mongodb"; + +export const PROJECTS_COLLECTION = "projects"; + +export const PROJECTS_INDEXES: IndexDescription[] = [ + { key: { createdByUserId: 1 }, name: "createdByUserId_idx" }, +]; \ No newline at end of file diff --git a/apps/rowboat/src/infrastructure/repositories/mongodb.recurring-job-rules.indexes.ts b/apps/rowboat/src/infrastructure/repositories/mongodb.recurring-job-rules.indexes.ts new file mode 100644 index 00000000..0d3e3781 --- /dev/null +++ b/apps/rowboat/src/infrastructure/repositories/mongodb.recurring-job-rules.indexes.ts @@ -0,0 +1,8 @@ +import { IndexDescription } from "mongodb"; + +export const RECURRING_JOB_RULES_COLLECTION = "recurring_job_rules"; + +export const RECURRING_JOB_RULES_INDEXES: IndexDescription[] = [ + { key: { nextRunAt: 1, workerId: 1, disabled: 1 }, name: "nextRunAt_worker_disabled" }, + { key: { projectId: 1, _id: -1 }, name: "projectId__id_desc" }, +]; \ No newline at end of file diff --git a/apps/rowboat/src/infrastructure/repositories/mongodb.scheduled-job-rules.indexes.ts b/apps/rowboat/src/infrastructure/repositories/mongodb.scheduled-job-rules.indexes.ts new file mode 100644 index 00000000..ea6e75a4 --- /dev/null +++ b/apps/rowboat/src/infrastructure/repositories/mongodb.scheduled-job-rules.indexes.ts @@ -0,0 +1,8 @@ +import { IndexDescription } from "mongodb"; + +export const SCHEDULED_JOB_RULES_COLLECTION = "scheduled_job_rules"; + +export const SCHEDULED_JOB_RULES_INDEXES: IndexDescription[] = [ + { key: { nextRunAt: 1, status: 1, workerId: 1 }, name: "nextRunAt_status_worker" }, + { key: { projectId: 1, _id: -1 }, name: "projectId__id_desc" }, +]; \ No newline at end of file