refactor: enforce return object in app generator

pull/9184/head
Yeuoly 2 years ago
parent a073de44e9
commit ec711d094d
No known key found for this signature in database
GPG Key ID: A66E7E320FB19F61

@ -4,7 +4,7 @@ import os
import threading import threading
import uuid import uuid
from collections.abc import Generator from collections.abc import Generator
from typing import Literal, Union, overload from typing import Any, Literal, Union, overload
from flask import Flask, current_app from flask import Flask, current_app
from pydantic import ValidationError from pydantic import ValidationError
@ -47,7 +47,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
args: dict, args: dict,
invoke_from: InvokeFrom, invoke_from: InvokeFrom,
stream: Literal[True] = True, stream: Literal[True] = True,
) -> Generator[str, None, None]: ... ) -> Generator[dict | str, None, None]: ...
@overload @overload
def generate( def generate(
@ -59,6 +59,16 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
stream: Literal[False] = False, stream: Literal[False] = False,
) -> dict: ... ) -> dict: ...
@overload
def generate(
self, app_model: App,
workflow: Workflow,
user: Union[Account, EndUser],
args: dict,
invoke_from: InvokeFrom,
stream: bool = True,
) -> Union[dict[str, Any], Generator[dict | str, Any, None]]: ...
def generate( def generate(
self, app_model: App, self, app_model: App,
workflow: Workflow, workflow: Workflow,
@ -152,7 +162,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
def single_iteration_generate(self, app_model: App, def single_iteration_generate(self, app_model: App,
workflow: Workflow, workflow: Workflow,
node_id: str, node_id: str,
user: Account, user: Account | EndUser,
args: dict, args: dict,
stream: bool = True): stream: bool = True):
""" """
@ -325,7 +335,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
user=user, user=user,
stream=stream, stream=stream,
) )
return AdvancedChatAppGenerateResponseConverter.convert( return AdvancedChatAppGenerateResponseConverter.convert(
response=response, response=response,
invoke_from=invoke_from invoke_from=invoke_from

@ -1,4 +1,3 @@
import json
from collections.abc import Generator from collections.abc import Generator
from typing import Any, cast from typing import Any, cast
@ -56,7 +55,7 @@ class AdvancedChatAppGenerateResponseConverter(AppGenerateResponseConverter):
return response return response
@classmethod @classmethod
def convert_stream_full_response(cls, stream_response: Generator[AppStreamResponse, None, None]) -> Generator[str, Any, None]: def convert_stream_full_response(cls, stream_response: Generator[AppStreamResponse, None, None]) -> Generator[dict | str, Any, None]:
""" """
Convert stream full response. Convert stream full response.
:param stream_response: stream response :param stream_response: stream response
@ -82,10 +81,10 @@ class AdvancedChatAppGenerateResponseConverter(AppGenerateResponseConverter):
response_chunk.update(data) response_chunk.update(data)
else: else:
response_chunk.update(sub_stream_response.to_dict()) response_chunk.update(sub_stream_response.to_dict())
yield json.dumps(response_chunk) yield response_chunk
@classmethod @classmethod
def convert_stream_simple_response(cls, stream_response: Generator[AppStreamResponse, None, None]) -> Generator[str, Any, None]: def convert_stream_simple_response(cls, stream_response: Generator[AppStreamResponse, None, None]) -> Generator[dict | str, Any, None]:
""" """
Convert stream simple response. Convert stream simple response.
:param stream_response: stream response :param stream_response: stream response
@ -119,4 +118,4 @@ class AdvancedChatAppGenerateResponseConverter(AppGenerateResponseConverter):
else: else:
response_chunk.update(sub_stream_response.to_dict()) response_chunk.update(sub_stream_response.to_dict())
yield json.dumps(response_chunk) yield response_chunk

@ -35,7 +35,7 @@ class AgentChatAppGenerator(MessageBasedAppGenerator):
args: dict, args: dict,
invoke_from: InvokeFrom, invoke_from: InvokeFrom,
stream: Literal[True] = True, stream: Literal[True] = True,
) -> Generator[str, None, None]: ... ) -> Generator[dict | str, None, None]: ...
@overload @overload
def generate( def generate(
@ -46,12 +46,21 @@ class AgentChatAppGenerator(MessageBasedAppGenerator):
stream: Literal[False] = False, stream: Literal[False] = False,
) -> dict: ... ) -> dict: ...
@overload
def generate(
self, app_model: App,
user: Union[Account, EndUser],
args: dict,
invoke_from: InvokeFrom,
stream: bool = False,
) -> dict | Generator[dict | str, None, None]: ...
def generate(self, app_model: App, def generate(self, app_model: App,
user: Union[Account, EndUser], user: Union[Account, EndUser],
args: Any, args: Any,
invoke_from: InvokeFrom, invoke_from: InvokeFrom,
stream: bool = True) \ stream: bool = True) \
-> Union[dict, Generator[str, None, None]]: -> Union[dict, Generator[dict | str, None, None]]:
""" """
Generate App response. Generate App response.

