mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-06-08 20:25:19 +02:00
feat: implement two-phase document indexing for ClickUp and GitHub connectors with real-time status updates
This commit is contained in:
parent
108e8c960f
commit
bfa3be655e
2 changed files with 440 additions and 306 deletions
|
|
@ -1,5 +1,9 @@
|
|||
"""
|
||||
ClickUp connector indexer.
|
||||
|
||||
Implements 2-phase document status updates for real-time UI feedback:
|
||||
- Phase 1: Create all documents with 'pending' status (visible in UI immediately)
|
||||
- Phase 2: Process each document: pending → processing → ready/failed
|
||||
"""
|
||||
|
||||
import contextlib
|
||||
|
|
@ -12,7 +16,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
|
|||
|
||||
from app.config import config
|
||||
from app.connectors.clickup_history import ClickUpHistoryConnector
|
||||
from app.db import Document, DocumentType, SearchSourceConnectorType
|
||||
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
|
||||
from app.services.llm_service import get_user_long_context_llm
|
||||
from app.services.task_logging_service import TaskLoggingService
|
||||
from app.utils.document_converters import (
|
||||
|
|
@ -28,6 +32,7 @@ from .base import (
|
|||
get_connector_by_id,
|
||||
get_current_timestamp,
|
||||
logger,
|
||||
safe_set_chunks,
|
||||
update_connector_last_indexed,
|
||||
)
|
||||
|
||||
|
|
@ -141,10 +146,18 @@ async def index_clickup_tasks(
|
|||
|
||||
documents_indexed = 0
|
||||
documents_skipped = 0
|
||||
documents_failed = 0
|
||||
|
||||
# Heartbeat tracking - update notification periodically to prevent appearing stuck
|
||||
last_heartbeat_time = time.time()
|
||||
|
||||
# =======================================================================
|
||||
# PHASE 1: Collect all tasks and create pending documents
|
||||
# This makes ALL documents visible in the UI immediately with pending status
|
||||
# =======================================================================
|
||||
tasks_to_process = [] # List of dicts with document and task data
|
||||
new_documents_created = False
|
||||
|
||||
# Iterate workspaces and fetch tasks
|
||||
for workspace in workspaces:
|
||||
workspace_id = workspace.get("id")
|
||||
|
|
@ -183,15 +196,6 @@ async def index_clickup_tasks(
|
|||
)
|
||||
|
||||
for task in tasks:
|
||||
# Check if it's time for a heartbeat update
|
||||
if (
|
||||
on_heartbeat_callback
|
||||
and (time.time() - last_heartbeat_time)
|
||||
>= HEARTBEAT_INTERVAL_SECONDS
|
||||
):
|
||||
await on_heartbeat_callback(documents_indexed)
|
||||
last_heartbeat_time = time.time()
|
||||
|
||||
try:
|
||||
task_id = task.get("id")
|
||||
task_name = task.get("name", "Untitled Task")
|
||||
|
|
@ -255,74 +259,35 @@ async def index_clickup_tasks(
|
|||
if existing_document:
|
||||
# Document exists - check if content has changed
|
||||
if existing_document.content_hash == content_hash:
|
||||
# Ensure status is ready (might have been stuck in processing/pending)
|
||||
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
|
||||
existing_document.status = DocumentStatus.ready()
|
||||
logger.info(
|
||||
f"Document for ClickUp task {task_name} unchanged. Skipping."
|
||||
)
|
||||
documents_skipped += 1
|
||||
continue
|
||||
else:
|
||||
# Content has changed - update the existing document
|
||||
# Queue existing document for update (will be set to processing in Phase 2)
|
||||
logger.info(
|
||||
f"Content changed for ClickUp task {task_name}. Updating document."
|
||||
)
|
||||
|
||||
# Generate summary with metadata
|
||||
user_llm = await get_user_long_context_llm(
|
||||
session, user_id, search_space_id
|
||||
)
|
||||
|
||||
if user_llm:
|
||||
document_metadata = {
|
||||
"task_id": task_id,
|
||||
"task_name": task_name,
|
||||
"task_status": task_status,
|
||||
"task_priority": task_priority,
|
||||
"task_list": task_list_name,
|
||||
"task_space": task_space_name,
|
||||
"assignees": len(task_assignees),
|
||||
"document_type": "ClickUp Task",
|
||||
"connector_type": "ClickUp",
|
||||
}
|
||||
(
|
||||
summary_content,
|
||||
summary_embedding,
|
||||
) = await generate_document_summary(
|
||||
task_content, user_llm, document_metadata
|
||||
)
|
||||
else:
|
||||
summary_content = task_content
|
||||
summary_embedding = (
|
||||
config.embedding_model_instance.embed(task_content)
|
||||
)
|
||||
|
||||
# Process chunks
|
||||
chunks = await create_document_chunks(task_content)
|
||||
|
||||
# Update existing document
|
||||
existing_document.title = task_name
|
||||
existing_document.content = summary_content
|
||||
existing_document.content_hash = content_hash
|
||||
existing_document.embedding = summary_embedding
|
||||
existing_document.document_metadata = {
|
||||
"task_id": task_id,
|
||||
"task_name": task_name,
|
||||
"task_status": task_status,
|
||||
"task_priority": task_priority,
|
||||
"task_assignees": task_assignees,
|
||||
"task_due_date": task_due_date,
|
||||
"task_created": task_created,
|
||||
"task_updated": task_updated,
|
||||
"indexed_at": datetime.now().strftime(
|
||||
"%Y-%m-%d %H:%M:%S"
|
||||
),
|
||||
}
|
||||
existing_document.chunks = chunks
|
||||
existing_document.updated_at = get_current_timestamp()
|
||||
|
||||
documents_indexed += 1
|
||||
logger.info(
|
||||
f"Successfully updated ClickUp task {task_name}"
|
||||
f"Content changed for ClickUp task {task_name}. Queuing for update."
|
||||
)
|
||||
tasks_to_process.append({
|
||||
'document': existing_document,
|
||||
'is_new': False,
|
||||
'task_content': task_content,
|
||||
'content_hash': content_hash,
|
||||
'task_id': task_id,
|
||||
'task_name': task_name,
|
||||
'task_status': task_status,
|
||||
'task_priority': task_priority,
|
||||
'task_list_name': task_list_name,
|
||||
'task_space_name': task_space_name,
|
||||
'task_assignees': task_assignees,
|
||||
'task_due_date': task_due_date,
|
||||
'task_created': task_created,
|
||||
'task_updated': task_updated,
|
||||
})
|
||||
continue
|
||||
|
||||
# Document doesn't exist by unique_identifier_hash
|
||||
|
|
@ -341,39 +306,7 @@ async def index_clickup_tasks(
|
|||
documents_skipped += 1
|
||||
continue
|
||||
|
||||
# Document doesn't exist - create new one
|
||||
# Generate summary with metadata
|
||||
user_llm = await get_user_long_context_llm(
|
||||
session, user_id, search_space_id
|
||||
)
|
||||
|
||||
if user_llm:
|
||||
document_metadata = {
|
||||
"task_id": task_id,
|
||||
"task_name": task_name,
|
||||
"task_status": task_status,
|
||||
"task_priority": task_priority,
|
||||
"task_list": task_list_name,
|
||||
"task_space": task_space_name,
|
||||
"assignees": len(task_assignees),
|
||||
"document_type": "ClickUp Task",
|
||||
"connector_type": "ClickUp",
|
||||
}
|
||||
(
|
||||
summary_content,
|
||||
summary_embedding,
|
||||
) = await generate_document_summary(
|
||||
task_content, user_llm, document_metadata
|
||||
)
|
||||
else:
|
||||
# Fallback to simple summary if no LLM configured
|
||||
summary_content = task_content
|
||||
summary_embedding = config.embedding_model_instance.embed(
|
||||
task_content
|
||||
)
|
||||
|
||||
chunks = await create_document_chunks(task_content)
|
||||
|
||||
# Create new document with PENDING status (visible in UI immediately)
|
||||
document = Document(
|
||||
search_space_id=search_space_id,
|
||||
title=task_name,
|
||||
|
|
@ -387,44 +320,174 @@ async def index_clickup_tasks(
|
|||
"task_due_date": task_due_date,
|
||||
"task_created": task_created,
|
||||
"task_updated": task_updated,
|
||||
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
||||
"connector_id": connector_id,
|
||||
},
|
||||
content=summary_content,
|
||||
content_hash=content_hash,
|
||||
content="Pending...", # Placeholder until processed
|
||||
content_hash=unique_identifier_hash, # Temporary unique value - updated when ready
|
||||
unique_identifier_hash=unique_identifier_hash,
|
||||
embedding=summary_embedding,
|
||||
chunks=chunks,
|
||||
embedding=None,
|
||||
chunks=[], # Empty at creation - safe for async
|
||||
status=DocumentStatus.pending(), # Pending until processing starts
|
||||
updated_at=get_current_timestamp(),
|
||||
created_by_id=user_id,
|
||||
connector_id=connector_id,
|
||||
)
|
||||
|
||||
session.add(document)
|
||||
documents_indexed += 1
|
||||
logger.info(f"Successfully indexed new task {task_name}")
|
||||
new_documents_created = True
|
||||
|
||||
# Batch commit every 10 documents
|
||||
if documents_indexed % 10 == 0:
|
||||
logger.info(
|
||||
f"Committing batch: {documents_indexed} ClickUp tasks processed so far"
|
||||
)
|
||||
await session.commit()
|
||||
tasks_to_process.append({
|
||||
'document': document,
|
||||
'is_new': True,
|
||||
'task_content': task_content,
|
||||
'content_hash': content_hash,
|
||||
'task_id': task_id,
|
||||
'task_name': task_name,
|
||||
'task_status': task_status,
|
||||
'task_priority': task_priority,
|
||||
'task_list_name': task_list_name,
|
||||
'task_space_name': task_space_name,
|
||||
'task_assignees': task_assignees,
|
||||
'task_due_date': task_due_date,
|
||||
'task_created': task_created,
|
||||
'task_updated': task_updated,
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Error processing task {task.get('name', 'Unknown')}: {e!s}",
|
||||
f"Error in Phase 1 for task {task.get('name', 'Unknown')}: {e!s}",
|
||||
exc_info=True,
|
||||
)
|
||||
documents_skipped += 1
|
||||
documents_failed += 1
|
||||
continue
|
||||
|
||||
# Commit all pending documents - they all appear in UI now
|
||||
if new_documents_created:
|
||||
logger.info(f"Phase 1: Committing {len([t for t in tasks_to_process if t['is_new']])} pending documents")
|
||||
await session.commit()
|
||||
|
||||
# =======================================================================
|
||||
# PHASE 2: Process each document one by one
|
||||
# Each document transitions: pending → processing → ready/failed
|
||||
# =======================================================================
|
||||
logger.info(f"Phase 2: Processing {len(tasks_to_process)} documents")
|
||||
|
||||
for item in tasks_to_process:
|
||||
# Send heartbeat periodically
|
||||
if on_heartbeat_callback:
|
||||
current_time = time.time()
|
||||
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
|
||||
await on_heartbeat_callback(documents_indexed)
|
||||
last_heartbeat_time = current_time
|
||||
|
||||
document = item['document']
|
||||
try:
|
||||
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
|
||||
document.status = DocumentStatus.processing()
|
||||
await session.commit()
|
||||
|
||||
# Heavy processing (LLM, embeddings, chunks)
|
||||
user_llm = await get_user_long_context_llm(
|
||||
session, user_id, search_space_id
|
||||
)
|
||||
|
||||
if user_llm:
|
||||
document_metadata_for_summary = {
|
||||
"task_id": item['task_id'],
|
||||
"task_name": item['task_name'],
|
||||
"task_status": item['task_status'],
|
||||
"task_priority": item['task_priority'],
|
||||
"task_list": item['task_list_name'],
|
||||
"task_space": item['task_space_name'],
|
||||
"assignees": len(item['task_assignees']),
|
||||
"document_type": "ClickUp Task",
|
||||
"connector_type": "ClickUp",
|
||||
}
|
||||
(
|
||||
summary_content,
|
||||
summary_embedding,
|
||||
) = await generate_document_summary(
|
||||
item['task_content'], user_llm, document_metadata_for_summary
|
||||
)
|
||||
else:
|
||||
summary_content = item['task_content']
|
||||
summary_embedding = config.embedding_model_instance.embed(
|
||||
item['task_content']
|
||||
)
|
||||
|
||||
chunks = await create_document_chunks(item['task_content'])
|
||||
|
||||
# Update document to READY with actual content
|
||||
document.title = item['task_name']
|
||||
document.content = summary_content
|
||||
document.content_hash = item['content_hash']
|
||||
document.embedding = summary_embedding
|
||||
document.document_metadata = {
|
||||
"task_id": item['task_id'],
|
||||
"task_name": item['task_name'],
|
||||
"task_status": item['task_status'],
|
||||
"task_priority": item['task_priority'],
|
||||
"task_assignees": item['task_assignees'],
|
||||
"task_due_date": item['task_due_date'],
|
||||
"task_created": item['task_created'],
|
||||
"task_updated": item['task_updated'],
|
||||
"connector_id": connector_id,
|
||||
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
||||
}
|
||||
safe_set_chunks(document, chunks)
|
||||
document.updated_at = get_current_timestamp()
|
||||
document.status = DocumentStatus.ready()
|
||||
|
||||
documents_indexed += 1
|
||||
|
||||
# Batch commit every 10 documents (for ready status updates)
|
||||
if documents_indexed % 10 == 0:
|
||||
logger.info(
|
||||
f"Committing batch: {documents_indexed} ClickUp tasks processed so far"
|
||||
)
|
||||
await session.commit()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Error processing task {item.get('task_name', 'Unknown')}: {e!s}",
|
||||
exc_info=True,
|
||||
)
|
||||
# Mark document as failed with reason (visible in UI)
|
||||
try:
|
||||
document.status = DocumentStatus.failed(str(e))
|
||||
document.updated_at = get_current_timestamp()
|
||||
except Exception as status_error:
|
||||
logger.error(f"Failed to update document status to failed: {status_error}")
|
||||
documents_failed += 1
|
||||
continue
|
||||
|
||||
total_processed = documents_indexed
|
||||
|
||||
if total_processed > 0:
|
||||
await update_connector_last_indexed(session, connector, update_last_indexed)
|
||||
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
|
||||
# This ensures the UI shows "Last indexed" instead of "Never indexed"
|
||||
await update_connector_last_indexed(session, connector, update_last_indexed)
|
||||
|
||||
# Final commit for any remaining documents not yet committed in batches
|
||||
logger.info(f"Final commit: Total {documents_indexed} ClickUp tasks processed")
|
||||
await session.commit()
|
||||
try:
|
||||
await session.commit()
|
||||
logger.info(
|
||||
"Successfully committed all ClickUp document changes to database"
|
||||
)
|
||||
except Exception as e:
|
||||
# Handle any remaining integrity errors gracefully (race conditions, etc.)
|
||||
if (
|
||||
"duplicate key value violates unique constraint" in str(e).lower()
|
||||
or "uniqueviolationerror" in str(e).lower()
|
||||
):
|
||||
logger.warning(
|
||||
f"Duplicate content_hash detected during final commit. "
|
||||
f"This may occur if the same task was indexed by multiple connectors. "
|
||||
f"Rolling back and continuing. Error: {e!s}"
|
||||
)
|
||||
await session.rollback()
|
||||
# Don't fail the entire task - some documents may have been successfully indexed
|
||||
else:
|
||||
raise
|
||||
|
||||
await task_logger.log_task_success(
|
||||
log_entry,
|
||||
|
|
@ -433,11 +496,12 @@ async def index_clickup_tasks(
|
|||
"pages_processed": total_processed,
|
||||
"documents_indexed": documents_indexed,
|
||||
"documents_skipped": documents_skipped,
|
||||
"documents_failed": documents_failed,
|
||||
},
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"clickup indexing completed: {documents_indexed} new tasks, {documents_skipped} skipped"
|
||||
f"clickup indexing completed: {documents_indexed} ready, {documents_skipped} skipped, {documents_failed} failed"
|
||||
)
|
||||
|
||||
# Close client connection
|
||||
|
|
|
|||
|
|
@ -3,6 +3,10 @@ GitHub connector indexer using gitingest.
|
|||
|
||||
This indexer processes entire repository digests in one pass, dramatically
|
||||
reducing LLM API calls compared to the previous file-by-file approach.
|
||||
|
||||
Implements 2-phase document status updates for real-time UI feedback:
|
||||
- Phase 1: Create all documents with 'pending' status (visible in UI immediately)
|
||||
- Phase 2: Process each document: pending → processing → ready/failed
|
||||
"""
|
||||
|
||||
import time
|
||||
|
|
@ -14,7 +18,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
|
|||
|
||||
from app.config import config
|
||||
from app.connectors.github_connector import GitHubConnector, RepositoryDigest
|
||||
from app.db import Document, DocumentType, SearchSourceConnectorType
|
||||
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
|
||||
from app.services.llm_service import get_user_long_context_llm
|
||||
from app.services.task_logging_service import TaskLoggingService
|
||||
from app.utils.document_converters import (
|
||||
|
|
@ -30,6 +34,8 @@ from .base import (
|
|||
get_connector_by_id,
|
||||
get_current_timestamp,
|
||||
logger,
|
||||
safe_set_chunks,
|
||||
update_connector_last_indexed,
|
||||
)
|
||||
|
||||
# Type hint for heartbeat callback
|
||||
|
|
@ -164,7 +170,7 @@ async def index_github_repos(
|
|||
)
|
||||
return 0, f"Failed to initialize GitHub client: {e!s}"
|
||||
|
||||
# 4. Process each repository with gitingest
|
||||
# 4. Process each repository with gitingest using 2-phase approach
|
||||
await task_logger.log_task_progress(
|
||||
log_entry,
|
||||
f"Starting gitingest processing for {len(repo_full_names_to_index)} repositories",
|
||||
|
|
@ -181,24 +187,25 @@ async def index_github_repos(
|
|||
# Heartbeat tracking - update notification periodically to prevent appearing stuck
|
||||
last_heartbeat_time = time.time()
|
||||
documents_indexed = 0
|
||||
documents_skipped = 0
|
||||
documents_failed = 0
|
||||
|
||||
# =======================================================================
|
||||
# PHASE 1: Analyze all repos and create pending documents
|
||||
# This makes ALL documents visible in the UI immediately with pending status
|
||||
# =======================================================================
|
||||
repos_to_process = [] # List of dicts with document and digest data
|
||||
new_documents_created = False
|
||||
|
||||
for repo_full_name in repo_full_names_to_index:
|
||||
# Check if it's time for a heartbeat update
|
||||
if (
|
||||
on_heartbeat_callback
|
||||
and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
|
||||
):
|
||||
await on_heartbeat_callback(documents_indexed)
|
||||
last_heartbeat_time = time.time()
|
||||
if not repo_full_name or not isinstance(repo_full_name, str):
|
||||
logger.warning(f"Skipping invalid repository entry: {repo_full_name}")
|
||||
continue
|
||||
|
||||
logger.info(f"Ingesting repository: {repo_full_name}")
|
||||
|
||||
try:
|
||||
logger.info(f"Phase 1: Analyzing repository: {repo_full_name}")
|
||||
|
||||
# Run gitingest via subprocess (isolated from event loop)
|
||||
# Using to_thread to not block the async database operations
|
||||
import asyncio
|
||||
|
||||
digest = await asyncio.to_thread(
|
||||
|
|
@ -212,30 +219,248 @@ async def index_github_repos(
|
|||
errors.append(f"No digest for {repo_full_name}")
|
||||
continue
|
||||
|
||||
# Process the digest and create documents
|
||||
docs_created = await _process_repository_digest(
|
||||
session=session,
|
||||
digest=digest,
|
||||
search_space_id=search_space_id,
|
||||
user_id=user_id,
|
||||
task_logger=task_logger,
|
||||
log_entry=log_entry,
|
||||
connector_id=connector_id,
|
||||
# Generate unique identifier based on repo name
|
||||
unique_identifier_hash = generate_unique_identifier_hash(
|
||||
DocumentType.GITHUB_CONNECTOR, repo_full_name, search_space_id
|
||||
)
|
||||
|
||||
documents_processed += docs_created
|
||||
logger.info(
|
||||
f"Created {docs_created} documents from repository: {repo_full_name}"
|
||||
# Generate content hash from digest
|
||||
full_content = digest.full_digest
|
||||
content_hash = generate_content_hash(full_content, search_space_id)
|
||||
|
||||
# Check if document with this unique identifier already exists
|
||||
existing_document = await check_document_by_unique_identifier(
|
||||
session, unique_identifier_hash
|
||||
)
|
||||
|
||||
if existing_document:
|
||||
# Document exists - check if content has changed
|
||||
if existing_document.content_hash == content_hash:
|
||||
# Ensure status is ready (might have been stuck in processing/pending)
|
||||
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
|
||||
existing_document.status = DocumentStatus.ready()
|
||||
logger.info(f"Repository {repo_full_name} unchanged. Skipping.")
|
||||
documents_skipped += 1
|
||||
continue
|
||||
|
||||
# Queue existing document for update (will be set to processing in Phase 2)
|
||||
logger.info(
|
||||
f"Content changed for repository {repo_full_name}. Queuing for update."
|
||||
)
|
||||
repos_to_process.append({
|
||||
'document': existing_document,
|
||||
'is_new': False,
|
||||
'digest': digest,
|
||||
'content_hash': content_hash,
|
||||
'repo_full_name': repo_full_name,
|
||||
'unique_identifier_hash': unique_identifier_hash,
|
||||
})
|
||||
continue
|
||||
|
||||
# Document doesn't exist by unique_identifier_hash
|
||||
# Check if a document with the same content_hash exists (from another connector)
|
||||
with session.no_autoflush:
|
||||
duplicate_by_content = await check_duplicate_document_by_hash(
|
||||
session, content_hash
|
||||
)
|
||||
|
||||
if duplicate_by_content:
|
||||
logger.info(
|
||||
f"Repository {repo_full_name} already indexed by another connector "
|
||||
f"(existing document ID: {duplicate_by_content.id}, "
|
||||
f"type: {duplicate_by_content.document_type}). Skipping."
|
||||
)
|
||||
documents_skipped += 1
|
||||
continue
|
||||
|
||||
# Create new document with PENDING status (visible in UI immediately)
|
||||
document = Document(
|
||||
search_space_id=search_space_id,
|
||||
title=repo_full_name,
|
||||
document_type=DocumentType.GITHUB_CONNECTOR,
|
||||
document_metadata={
|
||||
"repository_full_name": repo_full_name,
|
||||
"url": f"https://github.com/{repo_full_name}",
|
||||
"branch": digest.branch,
|
||||
"ingestion_method": "gitingest",
|
||||
"connector_id": connector_id,
|
||||
},
|
||||
content="Pending...", # Placeholder until processed
|
||||
content_hash=unique_identifier_hash, # Temporary unique value - updated when ready
|
||||
unique_identifier_hash=unique_identifier_hash,
|
||||
embedding=None,
|
||||
chunks=[], # Empty at creation - safe for async
|
||||
status=DocumentStatus.pending(), # Pending until processing starts
|
||||
updated_at=get_current_timestamp(),
|
||||
created_by_id=user_id,
|
||||
connector_id=connector_id,
|
||||
)
|
||||
session.add(document)
|
||||
new_documents_created = True
|
||||
|
||||
repos_to_process.append({
|
||||
'document': document,
|
||||
'is_new': True,
|
||||
'digest': digest,
|
||||
'content_hash': content_hash,
|
||||
'repo_full_name': repo_full_name,
|
||||
'unique_identifier_hash': unique_identifier_hash,
|
||||
})
|
||||
|
||||
except Exception as repo_err:
|
||||
logger.error(
|
||||
f"Failed to process repository {repo_full_name}: {repo_err}"
|
||||
f"Error in Phase 1 for repository {repo_full_name}: {repo_err}",
|
||||
exc_info=True,
|
||||
)
|
||||
errors.append(f"Phase 1 error for {repo_full_name}: {repo_err}")
|
||||
documents_failed += 1
|
||||
|
||||
# Commit all pending documents - they all appear in UI now
|
||||
if new_documents_created:
|
||||
logger.info(f"Phase 1: Committing {len([r for r in repos_to_process if r['is_new']])} pending documents")
|
||||
await session.commit()
|
||||
|
||||
# =======================================================================
|
||||
# PHASE 2: Process each document one by one
|
||||
# Each document transitions: pending → processing → ready/failed
|
||||
# =======================================================================
|
||||
logger.info(f"Phase 2: Processing {len(repos_to_process)} documents")
|
||||
|
||||
for item in repos_to_process:
|
||||
# Send heartbeat periodically
|
||||
if on_heartbeat_callback:
|
||||
current_time = time.time()
|
||||
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
|
||||
await on_heartbeat_callback(documents_indexed)
|
||||
last_heartbeat_time = current_time
|
||||
|
||||
document = item['document']
|
||||
digest = item['digest']
|
||||
repo_full_name = item['repo_full_name']
|
||||
|
||||
try:
|
||||
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
|
||||
document.status = DocumentStatus.processing()
|
||||
await session.commit()
|
||||
|
||||
# Heavy processing (LLM, embeddings, chunks)
|
||||
user_llm = await get_user_long_context_llm(
|
||||
session, user_id, search_space_id
|
||||
)
|
||||
|
||||
document_metadata_for_summary = {
|
||||
"repository": repo_full_name,
|
||||
"document_type": "GitHub Repository",
|
||||
"connector_type": "GitHub",
|
||||
"ingestion_method": "gitingest",
|
||||
"file_tree": digest.tree[:2000] if len(digest.tree) > 2000 else digest.tree,
|
||||
"estimated_tokens": digest.estimated_tokens,
|
||||
}
|
||||
|
||||
if user_llm:
|
||||
# Prepare content for summarization
|
||||
summary_content = digest.full_digest
|
||||
if len(summary_content) > MAX_DIGEST_CHARS:
|
||||
summary_content = (
|
||||
f"# Repository: {repo_full_name}\n\n"
|
||||
f"## File Structure\n\n{digest.tree}\n\n"
|
||||
f"## File Contents (truncated)\n\n{digest.content[: MAX_DIGEST_CHARS - len(digest.tree) - 200]}..."
|
||||
)
|
||||
|
||||
summary_text, summary_embedding = await generate_document_summary(
|
||||
summary_content, user_llm, document_metadata_for_summary
|
||||
)
|
||||
else:
|
||||
# Fallback to simple summary if no LLM configured
|
||||
summary_text = (
|
||||
f"# GitHub Repository: {repo_full_name}\n\n"
|
||||
f"## Summary\n{digest.summary}\n\n"
|
||||
f"## File Structure\n{digest.tree[:3000]}"
|
||||
)
|
||||
summary_embedding = config.embedding_model_instance.embed(summary_text)
|
||||
|
||||
# Chunk the full digest content for granular search
|
||||
try:
|
||||
chunks_data = await create_document_chunks(digest.content)
|
||||
except Exception as chunk_err:
|
||||
logger.error(f"Failed to chunk repository {repo_full_name}: {chunk_err}")
|
||||
chunks_data = await _simple_chunk_content(digest.content)
|
||||
|
||||
# Update document to READY with actual content
|
||||
doc_metadata = {
|
||||
"repository_full_name": repo_full_name,
|
||||
"url": f"https://github.com/{repo_full_name}",
|
||||
"branch": digest.branch,
|
||||
"ingestion_method": "gitingest",
|
||||
"file_tree": digest.tree,
|
||||
"gitingest_summary": digest.summary,
|
||||
"estimated_tokens": digest.estimated_tokens,
|
||||
"connector_id": connector_id,
|
||||
"indexed_at": datetime.now(UTC).isoformat(),
|
||||
}
|
||||
|
||||
document.title = repo_full_name
|
||||
document.content = summary_text
|
||||
document.content_hash = item['content_hash']
|
||||
document.embedding = summary_embedding
|
||||
document.document_metadata = doc_metadata
|
||||
safe_set_chunks(document, chunks_data)
|
||||
document.updated_at = get_current_timestamp()
|
||||
document.status = DocumentStatus.ready()
|
||||
|
||||
documents_processed += 1
|
||||
documents_indexed += 1
|
||||
|
||||
logger.info(
|
||||
f"Created document for repository {repo_full_name} "
|
||||
f"with {len(chunks_data)} chunks"
|
||||
)
|
||||
|
||||
# Batch commit every 5 documents (repositories are large)
|
||||
if documents_indexed % 5 == 0:
|
||||
logger.info(
|
||||
f"Committing batch: {documents_indexed} GitHub repos processed so far"
|
||||
)
|
||||
await session.commit()
|
||||
|
||||
except Exception as repo_err:
|
||||
logger.error(
|
||||
f"Error processing repository {repo_full_name}: {repo_err}",
|
||||
exc_info=True,
|
||||
)
|
||||
# Mark document as failed with reason (visible in UI)
|
||||
try:
|
||||
document.status = DocumentStatus.failed(str(repo_err))
|
||||
document.updated_at = get_current_timestamp()
|
||||
except Exception as status_error:
|
||||
logger.error(f"Failed to update document status to failed: {status_error}")
|
||||
errors.append(f"Failed processing {repo_full_name}: {repo_err}")
|
||||
documents_failed += 1
|
||||
continue
|
||||
|
||||
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
|
||||
await update_connector_last_indexed(session, connector, update_last_indexed)
|
||||
|
||||
# Final commit
|
||||
await session.commit()
|
||||
logger.info(f"Final commit: Total {documents_processed} GitHub repositories processed")
|
||||
try:
|
||||
await session.commit()
|
||||
logger.info(
|
||||
"Successfully committed all GitHub document changes to database"
|
||||
)
|
||||
except Exception as e:
|
||||
if (
|
||||
"duplicate key value violates unique constraint" in str(e).lower()
|
||||
or "uniqueviolationerror" in str(e).lower()
|
||||
):
|
||||
logger.warning(
|
||||
f"Duplicate content_hash detected during final commit. "
|
||||
f"Rolling back and continuing. Error: {e!s}"
|
||||
)
|
||||
await session.rollback()
|
||||
else:
|
||||
raise
|
||||
|
||||
logger.info(
|
||||
f"Finished GitHub indexing for connector {connector_id}. "
|
||||
f"Created {documents_processed} documents."
|
||||
|
|
@ -247,6 +472,8 @@ async def index_github_repos(
|
|||
f"Successfully completed GitHub indexing for connector {connector_id}",
|
||||
{
|
||||
"documents_processed": documents_processed,
|
||||
"documents_skipped": documents_skipped,
|
||||
"documents_failed": documents_failed,
|
||||
"errors_count": len(errors),
|
||||
"repo_count": len(repo_full_names_to_index),
|
||||
"method": "gitingest",
|
||||
|
|
@ -286,163 +513,6 @@ async def index_github_repos(
|
|||
return documents_processed, error_message
|
||||
|
||||
|
||||
async def _process_repository_digest(
|
||||
session: AsyncSession,
|
||||
digest: RepositoryDigest,
|
||||
search_space_id: int,
|
||||
user_id: str,
|
||||
task_logger: TaskLoggingService,
|
||||
log_entry,
|
||||
connector_id: int,
|
||||
) -> int:
|
||||
"""
|
||||
Process a repository digest and create documents.
|
||||
|
||||
For each repository, we create:
|
||||
1. One main document with the repository summary
|
||||
2. Chunks from the full digest content for granular search
|
||||
|
||||
Args:
|
||||
session: Database session
|
||||
digest: The repository digest from gitingest
|
||||
search_space_id: ID of the search space
|
||||
user_id: ID of the user
|
||||
task_logger: Task logging service
|
||||
log_entry: Current log entry
|
||||
|
||||
Returns:
|
||||
Number of documents created
|
||||
"""
|
||||
repo_full_name = digest.repo_full_name
|
||||
documents_created = 0
|
||||
|
||||
# Generate unique identifier based on repo name and content hash
|
||||
# This allows updates when repo content changes
|
||||
full_content = digest.full_digest
|
||||
content_hash = generate_content_hash(full_content, search_space_id)
|
||||
|
||||
# Use repo name as the unique identifier (one document per repo)
|
||||
unique_identifier_hash = generate_unique_identifier_hash(
|
||||
DocumentType.GITHUB_CONNECTOR, repo_full_name, search_space_id
|
||||
)
|
||||
|
||||
# Check if document with this unique identifier already exists
|
||||
existing_document = await check_document_by_unique_identifier(
|
||||
session, unique_identifier_hash
|
||||
)
|
||||
|
||||
if existing_document:
|
||||
# Document exists - check if content has changed
|
||||
if existing_document.content_hash == content_hash:
|
||||
logger.info(f"Repository {repo_full_name} unchanged. Skipping.")
|
||||
return 0
|
||||
else:
|
||||
logger.info(
|
||||
f"Content changed for repository {repo_full_name}. Updating document."
|
||||
)
|
||||
# Delete existing document to replace with new one
|
||||
await session.delete(existing_document)
|
||||
await session.flush()
|
||||
else:
|
||||
# Document doesn't exist by unique_identifier_hash
|
||||
# Check if a document with the same content_hash exists (from another connector)
|
||||
with session.no_autoflush:
|
||||
duplicate_by_content = await check_duplicate_document_by_hash(
|
||||
session, content_hash
|
||||
)
|
||||
|
||||
if duplicate_by_content:
|
||||
logger.info(
|
||||
f"Repository {repo_full_name} already indexed by another connector "
|
||||
f"(existing document ID: {duplicate_by_content.id}, "
|
||||
f"type: {duplicate_by_content.document_type}). Skipping."
|
||||
)
|
||||
return 0
|
||||
|
||||
# Generate summary using LLM (ONE call per repository!)
|
||||
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
|
||||
|
||||
document_metadata = {
|
||||
"repository": repo_full_name,
|
||||
"document_type": "GitHub Repository",
|
||||
"connector_type": "GitHub",
|
||||
"ingestion_method": "gitingest",
|
||||
"file_tree": digest.tree[:2000] if len(digest.tree) > 2000 else digest.tree,
|
||||
"estimated_tokens": digest.estimated_tokens,
|
||||
}
|
||||
|
||||
if user_llm:
|
||||
# Prepare content for summarization
|
||||
# Include tree structure and truncated content if too large
|
||||
summary_content = digest.full_digest
|
||||
if len(summary_content) > MAX_DIGEST_CHARS:
|
||||
# Truncate but keep the tree and beginning of content
|
||||
summary_content = (
|
||||
f"# Repository: {repo_full_name}\n\n"
|
||||
f"## File Structure\n\n{digest.tree}\n\n"
|
||||
f"## File Contents (truncated)\n\n{digest.content[: MAX_DIGEST_CHARS - len(digest.tree) - 200]}..."
|
||||
)
|
||||
|
||||
summary_text, summary_embedding = await generate_document_summary(
|
||||
summary_content, user_llm, document_metadata
|
||||
)
|
||||
else:
|
||||
# Fallback to simple summary if no LLM configured
|
||||
summary_text = (
|
||||
f"# GitHub Repository: {repo_full_name}\n\n"
|
||||
f"## Summary\n{digest.summary}\n\n"
|
||||
f"## File Structure\n{digest.tree[:3000]}"
|
||||
)
|
||||
summary_embedding = config.embedding_model_instance.embed(summary_text)
|
||||
|
||||
# Chunk the full digest content for granular search
|
||||
try:
|
||||
# Use the content (not the summary) for chunking
|
||||
# This preserves file-level granularity in search
|
||||
chunks_data = await create_document_chunks(digest.content)
|
||||
except Exception as chunk_err:
|
||||
logger.error(f"Failed to chunk repository {repo_full_name}: {chunk_err}")
|
||||
# Fall back to a simpler chunking approach
|
||||
chunks_data = await _simple_chunk_content(digest.content)
|
||||
|
||||
# Create the document
|
||||
doc_metadata = {
|
||||
"repository_full_name": repo_full_name,
|
||||
"url": f"https://github.com/{repo_full_name}",
|
||||
"branch": digest.branch,
|
||||
"ingestion_method": "gitingest",
|
||||
"file_tree": digest.tree,
|
||||
"gitingest_summary": digest.summary,
|
||||
"estimated_tokens": digest.estimated_tokens,
|
||||
"indexed_at": datetime.now(UTC).isoformat(),
|
||||
}
|
||||
|
||||
document = Document(
|
||||
title=repo_full_name,
|
||||
document_type=DocumentType.GITHUB_CONNECTOR,
|
||||
document_metadata=doc_metadata,
|
||||
content=summary_text,
|
||||
content_hash=content_hash,
|
||||
unique_identifier_hash=unique_identifier_hash,
|
||||
embedding=summary_embedding,
|
||||
search_space_id=search_space_id,
|
||||
chunks=chunks_data,
|
||||
updated_at=get_current_timestamp(),
|
||||
created_by_id=user_id,
|
||||
connector_id=connector_id,
|
||||
)
|
||||
|
||||
session.add(document)
|
||||
documents_created += 1
|
||||
|
||||
logger.info(
|
||||
f"Created document for repository {repo_full_name} "
|
||||
f"with {len(chunks_data)} chunks"
|
||||
)
|
||||
|
||||
return documents_created
|
||||
|
||||
|
||||
async def _simple_chunk_content(content: str, chunk_size: int = 4000) -> list:
|
||||
"""
|
||||
Simple fallback chunking when the regular chunker fails.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue