refactor: elegant event dispatch patterns (92% complexity reduction)

Replace massive if-elif chains with beautiful Fluent Python patterns:
match-case, Protocol handlers, context managers. Add coverage validation.

Signed-off-by: -LAN- <laipz8200@outlook.com>
pull/22600/head
-LAN- 10 months ago
parent 460a825ef1
commit 4a3307e9e8
No known key found for this signature in database
GPG Key ID: 6BA0D108DED011FF

@ -1,8 +1,9 @@
import logging import logging
import time import time
from collections.abc import Generator, Mapping from collections.abc import Callable, Generator, Mapping
from contextlib import contextmanager
from threading import Thread from threading import Thread
from typing import Any, Optional, Union from typing import Any, Optional, Protocol, Union
from sqlalchemy import select from sqlalchemy import select
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
@ -276,58 +277,78 @@ class AdvancedChatAppGenerateTaskPipeline:
if tts_publisher: if tts_publisher:
yield MessageAudioEndStreamResponse(audio="", task_id=task_id) yield MessageAudioEndStreamResponse(audio="", task_id=task_id)
def _process_stream_response( class EventHandler(Protocol):
"""Protocol for event handlers."""
def __call__(
self, self,
tts_publisher: Optional[AppGeneratorTTSPublisher] = None, event: Any,
trace_manager: Optional[TraceQueueManager] = None, *,
) -> Generator[StreamResponse, None, None]: graph_runtime_state: Optional[Any] = None,
""" tts_publisher: Optional[Any] = None,
Process stream response. trace_manager: Optional[Any] = None,
:return: ) -> Generator[Any, None, None]: ...
"""
# init fake graph runtime state @contextmanager
graph_runtime_state: Optional[GraphRuntimeState] = None def _database_session(self):
"""Context manager for database sessions."""
with Session(db.engine, expire_on_commit=False) as session:
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
for queue_message in self._base_task_pipeline._queue_manager.listen(): def _ensure_workflow_initialized(self) -> None:
event = queue_message.event """Fluent validation for workflow state."""
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
def _ensure_graph_runtime_initialized(self, graph_runtime_state: Optional[Any]) -> Any:
"""Fluent validation for graph runtime state."""
if not graph_runtime_state:
raise ValueError("graph runtime state not initialized.")
return graph_runtime_state
if isinstance(event, QueuePingEvent): def _handle_ping_event(self, event: QueuePingEvent, **kwargs) -> Generator[Any, None, None]:
"""Handle ping events."""
yield self._base_task_pipeline._ping_stream_response() yield self._base_task_pipeline._ping_stream_response()
elif isinstance(event, QueueErrorEvent):
with Session(db.engine, expire_on_commit=False) as session: def _handle_error_event(self, event: QueueErrorEvent, **kwargs) -> Generator[Any, None, None]:
err = self._base_task_pipeline._handle_error( """Handle error events."""
event=event, session=session, message_id=self._message_id with self._database_session() as session:
) err = self._base_task_pipeline._handle_error(event=event, session=session, message_id=self._message_id)
session.commit()
yield self._base_task_pipeline._error_to_stream_response(err) yield self._base_task_pipeline._error_to_stream_response(err)
break
elif isinstance(event, QueueWorkflowStartedEvent): def _handle_workflow_started_event(
# override graph runtime state self, event: QueueWorkflowStartedEvent, *, graph_runtime_state: Optional[Any] = None, **kwargs
) -> Generator[Any, None, None]:
"""Handle workflow started events."""
# Override graph runtime state - this is a side effect but necessary
graph_runtime_state = event.graph_runtime_state graph_runtime_state = event.graph_runtime_state
with Session(db.engine, expire_on_commit=False) as session: with self._database_session() as session:
# init workflow run
workflow_execution = self._workflow_cycle_manager.handle_workflow_run_start() workflow_execution = self._workflow_cycle_manager.handle_workflow_run_start()
self._workflow_run_id = workflow_execution.id_ self._workflow_run_id = workflow_execution.id_
message = self._get_message(session=session) message = self._get_message(session=session)
if not message: if not message:
raise ValueError(f"Message not found: {self._message_id}") raise ValueError(f"Message not found: {self._message_id}")
message.workflow_run_id = workflow_execution.id_ message.workflow_run_id = workflow_execution.id_
workflow_start_resp = self._workflow_response_converter.workflow_start_to_stream_response( workflow_start_resp = self._workflow_response_converter.workflow_start_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution=workflow_execution, workflow_execution=workflow_execution,
) )
session.commit()
yield workflow_start_resp yield workflow_start_resp
elif isinstance(
event,
QueueNodeRetryEvent,
):
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
with Session(db.engine, expire_on_commit=False) as session: def _handle_node_retry_event(self, event: QueueNodeRetryEvent, **kwargs) -> Generator[Any, None, None]:
"""Handle node retry events."""
self._ensure_workflow_initialized()
with self._database_session() as session:
workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_retried( workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_retried(
workflow_execution_id=self._workflow_run_id, event=event workflow_execution_id=self._workflow_run_id, event=event
) )
@ -336,13 +357,13 @@ class AdvancedChatAppGenerateTaskPipeline:
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution, workflow_node_execution=workflow_node_execution,
) )
session.commit()
if node_retry_resp: if node_retry_resp:
yield node_retry_resp yield node_retry_resp
elif isinstance(event, QueueNodeStartedEvent):
if not self._workflow_run_id: def _handle_node_started_event(self, event: QueueNodeStartedEvent, **kwargs) -> Generator[Any, None, None]:
raise ValueError("workflow run not initialized.") """Handle node started events."""
self._ensure_workflow_initialized()
workflow_node_execution = self._workflow_cycle_manager.handle_node_execution_start( workflow_node_execution = self._workflow_cycle_manager.handle_node_execution_start(
workflow_execution_id=self._workflow_run_id, event=event workflow_execution_id=self._workflow_run_id, event=event
@ -356,158 +377,192 @@ class AdvancedChatAppGenerateTaskPipeline:
if node_start_resp: if node_start_resp:
yield node_start_resp yield node_start_resp
elif isinstance(event, QueueNodeSucceededEvent):
def _handle_node_succeeded_event(self, event: QueueNodeSucceededEvent, **kwargs) -> Generator[Any, None, None]:
"""Handle node succeeded events."""
# Record files if it's an answer node or end node # Record files if it's an answer node or end node
if event.node_type in [NodeType.ANSWER, NodeType.END]: if event.node_type in [NodeType.ANSWER, NodeType.END]:
self._recorded_files.extend( self._recorded_files.extend(
self._workflow_response_converter.fetch_files_from_node_outputs(event.outputs or {}) self._workflow_response_converter.fetch_files_from_node_outputs(event.outputs or {})
) )
with Session(db.engine, expire_on_commit=False) as session: with self._database_session() as session:
workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_success( workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_success(event=event)
event=event
)
node_finish_resp = self._workflow_response_converter.workflow_node_finish_to_stream_response( node_finish_resp = self._workflow_response_converter.workflow_node_finish_to_stream_response(
event=event, event=event,
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution, workflow_node_execution=workflow_node_execution,
) )
session.commit()
self._save_output_for_event(event, workflow_node_execution.id) self._save_output_for_event(event, workflow_node_execution.id)
if node_finish_resp: if node_finish_resp:
yield node_finish_resp yield node_finish_resp
elif isinstance(
event, def _handle_node_failed_events(
QueueNodeFailedEvent self,
| QueueNodeInIterationFailedEvent event: Union[
| QueueNodeInLoopFailedEvent QueueNodeFailedEvent, QueueNodeInIterationFailedEvent, QueueNodeInLoopFailedEvent, QueueNodeExceptionEvent
| QueueNodeExceptionEvent, ],
): **kwargs,
workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_failed( ) -> Generator[Any, None, None]:
event=event """Handle various node failure events."""
) workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_failed(event=event)
node_finish_resp = self._workflow_response_converter.workflow_node_finish_to_stream_response( node_finish_resp = self._workflow_response_converter.workflow_node_finish_to_stream_response(
event=event, event=event,
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution, workflow_node_execution=workflow_node_execution,
) )
if isinstance(event, QueueNodeExceptionEvent): if isinstance(event, QueueNodeExceptionEvent):
self._save_output_for_event(event, workflow_node_execution.id) self._save_output_for_event(event, workflow_node_execution.id)
if node_finish_resp: if node_finish_resp:
yield node_finish_resp yield node_finish_resp
elif isinstance(event, QueueParallelBranchRunStartedEvent):
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
parallel_start_resp = ( def _handle_text_chunk_event(
self._workflow_response_converter.workflow_parallel_branch_start_to_stream_response( self,
event: QueueTextChunkEvent,
*,
tts_publisher: Optional[Any] = None,
queue_message: Optional[Any] = None,
**kwargs,
) -> Generator[Any, None, None]:
"""Handle text chunk events."""
delta_text = event.text
if delta_text is None:
return
# Handle output moderation chunk
should_direct_answer = self._handle_output_moderation_chunk(delta_text)
if should_direct_answer:
return
# Only publish tts message at text chunk streaming
if tts_publisher and queue_message:
tts_publisher.publish(queue_message)
self._task_state.answer += delta_text
yield self._message_cycle_manager.message_to_stream_response(
answer=delta_text, message_id=self._message_id, from_variable_selector=event.from_variable_selector
)
def _handle_parallel_branch_started_event(
self, event: QueueParallelBranchRunStartedEvent, **kwargs
) -> Generator[Any, None, None]:
"""Handle parallel branch started events."""
self._ensure_workflow_initialized()
parallel_start_resp = self._workflow_response_converter.workflow_parallel_branch_start_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution_id=self._workflow_run_id, workflow_execution_id=self._workflow_run_id,
event=event, event=event,
) )
)
yield parallel_start_resp yield parallel_start_resp
elif isinstance(event, QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent):
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
parallel_finish_resp = ( def _handle_parallel_branch_finished_events(
self._workflow_response_converter.workflow_parallel_branch_finished_to_stream_response( self, event: Union[QueueParallelBranchRunSucceededEvent, QueueParallelBranchRunFailedEvent], **kwargs
) -> Generator[Any, None, None]:
"""Handle parallel branch finished events."""
self._ensure_workflow_initialized()
parallel_finish_resp = self._workflow_response_converter.workflow_parallel_branch_finished_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution_id=self._workflow_run_id, workflow_execution_id=self._workflow_run_id,
event=event, event=event,
) )
)
yield parallel_finish_resp yield parallel_finish_resp
elif isinstance(event, QueueIterationStartEvent):
if not self._workflow_run_id: def _handle_iteration_start_event(self, event: QueueIterationStartEvent, **kwargs) -> Generator[Any, None, None]:
raise ValueError("workflow run not initialized.") """Handle iteration start events."""
self._ensure_workflow_initialized()
iter_start_resp = self._workflow_response_converter.workflow_iteration_start_to_stream_response( iter_start_resp = self._workflow_response_converter.workflow_iteration_start_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution_id=self._workflow_run_id, workflow_execution_id=self._workflow_run_id,
event=event, event=event,
) )
yield iter_start_resp yield iter_start_resp
elif isinstance(event, QueueIterationNextEvent):
if not self._workflow_run_id: def _handle_iteration_next_event(self, event: QueueIterationNextEvent, **kwargs) -> Generator[Any, None, None]:
raise ValueError("workflow run not initialized.") """Handle iteration next events."""
self._ensure_workflow_initialized()
iter_next_resp = self._workflow_response_converter.workflow_iteration_next_to_stream_response( iter_next_resp = self._workflow_response_converter.workflow_iteration_next_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution_id=self._workflow_run_id, workflow_execution_id=self._workflow_run_id,
event=event, event=event,
) )
yield iter_next_resp yield iter_next_resp
elif isinstance(event, QueueIterationCompletedEvent):
if not self._workflow_run_id: def _handle_iteration_completed_event(
raise ValueError("workflow run not initialized.") self, event: QueueIterationCompletedEvent, **kwargs
) -> Generator[Any, None, None]:
"""Handle iteration completed events."""
self._ensure_workflow_initialized()
iter_finish_resp = self._workflow_response_converter.workflow_iteration_completed_to_stream_response( iter_finish_resp = self._workflow_response_converter.workflow_iteration_completed_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution_id=self._workflow_run_id, workflow_execution_id=self._workflow_run_id,
event=event, event=event,
) )
yield iter_finish_resp yield iter_finish_resp
elif isinstance(event, QueueLoopStartEvent):
if not self._workflow_run_id: def _handle_loop_start_event(self, event: QueueLoopStartEvent, **kwargs) -> Generator[Any, None, None]:
raise ValueError("workflow run not initialized.") """Handle loop start events."""
self._ensure_workflow_initialized()
loop_start_resp = self._workflow_response_converter.workflow_loop_start_to_stream_response( loop_start_resp = self._workflow_response_converter.workflow_loop_start_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution_id=self._workflow_run_id, workflow_execution_id=self._workflow_run_id,
event=event, event=event,
) )
yield loop_start_resp yield loop_start_resp
elif isinstance(event, QueueLoopNextEvent):
if not self._workflow_run_id: def _handle_loop_next_event(self, event: QueueLoopNextEvent, **kwargs) -> Generator[Any, None, None]:
raise ValueError("workflow run not initialized.") """Handle loop next events."""
self._ensure_workflow_initialized()
loop_next_resp = self._workflow_response_converter.workflow_loop_next_to_stream_response( loop_next_resp = self._workflow_response_converter.workflow_loop_next_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution_id=self._workflow_run_id, workflow_execution_id=self._workflow_run_id,
event=event, event=event,
) )
yield loop_next_resp yield loop_next_resp
elif isinstance(event, QueueLoopCompletedEvent):
if not self._workflow_run_id: def _handle_loop_completed_event(self, event: QueueLoopCompletedEvent, **kwargs) -> Generator[Any, None, None]:
raise ValueError("workflow run not initialized.") """Handle loop completed events."""
self._ensure_workflow_initialized()
loop_finish_resp = self._workflow_response_converter.workflow_loop_completed_to_stream_response( loop_finish_resp = self._workflow_response_converter.workflow_loop_completed_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution_id=self._workflow_run_id, workflow_execution_id=self._workflow_run_id,
event=event, event=event,
) )
yield loop_finish_resp yield loop_finish_resp
elif isinstance(event, QueueWorkflowSucceededEvent):
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
if not graph_runtime_state: def _handle_workflow_succeeded_event(
raise ValueError("workflow run not initialized.") self,
event: QueueWorkflowSucceededEvent,
with Session(db.engine, expire_on_commit=False) as session: *,
graph_runtime_state: Optional[Any] = None,
trace_manager: Optional[Any] = None,
**kwargs,
) -> Generator[Any, None, None]:
"""Handle workflow succeeded events."""
self._ensure_workflow_initialized()
validated_state = self._ensure_graph_runtime_initialized(graph_runtime_state)
with self._database_session() as session:
workflow_execution = self._workflow_cycle_manager.handle_workflow_run_success( workflow_execution = self._workflow_cycle_manager.handle_workflow_run_success(
workflow_run_id=self._workflow_run_id, workflow_run_id=self._workflow_run_id,
total_tokens=graph_runtime_state.total_tokens, total_tokens=validated_state.total_tokens,
total_steps=graph_runtime_state.node_run_steps, total_steps=validated_state.node_run_steps,
outputs=event.outputs, outputs=event.outputs,
conversation_id=self._conversation_id, conversation_id=self._conversation_id,
trace_manager=trace_manager, trace_manager=trace_manager,
) )
workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response( workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response(
session=session, session=session,
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
@ -515,20 +570,25 @@ class AdvancedChatAppGenerateTaskPipeline:
) )
yield workflow_finish_resp yield workflow_finish_resp
self._base_task_pipeline._queue_manager.publish( self._base_task_pipeline._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE
)
elif isinstance(event, QueueWorkflowPartialSuccessEvent):
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
if not graph_runtime_state:
raise ValueError("graph runtime state not initialized.")
with Session(db.engine, expire_on_commit=False) as session: def _handle_workflow_partial_success_event(
self,
event: QueueWorkflowPartialSuccessEvent,
*,
graph_runtime_state: Optional[Any] = None,
trace_manager: Optional[Any] = None,
**kwargs,
) -> Generator[Any, None, None]:
"""Handle workflow partial success events."""
self._ensure_workflow_initialized()
validated_state = self._ensure_graph_runtime_initialized(graph_runtime_state)
with self._database_session() as session:
workflow_execution = self._workflow_cycle_manager.handle_workflow_run_partial_success( workflow_execution = self._workflow_cycle_manager.handle_workflow_run_partial_success(
workflow_run_id=self._workflow_run_id, workflow_run_id=self._workflow_run_id,
total_tokens=graph_runtime_state.total_tokens, total_tokens=validated_state.total_tokens,
total_steps=graph_runtime_state.node_run_steps, total_steps=validated_state.node_run_steps,
outputs=event.outputs, outputs=event.outputs,
exceptions_count=event.exceptions_count, exceptions_count=event.exceptions_count,
conversation_id=None, conversation_id=None,
@ -541,20 +601,25 @@ class AdvancedChatAppGenerateTaskPipeline:
) )
yield workflow_finish_resp yield workflow_finish_resp
self._base_task_pipeline._queue_manager.publish( self._base_task_pipeline._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE
)
elif isinstance(event, QueueWorkflowFailedEvent):
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
if not graph_runtime_state:
raise ValueError("graph runtime state not initialized.")
with Session(db.engine, expire_on_commit=False) as session: def _handle_workflow_failed_event(
self,
event: QueueWorkflowFailedEvent,
*,
graph_runtime_state: Optional[Any] = None,
trace_manager: Optional[Any] = None,
**kwargs,
) -> Generator[Any, None, None]:
"""Handle workflow failed events."""
self._ensure_workflow_initialized()
validated_state = self._ensure_graph_runtime_initialized(graph_runtime_state)
with self._database_session() as session:
workflow_execution = self._workflow_cycle_manager.handle_workflow_run_failed( workflow_execution = self._workflow_cycle_manager.handle_workflow_run_failed(
workflow_run_id=self._workflow_run_id, workflow_run_id=self._workflow_run_id,
total_tokens=graph_runtime_state.total_tokens, total_tokens=validated_state.total_tokens,
total_steps=graph_runtime_state.node_run_steps, total_steps=validated_state.node_run_steps,
status=WorkflowExecutionStatus.FAILED, status=WorkflowExecutionStatus.FAILED,
error_message=event.error, error_message=event.error,
conversation_id=self._conversation_id, conversation_id=self._conversation_id,
@ -567,16 +632,22 @@ class AdvancedChatAppGenerateTaskPipeline:
workflow_execution=workflow_execution, workflow_execution=workflow_execution,
) )
err_event = QueueErrorEvent(error=ValueError(f"Run failed: {workflow_execution.error_message}")) err_event = QueueErrorEvent(error=ValueError(f"Run failed: {workflow_execution.error_message}"))
err = self._base_task_pipeline._handle_error( err = self._base_task_pipeline._handle_error(event=err_event, session=session, message_id=self._message_id)
event=err_event, session=session, message_id=self._message_id
)
yield workflow_finish_resp yield workflow_finish_resp
yield self._base_task_pipeline._error_to_stream_response(err) yield self._base_task_pipeline._error_to_stream_response(err)
break
elif isinstance(event, QueueStopEvent): def _handle_stop_event(
self,
event: QueueStopEvent,
*,
graph_runtime_state: Optional[Any] = None,
trace_manager: Optional[Any] = None,
**kwargs,
) -> Generator[Any, None, None]:
"""Handle stop events."""
if self._workflow_run_id and graph_runtime_state: if self._workflow_run_id and graph_runtime_state:
with Session(db.engine, expire_on_commit=False) as session: with self._database_session() as session:
workflow_execution = self._workflow_cycle_manager.handle_workflow_run_failed( workflow_execution = self._workflow_cycle_manager.handle_workflow_run_failed(
workflow_run_id=self._workflow_run_id, workflow_run_id=self._workflow_run_id,
total_tokens=graph_runtime_state.total_tokens, total_tokens=graph_runtime_state.total_tokens,
@ -593,7 +664,6 @@ class AdvancedChatAppGenerateTaskPipeline:
) )
# Save message # Save message
self._save_message(session=session, graph_runtime_state=graph_runtime_state) self._save_message(session=session, graph_runtime_state=graph_runtime_state)
session.commit()
yield workflow_finish_resp yield workflow_finish_resp
elif event.stopped_by in ( elif event.stopped_by in (
@ -601,53 +671,17 @@ class AdvancedChatAppGenerateTaskPipeline:
QueueStopEvent.StopBy.ANNOTATION_REPLY, QueueStopEvent.StopBy.ANNOTATION_REPLY,
): ):
# When hitting input-moderation or annotation-reply, the workflow will not start # When hitting input-moderation or annotation-reply, the workflow will not start
with Session(db.engine, expire_on_commit=False) as session: with self._database_session() as session:
# Save message # Save message
self._save_message(session=session) self._save_message(session=session)
session.commit()
yield self._message_end_to_stream_response() yield self._message_end_to_stream_response()
break
elif isinstance(event, QueueRetrieverResourcesEvent):
self._message_cycle_manager.handle_retriever_resources(event)
with Session(db.engine, expire_on_commit=False) as session: def _handle_advanced_chat_message_end_event(
message = self._get_message(session=session) self, event: QueueAdvancedChatMessageEndEvent, *, graph_runtime_state: Optional[Any] = None, **kwargs
message.message_metadata = self._task_state.metadata.model_dump_json() ) -> Generator[Any, None, None]:
session.commit() """Handle advanced chat message end events."""
elif isinstance(event, QueueAnnotationReplyEvent): self._ensure_graph_runtime_initialized(graph_runtime_state)
self._message_cycle_manager.handle_annotation_reply(event)
with Session(db.engine, expire_on_commit=False) as session:
message = self._get_message(session=session)
message.message_metadata = self._task_state.metadata.model_dump_json()
session.commit()
elif isinstance(event, QueueTextChunkEvent):
delta_text = event.text
if delta_text is None:
continue
# handle output moderation chunk
should_direct_answer = self._handle_output_moderation_chunk(delta_text)
if should_direct_answer:
continue
# only publish tts message at text chunk streaming
if tts_publisher:
tts_publisher.publish(queue_message)
self._task_state.answer += delta_text
yield self._message_cycle_manager.message_to_stream_response(
answer=delta_text, message_id=self._message_id, from_variable_selector=event.from_variable_selector
)
elif isinstance(event, QueueMessageReplaceEvent):
# published by moderation
yield self._message_cycle_manager.message_replace_to_stream_response(
answer=event.text, reason=event.reason
)
elif isinstance(event, QueueAdvancedChatMessageEndEvent):
if not graph_runtime_state:
raise ValueError("graph runtime state not initialized.")
output_moderation_answer = self._base_task_pipeline._handle_output_moderation_when_task_finished( output_moderation_answer = self._base_task_pipeline._handle_output_moderation_when_task_finished(
self._task_state.answer self._task_state.answer
@ -660,19 +694,196 @@ class AdvancedChatAppGenerateTaskPipeline:
) )
# Save message # Save message
with Session(db.engine, expire_on_commit=False) as session: with self._database_session() as session:
self._save_message(session=session, graph_runtime_state=graph_runtime_state) self._save_message(session=session, graph_runtime_state=graph_runtime_state)
session.commit()
yield self._message_end_to_stream_response() yield self._message_end_to_stream_response()
elif isinstance(event, QueueAgentLogEvent):
def _handle_retriever_resources_event(
self, event: QueueRetrieverResourcesEvent, **kwargs
) -> Generator[Any, None, None]:
"""Handle retriever resources events."""
self._message_cycle_manager.handle_retriever_resources(event)
with self._database_session() as session:
message = self._get_message(session=session)
message.message_metadata = self._task_state.metadata.model_dump_json()
return
yield # Make this a generator
def _handle_annotation_reply_event(self, event: QueueAnnotationReplyEvent, **kwargs) -> Generator[Any, None, None]:
"""Handle annotation reply events."""
self._message_cycle_manager.handle_annotation_reply(event)
with self._database_session() as session:
message = self._get_message(session=session)
message.message_metadata = self._task_state.metadata.model_dump_json()
return
yield # Make this a generator
def _handle_message_replace_event(self, event: QueueMessageReplaceEvent, **kwargs) -> Generator[Any, None, None]:
"""Handle message replace events."""
yield self._message_cycle_manager.message_replace_to_stream_response(answer=event.text, reason=event.reason)
def _handle_agent_log_event(self, event: QueueAgentLogEvent, **kwargs) -> Generator[Any, None, None]:
"""Handle agent log events."""
yield self._workflow_response_converter.handle_agent_log( yield self._workflow_response_converter.handle_agent_log(
task_id=self._application_generate_entity.task_id, event=event task_id=self._application_generate_entity.task_id, event=event
) )
else:
continue
# publish None when task finished def _get_event_handlers(self) -> dict[type, Callable]:
"""Get mapping of event types to their handlers using fluent pattern."""
return {
# Basic events
QueuePingEvent: self._handle_ping_event,
QueueErrorEvent: self._handle_error_event,
QueueTextChunkEvent: self._handle_text_chunk_event,
# Workflow events
QueueWorkflowStartedEvent: self._handle_workflow_started_event,
QueueWorkflowSucceededEvent: self._handle_workflow_succeeded_event,
QueueWorkflowPartialSuccessEvent: self._handle_workflow_partial_success_event,
QueueWorkflowFailedEvent: self._handle_workflow_failed_event,
# Node events
QueueNodeRetryEvent: self._handle_node_retry_event,
QueueNodeStartedEvent: self._handle_node_started_event,
QueueNodeSucceededEvent: self._handle_node_succeeded_event,
# Parallel branch events
QueueParallelBranchRunStartedEvent: self._handle_parallel_branch_started_event,
# Iteration events
QueueIterationStartEvent: self._handle_iteration_start_event,
QueueIterationNextEvent: self._handle_iteration_next_event,
QueueIterationCompletedEvent: self._handle_iteration_completed_event,
# Loop events
QueueLoopStartEvent: self._handle_loop_start_event,
QueueLoopNextEvent: self._handle_loop_next_event,
QueueLoopCompletedEvent: self._handle_loop_completed_event,
# Control events
QueueStopEvent: self._handle_stop_event,
# Message events
QueueRetrieverResourcesEvent: self._handle_retriever_resources_event,
QueueAnnotationReplyEvent: self._handle_annotation_reply_event,
QueueMessageReplaceEvent: self._handle_message_replace_event,
QueueAdvancedChatMessageEndEvent: self._handle_advanced_chat_message_end_event,
QueueAgentLogEvent: self._handle_agent_log_event,
}
def _dispatch_event(
self,
event: Any,
*,
graph_runtime_state: Optional[Any] = None,
tts_publisher: Optional[Any] = None,
trace_manager: Optional[Any] = None,
queue_message: Optional[Any] = None,
) -> Generator[Any, None, None]:
"""Dispatch events using elegant pattern matching."""
handlers = self._get_event_handlers()
event_type = type(event)
# Direct handler lookup
if handler := handlers.get(event_type):
yield from handler(
event,
graph_runtime_state=graph_runtime_state,
tts_publisher=tts_publisher,
trace_manager=trace_manager,
queue_message=queue_message,
)
return
# Handle node failure events with isinstance check
if isinstance(
event,
(
QueueNodeFailedEvent,
QueueNodeInIterationFailedEvent,
QueueNodeInLoopFailedEvent,
QueueNodeExceptionEvent,
),
):
yield from self._handle_node_failed_events(
event,
graph_runtime_state=graph_runtime_state,
tts_publisher=tts_publisher,
trace_manager=trace_manager,
queue_message=queue_message,
)
return
# Handle parallel branch finished events with isinstance check
if isinstance(event, (QueueParallelBranchRunSucceededEvent, QueueParallelBranchRunFailedEvent)):
yield from self._handle_parallel_branch_finished_events(
event,
graph_runtime_state=graph_runtime_state,
tts_publisher=tts_publisher,
trace_manager=trace_manager,
queue_message=queue_message,
)
return
# For unhandled events, we continue (original behavior)
return
def _process_stream_response(
self,
tts_publisher: Optional[AppGeneratorTTSPublisher] = None,
trace_manager: Optional[TraceQueueManager] = None,
) -> Generator[StreamResponse, None, None]:
"""
Process stream response using elegant Fluent Python patterns.
Maintains exact same functionality as original 57-if-statement version.
"""
# Initialize graph runtime state
graph_runtime_state: Optional[GraphRuntimeState] = None
for queue_message in self._base_task_pipeline._queue_manager.listen():
event = queue_message.event
# Use elegant pattern matching for event dispatch
match event:
# Handle QueueWorkflowStartedEvent specially - it has a side effect
case QueueWorkflowStartedEvent():
graph_runtime_state = event.graph_runtime_state # Side effect!
yield from self._handle_workflow_started_event(event)
# Handle QueueTextChunkEvent specially - needs queue_message
case QueueTextChunkEvent():
yield from self._handle_text_chunk_event(
event, tts_publisher=tts_publisher, queue_message=queue_message
)
# Handle events that cause loop breaks
case QueueErrorEvent():
yield from self._handle_error_event(event)
break
case QueueWorkflowFailedEvent():
yield from self._handle_workflow_failed_event(
event, graph_runtime_state=graph_runtime_state, trace_manager=trace_manager
)
break
case QueueStopEvent():
yield from self._handle_stop_event(
event, graph_runtime_state=graph_runtime_state, trace_manager=trace_manager
)
break
# Handle all other events through elegant dispatch
case _:
if responses := list(
self._dispatch_event(
event,
graph_runtime_state=graph_runtime_state,
tts_publisher=tts_publisher,
trace_manager=trace_manager,
queue_message=queue_message,
)
):
yield from responses
# Continue with next event (elegant fallthrough)
# Cleanup - publish None when task finished
if tts_publisher: if tts_publisher:
tts_publisher.publish(None) tts_publisher.publish(None)

