update aliyun trace
parent
f65c2fcb1d
commit
2806edb70b
@ -0,0 +1,192 @@
|
||||
import datetime
|
||||
import json
|
||||
import uuid
|
||||
from typing import Optional
|
||||
|
||||
from core.ops.aliyun_trace.data_exporter.traceclient import TraceClient
|
||||
from core.ops.aliyun_trace.entities.aliyun_trace_entity import SpanData
|
||||
from core.ops.base_trace_instance import BaseTraceInstance
|
||||
from core.ops.entities.config_entity import AliyunConfig
|
||||
from core.ops.entities.trace_entity import (
|
||||
BaseTraceInfo,
|
||||
DatasetRetrievalTraceInfo,
|
||||
GenerateNameTraceInfo,
|
||||
MessageTraceInfo,
|
||||
ModerationTraceInfo,
|
||||
SuggestedQuestionTraceInfo,
|
||||
ToolTraceInfo,
|
||||
WorkflowTraceInfo,
|
||||
)
|
||||
from models import EndUser, db
|
||||
|
||||
|
||||
def convert_to_trace_id(uuid_v4:str) -> int:
|
||||
try:
|
||||
uuid_obj = uuid.UUID(uuid_v4)
|
||||
return uuid_obj.int
|
||||
except Exception as e:
|
||||
raise ValueError(f"Invalid UUID input: {e}")
|
||||
|
||||
def convert_to_span_id(uuid_v4:str, span_type:str) -> int:
|
||||
try:
|
||||
uuid_obj = uuid.UUID(uuid_v4)
|
||||
except Exception as e:
|
||||
raise ValueError(f"Invalid UUID input: {e}")
|
||||
|
||||
type_hash = hash(span_type) & 0xFFFFFFFFFFFFFFFF
|
||||
span_id = (uuid_obj.int & 0xFFFFFFFFFFFFFFFF) ^ type_hash
|
||||
|
||||
return span_id
|
||||
|
||||
def convert_datetime_to_nanoseconds(start_time_a: Optional[datetime]) -> Optional[int]:
|
||||
if start_time_a is None:
|
||||
return None
|
||||
timestamp_in_seconds = start_time_a.timestamp()
|
||||
timestamp_in_nanoseconds = int(timestamp_in_seconds * 1e9)
|
||||
return timestamp_in_nanoseconds
|
||||
|
||||
|
||||
class AliyunDataTrace(BaseTraceInstance):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
aliyun_config: AliyunConfig,
|
||||
):
|
||||
super().__init__(aliyun_config)
|
||||
endpoint = aliyun_config.endpoint+aliyun_config.license_key+'/api/otlp/traces'
|
||||
self.trace_client = TraceClient(service_name=aliyun_config.app_name,endpoint=endpoint)
|
||||
|
||||
def trace(self, trace_info: BaseTraceInfo):
|
||||
if isinstance(trace_info, WorkflowTraceInfo):
|
||||
self.workflow_trace(trace_info)
|
||||
if isinstance(trace_info, MessageTraceInfo):
|
||||
self.message_trace(trace_info)
|
||||
if isinstance(trace_info, ModerationTraceInfo):
|
||||
pass
|
||||
if isinstance(trace_info, SuggestedQuestionTraceInfo):
|
||||
pass
|
||||
if isinstance(trace_info, DatasetRetrievalTraceInfo):
|
||||
self.dataset_retrieval_trace(trace_info)
|
||||
if isinstance(trace_info, ToolTraceInfo):
|
||||
self.tool_trace(trace_info)
|
||||
if isinstance(trace_info, GenerateNameTraceInfo):
|
||||
pass
|
||||
|
||||
def api_check(self):
|
||||
# todo
|
||||
return True
|
||||
|
||||
def workflow_trace(self, trace_info: WorkflowTraceInfo):
|
||||
pass
|
||||
|
||||
def message_trace(self, trace_info: MessageTraceInfo):
|
||||
# get message file data
|
||||
file_list = trace_info.file_list
|
||||
metadata = trace_info.metadata
|
||||
message_data = trace_info.message_data
|
||||
if message_data is None:
|
||||
return
|
||||
message_id = trace_info.message_id
|
||||
|
||||
user_id = message_data.from_account_id
|
||||
if message_data.from_end_user_id:
|
||||
end_user_data: Optional[EndUser] = (
|
||||
db.session.query(EndUser).filter(EndUser.id == message_data.from_end_user_id).first()
|
||||
)
|
||||
if end_user_data is not None:
|
||||
user_id = end_user_data.session_id
|
||||
|
||||
message_span = SpanData(
|
||||
trace_id=convert_to_trace_id(message_id),
|
||||
parent_span_id=None,
|
||||
span_id=convert_to_span_id(message_id,'message'),
|
||||
name='message',
|
||||
start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
|
||||
end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
|
||||
attributes={
|
||||
'gen_ai.session.id': trace_info.metadata.get('conversation_id',''),
|
||||
'gen_ai.user.id':str(user_id),
|
||||
'gen_ai.span.kind':'CHAIN',
|
||||
'gen_ai.framework': 'dify',
|
||||
|
||||
'input.value':json.dumps(trace_info.inputs),
|
||||
'output.value': str(trace_info.outputs),
|
||||
}
|
||||
)
|
||||
self.trace_client.add_span(message_span)
|
||||
|
||||
llm_span = SpanData(
|
||||
trace_id=convert_to_trace_id(message_id),
|
||||
parent_span_id=convert_to_span_id(message_id,'message'),
|
||||
span_id=convert_to_span_id(message_id,'llm'),
|
||||
name='llm',
|
||||
start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
|
||||
end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
|
||||
attributes={
|
||||
'gen_ai.session.id': trace_info.metadata.get('conversation_id',''),
|
||||
'gen_ai.user.id': str(user_id),
|
||||
'gen_ai.span.kind':'LLM',
|
||||
'gen_ai.framework': 'dify',
|
||||
|
||||
'gen_ai.prompt_template.template': 'todo',
|
||||
'gen_ai.model_name':trace_info.message_data.model_id,
|
||||
|
||||
'input.value':json.dumps(trace_info.inputs),
|
||||
'output.value': str(trace_info.outputs),
|
||||
}
|
||||
)
|
||||
|
||||
self.trace_client.add_span(llm_span)
|
||||
|
||||
|
||||
def dataset_retrieval_trace(self, trace_info: DatasetRetrievalTraceInfo):
|
||||
if trace_info.message_data is None:
|
||||
return
|
||||
message_id = trace_info.message_id
|
||||
|
||||
span_data = SpanData(
|
||||
trace_id=convert_to_trace_id(message_id),
|
||||
parent_span_id=convert_to_span_id(message_id,'message'),
|
||||
span_id=convert_to_span_id(message_id,'dataset_retrieval'),
|
||||
name='dataset_retrieval',
|
||||
start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
|
||||
end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
|
||||
attributes={
|
||||
'gen_ai.session.id': 'todo',
|
||||
'gen_ai.user.id': 'todo',
|
||||
'gen_ai.span.kind': 'RETRIEVER',
|
||||
'gen_ai.framework': 'dify',
|
||||
|
||||
'gen_ai.operation.name': 'TASK',
|
||||
'retrieval.query':str(trace_info.inputs),
|
||||
'retrieval.document ':str(trace_info.documents)
|
||||
}
|
||||
)
|
||||
self.trace_client.add_span(span_data)
|
||||
|
||||
def tool_trace(self, trace_info: ToolTraceInfo):
|
||||
if trace_info.message_data is None:
|
||||
return
|
||||
message_id = trace_info.message_id
|
||||
|
||||
span_data = SpanData(
|
||||
trace_id=convert_to_trace_id(message_id),
|
||||
parent_span_id=convert_to_span_id(message_id,'message'),
|
||||
span_id=convert_to_span_id(message_id,'tool'),
|
||||
name='tool',
|
||||
start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
|
||||
end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
|
||||
attributes={
|
||||
'gen_ai.session.id': 'todo',
|
||||
'gen_ai.user.id': 'todo',
|
||||
'gen_ai.span.kind': 'Tool',
|
||||
'gen_ai.framework': 'dify',
|
||||
|
||||
'tool.name': trace_info.tool_name,
|
||||
'tool.description': trace_info.tool_name,
|
||||
'tool.parameters': json.dumps(trace_info.tool_inputs),
|
||||
'input.value': json.dumps(trace_info.inputs),
|
||||
'output.value': str(trace_info.tool_outputs),
|
||||
}
|
||||
)
|
||||
self.trace_client.add_span(span_data)
|
||||
@ -0,0 +1,80 @@
|
||||
import socket
|
||||
from collections.abc import Sequence
|
||||
|
||||
from opentelemetry import trace as trace_api
|
||||
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
|
||||
from opentelemetry.sdk.resources import Resource
|
||||
from opentelemetry.sdk.trace import ReadableSpan
|
||||
from opentelemetry.sdk.util.instrumentation import InstrumentationScope
|
||||
from opentelemetry.semconv.resource import ResourceAttributes
|
||||
|
||||
from configs import dify_config
|
||||
from core.ops.aliyun_trace.entities.aliyun_trace_entity import SpanData
|
||||
|
||||
|
||||
class TraceClient:
|
||||
def __init__(self,service_name,endpoint):
|
||||
self.endpoint = endpoint
|
||||
self.resource = Resource(
|
||||
attributes={
|
||||
ResourceAttributes.SERVICE_NAME: service_name,
|
||||
ResourceAttributes.SERVICE_VERSION: f"dify-{dify_config.CURRENT_VERSION}-{dify_config.COMMIT_SHA}",
|
||||
ResourceAttributes.DEPLOYMENT_ENVIRONMENT: f"{dify_config.DEPLOY_ENV}-{dify_config.EDITION}",
|
||||
ResourceAttributes.HOST_NAME: socket.gethostname(),
|
||||
}
|
||||
)
|
||||
self.span_builder = SpanBuilder(self.resource)
|
||||
self.exporter = OTLPSpanExporter(endpoint=endpoint)
|
||||
|
||||
def add_span(self,span_data:SpanData):
|
||||
span:ReadableSpan = self.span_builder.build_span(span_data)
|
||||
self.export([span])
|
||||
|
||||
def export(self,spans:Sequence[ReadableSpan]):
|
||||
self.exporter.export(spans)
|
||||
|
||||
class SpanBuilder:
|
||||
def __init__(self, resource):
|
||||
self.resource = resource
|
||||
self.instrumentation_scope = InstrumentationScope(
|
||||
__name__,
|
||||
"",
|
||||
None,
|
||||
None,
|
||||
)
|
||||
|
||||
def build_span(self, span_data: SpanData) -> ReadableSpan:
|
||||
span_context = trace_api.SpanContext(
|
||||
trace_id=span_data.trace_id,
|
||||
span_id=span_data.span_id,
|
||||
is_remote=False,
|
||||
trace_flags=trace_api.TraceFlags(trace_api.TraceFlags.SAMPLED),
|
||||
trace_state=None,
|
||||
)
|
||||
|
||||
parent_span_context = None
|
||||
if span_data.parent_span_id is not None:
|
||||
parent_span_context = trace_api.SpanContext(
|
||||
trace_id=span_data.trace_id,
|
||||
span_id=span_data.parent_span_id,
|
||||
is_remote=False,
|
||||
trace_flags=trace_api.TraceFlags(trace_api.TraceFlags.SAMPLED),
|
||||
trace_state=None,
|
||||
)
|
||||
|
||||
span = ReadableSpan(
|
||||
name=span_data.name,
|
||||
context=span_context,
|
||||
parent=parent_span_context,
|
||||
resource=self.resource,
|
||||
attributes=span_data.attributes,
|
||||
events=span_data.events,
|
||||
links=span_data.links,
|
||||
kind=trace_api.SpanKind.INTERNAL,
|
||||
status=span_data.status,
|
||||
start_time=span_data.start_time,
|
||||
end_time=span_data.end_time,
|
||||
instrumentation_scope=self.instrumentation_scope,
|
||||
)
|
||||
return span
|
||||
|
||||
@ -0,0 +1,21 @@
|
||||
from collections.abc import Sequence
|
||||
from typing import Optional
|
||||
|
||||
from opentelemetry import trace as trace_api
|
||||
from opentelemetry.sdk.trace import Event, Status, StatusCode
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class SpanData(BaseModel):
|
||||
model_config = {"arbitrary_types_allowed": True}
|
||||
|
||||
trace_id: int = Field(..., description="The unique identifier for the trace.")
|
||||
parent_span_id: Optional[int] = Field(None, description="The ID of the parent span, if any.")
|
||||
span_id: int = Field(..., description="The unique identifier for this span.")
|
||||
name: str = Field(..., description="The name of the span.")
|
||||
attributes: dict[str, str] = Field(default_factory=dict, description="Attributes associated with the span.")
|
||||
events: Sequence[Event] = Field(default_factory=list, description="Events recorded in the span.")
|
||||
links: Sequence[trace_api.Link] = Field(default_factory=list, description="Links to other spans.")
|
||||
status: Status = Field(default=Status(StatusCode.UNSET), description="The status of the span.")
|
||||
start_time: int = Field(..., description="The start time of the span in nanoseconds.")
|
||||
end_time: int = Field(..., description="The end time of the span in nanoseconds.")
|
||||
@ -0,0 +1,7 @@
|
||||
<svg width="800" height="600" xmlns="http://www.w3.org/2000/svg">
|
||||
|
||||
<g>
|
||||
<title>Layer 1</title>
|
||||
<text transform="matrix(5.69286 0 0 3.7385 -1737.67 -722.766)" stroke="#000" xml:space="preserve" text-anchor="start" font-family="Noto Sans JP" font-size="24" id="svg_2" y="285" x="369" stroke-width="0" fill="#000000">A</text>
|
||||
</g>
|
||||
</svg>
|
||||
|
After Width: | Height: | Size: 340 B |
@ -0,0 +1,7 @@
|
||||
<svg width="800" height="600" xmlns="http://www.w3.org/2000/svg">
|
||||
|
||||
<g>
|
||||
<title>Layer 1</title>
|
||||
<text transform="matrix(5.69286 0 0 3.7385 -1737.67 -722.766)" stroke="#000" xml:space="preserve" text-anchor="start" font-family="Noto Sans JP" font-size="24" id="svg_2" y="285" x="369" stroke-width="0" fill="#000000">A</text>
|
||||
</g>
|
||||
</svg>
|
||||
|
After Width: | Height: | Size: 340 B |
@ -0,0 +1,18 @@
|
||||
{
|
||||
"icon": {
|
||||
"type": "element",
|
||||
"isRootNode": true,
|
||||
"name": "svg",
|
||||
"attributes": {
|
||||
"xmlns": "http://www.w3.org/2000/svg",
|
||||
"xmlns:xlink": "http://www.w3.org/1999/xlink",
|
||||
"width": "120px",
|
||||
"height": "16px",
|
||||
"viewBox": "0 0 120 16",
|
||||
"version": "1.1"
|
||||
},
|
||||
"children": [
|
||||
]
|
||||
},
|
||||
"name": "WeaveIcon"
|
||||
}
|
||||
@ -0,0 +1,16 @@
|
||||
// GENERATE BY script
|
||||
// DON NOT EDIT IT MANUALLY
|
||||
|
||||
import * as React from 'react'
|
||||
import data from './AliyunIcon.json'
|
||||
import IconBase from '@/app/components/base/icons/IconBase'
|
||||
import type { IconBaseProps, IconData } from '@/app/components/base/icons/IconBase'
|
||||
|
||||
const Icon = React.forwardRef<React.MutableRefObject<SVGElement>, Omit<IconBaseProps, 'data'>>((
|
||||
props,
|
||||
ref,
|
||||
) => <IconBase {...props} ref={ref} data={data as IconData} />)
|
||||
|
||||
Icon.displayName = 'AliyunIcon'
|
||||
|
||||
export default Icon
|
||||
@ -0,0 +1,18 @@
|
||||
{
|
||||
"icon": {
|
||||
"type": "element",
|
||||
"isRootNode": true,
|
||||
"name": "svg",
|
||||
"attributes": {
|
||||
"xmlns": "http://www.w3.org/2000/svg",
|
||||
"xmlns:xlink": "http://www.w3.org/1999/xlink",
|
||||
"width": "124px",
|
||||
"height": "16px",
|
||||
"viewBox": "0 0 120 16",
|
||||
"version": "1.1"
|
||||
},
|
||||
"children": [
|
||||
]
|
||||
},
|
||||
"name": "WeaveIconBig"
|
||||
}
|
||||
@ -0,0 +1,16 @@
|
||||
// GENERATE BY script
|
||||
// DON NOT EDIT IT MANUALLY
|
||||
|
||||
import * as React from 'react'
|
||||
import data from './AliyunIconBig.json'
|
||||
import IconBase from '@/app/components/base/icons/IconBase'
|
||||
import type { IconBaseProps, IconData } from '@/app/components/base/icons/IconBase'
|
||||
|
||||
const Icon = React.forwardRef<React.MutableRefObject<SVGElement>, Omit<IconBaseProps, 'data'>>((
|
||||
props,
|
||||
ref,
|
||||
) => <IconBase {...props} ref={ref} data={data as IconData} />)
|
||||
|
||||
Icon.displayName = 'AliyunIconBig'
|
||||
|
||||
export default Icon
|
||||
Loading…
Reference in New Issue