Merge pull request #765 from AnishSarkar22/fix/documents

feat: Add document ownership & deletion of documents
This commit is contained in:
Rohan Verma 2026-02-02 14:50:18 -08:00 committed by GitHub
commit d0673cecf6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
41 changed files with 832 additions and 16 deletions

View file

@ -0,0 +1,125 @@
"""Add created_by_id column to documents table for document ownership tracking
Revision ID: 86
Revises: 85
Create Date: 2026-02-02
Changes:
1. Add created_by_id column (UUID, nullable, foreign key to user.id)
2. Create index on created_by_id for performance
3. Backfill existing documents with search space owner's user_id
"""
from collections.abc import Sequence
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "86"
down_revision: str | None = "85"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
    """Add created_by_id column to documents and backfill with search space owner.

    Every DDL statement is wrapped in an IF [NOT] EXISTS guard, so the
    migration is safe to re-run against a partially migrated database.
    """
    statements = (
        # 1. Add created_by_id column (nullable for backward compatibility).
        """
        DO $$
        BEGIN
            IF NOT EXISTS (
                SELECT 1 FROM information_schema.columns
                WHERE table_name = 'documents' AND column_name = 'created_by_id'
            ) THEN
                ALTER TABLE documents
                ADD COLUMN created_by_id UUID;
            END IF;
        END$$;
        """,
        # 2. Index created_by_id for efficient per-user queries.
        """
        CREATE INDEX IF NOT EXISTS ix_documents_created_by_id
        ON documents (created_by_id);
        """,
        # 3. Foreign key to "user"(id); ON DELETE SET NULL keeps documents
        #    around when a user account is removed.
        """
        DO $$
        BEGIN
            IF NOT EXISTS (
                SELECT 1 FROM information_schema.table_constraints
                WHERE constraint_name = 'fk_documents_created_by_id'
                AND table_name = 'documents'
            ) THEN
                ALTER TABLE documents
                ADD CONSTRAINT fk_documents_created_by_id
                FOREIGN KEY (created_by_id) REFERENCES "user"(id)
                ON DELETE SET NULL;
            END IF;
        END$$;
        """,
        # 4. Backfill: associate every pre-existing document (created_by_id
        #    still NULL) with the owner of its search space.
        """
        UPDATE documents
        SET created_by_id = searchspaces.user_id
        FROM searchspaces
        WHERE documents.search_space_id = searchspaces.id
        AND documents.created_by_id IS NULL;
        """,
    )
    for statement in statements:
        op.execute(statement)
def downgrade() -> None:
    """Remove created_by_id column from documents.

    Reverses upgrade() in the opposite order: constraint, then index, then
    column.  Each step is guarded so a partial downgrade can be resumed.
    """
    statements = (
        # Drop the foreign key constraint if present.
        """
        DO $$
        BEGIN
            IF EXISTS (
                SELECT 1 FROM information_schema.table_constraints
                WHERE constraint_name = 'fk_documents_created_by_id'
                AND table_name = 'documents'
            ) THEN
                ALTER TABLE documents
                DROP CONSTRAINT fk_documents_created_by_id;
            END IF;
        END$$;
        """,
        # Drop the supporting index.
        """
        DROP INDEX IF EXISTS ix_documents_created_by_id;
        """,
        # Finally drop the column itself.
        """
        DO $$
        BEGIN
            IF EXISTS (
                SELECT 1 FROM information_schema.columns
                WHERE table_name = 'documents' AND column_name = 'created_by_id'
            ) THEN
                ALTER TABLE documents
                DROP COLUMN created_by_id;
            END IF;
        END$$;
        """,
    )
    for statement in statements:
        op.execute(statement)

View file

