|
|
|
@ -1,10 +1,41 @@
|
|
|
|
import datetime
|
|
|
|
|
|
|
|
import json
|
|
|
|
import json
|
|
|
|
import uuid
|
|
|
|
|
|
|
|
from typing import Optional
|
|
|
|
from typing import Optional
|
|
|
|
|
|
|
|
|
|
|
|
from core.ops.aliyun_trace.data_exporter.traceclient import TraceClient
|
|
|
|
from opentelemetry.trace import Status, StatusCode
|
|
|
|
|
|
|
|
from sqlalchemy.orm import Session, sessionmaker
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from core.ops.aliyun_trace.data_exporter.traceclient import (
|
|
|
|
|
|
|
|
TraceClient,
|
|
|
|
|
|
|
|
convert_datetime_to_nanoseconds,
|
|
|
|
|
|
|
|
convert_to_span_id,
|
|
|
|
|
|
|
|
convert_to_trace_id,
|
|
|
|
|
|
|
|
extract_retrieval_documents,
|
|
|
|
|
|
|
|
)
|
|
|
|
from core.ops.aliyun_trace.entities.aliyun_trace_entity import SpanData
|
|
|
|
from core.ops.aliyun_trace.entities.aliyun_trace_entity import SpanData
|
|
|
|
|
|
|
|
from core.ops.aliyun_trace.entities.semconv import (
|
|
|
|
|
|
|
|
GEM_AI_COMPLETION,
|
|
|
|
|
|
|
|
GEN_AI_FRAMEWORK,
|
|
|
|
|
|
|
|
GEN_AI_MODEL_NAME,
|
|
|
|
|
|
|
|
GEN_AI_PROMPT,
|
|
|
|
|
|
|
|
GEN_AI_PROMPT_TEMPLATE_TEMPLATE,
|
|
|
|
|
|
|
|
GEN_AI_PROMPT_TEMPLATE_VARIABLE,
|
|
|
|
|
|
|
|
GEN_AI_RESPONSE_FINISH_REASON,
|
|
|
|
|
|
|
|
GEN_AI_SESSION_ID,
|
|
|
|
|
|
|
|
GEN_AI_SPAN_KIND,
|
|
|
|
|
|
|
|
GEN_AI_USAGE_INPUT_TOKENS,
|
|
|
|
|
|
|
|
GEN_AI_USAGE_OUTPUT_TOKENS,
|
|
|
|
|
|
|
|
GEN_AI_USAGE_TOTAL_TOKENS,
|
|
|
|
|
|
|
|
GEN_AI_USER_ID,
|
|
|
|
|
|
|
|
INPUT_VALUE,
|
|
|
|
|
|
|
|
OUTPUT_VALUE,
|
|
|
|
|
|
|
|
RETRIEVAL_DOCUMENT,
|
|
|
|
|
|
|
|
RETRIEVAL_QUERY,
|
|
|
|
|
|
|
|
TOOL_DESCRIPTION,
|
|
|
|
|
|
|
|
TOOL_NAME,
|
|
|
|
|
|
|
|
TOOL_PARAMETERS,
|
|
|
|
|
|
|
|
Gen_AI_SYSTEM,
|
|
|
|
|
|
|
|
GenAISpanKind,
|
|
|
|
|
|
|
|
)
|
|
|
|
from core.ops.base_trace_instance import BaseTraceInstance
|
|
|
|
from core.ops.base_trace_instance import BaseTraceInstance
|
|
|
|
from core.ops.entities.config_entity import AliyunConfig
|
|
|
|
from core.ops.entities.config_entity import AliyunConfig
|
|
|
|
from core.ops.entities.trace_entity import (
|
|
|
|
from core.ops.entities.trace_entity import (
|
|
|
|
@ -17,33 +48,10 @@ from core.ops.entities.trace_entity import (
|
|
|
|
ToolTraceInfo,
|
|
|
|
ToolTraceInfo,
|
|
|
|
WorkflowTraceInfo,
|
|
|
|
WorkflowTraceInfo,
|
|
|
|
)
|
|
|
|
)
|
|
|
|
from models import EndUser, db
|
|
|
|
from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
|
|
|
|
|
|
|
|
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecution, WorkflowNodeExecutionMetadataKey
|
|
|
|
|
|
|
|
from core.workflow.nodes import NodeType
|
|
|
|
# convert_to_trace_id: turn a UUID string into an integer trace id.
# Parses `uuid_v4` with uuid.UUID and returns the UUID's integer value (`.int`).
# Any exception raised while parsing is re-raised as
# ValueError("Invalid UUID input: ...").
# NOTE(review): the `from models import ...` line interleaved below is not part
# of this function body; it looks like a module-level import that ended up inside
# this span when the diff was flattened -- confirm and move it to the import block.
def convert_to_trace_id(uuid_v4:str) -> int:
|
|
|
|
from models import Account, App, EndUser, TenantAccountJoin, WorkflowNodeExecutionTriggeredFrom, db
|
|
|
|
try:
|
|
|
|
|
|
|
|
uuid_obj = uuid.UUID(uuid_v4)
|
|
|
|
|
|
|
|
return uuid_obj.int
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
|
|
raise ValueError(f"Invalid UUID input: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def convert_to_span_id(uuid_v4: str, span_type: str) -> int:
    """Derive a deterministic unsigned 64-bit span id from a UUID and a span type.

    The same ``(uuid_v4, span_type)`` pair always maps to the same id, in every
    interpreter process, so a parent span id computed in one place matches the
    span id computed for that parent somewhere else.

    Args:
        uuid_v4: UUID in any textual form accepted by ``uuid.UUID``.
        span_type: Label distinguishing several spans derived from one UUID
            (for example ``'message'`` or ``'llm'``).

    Returns:
        An integer in ``[0, 2**64)``.

    Raises:
        ValueError: If ``uuid_v4`` cannot be parsed as a UUID.
    """
    import hashlib  # function-scope import keeps this block self-contained

    try:
        uuid_obj = uuid.UUID(uuid_v4)
    except Exception as e:
        raise ValueError(f"Invalid UUID input: {e}") from e

    # Built-in hash() on a str is salted per process (PYTHONHASHSEED), so it
    # cannot be used for ids that must agree between workers. A cryptographic
    # digest of the canonical UUID hex plus the span type is stable everywhere.
    digest = hashlib.sha256(f"{uuid_obj.hex}-{span_type}".encode("utf-8")).digest()
    return int.from_bytes(digest[:8], byteorder="big", signed=False)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def convert_datetime_to_nanoseconds(start_time_a: "Optional[datetime.datetime]") -> Optional[int]:
    """Convert a datetime to nanoseconds since the Unix epoch.

    The conversion uses integer arithmetic, so the result is exact to the
    microsecond (the resolution of ``datetime``) instead of being rounded
    through a float.

    Args:
        start_time_a: Moment to convert. A naive value is interpreted as local
            time, the same convention ``datetime.timestamp()`` uses. ``None``
            is passed through.

    Returns:
        Nanoseconds since 1970-01-01T00:00:00Z, or ``None`` when the input is
        ``None``.
    """
    if start_time_a is None:
        return None

    import datetime as _dt  # function-scope import keeps this block self-contained

    # astimezone() on a naive datetime assumes the system local timezone.
    aware = start_time_a if start_time_a.tzinfo is not None else start_time_a.astimezone()
    epoch = _dt.datetime(1970, 1, 1, tzinfo=_dt.timezone.utc)
    # timedelta // timedelta yields an exact integer number of microseconds.
    microseconds = (aware - epoch) // _dt.timedelta(microseconds=1)
    return microseconds * 1000
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AliyunDataTrace(BaseTraceInstance):
|
|
|
|
class AliyunDataTrace(BaseTraceInstance):
|
|
|
|
@ -53,8 +61,8 @@ class AliyunDataTrace(BaseTraceInstance):
|
|
|
|
aliyun_config: AliyunConfig,
|
|
|
|
aliyun_config: AliyunConfig,
|
|
|
|
):
|
|
|
|
):
|
|
|
|
super().__init__(aliyun_config)
|
|
|
|
super().__init__(aliyun_config)
|
|
|
|
endpoint = aliyun_config.endpoint+aliyun_config.license_key+'/api/otlp/traces'
|
|
|
|
endpoint = aliyun_config.endpoint + aliyun_config.license_key + '/api/otlp/traces'
|
|
|
|
self.trace_client = TraceClient(service_name=aliyun_config.app_name,endpoint=endpoint)
|
|
|
|
self.trace_client = TraceClient(service_name=aliyun_config.app_name, endpoint=endpoint)
|
|
|
|
|
|
|
|
|
|
|
|
def trace(self, trace_info: BaseTraceInfo):
|
|
|
|
def trace(self, trace_info: BaseTraceInfo):
|
|
|
|
if isinstance(trace_info, WorkflowTraceInfo):
|
|
|
|
if isinstance(trace_info, WorkflowTraceInfo):
|
|
|
|
@ -77,12 +85,203 @@ class AliyunDataTrace(BaseTraceInstance):
|
|
|
|
return True
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
|
|
def workflow_trace(self, trace_info: WorkflowTraceInfo):
    """Export one workflow run: the root span(s), then one span per node execution.

    The trace id is derived from the workflow run id. When the run belongs to a
    message, a message span id is derived as well and handed to
    ``add_root_span``. Node executions are read through a
    ``SQLAlchemyWorkflowNodeExecutionRepository`` acting as the account that
    created the app.

    Raises:
        ValueError: If the metadata has no ``app_id``, or the app, its creator
            account, or that account's current tenant cannot be found.
    """
    # NOTE(review): block nesting was reconstructed from text without
    # indentation; confirm the `with` block ends after set_tenant_id().
    run_id = trace_info.workflow_run_id
    trace_id = convert_to_trace_id(run_id)
    message_span_id = (
        convert_to_span_id(trace_info.message_id, 'message') if trace_info.message_id else None
    )
    workflow_span_id = convert_to_span_id(run_id, 'workflow')

    self.add_root_span(trace_id, message_span_id, workflow_span_id, trace_info)

    # Resolve the app's creator; the repository is queried on that account's behalf.
    with Session(db.engine, expire_on_commit=False) as db_session:
        app_id = trace_info.metadata.get("app_id")
        if not app_id:
            raise ValueError("No app_id found in trace_info metadata")

        app_record = db_session.query(App).filter(App.id == app_id).first()
        if not app_record:
            raise ValueError(f"App with id {app_id} not found")

        creator_id = app_record.created_by
        if not creator_id:
            raise ValueError(f"App with id {app_id} has no creator (created_by is None)")

        creator = db_session.query(Account).filter(Account.id == creator_id).first()
        if not creator:
            raise ValueError(f"Creator account with id {creator_id} not found for app {app_id}")

        tenant_join = (
            db_session.query(TenantAccountJoin).filter_by(account_id=creator.id, current=True).first()
        )
        if not tenant_join:
            raise ValueError(f"Current tenant not found for account {creator.id}")
        creator.set_tenant_id(tenant_join.tenant_id)

    repository = SQLAlchemyWorkflowNodeExecutionRepository(
        session_factory=sessionmaker(bind=db.engine),
        user=creator,
        app_id=app_id,
        triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN,
    )

    # One span per node execution recorded for this workflow run.
    for execution in repository.get_by_workflow_run(workflow_run_id=run_id):
        self.trace_client.add_span(
            self.build_workflow_node_span(execution, trace_id, trace_info, workflow_span_id)
        )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def build_workflow_node_span(
    self, node_execution: WorkflowNodeExecution, trace_id: int, trace_info: WorkflowTraceInfo,
    workflow_span_id: int):
    """Build the span describing a single workflow node execution.

    The attribute set depends on the node type: LLM nodes report model, token
    usage, prompt and completion; knowledge-retrieval nodes report query and
    retrieved documents; tool nodes report tool name, description and
    parameters; every other node is reported as a generic task with its raw
    inputs and outputs.

    Args:
        node_execution: The node execution to describe.
        trace_id: Trace id shared by all spans of the workflow run.
        trace_info: Workflow trace info; its metadata supplies the conversation id.
        workflow_span_id: Span id of the workflow span, used as the parent.

    Returns:
        The ``SpanData`` for the node, parented to the workflow span.
    """
    # NOTE(review): node_execution.status is compared against OpenTelemetry's
    # StatusCode. If it is actually a workflow-specific status enum, neither
    # comparison ever matches and every span stays UNSET -- confirm and compare
    # against the workflow status type instead.
    span_status: Status = Status(StatusCode.UNSET)
    if node_execution.status == StatusCode.OK:
        span_status = Status(StatusCode.OK)
    elif node_execution.status == StatusCode.ERROR:
        span_status = Status(StatusCode.ERROR, str(node_execution.error))

    # These mappings may be empty/None (the tool branch already allowed for
    # that); fall back to {} so lookups yield defaults instead of raising.
    inputs = node_execution.inputs or {}
    outputs = node_execution.outputs or {}
    process_data = node_execution.process_data or {}
    session_id = trace_info.metadata.get('conversation_id', '')

    if node_execution.node_type == NodeType.LLM:
        usage = outputs.get('usage') or {}
        prompts = json.dumps(process_data.get('prompts', []), ensure_ascii=False)
        completion = str(outputs.get('text', ''))
        attributes = {
            GEN_AI_SESSION_ID: session_id,
            GEN_AI_SPAN_KIND: GenAISpanKind.LLM.value,
            GEN_AI_FRAMEWORK: 'dify',

            GEN_AI_MODEL_NAME: process_data.get('model_name', ''),
            Gen_AI_SYSTEM: process_data.get('model_provider', ''),

            GEN_AI_USAGE_INPUT_TOKENS: str(usage.get('prompt_tokens', 0)),
            GEN_AI_USAGE_OUTPUT_TOKENS: str(usage.get('completion_tokens', 0)),
            GEN_AI_USAGE_TOTAL_TOKENS: str(usage.get('total_tokens', 0)),

            GEN_AI_PROMPT: prompts,
            GEM_AI_COMPLETION: completion,

            GEN_AI_RESPONSE_FINISH_REASON: outputs.get('finish_reason', ''),

            INPUT_VALUE: prompts,
            OUTPUT_VALUE: completion,
        }
    elif node_execution.node_type == NodeType.KNOWLEDGE_RETRIEVAL:
        query = str(inputs.get('query', ''))
        documents = json.dumps(outputs.get('result', []), ensure_ascii=False)
        attributes = {
            GEN_AI_SPAN_KIND: GenAISpanKind.RETRIEVER.value,
            GEN_AI_FRAMEWORK: 'dify',

            RETRIEVAL_QUERY: query,
            RETRIEVAL_DOCUMENT: documents,

            INPUT_VALUE: query,
            OUTPUT_VALUE: documents,
        }
    elif node_execution.node_type == NodeType.TOOL:
        tool_info = (node_execution.metadata or {}).get(WorkflowNodeExecutionMetadataKey.TOOL_INFO, {})
        tool_inputs = json.dumps(inputs, ensure_ascii=False)
        attributes = {
            GEN_AI_SPAN_KIND: GenAISpanKind.TOOL.value,
            GEN_AI_FRAMEWORK: 'dify',

            TOOL_NAME: node_execution.title,
            TOOL_DESCRIPTION: json.dumps(tool_info, ensure_ascii=False),
            TOOL_PARAMETERS: tool_inputs,

            INPUT_VALUE: tool_inputs,
            OUTPUT_VALUE: json.dumps(node_execution.outputs, ensure_ascii=False),
        }
    else:
        # Generic node: report raw inputs/outputs (None serialises as "null").
        attributes = {
            GEN_AI_SESSION_ID: session_id,
            GEN_AI_SPAN_KIND: GenAISpanKind.TASK.value,
            GEN_AI_FRAMEWORK: 'dify',

            INPUT_VALUE: json.dumps(node_execution.inputs, ensure_ascii=False),
            OUTPUT_VALUE: json.dumps(node_execution.outputs, ensure_ascii=False),
        }

    # Identity, timing and status are the same for every node type; the
    # retrieval branch previously dropped the status.
    return SpanData(
        trace_id=trace_id,
        parent_span_id=workflow_span_id,
        span_id=convert_to_span_id(node_execution.id, 'node'),
        name=node_execution.title,
        start_time=convert_datetime_to_nanoseconds(node_execution.created_at),
        end_time=convert_datetime_to_nanoseconds(node_execution.finished_at),
        attributes=attributes,
        status=span_status,
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def add_root_span(self, trace_id, message_span_id, workflow_span_id, trace_info):
    """Add the top-level span(s) of a workflow run to the trace client.

    When ``trace_info.message_id`` is set (chatflow), a parentless ``message``
    span is added first and the ``workflow`` span is parented to it. Otherwise
    only the ``workflow`` span is added, with ``message_span_id`` (normally
    ``None`` in that case) as its parent.

    Args:
        trace_id: Trace id shared by all spans of the run.
        message_span_id: Span id of the message span, or ``None``.
        workflow_span_id: Span id to give the workflow span.
        trace_info: Workflow trace info supplying timing, metadata, inputs and
            outputs.
    """
    user_id = trace_info.metadata.get("user_id")
    start_time = convert_datetime_to_nanoseconds(trace_info.start_time)
    end_time = convert_datetime_to_nanoseconds(trace_info.end_time)

    # Both spans carry the same attributes. Outputs are JSON-encoded on both;
    # the workflow span previously used str(), which produced a Python repr.
    attributes = {
        GEN_AI_SESSION_ID: trace_info.metadata.get('conversation_id', ''),
        GEN_AI_USER_ID: str(user_id),
        GEN_AI_SPAN_KIND: 'CHAIN',
        GEN_AI_FRAMEWORK: 'dify',

        INPUT_VALUE: json.dumps(trace_info.workflow_run_inputs, ensure_ascii=False),
        OUTPUT_VALUE: json.dumps(trace_info.workflow_run_outputs, ensure_ascii=False),
    }

    if trace_info.message_id:  # chatflow
        message_span = SpanData(
            trace_id=trace_id,
            parent_span_id=None,
            span_id=message_span_id,
            name='message',
            start_time=start_time,
            end_time=end_time,
            attributes=dict(attributes),  # copy: spans must not share one dict
        )
        self.trace_client.add_span(message_span)

    workflow_span = SpanData(
        trace_id=trace_id,
        parent_span_id=message_span_id,
        span_id=workflow_span_id,
        name='workflow',
        start_time=start_time,
        end_time=end_time,
        attributes=dict(attributes),
    )
    self.trace_client.add_span(workflow_span)
|
|
|
|
|
|
|
|
|
|
|
|
def message_trace(self, trace_info: MessageTraceInfo):
|
|
|
|
def message_trace(self, trace_info: MessageTraceInfo):
|
|
|
|
# get message file data
|
|
|
|
|
|
|
|
file_list = trace_info.file_list
|
|
|
|
|
|
|
|
metadata = trace_info.metadata
|
|
|
|
|
|
|
|
message_data = trace_info.message_data
|
|
|
|
message_data = trace_info.message_data
|
|
|
|
if message_data is None:
|
|
|
|
if message_data is None:
|
|
|
|
return
|
|
|
|
return
|
|
|
|
@ -96,97 +295,118 @@ class AliyunDataTrace(BaseTraceInstance):
|
|
|
|
if end_user_data is not None:
|
|
|
|
if end_user_data is not None:
|
|
|
|
user_id = end_user_data.session_id
|
|
|
|
user_id = end_user_data.session_id
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
status: Status = Status(StatusCode.OK)
|
|
|
|
|
|
|
|
if trace_info.error:
|
|
|
|
|
|
|
|
status = Status(StatusCode.ERROR, trace_info.error)
|
|
|
|
|
|
|
|
|
|
|
|
message_span = SpanData(
|
|
|
|
message_span = SpanData(
|
|
|
|
trace_id=convert_to_trace_id(message_id),
|
|
|
|
trace_id=convert_to_trace_id(message_id),
|
|
|
|
parent_span_id=None,
|
|
|
|
parent_span_id=None,
|
|
|
|
span_id=convert_to_span_id(message_id,'message'),
|
|
|
|
span_id=convert_to_span_id(message_id, 'message'),
|
|
|
|
name='message',
|
|
|
|
name='message',
|
|
|
|
start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
|
|
|
|
start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
|
|
|
|
end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
|
|
|
|
end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
|
|
|
|
attributes={
|
|
|
|
attributes={
|
|
|
|
'gen_ai.session.id': trace_info.metadata.get('conversation_id',''),
|
|
|
|
GEN_AI_SESSION_ID: trace_info.metadata.get('conversation_id', ''),
|
|
|
|
'gen_ai.user.id':str(user_id),
|
|
|
|
GEN_AI_USER_ID: str(user_id),
|
|
|
|
'gen_ai.span.kind':'CHAIN',
|
|
|
|
GEN_AI_SPAN_KIND: 'CHAIN',
|
|
|
|
'gen_ai.framework': 'dify',
|
|
|
|
GEN_AI_FRAMEWORK: 'dify',
|
|
|
|
|
|
|
|
|
|
|
|
'input.value':json.dumps(trace_info.inputs),
|
|
|
|
INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False),
|
|
|
|
'output.value': str(trace_info.outputs),
|
|
|
|
OUTPUT_VALUE: str(trace_info.outputs),
|
|
|
|
}
|
|
|
|
},
|
|
|
|
|
|
|
|
status=status,
|
|
|
|
)
|
|
|
|
)
|
|
|
|
self.trace_client.add_span(message_span)
|
|
|
|
self.trace_client.add_span(message_span)
|
|
|
|
|
|
|
|
|
|
|
|
llm_span = SpanData(
|
|
|
|
llm_span = SpanData(
|
|
|
|
trace_id=convert_to_trace_id(message_id),
|
|
|
|
trace_id=convert_to_trace_id(message_id),
|
|
|
|
parent_span_id=convert_to_span_id(message_id,'message'),
|
|
|
|
parent_span_id=convert_to_span_id(message_id, 'message'),
|
|
|
|
span_id=convert_to_span_id(message_id,'llm'),
|
|
|
|
span_id=convert_to_span_id(message_id, 'llm'),
|
|
|
|
name='llm',
|
|
|
|
name='llm',
|
|
|
|
start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
|
|
|
|
start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
|
|
|
|
end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
|
|
|
|
end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
|
|
|
|
attributes={
|
|
|
|
attributes={
|
|
|
|
'gen_ai.session.id': trace_info.metadata.get('conversation_id',''),
|
|
|
|
GEN_AI_SESSION_ID: trace_info.metadata.get('conversation_id', ''),
|
|
|
|
'gen_ai.user.id': str(user_id),
|
|
|
|
GEN_AI_USER_ID: str(user_id),
|
|
|
|
'gen_ai.span.kind':'LLM',
|
|
|
|
GEN_AI_SPAN_KIND: 'LLM',
|
|
|
|
'gen_ai.framework': 'dify',
|
|
|
|
GEN_AI_FRAMEWORK: 'dify',
|
|
|
|
|
|
|
|
|
|
|
|
'gen_ai.prompt_template.template': 'todo',
|
|
|
|
GEN_AI_MODEL_NAME: trace_info.metadata.get('ls_model_name', ''),
|
|
|
|
'gen_ai.model_name':trace_info.message_data.model_id,
|
|
|
|
Gen_AI_SYSTEM: trace_info.metadata.get('ls_provider', ''),
|
|
|
|
|
|
|
|
|
|
|
|
'input.value':json.dumps(trace_info.inputs),
|
|
|
|
GEN_AI_USAGE_INPUT_TOKENS: str(trace_info.message_tokens),
|
|
|
|
'output.value': str(trace_info.outputs),
|
|
|
|
GEN_AI_USAGE_OUTPUT_TOKENS: str(trace_info.answer_tokens),
|
|
|
|
}
|
|
|
|
GEN_AI_USAGE_TOTAL_TOKENS: str(trace_info.total_tokens),
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
GEN_AI_PROMPT_TEMPLATE_VARIABLE: json.dumps(trace_info.message_data.inputs, ensure_ascii=False),
|
|
|
|
|
|
|
|
GEN_AI_PROMPT_TEMPLATE_TEMPLATE: trace_info.message_data.app_model_config.pre_prompt,
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
GEN_AI_PROMPT: json.dumps(trace_info.inputs, ensure_ascii=False),
|
|
|
|
|
|
|
|
GEM_AI_COMPLETION: str(trace_info.outputs),
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False),
|
|
|
|
|
|
|
|
OUTPUT_VALUE: str(trace_info.outputs),
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
status=status,
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
self.trace_client.add_span(llm_span)
|
|
|
|
self.trace_client.add_span(llm_span)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def dataset_retrieval_trace(self, trace_info: DatasetRetrievalTraceInfo):
    """Add a retriever span for a dataset retrieval performed for a message.

    Does nothing when ``trace_info.message_data`` is ``None``. The span is
    parented to the message span derived from the same message id.

    Args:
        trace_info: Retrieval trace info supplying the message id, timing, the
            query inputs and the retrieved documents.
    """
    if trace_info.message_data is None:
        return

    message_id = trace_info.message_id
    query = str(trace_info.inputs)
    documents_json = json.dumps(extract_retrieval_documents(trace_info.documents), ensure_ascii=False)

    dataset_retrieval_span = SpanData(
        trace_id=convert_to_trace_id(message_id),
        parent_span_id=convert_to_span_id(message_id, 'message'),
        span_id=convert_to_span_id(message_id, 'dataset_retrieval'),
        name='dataset_retrieval',
        start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
        end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
        attributes={
            # Same enum the workflow node spans use for this attribute.
            GEN_AI_SPAN_KIND: GenAISpanKind.RETRIEVER.value,
            GEN_AI_FRAMEWORK: 'dify',

            RETRIEVAL_QUERY: query,
            RETRIEVAL_DOCUMENT: documents_json,

            INPUT_VALUE: query,
            OUTPUT_VALUE: documents_json,
        },
    )
    self.trace_client.add_span(dataset_retrieval_span)
|
|
|
|
|
|
|
|
|
|
|
|
def tool_trace(self, trace_info: ToolTraceInfo):
    """Add a tool span for a tool invocation performed for a message.

    Does nothing when ``trace_info.message_data`` is ``None``. The span is
    parented to the message span derived from the same message id and is marked
    as an error when ``trace_info.error`` is set.

    Args:
        trace_info: Tool trace info supplying the message id, timing, tool
            name, configuration, inputs, outputs and an optional error.
    """
    if trace_info.message_data is None:
        return

    message_id = trace_info.message_id

    status: Status = Status(StatusCode.OK)
    if trace_info.error:
        status = Status(StatusCode.ERROR, trace_info.error)

    # NOTE(review): the span id depends only on message_id and the literal
    # 'tool', so if one message can invoke several tools their spans would share
    # an id -- confirm whether a per-call discriminator is needed.
    tool_span = SpanData(
        trace_id=convert_to_trace_id(message_id),
        parent_span_id=convert_to_span_id(message_id, 'message'),
        span_id=convert_to_span_id(message_id, 'tool'),
        name=trace_info.tool_name,
        start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
        end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
        attributes={
            # Use the shared enum (as the workflow tool-node span does) rather
            # than the ad-hoc literal 'Tool'.
            GEN_AI_SPAN_KIND: GenAISpanKind.TOOL.value,
            GEN_AI_FRAMEWORK: 'dify',

            TOOL_NAME: trace_info.tool_name,
            TOOL_DESCRIPTION: json.dumps(trace_info.tool_config, ensure_ascii=False),
            TOOL_PARAMETERS: json.dumps(trace_info.tool_inputs, ensure_ascii=False),

            INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False),
            OUTPUT_VALUE: str(trace_info.tool_outputs),
        },
        status=status,
    )
    self.trace_client.add_span(tool_span)
|
|
|
|
|