From 28a37fc52e55cd94e5dc61c575227dff07ba0ef0 Mon Sep 17 00:00:00 2001 From: "hobo.l" Date: Sat, 26 Apr 2025 20:31:05 +0300 Subject: [PATCH] feat: workflow/chatflow api support async --- .../app/apps/advanced_chat/app_generator.py | 6 ++-- .../advanced_chat/generate_task_pipeline.py | 28 ++++++++++--------- .../apps/workflow/generate_task_pipeline.py | 28 +++++++++++-------- api/core/app/entities/queue_task_bridge.py | 2 +- 4 files changed, 36 insertions(+), 28 deletions(-) diff --git a/api/core/app/apps/advanced_chat/app_generator.py b/api/core/app/apps/advanced_chat/app_generator.py index 1b7998def1..780bcfe2bc 100644 --- a/api/core/app/apps/advanced_chat/app_generator.py +++ b/api/core/app/apps/advanced_chat/app_generator.py @@ -100,8 +100,10 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): query = query.replace("\x00", "") inputs = args["inputs"] - extras = {"auto_generate_conversation_name": args.get("auto_generate_name", False), - "is_async": args.get("is_async", False)} + extras = { + "auto_generate_conversation_name": args.get("auto_generate_name", False), + "is_async": args.get("is_async", False), + } # get conversation conversation = None diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index 2c35167bcd..a92c9a4f26 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -296,12 +296,15 @@ class AdvancedChatAppGenerateTaskPipeline: app_mode=self._conversation_mode, message_id=self._message_id, ) - worker_thread = threading.Thread(target=self._generate_worker, kwargs={ - 'flask_app': current_app._get_current_object(), - 'queue_manager': queue_manager, - 'context': contextvars.copy_context(), - 'publisher': publisher - }) + worker_thread = threading.Thread( + target=self._generate_worker, + kwargs={ + "flask_app": current_app._get_current_object(), + "queue_manager": queue_manager, + "context": contextvars.copy_context(), + "publisher": publisher, + }, + ) worker_thread.start() @@ -313,9 +316,9 @@ class AdvancedChatAppGenerateTaskPipeline: if isinstance(event, ForwardQueueMessage): yield event.response - def _generate_worker(self, flask_app: Flask, - queue_manager: AppQueueManager, - context: contextvars.Context, publisher) -> None: + def _generate_worker( + self, flask_app: Flask, queue_manager: AppQueueManager, context: contextvars.Context, publisher + ) -> None: """ Generate worker in a new thread. :param flask_app: Flask app @@ -335,10 +338,9 @@ class AdvancedChatAppGenerateTaskPipeline: queue_manager.publish(message, PublishFrom.TASK_PIPELINE) def _sync_process_stream_response(self, publisher): - - for response in self._process_stream_response(publisher, - trace_manager=self._application_generate_entity.trace_manager): - + for response in self._process_stream_response( + publisher, trace_manager=self._application_generate_entity.trace_manager + ): while True: audio_response = self._listen_audio_msg(publisher, task_id=self._application_generate_entity.task_id) if audio_response: diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py index cdea32135f..77779b554a 100644 --- a/api/core/app/apps/workflow/generate_task_pipeline.py +++ b/api/core/app/apps/workflow/generate_task_pipeline.py @@ -148,7 +148,7 @@ class WorkflowAppGenerateTaskPipeline: data=WorkflowAppBlockingResponse.Data( id=self._application_generate_entity.app_config.app_id, workflow_id=self._workflow_id, - status='processing', + status="processing", created_at=int(time.time()), ), ) @@ -258,9 +258,9 @@ class WorkflowAppGenerateTaskPipeline: if isinstance(event, ForwardQueueMessage): yield event.response - def _generate_worker(self, flask_app: Flask, - queue_manager: AppQueueManager, - context: contextvars.Context, publisher) -> None: + def _generate_worker( + self, flask_app: Flask, queue_manager: AppQueueManager, context: contextvars.Context, publisher + ) -> None: """ Generate worker in a new thread. :param flask_app: Flask app @@ -288,20 +288,24 @@ class WorkflowAppGenerateTaskPipeline: app_mode=self._application_generate_entity.app_config.app_mode, ) - worker_thread = threading.Thread(target=self._generate_worker, kwargs={ - 'flask_app': current_app._get_current_object(), - 'queue_manager': queue_manager, - 'context': contextvars.copy_context(), - 'publisher': publisher - }) + worker_thread = threading.Thread( + target=self._generate_worker, + kwargs={ + "flask_app": current_app._get_current_object(), + "queue_manager": queue_manager, + "context": contextvars.copy_context(), + "publisher": publisher, + }, + ) worker_thread.start() yield from self._consumer_worker(queue_manager) def _sync_process_stream_response(self, publisher): - for response in self._process_stream_response(publisher, - trace_manager=self._application_generate_entity.trace_manager): + for response in self._process_stream_response( + publisher, trace_manager=self._application_generate_entity.trace_manager + ): while True: audio_response = self._listen_audio_msg(publisher, task_id=self._application_generate_entity.task_id) if audio_response: diff --git a/api/core/app/entities/queue_task_bridge.py b/api/core/app/entities/queue_task_bridge.py index 5496b55c26..73e12192a4 100644 --- a/api/core/app/entities/queue_task_bridge.py +++ b/api/core/app/entities/queue_task_bridge.py @@ -28,7 +28,6 @@ workflow_queue_task_map = { StreamEvent.TEXT_CHUNK: QueueEvent.TEXT_CHUNK, StreamEvent.TEXT_REPLACE: QueueEvent.MESSAGE_REPLACE, StreamEvent.AGENT_LOG: QueueEvent.AGENT_LOG, - } advance_chat_queue_task_map = { @@ -65,5 +64,6 @@ class ForwardQueueMessage(AppQueueEvent): """ ForwardQueueMessage entity """ + event: QueueEvent = QueueEvent.PING response: StreamResponse