@ -14,6 +14,7 @@ import cn.iocoder.yudao.module.iot.controller.admin.devicecontactmodel.vo.Device
import cn.iocoder.yudao.module.iot.controller.admin.devicemodelattribute.vo.DeviceModelAttributePageReqVO ;
import cn.iocoder.yudao.module.iot.controller.admin.mqttdatarecord.vo.MqttDataRecordPageReqVO ;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.DeviceDO ;
import cn.iocoder.yudao.module.iot.dal.dataobject.deviceattributetype.DeviceAttributeTypeDO ;
import cn.iocoder.yudao.module.iot.dal.dataobject.devicemodel.DeviceModelDO ;
import cn.iocoder.yudao.module.iot.dal.dataobject.devicemodelattribute.DeviceModelAttributeDO ;
import cn.iocoder.yudao.module.iot.dal.dataobject.mqttdatarecord.MqttDataRecordDO ;
@ -28,10 +29,18 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.device.DeviceDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.DeviceAttributeDO ;
import cn.iocoder.yudao.module.iot.dal.mysql.device.DeviceMapper ;
import cn.iocoder.yudao.module.iot.dal.mysql.device.DeviceAttributeMapper ;
import com.alibaba.fastjson.JSON ;
import com.baomidou.dynamic.datasource.annotation.DS ;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper ;
import com.baomidou.mybatisplus.core.metadata.IPage ;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page ;
import com.fasterxml.jackson.core.JsonProcessingException ;
import com.fasterxml.jackson.core.type.TypeReference ;
import com.fasterxml.jackson.databind.DeserializationFeature ;
import com.fasterxml.jackson.databind.ObjectMapper ;
import com.fasterxml.jackson.databind.SerializationFeature ;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule ;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer ;
import org.apache.commons.lang3.StringUtils ;
import org.springframework.beans.factory.annotation.Qualifier ;
import org.springframework.jdbc.core.JdbcTemplate ;
@ -40,8 +49,12 @@ import org.springframework.transaction.annotation.Transactional;
import org.springframework.validation.annotation.Validated ;
import javax.annotation.Resource ;
import java.sql.Timestamp ;
import java.text.SimpleDateFormat ;
import java.time.LocalDateTime ;
import java.time.format.DateTimeFormatter ;
import java.util.* ;
import java.util.stream.Collectors ;
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception ;
import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.* ;
@ -79,9 +92,12 @@ public class DeviceServiceImpl implements DeviceService {
@Resource
private TDengineService tdengineService ;
@Resource
private DeviceAttributeTypeMapper deviceAttributeTypeMapper ;
@Override
@Transactional ( rollbackFor = Exception . class )
public Long createDevice ( DeviceSaveReqVO createReqVO ) {
public DeviceDO createDevice ( DeviceSaveReqVO createReqVO ) {
if ( StringUtils . isNotBlank ( createReqVO . getReadTopic ( ) ) ) {
DeviceDO temp = deviceMapper . selectByTopic ( createReqVO . getReadTopic ( ) ) ;
if ( temp ! = null ) {
@ -94,6 +110,8 @@ public class DeviceServiceImpl implements DeviceService {
// 插入
DeviceDO device = BeanUtils . toBean ( createReqVO , DeviceDO . class ) ;
device . setProtocol ( deviceModelDO ! = null ? deviceModelDO . getProtocol ( ) : "" ) ;
//租户ID
device . setTenantId ( "1" ) ;
deviceMapper . insert ( device ) ;
LambdaQueryWrapper < DeviceModelAttributeDO > lambdaQueryWrapper = new LambdaQueryWrapper < > ( ) ;
@ -115,55 +133,8 @@ public class DeviceServiceImpl implements DeviceService {
//创建时序数据库
// createTDengine(device.getId());
tdengineService . initDatabaseAndTable ( device . getId ( ) ) ;
// 返回
return device . getId ( ) ;
}
@DS ( "tdengine" )
public void createTDengine ( Long id ) {
try {
// 测试TDengine连接
String testSQL = "SELECT 1" ;
tdengineJdbcTemplate . queryForObject ( testSQL , Integer . class ) ;
System . out . println ( "TDengine连接正常" ) ;
} catch ( Exception e ) {
throw new RuntimeException ( "无法连接到TDengine, 请检查数据源配置" , e ) ;
}
try {
// 创建数据库 - 使用更兼容的语法[1,6](@ref)
String createDbSQL = "CREATE DATABASE IF NOT EXISTS besure KEEP 365 DURATION 30" ;
tdengineJdbcTemplate . execute ( createDbSQL ) ;
// 使用数据库
tdengineJdbcTemplate . execute ( "USE besure" ) ;
// 创建超级表
String createSuperTableSQL = "CREATE STABLE IF NOT EXISTS device_data (" +
"ts TIMESTAMP, " +
"query_data NCHAR(2048)" +
") TAGS (device_id BIGINT)" ;
tdengineJdbcTemplate . execute ( createSuperTableSQL ) ;
// 创建子表
String tableName = "d_" + id ;
String createTableSql = String . format (
"CREATE TABLE IF NOT EXISTS %s USING device_data TAGS(%d)" ,
tableName , id ) ;
tdengineJdbcTemplate . execute ( createTableSql ) ;
System . out . println ( "TDengine表创建成功: " + tableName ) ;
} catch ( Exception e ) {
System . err . println ( "TDengine操作失败: " + e . getMessage ( ) ) ;
e . printStackTrace ( ) ;
throw exception ( CREATE_TDENGINE_FAILURE ) ;
}
return device ;
}
//@Scheduled(cron="0/5 * * * * ? ") //每1秒执行一次
@ -260,11 +231,108 @@ public class DeviceServiceImpl implements DeviceService {
deviceModelAttributePageReqVO . setDeviceId ( device . getId ( ) ) ;
// 判断设备模型ID是否有效
PageResult < DeviceContactModelDO > deviceModelAttributeDOPageResult = deviceContactModelMapper . selectPageById ( pageReqVO , deviceModelAttributePageReqVO ) ;
Map < Long , Map < String , Object > > deviceDataMap = createDeviceDataMap ( device . getId ( ) ) ;
// 合并数据:将 deviceDataMap 的值赋给分页结果中的对应记录
List < DeviceContactModelDO > records = deviceModelAttributeDOPageResult . getList ( ) ;
for ( DeviceContactModelDO record : records ) {
Map < String , Object > data = deviceDataMap . get ( record . getId ( ) ) ;
if ( data ! = null ) {
record . setAddressValue ( data . get ( "addressValue" ) ) ; // 设置 addressValue
record . setLatestCollectionTime ( ( String ) data . get ( "timestamp" ) ) ; // 设置 latestCollectionTime
}
}
return deviceModelAttributeDOPageResult ;
}
public Map < Long , Map < String , Object > > createDeviceDataMap ( Long deviceId ) {
// 创建结果Map: 键为数据记录ID (Long),值为该条记录的详细信息 (Map<String, Object>)
Map < Long , Map < String , Object > > resultMap = new HashMap < > ( ) ;
// 1. 从TDengine获取设备的最新数据记录
Map < String , Object > latestDeviceData = tdengineService . getLatestDeviceData ( deviceId ) ;
if ( latestDeviceData = = null ) {
return resultMap ; // 如果没有数据, 返回空Map
}
try {
// 2. 解析queryData字段中的JSON数组, 它包含多条数据记录
String queryDataJson = ( String ) latestDeviceData . get ( "queryData" ) ;
if ( queryDataJson ! = null & & ! queryDataJson . isEmpty ( ) ) {
List < DeviceContactModelDO > dataRecords = JSON . parseArray ( queryDataJson , DeviceContactModelDO . class ) ;
Timestamp ts = null ;
String formattedTime = null ;
Object timestampObj = latestDeviceData . get ( "timestamp" ) ;
if ( timestampObj ! = null ) {
if ( timestampObj instanceof Timestamp ) {
// 如果已经是Timestamp类型, 直接转换
ts = ( Timestamp ) timestampObj ;
} else if ( timestampObj instanceof String ) {
// 如果是String类型, 需要解析
String timestampStr = ( String ) timestampObj ;
try {
// 假设字符串是时间戳格式( 如: 2023-10-01 10:20:30)
ts = Timestamp . valueOf ( timestampStr ) ;
} catch ( IllegalArgumentException e ) {
// 如果格式不正确,尝试其他解析方式
System . err . println ( "时间戳格式不正确: " + timestampStr ) ;
// 可以设置默认值或使用当前时间
ts = new Timestamp ( System . currentTimeMillis ( ) ) ;
}
}
if ( ts ! = null ) {
SimpleDateFormat sdf = new SimpleDateFormat ( "yyyy-MM-dd HH:mm:ss" ) ;
formattedTime = sdf . format ( ts ) ;
}
}
// 4. 遍历每一条数据记录
for ( DeviceContactModelDO record : dataRecords ) {
// 创建用于存储单条记录详细信息的Map
Map < String , Object > recordInfoMap = new HashMap < > ( ) ;
// 4.1 放入记录的基础信息
recordInfoMap . put ( "deviceId" , deviceId ) ; // 设备ID
recordInfoMap . put ( "timestamp" , formattedTime ) ; // 时间戳
recordInfoMap . put ( "tableName" , "d_" + deviceId ) ; // 源表名
// 4.2 放入从JSON记录中解析出的具体数据
recordInfoMap . put ( "addressValue" , record . getAddressValue ( ) ) ;
// 可以根据需要放入其他字段,例如:
// recordInfoMap.put("address", record.getAddress());
// recordInfoMap.put("name", record.getName());
// 5. 关键步骤: 以数据记录自身的ID为键, 将其详细信息放入结果Map
// 这里假设 DeviceContactModelDO 有一个唯一标识的id字段
Long recordId = record . getId ( ) ;
if ( recordId ! = null ) {
resultMap . put ( recordId , recordInfoMap ) ;
} else {
// 如果记录没有ID, 可以记录日志或使用其他策略( 如生成临时ID) , 这里简单跳过
System . err . println ( "警告: 发现一条数据记录缺少ID, 已跳过。" ) ;
}
}
}
} catch ( Exception e ) {
// 异常处理
System . err . println ( "处理设备" + deviceId + "的数据时发生异常: " + e . getMessage ( ) ) ;
// 可以选择在异常时返回空Map, 或包含错误信息的特殊Map, 根据业务需求决定
}
return resultMap ;
}
@Override
public Long createDeviceAttribute ( DeviceAttributeDO deviceAttribute ) {
deviceAttributeMapper . insert ( deviceAttribute ) ;
@ -303,6 +371,33 @@ public class DeviceServiceImpl implements DeviceService {
if ( connected ) {
deviceDO . setStatus ( String . valueOf ( DeviceConnectionStatusEnum . CONNECTED . getStatus ( ) ) ) ;
deviceMapper . updateById ( deviceDO ) ;
//查询存储
LambdaQueryWrapper < DeviceContactModelDO > deviceModelAttributeLambdaQueryWrapper = new LambdaQueryWrapper < > ( ) ;
deviceModelAttributeLambdaQueryWrapper . eq ( DeviceContactModelDO : : getDeviceId , createReqVO . getId ( ) ) ;
List < DeviceContactModelDO > deviceContactModelDOS = deviceContactModelMapper . selectList ( deviceModelAttributeLambdaQueryWrapper ) ;
//连接后查询5次保存到数据库
for ( int i = 0 ; i < 3 ; i + + ) {
if ( deviceContactModelDOS ! = null & & deviceContactModelDOS . size ( ) > 0 ) {
for ( DeviceContactModelDO deviceContactModelDO : deviceContactModelDOS ) {
Object addressValue = OpcUtils . readValue ( deviceContactModelDO . getAddress ( ) ! = null ? deviceContactModelDO . getAddress ( ) : "" ) ;
deviceContactModelDO . setAddressValue ( addressValue ) ;
}
String json = JSON . toJSONString ( deviceContactModelDOS ) ;
tdengineService . insertDeviceData ( createReqVO . getId ( ) , json ) ;
}
}
} else {
throw exception ( OPC_CONNECT_FAILURE_DOES_NOT_EXIST ) ;
}
@ -318,6 +413,15 @@ public class DeviceServiceImpl implements DeviceService {
throw exception ( OPC_PARAMETER_DOES_NOT_EXIST ) ;
}
return Boolean . TRUE ;
}
@ -384,9 +488,15 @@ public class DeviceServiceImpl implements DeviceService {
Page < LineDeviceRespVO > page = new Page < > ( pageReqVO . getPageNo ( ) , pageReqVO . getPageSize ( ) ) ;
IPage < LineDeviceRespVO > lineDeviceRespVO = deviceMapper . lineDevicePage ( page , pageReqVO ) ;
List < LineDeviceRespVO > records = lineDeviceRespVO . getRecords ( ) ;
for ( LineDeviceRespVO record : records ) {
Map < String , Object > latestDeviceData = tdengineService . getLatestDeviceData ( record . getDeviceId ( ) ) ;
if ( latestDeviceData ! = null ) {
record . setCollectionTime ( ( String ) latestDeviceData . get ( "timestamp" ) ) ;
}
}
PageResult < LineDeviceRespVO > lineDeviceRespVOPageResult = new PageResult < > ( lineDeviceRespVO . getRecords ( ) , lineDeviceRespVO . getTotal ( ) ) ;
@ -395,6 +505,92 @@ public class DeviceServiceImpl implements DeviceService {
}
@Override
public Map < String , List < DeviceContactModelDO > > singleDevice ( Long deviceId ) throws JsonProcessingException {
List < DeviceContactModelDO > resultList = new ArrayList < > ( ) ;
try {
// 获取设备数据列表
List < Map < String , Object > > deviceDataList = tdengineService . getAllDeviceDataOrderByTimeDesc ( deviceId ) ;
for ( Map < String , Object > deviceData : deviceDataList ) {
String queryDataJson = ( String ) deviceData . get ( "queryData" ) ;
Timestamp timestamp = ( Timestamp ) deviceData . get ( "timestamp" ) ;
if ( queryDataJson ! = null & & ! queryDataJson . isEmpty ( ) ) {
ObjectMapper objectMapper = new ObjectMapper ( ) ;
// 简化配置,只注册基础模块
objectMapper . registerModule ( new JavaTimeModule ( ) ) ;
objectMapper . disable ( SerializationFeature . WRITE_DATES_AS_TIMESTAMPS ) ;
// 忽略未知属性,避免因缺少字段而报错
objectMapper . configure ( DeserializationFeature . FAIL_ON_UNKNOWN_PROPERTIES , false ) ;
// 解析JSON数组为对象列表
List < DeviceContactModelDO > models = objectMapper . readValue (
queryDataJson ,
new TypeReference < List < DeviceContactModelDO > > ( ) { }
) ;
// 可以为每个对象设置时间戳(如果需要)
for ( DeviceContactModelDO model : models ) {
// 设置查询时间戳
model . setLatestCollectionTime ( String . valueOf ( timestamp ) ) ;
resultList . add ( model ) ;
}
}
}
} catch ( Exception e ) {
System . out . println ( "处理设备数据时发生异常: " + e . getMessage ( ) ) ;
}
List < DeviceAttributeTypeDO > deviceAttributeTypeDOS = deviceAttributeTypeMapper . selectList ( ) ;
// 最基本的转换方式
Map < Long , String > idToNameMap = deviceAttributeTypeDOS . stream ( )
. collect ( Collectors . toMap ( DeviceAttributeTypeDO : : getId , DeviceAttributeTypeDO : : getName ) ) ;
// 分组并排序
Map < String , List < DeviceContactModelDO > > groupedAndSorted = resultList . stream ( )
. collect ( Collectors . groupingBy (
// 处理attributeType为null的情况, 设为"其他"
item - > {
String typeStr = item . getAttributeType ( ) ;
if ( typeStr = = null ) {
return "其他" ;
}
try {
// 关键步骤:将 String 转换为 Long
Long typeLong = Long . valueOf ( typeStr ) ;
String name = idToNameMap . get ( typeLong ) ;
return ( name = = null ) ? "未知" : name ;
} catch ( NumberFormatException e ) {
// 如果字符串不能转换为Long, 则归类为"未知"
return "未知" ;
}
} , // 使用LinkedHashMap保持分组顺序( 可选)
LinkedHashMap : : new ,
// 对每个分组内的元素按latestCollectionTime倒序排序
Collectors . collectingAndThen (
Collectors . toList ( ) ,
list - > list . stream ( )
. sorted ( Comparator . comparing (
DeviceContactModelDO : : getLatestCollectionTime ,
Comparator . nullsLast ( Comparator . reverseOrder ( ) ) // 处理latestCollectionTime为null的情况
) )
. collect ( Collectors . toList ( ) )
)
) ) ;
return groupedAndSorted ;
}
private void validateDeviceAttributeExists ( Long id ) {
if ( deviceAttributeMapper . selectById ( id ) = = null ) {
throw exception ( DEVICE_ATTRIBUTE_NOT_EXISTS ) ;