Add Airtable, Calendar, and Clickup connector route slices.

This commit is contained in:
CREDO23 2026-05-01 20:30:20 +02:00
parent ba57eae2bb
commit 4f0e84c6a3
23 changed files with 1516 additions and 0 deletions

View file

@ -0,0 +1,54 @@
"""`airtable` route: ``SubAgent`` spec for deepagents."""
from __future__ import annotations
from collections.abc import Sequence
from typing import Any
from deepagents import SubAgent
from langchain_core.language_models import BaseChatModel
from app.agents.multi_agent_with_deepagents.subagents.shared.md_file_reader import (
read_md_file,
)
from app.agents.multi_agent_with_deepagents.subagents.shared.permissions import (
ToolsPermissions,
merge_tools_permissions,
)
from app.agents.multi_agent_with_deepagents.subagents.shared.subagent_builder import (
pack_subagent,
)
from .tools.index import load_tools
NAME = "airtable"
def build_subagent(
*,
dependencies: dict[str, Any],
model: BaseChatModel | None = None,
extra_middleware: Sequence[Any] | None = None,
extra_tools_bucket: ToolsPermissions | None = None,
) -> SubAgent:
buckets = load_tools(dependencies=dependencies)
merged_tools_bucket = merge_tools_permissions(buckets, extra_tools_bucket)
tools = [
row["tool"]
for row in (*merged_tools_bucket["allow"], *merged_tools_bucket["ask"])
if row.get("tool") is not None
]
interrupt_on = {r["name"]: True for r in merged_tools_bucket["ask"] if r.get("name")}
description = read_md_file(__package__, "description").strip()
if not description:
description = "Handles airtable tasks for this workspace."
system_prompt = read_md_file(__package__, "system_prompt").strip()
return pack_subagent(
name=NAME,
description=description,
system_prompt=system_prompt,
tools=tools,
interrupt_on=interrupt_on,
model=model,
extra_middleware=extra_middleware,
)

View file

@ -0,0 +1 @@
Use for Airtable structured data operations: locate bases/tables and create/read/update records.

View file

@ -0,0 +1,46 @@
You are the Airtable MCP operations sub-agent.
You receive delegated instructions from a supervisor agent and return structured results for supervisor synthesis.
<goal>
Execute Airtable MCP base/table/record operations accurately.
</goal>
<available_tools>
- Runtime-provided Airtable MCP tools for bases, tables, and records.
</available_tools>
<tool_policy>
- Resolve base and table targets before record-level actions.
- Do not guess IDs or schema fields.
- If targets are ambiguous, return `status=blocked` with candidate options.
- Never claim mutation success without tool confirmation.
</tool_policy>
<out_of_scope>
- Do not execute non-Airtable tasks.
</out_of_scope>
<safety>
- Never claim record mutations succeeded without tool confirmation.
</safety>
<failure_policy>
- On tool failure, return `status=error` with concise recovery `next_step`.
- On unresolved target/schema ambiguity, return `status=blocked` with required options.
</failure_policy>
<output_contract>
Return **only** one JSON object (no markdown/prose):
{
"status": "success" | "partial" | "blocked" | "error",
"action_summary": string,
"evidence": { "items": object | null },
"next_step": string | null,
"missing_fields": string[] | null,
"assumptions": string[] | null
}
Rules:
- `status=success` -> `next_step=null`, `missing_fields=null`.
- `status=partial|blocked|error` -> `next_step` must be non-null.
- `status=blocked` due to missing required inputs -> `missing_fields` must be non-null.
</output_contract>

View file

@ -0,0 +1,3 @@
"""Airtable route: native tool factories are empty; MCP supplies tools when configured."""
__all__: list[str] = []

View file

@ -0,0 +1,12 @@
from __future__ import annotations
from typing import Any
from app.agents.multi_agent_with_deepagents.subagents.shared.permissions import (
ToolsPermissions,
)
def load_tools(*, dependencies: dict[str, Any] | None = None, **kwargs: Any) -> ToolsPermissions:
_ = {**(dependencies or {}), **kwargs}
return {"allow": [], "ask": []}

View file

@ -0,0 +1,54 @@
"""`calendar` route: ``SubAgent`` spec for deepagents."""
from __future__ import annotations
from collections.abc import Sequence
from typing import Any
from deepagents import SubAgent
from langchain_core.language_models import BaseChatModel
from app.agents.multi_agent_with_deepagents.subagents.shared.md_file_reader import (
read_md_file,
)
from app.agents.multi_agent_with_deepagents.subagents.shared.permissions import (
ToolsPermissions,
merge_tools_permissions,
)
from app.agents.multi_agent_with_deepagents.subagents.shared.subagent_builder import (
pack_subagent,
)
from .tools.index import load_tools
NAME = "calendar"
def build_subagent(
*,
dependencies: dict[str, Any],
model: BaseChatModel | None = None,
extra_middleware: Sequence[Any] | None = None,
extra_tools_bucket: ToolsPermissions | None = None,
) -> SubAgent:
buckets = load_tools(dependencies=dependencies)
merged_tools_bucket = merge_tools_permissions(buckets, extra_tools_bucket)
tools = [
row["tool"]
for row in (*merged_tools_bucket["allow"], *merged_tools_bucket["ask"])
if row.get("tool") is not None
]
interrupt_on = {r["name"]: True for r in merged_tools_bucket["ask"] if r.get("name")}
description = read_md_file(__package__, "description").strip()
if not description:
description = "Handles calendar tasks for this workspace."
system_prompt = read_md_file(__package__, "system_prompt").strip()
return pack_subagent(
name=NAME,
description=description,
system_prompt=system_prompt,
tools=tools,
interrupt_on=interrupt_on,
model=model,
extra_middleware=extra_middleware,
)

