fix: adjust permission check logic to avoid sso_verified apps

pull/20496/head
GareArc 12 months ago
parent 2b9b852c31
commit 5a076e4274
No known key found for this signature in database

@ -11,6 +11,7 @@ from libs.passport import PassportService
from models.model import App, AppMode from models.model import App, AppMode
from services.app_service import AppService from services.app_service import AppService
from services.enterprise.enterprise_service import EnterpriseService from services.enterprise.enterprise_service import EnterpriseService
from services.webapp_auth_service import WebAppAuthService
class AppParameterApi(WebApiResource): class AppParameterApi(WebApiResource):
@ -93,6 +94,8 @@ class AppWebAuthPermission(Resource):
app_id = args["appId"] app_id = args["appId"]
app_code = AppService.get_app_code_by_id(app_id) app_code = AppService.get_app_code_by_id(app_id)
res = True
if WebAppAuthService.is_app_require_permission_check(app_id=app_id):
res = EnterpriseService.WebAppAuth.is_user_allowed_to_access_webapp(str(user_id), app_code) res = EnterpriseService.WebAppAuth.is_user_allowed_to_access_webapp(str(user_id), app_code)
return {"result": res} return {"result": res}

@ -21,13 +21,13 @@ class PassportResource(Resource):
def get(self): def get(self):
system_features = FeatureService.get_system_features() system_features = FeatureService.get_system_features()
app_code = request.headers.get("X-App-Code") app_code = request.headers.get("X-App-Code")
enterprise_login_token = request.args.get("enterprise_login_token") web_app_access_token = request.args.get("web_app_access_token")
if app_code is None: if app_code is None:
raise Unauthorized("X-App-Code header is missing.") raise Unauthorized("X-App-Code header is missing.")
# exchange token for enterprise logined web user # exchange token for enterprise logined web user
enterprise_user_decoded = decode_enterprise_webapp_user_id(enterprise_login_token) enterprise_user_decoded = decode_enterprise_webapp_user_id(web_app_access_token)
if enterprise_user_decoded: if enterprise_user_decoded:
# a web user has already logged in, exchange a token for this app without redirecting to the login page # a web user has already logged in, exchange a token for this app without redirecting to the login page
return exchange_token_for_existing_web_user( return exchange_token_for_existing_web_user(
@ -105,6 +105,8 @@ def exchange_token_for_existing_web_user(app_code: str, enterprise_user_decoded:
app_model = db.session.query(App).filter(App.id == site.app_id).first() app_model = db.session.query(App).filter(App.id == site.app_id).first()
if not app_model or app_model.status != "normal" or not app_model.enable_site: if not app_model or app_model.status != "normal" or not app_model.enable_site:
raise NotFound() raise NotFound()
end_user = None
if end_user_id:
end_user = db.session.query(EndUser).filter(EndUser.id == end_user_id).first() end_user = db.session.query(EndUser).filter(EndUser.id == end_user_id).first()
if not end_user: if not end_user:
end_user = EndUser( end_user = EndUser(

@ -4,12 +4,15 @@ from flask import request
from flask_restful import Resource # type: ignore from flask_restful import Resource # type: ignore
from werkzeug.exceptions import BadRequest, NotFound, Unauthorized from werkzeug.exceptions import BadRequest, NotFound, Unauthorized
from controllers.web.error import WebAppAuthAccessDeniedError, WebAppAuthRequiredError from controllers.web.error import (WebAppAuthAccessDeniedError,
WebAppAuthRequiredError)
from extensions.ext_database import db from extensions.ext_database import db
from libs.passport import PassportService from libs.passport import PassportService
from models.model import App, EndUser, Site from models.model import App, EndUser, Site
from services.enterprise.enterprise_service import EnterpriseService, WebAppSettings from services.enterprise.enterprise_service import (EnterpriseService,
WebAppSettings)
from services.feature_service import FeatureService from services.feature_service import FeatureService
from services.webapp_auth_service import WebAppAuthService
def validate_jwt_token(view=None): def validate_jwt_token(view=None):
@ -45,7 +48,8 @@ def decode_jwt_token():
raise Unauthorized("Invalid Authorization header format. Expected 'Bearer <api-key>' format.") raise Unauthorized("Invalid Authorization header format. Expected 'Bearer <api-key>' format.")
decoded = PassportService().verify(tk) decoded = PassportService().verify(tk)
app_code = decoded.get("app_code") app_code = decoded.get("app_code")
app_model = db.session.query(App).filter(App.id == decoded["app_id"]).first() app_id = decoded.get("app_id")
app_model = db.session.query(App).filter(App.id == app_id).first()
site = db.session.query(Site).filter(Site.code == app_code).first() site = db.session.query(Site).filter(Site.code == app_code).first()
if not app_model: if not app_model:
raise NotFound() raise NotFound()
@ -53,7 +57,8 @@ def decode_jwt_token():
raise BadRequest("Site URL is no longer valid.") raise BadRequest("Site URL is no longer valid.")
if app_model.enable_site is False: if app_model.enable_site is False:
raise BadRequest("Site is disabled.") raise BadRequest("Site is disabled.")
end_user = db.session.query(EndUser).filter(EndUser.id == decoded["end_user_id"]).first() end_user_id = decoded.get("end_user_id")
end_user = db.session.query(EndUser).filter(EndUser.id == end_user_id).first()
if not end_user: if not end_user:
raise NotFound() raise NotFound()
@ -115,9 +120,7 @@ def _validate_user_accessibility(
if not webapp_settings: if not webapp_settings:
raise WebAppAuthRequiredError("Web app settings not found.") raise WebAppAuthRequiredError("Web app settings not found.")
access_modes_require_permission_check = ["private", "private_all"] if WebAppAuthService.is_app_require_permission_check(access_mode=webapp_settings.access_mode):
if webapp_settings.access_mode in access_modes_require_permission_check:
if not EnterpriseService.WebAppAuth.is_user_allowed_to_access_webapp(user_id, app_code=app_code): if not EnterpriseService.WebAppAuth.is_user_allowed_to_access_webapp(user_id, app_code=app_code):
raise WebAppAuthAccessDeniedError() raise WebAppAuthAccessDeniedError()

@ -11,6 +11,8 @@ from libs.passport import PassportService
from libs.password import compare_password from libs.password import compare_password
from models.account import Account, AccountStatus from models.account import Account, AccountStatus
from models.model import App, EndUser, Site from models.model import App, EndUser, Site
from services.app_service import AppService
from services.enterprise.enterprise_service import EnterpriseService
from services.errors.account import (AccountLoginError, AccountNotFoundError, from services.errors.account import (AccountLoginError, AccountNotFoundError,
AccountPasswordError) AccountPasswordError)
from tasks.mail_email_code_login import send_email_code_login_mail_task from tasks.mail_email_code_login import send_email_code_login_mail_task
@ -61,7 +63,7 @@ class WebAppAuthService:
code = "".join([str(random.randint(0, 9)) for _ in range(6)]) code = "".join([str(random.randint(0, 9)) for _ in range(6)])
token = TokenManager.generate_token( token = TokenManager.generate_token(
account=account, email=email, token_type="webapp_email_code_login", additional_data={"code": code} account=account, email=email, token_type="email_code_login", additional_data={"code": code}
) )
send_email_code_login_mail_task.delay( send_email_code_login_mail_task.delay(
language=language, language=language,
@ -73,11 +75,11 @@ class WebAppAuthService:
@classmethod @classmethod
def get_email_code_login_data(cls, token: str) -> Optional[dict[str, Any]]: def get_email_code_login_data(cls, token: str) -> Optional[dict[str, Any]]:
return TokenManager.get_token_data(token, "webapp_email_code_login") return TokenManager.get_token_data(token, "email_code_login")
@classmethod @classmethod
def revoke_email_code_login_token(cls, token: str): def revoke_email_code_login_token(cls, token: str):
TokenManager.revoke_token(token, "webapp_email_code_login") TokenManager.revoke_token(token, "email_code_login")
@classmethod @classmethod
def create_end_user(cls, app_code, email) -> EndUser: def create_end_user(cls, app_code, email) -> EndUser:
@ -111,3 +113,28 @@ class WebAppAuthService:
token: str = PassportService().issue(payload) token: str = PassportService().issue(payload)
return token return token
@classmethod
def is_app_require_permission_check(cls, app_code: str = None, app_id: str = None, access_mode: str = None) -> bool:
"""
Check if the app requires permission check based on its access mode.
"""
modes_requiring_permission_check = [
"private",
"private_all",
]
if access_mode:
return access_mode in modes_requiring_permission_check
if not app_code and not app_id:
raise ValueError("Either app_code or app_id must be provided.")
if app_code:
app_id = AppService.get_app_id_by_code(app_code)
if not app_id:
raise ValueError("App ID could not be determined from the provided app_code.")
webapp_settings = EnterpriseService.WebAppAuth.get_app_access_mode_by_id(app_id)
if webapp_settings and webapp_settings.access_mode in modes_requiring_permission_check:
return True
return False

Loading…
Cancel
Save