add basic user profile generation

pull/21891/head
ytqh 1 year ago
parent 9fd25f37dc
commit d7efd1511c

@ -433,4 +433,4 @@ MAX_SUBMIT_COUNT=100
LOGIN_LOCKOUT_DURATION=86400
# User memory generate task interval in minutes
USER_MEMORY_GENERATE_TASK_INTERVAL=5
USER_PROFILE_GENERATE_TASK_INTERVAL=5

@ -695,7 +695,7 @@ class CeleryBeatConfig(BaseSettings):
default=1,
)
USER_MEMORY_GENERATE_TASK_INTERVAL: int = Field(
USER_PROFILE_GENERATE_TASK_INTERVAL: int = Field(
description="Interval in seconds for user memory generate task execution, default to 5 minutes",
default=5,
)

@ -12,7 +12,7 @@ class SchoolConfig(BaseSettings):
default="",
)
NEED_MEMORY_GENERATION_APP_IDS: str = Field(
NEED_USER_PROFILE_GENERATION_APP_IDS: str = Field(
description="Development app ids for school-level features.",
default="",
)
@ -22,7 +22,12 @@ class SchoolConfig(BaseSettings):
default="",
)
MEMORY_GENERATION_APP_ID: str = Field(
USER_MEMORY_GENERATION_APP_ID: str = Field(
description="App id for memory generation.",
default="",
)
USER_HEALTH_SUMMARY_GENERATION_APP_ID: str = Field(
description="App id for health summary generation.",
default="",
)

@ -69,7 +69,7 @@ def init_app(app: DifyApp) -> Celery:
"schedule.update_tidb_serverless_status_task",
# "schedule.clean_messages",
# "schedule.mail_clean_document_notify_task",
"schedule.user_memory_generate_task",
"schedule.user_profile_generate_task",
]
day = dify_config.CELERY_BEAT_SCHEDULER_TIME
beat_schedule = {
@ -98,9 +98,9 @@ def init_app(app: DifyApp) -> Celery:
# "task": "schedule.mail_clean_document_notify_task.mail_clean_document_notify_task",
# "schedule": crontab(minute="0", hour="10", day_of_week="1"),
# },
"user_memory_generate_task": {
"task": "schedule.user_memory_generate_task.user_memory_generate_task",
"schedule": timedelta(minutes=dify_config.USER_MEMORY_GENERATE_TASK_INTERVAL),
"user_profile_generate_task": {
"task": "schedule.user_profile_generate_task.user_profile_generate_task",
"schedule": timedelta(minutes=dify_config.USER_PROFILE_GENERATE_TASK_INTERVAL),
},
}
celery_app.conf.update(beat_schedule=beat_schedule, imports=imports)

@ -1366,25 +1366,37 @@ class EndUser(UserMixin, db.Model): # type: ignore[name-defined]
return None
return self.extra_profile.get("major")
def update_summary(self, summary: str):
@memory.setter
def memory(self, value: str):
self.extra_profile = {
"summary": summary,
"summary": self.summary,
"topics": self.topics,
"major": self.major,
"memory": value,
}
@summary.setter
def summary(self, value: str):
self.extra_profile = {
"summary": value,
"topics": self.topics,
"major": self.major,
}
def update_topics(self, topics: list[str]):
@topics.setter
def topics(self, value: list[str]):
self.extra_profile = {
"summary": self.summary,
"topics": topics,
"topics": value,
"major": self.major,
}
def update_major(self, major: str):
@major.setter
def major(self, value: str):
self.extra_profile = {
"summary": self.summary,
"topics": self.topics,
"major": major,
"major": value,
}

@ -13,20 +13,15 @@ from sqlalchemy import and_, asc, desc, func, or_
@app.celery.task(queue="dataset")
def user_memory_generate_task():
def user_profile_generate_task():
"""Generate or update user memory based on recent messages."""
click.echo(click.style("Starting user memory generate task.", fg="green"))
start_at = time.perf_counter()
memory_app_id = dify_config.MEMORY_GENERATION_APP_ID
if memory_app_id == "":
click.echo(click.style("No memory generation app_id provided, skipping memory generation.", fg="yellow"))
return
app_ids = (
dify_config.NEED_MEMORY_GENERATION_APP_IDS.split(",")
if dify_config.NEED_MEMORY_GENERATION_APP_IDS
dify_config.NEED_USER_PROFILE_GENERATION_APP_IDS.split(",")
if dify_config.NEED_USER_PROFILE_GENERATION_APP_IDS
else [dify_config.DEFAULT_APP_ID]
)
@ -40,9 +35,11 @@ def user_memory_generate_task():
return
for app_id in app_ids:
update_memory_for_appid(app_id, memory_app_id)
users_to_update = fetch_users_to_update(app_id)
update_user_profile_for_appid(users_to_update)
end_at = time.perf_counter()
click.echo(
click.style(
f"Updated memory for app_id {app_ids} users memory. Latency: {end_at - start_at}",
@ -51,17 +48,14 @@ def user_memory_generate_task():
)
def update_memory_for_appid(app_id: str, memory_app_id: str):
def update_user_profile_for_appid(users_to_update: list[EndUser]):
"""Update memory for a given app_id."""
users_to_update = fetch_users_to_update(app_id)
if users_to_update is None or len(users_to_update) == 0:
click.echo(click.style("No users to update.", fg="green"))
return
click.echo(
click.style(f"Found {len(users_to_update)} users who need memory updates. for app_id {app_id}", fg="green")
)
click.echo(click.style(f"Found {len(users_to_update)} users who need memory updates.", fg="green"))
updated_users_count = 0
batch_size = 10
@ -71,7 +65,10 @@ def update_memory_for_appid(app_id: str, memory_app_id: str):
batch = users_to_update[i : i + batch_size]
try:
for user in batch:
process_user(user, memory_app_id, app_id)
new_messages, latest_messages_created_at = fetch_new_messages_for_user(user)
process_user_memory(user, new_messages)
process_user_health_summary(user, new_messages)
user.profile_updated_at = latest_messages_created_at
updated_users_count += 1
# Commit after each batch
@ -82,7 +79,7 @@ def update_memory_for_appid(app_id: str, memory_app_id: str):
db.session.rollback()
def fetch_users_to_update(app_id: str):
def fetch_users_to_update(app_id: str) -> list[EndUser]:
"""Fetch users to update memory for."""
latest_message_query = db.session.query(
@ -115,56 +112,51 @@ def fetch_users_to_update(app_id: str):
return users
def process_user(user: EndUser, memory_app_id: str, app_id: str):
"""Process a user to update memory."""
click.echo(click.style(f"Updating memory for user id {user.id}", fg="green"))
def fetch_new_messages_for_user(user: EndUser) -> tuple[str, datetime]:
"""Fetch new messages for a user."""
memory_app_model = App.query.filter(App.id == memory_app_id).first()
if memory_app_model is None:
click.echo(click.style(f"App not found for memory generation app_id {memory_app_id}", fg="yellow"))
return
# Get the latest messages efficiently
message_query = db.session.query(Message).filter(Message.from_end_user_id == user.id)
# Filter messages by app_id to ensure consistency
message_query = message_query.filter(Message.app_id == app_id)
# Only include messages created after the last memory update
if user.memory_updated_at:
message_query = message_query.filter(Message.created_at > user.memory_updated_at)
latest_messages = message_query.order_by(asc(Message.created_at)).all()
# Skip if no messages found (unlikely due to our query, but just to be safe)
if not latest_messages:
click.echo(click.style(f"No messages found for user id {user.id}", fg="yellow"))
return
click.echo(click.style(f"Found {len(latest_messages)} messages for user id {user.id}", fg="green"))
message_query = message_query.filter(Message.app_id == user.app_id)
if user.profile_updated_at:
message_query = message_query.filter(Message.created_at > user.profile_updated_at)
new_messages = message_query.order_by(asc(Message.created_at)).all()
# Format messages for input - safely handle missing query attributes
message_texts = []
for msg in latest_messages:
for msg in new_messages:
message_texts.append(f"user: {msg.query}\nassistant: {msg.answer}\n")
# If no valid messages remain, exit early
if not message_texts:
click.echo(click.style(f"No valid message content for user id {user.id}", fg="yellow"))
return "\n".join(message_texts), new_messages[-1].created_at
def process_user_memory(user: EndUser, new_messages: str):
"""Process a user to update memory."""
click.echo(click.style(f"Updating memory for user id {user.id}", fg="green"))
memory_app_id = dify_config.USER_MEMORY_GENERATION_APP_ID
if memory_app_id == "":
click.echo(click.style("No memory generation app_id provided, skipping memory generation.", fg="yellow"))
return
formatted_messages = "\n".join(message_texts)
memory_app_model = App.query.filter(App.id == memory_app_id).first()
if memory_app_model is None:
click.echo(click.style(f"App not found for memory generation app_id {memory_app_id}", fg="yellow"))
return
# Set up arguments for memory generation
args = {
"inputs": {
"new_messages": formatted_messages,
"new_messages": new_messages,
"current_memory": user.memory or "",
}
}
# Call the memory generation service
click.echo(click.style(f"Start to generate memory for user {user.id} in app {app_id}", fg="green"))
click.echo(click.style(f"Start to generate memory for user {user.id}", fg="green"))
response = AppGenerateService.generate(
app_model=memory_app_model, user=user, args=args, invoke_from=InvokeFrom.SCHEDULER, streaming=False
)
@ -178,7 +170,23 @@ def process_user(user: EndUser, memory_app_id: str, app_id: str):
and "result" in response["data"]["outputs"]
):
user.memory = response["data"]["outputs"]["result"]
user.memory_updated_at = latest_messages[-1].created_at
click.echo(click.style(f"Updated memory for user {user.id} in app {app_id}", fg="green"))
click.echo(click.style(f"Updated memory for user {user.id}", fg="green"))
else:
click.echo(click.style(f"Failed to update memory for user {user.id}, invalid response format", fg="yellow"))
def process_user_health_summary(user: EndUser, new_messages: str):
"""Process a user to update health status."""
click.echo(click.style(f"Updating health status for user id {user.id}", fg="green"))
health_summary_app_id = dify_config.USER_HEALTH_SUMMARY_GENERATION_APP_ID
if health_summary_app_id == "":
click.echo(
click.style(
"No health_summary_app_id generation app_id provided, skipping health summary generation.", fg="yellow"
)
)
return
# TODO: Implement health status generation
pass

@ -943,12 +943,10 @@ ALLOW_REGISTER=true
DEFAULT_APP_ID=
DEFAULT_TENANT_ID=
DEBUG_CODE_FOR_LOGIN=
NEED_MEMORY_GENERATION_APP_IDS=
PHONE_CODE_LOGIN_TOKEN_EXPIRY_MINUTES=5
# User memory generate task interval in minutes
USER_MEMORY_GENERATE_TASK_INTERVAL=5
# Memory generation app id
MEMORY_GENERATION_APP_ID=
PHONE_CODE_LOGIN_TOKEN_EXPIRY_MINUTES=5
NEED_USER_PROFILE_GENERATION_APP_IDS=
USER_PROFILE_GENERATE_TASK_INTERVAL=5
USER_MEMORY_GENERATION_APP_ID=
USER_HEALTH_SUMMARY_GENERATION_APP_ID=

@ -396,10 +396,11 @@ x-shared-env: &shared-api-worker-env
DEFAULT_APP_ID: ${DEFAULT_APP_ID:-}
DEFAULT_TENANT_ID: ${DEFAULT_TENANT_ID:-}
DEBUG_CODE_FOR_LOGIN: ${DEBUG_CODE_FOR_LOGIN:-}
NEED_MEMORY_GENERATION_APP_IDS: ${NEED_MEMORY_GENERATION_APP_IDS:-}
USER_MEMORY_GENERATE_TASK_INTERVAL: ${USER_MEMORY_GENERATE_TASK_INTERVAL:-5}
MEMORY_GENERATION_APP_ID: ${MEMORY_GENERATION_APP_ID:-}
PHONE_CODE_LOGIN_TOKEN_EXPIRY_MINUTES: ${PHONE_CODE_LOGIN_TOKEN_EXPIRY_MINUTES:-5}
NEED_USER_PROFILE_GENERATION_APP_IDS: ${NEED_USER_PROFILE_GENERATION_APP_IDS:-}
USER_PROFILE_GENERATE_TASK_INTERVAL: ${USER_PROFILE_GENERATE_TASK_INTERVAL:-5}
USER_MEMORY_GENERATION_APP_ID: ${USER_MEMORY_GENERATION_APP_ID:-}
USER_HEALTH_SUMMARY_GENERATION_APP_ID: ${USER_HEALTH_SUMMARY_GENERATION_APP_ID:-}
services:
# API service

Loading…
Cancel
Save