feat(connectors): add connector parameter to file processor for source tracking

- Add optional 'connector' parameter with 'type' and 'metadata' fields
- Create helper function _update_document_from_connector
- Use document_metadata column (not metadata) for JSON field
- Merge metadata with existing using dict spread operator
- Google Drive documents now marked as GOOGLE_DRIVE_CONNECTOR
- Backward compatible - no changes to existing logic
- Simple and clean implementation
This commit is contained in:
CREDO23 2025-12-28 18:01:39 +02:00
parent 8da58be9e0
commit a5935bc677
3 changed files with 60 additions and 71 deletions

View file

@ -92,9 +92,26 @@ async def download_and_process_file(
from app.tasks.document_processors.file_processors import (
process_file_in_background,
)
from app.db import DocumentType
# Prepare connector info
connector_info = {
"type": DocumentType.GOOGLE_DRIVE_CONNECTOR,
"metadata": {
"google_drive_file_id": file_id,
"google_drive_file_name": file_name,
"google_drive_mime_type": mime_type,
"source_connector": "google_drive",
},
}
# If it was a Google Workspace file, note the export format
if is_google_workspace_file(mime_type):
connector_info["metadata"]["exported_as"] = "pdf"
connector_info["metadata"]["original_workspace_type"] = mime_type.split(".")[-1]
logger.info(f"Processing {file_name} with Surfsense's file processor")
document = await process_file_in_background(
await process_file_in_background(
file_path=temp_file_path,
filename=file_name,
search_space_id=search_space_id,
@ -102,25 +119,11 @@ async def download_and_process_file(
session=session,
task_logger=task_logger,
log_entry=log_entry,
connector=connector_info, # Pass connector info
)
# Note: Document type update happens in the indexer after this returns
# to ensure proper session management and commit timing
# Prepare file metadata for the indexer to use
file_metadata = {
"google_drive_file_id": file_id,
"google_drive_file_name": file_name,
"google_drive_mime_type": mime_type,
}
# If it was a Google Workspace file, note the export format
if is_google_workspace_file(mime_type):
file_metadata["exported_as"] = "pdf"
file_metadata["original_workspace_type"] = mime_type.split(".")[-1] # e.g., "document", "spreadsheet"
# process_file_in_background returns None on duplicate/error, Document on success
return document, None, file_metadata
# process_file_in_background doesn't return the document
return None, None, connector_info["metadata"]
except Exception as e:
logger.warning(f"Failed to process {file_name}: {e!s}")

View file

@ -388,7 +388,8 @@ async def _process_single_file(
# Download and process using Surfsense's existing infrastructure
# This handles: markdown, audio, PDFs, Office docs, images, etc.
# It also handles: deduplication, chunking, summarization, embedding
document, error, file_metadata = await download_and_process_file(
# Document type is set to GOOGLE_DRIVE_CONNECTOR during processing
_, error, _ = await download_and_process_file(
client=drive_client,
file=file,
search_space_id=search_space_id,
@ -407,58 +408,9 @@ async def _process_single_file(
)
return 0, 1
if document and file_metadata:
# Refresh document from database to ensure it's attached to session
from app.db import Document
from sqlalchemy import select
# Get fresh document from database
result = await session.execute(
select(Document).where(Document.id == document.id)
)
document = result.scalar_one_or_none()
if not document:
logger.error(f"Could not find document {document.id} in database")
return 0, 1
# Update document type to GOOGLE_DRIVE_CONNECTOR and add metadata
original_type = document.document_type
document.document_type = DocumentType.GOOGLE_DRIVE_CONNECTOR
# Add Google Drive specific metadata
if not document.metadata:
document.metadata = {}
document.metadata.update({
**file_metadata,
"original_document_type": original_type,
"source_connector": "google_drive",
})
# Commit the document type and metadata changes
await session.commit()
logger.info(
f"Updated document {document.id} to GOOGLE_DRIVE_CONNECTOR type with metadata"
)
# Successfully indexed
await task_logger.log_task_progress(
log_entry,
f"Successfully indexed: {file_name}",
{
"status": "indexed",
"document_id": document.id,
"file_name": file_name,
"document_type": DocumentType.GOOGLE_DRIVE_CONNECTOR,
},
)
return 1, 0
else:
# Likely a duplicate or unsupported type
logger.info(f"No document created for {file_name} (duplicate or unsupported)")
return 0, 1
# File was processed successfully (document type already set in processor)
logger.info(f"Successfully indexed Google Drive file: {file_name}")
return 1, 0
except Exception as e:
logger.error(f"Error processing file {file_name}: {e!s}", exc_info=True)

View file

@ -447,6 +447,24 @@ async def add_received_file_document_using_docling(
) from e
async def _update_document_from_connector(
document: Document | None, connector: dict | None, session: AsyncSession
) -> None:
"""Helper to update document type and metadata from connector info."""
if document and connector:
if "type" in connector:
document.document_type = connector["type"]
if "metadata" in connector:
# Merge with existing document_metadata (the actual column name)
if not document.document_metadata:
document.document_metadata = connector["metadata"]
else:
# Expand existing metadata with connector metadata
merged = {**document.document_metadata, **connector["metadata"]}
document.document_metadata = merged
await session.commit()
async def process_file_in_background(
file_path: str,
filename: str,
@ -455,6 +473,7 @@ async def process_file_in_background(
session: AsyncSession,
task_logger: TaskLoggingService,
log_entry: Log,
connector: dict | None = None, # Optional: {"type": "GOOGLE_DRIVE_CONNECTOR", "metadata": {...}}
):
try:
# Check if the file is a markdown or text file
@ -492,6 +511,9 @@ async def process_file_in_background(
session, filename, markdown_content, search_space_id, user_id
)
# Update from connector if provided
await _update_document_from_connector(result, connector, session)
if result:
await task_logger.log_task_success(
log_entry,
@ -608,6 +630,9 @@ async def process_file_in_background(
session, filename, transcribed_text, search_space_id, user_id
)
# Update from connector if provided
await _update_document_from_connector(result, connector, session)
if result:
await task_logger.log_task_success(
log_entry,
@ -753,6 +778,9 @@ async def process_file_in_background(
session, filename, docs, search_space_id, user_id
)
# Update from connector if provided
await _update_document_from_connector(result, connector, session)
if result:
# Update page usage after successful processing
# allow_exceed=True because document was already created after passing initial check
@ -897,6 +925,9 @@ async def process_file_in_background(
user_id, final_page_count, allow_exceed=True
)
# Update from connector if provided
await _update_document_from_connector(last_created_doc, connector, session)
await task_logger.log_task_success(
log_entry,
f"Successfully processed file with LlamaCloud: {filename}",
@ -1021,6 +1052,9 @@ async def process_file_in_background(
user_id, final_page_count, allow_exceed=True
)
# Update from connector if provided
await _update_document_from_connector(doc_result, connector, session)
await task_logger.log_task_success(
log_entry,
f"Successfully processed file with Docling: {filename}",