From fbd23d364fd3b07c79210914ca162e194f919c48 Mon Sep 17 00:00:00 2001 From: ytqh Date: Sat, 8 Mar 2025 22:32:37 +0800 Subject: [PATCH] impl memory --- api/configs/school/__init__.py | 5 + api/schedule/user_memory_generate_task.py | 163 +++++++++++++++++++--- 2 files changed, 150 insertions(+), 18 deletions(-) diff --git a/api/configs/school/__init__.py b/api/configs/school/__init__.py index af60fd5166..23a3435ebd 100644 --- a/api/configs/school/__init__.py +++ b/api/configs/school/__init__.py @@ -16,3 +16,8 @@ class SchoolConfig(BaseSettings): description="Default tenant id for school-level features.", default="5cd3029e-7f92-428a-a5c8-14a790c70233", ) + + MEMORY_GENERATION_APP_ID: str = Field( + description="App id for memory generation.", + default="b278ba96-fa8e-48a8-b3e9-debe34468be0", + ) diff --git a/api/schedule/user_memory_generate_task.py b/api/schedule/user_memory_generate_task.py index bb3d63d286..493a9b51a8 100644 --- a/api/schedule/user_memory_generate_task.py +++ b/api/schedule/user_memory_generate_task.py @@ -1,43 +1,170 @@ import time +from datetime import datetime +from typing import Optional import app import click +from configs import dify_config from core.app.entities.app_invoke_entities import InvokeFrom -from models.model import App, EndUser +from models.model import App, EndUser, Message, db from services.app_generate_service import AppGenerateService +from sqlalchemy import and_, asc, desc, func, or_ @app.celery.task(queue="dataset") def user_memory_generate_task(): + """Generate or update user memory based on recent messages.""" + click.echo(click.style("Starting user memory generate task.", fg="green")) start_at = time.perf_counter() - # Your task logic here - # ... - click.echo(click.style("TODO: ytqh user memory generate task.", fg="green")) + # Filter by app_id if provided + app_id = dify_config.DEFAULT_APP_ID + if app_id == "": + click.echo(click.style("No app_id provided, skipping memory generation.", fg="yellow")) + return + + memory_app_id = dify_config.MEMORY_GENERATION_APP_ID + if memory_app_id == "": + click.echo(click.style("No memory generation app_id provided, skipping memory generation.", fg="yellow")) + return + + users_to_update = fetch_users_to_update(app_id) + + if users_to_update is None or len(users_to_update) == 0: + click.echo(click.style("No users to update.", fg="green")) + return + + click.echo(click.style(f"Found {len(users_to_update)} users who need memory updates.", fg="green")) - # TODO: ytqh call user memory workflow api - ## API KEY: app-4UVXs1hBgEWxzp5Zvh5FINbv - ## TASK ID: 760d9bef-20ee-4fa3-8650-ca300e292b32 + updated_users_count = 0 + batch_size = 10 - # get app model by using TASK ID - app_model = App.query.filter(App.id == "760d9bef-20ee-4fa3-8650-ca300e292b32").first() - # get end user by using app model - end_user = EndUser.query.filter(EndUser.external_user_id == "81f30432-fa3f-4279-a6c9-8b61c874699a").first() + # Process in batches to avoid memory issues + for i in range(0, len(users_to_update), batch_size): + batch = users_to_update[i : i + batch_size] + + try: + for user in batch: + process_user(user, memory_app_id, app_id) + updated_users_count += 1 + + # Commit after each batch + db.session.commit() + except Exception as e: + click.echo(click.style(f"Error updating memory for user {user.id}: {str(e)}", fg="red")) + db.session.rollback() + + end_at = time.perf_counter() + click.echo( + click.style( + f"Updated memory for {updated_users_count} users memory. Latency: {end_at - start_at}", + fg="green", + ) + ) + + +def fetch_users_to_update(app_id: str): + """Fetch users to update memory for.""" + + latest_message_query = db.session.query( + Message.from_end_user_id, func.max(Message.created_at).label('latest_message_time') + ) + + latest_message_query = latest_message_query.filter(Message.app_id == app_id) + latest_message_subquery = latest_message_query.group_by(Message.from_end_user_id).subquery() + + # Then join with EndUser to find users who need memory updates + users_query = ( + db.session.query(EndUser, latest_message_subquery.c.latest_message_time) + .join( + latest_message_subquery, + EndUser.id == latest_message_subquery.c.from_end_user_id, + ) + .filter( + EndUser.app_id == app_id, + or_( + EndUser.memory_updated_at.is_(None), + EndUser.memory_updated_at < latest_message_subquery.c.latest_message_time, + ), + ) + ) + results = users_query.all() + # Extract users from the query results (each result is a tuple of (EndUser, latest_message_time)) + users = [result[0] for result in results] + + return users + + +def process_user(user: EndUser, memory_app_id: str, app_id: str): + """Process a user to update memory.""" + click.echo(click.style(f"Updating memory for user id {user.id}", fg="green")) + + memory_app_model = App.query.filter(App.id == memory_app_id).first() + if memory_app_model is None: + click.echo(click.style(f"App not found for memory generation app_id {memory_app_id}", fg="yellow")) + return + + # Get the latest messages efficiently + message_query = db.session.query(Message).filter(Message.from_end_user_id == user.id) + + # Filter messages by app_id to ensure consistency + message_query = message_query.filter(Message.app_id == app_id) + + # Only include messages created after the last memory update + if user.memory_updated_at: + message_query = message_query.filter(Message.created_at > user.memory_updated_at) + + latest_messages = message_query.order_by(asc(Message.created_at)).limit(10).all() + + # Skip if no messages found (unlikely due to our query, but just to be safe) + if not latest_messages: + click.echo(click.style(f"No messages found for user id {user.id}", fg="yellow")) + return + + click.echo(click.style(f"Found {len(latest_messages)} messages for user id {user.id}", fg="green")) + + # Format messages for input - safely handle missing query attributes + message_texts = [] + for msg in latest_messages: + click.echo(click.style(f"Message: {msg.query} & {msg.answer}", fg="green")) + message_texts.append(f"user: {msg.query}\nassistant: {msg.answer}") + + # If no valid messages remain, exit early + if not message_texts: + click.echo(click.style(f"No valid message content for user id {user.id}", fg="yellow")) + return + + formatted_messages = "\n".join(message_texts) + + # Set up arguments for memory generation args = { "inputs": { - "new_messages": "sssss", - "current_memory": "sssss", + "new_messages": formatted_messages, + "current_memory": user.memory or "", } } + click.echo(click.style(f"Args: {args}", fg="green")) + + # Call the memory generation service response = AppGenerateService.generate( - app_model=app_model, user=end_user, args=args, invoke_from=InvokeFrom.SCHEDULER, streaming=False + app_model=memory_app_model, user=user, args=args, invoke_from=InvokeFrom.SCHEDULER, streaming=False ) - # save response to db - click.echo(click.style(f"TODO: ytqh save response {response} to db.", fg="green")) + click.echo(click.style(f"Response: {response}, type: {type(response)}", fg="green")) - end_at = time.perf_counter() - click.echo(click.style(f"Task completed successfully. Latency: {end_at - start_at}", fg="green")) + # Save the updated memory to the user + if ( + response + and isinstance(response, dict) + and "data" in response + and "outputs" in response["data"] + and "result" in response["data"]["outputs"] + ): + user.memory = response["data"]["outputs"]["result"] + user.memory_updated_at = latest_messages[-1].created_at + click.echo(click.style(f"Updated memory for user {user.id}", fg="green")) + else: + click.echo(click.style(f"Failed to update memory for user {user.id}, invalid response", fg="yellow"))