refactor(task_pipeline): Use WorkflowResponseConverter in TasksPipeline

Signed-off-by: -LAN- <laipz8200@outlook.com>
pull/20071/head
-LAN- 1 year ago
parent d805e76fe8
commit bc518be301
No known key found for this signature in database
GPG Key ID: 6BA0D108DED011FF

@ -57,6 +57,7 @@ from core.app.entities.task_entities import (
) )
from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTaskPipeline from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTaskPipeline
from core.app.task_pipeline.message_cycle_manage import MessageCycleManage from core.app.task_pipeline.message_cycle_manage import MessageCycleManage
from core.app.workflow_response_converter import WorkflowResponseConverter
from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk
from core.model_runtime.entities.llm_entities import LLMUsage from core.model_runtime.entities.llm_entities import LLMUsage
from core.model_runtime.utils.encoders import jsonable_encoder from core.model_runtime.utils.encoders import jsonable_encoder
@ -131,6 +132,10 @@ class AdvancedChatAppGenerateTaskPipeline:
workflow_node_execution_repository=workflow_node_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository,
) )
self._workflow_response_converter = WorkflowResponseConverter(
application_generate_entity=application_generate_entity,
)
self._task_state = WorkflowTaskState() self._task_state = WorkflowTaskState()
self._message_cycle_manager = MessageCycleManage( self._message_cycle_manager = MessageCycleManage(
application_generate_entity=application_generate_entity, task_state=self._task_state application_generate_entity=application_generate_entity, task_state=self._task_state
@ -306,7 +311,7 @@ class AdvancedChatAppGenerateTaskPipeline:
if not message: if not message:
raise ValueError(f"Message not found: {self._message_id}") raise ValueError(f"Message not found: {self._message_id}")
message.workflow_run_id = workflow_execution.id message.workflow_run_id = workflow_execution.id
workflow_start_resp = self._workflow_cycle_manager.workflow_start_to_stream_response( workflow_start_resp = self._workflow_response_converter.workflow_start_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution=workflow_execution, workflow_execution=workflow_execution,
) )
@ -323,7 +328,7 @@ class AdvancedChatAppGenerateTaskPipeline:
workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_retried( workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_retried(
workflow_execution_id=self._workflow_run_id, event=event workflow_execution_id=self._workflow_run_id, event=event
) )
node_retry_resp = self._workflow_cycle_manager.workflow_node_retry_to_stream_response( node_retry_resp = self._workflow_response_converter.workflow_node_retry_to_stream_response(
event=event, event=event,
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution, workflow_node_execution=workflow_node_execution,
@ -340,7 +345,7 @@ class AdvancedChatAppGenerateTaskPipeline:
workflow_execution_id=self._workflow_run_id, event=event workflow_execution_id=self._workflow_run_id, event=event
) )
node_start_resp = self._workflow_cycle_manager.workflow_node_start_to_stream_response( node_start_resp = self._workflow_response_converter.workflow_node_start_to_stream_response(
event=event, event=event,
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution, workflow_node_execution=workflow_node_execution,
@ -352,7 +357,7 @@ class AdvancedChatAppGenerateTaskPipeline:
# Record files if it's an answer node or end node # Record files if it's an answer node or end node
if event.node_type in [NodeType.ANSWER, NodeType.END]: if event.node_type in [NodeType.ANSWER, NodeType.END]:
self._recorded_files.extend( self._recorded_files.extend(
self._workflow_cycle_manager.fetch_files_from_node_outputs(event.outputs or {}) self._workflow_response_converter.fetch_files_from_node_outputs(event.outputs or {})
) )
with Session(db.engine, expire_on_commit=False) as session: with Session(db.engine, expire_on_commit=False) as session:
@ -360,7 +365,7 @@ class AdvancedChatAppGenerateTaskPipeline:
event=event event=event
) )
node_finish_resp = self._workflow_cycle_manager.workflow_node_finish_to_stream_response( node_finish_resp = self._workflow_response_converter.workflow_node_finish_to_stream_response(
event=event, event=event,
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution, workflow_node_execution=workflow_node_execution,
@ -380,7 +385,7 @@ class AdvancedChatAppGenerateTaskPipeline:
event=event event=event
) )
node_finish_resp = self._workflow_cycle_manager.workflow_node_finish_to_stream_response( node_finish_resp = self._workflow_response_converter.workflow_node_finish_to_stream_response(
event=event, event=event,
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution, workflow_node_execution=workflow_node_execution,
@ -392,10 +397,12 @@ class AdvancedChatAppGenerateTaskPipeline:
if not self._workflow_run_id: if not self._workflow_run_id:
raise ValueError("workflow run not initialized.") raise ValueError("workflow run not initialized.")
parallel_start_resp = self._workflow_cycle_manager.workflow_parallel_branch_start_to_stream_response( parallel_start_resp = (
task_id=self._application_generate_entity.task_id, self._workflow_response_converter.workflow_parallel_branch_start_to_stream_response(
workflow_execution_id=self._workflow_run_id, task_id=self._application_generate_entity.task_id,
event=event, workflow_execution_id=self._workflow_run_id,
event=event,
)
) )
yield parallel_start_resp yield parallel_start_resp
@ -404,7 +411,7 @@ class AdvancedChatAppGenerateTaskPipeline:
raise ValueError("workflow run not initialized.") raise ValueError("workflow run not initialized.")
parallel_finish_resp = ( parallel_finish_resp = (
self._workflow_cycle_manager.workflow_parallel_branch_finished_to_stream_response( self._workflow_response_converter.workflow_parallel_branch_finished_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution_id=self._workflow_run_id, workflow_execution_id=self._workflow_run_id,
event=event, event=event,
@ -416,7 +423,7 @@ class AdvancedChatAppGenerateTaskPipeline:
if not self._workflow_run_id: if not self._workflow_run_id:
raise ValueError("workflow run not initialized.") raise ValueError("workflow run not initialized.")
iter_start_resp = self._workflow_cycle_manager.workflow_iteration_start_to_stream_response( iter_start_resp = self._workflow_response_converter.workflow_iteration_start_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution_id=self._workflow_run_id, workflow_execution_id=self._workflow_run_id,
event=event, event=event,
@ -427,7 +434,7 @@ class AdvancedChatAppGenerateTaskPipeline:
if not self._workflow_run_id: if not self._workflow_run_id:
raise ValueError("workflow run not initialized.") raise ValueError("workflow run not initialized.")
iter_next_resp = self._workflow_cycle_manager.workflow_iteration_next_to_stream_response( iter_next_resp = self._workflow_response_converter.workflow_iteration_next_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution_id=self._workflow_run_id, workflow_execution_id=self._workflow_run_id,
event=event, event=event,
@ -438,7 +445,7 @@ class AdvancedChatAppGenerateTaskPipeline:
if not self._workflow_run_id: if not self._workflow_run_id:
raise ValueError("workflow run not initialized.") raise ValueError("workflow run not initialized.")
iter_finish_resp = self._workflow_cycle_manager.workflow_iteration_completed_to_stream_response( iter_finish_resp = self._workflow_response_converter.workflow_iteration_completed_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution_id=self._workflow_run_id, workflow_execution_id=self._workflow_run_id,
event=event, event=event,
@ -449,7 +456,7 @@ class AdvancedChatAppGenerateTaskPipeline:
if not self._workflow_run_id: if not self._workflow_run_id:
raise ValueError("workflow run not initialized.") raise ValueError("workflow run not initialized.")
loop_start_resp = self._workflow_cycle_manager.workflow_loop_start_to_stream_response( loop_start_resp = self._workflow_response_converter.workflow_loop_start_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution_id=self._workflow_run_id, workflow_execution_id=self._workflow_run_id,
event=event, event=event,
@ -460,7 +467,7 @@ class AdvancedChatAppGenerateTaskPipeline:
if not self._workflow_run_id: if not self._workflow_run_id:
raise ValueError("workflow run not initialized.") raise ValueError("workflow run not initialized.")
loop_next_resp = self._workflow_cycle_manager.workflow_loop_next_to_stream_response( loop_next_resp = self._workflow_response_converter.workflow_loop_next_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution_id=self._workflow_run_id, workflow_execution_id=self._workflow_run_id,
event=event, event=event,
@ -471,7 +478,7 @@ class AdvancedChatAppGenerateTaskPipeline:
if not self._workflow_run_id: if not self._workflow_run_id:
raise ValueError("workflow run not initialized.") raise ValueError("workflow run not initialized.")
loop_finish_resp = self._workflow_cycle_manager.workflow_loop_completed_to_stream_response( loop_finish_resp = self._workflow_response_converter.workflow_loop_completed_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution_id=self._workflow_run_id, workflow_execution_id=self._workflow_run_id,
event=event, event=event,
@ -495,7 +502,7 @@ class AdvancedChatAppGenerateTaskPipeline:
trace_manager=trace_manager, trace_manager=trace_manager,
) )
workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response( workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response(
session=session, session=session,
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution=workflow_execution, workflow_execution=workflow_execution,
@ -521,7 +528,7 @@ class AdvancedChatAppGenerateTaskPipeline:
conversation_id=None, conversation_id=None,
trace_manager=trace_manager, trace_manager=trace_manager,
) )
workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response( workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response(
session=session, session=session,
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution=workflow_execution, workflow_execution=workflow_execution,
@ -548,7 +555,7 @@ class AdvancedChatAppGenerateTaskPipeline:
trace_manager=trace_manager, trace_manager=trace_manager,
exceptions_count=event.exceptions_count, exceptions_count=event.exceptions_count,
) )
workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response( workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response(
session=session, session=session,
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution=workflow_execution, workflow_execution=workflow_execution,
@ -573,7 +580,7 @@ class AdvancedChatAppGenerateTaskPipeline:
conversation_id=self._conversation_id, conversation_id=self._conversation_id,
trace_manager=trace_manager, trace_manager=trace_manager,
) )
workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response( workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response(
session=session, session=session,
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution=workflow_execution, workflow_execution=workflow_execution,
@ -657,7 +664,7 @@ class AdvancedChatAppGenerateTaskPipeline:
yield self._message_end_to_stream_response() yield self._message_end_to_stream_response()
elif isinstance(event, QueueAgentLogEvent): elif isinstance(event, QueueAgentLogEvent):
yield self._workflow_cycle_manager.handle_agent_log( yield self._workflow_response_converter.handle_agent_log(
task_id=self._application_generate_entity.task_id, event=event task_id=self._application_generate_entity.task_id, event=event
) )
else: else:

@ -52,6 +52,7 @@ from core.app.entities.task_entities import (
WorkflowTaskState, WorkflowTaskState,
) )
from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTaskPipeline from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTaskPipeline
from core.app.workflow_response_converter import WorkflowResponseConverter
from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk
from core.ops.ops_trace_manager import TraceQueueManager from core.ops.ops_trace_manager import TraceQueueManager
from core.workflow.entities.workflow_execution_entities import WorkflowExecution from core.workflow.entities.workflow_execution_entities import WorkflowExecution
@ -119,6 +120,10 @@ class WorkflowAppGenerateTaskPipeline:
workflow_node_execution_repository=workflow_node_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository,
) )
self._workflow_response_converter = WorkflowResponseConverter(
application_generate_entity=application_generate_entity,
)
self._application_generate_entity = application_generate_entity self._application_generate_entity = application_generate_entity
self._workflow_id = workflow.id self._workflow_id = workflow.id
self._workflow_features_dict = workflow.features_dict self._workflow_features_dict = workflow.features_dict
@ -268,7 +273,7 @@ class WorkflowAppGenerateTaskPipeline:
workflow_id=self._workflow_id, workflow_id=self._workflow_id,
) )
self._workflow_run_id = workflow_execution.id self._workflow_run_id = workflow_execution.id
start_resp = self._workflow_cycle_manager.workflow_start_to_stream_response( start_resp = self._workflow_response_converter.workflow_start_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution=workflow_execution, workflow_execution=workflow_execution,
) )
@ -285,7 +290,7 @@ class WorkflowAppGenerateTaskPipeline:
workflow_execution_id=self._workflow_run_id, workflow_execution_id=self._workflow_run_id,
event=event, event=event,
) )
response = self._workflow_cycle_manager.workflow_node_retry_to_stream_response( response = self._workflow_response_converter.workflow_node_retry_to_stream_response(
event=event, event=event,
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution, workflow_node_execution=workflow_node_execution,
@ -301,7 +306,7 @@ class WorkflowAppGenerateTaskPipeline:
workflow_node_execution = self._workflow_cycle_manager.handle_node_execution_start( workflow_node_execution = self._workflow_cycle_manager.handle_node_execution_start(
workflow_execution_id=self._workflow_run_id, event=event workflow_execution_id=self._workflow_run_id, event=event
) )
node_start_response = self._workflow_cycle_manager.workflow_node_start_to_stream_response( node_start_response = self._workflow_response_converter.workflow_node_start_to_stream_response(
event=event, event=event,
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution, workflow_node_execution=workflow_node_execution,
@ -313,7 +318,7 @@ class WorkflowAppGenerateTaskPipeline:
workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_success( workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_success(
event=event event=event
) )
node_success_response = self._workflow_cycle_manager.workflow_node_finish_to_stream_response( node_success_response = self._workflow_response_converter.workflow_node_finish_to_stream_response(
event=event, event=event,
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution, workflow_node_execution=workflow_node_execution,
@ -331,7 +336,7 @@ class WorkflowAppGenerateTaskPipeline:
workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_failed( workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_failed(
event=event, event=event,
) )
node_failed_response = self._workflow_cycle_manager.workflow_node_finish_to_stream_response( node_failed_response = self._workflow_response_converter.workflow_node_finish_to_stream_response(
event=event, event=event,
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution, workflow_node_execution=workflow_node_execution,
@ -344,10 +349,12 @@ class WorkflowAppGenerateTaskPipeline:
if not self._workflow_run_id: if not self._workflow_run_id:
raise ValueError("workflow run not initialized.") raise ValueError("workflow run not initialized.")
parallel_start_resp = self._workflow_cycle_manager.workflow_parallel_branch_start_to_stream_response( parallel_start_resp = (
task_id=self._application_generate_entity.task_id, self._workflow_response_converter.workflow_parallel_branch_start_to_stream_response(
workflow_execution_id=self._workflow_run_id, task_id=self._application_generate_entity.task_id,
event=event, workflow_execution_id=self._workflow_run_id,
event=event,
)
) )
yield parallel_start_resp yield parallel_start_resp
@ -357,7 +364,7 @@ class WorkflowAppGenerateTaskPipeline:
raise ValueError("workflow run not initialized.") raise ValueError("workflow run not initialized.")
parallel_finish_resp = ( parallel_finish_resp = (
self._workflow_cycle_manager.workflow_parallel_branch_finished_to_stream_response( self._workflow_response_converter.workflow_parallel_branch_finished_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution_id=self._workflow_run_id, workflow_execution_id=self._workflow_run_id,
event=event, event=event,
@ -370,7 +377,7 @@ class WorkflowAppGenerateTaskPipeline:
if not self._workflow_run_id: if not self._workflow_run_id:
raise ValueError("workflow run not initialized.") raise ValueError("workflow run not initialized.")
iter_start_resp = self._workflow_cycle_manager.workflow_iteration_start_to_stream_response( iter_start_resp = self._workflow_response_converter.workflow_iteration_start_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution_id=self._workflow_run_id, workflow_execution_id=self._workflow_run_id,
event=event, event=event,
@ -382,7 +389,7 @@ class WorkflowAppGenerateTaskPipeline:
if not self._workflow_run_id: if not self._workflow_run_id:
raise ValueError("workflow run not initialized.") raise ValueError("workflow run not initialized.")
iter_next_resp = self._workflow_cycle_manager.workflow_iteration_next_to_stream_response( iter_next_resp = self._workflow_response_converter.workflow_iteration_next_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution_id=self._workflow_run_id, workflow_execution_id=self._workflow_run_id,
event=event, event=event,
@ -394,7 +401,7 @@ class WorkflowAppGenerateTaskPipeline:
if not self._workflow_run_id: if not self._workflow_run_id:
raise ValueError("workflow run not initialized.") raise ValueError("workflow run not initialized.")
iter_finish_resp = self._workflow_cycle_manager.workflow_iteration_completed_to_stream_response( iter_finish_resp = self._workflow_response_converter.workflow_iteration_completed_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution_id=self._workflow_run_id, workflow_execution_id=self._workflow_run_id,
event=event, event=event,
@ -406,7 +413,7 @@ class WorkflowAppGenerateTaskPipeline:
if not self._workflow_run_id: if not self._workflow_run_id:
raise ValueError("workflow run not initialized.") raise ValueError("workflow run not initialized.")
loop_start_resp = self._workflow_cycle_manager.workflow_loop_start_to_stream_response( loop_start_resp = self._workflow_response_converter.workflow_loop_start_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution_id=self._workflow_run_id, workflow_execution_id=self._workflow_run_id,
event=event, event=event,
@ -418,7 +425,7 @@ class WorkflowAppGenerateTaskPipeline:
if not self._workflow_run_id: if not self._workflow_run_id:
raise ValueError("workflow run not initialized.") raise ValueError("workflow run not initialized.")
loop_next_resp = self._workflow_cycle_manager.workflow_loop_next_to_stream_response( loop_next_resp = self._workflow_response_converter.workflow_loop_next_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution_id=self._workflow_run_id, workflow_execution_id=self._workflow_run_id,
event=event, event=event,
@ -430,7 +437,7 @@ class WorkflowAppGenerateTaskPipeline:
if not self._workflow_run_id: if not self._workflow_run_id:
raise ValueError("workflow run not initialized.") raise ValueError("workflow run not initialized.")
loop_finish_resp = self._workflow_cycle_manager.workflow_loop_completed_to_stream_response( loop_finish_resp = self._workflow_response_converter.workflow_loop_completed_to_stream_response(
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution_id=self._workflow_run_id, workflow_execution_id=self._workflow_run_id,
event=event, event=event,
@ -457,7 +464,7 @@ class WorkflowAppGenerateTaskPipeline:
# save workflow app log # save workflow app log
self._save_workflow_app_log(session=session, workflow_execution=workflow_execution) self._save_workflow_app_log(session=session, workflow_execution=workflow_execution)
workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response( workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response(
session=session, session=session,
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution=workflow_execution, workflow_execution=workflow_execution,
@ -485,7 +492,7 @@ class WorkflowAppGenerateTaskPipeline:
# save workflow app log # save workflow app log
self._save_workflow_app_log(session=session, workflow_execution=workflow_execution) self._save_workflow_app_log(session=session, workflow_execution=workflow_execution)
workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response( workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response(
session=session, session=session,
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution=workflow_execution, workflow_execution=workflow_execution,
@ -518,7 +525,7 @@ class WorkflowAppGenerateTaskPipeline:
# save workflow app log # save workflow app log
self._save_workflow_app_log(session=session, workflow_execution=workflow_execution) self._save_workflow_app_log(session=session, workflow_execution=workflow_execution)
workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response( workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response(
session=session, session=session,
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution=workflow_execution, workflow_execution=workflow_execution,
@ -540,7 +547,7 @@ class WorkflowAppGenerateTaskPipeline:
delta_text, from_variable_selector=event.from_variable_selector delta_text, from_variable_selector=event.from_variable_selector
) )
elif isinstance(event, QueueAgentLogEvent): elif isinstance(event, QueueAgentLogEvent):
yield self._workflow_cycle_manager.handle_agent_log( yield self._workflow_response_converter.handle_agent_log(
task_id=self._application_generate_entity.task_id, event=event task_id=self._application_generate_entity.task_id, event=event
) )
else: else:

Loading…
Cancel
Save