@ -1,4 +1,3 @@
import json
from collections.abc import Generator from collections.abc import Generator
from typing import cast from typing import cast
@ -52,7 +51,7 @@ class AgentChatAppGenerateResponseConverter(AppGenerateResponseConverter):
@classmethod @classmethod
def convert_stream_full_response(cls, stream_response: Generator[ChatbotAppStreamResponse, None, None]) \ def convert_stream_full_response(cls, stream_response: Generator[ChatbotAppStreamResponse, None, None]) \
-> Generator[str, None, None]: -> Generator[dict | str, None, None]:
""" """
Convert stream full response. Convert stream full response.
:param stream_response: stream response :param stream_response: stream response
@ -78,11 +77,11 @@ class AgentChatAppGenerateResponseConverter(AppGenerateResponseConverter):
response_chunk.update(data) response_chunk.update(data)
else: else:
response_chunk.update(sub_stream_response.to_dict()) response_chunk.update(sub_stream_response.to_dict())
yield json.dumps(response_chunk) yield response_chunk
@classmethod @classmethod
def convert_stream_simple_response(cls, stream_response: Generator[ChatbotAppStreamResponse, None, None]) \ def convert_stream_simple_response(cls, stream_response: Generator[ChatbotAppStreamResponse, None, None]) \
-> Generator[str, None, None]: -> Generator[dict | str, None, None]:
""" """
Convert stream simple response. Convert stream simple response.
:param stream_response: stream response :param stream_response: stream response
@ -114,4 +113,4 @@ class AgentChatAppGenerateResponseConverter(AppGenerateResponseConverter):
else: else:
response_chunk.update(sub_stream_response.to_dict()) response_chunk.update(sub_stream_response.to_dict())
yield json.dumps(response_chunk) yield response_chunk

@ -21,24 +21,16 @@ class AppGenerateResponseConverter(ABC):
if isinstance(response, AppBlockingResponse): if isinstance(response, AppBlockingResponse):
return cls.convert_blocking_full_response(response) return cls.convert_blocking_full_response(response)
else: else:
def _generate_full_response() -> Generator[str, Any, None]: def _generate_full_response() -> Generator[dict | str, Any, None]:
for chunk in cls.convert_stream_full_response(response): yield from cls.convert_stream_simple_response(response)
if chunk == 'ping':
yield f'event: {chunk}\n\n'
else:
yield f'data: {chunk}\n\n'
return _generate_full_response() return _generate_full_response()
else: else:
if isinstance(response, AppBlockingResponse): if isinstance(response, AppBlockingResponse):
return cls.convert_blocking_simple_response(response) return cls.convert_blocking_simple_response(response)
else: else:
def _generate_simple_response() -> Generator[str, Any, None]: def _generate_simple_response() -> Generator[dict | str, Any, None]:
for chunk in cls.convert_stream_simple_response(response): yield from cls.convert_stream_simple_response(response)
if chunk == 'ping':
yield f'event: {chunk}\n\n'
else:
yield f'data: {chunk}\n\n'
return _generate_simple_response() return _generate_simple_response()
@ -55,7 +47,7 @@ class AppGenerateResponseConverter(ABC):
@classmethod @classmethod
@abstractmethod @abstractmethod
def convert_stream_full_response(cls, stream_response: Generator[AppStreamResponse, None, None]) \ def convert_stream_full_response(cls, stream_response: Generator[AppStreamResponse, None, None]) \
-> Generator[str, None, None]: -> Generator[dict | str, None, None]:
raise NotImplementedError raise NotImplementedError
@classmethod @classmethod

