From 7a62d1e97809f1721c72563c18e401c16083ca1d Mon Sep 17 00:00:00 2001 From: -LAN- Date: Wed, 21 May 2025 19:35:39 +0800 Subject: [PATCH] refactor(workflow_cycle_manager): Refactor `_workflow_loop_start_to_stream_response` Signed-off-by: -LAN- --- .../apps/advanced_chat/generate_task_pipeline.py | 15 +++++---------- .../workflow_app_generate_task_pipeline.py | 15 +++++---------- api/core/workflow/workflow_cycle_manager.py | 5 ++--- 3 files changed, 12 insertions(+), 23 deletions(-) diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index bbf434874b..565da664dc 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -449,16 +449,11 @@ class AdvancedChatAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - with Session(db.engine, expire_on_commit=False) as session: - workflow_run = self._workflow_cycle_manager._get_workflow_run( - session=session, workflow_run_id=self._workflow_run_id - ) - loop_start_resp = self._workflow_cycle_manager._workflow_loop_start_to_stream_response( - session=session, - task_id=self._application_generate_entity.task_id, - workflow_run=workflow_run, - event=event, - ) + loop_start_resp = self._workflow_cycle_manager._workflow_loop_start_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution_id=self._workflow_run_id, + event=event, + ) yield loop_start_resp elif isinstance(event, QueueLoopNextEvent): diff --git a/api/core/workflow/workflow_app_generate_task_pipeline.py b/api/core/workflow/workflow_app_generate_task_pipeline.py index b49c1c291d..775c94e415 100644 --- a/api/core/workflow/workflow_app_generate_task_pipeline.py +++ b/api/core/workflow/workflow_app_generate_task_pipeline.py @@ -406,16 +406,11 @@ class WorkflowAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - with Session(db.engine, expire_on_commit=False) as session: - workflow_run = self._workflow_cycle_manager._get_workflow_run( - session=session, workflow_run_id=self._workflow_run_id - ) - loop_start_resp = self._workflow_cycle_manager._workflow_loop_start_to_stream_response( - session=session, - task_id=self._application_generate_entity.task_id, - workflow_run=workflow_run, - event=event, - ) + loop_start_resp = self._workflow_cycle_manager._workflow_loop_start_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution_id=self._workflow_run_id, + event=event, + ) yield loop_start_resp diff --git a/api/core/workflow/workflow_cycle_manager.py b/api/core/workflow/workflow_cycle_manager.py index 4d3d3d3778..eefaed4950 100644 --- a/api/core/workflow/workflow_cycle_manager.py +++ b/api/core/workflow/workflow_cycle_manager.py @@ -758,12 +758,11 @@ class WorkflowCycleManager: ) def _workflow_loop_start_to_stream_response( - self, *, session: Session, task_id: str, workflow_run: WorkflowRun, event: QueueLoopStartEvent + self, *, task_id: str, workflow_execution_id: str, event: QueueLoopStartEvent ) -> LoopNodeStartStreamResponse: - _ = session return LoopNodeStartStreamResponse( task_id=task_id, - workflow_run_id=workflow_run.id, + workflow_run_id=workflow_execution_id, data=LoopNodeStartStreamResponse.Data( id=event.node_id, node_id=event.node_id,