Files
kami_apple_exchage/backend/app/tasks/arq_tasks.py
danial 8ad2a5366a refactor(backend): 将Celery替换为Arq进行协程任务处理
本次提交将后端的任务队列系统从Celery迁移到了Arq,以支持基于协程的任务处理。主要改动包括:
- 更新文档和配置文件,反映架构变化。
- 修改健康检查和服务初始化逻辑,以适应Arq的使用。
- 移除与Celery相关的代码,并添加Arq任务定义和调度器。
- 更新Dockerfile和相关脚本,确保Arq worker能够正确运行。
- 调整API和业务服务中的任务处理逻辑,移除对Celery的依赖。

这些改动旨在提高系统的异步处理能力和整体性能。
2025-09-18 16:02:05 +08:00

230 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Arq任务模块 - 专门为arq设计的任务函数
遵循arq的任务签名约定
"""
import asyncio
import time
import traceback
from datetime import datetime
from typing import Any
from arq import Retry
from app.core.config import get_settings
from app.core.database import db_manager
from app.core.distributed_lock import get_lock
from app.core.log import get_logger
from app.core.redis_manager import redis_manager
from app.core.state_manager import task_state_manager
from app.core.task_scheduler import task_scheduler
from app.enums.task import OrderTaskStatus
from app.models.orders import OrderStatus
from app.repositories.order_repository import OrderRepository
from app.services.link_service import LinksService
from app.services.playwright_service import AppleOrderProcessor
from app.services.user_data_service import UserDataService
settings = get_settings()
logger = get_logger(__name__)
async def process_apple_order(ctx, order_id: str) -> dict[str, Any]:
"""
处理Apple订单任务 - Arq版本
支持分布式锁定和进度跟踪,使用协程池
Args:
ctx: Arq context对象
order_id: 订单ID
Returns:
dict[str, Any]: 处理结果
"""
task_id = ctx['job_id']
logger.info(f"开始处理Apple订单: task_id={task_id}, order_id={order_id}")
try:
# 使用协程池调度任务
result = await task_scheduler.schedule_task(
_process_apple_order_async, order_id, task_id
)
return result
except Exception as e:
# 重试 - arq自动处理重试我们只需要记录错误
logger.error(f"运行异步任务失败: {traceback.format_exc()}")
# 重新抛出异常让arq处理重试
raise
async def _process_apple_order_async(order_id: str, task_id: str) -> dict[str, Any]:
"""异步处理Apple订单"""
# 检查任务是否暂停
if await redis_manager.is_task_paused():
_, reason = await redis_manager.get_task_pause_state()
logger.info(f"任务已暂停,跳过订单处理: {order_id}, 原因: {reason}")
await asyncio.sleep(10) # 等待一段时间以防止频繁检查
return {
"success": False,
"is_paused": True,
"pause_reason": reason,
"message": "任务已暂停",
"order_id": order_id,
}
# 获取分布式锁
lock_key = f"apple_order_processing:{order_id}"
lock = get_lock(
key=lock_key,
timeout=1800, # 30分钟超时
retry_times=5,
retry_delay=1.0,
auto_extend=True,
extend_interval=120, # 每2分钟延长一次
)
try:
# 尝试获取锁
if not await lock.acquire():
logger.warning(f"无法获取订单锁: {order_id}")
raise Exception(f"Apple订单 {order_id} 正在被其他worker处理")
logger.info(f"成功获取Apple订单锁: {order_id}")
# 设置初始任务状态
await task_state_manager.set_task_state(
task_id=task_id,
status=OrderTaskStatus.RUNNING,
worker_id=order_id,
progress=0.0,
started_at=datetime.now().timestamp(),
)
# 创建Apple订单处理器
processor = AppleOrderProcessor(task_id, order_id)
# 执行订单处理
result = await processor.process_order()
# 更新最终状态
db_status = OrderStatus.SUCCESS
if result.get("success"):
logger.info(f"Apple订单处理成功: {order_id}")
await task_state_manager.complete_task(task_id)
else:
db_status = OrderStatus.FAILURE
await task_state_manager.fail_task(task_id, result.get("error", "未知错误"))
logger.error(f"Apple订单处理失败: {order_id}, error: {result.get('error')}")
async with db_manager.get_async_session() as session:
order_repo = OrderRepository(session)
await order_repo.update_by_id(
order_id,
status=db_status,
failure_reason=result.get("error"),
completed_at=datetime.now(),
)
return result
except Exception as e:
logger.error(f"处理Apple订单异常: {order_id}, error: {traceback.format_exc()}")
# 更新任务状态为失败
await task_state_manager.fail_task(task_id, str(e))
# 更新订单状态为失败
try:
async with db_manager.get_async_session() as session:
order_repo = OrderRepository(session)
await order_repo.update_by_id(
order_id,
status=OrderStatus.FAILURE,
failure_reason=str(e),
completed_at=datetime.now(),
)
except Exception as db_error:
logger.error(f"更新订单状态失败: {db_error}")
raise e
finally:
# 释放锁
await lock.release()
logger.info(f"释放Apple订单锁: {order_id}")
async def batch_process_orders(ctx) -> dict[str, Any]:
"""
批量处理订单任务 - Arq版本
Cron任务接收arq context参数
Args:
ctx: Arq context对象
Returns:
dict[str, Any]: 批量处理结果
"""
# 检查任务是否暂停
if await redis_manager.is_task_paused():
is_paused, reason = await redis_manager.get_task_pause_state()
logger.info(f"批量处理任务已暂停,原因: {reason}")
return None
# 获取数据库会话
async with db_manager.get_async_session() as session:
from app.services.order_business_service import OrderService
order_service = OrderService(session)
user_service = UserDataService(session)
links_service = LinksService(session)
processed_count = 0
max_orders_per_batch = 10 # 每批处理的最大订单数
while processed_count < max_orders_per_batch:
user_data_id = await redis_manager.get_user_data_id()
if not user_data_id:
break
# 检查用户数据是否存在
user_info = await user_service.get_user_info(user_data_id)
if not user_info:
break
# 检查是否可以获取到链接
link_info = await links_service.get_next_link_from_pool()
if not link_info:
break
try:
# 开始创建订单
order_id = await order_service.create_order(
user_data_id, link_info.link.id
)
# 使用arq并行调度订单处理任务
job = await ctx['redis'].enqueue_job(
'process_apple_order',
order_id,
_job_id=None
)
if job:
logger.info(f"已调度订单处理任务: order_id={order_id}, job_id={job.job_id}")
processed_count += 1
logger.info(f"已调度订单处理: order_id={order_id}, 已处理: {processed_count}")
except Exception as e:
logger.error(f"创建订单失败: {e}")
continue
return {
"success": True,
"processed_count": processed_count,
"message": f"批量处理完成,共调度 {processed_count} 个订单"
}