mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-06-28 21:49:40 +02:00
Merge pull request #825 from CREDO23/sur-169-feat-implement-human-in-the-loop-for-linear-sensitive
[Feat] Add human in the loop for linear sensitive actions
This commit is contained in:
commit
9aef655566
30 changed files with 3789 additions and 126 deletions
|
|
@ -256,6 +256,18 @@ async def create_surfsense_deep_agent(
|
|||
]
|
||||
modified_disabled_tools.extend(notion_tools)
|
||||
|
||||
# Disable Linear action tools if no Linear connector is configured
|
||||
has_linear_connector = (
|
||||
available_connectors is not None and "LINEAR_CONNECTOR" in available_connectors
|
||||
)
|
||||
if not has_linear_connector:
|
||||
linear_tools = [
|
||||
"create_linear_issue",
|
||||
"update_linear_issue",
|
||||
"delete_linear_issue",
|
||||
]
|
||||
modified_disabled_tools.extend(linear_tools)
|
||||
|
||||
# Build tools using the async registry (includes MCP tools)
|
||||
tools = await build_tools_async(
|
||||
dependencies=dependencies,
|
||||
|
|
|
|||
|
|
@ -0,0 +1,11 @@
|
|||
"""Linear tools for creating, updating, and deleting issues."""
|
||||
|
||||
from .create_issue import create_create_linear_issue_tool
|
||||
from .delete_issue import create_delete_linear_issue_tool
|
||||
from .update_issue import create_update_linear_issue_tool
|
||||
|
||||
__all__ = [
|
||||
"create_create_linear_issue_tool",
|
||||
"create_delete_linear_issue_tool",
|
||||
"create_update_linear_issue_tool",
|
||||
]
|
||||
|
|
@ -0,0 +1,239 @@
|
|||
import logging
|
||||
from typing import Any
|
||||
|
||||
from langchain_core.tools import tool
|
||||
from langgraph.types import interrupt
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.connectors.linear_connector import LinearAPIError, LinearConnector
|
||||
from app.services.linear import LinearToolMetadataService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def create_create_linear_issue_tool(
|
||||
db_session: AsyncSession | None = None,
|
||||
search_space_id: int | None = None,
|
||||
user_id: str | None = None,
|
||||
connector_id: int | None = None,
|
||||
):
|
||||
"""
|
||||
Factory function to create the create_linear_issue tool.
|
||||
|
||||
Args:
|
||||
db_session: Database session for accessing the Linear connector
|
||||
search_space_id: Search space ID to find the Linear connector
|
||||
user_id: User ID for fetching user-specific context
|
||||
connector_id: Optional specific connector ID (if known)
|
||||
|
||||
Returns:
|
||||
Configured create_linear_issue tool
|
||||
"""
|
||||
|
||||
@tool
|
||||
async def create_linear_issue(
|
||||
title: str,
|
||||
description: str | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Create a new issue in Linear.
|
||||
|
||||
Use this tool when the user explicitly asks to create, add, or file
|
||||
a new issue / ticket / task in Linear.
|
||||
|
||||
Args:
|
||||
title: Short, descriptive issue title.
|
||||
description: Optional markdown body for the issue.
|
||||
|
||||
Returns:
|
||||
Dictionary with:
|
||||
- status: "success", "rejected", or "error"
|
||||
- issue_id: Linear issue UUID (if success)
|
||||
- identifier: Human-readable ID like "ENG-42" (if success)
|
||||
- url: URL to the created issue (if success)
|
||||
- message: Result message
|
||||
|
||||
IMPORTANT: If status is "rejected", the user explicitly declined the action.
|
||||
Respond with a brief acknowledgment (e.g., "Understood, I won't create the issue.")
|
||||
and move on. Do NOT retry, troubleshoot, or suggest alternatives.
|
||||
|
||||
Examples:
|
||||
- "Create a Linear issue titled 'Fix login bug'"
|
||||
- "Add a ticket for the payment timeout problem"
|
||||
- "File an issue about the broken search feature"
|
||||
"""
|
||||
logger.info(f"create_linear_issue called: title='{title}'")
|
||||
|
||||
if db_session is None or search_space_id is None or user_id is None:
|
||||
logger.error(
|
||||
"Linear tool not properly configured - missing required parameters"
|
||||
)
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Linear tool not properly configured. Please contact support.",
|
||||
}
|
||||
|
||||
try:
|
||||
metadata_service = LinearToolMetadataService(db_session)
|
||||
context = await metadata_service.get_creation_context(
|
||||
search_space_id, user_id
|
||||
)
|
||||
|
||||
if "error" in context:
|
||||
logger.error(f"Failed to fetch creation context: {context['error']}")
|
||||
return {"status": "error", "message": context["error"]}
|
||||
|
||||
logger.info(f"Requesting approval for creating Linear issue: '{title}'")
|
||||
approval = interrupt(
|
||||
{
|
||||
"type": "linear_issue_creation",
|
||||
"action": {
|
||||
"tool": "create_linear_issue",
|
||||
"params": {
|
||||
"title": title,
|
||||
"description": description,
|
||||
"team_id": None,
|
||||
"state_id": None,
|
||||
"assignee_id": None,
|
||||
"priority": None,
|
||||
"label_ids": [],
|
||||
"connector_id": connector_id,
|
||||
},
|
||||
},
|
||||
"context": context,
|
||||
}
|
||||
)
|
||||
|
||||
decisions_raw = (
|
||||
approval.get("decisions", []) if isinstance(approval, dict) else []
|
||||
)
|
||||
decisions = (
|
||||
decisions_raw if isinstance(decisions_raw, list) else [decisions_raw]
|
||||
)
|
||||
decisions = [d for d in decisions if isinstance(d, dict)]
|
||||
if not decisions:
|
||||
logger.warning("No approval decision received")
|
||||
return {"status": "error", "message": "No approval decision received"}
|
||||
|
||||
decision = decisions[0]
|
||||
decision_type = decision.get("type") or decision.get("decision_type")
|
||||
logger.info(f"User decision: {decision_type}")
|
||||
|
||||
if decision_type == "reject":
|
||||
logger.info("Linear issue creation rejected by user")
|
||||
return {
|
||||
"status": "rejected",
|
||||
"message": "User declined. The issue was not created. Do not ask again or suggest alternatives.",
|
||||
}
|
||||
|
||||
final_params: dict[str, Any] = {}
|
||||
edited_action = decision.get("edited_action")
|
||||
if isinstance(edited_action, dict):
|
||||
edited_args = edited_action.get("args")
|
||||
if isinstance(edited_args, dict):
|
||||
final_params = edited_args
|
||||
elif isinstance(decision.get("args"), dict):
|
||||
final_params = decision["args"]
|
||||
|
||||
final_title = final_params.get("title", title)
|
||||
final_description = final_params.get("description", description)
|
||||
final_team_id = final_params.get("team_id")
|
||||
final_state_id = final_params.get("state_id")
|
||||
final_assignee_id = final_params.get("assignee_id")
|
||||
final_priority = final_params.get("priority")
|
||||
final_label_ids = final_params.get("label_ids") or []
|
||||
final_connector_id = final_params.get("connector_id", connector_id)
|
||||
|
||||
if not final_title or not final_title.strip():
|
||||
logger.error("Title is empty or contains only whitespace")
|
||||
return {"status": "error", "message": "Issue title cannot be empty."}
|
||||
if not final_team_id:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "A team must be selected to create an issue.",
|
||||
}
|
||||
|
||||
from sqlalchemy.future import select
|
||||
|
||||
from app.db import SearchSourceConnector, SearchSourceConnectorType
|
||||
|
||||
actual_connector_id = final_connector_id
|
||||
if actual_connector_id is None:
|
||||
result = await db_session.execute(
|
||||
select(SearchSourceConnector).filter(
|
||||
SearchSourceConnector.search_space_id == search_space_id,
|
||||
SearchSourceConnector.user_id == user_id,
|
||||
SearchSourceConnector.connector_type
|
||||
== SearchSourceConnectorType.LINEAR_CONNECTOR,
|
||||
)
|
||||
)
|
||||
connector = result.scalars().first()
|
||||
if not connector:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "No Linear connector found. Please connect Linear in your workspace settings.",
|
||||
}
|
||||
actual_connector_id = connector.id
|
||||
logger.info(f"Found Linear connector: id={actual_connector_id}")
|
||||
else:
|
||||
result = await db_session.execute(
|
||||
select(SearchSourceConnector).filter(
|
||||
SearchSourceConnector.id == actual_connector_id,
|
||||
SearchSourceConnector.search_space_id == search_space_id,
|
||||
SearchSourceConnector.user_id == user_id,
|
||||
SearchSourceConnector.connector_type
|
||||
== SearchSourceConnectorType.LINEAR_CONNECTOR,
|
||||
)
|
||||
)
|
||||
connector = result.scalars().first()
|
||||
if not connector:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Selected Linear connector is invalid or has been disconnected.",
|
||||
}
|
||||
logger.info(f"Validated Linear connector: id={actual_connector_id}")
|
||||
|
||||
logger.info(
|
||||
f"Creating Linear issue with final params: title='{final_title}'"
|
||||
)
|
||||
linear_client = LinearConnector(
|
||||
session=db_session, connector_id=actual_connector_id
|
||||
)
|
||||
result = await linear_client.create_issue(
|
||||
team_id=final_team_id,
|
||||
title=final_title,
|
||||
description=final_description,
|
||||
state_id=final_state_id,
|
||||
assignee_id=final_assignee_id,
|
||||
priority=final_priority,
|
||||
label_ids=final_label_ids if final_label_ids else None,
|
||||
)
|
||||
|
||||
if result.get("status") == "error":
|
||||
logger.error(f"Failed to create Linear issue: {result.get('message')}")
|
||||
return {"status": "error", "message": result.get("message")}
|
||||
|
||||
logger.info(
|
||||
f"Linear issue created: {result.get('identifier')} - {result.get('title')}"
|
||||
)
|
||||
return {
|
||||
"status": "success",
|
||||
"issue_id": result.get("id"),
|
||||
"identifier": result.get("identifier"),
|
||||
"url": result.get("url"),
|
||||
"message": result.get("message"),
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
from langgraph.errors import GraphInterrupt
|
||||
|
||||
if isinstance(e, GraphInterrupt):
|
||||
raise
|
||||
|
||||
logger.error(f"Error creating Linear issue: {e}", exc_info=True)
|
||||
if isinstance(e, (ValueError, LinearAPIError)):
|
||||
message = str(e)
|
||||
else:
|
||||
message = "Something went wrong while creating the issue. Please try again."
|
||||
return {"status": "error", "message": message}
|
||||
|
||||
return create_linear_issue
|
||||
|
|
@ -0,0 +1,262 @@
|
|||
import logging
|
||||
from typing import Any
|
||||
|
||||
from langchain_core.tools import tool
|
||||
from langgraph.types import interrupt
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.connectors.linear_connector import LinearAPIError, LinearConnector
|
||||
from app.services.linear import LinearToolMetadataService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def create_delete_linear_issue_tool(
|
||||
db_session: AsyncSession | None = None,
|
||||
search_space_id: int | None = None,
|
||||
user_id: str | None = None,
|
||||
connector_id: int | None = None,
|
||||
):
|
||||
"""
|
||||
Factory function to create the delete_linear_issue tool.
|
||||
|
||||
Args:
|
||||
db_session: Database session for accessing the Linear connector
|
||||
search_space_id: Search space ID to find the Linear connector
|
||||
user_id: User ID for finding the correct Linear connector
|
||||
connector_id: Optional specific connector ID (if known)
|
||||
|
||||
Returns:
|
||||
Configured delete_linear_issue tool
|
||||
"""
|
||||
|
||||
@tool
|
||||
async def delete_linear_issue(
|
||||
issue_ref: str,
|
||||
delete_from_kb: bool = False,
|
||||
) -> dict[str, Any]:
|
||||
"""Archive (delete) a Linear issue.
|
||||
|
||||
Use this tool when the user asks to delete, remove, or archive a Linear issue.
|
||||
Note that Linear archives issues rather than permanently deleting them
|
||||
(they can be restored from the archive).
|
||||
|
||||
|
||||
Args:
|
||||
issue_ref: The issue to delete. Can be the issue title (e.g. "Fix login bug"),
|
||||
the identifier (e.g. "ENG-42"), or the full document title
|
||||
(e.g. "ENG-42: Fix login bug").
|
||||
delete_from_kb: Whether to also remove the issue from the knowledge base.
|
||||
Default is False. Set to True to remove from both Linear
|
||||
and the knowledge base.
|
||||
|
||||
Returns:
|
||||
Dictionary with:
|
||||
- status: "success", "rejected", "not_found", or "error"
|
||||
- identifier: Human-readable ID like "ENG-42" (if success)
|
||||
- message: Success or error message
|
||||
- deleted_from_kb: Whether the issue was also removed from the knowledge base (if success)
|
||||
|
||||
IMPORTANT:
|
||||
- If status is "rejected", the user explicitly declined the action.
|
||||
Respond with a brief acknowledgment (e.g., "Understood, I won't delete the issue.")
|
||||
and move on. Do NOT ask for alternatives or troubleshoot.
|
||||
- If status is "not_found", inform the user conversationally using the exact message
|
||||
provided. Do NOT treat this as an error. Simply relay the message and ask the user
|
||||
to verify the issue title or identifier, or check if it has been indexed.
|
||||
|
||||
Examples:
|
||||
- "Delete the 'Fix login bug' Linear issue"
|
||||
- "Archive ENG-42"
|
||||
- "Remove the 'Old payment flow' issue from Linear"
|
||||
"""
|
||||
logger.info(
|
||||
f"delete_linear_issue called: issue_ref='{issue_ref}', delete_from_kb={delete_from_kb}"
|
||||
)
|
||||
|
||||
if db_session is None or search_space_id is None or user_id is None:
|
||||
logger.error(
|
||||
"Linear tool not properly configured - missing required parameters"
|
||||
)
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Linear tool not properly configured. Please contact support.",
|
||||
}
|
||||
|
||||
try:
|
||||
metadata_service = LinearToolMetadataService(db_session)
|
||||
context = await metadata_service.get_delete_context(
|
||||
search_space_id, user_id, issue_ref
|
||||
)
|
||||
|
||||
if "error" in context:
|
||||
error_msg = context["error"]
|
||||
if "not found" in error_msg.lower():
|
||||
logger.warning(f"Issue not found: {error_msg}")
|
||||
return {"status": "not_found", "message": error_msg}
|
||||
else:
|
||||
logger.error(f"Failed to fetch delete context: {error_msg}")
|
||||
return {"status": "error", "message": error_msg}
|
||||
|
||||
issue_id = context["issue"]["id"]
|
||||
issue_identifier = context["issue"].get("identifier", "")
|
||||
document_id = context["issue"]["document_id"]
|
||||
connector_id_from_context = context.get("workspace", {}).get("id")
|
||||
|
||||
logger.info(
|
||||
f"Requesting approval for deleting Linear issue: '{issue_ref}' "
|
||||
f"(id={issue_id}, delete_from_kb={delete_from_kb})"
|
||||
)
|
||||
approval = interrupt(
|
||||
{
|
||||
"type": "linear_issue_deletion",
|
||||
"action": {
|
||||
"tool": "delete_linear_issue",
|
||||
"params": {
|
||||
"issue_id": issue_id,
|
||||
"connector_id": connector_id_from_context,
|
||||
"delete_from_kb": delete_from_kb,
|
||||
},
|
||||
},
|
||||
"context": context,
|
||||
}
|
||||
)
|
||||
|
||||
decisions_raw = (
|
||||
approval.get("decisions", []) if isinstance(approval, dict) else []
|
||||
)
|
||||
decisions = (
|
||||
decisions_raw if isinstance(decisions_raw, list) else [decisions_raw]
|
||||
)
|
||||
decisions = [d for d in decisions if isinstance(d, dict)]
|
||||
if not decisions:
|
||||
logger.warning("No approval decision received")
|
||||
return {"status": "error", "message": "No approval decision received"}
|
||||
|
||||
decision = decisions[0]
|
||||
decision_type = decision.get("type") or decision.get("decision_type")
|
||||
logger.info(f"User decision: {decision_type}")
|
||||
|
||||
if decision_type == "reject":
|
||||
logger.info("Linear issue deletion rejected by user")
|
||||
return {
|
||||
"status": "rejected",
|
||||
"message": "User declined. The issue was not deleted. Do not ask again or suggest alternatives.",
|
||||
}
|
||||
|
||||
edited_action = decision.get("edited_action")
|
||||
final_params: dict[str, Any] = {}
|
||||
if isinstance(edited_action, dict):
|
||||
edited_args = edited_action.get("args")
|
||||
if isinstance(edited_args, dict):
|
||||
final_params = edited_args
|
||||
elif isinstance(decision.get("args"), dict):
|
||||
final_params = decision["args"]
|
||||
|
||||
final_issue_id = final_params.get("issue_id", issue_id)
|
||||
final_connector_id = final_params.get(
|
||||
"connector_id", connector_id_from_context
|
||||
)
|
||||
final_delete_from_kb = final_params.get("delete_from_kb", delete_from_kb)
|
||||
|
||||
logger.info(
|
||||
f"Deleting Linear issue with final params: issue_id={final_issue_id}, "
|
||||
f"connector_id={final_connector_id}, delete_from_kb={final_delete_from_kb}"
|
||||
)
|
||||
|
||||
from sqlalchemy.future import select
|
||||
|
||||
from app.db import SearchSourceConnector, SearchSourceConnectorType
|
||||
|
||||
if final_connector_id:
|
||||
result = await db_session.execute(
|
||||
select(SearchSourceConnector).filter(
|
||||
SearchSourceConnector.id == final_connector_id,
|
||||
SearchSourceConnector.search_space_id == search_space_id,
|
||||
SearchSourceConnector.user_id == user_id,
|
||||
SearchSourceConnector.connector_type
|
||||
== SearchSourceConnectorType.LINEAR_CONNECTOR,
|
||||
)
|
||||
)
|
||||
connector = result.scalars().first()
|
||||
if not connector:
|
||||
logger.error(
|
||||
f"Invalid connector_id={final_connector_id} for search_space_id={search_space_id}"
|
||||
)
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Selected Linear connector is invalid or has been disconnected.",
|
||||
}
|
||||
actual_connector_id = connector.id
|
||||
logger.info(f"Validated Linear connector: id={actual_connector_id}")
|
||||
else:
|
||||
logger.error("No connector found for this issue")
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "No connector found for this issue.",
|
||||
}
|
||||
|
||||
linear_client = LinearConnector(
|
||||
session=db_session, connector_id=actual_connector_id
|
||||
)
|
||||
|
||||
result = await linear_client.archive_issue(issue_id=final_issue_id)
|
||||
|
||||
logger.info(
|
||||
f"archive_issue result: {result.get('status')} - {result.get('message', '')}"
|
||||
)
|
||||
|
||||
deleted_from_kb = False
|
||||
if (
|
||||
result.get("status") == "success"
|
||||
and final_delete_from_kb
|
||||
and document_id
|
||||
):
|
||||
try:
|
||||
from app.db import Document
|
||||
|
||||
doc_result = await db_session.execute(
|
||||
select(Document).filter(Document.id == document_id)
|
||||
)
|
||||
document = doc_result.scalars().first()
|
||||
if document:
|
||||
await db_session.delete(document)
|
||||
await db_session.commit()
|
||||
deleted_from_kb = True
|
||||
logger.info(
|
||||
f"Deleted document {document_id} from knowledge base"
|
||||
)
|
||||
else:
|
||||
logger.warning(f"Document {document_id} not found in KB")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to delete document from KB: {e}")
|
||||
await db_session.rollback()
|
||||
result["warning"] = (
|
||||
f"Issue archived in Linear, but failed to remove from knowledge base: {e!s}"
|
||||
)
|
||||
|
||||
if result.get("status") == "success":
|
||||
result["deleted_from_kb"] = deleted_from_kb
|
||||
if issue_identifier:
|
||||
result["message"] = f"Issue {issue_identifier} archived successfully."
|
||||
if deleted_from_kb:
|
||||
result["message"] = (
|
||||
f"{result.get('message', '')} Also removed from the knowledge base."
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
from langgraph.errors import GraphInterrupt
|
||||
|
||||
if isinstance(e, GraphInterrupt):
|
||||
raise
|
||||
|
||||
logger.error(f"Error deleting Linear issue: {e}", exc_info=True)
|
||||
if isinstance(e, (ValueError, LinearAPIError)):
|
||||
message = str(e)
|
||||
else:
|
||||
message = "Something went wrong while deleting the issue. Please try again."
|
||||
return {"status": "error", "message": message}
|
||||
|
||||
return delete_linear_issue
|
||||
|
|
@ -0,0 +1,332 @@
|
|||
import logging
|
||||
from typing import Any
|
||||
|
||||
from langchain_core.tools import tool
|
||||
from langgraph.types import interrupt
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.connectors.linear_connector import LinearAPIError, LinearConnector
|
||||
from app.services.linear import LinearKBSyncService, LinearToolMetadataService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def create_update_linear_issue_tool(
|
||||
db_session: AsyncSession | None = None,
|
||||
search_space_id: int | None = None,
|
||||
user_id: str | None = None,
|
||||
connector_id: int | None = None,
|
||||
):
|
||||
"""
|
||||
Factory function to create the update_linear_issue tool.
|
||||
|
||||
Args:
|
||||
db_session: Database session for accessing the Linear connector
|
||||
search_space_id: Search space ID to find the Linear connector
|
||||
user_id: User ID for fetching user-specific context
|
||||
connector_id: Optional specific connector ID (if known)
|
||||
|
||||
Returns:
|
||||
Configured update_linear_issue tool
|
||||
"""
|
||||
|
||||
@tool
|
||||
async def update_linear_issue(
|
||||
issue_ref: str,
|
||||
new_title: str | None = None,
|
||||
new_description: str | None = None,
|
||||
new_state_name: str | None = None,
|
||||
new_assignee_email: str | None = None,
|
||||
new_priority: int | None = None,
|
||||
new_label_names: list[str] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Update an existing Linear issue that has been indexed in the knowledge base.
|
||||
|
||||
Use this tool when the user asks to modify, change, or update a Linear issue —
|
||||
for example, changing its status, reassigning it, updating its title or description,
|
||||
adjusting its priority, or changing its labels.
|
||||
|
||||
Only issues already indexed in the knowledge base can be updated.
|
||||
|
||||
Args:
|
||||
issue_ref: The issue to update. Can be the issue title (e.g. "Fix login bug"),
|
||||
the identifier (e.g. "ENG-42"), or the full document title
|
||||
(e.g. "ENG-42: Fix login bug"). Matched case-insensitively.
|
||||
new_title: New title for the issue (optional).
|
||||
new_description: New markdown body for the issue (optional).
|
||||
new_state_name: New workflow state name (e.g. "In Progress", "Done").
|
||||
Matched case-insensitively against the team's states.
|
||||
new_assignee_email: Email address of the new assignee.
|
||||
Matched case-insensitively against the team's members.
|
||||
new_priority: New priority (0 = No Priority, 1 = Urgent, 2 = High,
|
||||
3 = Medium, 4 = Low).
|
||||
new_label_names: New set of label names to apply.
|
||||
Matched case-insensitively against the team's labels.
|
||||
Unrecognised names are silently skipped.
|
||||
|
||||
Returns:
|
||||
Dictionary with:
|
||||
- status: "success", "rejected", "not_found", or "error"
|
||||
- identifier: Human-readable ID like "ENG-42" (if success)
|
||||
- url: URL to the updated issue (if success)
|
||||
- message: Result message
|
||||
|
||||
IMPORTANT:
|
||||
- If status is "rejected", the user explicitly declined the action.
|
||||
Respond with a brief acknowledgment (e.g., "Understood, I didn't update the issue.")
|
||||
and move on. Do NOT ask for alternatives or troubleshoot.
|
||||
- If status is "not_found", inform the user conversationally using the exact message
|
||||
provided. Do NOT treat this as an error. Simply relay the message and ask the user
|
||||
to verify the issue title or identifier, or check if it has been indexed.
|
||||
|
||||
Examples:
|
||||
- "Mark the 'Fix login bug' issue as done"
|
||||
- "Assign ENG-42 to john@company.com"
|
||||
- "Change the priority of 'Payment timeout' to urgent"
|
||||
"""
|
||||
logger.info(f"update_linear_issue called: issue_ref='{issue_ref}'")
|
||||
|
||||
if db_session is None or search_space_id is None or user_id is None:
|
||||
logger.error(
|
||||
"Linear tool not properly configured - missing required parameters"
|
||||
)
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Linear tool not properly configured. Please contact support.",
|
||||
}
|
||||
|
||||
try:
|
||||
metadata_service = LinearToolMetadataService(db_session)
|
||||
context = await metadata_service.get_update_context(
|
||||
search_space_id, user_id, issue_ref
|
||||
)
|
||||
|
||||
if "error" in context:
|
||||
error_msg = context["error"]
|
||||
if "not found" in error_msg.lower():
|
||||
logger.warning(f"Issue not found: {error_msg}")
|
||||
return {"status": "not_found", "message": error_msg}
|
||||
else:
|
||||
logger.error(f"Failed to fetch update context: {error_msg}")
|
||||
return {"status": "error", "message": error_msg}
|
||||
|
||||
issue_id = context["issue"]["id"]
|
||||
document_id = context["issue"]["document_id"]
|
||||
connector_id_from_context = context.get("workspace", {}).get("id")
|
||||
|
||||
team = context.get("team", {})
|
||||
new_state_id = _resolve_state(team, new_state_name)
|
||||
new_assignee_id = _resolve_assignee(team, new_assignee_email)
|
||||
new_label_ids = _resolve_labels(team, new_label_names)
|
||||
|
||||
logger.info(
|
||||
f"Requesting approval for updating Linear issue: '{issue_ref}' (id={issue_id})"
|
||||
)
|
||||
approval = interrupt(
|
||||
{
|
||||
"type": "linear_issue_update",
|
||||
"action": {
|
||||
"tool": "update_linear_issue",
|
||||
"params": {
|
||||
"issue_id": issue_id,
|
||||
"document_id": document_id,
|
||||
"new_title": new_title,
|
||||
"new_description": new_description,
|
||||
"new_state_id": new_state_id,
|
||||
"new_assignee_id": new_assignee_id,
|
||||
"new_priority": new_priority,
|
||||
"new_label_ids": new_label_ids,
|
||||
"connector_id": connector_id_from_context,
|
||||
},
|
||||
},
|
||||
"context": context,
|
||||
}
|
||||
)
|
||||
|
||||
decisions_raw = (
|
||||
approval.get("decisions", []) if isinstance(approval, dict) else []
|
||||
)
|
||||
decisions = (
|
||||
decisions_raw if isinstance(decisions_raw, list) else [decisions_raw]
|
||||
)
|
||||
decisions = [d for d in decisions if isinstance(d, dict)]
|
||||
if not decisions:
|
||||
logger.warning("No approval decision received")
|
||||
return {"status": "error", "message": "No approval decision received"}
|
||||
|
||||
decision = decisions[0]
|
||||
decision_type = decision.get("type") or decision.get("decision_type")
|
||||
logger.info(f"User decision: {decision_type}")
|
||||
|
||||
if decision_type == "reject":
|
||||
logger.info("Linear issue update rejected by user")
|
||||
return {
|
||||
"status": "rejected",
|
||||
"message": "User declined. The issue was not updated. Do not ask again or suggest alternatives.",
|
||||
}
|
||||
|
||||
edited_action = decision.get("edited_action")
|
||||
final_params: dict[str, Any] = {}
|
||||
if isinstance(edited_action, dict):
|
||||
edited_args = edited_action.get("args")
|
||||
if isinstance(edited_args, dict):
|
||||
final_params = edited_args
|
||||
elif isinstance(decision.get("args"), dict):
|
||||
final_params = decision["args"]
|
||||
|
||||
final_issue_id = final_params.get("issue_id", issue_id)
|
||||
final_document_id = final_params.get("document_id", document_id)
|
||||
final_new_title = final_params.get("new_title", new_title)
|
||||
final_new_description = final_params.get("new_description", new_description)
|
||||
final_new_state_id = final_params.get("new_state_id", new_state_id)
|
||||
final_new_assignee_id = final_params.get("new_assignee_id", new_assignee_id)
|
||||
final_new_priority = final_params.get("new_priority", new_priority)
|
||||
final_new_label_ids: list[str] | None = final_params.get(
|
||||
"new_label_ids", new_label_ids
|
||||
)
|
||||
final_connector_id = final_params.get(
|
||||
"connector_id", connector_id_from_context
|
||||
)
|
||||
|
||||
if not final_connector_id:
|
||||
logger.error("No connector found for this issue")
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "No connector found for this issue.",
|
||||
}
|
||||
|
||||
from sqlalchemy.future import select
|
||||
|
||||
from app.db import SearchSourceConnector, SearchSourceConnectorType
|
||||
|
||||
result = await db_session.execute(
|
||||
select(SearchSourceConnector).filter(
|
||||
SearchSourceConnector.id == final_connector_id,
|
||||
SearchSourceConnector.search_space_id == search_space_id,
|
||||
SearchSourceConnector.user_id == user_id,
|
||||
SearchSourceConnector.connector_type
|
||||
== SearchSourceConnectorType.LINEAR_CONNECTOR,
|
||||
)
|
||||
)
|
||||
connector = result.scalars().first()
|
||||
if not connector:
|
||||
logger.error(
|
||||
f"Invalid connector_id={final_connector_id} for search_space_id={search_space_id}"
|
||||
)
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Selected Linear connector is invalid or has been disconnected.",
|
||||
}
|
||||
logger.info(f"Validated Linear connector: id={final_connector_id}")
|
||||
|
||||
logger.info(
|
||||
f"Updating Linear issue with final params: issue_id={final_issue_id}"
|
||||
)
|
||||
linear_client = LinearConnector(
|
||||
session=db_session, connector_id=final_connector_id
|
||||
)
|
||||
updated_issue = await linear_client.update_issue(
|
||||
issue_id=final_issue_id,
|
||||
title=final_new_title,
|
||||
description=final_new_description,
|
||||
state_id=final_new_state_id,
|
||||
assignee_id=final_new_assignee_id,
|
||||
priority=final_new_priority,
|
||||
label_ids=final_new_label_ids,
|
||||
)
|
||||
|
||||
if updated_issue.get("status") == "error":
|
||||
logger.error(
|
||||
f"Failed to update Linear issue: {updated_issue.get('message')}"
|
||||
)
|
||||
return {
|
||||
"status": "error",
|
||||
"message": updated_issue.get("message"),
|
||||
}
|
||||
|
||||
logger.info(
|
||||
f"update_issue result: {updated_issue.get('identifier')} - {updated_issue.get('title')}"
|
||||
)
|
||||
|
||||
if final_document_id is not None:
|
||||
logger.info(
|
||||
f"Updating knowledge base for document {final_document_id}..."
|
||||
)
|
||||
kb_service = LinearKBSyncService(db_session)
|
||||
kb_result = await kb_service.sync_after_update(
|
||||
document_id=final_document_id,
|
||||
issue_id=final_issue_id,
|
||||
user_id=user_id,
|
||||
search_space_id=search_space_id,
|
||||
)
|
||||
if kb_result["status"] == "success":
|
||||
logger.info(
|
||||
f"Knowledge base successfully updated for issue {final_issue_id}"
|
||||
)
|
||||
kb_message = " Your knowledge base has also been updated."
|
||||
elif kb_result["status"] == "not_indexed":
|
||||
kb_message = " This issue will be added to your knowledge base in the next scheduled sync."
|
||||
else:
|
||||
logger.warning(
|
||||
f"KB update failed for issue {final_issue_id}: {kb_result.get('message')}"
|
||||
)
|
||||
kb_message = " Your knowledge base will be updated in the next scheduled sync."
|
||||
else:
|
||||
kb_message = ""
|
||||
|
||||
identifier = updated_issue.get("identifier")
|
||||
default_msg = f"Issue {identifier} updated successfully."
|
||||
return {
|
||||
"status": "success",
|
||||
"identifier": identifier,
|
||||
"url": updated_issue.get("url"),
|
||||
"message": f"{updated_issue.get('message', default_msg)}{kb_message}",
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
from langgraph.errors import GraphInterrupt
|
||||
|
||||
if isinstance(e, GraphInterrupt):
|
||||
raise
|
||||
|
||||
logger.error(f"Error updating Linear issue: {e}", exc_info=True)
|
||||
if isinstance(e, (ValueError, LinearAPIError)):
|
||||
message = str(e)
|
||||
else:
|
||||
message = "Something went wrong while updating the issue. Please try again."
|
||||
return {"status": "error", "message": message}
|
||||
|
||||
return update_linear_issue
|
||||
|
||||
|
||||
def _resolve_state(team: dict, state_name: str | None) -> str | None:
|
||||
if not state_name:
|
||||
return None
|
||||
name_lower = state_name.lower()
|
||||
for state in team.get("states", []):
|
||||
if state.get("name", "").lower() == name_lower:
|
||||
return state["id"]
|
||||
return None
|
||||
|
||||
|
||||
def _resolve_assignee(team: dict, assignee_email: str | None) -> str | None:
|
||||
if not assignee_email:
|
||||
return None
|
||||
email_lower = assignee_email.lower()
|
||||
for member in team.get("members", []):
|
||||
if member.get("email", "").lower() == email_lower:
|
||||
return member["id"]
|
||||
return None
|
||||
|
||||
|
||||
def _resolve_labels(team: dict, label_names: list[str] | None) -> list[str] | None:
|
||||
if label_names is None:
|
||||
return None
|
||||
if not label_names:
|
||||
return []
|
||||
name_set = {n.lower() for n in label_names}
|
||||
return [
|
||||
label["id"]
|
||||
for label in team.get("labels", [])
|
||||
if label.get("name", "").lower() in name_set
|
||||
]
|
||||
|
|
@ -5,7 +5,7 @@ from langchain_core.tools import tool
|
|||
from langgraph.types import interrupt
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.connectors.notion_history import NotionHistoryConnector
|
||||
from app.connectors.notion_history import NotionAPIError, NotionHistoryConnector
|
||||
from app.services.notion import NotionToolMetadataService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -34,7 +34,6 @@ def create_create_notion_page_tool(
|
|||
async def create_notion_page(
|
||||
title: str,
|
||||
content: str,
|
||||
parent_page_id: str | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Create a new page in Notion with the given title and content.
|
||||
|
||||
|
|
@ -45,8 +44,6 @@ def create_create_notion_page_tool(
|
|||
Args:
|
||||
title: The title of the Notion page.
|
||||
content: The markdown content for the page body (supports headings, lists, paragraphs).
|
||||
parent_page_id: Optional parent page ID to create as a subpage.
|
||||
If not provided, will ask for one.
|
||||
|
||||
Returns:
|
||||
Dictionary with:
|
||||
|
|
@ -58,15 +55,13 @@ def create_create_notion_page_tool(
|
|||
|
||||
IMPORTANT: If status is "rejected", the user explicitly declined the action.
|
||||
Respond with a brief acknowledgment (e.g., "Understood, I didn't create the page.")
|
||||
and move on. Do NOT ask for parent page IDs, troubleshoot, or suggest alternatives.
|
||||
and move on. Do NOT troubleshoot or suggest alternatives.
|
||||
|
||||
Examples:
|
||||
- "Create a Notion page titled 'Meeting Notes' with content 'Discussed project timeline'"
|
||||
- "Save this to Notion with title 'Research Summary'"
|
||||
"""
|
||||
logger.info(
|
||||
f"create_notion_page called: title='{title}', parent_page_id={parent_page_id}"
|
||||
)
|
||||
logger.info(f"create_notion_page called: title='{title}'")
|
||||
|
||||
if db_session is None or search_space_id is None or user_id is None:
|
||||
logger.error(
|
||||
|
|
@ -99,7 +94,7 @@ def create_create_notion_page_tool(
|
|||
"params": {
|
||||
"title": title,
|
||||
"content": content,
|
||||
"parent_page_id": parent_page_id,
|
||||
"parent_page_id": None,
|
||||
"connector_id": connector_id,
|
||||
},
|
||||
},
|
||||
|
|
@ -144,7 +139,7 @@ def create_create_notion_page_tool(
|
|||
|
||||
final_title = final_params.get("title", title)
|
||||
final_content = final_params.get("content", content)
|
||||
final_parent_page_id = final_params.get("parent_page_id", parent_page_id)
|
||||
final_parent_page_id = final_params.get("parent_page_id")
|
||||
final_connector_id = final_params.get("connector_id", connector_id)
|
||||
|
||||
if not final_title or not final_title.strip():
|
||||
|
|
@ -229,11 +224,10 @@ def create_create_notion_page_tool(
|
|||
raise
|
||||
|
||||
logger.error(f"Error creating Notion page: {e}", exc_info=True)
|
||||
return {
|
||||
"status": "error",
|
||||
"message": str(e)
|
||||
if isinstance(e, ValueError)
|
||||
else f"Unexpected error: {e!s}",
|
||||
}
|
||||
if isinstance(e, (ValueError, NotionAPIError)):
|
||||
message = str(e)
|
||||
else:
|
||||
message = "Something went wrong while creating the page. Please try again."
|
||||
return {"status": "error", "message": message}
|
||||
|
||||
return create_notion_page
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ from langchain_core.tools import tool
|
|||
from langgraph.types import interrupt
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.connectors.notion_history import NotionHistoryConnector
|
||||
from app.connectors.notion_history import NotionAPIError, NotionHistoryConnector
|
||||
from app.services.notion.tool_metadata_service import NotionToolMetadataService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -33,7 +33,7 @@ def create_delete_notion_page_tool(
|
|||
@tool
|
||||
async def delete_notion_page(
|
||||
page_title: str,
|
||||
delete_from_db: bool = False,
|
||||
delete_from_kb: bool = False,
|
||||
) -> dict[str, Any]:
|
||||
"""Delete (archive) a Notion page.
|
||||
|
||||
|
|
@ -43,8 +43,8 @@ def create_delete_notion_page_tool(
|
|||
|
||||
Args:
|
||||
page_title: The title of the Notion page to delete.
|
||||
delete_from_db: Whether to also remove the page from the knowledge base.
|
||||
Default is False (in Notion).
|
||||
delete_from_kb: Whether to also remove the page from the knowledge base.
|
||||
Default is False.
|
||||
Set to True to permanently remove from both Notion and knowledge base.
|
||||
|
||||
Returns:
|
||||
|
|
@ -52,7 +52,7 @@ def create_delete_notion_page_tool(
|
|||
- status: "success", "rejected", "not_found", or "error"
|
||||
- page_id: Deleted page ID (if success)
|
||||
- message: Success or error message
|
||||
- deleted_from_db: Whether the page was also removed from knowledge base (if success)
|
||||
- deleted_from_kb: Whether the page was also removed from knowledge base (if success)
|
||||
|
||||
Examples:
|
||||
- "Delete the 'Meeting Notes' Notion page"
|
||||
|
|
@ -60,7 +60,7 @@ def create_delete_notion_page_tool(
|
|||
- "Archive the 'Draft Ideas' Notion page"
|
||||
"""
|
||||
logger.info(
|
||||
f"delete_notion_page called: page_title='{page_title}', delete_from_db={delete_from_db}"
|
||||
f"delete_notion_page called: page_title='{page_title}', delete_from_kb={delete_from_kb}"
|
||||
)
|
||||
|
||||
if db_session is None or search_space_id is None or user_id is None:
|
||||
|
|
@ -100,7 +100,7 @@ def create_delete_notion_page_tool(
|
|||
document_id = context.get("document_id")
|
||||
|
||||
logger.info(
|
||||
f"Requesting approval for deleting Notion page: '{page_title}' (page_id={page_id}, delete_from_db={delete_from_db})"
|
||||
f"Requesting approval for deleting Notion page: '{page_title}' (page_id={page_id}, delete_from_kb={delete_from_kb})"
|
||||
)
|
||||
|
||||
# Request approval before deleting
|
||||
|
|
@ -112,7 +112,7 @@ def create_delete_notion_page_tool(
|
|||
"params": {
|
||||
"page_id": page_id,
|
||||
"connector_id": connector_id_from_context,
|
||||
"delete_from_db": delete_from_db,
|
||||
"delete_from_kb": delete_from_kb,
|
||||
},
|
||||
},
|
||||
"context": context,
|
||||
|
|
@ -159,10 +159,10 @@ def create_delete_notion_page_tool(
|
|||
final_connector_id = final_params.get(
|
||||
"connector_id", connector_id_from_context
|
||||
)
|
||||
final_delete_from_db = final_params.get("delete_from_db", delete_from_db)
|
||||
final_delete_from_kb = final_params.get("delete_from_kb", delete_from_kb)
|
||||
|
||||
logger.info(
|
||||
f"Deleting Notion page with final params: page_id={final_page_id}, connector_id={final_connector_id}, delete_from_db={final_delete_from_db}"
|
||||
f"Deleting Notion page with final params: page_id={final_page_id}, connector_id={final_connector_id}, delete_from_kb={final_delete_from_kb}"
|
||||
)
|
||||
|
||||
from sqlalchemy.future import select
|
||||
|
|
@ -211,11 +211,11 @@ def create_delete_notion_page_tool(
|
|||
f"delete_page result: {result.get('status')} - {result.get('message', '')}"
|
||||
)
|
||||
|
||||
# If deletion was successful and user wants to delete from DB
|
||||
deleted_from_db = False
|
||||
# If deletion was successful and user wants to delete from KB
|
||||
deleted_from_kb = False
|
||||
if (
|
||||
result.get("status") == "success"
|
||||
and final_delete_from_db
|
||||
and final_delete_from_kb
|
||||
and document_id
|
||||
):
|
||||
try:
|
||||
|
|
@ -232,24 +232,23 @@ def create_delete_notion_page_tool(
|
|||
if document:
|
||||
await db_session.delete(document)
|
||||
await db_session.commit()
|
||||
deleted_from_db = True
|
||||
deleted_from_kb = True
|
||||
logger.info(
|
||||
f"Deleted document {document_id} from knowledge base"
|
||||
)
|
||||
else:
|
||||
logger.warning(f"Document {document_id} not found in DB")
|
||||
logger.warning(f"Document {document_id} not found in KB")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to delete document from DB: {e}")
|
||||
# Don't fail the whole operation if DB deletion fails
|
||||
# The page is already deleted from Notion, so inform the user
|
||||
logger.error(f"Failed to delete document from KB: {e}")
|
||||
await db_session.rollback()
|
||||
result["warning"] = (
|
||||
f"Page deleted from Notion, but failed to remove from knowledge base: {e!s}"
|
||||
)
|
||||
|
||||
# Update result with DB deletion status
|
||||
# Update result with KB deletion status
|
||||
if result.get("status") == "success":
|
||||
result["deleted_from_db"] = deleted_from_db
|
||||
if deleted_from_db:
|
||||
result["deleted_from_kb"] = deleted_from_kb
|
||||
if deleted_from_kb:
|
||||
result["message"] = (
|
||||
f"{result.get('message', '')} (also removed from knowledge base)"
|
||||
)
|
||||
|
|
@ -263,11 +262,10 @@ def create_delete_notion_page_tool(
|
|||
raise
|
||||
|
||||
logger.error(f"Error deleting Notion page: {e}", exc_info=True)
|
||||
return {
|
||||
"status": "error",
|
||||
"message": str(e)
|
||||
if isinstance(e, ValueError)
|
||||
else f"Unexpected error: {e!s}",
|
||||
}
|
||||
if isinstance(e, (ValueError, NotionAPIError)):
|
||||
message = str(e)
|
||||
else:
|
||||
message = "Something went wrong while deleting the page. Please try again."
|
||||
return {"status": "error", "message": message}
|
||||
|
||||
return delete_notion_page
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ from langchain_core.tools import tool
|
|||
from langgraph.types import interrupt
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.connectors.notion_history import NotionHistoryConnector
|
||||
from app.connectors.notion_history import NotionAPIError, NotionHistoryConnector
|
||||
from app.services.notion import NotionToolMetadataService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -108,6 +108,7 @@ def create_update_notion_page_tool(
|
|||
}
|
||||
|
||||
page_id = context.get("page_id")
|
||||
document_id = context.get("document_id")
|
||||
connector_id_from_context = context.get("account", {}).get("id")
|
||||
|
||||
logger.info(
|
||||
|
|
@ -218,6 +219,39 @@ def create_update_notion_page_tool(
|
|||
logger.info(
|
||||
f"update_page result: {result.get('status')} - {result.get('message', '')}"
|
||||
)
|
||||
|
||||
if result.get("status") == "success" and document_id is not None:
|
||||
from app.services.notion import NotionKBSyncService
|
||||
|
||||
logger.info(f"Updating knowledge base for document {document_id}...")
|
||||
kb_service = NotionKBSyncService(db_session)
|
||||
kb_result = await kb_service.sync_after_update(
|
||||
document_id=document_id,
|
||||
appended_content=final_content,
|
||||
user_id=user_id,
|
||||
search_space_id=search_space_id,
|
||||
appended_block_ids=result.get("appended_block_ids"),
|
||||
)
|
||||
|
||||
if kb_result["status"] == "success":
|
||||
result["message"] = (
|
||||
f"{result['message']}. Your knowledge base has also been updated."
|
||||
)
|
||||
logger.info(
|
||||
f"Knowledge base successfully updated for page {final_page_id}"
|
||||
)
|
||||
elif kb_result["status"] == "not_indexed":
|
||||
result["message"] = (
|
||||
f"{result['message']}. This page will be added to your knowledge base in the next scheduled sync."
|
||||
)
|
||||
else:
|
||||
result["message"] = (
|
||||
f"{result['message']}. Your knowledge base will be updated in the next scheduled sync."
|
||||
)
|
||||
logger.warning(
|
||||
f"KB update failed for page {final_page_id}: {kb_result['message']}"
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
|
|
@ -227,11 +261,10 @@ def create_update_notion_page_tool(
|
|||
raise
|
||||
|
||||
logger.error(f"Error updating Notion page: {e}", exc_info=True)
|
||||
return {
|
||||
"status": "error",
|
||||
"message": str(e)
|
||||
if isinstance(e, ValueError)
|
||||
else f"Unexpected error: {e!s}",
|
||||
}
|
||||
if isinstance(e, (ValueError, NotionAPIError)):
|
||||
message = str(e)
|
||||
else:
|
||||
message = "Something went wrong while updating the page. Please try again."
|
||||
return {"status": "error", "message": message}
|
||||
|
||||
return update_notion_page
|
||||
|
|
|
|||
|
|
@ -48,6 +48,11 @@ from app.db import ChatVisibility
|
|||
from .display_image import create_display_image_tool
|
||||
from .generate_image import create_generate_image_tool
|
||||
from .knowledge_base import create_search_knowledge_base_tool
|
||||
from .linear import (
|
||||
create_create_linear_issue_tool,
|
||||
create_delete_linear_issue_tool,
|
||||
create_update_linear_issue_tool,
|
||||
)
|
||||
from .link_preview import create_link_preview_tool
|
||||
from .mcp_tool import load_mcp_tools
|
||||
from .notion import (
|
||||
|
|
@ -216,6 +221,39 @@ BUILTIN_TOOLS: list[ToolDefinition] = [
|
|||
requires=["user_id", "search_space_id", "db_session", "thread_visibility"],
|
||||
),
|
||||
# =========================================================================
|
||||
# LINEAR TOOLS - create, update, delete issues
|
||||
# =========================================================================
|
||||
ToolDefinition(
|
||||
name="create_linear_issue",
|
||||
description="Create a new issue in the user's Linear workspace",
|
||||
factory=lambda deps: create_create_linear_issue_tool(
|
||||
db_session=deps["db_session"],
|
||||
search_space_id=deps["search_space_id"],
|
||||
user_id=deps["user_id"],
|
||||
),
|
||||
requires=["db_session", "search_space_id", "user_id"],
|
||||
),
|
||||
ToolDefinition(
|
||||
name="update_linear_issue",
|
||||
description="Update an existing indexed Linear issue",
|
||||
factory=lambda deps: create_update_linear_issue_tool(
|
||||
db_session=deps["db_session"],
|
||||
search_space_id=deps["search_space_id"],
|
||||
user_id=deps["user_id"],
|
||||
),
|
||||
requires=["db_session", "search_space_id", "user_id"],
|
||||
),
|
||||
ToolDefinition(
|
||||
name="delete_linear_issue",
|
||||
description="Archive (delete) an existing indexed Linear issue",
|
||||
factory=lambda deps: create_delete_linear_issue_tool(
|
||||
db_session=deps["db_session"],
|
||||
search_space_id=deps["search_space_id"],
|
||||
user_id=deps["user_id"],
|
||||
),
|
||||
requires=["db_session", "search_space_id", "user_id"],
|
||||
),
|
||||
# =========================================================================
|
||||
# NOTION TOOLS - create, update, delete pages
|
||||
# =========================================================================
|
||||
ToolDefinition(
|
||||
|
|
|
|||
|
|
@ -10,7 +10,6 @@ from datetime import datetime
|
|||
from typing import Any
|
||||
|
||||
import httpx
|
||||
import requests
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.future import select
|
||||
|
||||
|
|
@ -23,6 +22,14 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
LINEAR_GRAPHQL_URL = "https://api.linear.app/graphql"
|
||||
|
||||
|
||||
class LinearAPIError(Exception):
|
||||
"""Raised when the Linear API returns a non-200 response.
|
||||
|
||||
The message is always user-presentable; callers should surface it directly
|
||||
without any additional prefix or wrapping.
|
||||
"""
|
||||
|
||||
ORGANIZATION_QUERY = """
|
||||
query {
|
||||
organization {
|
||||
|
|
@ -244,6 +251,37 @@ class LinearConnector:
|
|||
"Authorization": f"Bearer {self._credentials.access_token}",
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _raise_api_error(status_code: int, body: str) -> None:
|
||||
"""Parse a non-200 Linear API response and raise a clean exception.
|
||||
|
||||
Translates known Linear error codes into user-readable messages so that
|
||||
raw GraphQL payloads never reach the end user.
|
||||
"""
|
||||
import json as _json
|
||||
|
||||
friendly = None
|
||||
try:
|
||||
payload = _json.loads(body)
|
||||
errors = payload.get("errors", [])
|
||||
if errors:
|
||||
ext = errors[0].get("extensions", {})
|
||||
code = ext.get("code", "")
|
||||
if code == "INPUT_ERROR" and "too complex" in errors[0].get("message", "").lower():
|
||||
friendly = (
|
||||
"Linear rejected the request because the workspace is too large "
|
||||
"to fetch in one query. Please try again — if the problem persists, "
|
||||
"contact support."
|
||||
)
|
||||
elif ext.get("userPresentableMessage"):
|
||||
friendly = ext["userPresentableMessage"]
|
||||
elif errors[0].get("message"):
|
||||
friendly = errors[0]["message"]
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
raise LinearAPIError(friendly or f"Linear API error (HTTP {status_code})")
|
||||
|
||||
async def execute_graphql_query(
|
||||
self, query: str, variables: dict[str, Any] | None = None
|
||||
) -> dict[str, Any]:
|
||||
|
|
@ -274,14 +312,15 @@ class LinearConnector:
|
|||
if variables:
|
||||
payload["variables"] = variables
|
||||
|
||||
response = requests.post(self.api_url, headers=headers, json=payload)
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.post(
|
||||
self.api_url, headers=headers, json=payload, timeout=30.0
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
return response.json()
|
||||
else:
|
||||
raise Exception(
|
||||
f"Query failed with status code {response.status_code}: {response.text}"
|
||||
)
|
||||
self._raise_api_error(response.status_code, response.text)
|
||||
|
||||
async def get_all_issues(
|
||||
self, include_comments: bool = True
|
||||
|
|
@ -588,6 +627,148 @@ class LinearConnector:
|
|||
|
||||
return formatted
|
||||
|
||||
async def create_issue(
|
||||
self,
|
||||
team_id: str,
|
||||
title: str,
|
||||
description: str | None = None,
|
||||
state_id: str | None = None,
|
||||
assignee_id: str | None = None,
|
||||
priority: int | None = None,
|
||||
label_ids: list[str] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
try:
|
||||
mutation = """
|
||||
mutation IssueCreate($input: IssueCreateInput!) {
|
||||
issueCreate(input: $input) {
|
||||
success
|
||||
issue { id identifier title url }
|
||||
}
|
||||
}
|
||||
"""
|
||||
input_data: dict[str, Any] = {"teamId": team_id, "title": title}
|
||||
if description is not None:
|
||||
input_data["description"] = description
|
||||
if state_id is not None:
|
||||
input_data["stateId"] = state_id
|
||||
if assignee_id is not None:
|
||||
input_data["assigneeId"] = assignee_id
|
||||
if priority is not None:
|
||||
input_data["priority"] = priority
|
||||
if label_ids:
|
||||
input_data["labelIds"] = label_ids
|
||||
|
||||
result = await self.execute_graphql_query(mutation, {"input": input_data})
|
||||
payload = result.get("data", {}).get("issueCreate", {})
|
||||
if not payload.get("success"):
|
||||
errors = result.get("errors", [])
|
||||
msg = (
|
||||
errors[0].get("message", "Unknown error")
|
||||
if errors
|
||||
else "Unknown error"
|
||||
)
|
||||
return {"status": "error", "message": f"issueCreate failed: {msg}"}
|
||||
issue = payload.get("issue", {})
|
||||
return {
|
||||
"status": "success",
|
||||
"id": issue.get("id"),
|
||||
"identifier": issue.get("identifier"),
|
||||
"title": issue.get("title"),
|
||||
"url": issue.get("url"),
|
||||
"message": f"Issue {issue.get('identifier')} created successfully.",
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"Error creating Linear issue: {e}")
|
||||
return {"status": "error", "message": str(e)}
|
||||
|
||||
async def update_issue(
|
||||
self,
|
||||
issue_id: str,
|
||||
title: str | None = None,
|
||||
description: str | None = None,
|
||||
state_id: str | None = None,
|
||||
assignee_id: str | None = None,
|
||||
priority: int | None = None,
|
||||
label_ids: list[str] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
try:
|
||||
mutation = """
|
||||
mutation IssueUpdate($id: String!, $input: IssueUpdateInput!) {
|
||||
issueUpdate(id: $id, input: $input) {
|
||||
success
|
||||
issue { id identifier title url }
|
||||
}
|
||||
}
|
||||
"""
|
||||
input_data: dict[str, Any] = {}
|
||||
if title is not None:
|
||||
input_data["title"] = title
|
||||
if description is not None:
|
||||
input_data["description"] = description
|
||||
if state_id is not None:
|
||||
input_data["stateId"] = state_id
|
||||
if assignee_id is not None:
|
||||
input_data["assigneeId"] = assignee_id
|
||||
if priority is not None:
|
||||
input_data["priority"] = priority
|
||||
if label_ids is not None:
|
||||
input_data["labelIds"] = label_ids
|
||||
|
||||
if not input_data:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "No fields provided for update. Please specify at least one field to change.",
|
||||
}
|
||||
|
||||
result = await self.execute_graphql_query(
|
||||
mutation, {"id": issue_id, "input": input_data}
|
||||
)
|
||||
payload = result.get("data", {}).get("issueUpdate", {})
|
||||
if not payload.get("success"):
|
||||
errors = result.get("errors", [])
|
||||
msg = (
|
||||
errors[0].get("message", "Unknown error")
|
||||
if errors
|
||||
else "Unknown error"
|
||||
)
|
||||
return {"status": "error", "message": f"issueUpdate failed: {msg}"}
|
||||
issue = payload.get("issue", {})
|
||||
return {
|
||||
"status": "success",
|
||||
"id": issue.get("id"),
|
||||
"identifier": issue.get("identifier"),
|
||||
"title": issue.get("title"),
|
||||
"url": issue.get("url"),
|
||||
"message": f"Issue {issue.get('identifier')} updated successfully.",
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"Error updating Linear issue: {e}")
|
||||
return {"status": "error", "message": str(e)}
|
||||
|
||||
async def archive_issue(self, issue_id: str) -> dict[str, Any]:
|
||||
try:
|
||||
mutation = """
|
||||
mutation IssueArchive($id: String!) {
|
||||
issueArchive(id: $id) {
|
||||
success
|
||||
}
|
||||
}
|
||||
"""
|
||||
result = await self.execute_graphql_query(mutation, {"id": issue_id})
|
||||
payload = result.get("data", {}).get("issueArchive", {})
|
||||
if not payload.get("success"):
|
||||
errors = result.get("errors", [])
|
||||
msg = (
|
||||
errors[0].get("message", "Unknown error")
|
||||
if errors
|
||||
else "Unknown error"
|
||||
)
|
||||
return {"status": "error", "message": f"issueArchive failed: {msg}"}
|
||||
return {"status": "success", "message": "Issue archived successfully."}
|
||||
except Exception as e:
|
||||
logger.error(f"Error archiving Linear issue: {e}")
|
||||
return {"status": "error", "message": str(e)}
|
||||
|
||||
def format_issue_to_markdown(self, issue: dict[str, Any]) -> str:
|
||||
"""
|
||||
Convert an issue to markdown format.
|
||||
|
|
|
|||
|
|
@ -17,6 +17,15 @@ from app.utils.oauth_security import TokenEncryption
|
|||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class NotionAPIError(Exception):
|
||||
"""Raised when the Notion API returns a non-200 response.
|
||||
|
||||
The message is always user-presentable; callers should surface it directly
|
||||
without any additional prefix or wrapping.
|
||||
"""
|
||||
|
||||
|
||||
# Type variable for generic return type
|
||||
T = TypeVar("T")
|
||||
|
||||
|
|
@ -250,8 +259,9 @@ class NotionHistoryConnector:
|
|||
logger.error(
|
||||
f"Failed to refresh Notion token for connector {self._connector_id}: {e!s}"
|
||||
)
|
||||
raise Exception(
|
||||
f"Failed to refresh Notion OAuth credentials: {e!s}"
|
||||
raise NotionAPIError(
|
||||
"Failed to refresh your Notion connection. "
|
||||
"Please try again or reconnect your Notion account."
|
||||
) from e
|
||||
|
||||
return self._credentials.access_token
|
||||
|
|
@ -1041,7 +1051,7 @@ class NotionHistoryConnector:
|
|||
try:
|
||||
notion = await self._get_client()
|
||||
|
||||
# Append content if provided
|
||||
appended_block_ids = []
|
||||
if content:
|
||||
# Convert new content to blocks
|
||||
try:
|
||||
|
|
@ -1065,14 +1075,23 @@ class NotionHistoryConnector:
|
|||
try:
|
||||
for i in range(0, len(children), 100):
|
||||
batch = children[i : i + 100]
|
||||
await self._api_call_with_retry(
|
||||
response = await self._api_call_with_retry(
|
||||
notion.blocks.children.append,
|
||||
block_id=page_id,
|
||||
children=batch,
|
||||
)
|
||||
batch_block_ids = [
|
||||
block["id"] for block in response.get("results", [])
|
||||
]
|
||||
appended_block_ids.extend(batch_block_ids)
|
||||
logger.info(
|
||||
f"Successfully appended {len(children)} new blocks to page {page_id}"
|
||||
)
|
||||
logger.debug(
|
||||
f"Appended block IDs: {appended_block_ids[:5]}..."
|
||||
if len(appended_block_ids) > 5
|
||||
else f"Appended block IDs: {appended_block_ids}"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to append content blocks: {e}")
|
||||
return {
|
||||
|
|
@ -1092,6 +1111,7 @@ class NotionHistoryConnector:
|
|||
"page_id": page_id,
|
||||
"url": page_url,
|
||||
"title": page_title,
|
||||
"appended_block_ids": appended_block_ids,
|
||||
"message": f"Updated Notion page '{page_title}' (content appended)",
|
||||
}
|
||||
|
||||
|
|
|
|||
13
surfsense_backend/app/services/linear/__init__.py
Normal file
13
surfsense_backend/app/services/linear/__init__.py
Normal file
|
|
@ -0,0 +1,13 @@
|
|||
from app.services.linear.kb_sync_service import LinearKBSyncService
|
||||
from app.services.linear.tool_metadata_service import (
|
||||
LinearIssue,
|
||||
LinearToolMetadataService,
|
||||
LinearWorkspace,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"LinearIssue",
|
||||
"LinearKBSyncService",
|
||||
"LinearToolMetadataService",
|
||||
"LinearWorkspace",
|
||||
]
|
||||
182
surfsense_backend/app/services/linear/kb_sync_service.py
Normal file
182
surfsense_backend/app/services/linear/kb_sync_service.py
Normal file
|
|
@ -0,0 +1,182 @@
|
|||
import logging
|
||||
from datetime import datetime
|
||||
|
||||
from sqlalchemy import delete
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.config import config
|
||||
from app.connectors.linear_connector import LinearConnector
|
||||
from app.db import Chunk, Document
|
||||
from app.services.llm_service import get_user_long_context_llm
|
||||
from app.utils.document_converters import (
|
||||
create_document_chunks,
|
||||
generate_content_hash,
|
||||
generate_document_summary,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LinearKBSyncService:
|
||||
"""Re-indexes a single Linear issue document after a successful update.
|
||||
|
||||
Mirrors the indexer's Phase-2 logic exactly: fetch fresh issue content,
|
||||
run generate_document_summary, create_document_chunks, then update the
|
||||
document row in the knowledge base.
|
||||
"""
|
||||
|
||||
def __init__(self, db_session: AsyncSession):
|
||||
self.db_session = db_session
|
||||
|
||||
async def sync_after_update(
|
||||
self,
|
||||
document_id: int,
|
||||
issue_id: str,
|
||||
user_id: str,
|
||||
search_space_id: int,
|
||||
) -> dict:
|
||||
"""Re-index a Linear issue document after it has been updated via the API.
|
||||
|
||||
Args:
|
||||
document_id: The KB document ID to update.
|
||||
issue_id: The Linear issue UUID to fetch fresh content from.
|
||||
user_id: Used to select the correct LLM configuration.
|
||||
search_space_id: Used to select the correct LLM configuration.
|
||||
|
||||
Returns:
|
||||
dict with 'status': 'success' | 'not_indexed' | 'error'.
|
||||
"""
|
||||
from app.tasks.connector_indexers.base import (
|
||||
get_current_timestamp,
|
||||
safe_set_chunks,
|
||||
)
|
||||
|
||||
try:
|
||||
document = await self.db_session.get(Document, document_id)
|
||||
if not document:
|
||||
logger.warning(f"Document {document_id} not found in KB")
|
||||
return {"status": "not_indexed"}
|
||||
|
||||
connector_id = document.connector_id
|
||||
if not connector_id:
|
||||
return {"status": "error", "message": "Document has no connector_id"}
|
||||
|
||||
linear_client = LinearConnector(
|
||||
session=self.db_session, connector_id=connector_id
|
||||
)
|
||||
|
||||
issue_raw = await self._fetch_issue(linear_client, issue_id)
|
||||
if not issue_raw:
|
||||
return {"status": "error", "message": "Issue not found in Linear API"}
|
||||
|
||||
formatted_issue = linear_client.format_issue(issue_raw)
|
||||
issue_content = linear_client.format_issue_to_markdown(formatted_issue)
|
||||
|
||||
if not issue_content:
|
||||
return {"status": "error", "message": "Issue produced empty content"}
|
||||
|
||||
issue_identifier = formatted_issue.get("identifier", "")
|
||||
issue_title = formatted_issue.get("title", "")
|
||||
state = formatted_issue.get("state", "Unknown")
|
||||
priority = issue_raw.get("priorityLabel", "Unknown")
|
||||
comment_count = len(formatted_issue.get("comments", []))
|
||||
description = formatted_issue.get("description", "")
|
||||
|
||||
user_llm = await get_user_long_context_llm(
|
||||
self.db_session, user_id, search_space_id, disable_streaming=True
|
||||
)
|
||||
|
||||
if user_llm:
|
||||
document_metadata_for_summary = {
|
||||
"issue_id": issue_identifier,
|
||||
"issue_title": issue_title,
|
||||
"state": state,
|
||||
"priority": priority,
|
||||
"comment_count": comment_count,
|
||||
"document_type": "Linear Issue",
|
||||
"connector_type": "Linear",
|
||||
}
|
||||
summary_content, summary_embedding = await generate_document_summary(
|
||||
issue_content, user_llm, document_metadata_for_summary
|
||||
)
|
||||
else:
|
||||
if description and len(description) > 1000:
|
||||
description = description[:997] + "..."
|
||||
summary_content = (
|
||||
f"Linear Issue {issue_identifier}: {issue_title}\n\n"
|
||||
f"Status: {state}\n\n"
|
||||
)
|
||||
if description:
|
||||
summary_content += f"Description: {description}\n\n"
|
||||
summary_content += f"Comments: {comment_count}"
|
||||
summary_embedding = config.embedding_model_instance.embed(
|
||||
summary_content
|
||||
)
|
||||
|
||||
await self.db_session.execute(
|
||||
delete(Chunk).where(Chunk.document_id == document.id)
|
||||
)
|
||||
|
||||
chunks = await create_document_chunks(issue_content)
|
||||
|
||||
document.title = f"{issue_identifier}: {issue_title}"
|
||||
document.content = summary_content
|
||||
document.content_hash = generate_content_hash(
|
||||
issue_content, search_space_id
|
||||
)
|
||||
document.embedding = summary_embedding
|
||||
from sqlalchemy.orm.attributes import flag_modified
|
||||
|
||||
document.document_metadata = {
|
||||
**(document.document_metadata or {}),
|
||||
"issue_id": issue_id,
|
||||
"issue_identifier": issue_identifier,
|
||||
"issue_title": issue_title,
|
||||
"state": state,
|
||||
"priority": priority,
|
||||
"comment_count": comment_count,
|
||||
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
||||
"connector_id": connector_id,
|
||||
}
|
||||
flag_modified(document, "document_metadata")
|
||||
safe_set_chunks(document, chunks)
|
||||
document.updated_at = get_current_timestamp()
|
||||
|
||||
await self.db_session.commit()
|
||||
|
||||
logger.info(
|
||||
f"KB sync successful for document {document_id} "
|
||||
f"({issue_identifier}: {issue_title})"
|
||||
)
|
||||
return {"status": "success"}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"KB sync failed for document {document_id}: {e}", exc_info=True
|
||||
)
|
||||
await self.db_session.rollback()
|
||||
return {"status": "error", "message": str(e)}
|
||||
|
||||
@staticmethod
|
||||
async def _fetch_issue(client: LinearConnector, issue_id: str) -> dict | None:
|
||||
"""Fetch a full issue from Linear, matching the fields used by format_issue."""
|
||||
query = """
|
||||
query LinearIssueSync($id: String!) {
|
||||
issue(id: $id) {
|
||||
id identifier title description priority priorityLabel
|
||||
createdAt updatedAt url
|
||||
state { id name type color }
|
||||
creator { id name email }
|
||||
assignee { id name email }
|
||||
comments {
|
||||
nodes {
|
||||
id body createdAt updatedAt
|
||||
user { id name email }
|
||||
}
|
||||
}
|
||||
team { id name key }
|
||||
}
|
||||
}
|
||||
"""
|
||||
result = await client.execute_graphql_query(query, {"id": issue_id})
|
||||
return (result.get("data") or {}).get("issue")
|
||||
360
surfsense_backend/app/services/linear/tool_metadata_service.py
Normal file
360
surfsense_backend/app/services/linear/tool_metadata_service.py
Normal file
|
|
@ -0,0 +1,360 @@
|
|||
from dataclasses import dataclass
|
||||
|
||||
from sqlalchemy import and_, func, or_
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.future import select
|
||||
|
||||
from app.connectors.linear_connector import LinearConnector
|
||||
from app.db import (
|
||||
Document,
|
||||
DocumentType,
|
||||
SearchSourceConnector,
|
||||
SearchSourceConnectorType,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class LinearWorkspace:
|
||||
"""Represents a Linear connector as a workspace for tool context."""
|
||||
|
||||
id: int
|
||||
name: str
|
||||
organization_name: str
|
||||
|
||||
@classmethod
|
||||
def from_connector(cls, connector: SearchSourceConnector) -> "LinearWorkspace":
|
||||
return cls(
|
||||
id=connector.id,
|
||||
name=connector.name,
|
||||
organization_name=connector.config.get(
|
||||
"organization_name", "Linear Workspace"
|
||||
),
|
||||
)
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
"id": self.id,
|
||||
"name": self.name,
|
||||
"organization_name": self.organization_name,
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class LinearIssue:
|
||||
"""Represents an indexed Linear issue resolved from the knowledge base."""
|
||||
|
||||
id: str
|
||||
identifier: str
|
||||
title: str
|
||||
state: str
|
||||
connector_id: int
|
||||
document_id: int
|
||||
indexed_at: str | None
|
||||
|
||||
@classmethod
|
||||
def from_document(cls, document: Document) -> "LinearIssue":
|
||||
meta = document.document_metadata or {}
|
||||
return cls(
|
||||
id=meta.get("issue_id", ""),
|
||||
identifier=meta.get("issue_identifier", ""),
|
||||
title=meta.get("issue_title", document.title),
|
||||
state=meta.get("state", ""),
|
||||
connector_id=document.connector_id,
|
||||
document_id=document.id,
|
||||
indexed_at=meta.get("indexed_at"),
|
||||
)
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
"id": self.id,
|
||||
"identifier": self.identifier,
|
||||
"title": self.title,
|
||||
"state": self.state,
|
||||
"connector_id": self.connector_id,
|
||||
"document_id": self.document_id,
|
||||
"indexed_at": self.indexed_at,
|
||||
}
|
||||
|
||||
|
||||
class LinearToolMetadataService:
|
||||
"""Builds interrupt context for Linear HITL tools.
|
||||
|
||||
All context queries (GraphQL reads) live here.
|
||||
Write mutations live in LinearConnector.
|
||||
"""
|
||||
|
||||
def __init__(self, db_session: AsyncSession):
|
||||
self._db_session = db_session
|
||||
|
||||
async def get_creation_context(self, search_space_id: int, user_id: str) -> dict:
|
||||
"""Return context needed to create a new Linear issue.
|
||||
|
||||
Fetches all connected Linear workspaces, and for each one fetches
|
||||
its teams with states, members, and labels from the Linear API.
|
||||
|
||||
Returns a dict with key: workspaces (each entry has id, name, organization_name, teams, priorities).
|
||||
Returns a dict with key 'error' on failure.
|
||||
"""
|
||||
connectors = await self._get_all_linear_connectors(search_space_id, user_id)
|
||||
if not connectors:
|
||||
return {"error": "No Linear account connected"}
|
||||
|
||||
workspaces = []
|
||||
for connector in connectors:
|
||||
workspace = LinearWorkspace.from_connector(connector)
|
||||
linear_client = LinearConnector(
|
||||
session=self._db_session, connector_id=connector.id
|
||||
)
|
||||
try:
|
||||
priorities = await self._fetch_priority_values(linear_client)
|
||||
teams = await self._fetch_teams_context(linear_client)
|
||||
except Exception as e:
|
||||
return {"error": f"Failed to fetch Linear context: {e!s}"}
|
||||
workspaces.append({
|
||||
"id": workspace.id,
|
||||
"name": workspace.name,
|
||||
"organization_name": workspace.organization_name,
|
||||
"teams": teams,
|
||||
"priorities": priorities,
|
||||
})
|
||||
|
||||
return {"workspaces": workspaces}
|
||||
|
||||
async def get_update_context(
|
||||
self, search_space_id: int, user_id: str, issue_ref: str
|
||||
) -> dict:
|
||||
"""Return context needed to update an indexed Linear issue.
|
||||
|
||||
Resolves the issue from the KB (title → identifier → full title),
|
||||
then fetches its current state, assignee, labels, and team context
|
||||
from the Linear API.
|
||||
|
||||
Returns a dict with keys: workspace, priorities, issue, team.
|
||||
Returns a dict with key 'error' if the issue is not found or API fails.
|
||||
"""
|
||||
document = await self._resolve_issue(search_space_id, user_id, issue_ref)
|
||||
if not document:
|
||||
return {
|
||||
"error": f"Issue '{issue_ref}' not found in your indexed Linear issues. "
|
||||
"This could mean: (1) the issue doesn't exist, (2) it hasn't been indexed yet, "
|
||||
"or (3) the title or identifier is different."
|
||||
}
|
||||
|
||||
connector = await self._get_connector_for_document(document, user_id)
|
||||
if not connector:
|
||||
return {"error": "Connector not found or access denied"}
|
||||
|
||||
workspace = LinearWorkspace.from_connector(connector)
|
||||
issue = LinearIssue.from_document(document)
|
||||
|
||||
linear_client = LinearConnector(
|
||||
session=self._db_session, connector_id=connector.id
|
||||
)
|
||||
|
||||
try:
|
||||
priorities = await self._fetch_priority_values(linear_client)
|
||||
issue_api = await self._fetch_issue_context(linear_client, issue.id)
|
||||
except Exception as e:
|
||||
return {"error": f"Failed to fetch Linear issue context: {e!s}"}
|
||||
|
||||
if not issue_api:
|
||||
return {
|
||||
"error": f"Issue '{issue_ref}' could not be fetched from Linear API"
|
||||
}
|
||||
|
||||
team_raw = issue_api.get("team") or {}
|
||||
labels_raw = issue_api.get("labels") or {}
|
||||
states_raw = team_raw.get("states") or {}
|
||||
members_raw = team_raw.get("members") or {}
|
||||
team_labels_raw = team_raw.get("labels") or {}
|
||||
|
||||
return {
|
||||
"workspace": workspace.to_dict(),
|
||||
"priorities": priorities,
|
||||
"issue": {
|
||||
"id": issue_api.get("id"),
|
||||
"identifier": issue_api.get("identifier"),
|
||||
"title": issue_api.get("title"),
|
||||
"description": issue_api.get("description"),
|
||||
"priority": issue_api.get("priority"),
|
||||
"url": issue_api.get("url"),
|
||||
"current_state": issue_api.get("state"),
|
||||
"current_assignee": issue_api.get("assignee"),
|
||||
"current_labels": labels_raw.get("nodes", []),
|
||||
"team_id": team_raw.get("id"),
|
||||
"document_id": issue.document_id,
|
||||
"indexed_at": issue.indexed_at,
|
||||
},
|
||||
"team": {
|
||||
"id": team_raw.get("id"),
|
||||
"name": team_raw.get("name"),
|
||||
"key": team_raw.get("key"),
|
||||
"states": states_raw.get("nodes", []),
|
||||
"members": members_raw.get("nodes", []),
|
||||
"labels": team_labels_raw.get("nodes", []),
|
||||
},
|
||||
}
|
||||
|
||||
async def get_delete_context(
|
||||
self, search_space_id: int, user_id: str, issue_ref: str
|
||||
) -> dict:
|
||||
"""Return context needed to archive an indexed Linear issue.
|
||||
|
||||
Resolves the issue from the KB only — no Linear API call required.
|
||||
|
||||
Returns a dict with keys: workspace, issue.
|
||||
Returns a dict with key 'error' if the issue is not found.
|
||||
"""
|
||||
document = await self._resolve_issue(search_space_id, user_id, issue_ref)
|
||||
if not document:
|
||||
return {
|
||||
"error": f"Issue '{issue_ref}' not found in your indexed Linear issues. "
|
||||
"This could mean: (1) the issue doesn't exist, (2) it hasn't been indexed yet, "
|
||||
"or (3) the title or identifier is different."
|
||||
}
|
||||
|
||||
connector = await self._get_connector_for_document(document, user_id)
|
||||
if not connector:
|
||||
return {"error": "Connector not found or access denied"}
|
||||
|
||||
workspace = LinearWorkspace.from_connector(connector)
|
||||
issue = LinearIssue.from_document(document)
|
||||
|
||||
return {
|
||||
"workspace": workspace.to_dict(),
|
||||
"issue": issue.to_dict(),
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
async def _fetch_priority_values(client: LinearConnector) -> list[dict]:
|
||||
"""Fetch Linear priority values (0-4) with their display labels."""
|
||||
query = "{ issuePriorityValues { priority label } }"
|
||||
result = await client.execute_graphql_query(query)
|
||||
return result.get("data", {}).get("issuePriorityValues", [])
|
||||
|
||||
@staticmethod
|
||||
async def _fetch_teams_context(client: LinearConnector) -> list[dict]:
|
||||
"""Fetch all teams with their states, members, and labels."""
|
||||
query = """
|
||||
query {
|
||||
teams(first: 25) {
|
||||
nodes {
|
||||
id name key
|
||||
states { nodes { id name type color position } }
|
||||
members { nodes { id name displayName email avatarUrl active } }
|
||||
labels { nodes { id name color } }
|
||||
}
|
||||
}
|
||||
}
|
||||
"""
|
||||
result = await client.execute_graphql_query(query)
|
||||
raw_teams = result.get("data", {}).get("teams", {}).get("nodes", [])
|
||||
|
||||
return [
|
||||
{
|
||||
"id": t.get("id"),
|
||||
"name": t.get("name"),
|
||||
"key": t.get("key"),
|
||||
"states": (t.get("states") or {}).get("nodes", []),
|
||||
"members": (t.get("members") or {}).get("nodes", []),
|
||||
"labels": (t.get("labels") or {}).get("nodes", []),
|
||||
}
|
||||
for t in raw_teams
|
||||
]
|
||||
|
||||
@staticmethod
|
||||
async def _fetch_issue_context(
|
||||
client: LinearConnector, issue_id: str
|
||||
) -> dict | None:
|
||||
"""Fetch a single issue with its current state, assignee, labels, and team context."""
|
||||
query = """
|
||||
query LinearIssueContext($id: String!) {
|
||||
issue(id: $id) {
|
||||
id identifier title description priority url
|
||||
state { id name type color }
|
||||
assignee { id name displayName email }
|
||||
labels { nodes { id name color } }
|
||||
team {
|
||||
id name key
|
||||
states { nodes { id name type color position } }
|
||||
members { nodes { id name displayName email avatarUrl active } }
|
||||
labels { nodes { id name color } }
|
||||
}
|
||||
}
|
||||
}
|
||||
"""
|
||||
result = await client.execute_graphql_query(query, {"id": issue_id})
|
||||
return result.get("data", {}).get("issue")
|
||||
|
||||
async def _resolve_issue(
|
||||
self, search_space_id: int, user_id: str, issue_ref: str
|
||||
) -> Document | None:
|
||||
"""Resolve an issue from the KB using a 3-step fallback.
|
||||
|
||||
Order: issue_title (most natural) → issue_identifier (e.g. ENG-42) → document.title.
|
||||
All comparisons are case-insensitive.
|
||||
"""
|
||||
ref_lower = issue_ref.lower()
|
||||
|
||||
result = await self._db_session.execute(
|
||||
select(Document)
|
||||
.join(
|
||||
SearchSourceConnector, Document.connector_id == SearchSourceConnector.id
|
||||
)
|
||||
.filter(
|
||||
and_(
|
||||
Document.search_space_id == search_space_id,
|
||||
Document.document_type == DocumentType.LINEAR_CONNECTOR,
|
||||
SearchSourceConnector.user_id == user_id,
|
||||
or_(
|
||||
func.lower(
|
||||
Document.document_metadata.op("->>")(
|
||||
"issue_title"
|
||||
)
|
||||
)
|
||||
== ref_lower,
|
||||
func.lower(
|
||||
Document.document_metadata.op("->>")(
|
||||
"issue_identifier"
|
||||
)
|
||||
)
|
||||
== ref_lower,
|
||||
func.lower(Document.title) == ref_lower,
|
||||
),
|
||||
)
|
||||
)
|
||||
.limit(1)
|
||||
)
|
||||
return result.scalars().first()
|
||||
|
||||
async def _get_all_linear_connectors(
|
||||
self, search_space_id: int, user_id: str
|
||||
) -> list[SearchSourceConnector]:
|
||||
"""Fetch all Linear connectors for the given search space and user."""
|
||||
result = await self._db_session.execute(
|
||||
select(SearchSourceConnector).filter(
|
||||
and_(
|
||||
SearchSourceConnector.search_space_id == search_space_id,
|
||||
SearchSourceConnector.user_id == user_id,
|
||||
SearchSourceConnector.connector_type
|
||||
== SearchSourceConnectorType.LINEAR_CONNECTOR,
|
||||
)
|
||||
)
|
||||
)
|
||||
return result.scalars().all()
|
||||
|
||||
async def _get_connector_for_document(
|
||||
self, document: Document, user_id: str
|
||||
) -> SearchSourceConnector | None:
|
||||
"""Fetch the connector associated with a document, scoped to the user."""
|
||||
if not document.connector_id:
|
||||
return None
|
||||
result = await self._db_session.execute(
|
||||
select(SearchSourceConnector).filter(
|
||||
and_(
|
||||
SearchSourceConnector.id == document.connector_id,
|
||||
SearchSourceConnector.user_id == user_id,
|
||||
)
|
||||
)
|
||||
)
|
||||
return result.scalars().first()
|
||||
|
|
@ -162,7 +162,7 @@ async def validate_llm_config(
|
|||
|
||||
|
||||
async def get_search_space_llm_instance(
|
||||
session: AsyncSession, search_space_id: int, role: str
|
||||
session: AsyncSession, search_space_id: int, role: str, disable_streaming: bool = False
|
||||
) -> ChatLiteLLM | ChatLiteLLMRouter | None:
|
||||
"""
|
||||
Get a ChatLiteLLM instance for a specific search space and role.
|
||||
|
|
@ -218,7 +218,7 @@ async def get_search_space_llm_instance(
|
|||
logger.debug(
|
||||
f"Using Auto mode (LLM Router) for search space {search_space_id}, role {role}"
|
||||
)
|
||||
return ChatLiteLLMRouter()
|
||||
return ChatLiteLLMRouter(disable_streaming=disable_streaming)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create ChatLiteLLMRouter: {e}")
|
||||
return None
|
||||
|
|
@ -284,6 +284,9 @@ async def get_search_space_llm_instance(
|
|||
if global_config.get("litellm_params"):
|
||||
litellm_kwargs.update(global_config["litellm_params"])
|
||||
|
||||
if disable_streaming:
|
||||
litellm_kwargs["disable_streaming"] = True
|
||||
|
||||
return ChatLiteLLM(**litellm_kwargs)
|
||||
|
||||
# Get the LLM configuration from database (NewLLMConfig)
|
||||
|
|
@ -357,6 +360,9 @@ async def get_search_space_llm_instance(
|
|||
if llm_config.litellm_params:
|
||||
litellm_kwargs.update(llm_config.litellm_params)
|
||||
|
||||
if disable_streaming:
|
||||
litellm_kwargs["disable_streaming"] = True
|
||||
|
||||
return ChatLiteLLM(**litellm_kwargs)
|
||||
|
||||
except Exception as e:
|
||||
|
|
@ -374,20 +380,20 @@ async def get_agent_llm(
|
|||
|
||||
|
||||
async def get_document_summary_llm(
|
||||
session: AsyncSession, search_space_id: int
|
||||
session: AsyncSession, search_space_id: int, disable_streaming: bool = False
|
||||
) -> ChatLiteLLM | ChatLiteLLMRouter | None:
|
||||
"""Get the search space's document summary LLM instance."""
|
||||
return await get_search_space_llm_instance(
|
||||
session, search_space_id, LLMRole.DOCUMENT_SUMMARY
|
||||
session, search_space_id, LLMRole.DOCUMENT_SUMMARY, disable_streaming=disable_streaming
|
||||
)
|
||||
|
||||
|
||||
# Backward-compatible alias (LLM preferences are now per-search-space, not per-user)
|
||||
async def get_user_long_context_llm(
|
||||
session: AsyncSession, user_id: str, search_space_id: int
|
||||
session: AsyncSession, user_id: str, search_space_id: int, disable_streaming: bool = False
|
||||
) -> ChatLiteLLM | ChatLiteLLMRouter | None:
|
||||
"""
|
||||
Deprecated: Use get_document_summary_llm instead.
|
||||
The user_id parameter is ignored as LLM preferences are now per-search-space.
|
||||
"""
|
||||
return await get_document_summary_llm(session, search_space_id)
|
||||
return await get_document_summary_llm(session, search_space_id, disable_streaming=disable_streaming)
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
from app.services.notion.kb_sync_service import NotionKBSyncService
|
||||
from app.services.notion.tool_metadata_service import (
|
||||
NotionAccount,
|
||||
NotionPage,
|
||||
|
|
@ -6,6 +7,7 @@ from app.services.notion.tool_metadata_service import (
|
|||
|
||||
__all__ = [
|
||||
"NotionAccount",
|
||||
"NotionKBSyncService",
|
||||
"NotionPage",
|
||||
"NotionToolMetadataService",
|
||||
]
|
||||
|
|
|
|||
163
surfsense_backend/app/services/notion/kb_sync_service.py
Normal file
163
surfsense_backend/app/services/notion/kb_sync_service.py
Normal file
|
|
@ -0,0 +1,163 @@
|
|||
import logging
|
||||
from datetime import datetime
|
||||
|
||||
from sqlalchemy import delete
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.config import config
|
||||
from app.db import Chunk, Document
|
||||
from app.services.llm_service import get_user_long_context_llm
|
||||
from app.utils.document_converters import (
|
||||
create_document_chunks,
|
||||
generate_content_hash,
|
||||
generate_document_summary,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class NotionKBSyncService:
|
||||
def __init__(self, db_session: AsyncSession):
|
||||
self.db_session = db_session
|
||||
|
||||
async def sync_after_update(
|
||||
self,
|
||||
document_id: int,
|
||||
appended_content: str,
|
||||
user_id: str,
|
||||
search_space_id: int,
|
||||
appended_block_ids: list[str] | None = None,
|
||||
) -> dict:
|
||||
from app.tasks.connector_indexers.base import (
|
||||
get_current_timestamp,
|
||||
safe_set_chunks,
|
||||
)
|
||||
|
||||
try:
|
||||
logger.debug(f"Starting KB sync for document {document_id}")
|
||||
document = await self.db_session.get(Document, document_id)
|
||||
|
||||
if not document:
|
||||
logger.warning(f"Document {document_id} not found in KB")
|
||||
return {"status": "not_indexed"}
|
||||
|
||||
page_id = document.document_metadata.get("page_id")
|
||||
if not page_id:
|
||||
logger.error(f"Document {document_id} missing page_id in metadata")
|
||||
return {"status": "error", "message": "Missing page_id in metadata"}
|
||||
|
||||
logger.debug(
|
||||
f"Document found: id={document_id}, page_id={page_id}, connector_id={document.connector_id}"
|
||||
)
|
||||
|
||||
from app.connectors.notion_history import NotionHistoryConnector
|
||||
|
||||
notion_connector = NotionHistoryConnector(
|
||||
session=self.db_session, connector_id=document.connector_id
|
||||
)
|
||||
|
||||
logger.debug(f"Fetching page content from Notion for page {page_id}")
|
||||
blocks, _ = await notion_connector.get_page_content(page_id, page_title=None)
|
||||
|
||||
from app.utils.notion_utils import extract_all_block_ids, process_blocks
|
||||
|
||||
fetched_content = process_blocks(blocks)
|
||||
logger.debug(f"Fetched content length: {len(fetched_content)} chars")
|
||||
|
||||
if not fetched_content or not fetched_content.strip():
|
||||
logger.warning(
|
||||
f"Fetched empty content for page {page_id} - document will have minimal searchable text"
|
||||
)
|
||||
|
||||
content_verified = False
|
||||
if appended_block_ids:
|
||||
fetched_block_ids = set(extract_all_block_ids(blocks))
|
||||
found_blocks = [
|
||||
bid for bid in appended_block_ids if bid in fetched_block_ids
|
||||
]
|
||||
|
||||
logger.debug(
|
||||
f"Block verification: {len(found_blocks)}/{len(appended_block_ids)} blocks found"
|
||||
)
|
||||
logger.debug(
|
||||
f"Appended IDs (first 3): {appended_block_ids[:3]}, Fetched IDs count: {len(fetched_block_ids)}"
|
||||
)
|
||||
|
||||
if len(found_blocks) >= len(appended_block_ids) * 0.8: # 80% threshold
|
||||
logger.info(
|
||||
f"Content verified fresh: found {len(found_blocks)}/{len(appended_block_ids)} appended blocks"
|
||||
)
|
||||
full_content = fetched_content
|
||||
content_verified = True
|
||||
else:
|
||||
logger.warning(
|
||||
"No appended blocks found in fetched content - appending manually"
|
||||
)
|
||||
full_content = fetched_content + "\n\n" + appended_content
|
||||
content_verified = False
|
||||
else:
|
||||
logger.warning("No block IDs provided - using fetched content as-is")
|
||||
full_content = fetched_content
|
||||
content_verified = False
|
||||
|
||||
logger.debug(f"Final content length: {len(full_content)} chars, verified={content_verified}")
|
||||
|
||||
logger.debug("Generating summary and embeddings")
|
||||
user_llm = await get_user_long_context_llm(
|
||||
self.db_session, user_id, search_space_id, disable_streaming=True # disable streaming to avoid leaking into the chat
|
||||
)
|
||||
|
||||
if user_llm:
|
||||
document_metadata_for_summary = {
|
||||
"page_title": document.document_metadata.get("page_title"),
|
||||
"page_id": document.document_metadata.get("page_id"),
|
||||
"document_type": "Notion Page",
|
||||
"connector_type": "Notion",
|
||||
}
|
||||
summary_content, summary_embedding = await generate_document_summary(
|
||||
full_content, user_llm, document_metadata_for_summary
|
||||
)
|
||||
logger.debug(f"Generated summary length: {len(summary_content)} chars")
|
||||
else:
|
||||
logger.warning("No LLM configured - using fallback summary")
|
||||
summary_content = f"Notion Page: {document.document_metadata.get('page_title')}\n\n{full_content[:500]}..."
|
||||
summary_embedding = config.embedding_model_instance.embed(
|
||||
summary_content
|
||||
)
|
||||
|
||||
logger.debug(f"Deleting old chunks for document {document_id}")
|
||||
await self.db_session.execute(
|
||||
delete(Chunk).where(Chunk.document_id == document.id)
|
||||
)
|
||||
|
||||
logger.debug("Creating new chunks")
|
||||
chunks = await create_document_chunks(full_content)
|
||||
logger.debug(f"Created {len(chunks)} chunks")
|
||||
|
||||
logger.debug("Updating document fields")
|
||||
document.content = summary_content
|
||||
document.content_hash = generate_content_hash(full_content, search_space_id)
|
||||
document.embedding = summary_embedding
|
||||
document.document_metadata = {
|
||||
**document.document_metadata,
|
||||
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
||||
}
|
||||
safe_set_chunks(document, chunks)
|
||||
document.updated_at = get_current_timestamp()
|
||||
|
||||
logger.debug("Committing changes to database")
|
||||
await self.db_session.commit()
|
||||
|
||||
logger.info(
|
||||
f"Successfully synced KB for document {document_id}: "
|
||||
f"summary={len(summary_content)} chars, chunks={len(chunks)}, "
|
||||
f"content_verified={content_verified}"
|
||||
)
|
||||
return {"status": "success"}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Failed to sync KB for document {document_id}: {e}", exc_info=True
|
||||
)
|
||||
await self.db_session.rollback()
|
||||
return {"status": "error", "message": str(e)}
|
||||
|
|
@ -796,6 +796,9 @@ async def _stream_agent_events(
|
|||
"create_notion_page",
|
||||
"update_notion_page",
|
||||
"delete_notion_page",
|
||||
"create_linear_issue",
|
||||
"update_linear_issue",
|
||||
"delete_linear_issue",
|
||||
):
|
||||
yield streaming_service.format_tool_output_available(
|
||||
tool_call_id,
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ from app.utils.document_converters import (
|
|||
generate_document_summary,
|
||||
generate_unique_identifier_hash,
|
||||
)
|
||||
from app.utils.notion_utils import process_blocks
|
||||
|
||||
from .base import (
|
||||
build_document_metadata_string,
|
||||
|
|
@ -280,53 +281,6 @@ async def index_notion_pages(
|
|||
pages_to_process = [] # List of dicts with document and page data
|
||||
new_documents_created = False
|
||||
|
||||
# Helper function to convert page content to markdown
|
||||
def process_blocks(blocks, level=0):
|
||||
result = ""
|
||||
for block in blocks:
|
||||
block_type = block.get("type")
|
||||
block_content = block.get("content", "")
|
||||
children = block.get("children", [])
|
||||
|
||||
# Add indentation based on level
|
||||
indent = " " * level
|
||||
|
||||
# Format based on block type
|
||||
if block_type in ["paragraph", "text"]:
|
||||
result += f"{indent}{block_content}\n\n"
|
||||
elif block_type in ["heading_1", "header"]:
|
||||
result += f"{indent}# {block_content}\n\n"
|
||||
elif block_type == "heading_2":
|
||||
result += f"{indent}## {block_content}\n\n"
|
||||
elif block_type == "heading_3":
|
||||
result += f"{indent}### {block_content}\n\n"
|
||||
elif block_type == "bulleted_list_item":
|
||||
result += f"{indent}* {block_content}\n"
|
||||
elif block_type == "numbered_list_item":
|
||||
result += f"{indent}1. {block_content}\n"
|
||||
elif block_type == "to_do":
|
||||
result += f"{indent}- [ ] {block_content}\n"
|
||||
elif block_type == "toggle":
|
||||
result += f"{indent}> {block_content}\n"
|
||||
elif block_type == "code":
|
||||
result += f"{indent}```\n{block_content}\n```\n\n"
|
||||
elif block_type == "quote":
|
||||
result += f"{indent}> {block_content}\n\n"
|
||||
elif block_type == "callout":
|
||||
result += f"{indent}> **Note:** {block_content}\n\n"
|
||||
elif block_type == "image":
|
||||
result += f"{indent}\n\n"
|
||||
else:
|
||||
# Default for other block types
|
||||
if block_content:
|
||||
result += f"{indent}{block_content}\n\n"
|
||||
|
||||
# Process children recursively
|
||||
if children:
|
||||
result += process_blocks(children, level + 1)
|
||||
|
||||
return result
|
||||
|
||||
for page in pages:
|
||||
try:
|
||||
page_id = page.get("page_id")
|
||||
|
|
|
|||
58
surfsense_backend/app/utils/notion_utils.py
Normal file
58
surfsense_backend/app/utils/notion_utils.py
Normal file
|
|
@ -0,0 +1,58 @@
|
|||
"""Utility functions for processing Notion blocks and content."""
|
||||
|
||||
|
||||
def extract_all_block_ids(blocks_list):
|
||||
ids = []
|
||||
for block in blocks_list:
|
||||
if isinstance(block, dict) and "id" in block:
|
||||
ids.append(block["id"])
|
||||
if isinstance(block, dict) and block.get("children"):
|
||||
ids.extend(extract_all_block_ids(block["children"]))
|
||||
return ids
|
||||
|
||||
|
||||
def process_blocks(blocks, level=0):
|
||||
result = ""
|
||||
for block in blocks:
|
||||
block_type = block.get("type")
|
||||
block_content = block.get("content", "")
|
||||
children = block.get("children", [])
|
||||
|
||||
# Add indentation based on level
|
||||
indent = " " * level
|
||||
|
||||
# Format based on block type
|
||||
if block_type in ["paragraph", "text"]:
|
||||
result += f"{indent}{block_content}\n\n"
|
||||
elif block_type in ["heading_1", "header"]:
|
||||
result += f"{indent}# {block_content}\n\n"
|
||||
elif block_type == "heading_2":
|
||||
result += f"{indent}## {block_content}\n\n"
|
||||
elif block_type == "heading_3":
|
||||
result += f"{indent}### {block_content}\n\n"
|
||||
elif block_type == "bulleted_list_item":
|
||||
result += f"{indent}* {block_content}\n"
|
||||
elif block_type == "numbered_list_item":
|
||||
result += f"{indent}1. {block_content}\n"
|
||||
elif block_type == "to_do":
|
||||
result += f"{indent}- [ ] {block_content}\n"
|
||||
elif block_type == "toggle":
|
||||
result += f"{indent}> {block_content}\n"
|
||||
elif block_type == "code":
|
||||
result += f"{indent}```\n{block_content}\n```\n\n"
|
||||
elif block_type == "quote":
|
||||
result += f"{indent}> {block_content}\n\n"
|
||||
elif block_type == "callout":
|
||||
result += f"{indent}> **Note:** {block_content}\n\n"
|
||||
elif block_type == "image":
|
||||
result += f"{indent}\n\n"
|
||||
else:
|
||||
# Default for other block types
|
||||
if block_content:
|
||||
result += f"{indent}{block_content}\n\n"
|
||||
|
||||
# Process children recursively
|
||||
if children:
|
||||
result += process_blocks(children, level + 1)
|
||||
|
||||
return result
|
||||
Loading…
Add table
Add a link
Reference in a new issue