diff --git a/api/core/workflow/nodes/variable_assigner/common/helpers.py b/api/core/workflow/nodes/variable_assigner/common/helpers.py index f1b66f63ff..c850449b2d 100644 --- a/api/core/workflow/nodes/variable_assigner/common/helpers.py +++ b/api/core/workflow/nodes/variable_assigner/common/helpers.py @@ -1,24 +1,36 @@ -from collections.abc import Sequence -from typing import Any, TypedDict +from collections.abc import Mapping, MutableMapping, Sequence +from typing import Any, TypeVar -from core.variables import Segment, SegmentType +from pydantic import BaseModel + +from core.variables import Segment from core.variables.consts import MIN_SELECTORS_LENGTH +# Use double underscore (`__`) prefix for internal variables +# to minimize risk of collision with user-defined variable names. +_UPDATED_VARIABLES_KEY = "__updated_variables" + -class VariableOutput(TypedDict): +class UpdatedVariable(BaseModel): name: str selector: Sequence[str] - new_value: Any - type: SegmentType + new_value: Segment + + +_T = TypeVar("_T", bound=MutableMapping[str, Any]) -def variable_to_output_mapping(selector: Sequence[str], seg: Segment) -> VariableOutput: +def variable_to_processed_data(selector: Sequence[str], seg: Segment) -> UpdatedVariable: if len(selector) < MIN_SELECTORS_LENGTH: raise Exception("selector too short") node_id, var_name = selector[:2] - return { - "name": var_name, - "selector": selector[:2], - "new_value": seg.value, - "type": seg.value_type, - } + return UpdatedVariable(name=var_name, selector=list(selector[:2]), new_value=seg) + + +def set_updated_variables(m: _T, updates: Sequence[UpdatedVariable]) -> _T: + # m[_UPDATED_VARIABLES_KEY] = updates + return m + + +def get_updated_variables(m: Mapping[str, Any]) -> Sequence[UpdatedVariable] | None: + return m.get(_UPDATED_VARIABLES_KEY, None) diff --git a/api/core/workflow/nodes/variable_assigner/v1/node.py b/api/core/workflow/nodes/variable_assigner/v1/node.py index 005d506a13..3cb920a247 100644 --- a/api/core/workflow/nodes/variable_assigner/v1/node.py +++ b/api/core/workflow/nodes/variable_assigner/v1/node.py @@ -85,20 +85,18 @@ class VariableAssignerNode(BaseNode[VariableAssignerData]): conv_var_updater = self._conv_var_updater_factory() conv_var_updater.update(conversation_id=conversation_id.text, variable=updated_variable) conv_var_updater.flush() + updated_variables = [common_helpers.variable_to_processed_data(assigned_variable_selector, updated_variable)] return NodeRunResult( status=WorkflowNodeExecutionStatus.SUCCEEDED, inputs={ "value": income_value.to_object(), }, - outputs={ - # NOTE(QuantumGhost): although only one variable is updated in `v1.VariableAssignerNode`, - # we still set `output_variables` as a list to ensure the schema of output is - # compatible with `v2.VariableAssignerNode`. - "updated_variables": [ - common_helpers.variable_to_output_mapping(assigned_variable_selector, updated_variable) - ] - }, + # NOTE(QuantumGhost): although only one variable is updated in `v1.VariableAssignerNode`, + # we still set `output_variables` as a list to ensure the schema of output is + # compatible with `v2.VariableAssignerNode`. + process_data=common_helpers.set_updated_variables({}, updated_variables), + outputs={}, ) diff --git a/api/core/workflow/nodes/variable_assigner/v2/node.py b/api/core/workflow/nodes/variable_assigner/v2/node.py index 096889a23c..6914952590 100644 --- a/api/core/workflow/nodes/variable_assigner/v2/node.py +++ b/api/core/workflow/nodes/variable_assigner/v2/node.py @@ -162,18 +162,18 @@ class VariableAssignerNode(BaseNode[VariableAssignerNodeData]): variable=variable, ) conv_var_updater.flush() + updated_variables = [ + common_helpers.variable_to_processed_data(selector, seg) + for selector in updated_variable_selectors + if (seg := self.graph_runtime_state.variable_pool.get(selector)) is not None + ] + common_helpers.set_updated_variables(process_data, updated_variables) return NodeRunResult( status=WorkflowNodeExecutionStatus.SUCCEEDED, inputs=inputs, process_data=process_data, - outputs={ - "updated_variables": [ - common_helpers.variable_to_output_mapping(selector, seg) - for selector in updated_variable_selectors - if (seg := self.graph_runtime_state.variable_pool.get(selector)) is not None - ], - }, + outputs={}, ) def _handle_item( diff --git a/api/services/workflow_draft_variable_service.py b/api/services/workflow_draft_variable_service.py index 3c3668fb36..8235163b76 100644 --- a/api/services/workflow_draft_variable_service.py +++ b/api/services/workflow_draft_variable_service.py @@ -16,6 +16,7 @@ from core.variables.consts import MIN_SELECTORS_LENGTH from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID, ENVIRONMENT_VARIABLE_NODE_ID, SYSTEM_VARIABLE_NODE_ID from core.workflow.enums import SystemVariableKey from core.workflow.nodes import NodeType +from core.workflow.nodes.variable_assigner.common.helpers import get_updated_variables from core.workflow.variable_loader import VariableLoader from factories import variable_factory from factories.variable_factory import build_segment, segment_to_variable @@ -442,13 +443,11 @@ class DraftVariableSaver: return False return True - def _build_from_variable_assigner_mapping(self, output: Mapping[str, Any]) -> list[WorkflowDraftVariable]: + def _build_from_variable_assigner_mapping(self, process_data: Mapping[str, Any]) -> list[WorkflowDraftVariable]: draft_vars: list[WorkflowDraftVariable] = [] - updated_variables = output.get("updated_variables", []) + updated_variables = get_updated_variables(process_data) or [] for item in updated_variables: - selector = item.get("selector") - if selector is None: - continue + selector = item.selector if len(selector) < MIN_SELECTORS_LENGTH: raise Exception("selector too short") # NOTE(QuantumGhost): only the following two kinds of variable could be updated by @@ -456,21 +455,11 @@ class DraftVariableSaver: # We only save conversation variable here. if selector[0] != CONVERSATION_VARIABLE_NODE_ID: continue - name = item.get("name") - if name is None: - continue - new_value = item["new_value"] - value_type = item.get("type") - if value_type is None: - continue - var_seg = variable_factory.build_segment(new_value) - if var_seg.value_type != value_type: - raise Exception("value_type mismatch!") draft_vars.append( WorkflowDraftVariable.new_conversation_variable( app_id=self._app_id, - name=name, - value=var_seg, + name=item.name, + value=item.new_value, ) ) return draft_vars @@ -538,14 +527,16 @@ class DraftVariableSaver: ) return draft_vars - def save(self, output: Mapping[str, Any] | None): + def save(self, output: Mapping[str, Any] | None, process_data: Mapping[str, Any] | None = None): draft_vars: list[WorkflowDraftVariable] = [] if output is None: output = {} + if process_data is None: + process_data = {} if not self._should_save_output_variables_for_draft(): return if self._node_type == NodeType.VARIABLE_ASSIGNER: - draft_vars = self._build_from_variable_assigner_mapping(output) + draft_vars = self._build_from_variable_assigner_mapping(process_data=process_data) elif self._node_type == NodeType.START: draft_vars = self._build_variables_from_start_mapping(output) elif self._node_type == NodeType.LOOP: diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index 87ba69c4dd..59ff016235 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -407,6 +407,7 @@ class WorkflowService: # Convert node_execution to WorkflowNodeExecution after save workflow_node_execution = repository.to_db_model(node_execution) + process_data = workflow_node_execution.process_data_dict or {} output = workflow_node_execution.outputs_dict or {} exec_metadata = workflow_node_execution.execution_metadata_dict or {} @@ -424,7 +425,7 @@ class WorkflowService: invoke_from=InvokeFrom.DEBUGGER, enclosing_node_id=loop_id or iteration_id or None, ) - draft_var_saver.save(output) + draft_var_saver.save(process_data=process_data, output=output) session.commit() return workflow_node_execution