feat: workflow/chatflow api support async

pull/18838/head
hobo.l 1 year ago
parent 01943bca04
commit 28a37fc52e

@ -100,8 +100,10 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
query = query.replace("\x00", "") query = query.replace("\x00", "")
inputs = args["inputs"] inputs = args["inputs"]
extras = {"auto_generate_conversation_name": args.get("auto_generate_name", False), extras = {
"is_async": args.get("is_async", False)} "auto_generate_conversation_name": args.get("auto_generate_name", False),
"is_async": args.get("is_async", False),
}
# get conversation # get conversation
conversation = None conversation = None

@ -296,12 +296,15 @@ class AdvancedChatAppGenerateTaskPipeline:
app_mode=self._conversation_mode, app_mode=self._conversation_mode,
message_id=self._message_id, message_id=self._message_id,
) )
worker_thread = threading.Thread(target=self._generate_worker, kwargs={ worker_thread = threading.Thread(
'flask_app': current_app._get_current_object(), target=self._generate_worker,
'queue_manager': queue_manager, kwargs={
'context': contextvars.copy_context(), "flask_app": current_app._get_current_object(),
'publisher': publisher "queue_manager": queue_manager,
}) "context": contextvars.copy_context(),
"publisher": publisher,
},
)
worker_thread.start() worker_thread.start()
@ -313,9 +316,9 @@ class AdvancedChatAppGenerateTaskPipeline:
if isinstance(event, ForwardQueueMessage): if isinstance(event, ForwardQueueMessage):
yield event.response yield event.response
def _generate_worker(self, flask_app: Flask, def _generate_worker(
queue_manager: AppQueueManager, self, flask_app: Flask, queue_manager: AppQueueManager, context: contextvars.Context, publisher
context: contextvars.Context, publisher) -> None: ) -> None:
""" """
Generate worker in a new thread. Generate worker in a new thread.
:param flask_app: Flask app :param flask_app: Flask app
@ -335,10 +338,9 @@ class AdvancedChatAppGenerateTaskPipeline:
queue_manager.publish(message, PublishFrom.TASK_PIPELINE) queue_manager.publish(message, PublishFrom.TASK_PIPELINE)
def _sync_process_stream_response(self, publisher): def _sync_process_stream_response(self, publisher):
for response in self._process_stream_response(
for response in self._process_stream_response(publisher, publisher, trace_manager=self._application_generate_entity.trace_manager
trace_manager=self._application_generate_entity.trace_manager): ):
while True: while True:
audio_response = self._listen_audio_msg(publisher, task_id=self._application_generate_entity.task_id) audio_response = self._listen_audio_msg(publisher, task_id=self._application_generate_entity.task_id)
if audio_response: if audio_response:

@ -148,7 +148,7 @@ class WorkflowAppGenerateTaskPipeline:
data=WorkflowAppBlockingResponse.Data( data=WorkflowAppBlockingResponse.Data(
id=self._application_generate_entity.app_config.app_id, id=self._application_generate_entity.app_config.app_id,
workflow_id=self._workflow_id, workflow_id=self._workflow_id,
status='processing', status="processing",
created_at=int(time.time()), created_at=int(time.time()),
), ),
) )
@ -258,9 +258,9 @@ class WorkflowAppGenerateTaskPipeline:
if isinstance(event, ForwardQueueMessage): if isinstance(event, ForwardQueueMessage):
yield event.response yield event.response
def _generate_worker(self, flask_app: Flask, def _generate_worker(
queue_manager: AppQueueManager, self, flask_app: Flask, queue_manager: AppQueueManager, context: contextvars.Context, publisher
context: contextvars.Context, publisher) -> None: ) -> None:
""" """
Generate worker in a new thread. Generate worker in a new thread.
:param flask_app: Flask app :param flask_app: Flask app
@ -288,20 +288,24 @@ class WorkflowAppGenerateTaskPipeline:
app_mode=self._application_generate_entity.app_config.app_mode, app_mode=self._application_generate_entity.app_config.app_mode,
) )
worker_thread = threading.Thread(target=self._generate_worker, kwargs={ worker_thread = threading.Thread(
'flask_app': current_app._get_current_object(), target=self._generate_worker,
'queue_manager': queue_manager, kwargs={
'context': contextvars.copy_context(), "flask_app": current_app._get_current_object(),
'publisher': publisher "queue_manager": queue_manager,
}) "context": contextvars.copy_context(),
"publisher": publisher,
},
)
worker_thread.start() worker_thread.start()
yield from self._consumer_worker(queue_manager) yield from self._consumer_worker(queue_manager)
def _sync_process_stream_response(self, publisher): def _sync_process_stream_response(self, publisher):
for response in self._process_stream_response(publisher, for response in self._process_stream_response(
trace_manager=self._application_generate_entity.trace_manager): publisher, trace_manager=self._application_generate_entity.trace_manager
):
while True: while True:
audio_response = self._listen_audio_msg(publisher, task_id=self._application_generate_entity.task_id) audio_response = self._listen_audio_msg(publisher, task_id=self._application_generate_entity.task_id)
if audio_response: if audio_response:

@ -28,7 +28,6 @@ workflow_queue_task_map = {
StreamEvent.TEXT_CHUNK: QueueEvent.TEXT_CHUNK, StreamEvent.TEXT_CHUNK: QueueEvent.TEXT_CHUNK,
StreamEvent.TEXT_REPLACE: QueueEvent.MESSAGE_REPLACE, StreamEvent.TEXT_REPLACE: QueueEvent.MESSAGE_REPLACE,
StreamEvent.AGENT_LOG: QueueEvent.AGENT_LOG, StreamEvent.AGENT_LOG: QueueEvent.AGENT_LOG,
} }
advance_chat_queue_task_map = { advance_chat_queue_task_map = {
@ -65,5 +64,6 @@ class ForwardQueueMessage(AppQueueEvent):
""" """
ForwardQueueMessage entity ForwardQueueMessage entity
""" """
event: QueueEvent = QueueEvent.PING event: QueueEvent = QueueEvent.PING
response: StreamResponse response: StreamResponse

Loading…
Cancel
Save