feat: add Circleback connector

DESKTOP-RTLN3BA\$punk 2025-12-30 09:00:59 -08:00
parent 23870042f3
commit c19d300c9d
27 changed files with 1153 additions and 97 deletions

View file

@@ -268,3 +268,105 @@ async def _process_file_upload(
)
logger.error(error_message)
raise


@celery_app.task(name="process_circleback_meeting", bind=True)
def process_circleback_meeting_task(
self,
meeting_id: int,
meeting_name: str,
markdown_content: str,
metadata: dict,
search_space_id: int,
):
"""
Celery task to process Circleback meeting webhook data.

Args:
meeting_id: Circleback meeting ID
meeting_name: Name of the meeting
markdown_content: Meeting content formatted as markdown
metadata: Meeting metadata dictionary
search_space_id: ID of the search space
"""
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
_process_circleback_meeting(
meeting_id,
meeting_name,
markdown_content,
metadata,
search_space_id,
)
)
finally:
loop.close()
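
# Editor's note (not part of this commit): asyncio.run(_process_circleback_meeting(...))
# would be an equivalent, more compact way to drive the coroutine. The explicit
# new_event_loop()/close() pair above makes the loop lifecycle visible inside the
# Celery worker process, where no event loop is running by default.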
async def _process_circleback_meeting(
meeting_id: int,
meeting_name: str,
markdown_content: str,
metadata: dict,
search_space_id: int,
):
"""Process Circleback meeting with new session."""
from app.tasks.document_processors.circleback_processor import (
add_circleback_meeting_document,
)
async with get_celery_session_maker()() as session:
task_logger = TaskLoggingService(session, search_space_id)
log_entry = await task_logger.log_task_start(
task_name="process_circleback_meeting",
source="circleback_webhook",
message=f"Starting Circleback meeting processing: {meeting_name}",
metadata={
"document_type": "CIRCLEBACK",
"meeting_id": meeting_id,
"meeting_name": meeting_name,
**metadata,
},
)
try:
result = await add_circleback_meeting_document(
session=session,
meeting_id=meeting_id,
meeting_name=meeting_name,
markdown_content=markdown_content,
metadata=metadata,
search_space_id=search_space_id,
)
if result:
await task_logger.log_task_success(
log_entry,
f"Successfully processed Circleback meeting: {meeting_name}",
{
"document_id": result.id,
"meeting_id": meeting_id,
"content_hash": result.content_hash,
},
)
else:
await task_logger.log_task_success(
log_entry,
f"Circleback meeting document already exists (duplicate): {meeting_name}",
{"duplicate_detected": True, "meeting_id": meeting_id},
)
except Exception as e:
await task_logger.log_task_failure(
log_entry,
f"Failed to process Circleback meeting: {meeting_name}",
str(e),
{"error_type": type(e).__name__, "meeting_id": meeting_id},
)
logger.error(f"Error processing Circleback meeting: {e!s}")
raise
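
For orientation, here is a minimal sketch of how a webhook endpoint might enqueue this task. The route path and payload field names below are assumptions for illustration, not part of this commit:

from fastapi import APIRouter, Request

router = APIRouter()

@router.post("/webhooks/circleback/{search_space_id}")
async def circleback_webhook(search_space_id: int, request: Request):
    payload = await request.json()
    # .delay() enqueues the task on the Celery broker; the worker then runs
    # the async helper in a fresh event loop, as shown above.
    process_circleback_meeting_task.delay(
        meeting_id=payload["id"],  # assumed payload shape
        meeting_name=payload.get("name", "Untitled meeting"),
        markdown_content=payload["markdown"],  # assumed field name
        metadata=payload.get("metadata", {}),
        search_space_id=search_space_id,
    )
    return {"status": "queued"}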

View file

@@ -34,8 +34,8 @@ from .discord_indexer import index_discord_messages
from .elasticsearch_indexer import index_elasticsearch_documents
from .github_indexer import index_github_repos
from .google_calendar_indexer import index_google_calendar_events
from .google_gmail_indexer import index_google_gmail_messages
from .google_drive_indexer import index_google_drive_files
from .google_gmail_indexer import index_google_gmail_messages
from .jira_indexer import index_jira_issues
# Issue tracking and project management

View file

@@ -1,7 +1,6 @@
"""Google Drive indexer using Surfsense file processors."""
import logging
from datetime import datetime
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
@@ -99,11 +98,15 @@ async def index_google_drive_files(
target_folder_id = folder_id
target_folder_name = folder_name or "Selected Folder"
logger.info(f"Indexing Google Drive folder: {target_folder_name} ({target_folder_id})")
logger.info(
f"Indexing Google Drive folder: {target_folder_name} ({target_folder_id})"
)
folder_tokens = connector.config.get("folder_tokens", {})
start_page_token = folder_tokens.get(target_folder_id)
can_use_delta_sync = use_delta_sync and start_page_token and connector.last_indexed_at
can_use_delta_sync = (
use_delta_sync and start_page_token and connector.last_indexed_at
)
if can_use_delta_sync:
logger.info(f"Using delta sync for connector {connector_id}")
@@ -151,9 +154,7 @@ async def index_google_drive_files(
await update_connector_last_indexed(session, connector, update_last_indexed)
await session.commit()
logger.info(
f"Successfully committed Google Drive indexing changes to database"
)
logger.info("Successfully committed Google Drive indexing changes to database")
await task_logger.log_task_success(
log_entry,
@@ -252,7 +253,9 @@ async def _index_full_scan(
if documents_indexed % 10 == 0 and documents_indexed > 0:
await session.commit()
logger.info(f"Committed batch: {documents_indexed} files indexed so far")
logger.info(
f"Committed batch: {documents_indexed} files indexed so far"
)
page_token = next_token
if not page_token:
@@ -391,9 +394,7 @@ async def _process_single_file(
return 0, 1
async def _remove_document(
session: AsyncSession, file_id: str, search_space_id: int
):
async def _remove_document(session: AsyncSession, file_id: str, search_space_id: int):
"""Remove a document that was deleted in Drive."""
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.GOOGLE_DRIVE_FILE, file_id, search_space_id
@@ -406,5 +407,3 @@ async def _remove_document(
if existing_document:
await session.delete(existing_document)
logger.info(f"Removed deleted file document: {file_id}")

View file

@@ -0,0 +1,183 @@
"""
Circleback meeting document processor.
This module processes meeting data received from Circleback webhooks
and stores it as searchable documents in the database.
"""
import logging
from typing import Any
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import Document, DocumentType
from app.services.llm_service import get_document_summary_llm
from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from .base import (
check_document_by_unique_identifier,
get_current_timestamp,
)
logger = logging.getLogger(__name__)


async def add_circleback_meeting_document(
session: AsyncSession,
meeting_id: int,
meeting_name: str,
markdown_content: str,
metadata: dict[str, Any],
search_space_id: int,
) -> Document | None:
"""
Process and store a Circleback meeting document.

Args:
session: Database session
meeting_id: Circleback meeting ID
meeting_name: Name of the meeting
markdown_content: Meeting content formatted as markdown
metadata: Meeting metadata dictionary
search_space_id: ID of the search space

Returns:
Document object if successful, None if failed or duplicate
"""
try:
# Generate unique identifier hash using Circleback meeting ID
unique_identifier = f"circleback_{meeting_id}"
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.CIRCLEBACK, unique_identifier, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(markdown_content, search_space_id)
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logger.info(f"Circleback meeting {meeting_id} unchanged. Skipping.")
return existing_document
else:
# Content has changed - update the existing document
logger.info(
f"Content changed for Circleback meeting {meeting_id}. Updating document."
)
# Get LLM for generating summary
llm = await get_document_summary_llm(session, search_space_id)
if not llm:
logger.warning(
f"No LLM configured for search space {search_space_id}. Using content as summary."
)
# Use first 1000 chars as summary if no LLM available
summary_content = (
markdown_content[:1000] + "..."
if len(markdown_content) > 1000
else markdown_content
)
summary_embedding = None
else:
# Generate summary with metadata
document_metadata = {
"meeting_name": meeting_name,
"meeting_id": meeting_id,
"document_type": "Circleback Meeting",
**{
k: v
for k, v in metadata.items()
if isinstance(v, str | int | float | bool)
},
}
summary_content, summary_embedding = await generate_document_summary(
markdown_content, llm, document_metadata
)
# Process chunks
chunks = await create_document_chunks(markdown_content)
# Convert to BlockNote JSON for editing capability
from app.utils.blocknote_converter import convert_markdown_to_blocknote
blocknote_json = await convert_markdown_to_blocknote(markdown_content)
if not blocknote_json:
logger.warning(
f"Failed to convert Circleback meeting {meeting_id} to BlockNote JSON, document will not be editable"
)
# Prepare document metadata
document_metadata = {
"CIRCLEBACK_MEETING_ID": meeting_id,
"MEETING_NAME": meeting_name,
"SOURCE": "CIRCLEBACK_WEBHOOK",
**metadata,
}
# Update or create document
if existing_document:
# Update existing document
existing_document.title = meeting_name
existing_document.content = summary_content
existing_document.content_hash = content_hash
if summary_embedding is not None:
existing_document.embedding = summary_embedding
existing_document.document_metadata = document_metadata
existing_document.chunks = chunks
existing_document.blocknote_document = blocknote_json
existing_document.content_needs_reindexing = False
existing_document.updated_at = get_current_timestamp()
await session.commit()
await session.refresh(existing_document)
document = existing_document
logger.info(
f"Updated Circleback meeting document {meeting_id} in search space {search_space_id}"
)
else:
# Create new document
document = Document(
search_space_id=search_space_id,
title=meeting_name,
document_type=DocumentType.CIRCLEBACK,
document_metadata=document_metadata,
content=summary_content,
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
blocknote_document=blocknote_json,
content_needs_reindexing=False,
updated_at=get_current_timestamp(),
)
session.add(document)
await session.commit()
await session.refresh(document)
logger.info(
f"Created new Circleback meeting document {meeting_id} in search space {search_space_id}"
)
return document
except SQLAlchemyError as db_error:
await session.rollback()
logger.error(
f"Database error processing Circleback meeting {meeting_id}: {db_error}"
)
raise db_error
except Exception as e:
await session.rollback()
logger.error(f"Failed to process Circleback meeting {meeting_id}: {e!s}")
raise RuntimeError(f"Failed to process Circleback meeting: {e!s}") from e
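
To make the deduplication flow above concrete, here is a hedged sketch of the two-hash scheme. The real generate_unique_identifier_hash and generate_content_hash live in app.utils.document_converters and may construct their digests differently; only the decision table is taken from the code above:

import hashlib

def sketch_hash(*parts) -> str:
    # Illustrative stand-in: a stable digest over the joined parts.
    return hashlib.sha256("|".join(str(p) for p in parts).encode()).hexdigest()

meeting_id, search_space_id = 42, 7
markdown_content = "# Weekly sync\n- decisions\n- action items"

# Identity is keyed on the Circleback meeting ID per search space...
unique_identifier_hash = sketch_hash("CIRCLEBACK", f"circleback_{meeting_id}", search_space_id)
# ...while content changes are detected by hashing the markdown body.
content_hash = sketch_hash(markdown_content, search_space_id)

# Matching identity, matching content -> return the existing document (skip)
# Matching identity, changed content  -> update the existing document in place
# Unknown identity                    -> create a new Document row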

View file

@@ -473,7 +473,8 @@ async def process_file_in_background(
session: AsyncSession,
task_logger: TaskLoggingService,
log_entry: Log,
connector: dict | None = None, # Optional: {"type": "GOOGLE_DRIVE_FILE", "metadata": {...}}
connector: dict
| None = None, # Optional: {"type": "GOOGLE_DRIVE_FILE", "metadata": {...}}
):
try:
# Check if the file is a markdown or text file
@@ -926,7 +927,9 @@ async def process_file_in_background(
)
if connector:
await _update_document_from_connector(last_created_doc, connector, session)
await _update_document_from_connector(
last_created_doc, connector, session
)
await task_logger.log_task_success(
log_entry,
@@ -1053,7 +1056,9 @@ async def process_file_in_background(
)
if connector:
await _update_document_from_connector(doc_result, connector, session)
await _update_document_from_connector(
doc_result, connector, session
)
await task_logger.log_task_success(
log_entry,
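
Finally, the new optional connector parameter threads connector provenance through generic file processing. A sketch of the expected shape, following the inline comment in the signature (the metadata keys are assumptions):

connector = {
    "type": "GOOGLE_DRIVE_FILE",
    "metadata": {
        "file_id": "1AbC...",  # assumed key
        "folder_id": "0XyZ...",  # assumed key
    },
}
# When provided, the new _update_document_from_connector(...) calls presumably
# apply this type and metadata to the freshly created document.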