fix(pool): 优化重试次数及订单绑定逻辑
- 将最大重试次数从3次调整为5次,提高任务处理容错能力 - 限制核弹卡发送任务循环次数从5次减少到3次,避免过度重试 - 增加用户订单重试次数计数,超过最大重试立即返回失败 - 修正redis订单数据结构变量名,确保数据一致性 - 优化绑定订单ID和卡信息ID失败后的重试和日志处理 - 支持wtr支付接口动态设置请求URL,增加灵活性 - 移除main.go中http性能分析监听,减少无用服务运行 - 修改wtr支付测试用例渠道号及接口地址,便于测试调试 - 统一日志打印规范,提升调试体验
This commit is contained in:
@@ -266,7 +266,7 @@ func (s *SendCardTaskTypeNuclear) channelOne(ctx context.Context, orderItem Orde
|
||||
time.Sleep(time.Second*30 - time.Since(orderItem.CreateTime))
|
||||
}
|
||||
|
||||
for range 5 {
|
||||
for range 3 {
|
||||
needChangeProxyId := utils.GenerateId()
|
||||
webClient := resty.New().SetTimeout(10 * time.Second).SetHeaders(map[string]string{
|
||||
"user-agent": useragent.GetUserAgentByPlatform(useragent.PlatformPhone),
|
||||
|
||||
@@ -49,12 +49,16 @@ func (s *SendCardTaskTypeWtr) CreateOrder(ctx context.Context, roadUid string, f
|
||||
"nonceStr": utils.GenerateId(),
|
||||
}
|
||||
|
||||
formData["sign"] = (&SendCardTaskTypeWtr{}).sign(ctx, formData, gojson.Json(roadInfo.Params).Get("key").Tostring())
|
||||
formData["sign"] = s.sign(ctx, formData, gojson.Json(roadInfo.Params).Get("key").Tostring())
|
||||
webClient := resty.New().EnableTrace().SetTimeout(time.Second * 5).SetRetryCount(1).
|
||||
SetFormData(formData)
|
||||
otelresty.TraceClient(webClient)
|
||||
createdUrl := "https://api.wtrpay.xyz/pay"
|
||||
if gojson.Json(roadInfo.Params).Get("createdurl").Tostring() != "" {
|
||||
createdUrl = gojson.Json(roadInfo.Params).Get("createdUrl").Tostring()
|
||||
}
|
||||
|
||||
response, err := webClient.R().SetContext(ctx).Post("https://api.wtrpay.xyz/pay")
|
||||
response, err := webClient.R().SetContext(ctx).Post(createdUrl)
|
||||
|
||||
if err != nil {
|
||||
otelTrace.Logger.WithContext(ctx).Error("下单失败", zap.Error(err))
|
||||
|
||||
@@ -16,7 +16,7 @@ func TestSendCardTaskTypeWtr_CreateOrder(t *testing.T) {
|
||||
"merId": "20250967",
|
||||
"orderId": utils.GenerateId()[:16],
|
||||
"orderAmt": "30",
|
||||
"channel": "923",
|
||||
"channel": "968",
|
||||
"desc": "这是个标题",
|
||||
"ip": utils.GenerateIpv4(),
|
||||
"notifyUrl": "https://www.showdoc.com.cn/api/notify",
|
||||
@@ -26,7 +26,7 @@ func TestSendCardTaskTypeWtr_CreateOrder(t *testing.T) {
|
||||
|
||||
formData["sign"] = (&SendCardTaskTypeUp{}).sign(t.Context(), formData, "otLhjzAKvIYOQisJgHxEdVFZaTUclmMb")
|
||||
response, err := resty.New().EnableTrace().SetTimeout(time.Second * 5).SetRetryCount(3).
|
||||
SetFormData(formData).R().Post("https://alsopay.wtrpay.xyz/pay")
|
||||
SetFormData(formData).R().Post("http://47.76.69.194:26895/api/pay")
|
||||
responseStruct := struct {
|
||||
Code int64 `json:"code"`
|
||||
Msg string `json:"msg"`
|
||||
|
||||
@@ -43,7 +43,7 @@ func DefaultConfig() *Config {
|
||||
|
||||
MaxFaceValueConcurrency: 100, // 每个面值最多20个并发
|
||||
MatchOrderTimeout: 2 * time.Minute, // 订单匹配超时时间30秒
|
||||
MaxRetryCount: 3, // 最大重试3次
|
||||
MaxRetryCount: 5, // 最大重试5次
|
||||
RetryInterval: 1 * time.Second, // 重试间隔1秒
|
||||
}
|
||||
}
|
||||
|
||||
@@ -449,8 +449,8 @@ func (s *OrderPoolServiceImpl) matchOrdersForFaceValue(ctx context.Context, chan
|
||||
)
|
||||
|
||||
// 获取用户订单
|
||||
var task card_sender.SendCardTask
|
||||
if err = s.redisClient.LPopUnmarshal(ctx, customerKey, &task); err != nil {
|
||||
var customerTask card_sender.SendCardTask
|
||||
if err = s.redisClient.LPopUnmarshal(ctx, customerKey, &customerTask); err != nil {
|
||||
otelTrace.Logger.WithContext(ctx).Error("获取用户订单失败", zap.Error(err))
|
||||
// 获取用户订单失败时,将生产订单重新放回池中
|
||||
if err = s.redisClient.LPush(ctx, produceKey, produceOrderItem); err != nil {
|
||||
@@ -459,18 +459,28 @@ func (s *OrderPoolServiceImpl) matchOrdersForFaceValue(ctx context.Context, chan
|
||||
return
|
||||
}
|
||||
|
||||
span.SetAttributes(attribute.String("bankOrderId", task.LocalOrderID))
|
||||
span.SetAttributes(attribute.String("bankOrderId", customerTask.LocalOrderID))
|
||||
span.AddEvent("acquire user order")
|
||||
if task.SendCardTaskType == "" {
|
||||
task.SendCardTaskType = channel
|
||||
if customerTask.SendCardTaskType == "" {
|
||||
customerTask.SendCardTaskType = channel
|
||||
}
|
||||
|
||||
if customerTask.DispatchCount > s.config.MaxRetryCount {
|
||||
otelTrace.Logger.WithContext(ctx).Error("用户订单已重试次数过多", zap.String("customerKey", customerKey),
|
||||
zap.Any("customerTask", customerTask), zap.Int("DispatchCount", customerTask.DispatchCount),
|
||||
)
|
||||
if err = sendCardTaskType.BindOrderFailed(ctx, produceOrderItem, customerTask, "提交失败,重试失败"); err != nil {
|
||||
otelTrace.Logger.WithContext(ctx).Error("绑定失败", zap.Error(err))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// 绑定订单ID和卡片信息ID
|
||||
bindKey := fmt.Sprintf("%s:%s", s.config.OrderBindKey, produceOrderItem.OrderID)
|
||||
if err = s.redisClient.Set(ctx, bindKey, task.LocalOrderID, s.config.OrderBindKeyActiveTime); err != nil {
|
||||
if err = s.redisClient.Set(ctx, bindKey, customerTask.LocalOrderID, s.config.OrderBindKeyActiveTime); err != nil {
|
||||
otelTrace.Logger.WithContext(ctx).Error("绑定订单ID和卡片信息ID失败", zap.Error(err))
|
||||
// 如果绑定失败,将订单重新放回池中
|
||||
if err = s.redisClient.RPush(ctx, customerKey, task); err != nil {
|
||||
if err = s.redisClient.RPush(ctx, customerKey, customerTask); err != nil {
|
||||
otelTrace.Logger.WithContext(ctx).Error("用户订单重新入池失败", zap.Error(err))
|
||||
}
|
||||
if err = s.redisClient.RPush(ctx, produceKey, produceOrderItem); err != nil {
|
||||
@@ -479,7 +489,7 @@ func (s *OrderPoolServiceImpl) matchOrdersForFaceValue(ctx context.Context, chan
|
||||
return
|
||||
}
|
||||
|
||||
if err = sendCardTaskType.BindPoolOrderId(ctx, produceOrderItem, task); err != nil {
|
||||
if err = sendCardTaskType.BindPoolOrderId(ctx, produceOrderItem, customerTask); err != nil {
|
||||
otelTrace.Logger.WithContext(ctx).Error("绑定订单ID和卡片信息ID失败", zap.Error(err))
|
||||
return
|
||||
}
|
||||
@@ -496,11 +506,11 @@ func (s *OrderPoolServiceImpl) matchOrdersForFaceValue(ctx context.Context, chan
|
||||
// 处理发卡任务,带重试机制
|
||||
var retryCount int
|
||||
for retryCount < s.config.MaxRetryCount {
|
||||
err = sendCardTaskType.HandleSendCardTask(ctx, produceOrderItem, task)
|
||||
err = sendCardTaskType.HandleSendCardTask(ctx, produceOrderItem, customerTask)
|
||||
|
||||
if err == nil {
|
||||
// 如果需要查询,发布查询事件
|
||||
if task.NeedQuery {
|
||||
if customerTask.NeedQuery {
|
||||
queryEvent := &OrderQueryEvent{
|
||||
BaseEvent: BaseEvent{
|
||||
eventType: EventTypeOrderQuery,
|
||||
@@ -508,8 +518,8 @@ func (s *OrderPoolServiceImpl) matchOrdersForFaceValue(ctx context.Context, chan
|
||||
},
|
||||
RoadUid: roadUid,
|
||||
OrderID: produceOrderItem.OrderID,
|
||||
LocalOrderID: task.LocalOrderID,
|
||||
Channel: task.SendCardTaskType,
|
||||
LocalOrderID: customerTask.LocalOrderID,
|
||||
Channel: customerTask.SendCardTaskType,
|
||||
QueryCount: 0,
|
||||
LastQueryTime: time.Now(),
|
||||
RemoteOrderID: produceOrderItem.RemoteOrderID,
|
||||
@@ -533,10 +543,11 @@ func (s *OrderPoolServiceImpl) matchOrdersForFaceValue(ctx context.Context, chan
|
||||
continue
|
||||
}
|
||||
|
||||
if strings.Contains(err.Error(), "重新下单提交卡密") {
|
||||
if strings.Contains(err.Error(), "重新下单") {
|
||||
// 如果绑定失败,将订单重新放回池中
|
||||
if err = s.redisClient.LPush(ctx, customerKey, task); err != nil {
|
||||
_ = s.unboundPoolOrderId(ctx, task.SendCardTaskType, produceOrderItem, task)
|
||||
customerTask.DispatchCount += 1
|
||||
if err = s.redisClient.LPush(ctx, customerKey, customerTask); err != nil {
|
||||
_ = s.unboundPoolOrderId(ctx, customerTask.SendCardTaskType, produceOrderItem, customerTask)
|
||||
otelTrace.Logger.WithContext(ctx).Error("用户订单重新入池失败", zap.Error(err))
|
||||
}
|
||||
return
|
||||
@@ -547,7 +558,7 @@ func (s *OrderPoolServiceImpl) matchOrdersForFaceValue(ctx context.Context, chan
|
||||
if err != nil {
|
||||
errMsg = err.Error()
|
||||
}
|
||||
if err = sendCardTaskType.BindOrderFailed(ctx, produceOrderItem, task, errMsg); err != nil {
|
||||
if err = sendCardTaskType.BindOrderFailed(ctx, produceOrderItem, customerTask, errMsg); err != nil {
|
||||
otelTrace.Logger.WithContext(ctx).Error("绑定失败", zap.Error(err))
|
||||
}
|
||||
}
|
||||
@@ -580,7 +591,7 @@ func (s *OrderPoolServiceImpl) SubmitOrder(ctx context.Context, task card_sender
|
||||
|
||||
var err error
|
||||
var orderItem card_sender.OrderPoolItem
|
||||
maxRetries := 3
|
||||
maxRetries := 5
|
||||
|
||||
for attempt := 0; attempt < maxRetries; attempt++ {
|
||||
span.AddEvent("attempt_start", trace.WithAttributes(attribute.Int("attempt", attempt+1)))
|
||||
|
||||
2
internal/service/supplier/third_party/wtr.go
vendored
2
internal/service/supplier/third_party/wtr.go
vendored
@@ -135,7 +135,7 @@ func (f *WtrImpl) PayNotify() {
|
||||
f.Ctx.WriteString("fail")
|
||||
return
|
||||
}
|
||||
otelTrace.Logger.WithContext(ctx).Info("wtr回调", zap.Any("resp", resp))
|
||||
otelTrace.Logger.WithContext(ctx).Info("回调", zap.Any("resp", resp))
|
||||
localId, err := orderPoolService.GetLocalIdByOrderId(ctx, resp.OrderId)
|
||||
var orderInfo order.OrderInfo
|
||||
if err != nil {
|
||||
|
||||
4
main.go
4
main.go
@@ -18,7 +18,6 @@ import (
|
||||
"github.com/beego/beego/v2/server/web"
|
||||
_ "github.com/go-sql-driver/mysql"
|
||||
"log"
|
||||
"net/http"
|
||||
_ "net/http/pprof"
|
||||
"time"
|
||||
)
|
||||
@@ -30,9 +29,6 @@ func main() {
|
||||
log.Printf("初始化代理池失败: %v", err)
|
||||
return
|
||||
}
|
||||
go func() {
|
||||
log.Println(http.ListenAndServe(":6060", nil))
|
||||
}()
|
||||
// 初始化 OpenTelemetry
|
||||
cleanup1, cleanup2, cleanup3 := otelTrace.InitTracer()
|
||||
defer func() {
|
||||
|
||||
Reference in New Issue
Block a user