Files
kami_spider_monorepo/core/database.py
danial 0e41e7acce feat(core): 初始化核心配置和部署文件
- 添加 .env.example 环境变量配置示例
- 添加 .gitignore 忽略文件配置
- 添加 core/config.py 配置管理模块
- 添加 deployments/k8s/configmap.yaml Kubernetes 配置
- 添加 core/database.py 数据库连接管理模块
- 添加 core/dependencies.py 全局依赖模块
- 添加 DEPENDENCIES_UPDATED.md 依赖更新记录
- 添加 deployments/k8s/deployment.yaml Kubernetes 部署配置- 添加 deployments/swarm/docker-compose.swarm.yml Docker Swarm 部署配置
- 添加 deployments/docker/docker-compose.yml Docker 部署配置
- 添加 deployments/docker/Dockerfile 应用镜像构建文件
- 添加 middleware/error_handler.py 全局异常处理中间件
2025-11-01 14:32:29 +08:00

136 lines
3.6 KiB
Python

"""
Database layer with SQLModel and async SQLAlchemy engine.
Provides connection pooling and session management.
"""
from typing import AsyncGenerator
from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.pool import NullPool, AsyncAdaptedQueuePool
from sqlalchemy import text
from sqlmodel import SQLModel
from core.config import settings
# Create async engine with connection pooling
if settings.debug:
# In debug mode, use NullPool (no pooling parameters)
engine = create_async_engine(
settings.database_url,
echo=settings.db_echo,
pool_pre_ping=settings.db_pool_pre_ping,
poolclass=NullPool,
)
else:
# In production mode, use AsyncAdaptedQueuePool with pooling parameters
engine = create_async_engine(
settings.database_url,
echo=settings.db_echo,
pool_size=settings.db_pool_size,
max_overflow=settings.db_max_overflow,
pool_recycle=settings.db_pool_recycle,
pool_pre_ping=settings.db_pool_pre_ping,
poolclass=AsyncAdaptedQueuePool,
)
# Create async session factory
AsyncSessionLocal = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
autocommit=False,
autoflush=False,
)
async def get_session() -> AsyncGenerator[AsyncSession, None]:
"""
Dependency that provides async database session.
Usage in FastAPI:
@app.get("/users")
async def get_users(session: AsyncSession = Depends(get_session)):
...
Yields:
AsyncSession: Database session with automatic cleanup
"""
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
@asynccontextmanager
async def get_session_context() -> AsyncGenerator[AsyncSession, None]:
"""
Context manager for database session.
Usage:
async with get_session_context() as session:
result = await session.execute(select(User))
Yields:
AsyncSession: Database session with automatic cleanup
"""
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
async def create_db_and_tables() -> None:
"""
Create all database tables.
This should be called at application startup in development.
In production, use Alembic migrations instead.
"""
async with engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
async def drop_db_and_tables() -> None:
"""
Drop all database tables.
WARNING: This will delete all data. Use with caution.
Only use in development/testing environments.
"""
async with engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.drop_all)
async def check_database_connection() -> bool:
"""
Check if database connection is healthy.
Returns:
bool: True if connection is healthy, False otherwise
"""
try:
async with engine.connect() as conn:
await conn.execute(text("SELECT 1"))
return True
except Exception:
return False
async def close_database_connection() -> None:
"""
Close database connection pool.
This should be called at application shutdown.
"""
await engine.dispose()