diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index d0531b98fe..51bae7d0ec 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -336,20 +336,15 @@ class AdvancedChatAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - with Session(db.engine, expire_on_commit=False) as session: - workflow_run = self._workflow_cycle_manager._get_workflow_run( - session=session, workflow_run_id=self._workflow_run_id - ) - workflow_node_execution = self._workflow_cycle_manager._handle_node_execution_start( - workflow_run=workflow_run, event=event - ) + workflow_node_execution = self._workflow_cycle_manager._handle_node_execution_start( + workflow_execution_id=self._workflow_run_id, event=event + ) - node_start_resp = self._workflow_cycle_manager._workflow_node_start_to_stream_response( - event=event, - task_id=self._application_generate_entity.task_id, - workflow_node_execution=workflow_node_execution, - ) - session.commit() + node_start_resp = self._workflow_cycle_manager._workflow_node_start_to_stream_response( + event=event, + task_id=self._application_generate_entity.task_id, + workflow_node_execution=workflow_node_execution, + ) if node_start_resp: yield node_start_resp diff --git a/api/core/workflow/workflow_app_generate_task_pipeline.py b/api/core/workflow/workflow_app_generate_task_pipeline.py index b1097f17ba..db10260fdc 100644 --- a/api/core/workflow/workflow_app_generate_task_pipeline.py +++ b/api/core/workflow/workflow_app_generate_task_pipeline.py @@ -298,19 +298,14 @@ class WorkflowAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - with Session(db.engine, expire_on_commit=False) as session: - workflow_run = self._workflow_cycle_manager._get_workflow_run( - session=session, workflow_run_id=self._workflow_run_id - ) - workflow_node_execution = self._workflow_cycle_manager._handle_node_execution_start( - workflow_run=workflow_run, event=event - ) - node_start_response = self._workflow_cycle_manager._workflow_node_start_to_stream_response( - event=event, - task_id=self._application_generate_entity.task_id, - workflow_node_execution=workflow_node_execution, - ) - session.commit() + workflow_node_execution = self._workflow_cycle_manager._handle_node_execution_start( + workflow_execution_id=self._workflow_run_id, event=event + ) + node_start_response = self._workflow_cycle_manager._workflow_node_start_to_stream_response( + event=event, + task_id=self._application_generate_entity.task_id, + workflow_node_execution=workflow_node_execution, + ) if node_start_response: yield node_start_response diff --git a/api/core/workflow/workflow_cycle_manager.py b/api/core/workflow/workflow_cycle_manager.py index 2ca0d01477..11eead109d 100644 --- a/api/core/workflow/workflow_cycle_manager.py +++ b/api/core/workflow/workflow_cycle_manager.py @@ -247,7 +247,14 @@ class WorkflowCycleManager: return execution - def _handle_node_execution_start(self, *, workflow_run: WorkflowRun, event: QueueNodeStartedEvent) -> NodeExecution: + def _handle_node_execution_start( + self, + *, + workflow_execution_id: str, + event: QueueNodeStartedEvent, + ) -> NodeExecution: + workflow_execution = self._get_workflow_execution(workflow_execution_id) + # Create a domain model created_at = datetime.now(UTC).replace(tzinfo=None) metadata = { @@ -258,8 +265,8 @@ class WorkflowCycleManager: domain_execution = NodeExecution( id=str(uuid4()), - workflow_id=workflow_run.workflow_id, - workflow_run_id=workflow_run.id, + workflow_id=workflow_execution.workflow_id, + workflow_run_id=workflow_execution.id, predecessor_node_id=event.predecessor_node_id, index=event.node_run_index, node_execution_id=event.node_execution_id,