mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-05-25 19:15:18 +02:00
feat: initial files for jira and confluence HITL tool
This commit is contained in:
parent
affc89dd5c
commit
e71eae26fc
31 changed files with 5888 additions and 2 deletions
13
surfsense_backend/app/services/confluence/__init__.py
Normal file
13
surfsense_backend/app/services/confluence/__init__.py
Normal file
|
|
@ -0,0 +1,13 @@
|
|||
from app.services.confluence.kb_sync_service import ConfluenceKBSyncService
|
||||
from app.services.confluence.tool_metadata_service import (
|
||||
ConfluencePage,
|
||||
ConfluenceToolMetadataService,
|
||||
ConfluenceWorkspace,
|
||||
)
|
||||
|
||||
# Public API of the ``app.services.confluence`` package: the KB sync service
# plus the HITL tool-metadata service and its two value objects.
__all__ = [
    "ConfluenceKBSyncService",
    "ConfluencePage",
    "ConfluenceToolMetadataService",
    "ConfluenceWorkspace",
]
|
||||
225
surfsense_backend/app/services/confluence/kb_sync_service.py
Normal file
225
surfsense_backend/app/services/confluence/kb_sync_service.py
Normal file
|
|
@ -0,0 +1,225 @@
|
|||
import logging
|
||||
from datetime import datetime
|
||||
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.connectors.confluence_history import ConfluenceHistoryConnector
|
||||
from app.db import Document, DocumentType
|
||||
from app.services.llm_service import get_user_long_context_llm
|
||||
from app.utils.document_converters import (
|
||||
create_document_chunks,
|
||||
embed_text,
|
||||
generate_content_hash,
|
||||
generate_document_summary,
|
||||
generate_unique_identifier_hash,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ConfluenceKBSyncService:
    """Syncs Confluence page documents to the knowledge base after HITL actions.

    Both public methods report their outcome as a small status dict rather
    than raising: on any exception the error is logged, the session is rolled
    back and ``{"status": "error", "message": ...}`` is returned.
    """

    def __init__(self, db_session: AsyncSession):
        """Keep the async session used for every read and write."""
        self.db_session = db_session

    @staticmethod
    def _extract_storage_body(page_data: dict) -> str:
        """Return ``page_data["body"]["storage"]["value"]`` or ``""`` if absent."""
        body_obj = page_data.get("body")
        if not isinstance(body_obj, dict):
            return ""
        storage = body_obj.get("storage")
        if not isinstance(storage, dict):
            return ""
        # ``or ""`` also covers an explicit null value in the payload.
        return storage.get("value") or ""

    async def _summarize_page(
        self,
        page_content: str,
        page_title: str,
        space_id: str,
        user_id: str,
        search_space_id: int,
    ) -> tuple:
        """Return ``(summary_content, summary_embedding)`` for a page.

        Uses the user's long-context LLM when one is available; otherwise the
        raw page content (prefixed with the title) is embedded directly.
        """
        user_llm = await get_user_long_context_llm(
            self.db_session, user_id, search_space_id, disable_streaming=True
        )
        if user_llm:
            summary_metadata = {
                "page_title": page_title,
                "space_id": space_id,
                "document_type": "Confluence Page",
                "connector_type": "Confluence",
            }
            summary_content, summary_embedding = await generate_document_summary(
                page_content, user_llm, summary_metadata
            )
            return summary_content, summary_embedding

        summary_content = f"Confluence Page: {page_title}\n\n{page_content}"
        return summary_content, embed_text(summary_content)

    async def sync_after_create(
        self,
        page_id: str,
        page_title: str,
        space_id: str,
        body_content: str | None,
        connector_id: int,
        search_space_id: int,
        user_id: str,
    ) -> dict:
        """Index a freshly created Confluence page as a new KB document.

        Args:
            page_id: Confluence page identifier.
            page_title: Title of the page; also used as the document title.
            space_id: Identifier of the Confluence space holding the page.
            body_content: Page body; when empty/None a placeholder built from
                the title is indexed instead.
            connector_id: Connector the page was created through.
            search_space_id: Search space the document belongs to.
            user_id: Owner of the document and of the LLM configuration.

        Returns:
            ``{"status": "success"}`` when the document was stored or already
            existed, otherwise ``{"status": "error", "message": ...}``.
        """
        from app.tasks.connector_indexers.base import (
            check_document_by_unique_identifier,
            check_duplicate_document_by_hash,
            get_current_timestamp,
            safe_set_chunks,
        )

        try:
            unique_hash = generate_unique_identifier_hash(
                DocumentType.CONFLUENCE_CONNECTOR, page_id, search_space_id
            )

            existing = await check_document_by_unique_identifier(
                self.db_session, unique_hash
            )
            if existing:
                # Already indexed: nothing to do.
                return {"status": "success"}

            indexable_content = (body_content or "").strip()
            if not indexable_content:
                indexable_content = f"Confluence Page: {page_title}"

            page_content = f"# {page_title}\n\n{indexable_content}"

            content_hash = generate_content_hash(page_content, search_space_id)

            with self.db_session.no_autoflush:
                dup = await check_duplicate_document_by_hash(
                    self.db_session, content_hash
                )
            if dup:
                # Another document already has this exact content; fall back to
                # the per-page identifier hash so the content hash stays unique.
                content_hash = unique_hash

            summary_content, summary_embedding = await self._summarize_page(
                page_content, page_title, space_id, user_id, search_space_id
            )

            chunks = await create_document_chunks(page_content)
            now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

            document = Document(
                title=page_title,
                document_type=DocumentType.CONFLUENCE_CONNECTOR,
                document_metadata={
                    "page_id": page_id,
                    "page_title": page_title,
                    "space_id": space_id,
                    "comment_count": 0,
                    "indexed_at": now_str,
                    "connector_id": connector_id,
                },
                content=summary_content,
                content_hash=content_hash,
                unique_identifier_hash=unique_hash,
                embedding=summary_embedding,
                search_space_id=search_space_id,
                connector_id=connector_id,
                updated_at=get_current_timestamp(),
                created_by_id=user_id,
            )

            self.db_session.add(document)
            # Flush before attaching chunks so the document row exists.
            await self.db_session.flush()
            await safe_set_chunks(self.db_session, document, chunks)
            await self.db_session.commit()

            logger.info(
                "KB sync after create succeeded: doc_id=%s, page=%s",
                document.id,
                page_title,
            )
            return {"status": "success"}

        except Exception as e:
            error_str = str(e).lower()
            if (
                "duplicate key value violates unique constraint" in error_str
                or "uniqueviolationerror" in error_str
            ):
                # Unique-constraint violation: report it as a duplicate without
                # logging a full error.
                await self.db_session.rollback()
                return {"status": "error", "message": "Duplicate document detected"}

            logger.error(
                "KB sync after create failed for page %s: %s",
                page_title,
                e,
                exc_info=True,
            )
            await self.db_session.rollback()
            return {"status": "error", "message": str(e)}

    async def sync_after_update(
        self,
        document_id: int,
        page_id: str,
        user_id: str,
        search_space_id: int,
    ) -> dict:
        """Re-index an existing KB document from the live Confluence page.

        Args:
            document_id: Primary key of the KB document to refresh.
            page_id: Confluence page identifier to fetch.
            user_id: Owner of the LLM configuration used for summarising.
            search_space_id: Search space the document belongs to.

        Returns:
            ``{"status": "success"}`` after a successful refresh,
            ``{"status": "not_indexed"}`` when the document does not exist,
            otherwise ``{"status": "error", "message": ...}``.
        """
        from sqlalchemy.orm.attributes import flag_modified

        from app.tasks.connector_indexers.base import (
            get_current_timestamp,
            safe_set_chunks,
        )

        try:
            document = await self.db_session.get(Document, document_id)
            if not document:
                return {"status": "not_indexed"}

            connector_id = document.connector_id
            if not connector_id:
                return {"status": "error", "message": "Document has no connector_id"}

            client = ConfluenceHistoryConnector(
                session=self.db_session, connector_id=connector_id
            )
            try:
                page_data = await client.get_page(page_id)
            finally:
                # Always release the client, even when the fetch fails.
                await client.close()

            page_title = page_data.get("title") or ""
            body_content = self._extract_storage_body(page_data)

            # The rendered content always contains the "# " heading prefix, so
            # emptiness has to be judged on the title and body themselves.
            if not (str(page_title).strip() or str(body_content).strip()):
                return {"status": "error", "message": "Page produced empty content"}

            page_content = f"# {page_title}\n\n{body_content}"

            space_id = (document.document_metadata or {}).get("space_id", "")

            summary_content, summary_embedding = await self._summarize_page(
                page_content, page_title, space_id, user_id, search_space_id
            )

            chunks = await create_document_chunks(page_content)

            document.title = page_title
            document.content = summary_content
            document.content_hash = generate_content_hash(page_content, search_space_id)
            document.embedding = summary_embedding

            document.document_metadata = {
                **(document.document_metadata or {}),
                "page_id": page_id,
                "page_title": page_title,
                "space_id": space_id,
                "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "connector_id": connector_id,
            }
            # Explicitly mark the metadata attribute as changed for the ORM.
            flag_modified(document, "document_metadata")
            await safe_set_chunks(self.db_session, document, chunks)
            document.updated_at = get_current_timestamp()

            await self.db_session.commit()

            logger.info(
                "KB sync successful for document %s (%s)",
                document_id,
                page_title,
            )
            return {"status": "success"}

        except Exception as e:
            logger.error(
                "KB sync failed for document %s: %s", document_id, e, exc_info=True
            )
            await self.db_session.rollback()
            return {"status": "error", "message": str(e)}
|
||||
|
|
@ -0,0 +1,307 @@
|
|||
import logging
|
||||
from dataclasses import dataclass
|
||||
|
||||
from sqlalchemy import and_, func, or_
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.future import select
|
||||
from sqlalchemy.orm.attributes import flag_modified
|
||||
|
||||
from app.connectors.confluence_history import ConfluenceHistoryConnector
|
||||
from app.db import (
|
||||
Document,
|
||||
DocumentType,
|
||||
SearchSourceConnector,
|
||||
SearchSourceConnectorType,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
class ConfluenceWorkspace:
    """A Confluence connector viewed as a workspace for HITL tool context."""

    id: int
    name: str
    base_url: str

    @classmethod
    def from_connector(cls, connector: SearchSourceConnector) -> "ConfluenceWorkspace":
        """Build a workspace from a connector row.

        ``base_url`` is read from the connector's config and defaults to an
        empty string when the key is missing.
        """
        connector_pk = connector.id
        display_name = connector.name
        url = connector.config.get("base_url", "")
        return cls(id=connector_pk, name=display_name, base_url=url)

    def to_dict(self) -> dict:
        """Return the workspace as a plain dict with keys id, name, base_url."""
        return {key: getattr(self, key) for key in ("id", "name", "base_url")}
|
||||
|
||||
|
||||
@dataclass
class ConfluencePage:
    """An indexed Confluence page as resolved from a knowledge-base document."""

    page_id: str
    page_title: str
    space_id: str
    connector_id: int
    document_id: int
    indexed_at: str | None

    @classmethod
    def from_document(cls, document: Document) -> "ConfluencePage":
        """Build a page from a KB document and its metadata.

        Missing metadata falls back to: empty string for ``page_id`` and
        ``space_id``, the document title for ``page_title`` and ``None`` for
        ``indexed_at``.
        """
        metadata = document.document_metadata
        if not metadata:
            metadata = {}
        fallback_title = document.title
        return cls(
            page_id=metadata.get("page_id", ""),
            page_title=metadata.get("page_title", fallback_title),
            space_id=metadata.get("space_id", ""),
            connector_id=document.connector_id,
            document_id=document.id,
            indexed_at=metadata.get("indexed_at"),
        )

    def to_dict(self) -> dict:
        """Return the page as a plain dict, keys in field-declaration order."""
        field_names = (
            "page_id",
            "page_title",
            "space_id",
            "connector_id",
            "document_id",
            "indexed_at",
        )
        return {field_name: getattr(self, field_name) for field_name in field_names}
|
||||
|
||||
|
||||
class ConfluenceToolMetadataService:
    """Builds interrupt context for Confluence HITL tools.

    The ``get_*_context`` methods return plain dicts; failures are reported
    through an ``"error"`` key rather than by raising.
    """

    def __init__(self, db_session: AsyncSession):
        """Keep the async session used for connector and document lookups."""
        self._db_session = db_session

    @staticmethod
    def _extract_storage_body(page_data: dict) -> str:
        """Return ``page_data["body"]["storage"]["value"]`` or ``""`` if absent."""
        body_obj = page_data.get("body")
        if not isinstance(body_obj, dict):
            return ""
        storage = body_obj.get("storage")
        if not isinstance(storage, dict):
            return ""
        # ``or ""`` also covers an explicit null value in the payload.
        return storage.get("value") or ""

    @staticmethod
    def _page_not_found_error(page_ref: str) -> dict:
        """Return the error payload used when a page is not in the KB."""
        return {
            "error": f"Page '{page_ref}' not found in your synced Confluence pages. "
            "Please make sure the page is indexed in your knowledge base."
        }

    async def _check_account_health(self, connector: SearchSourceConnector) -> bool:
        """Check if the Confluence connector auth is still valid.

        On failure the connector's config is updated with
        ``auth_expired: True`` and committed; a failure to persist that flag
        is logged and otherwise ignored.

        Returns True if auth is expired/invalid, False if healthy.
        """
        try:
            client = ConfluenceHistoryConnector(
                session=self._db_session, connector_id=connector.id
            )
            try:
                await client._get_valid_token()
            finally:
                # Release the client even when token validation fails.
                await client.close()
            return False
        except Exception as e:
            logger.warning(
                "Confluence connector %s health check failed: %s", connector.id, e
            )
            try:
                connector.config = {**connector.config, "auth_expired": True}
                flag_modified(connector, "config")
                await self._db_session.commit()
                await self._db_session.refresh(connector)
            except Exception:
                logger.warning(
                    "Failed to persist auth_expired for connector %s",
                    connector.id,
                    exc_info=True,
                )
            return True

    async def get_creation_context(self, search_space_id: int, user_id: str) -> dict:
        """Return context needed to create a new Confluence page.

        Fetches all connected accounts, and for the first healthy one fetches spaces.

        Returns:
            ``{"accounts": [...], "spaces": [...]}``, or
            ``{"error": ...}`` when no Confluence connector exists.
        """
        connectors = await self._get_all_confluence_connectors(search_space_id, user_id)
        if not connectors:
            return {"error": "No Confluence account connected"}

        accounts = []
        spaces = []
        fetched_context = False

        for connector in connectors:
            auth_expired = await self._check_account_health(connector)
            workspace = ConfluenceWorkspace.from_connector(connector)
            accounts.append({**workspace.to_dict(), "auth_expired": auth_expired})

            if auth_expired or fetched_context:
                continue

            try:
                client = ConfluenceHistoryConnector(
                    session=self._db_session, connector_id=connector.id
                )
                try:
                    raw_spaces = await client.get_all_spaces()
                finally:
                    # Release the client even when the spaces call fails.
                    await client.close()
                spaces = [
                    {"id": s.get("id"), "key": s.get("key"), "name": s.get("name")}
                    for s in raw_spaces
                ]
                fetched_context = True
            except Exception as e:
                # Best effort: a later healthy connector may still supply spaces.
                logger.warning(
                    "Failed to fetch Confluence spaces for connector %s: %s",
                    connector.id,
                    e,
                )

        return {
            "accounts": accounts,
            "spaces": spaces,
        }

    async def get_update_context(
        self, search_space_id: int, user_id: str, page_ref: str
    ) -> dict:
        """Return context needed to update an indexed Confluence page.

        Resolves the page from KB, then fetches current content and version from API.

        Returns:
            ``{"account": {...}, "page": {...}}`` on success. On failure a
            dict with ``"error"``; auth-related failures additionally carry
            ``"auth_expired": True`` and ``"connector_id"``.
        """
        document = await self._resolve_page(search_space_id, user_id, page_ref)
        if not document:
            return self._page_not_found_error(page_ref)

        connector = await self._get_connector_for_document(document, user_id)
        if not connector:
            return {"error": "Connector not found or access denied"}

        auth_expired = await self._check_account_health(connector)
        if auth_expired:
            return {
                "error": "Confluence authentication has expired. Please re-authenticate.",
                "auth_expired": True,
                "connector_id": connector.id,
            }

        workspace = ConfluenceWorkspace.from_connector(connector)
        page = ConfluencePage.from_document(document)

        try:
            client = ConfluenceHistoryConnector(
                session=self._db_session, connector_id=connector.id
            )
            try:
                page_data = await client.get_page(page.page_id)
            finally:
                # Release the client even when the page fetch fails.
                await client.close()
        except Exception as e:
            error_str = str(e).lower()
            # Heuristic: treat HTTP 401/403 or "authentication" in the message
            # as an expired/invalid credential.
            if "401" in error_str or "403" in error_str or "authentication" in error_str:
                return {
                    "error": f"Failed to fetch Confluence page: {e!s}",
                    "auth_expired": True,
                    "connector_id": connector.id,
                }
            return {"error": f"Failed to fetch Confluence page: {e!s}"}

        body_storage = self._extract_storage_body(page_data)

        version_obj = page_data.get("version", {})
        version_number = (
            version_obj.get("number", 1) if isinstance(version_obj, dict) else 1
        )

        return {
            "account": {**workspace.to_dict(), "auth_expired": False},
            "page": {
                "page_id": page.page_id,
                "page_title": page_data.get("title", page.page_title),
                "space_id": page.space_id,
                "body": body_storage,
                "version": version_number,
                "document_id": page.document_id,
                "indexed_at": page.indexed_at,
            },
        }

    async def get_deletion_context(
        self, search_space_id: int, user_id: str, page_ref: str
    ) -> dict:
        """Return context needed to delete a Confluence page (KB metadata only).

        No Confluence API call is made; ``auth_expired`` is taken from the flag
        stored in the connector config.
        """
        document = await self._resolve_page(search_space_id, user_id, page_ref)
        if not document:
            return self._page_not_found_error(page_ref)

        connector = await self._get_connector_for_document(document, user_id)
        if not connector:
            return {"error": "Connector not found or access denied"}

        auth_expired = connector.config.get("auth_expired", False)
        workspace = ConfluenceWorkspace.from_connector(connector)
        page = ConfluencePage.from_document(document)

        return {
            "account": {**workspace.to_dict(), "auth_expired": auth_expired},
            "page": page.to_dict(),
        }

    async def _resolve_page(
        self, search_space_id: int, user_id: str, page_ref: str
    ) -> Document | None:
        """Resolve a page from KB: page_title -> document.title.

        Matches ``page_ref`` case-insensitively against the ``page_title``
        metadata entry or the document title, limited to Confluence documents
        in the given search space whose connector belongs to ``user_id``. The
        most recently updated match wins.
        """
        ref_lower = page_ref.lower()

        result = await self._db_session.execute(
            select(Document)
            .join(
                SearchSourceConnector, Document.connector_id == SearchSourceConnector.id
            )
            .filter(
                and_(
                    Document.search_space_id == search_space_id,
                    Document.document_type == DocumentType.CONFLUENCE_CONNECTOR,
                    SearchSourceConnector.user_id == user_id,
                    or_(
                        func.lower(
                            Document.document_metadata.op("->>")("page_title")
                        )
                        == ref_lower,
                        func.lower(Document.title) == ref_lower,
                    ),
                )
            )
            .order_by(Document.updated_at.desc().nullslast())
            .limit(1)
        )
        return result.scalars().first()

    async def _get_all_confluence_connectors(
        self, search_space_id: int, user_id: str
    ) -> list[SearchSourceConnector]:
        """Return the user's Confluence connectors in the given search space."""
        result = await self._db_session.execute(
            select(SearchSourceConnector).filter(
                and_(
                    SearchSourceConnector.search_space_id == search_space_id,
                    SearchSourceConnector.user_id == user_id,
                    SearchSourceConnector.connector_type
                    == SearchSourceConnectorType.CONFLUENCE_CONNECTOR,
                )
            )
        )
        # Materialise as a real list to match the annotated return type.
        return list(result.scalars().all())

    async def _get_connector_for_document(
        self, document: Document, user_id: str
    ) -> SearchSourceConnector | None:
        """Return the document's connector if it belongs to ``user_id``, else None."""
        if not document.connector_id:
            return None
        result = await self._db_session.execute(
            select(SearchSourceConnector).filter(
                and_(
                    SearchSourceConnector.id == document.connector_id,
                    SearchSourceConnector.user_id == user_id,
                )
            )
        )
        return result.scalars().first()
|
||||
Loading…
Add table
Add a link
Reference in a new issue