Files
kami_walmart_slide/telemetry.py
danial c9b1fc0dea refactor(telemetry): improve code readability and add logging for metrics
Refactor the telemetry.py file to improve readability by breaking long import statements into multiple lines. Additionally, enhance the `record_metric` function by adding logging for successful metric recording and error handling to log failures.
2025-04-05 23:38:04 +08:00

201 lines
5.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import (
ConsoleMetricExporter,
PeriodicExportingMetricReader,
)
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
# Module-level logger, obtained from the project's logger helper.
from logger import get_logger
logger = get_logger()
# Service name attached to the telemetry resource; read from the
# OTEL_SERVICE_NAME environment variable with a fixed fallback.
SERVICE = os.getenv("OTEL_SERVICE_NAME", "walmart-card-service")
# Tracer provider; assigned by setup_telemetry().
tracer_provider = None
# Meter provider; assigned by setup_telemetry() only when at least one
# metric reader was configured.
meter_provider = None
# Global tracer; assigned by setup_telemetry().
tracer = None
# Global meter; assigned by setup_telemetry() once a meter provider exists.
meter = None
def setup_telemetry(enable_console_export=False, otlp_endpoint=None):
    """Configure OpenTelemetry tracing and metrics for this process.

    A new tracer provider is always created and registered. A meter provider
    is created and registered only when at least one metric reader was
    configured (console and/or OTLP). Flask and Requests instrumentation is
    then enabled.

    Args:
        enable_console_export: When true, add console exporters for spans
            and metrics (intended for debugging).
        otlp_endpoint: Endpoint for the OTLP exporters, for example
            http://localhost:4317. When falsy, no OTLP exporter is added.

    Returns:
        A ``(tracer, meter)`` tuple holding the module-level tracer and
        meter. ``meter`` keeps its previous value (initially ``None``) when
        no meter provider exists.
    """
    global tracer_provider, meter_provider, tracer, meter

    # Resource describing this service; shared by traces and metrics.
    service_resource = Resource.create({SERVICE_NAME: SERVICE})

    tracer_provider = TracerProvider(resource=service_resource)
    trace.set_tracer_provider(tracer_provider)

    # Metric readers are collected first because the meter provider takes
    # them at construction time.
    readers = []

    if enable_console_export:
        tracer_provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
        readers.append(PeriodicExportingMetricReader(ConsoleMetricExporter()))

    if otlp_endpoint:
        try:
            span_exporter = OTLPSpanExporter(endpoint=otlp_endpoint)
            tracer_provider.add_span_processor(BatchSpanProcessor(span_exporter))
            metric_exporter = OTLPMetricExporter(endpoint=otlp_endpoint)
            readers.append(PeriodicExportingMetricReader(metric_exporter))
            logger.info(f"已连接到OTLP端点: {otlp_endpoint}")
        except Exception as e:
            # Best effort: a failing OTLP setup is logged and does not abort
            # the rest of the initialization.
            logger.error(f"连接OTLP端点失败: {e}")

    if readers:
        meter_provider = MeterProvider(resource=service_resource, metric_readers=readers)
        metrics.set_meter_provider(meter_provider)

    tracer = trace.get_tracer("walmart.tracer")
    if meter_provider:
        meter = metrics.get_meter("walmart.meter")

    # Enable automatic instrumentation, Flask first and then Requests.
    for instrumentor_cls in (FlaskInstrumentor, RequestsInstrumentor):
        instrumentor_cls().instrument()

    logger.info("OpenTelemetry已初始化")
    return tracer, meter
def get_tracer():
    """Return the global tracer, running setup_telemetry() with its defaults
    first when no tracer has been created yet."""
    global tracer
    if tracer is not None:
        return tracer
    # First use: initialize telemetry, which assigns the module-level tracer.
    setup_telemetry()
    return tracer
def get_meter():
    """Return the global meter (``None`` until setup_telemetry() assigns one)."""
    # Read-only access to the module-level name; no initialization is done here.
    return meter
def create_span(name, parent_span=None, attributes=None):
    """Start and return a new span.

    Args:
        name: Name of the span.
        parent_span: Optional parent span; when given, the new span is
            started in a context that carries this parent.
        attributes: Optional attributes passed to the new span.

    Returns:
        The span returned by the tracer's ``start_span``. The caller is
        responsible for ending it.
    """
    active_tracer = get_tracer()
    if not parent_span:
        return active_tracer.start_span(name, attributes=attributes)
    parent_ctx = trace.set_span_in_context(parent_span)
    return active_tracer.start_span(name, context=parent_ctx, attributes=attributes)
def end_span(span, status="ok", exception=None):
    """End a span, marking it as failed when requested.

    Args:
        span: The span to end. A falsy value (for example ``None``) is
            ignored and the function returns immediately.
        status: Either "ok" or "error". With "error" the span status is set
            to ERROR before the span is ended, whether or not an exception
            is supplied.
        exception: Optional exception describing the failure. When given
            together with ``status="error"`` it is recorded on the span and
            its string form is used as the status description.
    """
    if not span:
        return
    if status == "error":
        description = None
        if exception:
            span.record_exception(exception)
            description = str(exception)
        # Set ERROR even without an exception so that a caller reporting a
        # failure is never silently ignored.
        span.set_status(trace.Status(trace.StatusCode.ERROR, description))
    span.end()
def add_span_event(span, name, attributes=None):
    """Attach an event to a span.

    Args:
        span: Target span; a falsy value makes the call a no-op.
        name: Name of the event.
        attributes: Optional attributes for the event.
    """
    if not span:
        return
    span.add_event(name, attributes=attributes)
def add_span_attribute(span, key, value):
    """Set a single attribute on a span.

    Args:
        span: Target span; a falsy value makes the call a no-op.
        key: Attribute key.
        value: Attribute value.
    """
    if not span:
        return
    span.set_attribute(key, value)
def get_current_span():
    """Return the currently active span as reported by the OpenTelemetry trace API."""
    active_span = trace.get_current_span()
    return active_span
# Counters already created by record_metric, keyed by (name, description, unit).
# Each entry is a (meter, counter) pair: remembering the meter lets the cache
# notice when the global meter has been replaced and create a fresh counter.
_counters = {}


def record_metric(name, value, description=None, unit="1", attributes=None):
    """Add a value to the counter metric with the given name.

    The counter is created on first use and reused on later calls with the
    same name, description and unit, instead of asking the meter to create
    the same instrument again on every call.

    Args:
        name: Metric name.
        value: Amount to add to the counter.
        description: Optional metric description.
        unit: Metric unit; defaults to "1".
        attributes: Optional attributes attached to this measurement; an
            empty dict is used when omitted.

    Nothing is returned and no exception is propagated: when the global meter
    is not initialized a warning is logged, and any failure while creating
    or updating the counter is logged as an error.
    """
    if meter is None:
        logger.warning(f"无法记录指标 {name}meter未初始化")
        return
    if attributes is None:
        attributes = {}
    try:
        cache_key = (name, description, unit)
        cached = _counters.get(cache_key)
        if cached is None or cached[0] is not meter:
            # First use of this instrument, or the global meter was replaced.
            cached = (meter, meter.create_counter(name, description=description, unit=unit))
            _counters[cache_key] = cached
        cached[1].add(value, attributes)
        logger.info(f"记录指标:{name}={value}, 单位={unit}, 属性={attributes}")
    except Exception as e:
        # Best effort: recording a metric must never break the caller.
        logger.error(f"记录指标 {name} 失败:{e}")