feat: When using redis cluster for caching, add global_prefix to all keys cached by redis

pull/13652/head
chenjh3 1 year ago
parent 6529240da6
commit c9d7e3b5cb

@ -36,6 +36,7 @@ REDIS_USERNAME=
REDIS_PASSWORD=difyai123456 REDIS_PASSWORD=difyai123456
REDIS_USE_SSL=false REDIS_USE_SSL=false
REDIS_DB=0 REDIS_DB=0
REDIS_KEY_PREFIX=
# redis Sentinel configuration. # redis Sentinel configuration.
REDIS_USE_SENTINEL=false REDIS_USE_SENTINEL=false

@ -34,6 +34,11 @@ class RedisConfig(BaseSettings):
default=0, default=0,
) )
REDIS_KEY_PREFIX: Optional[str] = Field(
description="Redis global string key prefix (if required)",
default=None,
)
REDIS_USE_SSL: bool = Field( REDIS_USE_SSL: bool = Field(
description="Enable SSL/TLS for the Redis connection", description="Enable SSL/TLS for the Redis connection",
default=False, default=False,

@ -2,6 +2,7 @@ import logging
import time import time
import uuid import uuid
from collections.abc import Generator, Mapping from collections.abc import Generator, Mapping
from configs import dify_config
from datetime import timedelta from datetime import timedelta
from typing import Any, Optional, Union from typing import Any, Optional, Union
@ -12,8 +13,9 @@ logger = logging.getLogger(__name__)
class RateLimit: class RateLimit:
_MAX_ACTIVE_REQUESTS_KEY = "dify:rate_limit:{}:max_active_requests" _KEY_PREFIX = dify_config.REDIS_KEY_PREFIX if dify_config.REDIS_KEY_PREFIX is not None else "dify"
_ACTIVE_REQUESTS_KEY = "dify:rate_limit:{}:active_requests" _MAX_ACTIVE_REQUESTS_KEY = _KEY_PREFIX + ":rate_limit:{}:max_active_requests"
_ACTIVE_REQUESTS_KEY = _KEY_PREFIX + ":rate_limit:{}:active_requests"
_UNLIMITED_REQUEST_ID = "unlimited_request_id" _UNLIMITED_REQUEST_ID = "unlimited_request_id"
_REQUEST_MAX_ALIVE_TIME = 10 * 60 # 10 minutes _REQUEST_MAX_ALIVE_TIME = 10 * 60 # 10 minutes
_ACTIVE_REQUESTS_COUNT_FLUSH_INTERVAL = 5 * 60 # recalculate request_count from request_detail every 5 minutes _ACTIVE_REQUESTS_COUNT_FLUSH_INTERVAL = 5 * 60 # recalculate request_count from request_detail every 5 minutes

@ -1,4 +1,5 @@
from typing import Any, Union from typing import Any, Union, Callable, TypeVar, Generic
import functools
import redis import redis
from redis.cluster import ClusterNode, RedisCluster from redis.cluster import ClusterNode, RedisCluster
@ -8,6 +9,42 @@ from redis.sentinel import Sentinel
from configs import dify_config from configs import dify_config
from dify_app import DifyApp from dify_app import DifyApp
T = TypeVar('T')
class KeyPrefixMethodProxy(Generic[T]):
"""
KeyPrefixMethodProxy is a generic class used to proxy method calls.
It can preprocess parameters before calling a method, such as prefixing specific keys.
Key features include:
If a method's argument contains'key', prefix it.
If the method's positional parameter contains a key of type string, prefix it.
"""
def __init__(self, client: T, prefix: str):
self._client = client
self._prefix = prefix
def __getattr__(self, item):
attr = getattr(self._client, item)
if callable(attr):
return self._wrap_method(attr)
return attr
def _wrap_method(self, method: Callable) -> Callable:
@functools.wraps(method)
def wrapper(*args, **kwargs):
if 'key' in kwargs:
if isinstance(kwargs['key'], str):
kwargs['key'] = self._add_prefix(kwargs['key'])
elif args:
if args and isinstance(args[0], str):
args = (self._add_prefix(args[0]),) + args[1:]
return method(*args, **kwargs)
return wrapper
def _add_prefix(self, key: str) -> str:
return f"{self._prefix}:{key}" if self._prefix else key
class RedisClientWrapper: class RedisClientWrapper:
""" """
@ -32,14 +69,19 @@ class RedisClientWrapper:
def __init__(self): def __init__(self):
self._client = None self._client = None
self._prefix = None
def initialize(self, client): def initialize(self, client, prefix=None):
if self._client is None: if self._client is None:
self._client = client self._client = client
if prefix is not None:
self._prefix = prefix
def __getattr__(self, item): def __getattr__(self, item):
if self._client is None: if self._client is None:
raise RuntimeError("Redis client is not initialized. Call init_app first.") raise RuntimeError("Redis client is not initialized. Call init_app first.")
if self._prefix is not None:
return getattr(KeyPrefixMethodProxy(self._client, self._prefix), item)
return getattr(self._client, item) return getattr(self._client, item)
@ -75,7 +117,7 @@ def init_app(app: DifyApp):
}, },
) )
master = sentinel.master_for(dify_config.REDIS_SENTINEL_SERVICE_NAME, **redis_params) master = sentinel.master_for(dify_config.REDIS_SENTINEL_SERVICE_NAME, **redis_params)
redis_client.initialize(master) redis_client.initialize(master, prefix=dify_config.REDIS_KEY_PREFIX)
elif dify_config.REDIS_USE_CLUSTERS: elif dify_config.REDIS_USE_CLUSTERS:
assert dify_config.REDIS_CLUSTERS is not None, "REDIS_CLUSTERS must be set when REDIS_USE_CLUSTERS is True" assert dify_config.REDIS_CLUSTERS is not None, "REDIS_CLUSTERS must be set when REDIS_USE_CLUSTERS is True"
nodes = [ nodes = [
@ -83,7 +125,7 @@ def init_app(app: DifyApp):
for node in dify_config.REDIS_CLUSTERS.split(",") for node in dify_config.REDIS_CLUSTERS.split(",")
] ]
# FIXME: mypy error here, try to figure out how to fix it # FIXME: mypy error here, try to figure out how to fix it
redis_client.initialize(RedisCluster(startup_nodes=nodes, password=dify_config.REDIS_CLUSTERS_PASSWORD)) # type: ignore redis_client.initialize(RedisCluster(startup_nodes=nodes, password=dify_config.REDIS_CLUSTERS_PASSWORD), prefix=dify_config.REDIS_KEY_PREFIX) # type: ignore
else: else:
redis_params.update( redis_params.update(
{ {
@ -93,6 +135,6 @@ def init_app(app: DifyApp):
} }
) )
pool = redis.ConnectionPool(**redis_params) pool = redis.ConnectionPool(**redis_params)
redis_client.initialize(redis.Redis(connection_pool=pool)) redis_client.initialize(redis.Redis(connection_pool=pool), prefix=dify_config.REDIS_KEY_PREFIX)
app.extensions["redis"] = redis_client app.extensions["redis"] = redis_client

Loading…
Cancel
Save