diff --git a/api/tests/unit_tests/libs/test_passport.py b/api/tests/unit_tests/libs/test_passport.py index fe61feaa99..f125611612 100644 --- a/api/tests/unit_tests/libs/test_passport.py +++ b/api/tests/unit_tests/libs/test_passport.py @@ -194,86 +194,3 @@ class TestPassportService: decoded = passport_service.verify(token) assert decoded == payload - # Real-world usage scenarios - def test_should_handle_web_user_authentication_scenario(self, passport_service): - """Test typical web user authentication flow""" - # User login - issue token - user_payload = { - "user_id": "user-123", - "app_code": "web-app", - "app_id": "app-456", - "iss": "dify", - "iat": int(time.time()), - } - token = passport_service.issue(user_payload) - - # Verify token on subsequent request - decoded = passport_service.verify(token) - - assert decoded["user_id"] == user_payload["user_id"] - assert decoded["app_code"] == user_payload["app_code"] - assert decoded["app_id"] == user_payload["app_id"] - assert "iat" in decoded - - def test_should_handle_api_bearer_token_scenario(self, passport_service): - """Test API authentication with Bearer token""" - # API client gets token - api_payload = { - "client_id": "api-client-789", - "scopes": ["read", "write"], - "iat": int(time.time()), - "exp": int(time.time()) + 3600, # 1 hour expiry - } - token = passport_service.issue(api_payload) - - # Verify token in API request - decoded = passport_service.verify(token) - - assert decoded["client_id"] == api_payload["client_id"] - assert decoded["scopes"] == api_payload["scopes"] - assert decoded["exp"] > time.time() # Not expired - - def test_should_handle_token_refresh_scenario(self, passport_service): - """Test token refresh scenario""" - # Original token - original_payload = { - "user_id": "123", - "session_id": "session-abc", - "iat": int(time.time()), - } - original_token = passport_service.issue(original_payload) - decoded = passport_service.verify(original_token) - - # Issue new token with updated timestamp - refresh_payload = { - **decoded, - "iat": int(time.time()), - "refreshed": True, - } - refreshed_token = passport_service.issue(refresh_payload) - - # Verify refreshed token - refreshed_decoded = passport_service.verify(refreshed_token) - - assert refreshed_decoded["user_id"] == original_payload["user_id"] - assert refreshed_decoded["session_id"] == original_payload["session_id"] - assert refreshed_decoded["refreshed"] is True - assert refreshed_decoded["iat"] >= decoded["iat"] - - # Concurrent access test - def test_should_handle_concurrent_token_operations(self, passport_service): - """Test concurrent token issue and verify operations""" - import concurrent.futures - - def issue_and_verify(index): - payload = {"thread_id": index, "data": f"thread-{index}"} - token = passport_service.issue(payload) - decoded = passport_service.verify(token) - return decoded["thread_id"] == index - - with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: - futures = [executor.submit(issue_and_verify, i) for i in range(100)] - results = [future.result() for future in concurrent.futures.as_completed(futures)] - - assert all(results) - assert len(results) == 100