diff --git a/.claude/settings.local.json b/.claude/settings.local.json index a4ea665..bf9a9df 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -9,9 +9,12 @@ "Bash(for:*)", "Bash(do echo:*)", "Bash(grep:*)", - "Bash(done)" + "Bash(done)", + "Bash(lsof:*)", + "Bash(go tool pprof:*)", + "Bash(curl:*)" ], "deny": [], "ask": [] } -} \ No newline at end of file +} diff --git a/internal/controllers/scan_controller.go b/internal/controllers/scan_controller.go index 95a2da4..44e8883 100644 --- a/internal/controllers/scan_controller.go +++ b/internal/controllers/scan_controller.go @@ -22,6 +22,7 @@ import ( "gateway/internal/service/supplier/third_party" "gateway/internal/utils" "go.opentelemetry.io/otel/attribute" + "runtime/debug" "strconv" "strings" "sync" @@ -39,9 +40,30 @@ import ( var ( delayPool = gopool.NewPool("delayHandler", 50, gopool.NewConfig()) submitLimiterPool = gopool.NewPool("submitLimiterPool", 50, gopool.NewConfig()) - submitPool = gopool.NewPool("submitPool", 500, gopool.NewConfig()) + submitPool = gopool.NewPool("submitPool", 100, gopool.NewConfig()) ) +func init() { + // 为 submitPool 设置 panic handler,防止 OpenTelemetry panic 导致任务中断 + submitPool.SetPanicHandler(func(ctx context.Context, v interface{}) { + otelTrace.Logger.WithContext(ctx).Error("SubmitPool panic recovered", + zap.Any("panic", v), + zap.String("stack", string(debug.Stack()))) + }) + + delayPool.SetPanicHandler(func(ctx context.Context, v interface{}) { + otelTrace.Logger.WithContext(ctx).Error("DelayPool panic recovered", + zap.Any("panic", v), + zap.String("stack", string(debug.Stack()))) + }) + + submitLimiterPool.SetPanicHandler(func(ctx context.Context, v interface{}) { + otelTrace.Logger.WithContext(ctx).Error("SubmitLimiterPool panic recovered", + zap.Any("panic", v), + zap.String("stack", string(debug.Stack()))) + }) +} + var orderSubmitLimiter sync.Map func isAllowed(orderNo string, intervalSec int64) bool { diff --git a/internal/service/supplier/third_party/pool/card_sender/nuclear.go b/internal/service/supplier/third_party/pool/card_sender/nuclear.go index 0c2da09..1328993 100644 --- a/internal/service/supplier/third_party/pool/card_sender/nuclear.go +++ b/internal/service/supplier/third_party/pool/card_sender/nuclear.go @@ -19,6 +19,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/dubonzi/otelresty" @@ -33,9 +34,12 @@ import ( type SendCardTaskTypeNuclear struct { sendCardTaskTypeSendCardTaskBase + mu sync.Mutex } func (s *SendCardTaskTypeNuclear) getRandomId(ctx context.Context) (string, string) { + s.mu.Lock() + defer s.mu.Unlock() redisClient := cache.GetRedisClient() // 检查是否已经有足够的ID在Redis中 @@ -44,16 +48,33 @@ func (s *SendCardTaskTypeNuclear) getRandomId(ctx context.Context) (string, stri otelTrace.Logger.WithContext(ctx).Error("Redis keys error", zap.Error(err)) } - if len(keys) < 20000 { - // 生成2w个随机数 - for i := 0; i < 20000; i++ { - nuclearRandomId := utils.GetMd5Lower(utils.GenerateId()) - // 生成浏览器指纹哈希 - fingerprintHash := fingerprint.GenerateRandomBrowserFingerprintHash() - err = redisClient.Set(ctx, "nuclear_random_ids:"+nuclearRandomId, fingerprintHash, time.Hour*time.Duration(rand.Uint64N(24)+1)) - if err != nil { - otelTrace.Logger.WithContext(ctx).Error("Failed to set nuclear ID", zap.Error(err)) + if len(keys) < 2000 { + // 批量生成2k个随机数,分批处理以减少Redis压力 + batchSize := 100 + totalToGenerate := 2000 + + for batch := 0; batch < totalToGenerate/batchSize; batch++ { + // 使用Pipeline批量设置 + pipe := redisClient.Client.Pipeline() + + for i := 0; i < batchSize; i++ { + nuclearRandomId := utils.GetMd5Lower(utils.GenerateId()) + fingerprintHash := fingerprint.GenerateRandomBrowserFingerprintHash() + ttl := time.Hour * time.Duration(rand.Uint64N(24)+1) + + pipe.Set(ctx, "nuclear_random_ids:"+nuclearRandomId, fingerprintHash, ttl) } + + // 执行批量操作 + _, err := pipe.Exec(ctx) + if err != nil { + otelTrace.Logger.WithContext(ctx).Error("Failed to batch set nuclear IDs", + zap.Error(err), + zap.Int("batch", batch+1)) + } + + // 短暂延迟,避免给Redis造成过大压力 + time.Sleep(5 * time.Millisecond) } // 重新获取keys keys, err = redisClient.Client.Keys(ctx, "nuclear_random_ids:*").Result() @@ -189,7 +210,7 @@ func (s *SendCardTaskTypeNuclear) CreateOrder(ctx context.Context, roadUid strin func (s *SendCardTaskTypeNuclear) channelOne(ctx context.Context, orderItem OrderPoolItem, task SendCardTask) error { ctx, span := otelTrace.Span(ctx, "SendCardTaskTypeNuclear", "SendCardTaskTypeNuclear.channelOne") - defer span.End() + defer otelTrace.SafeEndSpan(span) queryOrderInfo, err2 := url.Parse(orderItem.PayURL) if err2 != nil { @@ -231,13 +252,18 @@ func (s *SendCardTaskTypeNuclear) channelOne(ctx context.Context, orderItem Orde proxy, err3 := utils.GetProxy(ctx, needChangeProxyId, SendCardTaskTypeEnumNuclear.String()) if err3 != nil { otelTrace.Logger.WithContext(ctx).Error("获取代理失败", zap.Error(err3)) - return err3 + return nil } c.SetProxy(proxy) return nil }) otelresty.TraceClient(webClient) - _, _ = webClient.R().Get(orderItem.PayURL) + + // 检查是否能成功访问,如果失败则继续下一次重试 + if _, err = webClient.R().Get(orderItem.PayURL); err != nil { + otelTrace.Logger.WithContext(ctx).Warn("访问支付链接失败", zap.Error(err)) + continue + } //添加计数器 for range 5 { @@ -351,18 +377,24 @@ func (s *SendCardTaskTypeNuclear) channelOne(ctx context.Context, orderItem Orde } func (s *SendCardTaskTypeNuclear) channelTwo(ctx context.Context, orderItem OrderPoolItem, task SendCardTask) error { + // 先尝试获取代理 + proxy, err := utils.GetProxy(ctx, utils.GenerateId(), string(SendCardTaskTypeEnumNuclear+"two")) + if err != nil { + otelTrace.Logger.WithContext(ctx).Warn("获取代理失败,将不使用代理", zap.Error(err)) + } + webClient := resty.New().SetTimeout(10 * time.Second).SetHeaders(map[string]string{ "origin": "http://yosyt.hy32.top", "user-agent": useragent.GetUserAgentByPlatform(useragent.PlatformPhone), "referer": orderItem.PayURL, "x-requested-with": "XMLHttpRequest", - }).OnBeforeRequest(func(client *resty.Client, request *resty.Request) error { - proxy, _ := utils.GetProxy(ctx, utils.GenerateId(), string(SendCardTaskTypeEnumNuclear+"two")) - if proxy != "" { - client.SetProxy(proxy) - } - return nil }) + + // 如果获取到了代理,则使用代理 + if proxy != "" { + webClient.SetProxy(proxy) + } + otelresty.TraceClient(webClient) schemas := strings.Split(orderItem.PayURL, "/") orderId := schemas[len(schemas)-1] @@ -395,18 +427,24 @@ func (s *SendCardTaskTypeNuclear) channelTwo(ctx context.Context, orderItem Orde } func (s *SendCardTaskTypeNuclear) channelThree(ctx context.Context, orderItem OrderPoolItem, task SendCardTask) error { + // 先尝试获取代理 + proxy, err := utils.GetProxy(ctx, utils.GenerateId(), string(SendCardTaskTypeEnumNuclear+"three")) + if err != nil { + otelTrace.Logger.WithContext(ctx).Warn("获取代理失败,将不使用代理", zap.Error(err)) + } + webClient := resty.New().SetTimeout(10 * time.Second).SetHeaders(map[string]string{ "origin": "http://223.4.250.106:22568", "user-agent": useragent.GetUserAgentByPlatform(useragent.PlatformPhone), "referer": orderItem.PayURL, "x-requested-with": "XMLHttpRequest", - }).OnBeforeRequest(func(client *resty.Client, request *resty.Request) error { - proxy, _ := utils.GetProxy(ctx, utils.GenerateId(), string(SendCardTaskTypeEnumNuclear+"three")) - if proxy != "" { - client.SetProxy(proxy) - } - return nil }) + + // 如果获取到了代理,则使用代理 + if proxy != "" { + webClient.SetProxy(proxy) + } + otelresty.TraceClient(webClient) queryUrl, err := url.Parse(orderItem.PayURL) if err != nil { diff --git a/internal/service/supplier/third_party/pool/service.go b/internal/service/supplier/third_party/pool/service.go index 4132d12..cb03f3d 100644 --- a/internal/service/supplier/third_party/pool/service.go +++ b/internal/service/supplier/third_party/pool/service.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/duke-git/lancet/v2/convertor" "github.com/duke-git/lancet/v2/pointer" + "runtime/debug" "strings" "sync" "time" @@ -69,6 +70,19 @@ func NewOrderPoolService(config *Config, redisClient *cache.RedisClient) OrderPo servicePool: gopool.NewPool(utils.GenerateId(), 30, gopool.NewConfig()), } + // 设置 panic handler + service.taskPool.SetPanicHandler(func(ctx context.Context, v interface{}) { + otelTrace.Logger.WithContext(ctx).Error("OrderPool taskPool panic recovered", + zap.Any("panic", v), + zap.String("stack", string(debug.Stack()))) + }) + + service.servicePool.SetPanicHandler(func(ctx context.Context, v interface{}) { + otelTrace.Logger.WithContext(ctx).Error("OrderPool servicePool panic recovered", + zap.Any("panic", v), + zap.String("stack", string(debug.Stack()))) + }) + // 注册事件处理器 service.registerEventHandlers(context.Background()) @@ -557,54 +571,129 @@ func (s *OrderPoolServiceImpl) unboundPoolOrderId(ctx context.Context, taskEnum func (s *OrderPoolServiceImpl) SubmitOrder(ctx context.Context, task card_sender.SendCardTask) error { ctx, span := otelTrace.Span(ctx, "SubmitOrder", "OrderPoolServiceImpl.SubmitOrder", - trace.WithAttributes(attribute.String("task", convertor.ToString(task))), + trace.WithAttributes( + attribute.String("task", convertor.ToString(task)), + attribute.String("localOrderId", task.LocalOrderID), + ), ) defer span.End() var err error var orderItem card_sender.OrderPoolItem + maxRetries := 3 - for range 3 { + for attempt := 0; attempt < maxRetries; attempt++ { + span.AddEvent("attempt_start", trace.WithAttributes(attribute.Int("attempt", attempt+1))) + + // 清空上一次的orderItem,避免残留数据影响 + orderItem = card_sender.OrderPoolItem{} + + // 1. 创建订单 span.AddEvent("create order") orderItem, err = task.SendCardTaskType.GetSendCardTaskType(). CreateOrder(ctx, task.RoadUid, task.CardInfo.GetFaceTypeFloat(ctx)) if err != nil { span.AddEvent("create order failed") + otelTrace.Logger.WithContext(ctx).Error("创建订单失败", + zap.Error(err), + zap.Int("attempt", attempt+1), + zap.String("localOrderId", task.LocalOrderID)) return fmt.Errorf("创建订单失败: %v", err) } + + // 验证订单创建成功 + if orderItem.OrderID == "" { + span.AddEvent("invalid order created") + return fmt.Errorf("创建的订单ID为空") + } + + span.SetAttributes(attribute.String("orderId", orderItem.OrderID)) + + // 2. 绑定订单到数据库 span.AddEvent("bind order") if err = task.SendCardTaskType.GetSendCardTaskType().BindPoolOrderId(ctx, orderItem, task); err != nil { - otelTrace.Logger.WithContext(ctx).Error("绑定订单ID和卡片信息ID失败", zap.Error(err)) + span.AddEvent("bind_order_failed") + otelTrace.Logger.WithContext(ctx).Error("绑定订单ID和卡片信息ID失败", + zap.Error(err), + zap.String("orderId", orderItem.OrderID), + zap.Int("attempt", attempt+1)) + + // 清理已创建的订单资源 + cleanupErr := s.cleanupCreatedOrder(ctx, task, orderItem, "bind_failed") + if cleanupErr != nil { + otelTrace.Logger.WithContext(ctx).Error("清理创建的订单资源失败", + zap.Error(cleanupErr), + zap.String("orderId", orderItem.OrderID)) + } return fmt.Errorf("绑定订单ID和卡片信息ID失败: %v", err) } - // 绑定订单ID和卡片信息ID + // 3. 绑定到Redis bindKey := fmt.Sprintf("%s:%s", s.config.OrderBindKey, orderItem.OrderID) if err = s.redisClient.Set(ctx, bindKey, task.LocalOrderID, s.config.OrderBindKeyActiveTime); err != nil { - span.AddEvent("绑定订单ID和卡片信息ID失败") - return fmt.Errorf("绑定订单ID和卡片信息ID失败: %v", err) + span.AddEvent("redis_bind_failed") + otelTrace.Logger.WithContext(ctx).Error("Redis绑定订单失败", + zap.Error(err), + zap.String("orderId", orderItem.OrderID), + zap.String("bindKey", bindKey), + zap.Int("attempt", attempt+1)) + + // 回滚数据库绑定操作 + cleanupErr := s.unboundPoolOrderId(ctx, task.SendCardTaskType, orderItem, task) + if cleanupErr != nil { + otelTrace.Logger.WithContext(ctx).Error("回滚数据库绑定失败", + zap.Error(cleanupErr), + zap.String("orderId", orderItem.OrderID)) + } + return fmt.Errorf("Redis绑定订单ID和卡片信息ID失败: %v", err) } + + // 4. 处理订单 span.AddEvent("handle order") err = task.SendCardTaskType.GetSendCardTaskType().HandleSendCardTask(ctx, orderItem, task) if err != nil { - span.AddEvent("handle order failed") - if strings.Contains(err.Error(), "重新下单") { - _ = s.unboundPoolOrderId(ctx, task.SendCardTaskType, orderItem, task) + span.AddEvent("handle_order_failed") + otelTrace.Logger.WithContext(ctx).Warn("订单处理失败", + zap.Error(err), + zap.String("orderId", orderItem.OrderID), + zap.String("remoteOrderId", orderItem.RemoteOrderID), + zap.Int("attempt", attempt+1)) + + // 清理失败订单的所有资源(无论什么错误都清理,避免资源泄漏) + cleanupErr := s.cleanupFailedOrder(ctx, task, orderItem, "handle_failed") + if cleanupErr != nil { + otelTrace.Logger.WithContext(ctx).Error("清理失败订单资源时出错", + zap.Error(cleanupErr), + zap.String("orderId", orderItem.OrderID)) + } + + // 只有标记为"重新下单"的错误才重试 + if strings.Contains(err.Error(), "重新下单") && attempt < maxRetries-1 { + otelTrace.Logger.WithContext(ctx).Info("订单需要重新下单,准备重试", + zap.String("orderId", orderItem.OrderID), + zap.Int("attempt", attempt+1)) continue } return fmt.Errorf("提交订单失败: %v", err) } + + // 成功处理,退出重试循环 + span.AddEvent("order_success") + otelTrace.Logger.WithContext(ctx).Info("订单提交成功", + zap.String("orderId", orderItem.OrderID), + zap.String("remoteOrderId", orderItem.RemoteOrderID), + zap.Int("finalAttempt", attempt+1)) break } - if err != nil { - return fmt.Errorf("提交订单失败: %v", err) - } + // 最终状态检查 if pointer.IsNil(orderItem) || orderItem.OrderID == "" { - return fmt.Errorf("订单创建失败") + span.AddEvent("final_validation_failed") + return fmt.Errorf("订单处理失败") } + // 5. 处理查询需求 if task.NeedQuery { queryEvent := &OrderQueryEvent{ BaseEvent: BaseEvent{ @@ -620,8 +709,32 @@ func (s *OrderPoolServiceImpl) SubmitOrder(ctx context.Context, task card_sender RemoteOrderID: orderItem.RemoteOrderID, } if err := s.eventBus.Publish(ctx, queryEvent); err != nil { - otelTrace.Logger.WithContext(ctx).Error("发布订单查询事件失败", zap.Error(err)) + otelTrace.Logger.WithContext(ctx).Error("发布订单查询事件失败", + zap.Error(err), + zap.String("orderId", orderItem.OrderID)) } } + return nil } + +// cleanupCreatedOrder 清理刚创建但绑定失败的订单 +func (s *OrderPoolServiceImpl) cleanupCreatedOrder(ctx context.Context, task card_sender.SendCardTask, orderItem card_sender.OrderPoolItem, reason string) error { + otelTrace.Logger.WithContext(ctx).Info("清理创建的订单", + zap.String("orderId", orderItem.OrderID), + zap.String("reason", reason)) + + // 清理数据库绑定 + _ = task.SendCardTaskType.GetSendCardTaskType().UnBindPoolOrderId(ctx, orderItem, task) + return nil +} + +// cleanupFailedOrder 清理处理失败的订单所有资源 +func (s *OrderPoolServiceImpl) cleanupFailedOrder(ctx context.Context, task card_sender.SendCardTask, orderItem card_sender.OrderPoolItem, reason string) error { + otelTrace.Logger.WithContext(ctx).Info("清理失败的订单", + zap.String("orderId", orderItem.OrderID), + zap.String("reason", reason)) + + // 使用现有的清理方法 + return s.unboundPoolOrderId(ctx, task.SendCardTaskType, orderItem, task) +} diff --git a/internal/utils/proxy_pool.go b/internal/utils/proxy_pool.go index 8077f7a..6757be0 100644 --- a/internal/utils/proxy_pool.go +++ b/internal/utils/proxy_pool.go @@ -407,10 +407,16 @@ func (p *DefaultProxyStrategy) checkProxyAvailable(ctx context.Context, proxyIP // checkProxyAvailable 检查代理IP是否可用 func (p *OrderBasedProxyStrategy) checkProxyAvailable(ctx context.Context, proxyIP string) error { + // 为代理检测设置独立的短超时,避免阻塞其他操作 + ctxWithTimeout, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + client := resty.New() otelresty.TraceClient(client) + client.SetTimeout(3 * time.Second) // 设置客户端超时 client.SetProxy(fmt.Sprintf("socks5://%s:%s@%s", p.authKey, p.authPwd, proxyIP)) - response, err := client.R().SetContext(ctx).Get("https://www.qq.com") + + response, err := client.R().SetContext(ctxWithTimeout).Get("https://www.qq.com") if err != nil { return fmt.Errorf("代理连接测试失败: %v", err) } @@ -437,17 +443,33 @@ func (p *OrderBasedProxyStrategy) startCleanupRoutine() { // cleanupUnusedProxies 清理未使用的代理 func (p *OrderBasedProxyStrategy) cleanupUnusedProxies() { - p.mu.Lock() - defer p.mu.Unlock() - now := time.Now() - maps.DeleteFunc(p.proxies, func(s string, info *ProxyInfo) bool { - return info.expireAt.Before(now) - }) - slices.DeleteFunc(p.totalProxies, func(info *ProxyInfo) bool { - return info.expireAt.Before(now) - }) + // 分两阶段清理,减少锁持有时间 + + // 第一阶段:收集需要删除的键(只持有读锁) + var keysToDelete []string + p.mu.RLock() + for key, info := range p.proxies { + if info.expireAt.Before(now) { + keysToDelete = append(keysToDelete, key) + } + } + p.mu.RUnlock() + + // 第二阶段:快速删除(只短暂持有写锁) + if len(keysToDelete) > 0 { + p.mu.Lock() + for _, key := range keysToDelete { + delete(p.proxies, key) + } + + // 同时清理 totalProxies 中的过期代理 + p.totalProxies = slices.DeleteFunc(p.totalProxies, func(info *ProxyInfo) bool { + return info.expireAt.Before(now) + }) + p.mu.Unlock() + } } // Stop 停止清理协程 @@ -481,17 +503,52 @@ func GetProxy(ctx context.Context, orderId string, channel string) (string, erro attribute.String("channel", channel), )) defer span.End() - proxyConfig := config.GetProxy() - if proxyConfig == "dm" { - return DMProxyStrategyInstance.GetProxy(ctx, ProxyRequest{ - OrderNo: orderId, - Channel: channel, - OrderPerIP: 1, - }) + + // 为代理获取添加超时机制,防止在 OnBeforeRequest 中长时间阻塞 + ctxWithTimeout, cancel := context.WithTimeout(ctx, 8*time.Second) + defer cancel() + + done := make(chan struct { + proxy string + err error + }, 1) + + go func() { + proxyConfig := config.GetProxy() + var proxy string + var err error + + if proxyConfig == "dm" { + proxy, err = DMProxyStrategyInstance.GetProxy(ctxWithTimeout, ProxyRequest{ + OrderNo: orderId, + Channel: channel, + OrderPerIP: 1, + }) + } else { + proxy, err = OrderBasedProxyStrategyInstance.GetProxy(ctxWithTimeout, ProxyRequest{ + OrderNo: orderId, + Channel: channel, + OrderPerIP: 1, + }) + } + + select { + case done <- struct { + proxy string + err error + }{proxy, err}: + case <-ctxWithTimeout.Done(): + } + }() + + select { + case result := <-done: + return result.proxy, result.err + case <-ctxWithTimeout.Done(): + otelTrace.Logger.WithContext(ctx).Warn("获取代理超时", + zap.String("orderId", orderId), + zap.String("channel", channel), + zap.Error(ctxWithTimeout.Err())) + return "", fmt.Errorf("获取代理超时: %v", ctxWithTimeout.Err()) } - return OrderBasedProxyStrategyInstance.GetProxy(ctx, ProxyRequest{ - OrderNo: orderId, - Channel: channel, - OrderPerIP: 1, - }) }