diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py index 2817313e5d..c1372e6f4d 100644 --- a/api/core/app/apps/workflow_app_runner.py +++ b/api/core/app/apps/workflow_app_runner.py @@ -735,7 +735,7 @@ class WorkflowBasedAppRunner(AppRunner): invoke_from=self.queue_manager._invoke_from, enclosing_node_id=event.in_loop_id or event.in_iteration_id or None, ) - draft_var_saver.save(process_data=process_data, output=outputs) + draft_var_saver.save(process_data=process_data, outputs=outputs) def _remove_first_element_from_variable_string(key: str) -> str: diff --git a/api/services/workflow_draft_variable_service.py b/api/services/workflow_draft_variable_service.py index 870523877e..5412d4b923 100644 --- a/api/services/workflow_draft_variable_service.py +++ b/api/services/workflow_draft_variable_service.py @@ -550,10 +550,14 @@ class DraftVariableSaver: ) return draft_vars - def save(self, output: Mapping[str, Any] | None, process_data: Mapping[str, Any] | None = None): + def save( + self, + process_data: Mapping[str, Any] | None = None, + outputs: Mapping[str, Any] | None = None, + ): draft_vars: list[WorkflowDraftVariable] = [] - if output is None: - output = {} + if outputs is None: + outputs = {} if process_data is None: process_data = {} if not self._should_save_output_variables_for_draft(): @@ -561,13 +565,13 @@ class DraftVariableSaver: if self._node_type == NodeType.VARIABLE_ASSIGNER: draft_vars = self._build_from_variable_assigner_mapping(process_data=process_data) elif self._node_type == NodeType.START: - draft_vars = self._build_variables_from_start_mapping(output) + draft_vars = self._build_variables_from_start_mapping(outputs) elif self._node_type == NodeType.LOOP: # Do not save output variables for loop node. # (since the loop variables are inaccessible outside the loop node.) return else: - draft_vars = self._build_variables_from_mapping(output) + draft_vars = self._build_variables_from_mapping(outputs) _batch_upsert_draft_varaible(self._session, draft_vars) @staticmethod diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index 2300246336..0d3aa85c30 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -395,15 +395,12 @@ class WorkflowService: # Convert node_execution to WorkflowNodeExecution after save workflow_node_execution = repository.to_db_model(node_execution) - process_data = workflow_node_execution.process_data_dict or {} - output = workflow_node_execution.outputs_dict or {} exec_metadata = workflow_node_execution.execution_metadata_dict or {} loop_id = exec_metadata.get(WorkflowNodeExecutionMetadataKey.LOOP_ID, None) iteration_id = exec_metadata.get(WorkflowNodeExecutionMetadataKey.ITERATION_ID, None) - # TODO(QuantumGhost): single step does not include loop_id or iteration_id in execution_metadata. with Session(bind=db.engine) as session, session.begin(): draft_var_saver = DraftVariableSaver( session=session, @@ -413,7 +410,7 @@ class WorkflowService: invoke_from=InvokeFrom.DEBUGGER, enclosing_node_id=loop_id or iteration_id or None, ) - draft_var_saver.save(process_data=process_data, output=output) + draft_var_saver.save(process_data=node_execution.process_data, outputs=node_execution.outputs) session.commit() return workflow_node_execution