diff --git a/surfsense_backend/app/agents/new_chat/podcast.py b/surfsense_backend/app/agents/new_chat/podcast.py index 2205227b1..d57d0fb21 100644 --- a/surfsense_backend/app/agents/new_chat/podcast.py +++ b/surfsense_backend/app/agents/new_chat/podcast.py @@ -4,13 +4,67 @@ Podcast generation tool for the new chat agent. This module provides a factory function for creating the generate_podcast tool that submits a Celery task for background podcast generation. The frontend polls for completion and auto-updates when the podcast is ready. + +Duplicate request prevention: +- Only one podcast can be generated at a time per search space +- Uses Redis to track active podcast tasks +- Returns a friendly message if a podcast is already being generated """ +import os from typing import Any +import redis from langchain_core.tools import tool from sqlalchemy.ext.asyncio import AsyncSession +# Redis connection for tracking active podcast tasks +# Uses the same Redis instance as Celery +REDIS_URL = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0") +_redis_client: redis.Redis | None = None + + +def get_redis_client() -> redis.Redis: + """Get or create Redis client for podcast task tracking.""" + global _redis_client + if _redis_client is None: + _redis_client = redis.from_url(REDIS_URL, decode_responses=True) + return _redis_client + + +def get_active_podcast_key(search_space_id: int) -> str: + """Generate Redis key for tracking active podcast task.""" + return f"podcast:active:{search_space_id}" + + +def get_active_podcast_task(search_space_id: int) -> str | None: + """Check if there's an active podcast task for this search space.""" + try: + client = get_redis_client() + return client.get(get_active_podcast_key(search_space_id)) + except Exception: + # If Redis is unavailable, allow the request (fail open) + return None + + +def set_active_podcast_task(search_space_id: int, task_id: str) -> None: + """Mark a podcast task as active for this search space.""" + try: + 
client = get_redis_client() + # Set with 30-minute expiry as safety net (podcast should complete before this) + client.setex(get_active_podcast_key(search_space_id), 1800, task_id) + except Exception as e: + print(f"[generate_podcast] Warning: Could not set active task in Redis: {e}") + + +def clear_active_podcast_task(search_space_id: int) -> None: + """Clear the active podcast task for this search space.""" + try: + client = get_redis_client() + client.delete(get_active_podcast_key(search_space_id)) + except Exception as e: + print(f"[generate_podcast] Warning: Could not clear active task in Redis: {e}") + def create_generate_podcast_tool( search_space_id: int, @@ -49,6 +103,10 @@ def create_generate_podcast_tool( The tool will start generating a podcast in the background. The podcast will be available once generation completes. + IMPORTANT: Only one podcast can be generated at a time. If a podcast + is already being generated, this tool will return a message asking + the user to wait. + Args: source_content: The text content to convert into a podcast. This can be a summary, research findings, or any text @@ -59,11 +117,23 @@ def create_generate_podcast_tool( Returns: A dictionary containing: - - status: "processing" (task submitted) or "error" - - task_id: The Celery task ID for polling status + - status: "processing" (task submitted), "already_generating", or "error" + - task_id: The Celery task ID for polling status (if processing) - title: The podcast title + - message: Status message for the user """ try: + # Check if a podcast is already being generated for this search space + active_task_id = get_active_podcast_task(search_space_id) + if active_task_id: + print(f"[generate_podcast] Blocked duplicate request. Active task: {active_task_id}") + return { + "status": "already_generating", + "task_id": active_task_id, + "title": podcast_title, + "message": "A podcast is already being generated. 
Please wait for it to complete before requesting another one.", + } + # Import Celery task here to avoid circular imports from app.tasks.celery_tasks.podcast_tasks import ( generate_content_podcast_task, @@ -78,6 +148,9 @@ def create_generate_podcast_tool( user_prompt=user_prompt, ) + # Mark this task as active + set_active_podcast_task(search_space_id, task.id) + print(f"[generate_podcast] Submitted Celery task: {task.id}") # Return immediately with task_id for polling diff --git a/surfsense_backend/app/agents/new_chat/system_prompt.py b/surfsense_backend/app/agents/new_chat/system_prompt.py index fbd18cb0b..f725be684 100644 --- a/surfsense_backend/app/agents/new_chat/system_prompt.py +++ b/surfsense_backend/app/agents/new_chat/system_prompt.py @@ -135,10 +135,16 @@ You have access to the following tools: - Use this when the user asks to create, generate, or make a podcast. - Trigger phrases: "give me a podcast about", "create a podcast", "generate a podcast", "make a podcast", "turn this into a podcast" - Args: - - source_content: The text content to convert into a podcast (e.g., a summary, research findings, or conversation) + - source_content: The text content to convert into a podcast. This MUST be comprehensive and include: + * If discussing the current conversation: Include a detailed summary of the FULL chat history (all user questions and your responses) + * If based on knowledge base search: Include the key findings and insights from the search results + * You can combine both: conversation context + search results for richer podcasts + * The more detailed the source_content, the better the podcast quality - podcast_title: Optional title for the podcast (default: "SurfSense Podcast") - user_prompt: Optional instructions for podcast style/format (e.g., "Make it casual and fun") - - Returns: A podcast with audio that the user can listen to and download + - Returns: A task_id for tracking. The podcast will be generated in the background. 
+ - IMPORTANT: Only one podcast can be generated at a time. If a podcast is already being generated, the tool will return status "already_generating". + - After calling this tool, inform the user that podcast generation has started and they will see the player when it's ready (takes 3-5 minutes). - User: "Fetch all my notes and what's in them?" @@ -148,10 +154,14 @@ You have access to the following tools: - Call: `search_knowledge_base(query="React migration", connectors_to_search=["SLACK_CONNECTOR"], start_date="YYYY-MM-DD", end_date="YYYY-MM-DD")` - User: "Give me a podcast about AI trends based on what we discussed" - - First search for relevant content, then call: `generate_podcast(source_content="[summarized content from search]", podcast_title="AI Trends Podcast")` + - First search for relevant content, then call: `generate_podcast(source_content="Based on our conversation and search results: [detailed summary of chat + search findings]", podcast_title="AI Trends Podcast")` - User: "Create a podcast summary of this conversation" - - Call: `generate_podcast(source_content="[summary of the conversation so far]", podcast_title="Conversation Summary")` + - Call: `generate_podcast(source_content="Complete conversation summary:\n\nUser asked about [topic 1]:\n[Your detailed response]\n\nUser then asked about [topic 2]:\n[Your detailed response]\n\n[Continue for all exchanges in the conversation]", podcast_title="Conversation Summary")` + +- User: "Make a podcast about quantum computing" + - First search: `search_knowledge_base(query="quantum computing")` + - Then: `generate_podcast(source_content="Key insights about quantum computing from the knowledge base:\n\n[Comprehensive summary of all relevant search results with key facts, concepts, and findings]", podcast_title="Quantum Computing Explained")` {citation_section} """ diff --git a/surfsense_backend/app/tasks/celery_tasks/podcast_tasks.py b/surfsense_backend/app/tasks/celery_tasks/podcast_tasks.py index 
def _clear_active_podcast_redis_key(search_space_id: int) -> None:
    """Delete the active-podcast marker for a search space from Redis.

    Best effort: any failure is logged as a warning and never raised, so a
    caller may invoke this from a ``finally`` block without risking that its
    remaining cleanup is skipped.

    Args:
        search_space_id: ID of the search space whose
            ``podcast:active:<search_space_id>`` key should be removed.
    """
    try:
        # Imported inside the ``try`` so that even an import failure is
        # downgraded to a warning instead of escaping into the caller.
        import os

        import redis

        redis_url = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0")
        client = redis.from_url(
            redis_url,
            decode_responses=True,
            # Bound the wait so a hung Redis cannot stall task teardown.
            socket_connect_timeout=2.0,
            socket_timeout=2.0,
        )
        try:
            client.delete(f"podcast:active:{search_space_id}")
        finally:
            # A fresh client (and connection pool) is created per call;
            # close it so long-lived workers do not accumulate connections.
            client.close()
        logger.info(
            "Cleared active podcast key for search_space_id=%s", search_space_id
        )
    except Exception as e:
        logger.warning("Could not clear active podcast key: %s", e)
