|
|
|
@ -58,26 +58,32 @@ public class MqttserviceImpl implements IMqttservice, ApplicationListener<Applic
|
|
|
|
*/
|
|
|
|
*/
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public void subscribeMysqlTopic() {
|
|
|
|
public void subscribeMysqlTopic() {
|
|
|
|
if (client.isConnected()) {
|
|
|
|
if (!client.isConnected()) {
|
|
|
|
|
|
|
|
try {
|
|
|
|
List<GatewayDO> gatewayList = gatewayService.selectListByIsEnable(IsEnableConstant.IsEnableTrue);
|
|
|
|
log.warn("MQTT未连接,尝试重新连接...");
|
|
|
|
for (GatewayDO gateway : gatewayList) {
|
|
|
|
client.reconnect();
|
|
|
|
if (StringUtils.isNotBlank(gateway.getTopic())) {
|
|
|
|
} catch (Exception e) {
|
|
|
|
SubscriptTopic topic = new SubscriptTopic(gateway.getTopic(),
|
|
|
|
log.error("MQTT重连失败", e);
|
|
|
|
gateway.getTopic(), Pattern.QUEUE, 0, dataHandler);
|
|
|
|
return;
|
|
|
|
defaultBizTopicSet.getTopicMap().add(topic);
|
|
|
|
}
|
|
|
|
try {
|
|
|
|
}
|
|
|
|
client.subscribe(topic.getSubTopic(), 0, dataHandler);
|
|
|
|
|
|
|
|
} catch (MqttException e) {
|
|
|
|
List<GatewayDO> gatewayList = gatewayService.selectListByIsEnable(IsEnableConstant.IsEnableTrue);
|
|
|
|
e.printStackTrace();
|
|
|
|
for (GatewayDO gateway : gatewayList) {
|
|
|
|
}
|
|
|
|
if (StringUtils.isNotBlank(gateway.getTopic())) {
|
|
|
|
|
|
|
|
SubscriptTopic topic = new SubscriptTopic(gateway.getTopic(),
|
|
|
|
|
|
|
|
gateway.getTopic(), Pattern.QUEUE, 0, dataHandler);
|
|
|
|
|
|
|
|
defaultBizTopicSet.getTopicMap().add(topic);
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
|
|
|
client.subscribe(topic.getSubTopic(), 0, dataHandler);
|
|
|
|
|
|
|
|
} catch (MqttException e) {
|
|
|
|
|
|
|
|
e.printStackTrace();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
client.setCallback(new MqttCallbackImpl(defaultBizTopicSet.getTopicMap(), client, options));
|
|
|
|
|
|
|
|
log.info("共订阅: " + defaultBizTopicSet.getTopicMap().size() + " 个主题!");
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
|
|
|
log.error("Mqtt is not connected !");
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
client.setCallback(new MqttCallbackImpl(defaultBizTopicSet.getTopicMap(), client, options));
|
|
|
|
|
|
|
|
log.info("共订阅: " + defaultBizTopicSet.getTopicMap().size() + " 个主题!");
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@ -86,6 +92,13 @@ public class MqttserviceImpl implements IMqttservice, ApplicationListener<Applic
|
|
|
|
*/
|
|
|
|
*/
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public int subscribeTopic(String topic) throws MqttException {
|
|
|
|
public int subscribeTopic(String topic) throws MqttException {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//断线重连
|
|
|
|
|
|
|
|
if (!client.isConnected()) {
|
|
|
|
|
|
|
|
log.warn("MQTT未连接,尝试重新连接...");
|
|
|
|
|
|
|
|
client.reconnect();
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
SubscriptTopic subscriptTopic = new SubscriptTopic(topic, topic, Pattern.QUEUE, 0, dataHandler);
|
|
|
|
SubscriptTopic subscriptTopic = new SubscriptTopic(topic, topic, Pattern.QUEUE, 0, dataHandler);
|
|
|
|
defaultBizTopicSet.getTopicMap().add(subscriptTopic);
|
|
|
|
defaultBizTopicSet.getTopicMap().add(subscriptTopic);
|
|
|
|
client.subscribe(topic, 0, dataHandler);
|
|
|
|
client.subscribe(topic, 0, dataHandler);
|
|
|
|
|