From eec838c67ab7ce30530dec59079392af41aa7b12 Mon Sep 17 00:00:00 2001 From: ytqh Date: Wed, 19 Mar 2025 15:31:20 +0800 Subject: [PATCH] Revert "optimize thinking process" This reverts commit d8d665f7f20632fc6d0a3f26e94e4e0c22690dee. --- .../base_app_generate_response_converter.py | 156 ++++++------------ 1 file changed, 49 insertions(+), 107 deletions(-) diff --git a/api/core/app/apps/base_app_generate_response_converter.py b/api/core/app/apps/base_app_generate_response_converter.py index 9dd6985783..4295cc5162 100644 --- a/api/core/app/apps/base_app_generate_response_converter.py +++ b/api/core/app/apps/base_app_generate_response_converter.py @@ -26,13 +26,57 @@ class AppGenerateResponseConverter(ABC): else: def _generate_full_response() -> Generator[str, Any, None]: - # Use the extracted method to handle thinking process logic - for processed_chunk in cls._process_stream_chunks_with_thinking(response): - if processed_chunk == "ping": - yield f"event: {processed_chunk}\n\n" + # Track if we're currently in a thinking process section + is_thinking_process = False + thinking_tag = "" + # Buffer to accumulate thinking process content + thinking_buffer = "" + + for chunk in cls.convert_stream_full_response(response): + + if chunk == "ping": + yield f"event: {chunk}\n\n" + continue + + try: + chunk_dict = json.loads(chunk) + except json.JSONDecodeError: + yield f"data: {chunk}\n\n" + continue + + if ( + not is_thinking_process + and "data" in chunk_dict + and "inputs" in chunk_dict["data"] + and chunk_dict["data"]["inputs"] is not None + and "thinking_tag" in chunk_dict["data"]["inputs"] + ): + is_thinking_process = True + elif not is_thinking_process: + yield f"data: {chunk}\n\n" + continue + + if not "answer" in chunk_dict: + yield f"data: {chunk}\n\n" + continue + + # extract message text from chunk + message_text = chunk_dict["answer"] + + thinking_buffer += message_text + + if not thinking_tag in thinking_buffer: # thinking process end, return as usual continue - yield f"data: {processed_chunk}\n\n" + # remove tags and its content + remaining_buffer = thinking_buffer.split(thinking_tag, 1)[1].strip() + thinking_tag = None + is_thinking_process = False + + chunk_dict["answer"] = remaining_buffer + chunk = json.dumps(chunk_dict) + + yield f"data: {chunk}\n\n" return _generate_full_response() elif invoke_from == InvokeFrom.DEBUGGER: @@ -157,105 +201,3 @@ class AppGenerateResponseConverter(ABC): } return data - - @classmethod - def _process_stream_chunks_with_thinking( - cls, stream_response: Generator[AppStreamResponse, None, None] - ) -> Generator[str, None, None]: - """ - Process stream chunks and handle thinking process sections. - - This method processes each chunk from the stream response, handling any - thinking process sections by accumulating content until the closing tag - is found, then yielding only the content after the thinking process. - - :param stream_response: Stream of response chunks - :yield: Processed chunks with thinking process sections removed - """ - # Constants for thinking process handling - THINKING_END_TAG = "" - - # State variables - is_thinking_process = False - thinking_buffer = "" - - for chunk in cls.convert_stream_full_response(stream_response): - # Handle ping messages - if chunk == "ping": - yield "ping" - continue - - # Parse JSON chunks - try: - chunk_dict = json.loads(chunk) - except json.JSONDecodeError: - # Pass non-JSON chunks through unchanged - yield chunk - continue - - # Check if we're entering a thinking process - if not is_thinking_process: - # Check for thinking tag indicator in inputs - has_thinking_tag = ( - "data" in chunk_dict - and "inputs" in chunk_dict.get("data", {}) - and chunk_dict.get("data", {}).get("inputs") is not None - and "thinking_tag" in chunk_dict.get("data", {}).get("inputs", {}) - ) - - if has_thinking_tag: - is_thinking_process = True - thinking_buffer = "" # Reset buffer - else: - # No thinking process, pass through unchanged - yield chunk - continue - - # We're in a thinking process, check if chunk has answer - if "answer" not in chunk_dict: - # Unusual case - yield chunks without answers during thinking - yield chunk - continue - - # Add message text to thinking buffer - message_text = chunk_dict["answer"] - thinking_buffer += message_text - - # Check if thinking process has ended - if THINKING_END_TAG not in thinking_buffer: - # Still in thinking process, don't yield anything - chunk_dict["answer"] = "" - yield json.dumps(chunk_dict) - continue - - # Extract content after the thinking process - try: - # Split at the end tag and take content after it - remaining_parts = thinking_buffer.split(THINKING_END_TAG, 1) - if len(remaining_parts) < 2: - # Unusual case - end tag found but splitting failed - # Reset state and continue with current chunk - is_thinking_process = False - thinking_buffer = "" - yield chunk - continue - - remaining_buffer = remaining_parts[1].strip() - - # Create modified chunk with only the content after thinking process - chunk_dict["answer"] = remaining_buffer - modified_chunk = json.dumps(chunk_dict) - - # Reset state variables - is_thinking_process = False - thinking_buffer = "" - - yield modified_chunk - - except Exception as e: - # Log error and try to recover by resetting state - logging.error(f"Error processing thinking buffer: {str(e)}") - is_thinking_process = False - thinking_buffer = "" - # Try to return something useful - yield chunk