From 04ccfa06fe06d31ff40445b26372f698e15aed7f Mon Sep 17 00:00:00 2001 From: HuangHuiKang Date: Fri, 6 Feb 2026 16:55:12 +0800 Subject: [PATCH] =?UTF-8?q?fix:=E4=BF=AE=E6=94=B9=E9=A1=B9=E7=9B=AE?= =?UTF-8?q?=E9=87=8D=E5=90=AF=E5=90=8E=E7=BB=A7=E7=BB=AD=E8=AE=A2=E9=98=85?= =?UTF-8?q?mqtt=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/service/device/DeviceServiceImpl.java | 91 ++++++++++++++++--- 1 file changed, 79 insertions(+), 12 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/DeviceServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/DeviceServiceImpl.java index 3e6111e71..ec1243a14 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/DeviceServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/DeviceServiceImpl.java @@ -19,6 +19,7 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.devicemodelattribute.DeviceMod import cn.iocoder.yudao.module.iot.dal.dataobject.devicemodelrules.DeviceModelRulesDO; import cn.iocoder.yudao.module.iot.dal.dataobject.deviceoperationrecord.DeviceOperationRecordDO; import cn.iocoder.yudao.module.iot.dal.dataobject.devicepointrules.DevicePointRulesDO; +import cn.iocoder.yudao.module.iot.dal.dataobject.gateway.GatewayDO; import cn.iocoder.yudao.module.iot.dal.dataobject.mqttdatarecord.MqttDataRecordDO; import cn.iocoder.yudao.module.iot.dal.dataobject.devicecontactmodel.DeviceContactModelDO; import cn.iocoder.yudao.module.iot.dal.mysql.device.DeviceMapper; @@ -29,13 +30,16 @@ import cn.iocoder.yudao.module.iot.dal.mysql.devicemodelattribute.DeviceModelAtt import cn.iocoder.yudao.module.iot.dal.mysql.devicemodelrules.DeviceModelRulesMapper; import cn.iocoder.yudao.module.iot.dal.mysql.deviceoperationrecord.DeviceOperationRecordMapper; import cn.iocoder.yudao.module.iot.dal.mysql.devicepointrules.DevicePointRulesMapper; +import cn.iocoder.yudao.module.iot.dal.mysql.gateway.GatewayMapper; import cn.iocoder.yudao.module.iot.dal.mysql.mqttdatarecord.MqttDataRecordMapper; import cn.iocoder.yudao.module.iot.dal.dataobject.device.DeviceAttributeDO; import cn.iocoder.yudao.module.iot.dal.mysql.device.DeviceAttributeMapper; import cn.iocoder.yudao.module.iot.framework.mqtt.consumer.IMqttservice; +import cn.iocoder.yudao.module.iot.service.gateway.GatewayService; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -114,6 +118,13 @@ public class DeviceServiceImpl implements DeviceService { @Resource private IMqttservice mqttService; +// @Resource +// private GatewayService gatewayService; + + @Resource + private GatewayMapper gatewayMapper; + + @Override @Transactional(rollbackFor = Exception.class) @@ -1052,7 +1063,7 @@ public class DeviceServiceImpl implements DeviceService { } //TODO 待优化 - if (("MQTT".equals(deviceDO.getProtocol()))) { + if (!"MQTT".equals(deviceDO.getProtocol())) { throw exception(DEVICE_MQTT_TOPIC_EXIST); } @@ -1073,31 +1084,87 @@ public class DeviceServiceImpl implements DeviceService { return deviceMapper.selectOne(Wrappers.lambdaQuery().eq(DeviceDO::getTopic,topic)); } - private void executeEnableUpdate(DeviceDO deviceDO, Boolean enabled) throws MqttException { + private void executeEnableUpdate(DeviceDO deviceDO, Boolean enabled) { + try { - // 更新数据库状态 + // 1. 更新设备启用状态 deviceDO.setIsEnable(enabled); deviceMapper.updateById(deviceDO); - // MQTT 操作 + String topic = deviceDO.getTopic(); + + // 2. 启用设备 if (enabled) { + + // 2.1 MQTT订阅 try { - mqttService.subscribeTopic(deviceDO.getTopic()); + mqttService.subscribeTopic(topic); + log.info("MQTT订阅成功: {}", topic); } catch (MqttException e) { - log.error("订阅主题失败: {}", deviceDO.getTopic(), e); + log.error("MQTT订阅失败: {}", topic, e); } - } else { + + // 2.2 保存到iot_gateway表-用于重启后自动订阅 + GatewayDO gateway = gatewayMapper.selectOne( + new LambdaQueryWrapper() + .eq(GatewayDO::getTopic, topic) + .last("LIMIT 1") + ); + + if (gateway == null) { + + gateway = new GatewayDO(); + gateway.setGatewayName(deviceDO.getDeviceName()); + gateway.setGatewayCode(deviceDO.getDeviceCode()); + gateway.setTopic(topic); + gateway.setIsEnable(true); + gateway.setDeleted(false); + + gatewayMapper.insert(gateway); + + log.info("新增gateway订阅记录成功 topic={}", topic); + + } else { + + gateway.setIsEnable(true); + gateway.setUpdateTime(LocalDateTime.now()); + + gatewayMapper.updateById(gateway); + + log.info("更新gateway启用状态 topic={}", topic); + } + + } + + // 3. 禁用设备 + else { + + // 3.1 MQTT取消订阅 try { - mqttService.unsubscribeTopic(deviceDO.getTopic()); + mqttService.unsubscribeTopic(topic); + log.info("MQTT取消订阅成功: {}", topic); } catch (MqttException e) { - log.error("取消订阅主题失败: {}", deviceDO.getTopic(), e); + log.error("MQTT取消订阅失败: {}", topic, e); } + + // 3.2 更新gateway状态为禁用 + gatewayMapper.update( + null, + new LambdaUpdateWrapper() + .eq(GatewayDO::getTopic, topic) + .set(GatewayDO::getIsEnable, false) + .set(GatewayDO::getUpdateTime, LocalDateTime.now()) + ); + + log.info("gateway订阅记录已禁用 topic={}", topic); } + } catch (Exception e) { - // 捕获数据库更新等其他异常 - log.error("更新设备状态失败: {}", deviceDO.getDeviceCode(), e); + + log.error("更新设备状态失败 deviceCode={}", deviceDO.getDeviceCode(), e); + + } } - } } \ No newline at end of file