@ -1,7 +1,8 @@
import logging
import time
from collections . abc import Generator
from typing import Optional , Union
from collections . abc import Callable , Generator
from contextlib import contextmanager
from typing import Any , Optional , Union
from sqlalchemy . orm import Session
@ -13,6 +14,7 @@ from core.app.entities.app_invoke_entities import (
WorkflowAppGenerateEntity ,
)
from core . app . entities . queue_entities import (
MessageQueueMessage ,
QueueAgentLogEvent ,
QueueErrorEvent ,
QueueIterationCompletedEvent ,
@ -38,11 +40,13 @@ from core.app.entities.queue_entities import (
QueueWorkflowPartialSuccessEvent ,
QueueWorkflowStartedEvent ,
QueueWorkflowSucceededEvent ,
WorkflowQueueMessage ,
)
from core . app . entities . task_entities import (
ErrorStreamResponse ,
MessageAudioEndStreamResponse ,
MessageAudioStreamResponse ,
PingStreamResponse ,
StreamResponse ,
TextChunkStreamResponse ,
WorkflowAppBlockingResponse ,
@ -54,10 +58,11 @@ from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTas
from core . base . tts import AppGeneratorTTSPublisher , AudioTrunk
from core . ops . ops_trace_manager import TraceQueueManager
from core . workflow . entities . workflow_execution import WorkflowExecution , WorkflowExecutionStatus , WorkflowType
from core . workflow . enums import SystemVariableKey
from core . workflow . graph_engine. entities . graph_runtime_state import GraphRuntimeState
from core . workflow . repositories . draft_variable_repository import DraftVariableSaverFactory
from core . workflow . repositories . workflow_execution_repository import WorkflowExecutionRepository
from core . workflow . repositories . workflow_node_execution_repository import WorkflowNodeExecutionRepository
from core . workflow . system_variable import SystemVariable
from core . workflow . workflow_cycle_manager import CycleManagerWorkflowInfo , WorkflowCycleManager
from extensions . ext_database import db
from models . account import Account
@ -107,13 +112,13 @@ class WorkflowAppGenerateTaskPipeline:
self . _workflow_cycle_manager = WorkflowCycleManager (
application_generate_entity = application_generate_entity ,
workflow_system_variables = {
SystemVariableKey. FILES : application_generate_entity . files ,
SystemVariableKey. USER_ID : user_session_id ,
SystemVariableKey. APP_ID : application_generate_entity . app_config . app_id ,
SystemVariableKey. WORKFLOW_ID : workflow . id ,
SystemVariableKey. WORKFLOW_EXECUTION_ID : application_generate_entity . workflow_execution_id ,
} ,
workflow_system_variables = SystemVariable (
files= application_generate_entity . files ,
user_id= user_session_id ,
app_id= application_generate_entity . app_config . app_id ,
workflow_id= workflow . id ,
workflow_execution_id= application_generate_entity . workflow_execution_id ,
) ,
workflow_info = CycleManagerWorkflowInfo (
workflow_id = workflow . id ,
workflow_type = WorkflowType ( workflow . type ) ,
@ -246,315 +251,495 @@ class WorkflowAppGenerateTaskPipeline:
if tts_publisher :
yield MessageAudioEndStreamResponse ( audio = " " , task_id = task_id )
def _process_stream_response (
@contextmanager
def _database_session ( self ) :
""" Context manager for database sessions. """
with Session ( db . engine , expire_on_commit = False ) as session :
try :
yield session
session . commit ( )
except Exception :
session . rollback ( )
raise
def _ensure_workflow_initialized ( self ) - > None :
""" Fluent validation for workflow state. """
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
def _ensure_graph_runtime_initialized ( self , graph_runtime_state : Optional [ GraphRuntimeState ] ) - > GraphRuntimeState :
""" Fluent validation for graph runtime state. """
if not graph_runtime_state :
raise ValueError ( " graph runtime state not initialized. " )
return graph_runtime_state
def _handle_ping_event ( self , event : QueuePingEvent , * * kwargs ) - > Generator [ PingStreamResponse , None , None ] :
""" Handle ping events. """
yield self . _base_task_pipeline . _ping_stream_response ( )
def _handle_error_event ( self , event : QueueErrorEvent , * * kwargs ) - > Generator [ ErrorStreamResponse , None , None ] :
""" Handle error events. """
err = self . _base_task_pipeline . _handle_error ( event = event )
yield self . _base_task_pipeline . _error_to_stream_response ( err )
def _handle_workflow_started_event (
self , event : QueueWorkflowStartedEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle workflow started events. """
# init workflow run
workflow_execution = self . _workflow_cycle_manager . handle_workflow_run_start ( )
self . _workflow_run_id = workflow_execution . id_
start_resp = self . _workflow_response_converter . workflow_start_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
workflow_execution = workflow_execution ,
)
yield start_resp
def _handle_node_retry_event ( self , event : QueueNodeRetryEvent , * * kwargs ) - > Generator [ StreamResponse , None , None ] :
""" Handle node retry events. """
self . _ensure_workflow_initialized ( )
with self . _database_session ( ) as session :
workflow_node_execution = self . _workflow_cycle_manager . handle_workflow_node_execution_retried (
workflow_execution_id = self . _workflow_run_id ,
event = event ,
)
response = self . _workflow_response_converter . workflow_node_retry_to_stream_response (
event = event ,
task_id = self . _application_generate_entity . task_id ,
workflow_node_execution = workflow_node_execution ,
)
if response :
yield response
def _handle_node_started_event (
self , event : QueueNodeStartedEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle node started events. """
self . _ensure_workflow_initialized ( )
workflow_node_execution = self . _workflow_cycle_manager . handle_node_execution_start (
workflow_execution_id = self . _workflow_run_id , event = event
)
node_start_response = self . _workflow_response_converter . workflow_node_start_to_stream_response (
event = event ,
task_id = self . _application_generate_entity . task_id ,
workflow_node_execution = workflow_node_execution ,
)
if node_start_response :
yield node_start_response
def _handle_node_succeeded_event (
self , event : QueueNodeSucceededEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle node succeeded events. """
workflow_node_execution = self . _workflow_cycle_manager . handle_workflow_node_execution_success ( event = event )
node_success_response = self . _workflow_response_converter . workflow_node_finish_to_stream_response (
event = event ,
task_id = self . _application_generate_entity . task_id ,
workflow_node_execution = workflow_node_execution ,
)
self . _save_output_for_event ( event , workflow_node_execution . id )
if node_success_response :
yield node_success_response
def _handle_node_failed_events (
self ,
tts_publisher : Optional [ AppGeneratorTTSPublisher ] = None ,
trace_manager : Optional [ TraceQueueManager ] = None ,
event : Union [
QueueNodeFailedEvent , QueueNodeInIterationFailedEvent , QueueNodeInLoopFailedEvent , QueueNodeExceptionEvent
] ,
* * kwargs ,
) - > Generator [ StreamResponse , None , None ] :
"""
Process stream response .
: return :
"""
graph_runtime_state = None
""" Handle various node failure events. """
workflow_node_execution = self . _workflow_cycle_manager . handle_workflow_node_execution_failed (
event = event ,
)
node_failed_response = self . _workflow_response_converter . workflow_node_finish_to_stream_response (
event = event ,
task_id = self . _application_generate_entity . task_id ,
workflow_node_execution = workflow_node_execution ,
)
for queue_message in self . _base_task_pipeline . _queue_manager . listen ( ) :
event = queue_message . event
if isinstance ( event , QueueNodeExceptionEvent ) :
self . _save_output_for_event ( event , workflow_node_execution . id )
if isinstance ( event , QueuePingEvent ) :
yield self . _base_task_pipeline . _ping_stream_response ( )
elif isinstance ( event , QueueErrorEvent ) :
err = self . _base_task_pipeline . _handle_error ( event = event )
yield self . _base_task_pipeline . _error_to_stream_response ( err )
break
elif isinstance ( event , QueueWorkflowStartedEvent ) :
# override graph runtime state
graph_runtime_state = event . graph_runtime_state
# init workflow run
workflow_execution = self . _workflow_cycle_manager . handle_workflow_run_start ( )
self . _workflow_run_id = workflow_execution . id_
start_resp = self . _workflow_response_converter . workflow_start_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
workflow_execution = workflow_execution ,
)
if node_failed_response :
yield node_failed_response
yield start_resp
elif isinstance (
event ,
QueueNodeRetryEvent ,
) :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
with Session ( db . engine , expire_on_commit = False ) as session :
workflow_node_execution = self . _workflow_cycle_manager . handle_workflow_node_execution_retried (
workflow_execution_id = self . _workflow_run_id ,
event = event ,
)
response = self . _workflow_response_converter . workflow_node_retry_to_stream_response (
event = event ,
task_id = self . _application_generate_entity . task_id ,
workflow_node_execution = workflow_node_execution ,
)
session . commit ( )
def _handle_parallel_branch_started_event (
self , event : QueueParallelBranchRunStartedEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle parallel branch started events. """
self . _ensure_workflow_initialized ( )
if response :
yield response
elif isinstance ( event , QueueNodeStartedEvent ) :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
parallel_start_resp = self . _workflow_response_converter . workflow_parallel_branch_start_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
workflow_execution_id = self . _workflow_run_id ,
event = event ,
)
yield parallel_start_resp
workflow_node_execution = self . _workflow_cycle_manager . handle_node_execution_start (
workflow_execution_id = self . _workflow_run_id , event = event
)
node_start_response = self . _workflow_response_converter . workflow_node_start_to_stream_response (
event = event ,
task_id = self . _application_generate_entity . task_id ,
workflow_node_execution = workflow_node_execution ,
)
def _handle_parallel_branch_finished_events (
self , event : Union [ QueueParallelBranchRunSucceededEvent , QueueParallelBranchRunFailedEvent ] , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle parallel branch finished events. """
self . _ensure_workflow_initialized ( )
if node_start_response :
yield node_start_response
elif isinstance ( event , QueueNodeSucceededEvent ) :
workflow_node_execution = self . _workflow_cycle_manager . handle_workflow_node_execution_success (
event = event
)
node_success_response = self . _workflow_response_converter . workflow_node_finish_to_stream_response (
event = event ,
task_id = self . _application_generate_entity . task_id ,
workflow_node_execution = workflow_node_execution ,
)
parallel_finish_resp = self . _workflow_response_converter . workflow_parallel_branch_finished_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
workflow_execution_id = self . _workflow_run_id ,
event = event ,
)
yield parallel_finish_resp
self . _save_output_for_event ( event , workflow_node_execution . id )
def _handle_iteration_start_event (
self , event : QueueIterationStartEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle iteration start events. """
self . _ensure_workflow_initialized ( )
if node_success_response :
yield node_success_response
elif isinstance (
event ,
QueueNodeFailedEvent
| QueueNodeInIterationFailedEvent
| QueueNodeInLoopFailedEvent
| QueueNodeExceptionEvent ,
) :
workflow_node_execution = self . _workflow_cycle_manager . handle_workflow_node_execution_failed (
event = event ,
)
node_failed_response = self . _workflow_response_converter . workflow_node_finish_to_stream_response (
event = event ,
task_id = self . _application_generate_entity . task_id ,
workflow_node_execution = workflow_node_execution ,
)
if isinstance ( event , QueueNodeExceptionEvent ) :
self . _save_output_for_event ( event , workflow_node_execution . id )
iter_start_resp = self . _workflow_response_converter . workflow_iteration_start_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
workflow_execution_id = self . _workflow_run_id ,
event = event ,
)
yield iter_start_resp
if node_failed_response :
yield node_failed_response
def _handle_iteration_next_event (
self , event : QueueIterationNextEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle iteration next events. """
self . _ensure_workflow_initialized ( )
elif isinstance ( event , QueueParallelBranchRunStartedEvent ) :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
iter_next_resp = self . _workflow_response_converter . workflow_iteration_next_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
workflow_execution_id = self . _workflow_run_id ,
event = event ,
)
yield iter_next_resp
parallel_start_resp = (
self . _workflow_response_converter . workflow_parallel_branch_start_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
workflow_execution_id = self . _workflow_run_id ,
event = event ,
)
)
def _handle_iteration_completed_event (
self , event : QueueIterationCompletedEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle iteration completed events. """
self . _ensure_workflow_initialized ( )
yield parallel_start_resp
iter_finish_resp = self . _workflow_response_converter . workflow_iteration_completed_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
workflow_execution_id = self . _workflow_run_id ,
event = event ,
)
yield iter_finish_resp
elif isinstance ( event , QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent ) :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
def _handle_loop_start_event ( self , event : QueueLoopStartEvent , * * kwargs ) - > Generator [ StreamResponse , None , None ] :
""" Handle loop start events. """
self . _ensure_workflow_initialized ( )
parallel_finish_resp = (
self . _workflow_response_converter . workflow_parallel_branch_finished_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
workflow_execution_id = self . _workflow_run_id ,
event = event ,
)
)
loop_start_resp = self . _workflow_response_converter . workflow_loop_start_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
workflow_execution_id = self . _workflow_run_id ,
event = event ,
)
yield loop_start_resp
yield parallel_finish_resp
def _handle_loop_next_event ( self , event : QueueLoopNextEvent , * * kwargs ) - > Generator [ StreamResponse , None , None ] :
""" Handle loop next events. """
self . _ensure_workflow_initialized ( )
elif isinstance ( event , QueueIterationStartEvent ) :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
loop_next_resp = self . _workflow_response_converter . workflow_loop_next_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
workflow_execution_id = self . _workflow_run_id ,
event = event ,
)
yield loop_next_resp
iter_start_resp = self . _workflow_response_converter . workflow_iteration_start_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
workflow_execution_id = self . _workflow_run_id ,
event = event ,
)
def _handle_loop_completed_event (
self , event : QueueLoopCompletedEvent , * * kwargs
) - > Generator [ StreamResponse , None , None ] :
""" Handle loop completed events. """
self . _ensure_workflow_initialized ( )
yield iter_start_resp
loop_finish_resp = self . _workflow_response_converter . workflow_loop_completed_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
workflow_execution_id = self . _workflow_run_id ,
event = event ,
)
yield loop_finish_resp
elif isinstance ( event , QueueIterationNextEvent ) :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
def _handle_workflow_succeeded_event (
self ,
event : QueueWorkflowSucceededEvent ,
* ,
graph_runtime_state : Optional [ GraphRuntimeState ] = None ,
trace_manager : Optional [ TraceQueueManager ] = None ,
* * kwargs ,
) - > Generator [ StreamResponse , None , None ] :
""" Handle workflow succeeded events. """
self . _ensure_workflow_initialized ( )
validated_state = self . _ensure_graph_runtime_initialized ( graph_runtime_state )
with self . _database_session ( ) as session :
workflow_execution = self . _workflow_cycle_manager . handle_workflow_run_success (
workflow_run_id = self . _workflow_run_id ,
total_tokens = validated_state . total_tokens ,
total_steps = validated_state . node_run_steps ,
outputs = event . outputs ,
conversation_id = None ,
trace_manager = trace_manager ,
external_trace_id = self . _application_generate_entity . extras . get ( " external_trace_id " ) ,
)
iter_next_resp = self . _workflow_response_converter . workflow_iteration_next_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
workflow_execution_id = self . _workflow_run_id ,
event = event ,
)
# save workflow app log
self . _save_workflow_app_log ( session = session , workflow_execution = workflow_execution )
yield iter_next_resp
workflow_finish_resp = self . _workflow_response_converter . workflow_finish_to_stream_response (
session = session ,
task_id = self . _application_generate_entity . task_id ,
workflow_execution = workflow_execution ,
)
elif isinstance ( event , QueueIterationCompletedEvent ) :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
yield workflow_finish_resp
iter_finish_resp = self . _workflow_response_converter . workflow_iteration_completed_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
workflow_execution_id = self . _workflow_run_id ,
event = event ,
)
def _handle_workflow_partial_success_event (
self ,
event : QueueWorkflowPartialSuccessEvent ,
* ,
graph_runtime_state : Optional [ GraphRuntimeState ] = None ,
trace_manager : Optional [ TraceQueueManager ] = None ,
* * kwargs ,
) - > Generator [ StreamResponse , None , None ] :
""" Handle workflow partial success events. """
self . _ensure_workflow_initialized ( )
validated_state = self . _ensure_graph_runtime_initialized ( graph_runtime_state )
with self . _database_session ( ) as session :
workflow_execution = self . _workflow_cycle_manager . handle_workflow_run_partial_success (
workflow_run_id = self . _workflow_run_id ,
total_tokens = validated_state . total_tokens ,
total_steps = validated_state . node_run_steps ,
outputs = event . outputs ,
exceptions_count = event . exceptions_count ,
conversation_id = None ,
trace_manager = trace_manager ,
external_trace_id = self . _application_generate_entity . extras . get ( " external_trace_id " ) ,
)
yield iter_finish_resp
# save workflow app log
self . _save_workflow_app_log ( session = session , workflow_execution = workflow_execution )
elif isinstance ( event , QueueLoopStartEvent ) :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
workflow_finish_resp = self . _workflow_response_converter . workflow_finish_to_stream_response (
session = session ,
task_id = self . _application_generate_entity . task_id ,
workflow_execution = workflow_execution ,
)
loop_start_resp = self . _workflow_response_converter . workflow_loop_start_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
workflow_execution_id = self . _workflow_run_id ,
event = event ,
)
yield workflow_finish_resp
yield loop_start_resp
def _handle_workflow_failed_and_stop_events (
self ,
event : Union [ QueueWorkflowFailedEvent , QueueStopEvent ] ,
* ,
graph_runtime_state : Optional [ GraphRuntimeState ] = None ,
trace_manager : Optional [ TraceQueueManager ] = None ,
* * kwargs ,
) - > Generator [ StreamResponse , None , None ] :
""" Handle workflow failed and stop events. """
self . _ensure_workflow_initialized ( )
validated_state = self . _ensure_graph_runtime_initialized ( graph_runtime_state )
with self . _database_session ( ) as session :
workflow_execution = self . _workflow_cycle_manager . handle_workflow_run_failed (
workflow_run_id = self . _workflow_run_id ,
total_tokens = validated_state . total_tokens ,
total_steps = validated_state . node_run_steps ,
status = WorkflowExecutionStatus . FAILED
if isinstance ( event , QueueWorkflowFailedEvent )
else WorkflowExecutionStatus . STOPPED ,
error_message = event . error if isinstance ( event , QueueWorkflowFailedEvent ) else event . get_stop_reason ( ) ,
conversation_id = None ,
trace_manager = trace_manager ,
exceptions_count = event . exceptions_count if isinstance ( event , QueueWorkflowFailedEvent ) else 0 ,
external_trace_id = self . _application_generate_entity . extras . get ( " external_trace_id " ) ,
)
elif isinstance ( event , QueueLoopNextEvent ) :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
# save workflow app log
self . _save_workflow_app_log ( session = session , workflow_execution = workflow_execution )
loop_next_resp = self . _workflow_response_converter . workflow_loop_next_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
workflow_execution_id = self . _workflow_run_id ,
event = event ,
)
workflow_finish_resp = self . _workflow_response_converter . workflow_finish _to_stream_response(
session = session ,
task_id = self . _application_generate_entity . task _id,
workflow_execution = workflow_execution ,
)
yield loop_next_resp
yield workflow_finish _resp
elif isinstance ( event , QueueLoopCompletedEvent ) :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
def _handle_text_chunk_event (
self ,
event : QueueTextChunkEvent ,
* ,
tts_publisher : Optional [ AppGeneratorTTSPublisher ] = None ,
queue_message : Optional [ Union [ WorkflowQueueMessage , MessageQueueMessage ] ] = None ,
* * kwargs ,
) - > Generator [ StreamResponse , None , None ] :
""" Handle text chunk events. """
delta_text = event . text
if delta_text is None :
return
loop_finish_resp = self . _workflow_response_converter . workflow_loop_completed_to_stream_response (
task_id = self . _application_generate_entity . task_id ,
workflow_execution_id = self . _workflow_run_id ,
event = event ,
)
# only publish tts message at text chunk streaming
if tts_publisher and queue_message :
tts_publisher . publish ( queue_message )
yield loop_finish_resp
elif isinstance ( event , QueueWorkflowSucceededEvent ) :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
if not graph_runtime_state :
raise ValueError ( " graph runtime state not initialized. " )
with Session ( db . engine , expire_on_commit = False ) as session :
workflow_execution = self . _workflow_cycle_manager . handle_workflow_run_success (
workflow_run_id = self . _workflow_run_id ,
total_tokens = graph_runtime_state . total_tokens ,
total_steps = graph_runtime_state . node_run_steps ,
outputs = event . outputs ,
conversation_id = None ,
trace_manager = trace_manager ,
)
yield self . _text_chunk_to_stream_response ( delta_text , from_variable_selector = event . from_variable_selector )
# save workflow app log
self . _save_workflow_app_log ( session = session , workflow_execution = workflow_execution )
def _handle_agent_log_event ( self , event : QueueAgentLogEvent , * * kwargs ) - > Generator [ StreamResponse , None , None ] :
""" Handle agent log events. """
yield self . _workflow_response_converter . handle_agent_log (
task_id = self . _application_generate_entity . task_id , event = event
)
workflow_finish_resp = self . _workflow_response_converter . workflow_finish_to_stream_response (
session = session ,
task_id = self . _application_generate_entity . task_id ,
workflow_execution = workflow_execution ,
)
session . commit ( )
yield workflow_finish_resp
elif isinstance ( event , QueueWorkflowPartialSuccessEvent ) :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
if not graph_runtime_state :
raise ValueError ( " graph runtime state not initialized. " )
with Session ( db . engine , expire_on_commit = False ) as session :
workflow_execution = self . _workflow_cycle_manager . handle_workflow_run_partial_success (
workflow_run_id = self . _workflow_run_id ,
total_tokens = graph_runtime_state . total_tokens ,
total_steps = graph_runtime_state . node_run_steps ,
outputs = event . outputs ,
exceptions_count = event . exceptions_count ,
conversation_id = None ,
trace_manager = trace_manager ,
)
def _get_event_handlers ( self ) - > dict [ type , Callable ] :
""" Get mapping of event types to their handlers using fluent pattern. """
return {
# Basic events
QueuePingEvent : self . _handle_ping_event ,
QueueErrorEvent : self . _handle_error_event ,
QueueTextChunkEvent : self . _handle_text_chunk_event ,
# Workflow events
QueueWorkflowStartedEvent : self . _handle_workflow_started_event ,
QueueWorkflowSucceededEvent : self . _handle_workflow_succeeded_event ,
QueueWorkflowPartialSuccessEvent : self . _handle_workflow_partial_success_event ,
# Node events
QueueNodeRetryEvent : self . _handle_node_retry_event ,
QueueNodeStartedEvent : self . _handle_node_started_event ,
QueueNodeSucceededEvent : self . _handle_node_succeeded_event ,
# Parallel branch events
QueueParallelBranchRunStartedEvent : self . _handle_parallel_branch_started_event ,
# Iteration events
QueueIterationStartEvent : self . _handle_iteration_start_event ,
QueueIterationNextEvent : self . _handle_iteration_next_event ,
QueueIterationCompletedEvent : self . _handle_iteration_completed_event ,
# Loop events
QueueLoopStartEvent : self . _handle_loop_start_event ,
QueueLoopNextEvent : self . _handle_loop_next_event ,
QueueLoopCompletedEvent : self . _handle_loop_completed_event ,
# Agent events
QueueAgentLogEvent : self . _handle_agent_log_event ,
}
def _dispatch_event (
self ,
event : Any ,
* ,
graph_runtime_state : Optional [ GraphRuntimeState ] = None ,
tts_publisher : Optional [ AppGeneratorTTSPublisher ] = None ,
trace_manager : Optional [ TraceQueueManager ] = None ,
queue_message : Optional [ Union [ WorkflowQueueMessage , MessageQueueMessage ] ] = None ,
) - > Generator [ StreamResponse , None , None ] :
""" Dispatch events using elegant pattern matching. """
handlers = self . _get_event_handlers ( )
event_type = type ( event )
# save workflow app log
self . _save_workflow_app_log ( session = session , workflow_execution = workflow_execution )
# Direct handler lookup
if handler := handlers . get ( event_type ) :
yield from handler (
event ,
graph_runtime_state = graph_runtime_state ,
tts_publisher = tts_publisher ,
trace_manager = trace_manager ,
queue_message = queue_message ,
)
return
workflow_finish_resp = self . _workflow_response_converter . workflow_finish_to_stream_response (
session = session ,
task_id = self . _application_generate_entity . task_id ,
workflow_execution = workflow_execution ,
)
session . commit ( )
yield workflow_finish_resp
elif isinstance ( event , QueueWorkflowFailedEvent | QueueStopEvent ) :
if not self . _workflow_run_id :
raise ValueError ( " workflow run not initialized. " )
if not graph_runtime_state :
raise ValueError ( " graph runtime state not initialized. " )
with Session ( db . engine , expire_on_commit = False ) as session :
workflow_execution = self . _workflow_cycle_manager . handle_workflow_run_failed (
workflow_run_id = self . _workflow_run_id ,
total_tokens = graph_runtime_state . total_tokens ,
total_steps = graph_runtime_state . node_run_steps ,
status = WorkflowExecutionStatus . FAILED
if isinstance ( event , QueueWorkflowFailedEvent )
else WorkflowExecutionStatus . STOPPED ,
error_message = event . error
if isinstance ( event , QueueWorkflowFailedEvent )
else event . get_stop_reason ( ) ,
conversation_id = None ,
trace_manager = trace_manager ,
exceptions_count = event . exceptions_count if isinstance ( event , QueueWorkflowFailedEvent ) else 0 ,
)
# Handle node failure events with isinstance check
if isinstance (
event ,
(
QueueNodeFailedEvent ,
QueueNodeInIterationFailedEvent ,
QueueNodeInLoopFailedEvent ,
QueueNodeExceptionEvent ,
) ,
) :
yield from self . _handle_node_failed_events (
event ,
graph_runtime_state = graph_runtime_state ,
tts_publisher = tts_publisher ,
trace_manager = trace_manager ,
queue_message = queue_message ,
)
return
# save workflow app log
self . _save_workflow_app_log ( session = session , workflow_execution = workflow_execution )
# Handle parallel branch finished events with isinstance check
if isinstance ( event , ( QueueParallelBranchRunSucceededEvent , QueueParallelBranchRunFailedEvent ) ) :
yield from self . _handle_parallel_branch_finished_events (
event ,
graph_runtime_state = graph_runtime_state ,
tts_publisher = tts_publisher ,
trace_manager = trace_manager ,
queue_message = queue_message ,
)
return
workflow_finish_resp = self . _workflow_response_converter . workflow_finish_to_stream_response (
session = session ,
task_id = self . _application_generate_entity . task_id ,
workflow_execution = workflow_execution ,
)
session . commit ( )
# Handle workflow failed and stop events with isinstance check
if isinstance ( event , ( QueueWorkflowFailedEvent , QueueStopEvent ) ) :
yield from self . _handle_workflow_failed_and_stop_events (
event ,
graph_runtime_state = graph_runtime_state ,
tts_publisher = tts_publisher ,
trace_manager = trace_manager ,
queue_message = queue_message ,
)
return
yield workflow_finish_resp
elif isinstance ( event , QueueTextChunkEvent ) :
delta_text = event . text
if delta_text is None :
continue
# For unhandled events, we continue (original behavior)
return
# only publish tts message at text chunk streaming
if tts_publisher :
tts_publisher . publish ( queue_message )
def _process_stream_response (
self ,
tts_publisher : Optional [ AppGeneratorTTSPublisher ] = None ,
trace_manager : Optional [ TraceQueueManager ] = None ,
) - > Generator [ StreamResponse , None , None ] :
"""
Process stream response using elegant Fluent Python patterns .
Maintains exact same functionality as original 44 - if - statement version .
"""
# Initialize graph runtime state
graph_runtime_state = None
yield self . _text_chunk_to_stream_response (
delta_text , from_variable_selector = event . from_variable_selector
)
elif isinstance ( event , QueueAgentLogEvent ) :
yield self . _workflow_response_converter . handle_agent_log (
task_id = self . _application_generate_entity . task_id , event = event
)
else :
continue
for queue_message in self . _base_task_pipeline . _queue_manager . listen ( ) :
event = queue_message . event
match event :
case QueueWorkflowStartedEvent ( ) :
graph_runtime_state = event . graph_runtime_state
yield from self . _handle_workflow_started_event ( event )
case QueueTextChunkEvent ( ) :
yield from self . _handle_text_chunk_event (
event , tts_publisher = tts_publisher , queue_message = queue_message
)
case QueueErrorEvent ( ) :
yield from self . _handle_error_event ( event )
break
# Handle all other events through elegant dispatch
case _ :
if responses := list (
self . _dispatch_event (
event ,
graph_runtime_state = graph_runtime_state ,
tts_publisher = tts_publisher ,
trace_manager = trace_manager ,
queue_message = queue_message ,
)
) :
yield from responses
if tts_publisher :
tts_publisher . publish ( None )