diff --git a/api/.env.example b/api/.env.example index 7154272376..4620087667 100644 --- a/api/.env.example +++ b/api/.env.example @@ -500,4 +500,5 @@ QUEUE_MONITOR_ALERT_EMAILS= QUEUE_MONITOR_INTERVAL=30 -ZZ_DIFY_STARTED_URL=http://127.0.0.1:8077/ps/api/u/difyStarted +ZZ_DIFY_STARTED_URL=http://192.168.1.117:8077/ps/api/u/difyStarted +MQ_HOST=192.168.1.4 diff --git a/api/configs/middleware/__init__.py b/api/configs/middleware/__init__.py index 2dcf1710b0..5a69ed3018 100644 --- a/api/configs/middleware/__init__.py +++ b/api/configs/middleware/__init__.py @@ -25,6 +25,7 @@ from .vdb.elasticsearch_config import ElasticsearchConfig from .vdb.huawei_cloud_config import HuaweiCloudConfig from .vdb.lindorm_config import LindormConfig from .vdb.milvus_config import MilvusConfig +from .vdb.mq_config import MqInfo from .vdb.myscale_config import MyScaleConfig from .vdb.oceanbase_config import OceanBaseVectorConfig from .vdb.opengauss_config import OpenGaussConfig @@ -301,6 +302,7 @@ class MiddlewareConfig( HuaweiCloudConfig, MilvusConfig, MyScaleConfig, + MqInfo, OpenSearchConfig, OracleConfig, PGVectorConfig, diff --git a/api/configs/middleware/vdb/mq_config.py b/api/configs/middleware/vdb/mq_config.py index e69de29bb2..3e94bf5d18 100644 --- a/api/configs/middleware/vdb/mq_config.py +++ b/api/configs/middleware/vdb/mq_config.py @@ -0,0 +1,14 @@ +from pydantic import Field +from pydantic_settings import BaseSettings + + +class MqInfo(BaseSettings): + """ + Packaging build information + """ + + MQ_HOST: str = Field( + description="Rabbit Mq host", + default="127.0.0.1", + ) + diff --git a/api/core/workflow/nodes/mq/rabbitmq_client.py b/api/core/workflow/nodes/mq/rabbitmq_client.py index 0414c329f3..61fba62c87 100644 --- a/api/core/workflow/nodes/mq/rabbitmq_client.py +++ b/api/core/workflow/nodes/mq/rabbitmq_client.py @@ -7,13 +7,15 @@ from typing import Any, Optional import pika from pika.exceptions import AMQPChannelError, AMQPConnectionError, StreamLostError +from configs import dify_config + logger = logging.getLogger(__name__) class RabbitMQClient: """ RabbitMQ客户端,实现自动重连和连接管理 """ - + def __init__(self, queue_name: str): self.queue_name = queue_name self._connection_params = self._get_connection_params() @@ -26,12 +28,13 @@ class RabbitMQClient: def _get_connection_params(self) -> pika.ConnectionParameters: """获取RabbitMQ连接参数""" RABBITMQ_CONFIG = { - 'host': '127.0.0.1', + 'host': dify_config.MQ_HOST, 'port': 5672, 'username': 'apitable', - 'password': 'apitable@com', + 'password': 'apitable@com', 'virtual_host': '/' } + print('MQ_HOST:' + RABBITMQ_CONFIG['host']) print('virtual_host:' + RABBITMQ_CONFIG['virtual_host']) return pika.ConnectionParameters( host=RABBITMQ_CONFIG['host'], @@ -67,7 +70,7 @@ class RabbitMQClient: def _ensure_connection(self) -> bool: """ 确保RabbitMQ连接和通道可用 - + Returns: bool: 连接是否成功 """ @@ -84,11 +87,11 @@ class RabbitMQClient: def publish_json(self, message: dict[str, Any], max_retries: int = 3) -> bool: """ 发布任意JSON消息到队列 - + Args: message: 要发送的消息字典 max_retries: 最大重试次数 - + Returns: bool: 发送是否成功 """ @@ -96,7 +99,7 @@ class RabbitMQClient: while retries < max_retries and not self._stopping: try: channel = self._get_channel() - + # 发布消息并等待确认 channel.basic_publish( exchange='', @@ -111,7 +114,7 @@ class RabbitMQClient: logger.info(f"消息发送成功: {message}") self._reconnect_delay = 1 # 重置重连延迟 return True - + except (AMQPConnectionError, AMQPChannelError, StreamLostError) as e: # logger.exception(f"发送消息失败 (尝试 {retries + 1}/{max_retries}): {str(e)}") # 清除当前线程的连接和通道 @@ -125,21 +128,21 @@ class RabbitMQClient: except Exception as e: # logger.exception(f"发送消息时发生未知错误: {str(e)}") return False - + return False def publish_message(self, task_id: str, action: str, status: Optional[str] = None, extra: Optional[str] = None, reason: Optional[str] = None) -> bool: """ 发布任务状态更新消息 - + Args: task_id: 任务ID action: 动作类型 status: 状态(FINISHED或EXCEPTION) extra: 额外信息(字符串) reason: 原因(异常时使用) - + Returns: bool: 是否发送成功 """ @@ -147,14 +150,14 @@ class RabbitMQClient: "taskId": task_id, "action": action } - + if status: message["status"] = status if extra: message["extra"] = extra # 直接设置字符串 if reason: message["reason"] = reason[:500] if reason else None # 限制reason长度 - + return self.publish_json(message) def close(self) -> None: @@ -165,11 +168,11 @@ class RabbitMQClient: self._local.channel.close() except Exception as e: logger.info(f"关闭通道时发生错误: {str(e)}") - + if hasattr(self._local, 'connection') and self._local.connection and not self._local.connection.is_closed: try: self._local.connection.close() except Exception as e: logger.info(f"关闭连接时发生错误: {str(e)}") - + logger.info("RabbitMQ连接已关闭")