diff --git a/backend/src/lib/llm/claude.ts b/backend/src/lib/llm/claude.ts index 0ecef37..9f86b16 100644 --- a/backend/src/lib/llm/claude.ts +++ b/backend/src/lib/llm/claude.ts @@ -20,9 +20,19 @@ type NativeMessage = { const MAX_TOKENS = 16384; +function apiKey(override?: string | null): string { + const key = override?.trim() || process.env.ANTHROPIC_API_KEY?.trim() || ""; + if (!key) { + throw new Error( + "Anthropic API key is not configured. Set ANTHROPIC_API_KEY or add a user Anthropic key.", + ); + } + return key; +} + function client(override?: string | null): Anthropic { - const apiKey = override?.trim() || process.env.ANTHROPIC_API_KEY || ""; - return new Anthropic({ apiKey }); + const apiKeyValue = apiKey(override); + return new Anthropic({ apiKey: apiKeyValue }); } function toNativeMessages( diff --git a/backend/src/lib/llm/gemini.ts b/backend/src/lib/llm/gemini.ts index dd7c4d7..e40fc60 100644 --- a/backend/src/lib/llm/gemini.ts +++ b/backend/src/lib/llm/gemini.ts @@ -28,9 +28,6 @@ type GeminiContent = { parts: GeminiPart[]; }; -const RETRYABLE_STATUSES = new Set([429, 500, 502, 503, 504]); -const MAX_GEMINI_ATTEMPTS = 3; - function apiKey(override?: string | null): string { const key = override?.trim() || process.env.GEMINI_API_KEY?.trim() || ""; if (!key) { @@ -45,49 +42,6 @@ function client(override?: string | null): GoogleGenAI { return new GoogleGenAI({ apiKey: apiKey(override) }); } -function geminiStatus(err: unknown): number | null { - const status = (err as { status?: unknown })?.status; - return typeof status === "number" ? status : null; -} - -function isRetryableGeminiError(err: unknown): boolean { - const status = geminiStatus(err); - if (status != null && RETRYABLE_STATUSES.has(status)) return true; - - const message = - err instanceof Error ? err.message : typeof err === "string" ? err : ""; - return /UNAVAILABLE|Service Unavailable|high demand|try again later/i.test( - message, - ); -} - -function retryDelayMs(attempt: number): number { - return 400 * 2 ** attempt; -} - -async function sleep(ms: number): Promise { - await new Promise((resolve) => setTimeout(resolve, ms)); -} - -async function withGeminiRetries(operation: () => Promise): Promise { - let lastError: unknown; - for (let attempt = 0; attempt < MAX_GEMINI_ATTEMPTS; attempt++) { - try { - return await operation(); - } catch (err) { - lastError = err; - const isLastAttempt = attempt === MAX_GEMINI_ATTEMPTS - 1; - if (isLastAttempt || !isRetryableGeminiError(err)) throw err; - console.warn("[gemini] transient error; retrying", { - attempt: attempt + 1, - status: geminiStatus(err), - }); - await sleep(retryDelayMs(attempt)); - } - } - throw lastError; -} - function toNativeContents(messages: StreamChatParams["messages"]): GeminiContent[] { return messages.map((m) => ({ role: m.role === "assistant" ? "model" : "user", @@ -107,25 +61,23 @@ export async function streamGemini( let fullText = ""; for (let iter = 0; iter < maxIter; iter++) { - const stream = await withGeminiRetries(() => - ai.models.generateContentStream({ - model, - contents: contents as never, - config: { - systemInstruction: systemPrompt, - tools: functionDeclarations.length - ? [{ functionDeclarations } as never] - : undefined, - // When enabled, ask Gemini to surface thought summaries. - // When disabled, explicitly zero the thinking budget so the - // model skips thinking entirely (saves tokens and latency - // for bulk extraction jobs). - thinkingConfig: enableThinking - ? { includeThoughts: true } - : { thinkingBudget: 0 }, - }, - }), - ); + const stream = await ai.models.generateContentStream({ + model, + contents: contents as never, + config: { + systemInstruction: systemPrompt, + tools: functionDeclarations.length + ? [{ functionDeclarations } as never] + : undefined, + // When enabled, ask Gemini to surface thought summaries. + // When disabled, explicitly zero the thinking budget so the + // model skips thinking entirely (saves tokens and latency + // for bulk extraction jobs). + thinkingConfig: enableThinking + ? { includeThoughts: true } + : { thinkingBudget: 0 }, + }, + }); // Per-iteration accumulators. const textParts: string[] = []; @@ -207,14 +159,12 @@ export async function completeGeminiText(params: { apiKeys?: { gemini?: string | null }; }): Promise { const ai = client(params.apiKeys?.gemini); - const resp = await withGeminiRetries(() => - ai.models.generateContent({ - model: params.model, - contents: [{ role: "user", parts: [{ text: params.user }] }], - config: params.systemPrompt - ? { systemInstruction: params.systemPrompt } - : undefined, - }), - ); + const resp = await ai.models.generateContent({ + model: params.model, + contents: [{ role: "user", parts: [{ text: params.user }] }], + config: params.systemPrompt + ? { systemInstruction: params.systemPrompt } + : undefined, + }); return resp.text ?? ""; } diff --git a/backend/src/lib/llm/openai.ts b/backend/src/lib/llm/openai.ts index dbb7ef6..de07b5c 100644 --- a/backend/src/lib/llm/openai.ts +++ b/backend/src/lib/llm/openai.ts @@ -36,7 +36,13 @@ type ResponseStreamEvent = { }; function apiKey(override?: string | null): string { - return override?.trim() || process.env.OPENAI_API_KEY?.trim() || ""; + const key = override?.trim() || process.env.OPENAI_API_KEY?.trim() || ""; + if (!key) { + throw new Error( + "OpenAI API key is not configured. Set OPENAI_API_KEY or add a user OpenAI key.", + ); + } + return key; } function toResponseTools(tools: OpenAIToolSchema[]): ResponseFunctionTool[] { @@ -131,9 +137,11 @@ async function createResponse(params: { if (!response.ok) { const text = await response.text().catch(() => ""); - throw new Error( + const err = new Error( `OpenAI request failed (${response.status}): ${text || response.statusText}`, ); + (err as { status?: number }).status = response.status; + throw err; } return response; diff --git a/backend/src/routes/workflows.ts b/backend/src/routes/workflows.ts index 5f365b3..83ae451 100644 --- a/backend/src/routes/workflows.ts +++ b/backend/src/routes/workflows.ts @@ -1,4 +1,4 @@ -import { Router } from "express"; +import { Router, type NextFunction, type Request, type Response } from "express"; import { requireAuth } from "../middleware/auth"; import { createServerSupabase } from "../lib/supabase"; @@ -21,6 +21,14 @@ type WorkflowAccess = } | null; +type AsyncRoute = (req: Request, res: Response) => Promise; + +function asyncRoute(handler: AsyncRoute) { + return (req: Request, res: Response, next: NextFunction) => { + void handler(req, res).catch(next); + }; +} + function withWorkflowAccess>( workflow: T, access: { allowEdit: boolean; isOwner: boolean; sharedByName?: string | null }, @@ -33,6 +41,53 @@ function withWorkflowAccess>( }; } +async function loadSharerNames( + db: Db, + sharerIds: string[], +): Promise> { + const uniqueIds = [...new Set(sharerIds.filter(Boolean))]; + const names = new Map(); + if (uniqueIds.length === 0) return names; + + try { + const { data: profiles, error } = await db + .from("user_profiles") + .select("user_id, display_name") + .in("user_id", uniqueIds); + + if (error) { + console.warn("[workflows] failed to load sharer profiles", error); + } else { + for (const profile of profiles ?? []) { + if (profile.user_id && profile.display_name) { + names.set(profile.user_id, profile.display_name); + } + } + } + } catch (err) { + console.warn("[workflows] sharer profile lookup threw", err); + } + + const missingIds = uniqueIds.filter((id) => !names.has(id)); + const results = await Promise.allSettled( + missingIds.map(async (id) => { + const { data, error } = await db.auth.admin.getUserById(id); + if (error) throw error; + return { id, email: data.user?.email ?? null }; + }), + ); + + for (const result of results) { + if (result.status === "fulfilled" && result.value.email) { + names.set(result.value.id, result.value.email); + } else if (result.status === "rejected") { + console.warn("[workflows] failed to load sharer email", result.reason); + } + } + + return names; +} + async function resolveWorkflowAccess( workflowId: string, userId: string, @@ -65,7 +120,7 @@ async function resolveWorkflowAccess( } // GET /workflows -workflowsRouter.get("/", requireAuth, async (req, res) => { +workflowsRouter.get("/", requireAuth, asyncRoute(async (req, res) => { const userId = res.locals.userId as string; const userEmail = res.locals.userEmail as string; const { type } = req.query as { type?: string }; @@ -97,23 +152,13 @@ workflowsRouter.get("/", requireAuth, async (req, res) => { const { data: wfs } = await sharedQuery; if (wfs && wfs.length > 0) { - // Fetch sharer profiles const sharerIds = [...new Set(shares.map((s) => s.shared_by_user_id).filter(Boolean))]; - const { data: profiles } = sharerIds.length > 0 - ? await db.from("user_profiles").select("user_id, display_name").in("user_id", sharerIds) - : { data: [] }; - - // Fetch sharer emails via admin client - const admin = createServerSupabase(); - const { data: authData } = await admin.auth.admin.listUsers({ perPage: 1000 }); - const authUsers = authData?.users ?? []; + const sharerNames = await loadSharerNames(db, sharerIds); sharedWorkflows = wfs.map((wf) => { const share = shares.find((s) => s.workflow_id === wf.id); const sharerId = share?.shared_by_user_id; - const profile = profiles?.find((p) => p.user_id === sharerId); - const authUser = authUsers.find((u) => u.id === sharerId); - const shared_by_name = profile?.display_name || authUser?.email || null; + const shared_by_name = sharerId ? sharerNames.get(sharerId) ?? null : null; return withWorkflowAccess(wf, { allowEdit: !!share?.allow_edit, isOwner: false, @@ -127,10 +172,10 @@ workflowsRouter.get("/", requireAuth, async (req, res) => { withWorkflowAccess(wf, { allowEdit: true, isOwner: true }), ); res.json([...ownWithFlag, ...sharedWorkflows]); -}); +})); // POST /workflows -workflowsRouter.post("/", requireAuth, async (req, res) => { +workflowsRouter.post("/", requireAuth, asyncRoute(async (req, res) => { const userId = res.locals.userId as string; const { title, type, prompt_md, columns_config, practice } = req.body as { title: string; @@ -162,9 +207,9 @@ workflowsRouter.post("/", requireAuth, async (req, res) => { .single(); if (error) return void res.status(500).json({ detail: error.message }); res.status(201).json(data); -}); +})); -async function handleWorkflowUpdate(req: import("express").Request, res: import("express").Response) { +async function handleWorkflowUpdate(req: Request, res: Response) { const userId = res.locals.userId as string; const userEmail = res.locals.userEmail as string | undefined; const { workflowId } = req.params; @@ -202,13 +247,13 @@ async function handleWorkflowUpdate(req: import("express").Request, res: import( } // PUT /workflows/:workflowId -workflowsRouter.put("/:workflowId", requireAuth, handleWorkflowUpdate); +workflowsRouter.put("/:workflowId", requireAuth, asyncRoute(handleWorkflowUpdate)); // PATCH /workflows/:workflowId -workflowsRouter.patch("/:workflowId", requireAuth, handleWorkflowUpdate); +workflowsRouter.patch("/:workflowId", requireAuth, asyncRoute(handleWorkflowUpdate)); // DELETE /workflows/:workflowId -workflowsRouter.delete("/:workflowId", requireAuth, async (req, res) => { +workflowsRouter.delete("/:workflowId", requireAuth, asyncRoute(async (req, res) => { const userId = res.locals.userId as string; const { workflowId } = req.params; const db = createServerSupabase(); @@ -220,10 +265,10 @@ workflowsRouter.delete("/:workflowId", requireAuth, async (req, res) => { .eq("is_system", false); if (error) return void res.status(500).json({ detail: error.message }); res.status(204).send(); -}); +})); // GET /workflows/hidden -workflowsRouter.get("/hidden", requireAuth, async (req, res) => { +workflowsRouter.get("/hidden", requireAuth, asyncRoute(async (req, res) => { const userId = res.locals.userId as string; const db = createServerSupabase(); const { data, error } = await db @@ -232,10 +277,10 @@ workflowsRouter.get("/hidden", requireAuth, async (req, res) => { .eq("user_id", userId); if (error) return void res.status(500).json({ detail: error.message }); res.json((data ?? []).map((r) => r.workflow_id)); -}); +})); // POST /workflows/hidden -workflowsRouter.post("/hidden", requireAuth, async (req, res) => { +workflowsRouter.post("/hidden", requireAuth, asyncRoute(async (req, res) => { const userId = res.locals.userId as string; const { workflow_id } = req.body as { workflow_id: string }; if (!workflow_id?.trim()) @@ -246,10 +291,10 @@ workflowsRouter.post("/hidden", requireAuth, async (req, res) => { .upsert({ user_id: userId, workflow_id }, { onConflict: "user_id,workflow_id" }); if (error) return void res.status(500).json({ detail: error.message }); res.status(204).send(); -}); +})); // DELETE /workflows/hidden/:workflowId -workflowsRouter.delete("/hidden/:workflowId", requireAuth, async (req, res) => { +workflowsRouter.delete("/hidden/:workflowId", requireAuth, asyncRoute(async (req, res) => { const userId = res.locals.userId as string; const { workflowId } = req.params; const db = createServerSupabase(); @@ -260,10 +305,10 @@ workflowsRouter.delete("/hidden/:workflowId", requireAuth, async (req, res) => { .eq("workflow_id", workflowId); if (error) return void res.status(500).json({ detail: error.message }); res.status(204).send(); -}); +})); // GET /workflows/:workflowId -workflowsRouter.get("/:workflowId", requireAuth, async (req, res) => { +workflowsRouter.get("/:workflowId", requireAuth, asyncRoute(async (req, res) => { const userId = res.locals.userId as string; const userEmail = res.locals.userEmail as string | undefined; const { workflowId } = req.params; @@ -277,10 +322,10 @@ workflowsRouter.get("/:workflowId", requireAuth, async (req, res) => { isOwner: access.isOwner, }), ); -}); +})); // GET /workflows/:workflowId/shares -workflowsRouter.get("/:workflowId/shares", requireAuth, async (req, res) => { +workflowsRouter.get("/:workflowId/shares", requireAuth, asyncRoute(async (req, res) => { const userId = res.locals.userId as string; const { workflowId } = req.params; const db = createServerSupabase(); @@ -302,10 +347,10 @@ workflowsRouter.get("/:workflowId/shares", requireAuth, async (req, res) => { if (error) return void res.status(500).json({ detail: error.message }); res.json(shares ?? []); -}); +})); // DELETE /workflows/:workflowId/shares/:shareId -workflowsRouter.delete("/:workflowId/shares/:shareId", requireAuth, async (req, res) => { +workflowsRouter.delete("/:workflowId/shares/:shareId", requireAuth, asyncRoute(async (req, res) => { const userId = res.locals.userId as string; const { workflowId, shareId } = req.params; const db = createServerSupabase(); @@ -320,10 +365,10 @@ workflowsRouter.delete("/:workflowId/shares/:shareId", requireAuth, async (req, await db.from("workflow_shares").delete().eq("id", shareId).eq("workflow_id", workflowId); res.status(204).send(); -}); +})); // POST /workflows/:workflowId/share -workflowsRouter.post("/:workflowId/share", requireAuth, async (req, res) => { +workflowsRouter.post("/:workflowId/share", requireAuth, asyncRoute(async (req, res) => { const userId = res.locals.userId as string; const { workflowId } = req.params; const { emails, allow_edit } = req.body as { emails: string[]; allow_edit: boolean }; @@ -355,4 +400,12 @@ workflowsRouter.post("/:workflowId/share", requireAuth, async (req, res) => { if (error) return void res.status(500).json({ detail: error.message }); res.status(204).send(); -}); +})); + +workflowsRouter.use( + (err: unknown, _req: Request, res: Response, next: NextFunction) => { + if (res.headersSent) return next(err); + console.error("[workflows] unhandled route error", err); + res.status(500).json({ detail: "Failed to process workflow request" }); + }, +);