|
|
|
|
@ -12,6 +12,10 @@ from flask_login import user_loaded_from_request, user_logged_in # type: ignore
|
|
|
|
|
from configs import dify_config
|
|
|
|
|
from dify_app import DifyApp
|
|
|
|
|
|
|
|
|
|
import requests # To get requests.Response type hint
|
|
|
|
|
import httpx # To get httpx.Response type hint
|
|
|
|
|
from opentelemetry.semconv.trace import SpanAttributes
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@user_logged_in.connect
|
|
|
|
|
@user_loaded_from_request.connect
|
|
|
|
|
@ -108,6 +112,8 @@ def init_app(app: DifyApp):
|
|
|
|
|
from opentelemetry.instrumentation.celery import CeleryInstrumentor
|
|
|
|
|
from opentelemetry.instrumentation.flask import FlaskInstrumentor
|
|
|
|
|
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
|
|
|
|
|
from opentelemetry.instrumentation.requests import RequestsInstrumentor
|
|
|
|
|
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
|
|
|
|
|
from opentelemetry.metrics import get_meter, get_meter_provider, set_meter_provider
|
|
|
|
|
from opentelemetry.propagate import set_global_textmap
|
|
|
|
|
from opentelemetry.propagators.b3 import B3Format
|
|
|
|
|
@ -124,7 +130,7 @@ def init_app(app: DifyApp):
|
|
|
|
|
from opentelemetry.semconv.resource import ResourceAttributes
|
|
|
|
|
from opentelemetry.trace import Span, get_tracer_provider, set_tracer_provider
|
|
|
|
|
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
|
|
|
|
|
from opentelemetry.trace.status import StatusCode
|
|
|
|
|
from opentelemetry.trace.status import StatusCode, Status
|
|
|
|
|
|
|
|
|
|
setup_context_propagation()
|
|
|
|
|
# Initialize OpenTelemetry
|
|
|
|
|
@ -178,11 +184,61 @@ def init_app(app: DifyApp):
|
|
|
|
|
export_timeout_millis=dify_config.OTEL_METRIC_EXPORT_TIMEOUT,
|
|
|
|
|
)
|
|
|
|
|
set_meter_provider(MeterProvider(resource=resource, metric_readers=[reader]))
|
|
|
|
|
|
|
|
|
|
# Helper function to initialize requests instrumentor
|
|
|
|
|
def init_requests_instrumentor():
|
|
|
|
|
def outgoing_requests_response_hook(span: Span, request: requests.PreparedRequest, response: requests.Response):
|
|
|
|
|
if span and span.is_recording() and response:
|
|
|
|
|
status_code = response.status_code
|
|
|
|
|
status_class = f"{status_code // 100}xx"
|
|
|
|
|
|
|
|
|
|
# Set span status
|
|
|
|
|
if 200 <= status_code < 400:
|
|
|
|
|
span.set_status(Status(StatusCode.OK))
|
|
|
|
|
else:
|
|
|
|
|
span.set_status(Status(StatusCode.ERROR, f"HTTP status code {status_code}"))
|
|
|
|
|
|
|
|
|
|
# Record metric (references counter from outer scope)
|
|
|
|
|
_http_client_response_counter.add(1, {"status_code": status_code, "status_class": status_class})
|
|
|
|
|
|
|
|
|
|
# Only record the URL path attribute
|
|
|
|
|
if hasattr(request, 'path_url'): # For requests.PreparedRequest
|
|
|
|
|
path = request.path_url.split('?', 1)[0] # Get path before query string
|
|
|
|
|
span.set_attribute(SpanAttributes.URL_PATH, path)
|
|
|
|
|
|
|
|
|
|
RequestsInstrumentor().instrument(response_hook=outgoing_requests_response_hook)
|
|
|
|
|
|
|
|
|
|
# Helper function to initialize httpx instrumentor
|
|
|
|
|
def init_httpx_instrumentor():
|
|
|
|
|
def outgoing_httpx_response_hook(span: Span, request: httpx.Request, response: httpx.Response):
|
|
|
|
|
if span and span.is_recording() and response:
|
|
|
|
|
status_code = response.status_code
|
|
|
|
|
status_class = f"{status_code // 100}xx"
|
|
|
|
|
|
|
|
|
|
# Set span status
|
|
|
|
|
if 200 <= status_code < 400:
|
|
|
|
|
span.set_status(Status(StatusCode.OK))
|
|
|
|
|
else:
|
|
|
|
|
span.set_status(Status(StatusCode.ERROR, f"HTTP status code {status_code}"))
|
|
|
|
|
|
|
|
|
|
# Record metric (references counter from outer scope)
|
|
|
|
|
_http_client_response_counter.add(1, {"status_code": status_code, "status_class": status_class})
|
|
|
|
|
|
|
|
|
|
# Only record the URL path attribute
|
|
|
|
|
if hasattr(request, 'url') and hasattr(request.url, 'path'): # For httpx.Request
|
|
|
|
|
span.set_attribute(SpanAttributes.URL_PATH, request.url.path)
|
|
|
|
|
|
|
|
|
|
HTTPXClientInstrumentor().instrument(response_hook=outgoing_httpx_response_hook)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Initialize instrumentors
|
|
|
|
|
if not is_celery_worker():
|
|
|
|
|
init_flask_instrumentor(app)
|
|
|
|
|
CeleryInstrumentor(tracer_provider=get_tracer_provider(), meter_provider=get_meter_provider()).instrument()
|
|
|
|
|
instrument_exception_logging()
|
|
|
|
|
init_sqlalchemy_instrumentor(app)
|
|
|
|
|
init_requests_instrumentor()
|
|
|
|
|
init_httpx_instrumentor()
|
|
|
|
|
atexit.register(shutdown_tracer)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|