From 151fe3d02ed9b0bf3f2af32023136d2f650cdbe6 Mon Sep 17 00:00:00 2001 From: YiQiu Jia Date: Mon, 7 Jul 2025 14:29:33 +0800 Subject: [PATCH] feat(end_stream_processor): support streaming output of all configured variables in End node - Iterate all variable selectors for all end nodes and stream if matched - Add detailed English comments explaining each step of the logic - This enables streaming output for multiple variables (e.g., text, text1, text2...) instead of only the first one --- .../nodes/end/end_stream_processor.py | 37 ++++++++++--------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/api/core/workflow/nodes/end/end_stream_processor.py b/api/core/workflow/nodes/end/end_stream_processor.py index a6fb2ffc18..fcf4011dae 100644 --- a/api/core/workflow/nodes/end/end_stream_processor.py +++ b/api/core/workflow/nodes/end/end_stream_processor.py @@ -42,23 +42,26 @@ class EndStreamProcessor(StreamProcessor): yield event continue - if event.route_node_state.node_id in self.current_stream_chunk_generating_node_ids: - stream_out_end_node_ids = self.current_stream_chunk_generating_node_ids[ - event.route_node_state.node_id - ] - else: - stream_out_end_node_ids = self._get_stream_out_end_node_ids(event) - self.current_stream_chunk_generating_node_ids[event.route_node_state.node_id] = ( - stream_out_end_node_ids - ) - - if stream_out_end_node_ids: - if self.has_output and event.node_id not in self.output_node_ids: - event.chunk_content = "\n" + event.chunk_content - - self.output_node_ids.add(event.node_id) - self.has_output = True - yield event + # Iterate all variable selectors for all end nodes; stream if matched + matched = False # Flag to indicate if a match is found + for end_node_id, value_selectors_list in self.end_stream_param.end_stream_variable_selector_mapping.items(): + # Loop through all variable selectors configured for the current end node + for value_selector in value_selectors_list: + # Check if the current event's from_variable_selector matches the configured selector + if event.from_variable_selector == value_selector: + # If there was previous output and this node hasn't output yet, prepend a newline + if self.has_output and event.node_id not in self.output_node_ids: + event.chunk_content = "\n" + event.chunk_content + # Mark this node as having output to avoid duplicate newlines + self.output_node_ids.add(event.node_id) + # Set the flag indicating output has occurred + self.has_output = True + # Stream the current event + yield event + matched = True # Set matched flag to break out of loops + break # Exit inner loop after match + if matched: + break # Exit outer loop after match elif isinstance(event, NodeRunSucceededEvent): yield event if event.route_node_state.node_id in self.current_stream_chunk_generating_node_ids: