refactor: Enhances workflow execution context handling

Adds workflow and execution context parameters to improve
repository filtering and record management. Introduces
new parameters such as 'workflow_id', 'triggered_from',
and creator details for better tracking of workflow
executions. Updates relevant services and generators to
pass these parameters.

Facilitates debugging and improves execution context
management across multiple components.

Signed-off-by: -LAN- <laipz8200@outlook.com>
pull/18805/head
-LAN- 1 year ago
parent a9dd15d260
commit 1c6551a1e7
No known key found for this signature in database
GPG Key ID: 6BA0D108DED011FF

@ -31,7 +31,7 @@ from extensions.ext_database import db
from factories import file_factory from factories import file_factory
from models.account import Account from models.account import Account
from models.model import App, Conversation, EndUser, Message from models.model import App, Conversation, EndUser, Message
from models.workflow import Workflow from models.workflow import Workflow, WorkflowNodeExecutionTriggeredFrom
from services.conversation_service import ConversationService from services.conversation_service import ConversationService
from services.errors.message import MessageNotExistsError from services.errors.message import MessageNotExistsError
@ -167,6 +167,9 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
params={ params={
"tenant_id": application_generate_entity.app_config.tenant_id, "tenant_id": application_generate_entity.app_config.tenant_id,
"app_id": application_generate_entity.app_config.app_id, "app_id": application_generate_entity.app_config.app_id,
"workflow_id": workflow.id,
# Default for advanced chat app
"triggered_from": WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN,
"session_factory": session_factory, "session_factory": session_factory,
} }
) )
@ -235,6 +238,9 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
params={ params={
"tenant_id": application_generate_entity.app_config.tenant_id, "tenant_id": application_generate_entity.app_config.tenant_id,
"app_id": application_generate_entity.app_config.app_id, "app_id": application_generate_entity.app_config.app_id,
"workflow_id": workflow.id,
# For debugging
"triggered_from": WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP,
"session_factory": session_factory, "session_factory": session_factory,
} }
) )
@ -301,6 +307,8 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
params={ params={
"tenant_id": application_generate_entity.app_config.tenant_id, "tenant_id": application_generate_entity.app_config.tenant_id,
"app_id": application_generate_entity.app_config.app_id, "app_id": application_generate_entity.app_config.app_id,
"workflow_id": workflow.id,
"triggered_from": WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP, # For debugging
"session_factory": session_factory, "session_factory": session_factory,
} }
) )

@ -28,6 +28,7 @@ from core.repository.workflow_node_execution_repository import WorkflowNodeExecu
from extensions.ext_database import db from extensions.ext_database import db
from factories import file_factory from factories import file_factory
from models import Account, App, EndUser, Workflow from models import Account, App, EndUser, Workflow
from models.workflow import WorkflowNodeExecutionTriggeredFrom
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -142,6 +143,8 @@ class WorkflowAppGenerator(BaseAppGenerator):
params={ params={
"tenant_id": application_generate_entity.app_config.tenant_id, "tenant_id": application_generate_entity.app_config.tenant_id,
"app_id": application_generate_entity.app_config.app_id, "app_id": application_generate_entity.app_config.app_id,
"workflow_id": workflow.id,
"triggered_from": WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN, # Default for workflow app
"session_factory": session_factory, "session_factory": session_factory,
} }
) )
@ -268,6 +271,8 @@ class WorkflowAppGenerator(BaseAppGenerator):
params={ params={
"tenant_id": application_generate_entity.app_config.tenant_id, "tenant_id": application_generate_entity.app_config.tenant_id,
"app_id": application_generate_entity.app_config.app_id, "app_id": application_generate_entity.app_config.app_id,
"workflow_id": workflow.id,
"triggered_from": WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP, # For debugging
"session_factory": session_factory, "session_factory": session_factory,
} }
) )
@ -333,6 +338,8 @@ class WorkflowAppGenerator(BaseAppGenerator):
params={ params={
"tenant_id": application_generate_entity.app_config.tenant_id, "tenant_id": application_generate_entity.app_config.tenant_id,
"app_id": application_generate_entity.app_config.app_id, "app_id": application_generate_entity.app_config.app_id,
"workflow_id": workflow.id,
"triggered_from": WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP, # For debugging
"session_factory": session_factory, "session_factory": session_factory,
} }
) )

