add aliyun trace project url

fix aliyun trace web & update icon
pull/21471/head
hieheihei 11 months ago
parent 267c39a55a
commit 2354b8bef6

@ -1,5 +1,8 @@
import json import json
import logging
from collections.abc import Sequence
from typing import Optional from typing import Optional
from urllib.parse import urljoin
from opentelemetry.trace import Status, StatusCode from opentelemetry.trace import Status, StatusCode
from sqlalchemy.orm import Session, sessionmaker from sqlalchemy.orm import Session, sessionmaker
@ -9,7 +12,7 @@ from core.ops.aliyun_trace.data_exporter.traceclient import (
convert_datetime_to_nanoseconds, convert_datetime_to_nanoseconds,
convert_to_span_id, convert_to_span_id,
convert_to_trace_id, convert_to_trace_id,
extract_retrieval_documents, generate_span_id,
) )
from core.ops.aliyun_trace.entities.aliyun_trace_entity import SpanData from core.ops.aliyun_trace.entities.aliyun_trace_entity import SpanData
from core.ops.aliyun_trace.entities.semconv import ( from core.ops.aliyun_trace.entities.semconv import (
@ -48,11 +51,18 @@ from core.ops.entities.trace_entity import (
ToolTraceInfo, ToolTraceInfo,
WorkflowTraceInfo, WorkflowTraceInfo,
) )
from core.rag.models.document import Document
from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecution, WorkflowNodeExecutionMetadataKey from core.workflow.entities.workflow_node_execution import (
WorkflowNodeExecution,
WorkflowNodeExecutionMetadataKey,
WorkflowNodeExecutionStatus,
)
from core.workflow.nodes import NodeType from core.workflow.nodes import NodeType
from models import Account, App, EndUser, TenantAccountJoin, WorkflowNodeExecutionTriggeredFrom, db from models import Account, App, EndUser, TenantAccountJoin, WorkflowNodeExecutionTriggeredFrom, db
logger = logging.getLogger(__name__)
class AliyunDataTrace(BaseTraceInstance): class AliyunDataTrace(BaseTraceInstance):
@ -61,7 +71,8 @@ class AliyunDataTrace(BaseTraceInstance):
aliyun_config: AliyunConfig, aliyun_config: AliyunConfig,
): ):
super().__init__(aliyun_config) super().__init__(aliyun_config)
endpoint = aliyun_config.endpoint + aliyun_config.license_key + '/api/otlp/traces' base_url = aliyun_config.endpoint.rstrip('/')
endpoint = urljoin(base_url, f'adapt_{aliyun_config.license_key}/api/otlp/traces')
self.trace_client = TraceClient(service_name=aliyun_config.app_name, endpoint=endpoint) self.trace_client = TraceClient(service_name=aliyun_config.app_name, endpoint=endpoint)
def trace(self, trace_info: BaseTraceInfo): def trace(self, trace_info: BaseTraceInfo):
@ -72,7 +83,7 @@ class AliyunDataTrace(BaseTraceInstance):
if isinstance(trace_info, ModerationTraceInfo): if isinstance(trace_info, ModerationTraceInfo):
pass pass
if isinstance(trace_info, SuggestedQuestionTraceInfo): if isinstance(trace_info, SuggestedQuestionTraceInfo):
pass self.suggested_question_trace(trace_info)
if isinstance(trace_info, DatasetRetrievalTraceInfo): if isinstance(trace_info, DatasetRetrievalTraceInfo):
self.dataset_retrieval_trace(trace_info) self.dataset_retrieval_trace(trace_info)
if isinstance(trace_info, ToolTraceInfo): if isinstance(trace_info, ToolTraceInfo):
@ -81,18 +92,146 @@ class AliyunDataTrace(BaseTraceInstance):
pass pass
def api_check(self): def api_check(self):
# todo return self.trace_client.api_check()
return True
def get_project_url(self):
try:
return self.trace_client.get_project_url()
except Exception as e:
logger.info(f"Aliyun get run url failed: {str(e)}", exc_info=True)
raise ValueError(f"Aliyun get run url failed: {str(e)}")
def workflow_trace(self, trace_info: WorkflowTraceInfo): def workflow_trace(self, trace_info: WorkflowTraceInfo):
trace_id = convert_to_trace_id(trace_info.workflow_run_id) trace_id = convert_to_trace_id(trace_info.workflow_run_id)
message_span_id = None
if trace_info.message_id:
message_span_id = convert_to_span_id(trace_info.message_id, 'message')
workflow_span_id = convert_to_span_id(trace_info.workflow_run_id, 'workflow') workflow_span_id = convert_to_span_id(trace_info.workflow_run_id, 'workflow')
self.add_workflow_span(trace_id, workflow_span_id, trace_info)
workflow_node_executions = self.get_workflow_node_executions(trace_info)
for node_execution in workflow_node_executions:
node_span = self.build_workflow_node_span(node_execution, trace_id, trace_info, workflow_span_id)
self.trace_client.add_span(node_span)
def message_trace(self, trace_info: MessageTraceInfo):
message_data = trace_info.message_data
if message_data is None:
return
message_id = trace_info.message_id
user_id = message_data.from_account_id
if message_data.from_end_user_id:
end_user_data: Optional[EndUser] = (
db.session.query(EndUser).filter(EndUser.id == message_data.from_end_user_id).first()
)
if end_user_data is not None:
user_id = end_user_data.session_id
status: Status = Status(StatusCode.OK)
if trace_info.error:
status = Status(StatusCode.ERROR, trace_info.error)
trace_id = convert_to_trace_id(message_id)
message_span_id = convert_to_span_id(message_id, 'message')
message_span = SpanData(
trace_id=trace_id,
parent_span_id=None,
span_id=message_span_id,
name='message',
start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
attributes={
GEN_AI_SESSION_ID: trace_info.metadata.get('conversation_id', ''),
GEN_AI_USER_ID: str(user_id),
GEN_AI_SPAN_KIND: GenAISpanKind.CHAIN.value,
GEN_AI_FRAMEWORK: 'dify',
INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False),
OUTPUT_VALUE: str(trace_info.outputs),
},
status=status,
)
self.trace_client.add_span(message_span)
self.add_root_span(trace_id, message_span_id, workflow_span_id, trace_info) llm_span = SpanData(
trace_id=trace_id,
parent_span_id=message_span_id,
span_id=convert_to_span_id(message_id, 'llm'),
name='llm',
start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
attributes={
GEN_AI_SESSION_ID: trace_info.metadata.get('conversation_id', ''),
GEN_AI_USER_ID: str(user_id),
GEN_AI_SPAN_KIND: GenAISpanKind.LLM.value,
GEN_AI_FRAMEWORK: 'dify',
GEN_AI_MODEL_NAME: trace_info.metadata.get('ls_model_name', ''),
Gen_AI_SYSTEM: trace_info.metadata.get('ls_provider', ''),
GEN_AI_USAGE_INPUT_TOKENS: str(trace_info.message_tokens),
GEN_AI_USAGE_OUTPUT_TOKENS: str(trace_info.answer_tokens),
GEN_AI_USAGE_TOTAL_TOKENS: str(trace_info.total_tokens),
GEN_AI_PROMPT_TEMPLATE_VARIABLE: json.dumps(trace_info.message_data.inputs, ensure_ascii=False),
GEN_AI_PROMPT_TEMPLATE_TEMPLATE: trace_info.message_data.app_model_config.pre_prompt,
GEN_AI_PROMPT: json.dumps(trace_info.inputs, ensure_ascii=False),
GEM_AI_COMPLETION: str(trace_info.outputs),
INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False),
OUTPUT_VALUE: str(trace_info.outputs),
},
status=status,
)
self.trace_client.add_span(llm_span)
def dataset_retrieval_trace(self, trace_info: DatasetRetrievalTraceInfo):
if trace_info.message_data is None:
return
message_id = trace_info.message_id
documents_data = extract_retrieval_documents(trace_info.documents)
dataset_retrieval_span = SpanData(
trace_id=convert_to_trace_id(message_id),
parent_span_id=convert_to_span_id(message_id, 'message'),
span_id=generate_span_id(),
name='dataset_retrieval',
start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
attributes={
GEN_AI_SPAN_KIND: GenAISpanKind.RETRIEVER.value,
GEN_AI_FRAMEWORK: 'dify',
RETRIEVAL_QUERY: str(trace_info.inputs),
RETRIEVAL_DOCUMENT: json.dumps(documents_data, ensure_ascii=False),
INPUT_VALUE: str(trace_info.inputs),
OUTPUT_VALUE: json.dumps(documents_data, ensure_ascii=False),
}
)
self.trace_client.add_span(dataset_retrieval_span)
def tool_trace(self, trace_info: ToolTraceInfo):
if trace_info.message_data is None:
return
message_id = trace_info.message_id
status: Status = Status(StatusCode.OK)
if trace_info.error:
status = Status(StatusCode.ERROR, trace_info.error)
tool_span = SpanData(
trace_id=convert_to_trace_id(message_id),
parent_span_id=convert_to_span_id(message_id, 'message'),
span_id=generate_span_id(),
name=trace_info.tool_name,
start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
attributes={
GEN_AI_SPAN_KIND: GenAISpanKind.TOOL.value,
GEN_AI_FRAMEWORK: 'dify',
TOOL_NAME: trace_info.tool_name,
TOOL_DESCRIPTION: json.dumps(trace_info.tool_config, ensure_ascii=False),
TOOL_PARAMETERS: json.dumps(trace_info.tool_inputs, ensure_ascii=False),
INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False),
OUTPUT_VALUE: str(trace_info.tool_outputs),
},
status=status,
)
self.trace_client.add_span(tool_span)
def get_workflow_node_executions(self, trace_info: WorkflowTraceInfo) -> Sequence[WorkflowNodeExecution]:
# through workflow_run_id get all_nodes_execution using repository # through workflow_run_id get all_nodes_execution using repository
session_factory = sessionmaker(bind=db.engine) session_factory = sessionmaker(bind=db.engine)
# Find the app's creator account # Find the app's creator account
@ -118,34 +257,49 @@ class AliyunDataTrace(BaseTraceInstance):
if not current_tenant: if not current_tenant:
raise ValueError(f"Current tenant not found for account {service_account.id}") raise ValueError(f"Current tenant not found for account {service_account.id}")
service_account.set_tenant_id(current_tenant.tenant_id) service_account.set_tenant_id(current_tenant.tenant_id)
workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository( workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
session_factory=session_factory, session_factory=session_factory,
user=service_account, user=service_account,
app_id=trace_info.metadata.get("app_id"), app_id=trace_info.metadata.get("app_id"),
triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN, triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN,
) )
# Get all executions for this workflow run # Get all executions for this workflow run
workflow_node_executions = workflow_node_execution_repository.get_by_workflow_run( workflow_node_executions = workflow_node_execution_repository.get_by_workflow_run(
workflow_run_id=trace_info.workflow_run_id workflow_run_id=trace_info.workflow_run_id
) )
return workflow_node_executions
for node_execution in workflow_node_executions:
node_span = self.build_workflow_node_span(node_execution, trace_id, trace_info, workflow_span_id)
self.trace_client.add_span(node_span)
def build_workflow_node_span( def build_workflow_node_span(
self, node_execution: WorkflowNodeExecution, trace_id: int, trace_info: WorkflowTraceInfo, self, node_execution: WorkflowNodeExecution, trace_id: int,
workflow_span_id: int): trace_info: WorkflowTraceInfo,workflow_span_id: int):
try:
if node_execution.node_type == NodeType.LLM:
node_span = self.build_workflow_llm_span(
trace_id, workflow_span_id, trace_info, node_execution)
elif node_execution.node_type == NodeType.KNOWLEDGE_RETRIEVAL:
node_span = self.build_workflow_retrieval_span(
trace_id, workflow_span_id, trace_info, node_execution)
elif node_execution.node_type == NodeType.TOOL:
node_span = self.build_workflow_tool_span(
trace_id, workflow_span_id, trace_info, node_execution)
else:
node_span = self.build_workflow_task_span(
trace_id, workflow_span_id, trace_info, node_execution)
return node_span
except Exception:
return None
def get_workflow_node_status(self, node_execution: WorkflowNodeExecution) -> Status:
span_status: Status = Status(StatusCode.UNSET) span_status: Status = Status(StatusCode.UNSET)
if node_execution.status == StatusCode.OK: if node_execution.status == WorkflowNodeExecutionStatus.SUCCEEDED:
span_status = Status(StatusCode.OK) span_status = Status(StatusCode.OK)
elif node_execution.status == StatusCode.ERROR: elif node_execution.status in [WorkflowNodeExecutionStatus.FAILED, WorkflowNodeExecutionStatus.EXCEPTION]:
span_status = Status(StatusCode.ERROR, str(node_execution.error)) span_status = Status(StatusCode.ERROR, str(node_execution.error))
return span_status
if node_execution.node_type == NodeType.LLM: def build_workflow_task_span(self, trace_id:int, workflow_span_id:int, trace_info: WorkflowTraceInfo,
node_span = SpanData( node_execution: WorkflowNodeExecution) -> SpanData:
return SpanData(
trace_id=trace_id, trace_id=trace_id,
parent_span_id=workflow_span_id, parent_span_id=workflow_span_id,
span_id=convert_to_span_id(node_execution.id, 'node'), span_id=convert_to_span_id(node_execution.id, 'node'),
@ -154,29 +308,17 @@ class AliyunDataTrace(BaseTraceInstance):
end_time=convert_datetime_to_nanoseconds(node_execution.finished_at), end_time=convert_datetime_to_nanoseconds(node_execution.finished_at),
attributes={ attributes={
GEN_AI_SESSION_ID: trace_info.metadata.get('conversation_id', ''), GEN_AI_SESSION_ID: trace_info.metadata.get('conversation_id', ''),
GEN_AI_SPAN_KIND: GenAISpanKind.LLM.value, GEN_AI_SPAN_KIND: GenAISpanKind.TASK.value,
GEN_AI_FRAMEWORK: 'dify', GEN_AI_FRAMEWORK: 'dify',
INPUT_VALUE: json.dumps(node_execution.inputs, ensure_ascii=False),
GEN_AI_MODEL_NAME: node_execution.process_data.get('model_name', ''), OUTPUT_VALUE: json.dumps(node_execution.outputs, ensure_ascii=False),
Gen_AI_SYSTEM: node_execution.process_data.get('model_provider', ''),
GEN_AI_USAGE_INPUT_TOKENS: str(node_execution.outputs.get('usage', {}).get('prompt_tokens', 0)),
GEN_AI_USAGE_OUTPUT_TOKENS: str(
node_execution.outputs.get('usage', {}).get('completion_tokens', 0)),
GEN_AI_USAGE_TOTAL_TOKENS: str(node_execution.outputs.get('usage', {}).get('total_tokens', 0)),
GEN_AI_PROMPT: json.dumps(node_execution.process_data.get('prompts', []), ensure_ascii=False),
GEM_AI_COMPLETION: str(node_execution.outputs.get('text', '')),
GEN_AI_RESPONSE_FINISH_REASON: node_execution.outputs.get('finish_reason', ''),
INPUT_VALUE: json.dumps(node_execution.process_data.get('prompts', []), ensure_ascii=False),
OUTPUT_VALUE: str(node_execution.outputs.get('text', ''))
}, },
status=span_status, status=self.get_workflow_node_status(node_execution),
) )
elif node_execution.node_type == NodeType.KNOWLEDGE_RETRIEVAL:
node_span = SpanData( def build_workflow_tool_span(self, trace_id:int, workflow_span_id:int, trace_info: WorkflowTraceInfo,
node_execution: WorkflowNodeExecution) -> SpanData:
return SpanData(
trace_id=trace_id, trace_id=trace_id,
parent_span_id=workflow_span_id, parent_span_id=workflow_span_id,
span_id=convert_to_span_id(node_execution.id, 'node'), span_id=convert_to_span_id(node_execution.id, 'node'),
@ -184,18 +326,21 @@ class AliyunDataTrace(BaseTraceInstance):
start_time=convert_datetime_to_nanoseconds(node_execution.created_at), start_time=convert_datetime_to_nanoseconds(node_execution.created_at),
end_time=convert_datetime_to_nanoseconds(node_execution.finished_at), end_time=convert_datetime_to_nanoseconds(node_execution.finished_at),
attributes={ attributes={
GEN_AI_SPAN_KIND: GenAISpanKind.RETRIEVER.value, GEN_AI_SPAN_KIND: GenAISpanKind.TOOL.value,
GEN_AI_FRAMEWORK: 'dify', GEN_AI_FRAMEWORK: 'dify',
TOOL_NAME: node_execution.title,
RETRIEVAL_QUERY: str(node_execution.inputs.get('query', '')), TOOL_DESCRIPTION: json.dumps(
RETRIEVAL_DOCUMENT: json.dumps(node_execution.outputs.get('result', []), ensure_ascii=False), node_execution.metadata.get(WorkflowNodeExecutionMetadataKey.TOOL_INFO, {}),ensure_ascii=False),
TOOL_PARAMETERS: json.dumps(node_execution.inputs if node_execution.inputs else {},ensure_ascii=False),
INPUT_VALUE: str(node_execution.inputs.get('query', '')), INPUT_VALUE: json.dumps(node_execution.inputs if node_execution.inputs else {},ensure_ascii=False),
OUTPUT_VALUE: json.dumps(node_execution.outputs.get('result', []), ensure_ascii=False), OUTPUT_VALUE: json.dumps(node_execution.outputs, ensure_ascii=False),
} },
status=self.get_workflow_node_status(node_execution),
) )
elif node_execution.node_type == NodeType.TOOL:
node_span = SpanData( def build_workflow_retrieval_span(self, trace_id:int, workflow_span_id:int, trace_info: WorkflowTraceInfo,
node_execution: WorkflowNodeExecution) -> SpanData:
return SpanData(
trace_id=trace_id, trace_id=trace_id,
parent_span_id=workflow_span_id, parent_span_id=workflow_span_id,
span_id=convert_to_span_id(node_execution.id, 'node'), span_id=convert_to_span_id(node_execution.id, 'node'),
@ -203,23 +348,19 @@ class AliyunDataTrace(BaseTraceInstance):
start_time=convert_datetime_to_nanoseconds(node_execution.created_at), start_time=convert_datetime_to_nanoseconds(node_execution.created_at),
end_time=convert_datetime_to_nanoseconds(node_execution.finished_at), end_time=convert_datetime_to_nanoseconds(node_execution.finished_at),
attributes={ attributes={
GEN_AI_SPAN_KIND: GenAISpanKind.TOOL.value, GEN_AI_SPAN_KIND: GenAISpanKind.RETRIEVER.value,
GEN_AI_FRAMEWORK: 'dify', GEN_AI_FRAMEWORK: 'dify',
RETRIEVAL_QUERY: str(node_execution.inputs.get('query', '')),
TOOL_NAME: node_execution.title, RETRIEVAL_DOCUMENT: json.dumps(node_execution.outputs.get('result', []), ensure_ascii=False),
TOOL_DESCRIPTION: json.dumps( INPUT_VALUE: str(node_execution.inputs.get('query', '')),
node_execution.metadata.get(WorkflowNodeExecutionMetadataKey.TOOL_INFO, {}), OUTPUT_VALUE: json.dumps(node_execution.outputs.get('result', []), ensure_ascii=False),
ensure_ascii=False),
TOOL_PARAMETERS: json.dumps(node_execution.inputs if node_execution.inputs else {},
ensure_ascii=False),
INPUT_VALUE: json.dumps(node_execution.inputs if node_execution.inputs else {}, ensure_ascii=False),
OUTPUT_VALUE: json.dumps(node_execution.outputs, ensure_ascii=False),
}, },
status=span_status, status=self.get_workflow_node_status(node_execution),
) )
else:
node_span = SpanData( def build_workflow_llm_span(self, trace_id:int, workflow_span_id:int, trace_info: WorkflowTraceInfo,
node_execution: WorkflowNodeExecution) -> SpanData:
return SpanData(
trace_id=trace_id, trace_id=trace_id,
parent_span_id=workflow_span_id, parent_span_id=workflow_span_id,
span_id=convert_to_span_id(node_execution.id, 'node'), span_id=convert_to_span_id(node_execution.id, 'node'),
@ -228,21 +369,31 @@ class AliyunDataTrace(BaseTraceInstance):
end_time=convert_datetime_to_nanoseconds(node_execution.finished_at), end_time=convert_datetime_to_nanoseconds(node_execution.finished_at),
attributes={ attributes={
GEN_AI_SESSION_ID: trace_info.metadata.get('conversation_id', ''), GEN_AI_SESSION_ID: trace_info.metadata.get('conversation_id', ''),
GEN_AI_SPAN_KIND: GenAISpanKind.TASK.value, GEN_AI_SPAN_KIND: GenAISpanKind.LLM.value,
GEN_AI_FRAMEWORK: 'dify', GEN_AI_FRAMEWORK: 'dify',
GEN_AI_MODEL_NAME: node_execution.process_data.get('model_name', ''),
INPUT_VALUE: json.dumps(node_execution.inputs, ensure_ascii=False), Gen_AI_SYSTEM: node_execution.process_data.get('model_provider', ''),
OUTPUT_VALUE: json.dumps(node_execution.outputs, ensure_ascii=False), GEN_AI_USAGE_INPUT_TOKENS: str(node_execution.outputs.get('usage', {}).get('prompt_tokens', 0)),
GEN_AI_USAGE_OUTPUT_TOKENS: str(node_execution.outputs.get('usage', {}).get('completion_tokens', 0)),
GEN_AI_USAGE_TOTAL_TOKENS: str(node_execution.outputs.get('usage', {}).get('total_tokens', 0)),
GEN_AI_PROMPT: json.dumps(node_execution.process_data.get('prompts', []), ensure_ascii=False),
GEM_AI_COMPLETION: str(node_execution.outputs.get('text', '')),
GEN_AI_RESPONSE_FINISH_REASON: node_execution.outputs.get('finish_reason', ''),
INPUT_VALUE: json.dumps(node_execution.process_data.get('prompts', []), ensure_ascii=False),
OUTPUT_VALUE: str(node_execution.outputs.get('text', ''))
}, },
status=span_status, status=self.get_workflow_node_status(node_execution),
) )
return node_span def add_workflow_span(self, trace_id:int, workflow_span_id:int, trace_info: WorkflowTraceInfo):
message_span_id = None
def add_root_span(self, trace_id, message_span_id, workflow_span_id, trace_info): if trace_info.message_id:
message_span_id = convert_to_span_id(trace_info.message_id, 'message')
user_id = trace_info.metadata.get("user_id") user_id = trace_info.metadata.get("user_id")
message_id = trace_info.message_id status: Status = Status(StatusCode.OK)
if message_id: # chatflow if trace_info.error:
status = Status(StatusCode.ERROR, trace_info.error)
if trace_info.message_id: # chatflow
message_span = SpanData( message_span = SpanData(
trace_id=trace_id, trace_id=trace_id,
parent_span_id=None, parent_span_id=None,
@ -253,12 +404,12 @@ class AliyunDataTrace(BaseTraceInstance):
attributes={ attributes={
GEN_AI_SESSION_ID: trace_info.metadata.get('conversation_id', ''), GEN_AI_SESSION_ID: trace_info.metadata.get('conversation_id', ''),
GEN_AI_USER_ID: str(user_id), GEN_AI_USER_ID: str(user_id),
GEN_AI_SPAN_KIND: 'CHAIN', GEN_AI_SPAN_KIND: GenAISpanKind.CHAIN.value,
GEN_AI_FRAMEWORK: 'dify', GEN_AI_FRAMEWORK: 'dify',
INPUT_VALUE: trace_info.workflow_run_inputs.get('sys.query',''),
INPUT_VALUE: json.dumps(trace_info.workflow_run_inputs, ensure_ascii=False),
OUTPUT_VALUE: json.dumps(trace_info.workflow_run_outputs, ensure_ascii=False), OUTPUT_VALUE: json.dumps(trace_info.workflow_run_outputs, ensure_ascii=False),
} },
status=status,
) )
self.trace_client.add_span(message_span) self.trace_client.add_span(message_span)
@ -270,143 +421,54 @@ class AliyunDataTrace(BaseTraceInstance):
start_time=convert_datetime_to_nanoseconds(trace_info.start_time), start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
end_time=convert_datetime_to_nanoseconds(trace_info.end_time), end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
attributes={ attributes={
GEN_AI_SESSION_ID: trace_info.metadata.get('conversation_id', ''),
GEN_AI_USER_ID: str(user_id), GEN_AI_USER_ID: str(user_id),
GEN_AI_SPAN_KIND: 'CHAIN', GEN_AI_SPAN_KIND: GenAISpanKind.CHAIN.value,
GEN_AI_FRAMEWORK: 'dify', GEN_AI_FRAMEWORK: 'dify',
INPUT_VALUE: json.dumps(trace_info.workflow_run_inputs, ensure_ascii=False), INPUT_VALUE: json.dumps(trace_info.workflow_run_inputs, ensure_ascii=False),
OUTPUT_VALUE: str(trace_info.workflow_run_outputs), OUTPUT_VALUE: json.dumps(trace_info.workflow_run_outputs, ensure_ascii=False),
} },
status=status,
) )
self.trace_client.add_span(workflow_span) self.trace_client.add_span(workflow_span)
def message_trace(self, trace_info: MessageTraceInfo): def suggested_question_trace(self, trace_info: SuggestedQuestionTraceInfo):
message_data = trace_info.message_data
if message_data is None:
return
message_id = trace_info.message_id message_id = trace_info.message_id
user_id = message_data.from_account_id
if message_data.from_end_user_id:
end_user_data: Optional[EndUser] = (
db.session.query(EndUser).filter(EndUser.id == message_data.from_end_user_id).first()
)
if end_user_data is not None:
user_id = end_user_data.session_id
status: Status = Status(StatusCode.OK) status: Status = Status(StatusCode.OK)
if trace_info.error: if trace_info.error:
status = Status(StatusCode.ERROR, trace_info.error) status = Status(StatusCode.ERROR, trace_info.error)
suggested_question_span = SpanData(
message_span = SpanData(
trace_id=convert_to_trace_id(message_id),
parent_span_id=None,
span_id=convert_to_span_id(message_id, 'message'),
name='message',
start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
attributes={
GEN_AI_SESSION_ID: trace_info.metadata.get('conversation_id', ''),
GEN_AI_USER_ID: str(user_id),
GEN_AI_SPAN_KIND: 'CHAIN',
GEN_AI_FRAMEWORK: 'dify',
INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False),
OUTPUT_VALUE: str(trace_info.outputs),
},
status=status,
)
self.trace_client.add_span(message_span)
llm_span = SpanData(
trace_id=convert_to_trace_id(message_id), trace_id=convert_to_trace_id(message_id),
parent_span_id=convert_to_span_id(message_id, 'message'), parent_span_id=convert_to_span_id(message_id, 'message'),
span_id=convert_to_span_id(message_id, 'llm'), span_id=convert_to_span_id(message_id, 'suggested_question'),
name='llm', name='suggested_question',
start_time=convert_datetime_to_nanoseconds(trace_info.start_time), start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
end_time=convert_datetime_to_nanoseconds(trace_info.end_time), end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
attributes={ attributes={
GEN_AI_SESSION_ID: trace_info.metadata.get('conversation_id', ''), GEN_AI_SPAN_KIND: GenAISpanKind.LLM.value,
GEN_AI_USER_ID: str(user_id),
GEN_AI_SPAN_KIND: 'LLM',
GEN_AI_FRAMEWORK: 'dify', GEN_AI_FRAMEWORK: 'dify',
GEN_AI_MODEL_NAME: trace_info.metadata.get('ls_model_name', ''), GEN_AI_MODEL_NAME: trace_info.metadata.get('ls_model_name', ''),
Gen_AI_SYSTEM: trace_info.metadata.get('ls_provider', ''), Gen_AI_SYSTEM: trace_info.metadata.get('ls_provider', ''),
GEN_AI_USAGE_INPUT_TOKENS: str(trace_info.message_tokens),
GEN_AI_USAGE_OUTPUT_TOKENS: str(trace_info.answer_tokens),
GEN_AI_USAGE_TOTAL_TOKENS: str(trace_info.total_tokens),
GEN_AI_PROMPT_TEMPLATE_VARIABLE: json.dumps(trace_info.message_data.inputs, ensure_ascii=False),
GEN_AI_PROMPT_TEMPLATE_TEMPLATE: trace_info.message_data.app_model_config.pre_prompt,
GEN_AI_PROMPT: json.dumps(trace_info.inputs, ensure_ascii=False), GEN_AI_PROMPT: json.dumps(trace_info.inputs, ensure_ascii=False),
GEM_AI_COMPLETION: str(trace_info.outputs), GEM_AI_COMPLETION: json.dumps(trace_info.suggested_question, ensure_ascii=False),
INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False), INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False),
OUTPUT_VALUE: str(trace_info.outputs), OUTPUT_VALUE: json.dumps(trace_info.suggested_question, ensure_ascii=False),
}, },
status=status, status=status,
) )
self.trace_client.add_span(suggested_question_span)
self.trace_client.add_span(llm_span)
def dataset_retrieval_trace(self, trace_info: DatasetRetrievalTraceInfo): def extract_retrieval_documents(documents: list[Document]):
if trace_info.message_data is None: documents_data = []
return for document in documents:
message_id = trace_info.message_id document_data = {
"content": document.page_content,
documents_data = extract_retrieval_documents(trace_info.documents) "metadata": {
"dataset_id": document.metadata.get('dataset_id'),
dataset_retrieval_span = SpanData( "doc_id": document.metadata.get('doc_id'),
trace_id=convert_to_trace_id(message_id), "document_id": document.metadata.get('document_id'),
parent_span_id=convert_to_span_id(message_id, 'message'),
span_id=convert_to_span_id(message_id, 'dataset_retrieval'),
name='dataset_retrieval',
start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
attributes={
GEN_AI_SPAN_KIND: 'RETRIEVER',
GEN_AI_FRAMEWORK: 'dify',
RETRIEVAL_QUERY: str(trace_info.inputs),
RETRIEVAL_DOCUMENT: json.dumps(documents_data, ensure_ascii=False),
INPUT_VALUE: str(trace_info.inputs),
OUTPUT_VALUE: json.dumps(documents_data, ensure_ascii=False),
}
)
self.trace_client.add_span(dataset_retrieval_span)
def tool_trace(self, trace_info: ToolTraceInfo):
if trace_info.message_data is None:
return
message_id = trace_info.message_id
status: Status = Status(StatusCode.OK)
if trace_info.error:
status = Status(StatusCode.ERROR, trace_info.error)
tool_span = SpanData(
trace_id=convert_to_trace_id(message_id),
parent_span_id=convert_to_span_id(message_id, 'message'),
span_id=convert_to_span_id(message_id, 'tool'),
name=trace_info.tool_name,
start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
attributes={
GEN_AI_SPAN_KIND: 'Tool',
GEN_AI_FRAMEWORK: 'dify',
TOOL_NAME: trace_info.tool_name,
TOOL_DESCRIPTION: json.dumps(trace_info.tool_config, ensure_ascii=False),
TOOL_PARAMETERS: json.dumps(trace_info.tool_inputs, ensure_ascii=False),
INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False),
OUTPUT_VALUE: str(trace_info.tool_outputs),
}, },
status=status, "score": document.metadata.get('score'),
) }
self.trace_client.add_span(tool_span) documents_data.append(document_data)
return documents_data

