@ -1,6 +1,7 @@
import logging
import logging
import time
import time
from collections . abc import Generator , Mapping
from collections . abc import Callable , Generator , Mapping
from contextlib import contextmanager
from threading import Thread
from threading import Thread
from typing import Any , Optional , Union
from typing import Any , Optional , Union
@ -15,6 +16,7 @@ from core.app.entities.app_invoke_entities import (
InvokeFrom ,
InvokeFrom ,
)
)
from core . app . entities . queue_entities import (
from core . app . entities . queue_entities import (
MessageQueueMessage ,
QueueAdvancedChatMessageEndEvent ,
QueueAdvancedChatMessageEndEvent ,
QueueAgentLogEvent ,
QueueAgentLogEvent ,
QueueAnnotationReplyEvent ,
QueueAnnotationReplyEvent ,
@ -44,6 +46,7 @@ from core.app.entities.queue_entities import (
QueueWorkflowPartialSuccessEvent ,
QueueWorkflowPartialSuccessEvent ,
QueueWorkflowStartedEvent ,
QueueWorkflowStartedEvent ,
QueueWorkflowSucceededEvent ,
QueueWorkflowSucceededEvent ,
WorkflowQueueMessage ,
)
)
from core . app . entities . task_entities import (
from core . app . entities . task_entities import (
ChatbotAppBlockingResponse ,
ChatbotAppBlockingResponse ,
@ -52,6 +55,7 @@ from core.app.entities.task_entities import (
MessageAudioEndStreamResponse ,
MessageAudioEndStreamResponse ,
MessageAudioStreamResponse ,
MessageAudioStreamResponse ,
MessageEndStreamResponse ,
MessageEndStreamResponse ,
PingStreamResponse ,
StreamResponse ,
StreamResponse ,
WorkflowTaskState ,
WorkflowTaskState ,
)
)
@ -61,12 +65,12 @@ from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk
from core . model_runtime . entities . llm_entities import LLMUsage
from core . model_runtime . entities . llm_entities import LLMUsage
from core . ops . ops_trace_manager import TraceQueueManager
from core . ops . ops_trace_manager import TraceQueueManager
from core . workflow . entities . workflow_execution import WorkflowExecutionStatus , WorkflowType
from core . workflow . entities . workflow_execution import WorkflowExecutionStatus , WorkflowType
from core . workflow . enums import SystemVariableKey
from core . workflow . graph_engine . entities . graph_runtime_state import GraphRuntimeState
from core . workflow . graph_engine . entities . graph_runtime_state import GraphRuntimeState
from core . workflow . nodes import NodeType
from core . workflow . nodes import NodeType
from core . workflow . repositories . draft_variable_repository import DraftVariableSaverFactory
from core . workflow . repositories . draft_variable_repository import DraftVariableSaverFactory
from core . workflow . repositories . workflow_execution_repository import WorkflowExecutionRepository
from core . workflow . repositories . workflow_execution_repository import WorkflowExecutionRepository
from core . workflow . repositories . workflow_node_execution_repository import WorkflowNodeExecutionRepository
from core . workflow . repositories . workflow_node_execution_repository import WorkflowNodeExecutionRepository
from core . workflow . system_variable import SystemVariable
from core . workflow . workflow_cycle_manager import CycleManagerWorkflowInfo , WorkflowCycleManager
from core . workflow . workflow_cycle_manager import CycleManagerWorkflowInfo , WorkflowCycleManager
from events . message_event import message_was_created
from events . message_event import message_was_created
from extensions . ext_database import db
from extensions . ext_database import db
@ -116,16 +120,16 @@ class AdvancedChatAppGenerateTaskPipeline:
self . _workflow_cycle_manager = WorkflowCycleManager (
self . _workflow_cycle_manager = WorkflowCycleManager (
application_generate_entity = application_generate_entity ,
application_generate_entity = application_generate_entity ,
workflow_system_variables = {
workflow_system_variables = SystemVariable (
SystemVariableKey. QUERY : message . query ,
query= message . query ,
SystemVariableKey. FILES : application_generate_entity . files ,
files= application_generate_entity . files ,
SystemVariableKey. CONVERSATION_ID : conversation . id ,
conversation_id= conversation . id ,
SystemVariableKey. USER_ID : user_session_id ,
user_id= user_session_id ,
SystemVariableKey. DIALOGUE_COUNT : dialogue_count ,
dialogue_count= dialogue_count ,
SystemVariableKey. APP_ID : application_generate_entity . app_config . app_id ,
app_id= application_generate_entity . app_config . app_id ,
SystemVariableKey. WORKFLOW_ID : workflow . id ,
workflow_id= workflow . id ,
SystemVariableKey. WORKFLOW_EXECUTION_ID : application_generate_entity . workflow_run_id ,
workflow_execution_id= application_generate_entity . workflow_run_id ,
} ,
) ,
workflow_info = CycleManagerWorkflowInfo (
workflow_info = CycleManagerWorkflowInfo (
workflow_id = workflow . id ,
workflow_id = workflow . id ,
workflow_type = WorkflowType ( workflow . type ) ,
workflow_type = WorkflowType ( workflow . type ) ,
@ -162,7 +166,6 @@ class AdvancedChatAppGenerateTaskPipeline:
Process generate task pipeline .
Process generate task pipeline .
: return :
: return :
"""
"""
# start generate conversation name thread
self . _conversation_name_generate_thread = self . _message_cycle_manager . generate_conversation_name (
self . _conversation_name_generate_thread = self . _message_cycle_manager . generate_conversation_name (
conversation_id = self . _conversation_id , query = self . _application_generate_entity . query
conversation_id = self . _conversation_id , query = self . _application_generate_entity . query
)
)
@ -254,15 +257,12 @@ class AdvancedChatAppGenerateTaskPipeline:
yield response
yield response
start_listener_time = time . time ( )
start_listener_time = time . time ( )
# timeout
while ( time . time ( ) - start_listener_time ) < TTS_AUTO_PLAY_TIMEOUT :
while ( time . time ( ) - start_listener_time ) < TTS_AUTO_PLAY_TIMEOUT :
try :
try :
if not tts_publisher :
if not tts_publisher :
break
break
audio_trunk = tts_publisher . check_and_get_audio ( )
audio_trunk = tts_publisher . check_and_get_audio ( )
if audio_trunk is None :
if audio_trunk is None :
# release cpu
# sleep 20 ms ( 40ms => 1280 byte audio file,20ms => 640 byte audio file)
time . sleep ( TTS_AUTO_PLAY_YIELD_CPU_TIME )
time . sleep ( TTS_AUTO_PLAY_YIELD_CPU_TIME )
continue
continue
if audio_trunk . status == " finish " :
if audio_trunk . status == " finish " :
@ -276,58 +276,66 @@ class AdvancedChatAppGenerateTaskPipeline:
if tts_publisher :
if tts_publisher :
yield MessageAudioEndStreamResponse ( audio = " " , task_id = task_id )
yield MessageAudioEndStreamResponse ( audio = " " , task_id = task_id )
def _process_stream_response (
@contextmanager
self ,
def _database_session ( self ) :
tts_publisher : Optional [ AppGeneratorTTSPublisher ] = None ,
""" Context manager for database sessions. """
trace_manager : Optional [ TraceQueueManager ] = None ,
with Session ( db . engine , expire_on_commit = False ) as session :
) - > Generator [ StreamResponse , None , None ] :
try :
"""
yield session
Process stream response .
session . commit ( )
: return :
except Exception :
"""
session . rollback ( )
# init fake graph runtime state
raise
graph_runtime_state : Optional [ GraphRuntimeState ] = None
for queue_message in self . _base_task_pipeline . _queue_manager . listen ( ) :
def _ensure_workflow_initialized ( self ) - > None :
event = queue_message . event
""" Fluent validation for workflow state. """
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
def _ensure_graph_runtime_initialized ( self , graph_runtime_state : Optional [ GraphRuntimeState ] ) - > GraphRuntimeState :
""" Fluent validation for graph runtime state. """
if not graph_runtime_state :
raise ValueError ( " graph runtime state not initialized. " )
return graph_runtime_state
if isinstance ( event , QueuePingEvent ) :
def _handle_ping_event ( self , event : QueuePingEvent , * * kwargs ) - > Generator [ PingStreamResponse , None , None ] :
""" Handle ping events. """
yield self . _base_task_pipeline . _ping_stream_response ( )
yield self . _base_task_pipeline . _ping_stream_response ( )
elif isinstance ( event , QueueErrorEvent ) :
with Session ( db . engine , expire_on_commit = False ) as session :
def _handle_error_event ( self , event : QueueErrorEvent , * * kwargs ) - > Generator [ ErrorStreamResponse , None , None ] :
err = self . _base_task_pipeline . _handle_error (
""" Handle error events. """
event = event , session = session , message_id = self . _message_id
with self . _database_session ( ) as session :
)
err = self . _base_task_pipeline . _handle_error ( event = event , session = session , message_id = self . _message_id )
session . commit ( )
yield self . _base_task_pipeline . _error_to_stream_response ( err )
yield self . _base_task_pipeline . _error_to_stream_response ( err )
break
elif isinstance ( event , QueueWorkflowStartedEvent ) :
def _handle_workflow_started_event (
# override graph runtime state
self , event : QueueWorkflowStartedEvent , * , graph_runtime_state : Optional [ GraphRuntimeState ] = None , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle workflow started events. """
# Override graph runtime state - this is a side effect but necessary
graph_runtime_state = event . graph_runtime_state
graph_runtime_state = event . graph_runtime_state
with Session ( db . engine , expire_on_commit = False ) as session :
with self . _database_session ( ) as session :
# init workflow run
workflow_execution = self . _workflow_cycle_manager . handle_workflow_run_start ( )
workflow_execution = self . _workflow_cycle_manager . handle_workflow_run_start ( )
self . _workflow_run_id = workflow_execution . id_
self . _workflow_run_id = workflow_execution . id_
message = self . _get_message ( session = session )
message = self . _get_message ( session = session )
if not message :
if not message :
raise ValueError ( f " Message not found: { self . _message_id } " )
raise ValueError ( f " Message not found: { self . _message_id } " )
message . workflow_run_id = workflow_execution . id_
message . workflow_run_id = workflow_execution . id_
workflow_start_resp = self . _workflow_response_converter . workflow_start_to_stream_response (
workflow_start_resp = self . _workflow_response_converter . workflow_start_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
task_id = self . _application_generate_entity . task_id ,
workflow_execution = workflow_execution ,
workflow_execution = workflow_execution ,
)
)
session . commit ( )
yield workflow_start_resp
yield workflow_start_resp
elif isinstance (
event ,
QueueNodeRetryEvent ,
) :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
with Session ( db . engine , expire_on_commit = False ) as session :
def _handle_node_retry_event ( self , event : QueueNodeRetryEvent , * * kwargs ) - > Generator [ StreamResponse , None , None ] :
""" Handle node retry events. """
self . _ensure_workflow_initialized ( )
with self . _database_session ( ) as session :
workflow_node_execution = self . _workflow_cycle_manager . handle_workflow_node_execution_retried (
workflow_node_execution = self . _workflow_cycle_manager . handle_workflow_node_execution_retried (
workflow_execution_id = self . _workflow_run_id , event = event
workflow_execution_id = self . _workflow_run_id , event = event
)
)
@ -336,13 +344,15 @@ class AdvancedChatAppGenerateTaskPipeline:
task_id = self . _application_generate_entity . task_id ,
task_id = self . _application_generate_entity . task_id ,
workflow_node_execution = workflow_node_execution ,
workflow_node_execution = workflow_node_execution ,
)
)
session . commit ( )
if node_retry_resp :
if node_retry_resp :
yield node_retry_resp
yield node_retry_resp
elif isinstance ( event , QueueNodeStartedEvent ) :
if not self . _workflow_run_id :
def _handle_node_started_event (
raise ValueError ( " workflow run not initialized. " )
self , event : QueueNodeStartedEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle node started events. """
self . _ensure_workflow_initialized ( )
workflow_node_execution = self . _workflow_cycle_manager . handle_node_execution_start (
workflow_node_execution = self . _workflow_cycle_manager . handle_node_execution_start (
workflow_execution_id = self . _workflow_run_id , event = event
workflow_execution_id = self . _workflow_run_id , event = event
@ -356,158 +366,201 @@ class AdvancedChatAppGenerateTaskPipeline:
if node_start_resp :
if node_start_resp :
yield node_start_resp
yield node_start_resp
elif isinstance ( event , QueueNodeSucceededEvent ) :
def _handle_node_succeeded_event (
self , event : QueueNodeSucceededEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle node succeeded events. """
# Record files if it's an answer node or end node
# Record files if it's an answer node or end node
if event . node_type in [ NodeType . ANSWER , NodeType . END ] :
if event . node_type in [ NodeType . ANSWER , NodeType . END ] :
self . _recorded_files . extend (
self . _recorded_files . extend (
self . _workflow_response_converter . fetch_files_from_node_outputs ( event . outputs or { } )
self . _workflow_response_converter . fetch_files_from_node_outputs ( event . outputs or { } )
)
)
with Session ( db . engine , expire_on_commit = False ) as session :
with self . _database_session ( ) as session :
workflow_node_execution = self . _workflow_cycle_manager . handle_workflow_node_execution_success (
workflow_node_execution = self . _workflow_cycle_manager . handle_workflow_node_execution_success ( event = event )
event = event
)
node_finish_resp = self . _workflow_response_converter . workflow_node_finish_to_stream_response (
node_finish_resp = self . _workflow_response_converter . workflow_node_finish_to_stream_response (
event = event ,
event = event ,
task_id = self . _application_generate_entity . task_id ,
task_id = self . _application_generate_entity . task_id ,
workflow_node_execution = workflow_node_execution ,
workflow_node_execution = workflow_node_execution ,
)
)
session . commit ( )
self . _save_output_for_event ( event , workflow_node_execution . id )
self . _save_output_for_event ( event , workflow_node_execution . id )
if node_finish_resp :
if node_finish_resp :
yield node_finish_resp
yield node_finish_resp
elif isinstance (
event ,
def _handle_node_failed_events (
QueueNodeFailedEvent
self ,
| QueueNodeInIterationFailedEvent
event : Union [
| QueueNodeInLoopFailed Event
QueueNodeFailedEvent , QueueNodeInIterationFailedEvent , QueueNodeInLoopFailedEvent , QueueNodeException Event
| QueueNodeExceptionEvent ,
] ,
) :
* * kwargs ,
workflow_node_execution = self . _workflow_cycle_manager . handle_workflow_node_execution_failed (
) - > Generator [ StreamResponse , None , None ] :
event = event
""" Handle various node failure events. """
)
workflow_node_execution = self . _workflow_cycle_manager . handle_workflow_node_execution_failed ( event = event )
node_finish_resp = self . _workflow_response_converter . workflow_node_finish_to_stream_response (
node_finish_resp = self . _workflow_response_converter . workflow_node_finish_to_stream_response (
event = event ,
event = event ,
task_id = self . _application_generate_entity . task_id ,
task_id = self . _application_generate_entity . task_id ,
workflow_node_execution = workflow_node_execution ,
workflow_node_execution = workflow_node_execution ,
)
)
if isinstance ( event , QueueNodeExceptionEvent ) :
if isinstance ( event , QueueNodeExceptionEvent ) :
self . _save_output_for_event ( event , workflow_node_execution . id )
self . _save_output_for_event ( event , workflow_node_execution . id )
if node_finish_resp :
if node_finish_resp :
yield node_finish_resp
yield node_finish_resp
elif isinstance ( event , QueueParallelBranchRunStartedEvent ) :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
parallel_start_resp = (
def _handle_text_chunk_event (
self . _workflow_response_converter . workflow_parallel_branch_start_to_stream_response (
self ,
event : QueueTextChunkEvent ,
* ,
tts_publisher : Optional [ AppGeneratorTTSPublisher ] = None ,
queue_message : Optional [ Union [ WorkflowQueueMessage , MessageQueueMessage ] ] = None ,
* * kwargs ,
) - > Generator [ StreamResponse , None , None ] :
""" Handle text chunk events. """
delta_text = event . text
if delta_text is None :
return
# Handle output moderation chunk
should_direct_answer = self . _handle_output_moderation_chunk ( delta_text )
if should_direct_answer :
return
# Only publish tts message at text chunk streaming
if tts_publisher and queue_message :
tts_publisher . publish ( queue_message )
self . _task_state . answer + = delta_text
yield self . _message_cycle_manager . message_to_stream_response (
answer = delta_text , message_id = self . _message_id , from_variable_selector = event . from_variable_selector
)
def _handle_parallel_branch_started_event (
self , event : QueueParallelBranchRunStartedEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle parallel branch started events. """
self . _ensure_workflow_initialized ( )
parallel_start_resp = self . _workflow_response_converter . workflow_parallel_branch_start_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
task_id = self . _application_generate_entity . task_id ,
workflow_execution_id = self . _workflow_run_id ,
workflow_execution_id = self . _workflow_run_id ,
event = event ,
event = event ,
)
)
)
yield parallel_start_resp
yield parallel_start_resp
elif isinstance ( event , QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent ) :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
parallel_finish_resp = (
def _handle_parallel_branch_finished_events (
self . _workflow_response_converter . workflow_parallel_branch_finished_to_stream_response (
self , event : Union [ QueueParallelBranchRunSucceededEvent , QueueParallelBranchRunFailedEvent ] , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle parallel branch finished events. """
self . _ensure_workflow_initialized ( )
parallel_finish_resp = self . _workflow_response_converter . workflow_parallel_branch_finished_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
task_id = self . _application_generate_entity . task_id ,
workflow_execution_id = self . _workflow_run_id ,
workflow_execution_id = self . _workflow_run_id ,
event = event ,
event = event ,
)
)
)
yield parallel_finish_resp
yield parallel_finish_resp
elif isinstance ( event , QueueIterationStartEvent ) :
if not self . _workflow_run_id :
def _handle_iteration_start_event (
raise ValueError ( " workflow run not initialized. " )
self , event : QueueIterationStartEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle iteration start events. """
self . _ensure_workflow_initialized ( )
iter_start_resp = self . _workflow_response_converter . workflow_iteration_start_to_stream_response (
iter_start_resp = self . _workflow_response_converter . workflow_iteration_start_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
task_id = self . _application_generate_entity . task_id ,
workflow_execution_id = self . _workflow_run_id ,
workflow_execution_id = self . _workflow_run_id ,
event = event ,
event = event ,
)
)
yield iter_start_resp
yield iter_start_resp
elif isinstance ( event , QueueIterationNextEvent ) :
if not self . _workflow_run_id :
def _handle_iteration_next_event (
raise ValueError ( " workflow run not initialized. " )
self , event : QueueIterationNextEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle iteration next events. """
self . _ensure_workflow_initialized ( )
iter_next_resp = self . _workflow_response_converter . workflow_iteration_next_to_stream_response (
iter_next_resp = self . _workflow_response_converter . workflow_iteration_next_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
task_id = self . _application_generate_entity . task_id ,
workflow_execution_id = self . _workflow_run_id ,
workflow_execution_id = self . _workflow_run_id ,
event = event ,
event = event ,
)
)
yield iter_next_resp
yield iter_next_resp
elif isinstance ( event , QueueIterationCompletedEvent ) :
if not self . _workflow_run_id :
def _handle_iteration_completed_event (
raise ValueError ( " workflow run not initialized. " )
self , event : QueueIterationCompletedEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle iteration completed events. """
self . _ensure_workflow_initialized ( )
iter_finish_resp = self . _workflow_response_converter . workflow_iteration_completed_to_stream_response (
iter_finish_resp = self . _workflow_response_converter . workflow_iteration_completed_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
task_id = self . _application_generate_entity . task_id ,
workflow_execution_id = self . _workflow_run_id ,
workflow_execution_id = self . _workflow_run_id ,
event = event ,
event = event ,
)
)
yield iter_finish_resp
yield iter_finish_resp
elif isinstance ( event , QueueLoopStartEvent ) :
if not self . _workflow_run_id :
def _handle_loop_start_event ( self , event : QueueLoopStartEvent , * * kwargs ) - > Generator [ StreamResponse , None , None ] :
raise ValueError ( " workflow run not initialized. " )
""" Handle loop start events. """
self . _ensure_workflow_initialized ( )
loop_start_resp = self . _workflow_response_converter . workflow_loop_start_to_stream_response (
loop_start_resp = self . _workflow_response_converter . workflow_loop_start_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
task_id = self . _application_generate_entity . task_id ,
workflow_execution_id = self . _workflow_run_id ,
workflow_execution_id = self . _workflow_run_id ,
event = event ,
event = event ,
)
)
yield loop_start_resp
yield loop_start_resp
elif isinstance ( event , QueueLoopNextEvent ) :
if not self . _workflow_run_id :
def _handle_loop_next_event ( self , event : QueueLoopNextEvent , * * kwargs ) - > Generator [ StreamResponse , None , None ] :
raise ValueError ( " workflow run not initialized. " )
""" Handle loop next events. """
self . _ensure_workflow_initialized ( )
loop_next_resp = self . _workflow_response_converter . workflow_loop_next_to_stream_response (
loop_next_resp = self . _workflow_response_converter . workflow_loop_next_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
task_id = self . _application_generate_entity . task_id ,
workflow_execution_id = self . _workflow_run_id ,
workflow_execution_id = self . _workflow_run_id ,
event = event ,
event = event ,
)
)
yield loop_next_resp
yield loop_next_resp
elif isinstance ( event , QueueLoopCompletedEvent ) :
if not self . _workflow_run_id :
def _handle_loop_completed_event (
raise ValueError ( " workflow run not initialized. " )
self , event : QueueLoopCompletedEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle loop completed events. """
self . _ensure_workflow_initialized ( )
loop_finish_resp = self . _workflow_response_converter . workflow_loop_completed_to_stream_response (
loop_finish_resp = self . _workflow_response_converter . workflow_loop_completed_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
task_id = self . _application_generate_entity . task_id ,
workflow_execution_id = self . _workflow_run_id ,
workflow_execution_id = self . _workflow_run_id ,
event = event ,
event = event ,
)
)
yield loop_finish_resp
yield loop_finish_resp
elif isinstance ( event , QueueWorkflowSucceededEvent ) :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
if not graph_runtime_state :
def _handle_workflow_succeeded_event (
raise ValueError ( " workflow run not initialized. " )
self ,
event : QueueWorkflowSucceededEvent ,
* ,
graph_runtime_state : Optional [ GraphRuntimeState ] = None ,
trace_manager : Optional [ TraceQueueManager ] = None ,
* * kwargs ,
) - > Generator [ StreamResponse , None , None ] :
""" Handle workflow succeeded events. """
self . _ensure_workflow_initialized ( )
validated_state = self . _ensure_graph_runtime_initialized ( graph_runtime_state )
with Session ( db . engine , expire_on_commit = False ) as session :
with self . _database_session ( ) as session :
workflow_execution = self . _workflow_cycle_manager . handle_workflow_run_success (
workflow_execution = self . _workflow_cycle_manager . handle_workflow_run_success (
workflow_run_id = self . _workflow_run_id ,
workflow_run_id = self . _workflow_run_id ,
total_tokens = graph_runtime_state . total_tokens ,
total_tokens = validated _state. total_tokens ,
total_steps = graph_runtime_state . node_run_steps ,
total_steps = validated _state. node_run_steps ,
outputs = event . outputs ,
outputs = event . outputs ,
conversation_id = self . _conversation_id ,
conversation_id = self . _conversation_id ,
trace_manager = trace_manager ,
trace_manager = trace_manager ,
external_trace_id = self . _application_generate_entity . extras . get ( " external_trace_id " ) ,
)
)
workflow_finish_resp = self . _workflow_response_converter . workflow_finish_to_stream_response (
workflow_finish_resp = self . _workflow_response_converter . workflow_finish_to_stream_response (
session = session ,
session = session ,
task_id = self . _application_generate_entity . task_id ,
task_id = self . _application_generate_entity . task_id ,
@ -515,24 +568,30 @@ class AdvancedChatAppGenerateTaskPipeline:
)
)
yield workflow_finish_resp
yield workflow_finish_resp
self . _base_task_pipeline . _queue_manager . publish (
self . _base_task_pipeline . _queue_manager . publish ( QueueAdvancedChatMessageEndEvent ( ) , PublishFrom . TASK_PIPELINE )
QueueAdvancedChatMessageEndEvent ( ) , PublishFrom . TASK_PIPELINE
)
elif isinstance ( event , QueueWorkflowPartialSuccessEvent ) :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
if not graph_runtime_state :
raise ValueError ( " graph runtime state not initialized. " )
with Session ( db . engine , expire_on_commit = False ) as session :
def _handle_workflow_partial_success_event (
self ,
event : QueueWorkflowPartialSuccessEvent ,
* ,
graph_runtime_state : Optional [ GraphRuntimeState ] = None ,
trace_manager : Optional [ TraceQueueManager ] = None ,
* * kwargs ,
) - > Generator [ StreamResponse , None , None ] :
""" Handle workflow partial success events. """
self . _ensure_workflow_initialized ( )
validated_state = self . _ensure_graph_runtime_initialized ( graph_runtime_state )
with self . _database_session ( ) as session :
workflow_execution = self . _workflow_cycle_manager . handle_workflow_run_partial_success (
workflow_execution = self . _workflow_cycle_manager . handle_workflow_run_partial_success (
workflow_run_id = self . _workflow_run_id ,
workflow_run_id = self . _workflow_run_id ,
total_tokens = graph_runtime_state . total_tokens ,
total_tokens = validated _state. total_tokens ,
total_steps = graph_runtime_state . node_run_steps ,
total_steps = validated _state. node_run_steps ,
outputs = event . outputs ,
outputs = event . outputs ,
exceptions_count = event . exceptions_count ,
exceptions_count = event . exceptions_count ,
conversation_id = None ,
conversation_id = None ,
trace_manager = trace_manager ,
trace_manager = trace_manager ,
external_trace_id = self . _application_generate_entity . extras . get ( " external_trace_id " ) ,
)
)
workflow_finish_resp = self . _workflow_response_converter . workflow_finish_to_stream_response (
workflow_finish_resp = self . _workflow_response_converter . workflow_finish_to_stream_response (
session = session ,
session = session ,
@ -541,25 +600,31 @@ class AdvancedChatAppGenerateTaskPipeline:
)
)
yield workflow_finish_resp
yield workflow_finish_resp
self . _base_task_pipeline . _queue_manager . publish (
self . _base_task_pipeline . _queue_manager . publish ( QueueAdvancedChatMessageEndEvent ( ) , PublishFrom . TASK_PIPELINE )
QueueAdvancedChatMessageEndEvent ( ) , PublishFrom . TASK_PIPELINE
)
elif isinstance ( event , QueueWorkflowFailedEvent ) :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
if not graph_runtime_state :
raise ValueError ( " graph runtime state not initialized. " )
with Session ( db . engine , expire_on_commit = False ) as session :
def _handle_workflow_failed_event (
self ,
event : QueueWorkflowFailedEvent ,
* ,
graph_runtime_state : Optional [ GraphRuntimeState ] = None ,
trace_manager : Optional [ TraceQueueManager ] = None ,
* * kwargs ,
) - > Generator [ StreamResponse , None , None ] :
""" Handle workflow failed events. """
self . _ensure_workflow_initialized ( )
validated_state = self . _ensure_graph_runtime_initialized ( graph_runtime_state )
with self . _database_session ( ) as session :
workflow_execution = self . _workflow_cycle_manager . handle_workflow_run_failed (
workflow_execution = self . _workflow_cycle_manager . handle_workflow_run_failed (
workflow_run_id = self . _workflow_run_id ,
workflow_run_id = self . _workflow_run_id ,
total_tokens = graph_runtime_state . total_tokens ,
total_tokens = validated _state. total_tokens ,
total_steps = graph_runtime_state . node_run_steps ,
total_steps = validated _state. node_run_steps ,
status = WorkflowExecutionStatus . FAILED ,
status = WorkflowExecutionStatus . FAILED ,
error_message = event . error ,
error_message = event . error ,
conversation_id = self . _conversation_id ,
conversation_id = self . _conversation_id ,
trace_manager = trace_manager ,
trace_manager = trace_manager ,
exceptions_count = event . exceptions_count ,
exceptions_count = event . exceptions_count ,
external_trace_id = self . _application_generate_entity . extras . get ( " external_trace_id " ) ,
)
)
workflow_finish_resp = self . _workflow_response_converter . workflow_finish_to_stream_response (
workflow_finish_resp = self . _workflow_response_converter . workflow_finish_to_stream_response (
session = session ,
session = session ,
@ -567,16 +632,22 @@ class AdvancedChatAppGenerateTaskPipeline:
workflow_execution = workflow_execution ,
workflow_execution = workflow_execution ,
)
)
err_event = QueueErrorEvent ( error = ValueError ( f " Run failed: { workflow_execution . error_message } " ) )
err_event = QueueErrorEvent ( error = ValueError ( f " Run failed: { workflow_execution . error_message } " ) )
err = self . _base_task_pipeline . _handle_error (
err = self . _base_task_pipeline . _handle_error ( event = err_event , session = session , message_id = self . _message_id )
event = err_event , session = session , message_id = self . _message_id
)
yield workflow_finish_resp
yield workflow_finish_resp
yield self . _base_task_pipeline . _error_to_stream_response ( err )
yield self . _base_task_pipeline . _error_to_stream_response ( err )
break
elif isinstance ( event , QueueStopEvent ) :
def _handle_stop_event (
self ,
event : QueueStopEvent ,
* ,
graph_runtime_state : Optional [ GraphRuntimeState ] = None ,
trace_manager : Optional [ TraceQueueManager ] = None ,
* * kwargs ,
) - > Generator [ StreamResponse , None , None ] :
""" Handle stop events. """
if self . _workflow_run_id and graph_runtime_state :
if self . _workflow_run_id and graph_runtime_state :
with Session ( db . engine , expire_on_commit = False ) as session :
with self . _database_session ( ) as session :
workflow_execution = self . _workflow_cycle_manager . handle_workflow_run_failed (
workflow_execution = self . _workflow_cycle_manager . handle_workflow_run_failed (
workflow_run_id = self . _workflow_run_id ,
workflow_run_id = self . _workflow_run_id ,
total_tokens = graph_runtime_state . total_tokens ,
total_tokens = graph_runtime_state . total_tokens ,
@ -585,6 +656,7 @@ class AdvancedChatAppGenerateTaskPipeline:
error_message = event . get_stop_reason ( ) ,
error_message = event . get_stop_reason ( ) ,
conversation_id = self . _conversation_id ,
conversation_id = self . _conversation_id ,
trace_manager = trace_manager ,
trace_manager = trace_manager ,
external_trace_id = self . _application_generate_entity . extras . get ( " external_trace_id " ) ,
)
)
workflow_finish_resp = self . _workflow_response_converter . workflow_finish_to_stream_response (
workflow_finish_resp = self . _workflow_response_converter . workflow_finish_to_stream_response (
session = session ,
session = session ,
@ -593,7 +665,6 @@ class AdvancedChatAppGenerateTaskPipeline:
)
)
# Save message
# Save message
self . _save_message ( session = session , graph_runtime_state = graph_runtime_state )
self . _save_message ( session = session , graph_runtime_state = graph_runtime_state )
session . commit ( )
yield workflow_finish_resp
yield workflow_finish_resp
elif event . stopped_by in (
elif event . stopped_by in (
@ -601,53 +672,21 @@ class AdvancedChatAppGenerateTaskPipeline:
QueueStopEvent . StopBy . ANNOTATION_REPLY ,
QueueStopEvent . StopBy . ANNOTATION_REPLY ,
) :
) :
# When hitting input-moderation or annotation-reply, the workflow will not start
# When hitting input-moderation or annotation-reply, the workflow will not start
with Session ( db . engine , expire_on_commit = False ) as session :
with self . _database_session ( ) as session :
# Save message
# Save message
self . _save_message ( session = session )
self . _save_message ( session = session )
session . commit ( )
yield self . _message_end_to_stream_response ( )
yield self . _message_end_to_stream_response ( )
break
elif isinstance ( event , QueueRetrieverResourcesEvent ) :
self . _message_cycle_manager . handle_retriever_resources ( event )
with Session ( db . engine , expire_on_commit = False ) as session :
def _handle_advanced_chat_message_end_event (
message = self . _get_message ( session = session )
self ,
message . message_metadata = self . _task_state . metadata . model_dump_json ( )
event : QueueAdvancedChatMessageEndEvent ,
session . commit ( )
* ,
elif isinstance ( event , QueueAnnotationReplyEvent ) :
graph_runtime_state : Optional [ GraphRuntimeState ] = None ,
self . _message_cycle_manager . handle_annotation_reply ( event )
* * kwargs ,
) - > Generator [ StreamResponse , None , None ] :
with Session ( db . engine , expire_on_commit = False ) as session :
""" Handle advanced chat message end events. """
message = self . _get_message ( session = session )
self . _ensure_graph_runtime_initialized ( graph_runtime_state )
message . message_metadata = self . _task_state . metadata . model_dump_json ( )
session . commit ( )
elif isinstance ( event , QueueTextChunkEvent ) :
delta_text = event . text
if delta_text is None :
continue
# handle output moderation chunk
should_direct_answer = self . _handle_output_moderation_chunk ( delta_text )
if should_direct_answer :
continue
# only publish tts message at text chunk streaming
if tts_publisher :
tts_publisher . publish ( queue_message )
self . _task_state . answer + = delta_text
yield self . _message_cycle_manager . message_to_stream_response (
answer = delta_text , message_id = self . _message_id , from_variable_selector = event . from_variable_selector
)
elif isinstance ( event , QueueMessageReplaceEvent ) :
# published by moderation
yield self . _message_cycle_manager . message_replace_to_stream_response (
answer = event . text , reason = event . reason
)
elif isinstance ( event , QueueAdvancedChatMessageEndEvent ) :
if not graph_runtime_state :
raise ValueError ( " graph runtime state not initialized. " )
output_moderation_answer = self . _base_task_pipeline . _handle_output_moderation_when_task_finished (
output_moderation_answer = self . _base_task_pipeline . _handle_output_moderation_when_task_finished (
self . _task_state . answer
self . _task_state . answer
@ -660,19 +699,194 @@ class AdvancedChatAppGenerateTaskPipeline:
)
)
# Save message
# Save message
with Session ( db . engine , expire_on_commit = False ) as session :
with self . _database_session ( ) as session :
self . _save_message ( session = session , graph_runtime_state = graph_runtime_state )
self . _save_message ( session = session , graph_runtime_state = graph_runtime_state )
session . commit ( )
yield self . _message_end_to_stream_response ( )
yield self . _message_end_to_stream_response ( )
elif isinstance ( event , QueueAgentLogEvent ) :
def _handle_retriever_resources_event (
self , event : QueueRetrieverResourcesEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle retriever resources events. """
self . _message_cycle_manager . handle_retriever_resources ( event )
with self . _database_session ( ) as session :
message = self . _get_message ( session = session )
message . message_metadata = self . _task_state . metadata . model_dump_json ( )
return
yield # Make this a generator
def _handle_annotation_reply_event (
self , event : QueueAnnotationReplyEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle annotation reply events. """
self . _message_cycle_manager . handle_annotation_reply ( event )
with self . _database_session ( ) as session :
message = self . _get_message ( session = session )
message . message_metadata = self . _task_state . metadata . model_dump_json ( )
return
yield # Make this a generator
def _handle_message_replace_event (
self , event : QueueMessageReplaceEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle message replace events. """
yield self . _message_cycle_manager . message_replace_to_stream_response ( answer = event . text , reason = event . reason )
def _handle_agent_log_event ( self , event : QueueAgentLogEvent , * * kwargs ) - > Generator [ StreamResponse , None , None ] :
""" Handle agent log events. """
yield self . _workflow_response_converter . handle_agent_log (
yield self . _workflow_response_converter . handle_agent_log (
task_id = self . _application_generate_entity . task_id , event = event
task_id = self . _application_generate_entity . task_id , event = event
)
)
else :
continue
# publish None when task finished
def _get_event_handlers ( self ) - > dict [ type , Callable ] :
""" Get mapping of event types to their handlers using fluent pattern. """
return {
# Basic events
QueuePingEvent : self . _handle_ping_event ,
QueueErrorEvent : self . _handle_error_event ,
QueueTextChunkEvent : self . _handle_text_chunk_event ,
# Workflow events
QueueWorkflowStartedEvent : self . _handle_workflow_started_event ,
QueueWorkflowSucceededEvent : self . _handle_workflow_succeeded_event ,
QueueWorkflowPartialSuccessEvent : self . _handle_workflow_partial_success_event ,
QueueWorkflowFailedEvent : self . _handle_workflow_failed_event ,
# Node events
QueueNodeRetryEvent : self . _handle_node_retry_event ,
QueueNodeStartedEvent : self . _handle_node_started_event ,
QueueNodeSucceededEvent : self . _handle_node_succeeded_event ,
# Parallel branch events
QueueParallelBranchRunStartedEvent : self . _handle_parallel_branch_started_event ,
# Iteration events
QueueIterationStartEvent : self . _handle_iteration_start_event ,
QueueIterationNextEvent : self . _handle_iteration_next_event ,
QueueIterationCompletedEvent : self . _handle_iteration_completed_event ,
# Loop events
QueueLoopStartEvent : self . _handle_loop_start_event ,
QueueLoopNextEvent : self . _handle_loop_next_event ,
QueueLoopCompletedEvent : self . _handle_loop_completed_event ,
# Control events
QueueStopEvent : self . _handle_stop_event ,
# Message events
QueueRetrieverResourcesEvent : self . _handle_retriever_resources_event ,
QueueAnnotationReplyEvent : self . _handle_annotation_reply_event ,
QueueMessageReplaceEvent : self . _handle_message_replace_event ,
QueueAdvancedChatMessageEndEvent : self . _handle_advanced_chat_message_end_event ,
QueueAgentLogEvent : self . _handle_agent_log_event ,
}
def _dispatch_event (
self ,
event : Any ,
* ,
graph_runtime_state : Optional [ GraphRuntimeState ] = None ,
tts_publisher : Optional [ AppGeneratorTTSPublisher ] = None ,
trace_manager : Optional [ TraceQueueManager ] = None ,
queue_message : Optional [ Union [ WorkflowQueueMessage , MessageQueueMessage ] ] = None ,
) - > Generator [ StreamResponse , None , None ] :
""" Dispatch events using elegant pattern matching. """
handlers = self . _get_event_handlers ( )
event_type = type ( event )
# Direct handler lookup
if handler := handlers . get ( event_type ) :
yield from handler (
event ,
graph_runtime_state = graph_runtime_state ,
tts_publisher = tts_publisher ,
trace_manager = trace_manager ,
queue_message = queue_message ,
)
return
# Handle node failure events with isinstance check
if isinstance (
event ,
(
QueueNodeFailedEvent ,
QueueNodeInIterationFailedEvent ,
QueueNodeInLoopFailedEvent ,
QueueNodeExceptionEvent ,
) ,
) :
yield from self . _handle_node_failed_events (
event ,
graph_runtime_state = graph_runtime_state ,
tts_publisher = tts_publisher ,
trace_manager = trace_manager ,
queue_message = queue_message ,
)
return
# Handle parallel branch finished events with isinstance check
if isinstance ( event , ( QueueParallelBranchRunSucceededEvent , QueueParallelBranchRunFailedEvent ) ) :
yield from self . _handle_parallel_branch_finished_events (
event ,
graph_runtime_state = graph_runtime_state ,
tts_publisher = tts_publisher ,
trace_manager = trace_manager ,
queue_message = queue_message ,
)
return
# For unhandled events, we continue (original behavior)
return
def _process_stream_response (
self ,
tts_publisher : Optional [ AppGeneratorTTSPublisher ] = None ,
trace_manager : Optional [ TraceQueueManager ] = None ,
) - > Generator [ StreamResponse , None , None ] :
"""
Process stream response using elegant Fluent Python patterns .
Maintains exact same functionality as original 57 - if - statement version .
"""
# Initialize graph runtime state
graph_runtime_state : Optional [ GraphRuntimeState ] = None
for queue_message in self . _base_task_pipeline . _queue_manager . listen ( ) :
event = queue_message . event
match event :
case QueueWorkflowStartedEvent ( ) :
graph_runtime_state = event . graph_runtime_state
yield from self . _handle_workflow_started_event ( event )
case QueueTextChunkEvent ( ) :
yield from self . _handle_text_chunk_event (
event , tts_publisher = tts_publisher , queue_message = queue_message
)
case QueueErrorEvent ( ) :
yield from self . _handle_error_event ( event )
break
case QueueWorkflowFailedEvent ( ) :
yield from self . _handle_workflow_failed_event (
event , graph_runtime_state = graph_runtime_state , trace_manager = trace_manager
)
break
case QueueStopEvent ( ) :
yield from self . _handle_stop_event (
event , graph_runtime_state = graph_runtime_state , trace_manager = trace_manager
)
break
# Handle all other events through elegant dispatch
case _ :
if responses := list (
self . _dispatch_event (
event ,
graph_runtime_state = graph_runtime_state ,
tts_publisher = tts_publisher ,
trace_manager = trace_manager ,
queue_message = queue_message ,
)
) :
yield from responses
if tts_publisher :
if tts_publisher :
tts_publisher . publish ( None )
tts_publisher . publish ( None )
@ -744,7 +958,6 @@ class AdvancedChatAppGenerateTaskPipeline:
"""
"""
if self . _base_task_pipeline . _output_moderation_handler :
if self . _base_task_pipeline . _output_moderation_handler :
if self . _base_task_pipeline . _output_moderation_handler . should_direct_output ( ) :
if self . _base_task_pipeline . _output_moderation_handler . should_direct_output ( ) :
# stop subscribe new token when output moderation should direct output
self . _task_state . answer = self . _base_task_pipeline . _output_moderation_handler . get_final_output ( )
self . _task_state . answer = self . _base_task_pipeline . _output_moderation_handler . get_final_output ( )
self . _base_task_pipeline . _queue_manager . publish (
self . _base_task_pipeline . _queue_manager . publish (
QueueTextChunkEvent ( text = self . _task_state . answer ) , PublishFrom . TASK_PIPELINE
QueueTextChunkEvent ( text = self . _task_state . answer ) , PublishFrom . TASK_PIPELINE