diff --git a/api/core/app/apps/base_app_generate_response_converter.py b/api/core/app/apps/base_app_generate_response_converter.py index be4027132b..4295cc5162 100644 --- a/api/core/app/apps/base_app_generate_response_converter.py +++ b/api/core/app/apps/base_app_generate_response_converter.py @@ -1,4 +1,6 @@ +import json import logging +import re from abc import ABC, abstractmethod from collections.abc import Generator, Mapping from typing import Any, Union @@ -18,7 +20,66 @@ class AppGenerateResponseConverter(ABC): response: Union[AppBlockingResponse, Generator[AppStreamResponse, Any, None]], invoke_from: InvokeFrom, ) -> Mapping[str, Any] | Generator[str, None, None]: - if invoke_from in {InvokeFrom.DEBUGGER, InvokeFrom.SERVICE_API}: + if invoke_from == InvokeFrom.SERVICE_API: + if isinstance(response, AppBlockingResponse): + return cls.convert_blocking_full_response(response) + else: + + def _generate_full_response() -> Generator[str, Any, None]: + # Track if we're currently in a thinking process section + is_thinking_process = False + thinking_tag = "" + # Buffer to accumulate thinking process content + thinking_buffer = "" + + for chunk in cls.convert_stream_full_response(response): + + if chunk == "ping": + yield f"event: {chunk}\n\n" + continue + + try: + chunk_dict = json.loads(chunk) + except json.JSONDecodeError: + yield f"data: {chunk}\n\n" + continue + + if ( + not is_thinking_process + and "data" in chunk_dict + and "inputs" in chunk_dict["data"] + and chunk_dict["data"]["inputs"] is not None + and "thinking_tag" in chunk_dict["data"]["inputs"] + ): + is_thinking_process = True + elif not is_thinking_process: + yield f"data: {chunk}\n\n" + continue + + if not "answer" in chunk_dict: + yield f"data: {chunk}\n\n" + continue + + # extract message text from chunk + message_text = chunk_dict["answer"] + + thinking_buffer += message_text + + if not thinking_tag in thinking_buffer: # thinking process end, return as usual + continue + + # remove tags and its content + remaining_buffer = thinking_buffer.split(thinking_tag, 1)[1].strip() + thinking_tag = None + is_thinking_process = False + + chunk_dict["answer"] = remaining_buffer + chunk = json.dumps(chunk_dict) + + yield f"data: {chunk}\n\n" + + return _generate_full_response() + elif invoke_from == InvokeFrom.DEBUGGER: if isinstance(response, AppBlockingResponse): return cls.convert_blocking_full_response(response) else: @@ -113,8 +174,10 @@ class AppGenerateResponseConverter(ABC): ProviderTokenNotInitError: {"code": "provider_not_initialize", "status": 400}, QuotaExceededError: { "code": "provider_quota_exceeded", - "message": "Your quota for Dify Hosted Model Provider has been exhausted. " - "Please go to Settings -> Model Provider to complete your own provider credentials.", + "message": ( + "Your quota for Dify Hosted Model Provider has been exhausted. " + "Please go to Settings -> Model Provider to complete your own provider credentials." + ), "status": 400, }, ModelCurrentlyNotSupportError: {"code": "model_currently_not_support", "status": 400},