diff --git a/api/core/workflow/nodes/variable_aggregator/variable_aggregator_node.py b/api/core/workflow/nodes/variable_aggregator/variable_aggregator_node.py index 96bb3e793a..d997bfe649 100644 --- a/api/core/workflow/nodes/variable_aggregator/variable_aggregator_node.py +++ b/api/core/workflow/nodes/variable_aggregator/variable_aggregator_node.py @@ -1,6 +1,6 @@ from collections.abc import Mapping +from typing import Any -from core.variables.segments import Segment from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.nodes.base import BaseNode @@ -8,8 +8,12 @@ from core.workflow.nodes.enums import NodeType from core.workflow.nodes.variable_aggregator.entities import VariableAssignerNodeData -class VariableAggregatorNode(BaseNode[VariableAssignerNodeData]): - _node_data_cls = VariableAssignerNodeData +class VariableAggregatorNodeData(VariableAssignerNodeData): + aggregate_all: bool = False + + +class VariableAggregatorNode(BaseNode[VariableAggregatorNodeData]): + _node_data_cls = VariableAggregatorNodeData _node_type = NodeType.VARIABLE_AGGREGATOR @classmethod @@ -18,25 +22,50 @@ class VariableAggregatorNode(BaseNode[VariableAssignerNodeData]): def _run(self) -> NodeRunResult: # Get variables - outputs: dict[str, Segment | Mapping[str, Segment]] = {} - inputs = {} + outputs: dict[str, Any] = {} + inputs: dict[str, Any] = {} + + # if aggregate_all is not configured, aggregate only first variables + if not getattr(self.node_data, "aggregate_all", False): + if not self.node_data.advanced_settings or not self.node_data.advanced_settings.group_enabled: + for selector in self.node_data.variables: + variable = self.graph_runtime_state.variable_pool.get(selector) + if variable is not None: + outputs = {"output": variable} - if not self.node_data.advanced_settings or not self.node_data.advanced_settings.group_enabled: - for selector in self.node_data.variables: - variable = self.graph_runtime_state.variable_pool.get(selector) - if variable is not None: - outputs = {"output": variable} + inputs = {".".join(selector[1:]): variable.to_object()} + break + else: + for group in self.node_data.advanced_settings.groups: + for selector in group.variables: + variable = self.graph_runtime_state.variable_pool.get(selector) - inputs = {".".join(selector[1:]): variable.to_object()} - break + if variable is not None: + outputs[group.group_name] = {"output": variable} + inputs[".".join(selector[1:])] = variable.to_object() + break else: - for group in self.node_data.advanced_settings.groups: - for selector in group.variables: + # if aggregate_all is configured, aggregate all variables + if not self.node_data.advanced_settings or not self.node_data.advanced_settings.group_enabled: + aggregated_values = [] + for selector in self.node_data.variables: variable = self.graph_runtime_state.variable_pool.get(selector) - if variable is not None: - outputs[group.group_name] = {"output": variable} + aggregated_values.append(variable.to_object()) inputs[".".join(selector[1:])] = variable.to_object() - break + + if aggregated_values: + outputs = {"output": aggregated_values} + else: + for group in self.node_data.advanced_settings.groups: + aggregated_values = [] + for selector in group.variables: + variable = self.graph_runtime_state.variable_pool.get(selector) + if variable is not None: + aggregated_values.append(variable.to_object()) + inputs[".".join(selector[1:])] = variable.to_object() + + if aggregated_values: + outputs[group.group_name] = {"output": aggregated_values} return NodeRunResult(status=WorkflowNodeExecutionStatus.SUCCEEDED, outputs=outputs, inputs=inputs) diff --git a/web/app/components/workflow/nodes/variable-assigner/panel.tsx b/web/app/components/workflow/nodes/variable-assigner/panel.tsx index 67ec1020c3..1767b4e4ab 100644 --- a/web/app/components/workflow/nodes/variable-assigner/panel.tsx +++ b/web/app/components/workflow/nodes/variable-assigner/panel.tsx @@ -35,6 +35,7 @@ const Panel: FC> = ({ onRemoveVarConfirm, getAvailableVars, filterVar, + handleAggregateAllChange, } = useConfig(id, data) return ( @@ -95,26 +96,41 @@ const Panel: FC> = ({ /> } /> + + } + /> - {isEnableGroup && ( + + <> - - - <> - {inputs.advanced_settings?.groups.map((item, index) => ( - - ))} - - + {isEnableGroup + ? inputs.advanced_settings?.groups.map((item, index) => ( + + )) + : ( + + )} - )} + { }) }, [inputs, setInputs]) + const handleAggregateAllChange = useCallback((checked: boolean) => { + const newInputs = produce(inputs, (draft) => { + draft.aggregate_all = checked + }) + setInputs(newInputs) + }, [inputs, setInputs]) + const handleListOrTypeChangeInGroup = useCallback((groupId: string) => { return (payload: VarGroupItem) => { const index = inputs.advanced_settings.groups.findIndex(item => item.groupId === groupId) @@ -208,6 +215,7 @@ const useConfig = (id: string, payload: VariableAssignerNodeType) => { onRemoveVarConfirm, getAvailableVars, filterVar, + handleAggregateAllChange, } } diff --git a/web/i18n/en-US/workflow.ts b/web/i18n/en-US/workflow.ts index c56b497ac2..59d1771782 100644 --- a/web/i18n/en-US/workflow.ts +++ b/web/i18n/en-US/workflow.ts @@ -626,6 +626,8 @@ const translation = { }, aggregationGroup: 'Aggregation Group', aggregationGroupTip: 'Enabling this feature allows the variable aggregator to aggregate multiple sets of variables.', + aggregateAll: 'Aggregate All', + aggregateAllTip: 'Enabling this feature allows the variable aggregator to aggregate all input variables into an array, otherwise it will aggregate first arrival variables into the array.', addGroup: 'Add Group', outputVars: { varDescribe: '{{groupName}} output', diff --git a/web/i18n/zh-Hans/workflow.ts b/web/i18n/zh-Hans/workflow.ts index 6bd202d58f..81decb3895 100644 --- a/web/i18n/zh-Hans/workflow.ts +++ b/web/i18n/zh-Hans/workflow.ts @@ -627,6 +627,8 @@ const translation = { }, aggregationGroup: '聚合分组', aggregationGroupTip: '开启该功能后,变量聚合器内可以同时聚合多组变量', + aggregateAll: '聚合所有', + aggregateAllTip: '开启该功能后,变量聚合器将聚合所有输入变量到一个数组中,否则只会聚合第一个变量', addGroup: '添加分组', outputVars: { varDescribe: '{{groupName}}的输出变量',