|
|
|
|
@ -9,6 +9,8 @@ from libs.passport import PassportService
|
|
|
|
|
from libs.password import compare_password
|
|
|
|
|
from models.account import Account, AccountStatus
|
|
|
|
|
from models.model import App, EndUser, Site
|
|
|
|
|
from services.app_service import AppService
|
|
|
|
|
from services.enterprise.enterprise_service import EnterpriseService
|
|
|
|
|
from services.errors.account import (AccountLoginError, AccountNotFoundError,
|
|
|
|
|
AccountPasswordError)
|
|
|
|
|
from tasks.mail_email_code_login import send_email_code_login_mail_task
|
|
|
|
|
@ -60,7 +62,7 @@ class WebAppAuthService:
|
|
|
|
|
|
|
|
|
|
code = "".join([str(random.randint(0, 9)) for _ in range(6)])
|
|
|
|
|
token = TokenManager.generate_token(
|
|
|
|
|
account=account, email=email, token_type="webapp_email_code_login", additional_data={"code": code}
|
|
|
|
|
account=account, email=email, token_type="email_code_login", additional_data={"code": code}
|
|
|
|
|
)
|
|
|
|
|
send_email_code_login_mail_task.delay(
|
|
|
|
|
language=language,
|
|
|
|
|
@ -72,11 +74,11 @@ class WebAppAuthService:
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def get_email_code_login_data(cls, token: str) -> Optional[dict[str, Any]]:
|
|
|
|
|
return TokenManager.get_token_data(token, "webapp_email_code_login")
|
|
|
|
|
return TokenManager.get_token_data(token, "email_code_login")
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def revoke_email_code_login_token(cls, token: str):
|
|
|
|
|
TokenManager.revoke_token(token, "webapp_email_code_login")
|
|
|
|
|
TokenManager.revoke_token(token, "email_code_login")
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def create_end_user(cls, app_code, email) -> EndUser:
|
|
|
|
|
@ -114,3 +116,28 @@ class WebAppAuthService:
|
|
|
|
|
|
|
|
|
|
token: str = PassportService().issue(payload)
|
|
|
|
|
return token
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def is_app_require_permission_check(cls, app_code: str = None, app_id: str = None, access_mode: str = None) -> bool:
|
|
|
|
|
"""
|
|
|
|
|
Check if the app requires permission check based on its access mode.
|
|
|
|
|
"""
|
|
|
|
|
modes_requiring_permission_check = [
|
|
|
|
|
"private",
|
|
|
|
|
"private_all",
|
|
|
|
|
]
|
|
|
|
|
if access_mode:
|
|
|
|
|
return access_mode in modes_requiring_permission_check
|
|
|
|
|
|
|
|
|
|
if not app_code and not app_id:
|
|
|
|
|
raise ValueError("Either app_code or app_id must be provided.")
|
|
|
|
|
|
|
|
|
|
if app_code:
|
|
|
|
|
app_id = AppService.get_app_id_by_code(app_code)
|
|
|
|
|
if not app_id:
|
|
|
|
|
raise ValueError("App ID could not be determined from the provided app_code.")
|
|
|
|
|
|
|
|
|
|
webapp_settings = EnterpriseService.WebAppAuth.get_app_access_mode_by_id(app_id)
|
|
|
|
|
if webapp_settings and webapp_settings.access_mode in modes_requiring_permission_check:
|
|
|
|
|
return True
|
|
|
|
|
return False
|
|
|
|
|
|