View file

@ -0,0 +1 @@
Use for calendar planning and scheduling: check availability, read event details, create events, and update events.

View file

@ -0,0 +1,62 @@
You are the Google Calendar operations sub-agent.
You receive delegated instructions from a supervisor agent and return structured results for supervisor synthesis.
<goal>
Execute calendar event operations (search, create, update, delete) accurately with timezone-safe scheduling.
</goal>
<available_tools>
- `search_calendar_events`
- `create_calendar_event`
- `update_calendar_event`
- `delete_calendar_event`
</available_tools>
<tool_policy>
- Use only tools in `<available_tools>`.
- Resolve relative dates against current runtime timestamp.
- If required fields (date/time/timezone/target event) are missing or ambiguous, return `status=blocked` with `missing_fields` and supervisor `next_step`.
- Never invent event IDs or mutation results.
</tool_policy>
<out_of_scope>
- Do not perform non-calendar tasks.
</out_of_scope>
<safety>
- Before update/delete, ensure event target is explicit.
- Never claim event mutation success without tool confirmation.
</safety>
<failure_policy>
- On tool failure, return `status=error` with concise recovery `next_step`.
- On ambiguity, return `status=blocked` with top event candidates.
</failure_policy>
<output_contract>
Return **only** one JSON object (no markdown/prose):
{
"status": "success" | "partial" | "blocked" | "error",
"action_summary": string,
"evidence": {
"event_id": string | null,
"title": string | null,
"start_at": string (ISO 8601 with timezone) | null,
"end_at": string (ISO 8601 with timezone) | null,
"matched_candidates": [
{
"event_id": string,
"title": string | null,
"start_at": string (ISO 8601 with timezone) | null
}
] | null
},
"next_step": string | null,
"missing_fields": string[] | null,
"assumptions": string[] | null
}
Rules:
- `status=success` -> `next_step=null`, `missing_fields=null`.
- `status=partial|blocked|error` -> `next_step` must be non-null.
- `status=blocked` due to missing required inputs -> `missing_fields` must be non-null.
</output_contract>

View file

@ -0,0 +1,19 @@
from app.agents.new_chat.tools.google_calendar.create_event import (
create_create_calendar_event_tool,
)
from app.agents.new_chat.tools.google_calendar.delete_event import (
create_delete_calendar_event_tool,
)
from app.agents.new_chat.tools.google_calendar.search_events import (
create_search_calendar_events_tool,
)
from app.agents.new_chat.tools.google_calendar.update_event import (
create_update_calendar_event_tool,
)
__all__ = [
"create_create_calendar_event_tool",
"create_delete_calendar_event_tool",
"create_search_calendar_events_tool",
"create_update_calendar_event_tool",
]

View file

