clean code

pull/21471/head
hieheihei 11 months ago
parent 2354b8bef6
commit 432455a796

@ -1,3 +1,4 @@
import os
import sys import sys
@ -16,20 +17,20 @@ else:
# It seems that JetBrains Python debugger does not work well with gevent, # It seems that JetBrains Python debugger does not work well with gevent,
# so we need to disable gevent in debug mode. # so we need to disable gevent in debug mode.
# If you are using debugpy and set GEVENT_SUPPORT=True, you can debug with gevent. # If you are using debugpy and set GEVENT_SUPPORT=True, you can debug with gevent.
# if (flask_debug := os.environ.get("FLASK_DEBUG", "0")) and flask_debug.lower() in {"false", "0", "no"}: if (flask_debug := os.environ.get("FLASK_DEBUG", "0")) and flask_debug.lower() in {"false", "0", "no"}:
# from gevent import monkey from gevent import monkey
#
# # gevent # gevent
# monkey.patch_all() monkey.patch_all()
#
# from grpc.experimental import gevent as grpc_gevent # type: ignore from grpc.experimental import gevent as grpc_gevent # type: ignore
#
# # grpc gevent # grpc gevent
# grpc_gevent.init_gevent() grpc_gevent.init_gevent()
#
# import psycogreen.gevent # type: ignore import psycogreen.gevent # type: ignore
#
# psycogreen.gevent.patch_psycopg() psycogreen.gevent.patch_psycopg()
from app_factory import create_app from app_factory import create_app