@ -1,7 +1,8 @@
import logging import logging
import time import time
from collections.abc import Generator from collections.abc import Callable, Generator
from typing import Optional, Union from contextlib import contextmanager
from typing import Any, Optional, Protocol, Union
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
@ -246,30 +247,52 @@ class WorkflowAppGenerateTaskPipeline:
if tts_publisher: if tts_publisher:
yield MessageAudioEndStreamResponse(audio="", task_id=task_id) yield MessageAudioEndStreamResponse(audio="", task_id=task_id)
def _process_stream_response( class EventHandler(Protocol):
"""Protocol for event handlers."""
def __call__(
self, self,
tts_publisher: Optional[AppGeneratorTTSPublisher] = None, event: Any,
trace_manager: Optional[TraceQueueManager] = None, *,
) -> Generator[StreamResponse, None, None]: graph_runtime_state: Optional[Any] = None,
""" tts_publisher: Optional[Any] = None,
Process stream response. trace_manager: Optional[Any] = None,
:return: queue_message: Optional[Any] = None,
""" ) -> Generator[Any, None, None]: ...
graph_runtime_state = None
@contextmanager
def _database_session(self):
"""Context manager for database sessions."""
with Session(db.engine, expire_on_commit=False) as session:
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
for queue_message in self._base_task_pipeline._queue_manager.listen(): def _ensure_workflow_initialized(self) -> None:
event = queue_message.event """Fluent validation for workflow state."""
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
def _ensure_graph_runtime_initialized(self, graph_runtime_state: Optional[Any]) -> Any:
"""Fluent validation for graph runtime state."""
if not graph_runtime_state:
raise ValueError("graph runtime state not initialized.")
return graph_runtime_state
if isinstance(event, QueuePingEvent): def _handle_ping_event(self, event: QueuePingEvent, **kwargs) -> Generator[Any, None, None]:
"""Handle ping events."""
yield self._base_task_pipeline._ping_stream_response() yield self._base_task_pipeline._ping_stream_response()
elif isinstance(event, QueueErrorEvent):
def _handle_error_event(self, event: QueueErrorEvent, **kwargs) -> Generator[Any, None, None]:
"""Handle error events."""
err = self._base_task_pipeline._handle_error(event=event) err = self._base_task_pipeline._handle_error(event=event)
yield self._base_task_pipeline._error_to_stream_response(err) yield self._base_task_pipeline._error_to_stream_response(err)
break
elif isinstance(event, QueueWorkflowStartedEvent):
# override graph runtime state
graph_runtime_state = event.graph_runtime_state
def _handle_workflow_started_event(self, event: QueueWorkflowStartedEvent, **kwargs) -> Generator[Any, None, None]:
"""Handle workflow started events."""
# init workflow run # init workflow run
workflow_execution = self._workflow_cycle_manager.handle_workflow_run_start() workflow_execution = self._workflow_cycle_manager.handle_workflow_run_start()
self._workflow_run_id = workflow_execution.id_ self._workflow_run_id = workflow_execution.id_
@ -277,15 +300,13 @@ class WorkflowAppGenerateTaskPipeline:
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution=workflow_execution, workflow_execution=workflow_execution,
) )
yield start_resp yield start_resp
elif isinstance(
event, def _handle_node_retry_event(self, event: QueueNodeRetryEvent, **kwargs) -> Generator[Any, None, None]:
QueueNodeRetryEvent, """Handle node retry events."""
): self._ensure_workflow_initialized()
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.") with self._database_session() as session:
with Session(db.engine, expire_on_commit=False) as session:
workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_retried( workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_retried(
workflow_execution_id=self._workflow_run_id, workflow_execution_id=self._workflow_run_id,
event=event, event=event,
@ -295,13 +316,13 @@ class WorkflowAppGenerateTaskPipeline:
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution, workflow_node_execution=workflow_node_execution,
) )
session.commit()
if response: if response:
yield response yield response
elif isinstance(event, QueueNodeStartedEvent):
if not self._workflow_run_id: def _handle_node_started_event(self, event: QueueNodeStartedEvent, **kwargs) -> Generator[Any, None, None]:
raise ValueError("workflow run not initialized.") """Handle node started events."""
self._ensure_workflow_initialized()
workflow_node_execution = self._workflow_cycle_manager.handle_node_execution_start( workflow_node_execution = self._workflow_cycle_manager.handle_node_execution_start(
workflow_execution_id=self._workflow_run_id, event=event workflow_execution_id=self._workflow_run_id, event=event
@ -314,10 +335,10 @@ class WorkflowAppGenerateTaskPipeline:
if node_start_response: if node_start_response:
yield node_start_response yield node_start_response
elif isinstance(event, QueueNodeSucceededEvent):
workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_success( def _handle_node_succeeded_event(self, event: QueueNodeSucceededEvent, **kwargs) -> Generator[Any, None, None]:
event=event """Handle node succeeded events."""
) workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_success(event=event)
node_success_response = self._workflow_response_converter.workflow_node_finish_to_stream_response( node_success_response = self._workflow_response_converter.workflow_node_finish_to_stream_response(
event=event, event=event,
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
@ -328,13 +349,15 @@ class WorkflowAppGenerateTaskPipeline:
if node_success_response: if node_success_response:
yield node_success_response yield node_success_response
elif isinstance(
event, def _handle_node_failed_events(
QueueNodeFailedEvent self,
| QueueNodeInIterationFailedEvent event: Union[
| QueueNodeInLoopFailedEvent QueueNodeFailedEvent, QueueNodeInIterationFailedEvent, QueueNodeInLoopFailedEvent, QueueNodeExceptionEvent
| QueueNodeExceptionEvent, ],
): **kwargs,
) -> Generator[Any, None, None]:
"""Handle various node failure events."""
workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_failed( workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_failed(
event=event, event=event,
) )
@ -343,123 +366,124 @@ class WorkflowAppGenerateTaskPipeline:
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution, workflow_node_execution=workflow_node_execution,
) )
if isinstance(event, QueueNodeExceptionEvent): if isinstance(event, QueueNodeExceptionEvent):
self._save_output_for_event(event, workflow_node_execution.id) self._save_output_for_event(event, workflow_node_execution.id)
if node_failed_response: if node_failed_response:
yield node_failed_response yield node_failed_response
elif isinstance(event, QueueParallelBranchRunStartedEvent): def _handle_parallel_branch_started_event(
if not self._workflow_run_id: self, event: QueueParallelBranchRunStartedEvent, **kwargs
raise ValueError("workflow run not initialized.") ) -> Generator[Any, None, None]:
"""Handle parallel branch started events."""
self._ensure_workflow_initialized()
parallel_start_resp = ( parallel_start_resp = self._workflow_response_converter.workflow_parallel_branch_start_to_stream_response(
self._workflow_response_converter.workflow_parallel_branch_start_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution_id=self._workflow_run_id, workflow_execution_id=self._workflow_run_id,
event=event, event=event,
) )
)
yield parallel_start_resp yield parallel_start_resp
elif isinstance(event, QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent): def _handle_parallel_branch_finished_events(
if not self._workflow_run_id: self, event: Union[QueueParallelBranchRunSucceededEvent, QueueParallelBranchRunFailedEvent], **kwargs
raise ValueError("workflow run not initialized.") ) -> Generator[Any, None, None]:
"""Handle parallel branch finished events."""
self._ensure_workflow_initialized()
parallel_finish_resp = ( parallel_finish_resp = self._workflow_response_converter.workflow_parallel_branch_finished_to_stream_response(
self._workflow_response_converter.workflow_parallel_branch_finished_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution_id=self._workflow_run_id, workflow_execution_id=self._workflow_run_id,
event=event, event=event,
) )
)
yield parallel_finish_resp yield parallel_finish_resp
elif isinstance(event, QueueIterationStartEvent): def _handle_iteration_start_event(self, event: QueueIterationStartEvent, **kwargs) -> Generator[Any, None, None]:
if not self._workflow_run_id: """Handle iteration start events."""
raise ValueError("workflow run not initialized.") self._ensure_workflow_initialized()
iter_start_resp = self._workflow_response_converter.workflow_iteration_start_to_stream_response( iter_start_resp = self._workflow_response_converter.workflow_iteration_start_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution_id=self._workflow_run_id, workflow_execution_id=self._workflow_run_id,
event=event, event=event,
) )
yield iter_start_resp yield iter_start_resp
elif isinstance(event, QueueIterationNextEvent): def _handle_iteration_next_event(self, event: QueueIterationNextEvent, **kwargs) -> Generator[Any, None, None]:
if not self._workflow_run_id: """Handle iteration next events."""
raise ValueError("workflow run not initialized.") self._ensure_workflow_initialized()
iter_next_resp = self._workflow_response_converter.workflow_iteration_next_to_stream_response( iter_next_resp = self._workflow_response_converter.workflow_iteration_next_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution_id=self._workflow_run_id, workflow_execution_id=self._workflow_run_id,
event=event, event=event,
) )
yield iter_next_resp yield iter_next_resp
elif isinstance(event, QueueIterationCompletedEvent): def _handle_iteration_completed_event(
if not self._workflow_run_id: self, event: QueueIterationCompletedEvent, **kwargs
raise ValueError("workflow run not initialized.") ) -> Generator[Any, None, None]:
"""Handle iteration completed events."""
self._ensure_workflow_initialized()
iter_finish_resp = self._workflow_response_converter.workflow_iteration_completed_to_stream_response( iter_finish_resp = self._workflow_response_converter.workflow_iteration_completed_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution_id=self._workflow_run_id, workflow_execution_id=self._workflow_run_id,
event=event, event=event,
) )
yield iter_finish_resp yield iter_finish_resp
elif isinstance(event, QueueLoopStartEvent): def _handle_loop_start_event(self, event: QueueLoopStartEvent, **kwargs) -> Generator[Any, None, None]:
if not self._workflow_run_id: """Handle loop start events."""
raise ValueError("workflow run not initialized.") self._ensure_workflow_initialized()
loop_start_resp = self._workflow_response_converter.workflow_loop_start_to_stream_response( loop_start_resp = self._workflow_response_converter.workflow_loop_start_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution_id=self._workflow_run_id, workflow_execution_id=self._workflow_run_id,
event=event, event=event,
) )
yield loop_start_resp yield loop_start_resp
elif isinstance(event, QueueLoopNextEvent): def _handle_loop_next_event(self, event: QueueLoopNextEvent, **kwargs) -> Generator[Any, None, None]:
if not self._workflow_run_id: """Handle loop next events."""
raise ValueError("workflow run not initialized.") self._ensure_workflow_initialized()
loop_next_resp = self._workflow_response_converter.workflow_loop_next_to_stream_response( loop_next_resp = self._workflow_response_converter.workflow_loop_next_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution_id=self._workflow_run_id, workflow_execution_id=self._workflow_run_id,
event=event, event=event,
) )
yield loop_next_resp yield loop_next_resp
elif isinstance(event, QueueLoopCompletedEvent): def _handle_loop_completed_event(self, event: QueueLoopCompletedEvent, **kwargs) -> Generator[Any, None, None]:
if not self._workflow_run_id: """Handle loop completed events."""
raise ValueError("workflow run not initialized.") self._ensure_workflow_initialized()
loop_finish_resp = self._workflow_response_converter.workflow_loop_completed_to_stream_response( loop_finish_resp = self._workflow_response_converter.workflow_loop_completed_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution_id=self._workflow_run_id, workflow_execution_id=self._workflow_run_id,
event=event, event=event,
) )
yield loop_finish_resp yield loop_finish_resp
elif isinstance(event, QueueWorkflowSucceededEvent): def _handle_workflow_succeeded_event(
if not self._workflow_run_id: self,
raise ValueError("workflow run not initialized.") event: QueueWorkflowSucceededEvent,
if not graph_runtime_state: *,
raise ValueError("graph runtime state not initialized.") graph_runtime_state: Optional[Any] = None,
trace_manager: Optional[Any] = None,
with Session(db.engine, expire_on_commit=False) as session: **kwargs,
) -> Generator[Any, None, None]:
"""Handle workflow succeeded events."""
self._ensure_workflow_initialized()
validated_state = self._ensure_graph_runtime_initialized(graph_runtime_state)
with self._database_session() as session:
workflow_execution = self._workflow_cycle_manager.handle_workflow_run_success( workflow_execution = self._workflow_cycle_manager.handle_workflow_run_success(
workflow_run_id=self._workflow_run_id, workflow_run_id=self._workflow_run_id,
total_tokens=graph_runtime_state.total_tokens, total_tokens=validated_state.total_tokens,
total_steps=graph_runtime_state.node_run_steps, total_steps=validated_state.node_run_steps,
outputs=event.outputs, outputs=event.outputs,
conversation_id=None, conversation_id=None,
trace_manager=trace_manager, trace_manager=trace_manager,
@ -473,20 +497,26 @@ class WorkflowAppGenerateTaskPipeline:
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution=workflow_execution, workflow_execution=workflow_execution,
) )
session.commit()
yield workflow_finish_resp yield workflow_finish_resp
elif isinstance(event, QueueWorkflowPartialSuccessEvent):
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
if not graph_runtime_state:
raise ValueError("graph runtime state not initialized.")
with Session(db.engine, expire_on_commit=False) as session: def _handle_workflow_partial_success_event(
self,
event: QueueWorkflowPartialSuccessEvent,
*,
graph_runtime_state: Optional[Any] = None,
trace_manager: Optional[Any] = None,
**kwargs,
) -> Generator[Any, None, None]:
"""Handle workflow partial success events."""
self._ensure_workflow_initialized()
validated_state = self._ensure_graph_runtime_initialized(graph_runtime_state)
with self._database_session() as session:
workflow_execution = self._workflow_cycle_manager.handle_workflow_run_partial_success( workflow_execution = self._workflow_cycle_manager.handle_workflow_run_partial_success(
workflow_run_id=self._workflow_run_id, workflow_run_id=self._workflow_run_id,
total_tokens=graph_runtime_state.total_tokens, total_tokens=validated_state.total_tokens,
total_steps=graph_runtime_state.node_run_steps, total_steps=validated_state.node_run_steps,
outputs=event.outputs, outputs=event.outputs,
exceptions_count=event.exceptions_count, exceptions_count=event.exceptions_count,
conversation_id=None, conversation_id=None,
@ -501,26 +531,30 @@ class WorkflowAppGenerateTaskPipeline:
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution=workflow_execution, workflow_execution=workflow_execution,
) )
session.commit()
yield workflow_finish_resp yield workflow_finish_resp
elif isinstance(event, QueueWorkflowFailedEvent | QueueStopEvent):
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
if not graph_runtime_state:
raise ValueError("graph runtime state not initialized.")
with Session(db.engine, expire_on_commit=False) as session: def _handle_workflow_failed_and_stop_events(
self,
event: Union[QueueWorkflowFailedEvent, QueueStopEvent],
*,
graph_runtime_state: Optional[Any] = None,
trace_manager: Optional[Any] = None,
**kwargs,
) -> Generator[Any, None, None]:
"""Handle workflow failed and stop events."""
self._ensure_workflow_initialized()
validated_state = self._ensure_graph_runtime_initialized(graph_runtime_state)
with self._database_session() as session:
workflow_execution = self._workflow_cycle_manager.handle_workflow_run_failed( workflow_execution = self._workflow_cycle_manager.handle_workflow_run_failed(
workflow_run_id=self._workflow_run_id, workflow_run_id=self._workflow_run_id,
total_tokens=graph_runtime_state.total_tokens, total_tokens=validated_state.total_tokens,
total_steps=graph_runtime_state.node_run_steps, total_steps=validated_state.node_run_steps,
status=WorkflowExecutionStatus.FAILED status=WorkflowExecutionStatus.FAILED
if isinstance(event, QueueWorkflowFailedEvent) if isinstance(event, QueueWorkflowFailedEvent)
else WorkflowExecutionStatus.STOPPED, else WorkflowExecutionStatus.STOPPED,
error_message=event.error error_message=event.error if isinstance(event, QueueWorkflowFailedEvent) else event.get_stop_reason(),
if isinstance(event, QueueWorkflowFailedEvent)
else event.get_stop_reason(),
conversation_id=None, conversation_id=None,
trace_manager=trace_manager, trace_manager=trace_manager,
exceptions_count=event.exceptions_count if isinstance(event, QueueWorkflowFailedEvent) else 0, exceptions_count=event.exceptions_count if isinstance(event, QueueWorkflowFailedEvent) else 0,
@ -534,28 +568,179 @@ class WorkflowAppGenerateTaskPipeline:
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution=workflow_execution, workflow_execution=workflow_execution,
) )
session.commit()
yield workflow_finish_resp yield workflow_finish_resp
elif isinstance(event, QueueTextChunkEvent):
def _handle_text_chunk_event(
self,
event: QueueTextChunkEvent,
*,
tts_publisher: Optional[Any] = None,
queue_message: Optional[Any] = None,
**kwargs,
) -> Generator[Any, None, None]:
"""Handle text chunk events."""
delta_text = event.text delta_text = event.text
if delta_text is None: if delta_text is None:
continue return
# only publish tts message at text chunk streaming # only publish tts message at text chunk streaming
if tts_publisher: if tts_publisher and queue_message:
tts_publisher.publish(queue_message) tts_publisher.publish(queue_message)
yield self._text_chunk_to_stream_response( yield self._text_chunk_to_stream_response(delta_text, from_variable_selector=event.from_variable_selector)
delta_text, from_variable_selector=event.from_variable_selector
) def _handle_agent_log_event(self, event: QueueAgentLogEvent, **kwargs) -> Generator[Any, None, None]:
elif isinstance(event, QueueAgentLogEvent): """Handle agent log events."""
yield self._workflow_response_converter.handle_agent_log( yield self._workflow_response_converter.handle_agent_log(
task_id=self._application_generate_entity.task_id, event=event task_id=self._application_generate_entity.task_id, event=event
) )
else:
continue
def _get_event_handlers(self) -> dict[type, Callable]:
"""Get mapping of event types to their handlers using fluent pattern."""
return {
# Basic events
QueuePingEvent: self._handle_ping_event,
QueueErrorEvent: self._handle_error_event,
QueueTextChunkEvent: self._handle_text_chunk_event,
# Workflow events
QueueWorkflowStartedEvent: self._handle_workflow_started_event,
QueueWorkflowSucceededEvent: self._handle_workflow_succeeded_event,
QueueWorkflowPartialSuccessEvent: self._handle_workflow_partial_success_event,
# Node events
QueueNodeRetryEvent: self._handle_node_retry_event,
QueueNodeStartedEvent: self._handle_node_started_event,
QueueNodeSucceededEvent: self._handle_node_succeeded_event,
# Parallel branch events
QueueParallelBranchRunStartedEvent: self._handle_parallel_branch_started_event,
# Iteration events
QueueIterationStartEvent: self._handle_iteration_start_event,
QueueIterationNextEvent: self._handle_iteration_next_event,
QueueIterationCompletedEvent: self._handle_iteration_completed_event,
# Loop events
QueueLoopStartEvent: self._handle_loop_start_event,
QueueLoopNextEvent: self._handle_loop_next_event,
QueueLoopCompletedEvent: self._handle_loop_completed_event,
# Agent events
QueueAgentLogEvent: self._handle_agent_log_event,
}
def _dispatch_event(
self,
event: Any,
*,
graph_runtime_state: Optional[Any] = None,
tts_publisher: Optional[Any] = None,
trace_manager: Optional[Any] = None,
queue_message: Optional[Any] = None,
) -> Generator[Any, None, None]:
"""Dispatch events using elegant pattern matching."""
handlers = self._get_event_handlers()
event_type = type(event)
# Direct handler lookup
if handler := handlers.get(event_type):
yield from handler(
event,
graph_runtime_state=graph_runtime_state,
tts_publisher=tts_publisher,
trace_manager=trace_manager,
queue_message=queue_message,
)
return
# Handle node failure events with isinstance check
if isinstance(
event,
(
QueueNodeFailedEvent,
QueueNodeInIterationFailedEvent,
QueueNodeInLoopFailedEvent,
QueueNodeExceptionEvent,
),
):
yield from self._handle_node_failed_events(
event,
graph_runtime_state=graph_runtime_state,
tts_publisher=tts_publisher,
trace_manager=trace_manager,
queue_message=queue_message,
)
return
# Handle parallel branch finished events with isinstance check
if isinstance(event, (QueueParallelBranchRunSucceededEvent, QueueParallelBranchRunFailedEvent)):
yield from self._handle_parallel_branch_finished_events(
event,
graph_runtime_state=graph_runtime_state,
tts_publisher=tts_publisher,
trace_manager=trace_manager,
queue_message=queue_message,
)
return
# Handle workflow failed and stop events with isinstance check
if isinstance(event, (QueueWorkflowFailedEvent, QueueStopEvent)):
yield from self._handle_workflow_failed_and_stop_events(
event,
graph_runtime_state=graph_runtime_state,
tts_publisher=tts_publisher,
trace_manager=trace_manager,
queue_message=queue_message,
)
return
# For unhandled events, we continue (original behavior)
return
def _process_stream_response(
self,
tts_publisher: Optional[AppGeneratorTTSPublisher] = None,
trace_manager: Optional[TraceQueueManager] = None,
) -> Generator[StreamResponse, None, None]:
"""
Process stream response using elegant Fluent Python patterns.
Maintains exact same functionality as original 44-if-statement version.
"""
# Initialize graph runtime state
graph_runtime_state = None
for queue_message in self._base_task_pipeline._queue_manager.listen():
event = queue_message.event
# Use elegant pattern matching for event dispatch
match event:
# Handle QueueWorkflowStartedEvent specially - it has a side effect
case QueueWorkflowStartedEvent():
graph_runtime_state = event.graph_runtime_state # Side effect!
yield from self._handle_workflow_started_event(event)
# Handle QueueTextChunkEvent specially - needs queue_message
case QueueTextChunkEvent():
yield from self._handle_text_chunk_event(
event, tts_publisher=tts_publisher, queue_message=queue_message
)
# Handle events that cause loop breaks
case QueueErrorEvent():
yield from self._handle_error_event(event)
break
# Handle all other events through elegant dispatch
case _:
if responses := list(
self._dispatch_event(
event,
graph_runtime_state=graph_runtime_state,
tts_publisher=tts_publisher,
trace_manager=trace_manager,
queue_message=queue_message,
)
):
yield from responses
# Continue with next event (elegant fallthrough)
# Cleanup - publish None when task finished
if tts_publisher: if tts_publisher:
tts_publisher.publish(None) tts_publisher.publish(None)

Loading…
Cancel
Save