Files
kami_spider_monorepo/core/redis.py
danial 0e41e7acce feat(core): 初始化核心配置和部署文件
- 添加 .env.example 环境变量配置示例
- 添加 .gitignore 忽略文件配置
- 添加 core/config.py 配置管理模块
- 添加 deployments/k8s/configmap.yaml Kubernetes 配置
- 添加 core/database.py 数据库连接管理模块
- 添加 core/dependencies.py 全局依赖模块
- 添加 DEPENDENCIES_UPDATED.md 依赖更新记录
- 添加 deployments/k8s/deployment.yaml Kubernetes 部署配置
- 添加 deployments/swarm/docker-compose.swarm.yml Docker Swarm 部署配置
- 添加 deployments/docker/docker-compose.yml Docker 部署配置
- 添加 deployments/docker/Dockerfile 应用镜像构建文件
- 添加 middleware/error_handler.py 全局异常处理中间件
2025-11-01 14:32:29 +08:00

250 lines
6.0 KiB
Python

"""
Redis integration with async client and connection pooling.
Provides shared Redis instance for all applications.
"""
from typing import Optional, Any
import json
from redis import asyncio as aioredis
from redis.asyncio import Redis, ConnectionPool
from core.config import settings
# Module-level singletons shared by the helpers in this module.
# Nothing is created at import time: both names start as None, are
# populated by init_redis(), and are reset to None by
# close_redis_connection().
redis_client: Optional[Redis] = None
redis_pool: Optional[ConnectionPool] = None
async def init_redis() -> Redis:
    """
    Build the shared connection pool and the Redis client bound to it.

    Intended to be called once at application startup. The pool and the
    client are stored in the module-level ``redis_pool`` and
    ``redis_client`` names so that the other helpers in this module can
    reach them.

    Returns:
        Redis: The newly created async Redis client.
    """
    global redis_pool, redis_client

    # All connection parameters come from the application settings.
    url = settings.redis_url
    pool_options = {
        "max_connections": settings.redis_max_connections,
        "decode_responses": settings.redis_decode_responses,
    }

    # Publish the pool before the client, matching the order in which
    # they are created.
    redis_pool = ConnectionPool.from_url(url, **pool_options)
    redis_client = Redis(connection_pool=redis_pool)
    return redis_client
async def get_redis() -> Redis:
    """
    Return the shared Redis client (usable as a FastAPI dependency).

    Usage in FastAPI:
        @app.get("/cache")
        async def get_cache(redis: Redis = Depends(get_redis)):
            ...

    Returns:
        Redis: The module-level async Redis client.

    Raises:
        RuntimeError: If the client has not been created yet, i.e.
            ``redis_client`` is still None.
    """
    client = redis_client
    if client is not None:
        return client
    raise RuntimeError("Redis client not initialized. Call init_redis() at startup.")
async def check_redis_connection() -> bool:
    """
    Report whether the shared Redis client answers a PING.

    Never raises: any exception from the ping is treated as "unhealthy".

    Returns:
        bool: True if the client exists and ``ping()`` completed without
        raising; False if the client is not initialized or the ping
        failed.
    """
    client = redis_client
    if client is None:
        # Not initialized yet (or already closed).
        return False

    try:
        await client.ping()
    except Exception:
        # Health probe: every failure mode maps to "not healthy".
        return False
    return True
async def close_redis_connection() -> None:
    """
    Close the shared Redis client and disconnect its connection pool.

    This should be called at application shutdown. Calling it when
    nothing is initialized is a no-op.

    The module-level ``redis_client`` and ``redis_pool`` are reset to
    None before any I/O happens, and the pool is disconnected even if
    closing the client fails, so a failing ``aclose()`` can neither leak
    the pool nor leave a half-closed client visible to ``get_redis()``.

    Raises:
        Exception: Whatever ``aclose()`` or ``disconnect()`` raises is
            propagated after cleanup has been attempted.
    """
    global redis_pool, redis_client

    # Detach the globals first so other code never sees objects that are
    # in the middle of being torn down.
    client, pool = redis_client, redis_pool
    redis_client = None
    redis_pool = None

    try:
        if client is not None:
            await client.aclose()
    finally:
        # The pool is disconnected explicitly, and always, regardless of
        # whether closing the client succeeded.
        if pool is not None:
            await pool.disconnect()
