subagents/connectors: delete orphan pre-MCP issue-mutation tool files from jira and linear routes (MCP supplies these now).

This commit is contained in:
CREDO23 2026-05-11 11:30:58 +02:00
parent 9b8ebbab2c
commit 7fba56862e
4 changed files with 0 additions and 943 deletions

View file

@ -1,216 +0,0 @@
import asyncio
import logging
from typing import Any
from langchain_core.tools import tool
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm.attributes import flag_modified
from app.agents.new_chat.tools.hitl import request_approval
from app.connectors.jira_history import JiraHistoryConnector
from app.services.jira import JiraToolMetadataService
logger = logging.getLogger(__name__)
def create_create_jira_issue_tool(
db_session: AsyncSession | None = None,
search_space_id: int | None = None,
user_id: str | None = None,
connector_id: int | None = None,
):
@tool
async def create_jira_issue(
project_key: str,
summary: str,
issue_type: str = "Task",
description: str | None = None,
priority: str | None = None,
) -> dict[str, Any]:
"""Create a new issue in Jira.
Use this tool when the user explicitly asks to create a new Jira issue/ticket.
Args:
project_key: The Jira project key (e.g. "PROJ", "ENG").
summary: Short, descriptive issue title.
issue_type: Issue type (default "Task"). Others: "Bug", "Story", "Epic".
description: Optional description body for the issue.
priority: Optional priority name (e.g. "High", "Medium", "Low").
Returns:
Dictionary with status, issue_key, and message.
IMPORTANT:
- If status is "rejected", the user declined. Do NOT retry.
- If status is "insufficient_permissions", inform user to re-authenticate.
"""
logger.info(
f"create_jira_issue called: project_key='{project_key}', summary='{summary}'"
)
if db_session is None or search_space_id is None or user_id is None:
return {"status": "error", "message": "Jira tool not properly configured."}
try:
metadata_service = JiraToolMetadataService(db_session)
context = await metadata_service.get_creation_context(
search_space_id, user_id
)
if "error" in context:
return {"status": "error", "message": context["error"]}
accounts = context.get("accounts", [])
if accounts and all(a.get("auth_expired") for a in accounts):
return {
"status": "auth_error",
"message": "All connected Jira accounts need re-authentication.",
"connector_type": "jira",
}
result = request_approval(
action_type="jira_issue_creation",
tool_name="create_jira_issue",
params={
"project_key": project_key,
"summary": summary,
"issue_type": issue_type,
"description": description,
"priority": priority,
"connector_id": connector_id,
},
context=context,
)
if result.rejected:
return {
"status": "rejected",
"message": "User declined. Do not retry or suggest alternatives.",
}
final_project_key = result.params.get("project_key", project_key)
final_summary = result.params.get("summary", summary)
final_issue_type = result.params.get("issue_type", issue_type)
final_description = result.params.get("description", description)
final_priority = result.params.get("priority", priority)
final_connector_id = result.params.get("connector_id", connector_id)
if not final_summary or not final_summary.strip():
return {"status": "error", "message": "Issue summary cannot be empty."}
if not final_project_key:
return {"status": "error", "message": "A project must be selected."}
from sqlalchemy.future import select
from app.db import SearchSourceConnector, SearchSourceConnectorType
actual_connector_id = final_connector_id
if actual_connector_id is None:
result = await db_session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.JIRA_CONNECTOR,
)
)
connector = result.scalars().first()
if not connector:
return {"status": "error", "message": "No Jira connector found."}
actual_connector_id = connector.id
else:
result = await db_session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == actual_connector_id,
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.JIRA_CONNECTOR,
)
)
connector = result.scalars().first()
if not connector:
return {
"status": "error",
"message": "Selected Jira connector is invalid.",
}
try:
jira_history = JiraHistoryConnector(
session=db_session, connector_id=actual_connector_id
)
jira_client = await jira_history._get_jira_client()
api_result = await asyncio.to_thread(
jira_client.create_issue,
project_key=final_project_key,
summary=final_summary,
issue_type=final_issue_type,
description=final_description,
priority=final_priority,
)
except Exception as api_err:
if "status code 403" in str(api_err).lower():
try:
_conn = connector
_conn.config = {**_conn.config, "auth_expired": True}
flag_modified(_conn, "config")
await db_session.commit()
except Exception:
pass
return {
"status": "insufficient_permissions",
"connector_id": actual_connector_id,
"message": "This Jira account needs additional permissions. Please re-authenticate in connector settings.",
}
raise
issue_key = api_result.get("key", "")
issue_url = (
f"{jira_history._base_url}/browse/{issue_key}"
if jira_history._base_url and issue_key
else ""
)
kb_message_suffix = ""
try:
from app.services.jira import JiraKBSyncService
kb_service = JiraKBSyncService(db_session)
kb_result = await kb_service.sync_after_create(
issue_id=issue_key,
issue_identifier=issue_key,
issue_title=final_summary,
description=final_description,
state="To Do",
connector_id=actual_connector_id,
search_space_id=search_space_id,
user_id=user_id,
)
if kb_result["status"] == "success":
kb_message_suffix = " Your knowledge base has also been updated."
else:
kb_message_suffix = " This issue will be added to your knowledge base in the next scheduled sync."
except Exception as kb_err:
logger.warning(f"KB sync after create failed: {kb_err}")
kb_message_suffix = " This issue will be added to your knowledge base in the next scheduled sync."
return {
"status": "success",
"issue_key": issue_key,
"issue_url": issue_url,
"message": f"Jira issue {issue_key} created successfully.{kb_message_suffix}",
}
except Exception as e:
from langgraph.errors import GraphInterrupt
if isinstance(e, GraphInterrupt):
raise
logger.error(f"Error creating Jira issue: {e}", exc_info=True)
return {
"status": "error",
"message": "Something went wrong while creating the issue.",
}
return create_jira_issue

