mirror of
https://git.oceanpay.cc/danial/kami_apple_exchage.git
synced 2025-12-18 22:29:09 +00:00
refactor(config): remove MAX_THREADS and WORKER_MAX_CONCURRENT_TASKS from configuration files and task scheduler
This commit is contained in:
@@ -26,7 +26,6 @@ REDIS_PASSWORD=
|
||||
REDIS_DB=0
|
||||
|
||||
# 线程池配置
|
||||
MAX_THREADS=3
|
||||
THREAD_POOL_SIZE=10
|
||||
GIFT_CARD_WAIT_TIMEOUT=600
|
||||
MONITOR_INTERVAL=10
|
||||
@@ -81,4 +80,3 @@ WORKER_MAX_JOBS=10
|
||||
WORKER_JOB_TIMEOUT=1800
|
||||
WORKER_MAX_TRIES=3
|
||||
WORKER_HEALTH_CHECK_INTERVAL=30
|
||||
WORKER_MAX_CONCURRENT_TASKS=5
|
||||
|
||||
@@ -25,7 +25,6 @@ REDIS_PASSWORD=
|
||||
REDIS_DB=0
|
||||
|
||||
# 线程池配置
|
||||
MAX_THREADS=3
|
||||
THREAD_POOL_SIZE=10
|
||||
GIFT_CARD_WAIT_TIMEOUT=600
|
||||
MONITOR_INTERVAL=10
|
||||
@@ -76,5 +75,4 @@ HEALTH_CHECK_INTERVAL=30
|
||||
WORKER_MAX_JOBS=10
|
||||
WORKER_JOB_TIMEOUT=1800
|
||||
WORKER_MAX_TRIES=3
|
||||
WORKER_HEALTH_CHECK_INTERVAL=30
|
||||
WORKER_MAX_CONCURRENT_TASKS=5
|
||||
WORKER_HEALTH_CHECK_INTERVAL=30
|
||||
@@ -82,5 +82,4 @@ HEALTH_CHECK_INTERVAL=30
|
||||
WORKER_MAX_JOBS=20
|
||||
WORKER_JOB_TIMEOUT=3600
|
||||
WORKER_MAX_TRIES=5
|
||||
WORKER_HEALTH_CHECK_INTERVAL=30
|
||||
WORKER_MAX_CONCURRENT_TASKS=10
|
||||
WORKER_HEALTH_CHECK_INTERVAL=30
|
||||
@@ -47,9 +47,6 @@ class Settings(BaseSettings):
|
||||
REDIS_PASSWORD: str | None = Field(default=None, description="Redis密码")
|
||||
REDIS_DB: int = Field(default=0, description="Redis数据库编号")
|
||||
|
||||
# 线程池配置
|
||||
MAX_THREADS: int = Field(default=3, description="最大并发线程数")
|
||||
THREAD_TIMEOUT: int = Field(default=300, description="线程超时时间(秒)")
|
||||
|
||||
# 超时配置
|
||||
REQUEST_TIMEOUT: int = Field(default=30, description="请求超时时间(秒)")
|
||||
@@ -135,7 +132,6 @@ class Settings(BaseSettings):
|
||||
WORKER_JOB_TIMEOUT: int = Field(default=1800, description="Arq worker任务超时时间(秒)")
|
||||
WORKER_MAX_TRIES: int = Field(default=3, description="Arq worker最大重试次数")
|
||||
WORKER_HEALTH_CHECK_INTERVAL: int = Field(default=30, description="Arq worker健康检查间隔(秒)")
|
||||
WORKER_MAX_CONCURRENT_TASKS: int = Field(default=5, description="Arq worker最大并发任务数")
|
||||
|
||||
@field_validator("ENVIRONMENT", mode="before")
|
||||
@classmethod
|
||||
@@ -163,15 +159,6 @@ class Settings(BaseSettings):
|
||||
raise ValueError(f"日志格式必须是以下之一: {valid_formats}")
|
||||
return v.lower()
|
||||
|
||||
@field_validator("MAX_THREADS")
|
||||
@classmethod
|
||||
def validate_max_threads(cls, v):
|
||||
"""验证最大线程数"""
|
||||
if v < 1:
|
||||
raise ValueError("最大线程数必须大于0")
|
||||
if v > 50:
|
||||
raise ValueError("最大线程数不能超过50")
|
||||
return v
|
||||
|
||||
@field_validator("PORT")
|
||||
@classmethod
|
||||
|
||||
@@ -1,57 +0,0 @@
|
||||
"""
|
||||
Task scheduler for managing coroutine-based tasks
|
||||
独立的任务调度器模块,避免循环导入问题
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
|
||||
from app.core.config_arq import get_arq_settings
|
||||
|
||||
settings = get_arq_settings()
|
||||
|
||||
|
||||
class TaskScheduler:
|
||||
"""Task scheduler for managing coroutine-based tasks"""
|
||||
|
||||
def __init__(self):
|
||||
self.coroutine_pool: list[asyncio.Task] = []
|
||||
self.max_concurrent_tasks = settings.WORKER_MAX_CONCURRENT_TASKS
|
||||
|
||||
async def schedule_task(self, task_func, *args, **kwargs):
|
||||
"""Schedule a coroutine task with concurrency control"""
|
||||
|
||||
# Wait if we've reached maximum concurrent tasks
|
||||
while len(self.coroutine_pool) >= self.max_concurrent_tasks:
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
# Create and track the task
|
||||
task = asyncio.create_task(task_func(*args, **kwargs))
|
||||
self.coroutine_pool.append(task)
|
||||
|
||||
# Add cleanup callback
|
||||
task.add_done_callback(lambda t: self.coroutine_pool.remove(t))
|
||||
|
||||
return await task
|
||||
|
||||
async def get_pool_stats(self):
|
||||
"""Get current coroutine pool statistics"""
|
||||
return {
|
||||
"current_tasks": len(self.coroutine_pool),
|
||||
"max_concurrent_tasks": self.max_concurrent_tasks,
|
||||
"task_statuses": [
|
||||
{
|
||||
"done": task.done(),
|
||||
"cancelled": task.cancelled(),
|
||||
"exception": task.exception() if task.done() else None,
|
||||
}
|
||||
for task in self.coroutine_pool
|
||||
],
|
||||
}
|
||||
|
||||
async def cleanup_pool(self):
|
||||
"""Cleanup completed tasks from pool"""
|
||||
self.coroutine_pool = [task for task in self.coroutine_pool if not task.done()]
|
||||
|
||||
|
||||
# Global task scheduler instance
|
||||
task_scheduler = TaskScheduler()
|
||||
@@ -14,7 +14,6 @@ from app.core.distributed_lock import get_lock
|
||||
from app.core.log import get_logger
|
||||
from app.core.redis_manager import redis_manager
|
||||
from app.core.state_manager import task_state_manager
|
||||
from app.core.task_scheduler import task_scheduler
|
||||
from app.enums.task import OrderTaskStatus
|
||||
from app.models.orders import OrderStatus
|
||||
from app.repositories.order_repository import OrderRepository
|
||||
@@ -43,10 +42,8 @@ async def process_apple_order(ctx, order_id: str) -> dict[str, Any]:
|
||||
logger.info(f"开始处理Apple订单: task_id={task_id}, order_id={order_id}")
|
||||
|
||||
try:
|
||||
# 使用协程池调度任务
|
||||
result = await task_scheduler.schedule_task(
|
||||
_process_apple_order_async, order_id, task_id
|
||||
)
|
||||
# 直接处理订单任务(移除自定义任务调度器)
|
||||
result = await _process_apple_order_async(order_id, task_id)
|
||||
return result
|
||||
except Exception as e:
|
||||
# 重试 - arq自动处理重试,我们只需要记录错误
|
||||
@@ -214,7 +211,9 @@ async def batch_process_orders(ctx) -> dict[str, Any]:
|
||||
|
||||
# 使用arq并行调度订单处理任务
|
||||
job = await ctx["redis"].enqueue_job(
|
||||
"process_apple_order", order_id, _job_id=None
|
||||
"process_apple_order", order_id, _job_id=None,
|
||||
_max_tries=3, # 最大重试次数
|
||||
_timeout=1800 # 30分钟超时
|
||||
)
|
||||
|
||||
if job:
|
||||
|
||||
@@ -36,7 +36,6 @@ services:
|
||||
- DATABASE_URL=postgresql+asyncpg://postgres:Kp9mX8vL2nQ5wR7@db:5432/apple_exchange
|
||||
- REDIS_URL=redis://:Df4jG7zN9pL1tY3@redis:6379/0
|
||||
- WORKERS=4
|
||||
- LOG_DIR=/app/logs
|
||||
volumes:
|
||||
- logs:/app/logs
|
||||
- data:/app/data
|
||||
@@ -63,10 +62,7 @@ services:
|
||||
- ENVIRONMENT=production
|
||||
- DATABASE_URL=postgresql+asyncpg://postgres:Kp9mX8vL2nQ5wR7@db:5432/apple_exchange
|
||||
- REDIS_URL=redis://:Df4jG7zN9pL1tY3@redis:6379/0
|
||||
- WORKER_MAX_CONCURRENT_TASKS=5
|
||||
- LOG_DIR=/app/logs
|
||||
- PLAYWRIGHT_BROWSERS_PATH=/app/data/playwright-browsers
|
||||
- WORKER_MAX_JOBS=10
|
||||
- WORKER_MAX_JOBS=100
|
||||
- WORKER_JOB_TIMEOUT=1800
|
||||
- WORKER_MAX_TRIES=3
|
||||
- WORKER_HEALTH_CHECK_INTERVAL=30
|
||||
|
||||
@@ -58,7 +58,6 @@ services:
|
||||
- REDIS_URL=redis://redis:6379/0
|
||||
|
||||
- WORKERS=4
|
||||
- LOG_DIR=/app/logs
|
||||
volumes:
|
||||
- logs:/app/logs
|
||||
- data:/app/data
|
||||
@@ -108,9 +107,6 @@ services:
|
||||
- ENVIRONMENT=production
|
||||
- DATABASE_URL=postgresql+asyncpg://postgres:${POSTGRES_PASSWORD}@db:5432/apple_exchange
|
||||
- REDIS_URL=redis://redis:6379/0
|
||||
- WORKER_MAX_CONCURRENT_TASKS=2
|
||||
- LOG_DIR=/app/logs
|
||||
- PLAYWRIGHT_BROWSERS_PATH=/app/data/playwright-browsers
|
||||
- WORKER_MAX_JOBS=10
|
||||
- WORKER_JOB_TIMEOUT=1800
|
||||
- WORKER_MAX_TRIES=3
|
||||
|
||||
@@ -37,7 +37,6 @@ services:
|
||||
- REDIS_URL=redis://redis:6379/0
|
||||
|
||||
- WORKERS=4
|
||||
- LOG_DIR=/app/logs
|
||||
volumes:
|
||||
- logs:/app/logs
|
||||
- data:/app/data
|
||||
@@ -64,9 +63,6 @@ services:
|
||||
- ENVIRONMENT=production
|
||||
- DATABASE_URL=postgresql+asyncpg://postgres:password@db:5432/apple_exchange
|
||||
- REDIS_URL=redis://redis:6379/0
|
||||
- WORKER_MAX_CONCURRENT_TASKS=2
|
||||
- LOG_DIR=/app/logs
|
||||
- PLAYWRIGHT_BROWSERS_PATH=/app/data/playwright-browsers
|
||||
- WORKER_MAX_JOBS=10
|
||||
- WORKER_JOB_TIMEOUT=1800
|
||||
- WORKER_MAX_TRIES=3
|
||||
|
||||
Reference in New Issue
Block a user