""" OpenTelemetry tracing configuration with gRPC exporter. Provides distributed tracing for the application. """ from typing import Optional from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider, SpanProcessor from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.resources import Resource, SERVICE_NAME from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor from opentelemetry.instrumentation.redis import RedisInstrumentor from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor from opentelemetry.sdk.trace.sampling import TraceIdRatioBased from core.config import settings tracer_provider: Optional[TracerProvider] = None span_processor: Optional[SpanProcessor] = None def init_tracing() -> trace.Tracer: """ Initialize OpenTelemetry tracing with gRPC exporter. This should be called at application startup. Returns: trace.Tracer: Configured tracer instance """ global tracer_provider, span_processor if not settings.otel_enabled: # Return no-op tracer if OpenTelemetry is disabled return trace.get_tracer(__name__) # Create resource with service name resource = Resource(attributes={ SERVICE_NAME: settings.otel_service_name }) # Create sampler based on sample rate sampler = TraceIdRatioBased(settings.otel_sample_rate) # Create tracer provider tracer_provider = TracerProvider( resource=resource, sampler=sampler ) # Create OTLP gRPC exporter otlp_exporter = OTLPSpanExporter( endpoint=settings.otel_exporter_endpoint, insecure=settings.otel_exporter_insecure ) # Create batch span processor span_processor = BatchSpanProcessor(otlp_exporter) tracer_provider.add_span_processor(span_processor) # Set global tracer provider trace.set_tracer_provider(tracer_provider) # Return tracer return trace.get_tracer(__name__) def instrument_app(app) -> None: """ Instrument FastAPI application with OpenTelemetry. Args: app: FastAPI application instance """ if not settings.otel_enabled: return # Instrument FastAPI FastAPIInstrumentor.instrument_app(app) # Instrument HTTP client HTTPXClientInstrumentor().instrument() # Redis instrumentation is done when Redis client is created # SQLAlchemy instrumentation is done when engine is created def instrument_sqlalchemy(engine) -> None: """ Instrument SQLAlchemy engine with OpenTelemetry. Args: engine: SQLAlchemy engine instance """ if not settings.otel_enabled: return SQLAlchemyInstrumentor().instrument( engine=engine.sync_engine ) def instrument_redis() -> None: """ Instrument Redis client with OpenTelemetry. """ if not settings.otel_enabled: return RedisInstrumentor().instrument() async def shutdown_tracing() -> None: """ Shutdown tracing and flush remaining spans. This should be called at application shutdown. """ global tracer_provider, span_processor if span_processor: span_processor.shutdown() if tracer_provider: tracer_provider.shutdown() def get_tracer(name: str = __name__) -> trace.Tracer: """ Get tracer instance. Args: name: Tracer name (usually __name__) Returns: trace.Tracer: Tracer instance """ return trace.get_tracer(name) def get_current_span() -> trace.Span: """ Get current active span. Returns: trace.Span: Current span """ return trace.get_current_span() def get_trace_id() -> str: """ Get current trace ID as hex string. Returns: str: Trace ID or empty string if no active span """ span = get_current_span() if span and span.get_span_context().is_valid: return format(span.get_span_context().trace_id, '032x') return ""