feat: added celery and removed background_tasks for MQ's

- removed pre commit hooks
- updated docker setup
- updated github docker actions
- updated docs
This commit is contained in:
DESKTOP-RTLN3BA\$punk 2025-10-20 00:30:00 -07:00
parent 031dc055da
commit c80bbfa867
27 changed files with 1664 additions and 1038 deletions

View file

@ -0,0 +1 @@
"""Celery tasks package."""

View file

@ -0,0 +1,589 @@
"""Celery tasks for connector indexing."""
import logging
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.pool import NullPool
from app.celery_app import celery_app
from app.config import config
logger = logging.getLogger(__name__)
def get_celery_session_maker():
"""
Create a new async session maker for Celery tasks.
This is necessary because Celery tasks run in a new event loop,
and the default session maker is bound to the main app's event loop.
"""
engine = create_async_engine(
config.DATABASE_URL,
poolclass=NullPool, # Don't use connection pooling for Celery tasks
echo=False,
)
return async_sessionmaker(engine, expire_on_commit=False)
@celery_app.task(name="index_slack_messages", bind=True)
def index_slack_messages_task(
self,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Celery task to index Slack messages."""
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
_index_slack_messages(
connector_id, search_space_id, user_id, start_date, end_date
)
)
finally:
loop.close()
async def _index_slack_messages(
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Index Slack messages with new session."""
from app.routes.search_source_connectors_routes import (
run_slack_indexing,
)
async with get_celery_session_maker()() as session:
await run_slack_indexing(
session, connector_id, search_space_id, user_id, start_date, end_date
)
@celery_app.task(name="index_notion_pages", bind=True)
def index_notion_pages_task(
self,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Celery task to index Notion pages."""
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
_index_notion_pages(
connector_id, search_space_id, user_id, start_date, end_date
)
)
finally:
loop.close()
async def _index_notion_pages(
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Index Notion pages with new session."""
from app.routes.search_source_connectors_routes import (
run_notion_indexing,
)
async with get_celery_session_maker()() as session:
await run_notion_indexing(
session, connector_id, search_space_id, user_id, start_date, end_date
)
@celery_app.task(name="index_github_repos", bind=True)
def index_github_repos_task(
self,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Celery task to index GitHub repositories."""
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
_index_github_repos(
connector_id, search_space_id, user_id, start_date, end_date
)
)
finally:
loop.close()
async def _index_github_repos(
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Index GitHub repositories with new session."""
from app.routes.search_source_connectors_routes import (
run_github_indexing,
)
async with get_celery_session_maker()() as session:
await run_github_indexing(
session, connector_id, search_space_id, user_id, start_date, end_date
)
@celery_app.task(name="index_linear_issues", bind=True)
def index_linear_issues_task(
self,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Celery task to index Linear issues."""
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
_index_linear_issues(
connector_id, search_space_id, user_id, start_date, end_date
)
)
finally:
loop.close()
async def _index_linear_issues(
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Index Linear issues with new session."""
from app.routes.search_source_connectors_routes import (
run_linear_indexing,
)
async with get_celery_session_maker()() as session:
await run_linear_indexing(
session, connector_id, search_space_id, user_id, start_date, end_date
)
@celery_app.task(name="index_jira_issues", bind=True)
def index_jira_issues_task(
self,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Celery task to index Jira issues."""
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
_index_jira_issues(
connector_id, search_space_id, user_id, start_date, end_date
)
)
finally:
loop.close()
async def _index_jira_issues(
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Index Jira issues with new session."""
from app.routes.search_source_connectors_routes import (
run_jira_indexing,
)
async with get_celery_session_maker()() as session:
await run_jira_indexing(
session, connector_id, search_space_id, user_id, start_date, end_date
)
@celery_app.task(name="index_confluence_pages", bind=True)
def index_confluence_pages_task(
self,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Celery task to index Confluence pages."""
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
_index_confluence_pages(
connector_id, search_space_id, user_id, start_date, end_date
)
)
finally:
loop.close()
async def _index_confluence_pages(
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Index Confluence pages with new session."""
from app.routes.search_source_connectors_routes import (
run_confluence_indexing,
)
async with get_celery_session_maker()() as session:
await run_confluence_indexing(
session, connector_id, search_space_id, user_id, start_date, end_date
)
@celery_app.task(name="index_clickup_tasks", bind=True)
def index_clickup_tasks_task(
self,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Celery task to index ClickUp tasks."""
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
_index_clickup_tasks(
connector_id, search_space_id, user_id, start_date, end_date
)
)
finally:
loop.close()
async def _index_clickup_tasks(
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Index ClickUp tasks with new session."""
from app.routes.search_source_connectors_routes import (
run_clickup_indexing,
)
async with get_celery_session_maker()() as session:
await run_clickup_indexing(
session, connector_id, search_space_id, user_id, start_date, end_date
)
@celery_app.task(name="index_google_calendar_events", bind=True)
def index_google_calendar_events_task(
self,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Celery task to index Google Calendar events."""
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
_index_google_calendar_events(
connector_id, search_space_id, user_id, start_date, end_date
)
)
finally:
loop.close()
async def _index_google_calendar_events(
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Index Google Calendar events with new session."""
from app.routes.search_source_connectors_routes import (
run_google_calendar_indexing,
)
async with get_celery_session_maker()() as session:
await run_google_calendar_indexing(
session, connector_id, search_space_id, user_id, start_date, end_date
)
@celery_app.task(name="index_airtable_records", bind=True)
def index_airtable_records_task(
self,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Celery task to index Airtable records."""
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
_index_airtable_records(
connector_id, search_space_id, user_id, start_date, end_date
)
)
finally:
loop.close()
async def _index_airtable_records(
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Index Airtable records with new session."""
from app.routes.search_source_connectors_routes import (
run_airtable_indexing,
)
async with get_celery_session_maker()() as session:
await run_airtable_indexing(
session, connector_id, search_space_id, user_id, start_date, end_date
)
@celery_app.task(name="index_google_gmail_messages", bind=True)
def index_google_gmail_messages_task(
self,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Celery task to index Google Gmail messages."""
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
_index_google_gmail_messages(
connector_id, search_space_id, user_id, start_date, end_date
)
)
finally:
loop.close()
async def _index_google_gmail_messages(
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Index Google Gmail messages with new session."""
from app.routes.search_source_connectors_routes import (
run_google_gmail_indexing,
)
# Parse dates to get max_messages and days_back
# For now, we'll use default values
max_messages = 100
days_back = 30
async with get_celery_session_maker()() as session:
await run_google_gmail_indexing(
session, connector_id, search_space_id, user_id, max_messages, days_back
)
@celery_app.task(name="index_discord_messages", bind=True)
def index_discord_messages_task(
self,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Celery task to index Discord messages."""
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
_index_discord_messages(
connector_id, search_space_id, user_id, start_date, end_date
)
)
finally:
loop.close()
async def _index_discord_messages(
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Index Discord messages with new session."""
from app.routes.search_source_connectors_routes import (
run_discord_indexing,
)
async with get_celery_session_maker()() as session:
await run_discord_indexing(
session, connector_id, search_space_id, user_id, start_date, end_date
)
@celery_app.task(name="index_luma_events", bind=True)
def index_luma_events_task(
self,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Celery task to index Luma events."""
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
_index_luma_events(
connector_id, search_space_id, user_id, start_date, end_date
)
)
finally:
loop.close()
async def _index_luma_events(
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Index Luma events with new session."""
from app.routes.search_source_connectors_routes import (
run_luma_indexing,
)
async with get_celery_session_maker()() as session:
await run_luma_indexing(
session, connector_id, search_space_id, user_id, start_date, end_date
)
@celery_app.task(name="index_elasticsearch_documents", bind=True)
def index_elasticsearch_documents_task(
self,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Celery task to index Elasticsearch documents."""
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
_index_elasticsearch_documents(
connector_id, search_space_id, user_id, start_date, end_date
)
)
finally:
loop.close()
async def _index_elasticsearch_documents(
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Index Elasticsearch documents with new session."""
from app.routes.search_source_connectors_routes import (
run_elasticsearch_indexing,
)
async with get_celery_session_maker()() as session:
await run_elasticsearch_indexing(
session, connector_id, search_space_id, user_id, start_date, end_date
)

View file

@ -0,0 +1,318 @@
"""Celery tasks for document processing."""
import logging
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.pool import NullPool
from app.celery_app import celery_app
from app.config import config
from app.services.task_logging_service import TaskLoggingService
from app.tasks.document_processors import (
add_crawled_url_document,
add_extension_received_document,
add_youtube_video_document,
)
logger = logging.getLogger(__name__)
def get_celery_session_maker():
"""
Create a new async session maker for Celery tasks.
This is necessary because Celery tasks run in a new event loop,
and the default session maker is bound to the main app's event loop.
"""
engine = create_async_engine(
config.DATABASE_URL,
poolclass=NullPool, # Don't use connection pooling for Celery tasks
echo=False,
)
return async_sessionmaker(engine, expire_on_commit=False)
@celery_app.task(name="process_extension_document", bind=True)
def process_extension_document_task(
self, individual_document_dict, search_space_id: int, user_id: str
):
"""
Celery task to process extension document.
Args:
individual_document_dict: Document data as dictionary
search_space_id: ID of the search space
user_id: ID of the user
"""
import asyncio
# Create a new event loop for this task
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
_process_extension_document(
individual_document_dict, search_space_id, user_id
)
)
finally:
loop.close()
async def _process_extension_document(
individual_document_dict, search_space_id: int, user_id: str
):
"""Process extension document with new session."""
from pydantic import BaseModel
# Reconstruct the document object from dict
# You'll need to define the proper model for this
class DocumentMetadata(BaseModel):
VisitedWebPageTitle: str
VisitedWebPageURL: str
class IndividualDocument(BaseModel):
metadata: DocumentMetadata
content: str
individual_document = IndividualDocument(**individual_document_dict)
async with get_celery_session_maker()() as session:
task_logger = TaskLoggingService(session, search_space_id)
log_entry = await task_logger.log_task_start(
task_name="process_extension_document",
source="document_processor",
message=f"Starting processing of extension document from {individual_document.metadata.VisitedWebPageTitle}",
metadata={
"document_type": "EXTENSION",
"url": individual_document.metadata.VisitedWebPageURL,
"title": individual_document.metadata.VisitedWebPageTitle,
"user_id": user_id,
},
)
try:
result = await add_extension_received_document(
session, individual_document, search_space_id, user_id
)
if result:
await task_logger.log_task_success(
log_entry,
f"Successfully processed extension document: {individual_document.metadata.VisitedWebPageTitle}",
{"document_id": result.id, "content_hash": result.content_hash},
)
else:
await task_logger.log_task_success(
log_entry,
f"Extension document already exists (duplicate): {individual_document.metadata.VisitedWebPageTitle}",
{"duplicate_detected": True},
)
except Exception as e:
await task_logger.log_task_failure(
log_entry,
f"Failed to process extension document: {individual_document.metadata.VisitedWebPageTitle}",
str(e),
{"error_type": type(e).__name__},
)
logger.error(f"Error processing extension document: {e!s}")
raise
@celery_app.task(name="process_crawled_url", bind=True)
def process_crawled_url_task(self, url: str, search_space_id: int, user_id: str):
"""
Celery task to process crawled URL.
Args:
url: URL to crawl and process
search_space_id: ID of the search space
user_id: ID of the user
"""
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(_process_crawled_url(url, search_space_id, user_id))
finally:
loop.close()
async def _process_crawled_url(url: str, search_space_id: int, user_id: str):
"""Process crawled URL with new session."""
async with get_celery_session_maker()() as session:
task_logger = TaskLoggingService(session, search_space_id)
log_entry = await task_logger.log_task_start(
task_name="process_crawled_url",
source="document_processor",
message=f"Starting URL crawling and processing for: {url}",
metadata={"document_type": "CRAWLED_URL", "url": url, "user_id": user_id},
)
try:
result = await add_crawled_url_document(
session, url, search_space_id, user_id
)
if result:
await task_logger.log_task_success(
log_entry,
f"Successfully crawled and processed URL: {url}",
{
"document_id": result.id,
"title": result.title,
"content_hash": result.content_hash,
},
)
else:
await task_logger.log_task_success(
log_entry,
f"URL document already exists (duplicate): {url}",
{"duplicate_detected": True},
)
except Exception as e:
await task_logger.log_task_failure(
log_entry,
f"Failed to crawl URL: {url}",
str(e),
{"error_type": type(e).__name__},
)
logger.error(f"Error processing crawled URL: {e!s}")
raise
@celery_app.task(name="process_youtube_video", bind=True)
def process_youtube_video_task(self, url: str, search_space_id: int, user_id: str):
"""
Celery task to process YouTube video.
Args:
url: YouTube video URL
search_space_id: ID of the search space
user_id: ID of the user
"""
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(_process_youtube_video(url, search_space_id, user_id))
finally:
loop.close()
async def _process_youtube_video(url: str, search_space_id: int, user_id: str):
"""Process YouTube video with new session."""
async with get_celery_session_maker()() as session:
task_logger = TaskLoggingService(session, search_space_id)
log_entry = await task_logger.log_task_start(
task_name="process_youtube_video",
source="document_processor",
message=f"Starting YouTube video processing for: {url}",
metadata={"document_type": "YOUTUBE_VIDEO", "url": url, "user_id": user_id},
)
try:
result = await add_youtube_video_document(
session, url, search_space_id, user_id
)
if result:
await task_logger.log_task_success(
log_entry,
f"Successfully processed YouTube video: {result.title}",
{
"document_id": result.id,
"video_id": result.document_metadata.get("video_id"),
"content_hash": result.content_hash,
},
)
else:
await task_logger.log_task_success(
log_entry,
f"YouTube video document already exists (duplicate): {url}",
{"duplicate_detected": True},
)
except Exception as e:
await task_logger.log_task_failure(
log_entry,
f"Failed to process YouTube video: {url}",
str(e),
{"error_type": type(e).__name__},
)
logger.error(f"Error processing YouTube video: {e!s}")
raise
@celery_app.task(name="process_file_upload", bind=True)
def process_file_upload_task(
self, file_path: str, filename: str, search_space_id: int, user_id: str
):
"""
Celery task to process uploaded file.
Args:
file_path: Path to the uploaded file
filename: Original filename
search_space_id: ID of the search space
user_id: ID of the user
"""
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
_process_file_upload(file_path, filename, search_space_id, user_id)
)
finally:
loop.close()
async def _process_file_upload(
file_path: str, filename: str, search_space_id: int, user_id: str
):
"""Process file upload with new session."""
from app.routes.documents_routes import process_file_in_background
async with get_celery_session_maker()() as session:
task_logger = TaskLoggingService(session, search_space_id)
log_entry = await task_logger.log_task_start(
task_name="process_file_upload",
source="document_processor",
message=f"Starting file processing for: {filename}",
metadata={
"document_type": "FILE",
"filename": filename,
"file_path": file_path,
"user_id": user_id,
},
)
try:
await process_file_in_background(
file_path,
filename,
search_space_id,
user_id,
session,
task_logger,
log_entry,
)
except Exception as e:
await task_logger.log_task_failure(
log_entry,
f"Failed to process file: {filename}",
str(e),
{"error_type": type(e).__name__},
)
logger.error(f"Error processing file: {e!s}")
raise

View file

@ -0,0 +1,66 @@
"""Celery tasks for podcast generation."""
import logging
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.pool import NullPool
from app.celery_app import celery_app
from app.config import config
from app.tasks.podcast_tasks import generate_chat_podcast
logger = logging.getLogger(__name__)
def get_celery_session_maker():
"""
Create a new async session maker for Celery tasks.
This is necessary because Celery tasks run in a new event loop,
and the default session maker is bound to the main app's event loop.
"""
engine = create_async_engine(
config.DATABASE_URL,
poolclass=NullPool, # Don't use connection pooling for Celery tasks
echo=False,
)
return async_sessionmaker(engine, expire_on_commit=False)
@celery_app.task(name="generate_chat_podcast", bind=True)
def generate_chat_podcast_task(
self, chat_id: int, search_space_id: int, podcast_title: str, user_id: int
):
"""
Celery task to generate podcast from chat.
Args:
chat_id: ID of the chat to generate podcast from
search_space_id: ID of the search space
podcast_title: Title for the podcast
user_id: ID of the user
"""
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
_generate_chat_podcast(chat_id, search_space_id, podcast_title, user_id)
)
finally:
loop.close()
async def _generate_chat_podcast(
chat_id: int, search_space_id: int, podcast_title: str, user_id: int
):
"""Generate chat podcast with new session."""
async with get_celery_session_maker()() as session:
try:
await generate_chat_podcast(
session, chat_id, search_space_id, podcast_title, user_id
)
except Exception as e:
logger.error(f"Error generating podcast from chat: {e!s}")
raise