@ -16,7 +16,7 @@ from core.ops.aliyun_trace.data_exporter.traceclient import (
) )
from core.ops.aliyun_trace.entities.aliyun_trace_entity import SpanData from core.ops.aliyun_trace.entities.aliyun_trace_entity import SpanData
from core.ops.aliyun_trace.entities.semconv import ( from core.ops.aliyun_trace.entities.semconv import (
GEM_AI_COMPLETION, GEN_AI_COMPLETION,
GEN_AI_FRAMEWORK, GEN_AI_FRAMEWORK,
GEN_AI_MODEL_NAME, GEN_AI_MODEL_NAME,
GEN_AI_PROMPT, GEN_AI_PROMPT,
@ -25,6 +25,7 @@ from core.ops.aliyun_trace.entities.semconv import (
GEN_AI_RESPONSE_FINISH_REASON, GEN_AI_RESPONSE_FINISH_REASON,
GEN_AI_SESSION_ID, GEN_AI_SESSION_ID,
GEN_AI_SPAN_KIND, GEN_AI_SPAN_KIND,
GEN_AI_SYSTEM,
GEN_AI_USAGE_INPUT_TOKENS, GEN_AI_USAGE_INPUT_TOKENS,
GEN_AI_USAGE_OUTPUT_TOKENS, GEN_AI_USAGE_OUTPUT_TOKENS,
GEN_AI_USAGE_TOTAL_TOKENS, GEN_AI_USAGE_TOTAL_TOKENS,
@ -36,7 +37,6 @@ from core.ops.aliyun_trace.entities.semconv import (
TOOL_DESCRIPTION, TOOL_DESCRIPTION,
TOOL_NAME, TOOL_NAME,
TOOL_PARAMETERS, TOOL_PARAMETERS,
Gen_AI_SYSTEM,
GenAISpanKind, GenAISpanKind,
) )
from core.ops.base_trace_instance import BaseTraceInstance from core.ops.base_trace_instance import BaseTraceInstance
@ -67,8 +67,8 @@ logger = logging.getLogger(__name__)
class AliyunDataTrace(BaseTraceInstance): class AliyunDataTrace(BaseTraceInstance):
def __init__( def __init__(
self, self,
aliyun_config: AliyunConfig, aliyun_config: AliyunConfig,
): ):
super().__init__(aliyun_config) super().__init__(aliyun_config)
base_url = aliyun_config.endpoint.rstrip('/') base_url = aliyun_config.endpoint.rstrip('/')
@ -163,14 +163,14 @@ class AliyunDataTrace(BaseTraceInstance):
GEN_AI_SPAN_KIND: GenAISpanKind.LLM.value, GEN_AI_SPAN_KIND: GenAISpanKind.LLM.value,
GEN_AI_FRAMEWORK: 'dify', GEN_AI_FRAMEWORK: 'dify',
GEN_AI_MODEL_NAME: trace_info.metadata.get('ls_model_name', ''), GEN_AI_MODEL_NAME: trace_info.metadata.get('ls_model_name', ''),
Gen_AI_SYSTEM: trace_info.metadata.get('ls_provider', ''), GEN_AI_SYSTEM: trace_info.metadata.get('ls_provider', ''),
GEN_AI_USAGE_INPUT_TOKENS: str(trace_info.message_tokens), GEN_AI_USAGE_INPUT_TOKENS: str(trace_info.message_tokens),
GEN_AI_USAGE_OUTPUT_TOKENS: str(trace_info.answer_tokens), GEN_AI_USAGE_OUTPUT_TOKENS: str(trace_info.answer_tokens),
GEN_AI_USAGE_TOTAL_TOKENS: str(trace_info.total_tokens), GEN_AI_USAGE_TOTAL_TOKENS: str(trace_info.total_tokens),
GEN_AI_PROMPT_TEMPLATE_VARIABLE: json.dumps(trace_info.message_data.inputs, ensure_ascii=False), GEN_AI_PROMPT_TEMPLATE_VARIABLE: json.dumps(trace_info.message_data.inputs, ensure_ascii=False),
GEN_AI_PROMPT_TEMPLATE_TEMPLATE: trace_info.message_data.app_model_config.pre_prompt, GEN_AI_PROMPT_TEMPLATE_TEMPLATE: trace_info.message_data.app_model_config.pre_prompt,
GEN_AI_PROMPT: json.dumps(trace_info.inputs, ensure_ascii=False), GEN_AI_PROMPT: json.dumps(trace_info.inputs, ensure_ascii=False),
GEM_AI_COMPLETION: str(trace_info.outputs), GEN_AI_COMPLETION: str(trace_info.outputs),
INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False), INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False),
OUTPUT_VALUE: str(trace_info.outputs), OUTPUT_VALUE: str(trace_info.outputs),
}, },
@ -270,8 +270,8 @@ class AliyunDataTrace(BaseTraceInstance):
return workflow_node_executions return workflow_node_executions
def build_workflow_node_span( def build_workflow_node_span(
self, node_execution: WorkflowNodeExecution, trace_id: int, self, node_execution: WorkflowNodeExecution, trace_id: int,
trace_info: WorkflowTraceInfo,workflow_span_id: int): trace_info: WorkflowTraceInfo, workflow_span_id: int):
try: try:
if node_execution.node_type == NodeType.LLM: if node_execution.node_type == NodeType.LLM:
node_span = self.build_workflow_llm_span( node_span = self.build_workflow_llm_span(
@ -297,8 +297,8 @@ class AliyunDataTrace(BaseTraceInstance):
span_status = Status(StatusCode.ERROR, str(node_execution.error)) span_status = Status(StatusCode.ERROR, str(node_execution.error))
return span_status return span_status
def build_workflow_task_span(self, trace_id:int, workflow_span_id:int, trace_info: WorkflowTraceInfo, def build_workflow_task_span(self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo,
node_execution: WorkflowNodeExecution) -> SpanData: node_execution: WorkflowNodeExecution) -> SpanData:
return SpanData( return SpanData(
trace_id=trace_id, trace_id=trace_id,
parent_span_id=workflow_span_id, parent_span_id=workflow_span_id,
@ -316,8 +316,8 @@ class AliyunDataTrace(BaseTraceInstance):
status=self.get_workflow_node_status(node_execution), status=self.get_workflow_node_status(node_execution),
) )
def build_workflow_tool_span(self, trace_id:int, workflow_span_id:int, trace_info: WorkflowTraceInfo, def build_workflow_tool_span(self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo,
node_execution: WorkflowNodeExecution) -> SpanData: node_execution: WorkflowNodeExecution) -> SpanData:
return SpanData( return SpanData(
trace_id=trace_id, trace_id=trace_id,
parent_span_id=workflow_span_id, parent_span_id=workflow_span_id,
@ -330,16 +330,16 @@ class AliyunDataTrace(BaseTraceInstance):
GEN_AI_FRAMEWORK: 'dify', GEN_AI_FRAMEWORK: 'dify',
TOOL_NAME: node_execution.title, TOOL_NAME: node_execution.title,
TOOL_DESCRIPTION: json.dumps( TOOL_DESCRIPTION: json.dumps(
node_execution.metadata.get(WorkflowNodeExecutionMetadataKey.TOOL_INFO, {}),ensure_ascii=False), node_execution.metadata.get(WorkflowNodeExecutionMetadataKey.TOOL_INFO, {}), ensure_ascii=False),
TOOL_PARAMETERS: json.dumps(node_execution.inputs if node_execution.inputs else {},ensure_ascii=False), TOOL_PARAMETERS: json.dumps(node_execution.inputs if node_execution.inputs else {}, ensure_ascii=False),
INPUT_VALUE: json.dumps(node_execution.inputs if node_execution.inputs else {},ensure_ascii=False), INPUT_VALUE: json.dumps(node_execution.inputs if node_execution.inputs else {}, ensure_ascii=False),
OUTPUT_VALUE: json.dumps(node_execution.outputs, ensure_ascii=False), OUTPUT_VALUE: json.dumps(node_execution.outputs, ensure_ascii=False),
}, },
status=self.get_workflow_node_status(node_execution), status=self.get_workflow_node_status(node_execution),
) )
def build_workflow_retrieval_span(self, trace_id:int, workflow_span_id:int, trace_info: WorkflowTraceInfo, def build_workflow_retrieval_span(self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo,
node_execution: WorkflowNodeExecution) -> SpanData: node_execution: WorkflowNodeExecution) -> SpanData:
return SpanData( return SpanData(
trace_id=trace_id, trace_id=trace_id,
parent_span_id=workflow_span_id, parent_span_id=workflow_span_id,
@ -358,7 +358,7 @@ class AliyunDataTrace(BaseTraceInstance):
status=self.get_workflow_node_status(node_execution), status=self.get_workflow_node_status(node_execution),
) )
def build_workflow_llm_span(self, trace_id:int, workflow_span_id:int, trace_info: WorkflowTraceInfo, def build_workflow_llm_span(self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo,
node_execution: WorkflowNodeExecution) -> SpanData: node_execution: WorkflowNodeExecution) -> SpanData:
return SpanData( return SpanData(
trace_id=trace_id, trace_id=trace_id,
@ -372,12 +372,12 @@ class AliyunDataTrace(BaseTraceInstance):
GEN_AI_SPAN_KIND: GenAISpanKind.LLM.value, GEN_AI_SPAN_KIND: GenAISpanKind.LLM.value,
GEN_AI_FRAMEWORK: 'dify', GEN_AI_FRAMEWORK: 'dify',
GEN_AI_MODEL_NAME: node_execution.process_data.get('model_name', ''), GEN_AI_MODEL_NAME: node_execution.process_data.get('model_name', ''),
Gen_AI_SYSTEM: node_execution.process_data.get('model_provider', ''), GEN_AI_SYSTEM: node_execution.process_data.get('model_provider', ''),
GEN_AI_USAGE_INPUT_TOKENS: str(node_execution.outputs.get('usage', {}).get('prompt_tokens', 0)), GEN_AI_USAGE_INPUT_TOKENS: str(node_execution.outputs.get('usage', {}).get('prompt_tokens', 0)),
GEN_AI_USAGE_OUTPUT_TOKENS: str(node_execution.outputs.get('usage', {}).get('completion_tokens', 0)), GEN_AI_USAGE_OUTPUT_TOKENS: str(node_execution.outputs.get('usage', {}).get('completion_tokens', 0)),
GEN_AI_USAGE_TOTAL_TOKENS: str(node_execution.outputs.get('usage', {}).get('total_tokens', 0)), GEN_AI_USAGE_TOTAL_TOKENS: str(node_execution.outputs.get('usage', {}).get('total_tokens', 0)),
GEN_AI_PROMPT: json.dumps(node_execution.process_data.get('prompts', []), ensure_ascii=False), GEN_AI_PROMPT: json.dumps(node_execution.process_data.get('prompts', []), ensure_ascii=False),
GEM_AI_COMPLETION: str(node_execution.outputs.get('text', '')), GEN_AI_COMPLETION: str(node_execution.outputs.get('text', '')),
GEN_AI_RESPONSE_FINISH_REASON: node_execution.outputs.get('finish_reason', ''), GEN_AI_RESPONSE_FINISH_REASON: node_execution.outputs.get('finish_reason', ''),
INPUT_VALUE: json.dumps(node_execution.process_data.get('prompts', []), ensure_ascii=False), INPUT_VALUE: json.dumps(node_execution.process_data.get('prompts', []), ensure_ascii=False),
OUTPUT_VALUE: str(node_execution.outputs.get('text', '')) OUTPUT_VALUE: str(node_execution.outputs.get('text', ''))
@ -385,7 +385,7 @@ class AliyunDataTrace(BaseTraceInstance):
status=self.get_workflow_node_status(node_execution), status=self.get_workflow_node_status(node_execution),
) )
def add_workflow_span(self, trace_id:int, workflow_span_id:int, trace_info: WorkflowTraceInfo): def add_workflow_span(self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo):
message_span_id = None message_span_id = None
if trace_info.message_id: if trace_info.message_id:
message_span_id = convert_to_span_id(trace_info.message_id, 'message') message_span_id = convert_to_span_id(trace_info.message_id, 'message')
@ -406,7 +406,7 @@ class AliyunDataTrace(BaseTraceInstance):
GEN_AI_USER_ID: str(user_id), GEN_AI_USER_ID: str(user_id),
GEN_AI_SPAN_KIND: GenAISpanKind.CHAIN.value, GEN_AI_SPAN_KIND: GenAISpanKind.CHAIN.value,
GEN_AI_FRAMEWORK: 'dify', GEN_AI_FRAMEWORK: 'dify',
INPUT_VALUE: trace_info.workflow_run_inputs.get('sys.query',''), INPUT_VALUE: trace_info.workflow_run_inputs.get('sys.query', ''),
OUTPUT_VALUE: json.dumps(trace_info.workflow_run_outputs, ensure_ascii=False), OUTPUT_VALUE: json.dumps(trace_info.workflow_run_outputs, ensure_ascii=False),
}, },
status=status, status=status,
@ -447,9 +447,9 @@ class AliyunDataTrace(BaseTraceInstance):
GEN_AI_SPAN_KIND: GenAISpanKind.LLM.value, GEN_AI_SPAN_KIND: GenAISpanKind.LLM.value,
GEN_AI_FRAMEWORK: 'dify', GEN_AI_FRAMEWORK: 'dify',
GEN_AI_MODEL_NAME: trace_info.metadata.get('ls_model_name', ''), GEN_AI_MODEL_NAME: trace_info.metadata.get('ls_model_name', ''),
Gen_AI_SYSTEM: trace_info.metadata.get('ls_provider', ''), GEN_AI_SYSTEM: trace_info.metadata.get('ls_provider', ''),
GEN_AI_PROMPT: json.dumps(trace_info.inputs, ensure_ascii=False), GEN_AI_PROMPT: json.dumps(trace_info.inputs, ensure_ascii=False),
GEM_AI_COMPLETION: json.dumps(trace_info.suggested_question, ensure_ascii=False), GEN_AI_COMPLETION: json.dumps(trace_info.suggested_question, ensure_ascii=False),
INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False), INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False),
OUTPUT_VALUE: json.dumps(trace_info.suggested_question, ensure_ascii=False), OUTPUT_VALUE: json.dumps(trace_info.suggested_question, ensure_ascii=False),
}, },

@ -27,8 +27,8 @@ logger = logging.getLogger(__name__)
class TraceClient: class TraceClient:
def __init__(self, service_name:str, endpoint:str,max_queue_size:int=1000, def __init__(self, service_name: str, endpoint: str, max_queue_size: int = 1000,
schedule_delay_sec:int=5, max_export_batch_size:int=50): schedule_delay_sec: int = 5, max_export_batch_size: int = 50):
self.endpoint = endpoint self.endpoint = endpoint
self.resource = Resource( self.resource = Resource(
attributes={ attributes={
@ -54,7 +54,6 @@ class TraceClient:
self._spans_dropped = False self._spans_dropped = False
def export(self, spans: Sequence[ReadableSpan]): def export(self, spans: Sequence[ReadableSpan]):
self.exporter.export(spans) self.exporter.export(spans)
@ -114,6 +113,7 @@ class TraceClient:
self._export_batch() self._export_batch()
self.exporter.shutdown() self.exporter.shutdown()
class SpanBuilder: class SpanBuilder:
def __init__(self, resource): def __init__(self, resource):
self.resource = resource self.resource = resource
@ -159,12 +159,14 @@ class SpanBuilder:
) )
return span return span
def generate_span_id() -> int: def generate_span_id() -> int:
span_id = random.getrandbits(64) span_id = random.getrandbits(64)
while span_id == INVALID_SPAN_ID: while span_id == INVALID_SPAN_ID:
span_id = random.getrandbits(64) span_id = random.getrandbits(64)
return span_id return span_id
def convert_to_trace_id(uuid_v4: str) -> int: def convert_to_trace_id(uuid_v4: str) -> int:
try: try:
uuid_obj = uuid.UUID(uuid_v4) uuid_obj = uuid.UUID(uuid_v4)
@ -172,23 +174,21 @@ def convert_to_trace_id(uuid_v4: str) -> int:
except Exception as e: except Exception as e:
raise ValueError(f"Invalid UUID input: {e}") raise ValueError(f"Invalid UUID input: {e}")
def convert_to_span_id(uuid_v4: str, span_type: str) -> int: def convert_to_span_id(uuid_v4: str, span_type: str) -> int:
try: try:
uuid_obj = uuid.UUID(uuid_v4) uuid_obj = uuid.UUID(uuid_v4)
except Exception as e: except Exception as e:
raise ValueError(f"Invalid UUID input: {e}") raise ValueError(f"Invalid UUID input: {e}")
combined_key = f"{uuid_obj.hex}-{span_type}"
type_hash = consistent_hash(span_type) & 0xFFFFFFFFFFFFFFFF hash_bytes = hashlib.sha256(combined_key.encode('utf-8')).digest()
span_id = (uuid_obj.int & 0xFFFFFFFFFFFFFFFF) ^ type_hash span_id = int.from_bytes(hash_bytes[:8], byteorder="big", signed=False)
return span_id return span_id
def convert_datetime_to_nanoseconds(start_time_a: Optional[datetime]) -> Optional[int]: def convert_datetime_to_nanoseconds(start_time_a: Optional[datetime]) -> Optional[int]:
if start_time_a is None: if start_time_a is None:
return None return None
timestamp_in_seconds = start_time_a.timestamp() timestamp_in_seconds = start_time_a.timestamp()
timestamp_in_nanoseconds = int(timestamp_in_seconds * 1e9) timestamp_in_nanoseconds = int(timestamp_in_seconds * 1e9)
return timestamp_in_nanoseconds return timestamp_in_nanoseconds
def consistent_hash(s: str) -> int:
sha256_hash = hashlib.sha256(s.encode()).hexdigest()
return int(sha256_hash[:16], 16)

@ -27,7 +27,7 @@ RETRIEVAL_DOCUMENT = 'retrieval.document'
#LLM #LLM
GEN_AI_MODEL_NAME = 'gen_ai.model_name' GEN_AI_MODEL_NAME = 'gen_ai.model_name'
Gen_AI_SYSTEM = 'gen_ai.system' GEN_AI_SYSTEM = 'gen_ai.system'
GEN_AI_USAGE_INPUT_TOKENS = 'gen_ai.usage.input_tokens' GEN_AI_USAGE_INPUT_TOKENS = 'gen_ai.usage.input_tokens'
@ -41,7 +41,7 @@ GEN_AI_PROMPT_TEMPLATE_VARIABLE = 'gen_ai.prompt_template.variable'
GEN_AI_PROMPT = 'gen_ai.prompt' GEN_AI_PROMPT = 'gen_ai.prompt'
GEM_AI_COMPLETION = 'gem_ai.completion' GEN_AI_COMPLETION = 'gem_ai.completion'
GEN_AI_RESPONSE_FINISH_REASON = 'gen_ai.response.finish_reason' GEN_AI_RESPONSE_FINISH_REASON = 'gen_ai.response.finish_reason'

Loading…
Cancel
Save