mirror of
https://git.oceanpay.cc/danial/kami_walmart_slide.git
synced 2025-12-18 22:15:38 +00:00
Refactor the telemetry.py file to improve readability by breaking long import statements into multiple lines. Additionally, enhance the `record_metric` function by adding logging for successful metric recording and error handling to log failures.
201 lines
5.6 KiB
Python
201 lines
5.6 KiB
Python
import os
|
||
from opentelemetry import trace, metrics
|
||
from opentelemetry.sdk.trace import TracerProvider
|
||
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
|
||
from opentelemetry.sdk.metrics import MeterProvider
|
||
from opentelemetry.sdk.metrics.export import (
|
||
ConsoleMetricExporter,
|
||
PeriodicExportingMetricReader,
|
||
)
|
||
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
|
||
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
|
||
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
|
||
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
|
||
from opentelemetry.instrumentation.flask import FlaskInstrumentor
|
||
from opentelemetry.instrumentation.requests import RequestsInstrumentor
|
||
|
||
# Project-local logger shared by every helper in this module.
from logger import get_logger

logger = get_logger()

# Service name attached to the OpenTelemetry resource; the
# OTEL_SERVICE_NAME environment variable overrides the default.
SERVICE = os.environ.get("OTEL_SERVICE_NAME", "walmart-card-service")

# Module-level telemetry handles. All start as None and are assigned by
# setup_telemetry().
tracer_provider = None  # tracer provider
meter_provider = None  # meter provider (only set when a metric reader exists)
tracer = None  # global tracer
meter = None  # global meter
|
||
|
||
|
||
def setup_telemetry(enable_console_export=False, otlp_endpoint=None):
    """Configure OpenTelemetry tracing and metrics for this process.

    A tracer provider is always created and registered. A meter provider is
    only created when at least one metric reader was configured, i.e. when
    console export is enabled or the OTLP exporters were built successfully.
    Flask and Requests auto-instrumentation is enabled at the end.

    Args:
        enable_console_export: When true, spans and metrics are also written
            through the console exporters (intended for debugging).
        otlp_endpoint: OTLP exporter endpoint, e.g. http://localhost:4317.
            When falsy, no OTLP exporter is configured.

    Returns:
        A ``(tracer, meter)`` tuple holding the module-level tracer and
        meter. ``meter`` keeps its previous value (initially ``None``) when
        no meter provider is available.
    """
    global tracer_provider, meter_provider, tracer, meter

    # Resource describing this service.
    resource = Resource.create({SERVICE_NAME: SERVICE})

    # Tracer provider.
    tracer_provider = TracerProvider(resource=resource)
    trace.set_tracer_provider(tracer_provider)

    # Metric readers collected from every enabled exporter.
    readers = []

    # Console exporters (debugging aid).
    if enable_console_export:
        tracer_provider.add_span_processor(
            BatchSpanProcessor(ConsoleSpanExporter())
        )
        readers.append(PeriodicExportingMetricReader(ConsoleMetricExporter()))

    # OTLP exporters (production use).
    if otlp_endpoint:
        try:
            span_exporter = OTLPSpanExporter(endpoint=otlp_endpoint)
            tracer_provider.add_span_processor(BatchSpanProcessor(span_exporter))

            metric_exporter = OTLPMetricExporter(endpoint=otlp_endpoint)
            readers.append(PeriodicExportingMetricReader(metric_exporter))

            logger.info(f"已连接到OTLP端点: {otlp_endpoint}")
        except Exception as e:
            # Best effort: a failing OTLP setup is logged, not raised.
            logger.error(f"连接OTLP端点失败: {e}")

    # Meter provider, only when there is something to export to.
    if readers:
        meter_provider = MeterProvider(resource=resource, metric_readers=readers)
        metrics.set_meter_provider(meter_provider)

    # Global tracer and meter.
    tracer = trace.get_tracer("walmart.tracer")
    if meter_provider:
        meter = metrics.get_meter("walmart.meter")

    # Auto-instrument Flask and Requests.
    FlaskInstrumentor().instrument()
    RequestsInstrumentor().instrument()

    logger.info("OpenTelemetry已初始化")
    return tracer, meter