@ -32,6 +32,7 @@ from core.ops.utils import filter_none_values
from core.repository.repository_factory import RepositoryFactory from core.repository.repository_factory import RepositoryFactory
from extensions.ext_database import db from extensions.ext_database import db
from models.model import EndUser from models.model import EndUser
from models.workflow import WorkflowNodeExecutionTriggeredFrom
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -114,7 +115,13 @@ class LangFuseDataTrace(BaseTraceInstance):
# through workflow_run_id get all_nodes_execution using repository # through workflow_run_id get all_nodes_execution using repository
session_factory = sessionmaker(bind=db.engine) session_factory = sessionmaker(bind=db.engine)
workflow_node_execution_repository = RepositoryFactory.create_workflow_node_execution_repository( workflow_node_execution_repository = RepositoryFactory.create_workflow_node_execution_repository(
params={"tenant_id": trace_info.tenant_id, "session_factory": session_factory}, params={
"tenant_id": trace_info.tenant_id,
"app_id": trace_info.metadata.get("app_id"),
"workflow_id": trace_info.workflow_data.id,
"triggered_from": WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN,
"session_factory": session_factory,
},
) )
# Get all executions for this workflow run # Get all executions for this workflow run
@ -233,7 +240,7 @@ class LangFuseDataTrace(BaseTraceInstance):
self.add_generation(langfuse_generation_data=node_generation_data) self.add_generation(langfuse_generation_data=node_generation_data)
def message_trace(self, trace_info: MessageTraceInfo, **kwargs): def message_trace(self, trace_info: MessageTraceInfo):
# get message file data # get message file data
file_list = trace_info.file_list file_list = trace_info.file_list
metadata = trace_info.metadata metadata = trace_info.metadata

@ -31,6 +31,7 @@ from core.ops.utils import filter_none_values, generate_dotted_order
from core.repository.repository_factory import RepositoryFactory from core.repository.repository_factory import RepositoryFactory
from extensions.ext_database import db from extensions.ext_database import db
from models.model import EndUser, MessageFile from models.model import EndUser, MessageFile
from models.workflow import WorkflowNodeExecutionTriggeredFrom
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -141,6 +142,8 @@ class LangSmithDataTrace(BaseTraceInstance):
params={ params={
"tenant_id": trace_info.tenant_id, "tenant_id": trace_info.tenant_id,
"app_id": trace_info.metadata.get("app_id"), "app_id": trace_info.metadata.get("app_id"),
"workflow_id": trace_info.workflow_data.id,
"triggered_from": WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN,
"session_factory": session_factory, "session_factory": session_factory,
}, },
) )

@ -25,6 +25,7 @@ from core.ops.entities.trace_entity import (
from core.repository.repository_factory import RepositoryFactory from core.repository.repository_factory import RepositoryFactory
from extensions.ext_database import db from extensions.ext_database import db
from models.model import EndUser, MessageFile from models.model import EndUser, MessageFile
from models.workflow import WorkflowNodeExecutionTriggeredFrom
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -154,6 +155,8 @@ class OpikDataTrace(BaseTraceInstance):
params={ params={
"tenant_id": trace_info.tenant_id, "tenant_id": trace_info.tenant_id,
"app_id": trace_info.metadata.get("app_id"), "app_id": trace_info.metadata.get("app_id"),
"workflow_id": trace_info.workflow_data.id,
"triggered_from": WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN,
"session_factory": session_factory, "session_factory": session_factory,
}, },
) )

@ -58,6 +58,11 @@ def create_workflow_node_execution_repository(params: Mapping[str, Any]) -> SQLA
params: Parameters for creating the repository, including: params: Parameters for creating the repository, including:
- tenant_id: Required. The tenant ID for multi-tenancy. - tenant_id: Required. The tenant ID for multi-tenancy.
- app_id: Optional. The application ID for filtering. - app_id: Optional. The application ID for filtering.
- workflow_id: Optional. The workflow ID for filtering.
- triggered_from: Optional. The triggered_from value for filtering
(WorkflowNodeExecutionTriggeredFrom enum).
- created_by_role: Optional. The creator role for new executions.
- created_by: Optional. The creator ID for new executions.
- session_factory: Optional. A SQLAlchemy sessionmaker instance. If not provided, - session_factory: Optional. A SQLAlchemy sessionmaker instance. If not provided,
a new sessionmaker will be created using the global database engine. a new sessionmaker will be created using the global database engine.
@ -74,6 +79,10 @@ def create_workflow_node_execution_repository(params: Mapping[str, Any]) -> SQLA
# Extract optional parameters # Extract optional parameters
app_id = params.get("app_id") app_id = params.get("app_id")
workflow_id = params.get("workflow_id")
triggered_from = params.get("triggered_from")
created_by_role = params.get("created_by_role")
created_by = params.get("created_by")
# Use the session_factory from params if provided, otherwise create one using the global db engine # Use the session_factory from params if provided, otherwise create one using the global db engine
session_factory = params.get("session_factory") session_factory = params.get("session_factory")
@ -83,5 +92,11 @@ def create_workflow_node_execution_repository(params: Mapping[str, Any]) -> SQLA
# Create and return the repository # Create and return the repository
return SQLAlchemyWorkflowNodeExecutionRepository( return SQLAlchemyWorkflowNodeExecutionRepository(
session_factory=session_factory, tenant_id=tenant_id, app_id=app_id session_factory=session_factory,
tenant_id=tenant_id,
app_id=app_id,
workflow_id=workflow_id,
triggered_from=triggered_from,
created_by_role=created_by_role,
created_by=created_by,
) )

