Mirror of https://github.com/MODSetter/SurfSense.git, synced 2026-05-11 00:32:38 +02:00

merge: upstream/dev with migration renumbering

This commit is contained in commit a7145b2c63.
176 changed files with 8791 additions and 3608 deletions
@@ -810,8 +810,8 @@ def index_composio_connector_task(
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
start_date: str | None,
end_date: str | None,
):
"""Celery task to index Composio connector content (Google Drive, Gmail, Calendar via Composio)."""
import asyncio
@@ -833,14 +833,16 @@ async def _index_composio_connector(
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
start_date: str | None,
end_date: str | None,
):
"""Index Composio connector content with new session."""
# Import from tasks folder (not connector_indexers) to avoid circular import
from app.tasks.composio_indexer import index_composio_connector
"""Index Composio connector content with new session and real-time notifications."""
# Import from routes to use the notification-wrapped version
from app.routes.search_source_connectors_routes import (
run_composio_indexing,
)

async with get_celery_session_maker()() as session:
await index_composio_connector(
await run_composio_indexing(
session, connector_id, search_space_id, user_id, start_date, end_date
)
@@ -66,6 +66,7 @@ async def _check_and_trigger_schedules():
from app.tasks.celery_tasks.connector_tasks import (
index_airtable_records_task,
index_clickup_tasks_task,
index_composio_connector_task,
index_confluence_pages_task,
index_crawled_urls_task,
index_discord_messages_task,

@@ -98,6 +99,10 @@ async def _check_and_trigger_schedules():
SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: index_elasticsearch_documents_task,
SearchSourceConnectorType.WEBCRAWLER_CONNECTOR: index_crawled_urls_task,
SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR: index_google_drive_files_task,
# Composio connector types
SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR: index_composio_connector_task,
SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR: index_composio_connector_task,
SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR: index_composio_connector_task,
}

# Trigger indexing for each due connector
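To make the mapping's role concrete: the scheduler resolves each due connector's type through this dict and enqueues the corresponding Celery task. A minimal sketch of that dispatch follows; the `due_connectors` variable, the dict name `connector_task_map`, and the exact `.delay(...)` call are illustrative assumptions, not taken verbatim from this file.

# Illustrative only: dispatch each due connector to its registered Celery task.
for connector in due_connectors:
    task = connector_task_map.get(connector.connector_type)
    if task is None:
        continue  # connector type has no scheduled indexer registered
    # All three COMPOSIO_* types map to index_composio_connector_task,
    # which re-routes internally based on the connector's toolkit_id.
    task.delay(
        connector.id,
        connector.search_space_id,
        str(connector.user_id),
        None,  # start_date
        None,  # end_date
    )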
@@ -54,21 +54,68 @@ def format_attachments_as_context(attachments: list[ChatAttachment]) -> str:


def format_mentioned_documents_as_context(documents: list[Document]) -> str:
"""Format mentioned documents as context for the agent."""
"""
Format mentioned documents as context for the agent.

Uses the same XML structure as knowledge_base.format_documents_for_context
to ensure citations work properly with chunk IDs.
"""
if not documents:
return ""

context_parts = ["<mentioned_documents>"]
context_parts.append(
"The user has explicitly mentioned the following documents from their knowledge base. "
"These documents are directly relevant to the query and should be prioritized as primary sources."
"These documents are directly relevant to the query and should be prioritized as primary sources. "
"Use [citation:CHUNK_ID] format for citations (e.g., [citation:123])."
)
for i, doc in enumerate(documents, 1):
context_parts.append(
f"<document index='{i}' id='{doc.id}' title='{doc.title}' type='{doc.document_type.value}'>"
context_parts.append("")

for doc in documents:
# Build metadata JSON
metadata = doc.document_metadata or {}
metadata_json = json.dumps(metadata, ensure_ascii=False)

# Get URL from metadata
url = (
metadata.get("url")
or metadata.get("source")
or metadata.get("page_url")
or ""
)
context_parts.append(f"<![CDATA[{doc.content}]]>")

context_parts.append("<document>")
context_parts.append("<document_metadata>")
context_parts.append(f" <document_id>{doc.id}</document_id>")
context_parts.append(
f" <document_type>{doc.document_type.value}</document_type>"
)
context_parts.append(f" <title><![CDATA[{doc.title}]]></title>")
context_parts.append(f" <url><![CDATA[{url}]]></url>")
context_parts.append(
f" <metadata_json><![CDATA[{metadata_json}]]></metadata_json>"
)
context_parts.append("</document_metadata>")
context_parts.append("")
context_parts.append("<document_content>")

# Use chunks if available (preferred for proper citations)
if hasattr(doc, "chunks") and doc.chunks:
for chunk in doc.chunks:
context_parts.append(
f" <chunk id='{chunk.id}'><![CDATA[{chunk.content}]]></chunk>"
)
else:
# Fallback to document content if chunks not loaded
# Use document ID as chunk ID prefix for consistency
context_parts.append(
f" <chunk id='{doc.id}'><![CDATA[{doc.content}]]></chunk>"
)

context_parts.append("</document_content>")
context_parts.append("</document>")
context_parts.append("")

context_parts.append("</mentioned_documents>")

return "\n".join(context_parts)
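To visualize what the rewritten function now emits, the rendered context for a single mentioned document looks roughly like the sample below. The IDs, title, URL, type and content are invented for illustration, and exact whitespace may differ from the real output.

<mentioned_documents>
The user has explicitly mentioned the following documents from their knowledge base. These documents are directly relevant to the query and should be prioritized as primary sources. Use [citation:CHUNK_ID] format for citations (e.g., [citation:123]).

<document>
<document_metadata>
  <document_id>42</document_id>
  <document_type>FILE</document_type>
  <title><![CDATA[Quarterly report]]></title>
  <url><![CDATA[https://example.com/report]]></url>
  <metadata_json><![CDATA[{"url": "https://example.com/report"}]]></metadata_json>
</document_metadata>

<document_content>
  <chunk id='123'><![CDATA[First chunk of the report...]]></chunk>
  <chunk id='124'><![CDATA[Second chunk of the report...]]></chunk>
</document_content>
</document>

</mentioned_documents>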
@@ -81,8 +128,6 @@ def format_mentioned_surfsense_docs_as_context(
if not documents:
return ""

import json

context_parts = ["<mentioned_surfsense_docs>"]
context_parts.append(
"The user has explicitly mentioned the following SurfSense documentation pages. "
@@ -263,11 +308,15 @@ async def stream_new_chat(
# Build input with message history from frontend
langchain_messages = []

# Fetch mentioned documents if any
# Fetch mentioned documents if any (with chunks for proper citations)
mentioned_documents: list[Document] = []
if mentioned_document_ids:
from sqlalchemy.orm import selectinload as doc_selectinload

result = await session.execute(
select(Document).filter(
select(Document)
.options(doc_selectinload(Document.chunks))
.filter(
Document.id.in_(mentioned_document_ids),
Document.search_space_id == search_space_id,
)
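The `selectinload(Document.chunks)` option matters here because the session is async: if the chunks relationship were left to lazy-load, touching `doc.chunks` later (when the context is formatted) would trigger IO outside the query and typically fail under async SQLAlchemy. A minimal sketch of the eager-loading pattern in isolation, with the model names as in the diff and the surrounding function assumed:

from sqlalchemy import select
from sqlalchemy.orm import selectinload

from app.db import Document  # as in the surrounding code


async def load_documents_with_chunks(session, document_ids, search_space_id):
    # selectinload emits one SELECT for the documents and one additional
    # SELECT ... IN for all their chunks, so doc.chunks is populated up front.
    result = await session.execute(
        select(Document)
        .options(selectinload(Document.chunks))
        .filter(
            Document.id.in_(document_ids),
            Document.search_space_id == search_space_id,
        )
    )
    return result.scalars().all()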
@@ -2,83 +2,76 @@
Composio connector indexer.

Routes indexing requests to toolkit-specific handlers (Google Drive, Gmail, Calendar).
Uses a registry pattern for clean, extensible connector routing.

Note: This module is intentionally placed in app/tasks/ (not in connector_indexers/)
to avoid circular import issues with the connector_indexers package.
"""

import logging
from datetime import UTC, datetime
from importlib import import_module

from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy.orm import selectinload

from app.config import config
from app.connectors.composio_connector import ComposioConnector
from app.db import (
Document,
DocumentType,
SearchSourceConnector,
SearchSourceConnectorType,
)
from app.services.composio_service import INDEXABLE_TOOLKITS
from app.services.llm_service import get_user_long_context_llm
from app.services.composio_service import INDEXABLE_TOOLKITS, TOOLKIT_TO_INDEXER
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)

# Set up logging
logger = logging.getLogger(__name__)


# ============ Utility functions (copied from connector_indexers.base to avoid circular imports) ============
# Valid Composio connector types
COMPOSIO_CONNECTOR_TYPES = {
SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR,
SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR,
}


def get_current_timestamp() -> datetime:
"""Get the current timestamp with timezone for updated_at field."""
return datetime.now(UTC)


async def check_document_by_unique_identifier(
session: AsyncSession, unique_identifier_hash: str
) -> Document | None:
"""Check if a document with the given unique identifier hash already exists."""
existing_doc_result = await session.execute(
select(Document)
.options(selectinload(Document.chunks))
.where(Document.unique_identifier_hash == unique_identifier_hash)
)
return existing_doc_result.scalars().first()
# ============ Utility functions ============


async def get_connector_by_id(
session: AsyncSession, connector_id: int, connector_type: SearchSourceConnectorType
session: AsyncSession,
connector_id: int,
connector_type: SearchSourceConnectorType | None,
) -> SearchSourceConnector | None:
"""Get a connector by ID and type from the database."""
result = await session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == connector_id,
SearchSourceConnector.connector_type == connector_type,
)
"""Get a connector by ID and optionally by type from the database."""
query = select(SearchSourceConnector).filter(
SearchSourceConnector.id == connector_id
)
if connector_type is not None:
query = query.filter(SearchSourceConnector.connector_type == connector_type)
result = await session.execute(query)
return result.scalars().first()


async def update_connector_last_indexed(
session: AsyncSession,
connector: SearchSourceConnector,
update_last_indexed: bool = True,
) -> None:
"""Update the last_indexed_at timestamp for a connector."""
if update_last_indexed:
connector.last_indexed_at = datetime.now()
logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}")
def get_indexer_function(toolkit_id: str):
"""
Dynamically import and return the indexer function for a toolkit.

Args:
toolkit_id: The toolkit ID (e.g., "googledrive", "gmail")

Returns:
Tuple of (indexer_function, supports_date_filter)

Raises:
ValueError: If toolkit not found in registry
"""
if toolkit_id not in TOOLKIT_TO_INDEXER:
raise ValueError(f"No indexer registered for toolkit: {toolkit_id}")

module_path, function_name, supports_date_filter = TOOLKIT_TO_INDEXER[toolkit_id]
module = import_module(module_path)
indexer_func = getattr(module, function_name)
return indexer_func, supports_date_filter


# ============ Main indexer function ============
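Given how `get_indexer_function` unpacks its entries, each value in `TOOLKIT_TO_INDEXER` must be a `(module_path, function_name, supports_date_filter)` triple keyed by toolkit ID. A hypothetical entry set is sketched below; the module paths, function names, and flag values are illustrative assumptions, since the real registry lives in `app.services.composio_service` and is not shown in this diff.

# Hypothetical shape of the registry consumed by get_indexer_function().
TOOLKIT_TO_INDEXER = {
    # toolkit_id: (module_path, function_name, supports_date_filter)
    "googledrive": ("app.tasks.composio_toolkits.googledrive", "index_drive_files", False),
    "gmail": ("app.tasks.composio_toolkits.gmail", "index_gmail_messages", True),
    "googlecalendar": ("app.tasks.composio_toolkits.googlecalendar", "index_calendar_events", True),
}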
@@ -98,6 +91,7 @@ async def index_composio_connector(
Index content from a Composio connector.

Routes to toolkit-specific indexing based on the connector's toolkit_id.
Uses a registry pattern for clean, extensible connector routing.

Args:
session: Database session
@@ -129,10 +123,16 @@ async def index_composio_connector(
)

try:
# Get connector by id
connector = await get_connector_by_id(
session, connector_id, SearchSourceConnectorType.COMPOSIO_CONNECTOR
)
# Get connector by id - accept any Composio connector type
connector = await get_connector_by_id(session, connector_id, None)

# Validate it's a Composio connector
if connector and connector.connector_type not in COMPOSIO_CONNECTOR_TYPES:
error_msg = f"Connector {connector_id} is not a Composio connector"
await task_logger.log_task_failure(
log_entry, error_msg, {"error_type": "InvalidConnectorType"}
)
return 0, error_msg

if not connector:
error_msg = f"Composio connector with ID {connector_id} not found"
@@ -160,53 +160,35 @@ async def index_composio_connector(
)
return 0, error_msg

# Route to toolkit-specific indexer
if toolkit_id == "googledrive":
return await _index_composio_google_drive(
session=session,
connector=connector,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
task_logger=task_logger,
log_entry=log_entry,
update_last_indexed=update_last_indexed,
max_items=max_items,
)
elif toolkit_id == "gmail":
return await _index_composio_gmail(
session=session,
connector=connector,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
start_date=start_date,
end_date=end_date,
task_logger=task_logger,
log_entry=log_entry,
update_last_indexed=update_last_indexed,
max_items=max_items,
)
elif toolkit_id == "googlecalendar":
return await _index_composio_google_calendar(
session=session,
connector=connector,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
start_date=start_date,
end_date=end_date,
task_logger=task_logger,
log_entry=log_entry,
update_last_indexed=update_last_indexed,
max_items=max_items,
)
else:
error_msg = f"No indexer implemented for toolkit: {toolkit_id}"
# Get indexer function from registry
try:
indexer_func, supports_date_filter = get_indexer_function(toolkit_id)
except ValueError as e:
await task_logger.log_task_failure(
log_entry, error_msg, {"error_type": "NoIndexerImplemented"}
log_entry, str(e), {"error_type": "NoIndexerImplemented"}
)
return 0, error_msg
return 0, str(e)

# Build kwargs for the indexer function
kwargs = {
"session": session,
"connector": connector,
"connector_id": connector_id,
"search_space_id": search_space_id,
"user_id": user_id,
"task_logger": task_logger,
"log_entry": log_entry,
"update_last_indexed": update_last_indexed,
"max_items": max_items,
}

# Add date params for toolkits that support them
if supports_date_filter:
kwargs["start_date"] = start_date
kwargs["end_date"] = end_date

# Call the toolkit-specific indexer
return await indexer_func(**kwargs)

except SQLAlchemyError as db_error:
await session.rollback()
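Since the dispatch above ends in `indexer_func(**kwargs)`, every registered indexer has to accept exactly those keyword names, plus `start_date`/`end_date` when its registry entry declares date support. A conforming signature would look roughly like this sketch; the parameter defaults and the return contract (indexed count plus optional message, as in the pre-existing indexers) are assumptions:

async def index_some_composio_toolkit(
    session,                       # AsyncSession
    connector,                     # SearchSourceConnector row
    connector_id: int,
    search_space_id: int,
    user_id: str,
    task_logger,                   # TaskLoggingService
    log_entry,
    update_last_indexed: bool = True,
    max_items: int = 1000,
    start_date: str | None = None,  # only passed when supports_date_filter is True
    end_date: str | None = None,
) -> tuple[int, str | None]:
    ...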
@ -228,714 +210,3 @@ async def index_composio_connector(
|
|||
)
|
||||
logger.error(f"Failed to index Composio connector: {e!s}", exc_info=True)
|
||||
return 0, f"Failed to index Composio connector: {e!s}"
|
||||
|
||||
|
||||
async def _index_composio_google_drive(
|
||||
session: AsyncSession,
|
||||
connector,
|
||||
connector_id: int,
|
||||
search_space_id: int,
|
||||
user_id: str,
|
||||
task_logger: TaskLoggingService,
|
||||
log_entry,
|
||||
update_last_indexed: bool = True,
|
||||
max_items: int = 1000,
|
||||
) -> tuple[int, str]:
|
||||
"""Index Google Drive files via Composio."""
|
||||
try:
|
||||
composio_connector = ComposioConnector(session, connector_id)
|
||||
|
||||
await task_logger.log_task_progress(
|
||||
log_entry,
|
||||
f"Fetching Google Drive files via Composio for connector {connector_id}",
|
||||
{"stage": "fetching_files"},
|
||||
)
|
||||
|
||||
# Fetch files
|
||||
all_files = []
|
||||
page_token = None
|
||||
|
||||
while len(all_files) < max_items:
|
||||
files, next_token, error = await composio_connector.list_drive_files(
|
||||
page_token=page_token,
|
||||
page_size=min(100, max_items - len(all_files)),
|
||||
)
|
||||
|
||||
if error:
|
||||
await task_logger.log_task_failure(
|
||||
log_entry, f"Failed to fetch Drive files: {error}", {}
|
||||
)
|
||||
return 0, f"Failed to fetch Drive files: {error}"
|
||||
|
||||
all_files.extend(files)
|
||||
|
||||
if not next_token:
|
||||
break
|
||||
page_token = next_token
|
||||
|
||||
if not all_files:
|
||||
success_msg = "No Google Drive files found"
|
||||
await task_logger.log_task_success(
|
||||
log_entry, success_msg, {"files_count": 0}
|
||||
)
|
||||
return 0, success_msg
|
||||
|
||||
logger.info(f"Found {len(all_files)} Google Drive files to index via Composio")
|
||||
|
||||
documents_indexed = 0
|
||||
documents_skipped = 0
|
||||
|
||||
for file_info in all_files:
|
||||
try:
|
||||
# Handle both standard Google API and potential Composio variations
|
||||
file_id = file_info.get("id", "") or file_info.get("fileId", "")
|
||||
file_name = (
|
||||
file_info.get("name", "")
|
||||
or file_info.get("fileName", "")
|
||||
or "Untitled"
|
||||
)
|
||||
mime_type = file_info.get("mimeType", "") or file_info.get(
|
||||
"mime_type", ""
|
||||
)
|
||||
|
||||
if not file_id:
|
||||
documents_skipped += 1
|
||||
continue
|
||||
|
||||
# Skip folders
|
||||
if mime_type == "application/vnd.google-apps.folder":
|
||||
continue
|
||||
|
||||
# Generate unique identifier hash
|
||||
unique_identifier_hash = generate_unique_identifier_hash(
|
||||
DocumentType.COMPOSIO_CONNECTOR, f"drive_{file_id}", search_space_id
|
||||
)
|
||||
|
||||
# Check if document exists
|
||||
existing_document = await check_document_by_unique_identifier(
|
||||
session, unique_identifier_hash
|
||||
)
|
||||
|
||||
# Get file content
|
||||
(
|
||||
content,
|
||||
content_error,
|
||||
) = await composio_connector.get_drive_file_content(file_id)
|
||||
|
||||
if content_error or not content:
|
||||
logger.warning(
|
||||
f"Could not get content for file {file_name}: {content_error}"
|
||||
)
|
||||
# Use metadata as content fallback
|
||||
markdown_content = f"# {file_name}\n\n"
|
||||
markdown_content += f"**File ID:** {file_id}\n"
|
||||
markdown_content += f"**Type:** {mime_type}\n"
|
||||
else:
|
||||
try:
|
||||
markdown_content = content.decode("utf-8")
|
||||
except UnicodeDecodeError:
|
||||
markdown_content = f"# {file_name}\n\n[Binary file content]\n"
|
||||
|
||||
content_hash = generate_content_hash(markdown_content, search_space_id)
|
||||
|
||||
if existing_document:
|
||||
if existing_document.content_hash == content_hash:
|
||||
documents_skipped += 1
|
||||
continue
|
||||
|
||||
# Update existing document
|
||||
user_llm = await get_user_long_context_llm(
|
||||
session, user_id, search_space_id
|
||||
)
|
||||
|
||||
if user_llm:
|
||||
document_metadata = {
|
||||
"file_id": file_id,
|
||||
"file_name": file_name,
|
||||
"mime_type": mime_type,
|
||||
"document_type": "Google Drive File (Composio)",
|
||||
}
|
||||
(
|
||||
summary_content,
|
||||
summary_embedding,
|
||||
) = await generate_document_summary(
|
||||
markdown_content, user_llm, document_metadata
|
||||
)
|
||||
else:
|
||||
summary_content = (
|
||||
f"Google Drive File: {file_name}\n\nType: {mime_type}"
|
||||
)
|
||||
summary_embedding = config.embedding_model_instance.embed(
|
||||
summary_content
|
||||
)
|
||||
|
||||
chunks = await create_document_chunks(markdown_content)
|
||||
|
||||
existing_document.title = f"Drive: {file_name}"
|
||||
existing_document.content = summary_content
|
||||
existing_document.content_hash = content_hash
|
||||
existing_document.embedding = summary_embedding
|
||||
existing_document.document_metadata = {
|
||||
"file_id": file_id,
|
||||
"file_name": file_name,
|
||||
"mime_type": mime_type,
|
||||
"connector_id": connector_id,
|
||||
"source": "composio",
|
||||
}
|
||||
existing_document.chunks = chunks
|
||||
existing_document.updated_at = get_current_timestamp()
|
||||
|
||||
documents_indexed += 1
|
||||
continue
|
||||
|
||||
# Create new document
|
||||
user_llm = await get_user_long_context_llm(
|
||||
session, user_id, search_space_id
|
||||
)
|
||||
|
||||
if user_llm:
|
||||
document_metadata = {
|
||||
"file_id": file_id,
|
||||
"file_name": file_name,
|
||||
"mime_type": mime_type,
|
||||
"document_type": "Google Drive File (Composio)",
|
||||
}
|
||||
(
|
||||
summary_content,
|
||||
summary_embedding,
|
||||
) = await generate_document_summary(
|
||||
markdown_content, user_llm, document_metadata
|
||||
)
|
||||
else:
|
||||
summary_content = (
|
||||
f"Google Drive File: {file_name}\n\nType: {mime_type}"
|
||||
)
|
||||
summary_embedding = config.embedding_model_instance.embed(
|
||||
summary_content
|
||||
)
|
||||
|
||||
chunks = await create_document_chunks(markdown_content)
|
||||
|
||||
document = Document(
|
||||
search_space_id=search_space_id,
|
||||
title=f"Drive: {file_name}",
|
||||
document_type=DocumentType.COMPOSIO_CONNECTOR,
|
||||
document_metadata={
|
||||
"file_id": file_id,
|
||||
"file_name": file_name,
|
||||
"mime_type": mime_type,
|
||||
"connector_id": connector_id,
|
||||
"toolkit_id": "googledrive",
|
||||
"source": "composio",
|
||||
},
|
||||
content=summary_content,
|
||||
content_hash=content_hash,
|
||||
unique_identifier_hash=unique_identifier_hash,
|
||||
embedding=summary_embedding,
|
||||
chunks=chunks,
|
||||
updated_at=get_current_timestamp(),
|
||||
)
|
||||
session.add(document)
|
||||
documents_indexed += 1
|
||||
|
||||
if documents_indexed % 10 == 0:
|
||||
await session.commit()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing Drive file: {e!s}", exc_info=True)
|
||||
documents_skipped += 1
|
||||
continue
|
||||
|
||||
if documents_indexed > 0:
|
||||
await update_connector_last_indexed(session, connector, update_last_indexed)
|
||||
|
||||
await session.commit()
|
||||
|
||||
await task_logger.log_task_success(
|
||||
log_entry,
|
||||
f"Successfully completed Google Drive indexing via Composio for connector {connector_id}",
|
||||
{
|
||||
"documents_indexed": documents_indexed,
|
||||
"documents_skipped": documents_skipped,
|
||||
},
|
||||
)
|
||||
|
||||
return documents_indexed, None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to index Google Drive via Composio: {e!s}", exc_info=True)
|
||||
return 0, f"Failed to index Google Drive via Composio: {e!s}"
|
||||
|
||||
|
||||
async def _index_composio_gmail(
|
||||
session: AsyncSession,
|
||||
connector,
|
||||
connector_id: int,
|
||||
search_space_id: int,
|
||||
user_id: str,
|
||||
start_date: str | None,
|
||||
end_date: str | None,
|
||||
task_logger: TaskLoggingService,
|
||||
log_entry,
|
||||
update_last_indexed: bool = True,
|
||||
max_items: int = 1000,
|
||||
) -> tuple[int, str]:
|
||||
"""Index Gmail messages via Composio."""
|
||||
try:
|
||||
composio_connector = ComposioConnector(session, connector_id)
|
||||
|
||||
await task_logger.log_task_progress(
|
||||
log_entry,
|
||||
f"Fetching Gmail messages via Composio for connector {connector_id}",
|
||||
{"stage": "fetching_messages"},
|
||||
)
|
||||
|
||||
# Build query with date range
|
||||
query_parts = []
|
||||
if start_date:
|
||||
query_parts.append(f"after:{start_date.replace('-', '/')}")
|
||||
if end_date:
|
||||
query_parts.append(f"before:{end_date.replace('-', '/')}")
|
||||
query = " ".join(query_parts)
|
||||
|
||||
messages, error = await composio_connector.list_gmail_messages(
|
||||
query=query,
|
||||
max_results=max_items,
|
||||
)
|
||||
|
||||
if error:
|
||||
await task_logger.log_task_failure(
|
||||
log_entry, f"Failed to fetch Gmail messages: {error}", {}
|
||||
)
|
||||
return 0, f"Failed to fetch Gmail messages: {error}"
|
||||
|
||||
if not messages:
|
||||
success_msg = "No Gmail messages found in the specified date range"
|
||||
await task_logger.log_task_success(
|
||||
log_entry, success_msg, {"messages_count": 0}
|
||||
)
|
||||
return 0, success_msg
|
||||
|
||||
logger.info(f"Found {len(messages)} Gmail messages to index via Composio")
|
||||
|
||||
documents_indexed = 0
|
||||
documents_skipped = 0
|
||||
|
||||
for message in messages:
|
||||
try:
|
||||
# Composio uses 'messageId' (camelCase), not 'id'
|
||||
message_id = message.get("messageId", "") or message.get("id", "")
|
||||
if not message_id:
|
||||
documents_skipped += 1
|
||||
continue
|
||||
|
||||
# Composio's GMAIL_FETCH_EMAILS already returns full message content
|
||||
# No need for a separate detail API call
|
||||
|
||||
# Extract message info from Composio response
|
||||
# Composio structure: messageId, messageText, messageTimestamp, payload.headers, labelIds
|
||||
payload = message.get("payload", {})
|
||||
headers = payload.get("headers", [])
|
||||
|
||||
subject = "No Subject"
|
||||
sender = "Unknown Sender"
|
||||
date_str = message.get("messageTimestamp", "Unknown Date")
|
||||
|
||||
for header in headers:
|
||||
name = header.get("name", "").lower()
|
||||
value = header.get("value", "")
|
||||
if name == "subject":
|
||||
subject = value
|
||||
elif name == "from":
|
||||
sender = value
|
||||
elif name == "date":
|
||||
date_str = value
|
||||
|
||||
# Format to markdown using the full message data
|
||||
markdown_content = composio_connector.format_gmail_message_to_markdown(
|
||||
message
|
||||
)
|
||||
|
||||
# Generate unique identifier
|
||||
unique_identifier_hash = generate_unique_identifier_hash(
|
||||
DocumentType.COMPOSIO_CONNECTOR,
|
||||
f"gmail_{message_id}",
|
||||
search_space_id,
|
||||
)
|
||||
|
||||
content_hash = generate_content_hash(markdown_content, search_space_id)
|
||||
|
||||
existing_document = await check_document_by_unique_identifier(
|
||||
session, unique_identifier_hash
|
||||
)
|
||||
|
||||
# Get label IDs from Composio response
|
||||
label_ids = message.get("labelIds", [])
|
||||
|
||||
if existing_document:
|
||||
if existing_document.content_hash == content_hash:
|
||||
documents_skipped += 1
|
||||
continue
|
||||
|
||||
# Update existing
|
||||
user_llm = await get_user_long_context_llm(
|
||||
session, user_id, search_space_id
|
||||
)
|
||||
|
||||
if user_llm:
|
||||
document_metadata = {
|
||||
"message_id": message_id,
|
||||
"subject": subject,
|
||||
"sender": sender,
|
||||
"document_type": "Gmail Message (Composio)",
|
||||
}
|
||||
(
|
||||
summary_content,
|
||||
summary_embedding,
|
||||
) = await generate_document_summary(
|
||||
markdown_content, user_llm, document_metadata
|
||||
)
|
||||
else:
|
||||
summary_content = (
|
||||
f"Gmail: {subject}\n\nFrom: {sender}\nDate: {date_str}"
|
||||
)
|
||||
summary_embedding = config.embedding_model_instance.embed(
|
||||
summary_content
|
||||
)
|
||||
|
||||
chunks = await create_document_chunks(markdown_content)
|
||||
|
||||
existing_document.title = f"Gmail: {subject}"
|
||||
existing_document.content = summary_content
|
||||
existing_document.content_hash = content_hash
|
||||
existing_document.embedding = summary_embedding
|
||||
existing_document.document_metadata = {
|
||||
"message_id": message_id,
|
||||
"subject": subject,
|
||||
"sender": sender,
|
||||
"date": date_str,
|
||||
"labels": label_ids,
|
||||
"connector_id": connector_id,
|
||||
"source": "composio",
|
||||
}
|
||||
existing_document.chunks = chunks
|
||||
existing_document.updated_at = get_current_timestamp()
|
||||
|
||||
documents_indexed += 1
|
||||
continue
|
||||
|
||||
# Create new document
|
||||
user_llm = await get_user_long_context_llm(
|
||||
session, user_id, search_space_id
|
||||
)
|
||||
|
||||
if user_llm:
|
||||
document_metadata = {
|
||||
"message_id": message_id,
|
||||
"subject": subject,
|
||||
"sender": sender,
|
||||
"document_type": "Gmail Message (Composio)",
|
||||
}
|
||||
(
|
||||
summary_content,
|
||||
summary_embedding,
|
||||
) = await generate_document_summary(
|
||||
markdown_content, user_llm, document_metadata
|
||||
)
|
||||
else:
|
||||
summary_content = (
|
||||
f"Gmail: {subject}\n\nFrom: {sender}\nDate: {date_str}"
|
||||
)
|
||||
summary_embedding = config.embedding_model_instance.embed(
|
||||
summary_content
|
||||
)
|
||||
|
||||
chunks = await create_document_chunks(markdown_content)
|
||||
|
||||
document = Document(
|
||||
search_space_id=search_space_id,
|
||||
title=f"Gmail: {subject}",
|
||||
document_type=DocumentType.COMPOSIO_CONNECTOR,
|
||||
document_metadata={
|
||||
"message_id": message_id,
|
||||
"subject": subject,
|
||||
"sender": sender,
|
||||
"date": date_str,
|
||||
"labels": label_ids,
|
||||
"connector_id": connector_id,
|
||||
"toolkit_id": "gmail",
|
||||
"source": "composio",
|
||||
},
|
||||
content=summary_content,
|
||||
content_hash=content_hash,
|
||||
unique_identifier_hash=unique_identifier_hash,
|
||||
embedding=summary_embedding,
|
||||
chunks=chunks,
|
||||
updated_at=get_current_timestamp(),
|
||||
)
|
||||
session.add(document)
|
||||
documents_indexed += 1
|
||||
|
||||
if documents_indexed % 10 == 0:
|
||||
await session.commit()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing Gmail message: {e!s}", exc_info=True)
|
||||
documents_skipped += 1
|
||||
continue
|
||||
|
||||
if documents_indexed > 0:
|
||||
await update_connector_last_indexed(session, connector, update_last_indexed)
|
||||
|
||||
await session.commit()
|
||||
|
||||
await task_logger.log_task_success(
|
||||
log_entry,
|
||||
f"Successfully completed Gmail indexing via Composio for connector {connector_id}",
|
||||
{
|
||||
"documents_indexed": documents_indexed,
|
||||
"documents_skipped": documents_skipped,
|
||||
},
|
||||
)
|
||||
|
||||
return documents_indexed, None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to index Gmail via Composio: {e!s}", exc_info=True)
|
||||
return 0, f"Failed to index Gmail via Composio: {e!s}"
|
||||
|
||||
|
||||
async def _index_composio_google_calendar(
|
||||
session: AsyncSession,
|
||||
connector,
|
||||
connector_id: int,
|
||||
search_space_id: int,
|
||||
user_id: str,
|
||||
start_date: str | None,
|
||||
end_date: str | None,
|
||||
task_logger: TaskLoggingService,
|
||||
log_entry,
|
||||
update_last_indexed: bool = True,
|
||||
max_items: int = 2500,
|
||||
) -> tuple[int, str]:
|
||||
"""Index Google Calendar events via Composio."""
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
try:
|
||||
composio_connector = ComposioConnector(session, connector_id)
|
||||
|
||||
await task_logger.log_task_progress(
|
||||
log_entry,
|
||||
f"Fetching Google Calendar events via Composio for connector {connector_id}",
|
||||
{"stage": "fetching_events"},
|
||||
)
|
||||
|
||||
# Build time range
|
||||
if start_date:
|
||||
time_min = f"{start_date}T00:00:00Z"
|
||||
else:
|
||||
# Default to 365 days ago
|
||||
default_start = datetime.now() - timedelta(days=365)
|
||||
time_min = default_start.strftime("%Y-%m-%dT00:00:00Z")
|
||||
|
||||
if end_date:
|
||||
time_max = f"{end_date}T23:59:59Z"
|
||||
else:
|
||||
time_max = datetime.now().strftime("%Y-%m-%dT23:59:59Z")
|
||||
|
||||
events, error = await composio_connector.list_calendar_events(
|
||||
time_min=time_min,
|
||||
time_max=time_max,
|
||||
max_results=max_items,
|
||||
)
|
||||
|
||||
if error:
|
||||
await task_logger.log_task_failure(
|
||||
log_entry, f"Failed to fetch Calendar events: {error}", {}
|
||||
)
|
||||
return 0, f"Failed to fetch Calendar events: {error}"
|
||||
|
||||
if not events:
|
||||
success_msg = "No Google Calendar events found in the specified date range"
|
||||
await task_logger.log_task_success(
|
||||
log_entry, success_msg, {"events_count": 0}
|
||||
)
|
||||
return 0, success_msg
|
||||
|
||||
logger.info(f"Found {len(events)} Google Calendar events to index via Composio")
|
||||
|
||||
documents_indexed = 0
|
||||
documents_skipped = 0
|
||||
|
||||
for event in events:
|
||||
try:
|
||||
# Handle both standard Google API and potential Composio variations
|
||||
event_id = event.get("id", "") or event.get("eventId", "")
|
||||
summary = (
|
||||
event.get("summary", "") or event.get("title", "") or "No Title"
|
||||
)
|
||||
|
||||
if not event_id:
|
||||
documents_skipped += 1
|
||||
continue
|
||||
|
||||
# Format to markdown
|
||||
markdown_content = composio_connector.format_calendar_event_to_markdown(
|
||||
event
|
||||
)
|
||||
|
||||
# Generate unique identifier
|
||||
unique_identifier_hash = generate_unique_identifier_hash(
|
||||
DocumentType.COMPOSIO_CONNECTOR,
|
||||
f"calendar_{event_id}",
|
||||
search_space_id,
|
||||
)
|
||||
|
||||
content_hash = generate_content_hash(markdown_content, search_space_id)
|
||||
|
||||
existing_document = await check_document_by_unique_identifier(
|
||||
session, unique_identifier_hash
|
||||
)
|
||||
|
||||
# Extract event times
|
||||
start = event.get("start", {})
|
||||
end = event.get("end", {})
|
||||
start_time = start.get("dateTime") or start.get("date", "")
|
||||
end_time = end.get("dateTime") or end.get("date", "")
|
||||
location = event.get("location", "")
|
||||
|
||||
if existing_document:
|
||||
if existing_document.content_hash == content_hash:
|
||||
documents_skipped += 1
|
||||
continue
|
||||
|
||||
# Update existing
|
||||
user_llm = await get_user_long_context_llm(
|
||||
session, user_id, search_space_id
|
||||
)
|
||||
|
||||
if user_llm:
|
||||
document_metadata = {
|
||||
"event_id": event_id,
|
||||
"summary": summary,
|
||||
"start_time": start_time,
|
||||
"document_type": "Google Calendar Event (Composio)",
|
||||
}
|
||||
(
|
||||
summary_content,
|
||||
summary_embedding,
|
||||
) = await generate_document_summary(
|
||||
markdown_content, user_llm, document_metadata
|
||||
)
|
||||
else:
|
||||
summary_content = f"Calendar: {summary}\n\nStart: {start_time}\nEnd: {end_time}"
|
||||
if location:
|
||||
summary_content += f"\nLocation: {location}"
|
||||
summary_embedding = config.embedding_model_instance.embed(
|
||||
summary_content
|
||||
)
|
||||
|
||||
chunks = await create_document_chunks(markdown_content)
|
||||
|
||||
existing_document.title = f"Calendar: {summary}"
|
||||
existing_document.content = summary_content
|
||||
existing_document.content_hash = content_hash
|
||||
existing_document.embedding = summary_embedding
|
||||
existing_document.document_metadata = {
|
||||
"event_id": event_id,
|
||||
"summary": summary,
|
||||
"start_time": start_time,
|
||||
"end_time": end_time,
|
||||
"location": location,
|
||||
"connector_id": connector_id,
|
||||
"source": "composio",
|
||||
}
|
||||
existing_document.chunks = chunks
|
||||
existing_document.updated_at = get_current_timestamp()
|
||||
|
||||
documents_indexed += 1
|
||||
continue
|
||||
|
||||
# Create new document
|
||||
user_llm = await get_user_long_context_llm(
|
||||
session, user_id, search_space_id
|
||||
)
|
||||
|
||||
if user_llm:
|
||||
document_metadata = {
|
||||
"event_id": event_id,
|
||||
"summary": summary,
|
||||
"start_time": start_time,
|
||||
"document_type": "Google Calendar Event (Composio)",
|
||||
}
|
||||
(
|
||||
summary_content,
|
||||
summary_embedding,
|
||||
) = await generate_document_summary(
|
||||
markdown_content, user_llm, document_metadata
|
||||
)
|
||||
else:
|
||||
summary_content = (
|
||||
f"Calendar: {summary}\n\nStart: {start_time}\nEnd: {end_time}"
|
||||
)
|
||||
if location:
|
||||
summary_content += f"\nLocation: {location}"
|
||||
summary_embedding = config.embedding_model_instance.embed(
|
||||
summary_content
|
||||
)
|
||||
|
||||
chunks = await create_document_chunks(markdown_content)
|
||||
|
||||
document = Document(
|
||||
search_space_id=search_space_id,
|
||||
title=f"Calendar: {summary}",
|
||||
document_type=DocumentType.COMPOSIO_CONNECTOR,
|
||||
document_metadata={
|
||||
"event_id": event_id,
|
||||
"summary": summary,
|
||||
"start_time": start_time,
|
||||
"end_time": end_time,
|
||||
"location": location,
|
||||
"connector_id": connector_id,
|
||||
"toolkit_id": "googlecalendar",
|
||||
"source": "composio",
|
||||
},
|
||||
content=summary_content,
|
||||
content_hash=content_hash,
|
||||
unique_identifier_hash=unique_identifier_hash,
|
||||
embedding=summary_embedding,
|
||||
chunks=chunks,
|
||||
updated_at=get_current_timestamp(),
|
||||
)
|
||||
session.add(document)
|
||||
documents_indexed += 1
|
||||
|
||||
if documents_indexed % 10 == 0:
|
||||
await session.commit()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing Calendar event: {e!s}", exc_info=True)
|
||||
documents_skipped += 1
|
||||
continue
|
||||
|
||||
if documents_indexed > 0:
|
||||
await update_connector_last_indexed(session, connector, update_last_indexed)
|
||||
|
||||
await session.commit()
|
||||
|
||||
await task_logger.log_task_success(
|
||||
log_entry,
|
||||
f"Successfully completed Google Calendar indexing via Composio for connector {connector_id}",
|
||||
{
|
||||
"documents_indexed": documents_indexed,
|
||||
"documents_skipped": documents_skipped,
|
||||
},
|
||||
)
|
||||
|
||||
return documents_indexed, None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Failed to index Google Calendar via Composio: {e!s}", exc_info=True
|
||||
)
|
||||
return 0, f"Failed to index Google Calendar via Composio: {e!s}"
|
||||
|
|
|
|||
|
|
@@ -112,6 +112,13 @@ def calculate_date_range(
Returns:
Tuple of (start_date_str, end_date_str)
"""
# Normalize "undefined" strings to None (from frontend)
# This prevents parsing errors and ensures consistent behavior across all indexers
if start_date == "undefined" or start_date == "":
start_date = None
if end_date == "undefined" or end_date == "":
end_date = None

if start_date is not None and end_date is not None:
return start_date, end_date
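The failure this normalization prevents is worth spelling out: the frontend can send the literal string "undefined" for an unset date picker, and feeding that into a date parser raises immediately. A tiny standalone illustration (not from the repo):

from datetime import datetime

raw_start = "undefined"  # what the frontend sends when no date was selected
# datetime.strptime(raw_start, "%Y-%m-%d")  # ValueError: time data 'undefined' does not match format '%Y-%m-%d'

# Normalizing first, as calculate_date_range now does, lets the default window apply instead:
start_date = None if raw_start in ("undefined", "") else raw_start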
@@ -136,10 +136,9 @@ async def index_bookstack_pages(
)

if error:
logger.error(f"Failed to get BookStack pages: {error}")

# Don't treat "No pages found" as an error that should stop indexing
if "No pages found" in error:
logger.info(f"No BookStack pages found: {error}")
logger.info(
"No pages found is not a critical error, continuing with update"
)

@@ -159,6 +158,7 @@ async def index_bookstack_pages(
)
return 0, None
else:
logger.error(f"Failed to get BookStack pages: {error}")
await task_logger.log_task_failure(
log_entry,
f"Failed to get BookStack pages: {error}",
@@ -120,10 +120,9 @@ async def index_confluence_pages(
)

if error:
logger.error(f"Failed to get Confluence pages: {error}")

# Don't treat "No pages found" as an error that should stop indexing
if "No pages found" in error:
logger.info(f"No Confluence pages found: {error}")
logger.info(
"No pages found is not a critical error, continuing with update"
)

@@ -147,6 +146,7 @@ async def index_confluence_pages(
await confluence_client.close()
return 0, None
else:
logger.error(f"Failed to get Confluence pages: {error}")
await task_logger.log_task_failure(
log_entry,
f"Failed to get Confluence pages: {error}",
@@ -4,6 +4,8 @@ Google Calendar connector indexer.

from datetime import datetime, timedelta

import pytz
from dateutil.parser import isoparse
from google.oauth2.credentials import Credentials
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession

@@ -21,6 +23,7 @@ from app.utils.document_converters import (

from .base import (
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@@ -206,6 +209,23 @@ async def index_google_calendar_events(
start_date_str = start_date
end_date_str = end_date

# If start_date and end_date are the same, adjust end_date to be one day later
# to ensure valid date range (start_date must be strictly before end_date)
if start_date_str == end_date_str:
# Parse the date and add one day to ensure valid range
dt = isoparse(end_date_str)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=pytz.UTC)
else:
dt = dt.astimezone(pytz.UTC)
# Add one day to end_date to make it strictly after start_date
dt_end = dt + timedelta(days=1)
end_date_str = dt_end.strftime("%Y-%m-%d")
logger.info(
f"Adjusted end_date from {end_date} to {end_date_str} "
f"to ensure valid date range (start_date must be strictly before end_date)"
)

await task_logger.log_task_progress(
log_entry,
f"Fetching Google Calendar events from {start_date_str} to {end_date_str}",
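A quick worked example of that adjustment: if the UI passes the same day for both bounds, the end bound is pushed forward by one day so the query window is non-empty. The sketch below reuses the same isoparse/pytz calls as the hunk above, with a made-up date:

import pytz
from datetime import timedelta
from dateutil.parser import isoparse

start_date_str = end_date_str = "2025-01-15"
if start_date_str == end_date_str:
    dt = isoparse(end_date_str)
    dt = dt.replace(tzinfo=pytz.UTC) if dt.tzinfo is None else dt.astimezone(pytz.UTC)
    end_date_str = (dt + timedelta(days=1)).strftime("%Y-%m-%d")
# end_date_str is now "2025-01-16", so start_date is strictly before end_date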
@@ -223,10 +243,9 @@ async def index_google_calendar_events(
)

if error:
logger.error(f"Failed to get Google Calendar events: {error}")

# Don't treat "No events found" as an error that should stop indexing
if "No events found" in error:
logger.info(f"No Google Calendar events found: {error}")
logger.info(
"No events found is not a critical error, continuing with update"
)
@@ -246,13 +265,25 @@ async def index_google_calendar_events(
)
return 0, None
else:
logger.error(f"Failed to get Google Calendar events: {error}")
# Check if this is an authentication error that requires re-authentication
error_message = error
error_type = "APIError"
if (
"re-authenticate" in error.lower()
or "expired or been revoked" in error.lower()
or "authentication failed" in error.lower()
):
error_message = "Google Calendar authentication failed. Please re-authenticate."
error_type = "AuthenticationError"

await task_logger.log_task_failure(
log_entry,
f"Failed to get Google Calendar events: {error}",
"API Error",
{"error_type": "APIError"},
error_message,
error,
{"error_type": error_type},
)
return 0, f"Failed to get Google Calendar events: {error}"
return 0, error_message

logger.info(f"Retrieved {len(events)} events from Google Calendar API")
@@ -263,6 +294,9 @@ async def index_google_calendar_events(
documents_indexed = 0
documents_skipped = 0
skipped_events = []
duplicate_content_count = (
0 # Track events skipped due to duplicate content_hash
)

for event in events:
try:
@@ -383,6 +417,27 @@ async def index_google_calendar_events(
)
continue

# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)

if duplicate_by_content:
# A document with the same content already exists (likely from Composio connector)
logger.info(
f"Event {event_summary} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping to avoid duplicate content."
)
duplicate_content_count += 1
documents_skipped += 1
skipped_events.append(
f"{event_summary} (already indexed by another connector)"
)
continue

# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
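The dedup logic here works on two different hashes, which is easy to miss: `unique_identifier_hash` keys a document to its source object within this connector, while `content_hash` fingerprints the rendered content itself, so the second check catches the same event arriving through a different connector (e.g. the Composio Calendar connector). A rough sketch of the order of operations; helper names match the diff, but the DocumentType member and identifier formatting are assumptions:

# 1) Same source object already indexed by this connector? -> update or skip.
uid_hash = generate_unique_identifier_hash(
    DocumentType.GOOGLE_CALENDAR_CONNECTOR,  # enum member name assumed
    event_id,
    search_space_id,
)
existing = await check_document_by_unique_identifier(session, uid_hash)

# 2) Otherwise: identical content already indexed by some other connector? -> skip entirely.
content_hash = generate_content_hash(markdown_content, search_space_id)
if existing is None:
    duplicate = await check_duplicate_document_by_hash(session, content_hash)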
@@ -475,7 +530,28 @@ async def index_google_calendar_events(
logger.info(
f"Final commit: Total {documents_indexed} Google Calendar events processed"
)
await session.commit()
try:
await session.commit()
except Exception as e:
# Handle any remaining integrity errors gracefully (race conditions, etc.)
if (
"duplicate key value violates unique constraint" in str(e).lower()
or "uniqueviolationerror" in str(e).lower()
):
logger.warning(
f"Duplicate content_hash detected during final commit. "
f"This may occur if the same event was indexed by multiple connectors. "
f"Rolling back and continuing. Error: {e!s}"
)
await session.rollback()
# Don't fail the entire task - some documents may have been successfully indexed
else:
raise

# Build warning message if duplicates were found
warning_message = None
if duplicate_content_count > 0:
warning_message = f"{duplicate_content_count} skipped (duplicate)"

await task_logger.log_task_success(
log_entry,
@@ -484,14 +560,16 @@ async def index_google_calendar_events(
"events_processed": total_processed,
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"duplicate_content_count": duplicate_content_count,
"skipped_events_count": len(skipped_events),
},
)

logger.info(
f"Google Calendar indexing completed: {documents_indexed} new events, {documents_skipped} skipped"
f"Google Calendar indexing completed: {documents_indexed} new events, {documents_skipped} skipped "
f"({duplicate_content_count} due to duplicate content from other connectors)"
)
return total_processed, None
return total_processed, warning_message

except SQLAlchemyError as db_error:
await session.rollback()
@@ -578,7 +578,7 @@ async def _check_rename_only_update(
- (True, message): Only filename changed, document was updated
- (False, None): Content changed or new file, needs full processing
"""
from sqlalchemy import select
from sqlalchemy import String, cast, select
from sqlalchemy.orm.attributes import flag_modified

from app.db import Document
@@ -603,7 +603,8 @@ async def _check_rename_only_update(
select(Document).where(
Document.search_space_id == search_space_id,
Document.document_type == DocumentType.GOOGLE_DRIVE_FILE,
Document.document_metadata["google_drive_file_id"].astext == file_id,
cast(Document.document_metadata["google_drive_file_id"], String)
== file_id,
)
)
existing_document = result.scalar_one_or_none()
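For readers unfamiliar with the two query forms, a rough comparison is sketched below, assuming `document_metadata` is a JSON/JSONB column. Note the two can differ subtly in how JSON string values are rendered in SQL, so this is a comparison sketch rather than a claim of exact equivalence.

from sqlalchemy import String, cast, select

# Before: PostgreSQL-specific accessor on the JSON element (renders roughly as ->> 'google_drive_file_id').
# stmt = select(Document).where(
#     Document.document_metadata["google_drive_file_id"].astext == file_id
# )

# After: an explicit CAST of the extracted element, which does not rely on the
# dialect-specific .astext accessor being available on the column's element type.
stmt = select(Document).where(
    cast(Document.document_metadata["google_drive_file_id"], String) == file_id
)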
@@ -755,7 +756,7 @@ async def _remove_document(session: AsyncSession, file_id: str, search_space_id:

Handles both new (file_id-based) and legacy (filename-based) hash schemes.
"""
from sqlalchemy import select
from sqlalchemy import String, cast, select

from app.db import Document
@@ -774,7 +775,8 @@ async def _remove_document(session: AsyncSession, file_id: str, search_space_id:
select(Document).where(
Document.search_space_id == search_space_id,
Document.document_type == DocumentType.GOOGLE_DRIVE_FILE,
Document.document_metadata["google_drive_file_id"].astext == file_id,
cast(Document.document_metadata["google_drive_file_id"], String)
== file_id,
)
)
existing_document = result.scalar_one_or_none()
@@ -170,10 +170,21 @@ async def index_google_gmail_messages(
)

if error:
# Check if this is an authentication error that requires re-authentication
error_message = error
error_type = "APIError"
if (
"re-authenticate" in error.lower()
or "expired or been revoked" in error.lower()
or "authentication failed" in error.lower()
):
error_message = "Gmail authentication failed. Please re-authenticate."
error_type = "AuthenticationError"

await task_logger.log_task_failure(
log_entry, f"Failed to fetch messages: {error}", {}
log_entry, error_message, error, {"error_type": error_type}
)
return 0, f"Failed to fetch Gmail messages: {error}"
return 0, error_message

if not messages:
success_msg = "No Google gmail messages found in the specified date range"
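The same three substring checks now appear both here and in the Google Calendar indexer above; if the pattern spreads further, a small shared helper along these lines would keep the classification in one place. This is a sketch only, not part of this commit:

def classify_connector_error(error: str, service_name: str) -> tuple[str, str]:
    """Map a raw connector error string to (user-facing message, error_type)."""
    lowered = error.lower()
    if (
        "re-authenticate" in lowered
        or "expired or been revoked" in lowered
        or "authentication failed" in lowered
    ):
        return f"{service_name} authentication failed. Please re-authenticate.", "AuthenticationError"
    return error, "APIError"

# Usage in the Gmail branch would then reduce to:
# error_message, error_type = classify_connector_error(error, "Gmail")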
@@ -126,10 +126,9 @@ async def index_jira_issues(
)

if error:
logger.error(f"Failed to get Jira issues: {error}")

# Don't treat "No issues found" as an error that should stop indexing
if "No issues found" in error:
logger.info(f"No Jira issues found: {error}")
logger.info(
"No issues found is not a critical error, continuing with update"
)

@@ -149,6 +148,7 @@ async def index_jira_issues(
)
return 0, None
else:
logger.error(f"Failed to get Jira issues: {error}")
await task_logger.log_task_failure(
log_entry,
f"Failed to get Jira issues: {error}",
@@ -145,10 +145,9 @@ async def index_linear_issues(
)

if error:
logger.error(f"Failed to get Linear issues: {error}")

# Don't treat "No issues found" as an error that should stop indexing
if "No issues found" in error:
logger.info(f"No Linear issues found: {error}")
logger.info(
"No issues found is not a critical error, continuing with update"
)

@@ -162,6 +161,7 @@ async def index_linear_issues(
)
return 0, None
else:
logger.error(f"Failed to get Linear issues: {error}")
return 0, f"Failed to get Linear issues: {error}"

logger.info(f"Retrieved {len(issues)} issues from Linear API")
@@ -116,6 +116,13 @@ async def index_luma_events(

luma_client = LumaConnector(api_key=api_key)

# Handle 'undefined' string from frontend (treat as None)
# This prevents "time data 'undefined' does not match format" errors
if start_date == "undefined" or start_date == "":
start_date = None
if end_date == "undefined" or end_date == "":
end_date = None

# Calculate date range
# For calendar connectors, allow future dates to index upcoming events
if start_date is None or end_date is None:

@@ -172,10 +179,9 @@ async def index_luma_events(
)

if error:
logger.error(f"Failed to get Luma events: {error}")

# Don't treat "No events found" as an error that should stop indexing
if "No events found" in error or "no events" in error.lower():
logger.info(f"No Luma events found: {error}")
logger.info(
"No events found is not a critical error, continuing with update"
)

@@ -195,6 +201,7 @@ async def index_luma_events(
)
return 0, None
else:
logger.error(f"Failed to get Luma events: {error}")
await task_logger.log_task_failure(
log_entry,
f"Failed to get Luma events: {error}",