diff --git a/apps/rowboat/app/actions/recurring-job-rules.actions.ts b/apps/rowboat/app/actions/recurring-job-rules.actions.ts index d64264d8..b807aa3b 100644 --- a/apps/rowboat/app/actions/recurring-job-rules.actions.ts +++ b/apps/rowboat/app/actions/recurring-job-rules.actions.ts @@ -6,6 +6,7 @@ import { IListRecurringJobRulesController } from "@/src/interface-adapters/contr import { IFetchRecurringJobRuleController } from "@/src/interface-adapters/controllers/recurring-job-rules/fetch-recurring-job-rule.controller"; import { IToggleRecurringJobRuleController } from "@/src/interface-adapters/controllers/recurring-job-rules/toggle-recurring-job-rule.controller"; import { IDeleteRecurringJobRuleController } from "@/src/interface-adapters/controllers/recurring-job-rules/delete-recurring-job-rule.controller"; +import { IUpdateRecurringJobRuleController } from "@/src/interface-adapters/controllers/recurring-job-rules/update-recurring-job-rule.controller"; import { authCheck } from "./auth.actions"; import { z } from "zod"; import { Message } from "@/app/lib/types/types"; @@ -15,6 +16,7 @@ const listRecurringJobRulesController = container.resolve('fetchRecurringJobRuleController'); const toggleRecurringJobRuleController = container.resolve('toggleRecurringJobRuleController'); const deleteRecurringJobRuleController = container.resolve('deleteRecurringJobRuleController'); +const updateRecurringJobRuleController = container.resolve('updateRecurringJobRuleController'); export async function createRecurringJobRule(request: { projectId: string, @@ -89,3 +91,23 @@ export async function deleteRecurringJobRule(request: { ruleId: request.ruleId, }); } + +export async function updateRecurringJobRule(request: { + projectId: string, + ruleId: string, + input: { + messages: z.infer[], + }, + cron: string, +}) { + const user = await authCheck(); + + return await updateRecurringJobRuleController.execute({ + caller: 'user', + userId: user.id, + projectId: request.projectId, + ruleId: request.ruleId, + input: request.input, + cron: request.cron, + }); +} diff --git a/apps/rowboat/app/actions/scheduled-job-rules.actions.ts b/apps/rowboat/app/actions/scheduled-job-rules.actions.ts index 640c4e08..144475a0 100644 --- a/apps/rowboat/app/actions/scheduled-job-rules.actions.ts +++ b/apps/rowboat/app/actions/scheduled-job-rules.actions.ts @@ -5,6 +5,7 @@ import { ICreateScheduledJobRuleController } from "@/src/interface-adapters/cont import { IListScheduledJobRulesController } from "@/src/interface-adapters/controllers/scheduled-job-rules/list-scheduled-job-rules.controller"; import { IFetchScheduledJobRuleController } from "@/src/interface-adapters/controllers/scheduled-job-rules/fetch-scheduled-job-rule.controller"; import { IDeleteScheduledJobRuleController } from "@/src/interface-adapters/controllers/scheduled-job-rules/delete-scheduled-job-rule.controller"; +import { IUpdateScheduledJobRuleController } from "@/src/interface-adapters/controllers/scheduled-job-rules/update-scheduled-job-rule.controller"; import { authCheck } from "./auth.actions"; import { z } from "zod"; import { Message } from "@/app/lib/types/types"; @@ -13,6 +14,7 @@ const createScheduledJobRuleController = container.resolve('listScheduledJobRulesController'); const fetchScheduledJobRuleController = container.resolve('fetchScheduledJobRuleController'); const deleteScheduledJobRuleController = container.resolve('deleteScheduledJobRuleController'); +const updateScheduledJobRuleController = container.resolve('updateScheduledJobRuleController'); export async function createScheduledJobRule(request: { projectId: string, @@ -72,4 +74,24 @@ export async function deleteScheduledJobRule(request: { projectId: request.projectId, ruleId: request.ruleId, }); -} \ No newline at end of file +} + +export async function updateScheduledJobRule(request: { + projectId: string, + ruleId: string, + input: { + messages: z.infer[], + }, + scheduledTime: string, +}) { + const user = await authCheck(); + + return await updateScheduledJobRuleController.execute({ + caller: 'user', + userId: user.id, + projectId: request.projectId, + ruleId: request.ruleId, + input: request.input, + scheduledTime: request.scheduledTime, + }); +} diff --git a/apps/rowboat/app/projects/[projectId]/copilot/components/messages.tsx b/apps/rowboat/app/projects/[projectId]/copilot/components/messages.tsx index 7a5822c3..2ba399db 100644 --- a/apps/rowboat/app/projects/[projectId]/copilot/components/messages.tsx +++ b/apps/rowboat/app/projects/[projectId]/copilot/components/messages.tsx @@ -222,10 +222,12 @@ function AssistantMessage({ // Remove autoApplyEnabled and useEffect for auto-apply const triggersRef = useRef(triggers); + const pendingTriggerEditsRef = useRef>(new Map()); const triggerUpdateCallbackRef = useRef(onTriggersUpdated); useEffect(() => { triggersRef.current = triggers; + pendingTriggerEditsRef.current.clear(); }, [triggers]); useEffect(() => { @@ -378,14 +380,46 @@ function AssistantMessage({ return false; }, [dispatch, workflow.agents, workflow.tools]); - const handleTriggerAction = useCallback(async (action: any): Promise => { + const handleTriggerAction = useCallback(async (action: any, actionIndex?: number): Promise => { const configType = action.config_type; const actionType = action.action; const triggerList = triggersRef.current ?? []; + const key = `${configType}:${action.name}`; + + const hasUpcomingReplacement = () => parsed.some((part, idx) => + idx > (actionIndex ?? -1) && + part.type === 'action' && + part.action.config_type === configType && + part.action.name === action.name && + part.action.action === 'create_new' + ); try { if (configType === 'one_time_trigger') { if (actionType === 'create_new') { + const pending = pendingTriggerEditsRef.current.get(key); + + if (pending && pending.type === 'one_time') { + const scheduledTime = action.config_changes?.scheduledTime ?? pending.nextRunAt; + const input = action.config_changes?.input ?? pending.input; + + if (!scheduledTime || !input) { + console.error('Missing data for one-time trigger update via replacement', action); + return false; + } + + const { updateScheduledJobRule } = await loadScheduledJobActions(); + await updateScheduledJobRule({ + projectId, + ruleId: pending.id, + scheduledTime, + input, + }); + + pendingTriggerEditsRef.current.delete(key); + return true; + } + const { scheduledTime, input } = action.config_changes || {}; if (!scheduledTime || !input) { console.error('Missing scheduledTime or input for one-time trigger', action); @@ -410,9 +444,20 @@ function AssistantMessage({ return false; } - const { fetchScheduledJobRule, deleteScheduledJobRule, createScheduledJobRule } = await loadScheduledJobActions(); + const { + fetchScheduledJobRule, + deleteScheduledJobRule, + updateScheduledJobRule, + } = await loadScheduledJobActions(); if (actionType === 'delete') { + if (hasUpcomingReplacement()) { + pendingTriggerEditsRef.current.set(key, target); + return true; + } + + pendingTriggerEditsRef.current.delete(key); + await deleteScheduledJobRule({ projectId, ruleId: target.id }); return true; } @@ -432,27 +477,63 @@ function AssistantMessage({ return false; } - const created = await createScheduledJobRule({ + await updateScheduledJobRule({ projectId, + ruleId: target.id, scheduledTime, input, }); - // Remove the previous rule only after successfully creating the updated one - await deleteScheduledJobRule({ projectId, ruleId: target.id }); - - return Boolean(created?.id); + return true; } } if (configType === 'recurring_trigger') { if (actionType === 'create_new') { + const pending = pendingTriggerEditsRef.current.get(key); + + const { + createRecurringJobRule, + updateRecurringJobRule, + toggleRecurringJobRule, + } = await loadRecurringJobActions(); + + if (pending && pending.type === 'recurring') { + const cron = action.config_changes?.cron ?? pending.cron; + const input = action.config_changes?.input ?? pending.input; + + if (!cron || !input) { + console.error('Missing data for recurring trigger update via replacement', action); + return false; + } + + const updatedRule = await updateRecurringJobRule({ + projectId, + ruleId: pending.id, + cron, + input, + }); + + const hasDisabledToggle = Object.prototype.hasOwnProperty.call(action.config_changes ?? {}, 'disabled'); + if (hasDisabledToggle) { + const desiredDisabled = typeof action.config_changes?.disabled === 'boolean' + ? action.config_changes.disabled + : pending.disabled; + if (typeof desiredDisabled === 'boolean' && desiredDisabled !== pending.disabled) { + await toggleRecurringJobRule({ ruleId: pending.id, disabled: desiredDisabled }); + } + } + + pendingTriggerEditsRef.current.delete(key); + return Boolean(updatedRule?.id); + } + const { cron, input } = action.config_changes || {}; if (!cron || !input) { console.error('Missing cron or input for recurring trigger', action); return false; } - const { createRecurringJobRule } = await loadRecurringJobActions(); + await createRecurringJobRule({ projectId, cron, @@ -474,11 +555,18 @@ function AssistantMessage({ const { fetchRecurringJobRule, deleteRecurringJobRule, - createRecurringJobRule, toggleRecurringJobRule, + updateRecurringJobRule, } = await loadRecurringJobActions(); if (actionType === 'delete') { + if (hasUpcomingReplacement()) { + pendingTriggerEditsRef.current.set(key, target); + return true; + } + + pendingTriggerEditsRef.current.delete(key); + await deleteRecurringJobRule({ projectId, ruleId: target.id }); return true; } @@ -513,16 +601,15 @@ function AssistantMessage({ return false; } - const created = await createRecurringJobRule({ + const updatedRule = await updateRecurringJobRule({ projectId, + ruleId: target.id, cron, input, }); - await deleteRecurringJobRule({ projectId, ruleId: target.id }); - - if (desiredDisabled !== created.disabled) { - await toggleRecurringJobRule({ ruleId: created.id, disabled: desiredDisabled }); + if (hasDisabledToggle && desiredDisabled !== updatedRule.disabled) { + await toggleRecurringJobRule({ ruleId: target.id, disabled: desiredDisabled }); } return true; @@ -559,7 +646,7 @@ function AssistantMessage({ console.warn('Unhandled trigger action from Copilot applyAction', action); return false; - }, [projectId]); + }, [projectId, parsed]); const refreshTriggers = useCallback(async () => { const callback = triggerUpdateCallbackRef.current; @@ -589,7 +676,7 @@ function AssistantMessage({ try { const isTrigger = action.config_type === 'one_time_trigger' || action.config_type === 'recurring_trigger' || action.config_type === 'external_trigger'; const success = isTrigger - ? await handleTriggerAction(action) + ? await handleTriggerAction(action, actionIndex) : applyAction(action); if (success) { @@ -625,7 +712,7 @@ function AssistantMessage({ try { const isTrigger = action.config_type === 'one_time_trigger' || action.config_type === 'recurring_trigger' || action.config_type === 'external_trigger'; const success = isTrigger - ? await handleTriggerAction(action) + ? await handleTriggerAction(action, actionIndex) : applyAction(action); if (success) { diff --git a/apps/rowboat/app/projects/[projectId]/manage-triggers/components/create-recurring-job-rule-form.tsx b/apps/rowboat/app/projects/[projectId]/manage-triggers/components/create-recurring-job-rule-form.tsx index 5c0698c2..b059b820 100644 --- a/apps/rowboat/app/projects/[projectId]/manage-triggers/components/create-recurring-job-rule-form.tsx +++ b/apps/rowboat/app/projects/[projectId]/manage-triggers/components/create-recurring-job-rule-form.tsx @@ -1,12 +1,15 @@ 'use client'; -import { useState } from "react"; +import { useEffect, useMemo, useState } from "react"; import { useRouter } from "next/navigation"; import { Button } from "@/components/ui/button"; import { Panel } from "@/components/common/panel-common"; -import { createRecurringJobRule } from "@/app/actions/recurring-job-rules.actions"; +import { createRecurringJobRule, updateRecurringJobRule } from "@/app/actions/recurring-job-rules.actions"; import { ArrowLeftIcon, PlusIcon, TrashIcon, InfoIcon } from "lucide-react"; import Link from "next/link"; +import { z } from "zod"; +import { Message } from "@/app/lib/types/types"; +import { RecurringJobRule } from "@/src/entities/models/recurring-job-rule"; // Define a simpler message type for the form that only includes the fields we need type FormMessage = { @@ -14,6 +17,29 @@ type FormMessage = { content: string; }; +type BackButtonConfig = + | { label: string; onClick: () => void } + | { label: string; href: string }; + +type FormSubmitPayload = { + messages: FormMessage[]; + cron: string; +}; + +type RecurringJobRuleFormBaseProps = { + title: string; + description?: string; + submitLabel: string; + submittingLabel: string; + errorMessage: string; + backButton?: BackButtonConfig; + initialCron?: string; + initialMessages?: FormMessage[]; + onSubmit: (payload: FormSubmitPayload) => Promise; + onSuccess?: (result: unknown) => void; + successHref?: string; +}; + const commonCronExamples = [ { label: "Every minute", value: "* * * * *" }, { label: "Every 5 minutes", value: "*/5 * * * *" }, @@ -25,86 +51,112 @@ const commonCronExamples = [ { label: "Monthly on the 1st at midnight", value: "0 0 1 * *" }, ]; -export function CreateRecurringJobRuleForm({ - projectId, - onBack, - hasExistingTriggers = true -}: { - projectId: string; - onBack?: () => void; - hasExistingTriggers?: boolean; -}) { +const createEmptyMessage = (): FormMessage => ({ role: "user", content: "" }); + +const normaliseMessages = (messages?: FormMessage[]): FormMessage[] => { + if (!messages || messages.length === 0) { + return [createEmptyMessage()]; + } + + return messages.map((message) => ({ ...message })); +}; + +const convertFormMessagesToMessages = (messages: FormMessage[]): z.infer[] => { + return messages.map((msg) => { + if (msg.role === "assistant") { + return { + role: msg.role, + content: msg.content, + agentName: null, + responseType: "internal" as const, + timestamp: undefined, + }; + } + + return { + role: msg.role, + content: msg.content, + timestamp: undefined, + }; + }); +}; + +function RecurringJobRuleFormBase({ + title, + description, + submitLabel, + submittingLabel, + errorMessage, + backButton, + initialCron, + initialMessages, + onSubmit, + onSuccess, + successHref, +}: RecurringJobRuleFormBaseProps) { const router = useRouter(); + const [messages, setMessages] = useState(normaliseMessages(initialMessages)); + const [cronExpression, setCronExpression] = useState(initialCron ?? "* * * * *"); const [loading, setLoading] = useState(false); - const [messages, setMessages] = useState([ - { role: "user", content: "" } - ]); - const [cronExpression, setCronExpression] = useState("* * * * *"); const [showCronHelp, setShowCronHelp] = useState(false); + useEffect(() => { + setMessages(normaliseMessages(initialMessages)); + }, [initialMessages]); + + useEffect(() => { + setCronExpression(initialCron ?? "* * * * *"); + }, [initialCron]); + const addMessage = () => { - setMessages([...messages, { role: "user", content: "" }]); + setMessages((prev) => [...prev, createEmptyMessage()]); }; const removeMessage = (index: number) => { - if (messages.length > 1) { - setMessages(messages.filter((_, i) => i !== index)); - } + setMessages((prev) => { + if (prev.length <= 1) { + return prev; + } + return prev.filter((_, i) => i !== index); + }); }; const updateMessage = (index: number, field: keyof FormMessage, value: string) => { - const newMessages = [...messages]; - newMessages[index] = { ...newMessages[index], [field]: value }; - setMessages(newMessages); + setMessages((prev) => { + const next = [...prev]; + next[index] = { ...next[index], [field]: value }; + return next; + }); }; const handleSubmit = async (e: React.FormEvent) => { e.preventDefault(); - - // Validate required fields + if (!cronExpression.trim()) { alert("Please enter a cron expression"); return; } - if (messages.some(msg => !msg.content?.trim())) { + if (messages.some((msg) => !msg.content?.trim())) { alert("Please fill in all message content"); return; } setLoading(true); try { - // Convert FormMessage to the expected Message type - const convertedMessages = messages.map(msg => { - if (msg.role === "assistant") { - return { - role: msg.role, - content: msg.content, - agentName: null, - responseType: "internal" as const, - timestamp: undefined - }; - } - return { - role: msg.role, - content: msg.content, - timestamp: undefined - }; - }); - - await createRecurringJobRule({ - projectId, - input: { messages: convertedMessages }, + const result = await onSubmit({ cron: cronExpression, + messages, }); - if (onBack) { - onBack(); - } else { - router.push(`/projects/${projectId}/manage-triggers?tab=recurring`); + + if (onSuccess) { + onSuccess(result); + } else if (successHref) { + router.push(successHref); } } catch (error) { - console.error("Failed to create recurring job rule:", error); - alert("Failed to create recurring job rule"); + console.error(errorMessage, error); + alert(errorMessage); } finally { setLoading(false); } @@ -114,30 +166,39 @@ export function CreateRecurringJobRuleForm({ - {hasExistingTriggers && onBack ? ( - - ) : hasExistingTriggers ? ( - - - + ) : ( + + + + ) ) : null}
- CREATE RECURRING JOB RULE + {title}
-

- Note: Triggers run only on the published version of your workflow. Publish any changes to make them active. -

+ {description ? ( +

+ {description} +

+ ) : null}
} @@ -262,7 +323,7 @@ export function CreateRecurringJobRuleForm({ isLoading={loading} className="px-6 py-2 whitespace-nowrap" > - {loading ? "Creating..." : "Create Rule"} + {loading ? submittingLabel : submitLabel} @@ -271,3 +332,99 @@ export function CreateRecurringJobRuleForm({
); } + +export function CreateRecurringJobRuleForm({ + projectId, + onBack, + hasExistingTriggers = true, +}: { + projectId: string; + onBack?: () => void; + hasExistingTriggers?: boolean; +}) { + const handleSubmit = async ({ cron, messages }: FormSubmitPayload) => { + const convertedMessages = convertFormMessagesToMessages(messages); + await createRecurringJobRule({ + projectId, + input: { messages: convertedMessages }, + cron, + }); + }; + + const handleSuccess = onBack ? () => onBack() : undefined; + const backButton: BackButtonConfig | undefined = hasExistingTriggers + ? onBack + ? { label: "Back", onClick: onBack } + : { label: "Back", href: `/projects/${projectId}/manage-triggers?tab=recurring` } + : undefined; + + return ( + + ); +} + +export function EditRecurringJobRuleForm({ + projectId, + rule, + onCancel, + onUpdated, +}: { + projectId: string; + rule: z.infer; + onCancel: () => void; + onUpdated?: (rule: z.infer) => void; +}) { + const initialMessages = useMemo(() => { + return rule.input.messages + .filter((message): message is Extract, { role: "system" | "user" | "assistant" }> => { + return message.role === "system" || message.role === "user" || message.role === "assistant"; + }) + .map((message) => ({ + role: message.role, + content: message.content ?? "", + })); + }, [rule.input.messages]); + + const handleSubmit = async ({ cron, messages }: FormSubmitPayload) => { + const convertedMessages = convertFormMessagesToMessages(messages); + const updatedRule = await updateRecurringJobRule({ + projectId, + ruleId: rule.id, + input: { messages: convertedMessages }, + cron, + }); + return updatedRule; + }; + + const handleSuccess = (result: unknown) => { + if (result && typeof result === 'object' && onUpdated) { + onUpdated(result as z.infer); + } + onCancel(); + }; + + return ( + + ); +} diff --git a/apps/rowboat/app/projects/[projectId]/manage-triggers/components/recurring-job-rule-view.tsx b/apps/rowboat/app/projects/[projectId]/manage-triggers/components/recurring-job-rule-view.tsx index 3b55cc31..a8413479 100644 --- a/apps/rowboat/app/projects/[projectId]/manage-triggers/components/recurring-job-rule-view.tsx +++ b/apps/rowboat/app/projects/[projectId]/manage-triggers/components/recurring-job-rule-view.tsx @@ -5,12 +5,13 @@ import { useRouter } from "next/navigation"; import { Button } from "@/components/ui/button"; import { Panel } from "@/components/common/panel-common"; import { fetchRecurringJobRule, toggleRecurringJobRule, deleteRecurringJobRule } from "@/app/actions/recurring-job-rules.actions"; -import { ArrowLeftIcon, PlayIcon, PauseIcon, ClockIcon, AlertCircleIcon, Trash2Icon } from "lucide-react"; +import { ArrowLeftIcon, PlayIcon, PauseIcon, ClockIcon, AlertCircleIcon, Trash2Icon, PencilIcon } from "lucide-react"; import Link from "next/link"; import { RecurringJobRule } from "@/src/entities/models/recurring-job-rule"; import { Spinner } from "@heroui/react"; import { z } from "zod"; import { JobsList } from "@/app/projects/[projectId]/jobs/components/jobs-list"; +import { EditRecurringJobRuleForm } from "./create-recurring-job-rule-form"; export function RecurringJobRuleView({ projectId, ruleId }: { projectId: string; ruleId: string }) { const router = useRouter(); @@ -19,6 +20,7 @@ export function RecurringJobRuleView({ projectId, ruleId }: { projectId: string; const [updating, setUpdating] = useState(false); const [deleting, setDeleting] = useState(false); const [showDeleteConfirm, setShowDeleteConfirm] = useState(false); + const [editing, setEditing] = useState(false); const jobsFilters = useMemo(() => ({ recurringJobRuleId: ruleId }), [ruleId]); @@ -145,128 +147,161 @@ export function RecurringJobRuleView({ projectId, ruleId }: { projectId: string; } rightActions={
- - + {editing ? ( + + ) : ( + <> + + + + + )}
} >
- {/* Status */} -
-
-
- - Status: {rule.disabled ? 'Disabled' : 'Active'} - -
- {rule.lastError && ( -
- -
- Last Error: {rule.lastError} -
-
- )} -
- - {/* Schedule Information */} -
-

- Schedule Information -

-
-
- - Cron Expression: - - {rule.cron} - -
-
- Human Readable: {formatCronExpression(rule.cron)} -
-
- Next Run: {formatDate(rule.nextRunAt)} -
- {rule.lastProcessedAt && ( -
- Last Processed: {formatDate(rule.lastProcessedAt)} -
- )} -
-
- - {/* Messages */} -
-

- Messages -

-
- {rule.input.messages.map((message, index) => ( -
-
- - {message.role.charAt(0).toUpperCase() + message.role.slice(1)} - -
-
- {message.content} -
-
- ))} -
-
- - {/* Metadata */} -
-

- Metadata -

-
-
Created: {formatDate(rule.createdAt)}
- {rule.updatedAt && ( -
Last Updated: {formatDate(rule.updatedAt)}
- )} -
Rule ID: {rule.id}
-
-
- - {/* Jobs Created by This Rule */} -
-

- Jobs Created by This Rule -

- setEditing(false)} + onUpdated={(updatedRule) => setRule(updatedRule)} /> -
+ ) : ( + <> + {/* Status */} +
+
+
+ + Status: {rule.disabled ? 'Disabled' : 'Active'} + +
+ {rule.lastError && ( +
+ +
+ Last Error: {rule.lastError} +
+
+ )} +
+ + {/* Schedule Information */} +
+

+ Schedule Information +

+
+
+ + Cron Expression: + + {rule.cron} + +
+
+ Human Readable: {formatCronExpression(rule.cron)} +
+
+ Next Run: {formatDate(rule.nextRunAt)} +
+ {rule.lastProcessedAt && ( +
+ Last Processed: {formatDate(rule.lastProcessedAt)} +
+ )} +
+
+ + {/* Messages */} +
+

+ Messages +

+
+ {rule.input.messages.map((message, index) => ( +
+
+ + {message.role.charAt(0).toUpperCase() + message.role.slice(1)} + +
+
+ {message.content} +
+
+ ))} +
+
+ + {/* Metadata */} +
+

+ Metadata +

+
+
Created: {formatDate(rule.createdAt)}
+ {rule.updatedAt && ( +
Last Updated: {formatDate(rule.updatedAt)}
+ )} +
Rule ID: {rule.id}
+
+
+ + {/* Jobs Created by This Rule */} +
+

+ Jobs Created by This Rule +

+ +
+ + )}
diff --git a/apps/rowboat/app/projects/[projectId]/manage-triggers/scheduled/components/create-scheduled-job-rule-form.tsx b/apps/rowboat/app/projects/[projectId]/manage-triggers/scheduled/components/create-scheduled-job-rule-form.tsx index 193da0f6..ceb24d5b 100644 --- a/apps/rowboat/app/projects/[projectId]/manage-triggers/scheduled/components/create-scheduled-job-rule-form.tsx +++ b/apps/rowboat/app/projects/[projectId]/manage-triggers/scheduled/components/create-scheduled-job-rule-form.tsx @@ -1,132 +1,197 @@ 'use client'; -import { useState } from "react"; +import { useEffect, useMemo, useState } from "react"; import { useRouter } from "next/navigation"; import { Button } from "@/components/ui/button"; import { Panel } from "@/components/common/panel-common"; -import { createScheduledJobRule } from "@/app/actions/scheduled-job-rules.actions"; +import { createScheduledJobRule, updateScheduledJobRule } from "@/app/actions/scheduled-job-rules.actions"; import { ArrowLeftIcon, PlusIcon, TrashIcon } from "lucide-react"; import Link from "next/link"; import { DatePicker } from "@heroui/react"; -import { ZonedDateTime, now, getLocalTimeZone } from "@internationalized/date"; +import { ZonedDateTime, now, getLocalTimeZone, parseAbsoluteToLocal } from "@internationalized/date"; +import { z } from "zod"; +import { Message } from "@/app/lib/types/types"; +import { ScheduledJobRule } from "@/src/entities/models/scheduled-job-rule"; -// Define a simpler message type for the form that only includes the fields we need type FormMessage = { role: "system" | "user" | "assistant"; content: string; }; -export function CreateScheduledJobRuleForm({ projectId, onBack, hasExistingTriggers = true }: { projectId: string; onBack?: () => void; hasExistingTriggers?: boolean }) { - const router = useRouter(); - const [loading, setLoading] = useState(false); - const [messages, setMessages] = useState([ - { role: "user", content: "" } - ]); - // Set default to 30 minutes from now with timezone info - const getDefaultDateTime = () => { - const localTimeZone = getLocalTimeZone(); - const currentTime = now(localTimeZone); - const thirtyMinutesFromNow = currentTime.add({ minutes: 30 }); - return thirtyMinutesFromNow; - }; +type BackButtonConfig = + | { label: string; onClick: () => void } + | { label: string; href: string }; - const [scheduledDateTime, setScheduledDateTime] = useState(getDefaultDateTime()); +type FormSubmitPayload = { + messages: FormMessage[]; + scheduledDateTime: ZonedDateTime; +}; + +type ScheduledJobRuleFormBaseProps = { + title: string; + description?: string; + submitLabel: string; + submittingLabel: string; + errorMessage: string; + backButton?: BackButtonConfig; + initialMessages?: FormMessage[]; + initialDateTime?: ZonedDateTime | null; + placeholderDateTime: ZonedDateTime; + minDateTime: ZonedDateTime; + onSubmit: (payload: FormSubmitPayload) => Promise; + onSuccess?: (result: unknown) => void; + successHref?: string; +}; + +const createEmptyMessage = (): FormMessage => ({ role: "user", content: "" }); + +const normaliseMessages = (messages?: FormMessage[]): FormMessage[] => { + if (!messages || messages.length === 0) { + return [createEmptyMessage()]; + } + + return messages.map((message) => ({ ...message })); +}; + +const convertFormMessagesToMessages = (messages: FormMessage[]): z.infer[] => { + return messages.map((msg) => { + if (msg.role === "assistant") { + return { + role: msg.role, + content: msg.content, + agentName: null, + responseType: "internal" as const, + timestamp: undefined, + }; + } + + return { + role: msg.role, + content: msg.content, + timestamp: undefined, + }; + }); +}; + +function ScheduledJobRuleFormBase({ + title, + description, + submitLabel, + submittingLabel, + errorMessage, + backButton, + initialMessages, + initialDateTime, + placeholderDateTime, + minDateTime, + onSubmit, + onSuccess, + successHref, +}: ScheduledJobRuleFormBaseProps) { + const router = useRouter(); + const [messages, setMessages] = useState(normaliseMessages(initialMessages)); + const [scheduledDateTime, setScheduledDateTime] = useState(initialDateTime ?? placeholderDateTime); + const [loading, setLoading] = useState(false); + + useEffect(() => { + setMessages(normaliseMessages(initialMessages)); + }, [initialMessages]); + + useEffect(() => { + setScheduledDateTime(initialDateTime ?? placeholderDateTime); + }, [initialDateTime, placeholderDateTime]); const addMessage = () => { - setMessages([...messages, { role: "user", content: "" }]); + setMessages((prev) => [...prev, createEmptyMessage()]); }; const removeMessage = (index: number) => { - if (messages.length > 1) { - setMessages(messages.filter((_, i) => i !== index)); - } + setMessages((prev) => { + if (prev.length <= 1) { + return prev; + } + return prev.filter((_, i) => i !== index); + }); }; const updateMessage = (index: number, field: keyof FormMessage, value: string) => { - const newMessages = [...messages]; - newMessages[index] = { ...newMessages[index], [field]: value }; - setMessages(newMessages); + setMessages((prev) => { + const next = [...prev]; + next[index] = { ...next[index], [field]: value }; + return next; + }); }; const handleSubmit = async (e: React.FormEvent) => { e.preventDefault(); - - // Validate required fields + if (!scheduledDateTime) { alert("Please select date and time"); return; } - if (messages.some(msg => !msg.content?.trim())) { + if (messages.some((msg) => !msg.content?.trim())) { alert("Please fill in all message content"); return; } setLoading(true); try { - // Convert FormMessage to the expected Message type - const convertedMessages = messages.map(msg => { - if (msg.role === "assistant") { - return { - role: msg.role, - content: msg.content, - agentName: null, - responseType: "internal" as const, - timestamp: undefined - }; - } - return { - role: msg.role, - content: msg.content, - timestamp: undefined - }; + const result = await onSubmit({ + messages, + scheduledDateTime, }); - - // Convert ZonedDateTime to ISO string (already in UTC) - const scheduledTimeString = scheduledDateTime.toDate().toISOString(); - - await createScheduledJobRule({ - projectId, - input: { messages: convertedMessages }, - scheduledTime: scheduledTimeString, - }); - if (onBack) { - onBack(); - } else { - router.push(`/projects/${projectId}/manage-triggers?tab=scheduled`); + + if (onSuccess) { + onSuccess(result); + } else if (successHref) { + router.push(successHref); } } catch (error) { - console.error("Failed to create scheduled job rule:", error); - alert("Failed to create scheduled job rule"); + console.error(errorMessage, error); + alert(errorMessage); } finally { setLoading(false); } }; - - return ( - {hasExistingTriggers && onBack ? ( - - ) : hasExistingTriggers ? ( - - - + ) : ( + + + + ) ) : null}
- CREATE SCHEDULED JOB RULE + {title}
-

- Note: Triggers run only on the published version of your workflow. Publish any changes to make them active. -

+ {description ? ( +

+ {description} +

+ ) : null}
} @@ -142,8 +207,8 @@ export function CreateScheduledJobRuleForm({ projectId, onBack, hasExistingTrigg - {loading ? "Creating..." : "Create Rule"} + {loading ? submittingLabel : submitLabel}
@@ -223,3 +288,111 @@ export function CreateScheduledJobRuleForm({ projectId, onBack, hasExistingTrigg ); } + +export function CreateScheduledJobRuleForm({ projectId, onBack, hasExistingTriggers = true }: { projectId: string; onBack?: () => void; hasExistingTriggers?: boolean }) { + const timeZone = useMemo(() => getLocalTimeZone(), []); + const minDateTime = useMemo(() => now(timeZone), [timeZone]); + const defaultDateTime = useMemo(() => now(timeZone).add({ minutes: 30 }), [timeZone]); + + const handleSubmit = async ({ messages, scheduledDateTime }: FormSubmitPayload) => { + const convertedMessages = convertFormMessagesToMessages(messages); + const scheduledTimeString = scheduledDateTime.toDate().toISOString(); + + await createScheduledJobRule({ + projectId, + input: { messages: convertedMessages }, + scheduledTime: scheduledTimeString, + }); + }; + + const handleSuccess = onBack ? () => onBack() : undefined; + const backButton: BackButtonConfig | undefined = hasExistingTriggers + ? onBack + ? { label: "Back", onClick: onBack } + : { label: "Back", href: `/projects/${projectId}/manage-triggers?tab=scheduled` } + : undefined; + + return ( + + ); +} + +export function EditScheduledJobRuleForm({ + projectId, + rule, + onCancel, + onUpdated, +}: { + projectId: string; + rule: z.infer; + onCancel: () => void; + onUpdated?: (rule: z.infer) => void; +}) { + const timeZone = useMemo(() => getLocalTimeZone(), []); + const initialDateTime = useMemo(() => parseAbsoluteToLocal(rule.nextRunAt), [rule.nextRunAt]); + const nowDateTime = useMemo(() => now(timeZone), [timeZone]); + const minDateTime = useMemo(() => { + return initialDateTime.compare(nowDateTime) < 0 ? initialDateTime : nowDateTime; + }, [initialDateTime, nowDateTime]); + + const initialMessages = useMemo(() => { + return rule.input.messages + .filter((message): message is Extract, { role: "system" | "user" | "assistant" }> => { + return message.role === "system" || message.role === "user" || message.role === "assistant"; + }) + .map((message) => ({ + role: message.role, + content: message.content ?? "", + })); + }, [rule.input.messages]); + + const handleSubmit = async ({ messages, scheduledDateTime }: FormSubmitPayload) => { + const convertedMessages = convertFormMessagesToMessages(messages); + const scheduledTimeString = scheduledDateTime.toDate().toISOString(); + + const updatedRule = await updateScheduledJobRule({ + projectId, + ruleId: rule.id, + input: { messages: convertedMessages }, + scheduledTime: scheduledTimeString, + }); + return updatedRule; + }; + + const handleSuccess = (result: unknown) => { + if (result && typeof result === 'object' && onUpdated) { + onUpdated(result as z.infer); + } + onCancel(); + }; + + return ( + + ); +} diff --git a/apps/rowboat/app/projects/[projectId]/manage-triggers/scheduled/components/scheduled-job-rule-view.tsx b/apps/rowboat/app/projects/[projectId]/manage-triggers/scheduled/components/scheduled-job-rule-view.tsx index 7b44fb46..2bdc8c2f 100644 --- a/apps/rowboat/app/projects/[projectId]/manage-triggers/scheduled/components/scheduled-job-rule-view.tsx +++ b/apps/rowboat/app/projects/[projectId]/manage-triggers/scheduled/components/scheduled-job-rule-view.tsx @@ -9,8 +9,9 @@ import { ScheduledJobRule } from "@/src/entities/models/scheduled-job-rule"; import { z } from "zod"; import Link from "next/link"; import { Button } from "@/components/ui/button"; -import { ArrowLeftIcon, Trash2Icon } from "lucide-react"; +import { ArrowLeftIcon, Trash2Icon, PencilIcon } from "lucide-react"; import { MessageDisplay } from "@/app/lib/components/message-display"; +import { EditScheduledJobRuleForm } from "./create-scheduled-job-rule-form"; export function ScheduledJobRuleView({ projectId, ruleId }: { projectId: string; ruleId: string; }) { const router = useRouter(); @@ -18,6 +19,7 @@ export function ScheduledJobRuleView({ projectId, ruleId }: { projectId: string; const [loading, setLoading] = useState(true); const [deleting, setDeleting] = useState(false); const [showDeleteConfirm, setShowDeleteConfirm] = useState(false); + const [editing, setEditing] = useState(false); useEffect(() => { let ignore = false; @@ -92,15 +94,37 @@ export function ScheduledJobRuleView({ projectId, ruleId }: { projectId: string; } rightActions={
- + {editing ? ( + + ) : ( + <> + + + + )}
} > @@ -114,74 +138,85 @@ export function ScheduledJobRuleView({ projectId, ruleId }: { projectId: string; )} {!loading && rule && (
- {/* Rule Metadata */} -
-
-
- Rule ID: - {rule.id} -
-
- Status: - - {getStatusText(rule.status, rule.processedAt || null)} - -
-
- Next Run: - - {formatDateTime(rule.nextRunAt)} - -
-
- Created: - - {formatDateTime(rule.createdAt)} - -
- {rule.processedAt && ( -
- Processed: - - {formatDateTime(rule.processedAt)} - + {editing ? ( + setEditing(false)} + onUpdated={(updatedRule) => setRule(updatedRule)} + /> + ) : ( + <> + {/* Rule Metadata */} +
+
+
+ Rule ID: + {rule.id}
- )} - {rule.output?.jobId && ( -
- Job ID: - - - {rule.output.jobId} - - +
+ Status: + + {getStatusText(rule.status, rule.processedAt || null)} + +
+
+ Next Run: + + {formatDateTime(rule.nextRunAt)} + +
+
+ Created: + + {formatDateTime(rule.createdAt)} + +
+ {rule.processedAt && ( +
+ Processed: + + {formatDateTime(rule.processedAt)} + +
+ )} + {rule.output?.jobId && ( +
+ Job ID: + + + {rule.output.jobId} + + +
+ )} + {rule.workerId && ( +
+ Worker ID: + {rule.workerId} +
+ )}
- )} - {rule.workerId && ( -
- Worker ID: - {rule.workerId} -
- )} -
-
+
- {/* Messages */} -
-

- Messages -

-
- {rule.input.messages.map((message, index) => ( -
- + {/* Messages */} +
+

+ Messages +

+
+ {rule.input.messages.map((message, index) => ( +
+ +
+ ))}
- ))} -
-
+
+ + )}
)}
diff --git a/apps/rowboat/di/container.ts b/apps/rowboat/di/container.ts index 456250c8..402a952e 100644 --- a/apps/rowboat/di/container.ts +++ b/apps/rowboat/di/container.ts @@ -73,10 +73,12 @@ import { CreateScheduledJobRuleUseCase } from "@/src/application/use-cases/sched import { FetchScheduledJobRuleUseCase } from "@/src/application/use-cases/scheduled-job-rules/fetch-scheduled-job-rule.use-case"; import { ListScheduledJobRulesUseCase } from "@/src/application/use-cases/scheduled-job-rules/list-scheduled-job-rules.use-case"; import { DeleteScheduledJobRuleUseCase } from "@/src/application/use-cases/scheduled-job-rules/delete-scheduled-job-rule.use-case"; +import { UpdateScheduledJobRuleUseCase } from "@/src/application/use-cases/scheduled-job-rules/update-scheduled-job-rule.use-case"; import { CreateScheduledJobRuleController } from "@/src/interface-adapters/controllers/scheduled-job-rules/create-scheduled-job-rule.controller"; import { FetchScheduledJobRuleController } from "@/src/interface-adapters/controllers/scheduled-job-rules/fetch-scheduled-job-rule.controller"; import { ListScheduledJobRulesController } from "@/src/interface-adapters/controllers/scheduled-job-rules/list-scheduled-job-rules.controller"; import { DeleteScheduledJobRuleController } from "@/src/interface-adapters/controllers/scheduled-job-rules/delete-scheduled-job-rule.controller"; +import { UpdateScheduledJobRuleController } from "@/src/interface-adapters/controllers/scheduled-job-rules/update-scheduled-job-rule.controller"; // Recurring Job Rules import { MongoDBRecurringJobRulesRepository } from "@/src/infrastructure/repositories/mongodb.recurring-job-rules.repository"; @@ -85,11 +87,13 @@ import { FetchRecurringJobRuleUseCase } from "@/src/application/use-cases/recurr import { ListRecurringJobRulesUseCase } from "@/src/application/use-cases/recurring-job-rules/list-recurring-job-rules.use-case"; import { ToggleRecurringJobRuleUseCase } from "@/src/application/use-cases/recurring-job-rules/toggle-recurring-job-rule.use-case"; import { DeleteRecurringJobRuleUseCase } from "@/src/application/use-cases/recurring-job-rules/delete-recurring-job-rule.use-case"; +import { UpdateRecurringJobRuleUseCase } from "@/src/application/use-cases/recurring-job-rules/update-recurring-job-rule.use-case"; import { CreateRecurringJobRuleController } from "@/src/interface-adapters/controllers/recurring-job-rules/create-recurring-job-rule.controller"; import { FetchRecurringJobRuleController } from "@/src/interface-adapters/controllers/recurring-job-rules/fetch-recurring-job-rule.controller"; import { ListRecurringJobRulesController } from "@/src/interface-adapters/controllers/recurring-job-rules/list-recurring-job-rules.controller"; import { ToggleRecurringJobRuleController } from "@/src/interface-adapters/controllers/recurring-job-rules/toggle-recurring-job-rule.controller"; import { DeleteRecurringJobRuleController } from "@/src/interface-adapters/controllers/recurring-job-rules/delete-recurring-job-rule.controller"; +import { UpdateRecurringJobRuleController } from "@/src/interface-adapters/controllers/recurring-job-rules/update-recurring-job-rule.controller"; // API Keys import { CreateApiKeyUseCase } from "@/src/application/use-cases/api-keys/create-api-key.use-case"; @@ -238,10 +242,12 @@ container.register({ createScheduledJobRuleUseCase: asClass(CreateScheduledJobRuleUseCase).singleton(), fetchScheduledJobRuleUseCase: asClass(FetchScheduledJobRuleUseCase).singleton(), listScheduledJobRulesUseCase: asClass(ListScheduledJobRulesUseCase).singleton(), + updateScheduledJobRuleUseCase: asClass(UpdateScheduledJobRuleUseCase).singleton(), deleteScheduledJobRuleUseCase: asClass(DeleteScheduledJobRuleUseCase).singleton(), createScheduledJobRuleController: asClass(CreateScheduledJobRuleController).singleton(), fetchScheduledJobRuleController: asClass(FetchScheduledJobRuleController).singleton(), listScheduledJobRulesController: asClass(ListScheduledJobRulesController).singleton(), + updateScheduledJobRuleController: asClass(UpdateScheduledJobRuleController).singleton(), deleteScheduledJobRuleController: asClass(DeleteScheduledJobRuleController).singleton(), // recurring job rules @@ -251,11 +257,13 @@ container.register({ fetchRecurringJobRuleUseCase: asClass(FetchRecurringJobRuleUseCase).singleton(), listRecurringJobRulesUseCase: asClass(ListRecurringJobRulesUseCase).singleton(), toggleRecurringJobRuleUseCase: asClass(ToggleRecurringJobRuleUseCase).singleton(), + updateRecurringJobRuleUseCase: asClass(UpdateRecurringJobRuleUseCase).singleton(), deleteRecurringJobRuleUseCase: asClass(DeleteRecurringJobRuleUseCase).singleton(), createRecurringJobRuleController: asClass(CreateRecurringJobRuleController).singleton(), fetchRecurringJobRuleController: asClass(FetchRecurringJobRuleController).singleton(), listRecurringJobRulesController: asClass(ListRecurringJobRulesController).singleton(), toggleRecurringJobRuleController: asClass(ToggleRecurringJobRuleController).singleton(), + updateRecurringJobRuleController: asClass(UpdateRecurringJobRuleController).singleton(), deleteRecurringJobRuleController: asClass(DeleteRecurringJobRuleController).singleton(), // projects @@ -344,4 +352,4 @@ container.register({ // users // --- usersRepository: asClass(MongoDBUsersRepository).singleton(), -}); \ No newline at end of file +}); diff --git a/apps/rowboat/src/application/lib/utils/is-valid-cron-expression.ts b/apps/rowboat/src/application/lib/utils/is-valid-cron-expression.ts new file mode 100644 index 00000000..affe62be --- /dev/null +++ b/apps/rowboat/src/application/lib/utils/is-valid-cron-expression.ts @@ -0,0 +1,70 @@ +const RANGE_SEPARATOR = "-"; +const STEP_SEPARATOR = "/"; + +export function isValidCronExpression(cron: string): boolean { + const parts = cron.trim().split(/\s+/); + if (parts.length !== 5) { + return false; + } + + const [minute, hour, day, month, dayOfWeek] = parts; + + const validatePart = (part: string, max: number): boolean => { + if (part === "*") { + return true; + } + + if (part.includes(STEP_SEPARATOR)) { + const [range, step] = part.split(STEP_SEPARATOR); + if (!step) { + return false; + } + + const stepValue = Number(step); + if (!Number.isInteger(stepValue) || stepValue <= 0) { + return false; + } + + if (range === "*") { + return stepValue <= max; + } + + return validatePart(range, max); + } + + if (part.includes(RANGE_SEPARATOR)) { + const [start, end] = part.split(RANGE_SEPARATOR); + if (start === undefined || end === undefined) { + return false; + } + + const startValue = Number(start); + const endValue = Number(end); + + if (!Number.isInteger(startValue) || !Number.isInteger(endValue)) { + return false; + } + + if (startValue > endValue) { + return false; + } + + return startValue >= 0 && endValue <= max; + } + + const value = Number(part); + if (!Number.isInteger(value)) { + return false; + } + + return value >= 0 && value <= max; + }; + + return ( + validatePart(minute, 59) && + validatePart(hour, 23) && + validatePart(day, 31) && + validatePart(month, 12) && + validatePart(dayOfWeek, 7) + ); +} diff --git a/apps/rowboat/src/application/repositories/recurring-job-rules.repository.interface.ts b/apps/rowboat/src/application/repositories/recurring-job-rules.repository.interface.ts index 9b491d11..a24dce68 100644 --- a/apps/rowboat/src/application/repositories/recurring-job-rules.repository.interface.ts +++ b/apps/rowboat/src/application/repositories/recurring-job-rules.repository.interface.ts @@ -17,6 +17,15 @@ export const ListedRecurringRuleItem = RecurringJobRule.omit({ input: true, }); +/** + * Schema for updating a recurring job rule. + */ +export const UpdateRecurringRuleSchema = RecurringJobRule + .pick({ + input: true, + cron: true, + }); + /** * Repository interface for managing recurring job rules in the system. * @@ -82,6 +91,16 @@ export interface IRecurringJobRulesRepository { */ toggle(id: string, disabled: boolean): Promise>; + /** + * Updates a recurring job rule with new input and cron expression. + * + * @param id - The unique identifier of the recurring job rule to update + * @param data - The update data containing input messages and cron expression + * @returns Promise resolving to the updated recurring job rule + * @throws {NotFoundError} if the recurring job rule doesn't exist + */ + update(id: string, data: z.infer): Promise>; + /** * Deletes a recurring job rule by its unique identifier. * diff --git a/apps/rowboat/src/application/repositories/scheduled-job-rules.repository.interface.ts b/apps/rowboat/src/application/repositories/scheduled-job-rules.repository.interface.ts index 81391dc3..de3c8692 100644 --- a/apps/rowboat/src/application/repositories/scheduled-job-rules.repository.interface.ts +++ b/apps/rowboat/src/application/repositories/scheduled-job-rules.repository.interface.ts @@ -24,6 +24,17 @@ export const UpdateJobSchema = ScheduledJobRule.pick({ output: true, }); +/** + * Schema for updating a scheduled job rule's next run configuration. + */ +export const UpdateScheduledRuleSchema = ScheduledJobRule + .pick({ + input: true, + }) + .extend({ + scheduledTime: z.string().datetime(), + }); + /** * Repository interface for managing scheduled job rules in the system. * @@ -69,6 +80,16 @@ export interface IScheduledJobRulesRepository { */ update(id: string, data: z.infer): Promise>; + /** + * Updates a scheduled job rule with new input and scheduled time. + * + * @param id - The unique identifier of the scheduled job rule to update + * @param data - The update data containing input messages and scheduled time + * @returns Promise resolving to the updated scheduled job rule + * @throws {NotFoundError} if the scheduled job rule doesn't exist + */ + updateRule(id: string, data: z.infer): Promise>; + /** * Releases a scheduled job rule after it has been executed. * @@ -103,4 +124,4 @@ export interface IScheduledJobRulesRepository { * @returns Promise resolving to void */ deleteByProjectId(projectId: string): Promise; -} \ No newline at end of file +} diff --git a/apps/rowboat/src/application/use-cases/recurring-job-rules/create-recurring-job-rule.use-case.ts b/apps/rowboat/src/application/use-cases/recurring-job-rules/create-recurring-job-rule.use-case.ts index 0c2c2c0c..51ad3208 100644 --- a/apps/rowboat/src/application/use-cases/recurring-job-rules/create-recurring-job-rule.use-case.ts +++ b/apps/rowboat/src/application/use-cases/recurring-job-rules/create-recurring-job-rule.use-case.ts @@ -5,6 +5,7 @@ import { IProjectActionAuthorizationPolicy } from '../../policies/project-action import { IRecurringJobRulesRepository } from '../../repositories/recurring-job-rules.repository.interface'; import { RecurringJobRule } from '@/src/entities/models/recurring-job-rule'; import { Message } from '@/app/lib/types/types'; +import { isValidCronExpression } from '@/src/application/lib/utils/is-valid-cron-expression'; const inputSchema = z.object({ caller: z.enum(["user", "api"]), @@ -42,7 +43,7 @@ export class CreateRecurringJobRuleUseCase implements ICreateRecurringJobRuleUse async execute(request: z.infer): Promise> { // Validate cron expression - if (!this.isValidCronExpression(request.cron)) { + if (!isValidCronExpression(request.cron)) { throw new BadRequestError('Invalid cron expression. Expected format: minute hour day month dayOfWeek'); } @@ -66,31 +67,4 @@ export class CreateRecurringJobRuleUseCase implements ICreateRecurringJobRuleUse return rule; } - - private isValidCronExpression(cron: string): boolean { - const parts = cron.split(' '); - if (parts.length !== 5) { - return false; - } - - // Basic validation - in production you'd want more sophisticated validation - const [minute, hour, day, month, dayOfWeek] = parts; - - // Check if parts are valid - const isValidPart = (part: string) => { - if (part === '*') return true; - if (part.includes('/')) { - const [range, step] = part.split('/'); - if (range === '*' || (parseInt(step) > 0 && parseInt(step) <= 59)) return true; - return false; - } - if (part.includes('-')) { - const [start, end] = part.split('-'); - return !isNaN(parseInt(start)) && !isNaN(parseInt(end)) && parseInt(start) <= parseInt(end); - } - return !isNaN(parseInt(part)); - }; - - return isValidPart(minute) && isValidPart(hour) && isValidPart(day) && isValidPart(month) && isValidPart(dayOfWeek); - } } diff --git a/apps/rowboat/src/application/use-cases/recurring-job-rules/update-recurring-job-rule.use-case.ts b/apps/rowboat/src/application/use-cases/recurring-job-rules/update-recurring-job-rule.use-case.ts new file mode 100644 index 00000000..549eff63 --- /dev/null +++ b/apps/rowboat/src/application/use-cases/recurring-job-rules/update-recurring-job-rule.use-case.ts @@ -0,0 +1,69 @@ +import { BadRequestError, NotFoundError } from '@/src/entities/errors/common'; +import { z } from "zod"; +import { IUsageQuotaPolicy } from '../../policies/usage-quota.policy.interface'; +import { IProjectActionAuthorizationPolicy } from '../../policies/project-action-authorization.policy'; +import { IRecurringJobRulesRepository } from '../../repositories/recurring-job-rules.repository.interface'; +import { RecurringJobRule } from '@/src/entities/models/recurring-job-rule'; +import { Message } from '@/app/lib/types/types'; +import { isValidCronExpression } from '@/src/application/lib/utils/is-valid-cron-expression'; + +const inputSchema = z.object({ + caller: z.enum(["user", "api"]), + userId: z.string().optional(), + apiKey: z.string().optional(), + projectId: z.string(), + ruleId: z.string(), + input: z.object({ + messages: z.array(Message), + }), + cron: z.string(), +}); + +export interface IUpdateRecurringJobRuleUseCase { + execute(request: z.infer): Promise>; +} + +export class UpdateRecurringJobRuleUseCase implements IUpdateRecurringJobRuleUseCase { + private readonly recurringJobRulesRepository: IRecurringJobRulesRepository; + private readonly usageQuotaPolicy: IUsageQuotaPolicy; + private readonly projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy; + + constructor({ + recurringJobRulesRepository, + usageQuotaPolicy, + projectActionAuthorizationPolicy, + }: { + recurringJobRulesRepository: IRecurringJobRulesRepository, + usageQuotaPolicy: IUsageQuotaPolicy, + projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy, + }) { + this.recurringJobRulesRepository = recurringJobRulesRepository; + this.usageQuotaPolicy = usageQuotaPolicy; + this.projectActionAuthorizationPolicy = projectActionAuthorizationPolicy; + } + + async execute(request: z.infer): Promise> { + if (!isValidCronExpression(request.cron)) { + throw new BadRequestError('Invalid cron expression. Expected format: minute hour day month dayOfWeek'); + } + + await this.projectActionAuthorizationPolicy.authorize({ + caller: request.caller, + userId: request.userId, + apiKey: request.apiKey, + projectId: request.projectId, + }); + + await this.usageQuotaPolicy.assertAndConsumeProjectAction(request.projectId); + + const rule = await this.recurringJobRulesRepository.fetch(request.ruleId); + if (!rule || rule.projectId !== request.projectId) { + throw new NotFoundError('Recurring job rule not found'); + } + + return await this.recurringJobRulesRepository.update(request.ruleId, { + input: request.input, + cron: request.cron, + }); + } +} diff --git a/apps/rowboat/src/application/use-cases/scheduled-job-rules/update-scheduled-job-rule.use-case.ts b/apps/rowboat/src/application/use-cases/scheduled-job-rules/update-scheduled-job-rule.use-case.ts new file mode 100644 index 00000000..6b8a54ad --- /dev/null +++ b/apps/rowboat/src/application/use-cases/scheduled-job-rules/update-scheduled-job-rule.use-case.ts @@ -0,0 +1,64 @@ +import { NotFoundError } from '@/src/entities/errors/common'; +import { z } from "zod"; +import { IUsageQuotaPolicy } from '../../policies/usage-quota.policy.interface'; +import { IProjectActionAuthorizationPolicy } from '../../policies/project-action-authorization.policy'; +import { IScheduledJobRulesRepository } from '../../repositories/scheduled-job-rules.repository.interface'; +import { ScheduledJobRule } from '@/src/entities/models/scheduled-job-rule'; +import { Message } from '@/app/lib/types/types'; + +const inputSchema = z.object({ + caller: z.enum(["user", "api"]), + userId: z.string().optional(), + apiKey: z.string().optional(), + projectId: z.string(), + ruleId: z.string(), + input: z.object({ + messages: z.array(Message), + }), + scheduledTime: z.string().datetime(), +}); + +export interface IUpdateScheduledJobRuleUseCase { + execute(request: z.infer): Promise>; +} + +export class UpdateScheduledJobRuleUseCase implements IUpdateScheduledJobRuleUseCase { + private readonly scheduledJobRulesRepository: IScheduledJobRulesRepository; + private readonly usageQuotaPolicy: IUsageQuotaPolicy; + private readonly projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy; + + constructor({ + scheduledJobRulesRepository, + usageQuotaPolicy, + projectActionAuthorizationPolicy, + }: { + scheduledJobRulesRepository: IScheduledJobRulesRepository, + usageQuotaPolicy: IUsageQuotaPolicy, + projectActionAuthorizationPolicy: IProjectActionAuthorizationPolicy, + }) { + this.scheduledJobRulesRepository = scheduledJobRulesRepository; + this.usageQuotaPolicy = usageQuotaPolicy; + this.projectActionAuthorizationPolicy = projectActionAuthorizationPolicy; + } + + async execute(request: z.infer): Promise> { + await this.projectActionAuthorizationPolicy.authorize({ + caller: request.caller, + userId: request.userId, + apiKey: request.apiKey, + projectId: request.projectId, + }); + + await this.usageQuotaPolicy.assertAndConsumeProjectAction(request.projectId); + + const rule = await this.scheduledJobRulesRepository.fetch(request.ruleId); + if (!rule || rule.projectId !== request.projectId) { + throw new NotFoundError('Scheduled job rule not found'); + } + + return await this.scheduledJobRulesRepository.updateRule(request.ruleId, { + input: request.input, + scheduledTime: request.scheduledTime, + }); + } +} diff --git a/apps/rowboat/src/infrastructure/repositories/mongodb.recurring-job-rules.repository.ts b/apps/rowboat/src/infrastructure/repositories/mongodb.recurring-job-rules.repository.ts index a0a43d7c..fe9937f2 100644 --- a/apps/rowboat/src/infrastructure/repositories/mongodb.recurring-job-rules.repository.ts +++ b/apps/rowboat/src/infrastructure/repositories/mongodb.recurring-job-rules.repository.ts @@ -1,7 +1,7 @@ import { z } from "zod"; import { Filter, ObjectId } from "mongodb"; import { db } from "@/app/lib/mongodb"; -import { CreateRecurringRuleSchema, IRecurringJobRulesRepository, ListedRecurringRuleItem } from "@/src/application/repositories/recurring-job-rules.repository.interface"; +import { CreateRecurringRuleSchema, IRecurringJobRulesRepository, ListedRecurringRuleItem, UpdateRecurringRuleSchema } from "@/src/application/repositories/recurring-job-rules.repository.interface"; import { RecurringJobRule } from "@/src/entities/models/recurring-job-rule"; import { NotFoundError } from "@/src/entities/errors/common"; import { PaginatedList } from "@/src/entities/common/paginated-list"; @@ -208,6 +208,31 @@ export class MongoDBRecurringJobRulesRepository implements IRecurringJobRulesRep return await this.updateNextRunAt(id, result.cron); } + /** + * Updates a recurring job rule with new input and schedule. + */ + async update(id: string, data: z.infer): Promise> { + const now = new Date().toISOString(); + + const result = await this.collection.findOneAndUpdate( + { _id: new ObjectId(id) }, + { + $set: { + input: data.input, + cron: data.cron, + updatedAt: now, + }, + }, + { returnDocument: "after" }, + ); + + if (!result) { + throw new NotFoundError(`Recurring job rule ${id} not found`); + } + + return await this.updateNextRunAt(id, data.cron); + } + /** * Deletes a recurring job rule by its unique identifier. */ diff --git a/apps/rowboat/src/infrastructure/repositories/mongodb.scheduled-job-rules.repository.ts b/apps/rowboat/src/infrastructure/repositories/mongodb.scheduled-job-rules.repository.ts index 0bbb206a..ec2b9380 100644 --- a/apps/rowboat/src/infrastructure/repositories/mongodb.scheduled-job-rules.repository.ts +++ b/apps/rowboat/src/infrastructure/repositories/mongodb.scheduled-job-rules.repository.ts @@ -1,7 +1,7 @@ import { z } from "zod"; import { Filter, ObjectId } from "mongodb"; import { db } from "@/app/lib/mongodb"; -import { CreateRuleSchema, IScheduledJobRulesRepository, ListedRuleItem, UpdateJobSchema } from "@/src/application/repositories/scheduled-job-rules.repository.interface"; +import { CreateRuleSchema, IScheduledJobRulesRepository, ListedRuleItem, UpdateJobSchema, UpdateScheduledRuleSchema } from "@/src/application/repositories/scheduled-job-rules.repository.interface"; import { ScheduledJobRule } from "@/src/entities/models/scheduled-job-rule"; import { NotFoundError } from "@/src/entities/errors/common"; import { PaginatedList } from "@/src/entities/common/paginated-list"; @@ -138,6 +138,41 @@ export class MongoDBScheduledJobRulesRepository implements IScheduledJobRulesRep return this.convertDocToModel(result); } + /** + * Reconfigures a scheduled job rule's input and next run time. + */ + async updateRule(id: string, data: z.infer): Promise> { + const scheduledDate = new Date(data.scheduledTime); + const nextRunAtSeconds = Math.floor(scheduledDate.getTime() / 1000); + const nextRunAt = Math.floor(nextRunAtSeconds / 60) * 60; + const now = new Date().toISOString(); + + const result = await this.collection.findOneAndUpdate( + { _id: new ObjectId(id) }, + { + $set: { + input: data.input, + nextRunAt, + status: "pending", + workerId: null, + lastWorkerId: null, + updatedAt: now, + }, + $unset: { + output: "", + processedAt: "", + }, + }, + { returnDocument: "after" }, + ); + + if (!result) { + throw new NotFoundError(`Scheduled job rule ${id} not found`); + } + + return this.convertDocToModel(result); + } + /** * Updates a scheduled job rule with new status and output data. */ diff --git a/apps/rowboat/src/interface-adapters/controllers/recurring-job-rules/update-recurring-job-rule.controller.ts b/apps/rowboat/src/interface-adapters/controllers/recurring-job-rules/update-recurring-job-rule.controller.ts new file mode 100644 index 00000000..dd175872 --- /dev/null +++ b/apps/rowboat/src/interface-adapters/controllers/recurring-job-rules/update-recurring-job-rule.controller.ts @@ -0,0 +1,50 @@ +import { BadRequestError } from "@/src/entities/errors/common"; +import z from "zod"; +import { IUpdateRecurringJobRuleUseCase } from "@/src/application/use-cases/recurring-job-rules/update-recurring-job-rule.use-case"; +import { RecurringJobRule } from "@/src/entities/models/recurring-job-rule"; + +const inputSchema = z.object({ + caller: z.enum(["user", "api"]), + userId: z.string().optional(), + apiKey: z.string().optional(), + projectId: z.string(), + ruleId: z.string(), + input: z.object({ + messages: z.array(z.any()), + }), + cron: z.string(), +}); + +export interface IUpdateRecurringJobRuleController { + execute(request: z.infer): Promise>; +} + +export class UpdateRecurringJobRuleController implements IUpdateRecurringJobRuleController { + private readonly updateRecurringJobRuleUseCase: IUpdateRecurringJobRuleUseCase; + + constructor({ + updateRecurringJobRuleUseCase, + }: { + updateRecurringJobRuleUseCase: IUpdateRecurringJobRuleUseCase, + }) { + this.updateRecurringJobRuleUseCase = updateRecurringJobRuleUseCase; + } + + async execute(request: z.infer): Promise> { + const result = inputSchema.safeParse(request); + if (!result.success) { + throw new BadRequestError(`Invalid request: ${JSON.stringify(result.error)}`); + } + const { caller, userId, apiKey, projectId, ruleId, input, cron } = result.data; + + return await this.updateRecurringJobRuleUseCase.execute({ + caller, + userId, + apiKey, + projectId, + ruleId, + input, + cron, + }); + } +} diff --git a/apps/rowboat/src/interface-adapters/controllers/scheduled-job-rules/update-scheduled-job-rule.controller.ts b/apps/rowboat/src/interface-adapters/controllers/scheduled-job-rules/update-scheduled-job-rule.controller.ts new file mode 100644 index 00000000..1976b35f --- /dev/null +++ b/apps/rowboat/src/interface-adapters/controllers/scheduled-job-rules/update-scheduled-job-rule.controller.ts @@ -0,0 +1,51 @@ +import { BadRequestError } from "@/src/entities/errors/common"; +import z from "zod"; +import { IUpdateScheduledJobRuleUseCase } from "@/src/application/use-cases/scheduled-job-rules/update-scheduled-job-rule.use-case"; +import { ScheduledJobRule } from "@/src/entities/models/scheduled-job-rule"; +import { Message } from "@/app/lib/types/types"; + +const inputSchema = z.object({ + caller: z.enum(["user", "api"]), + userId: z.string().optional(), + apiKey: z.string().optional(), + projectId: z.string(), + ruleId: z.string(), + input: z.object({ + messages: z.array(Message), + }), + scheduledTime: z.string().datetime(), +}); + +export interface IUpdateScheduledJobRuleController { + execute(request: z.infer): Promise>; +} + +export class UpdateScheduledJobRuleController implements IUpdateScheduledJobRuleController { + private readonly updateScheduledJobRuleUseCase: IUpdateScheduledJobRuleUseCase; + + constructor({ + updateScheduledJobRuleUseCase, + }: { + updateScheduledJobRuleUseCase: IUpdateScheduledJobRuleUseCase, + }) { + this.updateScheduledJobRuleUseCase = updateScheduledJobRuleUseCase; + } + + async execute(request: z.infer): Promise> { + const result = inputSchema.safeParse(request); + if (!result.success) { + throw new BadRequestError(`Invalid request: ${JSON.stringify(result.error)}`); + } + const { caller, userId, apiKey, projectId, ruleId, input, scheduledTime } = result.data; + + return await this.updateScheduledJobRuleUseCase.execute({ + caller, + userId, + apiKey, + projectId, + ruleId, + input, + scheduledTime, + }); + } +}