diff --git a/api/core/app/apps/base_app_generate_response_converter.py b/api/core/app/apps/base_app_generate_response_converter.py index 4295cc5162..9dd6985783 100644 --- a/api/core/app/apps/base_app_generate_response_converter.py +++ b/api/core/app/apps/base_app_generate_response_converter.py @@ -26,57 +26,13 @@ class AppGenerateResponseConverter(ABC): else: def _generate_full_response() -> Generator[str, Any, None]: - # Track if we're currently in a thinking process section - is_thinking_process = False - thinking_tag = "" - # Buffer to accumulate thinking process content - thinking_buffer = "" - - for chunk in cls.convert_stream_full_response(response): - - if chunk == "ping": - yield f"event: {chunk}\n\n" - continue - - try: - chunk_dict = json.loads(chunk) - except json.JSONDecodeError: - yield f"data: {chunk}\n\n" - continue - - if ( - not is_thinking_process - and "data" in chunk_dict - and "inputs" in chunk_dict["data"] - and chunk_dict["data"]["inputs"] is not None - and "thinking_tag" in chunk_dict["data"]["inputs"] - ): - is_thinking_process = True - elif not is_thinking_process: - yield f"data: {chunk}\n\n" - continue - - if not "answer" in chunk_dict: - yield f"data: {chunk}\n\n" - continue - - # extract message text from chunk - message_text = chunk_dict["answer"] - - thinking_buffer += message_text - - if not thinking_tag in thinking_buffer: # thinking process end, return as usual + # Use the extracted method to handle thinking process logic + for processed_chunk in cls._process_stream_chunks_with_thinking(response): + if processed_chunk == "ping": + yield f"event: {processed_chunk}\n\n" continue - # remove tags and its content - remaining_buffer = thinking_buffer.split(thinking_tag, 1)[1].strip() - thinking_tag = None - is_thinking_process = False - - chunk_dict["answer"] = remaining_buffer - chunk = json.dumps(chunk_dict) - - yield f"data: {chunk}\n\n" + yield f"data: {processed_chunk}\n\n" return _generate_full_response() elif invoke_from == InvokeFrom.DEBUGGER: @@ -201,3 +157,105 @@ class AppGenerateResponseConverter(ABC): } return data + + @classmethod + def _process_stream_chunks_with_thinking( + cls, stream_response: Generator[AppStreamResponse, None, None] + ) -> Generator[str, None, None]: + """ + Process stream chunks and handle thinking process sections. + + This method processes each chunk from the stream response, handling any + thinking process sections by accumulating content until the closing tag + is found, then yielding only the content after the thinking process. + + :param stream_response: Stream of response chunks + :yield: Processed chunks with thinking process sections removed + """ + # Constants for thinking process handling + THINKING_END_TAG = "" + + # State variables + is_thinking_process = False + thinking_buffer = "" + + for chunk in cls.convert_stream_full_response(stream_response): + # Handle ping messages + if chunk == "ping": + yield "ping" + continue + + # Parse JSON chunks + try: + chunk_dict = json.loads(chunk) + except json.JSONDecodeError: + # Pass non-JSON chunks through unchanged + yield chunk + continue + + # Check if we're entering a thinking process + if not is_thinking_process: + # Check for thinking tag indicator in inputs + has_thinking_tag = ( + "data" in chunk_dict + and "inputs" in chunk_dict.get("data", {}) + and chunk_dict.get("data", {}).get("inputs") is not None + and "thinking_tag" in chunk_dict.get("data", {}).get("inputs", {}) + ) + + if has_thinking_tag: + is_thinking_process = True + thinking_buffer = "" # Reset buffer + else: + # No thinking process, pass through unchanged + yield chunk + continue + + # We're in a thinking process, check if chunk has answer + if "answer" not in chunk_dict: + # Unusual case - yield chunks without answers during thinking + yield chunk + continue + + # Add message text to thinking buffer + message_text = chunk_dict["answer"] + thinking_buffer += message_text + + # Check if thinking process has ended + if THINKING_END_TAG not in thinking_buffer: + # Still in thinking process, don't yield anything + chunk_dict["answer"] = "" + yield json.dumps(chunk_dict) + continue + + # Extract content after the thinking process + try: + # Split at the end tag and take content after it + remaining_parts = thinking_buffer.split(THINKING_END_TAG, 1) + if len(remaining_parts) < 2: + # Unusual case - end tag found but splitting failed + # Reset state and continue with current chunk + is_thinking_process = False + thinking_buffer = "" + yield chunk + continue + + remaining_buffer = remaining_parts[1].strip() + + # Create modified chunk with only the content after thinking process + chunk_dict["answer"] = remaining_buffer + modified_chunk = json.dumps(chunk_dict) + + # Reset state variables + is_thinking_process = False + thinking_buffer = "" + + yield modified_chunk + + except Exception as e: + # Log error and try to recover by resetting state + logging.error(f"Error processing thinking buffer: {str(e)}") + is_thinking_process = False + thinking_buffer = "" + # Try to return something useful + yield chunk