@ -0,0 +1,170 @@
"""Add connector_id column to documents table for linking documents to their source connector
Revision ID: 87
Revises: 86
Create Date: 2026-02-02
Changes:
1. Add connector_id column (Integer, nullable, foreign key to search_source_connectors.id)
2. Create index on connector_id for efficient bulk deletion queries
3. SET NULL on delete - allows controlled cleanup in application code
4. Backfill existing documents based on document_type and search_space_id matching
"""
from collections.abc import Sequence
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "87"
down_revision: str | None = "86"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
    """Add connector_id column to documents and backfill from existing connectors.

    DDL statements are guarded with IF [NOT] EXISTS so the migration can be
    re-run safely on a partially migrated database.
    """
    # 1. Add connector_id column (nullable - for manually uploaded docs without connector)
    op.execute(
        """
        DO $$
        BEGIN
            IF NOT EXISTS (
                SELECT 1 FROM information_schema.columns
                WHERE table_name = 'documents' AND column_name = 'connector_id'
            ) THEN
                ALTER TABLE documents
                ADD COLUMN connector_id INTEGER;
            END IF;
        END$$;
        """
    )
    # 2. Create index on connector_id for efficient cleanup queries
    op.execute(
        """
        CREATE INDEX IF NOT EXISTS ix_documents_connector_id
        ON documents (connector_id);
        """
    )
    # 3. Add foreign key constraint with ON DELETE SET NULL.
    # SET NULL allows us to delete documents in controlled batches before
    # deleting the connector itself.
    op.execute(
        """
        DO $$
        BEGIN
            IF NOT EXISTS (
                SELECT 1 FROM information_schema.table_constraints
                WHERE constraint_name = 'fk_documents_connector_id'
                AND table_name = 'documents'
            ) THEN
                ALTER TABLE documents
                ADD CONSTRAINT fk_documents_connector_id
                FOREIGN KEY (connector_id) REFERENCES search_source_connectors(id)
                ON DELETE SET NULL;
            END IF;
        END$$;
        """
    )
    # 4. Backfill existing documents with connector_id based on document_type.
    # Maps document types to their corresponding connector types.  Backfills
    # whenever at least one connector of the mapped type exists in the same
    # search space; if several exist, the oldest one (lowest id) is chosen
    # deterministically via ORDER BY ssc.id.
    document_connector_mappings = [
        ("NOTION_CONNECTOR", "NOTION_CONNECTOR"),
        ("SLACK_CONNECTOR", "SLACK_CONNECTOR"),
        ("TEAMS_CONNECTOR", "TEAMS_CONNECTOR"),
        ("GITHUB_CONNECTOR", "GITHUB_CONNECTOR"),
        ("LINEAR_CONNECTOR", "LINEAR_CONNECTOR"),
        ("DISCORD_CONNECTOR", "DISCORD_CONNECTOR"),
        ("JIRA_CONNECTOR", "JIRA_CONNECTOR"),
        ("CONFLUENCE_CONNECTOR", "CONFLUENCE_CONNECTOR"),
        ("CLICKUP_CONNECTOR", "CLICKUP_CONNECTOR"),
        ("GOOGLE_CALENDAR_CONNECTOR", "GOOGLE_CALENDAR_CONNECTOR"),
        ("GOOGLE_GMAIL_CONNECTOR", "GOOGLE_GMAIL_CONNECTOR"),
        ("GOOGLE_DRIVE_FILE", "GOOGLE_DRIVE_CONNECTOR"),
        ("AIRTABLE_CONNECTOR", "AIRTABLE_CONNECTOR"),
        ("LUMA_CONNECTOR", "LUMA_CONNECTOR"),
        ("ELASTICSEARCH_CONNECTOR", "ELASTICSEARCH_CONNECTOR"),
        ("BOOKSTACK_CONNECTOR", "BOOKSTACK_CONNECTOR"),
        ("CIRCLEBACK", "CIRCLEBACK_CONNECTOR"),
        ("OBSIDIAN_CONNECTOR", "OBSIDIAN_CONNECTOR"),
        ("COMPOSIO_GOOGLE_DRIVE_CONNECTOR", "COMPOSIO_GOOGLE_DRIVE_CONNECTOR"),
        ("COMPOSIO_GMAIL_CONNECTOR", "COMPOSIO_GMAIL_CONNECTOR"),
        ("COMPOSIO_GOOGLE_CALENDAR_CONNECTOR", "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR"),
        ("CRAWLED_URL", "WEBCRAWLER_CONNECTOR"),
    ]
    for doc_type, connector_type in document_connector_mappings:
        # Backfill connector_id for documents where:
        # 1. Document has this document_type
        # 2. Document doesn't already have a connector_id
        # 3. At least one connector of this type exists in the same search
        #    space (ties resolved deterministically by lowest connector id)
        # NOTE: doc_type / connector_type come from the hard-coded list above,
        # so the f-string interpolation is not an injection risk.
        op.execute(
            f"""
            UPDATE documents d
            SET connector_id = (
                SELECT ssc.id
                FROM search_source_connectors ssc
                WHERE ssc.search_space_id = d.search_space_id
                AND ssc.connector_type = '{connector_type}'
                ORDER BY ssc.id
                LIMIT 1
            )
            WHERE d.document_type = '{doc_type}'
            AND d.connector_id IS NULL
            AND EXISTS (
                SELECT 1 FROM search_source_connectors ssc
                WHERE ssc.search_space_id = d.search_space_id
                AND ssc.connector_type = '{connector_type}'
            );
            """
        )
def downgrade() -> None:
    """Remove connector_id column from documents.

    Undoes upgrade() in reverse order: constraint, index, column.  Guards
    make each step idempotent.
    """
    statements = (
        # Drop the foreign key constraint if present.
        """
        DO $$
        BEGIN
            IF EXISTS (
                SELECT 1 FROM information_schema.table_constraints
                WHERE constraint_name = 'fk_documents_connector_id'
                AND table_name = 'documents'
            ) THEN
                ALTER TABLE documents
                DROP CONSTRAINT fk_documents_connector_id;
            END IF;
        END$$;
        """,
        # Drop the supporting index.
        """
        DROP INDEX IF EXISTS ix_documents_connector_id;
        """,
        # Finally drop the column itself.
        """
        DO $$
        BEGIN
            IF EXISTS (
                SELECT 1 FROM information_schema.columns
                WHERE table_name = 'documents' AND column_name = 'connector_id'
            ) THEN
                ALTER TABLE documents
                DROP COLUMN connector_id;
            END IF;
        END$$;
        """,
    )
    for statement in statements:
        op.execute(statement)

View file

