|
|
|
|
@ -1,5 +1,7 @@
|
|
|
|
|
import socket
|
|
|
|
|
|
|
|
|
|
import redis
|
|
|
|
|
from redis.connection import Connection, SSLConnection
|
|
|
|
|
from redis.connection import SSLConnection
|
|
|
|
|
from redis.sentinel import Sentinel
|
|
|
|
|
|
|
|
|
|
from configs import dify_config
|
|
|
|
|
@ -42,9 +44,56 @@ class RedisClientWrapper(redis.Redis):
|
|
|
|
|
redis_client = RedisClientWrapper()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class GeventSafeConnection(redis.Connection):
|
|
|
|
|
socket_socket_class: type[socket.socket] | None = None
|
|
|
|
|
|
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
|
|
|
super().__init__(*args, **kwargs)
|
|
|
|
|
|
|
|
|
|
def _connect(self):
|
|
|
|
|
"Create a TCP socket connection"
|
|
|
|
|
# we want to mimic what socket.create_connection does to support
|
|
|
|
|
# ipv4/ipv6, but we want to set options prior to calling
|
|
|
|
|
# socket.connect()
|
|
|
|
|
err = None
|
|
|
|
|
for res in socket.getaddrinfo(self.host, self.port, self.socket_type, socket.SOCK_STREAM):
|
|
|
|
|
family, socktype, proto, canonname, socket_address = res
|
|
|
|
|
sock = None
|
|
|
|
|
try:
|
|
|
|
|
socket_socket_class = self.socket_socket_class or socket.socket
|
|
|
|
|
sock = socket_socket_class(family, socktype, proto)
|
|
|
|
|
# TCP_NODELAY
|
|
|
|
|
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
|
|
|
|
|
|
|
|
|
# TCP_KEEPALIVE
|
|
|
|
|
if self.socket_keepalive:
|
|
|
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
|
|
|
|
|
for k, v in self.socket_keepalive_options.items():
|
|
|
|
|
sock.setsockopt(socket.IPPROTO_TCP, k, v)
|
|
|
|
|
|
|
|
|
|
# set the socket_connect_timeout before we connect
|
|
|
|
|
sock.settimeout(self.socket_connect_timeout)
|
|
|
|
|
|
|
|
|
|
# connect
|
|
|
|
|
sock.connect(socket_address)
|
|
|
|
|
|
|
|
|
|
# set the socket_timeout now that we're connected
|
|
|
|
|
sock.settimeout(self.socket_timeout)
|
|
|
|
|
return sock
|
|
|
|
|
|
|
|
|
|
except OSError as _:
|
|
|
|
|
err = _
|
|
|
|
|
if sock is not None:
|
|
|
|
|
sock.close()
|
|
|
|
|
|
|
|
|
|
if err is not None:
|
|
|
|
|
raise err
|
|
|
|
|
raise OSError("socket.getaddrinfo returned an empty list")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def init_app(app):
|
|
|
|
|
global redis_client
|
|
|
|
|
connection_class = Connection
|
|
|
|
|
connection_class = GeventSafeConnection
|
|
|
|
|
if dify_config.REDIS_USE_SSL:
|
|
|
|
|
connection_class = SSLConnection
|
|
|
|
|
|
|
|
|
|
|