|
||
|
||
|
||
def get_tracer():
    """Return the global tracer, initialising telemetry on first use.

    If the module-level ``tracer`` is still ``None``, ``setup_telemetry()``
    is called with its default arguments before the tracer is returned.
    """
    if tracer is not None:
        return tracer

    setup_telemetry()
    return tracer
|
||
|
||
|
||
def get_meter():
    """Return the global meter.

    No initialisation is triggered here: the result is ``None`` until
    ``setup_telemetry`` has assigned a meter.
    """
    current_meter = meter
    return current_meter
|
||
|
||
|
||
def create_span(name, parent_span=None, attributes=None):
    """Start and return a new span.

    Args:
        name: Span name.
        parent_span: Optional parent span. When truthy, the new span is
            started in a context that carries this span.
        attributes: Optional span attributes.

    Returns:
        The span returned by the tracer's ``start_span``.
    """
    active_tracer = get_tracer()

    if not parent_span:
        return active_tracer.start_span(name, attributes=attributes)

    parent_ctx = trace.set_span_in_context(parent_span)
    return active_tracer.start_span(
        name, context=parent_ctx, attributes=attributes
    )
|
||
|
||
|
||
def end_span(span, status="ok", exception=None):
    """End a span, marking it as failed when requested.

    Args:
        span: Span to end. A falsy value makes the call a no-op.
        status: Either ``"ok"`` or ``"error"``. Any value other than
            ``"error"`` leaves the span status untouched.
        exception: Optional exception. With ``status="error"`` it is
            recorded on the span and its text becomes the status
            description.
    """
    if not span:
        return

    if status == "error":
        description = None
        if exception is not None:
            span.record_exception(exception)
            description = str(exception)
        # The error status is set even when the caller has no exception
        # object to attach, so status="error" is never silently ignored.
        span.set_status(trace.Status(trace.StatusCode.ERROR, description))

    span.end()
|
||
|
||
|
||
def add_span_event(span, name, attributes=None):
    """Attach an event to a span.

    Args:
        span: Target span. A falsy value makes the call a no-op.
        name: Event name.
        attributes: Optional event attributes.
    """
    if not span:
        return

    span.add_event(name, attributes=attributes)
|
||
|
||
|
||
def add_span_attribute(span, key, value):
    """Set a single attribute on a span.

    Args:
        span: Target span. A falsy value makes the call a no-op.
        key: Attribute key.
        value: Attribute value.
    """
    if not span:
        return

    span.set_attribute(key, value)
|
||
|
||
|
||
def get_current_span():
    """Return the currently active span as reported by ``trace``."""
    active_span = trace.get_current_span()
    return active_span
|
||
|
||
|
||
# Counter instruments already created by record_metric, keyed by
# (name, unit, description). Each value is a (meter, counter) pair so that a
# counter built from a meter that has since been replaced is not reused.
_metric_counters = {}


def record_metric(name, value, description=None, unit="1", attributes=None):
    """Add ``value`` to the counter metric called ``name``.

    The counter instrument is created on first use and reused on later calls
    with the same name, unit and description, instead of asking the meter for
    a new instrument every time.

    Failures are logged and never raised. When the global meter has not been
    initialised, a warning is logged and nothing is recorded.

    Args:
        name: Metric name.
        value: Amount to add to the counter.
        description: Optional metric description.
        unit: Metric unit.
        attributes: Optional metric attributes; ``None`` is treated as an
            empty dict.
    """
    if meter is None:
        logger.warning(f"无法记录指标 {name}:meter未初始化")
        return

    if attributes is None:
        attributes = {}

    try:
        cache_key = (name, unit, description)
        owner, counter = _metric_counters.get(cache_key, (None, None))
        if owner is not meter:
            # First use, or the global meter was replaced since the counter
            # was cached: create the instrument from the current meter.
            counter = meter.create_counter(name, description=description, unit=unit)
            _metric_counters[cache_key] = (meter, counter)

        counter.add(value, attributes)
        logger.info(f"记录指标:{name}={value}, 单位={unit}, 属性={attributes}")
    except Exception as e:
        # Best effort: metric failures must not break the caller.
        logger.error(f"记录指标 {name} 失败:{e}")
|