fix:修改数据采集Td数据库结构存储
parent
ebb6014659
commit
e510a3b976
@ -1,136 +0,0 @@
|
|||||||
package cn.iocoder.yudao.module.iot.controller.admin.device;
|
|
||||||
|
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
|
|
||||||
import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig;
|
|
||||||
import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
|
|
||||||
import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
|
|
||||||
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.*;
|
|
||||||
import org.eclipse.milo.opcua.stack.client.DiscoveryClient;
|
|
||||||
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
|
|
||||||
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
|
|
||||||
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
|
|
||||||
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
|
|
||||||
import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
|
|
||||||
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
|
|
||||||
import org.eclipse.milo.opcua.stack.core.types.structured.*;
|
|
||||||
import org.springframework.stereotype.Service;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
|
|
||||||
|
|
||||||
@Slf4j
|
|
||||||
@Service
|
|
||||||
public class OpcUaSubscriptionService {
|
|
||||||
|
|
||||||
private static final String URL = "opc.tcp://192.168.21.5:4840";
|
|
||||||
private static final String USERNAME = "bst";
|
|
||||||
private static final String PASSWORD = "Bst123456";
|
|
||||||
|
|
||||||
private OpcUaClient client;
|
|
||||||
private UaSubscription subscription;
|
|
||||||
|
|
||||||
// 保存已订阅的节点
|
|
||||||
private final ConcurrentHashMap<String, UaMonitoredItem> monitoredItems = new ConcurrentHashMap<>();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 订阅 OPC UA 节点变化
|
|
||||||
*/
|
|
||||||
public void subscribeNode(String nodeAddress, int samplingMillis) throws Exception {
|
|
||||||
|
|
||||||
if (client == null) {
|
|
||||||
connect();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (subscription == null) {
|
|
||||||
subscription = client.getSubscriptionManager()
|
|
||||||
.createSubscription((double) samplingMillis)
|
|
||||||
.get();
|
|
||||||
log.info("创建订阅,采样间隔: {} ms", samplingMillis);
|
|
||||||
}
|
|
||||||
|
|
||||||
NodeId nodeId = NodeId.parse(nodeAddress);
|
|
||||||
|
|
||||||
ReadValueId readValueId = new ReadValueId(
|
|
||||||
nodeId,
|
|
||||||
org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger.valueOf(13), // Value 属性
|
|
||||||
null,
|
|
||||||
null
|
|
||||||
);
|
|
||||||
|
|
||||||
MonitoringParameters parameters = new MonitoringParameters(
|
|
||||||
uint(nodeId.hashCode()),
|
|
||||||
(double) samplingMillis,
|
|
||||||
null,
|
|
||||||
uint(10),
|
|
||||||
true
|
|
||||||
);
|
|
||||||
|
|
||||||
MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(
|
|
||||||
readValueId,
|
|
||||||
MonitoringMode.Reporting,
|
|
||||||
parameters
|
|
||||||
);
|
|
||||||
List<MonitoredItemCreateRequest> requests = new ArrayList<>();
|
|
||||||
requests.add(request);
|
|
||||||
List<UaMonitoredItem> items = subscription.createMonitoredItems(
|
|
||||||
TimestampsToReturn.Both,
|
|
||||||
requests
|
|
||||||
).get();
|
|
||||||
|
|
||||||
UaMonitoredItem item = items.get(0);
|
|
||||||
item.setValueConsumer((monitoredItem, value) -> {
|
|
||||||
Variant variant = value.getValue();
|
|
||||||
Object v = variant != null ? variant.getValue() : null;
|
|
||||||
log.info("节点 {} 值变化: {}", nodeAddress, v);
|
|
||||||
});
|
|
||||||
|
|
||||||
monitoredItems.put(nodeAddress, item);
|
|
||||||
|
|
||||||
log.info("成功订阅节点: {}", nodeAddress);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void connect() throws Exception {
|
|
||||||
IdentityProvider identity = new UsernameProvider(USERNAME, PASSWORD);
|
|
||||||
|
|
||||||
// 1. 获取端点
|
|
||||||
List<EndpointDescription> endpoints = DiscoveryClient.getEndpoints(URL)
|
|
||||||
.get(5, TimeUnit.SECONDS);
|
|
||||||
|
|
||||||
EndpointDescription endpoint = endpoints.stream()
|
|
||||||
.filter(e -> SecurityPolicy.None.getUri().equals(e.getSecurityPolicyUri()))
|
|
||||||
.findFirst()
|
|
||||||
.orElseThrow(() -> new RuntimeException("未找到 SecurityPolicy=None 的端点"));
|
|
||||||
|
|
||||||
// 2. 构建客户端配置
|
|
||||||
OpcUaClientConfig config = OpcUaClientConfig.builder()
|
|
||||||
.setEndpoint(endpoint)
|
|
||||||
.setIdentityProvider(identity)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
// 3. 创建客户端并连接
|
|
||||||
client = OpcUaClient.create(config);
|
|
||||||
client.connect().get();
|
|
||||||
|
|
||||||
log.info("成功连接 OPC UA 服务端: {}", URL);
|
|
||||||
}
|
|
||||||
|
|
||||||
// /**
|
|
||||||
// * 断开连接
|
|
||||||
// */
|
|
||||||
// public void disconnect() {
|
|
||||||
// if (client != null) {
|
|
||||||
// try {
|
|
||||||
// client.disconnect().get();
|
|
||||||
// log.info("断开 OPC UA 连接");
|
|
||||||
// } catch (Exception e) {
|
|
||||||
// log.error("断开 OPC UA 连接失败", e);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
@ -0,0 +1,136 @@
|
|||||||
|
package cn.iocoder.yudao.module.iot.controller.admin.device.enums;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Java 基本类型对应 TDengine 列类型枚举
|
||||||
|
*/
|
||||||
|
public enum JavaToTdengineTypeEnum {
|
||||||
|
|
||||||
|
BYTE("byte", "TINYINT") {
|
||||||
|
@Override
|
||||||
|
public Object convertValue(String value) {
|
||||||
|
return Byte.parseByte(value);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
SHORT("short", "SMALLINT") {
|
||||||
|
@Override
|
||||||
|
public Object convertValue(String value) {
|
||||||
|
return Short.parseShort(value);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
INT("int", "INT") {
|
||||||
|
@Override
|
||||||
|
public Object convertValue(String value) {
|
||||||
|
return Integer.parseInt(value);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
LONG("long", "BIGINT") {
|
||||||
|
@Override
|
||||||
|
public Object convertValue(String value) {
|
||||||
|
return Long.parseLong(value);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
FLOAT("float", "FLOAT") {
|
||||||
|
@Override
|
||||||
|
public Object convertValue(String value) {
|
||||||
|
return Float.parseFloat(value);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
DOUBLE("double", "DOUBLE") {
|
||||||
|
@Override
|
||||||
|
public Object convertValue(String value) {
|
||||||
|
return Double.parseDouble(value);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
BOOLEAN("boolean", "BOOL") {
|
||||||
|
@Override
|
||||||
|
public Object convertValue(String value) {
|
||||||
|
return Boolean.parseBoolean(value);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
CHAR("char", "NCHAR(1)") {
|
||||||
|
@Override
|
||||||
|
public Object convertValue(String value) {
|
||||||
|
return value.charAt(0);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
private final String javaType;
|
||||||
|
private final String tdType;
|
||||||
|
|
||||||
|
JavaToTdengineTypeEnum(String javaType, String tdType) {
|
||||||
|
this.javaType = javaType;
|
||||||
|
this.tdType = tdType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getJavaType() {
|
||||||
|
return javaType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getTdType() {
|
||||||
|
return tdType;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 每个枚举实现自己的值转换
|
||||||
|
*/
|
||||||
|
public abstract Object convertValue(String value);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 根据 javaType 获取枚举
|
||||||
|
*/
|
||||||
|
public static JavaToTdengineTypeEnum fromJavaType(String javaType) {
|
||||||
|
|
||||||
|
if (javaType == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (JavaToTdengineTypeEnum e : values()) {
|
||||||
|
|
||||||
|
if (e.javaType.equalsIgnoreCase(javaType)) {
|
||||||
|
return e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 根据 javaType 获取 TDengine 类型
|
||||||
|
*/
|
||||||
|
public static String getTdTypeByJavaType(String javaType) {
|
||||||
|
|
||||||
|
JavaToTdengineTypeEnum e = fromJavaType(javaType);
|
||||||
|
|
||||||
|
return e != null ? e.getTdType() : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 根据 javaType 转换 value
|
||||||
|
*/
|
||||||
|
public static Object convertValue(String javaType, String value) {
|
||||||
|
|
||||||
|
try {
|
||||||
|
|
||||||
|
JavaToTdengineTypeEnum e = fromJavaType(javaType);
|
||||||
|
|
||||||
|
if (e == null) {
|
||||||
|
return Double.parseDouble(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
return e.convertValue(value);
|
||||||
|
|
||||||
|
} catch (Exception ex) {
|
||||||
|
|
||||||
|
throw new RuntimeException(
|
||||||
|
"数据类型转换失败: javaType=" + javaType + ", value=" + value, ex
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -1,14 +0,0 @@
|
|||||||
package cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.opcuv;
|
|
||||||
|
|
||||||
import cn.iocoder.yudao.framework.common.util.opc.OpcUtils;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
import javax.annotation.PreDestroy;
|
|
||||||
|
|
||||||
@Component
|
|
||||||
public class OpcShutdown {
|
|
||||||
@PreDestroy
|
|
||||||
public void shutdown() {
|
|
||||||
OpcUtils.disconnectAll();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Loading…
Reference in New Issue