feat: optimize agent file system

This commit is contained in:
DESKTOP-RTLN3BA\$punk 2026-03-28 16:39:46 -07:00
parent ee0b59c0fa
commit 2cc2d339e6
67 changed files with 8011 additions and 5591 deletions

View file

@ -9,6 +9,7 @@ Supports loading LLM configurations from:
- NewLLMConfig database table (positive IDs for user-created configs with prompt settings)
"""
import ast
import asyncio
import contextlib
import gc
@ -36,10 +37,6 @@ from app.agents.new_chat.llm_config import (
load_agent_config,
load_llm_config_from_yaml,
)
from app.agents.new_chat.sandbox import (
get_or_create_sandbox,
is_sandbox_enabled,
)
from app.db import (
ChatVisibility,
Document,
@ -212,7 +209,7 @@ class StreamResult:
accumulated_text: str = ""
is_interrupted: bool = False
interrupt_value: dict[str, Any] | None = None
sandbox_files: list[str] = field(default_factory=list)
sandbox_files: list[str] = field(default_factory=list) # unused, kept for compat
async def _stream_agent_events(
@ -281,6 +278,8 @@ async def _stream_agent_events(
if event_type == "on_chat_model_stream":
if active_tool_depth > 0:
continue # Suppress inner-tool LLM tokens from leaking into chat
if "surfsense:internal" in event.get("tags", []):
continue # Suppress middleware-internal LLM tokens (e.g. KB search classification)
chunk = event.get("data", {}).get("chunk")
if chunk and hasattr(chunk, "content"):
content = chunk.content
@ -319,19 +318,114 @@ async def _stream_agent_events(
tool_step_ids[run_id] = tool_step_id
last_active_step_id = tool_step_id
if tool_name == "search_knowledge_base":
query = (
tool_input.get("query", "")
if tool_name == "ls":
ls_path = (
tool_input.get("path", "/")
if isinstance(tool_input, dict)
else str(tool_input)
)
last_active_step_title = "Searching knowledge base"
last_active_step_title = "Listing files"
last_active_step_items = [ls_path]
yield streaming_service.format_thinking_step(
step_id=tool_step_id,
title="Listing files",
status="in_progress",
items=last_active_step_items,
)
elif tool_name == "read_file":
fp = (
tool_input.get("file_path", "")
if isinstance(tool_input, dict)
else str(tool_input)
)
display_fp = fp if len(fp) <= 80 else "..." + fp[-77:]
last_active_step_title = "Reading file"
last_active_step_items = [display_fp]
yield streaming_service.format_thinking_step(
step_id=tool_step_id,
title="Reading file",
status="in_progress",
items=last_active_step_items,
)
elif tool_name == "write_file":
fp = (
tool_input.get("file_path", "")
if isinstance(tool_input, dict)
else str(tool_input)
)
display_fp = fp if len(fp) <= 80 else "..." + fp[-77:]
last_active_step_title = "Writing file"
last_active_step_items = [display_fp]
yield streaming_service.format_thinking_step(
step_id=tool_step_id,
title="Writing file",
status="in_progress",
items=last_active_step_items,
)
elif tool_name == "edit_file":
fp = (
tool_input.get("file_path", "")
if isinstance(tool_input, dict)
else str(tool_input)
)
display_fp = fp if len(fp) <= 80 else "..." + fp[-77:]
last_active_step_title = "Editing file"
last_active_step_items = [display_fp]
yield streaming_service.format_thinking_step(
step_id=tool_step_id,
title="Editing file",
status="in_progress",
items=last_active_step_items,
)
elif tool_name == "glob":
pat = (
tool_input.get("pattern", "")
if isinstance(tool_input, dict)
else str(tool_input)
)
base_path = (
tool_input.get("path", "/") if isinstance(tool_input, dict) else "/"
)
last_active_step_title = "Searching files"
last_active_step_items = [f"{pat} in {base_path}"]
yield streaming_service.format_thinking_step(
step_id=tool_step_id,
title="Searching files",
status="in_progress",
items=last_active_step_items,
)
elif tool_name == "grep":
pat = (
tool_input.get("pattern", "")
if isinstance(tool_input, dict)
else str(tool_input)
)
grep_path = (
tool_input.get("path", "") if isinstance(tool_input, dict) else ""
)
display_pat = pat[:60] + ("..." if len(pat) > 60 else "")
last_active_step_title = "Searching content"
last_active_step_items = [
f"Query: {query[:100]}{'...' if len(query) > 100 else ''}"
f'"{display_pat}"' + (f" in {grep_path}" if grep_path else "")
]
yield streaming_service.format_thinking_step(
step_id=tool_step_id,
title="Searching knowledge base",
title="Searching content",
status="in_progress",
items=last_active_step_items,
)
elif tool_name == "save_document":
doc_title = (
tool_input.get("title", "")
if isinstance(tool_input, dict)
else str(tool_input)
)
display_title = doc_title[:60] + ("..." if len(doc_title) > 60 else "")
last_active_step_title = "Saving document"
last_active_step_items = [display_title]
yield streaming_service.format_thinking_step(
step_id=tool_step_id,
title="Saving document",
status="in_progress",
items=last_active_step_items,
)
@ -441,10 +535,22 @@ async def _stream_agent_events(
else streaming_service.generate_tool_call_id()
)
yield streaming_service.format_tool_input_start(tool_call_id, tool_name)
# Sanitize tool_input: strip runtime-injected non-serializable
# values (e.g. LangChain ToolRuntime) before sending over SSE.
if isinstance(tool_input, dict):
_safe_input: dict[str, Any] = {}
for _k, _v in tool_input.items():
try:
json.dumps(_v)
_safe_input[_k] = _v
except (TypeError, ValueError, OverflowError):
pass
else:
_safe_input = {"input": tool_input}
yield streaming_service.format_tool_input_available(
tool_call_id,
tool_name,
tool_input if isinstance(tool_input, dict) else {"input": tool_input},
_safe_input,
)
elif event_type == "on_tool_end":
@ -475,16 +581,55 @@ async def _stream_agent_events(
)
completed_step_ids.add(original_step_id)
if tool_name == "search_knowledge_base":
result_info = "Search completed"
if isinstance(tool_output, dict):
result_len = tool_output.get("result_length", 0)
if result_len > 0:
result_info = f"Found relevant information ({result_len} chars)"
completed_items = [*last_active_step_items, result_info]
if tool_name == "read_file":
yield streaming_service.format_thinking_step(
step_id=original_step_id,
title="Searching knowledge base",
title="Reading file",
status="completed",
items=last_active_step_items,
)
elif tool_name == "write_file":
yield streaming_service.format_thinking_step(
step_id=original_step_id,
title="Writing file",
status="completed",
items=last_active_step_items,
)
elif tool_name == "edit_file":
yield streaming_service.format_thinking_step(
step_id=original_step_id,
title="Editing file",
status="completed",
items=last_active_step_items,
)
elif tool_name == "glob":
yield streaming_service.format_thinking_step(
step_id=original_step_id,
title="Searching files",
status="completed",
items=last_active_step_items,
)
elif tool_name == "grep":
yield streaming_service.format_thinking_step(
step_id=original_step_id,
title="Searching content",
status="completed",
items=last_active_step_items,
)
elif tool_name == "save_document":
result_str = (
tool_output.get("result", "")
if isinstance(tool_output, dict)
else str(tool_output)
)
is_error = "Error" in result_str
completed_items = [
*last_active_step_items,
result_str[:80] if is_error else "Saved to knowledge base",
]
yield streaming_service.format_thinking_step(
step_id=original_step_id,
title="Saving document",
status="completed",
items=completed_items,
)
@ -690,14 +835,23 @@ async def _stream_agent_events(
ls_output = str(tool_output) if tool_output else ""
file_names: list[str] = []
if ls_output:
for line in ls_output.strip().split("\n"):
line = line.strip()
if line:
name = line.rstrip("/").split("/")[-1]
if name and len(name) <= 40:
file_names.append(name)
elif name:
file_names.append(name[:37] + "...")
paths: list[str] = []
try:
parsed = ast.literal_eval(ls_output)
if isinstance(parsed, list):
paths = [str(p) for p in parsed]
except (ValueError, SyntaxError):
paths = [
line.strip()
for line in ls_output.strip().split("\n")
if line.strip()
]
for p in paths:
name = p.rstrip("/").split("/")[-1]
if name and len(name) <= 40:
file_names.append(name)
elif name:
file_names.append(name[:37] + "...")
if file_names:
if len(file_names) <= 5:
completed_items = [f"[{name}]" for name in file_names]
@ -708,7 +862,7 @@ async def _stream_agent_events(
completed_items = ["No files found"]
yield streaming_service.format_thinking_step(
step_id=original_step_id,
title="Exploring files",
title="Listing files",
status="completed",
items=completed_items,
)
@ -832,14 +986,6 @@ async def _stream_agent_events(
f"Scrape failed: {error_msg}",
"error",
)
elif tool_name == "search_knowledge_base":
yield streaming_service.format_tool_output_available(
tool_call_id,
{"status": "completed", "result_length": len(str(tool_output))},
)
yield streaming_service.format_terminal_info(
"Knowledge base search completed", "success"
)
elif tool_name == "generate_report":
# Stream the full report result so frontend can render the ReportCard
yield streaming_service.format_tool_output_available(
@ -973,6 +1119,19 @@ async def _stream_agent_events(
items=last_active_step_items,
)
elif (
event_type == "on_custom_event" and event.get("name") == "document_created"
):
data = event.get("data", {})
if data.get("id"):
yield streaming_service.format_data(
"documents-updated",
{
"action": "created",
"document": data,
},
)
elif event_type in ("on_chain_end", "on_agent_end"):
if current_text_id is not None:
yield streaming_service.format_text_end(current_text_id)
@ -995,38 +1154,6 @@ async def _stream_agent_events(
yield streaming_service.format_interrupt_request(result.interrupt_value)
def _try_persist_and_delete_sandbox(
thread_id: int,
sandbox_files: list[str],
) -> None:
"""Fire-and-forget: persist sandbox files locally then delete the sandbox."""
from app.agents.new_chat.sandbox import (
is_sandbox_enabled,
persist_and_delete_sandbox,
)
if not is_sandbox_enabled():
return
async def _run() -> None:
try:
await persist_and_delete_sandbox(thread_id, sandbox_files)
except Exception:
logging.getLogger(__name__).warning(
"persist_and_delete_sandbox failed for thread %s",
thread_id,
exc_info=True,
)
try:
loop = asyncio.get_running_loop()
task = loop.create_task(_run())
_background_tasks.add(task)
task.add_done_callback(_background_tasks.discard)
except RuntimeError:
pass
async def stream_new_chat(
user_query: str,
search_space_id: int,
@ -1141,22 +1268,6 @@ async def stream_new_chat(
"[stream_new_chat] Checkpointer ready in %.3fs", time.perf_counter() - _t0
)
sandbox_backend = None
_t0 = time.perf_counter()
if is_sandbox_enabled():
try:
sandbox_backend = await get_or_create_sandbox(chat_id)
except Exception as sandbox_err:
logging.getLogger(__name__).warning(
"Sandbox creation failed, continuing without execute tool: %s",
sandbox_err,
)
_perf_log.info(
"[stream_new_chat] Sandbox provisioning in %.3fs (enabled=%s)",
time.perf_counter() - _t0,
sandbox_backend is not None,
)
visibility = thread_visibility or ChatVisibility.PRIVATE
_t0 = time.perf_counter()
agent = await create_surfsense_deep_agent(
@ -1170,7 +1281,6 @@ async def stream_new_chat(
agent_config=agent_config,
firecrawl_api_key=firecrawl_api_key,
thread_visibility=visibility,
sandbox_backend=sandbox_backend,
disabled_tools=disabled_tools,
)
_perf_log.info(
@ -1531,8 +1641,6 @@ async def stream_new_chat(
"Failed to clear AI responding state for thread %s", chat_id
)
_try_persist_and_delete_sandbox(chat_id, stream_result.sandbox_files)
with contextlib.suppress(Exception):
session.expunge_all()
@ -1541,7 +1649,7 @@ async def stream_new_chat(
# Break circular refs held by the agent graph, tools, and LLM
# wrappers so the GC can reclaim them in a single pass.
agent = llm = connector_service = sandbox_backend = None
agent = llm = connector_service = None
input_state = stream_result = None
session = None
@ -1627,22 +1735,6 @@ async def stream_resume_chat(
"[stream_resume] Checkpointer ready in %.3fs", time.perf_counter() - _t0
)
sandbox_backend = None
_t0 = time.perf_counter()
if is_sandbox_enabled():
try:
sandbox_backend = await get_or_create_sandbox(chat_id)
except Exception as sandbox_err:
logging.getLogger(__name__).warning(
"Sandbox creation failed, continuing without execute tool: %s",
sandbox_err,
)
_perf_log.info(
"[stream_resume] Sandbox provisioning in %.3fs (enabled=%s)",
time.perf_counter() - _t0,
sandbox_backend is not None,
)
visibility = thread_visibility or ChatVisibility.PRIVATE
_t0 = time.perf_counter()
@ -1657,7 +1749,6 @@ async def stream_resume_chat(
agent_config=agent_config,
firecrawl_api_key=firecrawl_api_key,
thread_visibility=visibility,
sandbox_backend=sandbox_backend,
)
_perf_log.info(
"[stream_resume] Agent created in %.3fs", time.perf_counter() - _t0
@ -1742,15 +1833,13 @@ async def stream_resume_chat(
"Failed to clear AI responding state for thread %s", chat_id
)
_try_persist_and_delete_sandbox(chat_id, stream_result.sandbox_files)
with contextlib.suppress(Exception):
session.expunge_all()
with contextlib.suppress(Exception):
await session.close()
agent = llm = connector_service = sandbox_backend = None
agent = llm = connector_service = None
stream_result = None
session = None

View file

@ -10,7 +10,10 @@ from app.connectors.confluence_history import ConfluenceHistoryConnector
from app.db import DocumentType, SearchSourceConnectorType
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_hashing import compute_content_hash
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
from app.indexing_pipeline.indexing_pipeline_service import (
IndexingPipelineService,
PlaceholderInfo,
)
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
@ -194,6 +197,27 @@ async def index_confluence_pages(
await confluence_client.close()
return 0, 0, None
# ── Create placeholders for instant UI feedback ───────────────
pipeline = IndexingPipelineService(session)
placeholders = [
PlaceholderInfo(
title=page.get("title", ""),
document_type=DocumentType.CONFLUENCE_CONNECTOR,
unique_id=page.get("id", ""),
search_space_id=search_space_id,
connector_id=connector_id,
created_by_id=user_id,
metadata={
"page_id": page.get("id", ""),
"connector_id": connector_id,
"connector_type": "Confluence",
},
)
for page in pages
if page.get("id") and page.get("title")
]
await pipeline.create_placeholder_documents(placeholders)
documents_skipped = 0
duplicate_content_count = 0
connector_docs: list[ConnectorDocument] = []
@ -202,7 +226,7 @@ async def index_confluence_pages(
try:
page_id = page.get("id")
page_title = page.get("title", "")
space_id = page.get("spaceId", "")
page.get("spaceId", "")
if not page_id or not page_title:
logger.warning(
@ -265,11 +289,12 @@ async def index_confluence_pages(
connector_docs.append(doc)
except Exception as e:
logger.error(f"Error building ConnectorDocument for page: {e!s}", exc_info=True)
logger.error(
f"Error building ConnectorDocument for page: {e!s}", exc_info=True
)
documents_skipped += 1
continue
pipeline = IndexingPipelineService(session)
await pipeline.migrate_legacy_docs(connector_docs)
async def _get_llm(s: AsyncSession):

View file

@ -16,7 +16,10 @@ from app.connectors.google_calendar_connector import GoogleCalendarConnector
from app.db import DocumentType, SearchSourceConnectorType
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_hashing import compute_content_hash
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
from app.indexing_pipeline.indexing_pipeline_service import (
IndexingPipelineService,
PlaceholderInfo,
)
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.google_credentials import (
@ -73,9 +76,7 @@ def _build_connector_doc(
"connector_type": "Google Calendar",
}
fallback_summary = (
f"Google Calendar Event: {event_summary}\n\n{event_markdown}"
)
fallback_summary = f"Google Calendar Event: {event_summary}\n\n{event_markdown}"
return ConnectorDocument(
title=event_summary,
@ -344,6 +345,27 @@ async def index_google_calendar_events(
logger.error(f"Error fetching Google Calendar events: {e!s}", exc_info=True)
return 0, 0, f"Error fetching Google Calendar events: {e!s}"
# ── Create placeholders for instant UI feedback ───────────────
pipeline = IndexingPipelineService(session)
placeholders = [
PlaceholderInfo(
title=event.get("summary", "No Title"),
document_type=DocumentType.GOOGLE_CALENDAR_CONNECTOR,
unique_id=event.get("id", ""),
search_space_id=search_space_id,
connector_id=connector_id,
created_by_id=user_id,
metadata={
"event_id": event.get("id", ""),
"connector_id": connector_id,
"connector_type": "Google Calendar",
},
)
for event in events
if event.get("id")
]
await pipeline.create_placeholder_documents(placeholders)
# ── Build ConnectorDocuments ──────────────────────────────────
connector_docs: list[ConnectorDocument] = []
documents_skipped = 0
@ -391,13 +413,13 @@ async def index_google_calendar_events(
connector_docs.append(doc)
except Exception as e:
logger.error(f"Error building ConnectorDocument for event: {e!s}", exc_info=True)
logger.error(
f"Error building ConnectorDocument for event: {e!s}", exc_info=True
)
documents_skipped += 1
continue
# ── Pipeline: migrate legacy docs + parallel index ─────────────
pipeline = IndexingPipelineService(session)
await pipeline.migrate_legacy_docs(connector_docs)
async def _get_llm(s):

View file

@ -29,7 +29,10 @@ from app.connectors.google_drive.file_types import should_skip_file as skip_mime
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_hashing import compute_identifier_hash
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
from app.indexing_pipeline.indexing_pipeline_service import (
IndexingPipelineService,
PlaceholderInfo,
)
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.tasks.connector_indexers.base import (
@ -57,6 +60,7 @@ logger = logging.getLogger(__name__)
# Helpers
# ---------------------------------------------------------------------------
async def _should_skip_file(
session: AsyncSession,
file: dict,
@ -97,11 +101,14 @@ async def _should_skip_file(
result = await session.execute(
select(Document).where(
Document.search_space_id == search_space_id,
Document.document_type.in_([
DocumentType.GOOGLE_DRIVE_FILE,
DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
]),
cast(Document.document_metadata["google_drive_file_id"], String) == file_id,
Document.document_type.in_(
[
DocumentType.GOOGLE_DRIVE_FILE,
DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
]
),
cast(Document.document_metadata["google_drive_file_id"], String)
== file_id,
)
)
existing = result.scalar_one_or_none()
@ -191,6 +198,50 @@ def _build_connector_doc(
)
async def _create_drive_placeholders(
session: AsyncSession,
files: list[dict],
*,
connector_id: int,
search_space_id: int,
user_id: str,
) -> None:
"""Create placeholder document rows for discovered Drive files.
Called immediately after file discovery (Phase 1) so documents appear
in the UI via Zero sync before the slow download/ETL phase begins.
"""
if not files:
return
placeholders = []
for file in files:
file_id = file.get("id")
file_name = file.get("name", "Unknown")
if not file_id:
continue
placeholders.append(
PlaceholderInfo(
title=file_name,
document_type=DocumentType.GOOGLE_DRIVE_FILE,
unique_id=file_id,
search_space_id=search_space_id,
connector_id=connector_id,
created_by_id=user_id,
metadata={
"google_drive_file_id": file_id,
"FILE_NAME": file_name,
"connector_id": connector_id,
"connector_type": "Google Drive",
},
)
)
if placeholders:
pipeline = IndexingPipelineService(session)
await pipeline.create_placeholder_documents(placeholders)
async def _download_files_parallel(
drive_client: GoogleDriveClient,
files: list[dict],
@ -246,9 +297,7 @@ async def _download_files_parallel(
failed = 0
for outcome in outcomes:
if isinstance(outcome, Exception):
failed += 1
elif outcome is None:
if isinstance(outcome, Exception) or outcome is None:
failed += 1
else:
results.append(outcome)
@ -300,14 +349,18 @@ async def _process_single_file(
if not documents:
return 0, 1, 0
from app.indexing_pipeline.document_hashing import compute_unique_identifier_hash
from app.indexing_pipeline.document_hashing import (
compute_unique_identifier_hash,
)
doc_map = {compute_unique_identifier_hash(doc): doc}
for document in documents:
connector_doc = doc_map.get(document.unique_identifier_hash)
if not connector_doc:
continue
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
await pipeline.index(document, connector_doc, user_llm)
logger.info(f"Successfully indexed Google Drive file: {file_name}")
@ -335,11 +388,14 @@ async def _remove_document(session: AsyncSession, file_id: str, search_space_id:
result = await session.execute(
select(Document).where(
Document.search_space_id == search_space_id,
Document.document_type.in_([
DocumentType.GOOGLE_DRIVE_FILE,
DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
]),
cast(Document.document_metadata["google_drive_file_id"], String) == file_id,
Document.document_type.in_(
[
DocumentType.GOOGLE_DRIVE_FILE,
DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
]
),
cast(Document.document_metadata["google_drive_file_id"], String)
== file_id,
)
)
existing = result.scalar_one_or_none()
@ -383,7 +439,9 @@ async def _download_and_index(
return await get_user_long_context_llm(s, user_id, search_space_id)
_, batch_indexed, batch_failed = await pipeline.index_batch_parallel(
connector_docs, _get_llm, max_concurrency=3,
connector_docs,
_get_llm,
max_concurrency=3,
on_heartbeat=on_heartbeat,
)
@ -430,10 +488,22 @@ async def _index_selected_files(
files_to_download.append(file)
batch_indexed, failed = await _download_and_index(
drive_client, session, files_to_download,
connector_id=connector_id, search_space_id=search_space_id,
user_id=user_id, enable_summary=enable_summary,
await _create_drive_placeholders(
session,
files_to_download,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
)
batch_indexed, _failed = await _download_and_index(
drive_client,
session,
files_to_download,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat,
)
@ -444,6 +514,7 @@ async def _index_selected_files(
# Scan strategies
# ---------------------------------------------------------------------------
async def _index_full_scan(
drive_client: GoogleDriveClient,
session: AsyncSession,
@ -464,7 +535,11 @@ async def _index_full_scan(
await task_logger.log_task_progress(
log_entry,
f"Starting full scan of folder: {folder_name} (include_subfolders={include_subfolders})",
{"stage": "full_scan", "folder_id": folder_id, "include_subfolders": include_subfolders},
{
"stage": "full_scan",
"folder_id": folder_id,
"include_subfolders": include_subfolders,
},
)
# ------------------------------------------------------------------
@ -483,7 +558,10 @@ async def _index_full_scan(
while files_processed < max_files:
files, next_token, error = await get_files_in_folder(
drive_client, cur_id, include_subfolders=True, page_token=page_token,
drive_client,
cur_id,
include_subfolders=True,
page_token=page_token,
)
if error:
logger.error(f"Error listing files in {cur_name}: {error}")
@ -500,7 +578,9 @@ async def _index_full_scan(
mime = file.get("mimeType", "")
if mime == "application/vnd.google-apps.folder":
if include_subfolders:
folders_to_process.append((file["id"], file.get("name", "Unknown")))
folders_to_process.append(
(file["id"], file.get("name", "Unknown"))
)
continue
files_processed += 1
@ -521,24 +601,45 @@ async def _index_full_scan(
if not files_processed and first_error:
err_lower = first_error.lower()
if "401" in first_error or "invalid credentials" in err_lower or "authError" in first_error:
if (
"401" in first_error
or "invalid credentials" in err_lower
or "authError" in first_error
):
raise Exception(
f"Google Drive authentication failed. Please re-authenticate. (Error: {first_error})"
)
raise Exception(f"Failed to list Google Drive files: {first_error}")
# ------------------------------------------------------------------
# Phase 1.5: create placeholders for instant UI feedback
# ------------------------------------------------------------------
await _create_drive_placeholders(
session,
files_to_download,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
)
# ------------------------------------------------------------------
# Phase 2+3 (parallel): download, ETL, index
# ------------------------------------------------------------------
batch_indexed, failed = await _download_and_index(
drive_client, session, files_to_download,
connector_id=connector_id, search_space_id=search_space_id,
user_id=user_id, enable_summary=enable_summary,
drive_client,
session,
files_to_download,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat_callback,
)
indexed = renamed_count + batch_indexed
logger.info(f"Full scan complete: {indexed} indexed, {skipped} skipped, {failed} failed")
logger.info(
f"Full scan complete: {indexed} indexed, {skipped} skipped, {failed} failed"
)
return indexed, skipped
@ -565,7 +666,9 @@ async def _index_with_delta_sync(
{"stage": "delta_sync", "start_token": start_page_token},
)
changes, _final_token, error = await fetch_all_changes(drive_client, start_page_token, folder_id)
changes, _final_token, error = await fetch_all_changes(
drive_client, start_page_token, folder_id
)
if error:
err_lower = error.lower()
if "401" in error or "invalid credentials" in err_lower or "authError" in error:
@ -614,18 +717,35 @@ async def _index_with_delta_sync(
files_to_download.append(file)
# ------------------------------------------------------------------
# Phase 1.5: create placeholders for instant UI feedback
# ------------------------------------------------------------------
await _create_drive_placeholders(
session,
files_to_download,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
)
# ------------------------------------------------------------------
# Phase 2+3 (parallel): download, ETL, index
# ------------------------------------------------------------------
batch_indexed, failed = await _download_and_index(
drive_client, session, files_to_download,
connector_id=connector_id, search_space_id=search_space_id,
user_id=user_id, enable_summary=enable_summary,
drive_client,
session,
files_to_download,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat_callback,
)
indexed = renamed_count + batch_indexed
logger.info(f"Delta sync complete: {indexed} indexed, {skipped} skipped, {failed} failed")
logger.info(
f"Delta sync complete: {indexed} indexed, {skipped} skipped, {failed} failed"
)
return indexed, skipped
@ -633,6 +753,7 @@ async def _index_with_delta_sync(
# Public entry points
# ---------------------------------------------------------------------------
async def index_google_drive_files(
session: AsyncSession,
connector_id: int,
@ -653,8 +774,11 @@ async def index_google_drive_files(
source="connector_indexing_task",
message=f"Starting Google Drive indexing for connector {connector_id}",
metadata={
"connector_id": connector_id, "user_id": str(user_id),
"folder_id": folder_id, "use_delta_sync": use_delta_sync, "max_files": max_files,
"connector_id": connector_id,
"user_id": str(user_id),
"folder_id": folder_id,
"use_delta_sync": use_delta_sync,
"max_files": max_files,
},
)
@ -666,11 +790,14 @@ async def index_google_drive_files(
break
if not connector:
error_msg = f"Google Drive connector with ID {connector_id} not found"
await task_logger.log_task_failure(log_entry, error_msg, None, {"error_type": "ConnectorNotFound"})
await task_logger.log_task_failure(
log_entry, error_msg, None, {"error_type": "ConnectorNotFound"}
)
return 0, 0, error_msg
await task_logger.log_task_progress(
log_entry, f"Initializing Google Drive client for connector {connector_id}",
log_entry,
f"Initializing Google Drive client for connector {connector_id}",
{"stage": "client_initialization"},
)
@ -679,24 +806,39 @@ async def index_google_drive_files(
connected_account_id = connector.config.get("composio_connected_account_id")
if not connected_account_id:
error_msg = f"Composio connected_account_id not found for connector {connector_id}"
await task_logger.log_task_failure(log_entry, error_msg, "Missing Composio account", {"error_type": "MissingComposioAccount"})
await task_logger.log_task_failure(
log_entry,
error_msg,
"Missing Composio account",
{"error_type": "MissingComposioAccount"},
)
return 0, 0, error_msg
pre_built_credentials = build_composio_credentials(connected_account_id)
else:
token_encrypted = connector.config.get("_token_encrypted", False)
if token_encrypted and not config.SECRET_KEY:
await task_logger.log_task_failure(
log_entry, "SECRET_KEY not configured but credentials are encrypted",
"Missing SECRET_KEY", {"error_type": "MissingSecretKey"},
log_entry,
"SECRET_KEY not configured but credentials are encrypted",
"Missing SECRET_KEY",
{"error_type": "MissingSecretKey"},
)
return (
0,
0,
"SECRET_KEY not configured but credentials are marked as encrypted",
)
return 0, 0, "SECRET_KEY not configured but credentials are marked as encrypted"
connector_enable_summary = getattr(connector, "enable_summary", True)
drive_client = GoogleDriveClient(session, connector_id, credentials=pre_built_credentials)
drive_client = GoogleDriveClient(
session, connector_id, credentials=pre_built_credentials
)
if not folder_id:
error_msg = "folder_id is required for Google Drive indexing"
await task_logger.log_task_failure(log_entry, error_msg, {"error_type": "MissingParameter"})
await task_logger.log_task_failure(
log_entry, error_msg, {"error_type": "MissingParameter"}
)
return 0, 0, error_msg
target_folder_id = folder_id
@ -704,29 +846,64 @@ async def index_google_drive_files(
folder_tokens = connector.config.get("folder_tokens", {})
start_page_token = folder_tokens.get(target_folder_id)
can_use_delta = use_delta_sync and start_page_token and connector.last_indexed_at
can_use_delta = (
use_delta_sync and start_page_token and connector.last_indexed_at
)
if can_use_delta:
logger.info(f"Using delta sync for connector {connector_id}")
documents_indexed, documents_skipped = await _index_with_delta_sync(
drive_client, session, connector, connector_id, search_space_id, user_id,
target_folder_id, start_page_token, task_logger, log_entry, max_files,
include_subfolders, on_heartbeat_callback, connector_enable_summary,
drive_client,
session,
connector,
connector_id,
search_space_id,
user_id,
target_folder_id,
start_page_token,
task_logger,
log_entry,
max_files,
include_subfolders,
on_heartbeat_callback,
connector_enable_summary,
)
logger.info("Running reconciliation scan after delta sync")
ri, rs = await _index_full_scan(
drive_client, session, connector, connector_id, search_space_id, user_id,
target_folder_id, target_folder_name, task_logger, log_entry, max_files,
include_subfolders, on_heartbeat_callback, connector_enable_summary,
drive_client,
session,
connector,
connector_id,
search_space_id,
user_id,
target_folder_id,
target_folder_name,
task_logger,
log_entry,
max_files,
include_subfolders,
on_heartbeat_callback,
connector_enable_summary,
)
documents_indexed += ri
documents_skipped += rs
else:
logger.info(f"Using full scan for connector {connector_id}")
documents_indexed, documents_skipped = await _index_full_scan(
drive_client, session, connector, connector_id, search_space_id, user_id,
target_folder_id, target_folder_name, task_logger, log_entry, max_files,
include_subfolders, on_heartbeat_callback, connector_enable_summary,
drive_client,
session,
connector,
connector_id,
search_space_id,
user_id,
target_folder_id,
target_folder_name,
task_logger,
log_entry,
max_files,
include_subfolders,
on_heartbeat_callback,
connector_enable_summary,
)
if documents_indexed > 0 or can_use_delta:
@ -745,26 +922,34 @@ async def index_google_drive_files(
log_entry,
f"Successfully completed Google Drive indexing for connector {connector_id}",
{
"files_processed": documents_indexed, "files_skipped": documents_skipped,
"sync_type": "delta" if can_use_delta else "full", "folder": target_folder_name,
"files_processed": documents_indexed,
"files_skipped": documents_skipped,
"sync_type": "delta" if can_use_delta else "full",
"folder": target_folder_name,
},
)
logger.info(f"Google Drive indexing completed: {documents_indexed} indexed, {documents_skipped} skipped")
logger.info(
f"Google Drive indexing completed: {documents_indexed} indexed, {documents_skipped} skipped"
)
return documents_indexed, documents_skipped, None
except SQLAlchemyError as db_error:
await session.rollback()
await task_logger.log_task_failure(
log_entry, f"Database error during Google Drive indexing for connector {connector_id}",
str(db_error), {"error_type": "SQLAlchemyError"},
log_entry,
f"Database error during Google Drive indexing for connector {connector_id}",
str(db_error),
{"error_type": "SQLAlchemyError"},
)
logger.error(f"Database error: {db_error!s}", exc_info=True)
return 0, 0, f"Database error: {db_error!s}"
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
log_entry, f"Failed to index Google Drive files for connector {connector_id}",
str(e), {"error_type": type(e).__name__},
log_entry,
f"Failed to index Google Drive files for connector {connector_id}",
str(e),
{"error_type": type(e).__name__},
)
logger.error(f"Failed to index Google Drive files: {e!s}", exc_info=True)
return 0, 0, f"Failed to index Google Drive files: {e!s}"
@ -784,7 +969,12 @@ async def index_google_drive_single_file(
task_name="google_drive_single_file_indexing",
source="connector_indexing_task",
message=f"Starting Google Drive single file indexing for file {file_id}",
metadata={"connector_id": connector_id, "user_id": str(user_id), "file_id": file_id, "file_name": file_name},
metadata={
"connector_id": connector_id,
"user_id": str(user_id),
"file_id": file_id,
"file_name": file_name,
},
)
try:
@ -795,7 +985,9 @@ async def index_google_drive_single_file(
break
if not connector:
error_msg = f"Google Drive connector with ID {connector_id} not found"
await task_logger.log_task_failure(log_entry, error_msg, None, {"error_type": "ConnectorNotFound"})
await task_logger.log_task_failure(
log_entry, error_msg, None, {"error_type": "ConnectorNotFound"}
)
return 0, error_msg
pre_built_credentials = None
@ -803,43 +995,65 @@ async def index_google_drive_single_file(
connected_account_id = connector.config.get("composio_connected_account_id")
if not connected_account_id:
error_msg = f"Composio connected_account_id not found for connector {connector_id}"
await task_logger.log_task_failure(log_entry, error_msg, "Missing Composio account", {"error_type": "MissingComposioAccount"})
await task_logger.log_task_failure(
log_entry,
error_msg,
"Missing Composio account",
{"error_type": "MissingComposioAccount"},
)
return 0, error_msg
pre_built_credentials = build_composio_credentials(connected_account_id)
else:
token_encrypted = connector.config.get("_token_encrypted", False)
if token_encrypted and not config.SECRET_KEY:
await task_logger.log_task_failure(
log_entry, "SECRET_KEY not configured but credentials are encrypted",
"Missing SECRET_KEY", {"error_type": "MissingSecretKey"},
log_entry,
"SECRET_KEY not configured but credentials are encrypted",
"Missing SECRET_KEY",
{"error_type": "MissingSecretKey"},
)
return (
0,
"SECRET_KEY not configured but credentials are marked as encrypted",
)
return 0, "SECRET_KEY not configured but credentials are marked as encrypted"
connector_enable_summary = getattr(connector, "enable_summary", True)
drive_client = GoogleDriveClient(session, connector_id, credentials=pre_built_credentials)
drive_client = GoogleDriveClient(
session, connector_id, credentials=pre_built_credentials
)
file, error = await get_file_by_id(drive_client, file_id)
if error or not file:
error_msg = f"Failed to fetch file {file_id}: {error or 'File not found'}"
await task_logger.log_task_failure(log_entry, error_msg, {"error_type": "FileNotFound"})
await task_logger.log_task_failure(
log_entry, error_msg, {"error_type": "FileNotFound"}
)
return 0, error_msg
display_name = file_name or file.get("name", "Unknown")
indexed, _skipped, failed = await _process_single_file(
drive_client, session, file,
connector_id, search_space_id, user_id, connector_enable_summary,
drive_client,
session,
file,
connector_id,
search_space_id,
user_id,
connector_enable_summary,
)
await session.commit()
if failed > 0:
error_msg = f"Failed to index file {display_name}"
await task_logger.log_task_failure(log_entry, error_msg, {"file_name": display_name, "file_id": file_id})
await task_logger.log_task_failure(
log_entry, error_msg, {"file_name": display_name, "file_id": file_id}
)
return 0, error_msg
if indexed > 0:
await task_logger.log_task_success(
log_entry, f"Successfully indexed file {display_name}",
log_entry,
f"Successfully indexed file {display_name}",
{"file_name": display_name, "file_id": file_id},
)
return 1, None
@ -848,12 +1062,22 @@ async def index_google_drive_single_file(
except SQLAlchemyError as db_error:
await session.rollback()
await task_logger.log_task_failure(log_entry, "Database error during file indexing", str(db_error), {"error_type": "SQLAlchemyError"})
await task_logger.log_task_failure(
log_entry,
"Database error during file indexing",
str(db_error),
{"error_type": "SQLAlchemyError"},
)
logger.error(f"Database error: {db_error!s}", exc_info=True)
return 0, f"Database error: {db_error!s}"
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(log_entry, "Failed to index Google Drive file", str(e), {"error_type": type(e).__name__})
await task_logger.log_task_failure(
log_entry,
"Failed to index Google Drive file",
str(e),
{"error_type": type(e).__name__},
)
logger.error(f"Failed to index Google Drive file: {e!s}", exc_info=True)
return 0, f"Failed to index Google Drive file: {e!s}"
@ -878,7 +1102,11 @@ async def index_google_drive_selected_files(
task_name="google_drive_selected_files_indexing",
source="connector_indexing_task",
message=f"Starting Google Drive batch file indexing for {len(files)} files",
metadata={"connector_id": connector_id, "user_id": str(user_id), "file_count": len(files)},
metadata={
"connector_id": connector_id,
"user_id": str(user_id),
"file_count": len(files),
},
)
try:
@ -889,7 +1117,9 @@ async def index_google_drive_selected_files(
break
if not connector:
error_msg = f"Google Drive connector with ID {connector_id} not found"
await task_logger.log_task_failure(log_entry, error_msg, None, {"error_type": "ConnectorNotFound"})
await task_logger.log_task_failure(
log_entry, error_msg, None, {"error_type": "ConnectorNotFound"}
)
return 0, 0, [error_msg]
pre_built_credentials = None
@ -897,25 +1127,41 @@ async def index_google_drive_selected_files(
connected_account_id = connector.config.get("composio_connected_account_id")
if not connected_account_id:
error_msg = f"Composio connected_account_id not found for connector {connector_id}"
await task_logger.log_task_failure(log_entry, error_msg, "Missing Composio account", {"error_type": "MissingComposioAccount"})
await task_logger.log_task_failure(
log_entry,
error_msg,
"Missing Composio account",
{"error_type": "MissingComposioAccount"},
)
return 0, 0, [error_msg]
pre_built_credentials = build_composio_credentials(connected_account_id)
else:
token_encrypted = connector.config.get("_token_encrypted", False)
if token_encrypted and not config.SECRET_KEY:
error_msg = "SECRET_KEY not configured but credentials are marked as encrypted"
error_msg = (
"SECRET_KEY not configured but credentials are marked as encrypted"
)
await task_logger.log_task_failure(
log_entry, error_msg, "Missing SECRET_KEY", {"error_type": "MissingSecretKey"},
log_entry,
error_msg,
"Missing SECRET_KEY",
{"error_type": "MissingSecretKey"},
)
return 0, 0, [error_msg]
connector_enable_summary = getattr(connector, "enable_summary", True)
drive_client = GoogleDriveClient(session, connector_id, credentials=pre_built_credentials)
drive_client = GoogleDriveClient(
session, connector_id, credentials=pre_built_credentials
)
indexed, skipped, errors = await _index_selected_files(
drive_client, session, files,
connector_id=connector_id, search_space_id=search_space_id,
user_id=user_id, enable_summary=connector_enable_summary,
drive_client,
session,
files,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=connector_enable_summary,
on_heartbeat=on_heartbeat_callback,
)
@ -935,18 +1181,24 @@ async def index_google_drive_selected_files(
{"indexed": indexed, "skipped": skipped},
)
logger.info(f"Selected files indexing: {indexed} indexed, {skipped} skipped, {len(errors)} errors")
logger.info(
f"Selected files indexing: {indexed} indexed, {skipped} skipped, {len(errors)} errors"
)
return indexed, skipped, errors
except SQLAlchemyError as db_error:
await session.rollback()
error_msg = f"Database error: {db_error!s}"
await task_logger.log_task_failure(log_entry, error_msg, str(db_error), {"error_type": "SQLAlchemyError"})
await task_logger.log_task_failure(
log_entry, error_msg, str(db_error), {"error_type": "SQLAlchemyError"}
)
logger.error(error_msg, exc_info=True)
return 0, 0, [error_msg]
except Exception as e:
await session.rollback()
error_msg = f"Failed to index Google Drive files: {e!s}"
await task_logger.log_task_failure(log_entry, error_msg, str(e), {"error_type": type(e).__name__})
await task_logger.log_task_failure(
log_entry, error_msg, str(e), {"error_type": type(e).__name__}
)
logger.error(error_msg, exc_info=True)
return 0, 0, [error_msg]

View file

@ -16,7 +16,10 @@ from app.connectors.google_gmail_connector import GoogleGmailConnector
from app.db import DocumentType, SearchSourceConnectorType
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_hashing import compute_content_hash
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
from app.indexing_pipeline.indexing_pipeline_service import (
IndexingPipelineService,
PlaceholderInfo,
)
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.google_credentials import (
@ -282,6 +285,34 @@ async def index_google_gmail_messages(
logger.info(f"Found {len(messages)} Google gmail messages to index")
# ── Create placeholders for instant UI feedback ───────────────
pipeline = IndexingPipelineService(session)
def _gmail_subject(msg: dict) -> str:
for h in msg.get("payload", {}).get("headers", []):
if h.get("name", "").lower() == "subject":
return h.get("value", "No Subject")
return "No Subject"
placeholders = [
PlaceholderInfo(
title=_gmail_subject(msg),
document_type=DocumentType.GOOGLE_GMAIL_CONNECTOR,
unique_id=msg.get("id", ""),
search_space_id=search_space_id,
connector_id=connector_id,
created_by_id=user_id,
metadata={
"message_id": msg.get("id", ""),
"connector_id": connector_id,
"connector_type": "Google Gmail",
},
)
for msg in messages
if msg.get("id")
]
await pipeline.create_placeholder_documents(placeholders)
# ── Build ConnectorDocuments ──────────────────────────────────
connector_docs: list[ConnectorDocument] = []
documents_skipped = 0
@ -327,13 +358,14 @@ async def index_google_gmail_messages(
connector_docs.append(doc)
except Exception as e:
logger.error(f"Error building ConnectorDocument for message: {e!s}", exc_info=True)
logger.error(
f"Error building ConnectorDocument for message: {e!s}",
exc_info=True,
)
documents_skipped += 1
continue
# ── Pipeline: migrate legacy docs + parallel index ─────────────
pipeline = IndexingPipelineService(session)
await pipeline.migrate_legacy_docs(connector_docs)
async def _get_llm(s):

View file

@ -10,7 +10,10 @@ from app.connectors.jira_history import JiraHistoryConnector
from app.db import DocumentType, SearchSourceConnectorType
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_hashing import compute_content_hash
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
from app.indexing_pipeline.indexing_pipeline_service import (
IndexingPipelineService,
PlaceholderInfo,
)
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
@ -191,6 +194,27 @@ async def index_jira_issues(
await jira_client.close()
return 0, 0, None
# ── Create placeholders for instant UI feedback ───────────────
pipeline = IndexingPipelineService(session)
placeholders = [
PlaceholderInfo(
title=f"{issue.get('key', '')}: {issue.get('id', '')}",
document_type=DocumentType.JIRA_CONNECTOR,
unique_id=issue.get("key", ""),
search_space_id=search_space_id,
connector_id=connector_id,
created_by_id=user_id,
metadata={
"issue_id": issue.get("key", ""),
"connector_id": connector_id,
"connector_type": "Jira",
},
)
for issue in issues
if issue.get("key") and issue.get("id")
]
await pipeline.create_placeholder_documents(placeholders)
connector_docs: list[ConnectorDocument] = []
documents_skipped = 0
duplicate_content_count = 0
@ -253,7 +277,6 @@ async def index_jira_issues(
documents_skipped += 1
continue
pipeline = IndexingPipelineService(session)
await pipeline.migrate_legacy_docs(connector_docs)
async def _get_llm(s: AsyncSession):

View file

@ -14,7 +14,10 @@ from app.connectors.linear_connector import LinearConnector
from app.db import DocumentType, SearchSourceConnectorType
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_hashing import compute_content_hash
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
from app.indexing_pipeline.indexing_pipeline_service import (
IndexingPipelineService,
PlaceholderInfo,
)
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
@ -199,9 +202,7 @@ async def index_linear_issues(
logger.info(f"Retrieved {len(issues)} issues from Linear API")
except Exception as e:
logger.error(
f"Exception when calling Linear API: {e!s}", exc_info=True
)
logger.error(f"Exception when calling Linear API: {e!s}", exc_info=True)
return 0, 0, f"Failed to get Linear issues: {e!s}"
if not issues:
@ -213,6 +214,28 @@ async def index_linear_issues(
await session.commit()
return 0, 0, None
# ── Create placeholders for instant UI feedback ───────────────
pipeline = IndexingPipelineService(session)
placeholders = [
PlaceholderInfo(
title=f"{issue.get('identifier', '')}: {issue.get('title', '')}",
document_type=DocumentType.LINEAR_CONNECTOR,
unique_id=issue.get("id", ""),
search_space_id=search_space_id,
connector_id=connector_id,
created_by_id=user_id,
metadata={
"issue_id": issue.get("id", ""),
"issue_identifier": issue.get("identifier", ""),
"connector_id": connector_id,
"connector_type": "Linear",
},
)
for issue in issues
if issue.get("id") and issue.get("title")
]
await pipeline.create_placeholder_documents(placeholders)
# ── Build ConnectorDocuments ──────────────────────────────────
connector_docs: list[ConnectorDocument] = []
documents_skipped = 0
@ -238,9 +261,7 @@ async def index_linear_issues(
continue
formatted_issue = linear_client.format_issue(issue)
issue_content = linear_client.format_issue_to_markdown(
formatted_issue
)
issue_content = linear_client.format_issue_to_markdown(formatted_issue)
if not issue_content:
logger.warning(
@ -284,8 +305,6 @@ async def index_linear_issues(
continue
# ── Pipeline: migrate legacy docs + parallel index ────────────
pipeline = IndexingPipelineService(session)
await pipeline.migrate_legacy_docs(connector_docs)
async def _get_llm(s):
@ -302,9 +321,7 @@ async def index_linear_issues(
# ── Finalize ──────────────────────────────────────────────────
await update_connector_last_indexed(session, connector, update_last_indexed)
logger.info(
f"Final commit: Total {documents_indexed} Linear issues processed"
)
logger.info(f"Final commit: Total {documents_indexed} Linear issues processed")
try:
await session.commit()
logger.info(

View file

@ -15,7 +15,10 @@ from app.connectors.notion_history import NotionHistoryConnector
from app.db import DocumentType, SearchSourceConnectorType
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_hashing import compute_content_hash
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
from app.indexing_pipeline.indexing_pipeline_service import (
IndexingPipelineService,
PlaceholderInfo,
)
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.notion_utils import process_blocks
@ -245,13 +248,32 @@ async def index_notion_pages(
{"pages_found": 0},
)
logger.info("No Notion pages found to index")
await update_connector_last_indexed(
session, connector, update_last_indexed
)
await update_connector_last_indexed(session, connector, update_last_indexed)
await session.commit()
await notion_client.close()
return 0, 0, None
# ── Create placeholders for instant UI feedback ───────────────
pipeline = IndexingPipelineService(session)
placeholders = [
PlaceholderInfo(
title=page.get("title", f"Untitled page ({page.get('page_id', '')})"),
document_type=DocumentType.NOTION_CONNECTOR,
unique_id=page.get("page_id", ""),
search_space_id=search_space_id,
connector_id=connector_id,
created_by_id=user_id,
metadata={
"page_id": page.get("page_id", ""),
"connector_id": connector_id,
"connector_type": "Notion",
},
)
for page in pages
if page.get("page_id")
]
await pipeline.create_placeholder_documents(placeholders)
# ── Build ConnectorDocuments ──────────────────────────────────
connector_docs: list[ConnectorDocument] = []
documents_skipped = 0
@ -282,9 +304,7 @@ async def index_notion_pages(
markdown_content += process_blocks(page_content)
if not markdown_content.strip():
logger.warning(
f"Skipping page with empty markdown: {page_title}"
)
logger.warning(f"Skipping page with empty markdown: {page_title}")
documents_skipped += 1
continue
@ -322,8 +342,6 @@ async def index_notion_pages(
continue
# ── Pipeline: migrate legacy docs + parallel index ────────────
pipeline = IndexingPipelineService(session)
await pipeline.migrate_legacy_docs(connector_docs)
async def _get_llm(s):