fix:修改项目重启后继续订阅mqtt消息

plp
HuangHuiKang 1 month ago
parent 1a33190acb
commit 04ccfa06fe

@ -19,6 +19,7 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.devicemodelattribute.DeviceMod
import cn.iocoder.yudao.module.iot.dal.dataobject.devicemodelrules.DeviceModelRulesDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.deviceoperationrecord.DeviceOperationRecordDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.devicepointrules.DevicePointRulesDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.gateway.GatewayDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.mqttdatarecord.MqttDataRecordDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.devicecontactmodel.DeviceContactModelDO;
import cn.iocoder.yudao.module.iot.dal.mysql.device.DeviceMapper;
@ -29,13 +30,16 @@ import cn.iocoder.yudao.module.iot.dal.mysql.devicemodelattribute.DeviceModelAtt
import cn.iocoder.yudao.module.iot.dal.mysql.devicemodelrules.DeviceModelRulesMapper;
import cn.iocoder.yudao.module.iot.dal.mysql.deviceoperationrecord.DeviceOperationRecordMapper;
import cn.iocoder.yudao.module.iot.dal.mysql.devicepointrules.DevicePointRulesMapper;
import cn.iocoder.yudao.module.iot.dal.mysql.gateway.GatewayMapper;
import cn.iocoder.yudao.module.iot.dal.mysql.mqttdatarecord.MqttDataRecordMapper;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.DeviceAttributeDO;
import cn.iocoder.yudao.module.iot.dal.mysql.device.DeviceAttributeMapper;
import cn.iocoder.yudao.module.iot.framework.mqtt.consumer.IMqttservice;
import cn.iocoder.yudao.module.iot.service.gateway.GatewayService;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@ -114,6 +118,13 @@ public class DeviceServiceImpl implements DeviceService {
@Resource
private IMqttservice mqttService;
// @Resource
// private GatewayService gatewayService;
@Resource
private GatewayMapper gatewayMapper;
@Override
@Transactional(rollbackFor = Exception.class)
@ -1052,7 +1063,7 @@ public class DeviceServiceImpl implements DeviceService {
}
//TODO 待优化
if (("MQTT".equals(deviceDO.getProtocol()))) {
if (!"MQTT".equals(deviceDO.getProtocol())) {
throw exception(DEVICE_MQTT_TOPIC_EXIST);
}
@ -1073,29 +1084,85 @@ public class DeviceServiceImpl implements DeviceService {
return deviceMapper.selectOne(Wrappers.<DeviceDO>lambdaQuery().eq(DeviceDO::getTopic,topic));
}
private void executeEnableUpdate(DeviceDO deviceDO, Boolean enabled) throws MqttException {
private void executeEnableUpdate(DeviceDO deviceDO, Boolean enabled) {
try {
// 更新数据库状态
// 1. 更新设备启用状态
deviceDO.setIsEnable(enabled);
deviceMapper.updateById(deviceDO);
// MQTT 操作
String topic = deviceDO.getTopic();
// 2. 启用设备
if (enabled) {
// 2.1 MQTT订阅
try {
mqttService.subscribeTopic(deviceDO.getTopic());
mqttService.subscribeTopic(topic);
log.info("MQTT订阅成功: {}", topic);
} catch (MqttException e) {
log.error("订阅主题失败: {}", deviceDO.getTopic(), e);
log.error("MQTT订阅失败: {}", topic, e);
}
// 2.2 保存到iot_gateway表-用于重启后自动订阅
GatewayDO gateway = gatewayMapper.selectOne(
new LambdaQueryWrapper<GatewayDO>()
.eq(GatewayDO::getTopic, topic)
.last("LIMIT 1")
);
if (gateway == null) {
gateway = new GatewayDO();
gateway.setGatewayName(deviceDO.getDeviceName());
gateway.setGatewayCode(deviceDO.getDeviceCode());
gateway.setTopic(topic);
gateway.setIsEnable(true);
gateway.setDeleted(false);
gatewayMapper.insert(gateway);
log.info("新增gateway订阅记录成功 topic={}", topic);
} else {
gateway.setIsEnable(true);
gateway.setUpdateTime(LocalDateTime.now());
gatewayMapper.updateById(gateway);
log.info("更新gateway启用状态 topic={}", topic);
}
}
// 3. 禁用设备
else {
// 3.1 MQTT取消订阅
try {
mqttService.unsubscribeTopic(deviceDO.getTopic());
mqttService.unsubscribeTopic(topic);
log.info("MQTT取消订阅成功: {}", topic);
} catch (MqttException e) {
log.error("取消订阅主题失败: {}", deviceDO.getTopic(), e);
log.error("MQTT取消订阅失败: {}", topic, e);
}
// 3.2 更新gateway状态为禁用
gatewayMapper.update(
null,
new LambdaUpdateWrapper<GatewayDO>()
.eq(GatewayDO::getTopic, topic)
.set(GatewayDO::getIsEnable, false)
.set(GatewayDO::getUpdateTime, LocalDateTime.now())
);
log.info("gateway订阅记录已禁用 topic={}", topic);
}
} catch (Exception e) {
// 捕获数据库更新等其他异常
log.error("更新设备状态失败: {}", deviceDO.getDeviceCode(), e);
log.error("更新设备状态失败 deviceCode={}", deviceDO.getDeviceCode(), e);
}
}

Loading…
Cancel
Save