@ -0,0 +1,324 @@
import asyncio
import logging
from datetime import datetime
from typing import Any
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from langchain_core.tools import tool
from sqlalchemy.ext.asyncio import AsyncSession
from app.agents.new_chat.tools.hitl import request_approval
from app.services.google_calendar import GoogleCalendarToolMetadataService
logger = logging.getLogger(__name__)
def create_create_calendar_event_tool(
db_session: AsyncSession | None = None,
search_space_id: int | None = None,
user_id: str | None = None,
):
@tool
async def create_calendar_event(
summary: str,
start_datetime: str,
end_datetime: str,
description: str | None = None,
location: str | None = None,
attendees: list[str] | None = None,
) -> dict[str, Any]:
"""Create a new event on Google Calendar.
Use when the user asks to schedule, create, or add a calendar event.
Ask for event details if not provided.
Args:
summary: The event title.
start_datetime: Start time in ISO 8601 format (e.g. "2026-03-20T10:00:00").
end_datetime: End time in ISO 8601 format (e.g. "2026-03-20T11:00:00").
description: Optional event description.
location: Optional event location.
attendees: Optional list of attendee email addresses.
Returns:
Dictionary with:
- status: "success", "rejected", "auth_error", or "error"
- event_id: Google Calendar event ID (if success)
- html_link: URL to open the event (if success)
- message: Result message
IMPORTANT:
- If status is "rejected", the user explicitly declined the action.
Respond with a brief acknowledgment and do NOT retry or suggest alternatives.
Examples:
- "Schedule a meeting with John tomorrow at 10am"
- "Create a calendar event for the team standup"
"""
logger.info(
f"create_calendar_event called: summary='{summary}', start='{start_datetime}', end='{end_datetime}'"
)
if db_session is None or search_space_id is None or user_id is None:
return {
"status": "error",
"message": "Google Calendar tool not properly configured. Please contact support.",
}
try:
metadata_service = GoogleCalendarToolMetadataService(db_session)
context = await metadata_service.get_creation_context(
search_space_id, user_id
)
if "error" in context:
logger.error(f"Failed to fetch creation context: {context['error']}")
return {"status": "error", "message": context["error"]}
accounts = context.get("accounts", [])
if accounts and all(a.get("auth_expired") for a in accounts):
logger.warning(
"All Google Calendar accounts have expired authentication"
)
return {
"status": "auth_error",
"message": "All connected Google Calendar accounts need re-authentication. Please re-authenticate in your connector settings.",
"connector_type": "google_calendar",
}
logger.info(
f"Requesting approval for creating calendar event: summary='{summary}'"
)
result = request_approval(
action_type="google_calendar_event_creation",
tool_name="create_calendar_event",
params={
"summary": summary,
"start_datetime": start_datetime,
"end_datetime": end_datetime,
"description": description,
"location": location,
"attendees": attendees,
"timezone": context.get("timezone"),
"connector_id": None,
},
context=context,
)
if result.rejected:
return {
"status": "rejected",
"message": "User declined. The event was not created. Do not ask again or suggest alternatives.",
}
final_summary = result.params.get("summary", summary)
final_start_datetime = result.params.get("start_datetime", start_datetime)
final_end_datetime = result.params.get("end_datetime", end_datetime)
final_description = result.params.get("description", description)
final_location = result.params.get("location", location)
final_attendees = result.params.get("attendees", attendees)
final_connector_id = result.params.get("connector_id")
if not final_summary or not final_summary.strip():
return {"status": "error", "message": "Event summary cannot be empty."}
from sqlalchemy.future import select
from app.db import SearchSourceConnector, SearchSourceConnectorType
_calendar_types = [
SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR,
SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR,
]
if final_connector_id is not None:
result = await db_session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == final_connector_id,
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type.in_(_calendar_types),
)
)
connector = result.scalars().first()
if not connector:
return {
"status": "error",
"message": "Selected Google Calendar connector is invalid or has been disconnected.",
}
actual_connector_id = connector.id
else:
result = await db_session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type.in_(_calendar_types),
)
)
connector = result.scalars().first()
if not connector:
return {
"status": "error",
"message": "No Google Calendar connector found. Please connect Google Calendar in your workspace settings.",
}
actual_connector_id = connector.id
logger.info(
f"Creating calendar event: summary='{final_summary}', connector={actual_connector_id}"
)
if (
connector.connector_type
== SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR
):
from app.utils.google_credentials import build_composio_credentials
cca_id = connector.config.get("composio_connected_account_id")
if cca_id:
creds = build_composio_credentials(cca_id)
else:
return {
"status": "error",
"message": "Composio connected account ID not found for this connector.",
}
else:
config_data = dict(connector.config)
from app.config import config as app_config
from app.utils.oauth_security import TokenEncryption
token_encrypted = config_data.get("_token_encrypted", False)
if token_encrypted and app_config.SECRET_KEY:
token_encryption = TokenEncryption(app_config.SECRET_KEY)
for key in ("token", "refresh_token", "client_secret"):
if config_data.get(key):
config_data[key] = token_encryption.decrypt_token(
config_data[key]
)
exp = config_data.get("expiry", "")
if exp:
exp = exp.replace("Z", "")
creds = Credentials(
token=config_data.get("token"),
refresh_token=config_data.get("refresh_token"),
token_uri=config_data.get("token_uri"),
client_id=config_data.get("client_id"),
client_secret=config_data.get("client_secret"),
scopes=config_data.get("scopes", []),
expiry=datetime.fromisoformat(exp) if exp else None,
)
service = await asyncio.get_event_loop().run_in_executor(
None, lambda: build("calendar", "v3", credentials=creds)
)
tz = context.get("timezone", "UTC")
event_body: dict[str, Any] = {
"summary": final_summary,
"start": {"dateTime": final_start_datetime, "timeZone": tz},
"end": {"dateTime": final_end_datetime, "timeZone": tz},
}
if final_description:
event_body["description"] = final_description
if final_location:
event_body["location"] = final_location
if final_attendees:
event_body["attendees"] = [
{"email": e.strip()} for e in final_attendees if e.strip()
]
try:
created = await asyncio.get_event_loop().run_in_executor(
None,
lambda: (
service.events()
.insert(calendarId="primary", body=event_body)
.execute()
),
)
except Exception as api_err:
from googleapiclient.errors import HttpError
if isinstance(api_err, HttpError) and api_err.resp.status == 403:
logger.warning(
f"Insufficient permissions for connector {actual_connector_id}: {api_err}"
)
try:
from sqlalchemy.orm.attributes import flag_modified
_res = await db_session.execute(
select(SearchSourceConnector).where(
SearchSourceConnector.id == actual_connector_id
)
)
_conn = _res.scalar_one_or_none()
if _conn and not _conn.config.get("auth_expired"):
_conn.config = {**_conn.config, "auth_expired": True}
flag_modified(_conn, "config")
await db_session.commit()
except Exception:
logger.warning(
"Failed to persist auth_expired for connector %s",
actual_connector_id,
exc_info=True,
)
return {
"status": "insufficient_permissions",
"connector_id": actual_connector_id,
"message": "This Google Calendar account needs additional permissions. Please re-authenticate in connector settings.",
}
raise
logger.info(
f"Calendar event created: id={created.get('id')}, summary={created.get('summary')}"
)
kb_message_suffix = ""
try:
from app.services.google_calendar import GoogleCalendarKBSyncService
kb_service = GoogleCalendarKBSyncService(db_session)
kb_result = await kb_service.sync_after_create(
event_id=created.get("id"),
event_summary=final_summary,
calendar_id="primary",
start_time=final_start_datetime,
end_time=final_end_datetime,
location=final_location,
html_link=created.get("htmlLink"),
description=final_description,
connector_id=actual_connector_id,
search_space_id=search_space_id,
user_id=user_id,
)
if kb_result["status"] == "success":
kb_message_suffix = " Your knowledge base has also been updated."
else:
kb_message_suffix = " This event will be added to your knowledge base in the next scheduled sync."
except Exception as kb_err:
logger.warning(f"KB sync after create failed: {kb_err}")
kb_message_suffix = " This event will be added to your knowledge base in the next scheduled sync."
return {
"status": "success",
"event_id": created.get("id"),
"html_link": created.get("htmlLink"),
"message": f"Successfully created '{final_summary}' on Google Calendar.{kb_message_suffix}",
}
except Exception as e:
from langgraph.errors import GraphInterrupt
if isinstance(e, GraphInterrupt):
raise
logger.error(f"Error creating calendar event: {e}", exc_info=True)
return {
"status": "error",
"message": "Something went wrong while creating the event. Please try again.",
}
return create_calendar_event

