Files
kami_merchant/internal/otelTrace/init.go

318 lines
9.4 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package otelTrace
import (
"context"
"log/slog"
"net/http"
"os"
"github.com/beego/beego/v2/core/config/env"
"github.com/natefinch/lumberjack"
"github.com/beego/beego/v2/server/web"
beecontext "github.com/beego/beego/v2/server/web/context"
"go.opentelemetry.io/contrib/bridges/otelzap"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/log/global"
"go.opentelemetry.io/otel/propagation"
sdklog "go.opentelemetry.io/otel/sdk/log"
sdkMetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"time"
"go.opentelemetry.io/otel/trace"
)
// InitCtx 初始化ctx
var (
InitCtx = context.Background()
)
type LoggerStruct struct {
logger *zap.Logger
}
// WithContext 为日志记录器添加上下文信息。
// 如果上下文中包含跟踪 id则将其添加到日志中以增强可追踪性。
func (l *LoggerStruct) WithContext(ctx context.Context) *zap.Logger {
// 如果上下文为空,直接返回日志记录器,不进行任何修改。
if ctx == nil {
return l.logger
}
// 从 context 中获取 span
// 从 context 中获取 span
span := trace.SpanFromContext(ctx)
if !span.SpanContext().IsValid() {
if l.logger == nil {
return zap.NewNop()
}
return l.logger
}
return l.logger.With(zap.Reflect("ctx", ctx)).With(zap.String("span", span.SpanContext().TraceID().String()))
}
var (
serviceName = "网关服务——" + env.Get("serverName", "")
collectorURL = "otel-collector.kkknametrans.buzz"
Logger *LoggerStruct // 添加全局 logger
)
func InitTracer() (func(context.Context) error, func(context.Context) error, func(context.Context) error) {
traceExporter, err := otlptrace.New(
InitCtx,
otlptracehttp.NewClient(
otlptracehttp.WithInsecure(),
otlptracehttp.WithCompression(otlptracehttp.GzipCompression),
otlptracehttp.WithEndpoint(collectorURL),
),
)
if err != nil {
slog.ErrorContext(InitCtx, "failed to create trace exporter", slog.String("error", err.Error()))
return nil, nil, nil
}
resources, err := resource.New(
InitCtx,
resource.WithAttributes(
attribute.String("service.name", serviceName),
attribute.String("library.language", "go"),
),
)
if err != nil {
slog.ErrorContext(InitCtx, "failed to create resources", slog.String("error", err.Error()))
return nil, nil, nil
}
otel.SetTracerProvider(
sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithSpanProcessor(sdktrace.NewBatchSpanProcessor(traceExporter,
sdktrace.WithBatchTimeout(30*time.Second),
sdktrace.WithMaxExportBatchSize(100),
)),
sdktrace.WithBatcher(traceExporter),
sdktrace.WithResource(resources),
),
)
// Set up meter provider.
metricExporter, err := otlpmetrichttp.New(
InitCtx,
otlpmetrichttp.WithInsecure(),
otlpmetrichttp.WithCompression(otlpmetrichttp.GzipCompression),
otlpmetrichttp.WithEndpoint(collectorURL))
if err != nil {
slog.ErrorContext(InitCtx, "failed to create trace exporter", slog.String("error", err.Error()))
return nil, nil, nil
}
otel.SetMeterProvider(
sdkMetric.NewMeterProvider(
sdkMetric.WithReader(
sdkMetric.NewPeriodicReader(
metricExporter,
sdkMetric.WithInterval(30*time.Second),
),
),
sdkMetric.WithResource(resources),
),
)
logExporter, err := otlploghttp.New(
InitCtx,
otlploghttp.WithCompression(otlploghttp.GzipCompression),
otlploghttp.WithInsecure(),
otlploghttp.WithEndpoint(collectorURL),
)
if err != nil {
slog.ErrorContext(InitCtx, "failed to create log exporter", slog.String("error", err.Error()))
return nil, nil, nil
}
loggerProvider := sdklog.NewLoggerProvider(
sdklog.WithProcessor(
sdklog.NewBatchProcessor(logExporter, sdklog.WithExportMaxBatchSize(100)),
),
sdklog.WithResource(resources),
)
global.SetLoggerProvider(loggerProvider)
// 自定义日志格式配置
encoderConfig := zapcore.EncoderConfig{
TimeKey: "time",
LevelKey: "level",
NameKey: "logger",
CallerKey: "caller",
FunctionKey: zapcore.OmitKey,
MessageKey: "msg",
StacktraceKey: "stacktrace",
LineEnding: zapcore.DefaultLineEnding,
EncodeLevel: zapcore.CapitalLevelEncoder, // 使用大写字母记录日志级别
EncodeTime: zapcore.TimeEncoderOfLayout(time.DateTime), // ISO8601 时间格式
EncodeDuration: zapcore.SecondsDurationEncoder,
EncodeCaller: zapcore.ShortCallerEncoder, // 短路径编码器
}
lumberjacklogger := &lumberjack.Logger{
Filename: "./logs/log-rotate-test.log",
MaxSize: 10, //MB
MaxBackups: 20,
MaxAge: 15, //days
Compress: true, // disabled by default
}
defer lumberjacklogger.Close()
// 日志需要保存在本地,并且每日更新
core := zapcore.NewTee(
otelzap.NewCore(serviceName, otelzap.WithLoggerProvider(loggerProvider)),
zapcore.NewCore(
zapcore.NewConsoleEncoder(encoderConfig),
zapcore.NewMultiWriteSyncer(
zapcore.AddSync(os.Stdout),
zapcore.AddSync(lumberjacklogger),
),
zap.InfoLevel,
),
)
logger := zap.New(core,
zap.AddCaller(),
zap.AddStacktrace(zap.ErrorLevel),
)
// zap设置标准输出流
// 设置全局 logger
Logger = &LoggerStruct{
logger: logger,
}
// 确保设置 TextMapPropagator
otel.SetTextMapPropagator(
propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}),
)
return traceExporter.Shutdown, metricExporter.Shutdown, logExporter.Shutdown
}
// Middleware 从请求头中提取上游的 trace context
// 并将其注入到当前的 context 中
// 这样下游的服务就可以从 context 中获取到上游的 trace context
// 这样就可以在日志中看到上游的 trace Context
func Middleware(ctx *beecontext.Context, next web.FilterFunc) {
req := ctx.Request
// 从请求头中提取上游的 trace context
propagator := otel.GetTextMapPropagator()
parentCtx := propagator.Extract(req.Context(), propagation.HeaderCarrier(req.Header))
routeTrace := otel.Tracer("router")
spanCtx, span := routeTrace.Start(parentCtx, req.URL.Path,
trace.WithAttributes(
attribute.String("http.method", req.Method),
attribute.String("http.url", req.URL.String()),
attribute.String("peer.hostname", req.Host),
attribute.String("http.scheme", getScheme(req)),
attribute.String("component", "beego"),
attribute.String("peer.address", req.RemoteAddr),
attribute.String("span.kind", "server"),
attribute.String("peer.service", serviceName),
attribute.String("http.flavor", req.Proto),
attribute.String("http.user_agent", req.UserAgent()),
attribute.String("http.target", req.URL.Path),
),
)
// 合并所有 defer 处理到一个函数中
defer func() {
// 先处理状态码
if ctx.Request.Response != nil {
span.SetAttributes(attribute.Int("http.status_code", ctx.Request.Response.StatusCode))
}
// 处理 panic
if err := recover(); err != nil {
span.RecordError(err.(error))
span.SetAttributes(attribute.String("error", "true"))
Logger.WithContext(ctx.Request.Context()).Error("全局错误", zap.Any("error", err))
// 结束 span
span.End()
// 重新抛出异常
panic(err)
}
// 正常情况下结束 span
span.End()
}()
ctx.Request = ctx.Request.WithContext(spanCtx)
next(ctx)
}
// Span 抽象span
func Span(ctx context.Context, traceName, spanName string, attr ...trace.SpanStartOption) (context.Context, func()) {
ctx, span := otel.Tracer(traceName).Start(ctx, spanName, attr...)
return ctx, func() {
defer span.End()
}
}
func getScheme(req *http.Request) string {
scheme := req.Header.Get("X-Forwarded-Proto")
if scheme == "" && req.TLS != nil {
scheme = "https"
} else if scheme == "" {
scheme = "http"
}
return scheme
}
func NewSchedulerTrace(opts ...trace.TracerOption) trace.Tracer {
return otel.Tracer("scheduler", opts...)
}
func init() {
// 初始化日志
// 自定义日志格式配置
encoderConfig := zapcore.EncoderConfig{
TimeKey: "time",
LevelKey: "level",
NameKey: "logger",
CallerKey: "caller",
FunctionKey: zapcore.OmitKey,
MessageKey: "msg",
StacktraceKey: "stacktrace",
LineEnding: zapcore.DefaultLineEnding,
EncodeLevel: zapcore.CapitalLevelEncoder, // 使用大写字母记录日志级别
EncodeTime: zapcore.TimeEncoderOfLayout(time.DateTime), // ISO8601 时间格式
EncodeDuration: zapcore.SecondsDurationEncoder,
EncodeCaller: zapcore.ShortCallerEncoder, // 短路径编码器
}
lumberjacklogger := &lumberjack.Logger{
Filename: "./logs/log-rotate-test.log",
MaxSize: 10, //MB
MaxBackups: 20,
MaxAge: 15, //days
Compress: true, // disabled by default
}
defer lumberjacklogger.Close()
// 创建核心
core := zapcore.NewTee(
zapcore.NewCore(
zapcore.NewConsoleEncoder(encoderConfig),
zapcore.NewMultiWriteSyncer(
zapcore.AddSync(os.Stdout),
zapcore.AddSync(lumberjacklogger),
),
zap.InfoLevel,
),
)
logger := zap.New(core,
zap.AddCaller(),
// zap.AddCallerSkip(1),
zap.AddStacktrace(zap.ErrorLevel),
)
// zap设置标准输出流
// 设置全局 logger
Logger = &LoggerStruct{
logger: logger,
}
}