diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index 4f34464759..337b779b50 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -166,7 +166,6 @@ class AdvancedChatAppGenerateTaskPipeline: Process generate task pipeline. :return: """ - # start generate conversation name thread self._conversation_name_generate_thread = self._message_cycle_manager.generate_conversation_name( conversation_id=self._conversation_id, query=self._application_generate_entity.query ) @@ -258,15 +257,12 @@ class AdvancedChatAppGenerateTaskPipeline: yield response start_listener_time = time.time() - # timeout while (time.time() - start_listener_time) < TTS_AUTO_PLAY_TIMEOUT: try: if not tts_publisher: break audio_trunk = tts_publisher.check_and_get_audio() if audio_trunk is None: - # release cpu - # sleep 20 ms ( 40ms => 1280 byte audio file,20ms => 640 byte audio file) time.sleep(TTS_AUTO_PLAY_YIELD_CPU_TIME) continue if audio_trunk.status == "finish": @@ -848,20 +844,16 @@ class AdvancedChatAppGenerateTaskPipeline: for queue_message in self._base_task_pipeline._queue_manager.listen(): event = queue_message.event - # Use elegant pattern matching for event dispatch match event: - # Handle QueueWorkflowStartedEvent specially - it has a side effect case QueueWorkflowStartedEvent(): - graph_runtime_state = event.graph_runtime_state # Side effect! + graph_runtime_state = event.graph_runtime_state yield from self._handle_workflow_started_event(event) - # Handle QueueTextChunkEvent specially - needs queue_message case QueueTextChunkEvent(): yield from self._handle_text_chunk_event( event, tts_publisher=tts_publisher, queue_message=queue_message ) - # Handle events that cause loop breaks case QueueErrorEvent(): yield from self._handle_error_event(event) break @@ -890,9 +882,7 @@ class AdvancedChatAppGenerateTaskPipeline: ) ): yield from responses - # Continue with next event (elegant fallthrough) - # Cleanup - publish None when task finished if tts_publisher: tts_publisher.publish(None) @@ -964,7 +954,6 @@ class AdvancedChatAppGenerateTaskPipeline: """ if self._base_task_pipeline._output_moderation_handler: if self._base_task_pipeline._output_moderation_handler.should_direct_output(): - # stop subscribe new token when output moderation should direct output self._task_state.answer = self._base_task_pipeline._output_moderation_handler.get_final_output() self._base_task_pipeline._queue_manager.publish( QueueTextChunkEvent(text=self._task_state.answer), PublishFrom.TASK_PIPELINE diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py index cd4ece08e7..9a39b2e01e 100644 --- a/api/core/app/apps/workflow/generate_task_pipeline.py +++ b/api/core/app/apps/workflow/generate_task_pipeline.py @@ -711,20 +711,16 @@ class WorkflowAppGenerateTaskPipeline: for queue_message in self._base_task_pipeline._queue_manager.listen(): event = queue_message.event - # Use elegant pattern matching for event dispatch match event: - # Handle QueueWorkflowStartedEvent specially - it has a side effect case QueueWorkflowStartedEvent(): - graph_runtime_state = event.graph_runtime_state # Side effect! + graph_runtime_state = event.graph_runtime_state yield from self._handle_workflow_started_event(event) - # Handle QueueTextChunkEvent specially - needs queue_message case QueueTextChunkEvent(): yield from self._handle_text_chunk_event( event, tts_publisher=tts_publisher, queue_message=queue_message ) - # Handle events that cause loop breaks case QueueErrorEvent(): yield from self._handle_error_event(event) break @@ -741,9 +737,7 @@ class WorkflowAppGenerateTaskPipeline: ) ): yield from responses - # Continue with next event (elegant fallthrough) - # Cleanup - publish None when task finished if tts_publisher: tts_publisher.publish(None)