feat: code cleanup

pull/18934/head
TedaLIEz 1 year ago
parent 583600c6e0
commit fc5a61ad92

@ -52,7 +52,6 @@ def initialize_extensions(app: DifyApp):
ext_mail, ext_mail,
ext_migrate, ext_migrate,
ext_otel, ext_otel,
ext_otel_patch,
ext_proxy_fix, ext_proxy_fix,
ext_redis, ext_redis,
ext_repositories, ext_repositories,
@ -85,7 +84,6 @@ def initialize_extensions(app: DifyApp):
ext_proxy_fix, ext_proxy_fix,
ext_blueprints, ext_blueprints,
ext_commands, ext_commands,
ext_otel_patch, # Apply patch before initializing OpenTelemetry
ext_otel, ext_otel,
] ]
for ext in extensions: for ext in extensions:

@ -16,15 +16,17 @@ from dify_app import DifyApp
@user_logged_in.connect @user_logged_in.connect
@user_loaded_from_request.connect @user_loaded_from_request.connect
def on_user_loaded(_sender, user): def on_user_loaded(_sender, user):
from opentelemetry.trace import get_current_span if dify_config.ENABLE_OTEL:
if user: from opentelemetry.trace import get_current_span
current_span = get_current_span() if user:
if current_span: current_span = get_current_span()
current_span.set_attribute("service.tenant.id", user.current_tenant_id) if current_span:
current_span.set_attribute("service.user.id", user.id) current_span.set_attribute("service.tenant.id", user.current_tenant_id)
current_span.set_attribute("service.user.id", user.id)
def init_app(app: DifyApp): def init_app(app: DifyApp):
def is_celery_worker(): def is_celery_worker():
return "celery" in sys.argv[0].lower() return "celery" in sys.argv[0].lower()
@ -80,111 +82,110 @@ def init_app(app: DifyApp):
if hasattr(provider, "force_flush"): if hasattr(provider, "force_flush"):
provider.force_flush() provider.force_flush()
if dify_config.ENABLE_OTEL: class ExceptionLoggingHandler(logging.Handler):
class ExceptionLoggingHandler(logging.Handler): """Custom logging handler that creates spans for logging.exception() calls"""
"""Custom logging handler that creates spans for logging.exception() calls"""
def emit(self, record):
def emit(self, record): try:
try: if record.exc_info:
if record.exc_info: tracer = get_tracer_provider().get_tracer("dify.exception.logging")
tracer = get_tracer_provider().get_tracer("dify.exception.logging") with tracer.start_as_current_span(
with tracer.start_as_current_span( "log.exception",
"log.exception", attributes={
attributes={ "log.level": record.levelname,
"log.level": record.levelname, "log.message": record.getMessage(),
"log.message": record.getMessage(), "log.logger": record.name,
"log.logger": record.name, "log.file.path": record.pathname,
"log.file.path": record.pathname, "log.file.line": record.lineno,
"log.file.line": record.lineno, },
}, ) as span:
) as span: span.set_status(StatusCode.ERROR)
span.set_status(StatusCode.ERROR) span.record_exception(record.exc_info[1])
span.record_exception(record.exc_info[1]) span.set_attribute("exception.type", record.exc_info[0].__name__)
span.set_attribute("exception.type", record.exc_info[0].__name__) span.set_attribute("exception.message", str(record.exc_info[1]))
span.set_attribute("exception.message", str(record.exc_info[1])) except Exception:
except Exception: pass
pass from opentelemetry import trace
from opentelemetry import trace from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.instrumentation.celery import CeleryInstrumentor from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.flask import FlaskInstrumentor from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor from opentelemetry.metrics import get_meter, get_meter_provider, set_meter_provider
from opentelemetry.metrics import get_meter, get_meter_provider, set_meter_provider from opentelemetry.propagate import set_global_textmap
from opentelemetry.propagate import set_global_textmap from opentelemetry.propagators.b3 import B3Format
from opentelemetry.propagators.b3 import B3Format from opentelemetry.propagators.composite import CompositePropagator
from opentelemetry.propagators.composite import CompositePropagator from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader
from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import (
from opentelemetry.sdk.trace.export import ( BatchSpanProcessor,
BatchSpanProcessor, ConsoleSpanExporter,
ConsoleSpanExporter, )
from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.trace import Span, get_tracer_provider, set_tracer_provider
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from opentelemetry.trace.status import StatusCode
setup_context_propagation()
# Initialize OpenTelemetry
# Follow Semantic Convertions 1.32.0 to define resource attributes
resource = Resource(
attributes={
ResourceAttributes.SERVICE_NAME: dify_config.APPLICATION_NAME,
ResourceAttributes.SERVICE_VERSION: f"dify-{dify_config.CURRENT_VERSION}-{dify_config.COMMIT_SHA}",
ResourceAttributes.PROCESS_PID: os.getpid(),
ResourceAttributes.DEPLOYMENT_ENVIRONMENT: f"{dify_config.DEPLOY_ENV}-{dify_config.EDITION}",
ResourceAttributes.HOST_NAME: socket.gethostname(),
ResourceAttributes.HOST_ARCH: platform.machine(),
"custom.deployment.git_commit": dify_config.COMMIT_SHA,
ResourceAttributes.HOST_ID: platform.node(),
ResourceAttributes.OS_TYPE: platform.system().lower(),
ResourceAttributes.OS_DESCRIPTION: platform.platform(),
ResourceAttributes.OS_VERSION: platform.version(),
}
)
sampler = ParentBasedTraceIdRatio(dify_config.OTEL_SAMPLING_RATE)
provider = TracerProvider(resource=resource, sampler=sampler)
set_tracer_provider(provider)
exporter: Union[OTLPSpanExporter, ConsoleSpanExporter]
metric_exporter: Union[OTLPMetricExporter, ConsoleMetricExporter]
if dify_config.OTEL_EXPORTER_TYPE == "otlp":
exporter = OTLPSpanExporter(
endpoint=dify_config.OTLP_BASE_ENDPOINT + "/v1/traces",
headers={"Authorization": f"Bearer {dify_config.OTLP_API_KEY}"},
) )
from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio metric_exporter = OTLPMetricExporter(
from opentelemetry.semconv.resource import ResourceAttributes endpoint=dify_config.OTLP_BASE_ENDPOINT + "/v1/metrics",
from opentelemetry.trace import Span, get_tracer_provider, set_tracer_provider headers={"Authorization": f"Bearer {dify_config.OTLP_API_KEY}"},
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from opentelemetry.trace.status import StatusCode
setup_context_propagation()
# Initialize OpenTelemetry
# Follow Semantic Convertions 1.32.0 to define resource attributes
resource = Resource(
attributes={
ResourceAttributes.SERVICE_NAME: dify_config.APPLICATION_NAME,
ResourceAttributes.SERVICE_VERSION: f"dify-{dify_config.CURRENT_VERSION}-{dify_config.COMMIT_SHA}",
ResourceAttributes.PROCESS_PID: os.getpid(),
ResourceAttributes.DEPLOYMENT_ENVIRONMENT: f"{dify_config.DEPLOY_ENV}-{dify_config.EDITION}",
ResourceAttributes.HOST_NAME: socket.gethostname(),
ResourceAttributes.HOST_ARCH: platform.machine(),
"custom.deployment.git_commit": dify_config.COMMIT_SHA,
ResourceAttributes.HOST_ID: platform.node(),
ResourceAttributes.OS_TYPE: platform.system().lower(),
ResourceAttributes.OS_DESCRIPTION: platform.platform(),
ResourceAttributes.OS_VERSION: platform.version(),
}
) )
sampler = ParentBasedTraceIdRatio(dify_config.OTEL_SAMPLING_RATE) else:
provider = TracerProvider(resource=resource, sampler=sampler) # Fallback to console exporter
set_tracer_provider(provider) exporter = ConsoleSpanExporter()
exporter: Union[OTLPSpanExporter, ConsoleSpanExporter] metric_exporter = ConsoleMetricExporter()
metric_exporter: Union[OTLPMetricExporter, ConsoleMetricExporter]
if dify_config.OTEL_EXPORTER_TYPE == "otlp": provider.add_span_processor(
exporter = OTLPSpanExporter( BatchSpanProcessor(
endpoint=dify_config.OTLP_BASE_ENDPOINT + "/v1/traces", exporter,
headers={"Authorization": f"Bearer {dify_config.OTLP_API_KEY}"}, max_queue_size=dify_config.OTEL_MAX_QUEUE_SIZE,
) schedule_delay_millis=dify_config.OTEL_BATCH_EXPORT_SCHEDULE_DELAY,
metric_exporter = OTLPMetricExporter( max_export_batch_size=dify_config.OTEL_MAX_EXPORT_BATCH_SIZE,
endpoint=dify_config.OTLP_BASE_ENDPOINT + "/v1/metrics", export_timeout_millis=dify_config.OTEL_BATCH_EXPORT_TIMEOUT,
headers={"Authorization": f"Bearer {dify_config.OTLP_API_KEY}"},
)
else:
# Fallback to console exporter
exporter = ConsoleSpanExporter()
metric_exporter = ConsoleMetricExporter()
provider.add_span_processor(
BatchSpanProcessor(
exporter,
max_queue_size=dify_config.OTEL_MAX_QUEUE_SIZE,
schedule_delay_millis=dify_config.OTEL_BATCH_EXPORT_SCHEDULE_DELAY,
max_export_batch_size=dify_config.OTEL_MAX_EXPORT_BATCH_SIZE,
export_timeout_millis=dify_config.OTEL_BATCH_EXPORT_TIMEOUT,
)
)
reader = PeriodicExportingMetricReader(
metric_exporter,
export_interval_millis=dify_config.OTEL_METRIC_EXPORT_INTERVAL,
export_timeout_millis=dify_config.OTEL_METRIC_EXPORT_TIMEOUT,
) )
set_meter_provider(MeterProvider(resource=resource, metric_readers=[reader])) )
if not is_celery_worker(): reader = PeriodicExportingMetricReader(
init_flask_instrumentor(app) metric_exporter,
CeleryInstrumentor(tracer_provider=get_tracer_provider(), meter_provider=get_meter_provider()).instrument() export_interval_millis=dify_config.OTEL_METRIC_EXPORT_INTERVAL,
instrument_exception_logging() export_timeout_millis=dify_config.OTEL_METRIC_EXPORT_TIMEOUT,
init_sqlalchemy_instrumentor(app) )
atexit.register(shutdown_tracer) set_meter_provider(MeterProvider(resource=resource, metric_readers=[reader]))
if not is_celery_worker():
init_flask_instrumentor(app)
CeleryInstrumentor(tracer_provider=get_tracer_provider(), meter_provider=get_meter_provider()).instrument()
instrument_exception_logging()
init_sqlalchemy_instrumentor(app)
atexit.register(shutdown_tracer)
def is_enabled(): def is_enabled():
@ -198,11 +199,12 @@ def is_enabled():
@worker_init.connect(weak=False) @worker_init.connect(weak=False)
def init_celery_worker(*args, **kwargs): def init_celery_worker(*args, **kwargs):
from opentelemetry.instrumentation.celery import CeleryInstrumentor if dify_config.ENABLE_OTEL:
from opentelemetry.metrics import get_meter_provider from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.trace import get_tracer_provider from opentelemetry.metrics import get_meter_provider
tracer_provider = get_tracer_provider() from opentelemetry.trace import get_tracer_provider
metric_provider = get_meter_provider() tracer_provider = get_tracer_provider()
if dify_config.DEBUG: metric_provider = get_meter_provider()
logging.info("Initializing OpenTelemetry for Celery worker") if dify_config.DEBUG:
CeleryInstrumentor(tracer_provider=tracer_provider, meter_provider=metric_provider).instrument() logging.info("Initializing OpenTelemetry for Celery worker")
CeleryInstrumentor(tracer_provider=tracer_provider, meter_provider=metric_provider).instrument()