class RedisCache:
    """
    Redis cache utility with common operations.

    Thin wrapper around an async Redis client that adds an optional key
    prefix (namespace) to every key and offers JSON (de)serialization
    helpers on top of plain get/set.
    """

    def __init__(self, redis: Redis, prefix: str = "") -> None:
        """
        Initialize cache with Redis client and optional key prefix.

        Args:
            redis: Redis client instance used for every operation.
            prefix: Optional key prefix for namespace isolation. An empty
                prefix leaves keys unchanged.
        """
        self.redis = redis
        self.prefix = prefix

    def _make_key(self, key: str) -> str:
        """Return ``"<prefix>:<key>"``, or ``key`` unchanged if no prefix is set."""
        return f"{self.prefix}:{key}" if self.prefix else key

    async def get(self, key: str) -> Optional[str]:
        """
        Get value by key.

        Args:
            key: Cache key (without prefix).

        Returns:
            Optional[str]: Cached value or None if not found.
            NOTE(review): the value is returned exactly as the client
            yields it; presumably that is ``bytes`` rather than ``str``
            when the client was created without ``decode_responses`` —
            confirm against the configured settings.
        """
        return await self.redis.get(self._make_key(key))

    async def set(
        self,
        key: str,
        value: str,
        ttl: Optional[int] = None,
    ) -> bool:
        """
        Set value with optional TTL.

        Args:
            key: Cache key (without prefix).
            value: Value to cache.
            ttl: Time to live in seconds; None stores the key without
                an expiry.

        Returns:
            bool: Result of the underlying ``SET`` call (True on success).
        """
        return await self.redis.set(
            self._make_key(key),
            value,
            ex=ttl,
        )

    async def get_json(self, key: str) -> Optional[Any]:
        """
        Get JSON value by key.

        A missing key and an undecodable payload are both reported as
        None, so a corrupt entry behaves like a cache miss.

        Args:
            key: Cache key (without prefix).

        Returns:
            Optional[Any]: Deserialized JSON value, or None if the key is
            absent or the stored payload is not valid JSON.
        """
        value = await self.get(key)
        if value is None:
            return None
        try:
            return json.loads(value)
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError are both
            # ValueError subclasses; the latter is raised when a bytes
            # payload cannot be decoded as text. Either way the entry is
            # unusable, so treat it as a miss.
            return None

    async def set_json(
        self,
        key: str,
        value: Any,
        ttl: Optional[int] = None,
    ) -> bool:
        """
        Set JSON value with optional TTL.

        Args:
            key: Cache key (without prefix).
            value: Value to serialize with ``json.dumps`` and cache.
            ttl: Time to live in seconds (optional).

        Returns:
            bool: True if successful.

        Raises:
            TypeError: If ``value`` is not JSON-serializable.
        """
        json_value = json.dumps(value)
        return await self.set(key, json_value, ttl)

    async def delete(self, key: str) -> int:
        """
        Delete key.

        Args:
            key: Cache key (without prefix).

        Returns:
            int: Number of keys deleted.
        """
        return await self.redis.delete(self._make_key(key))

    async def exists(self, key: str) -> bool:
        """
        Check if key exists.

        Args:
            key: Cache key (without prefix).

        Returns:
            bool: True if key exists.
        """
        # The client returns a count of existing keys; convert to bool.
        return await self.redis.exists(self._make_key(key)) > 0

    async def expire(self, key: str, ttl: int) -> bool:
        """
        Set TTL for existing key.

        Args:
            key: Cache key (without prefix).
            ttl: Time to live in seconds.

        Returns:
            bool: Result of the underlying ``EXPIRE`` call.
        """
        return await self.redis.expire(self._make_key(key), ttl)

    async def increment(self, key: str, amount: int = 1) -> int:
        """
        Increment numeric value.

        Args:
            key: Cache key (without prefix).
            amount: Increment amount (default 1).

        Returns:
            int: New value after increment.
        """
        return await self.redis.incrby(self._make_key(key), amount)

    async def decrement(self, key: str, amount: int = 1) -> int:
        """
        Decrement numeric value.

        Args:
            key: Cache key (without prefix).
            amount: Decrement amount (default 1).

        Returns:
            int: New value after decrement.
        """
        return await self.redis.decrby(self._make_key(key), amount)