pull/21891/head
ytqh 1 year ago
parent cb00f43e98
commit 32601f34b5

@ -1,10 +1,17 @@
from datetime import datetime
from controllers.admin import api
from flask import Blueprint
from controllers.admin.wraps import validate_admin_token_and_extract_info
from flask import Blueprint, request
from flask_restful import Api, Resource # type: ignore
from models.model import Account, App
from services.stats_service import StatsService
from werkzeug.exceptions import BadRequest
class RiskStats(Resource):
def get(self):
@validate_admin_token_and_extract_info
def get(self, app_model: App, account: Account):
"""Get risk level statistics.
---
tags:
@ -35,6 +42,10 @@ class RiskStats(Resource):
high_risk_count:
type: integer
description: Current number of high risk users
high_risk_percentage:
type: number
format: float
description: Percentage of high risk users
daily_changes:
type: object
properties:
@ -47,11 +58,38 @@ class RiskStats(Resource):
400:
description: Invalid date parameters
"""
pass
try:
# Parse date parameters
start_date_str = request.args.get('start_date')
end_date_str = request.args.get('end_date')
if not start_date_str or not end_date_str:
raise BadRequest("start_date and end_date are required")
try:
end_date = datetime.strptime(end_date_str, '%Y-%m-%d')
start_date = datetime.strptime(start_date_str, '%Y-%m-%d')
except ValueError:
raise BadRequest("Invalid date format. Use YYYY-MM-DD")
# Get risk statistics from service
risk_stats = StatsService.get_risk_stats(
start_date=start_date,
end_date=end_date,
app_id=app_model.id,
organization_id=account.current_organization_id,
)
return risk_stats
except BadRequest as e:
return {"error": str(e)}, 400
except Exception as e:
return {"error": "An error occurred while processing the request"}, 500
class UserStats(Resource):
def get(self):
@validate_admin_token_and_extract_info
def get(self, app_model: App, account: Account):
"""Get daily user statistics.
---
tags:
@ -96,11 +134,39 @@ class UserStats(Resource):
400:
description: Invalid date parameters
"""
pass
try:
# Parse date parameters
start_date_str = request.args.get('start_date')
end_date_str = request.args.get('end_date')
if not start_date_str or not end_date_str:
raise BadRequest("start_date and end_date are required")
try:
start_date = datetime.strptime(start_date_str, '%Y-%m-%d')
end_date = datetime.strptime(end_date_str, '%Y-%m-%d')
end_date = end_date.replace(hour=23, minute=59, second=59)
except ValueError:
raise BadRequest("Invalid date format. Use YYYY-MM-DD")
# Get user statistics from service
user_stats = StatsService.get_user_stats(
start_date=start_date,
end_date=end_date,
app_id=app_model.id,
organization_id=account.current_organization_id,
)
return user_stats
except BadRequest as e:
return {"error": str(e)}, 400
except Exception as e:
return {"error": "An error occurred while processing the request"}, 500
class ConversationStats(Resource):
def get(self):
@validate_admin_token_and_extract_info
def get(self, app_model: App, account: Account):
"""Get daily conversation statistics.
---
tags:
@ -146,7 +212,34 @@ class ConversationStats(Resource):
400:
description: Invalid date parameters
"""
pass
try:
# Parse date parameters
start_date_str = request.args.get('start_date')
end_date_str = request.args.get('end_date')
if not start_date_str or not end_date_str:
raise BadRequest("start_date and end_date are required")
try:
start_date = datetime.strptime(start_date_str, '%Y-%m-%d')
end_date = datetime.strptime(end_date_str, '%Y-%m-%d')
end_date = end_date.replace(hour=23, minute=59, second=59)
except ValueError:
raise BadRequest("Invalid date format. Use YYYY-MM-DD")
# Get conversation statistics from service
conversation_stats = StatsService.get_conversation_stats(
start_date=start_date,
end_date=end_date,
app_id=app_model.id,
organization_id=account.current_organization_id,
)
return conversation_stats
except BadRequest as e:
return {"error": str(e)}, 400
except Exception as e:
return {"error": "An error occurred while processing the request"}, 500
api.add_resource(RiskStats, '/stats/risk')