View file

@ -1,183 +0,0 @@
import asyncio
import logging
from typing import Any
from langchain_core.tools import tool
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm.attributes import flag_modified
from app.agents.new_chat.tools.hitl import request_approval
from app.connectors.jira_history import JiraHistoryConnector
from app.services.jira import JiraToolMetadataService
logger = logging.getLogger(__name__)
def create_delete_jira_issue_tool(
db_session: AsyncSession | None = None,
search_space_id: int | None = None,
user_id: str | None = None,
connector_id: int | None = None,
):
@tool
async def delete_jira_issue(
issue_title_or_key: str,
delete_from_kb: bool = False,
) -> dict[str, Any]:
"""Delete a Jira issue.
Use this tool when the user asks to delete or remove a Jira issue.
Args:
issue_title_or_key: The issue key (e.g. "PROJ-42") or title.
delete_from_kb: Whether to also remove from the knowledge base.
Returns:
Dictionary with status, message, and deleted_from_kb.
IMPORTANT:
- If status is "rejected", do NOT retry.
- If status is "not_found", relay the message to the user.
- If status is "insufficient_permissions", inform user to re-authenticate.
"""
logger.info(
f"delete_jira_issue called: issue_title_or_key='{issue_title_or_key}'"
)
if db_session is None or search_space_id is None or user_id is None:
return {"status": "error", "message": "Jira tool not properly configured."}
try:
metadata_service = JiraToolMetadataService(db_session)
context = await metadata_service.get_deletion_context(
search_space_id, user_id, issue_title_or_key
)
if "error" in context:
error_msg = context["error"]
if context.get("auth_expired"):
return {
"status": "auth_error",
"message": error_msg,
"connector_id": context.get("connector_id"),
"connector_type": "jira",
}
if "not found" in error_msg.lower():
return {"status": "not_found", "message": error_msg}
return {"status": "error", "message": error_msg}
issue_data = context["issue"]
issue_key = issue_data["issue_id"]
document_id = issue_data["document_id"]
connector_id_from_context = context.get("account", {}).get("id")
result = request_approval(
action_type="jira_issue_deletion",
tool_name="delete_jira_issue",
params={
"issue_key": issue_key,
"connector_id": connector_id_from_context,
"delete_from_kb": delete_from_kb,
},
context=context,
)
if result.rejected:
return {
"status": "rejected",
"message": "User declined. Do not retry or suggest alternatives.",
}
final_issue_key = result.params.get("issue_key", issue_key)
final_connector_id = result.params.get(
"connector_id", connector_id_from_context
)
final_delete_from_kb = result.params.get("delete_from_kb", delete_from_kb)
from sqlalchemy.future import select
from app.db import SearchSourceConnector, SearchSourceConnectorType
if not final_connector_id:
return {
"status": "error",
"message": "No connector found for this issue.",
}
result = await db_session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == final_connector_id,
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.JIRA_CONNECTOR,
)
)
connector = result.scalars().first()
if not connector:
return {
"status": "error",
"message": "Selected Jira connector is invalid.",
}
try:
jira_history = JiraHistoryConnector(
session=db_session, connector_id=final_connector_id
)
jira_client = await jira_history._get_jira_client()
await asyncio.to_thread(jira_client.delete_issue, final_issue_key)
except Exception as api_err:
if "status code 403" in str(api_err).lower():
try:
connector.config = {**connector.config, "auth_expired": True}
flag_modified(connector, "config")
await db_session.commit()
except Exception:
pass
return {
"status": "insufficient_permissions",
"connector_id": final_connector_id,
"message": "This Jira account needs additional permissions. Please re-authenticate in connector settings.",
}
raise
deleted_from_kb = False
if final_delete_from_kb and document_id:
try:
from app.db import Document
doc_result = await db_session.execute(
select(Document).filter(Document.id == document_id)
)
document = doc_result.scalars().first()
if document:
await db_session.delete(document)
await db_session.commit()
deleted_from_kb = True
except Exception as e:
logger.error(f"Failed to delete document from KB: {e}")
await db_session.rollback()
message = f"Jira issue {final_issue_key} deleted successfully."
if deleted_from_kb:
message += " Also removed from the knowledge base."
return {
"status": "success",
"issue_key": final_issue_key,
"deleted_from_kb": deleted_from_kb,
"message": message,
}
except Exception as e:
from langgraph.errors import GraphInterrupt
if isinstance(e, GraphInterrupt):
raise
logger.error(f"Error deleting Jira issue: {e}", exc_info=True)
return {
"status": "error",
"message": "Something went wrong while deleting the issue.",
}
return delete_jira_issue

