diff --git a/apps/rowboat/app/scripts/jobs-worker.ts b/apps/rowboat/app/scripts/jobs-worker.ts index d441d2cb..5214670a 100644 --- a/apps/rowboat/app/scripts/jobs-worker.ts +++ b/apps/rowboat/app/scripts/jobs-worker.ts @@ -1,12 +1,52 @@ import '../lib/loadenv'; import { container } from "@/di/container"; import { IJobsWorker } from "@/src/application/workers/jobs.worker"; +import { IJobRulesWorker } from "@/src/application/workers/job-rules.worker"; + +// this is the old script which just launches job-worker +// ------------------------------------------------------------ +// (async () => { +// try { +// const jobsWorker = container.resolve('jobsWorker'); +// await jobsWorker.run(); +// } catch (error) { +// console.error(`Unable to run jobs worker: ${error}`); +// } +// })(); (async () => { try { const jobsWorker = container.resolve('jobsWorker'); + const rulesWorker = container.resolve('jobRulesWorker'); + + // Start jobs worker first so subscription is ready before rules publish await jobsWorker.run(); + await rulesWorker.run(); + + const shutdown = async (signal: string) => { + console.log(`[worker] ${signal} received, shutting down...`); + try { + await Promise.allSettled([ + jobsWorker.stop(), + rulesWorker.stop(), + ]); + } finally { + process.exit(0); + } + }; + + process.on('SIGINT', () => shutdown('SIGINT')); + process.on('SIGTERM', () => shutdown('SIGTERM')); + process.on('uncaughtException', (err) => { + console.error('[worker] uncaughtException', err); + shutdown('uncaughtException'); + }); + process.on('unhandledRejection', (reason) => { + console.error('[worker] unhandledRejection', reason); + shutdown('unhandledRejection'); + }); } catch (error) { - console.error(`Unable to run jobs worker: ${error}`); + console.error('Unable to start combined worker:', error); + process.exit(1); } })(); \ No newline at end of file diff --git a/apps/rowboat/package.json b/apps/rowboat/package.json index c1f371d8..025d88dc 100644 --- a/apps/rowboat/package.json +++ b/apps/rowboat/package.json @@ -11,8 +11,7 @@ "setupQdrant": "tsx app/scripts/setup_qdrant.ts", "deleteQdrant": "tsx app/scripts/delete_qdrant.ts", "rag-worker": "tsx app/scripts/rag-worker.ts", - "jobs-worker": "tsx app/scripts/jobs-worker.ts", - "job-rules-worker": "tsx app/scripts/job-rules.worker.ts" + "jobs-worker": "tsx app/scripts/jobs-worker.ts" }, "dependencies": { "@ai-sdk/openai": "^1.3.21", diff --git a/docker-compose.yml b/docker-compose.yml index 60d1bc15..b31c4ed9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -201,16 +201,6 @@ services: - COMPOSIO_API_KEY=${COMPOSIO_API_KEY} restart: unless-stopped - job-rules-worker: - build: - context: ./apps/rowboat - dockerfile: scripts.Dockerfile - command: ["npm", "run", "job-rules-worker"] - environment: - - MONGODB_CONNECTION_STRING=mongodb://mongo:27017/rowboat - - REDIS_URL=redis://redis:6379 - restart: unless-stopped - # chat_widget: # build: # context: ./apps/experimental/chat_widget