|
|
|
@ -2,7 +2,6 @@ import logging
|
|
|
|
import time
|
|
|
|
import time
|
|
|
|
from datetime import datetime
|
|
|
|
from datetime import datetime
|
|
|
|
|
|
|
|
|
|
|
|
import click
|
|
|
|
|
|
|
|
from sqlalchemy import asc, func, or_
|
|
|
|
from sqlalchemy import asc, func, or_
|
|
|
|
|
|
|
|
|
|
|
|
import app
|
|
|
|
import app
|
|
|
|
@ -20,7 +19,6 @@ def user_profile_generate_task():
|
|
|
|
"""Generate or update user memory based on recent messages."""
|
|
|
|
"""Generate or update user memory based on recent messages."""
|
|
|
|
|
|
|
|
|
|
|
|
logger.info("Starting user profile generate task.")
|
|
|
|
logger.info("Starting user profile generate task.")
|
|
|
|
click.echo(click.style("Starting user profile generate task.", fg="green"))
|
|
|
|
|
|
|
|
start_at = time.perf_counter()
|
|
|
|
start_at = time.perf_counter()
|
|
|
|
|
|
|
|
|
|
|
|
app_ids = (
|
|
|
|
app_ids = (
|
|
|
|
@ -30,43 +28,29 @@ def user_profile_generate_task():
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
if len(app_ids) == 0:
|
|
|
|
if len(app_ids) == 0:
|
|
|
|
click.echo(
|
|
|
|
logger.info("No app_id provided, skipping user profile generation.")
|
|
|
|
click.style(
|
|
|
|
|
|
|
|
"No app_id provided, skipping user profile generation.", fg="yellow"
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
return
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
for app_id in app_ids:
|
|
|
|
for app_id in app_ids:
|
|
|
|
if len(app_id) == 0:
|
|
|
|
if len(app_id) == 0:
|
|
|
|
click.echo(click.style(f"Invalid app_id: {app_id}", fg="red"))
|
|
|
|
logger.error(f"Invalid app_id: {app_id}")
|
|
|
|
return
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
for app_id in app_ids:
|
|
|
|
for app_id in app_ids:
|
|
|
|
users_to_update = fetch_users_to_update(app_id)
|
|
|
|
users_to_update = fetch_users_to_update(app_id)
|
|
|
|
|
|
|
|
|
|
|
|
if users_to_update is None or len(users_to_update) == 0:
|
|
|
|
if users_to_update is None or len(users_to_update) == 0:
|
|
|
|
click.echo(
|
|
|
|
logger.info(f"No users to update. for app_id {app_id}")
|
|
|
|
click.style(f"No users to update. for app_id {app_id}", fg="green")
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
continue
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
click.echo(
|
|
|
|
logger.info(
|
|
|
|
click.style(
|
|
|
|
f"Found {len(users_to_update)} users profile and memory updates. in app_id {app_id}"
|
|
|
|
f"Found {len(users_to_update)} users profile and memory updates. in app_id {app_id}",
|
|
|
|
|
|
|
|
fg="green",
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
)
|
|
|
|
)
|
|
|
|
update_user_profile_for_appid(users_to_update)
|
|
|
|
update_user_profile_for_appid(users_to_update)
|
|
|
|
|
|
|
|
|
|
|
|
end_at = time.perf_counter()
|
|
|
|
end_at = time.perf_counter()
|
|
|
|
|
|
|
|
|
|
|
|
click.echo(
|
|
|
|
logger.info(f"Finished user profile generate task. Latency: {end_at - start_at}")
|
|
|
|
click.style(
|
|
|
|
|
|
|
|
f"Updated profile and memory for app_id {app_ids} users. Latency: {end_at - start_at}",
|
|
|
|
|
|
|
|
fg="green",
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def update_user_profile_for_appid(users_to_update: list[EndUser]):
|
|
|
|
def update_user_profile_for_appid(users_to_update: list[EndUser]):
|
|
|
|
@ -91,13 +75,9 @@ def update_user_profile_for_appid(users_to_update: list[EndUser]):
|
|
|
|
# Commit after each batch
|
|
|
|
# Commit after each batch
|
|
|
|
db.session.commit()
|
|
|
|
db.session.commit()
|
|
|
|
except Exception as e:
|
|
|
|
except Exception as e:
|
|
|
|
user_ids = [user.id for user in batch]
|
|
|
|
|
|
|
|
click.echo(
|
|
|
|
|
|
|
|
click.style(
|
|
|
|
|
|
|
|
f"Error updating memory for user {user_ids}: {str(e)}", fg="red"
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
db.session.rollback()
|
|
|
|
db.session.rollback()
|
|
|
|
|
|
|
|
user_ids = [user.id for user in batch]
|
|
|
|
|
|
|
|
logger.exception(f"Error updating user profile for user {user_ids}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def fetch_users_to_update(app_id: str) -> list[EndUser]:
|
|
|
|
def fetch_users_to_update(app_id: str) -> list[EndUser]:
|
|
|
|
@ -157,35 +137,25 @@ def fetch_new_messages_for_user(user: EndUser) -> tuple[str, datetime]:
|
|
|
|
|
|
|
|
|
|
|
|
# If no valid messages remain, exit early
|
|
|
|
# If no valid messages remain, exit early
|
|
|
|
if not message_texts:
|
|
|
|
if not message_texts:
|
|
|
|
click.echo(
|
|
|
|
logger.warning(f"No valid message content for user id {user.id}")
|
|
|
|
click.style(f"No valid message content for user id {user.id}", fg="yellow")
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return "\n".join(message_texts), new_messages[-1].created_at
|
|
|
|
return "\n".join(message_texts), new_messages[-1].created_at
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def process_user_memory(user: EndUser, new_messages: str):
|
|
|
|
def process_user_memory(user: EndUser, new_messages: str):
|
|
|
|
"""Process a user to update memory."""
|
|
|
|
"""Process a user to update memory."""
|
|
|
|
click.echo(click.style(f"Updating memory for user id {user.id}", fg="green"))
|
|
|
|
logger.info(f"Updating memory for user id {user.id}")
|
|
|
|
|
|
|
|
|
|
|
|
memory_app_id = dify_config.USER_MEMORY_GENERATION_APP_ID
|
|
|
|
memory_app_id = dify_config.USER_MEMORY_GENERATION_APP_ID
|
|
|
|
if memory_app_id == "":
|
|
|
|
if memory_app_id == "":
|
|
|
|
click.echo(
|
|
|
|
logger.warning(
|
|
|
|
click.style(
|
|
|
|
"No memory generation app_id provided, skipping memory generation."
|
|
|
|
"No memory generation app_id provided, skipping memory generation.",
|
|
|
|
|
|
|
|
fg="yellow",
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
)
|
|
|
|
)
|
|
|
|
return
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
memory_app_model = db.session.query(App).filter(App.id == memory_app_id).first()
|
|
|
|
memory_app_model = db.session.query(App).filter(App.id == memory_app_id).first()
|
|
|
|
if memory_app_model is None:
|
|
|
|
if memory_app_model is None:
|
|
|
|
click.echo(
|
|
|
|
logger.error(f"App not found for memory generation app_id {memory_app_id}")
|
|
|
|
click.style(
|
|
|
|
|
|
|
|
f"App not found for memory generation app_id {memory_app_id}",
|
|
|
|
|
|
|
|
fg="yellow",
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
return
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
# Set up arguments for memory generation
|
|
|
|
# Set up arguments for memory generation
|
|
|
|
@ -198,9 +168,6 @@ def process_user_memory(user: EndUser, new_messages: str):
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
# Call the memory generation service
|
|
|
|
|
|
|
|
click.echo(click.style(f"Start to generate memory for user {user.id}", fg="green"))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
response = AppGenerateService.generate(
|
|
|
|
response = AppGenerateService.generate(
|
|
|
|
app_model=memory_app_model,
|
|
|
|
app_model=memory_app_model,
|
|
|
|
user=user,
|
|
|
|
user=user,
|
|
|
|
@ -209,7 +176,7 @@ def process_user_memory(user: EndUser, new_messages: str):
|
|
|
|
streaming=False,
|
|
|
|
streaming=False,
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
click.echo(response)
|
|
|
|
logger.info(f"Generated memory raw response for user {user.id}: {response}")
|
|
|
|
|
|
|
|
|
|
|
|
if not isinstance(response, dict):
|
|
|
|
if not isinstance(response, dict):
|
|
|
|
return
|
|
|
|
return
|
|
|
|
@ -217,22 +184,17 @@ def process_user_memory(user: EndUser, new_messages: str):
|
|
|
|
result = response["data"]["outputs"]["result"]
|
|
|
|
result = response["data"]["outputs"]["result"]
|
|
|
|
user.memory = result
|
|
|
|
user.memory = result
|
|
|
|
|
|
|
|
|
|
|
|
click.echo(click.style(f"Updated memory for user {user.id}", fg="green"))
|
|
|
|
logger.info(f"Successfully updated memory for user {user.id}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def process_user_health_summary(user: EndUser, new_messages: str):
|
|
|
|
def process_user_health_summary(user: EndUser, new_messages: str):
|
|
|
|
"""Process a user to update health status."""
|
|
|
|
"""Process a user to update health status."""
|
|
|
|
click.echo(
|
|
|
|
logger.info(f"Updating health summary for user id {user.id}")
|
|
|
|
click.style(f"Updating health summary for user id {user.id}", fg="green")
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
health_summary_app_id = dify_config.USER_HEALTH_SUMMARY_GENERATION_APP_ID
|
|
|
|
health_summary_app_id = dify_config.USER_HEALTH_SUMMARY_GENERATION_APP_ID
|
|
|
|
if health_summary_app_id == "":
|
|
|
|
if health_summary_app_id == "":
|
|
|
|
click.echo(
|
|
|
|
logger.warning(
|
|
|
|
click.style(
|
|
|
|
"No health summary app_id provided, skipping health summary generation."
|
|
|
|
"No health_summary_app_id provided, skipping health summary generation.",
|
|
|
|
|
|
|
|
fg="yellow",
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
)
|
|
|
|
)
|
|
|
|
return
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
@ -240,11 +202,8 @@ def process_user_health_summary(user: EndUser, new_messages: str):
|
|
|
|
db.session.query(App).filter(App.id == health_summary_app_id).first()
|
|
|
|
db.session.query(App).filter(App.id == health_summary_app_id).first()
|
|
|
|
)
|
|
|
|
)
|
|
|
|
if health_summary_app_model is None:
|
|
|
|
if health_summary_app_model is None:
|
|
|
|
click.echo(
|
|
|
|
logger.error(
|
|
|
|
click.style(
|
|
|
|
f"App not found for health summary generation app_id {health_summary_app_id}"
|
|
|
|
f"App not found for health summary generation app_id {health_summary_app_id}",
|
|
|
|
|
|
|
|
fg="yellow",
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
)
|
|
|
|
)
|
|
|
|
return
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
@ -256,9 +215,7 @@ def process_user_health_summary(user: EndUser, new_messages: str):
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
click.echo(
|
|
|
|
logger.info(f"Start to generate health summary for user {user.id}")
|
|
|
|
click.style(f"Start to generate health summary for user {user.id}", fg="green")
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
response = AppGenerateService.generate(
|
|
|
|
response = AppGenerateService.generate(
|
|
|
|
app_model=health_summary_app_model,
|
|
|
|
app_model=health_summary_app_model,
|
|
|
|
@ -268,7 +225,7 @@ def process_user_health_summary(user: EndUser, new_messages: str):
|
|
|
|
streaming=False,
|
|
|
|
streaming=False,
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
click.echo(response)
|
|
|
|
logger.info(f"Generated health summary raw response for user {user.id}: {response}")
|
|
|
|
|
|
|
|
|
|
|
|
if not isinstance(response, dict):
|
|
|
|
if not isinstance(response, dict):
|
|
|
|
return
|
|
|
|
return
|
|
|
|
@ -283,8 +240,6 @@ def process_user_health_summary(user: EndUser, new_messages: str):
|
|
|
|
|
|
|
|
|
|
|
|
result = json.loads(result)
|
|
|
|
result = json.loads(result)
|
|
|
|
|
|
|
|
|
|
|
|
click.echo("parsed result: " + result)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if "health_status" in result:
|
|
|
|
if "health_status" in result:
|
|
|
|
user.health_status = result["health_status"]
|
|
|
|
user.health_status = result["health_status"]
|
|
|
|
|
|
|
|
|
|
|
|
@ -294,4 +249,4 @@ def process_user_health_summary(user: EndUser, new_messages: str):
|
|
|
|
if "topics" in result and isinstance(result["topics"], list):
|
|
|
|
if "topics" in result and isinstance(result["topics"], list):
|
|
|
|
user.topics = result["topics"]
|
|
|
|
user.topics = result["topics"]
|
|
|
|
|
|
|
|
|
|
|
|
click.echo(click.style(f"Updated health summary for user {user.id}", fg="green"))
|
|
|
|
logger.info(f"Successfully updated health summary for user {user.id}")
|
|
|
|
|