feat(api): Update variable handling for `VariableAssigner` nodes

- Put updated variables data in `process_data`
- Use structured object to describe updated variables.
pull/20699/head
QuantumGhost 12 months ago
parent a3202356e6
commit c604659287

@ -1,24 +1,36 @@
from collections.abc import Sequence from collections.abc import Mapping, MutableMapping, Sequence
from typing import Any, TypedDict from typing import Any, TypeVar
from core.variables import Segment, SegmentType from pydantic import BaseModel
from core.variables import Segment
from core.variables.consts import MIN_SELECTORS_LENGTH from core.variables.consts import MIN_SELECTORS_LENGTH
# Use double underscore (`__`) prefix for internal variables
# to minimize risk of collision with user-defined variable names.
_UPDATED_VARIABLES_KEY = "__updated_variables"
class VariableOutput(TypedDict): class UpdatedVariable(BaseModel):
name: str name: str
selector: Sequence[str] selector: Sequence[str]
new_value: Any new_value: Segment
type: SegmentType
_T = TypeVar("_T", bound=MutableMapping[str, Any])
def variable_to_output_mapping(selector: Sequence[str], seg: Segment) -> VariableOutput: def variable_to_processed_data(selector: Sequence[str], seg: Segment) -> UpdatedVariable:
if len(selector) < MIN_SELECTORS_LENGTH: if len(selector) < MIN_SELECTORS_LENGTH:
raise Exception("selector too short") raise Exception("selector too short")
node_id, var_name = selector[:2] node_id, var_name = selector[:2]
return { return UpdatedVariable(name=var_name, selector=list(selector[:2]), new_value=seg)
"name": var_name,
"selector": selector[:2],
"new_value": seg.value, def set_updated_variables(m: _T, updates: Sequence[UpdatedVariable]) -> _T:
"type": seg.value_type, # m[_UPDATED_VARIABLES_KEY] = updates
} return m
def get_updated_variables(m: Mapping[str, Any]) -> Sequence[UpdatedVariable] | None:
return m.get(_UPDATED_VARIABLES_KEY, None)

@ -85,20 +85,18 @@ class VariableAssignerNode(BaseNode[VariableAssignerData]):
conv_var_updater = self._conv_var_updater_factory() conv_var_updater = self._conv_var_updater_factory()
conv_var_updater.update(conversation_id=conversation_id.text, variable=updated_variable) conv_var_updater.update(conversation_id=conversation_id.text, variable=updated_variable)
conv_var_updater.flush() conv_var_updater.flush()
updated_variables = [common_helpers.variable_to_processed_data(assigned_variable_selector, updated_variable)]
return NodeRunResult( return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED, status=WorkflowNodeExecutionStatus.SUCCEEDED,
inputs={ inputs={
"value": income_value.to_object(), "value": income_value.to_object(),
}, },
outputs={ # NOTE(QuantumGhost): although only one variable is updated in `v1.VariableAssignerNode`,
# NOTE(QuantumGhost): although only one variable is updated in `v1.VariableAssignerNode`, # we still set `output_variables` as a list to ensure the schema of output is
# we still set `output_variables` as a list to ensure the schema of output is # compatible with `v2.VariableAssignerNode`.
# compatible with `v2.VariableAssignerNode`. process_data=common_helpers.set_updated_variables({}, updated_variables),
"updated_variables": [ outputs={},
common_helpers.variable_to_output_mapping(assigned_variable_selector, updated_variable)
]
},
) )