View file

@ -1,226 +0,0 @@
import asyncio
import logging
from typing import Any
from langchain_core.tools import tool
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm.attributes import flag_modified
from app.agents.new_chat.tools.hitl import request_approval
from app.connectors.jira_history import JiraHistoryConnector
from app.services.jira import JiraToolMetadataService
logger = logging.getLogger(__name__)
def create_update_jira_issue_tool(
db_session: AsyncSession | None = None,
search_space_id: int | None = None,
user_id: str | None = None,
connector_id: int | None = None,
):
@tool
async def update_jira_issue(
issue_title_or_key: str,
new_summary: str | None = None,
new_description: str | None = None,
new_priority: str | None = None,
) -> dict[str, Any]:
"""Update an existing Jira issue.
Use this tool when the user asks to modify, edit, or update a Jira issue.
Args:
issue_title_or_key: The issue key (e.g. "PROJ-42") or title to identify the issue.
new_summary: Optional new title/summary for the issue.
new_description: Optional new description.
new_priority: Optional new priority name.
Returns:
Dictionary with status and message.
IMPORTANT:
- If status is "rejected", do NOT retry.
- If status is "not_found", relay the message and ask user to verify.
- If status is "insufficient_permissions", inform user to re-authenticate.
"""
logger.info(
f"update_jira_issue called: issue_title_or_key='{issue_title_or_key}'"
)
if db_session is None or search_space_id is None or user_id is None:
return {"status": "error", "message": "Jira tool not properly configured."}
try:
metadata_service = JiraToolMetadataService(db_session)
context = await metadata_service.get_update_context(
search_space_id, user_id, issue_title_or_key
)
if "error" in context:
error_msg = context["error"]
if context.get("auth_expired"):
return {
"status": "auth_error",
"message": error_msg,
"connector_id": context.get("connector_id"),
"connector_type": "jira",
}
if "not found" in error_msg.lower():
return {"status": "not_found", "message": error_msg}
return {"status": "error", "message": error_msg}
issue_data = context["issue"]
issue_key = issue_data["issue_id"]
document_id = issue_data.get("document_id")
connector_id_from_context = context.get("account", {}).get("id")
result = request_approval(
action_type="jira_issue_update",
tool_name="update_jira_issue",
params={
"issue_key": issue_key,
"document_id": document_id,
"new_summary": new_summary,
"new_description": new_description,
"new_priority": new_priority,
"connector_id": connector_id_from_context,
},
context=context,
)
if result.rejected:
return {
"status": "rejected",
"message": "User declined. Do not retry or suggest alternatives.",
}
final_issue_key = result.params.get("issue_key", issue_key)
final_summary = result.params.get("new_summary", new_summary)
final_description = result.params.get("new_description", new_description)
final_priority = result.params.get("new_priority", new_priority)
final_connector_id = result.params.get(
"connector_id", connector_id_from_context
)
final_document_id = result.params.get("document_id", document_id)
from sqlalchemy.future import select
from app.db import SearchSourceConnector, SearchSourceConnectorType
if not final_connector_id:
return {
"status": "error",
"message": "No connector found for this issue.",
}
result = await db_session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == final_connector_id,
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.JIRA_CONNECTOR,
)
)
connector = result.scalars().first()
if not connector:
return {
"status": "error",
"message": "Selected Jira connector is invalid.",
}
fields: dict[str, Any] = {}
if final_summary:
fields["summary"] = final_summary
if final_description is not None:
fields["description"] = {
"type": "doc",
"version": 1,
"content": [
{
"type": "paragraph",
"content": [{"type": "text", "text": final_description}],
}
],
}
if final_priority:
fields["priority"] = {"name": final_priority}
if not fields:
return {"status": "error", "message": "No changes specified."}
try:
jira_history = JiraHistoryConnector(
session=db_session, connector_id=final_connector_id
)
jira_client = await jira_history._get_jira_client()
await asyncio.to_thread(
jira_client.update_issue, final_issue_key, fields
)
except Exception as api_err:
if "status code 403" in str(api_err).lower():
try:
connector.config = {**connector.config, "auth_expired": True}
flag_modified(connector, "config")
await db_session.commit()
except Exception:
pass
return {
"status": "insufficient_permissions",
"connector_id": final_connector_id,
"message": "This Jira account needs additional permissions. Please re-authenticate in connector settings.",
}
raise
issue_url = (
f"{jira_history._base_url}/browse/{final_issue_key}"
if jira_history._base_url and final_issue_key
else ""
)
kb_message_suffix = ""
if final_document_id:
try:
from app.services.jira import JiraKBSyncService
kb_service = JiraKBSyncService(db_session)
kb_result = await kb_service.sync_after_update(
document_id=final_document_id,
issue_id=final_issue_key,
user_id=user_id,
search_space_id=search_space_id,
)
if kb_result["status"] == "success":
kb_message_suffix = (
" Your knowledge base has also been updated."
)
else:
kb_message_suffix = (
" The knowledge base will be updated in the next sync."
)
except Exception as kb_err:
logger.warning(f"KB sync after update failed: {kb_err}")
kb_message_suffix = (
" The knowledge base will be updated in the next sync."
)
return {
"status": "success",
"issue_key": final_issue_key,
"issue_url": issue_url,
"message": f"Jira issue {final_issue_key} updated successfully.{kb_message_suffix}",
}
except Exception as e:
from langgraph.errors import GraphInterrupt
if isinstance(e, GraphInterrupt):
raise
logger.error(f"Error updating Jira issue: {e}", exc_info=True)
return {
"status": "error",
"message": "Something went wrong while updating the issue.",
}
return update_jira_issue

