diff --git a/api/core/model_runtime/model_providers/model_provider_factory.py b/api/core/model_runtime/model_providers/model_provider_factory.py
index ad46f64ec3..e7bdc5dfd8 100644
--- a/api/core/model_runtime/model_providers/model_provider_factory.py
+++ b/api/core/model_runtime/model_providers/model_provider_factory.py
@@ -24,6 +24,7 @@ from core.plugin.entities.plugin import ModelProviderID
 from core.plugin.entities.plugin_daemon import PluginModelProviderEntity
 from core.plugin.impl.asset import PluginAssetManager
 from core.plugin.impl.model import PluginModelClient
+from extensions.ext_redis import redis_client
 
 logger = logging.getLogger(__name__)
 
@@ -83,22 +84,61 @@ class ModelProviderFactory:
             contexts.plugin_model_providers.set(None)
             contexts.plugin_model_providers_lock.set(Lock())
 
-        with contexts.plugin_model_providers_lock.get():
-            plugin_model_providers = contexts.plugin_model_providers.get()
-            if plugin_model_providers is not None:
+        # 1. First check (no lock) - fast path
+        plugin_model_providers = contexts.plugin_model_providers.get()
+        if plugin_model_providers is not None:
+            return plugin_model_providers
+
+        # 2. SLOW OPERATION (NO LOCK HELD)
+        # Fetch the data outside of any lock. Other threads can do this at the same time.
+        # It's okay if multiple threads fetch simultaneously; we'll just use the first
+        # result that gets written.
+        try:
+            plugin_providers_data = self.plugin_model_manager.fetch_model_providers(self.tenant_id)
+        except Exception as e:
+            # Handle potential exceptions from the HTTP request
+            logger.exception(f"Failed to fetch plugin model providers for tenant {self.tenant_id}")
+            return []
+
+        # 3. Acquire lock ONLY to write to the shared cache
+        lock_key = f"plugin_model_providers_lock:{self.tenant_id}"
+        try:
+            with redis_client.lock(lock_key, timeout=10):  # Shorter timeout is fine now
+                # 4. Double-check inside the lock
+                # Another thread might have finished its fetch and written to cache while we were fetching.
+                plugin_model_providers = contexts.plugin_model_providers.get()
+                if plugin_model_providers is not None:
+                    return plugin_model_providers  # Return the data from the other thread
+
+                # 5. Write to cache (this is fast)
+                plugin_model_providers = []
+                for provider in plugin_providers_data:
+                    provider.declaration.provider = provider.plugin_id + "/" + provider.declaration.provider
+                    plugin_model_providers.append(provider)
+
+                contexts.plugin_model_providers.set(plugin_model_providers)
                 return plugin_model_providers
-
-            plugin_model_providers = []
-            contexts.plugin_model_providers.set(plugin_model_providers)
+        except Exception as e:
+            # Fallback to original implementation if Redis is not available
+            logger.warning(f"Redis lock failed, falling back to thread-local lock: {e}")
 
-            # Fetch plugin model providers
-            plugin_providers = self.plugin_model_manager.fetch_model_providers(self.tenant_id)
+            with contexts.plugin_model_providers_lock.get():
+                plugin_model_providers = contexts.plugin_model_providers.get()
+                if plugin_model_providers is not None:
+                    return plugin_model_providers
 
-            for provider in plugin_providers:
-                provider.declaration.provider = provider.plugin_id + "/" + provider.declaration.provider
-                plugin_model_providers.append(provider)
+                plugin_model_providers = []
+                contexts.plugin_model_providers.set(plugin_model_providers)
 
-            return plugin_model_providers
+                # Fetch plugin model providers
+                plugin_providers = self.plugin_model_manager.fetch_model_providers(self.tenant_id)
+
+                for provider in plugin_providers:
+                    provider.declaration.provider = provider.plugin_id + "/" + provider.declaration.provider
+                    plugin_model_providers.append(provider)
+
+                return plugin_model_providers
 
     def get_provider_schema(self, provider: str) -> ProviderEntity:
         """
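
For readers who want the pattern in isolation, the sketch below shows the same "fetch outside the lock, lock only to publish, double-check inside the lock" flow the patch applies. It is a minimal illustration, not Dify code: `fetch_providers`, `CACHE`, and `get_providers` are hypothetical stand-ins, and the `redis.Redis()` client assumes a reachable Redis instance.

```python
import contextvars
import logging

import redis

logger = logging.getLogger(__name__)

# Hypothetical per-context cache; the real patch uses contexts.plugin_model_providers.
CACHE: contextvars.ContextVar[list[str] | None] = contextvars.ContextVar("providers_cache", default=None)
redis_client = redis.Redis()  # assumption: a reachable Redis instance


def fetch_providers(tenant_id: str) -> list[str]:
    """Stand-in for the slow HTTP call to the plugin daemon."""
    return [f"{tenant_id}/provider-a", f"{tenant_id}/provider-b"]


def get_providers(tenant_id: str) -> list[str]:
    # 1. Fast path: return immediately if the cache is already populated.
    cached = CACHE.get()
    if cached is not None:
        return cached

    # 2. Slow fetch with no lock held; concurrent callers may duplicate the
    #    work, but none of them blocks the others during the network call.
    data = fetch_providers(tenant_id)

    try:
        # 3. Take the distributed lock only around the cheap publish step.
        with redis_client.lock(f"providers_lock:{tenant_id}", timeout=10):
            # 4. Double-check: another caller may have published first.
            cached = CACHE.get()
            if cached is not None:
                return cached
            # 5. Publish our result.
            CACHE.set(data)
            return data
    except redis.RedisError as e:
        # Fallback: if Redis is unavailable, still return the fetched data.
        logger.warning("Redis lock failed, publishing without it: %s", e)
        CACHE.set(data)
        return data
```

The trade-off, as the patch's own comments note, is that concurrent callers may each perform the slow fetch once, but none of them serializes behind a lock held across the network call, which is what the original thread-local-lock implementation did.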