feat: add connector_id to documents for source tracking and implement connector deletion task

This commit is contained in:
Anish Sarkar 2026-02-02 16:23:26 +05:30
parent b4ed15585e
commit bf08982029
33 changed files with 572 additions and 13 deletions

View file

@ -42,6 +42,7 @@ async def add_circleback_meeting_document(
markdown_content: str,
metadata: dict[str, Any],
search_space_id: int,
connector_id: int | None = None,
) -> Document | None:
"""
Process and store a Circleback meeting document.
@ -53,6 +54,7 @@ async def add_circleback_meeting_document(
markdown_content: Meeting content formatted as markdown
metadata: Meeting metadata dictionary
search_space_id: ID of the search space
connector_id: ID of the Circleback connector (for deletion support)
Returns:
Document object if successful, None if failed or duplicate
@ -169,6 +171,9 @@ async def add_circleback_meeting_document(
existing_document.blocknote_document = blocknote_json
existing_document.content_needs_reindexing = False
existing_document.updated_at = get_current_timestamp()
# Ensure connector_id is set (backfill for documents created before this field)
if connector_id is not None:
existing_document.connector_id = connector_id
await session.commit()
await session.refresh(existing_document)
@ -192,6 +197,7 @@ async def add_circleback_meeting_document(
content_needs_reindexing=False,
updated_at=get_current_timestamp(),
created_by_id=created_by_user_id,
connector_id=connector_id,
)
session.add(document)

View file

@ -527,6 +527,7 @@ async def add_received_file_document_using_unstructured(
content_needs_reindexing=False,
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector.get("connector_id") if connector else None,
)
session.add(document)
@ -667,6 +668,7 @@ async def add_received_file_document_using_llamacloud(
content_needs_reindexing=False,
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector.get("connector_id") if connector else None,
)
session.add(document)
@ -832,6 +834,7 @@ async def add_received_file_document_using_docling(
content_needs_reindexing=False,
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector.get("connector_id") if connector else None,
)
session.add(document)
@ -852,7 +855,7 @@ async def add_received_file_document_using_docling(
async def _update_document_from_connector(
document: Document | None, connector: dict | None, session: AsyncSession
) -> None:
"""Helper to update document type and metadata from connector info."""
"""Helper to update document type, metadata, and connector_id from connector info."""
if document and connector:
if "type" in connector:
document.document_type = connector["type"]
@ -864,6 +867,9 @@ async def _update_document_from_connector(
# Expand existing metadata with connector metadata
merged = {**document.document_metadata, **connector["metadata"]}
document.document_metadata = merged
# Set connector_id if provided for de-indexing support
if "connector_id" in connector:
document.connector_id = connector["connector_id"]
await session.commit()

View file

@ -296,6 +296,7 @@ async def add_received_markdown_file_document(
blocknote_document=blocknote_json,
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector.get("connector_id") if connector else None,
)
session.add(document)