View file

@ -1,318 +0,0 @@
import logging
from typing import Any
from langchain_core.tools import tool
from sqlalchemy.ext.asyncio import AsyncSession
from app.agents.new_chat.tools.hitl import request_approval
from app.connectors.linear_connector import LinearAPIError, LinearConnector
from app.services.linear import LinearKBSyncService, LinearToolMetadataService
logger = logging.getLogger(__name__)
def create_update_linear_issue_tool(
db_session: AsyncSession | None = None,
search_space_id: int | None = None,
user_id: str | None = None,
connector_id: int | None = None,
):
"""
Factory function to create the update_linear_issue tool.
Args:
db_session: Database session for accessing the Linear connector
search_space_id: Search space ID to find the Linear connector
user_id: User ID for fetching user-specific context
connector_id: Optional specific connector ID (if known)
Returns:
Configured update_linear_issue tool
"""
@tool
async def update_linear_issue(
issue_ref: str,
new_title: str | None = None,
new_description: str | None = None,
new_state_name: str | None = None,
new_assignee_email: str | None = None,
new_priority: int | None = None,
new_label_names: list[str] | None = None,
) -> dict[str, Any]:
"""Update an existing Linear issue that has been indexed in the knowledge base.
Use this tool when the user asks to modify, change, or update a Linear issue
for example, changing its status, reassigning it, updating its title or description,
adjusting its priority, or changing its labels.
Only issues already indexed in the knowledge base can be updated.
Args:
issue_ref: The issue to update. Can be the issue title (e.g. "Fix login bug"),
the identifier (e.g. "ENG-42"), or the full document title
(e.g. "ENG-42: Fix login bug"). Matched case-insensitively.
new_title: New title for the issue (optional).
new_description: New markdown body for the issue (optional).
new_state_name: New workflow state name (e.g. "In Progress", "Done").
Matched case-insensitively against the team's states.
new_assignee_email: Email address of the new assignee.
Matched case-insensitively against the team's members.
new_priority: New priority (0 = No Priority, 1 = Urgent, 2 = High,
3 = Medium, 4 = Low).
new_label_names: New set of label names to apply.
Matched case-insensitively against the team's labels.
Unrecognised names are silently skipped.
Returns:
Dictionary with:
- status: "success", "rejected", "not_found", or "error"
- identifier: Human-readable ID like "ENG-42" (if success)
- url: URL to the updated issue (if success)
- message: Result message
IMPORTANT:
- If status is "rejected", the user explicitly declined the action.
Respond with a brief acknowledgment (e.g., "Understood, I didn't update the issue.")
and move on. Do NOT ask for alternatives or troubleshoot.
- If status is "not_found", inform the user conversationally using the exact message
provided. Do NOT treat this as an error. Simply relay the message and ask the user
to verify the issue title or identifier, or check if it has been indexed.
Examples:
- "Mark the 'Fix login bug' issue as done"
- "Assign ENG-42 to john@company.com"
- "Change the priority of 'Payment timeout' to urgent"
"""
logger.info(f"update_linear_issue called: issue_ref='{issue_ref}'")
if db_session is None or search_space_id is None or user_id is None:
logger.error(
"Linear tool not properly configured - missing required parameters"
)
return {
"status": "error",
"message": "Linear tool not properly configured. Please contact support.",
}
try:
metadata_service = LinearToolMetadataService(db_session)
context = await metadata_service.get_update_context(
search_space_id, user_id, issue_ref
)
if "error" in context:
error_msg = context["error"]
if context.get("auth_expired"):
logger.warning(f"Auth expired for update context: {error_msg}")
return {
"status": "auth_error",
"message": error_msg,
"connector_id": context.get("connector_id"),
"connector_type": "linear",
}
if "not found" in error_msg.lower():
logger.warning(f"Issue not found: {error_msg}")
return {"status": "not_found", "message": error_msg}
else:
logger.error(f"Failed to fetch update context: {error_msg}")
return {"status": "error", "message": error_msg}
issue_id = context["issue"]["id"]
document_id = context["issue"]["document_id"]
connector_id_from_context = context.get("workspace", {}).get("id")
team = context.get("team", {})
new_state_id = _resolve_state(team, new_state_name)
new_assignee_id = _resolve_assignee(team, new_assignee_email)
new_label_ids = _resolve_labels(team, new_label_names)
logger.info(
f"Requesting approval for updating Linear issue: '{issue_ref}' (id={issue_id})"
)
result = request_approval(
action_type="linear_issue_update",
tool_name="update_linear_issue",
params={
"issue_id": issue_id,
"document_id": document_id,
"new_title": new_title,
"new_description": new_description,
"new_state_id": new_state_id,
"new_assignee_id": new_assignee_id,
"new_priority": new_priority,
"new_label_ids": new_label_ids,
"connector_id": connector_id_from_context,
},
context=context,
)
if result.rejected:
logger.info("Linear issue update rejected by user")
return {
"status": "rejected",
"message": "User declined. Do not retry or suggest alternatives.",
}
final_issue_id = result.params.get("issue_id", issue_id)
final_document_id = result.params.get("document_id", document_id)
final_new_title = result.params.get("new_title", new_title)
final_new_description = result.params.get(
"new_description", new_description
)
final_new_state_id = result.params.get("new_state_id", new_state_id)
final_new_assignee_id = result.params.get(
"new_assignee_id", new_assignee_id
)
final_new_priority = result.params.get("new_priority", new_priority)
final_new_label_ids: list[str] | None = result.params.get(
"new_label_ids", new_label_ids
)
final_connector_id = result.params.get(
"connector_id", connector_id_from_context
)
if not final_connector_id:
logger.error("No connector found for this issue")
return {
"status": "error",
"message": "No connector found for this issue.",
}
from sqlalchemy.future import select
from app.db import SearchSourceConnector, SearchSourceConnectorType
result = await db_session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == final_connector_id,
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.LINEAR_CONNECTOR,
)
)
connector = result.scalars().first()
if not connector:
logger.error(
f"Invalid connector_id={final_connector_id} for search_space_id={search_space_id}"
)
return {
"status": "error",
"message": "Selected Linear connector is invalid or has been disconnected.",
}
logger.info(f"Validated Linear connector: id={final_connector_id}")
logger.info(
f"Updating Linear issue with final params: issue_id={final_issue_id}"
)
linear_client = LinearConnector(
session=db_session, connector_id=final_connector_id
)
updated_issue = await linear_client.update_issue(
issue_id=final_issue_id,
title=final_new_title,
description=final_new_description,
state_id=final_new_state_id,
assignee_id=final_new_assignee_id,
priority=final_new_priority,
label_ids=final_new_label_ids,
)
if updated_issue.get("status") == "error":
logger.error(
f"Failed to update Linear issue: {updated_issue.get('message')}"
)
return {
"status": "error",
"message": updated_issue.get("message"),
}
logger.info(
f"update_issue result: {updated_issue.get('identifier')} - {updated_issue.get('title')}"
)
if final_document_id is not None:
logger.info(
f"Updating knowledge base for document {final_document_id}..."
)
kb_service = LinearKBSyncService(db_session)
kb_result = await kb_service.sync_after_update(
document_id=final_document_id,
issue_id=final_issue_id,
user_id=user_id,
search_space_id=search_space_id,
)
if kb_result["status"] == "success":
logger.info(
f"Knowledge base successfully updated for issue {final_issue_id}"
)
kb_message = " Your knowledge base has also been updated."
elif kb_result["status"] == "not_indexed":
kb_message = " This issue will be added to your knowledge base in the next scheduled sync."
else:
logger.warning(
f"KB update failed for issue {final_issue_id}: {kb_result.get('message')}"
)
kb_message = " Your knowledge base will be updated in the next scheduled sync."
else:
kb_message = ""
identifier = updated_issue.get("identifier")
default_msg = f"Issue {identifier} updated successfully."
return {
"status": "success",
"identifier": identifier,
"url": updated_issue.get("url"),
"message": f"{updated_issue.get('message', default_msg)}{kb_message}",
}
except Exception as e:
from langgraph.errors import GraphInterrupt
if isinstance(e, GraphInterrupt):
raise
logger.error(f"Error updating Linear issue: {e}", exc_info=True)
if isinstance(e, ValueError | LinearAPIError):
message = str(e)
else:
message = (
"Something went wrong while updating the issue. Please try again."
)
return {"status": "error", "message": message}
return update_linear_issue
def _resolve_state(team: dict, state_name: str | None) -> str | None:
if not state_name:
return None
name_lower = state_name.lower()
for state in team.get("states", []):
if state.get("name", "").lower() == name_lower:
return state["id"]
return None
def _resolve_assignee(team: dict, assignee_email: str | None) -> str | None:
if not assignee_email:
return None
email_lower = assignee_email.lower()
for member in team.get("members", []):
if member.get("email", "").lower() == email_lower:
return member["id"]
return None
def _resolve_labels(team: dict, label_names: list[str] | None) -> list[str] | None:
if label_names is None:
return None
if not label_names:
return []
name_set = {n.lower() for n in label_names}
return [
label["id"]
for label in team.get("labels", [])
if label.get("name", "").lower() in name_set
]