diff --git a/surfsense_backend/app/agents/new_chat/tools/linear/create_issue.py b/surfsense_backend/app/agents/new_chat/tools/linear/create_issue.py index 781b1afe5..2b5d37903 100644 --- a/surfsense_backend/app/agents/new_chat/tools/linear/create_issue.py +++ b/surfsense_backend/app/agents/new_chat/tools/linear/create_issue.py @@ -226,12 +226,36 @@ def create_create_linear_issue_tool( logger.info( f"Linear issue created: {result.get('identifier')} - {result.get('title')}" ) + + kb_message_suffix = "" + try: + from app.services.linear import LinearKBSyncService + + kb_service = LinearKBSyncService(db_session) + kb_result = await kb_service.sync_after_create( + issue_id=result.get("id"), + issue_identifier=result.get("identifier", ""), + issue_title=result.get("title", final_title), + issue_url=result.get("url"), + description=final_description, + connector_id=actual_connector_id, + search_space_id=search_space_id, + user_id=user_id, + ) + if kb_result["status"] == "success": + kb_message_suffix = " Your knowledge base has also been updated." + else: + kb_message_suffix = " This issue will be added to your knowledge base in the next scheduled sync." + except Exception as kb_err: + logger.warning(f"KB sync after create failed: {kb_err}") + kb_message_suffix = " This issue will be added to your knowledge base in the next scheduled sync." + return { "status": "success", "issue_id": result.get("id"), "identifier": result.get("identifier"), "url": result.get("url"), - "message": result.get("message"), + "message": (result.get("message", "") + kb_message_suffix), } except Exception as e: diff --git a/surfsense_backend/app/agents/new_chat/tools/notion/create_page.py b/surfsense_backend/app/agents/new_chat/tools/notion/create_page.py index 6cb720173..13a930b03 100644 --- a/surfsense_backend/app/agents/new_chat/tools/notion/create_page.py +++ b/surfsense_backend/app/agents/new_chat/tools/notion/create_page.py @@ -228,6 +228,32 @@ def create_create_notion_page_tool( logger.info( f"create_page result: {result.get('status')} - {result.get('message', '')}" ) + + if result.get("status") == "success": + kb_message_suffix = "" + try: + from app.services.notion import NotionKBSyncService + + kb_service = NotionKBSyncService(db_session) + kb_result = await kb_service.sync_after_create( + page_id=result.get("page_id"), + page_title=result.get("title", final_title), + page_url=result.get("url"), + content=final_content, + connector_id=actual_connector_id, + search_space_id=search_space_id, + user_id=user_id, + ) + if kb_result["status"] == "success": + kb_message_suffix = " Your knowledge base has also been updated." + else: + kb_message_suffix = " This page will be added to your knowledge base in the next scheduled sync." + except Exception as kb_err: + logger.warning(f"KB sync after create failed: {kb_err}") + kb_message_suffix = " This page will be added to your knowledge base in the next scheduled sync." + + result["message"] = result.get("message", "") + kb_message_suffix + return result except Exception as e: diff --git a/surfsense_backend/app/services/linear/kb_sync_service.py b/surfsense_backend/app/services/linear/kb_sync_service.py index 4f97a3dd0..b63f5328e 100644 --- a/surfsense_backend/app/services/linear/kb_sync_service.py +++ b/surfsense_backend/app/services/linear/kb_sync_service.py @@ -4,29 +4,171 @@ from datetime import datetime from sqlalchemy.ext.asyncio import AsyncSession from app.connectors.linear_connector import LinearConnector -from app.db import Document +from app.db import Document, DocumentType from app.services.llm_service import get_user_long_context_llm from app.utils.document_converters import ( create_document_chunks, embed_text, generate_content_hash, generate_document_summary, + generate_unique_identifier_hash, ) logger = logging.getLogger(__name__) class LinearKBSyncService: - """Re-indexes a single Linear issue document after a successful update. + """Syncs Linear issue documents to the knowledge base after HITL actions. - Mirrors the indexer's Phase-2 logic exactly: fetch fresh issue content, - run generate_document_summary, create_document_chunks, then update the - document row in the knowledge base. + Provides sync_after_create (new issue) and sync_after_update (existing issue). + Both mirror the indexer's Phase-2 logic: generate summary, create chunks, + then persist the document row. """ def __init__(self, db_session: AsyncSession): self.db_session = db_session + async def sync_after_create( + self, + issue_id: str, + issue_identifier: str, + issue_title: str, + issue_url: str | None, + description: str | None, + connector_id: int, + search_space_id: int, + user_id: str, + ) -> dict: + from app.tasks.connector_indexers.base import ( + check_document_by_unique_identifier, + check_duplicate_document_by_hash, + get_current_timestamp, + safe_set_chunks, + ) + + try: + unique_hash = generate_unique_identifier_hash( + DocumentType.LINEAR_CONNECTOR, issue_id, search_space_id + ) + + existing = await check_document_by_unique_identifier( + self.db_session, unique_hash + ) + if existing: + logger.info( + "Document for Linear issue %s already exists (doc_id=%s), skipping", + issue_identifier, + existing.id, + ) + return {"status": "success"} + + indexable_content = (description or "").strip() + if not indexable_content: + indexable_content = f"Linear Issue {issue_identifier}: {issue_title}" + + issue_content = f"# {issue_identifier}: {issue_title}\n\n{indexable_content}" + + content_hash = generate_content_hash(issue_content, search_space_id) + + with self.db_session.no_autoflush: + dup = await check_duplicate_document_by_hash( + self.db_session, content_hash + ) + if dup: + logger.info( + "Content-hash collision for Linear issue %s — identical content " + "exists in doc %s. Using unique_identifier_hash as content_hash.", + issue_identifier, + dup.id, + ) + content_hash = unique_hash + + user_llm = await get_user_long_context_llm( + self.db_session, + user_id, + search_space_id, + disable_streaming=True, + ) + + doc_metadata_for_summary = { + "issue_id": issue_identifier, + "issue_title": issue_title, + "document_type": "Linear Issue", + "connector_type": "Linear", + } + + if user_llm: + summary_content, summary_embedding = await generate_document_summary( + issue_content, user_llm, doc_metadata_for_summary + ) + else: + logger.warning("No LLM configured — using fallback summary") + summary_content = ( + f"Linear Issue {issue_identifier}: {issue_title}\n\n{issue_content}" + ) + summary_embedding = embed_text(summary_content) + + chunks = await create_document_chunks(issue_content) + now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + document = Document( + title=f"{issue_identifier}: {issue_title}", + document_type=DocumentType.LINEAR_CONNECTOR, + document_metadata={ + "issue_id": issue_id, + "issue_identifier": issue_identifier, + "issue_title": issue_title, + "issue_url": issue_url, + "source_connector": "linear", + "indexed_at": now_str, + "connector_id": connector_id, + }, + content=summary_content, + content_hash=content_hash, + unique_identifier_hash=unique_hash, + embedding=summary_embedding, + search_space_id=search_space_id, + connector_id=connector_id, + updated_at=get_current_timestamp(), + ) + + self.db_session.add(document) + await self.db_session.flush() + await safe_set_chunks(self.db_session, document, chunks) + await self.db_session.commit() + + logger.info( + "KB sync after create succeeded: doc_id=%s, issue=%s, chunks=%d", + document.id, + issue_identifier, + len(chunks), + ) + return {"status": "success"} + + except Exception as e: + error_str = str(e).lower() + if ( + "duplicate key value violates unique constraint" in error_str + or "uniqueviolationerror" in error_str + ): + logger.warning( + "Duplicate constraint hit during KB sync for issue %s. " + "Rolling back — periodic indexer will handle it. Error: %s", + issue_identifier, + e, + ) + await self.db_session.rollback() + return {"status": "error", "message": "Duplicate document detected"} + + logger.error( + "KB sync after create failed for issue %s: %s", + issue_identifier, + e, + exc_info=True, + ) + await self.db_session.rollback() + return {"status": "error", "message": str(e)} + async def sync_after_update( self, document_id: int, diff --git a/surfsense_backend/app/services/notion/kb_sync_service.py b/surfsense_backend/app/services/notion/kb_sync_service.py index d6c64897f..0cfe58fd5 100644 --- a/surfsense_backend/app/services/notion/kb_sync_service.py +++ b/surfsense_backend/app/services/notion/kb_sync_service.py @@ -3,13 +3,14 @@ from datetime import datetime from sqlalchemy.ext.asyncio import AsyncSession -from app.db import Document +from app.db import Document, DocumentType from app.services.llm_service import get_user_long_context_llm from app.utils.document_converters import ( create_document_chunks, embed_text, generate_content_hash, generate_document_summary, + generate_unique_identifier_hash, ) logger = logging.getLogger(__name__) @@ -19,6 +20,143 @@ class NotionKBSyncService: def __init__(self, db_session: AsyncSession): self.db_session = db_session + async def sync_after_create( + self, + page_id: str, + page_title: str, + page_url: str | None, + content: str | None, + connector_id: int, + search_space_id: int, + user_id: str, + ) -> dict: + from app.tasks.connector_indexers.base import ( + check_document_by_unique_identifier, + check_duplicate_document_by_hash, + get_current_timestamp, + safe_set_chunks, + ) + + try: + unique_hash = generate_unique_identifier_hash( + DocumentType.NOTION_CONNECTOR, page_id, search_space_id + ) + + existing = await check_document_by_unique_identifier( + self.db_session, unique_hash + ) + if existing: + logger.info( + "Document for Notion page %s already exists (doc_id=%s), skipping", + page_id, + existing.id, + ) + return {"status": "success"} + + indexable_content = (content or "").strip() + if not indexable_content: + indexable_content = f"Notion Page: {page_title}" + + markdown_content = f"# Notion Page: {page_title}\n\n{indexable_content}" + + content_hash = generate_content_hash(markdown_content, search_space_id) + + with self.db_session.no_autoflush: + dup = await check_duplicate_document_by_hash( + self.db_session, content_hash + ) + if dup: + logger.info( + "Content-hash collision for Notion page %s — identical content " + "exists in doc %s. Using unique_identifier_hash as content_hash.", + page_id, + dup.id, + ) + content_hash = unique_hash + + user_llm = await get_user_long_context_llm( + self.db_session, + user_id, + search_space_id, + disable_streaming=True, + ) + + doc_metadata_for_summary = { + "page_title": page_title, + "page_id": page_id, + "document_type": "Notion Page", + "connector_type": "Notion", + } + + if user_llm: + summary_content, summary_embedding = await generate_document_summary( + markdown_content, user_llm, doc_metadata_for_summary + ) + else: + logger.warning("No LLM configured — using fallback summary") + summary_content = f"Notion Page: {page_title}\n\n{markdown_content}" + summary_embedding = embed_text(summary_content) + + chunks = await create_document_chunks(markdown_content) + now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + document = Document( + title=page_title, + document_type=DocumentType.NOTION_CONNECTOR, + document_metadata={ + "page_title": page_title, + "page_id": page_id, + "page_url": page_url, + "source_connector": "notion", + "indexed_at": now_str, + "connector_id": connector_id, + }, + content=summary_content, + content_hash=content_hash, + unique_identifier_hash=unique_hash, + embedding=summary_embedding, + search_space_id=search_space_id, + connector_id=connector_id, + updated_at=get_current_timestamp(), + ) + + self.db_session.add(document) + await self.db_session.flush() + await safe_set_chunks(self.db_session, document, chunks) + await self.db_session.commit() + + logger.info( + "KB sync after create succeeded: doc_id=%s, page=%s, chunks=%d", + document.id, + page_title, + len(chunks), + ) + return {"status": "success"} + + except Exception as e: + error_str = str(e).lower() + if ( + "duplicate key value violates unique constraint" in error_str + or "uniqueviolationerror" in error_str + ): + logger.warning( + "Duplicate constraint hit during KB sync for page %s. " + "Rolling back — periodic indexer will handle it. Error: %s", + page_id, + e, + ) + await self.db_session.rollback() + return {"status": "error", "message": "Duplicate document detected"} + + logger.error( + "KB sync after create failed for page %s: %s", + page_id, + e, + exc_info=True, + ) + await self.db_session.rollback() + return {"status": "error", "message": str(e)} + async def sync_after_update( self, document_id: int,