Files
kami_spider_monorepo/tests/test_distributed_lock_integration.py
danial 8824e57879 feat(distributed_lock): 实现基于Redis的分布式锁功能
- 新增DistributedLock类,支持唯一标识防解锁冲突
- 实现自动续期、超时、重试、上下文管理器功能
- 提供手动 acquire、release 和 extend 接口
- 增加异步上下文管理器便利函数distributed_lock
- 实现分布式锁装饰器distributed_lock_decorator支持灵活调用
- 编写示例模块,展示多种锁的使用方式和自动续期示例
- 支持锁状态查询,演示锁冲突与延长锁超时操作
- 保证锁的线程/进程安全与Redis操作原子性
2025-11-01 14:44:17 +08:00

389 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
分布式锁集成测试(不依赖 pytest可直接运行
测试在真实 Redis 环境下的分布式锁功能。
"""
import asyncio
import time
from typing import List
from core.redis import init_redis, get_redis, close_redis_connection
from core.distributed_lock import (
DistributedLock,
distributed_lock,
distributed_lock_decorator,
)
class TestResults:
"""测试结果记录"""
def __init__(self):
self.passed = 0
self.failed = 0
self.errors: List[str] = []
def assert_true(self, condition: bool, message: str):
"""断言为真"""
if condition:
self.passed += 1
print(f"{message}")
else:
self.failed += 1
error = f"{message}"
self.errors.append(error)
print(f" {error}")
def assert_false(self, condition: bool, message: str):
"""断言为假"""
self.assert_true(not condition, message)
def assert_equal(self, actual, expected, message: str):
"""断言相等"""
if actual == expected:
self.passed += 1
print(f"{message}")
else:
self.failed += 1
error = f"{message} (expected: {expected}, actual: {actual})"
self.errors.append(error)
print(f" {error}")
def print_summary(self):
"""打印测试摘要"""
total = self.passed + self.failed
print("\n" + "=" * 60)
print(f"测试摘要: {self.passed}/{total} 通过")
if self.errors:
print(f"\n失败的测试:")
for error in self.errors:
print(f" {error}")
print("=" * 60)
async def test_basic_lock_acquire_release():
"""测试基本的锁获取和释放"""
print("\n测试 1: 基本的锁获取和释放")
results = TestResults()
redis = await get_redis()
lock = DistributedLock(redis, "test_basic_lock", timeout=10)
# 测试获取锁
acquired = await lock.acquire()
results.assert_true(acquired, "成功获取锁")
# 测试锁状态
is_locked = await lock.is_locked_by_me()
results.assert_true(is_locked, "锁由当前实例持有")
# 测试释放锁
released = await lock.release()
results.assert_true(released, "成功释放锁")
# 释放后的状态
is_locked_after = await lock.is_locked_by_me()
results.assert_false(is_locked_after, "释放后锁不再由当前实例持有")
return results
async def test_context_manager():
"""测试上下文管理器"""
print("\n测试 2: 上下文管理器")
results = TestResults()
redis = await get_redis()
executed = False
try:
async with DistributedLock(redis, "test_context_lock", timeout=10):
executed = True
results.assert_true(executed, "代码块被执行")
# 验证锁已释放
lock_exists = await redis.exists("distributed_lock:test_context_lock")
results.assert_equal(lock_exists, 0, "锁已自动释放")
except Exception as e:
results.assert_true(False, f"上下文管理器执行失败: {e}")
return results
async def test_lock_conflict():
"""测试锁冲突"""
print("\n测试 3: 锁冲突")
results = TestResults()
redis = await get_redis()
# 第一个锁
lock1 = DistributedLock(redis, "test_conflict_lock", timeout=5)
acquired1 = await lock1.acquire()
results.assert_true(acquired1, "第一个实例成功获取锁")
# 第二个锁(尝试获取同一个资源)
lock2 = DistributedLock(redis, "test_conflict_lock", timeout=5, retry_times=0)
acquired2 = await lock2.acquire(blocking=False)
results.assert_false(acquired2, "第二个实例无法获取锁(预期行为)")
# 释放第一个锁
await lock1.release()
# 现在第二个锁应该能获取
acquired2_retry = await lock2.acquire(blocking=False)
results.assert_true(acquired2_retry, "第一个锁释放后,第二个实例成功获取锁")
await lock2.release()
return results
async def test_lock_with_retry():
"""测试带重试的锁获取"""
print("\n测试 4: 带重试的锁获取")
results = TestResults()
redis = await get_redis()
# 第一个锁持有 2 秒
async def holder():
lock = DistributedLock(redis, "test_retry_lock", timeout=10)
await lock.acquire()
await asyncio.sleep(2)
await lock.release()
# 第二个锁等待获取(带重试)
async def waiter():
await asyncio.sleep(0.5) # 确保第一个锁先获取
lock = DistributedLock(
redis,
"test_retry_lock",
timeout=10,
retry_times=20,
retry_delay=0.2,
)
start_time = time.time()
acquired = await lock.acquire()
elapsed = time.time() - start_time
results.assert_true(acquired, "通过重试成功获取锁")
results.assert_true(1.0 < elapsed < 3.0, f"等待时间合理 ({elapsed:.2f}秒)")
await lock.release()
await asyncio.gather(holder(), waiter())
return results
async def test_lock_timeout():
"""测试锁超时"""
print("\n测试 5: 锁超时")
results = TestResults()
redis = await get_redis()
# 获取锁但不释放,等待超时
lock1 = DistributedLock(redis, "test_timeout_lock", timeout=2)
await lock1.acquire()
results.assert_true(True, "获取锁成功")
# 等待超过超时时间
await asyncio.sleep(3)
# 另一个实例应该能获取锁(因为第一个锁已超时)
lock2 = DistributedLock(redis, "test_timeout_lock", timeout=5)
acquired = await lock2.acquire(blocking=False)
results.assert_true(acquired, "超时后其他实例可以获取锁")
await lock2.release()
return results
async def test_extend_lock():
"""测试延长锁"""
print("\n测试 6: 延长锁")
results = TestResults()
redis = await get_redis()
lock = DistributedLock(redis, "test_extend_lock", timeout=3)
await lock.acquire()
results.assert_true(True, "获取锁成功")
# 等待 2 秒
await asyncio.sleep(2)
# 延长锁
extended = await lock.extend(additional_time=5)
results.assert_true(extended, "成功延长锁")
# 再等待 3 秒(原本会超时,但已延长)
await asyncio.sleep(3)
# 应该仍然持有锁
is_mine = await lock.is_locked_by_me()
results.assert_true(is_mine, "延长后仍持有锁")
await lock.release()
return results
async def test_convenience_function():
"""测试便捷函数"""
print("\n测试 7: 便捷函数")
results = TestResults()
executed = False
try:
async with distributed_lock("test_convenience", timeout=10):
executed = True
results.assert_true(executed, "便捷函数正常工作")
except Exception as e:
results.assert_true(False, f"便捷函数失败: {e}")
return results
async def test_decorator():
"""测试装饰器"""
print("\n测试 8: 装饰器")
results = TestResults()
call_count = 0
@distributed_lock_decorator("test_decorator_lock")
async def decorated_function():
nonlocal call_count
call_count += 1
return "success"
try:
result = await decorated_function()
results.assert_equal(result, "success", "装饰器函数返回正确结果")
results.assert_equal(call_count, 1, "函数被调用一次")
except Exception as e:
results.assert_true(False, f"装饰器失败: {e}")
return results
async def test_concurrent_access():
"""测试并发访问"""
print("\n测试 9: 并发访问")
results = TestResults()
counter = {"value": 0}
redis = await get_redis()
async def increment():
async with DistributedLock(
redis,
"test_concurrent_counter",
timeout=10,
retry_times=20,
retry_delay=0.1,
):
# 读取、修改、写入(非原子操作,需要锁保护)
current = counter["value"]
await asyncio.sleep(0.01) # 模拟延迟
counter["value"] = current + 1
# 并发执行 10 个增量操作
await asyncio.gather(*[increment() for _ in range(10)])
results.assert_equal(counter["value"], 10, "并发计数正确(无竞态条件)")
return results
async def test_auto_renewal():
"""测试自动续期"""
print("\n测试 10: 自动续期")
results = TestResults()
redis = await get_redis()
# 创建一个超时时间短但启用自动续期的锁
lock = DistributedLock(
redis,
"test_auto_renewal_lock",
timeout=2,
auto_renewal=True,
renewal_interval=1,
)
await lock.acquire()
results.assert_true(True, "获取锁成功")
# 等待超过超时时间(但自动续期会保持锁)
await asyncio.sleep(4)
# 应该仍然持有锁
is_mine = await lock.is_locked_by_me()
results.assert_true(is_mine, "自动续期后仍持有锁")
await lock.release()
return results
async def run_all_tests():
"""运行所有测试"""
print("=" * 60)
print("分布式锁集成测试")
print("=" * 60)
# 初始化 Redis
try:
await init_redis()
print("✓ Redis 连接成功\n")
except Exception as e:
print(f"✗ Redis 连接失败: {e}")
print("请确保 Redis 服务正在运行并配置正确")
return
# 运行所有测试
all_results = TestResults()
tests = [
test_basic_lock_acquire_release,
test_context_manager,
test_lock_conflict,
test_lock_with_retry,
test_lock_timeout,
test_extend_lock,
test_convenience_function,
test_decorator,
test_concurrent_access,
test_auto_renewal,
]
for test in tests:
try:
result = await test()
all_results.passed += result.passed
all_results.failed += result.failed
all_results.errors.extend(result.errors)
except Exception as e:
all_results.failed += 1
error = f"✗ 测试 {test.__name__} 执行失败: {e}"
all_results.errors.append(error)
print(f"\n {error}")
# 清理
await close_redis_connection()
# 打印摘要
all_results.print_summary()
return all_results.failed == 0
if __name__ == "__main__":
success = asyncio.run(run_all_tests())
exit(0 if success else 1)