@ -1,5 +1,6 @@
from collections.abc import Mapping from collections.abc import Generator, Mapping
from typing import Any, Optional import json
from typing import Any, Optional, Union
from core.app.app_config.entities import AppConfig, VariableEntity, VariableEntityType from core.app.app_config.entities import AppConfig, VariableEntity, VariableEntityType
@ -54,3 +55,20 @@ class BaseAppGenerator:
if isinstance(value, str): if isinstance(value, str):
return value.replace('\x00', '') return value.replace('\x00', '')
return value return value
@classmethod
def convert_to_event_stream(cls, generator: Union[dict, Generator[dict| str, None, None]]):
"""
Convert messages into event stream
"""
if isinstance(generator, dict):
return generator
else:
def gen():
for message in generator:
if isinstance(message, dict):
yield f'data: {json.dumps(message)}\n\n'
else:
yield f'event: {message}\n\n'
return gen()

@ -3,7 +3,7 @@ import time
from abc import abstractmethod from abc import abstractmethod
from collections.abc import Generator from collections.abc import Generator
from enum import Enum from enum import Enum
from typing import Any from typing import Any, Optional
from sqlalchemy.orm import DeclarativeMeta from sqlalchemy.orm import DeclarativeMeta
@ -118,7 +118,7 @@ class AppQueueManager:
Set task stop flag Set task stop flag
:return: :return:
""" """
result = redis_client.get(cls._generate_task_belong_cache_key(task_id)) result: Optional[Any] = redis_client.get(cls._generate_task_belong_cache_key(task_id))
if result is None: if result is None:
return return

@ -35,7 +35,7 @@ class ChatAppGenerator(MessageBasedAppGenerator):
args: Any, args: Any,
invoke_from: InvokeFrom, invoke_from: InvokeFrom,
stream: Literal[True] = True, stream: Literal[True] = True,
) -> Generator[str, None, None]: ... ) -> Generator[dict | str, None, None]: ...
@overload @overload
def generate( def generate(
@ -46,13 +46,22 @@ class ChatAppGenerator(MessageBasedAppGenerator):
stream: Literal[False] = False, stream: Literal[False] = False,
) -> dict: ... ) -> dict: ...
@overload
def generate(
self, app_model: App,
user: Union[Account, EndUser],
args: Any,
invoke_from: InvokeFrom,
stream: bool = False,
) -> Union[dict, Generator[dict | str, None, None]]: ...
def generate( def generate(
self, app_model: App, self, app_model: App,
user: Union[Account, EndUser], user: Union[Account, EndUser],
args: Any, args: Any,
invoke_from: InvokeFrom, invoke_from: InvokeFrom,
stream: bool = True, stream: bool = True,
) -> Union[dict, Generator[str, None, None]]: ) -> Union[dict, Generator[dict | str, None, None]]:
""" """
Generate App response. Generate App response.

