Files
kami_gateway/internal/service/supplier/third_party/pool/service.go
danial 863dc33ba3 feat(orderpool): 优化订单提交流程及代理获取逻辑
- 为 SubmitOrder 添加重试机制,增强订单创建、绑定和处理的鲁棒性
- 提供订单创建失败和处理失败时的资源清理方法,避免资源泄漏
- 统一订单处理各阶段的日志记录,增加失败场景的上下文信息
- 调整 Nuclear 任务中随机ID生成逻辑,使用 Pipeline 批量写 Redis 降低压力
- 发送请求时增加访问异常处理,避免无代理情况下报错
- 为各 channel 接口添加获取代理失败的容错处理,防止服务中断
- proxy_pool 中代理可用性检测新增独立超时,提升检测稳定性
- 优化代理过期清理逻辑,缩短锁持有时间,避免性能瓶颈
- GetProxy 增加超时控制,异步获取防止阻塞调用线程
- scan_controller 和 service 添加 gopool panic 处理,防止任务异常崩溃
- Nuclear.go 中添加锁机制保证随机ID生成线程安全
- 减少 submitPool 线程池数量,优化资源使用
- 统一并增强日志和追踪,导入 runtime/debug 用于堆栈信息打印
2025-12-14 21:24:02 +08:00