@ -26,14 +26,27 @@ class SQLAlchemyWorkflowNodeExecutionRepository:
to the database. This prevents long-running connections in the workflow core. to the database. This prevents long-running connections in the workflow core.
""" """
def __init__(self, session_factory: sessionmaker | Engine, tenant_id: str, app_id: Optional[str] = None): def __init__(
self,
session_factory: sessionmaker | Engine,
tenant_id: str,
app_id: str | None = None,
workflow_id: str | None = None,
triggered_from: WorkflowNodeExecutionTriggeredFrom | None = None,
created_by_role: str | None = None,
created_by: str | None = None,
):
""" """
Initialize the repository with a SQLAlchemy sessionmaker or engine and tenant context. Initialize the repository with a SQLAlchemy sessionmaker or engine and context parameters.
Args: Args:
session_factory: SQLAlchemy sessionmaker or engine for creating sessions session_factory: SQLAlchemy sessionmaker or engine for creating sessions
tenant_id: Tenant ID for multi-tenancy tenant_id: Tenant ID for multi-tenancy
app_id: Optional app ID for filtering by application app_id: App ID for filtering by application (optional)
workflow_id: Workflow ID for filtering by workflow (optional)
triggered_from: Triggered_from value (WorkflowNodeExecutionTriggeredFrom enum) (optional)
created_by_role: Creator role (e.g., 'account', 'end_user') (optional)
created_by: Creator ID (optional)
""" """
# If an engine is provided, create a sessionmaker from it # If an engine is provided, create a sessionmaker from it
if isinstance(session_factory, Engine): if isinstance(session_factory, Engine):
@ -47,6 +60,10 @@ class SQLAlchemyWorkflowNodeExecutionRepository:
self._tenant_id = tenant_id self._tenant_id = tenant_id
self._app_id = app_id self._app_id = app_id
self._workflow_id = workflow_id
self._triggered_from = triggered_from
self._created_by_role = created_by_role
self._created_by = created_by
def save(self, execution: WorkflowNodeExecution) -> None: def save(self, execution: WorkflowNodeExecution) -> None:
""" """
@ -56,14 +73,25 @@ class SQLAlchemyWorkflowNodeExecutionRepository:
execution: The WorkflowNodeExecution instance to save execution: The WorkflowNodeExecution instance to save
""" """
with self._session_factory() as session: with self._session_factory() as session:
# Ensure tenant_id is set # Always set tenant_id from the repository context
if not execution.tenant_id: execution.tenant_id = self._tenant_id
execution.tenant_id = self._tenant_id
# Set app_id if provided and not already set # Set other fields only if they are provided in the repository
if self._app_id and not execution.app_id: if self._app_id is not None:
execution.app_id = self._app_id execution.app_id = self._app_id
if self._workflow_id is not None:
execution.workflow_id = self._workflow_id
if self._triggered_from is not None:
execution.triggered_from = self._triggered_from
if self._created_by_role is not None:
execution.created_by_role = self._created_by_role
if self._created_by is not None:
execution.created_by = self._created_by
session.add(execution) session.add(execution)
session.commit() session.commit()
@ -83,9 +111,16 @@ class SQLAlchemyWorkflowNodeExecutionRepository:
WorkflowNodeExecution.tenant_id == self._tenant_id, WorkflowNodeExecution.tenant_id == self._tenant_id,
) )
# Apply additional filters if provided
if self._app_id: if self._app_id:
stmt = stmt.where(WorkflowNodeExecution.app_id == self._app_id) stmt = stmt.where(WorkflowNodeExecution.app_id == self._app_id)
if self._workflow_id:
stmt = stmt.where(WorkflowNodeExecution.workflow_id == self._workflow_id)
if self._triggered_from:
stmt = stmt.where(WorkflowNodeExecution.triggered_from == self._triggered_from)
return session.scalar(stmt) return session.scalar(stmt)
def get_by_workflow_run( def get_by_workflow_run(
@ -109,12 +144,24 @@ class SQLAlchemyWorkflowNodeExecutionRepository:
stmt = select(WorkflowNodeExecution).where( stmt = select(WorkflowNodeExecution).where(
WorkflowNodeExecution.workflow_run_id == workflow_run_id, WorkflowNodeExecution.workflow_run_id == workflow_run_id,
WorkflowNodeExecution.tenant_id == self._tenant_id, WorkflowNodeExecution.tenant_id == self._tenant_id,
WorkflowNodeExecution.triggered_from == WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN,
) )
# Use the triggered_from from the instance if provided, otherwise use the default
if self._triggered_from:
stmt = stmt.where(WorkflowNodeExecution.triggered_from == self._triggered_from)
else:
# Default behavior for backward compatibility
stmt = stmt.where(
WorkflowNodeExecution.triggered_from == WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN
)
# Apply additional filters if provided
if self._app_id: if self._app_id:
stmt = stmt.where(WorkflowNodeExecution.app_id == self._app_id) stmt = stmt.where(WorkflowNodeExecution.app_id == self._app_id)
if self._workflow_id:
stmt = stmt.where(WorkflowNodeExecution.workflow_id == self._workflow_id)
# Apply ordering if provided # Apply ordering if provided
if order_config and order_config.order_by: if order_config and order_config.order_by:
order_columns: list[UnaryExpression] = [] order_columns: list[UnaryExpression] = []
@ -147,12 +194,24 @@ class SQLAlchemyWorkflowNodeExecutionRepository:
WorkflowNodeExecution.workflow_run_id == workflow_run_id, WorkflowNodeExecution.workflow_run_id == workflow_run_id,
WorkflowNodeExecution.tenant_id == self._tenant_id, WorkflowNodeExecution.tenant_id == self._tenant_id,
WorkflowNodeExecution.status == WorkflowNodeExecutionStatus.RUNNING, WorkflowNodeExecution.status == WorkflowNodeExecutionStatus.RUNNING,
WorkflowNodeExecution.triggered_from == WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN,
) )
# Use the triggered_from from the instance if provided, otherwise use the default
if self._triggered_from:
stmt = stmt.where(WorkflowNodeExecution.triggered_from == self._triggered_from)
else:
# Default behavior for backward compatibility
stmt = stmt.where(
WorkflowNodeExecution.triggered_from == WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN
)
# Apply additional filters if provided
if self._app_id: if self._app_id:
stmt = stmt.where(WorkflowNodeExecution.app_id == self._app_id) stmt = stmt.where(WorkflowNodeExecution.app_id == self._app_id)
if self._workflow_id:
stmt = stmt.where(WorkflowNodeExecution.workflow_id == self._workflow_id)
return session.scalars(stmt).all() return session.scalars(stmt).all()
def update(self, execution: WorkflowNodeExecution) -> None: def update(self, execution: WorkflowNodeExecution) -> None:
@ -163,35 +222,96 @@ class SQLAlchemyWorkflowNodeExecutionRepository:
execution: The WorkflowNodeExecution instance to update execution: The WorkflowNodeExecution instance to update
""" """
with self._session_factory() as session: with self._session_factory() as session:
# Ensure tenant_id is set # Always set tenant_id from the repository context
if not execution.tenant_id: execution.tenant_id = self._tenant_id
execution.tenant_id = self._tenant_id
# Set app_id if provided and not already set # Set other fields only if they are provided in the repository
if self._app_id and not execution.app_id: if self._app_id is not None:
execution.app_id = self._app_id execution.app_id = self._app_id
if self._workflow_id is not None:
execution.workflow_id = self._workflow_id
if self._triggered_from is not None:
execution.triggered_from = self._triggered_from
if self._created_by_role is not None:
execution.created_by_role = self._created_by_role
if self._created_by is not None:
execution.created_by = self._created_by
session.merge(execution) session.merge(execution)
session.commit() session.commit()
def update_context(
self,
app_id: str | None = None,
workflow_id: str | None = None,
triggered_from: WorkflowNodeExecutionTriggeredFrom | None = None,
created_by_role: str | None = None,
created_by: str | None = None,
) -> None:
"""
Update the repository's context parameters.
This method allows updating the repository's context parameters after initialization.
Only parameters that are not None will be updated.
Args:
app_id: New app ID for filtering
workflow_id: New workflow ID for filtering
triggered_from: New triggered_from value (WorkflowNodeExecutionTriggeredFrom enum)
created_by_role: New creator role
created_by: New creator ID
"""
if app_id is not None:
self._app_id = app_id
if workflow_id is not None:
self._workflow_id = workflow_id
if triggered_from is not None:
self._triggered_from = triggered_from
if created_by_role is not None:
self._created_by_role = created_by_role
if created_by is not None:
self._created_by = created_by
def clear(self) -> None: def clear(self) -> None:
""" """
Clear all WorkflowNodeExecution records for the current tenant_id and app_id. Clear all WorkflowNodeExecution records for the current tenant_id and other filters.
This method deletes all WorkflowNodeExecution records that match the tenant_id This method deletes all WorkflowNodeExecution records that match the tenant_id
and app_id (if provided) associated with this repository instance. and other filters (app_id, workflow_id, triggered_from) associated with this repository instance.
""" """
with self._session_factory() as session: with self._session_factory() as session:
stmt = delete(WorkflowNodeExecution).where(WorkflowNodeExecution.tenant_id == self._tenant_id) stmt = delete(WorkflowNodeExecution).where(WorkflowNodeExecution.tenant_id == self._tenant_id)
# Apply additional filters if provided
if self._app_id: if self._app_id:
stmt = stmt.where(WorkflowNodeExecution.app_id == self._app_id) stmt = stmt.where(WorkflowNodeExecution.app_id == self._app_id)
if self._workflow_id:
stmt = stmt.where(WorkflowNodeExecution.workflow_id == self._workflow_id)
if self._triggered_from:
stmt = stmt.where(WorkflowNodeExecution.triggered_from == self._triggered_from)
result = session.execute(stmt) result = session.execute(stmt)
session.commit() session.commit()
# Build log message with all applied filters
filter_parts = []
if self._tenant_id:
filter_parts.append(f"tenant {self._tenant_id}")
if self._app_id:
filter_parts.append(f"app {self._app_id}")
if self._workflow_id:
filter_parts.append(f"workflow {self._workflow_id}")
if self._triggered_from:
filter_parts.append(f"triggered_from {self._triggered_from}")
filters_str = " and ".join(filter_parts)
deleted_count = result.rowcount deleted_count = result.rowcount
logger.info( logger.info(f"Cleared {deleted_count} workflow node execution records for {filters_str}")
f"Cleared {deleted_count} workflow node execution records for tenant {self._tenant_id}"
+ (f" and app {self._app_id}" if self._app_id else "")
)