View file

@ -0,0 +1,304 @@
import asyncio
import logging
from datetime import datetime
from typing import Any
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from langchain_core.tools import tool
from sqlalchemy.ext.asyncio import AsyncSession
from app.agents.new_chat.tools.hitl import request_approval
from app.services.google_calendar import GoogleCalendarToolMetadataService
logger = logging.getLogger(__name__)
def create_delete_calendar_event_tool(
db_session: AsyncSession | None = None,
search_space_id: int | None = None,
user_id: str | None = None,
):
@tool
async def delete_calendar_event(
event_title_or_id: str,
delete_from_kb: bool = False,
) -> dict[str, Any]:
"""Delete a Google Calendar event.
Use when the user asks to delete, remove, or cancel a calendar event.
Args:
event_title_or_id: The exact title or event ID of the event to delete.
delete_from_kb: Whether to also remove the event from the knowledge base.
Default is False.
Set to True to remove from both Google Calendar and knowledge base.
Returns:
Dictionary with:
- status: "success", "rejected", "not_found", "auth_error", or "error"
- event_id: Google Calendar event ID (if success)
- deleted_from_kb: whether the document was removed from the knowledge base
- message: Result message
IMPORTANT:
- If status is "rejected", the user explicitly declined. Respond with a brief
acknowledgment and do NOT retry or suggest alternatives.
- If status is "not_found", relay the exact message to the user and ask them
to verify the event name or check if it has been indexed.
Examples:
- "Delete the team standup event"
- "Cancel my dentist appointment on Friday"
"""
logger.info(
f"delete_calendar_event called: event_ref='{event_title_or_id}', delete_from_kb={delete_from_kb}"
)
if db_session is None or search_space_id is None or user_id is None:
return {
"status": "error",
"message": "Google Calendar tool not properly configured. Please contact support.",
}
try:
metadata_service = GoogleCalendarToolMetadataService(db_session)
context = await metadata_service.get_deletion_context(
search_space_id, user_id, event_title_or_id
)
if "error" in context:
error_msg = context["error"]
if "not found" in error_msg.lower():
logger.warning(f"Event not found: {error_msg}")
return {"status": "not_found", "message": error_msg}
logger.error(f"Failed to fetch deletion context: {error_msg}")
return {"status": "error", "message": error_msg}
account = context.get("account", {})
if account.get("auth_expired"):
logger.warning(
"Google Calendar account %s has expired authentication",
account.get("id"),
)
return {
"status": "auth_error",
"message": "The Google Calendar account for this event needs re-authentication. Please re-authenticate in your connector settings.",
"connector_type": "google_calendar",
}
event = context["event"]
event_id = event["event_id"]
document_id = event.get("document_id")
connector_id_from_context = context["account"]["id"]
if not event_id:
return {
"status": "error",
"message": "Event ID is missing from the indexed document. Please re-index the event and try again.",
}
logger.info(
f"Requesting approval for deleting calendar event: '{event_title_or_id}' (event_id={event_id}, delete_from_kb={delete_from_kb})"
)
result = request_approval(
action_type="google_calendar_event_deletion",
tool_name="delete_calendar_event",
params={
"event_id": event_id,
"connector_id": connector_id_from_context,
"delete_from_kb": delete_from_kb,
},
context=context,
)
if result.rejected:
return {
"status": "rejected",
"message": "User declined. The event was not deleted. Do not ask again or suggest alternatives.",
}
final_event_id = result.params.get("event_id", event_id)
final_connector_id = result.params.get(
"connector_id", connector_id_from_context
)
final_delete_from_kb = result.params.get("delete_from_kb", delete_from_kb)
if not final_connector_id:
return {
"status": "error",
"message": "No connector found for this event.",
}
from sqlalchemy.future import select
from app.db import SearchSourceConnector, SearchSourceConnectorType
_calendar_types = [
SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR,
SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR,
]
result = await db_session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == final_connector_id,
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type.in_(_calendar_types),
)
)
connector = result.scalars().first()
if not connector:
return {
"status": "error",
"message": "Selected Google Calendar connector is invalid or has been disconnected.",
}
actual_connector_id = connector.id
logger.info(
f"Deleting calendar event: event_id='{final_event_id}', connector={actual_connector_id}"
)
if (
connector.connector_type
== SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR
):
from app.utils.google_credentials import build_composio_credentials
cca_id = connector.config.get("composio_connected_account_id")
if cca_id:
creds = build_composio_credentials(cca_id)
else:
return {
"status": "error",
"message": "Composio connected account ID not found for this connector.",
}
else:
config_data = dict(connector.config)
from app.config import config as app_config
from app.utils.oauth_security import TokenEncryption
token_encrypted = config_data.get("_token_encrypted", False)
if token_encrypted and app_config.SECRET_KEY:
token_encryption = TokenEncryption(app_config.SECRET_KEY)
for key in ("token", "refresh_token", "client_secret"):
if config_data.get(key):
config_data[key] = token_encryption.decrypt_token(
config_data[key]
)
exp = config_data.get("expiry", "")
if exp:
exp = exp.replace("Z", "")
creds = Credentials(
token=config_data.get("token"),
refresh_token=config_data.get("refresh_token"),
token_uri=config_data.get("token_uri"),
client_id=config_data.get("client_id"),
client_secret=config_data.get("client_secret"),
scopes=config_data.get("scopes", []),
expiry=datetime.fromisoformat(exp) if exp else None,
)
service = await asyncio.get_event_loop().run_in_executor(
None, lambda: build("calendar", "v3", credentials=creds)
)
try:
await asyncio.get_event_loop().run_in_executor(
None,
lambda: (
service.events()
.delete(calendarId="primary", eventId=final_event_id)
.execute()
),
)
except Exception as api_err:
from googleapiclient.errors import HttpError
if isinstance(api_err, HttpError) and api_err.resp.status == 403:
logger.warning(
f"Insufficient permissions for connector {actual_connector_id}: {api_err}"
)
try:
from sqlalchemy.orm.attributes import flag_modified
_res = await db_session.execute(
select(SearchSourceConnector).where(
SearchSourceConnector.id == actual_connector_id
)
)
_conn = _res.scalar_one_or_none()
if _conn and not _conn.config.get("auth_expired"):
_conn.config = {**_conn.config, "auth_expired": True}
flag_modified(_conn, "config")
await db_session.commit()
except Exception:
logger.warning(
"Failed to persist auth_expired for connector %s",
actual_connector_id,
exc_info=True,
)
return {
"status": "insufficient_permissions",
"connector_id": actual_connector_id,
"message": "This Google Calendar account needs additional permissions. Please re-authenticate in connector settings.",
}
raise
logger.info(f"Calendar event deleted: event_id={final_event_id}")
delete_result: dict[str, Any] = {
"status": "success",
"event_id": final_event_id,
"message": f"Successfully deleted the calendar event '{event.get('summary', event_title_or_id)}'.",
}
deleted_from_kb = False
if final_delete_from_kb and document_id:
try:
from app.db import Document
doc_result = await db_session.execute(
select(Document).filter(Document.id == document_id)
)
document = doc_result.scalars().first()
if document:
await db_session.delete(document)
await db_session.commit()
deleted_from_kb = True
logger.info(
f"Deleted document {document_id} from knowledge base"
)
else:
logger.warning(f"Document {document_id} not found in KB")
except Exception as e:
logger.error(f"Failed to delete document from KB: {e}")
await db_session.rollback()
delete_result["warning"] = (
f"Event deleted, but failed to remove from knowledge base: {e!s}"
)
delete_result["deleted_from_kb"] = deleted_from_kb
if deleted_from_kb:
delete_result["message"] = (
f"{delete_result.get('message', '')} (also removed from knowledge base)"
)
return delete_result
except Exception as e:
from langgraph.errors import GraphInterrupt
if isinstance(e, GraphInterrupt):
raise
logger.error(f"Error deleting calendar event: {e}", exc_info=True)
return {
"status": "error",
"message": "Something went wrong while deleting the event. Please try again.",
}
return delete_calendar_event

