diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index e17ed0ebaa..1d2b843043 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -165,7 +165,7 @@ class AdvancedChatAppGenerateTaskPipeline: ) generator = self._wrapper_process_stream_response(trace_manager=self._application_generate_entity.trace_manager) - print(f"generator: {generator}") + print(f"generator: {generator}=======") if self._base_task_pipeline._stream: return self._to_stream_response(generator) else: @@ -183,7 +183,6 @@ class AdvancedChatAppGenerateTaskPipeline: extras = {} if stream_response.metadata: extras["metadata"] = stream_response.metadata - print(f"stream_response: {stream_response}") return ChatbotAppBlockingResponse( task_id=stream_response.task_id, data=ChatbotAppBlockingResponse.Data( @@ -210,7 +209,6 @@ class AdvancedChatAppGenerateTaskPipeline: :return: """ for stream_response in generator: - print(f"stream_response: {stream_response}") yield ChatbotAppStreamResponse( conversation_id=self._conversation_id, message_id=self._message_id, @@ -361,7 +359,6 @@ class AdvancedChatAppGenerateTaskPipeline: self._recorded_files.extend( self._workflow_response_converter.fetch_files_from_node_outputs(event.outputs or {}) ) - with Session(db.engine, expire_on_commit=False) as session: workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_success( event=event @@ -494,6 +491,7 @@ class AdvancedChatAppGenerateTaskPipeline: if not graph_runtime_state: raise ValueError("workflow run not initialized.") + print(f"outputsfff: {event.outputs}") with Session(db.engine, expire_on_commit=False) as session: workflow_execution = self._workflow_cycle_manager.handle_workflow_run_success( workflow_run_id=self._workflow_run_id, diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index 25c889e922..ac464f8d1f 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -49,7 +49,7 @@ class WorkflowTaskState(TaskState): """ answer: str = "" - + outputs: Optional[Mapping[str, Any]] = None class StreamEvent(Enum): """ diff --git a/api/core/workflow/nodes/answer/answer_node.py b/api/core/workflow/nodes/answer/answer_node.py index aa030870e2..9b1ccde2a5 100644 --- a/api/core/workflow/nodes/answer/answer_node.py +++ b/api/core/workflow/nodes/answer/answer_node.py @@ -45,7 +45,18 @@ class AnswerNode(BaseNode[AnswerNodeData]): part = cast(TextGenerateRouteChunk, part) answer += part.text - return NodeRunResult(status=WorkflowNodeExecutionStatus.SUCCEEDED, outputs={"answer": answer, "files": files}) + output_variables = self.node_data.outputs + + outputs = {} + for variable_selector in output_variables: + variable = self.graph_runtime_state.variable_pool.get(variable_selector.value_selector) + value = variable.to_object() if variable is not None else None + outputs[variable_selector.variable] = value + + return NodeRunResult( + status=WorkflowNodeExecutionStatus.SUCCEEDED, + outputs={"answer": answer, "files": files, "outputs": outputs}, + ) @classmethod def _extract_variable_selector_to_variable_mapping( diff --git a/api/core/workflow/nodes/answer/answer_stream_generate_router.py b/api/core/workflow/nodes/answer/answer_stream_generate_router.py index 1d9c3e9b96..628476677c 100644 --- a/api/core/workflow/nodes/answer/answer_stream_generate_router.py +++ b/api/core/workflow/nodes/answer/answer_stream_generate_router.py @@ -23,6 +23,7 @@ class AnswerStreamGeneratorRouter: """ # parse stream output node value selectors of answer nodes answer_generate_route: dict[str, list[GenerateRouteChunk]] = {} + answer_stream_variable_selectors_mapping: dict[str, list[list[str]]] = {} for answer_node_id, node_config in node_id_config_mapping.items(): if node_config.get("data", {}).get("type") != NodeType.ANSWER.value: continue @@ -30,6 +31,7 @@ class AnswerStreamGeneratorRouter: # get generate route for stream output generate_route = cls._extract_generate_route_selectors(node_config) answer_generate_route[answer_node_id] = generate_route + answer_stream_variable_selectors_mapping[answer_node_id] = generate_route # fetch answer dependencies answer_node_ids = list(answer_generate_route.keys()) @@ -38,9 +40,18 @@ class AnswerStreamGeneratorRouter: reverse_edge_mapping=reverse_edge_mapping, node_id_config_mapping=node_id_config_mapping, ) + answer_end_ids = list(answer_stream_variable_selectors_mapping.keys()) + answer_end_dependencies = cls._fetch_answers_dependencies( + answer_node_ids=answer_end_ids, + reverse_edge_mapping=reverse_edge_mapping, + node_id_config_mapping=node_id_config_mapping, + ) return AnswerStreamGenerateRoute( - answer_generate_route=answer_generate_route, answer_dependencies=answer_dependencies + answer_generate_route=answer_generate_route, + answer_dependencies=answer_dependencies, + answer_stream_variable_selectors_mapping=answer_stream_variable_selectors_mapping, + answer_end_dependencies=answer_end_dependencies, ) @classmethod @@ -89,6 +100,7 @@ class AnswerStreamGeneratorRouter: :param config: node config :return: """ + print(f"config: {config}=======") node_data = AnswerNodeData(**config.get("data", {})) return cls.extract_generate_route_from_node_data(node_data) diff --git a/api/core/workflow/nodes/answer/answer_stream_processor.py b/api/core/workflow/nodes/answer/answer_stream_processor.py index ba6ba16e36..2e48b4c5b2 100644 --- a/api/core/workflow/nodes/answer/answer_stream_processor.py +++ b/api/core/workflow/nodes/answer/answer_stream_processor.py @@ -26,9 +26,12 @@ class AnswerStreamProcessor(StreamProcessor): for answer_node_id in self.generate_routes.answer_generate_route: self.route_position[answer_node_id] = 0 self.current_stream_chunk_generating_node_ids: dict[str, list[str]] = {} - + self.has_output = False + self.output_node_ids: set[str] = set() + def process(self, generator: Generator[GraphEngineEvent, None, None]) -> Generator[GraphEngineEvent, None, None]: for event in generator: + if isinstance(event, NodeRunStartedEvent): if event.route_node_state.node_id == self.graph.root_node_id and not self.rest_node_ids: self.reset() @@ -36,6 +39,11 @@ class AnswerStreamProcessor(StreamProcessor): yield event elif isinstance(event, NodeRunStreamChunkEvent): if event.in_iteration_id or event.in_loop_id: + if self.has_output and event.node_id not in self.output_node_ids: + print(f"event22: {event}") + event.chunk_content = "\n" + event.chunk_content + self.output_node_ids.add(event.node_id) + self.has_output = True yield event continue @@ -50,6 +58,10 @@ class AnswerStreamProcessor(StreamProcessor): ) for _ in stream_out_answer_node_ids: + if self.has_output and event.node_id not in self.output_node_ids: + event.chunk_content = "\n" + event.chunk_content + self.output_node_ids.add(event.node_id) + self.has_output = True yield event elif isinstance(event, NodeRunSucceededEvent | NodeRunExceptionEvent): yield event diff --git a/api/core/workflow/nodes/answer/entities.py b/api/core/workflow/nodes/answer/entities.py index a05cc44c99..e38a60111d 100644 --- a/api/core/workflow/nodes/answer/entities.py +++ b/api/core/workflow/nodes/answer/entities.py @@ -4,7 +4,7 @@ from enum import Enum from pydantic import BaseModel, Field from core.workflow.nodes.base import BaseNodeData - +from core.workflow.entities.variable_entities import VariableSelector class AnswerNodeData(BaseNodeData): """ @@ -12,6 +12,7 @@ class AnswerNodeData(BaseNodeData): """ answer: str = Field(..., description="answer template string") + outputs: list[VariableSelector] class GenerateRouteChunk(BaseModel): diff --git a/web/app/components/workflow/nodes/answer/default.ts b/web/app/components/workflow/nodes/answer/default.ts index 4ff6e49d7e..9538330b80 100644 --- a/web/app/components/workflow/nodes/answer/default.ts +++ b/web/app/components/workflow/nodes/answer/default.ts @@ -7,6 +7,7 @@ const nodeDefault: NodeDefault = { defaultValue: { variables: [], answer: '', + outputs: [], }, getAvailablePrevNodes(isChatMode: boolean) { const nodes = isChatMode diff --git a/web/app/components/workflow/nodes/answer/panel.tsx b/web/app/components/workflow/nodes/answer/panel.tsx index 0cec7f63f8..f76931d745 100644 --- a/web/app/components/workflow/nodes/answer/panel.tsx +++ b/web/app/components/workflow/nodes/answer/panel.tsx @@ -7,6 +7,9 @@ import Editor from '@/app/components/workflow/nodes/_base/components/prompt/edit import type { NodePanelProps } from '@/app/components/workflow/types' import useAvailableVarList from '@/app/components/workflow/nodes/_base/hooks/use-available-var-list' const i18nPrefix = 'workflow.nodes.answer' +import Field from '@/app/components/workflow/nodes/_base/components/field' +import AddButton from '@/app/components/base/button/add-button' +import VarList from '@/app/components/workflow/nodes/_base/components/variable/var-list' const Panel: FC> = ({ id, @@ -46,7 +49,7 @@ const Panel: FC> = ({ isSupportFileVar /> - {/*
+
> = ({ onChange={handleVarListChange} /> -
*/} +
) } diff --git a/web/app/components/workflow/nodes/answer/use-config.ts b/web/app/components/workflow/nodes/answer/use-config.ts index 269d953743..83544a4cfa 100644 --- a/web/app/components/workflow/nodes/answer/use-config.ts +++ b/web/app/components/workflow/nodes/answer/use-config.ts @@ -16,6 +16,7 @@ const useConfig = (id: string, payload: AnswerNodeType) => { const { handleVarListChange, handleAddVariable } = useVarList({ inputs, setInputs, + varKey: 'outputs', }) const handleAnswerChange = useCallback((value: string) => {