- 新增DistributedLock类,支持唯一标识防解锁冲突 - 实现自动续期、超时、重试、上下文管理器功能 - 提供手动 acquire、release 和 extend 接口 - 增加异步上下文管理器便利函数distributed_lock - 实现分布式锁装饰器distributed_lock_decorator支持灵活调用 - 编写示例模块,展示多种锁的使用方式和自动续期示例 - 支持锁状态查询,演示锁冲突与延长锁超时操作 - 保证锁的线程/进程安全与Redis操作原子性
387 lines
11 KiB
Python
387 lines
11 KiB
Python
"""
|
|
在实际应用中使用分布式锁的示例。
|
|
|
|
展示如何在 FastAPI 路由、服务层中使用分布式锁。
|
|
"""
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from redis.asyncio import Redis
|
|
from core.redis import get_redis
|
|
from core.distributed_lock import (
|
|
DistributedLock,
|
|
distributed_lock,
|
|
distributed_lock_decorator,
|
|
)
|
|
|
|
|
|
router = APIRouter(prefix="/lock-demo", tags=["分布式锁示例"])
|
|
|
|
|
|
# ============================================================================
|
|
# 示例 1: 在路由处理函数中使用分布式锁
|
|
# ============================================================================
|
|
|
|
@router.post("/process-payment/{order_id}")
|
|
async def process_payment(
|
|
order_id: str,
|
|
redis: Redis = Depends(get_redis),
|
|
):
|
|
"""
|
|
处理订单支付(使用分布式锁防止重复支付)。
|
|
|
|
使用场景:
|
|
- 防止用户重复点击支付按钮导致重复扣款
|
|
- 防止多个服务实例同时处理同一订单
|
|
"""
|
|
lock_name = f"payment:{order_id}"
|
|
|
|
lock = DistributedLock(
|
|
redis,
|
|
lock_name,
|
|
timeout=30,
|
|
retry_times=0, # 不重试,如果已在处理则直接返回
|
|
)
|
|
|
|
# 尝试获取锁,不阻塞
|
|
if not await lock.acquire(blocking=False):
|
|
raise HTTPException(
|
|
status_code=409,
|
|
detail="订单正在处理中,请勿重复提交"
|
|
)
|
|
|
|
try:
|
|
# 模拟支付处理逻辑
|
|
# 1. 检查订单状态
|
|
# 2. 调用支付网关
|
|
# 3. 更新订单状态
|
|
result = {
|
|
"order_id": order_id,
|
|
"status": "success",
|
|
"message": "支付成功"
|
|
}
|
|
return result
|
|
finally:
|
|
await lock.release()
|
|
|
|
|
|
# ============================================================================
|
|
# 示例 2: 使用上下文管理器简化代码
|
|
# ============================================================================
|
|
|
|
@router.post("/inventory/decrease/{product_id}")
|
|
async def decrease_inventory(
|
|
product_id: str,
|
|
quantity: int,
|
|
redis: Redis = Depends(get_redis),
|
|
):
|
|
"""
|
|
扣减库存(使用分布式锁保证库存一致性)。
|
|
|
|
使用场景:
|
|
- 防止超卖
|
|
- 保证库存扣减的原子性
|
|
"""
|
|
try:
|
|
async with DistributedLock(
|
|
redis,
|
|
f"inventory:{product_id}",
|
|
timeout=10,
|
|
retry_times=5,
|
|
retry_delay=0.2,
|
|
):
|
|
# 模拟库存扣减逻辑
|
|
# 1. 查询当前库存
|
|
current_inventory = 100 # 模拟数据
|
|
|
|
# 2. 检查库存是否足够
|
|
if current_inventory < quantity:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="库存不足"
|
|
)
|
|
|
|
# 3. 扣减库存
|
|
new_inventory = current_inventory - quantity
|
|
|
|
return {
|
|
"product_id": product_id,
|
|
"original_inventory": current_inventory,
|
|
"decreased_quantity": quantity,
|
|
"remaining_inventory": new_inventory,
|
|
}
|
|
except RuntimeError:
|
|
raise HTTPException(
|
|
status_code=503,
|
|
detail="系统繁忙,请稍后重试"
|
|
)
|
|
|
|
|
|
# ============================================================================
|
|
# 示例 3: 使用便捷函数
|
|
# ============================================================================
|
|
|
|
@router.post("/cache/refresh/{key}")
|
|
async def refresh_cache(key: str):
|
|
"""
|
|
刷新缓存(使用分布式锁防止缓存击穿)。
|
|
|
|
使用场景:
|
|
- 热点数据缓存过期时,防止大量请求同时回源
|
|
- 确保只有一个请求去更新缓存
|
|
"""
|
|
try:
|
|
async with distributed_lock(
|
|
f"cache_refresh:{key}",
|
|
timeout=30,
|
|
retry_times=3,
|
|
):
|
|
# 模拟缓存刷新逻辑
|
|
# 1. 再次检查缓存(可能其他请求已刷新)
|
|
# 2. 从数据源加载数据
|
|
# 3. 更新缓存
|
|
|
|
new_data = f"refreshed_data_for_{key}"
|
|
|
|
return {
|
|
"key": key,
|
|
"data": new_data,
|
|
"message": "缓存刷新成功"
|
|
}
|
|
except RuntimeError:
|
|
raise HTTPException(
|
|
status_code=503,
|
|
detail="缓存刷新失败,请稍后重试"
|
|
)
|
|
|
|
|
|
# ============================================================================
|
|
# 示例 4: 在服务层使用装饰器
|
|
# ============================================================================
|
|
|
|
class ReportService:
|
|
"""报表服务"""
|
|
|
|
@distributed_lock_decorator("daily_report_generation", timeout=300, auto_renewal=True)
|
|
async def generate_daily_report(self, date: str):
|
|
"""
|
|
生成日报(使用分布式锁防止重复生成)。
|
|
|
|
使用场景:
|
|
- 定时任务在多个实例上运行时,确保只执行一次
|
|
- 长时间运行的报表生成任务
|
|
"""
|
|
# 模拟报表生成逻辑
|
|
# 1. 从数据库查询数据
|
|
# 2. 计算统计指标
|
|
# 3. 生成报表文件
|
|
# 4. 保存到存储
|
|
|
|
return {
|
|
"date": date,
|
|
"status": "completed",
|
|
"message": "日报生成成功"
|
|
}
|
|
|
|
async def generate_user_report(self, user_id: str, redis: Redis):
|
|
"""
|
|
生成用户报表(手动控制锁的生命周期)。
|
|
|
|
使用场景:
|
|
- 需要根据业务逻辑动态决定是否持有锁
|
|
- 需要在执行过程中延长锁的持有时间
|
|
"""
|
|
lock = DistributedLock(
|
|
redis,
|
|
f"user_report:{user_id}",
|
|
timeout=60,
|
|
retry_times=3,
|
|
)
|
|
|
|
if not await lock.acquire():
|
|
return {
|
|
"user_id": user_id,
|
|
"status": "skipped",
|
|
"message": "报表正在生成中"
|
|
}
|
|
|
|
try:
|
|
# 第一阶段:数据收集(预计 30 秒)
|
|
# await collect_user_data(user_id)
|
|
|
|
# 检查是否需要更多时间
|
|
# 如果需要,延长锁的持有时间
|
|
await lock.extend(additional_time=60)
|
|
|
|
# 第二阶段:数据分析(可能需要更长时间)
|
|
# await analyze_user_data(user_id)
|
|
|
|
return {
|
|
"user_id": user_id,
|
|
"status": "completed",
|
|
"message": "用户报表生成成功"
|
|
}
|
|
finally:
|
|
await lock.release()
|
|
|
|
|
|
# ============================================================================
|
|
# 示例 5: 分布式任务调度
|
|
# ============================================================================
|
|
|
|
@router.post("/tasks/schedule/{task_id}")
|
|
async def schedule_task(
|
|
task_id: str,
|
|
redis: Redis = Depends(get_redis),
|
|
):
|
|
"""
|
|
调度任务(使用分布式锁确保任务只被一个实例执行)。
|
|
|
|
使用场景:
|
|
- 分布式定时任务调度
|
|
- 消息队列消费者去重
|
|
"""
|
|
lock = DistributedLock(
|
|
redis,
|
|
f"task:{task_id}",
|
|
timeout=120,
|
|
auto_renewal=True,
|
|
retry_times=0, # 不重试,如果已被其他实例处理则跳过
|
|
)
|
|
|
|
# 非阻塞获取锁
|
|
if not await lock.acquire(blocking=False):
|
|
return {
|
|
"task_id": task_id,
|
|
"status": "skipped",
|
|
"message": "任务已被其他实例处理"
|
|
}
|
|
|
|
try:
|
|
# 执行任务
|
|
# 由于启用了自动续期,即使任务执行时间超过 120 秒也不会丢失锁
|
|
result = await _execute_task(task_id)
|
|
|
|
return {
|
|
"task_id": task_id,
|
|
"status": "completed",
|
|
"result": result,
|
|
}
|
|
finally:
|
|
await lock.release()
|
|
|
|
|
|
async def _execute_task(task_id: str):
|
|
"""执行任务的具体逻辑"""
|
|
# 模拟任务执行
|
|
return {"processed": True}
|
|
|
|
|
|
# ============================================================================
|
|
# 示例 6: 限流和防抖
|
|
# ============================================================================
|
|
|
|
@router.post("/api/rate-limit/{user_id}")
|
|
async def rate_limited_api(
|
|
user_id: str,
|
|
redis: Redis = Depends(get_redis),
|
|
):
|
|
"""
|
|
限流接口(使用分布式锁实现简单的限流)。
|
|
|
|
使用场景:
|
|
- 防止用户频繁调用接口
|
|
- 简单的防刷机制
|
|
|
|
注意:这只是演示,生产环境建议使用专门的限流算法(令牌桶、漏桶等)
|
|
"""
|
|
lock_name = f"rate_limit:{user_id}"
|
|
|
|
lock = DistributedLock(
|
|
redis,
|
|
lock_name,
|
|
timeout=5, # 5 秒内只能调用一次
|
|
retry_times=0,
|
|
)
|
|
|
|
if not await lock.acquire(blocking=False):
|
|
raise HTTPException(
|
|
status_code=429,
|
|
detail="请求过于频繁,请 5 秒后再试"
|
|
)
|
|
|
|
try:
|
|
# 执行业务逻辑
|
|
return {
|
|
"user_id": user_id,
|
|
"message": "请求处理成功",
|
|
"note": "5 秒内只能调用一次"
|
|
}
|
|
finally:
|
|
# 不立即释放锁,让它自然过期
|
|
# 这样可以实现简单的时间窗口限流
|
|
pass
|
|
|
|
|
|
# ============================================================================
|
|
# 示例 7: 批量操作的协调
|
|
# ============================================================================
|
|
|
|
@router.post("/batch/process")
|
|
async def batch_process(
|
|
batch_id: str,
|
|
redis: Redis = Depends(get_redis),
|
|
):
|
|
"""
|
|
批量处理(使用分布式锁协调多个实例)。
|
|
|
|
使用场景:
|
|
- 大批量数据处理
|
|
- 需要分片处理但避免重复
|
|
"""
|
|
try:
|
|
async with distributed_lock(
|
|
f"batch:{batch_id}",
|
|
timeout=600,
|
|
auto_renewal=True,
|
|
):
|
|
# 1. 获取待处理的数据
|
|
# 2. 分片处理
|
|
# 3. 更新处理状态
|
|
|
|
return {
|
|
"batch_id": batch_id,
|
|
"status": "processing",
|
|
"message": "批量处理已启动"
|
|
}
|
|
except RuntimeError:
|
|
# 其他实例正在处理
|
|
return {
|
|
"batch_id": batch_id,
|
|
"status": "already_processing",
|
|
"message": "批量处理已在进行中"
|
|
}
|
|
|
|
|
|
# ============================================================================
|
|
# 创建服务实例
|
|
# ============================================================================
|
|
|
|
report_service = ReportService()
|
|
|
|
|
|
@router.post("/reports/daily/{date}")
|
|
async def generate_daily_report_endpoint(date: str):
|
|
"""生成日报接口"""
|
|
result = await report_service.generate_daily_report(date)
|
|
return result
|
|
|
|
|
|
@router.post("/reports/user/{user_id}")
|
|
async def generate_user_report_endpoint(
|
|
user_id: str,
|
|
redis: Redis = Depends(get_redis),
|
|
):
|
|
"""生成用户报表接口"""
|
|
result = await report_service.generate_user_report(user_id, redis)
|
|
return result
|