From bc518be301cb5bdf3b71536dd1e5812515d3960e Mon Sep 17 00:00:00 2001 From: -LAN- Date: Wed, 21 May 2025 22:18:38 +0800 Subject: [PATCH] refactor(task_pipeline): Use WorkflowResponseConverter in TasksPipeline Signed-off-by: -LAN- --- .../advanced_chat/generate_task_pipeline.py | 51 +++++++++++-------- .../apps/workflow/generate_task_pipeline.py | 47 +++++++++-------- 2 files changed, 56 insertions(+), 42 deletions(-) diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index cd764d56a5..a088d19c70 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -57,6 +57,7 @@ from core.app.entities.task_entities import ( ) from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTaskPipeline from core.app.task_pipeline.message_cycle_manage import MessageCycleManage +from core.app.workflow_response_converter import WorkflowResponseConverter from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk from core.model_runtime.entities.llm_entities import LLMUsage from core.model_runtime.utils.encoders import jsonable_encoder @@ -131,6 +132,10 @@ class AdvancedChatAppGenerateTaskPipeline: workflow_node_execution_repository=workflow_node_execution_repository, ) + self._workflow_response_converter = WorkflowResponseConverter( + application_generate_entity=application_generate_entity, + ) + self._task_state = WorkflowTaskState() self._message_cycle_manager = MessageCycleManage( application_generate_entity=application_generate_entity, task_state=self._task_state @@ -306,7 +311,7 @@ class AdvancedChatAppGenerateTaskPipeline: if not message: raise ValueError(f"Message not found: {self._message_id}") message.workflow_run_id = workflow_execution.id - workflow_start_resp = self._workflow_cycle_manager.workflow_start_to_stream_response( + workflow_start_resp = self._workflow_response_converter.workflow_start_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, ) @@ -323,7 +328,7 @@ class AdvancedChatAppGenerateTaskPipeline: workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_retried( workflow_execution_id=self._workflow_run_id, event=event ) - node_retry_resp = self._workflow_cycle_manager.workflow_node_retry_to_stream_response( + node_retry_resp = self._workflow_response_converter.workflow_node_retry_to_stream_response( event=event, task_id=self._application_generate_entity.task_id, workflow_node_execution=workflow_node_execution, @@ -340,7 +345,7 @@ class AdvancedChatAppGenerateTaskPipeline: workflow_execution_id=self._workflow_run_id, event=event ) - node_start_resp = self._workflow_cycle_manager.workflow_node_start_to_stream_response( + node_start_resp = self._workflow_response_converter.workflow_node_start_to_stream_response( event=event, task_id=self._application_generate_entity.task_id, workflow_node_execution=workflow_node_execution, @@ -352,7 +357,7 @@ class AdvancedChatAppGenerateTaskPipeline: # Record files if it's an answer node or end node if event.node_type in [NodeType.ANSWER, NodeType.END]: self._recorded_files.extend( - self._workflow_cycle_manager.fetch_files_from_node_outputs(event.outputs or {}) + self._workflow_response_converter.fetch_files_from_node_outputs(event.outputs or {}) ) with Session(db.engine, expire_on_commit=False) as session: @@ -360,7 +365,7 @@ class AdvancedChatAppGenerateTaskPipeline: event=event ) - node_finish_resp = self._workflow_cycle_manager.workflow_node_finish_to_stream_response( + node_finish_resp = self._workflow_response_converter.workflow_node_finish_to_stream_response( event=event, task_id=self._application_generate_entity.task_id, workflow_node_execution=workflow_node_execution, @@ -380,7 +385,7 @@ class AdvancedChatAppGenerateTaskPipeline: event=event ) - node_finish_resp = self._workflow_cycle_manager.workflow_node_finish_to_stream_response( + node_finish_resp = self._workflow_response_converter.workflow_node_finish_to_stream_response( event=event, task_id=self._application_generate_entity.task_id, workflow_node_execution=workflow_node_execution, @@ -392,10 +397,12 @@ class AdvancedChatAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - parallel_start_resp = self._workflow_cycle_manager.workflow_parallel_branch_start_to_stream_response( - task_id=self._application_generate_entity.task_id, - workflow_execution_id=self._workflow_run_id, - event=event, + parallel_start_resp = ( + self._workflow_response_converter.workflow_parallel_branch_start_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution_id=self._workflow_run_id, + event=event, + ) ) yield parallel_start_resp @@ -404,7 +411,7 @@ class AdvancedChatAppGenerateTaskPipeline: raise ValueError("workflow run not initialized.") parallel_finish_resp = ( - self._workflow_cycle_manager.workflow_parallel_branch_finished_to_stream_response( + self._workflow_response_converter.workflow_parallel_branch_finished_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution_id=self._workflow_run_id, event=event, @@ -416,7 +423,7 @@ class AdvancedChatAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - iter_start_resp = self._workflow_cycle_manager.workflow_iteration_start_to_stream_response( + iter_start_resp = self._workflow_response_converter.workflow_iteration_start_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution_id=self._workflow_run_id, event=event, @@ -427,7 +434,7 @@ class AdvancedChatAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - iter_next_resp = self._workflow_cycle_manager.workflow_iteration_next_to_stream_response( + iter_next_resp = self._workflow_response_converter.workflow_iteration_next_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution_id=self._workflow_run_id, event=event, @@ -438,7 +445,7 @@ class AdvancedChatAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - iter_finish_resp = self._workflow_cycle_manager.workflow_iteration_completed_to_stream_response( + iter_finish_resp = self._workflow_response_converter.workflow_iteration_completed_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution_id=self._workflow_run_id, event=event, @@ -449,7 +456,7 @@ class AdvancedChatAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - loop_start_resp = self._workflow_cycle_manager.workflow_loop_start_to_stream_response( + loop_start_resp = self._workflow_response_converter.workflow_loop_start_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution_id=self._workflow_run_id, event=event, @@ -460,7 +467,7 @@ class AdvancedChatAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - loop_next_resp = self._workflow_cycle_manager.workflow_loop_next_to_stream_response( + loop_next_resp = self._workflow_response_converter.workflow_loop_next_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution_id=self._workflow_run_id, event=event, @@ -471,7 +478,7 @@ class AdvancedChatAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - loop_finish_resp = self._workflow_cycle_manager.workflow_loop_completed_to_stream_response( + loop_finish_resp = self._workflow_response_converter.workflow_loop_completed_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution_id=self._workflow_run_id, event=event, @@ -495,7 +502,7 @@ class AdvancedChatAppGenerateTaskPipeline: trace_manager=trace_manager, ) - workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response( + workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response( session=session, task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, @@ -521,7 +528,7 @@ class AdvancedChatAppGenerateTaskPipeline: conversation_id=None, trace_manager=trace_manager, ) - workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response( + workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response( session=session, task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, @@ -548,7 +555,7 @@ class AdvancedChatAppGenerateTaskPipeline: trace_manager=trace_manager, exceptions_count=event.exceptions_count, ) - workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response( + workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response( session=session, task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, @@ -573,7 +580,7 @@ class AdvancedChatAppGenerateTaskPipeline: conversation_id=self._conversation_id, trace_manager=trace_manager, ) - workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response( + workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response( session=session, task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, @@ -657,7 +664,7 @@ class AdvancedChatAppGenerateTaskPipeline: yield self._message_end_to_stream_response() elif isinstance(event, QueueAgentLogEvent): - yield self._workflow_cycle_manager.handle_agent_log( + yield self._workflow_response_converter.handle_agent_log( task_id=self._application_generate_entity.task_id, event=event ) else: diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py index f2ebd78b36..4d1ed429a2 100644 --- a/api/core/app/apps/workflow/generate_task_pipeline.py +++ b/api/core/app/apps/workflow/generate_task_pipeline.py @@ -52,6 +52,7 @@ from core.app.entities.task_entities import ( WorkflowTaskState, ) from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTaskPipeline +from core.app.workflow_response_converter import WorkflowResponseConverter from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk from core.ops.ops_trace_manager import TraceQueueManager from core.workflow.entities.workflow_execution_entities import WorkflowExecution @@ -119,6 +120,10 @@ class WorkflowAppGenerateTaskPipeline: workflow_node_execution_repository=workflow_node_execution_repository, ) + self._workflow_response_converter = WorkflowResponseConverter( + application_generate_entity=application_generate_entity, + ) + self._application_generate_entity = application_generate_entity self._workflow_id = workflow.id self._workflow_features_dict = workflow.features_dict @@ -268,7 +273,7 @@ class WorkflowAppGenerateTaskPipeline: workflow_id=self._workflow_id, ) self._workflow_run_id = workflow_execution.id - start_resp = self._workflow_cycle_manager.workflow_start_to_stream_response( + start_resp = self._workflow_response_converter.workflow_start_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, ) @@ -285,7 +290,7 @@ class WorkflowAppGenerateTaskPipeline: workflow_execution_id=self._workflow_run_id, event=event, ) - response = self._workflow_cycle_manager.workflow_node_retry_to_stream_response( + response = self._workflow_response_converter.workflow_node_retry_to_stream_response( event=event, task_id=self._application_generate_entity.task_id, workflow_node_execution=workflow_node_execution, @@ -301,7 +306,7 @@ class WorkflowAppGenerateTaskPipeline: workflow_node_execution = self._workflow_cycle_manager.handle_node_execution_start( workflow_execution_id=self._workflow_run_id, event=event ) - node_start_response = self._workflow_cycle_manager.workflow_node_start_to_stream_response( + node_start_response = self._workflow_response_converter.workflow_node_start_to_stream_response( event=event, task_id=self._application_generate_entity.task_id, workflow_node_execution=workflow_node_execution, @@ -313,7 +318,7 @@ class WorkflowAppGenerateTaskPipeline: workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_success( event=event ) - node_success_response = self._workflow_cycle_manager.workflow_node_finish_to_stream_response( + node_success_response = self._workflow_response_converter.workflow_node_finish_to_stream_response( event=event, task_id=self._application_generate_entity.task_id, workflow_node_execution=workflow_node_execution, @@ -331,7 +336,7 @@ class WorkflowAppGenerateTaskPipeline: workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_failed( event=event, ) - node_failed_response = self._workflow_cycle_manager.workflow_node_finish_to_stream_response( + node_failed_response = self._workflow_response_converter.workflow_node_finish_to_stream_response( event=event, task_id=self._application_generate_entity.task_id, workflow_node_execution=workflow_node_execution, @@ -344,10 +349,12 @@ class WorkflowAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - parallel_start_resp = self._workflow_cycle_manager.workflow_parallel_branch_start_to_stream_response( - task_id=self._application_generate_entity.task_id, - workflow_execution_id=self._workflow_run_id, - event=event, + parallel_start_resp = ( + self._workflow_response_converter.workflow_parallel_branch_start_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution_id=self._workflow_run_id, + event=event, + ) ) yield parallel_start_resp @@ -357,7 +364,7 @@ class WorkflowAppGenerateTaskPipeline: raise ValueError("workflow run not initialized.") parallel_finish_resp = ( - self._workflow_cycle_manager.workflow_parallel_branch_finished_to_stream_response( + self._workflow_response_converter.workflow_parallel_branch_finished_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution_id=self._workflow_run_id, event=event, @@ -370,7 +377,7 @@ class WorkflowAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - iter_start_resp = self._workflow_cycle_manager.workflow_iteration_start_to_stream_response( + iter_start_resp = self._workflow_response_converter.workflow_iteration_start_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution_id=self._workflow_run_id, event=event, @@ -382,7 +389,7 @@ class WorkflowAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - iter_next_resp = self._workflow_cycle_manager.workflow_iteration_next_to_stream_response( + iter_next_resp = self._workflow_response_converter.workflow_iteration_next_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution_id=self._workflow_run_id, event=event, @@ -394,7 +401,7 @@ class WorkflowAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - iter_finish_resp = self._workflow_cycle_manager.workflow_iteration_completed_to_stream_response( + iter_finish_resp = self._workflow_response_converter.workflow_iteration_completed_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution_id=self._workflow_run_id, event=event, @@ -406,7 +413,7 @@ class WorkflowAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - loop_start_resp = self._workflow_cycle_manager.workflow_loop_start_to_stream_response( + loop_start_resp = self._workflow_response_converter.workflow_loop_start_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution_id=self._workflow_run_id, event=event, @@ -418,7 +425,7 @@ class WorkflowAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - loop_next_resp = self._workflow_cycle_manager.workflow_loop_next_to_stream_response( + loop_next_resp = self._workflow_response_converter.workflow_loop_next_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution_id=self._workflow_run_id, event=event, @@ -430,7 +437,7 @@ class WorkflowAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - loop_finish_resp = self._workflow_cycle_manager.workflow_loop_completed_to_stream_response( + loop_finish_resp = self._workflow_response_converter.workflow_loop_completed_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution_id=self._workflow_run_id, event=event, @@ -457,7 +464,7 @@ class WorkflowAppGenerateTaskPipeline: # save workflow app log self._save_workflow_app_log(session=session, workflow_execution=workflow_execution) - workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response( + workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response( session=session, task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, @@ -485,7 +492,7 @@ class WorkflowAppGenerateTaskPipeline: # save workflow app log self._save_workflow_app_log(session=session, workflow_execution=workflow_execution) - workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response( + workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response( session=session, task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, @@ -518,7 +525,7 @@ class WorkflowAppGenerateTaskPipeline: # save workflow app log self._save_workflow_app_log(session=session, workflow_execution=workflow_execution) - workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response( + workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response( session=session, task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, @@ -540,7 +547,7 @@ class WorkflowAppGenerateTaskPipeline: delta_text, from_variable_selector=event.from_variable_selector ) elif isinstance(event, QueueAgentLogEvent): - yield self._workflow_cycle_manager.handle_agent_log( + yield self._workflow_response_converter.handle_agent_log( task_id=self._application_generate_entity.task_id, event=event ) else: