Merge branch 'hhk' into main

hhk
HuangHuiKang 1 month ago
commit 340ae2f6f3

@ -12,6 +12,7 @@ public interface ErrorCodeConstants {
ErrorCode DEVICE_NOT_EXISTS = new ErrorCode(1_003_000_000, "设备不存在"); ErrorCode DEVICE_NOT_EXISTS = new ErrorCode(1_003_000_000, "设备不存在");
ErrorCode DEVICE_REFERENCES_EXIST = new ErrorCode(1_003_000_000, "存在设备已被引用,请先删除引用。"); ErrorCode DEVICE_REFERENCES_EXIST = new ErrorCode(1_003_000_000, "存在设备已被引用,请先删除引用。");
ErrorCode DEVICE_MQTT_TOPIC_EXIST = new ErrorCode(1_003_000_000, "设备MQTT主题不存在。"); ErrorCode DEVICE_MQTT_TOPIC_EXIST = new ErrorCode(1_003_000_000, "设备MQTT主题不存在。");
ErrorCode DEVICE_MQTT_TOPIC_ALREADY_EXIST = new ErrorCode(1_003_000_000, "该MQTT主题已被其他设备订阅一台设备只能绑定一个主题。");
ErrorCode DEVICE_EXISTS = new ErrorCode(1_003_000_000, "同名或同主题设备已存在"); ErrorCode DEVICE_EXISTS = new ErrorCode(1_003_000_000, "同名或同主题设备已存在");
@ -78,4 +79,11 @@ public interface ErrorCodeConstants {
ErrorCode RECIPE_PLAN_DETAIL_CODE_EXISTS = new ErrorCode(1_003_000_006, "编码已存在"); ErrorCode RECIPE_PLAN_DETAIL_CODE_EXISTS = new ErrorCode(1_003_000_006, "编码已存在");
ErrorCode RECIPE_POINT_RECORD_NOT_EXISTS = new ErrorCode(1_003_000_007, "IoT配方点位记录不存在"); ErrorCode RECIPE_POINT_RECORD_NOT_EXISTS = new ErrorCode(1_003_000_007, "IoT配方点位记录不存在");
ErrorCode RECIPE_DEVICE_RECORD_NOT_EXISTS = new ErrorCode(1_003_000_008, "设备点位采集值记录不存在"); ErrorCode RECIPE_DEVICE_RECORD_NOT_EXISTS = new ErrorCode(1_003_000_008, "设备点位采集值记录不存在");
// ======================================= Tdengine ============================================
ErrorCode TABLE_CREATION_FAILED = new ErrorCode(1_004_000_008, "TDengine 表创建失败");
ErrorCode COLOUMN_CREATION_FAILED = new ErrorCode(1_004_000_008, "TDengine 列创建失败");
} }

