|
|
|
@ -26,57 +26,13 @@ class AppGenerateResponseConverter(ABC):
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
|
|
|
|
|
|
|
|
def _generate_full_response() -> Generator[str, Any, None]:
|
|
|
|
def _generate_full_response() -> Generator[str, Any, None]:
|
|
|
|
# Track if we're currently in a thinking process section
|
|
|
|
# Use the extracted method to handle thinking process logic
|
|
|
|
is_thinking_process = False
|
|
|
|
for processed_chunk in cls._process_stream_chunks_with_thinking(response):
|
|
|
|
thinking_tag = "</thinking_process>"
|
|
|
|
if processed_chunk == "ping":
|
|
|
|
# Buffer to accumulate thinking process content
|
|
|
|
yield f"event: {processed_chunk}\n\n"
|
|
|
|
thinking_buffer = ""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for chunk in cls.convert_stream_full_response(response):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if chunk == "ping":
|
|
|
|
|
|
|
|
yield f"event: {chunk}\n\n"
|
|
|
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
|
|
chunk_dict = json.loads(chunk)
|
|
|
|
|
|
|
|
except json.JSONDecodeError:
|
|
|
|
|
|
|
|
yield f"data: {chunk}\n\n"
|
|
|
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (
|
|
|
|
|
|
|
|
not is_thinking_process
|
|
|
|
|
|
|
|
and "data" in chunk_dict
|
|
|
|
|
|
|
|
and "inputs" in chunk_dict["data"]
|
|
|
|
|
|
|
|
and chunk_dict["data"]["inputs"] is not None
|
|
|
|
|
|
|
|
and "thinking_tag" in chunk_dict["data"]["inputs"]
|
|
|
|
|
|
|
|
):
|
|
|
|
|
|
|
|
is_thinking_process = True
|
|
|
|
|
|
|
|
elif not is_thinking_process:
|
|
|
|
|
|
|
|
yield f"data: {chunk}\n\n"
|
|
|
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if not "answer" in chunk_dict:
|
|
|
|
|
|
|
|
yield f"data: {chunk}\n\n"
|
|
|
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# extract message text from chunk
|
|
|
|
|
|
|
|
message_text = chunk_dict["answer"]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
thinking_buffer += message_text
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if not thinking_tag in thinking_buffer: # thinking process end, return as usual
|
|
|
|
|
|
|
|
continue
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
# remove <thinking_process> tags and its content
|
|
|
|
yield f"data: {processed_chunk}\n\n"
|
|
|
|
remaining_buffer = thinking_buffer.split(thinking_tag, 1)[1].strip()
|
|
|
|
|
|
|
|
thinking_tag = None
|
|
|
|
|
|
|
|
is_thinking_process = False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
chunk_dict["answer"] = remaining_buffer
|
|
|
|
|
|
|
|
chunk = json.dumps(chunk_dict)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
yield f"data: {chunk}\n\n"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return _generate_full_response()
|
|
|
|
return _generate_full_response()
|
|
|
|
elif invoke_from == InvokeFrom.DEBUGGER:
|
|
|
|
elif invoke_from == InvokeFrom.DEBUGGER:
|
|
|
|
@ -201,3 +157,105 @@ class AppGenerateResponseConverter(ABC):
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return data
|
|
|
|
return data
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@classmethod
def _process_stream_chunks_with_thinking(
    cls, stream_response: Generator[AppStreamResponse, None, None]
) -> Generator[str, None, None]:
    """
    Process stream chunks and strip a thinking process section from answers.

    Every chunk produced by ``cls.convert_stream_full_response`` is handled
    as follows:

    * ``"ping"`` chunks and chunks that do not decode to a JSON object are
      yielded unchanged.
    * Outside a thinking process, a chunk is yielded unchanged unless its
      ``data.inputs`` contains ``thinking_tag``; such a chunk starts a
      thinking process.
    * Inside a thinking process, chunks without a string ``answer`` are
      yielded unchanged. Otherwise the ``answer`` text is accumulated and the
      chunk is re-emitted with an empty ``answer`` until the closing
      ``</thinking_process>`` tag has been seen.
    * The chunk that completes the closing tag is re-emitted with ``answer``
      replaced by the whitespace-stripped text that follows the tag, and the
      thinking process ends.

    :param stream_response: Stream of response chunks
    :yield: Processed chunks with thinking process sections removed
    """
    # Closing tag that terminates a thinking process section
    THINKING_END_TAG = "</thinking_process>"

    # State variables
    is_thinking_process = False
    thinking_buffer = ""

    for chunk in cls.convert_stream_full_response(stream_response):
        # Handle ping messages
        if chunk == "ping":
            yield "ping"
            continue

        # Parse JSON chunks; pass non-JSON chunks through unchanged
        try:
            chunk_dict = json.loads(chunk)
        except json.JSONDecodeError:
            yield chunk
            continue

        # Valid JSON that is not an object (list, number, string, ...) carries
        # neither "data" nor "answer", so there is nothing to filter.
        if not isinstance(chunk_dict, dict):
            yield chunk
            continue

        if not is_thinking_process:
            # A thinking process starts when data.inputs contains "thinking_tag"
            data = chunk_dict.get("data")
            inputs = data.get("inputs") if isinstance(data, dict) else None
            try:
                has_thinking_tag = inputs is not None and "thinking_tag" in inputs
            except TypeError:
                # inputs is not a container (e.g. a number): no tag present
                has_thinking_tag = False

            if not has_thinking_tag:
                # No thinking process, pass through unchanged
                yield chunk
                continue

            is_thinking_process = True
            thinking_buffer = ""  # Reset buffer

        # Chunks without usable answer text are forwarded as they are
        message_text = chunk_dict.get("answer")
        if not isinstance(message_text, str):
            yield chunk
            continue

        # The closing tag may be split across chunks, so search the
        # accumulated text rather than the current chunk alone.
        thinking_buffer += message_text
        _, end_tag, remaining_buffer = thinking_buffer.partition(THINKING_END_TAG)

        if not end_tag:
            # Still inside the thinking process: emit the chunk with its
            # answer text blanked out so the thinking content is hidden.
            chunk_dict["answer"] = ""
            yield json.dumps(chunk_dict)
            continue

        # Thinking process finished: keep only the content after the tag
        chunk_dict["answer"] = remaining_buffer.strip()

        # Reset state before handing control back to the consumer
        is_thinking_process = False
        thinking_buffer = ""

        yield json.dumps(chunk_dict)
|
|
|
|
|