diff --git a/api/app.py b/api/app.py index 499a06772f..4f393f6c20 100644 --- a/api/app.py +++ b/api/app.py @@ -1,3 +1,4 @@ +import os import sys @@ -16,20 +17,20 @@ else: # It seems that JetBrains Python debugger does not work well with gevent, # so we need to disable gevent in debug mode. # If you are using debugpy and set GEVENT_SUPPORT=True, you can debug with gevent. - # if (flask_debug := os.environ.get("FLASK_DEBUG", "0")) and flask_debug.lower() in {"false", "0", "no"}: - # from gevent import monkey - # - # # gevent - # monkey.patch_all() - # - # from grpc.experimental import gevent as grpc_gevent # type: ignore - # - # # grpc gevent - # grpc_gevent.init_gevent() - # - # import psycogreen.gevent # type: ignore - # - # psycogreen.gevent.patch_psycopg() + if (flask_debug := os.environ.get("FLASK_DEBUG", "0")) and flask_debug.lower() in {"false", "0", "no"}: + from gevent import monkey + + # gevent + monkey.patch_all() + + from grpc.experimental import gevent as grpc_gevent # type: ignore + + # grpc gevent + grpc_gevent.init_gevent() + + import psycogreen.gevent # type: ignore + + psycogreen.gevent.patch_psycopg() from app_factory import create_app diff --git a/api/core/ops/aliyun_trace/aliyun_trace.py b/api/core/ops/aliyun_trace/aliyun_trace.py index 196e3e444c..812260c651 100644 --- a/api/core/ops/aliyun_trace/aliyun_trace.py +++ b/api/core/ops/aliyun_trace/aliyun_trace.py @@ -16,7 +16,7 @@ from core.ops.aliyun_trace.data_exporter.traceclient import ( ) from core.ops.aliyun_trace.entities.aliyun_trace_entity import SpanData from core.ops.aliyun_trace.entities.semconv import ( - GEM_AI_COMPLETION, + GEN_AI_COMPLETION, GEN_AI_FRAMEWORK, GEN_AI_MODEL_NAME, GEN_AI_PROMPT, @@ -25,6 +25,7 @@ from core.ops.aliyun_trace.entities.semconv import ( GEN_AI_RESPONSE_FINISH_REASON, GEN_AI_SESSION_ID, GEN_AI_SPAN_KIND, + GEN_AI_SYSTEM, GEN_AI_USAGE_INPUT_TOKENS, GEN_AI_USAGE_OUTPUT_TOKENS, GEN_AI_USAGE_TOTAL_TOKENS, @@ -36,7 +37,6 @@ from core.ops.aliyun_trace.entities.semconv import ( TOOL_DESCRIPTION, TOOL_NAME, TOOL_PARAMETERS, - Gen_AI_SYSTEM, GenAISpanKind, ) from core.ops.base_trace_instance import BaseTraceInstance @@ -67,8 +67,8 @@ logger = logging.getLogger(__name__) class AliyunDataTrace(BaseTraceInstance): def __init__( - self, - aliyun_config: AliyunConfig, + self, + aliyun_config: AliyunConfig, ): super().__init__(aliyun_config) base_url = aliyun_config.endpoint.rstrip('/') @@ -163,14 +163,14 @@ class AliyunDataTrace(BaseTraceInstance): GEN_AI_SPAN_KIND: GenAISpanKind.LLM.value, GEN_AI_FRAMEWORK: 'dify', GEN_AI_MODEL_NAME: trace_info.metadata.get('ls_model_name', ''), - Gen_AI_SYSTEM: trace_info.metadata.get('ls_provider', ''), + GEN_AI_SYSTEM: trace_info.metadata.get('ls_provider', ''), GEN_AI_USAGE_INPUT_TOKENS: str(trace_info.message_tokens), GEN_AI_USAGE_OUTPUT_TOKENS: str(trace_info.answer_tokens), GEN_AI_USAGE_TOTAL_TOKENS: str(trace_info.total_tokens), GEN_AI_PROMPT_TEMPLATE_VARIABLE: json.dumps(trace_info.message_data.inputs, ensure_ascii=False), GEN_AI_PROMPT_TEMPLATE_TEMPLATE: trace_info.message_data.app_model_config.pre_prompt, GEN_AI_PROMPT: json.dumps(trace_info.inputs, ensure_ascii=False), - GEM_AI_COMPLETION: str(trace_info.outputs), + GEN_AI_COMPLETION: str(trace_info.outputs), INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False), OUTPUT_VALUE: str(trace_info.outputs), }, @@ -270,8 +270,8 @@ class AliyunDataTrace(BaseTraceInstance): return workflow_node_executions def build_workflow_node_span( - self, node_execution: WorkflowNodeExecution, trace_id: int, - trace_info: WorkflowTraceInfo,workflow_span_id: int): + self, node_execution: WorkflowNodeExecution, trace_id: int, + trace_info: WorkflowTraceInfo, workflow_span_id: int): try: if node_execution.node_type == NodeType.LLM: node_span = self.build_workflow_llm_span( @@ -297,8 +297,8 @@ class AliyunDataTrace(BaseTraceInstance): span_status = Status(StatusCode.ERROR, str(node_execution.error)) return span_status - def build_workflow_task_span(self, trace_id:int, workflow_span_id:int, trace_info: WorkflowTraceInfo, - node_execution: WorkflowNodeExecution) -> SpanData: + def build_workflow_task_span(self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo, + node_execution: WorkflowNodeExecution) -> SpanData: return SpanData( trace_id=trace_id, parent_span_id=workflow_span_id, @@ -316,8 +316,8 @@ class AliyunDataTrace(BaseTraceInstance): status=self.get_workflow_node_status(node_execution), ) - def build_workflow_tool_span(self, trace_id:int, workflow_span_id:int, trace_info: WorkflowTraceInfo, - node_execution: WorkflowNodeExecution) -> SpanData: + def build_workflow_tool_span(self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo, + node_execution: WorkflowNodeExecution) -> SpanData: return SpanData( trace_id=trace_id, parent_span_id=workflow_span_id, @@ -330,16 +330,16 @@ class AliyunDataTrace(BaseTraceInstance): GEN_AI_FRAMEWORK: 'dify', TOOL_NAME: node_execution.title, TOOL_DESCRIPTION: json.dumps( - node_execution.metadata.get(WorkflowNodeExecutionMetadataKey.TOOL_INFO, {}),ensure_ascii=False), - TOOL_PARAMETERS: json.dumps(node_execution.inputs if node_execution.inputs else {},ensure_ascii=False), - INPUT_VALUE: json.dumps(node_execution.inputs if node_execution.inputs else {},ensure_ascii=False), + node_execution.metadata.get(WorkflowNodeExecutionMetadataKey.TOOL_INFO, {}), ensure_ascii=False), + TOOL_PARAMETERS: json.dumps(node_execution.inputs if node_execution.inputs else {}, ensure_ascii=False), + INPUT_VALUE: json.dumps(node_execution.inputs if node_execution.inputs else {}, ensure_ascii=False), OUTPUT_VALUE: json.dumps(node_execution.outputs, ensure_ascii=False), }, status=self.get_workflow_node_status(node_execution), ) - def build_workflow_retrieval_span(self, trace_id:int, workflow_span_id:int, trace_info: WorkflowTraceInfo, - node_execution: WorkflowNodeExecution) -> SpanData: + def build_workflow_retrieval_span(self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo, + node_execution: WorkflowNodeExecution) -> SpanData: return SpanData( trace_id=trace_id, parent_span_id=workflow_span_id, @@ -358,7 +358,7 @@ class AliyunDataTrace(BaseTraceInstance): status=self.get_workflow_node_status(node_execution), ) - def build_workflow_llm_span(self, trace_id:int, workflow_span_id:int, trace_info: WorkflowTraceInfo, + def build_workflow_llm_span(self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo, node_execution: WorkflowNodeExecution) -> SpanData: return SpanData( trace_id=trace_id, @@ -372,12 +372,12 @@ class AliyunDataTrace(BaseTraceInstance): GEN_AI_SPAN_KIND: GenAISpanKind.LLM.value, GEN_AI_FRAMEWORK: 'dify', GEN_AI_MODEL_NAME: node_execution.process_data.get('model_name', ''), - Gen_AI_SYSTEM: node_execution.process_data.get('model_provider', ''), + GEN_AI_SYSTEM: node_execution.process_data.get('model_provider', ''), GEN_AI_USAGE_INPUT_TOKENS: str(node_execution.outputs.get('usage', {}).get('prompt_tokens', 0)), GEN_AI_USAGE_OUTPUT_TOKENS: str(node_execution.outputs.get('usage', {}).get('completion_tokens', 0)), GEN_AI_USAGE_TOTAL_TOKENS: str(node_execution.outputs.get('usage', {}).get('total_tokens', 0)), GEN_AI_PROMPT: json.dumps(node_execution.process_data.get('prompts', []), ensure_ascii=False), - GEM_AI_COMPLETION: str(node_execution.outputs.get('text', '')), + GEN_AI_COMPLETION: str(node_execution.outputs.get('text', '')), GEN_AI_RESPONSE_FINISH_REASON: node_execution.outputs.get('finish_reason', ''), INPUT_VALUE: json.dumps(node_execution.process_data.get('prompts', []), ensure_ascii=False), OUTPUT_VALUE: str(node_execution.outputs.get('text', '')) @@ -385,7 +385,7 @@ class AliyunDataTrace(BaseTraceInstance): status=self.get_workflow_node_status(node_execution), ) - def add_workflow_span(self, trace_id:int, workflow_span_id:int, trace_info: WorkflowTraceInfo): + def add_workflow_span(self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo): message_span_id = None if trace_info.message_id: message_span_id = convert_to_span_id(trace_info.message_id, 'message') @@ -406,7 +406,7 @@ class AliyunDataTrace(BaseTraceInstance): GEN_AI_USER_ID: str(user_id), GEN_AI_SPAN_KIND: GenAISpanKind.CHAIN.value, GEN_AI_FRAMEWORK: 'dify', - INPUT_VALUE: trace_info.workflow_run_inputs.get('sys.query',''), + INPUT_VALUE: trace_info.workflow_run_inputs.get('sys.query', ''), OUTPUT_VALUE: json.dumps(trace_info.workflow_run_outputs, ensure_ascii=False), }, status=status, @@ -447,9 +447,9 @@ class AliyunDataTrace(BaseTraceInstance): GEN_AI_SPAN_KIND: GenAISpanKind.LLM.value, GEN_AI_FRAMEWORK: 'dify', GEN_AI_MODEL_NAME: trace_info.metadata.get('ls_model_name', ''), - Gen_AI_SYSTEM: trace_info.metadata.get('ls_provider', ''), + GEN_AI_SYSTEM: trace_info.metadata.get('ls_provider', ''), GEN_AI_PROMPT: json.dumps(trace_info.inputs, ensure_ascii=False), - GEM_AI_COMPLETION: json.dumps(trace_info.suggested_question, ensure_ascii=False), + GEN_AI_COMPLETION: json.dumps(trace_info.suggested_question, ensure_ascii=False), INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False), OUTPUT_VALUE: json.dumps(trace_info.suggested_question, ensure_ascii=False), }, diff --git a/api/core/ops/aliyun_trace/data_exporter/traceclient.py b/api/core/ops/aliyun_trace/data_exporter/traceclient.py index 86abf8b5e8..561315643e 100644 --- a/api/core/ops/aliyun_trace/data_exporter/traceclient.py +++ b/api/core/ops/aliyun_trace/data_exporter/traceclient.py @@ -27,8 +27,8 @@ logger = logging.getLogger(__name__) class TraceClient: - def __init__(self, service_name:str, endpoint:str,max_queue_size:int=1000, - schedule_delay_sec:int=5, max_export_batch_size:int=50): + def __init__(self, service_name: str, endpoint: str, max_queue_size: int = 1000, + schedule_delay_sec: int = 5, max_export_batch_size: int = 50): self.endpoint = endpoint self.resource = Resource( attributes={ @@ -54,7 +54,6 @@ class TraceClient: self._spans_dropped = False - def export(self, spans: Sequence[ReadableSpan]): self.exporter.export(spans) @@ -114,6 +113,7 @@ class TraceClient: self._export_batch() self.exporter.shutdown() + class SpanBuilder: def __init__(self, resource): self.resource = resource @@ -159,12 +159,14 @@ class SpanBuilder: ) return span + def generate_span_id() -> int: span_id = random.getrandbits(64) while span_id == INVALID_SPAN_ID: span_id = random.getrandbits(64) return span_id + def convert_to_trace_id(uuid_v4: str) -> int: try: uuid_obj = uuid.UUID(uuid_v4) @@ -172,23 +174,21 @@ def convert_to_trace_id(uuid_v4: str) -> int: except Exception as e: raise ValueError(f"Invalid UUID input: {e}") + def convert_to_span_id(uuid_v4: str, span_type: str) -> int: try: uuid_obj = uuid.UUID(uuid_v4) except Exception as e: raise ValueError(f"Invalid UUID input: {e}") - - type_hash = consistent_hash(span_type) & 0xFFFFFFFFFFFFFFFF - span_id = (uuid_obj.int & 0xFFFFFFFFFFFFFFFF) ^ type_hash + combined_key = f"{uuid_obj.hex}-{span_type}" + hash_bytes = hashlib.sha256(combined_key.encode('utf-8')).digest() + span_id = int.from_bytes(hash_bytes[:8], byteorder="big", signed=False) return span_id + def convert_datetime_to_nanoseconds(start_time_a: Optional[datetime]) -> Optional[int]: if start_time_a is None: return None timestamp_in_seconds = start_time_a.timestamp() timestamp_in_nanoseconds = int(timestamp_in_seconds * 1e9) return timestamp_in_nanoseconds - -def consistent_hash(s: str) -> int: - sha256_hash = hashlib.sha256(s.encode()).hexdigest() - return int(sha256_hash[:16], 16) diff --git a/api/core/ops/aliyun_trace/entities/semconv.py b/api/core/ops/aliyun_trace/entities/semconv.py index 3a020a013c..51d728b03f 100644 --- a/api/core/ops/aliyun_trace/entities/semconv.py +++ b/api/core/ops/aliyun_trace/entities/semconv.py @@ -27,7 +27,7 @@ RETRIEVAL_DOCUMENT = 'retrieval.document' #LLM GEN_AI_MODEL_NAME = 'gen_ai.model_name' -Gen_AI_SYSTEM = 'gen_ai.system' +GEN_AI_SYSTEM = 'gen_ai.system' GEN_AI_USAGE_INPUT_TOKENS = 'gen_ai.usage.input_tokens' @@ -41,7 +41,7 @@ GEN_AI_PROMPT_TEMPLATE_VARIABLE = 'gen_ai.prompt_template.variable' GEN_AI_PROMPT = 'gen_ai.prompt' -GEM_AI_COMPLETION = 'gem_ai.completion' +GEN_AI_COMPLETION = 'gem_ai.completion' GEN_AI_RESPONSE_FINISH_REASON = 'gen_ai.response.finish_reason'