feat(gateway): implement search space management for messaging channels

This commit is contained in:
Anish Sarkar 2026-06-01 21:39:09 +05:30
parent 455a3ee021
commit 2d1a6be776
3 changed files with 205 additions and 40 deletions

View file

@ -53,6 +53,7 @@ from app.observability.metrics import (
)
from app.users import current_active_user
from app.utils.oauth_security import OAuthStateManager, TokenEncryption
from app.utils.rbac import check_search_space_access
router = APIRouter(prefix="/gateway", tags=["gateway"])
logger = logging.getLogger(__name__)
@ -164,6 +165,10 @@ class StartBindingResponse(BaseModel):
expires_at: datetime
class UpdateBindingSearchSpaceRequest(BaseModel):
search_space_id: int
def _classify_telegram_event(payload: dict[str, Any]) -> str:
if "message" in payload:
return "message"
@ -182,9 +187,11 @@ def _telegram_message(payload: dict[str, Any]) -> dict[str, Any] | None:
async def install_slack_gateway(
search_space_id: int,
user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session),
) -> dict[str, str]:
if not config.GATEWAY_SLACK_CLIENT_ID:
raise HTTPException(status_code=500, detail="Slack gateway OAuth is not configured")
await check_search_space_access(session, user, search_space_id)
state = _get_state_manager().generate_secure_state(search_space_id, user.id)
auth_params = {
"client_id": config.GATEWAY_SLACK_CLIENT_ID,
@ -329,9 +336,11 @@ async def slack_gateway_callback(
async def install_discord_gateway(
search_space_id: int,
user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session),
) -> dict[str, str]:
if not config.DISCORD_CLIENT_ID:
raise HTTPException(status_code=500, detail="Discord gateway OAuth is not configured")
await check_search_space_access(session, user, search_space_id)
state = _get_state_manager().generate_secure_state(search_space_id, user.id)
auth_params = {
"client_id": config.DISCORD_CLIENT_ID,
@ -613,6 +622,7 @@ async def start_binding(
user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session),
) -> StartBindingResponse:
await check_search_space_access(session, user, body.search_space_id)
code = generate_pairing_code()
if body.platform == ExternalChatPlatform.TELEGRAM:
account = await get_or_create_system_telegram_account(session)
@ -692,6 +702,62 @@ async def list_bindings(
]
@router.get("/connections")
async def list_connections(
user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session),
) -> list[dict[str, Any]]:
result = await session.execute(
select(ExternalChatBinding, ExternalChatAccount)
.join(ExternalChatAccount, ExternalChatBinding.account_id == ExternalChatAccount.id)
.where(
ExternalChatBinding.user_id == user.id,
ExternalChatBinding.state.in_(
[ExternalChatBindingState.BOUND, ExternalChatBindingState.SUSPENDED]
),
)
)
connections: list[dict[str, Any]] = []
for binding, account in result.all():
binding_metadata = binding.external_metadata or {}
kind = str(binding_metadata.get("kind") or "")
if kind in {"slack_thread", "discord_thread"}:
continue
account_state = account.cursor_state or {}
workspace_name = None
workspace_id = None
if account.platform == ExternalChatPlatform.SLACK:
workspace_name = account_state.get("team_name")
workspace_id = account_state.get("team_id")
elif account.platform == ExternalChatPlatform.DISCORD:
workspace_name = account_state.get("guild_name")
workspace_id = account_state.get("guild_id")
elif account.platform == ExternalChatPlatform.WHATSAPP:
workspace_name = account_state.get("display_phone_number")
workspace_id = account_state.get("phone_number_id")
connections.append(
{
"id": binding.id,
"platform": account.platform.value,
"state": binding.state.value,
"search_space_id": binding.search_space_id,
"display_name": binding.external_display_name
or binding.external_username
or workspace_name,
"external_username": binding.external_username,
"workspace_name": workspace_name,
"workspace_id": workspace_id,
"health_status": account.health_status.value,
"suspended_reason": binding.suspended_reason,
}
)
return connections
@router.get("/platforms")
async def list_platforms(
user: User = Depends(current_active_user),
@ -716,6 +782,31 @@ async def list_platforms(
]
@router.patch("/bindings/{binding_id}/search-space")
async def update_binding_search_space(
binding_id: int,
body: UpdateBindingSearchSpaceRequest,
user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session),
) -> dict[str, bool]:
binding = await session.get(ExternalChatBinding, binding_id)
if binding is None or binding.user_id != user.id:
raise HTTPException(status_code=404, detail="Binding not found")
if binding.state not in {
ExternalChatBindingState.BOUND,
ExternalChatBindingState.SUSPENDED,
}:
raise HTTPException(status_code=400, detail="Only active bindings can be routed")
await check_search_space_access(session, user, body.search_space_id)
if binding.search_space_id != body.search_space_id:
binding.search_space_id = body.search_space_id
binding.new_chat_thread_id = None
binding.updated_at = datetime.now(UTC)
await session.commit()
return {"ok": True}
@router.delete("/bindings/{binding_id}")
async def delete_binding(
binding_id: int,

View file

@ -21,6 +21,7 @@ from app.db import (
)
from app.gateway.whatsapp.adapter_baileys import WhatsAppBaileysAdapter
from app.users import current_active_user
from app.utils.rbac import check_search_space_access
router = APIRouter(prefix="/gateway/whatsapp/baileys", tags=["gateway"])
@ -61,6 +62,7 @@ async def request_pairing_code(
session: AsyncSession = Depends(get_async_session),
) -> dict[str, Any]:
_ensure_baileys_enabled()
await check_search_space_access(session, user, body.search_space_id)
adapter = WhatsAppBaileysAdapter()
try:
pairing = await adapter.request_pairing_code(phone_number=body.phone_number)