From e0ade20e68e8615c9c3afbd3bc009b8c466a7a27 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Mon, 2 Feb 2026 12:32:24 +0530 Subject: [PATCH 1/6] feat: add created_by_id column to documents for ownership tracking and update related connectors --- .../versions/86_add_document_created_by.py | 126 ++++++++++++++++++ .../connectors/composio_gmail_connector.py | 1 + .../composio_google_calendar_connector.py | 1 + .../composio_google_drive_connector.py | 1 + surfsense_backend/app/db.py | 25 ++++ surfsense_backend/app/routes/notes_routes.py | 2 + surfsense_backend/app/schemas/documents.py | 2 + .../connector_indexers/airtable_indexer.py | 1 + .../connector_indexers/bookstack_indexer.py | 1 + .../connector_indexers/clickup_indexer.py | 1 + .../connector_indexers/confluence_indexer.py | 1 + .../connector_indexers/discord_indexer.py | 1 + .../elasticsearch_indexer.py | 1 + .../connector_indexers/github_indexer.py | 1 + .../google_calendar_indexer.py | 1 + .../google_gmail_indexer.py | 1 + .../tasks/connector_indexers/jira_indexer.py | 1 + .../connector_indexers/linear_indexer.py | 1 + .../tasks/connector_indexers/luma_indexer.py | 1 + .../connector_indexers/notion_indexer.py | 1 + .../connector_indexers/obsidian_indexer.py | 1 + .../tasks/connector_indexers/slack_indexer.py | 1 + .../tasks/connector_indexers/teams_indexer.py | 1 + .../connector_indexers/webcrawler_indexer.py | 1 + .../circleback_processor.py | 34 ++++- .../extension_processor.py | 1 + .../document_processors/file_processors.py | 3 + .../document_processors/markdown_processor.py | 1 + .../document_processors/youtube_processor.py | 1 + 29 files changed, 214 insertions(+), 1 deletion(-) create mode 100644 surfsense_backend/alembic/versions/86_add_document_created_by.py diff --git a/surfsense_backend/alembic/versions/86_add_document_created_by.py b/surfsense_backend/alembic/versions/86_add_document_created_by.py new file mode 100644 index 000000000..e4ce2a40f 
--- /dev/null +++ b/surfsense_backend/alembic/versions/86_add_document_created_by.py @@ -0,0 +1,126 @@ +"""Add created_by_id column to documents table for document ownership tracking + +Revision ID: 86 +Revises: 85 +Create Date: 2026-02-02 + +Changes: +1. Add created_by_id column (UUID, nullable, foreign key to user.id) +2. Create index on created_by_id for performance +3. Backfill existing documents with search space owner's user_id +""" + +from collections.abc import Sequence + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "86" +down_revision: str | None = "85" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + """Add created_by_id column to documents and backfill with search space owner.""" + + # 1. Add created_by_id column (nullable for backward compatibility) + op.execute( + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'documents' AND column_name = 'created_by_id' + ) THEN + ALTER TABLE documents + ADD COLUMN created_by_id UUID; + END IF; + END$$; + """ + ) + + # 2. Create index on created_by_id for efficient queries + op.execute( + """ + CREATE INDEX IF NOT EXISTS ix_documents_created_by_id + ON documents (created_by_id); + """ + ) + + # 3. Add foreign key constraint with ON DELETE SET NULL + # First check if constraint already exists + op.execute( + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.table_constraints + WHERE constraint_name = 'fk_documents_created_by_id' + AND table_name = 'documents' + ) THEN + ALTER TABLE documents + ADD CONSTRAINT fk_documents_created_by_id + FOREIGN KEY (created_by_id) REFERENCES "user"(id) + ON DELETE SET NULL; + END IF; + END$$; + """ + ) + + # 4. 
Backfill existing documents with search space owner's user_id + # This ensures all existing documents are associated with the search space owner + op.execute( + """ + UPDATE documents + SET created_by_id = searchspaces.user_id + FROM searchspaces + WHERE documents.search_space_id = searchspaces.id + AND documents.created_by_id IS NULL; + """ + ) + + +def downgrade() -> None: + """Remove created_by_id column from documents.""" + + # Drop foreign key constraint + op.execute( + """ + DO $$ + BEGIN + IF EXISTS ( + SELECT 1 FROM information_schema.table_constraints + WHERE constraint_name = 'fk_documents_created_by_id' + AND table_name = 'documents' + ) THEN + ALTER TABLE documents + DROP CONSTRAINT fk_documents_created_by_id; + END IF; + END$$; + """ + ) + + # Drop index + op.execute( + """ + DROP INDEX IF EXISTS ix_documents_created_by_id; + """ + ) + + # Drop column + op.execute( + """ + DO $$ + BEGIN + IF EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'documents' AND column_name = 'created_by_id' + ) THEN + ALTER TABLE documents + DROP COLUMN created_by_id; + END IF; + END$$; + """ + ) + diff --git a/surfsense_backend/app/connectors/composio_gmail_connector.py b/surfsense_backend/app/connectors/composio_gmail_connector.py index d3a0d344b..2b470ae38 100644 --- a/surfsense_backend/app/connectors/composio_gmail_connector.py +++ b/surfsense_backend/app/connectors/composio_gmail_connector.py @@ -394,6 +394,7 @@ async def _process_gmail_message_batch( embedding=summary_embedding, chunks=chunks, updated_at=get_current_timestamp(), + created_by_id=user_id, ) session.add(document) documents_indexed += 1 diff --git a/surfsense_backend/app/connectors/composio_google_calendar_connector.py b/surfsense_backend/app/connectors/composio_google_calendar_connector.py index 4302e479b..960757901 100644 --- a/surfsense_backend/app/connectors/composio_google_calendar_connector.py +++ b/surfsense_backend/app/connectors/composio_google_calendar_connector.py @@ 
-442,6 +442,7 @@ async def index_composio_google_calendar( embedding=summary_embedding, chunks=chunks, updated_at=get_current_timestamp(), + created_by_id=user_id, ) session.add(document) documents_indexed += 1 diff --git a/surfsense_backend/app/connectors/composio_google_drive_connector.py b/surfsense_backend/app/connectors/composio_google_drive_connector.py index 364712215..b4b3e7ee6 100644 --- a/surfsense_backend/app/connectors/composio_google_drive_connector.py +++ b/surfsense_backend/app/connectors/composio_google_drive_connector.py @@ -1258,6 +1258,7 @@ async def _process_single_drive_file( embedding=summary_embedding, chunks=chunks, updated_at=get_current_timestamp(), + created_by_id=user_id, ) session.add(document) diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py index 97d15d90f..b3a6266a0 100644 --- a/surfsense_backend/app/db.py +++ b/surfsense_backend/app/db.py @@ -749,7 +749,18 @@ class Document(BaseModel, TimestampMixin): search_space_id = Column( Integer, ForeignKey("searchspaces.id", ondelete="CASCADE"), nullable=False ) + + # Track who created/uploaded this document + created_by_id = Column( + UUID(as_uuid=True), + ForeignKey("user.id", ondelete="SET NULL"), + nullable=True, # Nullable for backward compatibility with existing records + index=True, + ) + + # Relationships search_space = relationship("SearchSpace", back_populates="documents") + created_by = relationship("User", back_populates="documents") chunks = relationship( "Chunk", back_populates="document", cascade="all, delete-orphan" ) @@ -1284,6 +1295,13 @@ if config.AUTH_TYPE == "GOOGLE": passive_deletes=True, ) + # Documents created/uploaded by this user + documents = relationship( + "Document", + back_populates="created_by", + passive_deletes=True, + ) + # User memories for personalized AI responses memories = relationship( "UserMemory", @@ -1342,6 +1360,13 @@ else: passive_deletes=True, ) + # Documents created/uploaded by this user + documents = relationship( + 
"Document", + back_populates="created_by", + passive_deletes=True, + ) + # User memories for personalized AI responses memories = relationship( "UserMemory", diff --git a/surfsense_backend/app/routes/notes_routes.py b/surfsense_backend/app/routes/notes_routes.py index 5bb0a88a9..928cd462a 100644 --- a/surfsense_backend/app/routes/notes_routes.py +++ b/surfsense_backend/app/routes/notes_routes.py @@ -76,6 +76,7 @@ async def create_note( document_metadata={"NOTE": True}, embedding=None, # Will be generated on first reindex updated_at=datetime.now(UTC), + created_by_id=user.id, # Track who created this note ) session.add(document) @@ -93,6 +94,7 @@ async def create_note( search_space_id=document.search_space_id, created_at=document.created_at, updated_at=document.updated_at, + created_by_id=document.created_by_id, ) diff --git a/surfsense_backend/app/schemas/documents.py b/surfsense_backend/app/schemas/documents.py index 2b4bda0ca..1f82ae9ce 100644 --- a/surfsense_backend/app/schemas/documents.py +++ b/surfsense_backend/app/schemas/documents.py @@ -1,5 +1,6 @@ from datetime import datetime from typing import TypeVar +from uuid import UUID from pydantic import BaseModel, ConfigDict @@ -51,6 +52,7 @@ class DocumentRead(BaseModel): created_at: datetime updated_at: datetime | None search_space_id: int + created_by_id: UUID | None = None # User who created/uploaded this document model_config = ConfigDict(from_attributes=True) diff --git a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py index 3bcf95d6a..7d0837ac1 100644 --- a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py @@ -417,6 +417,7 @@ async def index_airtable_records( embedding=summary_embedding, chunks=chunks, updated_at=get_current_timestamp(), + created_by_id=user_id, ) session.add(document) diff --git 
a/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py b/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py index d726e5d95..fd89792e9 100644 --- a/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py @@ -396,6 +396,7 @@ async def index_bookstack_pages( embedding=summary_embedding, chunks=chunks, updated_at=get_current_timestamp(), + created_by_id=user_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py b/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py index e7e8b23e5..bcdb9c72a 100644 --- a/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py @@ -395,6 +395,7 @@ async def index_clickup_tasks( embedding=summary_embedding, chunks=chunks, updated_at=get_current_timestamp(), + created_by_id=user_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py index 2f20472d2..3f8f43669 100644 --- a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py @@ -402,6 +402,7 @@ async def index_confluence_pages( embedding=summary_embedding, chunks=chunks, updated_at=get_current_timestamp(), + created_by_id=user_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py index a70bc42d4..3d226ed06 100644 --- a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py @@ -527,6 +527,7 @@ async def index_discord_messages( content_hash=content_hash, unique_identifier_hash=unique_identifier_hash, 
updated_at=get_current_timestamp(), + created_by_id=user_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py b/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py index 8fbba6463..6f2dd797f 100644 --- a/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py @@ -292,6 +292,7 @@ async def index_elasticsearch_documents( document_metadata=metadata, search_space_id=search_space_id, updated_at=get_current_timestamp(), + created_by_id=user_id, ) # Create chunks and attach to document (persist via relationship) diff --git a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py index b01d235cf..947035048 100644 --- a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py @@ -426,6 +426,7 @@ async def _process_repository_digest( search_space_id=search_space_id, chunks=chunks_data, updated_at=get_current_timestamp(), + created_by_id=user_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py index f64a7a5c3..28037ba7e 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py @@ -499,6 +499,7 @@ async def index_google_calendar_events( embedding=summary_embedding, chunks=chunks, updated_at=get_current_timestamp(), + created_by_id=user_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py index 45ce91c6f..7c6b9ffec 100644 --- 
a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py @@ -421,6 +421,7 @@ async def index_google_gmail_messages( embedding=summary_embedding, chunks=chunks, updated_at=get_current_timestamp(), + created_by_id=user_id, ) session.add(document) documents_indexed += 1 diff --git a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py index acee74192..6262e8535 100644 --- a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py @@ -380,6 +380,7 @@ async def index_jira_issues( embedding=summary_embedding, chunks=chunks, updated_at=get_current_timestamp(), + created_by_id=user_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py index fc4ae5f18..dd0483eda 100644 --- a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py @@ -413,6 +413,7 @@ async def index_linear_issues( embedding=summary_embedding, chunks=chunks, updated_at=get_current_timestamp(), + created_by_id=user_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py index a18abf8ae..74e809384 100644 --- a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py @@ -476,6 +476,7 @@ async def index_luma_events( embedding=summary_embedding, chunks=chunks, updated_at=get_current_timestamp(), + created_by_id=user_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py index 
52622471a..169dbd775 100644 --- a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py @@ -470,6 +470,7 @@ async def index_notion_pages( embedding=summary_embedding, chunks=chunks, updated_at=get_current_timestamp(), + created_by_id=user_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py b/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py index a8cd78cc9..a2ccd64d9 100644 --- a/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py @@ -500,6 +500,7 @@ async def index_obsidian_vault( embedding=embedding, chunks=chunks, updated_at=get_current_timestamp(), + created_by_id=user_id, ) session.add(new_document) diff --git a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py index 5923c8089..d922178ce 100644 --- a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py @@ -389,6 +389,7 @@ async def index_slack_messages( content_hash=content_hash, unique_identifier_hash=unique_identifier_hash, updated_at=get_current_timestamp(), + created_by_id=user_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py index 162509a1e..7b401f6cf 100644 --- a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py @@ -430,6 +430,7 @@ async def index_teams_messages( content_hash=content_hash, unique_identifier_hash=unique_identifier_hash, updated_at=get_current_timestamp(), + created_by_id=user_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py 
b/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py index ac16ecde6..63105d7a5 100644 --- a/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py @@ -371,6 +371,7 @@ async def index_crawled_urls( embedding=summary_embedding, chunks=chunks, updated_at=get_current_timestamp(), + created_by_id=user_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/document_processors/circleback_processor.py b/surfsense_backend/app/tasks/document_processors/circleback_processor.py index 0a1d91784..ce596d579 100644 --- a/surfsense_backend/app/tasks/document_processors/circleback_processor.py +++ b/surfsense_backend/app/tasks/document_processors/circleback_processor.py @@ -8,10 +8,17 @@ and stores it as searchable documents in the database. import logging from typing import Any +from sqlalchemy import select from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncSession -from app.db import Document, DocumentType +from app.db import ( + Document, + DocumentType, + SearchSourceConnector, + SearchSourceConnectorType, + SearchSpace, +) from app.services.llm_service import get_document_summary_llm from app.utils.document_converters import ( create_document_chunks, @@ -125,6 +132,30 @@ async def add_circleback_meeting_document( **metadata, } + # Fetch the user who set up the Circleback connector (preferred) + # or fall back to search space owner if no connector found + created_by_user_id = None + + # Try to find the Circleback connector for this search space + connector_result = await session.execute( + select(SearchSourceConnector.user_id).where( + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.CIRCLEBACK_CONNECTOR, + ) + ) + connector_user = connector_result.scalar_one_or_none() + + if connector_user: + # Use the user who set up the Circleback connector + 
created_by_user_id = connector_user + else: + # Fallback: use search space owner if no connector found + search_space_result = await session.execute( + select(SearchSpace.user_id).where(SearchSpace.id == search_space_id) + ) + created_by_user_id = search_space_result.scalar_one_or_none() + # Update or create document if existing_document: # Update existing document @@ -160,6 +191,7 @@ async def add_circleback_meeting_document( blocknote_document=blocknote_json, content_needs_reindexing=False, updated_at=get_current_timestamp(), + created_by_id=created_by_user_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/document_processors/extension_processor.py b/surfsense_backend/app/tasks/document_processors/extension_processor.py index 7d8462872..9ddab4ec6 100644 --- a/surfsense_backend/app/tasks/document_processors/extension_processor.py +++ b/surfsense_backend/app/tasks/document_processors/extension_processor.py @@ -185,6 +185,7 @@ async def add_extension_received_document( unique_identifier_hash=unique_identifier_hash, blocknote_document=blocknote_json, updated_at=get_current_timestamp(), + created_by_id=user_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/document_processors/file_processors.py b/surfsense_backend/app/tasks/document_processors/file_processors.py index 6c4be0cb8..2f2e5a2e8 100644 --- a/surfsense_backend/app/tasks/document_processors/file_processors.py +++ b/surfsense_backend/app/tasks/document_processors/file_processors.py @@ -526,6 +526,7 @@ async def add_received_file_document_using_unstructured( blocknote_document=blocknote_json, content_needs_reindexing=False, updated_at=get_current_timestamp(), + created_by_id=user_id, ) session.add(document) @@ -665,6 +666,7 @@ async def add_received_file_document_using_llamacloud( blocknote_document=blocknote_json, content_needs_reindexing=False, updated_at=get_current_timestamp(), + created_by_id=user_id, ) session.add(document) @@ -829,6 +831,7 @@ async def 
add_received_file_document_using_docling( blocknote_document=blocknote_json, content_needs_reindexing=False, updated_at=get_current_timestamp(), + created_by_id=user_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/document_processors/markdown_processor.py b/surfsense_backend/app/tasks/document_processors/markdown_processor.py index 3a9867fd6..a2399206a 100644 --- a/surfsense_backend/app/tasks/document_processors/markdown_processor.py +++ b/surfsense_backend/app/tasks/document_processors/markdown_processor.py @@ -295,6 +295,7 @@ async def add_received_markdown_file_document( unique_identifier_hash=primary_hash, blocknote_document=blocknote_json, updated_at=get_current_timestamp(), + created_by_id=user_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/document_processors/youtube_processor.py b/surfsense_backend/app/tasks/document_processors/youtube_processor.py index da1a8f538..7251fb22f 100644 --- a/surfsense_backend/app/tasks/document_processors/youtube_processor.py +++ b/surfsense_backend/app/tasks/document_processors/youtube_processor.py @@ -357,6 +357,7 @@ async def add_youtube_video_document( unique_identifier_hash=unique_identifier_hash, blocknote_document=blocknote_json, updated_at=get_current_timestamp(), + created_by_id=user_id, ) session.add(document) From b4ed15585ee4b9332d5d34bae266f62c3b4c17e1 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Mon, 2 Feb 2026 13:07:48 +0530 Subject: [PATCH 2/6] chore: linting --- surfsense_backend/alembic/versions/86_add_document_created_by.py | 1 - 1 file changed, 1 deletion(-) diff --git a/surfsense_backend/alembic/versions/86_add_document_created_by.py b/surfsense_backend/alembic/versions/86_add_document_created_by.py index e4ce2a40f..ea2e709c8 100644 --- a/surfsense_backend/alembic/versions/86_add_document_created_by.py +++ b/surfsense_backend/alembic/versions/86_add_document_created_by.py @@ -123,4 +123,3 @@ def downgrade() -> 
None: END$$; """ ) - From bf08982029c68eca3562e4a78ad406da992e3548 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Mon, 2 Feb 2026 16:23:26 +0530 Subject: [PATCH 3/6] feat: add connector_id to documents for source tracking and implement connector deletion task --- .../versions/87_add_document_connector_id.py | 171 +++++++++++ surfsense_backend/app/celery_app.py | 1 + .../connectors/composio_gmail_connector.py | 1 + .../composio_google_calendar_connector.py | 1 + .../composio_google_drive_connector.py | 2 +- .../google_drive/content_extractor.py | 5 + surfsense_backend/app/db.py | 12 + .../app/routes/circleback_webhook_route.py | 29 +- .../routes/search_source_connectors_routes.py | 44 ++- .../celery_tasks/connector_deletion_task.py | 272 ++++++++++++++++++ .../app/tasks/celery_tasks/document_tasks.py | 5 + .../connector_indexers/airtable_indexer.py | 1 + .../connector_indexers/bookstack_indexer.py | 1 + .../connector_indexers/clickup_indexer.py | 1 + .../connector_indexers/confluence_indexer.py | 1 + .../connector_indexers/discord_indexer.py | 1 + .../elasticsearch_indexer.py | 1 + .../connector_indexers/github_indexer.py | 3 + .../google_calendar_indexer.py | 1 + .../google_drive_indexer.py | 1 + .../google_gmail_indexer.py | 2 +- .../tasks/connector_indexers/jira_indexer.py | 1 + .../connector_indexers/linear_indexer.py | 1 + .../tasks/connector_indexers/luma_indexer.py | 1 + .../connector_indexers/notion_indexer.py | 2 + .../connector_indexers/obsidian_indexer.py | 1 + .../tasks/connector_indexers/slack_indexer.py | 1 + .../tasks/connector_indexers/teams_indexer.py | 1 + .../connector_indexers/webcrawler_indexer.py | 1 + .../circleback_processor.py | 6 + .../document_processors/file_processors.py | 8 +- .../document_processors/markdown_processor.py | 1 + .../layout/ui/sidebar/InboxSidebar.tsx | 5 +- 33 files changed, 572 insertions(+), 13 deletions(-) create mode 100644 
surfsense_backend/alembic/versions/87_add_document_connector_id.py create mode 100644 surfsense_backend/app/tasks/celery_tasks/connector_deletion_task.py diff --git a/surfsense_backend/alembic/versions/87_add_document_connector_id.py b/surfsense_backend/alembic/versions/87_add_document_connector_id.py new file mode 100644 index 000000000..b075014ae --- /dev/null +++ b/surfsense_backend/alembic/versions/87_add_document_connector_id.py @@ -0,0 +1,171 @@ +"""Add connector_id column to documents table for linking documents to their source connector + +Revision ID: 87 +Revises: 86 +Create Date: 2026-02-02 + +Changes: +1. Add connector_id column (Integer, nullable, foreign key to search_source_connectors.id) +2. Create index on connector_id for efficient bulk deletion queries +3. SET NULL on delete - allows controlled cleanup in application code +4. Backfill existing documents based on document_type and search_space_id matching +""" + +from collections.abc import Sequence + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "87" +down_revision: str | None = "86" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + """Add connector_id column to documents and backfill from existing connectors.""" + + # 1. Add connector_id column (nullable - for manually uploaded docs without connector) + op.execute( + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'documents' AND column_name = 'connector_id' + ) THEN + ALTER TABLE documents + ADD COLUMN connector_id INTEGER; + END IF; + END$$; + """ + ) + + # 2. Create index on connector_id for efficient cleanup queries + op.execute( + """ + CREATE INDEX IF NOT EXISTS ix_documents_connector_id + ON documents (connector_id); + """ + ) + + # 3. 
Add foreign key constraint with ON DELETE SET NULL + # SET NULL allows us to delete documents in controlled batches before deleting connector + op.execute( + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.table_constraints + WHERE constraint_name = 'fk_documents_connector_id' + AND table_name = 'documents' + ) THEN + ALTER TABLE documents + ADD CONSTRAINT fk_documents_connector_id + FOREIGN KEY (connector_id) REFERENCES search_source_connectors(id) + ON DELETE SET NULL; + END IF; + END$$; + """ + ) + + # 4. Backfill existing documents with connector_id based on document_type matching + # This maps document types to their corresponding connector types + # Only backfills for documents in search spaces that have exactly one connector of that type + + # Map of document_type -> connector_type for backfilling + document_connector_mappings = [ + ("NOTION_CONNECTOR", "NOTION_CONNECTOR"), + ("SLACK_CONNECTOR", "SLACK_CONNECTOR"), + ("TEAMS_CONNECTOR", "TEAMS_CONNECTOR"), + ("GITHUB_CONNECTOR", "GITHUB_CONNECTOR"), + ("LINEAR_CONNECTOR", "LINEAR_CONNECTOR"), + ("DISCORD_CONNECTOR", "DISCORD_CONNECTOR"), + ("JIRA_CONNECTOR", "JIRA_CONNECTOR"), + ("CONFLUENCE_CONNECTOR", "CONFLUENCE_CONNECTOR"), + ("CLICKUP_CONNECTOR", "CLICKUP_CONNECTOR"), + ("GOOGLE_CALENDAR_CONNECTOR", "GOOGLE_CALENDAR_CONNECTOR"), + ("GOOGLE_GMAIL_CONNECTOR", "GOOGLE_GMAIL_CONNECTOR"), + ("GOOGLE_DRIVE_FILE", "GOOGLE_DRIVE_CONNECTOR"), + ("AIRTABLE_CONNECTOR", "AIRTABLE_CONNECTOR"), + ("LUMA_CONNECTOR", "LUMA_CONNECTOR"), + ("ELASTICSEARCH_CONNECTOR", "ELASTICSEARCH_CONNECTOR"), + ("BOOKSTACK_CONNECTOR", "BOOKSTACK_CONNECTOR"), + ("CIRCLEBACK", "CIRCLEBACK_CONNECTOR"), + ("OBSIDIAN_CONNECTOR", "OBSIDIAN_CONNECTOR"), + ("COMPOSIO_GOOGLE_DRIVE_CONNECTOR", "COMPOSIO_GOOGLE_DRIVE_CONNECTOR"), + ("COMPOSIO_GMAIL_CONNECTOR", "COMPOSIO_GMAIL_CONNECTOR"), + ("COMPOSIO_GOOGLE_CALENDAR_CONNECTOR", "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR"), + ("CRAWLED_URL", "WEBCRAWLER_CONNECTOR"), + ] + + for 
doc_type, connector_type in document_connector_mappings: + # Backfill connector_id for documents where: + # 1. Document has this document_type + # 2. Document doesn't already have a connector_id + # 3. There's exactly one connector of this type in the same search space + # This safely handles most cases while avoiding ambiguity + op.execute( + f""" + UPDATE documents d + SET connector_id = ( + SELECT ssc.id + FROM search_source_connectors ssc + WHERE ssc.search_space_id = d.search_space_id + AND ssc.connector_type = '{connector_type}' + LIMIT 1 + ) + WHERE d.document_type = '{doc_type}' + AND d.connector_id IS NULL + AND EXISTS ( + SELECT 1 FROM search_source_connectors ssc + WHERE ssc.search_space_id = d.search_space_id + AND ssc.connector_type = '{connector_type}' + ); + """ + ) + + +def downgrade() -> None: + """Remove connector_id column from documents.""" + + # Drop foreign key constraint + op.execute( + """ + DO $$ + BEGIN + IF EXISTS ( + SELECT 1 FROM information_schema.table_constraints + WHERE constraint_name = 'fk_documents_connector_id' + AND table_name = 'documents' + ) THEN + ALTER TABLE documents + DROP CONSTRAINT fk_documents_connector_id; + END IF; + END$$; + """ + ) + + # Drop index + op.execute( + """ + DROP INDEX IF EXISTS ix_documents_connector_id; + """ + ) + + # Drop column + op.execute( + """ + DO $$ + BEGIN + IF EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'documents' AND column_name = 'connector_id' + ) THEN + ALTER TABLE documents + DROP COLUMN connector_id; + END IF; + END$$; + """ + ) + diff --git a/surfsense_backend/app/celery_app.py b/surfsense_backend/app/celery_app.py index b77f5698e..fc13fd3eb 100644 --- a/surfsense_backend/app/celery_app.py +++ b/surfsense_backend/app/celery_app.py @@ -80,6 +80,7 @@ celery_app = Celery( "app.tasks.celery_tasks.blocknote_migration_tasks", "app.tasks.celery_tasks.document_reindex_tasks", "app.tasks.celery_tasks.stale_notification_cleanup_task", + 
"app.tasks.celery_tasks.connector_deletion_task", ], ) diff --git a/surfsense_backend/app/connectors/composio_gmail_connector.py b/surfsense_backend/app/connectors/composio_gmail_connector.py index 2b470ae38..1964a4d45 100644 --- a/surfsense_backend/app/connectors/composio_gmail_connector.py +++ b/surfsense_backend/app/connectors/composio_gmail_connector.py @@ -395,6 +395,7 @@ async def _process_gmail_message_batch( chunks=chunks, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) documents_indexed += 1 diff --git a/surfsense_backend/app/connectors/composio_google_calendar_connector.py b/surfsense_backend/app/connectors/composio_google_calendar_connector.py index 960757901..78ff360ca 100644 --- a/surfsense_backend/app/connectors/composio_google_calendar_connector.py +++ b/surfsense_backend/app/connectors/composio_google_calendar_connector.py @@ -443,6 +443,7 @@ async def index_composio_google_calendar( chunks=chunks, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) documents_indexed += 1 diff --git a/surfsense_backend/app/connectors/composio_google_drive_connector.py b/surfsense_backend/app/connectors/composio_google_drive_connector.py index b4b3e7ee6..369f5d8b3 100644 --- a/surfsense_backend/app/connectors/composio_google_drive_connector.py +++ b/surfsense_backend/app/connectors/composio_google_drive_connector.py @@ -1248,7 +1248,6 @@ async def _process_single_drive_file( "file_name": file_name, "FILE_NAME": file_name, # For compatibility "mime_type": mime_type, - "connector_id": connector_id, "toolkit_id": "googledrive", "source": "composio", }, @@ -1259,6 +1258,7 @@ async def _process_single_drive_file( chunks=chunks, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/connectors/google_drive/content_extractor.py 
b/surfsense_backend/app/connectors/google_drive/content_extractor.py index 61b427970..39a92f95f 100644 --- a/surfsense_backend/app/connectors/google_drive/content_extractor.py +++ b/surfsense_backend/app/connectors/google_drive/content_extractor.py @@ -25,6 +25,7 @@ async def download_and_process_file( session: AsyncSession, task_logger: TaskLoggingService, log_entry: Log, + connector_id: int | None = None, ) -> tuple[Any, str | None, dict[str, Any] | None]: """ Download Google Drive file and process using Surfsense file processors. @@ -37,6 +38,7 @@ async def download_and_process_file( session: Database session task_logger: Task logging service log_entry: Log entry for tracking + connector_id: ID of the connector (for de-indexing support) Returns: Tuple of (Document object if successful, error message if failed, file metadata dict) @@ -92,6 +94,9 @@ async def download_and_process_file( "source_connector": "google_drive", }, } + # Include connector_id for de-indexing support + if connector_id is not None: + connector_info["connector_id"] = connector_id # Add additional Drive metadata if available if "modifiedTime" in file: diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py index 615a7ddb9..4148b8e38 100644 --- a/surfsense_backend/app/db.py +++ b/surfsense_backend/app/db.py @@ -760,9 +760,18 @@ class Document(BaseModel, TimestampMixin): index=True, ) + # Track which connector created this document (for cleanup on connector deletion) + connector_id = Column( + Integer, + ForeignKey("search_source_connectors.id", ondelete="SET NULL"), + nullable=True, # Nullable for manually uploaded docs without connector + index=True, + ) + # Relationships search_space = relationship("SearchSpace", back_populates="documents") created_by = relationship("User", back_populates="documents") + connector = relationship("SearchSourceConnector", back_populates="documents") chunks = relationship( "Chunk", back_populates="document", cascade="all, delete-orphan" ) @@ -991,6 
+1000,9 @@ class SearchSourceConnector(BaseModel, TimestampMixin): UUID(as_uuid=True), ForeignKey("user.id", ondelete="CASCADE"), nullable=False ) + # Documents created by this connector (for cleanup on connector deletion) + documents = relationship("Document", back_populates="connector") + class NewLLMConfig(BaseModel, TimestampMixin): """ diff --git a/surfsense_backend/app/routes/circleback_webhook_route.py b/surfsense_backend/app/routes/circleback_webhook_route.py index 1285aadeb..4a5823645 100644 --- a/surfsense_backend/app/routes/circleback_webhook_route.py +++ b/surfsense_backend/app/routes/circleback_webhook_route.py @@ -9,8 +9,12 @@ import logging from datetime import datetime from typing import Any -from fastapi import APIRouter, HTTPException +from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel, Field +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.db import SearchSourceConnector, SearchSourceConnectorType, get_async_session logger = logging.getLogger(__name__) @@ -212,6 +216,7 @@ def format_circleback_meeting_to_markdown(payload: CirclebackWebhookPayload) -> async def receive_circleback_webhook( search_space_id: int, payload: CirclebackWebhookPayload, + session: AsyncSession = Depends(get_async_session), ): """ Receive and process a Circleback webhook. 
@@ -223,6 +228,7 @@ async def receive_circleback_webhook( Args: search_space_id: The ID of the search space to save the document to payload: The Circleback webhook payload containing meeting data + session: Database session for looking up the connector Returns: Success message with document details @@ -236,6 +242,26 @@ async def receive_circleback_webhook( f"Received Circleback webhook for meeting {payload.id} in search space {search_space_id}" ) + # Look up the Circleback connector for this search space + connector_result = await session.execute( + select(SearchSourceConnector.id).where( + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.CIRCLEBACK_CONNECTOR, + ) + ) + connector_id = connector_result.scalar_one_or_none() + + if connector_id: + logger.info( + f"Found Circleback connector {connector_id} for search space {search_space_id}" + ) + else: + logger.warning( + f"No Circleback connector found for search space {search_space_id}. " + "Document will be created without connector_id." + ) + # Convert to markdown markdown_content = format_circleback_meeting_to_markdown(payload) @@ -264,6 +290,7 @@ async def receive_circleback_webhook( markdown_content=markdown_content, metadata=meeting_metadata, search_space_id=search_space_id, + connector_id=connector_id, ) logger.info( diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index 3a937653d..e8ee0282d 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -524,9 +524,17 @@ async def delete_search_source_connector( user: User = Depends(current_active_user), ): """ - Delete a search source connector. + Delete a search source connector and all its associated documents. + + The deletion runs in background via Celery task. 
User is notified + via the notification system when complete (no polling required). + Requires CONNECTORS_DELETE permission. """ + from app.tasks.celery_tasks.connector_deletion_task import ( + delete_connector_with_documents_task, + ) + try: # Get the connector first result = await session.execute( @@ -548,7 +556,12 @@ async def delete_search_source_connector( "You don't have permission to delete this connector", ) - # Delete any periodic schedule associated with this connector + # Store connector info before we queue the deletion task + connector_name = db_connector.name + connector_type = db_connector.connector_type.value + search_space_id = db_connector.search_space_id + + # Delete any periodic schedule associated with this connector (lightweight, sync) if db_connector.periodic_indexing_enabled: success = delete_periodic_schedule(connector_id) if not success: @@ -556,7 +569,7 @@ async def delete_search_source_connector( f"Failed to delete periodic schedule for connector {connector_id}" ) - # For Composio connectors, also delete the connected account in Composio + # For Composio connectors, delete the connected account in Composio (lightweight API call, sync) composio_connector_types = [ SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR, SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR, @@ -588,16 +601,33 @@ async def delete_search_source_connector( f"Error deleting Composio connected account {composio_connected_account_id}: {composio_error!s}" ) - await session.delete(db_connector) - await session.commit() - return {"message": "Search source connector deleted successfully"} + # Queue background task to delete documents and connector + # This handles potentially large document counts without blocking the API + delete_connector_with_documents_task.delay( + connector_id=connector_id, + user_id=str(user.id), + search_space_id=search_space_id, + connector_name=connector_name, + connector_type=connector_type, + ) + + logger.info( + f"Queued deletion task 
"""Celery task for background connector deletion.

This task handles the deletion of all documents associated with a connector
in the background, then deletes the connector itself. User is notified via
the notification system when complete (no polling required).

Features:
- Batch deletion to handle large document counts
- Automatic retry on failure
- Progress tracking via notifications
- Failure notification only on the final attempt (no per-retry spam)
"""

import asyncio
import logging
from uuid import UUID

from sqlalchemy import delete, func, select
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.pool import NullPool

from app.celery_app import celery_app
from app.config import config
from app.db import Document, Notification, SearchSourceConnector

logger = logging.getLogger(__name__)

# Documents removed per DELETE statement; bounds per-transaction work.
DELETION_BATCH_SIZE = 500


def _get_celery_session_maker():
    """Create an async engine + session maker scoped to one task run.

    NullPool is used because each task invocation runs in its own event
    loop and pooled connections must not be shared across loops. The
    caller is responsible for disposing the returned engine.

    Returns:
        Tuple of (async_sessionmaker, engine).
    """
    engine = create_async_engine(
        config.DATABASE_URL,
        poolclass=NullPool,
        echo=False,
    )
    return async_sessionmaker(engine, expire_on_commit=False), engine


@celery_app.task(
    bind=True,
    name="delete_connector_with_documents",
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(Exception,),
    retry_backoff=True,
)
def delete_connector_with_documents_task(
    self,
    connector_id: int,
    user_id: str,
    search_space_id: int,
    connector_name: str,
    connector_type: str,
):
    """
    Background task to delete a connector and all its associated documents.

    Creates a notification when complete (success or final failure).
    No polling required - user sees notification in UI.

    Args:
        connector_id: ID of the connector to delete
        user_id: ID of the user who initiated the deletion
        search_space_id: ID of the search space
        connector_name: Name of the connector (for notification message)
        connector_type: Type of the connector (for logging)
    """
    # Only surface a failure notification on the *last* attempt; with
    # autoretry_for=(Exception,) a notification per attempt would spam the
    # user with up to max_retries + 1 "failed" messages even when a later
    # retry eventually succeeds.
    is_final_attempt = self.request.retries >= (self.max_retries or 0)

    # asyncio.run creates a fresh event loop and closes it when done,
    # which is exactly what a synchronous Celery worker needs.
    return asyncio.run(
        _delete_connector_async(
            connector_id=connector_id,
            user_id=user_id,
            search_space_id=search_space_id,
            connector_name=connector_name,
            connector_type=connector_type,
            notify_on_failure=is_final_attempt,
        )
    )


async def _delete_connector_async(
    connector_id: int,
    user_id: str,
    search_space_id: int,
    connector_name: str,
    connector_type: str,
    notify_on_failure: bool = True,
) -> dict:
    """
    Async implementation of connector deletion.

    Steps:
    1. Count total documents to delete
    2. Delete documents in batches (chunks cascade automatically)
    3. Delete the connector record
    4. Create success notification

    On failure, optionally creates a failure notification, then re-raises
    the exception so Celery can retry.

    Args:
        connector_id: ID of the connector to delete
        user_id: ID of the user who initiated the deletion
        search_space_id: ID of the search space
        connector_name: Name of the connector (for notification message)
        connector_type: Type of the connector (for logging)
        notify_on_failure: When False, suppress the failure notification
            (used on non-final retry attempts).

    Returns:
        Summary dict with status, connector info, and deletion count.
    """
    session_maker, engine = _get_celery_session_maker()
    total_deleted = 0

    try:
        async with session_maker() as session:
            # Step 1: Count total documents for this connector (for logging).
            count_result = await session.execute(
                select(func.count(Document.id)).where(
                    Document.connector_id == connector_id
                )
            )
            total_docs = count_result.scalar() or 0

            logger.info(
                f"Starting deletion of connector {connector_id} ({connector_name}). "
                f"Documents to delete: {total_docs}"
            )

            # Step 2: Delete documents in batches. Reuses the shared helper
            # below so the batch semantics live in exactly one place.
            total_deleted = await delete_documents_by_connector_id(
                session, connector_id
            )

            # Step 3: Delete the connector record.
            result = await session.execute(
                select(SearchSourceConnector).where(
                    SearchSourceConnector.id == connector_id
                )
            )
            connector = result.scalar_one_or_none()

            if connector:
                await session.delete(connector)
                logger.info(f"Deleted connector record: {connector_id}")
            else:
                # Idempotency: a retry may find the connector already gone.
                logger.warning(
                    f"Connector {connector_id} not found - may have been already deleted"
                )

            # Step 4: Create success notification.
            doc_text = "document" if total_deleted == 1 else "documents"
            notification = Notification(
                user_id=UUID(user_id),
                search_space_id=search_space_id,
                type="connector_deletion",
                title=f"{connector_name} Removed",
                message=f"Connector and {total_deleted} {doc_text} have been removed from your knowledge base.",
                notification_metadata={
                    "connector_id": connector_id,
                    "connector_name": connector_name,
                    "connector_type": connector_type,
                    "documents_deleted": total_deleted,
                    "status": "completed",
                },
            )
            session.add(notification)
            await session.commit()

            logger.info(
                f"Connector {connector_id} ({connector_name}) deleted successfully. "
                f"Total documents deleted: {total_deleted}"
            )

            return {
                "status": "success",
                "connector_id": connector_id,
                "connector_name": connector_name,
                "documents_deleted": total_deleted,
            }

    except Exception as e:
        logger.error(
            f"Failed to delete connector {connector_id} ({connector_name}): {e!s}",
            exc_info=True,
        )

        if notify_on_failure:
            # Use a fresh session: the one above may be in a failed state.
            try:
                async with session_maker() as session:
                    notification = Notification(
                        user_id=UUID(user_id),
                        search_space_id=search_space_id,
                        type="connector_deletion",
                        title=f"Failed to Remove {connector_name}",
                        message="Something went wrong while removing this connector. Please try again.",
                        notification_metadata={
                            "connector_id": connector_id,
                            "connector_name": connector_name,
                            "connector_type": connector_type,
                            "documents_deleted": total_deleted,
                            "status": "failed",
                            "error": str(e),
                        },
                    )
                    session.add(notification)
                    await session.commit()
            except Exception as notify_error:
                # Never mask the original failure with a notification error.
                logger.error(
                    f"Failed to create failure notification: {notify_error!s}",
                    exc_info=True,
                )

        # Re-raise to trigger Celery retry (or final failure).
        raise

    finally:
        await engine.dispose()


async def delete_documents_by_connector_id(
    session,
    connector_id: int,
    batch_size: int = DELETION_BATCH_SIZE,
) -> int:
    """
    Delete all documents associated with a connector in batches.

    This is a utility function that can be used independently of the Celery task
    for synchronous deletion scenarios (e.g., small document counts).

    Args:
        session: AsyncSession instance
        connector_id: ID of the connector
        batch_size: Number of documents to delete per batch

    Returns:
        Total number of documents deleted
    """
    total_deleted = 0

    while True:
        # Fetch the next batch of IDs; an empty batch means we're done.
        result = await session.execute(
            select(Document.id)
            .where(Document.connector_id == connector_id)
            .limit(batch_size)
        )
        doc_ids = [row[0] for row in result.fetchall()]

        if not doc_ids:
            break

        # Chunks are removed by the FK's ON DELETE CASCADE; commit per
        # batch so progress survives a mid-run failure.
        await session.execute(delete(Document).where(Document.id.in_(doc_ids)))
        await session.commit()
        total_deleted += len(doc_ids)
        logger.debug(
            f"Deleted batch of {len(doc_ids)} documents for connector {connector_id}"
        )

    return total_deleted
@@ -515,6 +516,7 @@ def process_circleback_meeting_task( markdown_content: Meeting content formatted as markdown metadata: Meeting metadata dictionary search_space_id: ID of the search space + connector_id: ID of the Circleback connector (for deletion support) """ import asyncio @@ -529,6 +531,7 @@ def process_circleback_meeting_task( markdown_content, metadata, search_space_id, + connector_id, ) ) finally: @@ -541,6 +544,7 @@ async def _process_circleback_meeting( markdown_content: str, metadata: dict, search_space_id: int, + connector_id: int | None = None, ): """Process Circleback meeting with new session.""" from app.tasks.document_processors.circleback_processor import ( @@ -597,6 +601,7 @@ async def _process_circleback_meeting( markdown_content=markdown_content, metadata=metadata, search_space_id=search_space_id, + connector_id=connector_id, ) if result: diff --git a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py index 7d0837ac1..029c4a87c 100644 --- a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py @@ -418,6 +418,7 @@ async def index_airtable_records( chunks=chunks, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py b/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py index fd89792e9..fe608a8c9 100644 --- a/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py @@ -397,6 +397,7 @@ async def index_bookstack_pages( chunks=chunks, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py 
b/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py index bcdb9c72a..a8991647c 100644 --- a/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py @@ -396,6 +396,7 @@ async def index_clickup_tasks( chunks=chunks, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py index 3f8f43669..24859e685 100644 --- a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py @@ -403,6 +403,7 @@ async def index_confluence_pages( chunks=chunks, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py index 3d226ed06..4999ba6d4 100644 --- a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py @@ -528,6 +528,7 @@ async def index_discord_messages( unique_identifier_hash=unique_identifier_hash, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py b/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py index 6f2dd797f..fb6487474 100644 --- a/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py @@ -293,6 +293,7 @@ async def index_elasticsearch_documents( search_space_id=search_space_id, updated_at=get_current_timestamp(), created_by_id=user_id, + 
connector_id=connector_id, ) # Create chunks and attach to document (persist via relationship) diff --git a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py index 947035048..d82f18944 100644 --- a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py @@ -220,6 +220,7 @@ async def index_github_repos( user_id=user_id, task_logger=task_logger, log_entry=log_entry, + connector_id=connector_id, ) documents_processed += docs_created @@ -292,6 +293,7 @@ async def _process_repository_digest( user_id: str, task_logger: TaskLoggingService, log_entry, + connector_id: int, ) -> int: """ Process a repository digest and create documents. @@ -427,6 +429,7 @@ async def _process_repository_digest( chunks=chunks_data, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py index 28037ba7e..386c9de43 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py @@ -500,6 +500,7 @@ async def index_google_calendar_events( chunks=chunks, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py index 3cd59674e..151c1abbc 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py @@ -767,6 +767,7 @@ async def _process_single_file( session=session, task_logger=task_logger, log_entry=log_entry, + 
connector_id=connector_id, ) if error: diff --git a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py index 7c6b9ffec..34d06d796 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py @@ -413,7 +413,6 @@ async def index_google_gmail_messages( "subject": subject, "sender": sender, "date": date_str, - "connector_id": connector_id, }, content=summary_content, content_hash=content_hash, @@ -422,6 +421,7 @@ async def index_google_gmail_messages( chunks=chunks, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) documents_indexed += 1 diff --git a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py index 6262e8535..6971703c1 100644 --- a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py @@ -381,6 +381,7 @@ async def index_jira_issues( chunks=chunks, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py index dd0483eda..a94420bc2 100644 --- a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py @@ -414,6 +414,7 @@ async def index_linear_issues( chunks=chunks, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py index 74e809384..c0eb58d1d 100644 --- 
a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py @@ -477,6 +477,7 @@ async def index_luma_events( chunks=chunks, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py index 169dbd775..b1adeb035 100644 --- a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py @@ -398,6 +398,7 @@ async def index_notion_pages( } existing_document.chunks = chunks existing_document.updated_at = get_current_timestamp() + existing_document.connector_id = connector_id documents_indexed += 1 logger.info(f"Successfully updated Notion page: {page_title}") @@ -471,6 +472,7 @@ async def index_notion_pages( chunks=chunks, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py b/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py index a2ccd64d9..cfc321df1 100644 --- a/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py @@ -501,6 +501,7 @@ async def index_obsidian_vault( chunks=chunks, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(new_document) diff --git a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py index d922178ce..3cb4e3c85 100644 --- a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py @@ -390,6 +390,7 @@ async def index_slack_messages( 
unique_identifier_hash=unique_identifier_hash, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py index 7b401f6cf..1e26fbc42 100644 --- a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py @@ -431,6 +431,7 @@ async def index_teams_messages( unique_identifier_hash=unique_identifier_hash, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py b/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py index 63105d7a5..cb11a6ec2 100644 --- a/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py @@ -372,6 +372,7 @@ async def index_crawled_urls( chunks=chunks, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/document_processors/circleback_processor.py b/surfsense_backend/app/tasks/document_processors/circleback_processor.py index ce596d579..f412b51dd 100644 --- a/surfsense_backend/app/tasks/document_processors/circleback_processor.py +++ b/surfsense_backend/app/tasks/document_processors/circleback_processor.py @@ -42,6 +42,7 @@ async def add_circleback_meeting_document( markdown_content: str, metadata: dict[str, Any], search_space_id: int, + connector_id: int | None = None, ) -> Document | None: """ Process and store a Circleback meeting document. 
@@ -53,6 +54,7 @@ async def add_circleback_meeting_document( markdown_content: Meeting content formatted as markdown metadata: Meeting metadata dictionary search_space_id: ID of the search space + connector_id: ID of the Circleback connector (for deletion support) Returns: Document object if successful, None if failed or duplicate @@ -169,6 +171,9 @@ async def add_circleback_meeting_document( existing_document.blocknote_document = blocknote_json existing_document.content_needs_reindexing = False existing_document.updated_at = get_current_timestamp() + # Ensure connector_id is set (backfill for documents created before this field) + if connector_id is not None: + existing_document.connector_id = connector_id await session.commit() await session.refresh(existing_document) @@ -192,6 +197,7 @@ async def add_circleback_meeting_document( content_needs_reindexing=False, updated_at=get_current_timestamp(), created_by_id=created_by_user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/document_processors/file_processors.py b/surfsense_backend/app/tasks/document_processors/file_processors.py index 2f2e5a2e8..674773463 100644 --- a/surfsense_backend/app/tasks/document_processors/file_processors.py +++ b/surfsense_backend/app/tasks/document_processors/file_processors.py @@ -527,6 +527,7 @@ async def add_received_file_document_using_unstructured( content_needs_reindexing=False, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector.get("connector_id") if connector else None, ) session.add(document) @@ -667,6 +668,7 @@ async def add_received_file_document_using_llamacloud( content_needs_reindexing=False, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector.get("connector_id") if connector else None, ) session.add(document) @@ -832,6 +834,7 @@ async def add_received_file_document_using_docling( content_needs_reindexing=False, updated_at=get_current_timestamp(), 
created_by_id=user_id, + connector_id=connector.get("connector_id") if connector else None, ) session.add(document) @@ -852,7 +855,7 @@ async def add_received_file_document_using_docling( async def _update_document_from_connector( document: Document | None, connector: dict | None, session: AsyncSession ) -> None: - """Helper to update document type and metadata from connector info.""" + """Helper to update document type, metadata, and connector_id from connector info.""" if document and connector: if "type" in connector: document.document_type = connector["type"] @@ -864,6 +867,9 @@ async def _update_document_from_connector( # Expand existing metadata with connector metadata merged = {**document.document_metadata, **connector["metadata"]} document.document_metadata = merged + # Set connector_id if provided for de-indexing support + if "connector_id" in connector: + document.connector_id = connector["connector_id"] await session.commit() diff --git a/surfsense_backend/app/tasks/document_processors/markdown_processor.py b/surfsense_backend/app/tasks/document_processors/markdown_processor.py index a2399206a..ff85d962e 100644 --- a/surfsense_backend/app/tasks/document_processors/markdown_processor.py +++ b/surfsense_backend/app/tasks/document_processors/markdown_processor.py @@ -296,6 +296,7 @@ async def add_received_markdown_file_document( blocknote_document=blocknote_json, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector.get("connector_id") if connector else None, ) session.add(document) diff --git a/surfsense_web/components/layout/ui/sidebar/InboxSidebar.tsx b/surfsense_web/components/layout/ui/sidebar/InboxSidebar.tsx index 9f73d1a82..9ef49c0d8 100644 --- a/surfsense_web/components/layout/ui/sidebar/InboxSidebar.tsx +++ b/surfsense_web/components/layout/ui/sidebar/InboxSidebar.tsx @@ -237,7 +237,7 @@ export function InboxSidebar({ const currentDataSource = activeTab === "mentions" ? 
mentions : status; const { loading, loadingMore = false, hasMore = false, loadMore } = currentDataSource; - // Status tab includes: connector indexing, document processing, page limit exceeded + // Status tab includes: connector indexing, document processing, page limit exceeded, connector deletion // Filter to only show status notification types const statusItems = useMemo( () => @@ -245,7 +245,8 @@ export function InboxSidebar({ (item) => item.type === "connector_indexing" || item.type === "document_processing" || - item.type === "page_limit_exceeded" + item.type === "page_limit_exceeded" || + item.type === "connector_deletion" ), [status.items] ); From 9e29265a6178ef5f38ccd8fd7bf4a00824f2dc2b Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Mon, 2 Feb 2026 16:30:03 +0530 Subject: [PATCH 4/6] feat: add connector deletion type and metadata schema to inbox types --- surfsense_web/contracts/types/inbox.types.ts | 34 ++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/surfsense_web/contracts/types/inbox.types.ts b/surfsense_web/contracts/types/inbox.types.ts index 3570f2850..8e4b9ae86 100644 --- a/surfsense_web/contracts/types/inbox.types.ts +++ b/surfsense_web/contracts/types/inbox.types.ts @@ -7,6 +7,7 @@ import { documentTypeEnum } from "./document.types"; */ export const inboxItemTypeEnum = z.enum([ "connector_indexing", + "connector_deletion", "document_processing", "new_mention", "page_limit_exceeded", @@ -60,6 +61,17 @@ export const connectorIndexingMetadata = baseInboxItemMetadata.extend({ file_names: z.array(z.string()).optional(), }); +/** + * Connector deletion metadata schema + */ +export const connectorDeletionMetadata = baseInboxItemMetadata.extend({ + connector_id: z.number(), + connector_name: z.string(), + connector_type: z.string(), + documents_deleted: z.number(), + error: z.string().optional(), +}); + /** * Document processing metadata schema */ @@ -110,6 +122,7 @@ export const 
pageLimitExceededMetadata = baseInboxItemMetadata.extend({ */ export const inboxItemMetadata = z.union([ connectorIndexingMetadata, + connectorDeletionMetadata, documentProcessingMetadata, newMentionMetadata, pageLimitExceededMetadata, @@ -140,6 +153,11 @@ export const connectorIndexingInboxItem = inboxItem.extend({ metadata: connectorIndexingMetadata, }); +export const connectorDeletionInboxItem = inboxItem.extend({ + type: z.literal("connector_deletion"), + metadata: connectorDeletionMetadata, +}); + export const documentProcessingInboxItem = inboxItem.extend({ type: z.literal("document_processing"), metadata: documentProcessingMetadata, @@ -235,6 +253,15 @@ export function isConnectorIndexingMetadata( return connectorIndexingMetadata.safeParse(metadata).success; } +/** + * Type guard for ConnectorDeletionMetadata + */ +export function isConnectorDeletionMetadata( + metadata: unknown +): metadata is ConnectorDeletionMetadata { + return connectorDeletionMetadata.safeParse(metadata).success; +} + /** * Type guard for DocumentProcessingMetadata */ @@ -268,6 +295,7 @@ export function parseInboxItemMetadata( metadata: unknown ): | ConnectorIndexingMetadata + | ConnectorDeletionMetadata | DocumentProcessingMetadata | NewMentionMetadata | PageLimitExceededMetadata @@ -277,6 +305,10 @@ export function parseInboxItemMetadata( const result = connectorIndexingMetadata.safeParse(metadata); return result.success ? result.data : null; } + case "connector_deletion": { + const result = connectorDeletionMetadata.safeParse(metadata); + return result.success ? result.data : null; + } case "document_processing": { const result = documentProcessingMetadata.safeParse(metadata); return result.success ? 
result.data : null; @@ -303,12 +335,14 @@ export type InboxItemStatusEnum = z.infer; export type DocumentProcessingStageEnum = z.infer; export type BaseInboxItemMetadata = z.infer; export type ConnectorIndexingMetadata = z.infer; +export type ConnectorDeletionMetadata = z.infer; export type DocumentProcessingMetadata = z.infer; export type NewMentionMetadata = z.infer; export type PageLimitExceededMetadata = z.infer; export type InboxItemMetadata = z.infer; export type InboxItem = z.infer; export type ConnectorIndexingInboxItem = z.infer; +export type ConnectorDeletionInboxItem = z.infer; export type DocumentProcessingInboxItem = z.infer; export type NewMentionInboxItem = z.infer; export type PageLimitExceededInboxItem = z.infer; From 2125c768417579ef69e0a85df33c0fedc2619e97 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Mon, 2 Feb 2026 19:03:05 +0530 Subject: [PATCH 5/6] feat: merge new credentials with existing connector configurations to preserve user settings --- .../app/connectors/google_drive/credentials.py | 7 ++++++- surfsense_backend/app/routes/composio_routes.py | 10 +++++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/surfsense_backend/app/connectors/google_drive/credentials.py b/surfsense_backend/app/connectors/google_drive/credentials.py index 5b1900ab2..951d8e3e3 100644 --- a/surfsense_backend/app/connectors/google_drive/credentials.py +++ b/surfsense_backend/app/connectors/google_drive/credentials.py @@ -127,7 +127,12 @@ async def get_valid_credentials( ) creds_dict["_token_encrypted"] = True - connector.config = creds_dict + # IMPORTANT: Merge new credentials with existing config to preserve + # user settings like selected_folders, selected_files, indexing_options, + # folder_tokens, etc. that would otherwise be wiped on token refresh. 
+ existing_config = connector.config.copy() if connector.config else {} + existing_config.update(creds_dict) + connector.config = existing_config flag_modified(connector, "config") await session.commit() diff --git a/surfsense_backend/app/routes/composio_routes.py b/surfsense_backend/app/routes/composio_routes.py index a28361132..c0a23a665 100644 --- a/surfsense_backend/app/routes/composio_routes.py +++ b/surfsense_backend/app/routes/composio_routes.py @@ -20,6 +20,7 @@ from pydantic import ValidationError from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.future import select +from sqlalchemy.orm.attributes import flag_modified from app.config import config from app.db import ( @@ -330,10 +331,17 @@ async def composio_callback( ) # Update existing connector with new connected_account_id + # IMPORTANT: Merge new credentials with existing config to preserve + # user settings like selected_folders, selected_files, indexing_options, + # drive_page_token, etc. that would otherwise be wiped on reconnection. 
logger.info( f"Updating existing Composio connector {existing_connector.id} with new connected_account_id {final_connected_account_id}" ) - existing_connector.config = connector_config + existing_config = existing_connector.config.copy() if existing_connector.config else {} + existing_config.update(connector_config) + existing_connector.config = existing_config + + flag_modified(existing_connector, "config") await session.commit() await session.refresh(existing_connector) From f730df7c9dea415867c0736611fa31b7567beae2 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Tue, 3 Feb 2026 01:43:38 +0530 Subject: [PATCH 6/6] chore: ran linting --- .../alembic/versions/87_add_document_connector_id.py | 3 +-- surfsense_backend/app/routes/composio_routes.py | 4 +++- .../app/tasks/celery_tasks/connector_deletion_task.py | 5 +---- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/surfsense_backend/alembic/versions/87_add_document_connector_id.py b/surfsense_backend/alembic/versions/87_add_document_connector_id.py index b075014ae..75abe8ab9 100644 --- a/surfsense_backend/alembic/versions/87_add_document_connector_id.py +++ b/surfsense_backend/alembic/versions/87_add_document_connector_id.py @@ -72,7 +72,7 @@ def upgrade() -> None: # 4. 
Backfill existing documents with connector_id based on document_type matching # This maps document types to their corresponding connector types # Only backfills for documents in search spaces that have exactly one connector of that type - + # Map of document_type -> connector_type for backfilling document_connector_mappings = [ ("NOTION_CONNECTOR", "NOTION_CONNECTOR"), @@ -168,4 +168,3 @@ def downgrade() -> None: END$$; """ ) - diff --git a/surfsense_backend/app/routes/composio_routes.py b/surfsense_backend/app/routes/composio_routes.py index c0a23a665..602aa876c 100644 --- a/surfsense_backend/app/routes/composio_routes.py +++ b/surfsense_backend/app/routes/composio_routes.py @@ -337,7 +337,9 @@ async def composio_callback( logger.info( f"Updating existing Composio connector {existing_connector.id} with new connected_account_id {final_connected_account_id}" ) - existing_config = existing_connector.config.copy() if existing_connector.config else {} + existing_config = ( + existing_connector.config.copy() if existing_connector.config else {} + ) existing_config.update(connector_config) existing_connector.config = existing_config diff --git a/surfsense_backend/app/tasks/celery_tasks/connector_deletion_task.py b/surfsense_backend/app/tasks/celery_tasks/connector_deletion_task.py index b794c58d0..0fd68637c 100644 --- a/surfsense_backend/app/tasks/celery_tasks/connector_deletion_task.py +++ b/surfsense_backend/app/tasks/celery_tasks/connector_deletion_task.py @@ -135,9 +135,7 @@ async def _delete_connector_async( break # Delete this batch (chunks are deleted via CASCADE) - await session.execute( - delete(Document).where(Document.id.in_(doc_ids)) - ) + await session.execute(delete(Document).where(Document.id.in_(doc_ids))) await session.commit() total_deleted += len(doc_ids) @@ -269,4 +267,3 @@ async def delete_documents_by_connector_id( total_deleted += len(doc_ids) return total_deleted -