From 942161081ef102cc6b2a3a3b39dc95e87bc48726 Mon Sep 17 00:00:00 2001 From: ytqh Date: Fri, 9 May 2025 21:08:45 +0800 Subject: [PATCH] feat: fix failed profile update but commit to db change --- api/schedule/user_profile_generate_task.py | 153 +++++++++++++-------- 1 file changed, 98 insertions(+), 55 deletions(-) diff --git a/api/schedule/user_profile_generate_task.py b/api/schedule/user_profile_generate_task.py index ea98c06cf7..71210e2947 100644 --- a/api/schedule/user_profile_generate_task.py +++ b/api/schedule/user_profile_generate_task.py @@ -1,14 +1,13 @@ import time from datetime import datetime -import click -from sqlalchemy import asc, func, or_ - import app +import click from configs import dify_config from core.app.entities.app_invoke_entities import InvokeFrom from models.model import App, EndUser, Message, db from services.app_generate_service import AppGenerateService +from sqlalchemy import asc, func, or_ @app.celery.task(queue="dataset") @@ -25,7 +24,9 @@ def user_profile_generate_task(): ) if len(app_ids) == 0: - click.echo(click.style("No app_id provided, skipping memory generation.", fg="yellow")) + click.echo( + click.style("No app_id provided, skipping memory generation.", fg="yellow") + ) return for app_id in app_ids: @@ -37,12 +38,15 @@ def user_profile_generate_task(): users_to_update = fetch_users_to_update(app_id) if users_to_update is None or len(users_to_update) == 0: - click.echo(click.style(f"No users to update. for app_id {app_id}", fg="green")) + click.echo( + click.style(f"No users to update. for app_id {app_id}", fg="green") + ) continue click.echo( click.style( - f"Found {len(users_to_update)} users profile and memory updates. in app_id {app_id}", fg="green" + f"Found {len(users_to_update)} users profile and memory updates. in app_id {app_id}", + fg="green", ) ) update_user_profile_for_appid(users_to_update) @@ -68,7 +72,9 @@ def update_user_profile_for_appid(users_to_update: list[EndUser]): batch = users_to_update[i : i + batch_size] try: for user in batch: - new_messages, latest_messages_created_at = fetch_new_messages_for_user(user) + new_messages, latest_messages_created_at = fetch_new_messages_for_user( + user + ) process_user_memory(user, new_messages) process_user_health_summary(user, new_messages) user.profile_updated_at = latest_messages_created_at @@ -78,7 +84,11 @@ def update_user_profile_for_appid(users_to_update: list[EndUser]): db.session.commit() except Exception as e: user_ids = [user.id for user in batch] - click.echo(click.style(f"Error updating memory for user {user_ids}: {str(e)}", fg="red")) + click.echo( + click.style( + f"Error updating memory for user {user_ids}: {str(e)}", fg="red" + ) + ) db.session.rollback() @@ -86,11 +96,14 @@ def fetch_users_to_update(app_id: str) -> list[EndUser]: """Fetch users to update memory for.""" latest_message_query = db.session.query( - Message.from_end_user_id, func.max(Message.created_at).label('latest_message_time') + Message.from_end_user_id, + func.max(Message.created_at).label("latest_message_time"), ) latest_message_query = latest_message_query.filter(Message.app_id == app_id) - latest_message_subquery = latest_message_query.group_by(Message.from_end_user_id).subquery() + latest_message_subquery = latest_message_query.group_by( + Message.from_end_user_id + ).subquery() # Then join with EndUser to find users who need memory updates users_query = ( @@ -103,7 +116,8 @@ def fetch_users_to_update(app_id: str) -> list[EndUser]: EndUser.app_id == app_id, or_( EndUser.profile_updated_at.is_(None), - EndUser.profile_updated_at < latest_message_subquery.c.latest_message_time, + EndUser.profile_updated_at + < latest_message_subquery.c.latest_message_time, ), ) ) @@ -118,10 +132,14 @@ def fetch_users_to_update(app_id: str) -> list[EndUser]: def fetch_new_messages_for_user(user: EndUser) -> tuple[str, datetime]: """Fetch new messages for a user.""" - message_query = db.session.query(Message).filter(Message.from_end_user_id == user.id) + message_query = db.session.query(Message).filter( + Message.from_end_user_id == user.id + ) message_query = message_query.filter(Message.app_id == user.app_id) if user.profile_updated_at: - message_query = message_query.filter(Message.created_at > user.profile_updated_at) + message_query = message_query.filter( + Message.created_at > user.profile_updated_at + ) new_messages = message_query.order_by(asc(Message.created_at)).all() # Format messages for input - safely handle missing query attributes @@ -131,7 +149,9 @@ def fetch_new_messages_for_user(user: EndUser) -> tuple[str, datetime]: # If no valid messages remain, exit early if not message_texts: - click.echo(click.style(f"No valid message content for user id {user.id}", fg="yellow")) + click.echo( + click.style(f"No valid message content for user id {user.id}", fg="yellow") + ) return "\n".join(message_texts), new_messages[-1].created_at @@ -142,12 +162,22 @@ def process_user_memory(user: EndUser, new_messages: str): memory_app_id = dify_config.USER_MEMORY_GENERATION_APP_ID if memory_app_id == "": - click.echo(click.style("No memory generation app_id provided, skipping memory generation.", fg="yellow")) + click.echo( + click.style( + "No memory generation app_id provided, skipping memory generation.", + fg="yellow", + ) + ) return memory_app_model = db.session.query(App).filter(App.id == memory_app_id).first() if memory_app_model is None: - click.echo(click.style(f"App not found for memory generation app_id {memory_app_id}", fg="yellow")) + click.echo( + click.style( + f"App not found for memory generation app_id {memory_app_id}", + fg="yellow", + ) + ) return # Set up arguments for memory generation @@ -165,7 +195,11 @@ def process_user_memory(user: EndUser, new_messages: str): try: response = AppGenerateService.generate( - app_model=memory_app_model, user=user, args=args, invoke_from=InvokeFrom.SCHEDULER, streaming=False + app_model=memory_app_model, + user=user, + args=args, + invoke_from=InvokeFrom.SCHEDULER, + streaming=False, ) click.echo(response) @@ -178,22 +212,38 @@ def process_user_memory(user: EndUser, new_messages: str): click.echo(click.style(f"Updated memory for user {user.id}", fg="green")) except Exception as e: - click.echo(click.style(f"Failed to update memory for user {user.id}, {str(e)}", fg="yellow")) + click.echo( + click.style( + f"Failed to update memory for user {user.id}, {str(e)}", fg="yellow" + ) + ) def process_user_health_summary(user: EndUser, new_messages: str): """Process a user to update health status.""" - click.echo(click.style(f"Updating health summary for user id {user.id}", fg="green")) + click.echo( + click.style(f"Updating health summary for user id {user.id}", fg="green") + ) health_summary_app_id = dify_config.USER_HEALTH_SUMMARY_GENERATION_APP_ID if health_summary_app_id == "": - click.echo(click.style("No health_summary_app_id provided, skipping health summary generation.", fg="yellow")) + click.echo( + click.style( + "No health_summary_app_id provided, skipping health summary generation.", + fg="yellow", + ) + ) return - health_summary_app_model = db.session.query(App).filter(App.id == health_summary_app_id).first() + health_summary_app_model = ( + db.session.query(App).filter(App.id == health_summary_app_id).first() + ) if health_summary_app_model is None: click.echo( - click.style(f"App not found for health summary generation app_id {health_summary_app_id}", fg="yellow") + click.style( + f"App not found for health summary generation app_id {health_summary_app_id}", + fg="yellow", + ) ) return @@ -205,46 +255,39 @@ def process_user_health_summary(user: EndUser, new_messages: str): } } - click.echo(click.style(f"Start to generate health summary for user {user.id}", fg="green")) - - try: - - response = AppGenerateService.generate( - app_model=health_summary_app_model, - user=user, - args=args, - invoke_from=InvokeFrom.SCHEDULER, - streaming=False, - ) + click.echo( + click.style(f"Start to generate health summary for user {user.id}", fg="green") + ) - click.echo(response) + response = AppGenerateService.generate( + app_model=health_summary_app_model, + user=user, + args=args, + invoke_from=InvokeFrom.SCHEDULER, + streaming=False, + ) - if not isinstance(response, dict): - return + click.echo(response) - import json + if not isinstance(response, dict): + return - result = response["data"]["outputs"]["result"] + import json - # preprocess result in case of ```json xxxx``` - if result.startswith("```json") or result.endswith("```"): - result = result.strip("```json").strip("```") + result = response["data"]["outputs"]["result"] - result = json.loads(result) - if "health_status" in result: - user.health_status = result["health_status"] + # preprocess result in case of ```json xxxx``` + if result.startswith("```json") or result.endswith("```"): + result = result.strip("```json").strip("```") - if "summary" in result: - user.summary = result["summary"] + result = json.loads(result) + if "health_status" in result: + user.health_status = result["health_status"] - if "topics" in result and isinstance(result["topics"], list): - user.topics = result["topics"] + if "summary" in result: + user.summary = result["summary"] - click.echo(click.style(f"Updated health summary for user {user.id}", fg="green")) + if "topics" in result and isinstance(result["topics"], list): + user.topics = result["topics"] - except json.JSONDecodeError: - click.echo(click.style(f"Failed to parse health summary for user {user.id}, invalid json format", fg="yellow")) - return - except Exception as e: - click.echo(click.style(f"Failed to update health summary for user {user.id}, {str(e)}", fg="yellow")) - return + click.echo(click.style(f"Updated health summary for user {user.id}", fg="green"))