@ -1,7 +1,6 @@
import logging
import time
from collections . abc import Callable , Generator , Mapping
from contextlib import contextmanager
from collections . abc import Generator , Mapping
from threading import Thread
from typing import Any , Optional , Union
@ -16,7 +15,6 @@ from core.app.entities.app_invoke_entities import (
InvokeFrom ,
)
from core . app . entities . queue_entities import (
MessageQueueMessage ,
QueueAdvancedChatMessageEndEvent ,
QueueAgentLogEvent ,
QueueAnnotationReplyEvent ,
@ -46,7 +44,6 @@ from core.app.entities.queue_entities import (
QueueWorkflowPartialSuccessEvent ,
QueueWorkflowStartedEvent ,
QueueWorkflowSucceededEvent ,
WorkflowQueueMessage ,
)
from core . app . entities . task_entities import (
ChatbotAppBlockingResponse ,
@ -55,7 +52,6 @@ from core.app.entities.task_entities import (
MessageAudioEndStreamResponse ,
MessageAudioStreamResponse ,
MessageEndStreamResponse ,
PingStreamResponse ,
StreamResponse ,
WorkflowTaskState ,
)
@ -166,6 +162,7 @@ class AdvancedChatAppGenerateTaskPipeline:
Process generate task pipeline .
: return :
"""
# start generate conversation name thread
self . _conversation_name_generate_thread = self . _message_cycle_manager . generate_conversation_name (
conversation_id = self . _conversation_id , query = self . _application_generate_entity . query
)
@ -257,12 +254,15 @@ class AdvancedChatAppGenerateTaskPipeline:
yield response
start_listener_time = time . time ( )
# timeout
while ( time . time ( ) - start_listener_time ) < TTS_AUTO_PLAY_TIMEOUT :
try :
if not tts_publisher :
break
audio_trunk = tts_publisher . check_and_get_audio ( )
if audio_trunk is None :
# release cpu
# sleep 20 ms ( 40ms => 1280 byte audio file,20ms => 640 byte audio file)
time . sleep ( TTS_AUTO_PLAY_YIELD_CPU_TIME )
continue
if audio_trunk . status == " finish " :
@ -276,66 +276,58 @@ class AdvancedChatAppGenerateTaskPipeline:
if tts_publisher :
yield MessageAudioEndStreamResponse ( audio = " " , task_id = task_id )
@contextmanager
def _database_session ( self ) :
""" Context manager for database sessions. """
with Session ( db . engine , expire_on_commit = False ) as session :
try :
yield session
session . commit ( )
except Exception :
session . rollback ( )
raise
def _ensure_workflow_initialized ( self ) - > None :
""" Fluent validation for workflow state. """
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
def _process_stream_response (
self ,
tts_publisher : Optional [ AppGeneratorTTSPublisher ] = None ,
trace_manager : Optional [ TraceQueueManager ] = None ,
) - > Generator [ StreamResponse , None , None ] :
"""
Process stream response .
: return :
"""
# init fake graph runtime state
graph_runtime_state : Optional [ GraphRuntimeState ] = None
def _ensure_graph_runtime_initialized ( self , graph_runtime_state : Optional [ GraphRuntimeState ] ) - > GraphRuntimeState :
""" Fluent validation for graph runtime state. """
if not graph_runtime_state :
raise ValueError ( " graph runtime state not initialized. " )
return graph_runtime_state
for queue_message in self . _base_task_pipeline . _queue_manager . listen ( ) :
event = queue_message . event
def _handle_ping_event ( self , event : QueuePingEvent , * * kwargs ) - > Generator [ PingStreamResponse , None , None ] :
""" Handle ping events. """
if isinstance ( event , QueuePingEvent ) :
yield self . _base_task_pipeline . _ping_stream_response ( )
def _handle_error_event ( self , event : QueueErrorEvent , * * kwargs ) - > Generator [ ErrorStreamResponse , None , None ] :
""" Handle error events. """
with self . _database_session ( ) as session :
err = self . _base_task_pipeline . _handle_error ( event = event , session = session , message_id = self . _message_id )
elif isinstance ( event , QueueErrorEvent ) :
with Session ( db . engine , expire_on_commit = False ) as session :
err = self . _base_task_pipeline . _handle_error (
event = event , session = session , message_id = self . _message_id
)
session . commit ( )
yield self . _base_task_pipeline . _error_to_stream_response ( err )
def _handle_workflow_started_event (
self , event : QueueWorkflowStartedEvent , * , graph_runtime_state : Optional [ GraphRuntimeState ] = None , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle workflow started events. """
# Override graph runtime state - this is a side effect but necessary
break
elif isinstance ( event , QueueWorkflowStartedEvent ) :
# override graph runtime state
graph_runtime_state = event . graph_runtime_state
with self . _database_session ( ) as session :
with Session ( db . engine , expire_on_commit = False ) as session :
# init workflow run
workflow_execution = self . _workflow_cycle_manager . handle_workflow_run_start ( )
self . _workflow_run_id = workflow_execution . id_
message = self . _get_message ( session = session )
if not message :
raise ValueError ( f " Message not found: { self . _message_id } " )
message . workflow_run_id = workflow_execution . id_
workflow_start_resp = self . _workflow_response_converter . workflow_start_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
workflow_execution = workflow_execution ,
)
session . commit ( )
yield workflow_start_resp
elif isinstance (
event ,
QueueNodeRetryEvent ,
) :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
def _handle_node_retry_event ( self , event : QueueNodeRetryEvent , * * kwargs ) - > Generator [ StreamResponse , None , None ] :
""" Handle node retry events. """
self . _ensure_workflow_initialized ( )
with self . _database_session ( ) as session :
with Session ( db . engine , expire_on_commit = False ) as session :
workflow_node_execution = self . _workflow_cycle_manager . handle_workflow_node_execution_retried (
workflow_execution_id = self . _workflow_run_id , event = event
)
@ -344,15 +336,13 @@ class AdvancedChatAppGenerateTaskPipeline:
task_id = self . _application_generate_entity . task_id ,
workflow_node_execution = workflow_node_execution ,
)
session . commit ( )
if node_retry_resp :
yield node_retry_resp
def _handle_node_started_event (
self , event : QueueNodeStartedEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle node started events. """
self . _ensure_workflow_initialized ( )
elif isinstance ( event , QueueNodeStartedEvent ) :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
workflow_node_execution = self . _workflow_cycle_manager . handle_node_execution_start (
workflow_execution_id = self . _workflow_run_id , event = event
@ -366,201 +356,158 @@ class AdvancedChatAppGenerateTaskPipeline:
if node_start_resp :
yield node_start_resp
def _handle_node_succeeded_event (
self , event : QueueNodeSucceededEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle node succeeded events. """
elif isinstance ( event , QueueNodeSucceededEvent ) :
# Record files if it's an answer node or end node
if event . node_type in [ NodeType . ANSWER , NodeType . END ] :
self . _recorded_files . extend (
self . _workflow_response_converter . fetch_files_from_node_outputs ( event . outputs or { } )
)
with self . _database_session ( ) as session :
workflow_node_execution = self . _workflow_cycle_manager . handle_workflow_node_execution_success ( event = event )
with Session ( db . engine , expire_on_commit = False ) as session :
workflow_node_execution = self . _workflow_cycle_manager . handle_workflow_node_execution_success (
event = event
)
node_finish_resp = self . _workflow_response_converter . workflow_node_finish_to_stream_response (
event = event ,
task_id = self . _application_generate_entity . task_id ,
workflow_node_execution = workflow_node_execution ,
)
session . commit ( )
self . _save_output_for_event ( event , workflow_node_execution . id )
if node_finish_resp :
yield node_finish_resp
def _handle_node_failed_events (
self ,
event : Union [
QueueNodeFailedEvent , QueueNodeInIterationFailedEvent , QueueNodeInLoopFailedEvent , QueueNodeException Event
] ,
* * kwargs ,
) - > Generator [ StreamResponse , None , None ] :
""" Handle various node failure events. """
workflow_node_execution = self . _workflow_cycle_manager . handle_workflow_node_execution_failed ( event = event )
elif isinstance (
event ,
QueueNodeFailedEvent
| QueueNodeInIterationFailedEvent
| QueueNodeInLoopFailed Event
| QueueNodeExceptionEvent ,
) :
workflow_node_execution = self . _workflow_cycle_manager . handle_workflow_node_execution_failed (
event = event
)
node_finish_resp = self . _workflow_response_converter . workflow_node_finish_to_stream_response (
event = event ,
task_id = self . _application_generate_entity . task_id ,
workflow_node_execution = workflow_node_execution ,
)
if isinstance ( event , QueueNodeExceptionEvent ) :
self . _save_output_for_event ( event , workflow_node_execution . id )
if node_finish_resp :
yield node_finish_resp
elif isinstance ( event , QueueParallelBranchRunStartedEvent ) :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
def _handle_text_chunk_event (
self ,
event : QueueTextChunkEvent ,
* ,
tts_publisher : Optional [ AppGeneratorTTSPublisher ] = None ,
queue_message : Optional [ Union [ WorkflowQueueMessage , MessageQueueMessage ] ] = None ,
* * kwargs ,
) - > Generator [ StreamResponse , None , None ] :
""" Handle text chunk events. """
delta_text = event . text
if delta_text is None :
return
# Handle output moderation chunk
should_direct_answer = self . _handle_output_moderation_chunk ( delta_text )
if should_direct_answer :
return
# Only publish tts message at text chunk streaming
if tts_publisher and queue_message :
tts_publisher . publish ( queue_message )
self . _task_state . answer + = delta_text
yield self . _message_cycle_manager . message_to_stream_response (
answer = delta_text , message_id = self . _message_id , from_variable_selector = event . from_variable_selector
)
def _handle_parallel_branch_started_event (
self , event : QueueParallelBranchRunStartedEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle parallel branch started events. """
self . _ensure_workflow_initialized ( )
parallel_start_resp = self . _workflow_response_converter . workflow_parallel_branch_start_to_stream_response (
parallel_start_resp = (
self . _workflow_response_converter . workflow_parallel_branch_start_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
workflow_execution_id = self . _workflow_run_id ,
event = event ,
)
yield parallel_start_resp
)
def _handle_parallel_branch_finished_events (
self , event : Union [ QueueParallelBranchRunSucceededEvent , QueueParallelBranchRunFailedEvent ] , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle parallel branch finished events. """
self . _ensure_workflow_initialized ( )
yield parallel_start_resp
elif isinstance ( event , QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent ) :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
parallel_finish_resp = self . _workflow_response_converter . workflow_parallel_branch_finished_to_stream_response (
parallel_finish_resp = (
self . _workflow_response_converter . workflow_parallel_branch_finished_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
workflow_execution_id = self . _workflow_run_id ,
event = event ,
)
yield parallel_finish_resp
)
def _handle_iteration_start_event (
self , event : QueueIterationStartEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle iteration start events. """
self . _ensure_workflow_initialized ( )
yield parallel_finish_resp
elif isinstance ( event , QueueIterationStartEvent ) :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
iter_start_resp = self . _workflow_response_converter . workflow_iteration_start_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
workflow_execution_id = self . _workflow_run_id ,
event = event ,
)
yield iter_start_resp
def _handle_iteration_next_event (
self , event : QueueIterationNextEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle iteration next events. """
self . _ensure_workflow_initialized ( )
yield iter_start_resp
elif isinstance ( event , QueueIterationNextEvent ) :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
iter_next_resp = self . _workflow_response_converter . workflow_iteration_next_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
workflow_execution_id = self . _workflow_run_id ,
event = event ,
)
yield iter_next_resp
def _handle_iteration_completed_event (
self , event : QueueIterationCompletedEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle iteration completed events. """
self . _ensure_workflow_initialized ( )
yield iter_next_resp
elif isinstance ( event , QueueIterationCompletedEvent ) :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
iter_finish_resp = self . _workflow_response_converter . workflow_iteration_completed_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
workflow_execution_id = self . _workflow_run_id ,
event = event ,
)
yield iter_finish_resp
def _handle_loop_start_event ( self , event : QueueLoopStartEvent , * * kwargs ) - > Generator [ StreamResponse , None , None ] :
""" Handle loop start events. """
self . _ensure_workflow_initialized ( )
yield iter_finish_resp
elif isinstance ( event , QueueLoopStartEvent ) :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
loop_start_resp = self . _workflow_response_converter . workflow_loop_start_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
workflow_execution_id = self . _workflow_run_id ,
event = event ,
)
yield loop_start_resp
def _handle_loop_next_event ( self , event : QueueLoopNextEvent , * * kwargs ) - > Generator [ StreamResponse , None , None ] :
""" Handle loop next events. """
self . _ensure_workflow_initialized ( )
yield loop_start_resp
elif isinstance ( event , QueueLoopNextEvent ) :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
loop_next_resp = self . _workflow_response_converter . workflow_loop_next_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
workflow_execution_id = self . _workflow_run_id ,
event = event ,
)
yield loop_next_resp
def _handle_loop_completed_event (
self , event : QueueLoopCompletedEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle loop completed events. """
self . _ensure_workflow_initialized ( )
yield loop_next_resp
elif isinstance ( event , QueueLoopCompletedEvent ) :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
loop_finish_resp = self . _workflow_response_converter . workflow_loop_completed_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
workflow_execution_id = self . _workflow_run_id ,
event = event ,
)
yield loop_finish_resp
elif isinstance ( event , QueueWorkflowSucceededEvent ) :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
def _handle_workflow_succeeded_event (
self ,
event : QueueWorkflowSucceededEvent ,
* ,
graph_runtime_state : Optional [ GraphRuntimeState ] = None ,
trace_manager : Optional [ TraceQueueManager ] = None ,
* * kwargs ,
) - > Generator [ StreamResponse , None , None ] :
""" Handle workflow succeeded events. """
self . _ensure_workflow_initialized ( )
validated_state = self . _ensure_graph_runtime_initialized ( graph_runtime_state )
if not graph_runtime_state :
raise ValueError ( " workflow run not initialized. " )
with self . _database_session ( ) as session :
with Session ( db . engine , expire_on_commit = False ) as session :
workflow_execution = self . _workflow_cycle_manager . handle_workflow_run_success (
workflow_run_id = self . _workflow_run_id ,
total_tokens = validated _state. total_tokens ,
total_steps = validated _state. node_run_steps ,
total_tokens = graph_runtime_state . total_tokens ,
total_steps = graph_runtime_state . node_run_steps ,
outputs = event . outputs ,
conversation_id = self . _conversation_id ,
trace_manager = trace_manager ,
external_trace_id = self . _application_generate_entity . extras . get ( " external_trace_id " ) ,
)
workflow_finish_resp = self . _workflow_response_converter . workflow_finish_to_stream_response (
session = session ,
task_id = self . _application_generate_entity . task_id ,
@ -568,30 +515,24 @@ class AdvancedChatAppGenerateTaskPipeline:
)
yield workflow_finish_resp
self . _base_task_pipeline . _queue_manager . publish ( QueueAdvancedChatMessageEndEvent ( ) , PublishFrom . TASK_PIPELINE )
def _handle_workflow_partial_success_event (
self ,
event : QueueWorkflowPartialSuccessEvent ,
* ,
graph_runtime_state : Optional [ GraphRuntimeState ] = None ,
trace_manager : Optional [ TraceQueueManager ] = None ,
* * kwargs ,
) - > Generator [ StreamResponse , None , None ] :
""" Handle workflow partial success events. """
self . _ensure_workflow_initialized ( )
validated_state = self . _ensure_graph_runtime_initialized ( graph_runtime_state )
self . _base_task_pipeline . _queue_manager . publish (
QueueAdvancedChatMessageEndEvent ( ) , PublishFrom . TASK_PIPELINE
)
elif isinstance ( event , QueueWorkflowPartialSuccessEvent ) :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
if not graph_runtime_state :
raise ValueError ( " graph runtime state not initialized. " )
with self . _database_session ( ) as session :
with Session ( db . engine , expire_on_commit = False ) as session :
workflow_execution = self . _workflow_cycle_manager . handle_workflow_run_partial_success (
workflow_run_id = self . _workflow_run_id ,
total_tokens = validated _state. total_tokens ,
total_steps = validated _state. node_run_steps ,
total_tokens = graph_runtime_state . total_tokens ,
total_steps = graph_runtime _state. node_run_steps ,
outputs = event . outputs ,
exceptions_count = event . exceptions_count ,
conversation_id = None ,
trace_manager = trace_manager ,
external_trace_id = self . _application_generate_entity . extras . get ( " external_trace_id " ) ,
)
workflow_finish_resp = self . _workflow_response_converter . workflow_finish_to_stream_response (
session = session ,
@ -600,31 +541,25 @@ class AdvancedChatAppGenerateTaskPipeline:
)
yield workflow_finish_resp
self . _base_task_pipeline . _queue_manager . publish ( QueueAdvancedChatMessageEndEvent ( ) , PublishFrom . TASK_PIPELINE )
def _handle_workflow_failed_event (
self ,
event : QueueWorkflowFailedEvent ,
* ,
graph_runtime_state : Optional [ GraphRuntimeState ] = None ,
trace_manager : Optional [ TraceQueueManager ] = None ,
* * kwargs ,
) - > Generator [ StreamResponse , None , None ] :
""" Handle workflow failed events. """
self . _ensure_workflow_initialized ( )
validated_state = self . _ensure_graph_runtime_initialized ( graph_runtime_state )
self . _base_task_pipeline . _queue_manager . publish (
QueueAdvancedChatMessageEndEvent ( ) , PublishFrom . TASK_PIPELINE
)
elif isinstance ( event , QueueWorkflowFailedEvent ) :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
if not graph_runtime_state :
raise ValueError ( " graph runtime state not initialized. " )
with self . _database_session ( ) as session :
with Session ( db . engine , expire_on_commit = False ) as session :
workflow_execution = self . _workflow_cycle_manager . handle_workflow_run_failed (
workflow_run_id = self . _workflow_run_id ,
total_tokens = validated _state. total_tokens ,
total_steps = validated _state. node_run_steps ,
total_tokens = graph_runtime_state . total_tokens ,
total_steps = graph_runtime _state. node_run_steps ,
status = WorkflowExecutionStatus . FAILED ,
error_message = event . error ,
conversation_id = self . _conversation_id ,
trace_manager = trace_manager ,
exceptions_count = event . exceptions_count ,
external_trace_id = self . _application_generate_entity . extras . get ( " external_trace_id " ) ,
)
workflow_finish_resp = self . _workflow_response_converter . workflow_finish_to_stream_response (
session = session ,
@ -632,22 +567,16 @@ class AdvancedChatAppGenerateTaskPipeline:
workflow_execution = workflow_execution ,
)
err_event = QueueErrorEvent ( error = ValueError ( f " Run failed: { workflow_execution . error_message } " ) )
err = self . _base_task_pipeline . _handle_error ( event = err_event , session = session , message_id = self . _message_id )
err = self . _base_task_pipeline . _handle_error (
event = err_event , session = session , message_id = self . _message_id
)
yield workflow_finish_resp
yield self . _base_task_pipeline . _error_to_stream_response ( err )
def _handle_stop_event (
self ,
event : QueueStopEvent ,
* ,
graph_runtime_state : Optional [ GraphRuntimeState ] = None ,
trace_manager : Optional [ TraceQueueManager ] = None ,
* * kwargs ,
) - > Generator [ StreamResponse , None , None ] :
""" Handle stop events. """
break
elif isinstance ( event , QueueStopEvent ) :
if self . _workflow_run_id and graph_runtime_state :
with self . _database_session ( ) as session :
with Session ( db . engine , expire_on_commit = False ) as session :
workflow_execution = self . _workflow_cycle_manager . handle_workflow_run_failed (
workflow_run_id = self . _workflow_run_id ,
total_tokens = graph_runtime_state . total_tokens ,
@ -656,7 +585,6 @@ class AdvancedChatAppGenerateTaskPipeline:
error_message = event . get_stop_reason ( ) ,
conversation_id = self . _conversation_id ,
trace_manager = trace_manager ,
external_trace_id = self . _application_generate_entity . extras . get ( " external_trace_id " ) ,
)
workflow_finish_resp = self . _workflow_response_converter . workflow_finish_to_stream_response (
session = session ,
@ -665,6 +593,7 @@ class AdvancedChatAppGenerateTaskPipeline:
)
# Save message
self . _save_message ( session = session , graph_runtime_state = graph_runtime_state )
session . commit ( )
yield workflow_finish_resp
elif event . stopped_by in (
@ -672,221 +601,78 @@ class AdvancedChatAppGenerateTaskPipeline:
QueueStopEvent . StopBy . ANNOTATION_REPLY ,
) :
# When hitting input-moderation or annotation-reply, the workflow will not start
with self . _database_session ( ) as session :
with Session ( db . engine , expire_on_commit = False ) as session :
# Save message
self . _save_message ( session = session )
session . commit ( )
yield self . _message_end_to_stream_response ( )
def _handle_advanced_chat_message_end_event (
self ,
event : QueueAdvancedChatMessageEndEvent ,
* ,
graph_runtime_state : Optional [ GraphRuntimeState ] = None ,
* * kwargs ,
) - > Generator [ StreamResponse , None , None ] :
""" Handle advanced chat message end events. """
self . _ensure_graph_runtime_initialized ( graph_runtime_state )
output_moderation_answer = self . _base_task_pipeline . _handle_output_moderation_when_task_finished (
self . _task_state . answer
)
if output_moderation_answer :
self . _task_state . answer = output_moderation_answer
yield self . _message_cycle_manager . message_replace_to_stream_response (
answer = output_moderation_answer ,
reason = QueueMessageReplaceEvent . MessageReplaceReason . OUTPUT_MODERATION ,
)
# Save message
with self . _database_session ( ) as session :
self . _save_message ( session = session , graph_runtime_state = graph_runtime_state )
yield self . _message_end_to_stream_response ( )
def _handle_retriever_resources_event (
self , event : QueueRetrieverResourcesEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle retriever resources events. """
break
elif isinstance ( event , QueueRetrieverResourcesEvent ) :
self . _message_cycle_manager . handle_retriever_resources ( event )
with self . _database_session ( ) as session :
with Session ( db . engine , expire_on_commit = False ) as session :
message = self . _get_message ( session = session )
message . message_metadata = self . _task_state . metadata . model_dump_json ( )
return
yield # Make this a generator
def _handle_annotation_reply_event (
self , event : QueueAnnotationReplyEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle annotation reply events. """
session . commit ( )
elif isinstance ( event , QueueAnnotationReplyEvent ) :
self . _message_cycle_manager . handle_annotation_reply ( event )
with self . _database_session ( ) as session :
with Session ( db . engine , expire_on_commit = False ) as session :
message = self . _get_message ( session = session )
message . message_metadata = self . _task_state . metadata . model_dump_json ( )
return
yield # Make this a generator
def _handle_message_replace_event (
self , event : QueueMessageReplaceEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle message replace events. """
yield self . _message_cycle_manager . message_replace_to_stream_response ( answer = event . text , reason = event . reason )
def _handle_agent_log_event ( self , event : QueueAgentLogEvent , * * kwargs ) - > Generator [ StreamResponse , None , None ] :
""" Handle agent log events. """
yield self . _workflow_response_converter . handle_agent_log (
task_id = self . _application_generate_entity . task_id , event = event
)
session . commit ( )
elif isinstance ( event , QueueTextChunkEvent ) :
delta_text = event . text
if delta_text is None :
continue
def _get_event_handlers ( self ) - > dict [ type , Callable ] :
""" Get mapping of event types to their handlers using fluent pattern. """
return {
# Basic events
QueuePingEvent : self . _handle_ping_event ,
QueueErrorEvent : self . _handle_error_event ,
QueueTextChunkEvent : self . _handle_text_chunk_event ,
# Workflow events
QueueWorkflowStartedEvent : self . _handle_workflow_started_event ,
QueueWorkflowSucceededEvent : self . _handle_workflow_succeeded_event ,
QueueWorkflowPartialSuccessEvent : self . _handle_workflow_partial_success_event ,
QueueWorkflowFailedEvent : self . _handle_workflow_failed_event ,
# Node events
QueueNodeRetryEvent : self . _handle_node_retry_event ,
QueueNodeStartedEvent : self . _handle_node_started_event ,
QueueNodeSucceededEvent : self . _handle_node_succeeded_event ,
# Parallel branch events
QueueParallelBranchRunStartedEvent : self . _handle_parallel_branch_started_event ,
# Iteration events
QueueIterationStartEvent : self . _handle_iteration_start_event ,
QueueIterationNextEvent : self . _handle_iteration_next_event ,
QueueIterationCompletedEvent : self . _handle_iteration_completed_event ,
# Loop events
QueueLoopStartEvent : self . _handle_loop_start_event ,
QueueLoopNextEvent : self . _handle_loop_next_event ,
QueueLoopCompletedEvent : self . _handle_loop_completed_event ,
# Control events
QueueStopEvent : self . _handle_stop_event ,
# Message events
QueueRetrieverResourcesEvent : self . _handle_retriever_resources_event ,
QueueAnnotationReplyEvent : self . _handle_annotation_reply_event ,
QueueMessageReplaceEvent : self . _handle_message_replace_event ,
QueueAdvancedChatMessageEndEvent : self . _handle_advanced_chat_message_end_event ,
QueueAgentLogEvent : self . _handle_agent_log_event ,
}
def _dispatch_event (
self ,
event : Any ,
* ,
graph_runtime_state : Optional [ GraphRuntimeState ] = None ,
tts_publisher : Optional [ AppGeneratorTTSPublisher ] = None ,
trace_manager : Optional [ TraceQueueManager ] = None ,
queue_message : Optional [ Union [ WorkflowQueueMessage , MessageQueueMessage ] ] = None ,
) - > Generator [ StreamResponse , None , None ] :
""" Dispatch events using elegant pattern matching. """
handlers = self . _get_event_handlers ( )
event_type = type ( event )
# handle output moderation chunk
should_direct_answer = self . _handle_output_moderation_chunk ( delta_text )
if should_direct_answer :
continue
# Direct handler lookup
if handler := handlers . get ( event_type ) :
yield from handler (
event ,
graph_runtime_state = graph_runtime_state ,
tts_publisher = tts_publisher ,
trace_manager = trace_manager ,
queue_message = queue_message ,
)
return
# only publish tts message at text chunk streaming
if tts_publisher :
tts_publisher . publish ( queue_message )
# Handle node failure events with isinstance check
if isinstance (
event ,
(
QueueNodeFailedEvent ,
QueueNodeInIterationFailedEvent ,
QueueNodeInLoopFailedEvent ,
QueueNodeExceptionEvent ,
) ,
) :
yield from self . _handle_node_failed_events (
event ,
graph_runtime_state = graph_runtime_state ,
tts_publisher = tts_publisher ,
trace_manager = trace_manager ,
queue_message = queue_message ,
self . _task_state . answer + = delta_text
yield self . _message_cycle_manager . message_to_stream_response (
answer = delta_text , message_id = self . _message_id , from_variable_selector = event . from_variable_selector
)
return
# Handle parallel branch finished events with isinstance check
if isinstance ( event , ( QueueParallelBranchRunSucceededEvent , QueueParallelBranchRunFailedEvent ) ) :
yield from self . _handle_parallel_branch_finished_events (
event ,
graph_runtime_state = graph_runtime_state ,
tts_publisher = tts_publisher ,
trace_manager = trace_manager ,
queue_message = queue_message ,
elif isinstance ( event , QueueMessageReplaceEvent ) :
# published by moderation
yield self . _message_cycle_manager . message_replace_to_stream_response (
answer = event . text , reason = event . reason
)
return
# For unhandled events, we continue (original behavior)
return
def _process_stream_response (
self ,
tts_publisher : Optional [ AppGeneratorTTSPublisher ] = None ,
trace_manager : Optional [ TraceQueueManager ] = None ,
) - > Generator [ StreamResponse , None , None ] :
"""
Process stream response using elegant Fluent Python patterns .
Maintains exact same functionality as original 57 - if - statement version .
"""
# Initialize graph runtime state
graph_runtime_state : Optional [ GraphRuntimeState ] = None
for queue_message in self . _base_task_pipeline . _queue_manager . listen ( ) :
event = queue_message . event
match event :
case QueueWorkflowStartedEvent ( ) :
graph_runtime_state = event . graph_runtime_state
yield from self . _handle_workflow_started_event ( event )
elif isinstance ( event , QueueAdvancedChatMessageEndEvent ) :
if not graph_runtime_state :
raise ValueError ( " graph runtime state not initialized. " )
case QueueTextChunkEvent ( ) :
yield from self . _handle_text_chunk_event (
event , tts_publisher = tts_publisher , queue_message = queue_message
output_moderation_answer = self . _base_task_pipeline . _handle_output_moderation_when_task_finished (
self . _task_state . answer
)
case QueueErrorEvent ( ) :
yield from self . _handle_error_event ( event )
break
case QueueWorkflowFailedEvent ( ) :
yield from self . _handle_workflow_failed_event (
event , graph_runtime_state = graph_runtime_state , trace_manager = trace_manager
if output_moderation_answer :
self . _task_state . answer = output_moderation_answer
yield self . _message_cycle_manager . message_replace_to_stream_response (
answer = output_moderation_answer ,
reason = QueueMessageReplaceEvent . MessageReplaceReason . OUTPUT_MODERATION ,
)
break
case QueueStopEvent ( ) :
yield from self . _handle_stop_event (
event , graph_runtime_state = graph_runtime_state , trace_manager = trace_manager
)
break
# Save message
with Session ( db . engine , expire_on_commit = False ) as session :
self . _save_message ( session = session , graph_runtime_state = graph_runtime_state )
session . commit ( )
# Handle all other events through elegant dispatch
case _ :
if responses := list (
self . _dispatch_event (
event ,
graph_runtime_state = graph_runtime_state ,
tts_publisher = tts_publisher ,
trace_manager = trace_manager ,
queue_message = queue_message ,
yield self . _message_end_to_stream_response ( )
elif isinstance ( event , QueueAgentLogEvent ) :
yield self . _workflow_response_converter . handle_agent_log (
task_id = self . _application_generate_entity . task_id , event = event
)
) :
yield from responses
else :
continue
# publish None when task finished
if tts_publisher :
tts_publisher . publish ( None )
@ -958,6 +744,7 @@ class AdvancedChatAppGenerateTaskPipeline:
"""
if self . _base_task_pipeline . _output_moderation_handler :
if self . _base_task_pipeline . _output_moderation_handler . should_direct_output ( ) :
# stop subscribe new token when output moderation should direct output
self . _task_state . answer = self . _base_task_pipeline . _output_moderation_handler . get_final_output ( )
self . _base_task_pipeline . _queue_manager . publish (
QueueTextChunkEvent ( text = self . _task_state . answer ) , PublishFrom . TASK_PIPELINE