|
|
|
@ -37,13 +37,23 @@ def user_profile_generate_task():
|
|
|
|
|
|
|
|
|
|
|
|
for app_id in app_ids:
|
|
|
|
for app_id in app_ids:
|
|
|
|
users_to_update = fetch_users_to_update(app_id)
|
|
|
|
users_to_update = fetch_users_to_update(app_id)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if users_to_update is None or len(users_to_update) == 0:
|
|
|
|
|
|
|
|
click.echo(click.style(f"No users to update. for app_id {app_id}", fg="green"))
|
|
|
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
click.echo(
|
|
|
|
|
|
|
|
click.style(
|
|
|
|
|
|
|
|
f"Found {len(users_to_update)} users profile and memory updates. in app_id {app_id}", fg="green"
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
)
|
|
|
|
update_user_profile_for_appid(users_to_update)
|
|
|
|
update_user_profile_for_appid(users_to_update)
|
|
|
|
|
|
|
|
|
|
|
|
end_at = time.perf_counter()
|
|
|
|
end_at = time.perf_counter()
|
|
|
|
|
|
|
|
|
|
|
|
click.echo(
|
|
|
|
click.echo(
|
|
|
|
click.style(
|
|
|
|
click.style(
|
|
|
|
f"Updated memory for app_id {app_ids} users memory. Latency: {end_at - start_at}",
|
|
|
|
f"Updated profile and memory for app_id {app_ids} users. Latency: {end_at - start_at}",
|
|
|
|
fg="green",
|
|
|
|
fg="green",
|
|
|
|
)
|
|
|
|
)
|
|
|
|
)
|
|
|
|
)
|
|
|
|
@ -52,12 +62,6 @@ def user_profile_generate_task():
|
|
|
|
def update_user_profile_for_appid(users_to_update: list[EndUser]):
|
|
|
|
def update_user_profile_for_appid(users_to_update: list[EndUser]):
|
|
|
|
"""Update memory for a given app_id."""
|
|
|
|
"""Update memory for a given app_id."""
|
|
|
|
|
|
|
|
|
|
|
|
if users_to_update is None or len(users_to_update) == 0:
|
|
|
|
|
|
|
|
click.echo(click.style("No users to update.", fg="green"))
|
|
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
click.echo(click.style(f"Found {len(users_to_update)} users who need memory updates.", fg="green"))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
updated_users_count = 0
|
|
|
|
updated_users_count = 0
|
|
|
|
batch_size = 10
|
|
|
|
batch_size = 10
|
|
|
|
|
|
|
|
|
|
|
|
@ -153,35 +157,24 @@ def process_user_memory(user: EndUser, new_messages: str):
|
|
|
|
"inputs": {
|
|
|
|
"inputs": {
|
|
|
|
"new_messages": new_messages,
|
|
|
|
"new_messages": new_messages,
|
|
|
|
"current_memory": user.memory or "",
|
|
|
|
"current_memory": user.memory or "",
|
|
|
|
|
|
|
|
"name": user.name,
|
|
|
|
|
|
|
|
"profile": user.profile,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
# Call the memory generation service
|
|
|
|
# Call the memory generation service
|
|
|
|
click.echo(click.style(f"Start to generate memory for user {user.id}", fg="green"))
|
|
|
|
click.echo(click.style(f"Start to generate memory for user {user.id}", fg="green"))
|
|
|
|
response = AppGenerateService.generate(
|
|
|
|
|
|
|
|
app_model=memory_app_model, user=user, args=args, invoke_from=InvokeFrom.SCHEDULER, streaming=False
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Save the updated memory to the user
|
|
|
|
|
|
|
|
if (
|
|
|
|
|
|
|
|
response
|
|
|
|
|
|
|
|
and isinstance(response, dict)
|
|
|
|
|
|
|
|
and "data" in response
|
|
|
|
|
|
|
|
and "outputs" in response["data"]
|
|
|
|
|
|
|
|
and "result" in response["data"]["outputs"]
|
|
|
|
|
|
|
|
):
|
|
|
|
|
|
|
|
user.memory = response["data"]["outputs"]["result"]
|
|
|
|
|
|
|
|
click.echo(click.style(f"Updated memory for user {user.id}", fg="green"))
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
|
|
click.echo(click.style(f"Failed to update memory for user {user.id}, invalid response format", fg="yellow"))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
|
|
|
|
response = AppGenerateService.generate(
|
|
|
|
|
|
|
|
app_model=memory_app_model, user=user, args=args, invoke_from=InvokeFrom.SCHEDULER, streaming=False
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
click.echo(response)
|
|
|
|
|
|
|
|
|
|
|
|
if not isinstance(response, dict):
|
|
|
|
if not isinstance(response, dict):
|
|
|
|
return
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
import json
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
result = response["data"]["outputs"]["result"]
|
|
|
|
result = response["data"]["outputs"]["result"]
|
|
|
|
user.memory = result
|
|
|
|
user.memory = result
|
|
|
|
|
|
|
|
|
|
|
|
@ -209,18 +202,25 @@ def process_user_health_summary(user: EndUser, new_messages: str):
|
|
|
|
args = {
|
|
|
|
args = {
|
|
|
|
"inputs": {
|
|
|
|
"inputs": {
|
|
|
|
"name": user.name,
|
|
|
|
"name": user.name,
|
|
|
|
"profile": user.extra_profile,
|
|
|
|
"profile": user.profile,
|
|
|
|
"new_messages": new_messages,
|
|
|
|
"new_messages": new_messages,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
click.echo(click.style(f"Start to generate health summary for user {user.id}", fg="green"))
|
|
|
|
click.echo(click.style(f"Start to generate health summary for user {user.id}", fg="green"))
|
|
|
|
response = AppGenerateService.generate(
|
|
|
|
|
|
|
|
app_model=health_summary_app_model, user=user, args=args, invoke_from=InvokeFrom.SCHEDULER, streaming=False
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
response = AppGenerateService.generate(
|
|
|
|
|
|
|
|
app_model=health_summary_app_model,
|
|
|
|
|
|
|
|
user=user,
|
|
|
|
|
|
|
|
args=args,
|
|
|
|
|
|
|
|
invoke_from=InvokeFrom.SCHEDULER,
|
|
|
|
|
|
|
|
streaming=False,
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
click.echo(response)
|
|
|
|
|
|
|
|
|
|
|
|
if not isinstance(response, dict):
|
|
|
|
if not isinstance(response, dict):
|
|
|
|
return
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
|