enable scheduled jobs (#199)

- one-off scheduled jobs
- recurring jobs
This commit is contained in:
Ramnique Singh 2025-08-12 18:40:04 +05:30 committed by GitHub
parent fcfe5593b4
commit eda3f3821f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
52 changed files with 3833 additions and 71 deletions

View file

@ -0,0 +1,91 @@
"use server";
import { container } from "@/di/container";
import { ICreateRecurringJobRuleController } from "@/src/interface-adapters/controllers/recurring-job-rules/create-recurring-job-rule.controller";
import { IListRecurringJobRulesController } from "@/src/interface-adapters/controllers/recurring-job-rules/list-recurring-job-rules.controller";
import { IFetchRecurringJobRuleController } from "@/src/interface-adapters/controllers/recurring-job-rules/fetch-recurring-job-rule.controller";
import { IToggleRecurringJobRuleController } from "@/src/interface-adapters/controllers/recurring-job-rules/toggle-recurring-job-rule.controller";
import { IDeleteRecurringJobRuleController } from "@/src/interface-adapters/controllers/recurring-job-rules/delete-recurring-job-rule.controller";
import { authCheck } from "./auth_actions";
import { z } from "zod";
import { Message } from "@/app/lib/types/types";
const createRecurringJobRuleController = container.resolve<ICreateRecurringJobRuleController>('createRecurringJobRuleController');
const listRecurringJobRulesController = container.resolve<IListRecurringJobRulesController>('listRecurringJobRulesController');
const fetchRecurringJobRuleController = container.resolve<IFetchRecurringJobRuleController>('fetchRecurringJobRuleController');
const toggleRecurringJobRuleController = container.resolve<IToggleRecurringJobRuleController>('toggleRecurringJobRuleController');
const deleteRecurringJobRuleController = container.resolve<IDeleteRecurringJobRuleController>('deleteRecurringJobRuleController');
export async function createRecurringJobRule(request: {
projectId: string,
input: {
messages: z.infer<typeof Message>[],
},
cron: string,
}) {
const user = await authCheck();
return await createRecurringJobRuleController.execute({
caller: 'user',
userId: user._id,
projectId: request.projectId,
input: request.input,
cron: request.cron,
});
}
export async function listRecurringJobRules(request: {
projectId: string,
cursor?: string,
limit?: number,
}) {
const user = await authCheck();
return await listRecurringJobRulesController.execute({
caller: 'user',
userId: user._id,
projectId: request.projectId,
cursor: request.cursor,
limit: request.limit,
});
}
export async function fetchRecurringJobRule(request: {
ruleId: string,
}) {
const user = await authCheck();
return await fetchRecurringJobRuleController.execute({
caller: 'user',
userId: user._id,
ruleId: request.ruleId,
});
}
export async function toggleRecurringJobRule(request: {
ruleId: string,
disabled: boolean,
}) {
const user = await authCheck();
return await toggleRecurringJobRuleController.execute({
caller: 'user',
userId: user._id,
ruleId: request.ruleId,
disabled: request.disabled,
});
}
export async function deleteRecurringJobRule(request: {
projectId: string,
ruleId: string,
}) {
const user = await authCheck();
return await deleteRecurringJobRuleController.execute({
caller: 'user',
userId: user._id,
projectId: request.projectId,
ruleId: request.ruleId,
});
}

View file

@ -0,0 +1,75 @@
"use server";
import { container } from "@/di/container";
import { ICreateScheduledJobRuleController } from "@/src/interface-adapters/controllers/scheduled-job-rules/create-scheduled-job-rule.controller";
import { IListScheduledJobRulesController } from "@/src/interface-adapters/controllers/scheduled-job-rules/list-scheduled-job-rules.controller";
import { IFetchScheduledJobRuleController } from "@/src/interface-adapters/controllers/scheduled-job-rules/fetch-scheduled-job-rule.controller";
import { IDeleteScheduledJobRuleController } from "@/src/interface-adapters/controllers/scheduled-job-rules/delete-scheduled-job-rule.controller";
import { authCheck } from "./auth_actions";
import { z } from "zod";
import { Message } from "@/app/lib/types/types";
const createScheduledJobRuleController = container.resolve<ICreateScheduledJobRuleController>('createScheduledJobRuleController');
const listScheduledJobRulesController = container.resolve<IListScheduledJobRulesController>('listScheduledJobRulesController');
const fetchScheduledJobRuleController = container.resolve<IFetchScheduledJobRuleController>('fetchScheduledJobRuleController');
const deleteScheduledJobRuleController = container.resolve<IDeleteScheduledJobRuleController>('deleteScheduledJobRuleController');
export async function createScheduledJobRule(request: {
projectId: string,
input: {
messages: z.infer<typeof Message>[],
},
scheduledTime: string, // ISO datetime string
}) {
const user = await authCheck();
return await createScheduledJobRuleController.execute({
caller: 'user',
userId: user._id,
projectId: request.projectId,
input: request.input,
scheduledTime: request.scheduledTime,
});
}
export async function listScheduledJobRules(request: {
projectId: string,
cursor?: string,
limit?: number,
}) {
const user = await authCheck();
return await listScheduledJobRulesController.execute({
caller: 'user',
userId: user._id,
projectId: request.projectId,
cursor: request.cursor,
limit: request.limit,
});
}
export async function fetchScheduledJobRule(request: {
ruleId: string,
}) {
const user = await authCheck();
return await fetchScheduledJobRuleController.execute({
caller: 'user',
userId: user._id,
ruleId: request.ruleId,
});
}
export async function deleteScheduledJobRule(request: {
projectId: string,
ruleId: string,
}) {
const user = await authCheck();
return await deleteScheduledJobRuleController.execute({
caller: 'user',
userId: user._id,
projectId: request.projectId,
ruleId: request.ruleId,
});
}

View file

@ -0,0 +1,244 @@
'use client';
import { useState } from "react";
import { useRouter } from "next/navigation";
import { Button } from "@/components/ui/button";
import { Panel } from "@/components/common/panel-common";
import { createRecurringJobRule } from "@/app/actions/recurring-job-rules.actions";
import { ArrowLeftIcon, PlusIcon, TrashIcon, InfoIcon } from "lucide-react";
import Link from "next/link";
// Define a simpler message type for the form that only includes the fields we need
type FormMessage = {
role: "system" | "user" | "assistant";
content: string;
};
const commonCronExamples = [
{ label: "Every minute", value: "* * * * *" },
{ label: "Every 5 minutes", value: "*/5 * * * *" },
{ label: "Every hour", value: "0 * * * *" },
{ label: "Every 2 hours", value: "0 */2 * * *" },
{ label: "Daily at midnight", value: "0 0 * * *" },
{ label: "Daily at 9 AM", value: "0 9 * * *" },
{ label: "Weekly on Sunday at midnight", value: "0 0 * * 0" },
{ label: "Monthly on the 1st at midnight", value: "0 0 1 * *" },
];
export function CreateRecurringJobRuleForm({ projectId }: { projectId: string }) {
const router = useRouter();
const [loading, setLoading] = useState(false);
const [messages, setMessages] = useState<FormMessage[]>([
{ role: "user", content: "" }
]);
const [cronExpression, setCronExpression] = useState("* * * * *");
const [showCronHelp, setShowCronHelp] = useState(false);
const addMessage = () => {
setMessages([...messages, { role: "user", content: "" }]);
};
const removeMessage = (index: number) => {
if (messages.length > 1) {
setMessages(messages.filter((_, i) => i !== index));
}
};
const updateMessage = (index: number, field: keyof FormMessage, value: string) => {
const newMessages = [...messages];
newMessages[index] = { ...newMessages[index], [field]: value };
setMessages(newMessages);
};
const handleSubmit = async (e: React.FormEvent) => {
e.preventDefault();
// Validate required fields
if (!cronExpression.trim()) {
alert("Please enter a cron expression");
return;
}
if (messages.some(msg => !msg.content?.trim())) {
alert("Please fill in all message content");
return;
}
setLoading(true);
try {
// Convert FormMessage to the expected Message type
const convertedMessages = messages.map(msg => {
if (msg.role === "assistant") {
return {
role: msg.role,
content: msg.content,
agentName: null,
responseType: "internal" as const,
timestamp: undefined
};
}
return {
role: msg.role,
content: msg.content,
timestamp: undefined
};
});
await createRecurringJobRule({
projectId,
input: { messages: convertedMessages },
cron: cronExpression,
});
router.push(`/projects/${projectId}/job-rules`);
} catch (error) {
console.error("Failed to create recurring job rule:", error);
alert("Failed to create recurring job rule");
} finally {
setLoading(false);
}
};
return (
<Panel
title={
<div className="flex items-center gap-3">
<Link href={`/projects/${projectId}/job-rules`}>
<Button variant="secondary" size="sm">
<ArrowLeftIcon className="w-4 h-4 mr-2" />
Back
</Button>
</Link>
<div className="text-sm font-medium text-gray-900 dark:text-gray-100">
CREATE RECURRING JOB RULE
</div>
</div>
}
>
<div className="h-full overflow-auto px-4 py-4">
<div className="max-w-[800px] mx-auto">
<form onSubmit={handleSubmit} className="space-y-6">
{/* Cron Expression */}
<div className="space-y-2">
<div className="flex items-center gap-2">
<label className="block text-sm font-medium text-gray-700 dark:text-gray-300">
Cron Expression *
</label>
<Button
type="button"
variant="secondary"
size="sm"
onClick={() => setShowCronHelp(!showCronHelp)}
className="p-1"
>
<InfoIcon className="w-4 h-4" />
</Button>
</div>
<input
type="text"
value={cronExpression}
onChange={(e) => setCronExpression(e.target.value)}
placeholder="* * * * *"
className="w-full px-3 py-2 border border-gray-300 dark:border-gray-600 rounded-md shadow-sm focus:outline-none focus:ring-2 focus:ring-blue-500 focus:border-blue-500 dark:bg-gray-700 dark:text-white font-mono"
required
/>
{showCronHelp && (
<div className="mt-3 p-3 bg-blue-50 dark:bg-blue-900/20 border border-blue-200 dark:border-blue-800 rounded-md">
<div className="text-sm text-blue-800 dark:text-blue-200 mb-2">
<strong>Format:</strong> minute hour day month dayOfWeek
</div>
<div className="text-sm text-blue-700 dark:text-blue-300 mb-3">
<strong>Examples:</strong>
</div>
<div className="grid grid-cols-1 md:grid-cols-2 gap-2">
{commonCronExamples.map((example, index) => (
<div key={index} className="flex items-center gap-2">
<code className="text-xs bg-blue-100 dark:bg-blue-800 px-2 py-1 rounded">
{example.value}
</code>
<span className="text-xs text-blue-600 dark:text-blue-300">
{example.label}
</span>
</div>
))}
</div>
<div className="text-xs text-blue-600 dark:text-blue-300 mt-2">
<strong>Note:</strong> All times are in UTC timezone
</div>
</div>
)}
</div>
{/* Messages */}
<div className="space-y-4">
<div className="flex items-center justify-between">
<label className="block text-sm font-medium text-gray-700 dark:text-gray-300">
Messages *
</label>
<Button
type="button"
onClick={addMessage}
variant="secondary"
size="sm"
className="flex items-center gap-2"
>
<PlusIcon className="w-4 h-4" />
Add Message
</Button>
</div>
<div className="space-y-4">
{messages.map((message, index) => (
<div key={index} className="border border-gray-200 dark:border-gray-700 rounded-lg p-4">
<div className="flex items-center justify-between mb-3">
<select
value={message.role}
onChange={(e) => updateMessage(index, "role", e.target.value)}
className="px-3 py-1 border border-gray-300 dark:border-gray-600 rounded-md text-sm dark:bg-gray-700 dark:text-white"
>
<option value="system">System</option>
<option value="user">User</option>
<option value="assistant">Assistant</option>
</select>
{messages.length > 1 && (
<Button
type="button"
onClick={() => removeMessage(index)}
variant="secondary"
size="sm"
className="text-red-600 hover:text-red-700 dark:text-red-400 dark:hover:text-red-300"
>
<TrashIcon className="w-4 h-4" />
</Button>
)}
</div>
<textarea
value={message.content}
onChange={(e) => updateMessage(index, "content", e.target.value)}
placeholder={`Enter ${message.role} message...`}
className="w-full px-3 py-2 border border-gray-300 dark:border-gray-600 rounded-md shadow-sm focus:outline-none focus:ring-2 focus:ring-blue-500 focus:border-blue-500 dark:bg-gray-700 dark:text-white"
rows={3}
required
/>
</div>
))}
</div>
</div>
{/* Submit Button */}
<div className="flex justify-end">
<Button
type="submit"
disabled={loading}
className="px-6 py-2"
>
{loading ? "Creating..." : "Create Rule"}
</Button>
</div>
</form>
</div>
</div>
</Panel>
);
}

View file

@ -0,0 +1,32 @@
'use client';
import { useState } from "react";
import { Tabs, Tab } from "@/components/ui/tabs";
import { ScheduledJobRulesList } from "../scheduled/components/scheduled-job-rules-list";
import { RecurringJobRulesList } from "./recurring-job-rules-list";
export function JobRulesTabs({ projectId }: { projectId: string }) {
const [activeTab, setActiveTab] = useState<string>("scheduled");
const handleTabChange = (key: React.Key) => {
setActiveTab(key.toString());
};
return (
<div className="h-full flex flex-col">
<Tabs
selectedKey={activeTab}
onSelectionChange={handleTabChange}
aria-label="Job Rules"
fullWidth
>
<Tab key="scheduled" title="Scheduled Rules">
<ScheduledJobRulesList projectId={projectId} />
</Tab>
<Tab key="recurring" title="Recurring Rules">
<RecurringJobRulesList projectId={projectId} />
</Tab>
</Tabs>
</div>
);
}

View file

@ -0,0 +1,312 @@
'use client';
import { useEffect, useState } from "react";
import { useRouter } from "next/navigation";
import { Button } from "@/components/ui/button";
import { Panel } from "@/components/common/panel-common";
import { fetchRecurringJobRule, toggleRecurringJobRule, deleteRecurringJobRule } from "@/app/actions/recurring-job-rules.actions";
import { ArrowLeftIcon, PlayIcon, PauseIcon, ClockIcon, AlertCircleIcon, Trash2Icon } from "lucide-react";
import Link from "next/link";
import { RecurringJobRule } from "@/src/entities/models/recurring-job-rule";
import { Spinner } from "@heroui/react";
import { z } from "zod";
export function RecurringJobRuleView({ projectId, ruleId }: { projectId: string; ruleId: string }) {
const router = useRouter();
const [rule, setRule] = useState<z.infer<typeof RecurringJobRule> | null>(null);
const [loading, setLoading] = useState(true);
const [updating, setUpdating] = useState(false);
const [deleting, setDeleting] = useState(false);
const [showDeleteConfirm, setShowDeleteConfirm] = useState(false);
useEffect(() => {
const loadRule = async () => {
try {
const fetchedRule = await fetchRecurringJobRule({ ruleId });
setRule(fetchedRule);
} catch (error) {
console.error("Failed to fetch rule:", error);
} finally {
setLoading(false);
}
};
loadRule();
}, [ruleId]);
const handleToggleStatus = async () => {
if (!rule) return;
setUpdating(true);
try {
const updatedRule = await toggleRecurringJobRule({
ruleId: rule.id,
disabled: !rule.disabled,
});
setRule(updatedRule);
} catch (error) {
console.error("Failed to update rule:", error);
alert("Failed to update rule status");
} finally {
setUpdating(false);
}
};
const handleDelete = async () => {
if (!rule) return;
setDeleting(true);
try {
await deleteRecurringJobRule({
projectId,
ruleId: rule.id,
});
// Redirect back to job rules list
router.push(`/projects/${projectId}/job-rules`);
} catch (error) {
console.error("Failed to delete rule:", error);
alert("Failed to delete rule");
} finally {
setDeleting(false);
setShowDeleteConfirm(false);
}
};
const formatCronExpression = (cron: string) => {
// Simple cron formatting for display
const parts = cron.split(' ');
if (parts.length === 5) {
const [minute, hour, day, month, dayOfWeek] = parts;
if (minute === '*' && hour === '*' && day === '*' && month === '*' && dayOfWeek === '*') {
return 'Every minute';
}
if (minute === '0' && hour === '*' && day === '*' && month === '*' && dayOfWeek === '*') {
return 'Every hour';
}
if (minute === '0' && hour === '0' && day === '*' && month === '*' && dayOfWeek === '*') {
return 'Daily at midnight';
}
if (minute === '0' && hour === '0' && day === '1' && month === '*' && dayOfWeek === '*') {
return 'Monthly on the 1st';
}
if (minute === '0' && hour === '0' && day === '*' && month === '*' && dayOfWeek === '0') {
return 'Weekly on Sunday';
}
}
return cron;
};
const formatDate = (dateString: string) => {
return new Date(dateString).toLocaleString();
};
if (loading) {
return (
<Panel title="Loading...">
<div className="flex items-center justify-center h-64">
<Spinner size="lg" />
</div>
</Panel>
);
}
if (!rule) {
return (
<Panel title="Rule Not Found">
<div className="text-center py-8">
<p className="text-gray-500 dark:text-gray-400">The requested rule could not be found.</p>
<Link href={`/projects/${projectId}/job-rules`}>
<Button variant="secondary" className="mt-4">
Back to Job Rules
</Button>
</Link>
</div>
</Panel>
);
}
return (
<>
<Panel
title={
<div className="flex items-center gap-3">
<Link href={`/projects/${projectId}/job-rules`}>
<Button variant="secondary" size="sm">
<ArrowLeftIcon className="w-4 h-4 mr-2" />
Back
</Button>
</Link>
<div className="text-sm font-medium text-gray-900 dark:text-gray-100">
RECURRING JOB RULE
</div>
</div>
}
rightActions={
<div className="flex items-center gap-3">
<Button
onClick={handleToggleStatus}
disabled={updating}
variant={rule.disabled ? "secondary" : "primary"}
size="sm"
className="flex items-center gap-2"
>
{updating ? (
<Spinner size="sm" />
) : rule.disabled ? (
<>
<PlayIcon className="w-4 h-4" />
Enable
</>
) : (
<>
<PauseIcon className="w-4 h-4" />
Disable
</>
)}
</Button>
<Button
onClick={() => setShowDeleteConfirm(true)}
variant="secondary"
size="sm"
className="flex items-center gap-2 bg-red-50 hover:bg-red-100 text-red-700 dark:bg-red-950 dark:hover:bg-red-900 dark:text-red-400 border border-red-200 dark:border-red-800"
>
<Trash2Icon className="w-4 h-4" />
Delete
</Button>
</div>
}
>
<div className="h-full overflow-auto px-4 py-4">
<div className="max-w-[800px] mx-auto space-y-6">
{/* Status */}
<div className="bg-white dark:bg-gray-800 rounded-lg border border-gray-200 dark:border-gray-700 p-4">
<div className="flex items-center gap-2 mb-2">
<div className={`w-3 h-3 rounded-full ${rule.disabled ? 'bg-red-500' : 'bg-green-500'}`} />
<span className="text-sm font-medium text-gray-900 dark:text-gray-100">
Status: {rule.disabled ? 'Disabled' : 'Active'}
</span>
</div>
{rule.lastError && (
<div className="flex items-start gap-2 mt-2 p-2 bg-red-50 dark:bg-red-900/20 border border-red-200 dark:border-red-800 rounded">
<AlertCircleIcon className="w-4 h-4 text-red-500 mt-0.5 flex-shrink-0" />
<div className="text-sm text-red-700 dark:text-red-300">
<strong>Last Error:</strong> {rule.lastError}
</div>
</div>
)}
</div>
{/* Schedule Information */}
<div className="bg-white dark:bg-gray-800 rounded-lg border border-gray-200 dark:border-gray-700 p-4">
<h3 className="text-lg font-medium text-gray-900 dark:text-gray-100 mb-3">
Schedule Information
</h3>
<div className="space-y-3">
<div className="flex items-center gap-2">
<ClockIcon className="w-4 h-4 text-gray-500" />
<span className="text-sm text-gray-600 dark:text-gray-400">Cron Expression:</span>
<code className="px-2 py-1 bg-gray-100 dark:bg-gray-700 rounded text-sm font-mono">
{rule.cron}
</code>
</div>
<div className="text-sm text-gray-600 dark:text-gray-400">
<strong>Human Readable:</strong> {formatCronExpression(rule.cron)}
</div>
<div className="text-sm text-gray-600 dark:text-gray-400">
<strong>Next Run:</strong> {formatDate(rule.nextRunAt)}
</div>
{rule.lastProcessedAt && (
<div className="text-sm text-gray-600 dark:text-gray-400">
<strong>Last Processed:</strong> {formatDate(rule.lastProcessedAt)}
</div>
)}
</div>
</div>
{/* Messages */}
<div className="bg-white dark:bg-gray-800 rounded-lg border border-gray-200 dark:border-gray-700 p-4">
<h3 className="text-lg font-medium text-gray-900 dark:text-gray-100 mb-3">
Messages
</h3>
<div className="space-y-3">
{rule.input.messages.map((message, index) => (
<div key={index} className="border border-gray-200 dark:border-gray-600 rounded-lg p-3">
<div className="flex items-center gap-2 mb-2">
<span className={`px-2 py-1 rounded text-xs font-medium ${
message.role === 'system'
? 'bg-blue-100 text-blue-800 dark:bg-blue-900 dark:text-blue-200'
: message.role === 'user'
? 'bg-green-100 text-green-800 dark:bg-green-900 dark:text-green-200'
: 'bg-purple-100 text-purple-800 dark:bg-purple-900 dark:text-purple-200'
}`}>
{message.role.charAt(0).toUpperCase() + message.role.slice(1)}
</span>
</div>
<div className="text-sm text-gray-700 dark:text-gray-300 whitespace-pre-wrap">
{message.content}
</div>
</div>
))}
</div>
</div>
{/* Metadata */}
<div className="bg-white dark:bg-gray-800 rounded-lg border border-gray-200 dark:border-gray-700 p-4">
<h3 className="text-lg font-medium text-gray-900 dark:text-gray-100 mb-3">
Metadata
</h3>
<div className="space-y-2 text-sm text-gray-600 dark:text-gray-400">
<div><strong>Created:</strong> {formatDate(rule.createdAt)}</div>
{rule.updatedAt && (
<div><strong>Last Updated:</strong> {formatDate(rule.updatedAt)}</div>
)}
<div><strong>Rule ID:</strong> <code className="bg-gray-100 dark:bg-gray-700 px-1 rounded">{rule.id}</code></div>
</div>
</div>
</div>
</div>
</Panel>
{/* Delete Confirmation Modal */}
{showDeleteConfirm && (
<div className="fixed inset-0 bg-black bg-opacity-50 flex items-center justify-center z-50">
<div className="bg-white dark:bg-gray-800 rounded-lg p-6 max-w-md mx-4">
<h3 className="text-lg font-medium text-gray-900 dark:text-gray-100 mb-4">
Delete Recurring Job Rule
</h3>
<p className="text-sm text-gray-600 dark:text-gray-400 mb-6">
Are you sure you want to delete this recurring job rule? This action cannot be undone and will permanently remove the rule and all its associated data.
</p>
<div className="flex gap-3 justify-end">
<Button
variant="secondary"
onClick={() => setShowDeleteConfirm(false)}
disabled={deleting}
>
Cancel
</Button>
<Button
variant="secondary"
onClick={handleDelete}
disabled={deleting}
className="flex items-center gap-2 bg-red-50 hover:bg-red-100 text-red-700 dark:bg-red-950 dark:hover:bg-red-900 dark:text-red-400 border border-red-200 dark:border-red-800"
>
{deleting ? (
<>
<Spinner size="sm" />
Deleting...
</>
) : (
<>
<Trash2Icon className="w-4 h-4" />
Delete
</>
)}
</Button>
</div>
</div>
</div>
)}
</>
);
}

View file

@ -0,0 +1,215 @@
'use client';
import { useCallback, useEffect, useMemo, useState } from "react";
import { Link, Spinner } from "@heroui/react";
import { Button } from "@/components/ui/button";
import { Panel } from "@/components/common/panel-common";
import { listRecurringJobRules } from "@/app/actions/recurring-job-rules.actions";
import { z } from "zod";
import { ListedRecurringRuleItem } from "@/src/application/repositories/recurring-job-rules.repository.interface";
import { isToday, isThisWeek, isThisMonth } from "@/lib/utils/date";
import { PlusIcon } from "lucide-react";
type ListedItem = z.infer<typeof ListedRecurringRuleItem>;
export function RecurringJobRulesList({ projectId }: { projectId: string }) {
const [items, setItems] = useState<ListedItem[]>([]);
const [cursor, setCursor] = useState<string | null>(null);
const [loading, setLoading] = useState<boolean>(true);
const [loadingMore, setLoadingMore] = useState<boolean>(false);
const [hasMore, setHasMore] = useState<boolean>(false);
const fetchPage = useCallback(async (cursorArg?: string | null) => {
const res = await listRecurringJobRules({ projectId, cursor: cursorArg ?? undefined, limit: 20 });
return res;
}, [projectId]);
useEffect(() => {
let ignore = false;
(async () => {
setLoading(true);
const res = await fetchPage(null);
if (ignore) return;
setItems(res.items);
setCursor(res.nextCursor);
setHasMore(Boolean(res.nextCursor));
setLoading(false);
})();
return () => { ignore = true; };
}, [fetchPage]);
const loadMore = useCallback(async () => {
if (!cursor) return;
setLoadingMore(true);
const res = await fetchPage(cursor);
setItems(prev => [...prev, ...res.items]);
setCursor(res.nextCursor);
setHasMore(Boolean(res.nextCursor));
setLoadingMore(false);
}, [cursor, fetchPage]);
const sections = useMemo(() => {
const groups: Record<string, ListedItem[]> = {
Today: [],
'This week': [],
'This month': [],
Older: [],
};
for (const item of items) {
const d = new Date(item.nextRunAt);
if (isToday(d)) groups['Today'].push(item);
else if (isThisWeek(d)) groups['This week'].push(item);
else if (isThisMonth(d)) groups['This month'].push(item);
else groups['Older'].push(item);
}
return groups;
}, [items]);
const getStatusColor = (disabled: boolean, lastError: string | null) => {
if (disabled) return 'text-red-600 dark:text-red-400';
if (lastError) return 'text-yellow-600 dark:text-yellow-400';
return 'text-green-600 dark:text-green-400';
};
const getStatusText = (disabled: boolean, lastError: string | null) => {
if (disabled) return 'Disabled';
if (lastError) return 'Error';
return 'Active';
};
const formatNextRunAt = (dateString: string) => {
const date = new Date(dateString);
return date.toLocaleString();
};
const formatCronExpression = (cron: string) => {
// Simple cron formatting for display
const parts = cron.split(' ');
if (parts.length === 5) {
const [minute, hour, day, month, dayOfWeek] = parts;
if (minute === '*' && hour === '*' && day === '*' && month === '*' && dayOfWeek === '*') {
return 'Every minute';
}
if (minute === '0' && hour === '*' && day === '*' && month === '*' && dayOfWeek === '*') {
return 'Every hour';
}
if (minute === '0' && hour === '0' && day === '*' && month === '*' && dayOfWeek === '*') {
return 'Daily at midnight';
}
if (minute === '0' && hour === '0' && day === '1' && month === '*' && dayOfWeek === '*') {
return 'Monthly on the 1st';
}
if (minute === '0' && hour === '0' && day === '*' && month === '*' && dayOfWeek === '0') {
return 'Weekly on Sunday';
}
}
return cron;
};
return (
<Panel
title={
<div className="flex items-center gap-3">
<div className="text-sm font-medium text-gray-900 dark:text-gray-100">
RECURRING JOB RULES
</div>
</div>
}
rightActions={
<div className="flex items-center gap-3">
<Link href={`/projects/${projectId}/job-rules/recurring/new`}>
<Button size="sm" className="flex items-center gap-2">
<PlusIcon className="w-4 h-4" />
New Rule
</Button>
</Link>
</div>
}
>
<div className="h-full overflow-auto px-4 py-4">
<div className="max-w-[1024px] mx-auto">
{loading && (
<div className="flex items-center gap-2">
<Spinner size="sm" />
<div>Loading...</div>
</div>
)}
{!loading && (
<div className="flex flex-col gap-6">
{Object.entries(sections).map(([sectionName, sectionItems]) => {
if (sectionItems.length === 0) return null;
return (
<div key={sectionName} className="space-y-3">
<h3 className="text-lg font-semibold text-gray-900 dark:text-gray-100">
{sectionName}
</h3>
<div className="grid gap-3">
{sectionItems.map((item) => (
<Link
key={item.id}
href={`/projects/${projectId}/job-rules/recurring/${item.id}`}
className="block p-4 bg-white dark:bg-gray-800 rounded-lg border border-gray-200 dark:border-gray-700 hover:border-gray-300 dark:hover:border-gray-600 transition-colors"
>
<div className="flex items-center justify-between">
<div className="flex-1">
<div className="flex items-center gap-3 mb-2">
<span className={`text-sm font-medium ${getStatusColor(item.disabled, item.lastError || null)}`}>
{getStatusText(item.disabled, item.lastError || null)}
</span>
<span className="text-sm text-gray-500 dark:text-gray-400">
Next run: {formatNextRunAt(item.nextRunAt)}
</span>
</div>
<div className="text-sm text-gray-600 dark:text-gray-400 mb-1">
Schedule: {formatCronExpression(item.cron)}
</div>
<div className="text-sm text-gray-600 dark:text-gray-400">
Created: {new Date(item.createdAt).toLocaleDateString()}
</div>
{item.lastError && (
<div className="text-sm text-red-600 dark:text-red-400 mt-1">
Last error: {item.lastError}
</div>
)}
</div>
<div className="text-sm text-gray-500 dark:text-gray-400">
{new Date(item.createdAt).toLocaleDateString()}
</div>
</div>
</Link>
))}
</div>
</div>
);
})}
{items.length === 0 && !loading && (
<div className="text-center py-8 text-gray-500 dark:text-gray-400">
No recurring job rules found. Create your first rule to get started.
</div>
)}
{hasMore && (
<div className="text-center">
<Button
onClick={loadMore}
disabled={loadingMore}
variant="secondary"
size="sm"
>
{loadingMore ? (
<>
<Spinner size="sm" />
Loading...
</>
) : (
'Load More'
)}
</Button>
</div>
)}
</div>
)}
</div>
</div>
</Panel>
);
}

View file

@ -0,0 +1,17 @@
import { Metadata } from "next";
import { requireActiveBillingSubscription } from '@/app/lib/billing';
import { JobRulesTabs } from "./components/job-rules-tabs";
export const metadata: Metadata = {
title: "Job Rules",
};
export default async function Page(
props: {
params: Promise<{ projectId: string }>
}
) {
const params = await props.params;
await requireActiveBillingSubscription();
return <JobRulesTabs projectId={params.projectId} />;
}

View file

@ -0,0 +1,17 @@
import { Metadata } from "next";
import { requireActiveBillingSubscription } from '@/app/lib/billing';
import { RecurringJobRuleView } from "../../components/recurring-job-rule-view";
export const metadata: Metadata = {
title: "Recurring Job Rule",
};
export default async function Page(
props: {
params: Promise<{ projectId: string; ruleId: string }>
}
) {
const params = await props.params;
await requireActiveBillingSubscription();
return <RecurringJobRuleView projectId={params.projectId} ruleId={params.ruleId} />;
}

View file

@ -0,0 +1,17 @@
import { Metadata } from "next";
import { requireActiveBillingSubscription } from '@/app/lib/billing';
import { CreateRecurringJobRuleForm } from "../../components/create-recurring-job-rule-form";
export const metadata: Metadata = {
title: "Create Recurring Job Rule",
};
export default async function Page(
props: {
params: Promise<{ projectId: string }>
}
) {
const params = await props.params;
await requireActiveBillingSubscription();
return <CreateRecurringJobRuleForm projectId={params.projectId} />;
}

View file

@ -0,0 +1,17 @@
import { Metadata } from "next";
import { requireActiveBillingSubscription } from '@/app/lib/billing';
import { ScheduledJobRuleView } from "../components/scheduled-job-rule-view";
export const metadata: Metadata = {
title: "Scheduled Job Rule",
};
export default async function Page(
props: {
params: Promise<{ projectId: string; ruleId: string }>
}
) {
const params = await props.params;
await requireActiveBillingSubscription();
return <ScheduledJobRuleView projectId={params.projectId} ruleId={params.ruleId} />;
}

View file

@ -0,0 +1,210 @@
'use client';
import { useState } from "react";
import { useRouter } from "next/navigation";
import { Button } from "@/components/ui/button";
import { Panel } from "@/components/common/panel-common";
import { createScheduledJobRule } from "@/app/actions/scheduled-job-rules.actions";
import { ArrowLeftIcon, PlusIcon, TrashIcon } from "lucide-react";
import Link from "next/link";
import { DatePicker } from "@heroui/react";
import { ZonedDateTime, now, getLocalTimeZone } from "@internationalized/date";
// Define a simpler message type for the form that only includes the fields we need
type FormMessage = {
role: "system" | "user" | "assistant";
content: string;
};
export function CreateScheduledJobRuleForm({ projectId }: { projectId: string }) {
const router = useRouter();
const [loading, setLoading] = useState(false);
const [messages, setMessages] = useState<FormMessage[]>([
{ role: "user", content: "" }
]);
// Set default to 30 minutes from now with timezone info
const getDefaultDateTime = () => {
const localTimeZone = getLocalTimeZone();
const currentTime = now(localTimeZone);
const thirtyMinutesFromNow = currentTime.add({ minutes: 30 });
return thirtyMinutesFromNow;
};
const [scheduledDateTime, setScheduledDateTime] = useState<ZonedDateTime | null>(getDefaultDateTime());
const addMessage = () => {
setMessages([...messages, { role: "user", content: "" }]);
};
const removeMessage = (index: number) => {
if (messages.length > 1) {
setMessages(messages.filter((_, i) => i !== index));
}
};
const updateMessage = (index: number, field: keyof FormMessage, value: string) => {
const newMessages = [...messages];
newMessages[index] = { ...newMessages[index], [field]: value };
setMessages(newMessages);
};
const handleSubmit = async (e: React.FormEvent) => {
e.preventDefault();
// Validate required fields
if (!scheduledDateTime) {
alert("Please select date and time");
return;
}
if (messages.some(msg => !msg.content?.trim())) {
alert("Please fill in all message content");
return;
}
setLoading(true);
try {
// Convert FormMessage to the expected Message type
const convertedMessages = messages.map(msg => {
if (msg.role === "assistant") {
return {
role: msg.role,
content: msg.content,
agentName: null,
responseType: "internal" as const,
timestamp: undefined
};
}
return {
role: msg.role,
content: msg.content,
timestamp: undefined
};
});
// Convert ZonedDateTime to ISO string (already in UTC)
const scheduledTimeString = scheduledDateTime.toDate().toISOString();
await createScheduledJobRule({
projectId,
input: { messages: convertedMessages },
scheduledTime: scheduledTimeString,
});
router.push(`/projects/${projectId}/job-rules`);
} catch (error) {
console.error("Failed to create scheduled job rule:", error);
alert("Failed to create scheduled job rule");
} finally {
setLoading(false);
}
};
return (
<Panel
title={
<div className="flex items-center gap-3">
<Link href={`/projects/${projectId}/job-rules`}>
<Button variant="secondary" size="sm">
<ArrowLeftIcon className="w-4 h-4 mr-2" />
Back
</Button>
</Link>
<div className="text-sm font-medium text-gray-900 dark:text-gray-100">
CREATE SCHEDULED JOB RULE
</div>
</div>
}
>
<div className="h-full overflow-auto px-4 py-4">
<div className="max-w-[800px] mx-auto">
<form onSubmit={handleSubmit} className="space-y-6">
{/* Scheduled Date & Time */}
<div className="space-y-2">
<label className="block text-sm font-medium text-gray-700 dark:text-gray-300">
Scheduled Date & Time *
</label>
<DatePicker
value={scheduledDateTime}
onChange={setScheduledDateTime}
placeholderValue={getDefaultDateTime()}
minValue={now(getLocalTimeZone())}
granularity="minute"
isRequired
className="w-full"
/>
</div>
{/* Messages */}
<div className="space-y-4">
<div className="flex items-center justify-between">
<label className="block text-sm font-medium text-gray-700 dark:text-gray-300">
Messages *
</label>
<Button
type="button"
onClick={addMessage}
variant="secondary"
size="sm"
className="flex items-center gap-2"
>
<PlusIcon className="w-4 h-4" />
Add Message
</Button>
</div>
<div className="space-y-4">
{messages.map((message, index) => (
<div key={index} className="border border-gray-200 dark:border-gray-700 rounded-lg p-4">
<div className="flex items-center justify-between mb-3">
<select
value={message.role}
onChange={(e) => updateMessage(index, "role", e.target.value)}
className="px-3 py-1 border border-gray-300 dark:border-gray-600 rounded-md text-sm dark:bg-gray-700 dark:text-white"
>
<option value="system">System</option>
<option value="user">User</option>
<option value="assistant">Assistant</option>
</select>
{messages.length > 1 && (
<Button
type="button"
onClick={() => removeMessage(index)}
variant="secondary"
size="sm"
className="text-red-600 hover:text-red-700 dark:text-red-400 dark:hover:text-red-300"
>
<TrashIcon className="w-4 h-4" />
</Button>
)}
</div>
<textarea
value={message.content}
onChange={(e) => updateMessage(index, "content", e.target.value)}
placeholder={`Enter ${message.role} message...`}
className="w-full px-3 py-2 border border-gray-300 dark:border-gray-600 rounded-md shadow-sm focus:outline-none focus:ring-2 focus:ring-blue-500 focus:border-blue-500 dark:bg-gray-700 dark:text-white"
rows={3}
required
/>
</div>
))}
</div>
</div>
{/* Submit Button */}
<div className="flex justify-end">
<Button
type="submit"
disabled={loading}
className="px-6 py-2"
>
{loading ? "Creating..." : "Create Rule"}
</Button>
</div>
</form>
</div>
</div>
</Panel>
);
}

View file

@ -0,0 +1,234 @@
'use client';
import { useEffect, useMemo, useState } from "react";
import { useRouter } from "next/navigation";
import { Spinner } from "@heroui/react";
import { Panel } from "@/components/common/panel-common";
import { fetchScheduledJobRule, deleteScheduledJobRule } from "@/app/actions/scheduled-job-rules.actions";
import { ScheduledJobRule } from "@/src/entities/models/scheduled-job-rule";
import { z } from "zod";
import Link from "next/link";
import { Button } from "@/components/ui/button";
import { ArrowLeftIcon, Trash2Icon } from "lucide-react";
import { MessageDisplay } from "@/app/lib/components/message-display";
export function ScheduledJobRuleView({ projectId, ruleId }: { projectId: string; ruleId: string; }) {
const router = useRouter();
const [rule, setRule] = useState<z.infer<typeof ScheduledJobRule> | null>(null);
const [loading, setLoading] = useState<boolean>(true);
const [deleting, setDeleting] = useState(false);
const [showDeleteConfirm, setShowDeleteConfirm] = useState(false);
useEffect(() => {
let ignore = false;
(async () => {
setLoading(true);
const res = await fetchScheduledJobRule({ ruleId });
if (ignore) return;
setRule(res);
setLoading(false);
})();
return () => { ignore = true; };
}, [ruleId]);
const title = useMemo(() => {
if (!rule) return 'Scheduled Job Rule';
return `Scheduled Job Rule ${rule.id}`;
}, [rule]);
const handleDelete = async () => {
if (!rule) return;
setDeleting(true);
try {
await deleteScheduledJobRule({
projectId,
ruleId: rule.id,
});
// Redirect back to job rules list
router.push(`/projects/${projectId}/job-rules`);
} catch (error) {
console.error("Failed to delete rule:", error);
alert("Failed to delete rule");
} finally {
setDeleting(false);
setShowDeleteConfirm(false);
}
};
const getStatusColor = (status: string, processedAt: string | null) => {
if (processedAt) return 'text-green-600 dark:text-green-400';
if (status === 'processing') return 'text-yellow-600 dark:text-yellow-400';
if (status === 'triggered') return 'text-blue-600 dark:text-blue-400';
return 'text-gray-600 dark:text-gray-400'; // pending
};
const getStatusText = (status: string, processedAt: string | null) => {
if (processedAt) return 'Completed';
if (status === 'processing') return 'Processing';
if (status === 'triggered') return 'Triggered';
return 'Pending';
};
const formatDateTime = (dateString: string) => {
const date = new Date(dateString);
return date.toLocaleString();
};
return (
<>
<Panel
title={
<div className="flex items-center gap-3">
<Link href={`/projects/${projectId}/job-rules`}>
<Button variant="secondary" size="sm">
<ArrowLeftIcon className="w-4 h-4 mr-2" />
Back
</Button>
</Link>
<div className="text-sm font-medium text-gray-900 dark:text-gray-100">
{title}
</div>
</div>
}
rightActions={
<div className="flex items-center gap-3">
<Button
onClick={() => setShowDeleteConfirm(true)}
variant="secondary"
size="sm"
className="flex items-center gap-2 bg-red-50 hover:bg-red-100 text-red-700 dark:bg-red-950 dark:hover:bg-red-900 dark:text-red-400 border border-red-200 dark:border-red-800"
>
<Trash2Icon className="w-4 h-4" />
Delete
</Button>
</div>
}
>
<div className="h-full overflow-auto px-4 py-4">
<div className="max-w-[1024px] mx-auto">
{loading && (
<div className="flex items-center gap-2">
<Spinner size="sm" />
<div>Loading...</div>
</div>
)}
{!loading && rule && (
<div className="flex flex-col gap-6">
{/* Rule Metadata */}
<div className="bg-gray-50 dark:bg-gray-800/50 p-4 rounded-lg border border-gray-200 dark:border-gray-700">
<div className="grid grid-cols-2 gap-4 text-sm">
<div>
<span className="font-semibold text-gray-700 dark:text-gray-300">Rule ID:</span>
<span className="ml-2 font-mono text-gray-600 dark:text-gray-400">{rule.id}</span>
</div>
<div>
<span className="font-semibold text-gray-700 dark:text-gray-300">Status:</span>
<span className={`ml-2 font-mono ${getStatusColor(rule.status, rule.processedAt || null)}`}>
{getStatusText(rule.status, rule.processedAt || null)}
</span>
</div>
<div>
<span className="font-semibold text-gray-700 dark:text-gray-300">Next Run:</span>
<span className="ml-2 font-mono text-gray-600 dark:text-gray-400">
{formatDateTime(rule.nextRunAt)}
</span>
</div>
<div>
<span className="font-semibold text-gray-700 dark:text-gray-300">Created:</span>
<span className="ml-2 font-mono text-gray-600 dark:text-gray-400">
{formatDateTime(rule.createdAt)}
</span>
</div>
{rule.processedAt && (
<div>
<span className="font-semibold text-gray-700 dark:text-gray-300">Processed:</span>
<span className="ml-2 font-mono text-gray-600 dark:text-gray-400">
{formatDateTime(rule.processedAt)}
</span>
</div>
)}
{rule.output?.jobId && (
<div>
<span className="font-semibold text-gray-700 dark:text-gray-300">Job ID:</span>
<span className="ml-2 font-mono text-gray-600 dark:text-gray-400">
<Link
href={`/projects/${projectId}/jobs/${rule.output.jobId}`}
className="text-blue-600 hover:text-blue-800 dark:text-blue-400 dark:hover:text-blue-300"
>
{rule.output.jobId}
</Link>
</span>
</div>
)}
{rule.workerId && (
<div>
<span className="font-semibold text-gray-700 dark:text-gray-300">Worker ID:</span>
<span className="ml-2 font-mono text-gray-600 dark:text-gray-400">{rule.workerId}</span>
</div>
)}
</div>
</div>
{/* Messages */}
<div className="space-y-4">
<h3 className="text-lg font-semibold text-gray-900 dark:text-gray-100">
Messages
</h3>
<div className="space-y-4">
{rule.input.messages.map((message, index) => (
<div key={index} className="bg-white dark:bg-gray-800 p-4 rounded-lg border border-gray-200 dark:border-gray-700">
<MessageDisplay message={message} index={index} />
</div>
))}
</div>
</div>
</div>
)}
</div>
</div>
</Panel>
{/* Delete Confirmation Modal */}
{showDeleteConfirm && (
<div className="fixed inset-0 bg-black bg-opacity-50 flex items-center justify-center z-50">
<div className="bg-white dark:bg-gray-800 rounded-lg p-6 max-w-md mx-4">
<h3 className="text-lg font-medium text-gray-900 dark:text-gray-100 mb-4">
Delete Scheduled Job Rule
</h3>
<p className="text-sm text-gray-600 dark:text-gray-400 mb-6">
Are you sure you want to delete this scheduled job rule? This action cannot be undone and will permanently remove the rule and all its associated data.
</p>
<div className="flex gap-3 justify-end">
<Button
variant="secondary"
onClick={() => setShowDeleteConfirm(false)}
disabled={deleting}
>
Cancel
</Button>
<Button
variant="secondary"
onClick={handleDelete}
disabled={deleting}
className="flex items-center gap-2 bg-red-50 hover:bg-red-100 text-red-700 dark:bg-red-950 dark:hover:bg-red-900 dark:text-red-400 border border-red-200 dark:border-red-800"
>
{deleting ? (
<>
<Spinner size="sm" />
Deleting...
</>
) : (
<>
<Trash2Icon className="w-4 h-4" />
Delete
</>
)}
</Button>
</div>
</div>
</div>
)}
</>
);
}

View file

@ -0,0 +1,185 @@
'use client';
import { useCallback, useEffect, useMemo, useState } from "react";
import { Link, Spinner } from "@heroui/react";
import { Button } from "@/components/ui/button";
import { Panel } from "@/components/common/panel-common";
import { listScheduledJobRules } from "@/app/actions/scheduled-job-rules.actions";
import { z } from "zod";
import { ListedRuleItem } from "@/src/application/repositories/scheduled-job-rules.repository.interface";
import { isToday, isThisWeek, isThisMonth } from "@/lib/utils/date";
import { PlusIcon } from "lucide-react";
type ListedItem = z.infer<typeof ListedRuleItem>;
export function ScheduledJobRulesList({ projectId }: { projectId: string }) {
const [items, setItems] = useState<ListedItem[]>([]);
const [cursor, setCursor] = useState<string | null>(null);
const [loading, setLoading] = useState<boolean>(true);
const [loadingMore, setLoadingMore] = useState<boolean>(false);
const [hasMore, setHasMore] = useState<boolean>(false);
const fetchPage = useCallback(async (cursorArg?: string | null) => {
const res = await listScheduledJobRules({ projectId, cursor: cursorArg ?? undefined, limit: 20 });
return res;
}, [projectId]);
useEffect(() => {
let ignore = false;
(async () => {
setLoading(true);
const res = await fetchPage(null);
if (ignore) return;
setItems(res.items);
setCursor(res.nextCursor);
setHasMore(Boolean(res.nextCursor));
setLoading(false);
})();
return () => { ignore = true; };
}, [fetchPage]);
const loadMore = useCallback(async () => {
if (!cursor) return;
setLoadingMore(true);
const res = await fetchPage(cursor);
setItems(prev => [...prev, ...res.items]);
setCursor(res.nextCursor);
setHasMore(Boolean(res.nextCursor));
setLoadingMore(false);
}, [cursor, fetchPage]);
const sections = useMemo(() => {
const groups: Record<string, ListedItem[]> = {
Today: [],
'This week': [],
'This month': [],
Older: [],
};
for (const item of items) {
const d = new Date(item.nextRunAt);
if (isToday(d)) groups['Today'].push(item);
else if (isThisWeek(d)) groups['This week'].push(item);
else if (isThisMonth(d)) groups['This month'].push(item);
else groups['Older'].push(item);
}
return groups;
}, [items]);
const getStatusColor = (status: string, processedAt: string | null) => {
if (processedAt) return 'text-green-600 dark:text-green-400';
if (status === 'processing') return 'text-yellow-600 dark:text-yellow-400';
if (status === 'triggered') return 'text-blue-600 dark:text-blue-400';
return 'text-gray-600 dark:text-gray-400'; // pending
};
const getStatusText = (status: string, processedAt: string | null) => {
if (processedAt) return 'Completed';
if (status === 'processing') return 'Processing';
if (status === 'triggered') return 'Triggered';
return 'Pending';
};
const formatNextRunAt = (dateString: string) => {
const date = new Date(dateString);
return date.toLocaleString();
};
return (
<Panel
title={
<div className="flex items-center gap-3">
<div className="text-sm font-medium text-gray-900 dark:text-gray-100">
SCHEDULED JOB RULES
</div>
</div>
}
rightActions={
<div className="flex items-center gap-3">
<Link href={`/projects/${projectId}/job-rules/scheduled/new`}>
<Button size="sm" className="flex items-center gap-2">
<PlusIcon className="w-4 h-4" />
New Rule
</Button>
</Link>
</div>
}
>
<div className="h-full overflow-auto px-4 py-4">
<div className="max-w-[1024px] mx-auto">
{loading && (
<div className="flex items-center gap-2">
<Spinner size="sm" />
<div>Loading...</div>
</div>
)}
{!loading && (
<div className="flex flex-col gap-6">
{Object.entries(sections).map(([sectionName, sectionItems]) => {
if (sectionItems.length === 0) return null;
return (
<div key={sectionName} className="space-y-3">
<h3 className="text-lg font-semibold text-gray-900 dark:text-gray-100">
{sectionName}
</h3>
<div className="grid gap-3">
{sectionItems.map((item) => (
<Link
key={item.id}
href={`/projects/${projectId}/job-rules/scheduled/${item.id}`}
className="block p-4 bg-white dark:bg-gray-800 rounded-lg border border-gray-200 dark:border-gray-700 hover:border-gray-300 dark:hover:border-gray-600 transition-colors"
>
<div className="flex items-center justify-between">
<div className="flex-1">
<div className="flex items-center gap-3 mb-2">
<span className={`text-sm font-medium ${getStatusColor(item.status, item.processedAt || null)}`}>
{getStatusText(item.status, item.processedAt || null)}
</span>
<span className="text-sm text-gray-500 dark:text-gray-400">
Next run: {formatNextRunAt(item.nextRunAt)}
</span>
</div>
<div className="text-sm text-gray-600 dark:text-gray-400">
Created: {new Date(item.createdAt).toLocaleDateString()}
</div>
</div>
<div className="text-sm text-gray-500 dark:text-gray-400">
{new Date(item.createdAt).toLocaleDateString()}
</div>
</div>
</Link>
))}
</div>
</div>
);
})}
{items.length === 0 && !loading && (
<div className="text-center py-8 text-gray-500 dark:text-gray-400">
No scheduled job rules found. Create your first rule to get started.
</div>
)}
{hasMore && (
<div className="text-center">
<Button
onClick={loadMore}
disabled={loadingMore}
variant="secondary"
size="sm"
>
{loadingMore ? (
<>
<Spinner size="sm" />
Loading...
</>
) : (
'Load More'
)}
</Button>
</div>
)}
</div>
)}
</div>
</div>
</Panel>
);
}

View file

@ -0,0 +1,17 @@
import { Metadata } from "next";
import { requireActiveBillingSubscription } from '@/app/lib/billing';
import { CreateScheduledJobRuleForm } from "../components/create-scheduled-job-rule-form";
export const metadata: Metadata = {
title: "Create Scheduled Job Rule",
};
export default async function Page(
props: {
params: Promise<{ projectId: string }>
}
) {
const params = await props.params;
await requireActiveBillingSubscription();
return <CreateScheduledJobRuleForm projectId={params.projectId} />;
}

View file

@ -54,13 +54,35 @@ export function JobView({ projectId, jobId }: { projectId: string; jobId: string
'Trigger ID': reason.triggerId,
'Deployment ID': reason.triggerDeploymentId,
},
payload: reason.payload
payload: reason.payload,
link: null
};
}
if (reason.type === 'scheduled_job_rule') {
return {
type: 'Scheduled Job Rule',
details: {
'Rule ID': reason.ruleId,
},
payload: null,
link: `/projects/${projectId}/job-rules/scheduled/${reason.ruleId}`
};
}
if (reason.type === 'recurring_job_rule') {
return {
type: 'Recurring Job Rule',
details: {
'Rule ID': reason.ruleId,
},
payload: null,
link: `/projects/${projectId}/job-rules/recurring/${reason.ruleId}`
};
}
return {
type: 'Unknown',
details: {},
payload: null
payload: null,
link: null
};
};
@ -164,7 +186,7 @@ export function JobView({ projectId, jobId }: { projectId: string; jobId: string
))}
</div>
</div>
{reasonInfo.payload && Object.keys(reasonInfo.payload).length > 0 && (
<div>
<div className="text-xs font-semibold text-gray-600 dark:text-gray-400 mb-2 uppercase tracking-wide">
@ -175,6 +197,19 @@ export function JobView({ projectId, jobId }: { projectId: string; jobId: string
</pre>
</div>
)}
{reasonInfo.link && (
<div>
<div className="text-xs font-semibold text-gray-600 dark:text-gray-400 mb-2 uppercase tracking-wide">
Related Link
</div>
<Link
href={reasonInfo.link}
className="text-blue-600 dark:text-blue-400 hover:underline font-medium"
>
{reasonInfo.type === 'Scheduled Job Rule' ? 'View Scheduled Job Rule' : 'View Details'}
</Link>
</div>
)}
</div>
</div>
)}
@ -197,15 +232,7 @@ export function JobView({ projectId, jobId }: { projectId: string; jobId: string
</div>
</div>
{/* Workflow */}
<div>
<div className="text-xs font-semibold text-gray-600 dark:text-gray-400 mb-2 uppercase tracking-wide">
Workflow
</div>
<pre className="bg-gray-100 dark:bg-gray-900 p-3 rounded text-xs overflow-x-auto border border-gray-200 dark:border-gray-700 font-mono max-h-[400px]">
{JSON.stringify(job.input.workflow, null, 2)}
</pre>
</div>
</div>
</div>

View file

@ -81,9 +81,31 @@ export function JobsList({ projectId }: { projectId: string }) {
const getReasonDisplay = (reason: any) => {
if (reason.type === 'composio_trigger') {
return `Composio: ${reason.triggerTypeSlug}`;
return {
type: 'Composio Trigger',
display: `Composio: ${reason.triggerTypeSlug}`,
link: null
};
}
return 'Unknown';
if (reason.type === 'scheduled_job_rule') {
return {
type: 'Scheduled Job Rule',
display: `Scheduled Rule`,
link: `/projects/${projectId}/job-rules/scheduled/${reason.ruleId}`
};
}
if (reason.type === 'recurring_job_rule') {
return {
type: 'Recurring Job Rule',
display: `Recurring Rule`,
link: `/projects/${projectId}/job-rules/recurring/${reason.ruleId}`
};
}
return {
type: 'Unknown',
display: 'Unknown',
link: null
};
};
return (
@ -129,33 +151,46 @@ export function JobsList({ projectId }: { projectId: string }) {
</tr>
</thead>
<tbody className="bg-white dark:bg-gray-800 divide-y divide-gray-200 dark:divide-gray-700">
{group.map((job) => (
<tr key={job.id} className="hover:bg-gray-50 dark:hover:bg-gray-750 transition-colors">
<td className="px-6 py-4 text-left">
<Link
href={`/projects/${projectId}/jobs/${job.id}`}
size="lg"
isBlock
className="text-sm text-gray-900 dark:text-gray-100 hover:text-blue-600 dark:hover:text-blue-400 truncate block"
>
{job.id}
</Link>
</td>
<td className="px-6 py-4 text-left">
<span className={`text-sm font-medium ${getStatusColor(job.status)}`}>
{job.status}
</span>
</td>
<td className="px-6 py-4 text-left">
<span className="text-sm text-gray-600 dark:text-gray-300 font-mono">
{getReasonDisplay(job.reason)}
</span>
</td>
<td className="px-6 py-4 text-left text-sm text-gray-600 dark:text-gray-300">
{new Date(job.createdAt).toLocaleString()}
</td>
</tr>
))}
{group.map((job) => {
const reasonInfo = getReasonDisplay(job.reason);
return (
<tr key={job.id} className="hover:bg-gray-50 dark:hover:bg-gray-750 transition-colors">
<td className="px-6 py-4 text-left">
<Link
href={`/projects/${projectId}/jobs/${job.id}`}
size="lg"
isBlock
className="text-sm text-gray-900 dark:text-gray-100 hover:text-blue-600 dark:hover:text-blue-400 truncate block"
>
{job.id}
</Link>
</td>
<td className="px-6 py-4 text-left">
<span className={`text-sm font-medium ${getStatusColor(job.status)}`}>
{job.status}
</span>
</td>
<td className="px-6 py-4 text-left">
{reasonInfo.link ? (
<Link
href={reasonInfo.link}
size="sm"
className="text-sm text-blue-600 dark:text-blue-400 hover:underline font-mono"
>
{reasonInfo.display}
</Link>
) : (
<span className="text-sm text-gray-600 dark:text-gray-300 font-mono">
{reasonInfo.display}
</span>
)}
</td>
<td className="px-6 py-4 text-left text-sm text-gray-600 dark:text-gray-300">
{new Date(job.createdAt).toLocaleString()}
</td>
</tr>
);
})}
</tbody>
</table>
</div>

View file

@ -16,7 +16,8 @@ import {
Sun,
HelpCircle,
MessageSquareIcon,
LogsIcon
LogsIcon,
Clock
} from "lucide-react";
import { getProjectConfig } from "@/app/actions/project_actions";
import { createProjectWithOptions } from "../../lib/project-creation-utils";
@ -113,6 +114,12 @@ export default function Sidebar({ projectId, useAuth, collapsed = false, onToggl
icon: LogsIcon,
requiresProject: true
},
{
href: 'job-rules',
label: 'Job Rules',
icon: Clock,
requiresProject: true
},
{
href: 'config',
label: 'Settings',

View file

@ -0,0 +1,12 @@
import '../lib/loadenv';
import { container } from "@/di/container";
import { IJobRulesWorker } from "@/src/application/workers/job-rules.worker";
(async () => {
try {
const worker = container.resolve<IJobRulesWorker>('jobRulesWorker');
await worker.run();
} catch (error) {
console.error(`Unable to run scheduled job rules worker: ${error}`);
}
})();

View file

@ -30,6 +30,7 @@ import { DeleteComposioConnectedAccountController } from "@/src/interface-adapte
import { HandleComposioWebhookRequestController } from "@/src/interface-adapters/controllers/composio/webhook/handle-composio-webhook-request.controller";
import { RedisPubSubService } from "@/src/infrastructure/services/redis.pub-sub.service";
import { JobsWorker } from "@/src/application/workers/jobs.worker";
import { JobRulesWorker } from "@/src/application/workers/job-rules.worker";
import { ListJobsUseCase } from "@/src/application/use-cases/jobs/list-jobs.use-case";
import { ListJobsController } from "@/src/interface-adapters/controllers/jobs/list-jobs.controller";
import { ListConversationsUseCase } from "@/src/application/use-cases/conversations/list-conversations.use-case";
@ -39,6 +40,30 @@ import { FetchJobController } from "@/src/interface-adapters/controllers/jobs/fe
import { FetchConversationUseCase } from "@/src/application/use-cases/conversations/fetch-conversation.use-case";
import { FetchConversationController } from "@/src/interface-adapters/controllers/conversations/fetch-conversation.controller";
// Scheduled Job Rules
import { MongoDBScheduledJobRulesRepository } from "@/src/infrastructure/repositories/mongodb.scheduled-job-rules.repository";
import { CreateScheduledJobRuleUseCase } from "@/src/application/use-cases/scheduled-job-rules/create-scheduled-job-rule.use-case";
import { FetchScheduledJobRuleUseCase } from "@/src/application/use-cases/scheduled-job-rules/fetch-scheduled-job-rule.use-case";
import { ListScheduledJobRulesUseCase } from "@/src/application/use-cases/scheduled-job-rules/list-scheduled-job-rules.use-case";
import { DeleteScheduledJobRuleUseCase } from "@/src/application/use-cases/scheduled-job-rules/delete-scheduled-job-rule.use-case";
import { CreateScheduledJobRuleController } from "@/src/interface-adapters/controllers/scheduled-job-rules/create-scheduled-job-rule.controller";
import { FetchScheduledJobRuleController } from "@/src/interface-adapters/controllers/scheduled-job-rules/fetch-scheduled-job-rule.controller";
import { ListScheduledJobRulesController } from "@/src/interface-adapters/controllers/scheduled-job-rules/list-scheduled-job-rules.controller";
import { DeleteScheduledJobRuleController } from "@/src/interface-adapters/controllers/scheduled-job-rules/delete-scheduled-job-rule.controller";
// Recurring Job Rules
import { MongoDBRecurringJobRulesRepository } from "@/src/infrastructure/repositories/mongodb.recurring-job-rules.repository";
import { CreateRecurringJobRuleUseCase } from "@/src/application/use-cases/recurring-job-rules/create-recurring-job-rule.use-case";
import { FetchRecurringJobRuleUseCase } from "@/src/application/use-cases/recurring-job-rules/fetch-recurring-job-rule.use-case";
import { ListRecurringJobRulesUseCase } from "@/src/application/use-cases/recurring-job-rules/list-recurring-job-rules.use-case";
import { ToggleRecurringJobRuleUseCase } from "@/src/application/use-cases/recurring-job-rules/toggle-recurring-job-rule.use-case";
import { DeleteRecurringJobRuleUseCase } from "@/src/application/use-cases/recurring-job-rules/delete-recurring-job-rule.use-case";
import { CreateRecurringJobRuleController } from "@/src/interface-adapters/controllers/recurring-job-rules/create-recurring-job-rule.controller";
import { FetchRecurringJobRuleController } from "@/src/interface-adapters/controllers/recurring-job-rules/fetch-recurring-job-rule.controller";
import { ListRecurringJobRulesController } from "@/src/interface-adapters/controllers/recurring-job-rules/list-recurring-job-rules.controller";
import { ToggleRecurringJobRuleController } from "@/src/interface-adapters/controllers/recurring-job-rules/toggle-recurring-job-rule.controller";
import { DeleteRecurringJobRuleController } from "@/src/interface-adapters/controllers/recurring-job-rules/delete-recurring-job-rule.controller";
export const container = createContainer({
injectionMode: InjectionMode.PROXY,
strict: true,
@ -48,6 +73,7 @@ container.register({
// workers
// ---
jobsWorker: asClass(JobsWorker).singleton(),
jobRulesWorker: asClass(JobRulesWorker).singleton(),
// services
// ---
@ -79,6 +105,32 @@ container.register({
fetchJobUseCase: asClass(FetchJobUseCase).singleton(),
fetchJobController: asClass(FetchJobController).singleton(),
// scheduled job rules
// ---
scheduledJobRulesRepository: asClass(MongoDBScheduledJobRulesRepository).singleton(),
createScheduledJobRuleUseCase: asClass(CreateScheduledJobRuleUseCase).singleton(),
fetchScheduledJobRuleUseCase: asClass(FetchScheduledJobRuleUseCase).singleton(),
listScheduledJobRulesUseCase: asClass(ListScheduledJobRulesUseCase).singleton(),
deleteScheduledJobRuleUseCase: asClass(DeleteScheduledJobRuleUseCase).singleton(),
createScheduledJobRuleController: asClass(CreateScheduledJobRuleController).singleton(),
fetchScheduledJobRuleController: asClass(FetchScheduledJobRuleController).singleton(),
listScheduledJobRulesController: asClass(ListScheduledJobRulesController).singleton(),
deleteScheduledJobRuleController: asClass(DeleteScheduledJobRuleController).singleton(),
// recurring job rules
// ---
recurringJobRulesRepository: asClass(MongoDBRecurringJobRulesRepository).singleton(),
createRecurringJobRuleUseCase: asClass(CreateRecurringJobRuleUseCase).singleton(),
fetchRecurringJobRuleUseCase: asClass(FetchRecurringJobRuleUseCase).singleton(),
listRecurringJobRulesUseCase: asClass(ListRecurringJobRulesUseCase).singleton(),
toggleRecurringJobRuleUseCase: asClass(ToggleRecurringJobRuleUseCase).singleton(),
deleteRecurringJobRuleUseCase: asClass(DeleteRecurringJobRuleUseCase).singleton(),
createRecurringJobRuleController: asClass(CreateRecurringJobRuleController).singleton(),
fetchRecurringJobRuleController: asClass(FetchRecurringJobRuleController).singleton(),
listRecurringJobRulesController: asClass(ListRecurringJobRulesController).singleton(),
toggleRecurringJobRuleController: asClass(ToggleRecurringJobRuleController).singleton(),
deleteRecurringJobRuleController: asClass(DeleteRecurringJobRuleController).singleton(),
// composio
// ---
deleteComposioConnectedAccountUseCase: asClass(DeleteComposioConnectedAccountUseCase).singleton(),

View file

@ -22,6 +22,7 @@
"@heroui/react": "^2.8.0-beta.10",
"@heroui/system": "^2.4.18-beta.2",
"@heroui/theme": "^2.4.18-beta.2",
"@internationalized/date": "^3.8.2",
"@langchain/core": "^0.3.7",
"@langchain/textsplitters": "^0.1.0",
"@mendable/firecrawl-js": "^1.0.3",
@ -33,6 +34,7 @@
"ai": "^4.3.13",
"awilix": "^12.0.5",
"clsx": "^2.1.1",
"cron-parser": "^5.3.0",
"dotenv": "^16.4.5",
"immer": "^10.1.1",
"ioredis": "^5.6.1",
@ -9005,6 +9007,18 @@
"node": ">= 6"
}
},
"node_modules/cron-parser": {
"version": "5.3.0",
"resolved": "https://registry.npmjs.org/cron-parser/-/cron-parser-5.3.0.tgz",
"integrity": "sha512-IS4mnFu6n3CFgEmXjr+B2zzGHsjJmHEdN+BViKvYSiEn3KWss9ICRDETDX/VZldiW82B94OyAZm4LIT4vcKK0g==",
"license": "MIT",
"dependencies": {
"luxon": "^3.6.1"
},
"engines": {
"node": ">=18"
}
},
"node_modules/cross-spawn": {
"version": "7.0.6",
"resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.6.tgz",
@ -12822,6 +12836,15 @@
"react": "^16.5.1 || ^17.0.0 || ^18.0.0 || ^19.0.0-rc"
}
},
"node_modules/luxon": {
"version": "3.7.1",
"resolved": "https://registry.npmjs.org/luxon/-/luxon-3.7.1.tgz",
"integrity": "sha512-RkRWjA926cTvz5rAb1BqyWkKbbjzCGchDUIKMCUvNi17j6f6j8uHGDV82Aqcqtzd+icoYpELmG3ksgGiFNNcNg==",
"license": "MIT",
"engines": {
"node": ">=12"
}
},
"node_modules/magic-string": {
"version": "0.30.17",
"resolved": "https://registry.npmjs.org/magic-string/-/magic-string-0.30.17.tgz",

View file

@ -13,7 +13,8 @@
"ragUrlsWorker": "tsx app/scripts/rag_urls_worker.ts",
"ragFilesWorker": "tsx app/scripts/rag_files_worker.ts",
"ragTextWorker": "tsx app/scripts/rag_text_worker.ts",
"jobs-worker": "tsx app/scripts/jobs-worker.ts"
"jobs-worker": "tsx app/scripts/jobs-worker.ts",
"job-rules-worker": "tsx app/scripts/job-rules.worker.ts"
},
"dependencies": {
"@ai-sdk/openai": "^1.3.21",
@ -30,6 +31,7 @@
"@heroui/react": "^2.8.0-beta.10",
"@heroui/system": "^2.4.18-beta.2",
"@heroui/theme": "^2.4.18-beta.2",
"@internationalized/date": "^3.8.2",
"@langchain/core": "^0.3.7",
"@langchain/textsplitters": "^0.1.0",
"@mendable/firecrawl-js": "^1.0.3",
@ -41,6 +43,7 @@
"ai": "^4.3.13",
"awilix": "^12.0.5",
"clsx": "^2.1.1",
"cron-parser": "^5.3.0",
"dotenv": "^16.4.5",
"immer": "^10.1.1",
"ioredis": "^5.6.1",

View file

@ -8,7 +8,7 @@ import { PaginatedList } from "@/src/entities/common/paginated-list";
* Schema for creating a new job.
* Defines the required fields when creating a job in the system.
*/
const createJobSchema = Job.pick({
export const CreateJobSchema = Job.pick({
reason: true,
projectId: true,
input: true,
@ -27,7 +27,7 @@ export const ListedJobItem = Job.pick({
* Schema for updating an existing job.
* Defines the fields that can be updated for a job.
*/
const updateJobSchema = Job.pick({
export const UpdateJobSchema = Job.pick({
status: true,
output: true,
});
@ -46,7 +46,7 @@ export interface IJobsRepository {
* @param data - The job data containing trigger information, project ID, and input
* @returns Promise resolving to the created job with all fields populated
*/
create(data: z.infer<typeof createJobSchema>): Promise<z.infer<typeof Job>>;
create(data: z.infer<typeof CreateJobSchema>): Promise<z.infer<typeof Job>>;
/**
* Fetches a job by its unique identifier.
@ -88,7 +88,7 @@ export interface IJobsRepository {
* @returns Promise resolving to the updated job
* @throws {NotFoundError} if the job doesn't exist
*/
update(id: string, data: z.infer<typeof updateJobSchema>): Promise<z.infer<typeof Job>>;
update(id: string, data: z.infer<typeof UpdateJobSchema>): Promise<z.infer<typeof Job>>;
/**
* Releases a job lock, making it available for other workers.

View file

@ -0,0 +1,92 @@
import { NotFoundError } from "@/src/entities/errors/common";
import { z } from "zod";
import { PaginatedList } from "@/src/entities/common/paginated-list";
import { RecurringJobRule } from "@/src/entities/models/recurring-job-rule";
/**
* Schema for creating a new recurring job rule.
*/
export const CreateRecurringRuleSchema = RecurringJobRule
.pick({
projectId: true,
input: true,
cron: true,
});
export const ListedRecurringRuleItem = RecurringJobRule.omit({
input: true,
});
/**
* Repository interface for managing recurring job rules in the system.
*
* This interface defines the contract for recurring job rule management operations including
* creation, fetching, polling, processing, and listing rules. Recurring job rules represent
* tasks that can be processed by workers based on cron expressions.
*/
export interface IRecurringJobRulesRepository {
/**
* Creates a new recurring job rule in the system.
*
* @param data - The rule data containing project ID, input messages, and cron expression
* @returns Promise resolving to the created recurring job rule with all fields populated
*/
create(data: z.infer<typeof CreateRecurringRuleSchema>): Promise<z.infer<typeof RecurringJobRule>>;
/**
* Fetches a recurring job rule by its unique identifier.
*
* @param id - The unique identifier of the recurring job rule to fetch
* @returns Promise resolving to the recurring job rule or null if not found
*/
fetch(id: string): Promise<z.infer<typeof RecurringJobRule> | null>;
/**
* Polls for the next available recurring job rule that can be processed by a worker.
*
* This method should return the next rule that is ready to be processed (not disabled,
* not currently locked, and nextRunAt is in the past).
*
* @param workerId - The unique identifier of the worker requesting a recurring job rule
* @returns Promise resolving to the next available recurring job rule or null if no rules are available
*/
poll(workerId: string): Promise<z.infer<typeof RecurringJobRule> | null>;
/**
* Releases a recurring job rule after it has been executed
*
* @param id - The unique identifier of the recurring job rule to release
* @returns Promise resolving to the updated recurring job rule
* @throws {NotFoundError} if the recurring job rule doesn't exist
*/
release(id: string): Promise<z.infer<typeof RecurringJobRule>>;
/**
* Lists recurring job rules for a specific project with pagination.
*
* @param projectId - The unique identifier of the project
* @param cursor - Optional cursor for pagination
* @param limit - Maximum number of recurring job rules to return (default: 50)
* @returns Promise resolving to a paginated list of recurring job rules
*/
list(projectId: string, cursor?: string, limit?: number): Promise<z.infer<ReturnType<typeof PaginatedList<typeof ListedRecurringRuleItem>>>>;
/**
* Toggles a recurring job rule's disabled state
*
* This method should update the disabled field of the recurring job rule.
*
* @param id - The unique identifier of the recurring job rule to toggle
* @param disabled - The new disabled state
* @returns Promise resolving to the updated recurring job rule
*/
toggle(id: string, disabled: boolean): Promise<z.infer<typeof RecurringJobRule>>;
/**
* Deletes a recurring job rule by its unique identifier.
*
* @param id - The unique identifier of the recurring job rule to delete
* @returns Promise resolving to true if the rule was deleted, false if not found
*/
delete(id: string): Promise<boolean>;
}

View file

@ -0,0 +1,98 @@
import { NotFoundError } from "@/src/entities/errors/common";
import { z } from "zod";
import { PaginatedList } from "@/src/entities/common/paginated-list";
import { ScheduledJobRule } from "@/src/entities/models/scheduled-job-rule";
/**
* Schema for creating a new scheduled job rule.
*/
export const CreateRuleSchema = ScheduledJobRule
.pick({
projectId: true,
input: true,
})
.extend({
scheduledTime: z.string().datetime(),
});
export const ListedRuleItem = ScheduledJobRule.omit({
input: true,
});
export const UpdateJobSchema = ScheduledJobRule.pick({
status: true,
output: true,
});
/**
* Repository interface for managing scheduled job rules in the system.
*
* This interface defines the contract for scheduled job rule management operations including
* creation, fetching, polling, processing, and listing rules. Scheduled job rules represent
* recurring or scheduled tasks that can be processed by workers at specified times.
*/
export interface IScheduledJobRulesRepository {
/**
* Creates a new scheduled job rule in the system.
*
* @param data - The rule data containing project ID, input messages, and scheduled run time
* @returns Promise resolving to the created scheduled job rule with all fields populated
*/
create(data: z.infer<typeof CreateRuleSchema>): Promise<z.infer<typeof ScheduledJobRule>>;
/**
* Fetches a scheduled job rule by its unique identifier.
*
* @param id - The unique identifier of the scheduled job rule to fetch
* @returns Promise resolving to the scheduled job rule or null if not found
*/
fetch(id: string): Promise<z.infer<typeof ScheduledJobRule> | null>;
/**
* Polls for the next available scheduled job rule that can be processed by a worker.
*
* This method should return the next rule that is ready to be processed (not yet processed)
* and is not currently locked by another worker. The rules should be ordered by their scheduled
* run time (nextRunAt) in ascending order.
*
* @param workerId - The unique identifier of the worker requesting a scheduled job rule
* @returns Promise resolving to the next available scheduled job rule or null if no rules are available
*/
poll(workerId: string): Promise<z.infer<typeof ScheduledJobRule> | null>;
/**
* Updates a scheduled job rule with new status and output data.
*
* @param id - The unique identifier of the scheduled job rule to update
* @param data - The update data containing status and output fields
* @returns Promise resolving to the updated scheduled job rule
* @throws {NotFoundError} if the scheduled job rule doesn't exist
*/
update(id: string, data: z.infer<typeof UpdateJobSchema>): Promise<z.infer<typeof ScheduledJobRule>>;
/**
* Releases a scheduled job rule after it has been executed.
*
* @param id - The unique identifier of the scheduled job rule to release
* @returns Promise resolving to the updated scheduled job rule
* @throws {NotFoundError} if the scheduled job rule doesn't exist
*/
release(id: string): Promise<z.infer<typeof ScheduledJobRule>>;
/**
* Lists scheduled job rules for a specific project with pagination.
*
* @param projectId - The unique identifier of the project
* @param cursor - Optional cursor for pagination
* @param limit - Maximum number of scheduled job rules to return (default: 50)
* @returns Promise resolving to a paginated list of scheduled job rules
*/
list(projectId: string, cursor?: string, limit?: number): Promise<z.infer<ReturnType<typeof PaginatedList<typeof ListedRuleItem>>>>;
/**
* Deletes a scheduled job rule by its unique identifier.
*
* @param id - The unique identifier of the scheduled job rule to delete
* @returns Promise resolving to true if the rule was deleted, false if not found
*/
delete(id: string): Promise<boolean>;
}

View file

@ -129,7 +129,6 @@ export class HandleCompsioWebhookRequestUseCase implements IHandleCompsioWebhook
},
projectId: deployment.projectId,
input: {
workflow: project.liveWorkflow,
messages: [msg],
},
});

View file

@ -0,0 +1,96 @@
import { BadRequestError } from '@/src/entities/errors/common';
import { z } from "zod";
import { IUsageQuotaPolicy } from '../../policies/usage-quota.policy.interface';
import { IProjectActionAuthorizationPolicy } from '../../policies/project-action-authorization.policy';
import { IRecurringJobRulesRepository } from '../../repositories/recurring-job-rules.repository.interface';
import { RecurringJobRule } from '@/src/entities/models/recurring-job-rule';
import { Message } from '@/app/lib/types/types';
const inputSchema = z.object({
caller: z.enum(["user", "api"]),
userId: z.string().optional(),
apiKey: z.string().optional(),
projectId: z.string(),
input: z.object({
messages: z.array(Message),
}),
cron: z.string(),
});
export interface ICreateRecurringJobRuleUseCase {
execute(request: z.infer<typeof inputSchema>): Promise<z.infer<typeof RecurringJobRule>>;
}
export class CreateRecurringJobRuleUseCase implements ICreateRecurringJobRuleUseCase {
private readonly recurringJobRulesRepository: IRecurringJobRulesRepository;
private readonly usageQuotaPolicy: IUsageQuotaPolicy;
private readonly projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy;
constructor({
recurringJobRulesRepository,
usageQuotaPolicy,
projectActionAuthorizationPolicy,
}: {
recurringJobRulesRepository: IRecurringJobRulesRepository,
usageQuotaPolicy: IUsageQuotaPolicy,
projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy,
}) {
this.recurringJobRulesRepository = recurringJobRulesRepository;
this.usageQuotaPolicy = usageQuotaPolicy;
this.projectActionAuthorizationPolicy = projectActionAuthorizationPolicy;
}
async execute(request: z.infer<typeof inputSchema>): Promise<z.infer<typeof RecurringJobRule>> {
// Validate cron expression
if (!this.isValidCronExpression(request.cron)) {
throw new BadRequestError('Invalid cron expression. Expected format: minute hour day month dayOfWeek');
}
// authz check
await this.projectActionAuthorizationPolicy.authorize({
caller: request.caller,
userId: request.userId,
apiKey: request.apiKey,
projectId: request.projectId,
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(request.projectId);
// create the recurring job rule
const rule = await this.recurringJobRulesRepository.create({
projectId: request.projectId,
input: request.input,
cron: request.cron,
});
return rule;
}
private isValidCronExpression(cron: string): boolean {
const parts = cron.split(' ');
if (parts.length !== 5) {
return false;
}
// Basic validation - in production you'd want more sophisticated validation
const [minute, hour, day, month, dayOfWeek] = parts;
// Check if parts are valid
const isValidPart = (part: string) => {
if (part === '*') return true;
if (part.includes('/')) {
const [range, step] = part.split('/');
if (range === '*' || (parseInt(step) > 0 && parseInt(step) <= 59)) return true;
return false;
}
if (part.includes('-')) {
const [start, end] = part.split('-');
return !isNaN(parseInt(start)) && !isNaN(parseInt(end)) && parseInt(start) <= parseInt(end);
}
return !isNaN(parseInt(part));
};
return isValidPart(minute) && isValidPart(hour) && isValidPart(day) && isValidPart(month) && isValidPart(dayOfWeek);
}
}

View file

@ -0,0 +1,59 @@
import { BadRequestError, NotFoundError } from '@/src/entities/errors/common';
import { z } from "zod";
import { IUsageQuotaPolicy } from '../../policies/usage-quota.policy.interface';
import { IProjectActionAuthorizationPolicy } from '../../policies/project-action-authorization.policy';
import { IRecurringJobRulesRepository } from '../../repositories/recurring-job-rules.repository.interface';
const inputSchema = z.object({
caller: z.enum(["user", "api"]),
userId: z.string().optional(),
apiKey: z.string().optional(),
projectId: z.string(),
ruleId: z.string(),
});
export interface IDeleteRecurringJobRuleUseCase {
execute(request: z.infer<typeof inputSchema>): Promise<boolean>;
}
export class DeleteRecurringJobRuleUseCase implements IDeleteRecurringJobRuleUseCase {
private readonly recurringJobRulesRepository: IRecurringJobRulesRepository;
private readonly usageQuotaPolicy: IUsageQuotaPolicy;
private readonly projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy;
constructor({
recurringJobRulesRepository,
usageQuotaPolicy,
projectActionAuthorizationPolicy,
}: {
recurringJobRulesRepository: IRecurringJobRulesRepository,
usageQuotaPolicy: IUsageQuotaPolicy,
projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy,
}) {
this.recurringJobRulesRepository = recurringJobRulesRepository;
this.usageQuotaPolicy = usageQuotaPolicy;
this.projectActionAuthorizationPolicy = projectActionAuthorizationPolicy;
}
async execute(request: z.infer<typeof inputSchema>): Promise<boolean> {
// authz check
await this.projectActionAuthorizationPolicy.authorize({
caller: request.caller,
userId: request.userId,
apiKey: request.apiKey,
projectId: request.projectId,
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(request.projectId);
// ensure rule belongs to this project
const rule = await this.recurringJobRulesRepository.fetch(request.ruleId);
if (!rule || rule.projectId !== request.projectId) {
throw new NotFoundError('Recurring job rule not found');
}
// delete the rule
return await this.recurringJobRulesRepository.delete(request.ruleId);
}
}

View file

@ -0,0 +1,62 @@
import { NotFoundError } from '@/src/entities/errors/common';
import { z } from "zod";
import { IUsageQuotaPolicy } from '../../policies/usage-quota.policy.interface';
import { IProjectActionAuthorizationPolicy } from '../../policies/project-action-authorization.policy';
import { IRecurringJobRulesRepository } from '../../repositories/recurring-job-rules.repository.interface';
import { RecurringJobRule } from '@/src/entities/models/recurring-job-rule';
const inputSchema = z.object({
caller: z.enum(["user", "api"]),
userId: z.string().optional(),
apiKey: z.string().optional(),
ruleId: z.string(),
});
export interface IFetchRecurringJobRuleUseCase {
execute(request: z.infer<typeof inputSchema>): Promise<z.infer<typeof RecurringJobRule>>;
}
export class FetchRecurringJobRuleUseCase implements IFetchRecurringJobRuleUseCase {
private readonly recurringJobRulesRepository: IRecurringJobRulesRepository;
private readonly usageQuotaPolicy: IUsageQuotaPolicy;
private readonly projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy;
constructor({
recurringJobRulesRepository,
usageQuotaPolicy,
projectActionAuthorizationPolicy,
}: {
recurringJobRulesRepository: IRecurringJobRulesRepository,
usageQuotaPolicy: IUsageQuotaPolicy,
projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy,
}) {
this.recurringJobRulesRepository = recurringJobRulesRepository;
this.usageQuotaPolicy = usageQuotaPolicy;
this.projectActionAuthorizationPolicy = projectActionAuthorizationPolicy;
}
async execute(request: z.infer<typeof inputSchema>): Promise<z.infer<typeof RecurringJobRule>> {
// fetch rule first to get projectId
const rule = await this.recurringJobRulesRepository.fetch(request.ruleId);
if (!rule) {
throw new NotFoundError(`Recurring job rule ${request.ruleId} not found`);
}
// extract projectid from rule
const { projectId } = rule;
// authz check
await this.projectActionAuthorizationPolicy.authorize({
caller: request.caller,
userId: request.userId,
apiKey: request.apiKey,
projectId,
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(projectId);
// return the rule
return rule;
}
}

View file

@ -0,0 +1,57 @@
import { z } from "zod";
import { IUsageQuotaPolicy } from '../../policies/usage-quota.policy.interface';
import { IProjectActionAuthorizationPolicy } from '../../policies/project-action-authorization.policy';
import { IRecurringJobRulesRepository, ListedRecurringRuleItem } from '../../repositories/recurring-job-rules.repository.interface';
import { PaginatedList } from '@/src/entities/common/paginated-list';
const inputSchema = z.object({
caller: z.enum(["user", "api"]),
userId: z.string().optional(),
apiKey: z.string().optional(),
projectId: z.string(),
cursor: z.string().optional(),
limit: z.number().optional(),
});
export interface IListRecurringJobRulesUseCase {
execute(request: z.infer<typeof inputSchema>): Promise<z.infer<ReturnType<typeof PaginatedList<typeof ListedRecurringRuleItem>>>>;
}
export class ListRecurringJobRulesUseCase implements IListRecurringJobRulesUseCase {
private readonly recurringJobRulesRepository: IRecurringJobRulesRepository;
private readonly usageQuotaPolicy: IUsageQuotaPolicy;
private readonly projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy;
constructor({
recurringJobRulesRepository,
usageQuotaPolicy,
projectActionAuthorizationPolicy,
}: {
recurringJobRulesRepository: IRecurringJobRulesRepository,
usageQuotaPolicy: IUsageQuotaPolicy,
projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy,
}) {
this.recurringJobRulesRepository = recurringJobRulesRepository;
this.usageQuotaPolicy = usageQuotaPolicy;
this.projectActionAuthorizationPolicy = projectActionAuthorizationPolicy;
}
async execute(request: z.infer<typeof inputSchema>): Promise<z.infer<ReturnType<typeof PaginatedList<typeof ListedRecurringRuleItem>>>> {
// extract projectid from request
const { projectId, limit } = request;
// authz check
await this.projectActionAuthorizationPolicy.authorize({
caller: request.caller,
userId: request.userId,
apiKey: request.apiKey,
projectId,
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(projectId);
// fetch recurring job rules for project
return await this.recurringJobRulesRepository.list(projectId, request.cursor, limit);
}
}

View file

@ -0,0 +1,63 @@
import { NotFoundError } from '@/src/entities/errors/common';
import { z } from "zod";
import { IUsageQuotaPolicy } from '../../policies/usage-quota.policy.interface';
import { IProjectActionAuthorizationPolicy } from '../../policies/project-action-authorization.policy';
import { IRecurringJobRulesRepository } from '../../repositories/recurring-job-rules.repository.interface';
import { RecurringJobRule } from '@/src/entities/models/recurring-job-rule';
const inputSchema = z.object({
caller: z.enum(["user", "api"]),
userId: z.string().optional(),
apiKey: z.string().optional(),
ruleId: z.string(),
disabled: z.boolean(),
});
export interface IToggleRecurringJobRuleUseCase {
execute(request: z.infer<typeof inputSchema>): Promise<z.infer<typeof RecurringJobRule>>;
}
export class ToggleRecurringJobRuleUseCase implements IToggleRecurringJobRuleUseCase {
private readonly recurringJobRulesRepository: IRecurringJobRulesRepository;
private readonly usageQuotaPolicy: IUsageQuotaPolicy;
private readonly projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy;
constructor({
recurringJobRulesRepository,
usageQuotaPolicy,
projectActionAuthorizationPolicy,
}: {
recurringJobRulesRepository: IRecurringJobRulesRepository,
usageQuotaPolicy: IUsageQuotaPolicy,
projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy,
}) {
this.recurringJobRulesRepository = recurringJobRulesRepository;
this.usageQuotaPolicy = usageQuotaPolicy;
this.projectActionAuthorizationPolicy = projectActionAuthorizationPolicy;
}
async execute(request: z.infer<typeof inputSchema>): Promise<z.infer<typeof RecurringJobRule>> {
// fetch rule first to get projectId
const rule = await this.recurringJobRulesRepository.fetch(request.ruleId);
if (!rule) {
throw new NotFoundError(`Recurring job rule ${request.ruleId} not found`);
}
// extract projectid from rule
const { projectId } = rule;
// authz check
await this.projectActionAuthorizationPolicy.authorize({
caller: request.caller,
userId: request.userId,
apiKey: request.apiKey,
projectId,
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(projectId);
// update the rule
return await this.recurringJobRulesRepository.toggle(request.ruleId, request.disabled);
}
}

View file

@ -0,0 +1,64 @@
import { BadRequestError } from '@/src/entities/errors/common';
import { z } from "zod";
import { IUsageQuotaPolicy } from '../../policies/usage-quota.policy.interface';
import { IProjectActionAuthorizationPolicy } from '../../policies/project-action-authorization.policy';
import { IScheduledJobRulesRepository } from '../../repositories/scheduled-job-rules.repository.interface';
import { ScheduledJobRule } from '@/src/entities/models/scheduled-job-rule';
import { Message } from '@/app/lib/types/types';
const inputSchema = z.object({
caller: z.enum(["user", "api"]),
userId: z.string().optional(),
apiKey: z.string().optional(),
projectId: z.string(),
input: z.object({
messages: z.array(Message),
}),
scheduledTime: z.string().datetime(),
});
export interface ICreateScheduledJobRuleUseCase {
execute(request: z.infer<typeof inputSchema>): Promise<z.infer<typeof ScheduledJobRule>>;
}
export class CreateScheduledJobRuleUseCase implements ICreateScheduledJobRuleUseCase {
private readonly scheduledJobRulesRepository: IScheduledJobRulesRepository;
private readonly usageQuotaPolicy: IUsageQuotaPolicy;
private readonly projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy;
constructor({
scheduledJobRulesRepository,
usageQuotaPolicy,
projectActionAuthorizationPolicy,
}: {
scheduledJobRulesRepository: IScheduledJobRulesRepository,
usageQuotaPolicy: IUsageQuotaPolicy,
projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy,
}) {
this.scheduledJobRulesRepository = scheduledJobRulesRepository;
this.usageQuotaPolicy = usageQuotaPolicy;
this.projectActionAuthorizationPolicy = projectActionAuthorizationPolicy;
}
async execute(request: z.infer<typeof inputSchema>): Promise<z.infer<typeof ScheduledJobRule>> {
// authz check
await this.projectActionAuthorizationPolicy.authorize({
caller: request.caller,
userId: request.userId,
apiKey: request.apiKey,
projectId: request.projectId,
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(request.projectId);
// create the scheduled job rule with UTC time
const rule = await this.scheduledJobRulesRepository.create({
projectId: request.projectId,
input: request.input,
scheduledTime: request.scheduledTime,
});
return rule;
}
}

View file

@ -0,0 +1,59 @@
import { BadRequestError, NotFoundError } from '@/src/entities/errors/common';
import { z } from "zod";
import { IUsageQuotaPolicy } from '../../policies/usage-quota.policy.interface';
import { IProjectActionAuthorizationPolicy } from '../../policies/project-action-authorization.policy';
import { IScheduledJobRulesRepository } from '../../repositories/scheduled-job-rules.repository.interface';
const inputSchema = z.object({
caller: z.enum(["user", "api"]),
userId: z.string().optional(),
apiKey: z.string().optional(),
projectId: z.string(),
ruleId: z.string(),
});
export interface IDeleteScheduledJobRuleUseCase {
execute(request: z.infer<typeof inputSchema>): Promise<boolean>;
}
export class DeleteScheduledJobRuleUseCase implements IDeleteScheduledJobRuleUseCase {
private readonly scheduledJobRulesRepository: IScheduledJobRulesRepository;
private readonly usageQuotaPolicy: IUsageQuotaPolicy;
private readonly projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy;
constructor({
scheduledJobRulesRepository,
usageQuotaPolicy,
projectActionAuthorizationPolicy,
}: {
scheduledJobRulesRepository: IScheduledJobRulesRepository,
usageQuotaPolicy: IUsageQuotaPolicy,
projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy,
}) {
this.scheduledJobRulesRepository = scheduledJobRulesRepository;
this.usageQuotaPolicy = usageQuotaPolicy;
this.projectActionAuthorizationPolicy = projectActionAuthorizationPolicy;
}
async execute(request: z.infer<typeof inputSchema>): Promise<boolean> {
// authz check
await this.projectActionAuthorizationPolicy.authorize({
caller: request.caller,
userId: request.userId,
apiKey: request.apiKey,
projectId: request.projectId,
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(request.projectId);
// ensure rule belongs to this project
const rule = await this.scheduledJobRulesRepository.fetch(request.ruleId);
if (!rule || rule.projectId !== request.projectId) {
throw new NotFoundError('Scheduled job rule not found');
}
// delete the rule
return await this.scheduledJobRulesRepository.delete(request.ruleId);
}
}

View file

@ -0,0 +1,62 @@
import { NotFoundError } from '@/src/entities/errors/common';
import { z } from "zod";
import { IUsageQuotaPolicy } from '../../policies/usage-quota.policy.interface';
import { IProjectActionAuthorizationPolicy } from '../../policies/project-action-authorization.policy';
import { IScheduledJobRulesRepository } from '../../repositories/scheduled-job-rules.repository.interface';
import { ScheduledJobRule } from '@/src/entities/models/scheduled-job-rule';
const inputSchema = z.object({
caller: z.enum(["user", "api"]),
userId: z.string().optional(),
apiKey: z.string().optional(),
ruleId: z.string(),
});
export interface IFetchScheduledJobRuleUseCase {
execute(request: z.infer<typeof inputSchema>): Promise<z.infer<typeof ScheduledJobRule>>;
}
export class FetchScheduledJobRuleUseCase implements IFetchScheduledJobRuleUseCase {
private readonly scheduledJobRulesRepository: IScheduledJobRulesRepository;
private readonly usageQuotaPolicy: IUsageQuotaPolicy;
private readonly projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy;
constructor({
scheduledJobRulesRepository,
usageQuotaPolicy,
projectActionAuthorizationPolicy,
}: {
scheduledJobRulesRepository: IScheduledJobRulesRepository,
usageQuotaPolicy: IUsageQuotaPolicy,
projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy,
}) {
this.scheduledJobRulesRepository = scheduledJobRulesRepository;
this.usageQuotaPolicy = usageQuotaPolicy;
this.projectActionAuthorizationPolicy = projectActionAuthorizationPolicy;
}
async execute(request: z.infer<typeof inputSchema>): Promise<z.infer<typeof ScheduledJobRule>> {
// fetch scheduled job rule first to get projectId
const rule = await this.scheduledJobRulesRepository.fetch(request.ruleId);
if (!rule) {
throw new NotFoundError(`Scheduled job rule ${request.ruleId} not found`);
}
// extract projectid from rule
const { projectId } = rule;
// authz check
await this.projectActionAuthorizationPolicy.authorize({
caller: request.caller,
userId: request.userId,
apiKey: request.apiKey,
projectId,
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(projectId);
// return the scheduled job rule
return rule;
}
}

View file

@ -0,0 +1,57 @@
import { z } from "zod";
import { IUsageQuotaPolicy } from '../../policies/usage-quota.policy.interface';
import { IProjectActionAuthorizationPolicy } from '../../policies/project-action-authorization.policy';
import { IScheduledJobRulesRepository, ListedRuleItem } from '../../repositories/scheduled-job-rules.repository.interface';
import { PaginatedList } from '@/src/entities/common/paginated-list';
const inputSchema = z.object({
caller: z.enum(["user", "api"]),
userId: z.string().optional(),
apiKey: z.string().optional(),
projectId: z.string(),
cursor: z.string().optional(),
limit: z.number().optional(),
});
export interface IListScheduledJobRulesUseCase {
execute(request: z.infer<typeof inputSchema>): Promise<z.infer<ReturnType<typeof PaginatedList<typeof ListedRuleItem>>>>;
}
export class ListScheduledJobRulesUseCase implements IListScheduledJobRulesUseCase {
private readonly scheduledJobRulesRepository: IScheduledJobRulesRepository;
private readonly usageQuotaPolicy: IUsageQuotaPolicy;
private readonly projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy;
constructor({
scheduledJobRulesRepository,
usageQuotaPolicy,
projectActionAuthorizationPolicy,
}: {
scheduledJobRulesRepository: IScheduledJobRulesRepository,
usageQuotaPolicy: IUsageQuotaPolicy,
projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy,
}) {
this.scheduledJobRulesRepository = scheduledJobRulesRepository;
this.usageQuotaPolicy = usageQuotaPolicy;
this.projectActionAuthorizationPolicy = projectActionAuthorizationPolicy;
}
async execute(request: z.infer<typeof inputSchema>): Promise<z.infer<ReturnType<typeof PaginatedList<typeof ListedRuleItem>>>> {
// extract projectid from request
const { projectId, limit } = request;
// authz check
await this.projectActionAuthorizationPolicy.authorize({
caller: request.caller,
userId: request.userId,
apiKey: request.apiKey,
projectId,
});
// assert and consume quota
await this.usageQuotaPolicy.assertAndConsume(projectId);
// fetch scheduled job rules for project
return await this.scheduledJobRulesRepository.list(projectId, request.cursor, limit);
}
}

View file

@ -0,0 +1,210 @@
import { IScheduledJobRulesRepository } from "@/src/application/repositories/scheduled-job-rules.repository.interface";
import { IRecurringJobRulesRepository } from "@/src/application/repositories/recurring-job-rules.repository.interface";
import { IJobsRepository } from "@/src/application/repositories/jobs.repository.interface";
import { IProjectsRepository } from "@/src/application/repositories/projects.repository.interface";
import { IPubSubService } from "@/src/application/services/pub-sub.service.interface";
import { ScheduledJobRule } from "@/src/entities/models/scheduled-job-rule";
import { z } from "zod";
import { nanoid } from "nanoid";
import { PrefixLogger } from "@/app/lib/utils";
import { RecurringJobRule } from "@/src/entities/models/recurring-job-rule";
export interface IJobRulesWorker {
run(): Promise<void>;
stop(): Promise<void>;
}
export class JobRulesWorker implements IJobRulesWorker {
private readonly scheduledJobRulesRepository: IScheduledJobRulesRepository;
private readonly recurringJobRulesRepository: IRecurringJobRulesRepository;
private readonly jobsRepository: IJobsRepository;
private readonly projectsRepository: IProjectsRepository;
private readonly pubSubService: IPubSubService;
// Run polls aligned to minute marks at this offset (e.g., 2000 ms => :02 each minute)
private readonly minuteAlignmentOffsetMs: number = 2_000;
private workerId: string;
private logger: PrefixLogger;
private isRunning: boolean = false;
private pollTimeoutId: NodeJS.Timeout | null = null;
constructor({
scheduledJobRulesRepository,
recurringJobRulesRepository,
jobsRepository,
projectsRepository,
pubSubService,
}: {
scheduledJobRulesRepository: IScheduledJobRulesRepository;
recurringJobRulesRepository: IRecurringJobRulesRepository;
jobsRepository: IJobsRepository;
projectsRepository: IProjectsRepository;
pubSubService: IPubSubService;
}) {
this.scheduledJobRulesRepository = scheduledJobRulesRepository;
this.recurringJobRulesRepository = recurringJobRulesRepository;
this.jobsRepository = jobsRepository;
this.projectsRepository = projectsRepository;
this.pubSubService = pubSubService;
this.workerId = nanoid();
this.logger = new PrefixLogger(`scheduled-job-rules-worker-[${this.workerId}]`);
}
private async processScheduledRule(rule: z.infer<typeof ScheduledJobRule>): Promise<void> {
const logger = this.logger.child(`rule-${rule.id}`);
logger.log("Processing scheduled job rule");
try {
// create job
const job = await this.jobsRepository.create({
reason: {
type: "scheduled_job_rule",
ruleId: rule.id,
},
projectId: rule.projectId,
input: {
messages: rule.input.messages,
},
});
// notify job workers
await this.pubSubService.publish("new_jobs", job.id);
logger.log(`Created job ${job.id} from rule ${rule.id}`);
// update data
await this.scheduledJobRulesRepository.update(rule.id, {
output: {
jobId: job.id,
},
status: "triggered",
});
// release
await this.scheduledJobRulesRepository.release(rule.id);
logger.log(`Published job ${job.id} to new_jobs`);
} catch (error) {
logger.log(`Failed to process rule: ${error instanceof Error ? error.message : "Unknown error"}`);
// Always release the rule to avoid deadlocks but do not attach a jobId
try {
await this.scheduledJobRulesRepository.release(rule.id);
} catch (releaseError) {
logger.log(`Failed to release rule: ${releaseError instanceof Error ? releaseError.message : "Unknown error"}`);
}
}
}
private async processRecurringRule(rule: z.infer<typeof RecurringJobRule>): Promise<void> {
const logger = this.logger.child(`rule-${rule.id}`);
logger.log("Processing recurring job rule");
try {
// create job
const job = await this.jobsRepository.create({
reason: {
type: "recurring_job_rule",
ruleId: rule.id,
},
projectId: rule.projectId,
input: {
messages: rule.input.messages,
},
});
// notify job workers
await this.pubSubService.publish("new_jobs", job.id);
logger.log(`Created job ${job.id} from rule ${rule.id}`);
// release
await this.recurringJobRulesRepository.release(rule.id);
logger.log(`Published job ${job.id} to new_jobs`);
} catch (error) {
logger.log(`Failed to process rule: ${error instanceof Error ? error.message : "Unknown error"}`);
// Always release the rule to avoid deadlocks
try {
await this.recurringJobRulesRepository.release(rule.id);
} catch (releaseError) {
logger.log(`Failed to release rule: ${releaseError instanceof Error ? releaseError.message : "Unknown error"}`);
}
}
}
// Calculates delay so the next run happens at next minute + minuteAlignmentOffsetMs
private calculateDelayToNextAlignedMinute(): number {
const now = new Date();
const millisecondsUntilNextMinute = (60 - now.getSeconds()) * 1000 - now.getMilliseconds();
const delayMs = millisecondsUntilNextMinute + this.minuteAlignmentOffsetMs;
return delayMs > 0 ? delayMs : this.minuteAlignmentOffsetMs;
}
private async pollScheduled(): Promise<void> {
const logger = this.logger.child(`poll-scheduled`);
logger.log("Polling...");
let rule: z.infer<typeof ScheduledJobRule> | null = null;
try {
do {
rule = await this.scheduledJobRulesRepository.poll(this.workerId);
if (!rule) {
logger.log("No rules to process");
return;
}
await this.processScheduledRule(rule);
} while (rule);
} catch (error) {
logger.log(`Error while polling rules: ${error instanceof Error ? error.message : "Unknown error"}`);
}
}
private async pollRecurring(): Promise<void> {
const logger = this.logger.child(`poll-recurring`);
logger.log("Polling...");
let rule: z.infer<typeof RecurringJobRule> | null = null;
try {
do {
rule = await this.recurringJobRulesRepository.poll(this.workerId);
if (!rule) {
logger.log("No rules to process");
return;
}
await this.processRecurringRule(rule);
} while (rule);
} catch (error) {
logger.log(`Error while polling rules: ${error instanceof Error ? error.message : "Unknown error"}`);
}
}
private scheduleNextPoll(): void {
const delayMs = this.calculateDelayToNextAlignedMinute();
this.logger.log(`Scheduling next poll in ${delayMs} ms`);
this.pollTimeoutId = setTimeout(async () => {
if (!this.isRunning) return;
await Promise.all([
this.pollScheduled(),
this.pollRecurring(),
]);
this.scheduleNextPoll();
}, delayMs);
}
async run(): Promise<void> {
if (this.isRunning) {
this.logger.log("Worker already running");
return;
}
this.isRunning = true;
this.logger.log(`Starting worker ${this.workerId}`);
// No immediate polling; align to 2s past the next minute
this.scheduleNextPoll();
}
async stop(): Promise<void> {
this.logger.log(`Stopping worker ${this.workerId}`);
this.isRunning = false;
if (this.pollTimeoutId) {
clearTimeout(this.pollTimeoutId);
this.pollTimeoutId = null;
}
}
}

View file

@ -1,4 +1,5 @@
import { IJobsRepository } from "@/src/application/repositories/jobs.repository.interface";
import { IProjectsRepository } from "@/src/application/repositories/projects.repository.interface";
import { ICreateConversationUseCase } from "../use-cases/conversations/create-conversation.use-case";
import { IRunConversationTurnUseCase } from "../use-cases/conversations/run-conversation-turn.use-case";
import { Job } from "@/src/entities/models/job";
@ -15,6 +16,7 @@ export interface IJobsWorker {
export class JobsWorker implements IJobsWorker {
private readonly jobsRepository: IJobsRepository;
private readonly projectsRepository: IProjectsRepository;
private readonly createConversationUseCase: ICreateConversationUseCase;
private readonly runConversationTurnUseCase: IRunConversationTurnUseCase;
private readonly pubSubService: IPubSubService;
@ -27,16 +29,19 @@ export class JobsWorker implements IJobsWorker {
constructor({
jobsRepository,
projectsRepository,
createConversationUseCase,
runConversationTurnUseCase,
pubSubService,
}: {
jobsRepository: IJobsRepository;
projectsRepository: IProjectsRepository;
createConversationUseCase: ICreateConversationUseCase;
runConversationTurnUseCase: IRunConversationTurnUseCase;
pubSubService: IPubSubService;
}) {
this.jobsRepository = jobsRepository;
this.projectsRepository = projectsRepository;
this.createConversationUseCase = createConversationUseCase;
this.runConversationTurnUseCase = runConversationTurnUseCase;
this.pubSubService = pubSubService;
@ -52,6 +57,12 @@ export class JobsWorker implements IJobsWorker {
// extract project id from job
const { projectId } = job;
// fetch project
const project = await this.projectsRepository.fetch(projectId);
if (!project) {
throw new Error("Project not found");
}
// create conversation
logger.log('Creating conversation');
const conversation = await this.createConversationUseCase.execute({
@ -61,7 +72,6 @@ export class JobsWorker implements IJobsWorker {
type: "job",
jobId: job.id,
},
workflow: job.input.workflow,
isLiveWorkflow: true,
});
logger.log(`Created conversation ${conversation.id}`);

View file

@ -10,14 +10,27 @@ const composioTriggerReason = z.object({
payload: z.object({}).passthrough(),
});
const reason = composioTriggerReason;
const scheduledJobRuleReason = z.object({
type: z.literal("scheduled_job_rule"),
ruleId: z.string(),
});
const recurringJobRuleReason = z.object({
type: z.literal("recurring_job_rule"),
ruleId: z.string(),
});
const reason = z.discriminatedUnion("type", [
composioTriggerReason,
scheduledJobRuleReason,
recurringJobRuleReason,
]);
export const Job = z.object({
id: z.string(),
reason,
projectId: z.string(),
input: z.object({
workflow: Workflow,
messages: z.array(Message),
}),
output: z.object({

View file

@ -0,0 +1,19 @@
import { Message } from "@/app/lib/types/types";
import { z } from "zod";
export const RecurringJobRule = z.object({
id: z.string(),
projectId: z.string(),
input: z.object({
messages: z.array(Message),
}),
cron: z.string(), // a cron expression with at most minute-level resolution
nextRunAt: z.string().datetime(), // when is the next time this cron should run
workerId: z.string().nullable(), // set if currently locked by a worker
lastWorkerId: z.string().nullable(),
disabled: z.boolean(), // disabled rule - do not process
lastProcessedAt: z.string().datetime().optional(), // when was it last processed
lastError: z.string().optional(), // error msg if generated during last process
createdAt: z.string(),
updatedAt: z.string().optional(),
});

View file

@ -0,0 +1,21 @@
import { Message } from "@/app/lib/types/types";
import { z } from "zod";
export const ScheduledJobRule = z.object({
id: z.string(),
projectId: z.string(),
input: z.object({
messages: z.array(Message),
}),
nextRunAt: z.string().datetime(),
workerId: z.string().nullable(),
lastWorkerId: z.string().nullable(),
status: z.enum(["pending", "processing", "triggered"]),
output: z.object({
error: z.string().optional(),
jobId: z.string().optional(),
}).optional(),
processedAt: z.string().datetime().optional(),
createdAt: z.string(),
updatedAt: z.string().optional(),
});

View file

@ -1,7 +1,7 @@
import { z } from "zod";
import { ObjectId } from "mongodb";
import { db } from "@/app/lib/mongodb";
import { IJobsRepository, ListedJobItem } from "@/src/application/repositories/jobs.repository.interface";
import { CreateJobSchema, IJobsRepository, ListedJobItem, UpdateJobSchema } from "@/src/application/repositories/jobs.repository.interface";
import { Job } from "@/src/entities/models/job";
import { JobAcquisitionError } from "@/src/entities/errors/job-errors";
import { NotFoundError } from "@/src/entities/errors/common";
@ -15,23 +15,6 @@ const DocSchema = Job.omit({
id: true,
});
/**
* Schema for creating a new job.
*/
const createJobSchema = Job.pick({
reason: true,
projectId: true,
input: true,
});
/**
* Schema for updating an existing job.
*/
const updateJobSchema = Job.pick({
status: true,
output: true,
});
/**
* MongoDB implementation of the JobsRepository.
*
@ -44,7 +27,7 @@ export class MongoDBJobsRepository implements IJobsRepository {
/**
* Creates a new job in the system.
*/
async create(data: z.infer<typeof createJobSchema>): Promise<z.infer<typeof Job>> {
async create(data: z.infer<typeof CreateJobSchema>): Promise<z.infer<typeof Job>> {
const now = new Date().toISOString();
const _id = new ObjectId();
@ -163,7 +146,7 @@ export class MongoDBJobsRepository implements IJobsRepository {
/**
* Updates an existing job with new status and/or output data.
*/
async update(id: string, data: z.infer<typeof updateJobSchema>): Promise<z.infer<typeof Job>> {
async update(id: string, data: z.infer<typeof UpdateJobSchema>): Promise<z.infer<typeof Job>> {
const now = new Date().toISOString();
const result = await this.collection.findOneAndUpdate(

View file

@ -0,0 +1,241 @@
import { z } from "zod";
import { ObjectId } from "mongodb";
import { db } from "@/app/lib/mongodb";
import { CreateRecurringRuleSchema, IRecurringJobRulesRepository, ListedRecurringRuleItem } from "@/src/application/repositories/recurring-job-rules.repository.interface";
import { RecurringJobRule } from "@/src/entities/models/recurring-job-rule";
import { NotFoundError } from "@/src/entities/errors/common";
import { PaginatedList } from "@/src/entities/common/paginated-list";
import { CronExpressionParser } from 'cron-parser';
/**
* MongoDB document schema for RecurringJobRule.
* Excludes the 'id' field as it's represented by MongoDB's '_id'.
*/
const DocSchema = RecurringJobRule
.omit({
id: true,
nextRunAt: true,
lastProcessedAt: true,
})
.extend({
_id: z.instanceof(ObjectId),
nextRunAt: z.number(),
lastProcessedAt: z.number().optional(),
});
/**
* Schema for creating documents (without _id field).
*/
const CreateDocSchema = DocSchema.omit({ _id: true });
/**
* MongoDB implementation of the RecurringJobRulesRepository.
*
* This repository manages recurring job rules in MongoDB, providing operations for
* creating, fetching, polling, processing, and listing rules for worker processing.
*/
export class MongoDBRecurringJobRulesRepository implements IRecurringJobRulesRepository {
private readonly collection = db.collection<z.infer<typeof DocSchema>>("recurring_job_rules");
/**
* Converts a MongoDB document to a domain model.
* Handles the conversion of timestamps from Unix timestamps to ISO strings.
*/
private convertDocToModel(doc: z.infer<typeof DocSchema>): z.infer<typeof RecurringJobRule> {
const { _id, nextRunAt, lastProcessedAt, ...rest } = doc;
return {
...rest,
id: _id.toString(),
nextRunAt: new Date(nextRunAt * 1000).toISOString(),
lastProcessedAt: lastProcessedAt ? new Date(lastProcessedAt * 1000).toISOString() : undefined,
};
}
/**
* Creates a new recurring job rule in the system.
*/
async create(data: z.infer<typeof CreateRecurringRuleSchema>): Promise<z.infer<typeof RecurringJobRule>> {
const now = new Date().toISOString();
const _id = new ObjectId();
const doc: z.infer<typeof CreateDocSchema> = {
...data,
nextRunAt: 0,
disabled: false,
workerId: null,
lastWorkerId: null,
createdAt: now,
};
await this.collection.insertOne({
...doc,
_id,
});
// update next run and return
return await this.updateNextRunAt(_id.toString(), data.cron);
}
/**
* Fetches a recurring job rule by its unique identifier.
*/
async fetch(id: string): Promise<z.infer<typeof RecurringJobRule> | null> {
const result = await this.collection.findOne({ _id: new ObjectId(id) });
if (!result) {
return null;
}
return this.convertDocToModel(result);
}
/**
* Polls for the next available recurring job rule that can be processed by a worker.
* Returns a single rule that is ready to run, atomically locked for the worker.
*/
async poll(workerId: string): Promise<z.infer<typeof RecurringJobRule> | null> {
const now = new Date();
const notBefore = new Date(now.getTime() - 1000 * 60 * 3); // not older than 3 minutes
// Use findOneAndUpdate to atomically find and lock the next available rule
const result = await this.collection.findOneAndUpdate(
{
nextRunAt: {
$lte: Math.floor(now.getTime() / 1000),
$gte: Math.floor(notBefore.getTime() / 1000),
},
$or: [
{
lastProcessedAt: {
$lt: Math.floor(now.getTime() / 1000),
},
},
{ lastProcessedAt: { $exists: false } },
],
disabled: false,
workerId: null,
},
{
$set: {
workerId,
lastWorkerId: workerId,
lastProcessedAt: Math.floor(now.getTime() / 1000),
lastError: undefined,
updatedAt: now.toISOString(),
},
},
{
sort: { nextRunAt: 1 }, // Process earliest rules first
returnDocument: "after",
}
);
if (!result) {
return null;
}
return this.convertDocToModel(result);
}
/**
* Releases a recurring job rule after it has been executed
*/
async release(id: string): Promise<z.infer<typeof RecurringJobRule>> {
const now = new Date();
const result = await this.collection.findOneAndUpdate(
{
_id: new ObjectId(id),
},
{
$set: {
workerId: null, // Release the lock
updatedAt: now.toISOString(),
},
},
{
returnDocument: "after",
}
);
if (!result) {
throw new NotFoundError(`Recurring job rule ${id} not found`);
}
// update next run at
return await this.updateNextRunAt(id, result.cron);
}
/**
* Lists recurring job rules for a specific project with pagination.
*/
async list(projectId: string, cursor?: string, limit: number = 50): Promise<z.infer<ReturnType<typeof PaginatedList<typeof ListedRecurringRuleItem>>>> {
const query: any = { projectId };
if (cursor) {
query._id = { $lt: new ObjectId(cursor) };
}
const results = await this.collection
.find(query)
.sort({ _id: -1 })
.limit(limit + 1) // Fetch one extra to determine if there's a next page
.toArray();
const hasNextPage = results.length > limit;
const items = results.slice(0, limit).map(this.convertDocToModel);
return {
items,
nextCursor: hasNextPage ? results[limit - 1]._id.toString() : null,
};
}
/**
* Toggles a recurring job rule's disabled state
*/
async toggle(id: string, disabled: boolean): Promise<z.infer<typeof RecurringJobRule>> {
const result = await this.collection.findOneAndUpdate(
{ _id: new ObjectId(id) },
{ $set: { disabled, updatedAt: new Date().toISOString() } },
);
if (!result) {
throw new NotFoundError(`Recurring job rule ${id} not found`);
}
// update next run and return
return await this.updateNextRunAt(id, result.cron);
}
/**
* Deletes a recurring job rule by its unique identifier.
*/
async delete(id: string): Promise<boolean> {
const result = await this.collection.deleteOne({
_id: new ObjectId(id),
});
return result.deletedCount > 0;
}
async updateNextRunAt(id: string, cron: string): Promise<z.infer<typeof RecurringJobRule>> {
// parse cron to get next run time
const interval = CronExpressionParser.parse(cron, {
tz: "UTC",
});
const nextRunAt = Math.floor(interval.next().toDate().getTime() / 1000);
const result = await this.collection.findOneAndUpdate(
{ _id: new ObjectId(id) },
{ $set: { nextRunAt, updatedAt: new Date().toISOString() } },
{ returnDocument: "after" },
);
if (!result) {
throw new NotFoundError(`Recurring job rule ${id} not found`);
}
return this.convertDocToModel(result);
}
}

View file

@ -0,0 +1,221 @@
import { z } from "zod";
import { ObjectId } from "mongodb";
import { db } from "@/app/lib/mongodb";
import { CreateRuleSchema, IScheduledJobRulesRepository, ListedRuleItem, UpdateJobSchema } from "@/src/application/repositories/scheduled-job-rules.repository.interface";
import { ScheduledJobRule } from "@/src/entities/models/scheduled-job-rule";
import { NotFoundError } from "@/src/entities/errors/common";
import { PaginatedList } from "@/src/entities/common/paginated-list";
/**
* MongoDB document schema for ScheduledJobRule.
* Excludes the 'id' field as it's represented by MongoDB's '_id'.
*/
const DocSchema = ScheduledJobRule
.omit({
id: true,
nextRunAt: true,
processedAt: true,
})
.extend({
_id: z.instanceof(ObjectId),
nextRunAt: z.number(),
});
/**
* Schema for creating documents (without _id field).
*/
const CreateDocSchema = DocSchema.omit({ _id: true });
/**
* MongoDB implementation of the ScheduledJobRulesRepository.
*
* This repository manages scheduled job rules in MongoDB, providing operations for
* creating, fetching, polling, processing, and listing rules for worker processing.
*/
export class MongoDBScheduledJobRulesRepository implements IScheduledJobRulesRepository {
private readonly collection = db.collection<z.infer<typeof DocSchema>>("scheduled_job_rules");
/**
* Converts a MongoDB document to a domain model.
* Handles the conversion of nextRunAt and processedAt from Unix timestamps to ISO strings.
*/
private convertDocToModel(doc: z.infer<typeof DocSchema>): z.infer<typeof ScheduledJobRule> {
const { _id, nextRunAt, ...rest } = doc;
return {
...rest,
id: _id.toString(),
nextRunAt: new Date(nextRunAt * 1000).toISOString(),
};
}
/**
* Creates a new scheduled job rule in the system.
*/
async create(data: z.infer<typeof CreateRuleSchema>): Promise<z.infer<typeof ScheduledJobRule>> {
const now = new Date().toISOString();
const _id = new ObjectId();
const { scheduledTime, ...rest } = data;
// convert date string to seconds since epoch
// and round down to the last minute
const nextRunAtDate = new Date(scheduledTime);
const nextRunAtSeconds = Math.floor(nextRunAtDate.getTime() / 1000);
const nextRunAtMinutes = Math.floor(nextRunAtSeconds / 60) * 60;
const nextRunAt = nextRunAtMinutes;
const doc: z.infer<typeof CreateDocSchema> = {
...rest,
nextRunAt: nextRunAt,
status: "pending",
workerId: null,
lastWorkerId: null,
createdAt: now,
};
await this.collection.insertOne({
...doc,
_id,
});
return {
...doc,
nextRunAt: new Date(nextRunAt * 1000).toISOString(),
id: _id.toString(),
};
}
/**
* Fetches a scheduled job rule by its unique identifier.
*/
async fetch(id: string): Promise<z.infer<typeof ScheduledJobRule> | null> {
const result = await this.collection.findOne({ _id: new ObjectId(id) });
if (!result) {
return null;
}
return this.convertDocToModel(result);
}
/**
* Polls for the next available scheduled job rule that can be processed by a worker.
* Returns a single rule that is ready to run, atomically locked for the worker.
*/
async poll(workerId: string): Promise<z.infer<typeof ScheduledJobRule> | null> {
const now = new Date();
const notBefore = new Date(now.getTime() - 1000 * 60 * 3); // not older than 3 minutes
// Use findOneAndUpdate to atomically find and lock the next available rule
const result = await this.collection.findOneAndUpdate(
{
nextRunAt: {
$lte: Math.floor(now.getTime() / 1000),
$gte: Math.floor(notBefore.getTime() / 1000),
},
status: "pending",
workerId: null,
},
{
$set: {
workerId,
status: "processing",
lastWorkerId: workerId,
processedAt: now.toISOString(),
updatedAt: now.toISOString(),
},
},
{
sort: { nextRunAt: 1 }, // Process earliest rules first
returnDocument: "after",
}
);
if (!result) {
return null;
}
return this.convertDocToModel(result);
}
/**
* Updates a scheduled job rule with new status and output data.
*/
async update(id: string, data: z.infer<typeof UpdateJobSchema>): Promise<z.infer<typeof ScheduledJobRule>> {
const now = new Date();
const result = await this.collection.findOneAndUpdate(
{ _id: new ObjectId(id) },
{ $set: { ...data, updatedAt: now.toISOString() } },
);
if (!result) {
throw new NotFoundError(`Scheduled job rule ${id} not found`);
}
return this.convertDocToModel(result);
}
/**
* Processes and releases a scheduled job rule after it has been executed.
*/
async release(id: string): Promise<z.infer<typeof ScheduledJobRule>> {
const now = new Date();
const result = await this.collection.findOneAndUpdate(
{
_id: new ObjectId(id),
},
{
$set: {
workerId: null, // Release the lock
updatedAt: now.toISOString(),
},
},
{
returnDocument: "after",
}
);
if (!result) {
throw new NotFoundError(`Scheduled job rule ${id} not found`);
}
return this.convertDocToModel(result);
}
/**
* Lists scheduled job rules for a specific project with pagination.
*/
async list(projectId: string, cursor?: string, limit: number = 50): Promise<z.infer<ReturnType<typeof PaginatedList<typeof ListedRuleItem>>>> {
const query: any = { projectId };
if (cursor) {
query._id = { $lt: new ObjectId(cursor) };
}
const results = await this.collection
.find(query)
.sort({ _id: -1 })
.limit(limit + 1) // Fetch one extra to determine if there's a next page
.toArray();
const hasNextPage = results.length > limit;
const items = results.slice(0, limit).map(this.convertDocToModel);
return {
items,
nextCursor: hasNextPage ? results[limit - 1]._id.toString() : null,
};
}
/**
* Deletes a scheduled job rule by its unique identifier.
*/
async delete(id: string): Promise<boolean> {
const result = await this.collection.deleteOne({
_id: new ObjectId(id),
});
return result.deletedCount > 0;
}
}

View file

@ -0,0 +1,50 @@
import { BadRequestError } from "@/src/entities/errors/common";
import z from "zod";
import { ICreateRecurringJobRuleUseCase } from "@/src/application/use-cases/recurring-job-rules/create-recurring-job-rule.use-case";
import { RecurringJobRule } from "@/src/entities/models/recurring-job-rule";
const inputSchema = z.object({
caller: z.enum(["user", "api"]),
userId: z.string().optional(),
apiKey: z.string().optional(),
projectId: z.string(),
input: z.object({
messages: z.array(z.any()),
}),
cron: z.string(),
});
export interface ICreateRecurringJobRuleController {
execute(request: z.infer<typeof inputSchema>): Promise<z.infer<typeof RecurringJobRule>>;
}
export class CreateRecurringJobRuleController implements ICreateRecurringJobRuleController {
private readonly createRecurringJobRuleUseCase: ICreateRecurringJobRuleUseCase;
constructor({
createRecurringJobRuleUseCase,
}: {
createRecurringJobRuleUseCase: ICreateRecurringJobRuleUseCase,
}) {
this.createRecurringJobRuleUseCase = createRecurringJobRuleUseCase;
}
async execute(request: z.infer<typeof inputSchema>): Promise<z.infer<typeof RecurringJobRule>> {
// parse input
const result = inputSchema.safeParse(request);
if (!result.success) {
throw new BadRequestError(`Invalid request: ${JSON.stringify(result.error)}`);
}
const { caller, userId, apiKey, projectId, input, cron } = result.data;
// execute use case
return await this.createRecurringJobRuleUseCase.execute({
caller,
userId,
apiKey,
projectId,
input,
cron,
});
}
}

View file

@ -0,0 +1,45 @@
import { BadRequestError } from "@/src/entities/errors/common";
import z from "zod";
import { IDeleteRecurringJobRuleUseCase } from "@/src/application/use-cases/recurring-job-rules/delete-recurring-job-rule.use-case";
const inputSchema = z.object({
caller: z.enum(["user", "api"]),
userId: z.string().optional(),
apiKey: z.string().optional(),
projectId: z.string(),
ruleId: z.string(),
});
export interface IDeleteRecurringJobRuleController {
execute(request: z.infer<typeof inputSchema>): Promise<boolean>;
}
export class DeleteRecurringJobRuleController implements IDeleteRecurringJobRuleController {
private readonly deleteRecurringJobRuleUseCase: IDeleteRecurringJobRuleUseCase;
constructor({
deleteRecurringJobRuleUseCase,
}: {
deleteRecurringJobRuleUseCase: IDeleteRecurringJobRuleUseCase,
}) {
this.deleteRecurringJobRuleUseCase = deleteRecurringJobRuleUseCase;
}
async execute(request: z.infer<typeof inputSchema>): Promise<boolean> {
// parse input
const result = inputSchema.safeParse(request);
if (!result.success) {
throw new BadRequestError(`Invalid request: ${JSON.stringify(result.error)}`);
}
const { caller, userId, apiKey, projectId, ruleId } = result.data;
// execute use case
return await this.deleteRecurringJobRuleUseCase.execute({
caller,
userId,
apiKey,
projectId,
ruleId,
});
}
}

View file

@ -0,0 +1,44 @@
import { BadRequestError } from "@/src/entities/errors/common";
import z from "zod";
import { IFetchRecurringJobRuleUseCase } from "@/src/application/use-cases/recurring-job-rules/fetch-recurring-job-rule.use-case";
import { RecurringJobRule } from "@/src/entities/models/recurring-job-rule";
const inputSchema = z.object({
caller: z.enum(["user", "api"]),
userId: z.string().optional(),
apiKey: z.string().optional(),
ruleId: z.string(),
});
export interface IFetchRecurringJobRuleController {
execute(request: z.infer<typeof inputSchema>): Promise<z.infer<typeof RecurringJobRule>>;
}
export class FetchRecurringJobRuleController implements IFetchRecurringJobRuleController {
private readonly fetchRecurringJobRuleUseCase: IFetchRecurringJobRuleUseCase;
constructor({
fetchRecurringJobRuleUseCase,
}: {
fetchRecurringJobRuleUseCase: IFetchRecurringJobRuleUseCase,
}) {
this.fetchRecurringJobRuleUseCase = fetchRecurringJobRuleUseCase;
}
async execute(request: z.infer<typeof inputSchema>): Promise<z.infer<typeof RecurringJobRule>> {
// parse input
const result = inputSchema.safeParse(request);
if (!result.success) {
throw new BadRequestError(`Invalid request: ${JSON.stringify(result.error)}`);
}
const { caller, userId, apiKey, ruleId } = result.data;
// execute use case
return await this.fetchRecurringJobRuleUseCase.execute({
caller,
userId,
apiKey,
ruleId,
});
}
}

View file

@ -0,0 +1,49 @@
import { BadRequestError } from "@/src/entities/errors/common";
import z from "zod";
import { IListRecurringJobRulesUseCase } from "@/src/application/use-cases/recurring-job-rules/list-recurring-job-rules.use-case";
import { PaginatedList } from "@/src/entities/common/paginated-list";
import { ListedRecurringRuleItem } from "@/src/application/repositories/recurring-job-rules.repository.interface";
const inputSchema = z.object({
caller: z.enum(["user", "api"]),
userId: z.string().optional(),
apiKey: z.string().optional(),
projectId: z.string(),
cursor: z.string().optional(),
limit: z.number().optional(),
});
export interface IListRecurringJobRulesController {
execute(request: z.infer<typeof inputSchema>): Promise<z.infer<ReturnType<typeof PaginatedList<typeof ListedRecurringRuleItem>>>>;
}
export class ListRecurringJobRulesController implements IListRecurringJobRulesController {
private readonly listRecurringJobRulesUseCase: IListRecurringJobRulesUseCase;
constructor({
listRecurringJobRulesUseCase,
}: {
listRecurringJobRulesUseCase: IListRecurringJobRulesUseCase,
}) {
this.listRecurringJobRulesUseCase = listRecurringJobRulesUseCase;
}
async execute(request: z.infer<typeof inputSchema>): Promise<z.infer<ReturnType<typeof PaginatedList<typeof ListedRecurringRuleItem>>>> {
// parse input
const result = inputSchema.safeParse(request);
if (!result.success) {
throw new BadRequestError(`Invalid request: ${JSON.stringify(result.error)}`);
}
const { caller, userId, apiKey, projectId, cursor, limit } = result.data;
// execute use case
return await this.listRecurringJobRulesUseCase.execute({
caller,
userId,
apiKey,
projectId,
cursor,
limit,
});
}
}

View file

@ -0,0 +1,46 @@
import { BadRequestError } from "@/src/entities/errors/common";
import z from "zod";
import { IToggleRecurringJobRuleUseCase } from "@/src/application/use-cases/recurring-job-rules/toggle-recurring-job-rule.use-case";
import { RecurringJobRule } from "@/src/entities/models/recurring-job-rule";
const inputSchema = z.object({
caller: z.enum(["user", "api"]),
userId: z.string().optional(),
apiKey: z.string().optional(),
ruleId: z.string(),
disabled: z.boolean(),
});
export interface IToggleRecurringJobRuleController {
execute(request: z.infer<typeof inputSchema>): Promise<z.infer<typeof RecurringJobRule>>;
}
export class ToggleRecurringJobRuleController implements IToggleRecurringJobRuleController {
private readonly toggleRecurringJobRuleUseCase: IToggleRecurringJobRuleUseCase;
constructor({
toggleRecurringJobRuleUseCase,
}: {
toggleRecurringJobRuleUseCase: IToggleRecurringJobRuleUseCase,
}) {
this.toggleRecurringJobRuleUseCase = toggleRecurringJobRuleUseCase;
}
async execute(request: z.infer<typeof inputSchema>): Promise<z.infer<typeof RecurringJobRule>> {
// parse input
const result = inputSchema.safeParse(request);
if (!result.success) {
throw new BadRequestError(`Invalid request: ${JSON.stringify(result.error)}`);
}
const { caller, userId, apiKey, ruleId, disabled } = result.data;
// execute use case
return await this.toggleRecurringJobRuleUseCase.execute({
caller,
userId,
apiKey,
ruleId,
disabled,
});
}
}

View file

@ -0,0 +1,51 @@
import { BadRequestError } from "@/src/entities/errors/common";
import z from "zod";
import { ICreateScheduledJobRuleUseCase } from "@/src/application/use-cases/scheduled-job-rules/create-scheduled-job-rule.use-case";
import { ScheduledJobRule } from "@/src/entities/models/scheduled-job-rule";
import { Message } from "@/app/lib/types/types";
const inputSchema = z.object({
caller: z.enum(["user", "api"]),
userId: z.string().optional(),
apiKey: z.string().optional(),
projectId: z.string(),
input: z.object({
messages: z.array(Message),
}),
scheduledTime: z.string().datetime(),
});
export interface ICreateScheduledJobRuleController {
execute(request: z.infer<typeof inputSchema>): Promise<z.infer<typeof ScheduledJobRule>>;
}
export class CreateScheduledJobRuleController implements ICreateScheduledJobRuleController {
private readonly createScheduledJobRuleUseCase: ICreateScheduledJobRuleUseCase;
constructor({
createScheduledJobRuleUseCase,
}: {
createScheduledJobRuleUseCase: ICreateScheduledJobRuleUseCase,
}) {
this.createScheduledJobRuleUseCase = createScheduledJobRuleUseCase;
}
async execute(request: z.infer<typeof inputSchema>): Promise<z.infer<typeof ScheduledJobRule>> {
// parse input
const result = inputSchema.safeParse(request);
if (!result.success) {
throw new BadRequestError(`Invalid request: ${JSON.stringify(result.error)}`);
}
const { caller, userId, apiKey, projectId, input, scheduledTime } = result.data;
// execute use case
return await this.createScheduledJobRuleUseCase.execute({
caller,
userId,
apiKey,
projectId,
input,
scheduledTime,
});
}
}

View file

@ -0,0 +1,45 @@
import { BadRequestError } from "@/src/entities/errors/common";
import z from "zod";
import { IDeleteScheduledJobRuleUseCase } from "@/src/application/use-cases/scheduled-job-rules/delete-scheduled-job-rule.use-case";
const inputSchema = z.object({
caller: z.enum(["user", "api"]),
userId: z.string().optional(),
apiKey: z.string().optional(),
projectId: z.string(),
ruleId: z.string(),
});
export interface IDeleteScheduledJobRuleController {
execute(request: z.infer<typeof inputSchema>): Promise<boolean>;
}
export class DeleteScheduledJobRuleController implements IDeleteScheduledJobRuleController {
private readonly deleteScheduledJobRuleUseCase: IDeleteScheduledJobRuleUseCase;
constructor({
deleteScheduledJobRuleUseCase,
}: {
deleteScheduledJobRuleUseCase: IDeleteScheduledJobRuleUseCase,
}) {
this.deleteScheduledJobRuleUseCase = deleteScheduledJobRuleUseCase;
}
async execute(request: z.infer<typeof inputSchema>): Promise<boolean> {
// parse input
const result = inputSchema.safeParse(request);
if (!result.success) {
throw new BadRequestError(`Invalid request: ${JSON.stringify(result.error)}`);
}
const { caller, userId, apiKey, projectId, ruleId } = result.data;
// execute use case
return await this.deleteScheduledJobRuleUseCase.execute({
caller,
userId,
apiKey,
projectId,
ruleId,
});
}
}

View file

@ -0,0 +1,44 @@
import { BadRequestError } from "@/src/entities/errors/common";
import z from "zod";
import { IFetchScheduledJobRuleUseCase } from "@/src/application/use-cases/scheduled-job-rules/fetch-scheduled-job-rule.use-case";
import { ScheduledJobRule } from "@/src/entities/models/scheduled-job-rule";
const inputSchema = z.object({
caller: z.enum(["user", "api"]),
userId: z.string().optional(),
apiKey: z.string().optional(),
ruleId: z.string(),
});
export interface IFetchScheduledJobRuleController {
execute(request: z.infer<typeof inputSchema>): Promise<z.infer<typeof ScheduledJobRule>>;
}
export class FetchScheduledJobRuleController implements IFetchScheduledJobRuleController {
private readonly fetchScheduledJobRuleUseCase: IFetchScheduledJobRuleUseCase;
constructor({
fetchScheduledJobRuleUseCase,
}: {
fetchScheduledJobRuleUseCase: IFetchScheduledJobRuleUseCase,
}) {
this.fetchScheduledJobRuleUseCase = fetchScheduledJobRuleUseCase;
}
async execute(request: z.infer<typeof inputSchema>): Promise<z.infer<typeof ScheduledJobRule>> {
// parse input
const result = inputSchema.safeParse(request);
if (!result.success) {
throw new BadRequestError(`Invalid request: ${JSON.stringify(result.error)}`);
}
const { caller, userId, apiKey, ruleId } = result.data;
// execute use case
return await this.fetchScheduledJobRuleUseCase.execute({
caller,
userId,
apiKey,
ruleId,
});
}
}

View file

@ -0,0 +1,50 @@
import { BadRequestError } from "@/src/entities/errors/common";
import z from "zod";
import { IListScheduledJobRulesUseCase } from "@/src/application/use-cases/scheduled-job-rules/list-scheduled-job-rules.use-case";
import { ScheduledJobRule } from "@/src/entities/models/scheduled-job-rule";
import { PaginatedList } from "@/src/entities/common/paginated-list";
import { ListedRuleItem } from "@/src/application/repositories/scheduled-job-rules.repository.interface";
const inputSchema = z.object({
caller: z.enum(["user", "api"]),
userId: z.string().optional(),
apiKey: z.string().optional(),
projectId: z.string(),
cursor: z.string().optional(),
limit: z.number().optional(),
});
export interface IListScheduledJobRulesController {
execute(request: z.infer<typeof inputSchema>): Promise<z.infer<ReturnType<typeof PaginatedList<typeof ListedRuleItem>>>>;
}
export class ListScheduledJobRulesController implements IListScheduledJobRulesController {
private readonly listScheduledJobRulesUseCase: IListScheduledJobRulesUseCase;
constructor({
listScheduledJobRulesUseCase,
}: {
listScheduledJobRulesUseCase: IListScheduledJobRulesUseCase,
}) {
this.listScheduledJobRulesUseCase = listScheduledJobRulesUseCase;
}
async execute(request: z.infer<typeof inputSchema>): Promise<z.infer<ReturnType<typeof PaginatedList<typeof ListedRuleItem>>>> {
// parse input
const result = inputSchema.safeParse(request);
if (!result.success) {
throw new BadRequestError(`Invalid request: ${JSON.stringify(result.error)}`);
}
const { caller, userId, apiKey, projectId, cursor, limit } = result.data;
// execute use case
return await this.listScheduledJobRulesUseCase.execute({
caller,
userId,
apiKey,
projectId,
cursor,
limit,
});
}
}

View file

@ -241,6 +241,16 @@ services:
- COMPOSIO_API_KEY=${COMPOSIO_API_KEY}
restart: unless-stopped
job-rules-worker:
build:
context: ./apps/rowboat
dockerfile: scripts.Dockerfile
command: ["npm", "run", "job-rules-worker"]
environment:
- MONGODB_CONNECTION_STRING=mongodb://mongo:27017/rowboat
- REDIS_URL=redis://redis:6379
restart: unless-stopped
# chat_widget:
# build:
# context: ./apps/experimental/chat_widget