From 4c4e4b3c4c8268aa26efed0093f7f0ed62703561 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Sun, 21 Dec 2025 19:07:46 +0530 Subject: [PATCH 1/5] feat: add podcast generation capabilities to SurfSense deep agent and UI integration --- .../app/agents/new_chat/chat_deepagent.py | 20 +- .../app/agents/new_chat/podcast.py | 170 ++++++++++ .../app/agents/new_chat/system_prompt.py | 18 +- .../app/tasks/chat/stream_new_chat.py | 75 ++++- .../new-chat/[[...chat_id]]/page.tsx | 3 + surfsense_web/components/tool-ui/audio.tsx | 310 ++++++++++++++++++ .../components/tool-ui/generate-podcast.tsx | 288 ++++++++++++++++ surfsense_web/components/tool-ui/index.ts | 11 + surfsense_web/lib/chat/new-chat-transport.ts | 112 ++++++- 9 files changed, 985 insertions(+), 22 deletions(-) create mode 100644 surfsense_backend/app/agents/new_chat/podcast.py create mode 100644 surfsense_web/components/tool-ui/audio.tsx create mode 100644 surfsense_web/components/tool-ui/generate-podcast.tsx create mode 100644 surfsense_web/components/tool-ui/index.ts diff --git a/surfsense_backend/app/agents/new_chat/chat_deepagent.py b/surfsense_backend/app/agents/new_chat/chat_deepagent.py index eb2dac737..7ac6ecac2 100644 --- a/surfsense_backend/app/agents/new_chat/chat_deepagent.py +++ b/surfsense_backend/app/agents/new_chat/chat_deepagent.py @@ -2,7 +2,7 @@ SurfSense deep agent implementation. This module provides the factory function for creating SurfSense deep agents -with knowledge base search capability. +with knowledge base search and podcast generation capabilities. """ from collections.abc import Sequence @@ -14,6 +14,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.agents.new_chat.context import SurfSenseContextSchema from app.agents.new_chat.knowledge_base import create_search_knowledge_base_tool +from app.agents.new_chat.podcast import create_generate_podcast_tool from app.agents.new_chat.system_prompt import build_surfsense_system_prompt from app.services.connector_service import ConnectorService @@ -27,22 +28,27 @@ def create_surfsense_deep_agent( search_space_id: int, db_session: AsyncSession, connector_service: ConnectorService, + user_id: str | None = None, user_instructions: str | None = None, enable_citations: bool = True, + enable_podcast: bool = True, additional_tools: Sequence[BaseTool] | None = None, ): """ - Create a SurfSense deep agent with knowledge base search capability. + Create a SurfSense deep agent with knowledge base search and podcast generation capabilities. Args: llm: ChatLiteLLM instance search_space_id: The user's search space ID db_session: Database session connector_service: Initialized connector service + user_id: The user's ID (required for podcast generation) user_instructions: Optional user instructions to inject into the system prompt. These will be added to the system prompt to customize agent behavior. enable_citations: Whether to include citation instructions in the system prompt (default: True). When False, the agent will not be instructed to add citations to responses. + enable_podcast: Whether to include the podcast generation tool (default: True). + When True and user_id is provided, the agent can generate podcasts. additional_tools: Optional sequence of additional tools to inject into the agent. The search_knowledge_base tool will always be included. @@ -58,6 +64,16 @@ def create_surfsense_deep_agent( # Combine search tool with any additional tools tools = [search_tool] + + # Add podcast tool if enabled and user_id is provided + if enable_podcast and user_id: + podcast_tool = create_generate_podcast_tool( + search_space_id=search_space_id, + db_session=db_session, + user_id=str(user_id), + ) + tools.append(podcast_tool) + if additional_tools: tools.extend(additional_tools) diff --git a/surfsense_backend/app/agents/new_chat/podcast.py b/surfsense_backend/app/agents/new_chat/podcast.py new file mode 100644 index 000000000..ed4116bfb --- /dev/null +++ b/surfsense_backend/app/agents/new_chat/podcast.py @@ -0,0 +1,170 @@ +""" +Podcast generation tool for the new chat agent. + +This module provides a factory function for creating the generate_podcast tool +that integrates with the existing podcaster agent. Podcasts are saved to the +database like the old system, providing authentication and persistence. +""" + +from typing import Any + +from langchain_core.tools import tool +from sqlalchemy.ext.asyncio import AsyncSession + +from app.agents.podcaster.graph import graph as podcaster_graph +from app.agents.podcaster.state import State as PodcasterState +from app.db import Podcast + + +def create_generate_podcast_tool( + search_space_id: int, + db_session: AsyncSession, + user_id: str, +): + """ + Factory function to create the generate_podcast tool with injected dependencies. + + Args: + search_space_id: The user's search space ID + db_session: Database session + user_id: The user's ID (as string) + + Returns: + A configured tool function for generating podcasts + """ + + @tool + async def generate_podcast( + source_content: str, + podcast_title: str = "SurfSense Podcast", + user_prompt: str | None = None, + ) -> dict[str, Any]: + """ + Generate a podcast from the provided content. + + Use this tool when the user asks to create, generate, or make a podcast. + Common triggers include phrases like: + - "Give me a podcast about this" + - "Create a podcast from this conversation" + - "Generate a podcast summary" + - "Make a podcast about..." + - "Turn this into a podcast" + + The tool will generate a complete audio podcast with two speakers + discussing the provided content in an engaging conversational format. + + Args: + source_content: The text content to convert into a podcast. + This can be a summary, research findings, or any text + the user wants transformed into an audio podcast. + podcast_title: Title for the podcast (default: "SurfSense Podcast") + user_prompt: Optional instructions for podcast style, tone, or format. + For example: "Make it casual and fun" or "Focus on the key insights" + + Returns: + A dictionary containing: + - status: "success" or "error" + - podcast_id: The database ID of the saved podcast (for API access) + - title: The podcast title + - transcript: Full podcast transcript with all dialogue entries + - duration_ms: Estimated podcast duration in milliseconds + - transcript_entries: Number of dialogue entries + """ + try: + # Configure the podcaster graph + config = { + "configurable": { + "podcast_title": podcast_title, + "user_id": str(user_id), + "search_space_id": search_space_id, + "user_prompt": user_prompt, + } + } + + # Initialize the podcaster state with the source content + initial_state = PodcasterState( + source_content=source_content, + db_session=db_session, + ) + + # Run the podcaster graph + result = await podcaster_graph.ainvoke(initial_state, config=config) + + # Extract results + podcast_transcript = result.get("podcast_transcript", []) + file_path = result.get("final_podcast_file_path", "") + + # Calculate estimated duration (rough estimate: ~150 words per minute) + total_words = sum( + len(entry.dialog.split()) if hasattr(entry, "dialog") else len(entry.get("dialog", "").split()) + for entry in podcast_transcript + ) + estimated_duration_ms = int((total_words / 150) * 60 * 1000) + + # Create full transcript for display (all entries, complete dialog) + full_transcript = [] + for entry in podcast_transcript: + if hasattr(entry, "speaker_id"): + speaker = f"Speaker {entry.speaker_id + 1}" + dialog = entry.dialog + else: + speaker = f"Speaker {entry.get('speaker_id', 0) + 1}" + dialog = entry.get("dialog", "") + full_transcript.append(f"{speaker}: {dialog}") + + # Convert podcast transcript entries to serializable format (like old system) + serializable_transcript = [] + for entry in podcast_transcript: + if hasattr(entry, "speaker_id"): + serializable_transcript.append({ + "speaker_id": entry.speaker_id, + "dialog": entry.dialog + }) + else: + serializable_transcript.append({ + "speaker_id": entry.get("speaker_id", 0), + "dialog": entry.get("dialog", "") + }) + + # Save podcast to database (like old system) + # This provides authentication and persistence + podcast = Podcast( + title=podcast_title, + podcast_transcript=serializable_transcript, + file_location=file_path, + search_space_id=search_space_id, + # chat_id is None since new-chat uses LangGraph threads, not DB chats + chat_id=None, + chat_state_version=None, + ) + db_session.add(podcast) + await db_session.commit() + await db_session.refresh(podcast) + + # Return podcast_id - frontend will use it to call the API endpoint + # GET /api/v1/podcasts/{podcast_id}/stream (like the old system) + return { + "status": "success", + "podcast_id": podcast.id, + "title": podcast_title, + "transcript": "\n\n".join(full_transcript), + "duration_ms": estimated_duration_ms, + "transcript_entries": len(podcast_transcript), + } + + except Exception as e: + error_message = str(e) + print(f"[generate_podcast] Error: {error_message}") + # Rollback on error + await db_session.rollback() + return { + "status": "error", + "error": error_message, + "title": podcast_title, + "podcast_id": None, + "duration_ms": 0, + "transcript_entries": 0, + } + + return generate_podcast + diff --git a/surfsense_backend/app/agents/new_chat/system_prompt.py b/surfsense_backend/app/agents/new_chat/system_prompt.py index 65a5b1203..fbd18cb0b 100644 --- a/surfsense_backend/app/agents/new_chat/system_prompt.py +++ b/surfsense_backend/app/agents/new_chat/system_prompt.py @@ -121,7 +121,8 @@ Today's date (UTC): {resolved_today} {user_section} You have access to the following tools: -- search_knowledge_base: Search the user's personal knowledge base for relevant information. + +1. search_knowledge_base: Search the user's personal knowledge base for relevant information. - Args: - query: The search query - be specific and include key terms - top_k: Number of results to retrieve (default: 10) @@ -129,6 +130,15 @@ You have access to the following tools: - end_date: Optional ISO date/datetime (e.g. "2025-12-19" or "2025-12-19T23:59:59+00:00") - connectors_to_search: Optional list of connector enums to search. If omitted, searches all. - Returns: Formatted string with relevant documents and their content + +2. generate_podcast: Generate an audio podcast from provided content. + - Use this when the user asks to create, generate, or make a podcast. + - Trigger phrases: "give me a podcast about", "create a podcast", "generate a podcast", "make a podcast", "turn this into a podcast" + - Args: + - source_content: The text content to convert into a podcast (e.g., a summary, research findings, or conversation) + - podcast_title: Optional title for the podcast (default: "SurfSense Podcast") + - user_prompt: Optional instructions for podcast style/format (e.g., "Make it casual and fun") + - Returns: A podcast with audio that the user can listen to and download - User: "Fetch all my notes and what's in them?" @@ -136,6 +146,12 @@ You have access to the following tools: - User: "What did I discuss on Slack last week about the React migration?" - Call: `search_knowledge_base(query="React migration", connectors_to_search=["SLACK_CONNECTOR"], start_date="YYYY-MM-DD", end_date="YYYY-MM-DD")` + +- User: "Give me a podcast about AI trends based on what we discussed" + - First search for relevant content, then call: `generate_podcast(source_content="[summarized content from search]", podcast_title="AI Trends Podcast")` + +- User: "Create a podcast summary of this conversation" + - Call: `generate_podcast(source_content="[summary of the conversation so far]", podcast_title="Conversation Summary")` {citation_section} """ diff --git a/surfsense_backend/app/tasks/chat/stream_new_chat.py b/surfsense_backend/app/tasks/chat/stream_new_chat.py index 7f97643dc..40bffa7db 100644 --- a/surfsense_backend/app/tasks/chat/stream_new_chat.py +++ b/surfsense_backend/app/tasks/chat/stream_new_chat.py @@ -5,6 +5,7 @@ This module streams responses from the deep agent using the Vercel AI SDK Data Stream Protocol (SSE format). """ +import json from collections.abc import AsyncGenerator from uuid import UUID @@ -73,12 +74,14 @@ async def stream_new_chat( # Create connector service connector_service = ConnectorService(session, search_space_id=search_space_id) - # Create the deep agent + # Create the deep agent with podcast capability agent = create_surfsense_deep_agent( llm=llm, search_space_id=search_space_id, db_session=session, connector_service=connector_service, + user_id=str(user_id), + enable_podcast=True, ) # Build input with just the current user query @@ -162,22 +165,72 @@ async def stream_new_chat( f"Searching knowledge base: {query[:100]}{'...' if len(query) > 100 else ''}", "info", ) + elif tool_name == "generate_podcast": + title = ( + tool_input.get("podcast_title", "SurfSense Podcast") + if isinstance(tool_input, dict) + else "SurfSense Podcast" + ) + yield streaming_service.format_terminal_info( + f"Generating podcast: {title}", + "info", + ) elif event_type == "on_tool_end": run_id = event.get("run_id", "") - tool_output = event.get("data", {}).get("output", "") + tool_name = event.get("name", "unknown_tool") + raw_output = event.get("data", {}).get("output", "") + + # Extract content from ToolMessage if needed + # LangGraph may return a ToolMessage object instead of raw dict + if hasattr(raw_output, "content"): + # It's a ToolMessage object - extract the content + content = raw_output.content + # If content is a string that looks like JSON, try to parse it + if isinstance(content, str): + try: + tool_output = json.loads(content) + except (json.JSONDecodeError, TypeError): + tool_output = {"result": content} + elif isinstance(content, dict): + tool_output = content + else: + tool_output = {"result": str(content)} + elif isinstance(raw_output, dict): + tool_output = raw_output + else: + tool_output = {"result": str(raw_output) if raw_output else "completed"} tool_call_id = f"call_{run_id[:32]}" if run_id else "call_unknown" - # Don't stream the full output (can be very large), just acknowledge - yield streaming_service.format_tool_output_available( - tool_call_id, - {"status": "completed", "result_length": len(str(tool_output))}, - ) - - yield streaming_service.format_terminal_info( - "Knowledge base search completed", "success" - ) + # Handle different tool outputs + if tool_name == "generate_podcast": + # Stream the full podcast result so frontend can render the audio player + yield streaming_service.format_tool_output_available( + tool_call_id, + tool_output if isinstance(tool_output, dict) else {"result": tool_output}, + ) + # Send appropriate terminal message based on status + if isinstance(tool_output, dict) and tool_output.get("status") == "success": + yield streaming_service.format_terminal_info( + f"Podcast generated successfully: {tool_output.get('title', 'Podcast')}", + "success", + ) + else: + error_msg = tool_output.get("error", "Unknown error") if isinstance(tool_output, dict) else "Unknown error" + yield streaming_service.format_terminal_info( + f"Podcast generation failed: {error_msg}", + "error", + ) + else: + # Don't stream the full output for other tools (can be very large), just acknowledge + yield streaming_service.format_tool_output_available( + tool_call_id, + {"status": "completed", "result_length": len(str(tool_output))}, + ) + yield streaming_service.format_terminal_info( + "Knowledge base search completed", "success" + ) # Handle chain/agent end to close any open text blocks elif event_type in ("on_chain_end", "on_agent_end"): diff --git a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx index 86ed4974f..cd28a26fa 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx @@ -4,6 +4,7 @@ import { AssistantRuntimeProvider, useLocalRuntime } from "@assistant-ui/react"; import { useParams } from "next/navigation"; import { useMemo } from "react"; import { Thread } from "@/components/assistant-ui/thread"; +import { GeneratePodcastToolUI } from "@/components/tool-ui/generate-podcast"; import { createNewChatAdapter } from "@/lib/chat/new-chat-transport"; export default function NewChatPage() { @@ -38,6 +39,8 @@ export default function NewChatPage() { return ( + {/* Register tool UI components */} +
diff --git a/surfsense_web/components/tool-ui/audio.tsx b/surfsense_web/components/tool-ui/audio.tsx new file mode 100644 index 000000000..8adf8c498 --- /dev/null +++ b/surfsense_web/components/tool-ui/audio.tsx @@ -0,0 +1,310 @@ +"use client"; + +import { DownloadIcon, PauseIcon, PlayIcon, Volume2Icon, VolumeXIcon } from "lucide-react"; +import Image from "next/image"; +import { useCallback, useEffect, useRef, useState } from "react"; +import { Button } from "@/components/ui/button"; +import { Slider } from "@/components/ui/slider"; +import { cn } from "@/lib/utils"; + +interface AudioProps { + id: string; + assetId?: string; + src: string; + title: string; + description?: string; + artwork?: string; + durationMs?: number; + className?: string; +} + +function formatTime(seconds: number): string { + if (!Number.isFinite(seconds) || seconds < 0) return "0:00"; + const mins = Math.floor(seconds / 60); + const secs = Math.floor(seconds % 60); + return `${mins}:${secs.toString().padStart(2, "0")}`; +} + +export function Audio({ + id, + src, + title, + description, + artwork, + durationMs, + className, +}: AudioProps) { + const audioRef = useRef(null); + const [isPlaying, setIsPlaying] = useState(false); + const [currentTime, setCurrentTime] = useState(0); + const [duration, setDuration] = useState(durationMs ? durationMs / 1000 : 0); + const [volume, setVolume] = useState(1); + const [isMuted, setIsMuted] = useState(false); + const [isLoading, setIsLoading] = useState(true); + const [error, setError] = useState(null); + + // Handle play/pause + const togglePlayPause = useCallback(() => { + const audio = audioRef.current; + if (!audio) return; + + if (isPlaying) { + audio.pause(); + } else { + audio.play().catch((err) => { + console.error("Error playing audio:", err); + setError("Failed to play audio"); + }); + } + }, [isPlaying]); + + // Handle seek + const handleSeek = useCallback((value: number[]) => { + const audio = audioRef.current; + if (!audio || !Number.isFinite(value[0])) return; + audio.currentTime = value[0]; + setCurrentTime(value[0]); + }, []); + + // Handle volume change + const handleVolumeChange = useCallback((value: number[]) => { + const audio = audioRef.current; + if (!audio || !Number.isFinite(value[0])) return; + const newVolume = value[0]; + audio.volume = newVolume; + setVolume(newVolume); + setIsMuted(newVolume === 0); + }, []); + + // Toggle mute + const toggleMute = useCallback(() => { + const audio = audioRef.current; + if (!audio) return; + + if (isMuted) { + audio.volume = volume || 1; + setIsMuted(false); + } else { + audio.volume = 0; + setIsMuted(true); + } + }, [isMuted, volume]); + + // Handle download + const handleDownload = useCallback(async () => { + try { + const response = await fetch(src); + const blob = await response.blob(); + const url = window.URL.createObjectURL(blob); + const a = document.createElement("a"); + a.href = url; + a.download = `${title.replace(/[^a-zA-Z0-9]/g, "_")}.mp3`; + document.body.appendChild(a); + a.click(); + document.body.removeChild(a); + window.URL.revokeObjectURL(url); + } catch (err) { + console.error("Error downloading audio:", err); + } + }, [src, title]); + + // Set up audio event listeners + useEffect(() => { + const audio = audioRef.current; + if (!audio) return; + + const handleLoadedMetadata = () => { + setDuration(audio.duration); + setIsLoading(false); + }; + + const handleTimeUpdate = () => { + setCurrentTime(audio.currentTime); + }; + + const handlePlay = () => setIsPlaying(true); + const handlePause = () => setIsPlaying(false); + const handleEnded = () => { + setIsPlaying(false); + setCurrentTime(0); + }; + const handleError = () => { + setError("Failed to load audio"); + setIsLoading(false); + }; + const handleCanPlay = () => setIsLoading(false); + + audio.addEventListener("loadedmetadata", handleLoadedMetadata); + audio.addEventListener("timeupdate", handleTimeUpdate); + audio.addEventListener("play", handlePlay); + audio.addEventListener("pause", handlePause); + audio.addEventListener("ended", handleEnded); + audio.addEventListener("error", handleError); + audio.addEventListener("canplay", handleCanPlay); + + return () => { + audio.removeEventListener("loadedmetadata", handleLoadedMetadata); + audio.removeEventListener("timeupdate", handleTimeUpdate); + audio.removeEventListener("play", handlePlay); + audio.removeEventListener("pause", handlePause); + audio.removeEventListener("ended", handleEnded); + audio.removeEventListener("error", handleError); + audio.removeEventListener("canplay", handleCanPlay); + }; + }, []); + + if (error) { + return ( +
+
+ +
+
+

{title}

+

{error}

+
+
+ ); + } + + return ( +
+ {/* Hidden audio element */} + + +
+ {/* Artwork */} +
+
+ {artwork ? ( + {title} + ) : ( +
+ +
+ )} + {/* Play overlay on artwork */} + +
+
+ + {/* Content */} +
+ {/* Title and description */} +
+

{title}

+ {description && ( +

+ {description} +

+ )} +
+ + {/* Progress bar */} +
+ +
+ {formatTime(currentTime)} + {formatTime(duration)} +
+
+
+
+ + {/* Controls */} +
+
+ {/* Play/Pause button */} + + + {/* Volume control */} +
+ + +
+
+ + {/* Download button */} + +
+
+ ); +} + diff --git a/surfsense_web/components/tool-ui/generate-podcast.tsx b/surfsense_web/components/tool-ui/generate-podcast.tsx new file mode 100644 index 000000000..0aa50aea2 --- /dev/null +++ b/surfsense_web/components/tool-ui/generate-podcast.tsx @@ -0,0 +1,288 @@ +"use client"; + +import { makeAssistantToolUI } from "@assistant-ui/react"; +import { AlertCircleIcon, Loader2Icon, MicIcon } from "lucide-react"; +import { useCallback, useEffect, useRef, useState } from "react"; +import { Audio } from "@/components/tool-ui/audio"; +import { podcastsApiService } from "@/lib/apis/podcasts-api.service"; + +/** + * Type definitions for the generate_podcast tool + */ +interface GeneratePodcastArgs { + source_content: string; + podcast_title?: string; + user_prompt?: string; +} + +interface GeneratePodcastResult { + status: "success" | "error"; + podcast_id?: number; + title?: string; + transcript?: string; + duration_ms?: number; + transcript_entries?: number; + error?: string; +} + +/** + * Loading state component shown while podcast is being generated + */ +function PodcastGeneratingState({ title }: { title: string }) { + return ( +
+
+
+
+ +
+ {/* Animated rings */} +
+
+
+

{title}

+
+ + Generating podcast... This may take a few minutes +
+
+
+
+
+
+
+
+
+ ); +} + +/** + * Error state component shown when podcast generation fails + */ +function PodcastErrorState({ title, error }: { title: string; error: string }) { + return ( +
+
+
+ +
+
+

{title}

+

Failed to generate podcast

+

{error}

+
+
+
+ ); +} + +/** + * Audio loading state component + */ +function AudioLoadingState({ title }: { title: string }) { + return ( +
+
+
+ +
+
+

{title}

+
+ + Loading audio... +
+
+
+
+ ); +} + +/** + * Podcast Player Component - Fetches audio with authentication + */ +function PodcastPlayer({ + podcastId, + title, + description, + durationMs, + transcript, + transcriptEntries, +}: { + podcastId: number; + title: string; + description: string; + durationMs?: number; + transcript?: string; + transcriptEntries?: number; +}) { + const [audioSrc, setAudioSrc] = useState(null); + const [isLoading, setIsLoading] = useState(true); + const [error, setError] = useState(null); + const objectUrlRef = useRef(null); + + // Cleanup object URL on unmount + useEffect(() => { + return () => { + if (objectUrlRef.current) { + URL.revokeObjectURL(objectUrlRef.current); + } + }; + }, []); + + // Fetch audio with authentication + const loadAudio = useCallback(async () => { + setIsLoading(true); + setError(null); + + try { + // Revoke previous object URL if exists + if (objectUrlRef.current) { + URL.revokeObjectURL(objectUrlRef.current); + objectUrlRef.current = null; + } + + const controller = new AbortController(); + const timeoutId = setTimeout(() => controller.abort(), 60000); // 60s timeout + + try { + // Fetch audio blob with authentication + const response = await podcastsApiService.loadPodcast({ + request: { id: podcastId }, + controller, + }); + + // Create object URL from blob + const objectUrl = URL.createObjectURL(response); + objectUrlRef.current = objectUrl; + setAudioSrc(objectUrl); + } finally { + clearTimeout(timeoutId); + } + } catch (err) { + console.error("Error loading podcast audio:", err); + if (err instanceof DOMException && err.name === "AbortError") { + setError("Request timed out. Please try again."); + } else { + setError(err instanceof Error ? err.message : "Failed to load audio"); + } + } finally { + setIsLoading(false); + } + }, [podcastId]); + + // Load audio when component mounts + useEffect(() => { + loadAudio(); + }, [loadAudio]); + + if (isLoading) { + return ; + } + + if (error || !audioSrc) { + return ; + } + + return ( +
+
+ ); +} + +/** + * Generate Podcast Tool UI Component + * + * This component is registered with assistant-ui to render custom UI + * when the generate_podcast tool is called by the agent. + * + * It fetches the podcast audio with authentication (like the old system) + * and displays it using the Audio component. + */ +export const GeneratePodcastToolUI = makeAssistantToolUI< + GeneratePodcastArgs, + GeneratePodcastResult +>({ + toolName: "generate_podcast", + render: function GeneratePodcastUI({ args, result, status }) { + const title = args.podcast_title || "SurfSense Podcast"; + + // Loading state - podcast is being generated + if (status.type === "running" || status.type === "requires-action") { + return ; + } + + // Incomplete/cancelled state + if (status.type === "incomplete") { + if (status.reason === "cancelled") { + return ( +
+

+ + Podcast generation cancelled +

+
+ ); + } + if (status.reason === "error") { + return ( + + ); + } + } + + // No result yet + if (!result) { + return ; + } + + // Error result + if (result.status === "error") { + return ; + } + + // Success - need podcast_id to fetch with auth + if (!result.podcast_id) { + return ; + } + + // Render the podcast player (handles auth fetch internally) + return ( + + ); + }, +}); + diff --git a/surfsense_web/components/tool-ui/index.ts b/surfsense_web/components/tool-ui/index.ts new file mode 100644 index 000000000..4007a39fe --- /dev/null +++ b/surfsense_web/components/tool-ui/index.ts @@ -0,0 +1,11 @@ +/** + * Tool UI Components + * + * This module exports custom UI components for assistant tools. + * These components are registered with assistant-ui to render + * rich UI when specific tools are called by the agent. + */ + +export { Audio } from "./audio"; +export { GeneratePodcastToolUI } from "./generate-podcast"; + diff --git a/surfsense_web/lib/chat/new-chat-transport.ts b/surfsense_web/lib/chat/new-chat-transport.ts index 5be9278d9..74897bd8e 100644 --- a/surfsense_web/lib/chat/new-chat-transport.ts +++ b/surfsense_web/lib/chat/new-chat-transport.ts @@ -11,6 +11,22 @@ interface NewChatAdapterConfig { chatId: number; } +/** + * Represents an in-progress or completed tool call + */ +interface ToolCallState { + toolCallId: string; + toolName: string; + args: Record; + result?: unknown; +} + +/** + * Tools that should render custom UI in the chat. + * Other tools (like search_knowledge_base) will be hidden from the UI. + */ +const TOOLS_WITH_UI = new Set(["generate_podcast"]); + /** * Creates a ChatModelAdapter that connects to the FastAPI new_chat endpoint. * @@ -77,6 +93,41 @@ export function createNewChatAdapter(config: NewChatAdapterConfig): ChatModelAda let buffer = ""; let accumulatedText = ""; + // Track tool calls by their ID + const toolCalls = new Map(); + + /** + * Build the content array with text and tool calls. + * Only includes tools that have custom UI (defined in TOOLS_WITH_UI). + */ + function buildContent() { + const content: Array< + | { type: "text"; text: string } + | { type: "tool-call"; toolCallId: string; toolName: string; args: Record; result?: unknown } + > = []; + + // Add text content if any + if (accumulatedText) { + content.push({ type: "text" as const, text: accumulatedText }); + } + + // Only add tool calls that have custom UI registered + // Other tools (like search_knowledge_base) are hidden from the UI + for (const toolCall of toolCalls.values()) { + if (TOOLS_WITH_UI.has(toolCall.toolName)) { + content.push({ + type: "tool-call" as const, + toolCallId: toolCall.toolCallId, + toolName: toolCall.toolName, + args: toolCall.args, + result: toolCall.result, + }); + } + } + + return content; + } + try { while (true) { const { done, value } = await reader.read(); @@ -113,16 +164,56 @@ export function createNewChatAdapter(config: NewChatAdapterConfig): ChatModelAda switch (parsed.type) { case "text-delta": accumulatedText += parsed.delta; - yield { - content: [{ type: "text" as const, text: accumulatedText }], - }; + yield { content: buildContent() }; break; + case "tool-input-start": { + // Tool call is starting - create a new tool call entry + const { toolCallId, toolName } = parsed; + toolCalls.set(toolCallId, { + toolCallId, + toolName, + args: {}, + }); + // Yield to show tool is starting (running state) + yield { content: buildContent() }; + break; + } + + case "tool-input-available": { + // Tool input is complete - update the args + const { toolCallId, toolName, input } = parsed; + const existing = toolCalls.get(toolCallId); + if (existing) { + existing.args = input || {}; + } else { + // Create new entry if we missed tool-input-start + toolCalls.set(toolCallId, { + toolCallId, + toolName, + args: input || {}, + }); + } + yield { content: buildContent() }; + break; + } + + case "tool-output-available": { + // Tool execution is complete - add the result + const { toolCallId, output } = parsed; + const existing = toolCalls.get(toolCallId); + if (existing) { + existing.result = output; + } + yield { content: buildContent() }; + break; + } + case "error": throw new Error(parsed.errorText || "Unknown error from server"); - // Other types like text-start, text-end, tool-*, etc. - // are handled implicitly - we just accumulate text deltas + // Other types like text-start, text-end, start-step, finish-step, etc. + // are handled implicitly default: break; } @@ -148,9 +239,14 @@ export function createNewChatAdapter(config: NewChatAdapterConfig): ChatModelAda const parsed = JSON.parse(data); if (parsed.type === "text-delta") { accumulatedText += parsed.delta; - yield { - content: [{ type: "text" as const, text: accumulatedText }], - }; + yield { content: buildContent() }; + } else if (parsed.type === "tool-output-available") { + const { toolCallId, output } = parsed; + const existing = toolCalls.get(toolCallId); + if (existing) { + existing.result = output; + } + yield { content: buildContent() }; } } catch { // Ignore parse errors From e79e1187b2593b1225200b90cfb1dc9662700386 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Sun, 21 Dec 2025 19:35:00 +0530 Subject: [PATCH 2/5] feat: implement background podcast generation with Celery and task polling in UI --- .../app/agents/new_chat/podcast.py | 117 +++---------- .../app/routes/podcasts_routes.py | 63 +++++++ .../app/tasks/celery_tasks/podcast_tasks.py | 133 +++++++++++++++ .../components/tool-ui/generate-podcast.tsx | 156 +++++++++++++----- 4 files changed, 335 insertions(+), 134 deletions(-) diff --git a/surfsense_backend/app/agents/new_chat/podcast.py b/surfsense_backend/app/agents/new_chat/podcast.py index ed4116bfb..2205227b1 100644 --- a/surfsense_backend/app/agents/new_chat/podcast.py +++ b/surfsense_backend/app/agents/new_chat/podcast.py @@ -2,8 +2,8 @@ Podcast generation tool for the new chat agent. This module provides a factory function for creating the generate_podcast tool -that integrates with the existing podcaster agent. Podcasts are saved to the -database like the old system, providing authentication and persistence. +that submits a Celery task for background podcast generation. The frontend +polls for completion and auto-updates when the podcast is ready. """ from typing import Any @@ -11,10 +11,6 @@ from typing import Any from langchain_core.tools import tool from sqlalchemy.ext.asyncio import AsyncSession -from app.agents.podcaster.graph import graph as podcaster_graph -from app.agents.podcaster.state import State as PodcasterState -from app.db import Podcast - def create_generate_podcast_tool( search_space_id: int, @@ -26,7 +22,7 @@ def create_generate_podcast_tool( Args: search_space_id: The user's search space ID - db_session: Database session + db_session: Database session (not used - Celery creates its own) user_id: The user's ID (as string) Returns: @@ -50,8 +46,8 @@ def create_generate_podcast_tool( - "Make a podcast about..." - "Turn this into a podcast" - The tool will generate a complete audio podcast with two speakers - discussing the provided content in an engaging conversational format. + The tool will start generating a podcast in the background. + The podcast will be available once generation completes. Args: source_content: The text content to convert into a podcast. @@ -63,108 +59,43 @@ def create_generate_podcast_tool( Returns: A dictionary containing: - - status: "success" or "error" - - podcast_id: The database ID of the saved podcast (for API access) + - status: "processing" (task submitted) or "error" + - task_id: The Celery task ID for polling status - title: The podcast title - - transcript: Full podcast transcript with all dialogue entries - - duration_ms: Estimated podcast duration in milliseconds - - transcript_entries: Number of dialogue entries """ try: - # Configure the podcaster graph - config = { - "configurable": { - "podcast_title": podcast_title, - "user_id": str(user_id), - "search_space_id": search_space_id, - "user_prompt": user_prompt, - } - } + # Import Celery task here to avoid circular imports + from app.tasks.celery_tasks.podcast_tasks import ( + generate_content_podcast_task, + ) - # Initialize the podcaster state with the source content - initial_state = PodcasterState( + # Submit Celery task for background processing + task = generate_content_podcast_task.delay( source_content=source_content, - db_session=db_session, - ) - - # Run the podcaster graph - result = await podcaster_graph.ainvoke(initial_state, config=config) - - # Extract results - podcast_transcript = result.get("podcast_transcript", []) - file_path = result.get("final_podcast_file_path", "") - - # Calculate estimated duration (rough estimate: ~150 words per minute) - total_words = sum( - len(entry.dialog.split()) if hasattr(entry, "dialog") else len(entry.get("dialog", "").split()) - for entry in podcast_transcript - ) - estimated_duration_ms = int((total_words / 150) * 60 * 1000) - - # Create full transcript for display (all entries, complete dialog) - full_transcript = [] - for entry in podcast_transcript: - if hasattr(entry, "speaker_id"): - speaker = f"Speaker {entry.speaker_id + 1}" - dialog = entry.dialog - else: - speaker = f"Speaker {entry.get('speaker_id', 0) + 1}" - dialog = entry.get("dialog", "") - full_transcript.append(f"{speaker}: {dialog}") - - # Convert podcast transcript entries to serializable format (like old system) - serializable_transcript = [] - for entry in podcast_transcript: - if hasattr(entry, "speaker_id"): - serializable_transcript.append({ - "speaker_id": entry.speaker_id, - "dialog": entry.dialog - }) - else: - serializable_transcript.append({ - "speaker_id": entry.get("speaker_id", 0), - "dialog": entry.get("dialog", "") - }) - - # Save podcast to database (like old system) - # This provides authentication and persistence - podcast = Podcast( - title=podcast_title, - podcast_transcript=serializable_transcript, - file_location=file_path, search_space_id=search_space_id, - # chat_id is None since new-chat uses LangGraph threads, not DB chats - chat_id=None, - chat_state_version=None, + user_id=str(user_id), + podcast_title=podcast_title, + user_prompt=user_prompt, ) - db_session.add(podcast) - await db_session.commit() - await db_session.refresh(podcast) - # Return podcast_id - frontend will use it to call the API endpoint - # GET /api/v1/podcasts/{podcast_id}/stream (like the old system) + print(f"[generate_podcast] Submitted Celery task: {task.id}") + + # Return immediately with task_id for polling return { - "status": "success", - "podcast_id": podcast.id, + "status": "processing", + "task_id": task.id, "title": podcast_title, - "transcript": "\n\n".join(full_transcript), - "duration_ms": estimated_duration_ms, - "transcript_entries": len(podcast_transcript), + "message": "Podcast generation started. This may take a few minutes.", } except Exception as e: error_message = str(e) - print(f"[generate_podcast] Error: {error_message}") - # Rollback on error - await db_session.rollback() + print(f"[generate_podcast] Error submitting task: {error_message}") return { "status": "error", "error": error_message, "title": podcast_title, - "podcast_id": None, - "duration_ms": 0, - "transcript_entries": 0, + "task_id": None, } return generate_podcast - diff --git a/surfsense_backend/app/routes/podcasts_routes.py b/surfsense_backend/app/routes/podcasts_routes.py index deb9d9744..904de20a3 100644 --- a/surfsense_backend/app/routes/podcasts_routes.py +++ b/surfsense_backend/app/routes/podcasts_routes.py @@ -444,3 +444,66 @@ async def get_podcast_by_chat_id( raise HTTPException( status_code=500, detail=f"Error fetching podcast: {e!s}" ) from e + + +@router.get("/podcasts/task/{task_id}/status") +async def get_podcast_task_status( + task_id: str, + user: User = Depends(current_active_user), +): + """ + Get the status of a podcast generation task. + Used by new-chat frontend to poll for completion. + + Returns: + - status: "processing" | "success" | "error" + - podcast_id: (only if status == "success") + - title: (only if status == "success") + - error: (only if status == "error") + """ + try: + from celery.result import AsyncResult + + from app.celery_app import celery_app + + result = AsyncResult(task_id, app=celery_app) + + if result.ready(): + # Task completed + if result.successful(): + task_result = result.result + if isinstance(task_result, dict): + if task_result.get("status") == "success": + return { + "status": "success", + "podcast_id": task_result.get("podcast_id"), + "title": task_result.get("title"), + "transcript_entries": task_result.get("transcript_entries"), + } + else: + return { + "status": "error", + "error": task_result.get("error", "Unknown error"), + } + else: + return { + "status": "error", + "error": "Unexpected task result format", + } + else: + # Task failed + return { + "status": "error", + "error": str(result.result) if result.result else "Task failed", + } + else: + # Task still processing + return { + "status": "processing", + "state": result.state, + } + + except Exception as e: + raise HTTPException( + status_code=500, detail=f"Error checking task status: {e!s}" + ) from e diff --git a/surfsense_backend/app/tasks/celery_tasks/podcast_tasks.py b/surfsense_backend/app/tasks/celery_tasks/podcast_tasks.py index 65cdb886b..994f67be7 100644 --- a/surfsense_backend/app/tasks/celery_tasks/podcast_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/podcast_tasks.py @@ -11,6 +11,11 @@ from app.celery_app import celery_app from app.config import config from app.tasks.podcast_tasks import generate_chat_podcast +# Import for content-based podcast (new-chat) +from app.agents.podcaster.graph import graph as podcaster_graph +from app.agents.podcaster.state import State as PodcasterState +from app.db import Podcast + logger = logging.getLogger(__name__) if sys.platform.startswith("win"): @@ -86,3 +91,131 @@ async def _generate_chat_podcast( except Exception as e: logger.error(f"Error generating podcast from chat: {e!s}") raise + + +# ============================================================================= +# Content-based podcast generation (for new-chat) +# ============================================================================= + + +@celery_app.task(name="generate_content_podcast", bind=True) +def generate_content_podcast_task( + self, + source_content: str, + search_space_id: int, + user_id: str, + podcast_title: str = "SurfSense Podcast", + user_prompt: str | None = None, +) -> dict: + """ + Celery task to generate podcast from source content (for new-chat). + + Unlike generate_chat_podcast which requires a chat_id, this task + generates a podcast directly from provided content. + + Args: + source_content: The text content to convert into a podcast + search_space_id: ID of the search space + user_id: ID of the user (as string) + podcast_title: Title for the podcast + user_prompt: Optional instructions for podcast style/tone + + Returns: + dict with podcast_id on success, or error info on failure + """ + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + result = loop.run_until_complete( + _generate_content_podcast( + source_content, + search_space_id, + user_id, + podcast_title, + user_prompt, + ) + ) + loop.run_until_complete(loop.shutdown_asyncgens()) + return result + except Exception as e: + logger.error(f"Error generating content podcast: {e!s}") + return {"status": "error", "error": str(e)} + finally: + asyncio.set_event_loop(None) + loop.close() + + +async def _generate_content_podcast( + source_content: str, + search_space_id: int, + user_id: str, + podcast_title: str = "SurfSense Podcast", + user_prompt: str | None = None, +) -> dict: + """Generate content-based podcast with new session.""" + async with get_celery_session_maker()() as session: + try: + # Configure the podcaster graph + graph_config = { + "configurable": { + "podcast_title": podcast_title, + "user_id": str(user_id), + "search_space_id": search_space_id, + "user_prompt": user_prompt, + } + } + + # Initialize the podcaster state with the source content + initial_state = PodcasterState( + source_content=source_content, + db_session=session, + ) + + # Run the podcaster graph + result = await podcaster_graph.ainvoke(initial_state, config=graph_config) + + # Extract results + podcast_transcript = result.get("podcast_transcript", []) + file_path = result.get("final_podcast_file_path", "") + + # Convert transcript to serializable format + serializable_transcript = [] + for entry in podcast_transcript: + if hasattr(entry, "speaker_id"): + serializable_transcript.append({ + "speaker_id": entry.speaker_id, + "dialog": entry.dialog + }) + else: + serializable_transcript.append({ + "speaker_id": entry.get("speaker_id", 0), + "dialog": entry.get("dialog", "") + }) + + # Save podcast to database + podcast = Podcast( + title=podcast_title, + podcast_transcript=serializable_transcript, + file_location=file_path, + search_space_id=search_space_id, + chat_id=None, # No chat_id for new-chat podcasts + chat_state_version=None, + ) + session.add(podcast) + await session.commit() + await session.refresh(podcast) + + logger.info(f"Successfully generated content podcast: {podcast.id}") + + return { + "status": "success", + "podcast_id": podcast.id, + "title": podcast_title, + "transcript_entries": len(serializable_transcript), + } + + except Exception as e: + logger.error(f"Error in _generate_content_podcast: {e!s}") + await session.rollback() + raise diff --git a/surfsense_web/components/tool-ui/generate-podcast.tsx b/surfsense_web/components/tool-ui/generate-podcast.tsx index 0aa50aea2..fc40d85b9 100644 --- a/surfsense_web/components/tool-ui/generate-podcast.tsx +++ b/surfsense_web/components/tool-ui/generate-podcast.tsx @@ -4,6 +4,7 @@ import { makeAssistantToolUI } from "@assistant-ui/react"; import { AlertCircleIcon, Loader2Icon, MicIcon } from "lucide-react"; import { useCallback, useEffect, useRef, useState } from "react"; import { Audio } from "@/components/tool-ui/audio"; +import { baseApiService } from "@/lib/apis/base-api.service"; import { podcastsApiService } from "@/lib/apis/podcasts-api.service"; /** @@ -16,12 +17,21 @@ interface GeneratePodcastArgs { } interface GeneratePodcastResult { - status: "success" | "error"; + status: "processing" | "success" | "error"; + task_id?: string; podcast_id?: number; title?: string; - transcript?: string; - duration_ms?: number; transcript_entries?: number; + message?: string; + error?: string; +} + +interface TaskStatusResponse { + status: "processing" | "success" | "error"; + podcast_id?: number; + title?: string; + transcript_entries?: number; + state?: string; error?: string; } @@ -106,15 +116,11 @@ function PodcastPlayer({ title, description, durationMs, - transcript, - transcriptEntries, }: { podcastId: number; title: string; description: string; durationMs?: number; - transcript?: string; - transcriptEntries?: number; }) { const [audioSrc, setAudioSrc] = useState(null); const [isLoading, setIsLoading] = useState(true); @@ -194,29 +200,96 @@ function PodcastPlayer({ durationMs={durationMs} className="w-full" /> - {/* Full transcript */} - {transcript && ( -
- - View full transcript{transcriptEntries ? ` (${transcriptEntries} entries)` : ""} - -
-						{transcript}
-					
-
- )}
); } +/** + * Polling component that checks task status and shows player when complete + */ +function PodcastTaskPoller({ + taskId, + title, +}: { + taskId: string; + title: string; +}) { + const [taskStatus, setTaskStatus] = useState({ status: "processing" }); + const [pollCount, setPollCount] = useState(0); + const pollingRef = useRef(null); + + // Poll for task status + useEffect(() => { + const pollStatus = async () => { + try { + const response = await baseApiService.get( + `/api/v1/podcasts/task/${taskId}/status` + ); + setTaskStatus(response); + + // Stop polling if task is complete or errored + if (response.status !== "processing") { + if (pollingRef.current) { + clearInterval(pollingRef.current); + pollingRef.current = null; + } + } + } catch (err) { + console.error("Error polling task status:", err); + // Don't stop polling on network errors, just increment count + } + setPollCount((prev) => prev + 1); + }; + + // Initial poll + pollStatus(); + + // Poll every 5 seconds + pollingRef.current = setInterval(pollStatus, 5000); + + return () => { + if (pollingRef.current) { + clearInterval(pollingRef.current); + } + }; + }, [taskId]); + + // Show loading state while processing + if (taskStatus.status === "processing") { + return ; + } + + // Show error state + if (taskStatus.status === "error") { + return ; + } + + // Show player when complete + if (taskStatus.status === "success" && taskStatus.podcast_id) { + return ( + + ); + } + + // Fallback + return ; +} + /** * Generate Podcast Tool UI Component * * This component is registered with assistant-ui to render custom UI * when the generate_podcast tool is called by the agent. - * - * It fetches the podcast audio with authentication (like the old system) - * and displays it using the Audio component. + * + * It polls for task completion and auto-updates when the podcast is ready. */ export const GeneratePodcastToolUI = makeAssistantToolUI< GeneratePodcastArgs, @@ -226,7 +299,7 @@ export const GeneratePodcastToolUI = makeAssistantToolUI< render: function GeneratePodcastUI({ args, result, status }) { const title = args.podcast_title || "SurfSense Podcast"; - // Loading state - podcast is being generated + // Loading state - tool is still running (agent processing) if (status.type === "running" || status.type === "requires-action") { return ; } @@ -263,26 +336,27 @@ export const GeneratePodcastToolUI = makeAssistantToolUI< return ; } - // Success - need podcast_id to fetch with auth - if (!result.podcast_id) { - return ; + // Processing - poll for completion + if (result.status === "processing" && result.task_id) { + return ; } - // Render the podcast player (handles auth fetch internally) - return ( - - ); + // Success with podcast_id (direct result, not via polling) + if (result.status === "success" && result.podcast_id) { + return ( + + ); + } + + // Fallback - missing required data + return ; }, }); - From 783ee9c154e562edad567196d8a83d6a77aa9d8b Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Sun, 21 Dec 2025 20:07:04 +0530 Subject: [PATCH 3/5] feat: enhance podcast generation with duplicate request prevention and improved UI feedback --- .../app/agents/new_chat/podcast.py | 77 ++++++++++++++++++- .../app/agents/new_chat/system_prompt.py | 18 ++++- .../app/tasks/celery_tasks/podcast_tasks.py | 18 +++++ .../components/tool-ui/generate-podcast.tsx | 41 +++++++++- surfsense_web/lib/chat/new-chat-transport.ts | 48 ++++++++++++ surfsense_web/lib/chat/podcast-state.ts | 74 ++++++++++++++++++ 6 files changed, 269 insertions(+), 7 deletions(-) create mode 100644 surfsense_web/lib/chat/podcast-state.ts diff --git a/surfsense_backend/app/agents/new_chat/podcast.py b/surfsense_backend/app/agents/new_chat/podcast.py index 2205227b1..d57d0fb21 100644 --- a/surfsense_backend/app/agents/new_chat/podcast.py +++ b/surfsense_backend/app/agents/new_chat/podcast.py @@ -4,13 +4,67 @@ Podcast generation tool for the new chat agent. This module provides a factory function for creating the generate_podcast tool that submits a Celery task for background podcast generation. The frontend polls for completion and auto-updates when the podcast is ready. + +Duplicate request prevention: +- Only one podcast can be generated at a time per search space +- Uses Redis to track active podcast tasks +- Returns a friendly message if a podcast is already being generated """ +import os from typing import Any +import redis from langchain_core.tools import tool from sqlalchemy.ext.asyncio import AsyncSession +# Redis connection for tracking active podcast tasks +# Uses the same Redis instance as Celery +REDIS_URL = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0") +_redis_client: redis.Redis | None = None + + +def get_redis_client() -> redis.Redis: + """Get or create Redis client for podcast task tracking.""" + global _redis_client + if _redis_client is None: + _redis_client = redis.from_url(REDIS_URL, decode_responses=True) + return _redis_client + + +def get_active_podcast_key(search_space_id: int) -> str: + """Generate Redis key for tracking active podcast task.""" + return f"podcast:active:{search_space_id}" + + +def get_active_podcast_task(search_space_id: int) -> str | None: + """Check if there's an active podcast task for this search space.""" + try: + client = get_redis_client() + return client.get(get_active_podcast_key(search_space_id)) + except Exception: + # If Redis is unavailable, allow the request (fail open) + return None + + +def set_active_podcast_task(search_space_id: int, task_id: str) -> None: + """Mark a podcast task as active for this search space.""" + try: + client = get_redis_client() + # Set with 30-minute expiry as safety net (podcast should complete before this) + client.setex(get_active_podcast_key(search_space_id), 1800, task_id) + except Exception as e: + print(f"[generate_podcast] Warning: Could not set active task in Redis: {e}") + + +def clear_active_podcast_task(search_space_id: int) -> None: + """Clear the active podcast task for this search space.""" + try: + client = get_redis_client() + client.delete(get_active_podcast_key(search_space_id)) + except Exception as e: + print(f"[generate_podcast] Warning: Could not clear active task in Redis: {e}") + def create_generate_podcast_tool( search_space_id: int, @@ -49,6 +103,10 @@ def create_generate_podcast_tool( The tool will start generating a podcast in the background. The podcast will be available once generation completes. + IMPORTANT: Only one podcast can be generated at a time. If a podcast + is already being generated, this tool will return a message asking + the user to wait. + Args: source_content: The text content to convert into a podcast. This can be a summary, research findings, or any text @@ -59,11 +117,23 @@ def create_generate_podcast_tool( Returns: A dictionary containing: - - status: "processing" (task submitted) or "error" - - task_id: The Celery task ID for polling status + - status: "processing" (task submitted), "already_generating", or "error" + - task_id: The Celery task ID for polling status (if processing) - title: The podcast title + - message: Status message for the user """ try: + # Check if a podcast is already being generated for this search space + active_task_id = get_active_podcast_task(search_space_id) + if active_task_id: + print(f"[generate_podcast] Blocked duplicate request. Active task: {active_task_id}") + return { + "status": "already_generating", + "task_id": active_task_id, + "title": podcast_title, + "message": "A podcast is already being generated. Please wait for it to complete before requesting another one.", + } + # Import Celery task here to avoid circular imports from app.tasks.celery_tasks.podcast_tasks import ( generate_content_podcast_task, @@ -78,6 +148,9 @@ def create_generate_podcast_tool( user_prompt=user_prompt, ) + # Mark this task as active + set_active_podcast_task(search_space_id, task.id) + print(f"[generate_podcast] Submitted Celery task: {task.id}") # Return immediately with task_id for polling diff --git a/surfsense_backend/app/agents/new_chat/system_prompt.py b/surfsense_backend/app/agents/new_chat/system_prompt.py index fbd18cb0b..f725be684 100644 --- a/surfsense_backend/app/agents/new_chat/system_prompt.py +++ b/surfsense_backend/app/agents/new_chat/system_prompt.py @@ -135,10 +135,16 @@ You have access to the following tools: - Use this when the user asks to create, generate, or make a podcast. - Trigger phrases: "give me a podcast about", "create a podcast", "generate a podcast", "make a podcast", "turn this into a podcast" - Args: - - source_content: The text content to convert into a podcast (e.g., a summary, research findings, or conversation) + - source_content: The text content to convert into a podcast. This MUST be comprehensive and include: + * If discussing the current conversation: Include a detailed summary of the FULL chat history (all user questions and your responses) + * If based on knowledge base search: Include the key findings and insights from the search results + * You can combine both: conversation context + search results for richer podcasts + * The more detailed the source_content, the better the podcast quality - podcast_title: Optional title for the podcast (default: "SurfSense Podcast") - user_prompt: Optional instructions for podcast style/format (e.g., "Make it casual and fun") - - Returns: A podcast with audio that the user can listen to and download + - Returns: A task_id for tracking. The podcast will be generated in the background. + - IMPORTANT: Only one podcast can be generated at a time. If a podcast is already being generated, the tool will return status "already_generating". + - After calling this tool, inform the user that podcast generation has started and they will see the player when it's ready (takes 3-5 minutes). - User: "Fetch all my notes and what's in them?" @@ -148,10 +154,14 @@ You have access to the following tools: - Call: `search_knowledge_base(query="React migration", connectors_to_search=["SLACK_CONNECTOR"], start_date="YYYY-MM-DD", end_date="YYYY-MM-DD")` - User: "Give me a podcast about AI trends based on what we discussed" - - First search for relevant content, then call: `generate_podcast(source_content="[summarized content from search]", podcast_title="AI Trends Podcast")` + - First search for relevant content, then call: `generate_podcast(source_content="Based on our conversation and search results: [detailed summary of chat + search findings]", podcast_title="AI Trends Podcast")` - User: "Create a podcast summary of this conversation" - - Call: `generate_podcast(source_content="[summary of the conversation so far]", podcast_title="Conversation Summary")` + - Call: `generate_podcast(source_content="Complete conversation summary:\n\nUser asked about [topic 1]:\n[Your detailed response]\n\nUser then asked about [topic 2]:\n[Your detailed response]\n\n[Continue for all exchanges in the conversation]", podcast_title="Conversation Summary")` + +- User: "Make a podcast about quantum computing" + - First search: `search_knowledge_base(query="quantum computing")` + - Then: `generate_podcast(source_content="Key insights about quantum computing from the knowledge base:\n\n[Comprehensive summary of all relevant search results with key facts, concepts, and findings]", podcast_title="Quantum Computing Explained")` {citation_section} """ diff --git a/surfsense_backend/app/tasks/celery_tasks/podcast_tasks.py b/surfsense_backend/app/tasks/celery_tasks/podcast_tasks.py index 994f67be7..1abfba193 100644 --- a/surfsense_backend/app/tasks/celery_tasks/podcast_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/podcast_tasks.py @@ -98,6 +98,22 @@ async def _generate_chat_podcast( # ============================================================================= +def _clear_active_podcast_redis_key(search_space_id: int) -> None: + """Clear the active podcast task key from Redis when task completes.""" + import os + + import redis + + try: + redis_url = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0") + client = redis.from_url(redis_url, decode_responses=True) + key = f"podcast:active:{search_space_id}" + client.delete(key) + logger.info(f"Cleared active podcast key for search_space_id={search_space_id}") + except Exception as e: + logger.warning(f"Could not clear active podcast key: {e}") + + @celery_app.task(name="generate_content_podcast", bind=True) def generate_content_podcast_task( self, @@ -142,6 +158,8 @@ def generate_content_podcast_task( logger.error(f"Error generating content podcast: {e!s}") return {"status": "error", "error": str(e)} finally: + # Always clear the active podcast key when task completes (success or failure) + _clear_active_podcast_redis_key(search_space_id) asyncio.set_event_loop(None) loop.close() diff --git a/surfsense_web/components/tool-ui/generate-podcast.tsx b/surfsense_web/components/tool-ui/generate-podcast.tsx index fc40d85b9..3b4c75a35 100644 --- a/surfsense_web/components/tool-ui/generate-podcast.tsx +++ b/surfsense_web/components/tool-ui/generate-podcast.tsx @@ -6,6 +6,10 @@ import { useCallback, useEffect, useRef, useState } from "react"; import { Audio } from "@/components/tool-ui/audio"; import { baseApiService } from "@/lib/apis/base-api.service"; import { podcastsApiService } from "@/lib/apis/podcasts-api.service"; +import { + clearActivePodcastTaskId, + setActivePodcastTaskId, +} from "@/lib/chat/podcast-state"; /** * Type definitions for the generate_podcast tool @@ -17,7 +21,7 @@ interface GeneratePodcastArgs { } interface GeneratePodcastResult { - status: "processing" | "success" | "error"; + status: "processing" | "already_generating" | "success" | "error"; task_id?: string; podcast_id?: number; title?: string; @@ -218,6 +222,17 @@ function PodcastTaskPoller({ const [pollCount, setPollCount] = useState(0); const pollingRef = useRef(null); + // Set active podcast state when this component mounts + useEffect(() => { + setActivePodcastTaskId(taskId); + + // Clear when component unmounts + return () => { + // Only clear if this task is still the active one + clearActivePodcastTaskId(); + }; + }, [taskId]); + // Poll for task status useEffect(() => { const pollStatus = async () => { @@ -233,6 +248,8 @@ function PodcastTaskPoller({ clearInterval(pollingRef.current); pollingRef.current = null; } + // Clear the active podcast state when task completes + clearActivePodcastTaskId(); } } catch (err) { console.error("Error polling task status:", err); @@ -336,6 +353,28 @@ export const GeneratePodcastToolUI = makeAssistantToolUI< return ; } + // Already generating - show simple warning, don't create another poller + // The FIRST tool call will display the podcast when ready + if (result.status === "already_generating") { + return ( +
+
+
+ +
+
+

+ Podcast already in progress +

+

+ Please wait for the current podcast to complete. +

+
+
+
+ ); + } + // Processing - poll for completion if (result.status === "processing" && result.task_id) { return ; diff --git a/surfsense_web/lib/chat/new-chat-transport.ts b/surfsense_web/lib/chat/new-chat-transport.ts index 74897bd8e..4f607c984 100644 --- a/surfsense_web/lib/chat/new-chat-transport.ts +++ b/surfsense_web/lib/chat/new-chat-transport.ts @@ -4,7 +4,13 @@ */ import type { ChatModelAdapter, ChatModelRunOptions } from "@assistant-ui/react"; +import { toast } from "sonner"; import { getBearerToken } from "@/lib/auth-utils"; +import { + isPodcastGenerating, + looksLikePodcastRequest, + setActivePodcastTaskId, +} from "@/lib/chat/podcast-state"; interface NewChatAdapterConfig { searchSpaceId: number; @@ -59,6 +65,21 @@ export function createNewChatAdapter(config: NewChatAdapterConfig): ChatModelAda throw new Error("User query cannot be empty"); } + // Check if user is requesting a podcast while one is already generating + if (isPodcastGenerating() && looksLikePodcastRequest(userQuery)) { + toast.warning("A podcast is already being generated. Please wait for it to complete."); + // Return a message telling the user to wait + yield { + content: [ + { + type: "text", + text: "A podcast is already being generated. Please wait for it to complete before requesting another one.", + }, + ], + }; + return; + } + const token = getBearerToken(); if (!token) { throw new Error("Not authenticated. Please log in again."); @@ -204,6 +225,20 @@ export function createNewChatAdapter(config: NewChatAdapterConfig): ChatModelAda const existing = toolCalls.get(toolCallId); if (existing) { existing.result = output; + + // If this is a podcast tool with status="processing", set the state immediately + // This ensures subsequent podcast requests are intercepted + if ( + existing.toolName === "generate_podcast" && + output && + typeof output === "object" && + "status" in output && + output.status === "processing" && + "task_id" in output && + typeof output.task_id === "string" + ) { + setActivePodcastTaskId(output.task_id); + } } yield { content: buildContent() }; break; @@ -245,6 +280,19 @@ export function createNewChatAdapter(config: NewChatAdapterConfig): ChatModelAda const existing = toolCalls.get(toolCallId); if (existing) { existing.result = output; + + // Set podcast state if processing + if ( + existing.toolName === "generate_podcast" && + output && + typeof output === "object" && + "status" in output && + output.status === "processing" && + "task_id" in output && + typeof output.task_id === "string" + ) { + setActivePodcastTaskId(output.task_id); + } } yield { content: buildContent() }; } diff --git a/surfsense_web/lib/chat/podcast-state.ts b/surfsense_web/lib/chat/podcast-state.ts new file mode 100644 index 000000000..782a31bf6 --- /dev/null +++ b/surfsense_web/lib/chat/podcast-state.ts @@ -0,0 +1,74 @@ +/** + * Module-level state for tracking active podcast generation. + * Used by the new-chat adapter to prevent duplicate podcast requests. + */ + +type PodcastStateListener = (isGenerating: boolean) => void; + +let _activePodcastTaskId: string | null = null; +const _listeners: Set = new Set(); + +/** + * Check if a podcast is currently being generated + */ +export function isPodcastGenerating(): boolean { + return _activePodcastTaskId !== null; +} + +/** + * Get the active podcast task ID + */ +export function getActivePodcastTaskId(): string | null { + return _activePodcastTaskId; +} + +/** + * Set the active podcast task ID (when podcast generation starts) + */ +export function setActivePodcastTaskId(taskId: string): void { + _activePodcastTaskId = taskId; + notifyListeners(); +} + +/** + * Clear the active podcast task ID (when podcast generation completes or errors) + */ +export function clearActivePodcastTaskId(): void { + _activePodcastTaskId = null; + notifyListeners(); +} + +/** + * Subscribe to podcast state changes + */ +export function subscribeToPodcastState(listener: PodcastStateListener): () => void { + _listeners.add(listener); + return () => { + _listeners.delete(listener); + }; +} + +function notifyListeners(): void { + const isGenerating = _activePodcastTaskId !== null; + for (const listener of _listeners) { + listener(isGenerating); + } +} + +/** + * Check if a message looks like a podcast request + */ +export function looksLikePodcastRequest(message: string): boolean { + const podcastPatterns = [ + /\bpodcast\b/i, + /\bcreate.*podcast\b/i, + /\bgenerate.*podcast\b/i, + /\bmake.*podcast\b/i, + /\bturn.*into.*podcast\b/i, + /\bpodcast.*about\b/i, + /\bgive.*podcast\b/i, + ]; + + return podcastPatterns.some((pattern) => pattern.test(message)); +} + From 4f2c9caac244d617e8c7475931bdcd0460515631 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Sun, 21 Dec 2025 20:25:27 +0530 Subject: [PATCH 4/5] feat: add transcript fetching and display in podcast player component --- .../components/tool-ui/generate-podcast.tsx | 69 ++++++++++++++----- .../lib/apis/podcasts-api.service.ts | 7 ++ 2 files changed, 58 insertions(+), 18 deletions(-) diff --git a/surfsense_web/components/tool-ui/generate-podcast.tsx b/surfsense_web/components/tool-ui/generate-podcast.tsx index 3b4c75a35..e11273e41 100644 --- a/surfsense_web/components/tool-ui/generate-podcast.tsx +++ b/surfsense_web/components/tool-ui/generate-podcast.tsx @@ -51,7 +51,7 @@ function PodcastGeneratingState({ title }: { title: string }) {
{/* Animated rings */} -
+

