diff --git a/internal/service/supplier/third_party/pool/card_sender/nuclear.go b/internal/service/supplier/third_party/pool/card_sender/nuclear.go index 532e07f..f6cde2d 100644 --- a/internal/service/supplier/third_party/pool/card_sender/nuclear.go +++ b/internal/service/supplier/third_party/pool/card_sender/nuclear.go @@ -266,7 +266,7 @@ func (s *SendCardTaskTypeNuclear) channelOne(ctx context.Context, orderItem Orde time.Sleep(time.Second*30 - time.Since(orderItem.CreateTime)) } - for range 5 { + for range 3 { needChangeProxyId := utils.GenerateId() webClient := resty.New().SetTimeout(10 * time.Second).SetHeaders(map[string]string{ "user-agent": useragent.GetUserAgentByPlatform(useragent.PlatformPhone), diff --git a/internal/service/supplier/third_party/pool/card_sender/wtr.go b/internal/service/supplier/third_party/pool/card_sender/wtr.go index b7cc081..6c1be72 100644 --- a/internal/service/supplier/third_party/pool/card_sender/wtr.go +++ b/internal/service/supplier/third_party/pool/card_sender/wtr.go @@ -49,12 +49,16 @@ func (s *SendCardTaskTypeWtr) CreateOrder(ctx context.Context, roadUid string, f "nonceStr": utils.GenerateId(), } - formData["sign"] = (&SendCardTaskTypeWtr{}).sign(ctx, formData, gojson.Json(roadInfo.Params).Get("key").Tostring()) + formData["sign"] = s.sign(ctx, formData, gojson.Json(roadInfo.Params).Get("key").Tostring()) webClient := resty.New().EnableTrace().SetTimeout(time.Second * 5).SetRetryCount(1). SetFormData(formData) otelresty.TraceClient(webClient) + createdUrl := "https://api.wtrpay.xyz/pay" + if gojson.Json(roadInfo.Params).Get("createdurl").Tostring() != "" { + createdUrl = gojson.Json(roadInfo.Params).Get("createdUrl").Tostring() + } - response, err := webClient.R().SetContext(ctx).Post("https://api.wtrpay.xyz/pay") + response, err := webClient.R().SetContext(ctx).Post(createdUrl) if err != nil { otelTrace.Logger.WithContext(ctx).Error("下单失败", zap.Error(err)) diff --git a/internal/service/supplier/third_party/pool/card_sender/wtr_test.go b/internal/service/supplier/third_party/pool/card_sender/wtr_test.go index da98a4a..99277d6 100644 --- a/internal/service/supplier/third_party/pool/card_sender/wtr_test.go +++ b/internal/service/supplier/third_party/pool/card_sender/wtr_test.go @@ -16,7 +16,7 @@ func TestSendCardTaskTypeWtr_CreateOrder(t *testing.T) { "merId": "20250967", "orderId": utils.GenerateId()[:16], "orderAmt": "30", - "channel": "923", + "channel": "968", "desc": "这是个标题", "ip": utils.GenerateIpv4(), "notifyUrl": "https://www.showdoc.com.cn/api/notify", @@ -26,7 +26,7 @@ func TestSendCardTaskTypeWtr_CreateOrder(t *testing.T) { formData["sign"] = (&SendCardTaskTypeUp{}).sign(t.Context(), formData, "otLhjzAKvIYOQisJgHxEdVFZaTUclmMb") response, err := resty.New().EnableTrace().SetTimeout(time.Second * 5).SetRetryCount(3). - SetFormData(formData).R().Post("https://alsopay.wtrpay.xyz/pay") + SetFormData(formData).R().Post("http://47.76.69.194:26895/api/pay") responseStruct := struct { Code int64 `json:"code"` Msg string `json:"msg"` diff --git a/internal/service/supplier/third_party/pool/config.go b/internal/service/supplier/third_party/pool/config.go index 7ba4b73..6f7e5ee 100644 --- a/internal/service/supplier/third_party/pool/config.go +++ b/internal/service/supplier/third_party/pool/config.go @@ -43,7 +43,7 @@ func DefaultConfig() *Config { MaxFaceValueConcurrency: 100, // 每个面值最多20个并发 MatchOrderTimeout: 2 * time.Minute, // 订单匹配超时时间30秒 - MaxRetryCount: 3, // 最大重试3次 + MaxRetryCount: 5, // 最大重试5次 RetryInterval: 1 * time.Second, // 重试间隔1秒 } } diff --git a/internal/service/supplier/third_party/pool/service.go b/internal/service/supplier/third_party/pool/service.go index cb03f3d..f423ec2 100644 --- a/internal/service/supplier/third_party/pool/service.go +++ b/internal/service/supplier/third_party/pool/service.go @@ -449,8 +449,8 @@ func (s *OrderPoolServiceImpl) matchOrdersForFaceValue(ctx context.Context, chan ) // 获取用户订单 - var task card_sender.SendCardTask - if err = s.redisClient.LPopUnmarshal(ctx, customerKey, &task); err != nil { + var customerTask card_sender.SendCardTask + if err = s.redisClient.LPopUnmarshal(ctx, customerKey, &customerTask); err != nil { otelTrace.Logger.WithContext(ctx).Error("获取用户订单失败", zap.Error(err)) // 获取用户订单失败时,将生产订单重新放回池中 if err = s.redisClient.LPush(ctx, produceKey, produceOrderItem); err != nil { @@ -459,18 +459,28 @@ func (s *OrderPoolServiceImpl) matchOrdersForFaceValue(ctx context.Context, chan return } - span.SetAttributes(attribute.String("bankOrderId", task.LocalOrderID)) + span.SetAttributes(attribute.String("bankOrderId", customerTask.LocalOrderID)) span.AddEvent("acquire user order") - if task.SendCardTaskType == "" { - task.SendCardTaskType = channel + if customerTask.SendCardTaskType == "" { + customerTask.SendCardTaskType = channel + } + + if customerTask.DispatchCount > s.config.MaxRetryCount { + otelTrace.Logger.WithContext(ctx).Error("用户订单已重试次数过多", zap.String("customerKey", customerKey), + zap.Any("customerTask", customerTask), zap.Int("DispatchCount", customerTask.DispatchCount), + ) + if err = sendCardTaskType.BindOrderFailed(ctx, produceOrderItem, customerTask, "提交失败,重试失败"); err != nil { + otelTrace.Logger.WithContext(ctx).Error("绑定失败", zap.Error(err)) + } + return } // 绑定订单ID和卡片信息ID bindKey := fmt.Sprintf("%s:%s", s.config.OrderBindKey, produceOrderItem.OrderID) - if err = s.redisClient.Set(ctx, bindKey, task.LocalOrderID, s.config.OrderBindKeyActiveTime); err != nil { + if err = s.redisClient.Set(ctx, bindKey, customerTask.LocalOrderID, s.config.OrderBindKeyActiveTime); err != nil { otelTrace.Logger.WithContext(ctx).Error("绑定订单ID和卡片信息ID失败", zap.Error(err)) // 如果绑定失败,将订单重新放回池中 - if err = s.redisClient.RPush(ctx, customerKey, task); err != nil { + if err = s.redisClient.RPush(ctx, customerKey, customerTask); err != nil { otelTrace.Logger.WithContext(ctx).Error("用户订单重新入池失败", zap.Error(err)) } if err = s.redisClient.RPush(ctx, produceKey, produceOrderItem); err != nil { @@ -479,7 +489,7 @@ func (s *OrderPoolServiceImpl) matchOrdersForFaceValue(ctx context.Context, chan return } - if err = sendCardTaskType.BindPoolOrderId(ctx, produceOrderItem, task); err != nil { + if err = sendCardTaskType.BindPoolOrderId(ctx, produceOrderItem, customerTask); err != nil { otelTrace.Logger.WithContext(ctx).Error("绑定订单ID和卡片信息ID失败", zap.Error(err)) return } @@ -496,11 +506,11 @@ func (s *OrderPoolServiceImpl) matchOrdersForFaceValue(ctx context.Context, chan // 处理发卡任务,带重试机制 var retryCount int for retryCount < s.config.MaxRetryCount { - err = sendCardTaskType.HandleSendCardTask(ctx, produceOrderItem, task) + err = sendCardTaskType.HandleSendCardTask(ctx, produceOrderItem, customerTask) if err == nil { // 如果需要查询,发布查询事件 - if task.NeedQuery { + if customerTask.NeedQuery { queryEvent := &OrderQueryEvent{ BaseEvent: BaseEvent{ eventType: EventTypeOrderQuery, @@ -508,8 +518,8 @@ func (s *OrderPoolServiceImpl) matchOrdersForFaceValue(ctx context.Context, chan }, RoadUid: roadUid, OrderID: produceOrderItem.OrderID, - LocalOrderID: task.LocalOrderID, - Channel: task.SendCardTaskType, + LocalOrderID: customerTask.LocalOrderID, + Channel: customerTask.SendCardTaskType, QueryCount: 0, LastQueryTime: time.Now(), RemoteOrderID: produceOrderItem.RemoteOrderID, @@ -533,10 +543,11 @@ func (s *OrderPoolServiceImpl) matchOrdersForFaceValue(ctx context.Context, chan continue } - if strings.Contains(err.Error(), "重新下单提交卡密") { + if strings.Contains(err.Error(), "重新下单") { // 如果绑定失败,将订单重新放回池中 - if err = s.redisClient.LPush(ctx, customerKey, task); err != nil { - _ = s.unboundPoolOrderId(ctx, task.SendCardTaskType, produceOrderItem, task) + customerTask.DispatchCount += 1 + if err = s.redisClient.LPush(ctx, customerKey, customerTask); err != nil { + _ = s.unboundPoolOrderId(ctx, customerTask.SendCardTaskType, produceOrderItem, customerTask) otelTrace.Logger.WithContext(ctx).Error("用户订单重新入池失败", zap.Error(err)) } return @@ -547,7 +558,7 @@ func (s *OrderPoolServiceImpl) matchOrdersForFaceValue(ctx context.Context, chan if err != nil { errMsg = err.Error() } - if err = sendCardTaskType.BindOrderFailed(ctx, produceOrderItem, task, errMsg); err != nil { + if err = sendCardTaskType.BindOrderFailed(ctx, produceOrderItem, customerTask, errMsg); err != nil { otelTrace.Logger.WithContext(ctx).Error("绑定失败", zap.Error(err)) } } @@ -580,7 +591,7 @@ func (s *OrderPoolServiceImpl) SubmitOrder(ctx context.Context, task card_sender var err error var orderItem card_sender.OrderPoolItem - maxRetries := 3 + maxRetries := 5 for attempt := 0; attempt < maxRetries; attempt++ { span.AddEvent("attempt_start", trace.WithAttributes(attribute.Int("attempt", attempt+1))) diff --git a/internal/service/supplier/third_party/wtr.go b/internal/service/supplier/third_party/wtr.go index 63178bb..a63de07 100644 --- a/internal/service/supplier/third_party/wtr.go +++ b/internal/service/supplier/third_party/wtr.go @@ -135,7 +135,7 @@ func (f *WtrImpl) PayNotify() { f.Ctx.WriteString("fail") return } - otelTrace.Logger.WithContext(ctx).Info("wtr回调", zap.Any("resp", resp)) + otelTrace.Logger.WithContext(ctx).Info("回调", zap.Any("resp", resp)) localId, err := orderPoolService.GetLocalIdByOrderId(ctx, resp.OrderId) var orderInfo order.OrderInfo if err != nil { diff --git a/main.go b/main.go index b77d463..f107328 100644 --- a/main.go +++ b/main.go @@ -18,7 +18,6 @@ import ( "github.com/beego/beego/v2/server/web" _ "github.com/go-sql-driver/mysql" "log" - "net/http" _ "net/http/pprof" "time" ) @@ -30,9 +29,6 @@ func main() { log.Printf("初始化代理池失败: %v", err) return } - go func() { - log.Println(http.ListenAndServe(":6060", nil)) - }() // 初始化 OpenTelemetry cleanup1, cleanup2, cleanup3 := otelTrace.InitTracer() defer func() {