|
|
|
|
@ -1,4 +1,5 @@
|
|
|
|
|
from typing import Any, Union
|
|
|
|
|
import functools
|
|
|
|
|
from typing import Any, Callable, Generic, TypeVar, Union
|
|
|
|
|
|
|
|
|
|
import redis
|
|
|
|
|
from redis.cluster import ClusterNode, RedisCluster
|
|
|
|
|
@ -8,6 +9,44 @@ from redis.sentinel import Sentinel
|
|
|
|
|
from configs import dify_config
|
|
|
|
|
from dify_app import DifyApp
|
|
|
|
|
|
|
|
|
|
T = TypeVar("T")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class KeyPrefixMethodProxy(Generic[T]):
|
|
|
|
|
"""
|
|
|
|
|
KeyPrefixMethodProxy is a generic class used to proxy method calls.
|
|
|
|
|
It can preprocess parameters before calling a method, such as prefixing specific keys.
|
|
|
|
|
Key features include:
|
|
|
|
|
If a method's argument contains'key', prefix it.
|
|
|
|
|
If the method's positional parameter contains a key of type string, prefix it.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def __init__(self, client: T, prefix: str):
|
|
|
|
|
self._client = client
|
|
|
|
|
self._prefix = prefix
|
|
|
|
|
|
|
|
|
|
def __getattr__(self, item):
|
|
|
|
|
attr = getattr(self._client, item)
|
|
|
|
|
if callable(attr):
|
|
|
|
|
return self._wrap_method(attr)
|
|
|
|
|
return attr
|
|
|
|
|
|
|
|
|
|
def _wrap_method(self, method: Callable) -> Callable:
|
|
|
|
|
@functools.wraps(method)
|
|
|
|
|
def wrapper(*args, **kwargs):
|
|
|
|
|
if "key" in kwargs:
|
|
|
|
|
if isinstance(kwargs["key"], str):
|
|
|
|
|
kwargs["key"] = self._add_prefix(kwargs["key"])
|
|
|
|
|
elif args:
|
|
|
|
|
if args and isinstance(args[0], str):
|
|
|
|
|
args = (self._add_prefix(args[0]),) + args[1:]
|
|
|
|
|
return method(*args, **kwargs)
|
|
|
|
|
|
|
|
|
|
return wrapper
|
|
|
|
|
|
|
|
|
|
def _add_prefix(self, key: str) -> str:
|
|
|
|
|
return f"{self._prefix}:{key}" if self._prefix else key
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class RedisClientWrapper:
|
|
|
|
|
"""
|
|
|
|
|
@ -32,14 +71,19 @@ class RedisClientWrapper:
|
|
|
|
|
|
|
|
|
|
def __init__(self):
|
|
|
|
|
self._client = None
|
|
|
|
|
self._prefix = None
|
|
|
|
|
|
|
|
|
|
def initialize(self, client):
|
|
|
|
|
def initialize(self, client, prefix=None):
|
|
|
|
|
if self._client is None:
|
|
|
|
|
self._client = client
|
|
|
|
|
if prefix is not None:
|
|
|
|
|
self._prefix = prefix
|
|
|
|
|
|
|
|
|
|
def __getattr__(self, item):
|
|
|
|
|
if self._client is None:
|
|
|
|
|
raise RuntimeError("Redis client is not initialized. Call init_app first.")
|
|
|
|
|
if self._prefix is not None:
|
|
|
|
|
return getattr(KeyPrefixMethodProxy(self._client, self._prefix), item)
|
|
|
|
|
return getattr(self._client, item)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -75,7 +119,7 @@ def init_app(app: DifyApp):
|
|
|
|
|
},
|
|
|
|
|
)
|
|
|
|
|
master = sentinel.master_for(dify_config.REDIS_SENTINEL_SERVICE_NAME, **redis_params)
|
|
|
|
|
redis_client.initialize(master)
|
|
|
|
|
redis_client.initialize(master, prefix=dify_config.REDIS_KEY_PREFIX)
|
|
|
|
|
elif dify_config.REDIS_USE_CLUSTERS:
|
|
|
|
|
assert dify_config.REDIS_CLUSTERS is not None, "REDIS_CLUSTERS must be set when REDIS_USE_CLUSTERS is True"
|
|
|
|
|
nodes = [
|
|
|
|
|
@ -83,7 +127,10 @@ def init_app(app: DifyApp):
|
|
|
|
|
for node in dify_config.REDIS_CLUSTERS.split(",")
|
|
|
|
|
]
|
|
|
|
|
# FIXME: mypy error here, try to figure out how to fix it
|
|
|
|
|
redis_client.initialize(RedisCluster(startup_nodes=nodes, password=dify_config.REDIS_CLUSTERS_PASSWORD)) # type: ignore
|
|
|
|
|
redis_client.initialize(
|
|
|
|
|
RedisCluster(startup_nodes=nodes, password=dify_config.REDIS_CLUSTERS_PASSWORD), # type: ignore
|
|
|
|
|
prefix=dify_config.REDIS_KEY_PREFIX,
|
|
|
|
|
)
|
|
|
|
|
else:
|
|
|
|
|
redis_params.update(
|
|
|
|
|
{
|
|
|
|
|
@ -93,6 +140,6 @@ def init_app(app: DifyApp):
|
|
|
|
|
}
|
|
|
|
|
)
|
|
|
|
|
pool = redis.ConnectionPool(**redis_params)
|
|
|
|
|
redis_client.initialize(redis.Redis(connection_pool=pool))
|
|
|
|
|
redis_client.initialize(redis.Redis(connection_pool=pool), prefix=dify_config.REDIS_KEY_PREFIX)
|
|
|
|
|
|
|
|
|
|
app.extensions["redis"] = redis_client
|
|
|
|
|
|