@ -52,8 +52,8 @@ public class DeviceController {
@Resource @Resource
private JobService jobService; private JobService jobService;
@Resource // @Resource
private OpcUaSubscriptionService opcUaService; // private OpcUaSubscriptionService opcUaService;
// @Resource // @Resource
// private IMqttservice mqttService; // private IMqttservice mqttService;
@ -65,7 +65,7 @@ public class DeviceController {
public CommonResult<DeviceDO> createDevice(@Valid @RequestBody DeviceSaveReqVO createReqVO) throws SchedulerException { public CommonResult<DeviceDO> createDevice(@Valid @RequestBody DeviceSaveReqVO createReqVO) throws SchedulerException {
DeviceDO device = deviceService.createDevice(createReqVO); DeviceDO device = deviceService.createDevice(createReqVO);
//初始化Td表 //初始化Td表
tDengineService.initDatabaseAndTable(device.getId()); // tDengineService.initDatabaseAndTable(device.getId());
return success(device); return success(device);
} }

@ -1,136 +0,0 @@
package cn.iocoder.yudao.module.iot.controller.admin.device;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig;
import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.*;
import org.eclipse.milo.opcua.stack.client.DiscoveryClient;
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
import org.eclipse.milo.opcua.stack.core.types.structured.*;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
@Slf4j
@Service
public class OpcUaSubscriptionService {
private static final String URL = "opc.tcp://192.168.21.5:4840";
private static final String USERNAME = "bst";
private static final String PASSWORD = "Bst123456";
private OpcUaClient client;
private UaSubscription subscription;
// 保存已订阅的节点
private final ConcurrentHashMap<String, UaMonitoredItem> monitoredItems = new ConcurrentHashMap<>();
/**
* OPC UA
*/
public void subscribeNode(String nodeAddress, int samplingMillis) throws Exception {
if (client == null) {
connect();
}
if (subscription == null) {
subscription = client.getSubscriptionManager()
.createSubscription((double) samplingMillis)
.get();
log.info("创建订阅,采样间隔: {} ms", samplingMillis);
}
NodeId nodeId = NodeId.parse(nodeAddress);
ReadValueId readValueId = new ReadValueId(
nodeId,
org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger.valueOf(13), // Value 属性
null,
null
);
MonitoringParameters parameters = new MonitoringParameters(
uint(nodeId.hashCode()),
(double) samplingMillis,
null,
uint(10),
true
);
MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(
readValueId,
MonitoringMode.Reporting,
parameters
);
List<MonitoredItemCreateRequest> requests = new ArrayList<>();
requests.add(request);
List<UaMonitoredItem> items = subscription.createMonitoredItems(
TimestampsToReturn.Both,
requests
).get();
UaMonitoredItem item = items.get(0);
item.setValueConsumer((monitoredItem, value) -> {
Variant variant = value.getValue();
Object v = variant != null ? variant.getValue() : null;
log.info("节点 {} 值变化: {}", nodeAddress, v);
});
monitoredItems.put(nodeAddress, item);
log.info("成功订阅节点: {}", nodeAddress);
}
private void connect() throws Exception {
IdentityProvider identity = new UsernameProvider(USERNAME, PASSWORD);
// 1. 获取端点
List<EndpointDescription> endpoints = DiscoveryClient.getEndpoints(URL)
.get(5, TimeUnit.SECONDS);
EndpointDescription endpoint = endpoints.stream()
.filter(e -> SecurityPolicy.None.getUri().equals(e.getSecurityPolicyUri()))
.findFirst()
.orElseThrow(() -> new RuntimeException("未找到 SecurityPolicy=None 的端点"));
// 2. 构建客户端配置
OpcUaClientConfig config = OpcUaClientConfig.builder()
.setEndpoint(endpoint)
.setIdentityProvider(identity)
.build();
// 3. 创建客户端并连接
client = OpcUaClient.create(config);
client.connect().get();
log.info("成功连接 OPC UA 服务端: {}", URL);
}
// /**
// * 断开连接
// */
// public void disconnect() {
// if (client != null) {
// try {
// client.disconnect().get();
// log.info("断开 OPC UA 连接");
// } catch (Exception e) {
// log.error("断开 OPC UA 连接失败", e);
// }
// }
// }
}

@ -0,0 +1,136 @@
package cn.iocoder.yudao.module.iot.controller.admin.device.enums;
/**
* Java TDengine
*/
public enum JavaToTdengineTypeEnum {
BYTE("byte", "TINYINT") {
@Override
public Object convertValue(String value) {
return Byte.parseByte(value);
}
},
SHORT("short", "SMALLINT") {
@Override
public Object convertValue(String value) {
return Short.parseShort(value);
}
},
INT("int", "INT") {
@Override
public Object convertValue(String value) {
return Integer.parseInt(value);
}
},
LONG("long", "BIGINT") {
@Override
public Object convertValue(String value) {
return Long.parseLong(value);
}
},
FLOAT("float", "FLOAT") {
@Override
public Object convertValue(String value) {
return Float.parseFloat(value);
}
},
DOUBLE("double", "DOUBLE") {
@Override
public Object convertValue(String value) {
return Double.parseDouble(value);
}
},
BOOLEAN("boolean", "BOOL") {
@Override
public Object convertValue(String value) {
return Boolean.parseBoolean(value);
}
},
CHAR("char", "NCHAR(1)") {
@Override
public Object convertValue(String value) {
return value.charAt(0);
}
};
private final String javaType;
private final String tdType;
JavaToTdengineTypeEnum(String javaType, String tdType) {
this.javaType = javaType;
this.tdType = tdType;
}
public String getJavaType() {
return javaType;
}
public String getTdType() {
return tdType;
}
/**
*
*/
public abstract Object convertValue(String value);
/**
* javaType
*/
public static JavaToTdengineTypeEnum fromJavaType(String javaType) {
if (javaType == null) {
return null;
}
for (JavaToTdengineTypeEnum e : values()) {
if (e.javaType.equalsIgnoreCase(javaType)) {
return e;
}
}
return null;
}
/**
* javaType TDengine
*/
public static String getTdTypeByJavaType(String javaType) {
JavaToTdengineTypeEnum e = fromJavaType(javaType);
return e != null ? e.getTdType() : null;
}
/**
* javaType value
*/
public static Object convertValue(String javaType, String value) {
try {
JavaToTdengineTypeEnum e = fromJavaType(javaType);
if (e == null) {
return Double.parseDouble(value);
}
return e.convertValue(value);
} catch (Exception ex) {
throw new RuntimeException(
"数据类型转换失败: javaType=" + javaType + ", value=" + value, ex
);
}
}
}

@ -1,14 +0,0 @@
package cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.opcuv;
import cn.iocoder.yudao.framework.common.util.opc.OpcUtils;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
@Component
public class OpcShutdown {
@PreDestroy
public void shutdown() {
OpcUtils.disconnectAll();
}
}

@ -0,0 +1,162 @@
package cn.iocoder.yudao.module.iot.controller.admin.device.utils;
import lombok.extern.slf4j.Slf4j;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.LocalDateTime;
/**
*
*
* TDengine / MQTT / OPC / JDBC
*/
@Slf4j
public final class DataTypeParseUtil {
/**
*
*/
private DataTypeParseUtil() {
throw new UnsupportedOperationException("Utility class cannot be instantiated");
}
/**
* dataType value
*
* @param value
* @param dataType
* @return
*/
public static Object parse(Object value, String dataType) {
if (value == null || dataType == null) {
return null;
}
try {
switch (dataType.toLowerCase()) {
case "int":
case "integer":
return toInt(value);
case "long":
return toLong(value);
case "double":
return toDouble(value);
case "float":
return toFloat(value);
case "short":
return toShort(value);
case "boolean":
return toBoolean(value);
case "string":
case "varchar":
case "text":
return value.toString();
case "bigdecimal":
return toBigDecimal(value);
case "timestamp":
case "datetime":
return toLocalDateTime(value);
default:
return value;
}
} catch (Exception e) {
log.warn("数据类型解析失败 value={}, dataType={}", value, dataType);
return value;
}
}
public static Integer toInt(Object value) {
if (value instanceof Number) {
return ((Number) value).intValue();
}
return Integer.parseInt(value.toString());
}
public static Long toLong(Object value) {
if (value instanceof Number) {
return ((Number) value).longValue();
}
return Long.parseLong(value.toString());
}
public static Double toDouble(Object value) {
if (value instanceof Number) {
return ((Number) value).doubleValue();
}
return Double.parseDouble(value.toString());
}
public static Float toFloat(Object value) {
if (value instanceof Number) {
return ((Number) value).floatValue();
}
return Float.parseFloat(value.toString());
}
public static Short toShort(Object value) {
if (value instanceof Number) {
return ((Number) value).shortValue();
}
return Short.parseShort(value.toString());
}
public static Boolean toBoolean(Object value) {
if (value instanceof Boolean) {
return (Boolean) value;
}
String str = value.toString().toLowerCase();
return "true".equals(str) || "1".equals(str);
}
public static BigDecimal toBigDecimal(Object value) {
if (value instanceof BigDecimal) {
return (BigDecimal) value;
}
return new BigDecimal(value.toString());
}
public static LocalDateTime toLocalDateTime(Object value) {
if (value instanceof Timestamp) {
return ((Timestamp) value).toLocalDateTime();
}
if (value instanceof LocalDateTime) {
return (LocalDateTime) value;
}
return LocalDateTime.parse(value.toString());
}
}

@ -148,7 +148,6 @@ public class RecipeDeviceRecordController {
/** /**
* *
* @param recipeId ID
* @return * @return
* @throws JsonProcessingException JSON * @throws JsonProcessingException JSON
*/ */
@ -198,6 +197,8 @@ public class RecipeDeviceRecordController {
Map<Long, Map<String, Object>> deviceDataMap = deviceService.createDeviceDataMap(device.getId());//recipeRespVO.getDeviceId() Map<Long, Map<String, Object>> deviceDataMap = deviceService.createDeviceDataMap(device.getId());//recipeRespVO.getDeviceId()
Map<String, Object> map = tDengineService.newSelectLatestRow(device.getId());
// OpcUtils.connect(device.getId(),device.getUrl(),device.getUsername(),device.getPassword(),10); // OpcUtils.connect(device.getId(),device.getUrl(),device.getUsername(),device.getPassword(),10);
for (RecipeDeviceAttributeDO attributeDO : attributeList) { for (RecipeDeviceAttributeDO attributeDO : attributeList) {
@ -216,9 +217,10 @@ public class RecipeDeviceRecordController {
recipeDeviceRecordDO.setDeviceId(deviceContactModelDO.getDeviceId()); recipeDeviceRecordDO.setDeviceId(deviceContactModelDO.getDeviceId());
recipeDeviceRecordDO.setDataUnit(deviceContactModelDO.getDataUnit()); recipeDeviceRecordDO.setDataUnit(deviceContactModelDO.getDataUnit());
// recipeDeviceRecordDO.setValue((String) OpcUtils.readValues(device.getId(),deviceContactModelDO.getAddress())); // recipeDeviceRecordDO.setValue((String) OpcUtils.readValues(device.getId(),deviceContactModelDO.getAddress()));
if (data.get("addressValue") != null && data.get("addressValue").toString() != null) { // if (data.get("addressValue") != null && data.get("addressValue").toString() != null) {
recipeDeviceRecordDO.setValue(data.get("addressValue").toString()); // recipeDeviceRecordDO.setValue(data.get("addressValue").toString());
} // }
recipeDeviceRecordDO.setValue(map.get(deviceContactModelDO.getAttributeCode()));
recipeDeviceRecordService.createRecipeDeviceRecord(BeanUtils.toBean(recipeDeviceRecordDO, RecipeDeviceRecordSaveReqVO.class)); recipeDeviceRecordService.createRecipeDeviceRecord(BeanUtils.toBean(recipeDeviceRecordDO, RecipeDeviceRecordSaveReqVO.class));

@ -66,7 +66,7 @@ public class RecipeDeviceRecordDO extends BaseDO {
/** /**
* *
*/ */
private String value; private Object value;
/** /**
* id * id

@ -12,12 +12,14 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.device.DeviceDO;
import com.alibaba.excel.util.StringUtils; import com.alibaba.excel.util.StringUtils;
import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.ibatis.annotations.MapKey;
import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select; import org.apache.ibatis.annotations.Select;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -103,6 +105,8 @@ public interface DeviceMapper extends BaseMapperX<DeviceDO> {
List<Long> deviceLedgerList(); List<Long> deviceLedgerList();
List<Map<String,Object>> selectWorkshopBatch(@Param("deviceIds") List<Long> deviceIds);
List<LineDeviceRespVO> lineDeviceList(@Param("pageReqVO") LineDeviceRequestVO pageReqVO); List<LineDeviceRespVO> lineDeviceList(@Param("pageReqVO") LineDeviceRequestVO pageReqVO);

@ -381,9 +381,11 @@ public class MqttDataHandler extends SuperConsumer<String> {
deviceContactModelDO.setAddress(null); deviceContactModelDO.setAddress(null);
} }
String json = JSON.toJSONString(dataList); String json = JSON.toJSONString(dataList);
boolean inserted = tDengineService.insertDeviceData(deviceId, json); // boolean inserted = tDengineService.insertDeviceData(deviceId, json);
if (inserted) { boolean isSuccess = tDengineService.newInsertDeviceData(deviceId, dataList);
if (isSuccess) {
log.info("设备 {} 数据入库成功,总数: {},有效: {}", log.info("设备 {} 数据入库成功,总数: {},有效: {}",
deviceId, dataList.size(), successCount); deviceId, dataList.size(), successCount);
} else { } else {

@ -9,6 +9,7 @@ import cn.iocoder.yudao.framework.common.util.opc.OpcUtils;
import cn.iocoder.yudao.module.iot.controller.admin.device.enums.DeviceStatusEnum; import cn.iocoder.yudao.module.iot.controller.admin.device.enums.DeviceStatusEnum;
import cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.utils.CronExpressionUtils; import cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.utils.CronExpressionUtils;
import cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.scheduler.TaskSchedulerManager; import cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.scheduler.TaskSchedulerManager;
import cn.iocoder.yudao.module.iot.controller.admin.device.utils.DataTypeParseUtil;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.*; import cn.iocoder.yudao.module.iot.controller.admin.device.vo.*;
import cn.iocoder.yudao.module.iot.controller.admin.devicecontactmodel.vo.DeviceContactModelPageReqVO; import cn.iocoder.yudao.module.iot.controller.admin.devicecontactmodel.vo.DeviceContactModelPageReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.mqttdatarecord.vo.MqttDataRecordPageReqVO; import cn.iocoder.yudao.module.iot.controller.admin.mqttdatarecord.vo.MqttDataRecordPageReqVO;
@ -60,6 +61,7 @@ import javax.validation.constraints.NotNull;
import java.sql.Timestamp; import java.sql.Timestamp;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -158,9 +160,13 @@ public class DeviceServiceImpl implements DeviceService {
//新增模板点位 //新增模板点位
LambdaQueryWrapper<DeviceModelAttributeDO> lambdaQueryWrapper = new LambdaQueryWrapper<>(); LambdaQueryWrapper<DeviceModelAttributeDO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(DeviceModelAttributeDO::getDeviceModelId,createReqVO.getDeviceModelId()); lambdaQueryWrapper.eq(DeviceModelAttributeDO::getDeviceModelId,createReqVO.getDeviceModelId()).orderByDesc(DeviceModelAttributeDO::getId);
List<DeviceModelAttributeDO> deviceModelAttributeDOS = deviceModelAttributeMapper.selectList(lambdaQueryWrapper); List<DeviceModelAttributeDO> deviceModelAttributeDOS = deviceModelAttributeMapper.selectList(lambdaQueryWrapper);
if (deviceModelAttributeDOS.isEmpty()){
throw exception(DEVICE_MODEL_ATTRIBUTE_NOT_EXISTS);
}
List<DeviceContactModelDO> contactModelList = new ArrayList<>(); List<DeviceContactModelDO> contactModelList = new ArrayList<>();
for (DeviceModelAttributeDO attributeDO : deviceModelAttributeDOS) { for (DeviceModelAttributeDO attributeDO : deviceModelAttributeDOS) {
DeviceContactModelDO contactModel = new DeviceContactModelDO(); DeviceContactModelDO contactModel = new DeviceContactModelDO();
@ -228,11 +234,21 @@ public class DeviceServiceImpl implements DeviceService {
public void updateDevice(DeviceSaveReqVO updateReqVO) { public void updateDevice(DeviceSaveReqVO updateReqVO) {
// 校验存在 // 校验存在
validateDeviceExists(updateReqVO.getId()); validateDeviceExists(updateReqVO.getId());
//校驗topic是否唯一
validTopicExists(updateReqVO.getTopic());
// 更新 // 更新
DeviceDO updateObj = BeanUtils.toBean(updateReqVO, DeviceDO.class); DeviceDO updateObj = BeanUtils.toBean(updateReqVO, DeviceDO.class);
deviceMapper.updateById(updateObj); deviceMapper.updateById(updateObj);
} }
private void validTopicExists(String topic) {
boolean exists = deviceMapper.exists(Wrappers.<DeviceDO>lambdaQuery().eq(DeviceDO::getTopic, topic));
if (exists){
throw exception(DEVICE_MQTT_TOPIC_ALREADY_EXIST);
}
}
@Override @Override
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
public void deleteDevice(List<Long> ids) { public void deleteDevice(List<Long> ids) {
@ -305,7 +321,7 @@ public class DeviceServiceImpl implements DeviceService {
.collect(Collectors.toList()); .collect(Collectors.toList());
// 2. 批量获取 TDengine 最新 ts // 2. 批量获取 TDengine 最新 ts
Map<Long, LocalDateTime> latestTsMap = tdengineService.selectLatestTsBatch(deviceIds); Map<Long, LocalDateTime> latestTsMap = tdengineService.newSelectLatestTsBatch(deviceIds);
// 3. 批量获取最新的 DeviceOperationRecord // 3. 批量获取最新的 DeviceOperationRecord
List<String> ruleCodes = Arrays.stream(DeviceStatusEnum.values()) List<String> ruleCodes = Arrays.stream(DeviceStatusEnum.values())
@ -378,18 +394,70 @@ public class DeviceServiceImpl implements DeviceService {
PageResult<DeviceContactModelDO> deviceModelAttributeDOPageResult = deviceContactModelMapper.selectPageById(pageReqVO, deviceModelAttributePageReqVO); PageResult<DeviceContactModelDO> deviceModelAttributeDOPageResult = deviceContactModelMapper.selectPageById(pageReqVO, deviceModelAttributePageReqVO);
Map<Long, Map<String, Object>> deviceDataMap = createDeviceDataMap(device.getId()); // Map<Long, Map<String, Object>> deviceDataMap = createDeviceDataMap(device.getId());
//获取td数据库最新一条数据
Map<String, Object> latestRow = tdengineService.newSelectLatestRow(device.getId());
// 合并数据:将 deviceDataMap 的值赋给分页结果中的对应记录
List<DeviceContactModelDO> records = deviceModelAttributeDOPageResult.getList(); List<DeviceContactModelDO> records = deviceModelAttributeDOPageResult.getList();
for (DeviceContactModelDO record : records) {
Map<String, Object> data = deviceDataMap.get(record.getId()); if (latestRow != null && !latestRow.isEmpty()) {
if (data != null) {
record.setAddressValue(adjustByRatio(data.get("addressValue"), record.getRatio())); // 设置 addressValue // 1. 取 ts 时间
record.setLatestCollectionTime((String) data.get("timestamp")); // 设置 latestCollectionTime Object tsObj = latestRow.get("ts");
String latestTime = null;
if (tsObj instanceof Timestamp) {
latestTime = ((Timestamp) tsObj).toLocalDateTime().toString();
} else if (tsObj instanceof LocalDateTime) {
latestTime = tsObj.toString();
} else if (tsObj != null) {
latestTime = tsObj.toString();
} }
// 2. 遍历 records匹配列名attributeCode
for (DeviceContactModelDO record : records) {
String attributeCode = record.getAttributeCode();
if (attributeCode == null) {
continue;
}
// 从 latestRow 获取对应列值
Object rawValue = latestRow.get(attributeCode);
if (rawValue == null) {
continue;
}
// 根据 dataType 解析
Object parsedValue = DataTypeParseUtil.parse(rawValue, record.getDataType());
// 按比例调整
Object finalValue = adjustByRatio(parsedValue, record.getRatio());
// 设置值
record.setAddressValue(finalValue);
// 使用最新 ts 设置最新采集时间
record.setLatestCollectionTime(latestTime);
}
} }
// // 合并数据:将 deviceDataMap 的值赋给分页结果中的对应记录
// List<DeviceContactModelDO> records = deviceModelAttributeDOPageResult.getList();
// for (DeviceContactModelDO record : records) {
// Map<String, Object> data = deviceDataMap.get(record.getId());
// if (data != null) {
// record.setAddressValue(adjustByRatio(data.get("addressValue"), record.getRatio())); // 设置 addressValue
// record.setLatestCollectionTime((String) data.get("timestamp")); // 设置 latestCollectionTime
// }
// }
return deviceModelAttributeDOPageResult; return deviceModelAttributeDOPageResult;
} }
@ -655,40 +723,89 @@ public class DeviceServiceImpl implements DeviceService {
@Override @Override
public PageResult<LineDeviceRespVO> lineDevicePage(LineDeviceRequestVO pageReqVO) { public PageResult<LineDeviceRespVO> lineDevicePage(LineDeviceRequestVO pageReqVO) {
// Page<LineDeviceRespVO> page = new Page<>(pageReqVO.getPageNo(), pageReqVO.getPageSize()); // 1. 查询分页设备
PageResult<DeviceRespVO> pageResult =
getDevicePage(BeanUtils.toBean(pageReqVO, DevicePageReqVO.class));
// IPage<LineDeviceRespVO> lineDeviceRespVO = deviceMapper.lineDevicePage(page,pageReqVO); List<DeviceRespVO> deviceList =
PageResult<DeviceRespVO> pageResult = getDevicePage(BeanUtils.toBean(pageReqVO, DevicePageReqVO.class)); Optional.ofNullable(pageResult.getList())
.orElse(Collections.emptyList());
List<LineDeviceRespVO> list = new ArrayList<>(); if (deviceList.isEmpty()) {
for (DeviceRespVO record : pageResult.getList()) { return PageResult.empty();
LineDeviceRespVO lineDeviceRespVO = new LineDeviceRespVO();
lineDeviceRespVO.setDeviceCode(record.getDeviceCode());
lineDeviceRespVO.setDeviceName(record.getDeviceName());
lineDeviceRespVO.setStatus(record.getStatus());
lineDeviceRespVO.setCollectionTime(String.valueOf(record.getCollectionTime()));
lineDeviceRespVO.setId(record.getId());
Map<String, Object> latestDeviceData = tdengineService.getLatestDeviceData(record.getId());
if(latestDeviceData != null) {
lineDeviceRespVO.setCollectionTime((String) latestDeviceData.get("timestamp"));
}
String line = deviceMapper.lineDeviceLedgerPage(record.getId());
if (line != null) {
lineDeviceRespVO.setLineName(line);
lineDeviceRespVO.setLineNode(line);
} else {
lineDeviceRespVO.setLineName("-");
lineDeviceRespVO.setLineNode("-");
}
list.add(lineDeviceRespVO);
}
if (list.isEmpty()) {
return PageResult.empty(); // 返回空Page
} }
return new PageResult<>(list, pageResult.getTotal());
// 2. 提取 deviceIds
List<Long> deviceIds =
deviceList.stream()
.map(DeviceRespVO::getId)
.filter(Objects::nonNull)
.collect(Collectors.toList());
// // 批量查 workshop
// List<Map<String,Object>> mapList = deviceMapper.selectWorkshopBatch(deviceIds);
//
// Map<Long, String> workshopMap = mapList.stream()
// .filter(m -> m.get("deviceId") != null) // 避免 deviceId null
// .collect(Collectors.toMap(
// m -> ((Number) m.get("deviceId")).longValue(),
// m -> m.get("workshop") != null ? m.get("workshop").toString() : "",
// (existing, replacement) -> replacement
// ));
// 3. 批量查询最新采集时间TDengine
Map<Long, LocalDateTime> latestTsMap =
tdengineService.newSelectLatestTsBatch(deviceIds);
// 4. 时间格式化器
DateTimeFormatter formatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
// 5. 转换结果(使用 stream 更简洁)
List<LineDeviceRespVO> list =
deviceList.stream()
.map(device -> {
LineDeviceRespVO vo = new LineDeviceRespVO();
vo.setId(device.getId());
vo.setDeviceCode(device.getDeviceCode());
vo.setDeviceName(device.getDeviceName());
vo.setStatus(device.getStatus());
// 优先使用 TDengine 最新时间
LocalDateTime latestTs =
latestTsMap.get(device.getId());
if (latestTs != null) {
vo.setCollectionTime(latestTs.format(formatter));
} else if (device.getCollectionTime() != null) {
vo.setCollectionTime(
device.getCollectionTime()
.format(formatter)
);
} else {
vo.setCollectionTime(null);
}
// 查询产线名称
vo.setLineName(
deviceMapper.lineDeviceLedgerPage(device.getId())
);
// vo.setLineName(workshopMap.get(device.getId()));
return vo;
})
.collect(Collectors.toList());
// 6. 返回分页结果
return new PageResult<>(list, pageResult.getTotal());
} }
@Override @Override
public List<LineDeviceRespVO> lineDeviceList(LineDeviceRequestVO pageReqVO) { public List<LineDeviceRespVO> lineDeviceList(LineDeviceRequestVO pageReqVO) {
@ -706,168 +823,234 @@ public class DeviceServiceImpl implements DeviceService {
} }
@Override @Override
public Map<String, List<Map<String, Object>>> singleDevice(Long deviceId) throws JsonProcessingException { public Map<String, List<Map<String, Object>>> singleDevice(Long deviceId) {
Map<String, List<Map<String, Object>>> resultMap = new LinkedHashMap<>(); Map<String, List<Map<String, Object>>> resultMap = new LinkedHashMap<>();
List<DeviceContactModelDO> records = deviceContactModelMapper.selectList(Wrappers.<DeviceContactModelDO>lambdaQuery()
.eq(DeviceContactModelDO::getDeviceId,deviceId)
.orderByDesc(DeviceContactModelDO::getId));
if (records == null || records.isEmpty()) {
return resultMap;
}
try { try {
// 1. 获取设备数据列表 // 获取最新一行数据
List<Map<String, Object>> deviceDataList = tdengineService.getNewestDeviceDataOrderByTimeDesc(deviceId); Map<String, Object> latestRow = tdengineService.newSelectLatestRow(deviceId);
if (latestRow == null || latestRow.isEmpty()) {
return resultMap;
}
// // 获取 ts 时间
// Object tsObj = latestRow.get("ts");
// String latestTime = tsObj instanceof Timestamp ? ((Timestamp) tsObj).toLocalDateTime().toString()
// : tsObj != null ? tsObj.toString() : null;
// 2. 获取属性类型映射 // 属性类型映射
Map<Long, String> idToNameMap = deviceAttributeTypeMapper.selectList() Map<Long, String> idToNameMap = deviceAttributeTypeMapper.selectList()
.stream() .stream()
.collect(Collectors.toMap( .collect(Collectors.toMap(
DeviceAttributeTypeDO::getId, DeviceAttributeTypeDO::getId,
DeviceAttributeTypeDO::getName DeviceAttributeTypeDO::getName
)); ));
Map<Long, String> attributeTypeCache = new HashMap<>();
// 3. 遍历并处理 // 遍历 records 填充数据
for (Map<String, Object> deviceData : deviceDataList) { for (DeviceContactModelDO record : records) {
String queryDataJson = (String) deviceData.get("queryData");
if (StringUtils.isNotBlank(queryDataJson)) { String attributeCode = record.getAttributeCode();
// 使用TypeReference解析为List<Map>而不是具体的DO对象 if (attributeCode == null) continue;
List<Map<String, Object>> dataList = new ObjectMapper().readValue(
queryDataJson,
new TypeReference<List<Map<String, Object>>>() {}
);
for (Map<String, Object> data : dataList) { Object rawValue = latestRow.get(attributeCode);
// 获取属性类型名称 // if (rawValue == null) continue;
String attributeTypeName = "其他";
String typeStr = (String) data.get("attributeType"); Object finalValue = DataTypeParseUtil.parse(rawValue, record.getDataType());
if (typeStr != null) { // Object finalValue = adjustByRatio(finalValueparsedValue, record.getRatio());
try {
attributeTypeName = typeStr;
} catch (Exception e) {
attributeTypeName = "未知";
}
}
// 提取需要的字段 record.setAddressValue(finalValue);
Map<String, Object> simplifiedData = new HashMap<>(); // record.setLatestCollectionTime(latestTime);
simplifiedData.put("addressValue", data.get("addressValue"));
simplifiedData.put("attributeName", data.get("attributeName"));
resultMap // 属性类型名称
.computeIfAbsent(attributeTypeName, k -> new ArrayList<>()) String attributeTypeName = "其他";
.add(simplifiedData); Object typeObj = record.getAttributeType();
if (typeObj != null) {
String typeStr = typeObj.toString();
try {
Long typeId = Long.parseLong(typeStr);
attributeTypeName = attributeTypeCache.computeIfAbsent(typeId,
k -> idToNameMap.getOrDefault(typeId, typeStr));
} catch (NumberFormatException e) {
attributeTypeName = typeStr;
} }
} }
Map<String, Object> simplifiedData = new HashMap<>();
simplifiedData.put("addressValue", record.getAddressValue());
simplifiedData.put("attributeName", record.getAttributeName());
resultMap.computeIfAbsent(attributeTypeName, k -> new ArrayList<>()).add(simplifiedData);
} }
} catch (Exception e) { } catch (Exception e) {
System.out.println("处理设备数据时发生异常: " + e.getMessage()); log.warn("处理设备 {} 最新数据时异常: {}", deviceId, e.getMessage());
} }
return resultMap; return resultMap;
} }
@Override @Override
public PageResult<Map<String, Object>> historyRecord(Long deviceId, String collectionStartTime, String collectionEndTime,Integer page, public PageResult<Map<String, Object>> historyRecord(
Integer pageSize) { Long deviceId,
String collectionStartTime,
String collectionEndTime,
Integer page,
Integer pageSize) {
List<Map<String, Object>> resultList = new ArrayList<>(); List<Map<String, Object>> resultList = new ArrayList<>();
if (deviceId == null) { if (deviceId == null) {
return null; return PageResult.empty();
} }
try { try {
// 1. 获取设备数据列表
List<Map<String, Object>> deviceDataList = tdengineService.getstDeviceDataOrderByTimeDescPage(
deviceId, collectionStartTime, collectionEndTime, page, pageSize
);
long total = tdengineService.queryDeviceDataTotal(deviceId, collectionStartTime, collectionEndTime); // 1. 查询TDengine分页数据
List<Map<String, Object>> deviceDataList =
tdengineService.newSelectBatchPage(
deviceId,
collectionStartTime,
collectionEndTime,
page,
pageSize
);
long total =
tdengineService.newSelectBatchTotal(
deviceId,
collectionStartTime,
collectionEndTime
);
if (deviceDataList.isEmpty()) { if (deviceDataList.isEmpty()) {
return null; return new PageResult<>(resultList, total);
} }
// 2. 获取属性类型映射 (ID -> Name) 并构建缓存,避免重复数据库查询 // 2. 查询属性配置(完整字段来源)
Map<Long, String> idToNameMap = deviceAttributeTypeMapper.selectList() List<DeviceContactModelDO> attributeList =
.stream() deviceContactModelMapper.selectList(
.collect(Collectors.toMap(DeviceAttributeTypeDO::getId, DeviceAttributeTypeDO::getName)); Wrappers.lambdaQuery(DeviceContactModelDO.class)
.eq(DeviceContactModelDO::getDeviceId, deviceId)
.orderByAsc(DeviceContactModelDO::getSort)
);
// 额外缓存数字ID -> 名称防止同一个ID多次查询数据库 // 时间格式化器
Map<Long, String> attributeTypeCache = new HashMap<>(); DateTimeFormatter formatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
ObjectMapper objectMapper = new ObjectMapper(); // 3. 遍历TDengine数据
for (Map<String, Object> row : deviceDataList) {
// 3. 遍历每条设备数据 Object tsObj = row.get("ts");
for (Map<String, Object> deviceData : deviceDataList) {
String queryDataJson = (String) deviceData.get("queryData");
Timestamp timestamp = (Timestamp) deviceData.get("timestamp");
if (StringUtils.isBlank(queryDataJson) || timestamp == null) { if (tsObj == null) {
continue; // 跳过无效数据 continue;
} }
List<Map<String, Object>> dataList = objectMapper.readValue( // 格式化时间
queryDataJson, new TypeReference<List<Map<String, Object>>>() {} String collectTime;
);
// 4. 按属性类型分组 if (tsObj instanceof LocalDateTime) {
Map<String, List<Map<String, Object>>> groupedData = new LinkedHashMap<>();
for (Map<String, Object> data : dataList) { collectTime =
String attributeTypeName = "其他"; ((LocalDateTime) tsObj)
Object typeObj = data.get("attributeType"); .format(formatter);
if (typeObj != null) { } else if (tsObj instanceof Timestamp) {
String typeStr = typeObj.toString();
try { collectTime =
// 尝试当作数字 ID ((Timestamp) tsObj)
Long typeId = Long.parseLong(typeStr); .toLocalDateTime()
.format(formatter);
// 先从缓存查 } else {
if (attributeTypeCache.containsKey(typeId)) {
attributeTypeName = attributeTypeCache.get(typeId); collectTime = tsObj.toString();
} else { }
// 查数据库
String nameFromDb = idToNameMap.get(typeId); // 按 typeName 分组
if (nameFromDb != null) { Map<String, List<Map<String, Object>>> groupedData =
attributeTypeName = nameFromDb; new LinkedHashMap<>();
} else {
attributeTypeName = typeStr; // 查不到就用原值 // 核心逻辑:遍历 attributeList保证全部字段返回
} for (DeviceContactModelDO attribute : attributeList) {
attributeTypeCache.put(typeId, attributeTypeName);
} String column =
} catch (NumberFormatException e) { attribute.getAttributeCode();
// 不是数字,直接用原字符串
attributeTypeName = typeStr; Object rawValue =
} row.get(column);
// 使用 typeName 分组(核心优化点)
String typeName =
attribute.getTypeName();
if (typeName == null || typeName.isEmpty()) {
typeName = "其他";
} }
// 构建属性数据 Map Map<String, Object> simplifiedData =
Map<String, Object> simplifiedData = new HashMap<>(); new HashMap<>();
simplifiedData.put("addressValue", data.get("addressValue"));
simplifiedData.put("attributeName", data.get("attributeName"));
groupedData.computeIfAbsent(attributeTypeName, k -> new ArrayList<>()) simplifiedData.put(
"addressValue",
rawValue != null
? rawValue.toString()
: null
);
simplifiedData.put(
"attributeName",
attribute.getAttributeName()
);
groupedData
.computeIfAbsent(
typeName,
k -> new ArrayList<>()
)
.add(simplifiedData); .add(simplifiedData);
} }
// 5. 构建时间点 Map // 构建最终结构(保持旧结构完全一致)
Map<String, Object> timePointData = new LinkedHashMap<>(groupedData); Map<String, Object> timePointData =
timePointData.put("collectTime", timestamp.toString()); new LinkedHashMap<>(groupedData);
timePointData.put(
"collectTime",
collectTime
);
resultList.add(timePointData); resultList.add(timePointData);
} }
return new PageResult<>(resultList, total); return new PageResult<>(resultList, total);
} catch (Exception e) { } catch (Exception e) {
log.error("处理设备数据时发生异常", e);
return PageResult.empty();
log.error("处理设备历史数据异常", e);
return PageResult.empty();
} }
} }
private void validateDeviceAttributeExists(Long id) { private void validateDeviceAttributeExists(Long id) {
if (deviceAttributeMapper.selectById(id) == null) { if (deviceAttributeMapper.selectById(id) == null) {
throw exception(DEVICE_ATTRIBUTE_NOT_EXISTS); throw exception(DEVICE_ATTRIBUTE_NOT_EXISTS);
@ -1282,13 +1465,13 @@ public class DeviceServiceImpl implements DeviceService {
log.info("新增gateway订阅记录成功 topic={}", topic); log.info("新增gateway订阅记录成功 topic={}", topic);
} else { } else {
//更新gateway状态
gateway.setIsEnable(true); gateway.setIsEnable(true);
gateway.setUpdateTime(LocalDateTime.now()); gateway.setUpdateTime(LocalDateTime.now());
gatewayMapper.updateById(gateway); gatewayMapper.updateById(gateway);
log.info("更新gateway启用状态 topic={}", topic); log.info("更新gateway启用状态 topic={}", topic);
} }
} }
@ -1314,6 +1497,12 @@ public class DeviceServiceImpl implements DeviceService {
); );
log.info("gateway订阅记录已禁用 topic={}", topic); log.info("gateway订阅记录已禁用 topic={}", topic);
//更新设备运行状态为离线
updateOperationalStatus(deviceDO);
log.info("更新设备运行状态为离线 deviceId={}", deviceDO.getId());
} }
} catch (Exception e) { } catch (Exception e) {

@ -1,6 +1,8 @@
package cn.iocoder.yudao.module.iot.service.device; package cn.iocoder.yudao.module.iot.service.device;
import cn.iocoder.yudao.framework.common.pojo.DeviceEdgeData; import cn.iocoder.yudao.framework.common.pojo.DeviceEdgeData;
import cn.iocoder.yudao.module.iot.controller.admin.device.enums.JavaToTdengineTypeEnum;
import cn.iocoder.yudao.module.iot.dal.dataobject.devicecontactmodel.DeviceContactModelDO;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.baomidou.dynamic.datasource.annotation.DS; import com.baomidou.dynamic.datasource.annotation.DS;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
@ -15,7 +17,9 @@ import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.Resource; import javax.annotation.Resource;
import javax.validation.constraints.NotNull;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Timestamp; import java.sql.Timestamp;
@ -26,6 +30,10 @@ import java.time.format.DateTimeFormatter;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.COLOUMN_CREATION_FAILED;
import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.TABLE_CREATION_FAILED;
@Service @Service
@Slf4j @Slf4j
public class TDengineService { public class TDengineService {
@ -137,7 +145,7 @@ public class TDengineService {
jdbcTemplate.execute("USE besure"); jdbcTemplate.execute("USE besure");
String tableName = "d_" + id; String tableName = "d_" + id;
String sql = "INSERT INTO " + tableName + " (ts, query_data) VALUES (?, ?)"; String sql = "INSERT INTO besure." + tableName + " (ts, query_data) VALUES (?, ?)";
return jdbcTemplate.update(sql, ps -> { return jdbcTemplate.update(sql, ps -> {
ps.setTimestamp(1, timestamp); ps.setTimestamp(1, timestamp);
@ -725,4 +733,451 @@ public class TDengineService {
} }
//=================================修改数据后新增方法===============================
@DS("tdengine")
public void createTdengineTable(Long deviceId, List<DeviceContactModelDO> contactModelList) {
if (deviceId == null) {
return;
}
// 1. 表名
String tableName = "d_" + deviceId;
// 2. 构建列SQLTDengine必须有 ts
StringBuilder columnsSql = new StringBuilder("ts TIMESTAMP");
// 3. 遍历 contactModelList如果为空则不执行循环表只含 ts
if (contactModelList != null && !contactModelList.isEmpty()) {
for (DeviceContactModelDO contact : contactModelList) {
String attributeCode = contact.getAttributeCode();
String dataType = contact.getDataType();
if (attributeCode == null || dataType == null) {
continue;
}
// 使用枚举获取 TDengine 类型
String tdType = JavaToTdengineTypeEnum.getTdTypeByJavaType(dataType);
if (tdType == null) {
tdType = "DOUBLE"; // 默认使用 DOUBLE
}
// 拼接列
columnsSql.append(", ").append(attributeCode).append(" ").append(tdType);
}
}
// 4. 构建完整 SQL
String createSql = "CREATE TABLE IF NOT EXISTS besure_server." + tableName + " ("
+ columnsSql.toString()
+ ")";
// 5. 使用 JdbcTemplate 执行 SQL
try {
jdbcTemplate.execute(createSql);
log.info("TDengine 表创建成功: {}", tableName);
} catch (Exception e) {
log.error("TDengine 表创建失败: {}", tableName, e);
throw exception(TABLE_CREATION_FAILED);
}
}
/**
* td
* @param deviceId
* @param deviceContactModel
*/
@DS("tdengine")
public void AddTDDatabaseColumn(Long deviceId, DeviceContactModelDO deviceContactModel) {
if (deviceId == null || deviceContactModel == null) {
return;
}
String attributeCode = deviceContactModel.getAttributeCode();
String dataType = deviceContactModel.getDataType();
if (attributeCode == null || dataType == null) {
return;
}
// 表名
String tableName = "besure_server.d_" + deviceId;
// Java类型 -> TDengine类型
String tdType = JavaToTdengineTypeEnum.getTdTypeByJavaType(dataType);
if (tdType == null) {
tdType = "DOUBLE";
}
// ALTER TABLE SQL
String alterSql = "ALTER TABLE " + tableName
+ " ADD COLUMN " + attributeCode + " " + tdType;
try {
jdbcTemplate.execute(alterSql);
log.info("TDengine 表新增列成功: table={}, column={}, type={}",
tableName, attributeCode, tdType);
} catch (Exception e) {
// TDengine 如果列已存在会报错,这里可以忽略
// if (e.getMessage() != null && e.getMessage().contains("duplicated column name")) {
// log.warn("列已存在,跳过: table={}, column={}", tableName, attributeCode);
// return;
// }
log.error("TDengine 表新增列失败: table={}, column={}", tableName, attributeCode, e);
throw exception(COLOUMN_CREATION_FAILED);
}
}
/**
*
* @param deviceId
* @param dataList
* @return
*/
@DS("tdengine")
public boolean newInsertDeviceData(Long deviceId, List<DeviceContactModelDO> dataList) {
if (deviceId == null || dataList == null || dataList.isEmpty()) {
return false;
}
// 表名
String tableName = "besure_server.d_" + deviceId;
// 列名构建
StringBuilder columnBuilder = new StringBuilder("ts");
// 值占位符
StringBuilder valueBuilder = new StringBuilder("?");
// 参数列表
List<Object> params = new ArrayList<>();
// TDengine 必须字段
params.add(new Timestamp(System.currentTimeMillis()));
for (DeviceContactModelDO model : dataList) {
if (model == null) {
continue;
}
String column = model.getAttributeCode();
Object valueObj = model.getAddressValue();
String javaType = model.getDataType();
// 1. 基础校验
if (column == null || column.trim().isEmpty()) {
continue;
}
if (javaType == null || javaType.trim().isEmpty()) {
javaType = "double";
}
// 防止 SQL 注入(只允许字母数字下划线)
if (!column.matches("^[a-zA-Z0-9_]+$")) {
log.warn("非法列名,跳过: {}", column);
continue;
}
try {
// 2. 自动新增列
// try {
// AddTDDatabaseColumn(deviceId, model);
// } catch (Exception ignored) {
// // 忽略已存在异常
// }
// 3. 值为空 -> 插入 NULL
Object convertedValue = null;
if (valueObj != null) {
String value = valueObj.toString();
if (!value.isEmpty()) {
convertedValue =
JavaToTdengineTypeEnum.convertValue(javaType, value);
}
}
// 4. 拼接 SQL
columnBuilder.append(", ").append(column);
valueBuilder.append(", ?");
params.add(convertedValue);
} catch (Exception e) {
log.error("TDengine 数据处理失败: column={}, value={}, type={}",
column, valueObj, javaType, e);
}
}
// 没有有效字段不插入
if (params.size() <= 1) {
log.warn("TDengine 插入数据为空,跳过 deviceId={}", deviceId);
return true;
}
String sql =
"INSERT INTO "
+ tableName
+ " (" + columnBuilder + ") VALUES (" + valueBuilder + ")";
try {
jdbcTemplate.update(sql, params.toArray());
log.info("TDengine 插入成功: table={}, columnCount={}",
tableName, params.size() - 1);
} catch (Exception e) {
log.error("TDengine 插入失败: table={}, sql={}", tableName, sql, e);
return false;
}
return true;
}
/**
* deviceId
* @param deviceIds
* @return
*/
@DS("tdengine")
public Map<Long, LocalDateTime> newSelectLatestTsBatch(List<Long> deviceIds) {
Map<Long, LocalDateTime> result = new HashMap<>();
for (Long deviceId : deviceIds) {
String tableName = "besure_server.d_" + deviceId;
String sql = "SELECT ts FROM " + tableName + " ORDER BY ts DESC LIMIT 1";
try {
LocalDateTime ts = jdbcTemplate.queryForObject(sql, (rs, rowNum) -> rs.getTimestamp("ts").toLocalDateTime());
result.put(deviceId, ts);
} catch (Exception e) {
// 表不存在或者查询失败
result.put(deviceId, null);
log.warn("设备: {}, 获取 ts 失败: {}", deviceId, e.getMessage());
}
}
return result;
}
/**
*
* @param deviceId
* @return
*/
@DS("tdengine")
public Map<String, Object> newSelectLatestRow(Long deviceId) {
if (deviceId == null) {
return null;
}
String tableName = "besure_server.d_" + deviceId;
String sql = "SELECT * FROM " + tableName + " ORDER BY ts DESC LIMIT 1";
try {
List<Map<String, Object>> list = jdbcTemplate.queryForList(sql);
if (list == null || list.isEmpty()) {
return null;
}
Map<String, Object> row = list.get(0);
// 转换 ts 为 LocalDateTime推荐
Object tsObj = row.get("ts");
if (tsObj instanceof Timestamp) {
row.put("ts", ((Timestamp) tsObj).toLocalDateTime());
}
log.debug("设备 {} 最新数据: {}", deviceId, row);
return row;
} catch (Exception e) {
log.warn("设备 {} 查询最新数据失败: {}", deviceId, e.getMessage());
return null;
}
}
/**
*
*
* @param deviceId ID
* @param startTime "2026-02-09 00:00:00"
* @param endTime
* @param page 1
* @param pageSize
* @return List<Map<String,Object>> ts +
*/
@DS("tdengine")
public List<Map<String, Object>> newSelectBatchPage(Long deviceId, String startTime, String endTime,
Integer page, Integer pageSize) {
if (deviceId == null || page == null || pageSize == null) {
return Collections.emptyList();
}
String tableName = "besure_server.d_" + deviceId;
// 计算分页偏移量
int offset = (page - 1) * pageSize;
// 构建 SQL
StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("SELECT * FROM ").append(tableName)
.append(" WHERE 1=1 ");
if (StringUtils.isNotBlank(startTime)) {
sqlBuilder.append(" AND ts >= '").append(startTime).append("' ");
}
if (StringUtils.isNotBlank(endTime)) {
sqlBuilder.append(" AND ts <= '").append(endTime).append("' ");
}
sqlBuilder.append(" ORDER BY ts DESC ")
.append(" LIMIT ").append(pageSize)
.append(" OFFSET ").append(offset);
String sql = sqlBuilder.toString();
try {
List<Map<String, Object>> list = jdbcTemplate.queryForList(sql);
// 如果 ts 需要转换为 LocalDateTime
for (Map<String, Object> row : list) {
Object tsObj = row.get("ts");
if (tsObj instanceof Timestamp) {
row.put("ts", ((Timestamp) tsObj).toLocalDateTime());
}
}
return list;
} catch (EmptyResultDataAccessException e) {
return Collections.emptyList();
} catch (Exception e) {
log.error("查询设备 {} 功率数据异常", deviceId, e);
return Collections.emptyList();
}
}
/**
*
*
* @param deviceId ID
* @param startTime
* @param endTime
* @return
*/
@DS("tdengine")
public long newSelectBatchTotal(Long deviceId, String startTime, String endTime) {
if (deviceId == null) {
return 0;
}
String tableName = "besure_server.d_" + deviceId;
StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("SELECT COUNT(*) AS total FROM ").append(tableName).append(" WHERE 1=1 ");
if (StringUtils.isNotBlank(startTime)) {
sqlBuilder.append(" AND ts >= '").append(startTime).append("' ");
}
if (StringUtils.isNotBlank(endTime)) {
sqlBuilder.append(" AND ts <= '").append(endTime).append("' ");
}
String sql = sqlBuilder.toString();
try {
// 查询单行结果
Map<String, Object> result = jdbcTemplate.queryForMap(sql);
Object totalObj = result.get("total");
if (totalObj instanceof Number) {
return ((Number) totalObj).longValue();
} else {
return 0;
}
} catch (EmptyResultDataAccessException e) {
return 0;
} catch (Exception e) {
log.error("查询设备 {} 数据总条数异常", deviceId, e);
return 0;
}
}
/**
* N
*
* @param deviceId ID
* @param startTime
* @param endTime
* @param limit
* @return Map<String,Object>
*/
@DS("tdengine")
public List<Map<String, Object>> newSelectLatestData(Long deviceId, String startTime, String endTime, int limit) {
if (deviceId == null || limit <= 0) {
return Collections.emptyList();
}
String tableName = "besure_server.d_" + deviceId;
StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("SELECT * FROM ").append(tableName).append(" WHERE 1=1 ");
if (StringUtils.isNotBlank(startTime)) {
sqlBuilder.append(" AND ts >= '").append(startTime).append("' ");
}
if (StringUtils.isNotBlank(endTime)) {
sqlBuilder.append(" AND ts <= '").append(endTime).append("' ");
}
sqlBuilder.append(" ORDER BY ts DESC "); // 按时间倒序
sqlBuilder.append(" LIMIT ").append(limit); // 限制条数
String sql = sqlBuilder.toString();
try {
return jdbcTemplate.queryForList(sql);
} catch (EmptyResultDataAccessException e) {
return Collections.emptyList();
} catch (Exception e) {
log.error("查询设备 {} 最新 {} 条数据异常", deviceId, limit, e);
return Collections.emptyList();
}
}
} }

@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.iot.service.devicecontactmodel;
import cn.iocoder.yudao.module.iot.dal.dataobject.devicecontactmodel.DeviceContactModelDO; import cn.iocoder.yudao.module.iot.dal.dataobject.devicecontactmodel.DeviceContactModelDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.devicemodelattribute.DeviceModelAttributeDO; import cn.iocoder.yudao.module.iot.dal.dataobject.devicemodelattribute.DeviceModelAttributeDO;
import cn.iocoder.yudao.module.iot.dal.mysql.deviceattributetype.DeviceAttributeTypeMapper; import cn.iocoder.yudao.module.iot.dal.mysql.deviceattributetype.DeviceAttributeTypeMapper;
import cn.iocoder.yudao.module.iot.service.device.TDengineService;
import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.Resource; import javax.annotation.Resource;
@ -34,6 +35,8 @@ public class DeviceContactModelServiceImpl implements DeviceContactModelService
private DeviceContactModelMapper deviceContactModelMapper; private DeviceContactModelMapper deviceContactModelMapper;
@Resource @Resource
private DeviceAttributeTypeMapper deviceAttributeTypeMapper; private DeviceAttributeTypeMapper deviceAttributeTypeMapper;
@Resource
private TDengineService tDengineService;
@Override @Override
public Long createDeviceContactModel(DeviceContactModelSaveReqVO createReqVO) { public Long createDeviceContactModel(DeviceContactModelSaveReqVO createReqVO) {
@ -53,6 +56,10 @@ public class DeviceContactModelServiceImpl implements DeviceContactModelService
DeviceContactModelDO deviceContactModel = BeanUtils.toBean(createReqVO, DeviceContactModelDO.class); DeviceContactModelDO deviceContactModel = BeanUtils.toBean(createReqVO, DeviceContactModelDO.class);
// deviceContactModel.setTypeName(deviceAttributeTypeMapper.selectById(createReqVO.getAttributeCode()).getName()); // deviceContactModel.setTypeName(deviceAttributeTypeMapper.selectById(createReqVO.getAttributeCode()).getName());
deviceContactModelMapper.insert(deviceContactModel); deviceContactModelMapper.insert(deviceContactModel);
//新增td数据库列
tDengineService.AddTDDatabaseColumn(createReqVO.getDeviceId(),deviceContactModel);
// 返回 // 返回
return deviceContactModel.getId(); return deviceContactModel.getId();
} }

@ -161,19 +161,21 @@ import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.*;
} }
@Override @Override
public List<Map<String, Object>> operationAnalysisDetails(Long deviceId, Long modelId,String collectionStartTime, String collectionEndTime) { public List<Map<String, Object>> operationAnalysisDetails(Long deviceId, Long modelId,
if(deviceId == null){ String collectionStartTime, String collectionEndTime) {
if (deviceId == null) {
throw exception(DEVICE_ID_DOES_NOT_EXIST); throw exception(DEVICE_ID_DOES_NOT_EXIST);
} }
if(modelId == null){ if (modelId == null) {
throw exception(POINT_ID_MODEL_NOT_EXISTS); throw exception(POINT_ID_MODEL_NOT_EXISTS);
} }
List<Map<String, Object>> resultList = new ArrayList<>(); List<Map<String, Object>> resultList = new ArrayList<>();
try { try {
// 1. 获取最新10条设备数据列表 // 1. 获取最新10条设备数据列表直接从TDengine获取原始字段
List<Map<String, Object>> deviceDataList = tdengineService.getstDeviceDataOrderByTimeDesc(deviceId,collectionStartTime,collectionEndTime,10); List<Map<String, Object>> deviceDataList =
tdengineService.newSelectLatestData(deviceId, collectionStartTime, collectionEndTime, 10);
// 2. 获取属性类型映射 // 2. 获取属性类型映射
Map<Long, String> idToNameMap = deviceAttributeTypeMapper.selectList() Map<Long, String> idToNameMap = deviceAttributeTypeMapper.selectList()
@ -185,66 +187,42 @@ import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.*;
// 3. 遍历每个时间点的数据 // 3. 遍历每个时间点的数据
for (Map<String, Object> deviceData : deviceDataList) { for (Map<String, Object> deviceData : deviceDataList) {
String queryDataJson = (String) deviceData.get("queryData"); Timestamp timestamp = (Timestamp) deviceData.get("ts"); // TDengine 时间列
Timestamp timestamp = (Timestamp) deviceData.get("timestamp"); if (timestamp == null) continue;
SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd HH:mm:ss");
String formattedTime = sdf.format(timestamp); // 例如 "26-01-06 14:30:45"
if (StringUtils.isNotBlank(queryDataJson) && timestamp != null) {
List<Map<String, Object>> dataList = new ObjectMapper().readValue(
queryDataJson,
new TypeReference<List<Map<String, Object>>>() {}
);
// 按属性类型分组
Map<String, List<Map<String, Object>>> groupedData = new LinkedHashMap<>();
for (Map<String, Object> data : dataList) {
if (((Integer) data.get("id")).longValue() == modelId){
SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd HH:mm:ss");
String formattedTime = sdf.format(timestamp);
String attributeTypeName = "其他"; Map<String, Object> timePointData = new LinkedHashMap<>();
String typeStr = (String) data.get("attributeType"); timePointData.put("collectTime", formattedTime);
if (typeStr != null) {
try {
attributeTypeName = typeStr;
} catch (Exception e) {
attributeTypeName = "未知";
}
}
Map<String, Object> simplifiedData = new HashMap<>();
simplifiedData.put("addressValue", data.get("addressValue"));
simplifiedData.put("attributeName", data.get("attributeName"));
groupedData
.computeIfAbsent(attributeTypeName, k -> new ArrayList<>())
.add(simplifiedData);
}
}
// 创建当前时间点的Map // 遍历所有字段(除 ts 之外)
Map<String, Object> timePointData = new LinkedHashMap<>(); Map<String, List<Map<String, Object>>> groupedData = new LinkedHashMap<>();
for (Map.Entry<String, Object> entry : deviceData.entrySet()) {
String key = entry.getKey();
if ("ts".equalsIgnoreCase(key)) continue; // 跳过时间字段
// 添加属性分组 Map<String, Object> simplifiedData = new HashMap<>();
for (Map.Entry<String, List<Map<String, Object>>> entry : groupedData.entrySet()) { simplifiedData.put("attributeName", key);
timePointData.put(entry.getKey(), entry.getValue()); simplifiedData.put("addressValue", entry.getValue());
}
// 添加收集时间 // 根据 modelId 或 attributeType 分类,这里暂时按 "其他" 分组
timePointData.put("collectTime", formattedTime); groupedData.computeIfAbsent("其他", k -> new ArrayList<>()).add(simplifiedData);
}
resultList.add(timePointData); // 添加属性分组
for (Map.Entry<String, List<Map<String, Object>>> entry : groupedData.entrySet()) {
timePointData.put(entry.getKey(), entry.getValue());
} }
resultList.add(timePointData);
} }
} catch (Exception e) { } catch (Exception e) {
log.error(e.getMessage()); log.error("处理设备 {} 数据时发生异常", deviceId, e);
// e.printStackTrace();
// throw new RuntimeException("处理设备数据时发生异常", e);
return new ArrayList<>(); return new ArrayList<>();
} }
return resultList; return resultList;
} }

@ -146,4 +146,21 @@
where machine_id = #{deviceId} where machine_id = #{deviceId}
and deleted = 0 and deleted = 0
</select> </select>
<select id="selectWorkshopBatch" resultType="map">
SELECT deviceId, workshop
FROM (
SELECT dv_id AS deviceId,
workshop,
ROW_NUMBER() OVER (PARTITION BY dv_id ORDER BY id DESC) AS rn
FROM mes_device_ledger
WHERE deleted = 0
AND dv_id IN
<foreach collection="deviceIds" item="id" open="(" separator="," close=")">
#{id}
</foreach>
) t
WHERE rn = 1
</select>
</mapper> </mapper>

@ -56,7 +56,7 @@ spring:
tdengine: tdengine:
name: tdengine name: tdengine
url: jdbc:TAOS-RS://ngsk.tech:16042/besure?charset=UTF-8&locale=en_US.UTF-8 url: jdbc:TAOS-RS://ngsk.tech:16042/besure_server?charset=UTF-8&locale=en_US.UTF-8
username: root username: root
password: taosdata password: taosdata
driver-class-name: com.taosdata.jdbc.rs.RestfulDriver driver-class-name: com.taosdata.jdbc.rs.RestfulDriver
@ -88,7 +88,7 @@ spring:
# Quartz 配置项,对应 QuartzProperties 配置类 # Quartz 配置项,对应 QuartzProperties 配置类
spring: spring:
quartz: quartz:
auto-startup: true # 本地开发环境,尽量不要开启 Job auto-startup: false # 本地开发环境,尽量不要开启 Job
scheduler-name: schedulerName # Scheduler 名字。默认为 schedulerName scheduler-name: schedulerName # Scheduler 名字。默认为 schedulerName
job-store-type: jdbc # Job 存储器类型。默认为 memory 表示内存,可选 jdbc 使用数据库。 job-store-type: jdbc # Job 存储器类型。默认为 memory 表示内存,可选 jdbc 使用数据库。
wait-for-jobs-to-complete-on-shutdown: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true wait-for-jobs-to-complete-on-shutdown: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true
@ -266,10 +266,10 @@ justauth:
emqx: emqx:
is-enable: true # 是否启用 MQTT is-enable: true # 是否启用 MQTT
broker: tcp://192.168.5.119:1883 # EMQX 服务器地址TCP 协议) broker: tcp://47.106.185.127:1883 # EMQX 服务器地址TCP 协议)
client-id: mqtt-client-besure_server-dev # 客户端ID client-id: mqtt-client-besure_server-dev # 客户端ID
user-name: ngsk # 用户名 user-name: admin # 用户名
password: ngskcloud0809 # 密码 password: admin # 密码
clean-session: true # 是否清空 session clean-session: true # 是否清空 session
reconnect: true # 是否自动断线重连 reconnect: true # 是否自动断线重连
timeout: 30 # 连接超时时间(秒) timeout: 30 # 连接超时时间(秒)

@ -56,7 +56,7 @@ spring:
tdengine: tdengine:
name: tdengine name: tdengine
url: jdbc:TAOS-RS://192.168.21.2:6041/besure?charset=UTF-8&locale=en_US.UTF-8 url: jdbc:TAOS-RS://192.168.21.2:6041/besure_server?charset=UTF-8&locale=en_US.UTF-8
username: root username: root
password: taosdata password: taosdata
driver-class-name: com.taosdata.jdbc.rs.RestfulDriver driver-class-name: com.taosdata.jdbc.rs.RestfulDriver
@ -77,11 +77,11 @@ spring:
# Redis 配置。Redisson 默认的配置足够使用,一般不需要进行调优 # Redis 配置。Redisson 默认的配置足够使用,一般不需要进行调优
redis: redis:
host: 192.168.21.2 # 地址 host: 47.106.185.127 # 地址
port: 6379 # 端口 port: 6379 # 端口
database: 0 # 数据库索引 database: 0 # 数据库索引
#password: bkcaydy8ydhZZnS2 # 密码,建议生产环境开启 #password: bkcaydy8ydhZZnS2 # 密码,建议生产环境开启
password: redis123 password: BstPwd258
--- #################### 定时任务相关配置 #################### --- #################### 定时任务相关配置 ####################
@ -265,10 +265,10 @@ justauth:
emqx: emqx:
is-enable: true # 是否启用 MQTT is-enable: true # 是否启用 MQTT
broker: tcp://192.168.21.2:1883 # EMQX 服务器地址TCP 协议) broker: tcp://47.106.185.127:1883 # EMQX 服务器地址TCP 协议)
client-id: mqtt-client-besure-server-prod # 客户端ID client-id: mqtt-client-besure_server-prod # 客户端ID
user-name: admin # 用户名 user-name: admin # 用户名
password: admin # 密码 password: admin # 密码
clean-session: true # 是否清空 session clean-session: true # 是否清空 session
reconnect: true # 是否自动断线重连 reconnect: true # 是否自动断线重连
timeout: 30 # 连接超时时间(秒) timeout: 30 # 连接超时时间(秒)

Loading…
Cancel
Save