Update tracing systems to use domain model

pull/19430/head
-LAN- 1 year ago
parent e072be6778
commit 809d221a39
No known key found for this signature in database
GPG Key ID: 6BA0D108DED011FF

@ -1,11 +1,10 @@
import json
import logging import logging
import os import os
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Optional from typing import Optional
from langfuse import Langfuse # type: ignore from langfuse import Langfuse # type: ignore
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import Session, sessionmaker
from core.ops.base_trace_instance import BaseTraceInstance from core.ops.base_trace_instance import BaseTraceInstance
from core.ops.entities.config_entity import LangfuseConfig from core.ops.entities.config_entity import LangfuseConfig
@ -31,7 +30,7 @@ from core.ops.langfuse_trace.entities.langfuse_trace_entity import (
from core.ops.utils import filter_none_values from core.ops.utils import filter_none_values
from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from extensions.ext_database import db from extensions.ext_database import db
from models.model import EndUser from models import Account, App, EndUser
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -113,8 +112,26 @@ class LangFuseDataTrace(BaseTraceInstance):
# through workflow_run_id get all_nodes_execution using repository # through workflow_run_id get all_nodes_execution using repository
session_factory = sessionmaker(bind=db.engine) session_factory = sessionmaker(bind=db.engine)
# Find the app's creator account
with Session(db.engine, expire_on_commit=False) as session:
# Get the app to find its creator
app_id = trace_info.metadata.get("app_id")
if not app_id:
raise ValueError("No app_id found in trace_info metadata")
app = session.query(App).filter(App.id == app_id).first()
if not app:
raise ValueError(f"App with id {app_id} not found")
if not app.created_by:
raise ValueError(f"App with id {app_id} has no creator (created_by is None)")
service_account = session.query(Account).filter(Account.id == app.created_by).first()
if not service_account:
raise ValueError(f"Creator account with id {app.created_by} not found for app {app_id}")
workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository( workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
session_factory=session_factory, tenant_id=trace_info.tenant_id session_factory=session_factory, user=service_account, app_id=trace_info.metadata.get("app_id")
) )
# Get all executions for this workflow run # Get all executions for this workflow run
@ -124,23 +141,22 @@ class LangFuseDataTrace(BaseTraceInstance):
for node_execution in workflow_node_executions: for node_execution in workflow_node_executions:
node_execution_id = node_execution.id node_execution_id = node_execution.id
tenant_id = node_execution.tenant_id tenant_id = trace_info.tenant_id # Use from trace_info instead
app_id = node_execution.app_id app_id = trace_info.metadata.get("app_id") # Use from trace_info instead
node_name = node_execution.title node_name = node_execution.title
node_type = node_execution.node_type node_type = node_execution.node_type
status = node_execution.status status = node_execution.status
if node_type == "llm": if node_type == "llm":
inputs = ( inputs = node_execution.process_data.get("prompts", {}) if node_execution.process_data else {}
json.loads(node_execution.process_data).get("prompts", {}) if node_execution.process_data else {}
)
else: else:
inputs = json.loads(node_execution.inputs) if node_execution.inputs else {} inputs = node_execution.inputs if node_execution.inputs else {}
outputs = json.loads(node_execution.outputs) if node_execution.outputs else {} outputs = node_execution.outputs if node_execution.outputs else {}
created_at = node_execution.created_at or datetime.now() created_at = node_execution.created_at or datetime.now()
elapsed_time = node_execution.elapsed_time elapsed_time = node_execution.elapsed_time
finished_at = created_at + timedelta(seconds=elapsed_time) finished_at = created_at + timedelta(seconds=elapsed_time)
metadata = json.loads(node_execution.execution_metadata) if node_execution.execution_metadata else {} execution_metadata = node_execution.metadata if node_execution.metadata else {}
metadata = execution_metadata.copy()
metadata.update( metadata.update(
{ {
"workflow_run_id": trace_info.workflow_run_id, "workflow_run_id": trace_info.workflow_run_id,
@ -152,7 +168,7 @@ class LangFuseDataTrace(BaseTraceInstance):
"status": status, "status": status,
} }
) )
process_data = json.loads(node_execution.process_data) if node_execution.process_data else {} process_data = node_execution.process_data if node_execution.process_data else {}
model_provider = process_data.get("model_provider", None) model_provider = process_data.get("model_provider", None)
model_name = process_data.get("model_name", None) model_name = process_data.get("model_name", None)
if model_provider is not None and model_name is not None: if model_provider is not None and model_name is not None:

