Revert "optimize thinking process"

This reverts commit d8d665f7f2.
pull/21891/head
ytqh 1 year ago
parent d8d665f7f2
commit eec838c67a

@ -26,13 +26,57 @@ class AppGenerateResponseConverter(ABC):
else:
def _generate_full_response() -> Generator[str, Any, None]:
# Use the extracted method to handle thinking process logic
for processed_chunk in cls._process_stream_chunks_with_thinking(response):
if processed_chunk == "ping":
yield f"event: {processed_chunk}\n\n"
# Track if we're currently in a thinking process section
is_thinking_process = False
thinking_tag = "</thinking_process>"
# Buffer to accumulate thinking process content
thinking_buffer = ""
for chunk in cls.convert_stream_full_response(response):
if chunk == "ping":
yield f"event: {chunk}\n\n"
continue
try:
chunk_dict = json.loads(chunk)
except json.JSONDecodeError:
yield f"data: {chunk}\n\n"
continue
if (
not is_thinking_process
and "data" in chunk_dict
and "inputs" in chunk_dict["data"]
and chunk_dict["data"]["inputs"] is not None
and "thinking_tag" in chunk_dict["data"]["inputs"]
):
is_thinking_process = True
elif not is_thinking_process:
yield f"data: {chunk}\n\n"
continue
if not "answer" in chunk_dict:
yield f"data: {chunk}\n\n"
continue
# extract message text from chunk
message_text = chunk_dict["answer"]
thinking_buffer += message_text
if not thinking_tag in thinking_buffer: # thinking process end, return as usual
continue
yield f"data: {processed_chunk}\n\n"
# remove <thinking_process> tags and its content
remaining_buffer = thinking_buffer.split(thinking_tag, 1)[1].strip()
thinking_tag = None
is_thinking_process = False
chunk_dict["answer"] = remaining_buffer
chunk = json.dumps(chunk_dict)
yield f"data: {chunk}\n\n"
return _generate_full_response()
elif invoke_from == InvokeFrom.DEBUGGER:
@ -157,105 +201,3 @@ class AppGenerateResponseConverter(ABC):
}
return data
@classmethod
def _process_stream_chunks_with_thinking(
    cls, stream_response: Generator[AppStreamResponse, None, None]
) -> Generator[str, None, None]:
    """
    Process stream chunks and strip thinking process sections.

    Each chunk produced by ``cls.convert_stream_full_response`` is inspected.
    Once a chunk whose ``data.inputs`` contains ``thinking_tag`` is seen, the
    ``answer`` text of the following chunks is accumulated (and blanked in
    the emitted chunk) until the closing ``</thinking_process>`` tag shows
    up in the accumulated text. The chunk that completes the tag is emitted
    with ``answer`` replaced by the stripped text that follows the tag, and
    normal pass-through resumes.

    Chunks that cannot be interpreted (non-JSON text, JSON that is not an
    object, a missing or non-string ``answer``) are passed through unchanged
    rather than raising, so one malformed chunk does not abort the stream.

    :param stream_response: Stream of response chunks
    :yield: ``"ping"`` markers, untouched chunks, or re-serialized JSON
        chunks with the thinking process section removed
    """
    thinking_end_tag = "</thinking_process>"

    # State carried across chunks: the closing tag may be split over
    # several chunks, so the answer text has to be accumulated.
    is_thinking_process = False
    thinking_buffer = ""

    for chunk in cls.convert_stream_full_response(stream_response):
        # Keep-alive markers are forwarded as-is.
        if chunk == "ping":
            yield "ping"
            continue

        try:
            chunk_dict = json.loads(chunk)
        except json.JSONDecodeError:
            # Pass non-JSON chunks through unchanged.
            yield chunk
            continue

        # Valid JSON that is not an object (number, string, list, null)
        # carries neither "data" nor "answer"; forward it untouched.
        if not isinstance(chunk_dict, dict):
            yield chunk
            continue

        if not is_thinking_process:
            data = chunk_dict.get("data")
            inputs = data.get("inputs") if isinstance(data, dict) else None
            # Membership is only tested on JSON container values so that a
            # scalar "inputs" cannot raise TypeError.
            has_thinking_tag = (
                isinstance(inputs, (dict, list, str))
                and "thinking_tag" in inputs
            )
            if not has_thinking_tag:
                # No thinking process in progress: pass through unchanged.
                yield chunk
                continue
            is_thinking_process = True
            thinking_buffer = ""

        # Inside a thinking process: only string answers can be buffered.
        message_text = chunk_dict.get("answer")
        if not isinstance(message_text, str):
            yield chunk
            continue

        thinking_buffer += message_text

        _, end_tag, remaining_buffer = thinking_buffer.partition(thinking_end_tag)
        if not end_tag:
            # Closing tag not seen yet: emit the chunk with an empty answer
            # so the thinking text itself is never sent downstream.
            chunk_dict["answer"] = ""
            yield json.dumps(chunk_dict)
            continue

        # Closing tag found: emit only the text after it and reset state so
        # a later thinking section starts from a clean buffer.
        chunk_dict["answer"] = remaining_buffer.strip()
        is_thinking_process = False
        thinking_buffer = ""
        yield json.dumps(chunk_dict)

Loading…
Cancel
Save