feat: add document status management with JSONB column for processing states in documents

This commit is contained in:
Anish Sarkar 2026-02-05 21:59:31 +05:30
parent 04884caeef
commit aef59d04eb
13 changed files with 526 additions and 135 deletions

View file

@ -0,0 +1,80 @@
"""Add status column to documents table for per-document processing status
Revision ID: 92
Revises: 91
Create Date: 2026-02-05
Changes:
1. Add status column (JSONB) to documents table
2. Default value is {"state": "ready"} for backward compatibility
3. Existing documents are set to ready status
4. Index created for efficient status filtering
"""
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "92"
down_revision: str | None = "91"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
"""Add status column to documents with default ready state."""
# 1. Add status column with default value for new rows
op.execute(
"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'documents' AND column_name = 'status'
) THEN
ALTER TABLE documents
ADD COLUMN status JSONB NOT NULL DEFAULT '{"state": "ready"}'::jsonb;
END IF;
END$$;
"""
)
# 2. Create index on status for efficient filtering by state
op.execute(
"""
CREATE INDEX IF NOT EXISTS ix_documents_status
ON documents ((status->>'state'));
"""
)
def downgrade() -> None:
"""Remove status column from documents."""
# Drop index
op.execute(
"""
DROP INDEX IF EXISTS ix_documents_status;
"""
)
# Drop column
op.execute(
"""
DO $$
BEGIN
IF EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'documents' AND column_name = 'status'
) THEN
ALTER TABLE documents
DROP COLUMN status;
END IF;
END$$;
"""
)

View file

@ -16,13 +16,14 @@ from sqlalchemy.orm import selectinload
from app.config import config
from app.connectors.composio_connector import ComposioConnector
from app.db import Document, DocumentType
from app.db import Document, DocumentStatus, DocumentType
from app.services.composio_service import TOOLKIT_TO_DOCUMENT_TYPE
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.tasks.connector_indexers.base import (
calculate_date_range,
check_duplicate_document_by_hash,
safe_set_chunks,
)
from app.utils.document_converters import (
create_document_chunks,
@ -266,18 +267,18 @@ async def index_composio_google_calendar(
documents_indexed = 0
documents_skipped = 0
duplicate_content_count = (
0 # Track events skipped due to duplicate content_hash
)
documents_failed = 0 # Track events that failed processing
duplicate_content_count = 0 # Track events skipped due to duplicate content_hash
last_heartbeat_time = time.time()
# =======================================================================
# PHASE 1: Analyze all events, create pending documents
# This makes ALL documents visible in the UI immediately with pending status
# =======================================================================
events_to_process = [] # List of dicts with document and event data
new_documents_created = False
for event in events:
# Send heartbeat periodically to indicate task is still alive
if on_heartbeat_callback:
current_time = time.time()
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
try:
# Handle both standard Google API and potential Composio variations
event_id = event.get("id", "") or event.get("eventId", "")
@ -315,61 +316,24 @@ async def index_composio_google_calendar(
if existing_document:
if existing_document.content_hash == content_hash:
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
existing_document.status = DocumentStatus.ready()
documents_skipped += 1
continue
# Update existing
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"event_id": event_id,
"summary": summary,
"start_time": start_time,
"document_type": "Google Calendar Event (Composio)",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
markdown_content, user_llm, document_metadata
)
else:
summary_content = f"Calendar: {summary}\n\nStart: {start_time}\nEnd: {end_time}"
if location:
summary_content += f"\nLocation: {location}"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
chunks = await create_document_chunks(markdown_content)
existing_document.title = summary
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"event_id": event_id,
"summary": summary,
"start_time": start_time,
"end_time": end_time,
"location": location,
"connector_id": connector_id,
"source": "composio",
}
existing_document.chunks = chunks
existing_document.updated_at = get_current_timestamp()
documents_indexed += 1
# Batch commit every 10 documents
if documents_indexed % 10 == 0:
logger.info(
f"Committing batch: {documents_indexed} Google Calendar events processed so far"
)
await session.commit()
# Queue existing document for update (will be set to processing in Phase 2)
events_to_process.append({
'document': existing_document,
'is_new': False,
'markdown_content': markdown_content,
'content_hash': content_hash,
'event_id': event_id,
'summary': summary,
'start_time': start_time,
'end_time': end_time,
'location': location,
})
continue
# Document doesn't exist by unique_identifier_hash
@ -380,46 +344,16 @@ async def index_composio_google_calendar(
)
if duplicate_by_content:
# A document with the same content already exists (likely from standard connector)
logger.info(
f"Event {summary} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping to avoid duplicate content."
f"type: {duplicate_by_content.document_type}). Skipping."
)
duplicate_content_count += 1
documents_skipped += 1
continue
# Create new document
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"event_id": event_id,
"summary": summary,
"start_time": start_time,
"document_type": "Google Calendar Event (Composio)",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
markdown_content, user_llm, document_metadata
)
else:
summary_content = (
f"Calendar: {summary}\n\nStart: {start_time}\nEnd: {end_time}"
)
if location:
summary_content += f"\nLocation: {location}"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
chunks = await create_document_chunks(markdown_content)
# Create new document with PENDING status (visible in UI immediately)
document = Document(
search_space_id=search_space_id,
title=summary,
@ -436,19 +370,107 @@ async def index_composio_google_calendar(
"toolkit_id": "googlecalendar",
"source": "composio",
},
content=summary_content,
content_hash=content_hash,
content="Pending...", # Placeholder until processed
content_hash=unique_identifier_hash, # Temporary unique value - updated when ready
unique_identifier_hash=unique_identifier_hash,
embedding=summary_embedding,
chunks=chunks,
embedding=None,
chunks=[], # Empty at creation - safe for async
status=DocumentStatus.pending(), # Pending until processing starts
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)
new_documents_created = True
events_to_process.append({
'document': document,
'is_new': True,
'markdown_content': markdown_content,
'content_hash': content_hash,
'event_id': event_id,
'summary': summary,
'start_time': start_time,
'end_time': end_time,
'location': location,
})
except Exception as e:
logger.error(f"Error in Phase 1 for event: {e!s}", exc_info=True)
documents_failed += 1
continue
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing {len([e for e in events_to_process if e['is_new']])} pending documents")
await session.commit()
# =======================================================================
# PHASE 2: Process each document one by one
# Each document transitions: pending → processing → ready/failed
# =======================================================================
logger.info(f"Phase 2: Processing {len(events_to_process)} documents")
for item in events_to_process:
# Send heartbeat periodically
if on_heartbeat_callback:
current_time = time.time()
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item['document']
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
await session.commit()
# Heavy processing (LLM, embeddings, chunks)
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata_for_summary = {
"event_id": item['event_id'],
"summary": item['summary'],
"start_time": item['start_time'],
"document_type": "Google Calendar Event (Composio)",
}
summary_content, summary_embedding = await generate_document_summary(
item['markdown_content'], user_llm, document_metadata_for_summary
)
else:
summary_content = f"Calendar: {item['summary']}\n\nStart: {item['start_time']}\nEnd: {item['end_time']}"
if item['location']:
summary_content += f"\nLocation: {item['location']}"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
chunks = await create_document_chunks(item['markdown_content'])
# Update document to READY with actual content
document.title = item['summary']
document.content = summary_content
document.content_hash = item['content_hash']
document.embedding = summary_embedding
document.document_metadata = {
"event_id": item['event_id'],
"summary": item['summary'],
"start_time": item['start_time'],
"end_time": item['end_time'],
"location": item['location'],
"connector_id": connector_id,
"source": "composio",
}
safe_set_chunks(document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()
documents_indexed += 1
# Batch commit every 10 documents
# Batch commit every 10 documents (for ready status updates)
if documents_indexed % 10 == 0:
logger.info(
f"Committing batch: {documents_indexed} Google Calendar events processed so far"
@ -457,7 +479,13 @@ async def index_composio_google_calendar(
except Exception as e:
logger.error(f"Error processing Calendar event: {e!s}", exc_info=True)
documents_skipped += 1
# Mark document as failed with reason (visible in UI)
try:
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
documents_failed += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
@ -490,10 +518,13 @@ async def index_composio_google_calendar(
else:
raise
# Build warning message if duplicates were found
warning_message = None
# Build warning message if there were issues
warning_parts = []
if duplicate_content_count > 0:
warning_message = f"{duplicate_content_count} skipped (duplicate)"
warning_parts.append(f"{duplicate_content_count} duplicate")
if documents_failed > 0:
warning_parts.append(f"{documents_failed} failed")
warning_message = ", ".join(warning_parts) if warning_parts else None
await task_logger.log_task_success(
log_entry,
@ -501,13 +532,15 @@ async def index_composio_google_calendar(
{
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"documents_failed": documents_failed,
"duplicate_content_count": duplicate_content_count,
},
)
logger.info(
f"Composio Google Calendar indexing completed: {documents_indexed} new events, {documents_skipped} skipped "
f"({duplicate_content_count} due to duplicate content from other connectors)"
f"Composio Google Calendar indexing completed: {documents_indexed} ready, "
f"{documents_skipped} skipped, {documents_failed} failed "
f"({duplicate_content_count} duplicate content)"
)
return documents_indexed, warning_message

View file

@ -100,6 +100,80 @@ class PodcastStatus(str, Enum):
FAILED = "failed"
class DocumentStatus:
"""
Helper class for document processing status (stored as JSONB).
Status values:
- {"state": "ready"} - Document is fully processed and searchable
- {"state": "pending"} - Document is queued, waiting to be processed
- {"state": "processing"} - Document is currently being processed (only 1 at a time)
- {"state": "failed", "reason": "..."} - Processing failed with reason
Usage:
document.status = DocumentStatus.pending()
document.status = DocumentStatus.processing()
document.status = DocumentStatus.ready()
document.status = DocumentStatus.failed("LLM rate limit exceeded")
"""
# State constants
READY = "ready"
PENDING = "pending"
PROCESSING = "processing"
FAILED = "failed"
@staticmethod
def ready() -> dict:
"""Return status dict for a ready/searchable document."""
return {"state": DocumentStatus.READY}
@staticmethod
def pending() -> dict:
"""Return status dict for a document waiting to be processed."""
return {"state": DocumentStatus.PENDING}
@staticmethod
def processing() -> dict:
"""Return status dict for a document being processed."""
return {"state": DocumentStatus.PROCESSING}
@staticmethod
def failed(reason: str, **extra_details) -> dict:
"""
Return status dict for a failed document.
Args:
reason: Human-readable failure reason
**extra_details: Optional additional details (duplicate_of, error_code, etc.)
"""
status = {"state": DocumentStatus.FAILED, "reason": reason[:500]} # Truncate long reasons
if extra_details:
status.update(extra_details)
return status
@staticmethod
def get_state(status: dict | None) -> str | None:
"""Extract state from status dict, returns None if invalid."""
if status is None:
return None
return status.get("state") if isinstance(status, dict) else None
@staticmethod
def is_state(status: dict | None, state: str) -> bool:
"""Check if status matches a given state."""
return DocumentStatus.get_state(status) == state
@staticmethod
def get_failure_reason(status: dict | None) -> str | None:
"""Extract failure reason from status dict."""
if status is None or not isinstance(status, dict):
return None
if status.get("state") == DocumentStatus.FAILED:
return status.get("reason")
return None
class LiteLLMProvider(str, Enum):
"""
Enum for LLM providers supported by LiteLLM.
@ -785,6 +859,17 @@ class Document(BaseModel, TimestampMixin):
index=True,
)
# Processing status for real-time visibility (JSONB)
# Format: {"state": "ready"} or {"state": "processing"} or {"state": "failed", "reason": "..."}
# Default to {"state": "ready"} for backward compatibility with existing documents
status = Column(
JSONB,
nullable=False,
default=DocumentStatus.ready,
server_default=text("'{\"state\": \"ready\"}'::jsonb"),
index=True,
)
# Relationships
search_space = relationship("SearchSpace", back_populates="documents")
created_by = relationship("User", back_populates="documents")