@ -1,4 +1,3 @@
import json
import logging import logging
import os import os
import uuid import uuid
@ -7,7 +6,7 @@ from typing import Optional, cast
from langsmith import Client from langsmith import Client
from langsmith.schemas import RunBase from langsmith.schemas import RunBase
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import Session, sessionmaker
from core.ops.base_trace_instance import BaseTraceInstance from core.ops.base_trace_instance import BaseTraceInstance
from core.ops.entities.config_entity import LangSmithConfig from core.ops.entities.config_entity import LangSmithConfig
@ -30,7 +29,7 @@ from core.ops.langsmith_trace.entities.langsmith_trace_entity import (
from core.ops.utils import filter_none_values, generate_dotted_order from core.ops.utils import filter_none_values, generate_dotted_order
from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from extensions.ext_database import db from extensions.ext_database import db
from models.model import EndUser, MessageFile from models import Account, App, EndUser, MessageFile
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -137,8 +136,26 @@ class LangSmithDataTrace(BaseTraceInstance):
# through workflow_run_id get all_nodes_execution using repository # through workflow_run_id get all_nodes_execution using repository
session_factory = sessionmaker(bind=db.engine) session_factory = sessionmaker(bind=db.engine)
# Find the app's creator account
with Session(db.engine, expire_on_commit=False) as session:
# Get the app to find its creator
app_id = trace_info.metadata.get("app_id")
if not app_id:
raise ValueError("No app_id found in trace_info metadata")
app = session.query(App).filter(App.id == app_id).first()
if not app:
raise ValueError(f"App with id {app_id} not found")
if not app.created_by:
raise ValueError(f"App with id {app_id} has no creator (created_by is None)")
service_account = session.query(Account).filter(Account.id == app.created_by).first()
if not service_account:
raise ValueError(f"Creator account with id {app.created_by} not found for app {app_id}")
workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository( workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
session_factory=session_factory, tenant_id=trace_info.tenant_id, app_id=trace_info.metadata.get("app_id") session_factory=session_factory, user=service_account, app_id=trace_info.metadata.get("app_id")
) )
# Get all executions for this workflow run # Get all executions for this workflow run
@ -148,25 +165,21 @@ class LangSmithDataTrace(BaseTraceInstance):
for node_execution in workflow_node_executions: for node_execution in workflow_node_executions:
node_execution_id = node_execution.id node_execution_id = node_execution.id
tenant_id = node_execution.tenant_id tenant_id = trace_info.tenant_id # Use from trace_info instead
app_id = node_execution.app_id app_id = trace_info.metadata.get("app_id") # Use from trace_info instead
node_name = node_execution.title node_name = node_execution.title
node_type = node_execution.node_type node_type = node_execution.node_type
status = node_execution.status status = node_execution.status
if node_type == "llm": if node_type == "llm":
inputs = ( inputs = node_execution.process_data.get("prompts", {}) if node_execution.process_data else {}
json.loads(node_execution.process_data).get("prompts", {}) if node_execution.process_data else {}
)
else: else:
inputs = json.loads(node_execution.inputs) if node_execution.inputs else {} inputs = node_execution.inputs if node_execution.inputs else {}
outputs = json.loads(node_execution.outputs) if node_execution.outputs else {} outputs = node_execution.outputs if node_execution.outputs else {}
created_at = node_execution.created_at or datetime.now() created_at = node_execution.created_at or datetime.now()
elapsed_time = node_execution.elapsed_time elapsed_time = node_execution.elapsed_time
finished_at = created_at + timedelta(seconds=elapsed_time) finished_at = created_at + timedelta(seconds=elapsed_time)
execution_metadata = ( execution_metadata = node_execution.metadata if node_execution.metadata else {}
json.loads(node_execution.execution_metadata) if node_execution.execution_metadata else {}
)
node_total_tokens = execution_metadata.get("total_tokens", 0) node_total_tokens = execution_metadata.get("total_tokens", 0)
metadata = execution_metadata.copy() metadata = execution_metadata.copy()
metadata.update( metadata.update(
@ -181,7 +194,7 @@ class LangSmithDataTrace(BaseTraceInstance):
} }
) )
process_data = json.loads(node_execution.process_data) if node_execution.process_data else {} process_data = node_execution.process_data if node_execution.process_data else {}
if process_data and process_data.get("model_mode") == "chat": if process_data and process_data.get("model_mode") == "chat":
run_type = LangSmithRunType.llm run_type = LangSmithRunType.llm

