diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index 25ce924d9b..823247bd50 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -306,7 +306,7 @@ class AdvancedChatAppGenerateTaskPipeline: if not message: raise ValueError(f"Message not found: {self._message_id}") message.workflow_run_id = workflow_execution.id - workflow_start_resp = self._workflow_cycle_manager._workflow_start_to_stream_response( + workflow_start_resp = self._workflow_cycle_manager.workflow_start_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, ) @@ -323,7 +323,7 @@ class AdvancedChatAppGenerateTaskPipeline: workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_retried( workflow_execution_id=self._workflow_run_id, event=event ) - node_retry_resp = self._workflow_cycle_manager._workflow_node_retry_to_stream_response( + node_retry_resp = self._workflow_cycle_manager.workflow_node_retry_to_stream_response( event=event, task_id=self._application_generate_entity.task_id, workflow_node_execution=workflow_node_execution, @@ -340,7 +340,7 @@ class AdvancedChatAppGenerateTaskPipeline: workflow_execution_id=self._workflow_run_id, event=event ) - node_start_resp = self._workflow_cycle_manager._workflow_node_start_to_stream_response( + node_start_resp = self._workflow_cycle_manager.workflow_node_start_to_stream_response( event=event, task_id=self._application_generate_entity.task_id, workflow_node_execution=workflow_node_execution, @@ -352,7 +352,7 @@ class AdvancedChatAppGenerateTaskPipeline: # Record files if it's an answer node or end node if event.node_type in [NodeType.ANSWER, NodeType.END]: self._recorded_files.extend( - self._workflow_cycle_manager._fetch_files_from_node_outputs(event.outputs or {}) + self._workflow_cycle_manager.fetch_files_from_node_outputs(event.outputs or {}) ) with Session(db.engine, expire_on_commit=False) as session: @@ -360,7 +360,7 @@ class AdvancedChatAppGenerateTaskPipeline: event=event ) - node_finish_resp = self._workflow_cycle_manager._workflow_node_finish_to_stream_response( + node_finish_resp = self._workflow_cycle_manager.workflow_node_finish_to_stream_response( event=event, task_id=self._application_generate_entity.task_id, workflow_node_execution=workflow_node_execution, @@ -380,7 +380,7 @@ class AdvancedChatAppGenerateTaskPipeline: event=event ) - node_finish_resp = self._workflow_cycle_manager._workflow_node_finish_to_stream_response( + node_finish_resp = self._workflow_cycle_manager.workflow_node_finish_to_stream_response( event=event, task_id=self._application_generate_entity.task_id, workflow_node_execution=workflow_node_execution, @@ -392,7 +392,7 @@ class AdvancedChatAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - parallel_start_resp = self._workflow_cycle_manager._workflow_parallel_branch_start_to_stream_response( + parallel_start_resp = self._workflow_cycle_manager.workflow_parallel_branch_start_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution_id=self._workflow_run_id, event=event, @@ -404,7 +404,7 @@ class AdvancedChatAppGenerateTaskPipeline: raise ValueError("workflow run not initialized.") parallel_finish_resp = ( - self._workflow_cycle_manager._workflow_parallel_branch_finished_to_stream_response( + self._workflow_cycle_manager.workflow_parallel_branch_finished_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution_id=self._workflow_run_id, event=event, @@ -416,7 +416,7 @@ class AdvancedChatAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - iter_start_resp = self._workflow_cycle_manager._workflow_iteration_start_to_stream_response( + iter_start_resp = self._workflow_cycle_manager.workflow_iteration_start_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution_id=self._workflow_run_id, event=event, @@ -427,7 +427,7 @@ class AdvancedChatAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - iter_next_resp = self._workflow_cycle_manager._workflow_iteration_next_to_stream_response( + iter_next_resp = self._workflow_cycle_manager.workflow_iteration_next_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution_id=self._workflow_run_id, event=event, @@ -438,7 +438,7 @@ class AdvancedChatAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - iter_finish_resp = self._workflow_cycle_manager._workflow_iteration_completed_to_stream_response( + iter_finish_resp = self._workflow_cycle_manager.workflow_iteration_completed_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution_id=self._workflow_run_id, event=event, @@ -449,7 +449,7 @@ class AdvancedChatAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - loop_start_resp = self._workflow_cycle_manager._workflow_loop_start_to_stream_response( + loop_start_resp = self._workflow_cycle_manager.workflow_loop_start_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution_id=self._workflow_run_id, event=event, @@ -460,7 +460,7 @@ class AdvancedChatAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - loop_next_resp = self._workflow_cycle_manager._workflow_loop_next_to_stream_response( + loop_next_resp = self._workflow_cycle_manager.workflow_loop_next_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution_id=self._workflow_run_id, event=event, @@ -471,7 +471,7 @@ class AdvancedChatAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - loop_finish_resp = self._workflow_cycle_manager._workflow_loop_completed_to_stream_response( + loop_finish_resp = self._workflow_cycle_manager.workflow_loop_completed_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution_id=self._workflow_run_id, event=event, @@ -495,7 +495,7 @@ class AdvancedChatAppGenerateTaskPipeline: trace_manager=trace_manager, ) - workflow_finish_resp = self._workflow_cycle_manager._workflow_finish_to_stream_response( + workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response( session=session, task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, @@ -521,7 +521,7 @@ class AdvancedChatAppGenerateTaskPipeline: conversation_id=None, trace_manager=trace_manager, ) - workflow_finish_resp = self._workflow_cycle_manager._workflow_finish_to_stream_response( + workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response( session=session, task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, @@ -548,7 +548,7 @@ class AdvancedChatAppGenerateTaskPipeline: trace_manager=trace_manager, exceptions_count=event.exceptions_count, ) - workflow_finish_resp = self._workflow_cycle_manager._workflow_finish_to_stream_response( + workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response( session=session, task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, @@ -573,7 +573,7 @@ class AdvancedChatAppGenerateTaskPipeline: conversation_id=self._conversation_id, trace_manager=trace_manager, ) - workflow_finish_resp = self._workflow_cycle_manager._workflow_finish_to_stream_response( + workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response( session=session, task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, @@ -657,7 +657,7 @@ class AdvancedChatAppGenerateTaskPipeline: yield self._message_end_to_stream_response() elif isinstance(event, QueueAgentLogEvent): - yield self._workflow_cycle_manager._handle_agent_log( + yield self._workflow_cycle_manager.handle_agent_log( task_id=self._application_generate_entity.task_id, event=event ) else: diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py index 36631e2fe5..07c933b723 100644 --- a/api/core/app/apps/workflow/generate_task_pipeline.py +++ b/api/core/app/apps/workflow/generate_task_pipeline.py @@ -268,7 +268,7 @@ class WorkflowAppGenerateTaskPipeline: workflow_id=self._workflow_id, ) self._workflow_run_id = workflow_execution.id - start_resp = self._workflow_cycle_manager._workflow_start_to_stream_response( + start_resp = self._workflow_cycle_manager.workflow_start_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, ) @@ -285,7 +285,7 @@ class WorkflowAppGenerateTaskPipeline: workflow_execution_id=self._workflow_run_id, event=event, ) - response = self._workflow_cycle_manager._workflow_node_retry_to_stream_response( + response = self._workflow_cycle_manager.workflow_node_retry_to_stream_response( event=event, task_id=self._application_generate_entity.task_id, workflow_node_execution=workflow_node_execution, @@ -301,7 +301,7 @@ class WorkflowAppGenerateTaskPipeline: workflow_node_execution = self._workflow_cycle_manager._handle_node_execution_start( workflow_execution_id=self._workflow_run_id, event=event ) - node_start_response = self._workflow_cycle_manager._workflow_node_start_to_stream_response( + node_start_response = self._workflow_cycle_manager.workflow_node_start_to_stream_response( event=event, task_id=self._application_generate_entity.task_id, workflow_node_execution=workflow_node_execution, @@ -313,7 +313,7 @@ class WorkflowAppGenerateTaskPipeline: workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_success( event=event ) - node_success_response = self._workflow_cycle_manager._workflow_node_finish_to_stream_response( + node_success_response = self._workflow_cycle_manager.workflow_node_finish_to_stream_response( event=event, task_id=self._application_generate_entity.task_id, workflow_node_execution=workflow_node_execution, @@ -331,7 +331,7 @@ class WorkflowAppGenerateTaskPipeline: workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_failed( event=event, ) - node_failed_response = self._workflow_cycle_manager._workflow_node_finish_to_stream_response( + node_failed_response = self._workflow_cycle_manager.workflow_node_finish_to_stream_response( event=event, task_id=self._application_generate_entity.task_id, workflow_node_execution=workflow_node_execution, @@ -344,7 +344,7 @@ class WorkflowAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - parallel_start_resp = self._workflow_cycle_manager._workflow_parallel_branch_start_to_stream_response( + parallel_start_resp = self._workflow_cycle_manager.workflow_parallel_branch_start_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution_id=self._workflow_run_id, event=event, @@ -357,7 +357,7 @@ class WorkflowAppGenerateTaskPipeline: raise ValueError("workflow run not initialized.") parallel_finish_resp = ( - self._workflow_cycle_manager._workflow_parallel_branch_finished_to_stream_response( + self._workflow_cycle_manager.workflow_parallel_branch_finished_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution_id=self._workflow_run_id, event=event, @@ -370,7 +370,7 @@ class WorkflowAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - iter_start_resp = self._workflow_cycle_manager._workflow_iteration_start_to_stream_response( + iter_start_resp = self._workflow_cycle_manager.workflow_iteration_start_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution_id=self._workflow_run_id, event=event, @@ -382,7 +382,7 @@ class WorkflowAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - iter_next_resp = self._workflow_cycle_manager._workflow_iteration_next_to_stream_response( + iter_next_resp = self._workflow_cycle_manager.workflow_iteration_next_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution_id=self._workflow_run_id, event=event, @@ -394,7 +394,7 @@ class WorkflowAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - iter_finish_resp = self._workflow_cycle_manager._workflow_iteration_completed_to_stream_response( + iter_finish_resp = self._workflow_cycle_manager.workflow_iteration_completed_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution_id=self._workflow_run_id, event=event, @@ -406,7 +406,7 @@ class WorkflowAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - loop_start_resp = self._workflow_cycle_manager._workflow_loop_start_to_stream_response( + loop_start_resp = self._workflow_cycle_manager.workflow_loop_start_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution_id=self._workflow_run_id, event=event, @@ -418,7 +418,7 @@ class WorkflowAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - loop_next_resp = self._workflow_cycle_manager._workflow_loop_next_to_stream_response( + loop_next_resp = self._workflow_cycle_manager.workflow_loop_next_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution_id=self._workflow_run_id, event=event, @@ -430,7 +430,7 @@ class WorkflowAppGenerateTaskPipeline: if not self._workflow_run_id: raise ValueError("workflow run not initialized.") - loop_finish_resp = self._workflow_cycle_manager._workflow_loop_completed_to_stream_response( + loop_finish_resp = self._workflow_cycle_manager.workflow_loop_completed_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution_id=self._workflow_run_id, event=event, @@ -457,7 +457,7 @@ class WorkflowAppGenerateTaskPipeline: # save workflow app log self._save_workflow_app_log(session=session, workflow_execution=workflow_execution) - workflow_finish_resp = self._workflow_cycle_manager._workflow_finish_to_stream_response( + workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response( session=session, task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, @@ -485,7 +485,7 @@ class WorkflowAppGenerateTaskPipeline: # save workflow app log self._save_workflow_app_log(session=session, workflow_execution=workflow_execution) - workflow_finish_resp = self._workflow_cycle_manager._workflow_finish_to_stream_response( + workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response( session=session, task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, @@ -518,7 +518,7 @@ class WorkflowAppGenerateTaskPipeline: # save workflow app log self._save_workflow_app_log(session=session, workflow_execution=workflow_execution) - workflow_finish_resp = self._workflow_cycle_manager._workflow_finish_to_stream_response( + workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response( session=session, task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, @@ -540,7 +540,7 @@ class WorkflowAppGenerateTaskPipeline: delta_text, from_variable_selector=event.from_variable_selector ) elif isinstance(event, QueueAgentLogEvent): - yield self._workflow_cycle_manager._handle_agent_log( + yield self._workflow_cycle_manager.handle_agent_log( task_id=self._application_generate_entity.task_id, event=event ) else: diff --git a/api/core/workflow/workflow_app_generate_task_pipeline.py b/api/core/workflow/workflow_app_generate_task_pipeline.py new file mode 100644 index 0000000000..07c933b723 --- /dev/null +++ b/api/core/workflow/workflow_app_generate_task_pipeline.py @@ -0,0 +1,591 @@ +import logging +import time +from collections.abc import Generator +from typing import Optional, Union + +from sqlalchemy import select +from sqlalchemy.orm import Session + +from constants.tts_auto_play_timeout import TTS_AUTO_PLAY_TIMEOUT, TTS_AUTO_PLAY_YIELD_CPU_TIME +from core.app.apps.base_app_queue_manager import AppQueueManager +from core.app.entities.app_invoke_entities import ( + InvokeFrom, + WorkflowAppGenerateEntity, +) +from core.app.entities.queue_entities import ( + QueueAgentLogEvent, + QueueErrorEvent, + QueueIterationCompletedEvent, + QueueIterationNextEvent, + QueueIterationStartEvent, + QueueLoopCompletedEvent, + QueueLoopNextEvent, + QueueLoopStartEvent, + QueueNodeExceptionEvent, + QueueNodeFailedEvent, + QueueNodeInIterationFailedEvent, + QueueNodeInLoopFailedEvent, + QueueNodeRetryEvent, + QueueNodeStartedEvent, + QueueNodeSucceededEvent, + QueueParallelBranchRunFailedEvent, + QueueParallelBranchRunStartedEvent, + QueueParallelBranchRunSucceededEvent, + QueuePingEvent, + QueueStopEvent, + QueueTextChunkEvent, + QueueWorkflowFailedEvent, + QueueWorkflowPartialSuccessEvent, + QueueWorkflowStartedEvent, + QueueWorkflowSucceededEvent, +) +from core.app.entities.task_entities import ( + ErrorStreamResponse, + MessageAudioEndStreamResponse, + MessageAudioStreamResponse, + StreamResponse, + TextChunkStreamResponse, + WorkflowAppBlockingResponse, + WorkflowAppStreamResponse, + WorkflowFinishStreamResponse, + WorkflowStartStreamResponse, + WorkflowTaskState, +) +from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTaskPipeline +from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk +from core.ops.ops_trace_manager import TraceQueueManager +from core.workflow.entities.workflow_execution_entities import WorkflowExecution +from core.workflow.enums import SystemVariableKey +from core.workflow.repository.workflow_execution_repository import WorkflowExecutionRepository +from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository +from core.workflow.workflow_cycle_manager import WorkflowCycleManager +from extensions.ext_database import db +from models.account import Account +from models.enums import CreatorUserRole +from models.model import EndUser +from models.workflow import ( + Workflow, + WorkflowAppLog, + WorkflowAppLogCreatedFrom, + WorkflowRun, + WorkflowRunStatus, +) + +logger = logging.getLogger(__name__) + + +class WorkflowAppGenerateTaskPipeline: + """ + WorkflowAppGenerateTaskPipeline is a class that generate stream output and state management for Application. + """ + + def __init__( + self, + application_generate_entity: WorkflowAppGenerateEntity, + workflow: Workflow, + queue_manager: AppQueueManager, + user: Union[Account, EndUser], + stream: bool, + workflow_execution_repository: WorkflowExecutionRepository, + workflow_node_execution_repository: WorkflowNodeExecutionRepository, + ) -> None: + self._base_task_pipeline = BasedGenerateTaskPipeline( + application_generate_entity=application_generate_entity, + queue_manager=queue_manager, + stream=stream, + ) + + if isinstance(user, EndUser): + self._user_id = user.id + user_session_id = user.session_id + self._created_by_role = CreatorUserRole.END_USER + elif isinstance(user, Account): + self._user_id = user.id + user_session_id = user.id + self._created_by_role = CreatorUserRole.ACCOUNT + else: + raise ValueError(f"Invalid user type: {type(user)}") + + self._workflow_cycle_manager = WorkflowCycleManager( + application_generate_entity=application_generate_entity, + workflow_system_variables={ + SystemVariableKey.FILES: application_generate_entity.files, + SystemVariableKey.USER_ID: user_session_id, + SystemVariableKey.APP_ID: application_generate_entity.app_config.app_id, + SystemVariableKey.WORKFLOW_ID: workflow.id, + SystemVariableKey.WORKFLOW_RUN_ID: application_generate_entity.workflow_run_id, + }, + workflow_execution_repository=workflow_execution_repository, + workflow_node_execution_repository=workflow_node_execution_repository, + ) + + self._application_generate_entity = application_generate_entity + self._workflow_id = workflow.id + self._workflow_features_dict = workflow.features_dict + self._task_state = WorkflowTaskState() + self._workflow_run_id = "" + + def process(self) -> Union[WorkflowAppBlockingResponse, Generator[WorkflowAppStreamResponse, None, None]]: + """ + Process generate task pipeline. + :return: + """ + generator = self._wrapper_process_stream_response(trace_manager=self._application_generate_entity.trace_manager) + if self._base_task_pipeline._stream: + return self._to_stream_response(generator) + else: + return self._to_blocking_response(generator) + + def _to_blocking_response(self, generator: Generator[StreamResponse, None, None]) -> WorkflowAppBlockingResponse: + """ + To blocking response. + :return: + """ + for stream_response in generator: + if isinstance(stream_response, ErrorStreamResponse): + raise stream_response.err + elif isinstance(stream_response, WorkflowFinishStreamResponse): + response = WorkflowAppBlockingResponse( + task_id=self._application_generate_entity.task_id, + workflow_run_id=stream_response.data.id, + data=WorkflowAppBlockingResponse.Data( + id=stream_response.data.id, + workflow_id=stream_response.data.workflow_id, + status=stream_response.data.status, + outputs=stream_response.data.outputs, + error=stream_response.data.error, + elapsed_time=stream_response.data.elapsed_time, + total_tokens=stream_response.data.total_tokens, + total_steps=stream_response.data.total_steps, + created_at=int(stream_response.data.created_at), + finished_at=int(stream_response.data.finished_at), + ), + ) + + return response + else: + continue + + raise ValueError("queue listening stopped unexpectedly.") + + def _to_stream_response( + self, generator: Generator[StreamResponse, None, None] + ) -> Generator[WorkflowAppStreamResponse, None, None]: + """ + To stream response. + :return: + """ + workflow_run_id = None + for stream_response in generator: + if isinstance(stream_response, WorkflowStartStreamResponse): + workflow_run_id = stream_response.workflow_run_id + + yield WorkflowAppStreamResponse(workflow_run_id=workflow_run_id, stream_response=stream_response) + + def _listen_audio_msg(self, publisher: AppGeneratorTTSPublisher | None, task_id: str): + if not publisher: + return None + audio_msg = publisher.check_and_get_audio() + if audio_msg and isinstance(audio_msg, AudioTrunk) and audio_msg.status != "finish": + return MessageAudioStreamResponse(audio=audio_msg.audio, task_id=task_id) + return None + + def _wrapper_process_stream_response( + self, trace_manager: Optional[TraceQueueManager] = None + ) -> Generator[StreamResponse, None, None]: + tts_publisher = None + task_id = self._application_generate_entity.task_id + tenant_id = self._application_generate_entity.app_config.tenant_id + features_dict = self._workflow_features_dict + + if ( + features_dict.get("text_to_speech") + and features_dict["text_to_speech"].get("enabled") + and features_dict["text_to_speech"].get("autoPlay") == "enabled" + ): + tts_publisher = AppGeneratorTTSPublisher( + tenant_id, features_dict["text_to_speech"].get("voice"), features_dict["text_to_speech"].get("language") + ) + + for response in self._process_stream_response(tts_publisher=tts_publisher, trace_manager=trace_manager): + while True: + audio_response = self._listen_audio_msg(publisher=tts_publisher, task_id=task_id) + if audio_response: + yield audio_response + else: + break + yield response + + start_listener_time = time.time() + while (time.time() - start_listener_time) < TTS_AUTO_PLAY_TIMEOUT: + try: + if not tts_publisher: + break + audio_trunk = tts_publisher.check_and_get_audio() + if audio_trunk is None: + # release cpu + # sleep 20 ms ( 40ms => 1280 byte audio file,20ms => 640 byte audio file) + time.sleep(TTS_AUTO_PLAY_YIELD_CPU_TIME) + continue + if audio_trunk.status == "finish": + break + else: + yield MessageAudioStreamResponse(audio=audio_trunk.audio, task_id=task_id) + except Exception: + logger.exception(f"Fails to get audio trunk, task_id: {task_id}") + break + if tts_publisher: + yield MessageAudioEndStreamResponse(audio="", task_id=task_id) + + def _process_stream_response( + self, + tts_publisher: Optional[AppGeneratorTTSPublisher] = None, + trace_manager: Optional[TraceQueueManager] = None, + ) -> Generator[StreamResponse, None, None]: + """ + Process stream response. + :return: + """ + graph_runtime_state = None + + for queue_message in self._base_task_pipeline._queue_manager.listen(): + event = queue_message.event + + if isinstance(event, QueuePingEvent): + yield self._base_task_pipeline._ping_stream_response() + elif isinstance(event, QueueErrorEvent): + err = self._base_task_pipeline._handle_error(event=event) + yield self._base_task_pipeline._error_to_stream_response(err) + break + elif isinstance(event, QueueWorkflowStartedEvent): + # override graph runtime state + graph_runtime_state = event.graph_runtime_state + + with Session(db.engine, expire_on_commit=False) as session: + # init workflow run + workflow_execution = self._workflow_cycle_manager._handle_workflow_run_start( + session=session, + workflow_id=self._workflow_id, + ) + self._workflow_run_id = workflow_execution.id + start_resp = self._workflow_cycle_manager.workflow_start_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution=workflow_execution, + ) + + yield start_resp + elif isinstance( + event, + QueueNodeRetryEvent, + ): + if not self._workflow_run_id: + raise ValueError("workflow run not initialized.") + with Session(db.engine, expire_on_commit=False) as session: + workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_retried( + workflow_execution_id=self._workflow_run_id, + event=event, + ) + response = self._workflow_cycle_manager.workflow_node_retry_to_stream_response( + event=event, + task_id=self._application_generate_entity.task_id, + workflow_node_execution=workflow_node_execution, + ) + session.commit() + + if response: + yield response + elif isinstance(event, QueueNodeStartedEvent): + if not self._workflow_run_id: + raise ValueError("workflow run not initialized.") + + workflow_node_execution = self._workflow_cycle_manager._handle_node_execution_start( + workflow_execution_id=self._workflow_run_id, event=event + ) + node_start_response = self._workflow_cycle_manager.workflow_node_start_to_stream_response( + event=event, + task_id=self._application_generate_entity.task_id, + workflow_node_execution=workflow_node_execution, + ) + + if node_start_response: + yield node_start_response + elif isinstance(event, QueueNodeSucceededEvent): + workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_success( + event=event + ) + node_success_response = self._workflow_cycle_manager.workflow_node_finish_to_stream_response( + event=event, + task_id=self._application_generate_entity.task_id, + workflow_node_execution=workflow_node_execution, + ) + + if node_success_response: + yield node_success_response + elif isinstance( + event, + QueueNodeFailedEvent + | QueueNodeInIterationFailedEvent + | QueueNodeInLoopFailedEvent + | QueueNodeExceptionEvent, + ): + workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_failed( + event=event, + ) + node_failed_response = self._workflow_cycle_manager.workflow_node_finish_to_stream_response( + event=event, + task_id=self._application_generate_entity.task_id, + workflow_node_execution=workflow_node_execution, + ) + + if node_failed_response: + yield node_failed_response + + elif isinstance(event, QueueParallelBranchRunStartedEvent): + if not self._workflow_run_id: + raise ValueError("workflow run not initialized.") + + parallel_start_resp = self._workflow_cycle_manager.workflow_parallel_branch_start_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution_id=self._workflow_run_id, + event=event, + ) + + yield parallel_start_resp + + elif isinstance(event, QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent): + if not self._workflow_run_id: + raise ValueError("workflow run not initialized.") + + parallel_finish_resp = ( + self._workflow_cycle_manager.workflow_parallel_branch_finished_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution_id=self._workflow_run_id, + event=event, + ) + ) + + yield parallel_finish_resp + + elif isinstance(event, QueueIterationStartEvent): + if not self._workflow_run_id: + raise ValueError("workflow run not initialized.") + + iter_start_resp = self._workflow_cycle_manager.workflow_iteration_start_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution_id=self._workflow_run_id, + event=event, + ) + + yield iter_start_resp + + elif isinstance(event, QueueIterationNextEvent): + if not self._workflow_run_id: + raise ValueError("workflow run not initialized.") + + iter_next_resp = self._workflow_cycle_manager.workflow_iteration_next_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution_id=self._workflow_run_id, + event=event, + ) + + yield iter_next_resp + + elif isinstance(event, QueueIterationCompletedEvent): + if not self._workflow_run_id: + raise ValueError("workflow run not initialized.") + + iter_finish_resp = self._workflow_cycle_manager.workflow_iteration_completed_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution_id=self._workflow_run_id, + event=event, + ) + + yield iter_finish_resp + + elif isinstance(event, QueueLoopStartEvent): + if not self._workflow_run_id: + raise ValueError("workflow run not initialized.") + + loop_start_resp = self._workflow_cycle_manager.workflow_loop_start_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution_id=self._workflow_run_id, + event=event, + ) + + yield loop_start_resp + + elif isinstance(event, QueueLoopNextEvent): + if not self._workflow_run_id: + raise ValueError("workflow run not initialized.") + + loop_next_resp = self._workflow_cycle_manager.workflow_loop_next_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution_id=self._workflow_run_id, + event=event, + ) + + yield loop_next_resp + + elif isinstance(event, QueueLoopCompletedEvent): + if not self._workflow_run_id: + raise ValueError("workflow run not initialized.") + + loop_finish_resp = self._workflow_cycle_manager.workflow_loop_completed_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution_id=self._workflow_run_id, + event=event, + ) + + yield loop_finish_resp + + elif isinstance(event, QueueWorkflowSucceededEvent): + if not self._workflow_run_id: + raise ValueError("workflow run not initialized.") + if not graph_runtime_state: + raise ValueError("graph runtime state not initialized.") + + with Session(db.engine, expire_on_commit=False) as session: + workflow_execution = self._workflow_cycle_manager._handle_workflow_run_success( + workflow_run_id=self._workflow_run_id, + total_tokens=graph_runtime_state.total_tokens, + total_steps=graph_runtime_state.node_run_steps, + outputs=event.outputs, + conversation_id=None, + trace_manager=trace_manager, + ) + + # save workflow app log + self._save_workflow_app_log(session=session, workflow_execution=workflow_execution) + + workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response( + session=session, + task_id=self._application_generate_entity.task_id, + workflow_execution=workflow_execution, + ) + session.commit() + + yield workflow_finish_resp + elif isinstance(event, QueueWorkflowPartialSuccessEvent): + if not self._workflow_run_id: + raise ValueError("workflow run not initialized.") + if not graph_runtime_state: + raise ValueError("graph runtime state not initialized.") + + with Session(db.engine, expire_on_commit=False) as session: + workflow_execution = self._workflow_cycle_manager._handle_workflow_run_partial_success( + workflow_run_id=self._workflow_run_id, + total_tokens=graph_runtime_state.total_tokens, + total_steps=graph_runtime_state.node_run_steps, + outputs=event.outputs, + exceptions_count=event.exceptions_count, + conversation_id=None, + trace_manager=trace_manager, + ) + + # save workflow app log + self._save_workflow_app_log(session=session, workflow_execution=workflow_execution) + + workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response( + session=session, + task_id=self._application_generate_entity.task_id, + workflow_execution=workflow_execution, + ) + session.commit() + + yield workflow_finish_resp + elif isinstance(event, QueueWorkflowFailedEvent | QueueStopEvent): + if not self._workflow_run_id: + raise ValueError("workflow run not initialized.") + if not graph_runtime_state: + raise ValueError("graph runtime state not initialized.") + + with Session(db.engine, expire_on_commit=False) as session: + workflow_execution = self._workflow_cycle_manager._handle_workflow_run_failed( + workflow_run_id=self._workflow_run_id, + total_tokens=graph_runtime_state.total_tokens, + total_steps=graph_runtime_state.node_run_steps, + status=WorkflowRunStatus.FAILED + if isinstance(event, QueueWorkflowFailedEvent) + else WorkflowRunStatus.STOPPED, + error_message=event.error + if isinstance(event, QueueWorkflowFailedEvent) + else event.get_stop_reason(), + conversation_id=None, + trace_manager=trace_manager, + exceptions_count=event.exceptions_count if isinstance(event, QueueWorkflowFailedEvent) else 0, + ) + + # save workflow app log + self._save_workflow_app_log(session=session, workflow_execution=workflow_execution) + + workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response( + session=session, + task_id=self._application_generate_entity.task_id, + workflow_execution=workflow_execution, + ) + session.commit() + + yield workflow_finish_resp + elif isinstance(event, QueueTextChunkEvent): + delta_text = event.text + if delta_text is None: + continue + + # only publish tts message at text chunk streaming + if tts_publisher: + tts_publisher.publish(queue_message) + + self._task_state.answer += delta_text + yield self._text_chunk_to_stream_response( + delta_text, from_variable_selector=event.from_variable_selector + ) + elif isinstance(event, QueueAgentLogEvent): + yield self._workflow_cycle_manager.handle_agent_log( + task_id=self._application_generate_entity.task_id, event=event + ) + else: + continue + + if tts_publisher: + tts_publisher.publish(None) + + def _save_workflow_app_log(self, *, session: Session, workflow_execution: WorkflowExecution) -> None: + workflow_run = session.scalar(select(WorkflowRun).where(WorkflowRun.id == workflow_execution.id)) + assert workflow_run is not None + invoke_from = self._application_generate_entity.invoke_from + if invoke_from == InvokeFrom.SERVICE_API: + created_from = WorkflowAppLogCreatedFrom.SERVICE_API + elif invoke_from == InvokeFrom.EXPLORE: + created_from = WorkflowAppLogCreatedFrom.INSTALLED_APP + elif invoke_from == InvokeFrom.WEB_APP: + created_from = WorkflowAppLogCreatedFrom.WEB_APP + else: + # not save log for debugging + return + + workflow_app_log = WorkflowAppLog() + workflow_app_log.tenant_id = workflow_run.tenant_id + workflow_app_log.app_id = workflow_run.app_id + workflow_app_log.workflow_id = workflow_run.workflow_id + workflow_app_log.workflow_run_id = workflow_run.id + workflow_app_log.created_from = created_from.value + workflow_app_log.created_by_role = self._created_by_role + workflow_app_log.created_by = self._user_id + + session.add(workflow_app_log) + session.commit() + + def _text_chunk_to_stream_response( + self, text: str, from_variable_selector: Optional[list[str]] = None + ) -> TextChunkStreamResponse: + """ + Handle completed event. + :param text: text + :return: + """ + response = TextChunkStreamResponse( + task_id=self._application_generate_entity.task_id, + data=TextChunkStreamResponse.Data(text=text, from_variable_selector=from_variable_selector), + ) + + return response diff --git a/api/core/workflow/workflow_cycle_manager.py b/api/core/workflow/workflow_cycle_manager.py index 9006c5495e..db724b2ef6 100644 --- a/api/core/workflow/workflow_cycle_manager.py +++ b/api/core/workflow/workflow_cycle_manager.py @@ -416,7 +416,7 @@ class WorkflowCycleManager: return domain_execution - def _workflow_start_to_stream_response( + def workflow_start_to_stream_response( self, *, task_id: str, @@ -434,7 +434,7 @@ class WorkflowCycleManager: ), ) - def _workflow_finish_to_stream_response( + def workflow_finish_to_stream_response( self, *, session: Session, @@ -487,12 +487,12 @@ class WorkflowCycleManager: created_by=created_by, created_at=int(workflow_execution.started_at.timestamp()), finished_at=finished_at_timestamp, - files=self._fetch_files_from_node_outputs(workflow_execution.outputs), + files=self.fetch_files_from_node_outputs(workflow_execution.outputs), exceptions_count=workflow_execution.exceptions_count, ), ) - def _workflow_node_start_to_stream_response( + def workflow_node_start_to_stream_response( self, *, event: QueueNodeStartedEvent, @@ -538,7 +538,7 @@ class WorkflowCycleManager: return response - def _workflow_node_finish_to_stream_response( + def workflow_node_finish_to_stream_response( self, *, event: QueueNodeSucceededEvent @@ -575,7 +575,7 @@ class WorkflowCycleManager: execution_metadata=workflow_node_execution.metadata, created_at=int(workflow_node_execution.created_at.timestamp()), finished_at=int(workflow_node_execution.finished_at.timestamp()), - files=self._fetch_files_from_node_outputs(workflow_node_execution.outputs or {}), + files=self.fetch_files_from_node_outputs(workflow_node_execution.outputs or {}), parallel_id=event.parallel_id, parallel_start_node_id=event.parallel_start_node_id, parent_parallel_id=event.parent_parallel_id, @@ -585,7 +585,7 @@ class WorkflowCycleManager: ), ) - def _workflow_node_retry_to_stream_response( + def workflow_node_retry_to_stream_response( self, *, event: QueueNodeRetryEvent, @@ -618,7 +618,7 @@ class WorkflowCycleManager: execution_metadata=workflow_node_execution.metadata, created_at=int(workflow_node_execution.created_at.timestamp()), finished_at=int(workflow_node_execution.finished_at.timestamp()), - files=self._fetch_files_from_node_outputs(workflow_node_execution.outputs or {}), + files=self.fetch_files_from_node_outputs(workflow_node_execution.outputs or {}), parallel_id=event.parallel_id, parallel_start_node_id=event.parallel_start_node_id, parent_parallel_id=event.parent_parallel_id, @@ -629,7 +629,7 @@ class WorkflowCycleManager: ), ) - def _workflow_parallel_branch_start_to_stream_response( + def workflow_parallel_branch_start_to_stream_response( self, *, task_id: str, @@ -650,7 +650,7 @@ class WorkflowCycleManager: ), ) - def _workflow_parallel_branch_finished_to_stream_response( + def workflow_parallel_branch_finished_to_stream_response( self, *, task_id: str, @@ -673,7 +673,7 @@ class WorkflowCycleManager: ), ) - def _workflow_iteration_start_to_stream_response( + def workflow_iteration_start_to_stream_response( self, *, task_id: str, @@ -697,7 +697,7 @@ class WorkflowCycleManager: ), ) - def _workflow_iteration_next_to_stream_response( + def workflow_iteration_next_to_stream_response( self, *, task_id: str, @@ -723,7 +723,7 @@ class WorkflowCycleManager: ), ) - def _workflow_iteration_completed_to_stream_response( + def workflow_iteration_completed_to_stream_response( self, *, task_id: str, @@ -756,7 +756,7 @@ class WorkflowCycleManager: ), ) - def _workflow_loop_start_to_stream_response( + def workflow_loop_start_to_stream_response( self, *, task_id: str, workflow_execution_id: str, event: QueueLoopStartEvent ) -> LoopNodeStartStreamResponse: return LoopNodeStartStreamResponse( @@ -776,7 +776,7 @@ class WorkflowCycleManager: ), ) - def _workflow_loop_next_to_stream_response( + def workflow_loop_next_to_stream_response( self, *, task_id: str, @@ -802,7 +802,7 @@ class WorkflowCycleManager: ), ) - def _workflow_loop_completed_to_stream_response( + def workflow_loop_completed_to_stream_response( self, *, task_id: str, @@ -835,7 +835,7 @@ class WorkflowCycleManager: ), ) - def _fetch_files_from_node_outputs(self, outputs_dict: Mapping[str, Any] | None) -> Sequence[Mapping[str, Any]]: + def fetch_files_from_node_outputs(self, outputs_dict: Mapping[str, Any] | None) -> Sequence[Mapping[str, Any]]: """ Fetch files from node outputs :param outputs_dict: node outputs dict @@ -898,7 +898,7 @@ class WorkflowCycleManager: raise WorkflowRunNotFoundError(id) return execution - def _handle_agent_log(self, task_id: str, event: QueueAgentLogEvent) -> AgentLogStreamResponse: + def handle_agent_log(self, task_id: str, event: QueueAgentLogEvent) -> AgentLogStreamResponse: """ Handle agent log :param task_id: task id