View file

@ -19,6 +19,7 @@ from app.db import (
from app.schemas import (
DocumentRead,
DocumentsCreate,
DocumentStatusSchema,
DocumentTitleRead,
DocumentTitleSearchResponse,
DocumentUpdate,
@ -271,6 +272,14 @@ async def read_documents(
if doc.created_by:
created_by_name = doc.created_by.display_name or doc.created_by.email
# Parse status from JSONB
status_data = None
if hasattr(doc, 'status') and doc.status:
status_data = DocumentStatusSchema(
state=doc.status.get("state", "ready"),
reason=doc.status.get("reason"),
)
api_documents.append(
DocumentRead(
id=doc.id,
@ -285,6 +294,7 @@ async def read_documents(
search_space_id=doc.search_space_id,
created_by_id=doc.created_by_id,
created_by_name=created_by_name,
status=status_data,
)
)
@ -417,6 +427,14 @@ async def search_documents(
if doc.created_by:
created_by_name = doc.created_by.display_name or doc.created_by.email
# Parse status from JSONB
status_data = None
if hasattr(doc, 'status') and doc.status:
status_data = DocumentStatusSchema(
state=doc.status.get("state", "ready"),
reason=doc.status.get("reason"),
)
api_documents.append(
DocumentRead(
id=doc.id,
@ -431,6 +449,7 @@ async def search_documents(
search_space_id=doc.search_space_id,
created_by_id=doc.created_by_id,
created_by_name=created_by_name,
status=status_data,
)
)
@ -806,6 +825,7 @@ async def delete_document(
"""
Delete a document.
Requires DOCUMENTS_DELETE permission for the search space.
Documents in "processing" state cannot be deleted.
"""
try:
result = await session.execute(
@ -818,6 +838,14 @@ async def delete_document(
status_code=404, detail=f"Document with id {document_id} not found"
)
# Check if document is pending or currently being processed
doc_state = document.status.get("state") if document.status else None
if doc_state in ("pending", "processing"):
raise HTTPException(
status_code=409, # Conflict
detail="Cannot delete document while it is pending or being processed. Please wait for processing to complete.",
)
# Check permission for the search space
await check_permission(
session,

View file

@ -4,6 +4,7 @@ from .documents import (
DocumentBase,
DocumentRead,
DocumentsCreate,
DocumentStatusSchema,
DocumentTitleRead,
DocumentTitleSearchResponse,
DocumentUpdate,
@ -87,6 +88,7 @@ __all__ = [
# Document schemas
"DocumentBase",
"DocumentRead",
"DocumentStatusSchema",
"DocumentTitleRead",
"DocumentTitleSearchResponse",
"DocumentUpdate",

View file

@ -41,6 +41,12 @@ class DocumentUpdate(DocumentBase):
pass
class DocumentStatusSchema(BaseModel):
"""Document processing status."""
state: str # "ready", "processing", "failed"
reason: str | None = None
class DocumentRead(BaseModel):
id: int
title: str
@ -54,6 +60,7 @@ class DocumentRead(BaseModel):
search_space_id: int
created_by_id: UUID | None = None # User who created/uploaded this document
created_by_name: str | None = None # Display name or email of the user who created this document
status: DocumentStatusSchema | None = None # Processing status (ready, processing, failed)
model_config = ConfigDict(from_attributes=True)

View file

@ -28,6 +28,34 @@ def get_current_timestamp() -> datetime:
return datetime.now(UTC)
def safe_set_chunks(document: Document, chunks: list) -> None:
"""
Safely assign chunks to a document without triggering lazy loading.
ALWAYS use this instead of `document.chunks = chunks` to avoid
SQLAlchemy async errors (MissingGreenlet / greenlet_spawn).
Why this is needed:
- Direct assignment `document.chunks = chunks` triggers SQLAlchemy to
load the OLD chunks first (for comparison/orphan detection)
- This lazy loading fails in async context with asyncpg driver
- set_committed_value bypasses this by setting the value directly
This function is safe regardless of how the document was loaded
(with or without selectinload).
Args:
document: The Document object to update
chunks: List of Chunk objects to assign
Example:
# Instead of: document.chunks = chunks (DANGEROUS!)
safe_set_chunks(document, chunks) # Always safe
"""
from sqlalchemy.orm.attributes import set_committed_value
set_committed_value(document, 'chunks', chunks)
async def check_duplicate_document_by_hash(
session: AsyncSession, content_hash: str
) -> Document | None: