diff --git a/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py b/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py index 2b8789e0c..934e56744 100644 --- a/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py @@ -1,5 +1,9 @@ """ ClickUp connector indexer. + +Implements 2-phase document status updates for real-time UI feedback: +- Phase 1: Create all documents with 'pending' status (visible in UI immediately) +- Phase 2: Process each document: pending → processing → ready/failed """ import contextlib @@ -12,7 +16,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.config import config from app.connectors.clickup_history import ClickUpHistoryConnector -from app.db import Document, DocumentType, SearchSourceConnectorType +from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType from app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService from app.utils.document_converters import ( @@ -28,6 +32,7 @@ from .base import ( get_connector_by_id, get_current_timestamp, logger, + safe_set_chunks, update_connector_last_indexed, ) @@ -141,10 +146,18 @@ async def index_clickup_tasks( documents_indexed = 0 documents_skipped = 0 + documents_failed = 0 # Heartbeat tracking - update notification periodically to prevent appearing stuck last_heartbeat_time = time.time() + # ======================================================================= + # PHASE 1: Collect all tasks and create pending documents + # This makes ALL documents visible in the UI immediately with pending status + # ======================================================================= + tasks_to_process = [] # List of dicts with document and task data + new_documents_created = False + # Iterate workspaces and fetch tasks for workspace in workspaces: workspace_id = workspace.get("id") @@ -183,15 +196,6 @@ async def index_clickup_tasks( ) for task in tasks: - # Check if it's time for a heartbeat update - if ( - on_heartbeat_callback - and (time.time() - last_heartbeat_time) - >= HEARTBEAT_INTERVAL_SECONDS - ): - await on_heartbeat_callback(documents_indexed) - last_heartbeat_time = time.time() - try: task_id = task.get("id") task_name = task.get("name", "Untitled Task") @@ -255,74 +259,35 @@ async def index_clickup_tasks( if existing_document: # Document exists - check if content has changed if existing_document.content_hash == content_hash: + # Ensure status is ready (might have been stuck in processing/pending) + if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): + existing_document.status = DocumentStatus.ready() logger.info( f"Document for ClickUp task {task_name} unchanged. Skipping." ) documents_skipped += 1 continue else: - # Content has changed - update the existing document + # Queue existing document for update (will be set to processing in Phase 2) logger.info( - f"Content changed for ClickUp task {task_name}. Updating document." - ) - - # Generate summary with metadata - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - if user_llm: - document_metadata = { - "task_id": task_id, - "task_name": task_name, - "task_status": task_status, - "task_priority": task_priority, - "task_list": task_list_name, - "task_space": task_space_name, - "assignees": len(task_assignees), - "document_type": "ClickUp Task", - "connector_type": "ClickUp", - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - task_content, user_llm, document_metadata - ) - else: - summary_content = task_content - summary_embedding = ( - config.embedding_model_instance.embed(task_content) - ) - - # Process chunks - chunks = await create_document_chunks(task_content) - - # Update existing document - existing_document.title = task_name - existing_document.content = summary_content - existing_document.content_hash = content_hash - existing_document.embedding = summary_embedding - existing_document.document_metadata = { - "task_id": task_id, - "task_name": task_name, - "task_status": task_status, - "task_priority": task_priority, - "task_assignees": task_assignees, - "task_due_date": task_due_date, - "task_created": task_created, - "task_updated": task_updated, - "indexed_at": datetime.now().strftime( - "%Y-%m-%d %H:%M:%S" - ), - } - existing_document.chunks = chunks - existing_document.updated_at = get_current_timestamp() - - documents_indexed += 1 - logger.info( - f"Successfully updated ClickUp task {task_name}" + f"Content changed for ClickUp task {task_name}. Queuing for update." ) + tasks_to_process.append({ + 'document': existing_document, + 'is_new': False, + 'task_content': task_content, + 'content_hash': content_hash, + 'task_id': task_id, + 'task_name': task_name, + 'task_status': task_status, + 'task_priority': task_priority, + 'task_list_name': task_list_name, + 'task_space_name': task_space_name, + 'task_assignees': task_assignees, + 'task_due_date': task_due_date, + 'task_created': task_created, + 'task_updated': task_updated, + }) continue # Document doesn't exist by unique_identifier_hash @@ -341,39 +306,7 @@ async def index_clickup_tasks( documents_skipped += 1 continue - # Document doesn't exist - create new one - # Generate summary with metadata - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - if user_llm: - document_metadata = { - "task_id": task_id, - "task_name": task_name, - "task_status": task_status, - "task_priority": task_priority, - "task_list": task_list_name, - "task_space": task_space_name, - "assignees": len(task_assignees), - "document_type": "ClickUp Task", - "connector_type": "ClickUp", - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - task_content, user_llm, document_metadata - ) - else: - # Fallback to simple summary if no LLM configured - summary_content = task_content - summary_embedding = config.embedding_model_instance.embed( - task_content - ) - - chunks = await create_document_chunks(task_content) - + # Create new document with PENDING status (visible in UI immediately) document = Document( search_space_id=search_space_id, title=task_name, @@ -387,44 +320,174 @@ async def index_clickup_tasks( "task_due_date": task_due_date, "task_created": task_created, "task_updated": task_updated, - "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "connector_id": connector_id, }, - content=summary_content, - content_hash=content_hash, + content="Pending...", # Placeholder until processed + content_hash=unique_identifier_hash, # Temporary unique value - updated when ready unique_identifier_hash=unique_identifier_hash, - embedding=summary_embedding, - chunks=chunks, + embedding=None, + chunks=[], # Empty at creation - safe for async + status=DocumentStatus.pending(), # Pending until processing starts updated_at=get_current_timestamp(), created_by_id=user_id, connector_id=connector_id, ) - session.add(document) - documents_indexed += 1 - logger.info(f"Successfully indexed new task {task_name}") + new_documents_created = True - # Batch commit every 10 documents - if documents_indexed % 10 == 0: - logger.info( - f"Committing batch: {documents_indexed} ClickUp tasks processed so far" - ) - await session.commit() + tasks_to_process.append({ + 'document': document, + 'is_new': True, + 'task_content': task_content, + 'content_hash': content_hash, + 'task_id': task_id, + 'task_name': task_name, + 'task_status': task_status, + 'task_priority': task_priority, + 'task_list_name': task_list_name, + 'task_space_name': task_space_name, + 'task_assignees': task_assignees, + 'task_due_date': task_due_date, + 'task_created': task_created, + 'task_updated': task_updated, + }) except Exception as e: logger.error( - f"Error processing task {task.get('name', 'Unknown')}: {e!s}", + f"Error in Phase 1 for task {task.get('name', 'Unknown')}: {e!s}", exc_info=True, ) - documents_skipped += 1 + documents_failed += 1 + continue + + # Commit all pending documents - they all appear in UI now + if new_documents_created: + logger.info(f"Phase 1: Committing {len([t for t in tasks_to_process if t['is_new']])} pending documents") + await session.commit() + + # ======================================================================= + # PHASE 2: Process each document one by one + # Each document transitions: pending → processing → ready/failed + # ======================================================================= + logger.info(f"Phase 2: Processing {len(tasks_to_process)} documents") + + for item in tasks_to_process: + # Send heartbeat periodically + if on_heartbeat_callback: + current_time = time.time() + if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed) + last_heartbeat_time = current_time + + document = item['document'] + try: + # Set to PROCESSING and commit - shows "processing" in UI for THIS document only + document.status = DocumentStatus.processing() + await session.commit() + + # Heavy processing (LLM, embeddings, chunks) + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + + if user_llm: + document_metadata_for_summary = { + "task_id": item['task_id'], + "task_name": item['task_name'], + "task_status": item['task_status'], + "task_priority": item['task_priority'], + "task_list": item['task_list_name'], + "task_space": item['task_space_name'], + "assignees": len(item['task_assignees']), + "document_type": "ClickUp Task", + "connector_type": "ClickUp", + } + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + item['task_content'], user_llm, document_metadata_for_summary + ) + else: + summary_content = item['task_content'] + summary_embedding = config.embedding_model_instance.embed( + item['task_content'] + ) + + chunks = await create_document_chunks(item['task_content']) + + # Update document to READY with actual content + document.title = item['task_name'] + document.content = summary_content + document.content_hash = item['content_hash'] + document.embedding = summary_embedding + document.document_metadata = { + "task_id": item['task_id'], + "task_name": item['task_name'], + "task_status": item['task_status'], + "task_priority": item['task_priority'], + "task_assignees": item['task_assignees'], + "task_due_date": item['task_due_date'], + "task_created": item['task_created'], + "task_updated": item['task_updated'], + "connector_id": connector_id, + "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + } + safe_set_chunks(document, chunks) + document.updated_at = get_current_timestamp() + document.status = DocumentStatus.ready() + + documents_indexed += 1 + + # Batch commit every 10 documents (for ready status updates) + if documents_indexed % 10 == 0: + logger.info( + f"Committing batch: {documents_indexed} ClickUp tasks processed so far" + ) + await session.commit() + + except Exception as e: + logger.error( + f"Error processing task {item.get('task_name', 'Unknown')}: {e!s}", + exc_info=True, + ) + # Mark document as failed with reason (visible in UI) + try: + document.status = DocumentStatus.failed(str(e)) + document.updated_at = get_current_timestamp() + except Exception as status_error: + logger.error(f"Failed to update document status to failed: {status_error}") + documents_failed += 1 + continue total_processed = documents_indexed - if total_processed > 0: - await update_connector_last_indexed(session, connector, update_last_indexed) + # CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs + # This ensures the UI shows "Last indexed" instead of "Never indexed" + await update_connector_last_indexed(session, connector, update_last_indexed) # Final commit for any remaining documents not yet committed in batches logger.info(f"Final commit: Total {documents_indexed} ClickUp tasks processed") - await session.commit() + try: + await session.commit() + logger.info( + "Successfully committed all ClickUp document changes to database" + ) + except Exception as e: + # Handle any remaining integrity errors gracefully (race conditions, etc.) + if ( + "duplicate key value violates unique constraint" in str(e).lower() + or "uniqueviolationerror" in str(e).lower() + ): + logger.warning( + f"Duplicate content_hash detected during final commit. " + f"This may occur if the same task was indexed by multiple connectors. " + f"Rolling back and continuing. Error: {e!s}" + ) + await session.rollback() + # Don't fail the entire task - some documents may have been successfully indexed + else: + raise await task_logger.log_task_success( log_entry, @@ -433,11 +496,12 @@ async def index_clickup_tasks( "pages_processed": total_processed, "documents_indexed": documents_indexed, "documents_skipped": documents_skipped, + "documents_failed": documents_failed, }, ) logger.info( - f"clickup indexing completed: {documents_indexed} new tasks, {documents_skipped} skipped" + f"clickup indexing completed: {documents_indexed} ready, {documents_skipped} skipped, {documents_failed} failed" ) # Close client connection diff --git a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py index 848db7623..b37989a84 100644 --- a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py @@ -3,6 +3,10 @@ GitHub connector indexer using gitingest. This indexer processes entire repository digests in one pass, dramatically reducing LLM API calls compared to the previous file-by-file approach. + +Implements 2-phase document status updates for real-time UI feedback: +- Phase 1: Create all documents with 'pending' status (visible in UI immediately) +- Phase 2: Process each document: pending → processing → ready/failed """ import time @@ -14,7 +18,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.config import config from app.connectors.github_connector import GitHubConnector, RepositoryDigest -from app.db import Document, DocumentType, SearchSourceConnectorType +from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType from app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService from app.utils.document_converters import ( @@ -30,6 +34,8 @@ from .base import ( get_connector_by_id, get_current_timestamp, logger, + safe_set_chunks, + update_connector_last_indexed, ) # Type hint for heartbeat callback @@ -164,7 +170,7 @@ async def index_github_repos( ) return 0, f"Failed to initialize GitHub client: {e!s}" - # 4. Process each repository with gitingest + # 4. Process each repository with gitingest using 2-phase approach await task_logger.log_task_progress( log_entry, f"Starting gitingest processing for {len(repo_full_names_to_index)} repositories", @@ -181,24 +187,25 @@ async def index_github_repos( # Heartbeat tracking - update notification periodically to prevent appearing stuck last_heartbeat_time = time.time() documents_indexed = 0 + documents_skipped = 0 + documents_failed = 0 + + # ======================================================================= + # PHASE 1: Analyze all repos and create pending documents + # This makes ALL documents visible in the UI immediately with pending status + # ======================================================================= + repos_to_process = [] # List of dicts with document and digest data + new_documents_created = False for repo_full_name in repo_full_names_to_index: - # Check if it's time for a heartbeat update - if ( - on_heartbeat_callback - and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS - ): - await on_heartbeat_callback(documents_indexed) - last_heartbeat_time = time.time() if not repo_full_name or not isinstance(repo_full_name, str): logger.warning(f"Skipping invalid repository entry: {repo_full_name}") continue - logger.info(f"Ingesting repository: {repo_full_name}") - try: + logger.info(f"Phase 1: Analyzing repository: {repo_full_name}") + # Run gitingest via subprocess (isolated from event loop) - # Using to_thread to not block the async database operations import asyncio digest = await asyncio.to_thread( @@ -212,30 +219,248 @@ async def index_github_repos( errors.append(f"No digest for {repo_full_name}") continue - # Process the digest and create documents - docs_created = await _process_repository_digest( - session=session, - digest=digest, - search_space_id=search_space_id, - user_id=user_id, - task_logger=task_logger, - log_entry=log_entry, - connector_id=connector_id, + # Generate unique identifier based on repo name + unique_identifier_hash = generate_unique_identifier_hash( + DocumentType.GITHUB_CONNECTOR, repo_full_name, search_space_id ) - documents_processed += docs_created - logger.info( - f"Created {docs_created} documents from repository: {repo_full_name}" + # Generate content hash from digest + full_content = digest.full_digest + content_hash = generate_content_hash(full_content, search_space_id) + + # Check if document with this unique identifier already exists + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash ) + if existing_document: + # Document exists - check if content has changed + if existing_document.content_hash == content_hash: + # Ensure status is ready (might have been stuck in processing/pending) + if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): + existing_document.status = DocumentStatus.ready() + logger.info(f"Repository {repo_full_name} unchanged. Skipping.") + documents_skipped += 1 + continue + + # Queue existing document for update (will be set to processing in Phase 2) + logger.info( + f"Content changed for repository {repo_full_name}. Queuing for update." + ) + repos_to_process.append({ + 'document': existing_document, + 'is_new': False, + 'digest': digest, + 'content_hash': content_hash, + 'repo_full_name': repo_full_name, + 'unique_identifier_hash': unique_identifier_hash, + }) + continue + + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"Repository {repo_full_name} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." + ) + documents_skipped += 1 + continue + + # Create new document with PENDING status (visible in UI immediately) + document = Document( + search_space_id=search_space_id, + title=repo_full_name, + document_type=DocumentType.GITHUB_CONNECTOR, + document_metadata={ + "repository_full_name": repo_full_name, + "url": f"https://github.com/{repo_full_name}", + "branch": digest.branch, + "ingestion_method": "gitingest", + "connector_id": connector_id, + }, + content="Pending...", # Placeholder until processed + content_hash=unique_identifier_hash, # Temporary unique value - updated when ready + unique_identifier_hash=unique_identifier_hash, + embedding=None, + chunks=[], # Empty at creation - safe for async + status=DocumentStatus.pending(), # Pending until processing starts + updated_at=get_current_timestamp(), + created_by_id=user_id, + connector_id=connector_id, + ) + session.add(document) + new_documents_created = True + + repos_to_process.append({ + 'document': document, + 'is_new': True, + 'digest': digest, + 'content_hash': content_hash, + 'repo_full_name': repo_full_name, + 'unique_identifier_hash': unique_identifier_hash, + }) + except Exception as repo_err: logger.error( - f"Failed to process repository {repo_full_name}: {repo_err}" + f"Error in Phase 1 for repository {repo_full_name}: {repo_err}", + exc_info=True, ) + errors.append(f"Phase 1 error for {repo_full_name}: {repo_err}") + documents_failed += 1 + + # Commit all pending documents - they all appear in UI now + if new_documents_created: + logger.info(f"Phase 1: Committing {len([r for r in repos_to_process if r['is_new']])} pending documents") + await session.commit() + + # ======================================================================= + # PHASE 2: Process each document one by one + # Each document transitions: pending → processing → ready/failed + # ======================================================================= + logger.info(f"Phase 2: Processing {len(repos_to_process)} documents") + + for item in repos_to_process: + # Send heartbeat periodically + if on_heartbeat_callback: + current_time = time.time() + if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed) + last_heartbeat_time = current_time + + document = item['document'] + digest = item['digest'] + repo_full_name = item['repo_full_name'] + + try: + # Set to PROCESSING and commit - shows "processing" in UI for THIS document only + document.status = DocumentStatus.processing() + await session.commit() + + # Heavy processing (LLM, embeddings, chunks) + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + + document_metadata_for_summary = { + "repository": repo_full_name, + "document_type": "GitHub Repository", + "connector_type": "GitHub", + "ingestion_method": "gitingest", + "file_tree": digest.tree[:2000] if len(digest.tree) > 2000 else digest.tree, + "estimated_tokens": digest.estimated_tokens, + } + + if user_llm: + # Prepare content for summarization + summary_content = digest.full_digest + if len(summary_content) > MAX_DIGEST_CHARS: + summary_content = ( + f"# Repository: {repo_full_name}\n\n" + f"## File Structure\n\n{digest.tree}\n\n" + f"## File Contents (truncated)\n\n{digest.content[: MAX_DIGEST_CHARS - len(digest.tree) - 200]}..." + ) + + summary_text, summary_embedding = await generate_document_summary( + summary_content, user_llm, document_metadata_for_summary + ) + else: + # Fallback to simple summary if no LLM configured + summary_text = ( + f"# GitHub Repository: {repo_full_name}\n\n" + f"## Summary\n{digest.summary}\n\n" + f"## File Structure\n{digest.tree[:3000]}" + ) + summary_embedding = config.embedding_model_instance.embed(summary_text) + + # Chunk the full digest content for granular search + try: + chunks_data = await create_document_chunks(digest.content) + except Exception as chunk_err: + logger.error(f"Failed to chunk repository {repo_full_name}: {chunk_err}") + chunks_data = await _simple_chunk_content(digest.content) + + # Update document to READY with actual content + doc_metadata = { + "repository_full_name": repo_full_name, + "url": f"https://github.com/{repo_full_name}", + "branch": digest.branch, + "ingestion_method": "gitingest", + "file_tree": digest.tree, + "gitingest_summary": digest.summary, + "estimated_tokens": digest.estimated_tokens, + "connector_id": connector_id, + "indexed_at": datetime.now(UTC).isoformat(), + } + + document.title = repo_full_name + document.content = summary_text + document.content_hash = item['content_hash'] + document.embedding = summary_embedding + document.document_metadata = doc_metadata + safe_set_chunks(document, chunks_data) + document.updated_at = get_current_timestamp() + document.status = DocumentStatus.ready() + + documents_processed += 1 + documents_indexed += 1 + + logger.info( + f"Created document for repository {repo_full_name} " + f"with {len(chunks_data)} chunks" + ) + + # Batch commit every 5 documents (repositories are large) + if documents_indexed % 5 == 0: + logger.info( + f"Committing batch: {documents_indexed} GitHub repos processed so far" + ) + await session.commit() + + except Exception as repo_err: + logger.error( + f"Error processing repository {repo_full_name}: {repo_err}", + exc_info=True, + ) + # Mark document as failed with reason (visible in UI) + try: + document.status = DocumentStatus.failed(str(repo_err)) + document.updated_at = get_current_timestamp() + except Exception as status_error: + logger.error(f"Failed to update document status to failed: {status_error}") errors.append(f"Failed processing {repo_full_name}: {repo_err}") + documents_failed += 1 + continue + + # CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs + await update_connector_last_indexed(session, connector, update_last_indexed) # Final commit - await session.commit() + logger.info(f"Final commit: Total {documents_processed} GitHub repositories processed") + try: + await session.commit() + logger.info( + "Successfully committed all GitHub document changes to database" + ) + except Exception as e: + if ( + "duplicate key value violates unique constraint" in str(e).lower() + or "uniqueviolationerror" in str(e).lower() + ): + logger.warning( + f"Duplicate content_hash detected during final commit. " + f"Rolling back and continuing. Error: {e!s}" + ) + await session.rollback() + else: + raise + logger.info( f"Finished GitHub indexing for connector {connector_id}. " f"Created {documents_processed} documents." @@ -247,6 +472,8 @@ async def index_github_repos( f"Successfully completed GitHub indexing for connector {connector_id}", { "documents_processed": documents_processed, + "documents_skipped": documents_skipped, + "documents_failed": documents_failed, "errors_count": len(errors), "repo_count": len(repo_full_names_to_index), "method": "gitingest", @@ -286,163 +513,6 @@ async def index_github_repos( return documents_processed, error_message -async def _process_repository_digest( - session: AsyncSession, - digest: RepositoryDigest, - search_space_id: int, - user_id: str, - task_logger: TaskLoggingService, - log_entry, - connector_id: int, -) -> int: - """ - Process a repository digest and create documents. - - For each repository, we create: - 1. One main document with the repository summary - 2. Chunks from the full digest content for granular search - - Args: - session: Database session - digest: The repository digest from gitingest - search_space_id: ID of the search space - user_id: ID of the user - task_logger: Task logging service - log_entry: Current log entry - - Returns: - Number of documents created - """ - repo_full_name = digest.repo_full_name - documents_created = 0 - - # Generate unique identifier based on repo name and content hash - # This allows updates when repo content changes - full_content = digest.full_digest - content_hash = generate_content_hash(full_content, search_space_id) - - # Use repo name as the unique identifier (one document per repo) - unique_identifier_hash = generate_unique_identifier_hash( - DocumentType.GITHUB_CONNECTOR, repo_full_name, search_space_id - ) - - # Check if document with this unique identifier already exists - existing_document = await check_document_by_unique_identifier( - session, unique_identifier_hash - ) - - if existing_document: - # Document exists - check if content has changed - if existing_document.content_hash == content_hash: - logger.info(f"Repository {repo_full_name} unchanged. Skipping.") - return 0 - else: - logger.info( - f"Content changed for repository {repo_full_name}. Updating document." - ) - # Delete existing document to replace with new one - await session.delete(existing_document) - await session.flush() - else: - # Document doesn't exist by unique_identifier_hash - # Check if a document with the same content_hash exists (from another connector) - with session.no_autoflush: - duplicate_by_content = await check_duplicate_document_by_hash( - session, content_hash - ) - - if duplicate_by_content: - logger.info( - f"Repository {repo_full_name} already indexed by another connector " - f"(existing document ID: {duplicate_by_content.id}, " - f"type: {duplicate_by_content.document_type}). Skipping." - ) - return 0 - - # Generate summary using LLM (ONE call per repository!) - user_llm = await get_user_long_context_llm(session, user_id, search_space_id) - - document_metadata = { - "repository": repo_full_name, - "document_type": "GitHub Repository", - "connector_type": "GitHub", - "ingestion_method": "gitingest", - "file_tree": digest.tree[:2000] if len(digest.tree) > 2000 else digest.tree, - "estimated_tokens": digest.estimated_tokens, - } - - if user_llm: - # Prepare content for summarization - # Include tree structure and truncated content if too large - summary_content = digest.full_digest - if len(summary_content) > MAX_DIGEST_CHARS: - # Truncate but keep the tree and beginning of content - summary_content = ( - f"# Repository: {repo_full_name}\n\n" - f"## File Structure\n\n{digest.tree}\n\n" - f"## File Contents (truncated)\n\n{digest.content[: MAX_DIGEST_CHARS - len(digest.tree) - 200]}..." - ) - - summary_text, summary_embedding = await generate_document_summary( - summary_content, user_llm, document_metadata - ) - else: - # Fallback to simple summary if no LLM configured - summary_text = ( - f"# GitHub Repository: {repo_full_name}\n\n" - f"## Summary\n{digest.summary}\n\n" - f"## File Structure\n{digest.tree[:3000]}" - ) - summary_embedding = config.embedding_model_instance.embed(summary_text) - - # Chunk the full digest content for granular search - try: - # Use the content (not the summary) for chunking - # This preserves file-level granularity in search - chunks_data = await create_document_chunks(digest.content) - except Exception as chunk_err: - logger.error(f"Failed to chunk repository {repo_full_name}: {chunk_err}") - # Fall back to a simpler chunking approach - chunks_data = await _simple_chunk_content(digest.content) - - # Create the document - doc_metadata = { - "repository_full_name": repo_full_name, - "url": f"https://github.com/{repo_full_name}", - "branch": digest.branch, - "ingestion_method": "gitingest", - "file_tree": digest.tree, - "gitingest_summary": digest.summary, - "estimated_tokens": digest.estimated_tokens, - "indexed_at": datetime.now(UTC).isoformat(), - } - - document = Document( - title=repo_full_name, - document_type=DocumentType.GITHUB_CONNECTOR, - document_metadata=doc_metadata, - content=summary_text, - content_hash=content_hash, - unique_identifier_hash=unique_identifier_hash, - embedding=summary_embedding, - search_space_id=search_space_id, - chunks=chunks_data, - updated_at=get_current_timestamp(), - created_by_id=user_id, - connector_id=connector_id, - ) - - session.add(document) - documents_created += 1 - - logger.info( - f"Created document for repository {repo_full_name} " - f"with {len(chunks_data)} chunks" - ) - - return documents_created - - async def _simple_chunk_content(content: str, chunk_size: int = 4000) -> list: """ Simple fallback chunking when the regular chunker fails.