From e8ec9112b4235b7d63964ed2eb42a8faf036a2f2 Mon Sep 17 00:00:00 2001 From: QuantumGhost Date: Mon, 2 Jun 2025 22:20:16 +0800 Subject: [PATCH] fix(api): Fix the issue that output is not saved ... when using `Default Value` error handling strategy refactor draft variable saving logic into a separate method. --- api/core/app/apps/workflow_app_runner.py | 33 ++++++++++++++++-------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py index 5aaf0d66fa..ee41af72db 100644 --- a/api/core/app/apps/workflow_app_runner.py +++ b/api/core/app/apps/workflow_app_runner.py @@ -35,6 +35,7 @@ from core.workflow.entities.node_entities import NodeRunMetadataKey from core.workflow.entities.variable_pool import VariablePool from core.workflow.graph_engine.entities.event import ( AgentLogEvent, + BaseNodeEvent, GraphEngineEvent, GraphRunFailedEvent, GraphRunPartialSucceededEvent, @@ -399,17 +400,7 @@ class WorkflowBasedAppRunner(AppRunner): in_loop_id=event.in_loop_id, ) ) - with Session(bind=db.engine) as session, session.begin(): - draft_var_saver = DraftVariableSaver( - session=session, - app_id=self._get_app_id(), - node_id=event.node_id, - node_type=event.node_type, - # FIXME(QuantumGhost): rely on private state of queue_manager is not ideal. - invoke_from=self.queue_manager._invoke_from, - enclosing_node_id=event.in_loop_id or event.in_iteration_id or None, - ) - draft_var_saver.save(outputs) + self._save_draft_var_for_event(event) elif isinstance(event, NodeRunFailedEvent): self._publish_event( @@ -473,6 +464,8 @@ class WorkflowBasedAppRunner(AppRunner): in_loop_id=event.in_loop_id, ) ) + self._save_draft_var_for_event(event) + elif isinstance(event, NodeInIterationFailedEvent): self._publish_event( QueueNodeInIterationFailedEvent( @@ -726,6 +719,24 @@ class WorkflowBasedAppRunner(AppRunner): def _publish_event(self, event: AppQueueEvent) -> None: self.queue_manager.publish(event, PublishFrom.APPLICATION_MANAGER) + def _save_draft_var_for_event(self, event: BaseNodeEvent): + run_result = event.route_node_state.node_run_result + if run_result is None: + return + process_data = run_result.process_data + outputs = run_result.outputs + with Session(bind=db.engine) as session, session.begin(): + draft_var_saver = DraftVariableSaver( + session=session, + app_id=self._get_app_id(), + node_id=event.node_id, + node_type=event.node_type, + # FIXME(QuantumGhost): rely on private state of queue_manager is not ideal. + invoke_from=self.queue_manager._invoke_from, + enclosing_node_id=event.in_loop_id or event.in_iteration_id or None, + ) + draft_var_saver.save(process_data=process_data, output=outputs) + def _remove_first_element_from_variable_string(key: str) -> str: """