diff --git a/api/core/workflow/nodes/end/end_stream_processor.py b/api/core/workflow/nodes/end/end_stream_processor.py index a6fb2ffc18..fcf4011dae 100644 --- a/api/core/workflow/nodes/end/end_stream_processor.py +++ b/api/core/workflow/nodes/end/end_stream_processor.py @@ -42,23 +42,26 @@ class EndStreamProcessor(StreamProcessor): yield event continue - if event.route_node_state.node_id in self.current_stream_chunk_generating_node_ids: - stream_out_end_node_ids = self.current_stream_chunk_generating_node_ids[ - event.route_node_state.node_id - ] - else: - stream_out_end_node_ids = self._get_stream_out_end_node_ids(event) - self.current_stream_chunk_generating_node_ids[event.route_node_state.node_id] = ( - stream_out_end_node_ids - ) - - if stream_out_end_node_ids: - if self.has_output and event.node_id not in self.output_node_ids: - event.chunk_content = "\n" + event.chunk_content - - self.output_node_ids.add(event.node_id) - self.has_output = True - yield event + # Iterate all variable selectors for all end nodes; stream if matched + matched = False # Flag to indicate if a match is found + for end_node_id, value_selectors_list in self.end_stream_param.end_stream_variable_selector_mapping.items(): + # Loop through all variable selectors configured for the current end node + for value_selector in value_selectors_list: + # Check if the current event's from_variable_selector matches the configured selector + if event.from_variable_selector == value_selector: + # If there was previous output and this node hasn't output yet, prepend a newline + if self.has_output and event.node_id not in self.output_node_ids: + event.chunk_content = "\n" + event.chunk_content + # Mark this node as having output to avoid duplicate newlines + self.output_node_ids.add(event.node_id) + # Set the flag indicating output has occurred + self.has_output = True + # Stream the current event + yield event + matched = True # Set matched flag to break out of loops + break # Exit inner loop after match + if matched: + break # Exit outer loop after match elif isinstance(event, NodeRunSucceededEvent): yield event if event.route_node_state.node_id in self.current_stream_chunk_generating_node_ids: