"""WhatsApp Cloud API (Meta) message provider."""

import json
import logging
import re
import time
from typing import Any, Dict, List, Optional

import httpx

from app.core.config import settings
from app.providers.base import BaseProvider

logger = logging.getLogger(__name__)

_GRAPH_API_BASE_URL = "https://graph.facebook.com/v22.0"

# Matches app-level placeholders such as {{first_name}} or {{1}}.
_PLACEHOLDER_RE = re.compile(r"\{\{(\w+)\}\}")

# Error code this module treats as "template not found for this phone number".
_TEMPLATE_NOT_FOUND_CODE = 132001

_PHONE_NUMBER_ID_HINT = (
    "Set WHATSAPP_CLOUD_PHONE_NUMBER_ID to the value from Meta API Setup URL: "
    f"{_GRAPH_API_BASE_URL}/<PHONE_NUMBER_ID>/messages"
)


class WhatsAppCloudProvider(BaseProvider):
    """WhatsApp Cloud API provider (Meta)

    IMPORTANT: WhatsApp templates are scoped to a Phone Number ID.
    A template created under phone number A will ALWAYS return error 132001
    if you try to send it using phone number B's credentials.
    The provider MUST use the correct PHONE_NUMBER_ID that owns the template.
    """

    def __init__(self):
        self.access_token = settings.WHATSAPP_CLOUD_ACCESS_TOKEN
        self.phone_number_id = settings.WHATSAPP_CLOUD_PHONE_NUMBER_ID
        self.phone_number_id_override = settings.WHATSAPP_CLOUD_PHONE_NUMBER_ID_OVERRIDE
        self.base_url = _GRAPH_API_BASE_URL

    # ------------------------------------------------------------------
    # Masking / redaction helpers (used only for logging)
    # ------------------------------------------------------------------

    def _mask_to(self, to: str) -> str:
        """Return a log-safe form of a phone number (last four digits only)."""
        digits = re.sub(r"\D", "", to or "")
        if len(digits) <= 4:
            return "****"
        return f"***{digits[-4:]}"

    def _mask_token(self, token: str) -> str:
        """Return a log-safe form of an access token (last six chars only)."""
        if not token:
            return ""
        if len(token) <= 6:
            return "***"
        return f"***{token[-6:]}"

    def _redact_response(self, response_json: Any) -> Any:
        """Return a copy of an API response with recipient numbers masked.

        A successful send response echoes the recipient under
        ``contacts[].input`` / ``contacts[].wa_id``; logging it verbatim would
        defeat the masking applied to ``to`` everywhere else. Anything that is
        not a dict with a ``contacts`` list is returned unchanged.
        """
        if not isinstance(response_json, dict):
            return response_json
        contacts = response_json.get("contacts")
        if not isinstance(contacts, list):
            return response_json

        masked_contacts = []
        for contact in contacts:
            if isinstance(contact, dict):
                contact = {
                    key: self._mask_to(str(value)) if key in ("input", "wa_id") else value
                    for key, value in contact.items()
                }
            masked_contacts.append(contact)

        redacted = dict(response_json)
        redacted["contacts"] = masked_contacts
        return redacted

    # ------------------------------------------------------------------
    # Configuration helpers
    # ------------------------------------------------------------------

    def _resolve_phone_number_id(self) -> str:
        """Return the phone number ID to send from; the override wins if set."""
        override = (self.phone_number_id_override or "").strip()
        resolved = override or (self.phone_number_id or "").strip()
        if override:
            logger.info(
                "WhatsApp Cloud phone_number_id override active",
                extra={"phone_number_id": resolved}
            )
        return resolved

    def _validate_phone_number_id(self, phone_number_id: str) -> None:
        """Raise ``ValueError`` unless the ID is 10-20 ASCII digits."""
        is_valid = (
            bool(phone_number_id)
            # str.isdigit() alone also accepts characters such as "²".
            and phone_number_id.isascii()
            and phone_number_id.isdigit()
            and 10 <= len(phone_number_id) <= 20
        )
        if not is_valid:
            raise ValueError(_PHONE_NUMBER_ID_HINT)

    # ------------------------------------------------------------------
    # Template helpers
    # ------------------------------------------------------------------

    def _extract_placeholder_names(self, template_body: str) -> List[str]:
        """Return placeholder names from ``template_body`` in order of appearance."""
        if not template_body:
            return []
        return _PLACEHOLDER_RE.findall(template_body)

    def _build_template_parameters(
        self,
        template_body: str,
        variables: Dict[str, str]
    ) -> List[Dict[str, str]]:
        """Build the ordered text-parameter list for the template body.

        Missing or empty variables fall back to ``"Friend"``. Values are
        coerced to ``str`` so the ``text`` field is always a string.
        """
        values = variables or {}
        # Meta templates use ordered parameters ({{1}}, {{2}}, ...); we map
        # app placeholders (e.g. {{first_name}}) to that ordered list.
        return [
            {"type": "text", "text": str(values.get(name) or "Friend")}
            for name in self._extract_placeholder_names(template_body)
        ]

    # ------------------------------------------------------------------
    # Response helpers
    # ------------------------------------------------------------------

    def _safe_json(self, response: httpx.Response) -> Dict[str, Any]:
        """Decode the response body as JSON, or wrap the raw text on failure."""
        try:
            return response.json()
        except Exception:
            return {"raw": response.text}

    def _error_dict(self, response_json: Any) -> Dict[str, Any]:
        """Return the ``error`` object of a response, or ``{}`` if absent or malformed."""
        if isinstance(response_json, dict):
            error = response_json.get("error")
            if isinstance(error, dict):
                return error
        return {}

    def _extract_message_id(self, response_json: Any) -> Optional[str]:
        """Return ``messages[0].id`` from a response, or ``None`` if not present."""
        if not isinstance(response_json, dict):
            return None
        messages = response_json.get("messages")
        if not isinstance(messages, list) or not messages:
            return None
        first = messages[0]
        if not isinstance(first, dict):
            return None
        return first.get("id")

    def _should_retry_with_en_us(
        self,
        language: str,
        response_json: Dict[str, Any]
    ) -> bool:
        """Decide whether a failed ``"en"`` send should be retried as ``"en_US"``."""
        if language != "en":
            return False

        error = self._error_dict(response_json)
        # Do NOT retry on 132001 — that's a phone number mismatch, not language
        if error.get("code") == _TEMPLATE_NOT_FOUND_CODE:
            return False

        message = " ".join([
            str(error.get("message", "")),
            str(error.get("error_user_msg", "")),
            json.dumps(error.get("error_data", {}))
        ]).lower()
        # Only retry if error explicitly mentions language or locale
        return "language" in message or "locale" in message

    def _post_with_retries(
        self,
        url: str,
        headers: Dict[str, str],
        payload: Dict[str, Any],
        retries: int = 2
    ) -> httpx.Response:
        """POST ``payload`` as JSON, retrying on 5xx with linear backoff.

        At least one request is always made, even if ``retries`` is zero or
        negative. The last response is returned whatever its status; transport
        exceptions raised by httpx propagate to the caller.
        """
        with httpx.Client(timeout=30.0) as client:
            attempt = 0
            while True:
                response = client.post(url, headers=headers, json=payload)
                if response.status_code < 500 or attempt >= retries:
                    return response
                attempt += 1
                # Backoff: 0.5s before the first retry, 1.0s before the second, ...
                time.sleep(0.5 * attempt)

    def _log_response(
        self,
        phone_number_id: str,
        to: str,
        template_name: Optional[str],
        language: str,
        url: str,
        response_json: Dict[str, Any]
    ) -> None:
        """Emit a structured log record for an API response.

        ``response_json`` is logged as given, so pass it through
        ``_redact_response`` first.
        """
        logger.info(
            "WhatsApp Cloud response",
            extra={
                "phone_number_id": phone_number_id,
                "url": url,
                "to": self._mask_to(to),
                "template_name": template_name,
                "language": language,
                "response": response_json
            }
        )

    def _raise_api_error(self, response: httpx.Response, response_json: Dict[str, Any]) -> None:
        """Raise a generic ``Exception`` describing a failed API response."""
        error = self._error_dict(response_json)
        if error:
            message = error.get("message", "Unknown error")
        else:
            message = response.text
        raise Exception(f"WhatsApp API error: {response.status_code} - {message}")

    def _send_and_log(
        self,
        url: str,
        headers: Dict[str, str],
        payload: Dict[str, Any],
        phone_number_id: str,
        to: str,
        template_name: Optional[str],
        language: str,
        label: str = ""
    ):
        """POST the payload and log the redacted response.

        Returns a ``(response, response_json)`` tuple; ``response_json`` is the
        unredacted decoded body for the caller to inspect.
        """
        response = self._post_with_retries(url, headers, payload)
        response_json = self._safe_json(response)
        loggable = self._redact_response(response_json)
        self._log_response(phone_number_id, to, template_name, language, url, loggable)
        logger.info(
            "WhatsApp Cloud response details" + label + ": status=%s response=%s",
            response.status_code,
            json.dumps(loggable, ensure_ascii=True)
        )
        return response, response_json

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def send_message(
        self,
        to: str,
        template_name: Optional[str],
        template_body: str,
        variables: Dict[str, str],
        language: str = "en"
    ) -> str:
        """
        Send message via WhatsApp Cloud API.

        Requires:
        - WHATSAPP_CLOUD_PHONE_NUMBER_ID: The phone number ID that owns the template
        - WHATSAPP_CLOUD_ACCESS_TOKEN: Valid Meta access token

        Template ownership matters: templates are scoped to a specific Phone Number ID.
        Sending from a different phone number will fail with error 132001.

        Args:
            to: Recipient phone number; any "+" characters are stripped before sending.
            template_name: Name of the template to send (required).
            template_body: App-side template text; its ``{{name}}`` placeholders
                determine the ordered body parameters.
            variables: Values for the placeholders; missing ones become "Friend".
            language: Template language code. A failed "en" send whose error
                mentions language/locale is retried once as "en_US".

        Returns:
            The message ID reported by the API.

        Raises:
            ValueError: If the configured phone number ID is missing or malformed.
            Exception: If ``template_name`` is empty, or (prefixed with
                "Send error: ") on any transport or API failure.
        """
        phone_number_id = self._resolve_phone_number_id()
        self._validate_phone_number_id(phone_number_id)

        if not template_name:
            raise Exception("WhatsApp Cloud requires an approved template name")

        # Build template object - only include components if there are parameters
        parameters = self._build_template_parameters(template_body, variables)
        template_obj: Dict[str, Any] = {
            "name": template_name,
            "language": {
                "code": language
            }
        }
        if parameters:
            template_obj["components"] = [
                {
                    "type": "body",
                    "parameters": parameters
                }
            ]

        url = f"{self.base_url}/{phone_number_id}/messages"
        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json"
        }
        payload = {
            "messaging_product": "whatsapp",
            "to": to.replace("+", ""),
            "type": "template",
            "template": template_obj
        }

        masked_to = self._mask_to(to)
        masked_token = self._mask_token(self.access_token)
        logger.info(
            "WhatsApp Cloud config",
            extra={
                "phone_number_id": phone_number_id,
                "url": url,
                "template_name": template_name,
                "language": language,
                "access_token": masked_token,
                "to": masked_to
            }
        )
        logger.info(
            "WhatsApp Cloud config details: phone_number_id=%s url=%s template=%s language=%s access_token=%s to=%s",
            phone_number_id,
            url,
            template_name,
            language,
            masked_token,
            masked_to
        )

        # Log sanitized payload (mask sensitive data)
        sanitized_payload = {**payload, "to": self._mask_to(payload["to"])}
        logger.info(
            "WhatsApp Cloud request payload: %s",
            json.dumps(sanitized_payload, ensure_ascii=True)
        )

        try:
            # Language code of the most recent request, for accurate error logs.
            sent_language = language
            response, response_json = self._send_and_log(
                url, headers, payload, phone_number_id, to, template_name, language
            )

            if response.is_error and self._should_retry_with_en_us(language, response_json):
                logger.info("Retrying with en_US language code")
                payload["template"]["language"]["code"] = "en_US"
                sent_language = "en_US"
                response, response_json = self._send_and_log(
                    url, headers, payload, phone_number_id, to, template_name,
                    "en_US", label=" (en_US retry)"
                )

            if response.is_error:
                error = self._error_dict(response_json)
                if error.get("code") == _TEMPLATE_NOT_FOUND_CODE:
                    loggable = self._redact_response(response_json)
                    logger.error(
                        "WhatsApp Cloud template not found (132001)",
                        extra={
                            "phone_number_id": phone_number_id,
                            "template_name": template_name,
                            "language": sent_language,
                            "response": loggable
                        }
                    )
                    logger.error(
                        "WhatsApp Cloud template not found (132001) details: phone_number_id=%s template=%s language=%s response=%s",
                        phone_number_id,
                        template_name,
                        sent_language,
                        json.dumps(loggable, ensure_ascii=True)
                    )
                    raise Exception(
                        f"WhatsApp API error 132001: Template '{template_name}' not found for "
                        f"Phone Number ID {phone_number_id}. "
                        f"Make sure the template exists and is associated with this phone number. "
                        f"(Full error: {error.get('message', 'unknown')})"
                    )
                self._raise_api_error(response, response_json)

            message_id = self._extract_message_id(response_json)
            if not message_id:
                raise Exception("No message ID in response")

            return message_id

        except Exception as e:
            # Every failure in the send path is surfaced under one prefix;
            # the original exception stays attached as __cause__.
            raise Exception(f"Send error: {str(e)}") from e

    def get_provider_name(self) -> str:
        """Return the identifier string for this provider."""
        return "whatsapp_cloud"