@ -133,6 +133,8 @@ class WorkflowRunService:
params={ params={
"tenant_id": app_model.tenant_id, "tenant_id": app_model.tenant_id,
"app_id": app_model.id, "app_id": app_model.id,
"workflow_id": workflow_run.workflow_id,
"triggered_from": workflow_run.triggered_from,
"session_factory": db.session.get_bind(), "session_factory": db.session.get_bind(),
} }
) )

@ -289,6 +289,10 @@ class WorkflowService:
params={ params={
"tenant_id": app_model.tenant_id, "tenant_id": app_model.tenant_id,
"app_id": app_model.id, "app_id": app_model.id,
"workflow_id": draft_workflow.id,
"triggered_from": workflow_node_execution.triggered_from,
"created_by_role": workflow_node_execution.created_by_role,
"created_by": workflow_node_execution.created_by,
"session_factory": db.session.get_bind(), "session_factory": db.session.get_bind(),
} }
) )
@ -388,7 +392,7 @@ class WorkflowService:
workflow_node_execution = WorkflowNodeExecution() workflow_node_execution = WorkflowNodeExecution()
workflow_node_execution.id = str(uuid4()) workflow_node_execution.id = str(uuid4())
workflow_node_execution.tenant_id = tenant_id workflow_node_execution.tenant_id = tenant_id
workflow_node_execution.triggered_from = WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP.value workflow_node_execution.triggered_from = WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP
workflow_node_execution.index = 1 workflow_node_execution.index = 1
workflow_node_execution.node_id = node_id workflow_node_execution.node_id = node_id
workflow_node_execution.node_type = node_instance.node_type workflow_node_execution.node_type = node_instance.node_type

Loading…
Cancel
Save