|
|
|
|
@ -65,6 +65,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
|
|
|
|
"""
|
|
|
|
|
AdvancedChatAppGenerateTaskPipeline is a class that generate stream output and state management for Application.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
_task_state: WorkflowTaskState
|
|
|
|
|
_application_generate_entity: AdvancedChatAppGenerateEntity
|
|
|
|
|
_workflow: Workflow
|
|
|
|
|
@ -72,14 +73,14 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
|
|
|
|
_workflow_system_variables: dict[SystemVariableKey, Any]
|
|
|
|
|
|
|
|
|
|
def __init__(
|
|
|
|
|
self,
|
|
|
|
|
application_generate_entity: AdvancedChatAppGenerateEntity,
|
|
|
|
|
workflow: Workflow,
|
|
|
|
|
queue_manager: AppQueueManager,
|
|
|
|
|
conversation: Conversation,
|
|
|
|
|
message: Message,
|
|
|
|
|
user: Union[Account, EndUser],
|
|
|
|
|
stream: bool,
|
|
|
|
|
self,
|
|
|
|
|
application_generate_entity: AdvancedChatAppGenerateEntity,
|
|
|
|
|
workflow: Workflow,
|
|
|
|
|
queue_manager: AppQueueManager,
|
|
|
|
|
conversation: Conversation,
|
|
|
|
|
message: Message,
|
|
|
|
|
user: Union[Account, EndUser],
|
|
|
|
|
stream: bool,
|
|
|
|
|
) -> None:
|
|
|
|
|
"""
|
|
|
|
|
Initialize AdvancedChatAppGenerateTaskPipeline.
|
|
|
|
|
@ -123,13 +124,10 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
|
|
|
|
|
|
|
|
|
# start generate conversation name thread
|
|
|
|
|
self._conversation_name_generate_thread = self._generate_conversation_name(
|
|
|
|
|
self._conversation,
|
|
|
|
|
self._application_generate_entity.query
|
|
|
|
|
self._conversation, self._application_generate_entity.query
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
generator = self._wrapper_process_stream_response(
|
|
|
|
|
trace_manager=self._application_generate_entity.trace_manager
|
|
|
|
|
)
|
|
|
|
|
generator = self._wrapper_process_stream_response(trace_manager=self._application_generate_entity.trace_manager)
|
|
|
|
|
|
|
|
|
|
if self._stream:
|
|
|
|
|
return self._to_stream_response(generator)
|
|
|
|
|
@ -147,7 +145,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
|
|
|
|
elif isinstance(stream_response, MessageEndStreamResponse):
|
|
|
|
|
extras = {}
|
|
|
|
|
if stream_response.metadata:
|
|
|
|
|
extras['metadata'] = stream_response.metadata
|
|
|
|
|
extras["metadata"] = stream_response.metadata
|
|
|
|
|
|
|
|
|
|
return ChatbotAppBlockingResponse(
|
|
|
|
|
task_id=stream_response.task_id,
|
|
|
|
|
@ -158,15 +156,17 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
|
|
|
|
message_id=self._message.id,
|
|
|
|
|
answer=self._task_state.answer,
|
|
|
|
|
created_at=int(self._message.created_at.timestamp()),
|
|
|
|
|
**extras
|
|
|
|
|
)
|
|
|
|
|
**extras,
|
|
|
|
|
),
|
|
|
|
|
)
|
|
|
|
|
else:
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
raise Exception('Queue listening stopped unexpectedly.')
|
|
|
|
|
raise Exception("Queue listening stopped unexpectedly.")
|
|
|
|
|
|
|
|
|
|
def _to_stream_response(self, generator: Generator[StreamResponse, None, None]) -> Generator[ChatbotAppStreamResponse, Any, None]:
|
|
|
|
|
def _to_stream_response(
|
|
|
|
|
self, generator: Generator[StreamResponse, None, None]
|
|
|
|
|
) -> Generator[ChatbotAppStreamResponse, Any, None]:
|
|
|
|
|
"""
|
|
|
|
|
To stream response.
|
|
|
|
|
:return:
|
|
|
|
|
@ -176,32 +176,35 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
|
|
|
|
conversation_id=self._conversation.id,
|
|
|
|
|
message_id=self._message.id,
|
|
|
|
|
created_at=int(self._message.created_at.timestamp()),
|
|
|
|
|
stream_response=stream_response
|
|
|
|
|
stream_response=stream_response,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def _listenAudioMsg(self, publisher, task_id: str):
|
|
|
|
|
def _listen_audio_msg(self, publisher, task_id: str):
|
|
|
|
|
if not publisher:
|
|
|
|
|
return None
|
|
|
|
|
audio_msg: AudioTrunk = publisher.checkAndGetAudio()
|
|
|
|
|
audio_msg: AudioTrunk = publisher.check_and_get_audio()
|
|
|
|
|
if audio_msg and audio_msg.status != "finish":
|
|
|
|
|
return MessageAudioStreamResponse(audio=audio_msg.audio, task_id=task_id)
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def _wrapper_process_stream_response(self, trace_manager: Optional[TraceQueueManager] = None) -> \
|
|
|
|
|
Generator[StreamResponse, None, None]:
|
|
|
|
|
|
|
|
|
|
def _wrapper_process_stream_response(
|
|
|
|
|
self, trace_manager: Optional[TraceQueueManager] = None
|
|
|
|
|
) -> Generator[StreamResponse, None, None]:
|
|
|
|
|
tts_publisher = None
|
|
|
|
|
task_id = self._application_generate_entity.task_id
|
|
|
|
|
tenant_id = self._application_generate_entity.app_config.tenant_id
|
|
|
|
|
features_dict = self._workflow.features_dict
|
|
|
|
|
|
|
|
|
|
if features_dict.get('text_to_speech') and features_dict['text_to_speech'].get('enabled') and features_dict[
|
|
|
|
|
'text_to_speech'].get('autoPlay') == 'enabled':
|
|
|
|
|
tts_publisher = AppGeneratorTTSPublisher(tenant_id, features_dict['text_to_speech'].get('voice'))
|
|
|
|
|
if (
|
|
|
|
|
features_dict.get("text_to_speech")
|
|
|
|
|
and features_dict["text_to_speech"].get("enabled")
|
|
|
|
|
and features_dict["text_to_speech"].get("autoPlay") == "enabled"
|
|
|
|
|
):
|
|
|
|
|
tts_publisher = AppGeneratorTTSPublisher(tenant_id, features_dict["text_to_speech"].get("voice"))
|
|
|
|
|
|
|
|
|
|
for response in self._process_stream_response(tts_publisher=tts_publisher, trace_manager=trace_manager):
|
|
|
|
|
while True:
|
|
|
|
|
audio_response = self._listenAudioMsg(tts_publisher, task_id=task_id)
|
|
|
|
|
audio_response = self._listen_audio_msg(tts_publisher, task_id=task_id)
|
|
|
|
|
if audio_response:
|
|
|
|
|
yield audio_response
|
|
|
|
|
else:
|
|
|
|
|
@ -214,7 +217,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
|
|
|
|
try:
|
|
|
|
|
if not tts_publisher:
|
|
|
|
|
break
|
|
|
|
|
audio_trunk = tts_publisher.checkAndGetAudio()
|
|
|
|
|
audio_trunk = tts_publisher.check_and_get_audio()
|
|
|
|
|
if audio_trunk is None:
|
|
|
|
|
# release cpu
|
|
|
|
|
# sleep 20 ms ( 40ms => 1280 byte audio file,20ms => 640 byte audio file)
|
|
|
|
|
@ -228,12 +231,12 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(e)
|
|
|
|
|
break
|
|
|
|
|
yield MessageAudioEndStreamResponse(audio='', task_id=task_id)
|
|
|
|
|
yield MessageAudioEndStreamResponse(audio="", task_id=task_id)
|
|
|
|
|
|
|
|
|
|
def _process_stream_response(
|
|
|
|
|
self,
|
|
|
|
|
tts_publisher: Optional[AppGeneratorTTSPublisher] = None,
|
|
|
|
|
trace_manager: Optional[TraceQueueManager] = None
|
|
|
|
|
self,
|
|
|
|
|
tts_publisher: Optional[AppGeneratorTTSPublisher] = None,
|
|
|
|
|
trace_manager: Optional[TraceQueueManager] = None,
|
|
|
|
|
) -> Generator[StreamResponse, None, None]:
|
|
|
|
|
"""
|
|
|
|
|
Process stream response.
|
|
|
|
|
@ -267,22 +270,18 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
|
|
|
|
db.session.close()
|
|
|
|
|
|
|
|
|
|
yield self._workflow_start_to_stream_response(
|
|
|
|
|
task_id=self._application_generate_entity.task_id,
|
|
|
|
|
workflow_run=workflow_run
|
|
|
|
|
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
|
|
|
|
|
)
|
|
|
|
|
elif isinstance(event, QueueNodeStartedEvent):
|
|
|
|
|
if not workflow_run:
|
|
|
|
|
raise Exception('Workflow run not initialized.')
|
|
|
|
|
raise Exception("Workflow run not initialized.")
|
|
|
|
|
|
|
|
|
|
workflow_node_execution = self._handle_node_execution_start(
|
|
|
|
|
workflow_run=workflow_run,
|
|
|
|
|
event=event
|
|
|
|
|
)
|
|
|
|
|
workflow_node_execution = self._handle_node_execution_start(workflow_run=workflow_run, event=event)
|
|
|
|
|
|
|
|
|
|
response = self._workflow_node_start_to_stream_response(
|
|
|
|
|
event=event,
|
|
|
|
|
task_id=self._application_generate_entity.task_id,
|
|
|
|
|
workflow_node_execution=workflow_node_execution
|
|
|
|
|
workflow_node_execution=workflow_node_execution,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if response:
|
|
|
|
|
@ -293,7 +292,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
|
|
|
|
response = self._workflow_node_finish_to_stream_response(
|
|
|
|
|
event=event,
|
|
|
|
|
task_id=self._application_generate_entity.task_id,
|
|
|
|
|
workflow_node_execution=workflow_node_execution
|
|
|
|
|
workflow_node_execution=workflow_node_execution,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if response:
|
|
|
|
|
@ -304,62 +303,52 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
|
|
|
|
response = self._workflow_node_finish_to_stream_response(
|
|
|
|
|
event=event,
|
|
|
|
|
task_id=self._application_generate_entity.task_id,
|
|
|
|
|
workflow_node_execution=workflow_node_execution
|
|
|
|
|
workflow_node_execution=workflow_node_execution,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if response:
|
|
|
|
|
yield response
|
|
|
|
|
elif isinstance(event, QueueParallelBranchRunStartedEvent):
|
|
|
|
|
if not workflow_run:
|
|
|
|
|
raise Exception('Workflow run not initialized.')
|
|
|
|
|
raise Exception("Workflow run not initialized.")
|
|
|
|
|
|
|
|
|
|
yield self._workflow_parallel_branch_start_to_stream_response(
|
|
|
|
|
task_id=self._application_generate_entity.task_id,
|
|
|
|
|
workflow_run=workflow_run,
|
|
|
|
|
event=event
|
|
|
|
|
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
|
|
|
|
|
)
|
|
|
|
|
elif isinstance(event, QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent):
|
|
|
|
|
if not workflow_run:
|
|
|
|
|
raise Exception('Workflow run not initialized.')
|
|
|
|
|
raise Exception("Workflow run not initialized.")
|
|
|
|
|
|
|
|
|
|
yield self._workflow_parallel_branch_finished_to_stream_response(
|
|
|
|
|
task_id=self._application_generate_entity.task_id,
|
|
|
|
|
workflow_run=workflow_run,
|
|
|
|
|
event=event
|
|
|
|
|
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
|
|
|
|
|
)
|
|
|
|
|
elif isinstance(event, QueueIterationStartEvent):
|
|
|
|
|
if not workflow_run:
|
|
|
|
|
raise Exception('Workflow run not initialized.')
|
|
|
|
|
raise Exception("Workflow run not initialized.")
|
|
|
|
|
|
|
|
|
|
yield self._workflow_iteration_start_to_stream_response(
|
|
|
|
|
task_id=self._application_generate_entity.task_id,
|
|
|
|
|
workflow_run=workflow_run,
|
|
|
|
|
event=event
|
|
|
|
|
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
|
|
|
|
|
)
|
|
|
|
|
elif isinstance(event, QueueIterationNextEvent):
|
|
|
|
|
if not workflow_run:
|
|
|
|
|
raise Exception('Workflow run not initialized.')
|
|
|
|
|
raise Exception("Workflow run not initialized.")
|
|
|
|
|
|
|
|
|
|
yield self._workflow_iteration_next_to_stream_response(
|
|
|
|
|
task_id=self._application_generate_entity.task_id,
|
|
|
|
|
workflow_run=workflow_run,
|
|
|
|
|
event=event
|
|
|
|
|
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
|
|
|
|
|
)
|
|
|
|
|
elif isinstance(event, QueueIterationCompletedEvent):
|
|
|
|
|
if not workflow_run:
|
|
|
|
|
raise Exception('Workflow run not initialized.')
|
|
|
|
|
raise Exception("Workflow run not initialized.")
|
|
|
|
|
|
|
|
|
|
yield self._workflow_iteration_completed_to_stream_response(
|
|
|
|
|
task_id=self._application_generate_entity.task_id,
|
|
|
|
|
workflow_run=workflow_run,
|
|
|
|
|
event=event
|
|
|
|
|
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
|
|
|
|
|
)
|
|
|
|
|
elif isinstance(event, QueueWorkflowSucceededEvent):
|
|
|
|
|
if not workflow_run:
|
|
|
|
|
raise Exception('Workflow run not initialized.')
|
|
|
|
|
raise Exception("Workflow run not initialized.")
|
|
|
|
|
|
|
|
|
|
if not graph_runtime_state:
|
|
|
|
|
raise Exception('Graph runtime state not initialized.')
|
|
|
|
|
raise Exception("Graph runtime state not initialized.")
|
|
|
|
|
|
|
|
|
|
workflow_run = self._handle_workflow_run_success(
|
|
|
|
|
workflow_run=workflow_run,
|
|
|
|
|
@ -372,20 +361,16 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
yield self._workflow_finish_to_stream_response(
|
|
|
|
|
task_id=self._application_generate_entity.task_id,
|
|
|
|
|
workflow_run=workflow_run
|
|
|
|
|
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
self._queue_manager.publish(
|
|
|
|
|
QueueAdvancedChatMessageEndEvent(),
|
|
|
|
|
PublishFrom.TASK_PIPELINE
|
|
|
|
|
)
|
|
|
|
|
self._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
|
|
|
|
|
elif isinstance(event, QueueWorkflowFailedEvent):
|
|
|
|
|
if not workflow_run:
|
|
|
|
|
raise Exception('Workflow run not initialized.')
|
|
|
|
|
raise Exception("Workflow run not initialized.")
|
|
|
|
|
|
|
|
|
|
if not graph_runtime_state:
|
|
|
|
|
raise Exception('Graph runtime state not initialized.')
|
|
|
|
|
raise Exception("Graph runtime state not initialized.")
|
|
|
|
|
|
|
|
|
|
workflow_run = self._handle_workflow_run_failed(
|
|
|
|
|
workflow_run=workflow_run,
|
|
|
|
|
@ -399,11 +384,10 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
yield self._workflow_finish_to_stream_response(
|
|
|
|
|
task_id=self._application_generate_entity.task_id,
|
|
|
|
|
workflow_run=workflow_run
|
|
|
|
|
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
err_event = QueueErrorEvent(error=ValueError(f'Run failed: {workflow_run.error}'))
|
|
|
|
|
err_event = QueueErrorEvent(error=ValueError(f"Run failed: {workflow_run.error}"))
|
|
|
|
|
yield self._error_to_stream_response(self._handle_error(err_event, self._message))
|
|
|
|
|
break
|
|
|
|
|
elif isinstance(event, QueueStopEvent):
|
|
|
|
|
@ -420,8 +404,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
yield self._workflow_finish_to_stream_response(
|
|
|
|
|
task_id=self._application_generate_entity.task_id,
|
|
|
|
|
workflow_run=workflow_run
|
|
|
|
|
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# Save message
|
|
|
|
|
@ -434,8 +417,9 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
|
|
|
|
|
|
|
|
|
self._refetch_message()
|
|
|
|
|
|
|
|
|
|
self._message.message_metadata = json.dumps(jsonable_encoder(self._task_state.metadata)) \
|
|
|
|
|
if self._task_state.metadata else None
|
|
|
|
|
self._message.message_metadata = (
|
|
|
|
|
json.dumps(jsonable_encoder(self._task_state.metadata)) if self._task_state.metadata else None
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
db.session.commit()
|
|
|
|
|
db.session.refresh(self._message)
|
|
|
|
|
@ -445,8 +429,9 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
|
|
|
|
|
|
|
|
|
self._refetch_message()
|
|
|
|
|
|
|
|
|
|
self._message.message_metadata = json.dumps(jsonable_encoder(self._task_state.metadata)) \
|
|
|
|
|
if self._task_state.metadata else None
|
|
|
|
|
self._message.message_metadata = (
|
|
|
|
|
json.dumps(jsonable_encoder(self._task_state.metadata)) if self._task_state.metadata else None
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
db.session.commit()
|
|
|
|
|
db.session.refresh(self._message)
|
|
|
|
|
@ -466,13 +451,15 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
|
|
|
|
tts_publisher.publish(message=queue_message)
|
|
|
|
|
|
|
|
|
|
self._task_state.answer += delta_text
|
|
|
|
|
yield self._message_to_stream_response(delta_text, self._message.id)
|
|
|
|
|
yield self._message_to_stream_response(
|
|
|
|
|
answer=delta_text, message_id=self._message.id, from_variable_selector=event.from_variable_selector
|
|
|
|
|
)
|
|
|
|
|
elif isinstance(event, QueueMessageReplaceEvent):
|
|
|
|
|
# published by moderation
|
|
|
|
|
yield self._message_replace_to_stream_response(answer=event.text)
|
|
|
|
|
elif isinstance(event, QueueAdvancedChatMessageEndEvent):
|
|
|
|
|
if not graph_runtime_state:
|
|
|
|
|
raise Exception('Graph runtime state not initialized.')
|
|
|
|
|
raise Exception("Graph runtime state not initialized.")
|
|
|
|
|
|
|
|
|
|
output_moderation_answer = self._handle_output_moderation_when_task_finished(self._task_state.answer)
|
|
|
|
|
if output_moderation_answer:
|
|
|
|
|
@ -502,8 +489,9 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
|
|
|
|
|
|
|
|
|
self._message.answer = self._task_state.answer
|
|
|
|
|
self._message.provider_response_latency = time.perf_counter() - self._start_at
|
|
|
|
|
self._message.message_metadata = json.dumps(jsonable_encoder(self._task_state.metadata)) \
|
|
|
|
|
if self._task_state.metadata else None
|
|
|
|
|
self._message.message_metadata = (
|
|
|
|
|
json.dumps(jsonable_encoder(self._task_state.metadata)) if self._task_state.metadata else None
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if graph_runtime_state and graph_runtime_state.llm_usage:
|
|
|
|
|
usage = graph_runtime_state.llm_usage
|
|
|
|
|
@ -523,7 +511,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
|
|
|
|
application_generate_entity=self._application_generate_entity,
|
|
|
|
|
conversation=self._conversation,
|
|
|
|
|
is_first_message=self._application_generate_entity.conversation_id is None,
|
|
|
|
|
extras=self._application_generate_entity.extras
|
|
|
|
|
extras=self._application_generate_entity.extras,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def _message_end_to_stream_response(self) -> MessageEndStreamResponse:
|
|
|
|
|
@ -533,15 +521,13 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
|
|
|
|
"""
|
|
|
|
|
extras = {}
|
|
|
|
|
if self._task_state.metadata:
|
|
|
|
|
extras['metadata'] = self._task_state.metadata.copy()
|
|
|
|
|
extras["metadata"] = self._task_state.metadata.copy()
|
|
|
|
|
|
|
|
|
|
if 'annotation_reply' in extras['metadata']:
|
|
|
|
|
del extras['metadata']['annotation_reply']
|
|
|
|
|
if "annotation_reply" in extras["metadata"]:
|
|
|
|
|
del extras["metadata"]["annotation_reply"]
|
|
|
|
|
|
|
|
|
|
return MessageEndStreamResponse(
|
|
|
|
|
task_id=self._application_generate_entity.task_id,
|
|
|
|
|
id=self._message.id,
|
|
|
|
|
**extras
|
|
|
|
|
task_id=self._application_generate_entity.task_id, id=self._message.id, **extras
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def _handle_output_moderation_chunk(self, text: str) -> bool:
|
|
|
|
|
@ -555,14 +541,11 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
|
|
|
|
|
# stop subscribe new token when output moderation should direct output
|
|
|
|
|
self._task_state.answer = self._output_moderation_handler.get_final_output()
|
|
|
|
|
self._queue_manager.publish(
|
|
|
|
|
QueueTextChunkEvent(
|
|
|
|
|
text=self._task_state.answer
|
|
|
|
|
), PublishFrom.TASK_PIPELINE
|
|
|
|
|
QueueTextChunkEvent(text=self._task_state.answer), PublishFrom.TASK_PIPELINE
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
self._queue_manager.publish(
|
|
|
|
|
QueueStopEvent(stopped_by=QueueStopEvent.StopBy.OUTPUT_MODERATION),
|
|
|
|
|
PublishFrom.TASK_PIPELINE
|
|
|
|
|
QueueStopEvent(stopped_by=QueueStopEvent.StopBy.OUTPUT_MODERATION), PublishFrom.TASK_PIPELINE
|
|
|
|
|
)
|
|
|
|
|
return True
|
|
|
|
|
else:
|
|
|
|
|
|