|
|
|
|
@ -7,13 +7,15 @@ from typing import Any, Optional
|
|
|
|
|
import pika
|
|
|
|
|
from pika.exceptions import AMQPChannelError, AMQPConnectionError, StreamLostError
|
|
|
|
|
|
|
|
|
|
from configs import dify_config
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
class RabbitMQClient:
|
|
|
|
|
"""
|
|
|
|
|
RabbitMQ客户端,实现自动重连和连接管理
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, queue_name: str):
|
|
|
|
|
self.queue_name = queue_name
|
|
|
|
|
self._connection_params = self._get_connection_params()
|
|
|
|
|
@ -26,12 +28,13 @@ class RabbitMQClient:
|
|
|
|
|
def _get_connection_params(self) -> pika.ConnectionParameters:
|
|
|
|
|
"""获取RabbitMQ连接参数"""
|
|
|
|
|
RABBITMQ_CONFIG = {
|
|
|
|
|
'host': '127.0.0.1',
|
|
|
|
|
'host': dify_config.MQ_HOST,
|
|
|
|
|
'port': 5672,
|
|
|
|
|
'username': 'apitable',
|
|
|
|
|
'password': 'apitable@com',
|
|
|
|
|
'password': 'apitable@com',
|
|
|
|
|
'virtual_host': '/'
|
|
|
|
|
}
|
|
|
|
|
print('MQ_HOST:' + RABBITMQ_CONFIG['host'])
|
|
|
|
|
print('virtual_host:' + RABBITMQ_CONFIG['virtual_host'])
|
|
|
|
|
return pika.ConnectionParameters(
|
|
|
|
|
host=RABBITMQ_CONFIG['host'],
|
|
|
|
|
@ -67,7 +70,7 @@ class RabbitMQClient:
|
|
|
|
|
def _ensure_connection(self) -> bool:
|
|
|
|
|
"""
|
|
|
|
|
确保RabbitMQ连接和通道可用
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
bool: 连接是否成功
|
|
|
|
|
"""
|
|
|
|
|
@ -84,11 +87,11 @@ class RabbitMQClient:
|
|
|
|
|
def publish_json(self, message: dict[str, Any], max_retries: int = 3) -> bool:
|
|
|
|
|
"""
|
|
|
|
|
发布任意JSON消息到队列
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
message: 要发送的消息字典
|
|
|
|
|
max_retries: 最大重试次数
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
bool: 发送是否成功
|
|
|
|
|
"""
|
|
|
|
|
@ -96,7 +99,7 @@ class RabbitMQClient:
|
|
|
|
|
while retries < max_retries and not self._stopping:
|
|
|
|
|
try:
|
|
|
|
|
channel = self._get_channel()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 发布消息并等待确认
|
|
|
|
|
channel.basic_publish(
|
|
|
|
|
exchange='',
|
|
|
|
|
@ -111,7 +114,7 @@ class RabbitMQClient:
|
|
|
|
|
logger.info(f"消息发送成功: {message}")
|
|
|
|
|
self._reconnect_delay = 1 # 重置重连延迟
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
except (AMQPConnectionError, AMQPChannelError, StreamLostError) as e:
|
|
|
|
|
# logger.exception(f"发送消息失败 (尝试 {retries + 1}/{max_retries}): {str(e)}")
|
|
|
|
|
# 清除当前线程的连接和通道
|
|
|
|
|
@ -125,21 +128,21 @@ class RabbitMQClient:
|
|
|
|
|
except Exception as e:
|
|
|
|
|
# logger.exception(f"发送消息时发生未知错误: {str(e)}")
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
def publish_message(self, task_id: str, action: str, status: Optional[str] = None,
|
|
|
|
|
extra: Optional[str] = None, reason: Optional[str] = None) -> bool:
|
|
|
|
|
"""
|
|
|
|
|
发布任务状态更新消息
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
task_id: 任务ID
|
|
|
|
|
action: 动作类型
|
|
|
|
|
status: 状态(FINISHED或EXCEPTION)
|
|
|
|
|
extra: 额外信息(字符串)
|
|
|
|
|
reason: 原因(异常时使用)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
bool: 是否发送成功
|
|
|
|
|
"""
|
|
|
|
|
@ -147,14 +150,14 @@ class RabbitMQClient:
|
|
|
|
|
"taskId": task_id,
|
|
|
|
|
"action": action
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if status:
|
|
|
|
|
message["status"] = status
|
|
|
|
|
if extra:
|
|
|
|
|
message["extra"] = extra # 直接设置字符串
|
|
|
|
|
if reason:
|
|
|
|
|
message["reason"] = reason[:500] if reason else None # 限制reason长度
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return self.publish_json(message)
|
|
|
|
|
|
|
|
|
|
def close(self) -> None:
|
|
|
|
|
@ -165,11 +168,11 @@ class RabbitMQClient:
|
|
|
|
|
self._local.channel.close()
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.info(f"关闭通道时发生错误: {str(e)}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if hasattr(self._local, 'connection') and self._local.connection and not self._local.connection.is_closed:
|
|
|
|
|
try:
|
|
|
|
|
self._local.connection.close()
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.info(f"关闭连接时发生错误: {str(e)}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
logger.info("RabbitMQ连接已关闭")
|
|
|
|
|
|