@ -1,4 +1,3 @@
import json
import logging import logging
import os import os
import uuid import uuid
@ -7,7 +6,7 @@ from typing import Optional, cast
from opik import Opik, Trace from opik import Opik, Trace
from opik.id_helpers import uuid4_to_uuid7 from opik.id_helpers import uuid4_to_uuid7
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import Session, sessionmaker
from core.ops.base_trace_instance import BaseTraceInstance from core.ops.base_trace_instance import BaseTraceInstance
from core.ops.entities.config_entity import OpikConfig from core.ops.entities.config_entity import OpikConfig
@ -24,7 +23,7 @@ from core.ops.entities.trace_entity import (
) )
from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from extensions.ext_database import db from extensions.ext_database import db
from models.model import EndUser, MessageFile from models import Account, App, EndUser, MessageFile
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -150,8 +149,26 @@ class OpikDataTrace(BaseTraceInstance):
# through workflow_run_id get all_nodes_execution using repository # through workflow_run_id get all_nodes_execution using repository
session_factory = sessionmaker(bind=db.engine) session_factory = sessionmaker(bind=db.engine)
# Find the app's creator account
with Session(db.engine, expire_on_commit=False) as session:
# Get the app to find its creator
app_id = trace_info.metadata.get("app_id")
if not app_id:
raise ValueError("No app_id found in trace_info metadata")
app = session.query(App).filter(App.id == app_id).first()
if not app:
raise ValueError(f"App with id {app_id} not found")
if not app.created_by:
raise ValueError(f"App with id {app_id} has no creator (created_by is None)")
service_account = session.query(Account).filter(Account.id == app.created_by).first()
if not service_account:
raise ValueError(f"Creator account with id {app.created_by} not found for app {app_id}")
workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository( workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
session_factory=session_factory, tenant_id=trace_info.tenant_id, app_id=trace_info.metadata.get("app_id") session_factory=session_factory, user=service_account, app_id=trace_info.metadata.get("app_id")
) )
# Get all executions for this workflow run # Get all executions for this workflow run
@ -161,25 +178,21 @@ class OpikDataTrace(BaseTraceInstance):
for node_execution in workflow_node_executions: for node_execution in workflow_node_executions:
node_execution_id = node_execution.id node_execution_id = node_execution.id
tenant_id = node_execution.tenant_id tenant_id = trace_info.tenant_id # Use from trace_info instead
app_id = node_execution.app_id app_id = trace_info.metadata.get("app_id") # Use from trace_info instead
node_name = node_execution.title node_name = node_execution.title
node_type = node_execution.node_type node_type = node_execution.node_type
status = node_execution.status status = node_execution.status
if node_type == "llm": if node_type == "llm":
inputs = ( inputs = node_execution.process_data.get("prompts", {}) if node_execution.process_data else {}
json.loads(node_execution.process_data).get("prompts", {}) if node_execution.process_data else {}
)
else: else:
inputs = json.loads(node_execution.inputs) if node_execution.inputs else {} inputs = node_execution.inputs if node_execution.inputs else {}
outputs = json.loads(node_execution.outputs) if node_execution.outputs else {} outputs = node_execution.outputs if node_execution.outputs else {}
created_at = node_execution.created_at or datetime.now() created_at = node_execution.created_at or datetime.now()
elapsed_time = node_execution.elapsed_time elapsed_time = node_execution.elapsed_time
finished_at = created_at + timedelta(seconds=elapsed_time) finished_at = created_at + timedelta(seconds=elapsed_time)
execution_metadata = ( execution_metadata = node_execution.metadata if node_execution.metadata else {}
json.loads(node_execution.execution_metadata) if node_execution.execution_metadata else {}
)
metadata = execution_metadata.copy() metadata = execution_metadata.copy()
metadata.update( metadata.update(
{ {
@ -193,7 +206,7 @@ class OpikDataTrace(BaseTraceInstance):
} }
) )
process_data = json.loads(node_execution.process_data) if node_execution.process_data else {} process_data = node_execution.process_data if node_execution.process_data else {}
provider = None provider = None
model = None model = None

