diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index 823247bd50..cd764d56a5 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -297,7 +297,7 @@ class AdvancedChatAppGenerateTaskPipeline: with Session(db.engine, expire_on_commit=False) as session: # init workflow run - workflow_execution = self._workflow_cycle_manager._handle_workflow_run_start( + workflow_execution = self._workflow_cycle_manager.handle_workflow_run_start( session=session, workflow_id=self._workflow_id, ) @@ -320,7 +320,7 @@ class AdvancedChatAppGenerateTaskPipeline: raise ValueError("workflow run not initialized.") with Session(db.engine, expire_on_commit=False) as session: - workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_retried( + workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_retried( workflow_execution_id=self._workflow_run_id, event=event ) node_retry_resp = self._workflow_cycle_manager.workflow_node_retry_to_stream_response( @@ -336,7 +336,7 @@ class AdvancedChatAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - workflow_node_execution = self._workflow_cycle_manager._handle_node_execution_start( + workflow_node_execution = self._workflow_cycle_manager.handle_node_execution_start( workflow_execution_id=self._workflow_run_id, event=event ) @@ -356,7 +356,7 @@ class AdvancedChatAppGenerateTaskPipeline: ) with Session(db.engine, expire_on_commit=False) as session: - workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_success( + workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_success( event=event ) @@ -376,7 +376,7 @@ class AdvancedChatAppGenerateTaskPipeline: | QueueNodeInLoopFailedEvent | QueueNodeExceptionEvent, ): - workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_failed( + workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_failed( event=event ) @@ -486,7 +486,7 @@ class AdvancedChatAppGenerateTaskPipeline: raise ValueError("workflow run not initialized.") with Session(db.engine, expire_on_commit=False) as session: - workflow_execution = self._workflow_cycle_manager._handle_workflow_run_success( + workflow_execution = self._workflow_cycle_manager.handle_workflow_run_success( workflow_run_id=self._workflow_run_id, total_tokens=graph_runtime_state.total_tokens, total_steps=graph_runtime_state.node_run_steps, @@ -512,7 +512,7 @@ class AdvancedChatAppGenerateTaskPipeline: raise ValueError("graph runtime state not initialized.") with Session(db.engine, expire_on_commit=False) as session: - workflow_execution = self._workflow_cycle_manager._handle_workflow_run_partial_success( + workflow_execution = self._workflow_cycle_manager.handle_workflow_run_partial_success( workflow_run_id=self._workflow_run_id, total_tokens=graph_runtime_state.total_tokens, total_steps=graph_runtime_state.node_run_steps, @@ -538,7 +538,7 @@ class AdvancedChatAppGenerateTaskPipeline: raise ValueError("graph runtime state not initialized.") with Session(db.engine, expire_on_commit=False) as session: - workflow_execution = self._workflow_cycle_manager._handle_workflow_run_failed( + workflow_execution = self._workflow_cycle_manager.handle_workflow_run_failed( workflow_run_id=self._workflow_run_id, total_tokens=graph_runtime_state.total_tokens, total_steps=graph_runtime_state.node_run_steps, @@ -564,7 +564,7 @@ class AdvancedChatAppGenerateTaskPipeline: elif isinstance(event, QueueStopEvent): if self._workflow_run_id and graph_runtime_state: with Session(db.engine, expire_on_commit=False) as session: - workflow_execution = self._workflow_cycle_manager._handle_workflow_run_failed( + workflow_execution = self._workflow_cycle_manager.handle_workflow_run_failed( workflow_run_id=self._workflow_run_id, total_tokens=graph_runtime_state.total_tokens, total_steps=graph_runtime_state.node_run_steps, diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py index 07c933b723..f2ebd78b36 100644 --- a/api/core/app/apps/workflow/generate_task_pipeline.py +++ b/api/core/app/apps/workflow/generate_task_pipeline.py @@ -263,7 +263,7 @@ class WorkflowAppGenerateTaskPipeline: with Session(db.engine, expire_on_commit=False) as session: # init workflow run - workflow_execution = self._workflow_cycle_manager._handle_workflow_run_start( + workflow_execution = self._workflow_cycle_manager.handle_workflow_run_start( session=session, workflow_id=self._workflow_id, ) @@ -281,7 +281,7 @@ class WorkflowAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") with Session(db.engine, expire_on_commit=False) as session: - workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_retried( + workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_retried( workflow_execution_id=self._workflow_run_id, event=event, ) @@ -298,7 +298,7 @@ class WorkflowAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - workflow_node_execution = self._workflow_cycle_manager._handle_node_execution_start( + workflow_node_execution = self._workflow_cycle_manager.handle_node_execution_start( workflow_execution_id=self._workflow_run_id, event=event ) node_start_response = self._workflow_cycle_manager.workflow_node_start_to_stream_response( @@ -310,7 +310,7 @@ class WorkflowAppGenerateTaskPipeline: if node_start_response: yield node_start_response elif isinstance(event, QueueNodeSucceededEvent): - workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_success( + workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_success( event=event ) node_success_response = self._workflow_cycle_manager.workflow_node_finish_to_stream_response( @@ -328,7 +328,7 @@ class WorkflowAppGenerateTaskPipeline: | QueueNodeInLoopFailedEvent | QueueNodeExceptionEvent, ): - workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_failed( + workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_failed( event=event, ) node_failed_response = self._workflow_cycle_manager.workflow_node_finish_to_stream_response( @@ -445,7 +445,7 @@ class WorkflowAppGenerateTaskPipeline: raise ValueError("graph runtime state not initialized.") with Session(db.engine, expire_on_commit=False) as session: - workflow_execution = self._workflow_cycle_manager._handle_workflow_run_success( + workflow_execution = self._workflow_cycle_manager.handle_workflow_run_success( workflow_run_id=self._workflow_run_id, total_tokens=graph_runtime_state.total_tokens, total_steps=graph_runtime_state.node_run_steps, @@ -472,7 +472,7 @@ class WorkflowAppGenerateTaskPipeline: raise ValueError("graph runtime state not initialized.") with Session(db.engine, expire_on_commit=False) as session: - workflow_execution = self._workflow_cycle_manager._handle_workflow_run_partial_success( + workflow_execution = self._workflow_cycle_manager.handle_workflow_run_partial_success( workflow_run_id=self._workflow_run_id, total_tokens=graph_runtime_state.total_tokens, total_steps=graph_runtime_state.node_run_steps, @@ -500,7 +500,7 @@ class WorkflowAppGenerateTaskPipeline: raise ValueError("graph runtime state not initialized.") with Session(db.engine, expire_on_commit=False) as session: - workflow_execution = self._workflow_cycle_manager._handle_workflow_run_failed( + workflow_execution = self._workflow_cycle_manager.handle_workflow_run_failed( workflow_run_id=self._workflow_run_id, total_tokens=graph_runtime_state.total_tokens, total_steps=graph_runtime_state.node_run_steps, diff --git a/api/core/workflow/workflow_app_generate_task_pipeline.py b/api/core/workflow/workflow_app_generate_task_pipeline.py index 07c933b723..f2ebd78b36 100644 --- a/api/core/workflow/workflow_app_generate_task_pipeline.py +++ b/api/core/workflow/workflow_app_generate_task_pipeline.py @@ -263,7 +263,7 @@ class WorkflowAppGenerateTaskPipeline: with Session(db.engine, expire_on_commit=False) as session: # init workflow run - workflow_execution = self._workflow_cycle_manager._handle_workflow_run_start( + workflow_execution = self._workflow_cycle_manager.handle_workflow_run_start( session=session, workflow_id=self._workflow_id, ) @@ -281,7 +281,7 @@ class WorkflowAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") with Session(db.engine, expire_on_commit=False) as session: - workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_retried( + workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_retried( workflow_execution_id=self._workflow_run_id, event=event, ) @@ -298,7 +298,7 @@ class WorkflowAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - workflow_node_execution = self._workflow_cycle_manager._handle_node_execution_start( + workflow_node_execution = self._workflow_cycle_manager.handle_node_execution_start( workflow_execution_id=self._workflow_run_id, event=event ) node_start_response = self._workflow_cycle_manager.workflow_node_start_to_stream_response( @@ -310,7 +310,7 @@ class WorkflowAppGenerateTaskPipeline: if node_start_response: yield node_start_response elif isinstance(event, QueueNodeSucceededEvent): - workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_success( + workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_success( event=event ) node_success_response = self._workflow_cycle_manager.workflow_node_finish_to_stream_response( @@ -328,7 +328,7 @@ class WorkflowAppGenerateTaskPipeline: | QueueNodeInLoopFailedEvent | QueueNodeExceptionEvent, ): - workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_failed( + workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_failed( event=event, ) node_failed_response = self._workflow_cycle_manager.workflow_node_finish_to_stream_response( @@ -445,7 +445,7 @@ class WorkflowAppGenerateTaskPipeline: raise ValueError("graph runtime state not initialized.") with Session(db.engine, expire_on_commit=False) as session: - workflow_execution = self._workflow_cycle_manager._handle_workflow_run_success( + workflow_execution = self._workflow_cycle_manager.handle_workflow_run_success( workflow_run_id=self._workflow_run_id, total_tokens=graph_runtime_state.total_tokens, total_steps=graph_runtime_state.node_run_steps, @@ -472,7 +472,7 @@ class WorkflowAppGenerateTaskPipeline: raise ValueError("graph runtime state not initialized.") with Session(db.engine, expire_on_commit=False) as session: - workflow_execution = self._workflow_cycle_manager._handle_workflow_run_partial_success( + workflow_execution = self._workflow_cycle_manager.handle_workflow_run_partial_success( workflow_run_id=self._workflow_run_id, total_tokens=graph_runtime_state.total_tokens, total_steps=graph_runtime_state.node_run_steps, @@ -500,7 +500,7 @@ class WorkflowAppGenerateTaskPipeline: raise ValueError("graph runtime state not initialized.") with Session(db.engine, expire_on_commit=False) as session: - workflow_execution = self._workflow_cycle_manager._handle_workflow_run_failed( + workflow_execution = self._workflow_cycle_manager.handle_workflow_run_failed( workflow_run_id=self._workflow_run_id, total_tokens=graph_runtime_state.total_tokens, total_steps=graph_runtime_state.node_run_steps, diff --git a/api/core/workflow/workflow_cycle_manager.py b/api/core/workflow/workflow_cycle_manager.py index db724b2ef6..d4c2b1b6bd 100644 --- a/api/core/workflow/workflow_cycle_manager.py +++ b/api/core/workflow/workflow_cycle_manager.py @@ -85,7 +85,7 @@ class WorkflowCycleManager: self._workflow_execution_repository = workflow_execution_repository self._workflow_node_execution_repository = workflow_node_execution_repository - def _handle_workflow_run_start( + def handle_workflow_run_start( self, *, session: Session, @@ -130,7 +130,7 @@ class WorkflowCycleManager: return execution - def _handle_workflow_run_success( + def handle_workflow_run_success( self, *, workflow_run_id: str, @@ -162,7 +162,7 @@ class WorkflowCycleManager: return workflow_execution - def _handle_workflow_run_partial_success( + def handle_workflow_run_partial_success( self, *, workflow_run_id: str, @@ -195,7 +195,7 @@ class WorkflowCycleManager: return execution - def _handle_workflow_run_failed( + def handle_workflow_run_failed( self, *, workflow_run_id: str, @@ -246,7 +246,7 @@ class WorkflowCycleManager: return execution - def _handle_node_execution_start( + def handle_node_execution_start( self, *, workflow_execution_id: str, @@ -282,7 +282,7 @@ class WorkflowCycleManager: return domain_execution - def _handle_workflow_node_execution_success(self, *, event: QueueNodeSucceededEvent) -> NodeExecution: + def handle_workflow_node_execution_success(self, *, event: QueueNodeSucceededEvent) -> NodeExecution: # Get the domain model from repository domain_execution = self._workflow_node_execution_repository.get_by_node_execution_id(event.node_execution_id) if not domain_execution: @@ -315,7 +315,7 @@ class WorkflowCycleManager: return domain_execution - def _handle_workflow_node_execution_failed( + def handle_workflow_node_execution_failed( self, *, event: QueueNodeFailedEvent @@ -365,7 +365,7 @@ class WorkflowCycleManager: return domain_execution - def _handle_workflow_node_execution_retried( + def handle_workflow_node_execution_retried( self, *, workflow_execution_id: str, event: QueueNodeRetryEvent ) -> NodeExecution: workflow_execution = self._get_workflow_execution_or_raise_error(workflow_execution_id) diff --git a/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py b/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py index 202a41d177..9c955fc086 100644 --- a/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py +++ b/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py @@ -172,12 +172,12 @@ def test_init( def test_handle_workflow_run_start(workflow_cycle_manager, mock_session, real_workflow): - """Test _handle_workflow_run_start method""" + """Test handle_workflow_run_start method""" # Mock session.scalar to return the workflow and max sequence mock_session.scalar.side_effect = [real_workflow, 5] # Call the method - workflow_execution = workflow_cycle_manager._handle_workflow_run_start( + workflow_execution = workflow_cycle_manager.handle_workflow_run_start( session=mock_session, workflow_id="test-workflow-id", ) @@ -191,7 +191,7 @@ def test_handle_workflow_run_start(workflow_cycle_manager, mock_session, real_wo def test_handle_workflow_run_success(workflow_cycle_manager, mock_workflow_execution_repository): - """Test _handle_workflow_run_success method""" + """Test handle_workflow_run_success method""" # Create a real WorkflowExecution workflow_execution = WorkflowExecution( @@ -209,7 +209,7 @@ def test_handle_workflow_run_success(workflow_cycle_manager, mock_workflow_execu workflow_cycle_manager._workflow_execution_repository.get.return_value = workflow_execution # Call the method - result = workflow_cycle_manager._handle_workflow_run_success( + result = workflow_cycle_manager.handle_workflow_run_success( workflow_run_id="test-workflow-run-id", total_tokens=100, total_steps=5, @@ -226,7 +226,7 @@ def test_handle_workflow_run_success(workflow_cycle_manager, mock_workflow_execu def test_handle_workflow_run_failed(workflow_cycle_manager, mock_workflow_execution_repository): - """Test _handle_workflow_run_failed method""" + """Test handle_workflow_run_failed method""" # Create a real WorkflowExecution workflow_execution = WorkflowExecution( @@ -247,7 +247,7 @@ def test_handle_workflow_run_failed(workflow_cycle_manager, mock_workflow_execut workflow_cycle_manager._workflow_node_execution_repository.get_running_executions.return_value = [] # Call the method - result = workflow_cycle_manager._handle_workflow_run_failed( + result = workflow_cycle_manager.handle_workflow_run_failed( workflow_run_id="test-workflow-run-id", total_tokens=50, total_steps=3, @@ -265,7 +265,7 @@ def test_handle_workflow_run_failed(workflow_cycle_manager, mock_workflow_execut def test_handle_node_execution_start(workflow_cycle_manager, mock_workflow_execution_repository): - """Test _handle_node_execution_start method""" + """Test handle_node_execution_start method""" # Create a real WorkflowExecution workflow_execution = WorkflowExecution( @@ -300,7 +300,7 @@ def test_handle_node_execution_start(workflow_cycle_manager, mock_workflow_execu event.in_loop_id = "test-loop-id" # Call the method - result = workflow_cycle_manager._handle_node_execution_start( + result = workflow_cycle_manager.handle_node_execution_start( workflow_execution_id=workflow_execution.id, event=event, ) @@ -351,7 +351,7 @@ def test_get_workflow_execution_or_raise_error(workflow_cycle_manager, mock_work def test_handle_workflow_node_execution_success(workflow_cycle_manager): - """Test _handle_workflow_node_execution_success method""" + """Test handle_workflow_node_execution_success method""" # Create a mock event event = MagicMock(spec=QueueNodeSucceededEvent) event.node_execution_id = "test-node-execution-id" @@ -379,7 +379,7 @@ def test_handle_workflow_node_execution_success(workflow_cycle_manager): workflow_cycle_manager._workflow_node_execution_repository.get_by_node_execution_id.return_value = node_execution # Call the method - result = workflow_cycle_manager._handle_workflow_node_execution_success( + result = workflow_cycle_manager.handle_workflow_node_execution_success( event=event, ) @@ -392,7 +392,7 @@ def test_handle_workflow_node_execution_success(workflow_cycle_manager): def test_handle_workflow_run_partial_success(workflow_cycle_manager, mock_workflow_execution_repository): - """Test _handle_workflow_run_partial_success method""" + """Test handle_workflow_run_partial_success method""" # Create a real WorkflowExecution workflow_execution = WorkflowExecution( @@ -410,7 +410,7 @@ def test_handle_workflow_run_partial_success(workflow_cycle_manager, mock_workfl workflow_cycle_manager._workflow_execution_repository.get.return_value = workflow_execution # Call the method - result = workflow_cycle_manager._handle_workflow_run_partial_success( + result = workflow_cycle_manager.handle_workflow_run_partial_success( workflow_run_id="test-workflow-run-id", total_tokens=75, total_steps=4, @@ -429,7 +429,7 @@ def test_handle_workflow_run_partial_success(workflow_cycle_manager, mock_workfl def test_handle_workflow_node_execution_failed(workflow_cycle_manager): - """Test _handle_workflow_node_execution_failed method""" + """Test handle_workflow_node_execution_failed method""" # Create a mock event event = MagicMock(spec=QueueNodeFailedEvent) event.node_execution_id = "test-node-execution-id" @@ -458,7 +458,7 @@ def test_handle_workflow_node_execution_failed(workflow_cycle_manager): workflow_cycle_manager._workflow_node_execution_repository.get_by_node_execution_id.return_value = node_execution # Call the method - result = workflow_cycle_manager._handle_workflow_node_execution_failed( + result = workflow_cycle_manager.handle_workflow_node_execution_failed( event=event, )