Mirror of https://git.oceanpay.cc/danial/kami_apple_exchage.git (synced 2025-12-18 22:29:09 +00:00)
feat(core): refactor TelemetryManager and add logging support

- Refactor the TelemetryManager class and streamline Tracing and Metrics initialization
- Add support for OpenTelemetry Logs
- Implement setup and management of automatic instrumentation
- Add get_tracer and get_meter functions to simplify external calls
- Remove redundant test scripts
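A minimal usage sketch of the new module-level API, assuming the module lives at app.core.telemetry (the path is not shown in this excerpt); initialize_telemetry, shutdown_telemetry, get_tracer and get_meter come from the diff below, while the span and counter names are illustrative only.

# Hypothetical usage sketch; app.core.telemetry is an assumed module path.
from app.core.telemetry import (
    get_meter,
    get_tracer,
    initialize_telemetry,
    shutdown_telemetry,
)


def handle_exchange_request() -> None:
    tracer = get_tracer(__name__)   # thin wrapper around trace.get_tracer(name)
    meter = get_meter(__name__)     # thin wrapper around metrics.get_meter(name)
    request_counter = meter.create_counter("exchange.requests")  # illustrative metric name

    with tracer.start_as_current_span("handle_exchange_request"):
        request_counter.add(1)


if __name__ == "__main__":
    if initialize_telemetry():      # now synchronous; sets up Traces, Metrics and Logs
        try:
            handle_exchange_request()
        finally:
            shutdown_telemetry()    # uninstruments and shuts down the providers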
@@ -1,106 +1,140 @@
"""
统一的 OpenTelemetry 遥测管理模块
集成 Traces、Metrics 和 Logs 的初始化和管理
OpenTelemetry 遥测管理模块
集成 Traces、Metrics、Logs 的初始化和管理,支持自动化 instrumentation
"""

import logging
import os
import socket
import uuid
from typing import Optional
from typing import Any, Dict, List, Optional

# OpenTelemetry Core
from opentelemetry import metrics, trace
from opentelemetry.exporter.otlp.proto.http.metric_exporter import (
    OTLPMetricExporter as OTLPMetricExporterHTTP,
)
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
    OTLPSpanExporter as OTLPSpanExporterHTTP,
)
from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry._logs import set_logger_provider
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter

# Exporters
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Metrics
from opentelemetry.metrics import set_meter_provider

# Logs
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import (
    ConsoleMetricExporter,
    PeriodicExportingMetricReader,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.resources import SERVICE_NAME, SERVICE_VERSION, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
    ConsoleSpanExporter,
)
from opentelemetry.sdk.trace.sampling import (
    ALWAYS_OFF,
    ALWAYS_ON,
    ParentBased,
    TraceIdRatioBased,
)
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased

# Trace
from opentelemetry.trace import set_tracer_provider

from app.core.config import get_settings
from app.core.grpc_optimized_exporters import create_optimized_exporters
from app.core.log import get_logger
# Auto-instrumentation - import conditionally to handle dependency issues
try:
    from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
except ImportError:
    FastAPIInstrumentor = None

logger = get_logger(__name__)
try:
    from opentelemetry.instrumentation.redis import RedisInstrumentor
except ImportError:
    RedisInstrumentor = None

try:
    from opentelemetry.instrumentation.requests import RequestsInstrumentor
except ImportError:
    RequestsInstrumentor = None

try:
    from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
except ImportError:
    HTTPXClientInstrumentor = None

try:
    from opentelemetry.instrumentation.logging import LoggingInstrumentor
except ImportError:
    LoggingInstrumentor = None

try:
    from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor
except ImportError:
    AsyncPGInstrumentor = None

try:
    from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
except ImportError:
    SQLAlchemyInstrumentor = None

try:
    from opentelemetry.instrumentation.system_metrics import SystemMetricsInstrumentor
except ImportError:
    SystemMetricsInstrumentor = None

from app.core.config import get_settings


class TelemetryManager:
    """遥测管理器 - 统一管理 OpenTelemetry 功能"""
    """OpenTelemetry 遥测管理器 - 统一管理 Traces、Metrics、Logs"""

    def __init__(self):
        self.settings = get_settings()
        self._resource: Optional[Resource] = None
        self._tracer_provider: Optional[TracerProvider] = None
        self._meter_provider: Optional[MeterProvider] = None
        self._logger_provider: Optional[LoggerProvider] = None
        self._initialized = False
        self._instrumentors: List[Any] = []

    async def initialize(self) -> bool:
    def initialize(self) -> bool:
        """初始化遥测系统"""
        if self._initialized:
            logger.warning("遥测系统已经初始化")
            return True

        if not self.settings.OTEL_ENABLED:
            logger.info("OpenTelemetry 未启用,跳过初始化")
            print("OpenTelemetry 未启用,跳过初始化")
            return False

        try:
            logger.info("🚀 正在初始化 OpenTelemetry 遥测系统...")
            print("正在初始化 OpenTelemetry 遥测系统...")

            # 创建资源标识
            await self._create_resource()
            self._create_resource()

            # 初始化 Tracing
            await self._setup_tracing()
            self._setup_tracing()

            # 初始化 Metrics
            await self._setup_metrics()
            self._setup_metrics()

            # 初始化 Logs
            self._setup_logs()

            # 设置自动化 instrumentation
            await self._setup_instrumentations()
            self._setup_instrumentations()

            self._initialized = True
            logger.info("✅ OpenTelemetry 遥测系统初始化完成")
            print("OpenTelemetry 遥测系统初始化完成")
            return True

        except Exception as e:
            logger.error(f"❌ OpenTelemetry 初始化失败: {e}")
            print(f"OpenTelemetry 初始化失败: {e}")
            return False

    async def _create_resource(self) -> None:
    def _create_resource(self) -> None:
        """创建资源标识"""
        # 生成唯一的服务实例ID
        instance_id = f"{socket.gethostname()}-{uuid.uuid4().hex[:8]}"

        # 创建资源属性
        resource_attributes = {
            "service.name": self.settings.OTEL_SERVICE_NAME,
            "service.version": self.settings.OTEL_SERVICE_VERSION,
            "service.namespace": self.settings.OTEL_SERVICE_NAMESPACE,
            SERVICE_NAME: self.settings.OTEL_SERVICE_NAME,
            SERVICE_VERSION: self.settings.OTEL_SERVICE_VERSION,
            "service.instance.id": instance_id,
            "deployment.environment": str(self.settings.ENVIRONMENT),
            "host.name": socket.gethostname(),
@@ -108,20 +142,20 @@ class TelemetryManager:
        }

        self._resource = Resource.create(resource_attributes)
        logger.info(
            f"📋 创建服务资源: {self.settings.OTEL_SERVICE_NAME} v{self.settings.OTEL_SERVICE_VERSION}"
        print(
            f"创建服务资源: {self.settings.OTEL_SERVICE_NAME} v{self.settings.OTEL_SERVICE_VERSION}"
        )

    async def _setup_tracing(self) -> None:
    def _setup_tracing(self) -> None:
        """设置 Tracing"""
        if not self.settings.OTEL_TRACES_ENABLED:
            logger.info("Tracing 功能已禁用")
            print("Tracing 功能已禁用")
            return

        logger.info("🔍 正在设置 OpenTelemetry Tracing...")
        print("正在设置 OpenTelemetry Tracing...")

        # 创建采样器
        sampler = self._create_sampler()
        sampler = TraceIdRatioBased(self.settings.OTEL_SAMPLER_RATIO)

        # 创建 TracerProvider
        self._tracer_provider = TracerProvider(
@@ -129,76 +163,56 @@ class TelemetryManager:
            sampler=sampler,
        )

        # 创建并添加 SpanProcessor
        span_exporter = await self._create_trace_exporter()
        # 创建 Span Exporter
        span_exporter = self._create_trace_exporter()
        if span_exporter:
            span_processor = BatchSpanProcessor(
                span_exporter,
                max_queue_size=self.settings.OTEL_BATCH_MAX_QUEUE_SIZE,
                max_export_batch_size=self.settings.OTEL_BATCH_MAX_EXPORT_BATCH_SIZE,
                export_timeout_millis=self.settings.OTEL_BATCH_EXPORT_TIMEOUT,
                schedule_delay_millis=self.settings.OTEL_BATCH_SCHEDULE_DELAY,
                max_queue_size=self.settings.OTEL_MAX_QUEUE_SIZE,
                max_export_batch_size=self.settings.OTEL_BATCH_SIZE,
                export_timeout_millis=self.settings.OTEL_EXPORTER_TIMEOUT,
                schedule_delay_millis=self.settings.OTEL_EXPORT_INTERVAL,
            )
            self._tracer_provider.add_span_processor(span_processor)

        # 设置全局 TracerProvider
        set_tracer_provider(self._tracer_provider)
        print("OpenTelemetry Tracing 设置完成")

        logger.info("✅ OpenTelemetry Tracing 设置完成")

    def _create_sampler(self):
        """创建采样器"""
        sampler_type = self.settings.OTEL_TRACES_SAMPLER.lower()

        if sampler_type == "always_off":
            return ALWAYS_OFF
        elif sampler_type == "always_on":
            return ALWAYS_ON
        elif sampler_type == "ratio":
            base_sampler = TraceIdRatioBased(self.settings.OTEL_TRACES_SAMPLER_RATIO)
            return ParentBased(root=base_sampler)
        else:
            logger.warning(f"未知的采样器类型: {sampler_type},使用默认采样器")
            return ALWAYS_ON

    async def _create_trace_exporter(self):
    def _create_trace_exporter(self):
        """创建 Trace 导出器"""
        protocol = self.settings.OTEL_EXPORTER_OTLP_PROTOCOL.lower()
        if not self.settings.OTEL_EXPORTER_ENDPOINT:
            print("使用控制台 Trace 导出器")
            return ConsoleSpanExporter()

        if protocol == "grpc":
            # 使用优化的 gRPC 导出器
            exporters = create_optimized_exporters(self.settings)
            return exporters.get("trace")

        elif protocol in ["http/protobuf", "http"]:
            return OTLPSpanExporterHTTP(
                endpoint=self.settings.OTEL_EXPORTER_OTLP_ENDPOINT,
                headers=self.settings.OTEL_EXPORTER_OTLP_HEADERS,
                timeout=self.settings.OTEL_EXPORTER_OTLP_TIMEOUT,
        try:
            return OTLPSpanExporter(
                endpoint=self.settings.OTEL_EXPORTER_ENDPOINT,
                headers=self._get_exporter_headers(),
                timeout=self.settings.OTEL_EXPORTER_TIMEOUT,
            )
        except Exception as e:
            print(f"创建 OTLP Trace 导出器失败: {e},fallback 到控制台导出器")
            return ConsoleSpanExporter()

        else:
            logger.error(f"不支持的协议类型: {protocol}")
            return None

    async def _setup_metrics(self) -> None:
    def _setup_metrics(self) -> None:
        """设置 Metrics"""
        if not self.settings.OTEL_METRICS_ENABLED:
            logger.info("Metrics 功能已禁用")
            print("Metrics 功能已禁用")
            return

        logger.info("📊 正在设置 OpenTelemetry Metrics...")
        print("正在设置 OpenTelemetry Metrics...")

        # 创建 Metric Exporter
        metric_exporter = self._create_metric_exporter()

        # 创建 MetricReader
        metric_readers = []

        # 添加主要的导出器
        main_exporter = await self._create_metric_exporter()
        if main_exporter:
        if metric_exporter:
            reader = PeriodicExportingMetricReader(
                exporter=main_exporter,
                export_interval_millis=self.settings.OTEL_METRICS_EXPORT_INTERVAL,
                export_timeout_millis=self.settings.OTEL_BATCH_EXPORT_TIMEOUT,
                exporter=metric_exporter,
                export_interval_millis=self.settings.OTEL_EXPORT_INTERVAL,
                export_timeout_millis=self.settings.OTEL_EXPORTER_TIMEOUT,
            )
            metric_readers.append(reader)
@@ -210,76 +224,235 @@ class TelemetryManager:
        # 设置全局 MeterProvider
        set_meter_provider(self._meter_provider)
        print("OpenTelemetry Metrics 设置完成")

        logger.info("✅ OpenTelemetry Metrics 设置完成")

    async def _create_metric_exporter(self):
    def _create_metric_exporter(self):
        """创建 Metric 导出器"""
        protocol = self.settings.OTEL_EXPORTER_OTLP_PROTOCOL.lower()

        if protocol == "grpc":
            # 使用优化的 gRPC 导出器
            exporters = create_optimized_exporters(self.settings)
            return exporters.get("metric")

        elif protocol in ["http/protobuf", "http"]:
            return OTLPMetricExporterHTTP(
                endpoint=self.settings.OTEL_EXPORTER_OTLP_ENDPOINT,
                headers=self.settings.OTEL_EXPORTER_OTLP_HEADERS,
                timeout=self.settings.OTEL_EXPORTER_OTLP_TIMEOUT,
            )

        else:
            logger.error(f"不支持的协议类型: {protocol}")
            return None

    async def _setup_instrumentations(self) -> None:
        """设置自动化 instrumentation"""
        logger.info("🔧 正在设置自动化 instrumentation...")
        if not self.settings.OTEL_EXPORTER_ENDPOINT:
            print("使用控制台 Metric 导出器")
            return ConsoleMetricExporter()

        try:
            # FastAPI instrumentation
            FastAPIInstrumentor().instrument()
            logger.info("✅ FastAPI instrumentation 设置完成")
            return OTLPMetricExporter(
                endpoint=self.settings.OTEL_EXPORTER_ENDPOINT,
                headers=self._get_exporter_headers(),
                timeout=self.settings.OTEL_EXPORTER_TIMEOUT,
            )
        except Exception as e:
            print(f"创建 OTLP Metric 导出器失败: {e},fallback 到控制台导出器")
            return ConsoleMetricExporter()

            # HTTP 客户端 instrumentation
            HTTPXClientInstrumentor().instrument()
            RequestsInstrumentor().instrument()
            logger.info("✅ HTTP 客户端 instrumentation 设置完成")
    def _setup_logs(self) -> None:
        """设置 Logs"""
        if not self.settings.OTEL_LOGS_ENABLED:
            print("Logs 功能已禁用")
            return

            # 数据库 instrumentation
            SQLAlchemyInstrumentor().instrument()
            AsyncPGInstrumentor().instrument()
            logger.info("✅ 数据库 instrumentation 设置完成")
        print("正在设置 OpenTelemetry Logs...")

            # Redis instrumentation
            RedisInstrumentor().instrument()
            logger.info("✅ Redis instrumentation 设置完成")
        # 创建 LoggerProvider
        self._logger_provider = LoggerProvider(resource=self._resource)

        # 创建 Log Exporter
        log_exporter = self._create_log_exporter()
        if log_exporter:
            log_processor = BatchLogRecordProcessor(
                log_exporter,
                max_queue_size=self.settings.OTEL_MAX_QUEUE_SIZE,
                max_export_batch_size=self.settings.OTEL_BATCH_SIZE,
                export_timeout_millis=self.settings.OTEL_EXPORTER_TIMEOUT,
                schedule_delay_millis=self.settings.OTEL_EXPORT_INTERVAL,
            )
            self._logger_provider.add_log_record_processor(log_processor)

        # 设置全局 LoggerProvider
        set_logger_provider(self._logger_provider)

        # 设置标准库日志桥接
        self._setup_logging_bridge()
        print("OpenTelemetry Logs 设置完成")

    def _create_log_exporter(self):
        """创建 Log 导出器"""
        if not self.settings.OTEL_EXPORTER_ENDPOINT:
            print("Log exporter endpoint 未配置,跳过 Log 导出")
            return None

        try:
            return OTLPLogExporter(
                endpoint=self.settings.OTEL_EXPORTER_ENDPOINT,
                headers=self._get_exporter_headers(),
                timeout=self.settings.OTEL_EXPORTER_TIMEOUT,
            )
        except Exception as e:
            print(f"创建 OTLP Log 导出器失败: {e}")
            return None

    def _setup_logging_bridge(self) -> None:
        """设置标准库日志到 OpenTelemetry 的桥接"""
        if not self._logger_provider:
            return

        try:
            handler = LoggingHandler(logger_provider=self._logger_provider)

            # 配置根日志器
            root_logger = logging.getLogger()
            root_logger.addHandler(handler)
            root_logger.setLevel(logging.INFO)

            print("OpenTelemetry 日志桥接设置完成")
        except Exception as e:
            print(f"设置日志桥接失败: {e}")
    def _setup_instrumentations(self) -> None:
        """设置自动化 instrumentation"""
        print("正在设置自动化 instrumentation...")

        try:
            # Standard library logging instrumentation
            if (
                LoggingInstrumentor
                and not LoggingInstrumentor().is_instrumented_by_opentelemetry
            ):
                LoggingInstrumentor().instrument(set_logging_format=True)
                self._instrumentors.append(LoggingInstrumentor)
                print("Logging instrumentation 设置完成")

            # Requests instrumentation
            if (
                RequestsInstrumentor
                and not RequestsInstrumentor().is_instrumented_by_opentelemetry
            ):
                RequestsInstrumentor().instrument()
                self._instrumentors.append(RequestsInstrumentor)
                print("Requests instrumentation 设置完成")

            # HTTPX instrumentation
            if (
                HTTPXClientInstrumentor
                and not HTTPXClientInstrumentor().is_instrumented_by_opentelemetry
            ):
                HTTPXClientInstrumentor().instrument()
                self._instrumentors.append(HTTPXClientInstrumentor)
                print("HTTPX instrumentation 设置完成")

            # System metrics instrumentation
            if self.settings.OTEL_METRICS_ENABLED and SystemMetricsInstrumentor:
                try:
                    SystemMetricsInstrumentor().instrument()
                    self._instrumentors.append(SystemMetricsInstrumentor)
                    print("System metrics instrumentation 设置完成")
                except Exception as e:
                    print(f"System metrics instrumentation 失败: {e}")

            print("自动化 instrumentation 设置完成")

        except Exception as e:
            logger.warning(f"某些 instrumentation 设置失败: {e}")
            print(f"设置 instrumentation 失败: {e}")

    async def shutdown(self) -> None:
    def instrument_app(self, app) -> None:
        """为应用设置 instrumentation"""
        try:
            # FastAPI instrumentation
            if (
                FastAPIInstrumentor
                and not FastAPIInstrumentor().is_instrumented_by_opentelemetry
            ):
                FastAPIInstrumentor.instrument_app(app)
                self._instrumentors.append(FastAPIInstrumentor)
                print("FastAPI instrumentation 设置完成")
            elif not FastAPIInstrumentor:
                print("FastAPI instrumentation 不可用 - 依赖缺失")
        except Exception as e:
            print(f"FastAPI instrumentation 失败: {e}")

    def instrument_database(self, engine=None, connection_string=None) -> None:
        """为数据库设置 instrumentation"""
        try:
            # SQLAlchemy instrumentation
            if (
                engine
                and SQLAlchemyInstrumentor
                and not SQLAlchemyInstrumentor().is_instrumented_by_opentelemetry
            ):
                SQLAlchemyInstrumentor().instrument(engine=engine)
                self._instrumentors.append(SQLAlchemyInstrumentor)
                print("SQLAlchemy instrumentation 设置完成")

            # AsyncPG instrumentation
            if (
                connection_string
                and AsyncPGInstrumentor
                and not AsyncPGInstrumentor().is_instrumented_by_opentelemetry
            ):
                AsyncPGInstrumentor().instrument()
                self._instrumentors.append(AsyncPGInstrumentor)
                print("AsyncPG instrumentation 设置完成")

            if not SQLAlchemyInstrumentor:
                print("SQLAlchemy instrumentation 不可用 - 依赖缺失")
            if not AsyncPGInstrumentor:
                print("AsyncPG instrumentation 不可用 - 依赖缺失")

        except Exception as e:
            print(f"Database instrumentation 失败: {e}")

    def instrument_redis(self, redis_client=None) -> None:
        """为 Redis 设置 instrumentation"""
        try:
            if (
                RedisInstrumentor
                and not RedisInstrumentor().is_instrumented_by_opentelemetry
            ):
                RedisInstrumentor().instrument()
                self._instrumentors.append(RedisInstrumentor)
                print("Redis instrumentation 设置完成")
            elif not RedisInstrumentor:
                print("Redis instrumentation 不可用 - 依赖缺失")
        except Exception as e:
            print(f"Redis instrumentation 失败: {e}")

    def _get_exporter_headers(self) -> Dict[str, str]:
        """获取导出器头信息"""
        headers = {}
        if auth_header := os.getenv("OTEL_EXPORTER_OTLP_HEADERS"):
            for header in auth_header.split(","):
                if "=" in header:
                    key, value = header.split("=", 1)
                    headers[key.strip()] = value.strip()
        return headers

    def shutdown(self) -> None:
        """关闭遥测系统"""
        if not self._initialized:
            return

        logger.info("🔒 正在关闭 OpenTelemetry 遥测系统...")
        print("正在关闭 OpenTelemetry 遥测系统...")

        try:
            # 关闭 TracerProvider
            # 取消 instrumentation
            for instrumentor_class in self._instrumentors:
                try:
                    instrumentor_class().uninstrument()
                except Exception as e:
                    print(
                        f"取消 {instrumentor_class.__name__} instrumentation 失败: {e}"
                    )

            # 关闭 providers
            if self._tracer_provider:
                self._tracer_provider.shutdown()

            # 关闭 MeterProvider
            if self._meter_provider:
                self._meter_provider.shutdown()
            if self._logger_provider:
                self._logger_provider.shutdown()

            self._initialized = False
            logger.info("✅ OpenTelemetry 遥测系统已关闭")
            print("OpenTelemetry 遥测系统已关闭")

        except Exception as e:
            logger.error(f"关闭遥测系统时发生错误: {e}")
            print(f"关闭遥测系统时发生错误: {e}")

    def get_tracer(self, name: str = __name__):
        """获取 Tracer"""
@@ -307,13 +480,34 @@ def get_telemetry_manager() -> TelemetryManager:
    return _telemetry_manager


async def initialize_telemetry() -> bool:
def initialize_telemetry() -> bool:
    """初始化遥测系统"""
    manager = get_telemetry_manager()
    return await manager.initialize()
    return manager.initialize()


async def shutdown_telemetry() -> None:
def shutdown_telemetry() -> None:
    """关闭遥测系统"""
    manager = get_telemetry_manager()
    await manager.shutdown()
    manager.shutdown()


def get_tracer(name: str = __name__):
    """获取 Tracer"""
    return trace.get_tracer(name)


def get_meter(name: str = __name__):
    """获取 Meter"""
    return metrics.get_meter(name)


# 导出的公共API
__all__ = [
    "TelemetryManager",
    "get_telemetry_manager",
    "initialize_telemetry",
    "shutdown_telemetry",
    "get_tracer",
    "get_meter",
]
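A minimal wiring sketch, assuming a FastAPI application and the module path app.core.telemetry (neither is shown in this commit): the new synchronous lifecycle helpers and instrument_app() could be hooked into the app lifespan like this.

# Hypothetical wiring sketch, not taken from this commit.
from contextlib import asynccontextmanager

from fastapi import FastAPI

from app.core.telemetry import (  # assumed module path
    get_telemetry_manager,
    initialize_telemetry,
    shutdown_telemetry,
)


@asynccontextmanager
async def lifespan(app: FastAPI):
    initialize_telemetry()                        # synchronous since this commit
    get_telemetry_manager().instrument_app(app)   # calls FastAPIInstrumentor.instrument_app(app)
    yield
    shutdown_telemetry()                          # uninstruments and shuts down providers


app = FastAPI(lifespan=lifespan)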
@@ -1,55 +0,0 @@
#!/usr/bin/env python3
"""Test script to check arq context API"""

import asyncio
from arq.worker import Worker, create_worker
from arq.connections import RedisSettings
from arq import cron


async def test_task(ctx):
    """Test task to check context attributes"""
    print(f"Context type: {type(ctx)}")
    print(f"Context attributes: {dir(ctx)}")

    # Check for common context attributes
    if hasattr(ctx, "job_id"):
        print(f"job_id: {ctx.job_id}")

    if hasattr(ctx, "retry"):
        print("retry method exists")
        # Test if retry is callable
        if callable(ctx.retry):
            print("retry is callable")

    if hasattr(ctx, "redis"):
        print("redis connection exists")

    return {"success": True, "tested_attributes": True}


async def main():
    print("Testing arq context API...")
    print(f"arq version: {__import__('arq').__version__}")

    # Create a simple worker to test context
    worker = Worker(
        functions=[test_task],
        redis_settings=RedisSettings(host="localhost"),
        max_jobs=1,
    )

    print("Worker created successfully")

    # Test cron function
    try:
        cron_job = cron(test_task, name="test_cron", minute="*")
        print(f"Cron job created: {cron_job}")
    except Exception as e:
        print(f"Cron error: {e}")

    print("\nContext API test completed")


if __name__ == "__main__":
    asyncio.run(main())
@@ -1,69 +0,0 @@
#!/usr/bin/env python3
"""Detailed test script to check arq context API"""

import asyncio
from arq.worker import Worker
from arq.connections import RedisSettings


async def detailed_task(ctx):
    """Detailed task to check context attributes"""
    print("=== CONTEXT DETAILED INSPECTION ===")
    print(f"Context type: {type(ctx)}")

    # Get all attributes
    attributes = [attr for attr in dir(ctx) if not attr.startswith("_")]
    print(f"Public attributes: {attributes}")

    # Check specific important attributes
    important_attrs = ["job_id", "retry", "redis", "job_try", "enqueue_time"]
    for attr in important_attrs:
        if hasattr(ctx, attr):
            value = getattr(ctx, attr)
            print(f"{attr}: {value} (type: {type(value)})")
            if callable(value):
                print(f"  {attr} is callable")
        else:
            print(f"{attr}: NOT FOUND")

    # Test retry functionality if it exists
    if hasattr(ctx, "retry") and callable(ctx.retry):
        try:
            # Just check if we can call it, but don't actually retry
            retry_info = ctx.retry.__doc__ or "No docstring"
            print(f"retry doc: {retry_info[:100]}...")
        except Exception as e:
            print(f"retry check error: {e}")

    return {"success": True, "detailed_inspection": True}


async def main():
    print("Running detailed arq context inspection...")

    # Create worker with our test task
    worker = Worker(
        functions=[detailed_task],
        redis_settings=RedisSettings(host="localhost"),
        max_jobs=1,
    )

    print("Worker setup complete")
    print(
        "Note: To see actual context attributes, this would need to run in a real arq worker process"
    )
    print("This test confirms basic arq functionality works")

    # Check if we can access the retry mechanism through imports
    try:
        from arq.jobs import Job

        print("Job class available")
    except ImportError as e:
        print(f"Job import error: {e}")

    print("\nBasic arq setup is working correctly")


if __name__ == "__main__":
    asyncio.run(main())
@@ -1,96 +0,0 @@
#!/usr/bin/env python3
"""
简单的Gunicorn测试脚本
用于验证Docker容器中的Gunicorn配置
"""

import multiprocessing
import os
import signal
import sys
import time
from pathlib import Path

# 模拟Gunicorn配置
bind = "0.0.0.0:8000"
workers = max(1, multiprocessing.cpu_count() * 2 + 1)
worker_class = "uvicorn.workers.UvicornWorker"
worker_connections = 1000
timeout = 30
keepalive = 2


def test_imports():
    """测试必要的模块导入"""
    try:
        import gunicorn
        import uvicorn

        print("✅ Uvicorn和Gunicorn导入成功")
        return True
    except ImportError as e:
        print(f"❌ 模块导入失败: {e}")
        return False


def test_arq():
    """测试Arq worker配置"""
    try:
        from app.core.arq_worker import ArqWorkerSettings

        settings = ArqWorkerSettings()
        print("✅ Arq worker配置成功")
        return True
    except ImportError as e:
        print(f"❌ Arq worker配置失败: {e}")
        return False


def test_database():
    """测试数据库连接"""
    try:
        from app.core.database import get_database

        db = get_database()
        print("✅ 数据库连接配置成功")
        return True
    except ImportError as e:
        print(f"❌ 数据库配置失败: {e}")
        return False


def main():
    """主测试函数"""
    print("🚀 开始测试Docker容器环境...")

    tests = [
        test_imports,
        test_arq,
        test_database,
    ]

    passed = 0
    failed = 0

    for test in tests:
        try:
            if test():
                passed += 1
            else:
                failed += 1
        except Exception as e:
            print(f"❌ 测试异常: {e}")
            failed += 1

    print(f"\n📊 测试结果: {passed} 通过, {failed} 失败")

    if failed == 0:
        print("🎉 所有测试通过! 容器准备就绪。")
        return 0
    else:
        print("⚠️ 有测试失败,请检查配置。")
        return 1


if __name__ == "__main__":
    sys.exit(main())
@@ -1,42 +0,0 @@
#!/usr/bin/env python3
"""
DEPRECATED: This test file for Celery has been replaced with Arq.
Please use app/tasks/crawler_tasks_arq.py for coroutine-based task processing.
"""

import sys
from pathlib import Path

# 添加项目根目录到Python路径
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))


def test_arq_task_registration():
    """测试Arq任务注册"""
    try:
        print("正在导入 Arq worker配置...")
        from app.core.arq_worker import ArqWorkerSettings

        settings = ArqWorkerSettings()

        print(f"已注册的函数数量: {len(settings.functions)}")
        print("已注册的函数:")
        for func in settings.functions:
            print(f"  ✓ {func.__name__}")

        print("\n✅ Arq worker配置成功!")
        return True

    except Exception as e:
        print(f"❌ Arq任务配置测试失败: {e}")
        import traceback

        traceback.print_exc()
        return False


if __name__ == "__main__":
    print("注意:Celery已被Arq替代")
    success = test_arq_task_registration()
    sys.exit(0 if success else 1)
@@ -1,46 +0,0 @@
#!/usr/bin/env python3
"""Simple test to understand arq worker usage"""

import asyncio
from arq.worker import Worker
from arq.connections import RedisSettings


def simple_task(ctx):
    """Simple test task"""
    print(f"Task executed with job_id: {ctx.job_id}")
    return {"success": True}


async def test_worker():
    """Test creating and running a worker"""

    print("Testing arq worker creation...")

    # Try creating a worker
    worker = Worker(
        functions=[simple_task],
        redis_settings=RedisSettings(host="localhost"),
        max_jobs=5,
    )

    print("Worker created successfully")
    print("Worker type:", type(worker))

    # Check if worker has run method
    if hasattr(worker, "run"):
        print("Worker has 'run' method")
        sig = str(type(worker.run))
        print("run method type:", sig)

    if hasattr(worker, "main"):
        print("Worker has 'main' method")

    if hasattr(worker, "start"):
        print("Worker has 'start' method")

    return worker


if __name__ == "__main__":
    asyncio.run(test_worker())