diff --git a/api/schedule/user_profile_generate_task.py b/api/schedule/user_profile_generate_task.py index 6bf3db2b12..e466cb5fc6 100644 --- a/api/schedule/user_profile_generate_task.py +++ b/api/schedule/user_profile_generate_task.py @@ -1,3 +1,4 @@ +import json import time import uuid from datetime import datetime @@ -99,8 +100,8 @@ def fetch_users_to_update(app_id: str) -> list[EndUser]: .filter( EndUser.app_id == app_id, or_( - EndUser.memory_updated_at.is_(None), - EndUser.memory_updated_at < latest_message_subquery.c.latest_message_time, + EndUser.profile_updated_at.is_(None), + EndUser.profile_updated_at < latest_message_subquery.c.latest_message_time, ), ) ) @@ -174,19 +175,73 @@ def process_user_memory(user: EndUser, new_messages: str): else: click.echo(click.style(f"Failed to update memory for user {user.id}, invalid response format", fg="yellow")) + try: + + if not isinstance(response, dict): + return + + import json + + result = response["data"]["outputs"]["result"] + user.memory = result + + click.echo(click.style(f"Updated memory for user {user.id}", fg="green")) + except Exception as e: + click.echo(click.style(f"Failed to update memory for user {user.id}, {str(e)}", fg="yellow")) + def process_user_health_summary(user: EndUser, new_messages: str): """Process a user to update health status.""" - click.echo(click.style(f"Updating health status for user id {user.id}", fg="green")) + click.echo(click.style(f"Updating health summary for user id {user.id}", fg="green")) health_summary_app_id = dify_config.USER_HEALTH_SUMMARY_GENERATION_APP_ID if health_summary_app_id == "": + click.echo(click.style("No health_summary_app_id provided, skipping health summary generation.", fg="yellow")) + return + + health_summary_app_model = App.query.filter(App.id == health_summary_app_id).first() + if health_summary_app_model is None: click.echo( - click.style( - "No health_summary_app_id generation app_id provided, skipping health summary generation.", fg="yellow" - ) + click.style(f"App not found for health summary generation app_id {health_summary_app_id}", fg="yellow") ) return - # TODO: Implement health status generation - pass + args = { + "inputs": { + "name": user.name, + "profile": user.extra_profile, + "new_messages": new_messages, + } + } + + click.echo(click.style(f"Start to generate health summary for user {user.id}", fg="green")) + response = AppGenerateService.generate( + app_model=health_summary_app_model, user=user, args=args, invoke_from=InvokeFrom.SCHEDULER, streaming=False + ) + + try: + + if not isinstance(response, dict): + return + + import json + + result = response["data"]["outputs"]["result"] + result = json.loads(result) + if "health_status" in result: + user.health_status = result["health_status"] + + if "summary" in result: + user.summary = result["summary"] + + if "topics" in result and isinstance(result["topics"], list): + user.topics = result["topics"] + + click.echo(click.style(f"Updated health summary for user {user.id}", fg="green")) + + except json.JSONDecodeError: + click.echo(click.style(f"Failed to parse health summary for user {user.id}, invalid json format", fg="yellow")) + return + except Exception as e: + click.echo(click.style(f"Failed to update health summary for user {user.id}, {str(e)}", fg="yellow")) + return