diff --git a/yudao-framework/yudao-common/src/main/java/cn/iocoder/yudao/framework/common/util/opc/OpcUtils.java b/yudao-framework/yudao-common/src/main/java/cn/iocoder/yudao/framework/common/util/opc/OpcUtils.java index ed7871103..037efd617 100644 --- a/yudao-framework/yudao-common/src/main/java/cn/iocoder/yudao/framework/common/util/opc/OpcUtils.java +++ b/yudao-framework/yudao-common/src/main/java/cn/iocoder/yudao/framework/common/util/opc/OpcUtils.java @@ -2,645 +2,270 @@ package cn.iocoder.yudao.framework.common.util.opc; import lombok.extern.slf4j.Slf4j; import org.eclipse.milo.opcua.sdk.client.OpcUaClient; -import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig; -import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder; import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider; +import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider; import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider; -import org.eclipse.milo.opcua.stack.client.DiscoveryClient; -import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue; -import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId; -import org.eclipse.milo.opcua.stack.core.types.builtin.Variant; -import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger; +import org.eclipse.milo.opcua.stack.core.types.builtin.*; import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn; -import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription; - -import java.net.InetAddress; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.function.Predicate; - -/** - * OPC UA连接静态工具类 - 适用于Eclipse Milo 0.6.9 - * 提供OPC UA服务器的连接、断开、数据读写等功能(静态方法版本) - */ + +import java.util.concurrent.*; +import java.util.*; +import java.util.stream.Collectors; + @Slf4j public class OpcUtils { - // 静态成员变量,所有实例共享 - private static OpcUaClient client; - private static String serverUrl; - private static boolean isConnected = false; - private static final String LOG_PREFIX = "[OPC-UA]"; - - // 私有构造方法,防止实例化 - private OpcUtils() { - throw new UnsupportedOperationException("这是一个工具类,不允许创建实例"); - } + /** + * OPC Client缓存池 + */ + private static final ConcurrentHashMap CLIENT_MAP = + new ConcurrentHashMap<>(); /** - * 连接OPC UA服务器(匿名认证) + * 连接锁 */ - public static boolean connect(String url, int timeoutSeconds) { - return connect(url, null, null, timeoutSeconds); - } + private static final ConcurrentHashMap LOCK_MAP = + new ConcurrentHashMap<>(); - public static boolean connect(String url, String username, String password, int timeoutSeconds) { - if (isConnected) { - log.info(" {} 客户端已连接,无需重复连接",LOG_PREFIX); - return true; - } - serverUrl = url; + public static boolean connect(Long deviceId, + String url, + String username, + String password, + int timeoutSeconds) { try { - log.info(" {} 正在连接到OPC UA服务器 {}",LOG_PREFIX,url); - - // 提取主机和端口 - final String targetHost = extractHostFromUrl(url); - final int targetPort = extractPortFromUrl(url); - final String path = extractPathFromUrl(url); - - System.out.println(LOG_PREFIX + "目标主机: " + targetHost + ", 端口: " + targetPort + ", 路径: " + path); - - - // 将主机名解析为IP地址 - final String ipAddress = resolveToIpAddress(targetHost); - System.out.println(LOG_PREFIX + "解析为IP地址: " + ipAddress); - - if (username != null && password != null && !username.isEmpty()) { - System.out.println(LOG_PREFIX + "使用用户名密码认证: " + username); - - // 用户名密码认证 - client = OpcUaClient.create(url, endpoints -> { - if (endpoints == null || endpoints.isEmpty()) { - System.err.println(LOG_PREFIX + "服务器未返回任何端点"); - return Optional.empty(); - } - - System.out.println(LOG_PREFIX + "发现端点数量: " + endpoints.size()); - - // 查找无安全策略的端点 - for (EndpointDescription endpoint : endpoints) { - String endpointUrl = endpoint.getEndpointUrl(); - System.out.println(LOG_PREFIX + "检查端点: " + endpointUrl + - " | 安全策略: " + endpoint.getSecurityPolicyUri()); - - if ("http://opcfoundation.org/UA/SecurityPolicy#None".equals(endpoint.getSecurityPolicyUri())) { - // 修正端点URL,强制使用IP地址 - String correctedUrl = forceIpAddressEndpoint(endpointUrl, ipAddress, targetPort, path); - System.out.println(LOG_PREFIX + "强制使用IP地址: " + endpointUrl + " -> " + correctedUrl); - - return Optional.of(new EndpointDescription( - correctedUrl, - endpoint.getServer(), - endpoint.getServerCertificate(), - endpoint.getSecurityMode(), - endpoint.getSecurityPolicyUri(), - endpoint.getUserIdentityTokens(), - endpoint.getTransportProfileUri(), - endpoint.getSecurityLevel() - )); - } - } - - System.err.println(LOG_PREFIX + "未找到无安全策略的端点"); - return Optional.empty(); - }, configBuilder -> configBuilder - .setIdentityProvider(new UsernameProvider(username, password)) - .setRequestTimeout(UInteger.valueOf(timeoutSeconds * 1000L)) - .build()); - } else { - System.out.println(LOG_PREFIX + "使用匿名认证"); - - // 对于匿名认证,手动发现端点并修正 - List endpoints = DiscoveryClient.getEndpoints(url).get(timeoutSeconds, TimeUnit.SECONDS); - - if (endpoints == null || endpoints.isEmpty()) { - System.err.println(LOG_PREFIX + "服务器未返回任何端点"); - return false; - } + OpcUaClient client = CLIENT_MAP.get(deviceId); - // 查找无安全策略的端点 - Optional selectedEndpoint = Optional.empty(); - for (EndpointDescription endpoint : endpoints) { - if ("http://opcfoundation.org/UA/SecurityPolicy#None".equals(endpoint.getSecurityPolicyUri())) { - selectedEndpoint = Optional.of(endpoint); - break; - } - } + if (isConnected(client)) { + return true; + } + + Object lock = LOCK_MAP.computeIfAbsent(deviceId, k -> new Object()); - if (!selectedEndpoint.isPresent()) { - System.err.println(LOG_PREFIX + "未找到无安全策略的端点"); - return false; + synchronized (lock) { + + client = CLIENT_MAP.get(deviceId); + if (isConnected(client)) { + return true; } - EndpointDescription endpoint = selectedEndpoint.get(); - String correctedUrl = forceIpAddressEndpoint(endpoint.getEndpointUrl(), ipAddress, targetPort, path); - System.out.println(LOG_PREFIX + "强制使用IP地址: " + endpoint.getEndpointUrl() + " -> " + correctedUrl); - - EndpointDescription correctedEndpoint = new EndpointDescription( - correctedUrl, - endpoint.getServer(), - endpoint.getServerCertificate(), - endpoint.getSecurityMode(), - endpoint.getSecurityPolicyUri(), - endpoint.getUserIdentityTokens(), - endpoint.getTransportProfileUri(), - endpoint.getSecurityLevel() + log.info("创建OPC连接 deviceId={}, url={}", deviceId, url); + + // 只用用户名/密码连接 + IdentityProvider identityProvider = new UsernameProvider(username, password); + + OpcUaClient newClient = OpcUaClient.create( + url, + // 只选择 SecurityPolicy.None 的端点 + endpoints -> endpoints.stream() + .filter(ep -> ep.getSecurityPolicyUri().equals("http://opcfoundation.org/UA/SecurityPolicy#None")) + .findFirst(), + configBuilder -> configBuilder + .setIdentityProvider(identityProvider) + .setRequestTimeout(uint(timeoutSeconds * 1000)) + .build() ); - OpcUaClientConfigBuilder configBuilder = OpcUaClientConfig.builder() - .setEndpoint(correctedEndpoint) - .setIdentityProvider(new AnonymousProvider()) - .setRequestTimeout(UInteger.valueOf(timeoutSeconds * 1000L)); - - client = OpcUaClient.create(configBuilder.build()); - } + newClient.connect().get(timeoutSeconds, TimeUnit.SECONDS); - client.connect().get(timeoutSeconds, TimeUnit.SECONDS); + CLIENT_MAP.put(deviceId, newClient); - if (validateConnection()) { - isConnected = true; - System.out.println(LOG_PREFIX + "服务器连接成功"); + log.info("OPC连接成功 deviceId={}", deviceId); return true; - } else { - System.err.println(LOG_PREFIX + "连接验证失败"); - return false; } } catch (Exception e) { - System.err.println(LOG_PREFIX + "连接失败: " + e.getMessage()); - e.printStackTrace(); + log.error("OPC连接失败 deviceId={}", deviceId, e); + CLIENT_MAP.remove(deviceId); return false; } } - /** - * 将主机名解析为IP地址 - */ - private static String resolveToIpAddress(String host) { - try { - // 如果已经是IP地址,直接返回 - if (host.matches("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}")) { - return host; - } - - // 尝试解析主机名 - InetAddress address = InetAddress.getByName(host); - return address.getHostAddress(); - - } catch (Exception e) { - System.err.println(LOG_PREFIX + "解析主机名失败: " + host + ", 错误: " + e.getMessage()); - return host; // 返回原始值 - } - } /** - * 强制使用IP地址构建端点URL + * 批量读取(线程安全 + 自动检测session) */ - private static String forceIpAddressEndpoint(String endpointUrl, String ipAddress, int defaultPort, String defaultPath) { - try { - // 提取端口和路径 - int port = extractPortFromUrl(endpointUrl); - if (port <= 0) { - port = defaultPort; - } - - String path = extractPathFromUrl(endpointUrl); - if (path.isEmpty()) { - path = defaultPath; - } - - return String.format("opc.tcp://%s:%d%s", ipAddress, port, path); + public static List readValues(Long deviceId, + List addresses) { - } catch (Exception e) { - System.err.println(LOG_PREFIX + "构建IP端点URL失败: " + e.getMessage()); - return endpointUrl; - } - } + OpcUaClient client = CLIENT_MAP.get(deviceId); + if (!isConnected(client)) { + log.warn("OPC未连接或session失效 deviceId={}", deviceId); + CLIENT_MAP.remove(deviceId); + return buildNullResult(addresses.size()); + } - /** - * 从URL中提取主机地址 - */ - private static String extractHostFromUrl(String url) { try { - // 格式: opc.tcp://host:port/path - if (url == null || url.trim().isEmpty()) { - return "localhost"; - } - - String withoutProtocol = url.replace("opc.tcp://", ""); - // 处理IPv6地址 - if (withoutProtocol.startsWith("[")) { - // IPv6地址格式: [2001:db8::1]:4840/path - int closeBracket = withoutProtocol.indexOf("]"); - if (closeBracket > 0) { - String host = withoutProtocol.substring(1, closeBracket); - return host; - } - } - // 普通IPv4或主机名 - String[] parts = withoutProtocol.split("[:/]"); - if (parts.length > 0 && !parts[0].isEmpty()) { - return parts[0]; - } - } catch (Exception e) { - System.err.println(LOG_PREFIX + "提取主机地址失败: " + e.getMessage()); - } - return "localhost"; - } + List nodeIds = + addresses.stream() + .map(OpcUtils::parseNodeIdSafe) + .collect(Collectors.toList()); - /** - * 从URL中提取端口 - */ - private static int extractPortFromUrl(String url) { - try { - if (url == null || url.trim().isEmpty()) { - return 4840; // 默认OPC UA端口 - } + List values = + client.readValues( + 0.0, + TimestampsToReturn.Both, + nodeIds + ) + .get(5, TimeUnit.SECONDS); - String withoutProtocol = url.replace("opc.tcp://", ""); + List result = + new ArrayList<>(values.size()); - // 查找端口号 - int portStart = -1; - int portEnd = -1; + for (DataValue dv : values) { - if (withoutProtocol.startsWith("[")) { - // IPv6地址: [2001:db8::1]:4840/path - int closeBracket = withoutProtocol.indexOf("]"); - if (closeBracket > 0) { - portStart = withoutProtocol.indexOf(":", closeBracket); - } - } else { - // IPv4或主机名: host:4840/path - int firstColon = withoutProtocol.indexOf(":"); - if (firstColon > 0) { - portStart = firstColon; + if (dv == null || dv.getValue() == null) { + result.add(null); + continue; } - } - if (portStart > 0) { - portStart++; // 跳过冒号 - portEnd = withoutProtocol.indexOf("/", portStart); - if (portEnd < 0) { - portEnd = withoutProtocol.length(); - } + Variant variant = dv.getValue(); - String portStr = withoutProtocol.substring(portStart, portEnd); - if (portStr.matches("\\d+")) { - return Integer.parseInt(portStr); - } + result.add( + variant != null + ? variant.getValue() + : null + ); } + return result; + } catch (Exception e) { - System.err.println(LOG_PREFIX + "提取端口失败: " + e.getMessage()); - } - return 4840; // 默认OPC UA端口 - } - /** - * 从URL中提取路径 - */ - private static String extractPathFromUrl(String url) { - try { - if (url == null || url.trim().isEmpty()) { - return ""; - } + log.error("OPC读取失败 deviceId={}", deviceId, e); - String withoutProtocol = url.replace("opc.tcp://", ""); - - // 查找路径开始位置 - int pathStart = -1; - - if (withoutProtocol.startsWith("[")) { - // IPv6地址: [2001:db8::1]:4840/path - int closeBracket = withoutProtocol.indexOf("]"); - if (closeBracket > 0) { - // 检查是否有端口 - int colonAfterBracket = withoutProtocol.indexOf(":", closeBracket); - if (colonAfterBracket > 0) { - // 有端口: [2001:db8::1]:4840/path - int slashAfterPort = withoutProtocol.indexOf("/", colonAfterBracket); - if (slashAfterPort > 0) { - return withoutProtocol.substring(slashAfterPort); - } - } else { - // 无端口: [2001:db8::1]/path - int slashAfterBracket = withoutProtocol.indexOf("/", closeBracket); - if (slashAfterBracket > 0) { - return withoutProtocol.substring(slashAfterBracket); - } - } - } - } else { - // IPv4或主机名 - // 先找主机名结束位置 - int hostEnd = withoutProtocol.indexOf(":"); - if (hostEnd < 0) { - // 无端口: host/path - hostEnd = withoutProtocol.indexOf("/"); - if (hostEnd > 0) { - return withoutProtocol.substring(hostEnd); - } - } else { - // 有端口: host:port/path - int portEnd = withoutProtocol.indexOf("/", hostEnd); - if (portEnd > 0) { - return withoutProtocol.substring(portEnd); - } - } - } + CLIENT_MAP.remove(deviceId); - } catch (Exception e) { - System.err.println(LOG_PREFIX + "提取路径失败: " + e.getMessage()); + return buildNullResult(addresses.size()); } - return ""; } + /** - * 从URL中提取协议 + * 判断连接是否有效(关键方法) */ - private static String extractProtocolFromUrl(String url) { - try { - if (url == null || url.trim().isEmpty()) { - return "opc.tcp"; - } + private static boolean isConnected(OpcUaClient client) { - int protocolEnd = url.indexOf("://"); - if (protocolEnd > 0) { - return url.substring(0, protocolEnd); - } - } catch (Exception e) { - System.err.println(LOG_PREFIX + "提取协议失败: " + e.getMessage()); + if (client == null) { + return false; } - return "opc.tcp"; - } - /** - * 构建完整的URL - */ - private static String buildUrl(String protocol, String host, int port, String path) { - StringBuilder url = new StringBuilder(); + try { - if (protocol == null || protocol.isEmpty()) { - protocol = "opc.tcp"; - } - url.append(protocol).append("://"); + client.getSession() + .get(200, TimeUnit.MILLISECONDS); - // 处理IPv6地址 - if (host.contains(":")) { - url.append("[").append(host).append("]"); - } else { - url.append(host); - } + return true; - if (port > 0) { - url.append(":").append(port); - } + } catch (Exception e) { - if (path != null && !path.isEmpty()) { - if (!path.startsWith("/")) { - url.append("/"); - } - url.append(path); + return false; } - - return url.toString(); } + /** - * 修正端点URL,强制使用指定的主机和端口 + * 安全解析NodeId(支持全部格式) + * + * ns=2;s=Tag + * ns=2;i=123 + * ns=3;g=UUID */ - private static String correctEndpointUrl(String endpointUrl, String targetHost, int targetPort) { - if (endpointUrl == null || endpointUrl.trim().isEmpty()) { - return endpointUrl; - } + private static NodeId parseNodeIdSafe(String address) { try { - // 提取原URL的各个部分 - String protocol = extractProtocolFromUrl(endpointUrl); - String originalHost = extractHostFromUrl(endpointUrl); - int originalPort = extractPortFromUrl(endpointUrl); - String path = extractPathFromUrl(endpointUrl); - - // 决定使用哪个主机 - String useHost = targetHost; - int usePort = (targetPort > 0) ? targetPort : originalPort; - - // 如果原端口无效,使用默认端口 - if (usePort <= 0) { - usePort = 4840; + + if (address == null || + address.trim().isEmpty()) { + return null; } - // 构建修正后的URL - return buildUrl(protocol, useHost, usePort, path); + return NodeId.parse(address.trim()); } catch (Exception e) { - System.err.println(LOG_PREFIX + "修正端点URL失败: " + e.getMessage()); - return endpointUrl; - } - } - - + log.warn("NodeId解析失败: {}", address); - - /** - * 检查是否为本地地址 - */ - private static boolean isLocalAddress(String url) { - if (url == null) return false; - - return url.contains("127.0.0.1") || - url.contains("localhost") || - url.contains("127.0.1.1") || - url.contains("[::1]") || - url.startsWith("opc.tcp://localhost") || - url.startsWith("opc.tcp://127."); + return null; + } } + /** - * 修正端点URL中的本地地址 + * 构造null结果(防止任务崩溃) */ - private static String correctEndpointUrl(String endpointUrl, String targetHost) { - if (endpointUrl == null) return endpointUrl; - - // 替换各种本地地址表示形式 - String corrected = endpointUrl - .replace("127.0.0.1", targetHost) - .replace("localhost", targetHost) - .replace("127.0.1.1", targetHost) - .replace("[::1]", targetHost); - - // 如果还有localhost:端口的形式 - if (corrected.contains("localhost:")) { - corrected = corrected.replace("localhost", targetHost); - } + private static List buildNullResult(int size) { - return corrected; - } + List result = + new ArrayList<>(size); - /** - * 断开连接 - */ - public static boolean disconnect() { - if (!isConnected || client == null) { - System.out.println(LOG_PREFIX + "客户端未连接"); - return true; + for (int i = 0; i < size; i++) { + result.add(null); } - try { - client.disconnect().get(5, TimeUnit.SECONDS); - isConnected = false; - client = null; - System.out.println(LOG_PREFIX + "连接已断开"); - return true; - } catch (Exception e) { - System.err.println(LOG_PREFIX + "断开连接失败: " + e.getMessage()); - return false; - } + return result; } - /** - * 读取节点值 - */ - public static Object readValue(String nodeId) { - return readValue(nodeId, 10); - } /** - * 读取节点值(自定义超时时间) + * 断开指定设备 */ - public static Object readValue(String nodeId, int timeoutSeconds) { - if (!isConnected()) { - System.err.println(LOG_PREFIX + "客户端未连接"); - return null; - } + public static boolean disconnect(Long deviceId) { try { - NodeId id = NodeId.parse(nodeId); - DataValue value = client.readValue(0.0, TimestampsToReturn.Both, id) - .get(timeoutSeconds, TimeUnit.SECONDS); - Object result = value.getValue().getValue(); - System.out.println(LOG_PREFIX + "读取节点成功: " + nodeId + " = " + result); - return result; - } catch (Exception e) { - System.err.println(LOG_PREFIX + "读取节点值失败[" + nodeId + "]: " + e.getMessage()); - return null; - } - } - - /** - * 写入节点值 - */ - public static boolean writeValue(String nodeId, Object value) { - return writeValue(nodeId, value, 10); - } - /** - * 写入节点值(自定义超时时间) - */ - public static boolean writeValue(String nodeId, Object value, int timeoutSeconds) { - if (!isConnected()) { - System.err.println(LOG_PREFIX + "客户端未连接"); - return false; - } + OpcUaClient client = + CLIENT_MAP.remove(deviceId); - try { - NodeId id = NodeId.parse(nodeId); - DataValue dataValue = new DataValue(new Variant(value), null, null); + if (client != null) { - client.writeValue(id, dataValue).get(timeoutSeconds, TimeUnit.SECONDS); - System.out.println(LOG_PREFIX + "写入节点成功: " + nodeId + " = " + value); - return true; - } catch (Exception e) { - System.err.println(LOG_PREFIX + "写入节点值失败[" + nodeId + "]: " + e.getMessage()); - return false; - } - } + client.disconnect().get(); - /** - * 检查连接状态 - */ - public static boolean isConnected() { - if (!isConnected || client == null) return false; + log.info("OPC断开 deviceId={}", deviceId); + return true; + } - try { - return validateConnection(); } catch (Exception e) { - isConnected = false; + + log.error("断开失败 deviceId={}", deviceId, e); return false; - } - } - /** - * 获取连接详细信息 - */ - public static String getConnectionInfo() { - if (!isConnected) { - return "未连接"; - } - try { - return "已连接到: " + serverUrl; - } catch (Exception e) { - return "已连接到: " + serverUrl + ", 会话信息获取失败"; } + return true; + } + /** - * 验证连接有效性 + * 系统关闭时调用 */ - private static boolean validateConnection() { - if (client == null) return false; + public static void disconnectAll() { - try { - NodeId rootNode = new NodeId(0, 84); // RootFolder - DataValue value = client.readValue(0.0, TimestampsToReturn.Both, rootNode) - .get(5, TimeUnit.SECONDS); - return value != null; - } catch (Exception e) { - return false; - } - } + CLIENT_MAP.forEach((deviceId, client) -> { - /** - * 获取服务器URL - */ - public static String getServerUrl() { - return serverUrl; - } + try { + client.disconnect().get(); + } catch (Exception ignored) {} + }); - /** - * 获取客户端实例(用于高级操作) - */ - public static OpcUaClient getClient() { - return client; - } + CLIENT_MAP.clear(); - /** - * 显式资源清理方法 - */ - public static void destroy() { - disconnect(); + log.info("全部OPC连接关闭"); } - /** - * 注册关闭钩子,确保程序退出时清理资源 - */ - public static void registerShutdownHook() { - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - if (isConnected) { - System.out.println(LOG_PREFIX + "检测到JVM关闭,正在清理OPC UA连接..."); - disconnect(); - } - })); + + private static + org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger + uint(long value) { + + return org.eclipse.milo.opcua.stack.core.types.builtin.unsigned + .Unsigned.uint(value); } -} \ No newline at end of file +} diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/ErrorCodeConstants.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/ErrorCodeConstants.java index 3da585734..553e0eb65 100644 --- a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/ErrorCodeConstants.java +++ b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/ErrorCodeConstants.java @@ -11,6 +11,7 @@ public interface ErrorCodeConstants { ErrorCode DEVICE_NOT_EXISTS = new ErrorCode(1_003_000_000, "设备不存在"); ErrorCode DEVICE_REFERENCES_EXIST = new ErrorCode(1_003_000_000, "存在设备已被引用,请先删除引用。"); + ErrorCode DEVICE_MQTT_TOPIC_EXIST = new ErrorCode(1_003_000_000, "设备MQTT主题不存在。"); ErrorCode DEVICE_EXISTS = new ErrorCode(1_003_000_000, "同名或同主题设备已存在"); @@ -74,6 +75,7 @@ public interface ErrorCodeConstants { ErrorCode RECIPE_CODE_DUPLICATE = new ErrorCode(1_003_000_004, "编码已存在"); ErrorCode RECIPE_CODE_EMPTY = new ErrorCode(1_003_000_005, "配方编码不能为空"); ErrorCode RECIPE_PLAN_DETAIL_NOT_EXISTS = new ErrorCode(1_003_000_006, "配方计划详情表(配方库)不存在"); + ErrorCode RECIPE_PLAN_DETAIL_CODE_EXISTS = new ErrorCode(1_003_000_006, "编码已存在"); ErrorCode RECIPE_POINT_RECORD_NOT_EXISTS = new ErrorCode(1_003_000_007, "IoT配方点位记录不存在"); ErrorCode RECIPE_DEVICE_RECORD_NOT_EXISTS = new ErrorCode(1_003_000_008, "设备点位采集值记录不存在"); } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/DeviceController.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/DeviceController.java index 263a4f6f4..8c9cebb32 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/DeviceController.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/DeviceController.java @@ -11,12 +11,14 @@ import cn.iocoder.yudao.module.iot.controller.admin.devicecontactmodel.vo.Device import cn.iocoder.yudao.module.iot.dal.dataobject.device.DeviceAttributeDO; import cn.iocoder.yudao.module.iot.dal.dataobject.device.DeviceDO; import cn.iocoder.yudao.module.iot.dal.dataobject.devicecontactmodel.DeviceContactModelDO; +import cn.iocoder.yudao.module.iot.framework.mqtt.consumer.IMqttservice; import cn.iocoder.yudao.module.iot.service.device.DeviceService; import cn.iocoder.yudao.module.iot.service.device.TDengineService; import com.fasterxml.jackson.core.JsonProcessingException; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.tags.Tag; +import org.eclipse.paho.client.mqttv3.MqttException; import org.quartz.SchedulerException; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.validation.annotation.Validated; @@ -50,6 +52,11 @@ public class DeviceController { @Resource private JobService jobService; + @Resource + private OpcUaSubscriptionService opcUaService; + +// @Resource +// private IMqttservice mqttService; @PostMapping("/create") @@ -318,6 +325,26 @@ public class DeviceController { } - +// @PostMapping("/subscribe") +// public String subscribeTopic(@RequestParam String topic) { +// try { +// int result = mqttService.subscribeTopic(topic); // 使用服务层安全订阅 +// if (result >0 ) { +// return "订阅成功: " + topic; +// } else { +// return "订阅失败: " + topic; +// } +// } catch (MqttException e) { +// return "订阅异常: " + e.getMessage(); +// } +// } + + @PutMapping("/update-enabled") + @Operation(summary = "更新任务管理启用状态") + @PreAuthorize("@ss.hasPermission('mes:task-management:update')") + public CommonResult updateDeviceEnabled(@Valid @RequestBody DeviceUpdateEnabledReqVO updateEnabledReqVO) throws MqttException { + deviceService.updateDeviceEnabled(updateEnabledReqVO); + return success(true); + } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/OpcUaSubscriptionService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/OpcUaSubscriptionService.java new file mode 100644 index 000000000..a4cfb7506 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/OpcUaSubscriptionService.java @@ -0,0 +1,136 @@ +package cn.iocoder.yudao.module.iot.controller.admin.device; + + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.milo.opcua.sdk.client.OpcUaClient; +import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig; +import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider; +import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider; +import org.eclipse.milo.opcua.sdk.client.api.subscriptions.*; +import org.eclipse.milo.opcua.stack.client.DiscoveryClient; +import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy; +import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId; +import org.eclipse.milo.opcua.stack.core.types.builtin.Variant; +import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger; +import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode; +import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn; +import org.eclipse.milo.opcua.stack.core.types.structured.*; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint; + +@Slf4j +@Service +public class OpcUaSubscriptionService { + + private static final String URL = "opc.tcp://192.168.21.5:4840"; + private static final String USERNAME = "bst"; + private static final String PASSWORD = "Bst123456"; + + private OpcUaClient client; + private UaSubscription subscription; + + // 保存已订阅的节点 + private final ConcurrentHashMap monitoredItems = new ConcurrentHashMap<>(); + + /** + * 订阅 OPC UA 节点变化 + */ + public void subscribeNode(String nodeAddress, int samplingMillis) throws Exception { + + if (client == null) { + connect(); + } + + if (subscription == null) { + subscription = client.getSubscriptionManager() + .createSubscription((double) samplingMillis) + .get(); + log.info("创建订阅,采样间隔: {} ms", samplingMillis); + } + + NodeId nodeId = NodeId.parse(nodeAddress); + + ReadValueId readValueId = new ReadValueId( + nodeId, + org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger.valueOf(13), // Value 属性 + null, + null + ); + + MonitoringParameters parameters = new MonitoringParameters( + uint(nodeId.hashCode()), + (double) samplingMillis, + null, + uint(10), + true + ); + + MonitoredItemCreateRequest request = new MonitoredItemCreateRequest( + readValueId, + MonitoringMode.Reporting, + parameters + ); + List requests = new ArrayList<>(); + requests.add(request); + List items = subscription.createMonitoredItems( + TimestampsToReturn.Both, + requests + ).get(); + + UaMonitoredItem item = items.get(0); + item.setValueConsumer((monitoredItem, value) -> { + Variant variant = value.getValue(); + Object v = variant != null ? variant.getValue() : null; + log.info("节点 {} 值变化: {}", nodeAddress, v); + }); + + monitoredItems.put(nodeAddress, item); + + log.info("成功订阅节点: {}", nodeAddress); + } + + private void connect() throws Exception { + IdentityProvider identity = new UsernameProvider(USERNAME, PASSWORD); + + // 1. 获取端点 + List endpoints = DiscoveryClient.getEndpoints(URL) + .get(5, TimeUnit.SECONDS); + + EndpointDescription endpoint = endpoints.stream() + .filter(e -> SecurityPolicy.None.getUri().equals(e.getSecurityPolicyUri())) + .findFirst() + .orElseThrow(() -> new RuntimeException("未找到 SecurityPolicy=None 的端点")); + + // 2. 构建客户端配置 + OpcUaClientConfig config = OpcUaClientConfig.builder() + .setEndpoint(endpoint) + .setIdentityProvider(identity) + .build(); + + // 3. 创建客户端并连接 + client = OpcUaClient.create(config); + client.connect().get(); + + log.info("成功连接 OPC UA 服务端: {}", URL); + } + +// /** +// * 断开连接 +// */ +// public void disconnect() { +// if (client != null) { +// try { +// client.disconnect().get(); +// log.info("断开 OPC UA 连接"); +// } catch (Exception e) { +// log.error("断开 OPC UA 连接失败", e); +// } +// } +// } +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/coretask/DeviceTask.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/coretask/DeviceTask.java index 39295bccf..3789ce03c 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/coretask/DeviceTask.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/coretask/DeviceTask.java @@ -116,15 +116,16 @@ public class DeviceTask implements Task { logger.info("执行设备任务,任务ID: {}, 参数: {}, 时间: {}", taskId, taskParam, currentTime); - executeDeviceLogic(taskId,taskParam); + executeDeviceLogic(taskId, taskParam); } catch (Exception e) { logger.error("执行设备任务异常,任务ID: {}", taskId, e); - } finally { - //确保出问题不会打满opcv服务器连接数 - OpcUtils.disconnect(); } +// finally { +// //确保出问题不会打满opcv服务器连接数 +// OpcUtils.disconnect(); +// } } /** @@ -137,10 +138,6 @@ public class DeviceTask implements Task { Long deviceId = sourceDeviceId - 1000000L; logger.info("处理后设备ID: {}", deviceId); - if (deviceId == null) { - throw new RuntimeException("设备ID不能为空"); - } - // 2. 获取设备信息 DeviceDO device = getDeviceInfo(deviceId); @@ -151,7 +148,7 @@ public class DeviceTask implements Task { processDeviceData(deviceId, device); // 5. 断开连接 - OpcUtils.disconnect(); +// OpcUtils.disconnect(); } @@ -183,7 +180,13 @@ public class DeviceTask implements Task { try { - connected = OpcUtils.connect(device.getUrl(), username, password, 10); + connected = OpcUtils.connect( + device.getId(), + device.getUrl(), + username, + password, + 10 + ); if (!connected) { log.error("设备 {} 连接OPC服务器失败,URL: {}", device.getId(), device.getUrl()); @@ -224,62 +227,74 @@ public class DeviceTask implements Task { } /** - * 处理设备数据 + * 处理设备数据 - 安全批量读取 OPC UA 点位 */ private void processDeviceData(Long deviceId, DeviceDO device) { - DeviceDO deviceDO = deviceMapper.selectById(deviceId); // 1. 查询点位配置 List points = getDevicePoints(deviceId); if (CollectionUtils.isEmpty(points)) { - - - logger.warn("设备 {} 未配置点位", deviceId); - //更新状态为待机中 + // 更新状态为待机中 + DeviceDO deviceDO = deviceMapper.selectById(deviceId); DeviceOperationRecordDO record = new DeviceOperationRecordDO(); - record.setDeviceId(device.getId()); + record.setDeviceId(deviceId); record.setRule(DeviceStatusEnum.STANDBY.getCode()); record.setTotalStandbyTime(deviceDO.getSampleCycle()); - //TODO 创建人和更新人为内置默认管理员 record.setCreator("1"); record.setUpdater("1"); - - deviceOperationRecordMapper.insert(record); return; } logger.info("设备 {} 需要读取 {} 个点位", deviceId, points.size()); - - DevicePointRulesDO devicePointRulesDO = devicePointRulesMapper.selectOne(Wrappers.lambdaQuery() - .eq(DevicePointRulesDO::getDeviceId, deviceId) - .eq(DevicePointRulesDO::getIdentifier, DeviceBasicStatusEnum.RUNNING)); - if(devicePointRulesDO !=null && devicePointRulesDO.getFieldRule() == null ){ - //更新状态为待机中 - DeviceOperationRecordDO record = new DeviceOperationRecordDO(); - record.setDeviceId(device.getId()); - record.setRule(DeviceStatusEnum.STANDBY.getCode()); - record.setTotalStandbyTime(deviceDO.getSampleCycle()); - //TODO 创建人和更新人为内置默认管理员 - record.setCreator("1"); - record.setUpdater("1"); - deviceOperationRecordMapper.insert(record); + // 2. 构建有效地址列表及索引映射,确保顺序对应 + List addresses = new ArrayList<>(); + List indexMap = new ArrayList<>(); // 对应原 points 的索引 + for (int i = 0; i < points.size(); i++) { + String address = StringUtils.trimToEmpty(points.get(i).getAddress()); + if (!address.isEmpty()) { + addresses.add(address); + indexMap.add(i); + } } + // 3. 批量读取 OPC UA 点位 + List values = new ArrayList<>(); + try { + if (!addresses.isEmpty()) { + values = OpcUtils.readValues(device.getId(), addresses); + } + } catch (Exception e) { + logger.error("设备 {} 批量读取 OPC UA 点位异常", deviceId, e); + } - // 2. 读取并处理数据 + // 4. 处理读取到的数据 int successCount = 0; List validDataList = new ArrayList<>(); + for (int i = 0; i < values.size(); i++) { + DeviceContactModelDO point = points.get(indexMap.get(i)); + Object value = values.get(i); + + try { + String processedValue = processOpcValue(value); + + // 规则判断和告警/运行记录处理 + judgmentRules(processedValue, point.getAttributeCode(), device, point.getId()); - for (DeviceContactModelDO point : points) { - processSinglePoint(point, validDataList,device); - if (point.getAddressValue() != null) { - successCount++; + point.setAddressValue(processedValue.isEmpty() ? null : processedValue); + + if (point.getAddressValue() != null) { + successCount++; + } + validDataList.add(point); + + } catch (Exception e) { + logger.error("处理点位 {} 异常,地址: {}", point.getId(), point.getAddress(), e); } } - // 3. 入库处理 + // 5. 入库处理 if (!validDataList.isEmpty()) { saveToDatabase(deviceId, validDataList, successCount); } else { @@ -287,6 +302,7 @@ public class DeviceTask implements Task { } } + /** * 获取设备点位 */ @@ -296,36 +312,35 @@ public class DeviceTask implements Task { return deviceContactModelMapper.selectList(query); } - /** - * 处理单个点位 - */ - private void processSinglePoint(DeviceContactModelDO point, List validDataList, DeviceDO device) { - try { - String address = StringUtils.trimToEmpty(point.getAddress()); - if (address.isEmpty()) { - logger.warn("点位ID {} 地址为空", point.getId()); - return; - } - - Object value = OpcUtils.readValue(address); -// if (value == null) { - logger.warn("读取点位 {} ,地址: {}", point.getId(), address); -// } else { - String processedValue = processOpcValue(value); - - //判断规则 - judgmentRules(processedValue,point.getAttributeCode(),device,point.getId()); - point.setAddressValue(processedValue.isEmpty() ? null : processedValue); - +// /** +// * 处理单个点位 +// */ +// private void processSinglePoint(DeviceContactModelDO point, List validDataList, DeviceDO device) { +// try { +// String address = StringUtils.trimToEmpty(point.getAddress()); +// if (address.isEmpty()) { +// logger.warn("点位ID {} 地址为空", point.getId()); +// return; // } - - validDataList.add(point); - } catch (Exception e) { - logger.error("处理点位 {} 异常,地址: {}", - point.getId(), point.getAddress(), e); - } - } - +// +// Object value = OpcUtils.readValue(device.getId(),address); +//// if (value == null) { +// logger.info("读取点位 {} ,地址: {}", point.getId(), address); +//// } else { +// String processedValue = processOpcValue(value); +// +// //判断规则 +// judgmentRules(processedValue,point.getAttributeCode(),device,point.getId()); +// point.setAddressValue(processedValue.isEmpty() ? null : processedValue); +// +//// } +// +// validDataList.add(point); +// } catch (Exception e) { +// logger.error("处理点位 {} 异常,地址: {}", +// point.getId(), point.getAddress(), e); +// } +// } /** @@ -365,7 +380,7 @@ public class DeviceTask implements Task { } } - private void judgmentRules(String processedValue, String attributeCode, DeviceDO device,Long modelId) { + private void judgmentRules(String processedValue, String attributeCode, DeviceDO device, Long modelId) { if (StringUtils.isBlank(processedValue)) { logger.warn("待判断的值为空,编码attributeCode: {}, deviceId: {}", attributeCode, device.getId()); // return; @@ -383,9 +398,9 @@ public class DeviceTask implements Task { // 2. 遍历规则 for (DevicePointRulesDO devicePointRulesDO : devicePointRulesDOList) { - if (StringUtils.isBlank(devicePointRulesDO.getFieldRule())) { + if (StringUtils.isBlank(devicePointRulesDO.getFieldRule())) { continue; - } + } // 3. 解析规则列表 List pointRulesVOList = JSON.parseArray( @@ -408,14 +423,14 @@ public class DeviceTask implements Task { JSON.toJSONString(pointRulesRespVO)); // 执行匹配成功后的逻辑 - handleMatchedSuccessRule(devicePointRulesDO,pointRulesRespVO, processedValue, device, attributeCode,modelId); + handleMatchedSuccessRule(devicePointRulesDO, pointRulesRespVO, processedValue, device, attributeCode, modelId); break; } else { logger.debug("规则不匹配: modelId={}, value={}, rule={}", attributeCode, processedValue, JSON.toJSONString(pointRulesRespVO)); // 执行匹配失败后的逻辑 - handleMatchedFailureRule(devicePointRulesDO,pointRulesRespVO, processedValue, device, attributeCode); + handleMatchedFailureRule(devicePointRulesDO, pointRulesRespVO, processedValue, device, attributeCode); } } @@ -423,7 +438,7 @@ public class DeviceTask implements Task { } } - private void handleMatchedFailureRule(DevicePointRulesDO devicePointRulesDO,PointRulesRespVO pointRulesRespVO, String processedValue, DeviceDO device, String attributeCode) { + private void handleMatchedFailureRule(DevicePointRulesDO devicePointRulesDO, PointRulesRespVO pointRulesRespVO, String processedValue, DeviceDO device, String attributeCode) { //TODO 离线待优化 // if (devicePointRulesDO.getIdentifier().equals(DeviceBasicStatusEnum.RUNNING.getCode())){ // DeviceOperationRecordDO record = new DeviceOperationRecordDO(); @@ -446,13 +461,13 @@ public class DeviceTask implements Task { String attributeCode, Long modelId) { DeviceContactModelDO deviceContactModelDO = deviceContactModelMapper.selectById(modelId); - if (deviceContactModelDO == null){ + if (deviceContactModelDO == null) { return; } //分别处理运行记录和告警记录 - if (StringUtils.isBlank(devicePointRulesDO.getAlarmLevel())){ + if (StringUtils.isBlank(devicePointRulesDO.getAlarmLevel())) { DeviceOperationRecordDO record = new DeviceOperationRecordDO(); record.setDeviceId(device.getId()); record.setModelId(modelId); @@ -468,7 +483,7 @@ public class DeviceTask implements Task { calculateAndSetTotalTime(record, pointRulesRespVO.getRule(), device.getSampleCycle()); deviceOperationRecordMapper.insert(record); - }else { + } else { DeviceWarinningRecordDO deviceWarinningRecordDO = new DeviceWarinningRecordDO(); deviceWarinningRecordDO.setDeviceId(device.getId()); @@ -649,12 +664,18 @@ public class DeviceTask implements Task { */ private boolean compareNumbers(Double value, Double ruleValue, String operator) { switch (operator) { - case "EQ": return Math.abs(value - ruleValue) < 0.000001; // 处理浮点数精度 - case "NE": return Math.abs(value - ruleValue) >= 0.000001; - case "GT": return value > ruleValue; - case "GE": return value >= ruleValue; - case "LT": return value < ruleValue; - case "LE": return value <= ruleValue; + case "EQ": + return Math.abs(value - ruleValue) < 0.000001; // 处理浮点数精度 + case "NE": + return Math.abs(value - ruleValue) >= 0.000001; + case "GT": + return value > ruleValue; + case "GE": + return value >= ruleValue; + case "LT": + return value < ruleValue; + case "LE": + return value <= ruleValue; default: logger.warn("不支持的操作符: {}", operator); return false; @@ -666,12 +687,18 @@ public class DeviceTask implements Task { */ private boolean compareStrings(String value, String ruleValue, String operator) { switch (operator) { - case "EQ": return value.equals(ruleValue); - case "NE": return !value.equals(ruleValue); - case "GT": return value.compareTo(ruleValue) > 0; - case "GE": return value.compareTo(ruleValue) >= 0; - case "LT": return value.compareTo(ruleValue) < 0; - case "LE": return value.compareTo(ruleValue) <= 0; + case "EQ": + return value.equals(ruleValue); + case "NE": + return !value.equals(ruleValue); + case "GT": + return value.compareTo(ruleValue) > 0; + case "GE": + return value.compareTo(ruleValue) >= 0; + case "LT": + return value.compareTo(ruleValue) < 0; + case "LE": + return value.compareTo(ruleValue) <= 0; default: logger.warn("不支持的操作符: {}", operator); return false; @@ -694,5 +721,4 @@ public class DeviceTask implements Task { } - } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/opcuv/OpcShutdown.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/opcuv/OpcShutdown.java new file mode 100644 index 000000000..ad35378ad --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/opcuv/OpcShutdown.java @@ -0,0 +1,14 @@ +package cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.opcuv; + +import cn.iocoder.yudao.framework.common.util.opc.OpcUtils; +import org.springframework.stereotype.Component; + +import javax.annotation.PreDestroy; + +@Component +public class OpcShutdown { + @PreDestroy + public void shutdown() { + OpcUtils.disconnectAll(); + } +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/DevicePageReqVO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/DevicePageReqVO.java index f5071418a..3b592f9c0 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/DevicePageReqVO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/DevicePageReqVO.java @@ -77,4 +77,7 @@ public class DevicePageReqVO extends PageParam { @Schema(description = "id集合导出用") private String ids; + @Schema(description = "mqtt订阅主题") + private String topic; + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/DeviceRespVO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/DeviceRespVO.java index 8c81d0139..08e1bcdf4 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/DeviceRespVO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/DeviceRespVO.java @@ -122,4 +122,7 @@ public class DeviceRespVO { @Schema(description = "关联组织", example = "1") private Long org; + + @Schema(description = "mqtt订阅主题") + private String topic; } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/DeviceSaveReqVO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/DeviceSaveReqVO.java index c05d0c38d..264ac27c9 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/DeviceSaveReqVO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/DeviceSaveReqVO.java @@ -75,5 +75,8 @@ public class DeviceSaveReqVO { // @NotNull private Integer isConnect; + @Schema(description = "mqtt订阅主题") + private String topic; + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/DeviceUpdateEnabledReqVO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/DeviceUpdateEnabledReqVO.java new file mode 100644 index 000000000..811950aa9 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/DeviceUpdateEnabledReqVO.java @@ -0,0 +1,20 @@ +package cn.iocoder.yudao.module.iot.controller.admin.device.vo; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +import javax.validation.constraints.NotNull; + + +@Schema(description = "采集设备 - 更新启用状态 Request VO") +@Data +public class DeviceUpdateEnabledReqVO { + + @Schema(description = "任务ID", required = true, example = "1024") + @NotNull(message = "任务ID不能为空") + private Long id; + + @Schema(description = "是否启用", required = true, example = "true") + @NotNull(message = "启用状态不能为空") + private Boolean enabled; +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/recipedeviceattribute/RecipeDeviceAttributeController.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/recipedeviceattribute/RecipeDeviceAttributeController.java index 951364ead..dc92ac8c6 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/recipedeviceattribute/RecipeDeviceAttributeController.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/recipedeviceattribute/RecipeDeviceAttributeController.java @@ -95,6 +95,14 @@ public class RecipeDeviceAttributeController { return success(pageResult); } + @GetMapping("/getList") + @Operation(summary = "查询配方配置列表") + public CommonResult> getList(RecipeDeviceAttributePageReqVO reqVO) { + // 替换为关联查询方法 + List recipeDeviceAttributeServiceList = recipeDeviceAttributeService.getList(reqVO); + return success(recipeDeviceAttributeServiceList); + } + @GetMapping("/list") @Operation(summary = "查询配方配置列表") public CommonResult> list(RecipeDeviceAttributePageReqVO reqVO) { diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/recipedevicerecord/RecipeDeviceRecordController.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/recipedevicerecord/RecipeDeviceRecordController.java index 461513499..75fe0c168 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/recipedevicerecord/RecipeDeviceRecordController.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/recipedevicerecord/RecipeDeviceRecordController.java @@ -156,68 +156,78 @@ public class RecipeDeviceRecordController { @Operation(summary = "批量创建设备点位采集记录和配方点位记录") @PreAuthorize("@ss.hasPermission('iot:recipe-device-record:create')") public CommonResult batchCreateRecipeDeviceRecord( - @RequestParam("id") Long recipeId) { + @RequestParam("id") Long id) { - try { - RecipePlanDetailDO recipePlanDetailDO = recipePlanDetailService.getRecipePlanDetail(recipeId); - //RecipeRespVO recipeRespVO = recipeService.getRecipeWithDeviceId(recipePlanDetailDO.getRecipeId()); + RecipePlanDetailDO recipePlanDetailDO = recipePlanDetailService.getRecipePlanDetail(id); + Long recipeId = recipePlanDetailDO.getRecipeId(); - //RecipeDO recipeDO = recipeService.getRecipe(recipePlanDetailDO.getRecipeId()); - // ========== 第一步:查询配方关联的点位属性信息 ========== - // 1.1 根据recipeId查询iot_recipe_device_attribute表记录 + // ========== 第一步:查询配方关联的点位属性信息 ========== + // 1.1 根据recipeId查询iot_recipe_device_attribute表记录 - recipePlanDetailDO.setRecipeId(recipeId); - List attributeList = recipeDeviceAttributeService.getByRecipeId(recipePlanDetailDO.getRecipeId()); +// recipePlanDetailDO.setRecipeId(recipeId); + List attributeList = recipeDeviceAttributeService.getByRecipeId(recipeId); - //先删除在添加 - List recipeDeviceRecordDOS = recipeDeviceRecordService.getListByRecipeId(recipeId); - if (!recipeDeviceRecordDOS.isEmpty()){ - recipeDeviceRecordService.deleteByIds(recipeDeviceRecordDOS); - } - RecipeDO recipe = recipeService.getRecipe(recipeId); - if (recipe == null){ - throw exception(RECIPE_NOT_EXISTS); - } + //先删除在添加 + List recipeDeviceRecordDOS = recipeDeviceRecordService.getListByRecipeId(recipeId); + if (!recipeDeviceRecordDOS.isEmpty()){ + recipeDeviceRecordService.deleteByIds(recipeDeviceRecordDOS); + } + RecipeDO recipe = recipeService.getRecipe(recipeId); + if (recipe == null){ + throw exception(RECIPE_NOT_EXISTS); + } - DeviceRespVO device = deviceService.getDevice(recipe.getMachineId()); - if (device== null ){ - throw exception(DEVICE_NOT_EXISTS); - } + DeviceRespVO device = deviceService.getDevice(recipe.getMachineId()); + if (device != null) { +// throw exception(DEVICE_NOT_EXISTS); - Map deviceContactModelMap = new HashMap<>(); - List deviceContactModelDOS = deviceContactModelService.selectListByDeviceId(device.getId()); - if (!deviceContactModelDOS.isEmpty()){ - deviceContactModelMap = deviceContactModelDOS.stream() - .collect(Collectors.toMap( - DeviceContactModelDO::getId, - Function.identity() - )); - } - OpcUtils.connect(device.getUrl(),device.getUsername(),device.getPassword(),10); + try { - for (RecipeDeviceAttributeDO attributeDO : attributeList) { - DeviceContactModelDO deviceContactModelDO = deviceContactModelMap.get(attributeDO.getAttributeId()); - if (deviceContactModelDO == null){ - continue; + Map deviceContactModelMap = new HashMap<>(); + List deviceContactModelDOS = deviceContactModelService.selectListByDeviceId(device.getId()); + if (!deviceContactModelDOS.isEmpty()) { + deviceContactModelMap = deviceContactModelDOS.stream() + .collect(Collectors.toMap( + DeviceContactModelDO::getId, + Function.identity() + )); } - // 创建 - RecipeDeviceRecordDO recipeDeviceRecordDO = new RecipeDeviceRecordDO(); - recipeDeviceRecordDO.setRecipeId(recipeId); - recipeDeviceRecordDO.setAttributeCode(deviceContactModelDO.getAttributeName()); - recipeDeviceRecordDO.setDataType(deviceContactModelDO.getDataType()); - recipeDeviceRecordDO.setDataUnit(deviceContactModelDO.getDataUnit()); - recipeDeviceRecordDO.setValue((String) OpcUtils.readValue(deviceContactModelDO.getAddress())); - recipeDeviceRecordService.createRecipeDeviceRecord(BeanUtils.toBean(recipeDeviceRecordDO, RecipeDeviceRecordSaveReqVO.class)); + Map> deviceDataMap = deviceService.createDeviceDataMap(device.getId());//recipeRespVO.getDeviceId() + +// OpcUtils.connect(device.getId(),device.getUrl(),device.getUsername(),device.getPassword(),10); + + for (RecipeDeviceAttributeDO attributeDO : attributeList) { + Map data = deviceDataMap.get(attributeDO.getAttributeId()); + + DeviceContactModelDO deviceContactModelDO = deviceContactModelMap.get(attributeDO.getAttributeId()); + if (deviceContactModelDO == null) { + continue; + } + // 创建 + RecipeDeviceRecordDO recipeDeviceRecordDO = new RecipeDeviceRecordDO(); + recipeDeviceRecordDO.setRecipeId(recipeId); + recipeDeviceRecordDO.setAttributeCode(deviceContactModelDO.getAttributeCode()); + recipeDeviceRecordDO.setAttributeName(deviceContactModelDO.getAttributeName()); + recipeDeviceRecordDO.setDataType(deviceContactModelDO.getDataType()); + recipeDeviceRecordDO.setDeviceId(deviceContactModelDO.getDeviceId()); + recipeDeviceRecordDO.setDataUnit(deviceContactModelDO.getDataUnit()); +// recipeDeviceRecordDO.setValue((String) OpcUtils.readValues(device.getId(),deviceContactModelDO.getAddress())); + if (data.get("addressValue") != null && data.get("addressValue").toString() != null) { + recipeDeviceRecordDO.setValue(data.get("addressValue").toString()); + } + + recipeDeviceRecordService.createRecipeDeviceRecord(BeanUtils.toBean(recipeDeviceRecordDO, RecipeDeviceRecordSaveReqVO.class)); + + } + } catch (Exception e) { + throw new RuntimeException(e); } - } finally { - OpcUtils.disconnect(); } - return success(true); } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/device/DeviceDO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/device/DeviceDO.java index bdf6786ff..52668bca1 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/device/DeviceDO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/device/DeviceDO.java @@ -111,5 +111,9 @@ public class DeviceDO extends BaseDO { */ private String tenantId; + /** + * 订阅主题 + */ + private String topic; } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/device/DeviceMapper.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/device/DeviceMapper.java index d00f01f8c..c066f3f41 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/device/DeviceMapper.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/device/DeviceMapper.java @@ -32,6 +32,7 @@ public interface DeviceMapper extends BaseMapperX { LambdaQueryWrapperX deviceDOLambdaQueryWrapperX = new LambdaQueryWrapperX<>(); deviceDOLambdaQueryWrapperX.likeIfPresent(DeviceDO::getDeviceCode, reqVO.getDeviceCode()) .likeIfPresent(DeviceDO::getDeviceName, reqVO.getDeviceName()) + .likeIfPresent(DeviceDO::getTopic, reqVO.getTopic()) .eqIfPresent(DeviceDO::getDeviceType, reqVO.getDeviceType()) .eqIfPresent(DeviceDO::getStatus, reqVO.getStatus()) .eqIfPresent(DeviceDO::getReadTopic, reqVO.getReadTopic()) diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/recipedeviceattribute/RecipeDeviceAttributeMapper.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/recipedeviceattribute/RecipeDeviceAttributeMapper.java index 76c7ab31b..f11e61677 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/recipedeviceattribute/RecipeDeviceAttributeMapper.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/recipedeviceattribute/RecipeDeviceAttributeMapper.java @@ -98,6 +98,12 @@ public interface RecipeDeviceAttributeMapper extends BaseMapperX(list, page.getTotal()); } + + default List getList(RecipeDeviceAttributePageReqVO reqVO) { + List list = selectPageWithAttribute(null, reqVO); + return list; + } + /** * 根据配方ID查询设备属性记录 * @param recipeId 配方ID diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/recipeplandetail/RecipePlanDetailMapper.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/recipeplandetail/RecipePlanDetailMapper.java index 142fb7184..50ab68679 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/recipeplandetail/RecipePlanDetailMapper.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/recipeplandetail/RecipePlanDetailMapper.java @@ -41,9 +41,9 @@ public interface RecipePlanDetailMapper extends BaseMapperX " p.code AS planCode, ", // 关联mes_plan表的编码 " d.id AS deviceId ", // 新增:关联iot_device表的id作为deviceId "FROM iot_recipe_plan_detail rpd", - "LEFT JOIN iot_recipe r ON rpd.recipe_id = r.id ", // 左关联配方表(避免配方ID不存在时数据丢失) + "LEFT JOIN iot_recipe r ON rpd.recipe_id = r.id ", // 左关联配方表(避免配方ID不存在时数据丢失) "LEFT JOIN mes_plan p ON rpd.plan_id = p.id ", // 左关联计划表 - "LEFT JOIN iot_device d ON r.machine_name = d.device_name ", // 新增:左关联设备表,通过machine_name匹配device_name + "LEFT JOIN iot_device d ON r.machine_id = d.id ", // 新增:左关联设备表,通过machine_name匹配device_name "WHERE rpd.deleted = 0 ", // 过滤已删除数据 "", " AND rpd.code LIKE CONCAT('%', #{reqVO.code}, '%')", diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/consumer/MqttDataHandler.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/consumer/MqttDataHandler.java index 23f32fa6f..a465b6e2b 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/consumer/MqttDataHandler.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/consumer/MqttDataHandler.java @@ -1,8 +1,22 @@ package cn.iocoder.yudao.module.iot.framework.mqtt.consumer; +import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder; +import cn.iocoder.yudao.module.iot.controller.admin.device.enums.DeviceBasicStatusEnum; +import cn.iocoder.yudao.module.iot.controller.admin.device.enums.DeviceStatusEnum; +import cn.iocoder.yudao.module.iot.controller.admin.devicemodelrules.vo.PointRulesRespVO; +import cn.iocoder.yudao.module.iot.dal.dataobject.device.DeviceDO; +import cn.iocoder.yudao.module.iot.dal.dataobject.devicecontactmodel.DeviceContactModelDO; +import cn.iocoder.yudao.module.iot.dal.dataobject.deviceoperationrecord.DeviceOperationRecordDO; +import cn.iocoder.yudao.module.iot.dal.dataobject.devicepointrules.DevicePointRulesDO; +import cn.iocoder.yudao.module.iot.dal.dataobject.devicewarinningrecord.DeviceWarinningRecordDO; import cn.iocoder.yudao.module.iot.dal.dataobject.iotorganization.IotOrganizationDO; import cn.iocoder.yudao.module.iot.dal.dataobject.mqttrecord.MqttRecordDO; +import cn.iocoder.yudao.module.iot.dal.mysql.device.DeviceMapper; +import cn.iocoder.yudao.module.iot.dal.mysql.devicecontactmodel.DeviceContactModelMapper; +import cn.iocoder.yudao.module.iot.dal.mysql.deviceoperationrecord.DeviceOperationRecordMapper; +import cn.iocoder.yudao.module.iot.dal.mysql.devicepointrules.DevicePointRulesMapper; +import cn.iocoder.yudao.module.iot.dal.mysql.devicewarinningrecord.DeviceWarinningRecordMapper; import cn.iocoder.yudao.module.iot.dal.mysql.mqttrecord.MqttRecordMapper; import cn.iocoder.yudao.module.iot.framework.constant.Constants; import cn.iocoder.yudao.module.iot.framework.mqtt.common.SuperConsumer; @@ -11,14 +25,27 @@ import cn.iocoder.yudao.module.iot.framework.mqtt.entity.MqttData; import cn.iocoder.yudao.module.iot.framework.mqtt.utils.DateUtils; import cn.iocoder.yudao.module.iot.framework.mqtt.utils.MqttDataUtils; import cn.iocoder.yudao.module.iot.service.device.DeviceService; +import cn.iocoder.yudao.module.iot.service.device.TDengineService; import cn.iocoder.yudao.module.iot.service.iotorganization.IotOrganizationService; import cn.iocoder.yudao.module.iot.service.mqttrecord.MqttRecordService; +import com.alibaba.fastjson.JSON; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; @Slf4j @Component @@ -27,7 +54,8 @@ public class MqttDataHandler extends SuperConsumer { @Resource private IotOrganizationService organizationService; @Resource - private DeviceService deviceService; + @Lazy + private DeviceMapper deviceMapper; @Resource private AsyncService asyncService; @@ -36,6 +64,25 @@ public class MqttDataHandler extends SuperConsumer { @Resource private MqttRecordMapper mqttRecordMapper; + @Resource + private DeviceOperationRecordMapper deviceOperationRecordMapper; + + @Resource + private DeviceContactModelMapper deviceContactModelMapper; + + @Resource + private TDengineService tDengineService; + + @Resource + private DevicePointRulesMapper devicePointRulesMapper; + + @Resource + private DeviceWarinningRecordMapper deviceWarinningRecordMapper; + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + + @Override public String decoder(MqttMessage msg) { return new String(msg.getPayload()); @@ -58,33 +105,633 @@ public class MqttDataHandler extends SuperConsumer { // }catch (Exception e){ // log.error("asyncService.transferBase error:"+entity); // } + //异步线程查询时需带TenantId,框架TenantContextHolder限制 + //TODO 后续是否要其他TenantId + try { + // 设置租户ID + TenantContextHolder.setTenantId(1L); + save(machine, entity, data, topic); + } finally { + TenantContextHolder.clear(); + } + } + } + + public void save(IotOrganizationDO machine, String entity, MqttData data,String topic) { +// try { +// long timestamp = DateUtils.strToTimeStamp(data.getDeviceDataTime(), Constants.MQTT_timestamp_format); +// //timestamp = DateUtils.getMillsLong(); +// LocalDateTime date = DateUtils.strToLocalDateTime(data.getDeviceDataTime()); + + // 如果 deviceDataTime 为空,使用当前时间 +// LocalDateTime date; +// long timestamp; +// if (data.getDeviceDataTime() != null && !data.getDeviceDataTime().isEmpty()) { +// try { +// timestamp = DateUtils.strToTimeStamp(data.getDeviceDataTime(), Constants.MQTT_timestamp_format); +// date = DateUtils.strToLocalDateTime(data.getDeviceDataTime()); +// } catch (Exception e) { +// log.warn("解析 deviceDataTime 异常, 使用当前时间: {}", data.getDeviceDataTime(), e); +// timestamp = DateUtils.getMillsLong(); +// date = LocalDateTime.now(); +// } +// } else { +// timestamp = DateUtils.getMillsLong(); +// date = LocalDateTime.now(); +// } +// +// +// +// MqttRecordDO recordDO = new MqttRecordDO(); +// recordDO.setDeviceCode(data.getDeviceID()); +// recordDO.setGatewayCode(data.getGatewayID()); +// recordDO.setDeviceData(entity); +// recordDO.setDeviceDataTime(date); +// recordDO.setDeviceDataTimeLong(timestamp); +// /**直接保存原始mqtt*/ +// mqttRecordMapper.insert(recordDO); + if (StringUtils.isBlank(entity)) { + log.warn("MQTT消息为空 topic={}", topic); + return; + } + + if (StringUtils.isBlank(topic)) { + log.warn("MQTT topic为空"); + return; + } + try { + + + JsonNode rootNode = OBJECT_MAPPER.readTree(entity); + + JsonNode devListNode = rootNode.get("devList"); + + if (devListNode == null || devListNode.isEmpty()) { + return; + } + + JsonNode varListNode = + devListNode.get(0).get("varList"); + + if (varListNode == null) { + return; + } + + Map varListMap = + OBJECT_MAPPER.convertValue( + varListNode, + Map.class + ); + + DeviceDO deviceDO = deviceMapper.selectOne(Wrappers.lambdaQuery().eq(DeviceDO::getTopic,topic)); + log.info("getDeviceByMqttTopic参数:{}", topic); + if (deviceDO == null) { + log.info("getDeviceByMqttTopic查询出来deviceDO为空"); + + return; + } + + + processDeviceDataFromMqtt( + deviceDO, + varListMap + ); + + } catch (Exception e) { + + log.error("MQTT数据处理异常", e); + + } + } + + + public void processDeviceDataFromMqtt(DeviceDO device, + Map varListMap) { + + Long deviceId = device.getId(); + + // 1. 查询点位配置 + List points = getDevicePoints(deviceId); + + if (CollectionUtils.isEmpty(points)) { + + log.warn("设备 {} 未配置点位", device.getId()); + + DeviceOperationRecordDO record = new DeviceOperationRecordDO(); + record.setDeviceId(deviceId); + record.setRule(DeviceStatusEnum.STANDBY.getCode()); + //TODO 待优化 + record.setTotalStandbyTime(device.getSampleCycle()); + record.setCreator("1"); + record.setUpdater("1"); + + deviceOperationRecordMapper.insert(record); + + return; + } + + if (varListMap == null || varListMap.isEmpty()) { + + log.warn("设备 {} MQTT varList 为空", deviceId); + return; + } + + // 查询RUNNING点位规则 + DevicePointRulesDO devicePoints = getDevicePointRules(deviceId); + if (StringUtils.isBlank(devicePoints.getFieldRule())){ + log.warn("设备 {} 没有RUNNING点位规则", device.getId()); + + DeviceOperationRecordDO record = new DeviceOperationRecordDO(); + record.setDeviceId(deviceId); + record.setRule(DeviceStatusEnum.STANDBY.getCode()); + //TODO 待优化 + record.setTotalStandbyTime(device.getSampleCycle()); + record.setCreator("1"); + record.setUpdater("1"); + + deviceOperationRecordMapper.insert(record); + + } + + + + log.info("设备 {} MQTT 数据点位数量 {}", deviceId, varListMap.size()); + + int successCount = 0; + + List validDataList = new ArrayList<>(); + + // 2. 遍历数据库点位,通过 code 匹配 MQTT + for (DeviceContactModelDO point : points) { + + try { + + String code = point.getAttributeCode(); + + Object value = varListMap.get(code); + + if (value == null) { + validDataList.add(point); + continue; + } + + String processedValue = processOpcValue(value); + + // 规则判断 + judgmentRules( + processedValue, + code, + device, + point.getId() + ); + + point.setAddressValue(processedValue); + + successCount++; + + validDataList.add(point); + + } catch (Exception e) { + + log.error("处理 MQTT 点位异常 deviceId={}, code={}", + deviceId, + point.getAttributeCode(), + e); + } + } + + // 3. 入库 + if (!validDataList.isEmpty()) { + + saveToDatabase( + deviceId, + validDataList, + successCount + ); + + } else { + + log.warn("设备 {} 未匹配到 MQTT 数据", deviceId); + + } + } + + + private DevicePointRulesDO getDevicePointRules(Long deviceId) { + + List list = + devicePointRulesMapper.selectList( + Wrappers.lambdaQuery() + .eq(DevicePointRulesDO::getDeviceId, deviceId) + .eq(DevicePointRulesDO::getIdentifier, "RUNNING") + .orderByDesc(DevicePointRulesDO::getCreateTime) + .last("LIMIT 1") + ); + + if (CollectionUtils.isEmpty(list)) { + + log.info("设备 {} 未找到 RUNNING 规则", deviceId); + + return null; + } + + DevicePointRulesDO rule = list.get(0); + + log.info("设备 {} 使用 RUNNING 规则,规则ID={}, 创建时间={}", + deviceId, + rule.getId(), + rule.getCreateTime()); + + return rule; + } + + - save(machine, entity, data); + /** + * 获取设备点位 + */ + private List getDevicePoints(Long deviceId) { + LambdaQueryWrapper query = new LambdaQueryWrapper<>(); + query.eq(DeviceContactModelDO::getDeviceId, deviceId); + return deviceContactModelMapper.selectList(query); + } + + + /** + * 处理OPC值 + */ + private String processOpcValue(Object value) { + if (value == null) { + return ""; } + + if (value instanceof String) { + return ((String) value).trim(); + } + + return value.toString(); } - public void save(IotOrganizationDO machine, String entity, MqttData data) { + /** + * 保存到数据库 + */ + private void saveToDatabase(Long deviceId, List dataList, int successCount) { try { - long timestamp = DateUtils.strToTimeStamp(data.getDeviceDataTime(), Constants.MQTT_timestamp_format); - //timestamp = DateUtils.getMillsLong(); - LocalDateTime date = DateUtils.strToLocalDateTime(data.getDeviceDataTime()); - - MqttRecordDO recordDO = new MqttRecordDO(); - recordDO.setDeviceCode(data.getDeviceID()); - recordDO.setGatewayCode(data.getGatewayID()); - recordDO.setDeviceData(entity); - recordDO.setDeviceDataTime(date); - recordDO.setDeviceDataTimeLong(timestamp); - /**直接保存原始mqtt*/ - mqttRecordMapper.insert(recordDO); - - /** - * 保存解析后的mqtt数据 - * */ - //tsMqttService.insertDataAddress(data, taskId, timestamp, equipment); + for (DeviceContactModelDO deviceContactModelDO : dataList) { + deviceContactModelDO.setAddress(null); + } + String json = JSON.toJSONString(dataList); + boolean inserted = tDengineService.insertDeviceData(deviceId, json); + + if (inserted) { + log.info("设备 {} 数据入库成功,总数: {},有效: {}", + deviceId, dataList.size(), successCount); + } else { + log.error("设备 {} 数据入库失败", deviceId); + } } catch (Exception e) { - log.error("-----mqttTableName:"); - e.printStackTrace(); + log.error("设备 {} 数据入库异常", deviceId, e); + } + } + + private void judgmentRules(String processedValue, String attributeCode, DeviceDO device, Long modelId) { + if (StringUtils.isBlank(processedValue)) { + log.warn("待判断的值为空,编码attributeCode: {}, deviceId: {}", attributeCode, device.getId()); +// return; + } + + // 1. 查询设备规则 + List devicePointRulesDOList = devicePointRulesMapper.selectList( + Wrappers.lambdaQuery() + .eq(DevicePointRulesDO::getDeviceId, device.getId()).orderByDesc(DevicePointRulesDO::getCreateTime)); + + if (CollectionUtils.isEmpty(devicePointRulesDOList)) { + log.debug("设备 {} 未配置规则", device.getId()); + return; + } + + // 2. 遍历规则 + for (DevicePointRulesDO devicePointRulesDO : devicePointRulesDOList) { + if (StringUtils.isBlank(devicePointRulesDO.getFieldRule())) { + continue; + } + + // 3. 解析规则列表 + List pointRulesVOList = JSON.parseArray( + devicePointRulesDO.getFieldRule(), PointRulesRespVO.class); + + if (CollectionUtils.isEmpty(pointRulesVOList)) { + continue; + } + + // 4. 找到对应modelId的规则并进行判断 + for (PointRulesRespVO pointRulesRespVO : pointRulesVOList) { + if (pointRulesRespVO.getCode() != null && + pointRulesRespVO.getCode().equals(attributeCode)) { + + boolean matched = matchRule(processedValue, pointRulesRespVO); + + if (matched) { + log.info("规则匹配成功: modelId={}, value={}, rule={}", + attributeCode, processedValue, + JSON.toJSONString(pointRulesRespVO)); + + // 执行匹配成功后的逻辑 + handleMatchedSuccessRule(devicePointRulesDO, pointRulesRespVO, processedValue, device, attributeCode, modelId); + break; + } else { + log.debug("规则不匹配: modelId={}, value={}, rule={}", + attributeCode, processedValue, + JSON.toJSONString(pointRulesRespVO)); + // 执行匹配失败后的逻辑 + handleMatchedFailureRule(devicePointRulesDO, pointRulesRespVO, processedValue, device, attributeCode); + + } + } + } + } + } + + private void handleMatchedFailureRule(DevicePointRulesDO devicePointRulesDO, PointRulesRespVO pointRulesRespVO, String processedValue, DeviceDO device, String attributeCode) { + //TODO 离线待优化 +// if (devicePointRulesDO.getIdentifier().equals(DeviceBasicStatusEnum.RUNNING.getCode())){ +// DeviceOperationRecordDO record = new DeviceOperationRecordDO(); +// record.setDeviceId(device.getId()); +// record.setModelId(modelId); +// record.setRule(pointRulesRespVO.getRule()); +// record.setAddressValue(processedValue); +// record.setRecordType(getRecordType(devicePointRulesDO)); +// record.setRuleId(devicePointRulesDO.getId()); +// +// } + + + } + + private void handleMatchedSuccessRule(DevicePointRulesDO devicePointRulesDO, + PointRulesRespVO pointRulesRespVO, + String processedValue, + DeviceDO device, + String attributeCode, + Long modelId) { + DeviceContactModelDO deviceContactModelDO = deviceContactModelMapper.selectById(modelId); + if (deviceContactModelDO == null) { + return; + } + + + //分别处理运行记录和告警记录 + if (StringUtils.isBlank(devicePointRulesDO.getAlarmLevel())) { + DeviceOperationRecordDO record = new DeviceOperationRecordDO(); + record.setDeviceId(device.getId()); + record.setModelId(modelId); + record.setRule(pointRulesRespVO.getRule()); + record.setAddressValue(processedValue); + record.setRecordType(getRecordType(devicePointRulesDO)); + record.setRuleId(devicePointRulesDO.getId()); + //TODO 创建人和更新人为内置默认管理员 + record.setCreator("1"); + record.setUpdater("1"); + + // 处理累计时间 + calculateAndSetTotalTime(record, pointRulesRespVO.getRule(), device.getSampleCycle()); + + deviceOperationRecordMapper.insert(record); + } else { + + DeviceWarinningRecordDO deviceWarinningRecordDO = new DeviceWarinningRecordDO(); + deviceWarinningRecordDO.setDeviceId(device.getId()); + deviceWarinningRecordDO.setModelId(modelId); + deviceWarinningRecordDO.setRule(pointRulesRespVO.getRule()); + deviceWarinningRecordDO.setAlarmLevel(devicePointRulesDO.getAlarmLevel()); + deviceWarinningRecordDO.setAddressValue(processedValue); + deviceWarinningRecordDO.setRuleId(devicePointRulesDO.getId()); + deviceWarinningRecordDO.setDeviceName(device.getDeviceName()); + deviceWarinningRecordDO.setModelName(deviceContactModelDO.getAttributeName()); + deviceWarinningRecordDO.setRuleName(devicePointRulesDO.getFieldName()); + //TODO 创建人和更新人为内置默认管理员 + deviceWarinningRecordDO.setCreator("1"); + deviceWarinningRecordDO.setUpdater("1"); + deviceWarinningRecordMapper.insert(deviceWarinningRecordDO); } } + + private void calculateAndSetTotalTime(DeviceOperationRecordDO record, String ruleCode, Double sampleCycle) { + if (!isTimeRelatedStatus(ruleCode)) { + return; + } + +// DeviceOperationRecordDO lastRecord = deviceOperationRecordMapper.selectOne( +// Wrappers.lambdaQuery() +// .eq(DeviceOperationRecordDO::getRule, ruleCode) +// .orderByDesc(DeviceOperationRecordDO::getCreateTime) +// .last("LIMIT 1") +// ); + + if (ruleCode.equals(DeviceStatusEnum.RUNNING.getCode())) { +// Double totalTime = (lastRecord != null && lastRecord.getTotalRunningTime() != null) +// ? lastRecord.getTotalRunningTime() + sampleCycle +// : sampleCycle; + record.setTotalRunningTime(sampleCycle); + + } else if (ruleCode.equals(DeviceStatusEnum.STANDBY.getCode())) { +// Double totalTime = (lastRecord != null && lastRecord.getTotalStandbyTime() != null) +// ? lastRecord.getTotalStandbyTime() + sampleCycle +// : sampleCycle; + record.setTotalStandbyTime(sampleCycle); + + } else if (ruleCode.equals(DeviceStatusEnum.FAULT_STANDBY.getCode())) { +// Double totalTime = (lastRecord != null && lastRecord.getTotalFaultTime() != null) +// ? lastRecord.getTotalFaultTime() + sampleCycle +// : sampleCycle; + record.setTotalFaultTime(sampleCycle); + + } else if (ruleCode.equals(DeviceStatusEnum.ALARM_RUNNING.getCode())) { +// Double totalTime = (lastRecord != null && lastRecord.getTotalWarningTime() != null) +// ? lastRecord.getTotalWarningTime() + sampleCycle +// : sampleCycle; + record.setTotalWarningTime(sampleCycle); + } + } + + private Integer getRecordType(DevicePointRulesDO devicePointRulesDO) { + return devicePointRulesDO.getIdentifier() + .equals(DeviceBasicStatusEnum.RUNNING.getDescription()) + ? Integer.parseInt(DeviceBasicStatusEnum.RUNNING.getCode()) + : Integer.parseInt(DeviceBasicStatusEnum.ALARM.getCode()); + } + + private boolean isTimeRelatedStatus(String ruleCode) { + return ruleCode.equals(DeviceStatusEnum.RUNNING.getCode()) || + ruleCode.equals(DeviceStatusEnum.STANDBY.getCode()) || + ruleCode.equals(DeviceStatusEnum.FAULT_STANDBY.getCode()) || + ruleCode.equals(DeviceStatusEnum.ALARM_RUNNING.getCode()); + } + + + /** + * 判断值是否符合规则 + * 支持操作符: EQ(等于), NE(不等于), GT(大于), GE(大于等于), + * LT(小于), LE(小于等于), TRUE(为真), FALSE(为假) + */ + private boolean matchRule(String value, PointRulesRespVO rule) { + if (StringUtils.isBlank(value) || rule == null || + StringUtils.isBlank(rule.getOperator())) { + return false; + } + + try { + String operator = rule.getOperator().toUpperCase(); + String inputValue = value.trim().toLowerCase(); + String ruleValue = StringUtils.trimToEmpty(rule.getOperatorRule()); + + // 1. 处理布尔值判断 + if ("TRUE".equals(operator) || "FALSE".equals(operator)) { + return matchBooleanRule(inputValue, operator); + } + + // 2. 如果operatorRule为空,且不是布尔操作符,则返回false + if (StringUtils.isBlank(ruleValue)) { + log.warn("规则比较值为空,但操作符不是布尔类型: operator={}", operator); + return false; + } + + ruleValue = ruleValue.trim(); + + // 3. 尝试数值比较 + if (isNumeric(inputValue) && isNumeric(ruleValue)) { + Double num1 = Double.parseDouble(inputValue); + Double num2 = Double.parseDouble(ruleValue); + + return compareNumbers(num1, num2, operator); + } + // 4. 字符串比较 + else { + return compareStrings(inputValue, ruleValue, operator); + } + + } catch (Exception e) { + log.error("规则匹配异常: value={}, rule={}, error={}", + value, JSON.toJSONString(rule), e.getMessage()); + return false; + } + } + + /** + * 处理布尔值判断 + */ + private boolean matchBooleanRule(String value, String operator) { + // 常见布尔值表示 + boolean booleanValue = parseBoolean(value); + + if ("TRUE".equals(operator)) { + return booleanValue; + } else if ("FALSE".equals(operator)) { + return !booleanValue; + } + + return false; + } + + /** + * 解析字符串为布尔值 + * 支持: true, false, 1, 0, yes, no, on, off等 + */ + private boolean parseBoolean(String value) { + if (StringUtils.isBlank(value)) { + return false; + } + + String lowerValue = value.toLowerCase(); + + // 常见真值表示 + if ("true".equals(lowerValue) || + "1".equals(lowerValue) || + "yes".equals(lowerValue) || + "on".equals(lowerValue) || + "是".equals(lowerValue) || // 中文支持 + "成功".equals(lowerValue)) { + return true; + } + + // 常见假值表示 + if ("false".equals(lowerValue) || + "0".equals(lowerValue) || + "no".equals(lowerValue) || + "off".equals(lowerValue) || + "否".equals(lowerValue) || // 中文支持 + "失败".equals(lowerValue)) { + return false; + } + + // 尝试转换为布尔值 + try { + return Boolean.parseBoolean(lowerValue); + } catch (Exception e) { + log.warn("无法解析为布尔值: {}", value); + return false; + } + } + + /** + * 数值比较 + */ + private boolean compareNumbers(Double value, Double ruleValue, String operator) { + switch (operator) { + case "EQ": + return Math.abs(value - ruleValue) < 0.000001; // 处理浮点数精度 + case "NE": + return Math.abs(value - ruleValue) >= 0.000001; + case "GT": + return value > ruleValue; + case "GE": + return value >= ruleValue; + case "LT": + return value < ruleValue; + case "LE": + return value <= ruleValue; + default: + log.warn("不支持的操作符: {}", operator); + return false; + } + } + + /** + * 字符串比较 + */ + private boolean compareStrings(String value, String ruleValue, String operator) { + switch (operator) { + case "EQ": + return value.equals(ruleValue); + case "NE": + return !value.equals(ruleValue); + case "GT": + return value.compareTo(ruleValue) > 0; + case "GE": + return value.compareTo(ruleValue) >= 0; + case "LT": + return value.compareTo(ruleValue) < 0; + case "LE": + return value.compareTo(ruleValue) <= 0; + default: + log.warn("不支持的操作符: {}", operator); + return false; + } + } + + /** + * 判断字符串是否为数字 + */ + private boolean isNumeric(String str) { + if (StringUtils.isBlank(str)) { + return false; + } + try { + Double.parseDouble(str); + return true; + } catch (NumberFormatException e) { + return false; + } + } + } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/consumer/impl/AsyncService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/consumer/impl/AsyncService.java index 0ff9e2f39..03b84cf4b 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/consumer/impl/AsyncService.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/consumer/impl/AsyncService.java @@ -5,6 +5,7 @@ import cn.iocoder.yudao.module.iot.service.device.DeviceService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Lazy; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; @@ -20,6 +21,7 @@ import java.util.Map; @EnableScheduling public class AsyncService { @Resource + @Lazy private DeviceService deviceService; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/DeviceService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/DeviceService.java index a91ba835c..ca2e3a1d4 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/DeviceService.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/DeviceService.java @@ -10,6 +10,7 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.device.DeviceDO; import cn.iocoder.yudao.module.iot.dal.dataobject.device.DeviceAttributeDO; import cn.iocoder.yudao.module.iot.dal.dataobject.devicecontactmodel.DeviceContactModelDO; import com.fasterxml.jackson.core.JsonProcessingException; +import org.eclipse.paho.client.mqttv3.MqttException; import javax.validation.Valid; import java.util.Collection; @@ -140,4 +141,8 @@ public interface DeviceService { List getDeviceAttributeList(Long deviceId); List deviceLedgerList(); + + void updateDeviceEnabled(@Valid DeviceUpdateEnabledReqVO updateEnabledReqVO) throws MqttException; + + DeviceDO getDeviceByMqttTopic(String topic); } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/DeviceServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/DeviceServiceImpl.java index ebcd4842d..51a6c209b 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/DeviceServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/DeviceServiceImpl.java @@ -19,6 +19,7 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.devicemodelattribute.DeviceMod import cn.iocoder.yudao.module.iot.dal.dataobject.devicemodelrules.DeviceModelRulesDO; import cn.iocoder.yudao.module.iot.dal.dataobject.deviceoperationrecord.DeviceOperationRecordDO; import cn.iocoder.yudao.module.iot.dal.dataobject.devicepointrules.DevicePointRulesDO; +import cn.iocoder.yudao.module.iot.dal.dataobject.gateway.GatewayDO; import cn.iocoder.yudao.module.iot.dal.dataobject.mqttdatarecord.MqttDataRecordDO; import cn.iocoder.yudao.module.iot.dal.dataobject.devicecontactmodel.DeviceContactModelDO; import cn.iocoder.yudao.module.iot.dal.mysql.device.DeviceMapper; @@ -29,12 +30,16 @@ import cn.iocoder.yudao.module.iot.dal.mysql.devicemodelattribute.DeviceModelAtt import cn.iocoder.yudao.module.iot.dal.mysql.devicemodelrules.DeviceModelRulesMapper; import cn.iocoder.yudao.module.iot.dal.mysql.deviceoperationrecord.DeviceOperationRecordMapper; import cn.iocoder.yudao.module.iot.dal.mysql.devicepointrules.DevicePointRulesMapper; +import cn.iocoder.yudao.module.iot.dal.mysql.gateway.GatewayMapper; import cn.iocoder.yudao.module.iot.dal.mysql.mqttdatarecord.MqttDataRecordMapper; import cn.iocoder.yudao.module.iot.dal.dataobject.device.DeviceAttributeDO; import cn.iocoder.yudao.module.iot.dal.mysql.device.DeviceAttributeMapper; +import cn.iocoder.yudao.module.iot.framework.mqtt.consumer.IMqttservice; +import cn.iocoder.yudao.module.iot.service.gateway.GatewayService; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -43,6 +48,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.eclipse.paho.client.mqttv3.MqttException; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Service; @@ -50,6 +56,7 @@ import org.springframework.transaction.annotation.Transactional; import org.springframework.validation.annotation.Validated; import javax.annotation.Resource; +import javax.validation.constraints.NotNull; import java.sql.Timestamp; import java.text.SimpleDateFormat; import java.time.LocalDateTime; @@ -108,6 +115,14 @@ public class DeviceServiceImpl implements DeviceService { @Resource private DeviceOperationRecordMapper deviceOperationRecordMapper; + @Resource + private IMqttservice mqttService; + +// @Resource +// private GatewayService gatewayService; + + @Resource + private GatewayMapper gatewayMapper; @@ -283,28 +298,59 @@ public class DeviceServiceImpl implements DeviceService { @Override public PageResult getDevicePage(DevicePageReqVO pageReqVO) { + // 1. 查询分页设备 PageResult deviceDOPageResult = deviceMapper.selectPage(pageReqVO); - PageResult deviceRespVOPageResult = BeanUtils.toBean(deviceDOPageResult, DeviceRespVO.class); + List deviceIds = deviceDOPageResult.getList().stream() + .map(DeviceDO::getId) + .collect(Collectors.toList()); + + // 2. 批量获取 TDengine 最新 ts + Map latestTsMap = tdengineService.selectLatestTsBatch(deviceIds); + // 3. 批量获取最新的 DeviceOperationRecord List ruleCodes = Arrays.stream(DeviceStatusEnum.values()) .map(DeviceStatusEnum::getCode) .collect(Collectors.toList()); + List operationRecords = deviceOperationRecordMapper.selectList( + Wrappers.lambdaQuery() + .in(DeviceOperationRecordDO::getDeviceId, deviceIds) + .in(DeviceOperationRecordDO::getRule, ruleCodes) + .orderByDesc(DeviceOperationRecordDO::getCreateTime) + ); + + // 按 deviceId 分组,取最新一条 + Map latestRecordMap = operationRecords.stream() + .collect(Collectors.toMap( + DeviceOperationRecordDO::getDeviceId, + r -> r, + (r1, r2) -> r1.getCreateTime().isAfter(r2.getCreateTime()) ? r1 : r2 + )); + + // 4. 转换分页 DTO + PageResult deviceRespVOPageResult = BeanUtils.toBean(deviceDOPageResult, DeviceRespVO.class); + for (DeviceRespVO deviceRespVO : deviceRespVOPageResult.getList()) { + Long deviceId = deviceRespVO.getId(); - DeviceOperationRecordDO deviceOperationRecordDO = deviceOperationRecordMapper.selectOne(Wrappers.lambdaQuery() - .eq(DeviceOperationRecordDO::getDeviceId, deviceRespVO.getId()) - .in(DeviceOperationRecordDO::getRule, ruleCodes) - .orderByDesc(DeviceOperationRecordDO::getCreateTime) - .last("LIMIT 1")); - if(deviceOperationRecordDO !=null){ - deviceRespVO.setOperatingStatus(DeviceStatusEnum.getByCode(deviceOperationRecordDO.getRule()).getName()); - deviceRespVO.setCollectionTime(deviceOperationRecordDO.getCreateTime()); - }else { + // 设置最新 ts + if(latestTsMap.get(deviceId) != null){ + deviceRespVO.setCollectionTime(latestTsMap.get(deviceId)); + + } + + // 设置运行状态 + DeviceOperationRecordDO record = latestRecordMap.get(deviceId); + if (record != null) { + deviceRespVO.setOperatingStatus(DeviceStatusEnum.getByCode(record.getRule()).getName()); + } else { deviceRespVO.setOperatingStatus(DeviceStatusEnum.OFFLINE.getName()); } + + } + return deviceRespVOPageResult; } @@ -502,7 +548,7 @@ public class DeviceServiceImpl implements DeviceService { DeviceDO deviceDO = validateConnectRequest(createReqVO); if (Objects.equals(createReqVO.getIsConnect(), DeviceConnectionStatusEnum.CONNECTED.getStatus())){ - boolean connected = OpcUtils.connect(deviceDO.getUrl(),deviceDO.getUsername(),deviceDO.getPassword(),10); + boolean connected = OpcUtils.connect(deviceDO.getId(),deviceDO.getUrl(),deviceDO.getUsername(),deviceDO.getPassword(),10); if (connected){ deviceDO.setStatus(String.valueOf(DeviceConnectionStatusEnum.CONNECTED.getStatus())); deviceMapper.updateById(deviceDO); @@ -519,7 +565,7 @@ public class DeviceServiceImpl implements DeviceService { throw exception(OPC_CONNECT_FAILURE_DOES_NOT_EXIST); } }else if(Objects.equals(createReqVO.getIsConnect(), DeviceConnectionStatusEnum.DISCONNECTED.getStatus())){ - boolean disconnect = OpcUtils.disconnect(); + boolean disconnect = OpcUtils.disconnect(deviceDO.getId()); if (disconnect){ //更新连接状态 deviceDO.setStatus(String.valueOf(DeviceConnectionStatusEnum.DISCONNECTED.getStatus())); @@ -1011,6 +1057,124 @@ public class DeviceServiceImpl implements DeviceService { .orderByDesc(DeviceContactModelDO::getId)); } + @Override + public void updateDeviceEnabled(DeviceUpdateEnabledReqVO updateEnabledReqVO) throws MqttException { + + // 1. 校验任务是否存在 + DeviceDO deviceDO = deviceMapper.selectById(updateEnabledReqVO.getId()); + if (deviceDO == null) { + throw exception(DEVICE_NOT_EXISTS); + } + if (StringUtils.isBlank(deviceDO.getTopic())) { + throw exception(DEVICE_MQTT_TOPIC_EXIST); + } + + //TODO 待优化 + if (!"MQTT".equals(deviceDO.getProtocol())) { + throw exception(DEVICE_MQTT_TOPIC_EXIST); + } + + // 2. 如果状态没有变化,直接返回 + if (Objects.equals(deviceDO.getIsEnable(), updateEnabledReqVO.getEnabled())) { + return; + } + + // 3. 执行状态更新操作 + executeEnableUpdate(deviceDO, updateEnabledReqVO.getEnabled()); + + + + } + + @Override + public DeviceDO getDeviceByMqttTopic(String topic) { + return deviceMapper.selectOne(Wrappers.lambdaQuery().eq(DeviceDO::getTopic,topic)); + } + + private void executeEnableUpdate(DeviceDO deviceDO, Boolean enabled) { + + try { + // 1. 更新设备启用状态 + deviceDO.setIsEnable(enabled); + deviceMapper.updateById(deviceDO); + + String topic = deviceDO.getTopic(); + + // 2. 启用设备 + if (enabled) { + + // 2.1 MQTT订阅 + try { + mqttService.subscribeTopic(topic); + log.info("MQTT订阅成功: {}", topic); + } catch (MqttException e) { + log.error("MQTT订阅失败: {}", topic, e); + } + + // 2.2 保存到iot_gateway表-用于重启后自动订阅 + GatewayDO gateway = gatewayMapper.selectOne( + new LambdaQueryWrapper() + .eq(GatewayDO::getTopic, topic) + .last("LIMIT 1") + ); + + if (gateway == null) { + + gateway = new GatewayDO(); + gateway.setGatewayName(deviceDO.getDeviceName()); + gateway.setGatewayCode(deviceDO.getDeviceCode()); + gateway.setTopic(topic); + gateway.setIsEnable(true); + gateway.setDeleted(false); + + gatewayMapper.insert(gateway); + + log.info("新增gateway订阅记录成功 topic={}", topic); + + } else { + + gateway.setIsEnable(true); + gateway.setUpdateTime(LocalDateTime.now()); + + gatewayMapper.updateById(gateway); + + log.info("更新gateway启用状态 topic={}", topic); + } + + } + + // 3. 禁用设备 + else { + + // 3.1 MQTT取消订阅 + try { + mqttService.unsubscribeTopic(topic); + log.info("MQTT取消订阅成功: {}", topic); + } catch (MqttException e) { + log.error("MQTT取消订阅失败: {}", topic, e); + } + + // 3.2 更新gateway状态为禁用 + gatewayMapper.update( + null, + new LambdaUpdateWrapper() + .eq(GatewayDO::getTopic, topic) + .set(GatewayDO::getIsEnable, false) + .set(GatewayDO::getUpdateTime, LocalDateTime.now()) + ); + + log.info("gateway订阅记录已禁用 topic={}", topic); + } + + } catch (Exception e) { + + log.error("更新设备状态失败 deviceCode={}", deviceDO.getDeviceCode(), e); + + } + } + + + @Override public List deviceLedgerList() { return deviceMapper.deviceLedgerList(); diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/TDengineService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/TDengineService.java index 588df44be..02893aa98 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/TDengineService.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/TDengineService.java @@ -668,20 +668,6 @@ public class TDengineService { - - - - - - - - - - - - - - private String decodeQueryData(byte[] blob) { if (blob == null || blob.length == 0) return "[]"; @@ -701,4 +687,42 @@ public class TDengineService { } + @DS("tdengine") + public LocalDateTime selectLatestTs(Long deviceId) { + String sql = "SELECT ts FROM besure.d_" + deviceId + " ORDER BY ts DESC LIMIT 1"; + + final LocalDateTime[] latestTs = {null}; + + jdbcTemplate.query(sql, rs -> { + if (rs.next()) { + latestTs[0] = rs.getTimestamp("ts").toLocalDateTime(); + } + }); + + return latestTs[0]; + } + + + @DS("tdengine") + public Map selectLatestTsBatch(List deviceIds) { + Map result = new HashMap<>(); + + for (Long deviceId : deviceIds) { + String tableName = "besure.d_" + deviceId; + String sql = "SELECT ts FROM " + tableName + " ORDER BY ts DESC LIMIT 1"; + + try { + LocalDateTime ts = jdbcTemplate.queryForObject(sql, (rs, rowNum) -> rs.getTimestamp("ts").toLocalDateTime()); + result.put(deviceId, ts); + } catch (Exception e) { + // 表不存在或者查询失败 + result.put(deviceId, null); + log.warn("设备: {}, 获取 ts 失败: {}", deviceId, e.getMessage()); + } + } + + return result; + } + + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/recipedeviceattribute/RecipeDeviceAttributeService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/recipedeviceattribute/RecipeDeviceAttributeService.java index b0250e667..386c00bef 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/recipedeviceattribute/RecipeDeviceAttributeService.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/recipedeviceattribute/RecipeDeviceAttributeService.java @@ -84,4 +84,5 @@ public interface RecipeDeviceAttributeService { */ PageResult selectPageWithAttribute(RecipeDeviceAttributePageReqVO reqVO); + List getList(RecipeDeviceAttributePageReqVO reqVO); } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/recipedeviceattribute/RecipeDeviceAttributeServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/recipedeviceattribute/RecipeDeviceAttributeServiceImpl.java index 227719a12..04ac8d904 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/recipedeviceattribute/RecipeDeviceAttributeServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/recipedeviceattribute/RecipeDeviceAttributeServiceImpl.java @@ -156,6 +156,13 @@ public class RecipeDeviceAttributeServiceImpl implements RecipeDeviceAttributeSe return recipeDeviceAttributeMapper.selectPageWithAttributeWrap(reqVO); } + @Override + public List getList(RecipeDeviceAttributePageReqVO reqVO) { + return recipeDeviceAttributeMapper.getList(reqVO); + + + } + // @Override // public List> operationAnalysisDetails(Long deviceId, String collectionStartTime, String collectionEndTime) { diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/recipeplandetail/RecipePlanDetailServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/recipeplandetail/RecipePlanDetailServiceImpl.java index 1aeec0594..0336dcdaa 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/recipeplandetail/RecipePlanDetailServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/recipeplandetail/RecipePlanDetailServiceImpl.java @@ -1,5 +1,6 @@ package cn.iocoder.yudao.module.iot.service.recipeplandetail; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; import org.springframework.stereotype.Service; import javax.annotation.Resource; import org.springframework.validation.annotation.Validated; @@ -29,12 +30,20 @@ public class RecipePlanDetailServiceImpl implements RecipePlanDetailService { @Override public Long createRecipePlanDetail(RecipePlanDetailSaveReqVO createReqVO) { + // 校验编码是否存在 + validateCodeOnly(createReqVO.getCode()); // 插入 RecipePlanDetailDO recipePlanDetail = BeanUtils.toBean(createReqVO, RecipePlanDetailDO.class); recipePlanDetailMapper.insert(recipePlanDetail); // 返回 return recipePlanDetail.getId(); } + private void validateCodeOnly(String code) { + if (recipePlanDetailMapper.exists(Wrappers.lambdaQuery() + .eq(RecipePlanDetailDO::getCode, code))) { + throw exception(RECIPE_PLAN_DETAIL_CODE_EXISTS); + } + } @Override public void updateRecipePlanDetail(RecipePlanDetailSaveReqVO updateReqVO) { diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/recipepointrecord/RecipePointRecordServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/recipepointrecord/RecipePointRecordServiceImpl.java index a07dbee01..7f19a6fdd 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/recipepointrecord/RecipePointRecordServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/recipepointrecord/RecipePointRecordServiceImpl.java @@ -37,7 +37,6 @@ public class RecipePointRecordServiceImpl implements RecipePointRecordService { recipePointRecordMapper.deleteByIds(recipePointRecordDOS); } - // 插入 RecipePointRecordDO recipePointRecord = BeanUtils.toBean(createReqVO, RecipePointRecordDO.class); recipePointRecordMapper.insert(recipePointRecord); diff --git a/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/controller/admin/task/TaskController.java b/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/controller/admin/task/TaskController.java index 3c2f8a333..06127cfc8 100644 --- a/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/controller/admin/task/TaskController.java +++ b/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/controller/admin/task/TaskController.java @@ -186,7 +186,7 @@ public class TaskController { @Operation(summary = "获得生产任务单汇总明细") @Parameter(name = "taskId", description = "task ID") @PreAuthorize("@ss.hasPermission('mes:task:query')") - public CommonResult> getTaskDetailSummary(@RequestParam("taskId") Long taskId) { + public CommonResult> getTaskDetailSummary(@RequestParam("taskId") Long taskId) { return success(taskService.getTaskProductSummaryList(taskId)); } } \ No newline at end of file diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/service/dict/DictDataService.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/service/dict/DictDataService.java index fc2498f1b..1c9df16c8 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/service/dict/DictDataService.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/service/dict/DictDataService.java @@ -107,4 +107,14 @@ public interface DictDataService { */ List getDictDataListByDictType(String dictType); + + /** + * 根据label获取字典数据 + * + * @param label 字典类型 + * @return 字典数据 + */ + DictDataDO getDictDataByDictLabel(String label); + + } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/service/dict/DictDataServiceImpl.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/service/dict/DictDataServiceImpl.java index e9f215de8..77ab82114 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/service/dict/DictDataServiceImpl.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/service/dict/DictDataServiceImpl.java @@ -10,6 +10,7 @@ import cn.iocoder.yudao.module.system.controller.admin.dict.vo.data.DictDataSave import cn.iocoder.yudao.module.system.dal.dataobject.dict.DictDataDO; import cn.iocoder.yudao.module.system.dal.dataobject.dict.DictTypeDO; import cn.iocoder.yudao.module.system.dal.mysql.dict.DictDataMapper; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.google.common.annotations.VisibleForTesting; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -176,4 +177,13 @@ public class DictDataServiceImpl implements DictDataService { return list; } + @Override + public DictDataDO getDictDataByDictLabel(String label) { + + return dictDataMapper.selectOne(Wrappers.lambdaQuery() + .eq(DictDataDO::getLabel,label) + .orderByDesc(DictDataDO::getCreateTime) + .last("LIMIT 1")); + } + } diff --git a/yudao-server/src/main/resources/application-dev.yaml b/yudao-server/src/main/resources/application-dev.yaml index 5ce1e071f..9cbb68f46 100644 --- a/yudao-server/src/main/resources/application-dev.yaml +++ b/yudao-server/src/main/resources/application-dev.yaml @@ -262,3 +262,15 @@ justauth: prefix: 'social_auth_state:' # 缓存前缀,目前只对 Redis 缓存生效,默认 JUSTAUTH::STATE:: timeout: 24h # 超时时长,目前只对 Redis 缓存生效,默认 3 分钟 + + +emqx: + is-enable: true # 是否启用 MQTT + broker: tcp://192.168.5.119:1883 # EMQX 服务器地址(TCP 协议) + client-id: mqtt-client-besure_server-dev # 客户端ID + user-name: ngsk # 用户名 + password: ngskcloud0809 # 密码 + clean-session: true # 是否清空 session + reconnect: true # 是否自动断线重连 + timeout: 30 # 连接超时时间(秒) + keep-alive: 60 # 心跳间隔(秒) \ No newline at end of file diff --git a/yudao-server/src/main/resources/application-prod.yaml b/yudao-server/src/main/resources/application-prod.yaml index 4d1e81fcd..43dbb1956 100644 --- a/yudao-server/src/main/resources/application-prod.yaml +++ b/yudao-server/src/main/resources/application-prod.yaml @@ -262,3 +262,14 @@ justauth: prefix: 'social_auth_state:' # 缓存前缀,目前只对 Redis 缓存生效,默认 JUSTAUTH::STATE:: timeout: 24h # 超时时长,目前只对 Redis 缓存生效,默认 3 分钟 + +emqx: + is-enable: true # 是否启用 MQTT + broker: tcp://192.168.21.2:1883 # EMQX 服务器地址(TCP 协议) + client-id: mqtt-client-besure-server-prod # 客户端ID + user-name: admin # 用户名 + password: admin # 密码 + clean-session: true # 是否清空 session + reconnect: true # 是否自动断线重连 + timeout: 30 # 连接超时时间(秒) + keep-alive: 60 # 心跳间隔(秒) \ No newline at end of file