From e7a6942971813962c7dcf1a6a60914e1e60b8b89 Mon Sep 17 00:00:00 2001 From: -LAN- Date: Wed, 21 May 2025 18:16:29 +0800 Subject: [PATCH] feat: Apply DI for workflow_execution_repository Signed-off-by: -LAN- --- .../app/apps/advanced_chat/app_generator.py | 52 +++++++++++++++++-- .../advanced_chat/generate_task_pipeline.py | 3 ++ api/core/app/apps/workflow/app_generator.py | 51 ++++++++++++++++-- .../workflow_app_generate_task_pipeline.py | 3 ++ api/core/workflow/workflow_cycle_manager.py | 3 ++ 5 files changed, 105 insertions(+), 7 deletions(-) diff --git a/api/core/app/apps/advanced_chat/app_generator.py b/api/core/app/apps/advanced_chat/app_generator.py index b74100bb19..4b021aa0b3 100644 --- a/api/core/app/apps/advanced_chat/app_generator.py +++ b/api/core/app/apps/advanced_chat/app_generator.py @@ -26,10 +26,13 @@ from core.model_runtime.errors.invoke import InvokeAuthorizationError from core.ops.ops_trace_manager import TraceQueueManager from core.prompt.utils.get_thread_messages_length import get_thread_messages_length from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository +from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchemyWorkflowExecutionRepository +from core.workflow.repository.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository from extensions.ext_database import db from factories import file_factory from models import Account, App, Conversation, EndUser, Message, Workflow, WorkflowNodeExecutionTriggeredFrom +from models.enums import WorkflowRunTriggeredFrom from services.conversation_service import ConversationService from services.errors.message import MessageNotExistsError @@ -159,8 +162,22 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): contexts.plugin_tool_providers.set({}) contexts.plugin_tool_providers_lock.set(threading.Lock()) - # Create workflow node execution repository + # Create repositories + # + # Create session factory session_factory = sessionmaker(bind=db.engine, expire_on_commit=False) + # Create workflow execution(aka workflow run) repository + if invoke_from == InvokeFrom.DEBUGGER: + workflow_triggered_from = WorkflowRunTriggeredFrom.DEBUGGING + else: + workflow_triggered_from = WorkflowRunTriggeredFrom.APP_RUN + workflow_execution_repository = SQLAlchemyWorkflowExecutionRepository( + session_factory=session_factory, + user=user, + app_id=application_generate_entity.app_config.app_id, + triggered_from=workflow_triggered_from, + ) + # Create workflow node execution repository workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository( session_factory=session_factory, user=user, @@ -173,6 +190,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): user=user, invoke_from=invoke_from, application_generate_entity=application_generate_entity, + workflow_execution_repository=workflow_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository, conversation=conversation, stream=streaming, @@ -226,8 +244,18 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): contexts.plugin_tool_providers.set({}) contexts.plugin_tool_providers_lock.set(threading.Lock()) - # Create workflow node execution repository + # Create repositories + # + # Create session factory session_factory = sessionmaker(bind=db.engine, expire_on_commit=False) + # Create workflow execution(aka workflow run) repository + workflow_execution_repository = SQLAlchemyWorkflowExecutionRepository( + session_factory=session_factory, + user=user, + app_id=application_generate_entity.app_config.app_id, + triggered_from=WorkflowRunTriggeredFrom.DEBUGGING, + ) + # Create workflow node execution repository workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository( session_factory=session_factory, user=user, @@ -240,6 +268,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): user=user, invoke_from=InvokeFrom.DEBUGGER, application_generate_entity=application_generate_entity, + workflow_execution_repository=workflow_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository, conversation=None, stream=streaming, @@ -291,8 +320,18 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): contexts.plugin_tool_providers.set({}) contexts.plugin_tool_providers_lock.set(threading.Lock()) - # Create workflow node execution repository + # Create repositories + # + # Create session factory session_factory = sessionmaker(bind=db.engine, expire_on_commit=False) + # Create workflow execution(aka workflow run) repository + workflow_execution_repository = SQLAlchemyWorkflowExecutionRepository( + session_factory=session_factory, + user=user, + app_id=application_generate_entity.app_config.app_id, + triggered_from=WorkflowRunTriggeredFrom.DEBUGGING, + ) + # Create workflow node execution repository workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository( session_factory=session_factory, user=user, @@ -305,6 +344,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): user=user, invoke_from=InvokeFrom.DEBUGGER, application_generate_entity=application_generate_entity, + workflow_execution_repository=workflow_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository, conversation=None, stream=streaming, @@ -317,6 +357,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): user: Union[Account, EndUser], invoke_from: InvokeFrom, application_generate_entity: AdvancedChatAppGenerateEntity, + workflow_execution_repository: WorkflowExecutionRepository, workflow_node_execution_repository: WorkflowNodeExecutionRepository, conversation: Optional[Conversation] = None, stream: bool = True, @@ -381,6 +422,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): conversation=conversation, message=message, user=user, + workflow_execution_repository=workflow_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository, stream=stream, ) @@ -453,6 +495,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): conversation: Conversation, message: Message, user: Union[Account, EndUser], + workflow_execution_repository: WorkflowExecutionRepository, workflow_node_execution_repository: WorkflowNodeExecutionRepository, stream: bool = False, ) -> Union[ChatbotAppBlockingResponse, Generator[ChatbotAppStreamResponse, None, None]]: @@ -476,9 +519,10 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): conversation=conversation, message=message, user=user, - stream=stream, dialogue_count=self._dialogue_count, + workflow_execution_repository=workflow_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository, + stream=stream, ) try: diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index 735b2a9709..6bf69cee6e 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -64,6 +64,7 @@ from core.ops.ops_trace_manager import TraceQueueManager from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState from core.workflow.nodes import NodeType +from core.workflow.repository.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.workflow_cycle_manager import WorkflowCycleManager from events.message_event import message_was_created @@ -94,6 +95,7 @@ class AdvancedChatAppGenerateTaskPipeline: user: Union[Account, EndUser], stream: bool, dialogue_count: int, + workflow_execution_repository: WorkflowExecutionRepository, workflow_node_execution_repository: WorkflowNodeExecutionRepository, ) -> None: self._base_task_pipeline = BasedGenerateTaskPipeline( @@ -125,6 +127,7 @@ class AdvancedChatAppGenerateTaskPipeline: SystemVariableKey.WORKFLOW_ID: workflow.id, SystemVariableKey.WORKFLOW_RUN_ID: application_generate_entity.workflow_run_id, }, + workflow_execution_repository=workflow_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository, ) diff --git a/api/core/app/apps/workflow/app_generator.py b/api/core/app/apps/workflow/app_generator.py index d49ff682b9..4d6ef324b0 100644 --- a/api/core/app/apps/workflow/app_generator.py +++ b/api/core/app/apps/workflow/app_generator.py @@ -23,11 +23,14 @@ from core.app.entities.task_entities import WorkflowAppBlockingResponse, Workflo from core.model_runtime.errors.invoke import InvokeAuthorizationError from core.ops.ops_trace_manager import TraceQueueManager from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository +from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchemyWorkflowExecutionRepository +from core.workflow.repository.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.workflow_app_generate_task_pipeline import WorkflowAppGenerateTaskPipeline from extensions.ext_database import db from factories import file_factory from models import Account, App, EndUser, Workflow, WorkflowNodeExecutionTriggeredFrom +from models.enums import WorkflowRunTriggeredFrom logger = logging.getLogger(__name__) @@ -136,9 +139,22 @@ class WorkflowAppGenerator(BaseAppGenerator): contexts.plugin_tool_providers.set({}) contexts.plugin_tool_providers_lock.set(threading.Lock()) - # Create workflow node execution repository + # Create repositories + # + # Create session factory session_factory = sessionmaker(bind=db.engine, expire_on_commit=False) - + # Create workflow execution(aka workflow run) repository + if invoke_from == InvokeFrom.DEBUGGER: + workflow_triggered_from = WorkflowRunTriggeredFrom.DEBUGGING + else: + workflow_triggered_from = WorkflowRunTriggeredFrom.APP_RUN + workflow_execution_repository = SQLAlchemyWorkflowExecutionRepository( + session_factory=session_factory, + user=user, + app_id=application_generate_entity.app_config.app_id, + triggered_from=workflow_triggered_from, + ) + # Create workflow node execution repository workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository( session_factory=session_factory, user=user, @@ -152,6 +168,7 @@ class WorkflowAppGenerator(BaseAppGenerator): user=user, application_generate_entity=application_generate_entity, invoke_from=invoke_from, + workflow_execution_repository=workflow_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository, streaming=streaming, workflow_thread_pool_id=workflow_thread_pool_id, @@ -165,6 +182,7 @@ class WorkflowAppGenerator(BaseAppGenerator): user: Union[Account, EndUser], application_generate_entity: WorkflowAppGenerateEntity, invoke_from: InvokeFrom, + workflow_execution_repository: WorkflowExecutionRepository, workflow_node_execution_repository: WorkflowNodeExecutionRepository, streaming: bool = True, workflow_thread_pool_id: Optional[str] = None, @@ -209,6 +227,7 @@ class WorkflowAppGenerator(BaseAppGenerator): workflow=workflow, queue_manager=queue_manager, user=user, + workflow_execution_repository=workflow_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository, stream=streaming, ) @@ -262,6 +281,17 @@ class WorkflowAppGenerator(BaseAppGenerator): contexts.plugin_tool_providers.set({}) contexts.plugin_tool_providers_lock.set(threading.Lock()) + # Create repositories + # + # Create session factory + session_factory = sessionmaker(bind=db.engine, expire_on_commit=False) + # Create workflow execution(aka workflow run) repository + workflow_execution_repository = SQLAlchemyWorkflowExecutionRepository( + session_factory=session_factory, + user=user, + app_id=application_generate_entity.app_config.app_id, + triggered_from=WorkflowRunTriggeredFrom.DEBUGGING, + ) # Create workflow node execution repository session_factory = sessionmaker(bind=db.engine, expire_on_commit=False) @@ -278,6 +308,7 @@ class WorkflowAppGenerator(BaseAppGenerator): user=user, invoke_from=InvokeFrom.DEBUGGER, application_generate_entity=application_generate_entity, + workflow_execution_repository=workflow_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository, streaming=streaming, ) @@ -327,6 +358,17 @@ class WorkflowAppGenerator(BaseAppGenerator): contexts.plugin_tool_providers.set({}) contexts.plugin_tool_providers_lock.set(threading.Lock()) + # Create repositories + # + # Create session factory + session_factory = sessionmaker(bind=db.engine, expire_on_commit=False) + # Create workflow execution(aka workflow run) repository + workflow_execution_repository = SQLAlchemyWorkflowExecutionRepository( + session_factory=session_factory, + user=user, + app_id=application_generate_entity.app_config.app_id, + triggered_from=WorkflowRunTriggeredFrom.DEBUGGING, + ) # Create workflow node execution repository session_factory = sessionmaker(bind=db.engine, expire_on_commit=False) @@ -343,6 +385,7 @@ class WorkflowAppGenerator(BaseAppGenerator): user=user, invoke_from=InvokeFrom.DEBUGGER, application_generate_entity=application_generate_entity, + workflow_execution_repository=workflow_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository, streaming=streaming, ) @@ -400,6 +443,7 @@ class WorkflowAppGenerator(BaseAppGenerator): workflow: Workflow, queue_manager: AppQueueManager, user: Union[Account, EndUser], + workflow_execution_repository: WorkflowExecutionRepository, workflow_node_execution_repository: WorkflowNodeExecutionRepository, stream: bool = False, ) -> Union[WorkflowAppBlockingResponse, Generator[WorkflowAppStreamResponse, None, None]]: @@ -419,8 +463,9 @@ class WorkflowAppGenerator(BaseAppGenerator): workflow=workflow, queue_manager=queue_manager, user=user, - stream=stream, + workflow_execution_repository=workflow_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository, + stream=stream, ) try: diff --git a/api/core/workflow/workflow_app_generate_task_pipeline.py b/api/core/workflow/workflow_app_generate_task_pipeline.py index 0396fa8157..a70c635963 100644 --- a/api/core/workflow/workflow_app_generate_task_pipeline.py +++ b/api/core/workflow/workflow_app_generate_task_pipeline.py @@ -54,6 +54,7 @@ from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTas from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk from core.ops.ops_trace_manager import TraceQueueManager from core.workflow.enums import SystemVariableKey +from core.workflow.repository.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.workflow_cycle_manager import WorkflowCycleManager from extensions.ext_database import db @@ -83,6 +84,7 @@ class WorkflowAppGenerateTaskPipeline: queue_manager: AppQueueManager, user: Union[Account, EndUser], stream: bool, + workflow_execution_repository: WorkflowExecutionRepository, workflow_node_execution_repository: WorkflowNodeExecutionRepository, ) -> None: self._base_task_pipeline = BasedGenerateTaskPipeline( @@ -111,6 +113,7 @@ class WorkflowAppGenerateTaskPipeline: SystemVariableKey.WORKFLOW_ID: workflow.id, SystemVariableKey.WORKFLOW_RUN_ID: application_generate_entity.workflow_run_id, }, + workflow_execution_repository=workflow_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository, ) diff --git a/api/core/workflow/workflow_cycle_manager.py b/api/core/workflow/workflow_cycle_manager.py index 673083f003..b6fc2fc5c2 100644 --- a/api/core/workflow/workflow_cycle_manager.py +++ b/api/core/workflow/workflow_cycle_manager.py @@ -57,6 +57,7 @@ from core.workflow.entities.node_execution_entities import ( from core.workflow.enums import SystemVariableKey from core.workflow.nodes import NodeType from core.workflow.nodes.tool.entities import ToolNodeData +from core.workflow.repository.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.workflow_entry import WorkflowEntry from models import ( @@ -77,11 +78,13 @@ class WorkflowCycleManager: *, application_generate_entity: Union[AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity], workflow_system_variables: dict[SystemVariableKey, Any], + workflow_execution_repository: WorkflowExecutionRepository, workflow_node_execution_repository: WorkflowNodeExecutionRepository, ) -> None: self._workflow_run: WorkflowRun | None = None self._application_generate_entity = application_generate_entity self._workflow_system_variables = workflow_system_variables + self._workflow_execution_repository = workflow_execution_repository self._workflow_node_execution_repository = workflow_node_execution_repository def _handle_workflow_run_start(