refactor订单处理和配置

- 优化订单处理逻辑,增加特殊错误处理
- 调整配置文件格式,提高可读性
- 修复部分代码缩进和格式问题
- 移除未使用的导入和变量
This commit is contained in:
danial
2025-09-20 02:16:40 +08:00
parent 4df2793ad7
commit c03a811fbd
9 changed files with 61 additions and 47 deletions

View File

@@ -207,7 +207,7 @@ async def get_order(
created_at=order.created_at.isoformat(),
updated_at=order.updated_at.isoformat(),
final_order_url=order.final_order_url,
final_order_id=order.final_order_id,
final_order_id=order.final_order_id,
failure_reason=order.failure_reason,
user_data_id=order.user_data_id,
links_id=order.links_id,

View File

@@ -47,7 +47,6 @@ class Settings(BaseSettings):
REDIS_PASSWORD: str | None = Field(default=None, description="Redis密码")
REDIS_DB: int = Field(default=0, description="Redis数据库编号")
# 超时配置
REQUEST_TIMEOUT: int = Field(default=30, description="请求超时时间(秒)")
PLAYWRIGHT_TIMEOUT: int = Field(
@@ -91,29 +90,23 @@ class Settings(BaseSettings):
OTEL_EXPORTER_PROTOCOL: str = Field(
default="grpc", description="OTLP协议 (grpc, http)"
)
OTEL_EXPORTER_TIMEOUT: int = Field(
default=30, description="导出器超时时间(秒)"
)
OTEL_EXPORTER_TIMEOUT: int = Field(default=30, description="导出器超时时间(秒)")
# 功能开关
OTEL_TRACES_ENABLED: bool = Field(default=True, description="是否启用链路追踪")
OTEL_METRICS_ENABLED: bool = Field(default=True, description="是否启用指标")
OTEL_LOGS_ENABLED: bool = Field(default=True, description="是否启用日志导出")
OTEL_PYTHON_LOG_LEVEL: str = Field(
default="INFO", description="OpenTelemetry 日志级别"
)
# 采样配置
OTEL_SAMPLER_RATIO: float = Field(
default=1.0, description="采样率 (0.0-1.0)"
)
OTEL_SAMPLER_RATIO: float = Field(default=1.0, description="采样率 (0.0-1.0)")
# 性能配置
OTEL_BATCH_SIZE: int = Field(default=512, description="批量处理大小")
OTEL_EXPORT_INTERVAL: int = Field(
default=5000, description="导出间隔(毫秒)"
)
OTEL_EXPORT_INTERVAL: int = Field(default=5000, description="导出间隔(毫秒)")
OTEL_MAX_QUEUE_SIZE: int = Field(default=2048, description="最大队列大小")
# 文件存储配置
@@ -125,9 +118,13 @@ class Settings(BaseSettings):
# Arq Worker配置
WORKER_MAX_JOBS: int = Field(default=10, description="Arq worker最大任务数")
WORKER_JOB_TIMEOUT: int = Field(default=1800, description="Arq worker任务超时时间(秒)")
WORKER_JOB_TIMEOUT: int = Field(
default=1800, description="Arq worker任务超时时间(秒)"
)
WORKER_MAX_TRIES: int = Field(default=3, description="Arq worker最大重试次数")
WORKER_HEALTH_CHECK_INTERVAL: int = Field(default=30, description="Arq worker健康检查间隔(秒)")
WORKER_HEALTH_CHECK_INTERVAL: int = Field(
default=30, description="Arq worker健康检查间隔(秒)"
)
@field_validator("ENVIRONMENT", mode="before")
@classmethod
@@ -155,7 +152,6 @@ class Settings(BaseSettings):
raise ValueError(f"日志格式必须是以下之一: {valid_formats}")
return v.lower()
@field_validator("PORT")
@classmethod
def validate_port(cls, v):

View File

@@ -57,7 +57,9 @@ class ArqWorkerConfig:
timeout=3000,
)
]
self.logger.info(f"已注册 {len(self.cron_jobs)} 个定时任务: {[job.name for job in self.cron_jobs]}")
self.logger.info(
f"已注册 {len(self.cron_jobs)} 个定时任务: {[job.name for job in self.cron_jobs]}"
)
except ImportError as e:
self.logger.warning(f"无法导入任务函数: {e}")
self.functions = []

View File

@@ -298,10 +298,9 @@ class TelemetryManager:
try:
# Create OpenTelemetry logging handler
handler = LoggingHandler(
logger_provider=self._logger_provider,
level=logging.INFO
logger_provider=self._logger_provider, level=logging.INFO
)
# Set a formatter for the handler if needed
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
@@ -310,12 +309,12 @@ class TelemetryManager:
# Configure root logger and application loggers
root_logger = logging.getLogger()
# Only add handler if it's not already present
if handler not in root_logger.handlers:
root_logger.addHandler(handler)
root_logger.setLevel(logging.INFO)
# Also configure application-specific loggers
app_logger = logging.getLogger("app")
if handler not in app_logger.handlers:
@@ -343,14 +342,16 @@ class TelemetryManager:
"[trace_id=%(otelTraceID)s span_id=%(otelSpanID)s "
"resource.service.name=%(otelServiceName)s trace_sampled=%(otelTraceSampled)s] - %(message)s"
)
# Convert string log level to logging constant
log_level = getattr(logging, self.settings.OTEL_PYTHON_LOG_LEVEL.upper(), logging.INFO)
log_level = getattr(
logging, self.settings.OTEL_PYTHON_LOG_LEVEL.upper(), logging.INFO
)
LoggingInstrumentor().instrument(
set_logging_format=True,
logging_format=logging_format,
log_level=log_level
log_level=log_level,
)
self._instrumentors.append(LoggingInstrumentor)
print("Logging instrumentation 设置完成 - 已启用 trace context 注入")

