feat: Integrate gitingest for GitHub repository ingestion

- Added gitingest as a dependency to streamline the ingestion of GitHub repositories.
- Refactored GitHubConnector to utilize gitingest for efficient repository digest generation, reducing API calls.
- Updated GitHub indexer to process entire repository digests, enhancing performance and simplifying the indexing process.
- Modified GitHub connect form to indicate that the Personal Access Token is optional for public repositories.
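- For reference, a minimal sketch of the gitingest call this refactor builds on. This is illustrative only: `ingest_async` and its `(summary, tree, content)` return tuple follow gitingest's documented API, but the `token` keyword and the `digest_repo` wrapper are assumptions, not code from this commit.

    from gitingest import ingest_async

    async def digest_repo(repo_full_name: str, pat: str | None = None):
        # gitingest accepts a repo URL or local path and returns a
        # (summary, tree, content) tuple; a token is only needed for
        # private repositories, so pat may be None for public ones.
        summary, tree, content = await ingest_async(
            f"https://github.com/{repo_full_name}",
            token=pat,  # assumed keyword - omit for public repos
        )
        return summary, tree, content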
Anish Sarkar 2026-01-20 21:52:32 +05:30
parent 6e331c3b85
commit 49b8a46d10
6 changed files with 545 additions and 539 deletions

@@ -1,5 +1,8 @@
 """
-GitHub connector indexer.
+GitHub connector indexer using gitingest.
+
+This indexer processes entire repository digests in one pass, dramatically
+reducing LLM API calls compared to the previous file-by-file approach.
 """
 
 from datetime import UTC, datetime
@@ -8,7 +11,7 @@ from sqlalchemy.exc import SQLAlchemyError
 from sqlalchemy.ext.asyncio import AsyncSession
 
 from app.config import config
-from app.connectors.github_connector import GitHubConnector
+from app.connectors.github_connector import GitHubConnector, RepositoryDigest
 from app.db import Document, DocumentType, SearchSourceConnectorType
 from app.services.llm_service import get_user_long_context_llm
 from app.services.task_logging_service import TaskLoggingService
@@ -26,43 +29,55 @@ from .base import (
     logger,
 )
 
+# Maximum tokens for a single digest before splitting
+# Most LLMs can handle 128k+ tokens now, but we'll be conservative
+MAX_DIGEST_CHARS = 500_000  # ~125k tokens
+
+
 async def index_github_repos(
     session: AsyncSession,
     connector_id: int,
     search_space_id: int,
     user_id: str,
-    start_date: str | None = None,
-    end_date: str | None = None,
+    start_date: str | None = None,  # Ignored - GitHub indexes full repo snapshots
+    end_date: str | None = None,  # Ignored - GitHub indexes full repo snapshots
     update_last_indexed: bool = True,
 ) -> tuple[int, str | None]:
     """
-    Index code and documentation files from accessible GitHub repositories.
+    Index GitHub repositories using gitingest for efficient processing.
+
+    This function ingests entire repositories as digests, generates a single
+    summary per repository, and chunks the content for vector storage.
+
+    Note: The start_date and end_date parameters are accepted for API compatibility
+    but are IGNORED. GitHub repositories are indexed as complete snapshots since
+    gitingest captures the current state of the entire codebase.
 
     Args:
         session: Database session
         connector_id: ID of the GitHub connector
         search_space_id: ID of the search space to store documents in
        user_id: ID of the user
-        start_date: Start date for filtering (YYYY-MM-DD format) - Note: GitHub indexing processes all files regardless of dates
-        end_date: End date for filtering (YYYY-MM-DD format) - Note: GitHub indexing processes all files regardless of dates
+        start_date: Ignored - kept for API compatibility
+        end_date: Ignored - kept for API compatibility
         update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)
 
     Returns:
         Tuple containing (number of documents indexed, error message or None)
     """
+    # Note: start_date and end_date are intentionally unused
+    _ = start_date, end_date
+
     task_logger = TaskLoggingService(session, search_space_id)
 
     # Log task start
     log_entry = await task_logger.log_task_start(
         task_name="github_repos_indexing",
         source="connector_indexing_task",
-        message=f"Starting GitHub repositories indexing for connector {connector_id}",
+        message=f"Starting GitHub repositories indexing for connector {connector_id} (using gitingest)",
         metadata={
             "connector_id": connector_id,
             "user_id": str(user_id),
-            "start_date": start_date,
-            "end_date": end_date,
+            "method": "gitingest",
         },
     )
@@ -93,19 +108,11 @@ async def index_github_repos(
             f"Connector with ID {connector_id} not found or is not a GitHub connector",
         )
 
-    # 2. Get the GitHub PAT and selected repositories from the connector config
-    github_pat = connector.config.get("GITHUB_PAT")
+    # 2. Get the GitHub PAT (optional) and selected repositories from the connector config
+    # PAT is only required for private repositories - public repos work without it
+    github_pat = connector.config.get("GITHUB_PAT")  # Can be None or empty
     repo_full_names_to_index = connector.config.get("repo_full_names")
 
-    if not github_pat:
-        await task_logger.log_task_failure(
-            log_entry,
-            f"GitHub Personal Access Token (PAT) not found in connector config for connector {connector_id}",
-            "Missing GitHub PAT",
-            {"error_type": "MissingToken"},
-        )
-        return 0, "GitHub Personal Access Token (PAT) not found in connector config"
-
     if not repo_full_names_to_index or not isinstance(
         repo_full_names_to_index, list
     ):
@@ -117,10 +124,16 @@
         )
         return 0, "'repo_full_names' not found or is not a list in connector config"
 
-    # 3. Initialize GitHub connector client
+    # Log whether we're using authentication
+    if github_pat:
+        logger.info("Using GitHub PAT for authentication (private repos supported)")
+    else:
+        logger.info("No GitHub PAT provided - only public repositories can be indexed")
+
+    # 3. Initialize GitHub connector with gitingest backend
     await task_logger.log_task_progress(
         log_entry,
-        f"Initializing GitHub client for connector {connector_id}",
+        f"Initializing gitingest-based GitHub client for connector {connector_id}",
         {
             "stage": "client_initialization",
             "repo_count": len(repo_full_names_to_index),
@@ -138,258 +151,52 @@ async def index_github_repos(
         )
         return 0, f"Failed to initialize GitHub client: {e!s}"
 
-    # 4. Validate selected repositories
+    # 4. Process each repository with gitingest
     await task_logger.log_task_progress(
         log_entry,
-        f"Starting indexing for {len(repo_full_names_to_index)} selected repositories",
+        f"Starting gitingest processing for {len(repo_full_names_to_index)} repositories",
         {
             "stage": "repo_processing",
             "repo_count": len(repo_full_names_to_index),
-            "start_date": start_date,
-            "end_date": end_date,
         },
     )
 
     logger.info(
-        f"Starting indexing for {len(repo_full_names_to_index)} selected repositories."
+        f"Starting gitingest indexing for {len(repo_full_names_to_index)} repositories."
     )
-    if start_date and end_date:
-        logger.info(
-            f"Date range requested: {start_date} to {end_date} (Note: GitHub indexing processes all files regardless of dates)"
-        )
 
-    # 6. Iterate through selected repositories and index files
     for repo_full_name in repo_full_names_to_index:
         if not repo_full_name or not isinstance(repo_full_name, str):
             logger.warning(f"Skipping invalid repository entry: {repo_full_name}")
             continue
 
-        logger.info(f"Processing repository: {repo_full_name}")
+        logger.info(f"Ingesting repository: {repo_full_name}")
 
         try:
-            files_to_index = github_client.get_repository_files(repo_full_name)
-            if not files_to_index:
-                logger.info(
-                    f"No indexable files found in repository: {repo_full_name}"
-                )
-                continue
-
-            logger.info(
-                f"Found {len(files_to_index)} files to process in {repo_full_name}"
-            )
-
-            for file_info in files_to_index:
-                file_path = file_info.get("path")
-                file_url = file_info.get("url")
-                file_sha = file_info.get("sha")
-                file_type = file_info.get("type")  # 'code' or 'doc'
-                full_path_key = f"{repo_full_name}/{file_path}"
-
-                if not file_path or not file_url or not file_sha:
-                    logger.warning(
-                        f"Skipping file with missing info in {repo_full_name}: {file_info}"
-                    )
-                    continue
-
-                # Get file content
-                file_content = github_client.get_file_content(
-                    repo_full_name, file_path
-                )
-                if file_content is None:
-                    logger.warning(
-                        f"Could not retrieve content for {full_path_key}. Skipping."
-                    )
-                    continue  # Skip if content fetch failed
-
-                # Generate unique identifier hash for this GitHub file
-                unique_identifier_hash = generate_unique_identifier_hash(
-                    DocumentType.GITHUB_CONNECTOR, file_sha, search_space_id
-                )
-
-                # Generate content hash
-                content_hash = generate_content_hash(file_content, search_space_id)
-
-                # Check if document with this unique identifier already exists
-                existing_document = await check_document_by_unique_identifier(
-                    session, unique_identifier_hash
-                )
-
-                if existing_document:
-                    # Document exists - check if content has changed
-                    if existing_document.content_hash == content_hash:
-                        logger.info(
-                            f"Document for GitHub file {full_path_key} unchanged. Skipping."
-                        )
-                        continue
-                    else:
-                        # Content has changed - update the existing document
-                        logger.info(
-                            f"Content changed for GitHub file {full_path_key}. Updating document."
-                        )
-
-                        # Generate summary with metadata
-                        user_llm = await get_user_long_context_llm(
-                            session, user_id, search_space_id
-                        )
-                        if user_llm:
-                            file_extension = (
-                                file_path.split(".")[-1]
-                                if "." in file_path
-                                else None
-                            )
-                            document_metadata = {
-                                "file_path": full_path_key,
-                                "repository": repo_full_name,
-                                "file_type": file_extension or "unknown",
-                                "document_type": "GitHub Repository File",
-                                "connector_type": "GitHub",
-                            }
-                            (
-                                summary_content,
-                                summary_embedding,
-                            ) = await generate_document_summary(
-                                file_content, user_llm, document_metadata
-                            )
-                        else:
-                            summary_content = f"GitHub file: {full_path_key}\n\n{file_content[:1000]}..."
-                            summary_embedding = (
-                                config.embedding_model_instance.embed(
-                                    summary_content
-                                )
-                            )
-
-                        # Chunk the content
-                        try:
-                            if hasattr(config, "code_chunker_instance"):
-                                chunks_data = [
-                                    await create_document_chunks(file_content)
-                                ][0]
-                            else:
-                                chunks_data = await create_document_chunks(
-                                    file_content
-                                )
-                        except Exception as chunk_err:
-                            logger.error(
-                                f"Failed to chunk file {full_path_key}: {chunk_err}"
-                            )
-                            continue
-
-                        # Update existing document
-                        existing_document.title = f"GitHub - {full_path_key}"
-                        existing_document.content = summary_content
-                        existing_document.content_hash = content_hash
-                        existing_document.embedding = summary_embedding
-                        existing_document.document_metadata = {
-                            "file_path": file_path,
-                            "file_sha": file_sha,
-                            "file_url": file_url,
-                            "repository": repo_full_name,
-                            "indexed_at": datetime.now(UTC).strftime(
-                                "%Y-%m-%d %H:%M:%S"
-                            ),
-                        }
-                        existing_document.chunks = chunks_data
-                        existing_document.updated_at = get_current_timestamp()
-                        logger.info(
-                            f"Successfully updated GitHub file {full_path_key}"
-                        )
-                        continue
-
-                # Document doesn't exist - create new one
-                # Generate summary with metadata
-                user_llm = await get_user_long_context_llm(
-                    session, user_id, search_space_id
-                )
-                if user_llm:
-                    # Extract file extension from file path
-                    file_extension = (
-                        file_path.split(".")[-1] if "." in file_path else None
-                    )
-                    document_metadata = {
-                        "file_path": full_path_key,
-                        "repository": repo_full_name,
-                        "file_type": file_extension or "unknown",
-                        "document_type": "GitHub Repository File",
-                        "connector_type": "GitHub",
-                    }
-                    (
-                        summary_content,
-                        summary_embedding,
-                    ) = await generate_document_summary(
-                        file_content, user_llm, document_metadata
-                    )
-                else:
-                    # Fallback to simple summary if no LLM configured
-                    summary_content = (
-                        f"GitHub file: {full_path_key}\n\n{file_content[:1000]}..."
-                    )
-                    summary_embedding = config.embedding_model_instance.embed(
-                        summary_content
-                    )
-
-                # Chunk the content
-                try:
-                    chunks_data = [await create_document_chunks(file_content)][0]
-                    # Use code chunker if available, otherwise regular chunker
-                    if hasattr(config, "code_chunker_instance"):
-                        chunks_data = [
-                            {
-                                "content": chunk.text,
-                                "embedding": config.embedding_model_instance.embed(
-                                    chunk.text
-                                ),
-                            }
-                            for chunk in config.code_chunker_instance.chunk(
-                                file_content
-                            )
-                        ]
-                    else:
-                        chunks_data = await create_document_chunks(file_content)
-                except Exception as chunk_err:
-                    logger.error(
-                        f"Failed to chunk file {full_path_key}: {chunk_err}"
-                    )
-                    errors.append(
-                        f"Chunking failed for {full_path_key}: {chunk_err}"
-                    )
-                    continue  # Skip this file if chunking fails
-
-                doc_metadata = {
-                    "repository_full_name": repo_full_name,
-                    "file_path": file_path,
-                    "full_path": full_path_key,  # For easier lookup
-                    "url": file_url,
-                    "sha": file_sha,
-                    "type": file_type,
-                    "indexed_at": datetime.now(UTC).isoformat(),
-                }
-
-                # Create new document
-                logger.info(f"Creating new document for file: {full_path_key}")
-                document = Document(
-                    title=f"GitHub - {file_path}",
-                    document_type=DocumentType.GITHUB_CONNECTOR,
-                    document_metadata=doc_metadata,
-                    content=summary_content,  # Store summary
-                    content_hash=content_hash,
-                    unique_identifier_hash=unique_identifier_hash,
-                    embedding=summary_embedding,
-                    search_space_id=search_space_id,
-                    chunks=chunks_data,  # Associate chunks directly
-                    updated_at=get_current_timestamp(),
-                )
-                session.add(document)
-                documents_processed += 1
-
-                # Batch commit every 10 documents
-                if documents_processed % 10 == 0:
-                    logger.info(
-                        f"Committing batch: {documents_processed} GitHub files processed so far"
-                    )
-                    await session.commit()
+            # Ingest the entire repository
+            digest = await github_client.ingest_repository(repo_full_name)
+
+            if not digest:
+                logger.warning(
+                    f"No digest returned for repository: {repo_full_name}"
+                )
+                errors.append(f"No digest for {repo_full_name}")
+                continue
+
+            # Process the digest and create documents
+            docs_created = await _process_repository_digest(
+                session=session,
+                digest=digest,
+                search_space_id=search_space_id,
+                user_id=user_id,
+                task_logger=task_logger,
+                log_entry=log_entry,
+            )
+
+            documents_processed += docs_created
+            logger.info(
+                f"Created {docs_created} documents from repository: {repo_full_name}"
+            )
 
         except Exception as repo_err:
             logger.error(
@@ -397,11 +204,11 @@ async def index_github_repos(
             )
             errors.append(f"Failed processing {repo_full_name}: {repo_err}")
 
-    # Final commit for any remaining documents not yet committed in batches
-    logger.info(f"Final commit: Total {documents_processed} GitHub files processed")
+    # Final commit
     await session.commit()
     logger.info(
-        f"Finished GitHub indexing for connector {connector_id}. Processed {documents_processed} files."
+        f"Finished GitHub indexing for connector {connector_id}. "
+        f"Created {documents_processed} documents."
     )
 
     # Log success
@@ -412,6 +219,7 @@ async def index_github_repos(
                 "documents_processed": documents_processed,
                 "errors_count": len(errors),
                 "repo_count": len(repo_full_names_to_index),
+                "method": "gitingest",
             },
         )
@@ -428,6 +236,7 @@ async def index_github_repos(
         )
         errors.append(f"Database error: {db_err}")
         return documents_processed, "; ".join(errors) if errors else str(db_err)
+
     except Exception as e:
         await session.rollback()
         await task_logger.log_task_failure(
@@ -445,3 +254,173 @@ async def index_github_repos(
 
     error_message = "; ".join(errors) if errors else None
     return documents_processed, error_message
+
+
+async def _process_repository_digest(
+    session: AsyncSession,
+    digest: RepositoryDigest,
+    search_space_id: int,
+    user_id: str,
+    task_logger: TaskLoggingService,
+    log_entry,
+) -> int:
+    """
+    Process a repository digest and create documents.
+
+    For each repository, we create:
+    1. One main document with the repository summary
+    2. Chunks from the full digest content for granular search
+
+    Args:
+        session: Database session
+        digest: The repository digest from gitingest
+        search_space_id: ID of the search space
+        user_id: ID of the user
+        task_logger: Task logging service
+        log_entry: Current log entry
+
+    Returns:
+        Number of documents created
+    """
+    repo_full_name = digest.repo_full_name
+    documents_created = 0
+
+    # Generate unique identifier based on repo name and content hash
+    # This allows updates when repo content changes
+    full_content = digest.full_digest
+    content_hash = generate_content_hash(full_content, search_space_id)
+
+    # Use repo name as the unique identifier (one document per repo)
+    unique_identifier_hash = generate_unique_identifier_hash(
+        DocumentType.GITHUB_CONNECTOR, repo_full_name, search_space_id
+    )
+
+    # Check if document with this unique identifier already exists
+    existing_document = await check_document_by_unique_identifier(
+        session, unique_identifier_hash
+    )
+
+    if existing_document:
+        # Document exists - check if content has changed
+        if existing_document.content_hash == content_hash:
+            logger.info(
+                f"Repository {repo_full_name} unchanged. Skipping."
+            )
+            return 0
+        else:
+            logger.info(
+                f"Content changed for repository {repo_full_name}. Updating document."
+            )
+            # Delete existing document to replace with new one
+            await session.delete(existing_document)
+            await session.flush()
+
+    # Generate summary using LLM (ONE call per repository!)
+    user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
+
+    document_metadata = {
+        "repository": repo_full_name,
+        "document_type": "GitHub Repository",
+        "connector_type": "GitHub",
+        "ingestion_method": "gitingest",
+        "file_tree": digest.tree[:2000] if len(digest.tree) > 2000 else digest.tree,
+        "estimated_tokens": digest.estimated_tokens,
+    }
+
+    if user_llm:
+        # Prepare content for summarization
+        # Include tree structure and truncated content if too large
+        summary_content = digest.full_digest
+        if len(summary_content) > MAX_DIGEST_CHARS:
+            # Truncate but keep the tree and beginning of content
+            summary_content = (
+                f"# Repository: {repo_full_name}\n\n"
+                f"## File Structure\n\n{digest.tree}\n\n"
+                f"## File Contents (truncated)\n\n{digest.content[:MAX_DIGEST_CHARS - len(digest.tree) - 200]}..."
+            )
+        summary_text, summary_embedding = await generate_document_summary(
+            summary_content, user_llm, document_metadata
+        )
+    else:
+        # Fallback to simple summary if no LLM configured
+        summary_text = (
+            f"# GitHub Repository: {repo_full_name}\n\n"
+            f"## Summary\n{digest.summary}\n\n"
+            f"## File Structure\n{digest.tree[:3000]}"
+        )
+        summary_embedding = config.embedding_model_instance.embed(summary_text)
+
+    # Chunk the full digest content for granular search
+    try:
+        # Use the content (not the summary) for chunking
+        # This preserves file-level granularity in search
+        chunks_data = await create_document_chunks(digest.content)
+    except Exception as chunk_err:
+        logger.error(
+            f"Failed to chunk repository {repo_full_name}: {chunk_err}"
+        )
+        # Fall back to a simpler chunking approach
+        chunks_data = await _simple_chunk_content(digest.content)
+
+    # Create the document
+    doc_metadata = {
+        "repository_full_name": repo_full_name,
+        "url": f"https://github.com/{repo_full_name}",
+        "branch": digest.branch,
+        "ingestion_method": "gitingest",
+        "file_tree": digest.tree,
+        "gitingest_summary": digest.summary,
+        "estimated_tokens": digest.estimated_tokens,
+        "indexed_at": datetime.now(UTC).isoformat(),
+    }
+
+    document = Document(
+        title=f"GitHub Repository: {repo_full_name}",
+        document_type=DocumentType.GITHUB_CONNECTOR,
+        document_metadata=doc_metadata,
+        content=summary_text,
+        content_hash=content_hash,
+        unique_identifier_hash=unique_identifier_hash,
+        embedding=summary_embedding,
+        search_space_id=search_space_id,
+        chunks=chunks_data,
+        updated_at=get_current_timestamp(),
+    )
+
+    session.add(document)
+    documents_created += 1
+
+    logger.info(
+        f"Created document for repository {repo_full_name} "
+        f"with {len(chunks_data)} chunks"
+    )
+
+    return documents_created
+
+
+async def _simple_chunk_content(content: str, chunk_size: int = 4000) -> list:
+    """
+    Simple fallback chunking when the regular chunker fails.
+
+    Args:
+        content: The content to chunk
+        chunk_size: Size of each chunk in characters
+
+    Returns:
+        List of Chunk objects with content and embedding
+    """
+    from app.db import Chunk
+
+    chunks = []
+    for i in range(0, len(content), chunk_size):
+        chunk_text = content[i : i + chunk_size]
+        if chunk_text.strip():
+            chunks.append(
+                Chunk(
+                    content=chunk_text,
+                    embedding=config.embedding_model_instance.embed(chunk_text),
+                )
+            )
+
+    return chunks
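
A hypothetical call site for the refactored indexer (the session and IDs below are placeholders, not values from this change), illustrating that the date arguments are accepted but inert and that the returned count is now one document per repository rather than per file:

    # Sketch only: session is an open AsyncSession; IDs are placeholders.
    docs_indexed, error = await index_github_repos(
        session=session,
        connector_id=42,            # placeholder connector ID
        search_space_id=7,          # placeholder search space ID
        user_id="user-123",         # placeholder user ID
        start_date="2026-01-01",    # accepted but ignored - full snapshot indexing
        end_date="2026-01-20",      # accepted but ignored
    )
    # docs_indexed counts repository-level documents; error is None on success
    # or a "; "-joined string of per-repo failures.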