@ -1,6 +1,6 @@
from collections . abc import Mapping
from collections . abc import Mapping
from typing import Any
from core . variables . segments import Segment
from core . workflow . entities . node_entities import NodeRunResult
from core . workflow . entities . node_entities import NodeRunResult
from core . workflow . entities . workflow_node_execution import WorkflowNodeExecutionStatus
from core . workflow . entities . workflow_node_execution import WorkflowNodeExecutionStatus
from core . workflow . nodes . base import BaseNode
from core . workflow . nodes . base import BaseNode
@ -8,8 +8,12 @@ from core.workflow.nodes.enums import NodeType
from core . workflow . nodes . variable_aggregator . entities import VariableAssignerNodeData
from core . workflow . nodes . variable_aggregator . entities import VariableAssignerNodeData
class VariableAggregatorNode ( BaseNode [ VariableAssignerNodeData ] ) :
class VariableAggregatorNodeData ( VariableAssignerNodeData ) :
_node_data_cls = VariableAssignerNodeData
aggregate_all : bool = False
class VariableAggregatorNode ( BaseNode [ VariableAggregatorNodeData ] ) :
_node_data_cls = VariableAggregatorNodeData
_node_type = NodeType . VARIABLE_AGGREGATOR
_node_type = NodeType . VARIABLE_AGGREGATOR
@classmethod
@classmethod
@ -18,25 +22,50 @@ class VariableAggregatorNode(BaseNode[VariableAssignerNodeData]):
def _run ( self ) - > NodeRunResult :
def _run ( self ) - > NodeRunResult :
# Get variables
# Get variables
outputs : dict [ str , Segment | Mapping [ str , Segment ] ] = { }
outputs : dict [ str , Any ] = { }
inputs = { }
inputs : dict [ str , Any ] = { }
# if aggregate_all is not configured, aggregate only first variables
if not getattr ( self . node_data , " aggregate_all " , False ) :
if not self . node_data . advanced_settings or not self . node_data . advanced_settings . group_enabled :
for selector in self . node_data . variables :
variable = self . graph_runtime_state . variable_pool . get ( selector )
if variable is not None :
outputs = { " output " : variable }
if not self . node_data . advanced_settings or not self . node_data . advanced_settings . group_enabled :
inputs = { " . " . join ( selector [ 1 : ] ) : variable . to_object ( ) }
for selector in self . node_data . variables :
break
variable = self . graph_runtime_state . variable_pool . get ( selector )
else :
if variable is not None :
for group in self . node_data . advanced_settings . groups :
outputs = { " output " : variable }
for selector in group . variables :
variable = self . graph_runtime_state . variable_pool . get ( selector )
inputs = { " . " . join ( selector [ 1 : ] ) : variable . to_object ( ) }
if variable is not None :
break
outputs [ group . group_name ] = { " output " : variable }
inputs [ " . " . join ( selector [ 1 : ] ) ] = variable . to_object ( )
break
else :
else :
for group in self . node_data . advanced_settings . groups :
# if aggregate_all is configured, aggregate all variables
for selector in group . variables :
if not self . node_data . advanced_settings or not self . node_data . advanced_settings . group_enabled :
aggregated_values = [ ]
for selector in self . node_data . variables :
variable = self . graph_runtime_state . variable_pool . get ( selector )
variable = self . graph_runtime_state . variable_pool . get ( selector )
if variable is not None :
if variable is not None :
outputs[ group . group_name ] = { " output " : variable }
aggregated_values. append ( variable . to_object ( ) )
inputs [ " . " . join ( selector [ 1 : ] ) ] = variable . to_object ( )
inputs [ " . " . join ( selector [ 1 : ] ) ] = variable . to_object ( )
break
if aggregated_values :
outputs = { " output " : aggregated_values }
else :
for group in self . node_data . advanced_settings . groups :
aggregated_values = [ ]
for selector in group . variables :
variable = self . graph_runtime_state . variable_pool . get ( selector )
if variable is not None :
aggregated_values . append ( variable . to_object ( ) )
inputs [ " . " . join ( selector [ 1 : ] ) ] = variable . to_object ( )
if aggregated_values :
outputs [ group . group_name ] = { " output " : aggregated_values }
return NodeRunResult ( status = WorkflowNodeExecutionStatus . SUCCEEDED , outputs = outputs , inputs = inputs )
return NodeRunResult ( status = WorkflowNodeExecutionStatus . SUCCEEDED , outputs = outputs , inputs = inputs )