mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-06-02 19:55:18 +02:00
feat: implement knowledge base synchronization for Linear and Notion issue creation
- Added `sync_after_create` methods in `LinearKBSyncService` and `NotionKBSyncService` to handle synchronization of newly created issues and pages with the knowledge base. - Enhanced the `create_issue.py` and `create_page.py` tools to provide user feedback on the success of the knowledge base update, indicating whether the content has been synced or will be added in the next scheduled sync. - Improved error handling during synchronization to log failures and manage duplicate document scenarios effectively.
This commit is contained in:
parent
23c23c7528
commit
8850fac722
4 changed files with 337 additions and 7 deletions
|
|
@ -226,12 +226,36 @@ def create_create_linear_issue_tool(
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Linear issue created: {result.get('identifier')} - {result.get('title')}"
|
f"Linear issue created: {result.get('identifier')} - {result.get('title')}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
kb_message_suffix = ""
|
||||||
|
try:
|
||||||
|
from app.services.linear import LinearKBSyncService
|
||||||
|
|
||||||
|
kb_service = LinearKBSyncService(db_session)
|
||||||
|
kb_result = await kb_service.sync_after_create(
|
||||||
|
issue_id=result.get("id"),
|
||||||
|
issue_identifier=result.get("identifier", ""),
|
||||||
|
issue_title=result.get("title", final_title),
|
||||||
|
issue_url=result.get("url"),
|
||||||
|
description=final_description,
|
||||||
|
connector_id=actual_connector_id,
|
||||||
|
search_space_id=search_space_id,
|
||||||
|
user_id=user_id,
|
||||||
|
)
|
||||||
|
if kb_result["status"] == "success":
|
||||||
|
kb_message_suffix = " Your knowledge base has also been updated."
|
||||||
|
else:
|
||||||
|
kb_message_suffix = " This issue will be added to your knowledge base in the next scheduled sync."
|
||||||
|
except Exception as kb_err:
|
||||||
|
logger.warning(f"KB sync after create failed: {kb_err}")
|
||||||
|
kb_message_suffix = " This issue will be added to your knowledge base in the next scheduled sync."
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"status": "success",
|
"status": "success",
|
||||||
"issue_id": result.get("id"),
|
"issue_id": result.get("id"),
|
||||||
"identifier": result.get("identifier"),
|
"identifier": result.get("identifier"),
|
||||||
"url": result.get("url"),
|
"url": result.get("url"),
|
||||||
"message": result.get("message"),
|
"message": (result.get("message", "") + kb_message_suffix),
|
||||||
}
|
}
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|
|
||||||
|
|
@ -228,6 +228,32 @@ def create_create_notion_page_tool(
|
||||||
logger.info(
|
logger.info(
|
||||||
f"create_page result: {result.get('status')} - {result.get('message', '')}"
|
f"create_page result: {result.get('status')} - {result.get('message', '')}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if result.get("status") == "success":
|
||||||
|
kb_message_suffix = ""
|
||||||
|
try:
|
||||||
|
from app.services.notion import NotionKBSyncService
|
||||||
|
|
||||||
|
kb_service = NotionKBSyncService(db_session)
|
||||||
|
kb_result = await kb_service.sync_after_create(
|
||||||
|
page_id=result.get("page_id"),
|
||||||
|
page_title=result.get("title", final_title),
|
||||||
|
page_url=result.get("url"),
|
||||||
|
content=final_content,
|
||||||
|
connector_id=actual_connector_id,
|
||||||
|
search_space_id=search_space_id,
|
||||||
|
user_id=user_id,
|
||||||
|
)
|
||||||
|
if kb_result["status"] == "success":
|
||||||
|
kb_message_suffix = " Your knowledge base has also been updated."
|
||||||
|
else:
|
||||||
|
kb_message_suffix = " This page will be added to your knowledge base in the next scheduled sync."
|
||||||
|
except Exception as kb_err:
|
||||||
|
logger.warning(f"KB sync after create failed: {kb_err}")
|
||||||
|
kb_message_suffix = " This page will be added to your knowledge base in the next scheduled sync."
|
||||||
|
|
||||||
|
result["message"] = result.get("message", "") + kb_message_suffix
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|
|
||||||
|
|
@ -4,29 +4,171 @@ from datetime import datetime
|
||||||
from sqlalchemy.ext.asyncio import AsyncSession
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
from app.connectors.linear_connector import LinearConnector
|
from app.connectors.linear_connector import LinearConnector
|
||||||
from app.db import Document
|
from app.db import Document, DocumentType
|
||||||
from app.services.llm_service import get_user_long_context_llm
|
from app.services.llm_service import get_user_long_context_llm
|
||||||
from app.utils.document_converters import (
|
from app.utils.document_converters import (
|
||||||
create_document_chunks,
|
create_document_chunks,
|
||||||
embed_text,
|
embed_text,
|
||||||
generate_content_hash,
|
generate_content_hash,
|
||||||
generate_document_summary,
|
generate_document_summary,
|
||||||
|
generate_unique_identifier_hash,
|
||||||
)
|
)
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class LinearKBSyncService:
|
class LinearKBSyncService:
|
||||||
"""Re-indexes a single Linear issue document after a successful update.
|
"""Syncs Linear issue documents to the knowledge base after HITL actions.
|
||||||
|
|
||||||
Mirrors the indexer's Phase-2 logic exactly: fetch fresh issue content,
|
Provides sync_after_create (new issue) and sync_after_update (existing issue).
|
||||||
run generate_document_summary, create_document_chunks, then update the
|
Both mirror the indexer's Phase-2 logic: generate summary, create chunks,
|
||||||
document row in the knowledge base.
|
then persist the document row.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, db_session: AsyncSession):
|
def __init__(self, db_session: AsyncSession):
|
||||||
self.db_session = db_session
|
self.db_session = db_session
|
||||||
|
|
||||||
|
async def sync_after_create(
|
||||||
|
self,
|
||||||
|
issue_id: str,
|
||||||
|
issue_identifier: str,
|
||||||
|
issue_title: str,
|
||||||
|
issue_url: str | None,
|
||||||
|
description: str | None,
|
||||||
|
connector_id: int,
|
||||||
|
search_space_id: int,
|
||||||
|
user_id: str,
|
||||||
|
) -> dict:
|
||||||
|
from app.tasks.connector_indexers.base import (
|
||||||
|
check_document_by_unique_identifier,
|
||||||
|
check_duplicate_document_by_hash,
|
||||||
|
get_current_timestamp,
|
||||||
|
safe_set_chunks,
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
unique_hash = generate_unique_identifier_hash(
|
||||||
|
DocumentType.LINEAR_CONNECTOR, issue_id, search_space_id
|
||||||
|
)
|
||||||
|
|
||||||
|
existing = await check_document_by_unique_identifier(
|
||||||
|
self.db_session, unique_hash
|
||||||
|
)
|
||||||
|
if existing:
|
||||||
|
logger.info(
|
||||||
|
"Document for Linear issue %s already exists (doc_id=%s), skipping",
|
||||||
|
issue_identifier,
|
||||||
|
existing.id,
|
||||||
|
)
|
||||||
|
return {"status": "success"}
|
||||||
|
|
||||||
|
indexable_content = (description or "").strip()
|
||||||
|
if not indexable_content:
|
||||||
|
indexable_content = f"Linear Issue {issue_identifier}: {issue_title}"
|
||||||
|
|
||||||
|
issue_content = f"# {issue_identifier}: {issue_title}\n\n{indexable_content}"
|
||||||
|
|
||||||
|
content_hash = generate_content_hash(issue_content, search_space_id)
|
||||||
|
|
||||||
|
with self.db_session.no_autoflush:
|
||||||
|
dup = await check_duplicate_document_by_hash(
|
||||||
|
self.db_session, content_hash
|
||||||
|
)
|
||||||
|
if dup:
|
||||||
|
logger.info(
|
||||||
|
"Content-hash collision for Linear issue %s — identical content "
|
||||||
|
"exists in doc %s. Using unique_identifier_hash as content_hash.",
|
||||||
|
issue_identifier,
|
||||||
|
dup.id,
|
||||||
|
)
|
||||||
|
content_hash = unique_hash
|
||||||
|
|
||||||
|
user_llm = await get_user_long_context_llm(
|
||||||
|
self.db_session,
|
||||||
|
user_id,
|
||||||
|
search_space_id,
|
||||||
|
disable_streaming=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
doc_metadata_for_summary = {
|
||||||
|
"issue_id": issue_identifier,
|
||||||
|
"issue_title": issue_title,
|
||||||
|
"document_type": "Linear Issue",
|
||||||
|
"connector_type": "Linear",
|
||||||
|
}
|
||||||
|
|
||||||
|
if user_llm:
|
||||||
|
summary_content, summary_embedding = await generate_document_summary(
|
||||||
|
issue_content, user_llm, doc_metadata_for_summary
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.warning("No LLM configured — using fallback summary")
|
||||||
|
summary_content = (
|
||||||
|
f"Linear Issue {issue_identifier}: {issue_title}\n\n{issue_content}"
|
||||||
|
)
|
||||||
|
summary_embedding = embed_text(summary_content)
|
||||||
|
|
||||||
|
chunks = await create_document_chunks(issue_content)
|
||||||
|
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
|
||||||
|
document = Document(
|
||||||
|
title=f"{issue_identifier}: {issue_title}",
|
||||||
|
document_type=DocumentType.LINEAR_CONNECTOR,
|
||||||
|
document_metadata={
|
||||||
|
"issue_id": issue_id,
|
||||||
|
"issue_identifier": issue_identifier,
|
||||||
|
"issue_title": issue_title,
|
||||||
|
"issue_url": issue_url,
|
||||||
|
"source_connector": "linear",
|
||||||
|
"indexed_at": now_str,
|
||||||
|
"connector_id": connector_id,
|
||||||
|
},
|
||||||
|
content=summary_content,
|
||||||
|
content_hash=content_hash,
|
||||||
|
unique_identifier_hash=unique_hash,
|
||||||
|
embedding=summary_embedding,
|
||||||
|
search_space_id=search_space_id,
|
||||||
|
connector_id=connector_id,
|
||||||
|
updated_at=get_current_timestamp(),
|
||||||
|
)
|
||||||
|
|
||||||
|
self.db_session.add(document)
|
||||||
|
await self.db_session.flush()
|
||||||
|
await safe_set_chunks(self.db_session, document, chunks)
|
||||||
|
await self.db_session.commit()
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"KB sync after create succeeded: doc_id=%s, issue=%s, chunks=%d",
|
||||||
|
document.id,
|
||||||
|
issue_identifier,
|
||||||
|
len(chunks),
|
||||||
|
)
|
||||||
|
return {"status": "success"}
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
error_str = str(e).lower()
|
||||||
|
if (
|
||||||
|
"duplicate key value violates unique constraint" in error_str
|
||||||
|
or "uniqueviolationerror" in error_str
|
||||||
|
):
|
||||||
|
logger.warning(
|
||||||
|
"Duplicate constraint hit during KB sync for issue %s. "
|
||||||
|
"Rolling back — periodic indexer will handle it. Error: %s",
|
||||||
|
issue_identifier,
|
||||||
|
e,
|
||||||
|
)
|
||||||
|
await self.db_session.rollback()
|
||||||
|
return {"status": "error", "message": "Duplicate document detected"}
|
||||||
|
|
||||||
|
logger.error(
|
||||||
|
"KB sync after create failed for issue %s: %s",
|
||||||
|
issue_identifier,
|
||||||
|
e,
|
||||||
|
exc_info=True,
|
||||||
|
)
|
||||||
|
await self.db_session.rollback()
|
||||||
|
return {"status": "error", "message": str(e)}
|
||||||
|
|
||||||
async def sync_after_update(
|
async def sync_after_update(
|
||||||
self,
|
self,
|
||||||
document_id: int,
|
document_id: int,
|
||||||
|
|
|
||||||
|
|
@ -3,13 +3,14 @@ from datetime import datetime
|
||||||
|
|
||||||
from sqlalchemy.ext.asyncio import AsyncSession
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
from app.db import Document
|
from app.db import Document, DocumentType
|
||||||
from app.services.llm_service import get_user_long_context_llm
|
from app.services.llm_service import get_user_long_context_llm
|
||||||
from app.utils.document_converters import (
|
from app.utils.document_converters import (
|
||||||
create_document_chunks,
|
create_document_chunks,
|
||||||
embed_text,
|
embed_text,
|
||||||
generate_content_hash,
|
generate_content_hash,
|
||||||
generate_document_summary,
|
generate_document_summary,
|
||||||
|
generate_unique_identifier_hash,
|
||||||
)
|
)
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
@ -19,6 +20,143 @@ class NotionKBSyncService:
|
||||||
def __init__(self, db_session: AsyncSession):
|
def __init__(self, db_session: AsyncSession):
|
||||||
self.db_session = db_session
|
self.db_session = db_session
|
||||||
|
|
||||||
|
async def sync_after_create(
|
||||||
|
self,
|
||||||
|
page_id: str,
|
||||||
|
page_title: str,
|
||||||
|
page_url: str | None,
|
||||||
|
content: str | None,
|
||||||
|
connector_id: int,
|
||||||
|
search_space_id: int,
|
||||||
|
user_id: str,
|
||||||
|
) -> dict:
|
||||||
|
from app.tasks.connector_indexers.base import (
|
||||||
|
check_document_by_unique_identifier,
|
||||||
|
check_duplicate_document_by_hash,
|
||||||
|
get_current_timestamp,
|
||||||
|
safe_set_chunks,
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
unique_hash = generate_unique_identifier_hash(
|
||||||
|
DocumentType.NOTION_CONNECTOR, page_id, search_space_id
|
||||||
|
)
|
||||||
|
|
||||||
|
existing = await check_document_by_unique_identifier(
|
||||||
|
self.db_session, unique_hash
|
||||||
|
)
|
||||||
|
if existing:
|
||||||
|
logger.info(
|
||||||
|
"Document for Notion page %s already exists (doc_id=%s), skipping",
|
||||||
|
page_id,
|
||||||
|
existing.id,
|
||||||
|
)
|
||||||
|
return {"status": "success"}
|
||||||
|
|
||||||
|
indexable_content = (content or "").strip()
|
||||||
|
if not indexable_content:
|
||||||
|
indexable_content = f"Notion Page: {page_title}"
|
||||||
|
|
||||||
|
markdown_content = f"# Notion Page: {page_title}\n\n{indexable_content}"
|
||||||
|
|
||||||
|
content_hash = generate_content_hash(markdown_content, search_space_id)
|
||||||
|
|
||||||
|
with self.db_session.no_autoflush:
|
||||||
|
dup = await check_duplicate_document_by_hash(
|
||||||
|
self.db_session, content_hash
|
||||||
|
)
|
||||||
|
if dup:
|
||||||
|
logger.info(
|
||||||
|
"Content-hash collision for Notion page %s — identical content "
|
||||||
|
"exists in doc %s. Using unique_identifier_hash as content_hash.",
|
||||||
|
page_id,
|
||||||
|
dup.id,
|
||||||
|
)
|
||||||
|
content_hash = unique_hash
|
||||||
|
|
||||||
|
user_llm = await get_user_long_context_llm(
|
||||||
|
self.db_session,
|
||||||
|
user_id,
|
||||||
|
search_space_id,
|
||||||
|
disable_streaming=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
doc_metadata_for_summary = {
|
||||||
|
"page_title": page_title,
|
||||||
|
"page_id": page_id,
|
||||||
|
"document_type": "Notion Page",
|
||||||
|
"connector_type": "Notion",
|
||||||
|
}
|
||||||
|
|
||||||
|
if user_llm:
|
||||||
|
summary_content, summary_embedding = await generate_document_summary(
|
||||||
|
markdown_content, user_llm, doc_metadata_for_summary
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.warning("No LLM configured — using fallback summary")
|
||||||
|
summary_content = f"Notion Page: {page_title}\n\n{markdown_content}"
|
||||||
|
summary_embedding = embed_text(summary_content)
|
||||||
|
|
||||||
|
chunks = await create_document_chunks(markdown_content)
|
||||||
|
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
|
||||||
|
document = Document(
|
||||||
|
title=page_title,
|
||||||
|
document_type=DocumentType.NOTION_CONNECTOR,
|
||||||
|
document_metadata={
|
||||||
|
"page_title": page_title,
|
||||||
|
"page_id": page_id,
|
||||||
|
"page_url": page_url,
|
||||||
|
"source_connector": "notion",
|
||||||
|
"indexed_at": now_str,
|
||||||
|
"connector_id": connector_id,
|
||||||
|
},
|
||||||
|
content=summary_content,
|
||||||
|
content_hash=content_hash,
|
||||||
|
unique_identifier_hash=unique_hash,
|
||||||
|
embedding=summary_embedding,
|
||||||
|
search_space_id=search_space_id,
|
||||||
|
connector_id=connector_id,
|
||||||
|
updated_at=get_current_timestamp(),
|
||||||
|
)
|
||||||
|
|
||||||
|
self.db_session.add(document)
|
||||||
|
await self.db_session.flush()
|
||||||
|
await safe_set_chunks(self.db_session, document, chunks)
|
||||||
|
await self.db_session.commit()
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"KB sync after create succeeded: doc_id=%s, page=%s, chunks=%d",
|
||||||
|
document.id,
|
||||||
|
page_title,
|
||||||
|
len(chunks),
|
||||||
|
)
|
||||||
|
return {"status": "success"}
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
error_str = str(e).lower()
|
||||||
|
if (
|
||||||
|
"duplicate key value violates unique constraint" in error_str
|
||||||
|
or "uniqueviolationerror" in error_str
|
||||||
|
):
|
||||||
|
logger.warning(
|
||||||
|
"Duplicate constraint hit during KB sync for page %s. "
|
||||||
|
"Rolling back — periodic indexer will handle it. Error: %s",
|
||||||
|
page_id,
|
||||||
|
e,
|
||||||
|
)
|
||||||
|
await self.db_session.rollback()
|
||||||
|
return {"status": "error", "message": "Duplicate document detected"}
|
||||||
|
|
||||||
|
logger.error(
|
||||||
|
"KB sync after create failed for page %s: %s",
|
||||||
|
page_id,
|
||||||
|
e,
|
||||||
|
exc_info=True,
|
||||||
|
)
|
||||||
|
await self.db_session.rollback()
|
||||||
|
return {"status": "error", "message": str(e)}
|
||||||
|
|
||||||
async def sync_after_update(
|
async def sync_after_update(
|
||||||
self,
|
self,
|
||||||
document_id: int,
|
document_id: int,
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue