kami_apple_exchage/backend/app/tasks/arq_tasks.py
danial 8bc8e1c664 feat(links): implement weight-based rotation algorithm and link management features
- Add a link weight field supporting values in the 1-100 range
- Change the rotation algorithm to a weight-based selection mechanism (see the sketch below)
- Update link API endpoints to consistently return the LinkInfo model
- Add a PATCH endpoint for updating link weights
- Adjust link repository queries to include only active links
- Move link-related Pydantic models into the task module for unified management
- Change the pagination response format to the generic PaginatedResponse wrapper
- Disable the OpenTelemetry monitoring configuration
2025-09-30 17:02:02 +08:00
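The weight-based selection itself lives in the links service, not in the task file below. As a rough illustration only, one simple way to bias selection by weight could look like the following sketch; the field names weight and is_active and the function name pick_next_link are assumptions for illustration, not code from this commit.

# Illustrative only - not the repository's implementation
import random


def pick_next_link(links):
    """Weighted pick: a link with weight 100 is chosen roughly 100x more
    often than a link with weight 1. Weights are clamped to the 1-100 range
    and only active links are considered."""
    active = [link for link in links if link.is_active]
    if not active:
        return None
    weights = [max(1, min(100, link.weight)) for link in active]
    return random.choices(active, weights=weights, k=1)[0]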

285 lines · 9.8 KiB · Python

"""
Arq任务模块 - 专门为arq设计的任务函数
遵循arq的任务签名约定
"""
import asyncio
import traceback
from datetime import datetime
from typing import Any
from app.core.config import get_settings
from app.core.database import db_manager
from app.core.distributed_lock import get_lock
from app.core.log import get_logger
from app.core.redis_manager import redis_manager
from app.core.state_manager import task_state_manager
from app.enums.task import OrderTaskStatus
from app.models.orders import OrderStatus
from app.repositories.order_repository import OrderRepository
from app.services.link_service import LinksService
from app.services.playwright_service import AppleOrderProcessor
from app.services.user_data_service import UserDataService
settings = get_settings()
logger = get_logger(__name__)
async def process_apple_order(ctx, order_id: str) -> dict[str, Any]:
    """
    Process an Apple order task - arq version.

    Supports distributed locking and progress tracking; uses a coroutine pool.

    Args:
        ctx: Arq context object
        order_id: Order ID

    Returns:
        dict[str, Any]: Processing result
    """
    task_id = ctx["job_id"]  # arq injects the job ID into the context
    logger.info(f"Start processing Apple order: task_id={task_id}, order_id={order_id}")
    try:
        # Process the order task directly (the custom task scheduler has been removed)
        result = await _process_apple_order_async(ctx, order_id, task_id)
        return result
    except Exception:
        # arq handles retries automatically; we only need to log the error here
        logger.error(f"Async task failed: {traceback.format_exc()}")
        # Re-raise so arq can retry the job
        raise

async def _process_apple_order_async(
    ctx, order_id: str, task_id: str
) -> dict[str, Any]:
    """Process an Apple order asynchronously."""
    # Check whether task processing is currently paused
    if await redis_manager.is_task_paused():
        _, reason = await redis_manager.get_task_pause_state()
        logger.info(f"Tasks are paused, skipping order processing: {order_id}, reason: {reason}")
        await asyncio.sleep(10)  # Wait a while to avoid checking too frequently
        return {
            "success": False,
            "is_paused": True,
            "pause_reason": reason,
            "message": "Tasks are paused",
            "order_id": order_id,
        }

    # Check whether the associated user data has been soft-deleted
    async with db_manager.get_async_session() as session:
        order_repo = OrderRepository(session)
        order = await order_repo.get_by_id(order_id, relations=["user_data"])
        if not order:
            logger.error(f"Order does not exist: {order_id}")
            await task_state_manager.fail_task(
                task_id, order_id, f"Order {order_id} does not exist"
            )
            return {
                "success": False,
                "error": f"Order {order_id} does not exist",
                "order_id": order_id,
            }
        # Check whether the user data has been soft-deleted
        if order.user_data.is_deleted:
            logger.warning(
                f"User data has been soft-deleted, aborting order processing: "
                f"{order_id}, user_data_id={order.user_data_id}"
            )
            await task_state_manager.fail_task(
                task_id, order_id, f"User data {order.user_data_id} has been deleted"
            )
            # Mark the order as failed
            await order_repo.update_by_id(
                order_id,
                status=OrderStatus.FAILURE,
                failure_reason=f"User data {order.user_data_id} has been deleted",
                completed_at=datetime.now(),
            )
            return {
                "success": False,
                "error": f"User data {order.user_data_id} has been deleted",
                "order_id": order_id,
            }

    # Acquire the distributed lock so only one worker processes this order
    lock_key = f"apple_order_processing:{order_id}"
    lock = get_lock(
        key=lock_key,
        timeout=1800,  # 30-minute timeout
        retry_times=5,
        retry_delay=1.0,
        auto_extend=True,
        extend_interval=120,  # Extend the lock every 2 minutes
    )
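    # Note (assumption): get_lock is expected to return an async lock object with
    # acquire()/release(); with auto_extend=True it presumably renews the lock
    # every extend_interval seconds in the background so a long Playwright session
    # does not lose it before the 30-minute timeout expires.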
    try:
        # Try to acquire the lock
        if not await lock.acquire():
            logger.warning(f"Could not acquire order lock: {order_id}")
            raise Exception(f"Apple order {order_id} is already being processed by another worker")
        logger.info(f"Acquired Apple order lock: {order_id}")

        # Set the initial task state
        await task_state_manager.set_task_state(
            task_id=task_id,
            status=OrderTaskStatus.RUNNING,
            worker_id=order_id,
            progress=0.0,
            started_at=datetime.now().timestamp(),
        )

        # Create the Apple order processor and run the order
        processor = AppleOrderProcessor(task_id, order_id)
        result = await processor.process_order()

        # Update the final state
        if result.get("success"):
            logger.info(f"Apple order processed successfully: {order_id}")
            await task_state_manager.complete_task(task_id, order_id, result, True)
            async with db_manager.get_async_session() as session:
                order_repo = OrderRepository(session)
                await order_repo.update_by_id(
                    order_id,
                    failure_reason=None,
                    final_order_id=result.get("order_number"),
                    final_order_url=result.get("order_url"),
                    status=OrderStatus.SUCCESS,
                    completed_at=datetime.now(),
                )
        else:
            await task_state_manager.fail_task(
                task_id, order_id, result.get("error", "Unknown error")
            )
            logger.error(f"Apple order processing failed: {order_id}, error: {result.get('error')}")
            # The error text below is produced by AppleOrderProcessor
            # ("unable to click the zip code button"), so the original string is kept for matching.
            if "无法点击邮编设置按钮" in result.get("error", ""):
                # Special case - likely a region restriction: reset the order to
                # PENDING and re-enqueue it via arq for another attempt
                async with db_manager.get_async_session() as session:
                    order_repo = OrderRepository(session)
                    await order_repo.update_by_id(
                        order_id,
                        status=OrderStatus.PENDING,
                        updated_at=datetime.now(),
                    )
                await ctx["redis"].enqueue_job("process_apple_order", order_id)
            else:
                async with db_manager.get_async_session() as session:
                    order_repo = OrderRepository(session)
                    await order_repo.update_by_id(
                        order_id,
                        status=OrderStatus.FAILURE,
                        failure_reason=result.get("error", "Unknown error"),
                        completed_at=datetime.now(),
                    )
        return result
    except Exception as e:
        logger.error(f"Exception while processing Apple order: {order_id}, error: {traceback.format_exc()}")
        # Mark the task as failed
        await task_state_manager.fail_task(task_id, order_id, str(e))
        # Mark the order as failed
        try:
            async with db_manager.get_async_session() as session:
                order_repo = OrderRepository(session)
                await order_repo.update_by_id(
                    order_id,
                    status=OrderStatus.FAILURE,
                    failure_reason=str(e),
                    completed_at=datetime.now(),
                )
        except Exception as db_error:
            logger.error(f"Failed to update order status: {db_error}")
        raise e
    finally:
        # Release the lock
        await lock.release()
        logger.info(f"Released Apple order lock: {order_id}")

async def batch_process_orders(ctx) -> dict[str, Any]:
    """
    Batch-process orders - arq version.

    Cron tasks receive the arq context argument.

    Args:
        ctx: Arq context object

    Returns:
        dict[str, Any]: Batch processing result
    """
    # Check whether task processing is currently paused
    if await redis_manager.is_task_paused():
        _, reason = await redis_manager.get_task_pause_state()
        logger.info(f"Batch processing is paused, reason: {reason}")
        return {}

    # Get a database session
    async with db_manager.get_async_session() as session:
        from app.services.order_business_service import OrderService

        order_service = OrderService(session)
        user_service = UserDataService(session)
        links_service = LinksService(session)

        processed_count = 0
        max_orders_per_batch = 5  # Maximum number of orders per batch

        while processed_count < max_orders_per_batch:
            user_data_id = await redis_manager.get_user_data_id()
            if not user_data_id:
                break
            # Check that the user data exists
            user_info = await user_service.get_user_info(user_data_id)
            if not user_info:
                break
            # Check that a link can be obtained from the pool
            link_info = await links_service.get_next_link_from_pool()
            if not link_info:
                break
            try:
                # Create the order
                order_id = await order_service.create_order(
                    user_data_id, link_info.link.id
                )
                # Schedule order processing in parallel via arq; enqueue_job
                # returns None if a job with the same job ID is already queued
                job = await ctx["redis"].enqueue_job("process_apple_order", order_id)
                if job:
                    logger.info(
                        f"Scheduled order processing job: order_id={order_id}, job_id={job.job_id}"
                    )
                processed_count += 1
                logger.info(
                    f"Scheduled order processing: order_id={order_id}, processed: {processed_count}"
                )
            except Exception as e:
                logger.error(f"Failed to create order: {e}")
                continue

        return {
            "success": True,
            "processed_count": processed_count,
            "message": f"Batch processing finished, scheduled {processed_count} orders",
        }
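
For orientation, these coroutines still have to be registered with an arq worker elsewhere in the project. The snippet below is only a sketch of what such a registration could look like; the module path (app/tasks/worker.py), the every-minute cron schedule, and the Redis settings are assumptions rather than code taken from this repository.

# Hypothetical app/tasks/worker.py - not part of this file
from arq import cron
from arq.connections import RedisSettings

from app.tasks.arq_tasks import batch_process_orders, process_apple_order


class WorkerSettings:
    # On-demand task enqueued by batch_process_orders
    functions = [process_apple_order]
    # Run the batch scheduler once a minute (assumed schedule)
    cron_jobs = [cron(batch_process_orders, minute=set(range(60)))]
    redis_settings = RedisSettings(host="localhost", port=6379)
    max_jobs = 10
    job_timeout = 1800  # matches the 30-minute distributed-lock timeout above

With settings like these, the worker would be started with "arq app.tasks.worker.WorkerSettings" on the command line.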