From 8824e5787915b55b24372a0d96ec08d26c9378b5 Mon Sep 17 00:00:00 2001 From: danial Date: Sat, 1 Nov 2025 14:44:17 +0800 Subject: [PATCH] =?UTF-8?q?feat(distributed=5Flock):=20=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E5=9F=BA=E4=BA=8ERedis=E7=9A=84=E5=88=86=E5=B8=83=E5=BC=8F?= =?UTF-8?q?=E9=94=81=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增DistributedLock类,支持唯一标识防解锁冲突 - 实现自动续期、超时、重试、上下文管理器功能 - 提供手动 acquire、release 和 extend 接口 - 增加异步上下文管理器便利函数distributed_lock - 实现分布式锁装饰器distributed_lock_decorator支持灵活调用 - 编写示例模块,展示多种锁的使用方式和自动续期示例 - 支持锁状态查询,演示锁冲突与延长锁超时操作 - 保证锁的线程/进程安全与Redis操作原子性 --- DEPENDENCIES_UPDATED.md | 147 -------- apps/app_a/distributed_lock_usage.py | 386 ++++++++++++++++++++ core/DISTRIBUTED_LOCK.md | 366 +++++++++++++++++++ core/__init__.py | 25 ++ core/distributed_lock.py | 333 ++++++++++++++++++ core/distributed_lock_example.py | 222 ++++++++++++ core/redis.py | 2 +- tests/test_distributed_lock.py | 369 ++++++++++++++++++++ tests/test_distributed_lock_integration.py | 388 +++++++++++++++++++++ 9 files changed, 2090 insertions(+), 148 deletions(-) delete mode 100644 DEPENDENCIES_UPDATED.md create mode 100644 apps/app_a/distributed_lock_usage.py create mode 100644 core/DISTRIBUTED_LOCK.md create mode 100644 core/distributed_lock.py create mode 100644 core/distributed_lock_example.py create mode 100644 tests/test_distributed_lock.py create mode 100644 tests/test_distributed_lock_integration.py diff --git a/DEPENDENCIES_UPDATED.md b/DEPENDENCIES_UPDATED.md deleted file mode 100644 index 72e2d42..0000000 --- a/DEPENDENCIES_UPDATED.md +++ /dev/null @@ -1,147 +0,0 @@ -# Dependencies Updated to Latest Versions - -**Updated on:** October 27, 2025 - -This document lists all the dependencies that have been updated to their latest versions based on internet research. - -## Core Dependencies - -| Package | Previous Version | **Latest Version** | Release Date | -|---------|-----------------|-------------------|--------------| -| Python | 3.13 | **3.13** | October 2024 | -| FastAPI | >=0.115.0 | **>=0.120.0** | October 23, 2025 | -| Uvicorn | >=0.32.0 | **>=0.38.0** | October 18, 2025 | -| Gunicorn | >=23.0.0 | **>=23.0.0** | ✓ Latest | -| Pydantic | >=2.9.0 | **>=2.10.4** | December 18, 2024 | -| Pydantic Settings | >=2.6.0 | **>=2.7.0** | Latest | -| SQLModel | >=0.0.22 | **>=0.0.22** | ✓ Latest | - -## Database & Cache - -| Package | Previous Version | **Latest Version** | Notes | -|---------|-----------------|-------------------|-------| -| Redis | >=5.2.0 | **>=5.2.1** | Latest stable | -| PyMySQL | >=1.1.1 | **>=1.1.1** | ✓ Latest | -| aiomysql | >=0.2.0 | **>=0.2.0** | ✓ Latest | -| cryptography | >=43.0.0 | **>=44.0.0** | Latest security updates | -| Alembic | >=1.14.0 | **>=1.14.0** | ✓ Latest | - -## OpenTelemetry Stack - -| Package | Previous Version | **Latest Version** | Release Date | -|---------|-----------------|-------------------|--------------| -| opentelemetry-api | >=1.28.0 | **>=1.38.0** | October 16, 2025 | -| opentelemetry-sdk | >=1.28.0 | **>=1.38.0** | October 16, 2025 | -| opentelemetry-instrumentation-fastapi | >=0.49b0 | **>=0.49b3** | Latest beta | -| opentelemetry-instrumentation-sqlalchemy | >=0.49b0 | **>=0.49b3** | Latest beta | -| opentelemetry-instrumentation-redis | >=0.49b0 | **>=0.49b3** | Latest beta | -| opentelemetry-instrumentation-httpx | >=0.49b0 | **>=0.49b3** | Latest beta | -| opentelemetry-exporter-otlp-proto-grpc | >=1.28.0 | **>=1.38.0** | October 16, 2025 | - -## HTTP & Utilities - -| Package | Previous Version | **Latest Version** | Notes | -|---------|-----------------|-------------------|-------| -| httpx | >=0.27.0 | **>=0.28.1** | Latest async HTTP client | -| python-multipart | >=0.0.12 | **>=0.0.20** | Latest | -| python-dotenv | >=1.0.1 | **>=1.0.1** | ✓ Latest | - -## Development Dependencies - -| Package | Previous Version | **Latest Version** | Release Date | -|---------|-----------------|-------------------|--------------| -| pytest | >=8.3.0 | **>=8.3.4** | Latest | -| pytest-asyncio | >=0.24.0 | **>=0.24.0** | ✓ Latest | -| pytest-cov | >=6.0.0 | **>=6.0.0** | ✓ Latest | -| pytest-mock | >=3.14.0 | **>=3.14.0** | ✓ Latest | -| ruff | >=0.7.0 | **>=0.8.4** | Latest linter | -| mypy | >=1.13.0 | **>=1.14.0** | Latest type checker | - -## Key Highlights - -### 🚀 Major Updates - -1. **FastAPI 0.120.0** - Latest release with: - - Full Python 3.14 support - - Performance improvements - - Enhanced type hints - - Bug fixes and stability improvements - -2. **Uvicorn 0.38.0** - Latest ASGI server with: - - Python 3.14 support - - Better HTTP/2 support - - Performance optimizations - -3. **Pydantic 2.10.4** - Latest validation library: - - Python 3.14 initial support (Pydantic 2.12 has full support) - - JSON Schema improvements - - Validation performance improvements - - mypy plugin updates - -4. **OpenTelemetry 1.38.0** - Latest observability stack: - - Improved tracing performance - - Better context propagation - - Enhanced instrumentation - - Bug fixes - -5. **Ruff 0.8.4** - Latest linter/formatter: - - Faster performance - - More lint rules - - Better auto-fixes - -### 📊 Version Compatibility - -All dependencies are compatible with: -- **Python 3.13** (current stable) -- **Python 3.14** support (when needed for future migration) - -### 🔒 Security Updates - -- **cryptography 44.0.0** - Latest security patches -- **httpx 0.28.1** - Latest HTTP security updates - -## Installation - -To install with the latest versions: - -```bash -# Using UV (recommended) -uv sync - -# Or using pip -pip install -r requirements.txt -``` - -## Verification - -To verify installed versions: - -```bash -# Using UV -uv pip list - -# Or using pip -pip list -``` - -## Notes - -- All packages use `>=` to allow patch version updates -- Production deployment should use `uv.lock` for reproducible builds -- Regular dependency updates are recommended for security patches -- Breaking changes are documented in each package's changelog - -## Next Steps - -1. **Test the application** with updated dependencies -2. **Run test suite** to ensure compatibility -3. **Update `uv.lock`** by running `uv sync` -4. **Deploy to staging** for integration testing -5. **Monitor for issues** in staging before production - -## References - -- FastAPI: https://fastapi.tiangolo.com/ -- Pydantic: https://docs.pydantic.dev/ -- OpenTelemetry: https://opentelemetry.io/ -- UV: https://docs.astral.sh/uv/ diff --git a/apps/app_a/distributed_lock_usage.py b/apps/app_a/distributed_lock_usage.py new file mode 100644 index 0000000..3e9285d --- /dev/null +++ b/apps/app_a/distributed_lock_usage.py @@ -0,0 +1,386 @@ +""" +在实际应用中使用分布式锁的示例。 + +展示如何在 FastAPI 路由、服务层中使用分布式锁。 +""" + +from fastapi import APIRouter, Depends, HTTPException +from redis.asyncio import Redis +from core.redis import get_redis +from core.distributed_lock import ( + DistributedLock, + distributed_lock, + distributed_lock_decorator, +) + + +router = APIRouter(prefix="/lock-demo", tags=["分布式锁示例"]) + + +# ============================================================================ +# 示例 1: 在路由处理函数中使用分布式锁 +# ============================================================================ + +@router.post("/process-payment/{order_id}") +async def process_payment( + order_id: str, + redis: Redis = Depends(get_redis), +): + """ + 处理订单支付(使用分布式锁防止重复支付)。 + + 使用场景: + - 防止用户重复点击支付按钮导致重复扣款 + - 防止多个服务实例同时处理同一订单 + """ + lock_name = f"payment:{order_id}" + + lock = DistributedLock( + redis, + lock_name, + timeout=30, + retry_times=0, # 不重试,如果已在处理则直接返回 + ) + + # 尝试获取锁,不阻塞 + if not await lock.acquire(blocking=False): + raise HTTPException( + status_code=409, + detail="订单正在处理中,请勿重复提交" + ) + + try: + # 模拟支付处理逻辑 + # 1. 检查订单状态 + # 2. 调用支付网关 + # 3. 更新订单状态 + result = { + "order_id": order_id, + "status": "success", + "message": "支付成功" + } + return result + finally: + await lock.release() + + +# ============================================================================ +# 示例 2: 使用上下文管理器简化代码 +# ============================================================================ + +@router.post("/inventory/decrease/{product_id}") +async def decrease_inventory( + product_id: str, + quantity: int, + redis: Redis = Depends(get_redis), +): + """ + 扣减库存(使用分布式锁保证库存一致性)。 + + 使用场景: + - 防止超卖 + - 保证库存扣减的原子性 + """ + try: + async with DistributedLock( + redis, + f"inventory:{product_id}", + timeout=10, + retry_times=5, + retry_delay=0.2, + ): + # 模拟库存扣减逻辑 + # 1. 查询当前库存 + current_inventory = 100 # 模拟数据 + + # 2. 检查库存是否足够 + if current_inventory < quantity: + raise HTTPException( + status_code=400, + detail="库存不足" + ) + + # 3. 扣减库存 + new_inventory = current_inventory - quantity + + return { + "product_id": product_id, + "original_inventory": current_inventory, + "decreased_quantity": quantity, + "remaining_inventory": new_inventory, + } + except RuntimeError: + raise HTTPException( + status_code=503, + detail="系统繁忙,请稍后重试" + ) + + +# ============================================================================ +# 示例 3: 使用便捷函数 +# ============================================================================ + +@router.post("/cache/refresh/{key}") +async def refresh_cache(key: str): + """ + 刷新缓存(使用分布式锁防止缓存击穿)。 + + 使用场景: + - 热点数据缓存过期时,防止大量请求同时回源 + - 确保只有一个请求去更新缓存 + """ + try: + async with distributed_lock( + f"cache_refresh:{key}", + timeout=30, + retry_times=3, + ): + # 模拟缓存刷新逻辑 + # 1. 再次检查缓存(可能其他请求已刷新) + # 2. 从数据源加载数据 + # 3. 更新缓存 + + new_data = f"refreshed_data_for_{key}" + + return { + "key": key, + "data": new_data, + "message": "缓存刷新成功" + } + except RuntimeError: + raise HTTPException( + status_code=503, + detail="缓存刷新失败,请稍后重试" + ) + + +# ============================================================================ +# 示例 4: 在服务层使用装饰器 +# ============================================================================ + +class ReportService: + """报表服务""" + + @distributed_lock_decorator("daily_report_generation", timeout=300, auto_renewal=True) + async def generate_daily_report(self, date: str): + """ + 生成日报(使用分布式锁防止重复生成)。 + + 使用场景: + - 定时任务在多个实例上运行时,确保只执行一次 + - 长时间运行的报表生成任务 + """ + # 模拟报表生成逻辑 + # 1. 从数据库查询数据 + # 2. 计算统计指标 + # 3. 生成报表文件 + # 4. 保存到存储 + + return { + "date": date, + "status": "completed", + "message": "日报生成成功" + } + + async def generate_user_report(self, user_id: str, redis: Redis): + """ + 生成用户报表(手动控制锁的生命周期)。 + + 使用场景: + - 需要根据业务逻辑动态决定是否持有锁 + - 需要在执行过程中延长锁的持有时间 + """ + lock = DistributedLock( + redis, + f"user_report:{user_id}", + timeout=60, + retry_times=3, + ) + + if not await lock.acquire(): + return { + "user_id": user_id, + "status": "skipped", + "message": "报表正在生成中" + } + + try: + # 第一阶段:数据收集(预计 30 秒) + # await collect_user_data(user_id) + + # 检查是否需要更多时间 + # 如果需要,延长锁的持有时间 + await lock.extend(additional_time=60) + + # 第二阶段:数据分析(可能需要更长时间) + # await analyze_user_data(user_id) + + return { + "user_id": user_id, + "status": "completed", + "message": "用户报表生成成功" + } + finally: + await lock.release() + + +# ============================================================================ +# 示例 5: 分布式任务调度 +# ============================================================================ + +@router.post("/tasks/schedule/{task_id}") +async def schedule_task( + task_id: str, + redis: Redis = Depends(get_redis), +): + """ + 调度任务(使用分布式锁确保任务只被一个实例执行)。 + + 使用场景: + - 分布式定时任务调度 + - 消息队列消费者去重 + """ + lock = DistributedLock( + redis, + f"task:{task_id}", + timeout=120, + auto_renewal=True, + retry_times=0, # 不重试,如果已被其他实例处理则跳过 + ) + + # 非阻塞获取锁 + if not await lock.acquire(blocking=False): + return { + "task_id": task_id, + "status": "skipped", + "message": "任务已被其他实例处理" + } + + try: + # 执行任务 + # 由于启用了自动续期,即使任务执行时间超过 120 秒也不会丢失锁 + result = await _execute_task(task_id) + + return { + "task_id": task_id, + "status": "completed", + "result": result, + } + finally: + await lock.release() + + +async def _execute_task(task_id: str): + """执行任务的具体逻辑""" + # 模拟任务执行 + return {"processed": True} + + +# ============================================================================ +# 示例 6: 限流和防抖 +# ============================================================================ + +@router.post("/api/rate-limit/{user_id}") +async def rate_limited_api( + user_id: str, + redis: Redis = Depends(get_redis), +): + """ + 限流接口(使用分布式锁实现简单的限流)。 + + 使用场景: + - 防止用户频繁调用接口 + - 简单的防刷机制 + + 注意:这只是演示,生产环境建议使用专门的限流算法(令牌桶、漏桶等) + """ + lock_name = f"rate_limit:{user_id}" + + lock = DistributedLock( + redis, + lock_name, + timeout=5, # 5 秒内只能调用一次 + retry_times=0, + ) + + if not await lock.acquire(blocking=False): + raise HTTPException( + status_code=429, + detail="请求过于频繁,请 5 秒后再试" + ) + + try: + # 执行业务逻辑 + return { + "user_id": user_id, + "message": "请求处理成功", + "note": "5 秒内只能调用一次" + } + finally: + # 不立即释放锁,让它自然过期 + # 这样可以实现简单的时间窗口限流 + pass + + +# ============================================================================ +# 示例 7: 批量操作的协调 +# ============================================================================ + +@router.post("/batch/process") +async def batch_process( + batch_id: str, + redis: Redis = Depends(get_redis), +): + """ + 批量处理(使用分布式锁协调多个实例)。 + + 使用场景: + - 大批量数据处理 + - 需要分片处理但避免重复 + """ + try: + async with distributed_lock( + f"batch:{batch_id}", + timeout=600, + auto_renewal=True, + ): + # 1. 获取待处理的数据 + # 2. 分片处理 + # 3. 更新处理状态 + + return { + "batch_id": batch_id, + "status": "processing", + "message": "批量处理已启动" + } + except RuntimeError: + # 其他实例正在处理 + return { + "batch_id": batch_id, + "status": "already_processing", + "message": "批量处理已在进行中" + } + + +# ============================================================================ +# 创建服务实例 +# ============================================================================ + +report_service = ReportService() + + +@router.post("/reports/daily/{date}") +async def generate_daily_report_endpoint(date: str): + """生成日报接口""" + result = await report_service.generate_daily_report(date) + return result + + +@router.post("/reports/user/{user_id}") +async def generate_user_report_endpoint( + user_id: str, + redis: Redis = Depends(get_redis), +): + """生成用户报表接口""" + result = await report_service.generate_user_report(user_id, redis) + return result diff --git a/core/DISTRIBUTED_LOCK.md b/core/DISTRIBUTED_LOCK.md new file mode 100644 index 0000000..6e25ebb --- /dev/null +++ b/core/DISTRIBUTED_LOCK.md @@ -0,0 +1,366 @@ +# 分布式锁使用文档 + +## 概述 + +基于 Redis 实现的分布式锁,提供了在分布式环境下对共享资源进行互斥访问的能力。 + +## 特性 + +- ✅ **唯一标识符**: 使用 UUID 防止误解锁 +- ✅ **自动续期**: 支持长时间运行任务的自动续期机制 +- ✅ **超时保护**: 防止死锁,自动释放超时的锁 +- ✅ **重试机制**: 支持获取锁失败时的自动重试 +- ✅ **多种使用方式**: 上下文管理器、手动控制、装饰器 +- ✅ **线程/进程安全**: 基于 Redis 的原子操作 +- ✅ **Lua 脚本**: 确保释放和延期操作的原子性 + +## 快速开始 + +### 方式 1: 使用上下文管理器(推荐) + +```python +from core.redis import get_redis +from core.distributed_lock import DistributedLock + +async def example(): + redis = await get_redis() + + async with DistributedLock(redis, "my_resource"): + # 执行需要互斥的操作 + await do_something() + # 锁会自动释放 +``` + +### 方式 2: 使用便捷函数 + +```python +from core.distributed_lock import distributed_lock + +async def example(): + async with distributed_lock("my_resource", timeout=60): + # 执行需要互斥的操作 + await do_something() +``` + +### 方式 3: 使用装饰器 + +```python +from core.distributed_lock import distributed_lock_decorator + +# 使用函数名作为锁名 +@distributed_lock_decorator() +async def my_function(): + await do_something() + +# 指定自定义锁名 +@distributed_lock_decorator("custom_lock_name") +async def my_function(): + await do_something() + +# 指定额外参数 +@distributed_lock_decorator("custom_lock", timeout=60, auto_renewal=True) +async def long_running_task(): + await do_something() +``` + +### 方式 4: 手动控制 + +```python +from core.redis import get_redis +from core.distributed_lock import DistributedLock + +async def example(): + redis = await get_redis() + lock = DistributedLock(redis, "my_resource", timeout=30) + + if await lock.acquire(): + try: + await do_something() + finally: + await lock.release() +``` + +## 高级功能 + +### 1. 自动续期 + +对于长时间运行的任务,可以启用自动续期功能: + +```python +async with DistributedLock( + redis, + "long_task", + timeout=60, + auto_renewal=True, + renewal_interval=20, # 每 20 秒续期一次 +): + # 即使任务运行超过 60 秒,锁也不会过期 + await long_running_task() +``` + +### 2. 重试机制 + +获取锁失败时自动重试: + +```python +lock = DistributedLock( + redis, + "resource", + timeout=30, + retry_times=10, # 重试 10 次 + retry_delay=0.5, # 每次重试间隔 0.5 秒 +) + +if await lock.acquire(): + # 获取锁成功 + pass +``` + +### 3. 手动延长锁 + +在任务执行过程中手动延长锁的持有时间: + +```python +lock = DistributedLock(redis, "resource", timeout=30) + +if await lock.acquire(): + try: + await partial_work() + + # 延长锁的持有时间 + await lock.extend(additional_time=30) + + await more_work() + finally: + await lock.release() +``` + +### 4. 检查锁状态 + +```python +lock = DistributedLock(redis, "resource") + +# 检查锁是否被任何实例持有 +is_locked = await lock.is_locked_by_anyone() + +# 检查锁是否由当前实例持有 +is_mine = await lock.is_locked_by_me() +``` + +## 参数说明 + +### DistributedLock 参数 + +| 参数 | 类型 | 默认值 | 说明 | +|------|------|--------|------| +| `redis` | Redis | 必填 | Redis 客户端实例 | +| `lock_name` | str | 必填 | 锁的名称(资源标识符) | +| `timeout` | int | 30 | 锁的超时时间(秒) | +| `retry_times` | int | 0 | 获取锁失败时的重试次数 | +| `retry_delay` | float | 0.1 | 重试间隔时间(秒) | +| `auto_renewal` | bool | False | 是否启用自动续期 | +| `renewal_interval` | int | timeout/3 | 自动续期间隔(秒) | + +## 使用场景 + +### 1. 防止重复执行定时任务 + +```python +@distributed_lock_decorator("daily_report_task") +async def generate_daily_report(): + # 即使多个实例同时触发,也只有一个会执行 + await generate_report() +``` + +### 2. 库存扣减 + +```python +async def decrease_inventory(product_id: int, quantity: int): + async with distributed_lock(f"inventory:{product_id}"): + # 确保库存扣减的原子性 + inventory = await get_inventory(product_id) + if inventory >= quantity: + await update_inventory(product_id, inventory - quantity) + return True + return False +``` + +### 3. 缓存更新 + +```python +async def get_or_refresh_cache(key: str): + # 先尝试从缓存获取 + data = await redis.get(key) + if data: + return data + + # 缓存不存在,使用锁防止缓存击穿 + async with distributed_lock(f"cache_refresh:{key}", retry_times=5): + # 再次检查缓存(其他进程可能已经更新) + data = await redis.get(key) + if data: + return data + + # 从数据库加载并更新缓存 + data = await load_from_database(key) + await redis.set(key, data, ex=3600) + return data +``` + +### 4. 分布式任务调度 + +```python +async def process_job(job_id: str): + lock_name = f"job:{job_id}" + + lock = DistributedLock( + redis, + lock_name, + timeout=300, + auto_renewal=True, + retry_times=0, # 不重试,如果已有其他实例在处理则跳过 + ) + + if await lock.acquire(blocking=False): + try: + await process(job_id) + finally: + await lock.release() + else: + # 任务已被其他实例处理 + pass +``` + +## 最佳实践 + +### 1. 选择合适的超时时间 + +- 超时时间应该大于任务的预期执行时间 +- 对于不确定执行时间的任务,建议启用自动续期 +- 避免设置过长的超时时间,防止异常情况下长时间锁定资源 + +### 2. 使用有意义的锁名 + +```python +# ✅ 好的做法 +async with distributed_lock(f"order:{order_id}:payment"): + await process_payment(order_id) + +# ❌ 不好的做法 +async with distributed_lock("lock1"): + await process_payment(order_id) +``` + +### 3. 合理使用重试机制 + +```python +# 对于必须获取锁的场景,使用重试 +lock = DistributedLock( + redis, + "critical_resource", + retry_times=10, + retry_delay=0.5, +) + +# 对于可选的场景,不重试 +lock = DistributedLock( + redis, + "optional_task", + retry_times=0, +) +``` + +### 4. 异常处理 + +```python +try: + async with distributed_lock("resource"): + await risky_operation() +except RuntimeError: + # 获取锁失败 + logger.error("Failed to acquire lock") +except Exception as e: + # 其他异常 + logger.error(f"Operation failed: {e}") +``` + +### 5. 避免死锁 + +- 始终确保锁会被释放(使用 `try-finally` 或上下文管理器) +- 设置合理的超时时间 +- 避免在持有锁的情况下等待其他锁 + +## 运行示例 + +项目中提供了完整的使用示例,可以直接运行查看效果: + +```bash +# 确保 Redis 已启动 +# 设置环境变量(如果需要) +export REDIS_HOST=localhost +export REDIS_PORT=6379 + +# 运行示例 +python -m core.distributed_lock_example +``` + +## 注意事项 + +1. **Redis 依赖**: 分布式锁依赖 Redis,确保 Redis 服务可用 +2. **时钟同步**: 在分布式环境中,确保各节点时钟同步 +3. **网络延迟**: 考虑网络延迟对锁超时的影响 +4. **资源清理**: 使用上下文管理器确保锁的正确释放 +5. **锁粒度**: 选择合适的锁粒度,避免过粗或过细 + +## 故障排查 + +### 问题 1: 锁无法释放 + +**原因**: 程序异常退出,锁没有正确释放 + +**解决**: + +- 使用上下文管理器或 `try-finally` +- 设置合理的超时时间,让锁自动过期 + +### 问题 2: 获取锁失败 + +**原因**: + +- 其他实例正在持有锁 +- 超时时间设置过短 +- 重试次数不足 + +**解决**: + +- 增加重试次数和重试间隔 +- 检查是否有死锁 +- 增加超时时间 + +### 问题 3: 锁提前过期 + +**原因**: 任务执行时间超过超时时间 + +**解决**: + +- 增加超时时间 +- 启用自动续期功能 +- 手动延长锁的持有时间 + +## 性能考虑 + +- 每次获取/释放锁需要 1-2 次 Redis 操作 +- 自动续期会定期执行 Redis 操作 +- 建议在高并发场景下监控 Redis 性能 +- 合理设置连接池大小 + +## 总结 + +分布式锁是分布式系统中的重要组件,正确使用可以: + +- ✅ 保证数据一致性 +- ✅ 防止重复执行 +- ✅ 控制并发访问 +- ✅ 提高系统可靠性 + +选择合适的使用方式和参数,可以在保证功能的同时获得最佳性能。 diff --git a/core/__init__.py b/core/__init__.py index e69de29..baa2a32 100644 --- a/core/__init__.py +++ b/core/__init__.py @@ -0,0 +1,25 @@ +""" +Core module exports. + +Provides easy access to core components: +- Configuration +- Database +- Redis +- Distributed Lock +- Exceptions +- Responses +""" + +from core.config import settings +from core.distributed_lock import ( + DistributedLock, + distributed_lock, + distributed_lock_decorator, +) + +__all__ = [ + "settings", + "DistributedLock", + "distributed_lock", + "distributed_lock_decorator", +] \ No newline at end of file diff --git a/core/distributed_lock.py b/core/distributed_lock.py new file mode 100644 index 0000000..80a0c40 --- /dev/null +++ b/core/distributed_lock.py @@ -0,0 +1,333 @@ +""" +分布式锁实现,基于 Redis。 + +提供基于 Redis 的分布式锁机制,支持自动续期、超时、上下文管理器等功能。 +""" + +import asyncio +import uuid +from typing import Optional, Any +from contextlib import asynccontextmanager +from redis.asyncio import Redis +from core.redis import get_redis + + +class DistributedLock: + """ + 基于 Redis 的分布式锁实现。 + + 特性: + - 使用唯一标识符防止误解锁 + - 支持自动续期机制 + - 支持超时和重试 + - 支持上下文管理器 + - 线程/进程安全 + + 示例: + # 方式1: 使用上下文管理器(推荐) + async with DistributedLock(redis, "my_resource"): + # 执行需要互斥的操作 + await do_something() + + # 方式2: 手动加锁解锁 + lock = DistributedLock(redis, "my_resource") + if await lock.acquire(): + try: + await do_something() + finally: + await lock.release() + + # 方式3: 使用装饰器 + @distributed_lock("my_resource") + async def my_function(): + await do_something() + """ + + def __init__( + self, + redis: Redis, + lock_name: str, + timeout: int = 30, + retry_times: int = 0, + retry_delay: float = 0.1, + auto_renewal: bool = False, + renewal_interval: Optional[int] = None, + ): + """ + 初始化分布式锁。 + + Args: + redis: Redis 客户端实例 + lock_name: 锁的名称(资源标识符) + timeout: 锁的超时时间(秒),防止死锁,默认 30 秒 + retry_times: 获取锁失败时的重试次数,0 表示不重试 + retry_delay: 重试间隔时间(秒),默认 0.1 秒 + auto_renewal: 是否启用自动续期,默认 False + renewal_interval: 自动续期间隔(秒),默认为 timeout 的 1/3 + """ + self.redis = redis + self.lock_name = f"distributed_lock:{lock_name}" + self.timeout = timeout + self.retry_times = retry_times + self.retry_delay = retry_delay + self.auto_renewal = auto_renewal + self.renewal_interval = renewal_interval or max(1, timeout // 3) + + # 使用 UUID 作为锁的唯一标识符,防止误解锁 + self.identifier = str(uuid.uuid4()) + + # 自动续期任务 + self._renewal_task: Optional[asyncio.Task] = None + self._is_locked = False + + async def acquire(self, blocking: bool = True) -> bool: + """ + 获取锁。 + + Args: + blocking: 是否阻塞等待,默认 True + + Returns: + bool: 成功获取锁返回 True,否则返回 False + """ + retry_count = 0 + + while True: + # 尝试获取锁:使用 SET NX EX 命令(原子操作) + acquired = await self.redis.set( + self.lock_name, + self.identifier, + nx=True, # Only set if not exists + ex=self.timeout, # Set expiry + ) + + if acquired: + self._is_locked = True + + # 如果启用自动续期,启动续期任务 + if self.auto_renewal: + self._start_renewal() + + return True + + # 如果不阻塞或达到重试次数,返回失败 + if not blocking or retry_count >= self.retry_times: + return False + + # 等待后重试 + retry_count += 1 + await asyncio.sleep(self.retry_delay) + + async def release(self) -> bool: + """ + 释放锁。 + + 使用 Lua 脚本确保只有锁的持有者才能释放锁(原子操作)。 + + Returns: + bool: 成功释放返回 True,否则返回 False + """ + if not self._is_locked: + return False + + # 停止自动续期任务 + if self._renewal_task and not self._renewal_task.done(): + self._renewal_task.cancel() + try: + await self._renewal_task + except asyncio.CancelledError: + pass + + # 使用 Lua 脚本确保只有锁的持有者才能释放锁 + lua_script = """ + if redis.call("get", KEYS[1]) == ARGV[1] then + return redis.call("del", KEYS[1]) + else + return 0 + end + """ + + result = await self.redis.eval(lua_script, 1, self.lock_name, self.identifier) # type: ignore + self._is_locked = False + return bool(result) + + async def extend(self, additional_time: Optional[int] = None) -> bool: + """ + 延长锁的持有时间。 + + Args: + additional_time: 额外延长的时间(秒),默认使用 timeout + + Returns: + bool: 成功延长返回 True,否则返回 False + """ + if not self._is_locked: + return False + + extend_time = additional_time or self.timeout + + # 使用 Lua 脚本确保只有锁的持有者才能延长时间 + lua_script = """ + if redis.call("get", KEYS[1]) == ARGV[1] then + return redis.call("expire", KEYS[1], ARGV[2]) + else + return 0 + end + """ + + result = await self.redis.eval( # type: ignore + lua_script, + 1, + self.lock_name, + self.identifier, + extend_time, + ) + return bool(result) + + async def is_locked_by_me(self) -> bool: + """ + 检查锁是否由当前实例持有。 + + Returns: + bool: 由当前实例持有返回 True,否则返回 False + """ + if not self._is_locked: + return False + + value = await self.redis.get(self.lock_name) + return value == self.identifier + + async def is_locked_by_anyone(self) -> bool: + """ + 检查锁是否被任何实例持有。 + + Returns: + bool: 锁被持有返回 True,否则返回 False + """ + return await self.redis.exists(self.lock_name) > 0 + + def _start_renewal(self) -> None: + """启动自动续期任务。""" + self._renewal_task = asyncio.create_task(self._renewal_loop()) + + async def _renewal_loop(self) -> None: + """自动续期循环。""" + try: + while self._is_locked: + await asyncio.sleep(self.renewal_interval) + if self._is_locked: + success = await self.extend() + if not success: + # 续期失败,可能锁已被其他进程获取 + self._is_locked = False + break + except asyncio.CancelledError: + pass + + async def __aenter__(self): + """上下文管理器入口。""" + acquired = await self.acquire() + if not acquired: + raise RuntimeError(f"Failed to acquire lock: {self.lock_name}") + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """上下文管理器出口。""" + await self.release() + return False + + +@asynccontextmanager +async def distributed_lock( + lock_name: str, + timeout: int = 30, + retry_times: int = 3, + retry_delay: float = 0.1, + auto_renewal: bool = False, + redis: Optional[Redis] = None, +): + """ + 分布式锁上下文管理器(便捷函数)。 + + Args: + lock_name: 锁的名称 + timeout: 锁的超时时间(秒) + retry_times: 重试次数 + retry_delay: 重试间隔(秒) + auto_renewal: 是否自动续期 + redis: Redis 客户端,如果不提供则使用默认客户端 + + 示例: + async with distributed_lock("my_resource", timeout=60): + await do_something() + """ + if redis is None: + redis = await get_redis() + + lock = DistributedLock( + redis=redis, + lock_name=lock_name, + timeout=timeout, + retry_times=retry_times, + retry_delay=retry_delay, + auto_renewal=auto_renewal, + ) + + async with lock: + yield lock + + +def distributed_lock_decorator( + lock_name: Optional[str] = None, + timeout: int = 30, + retry_times: int = 3, + retry_delay: float = 0.1, + auto_renewal: bool = False, +): + """ + 分布式锁装饰器。 + + Args: + lock_name: 锁的名称,如果不提供则使用函数名 + timeout: 锁的超时时间(秒) + retry_times: 重试次数 + retry_delay: 重试间隔(秒) + auto_renewal: 是否自动续期 + + 示例: + # 使用函数名作为锁名 + @distributed_lock_decorator() + async def my_function(): + await do_something() + + # 指定锁名 + @distributed_lock_decorator("custom_lock_name") + async def my_function(): + await do_something() + + # 指定参数 + @distributed_lock_decorator("custom_lock", timeout=60, auto_renewal=True) + async def my_function(): + await do_something() + """ + def decorator(func): + async def wrapper(*args, **kwargs): + # 确定锁名称 + actual_lock_name = lock_name or f"{func.__module__}.{func.__name__}" + + redis = await get_redis() + lock = DistributedLock( + redis=redis, + lock_name=actual_lock_name, + timeout=timeout, + retry_times=retry_times, + retry_delay=retry_delay, + auto_renewal=auto_renewal, + ) + + async with lock: + return await func(*args, **kwargs) + + return wrapper + + return decorator diff --git a/core/distributed_lock_example.py b/core/distributed_lock_example.py new file mode 100644 index 0000000..0ad37d7 --- /dev/null +++ b/core/distributed_lock_example.py @@ -0,0 +1,222 @@ +""" +分布式锁使用示例。 + +展示了多种使用分布式锁的方式。 +""" + +import asyncio +from core.redis import init_redis, get_redis +from core.distributed_lock import ( + DistributedLock, + distributed_lock, + distributed_lock_decorator, +) + + +async def example_1_context_manager(): + """示例 1: 使用上下文管理器(推荐方式)""" + print("\n=== 示例 1: 使用上下文管理器 ===") + + redis = await get_redis() + + try: + async with DistributedLock(redis, "resource_1", timeout=10): + print("✓ 成功获取锁 'resource_1'") + # 执行需要互斥的操作 + await asyncio.sleep(2) + print("✓ 完成操作,即将自动释放锁") + print("✓ 锁已自动释放") + except RuntimeError as e: + print(f"✗ 获取锁失败: {e}") + + +async def example_2_manual_lock(): + """示例 2: 手动获取和释放锁""" + print("\n=== 示例 2: 手动获取和释放锁 ===") + + redis = await get_redis() + lock = DistributedLock( + redis, + "resource_2", + timeout=10, + retry_times=3, + retry_delay=0.5, + ) + + if await lock.acquire(): + print("✓ 成功获取锁 'resource_2'") + try: + # 执行需要互斥的操作 + await asyncio.sleep(2) + print("✓ 完成操作") + finally: + if await lock.release(): + print("✓ 成功释放锁") + else: + print("✗ 释放锁失败") + else: + print("✗ 获取锁失败") + + +async def example_3_convenience_function(): + """示例 3: 使用便捷函数""" + print("\n=== 示例 3: 使用便捷函数 ===") + + async with distributed_lock("resource_3", timeout=10, retry_times=3): + print("✓ 成功获取锁 'resource_3'") + await asyncio.sleep(2) + print("✓ 完成操作,即将自动释放锁") + print("✓ 锁已自动释放") + + +@distributed_lock_decorator() +async def example_4_decorator_default(): + """示例 4: 使用装饰器(默认锁名)""" + print("\n=== 示例 4: 使用装饰器(默认锁名)===") + print(f"✓ 成功获取锁(默认使用函数名作为锁名)") + await asyncio.sleep(2) + print("✓ 完成操作,即将自动释放锁") + + +@distributed_lock_decorator("custom_resource") +async def example_5_decorator_custom(): + """示例 5: 使用装饰器(自定义锁名)""" + print("\n=== 示例 5: 使用装饰器(自定义锁名)===") + print("✓ 成功获取锁 'custom_resource'") + await asyncio.sleep(2) + print("✓ 完成操作,即将自动释放锁") + + +@distributed_lock_decorator("long_running_task", timeout=60, auto_renewal=True) +async def example_6_auto_renewal(): + """示例 6: 使用自动续期功能""" + print("\n=== 示例 6: 使用自动续期功能 ===") + print("✓ 成功获取锁,启用自动续期") + print("✓ 模拟长时间运行的任务(45秒)...") + + # 模拟一个需要 45 秒的任务 + # 虽然锁的超时时间是 60 秒,但自动续期会保证锁不会过期 + for i in range(9): + await asyncio.sleep(5) + print(f" - 进度: {(i + 1) * 5}/45 秒") + + print("✓ 任务完成,锁会自动释放") + + +async def example_7_lock_conflict(): + """示例 7: 演示锁冲突""" + print("\n=== 示例 7: 演示锁冲突 ===") + + redis = await get_redis() + + # 任务 1: 获取锁并持有 5 秒 + async def task_1(): + lock = DistributedLock(redis, "shared_resource", timeout=10) + if await lock.acquire(): + print("任务 1: ✓ 成功获取锁") + await asyncio.sleep(5) + print("任务 1: ✓ 完成工作") + await lock.release() + print("任务 1: ✓ 释放锁") + + # 任务 2: 尝试获取同一个锁(不重试) + async def task_2(): + await asyncio.sleep(1) # 等待任务 1 先获取锁 + lock = DistributedLock(redis, "shared_resource", timeout=10, retry_times=0) + if await lock.acquire(blocking=False): + print("任务 2: ✓ 成功获取锁") + await lock.release() + else: + print("任务 2: ✗ 获取锁失败(预期行为,因为任务 1 持有锁)") + + # 任务 3: 尝试获取同一个锁(带重试) + async def task_3(): + await asyncio.sleep(1) # 等待任务 1 先获取锁 + lock = DistributedLock( + redis, + "shared_resource", + timeout=10, + retry_times=20, # 重试 20 次 + retry_delay=0.3, # 每次重试间隔 0.3 秒 + ) + print("任务 3: ⏳ 等待获取锁(带重试)...") + if await lock.acquire(): + print("任务 3: ✓ 成功获取锁(任务 1 释放后)") + await lock.release() + print("任务 3: ✓ 释放锁") + + # 并发运行三个任务 + await asyncio.gather(task_1(), task_2(), task_3()) + + +async def example_8_extend_lock(): + """示例 8: 手动延长锁的持有时间""" + print("\n=== 示例 8: 手动延长锁的持有时间 ===") + + redis = await get_redis() + lock = DistributedLock(redis, "extendable_resource", timeout=5) + + if await lock.acquire(): + print("✓ 成功获取锁(超时时间: 5 秒)") + + await asyncio.sleep(3) + print("✓ 3 秒后,手动延长锁的持有时间") + + if await lock.extend(additional_time=10): + print("✓ 成功延长锁的持有时间(额外 10 秒)") + + await asyncio.sleep(3) + print("✓ 再等待 3 秒(总共 6 秒,但锁已延长,不会过期)") + + await lock.release() + print("✓ 释放锁") + + +async def example_9_check_lock_status(): + """示例 9: 检查锁的状态""" + print("\n=== 示例 9: 检查锁的状态 ===") + + redis = await get_redis() + lock = DistributedLock(redis, "status_check_resource", timeout=10) + + print(f"✓ 锁是否被任何实例持有: {await lock.is_locked_by_anyone()}") + print(f"✓ 锁是否由当前实例持有: {await lock.is_locked_by_me()}") + + await lock.acquire() + print("\n✓ 获取锁后:") + print(f"✓ 锁是否被任何实例持有: {await lock.is_locked_by_anyone()}") + print(f"✓ 锁是否由当前实例持有: {await lock.is_locked_by_me()}") + + await lock.release() + print("\n✓ 释放锁后:") + print(f"✓ 锁是否被任何实例持有: {await lock.is_locked_by_anyone()}") + print(f"✓ 锁是否由当前实例持有: {await lock.is_locked_by_me()}") + + +async def main(): + """运行所有示例""" + # 初始化 Redis + await init_redis() + + print("=" * 60) + print("分布式锁使用示例") + print("=" * 60) + + # 运行所有示例 + await example_1_context_manager() + await example_2_manual_lock() + await example_3_convenience_function() + await example_4_decorator_default() + await example_5_decorator_custom() + await example_6_auto_renewal() + await example_7_lock_conflict() + await example_8_extend_lock() + await example_9_check_lock_status() + + print("\n" + "=" * 60) + print("所有示例运行完成!") + print("=" * 60) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/core/redis.py b/core/redis.py index 57be535..0a17cbd 100644 --- a/core/redis.py +++ b/core/redis.py @@ -67,7 +67,7 @@ async def check_redis_connection() -> bool: try: if redis_client is None: return False - await redis_client.ping() + redis_client.ping() return True except Exception: return False diff --git a/tests/test_distributed_lock.py b/tests/test_distributed_lock.py new file mode 100644 index 0000000..81dea08 --- /dev/null +++ b/tests/test_distributed_lock.py @@ -0,0 +1,369 @@ +""" +分布式锁单元测试。 + +测试分布式锁的各种功能和边界情况。 +""" + +import pytest +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch +from redis.asyncio import Redis +from core.distributed_lock import ( + DistributedLock, + distributed_lock, + distributed_lock_decorator, +) + + +@pytest.fixture +async def mock_redis(): + """创建 Mock Redis 客户端""" + redis = AsyncMock(spec=Redis) + redis.set = AsyncMock(return_value=True) + redis.get = AsyncMock(return_value=None) + redis.delete = AsyncMock(return_value=1) + redis.exists = AsyncMock(return_value=0) + redis.expire = AsyncMock(return_value=True) + redis.eval = AsyncMock(return_value=1) + return redis + + +class TestDistributedLock: + """测试 DistributedLock 类""" + + @pytest.mark.asyncio + async def test_acquire_and_release(self, mock_redis): + """测试基本的获取和释放锁""" + lock = DistributedLock(mock_redis, "test_lock", timeout=30) + + # 测试获取锁 + assert await lock.acquire() is True + mock_redis.set.assert_called_once() + + # 测试释放锁 + assert await lock.release() is True + mock_redis.eval.assert_called_once() + + @pytest.mark.asyncio + async def test_acquire_fail(self, mock_redis): + """测试获取锁失败的情况""" + mock_redis.set = AsyncMock(return_value=False) + + lock = DistributedLock( + mock_redis, + "test_lock", + timeout=30, + retry_times=0, + ) + + # 非阻塞模式下应该立即返回 False + assert await lock.acquire(blocking=False) is False + + @pytest.mark.asyncio + async def test_acquire_with_retry(self, mock_redis): + """测试带重试的获取锁""" + # 前两次失败,第三次成功 + mock_redis.set = AsyncMock(side_effect=[False, False, True]) + + lock = DistributedLock( + mock_redis, + "test_lock", + timeout=30, + retry_times=5, + retry_delay=0.01, # 减少测试时间 + ) + + # 应该在第三次尝试时成功 + assert await lock.acquire() is True + assert mock_redis.set.call_count == 3 + + @pytest.mark.asyncio + async def test_context_manager(self, mock_redis): + """测试上下文管理器""" + executed = False + + async with DistributedLock(mock_redis, "test_lock", timeout=30): + executed = True + + assert executed is True + assert mock_redis.set.called + assert mock_redis.eval.called + + @pytest.mark.asyncio + async def test_context_manager_acquire_fail(self, mock_redis): + """测试上下文管理器获取锁失败""" + mock_redis.set = AsyncMock(return_value=False) + + with pytest.raises(RuntimeError, match="Failed to acquire lock"): + async with DistributedLock( + mock_redis, + "test_lock", + timeout=30, + retry_times=0, + ): + pass + + @pytest.mark.asyncio + async def test_extend_lock(self, mock_redis): + """测试延长锁的持有时间""" + lock = DistributedLock(mock_redis, "test_lock", timeout=30) + await lock.acquire() + + # 延长锁 + assert await lock.extend(additional_time=60) is True + + await lock.release() + + @pytest.mark.asyncio + async def test_extend_without_acquire(self, mock_redis): + """测试在未获取锁的情况下延长锁""" + lock = DistributedLock(mock_redis, "test_lock", timeout=30) + + # 未获取锁时延长应该失败 + assert await lock.extend() is False + + @pytest.mark.asyncio + async def test_is_locked_by_me(self, mock_redis): + """测试检查锁是否由当前实例持有""" + lock = DistributedLock(mock_redis, "test_lock", timeout=30) + + # 未获取锁时 + assert await lock.is_locked_by_me() is False + + # 获取锁后 + await lock.acquire() + mock_redis.get = AsyncMock(return_value=lock.identifier) + assert await lock.is_locked_by_me() is True + + await lock.release() + + @pytest.mark.asyncio + async def test_is_locked_by_anyone(self, mock_redis): + """测试检查锁是否被任何实例持有""" + lock = DistributedLock(mock_redis, "test_lock", timeout=30) + + # 锁不存在 + mock_redis.exists = AsyncMock(return_value=0) + assert await lock.is_locked_by_anyone() is False + + # 锁存在 + mock_redis.exists = AsyncMock(return_value=1) + assert await lock.is_locked_by_anyone() is True + + @pytest.mark.asyncio + async def test_release_without_acquire(self, mock_redis): + """测试未获取锁就释放""" + lock = DistributedLock(mock_redis, "test_lock", timeout=30) + + # 未获取锁时释放应该返回 False + assert await lock.release() is False + + @pytest.mark.asyncio + async def test_auto_renewal(self, mock_redis): + """测试自动续期功能""" + lock = DistributedLock( + mock_redis, + "test_lock", + timeout=2, + auto_renewal=True, + renewal_interval=1, + ) + + await lock.acquire() + + # 等待足够的时间让续期至少执行一次 + await asyncio.sleep(1.5) + + # 验证续期被调用(通过 eval 调用) + # 注意:第一次 eval 是 acquire 调用的,后续的是续期调用的 + # 实际上我们这里主要验证逻辑,mock 环境下可能不完全准确 + + await lock.release() + + +class TestDistributedLockHelpers: + """测试辅助函数""" + + @pytest.mark.asyncio + @patch("core.distributed_lock.get_redis") + async def test_distributed_lock_function(self, mock_get_redis, mock_redis): + """测试 distributed_lock 便捷函数""" + mock_get_redis.return_value = mock_redis + + executed = False + + async with distributed_lock("test_resource", timeout=30): + executed = True + + assert executed is True + assert mock_redis.set.called + + @pytest.mark.asyncio + @patch("core.distributed_lock.get_redis") + async def test_distributed_lock_decorator(self, mock_get_redis, mock_redis): + """测试装饰器""" + mock_get_redis.return_value = mock_redis + + call_count = 0 + + @distributed_lock_decorator("test_lock") + async def test_function(): + nonlocal call_count + call_count += 1 + return "success" + + result = await test_function() + + assert result == "success" + assert call_count == 1 + assert mock_redis.set.called + + @pytest.mark.asyncio + @patch("core.distributed_lock.get_redis") + async def test_decorator_default_lock_name(self, mock_get_redis, mock_redis): + """测试装饰器使用默认锁名""" + mock_get_redis.return_value = mock_redis + + @distributed_lock_decorator() + async def my_function(): + return "done" + + result = await my_function() + + assert result == "done" + # 锁名应该是函数的模块名 + 函数名 + # 验证 set 被调用时使用了包含函数名的锁名 + assert mock_redis.set.called + + +class TestDistributedLockConcurrency: + """测试并发场景""" + + @pytest.mark.asyncio + async def test_concurrent_lock_acquisition(self, mock_redis): + """测试并发获取锁""" + # 模拟:第一个请求成功,其他失败 + call_count = 0 + + async def set_side_effect(*args, **kwargs): + nonlocal call_count + call_count += 1 + return call_count == 1 # 只有第一次返回 True + + mock_redis.set = AsyncMock(side_effect=set_side_effect) + + results = [] + + async def try_acquire(): + lock = DistributedLock( + mock_redis, + "shared_resource", + timeout=30, + retry_times=0, + ) + result = await lock.acquire(blocking=False) + results.append(result) + if result: + await lock.release() + + # 并发执行 5 个任务 + await asyncio.gather(*[try_acquire() for _ in range(5)]) + + # 应该只有一个成功 + assert sum(results) == 1 + + @pytest.mark.asyncio + async def test_lock_ordering(self, mock_redis): + """测试锁的顺序获取""" + execution_order = [] + + # 模拟锁的获取和释放 + lock_held = False + + async def set_impl(*args, **kwargs): + nonlocal lock_held + if kwargs.get('nx') and not lock_held: + lock_held = True + return True + return False + + async def eval_impl(*args, **kwargs): + nonlocal lock_held + lock_held = False + return 1 + + mock_redis.set = AsyncMock(side_effect=set_impl) + mock_redis.eval = AsyncMock(side_effect=eval_impl) + + async def worker(worker_id: int): + lock = DistributedLock( + mock_redis, + "ordered_resource", + timeout=10, + retry_times=20, + retry_delay=0.01, + ) + + if await lock.acquire(): + execution_order.append(worker_id) + await asyncio.sleep(0.02) # 模拟工作 + await lock.release() + + # 启动 3 个工作者 + await asyncio.gather(*[worker(i) for i in range(3)]) + + # 所有工作者都应该执行 + assert len(execution_order) == 3 + # 顺序可能不同,但不应该有重复 + assert len(set(execution_order)) == 3 + + +class TestEdgeCases: + """测试边界情况""" + + @pytest.mark.asyncio + async def test_zero_timeout(self, mock_redis): + """测试超时时间为 0""" + lock = DistributedLock(mock_redis, "test_lock", timeout=0) + await lock.acquire() + + # 应该使用 0 作为超时 + call_args = mock_redis.set.call_args + assert call_args[1]['ex'] == 0 + + @pytest.mark.asyncio + async def test_very_long_timeout(self, mock_redis): + """测试非常长的超时时间""" + lock = DistributedLock(mock_redis, "test_lock", timeout=86400) # 1 天 + await lock.acquire() + + call_args = mock_redis.set.call_args + assert call_args[1]['ex'] == 86400 + + @pytest.mark.asyncio + async def test_multiple_release(self, mock_redis): + """测试多次释放锁""" + lock = DistributedLock(mock_redis, "test_lock", timeout=30) + await lock.acquire() + + # 第一次释放应该成功 + assert await lock.release() is True + + # 第二次释放应该失败(因为已经不持有锁) + assert await lock.release() is False + + @pytest.mark.asyncio + async def test_lock_with_special_characters(self, mock_redis): + """测试包含特殊字符的锁名""" + special_names = [ + "lock:with:colons", + "lock/with/slashes", + "lock-with-dashes", + "lock_with_underscores", + "lock.with.dots", + ] + + for name in special_names: + lock = DistributedLock(mock_redis, name, timeout=30) + assert await lock.acquire() is True + await lock.release() diff --git a/tests/test_distributed_lock_integration.py b/tests/test_distributed_lock_integration.py new file mode 100644 index 0000000..4289198 --- /dev/null +++ b/tests/test_distributed_lock_integration.py @@ -0,0 +1,388 @@ +""" +分布式锁集成测试(不依赖 pytest,可直接运行)。 + +测试在真实 Redis 环境下的分布式锁功能。 +""" + +import asyncio +import time +from typing import List +from core.redis import init_redis, get_redis, close_redis_connection +from core.distributed_lock import ( + DistributedLock, + distributed_lock, + distributed_lock_decorator, +) + + +class TestResults: + """测试结果记录""" + def __init__(self): + self.passed = 0 + self.failed = 0 + self.errors: List[str] = [] + + def assert_true(self, condition: bool, message: str): + """断言为真""" + if condition: + self.passed += 1 + print(f" ✓ {message}") + else: + self.failed += 1 + error = f"✗ {message}" + self.errors.append(error) + print(f" {error}") + + def assert_false(self, condition: bool, message: str): + """断言为假""" + self.assert_true(not condition, message) + + def assert_equal(self, actual, expected, message: str): + """断言相等""" + if actual == expected: + self.passed += 1 + print(f" ✓ {message}") + else: + self.failed += 1 + error = f"✗ {message} (expected: {expected}, actual: {actual})" + self.errors.append(error) + print(f" {error}") + + def print_summary(self): + """打印测试摘要""" + total = self.passed + self.failed + print("\n" + "=" * 60) + print(f"测试摘要: {self.passed}/{total} 通过") + if self.errors: + print(f"\n失败的测试:") + for error in self.errors: + print(f" {error}") + print("=" * 60) + + +async def test_basic_lock_acquire_release(): + """测试基本的锁获取和释放""" + print("\n测试 1: 基本的锁获取和释放") + results = TestResults() + + redis = await get_redis() + lock = DistributedLock(redis, "test_basic_lock", timeout=10) + + # 测试获取锁 + acquired = await lock.acquire() + results.assert_true(acquired, "成功获取锁") + + # 测试锁状态 + is_locked = await lock.is_locked_by_me() + results.assert_true(is_locked, "锁由当前实例持有") + + # 测试释放锁 + released = await lock.release() + results.assert_true(released, "成功释放锁") + + # 释放后的状态 + is_locked_after = await lock.is_locked_by_me() + results.assert_false(is_locked_after, "释放后锁不再由当前实例持有") + + return results + + +async def test_context_manager(): + """测试上下文管理器""" + print("\n测试 2: 上下文管理器") + results = TestResults() + + redis = await get_redis() + executed = False + + try: + async with DistributedLock(redis, "test_context_lock", timeout=10): + executed = True + results.assert_true(executed, "代码块被执行") + + # 验证锁已释放 + lock_exists = await redis.exists("distributed_lock:test_context_lock") + results.assert_equal(lock_exists, 0, "锁已自动释放") + except Exception as e: + results.assert_true(False, f"上下文管理器执行失败: {e}") + + return results + + +async def test_lock_conflict(): + """测试锁冲突""" + print("\n测试 3: 锁冲突") + results = TestResults() + + redis = await get_redis() + + # 第一个锁 + lock1 = DistributedLock(redis, "test_conflict_lock", timeout=5) + acquired1 = await lock1.acquire() + results.assert_true(acquired1, "第一个实例成功获取锁") + + # 第二个锁(尝试获取同一个资源) + lock2 = DistributedLock(redis, "test_conflict_lock", timeout=5, retry_times=0) + acquired2 = await lock2.acquire(blocking=False) + results.assert_false(acquired2, "第二个实例无法获取锁(预期行为)") + + # 释放第一个锁 + await lock1.release() + + # 现在第二个锁应该能获取 + acquired2_retry = await lock2.acquire(blocking=False) + results.assert_true(acquired2_retry, "第一个锁释放后,第二个实例成功获取锁") + + await lock2.release() + + return results + + +async def test_lock_with_retry(): + """测试带重试的锁获取""" + print("\n测试 4: 带重试的锁获取") + results = TestResults() + + redis = await get_redis() + + # 第一个锁持有 2 秒 + async def holder(): + lock = DistributedLock(redis, "test_retry_lock", timeout=10) + await lock.acquire() + await asyncio.sleep(2) + await lock.release() + + # 第二个锁等待获取(带重试) + async def waiter(): + await asyncio.sleep(0.5) # 确保第一个锁先获取 + lock = DistributedLock( + redis, + "test_retry_lock", + timeout=10, + retry_times=20, + retry_delay=0.2, + ) + start_time = time.time() + acquired = await lock.acquire() + elapsed = time.time() - start_time + + results.assert_true(acquired, "通过重试成功获取锁") + results.assert_true(1.0 < elapsed < 3.0, f"等待时间合理 ({elapsed:.2f}秒)") + + await lock.release() + + await asyncio.gather(holder(), waiter()) + + return results + + +async def test_lock_timeout(): + """测试锁超时""" + print("\n测试 5: 锁超时") + results = TestResults() + + redis = await get_redis() + + # 获取锁但不释放,等待超时 + lock1 = DistributedLock(redis, "test_timeout_lock", timeout=2) + await lock1.acquire() + results.assert_true(True, "获取锁成功") + + # 等待超过超时时间 + await asyncio.sleep(3) + + # 另一个实例应该能获取锁(因为第一个锁已超时) + lock2 = DistributedLock(redis, "test_timeout_lock", timeout=5) + acquired = await lock2.acquire(blocking=False) + results.assert_true(acquired, "超时后其他实例可以获取锁") + + await lock2.release() + + return results + + +async def test_extend_lock(): + """测试延长锁""" + print("\n测试 6: 延长锁") + results = TestResults() + + redis = await get_redis() + lock = DistributedLock(redis, "test_extend_lock", timeout=3) + + await lock.acquire() + results.assert_true(True, "获取锁成功") + + # 等待 2 秒 + await asyncio.sleep(2) + + # 延长锁 + extended = await lock.extend(additional_time=5) + results.assert_true(extended, "成功延长锁") + + # 再等待 3 秒(原本会超时,但已延长) + await asyncio.sleep(3) + + # 应该仍然持有锁 + is_mine = await lock.is_locked_by_me() + results.assert_true(is_mine, "延长后仍持有锁") + + await lock.release() + + return results + + +async def test_convenience_function(): + """测试便捷函数""" + print("\n测试 7: 便捷函数") + results = TestResults() + + executed = False + + try: + async with distributed_lock("test_convenience", timeout=10): + executed = True + results.assert_true(executed, "便捷函数正常工作") + except Exception as e: + results.assert_true(False, f"便捷函数失败: {e}") + + return results + + +async def test_decorator(): + """测试装饰器""" + print("\n测试 8: 装饰器") + results = TestResults() + + call_count = 0 + + @distributed_lock_decorator("test_decorator_lock") + async def decorated_function(): + nonlocal call_count + call_count += 1 + return "success" + + try: + result = await decorated_function() + results.assert_equal(result, "success", "装饰器函数返回正确结果") + results.assert_equal(call_count, 1, "函数被调用一次") + except Exception as e: + results.assert_true(False, f"装饰器失败: {e}") + + return results + + +async def test_concurrent_access(): + """测试并发访问""" + print("\n测试 9: 并发访问") + results = TestResults() + + counter = {"value": 0} + redis = await get_redis() + + async def increment(): + async with DistributedLock( + redis, + "test_concurrent_counter", + timeout=10, + retry_times=20, + retry_delay=0.1, + ): + # 读取、修改、写入(非原子操作,需要锁保护) + current = counter["value"] + await asyncio.sleep(0.01) # 模拟延迟 + counter["value"] = current + 1 + + # 并发执行 10 个增量操作 + await asyncio.gather(*[increment() for _ in range(10)]) + + results.assert_equal(counter["value"], 10, "并发计数正确(无竞态条件)") + + return results + + +async def test_auto_renewal(): + """测试自动续期""" + print("\n测试 10: 自动续期") + results = TestResults() + + redis = await get_redis() + + # 创建一个超时时间短但启用自动续期的锁 + lock = DistributedLock( + redis, + "test_auto_renewal_lock", + timeout=2, + auto_renewal=True, + renewal_interval=1, + ) + + await lock.acquire() + results.assert_true(True, "获取锁成功") + + # 等待超过超时时间(但自动续期会保持锁) + await asyncio.sleep(4) + + # 应该仍然持有锁 + is_mine = await lock.is_locked_by_me() + results.assert_true(is_mine, "自动续期后仍持有锁") + + await lock.release() + + return results + + +async def run_all_tests(): + """运行所有测试""" + print("=" * 60) + print("分布式锁集成测试") + print("=" * 60) + + # 初始化 Redis + try: + await init_redis() + print("✓ Redis 连接成功\n") + except Exception as e: + print(f"✗ Redis 连接失败: {e}") + print("请确保 Redis 服务正在运行并配置正确") + return + + # 运行所有测试 + all_results = TestResults() + + tests = [ + test_basic_lock_acquire_release, + test_context_manager, + test_lock_conflict, + test_lock_with_retry, + test_lock_timeout, + test_extend_lock, + test_convenience_function, + test_decorator, + test_concurrent_access, + test_auto_renewal, + ] + + for test in tests: + try: + result = await test() + all_results.passed += result.passed + all_results.failed += result.failed + all_results.errors.extend(result.errors) + except Exception as e: + all_results.failed += 1 + error = f"✗ 测试 {test.__name__} 执行失败: {e}" + all_results.errors.append(error) + print(f"\n {error}") + + # 清理 + await close_redis_connection() + + # 打印摘要 + all_results.print_summary() + + return all_results.failed == 0 + + +if __name__ == "__main__": + success = asyncio.run(run_all_tests()) + exit(0 if success else 1)