diff --git a/surfsense_backend/app/agents/new_chat/chat_deepagent.py b/surfsense_backend/app/agents/new_chat/chat_deepagent.py index 3ad7aafc6..f4af16b78 100644 --- a/surfsense_backend/app/agents/new_chat/chat_deepagent.py +++ b/surfsense_backend/app/agents/new_chat/chat_deepagent.py @@ -256,6 +256,18 @@ async def create_surfsense_deep_agent( ] modified_disabled_tools.extend(notion_tools) + # Disable Linear action tools if no Linear connector is configured + has_linear_connector = ( + available_connectors is not None and "LINEAR_CONNECTOR" in available_connectors + ) + if not has_linear_connector: + linear_tools = [ + "create_linear_issue", + "update_linear_issue", + "delete_linear_issue", + ] + modified_disabled_tools.extend(linear_tools) + # Build tools using the async registry (includes MCP tools) tools = await build_tools_async( dependencies=dependencies, diff --git a/surfsense_backend/app/agents/new_chat/tools/linear/__init__.py b/surfsense_backend/app/agents/new_chat/tools/linear/__init__.py new file mode 100644 index 000000000..31acf1e2a --- /dev/null +++ b/surfsense_backend/app/agents/new_chat/tools/linear/__init__.py @@ -0,0 +1,11 @@ +"""Linear tools for creating, updating, and deleting issues.""" + +from .create_issue import create_create_linear_issue_tool +from .delete_issue import create_delete_linear_issue_tool +from .update_issue import create_update_linear_issue_tool + +__all__ = [ + "create_create_linear_issue_tool", + "create_delete_linear_issue_tool", + "create_update_linear_issue_tool", +] diff --git a/surfsense_backend/app/agents/new_chat/tools/linear/create_issue.py b/surfsense_backend/app/agents/new_chat/tools/linear/create_issue.py new file mode 100644 index 000000000..a97cecbb7 --- /dev/null +++ b/surfsense_backend/app/agents/new_chat/tools/linear/create_issue.py @@ -0,0 +1,239 @@ +import logging +from typing import Any + +from langchain_core.tools import tool +from langgraph.types import interrupt +from sqlalchemy.ext.asyncio import AsyncSession + +from app.connectors.linear_connector import LinearAPIError, LinearConnector +from app.services.linear import LinearToolMetadataService + +logger = logging.getLogger(__name__) + + +def create_create_linear_issue_tool( + db_session: AsyncSession | None = None, + search_space_id: int | None = None, + user_id: str | None = None, + connector_id: int | None = None, +): + """ + Factory function to create the create_linear_issue tool. + + Args: + db_session: Database session for accessing the Linear connector + search_space_id: Search space ID to find the Linear connector + user_id: User ID for fetching user-specific context + connector_id: Optional specific connector ID (if known) + + Returns: + Configured create_linear_issue tool + """ + + @tool + async def create_linear_issue( + title: str, + description: str | None = None, + ) -> dict[str, Any]: + """Create a new issue in Linear. + + Use this tool when the user explicitly asks to create, add, or file + a new issue / ticket / task in Linear. + + Args: + title: Short, descriptive issue title. + description: Optional markdown body for the issue. + + Returns: + Dictionary with: + - status: "success", "rejected", or "error" + - issue_id: Linear issue UUID (if success) + - identifier: Human-readable ID like "ENG-42" (if success) + - url: URL to the created issue (if success) + - message: Result message + + IMPORTANT: If status is "rejected", the user explicitly declined the action. + Respond with a brief acknowledgment (e.g., "Understood, I won't create the issue.") + and move on. Do NOT retry, troubleshoot, or suggest alternatives. + + Examples: + - "Create a Linear issue titled 'Fix login bug'" + - "Add a ticket for the payment timeout problem" + - "File an issue about the broken search feature" + """ + logger.info(f"create_linear_issue called: title='{title}'") + + if db_session is None or search_space_id is None or user_id is None: + logger.error( + "Linear tool not properly configured - missing required parameters" + ) + return { + "status": "error", + "message": "Linear tool not properly configured. Please contact support.", + } + + try: + metadata_service = LinearToolMetadataService(db_session) + context = await metadata_service.get_creation_context( + search_space_id, user_id + ) + + if "error" in context: + logger.error(f"Failed to fetch creation context: {context['error']}") + return {"status": "error", "message": context["error"]} + + logger.info(f"Requesting approval for creating Linear issue: '{title}'") + approval = interrupt( + { + "type": "linear_issue_creation", + "action": { + "tool": "create_linear_issue", + "params": { + "title": title, + "description": description, + "team_id": None, + "state_id": None, + "assignee_id": None, + "priority": None, + "label_ids": [], + "connector_id": connector_id, + }, + }, + "context": context, + } + ) + + decisions_raw = ( + approval.get("decisions", []) if isinstance(approval, dict) else [] + ) + decisions = ( + decisions_raw if isinstance(decisions_raw, list) else [decisions_raw] + ) + decisions = [d for d in decisions if isinstance(d, dict)] + if not decisions: + logger.warning("No approval decision received") + return {"status": "error", "message": "No approval decision received"} + + decision = decisions[0] + decision_type = decision.get("type") or decision.get("decision_type") + logger.info(f"User decision: {decision_type}") + + if decision_type == "reject": + logger.info("Linear issue creation rejected by user") + return { + "status": "rejected", + "message": "User declined. The issue was not created. Do not ask again or suggest alternatives.", + } + + final_params: dict[str, Any] = {} + edited_action = decision.get("edited_action") + if isinstance(edited_action, dict): + edited_args = edited_action.get("args") + if isinstance(edited_args, dict): + final_params = edited_args + elif isinstance(decision.get("args"), dict): + final_params = decision["args"] + + final_title = final_params.get("title", title) + final_description = final_params.get("description", description) + final_team_id = final_params.get("team_id") + final_state_id = final_params.get("state_id") + final_assignee_id = final_params.get("assignee_id") + final_priority = final_params.get("priority") + final_label_ids = final_params.get("label_ids") or [] + final_connector_id = final_params.get("connector_id", connector_id) + + if not final_title or not final_title.strip(): + logger.error("Title is empty or contains only whitespace") + return {"status": "error", "message": "Issue title cannot be empty."} + if not final_team_id: + return { + "status": "error", + "message": "A team must be selected to create an issue.", + } + + from sqlalchemy.future import select + + from app.db import SearchSourceConnector, SearchSourceConnectorType + + actual_connector_id = final_connector_id + if actual_connector_id is None: + result = await db_session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.LINEAR_CONNECTOR, + ) + ) + connector = result.scalars().first() + if not connector: + return { + "status": "error", + "message": "No Linear connector found. Please connect Linear in your workspace settings.", + } + actual_connector_id = connector.id + logger.info(f"Found Linear connector: id={actual_connector_id}") + else: + result = await db_session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.id == actual_connector_id, + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.LINEAR_CONNECTOR, + ) + ) + connector = result.scalars().first() + if not connector: + return { + "status": "error", + "message": "Selected Linear connector is invalid or has been disconnected.", + } + logger.info(f"Validated Linear connector: id={actual_connector_id}") + + logger.info( + f"Creating Linear issue with final params: title='{final_title}'" + ) + linear_client = LinearConnector( + session=db_session, connector_id=actual_connector_id + ) + result = await linear_client.create_issue( + team_id=final_team_id, + title=final_title, + description=final_description, + state_id=final_state_id, + assignee_id=final_assignee_id, + priority=final_priority, + label_ids=final_label_ids if final_label_ids else None, + ) + + if result.get("status") == "error": + logger.error(f"Failed to create Linear issue: {result.get('message')}") + return {"status": "error", "message": result.get("message")} + + logger.info( + f"Linear issue created: {result.get('identifier')} - {result.get('title')}" + ) + return { + "status": "success", + "issue_id": result.get("id"), + "identifier": result.get("identifier"), + "url": result.get("url"), + "message": result.get("message"), + } + + except Exception as e: + from langgraph.errors import GraphInterrupt + + if isinstance(e, GraphInterrupt): + raise + + logger.error(f"Error creating Linear issue: {e}", exc_info=True) + if isinstance(e, (ValueError, LinearAPIError)): + message = str(e) + else: + message = "Something went wrong while creating the issue. Please try again." + return {"status": "error", "message": message} + + return create_linear_issue diff --git a/surfsense_backend/app/agents/new_chat/tools/linear/delete_issue.py b/surfsense_backend/app/agents/new_chat/tools/linear/delete_issue.py new file mode 100644 index 000000000..a1931077e --- /dev/null +++ b/surfsense_backend/app/agents/new_chat/tools/linear/delete_issue.py @@ -0,0 +1,262 @@ +import logging +from typing import Any + +from langchain_core.tools import tool +from langgraph.types import interrupt +from sqlalchemy.ext.asyncio import AsyncSession + +from app.connectors.linear_connector import LinearAPIError, LinearConnector +from app.services.linear import LinearToolMetadataService + +logger = logging.getLogger(__name__) + + +def create_delete_linear_issue_tool( + db_session: AsyncSession | None = None, + search_space_id: int | None = None, + user_id: str | None = None, + connector_id: int | None = None, +): + """ + Factory function to create the delete_linear_issue tool. + + Args: + db_session: Database session for accessing the Linear connector + search_space_id: Search space ID to find the Linear connector + user_id: User ID for finding the correct Linear connector + connector_id: Optional specific connector ID (if known) + + Returns: + Configured delete_linear_issue tool + """ + + @tool + async def delete_linear_issue( + issue_ref: str, + delete_from_kb: bool = False, + ) -> dict[str, Any]: + """Archive (delete) a Linear issue. + + Use this tool when the user asks to delete, remove, or archive a Linear issue. + Note that Linear archives issues rather than permanently deleting them + (they can be restored from the archive). + + + Args: + issue_ref: The issue to delete. Can be the issue title (e.g. "Fix login bug"), + the identifier (e.g. "ENG-42"), or the full document title + (e.g. "ENG-42: Fix login bug"). + delete_from_kb: Whether to also remove the issue from the knowledge base. + Default is False. Set to True to remove from both Linear + and the knowledge base. + + Returns: + Dictionary with: + - status: "success", "rejected", "not_found", or "error" + - identifier: Human-readable ID like "ENG-42" (if success) + - message: Success or error message + - deleted_from_kb: Whether the issue was also removed from the knowledge base (if success) + + IMPORTANT: + - If status is "rejected", the user explicitly declined the action. + Respond with a brief acknowledgment (e.g., "Understood, I won't delete the issue.") + and move on. Do NOT ask for alternatives or troubleshoot. + - If status is "not_found", inform the user conversationally using the exact message + provided. Do NOT treat this as an error. Simply relay the message and ask the user + to verify the issue title or identifier, or check if it has been indexed. + + Examples: + - "Delete the 'Fix login bug' Linear issue" + - "Archive ENG-42" + - "Remove the 'Old payment flow' issue from Linear" + """ + logger.info( + f"delete_linear_issue called: issue_ref='{issue_ref}', delete_from_kb={delete_from_kb}" + ) + + if db_session is None or search_space_id is None or user_id is None: + logger.error( + "Linear tool not properly configured - missing required parameters" + ) + return { + "status": "error", + "message": "Linear tool not properly configured. Please contact support.", + } + + try: + metadata_service = LinearToolMetadataService(db_session) + context = await metadata_service.get_delete_context( + search_space_id, user_id, issue_ref + ) + + if "error" in context: + error_msg = context["error"] + if "not found" in error_msg.lower(): + logger.warning(f"Issue not found: {error_msg}") + return {"status": "not_found", "message": error_msg} + else: + logger.error(f"Failed to fetch delete context: {error_msg}") + return {"status": "error", "message": error_msg} + + issue_id = context["issue"]["id"] + issue_identifier = context["issue"].get("identifier", "") + document_id = context["issue"]["document_id"] + connector_id_from_context = context.get("workspace", {}).get("id") + + logger.info( + f"Requesting approval for deleting Linear issue: '{issue_ref}' " + f"(id={issue_id}, delete_from_kb={delete_from_kb})" + ) + approval = interrupt( + { + "type": "linear_issue_deletion", + "action": { + "tool": "delete_linear_issue", + "params": { + "issue_id": issue_id, + "connector_id": connector_id_from_context, + "delete_from_kb": delete_from_kb, + }, + }, + "context": context, + } + ) + + decisions_raw = ( + approval.get("decisions", []) if isinstance(approval, dict) else [] + ) + decisions = ( + decisions_raw if isinstance(decisions_raw, list) else [decisions_raw] + ) + decisions = [d for d in decisions if isinstance(d, dict)] + if not decisions: + logger.warning("No approval decision received") + return {"status": "error", "message": "No approval decision received"} + + decision = decisions[0] + decision_type = decision.get("type") or decision.get("decision_type") + logger.info(f"User decision: {decision_type}") + + if decision_type == "reject": + logger.info("Linear issue deletion rejected by user") + return { + "status": "rejected", + "message": "User declined. The issue was not deleted. Do not ask again or suggest alternatives.", + } + + edited_action = decision.get("edited_action") + final_params: dict[str, Any] = {} + if isinstance(edited_action, dict): + edited_args = edited_action.get("args") + if isinstance(edited_args, dict): + final_params = edited_args + elif isinstance(decision.get("args"), dict): + final_params = decision["args"] + + final_issue_id = final_params.get("issue_id", issue_id) + final_connector_id = final_params.get( + "connector_id", connector_id_from_context + ) + final_delete_from_kb = final_params.get("delete_from_kb", delete_from_kb) + + logger.info( + f"Deleting Linear issue with final params: issue_id={final_issue_id}, " + f"connector_id={final_connector_id}, delete_from_kb={final_delete_from_kb}" + ) + + from sqlalchemy.future import select + + from app.db import SearchSourceConnector, SearchSourceConnectorType + + if final_connector_id: + result = await db_session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.id == final_connector_id, + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.LINEAR_CONNECTOR, + ) + ) + connector = result.scalars().first() + if not connector: + logger.error( + f"Invalid connector_id={final_connector_id} for search_space_id={search_space_id}" + ) + return { + "status": "error", + "message": "Selected Linear connector is invalid or has been disconnected.", + } + actual_connector_id = connector.id + logger.info(f"Validated Linear connector: id={actual_connector_id}") + else: + logger.error("No connector found for this issue") + return { + "status": "error", + "message": "No connector found for this issue.", + } + + linear_client = LinearConnector( + session=db_session, connector_id=actual_connector_id + ) + + result = await linear_client.archive_issue(issue_id=final_issue_id) + + logger.info( + f"archive_issue result: {result.get('status')} - {result.get('message', '')}" + ) + + deleted_from_kb = False + if ( + result.get("status") == "success" + and final_delete_from_kb + and document_id + ): + try: + from app.db import Document + + doc_result = await db_session.execute( + select(Document).filter(Document.id == document_id) + ) + document = doc_result.scalars().first() + if document: + await db_session.delete(document) + await db_session.commit() + deleted_from_kb = True + logger.info( + f"Deleted document {document_id} from knowledge base" + ) + else: + logger.warning(f"Document {document_id} not found in KB") + except Exception as e: + logger.error(f"Failed to delete document from KB: {e}") + await db_session.rollback() + result["warning"] = ( + f"Issue archived in Linear, but failed to remove from knowledge base: {e!s}" + ) + + if result.get("status") == "success": + result["deleted_from_kb"] = deleted_from_kb + if issue_identifier: + result["message"] = f"Issue {issue_identifier} archived successfully." + if deleted_from_kb: + result["message"] = ( + f"{result.get('message', '')} Also removed from the knowledge base." + ) + + return result + + except Exception as e: + from langgraph.errors import GraphInterrupt + + if isinstance(e, GraphInterrupt): + raise + + logger.error(f"Error deleting Linear issue: {e}", exc_info=True) + if isinstance(e, (ValueError, LinearAPIError)): + message = str(e) + else: + message = "Something went wrong while deleting the issue. Please try again." + return {"status": "error", "message": message} + + return delete_linear_issue diff --git a/surfsense_backend/app/agents/new_chat/tools/linear/update_issue.py b/surfsense_backend/app/agents/new_chat/tools/linear/update_issue.py new file mode 100644 index 000000000..efffaa098 --- /dev/null +++ b/surfsense_backend/app/agents/new_chat/tools/linear/update_issue.py @@ -0,0 +1,332 @@ +import logging +from typing import Any + +from langchain_core.tools import tool +from langgraph.types import interrupt +from sqlalchemy.ext.asyncio import AsyncSession + +from app.connectors.linear_connector import LinearAPIError, LinearConnector +from app.services.linear import LinearKBSyncService, LinearToolMetadataService + +logger = logging.getLogger(__name__) + + +def create_update_linear_issue_tool( + db_session: AsyncSession | None = None, + search_space_id: int | None = None, + user_id: str | None = None, + connector_id: int | None = None, +): + """ + Factory function to create the update_linear_issue tool. + + Args: + db_session: Database session for accessing the Linear connector + search_space_id: Search space ID to find the Linear connector + user_id: User ID for fetching user-specific context + connector_id: Optional specific connector ID (if known) + + Returns: + Configured update_linear_issue tool + """ + + @tool + async def update_linear_issue( + issue_ref: str, + new_title: str | None = None, + new_description: str | None = None, + new_state_name: str | None = None, + new_assignee_email: str | None = None, + new_priority: int | None = None, + new_label_names: list[str] | None = None, + ) -> dict[str, Any]: + """Update an existing Linear issue that has been indexed in the knowledge base. + + Use this tool when the user asks to modify, change, or update a Linear issue — + for example, changing its status, reassigning it, updating its title or description, + adjusting its priority, or changing its labels. + + Only issues already indexed in the knowledge base can be updated. + + Args: + issue_ref: The issue to update. Can be the issue title (e.g. "Fix login bug"), + the identifier (e.g. "ENG-42"), or the full document title + (e.g. "ENG-42: Fix login bug"). Matched case-insensitively. + new_title: New title for the issue (optional). + new_description: New markdown body for the issue (optional). + new_state_name: New workflow state name (e.g. "In Progress", "Done"). + Matched case-insensitively against the team's states. + new_assignee_email: Email address of the new assignee. + Matched case-insensitively against the team's members. + new_priority: New priority (0 = No Priority, 1 = Urgent, 2 = High, + 3 = Medium, 4 = Low). + new_label_names: New set of label names to apply. + Matched case-insensitively against the team's labels. + Unrecognised names are silently skipped. + + Returns: + Dictionary with: + - status: "success", "rejected", "not_found", or "error" + - identifier: Human-readable ID like "ENG-42" (if success) + - url: URL to the updated issue (if success) + - message: Result message + + IMPORTANT: + - If status is "rejected", the user explicitly declined the action. + Respond with a brief acknowledgment (e.g., "Understood, I didn't update the issue.") + and move on. Do NOT ask for alternatives or troubleshoot. + - If status is "not_found", inform the user conversationally using the exact message + provided. Do NOT treat this as an error. Simply relay the message and ask the user + to verify the issue title or identifier, or check if it has been indexed. + + Examples: + - "Mark the 'Fix login bug' issue as done" + - "Assign ENG-42 to john@company.com" + - "Change the priority of 'Payment timeout' to urgent" + """ + logger.info(f"update_linear_issue called: issue_ref='{issue_ref}'") + + if db_session is None or search_space_id is None or user_id is None: + logger.error( + "Linear tool not properly configured - missing required parameters" + ) + return { + "status": "error", + "message": "Linear tool not properly configured. Please contact support.", + } + + try: + metadata_service = LinearToolMetadataService(db_session) + context = await metadata_service.get_update_context( + search_space_id, user_id, issue_ref + ) + + if "error" in context: + error_msg = context["error"] + if "not found" in error_msg.lower(): + logger.warning(f"Issue not found: {error_msg}") + return {"status": "not_found", "message": error_msg} + else: + logger.error(f"Failed to fetch update context: {error_msg}") + return {"status": "error", "message": error_msg} + + issue_id = context["issue"]["id"] + document_id = context["issue"]["document_id"] + connector_id_from_context = context.get("workspace", {}).get("id") + + team = context.get("team", {}) + new_state_id = _resolve_state(team, new_state_name) + new_assignee_id = _resolve_assignee(team, new_assignee_email) + new_label_ids = _resolve_labels(team, new_label_names) + + logger.info( + f"Requesting approval for updating Linear issue: '{issue_ref}' (id={issue_id})" + ) + approval = interrupt( + { + "type": "linear_issue_update", + "action": { + "tool": "update_linear_issue", + "params": { + "issue_id": issue_id, + "document_id": document_id, + "new_title": new_title, + "new_description": new_description, + "new_state_id": new_state_id, + "new_assignee_id": new_assignee_id, + "new_priority": new_priority, + "new_label_ids": new_label_ids, + "connector_id": connector_id_from_context, + }, + }, + "context": context, + } + ) + + decisions_raw = ( + approval.get("decisions", []) if isinstance(approval, dict) else [] + ) + decisions = ( + decisions_raw if isinstance(decisions_raw, list) else [decisions_raw] + ) + decisions = [d for d in decisions if isinstance(d, dict)] + if not decisions: + logger.warning("No approval decision received") + return {"status": "error", "message": "No approval decision received"} + + decision = decisions[0] + decision_type = decision.get("type") or decision.get("decision_type") + logger.info(f"User decision: {decision_type}") + + if decision_type == "reject": + logger.info("Linear issue update rejected by user") + return { + "status": "rejected", + "message": "User declined. The issue was not updated. Do not ask again or suggest alternatives.", + } + + edited_action = decision.get("edited_action") + final_params: dict[str, Any] = {} + if isinstance(edited_action, dict): + edited_args = edited_action.get("args") + if isinstance(edited_args, dict): + final_params = edited_args + elif isinstance(decision.get("args"), dict): + final_params = decision["args"] + + final_issue_id = final_params.get("issue_id", issue_id) + final_document_id = final_params.get("document_id", document_id) + final_new_title = final_params.get("new_title", new_title) + final_new_description = final_params.get("new_description", new_description) + final_new_state_id = final_params.get("new_state_id", new_state_id) + final_new_assignee_id = final_params.get("new_assignee_id", new_assignee_id) + final_new_priority = final_params.get("new_priority", new_priority) + final_new_label_ids: list[str] | None = final_params.get( + "new_label_ids", new_label_ids + ) + final_connector_id = final_params.get( + "connector_id", connector_id_from_context + ) + + if not final_connector_id: + logger.error("No connector found for this issue") + return { + "status": "error", + "message": "No connector found for this issue.", + } + + from sqlalchemy.future import select + + from app.db import SearchSourceConnector, SearchSourceConnectorType + + result = await db_session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.id == final_connector_id, + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.LINEAR_CONNECTOR, + ) + ) + connector = result.scalars().first() + if not connector: + logger.error( + f"Invalid connector_id={final_connector_id} for search_space_id={search_space_id}" + ) + return { + "status": "error", + "message": "Selected Linear connector is invalid or has been disconnected.", + } + logger.info(f"Validated Linear connector: id={final_connector_id}") + + logger.info( + f"Updating Linear issue with final params: issue_id={final_issue_id}" + ) + linear_client = LinearConnector( + session=db_session, connector_id=final_connector_id + ) + updated_issue = await linear_client.update_issue( + issue_id=final_issue_id, + title=final_new_title, + description=final_new_description, + state_id=final_new_state_id, + assignee_id=final_new_assignee_id, + priority=final_new_priority, + label_ids=final_new_label_ids, + ) + + if updated_issue.get("status") == "error": + logger.error( + f"Failed to update Linear issue: {updated_issue.get('message')}" + ) + return { + "status": "error", + "message": updated_issue.get("message"), + } + + logger.info( + f"update_issue result: {updated_issue.get('identifier')} - {updated_issue.get('title')}" + ) + + if final_document_id is not None: + logger.info( + f"Updating knowledge base for document {final_document_id}..." + ) + kb_service = LinearKBSyncService(db_session) + kb_result = await kb_service.sync_after_update( + document_id=final_document_id, + issue_id=final_issue_id, + user_id=user_id, + search_space_id=search_space_id, + ) + if kb_result["status"] == "success": + logger.info( + f"Knowledge base successfully updated for issue {final_issue_id}" + ) + kb_message = " Your knowledge base has also been updated." + elif kb_result["status"] == "not_indexed": + kb_message = " This issue will be added to your knowledge base in the next scheduled sync." + else: + logger.warning( + f"KB update failed for issue {final_issue_id}: {kb_result.get('message')}" + ) + kb_message = " Your knowledge base will be updated in the next scheduled sync." + else: + kb_message = "" + + identifier = updated_issue.get("identifier") + default_msg = f"Issue {identifier} updated successfully." + return { + "status": "success", + "identifier": identifier, + "url": updated_issue.get("url"), + "message": f"{updated_issue.get('message', default_msg)}{kb_message}", + } + + except Exception as e: + from langgraph.errors import GraphInterrupt + + if isinstance(e, GraphInterrupt): + raise + + logger.error(f"Error updating Linear issue: {e}", exc_info=True) + if isinstance(e, (ValueError, LinearAPIError)): + message = str(e) + else: + message = "Something went wrong while updating the issue. Please try again." + return {"status": "error", "message": message} + + return update_linear_issue + + +def _resolve_state(team: dict, state_name: str | None) -> str | None: + if not state_name: + return None + name_lower = state_name.lower() + for state in team.get("states", []): + if state.get("name", "").lower() == name_lower: + return state["id"] + return None + + +def _resolve_assignee(team: dict, assignee_email: str | None) -> str | None: + if not assignee_email: + return None + email_lower = assignee_email.lower() + for member in team.get("members", []): + if member.get("email", "").lower() == email_lower: + return member["id"] + return None + + +def _resolve_labels(team: dict, label_names: list[str] | None) -> list[str] | None: + if label_names is None: + return None + if not label_names: + return [] + name_set = {n.lower() for n in label_names} + return [ + label["id"] + for label in team.get("labels", []) + if label.get("name", "").lower() in name_set + ] diff --git a/surfsense_backend/app/agents/new_chat/tools/notion/create_page.py b/surfsense_backend/app/agents/new_chat/tools/notion/create_page.py index ce00025a4..552910382 100644 --- a/surfsense_backend/app/agents/new_chat/tools/notion/create_page.py +++ b/surfsense_backend/app/agents/new_chat/tools/notion/create_page.py @@ -5,7 +5,7 @@ from langchain_core.tools import tool from langgraph.types import interrupt from sqlalchemy.ext.asyncio import AsyncSession -from app.connectors.notion_history import NotionHistoryConnector +from app.connectors.notion_history import NotionAPIError, NotionHistoryConnector from app.services.notion import NotionToolMetadataService logger = logging.getLogger(__name__) @@ -34,7 +34,6 @@ def create_create_notion_page_tool( async def create_notion_page( title: str, content: str, - parent_page_id: str | None = None, ) -> dict[str, Any]: """Create a new page in Notion with the given title and content. @@ -45,8 +44,6 @@ def create_create_notion_page_tool( Args: title: The title of the Notion page. content: The markdown content for the page body (supports headings, lists, paragraphs). - parent_page_id: Optional parent page ID to create as a subpage. - If not provided, will ask for one. Returns: Dictionary with: @@ -58,15 +55,13 @@ def create_create_notion_page_tool( IMPORTANT: If status is "rejected", the user explicitly declined the action. Respond with a brief acknowledgment (e.g., "Understood, I didn't create the page.") - and move on. Do NOT ask for parent page IDs, troubleshoot, or suggest alternatives. + and move on. Do NOT troubleshoot or suggest alternatives. Examples: - "Create a Notion page titled 'Meeting Notes' with content 'Discussed project timeline'" - "Save this to Notion with title 'Research Summary'" """ - logger.info( - f"create_notion_page called: title='{title}', parent_page_id={parent_page_id}" - ) + logger.info(f"create_notion_page called: title='{title}'") if db_session is None or search_space_id is None or user_id is None: logger.error( @@ -99,7 +94,7 @@ def create_create_notion_page_tool( "params": { "title": title, "content": content, - "parent_page_id": parent_page_id, + "parent_page_id": None, "connector_id": connector_id, }, }, @@ -144,7 +139,7 @@ def create_create_notion_page_tool( final_title = final_params.get("title", title) final_content = final_params.get("content", content) - final_parent_page_id = final_params.get("parent_page_id", parent_page_id) + final_parent_page_id = final_params.get("parent_page_id") final_connector_id = final_params.get("connector_id", connector_id) if not final_title or not final_title.strip(): @@ -229,11 +224,10 @@ def create_create_notion_page_tool( raise logger.error(f"Error creating Notion page: {e}", exc_info=True) - return { - "status": "error", - "message": str(e) - if isinstance(e, ValueError) - else f"Unexpected error: {e!s}", - } + if isinstance(e, (ValueError, NotionAPIError)): + message = str(e) + else: + message = "Something went wrong while creating the page. Please try again." + return {"status": "error", "message": message} return create_notion_page diff --git a/surfsense_backend/app/agents/new_chat/tools/notion/delete_page.py b/surfsense_backend/app/agents/new_chat/tools/notion/delete_page.py index 065a4f9d4..b86c8dee4 100644 --- a/surfsense_backend/app/agents/new_chat/tools/notion/delete_page.py +++ b/surfsense_backend/app/agents/new_chat/tools/notion/delete_page.py @@ -5,7 +5,7 @@ from langchain_core.tools import tool from langgraph.types import interrupt from sqlalchemy.ext.asyncio import AsyncSession -from app.connectors.notion_history import NotionHistoryConnector +from app.connectors.notion_history import NotionAPIError, NotionHistoryConnector from app.services.notion.tool_metadata_service import NotionToolMetadataService logger = logging.getLogger(__name__) @@ -33,7 +33,7 @@ def create_delete_notion_page_tool( @tool async def delete_notion_page( page_title: str, - delete_from_db: bool = False, + delete_from_kb: bool = False, ) -> dict[str, Any]: """Delete (archive) a Notion page. @@ -43,8 +43,8 @@ def create_delete_notion_page_tool( Args: page_title: The title of the Notion page to delete. - delete_from_db: Whether to also remove the page from the knowledge base. - Default is False (in Notion). + delete_from_kb: Whether to also remove the page from the knowledge base. + Default is False. Set to True to permanently remove from both Notion and knowledge base. Returns: @@ -52,7 +52,7 @@ def create_delete_notion_page_tool( - status: "success", "rejected", "not_found", or "error" - page_id: Deleted page ID (if success) - message: Success or error message - - deleted_from_db: Whether the page was also removed from knowledge base (if success) + - deleted_from_kb: Whether the page was also removed from knowledge base (if success) Examples: - "Delete the 'Meeting Notes' Notion page" @@ -60,7 +60,7 @@ def create_delete_notion_page_tool( - "Archive the 'Draft Ideas' Notion page" """ logger.info( - f"delete_notion_page called: page_title='{page_title}', delete_from_db={delete_from_db}" + f"delete_notion_page called: page_title='{page_title}', delete_from_kb={delete_from_kb}" ) if db_session is None or search_space_id is None or user_id is None: @@ -100,7 +100,7 @@ def create_delete_notion_page_tool( document_id = context.get("document_id") logger.info( - f"Requesting approval for deleting Notion page: '{page_title}' (page_id={page_id}, delete_from_db={delete_from_db})" + f"Requesting approval for deleting Notion page: '{page_title}' (page_id={page_id}, delete_from_kb={delete_from_kb})" ) # Request approval before deleting @@ -112,7 +112,7 @@ def create_delete_notion_page_tool( "params": { "page_id": page_id, "connector_id": connector_id_from_context, - "delete_from_db": delete_from_db, + "delete_from_kb": delete_from_kb, }, }, "context": context, @@ -159,10 +159,10 @@ def create_delete_notion_page_tool( final_connector_id = final_params.get( "connector_id", connector_id_from_context ) - final_delete_from_db = final_params.get("delete_from_db", delete_from_db) + final_delete_from_kb = final_params.get("delete_from_kb", delete_from_kb) logger.info( - f"Deleting Notion page with final params: page_id={final_page_id}, connector_id={final_connector_id}, delete_from_db={final_delete_from_db}" + f"Deleting Notion page with final params: page_id={final_page_id}, connector_id={final_connector_id}, delete_from_kb={final_delete_from_kb}" ) from sqlalchemy.future import select @@ -211,11 +211,11 @@ def create_delete_notion_page_tool( f"delete_page result: {result.get('status')} - {result.get('message', '')}" ) - # If deletion was successful and user wants to delete from DB - deleted_from_db = False + # If deletion was successful and user wants to delete from KB + deleted_from_kb = False if ( result.get("status") == "success" - and final_delete_from_db + and final_delete_from_kb and document_id ): try: @@ -232,24 +232,23 @@ def create_delete_notion_page_tool( if document: await db_session.delete(document) await db_session.commit() - deleted_from_db = True + deleted_from_kb = True logger.info( f"Deleted document {document_id} from knowledge base" ) else: - logger.warning(f"Document {document_id} not found in DB") + logger.warning(f"Document {document_id} not found in KB") except Exception as e: - logger.error(f"Failed to delete document from DB: {e}") - # Don't fail the whole operation if DB deletion fails - # The page is already deleted from Notion, so inform the user + logger.error(f"Failed to delete document from KB: {e}") + await db_session.rollback() result["warning"] = ( f"Page deleted from Notion, but failed to remove from knowledge base: {e!s}" ) - # Update result with DB deletion status + # Update result with KB deletion status if result.get("status") == "success": - result["deleted_from_db"] = deleted_from_db - if deleted_from_db: + result["deleted_from_kb"] = deleted_from_kb + if deleted_from_kb: result["message"] = ( f"{result.get('message', '')} (also removed from knowledge base)" ) @@ -263,11 +262,10 @@ def create_delete_notion_page_tool( raise logger.error(f"Error deleting Notion page: {e}", exc_info=True) - return { - "status": "error", - "message": str(e) - if isinstance(e, ValueError) - else f"Unexpected error: {e!s}", - } + if isinstance(e, (ValueError, NotionAPIError)): + message = str(e) + else: + message = "Something went wrong while deleting the page. Please try again." + return {"status": "error", "message": message} return delete_notion_page diff --git a/surfsense_backend/app/agents/new_chat/tools/notion/update_page.py b/surfsense_backend/app/agents/new_chat/tools/notion/update_page.py index 7a58f3aad..ad7fe088d 100644 --- a/surfsense_backend/app/agents/new_chat/tools/notion/update_page.py +++ b/surfsense_backend/app/agents/new_chat/tools/notion/update_page.py @@ -5,7 +5,7 @@ from langchain_core.tools import tool from langgraph.types import interrupt from sqlalchemy.ext.asyncio import AsyncSession -from app.connectors.notion_history import NotionHistoryConnector +from app.connectors.notion_history import NotionAPIError, NotionHistoryConnector from app.services.notion import NotionToolMetadataService logger = logging.getLogger(__name__) @@ -108,6 +108,7 @@ def create_update_notion_page_tool( } page_id = context.get("page_id") + document_id = context.get("document_id") connector_id_from_context = context.get("account", {}).get("id") logger.info( @@ -218,6 +219,39 @@ def create_update_notion_page_tool( logger.info( f"update_page result: {result.get('status')} - {result.get('message', '')}" ) + + if result.get("status") == "success" and document_id is not None: + from app.services.notion import NotionKBSyncService + + logger.info(f"Updating knowledge base for document {document_id}...") + kb_service = NotionKBSyncService(db_session) + kb_result = await kb_service.sync_after_update( + document_id=document_id, + appended_content=final_content, + user_id=user_id, + search_space_id=search_space_id, + appended_block_ids=result.get("appended_block_ids"), + ) + + if kb_result["status"] == "success": + result["message"] = ( + f"{result['message']}. Your knowledge base has also been updated." + ) + logger.info( + f"Knowledge base successfully updated for page {final_page_id}" + ) + elif kb_result["status"] == "not_indexed": + result["message"] = ( + f"{result['message']}. This page will be added to your knowledge base in the next scheduled sync." + ) + else: + result["message"] = ( + f"{result['message']}. Your knowledge base will be updated in the next scheduled sync." + ) + logger.warning( + f"KB update failed for page {final_page_id}: {kb_result['message']}" + ) + return result except Exception as e: @@ -227,11 +261,10 @@ def create_update_notion_page_tool( raise logger.error(f"Error updating Notion page: {e}", exc_info=True) - return { - "status": "error", - "message": str(e) - if isinstance(e, ValueError) - else f"Unexpected error: {e!s}", - } + if isinstance(e, (ValueError, NotionAPIError)): + message = str(e) + else: + message = "Something went wrong while updating the page. Please try again." + return {"status": "error", "message": message} return update_notion_page diff --git a/surfsense_backend/app/agents/new_chat/tools/registry.py b/surfsense_backend/app/agents/new_chat/tools/registry.py index 132791c4b..db48276bc 100644 --- a/surfsense_backend/app/agents/new_chat/tools/registry.py +++ b/surfsense_backend/app/agents/new_chat/tools/registry.py @@ -48,6 +48,11 @@ from app.db import ChatVisibility from .display_image import create_display_image_tool from .generate_image import create_generate_image_tool from .knowledge_base import create_search_knowledge_base_tool +from .linear import ( + create_create_linear_issue_tool, + create_delete_linear_issue_tool, + create_update_linear_issue_tool, +) from .link_preview import create_link_preview_tool from .mcp_tool import load_mcp_tools from .notion import ( @@ -216,6 +221,39 @@ BUILTIN_TOOLS: list[ToolDefinition] = [ requires=["user_id", "search_space_id", "db_session", "thread_visibility"], ), # ========================================================================= + # LINEAR TOOLS - create, update, delete issues + # ========================================================================= + ToolDefinition( + name="create_linear_issue", + description="Create a new issue in the user's Linear workspace", + factory=lambda deps: create_create_linear_issue_tool( + db_session=deps["db_session"], + search_space_id=deps["search_space_id"], + user_id=deps["user_id"], + ), + requires=["db_session", "search_space_id", "user_id"], + ), + ToolDefinition( + name="update_linear_issue", + description="Update an existing indexed Linear issue", + factory=lambda deps: create_update_linear_issue_tool( + db_session=deps["db_session"], + search_space_id=deps["search_space_id"], + user_id=deps["user_id"], + ), + requires=["db_session", "search_space_id", "user_id"], + ), + ToolDefinition( + name="delete_linear_issue", + description="Archive (delete) an existing indexed Linear issue", + factory=lambda deps: create_delete_linear_issue_tool( + db_session=deps["db_session"], + search_space_id=deps["search_space_id"], + user_id=deps["user_id"], + ), + requires=["db_session", "search_space_id", "user_id"], + ), + # ========================================================================= # NOTION TOOLS - create, update, delete pages # ========================================================================= ToolDefinition( diff --git a/surfsense_backend/app/connectors/linear_connector.py b/surfsense_backend/app/connectors/linear_connector.py index 534d70b89..8805219a3 100644 --- a/surfsense_backend/app/connectors/linear_connector.py +++ b/surfsense_backend/app/connectors/linear_connector.py @@ -10,7 +10,6 @@ from datetime import datetime from typing import Any import httpx -import requests from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.future import select @@ -23,6 +22,14 @@ logger = logging.getLogger(__name__) LINEAR_GRAPHQL_URL = "https://api.linear.app/graphql" + +class LinearAPIError(Exception): + """Raised when the Linear API returns a non-200 response. + + The message is always user-presentable; callers should surface it directly + without any additional prefix or wrapping. + """ + ORGANIZATION_QUERY = """ query { organization { @@ -244,6 +251,37 @@ class LinearConnector: "Authorization": f"Bearer {self._credentials.access_token}", } + @staticmethod + def _raise_api_error(status_code: int, body: str) -> None: + """Parse a non-200 Linear API response and raise a clean exception. + + Translates known Linear error codes into user-readable messages so that + raw GraphQL payloads never reach the end user. + """ + import json as _json + + friendly = None + try: + payload = _json.loads(body) + errors = payload.get("errors", []) + if errors: + ext = errors[0].get("extensions", {}) + code = ext.get("code", "") + if code == "INPUT_ERROR" and "too complex" in errors[0].get("message", "").lower(): + friendly = ( + "Linear rejected the request because the workspace is too large " + "to fetch in one query. Please try again — if the problem persists, " + "contact support." + ) + elif ext.get("userPresentableMessage"): + friendly = ext["userPresentableMessage"] + elif errors[0].get("message"): + friendly = errors[0]["message"] + except Exception: + pass + + raise LinearAPIError(friendly or f"Linear API error (HTTP {status_code})") + async def execute_graphql_query( self, query: str, variables: dict[str, Any] | None = None ) -> dict[str, Any]: @@ -274,14 +312,15 @@ class LinearConnector: if variables: payload["variables"] = variables - response = requests.post(self.api_url, headers=headers, json=payload) + async with httpx.AsyncClient() as client: + response = await client.post( + self.api_url, headers=headers, json=payload, timeout=30.0 + ) if response.status_code == 200: return response.json() else: - raise Exception( - f"Query failed with status code {response.status_code}: {response.text}" - ) + self._raise_api_error(response.status_code, response.text) async def get_all_issues( self, include_comments: bool = True @@ -588,6 +627,148 @@ class LinearConnector: return formatted + async def create_issue( + self, + team_id: str, + title: str, + description: str | None = None, + state_id: str | None = None, + assignee_id: str | None = None, + priority: int | None = None, + label_ids: list[str] | None = None, + ) -> dict[str, Any]: + try: + mutation = """ + mutation IssueCreate($input: IssueCreateInput!) { + issueCreate(input: $input) { + success + issue { id identifier title url } + } + } + """ + input_data: dict[str, Any] = {"teamId": team_id, "title": title} + if description is not None: + input_data["description"] = description + if state_id is not None: + input_data["stateId"] = state_id + if assignee_id is not None: + input_data["assigneeId"] = assignee_id + if priority is not None: + input_data["priority"] = priority + if label_ids: + input_data["labelIds"] = label_ids + + result = await self.execute_graphql_query(mutation, {"input": input_data}) + payload = result.get("data", {}).get("issueCreate", {}) + if not payload.get("success"): + errors = result.get("errors", []) + msg = ( + errors[0].get("message", "Unknown error") + if errors + else "Unknown error" + ) + return {"status": "error", "message": f"issueCreate failed: {msg}"} + issue = payload.get("issue", {}) + return { + "status": "success", + "id": issue.get("id"), + "identifier": issue.get("identifier"), + "title": issue.get("title"), + "url": issue.get("url"), + "message": f"Issue {issue.get('identifier')} created successfully.", + } + except Exception as e: + logger.error(f"Error creating Linear issue: {e}") + return {"status": "error", "message": str(e)} + + async def update_issue( + self, + issue_id: str, + title: str | None = None, + description: str | None = None, + state_id: str | None = None, + assignee_id: str | None = None, + priority: int | None = None, + label_ids: list[str] | None = None, + ) -> dict[str, Any]: + try: + mutation = """ + mutation IssueUpdate($id: String!, $input: IssueUpdateInput!) { + issueUpdate(id: $id, input: $input) { + success + issue { id identifier title url } + } + } + """ + input_data: dict[str, Any] = {} + if title is not None: + input_data["title"] = title + if description is not None: + input_data["description"] = description + if state_id is not None: + input_data["stateId"] = state_id + if assignee_id is not None: + input_data["assigneeId"] = assignee_id + if priority is not None: + input_data["priority"] = priority + if label_ids is not None: + input_data["labelIds"] = label_ids + + if not input_data: + return { + "status": "error", + "message": "No fields provided for update. Please specify at least one field to change.", + } + + result = await self.execute_graphql_query( + mutation, {"id": issue_id, "input": input_data} + ) + payload = result.get("data", {}).get("issueUpdate", {}) + if not payload.get("success"): + errors = result.get("errors", []) + msg = ( + errors[0].get("message", "Unknown error") + if errors + else "Unknown error" + ) + return {"status": "error", "message": f"issueUpdate failed: {msg}"} + issue = payload.get("issue", {}) + return { + "status": "success", + "id": issue.get("id"), + "identifier": issue.get("identifier"), + "title": issue.get("title"), + "url": issue.get("url"), + "message": f"Issue {issue.get('identifier')} updated successfully.", + } + except Exception as e: + logger.error(f"Error updating Linear issue: {e}") + return {"status": "error", "message": str(e)} + + async def archive_issue(self, issue_id: str) -> dict[str, Any]: + try: + mutation = """ + mutation IssueArchive($id: String!) { + issueArchive(id: $id) { + success + } + } + """ + result = await self.execute_graphql_query(mutation, {"id": issue_id}) + payload = result.get("data", {}).get("issueArchive", {}) + if not payload.get("success"): + errors = result.get("errors", []) + msg = ( + errors[0].get("message", "Unknown error") + if errors + else "Unknown error" + ) + return {"status": "error", "message": f"issueArchive failed: {msg}"} + return {"status": "success", "message": "Issue archived successfully."} + except Exception as e: + logger.error(f"Error archiving Linear issue: {e}") + return {"status": "error", "message": str(e)} + def format_issue_to_markdown(self, issue: dict[str, Any]) -> str: """ Convert an issue to markdown format. diff --git a/surfsense_backend/app/connectors/notion_history.py b/surfsense_backend/app/connectors/notion_history.py index 311f97ddd..7425ceafc 100644 --- a/surfsense_backend/app/connectors/notion_history.py +++ b/surfsense_backend/app/connectors/notion_history.py @@ -17,6 +17,15 @@ from app.utils.oauth_security import TokenEncryption logger = logging.getLogger(__name__) + +class NotionAPIError(Exception): + """Raised when the Notion API returns a non-200 response. + + The message is always user-presentable; callers should surface it directly + without any additional prefix or wrapping. + """ + + # Type variable for generic return type T = TypeVar("T") @@ -250,8 +259,9 @@ class NotionHistoryConnector: logger.error( f"Failed to refresh Notion token for connector {self._connector_id}: {e!s}" ) - raise Exception( - f"Failed to refresh Notion OAuth credentials: {e!s}" + raise NotionAPIError( + "Failed to refresh your Notion connection. " + "Please try again or reconnect your Notion account." ) from e return self._credentials.access_token @@ -1041,7 +1051,7 @@ class NotionHistoryConnector: try: notion = await self._get_client() - # Append content if provided + appended_block_ids = [] if content: # Convert new content to blocks try: @@ -1065,14 +1075,23 @@ class NotionHistoryConnector: try: for i in range(0, len(children), 100): batch = children[i : i + 100] - await self._api_call_with_retry( + response = await self._api_call_with_retry( notion.blocks.children.append, block_id=page_id, children=batch, ) + batch_block_ids = [ + block["id"] for block in response.get("results", []) + ] + appended_block_ids.extend(batch_block_ids) logger.info( f"Successfully appended {len(children)} new blocks to page {page_id}" ) + logger.debug( + f"Appended block IDs: {appended_block_ids[:5]}..." + if len(appended_block_ids) > 5 + else f"Appended block IDs: {appended_block_ids}" + ) except Exception as e: logger.error(f"Failed to append content blocks: {e}") return { @@ -1092,6 +1111,7 @@ class NotionHistoryConnector: "page_id": page_id, "url": page_url, "title": page_title, + "appended_block_ids": appended_block_ids, "message": f"Updated Notion page '{page_title}' (content appended)", } diff --git a/surfsense_backend/app/services/linear/__init__.py b/surfsense_backend/app/services/linear/__init__.py new file mode 100644 index 000000000..4bade91b7 --- /dev/null +++ b/surfsense_backend/app/services/linear/__init__.py @@ -0,0 +1,13 @@ +from app.services.linear.kb_sync_service import LinearKBSyncService +from app.services.linear.tool_metadata_service import ( + LinearIssue, + LinearToolMetadataService, + LinearWorkspace, +) + +__all__ = [ + "LinearIssue", + "LinearKBSyncService", + "LinearToolMetadataService", + "LinearWorkspace", +] diff --git a/surfsense_backend/app/services/linear/kb_sync_service.py b/surfsense_backend/app/services/linear/kb_sync_service.py new file mode 100644 index 000000000..bbae8c6e8 --- /dev/null +++ b/surfsense_backend/app/services/linear/kb_sync_service.py @@ -0,0 +1,182 @@ +import logging +from datetime import datetime + +from sqlalchemy import delete +from sqlalchemy.ext.asyncio import AsyncSession + +from app.config import config +from app.connectors.linear_connector import LinearConnector +from app.db import Chunk, Document +from app.services.llm_service import get_user_long_context_llm +from app.utils.document_converters import ( + create_document_chunks, + generate_content_hash, + generate_document_summary, +) + +logger = logging.getLogger(__name__) + + +class LinearKBSyncService: + """Re-indexes a single Linear issue document after a successful update. + + Mirrors the indexer's Phase-2 logic exactly: fetch fresh issue content, + run generate_document_summary, create_document_chunks, then update the + document row in the knowledge base. + """ + + def __init__(self, db_session: AsyncSession): + self.db_session = db_session + + async def sync_after_update( + self, + document_id: int, + issue_id: str, + user_id: str, + search_space_id: int, + ) -> dict: + """Re-index a Linear issue document after it has been updated via the API. + + Args: + document_id: The KB document ID to update. + issue_id: The Linear issue UUID to fetch fresh content from. + user_id: Used to select the correct LLM configuration. + search_space_id: Used to select the correct LLM configuration. + + Returns: + dict with 'status': 'success' | 'not_indexed' | 'error'. + """ + from app.tasks.connector_indexers.base import ( + get_current_timestamp, + safe_set_chunks, + ) + + try: + document = await self.db_session.get(Document, document_id) + if not document: + logger.warning(f"Document {document_id} not found in KB") + return {"status": "not_indexed"} + + connector_id = document.connector_id + if not connector_id: + return {"status": "error", "message": "Document has no connector_id"} + + linear_client = LinearConnector( + session=self.db_session, connector_id=connector_id + ) + + issue_raw = await self._fetch_issue(linear_client, issue_id) + if not issue_raw: + return {"status": "error", "message": "Issue not found in Linear API"} + + formatted_issue = linear_client.format_issue(issue_raw) + issue_content = linear_client.format_issue_to_markdown(formatted_issue) + + if not issue_content: + return {"status": "error", "message": "Issue produced empty content"} + + issue_identifier = formatted_issue.get("identifier", "") + issue_title = formatted_issue.get("title", "") + state = formatted_issue.get("state", "Unknown") + priority = issue_raw.get("priorityLabel", "Unknown") + comment_count = len(formatted_issue.get("comments", [])) + description = formatted_issue.get("description", "") + + user_llm = await get_user_long_context_llm( + self.db_session, user_id, search_space_id, disable_streaming=True + ) + + if user_llm: + document_metadata_for_summary = { + "issue_id": issue_identifier, + "issue_title": issue_title, + "state": state, + "priority": priority, + "comment_count": comment_count, + "document_type": "Linear Issue", + "connector_type": "Linear", + } + summary_content, summary_embedding = await generate_document_summary( + issue_content, user_llm, document_metadata_for_summary + ) + else: + if description and len(description) > 1000: + description = description[:997] + "..." + summary_content = ( + f"Linear Issue {issue_identifier}: {issue_title}\n\n" + f"Status: {state}\n\n" + ) + if description: + summary_content += f"Description: {description}\n\n" + summary_content += f"Comments: {comment_count}" + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + + await self.db_session.execute( + delete(Chunk).where(Chunk.document_id == document.id) + ) + + chunks = await create_document_chunks(issue_content) + + document.title = f"{issue_identifier}: {issue_title}" + document.content = summary_content + document.content_hash = generate_content_hash( + issue_content, search_space_id + ) + document.embedding = summary_embedding + from sqlalchemy.orm.attributes import flag_modified + + document.document_metadata = { + **(document.document_metadata or {}), + "issue_id": issue_id, + "issue_identifier": issue_identifier, + "issue_title": issue_title, + "state": state, + "priority": priority, + "comment_count": comment_count, + "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "connector_id": connector_id, + } + flag_modified(document, "document_metadata") + safe_set_chunks(document, chunks) + document.updated_at = get_current_timestamp() + + await self.db_session.commit() + + logger.info( + f"KB sync successful for document {document_id} " + f"({issue_identifier}: {issue_title})" + ) + return {"status": "success"} + + except Exception as e: + logger.error( + f"KB sync failed for document {document_id}: {e}", exc_info=True + ) + await self.db_session.rollback() + return {"status": "error", "message": str(e)} + + @staticmethod + async def _fetch_issue(client: LinearConnector, issue_id: str) -> dict | None: + """Fetch a full issue from Linear, matching the fields used by format_issue.""" + query = """ + query LinearIssueSync($id: String!) { + issue(id: $id) { + id identifier title description priority priorityLabel + createdAt updatedAt url + state { id name type color } + creator { id name email } + assignee { id name email } + comments { + nodes { + id body createdAt updatedAt + user { id name email } + } + } + team { id name key } + } + } + """ + result = await client.execute_graphql_query(query, {"id": issue_id}) + return (result.get("data") or {}).get("issue") diff --git a/surfsense_backend/app/services/linear/tool_metadata_service.py b/surfsense_backend/app/services/linear/tool_metadata_service.py new file mode 100644 index 000000000..5e6345b85 --- /dev/null +++ b/surfsense_backend/app/services/linear/tool_metadata_service.py @@ -0,0 +1,360 @@ +from dataclasses import dataclass + +from sqlalchemy import and_, func, or_ +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.future import select + +from app.connectors.linear_connector import LinearConnector +from app.db import ( + Document, + DocumentType, + SearchSourceConnector, + SearchSourceConnectorType, +) + + +@dataclass +class LinearWorkspace: + """Represents a Linear connector as a workspace for tool context.""" + + id: int + name: str + organization_name: str + + @classmethod + def from_connector(cls, connector: SearchSourceConnector) -> "LinearWorkspace": + return cls( + id=connector.id, + name=connector.name, + organization_name=connector.config.get( + "organization_name", "Linear Workspace" + ), + ) + + def to_dict(self) -> dict: + return { + "id": self.id, + "name": self.name, + "organization_name": self.organization_name, + } + + +@dataclass +class LinearIssue: + """Represents an indexed Linear issue resolved from the knowledge base.""" + + id: str + identifier: str + title: str + state: str + connector_id: int + document_id: int + indexed_at: str | None + + @classmethod + def from_document(cls, document: Document) -> "LinearIssue": + meta = document.document_metadata or {} + return cls( + id=meta.get("issue_id", ""), + identifier=meta.get("issue_identifier", ""), + title=meta.get("issue_title", document.title), + state=meta.get("state", ""), + connector_id=document.connector_id, + document_id=document.id, + indexed_at=meta.get("indexed_at"), + ) + + def to_dict(self) -> dict: + return { + "id": self.id, + "identifier": self.identifier, + "title": self.title, + "state": self.state, + "connector_id": self.connector_id, + "document_id": self.document_id, + "indexed_at": self.indexed_at, + } + + +class LinearToolMetadataService: + """Builds interrupt context for Linear HITL tools. + + All context queries (GraphQL reads) live here. + Write mutations live in LinearConnector. + """ + + def __init__(self, db_session: AsyncSession): + self._db_session = db_session + + async def get_creation_context(self, search_space_id: int, user_id: str) -> dict: + """Return context needed to create a new Linear issue. + + Fetches all connected Linear workspaces, and for each one fetches + its teams with states, members, and labels from the Linear API. + + Returns a dict with key: workspaces (each entry has id, name, organization_name, teams, priorities). + Returns a dict with key 'error' on failure. + """ + connectors = await self._get_all_linear_connectors(search_space_id, user_id) + if not connectors: + return {"error": "No Linear account connected"} + + workspaces = [] + for connector in connectors: + workspace = LinearWorkspace.from_connector(connector) + linear_client = LinearConnector( + session=self._db_session, connector_id=connector.id + ) + try: + priorities = await self._fetch_priority_values(linear_client) + teams = await self._fetch_teams_context(linear_client) + except Exception as e: + return {"error": f"Failed to fetch Linear context: {e!s}"} + workspaces.append({ + "id": workspace.id, + "name": workspace.name, + "organization_name": workspace.organization_name, + "teams": teams, + "priorities": priorities, + }) + + return {"workspaces": workspaces} + + async def get_update_context( + self, search_space_id: int, user_id: str, issue_ref: str + ) -> dict: + """Return context needed to update an indexed Linear issue. + + Resolves the issue from the KB (title → identifier → full title), + then fetches its current state, assignee, labels, and team context + from the Linear API. + + Returns a dict with keys: workspace, priorities, issue, team. + Returns a dict with key 'error' if the issue is not found or API fails. + """ + document = await self._resolve_issue(search_space_id, user_id, issue_ref) + if not document: + return { + "error": f"Issue '{issue_ref}' not found in your indexed Linear issues. " + "This could mean: (1) the issue doesn't exist, (2) it hasn't been indexed yet, " + "or (3) the title or identifier is different." + } + + connector = await self._get_connector_for_document(document, user_id) + if not connector: + return {"error": "Connector not found or access denied"} + + workspace = LinearWorkspace.from_connector(connector) + issue = LinearIssue.from_document(document) + + linear_client = LinearConnector( + session=self._db_session, connector_id=connector.id + ) + + try: + priorities = await self._fetch_priority_values(linear_client) + issue_api = await self._fetch_issue_context(linear_client, issue.id) + except Exception as e: + return {"error": f"Failed to fetch Linear issue context: {e!s}"} + + if not issue_api: + return { + "error": f"Issue '{issue_ref}' could not be fetched from Linear API" + } + + team_raw = issue_api.get("team") or {} + labels_raw = issue_api.get("labels") or {} + states_raw = team_raw.get("states") or {} + members_raw = team_raw.get("members") or {} + team_labels_raw = team_raw.get("labels") or {} + + return { + "workspace": workspace.to_dict(), + "priorities": priorities, + "issue": { + "id": issue_api.get("id"), + "identifier": issue_api.get("identifier"), + "title": issue_api.get("title"), + "description": issue_api.get("description"), + "priority": issue_api.get("priority"), + "url": issue_api.get("url"), + "current_state": issue_api.get("state"), + "current_assignee": issue_api.get("assignee"), + "current_labels": labels_raw.get("nodes", []), + "team_id": team_raw.get("id"), + "document_id": issue.document_id, + "indexed_at": issue.indexed_at, + }, + "team": { + "id": team_raw.get("id"), + "name": team_raw.get("name"), + "key": team_raw.get("key"), + "states": states_raw.get("nodes", []), + "members": members_raw.get("nodes", []), + "labels": team_labels_raw.get("nodes", []), + }, + } + + async def get_delete_context( + self, search_space_id: int, user_id: str, issue_ref: str + ) -> dict: + """Return context needed to archive an indexed Linear issue. + + Resolves the issue from the KB only — no Linear API call required. + + Returns a dict with keys: workspace, issue. + Returns a dict with key 'error' if the issue is not found. + """ + document = await self._resolve_issue(search_space_id, user_id, issue_ref) + if not document: + return { + "error": f"Issue '{issue_ref}' not found in your indexed Linear issues. " + "This could mean: (1) the issue doesn't exist, (2) it hasn't been indexed yet, " + "or (3) the title or identifier is different." + } + + connector = await self._get_connector_for_document(document, user_id) + if not connector: + return {"error": "Connector not found or access denied"} + + workspace = LinearWorkspace.from_connector(connector) + issue = LinearIssue.from_document(document) + + return { + "workspace": workspace.to_dict(), + "issue": issue.to_dict(), + } + + @staticmethod + async def _fetch_priority_values(client: LinearConnector) -> list[dict]: + """Fetch Linear priority values (0-4) with their display labels.""" + query = "{ issuePriorityValues { priority label } }" + result = await client.execute_graphql_query(query) + return result.get("data", {}).get("issuePriorityValues", []) + + @staticmethod + async def _fetch_teams_context(client: LinearConnector) -> list[dict]: + """Fetch all teams with their states, members, and labels.""" + query = """ + query { + teams(first: 25) { + nodes { + id name key + states { nodes { id name type color position } } + members { nodes { id name displayName email avatarUrl active } } + labels { nodes { id name color } } + } + } + } + """ + result = await client.execute_graphql_query(query) + raw_teams = result.get("data", {}).get("teams", {}).get("nodes", []) + + return [ + { + "id": t.get("id"), + "name": t.get("name"), + "key": t.get("key"), + "states": (t.get("states") or {}).get("nodes", []), + "members": (t.get("members") or {}).get("nodes", []), + "labels": (t.get("labels") or {}).get("nodes", []), + } + for t in raw_teams + ] + + @staticmethod + async def _fetch_issue_context( + client: LinearConnector, issue_id: str + ) -> dict | None: + """Fetch a single issue with its current state, assignee, labels, and team context.""" + query = """ + query LinearIssueContext($id: String!) { + issue(id: $id) { + id identifier title description priority url + state { id name type color } + assignee { id name displayName email } + labels { nodes { id name color } } + team { + id name key + states { nodes { id name type color position } } + members { nodes { id name displayName email avatarUrl active } } + labels { nodes { id name color } } + } + } + } + """ + result = await client.execute_graphql_query(query, {"id": issue_id}) + return result.get("data", {}).get("issue") + + async def _resolve_issue( + self, search_space_id: int, user_id: str, issue_ref: str + ) -> Document | None: + """Resolve an issue from the KB using a 3-step fallback. + + Order: issue_title (most natural) → issue_identifier (e.g. ENG-42) → document.title. + All comparisons are case-insensitive. + """ + ref_lower = issue_ref.lower() + + result = await self._db_session.execute( + select(Document) + .join( + SearchSourceConnector, Document.connector_id == SearchSourceConnector.id + ) + .filter( + and_( + Document.search_space_id == search_space_id, + Document.document_type == DocumentType.LINEAR_CONNECTOR, + SearchSourceConnector.user_id == user_id, + or_( + func.lower( + Document.document_metadata.op("->>")( + "issue_title" + ) + ) + == ref_lower, + func.lower( + Document.document_metadata.op("->>")( + "issue_identifier" + ) + ) + == ref_lower, + func.lower(Document.title) == ref_lower, + ), + ) + ) + .limit(1) + ) + return result.scalars().first() + + async def _get_all_linear_connectors( + self, search_space_id: int, user_id: str + ) -> list[SearchSourceConnector]: + """Fetch all Linear connectors for the given search space and user.""" + result = await self._db_session.execute( + select(SearchSourceConnector).filter( + and_( + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.LINEAR_CONNECTOR, + ) + ) + ) + return result.scalars().all() + + async def _get_connector_for_document( + self, document: Document, user_id: str + ) -> SearchSourceConnector | None: + """Fetch the connector associated with a document, scoped to the user.""" + if not document.connector_id: + return None + result = await self._db_session.execute( + select(SearchSourceConnector).filter( + and_( + SearchSourceConnector.id == document.connector_id, + SearchSourceConnector.user_id == user_id, + ) + ) + ) + return result.scalars().first() diff --git a/surfsense_backend/app/services/llm_service.py b/surfsense_backend/app/services/llm_service.py index 24bc4138b..e562d0d35 100644 --- a/surfsense_backend/app/services/llm_service.py +++ b/surfsense_backend/app/services/llm_service.py @@ -162,7 +162,7 @@ async def validate_llm_config( async def get_search_space_llm_instance( - session: AsyncSession, search_space_id: int, role: str + session: AsyncSession, search_space_id: int, role: str, disable_streaming: bool = False ) -> ChatLiteLLM | ChatLiteLLMRouter | None: """ Get a ChatLiteLLM instance for a specific search space and role. @@ -218,7 +218,7 @@ async def get_search_space_llm_instance( logger.debug( f"Using Auto mode (LLM Router) for search space {search_space_id}, role {role}" ) - return ChatLiteLLMRouter() + return ChatLiteLLMRouter(disable_streaming=disable_streaming) except Exception as e: logger.error(f"Failed to create ChatLiteLLMRouter: {e}") return None @@ -284,6 +284,9 @@ async def get_search_space_llm_instance( if global_config.get("litellm_params"): litellm_kwargs.update(global_config["litellm_params"]) + if disable_streaming: + litellm_kwargs["disable_streaming"] = True + return ChatLiteLLM(**litellm_kwargs) # Get the LLM configuration from database (NewLLMConfig) @@ -357,6 +360,9 @@ async def get_search_space_llm_instance( if llm_config.litellm_params: litellm_kwargs.update(llm_config.litellm_params) + if disable_streaming: + litellm_kwargs["disable_streaming"] = True + return ChatLiteLLM(**litellm_kwargs) except Exception as e: @@ -374,20 +380,20 @@ async def get_agent_llm( async def get_document_summary_llm( - session: AsyncSession, search_space_id: int + session: AsyncSession, search_space_id: int, disable_streaming: bool = False ) -> ChatLiteLLM | ChatLiteLLMRouter | None: """Get the search space's document summary LLM instance.""" return await get_search_space_llm_instance( - session, search_space_id, LLMRole.DOCUMENT_SUMMARY + session, search_space_id, LLMRole.DOCUMENT_SUMMARY, disable_streaming=disable_streaming ) # Backward-compatible alias (LLM preferences are now per-search-space, not per-user) async def get_user_long_context_llm( - session: AsyncSession, user_id: str, search_space_id: int + session: AsyncSession, user_id: str, search_space_id: int, disable_streaming: bool = False ) -> ChatLiteLLM | ChatLiteLLMRouter | None: """ Deprecated: Use get_document_summary_llm instead. The user_id parameter is ignored as LLM preferences are now per-search-space. """ - return await get_document_summary_llm(session, search_space_id) + return await get_document_summary_llm(session, search_space_id, disable_streaming=disable_streaming) diff --git a/surfsense_backend/app/services/notion/__init__.py b/surfsense_backend/app/services/notion/__init__.py index 3818d1af3..9511238e3 100644 --- a/surfsense_backend/app/services/notion/__init__.py +++ b/surfsense_backend/app/services/notion/__init__.py @@ -1,3 +1,4 @@ +from app.services.notion.kb_sync_service import NotionKBSyncService from app.services.notion.tool_metadata_service import ( NotionAccount, NotionPage, @@ -6,6 +7,7 @@ from app.services.notion.tool_metadata_service import ( __all__ = [ "NotionAccount", + "NotionKBSyncService", "NotionPage", "NotionToolMetadataService", ] diff --git a/surfsense_backend/app/services/notion/kb_sync_service.py b/surfsense_backend/app/services/notion/kb_sync_service.py new file mode 100644 index 000000000..ea6328d46 --- /dev/null +++ b/surfsense_backend/app/services/notion/kb_sync_service.py @@ -0,0 +1,163 @@ +import logging +from datetime import datetime + +from sqlalchemy import delete +from sqlalchemy.ext.asyncio import AsyncSession + +from app.config import config +from app.db import Chunk, Document +from app.services.llm_service import get_user_long_context_llm +from app.utils.document_converters import ( + create_document_chunks, + generate_content_hash, + generate_document_summary, +) + +logger = logging.getLogger(__name__) + + +class NotionKBSyncService: + def __init__(self, db_session: AsyncSession): + self.db_session = db_session + + async def sync_after_update( + self, + document_id: int, + appended_content: str, + user_id: str, + search_space_id: int, + appended_block_ids: list[str] | None = None, + ) -> dict: + from app.tasks.connector_indexers.base import ( + get_current_timestamp, + safe_set_chunks, + ) + + try: + logger.debug(f"Starting KB sync for document {document_id}") + document = await self.db_session.get(Document, document_id) + + if not document: + logger.warning(f"Document {document_id} not found in KB") + return {"status": "not_indexed"} + + page_id = document.document_metadata.get("page_id") + if not page_id: + logger.error(f"Document {document_id} missing page_id in metadata") + return {"status": "error", "message": "Missing page_id in metadata"} + + logger.debug( + f"Document found: id={document_id}, page_id={page_id}, connector_id={document.connector_id}" + ) + + from app.connectors.notion_history import NotionHistoryConnector + + notion_connector = NotionHistoryConnector( + session=self.db_session, connector_id=document.connector_id + ) + + logger.debug(f"Fetching page content from Notion for page {page_id}") + blocks, _ = await notion_connector.get_page_content(page_id, page_title=None) + + from app.utils.notion_utils import extract_all_block_ids, process_blocks + + fetched_content = process_blocks(blocks) + logger.debug(f"Fetched content length: {len(fetched_content)} chars") + + if not fetched_content or not fetched_content.strip(): + logger.warning( + f"Fetched empty content for page {page_id} - document will have minimal searchable text" + ) + + content_verified = False + if appended_block_ids: + fetched_block_ids = set(extract_all_block_ids(blocks)) + found_blocks = [ + bid for bid in appended_block_ids if bid in fetched_block_ids + ] + + logger.debug( + f"Block verification: {len(found_blocks)}/{len(appended_block_ids)} blocks found" + ) + logger.debug( + f"Appended IDs (first 3): {appended_block_ids[:3]}, Fetched IDs count: {len(fetched_block_ids)}" + ) + + if len(found_blocks) >= len(appended_block_ids) * 0.8: # 80% threshold + logger.info( + f"Content verified fresh: found {len(found_blocks)}/{len(appended_block_ids)} appended blocks" + ) + full_content = fetched_content + content_verified = True + else: + logger.warning( + "No appended blocks found in fetched content - appending manually" + ) + full_content = fetched_content + "\n\n" + appended_content + content_verified = False + else: + logger.warning("No block IDs provided - using fetched content as-is") + full_content = fetched_content + content_verified = False + + logger.debug(f"Final content length: {len(full_content)} chars, verified={content_verified}") + + logger.debug("Generating summary and embeddings") + user_llm = await get_user_long_context_llm( + self.db_session, user_id, search_space_id, disable_streaming=True # disable streaming to avoid leaking into the chat + ) + + if user_llm: + document_metadata_for_summary = { + "page_title": document.document_metadata.get("page_title"), + "page_id": document.document_metadata.get("page_id"), + "document_type": "Notion Page", + "connector_type": "Notion", + } + summary_content, summary_embedding = await generate_document_summary( + full_content, user_llm, document_metadata_for_summary + ) + logger.debug(f"Generated summary length: {len(summary_content)} chars") + else: + logger.warning("No LLM configured - using fallback summary") + summary_content = f"Notion Page: {document.document_metadata.get('page_title')}\n\n{full_content[:500]}..." + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + + logger.debug(f"Deleting old chunks for document {document_id}") + await self.db_session.execute( + delete(Chunk).where(Chunk.document_id == document.id) + ) + + logger.debug("Creating new chunks") + chunks = await create_document_chunks(full_content) + logger.debug(f"Created {len(chunks)} chunks") + + logger.debug("Updating document fields") + document.content = summary_content + document.content_hash = generate_content_hash(full_content, search_space_id) + document.embedding = summary_embedding + document.document_metadata = { + **document.document_metadata, + "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + } + safe_set_chunks(document, chunks) + document.updated_at = get_current_timestamp() + + logger.debug("Committing changes to database") + await self.db_session.commit() + + logger.info( + f"Successfully synced KB for document {document_id}: " + f"summary={len(summary_content)} chars, chunks={len(chunks)}, " + f"content_verified={content_verified}" + ) + return {"status": "success"} + + except Exception as e: + logger.error( + f"Failed to sync KB for document {document_id}: {e}", exc_info=True + ) + await self.db_session.rollback() + return {"status": "error", "message": str(e)} diff --git a/surfsense_backend/app/tasks/chat/stream_new_chat.py b/surfsense_backend/app/tasks/chat/stream_new_chat.py index a319554a5..0decbb2a4 100644 --- a/surfsense_backend/app/tasks/chat/stream_new_chat.py +++ b/surfsense_backend/app/tasks/chat/stream_new_chat.py @@ -796,6 +796,9 @@ async def _stream_agent_events( "create_notion_page", "update_notion_page", "delete_notion_page", + "create_linear_issue", + "update_linear_issue", + "delete_linear_issue", ): yield streaming_service.format_tool_output_available( tool_call_id, diff --git a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py index 1a67ee7fc..d2b1c9137 100644 --- a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py @@ -24,6 +24,7 @@ from app.utils.document_converters import ( generate_document_summary, generate_unique_identifier_hash, ) +from app.utils.notion_utils import process_blocks from .base import ( build_document_metadata_string, @@ -280,53 +281,6 @@ async def index_notion_pages( pages_to_process = [] # List of dicts with document and page data new_documents_created = False - # Helper function to convert page content to markdown - def process_blocks(blocks, level=0): - result = "" - for block in blocks: - block_type = block.get("type") - block_content = block.get("content", "") - children = block.get("children", []) - - # Add indentation based on level - indent = " " * level - - # Format based on block type - if block_type in ["paragraph", "text"]: - result += f"{indent}{block_content}\n\n" - elif block_type in ["heading_1", "header"]: - result += f"{indent}# {block_content}\n\n" - elif block_type == "heading_2": - result += f"{indent}## {block_content}\n\n" - elif block_type == "heading_3": - result += f"{indent}### {block_content}\n\n" - elif block_type == "bulleted_list_item": - result += f"{indent}* {block_content}\n" - elif block_type == "numbered_list_item": - result += f"{indent}1. {block_content}\n" - elif block_type == "to_do": - result += f"{indent}- [ ] {block_content}\n" - elif block_type == "toggle": - result += f"{indent}> {block_content}\n" - elif block_type == "code": - result += f"{indent}```\n{block_content}\n```\n\n" - elif block_type == "quote": - result += f"{indent}> {block_content}\n\n" - elif block_type == "callout": - result += f"{indent}> **Note:** {block_content}\n\n" - elif block_type == "image": - result += f"{indent}![Image]({block_content})\n\n" - else: - # Default for other block types - if block_content: - result += f"{indent}{block_content}\n\n" - - # Process children recursively - if children: - result += process_blocks(children, level + 1) - - return result - for page in pages: try: page_id = page.get("page_id") diff --git a/surfsense_backend/app/utils/notion_utils.py b/surfsense_backend/app/utils/notion_utils.py new file mode 100644 index 000000000..8f833fab6 --- /dev/null +++ b/surfsense_backend/app/utils/notion_utils.py @@ -0,0 +1,58 @@ +"""Utility functions for processing Notion blocks and content.""" + + +def extract_all_block_ids(blocks_list): + ids = [] + for block in blocks_list: + if isinstance(block, dict) and "id" in block: + ids.append(block["id"]) + if isinstance(block, dict) and block.get("children"): + ids.extend(extract_all_block_ids(block["children"])) + return ids + + +def process_blocks(blocks, level=0): + result = "" + for block in blocks: + block_type = block.get("type") + block_content = block.get("content", "") + children = block.get("children", []) + + # Add indentation based on level + indent = " " * level + + # Format based on block type + if block_type in ["paragraph", "text"]: + result += f"{indent}{block_content}\n\n" + elif block_type in ["heading_1", "header"]: + result += f"{indent}# {block_content}\n\n" + elif block_type == "heading_2": + result += f"{indent}## {block_content}\n\n" + elif block_type == "heading_3": + result += f"{indent}### {block_content}\n\n" + elif block_type == "bulleted_list_item": + result += f"{indent}* {block_content}\n" + elif block_type == "numbered_list_item": + result += f"{indent}1. {block_content}\n" + elif block_type == "to_do": + result += f"{indent}- [ ] {block_content}\n" + elif block_type == "toggle": + result += f"{indent}> {block_content}\n" + elif block_type == "code": + result += f"{indent}```\n{block_content}\n```\n\n" + elif block_type == "quote": + result += f"{indent}> {block_content}\n\n" + elif block_type == "callout": + result += f"{indent}> **Note:** {block_content}\n\n" + elif block_type == "image": + result += f"{indent}![Image]({block_content})\n\n" + else: + # Default for other block types + if block_content: + result += f"{indent}{block_content}\n\n" + + # Process children recursively + if children: + result += process_blocks(children, level + 1) + + return result diff --git a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx index fde462a44..dd11382a8 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx @@ -34,15 +34,22 @@ import { currentUserAtom } from "@/atoms/user/user-query.atoms"; import { Thread } from "@/components/assistant-ui/thread"; import { ChatHeader } from "@/components/new-chat/chat-header"; import { ReportPanel } from "@/components/report-panel/report-panel"; -import { CreateNotionPageToolUI } from "@/components/tool-ui/create-notion-page"; import type { ThinkingStep } from "@/components/tool-ui/deepagent-thinking"; -import { DeleteNotionPageToolUI } from "@/components/tool-ui/delete-notion-page"; import { DisplayImageToolUI } from "@/components/tool-ui/display-image"; import { GeneratePodcastToolUI } from "@/components/tool-ui/generate-podcast"; import { GenerateReportToolUI } from "@/components/tool-ui/generate-report"; +import { + CreateLinearIssueToolUI, + DeleteLinearIssueToolUI, + UpdateLinearIssueToolUI, +} from "@/components/tool-ui/linear"; import { LinkPreviewToolUI } from "@/components/tool-ui/link-preview"; +import { + CreateNotionPageToolUI, + DeleteNotionPageToolUI, + UpdateNotionPageToolUI, +} from "@/components/tool-ui/notion"; import { ScrapeWebpageToolUI } from "@/components/tool-ui/scrape-webpage"; -import { UpdateNotionPageToolUI } from "@/components/tool-ui/update-notion-page"; import { RecallMemoryToolUI, SaveMemoryToolUI } from "@/components/tool-ui/user-memory"; import { Skeleton } from "@/components/ui/skeleton"; import { useChatSessionStateSync } from "@/hooks/use-chat-session-state"; @@ -141,6 +148,9 @@ const TOOLS_WITH_UI = new Set([ "scrape_webpage", "create_notion_page", "update_notion_page", + "create_linear_issue", + "update_linear_issue", + "delete_linear_issue", // "write_todos", // Disabled for now ]); @@ -1651,6 +1661,9 @@ export default function NewChatPage() { + + + {/* Disabled for now */}
diff --git a/surfsense_web/components/tool-ui/index.ts b/surfsense_web/components/tool-ui/index.ts index 96d1e2502..93b6229a0 100644 --- a/surfsense_web/components/tool-ui/index.ts +++ b/surfsense_web/components/tool-ui/index.ts @@ -16,7 +16,6 @@ export { type SerializableArticle, } from "./article"; export { Audio } from "./audio"; -export { CreateNotionPageToolUI } from "./create-notion-page"; export { type DeepAgentThinkingArgs, type DeepAgentThinkingResult, @@ -42,6 +41,11 @@ export { parseSerializableImage, type SerializableImage, } from "./image"; +export { + CreateLinearIssueToolUI, + DeleteLinearIssueToolUI, + UpdateLinearIssueToolUI, +} from "./linear"; export { type LinkPreviewArgs, LinkPreviewArgsSchema, @@ -63,6 +67,7 @@ export { parseSerializableMediaCard, type SerializableMediaCard, } from "./media-card"; +export { CreateNotionPageToolUI, DeleteNotionPageToolUI, UpdateNotionPageToolUI } from "./notion"; export { Plan, PlanErrorBoundary, @@ -79,7 +84,6 @@ export { ScrapeWebpageResultSchema, ScrapeWebpageToolUI, } from "./scrape-webpage"; -export { UpdateNotionPageToolUI } from "./update-notion-page"; export { type MemoryItem, type RecallMemoryArgs, diff --git a/surfsense_web/components/tool-ui/linear/create-linear-issue.tsx b/surfsense_web/components/tool-ui/linear/create-linear-issue.tsx new file mode 100644 index 000000000..5867863cc --- /dev/null +++ b/surfsense_web/components/tool-ui/linear/create-linear-issue.tsx @@ -0,0 +1,619 @@ +"use client"; + +import { makeAssistantToolUI } from "@assistant-ui/react"; +import { AlertTriangleIcon, CheckIcon, Loader2Icon, PencilIcon, XIcon } from "lucide-react"; +import { useMemo, useState } from "react"; +import { Button } from "@/components/ui/button"; +import { Input } from "@/components/ui/input"; +import { + Select, + SelectContent, + SelectItem, + SelectTrigger, + SelectValue, +} from "@/components/ui/select"; +import { Textarea } from "@/components/ui/textarea"; + +interface LinearLabel { + id: string; + name: string; + color: string; +} + +interface LinearState { + id: string; + name: string; + type: string; + color: string; + position: number; +} + +interface LinearMember { + id: string; + name: string; + displayName: string; + email: string; + active: boolean; +} + +interface LinearTeam { + id: string; + name: string; + key: string; + states: LinearState[]; + members: LinearMember[]; + labels: LinearLabel[]; +} + +interface LinearPriority { + priority: number; + label: string; +} + +interface LinearWorkspace { + id: number; + name: string; + organization_name: string; + teams: LinearTeam[]; + priorities: LinearPriority[]; +} + +interface InterruptResult { + __interrupt__: true; + __decided__?: "approve" | "reject" | "edit"; + action_requests: Array<{ + name: string; + args: Record; + }>; + review_configs: Array<{ + action_name: string; + allowed_decisions: Array<"approve" | "edit" | "reject">; + }>; + interrupt_type?: string; + context?: { + workspaces?: LinearWorkspace[]; + error?: string; + }; +} + +interface SuccessResult { + status: "success"; + issue_id: string; + identifier: string; + url: string; + message?: string; +} + +interface ErrorResult { + status: "error"; + message: string; +} + +type CreateLinearIssueResult = InterruptResult | SuccessResult | ErrorResult; + +function isInterruptResult(result: unknown): result is InterruptResult { + return ( + typeof result === "object" && + result !== null && + "__interrupt__" in result && + (result as InterruptResult).__interrupt__ === true + ); +} + +function isErrorResult(result: unknown): result is ErrorResult { + return ( + typeof result === "object" && + result !== null && + "status" in result && + (result as ErrorResult).status === "error" + ); +} + +function ApprovalCard({ + args, + interruptData, + onDecision, +}: { + args: { title: string; description?: string }; + interruptData: InterruptResult; + onDecision: (decision: { + type: "approve" | "reject" | "edit"; + message?: string; + edited_action?: { name: string; args: Record }; + }) => void; +}) { + const [decided, setDecided] = useState<"approve" | "reject" | "edit" | null>( + interruptData.__decided__ ?? null + ); + const [isEditing, setIsEditing] = useState(false); + const [editedTitle, setEditedTitle] = useState(args.title ?? ""); + const [editedDescription, setEditedDescription] = useState(args.description ?? ""); + const [selectedWorkspaceId, setSelectedWorkspaceId] = useState(""); + const [selectedTeamId, setSelectedTeamId] = useState(""); + const [selectedStateId, setSelectedStateId] = useState("__none__"); + const [selectedAssigneeId, setSelectedAssigneeId] = useState("__none__"); + const [selectedPriority, setSelectedPriority] = useState("0"); + const [selectedLabelIds, setSelectedLabelIds] = useState([]); + + const workspaces = interruptData.context?.workspaces ?? []; + + const selectedWorkspace = useMemo( + () => workspaces.find((w) => String(w.id) === selectedWorkspaceId) ?? null, + [workspaces, selectedWorkspaceId] + ); + + const selectedTeam = useMemo( + () => selectedWorkspace?.teams.find((t) => t.id === selectedTeamId) ?? null, + [selectedWorkspace, selectedTeamId] + ); + + const isTitleValid = editedTitle.trim().length > 0; + const canApprove = !!selectedWorkspaceId && !!selectedTeamId && isTitleValid; + + const reviewConfig = interruptData.review_configs[0]; + const allowedDecisions = reviewConfig?.allowed_decisions ?? ["approve", "reject"]; + const canEdit = allowedDecisions.includes("edit"); + + function buildFinalArgs() { + return { + title: editedTitle, + description: editedDescription || null, + connector_id: selectedWorkspaceId ? Number(selectedWorkspaceId) : null, + team_id: selectedTeamId || null, + state_id: selectedStateId === "__none__" ? null : selectedStateId, + assignee_id: selectedAssigneeId === "__none__" ? null : selectedAssigneeId, + priority: Number(selectedPriority), + label_ids: selectedLabelIds, + }; + } + + return ( +
+ {/* Header */} +
+
+ +
+
+

Create Linear Issue

+

+ {isEditing ? "You can edit the arguments below" : "Requires your approval to proceed"} +

+
+
+ + {/* Context section */} + {!decided && ( +
+ {interruptData.context?.error ? ( +

{interruptData.context.error}

+ ) : ( + <> + {workspaces.length > 0 && ( +
+
+ Linear Account * +
+ +
+ )} + + {selectedWorkspace && ( + <> +
+
+ Team * +
+ +
+ + {selectedTeam && ( + <> +
+
State
+ +
+ +
+
Assignee
+ +
+ +
+
Priority
+ +
+ + {selectedTeam.labels.length > 0 && ( +
+
Labels
+
+ {selectedTeam.labels.map((label) => { + const isSelected = selectedLabelIds.includes(label.id); + return ( + + ); + })} +
+
+ )} + + )} + + )} + + )} +
+ )} + + {/* Display mode */} + {!isEditing && ( +
+
+

Title

+

{args.title}

+
+ {args.description && ( +
+

Description

+

+ {args.description} +

+
+ )} +
+ )} + + {/* Edit mode */} + {isEditing && !decided && ( +
+
+ + setEditedTitle(e.target.value)} + placeholder="Enter issue title" + className={!isTitleValid ? "border-destructive" : ""} + /> + {!isTitleValid &&

Title is required

} +
+
+ +