|
|
|
@ -1,14 +1,13 @@
|
|
|
|
import time
|
|
|
|
import time
|
|
|
|
from datetime import datetime
|
|
|
|
from datetime import datetime
|
|
|
|
|
|
|
|
|
|
|
|
import click
|
|
|
|
|
|
|
|
from sqlalchemy import asc, func, or_
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import app
|
|
|
|
import app
|
|
|
|
|
|
|
|
import click
|
|
|
|
from configs import dify_config
|
|
|
|
from configs import dify_config
|
|
|
|
from core.app.entities.app_invoke_entities import InvokeFrom
|
|
|
|
from core.app.entities.app_invoke_entities import InvokeFrom
|
|
|
|
from models.model import App, EndUser, Message, db
|
|
|
|
from models.model import App, EndUser, Message, db
|
|
|
|
from services.app_generate_service import AppGenerateService
|
|
|
|
from services.app_generate_service import AppGenerateService
|
|
|
|
|
|
|
|
from sqlalchemy import asc, func, or_
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.celery.task(queue="dataset")
|
|
|
|
@app.celery.task(queue="dataset")
|
|
|
|
@ -25,7 +24,9 @@ def user_profile_generate_task():
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
if len(app_ids) == 0:
|
|
|
|
if len(app_ids) == 0:
|
|
|
|
click.echo(click.style("No app_id provided, skipping memory generation.", fg="yellow"))
|
|
|
|
click.echo(
|
|
|
|
|
|
|
|
click.style("No app_id provided, skipping memory generation.", fg="yellow")
|
|
|
|
|
|
|
|
)
|
|
|
|
return
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
for app_id in app_ids:
|
|
|
|
for app_id in app_ids:
|
|
|
|
@ -37,12 +38,15 @@ def user_profile_generate_task():
|
|
|
|
users_to_update = fetch_users_to_update(app_id)
|
|
|
|
users_to_update = fetch_users_to_update(app_id)
|
|
|
|
|
|
|
|
|
|
|
|
if users_to_update is None or len(users_to_update) == 0:
|
|
|
|
if users_to_update is None or len(users_to_update) == 0:
|
|
|
|
click.echo(click.style(f"No users to update. for app_id {app_id}", fg="green"))
|
|
|
|
click.echo(
|
|
|
|
|
|
|
|
click.style(f"No users to update. for app_id {app_id}", fg="green")
|
|
|
|
|
|
|
|
)
|
|
|
|
continue
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
click.echo(
|
|
|
|
click.echo(
|
|
|
|
click.style(
|
|
|
|
click.style(
|
|
|
|
f"Found {len(users_to_update)} users profile and memory updates. in app_id {app_id}", fg="green"
|
|
|
|
f"Found {len(users_to_update)} users profile and memory updates. in app_id {app_id}",
|
|
|
|
|
|
|
|
fg="green",
|
|
|
|
)
|
|
|
|
)
|
|
|
|
)
|
|
|
|
)
|
|
|
|
update_user_profile_for_appid(users_to_update)
|
|
|
|
update_user_profile_for_appid(users_to_update)
|
|
|
|
@ -68,7 +72,9 @@ def update_user_profile_for_appid(users_to_update: list[EndUser]):
|
|
|
|
batch = users_to_update[i : i + batch_size]
|
|
|
|
batch = users_to_update[i : i + batch_size]
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
for user in batch:
|
|
|
|
for user in batch:
|
|
|
|
new_messages, latest_messages_created_at = fetch_new_messages_for_user(user)
|
|
|
|
new_messages, latest_messages_created_at = fetch_new_messages_for_user(
|
|
|
|
|
|
|
|
user
|
|
|
|
|
|
|
|
)
|
|
|
|
process_user_memory(user, new_messages)
|
|
|
|
process_user_memory(user, new_messages)
|
|
|
|
process_user_health_summary(user, new_messages)
|
|
|
|
process_user_health_summary(user, new_messages)
|
|
|
|
user.profile_updated_at = latest_messages_created_at
|
|
|
|
user.profile_updated_at = latest_messages_created_at
|
|
|
|
@ -78,7 +84,11 @@ def update_user_profile_for_appid(users_to_update: list[EndUser]):
|
|
|
|
db.session.commit()
|
|
|
|
db.session.commit()
|
|
|
|
except Exception as e:
|
|
|
|
except Exception as e:
|
|
|
|
user_ids = [user.id for user in batch]
|
|
|
|
user_ids = [user.id for user in batch]
|
|
|
|
click.echo(click.style(f"Error updating memory for user {user_ids}: {str(e)}", fg="red"))
|
|
|
|
click.echo(
|
|
|
|
|
|
|
|
click.style(
|
|
|
|
|
|
|
|
f"Error updating memory for user {user_ids}: {str(e)}", fg="red"
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
)
|
|
|
|
db.session.rollback()
|
|
|
|
db.session.rollback()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -86,11 +96,14 @@ def fetch_users_to_update(app_id: str) -> list[EndUser]:
|
|
|
|
"""Fetch users to update memory for."""
|
|
|
|
"""Fetch users to update memory for."""
|
|
|
|
|
|
|
|
|
|
|
|
latest_message_query = db.session.query(
|
|
|
|
latest_message_query = db.session.query(
|
|
|
|
Message.from_end_user_id, func.max(Message.created_at).label('latest_message_time')
|
|
|
|
Message.from_end_user_id,
|
|
|
|
|
|
|
|
func.max(Message.created_at).label("latest_message_time"),
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
latest_message_query = latest_message_query.filter(Message.app_id == app_id)
|
|
|
|
latest_message_query = latest_message_query.filter(Message.app_id == app_id)
|
|
|
|
latest_message_subquery = latest_message_query.group_by(Message.from_end_user_id).subquery()
|
|
|
|
latest_message_subquery = latest_message_query.group_by(
|
|
|
|
|
|
|
|
Message.from_end_user_id
|
|
|
|
|
|
|
|
).subquery()
|
|
|
|
|
|
|
|
|
|
|
|
# Then join with EndUser to find users who need memory updates
|
|
|
|
# Then join with EndUser to find users who need memory updates
|
|
|
|
users_query = (
|
|
|
|
users_query = (
|
|
|
|
@ -103,7 +116,8 @@ def fetch_users_to_update(app_id: str) -> list[EndUser]:
|
|
|
|
EndUser.app_id == app_id,
|
|
|
|
EndUser.app_id == app_id,
|
|
|
|
or_(
|
|
|
|
or_(
|
|
|
|
EndUser.profile_updated_at.is_(None),
|
|
|
|
EndUser.profile_updated_at.is_(None),
|
|
|
|
EndUser.profile_updated_at < latest_message_subquery.c.latest_message_time,
|
|
|
|
EndUser.profile_updated_at
|
|
|
|
|
|
|
|
< latest_message_subquery.c.latest_message_time,
|
|
|
|
),
|
|
|
|
),
|
|
|
|
)
|
|
|
|
)
|
|
|
|
)
|
|
|
|
)
|
|
|
|
@ -118,10 +132,14 @@ def fetch_users_to_update(app_id: str) -> list[EndUser]:
|
|
|
|
def fetch_new_messages_for_user(user: EndUser) -> tuple[str, datetime]:
|
|
|
|
def fetch_new_messages_for_user(user: EndUser) -> tuple[str, datetime]:
|
|
|
|
"""Fetch new messages for a user."""
|
|
|
|
"""Fetch new messages for a user."""
|
|
|
|
|
|
|
|
|
|
|
|
message_query = db.session.query(Message).filter(Message.from_end_user_id == user.id)
|
|
|
|
message_query = db.session.query(Message).filter(
|
|
|
|
|
|
|
|
Message.from_end_user_id == user.id
|
|
|
|
|
|
|
|
)
|
|
|
|
message_query = message_query.filter(Message.app_id == user.app_id)
|
|
|
|
message_query = message_query.filter(Message.app_id == user.app_id)
|
|
|
|
if user.profile_updated_at:
|
|
|
|
if user.profile_updated_at:
|
|
|
|
message_query = message_query.filter(Message.created_at > user.profile_updated_at)
|
|
|
|
message_query = message_query.filter(
|
|
|
|
|
|
|
|
Message.created_at > user.profile_updated_at
|
|
|
|
|
|
|
|
)
|
|
|
|
new_messages = message_query.order_by(asc(Message.created_at)).all()
|
|
|
|
new_messages = message_query.order_by(asc(Message.created_at)).all()
|
|
|
|
|
|
|
|
|
|
|
|
# Format messages for input - safely handle missing query attributes
|
|
|
|
# Format messages for input - safely handle missing query attributes
|
|
|
|
@ -131,7 +149,9 @@ def fetch_new_messages_for_user(user: EndUser) -> tuple[str, datetime]:
|
|
|
|
|
|
|
|
|
|
|
|
# If no valid messages remain, exit early
|
|
|
|
# If no valid messages remain, exit early
|
|
|
|
if not message_texts:
|
|
|
|
if not message_texts:
|
|
|
|
click.echo(click.style(f"No valid message content for user id {user.id}", fg="yellow"))
|
|
|
|
click.echo(
|
|
|
|
|
|
|
|
click.style(f"No valid message content for user id {user.id}", fg="yellow")
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
return "\n".join(message_texts), new_messages[-1].created_at
|
|
|
|
return "\n".join(message_texts), new_messages[-1].created_at
|
|
|
|
|
|
|
|
|
|
|
|
@ -142,12 +162,22 @@ def process_user_memory(user: EndUser, new_messages: str):
|
|
|
|
|
|
|
|
|
|
|
|
memory_app_id = dify_config.USER_MEMORY_GENERATION_APP_ID
|
|
|
|
memory_app_id = dify_config.USER_MEMORY_GENERATION_APP_ID
|
|
|
|
if memory_app_id == "":
|
|
|
|
if memory_app_id == "":
|
|
|
|
click.echo(click.style("No memory generation app_id provided, skipping memory generation.", fg="yellow"))
|
|
|
|
click.echo(
|
|
|
|
|
|
|
|
click.style(
|
|
|
|
|
|
|
|
"No memory generation app_id provided, skipping memory generation.",
|
|
|
|
|
|
|
|
fg="yellow",
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
)
|
|
|
|
return
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
memory_app_model = db.session.query(App).filter(App.id == memory_app_id).first()
|
|
|
|
memory_app_model = db.session.query(App).filter(App.id == memory_app_id).first()
|
|
|
|
if memory_app_model is None:
|
|
|
|
if memory_app_model is None:
|
|
|
|
click.echo(click.style(f"App not found for memory generation app_id {memory_app_id}", fg="yellow"))
|
|
|
|
click.echo(
|
|
|
|
|
|
|
|
click.style(
|
|
|
|
|
|
|
|
f"App not found for memory generation app_id {memory_app_id}",
|
|
|
|
|
|
|
|
fg="yellow",
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
)
|
|
|
|
return
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
# Set up arguments for memory generation
|
|
|
|
# Set up arguments for memory generation
|
|
|
|
@ -165,7 +195,11 @@ def process_user_memory(user: EndUser, new_messages: str):
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
response = AppGenerateService.generate(
|
|
|
|
response = AppGenerateService.generate(
|
|
|
|
app_model=memory_app_model, user=user, args=args, invoke_from=InvokeFrom.SCHEDULER, streaming=False
|
|
|
|
app_model=memory_app_model,
|
|
|
|
|
|
|
|
user=user,
|
|
|
|
|
|
|
|
args=args,
|
|
|
|
|
|
|
|
invoke_from=InvokeFrom.SCHEDULER,
|
|
|
|
|
|
|
|
streaming=False,
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
click.echo(response)
|
|
|
|
click.echo(response)
|
|
|
|
@ -178,22 +212,38 @@ def process_user_memory(user: EndUser, new_messages: str):
|
|
|
|
|
|
|
|
|
|
|
|
click.echo(click.style(f"Updated memory for user {user.id}", fg="green"))
|
|
|
|
click.echo(click.style(f"Updated memory for user {user.id}", fg="green"))
|
|
|
|
except Exception as e:
|
|
|
|
except Exception as e:
|
|
|
|
click.echo(click.style(f"Failed to update memory for user {user.id}, {str(e)}", fg="yellow"))
|
|
|
|
click.echo(
|
|
|
|
|
|
|
|
click.style(
|
|
|
|
|
|
|
|
f"Failed to update memory for user {user.id}, {str(e)}", fg="yellow"
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def process_user_health_summary(user: EndUser, new_messages: str):
|
|
|
|
def process_user_health_summary(user: EndUser, new_messages: str):
|
|
|
|
"""Process a user to update health status."""
|
|
|
|
"""Process a user to update health status."""
|
|
|
|
click.echo(click.style(f"Updating health summary for user id {user.id}", fg="green"))
|
|
|
|
click.echo(
|
|
|
|
|
|
|
|
click.style(f"Updating health summary for user id {user.id}", fg="green")
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
health_summary_app_id = dify_config.USER_HEALTH_SUMMARY_GENERATION_APP_ID
|
|
|
|
health_summary_app_id = dify_config.USER_HEALTH_SUMMARY_GENERATION_APP_ID
|
|
|
|
if health_summary_app_id == "":
|
|
|
|
if health_summary_app_id == "":
|
|
|
|
click.echo(click.style("No health_summary_app_id provided, skipping health summary generation.", fg="yellow"))
|
|
|
|
click.echo(
|
|
|
|
|
|
|
|
click.style(
|
|
|
|
|
|
|
|
"No health_summary_app_id provided, skipping health summary generation.",
|
|
|
|
|
|
|
|
fg="yellow",
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
)
|
|
|
|
return
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
health_summary_app_model = db.session.query(App).filter(App.id == health_summary_app_id).first()
|
|
|
|
health_summary_app_model = (
|
|
|
|
|
|
|
|
db.session.query(App).filter(App.id == health_summary_app_id).first()
|
|
|
|
|
|
|
|
)
|
|
|
|
if health_summary_app_model is None:
|
|
|
|
if health_summary_app_model is None:
|
|
|
|
click.echo(
|
|
|
|
click.echo(
|
|
|
|
click.style(f"App not found for health summary generation app_id {health_summary_app_id}", fg="yellow")
|
|
|
|
click.style(
|
|
|
|
|
|
|
|
f"App not found for health summary generation app_id {health_summary_app_id}",
|
|
|
|
|
|
|
|
fg="yellow",
|
|
|
|
|
|
|
|
)
|
|
|
|
)
|
|
|
|
)
|
|
|
|
return
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
@ -205,9 +255,9 @@ def process_user_health_summary(user: EndUser, new_messages: str):
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
click.echo(click.style(f"Start to generate health summary for user {user.id}", fg="green"))
|
|
|
|
click.echo(
|
|
|
|
|
|
|
|
click.style(f"Start to generate health summary for user {user.id}", fg="green")
|
|
|
|
try:
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
response = AppGenerateService.generate(
|
|
|
|
response = AppGenerateService.generate(
|
|
|
|
app_model=health_summary_app_model,
|
|
|
|
app_model=health_summary_app_model,
|
|
|
|
@ -241,10 +291,3 @@ def process_user_health_summary(user: EndUser, new_messages: str):
|
|
|
|
user.topics = result["topics"]
|
|
|
|
user.topics = result["topics"]
|
|
|
|
|
|
|
|
|
|
|
|
click.echo(click.style(f"Updated health summary for user {user.id}", fg="green"))
|
|
|
|
click.echo(click.style(f"Updated health summary for user {user.id}", fg="green"))
|
|
|
|
|
|
|
|
|
|
|
|
except json.JSONDecodeError:
|
|
|
|
|
|
|
|
click.echo(click.style(f"Failed to parse health summary for user {user.id}, invalid json format", fg="yellow"))
|
|
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
|
|
click.echo(click.style(f"Failed to update health summary for user {user.id}, {str(e)}", fg="yellow"))
|
|
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|