suppor multi memory gen

pull/21891/head
ytqh 1 year ago
parent 681f4ac7a8
commit 1458341c2f

@ -1,4 +1,5 @@
import time
import uuid
from datetime import datetime
from typing import Optional
@ -18,24 +19,49 @@ def user_memory_generate_task():
click.echo(click.style("Starting user memory generate task.", fg="green"))
start_at = time.perf_counter()
# Filter by app_id if provided
app_id = dify_config.DEFAULT_APP_ID
if app_id == "":
click.echo(click.style("No app_id provided, skipping memory generation.", fg="yellow"))
return
memory_app_id = dify_config.MEMORY_GENERATION_APP_ID
if memory_app_id == "":
click.echo(click.style("No memory generation app_id provided, skipping memory generation.", fg="yellow"))
return
app_ids = (
dify_config.NEED_MEMORY_GENERATION_APP_IDS.split(",")
if dify_config.NEED_MEMORY_GENERATION_APP_IDS
else [dify_config.DEFAULT_APP_ID]
)
if len(app_ids) == 0:
click.echo(click.style("No app_id provided, skipping memory generation.", fg="yellow"))
return
for app_id in app_ids:
if len(app_id) == 0:
click.echo(click.style(f"Invalid app_id: {app_id}", fg="red"))
return
for app_id in app_ids:
update_memory_for_appid(app_id, memory_app_id)
end_at = time.perf_counter()
click.echo(
click.style(
f"Updated memory for app_id {app_ids} users memory. Latency: {end_at - start_at}",
fg="green",
)
)
def update_memory_for_appid(app_id: str, memory_app_id: str):
"""Update memory for a given app_id."""
users_to_update = fetch_users_to_update(app_id)
if users_to_update is None or len(users_to_update) == 0:
click.echo(click.style("No users to update.", fg="green"))
return
click.echo(click.style(f"Found {len(users_to_update)} users who need memory updates.", fg="green"))
click.echo(
click.style(f"Found {len(users_to_update)} users who need memory updates. for app_id {app_id}", fg="green")
)
updated_users_count = 0
batch_size = 10
@ -45,24 +71,24 @@ def user_memory_generate_task():
batch = users_to_update[i : i + batch_size]
try:
for user in batch:
process_user(user, memory_app_id, app_id)
updated_users_count += 1
# Process users in parallel
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=min(len(batch), 5)) as executor:
futures = [executor.submit(process_user, user, memory_app_id, app_id) for user in batch]
# Wait for all futures to complete
for future in futures:
future.result() # This will raise any exceptions that occurred
updated_users_count += len(batch)
# Commit after each batch
db.session.commit()
except Exception as e:
click.echo(click.style(f"Error updating memory for user {user.id}: {str(e)}", fg="red"))
user_ids = [user.id for user in batch]
click.echo(click.style(f"Error updating memory for user {user_ids}: {str(e)}", fg="red"))
db.session.rollback()
end_at = time.perf_counter()
click.echo(
click.style(
f"Updated memory for {updated_users_count} users memory. Latency: {end_at - start_at}",
fg="green",
)
)
def fetch_users_to_update(app_id: str):
"""Fetch users to update memory for."""
@ -128,8 +154,7 @@ def process_user(user: EndUser, memory_app_id: str, app_id: str):
# Format messages for input - safely handle missing query attributes
message_texts = []
for msg in latest_messages:
click.echo(click.style(f"Message: {msg.query} & {msg.answer}", fg="green"))
message_texts.append(f"user: {msg.query}\nassistant: {msg.answer}")
message_texts.append(f"user: {msg.query}\nassistant: {msg.answer}\n")
# If no valid messages remain, exit early
if not message_texts:

@ -943,6 +943,7 @@ ALLOW_REGISTER=true
DEFAULT_APP_ID=
DEFAULT_TENANT_ID=
DEBUG_EMAIL_CODE_FOR_LOGIN=
NEED_MEMORY_GENERATION_APP_IDS=
# User memory generate task interval in minutes
USER_MEMORY_GENERATE_TASK_INTERVAL=5

@ -396,6 +396,7 @@ x-shared-env: &shared-api-worker-env
DEFAULT_APP_ID: ${DEFAULT_APP_ID:-}
DEFAULT_TENANT_ID: ${DEFAULT_TENANT_ID:-}
DEBUG_EMAIL_CODE_FOR_LOGIN: ${DEBUG_EMAIL_CODE_FOR_LOGIN:-}
NEED_MEMORY_GENERATION_APP_IDS: ${NEED_MEMORY_GENERATION_APP_IDS:-}
USER_MEMORY_GENERATE_TASK_INTERVAL: ${USER_MEMORY_GENERATE_TASK_INTERVAL:-5}
MEMORY_GENERATION_APP_ID: ${MEMORY_GENERATION_APP_ID:-}

Loading…
Cancel
Save