Files
kami_spider_monorepo/apps/app_a/distributed_lock_usage.py
danial 8824e57879 feat(distributed_lock): 实现基于Redis的分布式锁功能
- 新增DistributedLock类,支持唯一标识防解锁冲突
- 实现自动续期、超时、重试、上下文管理器功能
- 提供手动 acquire、release 和 extend 接口
- 增加异步上下文管理器便利函数distributed_lock
- 实现分布式锁装饰器distributed_lock_decorator支持灵活调用
- 编写示例模块,展示多种锁的使用方式和自动续期示例
- 支持锁状态查询,演示锁冲突与延长锁超时操作
- 保证锁的线程/进程安全与Redis操作原子性
2025-11-01 14:44:17 +08:00

387 lines
11 KiB
Python

"""
在实际应用中使用分布式锁的示例。
展示如何在 FastAPI 路由、服务层中使用分布式锁。
"""
from fastapi import APIRouter, Depends, HTTPException
from redis.asyncio import Redis
from core.redis import get_redis
from core.distributed_lock import (
DistributedLock,
distributed_lock,
distributed_lock_decorator,
)
router = APIRouter(prefix="/lock-demo", tags=["分布式锁示例"])
# ============================================================================
# 示例 1: 在路由处理函数中使用分布式锁
# ============================================================================
@router.post("/process-payment/{order_id}")
async def process_payment(
order_id: str,
redis: Redis = Depends(get_redis),
):
"""
处理订单支付(使用分布式锁防止重复支付)。
使用场景:
- 防止用户重复点击支付按钮导致重复扣款
- 防止多个服务实例同时处理同一订单
"""
lock_name = f"payment:{order_id}"
lock = DistributedLock(
redis,
lock_name,
timeout=30,
retry_times=0, # 不重试,如果已在处理则直接返回
)
# 尝试获取锁,不阻塞
if not await lock.acquire(blocking=False):
raise HTTPException(
status_code=409,
detail="订单正在处理中,请勿重复提交"
)
try:
# 模拟支付处理逻辑
# 1. 检查订单状态
# 2. 调用支付网关
# 3. 更新订单状态
result = {
"order_id": order_id,
"status": "success",
"message": "支付成功"
}
return result
finally:
await lock.release()
# ============================================================================
# 示例 2: 使用上下文管理器简化代码
# ============================================================================
@router.post("/inventory/decrease/{product_id}")
async def decrease_inventory(
product_id: str,
quantity: int,
redis: Redis = Depends(get_redis),
):
"""
扣减库存(使用分布式锁保证库存一致性)。
使用场景:
- 防止超卖
- 保证库存扣减的原子性
"""
try:
async with DistributedLock(
redis,
f"inventory:{product_id}",
timeout=10,
retry_times=5,
retry_delay=0.2,
):
# 模拟库存扣减逻辑
# 1. 查询当前库存
current_inventory = 100 # 模拟数据
# 2. 检查库存是否足够
if current_inventory < quantity:
raise HTTPException(
status_code=400,
detail="库存不足"
)
# 3. 扣减库存
new_inventory = current_inventory - quantity
return {
"product_id": product_id,
"original_inventory": current_inventory,
"decreased_quantity": quantity,
"remaining_inventory": new_inventory,
}
except RuntimeError:
raise HTTPException(
status_code=503,
detail="系统繁忙,请稍后重试"
)
# ============================================================================
# 示例 3: 使用便捷函数
# ============================================================================
@router.post("/cache/refresh/{key}")
async def refresh_cache(key: str):
"""
刷新缓存(使用分布式锁防止缓存击穿)。
使用场景:
- 热点数据缓存过期时,防止大量请求同时回源
- 确保只有一个请求去更新缓存
"""
try:
async with distributed_lock(
f"cache_refresh:{key}",
timeout=30,
retry_times=3,
):
# 模拟缓存刷新逻辑
# 1. 再次检查缓存(可能其他请求已刷新)
# 2. 从数据源加载数据
# 3. 更新缓存
new_data = f"refreshed_data_for_{key}"
return {
"key": key,
"data": new_data,
"message": "缓存刷新成功"
}
except RuntimeError:
raise HTTPException(
status_code=503,
detail="缓存刷新失败,请稍后重试"
)
# ============================================================================
# 示例 4: 在服务层使用装饰器
# ============================================================================
class ReportService:
"""报表服务"""
@distributed_lock_decorator("daily_report_generation", timeout=300, auto_renewal=True)
async def generate_daily_report(self, date: str):
"""
生成日报(使用分布式锁防止重复生成)。
使用场景:
- 定时任务在多个实例上运行时,确保只执行一次
- 长时间运行的报表生成任务
"""
# 模拟报表生成逻辑
# 1. 从数据库查询数据
# 2. 计算统计指标
# 3. 生成报表文件
# 4. 保存到存储
return {
"date": date,
"status": "completed",
"message": "日报生成成功"
}
async def generate_user_report(self, user_id: str, redis: Redis):
"""
生成用户报表(手动控制锁的生命周期)。
使用场景:
- 需要根据业务逻辑动态决定是否持有锁
- 需要在执行过程中延长锁的持有时间
"""
lock = DistributedLock(
redis,
f"user_report:{user_id}",
timeout=60,
retry_times=3,
)
if not await lock.acquire():
return {
"user_id": user_id,
"status": "skipped",
"message": "报表正在生成中"
}
try:
# 第一阶段:数据收集(预计 30 秒)
# await collect_user_data(user_id)
# 检查是否需要更多时间
# 如果需要,延长锁的持有时间
await lock.extend(additional_time=60)
# 第二阶段:数据分析(可能需要更长时间)
# await analyze_user_data(user_id)
return {
"user_id": user_id,
"status": "completed",
"message": "用户报表生成成功"
}
finally:
await lock.release()
# ============================================================================
# 示例 5: 分布式任务调度
# ============================================================================
@router.post("/tasks/schedule/{task_id}")
async def schedule_task(
task_id: str,
redis: Redis = Depends(get_redis),
):
"""
调度任务(使用分布式锁确保任务只被一个实例执行)。
使用场景:
- 分布式定时任务调度
- 消息队列消费者去重
"""
lock = DistributedLock(
redis,
f"task:{task_id}",
timeout=120,
auto_renewal=True,
retry_times=0, # 不重试,如果已被其他实例处理则跳过
)
# 非阻塞获取锁
if not await lock.acquire(blocking=False):
return {
"task_id": task_id,
"status": "skipped",
"message": "任务已被其他实例处理"
}
try:
# 执行任务
# 由于启用了自动续期,即使任务执行时间超过 120 秒也不会丢失锁
result = await _execute_task(task_id)
return {
"task_id": task_id,
"status": "completed",
"result": result,
}
finally:
await lock.release()
async def _execute_task(task_id: str):
"""执行任务的具体逻辑"""
# 模拟任务执行
return {"processed": True}
# ============================================================================
# 示例 6: 限流和防抖
# ============================================================================
@router.post("/api/rate-limit/{user_id}")
async def rate_limited_api(
user_id: str,
redis: Redis = Depends(get_redis),
):
"""
限流接口(使用分布式锁实现简单的限流)。
使用场景:
- 防止用户频繁调用接口
- 简单的防刷机制
注意:这只是演示,生产环境建议使用专门的限流算法(令牌桶、漏桶等)
"""
lock_name = f"rate_limit:{user_id}"
lock = DistributedLock(
redis,
lock_name,
timeout=5, # 5 秒内只能调用一次
retry_times=0,
)
if not await lock.acquire(blocking=False):
raise HTTPException(
status_code=429,
detail="请求过于频繁,请 5 秒后再试"
)
try:
# 执行业务逻辑
return {
"user_id": user_id,
"message": "请求处理成功",
"note": "5 秒内只能调用一次"
}
finally:
# 不立即释放锁,让它自然过期
# 这样可以实现简单的时间窗口限流
pass
# ============================================================================
# 示例 7: 批量操作的协调
# ============================================================================
@router.post("/batch/process")
async def batch_process(
batch_id: str,
redis: Redis = Depends(get_redis),
):
"""
批量处理(使用分布式锁协调多个实例)。
使用场景:
- 大批量数据处理
- 需要分片处理但避免重复
"""
try:
async with distributed_lock(
f"batch:{batch_id}",
timeout=600,
auto_renewal=True,
):
# 1. 获取待处理的数据
# 2. 分片处理
# 3. 更新处理状态
return {
"batch_id": batch_id,
"status": "processing",
"message": "批量处理已启动"
}
except RuntimeError:
# 其他实例正在处理
return {
"batch_id": batch_id,
"status": "already_processing",
"message": "批量处理已在进行中"
}
# ============================================================================
# 创建服务实例
# ============================================================================
report_service = ReportService()
@router.post("/reports/daily/{date}")
async def generate_daily_report_endpoint(date: str):
"""生成日报接口"""
result = await report_service.generate_daily_report(date)
return result
@router.post("/reports/user/{user_id}")
async def generate_user_report_endpoint(
user_id: str,
redis: Redis = Depends(get_redis),
):
"""生成用户报表接口"""
result = await report_service.generate_user_report(user_id, redis)
return result