View file

@ -0,0 +1,33 @@
from __future__ import annotations
from typing import Any
from app.agents.multi_agent_with_deepagents.subagents.shared.permissions import (
ToolsPermissions,
)
from .create_event import create_create_calendar_event_tool
from .delete_event import create_delete_calendar_event_tool
from .search_events import create_search_calendar_events_tool
from .update_event import create_update_calendar_event_tool
def load_tools(*, dependencies: dict[str, Any] | None = None, **kwargs: Any) -> ToolsPermissions:
resolved_dependencies = {**(dependencies or {}), **kwargs}
session_dependencies = {
"db_session": resolved_dependencies["db_session"],
"search_space_id": resolved_dependencies["search_space_id"],
"user_id": resolved_dependencies["user_id"],
}
search = create_search_calendar_events_tool(**session_dependencies)
create = create_create_calendar_event_tool(**session_dependencies)
update = create_update_calendar_event_tool(**session_dependencies)
delete = create_delete_calendar_event_tool(**session_dependencies)
return {
"allow": [{"name": getattr(search, "name", "") or "", "tool": search}],
"ask": [
{"name": getattr(create, "name", "") or "", "tool": create},
{"name": getattr(update, "name", "") or "", "tool": update},
{"name": getattr(delete, "name", "") or "", "tool": delete},
],
}

View file

