From a36533a5ec410c85d4399cc15df28bc155241225 Mon Sep 17 00:00:00 2001 From: GuanMu Date: Wed, 4 Jun 2025 02:58:14 +0000 Subject: [PATCH 01/33] feat: add outputs field to message model and enhance advanced chat task pipeline - Introduced `outputs` field in the Message model to store output data. - Updated `generate_task_pipeline.py` to handle and print outputs during task processing. - Enhanced the answer node components to utilize the new outputs field, including filtering and displaying output variables. - Added necessary adjustments in the TypeScript types for better integration with the new outputs functionality. --- .../advanced_chat/generate_task_pipeline.py | 9 ++- api/fields/message_fields.py | 1 + ...update_message_table_outputs_column_to_.py | 33 ++++++++ api/models/model.py | 10 +++ .../components/workflow/nodes/answer/node.tsx | 79 ++++++++++++++++++- .../workflow/nodes/answer/panel.tsx | 43 +++++++--- .../components/workflow/nodes/answer/types.ts | 1 + 7 files changed, 160 insertions(+), 16 deletions(-) create mode 100644 api/migrations/versions/2025_06_03_1229-83f1262d3f22_update_message_table_outputs_column_to_.py diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index 8c5645bbb7..e17ed0ebaa 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -165,7 +165,7 @@ class AdvancedChatAppGenerateTaskPipeline: ) generator = self._wrapper_process_stream_response(trace_manager=self._application_generate_entity.trace_manager) - + print(f"generator: {generator}") if self._base_task_pipeline._stream: return self._to_stream_response(generator) else: @@ -183,12 +183,13 @@ class AdvancedChatAppGenerateTaskPipeline: extras = {} if stream_response.metadata: extras["metadata"] = stream_response.metadata - + print(f"stream_response: {stream_response}") return ChatbotAppBlockingResponse( task_id=stream_response.task_id, data=ChatbotAppBlockingResponse.Data( id=self._message_id, mode=self._conversation_mode, + outputs=stream_response.data.outputs, conversation_id=self._conversation_id, message_id=self._message_id, answer=self._task_state.answer, @@ -209,6 +210,7 @@ class AdvancedChatAppGenerateTaskPipeline: :return: """ for stream_response in generator: + print(f"stream_response: {stream_response}") yield ChatbotAppStreamResponse( conversation_id=self._conversation_id, message_id=self._message_id, @@ -501,13 +503,12 @@ class AdvancedChatAppGenerateTaskPipeline: conversation_id=self._conversation_id, trace_manager=trace_manager, ) - workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response( session=session, task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, ) - + print(f"workflow_finish_resp: {workflow_finish_resp}") yield workflow_finish_resp self._base_task_pipeline._queue_manager.publish( QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE diff --git a/api/fields/message_fields.py b/api/fields/message_fields.py index e6aebd810f..deff11a939 100644 --- a/api/fields/message_fields.py +++ b/api/fields/message_fields.py @@ -45,6 +45,7 @@ message_fields = { "conversation_id": fields.String, "parent_message_id": fields.String, "inputs": FilesContainedField, + "outputs": fields.Raw(attribute="outputs_dict"), "query": fields.String, "answer": fields.String(attribute="re_sign_file_url_answer"), "feedback": fields.Nested(feedback_fields, attribute="user_feedback", allow_null=True), diff --git a/api/migrations/versions/2025_06_03_1229-83f1262d3f22_update_message_table_outputs_column_to_.py b/api/migrations/versions/2025_06_03_1229-83f1262d3f22_update_message_table_outputs_column_to_.py new file mode 100644 index 0000000000..aa4d1126bd --- /dev/null +++ b/api/migrations/versions/2025_06_03_1229-83f1262d3f22_update_message_table_outputs_column_to_.py @@ -0,0 +1,33 @@ +"""Update Message table outputs column to text and add outputs_dict property + +Revision ID: 83f1262d3f22 +Revises: 2adcbe1f5dfb +Create Date: 2025-06-03 12:29:08.619582 + +""" +from alembic import op +import models as models +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '83f1262d3f22' +down_revision = '2adcbe1f5dfb' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('messages', schema=None) as batch_op: + batch_op.add_column(sa.Column('outputs', sa.Text(), server_default=sa.text("'{}'::text"), nullable=True)) + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('messages', schema=None) as batch_op: + batch_op.drop_column('outputs') + + # ### end Alembic commands ### diff --git a/api/models/model.py b/api/models/model.py index 229e77134e..26c6aa2a47 100644 --- a/api/models/model.py +++ b/api/models/model.py @@ -898,6 +898,7 @@ class Message(Base): message_unit_price = db.Column(db.Numeric(10, 4), nullable=False) message_price_unit = db.Column(db.Numeric(10, 7), nullable=False, server_default=db.text("0.001")) answer: Mapped[str] = db.Column(db.Text, nullable=False) + outputs: Mapped[Optional[str]] = mapped_column("outputs", db.Text, nullable=True, server_default=db.text("'{}'::text")) answer_tokens = db.Column(db.Integer, nullable=False, server_default=db.text("0")) answer_unit_price = db.Column(db.Numeric(10, 4), nullable=False) answer_price_unit = db.Column(db.Numeric(10, 7), nullable=False, server_default=db.text("0.001")) @@ -1086,6 +1087,14 @@ class Message(Base): def message_metadata_dict(self) -> dict: return json.loads(self.message_metadata) if self.message_metadata else {} + @property + def outputs_dict(self) -> dict: + return json.loads(self.outputs) if self.outputs else {} + + @outputs_dict.setter + def outputs_dict(self, value: Mapping[str, Any]): + self.outputs = json.dumps(value, ensure_ascii=False) if value else "{}" + @property def agent_thoughts(self): return ( @@ -1180,6 +1189,7 @@ class Message(Base): "model_id": self.model_id, "inputs": self.inputs, "query": self.query, + "outputs": self.outputs_dict, "total_price": self.total_price, "message": self.message, "answer": self.answer, diff --git a/web/app/components/workflow/nodes/answer/node.tsx b/web/app/components/workflow/nodes/answer/node.tsx index bb28066d95..57e3fab7bd 100644 --- a/web/app/components/workflow/nodes/answer/node.tsx +++ b/web/app/components/workflow/nodes/answer/node.tsx @@ -4,12 +4,44 @@ import { useTranslation } from 'react-i18next' import InfoPanel from '../_base/components/info-panel' import ReadonlyInputWithSelectVar from '../_base/components/readonly-input-with-select-var' import type { AnswerNodeType } from './types' -import type { NodeProps } from '@/app/components/workflow/types' +import { + useIsChatMode, + useWorkflow, + useWorkflowVariables, +} from '@/app/components/workflow/hooks' +import { BlockEnum } from '@/app/components/workflow/types' +import type { NodeProps, Variable } from '@/app/components/workflow/types' +import { isConversationVar, isENV, isSystemVar } from '@/app/components/workflow/nodes/_base/components/variable/utils' +import { VarBlockIcon } from '@/app/components/workflow/block-icon' +import { Variable02 } from '@/app/components/base/icons/src/vender/solid/development' +import { BubbleX, Env } from '@/app/components/base/icons/src/vender/line/others' +import { Line3 } from '@/app/components/base/icons/src/public/common' +import cn from 'classnames' + const Node: FC> = ({ id, data, }) => { const { t } = useTranslation() + const { getBeforeNodesInSameBranch } = useWorkflow() + const availableNodes = getBeforeNodesInSameBranch(id) + const { getCurrentVariableType } = useWorkflowVariables() + const isChatMode = useIsChatMode() + + const startNode = availableNodes.find((node: any) => { + return node.data.type === BlockEnum.Start + }) + + const getNode = (id: string) => { + return availableNodes.find(node => node.id === id) || startNode + } + + const { outputs = [] } = data + const filteredOutputs = (outputs as Variable[]).filter(({ value_selector }) => value_selector.length > 0) + console.log('filteredOutputs', filteredOutputs) + + if (!filteredOutputs.length) + return null return (
@@ -19,7 +51,52 @@ const Node: FC> = ({ nodeId={id} /> } /> +
+ {filteredOutputs.map(({ value_selector }, index) => { + const node = getNode(value_selector[0]) + const isSystem = isSystemVar(value_selector) + const isEnv = isENV(value_selector) + const isChatVar = isConversationVar(value_selector) + const varName = isSystem ? `sys.${value_selector[value_selector.length - 1]}` : value_selector[value_selector.length - 1] + const varType = getCurrentVariableType({ + valueSelector: value_selector, + availableNodes, + isChatMode, + }) + return ( +
+
+ {!isEnv && ( + <> +
+ +
+
{node?.data.title}
+ + + )} +
+ {!isEnv && !isChatVar && } + {isEnv && } + {isChatVar && } + +
{varName}
+
+
+
+
{varType}
+
+
+ ) + })} + +
+
+ ) } diff --git a/web/app/components/workflow/nodes/answer/panel.tsx b/web/app/components/workflow/nodes/answer/panel.tsx index 2a4b70ee32..0cec7f63f8 100644 --- a/web/app/components/workflow/nodes/answer/panel.tsx +++ b/web/app/components/workflow/nodes/answer/panel.tsx @@ -19,8 +19,12 @@ const Panel: FC> = ({ inputs, handleAnswerChange, filterVar, + handleVarListChange, + handleAddVariable, } = useConfig(id, data) + const outputs = inputs.outputs + const { availableVars, availableNodesWithParent } = useAvailableVarList(id, { onlyLeafNodeVar: false, hideChatVar: false, @@ -29,17 +33,34 @@ const Panel: FC> = ({ }) return ( -
- +
+
+ +
+ {/*
+ : undefined + } + > + + +
*/}
) } diff --git a/web/app/components/workflow/nodes/answer/types.ts b/web/app/components/workflow/nodes/answer/types.ts index 8f1b3cb6cd..721bb0aacb 100644 --- a/web/app/components/workflow/nodes/answer/types.ts +++ b/web/app/components/workflow/nodes/answer/types.ts @@ -1,6 +1,7 @@ import type { CommonNodeType, Variable } from '@/app/components/workflow/types' export type AnswerNodeType = CommonNodeType & { + outputs: Variable[] variables: Variable[] answer: string } From df3bf8e52e12a6ae5ecb81d034bb0ed983b41150 Mon Sep 17 00:00:00 2001 From: GuanMu Date: Wed, 4 Jun 2025 02:58:42 +0000 Subject: [PATCH 02/33] style: format outputs field definition for better readability in Message model --- api/models/model.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/api/models/model.py b/api/models/model.py index 26c6aa2a47..eccb49f507 100644 --- a/api/models/model.py +++ b/api/models/model.py @@ -898,7 +898,9 @@ class Message(Base): message_unit_price = db.Column(db.Numeric(10, 4), nullable=False) message_price_unit = db.Column(db.Numeric(10, 7), nullable=False, server_default=db.text("0.001")) answer: Mapped[str] = db.Column(db.Text, nullable=False) - outputs: Mapped[Optional[str]] = mapped_column("outputs", db.Text, nullable=True, server_default=db.text("'{}'::text")) + outputs: Mapped[Optional[str]] = mapped_column( + "outputs", db.Text, nullable=True, server_default=db.text("'{}'::text") + ) answer_tokens = db.Column(db.Integer, nullable=False, server_default=db.text("0")) answer_unit_price = db.Column(db.Numeric(10, 4), nullable=False) answer_price_unit = db.Column(db.Numeric(10, 7), nullable=False, server_default=db.text("0.001")) From cf4fd0ecb494ecb5dc7ec128143bd516aa9be9d7 Mon Sep 17 00:00:00 2001 From: GuanMu Date: Thu, 5 Jun 2025 01:34:11 +0000 Subject: [PATCH 03/33] feat: enhance answer node functionality and output handling - Updated `AnswerNode` to include dynamic output variable handling. - Modified `AnswerStreamProcessor` to manage output states and ensure proper event processing. - Added `outputs` field to relevant components and TypeScript types for better integration. - Improved logging for debugging purposes in task pipeline and answer stream processing. --- .../apps/advanced_chat/generate_task_pipeline.py | 6 ++---- api/core/app/entities/task_entities.py | 2 +- api/core/workflow/nodes/answer/answer_node.py | 13 ++++++++++++- .../nodes/answer/answer_stream_generate_router.py | 14 +++++++++++++- .../nodes/answer/answer_stream_processor.py | 14 +++++++++++++- api/core/workflow/nodes/answer/entities.py | 3 ++- .../components/workflow/nodes/answer/default.ts | 1 + web/app/components/workflow/nodes/answer/panel.tsx | 7 +++++-- .../components/workflow/nodes/answer/use-config.ts | 1 + 9 files changed, 50 insertions(+), 11 deletions(-) diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index e17ed0ebaa..1d2b843043 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -165,7 +165,7 @@ class AdvancedChatAppGenerateTaskPipeline: ) generator = self._wrapper_process_stream_response(trace_manager=self._application_generate_entity.trace_manager) - print(f"generator: {generator}") + print(f"generator: {generator}=======") if self._base_task_pipeline._stream: return self._to_stream_response(generator) else: @@ -183,7 +183,6 @@ class AdvancedChatAppGenerateTaskPipeline: extras = {} if stream_response.metadata: extras["metadata"] = stream_response.metadata - print(f"stream_response: {stream_response}") return ChatbotAppBlockingResponse( task_id=stream_response.task_id, data=ChatbotAppBlockingResponse.Data( @@ -210,7 +209,6 @@ class AdvancedChatAppGenerateTaskPipeline: :return: """ for stream_response in generator: - print(f"stream_response: {stream_response}") yield ChatbotAppStreamResponse( conversation_id=self._conversation_id, message_id=self._message_id, @@ -361,7 +359,6 @@ class AdvancedChatAppGenerateTaskPipeline: self._recorded_files.extend( self._workflow_response_converter.fetch_files_from_node_outputs(event.outputs or {}) ) - with Session(db.engine, expire_on_commit=False) as session: workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_success( event=event @@ -494,6 +491,7 @@ class AdvancedChatAppGenerateTaskPipeline: if not graph_runtime_state: raise ValueError("workflow run not initialized.") + print(f"outputsfff: {event.outputs}") with Session(db.engine, expire_on_commit=False) as session: workflow_execution = self._workflow_cycle_manager.handle_workflow_run_success( workflow_run_id=self._workflow_run_id, diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index 25c889e922..ac464f8d1f 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -49,7 +49,7 @@ class WorkflowTaskState(TaskState): """ answer: str = "" - + outputs: Optional[Mapping[str, Any]] = None class StreamEvent(Enum): """ diff --git a/api/core/workflow/nodes/answer/answer_node.py b/api/core/workflow/nodes/answer/answer_node.py index aa030870e2..9b1ccde2a5 100644 --- a/api/core/workflow/nodes/answer/answer_node.py +++ b/api/core/workflow/nodes/answer/answer_node.py @@ -45,7 +45,18 @@ class AnswerNode(BaseNode[AnswerNodeData]): part = cast(TextGenerateRouteChunk, part) answer += part.text - return NodeRunResult(status=WorkflowNodeExecutionStatus.SUCCEEDED, outputs={"answer": answer, "files": files}) + output_variables = self.node_data.outputs + + outputs = {} + for variable_selector in output_variables: + variable = self.graph_runtime_state.variable_pool.get(variable_selector.value_selector) + value = variable.to_object() if variable is not None else None + outputs[variable_selector.variable] = value + + return NodeRunResult( + status=WorkflowNodeExecutionStatus.SUCCEEDED, + outputs={"answer": answer, "files": files, "outputs": outputs}, + ) @classmethod def _extract_variable_selector_to_variable_mapping( diff --git a/api/core/workflow/nodes/answer/answer_stream_generate_router.py b/api/core/workflow/nodes/answer/answer_stream_generate_router.py index 1d9c3e9b96..628476677c 100644 --- a/api/core/workflow/nodes/answer/answer_stream_generate_router.py +++ b/api/core/workflow/nodes/answer/answer_stream_generate_router.py @@ -23,6 +23,7 @@ class AnswerStreamGeneratorRouter: """ # parse stream output node value selectors of answer nodes answer_generate_route: dict[str, list[GenerateRouteChunk]] = {} + answer_stream_variable_selectors_mapping: dict[str, list[list[str]]] = {} for answer_node_id, node_config in node_id_config_mapping.items(): if node_config.get("data", {}).get("type") != NodeType.ANSWER.value: continue @@ -30,6 +31,7 @@ class AnswerStreamGeneratorRouter: # get generate route for stream output generate_route = cls._extract_generate_route_selectors(node_config) answer_generate_route[answer_node_id] = generate_route + answer_stream_variable_selectors_mapping[answer_node_id] = generate_route # fetch answer dependencies answer_node_ids = list(answer_generate_route.keys()) @@ -38,9 +40,18 @@ class AnswerStreamGeneratorRouter: reverse_edge_mapping=reverse_edge_mapping, node_id_config_mapping=node_id_config_mapping, ) + answer_end_ids = list(answer_stream_variable_selectors_mapping.keys()) + answer_end_dependencies = cls._fetch_answers_dependencies( + answer_node_ids=answer_end_ids, + reverse_edge_mapping=reverse_edge_mapping, + node_id_config_mapping=node_id_config_mapping, + ) return AnswerStreamGenerateRoute( - answer_generate_route=answer_generate_route, answer_dependencies=answer_dependencies + answer_generate_route=answer_generate_route, + answer_dependencies=answer_dependencies, + answer_stream_variable_selectors_mapping=answer_stream_variable_selectors_mapping, + answer_end_dependencies=answer_end_dependencies, ) @classmethod @@ -89,6 +100,7 @@ class AnswerStreamGeneratorRouter: :param config: node config :return: """ + print(f"config: {config}=======") node_data = AnswerNodeData(**config.get("data", {})) return cls.extract_generate_route_from_node_data(node_data) diff --git a/api/core/workflow/nodes/answer/answer_stream_processor.py b/api/core/workflow/nodes/answer/answer_stream_processor.py index ba6ba16e36..2e48b4c5b2 100644 --- a/api/core/workflow/nodes/answer/answer_stream_processor.py +++ b/api/core/workflow/nodes/answer/answer_stream_processor.py @@ -26,9 +26,12 @@ class AnswerStreamProcessor(StreamProcessor): for answer_node_id in self.generate_routes.answer_generate_route: self.route_position[answer_node_id] = 0 self.current_stream_chunk_generating_node_ids: dict[str, list[str]] = {} - + self.has_output = False + self.output_node_ids: set[str] = set() + def process(self, generator: Generator[GraphEngineEvent, None, None]) -> Generator[GraphEngineEvent, None, None]: for event in generator: + if isinstance(event, NodeRunStartedEvent): if event.route_node_state.node_id == self.graph.root_node_id and not self.rest_node_ids: self.reset() @@ -36,6 +39,11 @@ class AnswerStreamProcessor(StreamProcessor): yield event elif isinstance(event, NodeRunStreamChunkEvent): if event.in_iteration_id or event.in_loop_id: + if self.has_output and event.node_id not in self.output_node_ids: + print(f"event22: {event}") + event.chunk_content = "\n" + event.chunk_content + self.output_node_ids.add(event.node_id) + self.has_output = True yield event continue @@ -50,6 +58,10 @@ class AnswerStreamProcessor(StreamProcessor): ) for _ in stream_out_answer_node_ids: + if self.has_output and event.node_id not in self.output_node_ids: + event.chunk_content = "\n" + event.chunk_content + self.output_node_ids.add(event.node_id) + self.has_output = True yield event elif isinstance(event, NodeRunSucceededEvent | NodeRunExceptionEvent): yield event diff --git a/api/core/workflow/nodes/answer/entities.py b/api/core/workflow/nodes/answer/entities.py index a05cc44c99..e38a60111d 100644 --- a/api/core/workflow/nodes/answer/entities.py +++ b/api/core/workflow/nodes/answer/entities.py @@ -4,7 +4,7 @@ from enum import Enum from pydantic import BaseModel, Field from core.workflow.nodes.base import BaseNodeData - +from core.workflow.entities.variable_entities import VariableSelector class AnswerNodeData(BaseNodeData): """ @@ -12,6 +12,7 @@ class AnswerNodeData(BaseNodeData): """ answer: str = Field(..., description="answer template string") + outputs: list[VariableSelector] class GenerateRouteChunk(BaseModel): diff --git a/web/app/components/workflow/nodes/answer/default.ts b/web/app/components/workflow/nodes/answer/default.ts index 4ff6e49d7e..9538330b80 100644 --- a/web/app/components/workflow/nodes/answer/default.ts +++ b/web/app/components/workflow/nodes/answer/default.ts @@ -7,6 +7,7 @@ const nodeDefault: NodeDefault = { defaultValue: { variables: [], answer: '', + outputs: [], }, getAvailablePrevNodes(isChatMode: boolean) { const nodes = isChatMode diff --git a/web/app/components/workflow/nodes/answer/panel.tsx b/web/app/components/workflow/nodes/answer/panel.tsx index 0cec7f63f8..f76931d745 100644 --- a/web/app/components/workflow/nodes/answer/panel.tsx +++ b/web/app/components/workflow/nodes/answer/panel.tsx @@ -7,6 +7,9 @@ import Editor from '@/app/components/workflow/nodes/_base/components/prompt/edit import type { NodePanelProps } from '@/app/components/workflow/types' import useAvailableVarList from '@/app/components/workflow/nodes/_base/hooks/use-available-var-list' const i18nPrefix = 'workflow.nodes.answer' +import Field from '@/app/components/workflow/nodes/_base/components/field' +import AddButton from '@/app/components/base/button/add-button' +import VarList from '@/app/components/workflow/nodes/_base/components/variable/var-list' const Panel: FC> = ({ id, @@ -46,7 +49,7 @@ const Panel: FC> = ({ isSupportFileVar />
- {/*
+
> = ({ onChange={handleVarListChange} /> -
*/} +
) } diff --git a/web/app/components/workflow/nodes/answer/use-config.ts b/web/app/components/workflow/nodes/answer/use-config.ts index 269d953743..83544a4cfa 100644 --- a/web/app/components/workflow/nodes/answer/use-config.ts +++ b/web/app/components/workflow/nodes/answer/use-config.ts @@ -16,6 +16,7 @@ const useConfig = (id: string, payload: AnswerNodeType) => { const { handleVarListChange, handleAddVariable } = useVarList({ inputs, setInputs, + varKey: 'outputs', }) const handleAnswerChange = useCallback((value: string) => { From fb420781abe1b4f243ef18dbc0a5f374165f9bb2 Mon Sep 17 00:00:00 2001 From: GuanMu Date: Thu, 5 Jun 2025 01:34:30 +0000 Subject: [PATCH 04/33] fix: reorder import statements in AnswerNodeData for consistency and clarity --- api/core/workflow/nodes/answer/entities.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/api/core/workflow/nodes/answer/entities.py b/api/core/workflow/nodes/answer/entities.py index e38a60111d..871a965e5d 100644 --- a/api/core/workflow/nodes/answer/entities.py +++ b/api/core/workflow/nodes/answer/entities.py @@ -3,8 +3,9 @@ from enum import Enum from pydantic import BaseModel, Field -from core.workflow.nodes.base import BaseNodeData from core.workflow.entities.variable_entities import VariableSelector +from core.workflow.nodes.base import BaseNodeData + class AnswerNodeData(BaseNodeData): """ From e36b6e2f261e96521050ab4638c44b578518398f Mon Sep 17 00:00:00 2001 From: GuanMu Date: Tue, 10 Jun 2025 13:16:32 +0000 Subject: [PATCH 05/33] feat: enhance task state management and output handling in advanced chat pipeline --- api/core/app/apps/advanced_chat/generate_task_pipeline.py | 7 +++++-- api/core/app/entities/task_entities.py | 1 + api/core/workflow/graph_engine/graph_engine.py | 1 + .../components/workflow/nodes/_base/hooks/use-var-list.ts | 2 ++ web/app/components/workflow/nodes/answer/node.tsx | 1 - web/app/components/workflow/nodes/answer/panel.tsx | 2 +- 6 files changed, 10 insertions(+), 4 deletions(-) diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index 1d2b843043..c470bf3655 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -1,3 +1,4 @@ +import json import logging import time from collections.abc import Generator, Mapping @@ -506,7 +507,8 @@ class AdvancedChatAppGenerateTaskPipeline: task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, ) - print(f"workflow_finish_resp: {workflow_finish_resp}") + self._task_state.metadata.data = workflow_finish_resp.data.outputs.get('outputs', {}).get('outputs') + print(f"self._task_state.metadata.data: {self._task_state.metadata.data}") yield workflow_finish_resp self._base_task_pipeline._queue_manager.publish( QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE @@ -630,6 +632,7 @@ class AdvancedChatAppGenerateTaskPipeline: tts_publisher.publish(queue_message) self._task_state.answer += delta_text + print(f"self._task_state.answer: {self._task_state.answer}") yield self._message_cycle_manager.message_to_stream_response( answer=delta_text, message_id=self._message_id, from_variable_selector=event.from_variable_selector ) @@ -651,7 +654,7 @@ class AdvancedChatAppGenerateTaskPipeline: answer=output_moderation_answer, reason=QueueMessageReplaceEvent.MessageReplaceReason.OUTPUT_MODERATION, ) - + print(f"graph_runtime_state: {graph_runtime_state}") # Save message with Session(db.engine, expire_on_commit=False) as session: self._save_message(session=session, graph_runtime_state=graph_runtime_state) diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index ac464f8d1f..7e628fb62d 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -24,6 +24,7 @@ class AnnotationReply(BaseModel): class TaskStateMetadata(BaseModel): annotation_reply: AnnotationReply | None = None retriever_resources: Sequence[RetrievalSourceMetadata] = Field(default_factory=list) + data: Optional[Mapping[str, Any]] = None usage: LLMUsage | None = None diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py index 363b2ee920..9870df749d 100644 --- a/api/core/workflow/graph_engine/graph_engine.py +++ b/api/core/workflow/graph_engine/graph_engine.py @@ -194,6 +194,7 @@ class GraphEngine: self.graph_runtime_state.outputs["answer"] = self.graph_runtime_state.outputs[ "answer" ].strip() + self.graph_runtime_state.outputs["outputs"] = item.route_node_state.node_run_result.outputs except Exception as e: logger.exception("Graph run failed") yield GraphRunFailedEvent(error=str(e), exceptions_count=len(handle_exceptions)) diff --git a/web/app/components/workflow/nodes/_base/hooks/use-var-list.ts b/web/app/components/workflow/nodes/_base/hooks/use-var-list.ts index 63d284f260..fe8360465e 100644 --- a/web/app/components/workflow/nodes/_base/hooks/use-var-list.ts +++ b/web/app/components/workflow/nodes/_base/hooks/use-var-list.ts @@ -21,6 +21,8 @@ function useVarList({ const handleAddVariable = useCallback(() => { const newInputs = produce(inputs, (draft: any) => { + if (!draft[varKey]) + draft[varKey] = [] draft[varKey].push({ variable: '', value_selector: [], diff --git a/web/app/components/workflow/nodes/answer/node.tsx b/web/app/components/workflow/nodes/answer/node.tsx index 57e3fab7bd..3d36de4565 100644 --- a/web/app/components/workflow/nodes/answer/node.tsx +++ b/web/app/components/workflow/nodes/answer/node.tsx @@ -38,7 +38,6 @@ const Node: FC> = ({ const { outputs = [] } = data const filteredOutputs = (outputs as Variable[]).filter(({ value_selector }) => value_selector.length > 0) - console.log('filteredOutputs', filteredOutputs) if (!filteredOutputs.length) return null diff --git a/web/app/components/workflow/nodes/answer/panel.tsx b/web/app/components/workflow/nodes/answer/panel.tsx index f76931d745..8bfc2c8191 100644 --- a/web/app/components/workflow/nodes/answer/panel.tsx +++ b/web/app/components/workflow/nodes/answer/panel.tsx @@ -26,7 +26,7 @@ const Panel: FC> = ({ handleAddVariable, } = useConfig(id, data) - const outputs = inputs.outputs + const outputs = inputs?.outputs || [] const { availableVars, availableNodesWithParent } = useAvailableVarList(id, { onlyLeafNodeVar: false, From b20aa20ac570f1b59bade61ebd1d7fde2db3423a Mon Sep 17 00:00:00 2001 From: GuanMu Date: Tue, 10 Jun 2025 13:17:23 +0000 Subject: [PATCH 06/33] fix: remove debug print statements in generate_task_pipeline --- api/core/app/apps/advanced_chat/generate_task_pipeline.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index c470bf3655..f667e09a2a 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -1,4 +1,3 @@ -import json import logging import time from collections.abc import Generator, Mapping @@ -492,7 +491,6 @@ class AdvancedChatAppGenerateTaskPipeline: if not graph_runtime_state: raise ValueError("workflow run not initialized.") - print(f"outputsfff: {event.outputs}") with Session(db.engine, expire_on_commit=False) as session: workflow_execution = self._workflow_cycle_manager.handle_workflow_run_success( workflow_run_id=self._workflow_run_id, @@ -508,7 +506,6 @@ class AdvancedChatAppGenerateTaskPipeline: workflow_execution=workflow_execution, ) self._task_state.metadata.data = workflow_finish_resp.data.outputs.get('outputs', {}).get('outputs') - print(f"self._task_state.metadata.data: {self._task_state.metadata.data}") yield workflow_finish_resp self._base_task_pipeline._queue_manager.publish( QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE From 4f9879d2ddefa8a5af5f181971c82c9feffa2b53 Mon Sep 17 00:00:00 2001 From: GuanMu Date: Wed, 11 Jun 2025 02:56:56 +0000 Subject: [PATCH 07/33] fix: remove debug print statements in generate_task_pipeline --- api/core/app/apps/advanced_chat/generate_task_pipeline.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index f667e09a2a..367d12c43b 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -629,7 +629,6 @@ class AdvancedChatAppGenerateTaskPipeline: tts_publisher.publish(queue_message) self._task_state.answer += delta_text - print(f"self._task_state.answer: {self._task_state.answer}") yield self._message_cycle_manager.message_to_stream_response( answer=delta_text, message_id=self._message_id, from_variable_selector=event.from_variable_selector ) @@ -651,7 +650,6 @@ class AdvancedChatAppGenerateTaskPipeline: answer=output_moderation_answer, reason=QueueMessageReplaceEvent.MessageReplaceReason.OUTPUT_MODERATION, ) - print(f"graph_runtime_state: {graph_runtime_state}") # Save message with Session(db.engine, expire_on_commit=False) as session: self._save_message(session=session, graph_runtime_state=graph_runtime_state) From fca91d1024a1c00e6cc4b6b1f30d8db4fa744469 Mon Sep 17 00:00:00 2001 From: GuanMu Date: Wed, 11 Jun 2025 12:43:06 +0000 Subject: [PATCH 08/33] fix: update output handling in task state metadata and remove debug print statements --- api/core/app/apps/advanced_chat/generate_task_pipeline.py | 6 +++--- api/core/app/entities/task_entities.py | 2 +- .../workflow/nodes/answer/answer_stream_generate_router.py | 1 - 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index 367d12c43b..a298f451a1 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -165,7 +165,6 @@ class AdvancedChatAppGenerateTaskPipeline: ) generator = self._wrapper_process_stream_response(trace_manager=self._application_generate_entity.trace_manager) - print(f"generator: {generator}=======") if self._base_task_pipeline._stream: return self._to_stream_response(generator) else: @@ -183,12 +182,13 @@ class AdvancedChatAppGenerateTaskPipeline: extras = {} if stream_response.metadata: extras["metadata"] = stream_response.metadata + final_outputs = self._task_state.metadata.outputs if self._task_state.metadata and hasattr(self._task_state.metadata, 'outputs') else {} return ChatbotAppBlockingResponse( task_id=stream_response.task_id, data=ChatbotAppBlockingResponse.Data( id=self._message_id, mode=self._conversation_mode, - outputs=stream_response.data.outputs, + outputs=final_outputs, conversation_id=self._conversation_id, message_id=self._message_id, answer=self._task_state.answer, @@ -505,7 +505,7 @@ class AdvancedChatAppGenerateTaskPipeline: task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, ) - self._task_state.metadata.data = workflow_finish_resp.data.outputs.get('outputs', {}).get('outputs') + self._task_state.metadata.outputs = workflow_finish_resp.data.outputs.get('outputs', {}).get('outputs') yield workflow_finish_resp self._base_task_pipeline._queue_manager.publish( QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index 7e628fb62d..cca8ce7737 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -24,7 +24,7 @@ class AnnotationReply(BaseModel): class TaskStateMetadata(BaseModel): annotation_reply: AnnotationReply | None = None retriever_resources: Sequence[RetrievalSourceMetadata] = Field(default_factory=list) - data: Optional[Mapping[str, Any]] = None + outputs: Optional[Mapping[str, Any]] = None usage: LLMUsage | None = None diff --git a/api/core/workflow/nodes/answer/answer_stream_generate_router.py b/api/core/workflow/nodes/answer/answer_stream_generate_router.py index 628476677c..ce6095a34f 100644 --- a/api/core/workflow/nodes/answer/answer_stream_generate_router.py +++ b/api/core/workflow/nodes/answer/answer_stream_generate_router.py @@ -100,7 +100,6 @@ class AnswerStreamGeneratorRouter: :param config: node config :return: """ - print(f"config: {config}=======") node_data = AnswerNodeData(**config.get("data", {})) return cls.extract_generate_route_from_node_data(node_data) From 44d13d60ddac2334bed3bae78a1d4fb27c22840b Mon Sep 17 00:00:00 2001 From: GuanMu Date: Wed, 11 Jun 2025 12:43:11 +0000 Subject: [PATCH 09/33] fix: refine output retrieval from task state metadata in advanced chat pipeline --- .../app/apps/advanced_chat/generate_task_pipeline.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index a298f451a1..d94a14ed04 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -182,7 +182,12 @@ class AdvancedChatAppGenerateTaskPipeline: extras = {} if stream_response.metadata: extras["metadata"] = stream_response.metadata - final_outputs = self._task_state.metadata.outputs if self._task_state.metadata and hasattr(self._task_state.metadata, 'outputs') else {} + + # Retrieve outputs from task state metadata, which is populated earlier + final_outputs = {} + if self._task_state.metadata and hasattr(self._task_state.metadata, 'outputs'): + final_outputs = self._task_state.metadata.outputs + return ChatbotAppBlockingResponse( task_id=stream_response.task_id, data=ChatbotAppBlockingResponse.Data( @@ -505,7 +510,8 @@ class AdvancedChatAppGenerateTaskPipeline: task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, ) - self._task_state.metadata.outputs = workflow_finish_resp.data.outputs.get('outputs', {}).get('outputs') + workflow_outputs_data = workflow_finish_resp.data.outputs.get('outputs', {}) + self._task_state.metadata.outputs = workflow_outputs_data.get('outputs') yield workflow_finish_resp self._base_task_pipeline._queue_manager.publish( QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE From 4c7e9d0dd490d9830bf4d9d70481d318c14bde1a Mon Sep 17 00:00:00 2001 From: GuanMu Date: Wed, 11 Jun 2025 12:48:27 +0000 Subject: [PATCH 10/33] fix: remove print --- api/core/workflow/nodes/answer/answer_stream_processor.py | 1 - 1 file changed, 1 deletion(-) diff --git a/api/core/workflow/nodes/answer/answer_stream_processor.py b/api/core/workflow/nodes/answer/answer_stream_processor.py index 2e48b4c5b2..e14e17d318 100644 --- a/api/core/workflow/nodes/answer/answer_stream_processor.py +++ b/api/core/workflow/nodes/answer/answer_stream_processor.py @@ -40,7 +40,6 @@ class AnswerStreamProcessor(StreamProcessor): elif isinstance(event, NodeRunStreamChunkEvent): if event.in_iteration_id or event.in_loop_id: if self.has_output and event.node_id not in self.output_node_ids: - print(f"event22: {event}") event.chunk_content = "\n" + event.chunk_content self.output_node_ids.add(event.node_id) self.has_output = True From 1d19f7f88a98127046e8b457392f4433faaa53e2 Mon Sep 17 00:00:00 2001 From: GuanMu Date: Wed, 11 Jun 2025 12:58:17 +0000 Subject: [PATCH 11/33] remove reversion --- ...de22b8f3_merge_multiple_heads_for_database_.py} | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) rename api/migrations/versions/{2025_06_03_1229-83f1262d3f22_update_message_table_outputs_column_to_.py => 2025_06_11_1254-75f1de22b8f3_merge_multiple_heads_for_database_.py} (75%) diff --git a/api/migrations/versions/2025_06_03_1229-83f1262d3f22_update_message_table_outputs_column_to_.py b/api/migrations/versions/2025_06_11_1254-75f1de22b8f3_merge_multiple_heads_for_database_.py similarity index 75% rename from api/migrations/versions/2025_06_03_1229-83f1262d3f22_update_message_table_outputs_column_to_.py rename to api/migrations/versions/2025_06_11_1254-75f1de22b8f3_merge_multiple_heads_for_database_.py index aa4d1126bd..69532ed109 100644 --- a/api/migrations/versions/2025_06_03_1229-83f1262d3f22_update_message_table_outputs_column_to_.py +++ b/api/migrations/versions/2025_06_11_1254-75f1de22b8f3_merge_multiple_heads_for_database_.py @@ -1,8 +1,8 @@ -"""Update Message table outputs column to text and add outputs_dict property +"""Merge multiple heads for database migration -Revision ID: 83f1262d3f22 -Revises: 2adcbe1f5dfb -Create Date: 2025-06-03 12:29:08.619582 +Revision ID: 75f1de22b8f3 +Revises: 83f1262d3f22, 4474872b0ee6 +Create Date: 2025-06-11 12:54:20.128554 """ from alembic import op @@ -11,8 +11,8 @@ import sqlalchemy as sa # revision identifiers, used by Alembic. -revision = '83f1262d3f22' -down_revision = '2adcbe1f5dfb' +revision = '75f1de22b8f3' +down_revision = '4474872b0ee6' branch_labels = None depends_on = None @@ -21,7 +21,6 @@ def upgrade(): # ### commands auto generated by Alembic - please adjust! ### with op.batch_alter_table('messages', schema=None) as batch_op: batch_op.add_column(sa.Column('outputs', sa.Text(), server_default=sa.text("'{}'::text"), nullable=True)) - # ### end Alembic commands ### @@ -29,5 +28,4 @@ def downgrade(): # ### commands auto generated by Alembic - please adjust! ### with op.batch_alter_table('messages', schema=None) as batch_op: batch_op.drop_column('outputs') - # ### end Alembic commands ### From 81ff539253d116ed6df962f6c8591f812893e1bf Mon Sep 17 00:00:00 2001 From: GuanMu Date: Wed, 11 Jun 2025 13:04:35 +0000 Subject: [PATCH 12/33] update style --- .../advanced_chat/generate_task_pipeline.py | 50 ++++++++++++------- 1 file changed, 33 insertions(+), 17 deletions(-) diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index d94a14ed04..09220de6e6 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -164,7 +164,8 @@ class AdvancedChatAppGenerateTaskPipeline: conversation_id=self._conversation_id, query=self._application_generate_entity.query ) - generator = self._wrapper_process_stream_response(trace_manager=self._application_generate_entity.trace_manager) + generator = self._wrapper_process_stream_response( + trace_manager=self._application_generate_entity.trace_manager) if self._base_task_pipeline._stream: return self._to_stream_response(generator) else: @@ -182,7 +183,7 @@ class AdvancedChatAppGenerateTaskPipeline: extras = {} if stream_response.metadata: extras["metadata"] = stream_response.metadata - + # Retrieve outputs from task state metadata, which is populated earlier final_outputs = {} if self._task_state.metadata and hasattr(self._task_state.metadata, 'outputs'): @@ -243,12 +244,14 @@ class AdvancedChatAppGenerateTaskPipeline: and features_dict["text_to_speech"].get("autoPlay") == "enabled" ): tts_publisher = AppGeneratorTTSPublisher( - tenant_id, features_dict["text_to_speech"].get("voice"), features_dict["text_to_speech"].get("language") + tenant_id, features_dict["text_to_speech"].get( + "voice"), features_dict["text_to_speech"].get("language") ) for response in self._process_stream_response(tts_publisher=tts_publisher, trace_manager=trace_manager): while True: - audio_response = self._listen_audio_msg(publisher=tts_publisher, task_id=task_id) + audio_response = self._listen_audio_msg( + publisher=tts_publisher, task_id=task_id) if audio_response: yield audio_response else: @@ -273,7 +276,8 @@ class AdvancedChatAppGenerateTaskPipeline: start_listener_time = time.time() yield MessageAudioStreamResponse(audio=audio_trunk.audio, task_id=task_id) except Exception: - logger.exception(f"Failed to listen audio message, task_id: {task_id}") + logger.exception( + f"Failed to listen audio message, task_id: {task_id}") break if tts_publisher: yield MessageAudioEndStreamResponse(audio="", task_id=task_id) @@ -313,7 +317,8 @@ class AdvancedChatAppGenerateTaskPipeline: self._workflow_run_id = workflow_execution.id_ message = self._get_message(session=session) if not message: - raise ValueError(f"Message not found: {self._message_id}") + raise ValueError( + f"Message not found: {self._message_id}") message.workflow_run_id = workflow_execution.id_ workflow_start_resp = self._workflow_response_converter.workflow_start_to_stream_response( task_id=self._application_generate_entity.task_id, @@ -362,7 +367,8 @@ class AdvancedChatAppGenerateTaskPipeline: # Record files if it's an answer node or end node if event.node_type in [NodeType.ANSWER, NodeType.END]: self._recorded_files.extend( - self._workflow_response_converter.fetch_files_from_node_outputs(event.outputs or {}) + self._workflow_response_converter.fetch_files_from_node_outputs( + event.outputs or {}) ) with Session(db.engine, expire_on_commit=False) as session: workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_success( @@ -510,8 +516,10 @@ class AdvancedChatAppGenerateTaskPipeline: task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, ) - workflow_outputs_data = workflow_finish_resp.data.outputs.get('outputs', {}) - self._task_state.metadata.outputs = workflow_outputs_data.get('outputs') + workflow_outputs_data = workflow_finish_resp.data.outputs.get( + 'outputs', {}) + self._task_state.metadata.outputs = workflow_outputs_data.get( + 'outputs') yield workflow_finish_resp self._base_task_pipeline._queue_manager.publish( QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE @@ -564,7 +572,8 @@ class AdvancedChatAppGenerateTaskPipeline: task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, ) - err_event = QueueErrorEvent(error=ValueError(f"Run failed: {workflow_execution.error_message}")) + err_event = QueueErrorEvent(error=ValueError( + f"Run failed: {workflow_execution.error_message}")) err = self._base_task_pipeline._handle_error( event=err_event, session=session, message_id=self._message_id ) @@ -590,7 +599,8 @@ class AdvancedChatAppGenerateTaskPipeline: workflow_execution=workflow_execution, ) # Save message - self._save_message(session=session, graph_runtime_state=graph_runtime_state) + self._save_message( + session=session, graph_runtime_state=graph_runtime_state) session.commit() yield workflow_finish_resp @@ -626,7 +636,8 @@ class AdvancedChatAppGenerateTaskPipeline: continue # handle output moderation chunk - should_direct_answer = self._handle_output_moderation_chunk(delta_text) + should_direct_answer = self._handle_output_moderation_chunk( + delta_text) if should_direct_answer: continue @@ -658,7 +669,8 @@ class AdvancedChatAppGenerateTaskPipeline: ) # Save message with Session(db.engine, expire_on_commit=False) as session: - self._save_message(session=session, graph_runtime_state=graph_runtime_state) + self._save_message( + session=session, graph_runtime_state=graph_runtime_state) session.commit() yield self._message_end_to_stream_response() @@ -679,7 +691,8 @@ class AdvancedChatAppGenerateTaskPipeline: def _save_message(self, *, session: Session, graph_runtime_state: Optional[GraphRuntimeState] = None) -> None: message = self._get_message(session=session) message.answer = self._task_state.answer - message.provider_response_latency = time.perf_counter() - self._base_task_pipeline._start_at + message.provider_response_latency = time.perf_counter() - \ + self._base_task_pipeline._start_at message.message_metadata = self._task_state.metadata.model_dump_json() message_files = [ MessageFile( @@ -744,15 +757,18 @@ class AdvancedChatAppGenerateTaskPipeline: # stop subscribe new token when output moderation should direct output self._task_state.answer = self._base_task_pipeline._output_moderation_handler.get_final_output() self._base_task_pipeline._queue_manager.publish( - QueueTextChunkEvent(text=self._task_state.answer), PublishFrom.TASK_PIPELINE + QueueTextChunkEvent( + text=self._task_state.answer), PublishFrom.TASK_PIPELINE ) self._base_task_pipeline._queue_manager.publish( - QueueStopEvent(stopped_by=QueueStopEvent.StopBy.OUTPUT_MODERATION), PublishFrom.TASK_PIPELINE + QueueStopEvent( + stopped_by=QueueStopEvent.StopBy.OUTPUT_MODERATION), PublishFrom.TASK_PIPELINE ) return True else: - self._base_task_pipeline._output_moderation_handler.append_new_token(text) + self._base_task_pipeline._output_moderation_handler.append_new_token( + text) return False From 314013aa3354bcbccf8e5c4a9118f9ad5291c648 Mon Sep 17 00:00:00 2001 From: GuanMu Date: Wed, 11 Jun 2025 13:07:08 +0000 Subject: [PATCH 13/33] update sytle --- .../advanced_chat/generate_task_pipeline.py | 50 +++++++------------ 1 file changed, 17 insertions(+), 33 deletions(-) diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index 09220de6e6..d8bcd84b51 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -164,8 +164,7 @@ class AdvancedChatAppGenerateTaskPipeline: conversation_id=self._conversation_id, query=self._application_generate_entity.query ) - generator = self._wrapper_process_stream_response( - trace_manager=self._application_generate_entity.trace_manager) + generator = self._wrapper_process_stream_response(trace_manager=self._application_generate_entity.trace_manager) if self._base_task_pipeline._stream: return self._to_stream_response(generator) else: @@ -186,7 +185,7 @@ class AdvancedChatAppGenerateTaskPipeline: # Retrieve outputs from task state metadata, which is populated earlier final_outputs = {} - if self._task_state.metadata and hasattr(self._task_state.metadata, 'outputs'): + if self._task_state.metadata and hasattr(self._task_state.metadata, "outputs"): final_outputs = self._task_state.metadata.outputs return ChatbotAppBlockingResponse( @@ -244,14 +243,12 @@ class AdvancedChatAppGenerateTaskPipeline: and features_dict["text_to_speech"].get("autoPlay") == "enabled" ): tts_publisher = AppGeneratorTTSPublisher( - tenant_id, features_dict["text_to_speech"].get( - "voice"), features_dict["text_to_speech"].get("language") + tenant_id, features_dict["text_to_speech"].get("voice"), features_dict["text_to_speech"].get("language") ) for response in self._process_stream_response(tts_publisher=tts_publisher, trace_manager=trace_manager): while True: - audio_response = self._listen_audio_msg( - publisher=tts_publisher, task_id=task_id) + audio_response = self._listen_audio_msg(publisher=tts_publisher, task_id=task_id) if audio_response: yield audio_response else: @@ -276,8 +273,7 @@ class AdvancedChatAppGenerateTaskPipeline: start_listener_time = time.time() yield MessageAudioStreamResponse(audio=audio_trunk.audio, task_id=task_id) except Exception: - logger.exception( - f"Failed to listen audio message, task_id: {task_id}") + logger.exception(f"Failed to listen audio message, task_id: {task_id}") break if tts_publisher: yield MessageAudioEndStreamResponse(audio="", task_id=task_id) @@ -317,8 +313,7 @@ class AdvancedChatAppGenerateTaskPipeline: self._workflow_run_id = workflow_execution.id_ message = self._get_message(session=session) if not message: - raise ValueError( - f"Message not found: {self._message_id}") + raise ValueError(f"Message not found: {self._message_id}") message.workflow_run_id = workflow_execution.id_ workflow_start_resp = self._workflow_response_converter.workflow_start_to_stream_response( task_id=self._application_generate_entity.task_id, @@ -367,8 +362,7 @@ class AdvancedChatAppGenerateTaskPipeline: # Record files if it's an answer node or end node if event.node_type in [NodeType.ANSWER, NodeType.END]: self._recorded_files.extend( - self._workflow_response_converter.fetch_files_from_node_outputs( - event.outputs or {}) + self._workflow_response_converter.fetch_files_from_node_outputs(event.outputs or {}) ) with Session(db.engine, expire_on_commit=False) as session: workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_success( @@ -516,10 +510,8 @@ class AdvancedChatAppGenerateTaskPipeline: task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, ) - workflow_outputs_data = workflow_finish_resp.data.outputs.get( - 'outputs', {}) - self._task_state.metadata.outputs = workflow_outputs_data.get( - 'outputs') + workflow_outputs_data = workflow_finish_resp.data.outputs.get("outputs", {}) + self._task_state.metadata.outputs = workflow_outputs_data.get("outputs") yield workflow_finish_resp self._base_task_pipeline._queue_manager.publish( QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE @@ -572,8 +564,7 @@ class AdvancedChatAppGenerateTaskPipeline: task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, ) - err_event = QueueErrorEvent(error=ValueError( - f"Run failed: {workflow_execution.error_message}")) + err_event = QueueErrorEvent(error=ValueError(f"Run failed: {workflow_execution.error_message}")) err = self._base_task_pipeline._handle_error( event=err_event, session=session, message_id=self._message_id ) @@ -599,8 +590,7 @@ class AdvancedChatAppGenerateTaskPipeline: workflow_execution=workflow_execution, ) # Save message - self._save_message( - session=session, graph_runtime_state=graph_runtime_state) + self._save_message(session=session, graph_runtime_state=graph_runtime_state) session.commit() yield workflow_finish_resp @@ -636,8 +626,7 @@ class AdvancedChatAppGenerateTaskPipeline: continue # handle output moderation chunk - should_direct_answer = self._handle_output_moderation_chunk( - delta_text) + should_direct_answer = self._handle_output_moderation_chunk(delta_text) if should_direct_answer: continue @@ -669,8 +658,7 @@ class AdvancedChatAppGenerateTaskPipeline: ) # Save message with Session(db.engine, expire_on_commit=False) as session: - self._save_message( - session=session, graph_runtime_state=graph_runtime_state) + self._save_message(session=session, graph_runtime_state=graph_runtime_state) session.commit() yield self._message_end_to_stream_response() @@ -691,8 +679,7 @@ class AdvancedChatAppGenerateTaskPipeline: def _save_message(self, *, session: Session, graph_runtime_state: Optional[GraphRuntimeState] = None) -> None: message = self._get_message(session=session) message.answer = self._task_state.answer - message.provider_response_latency = time.perf_counter() - \ - self._base_task_pipeline._start_at + message.provider_response_latency = time.perf_counter() - self._base_task_pipeline._start_at message.message_metadata = self._task_state.metadata.model_dump_json() message_files = [ MessageFile( @@ -757,18 +744,15 @@ class AdvancedChatAppGenerateTaskPipeline: # stop subscribe new token when output moderation should direct output self._task_state.answer = self._base_task_pipeline._output_moderation_handler.get_final_output() self._base_task_pipeline._queue_manager.publish( - QueueTextChunkEvent( - text=self._task_state.answer), PublishFrom.TASK_PIPELINE + QueueTextChunkEvent(text=self._task_state.answer), PublishFrom.TASK_PIPELINE ) self._base_task_pipeline._queue_manager.publish( - QueueStopEvent( - stopped_by=QueueStopEvent.StopBy.OUTPUT_MODERATION), PublishFrom.TASK_PIPELINE + QueueStopEvent(stopped_by=QueueStopEvent.StopBy.OUTPUT_MODERATION), PublishFrom.TASK_PIPELINE ) return True else: - self._base_task_pipeline._output_moderation_handler.append_new_token( - text) + self._base_task_pipeline._output_moderation_handler.append_new_token(text) return False From d6240204cf579cd6ce332071cf3ceadc24ca81b1 Mon Sep 17 00:00:00 2001 From: GuanMu Date: Wed, 11 Jun 2025 13:07:49 +0000 Subject: [PATCH 14/33] fix: style --- api/core/app/entities/task_entities.py | 1 + api/core/workflow/nodes/answer/answer_stream_processor.py | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index cca8ce7737..40ace038d2 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -52,6 +52,7 @@ class WorkflowTaskState(TaskState): answer: str = "" outputs: Optional[Mapping[str, Any]] = None + class StreamEvent(Enum): """ Stream event diff --git a/api/core/workflow/nodes/answer/answer_stream_processor.py b/api/core/workflow/nodes/answer/answer_stream_processor.py index e14e17d318..afd7e6f593 100644 --- a/api/core/workflow/nodes/answer/answer_stream_processor.py +++ b/api/core/workflow/nodes/answer/answer_stream_processor.py @@ -28,10 +28,9 @@ class AnswerStreamProcessor(StreamProcessor): self.current_stream_chunk_generating_node_ids: dict[str, list[str]] = {} self.has_output = False self.output_node_ids: set[str] = set() - + def process(self, generator: Generator[GraphEngineEvent, None, None]) -> Generator[GraphEngineEvent, None, None]: for event in generator: - if isinstance(event, NodeRunStartedEvent): if event.route_node_state.node_id == self.graph.root_node_id and not self.rest_node_ids: self.reset() From 0e68195552f113bef24080e4dd236b1fe4e2c90b Mon Sep 17 00:00:00 2001 From: GuanMu Date: Thu, 12 Jun 2025 02:12:26 +0000 Subject: [PATCH 15/33] fix: Add output fields to all answer nodes --- .../core/workflow/graph_engine/test_graph.py | 20 ++++++------ .../graph_engine/test_graph_engine.py | 18 +++++++---- .../core/workflow/nodes/answer/test_answer.py | 1 + .../test_answer_stream_generate_router.py | 4 +-- .../answer/test_answer_stream_processor.py | 4 +-- .../nodes/iteration/test_iteration.py | 17 ++++++---- .../core/workflow/nodes/test_answer.py | 2 ++ .../workflow/nodes/test_continue_on_error.py | 32 +++++++++---------- .../core/workflow/nodes/test_retry.py | 4 +-- 9 files changed, 57 insertions(+), 45 deletions(-) diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_graph.py b/api/tests/unit_tests/core/workflow/graph_engine/test_graph.py index 13ba11016a..8a7bafec1f 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/test_graph.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/test_graph.py @@ -43,7 +43,7 @@ def test_init(): "id": "llm", }, { - "data": {"type": "answer", "title": "answer", "answer": "1"}, + "data": {"type": "answer", "title": "answer", "answer": "1", "outputs": []}, "id": "answer", }, { @@ -57,7 +57,7 @@ def test_init(): "id": "http", }, { - "data": {"type": "answer", "title": "answer", "answer": "1"}, + "data": {"type": "answer", "title": "answer", "answer": "1", "outputs": []}, "id": "answer2", }, ], @@ -126,7 +126,7 @@ def test__init_iteration_graph(): "id": "llm", }, { - "data": {"type": "answer", "title": "answer", "answer": "1"}, + "data": {"type": "answer", "title": "answer", "answer": "1", "outputs": []}, "id": "answer", }, { @@ -148,7 +148,7 @@ def test__init_iteration_graph(): "parentId": "iteration", }, { - "data": {"type": "answer", "title": "answer", "answer": "1"}, + "data": {"type": "answer", "title": "answer", "answer": "1", "outputs": []}, "id": "answer-in-iteration", "parentId": "iteration", }, @@ -235,7 +235,7 @@ def test_parallels_graph(): "id": "llm3", }, { - "data": {"type": "answer", "title": "answer", "answer": "1"}, + "data": {"type": "answer", "title": "answer", "answer": "1", "outputs": []}, "id": "answer", }, ], @@ -310,7 +310,7 @@ def test_parallels_graph2(): "id": "llm3", }, { - "data": {"type": "answer", "title": "answer", "answer": "1"}, + "data": {"type": "answer", "title": "answer", "answer": "1", "outputs": []}, "id": "answer", }, ], @@ -373,7 +373,7 @@ def test_parallels_graph3(): "id": "llm3", }, { - "data": {"type": "answer", "title": "answer", "answer": "1"}, + "data": {"type": "answer", "title": "answer", "answer": "1", "outputs": []}, "id": "answer", }, ], @@ -480,7 +480,7 @@ def test_parallels_graph4(): "id": "code3", }, { - "data": {"type": "answer", "title": "answer", "answer": "1"}, + "data": {"type": "answer", "title": "answer", "answer": "1", "outputs": []}, "id": "answer", }, ], @@ -606,7 +606,7 @@ def test_parallels_graph5(): "id": "code3", }, { - "data": {"type": "answer", "title": "answer", "answer": "1"}, + "data": {"type": "answer", "title": "answer", "answer": "1", "outputs": []}, "id": "answer", }, { @@ -745,7 +745,7 @@ def test_parallels_graph6(): "id": "code3", }, { - "data": {"type": "answer", "title": "answer", "answer": "1"}, + "data": {"type": "answer", "title": "answer", "answer": "1", "outputs": []}, "id": "answer", }, ], diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py b/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py index 7535ec4866..e8d8efb97c 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py @@ -267,21 +267,21 @@ def test_run_parallel_in_chatflow(mock_close, mock_remove): ], "nodes": [ {"data": {"type": "start", "title": "start"}, "id": "start"}, - {"data": {"type": "answer", "title": "answer1", "answer": "1"}, "id": "answer1"}, + {"data": {"type": "answer", "title": "answer1", "answer": "1", "outputs": []}, "id": "answer1"}, { - "data": {"type": "answer", "title": "answer2", "answer": "2"}, + "data": {"type": "answer", "title": "answer2", "answer": "2", "outputs": []}, "id": "answer2", }, { - "data": {"type": "answer", "title": "answer3", "answer": "3"}, + "data": {"type": "answer", "title": "answer3", "answer": "3", "outputs": []}, "id": "answer3", }, { - "data": {"type": "answer", "title": "answer4", "answer": "4"}, + "data": {"type": "answer", "title": "answer4", "answer": "4", "outputs": []}, "id": "answer4", }, { - "data": {"type": "answer", "title": "answer5", "answer": "5"}, + "data": {"type": "answer", "title": "answer5", "answer": "5", "outputs": []}, "id": "answer5", }, ], @@ -398,7 +398,7 @@ def test_run_branch(mock_close, mock_remove): "id": "start", }, { - "data": {"answer": "1 {{#start.uid#}}", "title": "Answer", "type": "answer", "variables": []}, + "data": {"answer": "1 {{#start.uid#}}", "title": "Answer", "type": "answer", "variables": [], "outputs": []}, "id": "answer-1", }, { @@ -453,6 +453,7 @@ def test_run_branch(mock_close, mock_remove): "answer": "2", "title": "Answer 2", "type": "answer", + "outputs": [], }, "id": "answer-2", }, @@ -461,6 +462,7 @@ def test_run_branch(mock_close, mock_remove): "answer": "3", "title": "Answer 3", "type": "answer", + "outputs": [], }, "id": "answer-3", }, @@ -701,6 +703,7 @@ def test_condition_parallel_correct_output(mock_close, mock_remove, app): "title": "answer 2", "type": "answer", "variables": [], + "outputs": [], }, "height": 105, "id": "1742382531085", @@ -720,6 +723,7 @@ def test_condition_parallel_correct_output(mock_close, mock_remove, app): "title": "answer 3", "type": "answer", "variables": [], + "outputs": [], }, "height": 105, "id": "1742382534798", @@ -739,6 +743,7 @@ def test_condition_parallel_correct_output(mock_close, mock_remove, app): "title": "answer 4", "type": "answer", "variables": [], + "outputs": [], }, "height": 105, "id": "1742382538517", @@ -758,6 +763,7 @@ def test_condition_parallel_correct_output(mock_close, mock_remove, app): "title": "Answer 5", "type": "answer", "variables": [], + "outputs": [], }, "height": 105, "id": "1742434464898", diff --git a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py index b7f78d91fa..860bc29455 100644 --- a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py +++ b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py @@ -69,6 +69,7 @@ def test_execute_answer(): "title": "123", "type": "answer", "answer": "Today's weather is {{#start.weather#}}\n{{#llm.text#}}\n{{img}}\nFin.", + "outputs": [], }, }, ) diff --git a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer_stream_generate_router.py b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer_stream_generate_router.py index bce87536d8..76b8f596e7 100644 --- a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer_stream_generate_router.py +++ b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer_stream_generate_router.py @@ -89,11 +89,11 @@ def test_init(): "id": "llm5", }, { - "data": {"type": "answer", "title": "answer", "answer": "1{{#llm2.text#}}2"}, + "data": {"type": "answer", "title": "answer", "answer": "1{{#llm2.text#}}2", "outputs": []}, "id": "answer", }, { - "data": {"type": "answer", "title": "answer2", "answer": "1{{#llm3.text#}}2"}, + "data": {"type": "answer", "title": "answer2", "answer": "1{{#llm3.text#}}2", "outputs": []}, "id": "answer2", }, ], diff --git a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer_stream_processor.py b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer_stream_processor.py index c3a3818655..19147fcde7 100644 --- a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer_stream_processor.py +++ b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer_stream_processor.py @@ -167,11 +167,11 @@ def test_process(): "id": "llm5", }, { - "data": {"type": "answer", "title": "answer", "answer": "a{{#llm2.text#}}b"}, + "data": {"type": "answer", "title": "answer", "answer": "a{{#llm2.text#}}b", "outputs": []}, "id": "answer", }, { - "data": {"type": "answer", "title": "answer2", "answer": "c{{#llm3.text#}}d"}, + "data": {"type": "answer", "title": "answer2", "answer": "c{{#llm3.text#}}d", "outputs": []}, "id": "answer2", }, ], diff --git a/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py b/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py index 6d854c950d..7db224ba3f 100644 --- a/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py +++ b/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py @@ -74,6 +74,7 @@ def test_run(): "iteration_id": "iteration-1", "title": "answer 2", "type": "answer", + "outputs": [], }, "id": "answer-2", }, @@ -88,7 +89,7 @@ def test_run(): "id": "tt", }, { - "data": {"answer": "{{#iteration-1.output#}}88888", "title": "answer 3", "type": "answer"}, + "data": {"answer": "{{#iteration-1.output#}}88888", "title": "answer 3", "type": "answer", "outputs": []}, "id": "answer-3", }, { @@ -109,7 +110,7 @@ def test_run(): "id": "if-else", }, { - "data": {"answer": "no hi", "iteration_id": "iteration-1", "title": "answer 4", "type": "answer"}, + "data": {"answer": "no hi", "iteration_id": "iteration-1", "title": "answer 4", "type": "answer", "outputs": []}, "id": "answer-4", }, { @@ -273,6 +274,7 @@ def test_run_parallel(): "iteration_id": "iteration-1", "title": "answer 2", "type": "answer", + "outputs": [], }, "id": "answer-2", }, @@ -305,7 +307,7 @@ def test_run_parallel(): "id": "tt-2", }, { - "data": {"answer": "{{#iteration-1.output#}}88888", "title": "answer 3", "type": "answer"}, + "data": {"answer": "{{#iteration-1.output#}}88888", "title": "answer 3", "type": "answer", "outputs": []}, "id": "answer-3", }, { @@ -326,7 +328,7 @@ def test_run_parallel(): "id": "if-else", }, { - "data": {"answer": "no hi", "iteration_id": "iteration-1", "title": "answer 4", "type": "answer"}, + "data": {"answer": "no hi", "iteration_id": "iteration-1", "title": "answer 4", "type": "answer", "outputs": []}, "id": "answer-4", }, { @@ -489,6 +491,7 @@ def test_iteration_run_in_parallel_mode(): "iteration_id": "iteration-1", "title": "answer 2", "type": "answer", + "outputs": [], }, "id": "answer-2", }, @@ -521,7 +524,7 @@ def test_iteration_run_in_parallel_mode(): "id": "tt-2", }, { - "data": {"answer": "{{#iteration-1.output#}}88888", "title": "answer 3", "type": "answer"}, + "data": {"answer": "{{#iteration-1.output#}}88888", "title": "answer 3", "type": "answer", "outputs": []}, "id": "answer-3", }, { @@ -542,7 +545,7 @@ def test_iteration_run_in_parallel_mode(): "id": "if-else", }, { - "data": {"answer": "no hi", "iteration_id": "iteration-1", "title": "answer 4", "type": "answer"}, + "data": {"answer": "no hi", "iteration_id": "iteration-1", "title": "answer 4", "type": "answer", "outputs": []}, "id": "answer-4", }, { @@ -741,7 +744,7 @@ def test_iteration_run_error_handle(): "id": "tt2", }, { - "data": {"answer": "{{#iteration-1.output#}}88888", "title": "answer 3", "type": "answer"}, + "data": {"answer": "{{#iteration-1.output#}}88888", "title": "answer 3", "type": "answer", "outputs": []}, "id": "answer-3", }, { diff --git a/api/tests/unit_tests/core/workflow/nodes/test_answer.py b/api/tests/unit_tests/core/workflow/nodes/test_answer.py index abc822e98b..2b7a3c8ea7 100644 --- a/api/tests/unit_tests/core/workflow/nodes/test_answer.py +++ b/api/tests/unit_tests/core/workflow/nodes/test_answer.py @@ -31,6 +31,7 @@ def test_execute_answer(): "title": "123", "type": "answer", "answer": "Today's weather is {{#start.weather#}}\n{{#llm.text#}}\n{{img}}\nFin.", + "outputs": [], }, "id": "answer", }, @@ -72,6 +73,7 @@ def test_execute_answer(): "title": "123", "type": "answer", "answer": "Today's weather is {{#start.weather#}}\n{{#llm.text#}}\n{{img}}\nFin.", + "outputs": [], }, }, ) diff --git a/api/tests/unit_tests/core/workflow/nodes/test_continue_on_error.py b/api/tests/unit_tests/core/workflow/nodes/test_continue_on_error.py index ff60d5974b..7802e88028 100644 --- a/api/tests/unit_tests/core/workflow/nodes/test_continue_on_error.py +++ b/api/tests/unit_tests/core/workflow/nodes/test_continue_on_error.py @@ -239,7 +239,7 @@ def test_code_default_value_continue_on_error(): "edges": DEFAULT_VALUE_EDGE, "nodes": [ {"data": {"title": "start", "type": "start", "variables": []}, "id": "start"}, - {"data": {"title": "answer", "type": "answer", "answer": "{{#node.result#}}"}, "id": "answer"}, + {"data": {"title": "answer", "type": "answer", "answer": "{{#node.result#}}", "outputs": []}, "id": "answer"}, ContinueOnErrorTestHelper.get_code_node( error_code, "default-value", [{"key": "result", "type": "number", "value": 132123}] ), @@ -266,11 +266,11 @@ def test_code_fail_branch_continue_on_error(): "nodes": [ {"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"}, { - "data": {"title": "success", "type": "answer", "answer": "node node run successfully"}, + "data": {"title": "success", "type": "answer", "answer": "node node run successfully", "outputs": []}, "id": "success", }, { - "data": {"title": "error", "type": "answer", "answer": "node node run failed"}, + "data": {"title": "error", "type": "answer", "answer": "node node run failed", "outputs": []}, "id": "error", }, ContinueOnErrorTestHelper.get_code_node(error_code), @@ -292,7 +292,7 @@ def test_http_node_default_value_continue_on_error(): "edges": DEFAULT_VALUE_EDGE, "nodes": [ {"data": {"title": "start", "type": "start", "variables": []}, "id": "start"}, - {"data": {"title": "answer", "type": "answer", "answer": "{{#node.response#}}"}, "id": "answer"}, + {"data": {"title": "answer", "type": "answer", "answer": "{{#node.response#}}", "outputs": []}, "id": "answer"}, ContinueOnErrorTestHelper.get_http_node( "default-value", [{"key": "response", "type": "string", "value": "http node got error response"}] ), @@ -317,11 +317,11 @@ def test_http_node_fail_branch_continue_on_error(): "nodes": [ {"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"}, { - "data": {"title": "success", "type": "answer", "answer": "HTTP request successful"}, + "data": {"title": "success", "type": "answer", "answer": "HTTP request successful", "outputs": []}, "id": "success", }, { - "data": {"title": "error", "type": "answer", "answer": "HTTP request failed"}, + "data": {"title": "error", "type": "answer", "answer": "HTTP request failed", "outputs": []}, "id": "error", }, ContinueOnErrorTestHelper.get_http_node(), @@ -395,7 +395,7 @@ def test_llm_node_default_value_continue_on_error(): "edges": DEFAULT_VALUE_EDGE, "nodes": [ {"data": {"title": "start", "type": "start", "variables": []}, "id": "start"}, - {"data": {"title": "answer", "type": "answer", "answer": "{{#node.answer#}}"}, "id": "answer"}, + {"data": {"title": "answer", "type": "answer", "answer": "{{#node.answer#}}", "outputs": []}, "id": "answer"}, ContinueOnErrorTestHelper.get_llm_node( "default-value", [{"key": "answer", "type": "string", "value": "default LLM response"}] ), @@ -419,11 +419,11 @@ def test_llm_node_fail_branch_continue_on_error(): "nodes": [ {"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"}, { - "data": {"title": "success", "type": "answer", "answer": "LLM request successful"}, + "data": {"title": "success", "type": "answer", "answer": "LLM request successful", "outputs": []}, "id": "success", }, { - "data": {"title": "error", "type": "answer", "answer": "LLM request failed"}, + "data": {"title": "error", "type": "answer", "answer": "LLM request failed", "outputs": []}, "id": "error", }, ContinueOnErrorTestHelper.get_llm_node(), @@ -447,11 +447,11 @@ def test_status_code_error_http_node_fail_branch_continue_on_error(): "nodes": [ {"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"}, { - "data": {"title": "success", "type": "answer", "answer": "http execute successful"}, + "data": {"title": "success", "type": "answer", "answer": "http execute successful", "outputs": []}, "id": "success", }, { - "data": {"title": "error", "type": "answer", "answer": "http execute failed"}, + "data": {"title": "error", "type": "answer", "answer": "http execute failed", "outputs": []}, "id": "error", }, ContinueOnErrorTestHelper.get_error_status_code_http_node(), @@ -474,11 +474,11 @@ def test_variable_pool_error_type_variable(): "nodes": [ {"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"}, { - "data": {"title": "success", "type": "answer", "answer": "http execute successful"}, + "data": {"title": "success", "type": "answer", "answer": "http execute successful", "outputs": []}, "id": "success", }, { - "data": {"title": "error", "type": "answer", "answer": "http execute failed"}, + "data": {"title": "error", "type": "answer", "answer": "http execute failed", "outputs": []}, "id": "error", }, ContinueOnErrorTestHelper.get_error_status_code_http_node(), @@ -499,7 +499,7 @@ def test_no_node_in_fail_branch_continue_on_error(): "edges": FAIL_BRANCH_EDGES[:-1], "nodes": [ {"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"}, - {"data": {"title": "success", "type": "answer", "answer": "HTTP request successful"}, "id": "success"}, + {"data": {"title": "success", "type": "answer", "answer": "HTTP request successful", "outputs": []}, "id": "success"}, ContinueOnErrorTestHelper.get_http_node(), ], } @@ -519,11 +519,11 @@ def test_stream_output_with_fail_branch_continue_on_error(): "nodes": [ {"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"}, { - "data": {"title": "success", "type": "answer", "answer": "LLM request successful"}, + "data": {"title": "success", "type": "answer", "answer": "LLM request successful", "outputs": []}, "id": "success", }, { - "data": {"title": "error", "type": "answer", "answer": "{{#node.text#}}"}, + "data": {"title": "error", "type": "answer", "answer": "{{#node.text#}}", "outputs": []}, "id": "error", }, ContinueOnErrorTestHelper.get_llm_node(), diff --git a/api/tests/unit_tests/core/workflow/nodes/test_retry.py b/api/tests/unit_tests/core/workflow/nodes/test_retry.py index 57d3b203b9..40ac03188d 100644 --- a/api/tests/unit_tests/core/workflow/nodes/test_retry.py +++ b/api/tests/unit_tests/core/workflow/nodes/test_retry.py @@ -27,7 +27,7 @@ def test_retry_default_value_partial_success(): "edges": DEFAULT_VALUE_EDGE, "nodes": [ {"data": {"title": "start", "type": "start", "variables": []}, "id": "start"}, - {"data": {"title": "answer", "type": "answer", "answer": "{{#node.result#}}"}, "id": "answer"}, + {"data": {"title": "answer", "type": "answer", "answer": "{{#node.result#}}", "outputs": []}, "id": "answer"}, ContinueOnErrorTestHelper.get_http_node( "default-value", [{"key": "result", "type": "string", "value": "http node got error response"}], @@ -50,7 +50,7 @@ def test_retry_failed(): "edges": DEFAULT_VALUE_EDGE, "nodes": [ {"data": {"title": "start", "type": "start", "variables": []}, "id": "start"}, - {"data": {"title": "answer", "type": "answer", "answer": "{{#node.result#}}"}, "id": "answer"}, + {"data": {"title": "answer", "type": "answer", "answer": "{{#node.result#}}", "outputs": []}, "id": "answer"}, ContinueOnErrorTestHelper.get_http_node( None, None, From 12e3c00ec85e282eeb30845723344f644134b9ab Mon Sep 17 00:00:00 2001 From: GuanMu Date: Thu, 12 Jun 2025 03:06:30 +0000 Subject: [PATCH 16/33] fix:Delete the migration file for modifying the default model name length and remove the related output fields and attributes --- ...f8c4f3_modify_default_model_name_length.py | 39 -------------- .../graph_engine/test_graph_engine.py | 8 ++- .../nodes/iteration/test_iteration.py | 52 ++++++++++++++++--- .../workflow/nodes/test_continue_on_error.py | 20 +++++-- .../core/workflow/nodes/test_retry.py | 10 +++- 5 files changed, 76 insertions(+), 53 deletions(-) delete mode 100644 api/migrations/versions/47cc7df8c4f3_modify_default_model_name_length.py diff --git a/api/migrations/versions/47cc7df8c4f3_modify_default_model_name_length.py b/api/migrations/versions/47cc7df8c4f3_modify_default_model_name_length.py deleted file mode 100644 index b37928d3c0..0000000000 --- a/api/migrations/versions/47cc7df8c4f3_modify_default_model_name_length.py +++ /dev/null @@ -1,39 +0,0 @@ -"""modify default model name length - -Revision ID: 47cc7df8c4f3 -Revises: 3c7cac9521c6 -Create Date: 2024-05-10 09:48:09.046298 - -""" -import sqlalchemy as sa -from alembic import op - -import models as models - -# revision identifiers, used by Alembic. -revision = '47cc7df8c4f3' -down_revision = '3c7cac9521c6' -branch_labels = None -depends_on = None - - -def upgrade(): - # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('tenant_default_models', schema=None) as batch_op: - batch_op.alter_column('model_name', - existing_type=sa.VARCHAR(length=40), - type_=sa.String(length=255), - existing_nullable=False) - - # ### end Alembic commands ### - - -def downgrade(): - # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('tenant_default_models', schema=None) as batch_op: - batch_op.alter_column('model_name', - existing_type=sa.String(length=255), - type_=sa.VARCHAR(length=40), - existing_nullable=False) - - # ### end Alembic commands ### diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py b/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py index e8d8efb97c..714ef1160e 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py @@ -398,7 +398,13 @@ def test_run_branch(mock_close, mock_remove): "id": "start", }, { - "data": {"answer": "1 {{#start.uid#}}", "title": "Answer", "type": "answer", "variables": [], "outputs": []}, + "data": { + "answer": "1 {{#start.uid#}}", + "title": "Answer", + "type": "answer", + "variables": [], + "outputs": [], + }, "id": "answer-1", }, { diff --git a/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py b/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py index 7db224ba3f..00a38d9802 100644 --- a/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py +++ b/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py @@ -89,7 +89,12 @@ def test_run(): "id": "tt", }, { - "data": {"answer": "{{#iteration-1.output#}}88888", "title": "answer 3", "type": "answer", "outputs": []}, + "data": { + "answer": "{{#iteration-1.output#}}88888", + "title": "answer 3", + "type": "answer", + "outputs": [], + }, "id": "answer-3", }, { @@ -110,7 +115,13 @@ def test_run(): "id": "if-else", }, { - "data": {"answer": "no hi", "iteration_id": "iteration-1", "title": "answer 4", "type": "answer", "outputs": []}, + "data": { + "answer": "no hi", + "iteration_id": "iteration-1", + "title": "answer 4", + "type": "answer", + "outputs": [], + }, "id": "answer-4", }, { @@ -307,7 +318,12 @@ def test_run_parallel(): "id": "tt-2", }, { - "data": {"answer": "{{#iteration-1.output#}}88888", "title": "answer 3", "type": "answer", "outputs": []}, + "data": { + "answer": "{{#iteration-1.output#}}88888", + "title": "answer 3", + "type": "answer", + "outputs": [], + }, "id": "answer-3", }, { @@ -328,7 +344,13 @@ def test_run_parallel(): "id": "if-else", }, { - "data": {"answer": "no hi", "iteration_id": "iteration-1", "title": "answer 4", "type": "answer", "outputs": []}, + "data": { + "answer": "no hi", + "iteration_id": "iteration-1", + "title": "answer 4", + "type": "answer", + "outputs": [], + }, "id": "answer-4", }, { @@ -524,7 +546,12 @@ def test_iteration_run_in_parallel_mode(): "id": "tt-2", }, { - "data": {"answer": "{{#iteration-1.output#}}88888", "title": "answer 3", "type": "answer", "outputs": []}, + "data": { + "answer": "{{#iteration-1.output#}}88888", + "title": "answer 3", + "type": "answer", + "outputs": [], + }, "id": "answer-3", }, { @@ -545,7 +572,13 @@ def test_iteration_run_in_parallel_mode(): "id": "if-else", }, { - "data": {"answer": "no hi", "iteration_id": "iteration-1", "title": "answer 4", "type": "answer", "outputs": []}, + "data": { + "answer": "no hi", + "iteration_id": "iteration-1", + "title": "answer 4", + "type": "answer", + "outputs": [], + }, "id": "answer-4", }, { @@ -744,7 +777,12 @@ def test_iteration_run_error_handle(): "id": "tt2", }, { - "data": {"answer": "{{#iteration-1.output#}}88888", "title": "answer 3", "type": "answer", "outputs": []}, + "data": { + "answer": "{{#iteration-1.output#}}88888", + "title": "answer 3", + "type": "answer", + "outputs": [], + }, "id": "answer-3", }, { diff --git a/api/tests/unit_tests/core/workflow/nodes/test_continue_on_error.py b/api/tests/unit_tests/core/workflow/nodes/test_continue_on_error.py index 7802e88028..efe4be204b 100644 --- a/api/tests/unit_tests/core/workflow/nodes/test_continue_on_error.py +++ b/api/tests/unit_tests/core/workflow/nodes/test_continue_on_error.py @@ -239,7 +239,10 @@ def test_code_default_value_continue_on_error(): "edges": DEFAULT_VALUE_EDGE, "nodes": [ {"data": {"title": "start", "type": "start", "variables": []}, "id": "start"}, - {"data": {"title": "answer", "type": "answer", "answer": "{{#node.result#}}", "outputs": []}, "id": "answer"}, + { + "data": {"title": "answer", "type": "answer", "answer": "{{#node.result#}}", "outputs": []}, + "id": "answer", + }, ContinueOnErrorTestHelper.get_code_node( error_code, "default-value", [{"key": "result", "type": "number", "value": 132123}] ), @@ -292,7 +295,10 @@ def test_http_node_default_value_continue_on_error(): "edges": DEFAULT_VALUE_EDGE, "nodes": [ {"data": {"title": "start", "type": "start", "variables": []}, "id": "start"}, - {"data": {"title": "answer", "type": "answer", "answer": "{{#node.response#}}", "outputs": []}, "id": "answer"}, + { + "data": {"title": "answer", "type": "answer", "answer": "{{#node.response#}}", "outputs": []}, + "id": "answer", + }, ContinueOnErrorTestHelper.get_http_node( "default-value", [{"key": "response", "type": "string", "value": "http node got error response"}] ), @@ -395,7 +401,10 @@ def test_llm_node_default_value_continue_on_error(): "edges": DEFAULT_VALUE_EDGE, "nodes": [ {"data": {"title": "start", "type": "start", "variables": []}, "id": "start"}, - {"data": {"title": "answer", "type": "answer", "answer": "{{#node.answer#}}", "outputs": []}, "id": "answer"}, + { + "data": {"title": "answer", "type": "answer", "answer": "{{#node.answer#}}", "outputs": []}, + "id": "answer", + }, ContinueOnErrorTestHelper.get_llm_node( "default-value", [{"key": "answer", "type": "string", "value": "default LLM response"}] ), @@ -499,7 +508,10 @@ def test_no_node_in_fail_branch_continue_on_error(): "edges": FAIL_BRANCH_EDGES[:-1], "nodes": [ {"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"}, - {"data": {"title": "success", "type": "answer", "answer": "HTTP request successful", "outputs": []}, "id": "success"}, + { + "data": {"title": "success", "type": "answer", "answer": "HTTP request successful", "outputs": []}, + "id": "success", + }, ContinueOnErrorTestHelper.get_http_node(), ], } diff --git a/api/tests/unit_tests/core/workflow/nodes/test_retry.py b/api/tests/unit_tests/core/workflow/nodes/test_retry.py index 40ac03188d..d807fd5828 100644 --- a/api/tests/unit_tests/core/workflow/nodes/test_retry.py +++ b/api/tests/unit_tests/core/workflow/nodes/test_retry.py @@ -27,7 +27,10 @@ def test_retry_default_value_partial_success(): "edges": DEFAULT_VALUE_EDGE, "nodes": [ {"data": {"title": "start", "type": "start", "variables": []}, "id": "start"}, - {"data": {"title": "answer", "type": "answer", "answer": "{{#node.result#}}", "outputs": []}, "id": "answer"}, + { + "data": {"title": "answer", "type": "answer", "answer": "{{#node.result#}}", "outputs": []}, + "id": "answer", + }, ContinueOnErrorTestHelper.get_http_node( "default-value", [{"key": "result", "type": "string", "value": "http node got error response"}], @@ -50,7 +53,10 @@ def test_retry_failed(): "edges": DEFAULT_VALUE_EDGE, "nodes": [ {"data": {"title": "start", "type": "start", "variables": []}, "id": "start"}, - {"data": {"title": "answer", "type": "answer", "answer": "{{#node.result#}}", "outputs": []}, "id": "answer"}, + { + "data": {"title": "answer", "type": "answer", "answer": "{{#node.result#}}", "outputs": []}, + "id": "answer", + }, ContinueOnErrorTestHelper.get_http_node( None, None, From df7e33b4f5a680981d2b1f3b198d9e3937e1b37e Mon Sep 17 00:00:00 2001 From: GuanMu Date: Thu, 12 Jun 2025 03:34:41 +0000 Subject: [PATCH 17/33] fix: remove outputs --- api/models/model.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/api/models/model.py b/api/models/model.py index eccb49f507..229e77134e 100644 --- a/api/models/model.py +++ b/api/models/model.py @@ -898,9 +898,6 @@ class Message(Base): message_unit_price = db.Column(db.Numeric(10, 4), nullable=False) message_price_unit = db.Column(db.Numeric(10, 7), nullable=False, server_default=db.text("0.001")) answer: Mapped[str] = db.Column(db.Text, nullable=False) - outputs: Mapped[Optional[str]] = mapped_column( - "outputs", db.Text, nullable=True, server_default=db.text("'{}'::text") - ) answer_tokens = db.Column(db.Integer, nullable=False, server_default=db.text("0")) answer_unit_price = db.Column(db.Numeric(10, 4), nullable=False) answer_price_unit = db.Column(db.Numeric(10, 7), nullable=False, server_default=db.text("0.001")) @@ -1089,14 +1086,6 @@ class Message(Base): def message_metadata_dict(self) -> dict: return json.loads(self.message_metadata) if self.message_metadata else {} - @property - def outputs_dict(self) -> dict: - return json.loads(self.outputs) if self.outputs else {} - - @outputs_dict.setter - def outputs_dict(self, value: Mapping[str, Any]): - self.outputs = json.dumps(value, ensure_ascii=False) if value else "{}" - @property def agent_thoughts(self): return ( @@ -1191,7 +1180,6 @@ class Message(Base): "model_id": self.model_id, "inputs": self.inputs, "query": self.query, - "outputs": self.outputs_dict, "total_price": self.total_price, "message": self.message, "answer": self.answer, From 7dd1bf0597c5eff16f0d11b9b4f586a8dc26f943 Mon Sep 17 00:00:00 2001 From: GuanMu Date: Tue, 15 Jul 2025 12:34:56 +0000 Subject: [PATCH 18/33] Revert: Remove personal modifications in api/migrations/versions/ directory - Delete custom migration file 75f1de22b8f3_merge_multiple_heads_for_database_.py - Restore deleted migration file 47cc7df8c4f3_modify_default_model_name_length.py - Reset migrations directory to match upstream main branch --- ...b8f3_merge_multiple_heads_for_database_.py | 31 --------------- ...f8c4f3_modify_default_model_name_length.py | 39 +++++++++++++++++++ 2 files changed, 39 insertions(+), 31 deletions(-) delete mode 100644 api/migrations/versions/2025_06_11_1254-75f1de22b8f3_merge_multiple_heads_for_database_.py create mode 100644 api/migrations/versions/47cc7df8c4f3_modify_default_model_name_length.py diff --git a/api/migrations/versions/2025_06_11_1254-75f1de22b8f3_merge_multiple_heads_for_database_.py b/api/migrations/versions/2025_06_11_1254-75f1de22b8f3_merge_multiple_heads_for_database_.py deleted file mode 100644 index 69532ed109..0000000000 --- a/api/migrations/versions/2025_06_11_1254-75f1de22b8f3_merge_multiple_heads_for_database_.py +++ /dev/null @@ -1,31 +0,0 @@ -"""Merge multiple heads for database migration - -Revision ID: 75f1de22b8f3 -Revises: 83f1262d3f22, 4474872b0ee6 -Create Date: 2025-06-11 12:54:20.128554 - -""" -from alembic import op -import models as models -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '75f1de22b8f3' -down_revision = '4474872b0ee6' -branch_labels = None -depends_on = None - - -def upgrade(): - # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('messages', schema=None) as batch_op: - batch_op.add_column(sa.Column('outputs', sa.Text(), server_default=sa.text("'{}'::text"), nullable=True)) - # ### end Alembic commands ### - - -def downgrade(): - # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('messages', schema=None) as batch_op: - batch_op.drop_column('outputs') - # ### end Alembic commands ### diff --git a/api/migrations/versions/47cc7df8c4f3_modify_default_model_name_length.py b/api/migrations/versions/47cc7df8c4f3_modify_default_model_name_length.py new file mode 100644 index 0000000000..b37928d3c0 --- /dev/null +++ b/api/migrations/versions/47cc7df8c4f3_modify_default_model_name_length.py @@ -0,0 +1,39 @@ +"""modify default model name length + +Revision ID: 47cc7df8c4f3 +Revises: 3c7cac9521c6 +Create Date: 2024-05-10 09:48:09.046298 + +""" +import sqlalchemy as sa +from alembic import op + +import models as models + +# revision identifiers, used by Alembic. +revision = '47cc7df8c4f3' +down_revision = '3c7cac9521c6' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('tenant_default_models', schema=None) as batch_op: + batch_op.alter_column('model_name', + existing_type=sa.VARCHAR(length=40), + type_=sa.String(length=255), + existing_nullable=False) + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('tenant_default_models', schema=None) as batch_op: + batch_op.alter_column('model_name', + existing_type=sa.String(length=255), + type_=sa.VARCHAR(length=40), + existing_nullable=False) + + # ### end Alembic commands ### From 81027f607792df573ec41294808ffe2cf2c48e2d Mon Sep 17 00:00:00 2001 From: GuanMu Date: Wed, 16 Jul 2025 02:28:57 +0000 Subject: [PATCH 19/33] Test:Add multiple test cases to verify the correctness of different output configurations, including complex outputs and handling of missing variables. --- .../workflow/graph_engine/graph_engine.py | 8 +- .../core/workflow/nodes/answer/test_answer.py | 371 ++++++++++++++++++ 2 files changed, 378 insertions(+), 1 deletion(-) diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py index 4e552981ae..c73ba46e0d 100644 --- a/api/core/workflow/graph_engine/graph_engine.py +++ b/api/core/workflow/graph_engine/graph_engine.py @@ -196,7 +196,13 @@ class GraphEngine: self.graph_runtime_state.outputs["answer"] = self.graph_runtime_state.outputs[ "answer" ].strip() - self.graph_runtime_state.outputs["outputs"] = item.route_node_state.node_run_result.outputs + + # Only save the user-defined output variables, not the entire node output + if (item.route_node_state.node_run_result + and item.route_node_state.node_run_result.outputs + and "outputs" in item.route_node_state.node_run_result.outputs): + user_outputs = item.route_node_state.node_run_result.outputs.get("outputs", {}) + self.graph_runtime_state.outputs.update(user_outputs) except Exception as e: logger.exception("Graph run failed") yield GraphRunFailedEvent(error=str(e), exceptions_count=len(handle_exceptions)) diff --git a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py index 860bc29455..39adf1da63 100644 --- a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py +++ b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py @@ -82,3 +82,374 @@ def test_execute_answer(): assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED assert result.outputs["answer"] == "Today's weather is sunny\nYou are a helpful AI.\n{{img}}\nFin." + + +def test_execute_answer_with_outputs(): + """Test Answer node with custom output variables""" + graph_config = { + "edges": [ + { + "id": "start-source-answer-target", + "source": "start", + "target": "answer", + }, + ], + "nodes": [ + {"data": {"type": "start"}, "id": "start"}, + { + "data": { + "title": "Answer with outputs", + "type": "answer", + "answer": "Weather: {{#start.weather#}}, Score: {{#start.score#}}", + "outputs": [ + { + "variable": "confidence", + "type": "number", + "value_selector": ["start", "score"] + }, + { + "variable": "status", + "type": "string", + "value_selector": ["start", "weather"] + } + ], + }, + "id": "answer", + }, + ], + } + + graph = Graph.init(graph_config=graph_config) + + init_params = GraphInitParams( + tenant_id="1", + app_id="1", + workflow_type=WorkflowType.WORKFLOW, + workflow_id="1", + graph_config=graph_config, + user_id="1", + user_from=UserFrom.ACCOUNT, + invoke_from=InvokeFrom.DEBUGGER, + call_depth=0, + ) + + # construct variable pool + variable_pool = VariablePool( + system_variables={SystemVariableKey.FILES: [], SystemVariableKey.USER_ID: "aaa"}, + user_inputs={}, + environment_variables=[], + conversation_variables=[], + ) + variable_pool.add(["start", "weather"], "sunny") + variable_pool.add(["start", "score"], 85) + + node = AnswerNode( + id=str(uuid.uuid4()), + graph_init_params=init_params, + graph=graph, + graph_runtime_state=GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter()), + config={ + "id": "answer", + "data": { + "title": "Answer with outputs", + "type": "answer", + "answer": "Weather: {{#start.weather#}}, Score: {{#start.score#}}", + "outputs": [ + { + "variable": "confidence", + "type": "number", + "value_selector": ["start", "score"] + }, + { + "variable": "status", + "type": "string", + "value_selector": ["start", "weather"] + } + ], + }, + }, + ) + + # Mock db.session.close() + db.session.close = MagicMock() + + # execute node + result = node._run() + + assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED + assert result.outputs is not None + assert result.outputs["answer"] == "Weather: sunny, Score: 85" + + # Check outputs field + assert "outputs" in result.outputs + outputs = result.outputs["outputs"] + assert outputs["confidence"] == 85 + assert outputs["status"] == "sunny" + + +def test_execute_answer_with_complex_outputs(): + """Test Answer node with complex output variables including arrays and objects""" + graph_config = { + "edges": [ + { + "id": "start-source-answer-target", + "source": "start", + "target": "answer", + }, + ], + "nodes": [ + {"data": {"type": "start"}, "id": "start"}, + { + "data": { + "title": "Complex outputs", + "type": "answer", + "answer": "Analysis complete", + "outputs": [ + { + "variable": "scores", + "type": "array[number]", + "value_selector": ["start", "score_list"] + }, + { + "variable": "metadata", + "type": "object", + "value_selector": ["start", "meta_info"] + } + ], + }, + "id": "answer", + }, + ], + } + + graph = Graph.init(graph_config=graph_config) + + init_params = GraphInitParams( + tenant_id="1", + app_id="1", + workflow_type=WorkflowType.WORKFLOW, + workflow_id="1", + graph_config=graph_config, + user_id="1", + user_from=UserFrom.ACCOUNT, + invoke_from=InvokeFrom.DEBUGGER, + call_depth=0, + ) + + # construct variable pool with complex data + variable_pool = VariablePool( + system_variables={SystemVariableKey.FILES: [], SystemVariableKey.USER_ID: "aaa"}, + user_inputs={}, + environment_variables=[], + conversation_variables=[], + ) + variable_pool.add(["start", "score_list"], [85, 92, 78]) + variable_pool.add(["start", "meta_info"], {"category": "test", "priority": "high"}) + + node = AnswerNode( + id=str(uuid.uuid4()), + graph_init_params=init_params, + graph=graph, + graph_runtime_state=GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter()), + config={ + "id": "answer", + "data": { + "title": "Complex outputs", + "type": "answer", + "answer": "Analysis complete", + "outputs": [ + { + "variable": "scores", + "type": "array[number]", + "value_selector": ["start", "score_list"] + }, + { + "variable": "metadata", + "type": "object", + "value_selector": ["start", "meta_info"] + } + ], + }, + }, + ) + + # Mock db.session.close() + db.session.close = MagicMock() + + # execute node + result = node._run() + + assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED + assert result.outputs["answer"] == "Analysis complete" + + # Check complex outputs + assert "outputs" in result.outputs + outputs = result.outputs["outputs"] + assert outputs["scores"] == [85, 92, 78] + assert outputs["metadata"] == {"category": "test", "priority": "high"} + + +def test_execute_answer_with_empty_outputs(): + """Test Answer node with empty outputs configuration""" + graph_config = { + "edges": [ + { + "id": "start-source-answer-target", + "source": "start", + "target": "answer", + }, + ], + "nodes": [ + {"data": {"type": "start"}, "id": "start"}, + { + "data": { + "title": "No outputs", + "type": "answer", + "answer": "Simple answer", + "outputs": [], + }, + "id": "answer", + }, + ], + } + + graph = Graph.init(graph_config=graph_config) + + init_params = GraphInitParams( + tenant_id="1", + app_id="1", + workflow_type=WorkflowType.WORKFLOW, + workflow_id="1", + graph_config=graph_config, + user_id="1", + user_from=UserFrom.ACCOUNT, + invoke_from=InvokeFrom.DEBUGGER, + call_depth=0, + ) + + # construct variable pool + variable_pool = VariablePool( + system_variables={SystemVariableKey.FILES: [], SystemVariableKey.USER_ID: "aaa"}, + user_inputs={}, + environment_variables=[], + conversation_variables=[], + ) + + node = AnswerNode( + id=str(uuid.uuid4()), + graph_init_params=init_params, + graph=graph, + graph_runtime_state=GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter()), + config={ + "id": "answer", + "data": { + "title": "No outputs", + "type": "answer", + "answer": "Simple answer", + "outputs": [], + }, + }, + ) + + # Mock db.session.close() + db.session.close = MagicMock() + + # execute node + result = node._run() + + assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED + assert result.outputs["answer"] == "Simple answer" + + # Check that outputs field is empty when no outputs are configured + assert "outputs" in result.outputs + assert result.outputs["outputs"] == {} + + +def test_execute_answer_outputs_variable_not_found(): + """Test Answer node when output variable selector points to non-existent variable""" + graph_config = { + "edges": [ + { + "id": "start-source-answer-target", + "source": "start", + "target": "answer", + }, + ], + "nodes": [ + {"data": {"type": "start"}, "id": "start"}, + { + "data": { + "title": "Missing variable", + "type": "answer", + "answer": "Test answer", + "outputs": [ + { + "variable": "missing_var", + "type": "string", + "value_selector": ["start", "non_existent"] + } + ], + }, + "id": "answer", + }, + ], + } + + graph = Graph.init(graph_config=graph_config) + + init_params = GraphInitParams( + tenant_id="1", + app_id="1", + workflow_type=WorkflowType.WORKFLOW, + workflow_id="1", + graph_config=graph_config, + user_id="1", + user_from=UserFrom.ACCOUNT, + invoke_from=InvokeFrom.DEBUGGER, + call_depth=0, + ) + + # construct variable pool without the referenced variable + variable_pool = VariablePool( + system_variables={SystemVariableKey.FILES: [], SystemVariableKey.USER_ID: "aaa"}, + user_inputs={}, + environment_variables=[], + conversation_variables=[], + ) + + node = AnswerNode( + id=str(uuid.uuid4()), + graph_init_params=init_params, + graph=graph, + graph_runtime_state=GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter()), + config={ + "id": "answer", + "data": { + "title": "Missing variable", + "type": "answer", + "answer": "Test answer", + "outputs": [ + { + "variable": "missing_var", + "type": "string", + "value_selector": ["start", "non_existent"] + } + ], + }, + }, + ) + + # Mock db.session.close() + db.session.close = MagicMock() + + # execute node + result = node._run() + + assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED + assert result.outputs["answer"] == "Test answer" + + # Check that outputs field handles missing variables gracefully + assert "outputs" in result.outputs + outputs = result.outputs["outputs"] + # Missing variables should result in None or empty value + assert outputs.get("missing_var") is None or outputs.get("missing_var") == "" From ea3a99efcd82a8951bac91c8ba56d12be83f1d36 Mon Sep 17 00:00:00 2001 From: GuanMu Date: Wed, 16 Jul 2025 03:15:05 +0000 Subject: [PATCH 20/33] fix: Simplify the formatting of output variables, improve code readability; update test cases to verify the correctness of different output configurations. --- .../workflow/graph_engine/graph_engine.py | 10 +-- .../core/workflow/nodes/answer/test_answer.py | 70 ++++--------------- 2 files changed, 20 insertions(+), 60 deletions(-) diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py index c73ba46e0d..f37672aaad 100644 --- a/api/core/workflow/graph_engine/graph_engine.py +++ b/api/core/workflow/graph_engine/graph_engine.py @@ -196,11 +196,13 @@ class GraphEngine: self.graph_runtime_state.outputs["answer"] = self.graph_runtime_state.outputs[ "answer" ].strip() - + # Only save the user-defined output variables, not the entire node output - if (item.route_node_state.node_run_result - and item.route_node_state.node_run_result.outputs - and "outputs" in item.route_node_state.node_run_result.outputs): + if ( + item.route_node_state.node_run_result + and item.route_node_state.node_run_result.outputs + and "outputs" in item.route_node_state.node_run_result.outputs + ): user_outputs = item.route_node_state.node_run_result.outputs.get("outputs", {}) self.graph_runtime_state.outputs.update(user_outputs) except Exception as e: diff --git a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py index 39adf1da63..7702384e71 100644 --- a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py +++ b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py @@ -102,16 +102,8 @@ def test_execute_answer_with_outputs(): "type": "answer", "answer": "Weather: {{#start.weather#}}, Score: {{#start.score#}}", "outputs": [ - { - "variable": "confidence", - "type": "number", - "value_selector": ["start", "score"] - }, - { - "variable": "status", - "type": "string", - "value_selector": ["start", "weather"] - } + {"variable": "confidence", "type": "number", "value_selector": ["start", "score"]}, + {"variable": "status", "type": "string", "value_selector": ["start", "weather"]}, ], }, "id": "answer", @@ -155,16 +147,8 @@ def test_execute_answer_with_outputs(): "type": "answer", "answer": "Weather: {{#start.weather#}}, Score: {{#start.score#}}", "outputs": [ - { - "variable": "confidence", - "type": "number", - "value_selector": ["start", "score"] - }, - { - "variable": "status", - "type": "string", - "value_selector": ["start", "weather"] - } + {"variable": "confidence", "type": "number", "value_selector": ["start", "score"]}, + {"variable": "status", "type": "string", "value_selector": ["start", "weather"]}, ], }, }, @@ -179,7 +163,7 @@ def test_execute_answer_with_outputs(): assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED assert result.outputs is not None assert result.outputs["answer"] == "Weather: sunny, Score: 85" - + # Check outputs field assert "outputs" in result.outputs outputs = result.outputs["outputs"] @@ -205,16 +189,8 @@ def test_execute_answer_with_complex_outputs(): "type": "answer", "answer": "Analysis complete", "outputs": [ - { - "variable": "scores", - "type": "array[number]", - "value_selector": ["start", "score_list"] - }, - { - "variable": "metadata", - "type": "object", - "value_selector": ["start", "meta_info"] - } + {"variable": "scores", "type": "array[number]", "value_selector": ["start", "score_list"]}, + {"variable": "metadata", "type": "object", "value_selector": ["start", "meta_info"]}, ], }, "id": "answer", @@ -258,16 +234,8 @@ def test_execute_answer_with_complex_outputs(): "type": "answer", "answer": "Analysis complete", "outputs": [ - { - "variable": "scores", - "type": "array[number]", - "value_selector": ["start", "score_list"] - }, - { - "variable": "metadata", - "type": "object", - "value_selector": ["start", "meta_info"] - } + {"variable": "scores", "type": "array[number]", "value_selector": ["start", "score_list"]}, + {"variable": "metadata", "type": "object", "value_selector": ["start", "meta_info"]}, ], }, }, @@ -281,7 +249,7 @@ def test_execute_answer_with_complex_outputs(): assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED assert result.outputs["answer"] == "Analysis complete" - + # Check complex outputs assert "outputs" in result.outputs outputs = result.outputs["outputs"] @@ -359,7 +327,7 @@ def test_execute_answer_with_empty_outputs(): assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED assert result.outputs["answer"] == "Simple answer" - + # Check that outputs field is empty when no outputs are configured assert "outputs" in result.outputs assert result.outputs["outputs"] == {} @@ -383,11 +351,7 @@ def test_execute_answer_outputs_variable_not_found(): "type": "answer", "answer": "Test answer", "outputs": [ - { - "variable": "missing_var", - "type": "string", - "value_selector": ["start", "non_existent"] - } + {"variable": "missing_var", "type": "string", "value_selector": ["start", "non_existent"]} ], }, "id": "answer", @@ -428,13 +392,7 @@ def test_execute_answer_outputs_variable_not_found(): "title": "Missing variable", "type": "answer", "answer": "Test answer", - "outputs": [ - { - "variable": "missing_var", - "type": "string", - "value_selector": ["start", "non_existent"] - } - ], + "outputs": [{"variable": "missing_var", "type": "string", "value_selector": ["start", "non_existent"]}], }, }, ) @@ -447,7 +405,7 @@ def test_execute_answer_outputs_variable_not_found(): assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED assert result.outputs["answer"] == "Test answer" - + # Check that outputs field handles missing variables gracefully assert "outputs" in result.outputs outputs = result.outputs["outputs"] From e057de53e566455764c874e898bba4d1e1885035 Mon Sep 17 00:00:00 2001 From: GuanMu Date: Wed, 16 Jul 2025 07:48:07 +0000 Subject: [PATCH 21/33] fix: Filter workflow outputs, remove unnecessary "answer" field to optimize task status metadata --- api/core/app/apps/advanced_chat/generate_task_pipeline.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index c6df13f53e..a725ddce8f 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -516,8 +516,9 @@ class AdvancedChatAppGenerateTaskPipeline: task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, ) - workflow_outputs_data = workflow_finish_resp.data.outputs.get("outputs", {}) - self._task_state.metadata.outputs = workflow_outputs_data.get("outputs") + workflow_outputs = workflow_finish_resp.data.outputs + filtered_outputs = {k: v for k, v in workflow_outputs.items() if k != "answer"} + self._task_state.metadata.outputs = filtered_outputs yield workflow_finish_resp self._base_task_pipeline._queue_manager.publish( QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE From 0d8769a7e4c8b7f175e3a2f82dfccd79384cbfd0 Mon Sep 17 00:00:00 2001 From: GuanMu Date: Wed, 16 Jul 2025 07:57:10 +0000 Subject: [PATCH 22/33] Enhance test cases, ensure output results are not empty, and verify the correctness of different output configurations. --- api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py index 7702384e71..44c9f4e5f0 100644 --- a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py +++ b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py @@ -248,6 +248,7 @@ def test_execute_answer_with_complex_outputs(): result = node._run() assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED + assert result.outputs is not None assert result.outputs["answer"] == "Analysis complete" # Check complex outputs @@ -326,6 +327,7 @@ def test_execute_answer_with_empty_outputs(): result = node._run() assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED + assert result.outputs is not None assert result.outputs["answer"] == "Simple answer" # Check that outputs field is empty when no outputs are configured @@ -404,6 +406,7 @@ def test_execute_answer_outputs_variable_not_found(): result = node._run() assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED + assert result.outputs is not None assert result.outputs["answer"] == "Test answer" # Check that outputs field handles missing variables gracefully From 623f16e32dbdeb9d9904dc990996ee7d56d68a9f Mon Sep 17 00:00:00 2001 From: GuanMu Date: Wed, 16 Jul 2025 08:34:17 +0000 Subject: [PATCH 23/33] fix:test --- .../core/workflow/nodes/answer/test_answer.py | 170 +----------------- 1 file changed, 2 insertions(+), 168 deletions(-) diff --git a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py index 44c9f4e5f0..90c77b0385 100644 --- a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py +++ b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py @@ -54,6 +54,7 @@ def test_execute_answer(): system_variables={SystemVariableKey.FILES: [], SystemVariableKey.USER_ID: "aaa"}, user_inputs={}, environment_variables=[], + conversation_variables=[], ) pool.add(["start", "weather"], "sunny") pool.add(["llm", "text"], "You are a helpful AI.") @@ -81,6 +82,7 @@ def test_execute_answer(): result = node._run() assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED + assert result.outputs is not None assert result.outputs["answer"] == "Today's weather is sunny\nYou are a helpful AI.\n{{img}}\nFin." @@ -171,93 +173,6 @@ def test_execute_answer_with_outputs(): assert outputs["status"] == "sunny" -def test_execute_answer_with_complex_outputs(): - """Test Answer node with complex output variables including arrays and objects""" - graph_config = { - "edges": [ - { - "id": "start-source-answer-target", - "source": "start", - "target": "answer", - }, - ], - "nodes": [ - {"data": {"type": "start"}, "id": "start"}, - { - "data": { - "title": "Complex outputs", - "type": "answer", - "answer": "Analysis complete", - "outputs": [ - {"variable": "scores", "type": "array[number]", "value_selector": ["start", "score_list"]}, - {"variable": "metadata", "type": "object", "value_selector": ["start", "meta_info"]}, - ], - }, - "id": "answer", - }, - ], - } - - graph = Graph.init(graph_config=graph_config) - - init_params = GraphInitParams( - tenant_id="1", - app_id="1", - workflow_type=WorkflowType.WORKFLOW, - workflow_id="1", - graph_config=graph_config, - user_id="1", - user_from=UserFrom.ACCOUNT, - invoke_from=InvokeFrom.DEBUGGER, - call_depth=0, - ) - - # construct variable pool with complex data - variable_pool = VariablePool( - system_variables={SystemVariableKey.FILES: [], SystemVariableKey.USER_ID: "aaa"}, - user_inputs={}, - environment_variables=[], - conversation_variables=[], - ) - variable_pool.add(["start", "score_list"], [85, 92, 78]) - variable_pool.add(["start", "meta_info"], {"category": "test", "priority": "high"}) - - node = AnswerNode( - id=str(uuid.uuid4()), - graph_init_params=init_params, - graph=graph, - graph_runtime_state=GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter()), - config={ - "id": "answer", - "data": { - "title": "Complex outputs", - "type": "answer", - "answer": "Analysis complete", - "outputs": [ - {"variable": "scores", "type": "array[number]", "value_selector": ["start", "score_list"]}, - {"variable": "metadata", "type": "object", "value_selector": ["start", "meta_info"]}, - ], - }, - }, - ) - - # Mock db.session.close() - db.session.close = MagicMock() - - # execute node - result = node._run() - - assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED - assert result.outputs is not None - assert result.outputs["answer"] == "Analysis complete" - - # Check complex outputs - assert "outputs" in result.outputs - outputs = result.outputs["outputs"] - assert outputs["scores"] == [85, 92, 78] - assert outputs["metadata"] == {"category": "test", "priority": "high"} - - def test_execute_answer_with_empty_outputs(): """Test Answer node with empty outputs configuration""" graph_config = { @@ -333,84 +248,3 @@ def test_execute_answer_with_empty_outputs(): # Check that outputs field is empty when no outputs are configured assert "outputs" in result.outputs assert result.outputs["outputs"] == {} - - -def test_execute_answer_outputs_variable_not_found(): - """Test Answer node when output variable selector points to non-existent variable""" - graph_config = { - "edges": [ - { - "id": "start-source-answer-target", - "source": "start", - "target": "answer", - }, - ], - "nodes": [ - {"data": {"type": "start"}, "id": "start"}, - { - "data": { - "title": "Missing variable", - "type": "answer", - "answer": "Test answer", - "outputs": [ - {"variable": "missing_var", "type": "string", "value_selector": ["start", "non_existent"]} - ], - }, - "id": "answer", - }, - ], - } - - graph = Graph.init(graph_config=graph_config) - - init_params = GraphInitParams( - tenant_id="1", - app_id="1", - workflow_type=WorkflowType.WORKFLOW, - workflow_id="1", - graph_config=graph_config, - user_id="1", - user_from=UserFrom.ACCOUNT, - invoke_from=InvokeFrom.DEBUGGER, - call_depth=0, - ) - - # construct variable pool without the referenced variable - variable_pool = VariablePool( - system_variables={SystemVariableKey.FILES: [], SystemVariableKey.USER_ID: "aaa"}, - user_inputs={}, - environment_variables=[], - conversation_variables=[], - ) - - node = AnswerNode( - id=str(uuid.uuid4()), - graph_init_params=init_params, - graph=graph, - graph_runtime_state=GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter()), - config={ - "id": "answer", - "data": { - "title": "Missing variable", - "type": "answer", - "answer": "Test answer", - "outputs": [{"variable": "missing_var", "type": "string", "value_selector": ["start", "non_existent"]}], - }, - }, - ) - - # Mock db.session.close() - db.session.close = MagicMock() - - # execute node - result = node._run() - - assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED - assert result.outputs is not None - assert result.outputs["answer"] == "Test answer" - - # Check that outputs field handles missing variables gracefully - assert "outputs" in result.outputs - outputs = result.outputs["outputs"] - # Missing variables should result in None or empty value - assert outputs.get("missing_var") is None or outputs.get("missing_var") == "" From 7a0894f7b60c8c9437a3cf6d33a42e042fcb732e Mon Sep 17 00:00:00 2001 From: GuanMu Date: Wed, 16 Jul 2025 08:50:04 +0000 Subject: [PATCH 24/33] Fix: Ensure the import of SystemVariableKey is available in test cases to support the correctness validation of custom output variables. --- api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py index 90c77b0385..8af6202ef6 100644 --- a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py +++ b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py @@ -87,6 +87,7 @@ def test_execute_answer(): def test_execute_answer_with_outputs(): + from core.workflow.enums import SystemVariableKey # 确保函数作用域内可用 """Test Answer node with custom output variables""" graph_config = { "edges": [ @@ -174,6 +175,7 @@ def test_execute_answer_with_outputs(): def test_execute_answer_with_empty_outputs(): + from core.workflow.enums import SystemVariableKey # 确保函数作用域内可用 """Test Answer node with empty outputs configuration""" graph_config = { "edges": [ From 05704f1bd158e14a62a667aef083fea6ff3936a9 Mon Sep 17 00:00:00 2001 From: GuanMu Date: Wed, 16 Jul 2025 08:51:45 +0000 Subject: [PATCH 25/33] fix test --- .../unit_tests/core/workflow/nodes/answer/test_answer.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py index 8af6202ef6..2a43b93e2c 100644 --- a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py +++ b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py @@ -87,7 +87,8 @@ def test_execute_answer(): def test_execute_answer_with_outputs(): - from core.workflow.enums import SystemVariableKey # 确保函数作用域内可用 + from core.workflow.enums import SystemVariableKey + """Test Answer node with custom output variables""" graph_config = { "edges": [ @@ -175,7 +176,8 @@ def test_execute_answer_with_outputs(): def test_execute_answer_with_empty_outputs(): - from core.workflow.enums import SystemVariableKey # 确保函数作用域内可用 + from core.workflow.enums import SystemVariableKey + """Test Answer node with empty outputs configuration""" graph_config = { "edges": [ From aff8e9824f67ecbb2b3329b259c0d6c8732395d8 Mon Sep 17 00:00:00 2001 From: GuanMu Date: Wed, 16 Jul 2025 08:51:58 +0000 Subject: [PATCH 26/33] fix test --- .../unit_tests/core/workflow/nodes/answer/test_answer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py index 2a43b93e2c..0e3faf944c 100644 --- a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py +++ b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py @@ -87,7 +87,7 @@ def test_execute_answer(): def test_execute_answer_with_outputs(): - from core.workflow.enums import SystemVariableKey + from core.workflow.enums import SystemVariableKey """Test Answer node with custom output variables""" graph_config = { @@ -176,7 +176,7 @@ def test_execute_answer_with_outputs(): def test_execute_answer_with_empty_outputs(): - from core.workflow.enums import SystemVariableKey + from core.workflow.enums import SystemVariableKey """Test Answer node with empty outputs configuration""" graph_config = { From 37b5b4ea2d94df74544d02ec77e866d36cb2c38d Mon Sep 17 00:00:00 2001 From: GuanMu Date: Wed, 16 Jul 2025 11:00:07 +0000 Subject: [PATCH 27/33] remove unused code --- api/core/app/apps/advanced_chat/generate_task_pipeline.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index a725ddce8f..f19d29d4cf 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -186,17 +186,11 @@ class AdvancedChatAppGenerateTaskPipeline: if stream_response.metadata: extras["metadata"] = stream_response.metadata - # Retrieve outputs from task state metadata, which is populated earlier - final_outputs = {} - if self._task_state.metadata and hasattr(self._task_state.metadata, "outputs"): - final_outputs = self._task_state.metadata.outputs - return ChatbotAppBlockingResponse( task_id=stream_response.task_id, data=ChatbotAppBlockingResponse.Data( id=self._message_id, mode=self._conversation_mode, - outputs=final_outputs, conversation_id=self._conversation_id, message_id=self._message_id, answer=self._task_state.answer, From 54520e12561c95429586150196c8e5d24b1dbf8b Mon Sep 17 00:00:00 2001 From: GuanMu Date: Wed, 16 Jul 2025 11:29:36 +0000 Subject: [PATCH 28/33] Fix: Update the type of answer_stream_variable_selectors_mapping to a list of GenerateRouteChunk and add the answer_end_dependencies field in AnswerStreamGenerateRoute to support mapping and dependency management of stream variable selectors. --- .../nodes/answer/answer_stream_generate_router.py | 2 +- api/core/workflow/nodes/answer/entities.py | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/api/core/workflow/nodes/answer/answer_stream_generate_router.py b/api/core/workflow/nodes/answer/answer_stream_generate_router.py index ce6095a34f..a5ab818673 100644 --- a/api/core/workflow/nodes/answer/answer_stream_generate_router.py +++ b/api/core/workflow/nodes/answer/answer_stream_generate_router.py @@ -23,7 +23,7 @@ class AnswerStreamGeneratorRouter: """ # parse stream output node value selectors of answer nodes answer_generate_route: dict[str, list[GenerateRouteChunk]] = {} - answer_stream_variable_selectors_mapping: dict[str, list[list[str]]] = {} + answer_stream_variable_selectors_mapping: dict[str, list[GenerateRouteChunk]] = {} for answer_node_id, node_config in node_id_config_mapping.items(): if node_config.get("data", {}).get("type") != NodeType.ANSWER.value: continue diff --git a/api/core/workflow/nodes/answer/entities.py b/api/core/workflow/nodes/answer/entities.py index 871a965e5d..530327cdf5 100644 --- a/api/core/workflow/nodes/answer/entities.py +++ b/api/core/workflow/nodes/answer/entities.py @@ -65,3 +65,11 @@ class AnswerStreamGenerateRoute(BaseModel): answer_generate_route: dict[str, list[GenerateRouteChunk]] = Field( ..., description="answer generate route (answer node id -> generate route chunks)" ) + answer_stream_variable_selectors_mapping: dict[str, list[GenerateRouteChunk]] = Field( + default_factory=dict, + description="answer node id -> generate route chunks for streaming variable selectors", + ) + answer_end_dependencies: dict[str, list[str]] = Field( + default_factory=dict, + description="answer node id -> dependent answer node ids at message end", + ) From 2c8855f6684e80e08bbacfd6fb63ad5284f509c0 Mon Sep 17 00:00:00 2001 From: GuanMu Date: Wed, 16 Jul 2025 11:40:48 +0000 Subject: [PATCH 29/33] fix test --- api/core/app/apps/advanced_chat/generate_task_pipeline.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index f19d29d4cf..140ccbc443 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -511,8 +511,9 @@ class AdvancedChatAppGenerateTaskPipeline: workflow_execution=workflow_execution, ) workflow_outputs = workflow_finish_resp.data.outputs - filtered_outputs = {k: v for k, v in workflow_outputs.items() if k != "answer"} - self._task_state.metadata.outputs = filtered_outputs + if workflow_outputs: + filtered_outputs = {k: v for k, v in workflow_outputs.items() if k != "answer"} + self._task_state.metadata.outputs = filtered_outputs yield workflow_finish_resp self._base_task_pipeline._queue_manager.publish( QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE From 868542dea8bdbbe871b18613be7f6dc9d2a62265 Mon Sep 17 00:00:00 2001 From: GuanMu Date: Mon, 21 Jul 2025 14:44:22 +0000 Subject: [PATCH 30/33] Fix AnswerNode attribute access and update tests to include outputs --- api/core/workflow/nodes/answer/answer_node.py | 2 +- api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py | 1 + api/tests/unit_tests/core/workflow/nodes/test_answer.py | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/api/core/workflow/nodes/answer/answer_node.py b/api/core/workflow/nodes/answer/answer_node.py index 70d7500d47..cde69a3a09 100644 --- a/api/core/workflow/nodes/answer/answer_node.py +++ b/api/core/workflow/nodes/answer/answer_node.py @@ -72,7 +72,7 @@ class AnswerNode(BaseNode): part = cast(TextGenerateRouteChunk, part) answer += part.text - output_variables = self.node_data.outputs + output_variables = self._node_data.outputs outputs = {} for variable_selector in output_variables: diff --git a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py index a1e47e63b9..7c968f6ff0 100644 --- a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py +++ b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py @@ -65,6 +65,7 @@ def test_execute_answer(): "title": "123", "type": "answer", "answer": "Today's weather is {{#start.weather#}}\n{{#llm.text#}}\n{{img}}\nFin.", + "outputs": [], }, } diff --git a/api/tests/unit_tests/core/workflow/nodes/test_answer.py b/api/tests/unit_tests/core/workflow/nodes/test_answer.py index 72c6a1b71b..6f8e27a3e3 100644 --- a/api/tests/unit_tests/core/workflow/nodes/test_answer.py +++ b/api/tests/unit_tests/core/workflow/nodes/test_answer.py @@ -68,6 +68,7 @@ def test_execute_answer(): "title": "123", "type": "answer", "answer": "Today's weather is {{#start.weather#}}\n{{#llm.text#}}\n{{img}}\nFin.", + "outputs": [], }, } From f433b8a175eec3462f6561d805dd46710e126ce3 Mon Sep 17 00:00:00 2001 From: GuanMu Date: Mon, 21 Jul 2025 15:01:34 +0000 Subject: [PATCH 31/33] test: Refactor AnswerNode tests, extract configuration to variables and update node initialization. --- .../core/workflow/nodes/answer/test_answer.py | 50 +++++++++++-------- 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py index 7c968f6ff0..c72d73ae4e 100644 --- a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py +++ b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py @@ -144,25 +144,29 @@ def test_execute_answer_with_outputs(): variable_pool.add(["start", "weather"], "sunny") variable_pool.add(["start", "score"], 85) + node_config = { + "id": "answer", + "data": { + "title": "Answer with outputs", + "type": "answer", + "answer": "Weather: {{#start.weather#}}, Score: {{#start.score#}}", + "outputs": [ + {"variable": "confidence", "type": "number", "value_selector": ["start", "score"]}, + {"variable": "status", "type": "string", "value_selector": ["start", "weather"]}, + ], + }, + } + node = AnswerNode( id=str(uuid.uuid4()), graph_init_params=init_params, graph=graph, graph_runtime_state=GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter()), - config={ - "id": "answer", - "data": { - "title": "Answer with outputs", - "type": "answer", - "answer": "Weather: {{#start.weather#}}, Score: {{#start.score#}}", - "outputs": [ - {"variable": "confidence", "type": "number", "value_selector": ["start", "score"]}, - {"variable": "status", "type": "string", "value_selector": ["start", "weather"]}, - ], - }, - }, + config=node_config, ) + node.init_node_data(node_config["data"]) + # Mock db.session.close() db.session.close = MagicMock() @@ -228,22 +232,26 @@ def test_execute_answer_with_empty_outputs(): conversation_variables=[], ) + node_config = { + "id": "answer", + "data": { + "title": "No outputs", + "type": "answer", + "answer": "Simple answer", + "outputs": [], + }, + } + node = AnswerNode( id=str(uuid.uuid4()), graph_init_params=init_params, graph=graph, graph_runtime_state=GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter()), - config={ - "id": "answer", - "data": { - "title": "No outputs", - "type": "answer", - "answer": "Simple answer", - "outputs": [], - }, - }, + config=node_config, ) + node.init_node_data(node_config["data"]) + # Mock db.session.close() db.session.close = MagicMock() From 838eda13846afb3f3a7c0012488302ed98cb73c3 Mon Sep 17 00:00:00 2001 From: GuanMu Date: Tue, 22 Jul 2025 01:51:19 +0000 Subject: [PATCH 32/33] fix : https://github.com/langgenius/dify/pull/20921#discussion_r2220751387 --- api/core/workflow/nodes/answer/entities.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/core/workflow/nodes/answer/entities.py b/api/core/workflow/nodes/answer/entities.py index 530327cdf5..ea84ffda59 100644 --- a/api/core/workflow/nodes/answer/entities.py +++ b/api/core/workflow/nodes/answer/entities.py @@ -13,7 +13,7 @@ class AnswerNodeData(BaseNodeData): """ answer: str = Field(..., description="answer template string") - outputs: list[VariableSelector] + outputs: list[VariableSelector] = Field(default_factory=list, description="list of variable selectors") class GenerateRouteChunk(BaseModel): From a9f8329660831ea599cba2d2ae93b24c809e59ac Mon Sep 17 00:00:00 2001 From: GuanMu Date: Tue, 22 Jul 2025 02:02:39 +0000 Subject: [PATCH 33/33] fix https://github.com/langgenius/dify/pull/20921#discussion_r2220751389 --- web/app/components/base/chat/chat/question.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/web/app/components/base/chat/chat/question.tsx b/web/app/components/base/chat/chat/question.tsx index 666a869a32..6630d9bb9d 100644 --- a/web/app/components/base/chat/chat/question.tsx +++ b/web/app/components/base/chat/chat/question.tsx @@ -117,7 +117,7 @@ const Question: FC = ({
{