diff --git a/api/extensions/ext_otel.py b/api/extensions/ext_otel.py index 29fa46902e..8bb35977c9 100644 --- a/api/extensions/ext_otel.py +++ b/api/extensions/ext_otel.py @@ -8,62 +8,15 @@ from typing import Union from celery.signals import worker_init # type: ignore from flask_login import user_loaded_from_request, user_logged_in # type: ignore -from opentelemetry import trace -from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter -from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter -from opentelemetry.instrumentation.celery import CeleryInstrumentor -from opentelemetry.instrumentation.flask import FlaskInstrumentor -from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor -from opentelemetry.metrics import get_meter, get_meter_provider, set_meter_provider -from opentelemetry.propagate import set_global_textmap -from opentelemetry.propagators.b3 import B3Format -from opentelemetry.propagators.composite import CompositePropagator -from opentelemetry.sdk.metrics import MeterProvider -from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader -from opentelemetry.sdk.resources import Resource -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import ( - BatchSpanProcessor, - ConsoleSpanExporter, -) -from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio -from opentelemetry.semconv.resource import ResourceAttributes -from opentelemetry.trace import Span, get_current_span, get_tracer_provider, set_tracer_provider -from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator -from opentelemetry.trace.status import StatusCode from configs import dify_config from dify_app import DifyApp -class ExceptionLoggingHandler(logging.Handler): - """Custom logging handler that creates spans for logging.exception() calls""" - - def emit(self, record): - try: - if record.exc_info: - tracer = get_tracer_provider().get_tracer("dify.exception.logging") - with tracer.start_as_current_span( - "log.exception", - attributes={ - "log.level": record.levelname, - "log.message": record.getMessage(), - "log.logger": record.name, - "log.file.path": record.pathname, - "log.file.line": record.lineno, - }, - ) as span: - span.set_status(StatusCode.ERROR) - span.record_exception(record.exc_info[1]) - span.set_attribute("exception.type", record.exc_info[0].__name__) - span.set_attribute("exception.message", str(record.exc_info[1])) - except Exception: - pass - - @user_logged_in.connect @user_loaded_from_request.connect def on_user_loaded(_sender, user): + from opentelemetry.trace import get_current_span if user: current_span = get_current_span() if current_span: @@ -72,7 +25,108 @@ def on_user_loaded(_sender, user): def init_app(app: DifyApp): + def is_celery_worker(): + return "celery" in sys.argv[0].lower() + + + def instrument_exception_logging(): + exception_handler = ExceptionLoggingHandler() + logging.getLogger().addHandler(exception_handler) + + + def init_flask_instrumentor(app: DifyApp): + meter = get_meter("http_metrics", version=dify_config.CURRENT_VERSION) + _http_response_counter = meter.create_counter( + "http.server.response.count", description="Total number of HTTP responses by status code", unit="{response}" + ) + + def response_hook(span: Span, status: str, response_headers: list): + if span and span.is_recording(): + if status.startswith("2"): + span.set_status(StatusCode.OK) + else: + span.set_status(StatusCode.ERROR, status) + + status = status.split(" ")[0] + status_code = int(status) + status_class = f"{status_code // 100}xx" + _http_response_counter.add(1, {"status_code": status_code, "status_class": status_class}) + + instrumentor = FlaskInstrumentor() + if dify_config.DEBUG: + logging.info("Initializing Flask instrumentor") + instrumentor.instrument_app(app, response_hook=response_hook) + + + def init_sqlalchemy_instrumentor(app: DifyApp): + with app.app_context(): + engines = list(app.extensions["sqlalchemy"].engines.values()) + SQLAlchemyInstrumentor().instrument(enable_commenter=True, engines=engines) + + + def setup_context_propagation(): + # Configure propagators + set_global_textmap( + CompositePropagator( + [ + TraceContextTextMapPropagator(), # W3C trace context + B3Format(), # B3 propagation (used by many systems) + ] + ) + ) + + def shutdown_tracer(): + provider = trace.get_tracer_provider() + if hasattr(provider, "force_flush"): + provider.force_flush() + if dify_config.ENABLE_OTEL: + class ExceptionLoggingHandler(logging.Handler): + """Custom logging handler that creates spans for logging.exception() calls""" + + def emit(self, record): + try: + if record.exc_info: + tracer = get_tracer_provider().get_tracer("dify.exception.logging") + with tracer.start_as_current_span( + "log.exception", + attributes={ + "log.level": record.levelname, + "log.message": record.getMessage(), + "log.logger": record.name, + "log.file.path": record.pathname, + "log.file.line": record.lineno, + }, + ) as span: + span.set_status(StatusCode.ERROR) + span.record_exception(record.exc_info[1]) + span.set_attribute("exception.type", record.exc_info[0].__name__) + span.set_attribute("exception.message", str(record.exc_info[1])) + except Exception: + pass + from opentelemetry import trace + from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter + from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter + from opentelemetry.instrumentation.celery import CeleryInstrumentor + from opentelemetry.instrumentation.flask import FlaskInstrumentor + from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor + from opentelemetry.metrics import get_meter, get_meter_provider, set_meter_provider + from opentelemetry.propagate import set_global_textmap + from opentelemetry.propagators.b3 import B3Format + from opentelemetry.propagators.composite import CompositePropagator + from opentelemetry.sdk.metrics import MeterProvider + from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader + from opentelemetry.sdk.resources import Resource + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import ( + BatchSpanProcessor, + ConsoleSpanExporter, + ) + from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio + from opentelemetry.semconv.resource import ResourceAttributes + from opentelemetry.trace import Span, get_tracer_provider, set_tracer_provider + from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator + from opentelemetry.trace.status import StatusCode setup_context_propagation() # Initialize OpenTelemetry # Follow Semantic Convertions 1.32.0 to define resource attributes @@ -133,67 +187,22 @@ def init_app(app: DifyApp): atexit.register(shutdown_tracer) -def is_celery_worker(): - return "celery" in sys.argv[0].lower() - - -def instrument_exception_logging(): - exception_handler = ExceptionLoggingHandler() - logging.getLogger().addHandler(exception_handler) - - -def init_flask_instrumentor(app: DifyApp): - meter = get_meter("http_metrics", version=dify_config.CURRENT_VERSION) - _http_response_counter = meter.create_counter( - "http.server.response.count", description="Total number of HTTP responses by status code", unit="{response}" - ) - - def response_hook(span: Span, status: str, response_headers: list): - if span and span.is_recording(): - if status.startswith("2"): - span.set_status(StatusCode.OK) - else: - span.set_status(StatusCode.ERROR, status) - - status = status.split(" ")[0] - status_code = int(status) - status_class = f"{status_code // 100}xx" - _http_response_counter.add(1, {"status_code": status_code, "status_class": status_class}) - - instrumentor = FlaskInstrumentor() - if dify_config.DEBUG: - logging.info("Initializing Flask instrumentor") - instrumentor.instrument_app(app, response_hook=response_hook) - - -def init_sqlalchemy_instrumentor(app: DifyApp): - with app.app_context(): - engines = list(app.extensions["sqlalchemy"].engines.values()) - SQLAlchemyInstrumentor().instrument(enable_commenter=True, engines=engines) - - -def setup_context_propagation(): - # Configure propagators - set_global_textmap( - CompositePropagator( - [ - TraceContextTextMapPropagator(), # W3C trace context - B3Format(), # B3 propagation (used by many systems) - ] - ) - ) +def is_enabled(): + """ + Check if the extension is enabled. + Always enable this patch to prevent errors even when OpenTelemetry is disabled. + """ + return dify_config.ENABLE_OTEL + @worker_init.connect(weak=False) def init_celery_worker(*args, **kwargs): + from opentelemetry.instrumentation.celery import CeleryInstrumentor + from opentelemetry.metrics import get_meter_provider + from opentelemetry.trace import get_tracer_provider tracer_provider = get_tracer_provider() metric_provider = get_meter_provider() if dify_config.DEBUG: logging.info("Initializing OpenTelemetry for Celery worker") CeleryInstrumentor(tracer_provider=tracer_provider, meter_provider=metric_provider).instrument() - - -def shutdown_tracer(): - provider = trace.get_tracer_provider() - if hasattr(provider, "force_flush"): - provider.force_flush()