mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-06-08 20:25:19 +02:00
feat: implement two-phase document indexing for Obsidian and Circleback connectors with real-time status updates
This commit is contained in:
parent
0f61a249c0
commit
629f6f9cf5
3 changed files with 394 additions and 192 deletions
|
|
@ -3,6 +3,10 @@ Obsidian connector indexer.
|
|||
|
||||
Indexes markdown notes from a local Obsidian vault.
|
||||
This connector is only available in self-hosted mode.
|
||||
|
||||
Implements 2-phase document status updates for real-time UI feedback:
|
||||
- Phase 1: Create all documents with 'pending' status (visible in UI immediately)
|
||||
- Phase 2: Process each document: pending → processing → ready/failed
|
||||
"""
|
||||
|
||||
import os
|
||||
|
|
@ -17,7 +21,7 @@ from sqlalchemy.exc import SQLAlchemyError
|
|||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.config import config
|
||||
from app.db import Document, DocumentType, SearchSourceConnectorType
|
||||
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
|
||||
from app.services.llm_service import get_user_long_context_llm
|
||||
from app.services.task_logging_service import TaskLoggingService
|
||||
from app.utils.document_converters import (
|
||||
|
|
@ -34,6 +38,7 @@ from .base import (
|
|||
get_connector_by_id,
|
||||
get_current_timestamp,
|
||||
logger,
|
||||
safe_set_chunks,
|
||||
update_connector_last_indexed,
|
||||
)
|
||||
|
||||
|
|
@ -307,25 +312,22 @@ async def index_obsidian_vault(
|
|||
|
||||
logger.info(f"Processing {len(files)} files after date filtering")
|
||||
|
||||
# Get LLM for summarization
|
||||
long_context_llm = await get_user_long_context_llm(
|
||||
session, user_id, search_space_id
|
||||
)
|
||||
|
||||
indexed_count = 0
|
||||
skipped_count = 0
|
||||
failed_count = 0
|
||||
duplicate_content_count = 0
|
||||
|
||||
# Heartbeat tracking - update notification periodically to prevent appearing stuck
|
||||
last_heartbeat_time = time.time()
|
||||
|
||||
# =======================================================================
|
||||
# PHASE 1: Analyze all files, create pending documents
|
||||
# This makes ALL documents visible in the UI immediately with pending status
|
||||
# =======================================================================
|
||||
files_to_process = [] # List of dicts with document and file data
|
||||
new_documents_created = False
|
||||
|
||||
for file_info in files:
|
||||
# Check if it's time for a heartbeat update
|
||||
if (
|
||||
on_heartbeat_callback
|
||||
and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
|
||||
):
|
||||
await on_heartbeat_callback(indexed_count)
|
||||
last_heartbeat_time = time.time()
|
||||
try:
|
||||
file_path = file_info["path"]
|
||||
relative_path = file_info["relative_path"]
|
||||
|
|
@ -368,13 +370,143 @@ async def index_obsidian_vault(
|
|||
search_space_id,
|
||||
)
|
||||
|
||||
# Generate content hash
|
||||
content_hash = generate_content_hash(content, search_space_id)
|
||||
|
||||
# Check for existing document
|
||||
existing_document = await check_document_by_unique_identifier(
|
||||
session, unique_identifier_hash
|
||||
)
|
||||
|
||||
# Generate content hash
|
||||
content_hash = generate_content_hash(content, search_space_id)
|
||||
if existing_document:
|
||||
# Document exists - check if content has changed
|
||||
if existing_document.content_hash == content_hash:
|
||||
# Ensure status is ready (might have been stuck in processing/pending)
|
||||
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
|
||||
existing_document.status = DocumentStatus.ready()
|
||||
logger.debug(f"Note {title} unchanged, skipping")
|
||||
skipped_count += 1
|
||||
continue
|
||||
|
||||
# Queue existing document for update (will be set to processing in Phase 2)
|
||||
files_to_process.append({
|
||||
'document': existing_document,
|
||||
'is_new': False,
|
||||
'file_info': file_info,
|
||||
'content': content,
|
||||
'body_content': body_content,
|
||||
'frontmatter': frontmatter,
|
||||
'wiki_links': wiki_links,
|
||||
'tags': tags,
|
||||
'title': title,
|
||||
'relative_path': relative_path,
|
||||
'content_hash': content_hash,
|
||||
'unique_identifier_hash': unique_identifier_hash,
|
||||
})
|
||||
continue
|
||||
|
||||
# Document doesn't exist by unique_identifier_hash
|
||||
# Check if a document with the same content_hash exists (from another connector)
|
||||
with session.no_autoflush:
|
||||
duplicate_by_content = await check_duplicate_document_by_hash(
|
||||
session, content_hash
|
||||
)
|
||||
|
||||
if duplicate_by_content:
|
||||
logger.info(
|
||||
f"Obsidian note {title} already indexed by another connector "
|
||||
f"(existing document ID: {duplicate_by_content.id}, "
|
||||
f"type: {duplicate_by_content.document_type}). Skipping."
|
||||
)
|
||||
duplicate_content_count += 1
|
||||
skipped_count += 1
|
||||
continue
|
||||
|
||||
# Create new document with PENDING status (visible in UI immediately)
|
||||
document = Document(
|
||||
search_space_id=search_space_id,
|
||||
title=title,
|
||||
document_type=DocumentType.OBSIDIAN_CONNECTOR,
|
||||
document_metadata={
|
||||
"vault_name": vault_name,
|
||||
"file_path": relative_path,
|
||||
"connector_id": connector_id,
|
||||
},
|
||||
content="Pending...", # Placeholder until processed
|
||||
content_hash=unique_identifier_hash, # Temporary unique value - updated when ready
|
||||
unique_identifier_hash=unique_identifier_hash,
|
||||
embedding=None,
|
||||
chunks=[], # Empty at creation - safe for async
|
||||
status=DocumentStatus.pending(), # Pending until processing starts
|
||||
updated_at=get_current_timestamp(),
|
||||
created_by_id=user_id,
|
||||
connector_id=connector_id,
|
||||
)
|
||||
session.add(document)
|
||||
new_documents_created = True
|
||||
|
||||
files_to_process.append({
|
||||
'document': document,
|
||||
'is_new': True,
|
||||
'file_info': file_info,
|
||||
'content': content,
|
||||
'body_content': body_content,
|
||||
'frontmatter': frontmatter,
|
||||
'wiki_links': wiki_links,
|
||||
'tags': tags,
|
||||
'title': title,
|
||||
'relative_path': relative_path,
|
||||
'content_hash': content_hash,
|
||||
'unique_identifier_hash': unique_identifier_hash,
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(
|
||||
f"Error in Phase 1 for file {file_info.get('path', 'unknown')}: {e}"
|
||||
)
|
||||
failed_count += 1
|
||||
continue
|
||||
|
||||
# Commit all pending documents - they all appear in UI now
|
||||
if new_documents_created:
|
||||
logger.info(f"Phase 1: Committing {len([f for f in files_to_process if f['is_new']])} pending documents")
|
||||
await session.commit()
|
||||
|
||||
# =======================================================================
|
||||
# PHASE 2: Process each document one by one
|
||||
# Each document transitions: pending → processing → ready/failed
|
||||
# =======================================================================
|
||||
logger.info(f"Phase 2: Processing {len(files_to_process)} documents")
|
||||
|
||||
# Get LLM for summarization
|
||||
long_context_llm = await get_user_long_context_llm(
|
||||
session, user_id, search_space_id
|
||||
)
|
||||
|
||||
for item in files_to_process:
|
||||
# Send heartbeat periodically
|
||||
if on_heartbeat_callback:
|
||||
current_time = time.time()
|
||||
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
|
||||
await on_heartbeat_callback(indexed_count)
|
||||
last_heartbeat_time = current_time
|
||||
|
||||
document = item['document']
|
||||
try:
|
||||
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
|
||||
document.status = DocumentStatus.processing()
|
||||
await session.commit()
|
||||
|
||||
# Extract data from item
|
||||
title = item['title']
|
||||
relative_path = item['relative_path']
|
||||
content = item['content']
|
||||
body_content = item['body_content']
|
||||
frontmatter = item['frontmatter']
|
||||
wiki_links = item['wiki_links']
|
||||
tags = item['tags']
|
||||
content_hash = item['content_hash']
|
||||
file_info = item['file_info']
|
||||
|
||||
# Build metadata
|
||||
document_metadata = {
|
||||
|
|
@ -404,134 +536,114 @@ async def index_obsidian_vault(
|
|||
]
|
||||
document_string = build_document_metadata_string(metadata_sections)
|
||||
|
||||
if existing_document:
|
||||
# Check if content has changed
|
||||
if existing_document.content_hash == content_hash:
|
||||
logger.debug(f"Note {title} unchanged, skipping")
|
||||
skipped_count += 1
|
||||
continue
|
||||
|
||||
# Update existing document
|
||||
logger.info(f"Updating note: {title}")
|
||||
|
||||
# Generate new summary if content changed
|
||||
if long_context_llm:
|
||||
new_summary, _ = await generate_document_summary(
|
||||
document_string,
|
||||
long_context_llm,
|
||||
document_metadata,
|
||||
)
|
||||
# Store summary in metadata
|
||||
document_metadata["summary"] = new_summary
|
||||
|
||||
# Add URL and connector_id to metadata
|
||||
document_metadata["url"] = (
|
||||
f"obsidian://{vault_name}/{relative_path}"
|
||||
)
|
||||
document_metadata["connector_id"] = connector_id
|
||||
|
||||
existing_document.content = document_string
|
||||
existing_document.content_hash = content_hash
|
||||
existing_document.document_metadata = document_metadata
|
||||
existing_document.updated_at = get_current_timestamp()
|
||||
|
||||
# Update embedding
|
||||
embedding = config.embedding_model_instance.embed(document_string)
|
||||
existing_document.embedding = embedding
|
||||
|
||||
# Update chunks - delete old and create new
|
||||
existing_document.chunks.clear()
|
||||
new_chunks = await create_document_chunks(document_string)
|
||||
existing_document.chunks = new_chunks
|
||||
|
||||
indexed_count += 1
|
||||
|
||||
else:
|
||||
# Document doesn't exist by unique_identifier_hash
|
||||
# Check if a document with the same content_hash exists (from another connector)
|
||||
with session.no_autoflush:
|
||||
duplicate_by_content = await check_duplicate_document_by_hash(
|
||||
session, content_hash
|
||||
)
|
||||
|
||||
if duplicate_by_content:
|
||||
logger.info(
|
||||
f"Obsidian note {title} already indexed by another connector "
|
||||
f"(existing document ID: {duplicate_by_content.id}, "
|
||||
f"type: {duplicate_by_content.document_type}). Skipping."
|
||||
)
|
||||
skipped_count += 1
|
||||
continue
|
||||
|
||||
# Create new document
|
||||
logger.info(f"Indexing new note: {title}")
|
||||
|
||||
# Generate summary
|
||||
summary_content = ""
|
||||
if long_context_llm:
|
||||
summary_content, _ = await generate_document_summary(
|
||||
document_string,
|
||||
long_context_llm,
|
||||
document_metadata,
|
||||
)
|
||||
|
||||
# Generate embedding
|
||||
embedding = config.embedding_model_instance.embed(document_string)
|
||||
|
||||
# Add URL and summary to metadata
|
||||
document_metadata["url"] = (
|
||||
f"obsidian://{vault_name}/{relative_path}"
|
||||
)
|
||||
document_metadata["summary"] = summary_content
|
||||
document_metadata["connector_id"] = connector_id
|
||||
|
||||
# Create chunks
|
||||
chunks = await create_document_chunks(document_string)
|
||||
|
||||
# Create document
|
||||
new_document = Document(
|
||||
search_space_id=search_space_id,
|
||||
title=title,
|
||||
document_type=DocumentType.OBSIDIAN_CONNECTOR,
|
||||
content=document_string,
|
||||
content_hash=content_hash,
|
||||
unique_identifier_hash=unique_identifier_hash,
|
||||
document_metadata=document_metadata,
|
||||
embedding=embedding,
|
||||
chunks=chunks,
|
||||
updated_at=get_current_timestamp(),
|
||||
created_by_id=user_id,
|
||||
connector_id=connector_id,
|
||||
# Generate summary
|
||||
summary_content = ""
|
||||
if long_context_llm:
|
||||
summary_content, _ = await generate_document_summary(
|
||||
document_string,
|
||||
long_context_llm,
|
||||
document_metadata,
|
||||
)
|
||||
|
||||
session.add(new_document)
|
||||
# Generate embedding
|
||||
embedding = config.embedding_model_instance.embed(document_string)
|
||||
|
||||
indexed_count += 1
|
||||
# Add URL and summary to metadata
|
||||
document_metadata["url"] = f"obsidian://{vault_name}/{relative_path}"
|
||||
document_metadata["summary"] = summary_content
|
||||
document_metadata["connector_id"] = connector_id
|
||||
|
||||
# Create chunks
|
||||
chunks = await create_document_chunks(document_string)
|
||||
|
||||
# Update document to READY with actual content
|
||||
document.title = title
|
||||
document.content = document_string
|
||||
document.content_hash = content_hash
|
||||
document.embedding = embedding
|
||||
document.document_metadata = document_metadata
|
||||
safe_set_chunks(document, chunks)
|
||||
document.updated_at = get_current_timestamp()
|
||||
document.status = DocumentStatus.ready()
|
||||
|
||||
indexed_count += 1
|
||||
|
||||
# Batch commit every 10 documents (for ready status updates)
|
||||
if indexed_count % 10 == 0:
|
||||
logger.info(
|
||||
f"Committing batch: {indexed_count} Obsidian notes processed so far"
|
||||
)
|
||||
await session.commit()
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(
|
||||
f"Error processing file {file_info.get('path', 'unknown')}: {e}"
|
||||
f"Error processing file {item.get('file_info', {}).get('path', 'unknown')}: {e}"
|
||||
)
|
||||
skipped_count += 1
|
||||
# Mark document as failed with reason (visible in UI)
|
||||
try:
|
||||
document.status = DocumentStatus.failed(str(e))
|
||||
document.updated_at = get_current_timestamp()
|
||||
except Exception as status_error:
|
||||
logger.error(f"Failed to update document status to failed: {status_error}")
|
||||
failed_count += 1
|
||||
continue
|
||||
|
||||
# Update connector's last indexed timestamp
|
||||
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
|
||||
await update_connector_last_indexed(session, connector, update_last_indexed)
|
||||
|
||||
# Commit all changes
|
||||
await session.commit()
|
||||
# Final commit for any remaining documents not yet committed in batches
|
||||
logger.info(
|
||||
f"Final commit: Total {indexed_count} Obsidian notes processed"
|
||||
)
|
||||
try:
|
||||
await session.commit()
|
||||
logger.info(
|
||||
"Successfully committed all Obsidian document changes to database"
|
||||
)
|
||||
except Exception as e:
|
||||
# Handle any remaining integrity errors gracefully (race conditions, etc.)
|
||||
if (
|
||||
"duplicate key value violates unique constraint" in str(e).lower()
|
||||
or "uniqueviolationerror" in str(e).lower()
|
||||
):
|
||||
logger.warning(
|
||||
f"Duplicate content_hash detected during final commit. "
|
||||
f"This may occur if the same note was indexed by multiple connectors. "
|
||||
f"Rolling back and continuing. Error: {e!s}"
|
||||
)
|
||||
await session.rollback()
|
||||
# Don't fail the entire task - some documents may have been successfully indexed
|
||||
else:
|
||||
raise
|
||||
|
||||
# Build warning message if there were issues
|
||||
warning_parts = []
|
||||
if duplicate_content_count > 0:
|
||||
warning_parts.append(f"{duplicate_content_count} duplicate")
|
||||
if failed_count > 0:
|
||||
warning_parts.append(f"{failed_count} failed")
|
||||
warning_message = ", ".join(warning_parts) if warning_parts else None
|
||||
|
||||
total_processed = indexed_count
|
||||
|
||||
await task_logger.log_task_success(
|
||||
log_entry,
|
||||
f"Successfully indexed {indexed_count} Obsidian notes (skipped {skipped_count})",
|
||||
f"Successfully completed Obsidian vault indexing for connector {connector_id}",
|
||||
{
|
||||
"indexed_count": indexed_count,
|
||||
"skipped_count": skipped_count,
|
||||
"total_files": len(files),
|
||||
"notes_processed": total_processed,
|
||||
"documents_indexed": indexed_count,
|
||||
"documents_skipped": skipped_count,
|
||||
"documents_failed": failed_count,
|
||||
"duplicate_content_count": duplicate_content_count,
|
||||
},
|
||||
)
|
||||
|
||||
return indexed_count, None
|
||||
logger.info(
|
||||
f"Obsidian vault indexing completed: {indexed_count} ready, "
|
||||
f"{skipped_count} skipped, {failed_count} failed "
|
||||
f"({duplicate_content_count} duplicate content)"
|
||||
)
|
||||
return total_processed, warning_message
|
||||
|
||||
except SQLAlchemyError as e:
|
||||
logger.exception(f"Database error during Obsidian indexing: {e}")
|
||||
|
|
|
|||
|
|
@ -14,6 +14,34 @@ from app.db import Document
|
|||
md = MarkdownifyTransformer()
|
||||
|
||||
|
||||
def safe_set_chunks(document: Document, chunks: list) -> None:
|
||||
"""
|
||||
Safely assign chunks to a document without triggering lazy loading.
|
||||
|
||||
ALWAYS use this instead of `document.chunks = chunks` to avoid
|
||||
SQLAlchemy async errors (MissingGreenlet / greenlet_spawn).
|
||||
|
||||
Why this is needed:
|
||||
- Direct assignment `document.chunks = chunks` triggers SQLAlchemy to
|
||||
load the OLD chunks first (for comparison/orphan detection)
|
||||
- This lazy loading fails in async context with asyncpg driver
|
||||
- set_committed_value bypasses this by setting the value directly
|
||||
|
||||
This function is safe regardless of how the document was loaded
|
||||
(with or without selectinload).
|
||||
|
||||
Args:
|
||||
document: The Document object to update
|
||||
chunks: List of Chunk objects to assign
|
||||
|
||||
Example:
|
||||
# Instead of: document.chunks = chunks (DANGEROUS!)
|
||||
safe_set_chunks(document, chunks) # Always safe
|
||||
"""
|
||||
from sqlalchemy.orm.attributes import set_committed_value
|
||||
set_committed_value(document, 'chunks', chunks)
|
||||
|
||||
|
||||
def get_current_timestamp() -> datetime:
|
||||
"""
|
||||
Get the current timestamp with timezone for updated_at field.
|
||||
|
|
|
|||
|
|
@ -3,6 +3,11 @@ Circleback meeting document processor.
|
|||
|
||||
This module processes meeting data received from Circleback webhooks
|
||||
and stores it as searchable documents in the database.
|
||||
|
||||
Implements real-time document status updates for UI feedback:
|
||||
- Create document with 'pending' status (visible in UI immediately)
|
||||
- Set to 'processing' while processing content
|
||||
- Set to 'ready' or 'failed' when complete
|
||||
"""
|
||||
|
||||
import logging
|
||||
|
|
@ -14,6 +19,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
|
|||
|
||||
from app.db import (
|
||||
Document,
|
||||
DocumentStatus,
|
||||
DocumentType,
|
||||
SearchSourceConnector,
|
||||
SearchSourceConnectorType,
|
||||
|
|
@ -30,6 +36,7 @@ from app.utils.document_converters import (
|
|||
from .base import (
|
||||
check_document_by_unique_identifier,
|
||||
get_current_timestamp,
|
||||
safe_set_chunks,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -47,6 +54,11 @@ async def add_circleback_meeting_document(
|
|||
"""
|
||||
Process and store a Circleback meeting document.
|
||||
|
||||
Implements real-time document status updates:
|
||||
- Phase 1: Create document with 'pending' status (visible in UI immediately)
|
||||
- Phase 2: Set to 'processing' while processing content
|
||||
- Phase 3: Set to 'ready' or 'failed' when complete
|
||||
|
||||
Args:
|
||||
session: Database session
|
||||
meeting_id: Circleback meeting ID
|
||||
|
|
@ -59,6 +71,7 @@ async def add_circleback_meeting_document(
|
|||
Returns:
|
||||
Document object if successful, None if failed or duplicate
|
||||
"""
|
||||
document = None
|
||||
try:
|
||||
# Generate unique identifier hash using Circleback meeting ID
|
||||
unique_identifier = f"circleback_{meeting_id}"
|
||||
|
|
@ -77,6 +90,10 @@ async def add_circleback_meeting_document(
|
|||
if existing_document:
|
||||
# Document exists - check if content has changed
|
||||
if existing_document.content_hash == content_hash:
|
||||
# Ensure status is ready (might have been stuck in processing/pending)
|
||||
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
|
||||
existing_document.status = DocumentStatus.ready()
|
||||
await session.commit()
|
||||
logger.info(f"Circleback meeting {meeting_id} unchanged. Skipping.")
|
||||
return existing_document
|
||||
else:
|
||||
|
|
@ -84,7 +101,79 @@ async def add_circleback_meeting_document(
|
|||
logger.info(
|
||||
f"Content changed for Circleback meeting {meeting_id}. Updating document."
|
||||
)
|
||||
document = existing_document
|
||||
# Set to PROCESSING status and commit - shows "processing" in UI
|
||||
document.status = DocumentStatus.processing()
|
||||
await session.commit()
|
||||
else:
|
||||
# =======================================================================
|
||||
# PHASE 1: Create document with PENDING status
|
||||
# This makes the document visible in the UI immediately
|
||||
# =======================================================================
|
||||
|
||||
# Fetch the user who set up the Circleback connector (preferred)
|
||||
# or fall back to search space owner if no connector found
|
||||
created_by_user_id = None
|
||||
|
||||
# Try to find the Circleback connector for this search space
|
||||
connector_result = await session.execute(
|
||||
select(SearchSourceConnector.user_id).where(
|
||||
SearchSourceConnector.search_space_id == search_space_id,
|
||||
SearchSourceConnector.connector_type
|
||||
== SearchSourceConnectorType.CIRCLEBACK_CONNECTOR,
|
||||
)
|
||||
)
|
||||
connector_user = connector_result.scalar_one_or_none()
|
||||
|
||||
if connector_user:
|
||||
# Use the user who set up the Circleback connector
|
||||
created_by_user_id = connector_user
|
||||
else:
|
||||
# Fallback: use search space owner if no connector found
|
||||
search_space_result = await session.execute(
|
||||
select(SearchSpace.user_id).where(SearchSpace.id == search_space_id)
|
||||
)
|
||||
created_by_user_id = search_space_result.scalar_one_or_none()
|
||||
|
||||
# Create new document with PENDING status (visible in UI immediately)
|
||||
document = Document(
|
||||
search_space_id=search_space_id,
|
||||
title=meeting_name,
|
||||
document_type=DocumentType.CIRCLEBACK,
|
||||
document_metadata={
|
||||
"CIRCLEBACK_MEETING_ID": meeting_id,
|
||||
"MEETING_NAME": meeting_name,
|
||||
"SOURCE": "CIRCLEBACK_WEBHOOK",
|
||||
"connector_id": connector_id,
|
||||
},
|
||||
content="Pending...", # Placeholder until processed
|
||||
content_hash=unique_identifier_hash, # Temporary unique value - updated when ready
|
||||
unique_identifier_hash=unique_identifier_hash,
|
||||
embedding=None,
|
||||
chunks=[], # Empty at creation - safe for async
|
||||
status=DocumentStatus.pending(), # Pending until processing starts
|
||||
content_needs_reindexing=False,
|
||||
updated_at=get_current_timestamp(),
|
||||
created_by_id=created_by_user_id,
|
||||
connector_id=connector_id,
|
||||
)
|
||||
session.add(document)
|
||||
# Commit immediately so document appears in UI with pending status
|
||||
await session.commit()
|
||||
logger.info(
|
||||
f"Created pending Circleback meeting document {meeting_id} in search space {search_space_id}"
|
||||
)
|
||||
|
||||
# =======================================================================
|
||||
# PHASE 2: Set to PROCESSING status
|
||||
# =======================================================================
|
||||
document.status = DocumentStatus.processing()
|
||||
await session.commit()
|
||||
|
||||
# =======================================================================
|
||||
# PHASE 3: Process the document content
|
||||
# =======================================================================
|
||||
|
||||
# Get LLM for generating summary
|
||||
llm = await get_document_summary_llm(session, search_space_id)
|
||||
if not llm:
|
||||
|
|
@ -100,7 +189,7 @@ async def add_circleback_meeting_document(
|
|||
summary_embedding = None
|
||||
else:
|
||||
# Generate summary with metadata
|
||||
document_metadata = {
|
||||
summary_metadata = {
|
||||
"meeting_name": meeting_name,
|
||||
"meeting_id": meeting_id,
|
||||
"document_type": "Circleback Meeting",
|
||||
|
|
@ -111,7 +200,7 @@ async def add_circleback_meeting_document(
|
|||
},
|
||||
}
|
||||
summary_content, summary_embedding = await generate_document_summary(
|
||||
markdown_content, llm, document_metadata
|
||||
markdown_content, llm, summary_metadata
|
||||
)
|
||||
|
||||
# Process chunks
|
||||
|
|
@ -126,7 +215,7 @@ async def add_circleback_meeting_document(
|
|||
f"Failed to convert Circleback meeting {meeting_id} to BlockNote JSON, document will not be editable"
|
||||
)
|
||||
|
||||
# Prepare document metadata
|
||||
# Prepare final document metadata
|
||||
document_metadata = {
|
||||
"CIRCLEBACK_MEETING_ID": meeting_id,
|
||||
"MEETING_NAME": meeting_name,
|
||||
|
|
@ -134,77 +223,34 @@ async def add_circleback_meeting_document(
|
|||
**metadata,
|
||||
}
|
||||
|
||||
# Fetch the user who set up the Circleback connector (preferred)
|
||||
# or fall back to search space owner if no connector found
|
||||
created_by_user_id = None
|
||||
# =======================================================================
|
||||
# PHASE 4: Update document to READY status with actual content
|
||||
# =======================================================================
|
||||
document.title = meeting_name
|
||||
document.content = summary_content
|
||||
document.content_hash = content_hash
|
||||
if summary_embedding is not None:
|
||||
document.embedding = summary_embedding
|
||||
document.document_metadata = document_metadata
|
||||
safe_set_chunks(document, chunks)
|
||||
document.blocknote_document = blocknote_json
|
||||
document.content_needs_reindexing = False
|
||||
document.updated_at = get_current_timestamp()
|
||||
document.status = DocumentStatus.ready()
|
||||
# Ensure connector_id is set (backfill for documents created before this field)
|
||||
if connector_id is not None:
|
||||
document.connector_id = connector_id
|
||||
|
||||
# Try to find the Circleback connector for this search space
|
||||
connector_result = await session.execute(
|
||||
select(SearchSourceConnector.user_id).where(
|
||||
SearchSourceConnector.search_space_id == search_space_id,
|
||||
SearchSourceConnector.connector_type
|
||||
== SearchSourceConnectorType.CIRCLEBACK_CONNECTOR,
|
||||
)
|
||||
)
|
||||
connector_user = connector_result.scalar_one_or_none()
|
||||
|
||||
if connector_user:
|
||||
# Use the user who set up the Circleback connector
|
||||
created_by_user_id = connector_user
|
||||
else:
|
||||
# Fallback: use search space owner if no connector found
|
||||
search_space_result = await session.execute(
|
||||
select(SearchSpace.user_id).where(SearchSpace.id == search_space_id)
|
||||
)
|
||||
created_by_user_id = search_space_result.scalar_one_or_none()
|
||||
|
||||
# Update or create document
|
||||
await session.commit()
|
||||
await session.refresh(document)
|
||||
|
||||
if existing_document:
|
||||
# Update existing document
|
||||
existing_document.title = meeting_name
|
||||
existing_document.content = summary_content
|
||||
existing_document.content_hash = content_hash
|
||||
if summary_embedding is not None:
|
||||
existing_document.embedding = summary_embedding
|
||||
existing_document.document_metadata = document_metadata
|
||||
existing_document.chunks = chunks
|
||||
existing_document.blocknote_document = blocknote_json
|
||||
existing_document.content_needs_reindexing = False
|
||||
existing_document.updated_at = get_current_timestamp()
|
||||
# Ensure connector_id is set (backfill for documents created before this field)
|
||||
if connector_id is not None:
|
||||
existing_document.connector_id = connector_id
|
||||
|
||||
await session.commit()
|
||||
await session.refresh(existing_document)
|
||||
document = existing_document
|
||||
logger.info(
|
||||
f"Updated Circleback meeting document {meeting_id} in search space {search_space_id}"
|
||||
)
|
||||
else:
|
||||
# Create new document
|
||||
document = Document(
|
||||
search_space_id=search_space_id,
|
||||
title=meeting_name,
|
||||
document_type=DocumentType.CIRCLEBACK,
|
||||
document_metadata=document_metadata,
|
||||
content=summary_content,
|
||||
embedding=summary_embedding,
|
||||
chunks=chunks,
|
||||
content_hash=content_hash,
|
||||
unique_identifier_hash=unique_identifier_hash,
|
||||
blocknote_document=blocknote_json,
|
||||
content_needs_reindexing=False,
|
||||
updated_at=get_current_timestamp(),
|
||||
created_by_id=created_by_user_id,
|
||||
connector_id=connector_id,
|
||||
)
|
||||
|
||||
session.add(document)
|
||||
await session.commit()
|
||||
await session.refresh(document)
|
||||
logger.info(
|
||||
f"Created new Circleback meeting document {meeting_id} in search space {search_space_id}"
|
||||
f"Processed Circleback meeting document {meeting_id} in search space {search_space_id} - now ready"
|
||||
)
|
||||
|
||||
return document
|
||||
|
|
@ -214,8 +260,24 @@ async def add_circleback_meeting_document(
|
|||
logger.error(
|
||||
f"Database error processing Circleback meeting {meeting_id}: {db_error}"
|
||||
)
|
||||
# Mark document as failed if it was created
|
||||
if document is not None:
|
||||
try:
|
||||
document.status = DocumentStatus.failed(str(db_error))
|
||||
document.updated_at = get_current_timestamp()
|
||||
await session.commit()
|
||||
except Exception as status_error:
|
||||
logger.error(f"Failed to update document status to failed: {status_error}")
|
||||
raise db_error
|
||||
except Exception as e:
|
||||
await session.rollback()
|
||||
logger.error(f"Failed to process Circleback meeting {meeting_id}: {e!s}")
|
||||
# Mark document as failed if it was created
|
||||
if document is not None:
|
||||
try:
|
||||
document.status = DocumentStatus.failed(str(e))
|
||||
document.updated_at = get_current_timestamp()
|
||||
await session.commit()
|
||||
except Exception as status_error:
|
||||
logger.error(f"Failed to update document status to failed: {status_error}")
|
||||
raise RuntimeError(f"Failed to process Circleback meeting: {e!s}") from e
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue