mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-06-06 20:15:17 +02:00
feat(backend): Use deterministic content in connector ingestion
This commit is contained in:
parent
81fa219b30
commit
f3866b9e7e
24 changed files with 80 additions and 625 deletions
|
|
@ -141,7 +141,6 @@ async def download_and_process_file(
|
||||||
task_logger: TaskLoggingService,
|
task_logger: TaskLoggingService,
|
||||||
log_entry: Log,
|
log_entry: Log,
|
||||||
connector_id: int | None = None,
|
connector_id: int | None = None,
|
||||||
enable_summary: bool = True,
|
|
||||||
) -> tuple[Any, str | None, dict[str, Any] | None]:
|
) -> tuple[Any, str | None, dict[str, Any] | None]:
|
||||||
"""
|
"""
|
||||||
Download Google Drive file and process using Surfsense file processors.
|
Download Google Drive file and process using Surfsense file processors.
|
||||||
|
|
@ -215,8 +214,6 @@ async def download_and_process_file(
|
||||||
"source_connector": "google_drive",
|
"source_connector": "google_drive",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
# Include connector_id for de-indexing support
|
|
||||||
connector_info["enable_summary"] = enable_summary
|
|
||||||
if connector_id is not None:
|
if connector_id is not None:
|
||||||
connector_info["connector_id"] = connector_id
|
connector_info["connector_id"] = connector_id
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,6 @@ from app.utils.document_converters import (
|
||||||
create_document_chunks,
|
create_document_chunks,
|
||||||
embed_text,
|
embed_text,
|
||||||
generate_content_hash,
|
generate_content_hash,
|
||||||
generate_document_summary,
|
|
||||||
generate_unique_identifier_hash,
|
generate_unique_identifier_hash,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -65,29 +64,11 @@ class ConfluenceKBSyncService:
|
||||||
if dup:
|
if dup:
|
||||||
content_hash = unique_hash
|
content_hash = unique_hash
|
||||||
|
|
||||||
from app.services.llm_service import get_user_long_context_llm
|
|
||||||
|
|
||||||
user_llm = await get_user_long_context_llm(
|
|
||||||
self.db_session,
|
|
||||||
user_id,
|
|
||||||
search_space_id,
|
|
||||||
disable_streaming=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
doc_metadata_for_summary = {
|
|
||||||
"page_title": page_title,
|
|
||||||
"space_id": space_id,
|
|
||||||
"document_type": "Confluence Page",
|
|
||||||
"connector_type": "Confluence",
|
|
||||||
}
|
|
||||||
|
|
||||||
if user_llm:
|
summary_content = f"Confluence Page: {page_title}\n\n{page_content}"
|
||||||
summary_content, summary_embedding = await generate_document_summary(
|
summary_embedding = embed_text(summary_content)
|
||||||
page_content, user_llm, doc_metadata_for_summary
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
summary_content = f"Confluence Page: {page_title}\n\n{page_content}"
|
|
||||||
summary_embedding = embed_text(summary_content)
|
|
||||||
|
|
||||||
chunks = await create_document_chunks(page_content)
|
chunks = await create_document_chunks(page_content)
|
||||||
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
|
@ -185,25 +166,10 @@ class ConfluenceKBSyncService:
|
||||||
|
|
||||||
space_id = (document.document_metadata or {}).get("space_id", "")
|
space_id = (document.document_metadata or {}).get("space_id", "")
|
||||||
|
|
||||||
from app.services.llm_service import get_user_long_context_llm
|
|
||||||
|
|
||||||
user_llm = await get_user_long_context_llm(
|
|
||||||
self.db_session, user_id, search_space_id, disable_streaming=True
|
|
||||||
)
|
|
||||||
|
|
||||||
if user_llm:
|
summary_content = f"Confluence Page: {page_title}\n\n{page_content}"
|
||||||
doc_meta = {
|
summary_embedding = embed_text(summary_content)
|
||||||
"page_title": page_title,
|
|
||||||
"space_id": space_id,
|
|
||||||
"document_type": "Confluence Page",
|
|
||||||
"connector_type": "Confluence",
|
|
||||||
}
|
|
||||||
summary_content, summary_embedding = await generate_document_summary(
|
|
||||||
page_content, user_llm, doc_meta
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
summary_content = f"Confluence Page: {page_title}\n\n{page_content}"
|
|
||||||
summary_embedding = embed_text(summary_content)
|
|
||||||
|
|
||||||
chunks = await create_document_chunks(page_content)
|
chunks = await create_document_chunks(page_content)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,6 @@ from app.utils.document_converters import (
|
||||||
create_document_chunks,
|
create_document_chunks,
|
||||||
embed_text,
|
embed_text,
|
||||||
generate_content_hash,
|
generate_content_hash,
|
||||||
generate_document_summary,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
@ -72,29 +71,11 @@ class DropboxKBSyncService:
|
||||||
)
|
)
|
||||||
content_hash = unique_hash
|
content_hash = unique_hash
|
||||||
|
|
||||||
from app.services.llm_service import get_user_long_context_llm
|
|
||||||
|
|
||||||
user_llm = await get_user_long_context_llm(
|
|
||||||
self.db_session,
|
|
||||||
user_id,
|
|
||||||
search_space_id,
|
|
||||||
disable_streaming=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
doc_metadata_for_summary = {
|
|
||||||
"file_name": file_name,
|
|
||||||
"document_type": "Dropbox File",
|
|
||||||
"connector_type": "Dropbox",
|
|
||||||
}
|
|
||||||
|
|
||||||
if user_llm:
|
summary_content = f"Dropbox File: {file_name}\n\n{indexable_content}"
|
||||||
summary_content, summary_embedding = await generate_document_summary(
|
summary_embedding = embed_text(summary_content)
|
||||||
indexable_content, user_llm, doc_metadata_for_summary
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
logger.warning("No LLM configured — using fallback summary")
|
|
||||||
summary_content = f"Dropbox File: {file_name}\n\n{indexable_content}"
|
|
||||||
summary_embedding = embed_text(summary_content)
|
|
||||||
|
|
||||||
chunks = await create_document_chunks(indexable_content)
|
chunks = await create_document_chunks(indexable_content)
|
||||||
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,6 @@ from app.utils.document_converters import (
|
||||||
create_document_chunks,
|
create_document_chunks,
|
||||||
embed_text,
|
embed_text,
|
||||||
generate_content_hash,
|
generate_content_hash,
|
||||||
generate_document_summary,
|
|
||||||
generate_unique_identifier_hash,
|
generate_unique_identifier_hash,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -78,30 +77,11 @@ class GmailKBSyncService:
|
||||||
)
|
)
|
||||||
content_hash = unique_hash
|
content_hash = unique_hash
|
||||||
|
|
||||||
from app.services.llm_service import get_user_long_context_llm
|
|
||||||
|
|
||||||
user_llm = await get_user_long_context_llm(
|
|
||||||
self.db_session,
|
|
||||||
user_id,
|
|
||||||
search_space_id,
|
|
||||||
disable_streaming=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
doc_metadata_for_summary = {
|
|
||||||
"subject": subject,
|
|
||||||
"sender": sender,
|
|
||||||
"document_type": "Gmail Message",
|
|
||||||
"connector_type": "Gmail",
|
|
||||||
}
|
|
||||||
|
|
||||||
if user_llm:
|
summary_content = f"Gmail Message: {subject}\n\n{indexable_content}"
|
||||||
summary_content, summary_embedding = await generate_document_summary(
|
summary_embedding = await asyncio.to_thread(embed_text, summary_content)
|
||||||
indexable_content, user_llm, doc_metadata_for_summary
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
logger.warning("No LLM configured -- using fallback summary")
|
|
||||||
summary_content = f"Gmail Message: {subject}\n\n{indexable_content}"
|
|
||||||
summary_embedding = await asyncio.to_thread(embed_text, summary_content)
|
|
||||||
|
|
||||||
chunks = await create_document_chunks(indexable_content)
|
chunks = await create_document_chunks(indexable_content)
|
||||||
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,6 @@ from app.utils.document_converters import (
|
||||||
create_document_chunks,
|
create_document_chunks,
|
||||||
embed_text,
|
embed_text,
|
||||||
generate_content_hash,
|
generate_content_hash,
|
||||||
generate_document_summary,
|
|
||||||
generate_unique_identifier_hash,
|
generate_unique_identifier_hash,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -90,33 +89,13 @@ class GoogleCalendarKBSyncService:
|
||||||
)
|
)
|
||||||
content_hash = unique_hash
|
content_hash = unique_hash
|
||||||
|
|
||||||
from app.services.llm_service import get_user_long_context_llm
|
|
||||||
|
|
||||||
user_llm = await get_user_long_context_llm(
|
|
||||||
self.db_session,
|
|
||||||
user_id,
|
summary_content = (
|
||||||
search_space_id,
|
f"Google Calendar Event: {event_summary}\n\n{indexable_content}"
|
||||||
disable_streaming=True,
|
|
||||||
)
|
)
|
||||||
|
summary_embedding = await asyncio.to_thread(embed_text, summary_content)
|
||||||
doc_metadata_for_summary = {
|
|
||||||
"event_summary": event_summary,
|
|
||||||
"start_time": start_time,
|
|
||||||
"end_time": end_time,
|
|
||||||
"document_type": "Google Calendar Event",
|
|
||||||
"connector_type": "Google Calendar",
|
|
||||||
}
|
|
||||||
|
|
||||||
if user_llm:
|
|
||||||
summary_content, summary_embedding = await generate_document_summary(
|
|
||||||
indexable_content, user_llm, doc_metadata_for_summary
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
logger.warning("No LLM configured -- using fallback summary")
|
|
||||||
summary_content = (
|
|
||||||
f"Google Calendar Event: {event_summary}\n\n{indexable_content}"
|
|
||||||
)
|
|
||||||
summary_embedding = await asyncio.to_thread(embed_text, summary_content)
|
|
||||||
|
|
||||||
chunks = await create_document_chunks(indexable_content)
|
chunks = await create_document_chunks(indexable_content)
|
||||||
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
|
@ -273,29 +252,13 @@ class GoogleCalendarKBSyncService:
|
||||||
if not indexable_content:
|
if not indexable_content:
|
||||||
return {"status": "error", "message": "Event produced empty content"}
|
return {"status": "error", "message": "Event produced empty content"}
|
||||||
|
|
||||||
from app.services.llm_service import get_user_long_context_llm
|
|
||||||
|
|
||||||
user_llm = await get_user_long_context_llm(
|
|
||||||
self.db_session, user_id, search_space_id, disable_streaming=True
|
|
||||||
|
summary_content = (
|
||||||
|
f"Google Calendar Event: {event_summary}\n\n{indexable_content}"
|
||||||
)
|
)
|
||||||
|
summary_embedding = await asyncio.to_thread(embed_text, summary_content)
|
||||||
doc_metadata_for_summary = {
|
|
||||||
"event_summary": event_summary,
|
|
||||||
"start_time": start_time,
|
|
||||||
"end_time": end_time,
|
|
||||||
"document_type": "Google Calendar Event",
|
|
||||||
"connector_type": "Google Calendar",
|
|
||||||
}
|
|
||||||
|
|
||||||
if user_llm:
|
|
||||||
summary_content, summary_embedding = await generate_document_summary(
|
|
||||||
indexable_content, user_llm, doc_metadata_for_summary
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
summary_content = (
|
|
||||||
f"Google Calendar Event: {event_summary}\n\n{indexable_content}"
|
|
||||||
)
|
|
||||||
summary_embedding = await asyncio.to_thread(embed_text, summary_content)
|
|
||||||
|
|
||||||
chunks = await create_document_chunks(indexable_content)
|
chunks = await create_document_chunks(indexable_content)
|
||||||
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,6 @@ from app.utils.document_converters import (
|
||||||
create_document_chunks,
|
create_document_chunks,
|
||||||
embed_text,
|
embed_text,
|
||||||
generate_content_hash,
|
generate_content_hash,
|
||||||
generate_document_summary,
|
|
||||||
generate_unique_identifier_hash,
|
generate_unique_identifier_hash,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -74,32 +73,13 @@ class GoogleDriveKBSyncService:
|
||||||
)
|
)
|
||||||
content_hash = unique_hash
|
content_hash = unique_hash
|
||||||
|
|
||||||
from app.services.llm_service import get_user_long_context_llm
|
|
||||||
|
|
||||||
user_llm = await get_user_long_context_llm(
|
|
||||||
self.db_session,
|
|
||||||
user_id,
|
summary_content = (
|
||||||
search_space_id,
|
f"Google Drive File: {file_name}\n\n{indexable_content}"
|
||||||
disable_streaming=True,
|
|
||||||
)
|
)
|
||||||
|
summary_embedding = embed_text(summary_content)
|
||||||
doc_metadata_for_summary = {
|
|
||||||
"file_name": file_name,
|
|
||||||
"mime_type": mime_type,
|
|
||||||
"document_type": "Google Drive File",
|
|
||||||
"connector_type": "Google Drive",
|
|
||||||
}
|
|
||||||
|
|
||||||
if user_llm:
|
|
||||||
summary_content, summary_embedding = await generate_document_summary(
|
|
||||||
indexable_content, user_llm, doc_metadata_for_summary
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
logger.warning("No LLM configured — using fallback summary")
|
|
||||||
summary_content = (
|
|
||||||
f"Google Drive File: {file_name}\n\n{indexable_content}"
|
|
||||||
)
|
|
||||||
summary_embedding = embed_text(summary_content)
|
|
||||||
|
|
||||||
chunks = await create_document_chunks(indexable_content)
|
chunks = await create_document_chunks(indexable_content)
|
||||||
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,6 @@ from app.utils.document_converters import (
|
||||||
create_document_chunks,
|
create_document_chunks,
|
||||||
embed_text,
|
embed_text,
|
||||||
generate_content_hash,
|
generate_content_hash,
|
||||||
generate_document_summary,
|
|
||||||
generate_unique_identifier_hash,
|
generate_unique_identifier_hash,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -84,32 +83,13 @@ class LinearKBSyncService:
|
||||||
)
|
)
|
||||||
content_hash = unique_hash
|
content_hash = unique_hash
|
||||||
|
|
||||||
from app.services.llm_service import get_user_long_context_llm
|
|
||||||
|
|
||||||
user_llm = await get_user_long_context_llm(
|
|
||||||
self.db_session,
|
|
||||||
user_id,
|
summary_content = (
|
||||||
search_space_id,
|
f"Linear Issue {issue_identifier}: {issue_title}\n\n{issue_content}"
|
||||||
disable_streaming=True,
|
|
||||||
)
|
)
|
||||||
|
summary_embedding = embed_text(summary_content)
|
||||||
doc_metadata_for_summary = {
|
|
||||||
"issue_id": issue_identifier,
|
|
||||||
"issue_title": issue_title,
|
|
||||||
"document_type": "Linear Issue",
|
|
||||||
"connector_type": "Linear",
|
|
||||||
}
|
|
||||||
|
|
||||||
if user_llm:
|
|
||||||
summary_content, summary_embedding = await generate_document_summary(
|
|
||||||
issue_content, user_llm, doc_metadata_for_summary
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
logger.warning("No LLM configured — using fallback summary")
|
|
||||||
summary_content = (
|
|
||||||
f"Linear Issue {issue_identifier}: {issue_title}\n\n{issue_content}"
|
|
||||||
)
|
|
||||||
summary_embedding = embed_text(summary_content)
|
|
||||||
|
|
||||||
chunks = await create_document_chunks(issue_content)
|
chunks = await create_document_chunks(issue_content)
|
||||||
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
|
@ -227,30 +207,12 @@ class LinearKBSyncService:
|
||||||
comment_count = len(formatted_issue.get("comments", []))
|
comment_count = len(formatted_issue.get("comments", []))
|
||||||
formatted_issue.get("description", "")
|
formatted_issue.get("description", "")
|
||||||
|
|
||||||
from app.services.llm_service import get_user_long_context_llm
|
|
||||||
|
|
||||||
user_llm = await get_user_long_context_llm(
|
|
||||||
self.db_session, user_id, search_space_id, disable_streaming=True
|
summary_content = (
|
||||||
|
f"Linear Issue {issue_identifier}: {issue_title}\n\n{issue_content}"
|
||||||
)
|
)
|
||||||
|
summary_embedding = embed_text(summary_content)
|
||||||
if user_llm:
|
|
||||||
document_metadata_for_summary = {
|
|
||||||
"issue_id": issue_identifier,
|
|
||||||
"issue_title": issue_title,
|
|
||||||
"state": state,
|
|
||||||
"priority": priority,
|
|
||||||
"comment_count": comment_count,
|
|
||||||
"document_type": "Linear Issue",
|
|
||||||
"connector_type": "Linear",
|
|
||||||
}
|
|
||||||
summary_content, summary_embedding = await generate_document_summary(
|
|
||||||
issue_content, user_llm, document_metadata_for_summary
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
summary_content = (
|
|
||||||
f"Linear Issue {issue_identifier}: {issue_title}\n\n{issue_content}"
|
|
||||||
)
|
|
||||||
summary_embedding = embed_text(summary_content)
|
|
||||||
|
|
||||||
chunks = await create_document_chunks(issue_content)
|
chunks = await create_document_chunks(issue_content)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,6 @@ from app.utils.document_converters import (
|
||||||
create_document_chunks,
|
create_document_chunks,
|
||||||
embed_text,
|
embed_text,
|
||||||
generate_content_hash,
|
generate_content_hash,
|
||||||
generate_document_summary,
|
|
||||||
generate_unique_identifier_hash,
|
generate_unique_identifier_hash,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -73,30 +72,11 @@ class NotionKBSyncService:
|
||||||
)
|
)
|
||||||
content_hash = unique_hash
|
content_hash = unique_hash
|
||||||
|
|
||||||
from app.services.llm_service import get_user_long_context_llm
|
|
||||||
|
|
||||||
user_llm = await get_user_long_context_llm(
|
|
||||||
self.db_session,
|
|
||||||
user_id,
|
|
||||||
search_space_id,
|
|
||||||
disable_streaming=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
doc_metadata_for_summary = {
|
|
||||||
"page_title": page_title,
|
|
||||||
"page_id": page_id,
|
|
||||||
"document_type": "Notion Page",
|
|
||||||
"connector_type": "Notion",
|
|
||||||
}
|
|
||||||
|
|
||||||
if user_llm:
|
summary_content = f"Notion Page: {page_title}\n\n{markdown_content}"
|
||||||
summary_content, summary_embedding = await generate_document_summary(
|
summary_embedding = embed_text(summary_content)
|
||||||
markdown_content, user_llm, doc_metadata_for_summary
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
logger.warning("No LLM configured — using fallback summary")
|
|
||||||
summary_content = f"Notion Page: {page_title}\n\n{markdown_content}"
|
|
||||||
summary_embedding = embed_text(summary_content)
|
|
||||||
|
|
||||||
chunks = await create_document_chunks(markdown_content)
|
chunks = await create_document_chunks(markdown_content)
|
||||||
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
|
@ -245,31 +225,11 @@ class NotionKBSyncService:
|
||||||
f"Final content length: {len(full_content)} chars, verified={content_verified}"
|
f"Final content length: {len(full_content)} chars, verified={content_verified}"
|
||||||
)
|
)
|
||||||
|
|
||||||
from app.services.llm_service import get_user_long_context_llm
|
|
||||||
|
|
||||||
logger.debug("Generating summary and embeddings")
|
logger.debug("Generating summary and embeddings")
|
||||||
user_llm = await get_user_long_context_llm(
|
|
||||||
self.db_session,
|
|
||||||
user_id,
|
|
||||||
search_space_id,
|
|
||||||
disable_streaming=True, # disable streaming to avoid leaking into the chat
|
|
||||||
)
|
|
||||||
|
|
||||||
if user_llm:
|
summary_content = f"Notion Page: {document.document_metadata.get('page_title')}\n\n{full_content}"
|
||||||
document_metadata_for_summary = {
|
summary_embedding = embed_text(summary_content)
|
||||||
"page_title": document.document_metadata.get("page_title"),
|
|
||||||
"page_id": document.document_metadata.get("page_id"),
|
|
||||||
"document_type": "Notion Page",
|
|
||||||
"connector_type": "Notion",
|
|
||||||
}
|
|
||||||
summary_content, summary_embedding = await generate_document_summary(
|
|
||||||
full_content, user_llm, document_metadata_for_summary
|
|
||||||
)
|
|
||||||
logger.debug(f"Generated summary length: {len(summary_content)} chars")
|
|
||||||
else:
|
|
||||||
logger.warning("No LLM configured - using fallback summary")
|
|
||||||
summary_content = f"Notion Page: {document.document_metadata.get('page_title')}\n\n{full_content}"
|
|
||||||
summary_embedding = embed_text(summary_content)
|
|
||||||
|
|
||||||
logger.debug("Creating new chunks")
|
logger.debug("Creating new chunks")
|
||||||
chunks = await create_document_chunks(full_content)
|
chunks = await create_document_chunks(full_content)
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,6 @@ from app.utils.document_converters import (
|
||||||
create_document_chunks,
|
create_document_chunks,
|
||||||
embed_text,
|
embed_text,
|
||||||
generate_content_hash,
|
generate_content_hash,
|
||||||
generate_document_summary,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
@ -73,30 +72,11 @@ class OneDriveKBSyncService:
|
||||||
)
|
)
|
||||||
content_hash = unique_hash
|
content_hash = unique_hash
|
||||||
|
|
||||||
from app.services.llm_service import get_user_long_context_llm
|
|
||||||
|
|
||||||
user_llm = await get_user_long_context_llm(
|
|
||||||
self.db_session,
|
|
||||||
user_id,
|
|
||||||
search_space_id,
|
|
||||||
disable_streaming=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
doc_metadata_for_summary = {
|
|
||||||
"file_name": file_name,
|
|
||||||
"mime_type": mime_type,
|
|
||||||
"document_type": "OneDrive File",
|
|
||||||
"connector_type": "OneDrive",
|
|
||||||
}
|
|
||||||
|
|
||||||
if user_llm:
|
summary_content = f"OneDrive File: {file_name}\n\n{indexable_content}"
|
||||||
summary_content, summary_embedding = await generate_document_summary(
|
summary_embedding = await asyncio.to_thread(embed_text, summary_content)
|
||||||
indexable_content, user_llm, doc_metadata_for_summary
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
logger.warning("No LLM configured — using fallback summary")
|
|
||||||
summary_content = f"OneDrive File: {file_name}\n\n{indexable_content}"
|
|
||||||
summary_embedding = await asyncio.to_thread(embed_text, summary_content)
|
|
||||||
|
|
||||||
chunks = await create_document_chunks(indexable_content)
|
chunks = await create_document_chunks(indexable_content)
|
||||||
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
|
|
||||||
|
|
@ -14,13 +14,11 @@ from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
from app.connectors.airtable_history import AirtableHistoryConnector
|
from app.connectors.airtable_history import AirtableHistoryConnector
|
||||||
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
|
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
|
||||||
from app.services.llm_service import get_user_long_context_llm
|
|
||||||
from app.services.task_logging_service import TaskLoggingService
|
from app.services.task_logging_service import TaskLoggingService
|
||||||
from app.utils.document_converters import (
|
from app.utils.document_converters import (
|
||||||
create_document_chunks,
|
create_document_chunks,
|
||||||
embed_text,
|
embed_text,
|
||||||
generate_content_hash,
|
generate_content_hash,
|
||||||
generate_document_summary,
|
|
||||||
generate_unique_identifier_hash,
|
generate_unique_identifier_hash,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -394,29 +392,10 @@ async def index_airtable_records(
|
||||||
document.status = DocumentStatus.processing()
|
document.status = DocumentStatus.processing()
|
||||||
await session.commit()
|
await session.commit()
|
||||||
|
|
||||||
# Heavy processing (LLM, embeddings, chunks)
|
# Heavy processing (embeddings, chunks)
|
||||||
user_llm = await get_user_long_context_llm(
|
|
||||||
session, user_id, search_space_id
|
|
||||||
)
|
|
||||||
|
|
||||||
if user_llm and connector.enable_summary:
|
summary_content = f"Airtable Record: {item['record_id']}\n\n{item['markdown_content']}"
|
||||||
document_metadata_for_summary = {
|
summary_embedding = embed_text(summary_content)
|
||||||
"record_id": item["record_id"],
|
|
||||||
"created_time": item["record"].get("CREATED_TIME()", ""),
|
|
||||||
"document_type": "Airtable Record",
|
|
||||||
"connector_type": "Airtable",
|
|
||||||
}
|
|
||||||
(
|
|
||||||
summary_content,
|
|
||||||
summary_embedding,
|
|
||||||
) = await generate_document_summary(
|
|
||||||
item["markdown_content"],
|
|
||||||
user_llm,
|
|
||||||
document_metadata_for_summary,
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
summary_content = f"Airtable Record: {item['record_id']}\n\n{item['markdown_content']}"
|
|
||||||
summary_embedding = embed_text(summary_content)
|
|
||||||
|
|
||||||
chunks = await create_document_chunks(item["markdown_content"])
|
chunks = await create_document_chunks(item["markdown_content"])
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -15,13 +15,11 @@ from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
from app.connectors.bookstack_connector import BookStackConnector
|
from app.connectors.bookstack_connector import BookStackConnector
|
||||||
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
|
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
|
||||||
from app.services.llm_service import get_user_long_context_llm
|
|
||||||
from app.services.task_logging_service import TaskLoggingService
|
from app.services.task_logging_service import TaskLoggingService
|
||||||
from app.utils.document_converters import (
|
from app.utils.document_converters import (
|
||||||
create_document_chunks,
|
create_document_chunks,
|
||||||
embed_text,
|
embed_text,
|
||||||
generate_content_hash,
|
generate_content_hash,
|
||||||
generate_document_summary,
|
|
||||||
generate_unique_identifier_hash,
|
generate_unique_identifier_hash,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -384,10 +382,7 @@ async def index_bookstack_pages(
|
||||||
document.status = DocumentStatus.processing()
|
document.status = DocumentStatus.processing()
|
||||||
await session.commit()
|
await session.commit()
|
||||||
|
|
||||||
# Heavy processing (LLM, embeddings, chunks)
|
# Heavy processing (embeddings, chunks)
|
||||||
user_llm = await get_user_long_context_llm(
|
|
||||||
session, user_id, search_space_id
|
|
||||||
)
|
|
||||||
|
|
||||||
# Build document metadata
|
# Build document metadata
|
||||||
doc_metadata = {
|
doc_metadata = {
|
||||||
|
|
@ -403,23 +398,8 @@ async def index_bookstack_pages(
|
||||||
"connector_id": connector_id,
|
"connector_id": connector_id,
|
||||||
}
|
}
|
||||||
|
|
||||||
if user_llm and connector.enable_summary:
|
summary_content = f"BookStack Page: {item['page_name']}\n\nBook ID: {item['book_id']}\n\n{item['full_content']}"
|
||||||
summary_metadata = {
|
summary_embedding = embed_text(summary_content)
|
||||||
"page_name": item["page_name"],
|
|
||||||
"page_id": item["page_id"],
|
|
||||||
"book_id": item["book_id"],
|
|
||||||
"document_type": "BookStack Page",
|
|
||||||
"connector_type": "BookStack",
|
|
||||||
}
|
|
||||||
(
|
|
||||||
summary_content,
|
|
||||||
summary_embedding,
|
|
||||||
) = await generate_document_summary(
|
|
||||||
item["full_content"], user_llm, summary_metadata
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
summary_content = f"BookStack Page: {item['page_name']}\n\nBook ID: {item['book_id']}\n\n{item['full_content']}"
|
|
||||||
summary_embedding = embed_text(summary_content)
|
|
||||||
|
|
||||||
# Process chunks - using the full page content
|
# Process chunks - using the full page content
|
||||||
chunks = await create_document_chunks(item["full_content"])
|
chunks = await create_document_chunks(item["full_content"])
|
||||||
|
|
|
||||||
|
|
@ -16,13 +16,11 @@ from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
from app.connectors.clickup_history import ClickUpHistoryConnector
|
from app.connectors.clickup_history import ClickUpHistoryConnector
|
||||||
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
|
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
|
||||||
from app.services.llm_service import get_user_long_context_llm
|
|
||||||
from app.services.task_logging_service import TaskLoggingService
|
from app.services.task_logging_service import TaskLoggingService
|
||||||
from app.utils.document_converters import (
|
from app.utils.document_converters import (
|
||||||
create_document_chunks,
|
create_document_chunks,
|
||||||
embed_text,
|
embed_text,
|
||||||
generate_content_hash,
|
generate_content_hash,
|
||||||
generate_document_summary,
|
|
||||||
generate_unique_identifier_hash,
|
generate_unique_identifier_hash,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -393,32 +391,10 @@ async def index_clickup_tasks(
|
||||||
document.status = DocumentStatus.processing()
|
document.status = DocumentStatus.processing()
|
||||||
await session.commit()
|
await session.commit()
|
||||||
|
|
||||||
# Heavy processing (LLM, embeddings, chunks)
|
# Heavy processing (embeddings, chunks)
|
||||||
user_llm = await get_user_long_context_llm(
|
|
||||||
session, user_id, search_space_id
|
|
||||||
)
|
|
||||||
|
|
||||||
if user_llm and connector.enable_summary:
|
summary_content = item["task_content"]
|
||||||
document_metadata_for_summary = {
|
summary_embedding = embed_text(item["task_content"])
|
||||||
"task_id": item["task_id"],
|
|
||||||
"task_name": item["task_name"],
|
|
||||||
"task_status": item["task_status"],
|
|
||||||
"task_priority": item["task_priority"],
|
|
||||||
"task_list": item["task_list_name"],
|
|
||||||
"task_space": item["task_space_name"],
|
|
||||||
"assignees": len(item["task_assignees"]),
|
|
||||||
"document_type": "ClickUp Task",
|
|
||||||
"connector_type": "ClickUp",
|
|
||||||
}
|
|
||||||
(
|
|
||||||
summary_content,
|
|
||||||
summary_embedding,
|
|
||||||
) = await generate_document_summary(
|
|
||||||
item["task_content"], user_llm, document_metadata_for_summary
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
summary_content = item["task_content"]
|
|
||||||
summary_embedding = embed_text(item["task_content"])
|
|
||||||
|
|
||||||
chunks = await create_document_chunks(item["task_content"])
|
chunks = await create_document_chunks(item["task_content"])
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -14,7 +14,6 @@ from app.indexing_pipeline.indexing_pipeline_service import (
|
||||||
IndexingPipelineService,
|
IndexingPipelineService,
|
||||||
PlaceholderInfo,
|
PlaceholderInfo,
|
||||||
)
|
)
|
||||||
from app.services.llm_service import get_user_long_context_llm
|
|
||||||
from app.services.task_logging_service import TaskLoggingService
|
from app.services.task_logging_service import TaskLoggingService
|
||||||
|
|
||||||
from .base import (
|
from .base import (
|
||||||
|
|
@ -36,7 +35,6 @@ def _build_connector_doc(
|
||||||
connector_id: int,
|
connector_id: int,
|
||||||
search_space_id: int,
|
search_space_id: int,
|
||||||
user_id: str,
|
user_id: str,
|
||||||
enable_summary: bool,
|
|
||||||
) -> ConnectorDocument:
|
) -> ConnectorDocument:
|
||||||
"""Map a raw Confluence page dict to a ConnectorDocument."""
|
"""Map a raw Confluence page dict to a ConnectorDocument."""
|
||||||
page_id = page.get("id", "")
|
page_id = page.get("id", "")
|
||||||
|
|
@ -54,10 +52,6 @@ def _build_connector_doc(
|
||||||
"connector_type": "Confluence",
|
"connector_type": "Confluence",
|
||||||
}
|
}
|
||||||
|
|
||||||
fallback_summary = (
|
|
||||||
f"Confluence Page: {page_title}\n\nSpace ID: {space_id}\n\n{full_content}"
|
|
||||||
)
|
|
||||||
|
|
||||||
return ConnectorDocument(
|
return ConnectorDocument(
|
||||||
title=page_title,
|
title=page_title,
|
||||||
source_markdown=full_content,
|
source_markdown=full_content,
|
||||||
|
|
@ -66,8 +60,6 @@ def _build_connector_doc(
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
created_by_id=user_id,
|
created_by_id=user_id,
|
||||||
should_summarize=enable_summary,
|
|
||||||
fallback_summary=fallback_summary,
|
|
||||||
metadata=metadata,
|
metadata=metadata,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -268,8 +260,7 @@ async def index_confluence_pages(
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
enable_summary=connector.enable_summary,
|
)
|
||||||
)
|
|
||||||
|
|
||||||
with session.no_autoflush:
|
with session.no_autoflush:
|
||||||
duplicate_by_content = await check_duplicate_document_by_hash(
|
duplicate_by_content = await check_duplicate_document_by_hash(
|
||||||
|
|
@ -297,12 +288,8 @@ async def index_confluence_pages(
|
||||||
|
|
||||||
await pipeline.migrate_legacy_docs(connector_docs)
|
await pipeline.migrate_legacy_docs(connector_docs)
|
||||||
|
|
||||||
async def _get_llm(s: AsyncSession):
|
|
||||||
return await get_user_long_context_llm(s, user_id, search_space_id)
|
|
||||||
|
|
||||||
_, documents_indexed, documents_failed = await pipeline.index_batch_parallel(
|
_, documents_indexed, documents_failed = await pipeline.index_batch_parallel(
|
||||||
connector_docs,
|
connector_docs,
|
||||||
_get_llm,
|
|
||||||
max_concurrency=3,
|
max_concurrency=3,
|
||||||
on_heartbeat=on_heartbeat_callback,
|
on_heartbeat=on_heartbeat_callback,
|
||||||
heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS,
|
heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS,
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,6 @@ from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnector
|
||||||
from app.indexing_pipeline.connector_document import ConnectorDocument
|
from app.indexing_pipeline.connector_document import ConnectorDocument
|
||||||
from app.indexing_pipeline.document_hashing import compute_identifier_hash
|
from app.indexing_pipeline.document_hashing import compute_identifier_hash
|
||||||
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
|
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
|
||||||
from app.services.llm_service import get_user_long_context_llm
|
|
||||||
from app.services.page_limit_service import PageLimitService
|
from app.services.page_limit_service import PageLimitService
|
||||||
from app.services.task_logging_service import TaskLoggingService
|
from app.services.task_logging_service import TaskLoggingService
|
||||||
from app.tasks.connector_indexers.base import (
|
from app.tasks.connector_indexers.base import (
|
||||||
|
|
@ -126,7 +125,6 @@ def _build_connector_doc(
|
||||||
connector_id: int,
|
connector_id: int,
|
||||||
search_space_id: int,
|
search_space_id: int,
|
||||||
user_id: str,
|
user_id: str,
|
||||||
enable_summary: bool,
|
|
||||||
) -> ConnectorDocument:
|
) -> ConnectorDocument:
|
||||||
file_id = file.get("id", "")
|
file_id = file.get("id", "")
|
||||||
file_name = file.get("name", "Unknown")
|
file_name = file.get("name", "Unknown")
|
||||||
|
|
@ -138,8 +136,6 @@ def _build_connector_doc(
|
||||||
"connector_type": "Dropbox",
|
"connector_type": "Dropbox",
|
||||||
}
|
}
|
||||||
|
|
||||||
fallback_summary = f"File: {file_name}\n\n{markdown[:4000]}"
|
|
||||||
|
|
||||||
return ConnectorDocument(
|
return ConnectorDocument(
|
||||||
title=file_name,
|
title=file_name,
|
||||||
source_markdown=markdown,
|
source_markdown=markdown,
|
||||||
|
|
@ -148,8 +144,6 @@ def _build_connector_doc(
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
created_by_id=user_id,
|
created_by_id=user_id,
|
||||||
should_summarize=enable_summary,
|
|
||||||
fallback_summary=fallback_summary,
|
|
||||||
metadata=metadata,
|
metadata=metadata,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -161,7 +155,6 @@ async def _download_files_parallel(
|
||||||
connector_id: int,
|
connector_id: int,
|
||||||
search_space_id: int,
|
search_space_id: int,
|
||||||
user_id: str,
|
user_id: str,
|
||||||
enable_summary: bool,
|
|
||||||
max_concurrency: int = 3,
|
max_concurrency: int = 3,
|
||||||
on_heartbeat: HeartbeatCallbackType | None = None,
|
on_heartbeat: HeartbeatCallbackType | None = None,
|
||||||
vision_llm=None,
|
vision_llm=None,
|
||||||
|
|
@ -191,7 +184,6 @@ async def _download_files_parallel(
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
enable_summary=enable_summary,
|
|
||||||
)
|
)
|
||||||
async with hb_lock:
|
async with hb_lock:
|
||||||
completed_count += 1
|
completed_count += 1
|
||||||
|
|
@ -223,7 +215,6 @@ async def _download_and_index(
|
||||||
connector_id: int,
|
connector_id: int,
|
||||||
search_space_id: int,
|
search_space_id: int,
|
||||||
user_id: str,
|
user_id: str,
|
||||||
enable_summary: bool,
|
|
||||||
on_heartbeat: HeartbeatCallbackType | None = None,
|
on_heartbeat: HeartbeatCallbackType | None = None,
|
||||||
vision_llm=None,
|
vision_llm=None,
|
||||||
) -> tuple[int, int]:
|
) -> tuple[int, int]:
|
||||||
|
|
@ -234,7 +225,6 @@ async def _download_and_index(
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
enable_summary=enable_summary,
|
|
||||||
on_heartbeat=on_heartbeat,
|
on_heartbeat=on_heartbeat,
|
||||||
vision_llm=vision_llm,
|
vision_llm=vision_llm,
|
||||||
)
|
)
|
||||||
|
|
@ -243,13 +233,8 @@ async def _download_and_index(
|
||||||
batch_failed = 0
|
batch_failed = 0
|
||||||
if connector_docs:
|
if connector_docs:
|
||||||
pipeline = IndexingPipelineService(session)
|
pipeline = IndexingPipelineService(session)
|
||||||
|
|
||||||
async def _get_llm(s):
|
|
||||||
return await get_user_long_context_llm(s, user_id, search_space_id)
|
|
||||||
|
|
||||||
_, batch_indexed, batch_failed = await pipeline.index_batch_parallel(
|
_, batch_indexed, batch_failed = await pipeline.index_batch_parallel(
|
||||||
connector_docs,
|
connector_docs,
|
||||||
_get_llm,
|
|
||||||
max_concurrency=3,
|
max_concurrency=3,
|
||||||
on_heartbeat=on_heartbeat,
|
on_heartbeat=on_heartbeat,
|
||||||
)
|
)
|
||||||
|
|
@ -289,7 +274,6 @@ async def _index_with_delta_sync(
|
||||||
log_entry: object,
|
log_entry: object,
|
||||||
max_files: int,
|
max_files: int,
|
||||||
on_heartbeat_callback: HeartbeatCallbackType | None = None,
|
on_heartbeat_callback: HeartbeatCallbackType | None = None,
|
||||||
enable_summary: bool = True,
|
|
||||||
vision_llm=None,
|
vision_llm=None,
|
||||||
) -> tuple[int, int, int, str]:
|
) -> tuple[int, int, int, str]:
|
||||||
"""Delta sync using Dropbox cursor-based change tracking.
|
"""Delta sync using Dropbox cursor-based change tracking.
|
||||||
|
|
@ -361,7 +345,6 @@ async def _index_with_delta_sync(
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
enable_summary=enable_summary,
|
|
||||||
on_heartbeat=on_heartbeat_callback,
|
on_heartbeat=on_heartbeat_callback,
|
||||||
vision_llm=vision_llm,
|
vision_llm=vision_llm,
|
||||||
)
|
)
|
||||||
|
|
@ -388,7 +371,6 @@ async def _index_full_scan(
|
||||||
include_subfolders: bool = True,
|
include_subfolders: bool = True,
|
||||||
incremental_sync: bool = True,
|
incremental_sync: bool = True,
|
||||||
on_heartbeat_callback: HeartbeatCallbackType | None = None,
|
on_heartbeat_callback: HeartbeatCallbackType | None = None,
|
||||||
enable_summary: bool = True,
|
|
||||||
vision_llm=None,
|
vision_llm=None,
|
||||||
) -> tuple[int, int, int]:
|
) -> tuple[int, int, int]:
|
||||||
"""Full scan indexing of a folder.
|
"""Full scan indexing of a folder.
|
||||||
|
|
@ -473,7 +455,6 @@ async def _index_full_scan(
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
enable_summary=enable_summary,
|
|
||||||
on_heartbeat=on_heartbeat_callback,
|
on_heartbeat=on_heartbeat_callback,
|
||||||
vision_llm=vision_llm,
|
vision_llm=vision_llm,
|
||||||
)
|
)
|
||||||
|
|
@ -502,7 +483,6 @@ async def _index_selected_files(
|
||||||
connector_id: int,
|
connector_id: int,
|
||||||
search_space_id: int,
|
search_space_id: int,
|
||||||
user_id: str,
|
user_id: str,
|
||||||
enable_summary: bool,
|
|
||||||
incremental_sync: bool = True,
|
incremental_sync: bool = True,
|
||||||
on_heartbeat: HeartbeatCallbackType | None = None,
|
on_heartbeat: HeartbeatCallbackType | None = None,
|
||||||
vision_llm=None,
|
vision_llm=None,
|
||||||
|
|
@ -563,7 +543,6 @@ async def _index_selected_files(
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
enable_summary=enable_summary,
|
|
||||||
on_heartbeat=on_heartbeat,
|
on_heartbeat=on_heartbeat,
|
||||||
vision_llm=vision_llm,
|
vision_llm=vision_llm,
|
||||||
)
|
)
|
||||||
|
|
@ -629,7 +608,6 @@ async def index_dropbox_files(
|
||||||
)
|
)
|
||||||
return 0, 0, error_msg, 0
|
return 0, 0, error_msg, 0
|
||||||
|
|
||||||
connector_enable_summary = getattr(connector, "enable_summary", True)
|
|
||||||
connector_enable_vision_llm = getattr(connector, "enable_vision_llm", False)
|
connector_enable_vision_llm = getattr(connector, "enable_vision_llm", False)
|
||||||
vision_llm = None
|
vision_llm = None
|
||||||
if connector_enable_vision_llm:
|
if connector_enable_vision_llm:
|
||||||
|
|
@ -664,7 +642,6 @@ async def index_dropbox_files(
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
enable_summary=connector_enable_summary,
|
|
||||||
incremental_sync=incremental_sync,
|
incremental_sync=incremental_sync,
|
||||||
vision_llm=vision_llm,
|
vision_llm=vision_llm,
|
||||||
)
|
)
|
||||||
|
|
@ -700,7 +677,6 @@ async def index_dropbox_files(
|
||||||
task_logger,
|
task_logger,
|
||||||
log_entry,
|
log_entry,
|
||||||
max_files,
|
max_files,
|
||||||
enable_summary=connector_enable_summary,
|
|
||||||
vision_llm=vision_llm,
|
vision_llm=vision_llm,
|
||||||
)
|
)
|
||||||
folder_cursors[folder_path] = new_cursor
|
folder_cursors[folder_path] = new_cursor
|
||||||
|
|
@ -720,7 +696,6 @@ async def index_dropbox_files(
|
||||||
max_files,
|
max_files,
|
||||||
include_subfolders,
|
include_subfolders,
|
||||||
incremental_sync=incremental_sync,
|
incremental_sync=incremental_sync,
|
||||||
enable_summary=connector_enable_summary,
|
|
||||||
vision_llm=vision_llm,
|
vision_llm=vision_llm,
|
||||||
)
|
)
|
||||||
total_unsupported += unsup
|
total_unsupported += unsup
|
||||||
|
|
|
||||||
|
|
@ -18,13 +18,11 @@ from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
from app.connectors.github_connector import GitHubConnector
|
from app.connectors.github_connector import GitHubConnector
|
||||||
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
|
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
|
||||||
from app.services.llm_service import get_user_long_context_llm
|
|
||||||
from app.services.task_logging_service import TaskLoggingService
|
from app.services.task_logging_service import TaskLoggingService
|
||||||
from app.utils.document_converters import (
|
from app.utils.document_converters import (
|
||||||
create_document_chunks,
|
create_document_chunks,
|
||||||
embed_text,
|
embed_text,
|
||||||
generate_content_hash,
|
generate_content_hash,
|
||||||
generate_document_summary,
|
|
||||||
generate_unique_identifier_hash,
|
generate_unique_identifier_hash,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -351,42 +349,14 @@ async def index_github_repos(
|
||||||
document.status = DocumentStatus.processing()
|
document.status = DocumentStatus.processing()
|
||||||
await session.commit()
|
await session.commit()
|
||||||
|
|
||||||
# Heavy processing (LLM, embeddings, chunks)
|
# Heavy processing (embeddings, chunks)
|
||||||
user_llm = await get_user_long_context_llm(
|
|
||||||
session, user_id, search_space_id
|
summary_text = (
|
||||||
|
f"# GitHub Repository: {repo_full_name}\n\n"
|
||||||
|
f"## Summary\n{digest.summary}\n\n"
|
||||||
|
f"## File Structure\n{digest.tree}"
|
||||||
)
|
)
|
||||||
|
summary_embedding = embed_text(summary_text)
|
||||||
document_metadata_for_summary = {
|
|
||||||
"repository": repo_full_name,
|
|
||||||
"document_type": "GitHub Repository",
|
|
||||||
"connector_type": "GitHub",
|
|
||||||
"ingestion_method": "gitingest",
|
|
||||||
"file_tree": digest.tree[:2000]
|
|
||||||
if len(digest.tree) > 2000
|
|
||||||
else digest.tree,
|
|
||||||
"estimated_tokens": digest.estimated_tokens,
|
|
||||||
}
|
|
||||||
|
|
||||||
if user_llm and connector.enable_summary:
|
|
||||||
# Prepare content for summarization
|
|
||||||
summary_content = digest.full_digest
|
|
||||||
if len(summary_content) > MAX_DIGEST_CHARS:
|
|
||||||
summary_content = (
|
|
||||||
f"# Repository: {repo_full_name}\n\n"
|
|
||||||
f"## File Structure\n\n{digest.tree}\n\n"
|
|
||||||
f"## File Contents (truncated)\n\n{digest.content[: MAX_DIGEST_CHARS - len(digest.tree) - 200]}..."
|
|
||||||
)
|
|
||||||
|
|
||||||
summary_text, summary_embedding = await generate_document_summary(
|
|
||||||
summary_content, user_llm, document_metadata_for_summary
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
summary_text = (
|
|
||||||
f"# GitHub Repository: {repo_full_name}\n\n"
|
|
||||||
f"## Summary\n{digest.summary}\n\n"
|
|
||||||
f"## File Structure\n{digest.tree}"
|
|
||||||
)
|
|
||||||
summary_embedding = embed_text(summary_text)
|
|
||||||
|
|
||||||
# Chunk the full digest content for granular search
|
# Chunk the full digest content for granular search
|
||||||
try:
|
try:
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,6 @@ from app.indexing_pipeline.indexing_pipeline_service import (
|
||||||
PlaceholderInfo,
|
PlaceholderInfo,
|
||||||
)
|
)
|
||||||
from app.services.composio_service import ComposioService
|
from app.services.composio_service import ComposioService
|
||||||
from app.services.llm_service import get_user_long_context_llm
|
|
||||||
from app.services.task_logging_service import TaskLoggingService
|
from app.services.task_logging_service import TaskLoggingService
|
||||||
from app.utils.google_credentials import COMPOSIO_GOOGLE_CONNECTOR_TYPES
|
from app.utils.google_credentials import COMPOSIO_GOOGLE_CONNECTOR_TYPES
|
||||||
|
|
||||||
|
|
@ -53,7 +52,6 @@ def _build_connector_doc(
|
||||||
connector_id: int,
|
connector_id: int,
|
||||||
search_space_id: int,
|
search_space_id: int,
|
||||||
user_id: str,
|
user_id: str,
|
||||||
enable_summary: bool,
|
|
||||||
) -> ConnectorDocument:
|
) -> ConnectorDocument:
|
||||||
"""Map a raw Google Calendar API event dict to a ConnectorDocument."""
|
"""Map a raw Google Calendar API event dict to a ConnectorDocument."""
|
||||||
event_id = event.get("id", "")
|
event_id = event.get("id", "")
|
||||||
|
|
@ -78,8 +76,6 @@ def _build_connector_doc(
|
||||||
"connector_type": "Google Calendar",
|
"connector_type": "Google Calendar",
|
||||||
}
|
}
|
||||||
|
|
||||||
fallback_summary = f"Google Calendar Event: {event_summary}\n\n{event_markdown}"
|
|
||||||
|
|
||||||
return ConnectorDocument(
|
return ConnectorDocument(
|
||||||
title=event_summary,
|
title=event_summary,
|
||||||
source_markdown=event_markdown,
|
source_markdown=event_markdown,
|
||||||
|
|
@ -88,8 +84,6 @@ def _build_connector_doc(
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
created_by_id=user_id,
|
created_by_id=user_id,
|
||||||
should_summarize=enable_summary,
|
|
||||||
fallback_summary=fallback_summary,
|
|
||||||
metadata=metadata,
|
metadata=metadata,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -420,8 +414,7 @@ async def index_google_calendar_events(
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
enable_summary=connector.enable_summary,
|
)
|
||||||
)
|
|
||||||
|
|
||||||
with session.no_autoflush:
|
with session.no_autoflush:
|
||||||
duplicate = await check_duplicate_document_by_hash(
|
duplicate = await check_duplicate_document_by_hash(
|
||||||
|
|
@ -448,13 +441,8 @@ async def index_google_calendar_events(
|
||||||
|
|
||||||
# ── Pipeline: migrate legacy docs + parallel index ─────────────
|
# ── Pipeline: migrate legacy docs + parallel index ─────────────
|
||||||
await pipeline.migrate_legacy_docs(connector_docs)
|
await pipeline.migrate_legacy_docs(connector_docs)
|
||||||
|
|
||||||
async def _get_llm(s):
|
|
||||||
return await get_user_long_context_llm(s, user_id, search_space_id)
|
|
||||||
|
|
||||||
_, documents_indexed, documents_failed = await pipeline.index_batch_parallel(
|
_, documents_indexed, documents_failed = await pipeline.index_batch_parallel(
|
||||||
connector_docs,
|
connector_docs,
|
||||||
_get_llm,
|
|
||||||
max_concurrency=3,
|
max_concurrency=3,
|
||||||
on_heartbeat=on_heartbeat_callback,
|
on_heartbeat=on_heartbeat_callback,
|
||||||
heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS,
|
heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS,
|
||||||
|
|
|
||||||
|
|
@ -40,7 +40,6 @@ from app.indexing_pipeline.indexing_pipeline_service import (
|
||||||
PlaceholderInfo,
|
PlaceholderInfo,
|
||||||
)
|
)
|
||||||
from app.services.composio_service import ComposioService
|
from app.services.composio_service import ComposioService
|
||||||
from app.services.llm_service import get_user_long_context_llm
|
|
||||||
from app.services.page_limit_service import PageLimitService
|
from app.services.page_limit_service import PageLimitService
|
||||||
from app.services.task_logging_service import TaskLoggingService
|
from app.services.task_logging_service import TaskLoggingService
|
||||||
from app.tasks.connector_indexers.base import (
|
from app.tasks.connector_indexers.base import (
|
||||||
|
|
@ -381,7 +380,6 @@ def _build_connector_doc(
|
||||||
connector_id: int,
|
connector_id: int,
|
||||||
search_space_id: int,
|
search_space_id: int,
|
||||||
user_id: str,
|
user_id: str,
|
||||||
enable_summary: bool,
|
|
||||||
) -> ConnectorDocument:
|
) -> ConnectorDocument:
|
||||||
"""Build a ConnectorDocument from Drive file metadata + extracted markdown."""
|
"""Build a ConnectorDocument from Drive file metadata + extracted markdown."""
|
||||||
file_id = file.get("id", "")
|
file_id = file.get("id", "")
|
||||||
|
|
@ -394,8 +392,6 @@ def _build_connector_doc(
|
||||||
"connector_type": "Google Drive",
|
"connector_type": "Google Drive",
|
||||||
}
|
}
|
||||||
|
|
||||||
fallback_summary = f"File: {file_name}\n\n{markdown[:4000]}"
|
|
||||||
|
|
||||||
return ConnectorDocument(
|
return ConnectorDocument(
|
||||||
title=file_name,
|
title=file_name,
|
||||||
source_markdown=markdown,
|
source_markdown=markdown,
|
||||||
|
|
@ -404,8 +400,6 @@ def _build_connector_doc(
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
created_by_id=user_id,
|
created_by_id=user_id,
|
||||||
should_summarize=enable_summary,
|
|
||||||
fallback_summary=fallback_summary,
|
|
||||||
metadata=metadata,
|
metadata=metadata,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -461,7 +455,6 @@ async def _download_files_parallel(
|
||||||
connector_id: int,
|
connector_id: int,
|
||||||
search_space_id: int,
|
search_space_id: int,
|
||||||
user_id: str,
|
user_id: str,
|
||||||
enable_summary: bool,
|
|
||||||
max_concurrency: int = 3,
|
max_concurrency: int = 3,
|
||||||
on_heartbeat: HeartbeatCallbackType | None = None,
|
on_heartbeat: HeartbeatCallbackType | None = None,
|
||||||
vision_llm=None,
|
vision_llm=None,
|
||||||
|
|
@ -494,7 +487,6 @@ async def _download_files_parallel(
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
enable_summary=enable_summary,
|
|
||||||
)
|
)
|
||||||
async with hb_lock:
|
async with hb_lock:
|
||||||
completed_count += 1
|
completed_count += 1
|
||||||
|
|
@ -525,7 +517,6 @@ async def _process_single_file(
|
||||||
connector_id: int,
|
connector_id: int,
|
||||||
search_space_id: int,
|
search_space_id: int,
|
||||||
user_id: str,
|
user_id: str,
|
||||||
enable_summary: bool = True,
|
|
||||||
vision_llm=None,
|
vision_llm=None,
|
||||||
) -> tuple[int, int, int]:
|
) -> tuple[int, int, int]:
|
||||||
"""Download, extract, and index a single Drive file via the pipeline.
|
"""Download, extract, and index a single Drive file via the pipeline.
|
||||||
|
|
@ -561,8 +552,7 @@ async def _process_single_file(
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
enable_summary=enable_summary,
|
)
|
||||||
)
|
|
||||||
|
|
||||||
pipeline = IndexingPipelineService(session)
|
pipeline = IndexingPipelineService(session)
|
||||||
documents = await pipeline.prepare_for_indexing([doc])
|
documents = await pipeline.prepare_for_indexing([doc])
|
||||||
|
|
@ -578,10 +568,7 @@ async def _process_single_file(
|
||||||
connector_doc = doc_map.get(document.unique_identifier_hash)
|
connector_doc = doc_map.get(document.unique_identifier_hash)
|
||||||
if not connector_doc:
|
if not connector_doc:
|
||||||
continue
|
continue
|
||||||
user_llm = await get_user_long_context_llm(
|
await pipeline.index(document, connector_doc)
|
||||||
session, user_id, search_space_id
|
|
||||||
)
|
|
||||||
await pipeline.index(document, connector_doc, user_llm)
|
|
||||||
|
|
||||||
await page_limit_service.update_page_usage(
|
await page_limit_service.update_page_usage(
|
||||||
user_id, estimated_pages, allow_exceed=True
|
user_id, estimated_pages, allow_exceed=True
|
||||||
|
|
@ -636,7 +623,6 @@ async def _download_and_index(
|
||||||
connector_id: int,
|
connector_id: int,
|
||||||
search_space_id: int,
|
search_space_id: int,
|
||||||
user_id: str,
|
user_id: str,
|
||||||
enable_summary: bool,
|
|
||||||
on_heartbeat: HeartbeatCallbackType | None = None,
|
on_heartbeat: HeartbeatCallbackType | None = None,
|
||||||
vision_llm=None,
|
vision_llm=None,
|
||||||
) -> tuple[int, int]:
|
) -> tuple[int, int]:
|
||||||
|
|
@ -650,7 +636,6 @@ async def _download_and_index(
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
enable_summary=enable_summary,
|
|
||||||
on_heartbeat=on_heartbeat,
|
on_heartbeat=on_heartbeat,
|
||||||
vision_llm=vision_llm,
|
vision_llm=vision_llm,
|
||||||
)
|
)
|
||||||
|
|
@ -659,13 +644,8 @@ async def _download_and_index(
|
||||||
batch_failed = 0
|
batch_failed = 0
|
||||||
if connector_docs:
|
if connector_docs:
|
||||||
pipeline = IndexingPipelineService(session)
|
pipeline = IndexingPipelineService(session)
|
||||||
|
|
||||||
async def _get_llm(s):
|
|
||||||
return await get_user_long_context_llm(s, user_id, search_space_id)
|
|
||||||
|
|
||||||
_, batch_indexed, batch_failed = await pipeline.index_batch_parallel(
|
_, batch_indexed, batch_failed = await pipeline.index_batch_parallel(
|
||||||
connector_docs,
|
connector_docs,
|
||||||
_get_llm,
|
|
||||||
max_concurrency=3,
|
max_concurrency=3,
|
||||||
on_heartbeat=on_heartbeat,
|
on_heartbeat=on_heartbeat,
|
||||||
)
|
)
|
||||||
|
|
@ -681,7 +661,6 @@ async def _index_selected_files(
|
||||||
connector_id: int,
|
connector_id: int,
|
||||||
search_space_id: int,
|
search_space_id: int,
|
||||||
user_id: str,
|
user_id: str,
|
||||||
enable_summary: bool,
|
|
||||||
on_heartbeat: HeartbeatCallbackType | None = None,
|
on_heartbeat: HeartbeatCallbackType | None = None,
|
||||||
vision_llm=None,
|
vision_llm=None,
|
||||||
) -> tuple[int, int, int, list[str]]:
|
) -> tuple[int, int, int, list[str]]:
|
||||||
|
|
@ -746,7 +725,6 @@ async def _index_selected_files(
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
enable_summary=enable_summary,
|
|
||||||
on_heartbeat=on_heartbeat,
|
on_heartbeat=on_heartbeat,
|
||||||
vision_llm=vision_llm,
|
vision_llm=vision_llm,
|
||||||
)
|
)
|
||||||
|
|
@ -781,7 +759,6 @@ async def _index_full_scan(
|
||||||
max_files: int,
|
max_files: int,
|
||||||
include_subfolders: bool = False,
|
include_subfolders: bool = False,
|
||||||
on_heartbeat_callback: HeartbeatCallbackType | None = None,
|
on_heartbeat_callback: HeartbeatCallbackType | None = None,
|
||||||
enable_summary: bool = True,
|
|
||||||
vision_llm=None,
|
vision_llm=None,
|
||||||
) -> tuple[int, int, int]:
|
) -> tuple[int, int, int]:
|
||||||
"""Full scan indexing of a folder.
|
"""Full scan indexing of a folder.
|
||||||
|
|
@ -911,7 +888,6 @@ async def _index_full_scan(
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
enable_summary=enable_summary,
|
|
||||||
on_heartbeat=on_heartbeat_callback,
|
on_heartbeat=on_heartbeat_callback,
|
||||||
vision_llm=vision_llm,
|
vision_llm=vision_llm,
|
||||||
)
|
)
|
||||||
|
|
@ -946,7 +922,6 @@ async def _index_with_delta_sync(
|
||||||
max_files: int,
|
max_files: int,
|
||||||
include_subfolders: bool = False,
|
include_subfolders: bool = False,
|
||||||
on_heartbeat_callback: HeartbeatCallbackType | None = None,
|
on_heartbeat_callback: HeartbeatCallbackType | None = None,
|
||||||
enable_summary: bool = True,
|
|
||||||
vision_llm=None,
|
vision_llm=None,
|
||||||
) -> tuple[int, int, int]:
|
) -> tuple[int, int, int]:
|
||||||
"""Delta sync using change tracking.
|
"""Delta sync using change tracking.
|
||||||
|
|
@ -1054,7 +1029,6 @@ async def _index_with_delta_sync(
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
enable_summary=enable_summary,
|
|
||||||
on_heartbeat=on_heartbeat_callback,
|
on_heartbeat=on_heartbeat_callback,
|
||||||
vision_llm=vision_llm,
|
vision_llm=vision_llm,
|
||||||
)
|
)
|
||||||
|
|
@ -1142,7 +1116,6 @@ async def index_google_drive_files(
|
||||||
)
|
)
|
||||||
return 0, 0, client_error, 0
|
return 0, 0, client_error, 0
|
||||||
|
|
||||||
connector_enable_summary = getattr(connector, "enable_summary", True)
|
|
||||||
connector_enable_vision_llm = getattr(connector, "enable_vision_llm", False)
|
connector_enable_vision_llm = getattr(connector, "enable_vision_llm", False)
|
||||||
vision_llm = None
|
vision_llm = None
|
||||||
if connector_enable_vision_llm:
|
if connector_enable_vision_llm:
|
||||||
|
|
@ -1189,7 +1162,6 @@ async def index_google_drive_files(
|
||||||
max_files,
|
max_files,
|
||||||
include_subfolders,
|
include_subfolders,
|
||||||
on_heartbeat_callback,
|
on_heartbeat_callback,
|
||||||
connector_enable_summary,
|
|
||||||
vision_llm=vision_llm,
|
vision_llm=vision_llm,
|
||||||
)
|
)
|
||||||
documents_unsupported += du
|
documents_unsupported += du
|
||||||
|
|
@ -1208,7 +1180,6 @@ async def index_google_drive_files(
|
||||||
max_files,
|
max_files,
|
||||||
include_subfolders,
|
include_subfolders,
|
||||||
on_heartbeat_callback,
|
on_heartbeat_callback,
|
||||||
connector_enable_summary,
|
|
||||||
vision_llm=vision_llm,
|
vision_llm=vision_llm,
|
||||||
)
|
)
|
||||||
documents_indexed += ri
|
documents_indexed += ri
|
||||||
|
|
@ -1234,7 +1205,6 @@ async def index_google_drive_files(
|
||||||
max_files,
|
max_files,
|
||||||
include_subfolders,
|
include_subfolders,
|
||||||
on_heartbeat_callback,
|
on_heartbeat_callback,
|
||||||
connector_enable_summary,
|
|
||||||
vision_llm=vision_llm,
|
vision_llm=vision_llm,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -1346,7 +1316,6 @@ async def index_google_drive_single_file(
|
||||||
)
|
)
|
||||||
return 0, client_error
|
return 0, client_error
|
||||||
|
|
||||||
connector_enable_summary = getattr(connector, "enable_summary", True)
|
|
||||||
connector_enable_vision_llm = getattr(connector, "enable_vision_llm", False)
|
connector_enable_vision_llm = getattr(connector, "enable_vision_llm", False)
|
||||||
vision_llm = None
|
vision_llm = None
|
||||||
if connector_enable_vision_llm:
|
if connector_enable_vision_llm:
|
||||||
|
|
@ -1370,7 +1339,6 @@ async def index_google_drive_single_file(
|
||||||
connector_id,
|
connector_id,
|
||||||
search_space_id,
|
search_space_id,
|
||||||
user_id,
|
user_id,
|
||||||
connector_enable_summary,
|
|
||||||
vision_llm=vision_llm,
|
vision_llm=vision_llm,
|
||||||
)
|
)
|
||||||
await session.commit()
|
await session.commit()
|
||||||
|
|
@ -1467,7 +1435,6 @@ async def index_google_drive_selected_files(
|
||||||
)
|
)
|
||||||
return 0, 0, [error_msg]
|
return 0, 0, [error_msg]
|
||||||
|
|
||||||
connector_enable_summary = getattr(connector, "enable_summary", True)
|
|
||||||
connector_enable_vision_llm = getattr(connector, "enable_vision_llm", False)
|
connector_enable_vision_llm = getattr(connector, "enable_vision_llm", False)
|
||||||
vision_llm = None
|
vision_llm = None
|
||||||
if connector_enable_vision_llm:
|
if connector_enable_vision_llm:
|
||||||
|
|
@ -1481,7 +1448,6 @@ async def index_google_drive_selected_files(
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
enable_summary=connector_enable_summary,
|
|
||||||
on_heartbeat=on_heartbeat_callback,
|
on_heartbeat=on_heartbeat_callback,
|
||||||
vision_llm=vision_llm,
|
vision_llm=vision_llm,
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,6 @@ from app.indexing_pipeline.indexing_pipeline_service import (
|
||||||
PlaceholderInfo,
|
PlaceholderInfo,
|
||||||
)
|
)
|
||||||
from app.services.composio_service import ComposioService
|
from app.services.composio_service import ComposioService
|
||||||
from app.services.llm_service import get_user_long_context_llm
|
|
||||||
from app.services.task_logging_service import TaskLoggingService
|
from app.services.task_logging_service import TaskLoggingService
|
||||||
from app.utils.google_credentials import COMPOSIO_GOOGLE_CONNECTOR_TYPES
|
from app.utils.google_credentials import COMPOSIO_GOOGLE_CONNECTOR_TYPES
|
||||||
|
|
||||||
|
|
@ -105,7 +104,6 @@ def _build_connector_doc(
|
||||||
connector_id: int,
|
connector_id: int,
|
||||||
search_space_id: int,
|
search_space_id: int,
|
||||||
user_id: str,
|
user_id: str,
|
||||||
enable_summary: bool,
|
|
||||||
) -> ConnectorDocument:
|
) -> ConnectorDocument:
|
||||||
"""Map a raw Gmail API message dict to a ConnectorDocument."""
|
"""Map a raw Gmail API message dict to a ConnectorDocument."""
|
||||||
message_id = message.get("id", "")
|
message_id = message.get("id", "")
|
||||||
|
|
@ -138,12 +136,6 @@ def _build_connector_doc(
|
||||||
"connector_type": "Google Gmail",
|
"connector_type": "Google Gmail",
|
||||||
}
|
}
|
||||||
|
|
||||||
fallback_summary = (
|
|
||||||
f"Google Gmail Message: {subject}\n\n"
|
|
||||||
f"From: {sender}\nDate: {date_str}\n\n"
|
|
||||||
f"{markdown_content}"
|
|
||||||
)
|
|
||||||
|
|
||||||
return ConnectorDocument(
|
return ConnectorDocument(
|
||||||
title=subject,
|
title=subject,
|
||||||
source_markdown=markdown_content,
|
source_markdown=markdown_content,
|
||||||
|
|
@ -152,8 +144,6 @@ def _build_connector_doc(
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
created_by_id=user_id,
|
created_by_id=user_id,
|
||||||
should_summarize=enable_summary,
|
|
||||||
fallback_summary=fallback_summary,
|
|
||||||
metadata=metadata,
|
metadata=metadata,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -454,8 +444,7 @@ async def index_google_gmail_messages(
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
enable_summary=connector.enable_summary,
|
)
|
||||||
)
|
|
||||||
|
|
||||||
with session.no_autoflush:
|
with session.no_autoflush:
|
||||||
duplicate = await check_duplicate_document_by_hash(
|
duplicate = await check_duplicate_document_by_hash(
|
||||||
|
|
@ -483,13 +472,8 @@ async def index_google_gmail_messages(
|
||||||
|
|
||||||
# ── Pipeline: migrate legacy docs + parallel index ─────────────
|
# ── Pipeline: migrate legacy docs + parallel index ─────────────
|
||||||
await pipeline.migrate_legacy_docs(connector_docs)
|
await pipeline.migrate_legacy_docs(connector_docs)
|
||||||
|
|
||||||
async def _get_llm(s):
|
|
||||||
return await get_user_long_context_llm(s, user_id, search_space_id)
|
|
||||||
|
|
||||||
_, documents_indexed, documents_failed = await pipeline.index_batch_parallel(
|
_, documents_indexed, documents_failed = await pipeline.index_batch_parallel(
|
||||||
connector_docs,
|
connector_docs,
|
||||||
_get_llm,
|
|
||||||
max_concurrency=3,
|
max_concurrency=3,
|
||||||
on_heartbeat=on_heartbeat_callback,
|
on_heartbeat=on_heartbeat_callback,
|
||||||
heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS,
|
heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS,
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,6 @@ from app.indexing_pipeline.indexing_pipeline_service import (
|
||||||
IndexingPipelineService,
|
IndexingPipelineService,
|
||||||
PlaceholderInfo,
|
PlaceholderInfo,
|
||||||
)
|
)
|
||||||
from app.services.llm_service import get_user_long_context_llm
|
|
||||||
from app.services.task_logging_service import TaskLoggingService
|
from app.services.task_logging_service import TaskLoggingService
|
||||||
|
|
||||||
from .base import (
|
from .base import (
|
||||||
|
|
@ -41,7 +40,6 @@ def _build_connector_doc(
|
||||||
connector_id: int,
|
connector_id: int,
|
||||||
search_space_id: int,
|
search_space_id: int,
|
||||||
user_id: str,
|
user_id: str,
|
||||||
enable_summary: bool,
|
|
||||||
) -> ConnectorDocument:
|
) -> ConnectorDocument:
|
||||||
"""Map a raw Linear issue dict to a ConnectorDocument."""
|
"""Map a raw Linear issue dict to a ConnectorDocument."""
|
||||||
issue_id = issue.get("id", "")
|
issue_id = issue.get("id", "")
|
||||||
|
|
@ -63,11 +61,6 @@ def _build_connector_doc(
|
||||||
"connector_type": "Linear",
|
"connector_type": "Linear",
|
||||||
}
|
}
|
||||||
|
|
||||||
fallback_summary = (
|
|
||||||
f"Linear Issue {issue_identifier}: {issue_title}\n\n"
|
|
||||||
f"Status: {state}\n\n{issue_content}"
|
|
||||||
)
|
|
||||||
|
|
||||||
return ConnectorDocument(
|
return ConnectorDocument(
|
||||||
title=f"{issue_identifier}: {issue_title}",
|
title=f"{issue_identifier}: {issue_title}",
|
||||||
source_markdown=issue_content,
|
source_markdown=issue_content,
|
||||||
|
|
@ -76,8 +69,6 @@ def _build_connector_doc(
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
created_by_id=user_id,
|
created_by_id=user_id,
|
||||||
should_summarize=enable_summary,
|
|
||||||
fallback_summary=fallback_summary,
|
|
||||||
metadata=metadata,
|
metadata=metadata,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -277,8 +268,7 @@ async def index_linear_issues(
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
enable_summary=connector.enable_summary,
|
)
|
||||||
)
|
|
||||||
|
|
||||||
with session.no_autoflush:
|
with session.no_autoflush:
|
||||||
duplicate = await check_duplicate_document_by_hash(
|
duplicate = await check_duplicate_document_by_hash(
|
||||||
|
|
@ -306,13 +296,8 @@ async def index_linear_issues(
|
||||||
|
|
||||||
# ── Pipeline: migrate legacy docs + parallel index ────────────
|
# ── Pipeline: migrate legacy docs + parallel index ────────────
|
||||||
await pipeline.migrate_legacy_docs(connector_docs)
|
await pipeline.migrate_legacy_docs(connector_docs)
|
||||||
|
|
||||||
async def _get_llm(s):
|
|
||||||
return await get_user_long_context_llm(s, user_id, search_space_id)
|
|
||||||
|
|
||||||
_, documents_indexed, documents_failed = await pipeline.index_batch_parallel(
|
_, documents_indexed, documents_failed = await pipeline.index_batch_parallel(
|
||||||
connector_docs,
|
connector_docs,
|
||||||
_get_llm,
|
|
||||||
max_concurrency=3,
|
max_concurrency=3,
|
||||||
on_heartbeat=on_heartbeat_callback,
|
on_heartbeat=on_heartbeat_callback,
|
||||||
heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS,
|
heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS,
|
||||||
|
|
|
||||||
|
|
@ -33,7 +33,6 @@ from app.db import (
|
||||||
from app.indexing_pipeline.connector_document import ConnectorDocument
|
from app.indexing_pipeline.connector_document import ConnectorDocument
|
||||||
from app.indexing_pipeline.document_hashing import compute_identifier_hash
|
from app.indexing_pipeline.document_hashing import compute_identifier_hash
|
||||||
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
|
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
|
||||||
from app.services.llm_service import get_user_long_context_llm
|
|
||||||
from app.services.page_limit_service import PageLimitExceededError, PageLimitService
|
from app.services.page_limit_service import PageLimitExceededError, PageLimitService
|
||||||
from app.services.task_logging_service import TaskLoggingService
|
from app.services.task_logging_service import TaskLoggingService
|
||||||
from app.tasks.celery_tasks import get_celery_session_maker
|
from app.tasks.celery_tasks import get_celery_session_maker
|
||||||
|
|
@ -478,7 +477,6 @@ def _build_connector_doc(
|
||||||
*,
|
*,
|
||||||
search_space_id: int,
|
search_space_id: int,
|
||||||
user_id: str,
|
user_id: str,
|
||||||
enable_summary: bool,
|
|
||||||
) -> ConnectorDocument:
|
) -> ConnectorDocument:
|
||||||
"""Build a ConnectorDocument from a local file's extracted content."""
|
"""Build a ConnectorDocument from a local file's extracted content."""
|
||||||
unique_id = f"{folder_name}:{relative_path}"
|
unique_id = f"{folder_name}:{relative_path}"
|
||||||
|
|
@ -488,7 +486,6 @@ def _build_connector_doc(
|
||||||
"document_type": "Local Folder File",
|
"document_type": "Local Folder File",
|
||||||
"connector_type": "Local Folder",
|
"connector_type": "Local Folder",
|
||||||
}
|
}
|
||||||
fallback_summary = f"File: {title}\n\n{content[:4000]}"
|
|
||||||
|
|
||||||
return ConnectorDocument(
|
return ConnectorDocument(
|
||||||
title=title,
|
title=title,
|
||||||
|
|
@ -498,8 +495,6 @@ def _build_connector_doc(
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
connector_id=None,
|
connector_id=None,
|
||||||
created_by_id=user_id,
|
created_by_id=user_id,
|
||||||
should_summarize=enable_summary,
|
|
||||||
fallback_summary=fallback_summary,
|
|
||||||
metadata=metadata,
|
metadata=metadata,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -513,7 +508,6 @@ async def index_local_folder(
|
||||||
exclude_patterns: list[str] | None = None,
|
exclude_patterns: list[str] | None = None,
|
||||||
file_extensions: list[str] | None = None,
|
file_extensions: list[str] | None = None,
|
||||||
root_folder_id: int | None = None,
|
root_folder_id: int | None = None,
|
||||||
enable_summary: bool = False,
|
|
||||||
target_file_paths: list[str] | None = None,
|
target_file_paths: list[str] | None = None,
|
||||||
on_heartbeat_callback: HeartbeatCallbackType | None = None,
|
on_heartbeat_callback: HeartbeatCallbackType | None = None,
|
||||||
) -> tuple[int, int, int | None, str | None]:
|
) -> tuple[int, int, int | None, str | None]:
|
||||||
|
|
@ -574,8 +568,7 @@ async def index_local_folder(
|
||||||
folder_path=folder_path,
|
folder_path=folder_path,
|
||||||
folder_name=folder_name,
|
folder_name=folder_name,
|
||||||
target_file_path=target_file_paths[0],
|
target_file_path=target_file_paths[0],
|
||||||
enable_summary=enable_summary,
|
root_folder_id=root_folder_id,
|
||||||
root_folder_id=root_folder_id,
|
|
||||||
task_logger=task_logger,
|
task_logger=task_logger,
|
||||||
log_entry=log_entry,
|
log_entry=log_entry,
|
||||||
)
|
)
|
||||||
|
|
@ -587,8 +580,7 @@ async def index_local_folder(
|
||||||
folder_path=folder_path,
|
folder_path=folder_path,
|
||||||
folder_name=folder_name,
|
folder_name=folder_name,
|
||||||
target_file_paths=target_file_paths,
|
target_file_paths=target_file_paths,
|
||||||
enable_summary=enable_summary,
|
root_folder_id=root_folder_id,
|
||||||
root_folder_id=root_folder_id,
|
|
||||||
on_progress_callback=on_heartbeat_callback,
|
on_progress_callback=on_heartbeat_callback,
|
||||||
)
|
)
|
||||||
if err:
|
if err:
|
||||||
|
|
@ -774,8 +766,7 @@ async def index_local_folder(
|
||||||
folder_name=folder_name,
|
folder_name=folder_name,
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
enable_summary=enable_summary,
|
)
|
||||||
)
|
|
||||||
connector_docs.append(doc)
|
connector_docs.append(doc)
|
||||||
file_meta_map[unique_identifier] = {
|
file_meta_map[unique_identifier] = {
|
||||||
"relative_path": relative_path,
|
"relative_path": relative_path,
|
||||||
|
|
@ -845,15 +836,13 @@ async def index_local_folder(
|
||||||
doc_map = {compute_unique_identifier_hash(cd): cd for cd in connector_docs}
|
doc_map = {compute_unique_identifier_hash(cd): cd for cd in connector_docs}
|
||||||
documents = await pipeline.prepare_for_indexing(connector_docs)
|
documents = await pipeline.prepare_for_indexing(connector_docs)
|
||||||
|
|
||||||
llm = await get_user_long_context_llm(session, user_id, search_space_id)
|
|
||||||
|
|
||||||
for document in documents:
|
for document in documents:
|
||||||
connector_doc = doc_map.get(document.unique_identifier_hash)
|
connector_doc = doc_map.get(document.unique_identifier_hash)
|
||||||
if connector_doc is None:
|
if connector_doc is None:
|
||||||
failed_count += 1
|
failed_count += 1
|
||||||
continue
|
continue
|
||||||
|
|
||||||
result = await pipeline.index(document, connector_doc, llm)
|
result = await pipeline.index(document, connector_doc)
|
||||||
|
|
||||||
if DocumentStatus.is_state(result.status, DocumentStatus.READY):
|
if DocumentStatus.is_state(result.status, DocumentStatus.READY):
|
||||||
indexed_count += 1
|
indexed_count += 1
|
||||||
|
|
@ -960,7 +949,6 @@ async def _index_batch_files(
|
||||||
folder_path: str,
|
folder_path: str,
|
||||||
folder_name: str,
|
folder_name: str,
|
||||||
target_file_paths: list[str],
|
target_file_paths: list[str],
|
||||||
enable_summary: bool,
|
|
||||||
root_folder_id: int | None,
|
root_folder_id: int | None,
|
||||||
on_progress_callback: HeartbeatCallbackType | None = None,
|
on_progress_callback: HeartbeatCallbackType | None = None,
|
||||||
) -> tuple[int, int, str | None]:
|
) -> tuple[int, int, str | None]:
|
||||||
|
|
@ -995,8 +983,7 @@ async def _index_batch_files(
|
||||||
folder_path=folder_path,
|
folder_path=folder_path,
|
||||||
folder_name=folder_name,
|
folder_name=folder_name,
|
||||||
target_file_path=file_path,
|
target_file_path=file_path,
|
||||||
enable_summary=enable_summary,
|
root_folder_id=root_folder_id,
|
||||||
root_folder_id=root_folder_id,
|
|
||||||
task_logger=task_logger,
|
task_logger=task_logger,
|
||||||
log_entry=log_entry,
|
log_entry=log_entry,
|
||||||
)
|
)
|
||||||
|
|
@ -1036,7 +1023,6 @@ async def _index_single_file(
|
||||||
folder_path: str,
|
folder_path: str,
|
||||||
folder_name: str,
|
folder_name: str,
|
||||||
target_file_path: str,
|
target_file_path: str,
|
||||||
enable_summary: bool,
|
|
||||||
root_folder_id: int | None,
|
root_folder_id: int | None,
|
||||||
task_logger,
|
task_logger,
|
||||||
log_entry,
|
log_entry,
|
||||||
|
|
@ -1125,8 +1111,7 @@ async def _index_single_file(
|
||||||
folder_name=folder_name,
|
folder_name=folder_name,
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
enable_summary=enable_summary,
|
)
|
||||||
)
|
|
||||||
|
|
||||||
if root_folder_id:
|
if root_folder_id:
|
||||||
connector_doc.folder_id = await _resolve_folder_for_file(
|
connector_doc.folder_id = await _resolve_folder_for_file(
|
||||||
|
|
@ -1134,7 +1119,6 @@ async def _index_single_file(
|
||||||
)
|
)
|
||||||
|
|
||||||
pipeline = IndexingPipelineService(session)
|
pipeline = IndexingPipelineService(session)
|
||||||
llm = await get_user_long_context_llm(session, user_id, search_space_id)
|
|
||||||
documents = await pipeline.prepare_for_indexing([connector_doc])
|
documents = await pipeline.prepare_for_indexing([connector_doc])
|
||||||
|
|
||||||
if not documents:
|
if not documents:
|
||||||
|
|
@ -1142,7 +1126,7 @@ async def _index_single_file(
|
||||||
|
|
||||||
db_doc = documents[0]
|
db_doc = documents[0]
|
||||||
|
|
||||||
await pipeline.index(db_doc, connector_doc, llm)
|
await pipeline.index(db_doc, connector_doc)
|
||||||
|
|
||||||
await session.refresh(db_doc)
|
await session.refresh(db_doc)
|
||||||
doc_meta = dict(db_doc.document_metadata or {})
|
doc_meta = dict(db_doc.document_metadata or {})
|
||||||
|
|
@ -1275,7 +1259,6 @@ async def index_uploaded_files(
|
||||||
user_id: str,
|
user_id: str,
|
||||||
folder_name: str,
|
folder_name: str,
|
||||||
root_folder_id: int,
|
root_folder_id: int,
|
||||||
enable_summary: bool,
|
|
||||||
file_mappings: list[dict],
|
file_mappings: list[dict],
|
||||||
on_heartbeat_callback: HeartbeatCallbackType | None = None,
|
on_heartbeat_callback: HeartbeatCallbackType | None = None,
|
||||||
use_vision_llm: bool = False,
|
use_vision_llm: bool = False,
|
||||||
|
|
@ -1318,7 +1301,6 @@ async def index_uploaded_files(
|
||||||
|
|
||||||
page_limit_service = PageLimitService(session)
|
page_limit_service = PageLimitService(session)
|
||||||
pipeline = IndexingPipelineService(session)
|
pipeline = IndexingPipelineService(session)
|
||||||
llm = await get_user_long_context_llm(session, user_id, search_space_id)
|
|
||||||
|
|
||||||
vision_llm_instance = None
|
vision_llm_instance = None
|
||||||
if use_vision_llm:
|
if use_vision_llm:
|
||||||
|
|
@ -1414,8 +1396,7 @@ async def index_uploaded_files(
|
||||||
folder_name=folder_name,
|
folder_name=folder_name,
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
enable_summary=enable_summary,
|
)
|
||||||
)
|
|
||||||
|
|
||||||
connector_doc.folder_id = await _resolve_folder_for_file(
|
connector_doc.folder_id = await _resolve_folder_for_file(
|
||||||
session,
|
session,
|
||||||
|
|
@ -1432,7 +1413,7 @@ async def index_uploaded_files(
|
||||||
|
|
||||||
db_doc = documents[0]
|
db_doc = documents[0]
|
||||||
|
|
||||||
await pipeline.index(db_doc, connector_doc, llm)
|
await pipeline.index(db_doc, connector_doc)
|
||||||
|
|
||||||
await session.refresh(db_doc)
|
await session.refresh(db_doc)
|
||||||
doc_meta = dict(db_doc.document_metadata or {})
|
doc_meta = dict(db_doc.document_metadata or {})
|
||||||
|
|
|
||||||
|
|
@ -16,13 +16,11 @@ from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
from app.connectors.luma_connector import LumaConnector
|
from app.connectors.luma_connector import LumaConnector
|
||||||
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
|
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
|
||||||
from app.services.llm_service import get_user_long_context_llm
|
|
||||||
from app.services.task_logging_service import TaskLoggingService
|
from app.services.task_logging_service import TaskLoggingService
|
||||||
from app.utils.document_converters import (
|
from app.utils.document_converters import (
|
||||||
create_document_chunks,
|
create_document_chunks,
|
||||||
embed_text,
|
embed_text,
|
||||||
generate_content_hash,
|
generate_content_hash,
|
||||||
generate_document_summary,
|
|
||||||
generate_unique_identifier_hash,
|
generate_unique_identifier_hash,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -437,38 +435,14 @@ async def index_luma_events(
|
||||||
document.status = DocumentStatus.processing()
|
document.status = DocumentStatus.processing()
|
||||||
await session.commit()
|
await session.commit()
|
||||||
|
|
||||||
# Heavy processing (LLM, embeddings, chunks)
|
# Heavy processing (embeddings, chunks)
|
||||||
user_llm = await get_user_long_context_llm(
|
|
||||||
session, user_id, search_space_id
|
|
||||||
)
|
|
||||||
|
|
||||||
if user_llm and connector.enable_summary:
|
summary_content = (
|
||||||
document_metadata_for_summary = {
|
f"Luma Event: {item['event_name']}\n\n{item['event_markdown']}"
|
||||||
"event_id": item["event_id"],
|
)
|
||||||
"event_name": item["event_name"],
|
summary_embedding = await asyncio.to_thread(
|
||||||
"event_url": item["event_url"],
|
embed_text, summary_content
|
||||||
"start_at": item["start_at"],
|
)
|
||||||
"end_at": item["end_at"],
|
|
||||||
"timezone": item["timezone"],
|
|
||||||
"location": item["location"] or "No location",
|
|
||||||
"city": item["city"],
|
|
||||||
"hosts": item["host_names"],
|
|
||||||
"document_type": "Luma Event",
|
|
||||||
"connector_type": "Luma",
|
|
||||||
}
|
|
||||||
(
|
|
||||||
summary_content,
|
|
||||||
summary_embedding,
|
|
||||||
) = await generate_document_summary(
|
|
||||||
item["event_markdown"], user_llm, document_metadata_for_summary
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
summary_content = (
|
|
||||||
f"Luma Event: {item['event_name']}\n\n{item['event_markdown']}"
|
|
||||||
)
|
|
||||||
summary_embedding = await asyncio.to_thread(
|
|
||||||
embed_text, summary_content
|
|
||||||
)
|
|
||||||
|
|
||||||
chunks = await create_document_chunks(item["event_markdown"])
|
chunks = await create_document_chunks(item["event_markdown"])
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,6 @@ from app.indexing_pipeline.indexing_pipeline_service import (
|
||||||
IndexingPipelineService,
|
IndexingPipelineService,
|
||||||
PlaceholderInfo,
|
PlaceholderInfo,
|
||||||
)
|
)
|
||||||
from app.services.llm_service import get_user_long_context_llm
|
|
||||||
from app.services.task_logging_service import TaskLoggingService
|
from app.services.task_logging_service import TaskLoggingService
|
||||||
from app.utils.notion_utils import process_blocks
|
from app.utils.notion_utils import process_blocks
|
||||||
|
|
||||||
|
|
@ -43,7 +42,6 @@ def _build_connector_doc(
|
||||||
connector_id: int,
|
connector_id: int,
|
||||||
search_space_id: int,
|
search_space_id: int,
|
||||||
user_id: str,
|
user_id: str,
|
||||||
enable_summary: bool,
|
|
||||||
) -> ConnectorDocument:
|
) -> ConnectorDocument:
|
||||||
"""Map a raw Notion page dict to a ConnectorDocument."""
|
"""Map a raw Notion page dict to a ConnectorDocument."""
|
||||||
page_id = page.get("page_id", "")
|
page_id = page.get("page_id", "")
|
||||||
|
|
@ -57,8 +55,6 @@ def _build_connector_doc(
|
||||||
"connector_type": "Notion",
|
"connector_type": "Notion",
|
||||||
}
|
}
|
||||||
|
|
||||||
fallback_summary = f"Notion Page: {page_title}\n\n{markdown_content}"
|
|
||||||
|
|
||||||
return ConnectorDocument(
|
return ConnectorDocument(
|
||||||
title=page_title,
|
title=page_title,
|
||||||
source_markdown=markdown_content,
|
source_markdown=markdown_content,
|
||||||
|
|
@ -67,8 +63,6 @@ def _build_connector_doc(
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
created_by_id=user_id,
|
created_by_id=user_id,
|
||||||
should_summarize=enable_summary,
|
|
||||||
fallback_summary=fallback_summary,
|
|
||||||
metadata=metadata,
|
metadata=metadata,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -314,8 +308,7 @@ async def index_notion_pages(
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
enable_summary=connector.enable_summary,
|
)
|
||||||
)
|
|
||||||
|
|
||||||
with session.no_autoflush:
|
with session.no_autoflush:
|
||||||
duplicate = await check_duplicate_document_by_hash(
|
duplicate = await check_duplicate_document_by_hash(
|
||||||
|
|
@ -343,13 +336,8 @@ async def index_notion_pages(
|
||||||
|
|
||||||
# ── Pipeline: migrate legacy docs + parallel index ────────────
|
# ── Pipeline: migrate legacy docs + parallel index ────────────
|
||||||
await pipeline.migrate_legacy_docs(connector_docs)
|
await pipeline.migrate_legacy_docs(connector_docs)
|
||||||
|
|
||||||
async def _get_llm(s):
|
|
||||||
return await get_user_long_context_llm(s, user_id, search_space_id)
|
|
||||||
|
|
||||||
_, documents_indexed, documents_failed = await pipeline.index_batch_parallel(
|
_, documents_indexed, documents_failed = await pipeline.index_batch_parallel(
|
||||||
connector_docs,
|
connector_docs,
|
||||||
_get_llm,
|
|
||||||
max_concurrency=3,
|
max_concurrency=3,
|
||||||
on_heartbeat=on_heartbeat_callback,
|
on_heartbeat=on_heartbeat_callback,
|
||||||
heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS,
|
heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS,
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,6 @@ from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnector
|
||||||
from app.indexing_pipeline.connector_document import ConnectorDocument
|
from app.indexing_pipeline.connector_document import ConnectorDocument
|
||||||
from app.indexing_pipeline.document_hashing import compute_identifier_hash
|
from app.indexing_pipeline.document_hashing import compute_identifier_hash
|
||||||
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
|
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
|
||||||
from app.services.llm_service import get_user_long_context_llm
|
|
||||||
from app.services.page_limit_service import PageLimitService
|
from app.services.page_limit_service import PageLimitService
|
||||||
from app.services.task_logging_service import TaskLoggingService
|
from app.services.task_logging_service import TaskLoggingService
|
||||||
from app.tasks.connector_indexers.base import (
|
from app.tasks.connector_indexers.base import (
|
||||||
|
|
@ -133,7 +132,6 @@ def _build_connector_doc(
|
||||||
connector_id: int,
|
connector_id: int,
|
||||||
search_space_id: int,
|
search_space_id: int,
|
||||||
user_id: str,
|
user_id: str,
|
||||||
enable_summary: bool,
|
|
||||||
) -> ConnectorDocument:
|
) -> ConnectorDocument:
|
||||||
file_id = file.get("id", "")
|
file_id = file.get("id", "")
|
||||||
file_name = file.get("name", "Unknown")
|
file_name = file.get("name", "Unknown")
|
||||||
|
|
@ -145,8 +143,6 @@ def _build_connector_doc(
|
||||||
"connector_type": "OneDrive",
|
"connector_type": "OneDrive",
|
||||||
}
|
}
|
||||||
|
|
||||||
fallback_summary = f"File: {file_name}\n\n{markdown[:4000]}"
|
|
||||||
|
|
||||||
return ConnectorDocument(
|
return ConnectorDocument(
|
||||||
title=file_name,
|
title=file_name,
|
||||||
source_markdown=markdown,
|
source_markdown=markdown,
|
||||||
|
|
@ -155,8 +151,6 @@ def _build_connector_doc(
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
created_by_id=user_id,
|
created_by_id=user_id,
|
||||||
should_summarize=enable_summary,
|
|
||||||
fallback_summary=fallback_summary,
|
|
||||||
metadata=metadata,
|
metadata=metadata,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -168,7 +162,6 @@ async def _download_files_parallel(
|
||||||
connector_id: int,
|
connector_id: int,
|
||||||
search_space_id: int,
|
search_space_id: int,
|
||||||
user_id: str,
|
user_id: str,
|
||||||
enable_summary: bool,
|
|
||||||
max_concurrency: int = 3,
|
max_concurrency: int = 3,
|
||||||
on_heartbeat: HeartbeatCallbackType | None = None,
|
on_heartbeat: HeartbeatCallbackType | None = None,
|
||||||
vision_llm=None,
|
vision_llm=None,
|
||||||
|
|
@ -198,7 +191,6 @@ async def _download_files_parallel(
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
enable_summary=enable_summary,
|
|
||||||
)
|
)
|
||||||
async with hb_lock:
|
async with hb_lock:
|
||||||
completed_count += 1
|
completed_count += 1
|
||||||
|
|
@ -230,7 +222,6 @@ async def _download_and_index(
|
||||||
connector_id: int,
|
connector_id: int,
|
||||||
search_space_id: int,
|
search_space_id: int,
|
||||||
user_id: str,
|
user_id: str,
|
||||||
enable_summary: bool,
|
|
||||||
on_heartbeat: HeartbeatCallbackType | None = None,
|
on_heartbeat: HeartbeatCallbackType | None = None,
|
||||||
vision_llm=None,
|
vision_llm=None,
|
||||||
) -> tuple[int, int]:
|
) -> tuple[int, int]:
|
||||||
|
|
@ -241,7 +232,6 @@ async def _download_and_index(
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
enable_summary=enable_summary,
|
|
||||||
on_heartbeat=on_heartbeat,
|
on_heartbeat=on_heartbeat,
|
||||||
vision_llm=vision_llm,
|
vision_llm=vision_llm,
|
||||||
)
|
)
|
||||||
|
|
@ -250,13 +240,8 @@ async def _download_and_index(
|
||||||
batch_failed = 0
|
batch_failed = 0
|
||||||
if connector_docs:
|
if connector_docs:
|
||||||
pipeline = IndexingPipelineService(session)
|
pipeline = IndexingPipelineService(session)
|
||||||
|
|
||||||
async def _get_llm(s):
|
|
||||||
return await get_user_long_context_llm(s, user_id, search_space_id)
|
|
||||||
|
|
||||||
_, batch_indexed, batch_failed = await pipeline.index_batch_parallel(
|
_, batch_indexed, batch_failed = await pipeline.index_batch_parallel(
|
||||||
connector_docs,
|
connector_docs,
|
||||||
_get_llm,
|
|
||||||
max_concurrency=3,
|
max_concurrency=3,
|
||||||
on_heartbeat=on_heartbeat,
|
on_heartbeat=on_heartbeat,
|
||||||
)
|
)
|
||||||
|
|
@ -294,7 +279,6 @@ async def _index_selected_files(
|
||||||
connector_id: int,
|
connector_id: int,
|
||||||
search_space_id: int,
|
search_space_id: int,
|
||||||
user_id: str,
|
user_id: str,
|
||||||
enable_summary: bool,
|
|
||||||
on_heartbeat: HeartbeatCallbackType | None = None,
|
on_heartbeat: HeartbeatCallbackType | None = None,
|
||||||
vision_llm=None,
|
vision_llm=None,
|
||||||
) -> tuple[int, int, int, list[str]]:
|
) -> tuple[int, int, int, list[str]]:
|
||||||
|
|
@ -345,7 +329,6 @@ async def _index_selected_files(
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
enable_summary=enable_summary,
|
|
||||||
on_heartbeat=on_heartbeat,
|
on_heartbeat=on_heartbeat,
|
||||||
vision_llm=vision_llm,
|
vision_llm=vision_llm,
|
||||||
)
|
)
|
||||||
|
|
@ -379,7 +362,6 @@ async def _index_full_scan(
|
||||||
max_files: int,
|
max_files: int,
|
||||||
include_subfolders: bool = True,
|
include_subfolders: bool = True,
|
||||||
on_heartbeat_callback: HeartbeatCallbackType | None = None,
|
on_heartbeat_callback: HeartbeatCallbackType | None = None,
|
||||||
enable_summary: bool = True,
|
|
||||||
vision_llm=None,
|
vision_llm=None,
|
||||||
) -> tuple[int, int, int]:
|
) -> tuple[int, int, int]:
|
||||||
"""Full scan indexing of a folder.
|
"""Full scan indexing of a folder.
|
||||||
|
|
@ -454,7 +436,6 @@ async def _index_full_scan(
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
enable_summary=enable_summary,
|
|
||||||
on_heartbeat=on_heartbeat_callback,
|
on_heartbeat=on_heartbeat_callback,
|
||||||
vision_llm=vision_llm,
|
vision_llm=vision_llm,
|
||||||
)
|
)
|
||||||
|
|
@ -487,7 +468,6 @@ async def _index_with_delta_sync(
|
||||||
log_entry: object,
|
log_entry: object,
|
||||||
max_files: int,
|
max_files: int,
|
||||||
on_heartbeat_callback: HeartbeatCallbackType | None = None,
|
on_heartbeat_callback: HeartbeatCallbackType | None = None,
|
||||||
enable_summary: bool = True,
|
|
||||||
vision_llm=None,
|
vision_llm=None,
|
||||||
) -> tuple[int, int, int, str | None]:
|
) -> tuple[int, int, int, str | None]:
|
||||||
"""Delta sync using OneDrive change tracking.
|
"""Delta sync using OneDrive change tracking.
|
||||||
|
|
@ -579,7 +559,6 @@ async def _index_with_delta_sync(
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
enable_summary=enable_summary,
|
|
||||||
on_heartbeat=on_heartbeat_callback,
|
on_heartbeat=on_heartbeat_callback,
|
||||||
vision_llm=vision_llm,
|
vision_llm=vision_llm,
|
||||||
)
|
)
|
||||||
|
|
@ -651,7 +630,6 @@ async def index_onedrive_files(
|
||||||
)
|
)
|
||||||
return 0, 0, error_msg, 0
|
return 0, 0, error_msg, 0
|
||||||
|
|
||||||
connector_enable_summary = getattr(connector, "enable_summary", True)
|
|
||||||
connector_enable_vision_llm = getattr(connector, "enable_vision_llm", False)
|
connector_enable_vision_llm = getattr(connector, "enable_vision_llm", False)
|
||||||
vision_llm = None
|
vision_llm = None
|
||||||
if connector_enable_vision_llm:
|
if connector_enable_vision_llm:
|
||||||
|
|
@ -681,7 +659,6 @@ async def index_onedrive_files(
|
||||||
connector_id=connector_id,
|
connector_id=connector_id,
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
enable_summary=connector_enable_summary,
|
|
||||||
vision_llm=vision_llm,
|
vision_llm=vision_llm,
|
||||||
)
|
)
|
||||||
total_indexed += indexed
|
total_indexed += indexed
|
||||||
|
|
@ -711,7 +688,6 @@ async def index_onedrive_files(
|
||||||
task_logger,
|
task_logger,
|
||||||
log_entry,
|
log_entry,
|
||||||
max_files,
|
max_files,
|
||||||
enable_summary=connector_enable_summary,
|
|
||||||
vision_llm=vision_llm,
|
vision_llm=vision_llm,
|
||||||
)
|
)
|
||||||
total_indexed += indexed
|
total_indexed += indexed
|
||||||
|
|
@ -738,7 +714,6 @@ async def index_onedrive_files(
|
||||||
log_entry,
|
log_entry,
|
||||||
max_files,
|
max_files,
|
||||||
include_subfolders,
|
include_subfolders,
|
||||||
enable_summary=connector_enable_summary,
|
|
||||||
vision_llm=vision_llm,
|
vision_llm=vision_llm,
|
||||||
)
|
)
|
||||||
total_indexed += ri
|
total_indexed += ri
|
||||||
|
|
@ -758,7 +733,6 @@ async def index_onedrive_files(
|
||||||
log_entry,
|
log_entry,
|
||||||
max_files,
|
max_files,
|
||||||
include_subfolders,
|
include_subfolders,
|
||||||
enable_summary=connector_enable_summary,
|
|
||||||
vision_llm=vision_llm,
|
vision_llm=vision_llm,
|
||||||
)
|
)
|
||||||
total_indexed += indexed
|
total_indexed += indexed
|
||||||
|
|
|
||||||
|
|
@ -15,13 +15,11 @@ from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
from app.connectors.webcrawler_connector import WebCrawlerConnector
|
from app.connectors.webcrawler_connector import WebCrawlerConnector
|
||||||
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
|
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
|
||||||
from app.services.llm_service import get_user_long_context_llm
|
|
||||||
from app.services.task_logging_service import TaskLoggingService
|
from app.services.task_logging_service import TaskLoggingService
|
||||||
from app.utils.document_converters import (
|
from app.utils.document_converters import (
|
||||||
create_document_chunks,
|
create_document_chunks,
|
||||||
embed_text,
|
embed_text,
|
||||||
generate_content_hash,
|
generate_content_hash,
|
||||||
generate_document_summary,
|
|
||||||
generate_unique_identifier_hash,
|
generate_unique_identifier_hash,
|
||||||
)
|
)
|
||||||
from app.utils.webcrawler_utils import parse_webcrawler_urls
|
from app.utils.webcrawler_utils import parse_webcrawler_urls
|
||||||
|
|
@ -372,29 +370,10 @@ async def index_crawled_urls(
|
||||||
documents_skipped += 1
|
documents_skipped += 1
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Generate summary with LLM
|
# Select deterministic document content
|
||||||
user_llm = await get_user_long_context_llm(
|
|
||||||
session, user_id, search_space_id
|
|
||||||
)
|
|
||||||
|
|
||||||
if user_llm and connector.enable_summary:
|
summary_content = f"Crawled URL: {title}\n\nURL: {url}\n\n{content}"
|
||||||
document_metadata_for_summary = {
|
summary_embedding = embed_text(summary_content)
|
||||||
"url": url,
|
|
||||||
"title": title,
|
|
||||||
"description": description,
|
|
||||||
"language": language,
|
|
||||||
"document_type": "Crawled URL",
|
|
||||||
"crawler_type": crawler_type,
|
|
||||||
}
|
|
||||||
(
|
|
||||||
summary_content,
|
|
||||||
summary_embedding,
|
|
||||||
) = await generate_document_summary(
|
|
||||||
structured_document, user_llm, document_metadata_for_summary
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
summary_content = f"Crawled URL: {title}\n\nURL: {url}\n\n{content}"
|
|
||||||
summary_embedding = embed_text(summary_content)
|
|
||||||
|
|
||||||
# Process chunks
|
# Process chunks
|
||||||
chunks = await create_document_chunks(content)
|
chunks = await create_document_chunks(content)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue