diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/recipedevicerecord/RecipeDeviceRecordController.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/recipedevicerecord/RecipeDeviceRecordController.java index e3a4e86e5..ccadf63ef 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/recipedevicerecord/RecipeDeviceRecordController.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/recipedevicerecord/RecipeDeviceRecordController.java @@ -195,14 +195,14 @@ public class RecipeDeviceRecordController { } - Map> deviceDataMap = deviceService.createDeviceDataMap(device.getId());//recipeRespVO.getDeviceId() +// Map> deviceDataMap = deviceService.createDeviceDataMap(device.getId());//recipeRespVO.getDeviceId() Map map = tDengineService.newSelectLatestRow(device.getId()); // OpcUtils.connect(device.getId(),device.getUrl(),device.getUsername(),device.getPassword(),10); for (RecipeDeviceAttributeDO attributeDO : attributeList) { - Map data = deviceDataMap.get(attributeDO.getAttributeId()); +// Map data = deviceDataMap.get(attributeDO.getAttributeId()); DeviceContactModelDO deviceContactModelDO = deviceContactModelMap.get(attributeDO.getAttributeId()); if (deviceContactModelDO == null) { diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/DeviceServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/DeviceServiceImpl.java index cf7d15916..2dad10c11 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/DeviceServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/DeviceServiceImpl.java @@ -809,12 +809,24 @@ public class DeviceServiceImpl implements DeviceService { @Override public List lineDeviceList(LineDeviceRequestVO pageReqVO) { - List records = deviceMapper.lineDeviceList(pageReqVO); + if (records.isEmpty()) { + return records; + } + + // 1. 收集所有设备ID + List deviceIds = records.stream() + .map(LineDeviceRespVO::getDeviceId) + .collect(Collectors.toList()); + + // 2. 批量查询每个设备的最新时间 + Map latestTsMap = tdengineService.newSelectLatestTsBatch(deviceIds); + + // 3. 填充 collectionTime for (LineDeviceRespVO record : records) { - Map latestDeviceData = tdengineService.getLatestDeviceData(record.getDeviceId()); - if(latestDeviceData != null) { - record.setCollectionTime((String) latestDeviceData.get("timestamp")); + LocalDateTime ts = latestTsMap.get(record.getDeviceId()); + if (ts != null) { + record.setCollectionTime(ts.toString()); // 或 formatDateTime(ts) 根据你的需求 } } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/TDengineService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/TDengineService.java index f8469c189..6555cec39 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/TDengineService.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/TDengineService.java @@ -5,6 +5,7 @@ import cn.iocoder.yudao.module.iot.controller.admin.device.enums.JavaToTdengineT import cn.iocoder.yudao.module.iot.dal.dataobject.devicecontactmodel.DeviceContactModelDO; import com.alibaba.fastjson.JSON; import com.baomidou.dynamic.datasource.annotation.DS; +import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; @@ -1192,4 +1193,243 @@ public class TDengineService { } + + @DS("tdengine") + public Map> queryDevicesLatestRow( + List deviceIds, + String startTime, + String endTime) { + + if (CollectionUtils.isEmpty(deviceIds)) { + return Collections.emptyMap(); + } + + Map> result = new HashMap<>(); + + for (Long deviceId : deviceIds) { + + String tableName = "besure_server.d_" + deviceId; + + StringBuilder sql = new StringBuilder(); + + sql.append("SELECT * FROM ") + .append(tableName) + .append(" WHERE 1=1 "); + + if (StringUtils.isNotBlank(startTime)) { + sql.append(" AND ts >= '").append(startTime).append("' "); + } + + if (StringUtils.isNotBlank(endTime)) { + sql.append(" AND ts <= '").append(endTime).append("' "); + } + + sql.append(" ORDER BY ts DESC LIMIT 1"); + + try { + + List> list = + jdbcTemplate.queryForList(sql.toString()); + + if (!list.isEmpty()) { + + Map row = list.get(0); + + convertTs(row); + + result.put(deviceId, row); + + } + + } catch (Exception e) { + + log.error("查询设备最新数据失败 deviceId={}", deviceId, e); + + } + } + + return result; + } + + @DS("tdengine") + public Map> queryDevicesEarliestRow( + List deviceIds, + String startTime, + String endTime) { + + if (CollectionUtils.isEmpty(deviceIds)) { + return Collections.emptyMap(); + } + + Map> result = new HashMap<>(); + + for (Long deviceId : deviceIds) { + + String tableName = "besure_server.d_" + deviceId; + + StringBuilder sql = new StringBuilder(); + + sql.append("SELECT * FROM ") + .append(tableName) + .append(" WHERE 1=1 "); + + if (StringUtils.isNotBlank(startTime)) { + sql.append(" AND ts >= '").append(startTime).append("' "); + } + + if (StringUtils.isNotBlank(endTime)) { + sql.append(" AND ts <= '").append(endTime).append("' "); + } + + sql.append(" ORDER BY ts ASC LIMIT 1"); + + try { + + List> list = + jdbcTemplate.queryForList(sql.toString()); + + if (!list.isEmpty()) { + + Map row = list.get(0); + + convertTs(row); + + result.put(deviceId, row); + + } + + } catch (Exception e) { + + log.error("查询设备最早数据失败 deviceId={}", deviceId, e); + + } + } + + return result; + } + + + /** + * 查询近七个小时的数据 + * @param deviceIds + * @param startTime + * @param endTime + * @return + */ + @DS("tdengine") + public List> newQueryLastDataByHourBatch( + Set deviceIds, + LocalDateTime startTime, + LocalDateTime endTime) { + + if (deviceIds == null || deviceIds.isEmpty()) { + return Collections.emptyList(); + } + + List> result = new ArrayList<>(); + DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + String startStr = startTime.format(fmt); + String endStr = endTime.format(fmt); + + for (Long deviceId : deviceIds) { + String tableName = "besure_server.d_" + deviceId; + + // 动态查询所有列(*),按时间倒序 + String sql = "SELECT * FROM " + tableName + + " WHERE ts >= '" + startStr + "' AND ts <= '" + endStr + "'" + + " ORDER BY ts DESC"; + + try { + List> rows = jdbcTemplate.queryForList(sql); + + // 用 map 存储每小时最新一条数据 + Map> hourMap = new HashMap<>(); + DateTimeFormatter hourFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH"); + + for (Map row : rows) { + LocalDateTime ts = ((Timestamp) row.get("ts")).toLocalDateTime(); + String hourKey = ts.format(hourFmt); + + // 每小时只保留最新一条 + if (!hourMap.containsKey(hourKey)) { + Map map = new HashMap<>(); + map.put("deviceId", deviceId); + map.put("timestamp", ts); + + for (Map.Entry entry : row.entrySet()) { + String key = entry.getKey(); + if (!"ts".equals(key)) { + map.put(key, entry.getValue()); + } + } + + hourMap.put(hourKey, map); + } + } + + result.addAll(hourMap.values()); + + } catch (Exception e) { + log.error("查询设备 {} 数据异常", deviceId, e); + } + } + + return result; + } + + + /** + * 批量查询设备最近 N 天按天汇总数据 + * + * @param deviceIds 设备ID列表 + * @param startDate 起始日期(包含) + * @param days 查询天数 + * @return 每条记录 Map 包含 day、deviceId、timestamp、各列数据 + */ + @DS("tdengine") + public List> queryLastDataByDayBatch(Set deviceIds, LocalDate startDate, int days) { + if (deviceIds == null || deviceIds.isEmpty() || days <= 0) { + return Collections.emptyList(); + } + + List> result = new ArrayList<>(); + LocalDate endDate = startDate.plusDays(days); + + for (Long deviceId : deviceIds) { + String tableName = "besure_server.d_" + deviceId; + + String sql = String.format( + "SELECT *, ts as timestamp, DATE_FORMAT(ts, '%%Y-%%m-%%d') as day " + + "FROM %s " + + "WHERE ts >= '%s' AND ts < '%s' " + + "ORDER BY ts DESC", + tableName, startDate.toString(), endDate.toString() + ); + + try { + result.addAll(jdbcTemplate.queryForList(sql)); + } catch (Exception e) { + log.error("查询设备 {} 最近 {} 天数据异常", deviceId, days, e); + } + } + + return result; + } + + + + + private void convertTs(Map row) { + + Object tsObj = row.get("ts"); + + if (tsObj instanceof Timestamp) { + + row.put("ts", + ((Timestamp) tsObj) + .toLocalDateTime()); + } + } + + } \ No newline at end of file diff --git a/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/controller/admin/energydevice/vo/TimePointCache.java b/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/controller/admin/energydevice/vo/TimePointCache.java index 827a19a10..3f259a22c 100644 --- a/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/controller/admin/energydevice/vo/TimePointCache.java +++ b/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/controller/admin/energydevice/vo/TimePointCache.java @@ -8,7 +8,12 @@ import java.util.Map; @Data public class TimePointCache { +// private LocalDateTime timestamp; +// private String queryData; +// private Map pointIndex; + + private LocalDateTime timestamp; - private String queryData; - private Map pointIndex; + + private Map pointValueMap; } diff --git a/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/service/energydevice/EnergyDeviceServiceImpl.java b/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/service/energydevice/EnergyDeviceServiceImpl.java index 487bb10d8..ecdaa0042 100644 --- a/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/service/energydevice/EnergyDeviceServiceImpl.java +++ b/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/service/energydevice/EnergyDeviceServiceImpl.java @@ -5,6 +5,7 @@ import cn.iocoder.yudao.framework.common.pojo.PageParam; import cn.iocoder.yudao.framework.common.pojo.PageResult; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.framework.common.util.object.BeanUtils; +import cn.iocoder.yudao.module.iot.controller.admin.device.enums.JavaToTdengineTypeEnum; import cn.iocoder.yudao.module.mes.controller.admin.energydevice.vo.HourEnergyValueVO; import cn.iocoder.yudao.module.iot.dal.dataobject.devicecontactmodel.DeviceContactModelDO; import cn.iocoder.yudao.module.iot.dal.mysql.devicecontactmodel.DeviceContactModelMapper; @@ -28,11 +29,13 @@ import org.springframework.validation.annotation.Validated; import javax.annotation.Resource; import java.math.BigDecimal; +import java.math.RoundingMode; import java.text.DecimalFormat; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.*; +import java.util.function.Function; import java.util.stream.Collectors; import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception; @@ -252,53 +255,54 @@ public class EnergyDeviceServiceImpl implements EnergyDeviceService { return energyDeviceMapper.selectBatchIds(ids); } - @Override - public List queryDataRecords(EnergyDeviceConsumptionReqVO req) { - List result = new ArrayList<>(); - - // 查询设备列表 - List devices = queryDevices(req); - if (devices.isEmpty()) return result; - - // 解析规则 & 收集所有 ruleDeviceIds 和点位ID - Map> deviceRulesMap = new HashMap<>(); - Set ruleDeviceIds = new HashSet<>(); - Set pointIds = new HashSet<>(); - for (EnergyDeviceDO device : devices) { - List rules = parseRules(device); - if (!rules.isEmpty()) { - deviceRulesMap.put(device.getId(), rules); - rules.forEach(r -> { - if (r.getDeviceId() != null) ruleDeviceIds.add(r.getDeviceId()); - if (r.getPointId() != null) pointIds.add(r.getPointId()); - }); - } - } +// @Override +// public List queryDataRecords(EnergyDeviceConsumptionReqVO req) { +// List result = new ArrayList<>(); +// +// // 查询设备列表 +// List devices = queryDevices(req); +// if (devices.isEmpty()) return result; +// +// // 解析规则 & 收集所有 ruleDeviceIds 和点位ID +// Map> deviceRulesMap = new HashMap<>(); +// Set ruleDeviceIds = new HashSet<>(); +// Set pointIds = new HashSet<>(); +// for (EnergyDeviceDO device : devices) { +// List rules = parseRules(device); +// if (!rules.isEmpty()) { +// deviceRulesMap.put(device.getId(), rules); +// rules.forEach(r -> { +// if (r.getDeviceId() != null) ruleDeviceIds.add(r.getDeviceId()); +// if (r.getPointId() != null) pointIds.add(r.getPointId()); +// }); +// } +// } +// +// if (ruleDeviceIds.isEmpty()) return result; +// +// // TDengine 查询所有设备首末数据 +// Map edgeDataMap = +// tDengineService.queryDeviceFirstAndLast(ruleDeviceIds, req.getStartTime(), req.getEndTime()); +// +// // 构建缓存 +// Map firstCache = buildCache(edgeDataMap, true); +// Map lastCache = buildCache(edgeDataMap, false); +// +// // 批量获取点位信息(名称/单位)避免 N+1 查询 +// Map pointInfoMap = batchGetPointInfoFromLocalDB(new ArrayList<>(pointIds)); +// +// // 逐设备计算 EnergyDeviceRespVO +// for (EnergyDeviceDO device : devices) { +// List rules = deviceRulesMap.get(device.getId()); +// if (rules == null || rules.isEmpty()) continue; +// +// EnergyDeviceRespVO vo = calculateDevice(device, rules, firstCache, lastCache, pointInfoMap, req); +// result.add(vo); +// } +// +// return result; +// } - if (ruleDeviceIds.isEmpty()) return result; - - // TDengine 查询所有设备首末数据 - Map edgeDataMap = - tDengineService.queryDeviceFirstAndLast(ruleDeviceIds, req.getStartTime(), req.getEndTime()); - - // 构建缓存 - Map firstCache = buildCache(edgeDataMap, true); - Map lastCache = buildCache(edgeDataMap, false); - - // 批量获取点位信息(名称/单位)避免 N+1 查询 - Map pointInfoMap = batchGetPointInfoFromLocalDB(new ArrayList<>(pointIds)); - - // 逐设备计算 EnergyDeviceRespVO - for (EnergyDeviceDO device : devices) { - List rules = deviceRulesMap.get(device.getId()); - if (rules == null || rules.isEmpty()) continue; - - EnergyDeviceRespVO vo = calculateDevice(device, rules, firstCache, lastCache, pointInfoMap, req); - result.add(vo); - } - - return result; - } @Override public List lastEnergyStatistics(Long deviceTypeId, Long orgId) { @@ -316,7 +320,6 @@ public class EnergyDeviceServiceImpl implements EnergyDeviceService { return buildEmptyLast7Hours(LocalDateTime.now()); } - // 2. 解析规则 List rules = JsonUtils.parseArray(energyDevice.getRules(), OperationRulesVO.class); if (rules.isEmpty()) return Collections.emptyList(); @@ -328,43 +331,86 @@ public class EnergyDeviceServiceImpl implements EnergyDeviceService { .collect(Collectors.toSet()); if (deviceIds.isEmpty()) return Collections.emptyList(); - // 4. 时间范围:最近 7 小时 + // 4. 查询设备点位信息 DeviceContactModelDO + List pointIds = rules.stream() + .map(OperationRulesVO::getPointId) + .filter(Objects::nonNull) + .distinct() + .collect(Collectors.toList()); + Map pointInfoMap = deviceContactModelMapper.selectList( + Wrappers.lambdaQuery().in(DeviceContactModelDO::getId, pointIds) + ).stream().collect(Collectors.toMap(DeviceContactModelDO::getId, Function.identity())); + + // 5. 时间范围:最近 7 小时 LocalDateTime end = LocalDateTime.now(); LocalDateTime start = end.minusHours(7); DateTimeFormatter hourFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH"); - // 5. 批量查询所有设备数据 - List> allRows = tDengineService.queryLastDataByHourBatch(deviceIds, start, end); + // 6. 批量查询 TDengine 数据 + List> allRows = tDengineService.newQueryLastDataByHourBatch(deviceIds, start, end); - // 6. 按小时 + 设备构建缓存(每小时最新覆盖) + // 7. 按小时 + 设备构建缓存 Map> hourCacheMap = new LinkedHashMap<>(); + for (Map row : allRows) { LocalDateTime ts = (LocalDateTime) row.get("timestamp"); if (ts == null) continue; String hourKey = ts.format(hourFormatter); Long deviceId = (Long) row.get("deviceId"); - String queryData = (String) row.get("queryData"); + + // 过滤该设备的规则 + List deviceRules = rules.stream() + .filter(r -> deviceId.equals(r.getDeviceId())) + .collect(Collectors.toList()); TimePointCache cache = new TimePointCache(); cache.setTimestamp(ts); - cache.setQueryData(queryData); + + Map pointValueMap = new HashMap<>(); + for (OperationRulesVO rule : deviceRules) { + Long pointId = rule.getPointId(); + DeviceContactModelDO pointInfo = pointInfoMap.get(pointId); + if (pointInfo == null || !deviceId.equals(pointInfo.getDeviceId())) continue; + + String attributeCode = pointInfo.getAttributeCode(); + if (attributeCode == null) continue; + + Object value = row.get(attributeCode.toLowerCase()); + if (value != null) { + try { + + // 保留2位小数 + BigDecimal bd = new BigDecimal(value.toString()); + Double finalValue = bd.setScale(2, RoundingMode.HALF_UP).doubleValue(); + pointValueMap.put(pointId, finalValue); + } catch (Exception e) { + log.warn("点位值转换失败, pointId={}, value={}, dataType={}", pointId, value, pointInfo.getDataType(), e); + } + } + } + + cache.setPointValueMap(pointValueMap); hourCacheMap.computeIfAbsent(hourKey, k -> new HashMap<>()).put(deviceId, cache); } - // 7. 生成每小时结果,补齐缺失小时 + // 8. 生成每小时结果,补齐缺失小时 List result = new ArrayList<>(); for (int i = 0; i < 7; i++) { LocalDateTime hourTime = start.plusHours(i); String hourKey = hourTime.format(hourFormatter); Map deviceCache = hourCacheMap.getOrDefault(hourKey, new HashMap<>()); - Double value = calculateByRules(rules, deviceCache); + + double total = 0.0; + for (Long deviceId : deviceIds) { + total += calculateByRules(deviceId, rules, deviceCache); + } HourEnergyValueVO vo = new HourEnergyValueVO(); vo.setHour(hourKey); - vo.setValue(formatDouble(value)); + vo.setValue(String.valueOf(total)); result.add(vo); } @@ -373,6 +419,7 @@ public class EnergyDeviceServiceImpl implements EnergyDeviceService { } + private List buildEmptyLast7Hours(LocalDateTime end) { List result = new ArrayList<>(); DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH"); @@ -404,7 +451,7 @@ public class EnergyDeviceServiceImpl implements EnergyDeviceService { @Override public List latestSevenDaysStatistics(Long deviceTypeId, Long orgId) { - // 找最新的能耗设备配置 + // 1. 找最新能耗设备 EnergyDeviceDO energyDevice = energyDeviceMapper.selectOne( Wrappers.lambdaQuery() .eq(EnergyDeviceDO::getDeviceTypeId, deviceTypeId) @@ -413,85 +460,260 @@ public class EnergyDeviceServiceImpl implements EnergyDeviceService { .last("LIMIT 1") ); - if (energyDevice == null || StringUtils.isBlank(energyDevice.getRules())) { return buildEmptyLast7Days(LocalDate.now()); } - // 解析规则 - List rules = - JsonUtils.parseArray(energyDevice.getRules(), OperationRulesVO.class); - - if (rules.isEmpty()) { - return Collections.emptyList(); - } + // 2. 解析规则 + List rules = JsonUtils.parseArray(energyDevice.getRules(), OperationRulesVO.class); + if (rules.isEmpty()) return Collections.emptyList(); - // 收集 deviceId + // 3. 收集设备列表 Set deviceIds = rules.stream() .map(OperationRulesVO::getDeviceId) .filter(Objects::nonNull) .collect(Collectors.toSet()); + if (deviceIds.isEmpty()) return Collections.emptyList(); - if (deviceIds.isEmpty()) { - return Collections.emptyList(); - } + // 4. 查询设备点位信息 DeviceContactModelDO + List pointIds = rules.stream() + .map(OperationRulesVO::getPointId) + .filter(Objects::nonNull) + .distinct() + .collect(Collectors.toList()); + + Map pointInfoMap = deviceContactModelMapper.selectList( + Wrappers.lambdaQuery().in(DeviceContactModelDO::getId, pointIds) + ).stream().collect(Collectors.toMap(DeviceContactModelDO::getId, Function.identity())); - // 时间范围:前 7 天 + // 5. 时间范围:最近 7 天 LocalDate startDate = LocalDate.now().minusDays(7); int days = 7; - // 查询 TDengine(方案 B) - List> rows = - tDengineService.queryLastDataByDaySafe(deviceIds, startDate, days); - + // 6. 批量查询 TDengine 数据(按天) + List> allRows = tDengineService.queryLastDataByDayBatch(deviceIds, startDate, days); + // 7. 构建按天 + 设备缓存 Map> dayCacheMap = new LinkedHashMap<>(); - for (Map row : rows) { - String day = (String) row.get("day"); + for (Map row : allRows) { + String dayKey = (String) row.get("day"); Long deviceId = (Long) row.get("deviceId"); LocalDateTime ts = (LocalDateTime) row.get("timestamp"); - - if (day == null || deviceId == null || ts == null) { - continue; - } + if (dayKey == null || deviceId == null || ts == null) continue; TimePointCache cache = new TimePointCache(); cache.setTimestamp(ts); - cache.setQueryData((String) row.get("queryData")); - dayCacheMap - .computeIfAbsent(day, k -> new HashMap<>()) - .put(deviceId, cache); + Map pointValueMap = new HashMap<>(); + + // 设备对应规则 + List deviceRules = rules.stream() + .filter(r -> deviceId.equals(r.getDeviceId())) + .collect(Collectors.toList()); + + for (OperationRulesVO rule : deviceRules) { + Long pointId = rule.getPointId(); + DeviceContactModelDO pointInfo = pointInfoMap.get(pointId); + if (pointInfo == null || !deviceId.equals(pointInfo.getDeviceId())) continue; + + String attributeCode = pointInfo.getAttributeCode(); + if (attributeCode == null) continue; + + Object value = row.get(attributeCode.toLowerCase()); + if (value != null) { + try { + BigDecimal bd = new BigDecimal(value.toString()); + pointValueMap.put(pointId, bd.setScale(2, RoundingMode.HALF_UP).doubleValue()); + } catch (Exception e) { + log.warn("点位值转换失败, pointId={}, value={}, dataType={}", pointId, value, pointInfo.getDataType(), e); + } + } + } + + cache.setPointValueMap(pointValueMap); + dayCacheMap.computeIfAbsent(dayKey, k -> new HashMap<>()).put(deviceId, cache); } - // 生成结果(补齐缺失日期) + // 8. 生成结果,补齐缺失日期 List result = new ArrayList<>(); - for (int i = 0; i < days; i++) { LocalDate day = startDate.plusDays(i); String dayKey = day.toString(); - Map deviceCache = - dayCacheMap.getOrDefault(dayKey, Collections.emptyMap()); + Map deviceCache = dayCacheMap.getOrDefault(dayKey, Collections.emptyMap()); - Double value = calculateByRules(rules, deviceCache); + double total = 0.0; + for (Long deviceId : deviceIds) { + total += calculateByRules(deviceId, rules, deviceCache); + } DayEnergyValueVO vo = new DayEnergyValueVO(); vo.setDay(dayKey); - vo.setValue(formatDouble(value)); + vo.setValue(String.valueOf(total)); result.add(vo); } return result; + } + + + + @Override + public List queryDataRecords(EnergyDeviceConsumptionReqVO req) { + + List result = new ArrayList<>(); + + // 1. 查询能耗设备列表 + List devices = queryDevices(req); + + if (devices.isEmpty()) { + return result; + } + + // 2. 解析规则 + Map> deviceRulesMap = new HashMap<>(); + List ruleDeviceIds = new ArrayList<>(); + List pointIds = new ArrayList<>(); + for (EnergyDeviceDO device : devices) { + List rules = parseRules(device); + if (rules.isEmpty()) { + continue; + } + deviceRulesMap.put(device.getId(), rules); + for (OperationRulesVO r : rules) { + if (r.getDeviceId() != null) { + ruleDeviceIds.add(r.getDeviceId()); + } + if (r.getPointId() != null) { + + pointIds.add(r.getPointId()); + } + } + } + + if (ruleDeviceIds.isEmpty()) { + return result; + } + + // 3. 查询每个 device 最早数据 + Map> firstRowMap = tDengineService.queryDevicesEarliestRow(ruleDeviceIds, req.getStartTime(), req.getEndTime()); + + // 4. 查询每个 device 最新数据 + Map> lastRowMap = tDengineService.queryDevicesLatestRow(ruleDeviceIds, req.getStartTime(), req.getEndTime()); + + + // 5. 查询点位信息(名称/单位) + Map pointInfoMap = batchGetPointInfoFromLocalDB(new ArrayList<>(pointIds)); + + + // 6. 构建缓存 + Map firstCache = buildCacheFromRow(devices, firstRowMap, pointInfoMap); + Map lastCache = buildCacheFromRow(devices, lastRowMap, pointInfoMap); + + + // 7. 逐个能耗设备计算 + for (EnergyDeviceDO device : devices) { + List rules = deviceRulesMap.get(device.getId()); + if (rules == null || rules.isEmpty()) { + continue; + } + EnergyDeviceRespVO vo = calculateDevice(device, rules, firstCache, lastCache, pointInfoMap, req); + result.add(vo); + } + return result; } + private Map buildCacheFromRow( + List devices, + Map> rowMap, + Map pointInfoMap) { + + Map cacheMap = new HashMap<>(); + if (devices == null || devices.isEmpty()) return cacheMap; + + for (EnergyDeviceDO device : devices) { + + List rules = parseRules(device); + if (rules == null || rules.isEmpty()) continue; + + TimePointCache cache = new TimePointCache(); + + Map valueMap = new HashMap<>(); + + // 时间戳变量 + LocalDateTime timestamp = null; + + // 遍历规则 + for (OperationRulesVO rule : rules) { + + Long ruleDeviceId = rule.getDeviceId(); + Long pointId = rule.getPointId(); + + if (ruleDeviceId == null || pointId == null) + continue; + + DeviceContactModelDO pointInfo = pointInfoMap.get(pointId); + + if (pointInfo == null) + continue; + + // 规则设备和点位设备匹配 + if (!ruleDeviceId.equals(pointInfo.getDeviceId())) + continue; + + String attributeCode = pointInfo.getAttributeCode(); + + if (attributeCode == null) + continue; + + // 从 TDengine 数据里取值 + Map row = rowMap.get(ruleDeviceId); + + if (row == null) + continue; + + // 设置 timestamp(只需要一次) + if (timestamp == null) { + Object ts = row.get("ts"); + if (ts instanceof LocalDateTime) { + timestamp = (LocalDateTime) ts; + } + } + + Object value = row.get(attributeCode); + + if (value != null) { + try { + //计算类型统一转换为通用double类型 + Object converted = JavaToTdengineTypeEnum.convertValue( + pointInfo.getDataType(), + value.toString() + ); + Double finalValue = convertToDouble(converted); + valueMap.put(pointId, finalValue); + } catch (Exception e) { + log.warn("点位值转换失败, pointId={}, value={}, dataType={}", pointId, value, pointInfo.getDataType(), e); + } + } + } + + // 设置 timestamp + cache.setTimestamp(timestamp); + + cache.setPointValueMap(valueMap); + + cacheMap.put(device.getId(), cache); + } + + return cacheMap; + } // =================== 核心方法 =================== @@ -521,17 +743,7 @@ public class EnergyDeviceServiceImpl implements EnergyDeviceService { } } - /** 构建缓存 */ - private Map buildCache(Map dataMap, boolean first) { - Map cache = new HashMap<>(); - for (DeviceEdgeData d : dataMap.values()) { - TimePointCache c = new TimePointCache(); - c.setTimestamp(first ? d.getFirstTs() : d.getLastTs()); - c.setQueryData(first ? d.getFirstData() : d.getLastData()); - cache.put(d.getDeviceId(), c); - } - return cache; - } + /** 单设备计算 */ private EnergyDeviceRespVO calculateDevice(EnergyDeviceDO device, @@ -541,8 +753,9 @@ public class EnergyDeviceServiceImpl implements EnergyDeviceService { Map pointInfoMap, EnergyDeviceConsumptionReqVO req) { - Double firstTotal = calculateByRules(rules, firstCache); - Double lastTotal = calculateByRules(rules, lastCache); + Double firstTotal = calculateByRules(device.getId(), rules, firstCache); + Double lastTotal = calculateByRules(device.getId(), rules, lastCache); + EnergyDeviceRespVO vo = new EnergyDeviceRespVO(); vo.setId(device.getId()); @@ -559,138 +772,144 @@ public class EnergyDeviceServiceImpl implements EnergyDeviceService { vo.setLatestDataTime(req.getEndTime()); // 构建点位详情 - vo.setPointDetails(buildPointDetails(rules, firstCache, lastCache, pointInfoMap)); + vo.setPointDetails(buildPointDetails(device.getId(), rules, firstCache, lastCache, pointInfoMap)); return vo; } /** 根据规则计算总值 */ - private Double calculateByRules(List rules, Map cache) { - Double total = null; + private Double calculateByRules( + Long energyDeviceId, + List rules, + Map cache) { + + TimePointCache c = cache.get(energyDeviceId); + + if (c == null) { + return 0.0; + } + + BigDecimal total = null; String lastOp = "+"; for (OperationRulesVO r : rules) { - TimePointCache c = cache.get(r.getDeviceId()); - if (c == null) continue; - Double value = getPointValue(c, r.getPointId()); if (value == null) continue; + BigDecimal bdValue = BigDecimal.valueOf(value); + if (total == null) { - total = value; + total = bdValue.setScale(2, RoundingMode.HALF_UP); } else { - total = applyOperator(total, value, StringUtils.defaultIfBlank(r.getOperator(), lastOp)); + total = applyOperator(total, bdValue, StringUtils.defaultIfBlank(r.getOperator(), lastOp)); } lastOp = r.getOperator(); } - return total != null ? total : 0.0; + return total != null ? total.doubleValue() : 0.0; } + /** 延迟解析 JSON 获取点位值 */ - private Double getPointValue(TimePointCache cache, Long pointId) { - if (cache == null || pointId == null) return null; - if (cache.getPointIndex() == null) { - cache.setPointIndex(buildPointIndex(cache.getQueryData())); - } - JSONObject obj = cache.getPointIndex().get(pointId); - if (obj == null) return null; + private Double getPointValue( + TimePointCache cache, + Long pointId) { - Object v = obj.get("addressValue"); - if (v instanceof Number) return ((Number) v).doubleValue(); - try { - return Double.parseDouble(String.valueOf(v)); - } catch (Exception e) { + if (cache == null) { return null; } - } - /** 构建 JSON 点位索引 */ - private Map buildPointIndex(String jsonData) { - Map index = new HashMap<>(); - if (StringUtils.isBlank(jsonData)) return index; - - Object obj = JSON.parse(jsonData); - JSONArray array; - if (obj instanceof JSONArray) { - array = (JSONArray) obj; - } else if (obj instanceof String) { - // 外层字符串 -> 再解析一次 - array = JSON.parseArray((String) obj); - } else { - array = new JSONArray(); - } + Map map = + cache.getPointValueMap(); - - for (int i = 0; i < array.size(); i++) { - JSONObject jsonObject = array.getJSONObject(i); - Long id = jsonObject.getLong("id"); - if (id != null) index.put(id, jsonObject); + if (map == null) { + return null; } - return index; + + return map.get(pointId); } + /** 应用运算符 */ - private Double applyOperator(Double current, Double value, String operator) { + private BigDecimal applyOperator(BigDecimal current, BigDecimal value, String operator) { if (current == null || value == null || StringUtils.isBlank(operator)) return current; switch (operator.trim()) { - case "+": return current + value; - case "-": return current - value; - case "*": return current * value; - case "/": return Math.abs(value) > 1e-6 ? current / value : current; - default: return current + value; + case "+": + return current.add(value).setScale(2, RoundingMode.HALF_UP); + case "-": + return current.subtract(value).setScale(2, RoundingMode.HALF_UP); + case "*": + return current.multiply(value).setScale(2, RoundingMode.HALF_UP); + case "/": + return value.abs().compareTo(BigDecimal.valueOf(1e-6)) > 0 + ? current.divide(value, 2, RoundingMode.HALF_UP) + : current; + default: + return current.add(value).setScale(2, RoundingMode.HALF_UP); } } + /** 构建点位详情列表并使用本地数据库批量获取点位信息 */ - private List buildPointDetails(List rules, - Map firstCache, - Map lastCache, - Map pointInfoMap) { + private List buildPointDetails( + Long energyDeviceId, + List rules, + Map firstCache, + Map lastCache, + Map pointInfoMap) { + List result = new ArrayList<>(); if (rules == null || rules.isEmpty()) return result; + TimePointCache first = firstCache.get(energyDeviceId); + TimePointCache last = lastCache.get(energyDeviceId); + for (OperationRulesVO r : rules) { + PointDetailVO p = new PointDetailVO(); + p.setDeviceId(r.getDeviceId()); p.setPointId(r.getPointId()); p.setOperator(r.getOperator()); - // ====== 这里是改动点 ====== - TimePointCache first = firstCache.get(r.getDeviceId()); - TimePointCache last = lastCache.get(r.getDeviceId()); - Double firstVal = getPointValue(first, r.getPointId()); Double lastVal = getPointValue(last, r.getPointId()); p.setEarliestValue(formatDouble(firstVal)); p.setLatestValue(formatDouble(lastVal)); + p.setDifference(formatDouble( - (lastVal != null ? lastVal : 0.0) - (firstVal != null ? firstVal : 0.0) + (lastVal != null ? lastVal : 0.0) + - (firstVal != null ? firstVal : 0.0) )); - if (first != null) { + if (first != null) p.setEarliestTime(formatToSecond(first.getTimestamp())); - } - if (last != null) { + + if (last != null) p.setLatestTime(formatToSecond(last.getTimestamp())); - } - // 从本地数据库获取点位名称和单位 DeviceContactModelDO pointInfo = pointInfoMap.get(r.getPointId()); + if (pointInfo != null) { - p.setPointName(StringUtils.isNotBlank(pointInfo.getAttributeName()) - ? pointInfo.getAttributeName() - : pointInfo.getAttributeCode()); + + p.setPointName( + StringUtils.isNotBlank(pointInfo.getAttributeName()) + ? pointInfo.getAttributeName() + : pointInfo.getAttributeCode() + ); + p.setUnit(pointInfo.getDataUnit()); } result.add(p); } + return result; } + private String formatToSecond(Object timestamp) { if (timestamp == null) { return ""; @@ -741,7 +960,13 @@ public class EnergyDeviceServiceImpl implements EnergyDeviceService { } - + private Double convertToDouble(Object value) { + if (value == null) return null; + if (value instanceof Number) return ((Number) value).doubleValue(); + if (value instanceof Boolean) return (Boolean) value ? 1.0 : 0.0; + if (value instanceof Character) return (double) ((Character) value); + return Double.valueOf(value.toString()); + } } \ No newline at end of file