SurfSense/surfsense_backend/app/gateway/whatsapp/client_cloud.py

100 lines
3.3 KiB
Python
Raw Normal View History

"""Small httpx wrapper for the WhatsApp Cloud API."""
from __future__ import annotations
from typing import Any
import httpx
from app.config import config
from app.gateway.base.adapter import PlatformSendResult
from app.gateway.ratelimit import wait_for_token
from app.observability.metrics import record_gateway_rate_limit_hit
class WhatsAppCloudClient:
    """Small async client for the WhatsApp Cloud (Graph) API.

    Each request opens a short-lived ``httpx.AsyncClient``, authenticates with
    the business token as a bearer credential, and is preceded by a
    per-phone-number rate-limit check (see ``_throttle``).
    """

    # Timeout handed to ``httpx.AsyncClient`` for every request.
    _TIMEOUT_SECONDS = 20
    # Rate-limiter settings forwarded to ``wait_for_token``.
    _RATE_LIMIT_CAPACITY = 10
    _RATE_LIMIT_REFILL_PER_SEC = 10.0

    def __init__(
        self,
        *,
        business_token: str,
        phone_number_id: str,
        api_version: str | None = None,
    ) -> None:
        """Store credentials and derive the Graph API base URL.

        Args:
            business_token: Token sent as ``Authorization: Bearer <token>``.
            phone_number_id: Phone-number id used as the leading path segment
                of every request and as the rate-limit bucket key.
            api_version: Graph API version path segment. Falls back to
                ``config.WHATSAPP_GRAPH_API_VERSION`` when omitted or empty.
        """
        self.business_token = business_token
        self.phone_number_id = phone_number_id
        self.api_version = api_version or config.WHATSAPP_GRAPH_API_VERSION
        self.base_url = f"https://graph.facebook.com/{self.api_version}"

    async def send_text(
        self,
        *,
        to: str,
        text: str,
        reply_to_message_id: str | None = None,
    ) -> PlatformSendResult:
        """Send a text message with link previews enabled.

        Args:
            to: Recipient identifier placed in the payload's ``to`` field.
            text: Message body.
            reply_to_message_id: When truthy, added as ``context.message_id``
                so the message is sent as a reply to that message.

        Returns:
            A ``PlatformSendResult`` carrying the id of the first entry in the
            response's ``messages`` list (empty string when absent) and the
            decoded response body.

        Raises:
            httpx.HTTPStatusError: If the API answers with an error status.
        """
        payload: dict[str, Any] = {
            "messaging_product": "whatsapp",
            "recipient_type": "individual",
            "to": to,
            "type": "text",
            "text": {"preview_url": True, "body": text},
        }
        if reply_to_message_id:
            payload["context"] = {"message_id": reply_to_message_id}
        data = await self._post(f"/{self.phone_number_id}/messages", json=payload)
        # A missing, null or empty ``messages`` list degrades to an empty id
        # instead of raising.
        first_message = (data.get("messages") or [{}])[0]
        message_id = str(first_message.get("id") or "")
        return PlatformSendResult(external_message_id=message_id, raw_response=data)

    async def send_typing_indicator(self, *, message_id: str) -> dict[str, Any]:
        """Post a ``read`` status with a text typing indicator for a message.

        Args:
            message_id: Id of the message the status refers to.

        Returns:
            The decoded JSON response body.

        Raises:
            httpx.HTTPStatusError: If the API answers with an error status.
        """
        payload = {
            "messaging_product": "whatsapp",
            "status": "read",
            "message_id": message_id,
            "typing_indicator": {"type": "text"},
        }
        return await self._post(f"/{self.phone_number_id}/messages", json=payload)

    async def validate(self) -> dict[str, Any]:
        """Fetch descriptive fields for the configured phone number.

        Returns:
            The decoded JSON response for the phone-number node, requested
            with the ``verified_name``, ``quality_rating``,
            ``account_review_status`` and ``display_phone_number`` fields.

        Raises:
            httpx.HTTPStatusError: If the API answers with an error status.
        """
        return await self._get(
            f"/{self.phone_number_id}",
            params={
                "fields": "verified_name,quality_rating,account_review_status,display_phone_number"
            },
        )

    async def _post(self, path: str, *, json: dict[str, Any]) -> dict[str, Any]:
        """POST ``json`` to ``path`` (relative to ``base_url``); return the JSON body."""
        return await self._request("POST", path, json=json)

    async def _get(
        self,
        path: str,
        *,
        params: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """GET ``path`` (relative to ``base_url``) with ``params``; return the JSON body."""
        return await self._request("GET", path, params=params)

    async def _request(
        self,
        method: str,
        path: str,
        *,
        json: dict[str, Any] | None = None,
        params: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Issue one throttled, authenticated request and decode its JSON body.

        Shared plumbing for ``_post`` and ``_get`` so that throttling,
        authentication, timeout and error handling stay in one place.

        Raises:
            httpx.HTTPStatusError: If the response has a 4xx/5xx status.
        """
        await self._throttle()
        async with httpx.AsyncClient(timeout=self._TIMEOUT_SECONDS) as client:
            response = await client.request(
                method,
                f"{self.base_url}{path}",
                headers={"Authorization": f"Bearer {self.business_token}"},
                json=json,
                params=params,
            )
            response.raise_for_status()
            return response.json()

    async def _throttle(self) -> None:
        """Wait for a rate-limit token scoped to this phone number.

        A truthy result from ``wait_for_token`` is recorded as a rate-limit
        hit. NOTE(review): presumably that value is the delay in milliseconds;
        confirm against ``app.gateway.ratelimit``.
        """
        wait_ms = await wait_for_token(
            f"wa:phone:{self.phone_number_id}",
            capacity=self._RATE_LIMIT_CAPACITY,
            refill_per_sec=self._RATE_LIMIT_REFILL_PER_SEC,
        )
        if wait_ms:
            record_gateway_rate_limit_hit(bucket="wa:phone")