@ -0,0 +1,132 @@
import logging
from typing import Any
from langchain_core.tools import tool
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from app.agents.new_chat.tools.gmail.search_emails import _build_credentials
from app.db import SearchSourceConnector, SearchSourceConnectorType
logger = logging.getLogger(__name__)
_CALENDAR_TYPES = [
SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR,
SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR,
]
def create_search_calendar_events_tool(
db_session: AsyncSession | None = None,
search_space_id: int | None = None,
user_id: str | None = None,
):
@tool
async def search_calendar_events(
start_date: str,
end_date: str,
max_results: int = 25,
) -> dict[str, Any]:
"""Search Google Calendar events within a date range.
Args:
start_date: Start date in YYYY-MM-DD format (e.g. "2026-04-01").
end_date: End date in YYYY-MM-DD format (e.g. "2026-04-30").
max_results: Maximum number of events to return (default 25, max 50).
Returns:
Dictionary with status and a list of events including
event_id, summary, start, end, location, attendees.
"""
if db_session is None or search_space_id is None or user_id is None:
return {
"status": "error",
"message": "Calendar tool not properly configured.",
}
max_results = min(max_results, 50)
try:
result = await db_session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type.in_(_CALENDAR_TYPES),
)
)
connector = result.scalars().first()
if not connector:
return {
"status": "error",
"message": "No Google Calendar connector found. Please connect Google Calendar in your workspace settings.",
}
creds = _build_credentials(connector)
from app.connectors.google_calendar_connector import GoogleCalendarConnector
cal = GoogleCalendarConnector(
credentials=creds,
session=db_session,
user_id=user_id,
connector_id=connector.id,
)
events_raw, error = await cal.get_all_primary_calendar_events(
start_date=start_date,
end_date=end_date,
max_results=max_results,
)
if error:
if (
"re-authenticate" in error.lower()
or "authentication failed" in error.lower()
):
return {
"status": "auth_error",
"message": error,
"connector_type": "google_calendar",
}
if "no events found" in error.lower():
return {
"status": "success",
"events": [],
"total": 0,
"message": error,
}
return {"status": "error", "message": error}
events = []
for ev in events_raw:
start = ev.get("start", {})
end = ev.get("end", {})
attendees_raw = ev.get("attendees", [])
events.append(
{
"event_id": ev.get("id"),
"summary": ev.get("summary", "No Title"),
"start": start.get("dateTime") or start.get("date", ""),
"end": end.get("dateTime") or end.get("date", ""),
"location": ev.get("location", ""),
"description": ev.get("description", ""),
"html_link": ev.get("htmlLink", ""),
"attendees": [a.get("email", "") for a in attendees_raw[:10]],
"status": ev.get("status", ""),
}
)
return {"status": "success", "events": events, "total": len(events)}
except Exception as e:
from langgraph.errors import GraphInterrupt
if isinstance(e, GraphInterrupt):
raise
logger.error("Error searching calendar events: %s", e, exc_info=True)
return {
"status": "error",
"message": "Failed to search calendar events. Please try again.",
}
return search_calendar_events

View file

