From 57d7761e72e3b905f9e16976fea99aa0616c79c3 Mon Sep 17 00:00:00 2001 From: chenshuichuan <1154693969@qq.com> Date: Mon, 3 Jun 2024 14:31:49 +0800 Subject: [PATCH] add mqtt record --- yudao-module-iot/yudao-module-iot-biz/pom.xml | 19 +- .../admin/gateway/GatewayController.java | 43 ++- .../mqttrecord/vo/MqttRecordPageReqVO.java | 2 + .../admin/mqttrecord/vo/MqttRecordRespVO.java | 3 + .../mqttrecord/vo/MqttRecordSaveReqVO.java | 2 + .../dataobject/mqttrecord/MqttRecordDO.java | 4 + .../iot/dal/mysql/gateway/GatewayMapper.java | 4 + .../mysql/mqttrecord/MqttRecordMapper.java | 1 + .../iot/framework/constant/Constants.java | 6 + .../framework/constant/IsEnableConstant.java | 12 + .../iot/framework/mqtt/annotation/Topic.java | 41 +++ .../mqtt/common/MqttCallbackImpl.java | 103 ++++++ .../iot/framework/mqtt/common/MsgDecoder.java | 16 + .../iot/framework/mqtt/common/MsgEncoder.java | 13 + .../framework/mqtt/common/SuperConsumer.java | 37 +++ .../mqtt/config/DefaultBizTopicSet.java | 17 + .../mqtt/config/DefaultEmqConfig.java | 96 ++++++ .../mqtt/config/DefaultEmqProperties.java | 47 +++ .../mqtt/config/DefaultMqttStarter.java | 35 ++ .../framework/mqtt/consumer/IMqttservice.java | 21 ++ .../mqtt/consumer/MqttDataHandler.java | 97 ++++++ .../mqtt/consumer/impl/AsyncService.java | 150 +++++++++ .../mqtt/consumer/impl/MqttserviceImpl.java | 111 +++++++ .../iot/framework/mqtt/entity/Connect.java | 55 ++++ .../iot/framework/mqtt/entity/Disconnect.java | 33 ++ .../iot/framework/mqtt/entity/IoData.java | 13 + .../iot/framework/mqtt/entity/MqttData.java | 19 ++ .../iot/framework/mqtt/entity/Pattern.java | 21 ++ .../framework/mqtt/entity/SubscriptTopic.java | 36 +++ .../mqtt/utils/ApplicationContextUtil.java | 48 +++ .../iot/framework/mqtt/utils/DateUtils.java | 304 ++++++++++++++++++ .../framework/mqtt/utils/MqttDataUtils.java | 21 ++ .../framework/mqtt/utils/PubMessageUtils.java | 44 +++ .../iot/framework/mqtt/utils/ThreadUtils.java | 14 + .../framework/redis/RedisKeyConstants.java | 18 ++ .../iot/service/gateway/GatewayService.java | 4 + .../service/gateway/GatewayServiceImpl.java | 13 + .../src/main/resources/application.yaml | 15 +- 38 files changed, 1528 insertions(+), 10 deletions(-) create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/constant/Constants.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/constant/IsEnableConstant.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/annotation/Topic.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/common/MqttCallbackImpl.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/common/MsgDecoder.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/common/MsgEncoder.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/common/SuperConsumer.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/config/DefaultBizTopicSet.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/config/DefaultEmqConfig.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/config/DefaultEmqProperties.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/config/DefaultMqttStarter.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/consumer/IMqttservice.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/consumer/MqttDataHandler.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/consumer/impl/AsyncService.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/consumer/impl/MqttserviceImpl.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/entity/Connect.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/entity/Disconnect.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/entity/IoData.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/entity/MqttData.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/entity/Pattern.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/entity/SubscriptTopic.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/utils/ApplicationContextUtil.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/utils/DateUtils.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/utils/MqttDataUtils.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/utils/PubMessageUtils.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/utils/ThreadUtils.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/redis/RedisKeyConstants.java diff --git a/yudao-module-iot/yudao-module-iot-biz/pom.xml b/yudao-module-iot/yudao-module-iot-biz/pom.xml index fc4c3c8373..95d41bef17 100644 --- a/yudao-module-iot/yudao-module-iot-biz/pom.xml +++ b/yudao-module-iot/yudao-module-iot-biz/pom.xml @@ -14,8 +14,7 @@ ${project.artifactId} - report 模块,主要实现数据可视化报表等功能: - 1. 基于「积木报表」实现,打印设计、报表设计、图形设计、大屏设计等。 + iot 模块,主要实现数据 @@ -58,12 +57,6 @@ cn.iocoder.boot yudao-spring-boot-starter-test - - - - org.jeecgframework.jimureport - jimureport-spring-boot-starter - xerces @@ -74,6 +67,16 @@ cn.iocoder.boot yudao-spring-boot-starter-excel + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.2 + + + + cn.iocoder.boot + yudao-spring-boot-starter-redis + diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/gateway/GatewayController.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/gateway/GatewayController.java index 1a4310e163..d8760bad4c 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/gateway/GatewayController.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/gateway/GatewayController.java @@ -1,6 +1,10 @@ package cn.iocoder.yudao.module.iot.controller.admin.gateway; import cn.iocoder.yudao.module.iot.dal.dataobject.device.DeviceDO; +import cn.iocoder.yudao.module.iot.framework.constant.IsEnableConstant; +import cn.iocoder.yudao.module.iot.framework.mqtt.consumer.IMqttservice; +import org.apache.commons.lang3.StringUtils; +import org.eclipse.paho.client.mqttv3.MqttException; import org.springframework.web.bind.annotation.*; import org.springframework.validation.annotation.Validated; import org.springframework.security.access.prepost.PreAuthorize; @@ -37,19 +41,56 @@ public class GatewayController { @Resource private GatewayService gatewayService; + @Resource + private IMqttservice mqttservice; @PostMapping("/create") @Operation(summary = "创建网关") @PreAuthorize("@ss.hasPermission('iot:gateway:create')") public CommonResult createGateway(@Valid @RequestBody GatewaySaveReqVO createReqVO) { - return success(gatewayService.createGateway(createReqVO)); + Long id = gatewayService.createGateway(createReqVO); + + if (id != null && IsEnableConstant.IsEnableTrue == createReqVO.getIsEnable() + && StringUtils.isNotBlank(createReqVO.getTopic())) { + try { + mqttservice.subscribeTopic(createReqVO.getTopic()); + } catch (MqttException e) { + e.printStackTrace(); + } + } + return success(id); } @PutMapping("/update") @Operation(summary = "更新网关") @PreAuthorize("@ss.hasPermission('iot:gateway:update')") public CommonResult updateGateway(@Valid @RequestBody GatewaySaveReqVO updateReqVO) { + GatewayDO old = gatewayService.getGateway(updateReqVO.getId()); gatewayService.updateGateway(updateReqVO); + /**todo 订阅*/ + try { + if (old != null && StringUtils.isNotBlank(old.getTopic()) && StringUtils.isNotBlank(updateReqVO.getTopic())) { + boolean sameTopic = false; + if (old.getTopic().equals(updateReqVO.getTopic())) sameTopic = true; + if (sameTopic) { + //启用订阅 + if (updateReqVO.getIsEnable().equals(IsEnableConstant.IsEnableTrue)) + mqttservice.subscribeTopic(updateReqVO.getTopic()); + //禁用订阅 + else mqttservice.unsubscribeTopic(updateReqVO.getTopic()); + } else { + //禁用旧订阅 + if (old.getIsEnable().equals(IsEnableConstant.IsEnableTrue)) + mqttservice.unsubscribeTopic(old.getTopic()); + //启用新订阅 + if (updateReqVO.getIsEnable().equals(IsEnableConstant.IsEnableTrue)) + mqttservice.subscribeTopic(updateReqVO.getTopic()); + } + } + } catch (MqttException e) { + e.printStackTrace(); + } + return success(true); } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/mqttrecord/vo/MqttRecordPageReqVO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/mqttrecord/vo/MqttRecordPageReqVO.java index 82f0780c6f..d9f9ca65c8 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/mqttrecord/vo/MqttRecordPageReqVO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/mqttrecord/vo/MqttRecordPageReqVO.java @@ -21,6 +21,8 @@ public class MqttRecordPageReqVO extends PageParam { @Schema(description = "采集时间") @DateTimeFormat(pattern = FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND) private LocalDateTime[] deviceDataTime; + @Schema(description = "采集时间sss") + private Long[] deviceDataTimeLong; @Schema(description = "数据") private String deviceData; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/mqttrecord/vo/MqttRecordRespVO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/mqttrecord/vo/MqttRecordRespVO.java index 3993f88d4b..af737dc19f 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/mqttrecord/vo/MqttRecordRespVO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/mqttrecord/vo/MqttRecordRespVO.java @@ -24,6 +24,9 @@ public class MqttRecordRespVO { @Schema(description = "采集时间", requiredMode = Schema.RequiredMode.REQUIRED) @ExcelProperty("采集时间") private LocalDateTime deviceDataTime; + @Schema(description = "采集时间sss") + @ExcelProperty("采集时间sss") + private Long deviceDataTimeLong; @Schema(description = "数据") @ExcelProperty("数据") diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/mqttrecord/vo/MqttRecordSaveReqVO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/mqttrecord/vo/MqttRecordSaveReqVO.java index a275a2a640..a90a314aab 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/mqttrecord/vo/MqttRecordSaveReqVO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/mqttrecord/vo/MqttRecordSaveReqVO.java @@ -23,6 +23,8 @@ public class MqttRecordSaveReqVO { @Schema(description = "采集时间", requiredMode = Schema.RequiredMode.REQUIRED) @NotNull(message = "采集时间不能为空") private LocalDateTime deviceDataTime; + @Schema(description = "采集时间sss") + private Long deviceDataTimeLong; @Schema(description = "数据") private String deviceData; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/mqttrecord/MqttRecordDO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/mqttrecord/MqttRecordDO.java index ccefccdbbc..9b23d1932d 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/mqttrecord/MqttRecordDO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/mqttrecord/MqttRecordDO.java @@ -40,6 +40,10 @@ public class MqttRecordDO extends BaseDO { * 采集时间 */ private LocalDateTime deviceDataTime; + /** + * 采集时间 + */ + private Long deviceDataTimeLong; /** * 数据 */ diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/gateway/GatewayMapper.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/gateway/GatewayMapper.java index 28a8a16bd0..083e759765 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/gateway/GatewayMapper.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/gateway/GatewayMapper.java @@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.iot.dal.mysql.gateway; import java.util.*; +import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum; import cn.iocoder.yudao.framework.common.pojo.PageResult; import cn.iocoder.yudao.framework.mybatis.core.query.LambdaQueryWrapperX; import cn.iocoder.yudao.framework.mybatis.core.mapper.BaseMapperX; @@ -31,5 +32,8 @@ public interface GatewayMapper extends BaseMapperX { .eqIfPresent(GatewayDO::getTopic, reqVO.getTopic()) .orderByDesc(GatewayDO::getId)); } + default List selectListByIsEnable(boolean isEnable) { + return selectList(GatewayDO::getIsEnable, isEnable); + } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/mqttrecord/MqttRecordMapper.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/mqttrecord/MqttRecordMapper.java index 1a04cca443..4688cbb66c 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/mqttrecord/MqttRecordMapper.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/mqttrecord/MqttRecordMapper.java @@ -21,6 +21,7 @@ public interface MqttRecordMapper extends BaseMapperX { return selectPage(reqVO, new LambdaQueryWrapperX() .likeIfPresent(MqttRecordDO::getMachineName, reqVO.getMachineName()) .betweenIfPresent(MqttRecordDO::getDeviceDataTime, reqVO.getDeviceDataTime()) + .betweenIfPresent(MqttRecordDO::getDeviceDataTimeLong, reqVO.getDeviceDataTimeLong()) .eqIfPresent(MqttRecordDO::getDeviceData, reqVO.getDeviceData()) .eqIfPresent(MqttRecordDO::getDeviceCode, reqVO.getDeviceCode()) .likeIfPresent(MqttRecordDO::getDeviceName, reqVO.getDeviceName()) diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/constant/Constants.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/constant/Constants.java new file mode 100644 index 0000000000..fde11730c6 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/constant/Constants.java @@ -0,0 +1,6 @@ +package cn.iocoder.yudao.module.iot.framework.constant; + +public class Constants { + public final static String MQTT_timestamp_format = "yyyy-MM-dd HH:mm:ss.SSS"; + +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/constant/IsEnableConstant.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/constant/IsEnableConstant.java new file mode 100644 index 0000000000..a656bfc206 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/constant/IsEnableConstant.java @@ -0,0 +1,12 @@ +package cn.iocoder.yudao.module.iot.framework.constant; + +public class IsEnableConstant { + /** + * 停用 + */ + public final static boolean IsEnableFalse = false; + /** + * 启用 + */ + public final static boolean IsEnableTrue = true; +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/annotation/Topic.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/annotation/Topic.java new file mode 100644 index 0000000000..17cb970b85 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/annotation/Topic.java @@ -0,0 +1,41 @@ +package cn.iocoder.yudao.module.iot.framework.mqtt.annotation; + +import cn.iocoder.yudao.module.iot.framework.mqtt.entity.Pattern; +import org.springframework.stereotype.Component; + +import java.lang.annotation.*; + +/** + * 自定义标记注解 + * @author jie + */ +@Component +@Documented +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +public @interface Topic { + + /** + * topic + * @return + */ + String topic() default ""; + + /** + * qos + * @return + */ + int qos() default 0; + + /** + * 订阅模式 + * @return + */ + Pattern patten() default Pattern.NONE; + + /** + * 共享订阅组 + * @return + */ + String group() default "group1"; +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/common/MqttCallbackImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/common/MqttCallbackImpl.java new file mode 100644 index 0000000000..921345b9f4 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/common/MqttCallbackImpl.java @@ -0,0 +1,103 @@ +package cn.iocoder.yudao.module.iot.framework.mqtt.common; + +import cn.iocoder.yudao.module.iot.framework.mqtt.entity.Pattern; +import cn.iocoder.yudao.module.iot.framework.mqtt.entity.SubscriptTopic; +import lombok.Data; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.*; + +import java.util.List; + +/** + * mqtt回调类 + * + * @author jie + */ +@Data +@Slf4j +public class MqttCallbackImpl implements MqttCallbackExtended { + private final List topicMap; + private final MqttClient client; + private final MqttConnectOptions option; + + /** + * 客户端断开后触发 + * + * @param throwable 异常 + */ + @SneakyThrows + @Override + public void connectionLost(Throwable throwable) { + while (!client.isConnected()) { + log.info("emqx 重新连接"); + client.connect(option); + Thread.sleep(6000); + } + log.info("emqx 重接成功"); + } + + /** + * 客户端收到消息触发 + * + * @param topic 主题 + * @param message 消息 + */ + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + for (SubscriptTopic subscriptTopic : topicMap) { + if (subscriptTopic.getPattern() != Pattern.NONE && isMatched(subscriptTopic.getTopic(), topic)) { + log.info("MqttCallbackImpl,messageArrived:topic="+topic+",message="+message); + subscriptTopic.getMessageListener().messageArrived(topic, message); + break; + } + } + } + + /** + * 检测一个主题是否为一个通配符表示的子主题 + * + * @param topicFilter 通配符主题 + * @param topic 子主题 + * @return 是否为通配符主题的子主题 + */ + private boolean isMatched(String topicFilter, String topic) { + return MqttTopic.isMatched(topicFilter, topic); + } + + /** + * 发布消息成功 + * + * @param token token + */ + @SneakyThrows + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + String[] topics = token.getTopics(); + for (String topic : topics) { + log.info("向主题:" + topic + "发送数据:" + new String(token.getMessage().getPayload())); + } + } + + /** + * 连接emq服务器后触发 + * + * @param b + * @param s + */ + @Override + public void connectComplete(boolean b, String s) { + if (client.isConnected()) { + for (SubscriptTopic sub : topicMap) { + try { + client.subscribe(sub.getSubTopic(), sub.getQos(), sub.getMessageListener()); + log.info("订阅主题:" + sub.getSubTopic() + " Listener:" + sub.getMessageListener()); + } catch (MqttException e) { + //log.info("订阅主题失败:" + sub.getSubTopic()); + e.printStackTrace(); + } + } + log.info("共订阅: " + topicMap.size() + " 个主题!"); + } + } +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/common/MsgDecoder.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/common/MsgDecoder.java new file mode 100644 index 0000000000..8b4b032cfe --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/common/MsgDecoder.java @@ -0,0 +1,16 @@ +package cn.iocoder.yudao.module.iot.framework.mqtt.common; + + +import org.eclipse.paho.client.mqttv3.MqttMessage; + +/** + * @author jie + */ +public interface MsgDecoder { + /** + * 下位机消息解码器 + * @param msg + * @return + */ + T decoder(MqttMessage msg); +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/common/MsgEncoder.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/common/MsgEncoder.java new file mode 100644 index 0000000000..9f7bd2ac45 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/common/MsgEncoder.java @@ -0,0 +1,13 @@ +package cn.iocoder.yudao.module.iot.framework.mqtt.common; + +/** + * @author jie + */ +public interface MsgEncoder { + /** + * 数据库消息编码为string + * @param t + * @return + */ + String encoder(T t); +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/common/SuperConsumer.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/common/SuperConsumer.java new file mode 100644 index 0000000000..ddff0d3f3a --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/common/SuperConsumer.java @@ -0,0 +1,37 @@ +package cn.iocoder.yudao.module.iot.framework.mqtt.common; + +import cn.iocoder.yudao.module.iot.framework.mqtt.utils.ThreadUtils; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.IMqttMessageListener; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +/** + * 封装的主题消费父类 + * + * @author jie + */ +@Slf4j +public abstract class SuperConsumer implements IMqttMessageListener, MsgDecoder { + @Override + public void messageArrived(String topic, MqttMessage mqttMessage) { +// log.info("收到主题 : " + topic + " 的消息: " + new String(mqttMessage.getPayload())); + ThreadUtils.executorService.submit(() -> { + try { + T decoder = decoder(mqttMessage); + msgHandler(topic, decoder); + } catch (Exception ex) { + //解决业务处理错误导致断线问题 + log.error(topic+":解决业务处理错误导致断线问题"); + log.error(ex.toString()); + } + }); + } + + /** + * 业务操作 + * + * @param topic + * @param entity + */ + protected abstract void msgHandler(String topic, T entity); +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/config/DefaultBizTopicSet.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/config/DefaultBizTopicSet.java new file mode 100644 index 0000000000..c7cb7acd5f --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/config/DefaultBizTopicSet.java @@ -0,0 +1,17 @@ +package cn.iocoder.yudao.module.iot.framework.mqtt.config; + +import cn.iocoder.yudao.module.iot.framework.mqtt.entity.SubscriptTopic; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.ArrayList; +import java.util.List; + +// 默认的业务主题集合,使用注解,收集所有的业务处理相关类实例 +@Data +@AllArgsConstructor +@NoArgsConstructor +public class DefaultBizTopicSet { + List topicMap = new ArrayList<>(); +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/config/DefaultEmqConfig.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/config/DefaultEmqConfig.java new file mode 100644 index 0000000000..015831dca2 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/config/DefaultEmqConfig.java @@ -0,0 +1,96 @@ +package cn.iocoder.yudao.module.iot.framework.mqtt.config; + +import cn.iocoder.yudao.module.iot.framework.mqtt.annotation.Topic; +import cn.iocoder.yudao.module.iot.framework.mqtt.common.MqttCallbackImpl; +import cn.iocoder.yudao.module.iot.framework.mqtt.entity.Pattern; +import cn.iocoder.yudao.module.iot.framework.mqtt.entity.SubscriptTopic; +import cn.iocoder.yudao.module.iot.framework.mqtt.utils.DateUtils; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.IMqttMessageListener; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.annotation.AnnotationUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * mqtt配置类 + * + * @author jie + */ +@Slf4j +@Configuration +@EnableConfigurationProperties(DefaultEmqProperties.class) +public class DefaultEmqConfig { + /** + * MQTT的连接设置 + * + * @param emqProperties + * @return + */ + @Bean + public MqttConnectOptions getOption(DefaultEmqProperties emqProperties) { + MqttConnectOptions options = new MqttConnectOptions(); + emqProperties.setClientId("clientId"+String.valueOf(DateUtils.getMillsLong())); + log.info("------mqtt clientid ="+ emqProperties.getClientId()); + options.setUserName(emqProperties.getUserName()); + options.setPassword(emqProperties.getPassword().toCharArray()); + // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接 + options.setCleanSession(emqProperties.getCleanSession()); + //断线重连 + options.setAutomaticReconnect(emqProperties.getReconnect()); + // 设置超时时间 单位为秒 + options.setConnectionTimeout(emqProperties.getTimeout()); + // 设置会话心跳时间 单位为秒 服务器会每隔1.5*10秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 + options.setKeepAliveInterval(emqProperties.getKeepAlive()); + return options; + } + + @Bean + public DefaultBizTopicSet defaultBizTopicSet(ApplicationContext applicationContext){ + List topicMap = new ArrayList<>(); + //得到所有使用@Topic注解的类 + Map beansWithAnnotation = applicationContext.getBeansWithAnnotation(Topic.class); + for (String className : beansWithAnnotation.keySet()) { + Class classByteCode = beansWithAnnotation.get(className).getClass(); + //获取类的注解属性 + Topic annotation = AnnotationUtils.findAnnotation(classByteCode, Topic.class); + String topic = annotation.topic(); + int qos = annotation.qos(); + Pattern patten = annotation.patten(); + String group = annotation.group(); + String subTopic = topic; + if (patten == Pattern.SHARE) { + subTopic = group + "/" + topic; + } else if (patten == Pattern.QUEUE) { + subTopic = topic; + } + topicMap.add(new SubscriptTopic(topic, subTopic, patten, qos, (IMqttMessageListener) applicationContext.getBean(classByteCode))); + } + + return new DefaultBizTopicSet(topicMap); + } + + /** + * 系统默认的MQTT连接 + * + * @param options MQTT连接参数选项 + * @param emqProperties MQTT连接配置 + * @param applicationContext 系统上下文 + * @return + * @throws Exception 异常 + */ + @Bean + public MqttClient mqttClient(MqttConnectOptions options, DefaultEmqProperties emqProperties, DefaultBizTopicSet defaultBizTopicSet, ApplicationContext applicationContext) throws Exception { + MqttClient mqttClient = new MqttClient(emqProperties.getBroker(), emqProperties.getClientId(), new MemoryPersistence()); + mqttClient.setCallback(new MqttCallbackImpl(defaultBizTopicSet.getTopicMap(), mqttClient, options)); + return mqttClient; + } +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/config/DefaultEmqProperties.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/config/DefaultEmqProperties.java new file mode 100644 index 0000000000..3b281966c4 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/config/DefaultEmqProperties.java @@ -0,0 +1,47 @@ +package cn.iocoder.yudao.module.iot.framework.mqtt.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * 配置类 + * @author jie + */ +@Data +@ConfigurationProperties(prefix = "emqx") +public class DefaultEmqProperties { + //public static final String PREFIX = "emqx"; + /** + * emq服务器地址 + */ + private String broker; + /** + * emq client id + */ + private String clientId; + /** + * 用户名 + */ + private String userName; + /** + * 密码 + */ + private String password; + /** + * 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接 + */ + private Boolean cleanSession; + /** + * 是否断线重连 + */ + private Boolean reconnect; + /** + * 连接超时时间 + */ + private Integer timeout; + /** + * 心跳间隔 + */ + private Integer keepAlive; + +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/config/DefaultMqttStarter.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/config/DefaultMqttStarter.java new file mode 100644 index 0000000000..b44c97236f --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/config/DefaultMqttStarter.java @@ -0,0 +1,35 @@ +package cn.iocoder.yudao.module.iot.framework.mqtt.config; + +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +@Component +@Slf4j +public class DefaultMqttStarter implements ApplicationListener { + @Resource + private MqttConnectOptions options; + + @Resource + private MqttClient mqttClient; + + @SneakyThrows + @Override + public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) { + // catch异常,不然连不上emqx项目就跑不起来 + try { + mqttClient.connect(options); + } catch (MqttException e) { + e.getMessage(); + e.printStackTrace(); + } + } +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/consumer/IMqttservice.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/consumer/IMqttservice.java new file mode 100644 index 0000000000..9d25b9fc83 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/consumer/IMqttservice.java @@ -0,0 +1,21 @@ +package cn.iocoder.yudao.module.iot.framework.mqtt.consumer; + +import org.eclipse.paho.client.mqttv3.MqttException; + +public interface IMqttservice { + + /** + *一次性定时器 + * 程序启动后去数据库找到所有需要订阅的topic 添加进client订阅 + * */ + public void subscribeMysqlTopic() throws Exception; + /** + * 需要订阅的topic 添加进client订阅 + * */ + public int subscribeTopic(String topic) throws MqttException; + /** + * 关闭订阅topic 从client中关闭topic + * */ + public int unsubscribeTopic(String topic) throws MqttException; + +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/consumer/MqttDataHandler.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/consumer/MqttDataHandler.java new file mode 100644 index 0000000000..c845ecf30d --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/consumer/MqttDataHandler.java @@ -0,0 +1,97 @@ +package cn.iocoder.yudao.module.iot.framework.mqtt.consumer; + + +import cn.hutool.core.date.LocalDateTimeUtil; +import cn.iocoder.yudao.module.iot.dal.dataobject.iotorganization.IotOrganizationDO; +import cn.iocoder.yudao.module.iot.dal.dataobject.mqttrecord.MqttRecordDO; +import cn.iocoder.yudao.module.iot.dal.mysql.mqttrecord.MqttRecordMapper; +import cn.iocoder.yudao.module.iot.framework.constant.Constants; +import cn.iocoder.yudao.module.iot.framework.mqtt.common.SuperConsumer; +import cn.iocoder.yudao.module.iot.framework.mqtt.consumer.impl.AsyncService; +import cn.iocoder.yudao.module.iot.framework.mqtt.entity.MqttData; +import cn.iocoder.yudao.module.iot.framework.mqtt.utils.DateUtils; +import cn.iocoder.yudao.module.iot.framework.mqtt.utils.MqttDataUtils; +import cn.iocoder.yudao.module.iot.service.device.DeviceService; +import cn.iocoder.yudao.module.iot.service.iotorganization.IotOrganizationService; +import cn.iocoder.yudao.module.iot.service.mqttrecord.MqttRecordService; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.cache.RedisCache; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.text.ParseException; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.List; +import java.util.concurrent.TimeUnit; + +@Slf4j +@Component +public class MqttDataHandler extends SuperConsumer { + + @Resource + private IotOrganizationService organizationService; + @Resource + private DeviceService deviceService; + @Resource + private AsyncService asyncService; + + @Resource + private MqttRecordService mqttRecordService; + @Resource + private MqttRecordMapper mqttRecordMapper; + @Override + public String decoder(MqttMessage msg) { + return new String(msg.getPayload()); + } + + @Override + protected void msgHandler(String topic, String entity) { + log.debug("msgHandler"+":topic="+topic); + log.debug("entity:"+entity); + + MqttData data = MqttDataUtils.parse(entity); + IotOrganizationDO machine = new IotOrganizationDO(); + machine.setId(1L); + if (data!=null) { + //根据设定转化 +// try { +// String transfer = asyncService.transferBase(data,equipments.getEquipmentsId()); +// if(StringUtils.isNotBlank(transfer)) +// entity = transfer; +// }catch (Exception e){ +// log.error("asyncService.transferBase error:"+entity); +// } + + save(machine, entity, data); + } + } + + public void save(IotOrganizationDO machine, String entity, MqttData data) { + try { + long timestamp = DateUtils.strToTimeStamp(data.getDeviceDataTime(), Constants.MQTT_timestamp_format); + //timestamp = DateUtils.getMillsLong(); + LocalDateTime date = DateUtils.strToLocalDateTime(data.getDeviceDataTime()); + + MqttRecordDO recordDO = new MqttRecordDO(); + recordDO.setDeviceCode(data.getDeviceID()); + recordDO.setGatewayCode(data.getGatewayID()); + recordDO.setDeviceData(entity); + recordDO.setDeviceDataTime(date); + recordDO.setDeviceDataTimeLong(timestamp); + /**直接保存原始mqtt*/ + mqttRecordMapper.insert(recordDO); + + /** + * 保存解析后的mqtt数据 + * */ + //tsMqttService.insertDataAddress(data, taskId, timestamp, equipment); + } catch (Exception e) { + log.error("-----mqttTableName:"); + e.printStackTrace(); + } + } +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/consumer/impl/AsyncService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/consumer/impl/AsyncService.java new file mode 100644 index 0000000000..374b31e6d5 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/consumer/impl/AsyncService.java @@ -0,0 +1,150 @@ +package cn.iocoder.yudao.module.iot.framework.mqtt.consumer.impl; + +import cn.iocoder.yudao.module.iot.dal.dataobject.device.DeviceDO; +import cn.iocoder.yudao.module.iot.framework.mqtt.entity.MqttData; +import cn.iocoder.yudao.module.iot.service.device.DeviceService; +import com.alibaba.fastjson.JSON; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.annotation.EnableScheduling; + +import javax.annotation.Resource; +import javax.script.ScriptException; +import java.util.*; +import java.util.concurrent.TimeUnit; + +@Configuration +@EnableAsync +@Slf4j +@EnableScheduling +public class AsyncService { + @Resource + private DeviceService deviceService; + + + /**地址转换进制*/ + //@Async +// public String transferBase(MqttData data, String equipmentId) { +// //data数据改变标记 +// boolean isTransfer = false; +// IoPoint vo = new IoPoint(); +// vo.setEquipmentsId(equipmentId); +// vo.setDeviceId(data.getDeviceID()); +// vo.setTransferDataType("---"); +// List pointList = ioPointService.list(ioPointService.buildQueryWrapper(vo)); +// if(pointList!=null && pointList.size()>0){ +// //先转进制 +// for (IoPoint point: pointList) { +// if(data.getDeviceData().containsKey(point.getPointCode()) && StringUtils.isNotBlank(point.getSourceDataType())){ +// String value = data.getDeviceData().get(point.getPointCode()).getV(); +// //防止有些数乱填 +// if(value.contains("."))continue; +// if(StringUtils.isNotBlank(value) && StringUtils.isNotBlank(point.getSourceDataType())){ +// isTransfer = true; +// value = transferBase(value,point.getSourceDataType(),point.getTransferDataType()); +// value = factor(value,point.getFactor()); +// data.getDeviceData().get(point.getPointCode()).setV(value); +// } +// } +// } +// } +// //再进行公式计算 +// Map valueMap = getValueMap(data); +// vo.setTransferDataType(""); +// vo.setCalAddresses("---"); +// pointList = ioPointService.list(ioPointService.buildQueryWrapper(vo)); +// for (IoPoint point: pointList) { +// if(data.getDeviceData().containsKey(point.getPointCode()) && StringUtils.isNotBlank(point.getCalAddresses())){ +// try { +// isTransfer = true; +// String result = FormulaCalculator.calFormula(point.getCalAddresses(),valueMap); +// data.getDeviceData().get(point.getPointCode()).setV(result); +// } catch (ScriptException e) { +// log.error("calFormula point:"+point.toString()); +// } +// } +// } +// if(isTransfer) +// return JSON.toJSONString(data); +// else +// return null; +// } + /**系数**/ + private String factor(String value, String factor){ + if(StringUtils.isNotBlank(factor)){ + return String.format("%.2f",Double.parseDouble(value)*Double.parseDouble(factor)); + } + return value; + } + private Map getValueMap(MqttData data){ + Map valueMap = new HashMap<>(); + for (String key: data.getDeviceData().keySet()){ + valueMap.put(key,data.getDeviceData().get(key).getV()); + } + return valueMap; + } + /** + * java 代码将给定的值value,从一个进制转换到另一个进制,进制取值可能为binary,octal,hexadecimal,decimal等 + * */ + private String transferBase(String value,String originBase, String transferBase){ + switch (transferBase){ + case "decimal":{ + switch (originBase){ + case "binary": + value = convertBase(value,2,10); + break; + case "octal": + value = convertBase(value,8,10); + break; + case "hexadecimal": + value = convertBase(value,16,10); + case "hexfloat": + value = hexToDecimal(value); + } + } + break; + default:log.debug("数据未进行转换!:(value,originBase,transferBase)("+value+","+originBase+","+transferBase+")"); + } + return value; + } + private static String hexToDecimal(String value){ + return String.valueOf(Integer.parseInt(value, 16)); + } + + + public static String convertBase(String value, int fromBase, int toBase) { + return String.valueOf(Integer.parseInt(value, fromBase)); + } + + public static void main(String[] args) { + String value = "479624"; // 要转换的值 + int fromBase = 16; // 起始进制 + int toBase = 10; // 目标进制 + + String convertedValue = convertBase(value, fromBase, toBase); + System.out.println("Converted value: " + convertedValue); + + + int intValue = Integer.parseInt(value, 16); // 将十六进制字符串解析为十进制整数 + System.out.println(intValue); + + intValue = Integer.parseInt("476c", 16); // 将十六进制字符串解析为十进制整数 + System.out.println(intValue); + + // 获取当前日期 + Calendar calendar = Calendar.getInstance(); + calendar.set(Calendar.HOUR_OF_DAY, 0); + calendar.set(Calendar.MINUTE, 0); + calendar.set(Calendar.SECOND, 0); + calendar.set(Calendar.MILLISECOND, 0); + + Date date = calendar.getTime(); + long timestamp = date.getTime() / 1000; // 将时间戳转换为秒 + + System.out.println("当天 00:00:00 的时间戳:" + timestamp); + } +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/consumer/impl/MqttserviceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/consumer/impl/MqttserviceImpl.java new file mode 100644 index 0000000000..01db78cfc4 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/consumer/impl/MqttserviceImpl.java @@ -0,0 +1,111 @@ +package cn.iocoder.yudao.module.iot.framework.mqtt.consumer.impl; + +import cn.iocoder.yudao.module.iot.dal.dataobject.gateway.GatewayDO; +import cn.iocoder.yudao.module.iot.framework.constant.IsEnableConstant; +import cn.iocoder.yudao.module.iot.framework.mqtt.common.MqttCallbackImpl; +import cn.iocoder.yudao.module.iot.framework.mqtt.config.DefaultBizTopicSet; +import cn.iocoder.yudao.module.iot.framework.mqtt.consumer.IMqttservice; +import cn.iocoder.yudao.module.iot.framework.mqtt.consumer.MqttDataHandler; +import cn.iocoder.yudao.module.iot.framework.mqtt.entity.Pattern; +import cn.iocoder.yudao.module.iot.framework.mqtt.entity.SubscriptTopic; +import cn.iocoder.yudao.module.iot.service.gateway.GatewayService; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.ApplicationListener; +import org.springframework.context.annotation.DependsOn; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.List; + +@Service +@EnableScheduling +@DependsOn("defaultMqttStarter") +@Slf4j +public class MqttserviceImpl implements IMqttservice, ApplicationListener { + + @Resource + private GatewayService gatewayService; + + @Resource + private MqttClient client; + @Resource + private DefaultBizTopicSet defaultBizTopicSet; + @Resource + private MqttConnectOptions options; + + @Resource + private MqttDataHandler dataHandler; + + + @Override + public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) { + new Thread(() -> { + subscribeMysqlTopic(); + }).start(); + } + /** + *一次性定时器 + * 程序启动后去数据库找到所有需要订阅的topic 添加进client订阅 + * */ + @Override + public void subscribeMysqlTopic(){ + if(client.isConnected()){ + + List gatewayList = gatewayService.selectListByIsEnable(IsEnableConstant.IsEnableTrue); + for (GatewayDO gateway : gatewayList) { + if(StringUtils.isNotBlank(gateway.getTopic())){ + SubscriptTopic topic = new SubscriptTopic(gateway.getTopic(), + gateway.getTopic(), Pattern.QUEUE, 0, dataHandler ); + defaultBizTopicSet.getTopicMap().add(topic); + try { + client.subscribe(topic.getSubTopic(),0, dataHandler); + } catch (MqttException e) { + e.printStackTrace(); + } + } + } + client.setCallback(new MqttCallbackImpl(defaultBizTopicSet.getTopicMap(),client,options)); + log.info("共订阅: " + defaultBizTopicSet.getTopicMap().size() + " 个主题!"); + } + else{ + log.error("Mqtt is not connected !"); + } + + } + + /** + * 需要订阅的topic 添加进client订阅 + * */ + @Override + public int subscribeTopic(String topic)throws MqttException{ + SubscriptTopic subscriptTopic = new SubscriptTopic(topic, topic,Pattern.QUEUE, 0, dataHandler ); + defaultBizTopicSet.getTopicMap().add(subscriptTopic); + client.subscribe(topic,0, dataHandler); + client.setCallback(new MqttCallbackImpl(defaultBizTopicSet.getTopicMap(),client,options)); + return 0; + } + /** + * 关闭订阅topic 从client中关闭topic + * */ + @Override + public int unsubscribeTopic(String topic)throws MqttException{ + for (int i = 0; i < defaultBizTopicSet.getTopicMap().size(); i++) { + if(topic.equals(defaultBizTopicSet.getTopicMap().get(i).getSubTopic())){ + defaultBizTopicSet.getTopicMap().remove(defaultBizTopicSet.getTopicMap().get(i)); + break; + } + } + client.unsubscribe(topic); + client.setCallback(new MqttCallbackImpl(defaultBizTopicSet.getTopicMap(),client,options)); + return 0; + } + + +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/entity/Connect.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/entity/Connect.java new file mode 100644 index 0000000000..7eaa327d6e --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/entity/Connect.java @@ -0,0 +1,55 @@ +package cn.iocoder.yudao.module.iot.framework.mqtt.entity; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.ToString; + +/** + *

设备上线 + * 邮箱:275236367@qq.com + * 创建时间: 2020/1/4 + * @author jie + */ +@Data +@ToString +@NoArgsConstructor +@AllArgsConstructor +public class Connect { + /** + * 客户端id + */ + private String clientid; + /** + * 用户名 + */ + private String username; + /** + * ip地址 + */ + private String ipaddress; + /** + *连接回执 + */ + private int connack; + /** + * 事件触发时间 (ms) + */ + private long ts; + /** + * 协议版本 + */ + private int proto_ver; + /** + * 协议名字 + */ + private String proto_name; + /** + * MQTT clean_start + */ + private boolean clean_start; + /** + *保持连接 + */ + private int keepalive; +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/entity/Disconnect.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/entity/Disconnect.java new file mode 100644 index 0000000000..1165a2906f --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/entity/Disconnect.java @@ -0,0 +1,33 @@ +package cn.iocoder.yudao.module.iot.framework.mqtt.entity; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.ToString; + +/** + *

设备下线 + * @author jie + */ +@Data +@ToString +@NoArgsConstructor +@AllArgsConstructor +public class Disconnect { + /** + * 客户端id + */ + private String clientid; + /** + * 用户名 + */ + private String username; + /** + * 终端连接断开原因 + */ + private String reason; + /** + * 事件触发时间 (ms) + */ + private long ts; +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/entity/IoData.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/entity/IoData.java new file mode 100644 index 0000000000..9eeee356c9 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/entity/IoData.java @@ -0,0 +1,13 @@ +package cn.iocoder.yudao.module.iot.framework.mqtt.entity; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class IoData { + String u; + String v; +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/entity/MqttData.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/entity/MqttData.java new file mode 100644 index 0000000000..fd6ca73454 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/entity/MqttData.java @@ -0,0 +1,19 @@ +package cn.iocoder.yudao.module.iot.framework.mqtt.entity; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Map; + +// 同一的数据格式 +@Data +@AllArgsConstructor +@NoArgsConstructor +public class MqttData { + protected String deviceID; + protected String gatewayID; + protected Map deviceData; + protected String deviceDataTime; + +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/entity/Pattern.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/entity/Pattern.java new file mode 100644 index 0000000000..51302e5df1 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/entity/Pattern.java @@ -0,0 +1,21 @@ +package cn.iocoder.yudao.module.iot.framework.mqtt.entity; + +/** + * 订阅模式 + * @author jie + */ + +public enum Pattern { + /** + * 普通订阅 + */ + NONE, + /** + * 不带群组的共享订阅 + */ + QUEUE, + /** + * 带群组的共享订阅 + */ + SHARE; +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/entity/SubscriptTopic.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/entity/SubscriptTopic.java new file mode 100644 index 0000000000..4a55c8bbd4 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/entity/SubscriptTopic.java @@ -0,0 +1,36 @@ +package cn.iocoder.yudao.module.iot.framework.mqtt.entity; + +import lombok.*; +import org.eclipse.paho.client.mqttv3.IMqttMessageListener; + +/** + * @author jie + */ +@Data +@Builder +@ToString +@NoArgsConstructor +@AllArgsConstructor +public class SubscriptTopic { + /** + * 原主题 + */ + private String topic; + /** + * 订阅主题 + */ + private String subTopic; + /** + * 订阅模式 + */ + private Pattern pattern; + /** + * 消息等级 + */ + private int qos; + /** + * 消费类 + */ + private IMqttMessageListener messageListener; + +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/utils/ApplicationContextUtil.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/utils/ApplicationContextUtil.java new file mode 100644 index 0000000000..ee245b02db --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/utils/ApplicationContextUtil.java @@ -0,0 +1,48 @@ +package cn.iocoder.yudao.module.iot.framework.mqtt.utils; + +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +/** + * 类名称:ApplicationContextUtil + * 类描述: + *

+ *

从容器中获取bean + */ +@Component +public class ApplicationContextUtil { + + private static ApplicationContextUtil instance; + @Autowired + private ApplicationContext applicationContext; + + @PostConstruct + public void applicationContextUtil() { + instance = this; + } + + /** + * 根据字节码获取bean + * @param clazz + * @param + * @return + */ + public static T getBean(Class clazz) { + return instance.applicationContext.getBean(clazz); + } + + /** + * 根据名字获取bean + * @param name + * @param + * @return + * @throws BeansException + */ + public static T getBean(String name) throws BeansException { + return (T) instance.applicationContext.getBean(name); + } +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/utils/DateUtils.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/utils/DateUtils.java new file mode 100644 index 0000000000..1981d0fd5e --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/utils/DateUtils.java @@ -0,0 +1,304 @@ +package cn.iocoder.yudao.module.iot.framework.mqtt.utils; + +import org.apache.commons.lang3.time.DateFormatUtils; + +import java.lang.management.ManagementFactory; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Calendar; +import java.util.Date; +import java.util.TimeZone; + +public class DateUtils extends org.apache.commons.lang3.time.DateUtils{ + /** + * 将本地时间, 转换成目标时区的时间 + * @param sourceDate + * @return + */ + public static Date convertTimezone(Date sourceDate, String targetZoneId){ + return convertTimezone(sourceDate, TimeZone.getTimeZone(targetZoneId)); + } + + public static Date convertTimezone(Date sourceDate, String sourceZoneId, String targetZoneId){ + TimeZone sourceTimeZone=TimeZone.getTimeZone(sourceZoneId); + TimeZone targetTimeZone=TimeZone.getTimeZone(targetZoneId); + + return convertTimezone(sourceDate, sourceTimeZone, targetTimeZone); + } + + /** + * 将本地时间,转换成对应时区的时间 + * @param localDate + * @param targetTimezone 转换成目标时区所在的时间 + * @return + */ + public static Date convertTimezone(Date localDate, TimeZone targetTimezone){ + return convertTimezone(localDate, TimeZone.getDefault(), targetTimezone); + } + + + /** + * 将sourceDate转换成指定时区的时间 + * @param sourceDate + * @param sourceTimezone sourceDate所在的时区 + * @param targetTimezone 转化成目标时间所在的时区 + * @return + */ + public static Date convertTimezone(Date sourceDate, TimeZone sourceTimezone, TimeZone targetTimezone){ + + + // targetDate - sourceDate=targetTimezone-sourceTimezone + // ---> + // targetDate=sourceDate + (targetTimezone-sourceTimezone) + + + Calendar calendar=Calendar.getInstance(); + // date.getTime() 为时间戳, 为格林尼治到系统现在的时间差,世界各个地方获取的时间戳是一样的, + // 格式化输出时,因为设置了不同的时区,所以输出不一样 + long sourceTime=sourceDate.getTime(); + + + calendar.setTimeZone(sourceTimezone); + calendar.setTimeInMillis(sourceTime);// 设置之后,calendar会计算各种filed对应的值,并保存 + + //获取源时区的到UTC的时区差 + int sourceZoneOffset=calendar.get(Calendar.ZONE_OFFSET); + + + calendar.setTimeZone(targetTimezone); + calendar.setTimeInMillis(sourceTime); + + int targetZoneOffset=calendar.get(Calendar.ZONE_OFFSET); + int targetDaylightOffset=calendar.get(Calendar.DST_OFFSET); // 夏令时 + + + long targetTime=sourceTime+ (targetZoneOffset+targetDaylightOffset) -sourceZoneOffset; + + return new Date(targetTime); + + } + + public static String addDateMinut(String day, int hour){ + SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + Date date = null; + try { + date = format.parse(day); + } catch (Exception ex) { + ex.printStackTrace(); + } + if (date == null) + return ""; + Calendar cal = Calendar.getInstance(); + cal.setTime(date); + cal.add(Calendar.HOUR, -hour);// 24小时制 + date = cal.getTime(); + cal = null; + return format.format(date); + } + + public static String addDateDay(String time, int day){ + SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + Date date = null; + try { + date = format.parse(time); + } catch (Exception ex) { + ex.printStackTrace(); + } + if (date == null) + return ""; + Calendar cal = Calendar.getInstance(); + cal.setTime(date); + cal.add(Calendar.DAY_OF_MONTH, -day);// 日 + date = cal.getTime(); + cal = null; + return format.format(date); + } + + + public static String addDateMonth(String day, int hour){ + SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + Date date = null; + try { + date = format.parse(day); + } catch (Exception ex) { + ex.printStackTrace(); + } + if (date == null) + return ""; + Calendar cal = Calendar.getInstance(); + cal.setTime(date); + cal.add(Calendar.MONTH, hour);// 月 + date = cal.getTime(); + cal = null; + return format.format(date); + } + public static String YYYY = "yyyy"; + + public static String YYYY_MM = "yyyy-MM"; + + public static String YYYY_MM_DD = "yyyy-MM-dd"; + + public static String YYYYMMDDHHMMSS = "yyyyMMddHHmmss"; + + public static String YYYY_MM_DD_HH_MM_SS = "yyyy-MM-dd HH:mm:ss"; + public static String YYYY_MM_DD_HH_MM_SS_SSS = "yyyy-MM-dd HH:mm:ss.SSS"; + private static String[] parsePatterns = { + "yyyy-MM-dd", "yyyy-MM-dd HH:mm:ss", "yyyy-MM-dd HH:mm", "yyyy-MM", + "yyyy/MM/dd", "yyyy/MM/dd HH:mm:ss", "yyyy/MM/dd HH:mm", "yyyy/MM", + "yyyy.MM.dd", "yyyy.MM.dd HH:mm:ss", "yyyy.MM.dd HH:mm", "yyyy.MM"}; + + /** + * 获取当前Date型日期 + * + * @return Date() 当前日期 + */ + public static Date getNowDate() + { + return new Date(); + } + + /** + * 获取当前日期, 默认格式为yyyy-MM-dd + * + * @return String + */ + public static String getDate() + { + return dateTimeNow(YYYY_MM_DD); + } + public static String getMills() + { + return dateTimeNow(YYYY_MM_DD_HH_MM_SS_SSS); + } + public static String getMills(Date date) + { + return parseDateToStr(YYYY_MM_DD_HH_MM_SS_SSS, date); + } + public static Long getMillsLong() + { + return new Date().getTime(); + } + public static Long getMillsLong(Date date) + { + return date.getTime(); + } + public static final String getTime() + { + return dateTimeNow(YYYY_MM_DD_HH_MM_SS); + } + + public static final String dateTimeNow() + { + return dateTimeNow(YYYYMMDDHHMMSS); + } + + public static final String dateTimeNow(final String format) + { + return parseDateToStr(format, new Date()); + } + + public static final String dateTime(final Date date) + { + return parseDateToStr(YYYY_MM_DD, date); + } + + public static final String parseDateToStr(final String format, final Date date) + { + return new SimpleDateFormat(format).format(date); + } + + public static final Date dateTime(final String format, final String ts) + { + try + { + return new SimpleDateFormat(format).parse(ts); + } + catch (ParseException e) + { + throw new RuntimeException(e); + } + } + + /** + * 日期路径 即年/月/日 如2018/08/08 + */ + public static final String datePath() + { + Date now = new Date(); + return DateFormatUtils.format(now, "yyyy/MM/dd"); + } + + /** + * 日期路径 即年/月/日 如20180808 + */ + public static final String dateTime() + { + Date now = new Date(); + return DateFormatUtils.format(now, "yyyyMMdd"); + } + + /** + * 日期型字符串转化为日期 格式 + */ + public static Date parseDate(Object str) + { + if (str == null) + { + return null; + } + try + { + return parseDate(str.toString(), parsePatterns); + } + catch (ParseException e) + { + return null; + } + } + + /** + * 获取服务器启动时间 + */ + public static Date getServerStartDate() + { + long time = ManagementFactory.getRuntimeMXBean().getStartTime(); + return new Date(time); + } + + /** + * 计算两个时间差 + */ + public static String getDatePoor(Date endDate, Date nowDate) + { + long nd = 1000 * 24 * 60 * 60; + long nh = 1000 * 60 * 60; + long nm = 1000 * 60; + // long ns = 1000; + // 获得两个时间的毫秒时间差异 + long diff = endDate.getTime() - nowDate.getTime(); + // 计算差多少天 + long day = diff / nd; + // 计算差多少小时 + long hour = diff % nd / nh; + // 计算差多少分钟 + long min = diff % nd % nh / nm; + // 计算差多少秒//输出结果 + // long sec = diff % nd % nh % nm / ns; + return day + "天" + hour + "小时" + min + "分钟"; + } + + public static Long strToTimeStamp(String dateTime,String format) throws ParseException { + SimpleDateFormat simpleDateFormat = new SimpleDateFormat(format); + return simpleDateFormat.parse(dateTime).getTime(); + } + public static LocalDateTime strToLocalDateTime(String dateTimeString) throws ParseException { + // 定义日期时间格式 + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); + // 使用DateTimeFormatter解析字符串为LocalDateTime + LocalDateTime dateTime = LocalDateTime.parse(dateTimeString, formatter); + return dateTime; + } + +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/utils/MqttDataUtils.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/utils/MqttDataUtils.java new file mode 100644 index 0000000000..d17d20a365 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/utils/MqttDataUtils.java @@ -0,0 +1,21 @@ +package cn.iocoder.yudao.module.iot.framework.mqtt.utils; + +import cn.iocoder.yudao.module.iot.framework.mqtt.entity.MqttData; +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.HttpStatus; + +@Slf4j +public class MqttDataUtils { + + public static MqttData parse(String entity){ + try { + MqttData data = JSON.parseObject(entity, MqttData.class); + return data; + }catch (Exception e){ + e.printStackTrace(); + log.error("MqttDataHandler.JSON.parseObject error:"); + } + return null; + } +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/utils/PubMessageUtils.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/utils/PubMessageUtils.java new file mode 100644 index 0000000000..9556777600 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/utils/PubMessageUtils.java @@ -0,0 +1,44 @@ +package cn.iocoder.yudao.module.iot.framework.mqtt.utils; + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttException; + +import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; + +/** + * 发布消息工具类 + * 注意,直接注入client pub会报错 报错原因我也不知道 如果有大佬知道就请留言告知一下 + * @author jie + */ +@Slf4j +public class PubMessageUtils { + + public static boolean pub(String topic, String message) throws MqttException, UnsupportedEncodingException { + return pub(topic, message.getBytes(StandardCharsets.UTF_8), 0, false); + } + + public static boolean pub(String topic, String message, int qos) throws MqttException, UnsupportedEncodingException { + return pub(topic, message.getBytes(StandardCharsets.UTF_8), qos, false); + } + + public static boolean pub(String topic, byte[] message) throws MqttException { + return pub(topic, message, 0, false); + } + + public static boolean pub(String topic, byte[] message, int qos) throws MqttException { + return pub(topic, message, qos, false); + } + + public static boolean pub(String topic, byte[] message, int qos, boolean retained) throws MqttException { + try { + MqttClient client = ApplicationContextUtil.getBean(MqttClient.class); + client.publish(topic, message, qos, retained); + } catch (MqttException e) { + log.error(e.toString()); + return false; + } + return true; + } +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/utils/ThreadUtils.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/utils/ThreadUtils.java new file mode 100644 index 0000000000..39fe9daa77 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/utils/ThreadUtils.java @@ -0,0 +1,14 @@ +package cn.iocoder.yudao.module.iot.framework.mqtt.utils; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * @author jie + */ +public class ThreadUtils { + /** + * 线程池 + */ + public static ExecutorService executorService = Executors.newFixedThreadPool(50); +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/redis/RedisKeyConstants.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/redis/RedisKeyConstants.java new file mode 100644 index 0000000000..393ee60072 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/redis/RedisKeyConstants.java @@ -0,0 +1,18 @@ +package cn.iocoder.yudao.module.iot.framework.redis; + +/** + * CRM Redis Key 枚举类 + * + * @author 芋道源码 + */ +public interface RedisKeyConstants { + + /** + * 序号的缓存 + * + * KEY 格式:trade_no:{prefix} + * VALUE 数据格式:编号自增 + */ + String NO = "crm:seq_no:"; + +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/gateway/GatewayService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/gateway/GatewayService.java index 6cc34110ce..4e44a169c4 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/gateway/GatewayService.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/gateway/GatewayService.java @@ -7,6 +7,8 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.gateway.GatewayDO; import cn.iocoder.yudao.framework.common.pojo.PageResult; import cn.iocoder.yudao.framework.common.pojo.PageParam; +import java.util.List; + /** * 网关 Service 接口 * @@ -44,6 +46,7 @@ public interface GatewayService { */ GatewayDO getGateway(Long id); + List selectListByIsEnable(boolean isEnable); /** * 获得网关分页 * @@ -93,4 +96,5 @@ public interface GatewayService { */ DeviceDO getDevice(Long id); + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/gateway/GatewayServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/gateway/GatewayServiceImpl.java index 25ec07302f..e631d64299 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/gateway/GatewayServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/gateway/GatewayServiceImpl.java @@ -2,6 +2,12 @@ package cn.iocoder.yudao.module.iot.service.gateway; import cn.iocoder.yudao.module.iot.dal.dataobject.device.DeviceDO; import cn.iocoder.yudao.module.iot.dal.mysql.device.DeviceMapper; +import cn.iocoder.yudao.module.iot.framework.constant.Constants; +import cn.iocoder.yudao.module.iot.framework.constant.IsEnableConstant; +import cn.iocoder.yudao.module.iot.framework.mqtt.consumer.IMqttservice; +import org.apache.commons.lang3.StringUtils; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.annotation.Resource; import org.springframework.validation.annotation.Validated; @@ -15,6 +21,8 @@ import cn.iocoder.yudao.framework.common.util.object.BeanUtils; import cn.iocoder.yudao.module.iot.dal.mysql.gateway.GatewayMapper; +import java.util.List; + import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception; import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.*; @@ -32,6 +40,7 @@ public class GatewayServiceImpl implements GatewayService { @Resource private DeviceMapper deviceMapper; + @Override public Long createGateway(GatewaySaveReqVO createReqVO) { // 插入 @@ -73,6 +82,10 @@ public class GatewayServiceImpl implements GatewayService { return gatewayMapper.selectById(id); } + @Override + public List selectListByIsEnable(boolean isEnable){ + return gatewayMapper.selectListByIsEnable(isEnable); + } @Override public PageResult getGatewayPage(GatewayPageReqVO pageReqVO) { return gatewayMapper.selectPage(pageReqVO); diff --git a/yudao-server/src/main/resources/application.yaml b/yudao-server/src/main/resources/application.yaml index fe592406c8..017537bb94 100644 --- a/yudao-server/src/main/resources/application.yaml +++ b/yudao-server/src/main/resources/application.yaml @@ -236,6 +236,8 @@ yudao: - rep_demo_jianpiao - tmp_report_data_1 - tmp_report_data_income + - iot_gateway + - iot_mqtt_record sms-code: # 短信验证码相关的配置项 expire-times: 10m send-frequency: 1m @@ -262,4 +264,15 @@ debug: false # 积木报表配置 jeecg: jmreport: - saas-mode: tenant \ No newline at end of file + saas-mode: tenant + +#tcp://47.112.167.85:1883 tcp://47.106.185.127:1883 +emqx: + broker: tcp://47.106.185.127:1883 #broker地址47.112.167.85 + clientId: emqx-9521 + userName: admin #授权账号 一定要授权的 + password: public123 #密码 + cleanSession: true #是否清除会话 + reconnect: true #是否断线重连 + timeout: 20 #连接超时时间 + keepAlive: 10 #心跳间隔 \ No newline at end of file