@ -1,4 +1,3 @@
import json
from collections.abc import Generator from collections.abc import Generator
from typing import cast from typing import cast
@ -52,7 +51,7 @@ class ChatAppGenerateResponseConverter(AppGenerateResponseConverter):
@classmethod @classmethod
def convert_stream_full_response(cls, stream_response: Generator[ChatbotAppStreamResponse, None, None]) \ def convert_stream_full_response(cls, stream_response: Generator[ChatbotAppStreamResponse, None, None]) \
-> Generator[str, None, None]: -> Generator[dict | str, None, None]:
""" """
Convert stream full response. Convert stream full response.
:param stream_response: stream response :param stream_response: stream response
@ -78,11 +77,11 @@ class ChatAppGenerateResponseConverter(AppGenerateResponseConverter):
response_chunk.update(data) response_chunk.update(data)
else: else:
response_chunk.update(sub_stream_response.to_dict()) response_chunk.update(sub_stream_response.to_dict())
yield json.dumps(response_chunk) yield response_chunk
@classmethod @classmethod
def convert_stream_simple_response(cls, stream_response: Generator[ChatbotAppStreamResponse, None, None]) \ def convert_stream_simple_response(cls, stream_response: Generator[ChatbotAppStreamResponse, None, None]) \
-> Generator[str, None, None]: -> Generator[dict | str, None, None]:
""" """
Convert stream simple response. Convert stream simple response.
:param stream_response: stream response :param stream_response: stream response
@ -114,4 +113,4 @@ class ChatAppGenerateResponseConverter(AppGenerateResponseConverter):
else: else:
response_chunk.update(sub_stream_response.to_dict()) response_chunk.update(sub_stream_response.to_dict())
yield json.dumps(response_chunk) yield response_chunk

@ -48,6 +48,15 @@ class CompletionAppGenerator(MessageBasedAppGenerator):
stream: Literal[False] = False, stream: Literal[False] = False,
) -> dict: ... ) -> dict: ...
@overload
def generate(
self, app_model: App,
user: Union[Account, EndUser],
args: dict,
invoke_from: InvokeFrom,
stream: bool = False,
) -> dict | Generator[str, None, None]: ...
def generate(self, app_model: App, def generate(self, app_model: App,
user: Union[Account, EndUser], user: Union[Account, EndUser],
args: Any, args: Any,

@ -1,4 +1,3 @@
import json
from collections.abc import Generator from collections.abc import Generator
from typing import cast from typing import cast
@ -51,7 +50,7 @@ class CompletionAppGenerateResponseConverter(AppGenerateResponseConverter):
@classmethod @classmethod
def convert_stream_full_response(cls, stream_response: Generator[CompletionAppStreamResponse, None, None]) \ def convert_stream_full_response(cls, stream_response: Generator[CompletionAppStreamResponse, None, None]) \
-> Generator[str, None, None]: -> Generator[dict | str, None, None]:
""" """
Convert stream full response. Convert stream full response.
:param stream_response: stream response :param stream_response: stream response
@ -76,11 +75,11 @@ class CompletionAppGenerateResponseConverter(AppGenerateResponseConverter):
response_chunk.update(data) response_chunk.update(data)
else: else:
response_chunk.update(sub_stream_response.to_dict()) response_chunk.update(sub_stream_response.to_dict())
yield json.dumps(response_chunk) yield response_chunk
@classmethod @classmethod
def convert_stream_simple_response(cls, stream_response: Generator[CompletionAppStreamResponse, None, None]) \ def convert_stream_simple_response(cls, stream_response: Generator[CompletionAppStreamResponse, None, None]) \
-> Generator[str, None, None]: -> Generator[dict | str, None, None]:
""" """
Convert stream simple response. Convert stream simple response.
:param stream_response: stream response :param stream_response: stream response
@ -111,4 +110,4 @@ class CompletionAppGenerateResponseConverter(AppGenerateResponseConverter):
else: else:
response_chunk.update(sub_stream_response.to_dict()) response_chunk.update(sub_stream_response.to_dict())
yield json.dumps(response_chunk) yield response_chunk

@ -40,7 +40,8 @@ class WorkflowAppGenerator(BaseAppGenerator):
args: dict, args: dict,
invoke_from: InvokeFrom, invoke_from: InvokeFrom,
stream: Literal[True] = True, stream: Literal[True] = True,
) -> Generator[str, None, None]: ... call_depth: int = 0,
) -> Generator[dict | str, None, None]: ...
@overload @overload
def generate( def generate(
@ -50,8 +51,20 @@ class WorkflowAppGenerator(BaseAppGenerator):
args: dict, args: dict,
invoke_from: InvokeFrom, invoke_from: InvokeFrom,
stream: Literal[False] = False, stream: Literal[False] = False,
call_depth: int = 0,
) -> dict: ... ) -> dict: ...
@overload
def generate(
self, app_model: App,
workflow: Workflow,
user: Union[Account, EndUser],
args: dict,
invoke_from: InvokeFrom,
stream: bool = False,
call_depth: int = 0,
) -> dict | Generator[dict | str, None, None]: ...
def generate( def generate(
self, app_model: App, self, app_model: App,
workflow: Workflow, workflow: Workflow,
@ -127,7 +140,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
application_generate_entity: WorkflowAppGenerateEntity, application_generate_entity: WorkflowAppGenerateEntity,
invoke_from: InvokeFrom, invoke_from: InvokeFrom,
stream: bool = True, stream: bool = True,
) -> Union[dict, Generator[str, None, None]]: ) -> Union[dict, Generator[str | dict, None, None]]:
""" """
Generate App response. Generate App response.
@ -173,7 +186,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
def single_iteration_generate(self, app_model: App, def single_iteration_generate(self, app_model: App,
workflow: Workflow, workflow: Workflow,
node_id: str, node_id: str,
user: Account, user: Account | EndUser,
args: dict, args: dict,
stream: bool = True): stream: bool = True):
""" """