interface GeneratePodcastArgs { } interface GeneratePodcastResult { - status: "processing" | "success" | "error"; + status: "processing" | "already_generating" | "success" | "error"; task_id?: string; podcast_id?: number; title?: string; @@ -218,6 +222,17 @@ function PodcastTaskPoller({ const [pollCount, setPollCount] = useState(0); const pollingRef = useRef(null); + // Set active podcast state when this component mounts + useEffect(() => { + setActivePodcastTaskId(taskId); + + // Clear when component unmounts + return () => { + // Only clear if this task is still the active one + clearActivePodcastTaskId(); + }; + }, [taskId]); + // Poll for task status useEffect(() => { const pollStatus = async () => { @@ -233,6 +248,8 @@ function PodcastTaskPoller({ clearInterval(pollingRef.current); pollingRef.current = null; } + // Clear the active podcast state when task completes + clearActivePodcastTaskId(); } } catch (err) { console.error("Error polling task status:", err); @@ -336,6 +353,28 @@ export const GeneratePodcastToolUI = makeAssistantToolUI< return ; } + // Already generating - show simple warning, don't create another poller + // The FIRST tool call will display the podcast when ready + if (result.status === "already_generating") { + return ( +
+
+
+ +
+
+

+ Podcast already in progress +

+

+ Please wait for the current podcast to complete. +

+
+
+
+ ); + } + // Processing - poll for completion if (result.status === "processing" && result.task_id) { return ; diff --git a/surfsense_web/lib/chat/new-chat-transport.ts b/surfsense_web/lib/chat/new-chat-transport.ts index 74897bd8e..4f607c984 100644 --- a/surfsense_web/lib/chat/new-chat-transport.ts +++ b/surfsense_web/lib/chat/new-chat-transport.ts @@ -4,7 +4,13 @@ */ import type { ChatModelAdapter, ChatModelRunOptions } from "@assistant-ui/react"; +import { toast } from "sonner"; import { getBearerToken } from "@/lib/auth-utils"; +import { + isPodcastGenerating, + looksLikePodcastRequest, + setActivePodcastTaskId, +} from "@/lib/chat/podcast-state"; interface NewChatAdapterConfig { searchSpaceId: number; @@ -59,6 +65,21 @@ export function createNewChatAdapter(config: NewChatAdapterConfig): ChatModelAda throw new Error("User query cannot be empty"); } + // Check if user is requesting a podcast while one is already generating + if (isPodcastGenerating() && looksLikePodcastRequest(userQuery)) { + toast.warning("A podcast is already being generated. Please wait for it to complete."); + // Return a message telling the user to wait + yield { + content: [ + { + type: "text", + text: "A podcast is already being generated. Please wait for it to complete before requesting another one.", + }, + ], + }; + return; + } + const token = getBearerToken(); if (!token) { throw new Error("Not authenticated. 
Please log in again."); @@ -204,6 +225,20 @@ export function createNewChatAdapter(config: NewChatAdapterConfig): ChatModelAda const existing = toolCalls.get(toolCallId); if (existing) { existing.result = output; + + // If this is a podcast tool with status="processing", set the state immediately + // This ensures subsequent podcast requests are intercepted + if ( + existing.toolName === "generate_podcast" && + output && + typeof output === "object" && + "status" in output && + output.status === "processing" && + "task_id" in output && + typeof output.task_id === "string" + ) { + setActivePodcastTaskId(output.task_id); + } } yield { content: buildContent() }; break; @@ -245,6 +280,19 @@ export function createNewChatAdapter(config: NewChatAdapterConfig): ChatModelAda const existing = toolCalls.get(toolCallId); if (existing) { existing.result = output; + + // Set podcast state if processing + if ( + existing.toolName === "generate_podcast" && + output && + typeof output === "object" && + "status" in output && + output.status === "processing" && + "task_id" in output && + typeof output.task_id === "string" + ) { + setActivePodcastTaskId(output.task_id); + } } yield { content: buildContent() }; } diff --git a/surfsense_web/lib/chat/podcast-state.ts b/surfsense_web/lib/chat/podcast-state.ts new file mode 100644 index 000000000..782a31bf6 --- /dev/null +++ b/surfsense_web/lib/chat/podcast-state.ts @@ -0,0 +1,74 @@ +/** + * Module-level state for tracking active podcast generation. + * Used by the new-chat adapter to prevent duplicate podcast requests. 
/**
 * Module-level state for tracking active podcast generation.
 * Used by the new-chat adapter to prevent duplicate podcast requests.
 */

type PodcastStateListener = (isGenerating: boolean) => void;

let _activePodcastTaskId: string | null = null;
const _listeners: Set<PodcastStateListener> = new Set();

/**
 * Patterns that mark a chat message as a podcast request.
 * Built once at module load instead of on every call.
 */
const PODCAST_REQUEST_PATTERNS: readonly RegExp[] = [
	/\bpodcast\b/i,
	/\bcreate.*podcast\b/i,
	/\bgenerate.*podcast\b/i,
	/\bmake.*podcast\b/i,
	/\bturn.*into.*podcast\b/i,
	/\bpodcast.*about\b/i,
	/\bgive.*podcast\b/i,
];

/**
 * Check if a podcast is currently being generated
 */
export function isPodcastGenerating(): boolean {
	return _activePodcastTaskId !== null;
}

/**
 * Get the active podcast task ID, or null when none is active
 */
export function getActivePodcastTaskId(): string | null {
	return _activePodcastTaskId;
}

/**
 * Set the active podcast task ID (when podcast generation starts)
 * and notify all subscribers.
 */
export function setActivePodcastTaskId(taskId: string): void {
	_activePodcastTaskId = taskId;
	notifyListeners();
}

/**
 * Clear the active podcast task ID (when podcast generation completes or errors)
 * and notify all subscribers.
 *
 * @param taskId Optional guard. When given, the state is cleared only if this
 *   task is still the active one, so cleanup for an older task cannot wipe the
 *   state of a newer one. When omitted, the state is cleared unconditionally.
 */
export function clearActivePodcastTaskId(taskId?: string): void {
	if (taskId !== undefined && _activePodcastTaskId !== taskId) {
		return;
	}
	_activePodcastTaskId = null;
	notifyListeners();
}

/**
 * Subscribe to podcast state changes
 *
 * @returns A function that removes the listener again.
 */
export function subscribeToPodcastState(listener: PodcastStateListener): () => void {
	_listeners.add(listener);
	return () => {
		_listeners.delete(listener);
	};
}

/**
 * Call every subscriber with the current "is generating" flag.
 * A throwing subscriber is logged and does not stop the remaining ones.
 */
function notifyListeners(): void {
	const isGenerating = _activePodcastTaskId !== null;
	for (const listener of _listeners) {
		try {
			listener(isGenerating);
		} catch (err) {
			console.error("Podcast state listener failed:", err);
		}
	}
}

/**
 * Check if a message looks like a podcast request
 *
 * Matching is case-insensitive; any single pattern matching is enough.
 */
export function looksLikePodcastRequest(message: string): boolean {
	return PODCAST_REQUEST_PATTERNS.some((pattern) => pattern.test(message));
}