From 1c6551a1e773fadc32ac0f963285d4fb4c33cb92 Mon Sep 17 00:00:00 2001 From: -LAN- Date: Fri, 25 Apr 2025 22:27:48 +0800 Subject: [PATCH] refactor: Enhances workflow execution context handling Adds workflow and execution context parameters to improve repository filtering and record management. Introduces new parameters such as 'workflow_id', 'triggered_from', and creator details for better tracking of workflow executions. Updates relevant services and generators to pass these parameters. Facilitates debugging and improves execution context management across multiple components. Signed-off-by: -LAN- --- .../app/apps/advanced_chat/app_generator.py | 10 +- api/core/app/apps/workflow/app_generator.py | 7 + api/core/ops/langfuse_trace/langfuse_trace.py | 11 +- .../ops/langsmith_trace/langsmith_trace.py | 3 + api/core/ops/opik_trace/opik_trace.py | 3 + api/repositories/repository_registry.py | 17 +- .../sqlalchemy_repository.py | 162 +++++++++++++++--- api/services/workflow_run_service.py | 2 + api/services/workflow_service.py | 6 +- 9 files changed, 195 insertions(+), 26 deletions(-) diff --git a/api/core/app/apps/advanced_chat/app_generator.py b/api/core/app/apps/advanced_chat/app_generator.py index 6079b51daa..f3cff51549 100644 --- a/api/core/app/apps/advanced_chat/app_generator.py +++ b/api/core/app/apps/advanced_chat/app_generator.py @@ -31,7 +31,7 @@ from extensions.ext_database import db from factories import file_factory from models.account import Account from models.model import App, Conversation, EndUser, Message -from models.workflow import Workflow +from models.workflow import Workflow, WorkflowNodeExecutionTriggeredFrom from services.conversation_service import ConversationService from services.errors.message import MessageNotExistsError @@ -167,6 +167,9 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): params={ "tenant_id": application_generate_entity.app_config.tenant_id, "app_id": application_generate_entity.app_config.app_id, + "workflow_id": workflow.id, + # Default for advanced chat app + "triggered_from": WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN, "session_factory": session_factory, } ) @@ -235,6 +238,9 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): params={ "tenant_id": application_generate_entity.app_config.tenant_id, "app_id": application_generate_entity.app_config.app_id, + "workflow_id": workflow.id, + # For debugging + "triggered_from": WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP, "session_factory": session_factory, } ) @@ -301,6 +307,8 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): params={ "tenant_id": application_generate_entity.app_config.tenant_id, "app_id": application_generate_entity.app_config.app_id, + "workflow_id": workflow.id, + "triggered_from": WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP, # For debugging "session_factory": session_factory, } ) diff --git a/api/core/app/apps/workflow/app_generator.py b/api/core/app/apps/workflow/app_generator.py index 6be3a7331d..21f9fa96e2 100644 --- a/api/core/app/apps/workflow/app_generator.py +++ b/api/core/app/apps/workflow/app_generator.py @@ -28,6 +28,7 @@ from core.repository.workflow_node_execution_repository import WorkflowNodeExecu from extensions.ext_database import db from factories import file_factory from models import Account, App, EndUser, Workflow +from models.workflow import WorkflowNodeExecutionTriggeredFrom logger = logging.getLogger(__name__) @@ -142,6 +143,8 @@ class WorkflowAppGenerator(BaseAppGenerator): params={ "tenant_id": application_generate_entity.app_config.tenant_id, "app_id": application_generate_entity.app_config.app_id, + "workflow_id": workflow.id, + "triggered_from": WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN, # Default for workflow app "session_factory": session_factory, } ) @@ -268,6 +271,8 @@ class WorkflowAppGenerator(BaseAppGenerator): params={ "tenant_id": application_generate_entity.app_config.tenant_id, "app_id": application_generate_entity.app_config.app_id, + "workflow_id": workflow.id, + "triggered_from": WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP, # For debugging "session_factory": session_factory, } ) @@ -333,6 +338,8 @@ class WorkflowAppGenerator(BaseAppGenerator): params={ "tenant_id": application_generate_entity.app_config.tenant_id, "app_id": application_generate_entity.app_config.app_id, + "workflow_id": workflow.id, + "triggered_from": WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP, # For debugging "session_factory": session_factory, } ) diff --git a/api/core/ops/langfuse_trace/langfuse_trace.py b/api/core/ops/langfuse_trace/langfuse_trace.py index fa78b7b8e9..7f8676bd11 100644 --- a/api/core/ops/langfuse_trace/langfuse_trace.py +++ b/api/core/ops/langfuse_trace/langfuse_trace.py @@ -32,6 +32,7 @@ from core.ops.utils import filter_none_values from core.repository.repository_factory import RepositoryFactory from extensions.ext_database import db from models.model import EndUser +from models.workflow import WorkflowNodeExecutionTriggeredFrom logger = logging.getLogger(__name__) @@ -114,7 +115,13 @@ class LangFuseDataTrace(BaseTraceInstance): # through workflow_run_id get all_nodes_execution using repository session_factory = sessionmaker(bind=db.engine) workflow_node_execution_repository = RepositoryFactory.create_workflow_node_execution_repository( - params={"tenant_id": trace_info.tenant_id, "session_factory": session_factory}, + params={ + "tenant_id": trace_info.tenant_id, + "app_id": trace_info.metadata.get("app_id"), + "workflow_id": trace_info.workflow_data.id, + "triggered_from": WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN, + "session_factory": session_factory, + }, ) # Get all executions for this workflow run @@ -233,7 +240,7 @@ class LangFuseDataTrace(BaseTraceInstance): self.add_generation(langfuse_generation_data=node_generation_data) - def message_trace(self, trace_info: MessageTraceInfo, **kwargs): + def message_trace(self, trace_info: MessageTraceInfo): # get message file data file_list = trace_info.file_list metadata = trace_info.metadata diff --git a/api/core/ops/langsmith_trace/langsmith_trace.py b/api/core/ops/langsmith_trace/langsmith_trace.py index 85a0eafdc1..68cdf62db7 100644 --- a/api/core/ops/langsmith_trace/langsmith_trace.py +++ b/api/core/ops/langsmith_trace/langsmith_trace.py @@ -31,6 +31,7 @@ from core.ops.utils import filter_none_values, generate_dotted_order from core.repository.repository_factory import RepositoryFactory from extensions.ext_database import db from models.model import EndUser, MessageFile +from models.workflow import WorkflowNodeExecutionTriggeredFrom logger = logging.getLogger(__name__) @@ -141,6 +142,8 @@ class LangSmithDataTrace(BaseTraceInstance): params={ "tenant_id": trace_info.tenant_id, "app_id": trace_info.metadata.get("app_id"), + "workflow_id": trace_info.workflow_data.id, + "triggered_from": WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN, "session_factory": session_factory, }, ) diff --git a/api/core/ops/opik_trace/opik_trace.py b/api/core/ops/opik_trace/opik_trace.py index 923b9a24ed..783a2ac7cc 100644 --- a/api/core/ops/opik_trace/opik_trace.py +++ b/api/core/ops/opik_trace/opik_trace.py @@ -25,6 +25,7 @@ from core.ops.entities.trace_entity import ( from core.repository.repository_factory import RepositoryFactory from extensions.ext_database import db from models.model import EndUser, MessageFile +from models.workflow import WorkflowNodeExecutionTriggeredFrom logger = logging.getLogger(__name__) @@ -154,6 +155,8 @@ class OpikDataTrace(BaseTraceInstance): params={ "tenant_id": trace_info.tenant_id, "app_id": trace_info.metadata.get("app_id"), + "workflow_id": trace_info.workflow_data.id, + "triggered_from": WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN, "session_factory": session_factory, }, ) diff --git a/api/repositories/repository_registry.py b/api/repositories/repository_registry.py index aa0a208d8e..d857d38ff9 100644 --- a/api/repositories/repository_registry.py +++ b/api/repositories/repository_registry.py @@ -58,6 +58,11 @@ def create_workflow_node_execution_repository(params: Mapping[str, Any]) -> SQLA params: Parameters for creating the repository, including: - tenant_id: Required. The tenant ID for multi-tenancy. - app_id: Optional. The application ID for filtering. + - workflow_id: Optional. The workflow ID for filtering. + - triggered_from: Optional. The triggered_from value for filtering + (WorkflowNodeExecutionTriggeredFrom enum). + - created_by_role: Optional. The creator role for new executions. + - created_by: Optional. The creator ID for new executions. - session_factory: Optional. A SQLAlchemy sessionmaker instance. If not provided, a new sessionmaker will be created using the global database engine. @@ -74,6 +79,10 @@ def create_workflow_node_execution_repository(params: Mapping[str, Any]) -> SQLA # Extract optional parameters app_id = params.get("app_id") + workflow_id = params.get("workflow_id") + triggered_from = params.get("triggered_from") + created_by_role = params.get("created_by_role") + created_by = params.get("created_by") # Use the session_factory from params if provided, otherwise create one using the global db engine session_factory = params.get("session_factory") @@ -83,5 +92,11 @@ def create_workflow_node_execution_repository(params: Mapping[str, Any]) -> SQLA # Create and return the repository return SQLAlchemyWorkflowNodeExecutionRepository( - session_factory=session_factory, tenant_id=tenant_id, app_id=app_id + session_factory=session_factory, + tenant_id=tenant_id, + app_id=app_id, + workflow_id=workflow_id, + triggered_from=triggered_from, + created_by_role=created_by_role, + created_by=created_by, ) diff --git a/api/repositories/workflow_node_execution/sqlalchemy_repository.py b/api/repositories/workflow_node_execution/sqlalchemy_repository.py index 4bd719613b..a5f60ab09a 100644 --- a/api/repositories/workflow_node_execution/sqlalchemy_repository.py +++ b/api/repositories/workflow_node_execution/sqlalchemy_repository.py @@ -26,14 +26,27 @@ class SQLAlchemyWorkflowNodeExecutionRepository: to the database. This prevents long-running connections in the workflow core. """ - def __init__(self, session_factory: sessionmaker | Engine, tenant_id: str, app_id: Optional[str] = None): + def __init__( + self, + session_factory: sessionmaker | Engine, + tenant_id: str, + app_id: str | None = None, + workflow_id: str | None = None, + triggered_from: WorkflowNodeExecutionTriggeredFrom | None = None, + created_by_role: str | None = None, + created_by: str | None = None, + ): """ - Initialize the repository with a SQLAlchemy sessionmaker or engine and tenant context. + Initialize the repository with a SQLAlchemy sessionmaker or engine and context parameters. Args: session_factory: SQLAlchemy sessionmaker or engine for creating sessions tenant_id: Tenant ID for multi-tenancy - app_id: Optional app ID for filtering by application + app_id: App ID for filtering by application (optional) + workflow_id: Workflow ID for filtering by workflow (optional) + triggered_from: Triggered_from value (WorkflowNodeExecutionTriggeredFrom enum) (optional) + created_by_role: Creator role (e.g., 'account', 'end_user') (optional) + created_by: Creator ID (optional) """ # If an engine is provided, create a sessionmaker from it if isinstance(session_factory, Engine): @@ -47,6 +60,10 @@ class SQLAlchemyWorkflowNodeExecutionRepository: self._tenant_id = tenant_id self._app_id = app_id + self._workflow_id = workflow_id + self._triggered_from = triggered_from + self._created_by_role = created_by_role + self._created_by = created_by def save(self, execution: WorkflowNodeExecution) -> None: """ @@ -56,14 +73,25 @@ class SQLAlchemyWorkflowNodeExecutionRepository: execution: The WorkflowNodeExecution instance to save """ with self._session_factory() as session: - # Ensure tenant_id is set - if not execution.tenant_id: - execution.tenant_id = self._tenant_id + # Always set tenant_id from the repository context + execution.tenant_id = self._tenant_id - # Set app_id if provided and not already set - if self._app_id and not execution.app_id: + # Set other fields only if they are provided in the repository + if self._app_id is not None: execution.app_id = self._app_id + if self._workflow_id is not None: + execution.workflow_id = self._workflow_id + + if self._triggered_from is not None: + execution.triggered_from = self._triggered_from + + if self._created_by_role is not None: + execution.created_by_role = self._created_by_role + + if self._created_by is not None: + execution.created_by = self._created_by + session.add(execution) session.commit() @@ -83,9 +111,16 @@ class SQLAlchemyWorkflowNodeExecutionRepository: WorkflowNodeExecution.tenant_id == self._tenant_id, ) + # Apply additional filters if provided if self._app_id: stmt = stmt.where(WorkflowNodeExecution.app_id == self._app_id) + if self._workflow_id: + stmt = stmt.where(WorkflowNodeExecution.workflow_id == self._workflow_id) + + if self._triggered_from: + stmt = stmt.where(WorkflowNodeExecution.triggered_from == self._triggered_from) + return session.scalar(stmt) def get_by_workflow_run( @@ -109,12 +144,24 @@ class SQLAlchemyWorkflowNodeExecutionRepository: stmt = select(WorkflowNodeExecution).where( WorkflowNodeExecution.workflow_run_id == workflow_run_id, WorkflowNodeExecution.tenant_id == self._tenant_id, - WorkflowNodeExecution.triggered_from == WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN, ) + # Use the triggered_from from the instance if provided, otherwise use the default + if self._triggered_from: + stmt = stmt.where(WorkflowNodeExecution.triggered_from == self._triggered_from) + else: + # Default behavior for backward compatibility + stmt = stmt.where( + WorkflowNodeExecution.triggered_from == WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN + ) + + # Apply additional filters if provided if self._app_id: stmt = stmt.where(WorkflowNodeExecution.app_id == self._app_id) + if self._workflow_id: + stmt = stmt.where(WorkflowNodeExecution.workflow_id == self._workflow_id) + # Apply ordering if provided if order_config and order_config.order_by: order_columns: list[UnaryExpression] = [] @@ -147,12 +194,24 @@ class SQLAlchemyWorkflowNodeExecutionRepository: WorkflowNodeExecution.workflow_run_id == workflow_run_id, WorkflowNodeExecution.tenant_id == self._tenant_id, WorkflowNodeExecution.status == WorkflowNodeExecutionStatus.RUNNING, - WorkflowNodeExecution.triggered_from == WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN, ) + # Use the triggered_from from the instance if provided, otherwise use the default + if self._triggered_from: + stmt = stmt.where(WorkflowNodeExecution.triggered_from == self._triggered_from) + else: + # Default behavior for backward compatibility + stmt = stmt.where( + WorkflowNodeExecution.triggered_from == WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN + ) + + # Apply additional filters if provided if self._app_id: stmt = stmt.where(WorkflowNodeExecution.app_id == self._app_id) + if self._workflow_id: + stmt = stmt.where(WorkflowNodeExecution.workflow_id == self._workflow_id) + return session.scalars(stmt).all() def update(self, execution: WorkflowNodeExecution) -> None: @@ -163,35 +222,96 @@ class SQLAlchemyWorkflowNodeExecutionRepository: execution: The WorkflowNodeExecution instance to update """ with self._session_factory() as session: - # Ensure tenant_id is set - if not execution.tenant_id: - execution.tenant_id = self._tenant_id + # Always set tenant_id from the repository context + execution.tenant_id = self._tenant_id - # Set app_id if provided and not already set - if self._app_id and not execution.app_id: + # Set other fields only if they are provided in the repository + if self._app_id is not None: execution.app_id = self._app_id + if self._workflow_id is not None: + execution.workflow_id = self._workflow_id + + if self._triggered_from is not None: + execution.triggered_from = self._triggered_from + + if self._created_by_role is not None: + execution.created_by_role = self._created_by_role + + if self._created_by is not None: + execution.created_by = self._created_by + session.merge(execution) session.commit() + def update_context( + self, + app_id: str | None = None, + workflow_id: str | None = None, + triggered_from: WorkflowNodeExecutionTriggeredFrom | None = None, + created_by_role: str | None = None, + created_by: str | None = None, + ) -> None: + """ + Update the repository's context parameters. + + This method allows updating the repository's context parameters after initialization. + Only parameters that are not None will be updated. + + Args: + app_id: New app ID for filtering + workflow_id: New workflow ID for filtering + triggered_from: New triggered_from value (WorkflowNodeExecutionTriggeredFrom enum) + created_by_role: New creator role + created_by: New creator ID + """ + if app_id is not None: + self._app_id = app_id + if workflow_id is not None: + self._workflow_id = workflow_id + if triggered_from is not None: + self._triggered_from = triggered_from + if created_by_role is not None: + self._created_by_role = created_by_role + if created_by is not None: + self._created_by = created_by + def clear(self) -> None: """ - Clear all WorkflowNodeExecution records for the current tenant_id and app_id. + Clear all WorkflowNodeExecution records for the current tenant_id and other filters. This method deletes all WorkflowNodeExecution records that match the tenant_id - and app_id (if provided) associated with this repository instance. + and other filters (app_id, workflow_id, triggered_from) associated with this repository instance. """ with self._session_factory() as session: stmt = delete(WorkflowNodeExecution).where(WorkflowNodeExecution.tenant_id == self._tenant_id) + # Apply additional filters if provided if self._app_id: stmt = stmt.where(WorkflowNodeExecution.app_id == self._app_id) + if self._workflow_id: + stmt = stmt.where(WorkflowNodeExecution.workflow_id == self._workflow_id) + + if self._triggered_from: + stmt = stmt.where(WorkflowNodeExecution.triggered_from == self._triggered_from) + result = session.execute(stmt) session.commit() + # Build log message with all applied filters + filter_parts = [] + if self._tenant_id: + filter_parts.append(f"tenant {self._tenant_id}") + if self._app_id: + filter_parts.append(f"app {self._app_id}") + + if self._workflow_id: + filter_parts.append(f"workflow {self._workflow_id}") + + if self._triggered_from: + filter_parts.append(f"triggered_from {self._triggered_from}") + + filters_str = " and ".join(filter_parts) deleted_count = result.rowcount - logger.info( - f"Cleared {deleted_count} workflow node execution records for tenant {self._tenant_id}" - + (f" and app {self._app_id}" if self._app_id else "") - ) + logger.info(f"Cleared {deleted_count} workflow node execution records for {filters_str}") diff --git a/api/services/workflow_run_service.py b/api/services/workflow_run_service.py index 8b7213eefb..7968d1f18f 100644 --- a/api/services/workflow_run_service.py +++ b/api/services/workflow_run_service.py @@ -133,6 +133,8 @@ class WorkflowRunService: params={ "tenant_id": app_model.tenant_id, "app_id": app_model.id, + "workflow_id": workflow_run.workflow_id, + "triggered_from": workflow_run.triggered_from, "session_factory": db.session.get_bind(), } ) diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index e2bffd4211..53a464d025 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -289,6 +289,10 @@ class WorkflowService: params={ "tenant_id": app_model.tenant_id, "app_id": app_model.id, + "workflow_id": draft_workflow.id, + "triggered_from": workflow_node_execution.triggered_from, + "created_by_role": workflow_node_execution.created_by_role, + "created_by": workflow_node_execution.created_by, "session_factory": db.session.get_bind(), } ) @@ -388,7 +392,7 @@ class WorkflowService: workflow_node_execution = WorkflowNodeExecution() workflow_node_execution.id = str(uuid4()) workflow_node_execution.tenant_id = tenant_id - workflow_node_execution.triggered_from = WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP.value + workflow_node_execution.triggered_from = WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP workflow_node_execution.index = 1 workflow_node_execution.node_id = node_id workflow_node_execution.node_type = node_instance.node_type