Merge branch 'hhk' into main

plp
HuangHuiKang 1 month ago
commit fc2d3f4017

@ -6,6 +6,7 @@ import org.quartz.*;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.NOT_IMPLEMENTED;
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception0;
import static org.quartz.Scheduler.DEFAULT_GROUP;
/**
* {@link org.quartz.Scheduler}
@ -41,13 +42,15 @@ public class SchedulerManager {
Integer retryCount, Integer retryInterval)
throws SchedulerException {
validateScheduler();
String jobKeyName = jobHandlerName+ "_"+ jobId;
JobKey jobKey = JobKey.jobKey(jobKeyName, DEFAULT_GROUP);
// 创建 JobDetail 对象
JobDetail jobDetail = JobBuilder.newJob(JobHandlerInvoker.class)
.usingJobData(JobDataKeyEnum.JOB_ID.name(), jobId)
.usingJobData(JobDataKeyEnum.JOB_HANDLER_NAME.name(), jobHandlerName)
.withIdentity(jobHandlerName).build();
.withIdentity(jobKey).build();
// 创建 Trigger 对象
Trigger trigger = this.buildTrigger(jobHandlerName, jobHandlerParam, cronExpression, retryCount, retryInterval);
Trigger trigger = this.buildTrigger(jobKey, jobHandlerParam, cronExpression, retryCount, retryInterval);
// 新增 Job 调度
scheduler.scheduleJob(jobDetail, trigger);
}
@ -140,6 +143,18 @@ public class SchedulerManager {
.build();
}
private Trigger buildTrigger(JobKey jobKey, String jobHandlerParam, String cronExpression,
Integer retryCount, Integer retryInterval) {
return TriggerBuilder.newTrigger()
.withIdentity(jobKey.getName())
.forJob(jobKey)
.withSchedule(CronScheduleBuilder.cronSchedule(cronExpression))
.usingJobData(JobDataKeyEnum.JOB_HANDLER_PARAM.name(), jobHandlerParam)
.usingJobData(JobDataKeyEnum.JOB_RETRY_COUNT.name(), retryCount)
.usingJobData(JobDataKeyEnum.JOB_RETRY_INTERVAL.name(), retryInterval)
.build();
}
private void validateScheduler() {
if (scheduler == null) {
throw exception0(NOT_IMPLEMENTED.getCode(),

@ -61,7 +61,7 @@ public class JobServiceImpl implements JobService {
jobMapper.insert(job);
// 3.1 添加 Job 到 Quartz 中
schedulerManager.addJob(job.getId(), job.getName(), job.getHandlerParam(), job.getCronExpression(),
schedulerManager.addJob(job.getId(), job.getHandlerName(), job.getHandlerParam(), job.getCronExpression(),
createReqVO.getRetryCount(), createReqVO.getRetryInterval());
// 3.2 更新 JobDO
JobDO updateObj = JobDO.builder().id(job.getId()).status(JobStatusEnum.STOP.getStatus()).build();
@ -135,7 +135,7 @@ public class JobServiceImpl implements JobService {
JobDO job = validateJobExists(id);
// 触发 Quartz 中的 Job
schedulerManager.triggerJob(job.getId(), job.getName(), job.getHandlerParam());
schedulerManager.triggerJob(job.getId(), job.getHandlerName(), job.getHandlerParam());
}
@Override

@ -176,13 +176,22 @@ public class DeviceController {
@GetMapping("/singleDevice")
@Operation(summary = "单设备查看")
// @PreAuthorize("@ss.hasPermission('iot:device:query')")
public CommonResult<Map<String, List<DeviceContactModelDO>> > singleDevice(@RequestParam("deviceId") Long deviceId) throws JsonProcessingException {
Map<String, List<DeviceContactModelDO>> deviceContactModelDO=deviceService.singleDevice(deviceId);
public CommonResult<Map<String, List<Map<String, Object>>>> singleDevice(@RequestParam("deviceId") Long deviceId) throws JsonProcessingException {
Map<String, List<Map<String, Object>>> deviceContactModelDO=deviceService.singleDevice(deviceId);
return success(deviceContactModelDO);
}
@GetMapping("/historyRecord")
@Operation(summary = "历史记录查询")
// @PreAuthorize("@ss.hasPermission('iot:device:query')")
public CommonResult<List<Map<String, Object>>> historyRecord(@RequestParam("deviceId") Long deviceId,
@RequestParam(name = "collectionStartTime", required = false) String collectionStartTime,
@RequestParam(name = "collectionEndTime", required = false) String collectionEndTime
) throws JsonProcessingException {
List<Map<String, Object>> deviceContactModelDO=deviceService.historyRecord(deviceId,collectionStartTime,collectionEndTime);
return success(deviceContactModelDO);
}

@ -31,4 +31,10 @@ public class LineDeviceRequestVO extends PageParam {
@Schema(description = "采集时间")
private LocalDateTime collectionTime;
@Schema(description = "开始采集时间")
private String collectionStartTime;
@Schema(description = "结束采集时间")
private String collectionEndTime;
}

@ -0,0 +1,19 @@
package cn.iocoder.yudao.module.iot.controller.admin.device.vo;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
@Schema(description = "管理后台 - 单设备监控 Resp VO")
@Data
public class SingleDeviceRespVO {
@Schema(description = "点位名称", example = "26404")
private String pointName;
@Schema(description = "点位值", example = "26404")
private String pointValue;
@Schema(description = "采集时间")
private String collectTime;
}

@ -15,6 +15,7 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.device.DeviceAttributeDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.devicemodelattribute.DeviceModelAttributeDO;
import cn.iocoder.yudao.module.iot.dal.devicecontactmodel.DeviceContactModelDO;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.springframework.web.bind.annotation.RequestParam;
import javax.validation.Valid;
import java.util.Collection;
@ -124,5 +125,7 @@ public interface DeviceService {
PageResult<LineDeviceRespVO> lineDevicePage(LineDeviceRequestVO pageReqVO);
Map<String, List<DeviceContactModelDO>> singleDevice(Long deviceId) throws JsonProcessingException;
Map<String, List<Map<String, Object>>> singleDevice(Long deviceId) throws JsonProcessingException;
List<Map<String, Object>> historyRecord(Long deviceId,String collectionStartTime, String collectionEndTime);
}

@ -393,11 +393,6 @@ public class DeviceServiceImpl implements DeviceService {
}
}else {
throw exception(OPC_CONNECT_FAILURE_DOES_NOT_EXIST);
}
@ -502,93 +497,139 @@ public class DeviceServiceImpl implements DeviceService {
return lineDeviceRespVOPageResult;
}
@Override
public Map<String, List<DeviceContactModelDO>> singleDevice(Long deviceId) throws JsonProcessingException {
public Map<String, List<Map<String, Object>>> singleDevice(Long deviceId) throws JsonProcessingException {
List<DeviceContactModelDO> resultList = new ArrayList<>();
Map<String, List<Map<String, Object>>> resultMap = new LinkedHashMap<>();
try {
// 获取设备数据列表
List<Map<String, Object>> deviceDataList = tdengineService.getAllDeviceDataOrderByTimeDesc(deviceId);
// 1. 获取设备数据列表
List<Map<String, Object>> deviceDataList = tdengineService.getNewestDeviceDataOrderByTimeDesc(deviceId);
// 2. 获取属性类型映射
Map<Long, String> idToNameMap = deviceAttributeTypeMapper.selectList()
.stream()
.collect(Collectors.toMap(
DeviceAttributeTypeDO::getId,
DeviceAttributeTypeDO::getName
));
// 3. 遍历并处理
for (Map<String, Object> deviceData : deviceDataList) {
String queryDataJson = (String) deviceData.get("queryData");
Timestamp timestamp = (Timestamp) deviceData.get("timestamp");
if (queryDataJson != null && !queryDataJson.isEmpty()) {
ObjectMapper objectMapper = new ObjectMapper();
// 简化配置,只注册基础模块
objectMapper.registerModule(new JavaTimeModule());
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
// 忽略未知属性,避免因缺少字段而报错
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
// 解析JSON数组为对象列表
List<DeviceContactModelDO> models = objectMapper.readValue(
if (StringUtils.isNotBlank(queryDataJson)) {
// 使用TypeReference解析为List<Map>而不是具体的DO对象
List<Map<String, Object>> dataList = new ObjectMapper().readValue(
queryDataJson,
new TypeReference<List<DeviceContactModelDO>>() {}
new TypeReference<List<Map<String, Object>>>() {}
);
// 可以为每个对象设置时间戳(如果需要)
for (DeviceContactModelDO model : models) {
// 设置查询时间戳
model.setLatestCollectionTime(String.valueOf(timestamp));
resultList.add(model);
for (Map<String, Object> data : dataList) {
// 获取属性类型名称
String attributeTypeName = "其他";
String typeStr = (String) data.get("attributeType");
if (typeStr != null) {
try {
attributeTypeName = typeStr;
} catch (Exception e) {
attributeTypeName = "未知";
}
}
// 提取需要的字段
Map<String, Object> simplifiedData = new HashMap<>();
simplifiedData.put("addressValue", data.get("addressValue"));
simplifiedData.put("attributeName", data.get("attributeName"));
resultMap
.computeIfAbsent(attributeTypeName, k -> new ArrayList<>())
.add(simplifiedData);
}
}
}
} catch (Exception e) {
System.out.println("处理设备数据时发生异常: " + e.getMessage());
}
return resultMap;
}
@Override
public List<Map<String, Object>> historyRecord(Long deviceId,String collectionStartTime, String collectionEndTime) {
List<DeviceAttributeTypeDO> deviceAttributeTypeDOS = deviceAttributeTypeMapper.selectList();
// 最基本的转换方式
Map<Long, String> idToNameMap = deviceAttributeTypeDOS.stream()
.collect(Collectors.toMap(DeviceAttributeTypeDO::getId, DeviceAttributeTypeDO::getName));
List<Map<String, Object>> resultList = new ArrayList<>();
try {
// 1. 获取设备数据列表
List<Map<String, Object>> deviceDataList = tdengineService.getstDeviceDataOrderByTimeDesc(deviceId,collectionStartTime,collectionEndTime);
// 2. 获取属性类型映射
Map<Long, String> idToNameMap = deviceAttributeTypeMapper.selectList()
.stream()
.collect(Collectors.toMap(
DeviceAttributeTypeDO::getId,
DeviceAttributeTypeDO::getName
));
// 3. 遍历每个时间点的数据
for (Map<String, Object> deviceData : deviceDataList) {
String queryDataJson = (String) deviceData.get("queryData");
Timestamp timestamp = (Timestamp) deviceData.get("timestamp");
// 分组并排序
Map<String, List<DeviceContactModelDO>> groupedAndSorted = resultList.stream()
.collect(Collectors.groupingBy(
// 处理attributeType为null的情况设为"其他"
item -> {
String typeStr = item.getAttributeType();
if (typeStr == null) {
return "其他";
}
if (StringUtils.isNotBlank(queryDataJson) && timestamp != null) {
List<Map<String, Object>> dataList = new ObjectMapper().readValue(
queryDataJson,
new TypeReference<List<Map<String, Object>>>() {}
);
// 按属性类型分组
Map<String, List<Map<String, Object>>> groupedData = new LinkedHashMap<>();
for (Map<String, Object> data : dataList) {
String attributeTypeName = "其他";
String typeStr = (String) data.get("attributeType");
if (typeStr != null) {
try {
// 关键步骤:将 String 转换为 Long
Long typeLong = Long.valueOf(typeStr);
String name = idToNameMap.get(typeLong);
return (name == null) ? "未知" : name;
} catch (NumberFormatException e) {
// 如果字符串不能转换为Long则归类为"未知"
return "未知";
attributeTypeName = typeStr;
} catch (Exception e) {
attributeTypeName = "未知";
}
}, // 使用LinkedHashMap保持分组顺序可选
LinkedHashMap::new,
// 对每个分组内的元素按latestCollectionTime倒序排序
Collectors.collectingAndThen(
Collectors.toList(),
list -> list.stream()
.sorted(Comparator.comparing(
DeviceContactModelDO::getLatestCollectionTime,
Comparator.nullsLast(Comparator.reverseOrder()) // 处理latestCollectionTime为null的情况
))
.collect(Collectors.toList())
)
));
return groupedAndSorted;
}
Map<String, Object> simplifiedData = new HashMap<>();
simplifiedData.put("addressValue", data.get("addressValue"));
simplifiedData.put("attributeName", data.get("attributeName"));
groupedData
.computeIfAbsent(attributeTypeName, k -> new ArrayList<>())
.add(simplifiedData);
}
// 创建当前时间点的Map
Map<String, Object> timePointData = new LinkedHashMap<>();
// 添加属性分组
for (Map.Entry<String, List<Map<String, Object>>> entry : groupedData.entrySet()) {
timePointData.put(entry.getKey(), entry.getValue());
}
// 添加收集时间
timePointData.put("collectTime", timestamp.toString());
resultList.add(timePointData);
}
}
} catch (Exception e) {
System.out.println("处理设备数据时发生异常: " + e.getMessage());
}
return resultList;
}
private void validateDeviceAttributeExists(Long id) {

@ -1,19 +1,25 @@
package cn.iocoder.yudao.module.iot.service.device;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.taosdata.jdbc.utils.BlobUtil;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.sql.Blob;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
@Service
public class TDengineService {
@ -40,7 +46,7 @@ public class TDengineService {
// 3. 创建超级表
String createSuperTableSQL = "CREATE STABLE IF NOT EXISTS device_data (" +
"ts TIMESTAMP, " +
"query_data NCHAR(2048)" +
"query_data BLOB" +
") TAGS (device_id BIGINT)";
jdbcTemplate.execute(createSuperTableSQL);
@ -82,8 +88,44 @@ public class TDengineService {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String timestampStr = sdf.format(ts);
result.put("timestamp", timestampStr);
String queryData = rs.getString("query_data");
result.put("queryData", queryData);
byte[] blob = rs.getBytes("query_data");
if (blob != null) {
String jsonStr = new String(blob, StandardCharsets.UTF_8);
try {
// 1. 先去除外层的双引号(如果存在)
String trimmed = jsonStr.trim();
if (trimmed.startsWith("\"") && trimmed.endsWith("\"")) {
trimmed = trimmed.substring(1, trimmed.length() - 1);
}
// 2. 检查是否是十六进制字符串
if (isHexString(trimmed)) {
// 3. 十六进制解码
String decodedJson = hexToString(trimmed);
result.put("queryData", decodedJson);
} else {
// 如果不是十六进制,尝试直接解析
ObjectMapper objectMapper = new ObjectMapper();
List<Map<String, Object>> queryData = objectMapper.readValue(
trimmed,
new TypeReference<List<Map<String, Object>>>() {}
);
result.put("queryData", queryData);
}
} catch (Exception e) {
System.err.println("解析JSON失败: " + e.getMessage());
result.put("queryData", new ArrayList<>());
result.put("rawData", jsonStr);
}
} else {
result.put("queryData", new ArrayList<>());
}
result.put("deviceId", id);
result.put("tableName", tableName);
return result;
@ -112,12 +154,11 @@ public class TDengineService {
/**
*
* @param id ID
* @param queryData
* @param timestamp
* @return
*/
@DS("tdengine")
public boolean insertDeviceData(Long id, String queryData, Timestamp timestamp) {
public boolean insertDeviceData(Long id, String jsonString, Timestamp timestamp) {
try {
// 确保使用正确的数据库
jdbcTemplate.execute("USE besure");
@ -125,9 +166,12 @@ public class TDengineService {
String tableName = "d_" + id;
String sql = "INSERT INTO " + tableName + " (ts, query_data) VALUES (?, ?)";
int affectedRows = jdbcTemplate.update(sql, timestamp, queryData);
System.out.println("向设备" + id + "插入数据成功,时间戳: " + timestamp);
return affectedRows > 0;
return jdbcTemplate.update(sql, ps -> {
ps.setTimestamp(1, timestamp);
// JSON字符串转byte数组
byte[] blobData = jsonString.getBytes(StandardCharsets.UTF_8);
ps.setBytes(2, blobData);
}) > 0;
} catch (Exception e) {
System.out.println("向设备" + id + "插入数据时发生异常: " + e.getMessage());
return false;
@ -140,24 +184,202 @@ public class TDengineService {
* @return
*/
@DS("tdengine")
public List<Map<String, Object>> getAllDeviceDataOrderByTimeDesc(Long id) {
public List<Map<String, Object>> getNewestDeviceDataOrderByTimeDesc(Long id) {
String tableName = "d_" + id;
String sql = "SELECT ts, query_data FROM besure." + tableName + " ORDER BY ts DESC";
String sql = "SELECT ts, query_data FROM besure." + tableName + " ORDER BY ts DESC LIMIT 1";
try {
return jdbcTemplate.query(sql, new RowMapper<Map<String, Object>>() {
return Collections.singletonList(jdbcTemplate.queryForObject(sql, new RowMapper<Map<String, Object>>() {
@Override
public Map<String, Object> mapRow(ResultSet rs, int rowNum) throws SQLException {
Map<String, Object> result = new HashMap<>();
result.put("timestamp", rs.getTimestamp("ts"));
result.put("queryData", rs.getString("query_data"));
result.put("deviceId", id);
byte[] blob = rs.getBytes("query_data");
if (blob != null) {
String jsonStr = new String(blob, StandardCharsets.UTF_8);
try {
// 1. 先去除外层的双引号(如果存在)
String trimmed = jsonStr.trim();
if (trimmed.startsWith("\"") && trimmed.endsWith("\"")) {
trimmed = trimmed.substring(1, trimmed.length() - 1);
}
// 2. 检查是否是十六进制字符串
if (isHexString(trimmed)) {
// 3. 十六进制解码
String decodedJson = hexToString(trimmed);
result.put("queryData", decodedJson);
} else {
// 如果不是十六进制,尝试直接解析
ObjectMapper objectMapper = new ObjectMapper();
List<Map<String, Object>> queryData = objectMapper.readValue(
trimmed,
new TypeReference<List<Map<String, Object>>>() {}
);
result.put("queryData", queryData);
}
} catch (Exception e) {
System.err.println("解析JSON失败: " + e.getMessage());
result.put("queryData", new ArrayList<>());
result.put("rawData", jsonStr);
}
} else {
result.put("queryData", new ArrayList<>());
}
return result;
}
});
}));
} catch (EmptyResultDataAccessException e) {
return Collections.singletonList(createEmptyResult(id));
} catch (Exception e) {
System.out.println("查询设备" + id + "的全部数据时发生异常: " + e.getMessage());
System.err.println("查询设备" + id + "的最新数据时发生异常: " + e.getMessage());
e.printStackTrace();
return Collections.singletonList(createEmptyResult(id));
}
}
// 检查是否是十六进制字符串
private boolean isHexString(String str) {
if (str == null || str.trim().isEmpty()) {
return false;
}
String s = str.trim();
// 检查是否只包含十六进制字符0-9, a-f, A-F
return s.matches("^[0-9a-fA-F]+$");
}
// 十六进制解码
private String hexToString(String hex) {
try {
// 使用 Apache Commons Codec
byte[] bytes = Hex.decodeHex(hex);
return new String(bytes, StandardCharsets.UTF_8);
} catch (DecoderException e) {
throw new RuntimeException("十六进制解码失败: " + e.getMessage(), e);
}
}
private List<Map<String, Object>> parseJsonData(byte[] blob) {
if (blob == null || blob.length == 0) {
return new ArrayList<>();
}
try {
String jsonStr = new String(blob, StandardCharsets.UTF_8);
// 检查是否是有效的JSON
if (jsonStr.trim().startsWith("[") && jsonStr.trim().endsWith("]")) {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(jsonStr,
new TypeReference<List<Map<String, Object>>>() {});
} else {
// 可能是字符串化的JSON尝试去除引号
jsonStr = jsonStr.trim();
if (jsonStr.startsWith("\"") && jsonStr.endsWith("\"")) {
jsonStr = jsonStr.substring(1, jsonStr.length() - 1);
return new ObjectMapper().readValue(jsonStr,
new TypeReference<List<Map<String, Object>>>() {});
}
}
} catch (Exception e) {
System.err.println("解析JSON数据失败: " + e.getMessage());
}
return new ArrayList<>();
}
private Map<String, Object> createEmptyResult(Long deviceId) {
Map<String, Object> result = new HashMap<>();
result.put("deviceId", deviceId);
result.put("queryData", new ArrayList<>());
result.put("timestamp", null);
return result;
}
/**
*
* @param id ID
* @return
*/
@DS("tdengine")
public List<Map<String, Object>> getstDeviceDataOrderByTimeDesc(Long id,String collectionStartTime, String collectionEndTime) {
String tableName = "d_" + id;
StringBuilder sqlBuilder = new StringBuilder();
List<Object> params = new ArrayList<>();
sqlBuilder.append("SELECT ts, query_data FROM besure.").append(tableName).append(" WHERE 1=1");
if (collectionStartTime != null) {
// 直接将时间字符串拼接到SQL中
sqlBuilder.append(" AND ts >= '").append(collectionStartTime).append("'");
}
if (collectionEndTime != null) {
sqlBuilder.append(" AND ts <= '").append(collectionEndTime).append("'");
}
sqlBuilder.append(" ORDER BY ts DESC");
try {
return jdbcTemplate.query(sqlBuilder.toString(), new RowMapper<Map<String, Object>>() {
@Override
public Map<String, Object> mapRow(ResultSet rs, int rowNum) throws SQLException {
Map<String, Object> result = new HashMap<>();
result.put("timestamp", rs.getTimestamp("ts"));
result.put("deviceId", id);
byte[] blob = rs.getBytes("query_data");
if (blob != null) {
String jsonStr = new String(blob, StandardCharsets.UTF_8);
try {
// 1. 先去除外层的双引号(如果存在)
String trimmed = jsonStr.trim();
if (trimmed.startsWith("\"") && trimmed.endsWith("\"")) {
trimmed = trimmed.substring(1, trimmed.length() - 1);
}
// 2. 检查是否是十六进制字符串
if (isHexString(trimmed)) {
// 3. 十六进制解码
String decodedJson = hexToString(trimmed);
result.put("queryData", decodedJson);
} else {
// 如果不是十六进制,尝试直接解析
ObjectMapper objectMapper = new ObjectMapper();
List<Map<String, Object>> queryData = objectMapper.readValue(
trimmed,
new TypeReference<List<Map<String, Object>>>() {}
);
result.put("queryData", queryData);
}
} catch (Exception e) {
System.err.println("解析JSON失败: " + e.getMessage());
result.put("queryData", new ArrayList<>());
result.put("rawData", jsonStr);
}
} else {
result.put("queryData", new ArrayList<>());
}
return result;
}
});
} catch (EmptyResultDataAccessException e) {
return Collections.singletonList(createEmptyResult(id));
} catch (Exception e) {
System.err.println("查询设备" + id + "的最新数据时发生异常: " + e.getMessage());
e.printStackTrace();
return Collections.singletonList(createEmptyResult(id));
}
}
}
Loading…
Cancel
Save