|
|
|
|
@ -2,16 +2,27 @@
|
|
|
|
|
package cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.coretask;
|
|
|
|
|
|
|
|
|
|
import cn.iocoder.yudao.framework.common.util.opc.OpcUtils;
|
|
|
|
|
import cn.iocoder.yudao.module.iot.controller.admin.device.enums.DeviceBasicStatusEnum;
|
|
|
|
|
import cn.iocoder.yudao.module.iot.controller.admin.device.enums.DeviceStatusEnum;
|
|
|
|
|
import cn.iocoder.yudao.module.iot.controller.admin.device.enums.TaskTypeEnum;
|
|
|
|
|
import cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.core.Task;
|
|
|
|
|
import cn.iocoder.yudao.module.iot.controller.admin.devicemodelrules.vo.PointRulesRespVO;
|
|
|
|
|
import cn.iocoder.yudao.module.iot.dal.dataobject.device.DeviceDO;
|
|
|
|
|
import cn.iocoder.yudao.module.iot.dal.dataobject.devicecontactmodel.DeviceContactModelDO;
|
|
|
|
|
import cn.iocoder.yudao.module.iot.dal.dataobject.deviceoperationrecord.DeviceOperationRecordDO;
|
|
|
|
|
import cn.iocoder.yudao.module.iot.dal.dataobject.devicepointrules.DevicePointRulesDO;
|
|
|
|
|
import cn.iocoder.yudao.module.iot.dal.mysql.device.DeviceMapper;
|
|
|
|
|
import cn.iocoder.yudao.module.iot.dal.mysql.devicecontactmodel.DeviceContactModelMapper;
|
|
|
|
|
import cn.iocoder.yudao.module.iot.dal.mysql.deviceoperationrecord.DeviceOperationRecordMapper;
|
|
|
|
|
import cn.iocoder.yudao.module.iot.dal.mysql.devicepointrules.DevicePointRulesMapper;
|
|
|
|
|
import cn.iocoder.yudao.module.iot.service.device.TDengineService;
|
|
|
|
|
import com.alibaba.fastjson.JSON;
|
|
|
|
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|
|
|
|
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
|
|
|
|
|
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
|
|
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
|
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
@ -21,6 +32,11 @@ import java.text.SimpleDateFormat;
|
|
|
|
|
import java.util.ArrayList;
|
|
|
|
|
import java.util.Date;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
import java.util.Map;
|
|
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
import java.util.concurrent.Executors;
|
|
|
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
|
|
|
|
@Component
|
|
|
|
|
@Slf4j
|
|
|
|
|
@ -36,13 +52,51 @@ public class DeviceTask implements Task {
|
|
|
|
|
@Resource
|
|
|
|
|
private TDengineService tDengineService;
|
|
|
|
|
|
|
|
|
|
@Resource
|
|
|
|
|
private DevicePointRulesMapper devicePointRulesMapper;
|
|
|
|
|
|
|
|
|
|
@Resource
|
|
|
|
|
private DeviceOperationRecordMapper deviceOperationRecordMapper;
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public String getTaskType() {
|
|
|
|
|
return TaskTypeEnum.DEVICE.getCode();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// static {
|
|
|
|
|
// // 定期清理过期连接
|
|
|
|
|
// ScheduledExecutorService cleanupExecutor = Executors.newSingleThreadScheduledExecutor();
|
|
|
|
|
// cleanupExecutor.scheduleAtFixedRate(DeviceTask::cleanupExpiredConnections,
|
|
|
|
|
// CLEANUP_INTERVAL, CLEANUP_INTERVAL, TimeUnit.MILLISECONDS);
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
// // 会话信息
|
|
|
|
|
// private static class SessionInfo {
|
|
|
|
|
// long lastUsedTime;
|
|
|
|
|
// String deviceUrl;
|
|
|
|
|
// String username;
|
|
|
|
|
//
|
|
|
|
|
// SessionInfo(String deviceUrl, String username) {
|
|
|
|
|
// this.deviceUrl = deviceUrl;
|
|
|
|
|
// this.username = username;
|
|
|
|
|
// this.lastUsedTime = System.currentTimeMillis();
|
|
|
|
|
// }
|
|
|
|
|
//
|
|
|
|
|
// boolean isExpired() {
|
|
|
|
|
// return System.currentTimeMillis() - lastUsedTime > CONNECTION_TIMEOUT;
|
|
|
|
|
// }
|
|
|
|
|
//
|
|
|
|
|
// void updateLastUsed() {
|
|
|
|
|
// this.lastUsedTime = System.currentTimeMillis();
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void execute(Long taskId, String taskParam) {
|
|
|
|
|
Long deviceId = taskId - 1000000L;
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
|
|
|
|
String currentTime = sdf.format(new Date());
|
|
|
|
|
@ -62,6 +116,9 @@ public class DeviceTask implements Task {
|
|
|
|
|
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
logger.error("执行设备任务异常,任务ID: {}", taskId, e);
|
|
|
|
|
} finally {
|
|
|
|
|
//确保出问题不会打满opcv服务器连接数
|
|
|
|
|
OpcUtils.disconnect();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -69,128 +126,560 @@ public class DeviceTask implements Task {
|
|
|
|
|
* 具体的设备执行逻辑
|
|
|
|
|
*/
|
|
|
|
|
private void executeDeviceLogic(Long sourceDeviceId, String param) {
|
|
|
|
|
logger.info("执行设备 {} 的具体逻辑,参数: {}", sourceDeviceId, param);
|
|
|
|
|
logger.info("执行设备逻辑,源设备ID: {},参数: {}", sourceDeviceId, param);
|
|
|
|
|
|
|
|
|
|
// 1. 计算实际设备ID
|
|
|
|
|
Long deviceId = sourceDeviceId - 1000000L;
|
|
|
|
|
logger.info("处理后id:{} ", deviceId );
|
|
|
|
|
logger.info("处理后设备ID: {}", deviceId);
|
|
|
|
|
|
|
|
|
|
// 1. 参数校验
|
|
|
|
|
if (deviceId == null){
|
|
|
|
|
logger.error("设备ID不能为空");
|
|
|
|
|
if (deviceId == null) {
|
|
|
|
|
throw new RuntimeException("设备ID不能为空");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 2. 查询设备信息
|
|
|
|
|
DeviceDO deviceDO = deviceMapper.selectById(deviceId);
|
|
|
|
|
if (deviceDO == null) {
|
|
|
|
|
logger.error("设备不存在,设备ID: {}", deviceId);
|
|
|
|
|
throw new RuntimeException("设备不存在,设备ID: " + deviceId);
|
|
|
|
|
// 2. 获取设备信息
|
|
|
|
|
DeviceDO device = getDeviceInfo(deviceId);
|
|
|
|
|
|
|
|
|
|
// 3. 连接OPC服务器
|
|
|
|
|
connectOpcServer(device);
|
|
|
|
|
|
|
|
|
|
// 4. 处理数据读取和入库
|
|
|
|
|
processDeviceData(deviceId, device);
|
|
|
|
|
|
|
|
|
|
// 5. 断开连接
|
|
|
|
|
OpcUtils.disconnect();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 具体的设备执行逻辑
|
|
|
|
|
*/
|
|
|
|
|
// private void executeDeviceLogic(Long sourceDeviceId, String param) {
|
|
|
|
|
// logger.info("执行设备逻辑,源设备ID: {},参数: {}", sourceDeviceId, param);
|
|
|
|
|
//
|
|
|
|
|
// // 1. 计算实际设备ID
|
|
|
|
|
// Long deviceId = sourceDeviceId - 1000000L;
|
|
|
|
|
// logger.info("处理后设备ID: {}", deviceId);
|
|
|
|
|
//
|
|
|
|
|
// if (deviceId == null) {
|
|
|
|
|
// throw new RuntimeException("设备ID不能为空");
|
|
|
|
|
// }
|
|
|
|
|
//
|
|
|
|
|
// // 2. 获取设备信息
|
|
|
|
|
// DeviceDO device = getDeviceInfo(deviceId);
|
|
|
|
|
//
|
|
|
|
|
// // 3. 连接OPC服务器
|
|
|
|
|
// connectOpcServer(device);
|
|
|
|
|
//
|
|
|
|
|
// // 4. 处理数据读取和入库
|
|
|
|
|
// processDeviceData(deviceId, device);
|
|
|
|
|
//
|
|
|
|
|
// // 5. 断开连接
|
|
|
|
|
// OpcUtils.disconnect();
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 获取设备信息
|
|
|
|
|
*/
|
|
|
|
|
private DeviceDO getDeviceInfo(Long deviceId) {
|
|
|
|
|
DeviceDO device = deviceMapper.selectById(deviceId);
|
|
|
|
|
|
|
|
|
|
if (device == null) {
|
|
|
|
|
throw new RuntimeException("设备不存在,ID: " + deviceId);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (deviceDO.getUrl() == null || deviceDO.getUrl().trim().isEmpty()) {
|
|
|
|
|
logger.error("设备URL不能为空,设备ID: {}", deviceId);
|
|
|
|
|
throw new RuntimeException("设备URL不能为空");
|
|
|
|
|
if (StringUtils.isBlank(device.getUrl())) {
|
|
|
|
|
throw new RuntimeException("设备URL不能为空,ID: " + deviceId);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 3. 连接OPC服务器
|
|
|
|
|
String username = deviceDO.getUsername() != null ? deviceDO.getUsername() : "";
|
|
|
|
|
String password = deviceDO.getPassword() != null ? deviceDO.getPassword() : "";
|
|
|
|
|
return device;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
boolean connected = OpcUtils.connect(deviceDO.getUrl(), username, password, 10);
|
|
|
|
|
/**
|
|
|
|
|
* 连接OPC服务器
|
|
|
|
|
*/
|
|
|
|
|
private void connectOpcServer(DeviceDO device) {
|
|
|
|
|
String username = StringUtils.defaultString(device.getUsername());
|
|
|
|
|
String password = StringUtils.defaultString(device.getPassword());
|
|
|
|
|
|
|
|
|
|
boolean connected = OpcUtils.connect(device.getUrl(), username, password, 10);
|
|
|
|
|
if (!connected) {
|
|
|
|
|
logger.error("连接OPC服务器失败,设备ID: {},URL: {}", deviceId, deviceDO.getUrl());
|
|
|
|
|
throw new RuntimeException("连接OPC服务器失败");
|
|
|
|
|
throw new RuntimeException("连接OPC服务器失败,URL: " + device.getUrl());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 4. 查询设备点位配置
|
|
|
|
|
LambdaQueryWrapper<DeviceContactModelDO> queryWrapper = new LambdaQueryWrapper<>();
|
|
|
|
|
queryWrapper.eq(DeviceContactModelDO::getDeviceId, deviceId);
|
|
|
|
|
|
|
|
|
|
List<DeviceContactModelDO> deviceContactModelDOS = deviceContactModelMapper.selectList(queryWrapper);
|
|
|
|
|
|
|
|
|
|
// 5. 判断是否有点位数据
|
|
|
|
|
if (deviceContactModelDOS == null || deviceContactModelDOS.isEmpty()) {
|
|
|
|
|
logger.warn("设备 {} 没有配置数据点位,跳过数据读取", deviceId);
|
|
|
|
|
/**
|
|
|
|
|
* 处理设备数据
|
|
|
|
|
*/
|
|
|
|
|
private void processDeviceData(Long deviceId, DeviceDO device) {
|
|
|
|
|
// 1. 查询点位配置
|
|
|
|
|
List<DeviceContactModelDO> points = getDevicePoints(deviceId);
|
|
|
|
|
if (CollectionUtils.isEmpty(points)) {
|
|
|
|
|
logger.warn("设备 {} 未配置点位", deviceId);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logger.info("设备 {} 共有 {} 个点位需要读取", deviceId, deviceContactModelDOS.size());
|
|
|
|
|
logger.info("设备 {} 需要读取 {} 个点位", deviceId, points.size());
|
|
|
|
|
|
|
|
|
|
// 6. 读取OPC数据
|
|
|
|
|
// 2. 读取并处理数据
|
|
|
|
|
int successCount = 0;
|
|
|
|
|
List<DeviceContactModelDO> validDataList = new ArrayList<>();
|
|
|
|
|
|
|
|
|
|
for (DeviceContactModelDO deviceContactModelDO : deviceContactModelDOS) {
|
|
|
|
|
try {
|
|
|
|
|
// 判断点位地址是否有效
|
|
|
|
|
String address = deviceContactModelDO.getAddress();
|
|
|
|
|
if (address == null || address.trim().isEmpty()) {
|
|
|
|
|
logger.warn("点位ID {} 的地址为空,跳过", deviceContactModelDO.getId());
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
for (DeviceContactModelDO point : points) {
|
|
|
|
|
processSinglePoint(point, validDataList,device);
|
|
|
|
|
if (point.getAddressValue() != null) {
|
|
|
|
|
successCount++;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 3. 入库处理
|
|
|
|
|
if (!validDataList.isEmpty()) {
|
|
|
|
|
saveToDatabase(deviceId, validDataList, successCount);
|
|
|
|
|
} else {
|
|
|
|
|
logger.warn("设备 {} 未读取到有效数据", deviceId);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 获取设备点位
|
|
|
|
|
*/
|
|
|
|
|
private List<DeviceContactModelDO> getDevicePoints(Long deviceId) {
|
|
|
|
|
LambdaQueryWrapper<DeviceContactModelDO> query = new LambdaQueryWrapper<>();
|
|
|
|
|
query.eq(DeviceContactModelDO::getDeviceId, deviceId);
|
|
|
|
|
return deviceContactModelMapper.selectList(query);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 处理单个点位
|
|
|
|
|
*/
|
|
|
|
|
private void processSinglePoint(DeviceContactModelDO point, List<DeviceContactModelDO> validDataList, DeviceDO device) {
|
|
|
|
|
try {
|
|
|
|
|
String address = StringUtils.trimToEmpty(point.getAddress());
|
|
|
|
|
if (address.isEmpty()) {
|
|
|
|
|
logger.warn("点位ID {} 地址为空", point.getId());
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Object value = OpcUtils.readValue(address);
|
|
|
|
|
// if (value == null) {
|
|
|
|
|
logger.warn("读取点位 {} ,地址: {}", point.getId(), address);
|
|
|
|
|
// } else {
|
|
|
|
|
String processedValue = processOpcValue(value);
|
|
|
|
|
|
|
|
|
|
//判断规则
|
|
|
|
|
judgmentRules(processedValue,point.getAttributeCode(),device,point.getId());
|
|
|
|
|
point.setAddressValue(processedValue.isEmpty() ? null : processedValue);
|
|
|
|
|
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
validDataList.add(point);
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
logger.error("处理点位 {} 异常,地址: {}",
|
|
|
|
|
point.getId(), point.getAddress(), e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 处理OPC值
|
|
|
|
|
*/
|
|
|
|
|
private String processOpcValue(Object value) {
|
|
|
|
|
if (value == null) {
|
|
|
|
|
return "";
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (value instanceof String) {
|
|
|
|
|
return ((String) value).trim();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return value.toString();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 读取OPC值
|
|
|
|
|
Object addressValue = OpcUtils.readValue(address);
|
|
|
|
|
|
|
|
|
|
if (addressValue == null) {
|
|
|
|
|
logger.warn("读取点位 {} 的值返回null,地址: {}",
|
|
|
|
|
deviceContactModelDO.getId(), address);
|
|
|
|
|
} else {
|
|
|
|
|
// 值验证
|
|
|
|
|
if (addressValue instanceof String) {
|
|
|
|
|
String strValue = (String) addressValue;
|
|
|
|
|
if (strValue.trim().isEmpty()) {
|
|
|
|
|
logger.warn("读取点位 {} 的值为空字符串", deviceContactModelDO.getId());
|
|
|
|
|
deviceContactModelDO.setAddressValue("");
|
|
|
|
|
} else {
|
|
|
|
|
deviceContactModelDO.setAddressValue(addressValue);
|
|
|
|
|
successCount++;
|
|
|
|
|
}
|
|
|
|
|
/**
|
|
|
|
|
* 保存到数据库
|
|
|
|
|
*/
|
|
|
|
|
private void saveToDatabase(Long deviceId, List<DeviceContactModelDO> dataList, int successCount) {
|
|
|
|
|
try {
|
|
|
|
|
String json = JSON.toJSONString(dataList);
|
|
|
|
|
boolean inserted = tDengineService.insertDeviceData(deviceId, json);
|
|
|
|
|
|
|
|
|
|
if (inserted) {
|
|
|
|
|
logger.info("设备 {} 数据入库成功,总数: {},有效: {}",
|
|
|
|
|
deviceId, dataList.size(), successCount);
|
|
|
|
|
} else {
|
|
|
|
|
logger.error("设备 {} 数据入库失败", deviceId);
|
|
|
|
|
}
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
logger.error("设备 {} 数据入库异常", deviceId, e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void judgmentRules(String processedValue, String attributeCode, DeviceDO device,Long modelId) {
|
|
|
|
|
if (StringUtils.isBlank(processedValue)) {
|
|
|
|
|
logger.warn("待判断的值为空,编码attributeCode: {}, deviceId: {}", attributeCode, device.getId());
|
|
|
|
|
// return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 1. 查询设备规则
|
|
|
|
|
List<DevicePointRulesDO> devicePointRulesDOList = devicePointRulesMapper.selectList(
|
|
|
|
|
Wrappers.<DevicePointRulesDO>lambdaQuery()
|
|
|
|
|
.eq(DevicePointRulesDO::getDeviceId, device.getId()));
|
|
|
|
|
|
|
|
|
|
if (CollectionUtils.isEmpty(devicePointRulesDOList)) {
|
|
|
|
|
logger.debug("设备 {} 未配置规则", device.getId());
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 2. 遍历规则
|
|
|
|
|
for (DevicePointRulesDO devicePointRulesDO : devicePointRulesDOList) {
|
|
|
|
|
if (StringUtils.isBlank(devicePointRulesDO.getFieldRule())) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 3. 解析规则列表
|
|
|
|
|
List<PointRulesRespVO> pointRulesVOList = JSON.parseArray(
|
|
|
|
|
devicePointRulesDO.getFieldRule(), PointRulesRespVO.class);
|
|
|
|
|
|
|
|
|
|
if (CollectionUtils.isEmpty(pointRulesVOList)) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 4. 找到对应modelId的规则并进行判断
|
|
|
|
|
for (PointRulesRespVO pointRulesRespVO : pointRulesVOList) {
|
|
|
|
|
if (pointRulesRespVO.getCode() != null &&
|
|
|
|
|
pointRulesRespVO.getCode().equals(attributeCode)) {
|
|
|
|
|
|
|
|
|
|
boolean matched = matchRule(processedValue, pointRulesRespVO);
|
|
|
|
|
|
|
|
|
|
if (matched) {
|
|
|
|
|
logger.info("规则匹配成功: modelId={}, value={}, rule={}",
|
|
|
|
|
attributeCode, processedValue,
|
|
|
|
|
JSON.toJSONString(pointRulesRespVO));
|
|
|
|
|
|
|
|
|
|
// 执行匹配成功后的逻辑
|
|
|
|
|
handleMatchedSuccessRule(devicePointRulesDO,pointRulesRespVO, processedValue, device, attributeCode,modelId);
|
|
|
|
|
break;
|
|
|
|
|
} else {
|
|
|
|
|
deviceContactModelDO.setAddressValue(addressValue);
|
|
|
|
|
successCount++;
|
|
|
|
|
logger.debug("规则不匹配: modelId={}, value={}, rule={}",
|
|
|
|
|
attributeCode, processedValue,
|
|
|
|
|
JSON.toJSONString(pointRulesRespVO));
|
|
|
|
|
// 执行匹配失败后的逻辑
|
|
|
|
|
handleMatchedFailureRule(devicePointRulesDO,pointRulesRespVO, processedValue, device, attributeCode);
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
validDataList.add(deviceContactModelDO);
|
|
|
|
|
private void handleMatchedFailureRule(DevicePointRulesDO devicePointRulesDO,PointRulesRespVO pointRulesRespVO, String processedValue, DeviceDO device, String attributeCode) {
|
|
|
|
|
//TODO 离线待优化
|
|
|
|
|
// if (devicePointRulesDO.getIdentifier().equals(DeviceBasicStatusEnum.RUNNING.getCode())){
|
|
|
|
|
// DeviceOperationRecordDO record = new DeviceOperationRecordDO();
|
|
|
|
|
// record.setDeviceId(device.getId());
|
|
|
|
|
// record.setModelId(modelId);
|
|
|
|
|
// record.setRule(pointRulesRespVO.getRule());
|
|
|
|
|
// record.setAddressValue(processedValue);
|
|
|
|
|
// record.setRecordType(getRecordType(devicePointRulesDO));
|
|
|
|
|
// record.setRuleId(devicePointRulesDO.getId());
|
|
|
|
|
//
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
logger.error("读取点位 {} 异常,地址: {}",
|
|
|
|
|
deviceContactModelDO.getId(),
|
|
|
|
|
deviceContactModelDO.getAddress(), e);
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void handleMatchedSuccessRule(DevicePointRulesDO devicePointRulesDO,
|
|
|
|
|
PointRulesRespVO pointRulesRespVO,
|
|
|
|
|
String processedValue,
|
|
|
|
|
DeviceDO device,
|
|
|
|
|
String attributeCode,
|
|
|
|
|
Long modelId) {
|
|
|
|
|
|
|
|
|
|
DeviceOperationRecordDO record = new DeviceOperationRecordDO();
|
|
|
|
|
record.setDeviceId(device.getId());
|
|
|
|
|
record.setModelId(modelId);
|
|
|
|
|
record.setRule(pointRulesRespVO.getRule());
|
|
|
|
|
record.setAddressValue(processedValue);
|
|
|
|
|
record.setRecordType(getRecordType(devicePointRulesDO));
|
|
|
|
|
record.setRuleId(devicePointRulesDO.getId());
|
|
|
|
|
//TODO 创建人和更新人为内置默认管理员
|
|
|
|
|
record.setCreator("1");
|
|
|
|
|
record.setUpdater("1");
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 处理累计时间
|
|
|
|
|
calculateAndSetTotalTime(record, pointRulesRespVO.getRule(), device.getSampleCycle());
|
|
|
|
|
|
|
|
|
|
deviceOperationRecordMapper.insert(record);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 7. 判断是否有有效数据
|
|
|
|
|
if (validDataList.isEmpty()) {
|
|
|
|
|
logger.warn("设备 {} 没有读取到任何有效数据,跳过入库", deviceId);
|
|
|
|
|
private void calculateAndSetTotalTime(DeviceOperationRecordDO record, String ruleCode, Double sampleCycle) {
|
|
|
|
|
if (!isTimeRelatedStatus(ruleCode)) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logger.info("设备 {} 成功读取 {} 个点位数据,总计 {} 个点位",
|
|
|
|
|
deviceId, successCount, validDataList.size());
|
|
|
|
|
DeviceOperationRecordDO lastRecord = deviceOperationRecordMapper.selectOne(
|
|
|
|
|
Wrappers.<DeviceOperationRecordDO>lambdaQuery()
|
|
|
|
|
.eq(DeviceOperationRecordDO::getRule, ruleCode)
|
|
|
|
|
.orderByDesc(DeviceOperationRecordDO::getCreateTime)
|
|
|
|
|
.last("LIMIT 1")
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
if (ruleCode.equals(DeviceStatusEnum.RUNNING.getCode())) {
|
|
|
|
|
Double totalTime = (lastRecord != null && lastRecord.getTotalRunningTime() != null)
|
|
|
|
|
? lastRecord.getTotalRunningTime() + sampleCycle
|
|
|
|
|
: sampleCycle;
|
|
|
|
|
record.setTotalRunningTime(totalTime);
|
|
|
|
|
|
|
|
|
|
} else if (ruleCode.equals(DeviceStatusEnum.STANDBY.getCode())) {
|
|
|
|
|
Double totalTime = (lastRecord != null && lastRecord.getTotalStandbyTime() != null)
|
|
|
|
|
? lastRecord.getTotalStandbyTime() + sampleCycle
|
|
|
|
|
: sampleCycle;
|
|
|
|
|
record.setTotalStandbyTime(totalTime);
|
|
|
|
|
|
|
|
|
|
} else if (ruleCode.equals(DeviceStatusEnum.FAULT_STANDBY.getCode())) {
|
|
|
|
|
Double totalTime = (lastRecord != null && lastRecord.getTotalFaultTime() != null)
|
|
|
|
|
? lastRecord.getTotalFaultTime() + sampleCycle
|
|
|
|
|
: sampleCycle;
|
|
|
|
|
record.setTotalFaultTime(totalTime);
|
|
|
|
|
|
|
|
|
|
} else if (ruleCode.equals(DeviceStatusEnum.ALARM_RUNNING.getCode())) {
|
|
|
|
|
Double totalTime = (lastRecord != null && lastRecord.getTotalWarningTime() != null)
|
|
|
|
|
? lastRecord.getTotalWarningTime() + sampleCycle
|
|
|
|
|
: sampleCycle;
|
|
|
|
|
record.setTotalWarningTime(totalTime);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Integer getRecordType(DevicePointRulesDO devicePointRulesDO) {
|
|
|
|
|
return devicePointRulesDO.getIdentifier()
|
|
|
|
|
.equals(DeviceBasicStatusEnum.RUNNING.getDescription())
|
|
|
|
|
? Integer.parseInt(DeviceBasicStatusEnum.RUNNING.getCode())
|
|
|
|
|
: Integer.parseInt(DeviceBasicStatusEnum.ALARM.getCode());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private boolean isTimeRelatedStatus(String ruleCode) {
|
|
|
|
|
return ruleCode.equals(DeviceStatusEnum.RUNNING.getCode()) ||
|
|
|
|
|
ruleCode.equals(DeviceStatusEnum.STANDBY.getCode()) ||
|
|
|
|
|
ruleCode.equals(DeviceStatusEnum.FAULT_STANDBY.getCode()) ||
|
|
|
|
|
ruleCode.equals(DeviceStatusEnum.ALARM_RUNNING.getCode());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 判断值是否符合规则
|
|
|
|
|
* 支持操作符: EQ(等于), NE(不等于), GT(大于), GE(大于等于),
|
|
|
|
|
* LT(小于), LE(小于等于), TRUE(为真), FALSE(为假)
|
|
|
|
|
*/
|
|
|
|
|
private boolean matchRule(String value, PointRulesRespVO rule) {
|
|
|
|
|
if (StringUtils.isBlank(value) || rule == null ||
|
|
|
|
|
StringUtils.isBlank(rule.getOperator())) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
// 8. 数据入库
|
|
|
|
|
String json = JSON.toJSONString(validDataList);
|
|
|
|
|
boolean insertSuccess = tDengineService.insertDeviceData(deviceId, json);
|
|
|
|
|
String operator = rule.getOperator().toUpperCase();
|
|
|
|
|
String inputValue = value.trim().toLowerCase();
|
|
|
|
|
String ruleValue = StringUtils.trimToEmpty(rule.getOperatorRule());
|
|
|
|
|
|
|
|
|
|
if (insertSuccess) {
|
|
|
|
|
logger.info("设备 {} 数据成功插入TDengine,数据量: {}", deviceId, validDataList.size());
|
|
|
|
|
// 1. 处理布尔值判断
|
|
|
|
|
if ("TRUE".equals(operator) || "FALSE".equals(operator)) {
|
|
|
|
|
return matchBooleanRule(inputValue, operator);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
logger.error("设备 {} 数据插入TDengine失败", deviceId);
|
|
|
|
|
// 2. 如果operatorRule为空,且不是布尔操作符,则返回false
|
|
|
|
|
if (StringUtils.isBlank(ruleValue)) {
|
|
|
|
|
logger.warn("规则比较值为空,但操作符不是布尔类型: operator={}", operator);
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
logger.error("设备 {} 数据入库异常", deviceId, e);
|
|
|
|
|
} finally {
|
|
|
|
|
// 9. 确保断开连接
|
|
|
|
|
try {
|
|
|
|
|
OpcUtils.disconnect();
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
logger.error("断开OPC连接异常,设备ID: {}", deviceId, e);
|
|
|
|
|
ruleValue = ruleValue.trim();
|
|
|
|
|
|
|
|
|
|
// 3. 尝试数值比较
|
|
|
|
|
if (isNumeric(inputValue) && isNumeric(ruleValue)) {
|
|
|
|
|
Double num1 = Double.parseDouble(inputValue);
|
|
|
|
|
Double num2 = Double.parseDouble(ruleValue);
|
|
|
|
|
|
|
|
|
|
return compareNumbers(num1, num2, operator);
|
|
|
|
|
}
|
|
|
|
|
// 4. 字符串比较
|
|
|
|
|
else {
|
|
|
|
|
return compareStrings(inputValue, ruleValue, operator);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
logger.error("规则匹配异常: value={}, rule={}, error={}",
|
|
|
|
|
value, JSON.toJSONString(rule), e.getMessage());
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 处理布尔值判断
|
|
|
|
|
*/
|
|
|
|
|
private boolean matchBooleanRule(String value, String operator) {
|
|
|
|
|
// 常见布尔值表示
|
|
|
|
|
boolean booleanValue = parseBoolean(value);
|
|
|
|
|
|
|
|
|
|
if ("TRUE".equals(operator)) {
|
|
|
|
|
return booleanValue;
|
|
|
|
|
} else if ("FALSE".equals(operator)) {
|
|
|
|
|
return !booleanValue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 解析字符串为布尔值
|
|
|
|
|
* 支持: true, false, 1, 0, yes, no, on, off等
|
|
|
|
|
*/
|
|
|
|
|
private boolean parseBoolean(String value) {
|
|
|
|
|
if (StringUtils.isBlank(value)) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
String lowerValue = value.toLowerCase();
|
|
|
|
|
|
|
|
|
|
// 常见真值表示
|
|
|
|
|
if ("true".equals(lowerValue) ||
|
|
|
|
|
"1".equals(lowerValue) ||
|
|
|
|
|
"yes".equals(lowerValue) ||
|
|
|
|
|
"on".equals(lowerValue) ||
|
|
|
|
|
"是".equals(lowerValue) || // 中文支持
|
|
|
|
|
"成功".equals(lowerValue)) {
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 常见假值表示
|
|
|
|
|
if ("false".equals(lowerValue) ||
|
|
|
|
|
"0".equals(lowerValue) ||
|
|
|
|
|
"no".equals(lowerValue) ||
|
|
|
|
|
"off".equals(lowerValue) ||
|
|
|
|
|
"否".equals(lowerValue) || // 中文支持
|
|
|
|
|
"失败".equals(lowerValue)) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 尝试转换为布尔值
|
|
|
|
|
try {
|
|
|
|
|
return Boolean.parseBoolean(lowerValue);
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
logger.warn("无法解析为布尔值: {}", value);
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 数值比较
|
|
|
|
|
*/
|
|
|
|
|
private boolean compareNumbers(Double value, Double ruleValue, String operator) {
|
|
|
|
|
switch (operator) {
|
|
|
|
|
case "EQ": return Math.abs(value - ruleValue) < 0.000001; // 处理浮点数精度
|
|
|
|
|
case "NE": return Math.abs(value - ruleValue) >= 0.000001;
|
|
|
|
|
case "GT": return value > ruleValue;
|
|
|
|
|
case "GE": return value >= ruleValue;
|
|
|
|
|
case "LT": return value < ruleValue;
|
|
|
|
|
case "LE": return value <= ruleValue;
|
|
|
|
|
default:
|
|
|
|
|
logger.warn("不支持的操作符: {}", operator);
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 字符串比较
|
|
|
|
|
*/
|
|
|
|
|
private boolean compareStrings(String value, String ruleValue, String operator) {
|
|
|
|
|
switch (operator) {
|
|
|
|
|
case "EQ": return value.equals(ruleValue);
|
|
|
|
|
case "NE": return !value.equals(ruleValue);
|
|
|
|
|
case "GT": return value.compareTo(ruleValue) > 0;
|
|
|
|
|
case "GE": return value.compareTo(ruleValue) >= 0;
|
|
|
|
|
case "LT": return value.compareTo(ruleValue) < 0;
|
|
|
|
|
case "LE": return value.compareTo(ruleValue) <= 0;
|
|
|
|
|
default:
|
|
|
|
|
logger.warn("不支持的操作符: {}", operator);
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 判断字符串是否为数字
|
|
|
|
|
*/
|
|
|
|
|
private boolean isNumeric(String str) {
|
|
|
|
|
if (StringUtils.isBlank(str)) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
try {
|
|
|
|
|
Double.parseDouble(str);
|
|
|
|
|
return true;
|
|
|
|
|
} catch (NumberFormatException e) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 使用连接池执行任务
|
|
|
|
|
*/
|
|
|
|
|
// private void executeWithConnectionPool(Long deviceId, DeviceDO device) {
|
|
|
|
|
// SessionInfo sessionInfo = connectionPool.get(deviceId);
|
|
|
|
|
//
|
|
|
|
|
// // 检查是否可复用现有连接
|
|
|
|
|
// if (sessionInfo != null && !sessionInfo.isExpired() && OpcUtils.isConnected()) {
|
|
|
|
|
// log.debug("复用现有连接,设备ID: {}", deviceId);
|
|
|
|
|
// sessionInfo.updateLastUsed();
|
|
|
|
|
// } else {
|
|
|
|
|
// // 需要创建新连接
|
|
|
|
|
// if (sessionInfo != null) {
|
|
|
|
|
// // 清理过期连接
|
|
|
|
|
// connectionPool.remove(deviceId);
|
|
|
|
|
// OpcUtils.disconnect();
|
|
|
|
|
// }
|
|
|
|
|
//
|
|
|
|
|
// log.info("创建新OPC连接,设备ID: {}", deviceId);
|
|
|
|
|
// if (!OpcUtils.connect(device.getUrl(), device.getUsername(), device.getPassword(), 10)) {
|
|
|
|
|
// throw new RuntimeException("OPC连接失败: " + device.getUrl());
|
|
|
|
|
// }
|
|
|
|
|
//
|
|
|
|
|
// // 记录新会话
|
|
|
|
|
// sessionInfo = new SessionInfo(device.getUrl(), device.getUsername());
|
|
|
|
|
// connectionPool.put(deviceId, sessionInfo);
|
|
|
|
|
// }
|
|
|
|
|
//
|
|
|
|
|
// try {
|
|
|
|
|
// // 执行数据采集
|
|
|
|
|
// processDeviceData(deviceId, device);
|
|
|
|
|
// } catch (Exception e) {
|
|
|
|
|
// // 发生异常时清理连接
|
|
|
|
|
// connectionPool.remove(deviceId);
|
|
|
|
|
// OpcUtils.disconnect();
|
|
|
|
|
// throw e;
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
//
|
|
|
|
|
// /**
|
|
|
|
|
// * 清理过期连接
|
|
|
|
|
// */
|
|
|
|
|
// private static void cleanupExpiredConnections() {
|
|
|
|
|
// int initialSize = connectionPool.size();
|
|
|
|
|
// if (initialSize == 0) return;
|
|
|
|
|
//
|
|
|
|
|
// connectionPool.entrySet().removeIf(entry -> {
|
|
|
|
|
// SessionInfo session = entry.getValue();
|
|
|
|
|
// if (session != null && session.isExpired()) {
|
|
|
|
|
// log.debug("清理过期连接,设备ID: {}", entry.getKey());
|
|
|
|
|
// OpcUtils.disconnect();
|
|
|
|
|
// return true;
|
|
|
|
|
// }
|
|
|
|
|
// return false;
|
|
|
|
|
// });
|
|
|
|
|
//
|
|
|
|
|
// int finalSize = connectionPool.size();
|
|
|
|
|
// if (initialSize != finalSize) {
|
|
|
|
|
// log.info("连接池清理完成: {} -> {}", initialSize, finalSize);
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|