fix: ruff check

pull/21471/head
hieheihei 11 months ago
parent a84ef153b3
commit 7338398c7f

@ -65,7 +65,6 @@ logger = logging.getLogger(__name__)
class AliyunDataTrace(BaseTraceInstance):
def __init__(
self,
aliyun_config: AliyunConfig,
@ -150,6 +149,9 @@ class AliyunDataTrace(BaseTraceInstance):
)
self.trace_client.add_span(message_span)
app_model_config = getattr(trace_info.message_data, 'app_model_config', {})
pre_prompt = getattr(app_model_config, 'pre_prompt', "")
inputs_data = getattr(trace_info.message_data, 'inputs', {})
llm_span = SpanData(
trace_id=trace_id,
parent_span_id=message_span_id,
@ -167,8 +169,8 @@ class AliyunDataTrace(BaseTraceInstance):
GEN_AI_USAGE_INPUT_TOKENS: str(trace_info.message_tokens),
GEN_AI_USAGE_OUTPUT_TOKENS: str(trace_info.answer_tokens),
GEN_AI_USAGE_TOTAL_TOKENS: str(trace_info.total_tokens),
GEN_AI_PROMPT_TEMPLATE_VARIABLE: json.dumps(trace_info.message_data.inputs, ensure_ascii=False),
GEN_AI_PROMPT_TEMPLATE_TEMPLATE: trace_info.message_data.app_model_config.pre_prompt,
GEN_AI_PROMPT_TEMPLATE_VARIABLE: json.dumps(inputs_data, ensure_ascii=False),
GEN_AI_PROMPT_TEMPLATE_TEMPLATE: pre_prompt,
GEN_AI_PROMPT: json.dumps(trace_info.inputs, ensure_ascii=False),
GEN_AI_COMPLETION: str(trace_info.outputs),
INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False),
@ -270,8 +272,8 @@ class AliyunDataTrace(BaseTraceInstance):
return workflow_node_executions
def build_workflow_node_span(
self, node_execution: WorkflowNodeExecution, trace_id: int,
trace_info: WorkflowTraceInfo, workflow_span_id: int):
self, node_execution: WorkflowNodeExecution, trace_id: int,trace_info: WorkflowTraceInfo, workflow_span_id: int
):
try:
if node_execution.node_type == NodeType.LLM:
node_span = self.build_workflow_llm_span(trace_id, workflow_span_id, trace_info, node_execution)
@ -294,8 +296,8 @@ class AliyunDataTrace(BaseTraceInstance):
return span_status
def build_workflow_task_span(
self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo,
node_execution: WorkflowNodeExecution) -> SpanData:
self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo,node_execution: WorkflowNodeExecution
) -> SpanData:
return SpanData(
trace_id=trace_id,
parent_span_id=workflow_span_id,
@ -314,8 +316,11 @@ class AliyunDataTrace(BaseTraceInstance):
)
def build_workflow_tool_span(
self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo,
node_execution: WorkflowNodeExecution) -> SpanData:
self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo,node_execution: WorkflowNodeExecution
) -> SpanData:
tool_des = {}
if node_execution.metadata:
tool_des = node_execution.metadata.get(WorkflowNodeExecutionMetadataKey.TOOL_INFO, {})
return SpanData(
trace_id=trace_id,
parent_span_id=workflow_span_id,
@ -327,8 +332,7 @@ class AliyunDataTrace(BaseTraceInstance):
GEN_AI_SPAN_KIND: GenAISpanKind.TOOL.value,
GEN_AI_FRAMEWORK: "dify",
TOOL_NAME: node_execution.title,
TOOL_DESCRIPTION: json.dumps(
node_execution.metadata.get(WorkflowNodeExecutionMetadataKey.TOOL_INFO, {}), ensure_ascii=False),
TOOL_DESCRIPTION: json.dumps(tool_des, ensure_ascii=False),
TOOL_PARAMETERS: json.dumps(node_execution.inputs if node_execution.inputs else {}, ensure_ascii=False),
INPUT_VALUE: json.dumps(node_execution.inputs if node_execution.inputs else {}, ensure_ascii=False),
OUTPUT_VALUE: json.dumps(node_execution.outputs, ensure_ascii=False),
@ -337,8 +341,8 @@ class AliyunDataTrace(BaseTraceInstance):
)
def build_workflow_retrieval_span(
self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo,
node_execution: WorkflowNodeExecution) -> SpanData:
self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo,node_execution: WorkflowNodeExecution
) -> SpanData:
return SpanData(
trace_id=trace_id,
parent_span_id=workflow_span_id,
@ -358,8 +362,8 @@ class AliyunDataTrace(BaseTraceInstance):
)
def build_workflow_llm_span(
self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo,
node_execution: WorkflowNodeExecution) -> SpanData:
self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo,node_execution: WorkflowNodeExecution
) -> SpanData:
return SpanData(
trace_id=trace_id,
parent_span_id=workflow_span_id,
@ -380,7 +384,7 @@ class AliyunDataTrace(BaseTraceInstance):
GEN_AI_COMPLETION: str(node_execution.outputs.get("text", "")),
GEN_AI_RESPONSE_FINISH_REASON: node_execution.outputs.get("finish_reason", ""),
INPUT_VALUE: json.dumps(node_execution.process_data.get("prompts", []), ensure_ascii=False),
OUTPUT_VALUE: str(node_execution.outputs.get("text", ""))
OUTPUT_VALUE: str(node_execution.outputs.get("text", "")),
},
status=self.get_workflow_node_status(node_execution),
)

