diff --git a/surfsense_backend/app/routes/public_chat_routes.py b/surfsense_backend/app/routes/public_chat_routes.py index 916a53249..ca70e911a 100644 --- a/surfsense_backend/app/routes/public_chat_routes.py +++ b/surfsense_backend/app/routes/public_chat_routes.py @@ -28,13 +28,13 @@ async def read_public_chat( Get a public chat by share token. No authentication required. - Returns sanitized content (citations stripped, non-UI tools removed). + Returns sanitized content (citations stripped). """ return await get_public_chat(session, share_token) @router.post("/{share_token}/clone", response_model=CloneInitiatedResponse) -async def clone_public_chat( +async def clone_public_chat_endpoint( share_token: str, session: AsyncSession = Depends(get_async_session), user: User = Depends(current_active_user), @@ -45,19 +45,20 @@ async def clone_public_chat( Requires authentication. Initiates a background job to copy the chat. """ + from app.tasks.celery_tasks.clone_chat_tasks import clone_public_chat_task + thread = await get_thread_by_share_token(session, share_token) if not thread: raise HTTPException(status_code=404, detail="Not found") - # TODO: Implement Celery task for cloning - # For now, return a placeholder response - # The actual implementation will: - # 1. Get user's default search space - # 2. Queue Celery task to clone thread, messages, and podcasts - # 3. Create notification on completion - - raise HTTPException( - status_code=501, - detail="Clone functionality not yet implemented", + task_result = clone_public_chat_task.delay( + share_token=share_token, + user_id=str(user.id), + ) + + return CloneInitiatedResponse( + status="processing", + task_id=task_result.id, + message="Copying chat to your account...", ) diff --git a/surfsense_backend/app/tasks/celery_tasks/clone_chat_tasks.py b/surfsense_backend/app/tasks/celery_tasks/clone_chat_tasks.py new file mode 100644 index 000000000..b846ee555 --- /dev/null +++ b/surfsense_backend/app/tasks/celery_tasks/clone_chat_tasks.py @@ -0,0 +1,66 @@ +"""Celery tasks for cloning public chats.""" + +import asyncio +import logging + +from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine +from sqlalchemy.pool import NullPool + +from app.celery_app import celery_app +from app.config import config + +logger = logging.getLogger(__name__) + + +def get_celery_session_maker(): + """Create a new async session maker for Celery tasks.""" + engine = create_async_engine( + config.DATABASE_URL, + poolclass=NullPool, + echo=False, + ) + return async_sessionmaker(engine, expire_on_commit=False) + + +@celery_app.task(name="clone_public_chat", bind=True) +def clone_public_chat_task( + self, + share_token: str, + user_id: str, +) -> dict: + """ + Celery task to clone a public chat to user's account. + + Args: + share_token: Public share token of the chat to clone + user_id: UUID string of the user cloning the chat + + Returns: + dict with status and thread_id on success, or error info on failure + """ + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + result = loop.run_until_complete(_run_clone(share_token, user_id)) + return result + except Exception as e: + logger.error(f"Error cloning public chat: {e!s}") + return {"status": "error", "error": str(e)} + finally: + asyncio.set_event_loop(None) + loop.close() + + +async def _run_clone(share_token: str, user_id: str) -> dict: + """Run the clone operation with a fresh database session.""" + from uuid import UUID + + from app.services.public_chat_service import clone_public_chat + + async with get_celery_session_maker()() as session: + return await clone_public_chat( + session=session, + share_token=share_token, + user_id=UUID(user_id), + )