@ -0,0 +1,222 @@
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from extensions.ext_database import db
from models import Conversation, EndUser, Message
from sqlalchemy import and_, distinct, func
class StatsService:
@staticmethod
def get_risk_stats(
start_date: datetime, end_date: datetime, app_id: Optional[str] = None, organization_id: Optional[str] = None
) -> Dict[str, Any]:
"""
Get statistics about high risk users
Args:
start_date: The start date for the statistics
end_date: The end date for the statistics
app_id: Optional app ID to filter users by
organization_id: Optional organization ID to filter users by
Returns:
Dictionary containing high risk user count and changes
"""
# Build query with filters
high_risk_query = db.session.query(EndUser).filter(
EndUser.health_status == 'critical', EndUser.updated_at >= start_date, EndUser.updated_at <= end_date
)
total_query = db.session.query(EndUser).filter(EndUser.updated_at >= start_date, EndUser.updated_at <= end_date)
# Apply app_id filter if provided
if app_id:
high_risk_query = high_risk_query.filter(EndUser.app_id == app_id)
total_query = total_query.filter(EndUser.app_id == app_id)
# Apply organization_id filter if provided
if organization_id:
high_risk_query = high_risk_query.filter(EndUser.organization_id == organization_id)
total_query = total_query.filter(EndUser.organization_id == organization_id)
high_risk_count = high_risk_query.count()
total_count = total_query.count()
# Get yesterday's count
yesterday = end_date - timedelta(days=1)
yesterday_query = db.session.query(EndUser).filter(
EndUser.health_status == 'critical', EndUser.updated_at <= yesterday
)
# Apply app_id filter if provided
if app_id:
yesterday_query = yesterday_query.filter(EndUser.app_id == app_id)
# Apply organization_id filter if provided
if organization_id:
yesterday_query = yesterday_query.filter(EndUser.organization_id == organization_id)
yesterday_high_risk_count = yesterday_query.count()
# Get last week's count
last_week = end_date - timedelta(days=7)
last_week_query = db.session.query(EndUser).filter(
EndUser.health_status == 'critical', EndUser.updated_at <= last_week
)
# Apply app_id filter if provided
if app_id:
last_week_query = last_week_query.filter(EndUser.app_id == app_id)
# Apply organization_id filter if provided
if organization_id:
last_week_query = last_week_query.filter(EndUser.organization_id == organization_id)
last_week_high_risk_count = last_week_query.count()
# Calculate changes
from_yesterday = high_risk_count - yesterday_high_risk_count
from_last_week = high_risk_count - last_week_high_risk_count
return {
"high_risk_count": high_risk_count,
"high_risk_percentage": round(high_risk_count / total_count, 2),
"daily_changes": {"from_yesterday": from_yesterday, "from_last_week": from_last_week},
}
@staticmethod
def get_user_stats(
start_date: datetime, end_date: datetime, app_id: Optional[str] = None, organization_id: Optional[str] = None
) -> Dict[str, List[Dict[str, Any]]]:
"""
Get user statistics for a date range
Args:
start_date: The start date for the statistics
end_date: The end date for the statistics
app_id: Optional app ID to filter users by
organization_id: Optional organization ID to filter users by
Returns:
Dictionary containing daily user statistics
"""
# Calculate date range
date_range = []
current_date = start_date
while current_date <= end_date:
date_range.append(current_date.strftime('%Y-%m-%d'))
current_date += timedelta(days=1)
daily_stats = []
for date_str in date_range:
date = datetime.strptime(date_str, '%Y-%m-%d')
next_date = date + timedelta(days=1)
# Count active users (users who had a conversation on this date)
active_users_query = db.session.query(distinct(Conversation.from_end_user_id)).filter(
Conversation.created_at >= date, Conversation.created_at < next_date
)
# Apply app_id filter if provided
if app_id:
active_users_query = active_users_query.filter(Conversation.app_id == app_id)
# Apply organization filters for conversations
if organization_id:
active_users_query = active_users_query.filter(Conversation.organization_id == organization_id)
active_users = active_users_query.count()
# Count new users (users who were created on this date)
new_users_query = db.session.query(EndUser).filter(
EndUser.created_at >= date, EndUser.created_at < next_date
)
# Apply app_id filter if provided
if app_id:
new_users_query = new_users_query.filter(EndUser.app_id == app_id)
# Apply organization_id filter if provided
if organization_id:
new_users_query = new_users_query.filter(EndUser.organization_id == organization_id)
new_users = new_users_query.count()
daily_stats.append({"date": date_str, "active_users": active_users, "new_users": new_users})
return {"daily_stats": daily_stats}
@staticmethod
def get_conversation_stats(
start_date: datetime, end_date: datetime, app_id: Optional[str] = None, organization_id: Optional[str] = None
) -> Dict[str, List[Dict[str, Any]]]:
"""
Get conversation statistics for a date range
Args:
start_date: The start date for the statistics
end_date: The end date for the statistics
app_id: Optional app ID to filter conversations by
organization_id: Optional organization ID to filter conversations by
Returns:
Dictionary containing daily conversation statistics
"""
# Calculate date range
date_range = []
current_date = start_date
while current_date <= end_date:
date_range.append(current_date.strftime('%Y-%m-%d'))
current_date += timedelta(days=1)
daily_stats = []
for date_str in date_range:
date = datetime.strptime(date_str, '%Y-%m-%d')
next_date = date + timedelta(days=1)
# Count total conversations for this date
conv_query = db.session.query(Conversation).filter(
Conversation.created_at >= date, Conversation.created_at < next_date
)
# Apply app_id filter if provided
if app_id:
conv_query = conv_query.filter(Conversation.app_id == app_id)
# Apply organization_id filter if provided
if organization_id:
conv_query = conv_query.filter(Conversation.organization_id == organization_id)
total_conversations = conv_query.count()
# Count unique users who had conversations on this date
unique_users_query = db.session.query(distinct(Conversation.from_end_user_id)).filter(
Conversation.created_at >= date, Conversation.created_at < next_date
)
# Apply app_id filter if provided
if app_id:
unique_users_query = unique_users_query.filter(Conversation.app_id == app_id)
# Apply organization_id filter if provided
if organization_id:
unique_users_query = unique_users_query.filter(Conversation.organization_id == organization_id)
unique_users = unique_users_query.count()
# Calculate average conversations per user
avg_conversations_per_user = 0
if unique_users > 0:
avg_conversations_per_user = round(total_conversations / unique_users, 2)
daily_stats.append(
{
"date": date_str,
"total_conversations": total_conversations,
"avg_conversations_per_user": avg_conversations_per_user,
}
)
return {"daily_stats": daily_stats}
Loading…
Cancel
Save