@ -162,18 +162,18 @@ class VariableAssignerNode(BaseNode[VariableAssignerNodeData]):
variable=variable, variable=variable,
) )
conv_var_updater.flush() conv_var_updater.flush()
updated_variables = [
common_helpers.variable_to_processed_data(selector, seg)
for selector in updated_variable_selectors
if (seg := self.graph_runtime_state.variable_pool.get(selector)) is not None
]
common_helpers.set_updated_variables(process_data, updated_variables)
return NodeRunResult( return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED, status=WorkflowNodeExecutionStatus.SUCCEEDED,
inputs=inputs, inputs=inputs,
process_data=process_data, process_data=process_data,
outputs={ outputs={},
"updated_variables": [
common_helpers.variable_to_output_mapping(selector, seg)
for selector in updated_variable_selectors
if (seg := self.graph_runtime_state.variable_pool.get(selector)) is not None
],
},
) )
def _handle_item( def _handle_item(

@ -16,6 +16,7 @@ from core.variables.consts import MIN_SELECTORS_LENGTH
from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID, ENVIRONMENT_VARIABLE_NODE_ID, SYSTEM_VARIABLE_NODE_ID from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID, ENVIRONMENT_VARIABLE_NODE_ID, SYSTEM_VARIABLE_NODE_ID
from core.workflow.enums import SystemVariableKey from core.workflow.enums import SystemVariableKey
from core.workflow.nodes import NodeType from core.workflow.nodes import NodeType
from core.workflow.nodes.variable_assigner.common.helpers import get_updated_variables
from core.workflow.variable_loader import VariableLoader from core.workflow.variable_loader import VariableLoader
from factories import variable_factory from factories import variable_factory
from factories.variable_factory import build_segment, segment_to_variable from factories.variable_factory import build_segment, segment_to_variable
@ -442,13 +443,11 @@ class DraftVariableSaver:
return False return False
return True return True
def _build_from_variable_assigner_mapping(self, output: Mapping[str, Any]) -> list[WorkflowDraftVariable]: def _build_from_variable_assigner_mapping(self, process_data: Mapping[str, Any]) -> list[WorkflowDraftVariable]:
draft_vars: list[WorkflowDraftVariable] = [] draft_vars: list[WorkflowDraftVariable] = []
updated_variables = output.get("updated_variables", []) updated_variables = get_updated_variables(process_data) or []
for item in updated_variables: for item in updated_variables:
selector = item.get("selector") selector = item.selector
if selector is None:
continue
if len(selector) < MIN_SELECTORS_LENGTH: if len(selector) < MIN_SELECTORS_LENGTH:
raise Exception("selector too short") raise Exception("selector too short")
# NOTE(QuantumGhost): only the following two kinds of variable could be updated by # NOTE(QuantumGhost): only the following two kinds of variable could be updated by
@ -456,21 +455,11 @@ class DraftVariableSaver:
# We only save conversation variable here. # We only save conversation variable here.
if selector[0] != CONVERSATION_VARIABLE_NODE_ID: if selector[0] != CONVERSATION_VARIABLE_NODE_ID:
continue continue
name = item.get("name")
if name is None:
continue
new_value = item["new_value"]
value_type = item.get("type")
if value_type is None:
continue
var_seg = variable_factory.build_segment(new_value)
if var_seg.value_type != value_type:
raise Exception("value_type mismatch!")
draft_vars.append( draft_vars.append(
WorkflowDraftVariable.new_conversation_variable( WorkflowDraftVariable.new_conversation_variable(
app_id=self._app_id, app_id=self._app_id,
name=name, name=item.name,
value=var_seg, value=item.new_value,
) )
) )
return draft_vars return draft_vars
@ -538,14 +527,16 @@ class DraftVariableSaver:
) )
return draft_vars return draft_vars
def save(self, output: Mapping[str, Any] | None): def save(self, output: Mapping[str, Any] | None, process_data: Mapping[str, Any] | None = None):
draft_vars: list[WorkflowDraftVariable] = [] draft_vars: list[WorkflowDraftVariable] = []
if output is None: if output is None:
output = {} output = {}
if process_data is None:
process_data = {}
if not self._should_save_output_variables_for_draft(): if not self._should_save_output_variables_for_draft():
return return
if self._node_type == NodeType.VARIABLE_ASSIGNER: if self._node_type == NodeType.VARIABLE_ASSIGNER:
draft_vars = self._build_from_variable_assigner_mapping(output) draft_vars = self._build_from_variable_assigner_mapping(process_data=process_data)
elif self._node_type == NodeType.START: elif self._node_type == NodeType.START:
draft_vars = self._build_variables_from_start_mapping(output) draft_vars = self._build_variables_from_start_mapping(output)
elif self._node_type == NodeType.LOOP: elif self._node_type == NodeType.LOOP:

@ -407,6 +407,7 @@ class WorkflowService:
# Convert node_execution to WorkflowNodeExecution after save # Convert node_execution to WorkflowNodeExecution after save
workflow_node_execution = repository.to_db_model(node_execution) workflow_node_execution = repository.to_db_model(node_execution)
process_data = workflow_node_execution.process_data_dict or {}
output = workflow_node_execution.outputs_dict or {} output = workflow_node_execution.outputs_dict or {}
exec_metadata = workflow_node_execution.execution_metadata_dict or {} exec_metadata = workflow_node_execution.execution_metadata_dict or {}
@ -424,7 +425,7 @@ class WorkflowService:
invoke_from=InvokeFrom.DEBUGGER, invoke_from=InvokeFrom.DEBUGGER,
enclosing_node_id=loop_id or iteration_id or None, enclosing_node_id=loop_id or iteration_id or None,
) )
draft_var_saver.save(output) draft_var_saver.save(process_data=process_data, output=output)
session.commit() session.commit()
return workflow_node_execution return workflow_node_execution

Loading…
Cancel
Save