pull/21981/merge
阿七 11 months ago committed by GitHub
commit 819de50d08
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -42,23 +42,26 @@ class EndStreamProcessor(StreamProcessor):
yield event
continue
if event.route_node_state.node_id in self.current_stream_chunk_generating_node_ids:
stream_out_end_node_ids = self.current_stream_chunk_generating_node_ids[
event.route_node_state.node_id
]
else:
stream_out_end_node_ids = self._get_stream_out_end_node_ids(event)
self.current_stream_chunk_generating_node_ids[event.route_node_state.node_id] = (
stream_out_end_node_ids
)
if stream_out_end_node_ids:
if self.has_output and event.node_id not in self.output_node_ids:
event.chunk_content = "\n" + event.chunk_content
self.output_node_ids.add(event.node_id)
self.has_output = True
yield event
# Iterate all variable selectors for all end nodes; stream if matched
matched = False # Flag to indicate if a match is found
for end_node_id, value_selectors_list in self.end_stream_param.end_stream_variable_selector_mapping.items():
# Loop through all variable selectors configured for the current end node
for value_selector in value_selectors_list:
# Check if the current event's from_variable_selector matches the configured selector
if event.from_variable_selector == value_selector:
# If there was previous output and this node hasn't output yet, prepend a newline
if self.has_output and event.node_id not in self.output_node_ids:
event.chunk_content = "\n" + event.chunk_content
# Mark this node as having output to avoid duplicate newlines
self.output_node_ids.add(event.node_id)
# Set the flag indicating output has occurred
self.has_output = True
# Stream the current event
yield event
matched = True # Set matched flag to break out of loops
break # Exit inner loop after match
if matched:
break # Exit outer loop after match
elif isinstance(event, NodeRunSucceededEvent):
yield event
if event.route_node_state.node_id in self.current_stream_chunk_generating_node_ids:

@ -0,0 +1,9 @@
FROM langgenius/dify-api:1.5.1
RUN apt-get update && apt-get install -y git \
&& rm -rf /var/lib/apt/lists/*
COPY my_endnode_streaming.patch /app/my_endnode_streaming.patch
RUN cd /app && git apply my_endnode_streaming.patch
# docker build -f docker/Dockerfile.patch -t land007/dify-api:1.5.1 .

@ -518,7 +518,7 @@ x-shared-env: &shared-api-worker-env
services:
# API service
api:
image: langgenius/dify-api:1.5.1
image: land007/dify-api:1.5.1
restart: always
environment:
# Use the shared environment variables.

@ -0,0 +1,48 @@
diff --git a/api/core/workflow/nodes/end/end_stream_processor.py b/api/core/workflow/nodes/end/end_stream_processor.py
index a6fb2ffc1..fcf4011da 100644
--- a/api/core/workflow/nodes/end/end_stream_processor.py
+++ b/api/core/workflow/nodes/end/end_stream_processor.py
@@ -42,23 +42,26 @@ class EndStreamProcessor(StreamProcessor):
yield event
continue
- if event.route_node_state.node_id in self.current_stream_chunk_generating_node_ids:
- stream_out_end_node_ids = self.current_stream_chunk_generating_node_ids[
- event.route_node_state.node_id
- ]
- else:
- stream_out_end_node_ids = self._get_stream_out_end_node_ids(event)
- self.current_stream_chunk_generating_node_ids[event.route_node_state.node_id] = (
- stream_out_end_node_ids
- )
-
- if stream_out_end_node_ids:
- if self.has_output and event.node_id not in self.output_node_ids:
- event.chunk_content = "\n" + event.chunk_content
-
- self.output_node_ids.add(event.node_id)
- self.has_output = True
- yield event
+ # Iterate all variable selectors for all end nodes; stream if matched
+ matched = False # Flag to indicate if a match is found
+ for end_node_id, value_selectors_list in self.end_stream_param.end_stream_variable_selector_mapping.items():
+ # Loop through all variable selectors configured for the current end node
+ for value_selector in value_selectors_list:
+ # Check if the current event's from_variable_selector matches the configured selector
+ if event.from_variable_selector == value_selector:
+ # If there was previous output and this node hasn't output yet, prepend a newline
+ if self.has_output and event.node_id not in self.output_node_ids:
+ event.chunk_content = "\n" + event.chunk_content
+ # Mark this node as having output to avoid duplicate newlines
+ self.output_node_ids.add(event.node_id)
+ # Set the flag indicating output has occurred
+ self.has_output = True
+ # Stream the current event
+ yield event
+ matched = True # Set matched flag to break out of loops
+ break # Exit inner loop after match
+ if matched:
+ break # Exit outer loop after match
elif isinstance(event, NodeRunSucceededEvent):
yield event
if event.route_node_state.node_id in self.current_stream_chunk_generating_node_ids:
Loading…
Cancel
Save