@ -0,0 +1,356 @@
import asyncio
import logging
from datetime import datetime
from typing import Any
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from langchain_core.tools import tool
from sqlalchemy.ext.asyncio import AsyncSession
from app.agents.new_chat.tools.hitl import request_approval
from app.services.google_calendar import GoogleCalendarToolMetadataService
logger = logging.getLogger(__name__)
def _is_date_only(value: str) -> bool:
"""Return True when *value* looks like a bare date (YYYY-MM-DD) with no time component."""
return len(value) <= 10 and "T" not in value
def _build_time_body(value: str, context: dict[str, Any] | Any) -> dict[str, str]:
"""Build a Google Calendar start/end body using ``date`` for all-day
events and ``dateTime`` for timed events."""
if _is_date_only(value):
return {"date": value}
tz = context.get("timezone", "UTC") if isinstance(context, dict) else "UTC"
return {"dateTime": value, "timeZone": tz}
def create_update_calendar_event_tool(
db_session: AsyncSession | None = None,
search_space_id: int | None = None,
user_id: str | None = None,
):
@tool
async def update_calendar_event(
event_title_or_id: str,
new_summary: str | None = None,
new_start_datetime: str | None = None,
new_end_datetime: str | None = None,
new_description: str | None = None,
new_location: str | None = None,
new_attendees: list[str] | None = None,
) -> dict[str, Any]:
"""Update an existing Google Calendar event.
Use when the user asks to modify, reschedule, or change a calendar event.
Args:
event_title_or_id: The exact title or event ID of the event to update.
new_summary: New event title (if changing).
new_start_datetime: New start time in ISO 8601 format (if rescheduling).
new_end_datetime: New end time in ISO 8601 format (if rescheduling).
new_description: New event description (if changing).
new_location: New event location (if changing).
new_attendees: New list of attendee email addresses (if changing).
Returns:
Dictionary with:
- status: "success", "rejected", "not_found", "auth_error", or "error"
- event_id: Google Calendar event ID (if success)
- html_link: URL to open the event (if success)
- message: Result message
IMPORTANT:
- If status is "rejected", the user explicitly declined. Respond with a brief
acknowledgment and do NOT retry or suggest alternatives.
- If status is "not_found", relay the exact message to the user and ask them
to verify the event name or check if it has been indexed.
Examples:
- "Reschedule the team standup to 3pm"
- "Change the location of my dentist appointment"
"""
logger.info(f"update_calendar_event called: event_ref='{event_title_or_id}'")
if db_session is None or search_space_id is None or user_id is None:
return {
"status": "error",
"message": "Google Calendar tool not properly configured. Please contact support.",
}
try:
metadata_service = GoogleCalendarToolMetadataService(db_session)
context = await metadata_service.get_update_context(
search_space_id, user_id, event_title_or_id
)
if "error" in context:
error_msg = context["error"]
if "not found" in error_msg.lower():
logger.warning(f"Event not found: {error_msg}")
return {"status": "not_found", "message": error_msg}
logger.error(f"Failed to fetch update context: {error_msg}")
return {"status": "error", "message": error_msg}
if context.get("auth_expired"):
logger.warning("Google Calendar account has expired authentication")
return {
"status": "auth_error",
"message": "The Google Calendar account for this event needs re-authentication. Please re-authenticate in your connector settings.",
"connector_type": "google_calendar",
}
event = context["event"]
event_id = event["event_id"]
document_id = event.get("document_id")
connector_id_from_context = context["account"]["id"]
if not event_id:
return {
"status": "error",
"message": "Event ID is missing from the indexed document. Please re-index the event and try again.",
}
logger.info(
f"Requesting approval for updating calendar event: '{event_title_or_id}' (event_id={event_id})"
)
result = request_approval(
action_type="google_calendar_event_update",
tool_name="update_calendar_event",
params={
"event_id": event_id,
"document_id": document_id,
"connector_id": connector_id_from_context,
"new_summary": new_summary,
"new_start_datetime": new_start_datetime,
"new_end_datetime": new_end_datetime,
"new_description": new_description,
"new_location": new_location,
"new_attendees": new_attendees,
},
context=context,
)
if result.rejected:
return {
"status": "rejected",
"message": "User declined. The event was not updated. Do not ask again or suggest alternatives.",
}
final_event_id = result.params.get("event_id", event_id)
final_connector_id = result.params.get(
"connector_id", connector_id_from_context
)
final_new_summary = result.params.get("new_summary", new_summary)
final_new_start_datetime = result.params.get(
"new_start_datetime", new_start_datetime
)
final_new_end_datetime = result.params.get(
"new_end_datetime", new_end_datetime
)
final_new_description = result.params.get(
"new_description", new_description
)
final_new_location = result.params.get("new_location", new_location)
final_new_attendees = result.params.get("new_attendees", new_attendees)
if not final_connector_id:
return {
"status": "error",
"message": "No connector found for this event.",
}
from sqlalchemy.future import select
from app.db import SearchSourceConnector, SearchSourceConnectorType
_calendar_types = [
SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR,
SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR,
]
result = await db_session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == final_connector_id,
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type.in_(_calendar_types),
)
)
connector = result.scalars().first()
if not connector:
return {
"status": "error",
"message": "Selected Google Calendar connector is invalid or has been disconnected.",
}
actual_connector_id = connector.id
logger.info(
f"Updating calendar event: event_id='{final_event_id}', connector={actual_connector_id}"
)
if (
connector.connector_type
== SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR
):
from app.utils.google_credentials import build_composio_credentials
cca_id = connector.config.get("composio_connected_account_id")
if cca_id:
creds = build_composio_credentials(cca_id)
else:
return {
"status": "error",
"message": "Composio connected account ID not found for this connector.",
}
else:
config_data = dict(connector.config)
from app.config import config as app_config
from app.utils.oauth_security import TokenEncryption
token_encrypted = config_data.get("_token_encrypted", False)
if token_encrypted and app_config.SECRET_KEY:
token_encryption = TokenEncryption(app_config.SECRET_KEY)
for key in ("token", "refresh_token", "client_secret"):
if config_data.get(key):
config_data[key] = token_encryption.decrypt_token(
config_data[key]
)
exp = config_data.get("expiry", "")
if exp:
exp = exp.replace("Z", "")
creds = Credentials(
token=config_data.get("token"),
refresh_token=config_data.get("refresh_token"),
token_uri=config_data.get("token_uri"),
client_id=config_data.get("client_id"),
client_secret=config_data.get("client_secret"),
scopes=config_data.get("scopes", []),
expiry=datetime.fromisoformat(exp) if exp else None,
)
service = await asyncio.get_event_loop().run_in_executor(
None, lambda: build("calendar", "v3", credentials=creds)
)
update_body: dict[str, Any] = {}
if final_new_summary is not None:
update_body["summary"] = final_new_summary
if final_new_start_datetime is not None:
update_body["start"] = _build_time_body(
final_new_start_datetime, context
)
if final_new_end_datetime is not None:
update_body["end"] = _build_time_body(final_new_end_datetime, context)
if final_new_description is not None:
update_body["description"] = final_new_description
if final_new_location is not None:
update_body["location"] = final_new_location
if final_new_attendees is not None:
update_body["attendees"] = [
{"email": e.strip()} for e in final_new_attendees if e.strip()
]
if not update_body:
return {
"status": "error",
"message": "No changes specified. Please provide at least one field to update.",
}
try:
updated = await asyncio.get_event_loop().run_in_executor(
None,
lambda: (
service.events()
.patch(
calendarId="primary",
eventId=final_event_id,
body=update_body,
)
.execute()
),
)
except Exception as api_err:
from googleapiclient.errors import HttpError
if isinstance(api_err, HttpError) and api_err.resp.status == 403:
logger.warning(
f"Insufficient permissions for connector {actual_connector_id}: {api_err}"
)
try:
from sqlalchemy.orm.attributes import flag_modified
_res = await db_session.execute(
select(SearchSourceConnector).where(
SearchSourceConnector.id == actual_connector_id
)
)
_conn = _res.scalar_one_or_none()
if _conn and not _conn.config.get("auth_expired"):
_conn.config = {**_conn.config, "auth_expired": True}
flag_modified(_conn, "config")
await db_session.commit()
except Exception:
logger.warning(
"Failed to persist auth_expired for connector %s",
actual_connector_id,
exc_info=True,
)
return {
"status": "insufficient_permissions",
"connector_id": actual_connector_id,
"message": "This Google Calendar account needs additional permissions. Please re-authenticate in connector settings.",
}
raise
logger.info(f"Calendar event updated: event_id={final_event_id}")
kb_message_suffix = ""
if document_id is not None:
try:
from app.services.google_calendar import GoogleCalendarKBSyncService
kb_service = GoogleCalendarKBSyncService(db_session)
kb_result = await kb_service.sync_after_update(
document_id=document_id,
event_id=final_event_id,
connector_id=actual_connector_id,
search_space_id=search_space_id,
user_id=user_id,
)
if kb_result["status"] == "success":
kb_message_suffix = (
" Your knowledge base has also been updated."
)
else:
kb_message_suffix = " The knowledge base will be updated in the next scheduled sync."
except Exception as kb_err:
logger.warning(f"KB sync after update failed: {kb_err}")
kb_message_suffix = " The knowledge base will be updated in the next scheduled sync."
return {
"status": "success",
"event_id": final_event_id,
"html_link": updated.get("htmlLink"),
"message": f"Successfully updated the calendar event.{kb_message_suffix}",
}
except Exception as e:
from langgraph.errors import GraphInterrupt
if isinstance(e, GraphInterrupt):
raise
logger.error(f"Error updating calendar event: {e}", exc_info=True)
return {
"status": "error",
"message": "Something went wrong while updating the event. Please try again.",
}
return update_calendar_event

View file

@ -0,0 +1,54 @@
"""`clickup` route: ``SubAgent`` spec for deepagents."""
from __future__ import annotations
from collections.abc import Sequence
from typing import Any
from deepagents import SubAgent
from langchain_core.language_models import BaseChatModel
from app.agents.multi_agent_with_deepagents.subagents.shared.md_file_reader import (
read_md_file,
)
from app.agents.multi_agent_with_deepagents.subagents.shared.permissions import (
ToolsPermissions,
merge_tools_permissions,
)
from app.agents.multi_agent_with_deepagents.subagents.shared.subagent_builder import (
pack_subagent,
)
from .tools.index import load_tools
NAME = "clickup"
def build_subagent(
*,
dependencies: dict[str, Any],
model: BaseChatModel | None = None,
extra_middleware: Sequence[Any] | None = None,
extra_tools_bucket: ToolsPermissions | None = None,
) -> SubAgent:
buckets = load_tools(dependencies=dependencies)
merged_tools_bucket = merge_tools_permissions(buckets, extra_tools_bucket)
tools = [
row["tool"]
for row in (*merged_tools_bucket["allow"], *merged_tools_bucket["ask"])
if row.get("tool") is not None
]
interrupt_on = {r["name"]: True for r in merged_tools_bucket["ask"] if r.get("name")}
description = read_md_file(__package__, "description").strip()
if not description:
description = "Handles clickup tasks for this workspace."
system_prompt = read_md_file(__package__, "system_prompt").strip()
return pack_subagent(
name=NAME,
description=description,
system_prompt=system_prompt,
tools=tools,
interrupt_on=interrupt_on,
model=model,
extra_middleware=extra_middleware,
)

View file

@ -0,0 +1 @@
Use for ClickUp task management: find tasks/lists, update task fields, and track execution progress.

View file

@ -0,0 +1,45 @@
You are the ClickUp MCP operations sub-agent.
You receive delegated instructions from a supervisor agent and return structured results for supervisor synthesis.
<goal>
Execute ClickUp MCP operations accurately using only runtime-provided tools.
</goal>
<available_tools>
- Runtime-provided ClickUp MCP tools for task/workspace search and mutation.
</available_tools>
<tool_policy>
- Follow tool descriptions exactly.
- If task/workspace target is ambiguous or missing, return `status=blocked` with required disambiguation fields.
- Never claim mutation success without tool confirmation.
</tool_policy>
<out_of_scope>
- Do not execute non-ClickUp tasks.
</out_of_scope>
<safety>
- Never claim update/create success without tool confirmation.
</safety>
<failure_policy>
- On tool failure, return `status=error` with concise recovery `next_step`.
- On unresolved ambiguity, return `status=blocked` with candidate options.
</failure_policy>
<output_contract>
Return **only** one JSON object (no markdown/prose):
{
"status": "success" | "partial" | "blocked" | "error",
"action_summary": string,
"evidence": { "items": object | null },
"next_step": string | null,
"missing_fields": string[] | null,
"assumptions": string[] | null
}
Rules:
- `status=success` -> `next_step=null`, `missing_fields=null`.
- `status=partial|blocked|error` -> `next_step` must be non-null.
- `status=blocked` due to missing required inputs -> `missing_fields` must be non-null.
</output_contract>

View file

@ -0,0 +1,3 @@
"""ClickUp route: native tool factories are empty; MCP supplies tools when configured."""
__all__: list[str] = []

View file

@ -0,0 +1,12 @@
from __future__ import annotations
from typing import Any
from app.agents.multi_agent_with_deepagents.subagents.shared.permissions import (
ToolsPermissions,
)
def load_tools(*, dependencies: dict[str, Any] | None = None, **kwargs: Any) -> ToolsPermissions:
_ = {**(dependencies or {}), **kwargs}
return {"allow": [], "ask": []}