From 809d221a39e59275e9a57bab70461714d4b4792c Mon Sep 17 00:00:00 2001 From: -LAN- Date: Fri, 9 May 2025 12:24:27 +0800 Subject: [PATCH] Update tracing systems to use domain model --- api/core/ops/langfuse_trace/langfuse_trace.py | 42 ++++++---- .../ops/langsmith_trace/langsmith_trace.py | 43 ++++++---- api/core/ops/opik_trace/opik_trace.py | 43 ++++++---- api/core/ops/weave_trace/weave_trace.py | 78 +++++++++---------- 4 files changed, 122 insertions(+), 84 deletions(-) diff --git a/api/core/ops/langfuse_trace/langfuse_trace.py b/api/core/ops/langfuse_trace/langfuse_trace.py index c74617e558..f67093aca5 100644 --- a/api/core/ops/langfuse_trace/langfuse_trace.py +++ b/api/core/ops/langfuse_trace/langfuse_trace.py @@ -1,11 +1,10 @@ -import json import logging import os from datetime import datetime, timedelta from typing import Optional from langfuse import Langfuse # type: ignore -from sqlalchemy.orm import sessionmaker +from sqlalchemy.orm import Session, sessionmaker from core.ops.base_trace_instance import BaseTraceInstance from core.ops.entities.config_entity import LangfuseConfig @@ -31,7 +30,7 @@ from core.ops.langfuse_trace.entities.langfuse_trace_entity import ( from core.ops.utils import filter_none_values from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from extensions.ext_database import db -from models.model import EndUser +from models import Account, App, EndUser logger = logging.getLogger(__name__) @@ -113,8 +112,26 @@ class LangFuseDataTrace(BaseTraceInstance): # through workflow_run_id get all_nodes_execution using repository session_factory = sessionmaker(bind=db.engine) + # Find the app's creator account + with Session(db.engine, expire_on_commit=False) as session: + # Get the app to find its creator + app_id = trace_info.metadata.get("app_id") + if not app_id: + raise ValueError("No app_id found in trace_info metadata") + + app = session.query(App).filter(App.id == app_id).first() + if not app: + raise ValueError(f"App with id {app_id} not found") + + if not app.created_by: + raise ValueError(f"App with id {app_id} has no creator (created_by is None)") + + service_account = session.query(Account).filter(Account.id == app.created_by).first() + if not service_account: + raise ValueError(f"Creator account with id {app.created_by} not found for app {app_id}") + workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository( - session_factory=session_factory, tenant_id=trace_info.tenant_id + session_factory=session_factory, user=service_account, app_id=trace_info.metadata.get("app_id") ) # Get all executions for this workflow run @@ -124,23 +141,22 @@ class LangFuseDataTrace(BaseTraceInstance): for node_execution in workflow_node_executions: node_execution_id = node_execution.id - tenant_id = node_execution.tenant_id - app_id = node_execution.app_id + tenant_id = trace_info.tenant_id # Use from trace_info instead + app_id = trace_info.metadata.get("app_id") # Use from trace_info instead node_name = node_execution.title node_type = node_execution.node_type status = node_execution.status if node_type == "llm": - inputs = ( - json.loads(node_execution.process_data).get("prompts", {}) if node_execution.process_data else {} - ) + inputs = node_execution.process_data.get("prompts", {}) if node_execution.process_data else {} else: - inputs = json.loads(node_execution.inputs) if node_execution.inputs else {} - outputs = json.loads(node_execution.outputs) if node_execution.outputs else {} + inputs = node_execution.inputs if node_execution.inputs else {} + outputs = node_execution.outputs if node_execution.outputs else {} created_at = node_execution.created_at or datetime.now() elapsed_time = node_execution.elapsed_time finished_at = created_at + timedelta(seconds=elapsed_time) - metadata = json.loads(node_execution.execution_metadata) if node_execution.execution_metadata else {} + execution_metadata = node_execution.metadata if node_execution.metadata else {} + metadata = execution_metadata.copy() metadata.update( { "workflow_run_id": trace_info.workflow_run_id, @@ -152,7 +168,7 @@ class LangFuseDataTrace(BaseTraceInstance): "status": status, } ) - process_data = json.loads(node_execution.process_data) if node_execution.process_data else {} + process_data = node_execution.process_data if node_execution.process_data else {} model_provider = process_data.get("model_provider", None) model_name = process_data.get("model_name", None) if model_provider is not None and model_name is not None: diff --git a/api/core/ops/langsmith_trace/langsmith_trace.py b/api/core/ops/langsmith_trace/langsmith_trace.py index d1e16d3152..28b18932ea 100644 --- a/api/core/ops/langsmith_trace/langsmith_trace.py +++ b/api/core/ops/langsmith_trace/langsmith_trace.py @@ -1,4 +1,3 @@ -import json import logging import os import uuid @@ -7,7 +6,7 @@ from typing import Optional, cast from langsmith import Client from langsmith.schemas import RunBase -from sqlalchemy.orm import sessionmaker +from sqlalchemy.orm import Session, sessionmaker from core.ops.base_trace_instance import BaseTraceInstance from core.ops.entities.config_entity import LangSmithConfig @@ -30,7 +29,7 @@ from core.ops.langsmith_trace.entities.langsmith_trace_entity import ( from core.ops.utils import filter_none_values, generate_dotted_order from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from extensions.ext_database import db -from models.model import EndUser, MessageFile +from models import Account, App, EndUser, MessageFile logger = logging.getLogger(__name__) @@ -137,8 +136,26 @@ class LangSmithDataTrace(BaseTraceInstance): # through workflow_run_id get all_nodes_execution using repository session_factory = sessionmaker(bind=db.engine) + # Find the app's creator account + with Session(db.engine, expire_on_commit=False) as session: + # Get the app to find its creator + app_id = trace_info.metadata.get("app_id") + if not app_id: + raise ValueError("No app_id found in trace_info metadata") + + app = session.query(App).filter(App.id == app_id).first() + if not app: + raise ValueError(f"App with id {app_id} not found") + + if not app.created_by: + raise ValueError(f"App with id {app_id} has no creator (created_by is None)") + + service_account = session.query(Account).filter(Account.id == app.created_by).first() + if not service_account: + raise ValueError(f"Creator account with id {app.created_by} not found for app {app_id}") + workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository( - session_factory=session_factory, tenant_id=trace_info.tenant_id, app_id=trace_info.metadata.get("app_id") + session_factory=session_factory, user=service_account, app_id=trace_info.metadata.get("app_id") ) # Get all executions for this workflow run @@ -148,25 +165,21 @@ class LangSmithDataTrace(BaseTraceInstance): for node_execution in workflow_node_executions: node_execution_id = node_execution.id - tenant_id = node_execution.tenant_id - app_id = node_execution.app_id + tenant_id = trace_info.tenant_id # Use from trace_info instead + app_id = trace_info.metadata.get("app_id") # Use from trace_info instead node_name = node_execution.title node_type = node_execution.node_type status = node_execution.status if node_type == "llm": - inputs = ( - json.loads(node_execution.process_data).get("prompts", {}) if node_execution.process_data else {} - ) + inputs = node_execution.process_data.get("prompts", {}) if node_execution.process_data else {} else: - inputs = json.loads(node_execution.inputs) if node_execution.inputs else {} - outputs = json.loads(node_execution.outputs) if node_execution.outputs else {} + inputs = node_execution.inputs if node_execution.inputs else {} + outputs = node_execution.outputs if node_execution.outputs else {} created_at = node_execution.created_at or datetime.now() elapsed_time = node_execution.elapsed_time finished_at = created_at + timedelta(seconds=elapsed_time) - execution_metadata = ( - json.loads(node_execution.execution_metadata) if node_execution.execution_metadata else {} - ) + execution_metadata = node_execution.metadata if node_execution.metadata else {} node_total_tokens = execution_metadata.get("total_tokens", 0) metadata = execution_metadata.copy() metadata.update( @@ -181,7 +194,7 @@ class LangSmithDataTrace(BaseTraceInstance): } ) - process_data = json.loads(node_execution.process_data) if node_execution.process_data else {} + process_data = node_execution.process_data if node_execution.process_data else {} if process_data and process_data.get("model_mode") == "chat": run_type = LangSmithRunType.llm diff --git a/api/core/ops/opik_trace/opik_trace.py b/api/core/ops/opik_trace/opik_trace.py index 1484041447..5d543b914c 100644 --- a/api/core/ops/opik_trace/opik_trace.py +++ b/api/core/ops/opik_trace/opik_trace.py @@ -1,4 +1,3 @@ -import json import logging import os import uuid @@ -7,7 +6,7 @@ from typing import Optional, cast from opik import Opik, Trace from opik.id_helpers import uuid4_to_uuid7 -from sqlalchemy.orm import sessionmaker +from sqlalchemy.orm import Session, sessionmaker from core.ops.base_trace_instance import BaseTraceInstance from core.ops.entities.config_entity import OpikConfig @@ -24,7 +23,7 @@ from core.ops.entities.trace_entity import ( ) from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from extensions.ext_database import db -from models.model import EndUser, MessageFile +from models import Account, App, EndUser, MessageFile logger = logging.getLogger(__name__) @@ -150,8 +149,26 @@ class OpikDataTrace(BaseTraceInstance): # through workflow_run_id get all_nodes_execution using repository session_factory = sessionmaker(bind=db.engine) + # Find the app's creator account + with Session(db.engine, expire_on_commit=False) as session: + # Get the app to find its creator + app_id = trace_info.metadata.get("app_id") + if not app_id: + raise ValueError("No app_id found in trace_info metadata") + + app = session.query(App).filter(App.id == app_id).first() + if not app: + raise ValueError(f"App with id {app_id} not found") + + if not app.created_by: + raise ValueError(f"App with id {app_id} has no creator (created_by is None)") + + service_account = session.query(Account).filter(Account.id == app.created_by).first() + if not service_account: + raise ValueError(f"Creator account with id {app.created_by} not found for app {app_id}") + workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository( - session_factory=session_factory, tenant_id=trace_info.tenant_id, app_id=trace_info.metadata.get("app_id") + session_factory=session_factory, user=service_account, app_id=trace_info.metadata.get("app_id") ) # Get all executions for this workflow run @@ -161,25 +178,21 @@ class OpikDataTrace(BaseTraceInstance): for node_execution in workflow_node_executions: node_execution_id = node_execution.id - tenant_id = node_execution.tenant_id - app_id = node_execution.app_id + tenant_id = trace_info.tenant_id # Use from trace_info instead + app_id = trace_info.metadata.get("app_id") # Use from trace_info instead node_name = node_execution.title node_type = node_execution.node_type status = node_execution.status if node_type == "llm": - inputs = ( - json.loads(node_execution.process_data).get("prompts", {}) if node_execution.process_data else {} - ) + inputs = node_execution.process_data.get("prompts", {}) if node_execution.process_data else {} else: - inputs = json.loads(node_execution.inputs) if node_execution.inputs else {} - outputs = json.loads(node_execution.outputs) if node_execution.outputs else {} + inputs = node_execution.inputs if node_execution.inputs else {} + outputs = node_execution.outputs if node_execution.outputs else {} created_at = node_execution.created_at or datetime.now() elapsed_time = node_execution.elapsed_time finished_at = created_at + timedelta(seconds=elapsed_time) - execution_metadata = ( - json.loads(node_execution.execution_metadata) if node_execution.execution_metadata else {} - ) + execution_metadata = node_execution.metadata if node_execution.metadata else {} metadata = execution_metadata.copy() metadata.update( { @@ -193,7 +206,7 @@ class OpikDataTrace(BaseTraceInstance): } ) - process_data = json.loads(node_execution.process_data) if node_execution.process_data else {} + process_data = node_execution.process_data if node_execution.process_data else {} provider = None model = None diff --git a/api/core/ops/weave_trace/weave_trace.py b/api/core/ops/weave_trace/weave_trace.py index 49594cb0f1..c01b25bdc1 100644 --- a/api/core/ops/weave_trace/weave_trace.py +++ b/api/core/ops/weave_trace/weave_trace.py @@ -1,4 +1,3 @@ -import json import logging import os import uuid @@ -7,6 +6,7 @@ from typing import Any, Optional, cast import wandb import weave +from sqlalchemy.orm import Session, sessionmaker from core.ops.base_trace_instance import BaseTraceInstance from core.ops.entities.config_entity import WeaveConfig @@ -22,9 +22,9 @@ from core.ops.entities.trace_entity import ( WorkflowTraceInfo, ) from core.ops.weave_trace.entities.weave_trace_entity import WeaveTraceModel +from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from extensions.ext_database import db -from models.model import EndUser, MessageFile -from models.workflow import WorkflowNodeExecution +from models import Account, App, EndUser, MessageFile logger = logging.getLogger(__name__) @@ -128,56 +128,52 @@ class WeaveDataTrace(BaseTraceInstance): self.start_call(workflow_run, parent_run_id=trace_info.message_id) - # through workflow_run_id get all_nodes_execution - workflow_nodes_execution_id_records = ( - db.session.query(WorkflowNodeExecution.id) - .filter(WorkflowNodeExecution.workflow_run_id == trace_info.workflow_run_id) - .all() - ) + # through workflow_run_id get all_nodes_execution using repository + session_factory = sessionmaker(bind=db.engine) + # Find the app's creator account + with Session(db.engine, expire_on_commit=False) as session: + # Get the app to find its creator + app_id = trace_info.metadata.get("app_id") + if not app_id: + raise ValueError("No app_id found in trace_info metadata") - for node_execution_id_record in workflow_nodes_execution_id_records: - node_execution = ( - db.session.query( - WorkflowNodeExecution.id, - WorkflowNodeExecution.tenant_id, - WorkflowNodeExecution.app_id, - WorkflowNodeExecution.title, - WorkflowNodeExecution.node_type, - WorkflowNodeExecution.status, - WorkflowNodeExecution.inputs, - WorkflowNodeExecution.outputs, - WorkflowNodeExecution.created_at, - WorkflowNodeExecution.elapsed_time, - WorkflowNodeExecution.process_data, - WorkflowNodeExecution.execution_metadata, - ) - .filter(WorkflowNodeExecution.id == node_execution_id_record.id) - .first() - ) + app = session.query(App).filter(App.id == app_id).first() + if not app: + raise ValueError(f"App with id {app_id} not found") + + if not app.created_by: + raise ValueError(f"App with id {app_id} has no creator (created_by is None)") + + service_account = session.query(Account).filter(Account.id == app.created_by).first() + if not service_account: + raise ValueError(f"Creator account with id {app.created_by} not found for app {app_id}") - if not node_execution: - continue + workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository( + session_factory=session_factory, user=service_account, app_id=trace_info.metadata.get("app_id") + ) + + # Get all executions for this workflow run + workflow_node_executions = workflow_node_execution_repository.get_by_workflow_run( + workflow_run_id=trace_info.workflow_run_id + ) + for node_execution in workflow_node_executions: node_execution_id = node_execution.id - tenant_id = node_execution.tenant_id - app_id = node_execution.app_id + tenant_id = trace_info.tenant_id # Use from trace_info instead + app_id = trace_info.metadata.get("app_id") # Use from trace_info instead node_name = node_execution.title node_type = node_execution.node_type status = node_execution.status if node_type == "llm": - inputs = ( - json.loads(node_execution.process_data).get("prompts", {}) if node_execution.process_data else {} - ) + inputs = node_execution.process_data.get("prompts", {}) if node_execution.process_data else {} else: - inputs = json.loads(node_execution.inputs) if node_execution.inputs else {} - outputs = json.loads(node_execution.outputs) if node_execution.outputs else {} + inputs = node_execution.inputs if node_execution.inputs else {} + outputs = node_execution.outputs if node_execution.outputs else {} created_at = node_execution.created_at or datetime.now() elapsed_time = node_execution.elapsed_time finished_at = created_at + timedelta(seconds=elapsed_time) - execution_metadata = ( - json.loads(node_execution.execution_metadata) if node_execution.execution_metadata else {} - ) + execution_metadata = node_execution.metadata if node_execution.metadata else {} node_total_tokens = execution_metadata.get("total_tokens", 0) attributes = execution_metadata.copy() attributes.update( @@ -192,7 +188,7 @@ class WeaveDataTrace(BaseTraceInstance): } ) - process_data = json.loads(node_execution.process_data) if node_execution.process_data else {} + process_data = node_execution.process_data if node_execution.process_data else {} if process_data and process_data.get("model_mode") == "chat": attributes.update( {