741 lines
25 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package pool
import (
"context"
"fmt"
"github.com/duke-git/lancet/v2/convertor"
"github.com/duke-git/lancet/v2/pointer"
"runtime/debug"
"strings"
"sync"
"time"
"gateway/internal/cache"
"gateway/internal/models/merchant_deploy"
"gateway/internal/models/road"
"gateway/internal/otelTrace"
"gateway/internal/service/supplier/third_party/pool/card_sender"
"gateway/internal/utils"
"github.com/beego/beego/v2/client/httplib"
"github.com/beego/beego/v2/core/berror"
"github.com/bytedance/gopkg/util/gopool"
"github.com/duke-git/lancet/v2/slice"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
// OrderPoolService is the public contract of the order-pool service.
type OrderPoolService interface {
// Start starts the service.
Start(ctx context.Context) error
// Stop stops the service.
Stop() error
// PushOrder pushes an order into the pool.
PushOrder(ctx context.Context, task card_sender.SendCardTask) error
// GetLocalIdByOrderId returns the local order ID bound to orderId.
GetLocalIdByOrderId(ctx context.Context, orderId string) (string, error)
// SubmitOrder submits an order.
SubmitOrder(ctx context.Context, task card_sender.SendCardTask) error
}
// OrderPoolServiceImpl is the OrderPoolService implementation returned by
// NewOrderPoolService.
type OrderPoolServiceImpl struct {
config *Config
redisClient *cache.RedisClient
workerPool *WorkerPool
eventBus EventBus
metrics *Metrics
// poolLocks maps an order-pool key (string) to its *sync.Mutex.
poolLocks sync.Map
channels []card_sender.SendCardTaskEnum
poolSizes sync.Map // current pool size of each channel
channelsMu sync.RWMutex // guards concurrent access to the channels slice
// taskPool and servicePool are gopool goroutine pools; the order-matching
// jobs are dispatched on servicePool.
taskPool gopool.Pool
servicePool gopool.Pool
}
// NewOrderPoolService builds an order-pool service backed by the given
// configuration and Redis client. It creates the worker pool, the Redis event
// bus and two goroutine pools, installs panic handlers on both goroutine
// pools, and subscribes every event handler before returning.
func NewOrderPoolService(config *Config, redisClient *cache.RedisClient) OrderPoolService {
	// logPanic returns a gopool panic handler that logs the recovered value
	// together with the current stack under the given message.
	logPanic := func(msg string) func(context.Context, any) {
		return func(ctx context.Context, recovered any) {
			otelTrace.Logger.WithContext(ctx).Error(msg,
				zap.Any("panic", recovered),
				zap.String("stack", string(debug.Stack())))
		}
	}

	metrics := NewMetrics()
	svc := &OrderPoolServiceImpl{
		config:      config,
		redisClient: redisClient,
		workerPool:  NewWorkerPool(config.WorkerCount, metrics),
		eventBus:    NewRedisEventBus(redisClient),
		metrics:     metrics,
		channels:    card_sender.GetAllSendCardTaskType(),
		taskPool:    gopool.NewPool(utils.GenerateId(), 10, gopool.NewConfig()),
		servicePool: gopool.NewPool(utils.GenerateId(), 30, gopool.NewConfig()),
	}

	// A panic inside a pooled job is logged instead of crashing the process.
	svc.taskPool.SetPanicHandler(logPanic("OrderPool taskPool panic recovered"))
	svc.servicePool.SetPanicHandler(logPanic("OrderPool servicePool panic recovered"))

	// Handlers are subscribed with a background context.
	svc.registerEventHandlers(context.Background())
	return svc
}
// registerEventHandlers subscribes one handler per event type on the event
// bus. Every handler is constructed with a reference back to this service.
func (s *OrderPoolServiceImpl) registerEventHandlers(ctx context.Context) {
	bus := s.eventBus

	// Order life-cycle events.
	bus.Subscribe(ctx, EventTypeOrderCreated, NewOrderCreatedHandler(s))
	bus.Subscribe(ctx, EventTypeOrderProcessed, NewOrderProcessedHandler(s))
	bus.Subscribe(ctx, EventTypeOrderFailed, NewOrderFailedHandler(s))

	// Pool and channel update events.
	bus.Subscribe(ctx, EventTypePoolUpdated, NewPoolUpdatedHandler(s))
	bus.Subscribe(ctx, EventTypeChannelUpdated, NewChannelUpdatedHandler(s))

	// Order expiry and order query events.
	bus.Subscribe(ctx, EventTypeOrderExpired, NewOrderExpiredHandler(s))
	bus.Subscribe(ctx, EventTypeOrderQuery, NewOrderQueryHandler(s))
}
// Start starts the event bus and the worker pool, then launches the two
// background loops (channel hot update and order matching). Both loops stop
// when ctx is cancelled. An event-bus start failure is returned and nothing
// else is started.
func (s *OrderPoolServiceImpl) Start(ctx context.Context) error {
	busErr := s.eventBus.Start(ctx)
	if busErr != nil {
		return fmt.Errorf("启动事件总线失败: %v", busErr)
	}

	s.workerPool.Start()

	go s.startChannelHotUpdate(ctx) // periodic channel refresh
	go s.startOrderMatching(ctx)    // periodic order matching
	return nil
}
// Stop stops the event bus and the worker pool.
//
// The worker pool is stopped even when stopping the event bus fails, so a bus
// error can no longer leave the workers running. The bus error, if any, is
// returned wrapped.
func (s *OrderPoolServiceImpl) Stop() error {
	busErr := s.eventBus.Stop()

	// Always release the workers, regardless of the bus outcome.
	s.workerPool.Stop()

	if busErr != nil {
		return fmt.Errorf("停止事件总线失败: %w", busErr)
	}
	return nil
}
// PushOrder appends a customer order to the tail of the customer order list
// keyed by road UID and face value, then publishes an order-created event.
// A failed push is returned as an error; a failed publish is only logged.
func (s *OrderPoolServiceImpl) PushOrder(ctx context.Context, task card_sender.SendCardTask) error {
	faceValue := task.CardInfo.GetFaceTypeFloat(ctx)
	queueKey := fmt.Sprintf("%s:%s:%.2f", s.config.CustomerOrderPoolKey, task.RoadUid, faceValue)

	pushErr := s.redisClient.RPush(ctx, queueKey, task)
	if pushErr != nil {
		return fmt.Errorf("推送用户订单失败: %v", pushErr)
	}

	created := &BaseEvent{
		eventType: EventTypeOrderCreated,
		payload:   task,
		timestamp: time.Now(),
	}
	if pubErr := s.eventBus.Publish(ctx, created); pubErr != nil {
		otelTrace.Logger.WithContext(ctx).Error("发布订单创建事件失败", zap.Error(pubErr))
	}
	return nil
}
// startChannelHotUpdate refreshes the channel list every
// config.ChannelUpdateInterval until ctx is cancelled. Refresh errors are
// logged and the loop keeps running.
func (s *OrderPoolServiceImpl) startChannelHotUpdate(ctx context.Context) {
	refresh := time.NewTicker(s.config.ChannelUpdateInterval)
	defer refresh.Stop()

	for {
		select {
		case <-refresh.C:
			if updateErr := s.updateChannels(ctx); updateErr != nil {
				otelTrace.Logger.WithContext(ctx).Error("更新通道失败", zap.Error(updateErr))
			}
		case <-ctx.Done():
			return
		}
	}
}
// updateChannels reconciles the set of order pools with the current
// configuration.
//
// For every channel, open road and face value it publishes a channel-updated
// event and makes sure a per-pool mutex exists in poolLocks. When the pool key
// is seen for the first time an InitOrderPoolTask is submitted to fill the
// pool up to config.InitialPoolSize. Pool keys that are no longer configured
// are removed from poolLocks and their Redis list is deleted.
//
// The function currently always returns nil; failures are logged.
func (s *OrderPoolServiceImpl) updateChannels(ctx context.Context) error {
	// Work on a snapshot so the read lock is not held during I/O.
	s.channelsMu.RLock()
	channels := make([]card_sender.SendCardTaskEnum, len(s.channels))
	copy(channels, s.channels)
	s.channelsMu.RUnlock()

	// activeKeys collects every pool key that is still configured.
	activeKeys := make(map[string]struct{})
	for _, channel := range channels {
		for _, roadUid := range s.getAllRoadUids(ctx, channel.String()) {
			if !s.isRoadOpen(ctx, roadUid) {
				continue
			}
			for _, faceValue := range s.getAllFaceValues(ctx, roadUid) {
				if err := s.eventBus.Publish(ctx, &BaseEvent{
					eventType: EventTypeChannelUpdated,
					payload: channelUpdated{
						RoadUid:   roadUid,
						FaceValue: faceValue,
						Channel:   channel,
					},
					timestamp: time.Now(),
				}); err != nil {
					otelTrace.Logger.WithContext(ctx).Error("发布通道更新事件失败", zap.Error(err))
				}

				lockKey := s.getOrderPoolKey(roadUid, faceValue)
				activeKeys[lockKey] = struct{}{}

				func() {
					// loaded reports whether the key already existed. The
					// previous code re-checked with Load after LoadOrStore,
					// which always succeeds, so the pool was never
					// initialised.
					lock, loaded := s.poolLocks.LoadOrStore(lockKey, &sync.Mutex{})
					mutex := lock.(*sync.Mutex)
					mutex.Lock()
					defer mutex.Unlock()

					if loaded {
						return
					}
					// First time this pool key is seen: schedule its initial fill.
					otelTrace.Logger.WithContext(ctx).Info("新增通道", zap.String("channel", channel.String()), zap.String("roadUid", roadUid), zap.Float64("faceValue", faceValue))
					s.workerPool.Submit(&InitOrderPoolTask{
						service:    s,
						channel:    channel,
						roadUid:    roadUid,
						faceValue:  faceValue,
						targetSize: s.config.InitialPoolSize,
					})
				}()
			}
		}
	}

	// Drop pools that are no longer configured.
	s.poolLocks.Range(func(key, value any) bool {
		lockKey := key.(string)
		if _, active := activeKeys[lockKey]; active {
			return true
		}

		// Hold the pool mutex while removing it; the deferred unlock runs
		// when this callback invocation returns.
		mutex := value.(*sync.Mutex)
		mutex.Lock()
		defer mutex.Unlock()

		s.poolLocks.Delete(lockKey)
		// Best-effort cleanup of the Redis order pool; the list is simply
		// left behind if the delete fails.
		_ = s.redisClient.Delete(ctx, lockKey)
		return true
	})
	return nil
}
// getAllRoadUids returns the distinct road UIDs of the given product code that
// have a merchant deployment using the pool submit strategy. It returns an
// empty, non-nil slice when the product code has no roads.
func (s *OrderPoolServiceImpl) getAllRoadUids(ctx context.Context, productCode string) []string {
	roads := road.GetListByProductCode(ctx, productCode)
	candidateUids := slice.Unique(slice.Map(roads, func(_ int, info road.RoadInfo) string {
		return info.RoadUid
	}))
	if len(candidateUids) == 0 {
		return []string{}
	}

	deploys := merchant_deploy.GetListByRoadUidsAndStrategy(ctx, candidateUids, merchant_deploy.SUBMIT_STRATEGY_POOL)
	pooledUids := slice.Map(deploys, func(_ int, deploy merchant_deploy.MerchantDeployInfo) string {
		return deploy.SingleRoadUid
	})
	return slice.Unique(pooledUids)
}
// isRoadOpen reports whether the road is currently open. When the lookup
// itself fails the road is treated as open.
func (s *OrderPoolServiceImpl) isRoadOpen(ctx context.Context, roadUid string) bool {
	if open, err := road.IsRoadOpen(ctx, roadUid); err == nil {
		return open
	}
	// Lookup failed: fall back to "open".
	return true
}
// getAllFaceValues returns the distinct face values (ProfitMargin.ShowLabel)
// configured across all pool-strategy merchant deployments of the road.
// Deployments whose platform rate cannot be read are logged and skipped.
func (s *OrderPoolServiceImpl) getAllFaceValues(ctx context.Context, roadUid string) []float64 {
	deploys := merchant_deploy.GetListByRoadUidAndStrategy(ctx, roadUid, merchant_deploy.SUBMIT_STRATEGY_POOL)

	collected := make([]float64, 0)
	for _, deploy := range deploys {
		margins, err := deploy.GetPlatformRate(ctx)
		if err != nil {
			otelTrace.Logger.WithContext(ctx).Error("获取平台费率失败", zap.Error(err), zap.String("roadUid", roadUid))
			continue
		}
		for _, margin := range margins {
			collected = append(collected, margin.ShowLabel)
		}
	}
	return slice.Unique(collected)
}
// getOrderPoolKey builds the Redis key of the produce-order pool for a road
// and face value: "<ProduceOrderPoolKey>:<roadUID>:<faceValue with 2 decimals>".
func (s *OrderPoolServiceImpl) getOrderPoolKey(roadUID string, faceValue float64) string {
	prefix := s.config.ProduceOrderPoolKey
	return fmt.Sprintf("%s:%s:%.2f", prefix, roadUID, faceValue)
}
// startOrderMatching runs one matching pass every config.OrderMinInterval
// until ctx is cancelled.
func (s *OrderPoolServiceImpl) startOrderMatching(ctx context.Context) {
	tick := time.NewTicker(s.config.OrderMinInterval)
	defer tick.Stop()

	done := ctx.Done()
	for {
		select {
		case <-tick.C:
			s.matchOrders(ctx)
		case <-done:
			return
		}
	}
}
// matchOrders runs one matching pass over every channel, road and face value.
//
// For each customer queue that is not empty it schedules one
// matchOrdersForFaceValue job per queued customer order on servicePool. The
// pool capacity is raised (bounded by config.MaxFaceValueConcurrency) as the
// number of pending orders grows, and reset to 30 after a channel when no
// pending order has been seen so far in this pass.
func (s *OrderPoolServiceImpl) matchOrders(ctx context.Context) {
	// Read the channel list under channelsMu, like updateChannels does, and
	// iterate over a snapshot so the lock is not held during I/O.
	s.channelsMu.RLock()
	channels := make([]card_sender.SendCardTaskEnum, len(s.channels))
	copy(channels, s.channels)
	s.channelsMu.RUnlock()

	pendingTotal := 0
	for _, channel := range channels {
		for _, roadUid := range s.getAllRoadUids(ctx, channel.String()) {
			for _, faceValue := range s.getAllFaceValues(ctx, roadUid) {
				// Skip queues that are empty or cannot be inspected.
				customerKey := fmt.Sprintf("%s:%s:%.2f", s.config.CustomerOrderPoolKey, roadUid, faceValue)
				customerLength, err := s.redisClient.LLen(ctx, customerKey)
				if err != nil {
					otelTrace.Logger.WithContext(ctx).Error("获取用户订单池长度失败", zap.Error(err))
					continue
				}
				if customerLength == 0 {
					continue
				}

				pendingTotal += int(customerLength)
				if pendingTotal+10 > int(s.servicePool.WorkerCount()) {
					s.servicePool.SetCap(int32(min(pendingTotal+10, s.config.MaxFaceValueConcurrency)))
				}

				// One matching job per queued customer order. Range-over-int
				// requires Go 1.22, where loop variables are per-iteration,
				// so the closure can capture them directly.
				for range customerLength {
					s.servicePool.CtxGo(ctx, func() {
						s.matchOrdersForFaceValue(ctx, channel, roadUid, faceValue)
					})
				}
			}
		}
		if pendingTotal == 0 {
			s.servicePool.SetCap(30)
		}
	}
}
// matchOrdersForFaceValue pairs one produce order with one customer order for
// the given road and face value, then runs the send-card task.
//
// Flow:
//  1. Pop a produce order; discard it if it is older than
//     config.OrderInactiveTime, put it back if its task type is invalid, if it
//     is still inside its waiting time, or if there is no customer order.
//  2. Pop a customer order and bind it to the produce order (Redis bind key,
//     then BindPoolOrderId). If either binding step fails both orders are put
//     back into their pools.
//  3. Publish the order-processed event and call HandleSendCardTask, retrying
//     up to config.MaxRetryCount times on httplib.SendRequestFailed.
//  4. On any other failure, or when retries are exhausted, call
//     BindOrderFailed with the last error message.
//
// NOTE: no per-pool mutex is held here (the locking code was disabled), so
// several jobs may run this function concurrently for the same keys.
func (s *OrderPoolServiceImpl) matchOrdersForFaceValue(ctx context.Context, channel card_sender.SendCardTaskEnum, roadUid string, faceValue float64) {
	ctx, span := otelTrace.CreateAsyncContext(ctx, "matchOrdersForFaceValue")
	defer span.End()

	customerKey := fmt.Sprintf("%s:%s:%.2f", s.config.CustomerOrderPoolKey, roadUid, faceValue)
	produceKey := s.getOrderPoolKey(roadUid, faceValue)

	produceLength, err := s.redisClient.LLen(ctx, produceKey)
	if err != nil {
		s.metrics.RecordError("redis")
		// This is the produce pool; the previous message named the customer pool.
		otelTrace.Logger.WithContext(ctx).Error("获取生产订单池长度失败", zap.Error(err))
		return
	}
	span.SetAttributes(attribute.Int64("produceLength", produceLength))
	if produceLength == 0 {
		return
	}

	// Take a produce order from the head of the pool.
	var produceOrderItem card_sender.OrderPoolItem
	if err = s.redisClient.LPopUnmarshal(ctx, produceKey, &produceOrderItem); err != nil {
		otelTrace.Logger.WithContext(ctx).Error("获取生产订单失败", zap.Error(err))
		s.metrics.RecordError("redis")
		return
	}

	// Expired produce orders are dropped, not re-queued.
	if time.Since(produceOrderItem.CreateTime) > s.config.OrderInactiveTime {
		otelTrace.Logger.WithContext(ctx).Info("生产订单已过期", zap.String("produceKey", produceKey),
			zap.Any("produceOrderItem", produceOrderItem), zap.Time("createTime", produceOrderItem.CreateTime),
		)
		return
	}

	sendCardTaskType := produceOrderItem.SendCardTaskType.GetSendCardTaskType()
	if sendCardTaskType == nil {
		otelTrace.Logger.WithContext(ctx).Error("无效的发卡任务类型")
		// Put the produce order back at the head of the pool.
		if err = s.redisClient.LPush(ctx, produceKey, produceOrderItem); err != nil {
			otelTrace.Logger.WithContext(ctx).Error("生产订单重新入池失败", zap.Error(err))
		}
		return
	}

	// A produce order still inside its waiting time goes back to the tail.
	waitingTime := sendCardTaskType.GetWaitingTime(ctx, produceOrderItem)
	if waitingTime > 0 && time.Since(produceOrderItem.CreateTime) < time.Second*time.Duration(waitingTime) {
		if err = s.redisClient.RPush(ctx, produceKey, produceOrderItem); err != nil {
			otelTrace.Logger.WithContext(ctx).Error("添加任务到队列失败", zap.Error(err))
		}
		return
	}

	// Make sure there is a customer order to pair with.
	customerLength, err := s.redisClient.LLen(ctx, customerKey)
	if err != nil {
		otelTrace.Logger.WithContext(ctx).Error("获取用户订单池长度失败", zap.Error(err))
		// Put the produce order back at the head of the pool.
		if err = s.redisClient.LPush(ctx, produceKey, produceOrderItem); err != nil {
			otelTrace.Logger.WithContext(ctx).Error("生产订单重新入池失败", zap.Error(err))
		}
		return
	}
	if customerLength == 0 {
		// Nothing to pair with: put the produce order back at the head.
		if err = s.redisClient.LPush(ctx, produceKey, produceOrderItem); err != nil {
			s.metrics.RecordError("redis")
			otelTrace.Logger.WithContext(ctx).Error("生产订单重新入池失败", zap.Error(err))
		}
		return
	}
	otelTrace.Logger.WithContext(ctx).Info("生产订单等待时间", zap.String("produceKey", produceKey),
		zap.Any("produceOrderItem", produceOrderItem), zap.Time("createTime", produceOrderItem.CreateTime),
		zap.Duration("s.config.OrderWaitTime", s.config.OrderWaitTime),
	)

	// Take a customer order from the head of its queue.
	var task card_sender.SendCardTask
	if err = s.redisClient.LPopUnmarshal(ctx, customerKey, &task); err != nil {
		otelTrace.Logger.WithContext(ctx).Error("获取用户订单失败", zap.Error(err))
		// Put the produce order back at the head of the pool.
		if err = s.redisClient.LPush(ctx, produceKey, produceOrderItem); err != nil {
			otelTrace.Logger.WithContext(ctx).Error("生产订单重新入池失败", zap.Error(err))
		}
		return
	}
	span.SetAttributes(attribute.String("bankOrderId", task.LocalOrderID))
	span.AddEvent("acquire user order")
	if task.SendCardTaskType == "" {
		task.SendCardTaskType = channel
	}

	// Bind the produce order ID to the local order ID in Redis.
	bindKey := fmt.Sprintf("%s:%s", s.config.OrderBindKey, produceOrderItem.OrderID)
	if err = s.redisClient.Set(ctx, bindKey, task.LocalOrderID, s.config.OrderBindKeyActiveTime); err != nil {
		otelTrace.Logger.WithContext(ctx).Error("绑定订单ID和卡片信息ID失败", zap.Error(err))
		// Binding failed: put both orders back at the tail of their pools.
		if err = s.redisClient.RPush(ctx, customerKey, task); err != nil {
			otelTrace.Logger.WithContext(ctx).Error("用户订单重新入池失败", zap.Error(err))
		}
		if err = s.redisClient.RPush(ctx, produceKey, produceOrderItem); err != nil {
			otelTrace.Logger.WithContext(ctx).Error("生产订单重新入池失败", zap.Error(err))
		}
		return
	}
	if err = sendCardTaskType.BindPoolOrderId(ctx, produceOrderItem, task); err != nil {
		otelTrace.Logger.WithContext(ctx).Error("绑定订单ID和卡片信息ID失败", zap.Error(err))
		// Previously both popped orders were silently dropped here and the
		// Redis bind key was left behind. Roll the binding back (best effort,
		// as SubmitOrder does on the same failure) and re-queue both orders,
		// mirroring the Redis-bind failure path above.
		_ = sendCardTaskType.UnBindPoolOrderId(ctx, produceOrderItem, task)
		_ = s.redisClient.Delete(ctx, bindKey)
		if err = s.redisClient.RPush(ctx, customerKey, task); err != nil {
			otelTrace.Logger.WithContext(ctx).Error("用户订单重新入池失败", zap.Error(err))
		}
		if err = s.redisClient.RPush(ctx, produceKey, produceOrderItem); err != nil {
			otelTrace.Logger.WithContext(ctx).Error("生产订单重新入池失败", zap.Error(err))
		}
		return
	}

	// Publish the order-processed event. The error is kept in its own
	// variable so it cannot be mistaken for a send-card failure below.
	if pubErr := s.eventBus.Publish(ctx, &BaseEvent{
		eventType: EventTypeOrderProcessed,
		payload:   produceOrderItem,
		timestamp: time.Now(),
	}); pubErr != nil {
		otelTrace.Logger.WithContext(ctx).Error("发布订单处理完成事件失败", zap.Error(pubErr))
	}

	// Run the send-card task; only request-send failures are retried.
	var retryCount int
	for retryCount < s.config.MaxRetryCount {
		err = sendCardTaskType.HandleSendCardTask(ctx, produceOrderItem, task)
		if err == nil {
			// Success: publish a query event when the task asks for one.
			if task.NeedQuery {
				queryEvent := &OrderQueryEvent{
					BaseEvent: BaseEvent{
						eventType: EventTypeOrderQuery,
						timestamp: time.Now(),
					},
					RoadUid:       roadUid,
					OrderID:       produceOrderItem.OrderID,
					LocalOrderID:  task.LocalOrderID,
					Channel:       task.SendCardTaskType,
					QueryCount:    0,
					LastQueryTime: time.Now(),
					RemoteOrderID: produceOrderItem.RemoteOrderID,
				}
				if err = s.eventBus.Publish(ctx, queryEvent); err != nil {
					otelTrace.Logger.WithContext(ctx).Error("发布订单查询事件失败", zap.Error(err))
				}
			}
			return
		}
		if bCode, ok := berror.FromError(err); ok && bCode == httplib.SendRequestFailed {
			retryCount++
			otelTrace.Logger.WithContext(ctx).Warn("处理发卡任务失败,准备重试",
				zap.Error(err),
				zap.Int("retryCount", retryCount),
				zap.String("roadUid", roadUid),
				zap.Float64("faceValue", faceValue),
			)
			continue
		}
		if strings.Contains(err.Error(), "重新下单提交卡密") {
			// The produce order must be re-created: put the customer order
			// back at the head of its queue so it is matched again.
			// NOTE(review): the binding is only removed when the re-queue
			// itself fails; confirm that keeping it on success is intended.
			if err = s.redisClient.LPush(ctx, customerKey, task); err != nil {
				_ = s.unboundPoolOrderId(ctx, task.SendCardTaskType, produceOrderItem, task)
				otelTrace.Logger.WithContext(ctx).Error("用户订单重新入池失败", zap.Error(err))
			}
			return
		}
		break
	}

	// Report the failure with the last error, or a generic message when the
	// retry loop never produced one.
	errMsg := "绑定失败,绑定重试次数超过次数"
	if err != nil {
		errMsg = err.Error()
	}
	if err = sendCardTaskType.BindOrderFailed(ctx, produceOrderItem, task, errMsg); err != nil {
		otelTrace.Logger.WithContext(ctx).Error("绑定失败", zap.Error(err))
	}
}
// GetLocalIdByOrderId returns the local order ID stored in Redis under the
// bind key of orderId ("<OrderBindKey>:<orderId>").
//
// It returns an error wrapping the Redis error and naming the bind key when
// the lookup fails.
func (s *OrderPoolServiceImpl) GetLocalIdByOrderId(ctx context.Context, orderId string) (string, error) {
	bindKey := fmt.Sprintf("%s:%s", s.config.OrderBindKey, orderId)

	var localId string
	if err := s.redisClient.Get(ctx, bindKey, &localId); err != nil {
		// The previous format string ran the error, the literal "bindKey"
		// and the key together without separators.
		return "", fmt.Errorf("获取本地订单ID失败: %w, bindKey: %s", err, bindKey)
	}
	return localId, nil
}
// unboundPoolOrderId undoes the binding of a pool order: it asks the task type
// to unbind the order and deletes the Redis bind key
// ("<OrderBindKey>:<orderItem.OrderID>").
//
// Both steps are best-effort clean-up: their results are ignored and the
// function always returns nil. An unknown task type (nil implementation) is
// skipped instead of panicking, and the bind key is still deleted.
func (s *OrderPoolServiceImpl) unboundPoolOrderId(ctx context.Context, taskEnum card_sender.SendCardTaskEnum, orderItem card_sender.OrderPoolItem, task card_sender.SendCardTask) error {
	if sender := taskEnum.GetSendCardTaskType(); sender != nil {
		_ = sender.UnBindPoolOrderId(ctx, orderItem, task)
	}

	bindKey := fmt.Sprintf("%s:%s", s.config.OrderBindKey, orderItem.OrderID)
	_ = s.redisClient.Delete(ctx, bindKey)
	return nil
}
// SubmitOrder creates a fresh order for the task, binds it and processes it
// immediately.
//
// Each attempt runs four steps: create the order, bind it through the task
// type, store the bind key in Redis, and handle the send-card task. A failure
// in the first three steps aborts with an error after cleaning up what was
// created. A failure in the handling step always cleans up, and is retried
// (up to 3 attempts in total) only when the error message contains "重新下单".
// After a successful attempt an order-query event is published when
// task.NeedQuery is set; a publish failure is only logged.
//
// Returned errors wrap the underlying cause. An unknown task type is reported
// as an error instead of panicking.
func (s *OrderPoolServiceImpl) SubmitOrder(ctx context.Context, task card_sender.SendCardTask) error {
	// NOTE(review): the whole task is serialised into the span attributes;
	// confirm it carries no card secrets that must stay out of traces.
	ctx, span := otelTrace.Span(ctx, "SubmitOrder", "OrderPoolServiceImpl.SubmitOrder",
		trace.WithAttributes(
			attribute.String("task", convertor.ToString(task)),
			attribute.String("localOrderId", task.LocalOrderID),
		),
	)
	defer span.End()

	// Resolve the task-type implementation once and refuse unknown types.
	sender := task.SendCardTaskType.GetSendCardTaskType()
	if sender == nil {
		span.AddEvent("invalid task type")
		return fmt.Errorf("无效的发卡任务类型")
	}

	var err error
	var orderItem card_sender.OrderPoolItem
	maxRetries := 3
	for attempt := 0; attempt < maxRetries; attempt++ {
		span.AddEvent("attempt_start", trace.WithAttributes(attribute.Int("attempt", attempt+1)))

		// 1. Create the order.
		span.AddEvent("create order")
		orderItem, err = sender.CreateOrder(ctx, task.RoadUid, task.CardInfo.GetFaceTypeFloat(ctx))
		if err != nil {
			span.AddEvent("create order failed")
			otelTrace.Logger.WithContext(ctx).Error("创建订单失败",
				zap.Error(err),
				zap.Int("attempt", attempt+1),
				zap.String("localOrderId", task.LocalOrderID))
			return fmt.Errorf("创建订单失败: %w", err)
		}
		// A created order must carry an ID.
		if orderItem.OrderID == "" {
			span.AddEvent("invalid order created")
			return fmt.Errorf("创建的订单ID为空")
		}
		span.SetAttributes(attribute.String("orderId", orderItem.OrderID))

		// 2. Bind the order through the task type.
		span.AddEvent("bind order")
		if err = sender.BindPoolOrderId(ctx, orderItem, task); err != nil {
			span.AddEvent("bind_order_failed")
			otelTrace.Logger.WithContext(ctx).Error("绑定订单ID和卡片信息ID失败",
				zap.Error(err),
				zap.String("orderId", orderItem.OrderID),
				zap.Int("attempt", attempt+1))
			// Clean up the order that was just created.
			cleanupErr := s.cleanupCreatedOrder(ctx, task, orderItem, "bind_failed")
			if cleanupErr != nil {
				otelTrace.Logger.WithContext(ctx).Error("清理创建的订单资源失败",
					zap.Error(cleanupErr),
					zap.String("orderId", orderItem.OrderID))
			}
			return fmt.Errorf("绑定订单ID和卡片信息ID失败: %w", err)
		}

		// 3. Store the order ID -> local order ID binding in Redis.
		bindKey := fmt.Sprintf("%s:%s", s.config.OrderBindKey, orderItem.OrderID)
		if err = s.redisClient.Set(ctx, bindKey, task.LocalOrderID, s.config.OrderBindKeyActiveTime); err != nil {
			span.AddEvent("redis_bind_failed")
			otelTrace.Logger.WithContext(ctx).Error("Redis绑定订单失败",
				zap.Error(err),
				zap.String("orderId", orderItem.OrderID),
				zap.String("bindKey", bindKey),
				zap.Int("attempt", attempt+1))
			// Roll back the binding made in step 2.
			cleanupErr := s.unboundPoolOrderId(ctx, task.SendCardTaskType, orderItem, task)
			if cleanupErr != nil {
				otelTrace.Logger.WithContext(ctx).Error("回滚数据库绑定失败",
					zap.Error(cleanupErr),
					zap.String("orderId", orderItem.OrderID))
			}
			return fmt.Errorf("Redis绑定订单ID和卡片信息ID失败: %w", err)
		}

		// 4. Handle the send-card task.
		span.AddEvent("handle order")
		err = sender.HandleSendCardTask(ctx, orderItem, task)
		if err != nil {
			span.AddEvent("handle_order_failed")
			otelTrace.Logger.WithContext(ctx).Warn("订单处理失败",
				zap.Error(err),
				zap.String("orderId", orderItem.OrderID),
				zap.String("remoteOrderId", orderItem.RemoteOrderID),
				zap.Int("attempt", attempt+1))
			// Always release the failed order's bindings, whatever the error.
			cleanupErr := s.cleanupFailedOrder(ctx, task, orderItem, "handle_failed")
			if cleanupErr != nil {
				otelTrace.Logger.WithContext(ctx).Error("清理失败订单资源时出错",
					zap.Error(cleanupErr),
					zap.String("orderId", orderItem.OrderID))
			}
			// Only errors flagged as "re-order" are retried.
			if strings.Contains(err.Error(), "重新下单") && attempt < maxRetries-1 {
				otelTrace.Logger.WithContext(ctx).Info("订单需要重新下单,准备重试",
					zap.String("orderId", orderItem.OrderID),
					zap.Int("attempt", attempt+1))
				continue
			}
			return fmt.Errorf("提交订单失败: %w", err)
		}

		// Success: leave the retry loop.
		span.AddEvent("order_success")
		otelTrace.Logger.WithContext(ctx).Info("订单提交成功",
			zap.String("orderId", orderItem.OrderID),
			zap.String("remoteOrderId", orderItem.RemoteOrderID),
			zap.Int("finalAttempt", attempt+1))
		break
	}

	// Final sanity check on the resulting order.
	if pointer.IsNil(orderItem) || orderItem.OrderID == "" {
		span.AddEvent("final_validation_failed")
		return fmt.Errorf("订单处理失败")
	}

	// 5. Publish a query event when the task asks for one.
	if task.NeedQuery {
		queryEvent := &OrderQueryEvent{
			BaseEvent: BaseEvent{
				eventType: EventTypeOrderQuery,
				timestamp: time.Now(),
			},
			OrderID:       orderItem.OrderID,
			LocalOrderID:  task.LocalOrderID,
			Channel:       task.SendCardTaskType,
			RoadUid:       task.RoadUid,
			QueryCount:    0,
			LastQueryTime: time.Now(),
			RemoteOrderID: orderItem.RemoteOrderID,
		}
		if err := s.eventBus.Publish(ctx, queryEvent); err != nil {
			otelTrace.Logger.WithContext(ctx).Error("发布订单查询事件失败",
				zap.Error(err),
				zap.String("orderId", orderItem.OrderID))
		}
	}
	return nil
}
// cleanupCreatedOrder releases an order that was created but could not be
// bound: it logs the reason and asks the task type to unbind the order. The
// unbind result is ignored and the function always returns nil.
func (s *OrderPoolServiceImpl) cleanupCreatedOrder(ctx context.Context, task card_sender.SendCardTask, orderItem card_sender.OrderPoolItem, reason string) error {
	logger := otelTrace.Logger.WithContext(ctx)
	logger.Info("清理创建的订单",
		zap.String("orderId", orderItem.OrderID),
		zap.String("reason", reason))

	// Best-effort removal of the binding made through the task type.
	sender := task.SendCardTaskType.GetSendCardTaskType()
	_ = sender.UnBindPoolOrderId(ctx, orderItem, task)
	return nil
}
// cleanupFailedOrder releases everything held by an order whose processing
// failed: it logs the reason and delegates to unboundPoolOrderId, returning
// that call's result.
func (s *OrderPoolServiceImpl) cleanupFailedOrder(ctx context.Context, task card_sender.SendCardTask, orderItem card_sender.OrderPoolItem, reason string) error {
	logger := otelTrace.Logger.WithContext(ctx)
	logger.Info("清理失败的订单",
		zap.String("orderId", orderItem.OrderID),
		zap.String("reason", reason))

	// Reuse the shared unbind routine.
	unbindErr := s.unboundPoolOrderId(ctx, task.SendCardTaskType, orderItem, task)
	return unbindErr
}