diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index 09371b5c04..4f34464759 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -16,6 +16,7 @@ from core.app.entities.app_invoke_entities import ( InvokeFrom, ) from core.app.entities.queue_entities import ( + MessageQueueMessage, QueueAdvancedChatMessageEndEvent, QueueAgentLogEvent, QueueAnnotationReplyEvent, @@ -45,6 +46,7 @@ from core.app.entities.queue_entities import ( QueueWorkflowPartialSuccessEvent, QueueWorkflowStartedEvent, QueueWorkflowSucceededEvent, + WorkflowQueueMessage, ) from core.app.entities.task_entities import ( ChatbotAppBlockingResponse, @@ -53,6 +55,7 @@ from core.app.entities.task_entities import ( MessageAudioEndStreamResponse, MessageAudioStreamResponse, MessageEndStreamResponse, + PingStreamResponse, StreamResponse, WorkflowTaskState, ) @@ -293,25 +296,25 @@ class AdvancedChatAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - def _ensure_graph_runtime_initialized(self, graph_runtime_state: Optional[Any]) -> Any: + def _ensure_graph_runtime_initialized(self, graph_runtime_state: Optional[GraphRuntimeState]) -> GraphRuntimeState: """Fluent validation for graph runtime state.""" if not graph_runtime_state: raise ValueError("graph runtime state not initialized.") return graph_runtime_state - def _handle_ping_event(self, event: QueuePingEvent, **kwargs) -> Generator[Any, None, None]: + def _handle_ping_event(self, event: QueuePingEvent, **kwargs) -> Generator[PingStreamResponse, None, None]: """Handle ping events.""" yield self._base_task_pipeline._ping_stream_response() - def _handle_error_event(self, event: QueueErrorEvent, **kwargs) -> Generator[Any, None, None]: + def _handle_error_event(self, event: QueueErrorEvent, **kwargs) -> Generator[ErrorStreamResponse, None, None]: """Handle error events.""" with self._database_session() as session: err = self._base_task_pipeline._handle_error(event=event, session=session, message_id=self._message_id) yield self._base_task_pipeline._error_to_stream_response(err) def _handle_workflow_started_event( - self, event: QueueWorkflowStartedEvent, *, graph_runtime_state: Optional[Any] = None, **kwargs - ) -> Generator[Any, None, None]: + self, event: QueueWorkflowStartedEvent, *, graph_runtime_state: Optional[GraphRuntimeState] = None, **kwargs + ) -> Generator[StreamResponse, None, None]: """Handle workflow started events.""" # Override graph runtime state - this is a side effect but necessary graph_runtime_state = event.graph_runtime_state @@ -332,7 +335,7 @@ class AdvancedChatAppGenerateTaskPipeline: yield workflow_start_resp - def _handle_node_retry_event(self, event: QueueNodeRetryEvent, **kwargs) -> Generator[Any, None, None]: + def _handle_node_retry_event(self, event: QueueNodeRetryEvent, **kwargs) -> Generator[StreamResponse, None, None]: """Handle node retry events.""" self._ensure_workflow_initialized() @@ -349,7 +352,9 @@ class AdvancedChatAppGenerateTaskPipeline: if node_retry_resp: yield node_retry_resp - def _handle_node_started_event(self, event: QueueNodeStartedEvent, **kwargs) -> Generator[Any, None, None]: + def _handle_node_started_event( + self, event: QueueNodeStartedEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: """Handle node started events.""" self._ensure_workflow_initialized() @@ -366,7 +371,9 @@ class AdvancedChatAppGenerateTaskPipeline: if node_start_resp: yield node_start_resp - def _handle_node_succeeded_event(self, event: QueueNodeSucceededEvent, **kwargs) -> Generator[Any, None, None]: + def _handle_node_succeeded_event( + self, event: QueueNodeSucceededEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: """Handle node succeeded events.""" # Record files if it's an answer node or end node if event.node_type in [NodeType.ANSWER, NodeType.END]: @@ -393,7 +400,7 @@ class AdvancedChatAppGenerateTaskPipeline: QueueNodeFailedEvent, QueueNodeInIterationFailedEvent, QueueNodeInLoopFailedEvent, QueueNodeExceptionEvent ], **kwargs, - ) -> Generator[Any, None, None]: + ) -> Generator[StreamResponse, None, None]: """Handle various node failure events.""" workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_failed(event=event) @@ -413,10 +420,10 @@ class AdvancedChatAppGenerateTaskPipeline: self, event: QueueTextChunkEvent, *, - tts_publisher: Optional[Any] = None, - queue_message: Optional[Any] = None, + tts_publisher: Optional[AppGeneratorTTSPublisher] = None, + queue_message: Optional[Union[WorkflowQueueMessage, MessageQueueMessage]] = None, **kwargs, - ) -> Generator[Any, None, None]: + ) -> Generator[StreamResponse, None, None]: """Handle text chunk events.""" delta_text = event.text if delta_text is None: @@ -438,7 +445,7 @@ class AdvancedChatAppGenerateTaskPipeline: def _handle_parallel_branch_started_event( self, event: QueueParallelBranchRunStartedEvent, **kwargs - ) -> Generator[Any, None, None]: + ) -> Generator[StreamResponse, None, None]: """Handle parallel branch started events.""" self._ensure_workflow_initialized() @@ -451,7 +458,7 @@ class AdvancedChatAppGenerateTaskPipeline: def _handle_parallel_branch_finished_events( self, event: Union[QueueParallelBranchRunSucceededEvent, QueueParallelBranchRunFailedEvent], **kwargs - ) -> Generator[Any, None, None]: + ) -> Generator[StreamResponse, None, None]: """Handle parallel branch finished events.""" self._ensure_workflow_initialized() @@ -462,7 +469,9 @@ class AdvancedChatAppGenerateTaskPipeline: ) yield parallel_finish_resp - def _handle_iteration_start_event(self, event: QueueIterationStartEvent, **kwargs) -> Generator[Any, None, None]: + def _handle_iteration_start_event( + self, event: QueueIterationStartEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: """Handle iteration start events.""" self._ensure_workflow_initialized() @@ -473,7 +482,9 @@ class AdvancedChatAppGenerateTaskPipeline: ) yield iter_start_resp - def _handle_iteration_next_event(self, event: QueueIterationNextEvent, **kwargs) -> Generator[Any, None, None]: + def _handle_iteration_next_event( + self, event: QueueIterationNextEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: """Handle iteration next events.""" self._ensure_workflow_initialized() @@ -486,7 +497,7 @@ class AdvancedChatAppGenerateTaskPipeline: def _handle_iteration_completed_event( self, event: QueueIterationCompletedEvent, **kwargs - ) -> Generator[Any, None, None]: + ) -> Generator[StreamResponse, None, None]: """Handle iteration completed events.""" self._ensure_workflow_initialized() @@ -497,7 +508,7 @@ class AdvancedChatAppGenerateTaskPipeline: ) yield iter_finish_resp - def _handle_loop_start_event(self, event: QueueLoopStartEvent, **kwargs) -> Generator[Any, None, None]: + def _handle_loop_start_event(self, event: QueueLoopStartEvent, **kwargs) -> Generator[StreamResponse, None, None]: """Handle loop start events.""" self._ensure_workflow_initialized() @@ -508,7 +519,7 @@ class AdvancedChatAppGenerateTaskPipeline: ) yield loop_start_resp - def _handle_loop_next_event(self, event: QueueLoopNextEvent, **kwargs) -> Generator[Any, None, None]: + def _handle_loop_next_event(self, event: QueueLoopNextEvent, **kwargs) -> Generator[StreamResponse, None, None]: """Handle loop next events.""" self._ensure_workflow_initialized() @@ -519,7 +530,9 @@ class AdvancedChatAppGenerateTaskPipeline: ) yield loop_next_resp - def _handle_loop_completed_event(self, event: QueueLoopCompletedEvent, **kwargs) -> Generator[Any, None, None]: + def _handle_loop_completed_event( + self, event: QueueLoopCompletedEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: """Handle loop completed events.""" self._ensure_workflow_initialized() @@ -534,10 +547,10 @@ class AdvancedChatAppGenerateTaskPipeline: self, event: QueueWorkflowSucceededEvent, *, - graph_runtime_state: Optional[Any] = None, - trace_manager: Optional[Any] = None, + graph_runtime_state: Optional[GraphRuntimeState] = None, + trace_manager: Optional[TraceQueueManager] = None, **kwargs, - ) -> Generator[Any, None, None]: + ) -> Generator[StreamResponse, None, None]: """Handle workflow succeeded events.""" self._ensure_workflow_initialized() validated_state = self._ensure_graph_runtime_initialized(graph_runtime_state) @@ -564,10 +577,10 @@ class AdvancedChatAppGenerateTaskPipeline: self, event: QueueWorkflowPartialSuccessEvent, *, - graph_runtime_state: Optional[Any] = None, - trace_manager: Optional[Any] = None, + graph_runtime_state: Optional[GraphRuntimeState] = None, + trace_manager: Optional[TraceQueueManager] = None, **kwargs, - ) -> Generator[Any, None, None]: + ) -> Generator[StreamResponse, None, None]: """Handle workflow partial success events.""" self._ensure_workflow_initialized() validated_state = self._ensure_graph_runtime_initialized(graph_runtime_state) @@ -595,10 +608,10 @@ class AdvancedChatAppGenerateTaskPipeline: self, event: QueueWorkflowFailedEvent, *, - graph_runtime_state: Optional[Any] = None, - trace_manager: Optional[Any] = None, + graph_runtime_state: Optional[GraphRuntimeState] = None, + trace_manager: Optional[TraceQueueManager] = None, **kwargs, - ) -> Generator[Any, None, None]: + ) -> Generator[StreamResponse, None, None]: """Handle workflow failed events.""" self._ensure_workflow_initialized() validated_state = self._ensure_graph_runtime_initialized(graph_runtime_state) @@ -629,10 +642,10 @@ class AdvancedChatAppGenerateTaskPipeline: self, event: QueueStopEvent, *, - graph_runtime_state: Optional[Any] = None, - trace_manager: Optional[Any] = None, + graph_runtime_state: Optional[GraphRuntimeState] = None, + trace_manager: Optional[TraceQueueManager] = None, **kwargs, - ) -> Generator[Any, None, None]: + ) -> Generator[StreamResponse, None, None]: """Handle stop events.""" if self._workflow_run_id and graph_runtime_state: with self._database_session() as session: @@ -666,8 +679,12 @@ class AdvancedChatAppGenerateTaskPipeline: yield self._message_end_to_stream_response() def _handle_advanced_chat_message_end_event( - self, event: QueueAdvancedChatMessageEndEvent, *, graph_runtime_state: Optional[Any] = None, **kwargs - ) -> Generator[Any, None, None]: + self, + event: QueueAdvancedChatMessageEndEvent, + *, + graph_runtime_state: Optional[GraphRuntimeState] = None, + **kwargs, + ) -> Generator[StreamResponse, None, None]: """Handle advanced chat message end events.""" self._ensure_graph_runtime_initialized(graph_runtime_state) @@ -689,7 +706,7 @@ class AdvancedChatAppGenerateTaskPipeline: def _handle_retriever_resources_event( self, event: QueueRetrieverResourcesEvent, **kwargs - ) -> Generator[Any, None, None]: + ) -> Generator[StreamResponse, None, None]: """Handle retriever resources events.""" self._message_cycle_manager.handle_retriever_resources(event) @@ -699,7 +716,9 @@ class AdvancedChatAppGenerateTaskPipeline: return yield # Make this a generator - def _handle_annotation_reply_event(self, event: QueueAnnotationReplyEvent, **kwargs) -> Generator[Any, None, None]: + def _handle_annotation_reply_event( + self, event: QueueAnnotationReplyEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: """Handle annotation reply events.""" self._message_cycle_manager.handle_annotation_reply(event) @@ -709,11 +728,13 @@ class AdvancedChatAppGenerateTaskPipeline: return yield # Make this a generator - def _handle_message_replace_event(self, event: QueueMessageReplaceEvent, **kwargs) -> Generator[Any, None, None]: + def _handle_message_replace_event( + self, event: QueueMessageReplaceEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: """Handle message replace events.""" yield self._message_cycle_manager.message_replace_to_stream_response(answer=event.text, reason=event.reason) - def _handle_agent_log_event(self, event: QueueAgentLogEvent, **kwargs) -> Generator[Any, None, None]: + def _handle_agent_log_event(self, event: QueueAgentLogEvent, **kwargs) -> Generator[StreamResponse, None, None]: """Handle agent log events.""" yield self._workflow_response_converter.handle_agent_log( task_id=self._application_generate_entity.task_id, event=event @@ -759,11 +780,11 @@ class AdvancedChatAppGenerateTaskPipeline: self, event: Any, *, - graph_runtime_state: Optional[Any] = None, - tts_publisher: Optional[Any] = None, - trace_manager: Optional[Any] = None, - queue_message: Optional[Any] = None, - ) -> Generator[Any, None, None]: + graph_runtime_state: Optional[GraphRuntimeState] = None, + tts_publisher: Optional[AppGeneratorTTSPublisher] = None, + trace_manager: Optional[TraceQueueManager] = None, + queue_message: Optional[Union[WorkflowQueueMessage, MessageQueueMessage]] = None, + ) -> Generator[StreamResponse, None, None]: """Dispatch events using elegant pattern matching.""" handlers = self._get_event_handlers() event_type = type(event) diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py index 1f38410553..cd4ece08e7 100644 --- a/api/core/app/apps/workflow/generate_task_pipeline.py +++ b/api/core/app/apps/workflow/generate_task_pipeline.py @@ -14,6 +14,7 @@ from core.app.entities.app_invoke_entities import ( WorkflowAppGenerateEntity, ) from core.app.entities.queue_entities import ( + MessageQueueMessage, QueueAgentLogEvent, QueueErrorEvent, QueueIterationCompletedEvent, @@ -39,11 +40,13 @@ from core.app.entities.queue_entities import ( QueueWorkflowPartialSuccessEvent, QueueWorkflowStartedEvent, QueueWorkflowSucceededEvent, + WorkflowQueueMessage, ) from core.app.entities.task_entities import ( ErrorStreamResponse, MessageAudioEndStreamResponse, MessageAudioStreamResponse, + PingStreamResponse, StreamResponse, TextChunkStreamResponse, WorkflowAppBlockingResponse, @@ -55,6 +58,7 @@ from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTas from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk from core.ops.ops_trace_manager import TraceQueueManager from core.workflow.entities.workflow_execution import WorkflowExecution, WorkflowExecutionStatus, WorkflowType +from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository @@ -263,22 +267,24 @@ class WorkflowAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - def _ensure_graph_runtime_initialized(self, graph_runtime_state: Optional[Any]) -> Any: + def _ensure_graph_runtime_initialized(self, graph_runtime_state: Optional[GraphRuntimeState]) -> GraphRuntimeState: """Fluent validation for graph runtime state.""" if not graph_runtime_state: raise ValueError("graph runtime state not initialized.") return graph_runtime_state - def _handle_ping_event(self, event: QueuePingEvent, **kwargs) -> Generator[Any, None, None]: + def _handle_ping_event(self, event: QueuePingEvent, **kwargs) -> Generator[PingStreamResponse, None, None]: """Handle ping events.""" yield self._base_task_pipeline._ping_stream_response() - def _handle_error_event(self, event: QueueErrorEvent, **kwargs) -> Generator[Any, None, None]: + def _handle_error_event(self, event: QueueErrorEvent, **kwargs) -> Generator[ErrorStreamResponse, None, None]: """Handle error events.""" err = self._base_task_pipeline._handle_error(event=event) yield self._base_task_pipeline._error_to_stream_response(err) - def _handle_workflow_started_event(self, event: QueueWorkflowStartedEvent, **kwargs) -> Generator[Any, None, None]: + def _handle_workflow_started_event( + self, event: QueueWorkflowStartedEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: """Handle workflow started events.""" # init workflow run workflow_execution = self._workflow_cycle_manager.handle_workflow_run_start() @@ -289,7 +295,7 @@ class WorkflowAppGenerateTaskPipeline: ) yield start_resp - def _handle_node_retry_event(self, event: QueueNodeRetryEvent, **kwargs) -> Generator[Any, None, None]: + def _handle_node_retry_event(self, event: QueueNodeRetryEvent, **kwargs) -> Generator[StreamResponse, None, None]: """Handle node retry events.""" self._ensure_workflow_initialized() @@ -307,7 +313,9 @@ class WorkflowAppGenerateTaskPipeline: if response: yield response - def _handle_node_started_event(self, event: QueueNodeStartedEvent, **kwargs) -> Generator[Any, None, None]: + def _handle_node_started_event( + self, event: QueueNodeStartedEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: """Handle node started events.""" self._ensure_workflow_initialized() @@ -323,7 +331,9 @@ class WorkflowAppGenerateTaskPipeline: if node_start_response: yield node_start_response - def _handle_node_succeeded_event(self, event: QueueNodeSucceededEvent, **kwargs) -> Generator[Any, None, None]: + def _handle_node_succeeded_event( + self, event: QueueNodeSucceededEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: """Handle node succeeded events.""" workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_success(event=event) node_success_response = self._workflow_response_converter.workflow_node_finish_to_stream_response( @@ -343,7 +353,7 @@ class WorkflowAppGenerateTaskPipeline: QueueNodeFailedEvent, QueueNodeInIterationFailedEvent, QueueNodeInLoopFailedEvent, QueueNodeExceptionEvent ], **kwargs, - ) -> Generator[Any, None, None]: + ) -> Generator[StreamResponse, None, None]: """Handle various node failure events.""" workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_failed( event=event, @@ -362,7 +372,7 @@ class WorkflowAppGenerateTaskPipeline: def _handle_parallel_branch_started_event( self, event: QueueParallelBranchRunStartedEvent, **kwargs - ) -> Generator[Any, None, None]: + ) -> Generator[StreamResponse, None, None]: """Handle parallel branch started events.""" self._ensure_workflow_initialized() @@ -375,7 +385,7 @@ class WorkflowAppGenerateTaskPipeline: def _handle_parallel_branch_finished_events( self, event: Union[QueueParallelBranchRunSucceededEvent, QueueParallelBranchRunFailedEvent], **kwargs - ) -> Generator[Any, None, None]: + ) -> Generator[StreamResponse, None, None]: """Handle parallel branch finished events.""" self._ensure_workflow_initialized() @@ -386,7 +396,9 @@ class WorkflowAppGenerateTaskPipeline: ) yield parallel_finish_resp - def _handle_iteration_start_event(self, event: QueueIterationStartEvent, **kwargs) -> Generator[Any, None, None]: + def _handle_iteration_start_event( + self, event: QueueIterationStartEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: """Handle iteration start events.""" self._ensure_workflow_initialized() @@ -397,7 +409,9 @@ class WorkflowAppGenerateTaskPipeline: ) yield iter_start_resp - def _handle_iteration_next_event(self, event: QueueIterationNextEvent, **kwargs) -> Generator[Any, None, None]: + def _handle_iteration_next_event( + self, event: QueueIterationNextEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: """Handle iteration next events.""" self._ensure_workflow_initialized() @@ -410,7 +424,7 @@ class WorkflowAppGenerateTaskPipeline: def _handle_iteration_completed_event( self, event: QueueIterationCompletedEvent, **kwargs - ) -> Generator[Any, None, None]: + ) -> Generator[StreamResponse, None, None]: """Handle iteration completed events.""" self._ensure_workflow_initialized() @@ -421,7 +435,7 @@ class WorkflowAppGenerateTaskPipeline: ) yield iter_finish_resp - def _handle_loop_start_event(self, event: QueueLoopStartEvent, **kwargs) -> Generator[Any, None, None]: + def _handle_loop_start_event(self, event: QueueLoopStartEvent, **kwargs) -> Generator[StreamResponse, None, None]: """Handle loop start events.""" self._ensure_workflow_initialized() @@ -432,7 +446,7 @@ class WorkflowAppGenerateTaskPipeline: ) yield loop_start_resp - def _handle_loop_next_event(self, event: QueueLoopNextEvent, **kwargs) -> Generator[Any, None, None]: + def _handle_loop_next_event(self, event: QueueLoopNextEvent, **kwargs) -> Generator[StreamResponse, None, None]: """Handle loop next events.""" self._ensure_workflow_initialized() @@ -443,7 +457,9 @@ class WorkflowAppGenerateTaskPipeline: ) yield loop_next_resp - def _handle_loop_completed_event(self, event: QueueLoopCompletedEvent, **kwargs) -> Generator[Any, None, None]: + def _handle_loop_completed_event( + self, event: QueueLoopCompletedEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: """Handle loop completed events.""" self._ensure_workflow_initialized() @@ -458,10 +474,10 @@ class WorkflowAppGenerateTaskPipeline: self, event: QueueWorkflowSucceededEvent, *, - graph_runtime_state: Optional[Any] = None, - trace_manager: Optional[Any] = None, + graph_runtime_state: Optional[GraphRuntimeState] = None, + trace_manager: Optional[TraceQueueManager] = None, **kwargs, - ) -> Generator[Any, None, None]: + ) -> Generator[StreamResponse, None, None]: """Handle workflow succeeded events.""" self._ensure_workflow_initialized() validated_state = self._ensure_graph_runtime_initialized(graph_runtime_state) @@ -491,10 +507,10 @@ class WorkflowAppGenerateTaskPipeline: self, event: QueueWorkflowPartialSuccessEvent, *, - graph_runtime_state: Optional[Any] = None, - trace_manager: Optional[Any] = None, + graph_runtime_state: Optional[GraphRuntimeState] = None, + trace_manager: Optional[TraceQueueManager] = None, **kwargs, - ) -> Generator[Any, None, None]: + ) -> Generator[StreamResponse, None, None]: """Handle workflow partial success events.""" self._ensure_workflow_initialized() validated_state = self._ensure_graph_runtime_initialized(graph_runtime_state) @@ -525,10 +541,10 @@ class WorkflowAppGenerateTaskPipeline: self, event: Union[QueueWorkflowFailedEvent, QueueStopEvent], *, - graph_runtime_state: Optional[Any] = None, - trace_manager: Optional[Any] = None, + graph_runtime_state: Optional[GraphRuntimeState] = None, + trace_manager: Optional[TraceQueueManager] = None, **kwargs, - ) -> Generator[Any, None, None]: + ) -> Generator[StreamResponse, None, None]: """Handle workflow failed and stop events.""" self._ensure_workflow_initialized() validated_state = self._ensure_graph_runtime_initialized(graph_runtime_state) @@ -562,10 +578,10 @@ class WorkflowAppGenerateTaskPipeline: self, event: QueueTextChunkEvent, *, - tts_publisher: Optional[Any] = None, - queue_message: Optional[Any] = None, + tts_publisher: Optional[AppGeneratorTTSPublisher] = None, + queue_message: Optional[Union[WorkflowQueueMessage, MessageQueueMessage]] = None, **kwargs, - ) -> Generator[Any, None, None]: + ) -> Generator[StreamResponse, None, None]: """Handle text chunk events.""" delta_text = event.text if delta_text is None: @@ -577,7 +593,7 @@ class WorkflowAppGenerateTaskPipeline: yield self._text_chunk_to_stream_response(delta_text, from_variable_selector=event.from_variable_selector) - def _handle_agent_log_event(self, event: QueueAgentLogEvent, **kwargs) -> Generator[Any, None, None]: + def _handle_agent_log_event(self, event: QueueAgentLogEvent, **kwargs) -> Generator[StreamResponse, None, None]: """Handle agent log events.""" yield self._workflow_response_converter.handle_agent_log( task_id=self._application_generate_entity.task_id, event=event @@ -616,11 +632,11 @@ class WorkflowAppGenerateTaskPipeline: self, event: Any, *, - graph_runtime_state: Optional[Any] = None, - tts_publisher: Optional[Any] = None, - trace_manager: Optional[Any] = None, - queue_message: Optional[Any] = None, - ) -> Generator[Any, None, None]: + graph_runtime_state: Optional[GraphRuntimeState] = None, + tts_publisher: Optional[AppGeneratorTTSPublisher] = None, + trace_manager: Optional[TraceQueueManager] = None, + queue_message: Optional[Union[WorkflowQueueMessage, MessageQueueMessage]] = None, + ) -> Generator[StreamResponse, None, None]: """Dispatch events using elegant pattern matching.""" handlers = self._get_event_handlers() event_type = type(event)