From 5510c9cfa86ebcdbc1f1e6334151183b7c22bccf Mon Sep 17 00:00:00 2001 From: -LAN- Date: Wed, 21 May 2025 19:15:11 +0800 Subject: [PATCH] refactor(workflow_cycle_manager): Refactor `_handle_workflow_node_execution_retried` to use WorkflowExecution Signed-off-by: -LAN- --- .../apps/advanced_chat/generate_task_pipeline.py | 5 +---- .../workflow/workflow_app_generate_task_pipeline.py | 6 ++---- api/core/workflow/workflow_cycle_manager.py | 13 ++++--------- 3 files changed, 7 insertions(+), 17 deletions(-) diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index 4816ace9fa..d0531b98fe 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -320,11 +320,8 @@ class AdvancedChatAppGenerateTaskPipeline: raise ValueError("workflow run not initialized.") with Session(db.engine, expire_on_commit=False) as session: - workflow_run = self._workflow_cycle_manager._get_workflow_run( - session=session, workflow_run_id=self._workflow_run_id - ) workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_retried( - workflow_run=workflow_run, event=event + workflow_execution_id=self._workflow_run_id, event=event ) node_retry_resp = self._workflow_cycle_manager._workflow_node_retry_to_stream_response( event=event, diff --git a/api/core/workflow/workflow_app_generate_task_pipeline.py b/api/core/workflow/workflow_app_generate_task_pipeline.py index f0afff1913..b1097f17ba 100644 --- a/api/core/workflow/workflow_app_generate_task_pipeline.py +++ b/api/core/workflow/workflow_app_generate_task_pipeline.py @@ -281,11 +281,9 @@ class WorkflowAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") with Session(db.engine, expire_on_commit=False) as session: - workflow_run = self._workflow_cycle_manager._get_workflow_run( - session=session, workflow_run_id=self._workflow_run_id - ) workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_retried( - workflow_run=workflow_run, event=event + workflow_execution_id=self._workflow_run_id, + event=event, ) response = self._workflow_cycle_manager._workflow_node_retry_to_stream_response( event=event, diff --git a/api/core/workflow/workflow_cycle_manager.py b/api/core/workflow/workflow_cycle_manager.py index 44319dace7..2ca0d01477 100644 --- a/api/core/workflow/workflow_cycle_manager.py +++ b/api/core/workflow/workflow_cycle_manager.py @@ -360,14 +360,9 @@ class WorkflowCycleManager: return domain_execution def _handle_workflow_node_execution_retried( - self, *, workflow_run: WorkflowRun, event: QueueNodeRetryEvent + self, *, workflow_execution_id: str, event: QueueNodeRetryEvent ) -> NodeExecution: - """ - Workflow node execution failed - :param workflow_run: workflow run - :param event: queue node failed event - :return: - """ + workflow_execution = self._get_workflow_execution(workflow_execution_id) created_at = event.start_at finished_at = datetime.now(UTC).replace(tzinfo=None) elapsed_time = (finished_at - created_at).total_seconds() @@ -392,8 +387,8 @@ class WorkflowCycleManager: # Create a domain model domain_execution = NodeExecution( id=str(uuid4()), - workflow_id=workflow_run.workflow_id, - workflow_run_id=workflow_run.id, + workflow_id=workflow_execution.workflow_id, + workflow_run_id=workflow_execution.id, predecessor_node_id=event.predecessor_node_id, node_execution_id=event.node_execution_id, node_id=event.node_id,