@ -1,63 +0,0 @@
"""
Patch for OpenTelemetry context detach method to handle None tokens gracefully.
This patch addresses the issue where OpenTelemetry's context.detach() method raises a TypeError
when called with a None token. The error occurs in the contextvars_context.py file where it tries
to call reset() on a None token.
Related GitHub issue: https://github.com/langgenius/dify/issues/18496
Error being fixed:
```
Traceback (most recent call last):
File "opentelemetry/context/__init__.py", line 154, in detach
_RUNTIME_CONTEXT.detach(token)
File "opentelemetry/context/contextvars_context.py", line 50, in detach
self._current_context.reset(token) # type: ignore
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: expected an instance of Token, got None
```
Instead of modifying the third-party package directly, this patch monkey-patches the
context.detach method to gracefully handle None tokens.
"""
import logging
from functools import wraps
from opentelemetry import context
logger = logging.getLogger(__name__)
# Store the original detach method
original_detach = context.detach
# Create a patched version that handles None tokens
@wraps(original_detach)
def patched_detach(token):
"""
A patched version of context.detach that handles None tokens gracefully.
"""
if token is None:
logger.debug("Attempted to detach a None token, skipping")
return
return original_detach(token)
def is_enabled():
"""
Check if the extension is enabled.
Always enable this patch to prevent errors even when OpenTelemetry is disabled.
"""
return True
def init_app(app):
"""
Initialize the OpenTelemetry context patch.
"""
# Replace the original detach method with our patched version
context.detach = patched_detach
logger.info("OpenTelemetry context.detach patched to handle None tokens")
Loading…
Cancel
Save