From 5da9bc636e607c7cbc6c31446d38df08bb3367bc Mon Sep 17 00:00:00 2001 From: -LAN- Date: Wed, 21 May 2025 19:23:49 +0800 Subject: [PATCH] refactor(workflow_cycle_manager): Refactor `_workflow_parallel_branch_finished_to_stream_response` Signed-off-by: -LAN- --- .../advanced_chat/generate_task_pipeline.py | 17 ++++++----------- .../workflow_app_generate_task_pipeline.py | 17 ++++++----------- api/core/workflow/workflow_cycle_manager.py | 6 ++---- 3 files changed, 14 insertions(+), 26 deletions(-) diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index aec88c1bfd..d58d2667b9 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -403,18 +403,13 @@ class AdvancedChatAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - with Session(db.engine, expire_on_commit=False) as session: - workflow_run = self._workflow_cycle_manager._get_workflow_run( - session=session, workflow_run_id=self._workflow_run_id - ) - parallel_finish_resp = ( - self._workflow_cycle_manager._workflow_parallel_branch_finished_to_stream_response( - session=session, - task_id=self._application_generate_entity.task_id, - workflow_run=workflow_run, - event=event, - ) + parallel_finish_resp = ( + self._workflow_cycle_manager._workflow_parallel_branch_finished_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution_id=self._workflow_run_id, + event=event, ) + ) yield parallel_finish_resp elif isinstance(event, QueueIterationStartEvent): diff --git a/api/core/workflow/workflow_app_generate_task_pipeline.py b/api/core/workflow/workflow_app_generate_task_pipeline.py index 7e48f32941..fc45a832d8 100644 --- a/api/core/workflow/workflow_app_generate_task_pipeline.py +++ b/api/core/workflow/workflow_app_generate_task_pipeline.py @@ -356,18 +356,13 @@ class WorkflowAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - with Session(db.engine, expire_on_commit=False) as session: - workflow_run = self._workflow_cycle_manager._get_workflow_run( - session=session, workflow_run_id=self._workflow_run_id - ) - parallel_finish_resp = ( - self._workflow_cycle_manager._workflow_parallel_branch_finished_to_stream_response( - session=session, - task_id=self._application_generate_entity.task_id, - workflow_run=workflow_run, - event=event, - ) + parallel_finish_resp = ( + self._workflow_cycle_manager._workflow_parallel_branch_finished_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution_id=self._workflow_run_id, + event=event, ) + ) yield parallel_finish_resp diff --git a/api/core/workflow/workflow_cycle_manager.py b/api/core/workflow/workflow_cycle_manager.py index 1deeb770a3..83d863f57a 100644 --- a/api/core/workflow/workflow_cycle_manager.py +++ b/api/core/workflow/workflow_cycle_manager.py @@ -654,15 +654,13 @@ class WorkflowCycleManager: def _workflow_parallel_branch_finished_to_stream_response( self, *, - session: Session, task_id: str, - workflow_run: WorkflowRun, + workflow_execution_id: str, event: QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent, ) -> ParallelBranchFinishedStreamResponse: - _ = session return ParallelBranchFinishedStreamResponse( task_id=task_id, - workflow_run_id=workflow_run.id, + workflow_run_id=workflow_execution_id, data=ParallelBranchFinishedStreamResponse.Data( parallel_id=event.parallel_id, parallel_branch_id=event.parallel_start_node_id,