@ -42,23 +42,26 @@ class EndStreamProcessor(StreamProcessor):
yield event
continue
if event . route_node_state . node_id in self . current_stream_chunk_generating_node_ids :
stream_out_end_node_ids = self . current_stream_chunk_generating_node_ids [
event . route_node_state . node_id
]
else :
stream_out_end_node_ids = self . _get_stream_out_end_node_ids ( event )
self . current_stream_chunk_generating_node_ids [ event . route_node_state . node_id ] = (
stream_out_end_node_ids
)
if stream_out_end_node_ids :
if self . has_output and event . node_id not in self . output_node_ids :
event . chunk_content = " \n " + event . chunk_content
self . output_node_ids . add ( event . node_id )
self . has_output = True
yield event
# Iterate all variable selectors for all end nodes; stream if matched
matched = False # Flag to indicate if a match is found
for end_node_id , value_selectors_list in self . end_stream_param . end_stream_variable_selector_mapping . items ( ) :
# Loop through all variable selectors configured for the current end node
for value_selector in value_selectors_list :
# Check if the current event's from_variable_selector matches the configured selector
if event . from_variable_selector == value_selector :
# If there was previous output and this node hasn't output yet, prepend a newline
if self . has_output and event . node_id not in self . output_node_ids :
event . chunk_content = " \n " + event . chunk_content
# Mark this node as having output to avoid duplicate newlines
self . output_node_ids . add ( event . node_id )
# Set the flag indicating output has occurred
self . has_output = True
# Stream the current event
yield event
matched = True # Set matched flag to break out of loops
break # Exit inner loop after match
if matched :
break # Exit outer loop after match
elif isinstance ( event , NodeRunSucceededEvent ) :
yield event
if event . route_node_state . node_id in self . current_stream_chunk_generating_node_ids :