|
|
|
|
@ -1,10 +1,11 @@
|
|
|
|
|
from datetime import datetime, timedelta
|
|
|
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
|
|
|
|
|
|
import pytest
|
|
|
|
|
|
|
|
|
|
from configs import dify_config
|
|
|
|
|
from models.account import Account
|
|
|
|
|
from services.account_service import AccountService
|
|
|
|
|
from services.account_service import AccountService, TenantService
|
|
|
|
|
from services.errors.account import (
|
|
|
|
|
AccountLoginError,
|
|
|
|
|
AccountNotFoundError,
|
|
|
|
|
@ -12,9 +13,10 @@ from services.errors.account import (
|
|
|
|
|
AccountRegisterError,
|
|
|
|
|
CurrentPasswordIncorrectError,
|
|
|
|
|
)
|
|
|
|
|
from tests.unit_tests.services.services_test_help import ServiceDbTestHelper
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AccountServiceTestDataFactory:
|
|
|
|
|
class TestAccountAssociatedDataFactory:
|
|
|
|
|
"""Factory class for creating test data and mock objects for account service tests."""
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
@ -41,7 +43,8 @@ class AccountServiceTestDataFactory:
|
|
|
|
|
account.interface_language = interface_language
|
|
|
|
|
account.interface_theme = interface_theme
|
|
|
|
|
account.timezone = timezone
|
|
|
|
|
account.last_active_at = MagicMock()
|
|
|
|
|
# Set last_active_at to a datetime object that's older than 10 minutes
|
|
|
|
|
account.last_active_at = datetime.now() - timedelta(minutes=15)
|
|
|
|
|
account.initialized_at = None
|
|
|
|
|
for key, value in kwargs.items():
|
|
|
|
|
setattr(account, key, value)
|
|
|
|
|
@ -149,57 +152,6 @@ class TestAccountService:
|
|
|
|
|
|
|
|
|
|
yield mock_db
|
|
|
|
|
|
|
|
|
|
def _setup_smart_db_query_mock(self, mock_db, query_results):
|
|
|
|
|
"""
|
|
|
|
|
Smart database query mock that responds based on model type and query parameters.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
mock_db: Mock database session
|
|
|
|
|
query_results: Dict mapping (model_name, filter_key, filter_value) to return value
|
|
|
|
|
Example: {('Account', 'email', 'test@example.com'): mock_account}
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def query_side_effect(model):
|
|
|
|
|
mock_query = MagicMock()
|
|
|
|
|
|
|
|
|
|
def filter_by_side_effect(**kwargs):
|
|
|
|
|
mock_filter_result = MagicMock()
|
|
|
|
|
|
|
|
|
|
def first_side_effect():
|
|
|
|
|
# Find matching result based on model and filter parameters
|
|
|
|
|
for (model_name, filter_key, filter_value), result in query_results.items():
|
|
|
|
|
if model.__name__ == model_name and filter_key in kwargs and kwargs[filter_key] == filter_value:
|
|
|
|
|
return result
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
mock_filter_result.first.side_effect = first_side_effect
|
|
|
|
|
|
|
|
|
|
# Handle order_by calls for complex queries
|
|
|
|
|
def order_by_side_effect(*args, **kwargs):
|
|
|
|
|
mock_order_result = MagicMock()
|
|
|
|
|
|
|
|
|
|
def order_first_side_effect():
|
|
|
|
|
# Look for order_by results in the same query_results dict
|
|
|
|
|
for (model_name, filter_key, filter_value), result in query_results.items():
|
|
|
|
|
if (
|
|
|
|
|
model.__name__ == model_name
|
|
|
|
|
and filter_key == "order_by"
|
|
|
|
|
and filter_value == "first_available"
|
|
|
|
|
):
|
|
|
|
|
return result
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
mock_order_result.first.side_effect = order_first_side_effect
|
|
|
|
|
return mock_order_result
|
|
|
|
|
|
|
|
|
|
mock_filter_result.order_by.side_effect = order_by_side_effect
|
|
|
|
|
return mock_filter_result
|
|
|
|
|
|
|
|
|
|
mock_query.filter_by.side_effect = filter_by_side_effect
|
|
|
|
|
return mock_query
|
|
|
|
|
|
|
|
|
|
mock_db.session.query.side_effect = query_side_effect
|
|
|
|
|
|
|
|
|
|
def _assert_database_operations_called(self, mock_db):
|
|
|
|
|
"""Helper method to verify database operations were called."""
|
|
|
|
|
mock_db.session.commit.assert_called()
|
|
|
|
|
@ -218,11 +170,11 @@ class TestAccountService:
|
|
|
|
|
def test_authenticate_success(self, mock_db_dependencies, mock_password_dependencies):
|
|
|
|
|
"""Test successful authentication with correct email and password."""
|
|
|
|
|
# Setup test data
|
|
|
|
|
mock_account = AccountServiceTestDataFactory.create_account_mock()
|
|
|
|
|
mock_account = TestAccountAssociatedDataFactory.create_account_mock()
|
|
|
|
|
|
|
|
|
|
# Setup smart database query mock
|
|
|
|
|
query_results = {("Account", "email", "test@example.com"): mock_account}
|
|
|
|
|
self._setup_smart_db_query_mock(mock_db_dependencies["db"], query_results)
|
|
|
|
|
ServiceDbTestHelper.setup_db_query_filter_by_mock(mock_db_dependencies["db"], query_results)
|
|
|
|
|
|
|
|
|
|
mock_password_dependencies["compare_password"].return_value = True
|
|
|
|
|
|
|
|
|
|
@ -237,7 +189,7 @@ class TestAccountService:
|
|
|
|
|
"""Test authentication when account does not exist."""
|
|
|
|
|
# Setup smart database query mock - no matching results
|
|
|
|
|
query_results = {("Account", "email", "notfound@example.com"): None}
|
|
|
|
|
self._setup_smart_db_query_mock(mock_db_dependencies["db"], query_results)
|
|
|
|
|
ServiceDbTestHelper.setup_db_query_filter_by_mock(mock_db_dependencies["db"], query_results)
|
|
|
|
|
|
|
|
|
|
# Execute test and verify exception
|
|
|
|
|
self._assert_exception_raised(
|
|
|
|
|
@ -247,11 +199,11 @@ class TestAccountService:
|
|
|
|
|
def test_authenticate_account_banned(self, mock_db_dependencies):
|
|
|
|
|
"""Test authentication when account is banned."""
|
|
|
|
|
# Setup test data
|
|
|
|
|
mock_account = AccountServiceTestDataFactory.create_account_mock(status="banned")
|
|
|
|
|
mock_account = TestAccountAssociatedDataFactory.create_account_mock(status="banned")
|
|
|
|
|
|
|
|
|
|
# Setup smart database query mock
|
|
|
|
|
query_results = {("Account", "email", "banned@example.com"): mock_account}
|
|
|
|
|
self._setup_smart_db_query_mock(mock_db_dependencies["db"], query_results)
|
|
|
|
|
ServiceDbTestHelper.setup_db_query_filter_by_mock(mock_db_dependencies["db"], query_results)
|
|
|
|
|
|
|
|
|
|
# Execute test and verify exception
|
|
|
|
|
self._assert_exception_raised(AccountLoginError, AccountService.authenticate, "banned@example.com", "password")
|
|
|
|
|
@ -259,11 +211,11 @@ class TestAccountService:
|
|
|
|
|
def test_authenticate_password_error(self, mock_db_dependencies, mock_password_dependencies):
|
|
|
|
|
"""Test authentication with wrong password."""
|
|
|
|
|
# Setup test data
|
|
|
|
|
mock_account = AccountServiceTestDataFactory.create_account_mock()
|
|
|
|
|
mock_account = TestAccountAssociatedDataFactory.create_account_mock()
|
|
|
|
|
|
|
|
|
|
# Setup smart database query mock
|
|
|
|
|
query_results = {("Account", "email", "test@example.com"): mock_account}
|
|
|
|
|
self._setup_smart_db_query_mock(mock_db_dependencies["db"], query_results)
|
|
|
|
|
ServiceDbTestHelper.setup_db_query_filter_by_mock(mock_db_dependencies["db"], query_results)
|
|
|
|
|
|
|
|
|
|
mock_password_dependencies["compare_password"].return_value = False
|
|
|
|
|
|
|
|
|
|
@ -275,11 +227,11 @@ class TestAccountService:
|
|
|
|
|
def test_authenticate_pending_account_activates(self, mock_db_dependencies, mock_password_dependencies):
|
|
|
|
|
"""Test authentication for a pending account, which should activate on login."""
|
|
|
|
|
# Setup test data
|
|
|
|
|
mock_account = AccountServiceTestDataFactory.create_account_mock(status="pending")
|
|
|
|
|
mock_account = TestAccountAssociatedDataFactory.create_account_mock(status="pending")
|
|
|
|
|
|
|
|
|
|
# Setup smart database query mock
|
|
|
|
|
query_results = {("Account", "email", "pending@example.com"): mock_account}
|
|
|
|
|
self._setup_smart_db_query_mock(mock_db_dependencies["db"], query_results)
|
|
|
|
|
ServiceDbTestHelper.setup_db_query_filter_by_mock(mock_db_dependencies["db"], query_results)
|
|
|
|
|
|
|
|
|
|
mock_password_dependencies["compare_password"].return_value = True
|
|
|
|
|
|
|
|
|
|
@ -404,7 +356,7 @@ class TestAccountService:
|
|
|
|
|
def test_update_account_password_success(self, mock_db_dependencies, mock_password_dependencies):
|
|
|
|
|
"""Test successful password update with correct current password and valid new password."""
|
|
|
|
|
# Setup test data
|
|
|
|
|
mock_account = AccountServiceTestDataFactory.create_account_mock()
|
|
|
|
|
mock_account = TestAccountAssociatedDataFactory.create_account_mock()
|
|
|
|
|
mock_password_dependencies["compare_password"].return_value = True
|
|
|
|
|
mock_password_dependencies["valid_password"].return_value = None
|
|
|
|
|
mock_password_dependencies["hash_password"].return_value = b"new_hashed_password"
|
|
|
|
|
@ -429,7 +381,7 @@ class TestAccountService:
|
|
|
|
|
def test_update_account_password_current_password_incorrect(self, mock_password_dependencies):
|
|
|
|
|
"""Test password update with incorrect current password."""
|
|
|
|
|
# Setup test data
|
|
|
|
|
mock_account = AccountServiceTestDataFactory.create_account_mock()
|
|
|
|
|
mock_account = TestAccountAssociatedDataFactory.create_account_mock()
|
|
|
|
|
mock_password_dependencies["compare_password"].return_value = False
|
|
|
|
|
|
|
|
|
|
# Execute test and verify exception
|
|
|
|
|
@ -449,7 +401,7 @@ class TestAccountService:
|
|
|
|
|
def test_update_account_password_invalid_new_password(self, mock_password_dependencies):
|
|
|
|
|
"""Test password update with invalid new password."""
|
|
|
|
|
# Setup test data
|
|
|
|
|
mock_account = AccountServiceTestDataFactory.create_account_mock()
|
|
|
|
|
mock_account = TestAccountAssociatedDataFactory.create_account_mock()
|
|
|
|
|
mock_password_dependencies["compare_password"].return_value = True
|
|
|
|
|
mock_password_dependencies["valid_password"].side_effect = ValueError("Password too short")
|
|
|
|
|
|
|
|
|
|
@ -461,39 +413,39 @@ class TestAccountService:
|
|
|
|
|
# Verify password validation was called
|
|
|
|
|
mock_password_dependencies["valid_password"].assert_called_once_with("short")
|
|
|
|
|
|
|
|
|
|
# ==================== User Loading Tests ====================
|
|
|
|
|
# ==================== User Loading Tests ====================
|
|
|
|
|
|
|
|
|
|
def test_load_user_success(self, mock_db_dependencies):
|
|
|
|
|
"""Test successful user loading with current tenant."""
|
|
|
|
|
# Setup test data
|
|
|
|
|
mock_account = AccountServiceTestDataFactory.create_account_mock()
|
|
|
|
|
mock_tenant_join = AccountServiceTestDataFactory.create_tenant_join_mock()
|
|
|
|
|
def test_load_user_success(self, mock_db_dependencies):
|
|
|
|
|
"""Test successful user loading with current tenant."""
|
|
|
|
|
# Setup test data
|
|
|
|
|
mock_account = TestAccountAssociatedDataFactory.create_account_mock()
|
|
|
|
|
mock_tenant_join = TestAccountAssociatedDataFactory.create_tenant_join_mock()
|
|
|
|
|
|
|
|
|
|
# Setup smart database query mock
|
|
|
|
|
query_results = {
|
|
|
|
|
("Account", "id", "user-123"): mock_account,
|
|
|
|
|
("TenantAccountJoin", "account_id", "user-123"): mock_tenant_join,
|
|
|
|
|
}
|
|
|
|
|
self._setup_smart_db_query_mock(mock_db_dependencies["db"], query_results)
|
|
|
|
|
# Setup smart database query mock
|
|
|
|
|
query_results = {
|
|
|
|
|
("Account", "id", "user-123"): mock_account,
|
|
|
|
|
("TenantAccountJoin", "account_id", "user-123"): mock_tenant_join,
|
|
|
|
|
}
|
|
|
|
|
ServiceDbTestHelper.setup_db_query_filter_by_mock(mock_db_dependencies["db"], query_results)
|
|
|
|
|
|
|
|
|
|
# Mock datetime
|
|
|
|
|
with patch("services.account_service.datetime") as mock_datetime:
|
|
|
|
|
mock_now = MagicMock()
|
|
|
|
|
mock_datetime.now.return_value = mock_now
|
|
|
|
|
mock_datetime.UTC = "UTC"
|
|
|
|
|
# Mock datetime
|
|
|
|
|
with patch("services.account_service.datetime") as mock_datetime:
|
|
|
|
|
mock_now = datetime.now()
|
|
|
|
|
mock_datetime.now.return_value = mock_now
|
|
|
|
|
mock_datetime.UTC = "UTC"
|
|
|
|
|
|
|
|
|
|
# Execute test
|
|
|
|
|
result = AccountService.load_user("user-123")
|
|
|
|
|
# Execute test
|
|
|
|
|
result = AccountService.load_user("user-123")
|
|
|
|
|
|
|
|
|
|
# Verify results
|
|
|
|
|
assert result == mock_account
|
|
|
|
|
assert mock_account.set_tenant_id.called
|
|
|
|
|
# Verify results
|
|
|
|
|
assert result == mock_account
|
|
|
|
|
assert mock_account.set_tenant_id.called
|
|
|
|
|
|
|
|
|
|
def test_load_user_not_found(self, mock_db_dependencies):
|
|
|
|
|
"""Test user loading when user does not exist."""
|
|
|
|
|
# Setup smart database query mock - no matching results
|
|
|
|
|
query_results = {("Account", "id", "non-existent-user"): None}
|
|
|
|
|
self._setup_smart_db_query_mock(mock_db_dependencies["db"], query_results)
|
|
|
|
|
ServiceDbTestHelper.setup_db_query_filter_by_mock(mock_db_dependencies["db"], query_results)
|
|
|
|
|
|
|
|
|
|
# Execute test
|
|
|
|
|
result = AccountService.load_user("non-existent-user")
|
|
|
|
|
@ -504,11 +456,11 @@ class TestAccountService:
|
|
|
|
|
def test_load_user_banned(self, mock_db_dependencies):
|
|
|
|
|
"""Test user loading when user is banned."""
|
|
|
|
|
# Setup test data
|
|
|
|
|
mock_account = AccountServiceTestDataFactory.create_account_mock(status="banned")
|
|
|
|
|
mock_account = TestAccountAssociatedDataFactory.create_account_mock(status="banned")
|
|
|
|
|
|
|
|
|
|
# Setup smart database query mock
|
|
|
|
|
query_results = {("Account", "id", "user-123"): mock_account}
|
|
|
|
|
self._setup_smart_db_query_mock(mock_db_dependencies["db"], query_results)
|
|
|
|
|
ServiceDbTestHelper.setup_db_query_filter_by_mock(mock_db_dependencies["db"], query_results)
|
|
|
|
|
|
|
|
|
|
# Execute test and verify exception
|
|
|
|
|
self._assert_exception_raised(
|
|
|
|
|
@ -517,55 +469,339 @@ class TestAccountService:
|
|
|
|
|
"user-123",
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def test_load_user_no_current_tenant(self, mock_db_dependencies):
|
|
|
|
|
"""Test user loading when user has no current tenant but has available tenants."""
|
|
|
|
|
# Setup test data
|
|
|
|
|
mock_account = AccountServiceTestDataFactory.create_account_mock()
|
|
|
|
|
mock_available_tenant = AccountServiceTestDataFactory.create_tenant_join_mock(current=False)
|
|
|
|
|
|
|
|
|
|
# Setup smart database query mock for complex scenario
|
|
|
|
|
query_results = {
|
|
|
|
|
("Account", "id", "user-123"): mock_account,
|
|
|
|
|
("TenantAccountJoin", "account_id", "user-123"): None, # No current tenant
|
|
|
|
|
("TenantAccountJoin", "order_by", "first_available"): mock_available_tenant, # First available tenant
|
|
|
|
|
def test_load_user_no_current_tenant(self, mock_db_dependencies):
|
|
|
|
|
"""Test user loading when user has no current tenant but has available tenants."""
|
|
|
|
|
# Setup test data
|
|
|
|
|
mock_account = TestAccountAssociatedDataFactory.create_account_mock()
|
|
|
|
|
mock_available_tenant = TestAccountAssociatedDataFactory.create_tenant_join_mock(current=False)
|
|
|
|
|
|
|
|
|
|
# Setup smart database query mock for complex scenario
|
|
|
|
|
query_results = {
|
|
|
|
|
("Account", "id", "user-123"): mock_account,
|
|
|
|
|
("TenantAccountJoin", "account_id", "user-123"): None, # No current tenant
|
|
|
|
|
("TenantAccountJoin", "order_by", "first_available"): mock_available_tenant, # First available tenant
|
|
|
|
|
}
|
|
|
|
|
ServiceDbTestHelper.setup_db_query_filter_by_mock(mock_db_dependencies["db"], query_results)
|
|
|
|
|
|
|
|
|
|
# Mock datetime
|
|
|
|
|
with patch("services.account_service.datetime") as mock_datetime:
|
|
|
|
|
mock_now = datetime.now()
|
|
|
|
|
mock_datetime.now.return_value = mock_now
|
|
|
|
|
mock_datetime.UTC = "UTC"
|
|
|
|
|
|
|
|
|
|
# Execute test
|
|
|
|
|
result = AccountService.load_user("user-123")
|
|
|
|
|
|
|
|
|
|
# Verify results
|
|
|
|
|
assert result == mock_account
|
|
|
|
|
assert mock_available_tenant.current is True
|
|
|
|
|
self._assert_database_operations_called(mock_db_dependencies["db"])
|
|
|
|
|
|
|
|
|
|
def test_load_user_no_tenants(self, mock_db_dependencies):
|
|
|
|
|
"""Test user loading when user has no tenants at all."""
|
|
|
|
|
# Setup test data
|
|
|
|
|
mock_account = TestAccountAssociatedDataFactory.create_account_mock()
|
|
|
|
|
|
|
|
|
|
# Setup smart database query mock for no tenants scenario
|
|
|
|
|
query_results = {
|
|
|
|
|
("Account", "id", "user-123"): mock_account,
|
|
|
|
|
("TenantAccountJoin", "account_id", "user-123"): None, # No current tenant
|
|
|
|
|
("TenantAccountJoin", "order_by", "first_available"): None, # No available tenants
|
|
|
|
|
}
|
|
|
|
|
ServiceDbTestHelper.setup_db_query_filter_by_mock(mock_db_dependencies["db"], query_results)
|
|
|
|
|
|
|
|
|
|
# Mock datetime
|
|
|
|
|
with patch("services.account_service.datetime") as mock_datetime:
|
|
|
|
|
mock_now = datetime.now()
|
|
|
|
|
mock_datetime.now.return_value = mock_now
|
|
|
|
|
mock_datetime.UTC = "UTC"
|
|
|
|
|
|
|
|
|
|
# Execute test
|
|
|
|
|
result = AccountService.load_user("user-123")
|
|
|
|
|
|
|
|
|
|
# Verify results
|
|
|
|
|
assert result is None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestTenantService:
|
|
|
|
|
"""
|
|
|
|
|
Comprehensive unit tests for TenantService methods.
|
|
|
|
|
|
|
|
|
|
This test suite covers all tenant-related operations including:
|
|
|
|
|
- Tenant creation and management
|
|
|
|
|
- Member management and permissions
|
|
|
|
|
- Tenant switching
|
|
|
|
|
- Role updates and permission checks
|
|
|
|
|
- Error conditions and edge cases
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
|
def mock_db_dependencies(self):
|
|
|
|
|
"""Common mock setup for database dependencies."""
|
|
|
|
|
with patch("services.account_service.db") as mock_db:
|
|
|
|
|
mock_db.session.add = MagicMock()
|
|
|
|
|
mock_db.session.commit = MagicMock()
|
|
|
|
|
yield {
|
|
|
|
|
"db": mock_db,
|
|
|
|
|
}
|
|
|
|
|
self._setup_smart_db_query_mock(mock_db_dependencies["db"], query_results)
|
|
|
|
|
|
|
|
|
|
# Mock datetime
|
|
|
|
|
with patch("services.account_service.datetime") as mock_datetime:
|
|
|
|
|
mock_now = MagicMock()
|
|
|
|
|
mock_datetime.now.return_value = mock_now
|
|
|
|
|
mock_datetime.UTC = "UTC"
|
|
|
|
|
@pytest.fixture
|
|
|
|
|
def mock_rsa_dependencies(self):
|
|
|
|
|
"""Mock setup for RSA-related functions."""
|
|
|
|
|
with patch("services.account_service.generate_key_pair") as mock_generate_key_pair:
|
|
|
|
|
yield mock_generate_key_pair
|
|
|
|
|
|
|
|
|
|
# Execute test
|
|
|
|
|
result = AccountService.load_user("user-123")
|
|
|
|
|
|
|
|
|
|
# Verify results
|
|
|
|
|
assert result == mock_account
|
|
|
|
|
assert mock_available_tenant.current is True
|
|
|
|
|
self._assert_database_operations_called(mock_db_dependencies["db"])
|
|
|
|
|
|
|
|
|
|
def test_load_user_no_tenants(self, mock_db_dependencies):
|
|
|
|
|
"""Test user loading when user has no tenants at all."""
|
|
|
|
|
# Setup test data
|
|
|
|
|
mock_account = AccountServiceTestDataFactory.create_account_mock()
|
|
|
|
|
|
|
|
|
|
# Setup smart database query mock for no tenants scenario
|
|
|
|
|
query_results = {
|
|
|
|
|
("Account", "id", "user-123"): mock_account,
|
|
|
|
|
("TenantAccountJoin", "account_id", "user-123"): None, # No current tenant
|
|
|
|
|
("TenantAccountJoin", "order_by", "first_available"): None, # No available tenants
|
|
|
|
|
@pytest.fixture
|
|
|
|
|
def mock_external_service_dependencies(self):
|
|
|
|
|
"""Mock setup for external service dependencies."""
|
|
|
|
|
with (
|
|
|
|
|
patch("services.account_service.FeatureService") as mock_feature_service,
|
|
|
|
|
patch("services.account_service.BillingService") as mock_billing_service,
|
|
|
|
|
):
|
|
|
|
|
yield {
|
|
|
|
|
"feature_service": mock_feature_service,
|
|
|
|
|
"billing_service": mock_billing_service,
|
|
|
|
|
}
|
|
|
|
|
self._setup_smart_db_query_mock(mock_db_dependencies["db"], query_results)
|
|
|
|
|
|
|
|
|
|
# Mock datetime
|
|
|
|
|
with patch("services.account_service.datetime") as mock_datetime:
|
|
|
|
|
mock_now = MagicMock()
|
|
|
|
|
mock_datetime.now.return_value = mock_now
|
|
|
|
|
mock_datetime.UTC = "UTC"
|
|
|
|
|
def _assert_database_operations_called(self, mock_db):
|
|
|
|
|
"""Helper method to verify database operations were called."""
|
|
|
|
|
mock_db.session.commit.assert_called()
|
|
|
|
|
|
|
|
|
|
def _assert_exception_raised(self, exception_type, callable_func, *args, **kwargs):
|
|
|
|
|
"""Helper method to verify that specific exception is raised."""
|
|
|
|
|
with pytest.raises(exception_type):
|
|
|
|
|
callable_func(*args, **kwargs)
|
|
|
|
|
|
|
|
|
|
# ==================== Tenant Creation Tests ====================
|
|
|
|
|
|
|
|
|
|
def test_create_owner_tenant_if_not_exist_new_user(
|
|
|
|
|
self, mock_db_dependencies, mock_rsa_dependencies, mock_external_service_dependencies
|
|
|
|
|
):
|
|
|
|
|
"""Test creating owner tenant for new user without existing tenants."""
|
|
|
|
|
# Setup test data
|
|
|
|
|
mock_account = TestAccountAssociatedDataFactory.create_account_mock()
|
|
|
|
|
|
|
|
|
|
# Setup smart database query mock - no existing tenant joins
|
|
|
|
|
query_results = {
|
|
|
|
|
("TenantAccountJoin", "account_id", "user-123"): None,
|
|
|
|
|
("TenantAccountJoin", "tenant_id", "tenant-456"): None, # For has_roles check
|
|
|
|
|
}
|
|
|
|
|
ServiceDbTestHelper.setup_db_query_filter_by_mock(mock_db_dependencies["db"], query_results)
|
|
|
|
|
|
|
|
|
|
# Setup external service mocks
|
|
|
|
|
mock_external_service_dependencies[
|
|
|
|
|
"feature_service"
|
|
|
|
|
].get_system_features.return_value.is_allow_create_workspace = True
|
|
|
|
|
mock_external_service_dependencies[
|
|
|
|
|
"feature_service"
|
|
|
|
|
].get_system_features.return_value.license.workspaces.is_available.return_value = True
|
|
|
|
|
|
|
|
|
|
# Mock tenant creation
|
|
|
|
|
mock_tenant = MagicMock()
|
|
|
|
|
mock_tenant.id = "tenant-456"
|
|
|
|
|
mock_tenant.name = "Test User's Workspace"
|
|
|
|
|
|
|
|
|
|
# Mock database operations
|
|
|
|
|
mock_db_dependencies["db"].session.add = MagicMock()
|
|
|
|
|
|
|
|
|
|
# Mock RSA key generation
|
|
|
|
|
mock_rsa_dependencies.return_value = "mock_public_key"
|
|
|
|
|
|
|
|
|
|
# Mock has_roles method to return False (no existing owner)
|
|
|
|
|
with patch("services.account_service.TenantService.has_roles") as mock_has_roles:
|
|
|
|
|
mock_has_roles.return_value = False
|
|
|
|
|
|
|
|
|
|
# Mock Tenant creation to set proper ID
|
|
|
|
|
with patch("services.account_service.Tenant") as mock_tenant_class:
|
|
|
|
|
mock_tenant_instance = MagicMock()
|
|
|
|
|
mock_tenant_instance.id = "tenant-456"
|
|
|
|
|
mock_tenant_instance.name = "Test User's Workspace"
|
|
|
|
|
mock_tenant_class.return_value = mock_tenant_instance
|
|
|
|
|
|
|
|
|
|
# Execute test
|
|
|
|
|
result = AccountService.load_user("user-123")
|
|
|
|
|
TenantService.create_owner_tenant_if_not_exist(mock_account)
|
|
|
|
|
|
|
|
|
|
# Verify tenant was created with correct parameters
|
|
|
|
|
mock_db_dependencies["db"].session.add.assert_called()
|
|
|
|
|
|
|
|
|
|
# Get all calls to session.add
|
|
|
|
|
add_calls = mock_db_dependencies["db"].session.add.call_args_list
|
|
|
|
|
|
|
|
|
|
# Should have at least 2 calls: one for Tenant, one for TenantAccountJoin
|
|
|
|
|
assert len(add_calls) >= 2
|
|
|
|
|
|
|
|
|
|
# Verify Tenant was added with correct name
|
|
|
|
|
tenant_added = False
|
|
|
|
|
tenant_account_join_added = False
|
|
|
|
|
|
|
|
|
|
for call in add_calls:
|
|
|
|
|
added_object = call[0][0] # First argument of the call
|
|
|
|
|
|
|
|
|
|
# Check if it's a Tenant object
|
|
|
|
|
if hasattr(added_object, "name") and hasattr(added_object, "id"):
|
|
|
|
|
# This should be a Tenant object
|
|
|
|
|
assert added_object.name == "Test User's Workspace"
|
|
|
|
|
tenant_added = True
|
|
|
|
|
|
|
|
|
|
# Check if it's a TenantAccountJoin object
|
|
|
|
|
elif (
|
|
|
|
|
hasattr(added_object, "tenant_id")
|
|
|
|
|
and hasattr(added_object, "account_id")
|
|
|
|
|
and hasattr(added_object, "role")
|
|
|
|
|
):
|
|
|
|
|
# This should be a TenantAccountJoin object
|
|
|
|
|
assert added_object.tenant_id is not None
|
|
|
|
|
assert added_object.account_id == "user-123"
|
|
|
|
|
assert added_object.role == "owner"
|
|
|
|
|
tenant_account_join_added = True
|
|
|
|
|
|
|
|
|
|
assert tenant_added, "Tenant object was not added to database"
|
|
|
|
|
assert tenant_account_join_added, "TenantAccountJoin object was not added to database"
|
|
|
|
|
|
|
|
|
|
self._assert_database_operations_called(mock_db_dependencies["db"])
|
|
|
|
|
assert mock_rsa_dependencies.called, "RSA key generation was not called"
|
|
|
|
|
|
|
|
|
|
# ==================== Member Management Tests ====================
|
|
|
|
|
|
|
|
|
|
def test_create_tenant_member_success(self, mock_db_dependencies):
|
|
|
|
|
"""Test successful tenant member creation."""
|
|
|
|
|
# Setup test data
|
|
|
|
|
mock_tenant = MagicMock()
|
|
|
|
|
mock_tenant.id = "tenant-456"
|
|
|
|
|
mock_account = TestAccountAssociatedDataFactory.create_account_mock()
|
|
|
|
|
|
|
|
|
|
# Setup smart database query mock - no existing member
|
|
|
|
|
query_results = {("TenantAccountJoin", "tenant_id", "tenant-456"): None}
|
|
|
|
|
ServiceDbTestHelper.setup_db_query_filter_by_mock(mock_db_dependencies["db"], query_results)
|
|
|
|
|
|
|
|
|
|
# Mock database operations
|
|
|
|
|
mock_db_dependencies["db"].session.add = MagicMock()
|
|
|
|
|
|
|
|
|
|
# Execute test
|
|
|
|
|
result = TenantService.create_tenant_member(mock_tenant, mock_account, "normal")
|
|
|
|
|
|
|
|
|
|
# Verify member was created with correct parameters
|
|
|
|
|
assert result is not None
|
|
|
|
|
mock_db_dependencies["db"].session.add.assert_called_once()
|
|
|
|
|
|
|
|
|
|
# Verify the TenantAccountJoin object was added with correct parameters
|
|
|
|
|
added_tenant_account_join = mock_db_dependencies["db"].session.add.call_args[0][0]
|
|
|
|
|
assert added_tenant_account_join.tenant_id == "tenant-456"
|
|
|
|
|
assert added_tenant_account_join.account_id == "user-123"
|
|
|
|
|
assert added_tenant_account_join.role == "normal"
|
|
|
|
|
|
|
|
|
|
self._assert_database_operations_called(mock_db_dependencies["db"])
|
|
|
|
|
|
|
|
|
|
# ==================== Tenant Switching Tests ====================
|
|
|
|
|
|
|
|
|
|
def test_switch_tenant_success(self):
|
|
|
|
|
"""Test successful tenant switching."""
|
|
|
|
|
# Setup test data
|
|
|
|
|
mock_account = TestAccountAssociatedDataFactory.create_account_mock()
|
|
|
|
|
mock_tenant_join = TestAccountAssociatedDataFactory.create_tenant_join_mock(
|
|
|
|
|
tenant_id="tenant-456", account_id="user-123", current=False
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# Mock the complex query in switch_tenant method
|
|
|
|
|
with patch("services.account_service.db") as mock_db:
|
|
|
|
|
# Mock the join query that returns the tenant_account_join
|
|
|
|
|
mock_query = MagicMock()
|
|
|
|
|
mock_filter = MagicMock()
|
|
|
|
|
mock_filter.first.return_value = mock_tenant_join
|
|
|
|
|
mock_query.filter.return_value = mock_filter
|
|
|
|
|
mock_query.join.return_value = mock_query
|
|
|
|
|
mock_db.session.query.return_value = mock_query
|
|
|
|
|
|
|
|
|
|
# Verify results
|
|
|
|
|
assert result is None
|
|
|
|
|
# Execute test
|
|
|
|
|
TenantService.switch_tenant(mock_account, "tenant-456")
|
|
|
|
|
|
|
|
|
|
# Verify tenant was switched
|
|
|
|
|
assert mock_tenant_join.current is True
|
|
|
|
|
self._assert_database_operations_called(mock_db)
|
|
|
|
|
|
|
|
|
|
def test_switch_tenant_no_tenant_id(self):
|
|
|
|
|
"""Test tenant switching without providing tenant ID."""
|
|
|
|
|
# Setup test data
|
|
|
|
|
mock_account = TestAccountAssociatedDataFactory.create_account_mock()
|
|
|
|
|
|
|
|
|
|
# Execute test and verify exception
|
|
|
|
|
self._assert_exception_raised(ValueError, TenantService.switch_tenant, mock_account, None)
|
|
|
|
|
|
|
|
|
|
# ==================== Role Management Tests ====================
|
|
|
|
|
|
|
|
|
|
def test_update_member_role_success(self):
|
|
|
|
|
"""Test successful member role update."""
|
|
|
|
|
# Setup test data
|
|
|
|
|
mock_tenant = MagicMock()
|
|
|
|
|
mock_tenant.id = "tenant-456"
|
|
|
|
|
mock_member = TestAccountAssociatedDataFactory.create_account_mock(account_id="member-789")
|
|
|
|
|
mock_operator = TestAccountAssociatedDataFactory.create_account_mock(account_id="operator-123")
|
|
|
|
|
mock_target_join = TestAccountAssociatedDataFactory.create_tenant_join_mock(
|
|
|
|
|
tenant_id="tenant-456", account_id="member-789", role="normal"
|
|
|
|
|
)
|
|
|
|
|
mock_operator_join = TestAccountAssociatedDataFactory.create_tenant_join_mock(
|
|
|
|
|
tenant_id="tenant-456", account_id="operator-123", role="owner"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# Mock the database queries in update_member_role method
|
|
|
|
|
with patch("services.account_service.db") as mock_db:
|
|
|
|
|
# Mock the first query for operator permission check
|
|
|
|
|
mock_query1 = MagicMock()
|
|
|
|
|
mock_filter1 = MagicMock()
|
|
|
|
|
mock_filter1.first.return_value = mock_operator_join
|
|
|
|
|
mock_query1.filter_by.return_value = mock_filter1
|
|
|
|
|
|
|
|
|
|
# Mock the second query for target member
|
|
|
|
|
mock_query2 = MagicMock()
|
|
|
|
|
mock_filter2 = MagicMock()
|
|
|
|
|
mock_filter2.first.return_value = mock_target_join
|
|
|
|
|
mock_query2.filter_by.return_value = mock_filter2
|
|
|
|
|
|
|
|
|
|
# Make the query method return different mocks for different calls
|
|
|
|
|
mock_db.session.query.side_effect = [mock_query1, mock_query2]
|
|
|
|
|
|
|
|
|
|
# Execute test
|
|
|
|
|
TenantService.update_member_role(mock_tenant, mock_member, "admin", mock_operator)
|
|
|
|
|
|
|
|
|
|
# Verify role was updated
|
|
|
|
|
assert mock_target_join.role == "admin"
|
|
|
|
|
self._assert_database_operations_called(mock_db)
|
|
|
|
|
|
|
|
|
|
# ==================== Permission Check Tests ====================
|
|
|
|
|
|
|
|
|
|
def test_check_member_permission_success(self, mock_db_dependencies):
|
|
|
|
|
"""Test successful member permission check."""
|
|
|
|
|
# Setup test data
|
|
|
|
|
mock_tenant = MagicMock()
|
|
|
|
|
mock_tenant.id = "tenant-456"
|
|
|
|
|
mock_operator = TestAccountAssociatedDataFactory.create_account_mock(account_id="operator-123")
|
|
|
|
|
mock_member = TestAccountAssociatedDataFactory.create_account_mock(account_id="member-789")
|
|
|
|
|
mock_operator_join = TestAccountAssociatedDataFactory.create_tenant_join_mock(
|
|
|
|
|
tenant_id="tenant-456", account_id="operator-123", role="owner"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# Setup smart database query mock
|
|
|
|
|
query_results = {("TenantAccountJoin", "tenant_id", "tenant-456"): mock_operator_join}
|
|
|
|
|
ServiceDbTestHelper.setup_db_query_filter_by_mock(mock_db_dependencies["db"], query_results)
|
|
|
|
|
|
|
|
|
|
# Execute test - should not raise exception
|
|
|
|
|
TenantService.check_member_permission(mock_tenant, mock_operator, mock_member, "add")
|
|
|
|
|
|
|
|
|
|
def test_check_member_permission_operate_self(self):
|
|
|
|
|
"""Test member permission check when operator tries to operate self."""
|
|
|
|
|
# Setup test data
|
|
|
|
|
mock_tenant = MagicMock()
|
|
|
|
|
mock_tenant.id = "tenant-456"
|
|
|
|
|
mock_operator = TestAccountAssociatedDataFactory.create_account_mock(account_id="operator-123")
|
|
|
|
|
|
|
|
|
|
# Execute test and verify exception
|
|
|
|
|
from services.errors.account import CannotOperateSelfError
|
|
|
|
|
|
|
|
|
|
self._assert_exception_raised(
|
|
|
|
|
CannotOperateSelfError,
|
|
|
|
|
TenantService.check_member_permission,
|
|
|
|
|
mock_tenant,
|
|
|
|
|
mock_operator,
|
|
|
|
|
mock_operator, # Same as operator
|
|
|
|
|
"add",
|
|
|
|
|
)
|
|
|
|
|
|