# 第三方 API 客户端使用指南 本目录提供了用于集成第三方 API 的基础客户端封装,包含连接池管理、自动重试、日志记录、链路追踪等功能。 ## 📁 模块划分原则 ### **Core 层客户端** (`core/clients/`) 放置多个应用共享的第三方 API 客户端: - 平台级服务 (微信、支付宝、短信服务等) - 通用基础设施客户端 - 公共 SaaS 服务 ### **应用层客户端** (`apps/{app_name}/clients/`) 放置单个应用专用的第三方 API 客户端: - 特定业务的外部服务 - 垂直领域的专用接口 - 应用独有的集成 --- ## 🏗️ 核心组件 ### 1. `BaseAPIClient` - 基础客户端类 提供所有 API 客户端的通用功能: - ✅ 连接池管理 - ✅ 自动重试 (指数退避) - ✅ 超时控制 - ✅ 请求/响应日志 - ✅ OpenTelemetry 链路追踪 - ✅ 健康检查 ### 2. `HTTPClient` - HTTP 客户端 基于 `httpx` 的异步 HTTP 客户端封装: - ✅ GET/POST/PUT/PATCH/DELETE 方法 - ✅ JSON 和表单数据支持 - ✅ 查询参数处理 - ✅ 自定义请求头 - ✅ 连接池优化 --- ## 🚀 快速开始 ### 基础使用 ```python from core.clients.http_client import HTTPClient # 创建客户端 client = HTTPClient( base_url="https://api.example.com", timeout=30.0, max_retries=3, headers={"Authorization": "Bearer YOUR_TOKEN"} ) # 使用上下文管理器 (推荐) async with client: # GET 请求 response = await client.get("/users/123") data = response.json() # POST 请求 response = await client.post( "/users", json={"name": "John", "email": "john@example.com"} ) # 带查询参数的请求 response = await client.get( "/users", params={"page": 1, "page_size": 10} ) # 手动关闭 await client.close() ``` ### 创建自定义客户端 ```python from typing import Any from core.clients.http_client import HTTPClient from core.config import settings class WeChatAPIClient(HTTPClient): """微信 API 客户端""" def __init__( self, app_id: str, app_secret: str, ) -> None: super().__init__( base_url="https://api.weixin.qq.com", timeout=30.0, max_retries=3, trace_enabled=settings.otel_enabled, ) self.app_id = app_id self.app_secret = app_secret self._access_token: str | None = None async def get_access_token(self) -> str: """获取访问令牌""" if self._access_token: return self._access_token response = await self.get( "/cgi-bin/token", params={ "grant_type": "client_credential", "appid": self.app_id, "secret": self.app_secret, } ) data = response.json() self._access_token = data["access_token"] return self._access_token async def send_template_message( self, openid: str, template_id: str, data: dict[str, Any], ) -> dict[str, Any]: """发送模板消息""" access_token = await self.get_access_token() response = await self.post( "/cgi-bin/message/template/send", params={"access_token": access_token}, json={ "touser": openid, "template_id": template_id, "data": data, } ) return response.json() ``` --- ## 🔌 FastAPI 集成 ### 1. 创建依赖注入函数 ```python # core/clients/wechat.py from typing import Optional from core.clients.http_client import HTTPClient from core.config import settings _wechat_client: Optional[WeChatAPIClient] = None async def get_wechat_client() -> WeChatAPIClient: """获取微信 API 客户端实例 (用于依赖注入)""" global _wechat_client if _wechat_client is None: _wechat_client = WeChatAPIClient( app_id=settings.wechat_app_id, app_secret=settings.wechat_app_secret, ) return _wechat_client async def close_wechat_client() -> None: """关闭微信 API 客户端""" global _wechat_client if _wechat_client: await _wechat_client.close() _wechat_client = None ``` ### 2. 在路由中使用 ```python from fastapi import APIRouter, Depends from core.clients.wechat import get_wechat_client, WeChatAPIClient router = APIRouter(prefix="/wechat", tags=["WeChat"]) @router.post("/send-message") async def send_wechat_message( openid: str, message: str, client: WeChatAPIClient = Depends(get_wechat_client) ): """发送微信消息""" result = await client.send_template_message( openid=openid, template_id="your_template_id", data={"content": {"value": message}} ) return {"success": True, "result": result} ``` ### 3. 在应用生命周期中管理 ```python # main.py from core.clients.wechat import close_wechat_client @asynccontextmanager async def lifespan(app: FastAPI): # Startup logger.info("Starting application...") yield # Shutdown logger.info("Shutting down application...") await close_wechat_client() # 关闭第三方客户端 ``` --- ## ⚙️ 配置说明 在 `core/config.py` 中添加第三方 API 配置: ```python class Settings(BaseSettings): # WeChat API wechat_app_id: str = Field(default="", description="微信 AppID") wechat_app_secret: str = Field(default="", description="微信 AppSecret") # Payment API payment_api_key: str = Field(default="", description="支付 API 密钥") payment_base_url: str = Field(default="https://api.payment.com", description="支付 API 地址") ``` 环境变量: ```bash # .env WECHAT_APP_ID=wx1234567890 WECHAT_APP_SECRET=your_secret_here PAYMENT_API_KEY=pk_live_xxx ``` --- ## 🎯 功能特性 ### 1. 自动重试 ```python client = HTTPClient( base_url="https://api.example.com", max_retries=3, # 最多重试 3 次 retry_delay=1.0, # 初始延迟 1 秒 retry_backoff=2.0, # 指数退避因子 (1s -> 2s -> 4s) ) ``` **可重试的错误:** - `httpx.TimeoutException` - 请求超时 - `httpx.NetworkError` - 网络错误 - `httpx.RemoteProtocolError` - 协议错误 **不可重试的错误:** - 4xx 客户端错误 (立即抛出异常) - 其他非网络相关错误 ### 2. 超时控制 ```python # 全局超时 client = HTTPClient(base_url="...", timeout=30.0) # 单个请求超时 response = await client.get("/users", timeout=10.0) ``` ### 3. 请求日志 自动记录所有请求和响应: ```json { "level": "DEBUG", "message": "API Request: GET https://api.example.com/users/123", "method": "GET", "url": "https://api.example.com/users/123", "headers": {"Authorization": "***REDACTED***"} } { "level": "INFO", "message": "API Response: GET https://api.example.com/users/123 - 200 (0.234s)", "method": "GET", "url": "https://api.example.com/users/123", "status_code": 200, "elapsed_seconds": 0.234 } ``` **敏感数据脱敏:** `Authorization`, `API-Key`, `Cookie` 等头会自动脱敏 ### 4. OpenTelemetry 追踪 自动创建 Span 并记录: - HTTP 方法和 URL - 请求/响应状态码 - 请求耗时 - 异常信息 ```python client = HTTPClient( base_url="...", trace_enabled=True # 启用追踪 (默认跟随 settings.otel_enabled) ) ``` ### 5. 连接池优化 ```python client = HTTPClient( base_url="...", max_connections=100, # 最大连接数 max_keepalive_connections=20, # 保持活动的连接数 ) ``` ### 6. 健康检查 ```python is_healthy = await client.health_check() if not is_healthy: logger.error("API service is unavailable") ``` --- ## 📝 最佳实践 ### ✅ 推荐做法 1. **使用上下文管理器** ```python async with HTTPClient(base_url="...") as client: response = await client.get("/endpoint") # 自动关闭连接 ``` 2. **单例模式 (长生命周期)** ```python # 创建全局客户端实例 _client: Optional[WeChatAPIClient] = None async def get_client() -> WeChatAPIClient: global _client if _client is None: _client = WeChatAPIClient(...) return _client ``` 3. **在应用生命周期中管理** ```python @asynccontextmanager async def lifespan(app: FastAPI): yield await close_all_clients() # 优雅关闭 ``` 4. **使用依赖注入** ```python async def endpoint( client: WeChatAPIClient = Depends(get_wechat_client) ): return await client.get_user(...) ``` ### ❌ 避免做法 1. **不要在每个请求中创建新客户端** ```python # ❌ 错误示例 async def bad_example(): client = HTTPClient(base_url="...") response = await client.get("/endpoint") await client.close() ``` 2. **不要忘记关闭客户端** ```python # ❌ 资源泄漏 client = HTTPClient(base_url="...") response = await client.get("/endpoint") # 忘记调用 await client.close() ``` 3. **不要在日志中暴露敏感信息** ```python # ✅ BaseAPIClient 已自动脱敏敏感请求头 # 无需额外处理 ``` --- ## 🧪 测试示例 ```python import pytest from unittest.mock import AsyncMock, patch from core.clients.http_client import HTTPClient @pytest.mark.asyncio async def test_http_client_get(): """测试 HTTP GET 请求""" async with HTTPClient(base_url="https://api.example.com") as client: with patch.object(client, '_get_client') as mock_get_client: mock_response = AsyncMock() mock_response.status_code = 200 mock_response.json.return_value = {"id": 1, "name": "Test"} mock_client = AsyncMock() mock_client.request.return_value = mock_response mock_get_client.return_value = mock_client response = await client.get("/users/1") assert response.status_code == 200 data = response.json() assert data["name"] == "Test" @pytest.mark.asyncio async def test_custom_client(): """测试自定义客户端""" from core.clients.example_client import ExampleAPIClient async with ExampleAPIClient(api_key="test_key") as client: # Mock the request with patch.object(client, 'get') as mock_get: mock_response = AsyncMock() mock_response.json.return_value = {"user_id": 123} mock_get.return_value = mock_response result = await client.get_user(123) assert result["user_id"] == 123 mock_get.assert_called_once_with("/users/123") ``` --- ## 📚 参考资料 - [httpx 文档](https://www.python-httpx.org/) - [OpenTelemetry Python](https://opentelemetry.io/docs/instrumentation/python/) - [FastAPI 依赖注入](https://fastapi.tiangolo.com/tutorial/dependencies/) --- ## 🆘 常见问题 ### Q: 如何处理 API 认证? A: 在自定义客户端的 `__init__` 中设置默认请求头: ```python headers = {"Authorization": f"Bearer {api_key}"} super().__init__(base_url=base_url, headers=headers) ``` ### Q: 如何禁用自动重试? A: 设置 `max_retries=0`: ```python client = HTTPClient(base_url="...", max_retries=0) ``` ### Q: 如何自定义重试逻辑? A: 继承 `BaseAPIClient` 并重写 `_retry_with_backoff` 方法 ### Q: 如何处理大文件上传/下载? A: 使用 httpx 的流式 API: ```python async with client._get_client() as http_client: async with http_client.stream("GET", "/large-file") as response: async for chunk in response.aiter_bytes(): process(chunk) ```