fix:修改读取数据方式opcuva为mqtt

plp
HuangHuiKang 1 month ago
parent fdba3ea563
commit 572ff4c3dd

@ -2,645 +2,270 @@ package cn.iocoder.yudao.framework.common.util.opc;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig;
import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder;
import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
import org.eclipse.milo.opcua.stack.client.DiscoveryClient;
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
import org.eclipse.milo.opcua.stack.core.types.builtin.*;
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
import java.net.InetAddress;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
/**
* OPC UA - Eclipse Milo 0.6.9
* OPC UA
*/
import java.util.concurrent.*;
import java.util.*;
import java.util.stream.Collectors;
@Slf4j
public class OpcUtils {
// 静态成员变量,所有实例共享
private static OpcUaClient client;
private static String serverUrl;
private static boolean isConnected = false;
private static final String LOG_PREFIX = "[OPC-UA]";
// 私有构造方法,防止实例化
private OpcUtils() {
throw new UnsupportedOperationException("这是一个工具类,不允许创建实例");
}
/**
* OPC Client
*/
private static final ConcurrentHashMap<Long, OpcUaClient> CLIENT_MAP =
new ConcurrentHashMap<>();
/**
* OPC UA
*
*/
public static boolean connect(String url, int timeoutSeconds) {
return connect(url, null, null, timeoutSeconds);
}
private static final ConcurrentHashMap<Long, Object> LOCK_MAP =
new ConcurrentHashMap<>();
public static boolean connect(String url, String username, String password, int timeoutSeconds) {
if (isConnected) {
log.info(" {} 客户端已连接,无需重复连接",LOG_PREFIX);
return true;
}
serverUrl = url;
public static boolean connect(Long deviceId,
String url,
String username,
String password,
int timeoutSeconds) {
try {
log.info(" {} 正在连接到OPC UA服务器 {}",LOG_PREFIX,url);
// 提取主机和端口
final String targetHost = extractHostFromUrl(url);
final int targetPort = extractPortFromUrl(url);
final String path = extractPathFromUrl(url);
System.out.println(LOG_PREFIX + "目标主机: " + targetHost + ", 端口: " + targetPort + ", 路径: " + path);
OpcUaClient client = CLIENT_MAP.get(deviceId);
// 将主机名解析为IP地址
final String ipAddress = resolveToIpAddress(targetHost);
System.out.println(LOG_PREFIX + "解析为IP地址: " + ipAddress);
if (username != null && password != null && !username.isEmpty()) {
System.out.println(LOG_PREFIX + "使用用户名密码认证: " + username);
// 用户名密码认证
client = OpcUaClient.create(url, endpoints -> {
if (endpoints == null || endpoints.isEmpty()) {
System.err.println(LOG_PREFIX + "服务器未返回任何端点");
return Optional.empty();
}
System.out.println(LOG_PREFIX + "发现端点数量: " + endpoints.size());
// 查找无安全策略的端点
for (EndpointDescription endpoint : endpoints) {
String endpointUrl = endpoint.getEndpointUrl();
System.out.println(LOG_PREFIX + "检查端点: " + endpointUrl +
" | 安全策略: " + endpoint.getSecurityPolicyUri());
if ("http://opcfoundation.org/UA/SecurityPolicy#None".equals(endpoint.getSecurityPolicyUri())) {
// 修正端点URL强制使用IP地址
String correctedUrl = forceIpAddressEndpoint(endpointUrl, ipAddress, targetPort, path);
System.out.println(LOG_PREFIX + "强制使用IP地址: " + endpointUrl + " -> " + correctedUrl);
return Optional.of(new EndpointDescription(
correctedUrl,
endpoint.getServer(),
endpoint.getServerCertificate(),
endpoint.getSecurityMode(),
endpoint.getSecurityPolicyUri(),
endpoint.getUserIdentityTokens(),
endpoint.getTransportProfileUri(),
endpoint.getSecurityLevel()
));
}
if (isConnected(client)) {
return true;
}
System.err.println(LOG_PREFIX + "未找到无安全策略的端点");
return Optional.empty();
}, configBuilder -> configBuilder
.setIdentityProvider(new UsernameProvider(username, password))
.setRequestTimeout(UInteger.valueOf(timeoutSeconds * 1000L))
.build());
} else {
System.out.println(LOG_PREFIX + "使用匿名认证");
Object lock = LOCK_MAP.computeIfAbsent(deviceId, k -> new Object());
// 对于匿名认证,手动发现端点并修正
List<EndpointDescription> endpoints = DiscoveryClient.getEndpoints(url).get(timeoutSeconds, TimeUnit.SECONDS);
synchronized (lock) {
if (endpoints == null || endpoints.isEmpty()) {
System.err.println(LOG_PREFIX + "服务器未返回任何端点");
return false;
client = CLIENT_MAP.get(deviceId);
if (isConnected(client)) {
return true;
}
// 查找无安全策略的端点
Optional<EndpointDescription> selectedEndpoint = Optional.empty();
for (EndpointDescription endpoint : endpoints) {
if ("http://opcfoundation.org/UA/SecurityPolicy#None".equals(endpoint.getSecurityPolicyUri())) {
selectedEndpoint = Optional.of(endpoint);
break;
}
}
log.info("创建OPC连接 deviceId={}, url={}", deviceId, url);
if (!selectedEndpoint.isPresent()) {
System.err.println(LOG_PREFIX + "未找到无安全策略的端点");
return false;
}
// 只用用户名/密码连接
IdentityProvider identityProvider = new UsernameProvider(username, password);
EndpointDescription endpoint = selectedEndpoint.get();
String correctedUrl = forceIpAddressEndpoint(endpoint.getEndpointUrl(), ipAddress, targetPort, path);
System.out.println(LOG_PREFIX + "强制使用IP地址: " + endpoint.getEndpointUrl() + " -> " + correctedUrl);
EndpointDescription correctedEndpoint = new EndpointDescription(
correctedUrl,
endpoint.getServer(),
endpoint.getServerCertificate(),
endpoint.getSecurityMode(),
endpoint.getSecurityPolicyUri(),
endpoint.getUserIdentityTokens(),
endpoint.getTransportProfileUri(),
endpoint.getSecurityLevel()
OpcUaClient newClient = OpcUaClient.create(
url,
// 只选择 SecurityPolicy.None 的端点
endpoints -> endpoints.stream()
.filter(ep -> ep.getSecurityPolicyUri().equals("http://opcfoundation.org/UA/SecurityPolicy#None"))
.findFirst(),
configBuilder -> configBuilder
.setIdentityProvider(identityProvider)
.setRequestTimeout(uint(timeoutSeconds * 1000))
.build()
);
OpcUaClientConfigBuilder configBuilder = OpcUaClientConfig.builder()
.setEndpoint(correctedEndpoint)
.setIdentityProvider(new AnonymousProvider())
.setRequestTimeout(UInteger.valueOf(timeoutSeconds * 1000L));
client = OpcUaClient.create(configBuilder.build());
}
newClient.connect().get(timeoutSeconds, TimeUnit.SECONDS);
client.connect().get(timeoutSeconds, TimeUnit.SECONDS);
CLIENT_MAP.put(deviceId, newClient);
if (validateConnection()) {
isConnected = true;
System.out.println(LOG_PREFIX + "服务器连接成功");
log.info("OPC连接成功 deviceId={}", deviceId);
return true;
} else {
System.err.println(LOG_PREFIX + "连接验证失败");
return false;
}
} catch (Exception e) {
System.err.println(LOG_PREFIX + "连接失败: " + e.getMessage());
e.printStackTrace();
log.error("OPC连接失败 deviceId={}", deviceId, e);
CLIENT_MAP.remove(deviceId);
return false;
}
}
/**
* IP
* 线 + session
*/
private static String resolveToIpAddress(String host) {
try {
// 如果已经是IP地址直接返回
if (host.matches("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}")) {
return host;
}
// 尝试解析主机名
InetAddress address = InetAddress.getByName(host);
return address.getHostAddress();
public static List<Object> readValues(Long deviceId,
List<String> addresses) {
} catch (Exception e) {
System.err.println(LOG_PREFIX + "解析主机名失败: " + host + ", 错误: " + e.getMessage());
return host; // 返回原始值
}
}
OpcUaClient client = CLIENT_MAP.get(deviceId);
/**
* 使IPURL
*/
private static String forceIpAddressEndpoint(String endpointUrl, String ipAddress, int defaultPort, String defaultPath) {
try {
// 提取端口和路径
int port = extractPortFromUrl(endpointUrl);
if (port <= 0) {
port = defaultPort;
}
if (!isConnected(client)) {
String path = extractPathFromUrl(endpointUrl);
if (path.isEmpty()) {
path = defaultPath;
}
log.warn("OPC未连接或session失效 deviceId={}", deviceId);
return String.format("opc.tcp://%s:%d%s", ipAddress, port, path);
CLIENT_MAP.remove(deviceId);
} catch (Exception e) {
System.err.println(LOG_PREFIX + "构建IP端点URL失败: " + e.getMessage());
return endpointUrl;
}
return buildNullResult(addresses.size());
}
try {
List<NodeId> nodeIds =
addresses.stream()
.map(OpcUtils::parseNodeIdSafe)
.collect(Collectors.toList());
List<DataValue> values =
client.readValues(
0.0,
TimestampsToReturn.Both,
nodeIds
)
.get(5, TimeUnit.SECONDS);
List<Object> result =
new ArrayList<>(values.size());
for (DataValue dv : values) {
/**
* URL
*/
private static String extractHostFromUrl(String url) {
try {
// 格式: opc.tcp://host:port/path
if (url == null || url.trim().isEmpty()) {
return "localhost";
if (dv == null || dv.getValue() == null) {
result.add(null);
continue;
}
String withoutProtocol = url.replace("opc.tcp://", "");
// 处理IPv6地址
if (withoutProtocol.startsWith("[")) {
// IPv6地址格式: [2001:db8::1]:4840/path
int closeBracket = withoutProtocol.indexOf("]");
if (closeBracket > 0) {
String host = withoutProtocol.substring(1, closeBracket);
return host;
}
}
Variant variant = dv.getValue();
// 普通IPv4或主机名
String[] parts = withoutProtocol.split("[:/]");
if (parts.length > 0 && !parts[0].isEmpty()) {
return parts[0];
}
} catch (Exception e) {
System.err.println(LOG_PREFIX + "提取主机地址失败: " + e.getMessage());
}
return "localhost";
}
/**
* URL
*/
private static int extractPortFromUrl(String url) {
try {
if (url == null || url.trim().isEmpty()) {
return 4840; // 默认OPC UA端口
result.add(
variant != null
? variant.getValue()
: null
);
}
String withoutProtocol = url.replace("opc.tcp://", "");
return result;
// 查找端口号
int portStart = -1;
int portEnd = -1;
} catch (Exception e) {
if (withoutProtocol.startsWith("[")) {
// IPv6地址: [2001:db8::1]:4840/path
int closeBracket = withoutProtocol.indexOf("]");
if (closeBracket > 0) {
portStart = withoutProtocol.indexOf(":", closeBracket);
}
} else {
// IPv4或主机名: host:4840/path
int firstColon = withoutProtocol.indexOf(":");
if (firstColon > 0) {
portStart = firstColon;
}
}
log.error("OPC读取失败 deviceId={}", deviceId, e);
if (portStart > 0) {
portStart++; // 跳过冒号
portEnd = withoutProtocol.indexOf("/", portStart);
if (portEnd < 0) {
portEnd = withoutProtocol.length();
}
CLIENT_MAP.remove(deviceId);
String portStr = withoutProtocol.substring(portStart, portEnd);
if (portStr.matches("\\d+")) {
return Integer.parseInt(portStr);
return buildNullResult(addresses.size());
}
}
} catch (Exception e) {
System.err.println(LOG_PREFIX + "提取端口失败: " + e.getMessage());
}
return 4840; // 默认OPC UA端口
}
/**
* URL
*
*/
private static String extractPathFromUrl(String url) {
try {
if (url == null || url.trim().isEmpty()) {
return "";
}
String withoutProtocol = url.replace("opc.tcp://", "");
// 查找路径开始位置
int pathStart = -1;
if (withoutProtocol.startsWith("[")) {
// IPv6地址: [2001:db8::1]:4840/path
int closeBracket = withoutProtocol.indexOf("]");
if (closeBracket > 0) {
// 检查是否有端口
int colonAfterBracket = withoutProtocol.indexOf(":", closeBracket);
if (colonAfterBracket > 0) {
// 有端口: [2001:db8::1]:4840/path
int slashAfterPort = withoutProtocol.indexOf("/", colonAfterBracket);
if (slashAfterPort > 0) {
return withoutProtocol.substring(slashAfterPort);
}
} else {
// 无端口: [2001:db8::1]/path
int slashAfterBracket = withoutProtocol.indexOf("/", closeBracket);
if (slashAfterBracket > 0) {
return withoutProtocol.substring(slashAfterBracket);
}
}
}
} else {
// IPv4或主机名
// 先找主机名结束位置
int hostEnd = withoutProtocol.indexOf(":");
if (hostEnd < 0) {
// 无端口: host/path
hostEnd = withoutProtocol.indexOf("/");
if (hostEnd > 0) {
return withoutProtocol.substring(hostEnd);
}
} else {
// 有端口: host:port/path
int portEnd = withoutProtocol.indexOf("/", hostEnd);
if (portEnd > 0) {
return withoutProtocol.substring(portEnd);
}
}
}
private static boolean isConnected(OpcUaClient client) {
} catch (Exception e) {
System.err.println(LOG_PREFIX + "提取路径失败: " + e.getMessage());
}
return "";
if (client == null) {
return false;
}
/**
* URL
*/
private static String extractProtocolFromUrl(String url) {
try {
if (url == null || url.trim().isEmpty()) {
return "opc.tcp";
}
int protocolEnd = url.indexOf("://");
if (protocolEnd > 0) {
return url.substring(0, protocolEnd);
}
} catch (Exception e) {
System.err.println(LOG_PREFIX + "提取协议失败: " + e.getMessage());
}
return "opc.tcp";
}
client.getSession()
.get(200, TimeUnit.MILLISECONDS);
/**
* URL
*/
private static String buildUrl(String protocol, String host, int port, String path) {
StringBuilder url = new StringBuilder();
if (protocol == null || protocol.isEmpty()) {
protocol = "opc.tcp";
}
url.append(protocol).append("://");
// 处理IPv6地址
if (host.contains(":")) {
url.append("[").append(host).append("]");
} else {
url.append(host);
}
return true;
if (port > 0) {
url.append(":").append(port);
}
} catch (Exception e) {
if (path != null && !path.isEmpty()) {
if (!path.startsWith("/")) {
url.append("/");
return false;
}
url.append(path);
}
return url.toString();
}
/**
* URL使
* NodeId
*
* ns=2;s=Tag
* ns=2;i=123
* ns=3;g=UUID
*/
private static String correctEndpointUrl(String endpointUrl, String targetHost, int targetPort) {
if (endpointUrl == null || endpointUrl.trim().isEmpty()) {
return endpointUrl;
}
private static NodeId parseNodeIdSafe(String address) {
try {
// 提取原URL的各个部分
String protocol = extractProtocolFromUrl(endpointUrl);
String originalHost = extractHostFromUrl(endpointUrl);
int originalPort = extractPortFromUrl(endpointUrl);
String path = extractPathFromUrl(endpointUrl);
// 决定使用哪个主机
String useHost = targetHost;
int usePort = (targetPort > 0) ? targetPort : originalPort;
// 如果原端口无效,使用默认端口
if (usePort <= 0) {
usePort = 4840;
}
// 构建修正后的URL
return buildUrl(protocol, useHost, usePort, path);
} catch (Exception e) {
System.err.println(LOG_PREFIX + "修正端点URL失败: " + e.getMessage());
return endpointUrl;
}
if (address == null ||
address.trim().isEmpty()) {
return null;
}
return NodeId.parse(address.trim());
} catch (Exception e) {
log.warn("NodeId解析失败: {}", address);
/**
*
*/
private static boolean isLocalAddress(String url) {
if (url == null) return false;
return url.contains("127.0.0.1") ||
url.contains("localhost") ||
url.contains("127.0.1.1") ||
url.contains("[::1]") ||
url.startsWith("opc.tcp://localhost") ||
url.startsWith("opc.tcp://127.");
return null;
}
/**
* URL
*/
private static String correctEndpointUrl(String endpointUrl, String targetHost) {
if (endpointUrl == null) return endpointUrl;
// 替换各种本地地址表示形式
String corrected = endpointUrl
.replace("127.0.0.1", targetHost)
.replace("localhost", targetHost)
.replace("127.0.1.1", targetHost)
.replace("[::1]", targetHost);
// 如果还有localhost:端口的形式
if (corrected.contains("localhost:")) {
corrected = corrected.replace("localhost", targetHost);
}
return corrected;
}
/**
*
* null
*/
public static boolean disconnect() {
if (!isConnected || client == null) {
System.out.println(LOG_PREFIX + "客户端未连接");
return true;
}
try {
client.disconnect().get(5, TimeUnit.SECONDS);
isConnected = false;
client = null;
System.out.println(LOG_PREFIX + "连接已断开");
return true;
} catch (Exception e) {
System.err.println(LOG_PREFIX + "断开连接失败: " + e.getMessage());
return false;
}
}
private static List<Object> buildNullResult(int size) {
/**
*
*/
public static Object readValue(String nodeId) {
return readValue(nodeId, 10);
}
List<Object> result =
new ArrayList<>(size);
/**
*
*/
public static Object readValue(String nodeId, int timeoutSeconds) {
if (!isConnected()) {
System.err.println(LOG_PREFIX + "客户端未连接");
return null;
for (int i = 0; i < size; i++) {
result.add(null);
}
try {
NodeId id = NodeId.parse(nodeId);
DataValue value = client.readValue(0.0, TimestampsToReturn.Both, id)
.get(timeoutSeconds, TimeUnit.SECONDS);
Object result = value.getValue().getValue();
System.out.println(LOG_PREFIX + "读取节点成功: " + nodeId + " = " + result);
return result;
} catch (Exception e) {
System.err.println(LOG_PREFIX + "读取节点值失败[" + nodeId + "]: " + e.getMessage());
return null;
}
}
/**
*
*/
public static boolean writeValue(String nodeId, Object value) {
return writeValue(nodeId, value, 10);
}
/**
*
*
*/
public static boolean writeValue(String nodeId, Object value, int timeoutSeconds) {
if (!isConnected()) {
System.err.println(LOG_PREFIX + "客户端未连接");
return false;
}
public static boolean disconnect(Long deviceId) {
try {
NodeId id = NodeId.parse(nodeId);
DataValue dataValue = new DataValue(new Variant(value), null, null);
client.writeValue(id, dataValue).get(timeoutSeconds, TimeUnit.SECONDS);
System.out.println(LOG_PREFIX + "写入节点成功: " + nodeId + " = " + value);
OpcUaClient client =
CLIENT_MAP.remove(deviceId);
if (client != null) {
client.disconnect().get();
log.info("OPC断开 deviceId={}", deviceId);
return true;
} catch (Exception e) {
System.err.println(LOG_PREFIX + "写入节点值失败[" + nodeId + "]: " + e.getMessage());
return false;
}
}
/**
*
*/
public static boolean isConnected() {
if (!isConnected || client == null) return false;
try {
return validateConnection();
} catch (Exception e) {
isConnected = false;
log.error("断开失败 deviceId={}", deviceId, e);
return false;
}
}
/**
*
*/
public static String getConnectionInfo() {
if (!isConnected) {
return "未连接";
}
try {
return "已连接到: " + serverUrl;
} catch (Exception e) {
return "已连接到: " + serverUrl + ", 会话信息获取失败";
}
return true;
}
/**
*
*
*/
private static boolean validateConnection() {
if (client == null) return false;
public static void disconnectAll() {
CLIENT_MAP.forEach((deviceId, client) -> {
try {
NodeId rootNode = new NodeId(0, 84); // RootFolder
DataValue value = client.readValue(0.0, TimestampsToReturn.Both, rootNode)
.get(5, TimeUnit.SECONDS);
return value != null;
} catch (Exception e) {
return false;
}
}
client.disconnect().get();
} catch (Exception ignored) {}
});
/**
* URL
*/
public static String getServerUrl() {
return serverUrl;
}
CLIENT_MAP.clear();
/**
*
*/
public static OpcUaClient getClient() {
return client;
log.info("全部OPC连接关闭");
}
/**
*
*/
public static void destroy() {
disconnect();
}
/**
* 退
*/
public static void registerShutdownHook() {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (isConnected) {
System.out.println(LOG_PREFIX + "检测到JVM关闭正在清理OPC UA连接...");
disconnect();
}
}));
private static
org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger
uint(long value) {
return org.eclipse.milo.opcua.stack.core.types.builtin.unsigned
.Unsigned.uint(value);
}
}

@ -11,6 +11,7 @@ public interface ErrorCodeConstants {
ErrorCode DEVICE_NOT_EXISTS = new ErrorCode(1_003_000_000, "设备不存在");
ErrorCode DEVICE_REFERENCES_EXIST = new ErrorCode(1_003_000_000, "存在设备已被引用,请先删除引用。");
ErrorCode DEVICE_MQTT_TOPIC_EXIST = new ErrorCode(1_003_000_000, "设备MQTT主题不存在。");
ErrorCode DEVICE_EXISTS = new ErrorCode(1_003_000_000, "同名或同主题设备已存在");

@ -11,12 +11,14 @@ import cn.iocoder.yudao.module.iot.controller.admin.devicecontactmodel.vo.Device
import cn.iocoder.yudao.module.iot.dal.dataobject.device.DeviceAttributeDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.DeviceDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.devicecontactmodel.DeviceContactModelDO;
import cn.iocoder.yudao.module.iot.framework.mqtt.consumer.IMqttservice;
import cn.iocoder.yudao.module.iot.service.device.DeviceService;
import cn.iocoder.yudao.module.iot.service.device.TDengineService;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.quartz.SchedulerException;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.validation.annotation.Validated;
@ -50,6 +52,11 @@ public class DeviceController {
@Resource
private JobService jobService;
@Resource
private OpcUaSubscriptionService opcUaService;
// @Resource
// private IMqttservice mqttService;
@PostMapping("/create")
@Operation(summary = "创建物联设备")
@ -283,6 +290,26 @@ public class DeviceController {
}
// @PostMapping("/subscribe")
// public String subscribeTopic(@RequestParam String topic) {
// try {
// int result = mqttService.subscribeTopic(topic); // 使用服务层安全订阅
// if (result >0 ) {
// return "订阅成功: " + topic;
// } else {
// return "订阅失败: " + topic;
// }
// } catch (MqttException e) {
// return "订阅异常: " + e.getMessage();
// }
// }
@PutMapping("/update-enabled")
@Operation(summary = "更新任务管理启用状态")
@PreAuthorize("@ss.hasPermission('mes:task-management:update')")
public CommonResult<Boolean> updateDeviceEnabled(@Valid @RequestBody DeviceUpdateEnabledReqVO updateEnabledReqVO) throws MqttException {
deviceService.updateDeviceEnabled(updateEnabledReqVO);
return success(true);
}
}

@ -0,0 +1,136 @@
package cn.iocoder.yudao.module.iot.controller.admin.device;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig;
import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.*;
import org.eclipse.milo.opcua.stack.client.DiscoveryClient;
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
import org.eclipse.milo.opcua.stack.core.types.structured.*;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
@Slf4j
@Service
public class OpcUaSubscriptionService {
private static final String URL = "opc.tcp://192.168.21.5:4840";
private static final String USERNAME = "bst";
private static final String PASSWORD = "Bst123456";
private OpcUaClient client;
private UaSubscription subscription;
// 保存已订阅的节点
private final ConcurrentHashMap<String, UaMonitoredItem> monitoredItems = new ConcurrentHashMap<>();
/**
* OPC UA
*/
public void subscribeNode(String nodeAddress, int samplingMillis) throws Exception {
if (client == null) {
connect();
}
if (subscription == null) {
subscription = client.getSubscriptionManager()
.createSubscription((double) samplingMillis)
.get();
log.info("创建订阅,采样间隔: {} ms", samplingMillis);
}
NodeId nodeId = NodeId.parse(nodeAddress);
ReadValueId readValueId = new ReadValueId(
nodeId,
org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger.valueOf(13), // Value 属性
null,
null
);
MonitoringParameters parameters = new MonitoringParameters(
uint(nodeId.hashCode()),
(double) samplingMillis,
null,
uint(10),
true
);
MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(
readValueId,
MonitoringMode.Reporting,
parameters
);
List<MonitoredItemCreateRequest> requests = new ArrayList<>();
requests.add(request);
List<UaMonitoredItem> items = subscription.createMonitoredItems(
TimestampsToReturn.Both,
requests
).get();
UaMonitoredItem item = items.get(0);
item.setValueConsumer((monitoredItem, value) -> {
Variant variant = value.getValue();
Object v = variant != null ? variant.getValue() : null;
log.info("节点 {} 值变化: {}", nodeAddress, v);
});
monitoredItems.put(nodeAddress, item);
log.info("成功订阅节点: {}", nodeAddress);
}
private void connect() throws Exception {
IdentityProvider identity = new UsernameProvider(USERNAME, PASSWORD);
// 1. 获取端点
List<EndpointDescription> endpoints = DiscoveryClient.getEndpoints(URL)
.get(5, TimeUnit.SECONDS);
EndpointDescription endpoint = endpoints.stream()
.filter(e -> SecurityPolicy.None.getUri().equals(e.getSecurityPolicyUri()))
.findFirst()
.orElseThrow(() -> new RuntimeException("未找到 SecurityPolicy=None 的端点"));
// 2. 构建客户端配置
OpcUaClientConfig config = OpcUaClientConfig.builder()
.setEndpoint(endpoint)
.setIdentityProvider(identity)
.build();
// 3. 创建客户端并连接
client = OpcUaClient.create(config);
client.connect().get();
log.info("成功连接 OPC UA 服务端: {}", URL);
}
// /**
// * 断开连接
// */
// public void disconnect() {
// if (client != null) {
// try {
// client.disconnect().get();
// log.info("断开 OPC UA 连接");
// } catch (Exception e) {
// log.error("断开 OPC UA 连接失败", e);
// }
// }
// }
}

@ -116,15 +116,16 @@ public class DeviceTask implements Task {
logger.info("执行设备任务任务ID: {}, 参数: {}, 时间: {}",
taskId, taskParam, currentTime);
executeDeviceLogic(taskId,taskParam);
executeDeviceLogic(taskId, taskParam);
} catch (Exception e) {
logger.error("执行设备任务异常任务ID: {}", taskId, e);
} finally {
//确保出问题不会打满opcv服务器连接数
OpcUtils.disconnect();
}
// finally {
// //确保出问题不会打满opcv服务器连接数
// OpcUtils.disconnect();
// }
}
/**
@ -137,10 +138,6 @@ public class DeviceTask implements Task {
Long deviceId = sourceDeviceId - 1000000L;
logger.info("处理后设备ID: {}", deviceId);
if (deviceId == null) {
throw new RuntimeException("设备ID不能为空");
}
// 2. 获取设备信息
DeviceDO device = getDeviceInfo(deviceId);
@ -151,7 +148,7 @@ public class DeviceTask implements Task {
processDeviceData(deviceId, device);
// 5. 断开连接
OpcUtils.disconnect();
// OpcUtils.disconnect();
}
@ -183,7 +180,13 @@ public class DeviceTask implements Task {
try {
connected = OpcUtils.connect(device.getUrl(), username, password, 10);
connected = OpcUtils.connect(
device.getId(),
device.getUrl(),
username,
password,
10
);
if (!connected) {
log.error("设备 {} 连接OPC服务器失败URL: {}", device.getId(), device.getUrl());
@ -224,62 +227,74 @@ public class DeviceTask implements Task {
}
/**
*
* - OPC UA
*/
private void processDeviceData(Long deviceId, DeviceDO device) {
DeviceDO deviceDO = deviceMapper.selectById(deviceId);
// 1. 查询点位配置
List<DeviceContactModelDO> points = getDevicePoints(deviceId);
if (CollectionUtils.isEmpty(points)) {
logger.warn("设备 {} 未配置点位", deviceId);
//更新状态为待机中
// 更新状态为待机中
DeviceDO deviceDO = deviceMapper.selectById(deviceId);
DeviceOperationRecordDO record = new DeviceOperationRecordDO();
record.setDeviceId(device.getId());
record.setDeviceId(deviceId);
record.setRule(DeviceStatusEnum.STANDBY.getCode());
record.setTotalStandbyTime(deviceDO.getSampleCycle());
//TODO 创建人和更新人为内置默认管理员
record.setCreator("1");
record.setUpdater("1");
deviceOperationRecordMapper.insert(record);
return;
}
logger.info("设备 {} 需要读取 {} 个点位", deviceId, points.size());
DevicePointRulesDO devicePointRulesDO = devicePointRulesMapper.selectOne(Wrappers.<DevicePointRulesDO>lambdaQuery()
.eq(DevicePointRulesDO::getDeviceId, deviceId)
.eq(DevicePointRulesDO::getIdentifier, DeviceBasicStatusEnum.RUNNING));
if(devicePointRulesDO !=null && devicePointRulesDO.getFieldRule() == null ){
//更新状态为待机中
DeviceOperationRecordDO record = new DeviceOperationRecordDO();
record.setDeviceId(device.getId());
record.setRule(DeviceStatusEnum.STANDBY.getCode());
record.setTotalStandbyTime(deviceDO.getSampleCycle());
//TODO 创建人和更新人为内置默认管理员
record.setCreator("1");
record.setUpdater("1");
deviceOperationRecordMapper.insert(record);
// 2. 构建有效地址列表及索引映射,确保顺序对应
List<String> addresses = new ArrayList<>();
List<Integer> indexMap = new ArrayList<>(); // 对应原 points 的索引
for (int i = 0; i < points.size(); i++) {
String address = StringUtils.trimToEmpty(points.get(i).getAddress());
if (!address.isEmpty()) {
addresses.add(address);
indexMap.add(i);
}
}
// 3. 批量读取 OPC UA 点位
List<Object> values = new ArrayList<>();
try {
if (!addresses.isEmpty()) {
values = OpcUtils.readValues(device.getId(), addresses);
}
} catch (Exception e) {
logger.error("设备 {} 批量读取 OPC UA 点位异常", deviceId, e);
}
// 2. 读取并处理数据
// 4. 处理读取到的数据
int successCount = 0;
List<DeviceContactModelDO> validDataList = new ArrayList<>();
for (int i = 0; i < values.size(); i++) {
DeviceContactModelDO point = points.get(indexMap.get(i));
Object value = values.get(i);
try {
String processedValue = processOpcValue(value);
// 规则判断和告警/运行记录处理
judgmentRules(processedValue, point.getAttributeCode(), device, point.getId());
point.setAddressValue(processedValue.isEmpty() ? null : processedValue);
for (DeviceContactModelDO point : points) {
processSinglePoint(point, validDataList,device);
if (point.getAddressValue() != null) {
successCount++;
}
validDataList.add(point);
} catch (Exception e) {
logger.error("处理点位 {} 异常,地址: {}", point.getId(), point.getAddress(), e);
}
}
// 3. 入库处理
// 5. 入库处理
if (!validDataList.isEmpty()) {
saveToDatabase(deviceId, validDataList, successCount);
} else {
@ -287,6 +302,7 @@ public class DeviceTask implements Task {
}
}
/**
*
*/
@ -296,36 +312,35 @@ public class DeviceTask implements Task {
return deviceContactModelMapper.selectList(query);
}
/**
*
*/
private void processSinglePoint(DeviceContactModelDO point, List<DeviceContactModelDO> validDataList, DeviceDO device) {
try {
String address = StringUtils.trimToEmpty(point.getAddress());
if (address.isEmpty()) {
logger.warn("点位ID {} 地址为空", point.getId());
return;
}
Object value = OpcUtils.readValue(address);
// if (value == null) {
logger.warn("读取点位 {} ,地址: {}", point.getId(), address);
// } else {
String processedValue = processOpcValue(value);
//判断规则
judgmentRules(processedValue,point.getAttributeCode(),device,point.getId());
point.setAddressValue(processedValue.isEmpty() ? null : processedValue);
// /**
// * 处理单个点位
// */
// private void processSinglePoint(DeviceContactModelDO point, List<DeviceContactModelDO> validDataList, DeviceDO device) {
// try {
// String address = StringUtils.trimToEmpty(point.getAddress());
// if (address.isEmpty()) {
// logger.warn("点位ID {} 地址为空", point.getId());
// return;
// }
//
// Object value = OpcUtils.readValue(device.getId(),address);
//// if (value == null) {
// logger.info("读取点位 {} ,地址: {}", point.getId(), address);
//// } else {
// String processedValue = processOpcValue(value);
//
// //判断规则
// judgmentRules(processedValue,point.getAttributeCode(),device,point.getId());
// point.setAddressValue(processedValue.isEmpty() ? null : processedValue);
//
//// }
//
// validDataList.add(point);
// } catch (Exception e) {
// logger.error("处理点位 {} 异常,地址: {}",
// point.getId(), point.getAddress(), e);
// }
// }
validDataList.add(point);
} catch (Exception e) {
logger.error("处理点位 {} 异常,地址: {}",
point.getId(), point.getAddress(), e);
}
}
/**
@ -365,7 +380,7 @@ public class DeviceTask implements Task {
}
}
private void judgmentRules(String processedValue, String attributeCode, DeviceDO device,Long modelId) {
private void judgmentRules(String processedValue, String attributeCode, DeviceDO device, Long modelId) {
if (StringUtils.isBlank(processedValue)) {
logger.warn("待判断的值为空编码attributeCode: {}, deviceId: {}", attributeCode, device.getId());
// return;
@ -408,14 +423,14 @@ public class DeviceTask implements Task {
JSON.toJSONString(pointRulesRespVO));
// 执行匹配成功后的逻辑
handleMatchedSuccessRule(devicePointRulesDO,pointRulesRespVO, processedValue, device, attributeCode,modelId);
handleMatchedSuccessRule(devicePointRulesDO, pointRulesRespVO, processedValue, device, attributeCode, modelId);
break;
} else {
logger.debug("规则不匹配: modelId={}, value={}, rule={}",
attributeCode, processedValue,
JSON.toJSONString(pointRulesRespVO));
// 执行匹配失败后的逻辑
handleMatchedFailureRule(devicePointRulesDO,pointRulesRespVO, processedValue, device, attributeCode);
handleMatchedFailureRule(devicePointRulesDO, pointRulesRespVO, processedValue, device, attributeCode);
}
}
@ -423,7 +438,7 @@ public class DeviceTask implements Task {
}
}
private void handleMatchedFailureRule(DevicePointRulesDO devicePointRulesDO,PointRulesRespVO pointRulesRespVO, String processedValue, DeviceDO device, String attributeCode) {
private void handleMatchedFailureRule(DevicePointRulesDO devicePointRulesDO, PointRulesRespVO pointRulesRespVO, String processedValue, DeviceDO device, String attributeCode) {
//TODO 离线待优化
// if (devicePointRulesDO.getIdentifier().equals(DeviceBasicStatusEnum.RUNNING.getCode())){
// DeviceOperationRecordDO record = new DeviceOperationRecordDO();
@ -446,13 +461,13 @@ public class DeviceTask implements Task {
String attributeCode,
Long modelId) {
DeviceContactModelDO deviceContactModelDO = deviceContactModelMapper.selectById(modelId);
if (deviceContactModelDO == null){
if (deviceContactModelDO == null) {
return;
}
//分别处理运行记录和告警记录
if (StringUtils.isBlank(devicePointRulesDO.getAlarmLevel())){
if (StringUtils.isBlank(devicePointRulesDO.getAlarmLevel())) {
DeviceOperationRecordDO record = new DeviceOperationRecordDO();
record.setDeviceId(device.getId());
record.setModelId(modelId);
@ -468,7 +483,7 @@ public class DeviceTask implements Task {
calculateAndSetTotalTime(record, pointRulesRespVO.getRule(), device.getSampleCycle());
deviceOperationRecordMapper.insert(record);
}else {
} else {
DeviceWarinningRecordDO deviceWarinningRecordDO = new DeviceWarinningRecordDO();
deviceWarinningRecordDO.setDeviceId(device.getId());
@ -649,12 +664,18 @@ public class DeviceTask implements Task {
*/
private boolean compareNumbers(Double value, Double ruleValue, String operator) {
switch (operator) {
case "EQ": return Math.abs(value - ruleValue) < 0.000001; // 处理浮点数精度
case "NE": return Math.abs(value - ruleValue) >= 0.000001;
case "GT": return value > ruleValue;
case "GE": return value >= ruleValue;
case "LT": return value < ruleValue;
case "LE": return value <= ruleValue;
case "EQ":
return Math.abs(value - ruleValue) < 0.000001; // 处理浮点数精度
case "NE":
return Math.abs(value - ruleValue) >= 0.000001;
case "GT":
return value > ruleValue;
case "GE":
return value >= ruleValue;
case "LT":
return value < ruleValue;
case "LE":
return value <= ruleValue;
default:
logger.warn("不支持的操作符: {}", operator);
return false;
@ -666,12 +687,18 @@ public class DeviceTask implements Task {
*/
private boolean compareStrings(String value, String ruleValue, String operator) {
switch (operator) {
case "EQ": return value.equals(ruleValue);
case "NE": return !value.equals(ruleValue);
case "GT": return value.compareTo(ruleValue) > 0;
case "GE": return value.compareTo(ruleValue) >= 0;
case "LT": return value.compareTo(ruleValue) < 0;
case "LE": return value.compareTo(ruleValue) <= 0;
case "EQ":
return value.equals(ruleValue);
case "NE":
return !value.equals(ruleValue);
case "GT":
return value.compareTo(ruleValue) > 0;
case "GE":
return value.compareTo(ruleValue) >= 0;
case "LT":
return value.compareTo(ruleValue) < 0;
case "LE":
return value.compareTo(ruleValue) <= 0;
default:
logger.warn("不支持的操作符: {}", operator);
return false;
@ -694,5 +721,4 @@ public class DeviceTask implements Task {
}
}

@ -0,0 +1,14 @@
package cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.opcuv;
import cn.iocoder.yudao.framework.common.util.opc.OpcUtils;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
@Component
public class OpcShutdown {
@PreDestroy
public void shutdown() {
OpcUtils.disconnectAll();
}
}

@ -77,4 +77,7 @@ public class DevicePageReqVO extends PageParam {
@Schema(description = "id集合导出用")
private String ids;
@Schema(description = "mqtt订阅主题")
private String topic;
}

@ -120,4 +120,7 @@ public class DeviceRespVO {
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
private LocalDateTime collectionTime;
@Schema(description = "mqtt订阅主题")
private String topic;
}

@ -75,5 +75,8 @@ public class DeviceSaveReqVO {
// @NotNull
private Integer isConnect;
@Schema(description = "mqtt订阅主题")
private String topic;
}

@ -0,0 +1,20 @@
package cn.iocoder.yudao.module.iot.controller.admin.device.vo;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import javax.validation.constraints.NotNull;
@Schema(description = "采集设备 - 更新启用状态 Request VO")
@Data
public class DeviceUpdateEnabledReqVO {
@Schema(description = "任务ID", required = true, example = "1024")
@NotNull(message = "任务ID不能为空")
private Long id;
@Schema(description = "是否启用", required = true, example = "true")
@NotNull(message = "启用状态不能为空")
private Boolean enabled;
}

@ -159,11 +159,8 @@ public class RecipeDeviceRecordController {
@RequestParam("id") Long recipeId) {
try {
RecipePlanDetailDO recipePlanDetailDO = recipePlanDetailService.getRecipePlanDetail(recipeId);
//RecipeRespVO recipeRespVO = recipeService.getRecipeWithDeviceId(recipePlanDetailDO.getRecipeId());
//RecipeDO recipeDO = recipeService.getRecipe(recipePlanDetailDO.getRecipeId());
// ========== 第一步:查询配方关联的点位属性信息 ==========
// 1.1 根据recipeId查询iot_recipe_device_attribute表记录
@ -185,6 +182,8 @@ public class RecipeDeviceRecordController {
throw exception(DEVICE_NOT_EXISTS);
}
try {
Map<Long, DeviceContactModelDO> deviceContactModelMap = new HashMap<>();
List<DeviceContactModelDO> deviceContactModelDOS = deviceContactModelService.selectListByDeviceId(device.getId());
if (!deviceContactModelDOS.isEmpty()){
@ -195,7 +194,7 @@ public class RecipeDeviceRecordController {
));
}
OpcUtils.connect(device.getUrl(),device.getUsername(),device.getPassword(),10);
OpcUtils.connect(device.getId(),device.getUrl(),device.getUsername(),device.getPassword(),10);
for (RecipeDeviceAttributeDO attributeDO : attributeList) {
DeviceContactModelDO deviceContactModelDO = deviceContactModelMap.get(attributeDO.getAttributeId());
@ -208,16 +207,15 @@ public class RecipeDeviceRecordController {
recipeDeviceRecordDO.setAttributeCode(deviceContactModelDO.getAttributeName());
recipeDeviceRecordDO.setDataType(deviceContactModelDO.getDataType());
recipeDeviceRecordDO.setDataUnit(deviceContactModelDO.getDataUnit());
recipeDeviceRecordDO.setValue((String) OpcUtils.readValue(deviceContactModelDO.getAddress()));
// recipeDeviceRecordDO.setValue((String) OpcUtils.readValues(device.getId(),deviceContactModelDO.getAddress()));
recipeDeviceRecordService.createRecipeDeviceRecord(BeanUtils.toBean(recipeDeviceRecordDO, RecipeDeviceRecordSaveReqVO.class));
}
} finally {
OpcUtils.disconnect();
} catch (Exception e) {
throw new RuntimeException(e);
}
return success(true);
}

@ -111,4 +111,9 @@ public class DeviceDO extends BaseDO {
*/
private String tenantId;
/**
*
*/
private String topic;
}

@ -32,6 +32,7 @@ public interface DeviceMapper extends BaseMapperX<DeviceDO> {
LambdaQueryWrapperX<DeviceDO> deviceDOLambdaQueryWrapperX = new LambdaQueryWrapperX<>();
deviceDOLambdaQueryWrapperX.likeIfPresent(DeviceDO::getDeviceCode, reqVO.getDeviceCode())
.likeIfPresent(DeviceDO::getDeviceName, reqVO.getDeviceName())
.likeIfPresent(DeviceDO::getTopic, reqVO.getTopic())
.eqIfPresent(DeviceDO::getDeviceType, reqVO.getDeviceType())
.eqIfPresent(DeviceDO::getStatus, reqVO.getStatus())
.eqIfPresent(DeviceDO::getReadTopic, reqVO.getReadTopic())

@ -1,8 +1,22 @@
package cn.iocoder.yudao.module.iot.framework.mqtt.consumer;
import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
import cn.iocoder.yudao.module.iot.controller.admin.device.enums.DeviceBasicStatusEnum;
import cn.iocoder.yudao.module.iot.controller.admin.device.enums.DeviceStatusEnum;
import cn.iocoder.yudao.module.iot.controller.admin.devicemodelrules.vo.PointRulesRespVO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.DeviceDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.devicecontactmodel.DeviceContactModelDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.deviceoperationrecord.DeviceOperationRecordDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.devicepointrules.DevicePointRulesDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.devicewarinningrecord.DeviceWarinningRecordDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.iotorganization.IotOrganizationDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.mqttrecord.MqttRecordDO;
import cn.iocoder.yudao.module.iot.dal.mysql.device.DeviceMapper;
import cn.iocoder.yudao.module.iot.dal.mysql.devicecontactmodel.DeviceContactModelMapper;
import cn.iocoder.yudao.module.iot.dal.mysql.deviceoperationrecord.DeviceOperationRecordMapper;
import cn.iocoder.yudao.module.iot.dal.mysql.devicepointrules.DevicePointRulesMapper;
import cn.iocoder.yudao.module.iot.dal.mysql.devicewarinningrecord.DeviceWarinningRecordMapper;
import cn.iocoder.yudao.module.iot.dal.mysql.mqttrecord.MqttRecordMapper;
import cn.iocoder.yudao.module.iot.framework.constant.Constants;
import cn.iocoder.yudao.module.iot.framework.mqtt.common.SuperConsumer;
@ -11,14 +25,27 @@ import cn.iocoder.yudao.module.iot.framework.mqtt.entity.MqttData;
import cn.iocoder.yudao.module.iot.framework.mqtt.utils.DateUtils;
import cn.iocoder.yudao.module.iot.framework.mqtt.utils.MqttDataUtils;
import cn.iocoder.yudao.module.iot.service.device.DeviceService;
import cn.iocoder.yudao.module.iot.service.device.TDengineService;
import cn.iocoder.yudao.module.iot.service.iotorganization.IotOrganizationService;
import cn.iocoder.yudao.module.iot.service.mqttrecord.MqttRecordService;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@Slf4j
@Component
@ -27,7 +54,8 @@ public class MqttDataHandler extends SuperConsumer<String> {
@Resource
private IotOrganizationService organizationService;
@Resource
private DeviceService deviceService;
@Lazy
private DeviceMapper deviceMapper;
@Resource
private AsyncService asyncService;
@ -36,6 +64,25 @@ public class MqttDataHandler extends SuperConsumer<String> {
@Resource
private MqttRecordMapper mqttRecordMapper;
@Resource
private DeviceOperationRecordMapper deviceOperationRecordMapper;
@Resource
private DeviceContactModelMapper deviceContactModelMapper;
@Resource
private TDengineService tDengineService;
@Resource
private DevicePointRulesMapper devicePointRulesMapper;
@Resource
private DeviceWarinningRecordMapper deviceWarinningRecordMapper;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Override
public String decoder(MqttMessage msg) {
return new String(msg.getPayload());
@ -58,33 +105,633 @@ public class MqttDataHandler extends SuperConsumer<String> {
// }catch (Exception e){
// log.error("asyncService.transferBase error:"+entity);
// }
//异步线程查询时需带TenantId框架TenantContextHolder限制
//TODO 后续是否要其他TenantId
try {
// 设置租户ID
TenantContextHolder.setTenantId(1L);
save(machine, entity, data, topic);
} finally {
TenantContextHolder.clear();
}
}
}
save(machine, entity, data);
public void save(IotOrganizationDO machine, String entity, MqttData data,String topic) {
// try {
// long timestamp = DateUtils.strToTimeStamp(data.getDeviceDataTime(), Constants.MQTT_timestamp_format);
// //timestamp = DateUtils.getMillsLong();
// LocalDateTime date = DateUtils.strToLocalDateTime(data.getDeviceDataTime());
// 如果 deviceDataTime 为空,使用当前时间
// LocalDateTime date;
// long timestamp;
// if (data.getDeviceDataTime() != null && !data.getDeviceDataTime().isEmpty()) {
// try {
// timestamp = DateUtils.strToTimeStamp(data.getDeviceDataTime(), Constants.MQTT_timestamp_format);
// date = DateUtils.strToLocalDateTime(data.getDeviceDataTime());
// } catch (Exception e) {
// log.warn("解析 deviceDataTime 异常, 使用当前时间: {}", data.getDeviceDataTime(), e);
// timestamp = DateUtils.getMillsLong();
// date = LocalDateTime.now();
// }
// } else {
// timestamp = DateUtils.getMillsLong();
// date = LocalDateTime.now();
// }
//
//
//
// MqttRecordDO recordDO = new MqttRecordDO();
// recordDO.setDeviceCode(data.getDeviceID());
// recordDO.setGatewayCode(data.getGatewayID());
// recordDO.setDeviceData(entity);
// recordDO.setDeviceDataTime(date);
// recordDO.setDeviceDataTimeLong(timestamp);
// /**直接保存原始mqtt*/
// mqttRecordMapper.insert(recordDO);
if (StringUtils.isBlank(entity)) {
log.warn("MQTT消息为空 topic={}", topic);
return;
}
if (StringUtils.isBlank(topic)) {
log.warn("MQTT topic为空");
return;
}
try {
JsonNode rootNode = OBJECT_MAPPER.readTree(entity);
JsonNode devListNode = rootNode.get("devList");
if (devListNode == null || devListNode.isEmpty()) {
return;
}
JsonNode varListNode =
devListNode.get(0).get("varList");
if (varListNode == null) {
return;
}
Map<String, Object> varListMap =
OBJECT_MAPPER.convertValue(
varListNode,
Map.class
);
DeviceDO deviceDO = deviceMapper.selectOne(Wrappers.<DeviceDO>lambdaQuery().eq(DeviceDO::getTopic,topic));
log.info("getDeviceByMqttTopic参数{}", topic);
if (deviceDO == null) {
log.info("getDeviceByMqttTopic查询出来deviceDO为空");
return;
}
processDeviceDataFromMqtt(
deviceDO,
varListMap
);
} catch (Exception e) {
log.error("MQTT数据处理异常", e);
}
}
public void processDeviceDataFromMqtt(DeviceDO device,
Map<String, Object> varListMap) {
Long deviceId = device.getId();
// 1. 查询点位配置
List<DeviceContactModelDO> points = getDevicePoints(deviceId);
if (CollectionUtils.isEmpty(points)) {
log.warn("设备 {} 未配置点位", device.getId());
DeviceOperationRecordDO record = new DeviceOperationRecordDO();
record.setDeviceId(deviceId);
record.setRule(DeviceStatusEnum.STANDBY.getCode());
//TODO 待优化
record.setTotalStandbyTime(device.getSampleCycle());
record.setCreator("1");
record.setUpdater("1");
deviceOperationRecordMapper.insert(record);
return;
}
if (varListMap == null || varListMap.isEmpty()) {
log.warn("设备 {} MQTT varList 为空", deviceId);
return;
}
// 查询RUNNING点位规则
DevicePointRulesDO devicePoints = getDevicePointRules(deviceId);
if (StringUtils.isBlank(devicePoints.getFieldRule())){
log.warn("设备 {} 没有RUNNING点位规则", device.getId());
DeviceOperationRecordDO record = new DeviceOperationRecordDO();
record.setDeviceId(deviceId);
record.setRule(DeviceStatusEnum.STANDBY.getCode());
//TODO 待优化
record.setTotalStandbyTime(device.getSampleCycle());
record.setCreator("1");
record.setUpdater("1");
deviceOperationRecordMapper.insert(record);
}
log.info("设备 {} MQTT 数据点位数量 {}", deviceId, varListMap.size());
int successCount = 0;
List<DeviceContactModelDO> validDataList = new ArrayList<>();
// 2. 遍历数据库点位,通过 code 匹配 MQTT
for (DeviceContactModelDO point : points) {
public void save(IotOrganizationDO machine, String entity, MqttData data) {
try {
long timestamp = DateUtils.strToTimeStamp(data.getDeviceDataTime(), Constants.MQTT_timestamp_format);
//timestamp = DateUtils.getMillsLong();
LocalDateTime date = DateUtils.strToLocalDateTime(data.getDeviceDataTime());
MqttRecordDO recordDO = new MqttRecordDO();
recordDO.setDeviceCode(data.getDeviceID());
recordDO.setGatewayCode(data.getGatewayID());
recordDO.setDeviceData(entity);
recordDO.setDeviceDataTime(date);
recordDO.setDeviceDataTimeLong(timestamp);
/**直接保存原始mqtt*/
mqttRecordMapper.insert(recordDO);
String code = point.getAttributeCode();
Object value = varListMap.get(code);
if (value == null) {
validDataList.add(point);
continue;
}
String processedValue = processOpcValue(value);
// 规则判断
judgmentRules(
processedValue,
code,
device,
point.getId()
);
point.setAddressValue(processedValue);
successCount++;
validDataList.add(point);
} catch (Exception e) {
log.error("处理 MQTT 点位异常 deviceId={}, code={}",
deviceId,
point.getAttributeCode(),
e);
}
}
// 3. 入库
if (!validDataList.isEmpty()) {
saveToDatabase(
deviceId,
validDataList,
successCount
);
} else {
log.warn("设备 {} 未匹配到 MQTT 数据", deviceId);
}
}
private DevicePointRulesDO getDevicePointRules(Long deviceId) {
List<DevicePointRulesDO> list =
devicePointRulesMapper.selectList(
Wrappers.<DevicePointRulesDO>lambdaQuery()
.eq(DevicePointRulesDO::getDeviceId, deviceId)
.eq(DevicePointRulesDO::getIdentifier, "RUNNING")
.orderByDesc(DevicePointRulesDO::getCreateTime)
.last("LIMIT 1")
);
if (CollectionUtils.isEmpty(list)) {
log.info("设备 {} 未找到 RUNNING 规则", deviceId);
return null;
}
DevicePointRulesDO rule = list.get(0);
log.info("设备 {} 使用 RUNNING 规则规则ID={}, 创建时间={}",
deviceId,
rule.getId(),
rule.getCreateTime());
return rule;
}
/**
* mqtt
* */
//tsMqttService.insertDataAddress(data, taskId, timestamp, equipment);
*
*/
private List<DeviceContactModelDO> getDevicePoints(Long deviceId) {
LambdaQueryWrapper<DeviceContactModelDO> query = new LambdaQueryWrapper<>();
query.eq(DeviceContactModelDO::getDeviceId, deviceId);
return deviceContactModelMapper.selectList(query);
}
/**
* OPC
*/
private String processOpcValue(Object value) {
if (value == null) {
return "";
}
if (value instanceof String) {
return ((String) value).trim();
}
return value.toString();
}
/**
*
*/
private void saveToDatabase(Long deviceId, List<DeviceContactModelDO> dataList, int successCount) {
try {
for (DeviceContactModelDO deviceContactModelDO : dataList) {
deviceContactModelDO.setAddress(null);
}
String json = JSON.toJSONString(dataList);
boolean inserted = tDengineService.insertDeviceData(deviceId, json);
if (inserted) {
log.info("设备 {} 数据入库成功,总数: {},有效: {}",
deviceId, dataList.size(), successCount);
} else {
log.error("设备 {} 数据入库失败", deviceId);
}
} catch (Exception e) {
log.error("设备 {} 数据入库异常", deviceId, e);
}
}
private void judgmentRules(String processedValue, String attributeCode, DeviceDO device, Long modelId) {
if (StringUtils.isBlank(processedValue)) {
log.warn("待判断的值为空编码attributeCode: {}, deviceId: {}", attributeCode, device.getId());
// return;
}
// 1. 查询设备规则
List<DevicePointRulesDO> devicePointRulesDOList = devicePointRulesMapper.selectList(
Wrappers.<DevicePointRulesDO>lambdaQuery()
.eq(DevicePointRulesDO::getDeviceId, device.getId()).orderByDesc(DevicePointRulesDO::getCreateTime));
if (CollectionUtils.isEmpty(devicePointRulesDOList)) {
log.debug("设备 {} 未配置规则", device.getId());
return;
}
// 2. 遍历规则
for (DevicePointRulesDO devicePointRulesDO : devicePointRulesDOList) {
if (StringUtils.isBlank(devicePointRulesDO.getFieldRule())) {
continue;
}
// 3. 解析规则列表
List<PointRulesRespVO> pointRulesVOList = JSON.parseArray(
devicePointRulesDO.getFieldRule(), PointRulesRespVO.class);
if (CollectionUtils.isEmpty(pointRulesVOList)) {
continue;
}
// 4. 找到对应modelId的规则并进行判断
for (PointRulesRespVO pointRulesRespVO : pointRulesVOList) {
if (pointRulesRespVO.getCode() != null &&
pointRulesRespVO.getCode().equals(attributeCode)) {
boolean matched = matchRule(processedValue, pointRulesRespVO);
if (matched) {
log.info("规则匹配成功: modelId={}, value={}, rule={}",
attributeCode, processedValue,
JSON.toJSONString(pointRulesRespVO));
// 执行匹配成功后的逻辑
handleMatchedSuccessRule(devicePointRulesDO, pointRulesRespVO, processedValue, device, attributeCode, modelId);
break;
} else {
log.debug("规则不匹配: modelId={}, value={}, rule={}",
attributeCode, processedValue,
JSON.toJSONString(pointRulesRespVO));
// 执行匹配失败后的逻辑
handleMatchedFailureRule(devicePointRulesDO, pointRulesRespVO, processedValue, device, attributeCode);
}
}
}
}
}
private void handleMatchedFailureRule(DevicePointRulesDO devicePointRulesDO, PointRulesRespVO pointRulesRespVO, String processedValue, DeviceDO device, String attributeCode) {
//TODO 离线待优化
// if (devicePointRulesDO.getIdentifier().equals(DeviceBasicStatusEnum.RUNNING.getCode())){
// DeviceOperationRecordDO record = new DeviceOperationRecordDO();
// record.setDeviceId(device.getId());
// record.setModelId(modelId);
// record.setRule(pointRulesRespVO.getRule());
// record.setAddressValue(processedValue);
// record.setRecordType(getRecordType(devicePointRulesDO));
// record.setRuleId(devicePointRulesDO.getId());
//
// }
}
private void handleMatchedSuccessRule(DevicePointRulesDO devicePointRulesDO,
PointRulesRespVO pointRulesRespVO,
String processedValue,
DeviceDO device,
String attributeCode,
Long modelId) {
DeviceContactModelDO deviceContactModelDO = deviceContactModelMapper.selectById(modelId);
if (deviceContactModelDO == null) {
return;
}
//分别处理运行记录和告警记录
if (StringUtils.isBlank(devicePointRulesDO.getAlarmLevel())) {
DeviceOperationRecordDO record = new DeviceOperationRecordDO();
record.setDeviceId(device.getId());
record.setModelId(modelId);
record.setRule(pointRulesRespVO.getRule());
record.setAddressValue(processedValue);
record.setRecordType(getRecordType(devicePointRulesDO));
record.setRuleId(devicePointRulesDO.getId());
//TODO 创建人和更新人为内置默认管理员
record.setCreator("1");
record.setUpdater("1");
// 处理累计时间
calculateAndSetTotalTime(record, pointRulesRespVO.getRule(), device.getSampleCycle());
deviceOperationRecordMapper.insert(record);
} else {
DeviceWarinningRecordDO deviceWarinningRecordDO = new DeviceWarinningRecordDO();
deviceWarinningRecordDO.setDeviceId(device.getId());
deviceWarinningRecordDO.setModelId(modelId);
deviceWarinningRecordDO.setRule(pointRulesRespVO.getRule());
deviceWarinningRecordDO.setAlarmLevel(devicePointRulesDO.getAlarmLevel());
deviceWarinningRecordDO.setAddressValue(processedValue);
deviceWarinningRecordDO.setRuleId(devicePointRulesDO.getId());
deviceWarinningRecordDO.setDeviceName(device.getDeviceName());
deviceWarinningRecordDO.setModelName(deviceContactModelDO.getAttributeName());
deviceWarinningRecordDO.setRuleName(devicePointRulesDO.getFieldName());
//TODO 创建人和更新人为内置默认管理员
deviceWarinningRecordDO.setCreator("1");
deviceWarinningRecordDO.setUpdater("1");
deviceWarinningRecordMapper.insert(deviceWarinningRecordDO);
}
}
private void calculateAndSetTotalTime(DeviceOperationRecordDO record, String ruleCode, Double sampleCycle) {
if (!isTimeRelatedStatus(ruleCode)) {
return;
}
// DeviceOperationRecordDO lastRecord = deviceOperationRecordMapper.selectOne(
// Wrappers.<DeviceOperationRecordDO>lambdaQuery()
// .eq(DeviceOperationRecordDO::getRule, ruleCode)
// .orderByDesc(DeviceOperationRecordDO::getCreateTime)
// .last("LIMIT 1")
// );
if (ruleCode.equals(DeviceStatusEnum.RUNNING.getCode())) {
// Double totalTime = (lastRecord != null && lastRecord.getTotalRunningTime() != null)
// ? lastRecord.getTotalRunningTime() + sampleCycle
// : sampleCycle;
record.setTotalRunningTime(sampleCycle);
} else if (ruleCode.equals(DeviceStatusEnum.STANDBY.getCode())) {
// Double totalTime = (lastRecord != null && lastRecord.getTotalStandbyTime() != null)
// ? lastRecord.getTotalStandbyTime() + sampleCycle
// : sampleCycle;
record.setTotalStandbyTime(sampleCycle);
} else if (ruleCode.equals(DeviceStatusEnum.FAULT_STANDBY.getCode())) {
// Double totalTime = (lastRecord != null && lastRecord.getTotalFaultTime() != null)
// ? lastRecord.getTotalFaultTime() + sampleCycle
// : sampleCycle;
record.setTotalFaultTime(sampleCycle);
} else if (ruleCode.equals(DeviceStatusEnum.ALARM_RUNNING.getCode())) {
// Double totalTime = (lastRecord != null && lastRecord.getTotalWarningTime() != null)
// ? lastRecord.getTotalWarningTime() + sampleCycle
// : sampleCycle;
record.setTotalWarningTime(sampleCycle);
}
}
private Integer getRecordType(DevicePointRulesDO devicePointRulesDO) {
return devicePointRulesDO.getIdentifier()
.equals(DeviceBasicStatusEnum.RUNNING.getDescription())
? Integer.parseInt(DeviceBasicStatusEnum.RUNNING.getCode())
: Integer.parseInt(DeviceBasicStatusEnum.ALARM.getCode());
}
private boolean isTimeRelatedStatus(String ruleCode) {
return ruleCode.equals(DeviceStatusEnum.RUNNING.getCode()) ||
ruleCode.equals(DeviceStatusEnum.STANDBY.getCode()) ||
ruleCode.equals(DeviceStatusEnum.FAULT_STANDBY.getCode()) ||
ruleCode.equals(DeviceStatusEnum.ALARM_RUNNING.getCode());
}
/**
*
* : EQ(), NE(), GT(), GE(),
* LT(), LE(), TRUE(), FALSE()
*/
private boolean matchRule(String value, PointRulesRespVO rule) {
if (StringUtils.isBlank(value) || rule == null ||
StringUtils.isBlank(rule.getOperator())) {
return false;
}
try {
String operator = rule.getOperator().toUpperCase();
String inputValue = value.trim().toLowerCase();
String ruleValue = StringUtils.trimToEmpty(rule.getOperatorRule());
// 1. 处理布尔值判断
if ("TRUE".equals(operator) || "FALSE".equals(operator)) {
return matchBooleanRule(inputValue, operator);
}
// 2. 如果operatorRule为空且不是布尔操作符则返回false
if (StringUtils.isBlank(ruleValue)) {
log.warn("规则比较值为空,但操作符不是布尔类型: operator={}", operator);
return false;
}
ruleValue = ruleValue.trim();
// 3. 尝试数值比较
if (isNumeric(inputValue) && isNumeric(ruleValue)) {
Double num1 = Double.parseDouble(inputValue);
Double num2 = Double.parseDouble(ruleValue);
return compareNumbers(num1, num2, operator);
}
// 4. 字符串比较
else {
return compareStrings(inputValue, ruleValue, operator);
}
} catch (Exception e) {
log.error("规则匹配异常: value={}, rule={}, error={}",
value, JSON.toJSONString(rule), e.getMessage());
return false;
}
}
/**
*
*/
private boolean matchBooleanRule(String value, String operator) {
// 常见布尔值表示
boolean booleanValue = parseBoolean(value);
if ("TRUE".equals(operator)) {
return booleanValue;
} else if ("FALSE".equals(operator)) {
return !booleanValue;
}
return false;
}
/**
*
* : true, false, 1, 0, yes, no, on, off
*/
private boolean parseBoolean(String value) {
if (StringUtils.isBlank(value)) {
return false;
}
String lowerValue = value.toLowerCase();
// 常见真值表示
if ("true".equals(lowerValue) ||
"1".equals(lowerValue) ||
"yes".equals(lowerValue) ||
"on".equals(lowerValue) ||
"是".equals(lowerValue) || // 中文支持
"成功".equals(lowerValue)) {
return true;
}
// 常见假值表示
if ("false".equals(lowerValue) ||
"0".equals(lowerValue) ||
"no".equals(lowerValue) ||
"off".equals(lowerValue) ||
"否".equals(lowerValue) || // 中文支持
"失败".equals(lowerValue)) {
return false;
}
// 尝试转换为布尔值
try {
return Boolean.parseBoolean(lowerValue);
} catch (Exception e) {
log.error("-----mqttTableName:");
e.printStackTrace();
log.warn("无法解析为布尔值: {}", value);
return false;
}
}
/**
*
*/
private boolean compareNumbers(Double value, Double ruleValue, String operator) {
switch (operator) {
case "EQ":
return Math.abs(value - ruleValue) < 0.000001; // 处理浮点数精度
case "NE":
return Math.abs(value - ruleValue) >= 0.000001;
case "GT":
return value > ruleValue;
case "GE":
return value >= ruleValue;
case "LT":
return value < ruleValue;
case "LE":
return value <= ruleValue;
default:
log.warn("不支持的操作符: {}", operator);
return false;
}
}
/**
*
*/
private boolean compareStrings(String value, String ruleValue, String operator) {
switch (operator) {
case "EQ":
return value.equals(ruleValue);
case "NE":
return !value.equals(ruleValue);
case "GT":
return value.compareTo(ruleValue) > 0;
case "GE":
return value.compareTo(ruleValue) >= 0;
case "LT":
return value.compareTo(ruleValue) < 0;
case "LE":
return value.compareTo(ruleValue) <= 0;
default:
log.warn("不支持的操作符: {}", operator);
return false;
}
}
/**
*
*/
private boolean isNumeric(String str) {
if (StringUtils.isBlank(str)) {
return false;
}
try {
Double.parseDouble(str);
return true;
} catch (NumberFormatException e) {
return false;
}
}
}

@ -5,6 +5,7 @@ import cn.iocoder.yudao.module.iot.service.device.DeviceService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
@ -20,6 +21,7 @@ import java.util.Map;
@EnableScheduling
public class AsyncService {
@Resource
@Lazy
private DeviceService deviceService;

@ -10,6 +10,7 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.device.DeviceDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.DeviceAttributeDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.devicecontactmodel.DeviceContactModelDO;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.eclipse.paho.client.mqttv3.MqttException;
import javax.validation.Valid;
import java.util.Collection;
@ -138,4 +139,8 @@ public interface DeviceService {
List<Map<String, Object>> getMultiDeviceAttributes(String deviceIds);
List<DeviceContactModelDO> getDeviceAttributeList(Long deviceId);
void updateDeviceEnabled(@Valid DeviceUpdateEnabledReqVO updateEnabledReqVO) throws MqttException;
DeviceDO getDeviceByMqttTopic(String topic);
}

@ -33,6 +33,7 @@ import cn.iocoder.yudao.module.iot.dal.mysql.mqttdatarecord.MqttDataRecordMapper
import cn.iocoder.yudao.module.iot.dal.dataobject.device.DeviceAttributeDO;
import cn.iocoder.yudao.module.iot.dal.mysql.device.DeviceAttributeMapper;
import cn.iocoder.yudao.module.iot.framework.mqtt.consumer.IMqttservice;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
@ -43,6 +44,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
@ -50,6 +52,7 @@ import org.springframework.transaction.annotation.Transactional;
import org.springframework.validation.annotation.Validated;
import javax.annotation.Resource;
import javax.validation.constraints.NotNull;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
@ -108,6 +111,8 @@ public class DeviceServiceImpl implements DeviceService {
@Resource
private DeviceOperationRecordMapper deviceOperationRecordMapper;
@Resource
private IMqttservice mqttService;
@Override
@ -282,29 +287,54 @@ public class DeviceServiceImpl implements DeviceService {
@Override
public PageResult<DeviceRespVO> getDevicePage(DevicePageReqVO pageReqVO) {
// 1. 查询分页设备
PageResult<DeviceDO> deviceDOPageResult = deviceMapper.selectPage(pageReqVO);
PageResult<DeviceRespVO> deviceRespVOPageResult = BeanUtils.toBean(deviceDOPageResult, DeviceRespVO.class);
List<Long> deviceIds = deviceDOPageResult.getList().stream()
.map(DeviceDO::getId)
.collect(Collectors.toList());
// 2. 批量获取 TDengine 最新 ts
Map<Long, LocalDateTime> latestTsMap = tdengineService.selectLatestTsBatch(deviceIds);
// 3. 批量获取最新的 DeviceOperationRecord
List<String> ruleCodes = Arrays.stream(DeviceStatusEnum.values())
.map(DeviceStatusEnum::getCode)
.collect(Collectors.toList());
for (DeviceRespVO deviceRespVO : deviceRespVOPageResult.getList()) {
DeviceOperationRecordDO deviceOperationRecordDO = deviceOperationRecordMapper.selectOne(Wrappers.<DeviceOperationRecordDO>lambdaQuery()
.eq(DeviceOperationRecordDO::getDeviceId, deviceRespVO.getId())
List<DeviceOperationRecordDO> operationRecords = deviceOperationRecordMapper.selectList(
Wrappers.<DeviceOperationRecordDO>lambdaQuery()
.in(DeviceOperationRecordDO::getDeviceId, deviceIds)
.in(DeviceOperationRecordDO::getRule, ruleCodes)
.orderByDesc(DeviceOperationRecordDO::getCreateTime)
.last("LIMIT 1"));
if(deviceOperationRecordDO !=null){
deviceRespVO.setOperatingStatus(DeviceStatusEnum.getByCode(deviceOperationRecordDO.getRule()).getName());
deviceRespVO.setCollectionTime(deviceOperationRecordDO.getCreateTime());
}else {
deviceRespVO.setOperatingStatus(DeviceStatusEnum.OFFLINE.getName());
);
}
// 按 deviceId 分组,取最新一条
Map<Long, DeviceOperationRecordDO> latestRecordMap = operationRecords.stream()
.collect(Collectors.toMap(
DeviceOperationRecordDO::getDeviceId,
r -> r,
(r1, r2) -> r1.getCreateTime().isAfter(r2.getCreateTime()) ? r1 : r2
));
// 4. 转换分页 DTO
PageResult<DeviceRespVO> deviceRespVOPageResult = BeanUtils.toBean(deviceDOPageResult, DeviceRespVO.class);
for (DeviceRespVO deviceRespVO : deviceRespVOPageResult.getList()) {
Long deviceId = deviceRespVO.getId();
// 设置最新 ts
if(latestTsMap.get(deviceId) != null){
deviceRespVO.setCollectionTime(latestTsMap.get(deviceId));
}
// 设置运行状态
DeviceOperationRecordDO record = latestRecordMap.get(deviceId);
if (record != null) {
deviceRespVO.setOperatingStatus(DeviceStatusEnum.getByCode(record.getRule()).getName());
} else {
deviceRespVO.setOperatingStatus(DeviceStatusEnum.OFFLINE.getName());
}
}
return deviceRespVOPageResult;
@ -504,7 +534,7 @@ public class DeviceServiceImpl implements DeviceService {
DeviceDO deviceDO = validateConnectRequest(createReqVO);
if (Objects.equals(createReqVO.getIsConnect(), DeviceConnectionStatusEnum.CONNECTED.getStatus())){
boolean connected = OpcUtils.connect(deviceDO.getUrl(),deviceDO.getUsername(),deviceDO.getPassword(),10);
boolean connected = OpcUtils.connect(deviceDO.getId(),deviceDO.getUrl(),deviceDO.getUsername(),deviceDO.getPassword(),10);
if (connected){
deviceDO.setStatus(String.valueOf(DeviceConnectionStatusEnum.CONNECTED.getStatus()));
deviceMapper.updateById(deviceDO);
@ -521,7 +551,7 @@ public class DeviceServiceImpl implements DeviceService {
throw exception(OPC_CONNECT_FAILURE_DOES_NOT_EXIST);
}
}else if(Objects.equals(createReqVO.getIsConnect(), DeviceConnectionStatusEnum.DISCONNECTED.getStatus())){
boolean disconnect = OpcUtils.disconnect();
boolean disconnect = OpcUtils.disconnect(deviceDO.getId());
if (disconnect){
//更新连接状态
deviceDO.setStatus(String.valueOf(DeviceConnectionStatusEnum.DISCONNECTED.getStatus()));
@ -996,4 +1026,66 @@ public class DeviceServiceImpl implements DeviceService {
.eq(DeviceContactModelDO::getDeviceId,deviceId)
.orderByDesc(DeviceContactModelDO::getId));
}
@Override
public void updateDeviceEnabled(DeviceUpdateEnabledReqVO updateEnabledReqVO) throws MqttException {
// 1. 校验任务是否存在
DeviceDO deviceDO = deviceMapper.selectById(updateEnabledReqVO.getId());
if (deviceDO == null) {
throw exception(DEVICE_NOT_EXISTS);
}
if (StringUtils.isBlank(deviceDO.getTopic())) {
throw exception(DEVICE_MQTT_TOPIC_EXIST);
}
//TODO 待优化
if (("MQTT".equals(deviceDO.getProtocol()))) {
throw exception(DEVICE_MQTT_TOPIC_EXIST);
}
// 2. 如果状态没有变化,直接返回
if (Objects.equals(deviceDO.getIsEnable(), updateEnabledReqVO.getEnabled())) {
return;
}
// 3. 执行状态更新操作
executeEnableUpdate(deviceDO, updateEnabledReqVO.getEnabled());
}
@Override
public DeviceDO getDeviceByMqttTopic(String topic) {
return deviceMapper.selectOne(Wrappers.<DeviceDO>lambdaQuery().eq(DeviceDO::getTopic,topic));
}
private void executeEnableUpdate(DeviceDO deviceDO, Boolean enabled) throws MqttException {
try {
// 更新数据库状态
deviceDO.setIsEnable(enabled);
deviceMapper.updateById(deviceDO);
// MQTT 操作
if (enabled) {
try {
mqttService.subscribeTopic(deviceDO.getTopic());
} catch (MqttException e) {
log.error("订阅主题失败: {}", deviceDO.getTopic(), e);
}
} else {
try {
mqttService.unsubscribeTopic(deviceDO.getTopic());
} catch (MqttException e) {
log.error("取消订阅主题失败: {}", deviceDO.getTopic(), e);
}
}
} catch (Exception e) {
// 捕获数据库更新等其他异常
log.error("更新设备状态失败: {}", deviceDO.getDeviceCode(), e);
}
}
}

@ -668,20 +668,6 @@ public class TDengineService {
private String decodeQueryData(byte[] blob) {
if (blob == null || blob.length == 0) return "[]";
@ -701,4 +687,42 @@ public class TDengineService {
}
@DS("tdengine")
public LocalDateTime selectLatestTs(Long deviceId) {
String sql = "SELECT ts FROM besure.d_" + deviceId + " ORDER BY ts DESC LIMIT 1";
final LocalDateTime[] latestTs = {null};
jdbcTemplate.query(sql, rs -> {
if (rs.next()) {
latestTs[0] = rs.getTimestamp("ts").toLocalDateTime();
}
});
return latestTs[0];
}
@DS("tdengine")
public Map<Long, LocalDateTime> selectLatestTsBatch(List<Long> deviceIds) {
Map<Long, LocalDateTime> result = new HashMap<>();
for (Long deviceId : deviceIds) {
String tableName = "besure.d_" + deviceId;
String sql = "SELECT ts FROM " + tableName + " ORDER BY ts DESC LIMIT 1";
try {
LocalDateTime ts = jdbcTemplate.queryForObject(sql, (rs, rowNum) -> rs.getTimestamp("ts").toLocalDateTime());
result.put(deviceId, ts);
} catch (Exception e) {
// 表不存在或者查询失败
result.put(deviceId, null);
log.warn("设备: {}, 获取 ts 失败: {}", deviceId, e.getMessage());
}
}
return result;
}
}

@ -107,4 +107,14 @@ public interface DictDataService {
*/
List<DictDataDO> getDictDataListByDictType(String dictType);
/**
* label
*
* @param label
* @return
*/
DictDataDO getDictDataByDictLabel(String label);
}

@ -10,6 +10,7 @@ import cn.iocoder.yudao.module.system.controller.admin.dict.vo.data.DictDataSave
import cn.iocoder.yudao.module.system.dal.dataobject.dict.DictDataDO;
import cn.iocoder.yudao.module.system.dal.dataobject.dict.DictTypeDO;
import cn.iocoder.yudao.module.system.dal.mysql.dict.DictDataMapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.google.common.annotations.VisibleForTesting;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@ -176,4 +177,13 @@ public class DictDataServiceImpl implements DictDataService {
return list;
}
@Override
public DictDataDO getDictDataByDictLabel(String label) {
return dictDataMapper.selectOne(Wrappers.<DictDataDO>lambdaQuery()
.eq(DictDataDO::getLabel,label)
.orderByDesc(DictDataDO::getCreateTime)
.last("LIMIT 1"));
}
}

@ -262,3 +262,15 @@ justauth:
prefix: 'social_auth_state:' # 缓存前缀,目前只对 Redis 缓存生效,默认 JUSTAUTH::STATE::
timeout: 24h # 超时时长,目前只对 Redis 缓存生效,默认 3 分钟
emqx:
is-enable: true # 是否启用 MQTT
broker: tcp://192.168.5.119:1883 # EMQX 服务器地址TCP 协议)
client-id: mqtt-client-besure_server-dev # 客户端ID
user-name: ngsk # 用户名
password: ngskcloud0809 # 密码
clean-session: true # 是否清空 session
reconnect: true # 是否自动断线重连
timeout: 30 # 连接超时时间(秒)
keep-alive: 60 # 心跳间隔(秒)

@ -262,3 +262,14 @@ justauth:
prefix: 'social_auth_state:' # 缓存前缀,目前只对 Redis 缓存生效,默认 JUSTAUTH::STATE::
timeout: 24h # 超时时长,目前只对 Redis 缓存生效,默认 3 分钟
emqx:
is-enable: true # 是否启用 MQTT
broker: tcp://192.168.21.2:1883 # EMQX 服务器地址TCP 协议)
client-id: mqtt-client-besure-server-prod # 客户端ID
user-name: admin # 用户名
password: admin # 密码
clean-session: true # 是否清空 session
reconnect: true # 是否自动断线重连
timeout: 30 # 连接超时时间(秒)
keep-alive: 60 # 心跳间隔(秒)
Loading…
Cancel
Save