feat: add granted_at

pull/20496/head
GareArc 12 months ago
parent 623be84bb1
commit 1df22e0c79
No known key found for this signature in database

@ -6,7 +6,7 @@ from controllers.web import api
from controllers.web.error import WebAppAuthRequiredError
from extensions.ext_database import db
from flask import request
from flask_restful import Resource # type: ignore
from flask_restful import Resource
from libs.passport import PassportService
from models.model import App, EndUser, Site
from services.enterprise.enterprise_service import EnterpriseService
@ -85,8 +85,23 @@ def decode_enterprise_webapp_user_id(jwt_token: str | None):
decoded = PassportService().verify(jwt_token)
source = decoded.get("token_source")
auth_type = decoded.get("auth_type")
granted_at = decoded.get("granted_at")
if not source or source != "webapp_login_token":
raise Unauthorized("Invalid token source. Expected 'webapp_login_token'.")
if not auth_type:
raise Unauthorized("Missing auth_type in the token.")
if not granted_at:
raise Unauthorized("Missing granted_at in the token.")
# check if sso has been updated
if auth_type == "external":
last_update_time = EnterpriseService.get_app_sso_settings_last_update_time()
if granted_at and datetime.fromisoformat(granted_at) < last_update_time:
raise Unauthorized("SSO settings have been updated. Please re-login.")
elif auth_type == "internal":
last_update_time = EnterpriseService.get_workspace_sso_settings_last_update_time()
if granted_at and datetime.fromisoformat(granted_at) < last_update_time:
raise Unauthorized("SSO settings have been updated. Please re-login.")
return decoded

@ -1,6 +1,7 @@
from pydantic import BaseModel, Field
from datetime import datetime
from pydantic import BaseModel, Field
from services.enterprise.base import EnterpriseRequest
@ -18,9 +19,33 @@ class EnterpriseService:
return EnterpriseRequest.send_request("GET", "/info")
@classmethod
def get_workspace_info(cls, tenant_id:str):
def get_workspace_info(cls, tenant_id: str):
return EnterpriseRequest.send_request("GET", f"/workspace/{tenant_id}/info")
@classmethod
def get_app_sso_settings_last_update_time(cls) -> datetime:
data = EnterpriseRequest.send_request("GET", "/sso/app/last-update-time")
print(data)
if not data:
raise ValueError("No data found.")
try:
# parse the UTC timestamp from the response
return datetime.fromisoformat(data.replace("Z", "+00:00"))
except ValueError as e:
raise ValueError(f"Invalid date format: {data}") from e
@classmethod
def get_workspace_sso_settings_last_update_time(cls) -> datetime:
data = EnterpriseRequest.send_request("GET", "/sso/workspace/last-update-time")
print(data)
if not data:
raise ValueError("No data found.")
try:
# parse the UTC timestamp from the response
return datetime.fromisoformat(data.replace("Z", "+00:00"))
except ValueError as e:
raise ValueError(f"Invalid date format: {data}") from e
class WebAppAuth:
@classmethod
def is_user_allowed_to_access_webapp(cls, user_id: str, app_code: str) -> bool:

@ -109,6 +109,7 @@ class WebAppAuthService:
"session_id": account.email,
"token_source": "webapp_login_token",
"auth_type": "internal",
"granted_at": datetime.now(UTC).isoformat(),
"exp": exp,
}

Loading…
Cancel
Save