From 1458341c2f96f89dcafea790e38f8eccae664cd2 Mon Sep 17 00:00:00 2001 From: ytqh Date: Sun, 9 Mar 2025 11:01:03 +0800 Subject: [PATCH] support multi memory gen --- api/schedule/user_memory_generate_task.py | 67 ++++++++++++++++------- docker/.env.example | 1 + docker/docker-compose.yaml | 1 + 3 files changed, 48 insertions(+), 21 deletions(-) diff --git a/api/schedule/user_memory_generate_task.py b/api/schedule/user_memory_generate_task.py index 493a9b51a8..cf47d51a76 100644 --- a/api/schedule/user_memory_generate_task.py +++ b/api/schedule/user_memory_generate_task.py @@ -1,4 +1,5 @@ import time +import uuid from datetime import datetime from typing import Optional @@ -18,24 +19,49 @@ def user_memory_generate_task(): click.echo(click.style("Starting user memory generate task.", fg="green")) start_at = time.perf_counter() - # Filter by app_id if provided - app_id = dify_config.DEFAULT_APP_ID - if app_id == "": - click.echo(click.style("No app_id provided, skipping memory generation.", fg="yellow")) - return - memory_app_id = dify_config.MEMORY_GENERATION_APP_ID if memory_app_id == "": click.echo(click.style("No memory generation app_id provided, skipping memory generation.", fg="yellow")) return + app_ids = ( + dify_config.NEED_MEMORY_GENERATION_APP_IDS.split(",") + if dify_config.NEED_MEMORY_GENERATION_APP_IDS + else [dify_config.DEFAULT_APP_ID] + ) + + if len(app_ids) == 0: + click.echo(click.style("No app_id provided, skipping memory generation.", fg="yellow")) + return + + for app_id in app_ids: + if len(app_id) == 0: + click.echo(click.style(f"Invalid app_id: {app_id}", fg="red")) + return + + for app_id in app_ids: + update_memory_for_appid(app_id, memory_app_id) + + end_at = time.perf_counter() + click.echo( + click.style( + f"Updated memory for app_id {app_ids} users memory. 
Latency: {end_at - start_at}", + fg="green", + ) + ) + + +def update_memory_for_appid(app_id: str, memory_app_id: str): + """Update memory for a given app_id.""" users_to_update = fetch_users_to_update(app_id) if users_to_update is None or len(users_to_update) == 0: click.echo(click.style("No users to update.", fg="green")) return - click.echo(click.style(f"Found {len(users_to_update)} users who need memory updates.", fg="green")) + click.echo( + click.style(f"Found {len(users_to_update)} users who need memory updates. for app_id {app_id}", fg="green") + ) updated_users_count = 0 batch_size = 10 @@ -45,24 +71,24 @@ def user_memory_generate_task(): batch = users_to_update[i : i + batch_size] try: - for user in batch: - process_user(user, memory_app_id, app_id) - updated_users_count += 1 + # Process users in parallel + from concurrent.futures import ThreadPoolExecutor + + with ThreadPoolExecutor(max_workers=min(len(batch), 5)) as executor: + futures = [executor.submit(process_user, user, memory_app_id, app_id) for user in batch] + # Wait for all futures to complete + for future in futures: + future.result() # This will raise any exceptions that occurred + + updated_users_count += len(batch) # Commit after each batch db.session.commit() except Exception as e: - click.echo(click.style(f"Error updating memory for user {user.id}: {str(e)}", fg="red")) + user_ids = [user.id for user in batch] + click.echo(click.style(f"Error updating memory for user {user_ids}: {str(e)}", fg="red")) db.session.rollback() - end_at = time.perf_counter() - click.echo( - click.style( - f"Updated memory for {updated_users_count} users memory. 
Latency: {end_at - start_at}", - fg="green", - ) - ) - def fetch_users_to_update(app_id: str): """Fetch users to update memory for.""" @@ -128,8 +154,7 @@ def process_user(user: EndUser, memory_app_id: str, app_id: str): # Format messages for input - safely handle missing query attributes message_texts = [] for msg in latest_messages: - click.echo(click.style(f"Message: {msg.query} & {msg.answer}", fg="green")) - message_texts.append(f"user: {msg.query}\nassistant: {msg.answer}") + message_texts.append(f"user: {msg.query}\nassistant: {msg.answer}\n") # If no valid messages remain, exit early if not message_texts: diff --git a/docker/.env.example b/docker/.env.example index d7ba26e476..14370f9ecb 100644 --- a/docker/.env.example +++ b/docker/.env.example @@ -943,6 +943,7 @@ ALLOW_REGISTER=true DEFAULT_APP_ID= DEFAULT_TENANT_ID= DEBUG_EMAIL_CODE_FOR_LOGIN= +NEED_MEMORY_GENERATION_APP_IDS= # User memory generate task interval in minutes USER_MEMORY_GENERATE_TASK_INTERVAL=5 diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index 8c958a2c7b..56fedd0a07 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -396,6 +396,7 @@ x-shared-env: &shared-api-worker-env DEFAULT_APP_ID: ${DEFAULT_APP_ID:-} DEFAULT_TENANT_ID: ${DEFAULT_TENANT_ID:-} DEBUG_EMAIL_CODE_FOR_LOGIN: ${DEBUG_EMAIL_CODE_FOR_LOGIN:-} + NEED_MEMORY_GENERATION_APP_IDS: ${NEED_MEMORY_GENERATION_APP_IDS:-} USER_MEMORY_GENERATE_TASK_INTERVAL: ${USER_MEMORY_GENERATE_TASK_INTERVAL:-5} MEMORY_GENERATION_APP_ID: ${MEMORY_GENERATION_APP_ID:-}