update sytle

pull/20921/head
GuanMu 11 months ago
parent 81ff539253
commit 314013aa33

@ -164,8 +164,7 @@ class AdvancedChatAppGenerateTaskPipeline:
conversation_id=self._conversation_id, query=self._application_generate_entity.query conversation_id=self._conversation_id, query=self._application_generate_entity.query
) )
generator = self._wrapper_process_stream_response( generator = self._wrapper_process_stream_response(trace_manager=self._application_generate_entity.trace_manager)
trace_manager=self._application_generate_entity.trace_manager)
if self._base_task_pipeline._stream: if self._base_task_pipeline._stream:
return self._to_stream_response(generator) return self._to_stream_response(generator)
else: else:
@ -186,7 +185,7 @@ class AdvancedChatAppGenerateTaskPipeline:
# Retrieve outputs from task state metadata, which is populated earlier # Retrieve outputs from task state metadata, which is populated earlier
final_outputs = {} final_outputs = {}
if self._task_state.metadata and hasattr(self._task_state.metadata, 'outputs'): if self._task_state.metadata and hasattr(self._task_state.metadata, "outputs"):
final_outputs = self._task_state.metadata.outputs final_outputs = self._task_state.metadata.outputs
return ChatbotAppBlockingResponse( return ChatbotAppBlockingResponse(
@ -244,14 +243,12 @@ class AdvancedChatAppGenerateTaskPipeline:
and features_dict["text_to_speech"].get("autoPlay") == "enabled" and features_dict["text_to_speech"].get("autoPlay") == "enabled"
): ):
tts_publisher = AppGeneratorTTSPublisher( tts_publisher = AppGeneratorTTSPublisher(
tenant_id, features_dict["text_to_speech"].get( tenant_id, features_dict["text_to_speech"].get("voice"), features_dict["text_to_speech"].get("language")
"voice"), features_dict["text_to_speech"].get("language")
) )
for response in self._process_stream_response(tts_publisher=tts_publisher, trace_manager=trace_manager): for response in self._process_stream_response(tts_publisher=tts_publisher, trace_manager=trace_manager):
while True: while True:
audio_response = self._listen_audio_msg( audio_response = self._listen_audio_msg(publisher=tts_publisher, task_id=task_id)
publisher=tts_publisher, task_id=task_id)
if audio_response: if audio_response:
yield audio_response yield audio_response
else: else:
@ -276,8 +273,7 @@ class AdvancedChatAppGenerateTaskPipeline:
start_listener_time = time.time() start_listener_time = time.time()
yield MessageAudioStreamResponse(audio=audio_trunk.audio, task_id=task_id) yield MessageAudioStreamResponse(audio=audio_trunk.audio, task_id=task_id)
except Exception: except Exception:
logger.exception( logger.exception(f"Failed to listen audio message, task_id: {task_id}")
f"Failed to listen audio message, task_id: {task_id}")
break break
if tts_publisher: if tts_publisher:
yield MessageAudioEndStreamResponse(audio="", task_id=task_id) yield MessageAudioEndStreamResponse(audio="", task_id=task_id)
@ -317,8 +313,7 @@ class AdvancedChatAppGenerateTaskPipeline:
self._workflow_run_id = workflow_execution.id_ self._workflow_run_id = workflow_execution.id_
message = self._get_message(session=session) message = self._get_message(session=session)
if not message: if not message:
raise ValueError( raise ValueError(f"Message not found: {self._message_id}")
f"Message not found: {self._message_id}")
message.workflow_run_id = workflow_execution.id_ message.workflow_run_id = workflow_execution.id_
workflow_start_resp = self._workflow_response_converter.workflow_start_to_stream_response( workflow_start_resp = self._workflow_response_converter.workflow_start_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
@ -367,8 +362,7 @@ class AdvancedChatAppGenerateTaskPipeline:
# Record files if it's an answer node or end node # Record files if it's an answer node or end node
if event.node_type in [NodeType.ANSWER, NodeType.END]: if event.node_type in [NodeType.ANSWER, NodeType.END]:
self._recorded_files.extend( self._recorded_files.extend(
self._workflow_response_converter.fetch_files_from_node_outputs( self._workflow_response_converter.fetch_files_from_node_outputs(event.outputs or {})
event.outputs or {})
) )
with Session(db.engine, expire_on_commit=False) as session: with Session(db.engine, expire_on_commit=False) as session:
workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_success( workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_success(
@ -516,10 +510,8 @@ class AdvancedChatAppGenerateTaskPipeline:
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution=workflow_execution, workflow_execution=workflow_execution,
) )
workflow_outputs_data = workflow_finish_resp.data.outputs.get( workflow_outputs_data = workflow_finish_resp.data.outputs.get("outputs", {})
'outputs', {}) self._task_state.metadata.outputs = workflow_outputs_data.get("outputs")
self._task_state.metadata.outputs = workflow_outputs_data.get(
'outputs')
yield workflow_finish_resp yield workflow_finish_resp
self._base_task_pipeline._queue_manager.publish( self._base_task_pipeline._queue_manager.publish(
QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE
@ -572,8 +564,7 @@ class AdvancedChatAppGenerateTaskPipeline:
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution=workflow_execution, workflow_execution=workflow_execution,
) )
err_event = QueueErrorEvent(error=ValueError( err_event = QueueErrorEvent(error=ValueError(f"Run failed: {workflow_execution.error_message}"))
f"Run failed: {workflow_execution.error_message}"))
err = self._base_task_pipeline._handle_error( err = self._base_task_pipeline._handle_error(
event=err_event, session=session, message_id=self._message_id event=err_event, session=session, message_id=self._message_id
) )
@ -599,8 +590,7 @@ class AdvancedChatAppGenerateTaskPipeline:
workflow_execution=workflow_execution, workflow_execution=workflow_execution,
) )
# Save message # Save message
self._save_message( self._save_message(session=session, graph_runtime_state=graph_runtime_state)
session=session, graph_runtime_state=graph_runtime_state)
session.commit() session.commit()
yield workflow_finish_resp yield workflow_finish_resp
@ -636,8 +626,7 @@ class AdvancedChatAppGenerateTaskPipeline:
continue continue
# handle output moderation chunk # handle output moderation chunk
should_direct_answer = self._handle_output_moderation_chunk( should_direct_answer = self._handle_output_moderation_chunk(delta_text)
delta_text)
if should_direct_answer: if should_direct_answer:
continue continue
@ -669,8 +658,7 @@ class AdvancedChatAppGenerateTaskPipeline:
) )
# Save message # Save message
with Session(db.engine, expire_on_commit=False) as session: with Session(db.engine, expire_on_commit=False) as session:
self._save_message( self._save_message(session=session, graph_runtime_state=graph_runtime_state)
session=session, graph_runtime_state=graph_runtime_state)
session.commit() session.commit()
yield self._message_end_to_stream_response() yield self._message_end_to_stream_response()
@ -691,8 +679,7 @@ class AdvancedChatAppGenerateTaskPipeline:
def _save_message(self, *, session: Session, graph_runtime_state: Optional[GraphRuntimeState] = None) -> None: def _save_message(self, *, session: Session, graph_runtime_state: Optional[GraphRuntimeState] = None) -> None:
message = self._get_message(session=session) message = self._get_message(session=session)
message.answer = self._task_state.answer message.answer = self._task_state.answer
message.provider_response_latency = time.perf_counter() - \ message.provider_response_latency = time.perf_counter() - self._base_task_pipeline._start_at
self._base_task_pipeline._start_at
message.message_metadata = self._task_state.metadata.model_dump_json() message.message_metadata = self._task_state.metadata.model_dump_json()
message_files = [ message_files = [
MessageFile( MessageFile(
@ -757,18 +744,15 @@ class AdvancedChatAppGenerateTaskPipeline:
# stop subscribe new token when output moderation should direct output # stop subscribe new token when output moderation should direct output
self._task_state.answer = self._base_task_pipeline._output_moderation_handler.get_final_output() self._task_state.answer = self._base_task_pipeline._output_moderation_handler.get_final_output()
self._base_task_pipeline._queue_manager.publish( self._base_task_pipeline._queue_manager.publish(
QueueTextChunkEvent( QueueTextChunkEvent(text=self._task_state.answer), PublishFrom.TASK_PIPELINE
text=self._task_state.answer), PublishFrom.TASK_PIPELINE
) )
self._base_task_pipeline._queue_manager.publish( self._base_task_pipeline._queue_manager.publish(
QueueStopEvent( QueueStopEvent(stopped_by=QueueStopEvent.StopBy.OUTPUT_MODERATION), PublishFrom.TASK_PIPELINE
stopped_by=QueueStopEvent.StopBy.OUTPUT_MODERATION), PublishFrom.TASK_PIPELINE
) )
return True return True
else: else:
self._base_task_pipeline._output_moderation_handler.append_new_token( self._base_task_pipeline._output_moderation_handler.append_new_token(text)
text)
return False return False

Loading…
Cancel
Save