Files
kami_gateway/internal/service/supplier/third_party/pool/service.go
danial a1db6b764c fix(card_sender): 优化飞鱼查询响应日志输出
- 精简飞鱼查询返回日志内容,移除冗余字段
- 保留原始响应字符串以便排查问题
- 保持对频繁操作消息的判定逻辑不变
- 避免过多日志数据导致日志膨胀

perf(deploy): 优化代理请求数量及协程池容量

- 将Dockerfile中proxyUrl请求数从2改为1,减少请求量
- 扫描控制器中延迟处理、提交限制和提交池容量分别从20、20、300增至50、50、500
- 绑定订单逻辑中发送卡片任务循环次数由10调整为3,优化性能
- 移除多余的错误日志打印,减少冗余日志输出

fix(pool): 修复飞鱼查询日志字段输出问题

- 将日志字段 "respRawData" 类型调整为字符串类型,避免类型不匹配
- 修改日志中布尔字段名为 "operation",更准确表达含义
- 保持日志内容详尽,方便后续问题排查
- 更新 Go 版本至 1.25.5 以保持依赖更新
2025-12-10 22:14:52 +08:00

603 lines
20 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package pool
import (
"context"
"fmt"
"github.com/duke-git/lancet/v2/convertor"
"strings"
"sync"
"time"
"gateway/internal/cache"
"gateway/internal/models/merchant_deploy"
"gateway/internal/models/road"
"gateway/internal/otelTrace"
"gateway/internal/service/supplier/third_party/pool/card_sender"
"gateway/internal/utils"
"github.com/beego/beego/v2/client/httplib"
"github.com/beego/beego/v2/core/berror"
"github.com/bytedance/gopkg/util/gopool"
"github.com/duke-git/lancet/v2/slice"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
// OrderPoolService is the order pool service interface.
type OrderPoolService interface {
	// Start starts the service.
	Start(ctx context.Context) error
	// Stop stops the service.
	Stop() error
	// PushOrder pushes an order into the pool.
	PushOrder(ctx context.Context, task card_sender.SendCardTask) error
	// GetLocalIdByOrderId returns the local order ID for the given order ID.
	GetLocalIdByOrderId(ctx context.Context, orderId string) (string, error)
	// SubmitOrder submits an order.
	SubmitOrder(ctx context.Context, task card_sender.SendCardTask) error
}
// OrderPoolServiceImpl is the implementation of the order pool service.
type OrderPoolServiceImpl struct {
	config      *Config
	redisClient *cache.RedisClient
	workerPool  *WorkerPool
	eventBus    EventBus
	metrics     *Metrics
	poolLocks   sync.Map
	channels    []card_sender.SendCardTaskEnum
	poolSizes   sync.Map     // stores the current pool size of each channel
	channelsMu  sync.RWMutex // guards concurrent access to the channels slice
	taskPool    gopool.Pool
	servicePool gopool.Pool
}
// NewOrderPoolService builds an order pool service from the given
// configuration and Redis client. It creates the metrics, worker pool,
// Redis-backed event bus and the two goroutine pools, loads the list of
// send-card task types, and registers the event handlers before returning.
func NewOrderPoolService(config *Config, redisClient *cache.RedisClient) OrderPoolService {
	m := NewMetrics()

	svc := &OrderPoolServiceImpl{
		config:      config,
		redisClient: redisClient,
		metrics:     m,
	}

	// Collaborators are created in the same order as before so that any
	// ID generation or side effects in the constructors are unchanged.
	svc.workerPool = NewWorkerPool(config.WorkerCount, m)
	svc.eventBus = NewRedisEventBus(redisClient)
	svc.channels = card_sender.GetAllSendCardTaskType()
	svc.taskPool = gopool.NewPool(utils.GenerateId(), 10, gopool.NewConfig())
	svc.servicePool = gopool.NewPool(utils.GenerateId(), 30, gopool.NewConfig())

	// Register the event handlers.
	svc.registerEventHandlers(context.Background())

	return svc
}
// registerEventHandlers subscribes one handler per event type on the
// service's event bus. Each handler is constructed with this service.
func (s *OrderPoolServiceImpl) registerEventHandlers(ctx context.Context) {
	subscribe := s.eventBus.Subscribe

	// Order lifecycle events.
	subscribe(ctx, EventTypeOrderCreated, NewOrderCreatedHandler(s))
	subscribe(ctx, EventTypeOrderProcessed, NewOrderProcessedHandler(s))
	subscribe(ctx, EventTypeOrderFailed, NewOrderFailedHandler(s))

	// Pool and channel maintenance events.
	subscribe(ctx, EventTypePoolUpdated, NewPoolUpdatedHandler(s))
	subscribe(ctx, EventTypeChannelUpdated, NewChannelUpdatedHandler(s))

	// Order expiry and order query events.
	subscribe(ctx, EventTypeOrderExpired, NewOrderExpiredHandler(s))
	subscribe(ctx, EventTypeOrderQuery, NewOrderQueryHandler(s))
}
// Start starts the service: it starts the event bus and the worker pool,
// then launches the channel hot-update loop and the order-matching loop in
// background goroutines that run until ctx is cancelled.
//
// It returns an error only when the event bus fails to start; in that case
// nothing else is started. The underlying error is wrapped so callers can
// inspect it with errors.Is / errors.As.
func (s *OrderPoolServiceImpl) Start(ctx context.Context) error {
	// Start the event bus.
	if err := s.eventBus.Start(ctx); err != nil {
		return fmt.Errorf("启动事件总线失败: %w", err)
	}

	s.workerPool.Start()

	// Hot-update the channel list.
	go s.startChannelHotUpdate(ctx)
	// Match orders.
	go s.startOrderMatching(ctx)

	return nil
}
// Stop stops the event bus and then the worker pool.
//
// The worker pool is stopped even when the event bus reports an error, so a
// failing bus does not leave the workers running. The event bus error, if
// any, is returned wrapped.
func (s *OrderPoolServiceImpl) Stop() error {
	// Stop the event bus first, but defer reporting its error until the
	// worker pool has been stopped as well.
	busErr := s.eventBus.Stop()

	s.workerPool.Stop()

	if busErr != nil {
		return fmt.Errorf("停止事件总线失败: %w", busErr)
	}
	return nil
}
// PushOrder appends the task to the customer order pool list in Redis and
// then publishes an order-created event.
//
// The list key is built from the configured customer pool key, the task's
// road UID and its face value formatted with two decimals. A failure to push
// to Redis is returned (wrapped); a failure to publish the event is only
// logged, and the method still returns nil.
func (s *OrderPoolServiceImpl) PushOrder(ctx context.Context, task card_sender.SendCardTask) error {
	// Push the customer order onto the customer order pool.
	customerKey := fmt.Sprintf("%s:%s:%.2f", s.config.CustomerOrderPoolKey, task.RoadUid, task.CardInfo.GetFaceTypeFloat(ctx))
	if err := s.redisClient.RPush(ctx, customerKey, task); err != nil {
		return fmt.Errorf("推送用户订单失败: %w", err)
	}

	// Publish the order-created event (best effort: the order is already queued).
	created := &BaseEvent{
		eventType: EventTypeOrderCreated,
		payload:   task,
		timestamp: time.Now(),
	}
	if err := s.eventBus.Publish(ctx, created); err != nil {
		otelTrace.Logger.WithContext(ctx).Error("发布订单创建事件失败", zap.Error(err))
	}
	return nil
}
// startChannelHotUpdate runs the channel refresh loop. On every tick of the
// configured ChannelUpdateInterval it calls updateChannels and logs any error;
// it returns when ctx is cancelled.
func (s *OrderPoolServiceImpl) startChannelHotUpdate(ctx context.Context) {
	refresh := time.NewTicker(s.config.ChannelUpdateInterval)
	defer refresh.Stop()

	for {
		// Block until either cancellation or the next tick.
		select {
		case <-ctx.Done():
			return
		case <-refresh.C:
		}

		if updateErr := s.updateChannels(ctx); updateErr != nil {
			otelTrace.Logger.WithContext(ctx).Error("更新通道失败", zap.Error(updateErr))
		}
	}
}
// updateChannels refreshes the set of active order pools.
//
// For every channel, every open road of that channel and every face value of
// that road it:
//   - publishes a channel-updated event (publish failures are only logged);
//   - records the pool key as active;
//   - if the pool key has not been seen before, registers a lock for it and
//     submits an InitOrderPoolTask with the configured initial pool size.
//
// Afterwards every registered pool key that is no longer active is removed
// from poolLocks and its Redis key is deleted.
//
// The method currently always returns nil.
func (s *OrderPoolServiceImpl) updateChannels(ctx context.Context) error {
	// Work on a snapshot so the read lock is not held during the lookups below.
	s.channelsMu.RLock()
	channels := make([]card_sender.SendCardTaskEnum, len(s.channels))
	copy(channels, s.channels)
	s.channelsMu.RUnlock()

	// Keys of every pool that should exist after this refresh.
	activeKeys := make(map[string]struct{})

	for _, channel := range channels {
		for _, roadUid := range s.getAllRoadUids(ctx, channel.String()) {
			if !s.isRoadOpen(ctx, roadUid) {
				continue
			}
			for _, faceValue := range s.getAllFaceValues(ctx, roadUid) {
				if err := s.eventBus.Publish(ctx, &BaseEvent{
					eventType: EventTypeChannelUpdated,
					payload: channelUpdated{
						RoadUid:   roadUid,
						FaceValue: faceValue,
						Channel:   channel,
					},
					timestamp: time.Now(),
				}); err != nil {
					otelTrace.Logger.WithContext(ctx).Error("发布通道更新事件失败", zap.Error(err))
				}

				lockKey := s.getOrderPoolKey(roadUid, faceValue)
				activeKeys[lockKey] = struct{}{}

				// LoadOrStore is atomic: exactly one caller observes
				// loaded == false for a given key, and that caller is the one
				// responsible for initialising the pool. (Checking with a
				// separate Load after LoadOrStore can never report "missing",
				// which previously made the branch below unreachable.)
				if _, loaded := s.poolLocks.LoadOrStore(lockKey, &sync.Mutex{}); loaded {
					continue
				}

				// New channel: schedule the initial fill of its order pool.
				otelTrace.Logger.WithContext(ctx).Info("新增通道", zap.String("channel", channel.String()), zap.String("roadUid", roadUid), zap.Float64("faceValue", faceValue))
				s.workerPool.Submit(&InitOrderPoolTask{
					service:    s,
					channel:    channel,
					roadUid:    roadUid,
					faceValue:  faceValue,
					targetSize: s.config.InitialPoolSize,
				})
			}
		}
	}

	// Remove pools whose channel/road/face value no longer exists.
	s.poolLocks.Range(func(key, value any) bool {
		lockKey := key.(string)
		if _, ok := activeKeys[lockKey]; ok {
			return true
		}

		// Hold the pool's lock while removing it. The value handed to the
		// callback is used directly: re-loading the key could yield nil if it
		// was deleted concurrently, and asserting on nil would panic.
		if mutex, ok := value.(*sync.Mutex); ok {
			mutex.Lock()
			defer mutex.Unlock()
		}
		s.poolLocks.Delete(lockKey)

		// Clean up the Redis order pool. Best effort: the result is
		// deliberately ignored, as a leftover list is not read once its key
		// is no longer active.
		_ = s.redisClient.Delete(ctx, lockKey)
		return true
	})

	return nil
}
// getAllRoadUids returns the de-duplicated road UIDs for a product code that
// have a merchant deployment using the pool submit strategy.
//
// It first collects the unique road UIDs of the product's roads; when there
// are none it returns an empty slice without querying deployments. Otherwise
// it returns the unique SingleRoadUid values of the matching deployments.
func (s *OrderPoolServiceImpl) getAllRoadUids(ctx context.Context, productCode string) []string {
	roads := road.GetListByProductCode(ctx, productCode)

	roadUidOf := func(_ int, info road.RoadInfo) string {
		return info.RoadUid
	}
	candidates := slice.Unique(slice.Map(roads, roadUidOf))
	if len(candidates) == 0 {
		return []string{}
	}

	deployments := merchant_deploy.GetListByRoadUidsAndStrategy(ctx, candidates, merchant_deploy.SUBMIT_STRATEGY_POOL)

	singleRoadUidOf := func(_ int, deploy merchant_deploy.MerchantDeployInfo) string {
		return deploy.SingleRoadUid
	}
	pooled := slice.Map(deployments, singleRoadUidOf)
	return slice.Unique(pooled)
}
// isRoadOpen reports whether the given road is currently open.
//
// When the lookup itself fails the road is treated as open (fail-open), so a
// lookup error does not take the road out of rotation. The error is logged
// rather than silently discarded so such failures remain visible.
func (s *OrderPoolServiceImpl) isRoadOpen(ctx context.Context, roadUid string) bool {
	isOpen, err := road.IsRoadOpen(ctx, roadUid)
	if err != nil {
		otelTrace.Logger.WithContext(ctx).Warn("查询通道开启状态失败,默认视为开启",
			zap.Error(err),
			zap.String("roadUid", roadUid),
		)
		return true
	}
	return isOpen
}
// getAllFaceValues returns the de-duplicated face values configured for a
// road. It walks every pool-strategy merchant deployment of the road, reads
// its platform rate entries and collects each entry's ShowLabel. Deployments
// whose platform rate cannot be read are logged and skipped.
func (s *OrderPoolServiceImpl) getAllFaceValues(ctx context.Context, roadUid string) []float64 {
	deployments := merchant_deploy.GetListByRoadUidAndStrategy(ctx, roadUid, merchant_deploy.SUBMIT_STRATEGY_POOL)

	collected := make([]float64, 0)
	for _, deploy := range deployments {
		margins, err := deploy.GetPlatformRate(ctx)
		if err != nil {
			otelTrace.Logger.WithContext(ctx).Error("获取平台费率失败", zap.Error(err), zap.String("roadUid", roadUid))
			continue
		}
		for _, margin := range margins {
			collected = append(collected, margin.ShowLabel)
		}
	}

	return slice.Unique(collected)
}
// getOrderPoolKey builds the key of the produce order pool for a road and
// face value: "<ProduceOrderPoolKey>:<roadUID>:<faceValue with 2 decimals>".
func (s *OrderPoolServiceImpl) getOrderPoolKey(roadUID string, faceValue float64) string {
	const layout = "%s:%s:%.2f"

	prefix := s.config.ProduceOrderPoolKey
	return fmt.Sprintf(layout, prefix, roadUID, faceValue)
}
// startOrderMatching runs the order-matching loop. On every tick of the
// configured OrderMinInterval it calls matchOrders; it returns when ctx is
// cancelled.
func (s *OrderPoolServiceImpl) startOrderMatching(ctx context.Context) {
	pulse := time.NewTicker(s.config.OrderMinInterval)
	defer pulse.Stop()

	for {
		// Block until either cancellation or the next tick.
		select {
		case <-ctx.Done():
			return
		case <-pulse.C:
		}

		s.matchOrders(ctx)
	}
}
// matchOrders scans every channel / road / face value combination and, for
// each customer order waiting in Redis, schedules one matchOrdersForFaceValue
// run on the service goroutine pool.
//
// While scanning it keeps a running total of waiting customer orders and
// raises the pool capacity to that total plus 10 (capped at
// MaxFaceValueConcurrency) whenever the total plus 10 exceeds the pool's
// current worker count. After each channel, if no customer order has been
// seen so far, the capacity is reset to 30.
func (s *OrderPoolServiceImpl) matchOrders(ctx context.Context) {
	// Snapshot the channel list under the read lock, as updateChannels does,
	// so the slice is never read while it is being replaced.
	s.channelsMu.RLock()
	channels := make([]card_sender.SendCardTaskEnum, len(s.channels))
	copy(channels, s.channels)
	s.channelsMu.RUnlock()

	pending := 0
	for _, channel := range channels {
		// All road UIDs of this channel.
		for _, roadUid := range s.getAllRoadUids(ctx, channel.String()) {
			// All face values of this road.
			for _, faceValue := range s.getAllFaceValues(ctx, roadUid) {
				// Check first whether any customer order is waiting.
				customerKey := fmt.Sprintf("%s:%s:%.2f", s.config.CustomerOrderPoolKey, roadUid, faceValue)
				customerLength, err := s.redisClient.LLen(ctx, customerKey)
				if err != nil {
					otelTrace.Logger.WithContext(ctx).Error("获取用户订单池长度失败",
						zap.Error(err),
						zap.String("customerKey", customerKey),
					)
					continue
				}
				if customerLength == 0 {
					continue
				}

				pending += int(customerLength)
				if pending+10 > int(s.servicePool.WorkerCount()) {
					s.servicePool.SetCap(int32(min(pending+10, s.config.MaxFaceValueConcurrency)))
				}

				// One matching attempt per waiting customer order. The loop
				// variables are per-iteration (Go 1.22+, which the
				// range-over-int below already requires), so the closure can
				// capture them directly.
				for range customerLength {
					s.servicePool.CtxGo(ctx, func() {
						s.matchOrdersForFaceValue(ctx, channel, roadUid, faceValue)
					})
				}
			}
		}
		if pending == 0 {
			s.servicePool.SetCap(30)
		}
	}
}
// matchOrdersForFaceValue tries to pair one produce order with one customer
// order for the given road and face value, and then hands the pair to the
// channel's send-card handler.
//
// Steps, in order:
//  1. Pop one produce order from the produce pool; drop it if it is older
//     than OrderInactiveTime.
//  2. Put it back if its task type is invalid, if its waiting time has not
//     elapsed, or if no customer order is waiting.
//  3. Pop one customer order, store the produce-order → local-order binding
//     in Redis and call BindPoolOrderId.
//  4. Publish an order-processed event, then call HandleSendCardTask,
//     retrying up to MaxRetryCount times on request-send failures.
//  5. On success optionally publish an order-query event; on failure call
//     BindOrderFailed with the error message.
//
// The channel argument is only used as the task's type when the customer
// task does not carry one.
func (s *OrderPoolServiceImpl) matchOrdersForFaceValue(ctx context.Context, channel card_sender.SendCardTaskEnum, roadUid string, faceValue float64) {
	ctx, span := otelTrace.CreateAsyncContext(ctx, "matchOrdersForFaceValue")
	defer span.End()

	// Keys of the customer order pool and the produce order pool.
	customerKey := fmt.Sprintf("%s:%s:%.2f", s.config.CustomerOrderPoolKey, roadUid, faceValue)
	produceKey := s.getOrderPoolKey(roadUid, faceValue)

	// NOTE: per-pool mutex locking (via s.poolLocks) used to wrap the pool
	// operations below and is currently disabled.

	produceLength, err := s.redisClient.LLen(ctx, produceKey)
	if err != nil {
		s.metrics.RecordError("redis")
		// This is the produce pool, not the customer pool.
		otelTrace.Logger.WithContext(ctx).Error("获取生产订单池长度失败", zap.Error(err))
		return
	}
	span.SetAttributes(attribute.Int64("produceLength", produceLength))
	if produceLength == 0 {
		return
	}

	// Take one produce order.
	var produceOrderItem card_sender.OrderPoolItem
	if err = s.redisClient.LPopUnmarshal(ctx, produceKey, &produceOrderItem); err != nil {
		otelTrace.Logger.WithContext(ctx).Error("获取生产订单失败", zap.Error(err))
		s.metrics.RecordError("redis")
		return
	}

	// Check whether the produce order has expired.
	if time.Since(produceOrderItem.CreateTime) > s.config.OrderInactiveTime {
		otelTrace.Logger.WithContext(ctx).Info("生产订单已过期", zap.String("produceKey", produceKey),
			zap.Any("produceOrderItem", produceOrderItem), zap.Time("createTime", produceOrderItem.CreateTime),
		)
		// Expired orders are not re-queued; they are simply dropped.
		return
	}

	sendCardTaskType := produceOrderItem.SendCardTaskType.GetSendCardTaskType()
	if sendCardTaskType == nil {
		otelTrace.Logger.WithContext(ctx).Error("无效的发卡任务类型")
		// Put the order back at the head of the produce pool.
		if err = s.redisClient.LPush(ctx, produceKey, produceOrderItem); err != nil {
			otelTrace.Logger.WithContext(ctx).Error("生产订单重新入池失败", zap.Error(err))
		}
		return
	}

	// Check the waiting time (GetWaitingTime is interpreted as seconds).
	waitingTime := sendCardTaskType.GetWaitingTime(ctx, produceOrderItem)
	if waitingTime > 0 && time.Since(produceOrderItem.CreateTime) < time.Second*time.Duration(waitingTime) {
		// Not ready yet: put the order back at the tail of the produce pool.
		if err = s.redisClient.RPush(ctx, produceKey, produceOrderItem); err != nil {
			otelTrace.Logger.WithContext(ctx).Error("添加任务到队列失败", zap.Error(err))
		}
		return
	}

	// Check the length of the customer order pool.
	customerLength, err := s.redisClient.LLen(ctx, customerKey)
	if err != nil {
		otelTrace.Logger.WithContext(ctx).Error("获取用户订单池长度失败", zap.Error(err))
		// On error, put the produce order back into the pool.
		if err = s.redisClient.LPush(ctx, produceKey, produceOrderItem); err != nil {
			otelTrace.Logger.WithContext(ctx).Error("生产订单重新入池失败", zap.Error(err))
		}
		return
	}
	if customerLength == 0 {
		// Nothing to match: put the order back at the head of the produce pool.
		if err = s.redisClient.LPush(ctx, produceKey, produceOrderItem); err != nil {
			s.metrics.RecordError("redis")
			otelTrace.Logger.WithContext(ctx).Error("生产订单重新入池失败", zap.Error(err))
		}
		return
	}

	otelTrace.Logger.WithContext(ctx).Info("生产订单等待时间", zap.String("produceKey", produceKey),
		zap.Any("produceOrderItem", produceOrderItem), zap.Time("createTime", produceOrderItem.CreateTime),
		zap.Duration("s.config.OrderWaitTime", s.config.OrderWaitTime),
	)

	// Take one customer order.
	var task card_sender.SendCardTask
	if err = s.redisClient.LPopUnmarshal(ctx, customerKey, &task); err != nil {
		otelTrace.Logger.WithContext(ctx).Error("获取用户订单失败", zap.Error(err))
		// Could not get a customer order: put the produce order back.
		if err = s.redisClient.LPush(ctx, produceKey, produceOrderItem); err != nil {
			otelTrace.Logger.WithContext(ctx).Error("生产订单重新入池失败", zap.Error(err))
		}
		return
	}
	span.SetAttributes(attribute.String("bankOrderId", task.LocalOrderID))
	span.AddEvent("acquire user order")

	if task.SendCardTaskType == "" {
		task.SendCardTaskType = channel
	}

	// Bind the produce order ID to the local (card info) order ID.
	bindKey := fmt.Sprintf("%s:%s", s.config.OrderBindKey, produceOrderItem.OrderID)
	if err = s.redisClient.Set(ctx, bindKey, task.LocalOrderID, s.config.OrderBindKeyActiveTime); err != nil {
		otelTrace.Logger.WithContext(ctx).Error("绑定订单ID和卡片信息ID失败", zap.Error(err))
		// Binding failed: put both orders back into their pools.
		if err = s.redisClient.RPush(ctx, customerKey, task); err != nil {
			otelTrace.Logger.WithContext(ctx).Error("用户订单重新入池失败", zap.Error(err))
		}
		if err = s.redisClient.RPush(ctx, produceKey, produceOrderItem); err != nil {
			otelTrace.Logger.WithContext(ctx).Error("生产订单重新入池失败", zap.Error(err))
		}
		return
	}

	if err = sendCardTaskType.BindPoolOrderId(ctx, produceOrderItem, task); err != nil {
		// NOTE(review): on this path neither the customer order nor the
		// produce order is re-queued and BindOrderFailed is not called, so
		// both are dropped here — confirm this is intended.
		otelTrace.Logger.WithContext(ctx).Error("绑定订单ID和卡片信息ID失败", zap.Error(err))
		return
	}

	// Publish the order-processed event directly; no query is needed for it.
	if err = s.eventBus.Publish(ctx, &BaseEvent{
		eventType: EventTypeOrderProcessed,
		payload:   produceOrderItem,
		timestamp: time.Now(),
	}); err != nil {
		otelTrace.Logger.WithContext(ctx).Error("发布订单处理完成事件失败", zap.Error(err))
	}

	// Handle the send-card task, with retries.
	var retryCount int
	for retryCount < s.config.MaxRetryCount {
		err = sendCardTaskType.HandleSendCardTask(ctx, produceOrderItem, task)
		if err == nil {
			// If a follow-up query is required, publish the query event.
			if task.NeedQuery {
				queryEvent := &OrderQueryEvent{
					BaseEvent: BaseEvent{
						eventType: EventTypeOrderQuery,
						timestamp: time.Now(),
					},
					RoadUid:       roadUid,
					OrderID:       produceOrderItem.OrderID,
					LocalOrderID:  task.LocalOrderID,
					Channel:       task.SendCardTaskType,
					QueryCount:    0,
					LastQueryTime: time.Now(),
					RemoteOrderID: produceOrderItem.RemoteOrderID,
				}
				if err = s.eventBus.Publish(ctx, queryEvent); err != nil {
					otelTrace.Logger.WithContext(ctx).Error("发布订单查询事件失败", zap.Error(err))
				}
			}
			return
		}

		// Only request-send failures are retried.
		if bCode, ok := berror.FromError(err); ok && bCode == httplib.SendRequestFailed {
			retryCount++
			otelTrace.Logger.WithContext(ctx).Warn("处理发卡任务失败,准备重试",
				zap.Error(err),
				zap.Int("retryCount", retryCount),
				zap.String("roadUid", roadUid),
				zap.Float64("faceValue", faceValue),
			)
			continue
		}

		if strings.Contains(err.Error(), "重新下单提交卡密") {
			// The handler asks for a fresh order: put the customer order
			// back at the head of its pool so it is matched again.
			if err = s.redisClient.LPush(ctx, customerKey, task); err != nil {
				otelTrace.Logger.WithContext(ctx).Error("用户订单重新入池失败", zap.Error(err))
			}
			return
		}

		// Any other error is final.
		break
	}

	// Report the failure. The default message is used only when no error is
	// available (e.g. the loop body never ran).
	errMsg := "绑定失败,绑定重试次数超过次数"
	if err != nil {
		errMsg = err.Error()
	}
	if err = sendCardTaskType.BindOrderFailed(ctx, produceOrderItem, task, errMsg); err != nil {
		otelTrace.Logger.WithContext(ctx).Error("绑定失败", zap.Error(err))
	}
}
// GetLocalIdByOrderId looks up the local order ID that was bound to orderId.
//
// The binding is read from Redis under "<OrderBindKey>:<orderId>". On failure
// it returns an empty string and an error that wraps the Redis error and
// names the key that was queried.
func (s *OrderPoolServiceImpl) GetLocalIdByOrderId(ctx context.Context, orderId string) (string, error) {
	bindKey := fmt.Sprintf("%s:%s", s.config.OrderBindKey, orderId)

	var localId string
	if err := s.redisClient.Get(ctx, bindKey, &localId); err != nil {
		// The previous format string ran the error text, the literal
		// "bindKey" and the key together without any separator.
		return "", fmt.Errorf("获取本地订单ID失败: %w, bindKey: %s", err, bindKey)
	}
	return localId, nil
}
// SubmitOrder creates a fresh order for the task, binds it to the task's
// local order ID and submits the card through the task type's handler.
//
// Up to three attempts are made. An attempt is repeated only when the handler
// reports an error whose message contains "重新下单" (re-order required); any
// other error aborts immediately. On the first successful attempt an
// order-query event is published when task.NeedQuery is set (a publish
// failure is only logged) and nil is returned.
//
// Errors are returned when the task type is invalid, when creating or binding
// the order fails, when the handler fails with a non-retryable error, or when
// every attempt asked for a re-order.
func (s *OrderPoolServiceImpl) SubmitOrder(ctx context.Context, task card_sender.SendCardTask) error {
	ctx, span := otelTrace.Span(ctx, "SubmitOrder", "OrderPoolServiceImpl.SubmitOrder",
		trace.WithAttributes(attribute.String("task", convertor.ToString(task))),
	)
	defer span.End()

	const maxAttempts = 3

	// Resolve the handler once and reject unknown task types up front, the
	// same way matchOrdersForFaceValue does, instead of calling methods on nil.
	sender := task.SendCardTaskType.GetSendCardTaskType()
	if sender == nil {
		return fmt.Errorf("创建订单失败: 无效的发卡任务类型")
	}

	var lastErr error
	for range maxAttempts {
		orderItem, err := sender.CreateOrder(ctx, task.RoadUid, task.CardInfo.GetFaceTypeFloat(ctx))
		if err != nil {
			return fmt.Errorf("创建订单失败: %w", err)
		}

		if err = sender.BindPoolOrderId(ctx, orderItem, task); err != nil {
			otelTrace.Logger.WithContext(ctx).Error("绑定订单ID和卡片信息ID失败", zap.Error(err))
			return fmt.Errorf("绑定订单ID和卡片信息ID失败: %w", err)
		}

		// Bind the order ID to the local (card info) order ID.
		bindKey := fmt.Sprintf("%s:%s", s.config.OrderBindKey, orderItem.OrderID)
		if err = s.redisClient.Set(ctx, bindKey, task.LocalOrderID, s.config.OrderBindKeyActiveTime); err != nil {
			return fmt.Errorf("绑定订单ID和卡片信息ID失败: %w", err)
		}

		if err = sender.HandleSendCardTask(ctx, orderItem, task); err != nil {
			if strings.Contains(err.Error(), "重新下单") {
				// The handler asks for a new order: try again from scratch.
				lastErr = err
				continue
			}
			return fmt.Errorf("提交订单失败: %w", err)
		}

		// Success. The query event is built from the order of THIS attempt;
		// it is published here because the loop-local order is not visible
		// after the loop, and returning stops the card being submitted again.
		if task.NeedQuery {
			queryEvent := &OrderQueryEvent{
				BaseEvent: BaseEvent{
					eventType: EventTypeOrderQuery,
					timestamp: time.Now(),
				},
				OrderID:       orderItem.OrderID,
				LocalOrderID:  task.LocalOrderID,
				Channel:       task.SendCardTaskType,
				RoadUid:       task.RoadUid,
				QueryCount:    0,
				LastQueryTime: time.Now(),
				RemoteOrderID: orderItem.RemoteOrderID,
			}
			if err := s.eventBus.Publish(ctx, queryEvent); err != nil {
				otelTrace.Logger.WithContext(ctx).Error("发布订单查询事件失败", zap.Error(err))
			}
		}
		return nil
	}

	// Every attempt asked for a re-order: report failure instead of success.
	return fmt.Errorf("提交订单失败: 重试%d次后仍需重新下单: %w", maxAttempts, lastErr)
}