diff --git a/core/clients/README.md b/core/clients/README.md new file mode 100644 index 0000000..23851b7 --- /dev/null +++ b/core/clients/README.md @@ -0,0 +1,513 @@ +# 第三方 API 客户端使用指南 + +本目录提供了用于集成第三方 API 的基础客户端封装,包含连接池管理、自动重试、日志记录、链路追踪等功能。 + +## 📁 模块划分原则 + +### **Core 层客户端** (`core/clients/`) + +放置多个应用共享的第三方 API 客户端: + +- 平台级服务 (微信、支付宝、短信服务等) +- 通用基础设施客户端 +- 公共 SaaS 服务 + +### **应用层客户端** (`apps/{app_name}/clients/`) + +放置单个应用专用的第三方 API 客户端: + +- 特定业务的外部服务 +- 垂直领域的专用接口 +- 应用独有的集成 + +--- + +## 🏗️ 核心组件 + +### 1. `BaseAPIClient` - 基础客户端类 + +提供所有 API 客户端的通用功能: + +- ✅ 连接池管理 +- ✅ 自动重试 (指数退避) +- ✅ 超时控制 +- ✅ 请求/响应日志 +- ✅ OpenTelemetry 链路追踪 +- ✅ 健康检查 + +### 2. `HTTPClient` - HTTP 客户端 + +基于 `httpx` 的异步 HTTP 客户端封装: + +- ✅ GET/POST/PUT/PATCH/DELETE 方法 +- ✅ JSON 和表单数据支持 +- ✅ 查询参数处理 +- ✅ 自定义请求头 +- ✅ 连接池优化 + +--- + +## 🚀 快速开始 + +### 基础使用 + +```python +from core.clients.http_client import HTTPClient + +# 创建客户端 +client = HTTPClient( + base_url="https://api.example.com", + timeout=30.0, + max_retries=3, + headers={"Authorization": "Bearer YOUR_TOKEN"} +) + +# 使用上下文管理器 (推荐) +async with client: + # GET 请求 + response = await client.get("/users/123") + data = response.json() + + # POST 请求 + response = await client.post( + "/users", + json={"name": "John", "email": "john@example.com"} + ) + + # 带查询参数的请求 + response = await client.get( + "/users", + params={"page": 1, "page_size": 10} + ) + +# 手动关闭 +await client.close() +``` + +### 创建自定义客户端 + +```python +from typing import Any +from core.clients.http_client import HTTPClient +from core.config import settings + + +class WeChatAPIClient(HTTPClient): + """微信 API 客户端""" + + def __init__( + self, + app_id: str, + app_secret: str, + ) -> None: + super().__init__( + base_url="https://api.weixin.qq.com", + timeout=30.0, + max_retries=3, + trace_enabled=settings.otel_enabled, + ) + self.app_id = app_id + self.app_secret = app_secret + self._access_token: str | None = None + + async def get_access_token(self) -> str: + """获取访问令牌""" + if self._access_token: + return self._access_token + + response = await self.get( + "/cgi-bin/token", + params={ + "grant_type": "client_credential", + "appid": self.app_id, + "secret": self.app_secret, + } + ) + data = response.json() + self._access_token = data["access_token"] + return self._access_token + + async def send_template_message( + self, + openid: str, + template_id: str, + data: dict[str, Any], + ) -> dict[str, Any]: + """发送模板消息""" + access_token = await self.get_access_token() + + response = await self.post( + "/cgi-bin/message/template/send", + params={"access_token": access_token}, + json={ + "touser": openid, + "template_id": template_id, + "data": data, + } + ) + return response.json() +``` + +--- + +## 🔌 FastAPI 集成 + +### 1. 创建依赖注入函数 + +```python +# core/clients/wechat.py +from typing import Optional +from core.clients.http_client import HTTPClient +from core.config import settings + +_wechat_client: Optional[WeChatAPIClient] = None + + +async def get_wechat_client() -> WeChatAPIClient: + """获取微信 API 客户端实例 (用于依赖注入)""" + global _wechat_client + + if _wechat_client is None: + _wechat_client = WeChatAPIClient( + app_id=settings.wechat_app_id, + app_secret=settings.wechat_app_secret, + ) + + return _wechat_client + + +async def close_wechat_client() -> None: + """关闭微信 API 客户端""" + global _wechat_client + if _wechat_client: + await _wechat_client.close() + _wechat_client = None +``` + +### 2. 在路由中使用 + +```python +from fastapi import APIRouter, Depends +from core.clients.wechat import get_wechat_client, WeChatAPIClient + +router = APIRouter(prefix="/wechat", tags=["WeChat"]) + + +@router.post("/send-message") +async def send_wechat_message( + openid: str, + message: str, + client: WeChatAPIClient = Depends(get_wechat_client) +): + """发送微信消息""" + result = await client.send_template_message( + openid=openid, + template_id="your_template_id", + data={"content": {"value": message}} + ) + return {"success": True, "result": result} +``` + +### 3. 在应用生命周期中管理 + +```python +# main.py +from core.clients.wechat import close_wechat_client + +@asynccontextmanager +async def lifespan(app: FastAPI): + # Startup + logger.info("Starting application...") + yield + + # Shutdown + logger.info("Shutting down application...") + await close_wechat_client() # 关闭第三方客户端 +``` + +--- + +## ⚙️ 配置说明 + +在 `core/config.py` 中添加第三方 API 配置: + +```python +class Settings(BaseSettings): + + # WeChat API + wechat_app_id: str = Field(default="", description="微信 AppID") + wechat_app_secret: str = Field(default="", description="微信 AppSecret") + + # Payment API + payment_api_key: str = Field(default="", description="支付 API 密钥") + payment_base_url: str = Field(default="https://api.payment.com", description="支付 API 地址") +``` + +环境变量: + +```bash +# .env +WECHAT_APP_ID=wx1234567890 +WECHAT_APP_SECRET=your_secret_here +PAYMENT_API_KEY=pk_live_xxx +``` + +--- + +## 🎯 功能特性 + +### 1. 自动重试 + +```python +client = HTTPClient( + base_url="https://api.example.com", + max_retries=3, # 最多重试 3 次 + retry_delay=1.0, # 初始延迟 1 秒 + retry_backoff=2.0, # 指数退避因子 (1s -> 2s -> 4s) +) +``` + +**可重试的错误:** + +- `httpx.TimeoutException` - 请求超时 +- `httpx.NetworkError` - 网络错误 +- `httpx.RemoteProtocolError` - 协议错误 + +**不可重试的错误:** + +- 4xx 客户端错误 (立即抛出异常) +- 其他非网络相关错误 + +### 2. 超时控制 + +```python +# 全局超时 +client = HTTPClient(base_url="...", timeout=30.0) + +# 单个请求超时 +response = await client.get("/users", timeout=10.0) +``` + +### 3. 请求日志 + +自动记录所有请求和响应: + +```json +{ + "level": "DEBUG", + "message": "API Request: GET https://api.example.com/users/123", + "method": "GET", + "url": "https://api.example.com/users/123", + "headers": {"Authorization": "***REDACTED***"} +} + +{ + "level": "INFO", + "message": "API Response: GET https://api.example.com/users/123 - 200 (0.234s)", + "method": "GET", + "url": "https://api.example.com/users/123", + "status_code": 200, + "elapsed_seconds": 0.234 +} +``` + +**敏感数据脱敏:** `Authorization`, `API-Key`, `Cookie` 等头会自动脱敏 + +### 4. OpenTelemetry 追踪 + +自动创建 Span 并记录: + +- HTTP 方法和 URL +- 请求/响应状态码 +- 请求耗时 +- 异常信息 + +```python +client = HTTPClient( + base_url="...", + trace_enabled=True # 启用追踪 (默认跟随 settings.otel_enabled) +) +``` + +### 5. 连接池优化 + +```python +client = HTTPClient( + base_url="...", + max_connections=100, # 最大连接数 + max_keepalive_connections=20, # 保持活动的连接数 +) +``` + +### 6. 健康检查 + +```python +is_healthy = await client.health_check() +if not is_healthy: + logger.error("API service is unavailable") +``` + +--- + +## 📝 最佳实践 + +### ✅ 推荐做法 + +1. **使用上下文管理器** + + ```python + async with HTTPClient(base_url="...") as client: + response = await client.get("/endpoint") + # 自动关闭连接 + ``` + +2. **单例模式 (长生命周期)** + + ```python + # 创建全局客户端实例 + _client: Optional[WeChatAPIClient] = None + + async def get_client() -> WeChatAPIClient: + global _client + if _client is None: + _client = WeChatAPIClient(...) + return _client + ``` + +3. **在应用生命周期中管理** + + ```python + @asynccontextmanager + async def lifespan(app: FastAPI): + yield + await close_all_clients() # 优雅关闭 + ``` + +4. **使用依赖注入** + + ```python + async def endpoint( + client: WeChatAPIClient = Depends(get_wechat_client) + ): + return await client.get_user(...) + ``` + +### ❌ 避免做法 + +1. **不要在每个请求中创建新客户端** + + ```python + # ❌ 错误示例 + async def bad_example(): + client = HTTPClient(base_url="...") + response = await client.get("/endpoint") + await client.close() + ``` + +2. **不要忘记关闭客户端** + + ```python + # ❌ 资源泄漏 + client = HTTPClient(base_url="...") + response = await client.get("/endpoint") + # 忘记调用 await client.close() + ``` + +3. **不要在日志中暴露敏感信息** + + ```python + # ✅ BaseAPIClient 已自动脱敏敏感请求头 + # 无需额外处理 + ``` + +--- + +## 🧪 测试示例 + +```python +import pytest +from unittest.mock import AsyncMock, patch +from core.clients.http_client import HTTPClient + + +@pytest.mark.asyncio +async def test_http_client_get(): + """测试 HTTP GET 请求""" + async with HTTPClient(base_url="https://api.example.com") as client: + with patch.object(client, '_get_client') as mock_get_client: + mock_response = AsyncMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"id": 1, "name": "Test"} + + mock_client = AsyncMock() + mock_client.request.return_value = mock_response + mock_get_client.return_value = mock_client + + response = await client.get("/users/1") + + assert response.status_code == 200 + data = response.json() + assert data["name"] == "Test" + + +@pytest.mark.asyncio +async def test_custom_client(): + """测试自定义客户端""" + from core.clients.example_client import ExampleAPIClient + + async with ExampleAPIClient(api_key="test_key") as client: + # Mock the request + with patch.object(client, 'get') as mock_get: + mock_response = AsyncMock() + mock_response.json.return_value = {"user_id": 123} + mock_get.return_value = mock_response + + result = await client.get_user(123) + + assert result["user_id"] == 123 + mock_get.assert_called_once_with("/users/123") +``` + +--- + +## 📚 参考资料 + +- [httpx 文档](https://www.python-httpx.org/) +- [OpenTelemetry Python](https://opentelemetry.io/docs/instrumentation/python/) +- [FastAPI 依赖注入](https://fastapi.tiangolo.com/tutorial/dependencies/) + +--- + +## 🆘 常见问题 + +### Q: 如何处理 API 认证? + +A: 在自定义客户端的 `__init__` 中设置默认请求头: + +```python +headers = {"Authorization": f"Bearer {api_key}"} +super().__init__(base_url=base_url, headers=headers) +``` + +### Q: 如何禁用自动重试? + +A: 设置 `max_retries=0`: + +```python +client = HTTPClient(base_url="...", max_retries=0) +``` + +### Q: 如何自定义重试逻辑? + +A: 继承 `BaseAPIClient` 并重写 `_retry_with_backoff` 方法 + +### Q: 如何处理大文件上传/下载? + +A: 使用 httpx 的流式 API: + +```python +async with client._get_client() as http_client: + async with http_client.stream("GET", "/large-file") as response: + async for chunk in response.aiter_bytes(): + process(chunk) +``` diff --git a/core/clients/__init__.py b/core/clients/__init__.py new file mode 100644 index 0000000..cb159d3 --- /dev/null +++ b/core/clients/__init__.py @@ -0,0 +1,13 @@ +""" +Third-party API clients. + +This package provides base classes and utilities for integrating with external APIs. +""" + +from core.clients.base import BaseAPIClient +from core.clients.http_client import HTTPClient + +__all__ = [ + "BaseAPIClient", + "HTTPClient", +] diff --git a/core/clients/base.py b/core/clients/base.py new file mode 100644 index 0000000..5c4be9f --- /dev/null +++ b/core/clients/base.py @@ -0,0 +1,252 @@ +""" +Base API client with common functionality. + +Provides: +- Connection pooling +- Automatic retry with exponential backoff +- Timeout control +- Request/response logging +- OpenTelemetry tracing integration +- Error handling +""" + +import asyncio +from typing import Any, Optional, Callable +from abc import ABC, abstractmethod +import httpx +from opentelemetry import trace +from opentelemetry.trace import Status, StatusCode +from observability.logging import get_logger + +logger = get_logger(__name__) +tracer = trace.get_tracer(__name__) + + +class BaseAPIClient(ABC): + """ + Abstract base class for all API clients. + + Provides common functionality like retry logic, timeout handling, + logging, and tracing integration. + """ + + def __init__( + self, + base_url: str, + timeout: float = 30.0, + max_retries: int = 3, + retry_delay: float = 1.0, + retry_backoff: float = 2.0, + trace_enabled: bool = True, + ) -> None: + """ + Initialize base API client. + + Args: + base_url: Base URL for the API + timeout: Request timeout in seconds + max_retries: Maximum number of retry attempts + retry_delay: Initial retry delay in seconds + retry_backoff: Exponential backoff multiplier + trace_enabled: Enable OpenTelemetry tracing + """ + self.base_url = base_url.rstrip("/") + self.timeout = timeout + self.max_retries = max_retries + self.retry_delay = retry_delay + self.retry_backoff = retry_backoff + self.trace_enabled = trace_enabled + self._client: Optional[httpx.AsyncClient] = None + + @abstractmethod + async def _get_client(self) -> httpx.AsyncClient: + """ + Get or create HTTP client instance. + + Subclasses must implement this to provide their own client configuration. + + Returns: + httpx.AsyncClient: Configured async HTTP client + """ + pass + + async def close(self) -> None: + """Close the HTTP client and cleanup resources.""" + if self._client is not None: + await self._client.aclose() + self._client = None + logger.info(f"{self.__class__.__name__} client closed") + + async def __aenter__(self) -> "BaseAPIClient": + """Async context manager entry.""" + return self + + async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + """Async context manager exit.""" + await self.close() + + async def _retry_with_backoff( + self, + func: Callable, + *args: Any, + **kwargs: Any + ) -> Any: + """ + Execute function with retry and exponential backoff. + + Args: + func: Async function to execute + *args: Positional arguments for the function + **kwargs: Keyword arguments for the function + + Returns: + Result from the function + + Raises: + Exception: Last exception if all retries fail + """ + last_exception = None + delay = self.retry_delay + + for attempt in range(self.max_retries): + try: + return await func(*args, **kwargs) + except (httpx.TimeoutException, httpx.NetworkError, httpx.RemoteProtocolError) as e: + last_exception = e + + if attempt < self.max_retries - 1: + logger.warning( + f"Request failed (attempt {attempt + 1}/{self.max_retries}): {str(e)}. " + f"Retrying in {delay}s..." + ) + await asyncio.sleep(delay) + delay *= self.retry_backoff + else: + logger.error( + f"Request failed after {self.max_retries} attempts: {str(e)}" + ) + except Exception as e: + # For non-retryable errors, raise immediately + logger.error(f"Non-retryable error: {str(e)}") + raise + + if last_exception: + raise last_exception + + def _create_span_attributes( + self, + method: str, + url: str, + **kwargs: Any + ) -> dict[str, Any]: + """ + Create OpenTelemetry span attributes. + + Args: + method: HTTP method + url: Request URL + **kwargs: Additional attributes + + Returns: + Dictionary of span attributes + """ + attributes = { + "http.method": method, + "http.url": url, + "http.client": self.__class__.__name__, + } + attributes.update(kwargs) + return attributes + + def _log_request( + self, + method: str, + url: str, + headers: Optional[dict[str, str]] = None, + **kwargs: Any + ) -> None: + """ + Log outgoing request. + + Args: + method: HTTP method + url: Request URL + headers: Request headers + **kwargs: Additional log context + """ + logger.debug( + f"API Request: {method} {url}", + extra={ + "method": method, + "url": url, + "headers": self._sanitize_headers(headers), + **kwargs + } + ) + + def _log_response( + self, + method: str, + url: str, + status_code: int, + elapsed: float, + **kwargs: Any + ) -> None: + """ + Log API response. + + Args: + method: HTTP method + url: Request URL + status_code: Response status code + elapsed: Request duration in seconds + **kwargs: Additional log context + """ + log_level = "info" if 200 <= status_code < 400 else "warning" + log_func = getattr(logger, log_level) + + log_func( + f"API Response: {method} {url} - {status_code} ({elapsed:.3f}s)", + extra={ + "method": method, + "url": url, + "status_code": status_code, + "elapsed_seconds": elapsed, + **kwargs + } + ) + + @staticmethod + def _sanitize_headers(headers: Optional[dict[str, str]]) -> dict[str, str]: + """ + Sanitize headers for logging (remove sensitive data). + + Args: + headers: Request headers + + Returns: + Sanitized headers + """ + if not headers: + return {} + + sensitive_keys = {"authorization", "api-key", "x-api-key", "cookie", "set-cookie"} + return { + k: "***REDACTED***" if k.lower() in sensitive_keys else v + for k, v in headers.items() + } + + async def health_check(self) -> bool: + """ + Check if the API client can connect to the service. + + Returns: + bool: True if healthy, False otherwise + """ + try: + client = await self._get_client() + response = await client.get(f"{self.base_url}/health", timeout=5.0) + return response.status_code == 200 + except Exception as e: + logger.error(f"{self.__class__.__name__} health check failed: {str(e)}") + return False diff --git a/core/clients/example_client.py b/core/clients/example_client.py new file mode 100644 index 0000000..a982030 --- /dev/null +++ b/core/clients/example_client.py @@ -0,0 +1,180 @@ +""" +Example third-party API client implementation. + +This demonstrates how to create a custom API client by extending HTTPClient. +""" + +from typing import Any, Optional +from core.clients.http_client import HTTPClient +from core.config import settings + + +class ExampleAPIClient(HTTPClient): + """ + Example API client for a third-party service. + + Usage: + # Initialize client + client = ExampleAPIClient(api_key="your-api-key") + + # Use as context manager + async with client: + data = await client.get_user(user_id=123) + + # Or manual lifecycle + await client.get_user(user_id=123) + await client.close() + """ + + def __init__( + self, + api_key: str, + base_url: str = "https://api.example.com", + timeout: float = 30.0, + max_retries: int = 3, + ) -> None: + """ + Initialize Example API client. + + Args: + api_key: API authentication key + base_url: Base URL for the API + timeout: Request timeout in seconds + max_retries: Maximum number of retry attempts + """ + # Set default headers with API key + headers = { + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json", + "Accept": "application/json", + } + + super().__init__( + base_url=base_url, + timeout=timeout, + max_retries=max_retries, + headers=headers, + trace_enabled=settings.otel_enabled, + ) + + async def get_user(self, user_id: int) -> dict[str, Any]: + """ + Get user information from the API. + + Args: + user_id: User ID + + Returns: + User data dictionary + """ + response = await self.get(f"/users/{user_id}") + return response.json() + + async def create_user(self, user_data: dict[str, Any]) -> dict[str, Any]: + """ + Create a new user. + + Args: + user_data: User data to create + + Returns: + Created user data + """ + response = await self.post("/users", json=user_data) + return response.json() + + async def update_user( + self, + user_id: int, + user_data: dict[str, Any] + ) -> dict[str, Any]: + """ + Update user information. + + Args: + user_id: User ID + user_data: Updated user data + + Returns: + Updated user data + """ + response = await self.put(f"/users/{user_id}", json=user_data) + return response.json() + + async def delete_user(self, user_id: int) -> None: + """ + Delete a user. + + Args: + user_id: User ID + """ + await self.delete(f"/users/{user_id}") + + async def search_users( + self, + query: str, + page: int = 1, + page_size: int = 10 + ) -> dict[str, Any]: + """ + Search users by query. + + Args: + query: Search query + page: Page number + page_size: Items per page + + Returns: + Search results + """ + params = { + "q": query, + "page": page, + "page_size": page_size, + } + response = await self.get("/users/search", params=params) + return response.json() + + +# Dependency injection function for FastAPI +_example_client: Optional[ExampleAPIClient] = None + + +async def get_example_client() -> ExampleAPIClient: + """ + Get or create Example API client instance. + + This function can be used as a FastAPI dependency. + + Returns: + ExampleAPIClient: Shared client instance + + Example: + @router.get("/example") + async def example_endpoint( + client: ExampleAPIClient = Depends(get_example_client) + ): + return await client.get_user(123) + """ + global _example_client + + if _example_client is None: + # Initialize from settings (add these to core/config.py) + api_key = getattr(settings, "example_api_key", "your-api-key") + base_url = getattr(settings, "example_api_base_url", "https://api.example.com") + + _example_client = ExampleAPIClient( + api_key=api_key, + base_url=base_url, + ) + + return _example_client + + +async def close_example_client() -> None: + """Close the Example API client connection.""" + global _example_client + + if _example_client is not None: + await _example_client.close() + _example_client = None diff --git a/core/clients/http_client.py b/core/clients/http_client.py new file mode 100644 index 0000000..21ce7aa --- /dev/null +++ b/core/clients/http_client.py @@ -0,0 +1,394 @@ +""" +HTTP client for third-party API integration. + +Provides a fully-featured async HTTP client with: +- Connection pooling +- Automatic retries +- Request/response logging +- OpenTelemetry tracing +- JSON/form data support +""" + +from typing import Any, Optional +import httpx +from opentelemetry import trace +from opentelemetry.trace import Status, StatusCode +from core.clients.base import BaseAPIClient +from core.config import settings +from observability.logging import get_logger + +logger = get_logger(__name__) +tracer = trace.get_tracer(__name__) + + +class HTTPClient(BaseAPIClient): + """ + Generic HTTP client for third-party APIs. + + Features: + - Automatic connection pooling + - Retry with exponential backoff + - Request/response logging + - OpenTelemetry distributed tracing + - Timeout control + - JSON and form data support + """ + + def __init__( + self, + base_url: str, + timeout: float = 30.0, + max_retries: int = 3, + retry_delay: float = 1.0, + retry_backoff: float = 2.0, + headers: Optional[dict[str, str]] = None, + max_connections: int = 100, + max_keepalive_connections: int = 20, + trace_enabled: bool = True, + ) -> None: + """ + Initialize HTTP client. + + Args: + base_url: Base URL for the API + timeout: Request timeout in seconds + max_retries: Maximum number of retry attempts + retry_delay: Initial retry delay in seconds + retry_backoff: Exponential backoff multiplier + headers: Default headers for all requests + max_connections: Maximum number of connections in the pool + max_keepalive_connections: Maximum keep-alive connections + trace_enabled: Enable OpenTelemetry tracing + """ + super().__init__( + base_url=base_url, + timeout=timeout, + max_retries=max_retries, + retry_delay=retry_delay, + retry_backoff=retry_backoff, + trace_enabled=trace_enabled, + ) + self.default_headers = headers or {} + self.max_connections = max_connections + self.max_keepalive_connections = max_keepalive_connections + + async def _get_client(self) -> httpx.AsyncClient: + """ + Get or create HTTP client with connection pooling. + + Returns: + httpx.AsyncClient: Configured async HTTP client + """ + if self._client is None: + limits = httpx.Limits( + max_connections=self.max_connections, + max_keepalive_connections=self.max_keepalive_connections, + ) + + self._client = httpx.AsyncClient( + base_url=self.base_url, + timeout=httpx.Timeout(self.timeout), + limits=limits, + headers=self.default_headers, + follow_redirects=True, + ) + + logger.info( + f"HTTPClient initialized: base_url={self.base_url}, " + f"timeout={self.timeout}s, max_connections={self.max_connections}" + ) + + return self._client + + async def request( + self, + method: str, + path: str, + *, + params: Optional[dict[str, Any]] = None, + json: Optional[dict[str, Any]] = None, + data: Optional[dict[str, Any]] = None, + headers: Optional[dict[str, str]] = None, + timeout: Optional[float] = None, + **kwargs: Any + ) -> httpx.Response: + """ + Make HTTP request with retry and tracing. + + Args: + method: HTTP method (GET, POST, PUT, DELETE, etc.) + path: API path (will be joined with base_url) + params: URL query parameters + json: JSON request body + data: Form data request body + headers: Additional headers for this request + timeout: Override default timeout for this request + **kwargs: Additional arguments passed to httpx + + Returns: + httpx.Response: HTTP response + + Raises: + httpx.HTTPStatusError: For 4xx/5xx responses + httpx.TimeoutException: On timeout + httpx.RequestError: For network errors + """ + client = await self._get_client() + url = f"{self.base_url}/{path.lstrip('/')}" + + # Merge headers + request_headers = {**self.default_headers, **(headers or {})} + + # Use custom timeout if provided + request_timeout = timeout if timeout is not None else self.timeout + + # Log request + self._log_request(method, url, headers=request_headers, params=params) + + # Create span for tracing + span_name = f"{method} {path}" + + async def _make_request() -> httpx.Response: + if self.trace_enabled and settings.otel_enabled: + with tracer.start_as_current_span(span_name) as span: + span.set_attributes(self._create_span_attributes(method, url)) + + try: + response = await client.request( + method=method, + url=url, + params=params, + json=json, + data=data, + headers=request_headers, + timeout=request_timeout, + **kwargs + ) + + # Record response status + span.set_attribute("http.status_code", response.status_code) + + # Log response + self._log_response( + method, + url, + response.status_code, + response.elapsed.total_seconds() + ) + + # Set span status + if response.status_code >= 400: + span.set_status(Status(StatusCode.ERROR)) + else: + span.set_status(Status(StatusCode.OK)) + + # Raise for 4xx/5xx + response.raise_for_status() + + return response + + except Exception as e: + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + raise + else: + # No tracing + response = await client.request( + method=method, + url=url, + params=params, + json=json, + data=data, + headers=request_headers, + timeout=request_timeout, + **kwargs + ) + + # Log response + self._log_response( + method, + url, + response.status_code, + response.elapsed.total_seconds() + ) + + # Raise for 4xx/5xx + response.raise_for_status() + + return response + + # Execute with retry + return await self._retry_with_backoff(_make_request) + + async def get( + self, + path: str, + *, + params: Optional[dict[str, Any]] = None, + headers: Optional[dict[str, str]] = None, + timeout: Optional[float] = None, + **kwargs: Any + ) -> httpx.Response: + """ + Make GET request. + + Args: + path: API path + params: URL query parameters + headers: Additional headers + timeout: Request timeout + **kwargs: Additional arguments + + Returns: + httpx.Response: HTTP response + """ + return await self.request( + "GET", + path, + params=params, + headers=headers, + timeout=timeout, + **kwargs + ) + + async def post( + self, + path: str, + *, + json: Optional[dict[str, Any]] = None, + data: Optional[dict[str, Any]] = None, + params: Optional[dict[str, Any]] = None, + headers: Optional[dict[str, str]] = None, + timeout: Optional[float] = None, + **kwargs: Any + ) -> httpx.Response: + """ + Make POST request. + + Args: + path: API path + json: JSON request body + data: Form data request body + params: URL query parameters + headers: Additional headers + timeout: Request timeout + **kwargs: Additional arguments + + Returns: + httpx.Response: HTTP response + """ + return await self.request( + "POST", + path, + json=json, + data=data, + params=params, + headers=headers, + timeout=timeout, + **kwargs + ) + + async def put( + self, + path: str, + *, + json: Optional[dict[str, Any]] = None, + data: Optional[dict[str, Any]] = None, + params: Optional[dict[str, Any]] = None, + headers: Optional[dict[str, str]] = None, + timeout: Optional[float] = None, + **kwargs: Any + ) -> httpx.Response: + """ + Make PUT request. + + Args: + path: API path + json: JSON request body + data: Form data request body + params: URL query parameters + headers: Additional headers + timeout: Request timeout + **kwargs: Additional arguments + + Returns: + httpx.Response: HTTP response + """ + return await self.request( + "PUT", + path, + json=json, + data=data, + params=params, + headers=headers, + timeout=timeout, + **kwargs + ) + + async def patch( + self, + path: str, + *, + json: Optional[dict[str, Any]] = None, + data: Optional[dict[str, Any]] = None, + params: Optional[dict[str, Any]] = None, + headers: Optional[dict[str, str]] = None, + timeout: Optional[float] = None, + **kwargs: Any + ) -> httpx.Response: + """ + Make PATCH request. + + Args: + path: API path + json: JSON request body + data: Form data request body + params: URL query parameters + headers: Additional headers + timeout: Request timeout + **kwargs: Additional arguments + + Returns: + httpx.Response: HTTP response + """ + return await self.request( + "PATCH", + path, + json=json, + data=data, + params=params, + headers=headers, + timeout=timeout, + **kwargs + ) + + async def delete( + self, + path: str, + *, + params: Optional[dict[str, Any]] = None, + headers: Optional[dict[str, str]] = None, + timeout: Optional[float] = None, + **kwargs: Any + ) -> httpx.Response: + """ + Make DELETE request. + + Args: + path: API path + params: URL query parameters + headers: Additional headers + timeout: Request timeout + **kwargs: Additional arguments + + Returns: + httpx.Response: HTTP response + """ + return await self.request( + "DELETE", + path, + params=params, + headers=headers, + timeout=timeout, + **kwargs + ) diff --git a/tests/test_http_client.py b/tests/test_http_client.py new file mode 100644 index 0000000..cff7176 --- /dev/null +++ b/tests/test_http_client.py @@ -0,0 +1,215 @@ +""" +Tests for HTTP client. +""" + +import pytest +from unittest.mock import AsyncMock, patch, MagicMock +import httpx +from core.clients.http_client import HTTPClient + + +@pytest.fixture +def mock_httpx_response(): + """Create a mock httpx response.""" + response = MagicMock(spec=httpx.Response) + response.status_code = 200 + response.json.return_value = {"success": True, "data": {"id": 1}} + response.elapsed.total_seconds.return_value = 0.123 + response.raise_for_status = MagicMock() + return response + + +@pytest.mark.asyncio +async def test_http_client_initialization(): + """Test HTTP client initialization.""" + client = HTTPClient( + base_url="https://api.example.com", + timeout=30.0, + max_retries=3, + headers={"Authorization": "Bearer test_token"} + ) + + assert client.base_url == "https://api.example.com" + assert client.timeout == 30.0 + assert client.max_retries == 3 + assert client.default_headers["Authorization"] == "Bearer test_token" + + await client.close() + + +@pytest.mark.asyncio +async def test_http_client_get_request(mock_httpx_response): + """Test HTTP GET request.""" + client = HTTPClient(base_url="https://api.example.com") + + with patch.object(client, '_get_client') as mock_get_client: + mock_async_client = AsyncMock() + mock_async_client.request = AsyncMock(return_value=mock_httpx_response) + mock_get_client.return_value = mock_async_client + + response = await client.get("/users/1", params={"include": "profile"}) + + assert response.status_code == 200 + data = response.json() + assert data["success"] is True + + # Verify the request was made correctly + mock_async_client.request.assert_called_once() + call_args = mock_async_client.request.call_args + assert call_args.kwargs["method"] == "GET" + assert "users/1" in call_args.kwargs["url"] + assert call_args.kwargs["params"] == {"include": "profile"} + + await client.close() + + +@pytest.mark.asyncio +async def test_http_client_post_request(mock_httpx_response): + """Test HTTP POST request.""" + client = HTTPClient(base_url="https://api.example.com") + + with patch.object(client, '_get_client') as mock_get_client: + mock_async_client = AsyncMock() + mock_async_client.request = AsyncMock(return_value=mock_httpx_response) + mock_get_client.return_value = mock_async_client + + payload = {"name": "John", "email": "john@example.com"} + response = await client.post("/users", json=payload) + + assert response.status_code == 200 + + # Verify the request + mock_async_client.request.assert_called_once() + call_args = mock_async_client.request.call_args + assert call_args.kwargs["method"] == "POST" + assert call_args.kwargs["json"] == payload + + await client.close() + + +@pytest.mark.asyncio +async def test_http_client_context_manager(): + """Test HTTP client as context manager.""" + async with HTTPClient(base_url="https://api.example.com") as client: + assert client is not None + assert client._client is None # Not initialized until first use + + # Client should be closed after context + assert client._client is None + + +@pytest.mark.asyncio +async def test_http_client_retry_on_timeout(): + """Test automatic retry on timeout.""" + client = HTTPClient( + base_url="https://api.example.com", + max_retries=3, + retry_delay=0.01 # Fast retry for testing + ) + + with patch.object(client, '_get_client') as mock_get_client: + mock_async_client = AsyncMock() + + # Simulate 2 timeouts, then success + mock_async_client.request = AsyncMock( + side_effect=[ + httpx.TimeoutException("Timeout 1"), + httpx.TimeoutException("Timeout 2"), + MagicMock( + status_code=200, + json=MagicMock(return_value={"success": True}), + elapsed=MagicMock(total_seconds=MagicMock(return_value=0.1)), + raise_for_status=MagicMock() + ) + ] + ) + mock_get_client.return_value = mock_async_client + + response = await client.get("/users") + + assert response.status_code == 200 + assert mock_async_client.request.call_count == 3 # 2 retries + 1 success + + await client.close() + + +@pytest.mark.asyncio +async def test_http_client_retry_exhausted(): + """Test retry exhaustion.""" + client = HTTPClient( + base_url="https://api.example.com", + max_retries=2, + retry_delay=0.01 + ) + + with patch.object(client, '_get_client') as mock_get_client: + mock_async_client = AsyncMock() + mock_async_client.request = AsyncMock( + side_effect=httpx.TimeoutException("Persistent timeout") + ) + mock_get_client.return_value = mock_async_client + + with pytest.raises(httpx.TimeoutException): + await client.get("/users") + + assert mock_async_client.request.call_count == 2 # max_retries + + await client.close() + + +@pytest.mark.asyncio +async def test_http_client_custom_timeout(): + """Test custom timeout for individual request.""" + client = HTTPClient(base_url="https://api.example.com", timeout=30.0) + + with patch.object(client, '_get_client') as mock_get_client: + mock_async_client = AsyncMock() + mock_async_client.request = AsyncMock( + return_value=MagicMock( + status_code=200, + json=MagicMock(return_value={}), + elapsed=MagicMock(total_seconds=MagicMock(return_value=0.1)), + raise_for_status=MagicMock() + ) + ) + mock_get_client.return_value = mock_async_client + + await client.get("/users", timeout=10.0) + + # Verify custom timeout was used + call_args = mock_async_client.request.call_args + assert call_args.kwargs["timeout"] == 10.0 + + await client.close() + + +@pytest.mark.asyncio +async def test_http_client_headers_merge(): + """Test header merging.""" + client = HTTPClient( + base_url="https://api.example.com", + headers={"Authorization": "Bearer token", "User-Agent": "TestClient/1.0"} + ) + + with patch.object(client, '_get_client') as mock_get_client: + mock_async_client = AsyncMock() + mock_async_client.request = AsyncMock( + return_value=MagicMock( + status_code=200, + json=MagicMock(return_value={}), + elapsed=MagicMock(total_seconds=MagicMock(return_value=0.1)), + raise_for_status=MagicMock() + ) + ) + mock_get_client.return_value = mock_async_client + + await client.get("/users", headers={"X-Custom": "value"}) + + # Verify headers were merged + call_args = mock_async_client.request.call_args + headers = call_args.kwargs["headers"] + assert headers["Authorization"] == "Bearer token" + assert headers["User-Agent"] == "TestClient/1.0" + assert headers["X-Custom"] == "value" + + await client.close()