diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py
index dc27076a4d..f612f48379 100644
--- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py
+++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py
@@ -566,6 +566,11 @@ class AdvancedChatAppGenerateTaskPipeline:
                     task_id=self._application_generate_entity.task_id,
                     workflow_execution=workflow_execution,
                 )
+                # Store outputs into metadata for external usage (exclude the answer key)
+                workflow_outputs = workflow_finish_resp.data.outputs
+                if workflow_outputs:
+                    filtered_outputs = {k: v for k, v in workflow_outputs.items() if k != "answer"}
+                    self._task_state.metadata.outputs = filtered_outputs
                 yield workflow_finish_resp
 
                 self._base_task_pipeline._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py
index 25c889e922..40ace038d2 100644
--- a/api/core/app/entities/task_entities.py
+++ b/api/core/app/entities/task_entities.py
@@ -24,6 +24,7 @@ class AnnotationReply(BaseModel):
 class TaskStateMetadata(BaseModel):
     annotation_reply: AnnotationReply | None = None
     retriever_resources: Sequence[RetrievalSourceMetadata] = Field(default_factory=list)
+    outputs: Optional[Mapping[str, Any]] = None
     usage: LLMUsage | None = None
 
 
@@ -49,6 +50,7 @@ class WorkflowTaskState(TaskState):
     """
 
     answer: str = ""
+    outputs: Optional[Mapping[str, Any]] = None
 
 
 class StreamEvent(Enum):
diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py
index b315129763..6c2d828ad0 100644
--- a/api/core/workflow/graph_engine/graph_engine.py
+++ b/api/core/workflow/graph_engine/graph_engine.py
@@ -194,6 +194,15 @@ class GraphEngine:
                         self.graph_runtime_state.outputs["answer"] = self.graph_runtime_state.outputs[
                             "answer"
                         ].strip()
+
+                    # Only save the user-defined output variables, not the entire node output
+                    if (
+                        item.route_node_state.node_run_result
+                        and item.route_node_state.node_run_result.outputs
+                        and "outputs" in item.route_node_state.node_run_result.outputs
+                    ):
+                        user_outputs = item.route_node_state.node_run_result.outputs.get("outputs", {})
+                        self.graph_runtime_state.outputs.update(user_outputs)
         except Exception as e:
             logger.exception("Graph run failed")
             yield GraphRunFailedEvent(error=str(e), exceptions_count=len(handle_exceptions))
diff --git a/api/core/workflow/nodes/answer/answer_node.py b/api/core/workflow/nodes/answer/answer_node.py
index 84bbabca73..cde69a3a09 100644
--- a/api/core/workflow/nodes/answer/answer_node.py
+++ b/api/core/workflow/nodes/answer/answer_node.py
@@ -72,9 +72,17 @@ class AnswerNode(BaseNode):
                 part = cast(TextGenerateRouteChunk, part)
                 answer += part.text
 
+        output_variables = self._node_data.outputs
+
+        outputs = {}
+        for variable_selector in output_variables:
+            variable = self.graph_runtime_state.variable_pool.get(variable_selector.value_selector)
+            value = variable.to_object() if variable is not None else None
+            outputs[variable_selector.variable] = value
+
         return NodeRunResult(
             status=WorkflowNodeExecutionStatus.SUCCEEDED,
-            outputs={"answer": answer, "files": ArrayFileSegment(value=files)},
+            outputs={"answer": answer, "files": ArrayFileSegment(value=files), "outputs": outputs},
         )
 
     @classmethod
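Note: the backend hunks above form one data flow. AnswerNode._run resolves every selector configured under the node's new outputs field and nests the values under an "outputs" key of its NodeRunResult, GraphEngine then promotes only that nested mapping into graph_runtime_state.outputs, and the advanced-chat task pipeline copies everything except the streamed answer into the task metadata. A minimal sketch of that flow using plain dicts; this is illustrative only and deliberately skips the real NodeRunResult, variable pool, and TaskStateMetadata types shown in the diff.

# Illustrative sketch of the data flow introduced above, using plain dicts
# instead of Dify's NodeRunResult / VariablePool / TaskStateMetadata classes.

def run_answer_node(variable_pool: dict, answer: str, output_selectors: list[dict]) -> dict:
    # Mirrors AnswerNode._run: resolve each configured selector, tolerate missing values.
    user_outputs = {
        selector["variable"]: variable_pool.get(tuple(selector["value_selector"]))
        for selector in output_selectors
    }
    return {"answer": answer, "outputs": user_outputs}

def merge_into_graph_outputs(graph_outputs: dict, node_run_outputs: dict) -> None:
    # Mirrors the GraphEngine change: only the nested "outputs" mapping is promoted.
    if node_run_outputs and "outputs" in node_run_outputs:
        graph_outputs.update(node_run_outputs.get("outputs", {}))

def metadata_outputs(workflow_outputs: dict) -> dict:
    # Mirrors the pipeline change: expose everything except the streamed answer text.
    return {k: v for k, v in workflow_outputs.items() if k != "answer"}

if __name__ == "__main__":
    pool = {("start", "score"): 85, ("start", "weather"): "sunny"}
    result = run_answer_node(pool, "Weather: sunny", [
        {"variable": "confidence", "value_selector": ["start", "score"]},
        {"variable": "status", "value_selector": ["start", "weather"]},
    ])
    graph_outputs = {"answer": "Weather: sunny"}
    merge_into_graph_outputs(graph_outputs, result)
    assert metadata_outputs(graph_outputs) == {"confidence": 85, "status": "sunny"}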
diff --git a/api/core/workflow/nodes/answer/answer_stream_generate_router.py b/api/core/workflow/nodes/answer/answer_stream_generate_router.py
index 1d9c3e9b96..a5ab818673 100644
--- a/api/core/workflow/nodes/answer/answer_stream_generate_router.py
+++ b/api/core/workflow/nodes/answer/answer_stream_generate_router.py
@@ -23,6 +23,7 @@ class AnswerStreamGeneratorRouter:
         """
         # parse stream output node value selectors of answer nodes
         answer_generate_route: dict[str, list[GenerateRouteChunk]] = {}
+        answer_stream_variable_selectors_mapping: dict[str, list[GenerateRouteChunk]] = {}
         for answer_node_id, node_config in node_id_config_mapping.items():
             if node_config.get("data", {}).get("type") != NodeType.ANSWER.value:
                 continue
@@ -30,6 +31,7 @@ class AnswerStreamGeneratorRouter:
             # get generate route for stream output
             generate_route = cls._extract_generate_route_selectors(node_config)
             answer_generate_route[answer_node_id] = generate_route
+            answer_stream_variable_selectors_mapping[answer_node_id] = generate_route
 
         # fetch answer dependencies
         answer_node_ids = list(answer_generate_route.keys())
@@ -38,9 +40,18 @@ class AnswerStreamGeneratorRouter:
             reverse_edge_mapping=reverse_edge_mapping,
             node_id_config_mapping=node_id_config_mapping,
         )
+        answer_end_ids = list(answer_stream_variable_selectors_mapping.keys())
+        answer_end_dependencies = cls._fetch_answers_dependencies(
+            answer_node_ids=answer_end_ids,
+            reverse_edge_mapping=reverse_edge_mapping,
+            node_id_config_mapping=node_id_config_mapping,
+        )
 
         return AnswerStreamGenerateRoute(
-            answer_generate_route=answer_generate_route, answer_dependencies=answer_dependencies
+            answer_generate_route=answer_generate_route,
+            answer_dependencies=answer_dependencies,
+            answer_stream_variable_selectors_mapping=answer_stream_variable_selectors_mapping,
+            answer_end_dependencies=answer_end_dependencies,
         )
 
     @classmethod
diff --git a/api/core/workflow/nodes/answer/answer_stream_processor.py b/api/core/workflow/nodes/answer/answer_stream_processor.py
index 97666fad05..e607f51a49 100644
--- a/api/core/workflow/nodes/answer/answer_stream_processor.py
+++ b/api/core/workflow/nodes/answer/answer_stream_processor.py
@@ -25,6 +25,8 @@ class AnswerStreamProcessor(StreamProcessor):
         for answer_node_id in self.generate_routes.answer_generate_route:
             self.route_position[answer_node_id] = 0
         self.current_stream_chunk_generating_node_ids: dict[str, list[str]] = {}
+        self.has_output = False
+        self.output_node_ids: set[str] = set()
 
     def process(self, generator: Generator[GraphEngineEvent, None, None]) -> Generator[GraphEngineEvent, None, None]:
         for event in generator:
@@ -35,6 +37,10 @@ class AnswerStreamProcessor(StreamProcessor):
                 yield event
             elif isinstance(event, NodeRunStreamChunkEvent):
                 if event.in_iteration_id or event.in_loop_id:
+                    if self.has_output and event.node_id not in self.output_node_ids:
+                        event.chunk_content = "\n" + event.chunk_content
+                    self.output_node_ids.add(event.node_id)
+                    self.has_output = True
                     yield event
                     continue
 
@@ -49,6 +55,10 @@ class AnswerStreamProcessor(StreamProcessor):
                     )
 
                 for _ in stream_out_answer_node_ids:
+                    if self.has_output and event.node_id not in self.output_node_ids:
+                        event.chunk_content = "\n" + event.chunk_content
+                    self.output_node_ids.add(event.node_id)
+                    self.has_output = True
                     yield event
             elif isinstance(event, NodeRunSucceededEvent | NodeRunExceptionEvent):
                 yield event
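Note: the has_output / output_node_ids bookkeeping added to AnswerStreamProcessor prepends a newline to the first chunk of any answer node that starts streaming after some output has already been emitted, so text from consecutive or parallel Answer nodes does not run together. A stand-alone sketch of that rule follows; the Chunk dataclass is a stand-in for NodeRunStreamChunkEvent, whose chunk_content the real code mutates.

# Stand-alone sketch of the chunk-separation rule added to AnswerStreamProcessor.
from dataclasses import dataclass

@dataclass
class Chunk:
    node_id: str
    content: str

def separate_answer_streams(chunks: list[Chunk]) -> list[str]:
    has_output = False
    output_node_ids: set[str] = set()
    out: list[str] = []
    for chunk in chunks:
        # Prefix a newline only when a new node starts streaming after earlier output.
        if has_output and chunk.node_id not in output_node_ids:
            chunk.content = "\n" + chunk.content
        output_node_ids.add(chunk.node_id)
        has_output = True
        out.append(chunk.content)
    return out

assert separate_answer_streams(
    [Chunk("answer1", "1"), Chunk("answer2", "2"), Chunk("answer2", "2b")]
) == ["1", "\n2", "2b"]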
diff --git a/api/core/workflow/nodes/answer/entities.py b/api/core/workflow/nodes/answer/entities.py
index a05cc44c99..ea84ffda59 100644
--- a/api/core/workflow/nodes/answer/entities.py
+++ b/api/core/workflow/nodes/answer/entities.py
@@ -3,6 +3,7 @@
 from enum import Enum
 
 from pydantic import BaseModel, Field
 
+from core.workflow.entities.variable_entities import VariableSelector
 from core.workflow.nodes.base import BaseNodeData
 
 
@@ -12,6 +13,7 @@ class AnswerNodeData(BaseNodeData):
     """
 
     answer: str = Field(..., description="answer template string")
+    outputs: list[VariableSelector] = Field(default_factory=list, description="list of variable selectors")
 
 
 class GenerateRouteChunk(BaseModel):
@@ -63,3 +65,11 @@ class AnswerStreamGenerateRoute(BaseModel):
     answer_generate_route: dict[str, list[GenerateRouteChunk]] = Field(
         ..., description="answer generate route (answer node id -> generate route chunks)"
     )
+    answer_stream_variable_selectors_mapping: dict[str, list[GenerateRouteChunk]] = Field(
+        default_factory=dict,
+        description="answer node id -> generate route chunks for streaming variable selectors",
+    )
+    answer_end_dependencies: dict[str, list[str]] = Field(
+        default_factory=dict,
+        description="answer node id -> dependent answer node ids at message end",
+    )
diff --git a/api/fields/message_fields.py b/api/fields/message_fields.py
index e6aebd810f..deff11a939 100644
--- a/api/fields/message_fields.py
+++ b/api/fields/message_fields.py
@@ -45,6 +45,7 @@ message_fields = {
     "conversation_id": fields.String,
     "parent_message_id": fields.String,
     "inputs": FilesContainedField,
+    "outputs": fields.Raw(attribute="outputs_dict"),
     "query": fields.String,
     "answer": fields.String(attribute="re_sign_file_url_answer"),
     "feedback": fields.Nested(feedback_fields, attribute="user_feedback", allow_null=True),
diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_graph.py b/api/tests/unit_tests/core/workflow/graph_engine/test_graph.py
index 13ba11016a..8a7bafec1f 100644
--- a/api/tests/unit_tests/core/workflow/graph_engine/test_graph.py
+++ b/api/tests/unit_tests/core/workflow/graph_engine/test_graph.py
@@ -43,7 +43,7 @@ def test_init():
                 "id": "llm",
             },
             {
-                "data": {"type": "answer", "title": "answer", "answer": "1"},
+                "data": {"type": "answer", "title": "answer", "answer": "1", "outputs": []},
                 "id": "answer",
             },
             {
@@ -57,7 +57,7 @@ def test_init():
                 "id": "http",
             },
             {
-                "data": {"type": "answer", "title": "answer", "answer": "1"},
+                "data": {"type": "answer", "title": "answer", "answer": "1", "outputs": []},
                 "id": "answer2",
             },
         ],
@@ -126,7 +126,7 @@ def test__init_iteration_graph():
                 "id": "llm",
             },
             {
-                "data": {"type": "answer", "title": "answer", "answer": "1"},
+                "data": {"type": "answer", "title": "answer", "answer": "1", "outputs": []},
                 "id": "answer",
             },
             {
@@ -148,7 +148,7 @@ def test__init_iteration_graph():
                 "parentId": "iteration",
             },
             {
-                "data": {"type": "answer", "title": "answer", "answer": "1"},
+                "data": {"type": "answer", "title": "answer", "answer": "1", "outputs": []},
                 "id": "answer-in-iteration",
                 "parentId": "iteration",
             },
@@ -235,7 +235,7 @@ def test_parallels_graph():
                 "id": "llm3",
             },
             {
-                "data": {"type": "answer", "title": "answer", "answer": "1"},
+                "data": {"type": "answer", "title": "answer", "answer": "1", "outputs": []},
                 "id": "answer",
             },
         ],
@@ -310,7 +310,7 @@ def test_parallels_graph2():
                 "id": "llm3",
             },
             {
-                "data": {"type": "answer", "title": "answer", "answer": "1"},
+                "data": {"type": "answer", "title": "answer", "answer": "1", "outputs": []},
                 "id": "answer",
             },
         ],
@@ -373,7 +373,7 @@ def test_parallels_graph3():
                 "id": "llm3",
             },
             {
-                "data": {"type": "answer", "title": "answer", "answer": "1"},
+                "data": {"type": "answer", "title": "answer", "answer": "1", "outputs": []},
                 "id": "answer",
             },
         ],
@@ -480,7 +480,7 @@ def test_parallels_graph4():
                 "id": "code3",
             },
             {
-                "data":
{"type": "answer", "title": "answer", "answer": "1"}, + "data": {"type": "answer", "title": "answer", "answer": "1", "outputs": []}, "id": "answer", }, ], @@ -606,7 +606,7 @@ def test_parallels_graph5(): "id": "code3", }, { - "data": {"type": "answer", "title": "answer", "answer": "1"}, + "data": {"type": "answer", "title": "answer", "answer": "1", "outputs": []}, "id": "answer", }, { @@ -745,7 +745,7 @@ def test_parallels_graph6(): "id": "code3", }, { - "data": {"type": "answer", "title": "answer", "answer": "1"}, + "data": {"type": "answer", "title": "answer", "answer": "1", "outputs": []}, "id": "answer", }, ], diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py b/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py index ed4e42425e..c0bf24e4d1 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py @@ -271,21 +271,21 @@ def test_run_parallel_in_chatflow(mock_close, mock_remove): ], "nodes": [ {"data": {"type": "start", "title": "start"}, "id": "start"}, - {"data": {"type": "answer", "title": "answer1", "answer": "1"}, "id": "answer1"}, + {"data": {"type": "answer", "title": "answer1", "answer": "1", "outputs": []}, "id": "answer1"}, { - "data": {"type": "answer", "title": "answer2", "answer": "2"}, + "data": {"type": "answer", "title": "answer2", "answer": "2", "outputs": []}, "id": "answer2", }, { - "data": {"type": "answer", "title": "answer3", "answer": "3"}, + "data": {"type": "answer", "title": "answer3", "answer": "3", "outputs": []}, "id": "answer3", }, { - "data": {"type": "answer", "title": "answer4", "answer": "4"}, + "data": {"type": "answer", "title": "answer4", "answer": "4", "outputs": []}, "id": "answer4", }, { - "data": {"type": "answer", "title": "answer5", "answer": "5"}, + "data": {"type": "answer", "title": "answer5", "answer": "5", "outputs": []}, "id": "answer5", }, ], @@ -403,7 +403,13 @@ def test_run_branch(mock_close, mock_remove): "id": "start", }, { - "data": {"answer": "1 {{#start.uid#}}", "title": "Answer", "type": "answer", "variables": []}, + "data": { + "answer": "1 {{#start.uid#}}", + "title": "Answer", + "type": "answer", + "variables": [], + "outputs": [], + }, "id": "answer-1", }, { @@ -458,6 +464,7 @@ def test_run_branch(mock_close, mock_remove): "answer": "2", "title": "Answer 2", "type": "answer", + "outputs": [], }, "id": "answer-2", }, @@ -466,6 +473,7 @@ def test_run_branch(mock_close, mock_remove): "answer": "3", "title": "Answer 3", "type": "answer", + "outputs": [], }, "id": "answer-3", }, @@ -707,6 +715,7 @@ def test_condition_parallel_correct_output(mock_close, mock_remove, app): "title": "answer 2", "type": "answer", "variables": [], + "outputs": [], }, "height": 105, "id": "1742382531085", @@ -726,6 +735,7 @@ def test_condition_parallel_correct_output(mock_close, mock_remove, app): "title": "answer 3", "type": "answer", "variables": [], + "outputs": [], }, "height": 105, "id": "1742382534798", @@ -745,6 +755,7 @@ def test_condition_parallel_correct_output(mock_close, mock_remove, app): "title": "answer 4", "type": "answer", "variables": [], + "outputs": [], }, "height": 105, "id": "1742382538517", @@ -764,6 +775,7 @@ def test_condition_parallel_correct_output(mock_close, mock_remove, app): "title": "Answer 5", "type": "answer", "variables": [], + "outputs": [], }, "height": 105, "id": "1742434464898", diff --git a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py 
b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py index 1ef024f46b..c72d73ae4e 100644 --- a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py +++ b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py @@ -54,6 +54,7 @@ def test_execute_answer(): system_variables=SystemVariable(user_id="aaa", files=[]), user_inputs={}, environment_variables=[], + conversation_variables=[], ) pool.add(["start", "weather"], "sunny") pool.add(["llm", "text"], "You are a helpful AI.") @@ -64,6 +65,7 @@ def test_execute_answer(): "title": "123", "type": "answer", "answer": "Today's weather is {{#start.weather#}}\n{{#llm.text#}}\n{{img}}\nFin.", + "outputs": [], }, } @@ -85,4 +87,181 @@ def test_execute_answer(): result = node._run() assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED + assert result.outputs is not None assert result.outputs["answer"] == "Today's weather is sunny\nYou are a helpful AI.\n{{img}}\nFin." + + +def test_execute_answer_with_outputs(): + from core.workflow.enums import SystemVariableKey + + """Test Answer node with custom output variables""" + graph_config = { + "edges": [ + { + "id": "start-source-answer-target", + "source": "start", + "target": "answer", + }, + ], + "nodes": [ + {"data": {"type": "start"}, "id": "start"}, + { + "data": { + "title": "Answer with outputs", + "type": "answer", + "answer": "Weather: {{#start.weather#}}, Score: {{#start.score#}}", + "outputs": [ + {"variable": "confidence", "type": "number", "value_selector": ["start", "score"]}, + {"variable": "status", "type": "string", "value_selector": ["start", "weather"]}, + ], + }, + "id": "answer", + }, + ], + } + + graph = Graph.init(graph_config=graph_config) + + init_params = GraphInitParams( + tenant_id="1", + app_id="1", + workflow_type=WorkflowType.WORKFLOW, + workflow_id="1", + graph_config=graph_config, + user_id="1", + user_from=UserFrom.ACCOUNT, + invoke_from=InvokeFrom.DEBUGGER, + call_depth=0, + ) + + # construct variable pool + variable_pool = VariablePool( + system_variables={SystemVariableKey.FILES: [], SystemVariableKey.USER_ID: "aaa"}, + user_inputs={}, + environment_variables=[], + conversation_variables=[], + ) + variable_pool.add(["start", "weather"], "sunny") + variable_pool.add(["start", "score"], 85) + + node_config = { + "id": "answer", + "data": { + "title": "Answer with outputs", + "type": "answer", + "answer": "Weather: {{#start.weather#}}, Score: {{#start.score#}}", + "outputs": [ + {"variable": "confidence", "type": "number", "value_selector": ["start", "score"]}, + {"variable": "status", "type": "string", "value_selector": ["start", "weather"]}, + ], + }, + } + + node = AnswerNode( + id=str(uuid.uuid4()), + graph_init_params=init_params, + graph=graph, + graph_runtime_state=GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter()), + config=node_config, + ) + + node.init_node_data(node_config["data"]) + + # Mock db.session.close() + db.session.close = MagicMock() + + # execute node + result = node._run() + + assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED + assert result.outputs is not None + assert result.outputs["answer"] == "Weather: sunny, Score: 85" + + # Check outputs field + assert "outputs" in result.outputs + outputs = result.outputs["outputs"] + assert outputs["confidence"] == 85 + assert outputs["status"] == "sunny" + + +def test_execute_answer_with_empty_outputs(): + from core.workflow.enums import SystemVariableKey + + """Test Answer node with empty outputs configuration""" + graph_config = { 
+ "edges": [ + { + "id": "start-source-answer-target", + "source": "start", + "target": "answer", + }, + ], + "nodes": [ + {"data": {"type": "start"}, "id": "start"}, + { + "data": { + "title": "No outputs", + "type": "answer", + "answer": "Simple answer", + "outputs": [], + }, + "id": "answer", + }, + ], + } + + graph = Graph.init(graph_config=graph_config) + + init_params = GraphInitParams( + tenant_id="1", + app_id="1", + workflow_type=WorkflowType.WORKFLOW, + workflow_id="1", + graph_config=graph_config, + user_id="1", + user_from=UserFrom.ACCOUNT, + invoke_from=InvokeFrom.DEBUGGER, + call_depth=0, + ) + + # construct variable pool + variable_pool = VariablePool( + system_variables={SystemVariableKey.FILES: [], SystemVariableKey.USER_ID: "aaa"}, + user_inputs={}, + environment_variables=[], + conversation_variables=[], + ) + + node_config = { + "id": "answer", + "data": { + "title": "No outputs", + "type": "answer", + "answer": "Simple answer", + "outputs": [], + }, + } + + node = AnswerNode( + id=str(uuid.uuid4()), + graph_init_params=init_params, + graph=graph, + graph_runtime_state=GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter()), + config=node_config, + ) + + node.init_node_data(node_config["data"]) + + # Mock db.session.close() + db.session.close = MagicMock() + + # execute node + result = node._run() + + assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED + assert result.outputs is not None + assert result.outputs["answer"] == "Simple answer" + + # Check that outputs field is empty when no outputs are configured + assert "outputs" in result.outputs + assert result.outputs["outputs"] == {} diff --git a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer_stream_generate_router.py b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer_stream_generate_router.py index bce87536d8..76b8f596e7 100644 --- a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer_stream_generate_router.py +++ b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer_stream_generate_router.py @@ -89,11 +89,11 @@ def test_init(): "id": "llm5", }, { - "data": {"type": "answer", "title": "answer", "answer": "1{{#llm2.text#}}2"}, + "data": {"type": "answer", "title": "answer", "answer": "1{{#llm2.text#}}2", "outputs": []}, "id": "answer", }, { - "data": {"type": "answer", "title": "answer2", "answer": "1{{#llm3.text#}}2"}, + "data": {"type": "answer", "title": "answer2", "answer": "1{{#llm3.text#}}2", "outputs": []}, "id": "answer2", }, ], diff --git a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer_stream_processor.py b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer_stream_processor.py index 137e8b889d..20077a9700 100644 --- a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer_stream_processor.py +++ b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer_stream_processor.py @@ -167,11 +167,11 @@ def test_process(): "id": "llm5", }, { - "data": {"type": "answer", "title": "answer", "answer": "a{{#llm2.text#}}b"}, + "data": {"type": "answer", "title": "answer", "answer": "a{{#llm2.text#}}b", "outputs": []}, "id": "answer", }, { - "data": {"type": "answer", "title": "answer2", "answer": "c{{#llm3.text#}}d"}, + "data": {"type": "answer", "title": "answer2", "answer": "c{{#llm3.text#}}d", "outputs": []}, "id": "answer2", }, ], diff --git a/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py b/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py index f53f391433..2eed57af17 100644 
--- a/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py +++ b/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py @@ -75,6 +75,7 @@ def test_run(): "iteration_id": "iteration-1", "title": "answer 2", "type": "answer", + "outputs": [], }, "id": "answer-2", }, @@ -89,7 +90,12 @@ def test_run(): "id": "tt", }, { - "data": {"answer": "{{#iteration-1.output#}}88888", "title": "answer 3", "type": "answer"}, + "data": { + "answer": "{{#iteration-1.output#}}88888", + "title": "answer 3", + "type": "answer", + "outputs": [], + }, "id": "answer-3", }, { @@ -110,7 +116,13 @@ def test_run(): "id": "if-else", }, { - "data": {"answer": "no hi", "iteration_id": "iteration-1", "title": "answer 4", "type": "answer"}, + "data": { + "answer": "no hi", + "iteration_id": "iteration-1", + "title": "answer 4", + "type": "answer", + "outputs": [], + }, "id": "answer-4", }, { @@ -279,6 +291,7 @@ def test_run_parallel(): "iteration_id": "iteration-1", "title": "answer 2", "type": "answer", + "outputs": [], }, "id": "answer-2", }, @@ -311,7 +324,12 @@ def test_run_parallel(): "id": "tt-2", }, { - "data": {"answer": "{{#iteration-1.output#}}88888", "title": "answer 3", "type": "answer"}, + "data": { + "answer": "{{#iteration-1.output#}}88888", + "title": "answer 3", + "type": "answer", + "outputs": [], + }, "id": "answer-3", }, { @@ -332,7 +350,13 @@ def test_run_parallel(): "id": "if-else", }, { - "data": {"answer": "no hi", "iteration_id": "iteration-1", "title": "answer 4", "type": "answer"}, + "data": { + "answer": "no hi", + "iteration_id": "iteration-1", + "title": "answer 4", + "type": "answer", + "outputs": [], + }, "id": "answer-4", }, { @@ -500,6 +524,7 @@ def test_iteration_run_in_parallel_mode(): "iteration_id": "iteration-1", "title": "answer 2", "type": "answer", + "outputs": [], }, "id": "answer-2", }, @@ -532,7 +557,12 @@ def test_iteration_run_in_parallel_mode(): "id": "tt-2", }, { - "data": {"answer": "{{#iteration-1.output#}}88888", "title": "answer 3", "type": "answer"}, + "data": { + "answer": "{{#iteration-1.output#}}88888", + "title": "answer 3", + "type": "answer", + "outputs": [], + }, "id": "answer-3", }, { @@ -553,7 +583,13 @@ def test_iteration_run_in_parallel_mode(): "id": "if-else", }, { - "data": {"answer": "no hi", "iteration_id": "iteration-1", "title": "answer 4", "type": "answer"}, + "data": { + "answer": "no hi", + "iteration_id": "iteration-1", + "title": "answer 4", + "type": "answer", + "outputs": [], + }, "id": "answer-4", }, { @@ -762,7 +798,12 @@ def test_iteration_run_error_handle(): "id": "tt2", }, { - "data": {"answer": "{{#iteration-1.output#}}88888", "title": "answer 3", "type": "answer"}, + "data": { + "answer": "{{#iteration-1.output#}}88888", + "title": "answer 3", + "type": "answer", + "outputs": [], + }, "id": "answer-3", }, { diff --git a/api/tests/unit_tests/core/workflow/nodes/test_answer.py b/api/tests/unit_tests/core/workflow/nodes/test_answer.py index 466d7bad06..6f8e27a3e3 100644 --- a/api/tests/unit_tests/core/workflow/nodes/test_answer.py +++ b/api/tests/unit_tests/core/workflow/nodes/test_answer.py @@ -31,6 +31,7 @@ def test_execute_answer(): "title": "123", "type": "answer", "answer": "Today's weather is {{#start.weather#}}\n{{#llm.text#}}\n{{img}}\nFin.", + "outputs": [], }, "id": "answer", }, @@ -67,6 +68,7 @@ def test_execute_answer(): "title": "123", "type": "answer", "answer": "Today's weather is {{#start.weather#}}\n{{#llm.text#}}\n{{img}}\nFin.", + "outputs": [], }, } diff --git 
a/api/tests/unit_tests/core/workflow/nodes/test_continue_on_error.py b/api/tests/unit_tests/core/workflow/nodes/test_continue_on_error.py index 3f83428834..865ad90cd2 100644 --- a/api/tests/unit_tests/core/workflow/nodes/test_continue_on_error.py +++ b/api/tests/unit_tests/core/workflow/nodes/test_continue_on_error.py @@ -243,7 +243,10 @@ def test_code_default_value_continue_on_error(): "edges": DEFAULT_VALUE_EDGE, "nodes": [ {"data": {"title": "start", "type": "start", "variables": []}, "id": "start"}, - {"data": {"title": "answer", "type": "answer", "answer": "{{#node.result#}}"}, "id": "answer"}, + { + "data": {"title": "answer", "type": "answer", "answer": "{{#node.result#}}", "outputs": []}, + "id": "answer", + }, ContinueOnErrorTestHelper.get_code_node( error_code, "default-value", [{"key": "result", "type": "number", "value": 132123}] ), @@ -270,11 +273,11 @@ def test_code_fail_branch_continue_on_error(): "nodes": [ {"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"}, { - "data": {"title": "success", "type": "answer", "answer": "node node run successfully"}, + "data": {"title": "success", "type": "answer", "answer": "node node run successfully", "outputs": []}, "id": "success", }, { - "data": {"title": "error", "type": "answer", "answer": "node node run failed"}, + "data": {"title": "error", "type": "answer", "answer": "node node run failed", "outputs": []}, "id": "error", }, ContinueOnErrorTestHelper.get_code_node(error_code), @@ -296,7 +299,10 @@ def test_http_node_default_value_continue_on_error(): "edges": DEFAULT_VALUE_EDGE, "nodes": [ {"data": {"title": "start", "type": "start", "variables": []}, "id": "start"}, - {"data": {"title": "answer", "type": "answer", "answer": "{{#node.response#}}"}, "id": "answer"}, + { + "data": {"title": "answer", "type": "answer", "answer": "{{#node.response#}}", "outputs": []}, + "id": "answer", + }, ContinueOnErrorTestHelper.get_http_node( "default-value", [{"key": "response", "type": "string", "value": "http node got error response"}] ), @@ -321,11 +327,11 @@ def test_http_node_fail_branch_continue_on_error(): "nodes": [ {"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"}, { - "data": {"title": "success", "type": "answer", "answer": "HTTP request successful"}, + "data": {"title": "success", "type": "answer", "answer": "HTTP request successful", "outputs": []}, "id": "success", }, { - "data": {"title": "error", "type": "answer", "answer": "HTTP request failed"}, + "data": {"title": "error", "type": "answer", "answer": "HTTP request failed", "outputs": []}, "id": "error", }, ContinueOnErrorTestHelper.get_http_node(), @@ -399,7 +405,10 @@ def test_llm_node_default_value_continue_on_error(): "edges": DEFAULT_VALUE_EDGE, "nodes": [ {"data": {"title": "start", "type": "start", "variables": []}, "id": "start"}, - {"data": {"title": "answer", "type": "answer", "answer": "{{#node.answer#}}"}, "id": "answer"}, + { + "data": {"title": "answer", "type": "answer", "answer": "{{#node.answer#}}", "outputs": []}, + "id": "answer", + }, ContinueOnErrorTestHelper.get_llm_node( "default-value", [{"key": "answer", "type": "string", "value": "default LLM response"}] ), @@ -423,11 +432,11 @@ def test_llm_node_fail_branch_continue_on_error(): "nodes": [ {"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"}, { - "data": {"title": "success", "type": "answer", "answer": "LLM request successful"}, + "data": {"title": "success", "type": "answer", "answer": "LLM request successful", "outputs": 
[]}, "id": "success", }, { - "data": {"title": "error", "type": "answer", "answer": "LLM request failed"}, + "data": {"title": "error", "type": "answer", "answer": "LLM request failed", "outputs": []}, "id": "error", }, ContinueOnErrorTestHelper.get_llm_node(), @@ -451,11 +460,11 @@ def test_status_code_error_http_node_fail_branch_continue_on_error(): "nodes": [ {"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"}, { - "data": {"title": "success", "type": "answer", "answer": "http execute successful"}, + "data": {"title": "success", "type": "answer", "answer": "http execute successful", "outputs": []}, "id": "success", }, { - "data": {"title": "error", "type": "answer", "answer": "http execute failed"}, + "data": {"title": "error", "type": "answer", "answer": "http execute failed", "outputs": []}, "id": "error", }, ContinueOnErrorTestHelper.get_error_status_code_http_node(), @@ -478,11 +487,11 @@ def test_variable_pool_error_type_variable(): "nodes": [ {"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"}, { - "data": {"title": "success", "type": "answer", "answer": "http execute successful"}, + "data": {"title": "success", "type": "answer", "answer": "http execute successful", "outputs": []}, "id": "success", }, { - "data": {"title": "error", "type": "answer", "answer": "http execute failed"}, + "data": {"title": "error", "type": "answer", "answer": "http execute failed", "outputs": []}, "id": "error", }, ContinueOnErrorTestHelper.get_error_status_code_http_node(), @@ -503,7 +512,10 @@ def test_no_node_in_fail_branch_continue_on_error(): "edges": FAIL_BRANCH_EDGES[:-1], "nodes": [ {"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"}, - {"data": {"title": "success", "type": "answer", "answer": "HTTP request successful"}, "id": "success"}, + { + "data": {"title": "success", "type": "answer", "answer": "HTTP request successful", "outputs": []}, + "id": "success", + }, ContinueOnErrorTestHelper.get_http_node(), ], } @@ -523,11 +535,11 @@ def test_stream_output_with_fail_branch_continue_on_error(): "nodes": [ {"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"}, { - "data": {"title": "success", "type": "answer", "answer": "LLM request successful"}, + "data": {"title": "success", "type": "answer", "answer": "LLM request successful", "outputs": []}, "id": "success", }, { - "data": {"title": "error", "type": "answer", "answer": "{{#node.text#}}"}, + "data": {"title": "error", "type": "answer", "answer": "{{#node.text#}}", "outputs": []}, "id": "error", }, ContinueOnErrorTestHelper.get_llm_node(), diff --git a/api/tests/unit_tests/core/workflow/nodes/test_retry.py b/api/tests/unit_tests/core/workflow/nodes/test_retry.py index 57d3b203b9..d807fd5828 100644 --- a/api/tests/unit_tests/core/workflow/nodes/test_retry.py +++ b/api/tests/unit_tests/core/workflow/nodes/test_retry.py @@ -27,7 +27,10 @@ def test_retry_default_value_partial_success(): "edges": DEFAULT_VALUE_EDGE, "nodes": [ {"data": {"title": "start", "type": "start", "variables": []}, "id": "start"}, - {"data": {"title": "answer", "type": "answer", "answer": "{{#node.result#}}"}, "id": "answer"}, + { + "data": {"title": "answer", "type": "answer", "answer": "{{#node.result#}}", "outputs": []}, + "id": "answer", + }, ContinueOnErrorTestHelper.get_http_node( "default-value", [{"key": "result", "type": "string", "value": "http node got error response"}], @@ -50,7 +53,10 @@ def test_retry_failed(): "edges": DEFAULT_VALUE_EDGE, "nodes": [ {"data": 
{"title": "start", "type": "start", "variables": []}, "id": "start"},
-            {"data": {"title": "answer", "type": "answer", "answer": "{{#node.result#}}"}, "id": "answer"},
+            {
+                "data": {"title": "answer", "type": "answer", "answer": "{{#node.result#}}", "outputs": []},
+                "id": "answer",
+            },
             ContinueOnErrorTestHelper.get_http_node(
                 None,
                 None,
diff --git a/web/app/components/workflow/nodes/_base/hooks/use-var-list.ts b/web/app/components/workflow/nodes/_base/hooks/use-var-list.ts
index 63d284f260..fe8360465e 100644
--- a/web/app/components/workflow/nodes/_base/hooks/use-var-list.ts
+++ b/web/app/components/workflow/nodes/_base/hooks/use-var-list.ts
@@ -21,6 +21,8 @@ function useVarList({
 
   const handleAddVariable = useCallback(() => {
     const newInputs = produce(inputs, (draft: any) => {
+      if (!draft[varKey])
+        draft[varKey] = []
       draft[varKey].push({
         variable: '',
         value_selector: [],
diff --git a/web/app/components/workflow/nodes/answer/default.ts b/web/app/components/workflow/nodes/answer/default.ts
index 4ff6e49d7e..9538330b80 100644
--- a/web/app/components/workflow/nodes/answer/default.ts
+++ b/web/app/components/workflow/nodes/answer/default.ts
@@ -7,6 +7,7 @@ const nodeDefault: NodeDefault = {
   defaultValue: {
     variables: [],
     answer: '',
+    outputs: [],
   },
   getAvailablePrevNodes(isChatMode: boolean) {
     const nodes = isChatMode
diff --git a/web/app/components/workflow/nodes/answer/node.tsx b/web/app/components/workflow/nodes/answer/node.tsx
index bb28066d95..3d36de4565 100644
--- a/web/app/components/workflow/nodes/answer/node.tsx
+++ b/web/app/components/workflow/nodes/answer/node.tsx
@@ -4,12 +4,43 @@ import { useTranslation } from 'react-i18next'
 import InfoPanel from '../_base/components/info-panel'
 import ReadonlyInputWithSelectVar from '../_base/components/readonly-input-with-select-var'
 import type { AnswerNodeType } from './types'
-import type { NodeProps } from '@/app/components/workflow/types'
+import {
+  useIsChatMode,
+  useWorkflow,
+  useWorkflowVariables,
+} from '@/app/components/workflow/hooks'
+import { BlockEnum } from '@/app/components/workflow/types'
+import type { NodeProps, Variable } from '@/app/components/workflow/types'
+import { isConversationVar, isENV, isSystemVar } from '@/app/components/workflow/nodes/_base/components/variable/utils'
+import { VarBlockIcon } from '@/app/components/workflow/block-icon'
+import { Variable02 } from '@/app/components/base/icons/src/vender/solid/development'
+import { BubbleX, Env } from '@/app/components/base/icons/src/vender/line/others'
+import { Line3 } from '@/app/components/base/icons/src/public/common'
+import cn from 'classnames'
+
 
 const Node: FC<NodeProps<AnswerNodeType>> = ({
   id,
   data,
 }) => {
   const { t } = useTranslation()
+  const { getBeforeNodesInSameBranch } = useWorkflow()
+  const availableNodes = getBeforeNodesInSameBranch(id)
+  const { getCurrentVariableType } = useWorkflowVariables()
+  const isChatMode = useIsChatMode()
+
+  const startNode = availableNodes.find((node: any) => {
+    return node.data.type === BlockEnum.Start
+  })
+
+  const getNode = (id: string) => {
+    return availableNodes.find(node => node.id === id) || startNode
+  }
+
+  const { outputs = [] } = data
+  const filteredOutputs = (outputs as Variable[]).filter(({ value_selector }) => value_selector.length > 0)
+
+  if (!filteredOutputs.length)
+    return null
 
   return (
@@ -19,7 +50,52 @@ const Node: FC> = ({ nodeId={id} /> } /> +
+ {filteredOutputs.map(({ value_selector }, index) => { + const node = getNode(value_selector[0]) + const isSystem = isSystemVar(value_selector) + const isEnv = isENV(value_selector) + const isChatVar = isConversationVar(value_selector) + const varName = isSystem ? `sys.${value_selector[value_selector.length - 1]}` : value_selector[value_selector.length - 1] + const varType = getCurrentVariableType({ + valueSelector: value_selector, + availableNodes, + isChatMode, + }) + return ( +
+
+ {!isEnv && ( + <> +
+ +
+
{node?.data.title}
+ + + )} +
+ {!isEnv && !isChatVar && } + {isEnv && } + {isChatVar && } + +
{varName}
+
+
+
+
{varType}
+
+
+ ) + })} + +
+
+
   )
 }
diff --git a/web/app/components/workflow/nodes/answer/panel.tsx b/web/app/components/workflow/nodes/answer/panel.tsx
index 2a4b70ee32..8bfc2c8191 100644
--- a/web/app/components/workflow/nodes/answer/panel.tsx
+++ b/web/app/components/workflow/nodes/answer/panel.tsx
@@ -7,6 +7,9 @@ import Editor from '@/app/components/workflow/nodes/_base/components/prompt/editor'
 import type { NodePanelProps } from '@/app/components/workflow/types'
 import useAvailableVarList from '@/app/components/workflow/nodes/_base/hooks/use-available-var-list'
 const i18nPrefix = 'workflow.nodes.answer'
+import Field from '@/app/components/workflow/nodes/_base/components/field'
+import AddButton from '@/app/components/base/button/add-button'
+import VarList from '@/app/components/workflow/nodes/_base/components/variable/var-list'
 
 const Panel: FC<NodePanelProps<AnswerNodeType>> = ({
   id,
@@ -19,8 +22,12 @@ const Panel: FC<NodePanelProps<AnswerNodeType>> = ({
     inputs,
     handleAnswerChange,
     filterVar,
+    handleVarListChange,
+    handleAddVariable,
   } = useConfig(id, data)
 
+  const outputs = inputs?.outputs || []
+
   const { availableVars, availableNodesWithParent } = useAvailableVarList(id, {
     onlyLeafNodeVar: false,
     hideChatVar: false,
@@ -29,17 +36,34 @@ const Panel: FC<NodePanelProps<AnswerNodeType>> = ({
   })
 
   return (
- +
+
+ +
+
+ : undefined + } + > + + +
   )
 }
diff --git a/web/app/components/workflow/nodes/answer/types.ts b/web/app/components/workflow/nodes/answer/types.ts
index 8f1b3cb6cd..721bb0aacb 100644
--- a/web/app/components/workflow/nodes/answer/types.ts
+++ b/web/app/components/workflow/nodes/answer/types.ts
@@ -1,6 +1,7 @@
 import type { CommonNodeType, Variable } from '@/app/components/workflow/types'
 
 export type AnswerNodeType = CommonNodeType & {
+  outputs: Variable[]
   variables: Variable[]
   answer: string
 }
diff --git a/web/app/components/workflow/nodes/answer/use-config.ts b/web/app/components/workflow/nodes/answer/use-config.ts
index 269d953743..83544a4cfa 100644
--- a/web/app/components/workflow/nodes/answer/use-config.ts
+++ b/web/app/components/workflow/nodes/answer/use-config.ts
@@ -16,6 +16,7 @@ const useConfig = (id: string, payload: AnswerNodeType) => {
   const { handleVarListChange, handleAddVariable } = useVarList({
     inputs,
     setInputs,
+    varKey: 'outputs',
   })
 
   const handleAnswerChange = useCallback((value: string) => {
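Note: end to end, the web editor now serializes an outputs list on the Answer node (edited through the shared useVarList hook with varKey: 'outputs'), AnswerNodeData parses it as a list of VariableSelector entries, and the message API exposes the collected values through the new "outputs" field. A hedged sketch of the resulting shapes follows; the node config mirrors the unit tests in this diff, while the message payload is illustrative rather than a verbatim API response.

# Sketch of an Answer node configuration after this change, as the web editor
# would serialize it and as AnswerNodeData now parses it.
answer_node_config = {
    "id": "answer",
    "data": {
        "type": "answer",
        "title": "Answer with outputs",
        "answer": "Weather: {{#start.weather#}}, Score: {{#start.score#}}",
        # New: user-defined output variables, each a VariableSelector.
        "outputs": [
            {"variable": "confidence", "type": "number", "value_selector": ["start", "score"]},
            {"variable": "status", "type": "string", "value_selector": ["start", "weather"]},
        ],
    },
}

# With "outputs": fields.Raw(attribute="outputs_dict") added to message_fields,
# a returned message can carry the collected values alongside the answer text,
# for example (illustrative payload, not a captured response):
example_message = {
    "answer": "Weather: sunny, Score: 85",
    "outputs": {"confidence": 85, "status": "sunny"},
}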