{title}

@@ -113,7 +113,15 @@ function AudioLoadingState({ title }: { title: string }) { } /** - * Podcast Player Component - Fetches audio with authentication + * Transcript entry type from the database + */ +interface TranscriptEntry { + speaker_id: number; + dialog: string; +} + +/** + * Podcast Player Component - Fetches audio and transcript with authentication */ function PodcastPlayer({ podcastId, @@ -127,6 +135,7 @@ function PodcastPlayer({ durationMs?: number; }) { const [audioSrc, setAudioSrc] = useState(null); + const [transcript, setTranscript] = useState(null); const [isLoading, setIsLoading] = useState(true); const [error, setError] = useState(null); const objectUrlRef = useRef(null); @@ -140,8 +149,8 @@ function PodcastPlayer({ }; }, []); - // Fetch audio with authentication - const loadAudio = useCallback(async () => { + // Fetch audio and podcast details (including transcript) + const loadPodcast = useCallback(async () => { setIsLoading(true); setError(null); @@ -156,35 +165,43 @@ function PodcastPlayer({ const timeoutId = setTimeout(() => controller.abort(), 60000); // 60s timeout try { - // Fetch audio blob with authentication - const response = await podcastsApiService.loadPodcast({ - request: { id: podcastId }, - controller, - }); + // Fetch audio blob and podcast details in parallel + const [audioBlob, podcastDetails] = await Promise.all([ + podcastsApiService.loadPodcast({ + request: { id: podcastId }, + controller, + }), + podcastsApiService.getPodcastById(podcastId), + ]); // Create object URL from blob - const objectUrl = URL.createObjectURL(response); + const objectUrl = URL.createObjectURL(audioBlob); objectUrlRef.current = objectUrl; setAudioSrc(objectUrl); + + // Set transcript from podcast details + if (podcastDetails?.podcast_transcript) { + setTranscript(podcastDetails.podcast_transcript as TranscriptEntry[]); + } } finally { clearTimeout(timeoutId); } } catch (err) { - console.error("Error loading podcast audio:", err); + console.error("Error loading podcast:", err); if (err instanceof DOMException && err.name === "AbortError") { setError("Request timed out. Please try again."); } else { - setError(err instanceof Error ? err.message : "Failed to load audio"); + setError(err instanceof Error ? err.message : "Failed to load podcast"); } } finally { setIsLoading(false); } }, [podcastId]); - // Load audio when component mounts + // Load podcast when component mounts useEffect(() => { - loadAudio(); - }, [loadAudio]); + loadPodcast(); + }, [loadPodcast]); if (isLoading) { return ; @@ -204,6 +221,24 @@ function PodcastPlayer({ durationMs={durationMs} className="w-full" /> + {/* Transcript section */} + {transcript && transcript.length > 0 && ( +
+ + View transcript ({transcript.length} entries) + +
+ {transcript.map((entry, idx) => ( +
+ + Speaker {entry.speaker_id + 1}: + {" "} + {entry.dialog} +
+ ))} +
+
+ )}
); } @@ -219,7 +254,6 @@ function PodcastTaskPoller({ title: string; }) { const [taskStatus, setTaskStatus] = useState({ status: "processing" }); - const [pollCount, setPollCount] = useState(0); const pollingRef = useRef(null); // Set active podcast state when this component mounts @@ -253,9 +287,8 @@ function PodcastTaskPoller({ } } catch (err) { console.error("Error polling task status:", err); - // Don't stop polling on network errors, just increment count + // Don't stop polling on network errors, continue polling } - setPollCount((prev) => prev + 1); }; // Initial poll diff --git a/surfsense_web/lib/apis/podcasts-api.service.ts b/surfsense_web/lib/apis/podcasts-api.service.ts index 346c984af..0defff7d4 100644 --- a/surfsense_web/lib/apis/podcasts-api.service.ts +++ b/surfsense_web/lib/apis/podcasts-api.service.ts @@ -62,6 +62,13 @@ class PodcastsApiService { ); }; + /** + * Get a podcast by its ID (includes full transcript) + */ + getPodcastById = async (podcastId: number) => { + return baseApiService.get(`/api/v1/podcasts/${podcastId}`, podcast); + }; + generatePodcast = async (request: GeneratePodcastRequest) => { // Validate the request const parsedRequest = generatePodcastRequest.safeParse(request); From bc189a53cf5627954b7d59e1d84e3075c37aebcc Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Sun, 21 Dec 2025 20:37:30 +0530 Subject: [PATCH 5/5] refactor: replace TranscriptEntry interface with PodcastTranscriptEntry type for improved type safety in podcast player --- .../components/tool-ui/generate-podcast.tsx | 15 ++++----------- surfsense_web/contracts/types/podcast.types.ts | 8 +++++++- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/surfsense_web/components/tool-ui/generate-podcast.tsx b/surfsense_web/components/tool-ui/generate-podcast.tsx index e11273e41..459307cb6 100644 --- a/surfsense_web/components/tool-ui/generate-podcast.tsx +++ b/surfsense_web/components/tool-ui/generate-podcast.tsx @@ -10,6 +10,7 @@ import { clearActivePodcastTaskId, setActivePodcastTaskId, } from "@/lib/chat/podcast-state"; +import type { PodcastTranscriptEntry } from "@/contracts/types/podcast.types"; /** * Type definitions for the generate_podcast tool @@ -51,7 +52,7 @@ function PodcastGeneratingState({ title }: { title: string }) {
{/* Animated rings */} -
+

{title}

@@ -112,14 +113,6 @@ function AudioLoadingState({ title }: { title: string }) { ); } -/** - * Transcript entry type from the database - */ -interface TranscriptEntry { - speaker_id: number; - dialog: string; -} - /** * Podcast Player Component - Fetches audio and transcript with authentication */ @@ -135,7 +128,7 @@ function PodcastPlayer({ durationMs?: number; }) { const [audioSrc, setAudioSrc] = useState(null); - const [transcript, setTranscript] = useState(null); + const [transcript, setTranscript] = useState(null); const [isLoading, setIsLoading] = useState(true); const [error, setError] = useState(null); const objectUrlRef = useRef(null); @@ -181,7 +174,7 @@ function PodcastPlayer({ // Set transcript from podcast details if (podcastDetails?.podcast_transcript) { - setTranscript(podcastDetails.podcast_transcript as TranscriptEntry[]); + setTranscript(podcastDetails.podcast_transcript); } } finally { clearTimeout(timeoutId); diff --git a/surfsense_web/contracts/types/podcast.types.ts b/surfsense_web/contracts/types/podcast.types.ts index 7bc5faece..dc7c35ba4 100644 --- a/surfsense_web/contracts/types/podcast.types.ts +++ b/surfsense_web/contracts/types/podcast.types.ts @@ -1,12 +1,17 @@ import { z } from "zod"; import { paginationQueryParams } from "."; +export const podcastTranscriptEntry = z.object({ + speaker_id: z.number(), + dialog: z.string(), +}); + export const podcast = z.object({ id: z.number(), title: z.string(), created_at: z.string(), file_location: z.string(), - podcast_transcript: z.array(z.any()), + podcast_transcript: z.array(podcastTranscriptEntry), search_space_id: z.number(), chat_state_version: z.number().nullable(), }); @@ -41,6 +46,7 @@ export const getPodcastsRequest = z.object({ queryParams: paginationQueryParams.nullish(), }); +export type PodcastTranscriptEntry = z.infer; export type GeneratePodcastRequest = z.infer; export type GetPodcastByChatIdRequest = z.infer; export type GetPodcastByChatIdResponse = z.infer;