@ -1,4 +1,3 @@
import json
from collections.abc import Generator from collections.abc import Generator
from typing import cast from typing import cast
@ -36,7 +35,7 @@ class WorkflowAppGenerateResponseConverter(AppGenerateResponseConverter):
@classmethod @classmethod
def convert_stream_full_response(cls, stream_response: Generator[WorkflowAppStreamResponse, None, None]) \ def convert_stream_full_response(cls, stream_response: Generator[WorkflowAppStreamResponse, None, None]) \
-> Generator[str, None, None]: -> Generator[dict | str, None, None]:
""" """
Convert stream full response. Convert stream full response.
:param stream_response: stream response :param stream_response: stream response
@ -60,11 +59,11 @@ class WorkflowAppGenerateResponseConverter(AppGenerateResponseConverter):
response_chunk.update(data) response_chunk.update(data)
else: else:
response_chunk.update(sub_stream_response.to_dict()) response_chunk.update(sub_stream_response.to_dict())
yield json.dumps(response_chunk) yield response_chunk
@classmethod @classmethod
def convert_stream_simple_response(cls, stream_response: Generator[WorkflowAppStreamResponse, None, None]) \ def convert_stream_simple_response(cls, stream_response: Generator[WorkflowAppStreamResponse, None, None]) \
-> Generator[str, None, None]: -> Generator[dict | str, None, None]:
""" """
Convert stream simple response. Convert stream simple response.
:param stream_response: stream response :param stream_response: stream response
@ -90,4 +89,4 @@ class WorkflowAppGenerateResponseConverter(AppGenerateResponseConverter):
response_chunk.update(sub_stream_response.to_ignore_detail_dict()) response_chunk.update(sub_stream_response.to_ignore_detail_dict())
else: else:
response_chunk.update(sub_stream_response.to_dict()) response_chunk.update(sub_stream_response.to_dict())
yield json.dumps(response_chunk) yield response_chunk