View File

@@ -85,7 +85,9 @@ class Orders(BaseModel):
elif isinstance(self.gift_cards, list):
return self.gift_cards
else:
return [self.gift_cards,]
return [
self.gift_cards,
]
@property
def is_processing(self) -> bool:

View File

@@ -88,7 +88,9 @@ class OrderService:
if hasattr(gc, "card_code") and gc.card_code
]
else:
card_codes = [order.gift_cards.card_code,]
card_codes = [
order.gift_cards.card_code,
]
# 处理用户信息(如果存在)
user_name = ""
@@ -125,7 +127,9 @@ class OrderService:
if hasattr(gc, "card_code") and gc.card_code
]
else:
card_codes = [order.gift_cards.card_code, ]
card_codes = [
order.gift_cards.card_code,
]
export_data.append(
{

View File

@@ -454,7 +454,7 @@ class AppleOrderProcessor:
if not await self._click_element(
page, self.selectors["zipcode_edit"], wait=False
):
raise Exception("无法点击点击邮编设置按钮")
raise Exception("无法点击邮编设置按钮")
if not await self._input_element(
page, self.selectors["zipcode_input"], value=self.order.user_data.zip_code
):
@@ -708,7 +708,7 @@ class AppleOrderProcessor:
raise Exception("无法点击 place_order")
await page.wait_for_timeout(timeout=5000)
await page.wait_for_load_state("networkidle")
# 强制等待
# 强制等待
max_wait_timeout = 60
while max_wait_timeout >= 0:
try:

View File

@@ -7,7 +7,6 @@ import asyncio
import traceback
from datetime import datetime
from typing import Any
import uuid
from app.core.config import get_settings
from app.core.database import db_manager
@@ -44,7 +43,7 @@ async def process_apple_order(ctx, order_id: str) -> dict[str, Any]:
try:
# 直接处理订单任务(移除自定义任务调度器)
result = await _process_apple_order_async(order_id, task_id)
result = await _process_apple_order_async(ctx, order_id, task_id)
return result
except Exception as e:
# 重试 - arq自动处理重试我们只需要记录错误
@@ -53,7 +52,9 @@ async def process_apple_order(ctx, order_id: str) -> dict[str, Any]:
raise
async def _process_apple_order_async(order_id: str, task_id: str) -> dict[str, Any]:
async def _process_apple_order_async(
ctx, order_id: str, task_id: str
) -> dict[str, Any]:
"""异步处理Apple订单"""
# 检查任务是否暂停
@@ -122,14 +123,26 @@ async def _process_apple_order_async(order_id: str, task_id: str) -> dict[str, A
task_id, order_id, result.get("error", "未知错误")
)
logger.error(f"Apple订单处理失败: {order_id}, error: {result.get('error')}")
async with db_manager.get_async_session() as session:
order_repo = OrderRepository(session)
await order_repo.update_by_id(
order_id,
status=OrderStatus.FAILURE,
failure_reason=result.get("error"),
completed_at=datetime.now(),
)
if "无法点击邮编设置按钮" in result.get("error", ""):
# # 特殊错误处理 - 可能是地区限制,暂停任务
# 使用arq并行调度订单处理任务
async with db_manager.get_async_session() as session:
order_repo = OrderRepository(session)
await order_repo.update_by_id(
order_id,
status=OrderStatus.PENDING,
updated_at=datetime.now(),
)
await ctx["redis"].enqueue_job("process_apple_order", order_id)
else:
async with db_manager.get_async_session() as session:
order_repo = OrderRepository(session)
await order_repo.update_by_id(
order_id,
status=OrderStatus.FAILURE,
failure_reason=result.get("error", "未知错误"),
completed_at=datetime.now(),
)
return result
@@ -211,9 +224,7 @@ async def batch_process_orders(ctx) -> dict[str, Any]:
)
# 使用arq并行调度订单处理任务
job = await ctx["redis"].enqueue_job(
"process_apple_order", order_id
)
job = await ctx["redis"].enqueue_job("process_apple_order", order_id)
if job:
logger.info(

View File

@@ -40,9 +40,7 @@ def run_arq_worker():
"在Windows上请使用: uv run python scripts/start_arq_worker.py"
)
else:
logger.info(
"请使用: uv run python scripts/start_arq_worker.py"
)
logger.info("请使用: uv run python scripts/start_arq_worker.py")
# 简单等待,让用户看到消息
import time