@ -1,4 +1,3 @@
import json
import logging import logging
import os import os
import uuid import uuid
@ -7,6 +6,7 @@ from typing import Any, Optional, cast
import wandb import wandb
import weave import weave
from sqlalchemy.orm import Session, sessionmaker
from core.ops.base_trace_instance import BaseTraceInstance from core.ops.base_trace_instance import BaseTraceInstance
from core.ops.entities.config_entity import WeaveConfig from core.ops.entities.config_entity import WeaveConfig
@ -22,9 +22,9 @@ from core.ops.entities.trace_entity import (
WorkflowTraceInfo, WorkflowTraceInfo,
) )
from core.ops.weave_trace.entities.weave_trace_entity import WeaveTraceModel from core.ops.weave_trace.entities.weave_trace_entity import WeaveTraceModel
from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from extensions.ext_database import db from extensions.ext_database import db
from models.model import EndUser, MessageFile from models import Account, App, EndUser, MessageFile
from models.workflow import WorkflowNodeExecution
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -128,56 +128,52 @@ class WeaveDataTrace(BaseTraceInstance):
self.start_call(workflow_run, parent_run_id=trace_info.message_id) self.start_call(workflow_run, parent_run_id=trace_info.message_id)
# through workflow_run_id get all_nodes_execution # through workflow_run_id get all_nodes_execution using repository
workflow_nodes_execution_id_records = ( session_factory = sessionmaker(bind=db.engine)
db.session.query(WorkflowNodeExecution.id) # Find the app's creator account
.filter(WorkflowNodeExecution.workflow_run_id == trace_info.workflow_run_id) with Session(db.engine, expire_on_commit=False) as session:
.all() # Get the app to find its creator
) app_id = trace_info.metadata.get("app_id")
if not app_id:
raise ValueError("No app_id found in trace_info metadata")
for node_execution_id_record in workflow_nodes_execution_id_records: app = session.query(App).filter(App.id == app_id).first()
node_execution = ( if not app:
db.session.query( raise ValueError(f"App with id {app_id} not found")
WorkflowNodeExecution.id,
WorkflowNodeExecution.tenant_id, if not app.created_by:
WorkflowNodeExecution.app_id, raise ValueError(f"App with id {app_id} has no creator (created_by is None)")
WorkflowNodeExecution.title,
WorkflowNodeExecution.node_type, service_account = session.query(Account).filter(Account.id == app.created_by).first()
WorkflowNodeExecution.status, if not service_account:
WorkflowNodeExecution.inputs, raise ValueError(f"Creator account with id {app.created_by} not found for app {app_id}")
WorkflowNodeExecution.outputs,
WorkflowNodeExecution.created_at,
WorkflowNodeExecution.elapsed_time,
WorkflowNodeExecution.process_data,
WorkflowNodeExecution.execution_metadata,
)
.filter(WorkflowNodeExecution.id == node_execution_id_record.id)
.first()
)
if not node_execution: workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
continue session_factory=session_factory, user=service_account, app_id=trace_info.metadata.get("app_id")
)
# Get all executions for this workflow run
workflow_node_executions = workflow_node_execution_repository.get_by_workflow_run(
workflow_run_id=trace_info.workflow_run_id
)
for node_execution in workflow_node_executions:
node_execution_id = node_execution.id node_execution_id = node_execution.id
tenant_id = node_execution.tenant_id tenant_id = trace_info.tenant_id # Use from trace_info instead
app_id = node_execution.app_id app_id = trace_info.metadata.get("app_id") # Use from trace_info instead
node_name = node_execution.title node_name = node_execution.title
node_type = node_execution.node_type node_type = node_execution.node_type
status = node_execution.status status = node_execution.status
if node_type == "llm": if node_type == "llm":
inputs = ( inputs = node_execution.process_data.get("prompts", {}) if node_execution.process_data else {}
json.loads(node_execution.process_data).get("prompts", {}) if node_execution.process_data else {}
)
else: else:
inputs = json.loads(node_execution.inputs) if node_execution.inputs else {} inputs = node_execution.inputs if node_execution.inputs else {}
outputs = json.loads(node_execution.outputs) if node_execution.outputs else {} outputs = node_execution.outputs if node_execution.outputs else {}
created_at = node_execution.created_at or datetime.now() created_at = node_execution.created_at or datetime.now()
elapsed_time = node_execution.elapsed_time elapsed_time = node_execution.elapsed_time
finished_at = created_at + timedelta(seconds=elapsed_time) finished_at = created_at + timedelta(seconds=elapsed_time)
execution_metadata = ( execution_metadata = node_execution.metadata if node_execution.metadata else {}
json.loads(node_execution.execution_metadata) if node_execution.execution_metadata else {}
)
node_total_tokens = execution_metadata.get("total_tokens", 0) node_total_tokens = execution_metadata.get("total_tokens", 0)
attributes = execution_metadata.copy() attributes = execution_metadata.copy()
attributes.update( attributes.update(
@ -192,7 +188,7 @@ class WeaveDataTrace(BaseTraceInstance):
} }
) )
process_data = json.loads(node_execution.process_data) if node_execution.process_data else {} process_data = node_execution.process_data if node_execution.process_data else {}
if process_data and process_data.get("model_mode") == "chat": if process_data and process_data.get("model_mode") == "chat":
attributes.update( attributes.update(
{ {

Loading…
Cancel
Save