@ -1,9 +1,15 @@
import datetime import datetime
import hashlib
import logging
import random
import socket import socket
import threading
import uuid import uuid
from collections import deque
from collections.abc import Sequence from collections.abc import Sequence
from typing import Optional from typing import Optional
import requests
from opentelemetry import trace as trace_api from opentelemetry import trace as trace_api
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.resources import Resource
@ -13,11 +19,16 @@ from opentelemetry.semconv.resource import ResourceAttributes
from configs import dify_config from configs import dify_config
from core.ops.aliyun_trace.entities.aliyun_trace_entity import SpanData from core.ops.aliyun_trace.entities.aliyun_trace_entity import SpanData
from core.rag.models.document import Document
INVALID_SPAN_ID = 0x0000000000000000
INVALID_TRACE_ID = 0x00000000000000000000000000000000
logger = logging.getLogger(__name__)
class TraceClient: class TraceClient:
def __init__(self, service_name, endpoint): def __init__(self, service_name:str, endpoint:str,max_queue_size:int=1000,
schedule_delay_sec:int=5, max_export_batch_size:int=50):
self.endpoint = endpoint self.endpoint = endpoint
self.resource = Resource( self.resource = Resource(
attributes={ attributes={
@ -30,13 +41,78 @@ class TraceClient:
self.span_builder = SpanBuilder(self.resource) self.span_builder = SpanBuilder(self.resource)
self.exporter = OTLPSpanExporter(endpoint=endpoint) self.exporter = OTLPSpanExporter(endpoint=endpoint)
def add_span(self, span_data: SpanData): self.max_queue_size = max_queue_size
span: ReadableSpan = self.span_builder.build_span(span_data) self.schedule_delay_sec = schedule_delay_sec
self.export([span]) self.max_export_batch_size = max_export_batch_size
self.queue = deque(maxlen=max_queue_size)
self.condition = threading.Condition(threading.Lock())
self.done = False
self.worker_thread = threading.Thread(target=self._worker, daemon=True)
self.worker_thread.start()
self._spans_dropped = False
def export(self, spans: Sequence[ReadableSpan]): def export(self, spans: Sequence[ReadableSpan]):
self.exporter.export(spans) self.exporter.export(spans)
def api_check(self):
try:
response = requests.head(self.endpoint, timeout=5)
if response.status_code == 405:
return True
else:
logger.debug(f"AliyunTrace API check failed: Unexpected status code: {response.status_code}")
return False
except requests.exceptions.RequestException as e:
logger.debug(f"AliyunTrace API check failed: {str(e)}")
raise ValueError(f"AliyunTrace API check failed: {str(e)}")
def get_project_url(self):
return 'https://arms.console.aliyun.com/#/llm'
def add_span(self, span_data: SpanData):
if span_data is None:
return
span: ReadableSpan = self.span_builder.build_span(span_data)
with self.condition:
if len(self.queue) == self.max_queue_size:
if not self._spans_dropped:
logger.warning("Queue is full, likely spans will be dropped.")
self._spans_dropped = True
self.queue.appendleft(span)
if len(self.queue) >= self.max_export_batch_size:
self.condition.notify()
def _worker(self):
while not self.done:
with self.condition:
if len(self.queue) < self.max_export_batch_size and not self.done:
self.condition.wait(timeout=self.schedule_delay_sec)
self._export_batch()
def _export_batch(self):
spans_to_export = []
with self.condition:
while len(spans_to_export) < self.max_export_batch_size and self.queue:
spans_to_export.append(self.queue.pop())
if spans_to_export:
try:
self.exporter.export(spans_to_export)
except Exception as e:
logger.debug(f"Error exporting spans: {e}")
def shutdown(self):
with self.condition:
self.done = True
self.condition.notify_all()
self.worker_thread.join()
self._export_batch()
self.exporter.shutdown()
class SpanBuilder: class SpanBuilder:
def __init__(self, resource): def __init__(self, resource):
@ -83,6 +159,11 @@ class SpanBuilder:
) )
return span return span
def generate_span_id() -> int:
span_id = random.getrandbits(64)
while span_id == INVALID_SPAN_ID:
span_id = random.getrandbits(64)
return span_id
def convert_to_trace_id(uuid_v4: str) -> int: def convert_to_trace_id(uuid_v4: str) -> int:
try: try:
@ -91,18 +172,16 @@ def convert_to_trace_id(uuid_v4: str) -> int:
except Exception as e: except Exception as e:
raise ValueError(f"Invalid UUID input: {e}") raise ValueError(f"Invalid UUID input: {e}")
def convert_to_span_id(uuid_v4: str, span_type: str) -> int: def convert_to_span_id(uuid_v4: str, span_type: str) -> int:
try: try:
uuid_obj = uuid.UUID(uuid_v4) uuid_obj = uuid.UUID(uuid_v4)
except Exception as e: except Exception as e:
raise ValueError(f"Invalid UUID input: {e}") raise ValueError(f"Invalid UUID input: {e}")
type_hash = hash(span_type) & 0xFFFFFFFFFFFFFFFF type_hash = consistent_hash(span_type) & 0xFFFFFFFFFFFFFFFF
span_id = (uuid_obj.int & 0xFFFFFFFFFFFFFFFF) ^ type_hash span_id = (uuid_obj.int & 0xFFFFFFFFFFFFFFFF) ^ type_hash
return span_id return span_id
def convert_datetime_to_nanoseconds(start_time_a: Optional[datetime]) -> Optional[int]: def convert_datetime_to_nanoseconds(start_time_a: Optional[datetime]) -> Optional[int]:
if start_time_a is None: if start_time_a is None:
return None return None
@ -110,18 +189,6 @@ def convert_datetime_to_nanoseconds(start_time_a: Optional[datetime]) -> Optiona
timestamp_in_nanoseconds = int(timestamp_in_seconds * 1e9) timestamp_in_nanoseconds = int(timestamp_in_seconds * 1e9)
return timestamp_in_nanoseconds return timestamp_in_nanoseconds
def consistent_hash(s: str) -> int:
def extract_retrieval_documents(documents: list[Document]): sha256_hash = hashlib.sha256(s.encode()).hexdigest()
documents_data = [] return int(sha256_hash[:16], 16)
for document in documents:
document_data = {
"content": document.page_content,
"metadata": {
"dataset_id": document.metadata.get('dataset_id'),
"doc_id": document.metadata.get('doc_id'),
"document_id": document.metadata.get('document_id'),
},
"score": document.metadata.get('score'),
}
documents_data.append(document_data)
return documents_data

@ -76,6 +76,16 @@ class OpsService:
new_decrypt_tracing_config.update({"project_url": project_url}) new_decrypt_tracing_config.update({"project_url": project_url})
except Exception: except Exception:
new_decrypt_tracing_config.update({"project_url": "https://wandb.ai/"}) new_decrypt_tracing_config.update({"project_url": "https://wandb.ai/"})
if tracing_provider == "aliyun" and (
"project_url" not in decrypt_tracing_config or not decrypt_tracing_config.get("project_url")
):
try:
project_url = OpsTraceManager.get_trace_config_project_url(decrypt_tracing_config, tracing_provider)
new_decrypt_tracing_config.update({"project_url": project_url})
except Exception:
new_decrypt_tracing_config.update({"project_url": "https://arms.console.aliyun.com/"})
trace_config_data.tracing_config = new_decrypt_tracing_config trace_config_data.tracing_config = new_decrypt_tracing_config
return trace_config_data.to_dict() return trace_config_data.to_dict()

@ -5,5 +5,5 @@ export const docURL = {
[TracingProvider.langfuse]: 'https://docs.langfuse.com', [TracingProvider.langfuse]: 'https://docs.langfuse.com',
[TracingProvider.opik]: 'https://www.comet.com/docs/opik/tracing/integrations/dify#setup-instructions', [TracingProvider.opik]: 'https://www.comet.com/docs/opik/tracing/integrations/dify#setup-instructions',
[TracingProvider.weave]: 'https://weave-docs.wandb.ai/', [TracingProvider.weave]: 'https://weave-docs.wandb.ai/',
[TracingProvider.aliyun]: 'https://arms.console.aliyun.com/', [TracingProvider.aliyun]: 'https://help.aliyun.com/zh/arms/',
} }

@ -156,8 +156,13 @@ const ProviderConfigModal: FC<Props> = ({
} }
if (type === TracingProvider.aliyun) { if (type === TracingProvider.aliyun) {
// todo: check const postData = config as AliyunConfig
// const postData = config as AliyunConfig if (!errorMessage && !postData.app_name)
errorMessage = t('common.errorMsg.fieldRequired', { field: 'App Name' })
if (!errorMessage && !postData.license_key)
errorMessage = t('common.errorMsg.fieldRequired', { field: 'License Key' })
if (!errorMessage && !postData.endpoint)
errorMessage = t('common.errorMsg.fieldRequired', { field: 'Endpoint' })
} }
return errorMessage return errorMessage
@ -224,7 +229,7 @@ const ProviderConfigModal: FC<Props> = ({
labelClassName='!text-sm' labelClassName='!text-sm'
value={(config as AliyunConfig).endpoint} value={(config as AliyunConfig).endpoint}
onChange={handleConfigChange('endpoint')} onChange={handleConfigChange('endpoint')}
placeholder={'https://arms.console.aliyun.com/'} placeholder={'https://tracing.arms.aliyuncs.com'}
/> />
<Field <Field
label='App Name' label='App Name'

File diff suppressed because one or more lines are too long

Before

Width:  |  Height:  |  Size: 340 B

After

Width:  |  Height:  |  Size: 14 KiB

File diff suppressed because one or more lines are too long

Before

Width:  |  Height:  |  Size: 340 B

After

Width:  |  Height:  |  Size: 10 KiB

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

@ -0,0 +1,107 @@
{
"icon": {
"type": "element",
"isRootNode": true,
"name": "svg",
"attributes": {
"xmlns": "http://www.w3.org/2000/svg",
"xmlns:xlink": "http://www.w3.org/1999/xlink",
"width": "120px",
"height": "16px",
"viewBox": "0 0 120 16",
"version": "1.1"
},
"children": [
{
"type": "element",
"name": "defs",
"children": [
{
"type": "element",
"name": "clipPath",
"attributes": {
"id": "master_svg0_36_00924"
},
"children": [
{
"type": "element",
"name": "rect",
"attributes": {
"x": "0",
"y": "0",
"width": "16",
"height": "16",
"rx": "0"
}
}
]
}
]
},
{
"type": "element",
"name": "g",
"children": [
{
"type": "element",
"name": "g",
"attributes": {
"clip-path": "url(#master_svg0_36_00924)"
},
"children": [
{
"type": "element",
"name": "g",
"children": [
{
"type": "element",
"name": "path",
"attributes": {
"d": "a",
"fill-rule":"evenodd",
"fill":"#000000",
"fill-opacity":"1",
"style":"mix-blend-mode:passthrough"
}
}
]
},
{
"type": "element",
"name": "g",
"children": [
{
"type": "element",
"name": "path",
"attributes": {
"d": "b",
"fill":"#000000",
"fill-opacity":"1",
"style":"mix-blend-mode:passthrough"
}
}
]
}
]
},
{
"type": "element",
"name": "g",
"children": [
{
"type": "element",
"name": "path",
"attributes": {
"d": "c",
"fill":"#333333",
"fill-opacity":"1"
}
}
]
}
]
}
]
},
"name": "AliyunIcon"
}

@ -0,0 +1,26 @@
<?xml version="1.0" encoding="utf-8"?>
<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" fill="none" version="1.1" width="106" height="16" viewBox="0 0 106 16">
<defs>
<clipPath id="master_svg0_36_00924">
<rect x="0" y="0" width="19" height="16" rx="0"/>
</clipPath>
</defs>
<g>
<g clip-path="url(#master_svg0_36_00924)">
<g>
<g>
<path d="a" fill-rule="evenodd" fill="#000000" fill-opacity="1" style="mix-blend-mode:passthrough"/>
</g>
<g>
<path d="b" fill="#000000" fill-opacity="1" style="mix-blend-mode:passthrough"/>
</g>
</g>
</g>
<g>
<g>
<path d="c" fill="#000000" fill-opacity="1"/>
</g>
</g>
</g>
</svg>

After

Width:  |  Height:  |  Size: 768 B

@ -165,9 +165,9 @@ const translation = {
title: 'Weave', title: 'Weave',
description: 'Weave is an open-source platform for evaluating, testing, and monitoring LLM applications.', description: 'Weave is an open-source platform for evaluating, testing, and monitoring LLM applications.',
}, },
cms: { aliyun: {
title: 'Aliyun', title: 'LLM observability',
description: 'Aliyun Demo', description: 'The SaaS observability platform provided by Alibaba Cloud enables out of box monitoring, tracing, and evaluation of Dify applications.',
}, },
inUse: 'In use', inUse: 'In use',
configProvider: { configProvider: {

@ -177,8 +177,8 @@ const translation = {
description: 'Weave 是一个开源平台,用于评估、测试和监控大型语言模型应用程序。', description: 'Weave 是一个开源平台,用于评估、测试和监控大型语言模型应用程序。',
}, },
aliyun: { aliyun: {
title: '阿里云可观测', title: '大模型可观测',
description: '阿里云可观测 Demo', description: '阿里云提供的SaaS化可观测平台一键开启Dify应用的监控追踪和评估。',
}, },
}, },
appSelector: { appSelector: {

Loading…
Cancel
Save