diff --git a/api/.env.example b/api/.env.example index 695d8eab66..f327ff3cdf 100644 --- a/api/.env.example +++ b/api/.env.example @@ -433,4 +433,4 @@ MAX_SUBMIT_COUNT=100 LOGIN_LOCKOUT_DURATION=86400 # User memory generate task interval in minutes -USER_MEMORY_GENERATE_TASK_INTERVAL=5 \ No newline at end of file +USER_PROFILE_GENERATE_TASK_INTERVAL=5 \ No newline at end of file diff --git a/api/configs/feature/__init__.py b/api/configs/feature/__init__.py index 5447a81826..0bea0620f4 100644 --- a/api/configs/feature/__init__.py +++ b/api/configs/feature/__init__.py @@ -695,7 +695,7 @@ class CeleryBeatConfig(BaseSettings): default=1, ) - USER_MEMORY_GENERATE_TASK_INTERVAL: int = Field( + USER_PROFILE_GENERATE_TASK_INTERVAL: int = Field( description="Interval in seconds for user memory generate task execution, default to 5 minutes", default=5, ) diff --git a/api/configs/school/__init__.py b/api/configs/school/__init__.py index 9dbfb33924..cf5e08dadd 100644 --- a/api/configs/school/__init__.py +++ b/api/configs/school/__init__.py @@ -12,7 +12,7 @@ class SchoolConfig(BaseSettings): default="", ) - NEED_MEMORY_GENERATION_APP_IDS: str = Field( + NEED_USER_PROFILE_GENERATION_APP_IDS: str = Field( description="Development app ids for school-level features.", default="", ) @@ -22,7 +22,12 @@ class SchoolConfig(BaseSettings): default="", ) - MEMORY_GENERATION_APP_ID: str = Field( + USER_MEMORY_GENERATION_APP_ID: str = Field( description="App id for memory generation.", default="", ) + + USER_HEALTH_SUMMARY_GENERATION_APP_ID: str = Field( + description="App id for health summary generation.", + default="", + ) diff --git a/api/extensions/ext_celery.py b/api/extensions/ext_celery.py index 5d9e850a29..20926d5ea7 100644 --- a/api/extensions/ext_celery.py +++ b/api/extensions/ext_celery.py @@ -69,7 +69,7 @@ def init_app(app: DifyApp) -> Celery: "schedule.update_tidb_serverless_status_task", # "schedule.clean_messages", # "schedule.mail_clean_document_notify_task", - "schedule.user_memory_generate_task", + "schedule.user_profile_generate_task", ] day = dify_config.CELERY_BEAT_SCHEDULER_TIME beat_schedule = { @@ -98,9 +98,9 @@ def init_app(app: DifyApp) -> Celery: # "task": "schedule.mail_clean_document_notify_task.mail_clean_document_notify_task", # "schedule": crontab(minute="0", hour="10", day_of_week="1"), # }, - "user_memory_generate_task": { - "task": "schedule.user_memory_generate_task.user_memory_generate_task", - "schedule": timedelta(minutes=dify_config.USER_MEMORY_GENERATE_TASK_INTERVAL), + "user_profile_generate_task": { + "task": "schedule.user_profile_generate_task.user_profile_generate_task", + "schedule": timedelta(minutes=dify_config.USER_PROFILE_GENERATE_TASK_INTERVAL), }, } celery_app.conf.update(beat_schedule=beat_schedule, imports=imports) diff --git a/api/models/model.py b/api/models/model.py index 21b3d86c4a..b3417bc1e4 100644 --- a/api/models/model.py +++ b/api/models/model.py @@ -1366,25 +1366,37 @@ class EndUser(UserMixin, db.Model): # type: ignore[name-defined] return None return self.extra_profile.get("major") - def update_summary(self, summary: str): + @memory.setter + def memory(self, value: str): self.extra_profile = { - "summary": summary, + "summary": self.summary, + "topics": self.topics, + "major": self.major, + "memory": value, + } + + @summary.setter + def summary(self, value: str): + self.extra_profile = { + "summary": value, "topics": self.topics, "major": self.major, } - def update_topics(self, topics: list[str]): + @topics.setter + def topics(self, value: list[str]): self.extra_profile = { "summary": self.summary, - "topics": topics, + "topics": value, "major": self.major, } - def update_major(self, major: str): + @major.setter + def major(self, value: str): self.extra_profile = { "summary": self.summary, "topics": self.topics, - "major": major, + "major": value, } diff --git a/api/schedule/user_memory_generate_task.py b/api/schedule/user_profile_generate_task.py similarity index 71% rename from api/schedule/user_memory_generate_task.py rename to api/schedule/user_profile_generate_task.py index d49efabb88..6bf3db2b12 100644 --- a/api/schedule/user_memory_generate_task.py +++ b/api/schedule/user_profile_generate_task.py @@ -13,20 +13,15 @@ from sqlalchemy import and_, asc, desc, func, or_ @app.celery.task(queue="dataset") -def user_memory_generate_task(): +def user_profile_generate_task(): """Generate or update user memory based on recent messages.""" click.echo(click.style("Starting user memory generate task.", fg="green")) start_at = time.perf_counter() - memory_app_id = dify_config.MEMORY_GENERATION_APP_ID - if memory_app_id == "": - click.echo(click.style("No memory generation app_id provided, skipping memory generation.", fg="yellow")) - return - app_ids = ( - dify_config.NEED_MEMORY_GENERATION_APP_IDS.split(",") - if dify_config.NEED_MEMORY_GENERATION_APP_IDS + dify_config.NEED_USER_PROFILE_GENERATION_APP_IDS.split(",") + if dify_config.NEED_USER_PROFILE_GENERATION_APP_IDS else [dify_config.DEFAULT_APP_ID] ) @@ -40,9 +35,11 @@ def user_memory_generate_task(): return for app_id in app_ids: - update_memory_for_appid(app_id, memory_app_id) + users_to_update = fetch_users_to_update(app_id) + update_user_profile_for_appid(users_to_update) end_at = time.perf_counter() + click.echo( click.style( f"Updated memory for app_id {app_ids} users memory. Latency: {end_at - start_at}", @@ -51,17 +48,14 @@ def user_memory_generate_task(): ) -def update_memory_for_appid(app_id: str, memory_app_id: str): +def update_user_profile_for_appid(users_to_update: list[EndUser]): """Update memory for a given app_id.""" - users_to_update = fetch_users_to_update(app_id) if users_to_update is None or len(users_to_update) == 0: click.echo(click.style("No users to update.", fg="green")) return - click.echo( - click.style(f"Found {len(users_to_update)} users who need memory updates. for app_id {app_id}", fg="green") - ) + click.echo(click.style(f"Found {len(users_to_update)} users who need memory updates.", fg="green")) updated_users_count = 0 batch_size = 10 @@ -71,7 +65,10 @@ def update_memory_for_appid(app_id: str, memory_app_id: str): batch = users_to_update[i : i + batch_size] try: for user in batch: - process_user(user, memory_app_id, app_id) + new_messages, latest_messages_created_at = fetch_new_messages_for_user(user) + process_user_memory(user, new_messages) + process_user_health_summary(user, new_messages) + user.profile_updated_at = latest_messages_created_at updated_users_count += 1 # Commit after each batch @@ -82,7 +79,7 @@ def update_memory_for_appid(app_id: str, memory_app_id: str): db.session.rollback() -def fetch_users_to_update(app_id: str): +def fetch_users_to_update(app_id: str) -> list[EndUser]: """Fetch users to update memory for.""" latest_message_query = db.session.query( @@ -115,56 +112,51 @@ def fetch_users_to_update(app_id: str): return users -def process_user(user: EndUser, memory_app_id: str, app_id: str): - """Process a user to update memory.""" - click.echo(click.style(f"Updating memory for user id {user.id}", fg="green")) +def fetch_new_messages_for_user(user: EndUser) -> tuple[str, datetime]: + """Fetch new messages for a user.""" - memory_app_model = App.query.filter(App.id == memory_app_id).first() - if memory_app_model is None: - click.echo(click.style(f"App not found for memory generation app_id {memory_app_id}", fg="yellow")) - return - - # Get the latest messages efficiently message_query = db.session.query(Message).filter(Message.from_end_user_id == user.id) - - # Filter messages by app_id to ensure consistency - message_query = message_query.filter(Message.app_id == app_id) - - # Only include messages created after the last memory update - if user.memory_updated_at: - message_query = message_query.filter(Message.created_at > user.memory_updated_at) - - latest_messages = message_query.order_by(asc(Message.created_at)).all() - - # Skip if no messages found (unlikely due to our query, but just to be safe) - if not latest_messages: - click.echo(click.style(f"No messages found for user id {user.id}", fg="yellow")) - return - - click.echo(click.style(f"Found {len(latest_messages)} messages for user id {user.id}", fg="green")) + message_query = message_query.filter(Message.app_id == user.app_id) + if user.profile_updated_at: + message_query = message_query.filter(Message.created_at > user.profile_updated_at) + new_messages = message_query.order_by(asc(Message.created_at)).all() # Format messages for input - safely handle missing query attributes message_texts = [] - for msg in latest_messages: + for msg in new_messages: message_texts.append(f"user: {msg.query}\nassistant: {msg.answer}\n") # If no valid messages remain, exit early if not message_texts: click.echo(click.style(f"No valid message content for user id {user.id}", fg="yellow")) + + return "\n".join(message_texts), new_messages[-1].created_at + + +def process_user_memory(user: EndUser, new_messages: str): + """Process a user to update memory.""" + click.echo(click.style(f"Updating memory for user id {user.id}", fg="green")) + + memory_app_id = dify_config.USER_MEMORY_GENERATION_APP_ID + if memory_app_id == "": + click.echo(click.style("No memory generation app_id provided, skipping memory generation.", fg="yellow")) return - formatted_messages = "\n".join(message_texts) + memory_app_model = App.query.filter(App.id == memory_app_id).first() + if memory_app_model is None: + click.echo(click.style(f"App not found for memory generation app_id {memory_app_id}", fg="yellow")) + return # Set up arguments for memory generation args = { "inputs": { - "new_messages": formatted_messages, + "new_messages": new_messages, "current_memory": user.memory or "", } } # Call the memory generation service - click.echo(click.style(f"Start to generate memory for user {user.id} in app {app_id}", fg="green")) + click.echo(click.style(f"Start to generate memory for user {user.id}", fg="green")) response = AppGenerateService.generate( app_model=memory_app_model, user=user, args=args, invoke_from=InvokeFrom.SCHEDULER, streaming=False ) @@ -178,7 +170,23 @@ def process_user(user: EndUser, memory_app_id: str, app_id: str): and "result" in response["data"]["outputs"] ): user.memory = response["data"]["outputs"]["result"] - user.memory_updated_at = latest_messages[-1].created_at - click.echo(click.style(f"Updated memory for user {user.id} in app {app_id}", fg="green")) + click.echo(click.style(f"Updated memory for user {user.id}", fg="green")) else: click.echo(click.style(f"Failed to update memory for user {user.id}, invalid response format", fg="yellow")) + + +def process_user_health_summary(user: EndUser, new_messages: str): + """Process a user to update health status.""" + click.echo(click.style(f"Updating health status for user id {user.id}", fg="green")) + + health_summary_app_id = dify_config.USER_HEALTH_SUMMARY_GENERATION_APP_ID + if health_summary_app_id == "": + click.echo( + click.style( + "No health_summary_app_id generation app_id provided, skipping health summary generation.", fg="yellow" + ) + ) + return + + # TODO: Implement health status generation + pass diff --git a/docker/.env.example b/docker/.env.example index fca4c46e5f..5a735c9c99 100644 --- a/docker/.env.example +++ b/docker/.env.example @@ -943,12 +943,10 @@ ALLOW_REGISTER=true DEFAULT_APP_ID= DEFAULT_TENANT_ID= DEBUG_CODE_FOR_LOGIN= -NEED_MEMORY_GENERATION_APP_IDS= +PHONE_CODE_LOGIN_TOKEN_EXPIRY_MINUTES=5 # User memory generate task interval in minutes -USER_MEMORY_GENERATE_TASK_INTERVAL=5 - -# Memory generation app id -MEMORY_GENERATION_APP_ID= - -PHONE_CODE_LOGIN_TOKEN_EXPIRY_MINUTES=5 \ No newline at end of file +NEED_USER_PROFILE_GENERATION_APP_IDS= +USER_PROFILE_GENERATE_TASK_INTERVAL=5 +USER_MEMORY_GENERATION_APP_ID= +USER_HEALTH_SUMMARY_GENERATION_APP_ID= diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index 9520a041f2..e08d895472 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -396,10 +396,11 @@ x-shared-env: &shared-api-worker-env DEFAULT_APP_ID: ${DEFAULT_APP_ID:-} DEFAULT_TENANT_ID: ${DEFAULT_TENANT_ID:-} DEBUG_CODE_FOR_LOGIN: ${DEBUG_CODE_FOR_LOGIN:-} - NEED_MEMORY_GENERATION_APP_IDS: ${NEED_MEMORY_GENERATION_APP_IDS:-} - USER_MEMORY_GENERATE_TASK_INTERVAL: ${USER_MEMORY_GENERATE_TASK_INTERVAL:-5} - MEMORY_GENERATION_APP_ID: ${MEMORY_GENERATION_APP_ID:-} PHONE_CODE_LOGIN_TOKEN_EXPIRY_MINUTES: ${PHONE_CODE_LOGIN_TOKEN_EXPIRY_MINUTES:-5} + NEED_USER_PROFILE_GENERATION_APP_IDS: ${NEED_USER_PROFILE_GENERATION_APP_IDS:-} + USER_PROFILE_GENERATE_TASK_INTERVAL: ${USER_PROFILE_GENERATE_TASK_INTERVAL:-5} + USER_MEMORY_GENERATION_APP_ID: ${USER_MEMORY_GENERATION_APP_ID:-} + USER_HEALTH_SUMMARY_GENERATION_APP_ID: ${USER_HEALTH_SUMMARY_GENERATION_APP_ID:-} services: # API service