Compare commits

...

7 Commits

Author SHA1 Message Date
takatost 0c6ad1df64 optimize conversation / message insert 1 year ago
takatost 40fb522f56 add trace 1 year ago
takatost 96d9951d5c disable redis cache 1 year ago
takatost d36201f7ff remove unused code 1 year ago
takatost b46c7935b1 test disable socket patch 1 year ago
takatost 206e6e1e7c add debug print 1 year ago
takatost 8685c0d48b test replace gevent socket patch in redis 1 year ago

@ -5,6 +5,7 @@ on:
branches:
- "main"
- "deploy/dev"
- "fix/redis-slow-in-gevent"
release:
types: [published]

@ -34,6 +34,11 @@ class RedisConfig(BaseSettings):
default=0,
)
REDIS_MAX_CONNECTIONS: PositiveInt = Field(
description="Maximum number of connections to Redis",
default=200,
)
REDIS_USE_SSL: bool = Field(
description="Enable SSL/TLS for the Redis connection",
default=False,

@ -226,13 +226,11 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
is_first_conversation = True
# init generate records
(conversation, message) = self._init_generate_records(application_generate_entity, conversation)
if is_first_conversation:
# update conversation features
conversation.override_model_configs = workflow.features
db.session.commit()
db.session.refresh(conversation)
(conversation, message) = self._init_generate_records(
application_generate_entity=application_generate_entity,
conversation=conversation,
override_model_configs=workflow.features_dict if is_first_conversation else None,
)
# init queue manager
queue_manager = MessageBasedAppQueueManager(

@ -1,4 +1,5 @@
import logging
import time
from collections.abc import Mapping
from typing import Any, cast
@ -101,6 +102,9 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
):
return
# trace start time
start_time = time.perf_counter()
# Init conversation variables
stmt = select(ConversationVariable).where(
ConversationVariable.app_id == self.conversation.app_id,
@ -128,6 +132,13 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
conversation_dialogue_count = self.conversation.dialogue_count
db.session.commit()
# trace end time
end_time = time.perf_counter()
print(f"conversation_dialogue_count time: {end_time - start_time}")
# trace start time
start_time = time.perf_counter()
# Create a variable pool.
system_inputs = {
SystemVariableKey.QUERY: query,
@ -151,6 +162,10 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
# init graph
graph = self._init_graph(graph_config=workflow.graph_dict)
# trace end time
end_time = time.perf_counter()
print(f"init graph time: {end_time - start_time}")
db.session.close()
# RUN WORKFLOW

@ -15,7 +15,6 @@ from core.app.entities.queue_entities import (
QueuePingEvent,
QueueStopEvent,
)
from extensions.ext_redis import redis_client
class PublishFrom(Enum):
@ -32,10 +31,10 @@ class AppQueueManager:
self._user_id = user_id
self._invoke_from = invoke_from
user_prefix = "account" if self._invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user"
redis_client.setex(
AppQueueManager._generate_task_belong_cache_key(self._task_id), 1800, f"{user_prefix}-{self._user_id}"
)
# user_prefix = "account" if self._invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user"
# redis_client.setex(
# AppQueueManager._generate_task_belong_cache_key(self._task_id), 1800, f"{user_prefix}-{self._user_id}"
# )
q = queue.Queue()
@ -114,26 +113,27 @@ class AppQueueManager:
Set task stop flag
:return:
"""
result = redis_client.get(cls._generate_task_belong_cache_key(task_id))
if result is None:
return
return
# result = redis_client.get(cls._generate_task_belong_cache_key(task_id))
# if result is None:
# return
user_prefix = "account" if invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user"
if result.decode("utf-8") != f"{user_prefix}-{user_id}":
return
# user_prefix = "account" if invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user"
# if result.decode("utf-8") != f"{user_prefix}-{user_id}":
# return
stopped_cache_key = cls._generate_stopped_cache_key(task_id)
redis_client.setex(stopped_cache_key, 600, 1)
# stopped_cache_key = cls._generate_stopped_cache_key(task_id)
# redis_client.setex(stopped_cache_key, 600, 1)
def _is_stopped(self) -> bool:
"""
Check if task is stopped
:return:
"""
stopped_cache_key = AppQueueManager._generate_stopped_cache_key(self._task_id)
result = redis_client.get(stopped_cache_key)
if result is not None:
return True
# stopped_cache_key = AppQueueManager._generate_stopped_cache_key(self._task_id)
# result = redis_client.get(stopped_cache_key)
# if result is not None:
# return True
return False

@ -1,8 +1,9 @@
import json
import logging
from collections.abc import Generator
import uuid
from collections.abc import Generator, Mapping
from datetime import datetime, timezone
from typing import Optional, Union
from typing import Any, Optional, Union
from sqlalchemy import and_
@ -137,6 +138,7 @@ class MessageBasedAppGenerator(BaseAppGenerator):
AdvancedChatAppGenerateEntity,
],
conversation: Optional[Conversation] = None,
override_model_configs: Optional[Mapping[str, Any]] = None,
) -> tuple[Conversation, Message]:
"""
Initialize generate records
@ -158,14 +160,12 @@ class MessageBasedAppGenerator(BaseAppGenerator):
if isinstance(application_generate_entity, AdvancedChatAppGenerateEntity):
app_model_config_id = None
override_model_configs = None
model_provider = None
model_id = None
else:
app_model_config_id = app_config.app_model_config_id
model_provider = application_generate_entity.model_conf.provider
model_id = application_generate_entity.model_conf.model
override_model_configs = None
if app_config.app_model_config_from == EasyUIBasedAppModelConfigFrom.ARGS and app_config.app_mode in {
AppMode.AGENT_CHAT,
AppMode.CHAT,
@ -177,61 +177,63 @@ class MessageBasedAppGenerator(BaseAppGenerator):
introduction = self._get_conversation_introduction(application_generate_entity)
if not conversation:
conversation = Conversation(
app_id=app_config.app_id,
app_model_config_id=app_model_config_id,
model_provider=model_provider,
model_id=model_id,
override_model_configs=json.dumps(override_model_configs) if override_model_configs else None,
mode=app_config.app_mode.value,
name="New conversation",
inputs=application_generate_entity.inputs,
introduction=introduction,
system_instruction="",
system_instruction_tokens=0,
status="normal",
invoke_from=application_generate_entity.invoke_from.value,
from_source=from_source,
from_end_user_id=end_user_id,
from_account_id=account_id,
)
db.session.add(conversation)
db.session.commit()
db.session.refresh(conversation)
with db.Session(bind=db.engine, expire_on_commit=False) as session:
conversation = Conversation()
conversation.id = str(uuid.uuid4())
conversation.app_id = app_config.app_id
conversation.app_model_config_id = app_model_config_id
conversation.model_provider = model_provider
conversation.model_id = model_id
conversation.override_model_configs = (
json.dumps(override_model_configs) if override_model_configs else None
)
conversation.mode = app_config.app_mode.value
conversation.name = "New conversation"
conversation.inputs = application_generate_entity.inputs
conversation.introduction = introduction
conversation.system_instruction = ""
conversation.system_instruction_tokens = 0
conversation.status = "normal"
conversation.invoke_from = application_generate_entity.invoke_from.value
conversation.from_source = from_source
conversation.from_end_user_id = end_user_id
conversation.from_account_id = account_id
session.add(conversation)
session.commit()
session.refresh(conversation)
else:
conversation.updated_at = datetime.now(timezone.utc).replace(tzinfo=None)
db.session.commit()
message = Message(
app_id=app_config.app_id,
model_provider=model_provider,
model_id=model_id,
override_model_configs=json.dumps(override_model_configs) if override_model_configs else None,
conversation_id=conversation.id,
inputs=application_generate_entity.inputs,
query=application_generate_entity.query or "",
message="",
message_tokens=0,
message_unit_price=0,
message_price_unit=0,
answer="",
answer_tokens=0,
answer_unit_price=0,
answer_price_unit=0,
parent_message_id=getattr(application_generate_entity, "parent_message_id", None),
provider_response_latency=0,
total_price=0,
currency="USD",
invoke_from=application_generate_entity.invoke_from.value,
from_source=from_source,
from_end_user_id=end_user_id,
from_account_id=account_id,
)
db.session.add(message)
db.session.commit()
db.session.refresh(message)
with db.Session(bind=db.engine, expire_on_commit=False) as session:
message = Message()
message.app_id = app_config.app_id
message.model_provider = model_provider
message.model_id = model_id
message.override_model_configs = json.dumps(override_model_configs) if override_model_configs else None
message.conversation_id = conversation.id
message.inputs = application_generate_entity.inputs
message.query = application_generate_entity.query or ""
message.message = ""
message.message_tokens = 0
message.message_unit_price = 0
message.answer = ""
message.answer_tokens = 0
message.answer_unit_price = 0
message.answer_price_unit = 0
message.parent_message_id = getattr(application_generate_entity, "parent_message_id", None)
message.provider_response_latency = 0
message.total_price = 0
message.currency = "USD"
message.invoke_from = application_generate_entity.invoke_from.value
message.from_source = from_source
message.from_end_user_id = end_user_id
message.from_account_id = account_id
session.add(message)
session.commit()
session.refresh(message)
for file in application_generate_entity.files:
message_file = MessageFile(

Loading…
Cancel
Save