@ -166,7 +166,6 @@ class AdvancedChatAppGenerateTaskPipeline:
Process generate task pipeline .
: return :
"""
# start generate conversation name thread
self . _conversation_name_generate_thread = self . _message_cycle_manager . generate_conversation_name (
conversation_id = self . _conversation_id , query = self . _application_generate_entity . query
)
@ -258,15 +257,12 @@ class AdvancedChatAppGenerateTaskPipeline:
yield response
start_listener_time = time . time ( )
# timeout
while ( time . time ( ) - start_listener_time ) < TTS_AUTO_PLAY_TIMEOUT :
try :
if not tts_publisher :
break
audio_trunk = tts_publisher . check_and_get_audio ( )
if audio_trunk is None :
# release cpu
# sleep 20 ms ( 40ms => 1280 byte audio file,20ms => 640 byte audio file)
time . sleep ( TTS_AUTO_PLAY_YIELD_CPU_TIME )
continue
if audio_trunk . status == " finish " :
@ -848,20 +844,16 @@ class AdvancedChatAppGenerateTaskPipeline:
for queue_message in self . _base_task_pipeline . _queue_manager . listen ( ) :
event = queue_message . event
# Use elegant pattern matching for event dispatch
match event :
# Handle QueueWorkflowStartedEvent specially - it has a side effect
case QueueWorkflowStartedEvent ( ) :
graph_runtime_state = event . graph_runtime_state # Side effect!
graph_runtime_state = event . graph_runtime_state
yield from self . _handle_workflow_started_event ( event )
# Handle QueueTextChunkEvent specially - needs queue_message
case QueueTextChunkEvent ( ) :
yield from self . _handle_text_chunk_event (
event , tts_publisher = tts_publisher , queue_message = queue_message
)
# Handle events that cause loop breaks
case QueueErrorEvent ( ) :
yield from self . _handle_error_event ( event )
break
@ -890,9 +882,7 @@ class AdvancedChatAppGenerateTaskPipeline:
)
) :
yield from responses
# Continue with next event (elegant fallthrough)
# Cleanup - publish None when task finished
if tts_publisher :
tts_publisher . publish ( None )
@ -964,7 +954,6 @@ class AdvancedChatAppGenerateTaskPipeline:
"""
if self . _base_task_pipeline . _output_moderation_handler :
if self . _base_task_pipeline . _output_moderation_handler . should_direct_output ( ) :
# stop subscribe new token when output moderation should direct output
self . _task_state . answer = self . _base_task_pipeline . _output_moderation_handler . get_final_output ( )
self . _base_task_pipeline . _queue_manager . publish (
QueueTextChunkEvent ( text = self . _task_state . answer ) , PublishFrom . TASK_PIPELINE