@ -1,4 +1,3 @@
import datetime
import hashlib
import logging
import random
@ -7,6 +6,7 @@ import threading
import uuid
from collections import deque
from collections.abc import Sequence
from datetime import datetime
from typing import Optional
import requests
@ -27,8 +27,14 @@ logger = logging.getLogger(__name__)
class TraceClient:
def __init__(self, service_name: str, endpoint: str, max_queue_size: int = 1000,
schedule_delay_sec: int = 5, max_export_batch_size: int = 50):
def __init__(
self,
service_name: str,
endpoint: str,
max_queue_size: int = 1000,
schedule_delay_sec: int = 5,
max_export_batch_size: int = 50,
):
self.endpoint = endpoint
self.resource = Resource(
attributes={
@ -45,7 +51,7 @@ class TraceClient:
self.schedule_delay_sec = schedule_delay_sec
self.max_export_batch_size = max_export_batch_size
self.queue = deque(maxlen=max_queue_size)
self.queue: deque = deque(maxlen=max_queue_size)
self.condition = threading.Condition(threading.Lock())
self.done = False
@ -94,7 +100,7 @@ class TraceClient:
self._export_batch()
def _export_batch(self):
spans_to_export = []
spans_to_export: list[ReadableSpan] = []
with self.condition:
while len(spans_to_export) < self.max_export_batch_size and self.queue:
spans_to_export.append(self.queue.pop())
@ -167,7 +173,7 @@ def generate_span_id() -> int:
return span_id
def convert_to_trace_id(uuid_v4: str) -> int:
def convert_to_trace_id(uuid_v4: Optional[str]) -> int:
try:
uuid_obj = uuid.UUID(uuid_v4)
return uuid_obj.int
@ -175,7 +181,7 @@ def convert_to_trace_id(uuid_v4: str) -> int:
raise ValueError(f"Invalid UUID input: {e}")
def convert_to_span_id(uuid_v4: str, span_type: str) -> int:
def convert_to_span_id(uuid_v4: Optional[str], span_type: str) -> int:
try:
uuid_obj = uuid.UUID(uuid_v4)
except Exception as e:

@ -17,5 +17,5 @@ class SpanData(BaseModel):
events: Sequence[Event] = Field(default_factory=list, description="Events recorded in the span.")
links: Sequence[trace_api.Link] = Field(default_factory=list, description="Links to other spans.")
status: Status = Field(default=Status(StatusCode.UNSET), description="The status of the span.")
start_time: int = Field(..., description="The start time of the span in nanoseconds.")
end_time: int = Field(..., description="The end time of the span in nanoseconds.")
start_time: Optional[int] = Field(..., description="The start time of the span in nanoseconds.")
end_time: Optional[int] = Field(..., description="The end time of the span in nanoseconds.")

@ -62,4 +62,3 @@ class GenAISpanKind(Enum):
TOOL = "TOOL"
AGENT = "AGENT"
TASK = "TASK"

Loading…
Cancel
Save