|
|
|
@ -1,10 +1,10 @@
|
|
|
|
package cn.iocoder.yudao.module.iot.service.device;
|
|
|
|
package cn.iocoder.yudao.module.iot.service.device;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import cn.iocoder.yudao.framework.common.pojo.DeviceEdgeData;
|
|
|
|
|
|
|
|
import com.alibaba.fastjson.JSON;
|
|
|
|
import com.baomidou.dynamic.datasource.annotation.DS;
|
|
|
|
import com.baomidou.dynamic.datasource.annotation.DS;
|
|
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
|
|
|
|
|
|
import com.fasterxml.jackson.core.type.TypeReference;
|
|
|
|
import com.fasterxml.jackson.core.type.TypeReference;
|
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
|
import com.taosdata.jdbc.utils.BlobUtil;
|
|
|
|
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
import org.apache.commons.codec.DecoderException;
|
|
|
|
import org.apache.commons.codec.DecoderException;
|
|
|
|
import org.apache.commons.codec.binary.Hex;
|
|
|
|
import org.apache.commons.codec.binary.Hex;
|
|
|
|
@ -15,11 +15,11 @@ import org.springframework.stereotype.Service;
|
|
|
|
|
|
|
|
|
|
|
|
import javax.annotation.Resource;
|
|
|
|
import javax.annotation.Resource;
|
|
|
|
import java.nio.charset.StandardCharsets;
|
|
|
|
import java.nio.charset.StandardCharsets;
|
|
|
|
import java.sql.Blob;
|
|
|
|
|
|
|
|
import java.sql.ResultSet;
|
|
|
|
import java.sql.ResultSet;
|
|
|
|
import java.sql.SQLException;
|
|
|
|
import java.sql.SQLException;
|
|
|
|
import java.sql.Timestamp;
|
|
|
|
import java.sql.Timestamp;
|
|
|
|
import java.text.SimpleDateFormat;
|
|
|
|
import java.text.SimpleDateFormat;
|
|
|
|
|
|
|
|
import java.time.LocalDateTime;
|
|
|
|
import java.util.*;
|
|
|
|
import java.util.*;
|
|
|
|
|
|
|
|
|
|
|
|
@Service
|
|
|
|
@Service
|
|
|
|
@ -387,4 +387,145 @@ public class TDengineService {
|
|
|
|
return Collections.singletonList(createEmptyResult(id));
|
|
|
|
return Collections.singletonList(createEmptyResult(id));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
|
|
* 从 TDengine 中查询设备边缘数据
|
|
|
|
|
|
|
|
*
|
|
|
|
|
|
|
|
* 说明:
|
|
|
|
|
|
|
|
* 1. TDengine 中每个设备对应一张子表,表名规则:d_{deviceId}
|
|
|
|
|
|
|
|
* 2. 根据时间范围查询
|
|
|
|
|
|
|
|
* 3. 根据 latest 参数决定取最新一条还是最早一条
|
|
|
|
|
|
|
|
*
|
|
|
|
|
|
|
|
* @param deviceId 设备ID,同时也是子表后缀
|
|
|
|
|
|
|
|
* @param startTime 起始时间(可以为 null)
|
|
|
|
|
|
|
|
* @param endTime 结束时间(可以为 null)
|
|
|
|
|
|
|
|
* @param latest true 表示取最新一条,false 表示取最早一条
|
|
|
|
|
|
|
|
* @return 查询结果,包含 timestamp / deviceId / queryData
|
|
|
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
@DS("tdengine")
|
|
|
|
|
|
|
|
public Map<String, Object> getDeviceEdgeData(
|
|
|
|
|
|
|
|
Long deviceId,
|
|
|
|
|
|
|
|
String startTime,
|
|
|
|
|
|
|
|
String endTime,
|
|
|
|
|
|
|
|
boolean latest) {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// TDengine 子表名:d_设备ID
|
|
|
|
|
|
|
|
String tableName = "d_" + deviceId;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 构建 SQL
|
|
|
|
|
|
|
|
StringBuilder sql = new StringBuilder();
|
|
|
|
|
|
|
|
sql.append("SELECT ts, query_data ")
|
|
|
|
|
|
|
|
.append("FROM besure.").append(tableName)
|
|
|
|
|
|
|
|
.append(" WHERE 1=1 ");
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 起始时间条件
|
|
|
|
|
|
|
|
if (startTime != null) {
|
|
|
|
|
|
|
|
sql.append(" AND ts >= '").append(startTime).append("' ");
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 结束时间条件
|
|
|
|
|
|
|
|
if (endTime != null) {
|
|
|
|
|
|
|
|
sql.append(" AND ts <= '").append(endTime).append("' ");
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 根据 latest 决定排序方式
|
|
|
|
|
|
|
|
// latest = true -> 按时间倒序,取最新一条
|
|
|
|
|
|
|
|
// latest = false -> 按时间正序,取最早一条
|
|
|
|
|
|
|
|
sql.append(" ORDER BY ts ")
|
|
|
|
|
|
|
|
.append(latest ? "DESC" : "ASC")
|
|
|
|
|
|
|
|
.append(" LIMIT 1");
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
|
|
|
return jdbcTemplate.query(sql.toString(), rs -> {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 没有数据直接返回 null
|
|
|
|
|
|
|
|
if (!rs.next()) {
|
|
|
|
|
|
|
|
return null;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Map<String, Object> result = new HashMap<>();
|
|
|
|
|
|
|
|
result.put("timestamp", rs.getTimestamp("ts"));
|
|
|
|
|
|
|
|
result.put("deviceId", deviceId);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 读取 query_data(二进制字段)
|
|
|
|
|
|
|
|
byte[] blob = rs.getBytes("query_data");
|
|
|
|
|
|
|
|
if (blob != null) {
|
|
|
|
|
|
|
|
// 转为字符串
|
|
|
|
|
|
|
|
String json = new String(blob, StandardCharsets.UTF_8).trim();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 去除 TDengine 中可能存在的外层双引号
|
|
|
|
|
|
|
|
if (json.startsWith("\"") && json.endsWith("\"")) {
|
|
|
|
|
|
|
|
json = json.substring(1, json.length() - 1);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 如果是十六进制字符串,先解码
|
|
|
|
|
|
|
|
if (isHexString(json)) {
|
|
|
|
|
|
|
|
json = hexToString(json);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 统一约定:queryData 始终返回 String,由上层决定是否解析 JSON
|
|
|
|
|
|
|
|
result.put("queryData", json);
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
|
|
|
// 没有数据时返回空数组字符串
|
|
|
|
|
|
|
|
result.put("queryData", "[]");
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return result;
|
|
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
|
|
|
log.error("TDengine 查询失败,deviceId={} ,td表不存在", deviceId);
|
|
|
|
|
|
|
|
return null;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
|
|
* 批量获取设备在时间区间的首末数据
|
|
|
|
|
|
|
|
* @param deviceIds 设备ID列表
|
|
|
|
|
|
|
|
* @param startTime 开始时间, 格式 yyyy-MM-dd HH:mm:ss
|
|
|
|
|
|
|
|
* @param endTime 结束时间, 格式 yyyy-MM-dd HH:mm:ss
|
|
|
|
|
|
|
|
* @return Map<deviceId, DeviceEdgeData>
|
|
|
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
@DS("tdengine")
|
|
|
|
|
|
|
|
public Map<Long, DeviceEdgeData> queryDeviceFirstAndLast(Set<Long> deviceIds, String startTime, String endTime) {
|
|
|
|
|
|
|
|
if (deviceIds == null || deviceIds.isEmpty()) {
|
|
|
|
|
|
|
|
return Collections.emptyMap();
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Map<Long, DeviceEdgeData> result = new HashMap<>();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for (Long deviceId : deviceIds) {
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
|
|
|
// 查询首条数据
|
|
|
|
|
|
|
|
List<Map<String, Object>> firstList = getstDeviceDataOrderByTimeDesc(deviceId, startTime, endTime, 1);
|
|
|
|
|
|
|
|
Map<String, Object> firstData = firstList.isEmpty() ? null : firstList.get(0);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 查询末条数据
|
|
|
|
|
|
|
|
List<Map<String, Object>> lastList = getstDeviceDataOrderByTimeDesc(deviceId, startTime, endTime, 1);
|
|
|
|
|
|
|
|
Map<String, Object> lastData = lastList.isEmpty() ? null : lastList.get(0);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
DeviceEdgeData edgeData = new DeviceEdgeData();
|
|
|
|
|
|
|
|
edgeData.setDeviceId(deviceId);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (firstData != null) {
|
|
|
|
|
|
|
|
edgeData.setFirstTs(((Timestamp) firstData.get("timestamp")).toLocalDateTime());
|
|
|
|
|
|
|
|
edgeData.setFirstData(JSON.toJSONString(firstData.get("queryData")));
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (lastData != null) {
|
|
|
|
|
|
|
|
edgeData.setLastTs(((Timestamp) lastData.get("timestamp")).toLocalDateTime());
|
|
|
|
|
|
|
|
edgeData.setLastData(JSON.toJSONString(lastData.get("queryData")));
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
result.put(deviceId, edgeData);
|
|
|
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
|
|
|
log.error("查询设备首末数据失败, deviceId={}", deviceId, e);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return result;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|