From c51cb5bc0a26a322fef6063cba3ea05aaf6d6359 Mon Sep 17 00:00:00 2001 From: baonudesifeizhai Date: Sun, 20 Jul 2025 04:08:23 -0400 Subject: [PATCH] fix: resolve plugin model providers lock contention issue - Move HTTP requests outside of locks to prevent blocking - Implement Redis distributed lock with fallback to thread-local lock - Add double-checked locking pattern for better performance - Fix logging to use logger.exception instead of logger.error - Improve concurrency by allowing multiple threads to fetch data simultaneously - Maintain cache consistency with Redis lock protecting cache writes Closes #22227 --- .../model_providers/model_provider_factory.py | 62 +++++++++++++++---- 1 file changed, 51 insertions(+), 11 deletions(-) diff --git a/api/core/model_runtime/model_providers/model_provider_factory.py b/api/core/model_runtime/model_providers/model_provider_factory.py index ad46f64ec3..e7bdc5dfd8 100644 --- a/api/core/model_runtime/model_providers/model_provider_factory.py +++ b/api/core/model_runtime/model_providers/model_provider_factory.py @@ -24,6 +24,7 @@ from core.plugin.entities.plugin import ModelProviderID from core.plugin.entities.plugin_daemon import PluginModelProviderEntity from core.plugin.impl.asset import PluginAssetManager from core.plugin.impl.model import PluginModelClient +from extensions.ext_redis import redis_client logger = logging.getLogger(__name__) @@ -83,22 +84,61 @@ class ModelProviderFactory: contexts.plugin_model_providers.set(None) contexts.plugin_model_providers_lock.set(Lock()) - with contexts.plugin_model_providers_lock.get(): - plugin_model_providers = contexts.plugin_model_providers.get() - if plugin_model_providers is not None: + # 1. First check (no lock) - fast path + plugin_model_providers = contexts.plugin_model_providers.get() + if plugin_model_providers is not None: + return plugin_model_providers + + # 2. SLOW OPERATION (NO LOCK HELD) + # Fetch the data outside of any lock. Other threads can do this at the same time. + # It's okay if multiple threads fetch simultaneously; we'll just use the first + # result that gets written. + try: + plugin_providers_data = self.plugin_model_manager.fetch_model_providers(self.tenant_id) + except Exception as e: + # Handle potential exceptions from the HTTP request + logger.exception(f"Failed to fetch plugin model providers for tenant {self.tenant_id}") + return [] + + # 3. Acquire lock ONLY to write to the shared cache + lock_key = f"plugin_model_providers_lock:{self.tenant_id}" + try: + with redis_client.lock(lock_key, timeout=10): # Shorter timeout is fine now + # 4. Double-check inside the lock + # Another thread might have finished its fetch and written to cache while we were fetching. + plugin_model_providers = contexts.plugin_model_providers.get() + if plugin_model_providers is not None: + return plugin_model_providers # Return the data from the other thread + + # 5. Write to cache (this is fast) + plugin_model_providers = [] + for provider in plugin_providers_data: + provider.declaration.provider = provider.plugin_id + "/" + provider.declaration.provider + plugin_model_providers.append(provider) + + contexts.plugin_model_providers.set(plugin_model_providers) return plugin_model_providers - plugin_model_providers = [] - contexts.plugin_model_providers.set(plugin_model_providers) + except Exception as e: + # Fallback to original implementation if Redis is not available + logger.warning(f"Redis lock failed, falling back to thread-local lock: {e}") - # Fetch plugin model providers - plugin_providers = self.plugin_model_manager.fetch_model_providers(self.tenant_id) + with contexts.plugin_model_providers_lock.get(): + plugin_model_providers = contexts.plugin_model_providers.get() + if plugin_model_providers is not None: + return plugin_model_providers - for provider in plugin_providers: - provider.declaration.provider = provider.plugin_id + "/" + provider.declaration.provider - plugin_model_providers.append(provider) + plugin_model_providers = [] + contexts.plugin_model_providers.set(plugin_model_providers) - return plugin_model_providers + # Fetch plugin model providers + plugin_providers = self.plugin_model_manager.fetch_model_providers(self.tenant_id) + + for provider in plugin_providers: + provider.declaration.provider = provider.plugin_id + "/" + provider.declaration.provider + plugin_model_providers.append(provider) + + return plugin_model_providers def get_provider_schema(self, provider: str) -> ProviderEntity: """