@ -42,48 +42,58 @@ class AppGenerateService:
request_id = rate_limit.enter(request_id) request_id = rate_limit.enter(request_id)
if app_model.mode == AppMode.COMPLETION.value: if app_model.mode == AppMode.COMPLETION.value:
return rate_limit.generate( return rate_limit.generate(
CompletionAppGenerator().generate( CompletionAppGenerator.convert_to_event_stream(
app_model=app_model, user=user, args=args, invoke_from=invoke_from, stream=streaming CompletionAppGenerator().generate(
app_model=app_model, user=user, args=args, invoke_from=invoke_from, stream=streaming
),
), ),
request_id, request_id,
) )
elif app_model.mode == AppMode.AGENT_CHAT.value or app_model.is_agent: elif app_model.mode == AppMode.AGENT_CHAT.value or app_model.is_agent:
return rate_limit.generate( return rate_limit.generate(
AgentChatAppGenerator().generate( AgentChatAppGenerator.convert_to_event_stream(
app_model=app_model, user=user, args=args, invoke_from=invoke_from, stream=streaming AgentChatAppGenerator().generate(
app_model=app_model, user=user, args=args, invoke_from=invoke_from, stream=streaming
),
), ),
request_id, request_id,
) )
elif app_model.mode == AppMode.CHAT.value: elif app_model.mode == AppMode.CHAT.value:
return rate_limit.generate( return rate_limit.generate(
ChatAppGenerator().generate( ChatAppGenerator.convert_to_event_stream(
app_model=app_model, user=user, args=args, invoke_from=invoke_from, stream=streaming ChatAppGenerator().generate(
app_model=app_model, user=user, args=args, invoke_from=invoke_from, stream=streaming
),
), ),
request_id, request_id,
) )
elif app_model.mode == AppMode.ADVANCED_CHAT.value: elif app_model.mode == AppMode.ADVANCED_CHAT.value:
workflow = cls._get_workflow(app_model, invoke_from) workflow = cls._get_workflow(app_model, invoke_from)
return rate_limit.generate( return rate_limit.generate(
AdvancedChatAppGenerator().generate( AdvancedChatAppGenerator.convert_to_event_stream(
AdvancedChatAppGenerator().generate(
app_model=app_model, app_model=app_model,
workflow=workflow, workflow=workflow,
user=user, user=user,
args=args, args=args,
invoke_from=invoke_from, invoke_from=invoke_from,
stream=streaming, stream=streaming,
),
), ),
request_id, request_id,
) )
elif app_model.mode == AppMode.WORKFLOW.value: elif app_model.mode == AppMode.WORKFLOW.value:
workflow = cls._get_workflow(app_model, invoke_from) workflow = cls._get_workflow(app_model, invoke_from)
return rate_limit.generate( return rate_limit.generate(
WorkflowAppGenerator().generate( WorkflowAppGenerator.convert_to_event_stream(
app_model=app_model, WorkflowAppGenerator().generate(
workflow=workflow, app_model=app_model,
user=user, workflow=workflow,
args=args, user=user,
invoke_from=invoke_from, args=args,
stream=streaming, invoke_from=invoke_from,
stream=streaming,
),
), ),
request_id, request_id,
) )
@ -108,13 +118,17 @@ class AppGenerateService:
): ):
if app_model.mode == AppMode.ADVANCED_CHAT.value: if app_model.mode == AppMode.ADVANCED_CHAT.value:
workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER) workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER)
return AdvancedChatAppGenerator().single_iteration_generate( return AdvancedChatAppGenerator.convert_to_event_stream(
app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, stream=streaming AdvancedChatAppGenerator().single_iteration_generate(
app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, stream=streaming
)
) )
elif app_model.mode == AppMode.WORKFLOW.value: elif app_model.mode == AppMode.WORKFLOW.value:
workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER) workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER)
return WorkflowAppGenerator().single_iteration_generate( return AdvancedChatAppGenerator.convert_to_event_stream(
app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, stream=streaming WorkflowAppGenerator().single_iteration_generate(
app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, stream=streaming
)
) )
else: else:
raise ValueError(f"Invalid app mode {app_model.mode}") raise ValueError(f"Invalid app mode {app_model.mode}")

Loading…
Cancel
Save