@ -16,6 +16,7 @@ from core.app.entities.app_invoke_entities import (
InvokeFrom ,
InvokeFrom ,
)
)
from core . app . entities . queue_entities import (
from core . app . entities . queue_entities import (
MessageQueueMessage ,
QueueAdvancedChatMessageEndEvent ,
QueueAdvancedChatMessageEndEvent ,
QueueAgentLogEvent ,
QueueAgentLogEvent ,
QueueAnnotationReplyEvent ,
QueueAnnotationReplyEvent ,
@ -45,6 +46,7 @@ from core.app.entities.queue_entities import (
QueueWorkflowPartialSuccessEvent ,
QueueWorkflowPartialSuccessEvent ,
QueueWorkflowStartedEvent ,
QueueWorkflowStartedEvent ,
QueueWorkflowSucceededEvent ,
QueueWorkflowSucceededEvent ,
WorkflowQueueMessage ,
)
)
from core . app . entities . task_entities import (
from core . app . entities . task_entities import (
ChatbotAppBlockingResponse ,
ChatbotAppBlockingResponse ,
@ -53,6 +55,7 @@ from core.app.entities.task_entities import (
MessageAudioEndStreamResponse ,
MessageAudioEndStreamResponse ,
MessageAudioStreamResponse ,
MessageAudioStreamResponse ,
MessageEndStreamResponse ,
MessageEndStreamResponse ,
PingStreamResponse ,
StreamResponse ,
StreamResponse ,
WorkflowTaskState ,
WorkflowTaskState ,
)
)
@ -293,25 +296,25 @@ class AdvancedChatAppGenerateTaskPipeline:
if not self . _workflow_run_id :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
raise ValueError ( " workflow run not initialized. " )
def _ensure_graph_runtime_initialized ( self , graph_runtime_state : Optional [ Any] ) - > Any :
def _ensure_graph_runtime_initialized ( self , graph_runtime_state : Optional [ GraphRuntimeState] ) - > GraphRuntimeState :
""" Fluent validation for graph runtime state. """
""" Fluent validation for graph runtime state. """
if not graph_runtime_state :
if not graph_runtime_state :
raise ValueError ( " graph runtime state not initialized. " )
raise ValueError ( " graph runtime state not initialized. " )
return graph_runtime_state
return graph_runtime_state
def _handle_ping_event ( self , event : QueuePingEvent , * * kwargs ) - > Generator [ Any , None , None ] :
def _handle_ping_event ( self , event : QueuePingEvent , * * kwargs ) - > Generator [ PingStreamResponse , None , None ] :
""" Handle ping events. """
""" Handle ping events. """
yield self . _base_task_pipeline . _ping_stream_response ( )
yield self . _base_task_pipeline . _ping_stream_response ( )
def _handle_error_event ( self , event : QueueErrorEvent , * * kwargs ) - > Generator [ Any , None , None ] :
def _handle_error_event ( self , event : QueueErrorEvent , * * kwargs ) - > Generator [ ErrorStreamResponse , None , None ] :
""" Handle error events. """
""" Handle error events. """
with self . _database_session ( ) as session :
with self . _database_session ( ) as session :
err = self . _base_task_pipeline . _handle_error ( event = event , session = session , message_id = self . _message_id )
err = self . _base_task_pipeline . _handle_error ( event = event , session = session , message_id = self . _message_id )
yield self . _base_task_pipeline . _error_to_stream_response ( err )
yield self . _base_task_pipeline . _error_to_stream_response ( err )
def _handle_workflow_started_event (
def _handle_workflow_started_event (
self , event : QueueWorkflowStartedEvent , * , graph_runtime_state : Optional [ Any ] = None , * * kwargs
self , event : QueueWorkflowStartedEvent , * , graph_runtime_state : Optional [ GraphRuntimeState ] = None , * * kwargs
) - > Generator [ Any , None , None ] :
) - > Generator [ StreamResponse , None , None ] :
""" Handle workflow started events. """
""" Handle workflow started events. """
# Override graph runtime state - this is a side effect but necessary
# Override graph runtime state - this is a side effect but necessary
graph_runtime_state = event . graph_runtime_state
graph_runtime_state = event . graph_runtime_state
@ -332,7 +335,7 @@ class AdvancedChatAppGenerateTaskPipeline:
yield workflow_start_resp
yield workflow_start_resp
def _handle_node_retry_event ( self , event : QueueNodeRetryEvent , * * kwargs ) - > Generator [ Any , None , None ] :
def _handle_node_retry_event ( self , event : QueueNodeRetryEvent , * * kwargs ) - > Generator [ StreamResponse , None , None ] :
""" Handle node retry events. """
""" Handle node retry events. """
self . _ensure_workflow_initialized ( )
self . _ensure_workflow_initialized ( )
@ -349,7 +352,9 @@ class AdvancedChatAppGenerateTaskPipeline:
if node_retry_resp :
if node_retry_resp :
yield node_retry_resp
yield node_retry_resp
def _handle_node_started_event ( self , event : QueueNodeStartedEvent , * * kwargs ) - > Generator [ Any , None , None ] :
def _handle_node_started_event (
self , event : QueueNodeStartedEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle node started events. """
""" Handle node started events. """
self . _ensure_workflow_initialized ( )
self . _ensure_workflow_initialized ( )
@ -366,7 +371,9 @@ class AdvancedChatAppGenerateTaskPipeline:
if node_start_resp :
if node_start_resp :
yield node_start_resp
yield node_start_resp
def _handle_node_succeeded_event ( self , event : QueueNodeSucceededEvent , * * kwargs ) - > Generator [ Any , None , None ] :
def _handle_node_succeeded_event (
self , event : QueueNodeSucceededEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle node succeeded events. """
""" Handle node succeeded events. """
# Record files if it's an answer node or end node
# Record files if it's an answer node or end node
if event . node_type in [ NodeType . ANSWER , NodeType . END ] :
if event . node_type in [ NodeType . ANSWER , NodeType . END ] :
@ -393,7 +400,7 @@ class AdvancedChatAppGenerateTaskPipeline:
QueueNodeFailedEvent , QueueNodeInIterationFailedEvent , QueueNodeInLoopFailedEvent , QueueNodeExceptionEvent
QueueNodeFailedEvent , QueueNodeInIterationFailedEvent , QueueNodeInLoopFailedEvent , QueueNodeExceptionEvent
] ,
] ,
* * kwargs ,
* * kwargs ,
) - > Generator [ Any , None , None ] :
) - > Generator [ StreamResponse , None , None ] :
""" Handle various node failure events. """
""" Handle various node failure events. """
workflow_node_execution = self . _workflow_cycle_manager . handle_workflow_node_execution_failed ( event = event )
workflow_node_execution = self . _workflow_cycle_manager . handle_workflow_node_execution_failed ( event = event )
@ -413,10 +420,10 @@ class AdvancedChatAppGenerateTaskPipeline:
self ,
self ,
event : QueueTextChunkEvent ,
event : QueueTextChunkEvent ,
* ,
* ,
tts_publisher : Optional [ A ny ] = None ,
tts_publisher : Optional [ A ppGeneratorTTSPublisher ] = None ,
queue_message : Optional [ Any ] = None ,
queue_message : Optional [ Union[ WorkflowQueueMessage , MessageQueueMessage ] ] = None ,
* * kwargs ,
* * kwargs ,
) - > Generator [ Any , None , None ] :
) - > Generator [ StreamResponse , None , None ] :
""" Handle text chunk events. """
""" Handle text chunk events. """
delta_text = event . text
delta_text = event . text
if delta_text is None :
if delta_text is None :
@ -438,7 +445,7 @@ class AdvancedChatAppGenerateTaskPipeline:
def _handle_parallel_branch_started_event (
def _handle_parallel_branch_started_event (
self , event : QueueParallelBranchRunStartedEvent , * * kwargs
self , event : QueueParallelBranchRunStartedEvent , * * kwargs
) - > Generator [ Any , None , None ] :
) - > Generator [ StreamResponse , None , None ] :
""" Handle parallel branch started events. """
""" Handle parallel branch started events. """
self . _ensure_workflow_initialized ( )
self . _ensure_workflow_initialized ( )
@ -451,7 +458,7 @@ class AdvancedChatAppGenerateTaskPipeline:
def _handle_parallel_branch_finished_events (
def _handle_parallel_branch_finished_events (
self , event : Union [ QueueParallelBranchRunSucceededEvent , QueueParallelBranchRunFailedEvent ] , * * kwargs
self , event : Union [ QueueParallelBranchRunSucceededEvent , QueueParallelBranchRunFailedEvent ] , * * kwargs
) - > Generator [ Any , None , None ] :
) - > Generator [ StreamResponse , None , None ] :
""" Handle parallel branch finished events. """
""" Handle parallel branch finished events. """
self . _ensure_workflow_initialized ( )
self . _ensure_workflow_initialized ( )
@ -462,7 +469,9 @@ class AdvancedChatAppGenerateTaskPipeline:
)
)
yield parallel_finish_resp
yield parallel_finish_resp
def _handle_iteration_start_event ( self , event : QueueIterationStartEvent , * * kwargs ) - > Generator [ Any , None , None ] :
def _handle_iteration_start_event (
self , event : QueueIterationStartEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle iteration start events. """
""" Handle iteration start events. """
self . _ensure_workflow_initialized ( )
self . _ensure_workflow_initialized ( )
@ -473,7 +482,9 @@ class AdvancedChatAppGenerateTaskPipeline:
)
)
yield iter_start_resp
yield iter_start_resp
def _handle_iteration_next_event ( self , event : QueueIterationNextEvent , * * kwargs ) - > Generator [ Any , None , None ] :
def _handle_iteration_next_event (
self , event : QueueIterationNextEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle iteration next events. """
""" Handle iteration next events. """
self . _ensure_workflow_initialized ( )
self . _ensure_workflow_initialized ( )
@ -486,7 +497,7 @@ class AdvancedChatAppGenerateTaskPipeline:
def _handle_iteration_completed_event (
def _handle_iteration_completed_event (
self , event : QueueIterationCompletedEvent , * * kwargs
self , event : QueueIterationCompletedEvent , * * kwargs
) - > Generator [ Any , None , None ] :
) - > Generator [ StreamResponse , None , None ] :
""" Handle iteration completed events. """
""" Handle iteration completed events. """
self . _ensure_workflow_initialized ( )
self . _ensure_workflow_initialized ( )
@ -497,7 +508,7 @@ class AdvancedChatAppGenerateTaskPipeline:
)
)
yield iter_finish_resp
yield iter_finish_resp
def _handle_loop_start_event ( self , event : QueueLoopStartEvent , * * kwargs ) - > Generator [ Any , None , None ] :
def _handle_loop_start_event ( self , event : QueueLoopStartEvent , * * kwargs ) - > Generator [ StreamResponse , None , None ] :
""" Handle loop start events. """
""" Handle loop start events. """
self . _ensure_workflow_initialized ( )
self . _ensure_workflow_initialized ( )
@ -508,7 +519,7 @@ class AdvancedChatAppGenerateTaskPipeline:
)
)
yield loop_start_resp
yield loop_start_resp
def _handle_loop_next_event ( self , event : QueueLoopNextEvent , * * kwargs ) - > Generator [ Any , None , None ] :
def _handle_loop_next_event ( self , event : QueueLoopNextEvent , * * kwargs ) - > Generator [ StreamResponse , None , None ] :
""" Handle loop next events. """
""" Handle loop next events. """
self . _ensure_workflow_initialized ( )
self . _ensure_workflow_initialized ( )
@ -519,7 +530,9 @@ class AdvancedChatAppGenerateTaskPipeline:
)
)
yield loop_next_resp
yield loop_next_resp
def _handle_loop_completed_event ( self , event : QueueLoopCompletedEvent , * * kwargs ) - > Generator [ Any , None , None ] :
def _handle_loop_completed_event (
self , event : QueueLoopCompletedEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle loop completed events. """
""" Handle loop completed events. """
self . _ensure_workflow_initialized ( )
self . _ensure_workflow_initialized ( )
@ -534,10 +547,10 @@ class AdvancedChatAppGenerateTaskPipeline:
self ,
self ,
event : QueueWorkflowSucceededEvent ,
event : QueueWorkflowSucceededEvent ,
* ,
* ,
graph_runtime_state : Optional [ Any ] = None ,
graph_runtime_state : Optional [ GraphRuntimeState ] = None ,
trace_manager : Optional [ Any ] = None ,
trace_manager : Optional [ TraceQueueManager ] = None ,
* * kwargs ,
* * kwargs ,
) - > Generator [ Any , None , None ] :
) - > Generator [ StreamResponse , None , None ] :
""" Handle workflow succeeded events. """
""" Handle workflow succeeded events. """
self . _ensure_workflow_initialized ( )
self . _ensure_workflow_initialized ( )
validated_state = self . _ensure_graph_runtime_initialized ( graph_runtime_state )
validated_state = self . _ensure_graph_runtime_initialized ( graph_runtime_state )
@ -564,10 +577,10 @@ class AdvancedChatAppGenerateTaskPipeline:
self ,
self ,
event : QueueWorkflowPartialSuccessEvent ,
event : QueueWorkflowPartialSuccessEvent ,
* ,
* ,
graph_runtime_state : Optional [ Any ] = None ,
graph_runtime_state : Optional [ GraphRuntimeState ] = None ,
trace_manager : Optional [ Any ] = None ,
trace_manager : Optional [ TraceQueueManager ] = None ,
* * kwargs ,
* * kwargs ,
) - > Generator [ Any , None , None ] :
) - > Generator [ StreamResponse , None , None ] :
""" Handle workflow partial success events. """
""" Handle workflow partial success events. """
self . _ensure_workflow_initialized ( )
self . _ensure_workflow_initialized ( )
validated_state = self . _ensure_graph_runtime_initialized ( graph_runtime_state )
validated_state = self . _ensure_graph_runtime_initialized ( graph_runtime_state )
@ -595,10 +608,10 @@ class AdvancedChatAppGenerateTaskPipeline:
self ,
self ,
event : QueueWorkflowFailedEvent ,
event : QueueWorkflowFailedEvent ,
* ,
* ,
graph_runtime_state : Optional [ Any ] = None ,
graph_runtime_state : Optional [ GraphRuntimeState ] = None ,
trace_manager : Optional [ Any ] = None ,
trace_manager : Optional [ TraceQueueManager ] = None ,
* * kwargs ,
* * kwargs ,
) - > Generator [ Any , None , None ] :
) - > Generator [ StreamResponse , None , None ] :
""" Handle workflow failed events. """
""" Handle workflow failed events. """
self . _ensure_workflow_initialized ( )
self . _ensure_workflow_initialized ( )
validated_state = self . _ensure_graph_runtime_initialized ( graph_runtime_state )
validated_state = self . _ensure_graph_runtime_initialized ( graph_runtime_state )
@ -629,10 +642,10 @@ class AdvancedChatAppGenerateTaskPipeline:
self ,
self ,
event : QueueStopEvent ,
event : QueueStopEvent ,
* ,
* ,
graph_runtime_state : Optional [ Any ] = None ,
graph_runtime_state : Optional [ GraphRuntimeState ] = None ,
trace_manager : Optional [ Any ] = None ,
trace_manager : Optional [ TraceQueueManager ] = None ,
* * kwargs ,
* * kwargs ,
) - > Generator [ Any , None , None ] :
) - > Generator [ StreamResponse , None , None ] :
""" Handle stop events. """
""" Handle stop events. """
if self . _workflow_run_id and graph_runtime_state :
if self . _workflow_run_id and graph_runtime_state :
with self . _database_session ( ) as session :
with self . _database_session ( ) as session :
@ -666,8 +679,12 @@ class AdvancedChatAppGenerateTaskPipeline:
yield self . _message_end_to_stream_response ( )
yield self . _message_end_to_stream_response ( )
def _handle_advanced_chat_message_end_event (
def _handle_advanced_chat_message_end_event (
self , event : QueueAdvancedChatMessageEndEvent , * , graph_runtime_state : Optional [ Any ] = None , * * kwargs
self ,
) - > Generator [ Any , None , None ] :
event : QueueAdvancedChatMessageEndEvent ,
* ,
graph_runtime_state : Optional [ GraphRuntimeState ] = None ,
* * kwargs ,
) - > Generator [ StreamResponse , None , None ] :
""" Handle advanced chat message end events. """
""" Handle advanced chat message end events. """
self . _ensure_graph_runtime_initialized ( graph_runtime_state )
self . _ensure_graph_runtime_initialized ( graph_runtime_state )
@ -689,7 +706,7 @@ class AdvancedChatAppGenerateTaskPipeline:
def _handle_retriever_resources_event (
def _handle_retriever_resources_event (
self , event : QueueRetrieverResourcesEvent , * * kwargs
self , event : QueueRetrieverResourcesEvent , * * kwargs
) - > Generator [ Any , None , None ] :
) - > Generator [ StreamResponse , None , None ] :
""" Handle retriever resources events. """
""" Handle retriever resources events. """
self . _message_cycle_manager . handle_retriever_resources ( event )
self . _message_cycle_manager . handle_retriever_resources ( event )
@ -699,7 +716,9 @@ class AdvancedChatAppGenerateTaskPipeline:
return
return
yield # Make this a generator
yield # Make this a generator
def _handle_annotation_reply_event ( self , event : QueueAnnotationReplyEvent , * * kwargs ) - > Generator [ Any , None , None ] :
def _handle_annotation_reply_event (
self , event : QueueAnnotationReplyEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle annotation reply events. """
""" Handle annotation reply events. """
self . _message_cycle_manager . handle_annotation_reply ( event )
self . _message_cycle_manager . handle_annotation_reply ( event )
@ -709,11 +728,13 @@ class AdvancedChatAppGenerateTaskPipeline:
return
return
yield # Make this a generator
yield # Make this a generator
def _handle_message_replace_event ( self , event : QueueMessageReplaceEvent , * * kwargs ) - > Generator [ Any , None , None ] :
def _handle_message_replace_event (
self , event : QueueMessageReplaceEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle message replace events. """
""" Handle message replace events. """
yield self . _message_cycle_manager . message_replace_to_stream_response ( answer = event . text , reason = event . reason )
yield self . _message_cycle_manager . message_replace_to_stream_response ( answer = event . text , reason = event . reason )
def _handle_agent_log_event ( self , event : QueueAgentLogEvent , * * kwargs ) - > Generator [ Any , None , None ] :
def _handle_agent_log_event ( self , event : QueueAgentLogEvent , * * kwargs ) - > Generator [ StreamResponse , None , None ] :
""" Handle agent log events. """
""" Handle agent log events. """
yield self . _workflow_response_converter . handle_agent_log (
yield self . _workflow_response_converter . handle_agent_log (
task_id = self . _application_generate_entity . task_id , event = event
task_id = self . _application_generate_entity . task_id , event = event
@ -759,11 +780,11 @@ class AdvancedChatAppGenerateTaskPipeline:
self ,
self ,
event : Any ,
event : Any ,
* ,
* ,
graph_runtime_state : Optional [ Any ] = None ,
graph_runtime_state : Optional [ GraphRuntimeState ] = None ,
tts_publisher : Optional [ A ny ] = None ,
tts_publisher : Optional [ A ppGeneratorTTSPublisher ] = None ,
trace_manager : Optional [ Any ] = None ,
trace_manager : Optional [ TraceQueueManager ] = None ,
queue_message : Optional [ Any ] = None ,
queue_message : Optional [ Union[ WorkflowQueueMessage , MessageQueueMessage ] ] = None ,
) - > Generator [ Any , None , None ] :
) - > Generator [ StreamResponse , None , None ] :
""" Dispatch events using elegant pattern matching. """
""" Dispatch events using elegant pattern matching. """
handlers = self . _get_event_handlers ( )
handlers = self . _get_event_handlers ( )
event_type = type ( event )
event_type = type ( event )