@ -81,6 +81,7 @@ celery_app = Celery(
"app.tasks.celery_tasks.blocknote_migration_tasks",
"app.tasks.celery_tasks.document_reindex_tasks",
"app.tasks.celery_tasks.stale_notification_cleanup_task",
"app.tasks.celery_tasks.connector_deletion_task",
],
)

View file

@ -394,6 +394,8 @@ async def _process_gmail_message_batch(
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)
documents_indexed += 1

View file

@ -442,6 +442,8 @@ async def index_composio_google_calendar(
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)
documents_indexed += 1

View file

@ -1248,7 +1248,6 @@ async def _process_single_drive_file(
"file_name": file_name,
"FILE_NAME": file_name, # For compatibility
"mime_type": mime_type,
"connector_id": connector_id,
"toolkit_id": "googledrive",
"source": "composio",
},
@ -1258,6 +1257,8 @@ async def _process_single_drive_file(
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)

View file

@ -25,6 +25,7 @@ async def download_and_process_file(
session: AsyncSession,
task_logger: TaskLoggingService,
log_entry: Log,
connector_id: int | None = None,
) -> tuple[Any, str | None, dict[str, Any] | None]:
"""
Download Google Drive file and process using Surfsense file processors.
@ -37,6 +38,7 @@ async def download_and_process_file(
session: Database session
task_logger: Task logging service
log_entry: Log entry for tracking
connector_id: ID of the connector (for de-indexing support)
Returns:
Tuple of (Document object if successful, error message if failed, file metadata dict)
@ -92,6 +94,9 @@ async def download_and_process_file(
"source_connector": "google_drive",
},
}
# Include connector_id for de-indexing support
if connector_id is not None:
connector_info["connector_id"] = connector_id
# Add additional Drive metadata if available
if "modifiedTime" in file:

View file

@ -127,7 +127,12 @@ async def get_valid_credentials(
)
creds_dict["_token_encrypted"] = True
connector.config = creds_dict
# IMPORTANT: Merge new credentials with existing config to preserve
# user settings like selected_folders, selected_files, indexing_options,
# folder_tokens, etc. that would otherwise be wiped on token refresh.
existing_config = connector.config.copy() if connector.config else {}
existing_config.update(creds_dict)
connector.config = existing_config
flag_modified(connector, "config")
await session.commit()

View file

@ -751,7 +751,27 @@ class Document(BaseModel, TimestampMixin):
search_space_id = Column(
Integer, ForeignKey("searchspaces.id", ondelete="CASCADE"), nullable=False
)
# Track who created/uploaded this document
created_by_id = Column(
UUID(as_uuid=True),
ForeignKey("user.id", ondelete="SET NULL"),
nullable=True, # Nullable for backward compatibility with existing records
index=True,
)
# Track which connector created this document (for cleanup on connector deletion)
connector_id = Column(
Integer,
ForeignKey("search_source_connectors.id", ondelete="SET NULL"),
nullable=True, # Nullable for manually uploaded docs without connector
index=True,
)
# Relationships
search_space = relationship("SearchSpace", back_populates="documents")
created_by = relationship("User", back_populates="documents")
connector = relationship("SearchSourceConnector", back_populates="documents")
chunks = relationship(
"Chunk", back_populates="document", cascade="all, delete-orphan"
)
@ -980,6 +1000,9 @@ class SearchSourceConnector(BaseModel, TimestampMixin):
UUID(as_uuid=True), ForeignKey("user.id", ondelete="CASCADE"), nullable=False
)
# Documents created by this connector (for cleanup on connector deletion)
documents = relationship("Document", back_populates="connector")
class NewLLMConfig(BaseModel, TimestampMixin):
"""
@ -1286,6 +1309,13 @@ if config.AUTH_TYPE == "GOOGLE":
passive_deletes=True,
)
# Documents created/uploaded by this user
documents = relationship(
"Document",
back_populates="created_by",
passive_deletes=True,
)
# User memories for personalized AI responses
memories = relationship(
"UserMemory",
@ -1344,6 +1374,13 @@ else:
passive_deletes=True,
)
# Documents created/uploaded by this user
documents = relationship(
"Document",
back_populates="created_by",
passive_deletes=True,
)
# User memories for personalized AI responses
memories = relationship(
"UserMemory",

View file

@ -9,8 +9,12 @@ import logging
from datetime import datetime
from typing import Any
from fastapi import APIRouter, HTTPException
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import SearchSourceConnector, SearchSourceConnectorType, get_async_session
logger = logging.getLogger(__name__)
@ -212,6 +216,7 @@ def format_circleback_meeting_to_markdown(payload: CirclebackWebhookPayload) ->
async def receive_circleback_webhook(
search_space_id: int,
payload: CirclebackWebhookPayload,
session: AsyncSession = Depends(get_async_session),
):
"""
Receive and process a Circleback webhook.
@ -223,6 +228,7 @@ async def receive_circleback_webhook(
Args:
search_space_id: The ID of the search space to save the document to
payload: The Circleback webhook payload containing meeting data
session: Database session for looking up the connector
Returns:
Success message with document details
@ -236,6 +242,26 @@ async def receive_circleback_webhook(
f"Received Circleback webhook for meeting {payload.id} in search space {search_space_id}"
)
# Look up the Circleback connector for this search space
connector_result = await session.execute(
select(SearchSourceConnector.id).where(
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.CIRCLEBACK_CONNECTOR,
)
)
connector_id = connector_result.scalar_one_or_none()
if connector_id:
logger.info(
f"Found Circleback connector {connector_id} for search space {search_space_id}"
)
else:
logger.warning(
f"No Circleback connector found for search space {search_space_id}. "
"Document will be created without connector_id."
)
# Convert to markdown
markdown_content = format_circleback_meeting_to_markdown(payload)
@ -264,6 +290,7 @@ async def receive_circleback_webhook(
markdown_content=markdown_content,
metadata=meeting_metadata,
search_space_id=search_space_id,
connector_id=connector_id,
)
logger.info(

View file

@ -20,6 +20,7 @@ from pydantic import ValidationError
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy.orm.attributes import flag_modified
from app.config import config
from app.db import (
@ -330,10 +331,19 @@ async def composio_callback(
)
# Update existing connector with new connected_account_id
# IMPORTANT: Merge new credentials with existing config to preserve
# user settings like selected_folders, selected_files, indexing_options,
# drive_page_token, etc. that would otherwise be wiped on reconnection.
logger.info(
f"Updating existing Composio connector {existing_connector.id} with new connected_account_id {final_connected_account_id}"
)
existing_connector.config = connector_config
existing_config = (
existing_connector.config.copy() if existing_connector.config else {}
)
existing_config.update(connector_config)
existing_connector.config = existing_config
flag_modified(existing_connector, "config")
await session.commit()
await session.refresh(existing_connector)

View file

@ -76,6 +76,7 @@ async def create_note(
document_metadata={"NOTE": True},
embedding=None, # Will be generated on first reindex
updated_at=datetime.now(UTC),
created_by_id=user.id, # Track who created this note
)
session.add(document)
@ -93,6 +94,7 @@ async def create_note(
search_space_id=document.search_space_id,
created_at=document.created_at,
updated_at=document.updated_at,
created_by_id=document.created_by_id,
)

View file

@ -527,9 +527,17 @@ async def delete_search_source_connector(
user: User = Depends(current_active_user),
):
"""
Delete a search source connector.
Delete a search source connector and all its associated documents.
The deletion runs in background via Celery task. User is notified
via the notification system when complete (no polling required).
Requires CONNECTORS_DELETE permission.
"""
from app.tasks.celery_tasks.connector_deletion_task import (
delete_connector_with_documents_task,
)
try:
# Get the connector first
result = await session.execute(
@ -551,7 +559,12 @@ async def delete_search_source_connector(
"You don't have permission to delete this connector",
)
# Delete any periodic schedule associated with this connector
# Store connector info before we queue the deletion task
connector_name = db_connector.name
connector_type = db_connector.connector_type.value
search_space_id = db_connector.search_space_id
# Delete any periodic schedule associated with this connector (lightweight, sync)
if db_connector.periodic_indexing_enabled:
success = delete_periodic_schedule(connector_id)
if not success:
@ -559,7 +572,7 @@ async def delete_search_source_connector(
f"Failed to delete periodic schedule for connector {connector_id}"
)
# For Composio connectors, also delete the connected account in Composio
# For Composio connectors, delete the connected account in Composio (lightweight API call, sync)
composio_connector_types = [
SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR,
@ -591,16 +604,33 @@ async def delete_search_source_connector(
f"Error deleting Composio connected account {composio_connected_account_id}: {composio_error!s}"
)
await session.delete(db_connector)
await session.commit()
return {"message": "Search source connector deleted successfully"}
# Queue background task to delete documents and connector
# This handles potentially large document counts without blocking the API
delete_connector_with_documents_task.delay(
connector_id=connector_id,
user_id=str(user.id),
search_space_id=search_space_id,
connector_name=connector_name,
connector_type=connector_type,
)
logger.info(
f"Queued deletion task for connector {connector_id} ({connector_name})"
)
return {
"message": "Connector deletion started. You will be notified when complete.",
"status": "queued",
"connector_id": connector_id,
"connector_name": connector_name,
}
except HTTPException:
raise
except Exception as e:
await session.rollback()
raise HTTPException(
status_code=500,
detail=f"Failed to delete search source connector: {e!s}",
detail=f"Failed to start connector deletion: {e!s}",
) from e

View file

@ -1,5 +1,6 @@
from datetime import datetime
from typing import TypeVar
from uuid import UUID
from pydantic import BaseModel, ConfigDict
@ -51,6 +52,7 @@ class DocumentRead(BaseModel):
created_at: datetime
updated_at: datetime | None
search_space_id: int
created_by_id: UUID | None = None # User who created/uploaded this document
model_config = ConfigDict(from_attributes=True)

View file

@ -0,0 +1,269 @@
"""Celery task for background connector deletion.
This task handles the deletion of all documents associated with a connector
in the background, then deletes the connector itself. User is notified via
the notification system when complete (no polling required).
Features:
- Batch deletion to handle large document counts
- Automatic retry on failure
- Progress tracking via notifications
- Handles both success and failure notifications
"""
import asyncio
import logging
from uuid import UUID
from sqlalchemy import delete, func, select
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.pool import NullPool
from app.celery_app import celery_app
from app.config import config
from app.db import Document, Notification, SearchSourceConnector
logger = logging.getLogger(__name__)
# Batch size for document deletion
DELETION_BATCH_SIZE = 500
def _get_celery_session_maker():
    """Build a fresh (async session maker, engine) pair for a Celery task.

    Uses NullPool — presumably so forked worker processes never share pooled
    connections (TODO confirm against worker configuration).  The caller is
    responsible for disposing the returned engine when done.
    """
    db_engine = create_async_engine(
        config.DATABASE_URL, poolclass=NullPool, echo=False
    )
    maker = async_sessionmaker(db_engine, expire_on_commit=False)
    return maker, db_engine
@celery_app.task(
    bind=True,
    name="delete_connector_with_documents",
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(Exception,),
    retry_backoff=True,
)
def delete_connector_with_documents_task(
    self,
    connector_id: int,
    user_id: str,
    search_space_id: int,
    connector_name: str,
    connector_type: str,
):
    """
    Background task to delete a connector and all its associated documents.

    Creates a notification when complete (success or failure).
    No polling required - user sees notification in UI.

    Args:
        connector_id: ID of the connector to delete
        user_id: ID of the user who initiated the deletion
        search_space_id: ID of the search space
        connector_name: Name of the connector (for notification message)
        connector_type: Type of the connector (for logging)
    """
    # asyncio.run() creates, runs, and tears down a dedicated event loop and
    # also shuts down async generators — strictly safer than the manual
    # new_event_loop()/run_until_complete()/close() dance.  Exceptions
    # propagate so Celery's autoretry_for handles retries.
    return asyncio.run(
        _delete_connector_async(
            connector_id=connector_id,
            user_id=user_id,
            search_space_id=search_space_id,
            connector_name=connector_name,
            connector_type=connector_type,
        )
    )
async def _delete_connector_async(
    connector_id: int,
    user_id: str,
    search_space_id: int,
    connector_name: str,
    connector_type: str,
) -> dict:
    """
    Async implementation of connector deletion.

    Steps:
    1. Count total documents to delete (informational only — the batch loop
       below terminates on an empty batch, not on this count)
    2. Delete documents in batches (chunks cascade automatically)
    3. Delete the connector record
    4. Create success notification

    On failure, creates failure notification and re-raises exception.

    NOTE(review): the Celery task wrapping this coroutine autoretries on any
    Exception (max_retries=3), so a persistent failure may emit one failure
    notification per attempt — confirm whether that duplication is intended.

    Returns:
        dict with status, connector id/name, and the number of documents
        deleted.
    """
    session_maker, engine = _get_celery_session_maker()
    total_deleted = 0
    try:
        async with session_maker() as session:
            # Step 1: Count total documents for this connector
            count_result = await session.execute(
                select(func.count(Document.id)).where(
                    Document.connector_id == connector_id
                )
            )
            total_docs = count_result.scalar() or 0
            logger.info(
                f"Starting deletion of connector {connector_id} ({connector_name}). "
                f"Documents to delete: {total_docs}"
            )
            # Step 2: Delete documents in batches.  Each batch is committed
            # separately, so progress is preserved if a later batch fails.
            while True:
                # Get batch of document IDs
                result = await session.execute(
                    select(Document.id)
                    .where(Document.connector_id == connector_id)
                    .limit(DELETION_BATCH_SIZE)
                )
                doc_ids = [row[0] for row in result.fetchall()]
                if not doc_ids:
                    break
                # Delete this batch (chunks are deleted via CASCADE)
                await session.execute(delete(Document).where(Document.id.in_(doc_ids)))
                await session.commit()
                total_deleted += len(doc_ids)
                logger.info(
                    f"Deleted batch of {len(doc_ids)} documents. "
                    f"Progress: {total_deleted}/{total_docs}"
                )
            # Step 3: Delete the connector record.  The delete is flushed to
            # the DB by the commit in step 4 below.
            result = await session.execute(
                select(SearchSourceConnector).where(
                    SearchSourceConnector.id == connector_id
                )
            )
            connector = result.scalar_one_or_none()
            if connector:
                await session.delete(connector)
                logger.info(f"Deleted connector record: {connector_id}")
            else:
                # Tolerated: a retry after a partial earlier attempt may find
                # the connector row already gone.
                logger.warning(
                    f"Connector {connector_id} not found - may have been already deleted"
                )
            # Step 4: Create success notification (committed together with the
            # connector deletion above).
            doc_text = "document" if total_deleted == 1 else "documents"
            notification = Notification(
                user_id=UUID(user_id),
                search_space_id=search_space_id,
                type="connector_deletion",
                title=f"{connector_name} Removed",
                message=f"Connector and {total_deleted} {doc_text} have been removed from your knowledge base.",
                notification_metadata={
                    "connector_id": connector_id,
                    "connector_name": connector_name,
                    "connector_type": connector_type,
                    "documents_deleted": total_deleted,
                    "status": "completed",
                },
            )
            session.add(notification)
            await session.commit()
            logger.info(
                f"Connector {connector_id} ({connector_name}) deleted successfully. "
                f"Total documents deleted: {total_deleted}"
            )
            return {
                "status": "success",
                "connector_id": connector_id,
                "connector_name": connector_name,
                "documents_deleted": total_deleted,
            }
    except Exception as e:
        logger.error(
            f"Failed to delete connector {connector_id} ({connector_name}): {e!s}",
            exc_info=True,
        )
        # Create failure notification in a fresh session (the one above may be
        # in a failed transaction state).
        try:
            async with session_maker() as session:
                notification = Notification(
                    user_id=UUID(user_id),
                    search_space_id=search_space_id,
                    type="connector_deletion",
                    title=f"Failed to Remove {connector_name}",
                    message="Something went wrong while removing this connector. Please try again.",
                    notification_metadata={
                        "connector_id": connector_id,
                        "connector_name": connector_name,
                        "connector_type": connector_type,
                        "documents_deleted": total_deleted,
                        "status": "failed",
                        "error": str(e),
                    },
                )
                session.add(notification)
                await session.commit()
        except Exception as notify_error:
            # Best-effort: never let notification failure mask the original error.
            logger.error(
                f"Failed to create failure notification: {notify_error!s}",
                exc_info=True,
            )
        # Re-raise to trigger Celery retry
        raise
    finally:
        # Always release the NullPool engine created for this task run.
        await engine.dispose()
async def delete_documents_by_connector_id(
    session,
    connector_id: int,
    batch_size: int = DELETION_BATCH_SIZE,
) -> int:
    """Delete every document belonging to a connector, one batch at a time.

    Standalone utility for synchronous deletion scenarios (e.g. small
    document counts) where the Celery task is unnecessary.  Each batch is
    committed individually, so progress is preserved on interruption.

    Args:
        session: AsyncSession instance
        connector_id: ID of the connector
        batch_size: Number of documents to delete per batch

    Returns:
        Total number of documents deleted
    """
    deleted_so_far = 0
    # The same SELECT can be reused every iteration; earlier deletes shrink
    # the candidate set until it comes back empty.
    id_query = (
        select(Document.id)
        .where(Document.connector_id == connector_id)
        .limit(batch_size)
    )
    while True:
        batch = (await session.execute(id_query)).scalars().all()
        if not batch:
            break
        await session.execute(delete(Document).where(Document.id.in_(batch)))
        await session.commit()
        deleted_so_far += len(batch)
    return deleted_so_far

View file

@ -545,6 +545,7 @@ def process_circleback_meeting_task(
markdown_content: str,
metadata: dict,
search_space_id: int,
connector_id: int | None = None,
):
"""
Celery task to process Circleback meeting webhook data.
@ -555,6 +556,7 @@ def process_circleback_meeting_task(
markdown_content: Meeting content formatted as markdown
metadata: Meeting metadata dictionary
search_space_id: ID of the search space
connector_id: ID of the Circleback connector (for deletion support)
"""
import asyncio
@ -569,6 +571,7 @@ def process_circleback_meeting_task(
markdown_content,
metadata,
search_space_id,
connector_id,
)
)
finally:
@ -581,6 +584,7 @@ async def _process_circleback_meeting(
markdown_content: str,
metadata: dict,
search_space_id: int,
connector_id: int | None = None,
):
"""Process Circleback meeting with new session."""
from app.tasks.document_processors.circleback_processor import (
@ -637,6 +641,7 @@ async def _process_circleback_meeting(
markdown_content=markdown_content,
metadata=metadata,
search_space_id=search_space_id,
connector_id=connector_id,
)
if result:

View file

@ -417,6 +417,8 @@ async def index_airtable_records(
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)

View file

@ -396,6 +396,8 @@ async def index_bookstack_pages(
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)

View file

@ -395,6 +395,8 @@ async def index_clickup_tasks(
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)

View file

@ -402,6 +402,8 @@ async def index_confluence_pages(
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)

View file

@ -527,6 +527,8 @@ async def index_discord_messages(
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)

View file

@ -292,6 +292,8 @@ async def index_elasticsearch_documents(
document_metadata=metadata,
search_space_id=search_space_id,
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
# Create chunks and attach to document (persist via relationship)

View file

@ -220,6 +220,7 @@ async def index_github_repos(
user_id=user_id,
task_logger=task_logger,
log_entry=log_entry,
connector_id=connector_id,
)
documents_processed += docs_created
@ -292,6 +293,7 @@ async def _process_repository_digest(
user_id: str,
task_logger: TaskLoggingService,
log_entry,
connector_id: int,
) -> int:
"""
Process a repository digest and create documents.
@ -426,6 +428,8 @@ async def _process_repository_digest(
search_space_id=search_space_id,
chunks=chunks_data,
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)

View file

@ -499,6 +499,8 @@ async def index_google_calendar_events(
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)

View file

@ -767,6 +767,7 @@ async def _process_single_file(
session=session,
task_logger=task_logger,
log_entry=log_entry,
connector_id=connector_id,
)
if error:

View file

@ -413,7 +413,6 @@ async def index_google_gmail_messages(
"subject": subject,
"sender": sender,
"date": date_str,
"connector_id": connector_id,
},
content=summary_content,
content_hash=content_hash,
@ -421,6 +420,8 @@ async def index_google_gmail_messages(
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)
documents_indexed += 1

View file

@ -380,6 +380,8 @@ async def index_jira_issues(
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)

View file

@ -413,6 +413,8 @@ async def index_linear_issues(
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)

View file

@ -476,6 +476,8 @@ async def index_luma_events(
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)

View file

@ -398,6 +398,7 @@ async def index_notion_pages(
}
existing_document.chunks = chunks
existing_document.updated_at = get_current_timestamp()
existing_document.connector_id = connector_id
documents_indexed += 1
logger.info(f"Successfully updated Notion page: {page_title}")
@ -470,6 +471,8 @@ async def index_notion_pages(
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)

View file

@ -500,6 +500,8 @@ async def index_obsidian_vault(
embedding=embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(new_document)

View file

@ -389,6 +389,8 @@ async def index_slack_messages(
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)

View file

@ -430,6 +430,8 @@ async def index_teams_messages(
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)

View file

@ -371,6 +371,8 @@ async def index_crawled_urls(
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)

View file

@ -8,10 +8,17 @@ and stores it as searchable documents in the database.
import logging
from typing import Any
from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import Document, DocumentType
from app.db import (
Document,
DocumentType,
SearchSourceConnector,
SearchSourceConnectorType,
SearchSpace,
)
from app.services.llm_service import get_document_summary_llm
from app.utils.document_converters import (
create_document_chunks,
@ -35,6 +42,7 @@ async def add_circleback_meeting_document(
markdown_content: str,
metadata: dict[str, Any],
search_space_id: int,
connector_id: int | None = None,
) -> Document | None:
"""
Process and store a Circleback meeting document.
@ -46,6 +54,7 @@ async def add_circleback_meeting_document(
markdown_content: Meeting content formatted as markdown
metadata: Meeting metadata dictionary
search_space_id: ID of the search space
connector_id: ID of the Circleback connector (for deletion support)
Returns:
Document object if successful, None if failed or duplicate
@ -125,6 +134,30 @@ async def add_circleback_meeting_document(
**metadata,
}
# Fetch the user who set up the Circleback connector (preferred)
# or fall back to search space owner if no connector found
created_by_user_id = None
# Try to find the Circleback connector for this search space
connector_result = await session.execute(
select(SearchSourceConnector.user_id).where(
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.CIRCLEBACK_CONNECTOR,
)
)
connector_user = connector_result.scalar_one_or_none()
if connector_user:
# Use the user who set up the Circleback connector
created_by_user_id = connector_user
else:
# Fallback: use search space owner if no connector found
search_space_result = await session.execute(
select(SearchSpace.user_id).where(SearchSpace.id == search_space_id)
)
created_by_user_id = search_space_result.scalar_one_or_none()
# Update or create document
if existing_document:
# Update existing document
@ -138,6 +171,9 @@ async def add_circleback_meeting_document(
existing_document.blocknote_document = blocknote_json
existing_document.content_needs_reindexing = False
existing_document.updated_at = get_current_timestamp()
# Ensure connector_id is set (backfill for documents created before this field)
if connector_id is not None:
existing_document.connector_id = connector_id
await session.commit()
await session.refresh(existing_document)
@ -160,6 +196,8 @@ async def add_circleback_meeting_document(
blocknote_document=blocknote_json,
content_needs_reindexing=False,
updated_at=get_current_timestamp(),
created_by_id=created_by_user_id,
connector_id=connector_id,
)
session.add(document)

View file

@ -185,6 +185,7 @@ async def add_extension_received_document(
unique_identifier_hash=unique_identifier_hash,
blocknote_document=blocknote_json,
updated_at=get_current_timestamp(),
created_by_id=user_id,
)
session.add(document)

View file

@ -526,6 +526,8 @@ async def add_received_file_document_using_unstructured(
blocknote_document=blocknote_json,
content_needs_reindexing=False,
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector.get("connector_id") if connector else None,
)
session.add(document)
@ -665,6 +667,8 @@ async def add_received_file_document_using_llamacloud(
blocknote_document=blocknote_json,
content_needs_reindexing=False,
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector.get("connector_id") if connector else None,
)
session.add(document)
@ -829,6 +833,8 @@ async def add_received_file_document_using_docling(
blocknote_document=blocknote_json,
content_needs_reindexing=False,
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector.get("connector_id") if connector else None,
)
session.add(document)
@ -849,7 +855,7 @@ async def add_received_file_document_using_docling(
async def _update_document_from_connector(
document: Document | None, connector: dict | None, session: AsyncSession
) -> None:
"""Helper to update document type and metadata from connector info."""
"""Helper to update document type, metadata, and connector_id from connector info."""
if document and connector:
if "type" in connector:
document.document_type = connector["type"]
@ -861,6 +867,9 @@ async def _update_document_from_connector(
# Expand existing metadata with connector metadata
merged = {**document.document_metadata, **connector["metadata"]}
document.document_metadata = merged
# Set connector_id if provided for de-indexing support
if "connector_id" in connector:
document.connector_id = connector["connector_id"]
await session.commit()

View file

@ -295,6 +295,8 @@ async def add_received_markdown_file_document(
unique_identifier_hash=primary_hash,
blocknote_document=blocknote_json,
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector.get("connector_id") if connector else None,
)
session.add(document)

View file

@ -357,6 +357,7 @@ async def add_youtube_video_document(
unique_identifier_hash=unique_identifier_hash,
blocknote_document=blocknote_json,
updated_at=get_current_timestamp(),
created_by_id=user_id,
)
session.add(document)

View file

@ -237,7 +237,7 @@ export function InboxSidebar({
const currentDataSource = activeTab === "mentions" ? mentions : status;
const { loading, loadingMore = false, hasMore = false, loadMore } = currentDataSource;
// Status tab includes: connector indexing, document processing, page limit exceeded
// Status tab includes: connector indexing, document processing, page limit exceeded, connector deletion
// Filter to only show status notification types
const statusItems = useMemo(
() =>
@ -245,7 +245,8 @@ export function InboxSidebar({
(item) =>
item.type === "connector_indexing" ||
item.type === "document_processing" ||
item.type === "page_limit_exceeded"
item.type === "page_limit_exceeded" ||
item.type === "connector_deletion"
),
[status.items]
);

View file

@ -7,6 +7,7 @@ import { documentTypeEnum } from "./document.types";
*/
export const inboxItemTypeEnum = z.enum([
"connector_indexing",
"connector_deletion",
"document_processing",
"new_mention",
"page_limit_exceeded",
@ -60,6 +61,17 @@ export const connectorIndexingMetadata = baseInboxItemMetadata.extend({
file_names: z.array(z.string()).optional(),
});
/**
* Connector deletion metadata schema
*/
export const connectorDeletionMetadata = baseInboxItemMetadata.extend({
connector_id: z.number(),
connector_name: z.string(),
connector_type: z.string(),
documents_deleted: z.number(),
error: z.string().optional(),
});
/**
* Document processing metadata schema
*/
@ -110,6 +122,7 @@ export const pageLimitExceededMetadata = baseInboxItemMetadata.extend({
*/
export const inboxItemMetadata = z.union([
connectorIndexingMetadata,
connectorDeletionMetadata,
documentProcessingMetadata,
newMentionMetadata,
pageLimitExceededMetadata,
@ -140,6 +153,11 @@ export const connectorIndexingInboxItem = inboxItem.extend({
metadata: connectorIndexingMetadata,
});
export const connectorDeletionInboxItem = inboxItem.extend({
type: z.literal("connector_deletion"),
metadata: connectorDeletionMetadata,
});
export const documentProcessingInboxItem = inboxItem.extend({
type: z.literal("document_processing"),
metadata: documentProcessingMetadata,
@ -235,6 +253,15 @@ export function isConnectorIndexingMetadata(
return connectorIndexingMetadata.safeParse(metadata).success;
}
/**
* Type guard for ConnectorDeletionMetadata
*/
export function isConnectorDeletionMetadata(
metadata: unknown
): metadata is ConnectorDeletionMetadata {
return connectorDeletionMetadata.safeParse(metadata).success;
}
/**
* Type guard for DocumentProcessingMetadata
*/
@ -268,6 +295,7 @@ export function parseInboxItemMetadata(
metadata: unknown
):
| ConnectorIndexingMetadata
| ConnectorDeletionMetadata
| DocumentProcessingMetadata
| NewMentionMetadata
| PageLimitExceededMetadata
@ -277,6 +305,10 @@ export function parseInboxItemMetadata(
const result = connectorIndexingMetadata.safeParse(metadata);
return result.success ? result.data : null;
}
case "connector_deletion": {
const result = connectorDeletionMetadata.safeParse(metadata);
return result.success ? result.data : null;
}
case "document_processing": {
const result = documentProcessingMetadata.safeParse(metadata);
return result.success ? result.data : null;
@ -303,12 +335,14 @@ export type InboxItemStatusEnum = z.infer<typeof inboxItemStatusEnum>;
export type DocumentProcessingStageEnum = z.infer<typeof documentProcessingStageEnum>;
export type BaseInboxItemMetadata = z.infer<typeof baseInboxItemMetadata>;
export type ConnectorIndexingMetadata = z.infer<typeof connectorIndexingMetadata>;
export type ConnectorDeletionMetadata = z.infer<typeof connectorDeletionMetadata>;
export type DocumentProcessingMetadata = z.infer<typeof documentProcessingMetadata>;
export type NewMentionMetadata = z.infer<typeof newMentionMetadata>;
export type PageLimitExceededMetadata = z.infer<typeof pageLimitExceededMetadata>;
export type InboxItemMetadata = z.infer<typeof inboxItemMetadata>;
export type InboxItem = z.infer<typeof inboxItem>;
export type ConnectorIndexingInboxItem = z.infer<typeof connectorIndexingInboxItem>;
export type ConnectorDeletionInboxItem = z.infer<typeof connectorDeletionInboxItem>;
export type DocumentProcessingInboxItem = z.infer<typeof documentProcessingInboxItem>;
export type NewMentionInboxItem = z.infer<typeof newMentionInboxItem>;
export type PageLimitExceededInboxItem = z.infer<typeof pageLimitExceededInboxItem>;