add mqtt record
parent
2543dd5922
commit
57d7761e72
@ -0,0 +1,6 @@
|
||||
package cn.iocoder.yudao.module.iot.framework.constant;
|
||||
|
||||
public class Constants {
|
||||
public final static String MQTT_timestamp_format = "yyyy-MM-dd HH:mm:ss.SSS";
|
||||
|
||||
}
|
||||
@ -0,0 +1,12 @@
|
||||
package cn.iocoder.yudao.module.iot.framework.constant;
|
||||
|
||||
public class IsEnableConstant {
|
||||
/**
|
||||
* 停用
|
||||
*/
|
||||
public final static boolean IsEnableFalse = false;
|
||||
/**
|
||||
* 启用
|
||||
*/
|
||||
public final static boolean IsEnableTrue = true;
|
||||
}
|
||||
@ -0,0 +1,41 @@
|
||||
package cn.iocoder.yudao.module.iot.framework.mqtt.annotation;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.framework.mqtt.entity.Pattern;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.lang.annotation.*;
|
||||
|
||||
/**
|
||||
* 自定义标记注解
|
||||
* @author jie
|
||||
*/
|
||||
@Component
|
||||
@Documented
|
||||
@Target({ElementType.TYPE})
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
public @interface Topic {
|
||||
|
||||
/**
|
||||
* topic
|
||||
* @return
|
||||
*/
|
||||
String topic() default "";
|
||||
|
||||
/**
|
||||
* qos
|
||||
* @return
|
||||
*/
|
||||
int qos() default 0;
|
||||
|
||||
/**
|
||||
* 订阅模式
|
||||
* @return
|
||||
*/
|
||||
Pattern patten() default Pattern.NONE;
|
||||
|
||||
/**
|
||||
* 共享订阅组
|
||||
* @return
|
||||
*/
|
||||
String group() default "group1";
|
||||
}
|
||||
@ -0,0 +1,103 @@
|
||||
package cn.iocoder.yudao.module.iot.framework.mqtt.common;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.framework.mqtt.entity.Pattern;
|
||||
import cn.iocoder.yudao.module.iot.framework.mqtt.entity.SubscriptTopic;
|
||||
import lombok.Data;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.paho.client.mqttv3.*;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* mqtt回调类
|
||||
*
|
||||
* @author jie
|
||||
*/
|
||||
@Data
|
||||
@Slf4j
|
||||
public class MqttCallbackImpl implements MqttCallbackExtended {
|
||||
private final List<SubscriptTopic> topicMap;
|
||||
private final MqttClient client;
|
||||
private final MqttConnectOptions option;
|
||||
|
||||
/**
|
||||
* 客户端断开后触发
|
||||
*
|
||||
* @param throwable 异常
|
||||
*/
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public void connectionLost(Throwable throwable) {
|
||||
while (!client.isConnected()) {
|
||||
log.info("emqx 重新连接");
|
||||
client.connect(option);
|
||||
Thread.sleep(6000);
|
||||
}
|
||||
log.info("emqx 重接成功");
|
||||
}
|
||||
|
||||
/**
|
||||
* 客户端收到消息触发
|
||||
*
|
||||
* @param topic 主题
|
||||
* @param message 消息
|
||||
*/
|
||||
@Override
|
||||
public void messageArrived(String topic, MqttMessage message) throws Exception {
|
||||
for (SubscriptTopic subscriptTopic : topicMap) {
|
||||
if (subscriptTopic.getPattern() != Pattern.NONE && isMatched(subscriptTopic.getTopic(), topic)) {
|
||||
log.info("MqttCallbackImpl,messageArrived:topic="+topic+",message="+message);
|
||||
subscriptTopic.getMessageListener().messageArrived(topic, message);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 检测一个主题是否为一个通配符表示的子主题
|
||||
*
|
||||
* @param topicFilter 通配符主题
|
||||
* @param topic 子主题
|
||||
* @return 是否为通配符主题的子主题
|
||||
*/
|
||||
private boolean isMatched(String topicFilter, String topic) {
|
||||
return MqttTopic.isMatched(topicFilter, topic);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布消息成功
|
||||
*
|
||||
* @param token token
|
||||
*/
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public void deliveryComplete(IMqttDeliveryToken token) {
|
||||
String[] topics = token.getTopics();
|
||||
for (String topic : topics) {
|
||||
log.info("向主题:" + topic + "发送数据:" + new String(token.getMessage().getPayload()));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 连接emq服务器后触发
|
||||
*
|
||||
* @param b
|
||||
* @param s
|
||||
*/
|
||||
@Override
|
||||
public void connectComplete(boolean b, String s) {
|
||||
if (client.isConnected()) {
|
||||
for (SubscriptTopic sub : topicMap) {
|
||||
try {
|
||||
client.subscribe(sub.getSubTopic(), sub.getQos(), sub.getMessageListener());
|
||||
log.info("订阅主题:" + sub.getSubTopic() + " Listener:" + sub.getMessageListener());
|
||||
} catch (MqttException e) {
|
||||
//log.info("订阅主题失败:" + sub.getSubTopic());
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
log.info("共订阅: " + topicMap.size() + " 个主题!");
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,16 @@
|
||||
package cn.iocoder.yudao.module.iot.framework.mqtt.common;
|
||||
|
||||
|
||||
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||
|
||||
/**
|
||||
* @author jie
|
||||
*/
|
||||
public interface MsgDecoder<T> {
|
||||
/**
|
||||
* 下位机消息解码器
|
||||
* @param msg
|
||||
* @return
|
||||
*/
|
||||
T decoder(MqttMessage msg);
|
||||
}
|
||||
@ -0,0 +1,13 @@
|
||||
package cn.iocoder.yudao.module.iot.framework.mqtt.common;
|
||||
|
||||
/**
|
||||
* @author jie
|
||||
*/
|
||||
public interface MsgEncoder<T> {
|
||||
/**
|
||||
* 数据库消息编码为string
|
||||
* @param t
|
||||
* @return
|
||||
*/
|
||||
String encoder(T t);
|
||||
}
|
||||
@ -0,0 +1,37 @@
|
||||
package cn.iocoder.yudao.module.iot.framework.mqtt.common;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.framework.mqtt.utils.ThreadUtils;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
|
||||
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||
|
||||
/**
|
||||
* 封装的主题消费父类
|
||||
*
|
||||
* @author jie
|
||||
*/
|
||||
@Slf4j
|
||||
public abstract class SuperConsumer<T> implements IMqttMessageListener, MsgDecoder<T> {
|
||||
@Override
|
||||
public void messageArrived(String topic, MqttMessage mqttMessage) {
|
||||
// log.info("收到主题 : " + topic + " 的消息: " + new String(mqttMessage.getPayload()));
|
||||
ThreadUtils.executorService.submit(() -> {
|
||||
try {
|
||||
T decoder = decoder(mqttMessage);
|
||||
msgHandler(topic, decoder);
|
||||
} catch (Exception ex) {
|
||||
//解决业务处理错误导致断线问题
|
||||
log.error(topic+":解决业务处理错误导致断线问题");
|
||||
log.error(ex.toString());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 业务操作
|
||||
*
|
||||
* @param topic
|
||||
* @param entity
|
||||
*/
|
||||
protected abstract void msgHandler(String topic, T entity);
|
||||
}
|
||||
@ -0,0 +1,17 @@
|
||||
package cn.iocoder.yudao.module.iot.framework.mqtt.config;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.framework.mqtt.entity.SubscriptTopic;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
// 默认的业务主题集合,使用注解,收集所有的业务处理相关类实例
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public class DefaultBizTopicSet {
|
||||
List<SubscriptTopic> topicMap = new ArrayList<>();
|
||||
}
|
||||
@ -0,0 +1,21 @@
|
||||
package cn.iocoder.yudao.module.iot.framework.mqtt.consumer;
|
||||
|
||||
import org.eclipse.paho.client.mqttv3.MqttException;
|
||||
|
||||
public interface IMqttservice {
|
||||
|
||||
/**
|
||||
*一次性定时器
|
||||
* 程序启动后去数据库找到所有需要订阅的topic 添加进client订阅
|
||||
* */
|
||||
public void subscribeMysqlTopic() throws Exception;
|
||||
/**
|
||||
* 需要订阅的topic 添加进client订阅
|
||||
* */
|
||||
public int subscribeTopic(String topic) throws MqttException;
|
||||
/**
|
||||
* 关闭订阅topic 从client中关闭topic
|
||||
* */
|
||||
public int unsubscribeTopic(String topic) throws MqttException;
|
||||
|
||||
}
|
||||
@ -0,0 +1,97 @@
|
||||
package cn.iocoder.yudao.module.iot.framework.mqtt.consumer;
|
||||
|
||||
|
||||
import cn.hutool.core.date.LocalDateTimeUtil;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.iotorganization.IotOrganizationDO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.mqttrecord.MqttRecordDO;
|
||||
import cn.iocoder.yudao.module.iot.dal.mysql.mqttrecord.MqttRecordMapper;
|
||||
import cn.iocoder.yudao.module.iot.framework.constant.Constants;
|
||||
import cn.iocoder.yudao.module.iot.framework.mqtt.common.SuperConsumer;
|
||||
import cn.iocoder.yudao.module.iot.framework.mqtt.consumer.impl.AsyncService;
|
||||
import cn.iocoder.yudao.module.iot.framework.mqtt.entity.MqttData;
|
||||
import cn.iocoder.yudao.module.iot.framework.mqtt.utils.DateUtils;
|
||||
import cn.iocoder.yudao.module.iot.framework.mqtt.utils.MqttDataUtils;
|
||||
import cn.iocoder.yudao.module.iot.service.device.DeviceService;
|
||||
import cn.iocoder.yudao.module.iot.service.iotorganization.IotOrganizationService;
|
||||
import cn.iocoder.yudao.module.iot.service.mqttrecord.MqttRecordService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.cache.RedisCache;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.text.ParseException;
|
||||
import java.time.LocalDate;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class MqttDataHandler extends SuperConsumer<String> {
|
||||
|
||||
@Resource
|
||||
private IotOrganizationService organizationService;
|
||||
@Resource
|
||||
private DeviceService deviceService;
|
||||
@Resource
|
||||
private AsyncService asyncService;
|
||||
|
||||
@Resource
|
||||
private MqttRecordService mqttRecordService;
|
||||
@Resource
|
||||
private MqttRecordMapper mqttRecordMapper;
|
||||
@Override
|
||||
public String decoder(MqttMessage msg) {
|
||||
return new String(msg.getPayload());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void msgHandler(String topic, String entity) {
|
||||
log.debug("msgHandler"+":topic="+topic);
|
||||
log.debug("entity:"+entity);
|
||||
|
||||
MqttData data = MqttDataUtils.parse(entity);
|
||||
IotOrganizationDO machine = new IotOrganizationDO();
|
||||
machine.setId(1L);
|
||||
if (data!=null) {
|
||||
//根据设定转化
|
||||
// try {
|
||||
// String transfer = asyncService.transferBase(data,equipments.getEquipmentsId());
|
||||
// if(StringUtils.isNotBlank(transfer))
|
||||
// entity = transfer;
|
||||
// }catch (Exception e){
|
||||
// log.error("asyncService.transferBase error:"+entity);
|
||||
// }
|
||||
|
||||
save(machine, entity, data);
|
||||
}
|
||||
}
|
||||
|
||||
public void save(IotOrganizationDO machine, String entity, MqttData data) {
|
||||
try {
|
||||
long timestamp = DateUtils.strToTimeStamp(data.getDeviceDataTime(), Constants.MQTT_timestamp_format);
|
||||
//timestamp = DateUtils.getMillsLong();
|
||||
LocalDateTime date = DateUtils.strToLocalDateTime(data.getDeviceDataTime());
|
||||
|
||||
MqttRecordDO recordDO = new MqttRecordDO();
|
||||
recordDO.setDeviceCode(data.getDeviceID());
|
||||
recordDO.setGatewayCode(data.getGatewayID());
|
||||
recordDO.setDeviceData(entity);
|
||||
recordDO.setDeviceDataTime(date);
|
||||
recordDO.setDeviceDataTimeLong(timestamp);
|
||||
/**直接保存原始mqtt*/
|
||||
mqttRecordMapper.insert(recordDO);
|
||||
|
||||
/**
|
||||
* 保存解析后的mqtt数据
|
||||
* */
|
||||
//tsMqttService.insertDataAddress(data, taskId, timestamp, equipment);
|
||||
} catch (Exception e) {
|
||||
log.error("-----mqttTableName:");
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,111 @@
|
||||
package cn.iocoder.yudao.module.iot.framework.mqtt.consumer.impl;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.gateway.GatewayDO;
|
||||
import cn.iocoder.yudao.module.iot.framework.constant.IsEnableConstant;
|
||||
import cn.iocoder.yudao.module.iot.framework.mqtt.common.MqttCallbackImpl;
|
||||
import cn.iocoder.yudao.module.iot.framework.mqtt.config.DefaultBizTopicSet;
|
||||
import cn.iocoder.yudao.module.iot.framework.mqtt.consumer.IMqttservice;
|
||||
import cn.iocoder.yudao.module.iot.framework.mqtt.consumer.MqttDataHandler;
|
||||
import cn.iocoder.yudao.module.iot.framework.mqtt.entity.Pattern;
|
||||
import cn.iocoder.yudao.module.iot.framework.mqtt.entity.SubscriptTopic;
|
||||
import cn.iocoder.yudao.module.iot.service.gateway.GatewayService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.eclipse.paho.client.mqttv3.MqttClient;
|
||||
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
||||
import org.eclipse.paho.client.mqttv3.MqttException;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.context.event.ApplicationReadyEvent;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.context.annotation.DependsOn;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.List;
|
||||
|
||||
@Service
|
||||
@EnableScheduling
|
||||
@DependsOn("defaultMqttStarter")
|
||||
@Slf4j
|
||||
public class MqttserviceImpl implements IMqttservice, ApplicationListener<ApplicationReadyEvent> {
|
||||
|
||||
@Resource
|
||||
private GatewayService gatewayService;
|
||||
|
||||
@Resource
|
||||
private MqttClient client;
|
||||
@Resource
|
||||
private DefaultBizTopicSet defaultBizTopicSet;
|
||||
@Resource
|
||||
private MqttConnectOptions options;
|
||||
|
||||
@Resource
|
||||
private MqttDataHandler dataHandler;
|
||||
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
|
||||
new Thread(() -> {
|
||||
subscribeMysqlTopic();
|
||||
}).start();
|
||||
}
|
||||
/**
|
||||
*一次性定时器
|
||||
* 程序启动后去数据库找到所有需要订阅的topic 添加进client订阅
|
||||
* */
|
||||
@Override
|
||||
public void subscribeMysqlTopic(){
|
||||
if(client.isConnected()){
|
||||
|
||||
List<GatewayDO> gatewayList = gatewayService.selectListByIsEnable(IsEnableConstant.IsEnableTrue);
|
||||
for (GatewayDO gateway : gatewayList) {
|
||||
if(StringUtils.isNotBlank(gateway.getTopic())){
|
||||
SubscriptTopic topic = new SubscriptTopic(gateway.getTopic(),
|
||||
gateway.getTopic(), Pattern.QUEUE, 0, dataHandler );
|
||||
defaultBizTopicSet.getTopicMap().add(topic);
|
||||
try {
|
||||
client.subscribe(topic.getSubTopic(),0, dataHandler);
|
||||
} catch (MqttException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
client.setCallback(new MqttCallbackImpl(defaultBizTopicSet.getTopicMap(),client,options));
|
||||
log.info("共订阅: " + defaultBizTopicSet.getTopicMap().size() + " 个主题!");
|
||||
}
|
||||
else{
|
||||
log.error("Mqtt is not connected !");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 需要订阅的topic 添加进client订阅
|
||||
* */
|
||||
@Override
|
||||
public int subscribeTopic(String topic)throws MqttException{
|
||||
SubscriptTopic subscriptTopic = new SubscriptTopic(topic, topic,Pattern.QUEUE, 0, dataHandler );
|
||||
defaultBizTopicSet.getTopicMap().add(subscriptTopic);
|
||||
client.subscribe(topic,0, dataHandler);
|
||||
client.setCallback(new MqttCallbackImpl(defaultBizTopicSet.getTopicMap(),client,options));
|
||||
return 0;
|
||||
}
|
||||
/**
|
||||
* 关闭订阅topic 从client中关闭topic
|
||||
* */
|
||||
@Override
|
||||
public int unsubscribeTopic(String topic)throws MqttException{
|
||||
for (int i = 0; i < defaultBizTopicSet.getTopicMap().size(); i++) {
|
||||
if(topic.equals(defaultBizTopicSet.getTopicMap().get(i).getSubTopic())){
|
||||
defaultBizTopicSet.getTopicMap().remove(defaultBizTopicSet.getTopicMap().get(i));
|
||||
break;
|
||||
}
|
||||
}
|
||||
client.unsubscribe(topic);
|
||||
client.setCallback(new MqttCallbackImpl(defaultBizTopicSet.getTopicMap(),client,options));
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@ -0,0 +1,55 @@
|
||||
package cn.iocoder.yudao.module.iot.framework.mqtt.entity;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
|
||||
/**
|
||||
* <p> 设备上线
|
||||
* 邮箱:275236367@qq.com
|
||||
* 创建时间: 2020/1/4
|
||||
* @author jie
|
||||
*/
|
||||
@Data
|
||||
@ToString
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class Connect {
|
||||
/**
|
||||
* 客户端id
|
||||
*/
|
||||
private String clientid;
|
||||
/**
|
||||
* 用户名
|
||||
*/
|
||||
private String username;
|
||||
/**
|
||||
* ip地址
|
||||
*/
|
||||
private String ipaddress;
|
||||
/**
|
||||
*连接回执
|
||||
*/
|
||||
private int connack;
|
||||
/**
|
||||
* 事件触发时间 (ms)
|
||||
*/
|
||||
private long ts;
|
||||
/**
|
||||
* 协议版本
|
||||
*/
|
||||
private int proto_ver;
|
||||
/**
|
||||
* 协议名字
|
||||
*/
|
||||
private String proto_name;
|
||||
/**
|
||||
* MQTT clean_start
|
||||
*/
|
||||
private boolean clean_start;
|
||||
/**
|
||||
*保持连接
|
||||
*/
|
||||
private int keepalive;
|
||||
}
|
||||
@ -0,0 +1,33 @@
|
||||
package cn.iocoder.yudao.module.iot.framework.mqtt.entity;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
|
||||
/**
|
||||
* <p>设备下线
|
||||
* @author jie
|
||||
*/
|
||||
@Data
|
||||
@ToString
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class Disconnect {
|
||||
/**
|
||||
* 客户端id
|
||||
*/
|
||||
private String clientid;
|
||||
/**
|
||||
* 用户名
|
||||
*/
|
||||
private String username;
|
||||
/**
|
||||
* 终端连接断开原因
|
||||
*/
|
||||
private String reason;
|
||||
/**
|
||||
* 事件触发时间 (ms)
|
||||
*/
|
||||
private long ts;
|
||||
}
|
||||
@ -0,0 +1,13 @@
|
||||
package cn.iocoder.yudao.module.iot.framework.mqtt.entity;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public class IoData {
|
||||
String u;
|
||||
String v;
|
||||
}
|
||||
@ -0,0 +1,19 @@
|
||||
package cn.iocoder.yudao.module.iot.framework.mqtt.entity;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
// 同一的数据格式
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public class MqttData {
|
||||
protected String deviceID;
|
||||
protected String gatewayID;
|
||||
protected Map<String,IoData> deviceData;
|
||||
protected String deviceDataTime;
|
||||
|
||||
}
|
||||
@ -0,0 +1,21 @@
|
||||
package cn.iocoder.yudao.module.iot.framework.mqtt.entity;
|
||||
|
||||
/**
|
||||
* 订阅模式
|
||||
* @author jie
|
||||
*/
|
||||
|
||||
public enum Pattern {
|
||||
/**
|
||||
* 普通订阅
|
||||
*/
|
||||
NONE,
|
||||
/**
|
||||
* 不带群组的共享订阅
|
||||
*/
|
||||
QUEUE,
|
||||
/**
|
||||
* 带群组的共享订阅
|
||||
*/
|
||||
SHARE;
|
||||
}
|
||||
@ -0,0 +1,36 @@
|
||||
package cn.iocoder.yudao.module.iot.framework.mqtt.entity;
|
||||
|
||||
import lombok.*;
|
||||
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
|
||||
|
||||
/**
|
||||
* @author jie
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@ToString
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class SubscriptTopic {
|
||||
/**
|
||||
* 原主题
|
||||
*/
|
||||
private String topic;
|
||||
/**
|
||||
* 订阅主题
|
||||
*/
|
||||
private String subTopic;
|
||||
/**
|
||||
* 订阅模式
|
||||
*/
|
||||
private Pattern pattern;
|
||||
/**
|
||||
* 消息等级
|
||||
*/
|
||||
private int qos;
|
||||
/**
|
||||
* 消费类
|
||||
*/
|
||||
private IMqttMessageListener messageListener;
|
||||
|
||||
}
|
||||
@ -0,0 +1,48 @@
|
||||
package cn.iocoder.yudao.module.iot.framework.mqtt.utils;
|
||||
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
|
||||
/**
|
||||
* 类名称:ApplicationContextUtil
|
||||
* 类描述:
|
||||
* <p>
|
||||
* <p>从容器中获取bean
|
||||
*/
|
||||
@Component
|
||||
public class ApplicationContextUtil {
|
||||
|
||||
private static ApplicationContextUtil instance;
|
||||
@Autowired
|
||||
private ApplicationContext applicationContext;
|
||||
|
||||
@PostConstruct
|
||||
public void applicationContextUtil() {
|
||||
instance = this;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据字节码获取bean
|
||||
* @param clazz
|
||||
* @param <T>
|
||||
* @return
|
||||
*/
|
||||
public static <T> T getBean(Class<T> clazz) {
|
||||
return instance.applicationContext.getBean(clazz);
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据名字获取bean
|
||||
* @param name
|
||||
* @param <T>
|
||||
* @return
|
||||
* @throws BeansException
|
||||
*/
|
||||
public static <T> T getBean(String name) throws BeansException {
|
||||
return (T) instance.applicationContext.getBean(name);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,304 @@
|
||||
package cn.iocoder.yudao.module.iot.framework.mqtt.utils;
|
||||
|
||||
import org.apache.commons.lang3.time.DateFormatUtils;
|
||||
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.text.ParseException;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.Calendar;
|
||||
import java.util.Date;
|
||||
import java.util.TimeZone;
|
||||
|
||||
public class DateUtils extends org.apache.commons.lang3.time.DateUtils{
|
||||
/**
|
||||
* 将本地时间, 转换成目标时区的时间
|
||||
* @param sourceDate
|
||||
* @return
|
||||
*/
|
||||
public static Date convertTimezone(Date sourceDate, String targetZoneId){
|
||||
return convertTimezone(sourceDate, TimeZone.getTimeZone(targetZoneId));
|
||||
}
|
||||
|
||||
public static Date convertTimezone(Date sourceDate, String sourceZoneId, String targetZoneId){
|
||||
TimeZone sourceTimeZone=TimeZone.getTimeZone(sourceZoneId);
|
||||
TimeZone targetTimeZone=TimeZone.getTimeZone(targetZoneId);
|
||||
|
||||
return convertTimezone(sourceDate, sourceTimeZone, targetTimeZone);
|
||||
}
|
||||
|
||||
/**
|
||||
* 将本地时间,转换成对应时区的时间
|
||||
* @param localDate
|
||||
* @param targetTimezone 转换成目标时区所在的时间
|
||||
* @return
|
||||
*/
|
||||
public static Date convertTimezone(Date localDate, TimeZone targetTimezone){
|
||||
return convertTimezone(localDate, TimeZone.getDefault(), targetTimezone);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 将sourceDate转换成指定时区的时间
|
||||
* @param sourceDate
|
||||
* @param sourceTimezone sourceDate所在的时区
|
||||
* @param targetTimezone 转化成目标时间所在的时区
|
||||
* @return
|
||||
*/
|
||||
public static Date convertTimezone(Date sourceDate, TimeZone sourceTimezone, TimeZone targetTimezone){
|
||||
|
||||
|
||||
// targetDate - sourceDate=targetTimezone-sourceTimezone
|
||||
// --->
|
||||
// targetDate=sourceDate + (targetTimezone-sourceTimezone)
|
||||
|
||||
|
||||
Calendar calendar=Calendar.getInstance();
|
||||
// date.getTime() 为时间戳, 为格林尼治到系统现在的时间差,世界各个地方获取的时间戳是一样的,
|
||||
// 格式化输出时,因为设置了不同的时区,所以输出不一样
|
||||
long sourceTime=sourceDate.getTime();
|
||||
|
||||
|
||||
calendar.setTimeZone(sourceTimezone);
|
||||
calendar.setTimeInMillis(sourceTime);// 设置之后,calendar会计算各种filed对应的值,并保存
|
||||
|
||||
//获取源时区的到UTC的时区差
|
||||
int sourceZoneOffset=calendar.get(Calendar.ZONE_OFFSET);
|
||||
|
||||
|
||||
calendar.setTimeZone(targetTimezone);
|
||||
calendar.setTimeInMillis(sourceTime);
|
||||
|
||||
int targetZoneOffset=calendar.get(Calendar.ZONE_OFFSET);
|
||||
int targetDaylightOffset=calendar.get(Calendar.DST_OFFSET); // 夏令时
|
||||
|
||||
|
||||
long targetTime=sourceTime+ (targetZoneOffset+targetDaylightOffset) -sourceZoneOffset;
|
||||
|
||||
return new Date(targetTime);
|
||||
|
||||
}
|
||||
|
||||
public static String addDateMinut(String day, int hour){
|
||||
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||
Date date = null;
|
||||
try {
|
||||
date = format.parse(day);
|
||||
} catch (Exception ex) {
|
||||
ex.printStackTrace();
|
||||
}
|
||||
if (date == null)
|
||||
return "";
|
||||
Calendar cal = Calendar.getInstance();
|
||||
cal.setTime(date);
|
||||
cal.add(Calendar.HOUR, -hour);// 24小时制
|
||||
date = cal.getTime();
|
||||
cal = null;
|
||||
return format.format(date);
|
||||
}
|
||||
|
||||
public static String addDateDay(String time, int day){
|
||||
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||
Date date = null;
|
||||
try {
|
||||
date = format.parse(time);
|
||||
} catch (Exception ex) {
|
||||
ex.printStackTrace();
|
||||
}
|
||||
if (date == null)
|
||||
return "";
|
||||
Calendar cal = Calendar.getInstance();
|
||||
cal.setTime(date);
|
||||
cal.add(Calendar.DAY_OF_MONTH, -day);// 日
|
||||
date = cal.getTime();
|
||||
cal = null;
|
||||
return format.format(date);
|
||||
}
|
||||
|
||||
|
||||
public static String addDateMonth(String day, int hour){
|
||||
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||
Date date = null;
|
||||
try {
|
||||
date = format.parse(day);
|
||||
} catch (Exception ex) {
|
||||
ex.printStackTrace();
|
||||
}
|
||||
if (date == null)
|
||||
return "";
|
||||
Calendar cal = Calendar.getInstance();
|
||||
cal.setTime(date);
|
||||
cal.add(Calendar.MONTH, hour);// 月
|
||||
date = cal.getTime();
|
||||
cal = null;
|
||||
return format.format(date);
|
||||
}
|
||||
public static String YYYY = "yyyy";
|
||||
|
||||
public static String YYYY_MM = "yyyy-MM";
|
||||
|
||||
public static String YYYY_MM_DD = "yyyy-MM-dd";
|
||||
|
||||
public static String YYYYMMDDHHMMSS = "yyyyMMddHHmmss";
|
||||
|
||||
public static String YYYY_MM_DD_HH_MM_SS = "yyyy-MM-dd HH:mm:ss";
|
||||
public static String YYYY_MM_DD_HH_MM_SS_SSS = "yyyy-MM-dd HH:mm:ss.SSS";
|
||||
private static String[] parsePatterns = {
|
||||
"yyyy-MM-dd", "yyyy-MM-dd HH:mm:ss", "yyyy-MM-dd HH:mm", "yyyy-MM",
|
||||
"yyyy/MM/dd", "yyyy/MM/dd HH:mm:ss", "yyyy/MM/dd HH:mm", "yyyy/MM",
|
||||
"yyyy.MM.dd", "yyyy.MM.dd HH:mm:ss", "yyyy.MM.dd HH:mm", "yyyy.MM"};
|
||||
|
||||
/**
|
||||
* 获取当前Date型日期
|
||||
*
|
||||
* @return Date() 当前日期
|
||||
*/
|
||||
public static Date getNowDate()
|
||||
{
|
||||
return new Date();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取当前日期, 默认格式为yyyy-MM-dd
|
||||
*
|
||||
* @return String
|
||||
*/
|
||||
public static String getDate()
|
||||
{
|
||||
return dateTimeNow(YYYY_MM_DD);
|
||||
}
|
||||
public static String getMills()
|
||||
{
|
||||
return dateTimeNow(YYYY_MM_DD_HH_MM_SS_SSS);
|
||||
}
|
||||
public static String getMills(Date date)
|
||||
{
|
||||
return parseDateToStr(YYYY_MM_DD_HH_MM_SS_SSS, date);
|
||||
}
|
||||
public static Long getMillsLong()
|
||||
{
|
||||
return new Date().getTime();
|
||||
}
|
||||
public static Long getMillsLong(Date date)
|
||||
{
|
||||
return date.getTime();
|
||||
}
|
||||
public static final String getTime()
|
||||
{
|
||||
return dateTimeNow(YYYY_MM_DD_HH_MM_SS);
|
||||
}
|
||||
|
||||
public static final String dateTimeNow()
|
||||
{
|
||||
return dateTimeNow(YYYYMMDDHHMMSS);
|
||||
}
|
||||
|
||||
public static final String dateTimeNow(final String format)
|
||||
{
|
||||
return parseDateToStr(format, new Date());
|
||||
}
|
||||
|
||||
public static final String dateTime(final Date date)
|
||||
{
|
||||
return parseDateToStr(YYYY_MM_DD, date);
|
||||
}
|
||||
|
||||
public static final String parseDateToStr(final String format, final Date date)
|
||||
{
|
||||
return new SimpleDateFormat(format).format(date);
|
||||
}
|
||||
|
||||
public static final Date dateTime(final String format, final String ts)
|
||||
{
|
||||
try
|
||||
{
|
||||
return new SimpleDateFormat(format).parse(ts);
|
||||
}
|
||||
catch (ParseException e)
|
||||
{
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 日期路径 即年/月/日 如2018/08/08
|
||||
*/
|
||||
public static final String datePath()
|
||||
{
|
||||
Date now = new Date();
|
||||
return DateFormatUtils.format(now, "yyyy/MM/dd");
|
||||
}
|
||||
|
||||
/**
|
||||
* 日期路径 即年/月/日 如20180808
|
||||
*/
|
||||
public static final String dateTime()
|
||||
{
|
||||
Date now = new Date();
|
||||
return DateFormatUtils.format(now, "yyyyMMdd");
|
||||
}
|
||||
|
||||
/**
|
||||
* 日期型字符串转化为日期 格式
|
||||
*/
|
||||
public static Date parseDate(Object str)
|
||||
{
|
||||
if (str == null)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
try
|
||||
{
|
||||
return parseDate(str.toString(), parsePatterns);
|
||||
}
|
||||
catch (ParseException e)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取服务器启动时间
|
||||
*/
|
||||
public static Date getServerStartDate()
|
||||
{
|
||||
long time = ManagementFactory.getRuntimeMXBean().getStartTime();
|
||||
return new Date(time);
|
||||
}
|
||||
|
||||
/**
|
||||
* 计算两个时间差
|
||||
*/
|
||||
public static String getDatePoor(Date endDate, Date nowDate)
|
||||
{
|
||||
long nd = 1000 * 24 * 60 * 60;
|
||||
long nh = 1000 * 60 * 60;
|
||||
long nm = 1000 * 60;
|
||||
// long ns = 1000;
|
||||
// 获得两个时间的毫秒时间差异
|
||||
long diff = endDate.getTime() - nowDate.getTime();
|
||||
// 计算差多少天
|
||||
long day = diff / nd;
|
||||
// 计算差多少小时
|
||||
long hour = diff % nd / nh;
|
||||
// 计算差多少分钟
|
||||
long min = diff % nd % nh / nm;
|
||||
// 计算差多少秒//输出结果
|
||||
// long sec = diff % nd % nh % nm / ns;
|
||||
return day + "天" + hour + "小时" + min + "分钟";
|
||||
}
|
||||
|
||||
public static Long strToTimeStamp(String dateTime,String format) throws ParseException {
|
||||
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(format);
|
||||
return simpleDateFormat.parse(dateTime).getTime();
|
||||
}
|
||||
public static LocalDateTime strToLocalDateTime(String dateTimeString) throws ParseException {
|
||||
// 定义日期时间格式
|
||||
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
|
||||
// 使用DateTimeFormatter解析字符串为LocalDateTime
|
||||
LocalDateTime dateTime = LocalDateTime.parse(dateTimeString, formatter);
|
||||
return dateTime;
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,21 @@
|
||||
package cn.iocoder.yudao.module.iot.framework.mqtt.utils;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.framework.mqtt.entity.MqttData;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.http.HttpStatus;
|
||||
|
||||
@Slf4j
|
||||
public class MqttDataUtils {
|
||||
|
||||
public static MqttData parse(String entity){
|
||||
try {
|
||||
MqttData data = JSON.parseObject(entity, MqttData.class);
|
||||
return data;
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
log.error("MqttDataHandler.JSON.parseObject error:");
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,14 @@
|
||||
package cn.iocoder.yudao.module.iot.framework.mqtt.utils;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
/**
|
||||
* @author jie
|
||||
*/
|
||||
public class ThreadUtils {
|
||||
/**
|
||||
* 线程池
|
||||
*/
|
||||
public static ExecutorService executorService = Executors.newFixedThreadPool(50);
|
||||
}
|
||||
Loading…
Reference in New Issue