feat: adapt aliyun otel

pull/21891/head
ytqh 1 year ago
parent 8a190d7dc5
commit 3d2b3a53e6

@ -7,10 +7,9 @@ import sys
from typing import Union from typing import Union
from celery.signals import worker_init # type: ignore from celery.signals import worker_init # type: ignore
from flask_login import user_loaded_from_request, user_logged_in # type: ignore
from configs import dify_config from configs import dify_config
from dify_app import DifyApp from dify_app import DifyApp
from flask_login import user_loaded_from_request, user_logged_in # type: ignore
@user_logged_in.connect @user_logged_in.connect
@ -37,7 +36,9 @@ def init_app(app: DifyApp):
def init_flask_instrumentor(app: DifyApp): def init_flask_instrumentor(app: DifyApp):
meter = get_meter("http_metrics", version=dify_config.CURRENT_VERSION) meter = get_meter("http_metrics", version=dify_config.CURRENT_VERSION)
_http_response_counter = meter.create_counter( _http_response_counter = meter.create_counter(
"http.server.response.count", description="Total number of HTTP responses by status code", unit="{response}" "http.server.response.count",
description="Total number of HTTP responses by status code",
unit="{response}",
) )
def response_hook(span: Span, status: str, response_headers: list): def response_hook(span: Span, status: str, response_headers: list):
@ -50,7 +51,9 @@ def init_app(app: DifyApp):
status = status.split(" ")[0] status = status.split(" ")[0]
status_code = int(status) status_code = int(status)
status_class = f"{status_code // 100}xx" status_class = f"{status_code // 100}xx"
_http_response_counter.add(1, {"status_code": status_code, "status_class": status_class}) _http_response_counter.add(
1, {"status_code": status_code, "status_class": status_class}
)
instrumentor = FlaskInstrumentor() instrumentor = FlaskInstrumentor()
if dify_config.DEBUG: if dify_config.DEBUG:
@ -97,13 +100,17 @@ def init_app(app: DifyApp):
) as span: ) as span:
span.set_status(StatusCode.ERROR) span.set_status(StatusCode.ERROR)
span.record_exception(record.exc_info[1]) span.record_exception(record.exc_info[1])
span.set_attribute("exception.type", record.exc_info[0].__name__) span.set_attribute(
"exception.type", record.exc_info[0].__name__
)
span.set_attribute("exception.message", str(record.exc_info[1])) span.set_attribute("exception.message", str(record.exc_info[1]))
except Exception: except Exception:
pass pass
from opentelemetry import trace from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter from opentelemetry.exporter.otlp.proto.http.metric_exporter import (
OTLPMetricExporter,
)
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.instrumentation.flask import FlaskInstrumentor from opentelemetry.instrumentation.flask import FlaskInstrumentor
@ -113,17 +120,19 @@ def init_app(app: DifyApp):
from opentelemetry.propagators.b3 import B3Format from opentelemetry.propagators.b3 import B3Format
from opentelemetry.propagators.composite import CompositePropagator from opentelemetry.propagators.composite import CompositePropagator
from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader from opentelemetry.sdk.metrics.export import (
ConsoleMetricExporter,
PeriodicExportingMetricReader,
)
from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ( from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
BatchSpanProcessor,
ConsoleSpanExporter,
)
from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio
from opentelemetry.semconv.resource import ResourceAttributes from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.trace import Span, get_tracer_provider, set_tracer_provider from opentelemetry.trace import Span, get_tracer_provider, set_tracer_provider
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator from opentelemetry.trace.propagation.tracecontext import (
TraceContextTextMapPropagator,
)
from opentelemetry.trace.status import StatusCode from opentelemetry.trace.status import StatusCode
setup_context_propagation() setup_context_propagation()
@ -151,12 +160,10 @@ def init_app(app: DifyApp):
metric_exporter: Union[OTLPMetricExporter, ConsoleMetricExporter] metric_exporter: Union[OTLPMetricExporter, ConsoleMetricExporter]
if dify_config.OTEL_EXPORTER_TYPE == "otlp": if dify_config.OTEL_EXPORTER_TYPE == "otlp":
exporter = OTLPSpanExporter( exporter = OTLPSpanExporter(
endpoint=dify_config.OTLP_BASE_ENDPOINT + "/v1/traces", endpoint=dify_config.OTLP_BASE_ENDPOINT + "/traces",
headers={"Authorization": f"Bearer {dify_config.OTLP_API_KEY}"},
) )
metric_exporter = OTLPMetricExporter( metric_exporter = OTLPMetricExporter(
endpoint=dify_config.OTLP_BASE_ENDPOINT + "/v1/metrics", endpoint=dify_config.OTLP_BASE_ENDPOINT + "/metrics",
headers={"Authorization": f"Bearer {dify_config.OTLP_API_KEY}"},
) )
else: else:
# Fallback to console exporter # Fallback to console exporter
@ -180,7 +187,9 @@ def init_app(app: DifyApp):
set_meter_provider(MeterProvider(resource=resource, metric_readers=[reader])) set_meter_provider(MeterProvider(resource=resource, metric_readers=[reader]))
if not is_celery_worker(): if not is_celery_worker():
init_flask_instrumentor(app) init_flask_instrumentor(app)
CeleryInstrumentor(tracer_provider=get_tracer_provider(), meter_provider=get_meter_provider()).instrument() CeleryInstrumentor(
tracer_provider=get_tracer_provider(), meter_provider=get_meter_provider()
).instrument()
instrument_exception_logging() instrument_exception_logging()
init_sqlalchemy_instrumentor(app) init_sqlalchemy_instrumentor(app)
atexit.register(shutdown_tracer) atexit.register(shutdown_tracer)
@ -201,4 +210,6 @@ def init_celery_worker(*args, **kwargs):
metric_provider = get_meter_provider() metric_provider = get_meter_provider()
if dify_config.DEBUG: if dify_config.DEBUG:
logging.info("Initializing OpenTelemetry for Celery worker") logging.info("Initializing OpenTelemetry for Celery worker")
CeleryInstrumentor(tracer_provider=tracer_provider, meter_provider=metric_provider).instrument() CeleryInstrumentor(
tracer_provider=tracer_provider, meter_provider=metric_provider
).instrument()

Loading…
Cancel
Save