fix(otelTrace): 优化 OpenTelemetry 初始化与配置

- 将默认超时时间从5秒调整到10秒,提升稳定性
- 支持通过环境变量配置 OTEL_COLLECTOR_URL 和 OTEL_ENABLED,增强灵活性
- 添加初始化时的日志记录,便于跟踪 OTEL 状态
- 增加带超时的 context 用于初始化和关闭,防止阻塞
- 修改 main.go 使用基础 context 替代全局 InitCtx,提高可控性
- 优化代理请求逻辑,避免重复生成代理 ID,并改为请求前设置代理
- 在监控导出器健康的 goroutine 中加入 panic 保护,防止异常崩溃
- 测试代码中替换 InitCtx 为 context.Background(),保证日志上下文一致性
This commit is contained in:
danial
2025-12-14 17:58:54 +08:00
parent 2d332154c8
commit 54b49a4f06
6 changed files with 125 additions and 57 deletions

View File

@@ -1,6 +1,7 @@
package dto
import (
"context"
"crypto/md5"
"encoding/hex"
"gateway/internal/otelTrace"
@@ -15,6 +16,7 @@ import (
)
func TestParams_Encrypt(t *testing.T) {
ctx := context.Background()
orderParams := Params{
GeneratedTime: 1746202250,
Duration: 24,
@@ -24,13 +26,14 @@ func TestParams_Encrypt(t *testing.T) {
ShowMMValue: 100.00,
NotifyUrl: "https://localhost:44311/api/services/app/BPlatCallback/Callback_DMMWuDiBaoBao",
}
otelTrace.Logger.WithContext(otelTrace.InitCtx).Info(orderParams.Encrypt())
otelTrace.Logger.WithContext(ctx).Info(orderParams.Encrypt())
p, _ := formatter.Pretty(orderParams)
otelTrace.Logger.WithContext(otelTrace.InitCtx).Info(p)
otelTrace.Logger.WithContext(ctx).Info(p)
main()
}
func main() {
ctx := context.Background()
//实际获取的appKey值
appKey := "1f2811dcd32b2c5c"
//实际获取的appSecret值
@@ -45,7 +48,7 @@ func main() {
params["amount"] = "100"
result, _ := formatter.Pretty(params)
otelTrace.Logger.WithContext(otelTrace.InitCtx).Info(result)
otelTrace.Logger.WithContext(ctx).Info(result)
strArr := SortMap(params)
signStr := ""
@@ -59,10 +62,10 @@ func main() {
}
signStr += appSecret
h := md5.New()
otelTrace.Logger.WithContext(otelTrace.InitCtx).Info(signStr)
otelTrace.Logger.WithContext(ctx).Info(signStr)
h.Write([]byte(signStr))
result = hex.EncodeToString(h.Sum(nil))
otelTrace.Logger.WithContext(otelTrace.InitCtx).Info(result)
otelTrace.Logger.WithContext(ctx).Info(result)
}
// SortMap 对map的key值进行排序

View File

@@ -1,21 +1,15 @@
package otelTrace
import (
"context"
"time"
"github.com/beego/beego/v2/core/config/env"
)
// InitCtx 初始化ctx
var (
InitCtx = context.Background()
)
// 生产环境配置常量 - 针对高负载线上场景优化
const (
// DefaultTimeout 网络配置 - 针对生产环境网络抖动优化
DefaultTimeout = 5 * time.Second // 推荐的超时时间,平衡响应性和稳定性
DefaultTimeout = 10 * time.Second // 推荐的超时时间,平衡响应性和稳定性从5秒增加到10秒
InitialRetryInterval = 1 * time.Second // 初始重试间隔,快速恢复
MaxRetryInterval = 10 * time.Second // 最大重试间隔,避免频繁重试
MaxRetryElapsedTime = 30 * time.Second // 最大重试总时间,防止长时间阻塞
@@ -26,8 +20,8 @@ const (
MaxQueueSize = 2048 // 最大队列大小,防止内存溢出
// DefaultSamplingRatio 采样配置 - 生产环境资源控制
DefaultSamplingRatio = 1 // 默认采样率50%,平衡观测性和性能
HighLoadSamplingRatio = 1 // 高负载时采样率10%,保护系统稳定性
DefaultSamplingRatio = 1 // 默认采样率100%(开发环境)
HighLoadSamplingRatio = 1 // 高负载时采样率100%(可根据需要调整)
)
// CircuitBreakerState 熔断器状态
@@ -41,8 +35,9 @@ const (
var (
serviceName = "网关服务——" + env.Get("serverName", "")
collectorURL = "38.38.251.113:31547"
Logger CustomLogger // 添加全局 logger
collectorURL = env.Get("OTEL_COLLECTOR_URL", "38.38.251.113:31547") // 支持从环境变量配置
otelEnabled = env.Get("OTEL_ENABLED", "true") == "true" // 支持禁用 OTEL
Logger CustomLogger // 添加全局 logger
// 生产环境监控指标
exportFailures int32 // 导出失败计数

View File

@@ -2,7 +2,6 @@ package otelTrace
import (
"context"
"log/slog"
"os"
"time"
@@ -26,9 +25,28 @@ import (
)
func InitTracer() (func(context.Context) error, func(context.Context) error, func(context.Context) error) {
// 检查是否启用 OpenTelemetry
if !otelEnabled {
Logger.logger.Info("OpenTelemetry 已禁用",
zap.String("env_var", "OTEL_ENABLED=false"),
)
return func(ctx context.Context) error { return nil },
func(ctx context.Context) error { return nil },
func(ctx context.Context) error { return nil }
}
// 创建带超时的初始化 context
initCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
Logger.logger.Info("开始初始化 OpenTelemetry",
zap.String("collector_url", collectorURL),
zap.String("service_name", serviceName),
)
// 生产环境优化的Trace导出器配置
traceExporter, err := otlptrace.New(
InitCtx,
initCtx,
otlptracegrpc.NewClient(
otlptracegrpc.WithInsecure(),
otlptracegrpc.WithEndpoint(collectorURL),
@@ -46,13 +64,20 @@ func InitTracer() (func(context.Context) error, func(context.Context) error, fun
)
if err != nil {
slog.ErrorContext(InitCtx, "failed to create trace exporter", slog.String("error", err.Error()))
return nil, nil, nil
Logger.logger.Error("failed to create trace exporter",
zap.Error(err),
zap.String("collector_url", collectorURL),
zap.String("action", "trace_disabled"),
)
// 返回空函数,但不阻塞程序启动
return func(ctx context.Context) error { return nil },
func(ctx context.Context) error { return nil },
func(ctx context.Context) error { return nil }
}
// 增强的资源标识配置
resources, err := resource.New(
InitCtx,
initCtx,
resource.WithAttributes(
attribute.String("library.language", "go"),
attribute.String("service.name", serviceName),
@@ -60,8 +85,14 @@ func InitTracer() (func(context.Context) error, func(context.Context) error, fun
),
)
if err != nil {
slog.ErrorContext(InitCtx, "failed to create resources", slog.String("error", err.Error()))
return nil, nil, nil
Logger.logger.Error("failed to create resources",
zap.Error(err),
zap.String("action", "trace_disabled"),
)
// 返回空函数,但不阻塞程序启动
return func(ctx context.Context) error { return nil },
func(ctx context.Context) error { return nil },
func(ctx context.Context) error { return nil }
}
// 生产环境采样策略 - 动态采样率
@@ -86,7 +117,7 @@ func InitTracer() (func(context.Context) error, func(context.Context) error, fun
)
// 生产环境优化的Metrics导出器配置
metricExporter, err := otlpmetricgrpc.New(
InitCtx,
initCtx,
otlpmetricgrpc.WithInsecure(),
otlpmetricgrpc.WithEndpoint(collectorURL),
// 生产环境网络优化配置
@@ -101,8 +132,15 @@ func InitTracer() (func(context.Context) error, func(context.Context) error, fun
otlpmetricgrpc.WithCompressor("gzip"),
)
if err != nil {
slog.ErrorContext(InitCtx, "failed to create metric exporter", slog.String("error", err.Error()))
return nil, nil, nil
Logger.logger.Error("failed to create metric exporter",
zap.Error(err),
zap.String("collector_url", collectorURL),
zap.String("action", "metrics_disabled"),
)
// 返回空函数,但不阻塞程序启动
return func(ctx context.Context) error { return nil },
func(ctx context.Context) error { return nil },
func(ctx context.Context) error { return nil }
}
// 生产环境优化的Metrics Provider配置
@@ -122,7 +160,7 @@ func InitTracer() (func(context.Context) error, func(context.Context) error, fun
// 生产环境优化的日志导出器配置
logExporter, err := otlploggrpc.New(
InitCtx,
initCtx,
otlploggrpc.WithInsecure(),
otlploggrpc.WithEndpoint(collectorURL),
// 生产环境网络优化配置
@@ -137,8 +175,15 @@ func InitTracer() (func(context.Context) error, func(context.Context) error, fun
otlploggrpc.WithCompressor("gzip"),
)
if err != nil {
slog.ErrorContext(InitCtx, "failed to create log exporter", slog.String("error", err.Error()))
return nil, nil, nil
Logger.logger.Error("failed to create log exporter",
zap.Error(err),
zap.String("collector_url", collectorURL),
zap.String("action", "logs_disabled"),
)
// 返回空函数,但不阻塞程序启动
return func(ctx context.Context) error { return nil },
func(ctx context.Context) error { return nil },
func(ctx context.Context) error { return nil }
}
// 生产环境优化的日志处理器配置
@@ -200,6 +245,11 @@ func InitTracer() (func(context.Context) error, func(context.Context) error, fun
// 启动后台监控goroutine - 监控导出器健康状态
go monitorExporterHealth()
Logger.logger.Info("OpenTelemetry 初始化成功",
zap.String("collector_url", collectorURL),
zap.Duration("default_timeout", DefaultTimeout),
)
return traceExporter.Shutdown, metricExporter.Shutdown, logExporter.Shutdown
}

View File

@@ -15,6 +15,14 @@ import (
// monitorExporterHealth 监控导出器健康状态
// 实现错误率监控告警和动态采样率调整
func monitorExporterHealth() {
defer func() {
if r := recover(); r != nil {
Logger.logger.Error("导出器健康监控 goroutine panic",
zap.Any("panic", r),
)
}
}()
ticker := time.NewTicker(30 * time.Second) // 每30秒检查一次
defer ticker.Stop()
@@ -26,6 +34,12 @@ func monitorExporterHealth() {
Logger.logger.Warn("OTEL导出器错误率过高",
zap.Int32("failures", failures),
zap.String("action", "consider_circuit_breaker"),
zap.String("collector_url", collectorURL),
)
} else if failures > 0 {
Logger.logger.Info("OTEL导出器有少量失败",
zap.Int32("failures", failures),
zap.String("collector_url", collectorURL),
)
}

View File

@@ -213,30 +213,30 @@ func (s *SendCardTaskTypeNuclear) channelOne(ctx context.Context, orderItem Orde
}
span.AddEvent("changeProxy")
needChangeProxyId := utils.GenerateId()
var errRes error
var err error
if time.Since(orderItem.CreateTime) < time.Second*20 {
if time.Since(orderItem.CreateTime) <= time.Second*30 {
time.Sleep(time.Second*30 - time.Since(orderItem.CreateTime))
}
for range 3 {
errRes = nil
needChangeProxyId := utils.GenerateId()
webClient := resty.New().SetTimeout(10 * time.Second).SetHeaders(map[string]string{
"user-agent": useragent.GetUserAgentByPlatform(useragent.PlatformPhone),
"origin": queryOrderInfo.Host,
"referer": orderItem.PayURL,
}).OnBeforeRequest(func(c *resty.Client, request *resty.Request) error {
proxy, err3 := utils.GetProxy(ctx, needChangeProxyId, SendCardTaskTypeEnumNuclear.String())
if err3 != nil {
otelTrace.Logger.WithContext(ctx).Error("获取代理失败", zap.Error(err3))
return err3
}
c.SetProxy(proxy)
return nil
})
otelresty.TraceClient(webClient)
span.AddEvent("getProxy")
proxy, err3 := utils.GetProxy(ctx, needChangeProxyId, SendCardTaskTypeEnumNuclear.String())
if err3 != nil {
otelTrace.Logger.WithContext(ctx).Error("获取代理失败", zap.Error(err3))
continue
}
span.AddEvent("endGetProxy")
webClient = webClient.SetProxy(proxy)
_, _ = webClient.R().Get(orderItem.PayURL)
//添加计数器
@@ -249,18 +249,15 @@ func (s *SendCardTaskTypeNuclear) channelOne(ctx context.Context, orderItem Orde
Get(queryOrderInfo.Scheme + "://" + queryOrderInfo.Host + "/pay/index/captcha.html")
span.AddEvent("finishGetCaptcha")
if err4 != nil {
needChangeProxyId = utils.GenerateId()
otelTrace.Logger.WithContext(ctx).Error("获取验证码失败", zap.Error(err4))
continue
}
span.AddEvent("getCaptchaOCRResult")
//设置cookie
ocrResp, err = client.NewOCRClient().Calc(ctx, &client.CalcInput{
File: resp.Body(),
FileName: "captcha.jpg",
Category: "calc",
})
span.AddEvent("finishCaptchaOCRResult")
if err != nil {
errRes = errors.New("识别验证码失败,需要重新提交")
otelTrace.Logger.WithContext(ctx).Error("识别验证码失败", zap.Error(err))
@@ -289,14 +286,12 @@ func (s *SendCardTaskTypeNuclear) channelOne(ctx context.Context, orderItem Orde
resp, err5 := webClient.R().SetContext(ctx).SetBody(bodyData).
SetQueryParam("oe", orderId).Post(queryOrderInfo.Scheme + "://" + queryOrderInfo.Host + "/pay/PxyAlai/setParams")
span.AddEvent("finishSubmitData")
//重置验证码
ocrResp = ""
if err5 != nil {
otelTrace.Logger.WithContext(ctx).Error("提交支付失败", zap.Error(err5))
needChangeProxyId = utils.GenerateId()
continue
}
submitRespStr := struct {
@@ -304,7 +299,7 @@ func (s *SendCardTaskTypeNuclear) channelOne(ctx context.Context, orderItem Orde
Msg string `json:"msg"`
}{}
otelTrace.Logger.WithContext(ctx).Info("提交支付返回", zap.Any("response", resp.String()),
zap.Any("formData", bodyData), zap.String("oe", orderId))
zap.Any("formData", bodyData))
err6 := json.Unmarshal(resp.Body(), &submitRespStr)
if err6 != nil {
otelTrace.Logger.WithContext(ctx).Error("json解析失败", zap.Error(err6))
@@ -334,7 +329,6 @@ func (s *SendCardTaskTypeNuclear) channelOne(ctx context.Context, orderItem Orde
continue
}
if strings.Contains(submitRespStr.Msg, "访问异常,页面刷新后请重试") || strings.Contains(submitRespStr.Msg, "当前操作频繁操作") {
needChangeProxyId = utils.GenerateId()
fingerprintHash = fingerprint.GenerateRandomBrowserFingerprintHash()
span.AddEvent("startRefreshPage")
errRes = errors.New("访问异常,页面刷新后请重试,请重新下单")

32
main.go
View File

@@ -1,6 +1,7 @@
package main
import (
"context"
"gateway/internal/cache"
"gateway/internal/config"
_ "gateway/internal/models"
@@ -18,6 +19,7 @@ import (
_ "github.com/go-sql-driver/mysql"
"log"
_ "net/http/pprof"
"time"
)
func main() {
@@ -27,31 +29,41 @@ func main() {
log.Printf("初始化代理池失败: %v", err)
return
}
// 初始化 OpenTelemetry
cleanup1, cleanup2, cleanup3 := otelTrace.InitTracer()
defer func() {
// 使用带超时的 context 来关闭导出器
ctx := context.Background()
shutdownCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if cleanup1 != nil {
_ = cleanup1(otelTrace.InitCtx)
_ = cleanup1(shutdownCtx)
}
if cleanup2 != nil {
_ = cleanup2(otelTrace.InitCtx)
_ = cleanup2(shutdownCtx)
}
if cleanup3 != nil {
_ = cleanup3(otelTrace.InitCtx)
_ = cleanup3(shutdownCtx)
}
}()
go notify.CreateOrderNotifyConsumer(otelTrace.InitCtx)
go query.CreateSupplierOrderQueryCuConsumer(otelTrace.InitCtx)
// go query.CreatePayForQueryConsumer(otelTrace.InitCtx)
go service.OrderSettleInit(otelTrace.InitCtx)
// 使用 context.Background() 作为基础 context
ctx := context.Background()
go notify.CreateOrderNotifyConsumer(ctx)
go query.CreateSupplierOrderQueryCuConsumer(ctx)
// go query.CreatePayForQueryConsumer(ctx)
go service.OrderSettleInit(ctx)
cache.Start()
utils.StartProxyPool()
//tasks.Start(otelTrace.InitCtx)
//tasks.Start(ctx)
// 初始化队列系统
queue.Init(otelTrace.InitCtx)
third_party.StartOrderPool(otelTrace.InitCtx)
queue.Init(ctx)
third_party.StartOrderPool(ctx)
web.Run()
}