Files
kami_backend/utility/otel/manager.go
danial d3da03990b fix(integration): 更新 Camel Oil API 请求地址和日志手机号显示
- 将所有请求地址由 recharge3.bac365.com 替换为 app.bac365.com
- 修改日志中手机号显示,移除部分掩码,直接展示完整手机号
- 保持发送验证码、登录及查询接口的请求地址一致性
- 更新错误日志和响应日志中手机号的显示格式

perf(otel): 配置 OpenTelemetry gRPC 最大消息大小限制

- 为 trace exporter 添加 gRPC 最大接收和发送消息大小限制(100MB)
- 为 log exporter 添加相同的 gRPC 消息大小限制配置
- 确保大数据量的采样和日志传输过程中不被消息大小限制阻断
2025-12-06 21:58:09 +08:00

324 lines
8.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package otel
import (
"context"
"fmt"
"time"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"github.com/gogf/gf/contrib/metric/otelmetric/v2"
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/v2/os/glog"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
otellog "go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/sdk/log"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
oteltrace "go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
)
const (
// tracerHostnameTagKey is the attribute key under which NewOTelManager
// stores the value returned by getHostIP on the shared resource.
tracerHostnameTagKey = "hostname"
)
// Manager is the OTel manager - a simplified version that keeps only the core
// functionality. It holds the configuration, the shared resource, the tracer
// and logger providers, and the shutdown hooks registered while they were
// being initialized.
type Manager struct {
// config is the configuration the manager was constructed with.
config *Config
// resource is the resource attached to every provider the manager creates.
resource *resource.Resource
// tracerProvider is set by initTracing; nil until tracing is initialized.
tracerProvider *trace.TracerProvider
// logProvider is set by initLogging; nil until logging is initialized.
logProvider *log.LoggerProvider
// shutdownFuncs are appended to by the init* methods and invoked in
// reverse order by Shutdown.
shutdownFuncs []func(context.Context) error
}
// NewOTelManager creates a Manager and initializes tracing and logging, in
// that order.
//
// A nil config is replaced with DefaultConfig(). The config is then checked by
// ValidateAndSetDefaults, whose error is returned as-is on failure.
// Resource-creation and component-initialization errors are returned wrapped
// by ErrInitFailed.
//
// If a component fails to initialize after an earlier one has already started,
// the components that did start are shut down before the error is returned,
// so a failed call does not leave a provider running that the caller has no
// handle to stop.
//
// Metrics are not initialized here: initMetrics is not called.
func NewOTelManager(config *Config) (*Manager, error) {
	// Upper bound on the rollback performed when initialization fails midway.
	const rollbackTimeout = 5 * time.Second

	if config == nil {
		config = DefaultConfig()
	}
	// Validate the configuration and fill in default values.
	if err := config.ValidateAndSetDefaults(); err != nil {
		return nil, err
	}

	manager := &Manager{
		config:        config,
		shutdownFuncs: []func(context.Context) error{},
	}

	// The value returned by getHostIP is recorded under both the semconv
	// host-name key and the custom "hostname" key.
	hostIP := getHostIP()
	attrs := []attribute.KeyValue{
		semconv.ServiceNameKey.String(config.ServiceName),
		semconv.HostNameKey.String(hostIP),
		attribute.String(tracerHostnameTagKey, hostIP),
	}
	res, err := resource.New(gctx.GetInitCtx(),
		resource.WithFromEnv(),
		resource.WithProcess(),
		resource.WithTelemetrySDK(),
		resource.WithHost(),
		resource.WithAttributes(attrs...),
	)
	if err != nil {
		return nil, ErrInitFailed(fmt.Errorf("failed to create resource: %w", err))
	}
	manager.resource = res

	// Initialize the components one by one. On the first failure, run the
	// shutdown hooks registered so far so that nothing already started leaks.
	for _, initComponent := range []func() error{manager.initTracing, manager.initLogging} {
		initErr := initComponent()
		if initErr == nil {
			continue
		}

		rollbackCtx, cancel := context.WithTimeout(context.Background(), rollbackTimeout)
		rollbackErr := manager.Shutdown(rollbackCtx)
		cancel()
		if rollbackErr != nil {
			// Keep the original failure as the wrapped error and mention the
			// rollback problem only as extra context.
			initErr = fmt.Errorf("%w (rollback also failed: %v)", initErr, rollbackErr)
		}
		return nil, ErrInitFailed(initErr)
	}

	return manager, nil
}
// initMetrics creates a metric provider bound to the manager's resource, calls
// SetAsGlobal on it, and registers its Shutdown as a shutdown hook.
//
// A provider-creation error is returned wrapped by ErrExporterFailed.
func (m *Manager) initMetrics() error {
	provider, err := otelmetric.NewProvider(otelmetric.WithResource(m.resource))
	if err != nil {
		cause := fmt.Errorf("failed to create metric provider: %w", err)
		return ErrExporterFailed(cause)
	}

	provider.SetAsGlobal()
	m.shutdownFuncs = append(m.shutdownFuncs, provider.Shutdown)
	return nil
}
// initTracing sets up distributed tracing: it builds an OTLP gRPC trace
// exporter from the manager's config, wraps it in a batching tracer provider,
// installs that provider globally via otel.SetTracerProvider, and registers
// the provider's Shutdown as a shutdown hook.
//
// An exporter-creation error is returned wrapped by ErrExporterFailed.
func (m *Manager) initTracing() error {
	// gRPC per-call message size limit applied to both directions.
	const maxGRPCMessageBytes = 100 * 1024 * 1024 // 100MB

	cfg := m.config
	exporterOpts := []otlptracegrpc.Option{
		otlptracegrpc.WithCompressor(cfg.Compressor),
		otlptracegrpc.WithEndpoint(cfg.CollectorURL),
		otlptracegrpc.WithHeaders(cfg.Headers),
		otlptracegrpc.WithTimeout(cfg.Timeout),
		otlptracegrpc.WithDialOption(
			grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxGRPCMessageBytes)),
			grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(maxGRPCMessageBytes)),
		),
	}

	// Retry policy is only attached when enabled in the config.
	if cfg.RetryEnabled {
		retry := otlptracegrpc.RetryConfig{
			Enabled:         true,
			InitialInterval: cfg.RetryInitInterval,
			MaxInterval:     cfg.RetryMaxInterval,
			MaxElapsedTime:  cfg.RetryMaxElapsed,
		}
		exporterOpts = append(exporterOpts, otlptracegrpc.WithRetry(retry))
	}
	if cfg.Insecure {
		exporterOpts = append(exporterOpts, otlptracegrpc.WithInsecure())
	}

	var (
		exporter *otlptrace.Exporter
		err      error
	)
	exporter, err = otlptracegrpc.New(gctx.GetInitCtx(), exporterOpts...)
	if err != nil {
		return ErrExporterFailed(fmt.Errorf("failed to create trace exporter: %w", err))
	}

	// Batching span processor with a 10-second batch timeout; sampling is
	// ratio-based on the trace ID using the configured sample rate.
	provider := trace.NewTracerProvider(
		trace.WithBatcher(exporter, trace.WithBatchTimeout(10*time.Second)),
		trace.WithResource(m.resource),
		trace.WithSampler(trace.TraceIDRatioBased(cfg.SampleRate)),
	)

	m.tracerProvider = provider
	otel.SetTracerProvider(provider)
	m.shutdownFuncs = append(m.shutdownFuncs, provider.Shutdown)
	return nil
}
// initLogging sets up log export: it builds an OTLP gRPC log exporter from the
// manager's config, wraps it in a batch processor inside a logger provider,
// stores a logger from that provider in globalLogger, installs LogHandler as
// the glog handler, and registers the provider's Shutdown as a shutdown hook.
//
// An exporter-creation error is returned wrapped by ErrExporterFailed.
func (m *Manager) initLogging() error {
	// gRPC per-call message size limit, kept identical to the trace exporter.
	const maxGRPCMessageBytes = 100 * 1024 * 1024 // 100MB

	cfg := m.config
	exporterOpts := []otlploggrpc.Option{
		otlploggrpc.WithCompressor(cfg.Compressor),
		otlploggrpc.WithEndpoint(cfg.CollectorURL),
		otlploggrpc.WithHeaders(cfg.Headers),
		otlploggrpc.WithTimeout(cfg.Timeout),
		otlploggrpc.WithDialOption(
			grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxGRPCMessageBytes)),
			grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(maxGRPCMessageBytes)),
		),
	}

	// Retry policy is only attached when enabled in the config.
	if cfg.RetryEnabled {
		retry := otlploggrpc.RetryConfig{
			Enabled:         true,
			InitialInterval: cfg.RetryInitInterval,
			MaxInterval:     cfg.RetryMaxInterval,
			MaxElapsedTime:  cfg.RetryMaxElapsed,
		}
		exporterOpts = append(exporterOpts, otlploggrpc.WithRetry(retry))
	}
	if cfg.Insecure {
		exporterOpts = append(exporterOpts, otlploggrpc.WithInsecure())
	}

	exporter, err := otlploggrpc.New(gctx.GetInitCtx(), exporterOpts...)
	if err != nil {
		return ErrExporterFailed(fmt.Errorf("failed to create log exporter: %w", err))
	}

	// Batch processor: at most 1024 records per export, 10-second export timeout.
	batcher := log.NewBatchProcessor(exporter,
		log.WithExportMaxBatchSize(1024),
		log.WithExportTimeout(10*time.Second),
	)
	provider := log.NewLoggerProvider(
		log.WithResource(m.resource),
		log.WithProcessor(batcher),
	)

	m.logProvider = provider
	globalLogger = provider.Logger("")

	// Register the log handler with glog.
	glog.SetHandlers(LogHandler)

	m.shutdownFuncs = append(m.shutdownFuncs, provider.Shutdown)
	return nil
}
// Shutdown runs every registered shutdown hook in reverse registration order,
// passing ctx to each one. A failing hook does not stop the remaining hooks
// from running.
//
// It returns nil when every hook succeeds; otherwise the collected errors are
// formatted into a single error wrapped by ErrShutdownFailed.
func (m *Manager) Shutdown(ctx context.Context) error {
	var failures []error

	total := len(m.shutdownFuncs)
	for n := range m.shutdownFuncs {
		// Walk from the most recently registered hook back to the first.
		hook := m.shutdownFuncs[total-1-n]
		if err := hook(ctx); err != nil {
			failures = append(failures, err)
		}
	}

	if len(failures) == 0 {
		return nil
	}
	return ErrShutdownFailed(fmt.Errorf("shutdown errors: %v", failures))
}
// HealthCheck reports whether tracing has been initialized and emits a test
// span named "otel-health-check" carrying a "health_check_started" event.
//
// It returns ErrProviderNotInitialized when no tracer provider is set and nil
// otherwise. Note that it returns without waiting for the span to be exported,
// so a nil result does not by itself confirm the collector received anything.
func (m *Manager) HealthCheck(ctx context.Context) error {
	if m.tracerProvider == nil {
		return ErrProviderNotInitialized
	}

	// Start a test span; it is ended when this function returns.
	_, span := m.CreateTracer("healthcheck").Start(ctx, "otel-health-check")
	defer span.End()

	// Attach a test event describing the service and collector endpoint.
	eventAttrs := oteltrace.WithAttributes(
		attribute.String("service", m.config.ServiceName),
		attribute.String("endpoint", m.config.CollectorURL),
	)
	span.AddEvent("health_check_started", eventAttrs)

	return nil
}
// GetConfig returns the result of calling Clone on the manager's config.
func (m *Manager) GetConfig() *Config {
	cloned := m.config.Clone()
	return cloned
}
// GetResource returns the resource held by the manager.
func (m *Manager) GetResource() *resource.Resource {
	res := m.resource
	return res
}
// IsTracingEnabled reports whether a tracer provider has been set on the
// manager.
func (m *Manager) IsTracingEnabled() bool {
	if m.tracerProvider == nil {
		return false
	}
	return true
}
// IsLoggingEnabled reports whether a logger provider has been set on the
// manager.
func (m *Manager) IsLoggingEnabled() bool {
	if m.logProvider == nil {
		return false
	}
	return true
}
// GetTracerProvider returns the manager's tracer provider, which is nil if
// tracing has not been initialized.
func (m *Manager) GetTracerProvider() *trace.TracerProvider {
	provider := m.tracerProvider
	return provider
}
// GetLogProvider returns the manager's logger provider, which is nil if
// logging has not been initialized.
func (m *Manager) GetLogProvider() *log.LoggerProvider {
	provider := m.logProvider
	return provider
}
// CreateTracer returns a tracer with the given name from the manager's tracer
// provider. When no tracer provider is set, it returns a tracer obtained from
// oteltrace.NewNoopTracerProvider instead.
func (m *Manager) CreateTracer(name string) oteltrace.Tracer {
	if m.tracerProvider == nil {
		return oteltrace.NewNoopTracerProvider().Tracer(name)
	}
	return m.tracerProvider.Tracer(name)
}
// CreateLogger returns a logger with the given name from the manager's logger
// provider. When no logger provider is set it returns nil, so callers must
// handle a nil result.
func (m *Manager) CreateLogger(name string) otellog.Logger {
	if m.logProvider == nil {
		// No provider available; the caller is expected to check for nil.
		return nil
	}
	return m.logProvider.Logger(name)
}
// GetCollectorURL returns the CollectorURL value from the manager's config.
func (m *Manager) GetCollectorURL() string {
	collectorURL := m.config.CollectorURL
	return collectorURL
}
// IsRetryEnabled returns the RetryEnabled flag from the manager's config.
func (m *Manager) IsRetryEnabled() bool {
	retryEnabled := m.config.RetryEnabled
	return retryEnabled
}