pull/22671/merge
baonudesifeizhai 7 months ago committed by GitHub
commit 903c5f077b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -24,6 +24,7 @@ from core.plugin.entities.plugin import ModelProviderID
from core.plugin.entities.plugin_daemon import PluginModelProviderEntity from core.plugin.entities.plugin_daemon import PluginModelProviderEntity
from core.plugin.impl.asset import PluginAssetManager from core.plugin.impl.asset import PluginAssetManager
from core.plugin.impl.model import PluginModelClient from core.plugin.impl.model import PluginModelClient
from extensions.ext_redis import redis_client
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -83,22 +84,61 @@ class ModelProviderFactory:
contexts.plugin_model_providers.set(None) contexts.plugin_model_providers.set(None)
contexts.plugin_model_providers_lock.set(Lock()) contexts.plugin_model_providers_lock.set(Lock())
with contexts.plugin_model_providers_lock.get(): # 1. First check (no lock) - fast path
plugin_model_providers = contexts.plugin_model_providers.get() plugin_model_providers = contexts.plugin_model_providers.get()
if plugin_model_providers is not None: if plugin_model_providers is not None:
return plugin_model_providers
# 2. SLOW OPERATION (NO LOCK HELD)
# Fetch the data outside of any lock. Other threads can do this at the same time.
# It's okay if multiple threads fetch simultaneously; we'll just use the first
# result that gets written.
try:
plugin_providers_data = self.plugin_model_manager.fetch_model_providers(self.tenant_id)
except Exception as e:
# Handle potential exceptions from the HTTP request
logger.exception(f"Failed to fetch plugin model providers for tenant {self.tenant_id}")
return []
# 3. Acquire lock ONLY to write to the shared cache
lock_key = f"plugin_model_providers_lock:{self.tenant_id}"
try:
with redis_client.lock(lock_key, timeout=10): # Shorter timeout is fine now
# 4. Double-check inside the lock
# Another thread might have finished its fetch and written to cache while we were fetching.
plugin_model_providers = contexts.plugin_model_providers.get()
if plugin_model_providers is not None:
return plugin_model_providers # Return the data from the other thread
# 5. Write to cache (this is fast)
plugin_model_providers = []
for provider in plugin_providers_data:
provider.declaration.provider = provider.plugin_id + "/" + provider.declaration.provider
plugin_model_providers.append(provider)
contexts.plugin_model_providers.set(plugin_model_providers)
return plugin_model_providers return plugin_model_providers
plugin_model_providers = [] except Exception as e:
contexts.plugin_model_providers.set(plugin_model_providers) # Fallback to original implementation if Redis is not available
logger.warning(f"Redis lock failed, falling back to thread-local lock: {e}")
# Fetch plugin model providers with contexts.plugin_model_providers_lock.get():
plugin_providers = self.plugin_model_manager.fetch_model_providers(self.tenant_id) plugin_model_providers = contexts.plugin_model_providers.get()
if plugin_model_providers is not None:
return plugin_model_providers
for provider in plugin_providers: plugin_model_providers = []
provider.declaration.provider = provider.plugin_id + "/" + provider.declaration.provider contexts.plugin_model_providers.set(plugin_model_providers)
plugin_model_providers.append(provider)
return plugin_model_providers # Fetch plugin model providers
plugin_providers = self.plugin_model_manager.fetch_model_providers(self.tenant_id)
for provider in plugin_providers:
provider.declaration.provider = provider.plugin_id + "/" + provider.declaration.provider
plugin_model_providers.append(provider)
return plugin_model_providers
def get_provider_schema(self, provider: str) -> ProviderEntity: def get_provider_schema(self, provider: str) -> ProviderEntity:
""" """

Loading…
Cancel
Save